UI: switch to new /rpc endpoint, simplify wasm_bridge

This CL simplifies the way the TypeScript code interacts
with TraceProcessor, reducing diverding code paths and
mainteinance cost.

Before this CL:
Each TraceProcessor method required:
 - A function in TraceProcessor::Rpc to deal with {un,}marshalling
 - A REST endpoint for the --httpd mode
 - Corresponding REST code in http_rpc_engine.ts
 - A C exported function in wasm_bridge.cc
 - Corresponding Wasm glue layer code in wasm_engine_proxy.ts
 - Various abstractions layers in the TS code

After the CL:
There is only one communication endpoint, both for Wasm and RPC.
That endpoint is a very simple byte pipe "here's some bytes for the
request" / "here are some bytes for the response".
The Engine implementation (Wasm or HTTP) is only required to deal
with transporting bytes. It doesn't even need to guarantee that
byte buffers boundaries are atomic (they can be coalesced or
re-fragmented), it just needs to guarantee ordering and delivery.
In essence the transport needs to give the same guarantees of
a TCP stream.
All the remaining logic, including the method {de,}multiplexing is
now centralized.

Bug: 159142289
Change-Id: I62da4aba145b20d1a2c44002024c1a0ddb5b3060
diff --git a/protos/perfetto/trace_processor/trace_processor.proto b/protos/perfetto/trace_processor/trace_processor.proto
index efebc38..b47bd3f 100644
--- a/protos/perfetto/trace_processor/trace_processor.proto
+++ b/protos/perfetto/trace_processor/trace_processor.proto
@@ -54,6 +54,11 @@
   // than one response message, bringing the tx and rq seq our of sync.
   optional int64 seq = 1;
 
+  // This is returned when some unrecoverable error has been detected by the
+  // peer. The typical case is TraceProcessor detecting that the |seq| sequence
+  // is broken (e.g. when having two tabs open with the same --httpd instance).
+  optional string fatal_error = 5;
+
   enum TraceProcessorMethod {
     TPM_UNSPECIFIED = 0;
     TPM_APPEND_TRACE_DATA = 1;
diff --git a/src/trace_processor/python/perfetto/trace_processor/trace_processor.descriptor b/src/trace_processor/python/perfetto/trace_processor/trace_processor.descriptor
index c3672b2..476c0b9 100644
--- a/src/trace_processor/python/perfetto/trace_processor/trace_processor.descriptor
+++ b/src/trace_processor/python/perfetto/trace_processor/trace_processor.descriptor
Binary files differ
diff --git a/src/trace_processor/python/perfetto/trace_processor/trace_processor.descriptor.sha1 b/src/trace_processor/python/perfetto/trace_processor/trace_processor.descriptor.sha1
index 5f5b733..1b19c08 100644
--- a/src/trace_processor/python/perfetto/trace_processor/trace_processor.descriptor.sha1
+++ b/src/trace_processor/python/perfetto/trace_processor/trace_processor.descriptor.sha1
@@ -2,5 +2,5 @@
 // SHA1(tools/gen_binary_descriptors)
 // 9fc6d77de57ec76a80b76aa282f4c7cf5ce55eec
 // SHA1(protos/perfetto/trace_processor/trace_processor.proto)
-// ee875dc7384617e22ecb9e9d4ac03df4bba82252
+// f0ba693bdd1111c81e9240c966e16babd489bb03
   
\ No newline at end of file
diff --git a/src/trace_processor/rpc/httpd.cc b/src/trace_processor/rpc/httpd.cc
index edc4568..da690fc 100644
--- a/src/trace_processor/rpc/httpd.cc
+++ b/src/trace_processor/rpc/httpd.cc
@@ -287,9 +287,6 @@
   std::string allow_origin_hdr =
       "Access-Control-Allow-Origin: " + req.origin.ToStdString();
 
-  std::string tp_session_id_header =
-      "X-TP-Session-ID: " + trace_processor_rpc_.GetSessionId();
-
   // This is the default. Overridden by the /query handler for chunked replies.
   char transfer_encoding_hdr[255] = "Transfer-Encoding: identity";
   std::initializer_list<const char*> headers = {
@@ -297,10 +294,8 @@
       "Cache-Control: no-cache",                         //
       "Keep-Alive: timeout=5, max=1000",                 //
       "Content-Type: application/x-protobuf",            //
-      "Access-Control-Expose-Headers: X-TP-Session-ID",  //
       transfer_encoding_hdr,                             //
       allow_origin_hdr.c_str(),
-      tp_session_id_header.c_str(),
   };
 
   if (req.method == "OPTIONS") {
@@ -328,6 +323,7 @@
       PERFETTO_CHECK(http_client);
       if (data == nullptr) {
         // Unrecoverable RPC error case.
+        http_client->sock->Send("0\r\n\r\n", 5);
         http_client->sock->Shutdown(/*notify=*/true);
         return;
       }
diff --git a/src/trace_processor/rpc/rpc.cc b/src/trace_processor/rpc/rpc.cc
index a0d4a05..17df0be 100644
--- a/src/trace_processor/rpc/rpc.cc
+++ b/src/trace_processor/rpc/rpc.cc
@@ -88,8 +88,7 @@
 }  // namespace
 
 Rpc::Rpc(std::unique_ptr<TraceProcessor> preloaded_instance)
-    : trace_processor_(std::move(preloaded_instance)),
-      session_id_(base::Uuidv4()) {
+    : trace_processor_(std::move(preloaded_instance)) {
   if (!trace_processor_)
     ResetTraceProcessor();
 }
@@ -112,8 +111,13 @@
   for (;;) {
     auto msg = rxbuf_.ReadMessage();
     if (!msg.valid()) {
-      if (msg.fatal_framing_error)
+      if (msg.fatal_framing_error) {
+        protozero::HeapBuffered<TraceProcessorRpcStream> err_msg;
+        err_msg->add_msg()->set_fatal_error("RPC framing error");
+        auto err = err_msg.SerializeAsArray();
+        rpc_response_fn_(err.data(), static_cast<uint32_t>(err.size()));
         rpc_response_fn_(nullptr, 0);  // Disconnect.
+      }
       break;
     }
     ParseRpcRequest(msg.start, msg.len);
@@ -128,9 +132,17 @@
   // We allow restarting the sequence from 0. This happens when refreshing the
   // browser while using the external trace_processor_shell --httpd.
   if (req.seq() != 0 && rx_seq_id_ != 0 && req.seq() != rx_seq_id_ + 1) {
-    PERFETTO_ELOG("RPC request out of order. Expected %" PRId64
-                  ", got %" PRId64,
-                  rx_seq_id_ + 1, req.seq());
+    char err_str[255];
+    // "(ERR:rpc_seq)" is intercepted by error_dialog.ts in the UI.
+    sprintf(err_str,
+            "RPC request out of order. Expected %" PRId64 ", got %" PRId64
+            " (ERR:rpc_seq)",
+            rx_seq_id_ + 1, req.seq());
+    PERFETTO_ELOG("%s", err_str);
+    protozero::HeapBuffered<TraceProcessorRpcStream> err_msg;
+    err_msg->add_msg()->set_fatal_error(err_str);
+    auto err = err_msg.SerializeAsArray();
+    rpc_response_fn_(err.data(), static_cast<uint32_t>(err.size()));
     rpc_response_fn_(nullptr, 0);  // Disconnect.
     return;
   }
@@ -447,7 +459,6 @@
 
 void Rpc::RestoreInitialTables() {
   trace_processor_->RestoreInitialTables();
-  session_id_ = base::Uuidv4();
 }
 
 std::vector<uint8_t> Rpc::ComputeMetric(const uint8_t* args, size_t len) {
@@ -523,9 +534,5 @@
   }
 }
 
-std::string Rpc::GetSessionId() {
-  return session_id_.ToPrettyString();
-}
-
 }  // namespace trace_processor
 }  // namespace perfetto
diff --git a/src/trace_processor/rpc/rpc.h b/src/trace_processor/rpc/rpc.h
index 7bca56d..9006154 100644
--- a/src/trace_processor/rpc/rpc.h
+++ b/src/trace_processor/rpc/rpc.h
@@ -24,7 +24,6 @@
 #include <stddef.h>
 #include <stdint.h>
 
-#include "perfetto/ext/base/uuid.h"
 #include "perfetto/trace_processor/status.h"
 #include "src/protozero/proto_ring_buffer.h"
 
@@ -108,32 +107,11 @@
   void EnableMetatrace();
   std::vector<uint8_t> DisableAndReadMetatrace();
 
-  // Creates a new RPC session by:
-  // a) deleting all tables and views that have been created (by the UI or user)
-  //    after the trace was loaded; built-in tables/view created
-  //    by the ingestion process are preserved.
-  // b) creates a new session id (see |GetSessionId| for more information).
-  //
-  // Historical note: the name of this function is |RestoreInitialTables|
-  // because it was created before the concept of an RPC session was
-  // defined when this function only reset the views. The scope was expanded
-  // rather than just creating a new function to preserve backward compatibility
-  // for clients.
+  // Creates a new RPC session by deleting all tables and views that have been
+  // created (by the UI or user) after the trace was loaded; built-in
+  // tables/view created by the ingestion process are preserved.
   void RestoreInitialTables();
 
-  // Returns the id of the RPC session. This id is an opaque string
-  // which can be used to globally and uniquely identify a particular session of
-  // RPC class.
-  //
-  // A new session is started (and thus a new session id is generated) when
-  // either:
-  // a) a new RPC instance is created
-  // b) |RestoreInitialTables| is called on an existing instance
-  //
-  // This can be used by RPC clients to determine whether they are talking to
-  // same instance they previously used to create tables/views.
-  std::string GetSessionId();
-
   // Runs a query and returns results in batch. Each batch is a proto-encoded
   // TraceProcessor.QueryResult message and contains a variable number of rows.
   // The callbacks are called inline, so the whole callstack looks as follows:
@@ -177,8 +155,6 @@
   int64_t t_parse_started_ = 0;
   size_t bytes_last_progress_ = 0;
   size_t bytes_parsed_ = 0;
-
-  base::Uuid session_id_;
 };
 
 }  // namespace trace_processor
diff --git a/src/trace_processor/rpc/wasm_bridge.cc b/src/trace_processor/rpc/wasm_bridge.cc
index 4d2decf..6e266b8 100644
--- a/src/trace_processor/rpc/wasm_bridge.cc
+++ b/src/trace_processor/rpc/wasm_bridge.cc
@@ -15,8 +15,6 @@
  */
 
 #include <emscripten/emscripten.h>
-#include <map>
-#include <string>
 
 #include "perfetto/base/logging.h"
 #include "perfetto/trace_processor/trace_processor.h"
@@ -25,24 +23,12 @@
 namespace perfetto {
 namespace trace_processor {
 
-using RequestID = uint32_t;
-
-// Reply(): replies to a RPC method invocation.
-// Called asynchronously (i.e. in a separate task) by the C++ code inside the
-// trace processor to return data for a RPC method call.
-// The function is generic and thankfully we need just one for all methods
-// because the output is always a protobuf buffer.
-using ReplyFunction = void (*)(const char* /*proto_reply_data*/,
-                               uint32_t /*len*/);
-
 namespace {
 Rpc* g_trace_processor_rpc;
-ReplyFunction g_reply;
 
 // The buffer used to pass the request arguments. The caller (JS) decides how
 // big this buffer should be in the Initialize() call.
 uint8_t* g_req_buf;
-
 }  // namespace
 
 // +---------------------------------------------------------------------------+
@@ -51,63 +37,26 @@
 extern "C" {
 
 // Returns the address of the allocated request buffer.
-uint8_t* EMSCRIPTEN_KEEPALIVE Initialize(ReplyFunction, uint32_t);
-uint8_t* Initialize(ReplyFunction reply_function, uint32_t req_buffer_size) {
+uint8_t* EMSCRIPTEN_KEEPALIVE trace_processor_rpc_init(Rpc::RpcResponseFunction,
+                                                       uint32_t);
+uint8_t* trace_processor_rpc_init(Rpc::RpcResponseFunction resp_function,
+                                  uint32_t req_buffer_size) {
   g_trace_processor_rpc = new Rpc();
-  g_reply = reply_function;
+
+  // |resp_function| is a JS-bound function passed by wasm_bridge.ts. It will
+  // call back into JavaScript. There the JS code will copy the passed
+  // buffer with the response (a proto-encoded TraceProcessorRpc message) and
+  // postMessage() it to the controller. See the comment in wasm_bridge.ts for
+  // an overview of the JS<>Wasm callstack.
+  g_trace_processor_rpc->SetRpcResponseFunction(resp_function);
+
   g_req_buf = new uint8_t[req_buffer_size];
   return g_req_buf;
 }
 
-// Ingests trace data.
-void EMSCRIPTEN_KEEPALIVE trace_processor_parse(uint32_t);
-void trace_processor_parse(uint32_t size) {
-  // TODO(primiano): Parse() makes a copy of the data, which is unfortunate.
-  // Ideally there should be a way to take the Blob coming from JS and move it.
-  // See https://github.com/WebAssembly/design/issues/1162.
-  auto status = g_trace_processor_rpc->Parse(g_req_buf, size);
-  if (status.ok()) {
-    g_reply("", 0);
-  } else {
-    PERFETTO_FATAL("Fatal failure while parsing the trace: %s",
-                   status.c_message());
-  }
-}
-
-// We keep the same signature as other methods even though we don't take input
-// arguments for simplicity.
-void EMSCRIPTEN_KEEPALIVE trace_processor_notify_eof(uint32_t);
-void trace_processor_notify_eof(uint32_t /* size, not used. */) {
-  g_trace_processor_rpc->NotifyEndOfFile();
-  g_reply("", 0);
-}
-
-void EMSCRIPTEN_KEEPALIVE trace_processor_raw_query(uint32_t);
-void trace_processor_raw_query(uint32_t size) {
-  std::vector<uint8_t> res = g_trace_processor_rpc->RawQuery(g_req_buf, size);
-  g_reply(reinterpret_cast<const char*>(res.data()),
-          static_cast<uint32_t>(res.size()));
-}
-
-void EMSCRIPTEN_KEEPALIVE trace_processor_compute_metric(uint32_t);
-void trace_processor_compute_metric(uint32_t size) {
-  std::vector<uint8_t> res =
-      g_trace_processor_rpc->ComputeMetric(g_req_buf, size);
-  g_reply(reinterpret_cast<const char*>(res.data()),
-          static_cast<uint32_t>(res.size()));
-}
-
-void EMSCRIPTEN_KEEPALIVE trace_processor_enable_metatrace(uint32_t);
-void trace_processor_enable_metatrace(uint32_t) {
-  g_trace_processor_rpc->EnableMetatrace();
-  g_reply("", 0);
-}
-
-void EMSCRIPTEN_KEEPALIVE trace_processor_disable_and_read_metatrace(uint32_t);
-void trace_processor_disable_and_read_metatrace(uint32_t) {
-  std::vector<uint8_t> res = g_trace_processor_rpc->DisableAndReadMetatrace();
-  g_reply(reinterpret_cast<const char*>(res.data()),
-          static_cast<uint32_t>(res.size()));
+void EMSCRIPTEN_KEEPALIVE trace_processor_on_rpc_request(uint32_t);
+void trace_processor_on_rpc_request(uint32_t size) {
+  g_trace_processor_rpc->OnRpcRequest(g_req_buf, size);
 }
 
 }  // extern "C"
@@ -115,13 +64,13 @@
 }  // namespace perfetto
 
 int main(int, char**) {
-  // This is unused but is needed for the following series of reason:
+  // This is unused but is needed for the following reasons:
   // - We need the callMain() Emscripten JS helper function for traceconv (but
   //   not for trace_processor).
   // - Newer versions of emscripten require that callMain is explicitly exported
   //   via EXTRA_EXPORTED_RUNTIME_METHODS = ['callMain'].
   // - We have one set of EXTRA_EXPORTED_RUNTIME_METHODS for both
-  //   trace_processor.wasm (which does not need a main) and traceconv (which
+  //   trace_processor.wasm (which does not need a main()) and traceconv (which
   //   does).
   // - Without this main(), the Wasm bootstrap code will cause a JS error at
   //   runtime when trying to load trace_processor.js.
diff --git a/ui/src/common/engine.ts b/ui/src/common/engine.ts
index 3d387d4..110e429 100644
--- a/ui/src/common/engine.ts
+++ b/ui/src/common/engine.ts
@@ -12,6 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+import {defer, Deferred} from '../base/deferred';
+import {assertExists} from '../base/logging';
+import {perfetto} from '../gen/protos';
+
+import {ProtoRingBuffer} from './proto_ring_buffer';
 import {
   ComputeMetricArgs,
   ComputeMetricResult,
@@ -21,6 +26,10 @@
 import {iter, NUM_NULL, slowlyCountRows, STR} from './query_iterator';
 import {TimeSpan} from './time';
 
+import TraceProcessorRpc = perfetto.protos.TraceProcessorRpc;
+import TraceProcessorRpcStream = perfetto.protos.TraceProcessorRpcStream;
+import TPM = perfetto.protos.TraceProcessorRpc.TraceProcessorMethod;
+
 export interface LoadingTracker {
   beginLoading(): void;
   endLoading(): void;
@@ -36,51 +45,183 @@
 /**
  * Abstract interface of a trace proccessor.
  * This is the TypeScript equivalent of src/trace_processor/rpc.h.
- *
- * Engine also defines helpers for the most common service methods
- * (e.g. query).
+ * There are two concrete implementations:
+ *   1. WasmEngineProxy: creates a Wasm module and interacts over postMessage().
+ *   2. HttpRpcEngine: connects to an external `trace_processor_shell --httpd`.
+ *      and interacts via fetch().
+ * In both cases, we have a byte-oriented pipe to interact with TraceProcessor.
+ * The derived class is only expected to deal with these two functions:
+ * 1. Implement the abstract rpcSendRequestBytes() function, sending the
+ *    proto-encoded TraceProcessorRpc requests to the TraceProcessor instance.
+ * 2. Call onRpcResponseBytes() when response data is received.
  */
 export abstract class Engine {
   abstract readonly id: string;
   private _cpus?: number[];
   private _numGpus?: number;
   private loadingTracker: LoadingTracker;
+  private txSeqId = 0;
+  private rxSeqId = 0;
+  private rxBuf = new ProtoRingBuffer();
+  private pendingParses = new Array<Deferred<void>>();
+  private pendingEOFs = new Array<Deferred<void>>();
+  private pendingRawQueries = new Array<Deferred<RawQueryResult>>();
+  private pendingRestoreTables = new Array<Deferred<void>>();
+  private pendingComputeMetrics = new Array<Deferred<ComputeMetricResult>>();
 
   constructor(tracker?: LoadingTracker) {
     this.loadingTracker = tracker ? tracker : new NullLoadingTracker();
   }
 
   /**
+   * Called to send data to the TraceProcessor instance. This turns into a
+   * postMessage() or a HTTP request, depending on the Engine implementation.
+   */
+  abstract rpcSendRequestBytes(data: Uint8Array): void;
+
+  /**
+   * Called when an inbound message is received by the Engine implementation
+   * (e.g. onmessage for the Wasm case, on when HTTP replies are received for
+   * the HTTP+RPC case).
+   */
+  onRpcResponseBytes(dataWillBeRetained: Uint8Array) {
+    // Note: when hitting the fastpath inside ProtoRingBuffer, the |data| buffer
+    // is returned back by readMessage() (% subarray()-ing it) and held onto by
+    // other classes (e.g., QueryResult). For both fetch() and Wasm we are fine
+    // because every response creates a new buffer.
+    this.rxBuf.append(dataWillBeRetained);
+    for (;;) {
+      const msg = this.rxBuf.readMessage();
+      if (msg === undefined) break;
+      this.onRpcResponseMessage(msg);
+    }
+  }
+
+  /*
+   * Parses a response message.
+   * |rpcMsgEncoded| is a sub-array to to the start of a TraceProcessorRpc
+   * proto-encoded message (without the proto preamble and varint size).
+   */
+  private onRpcResponseMessage(rpcMsgEncoded: Uint8Array) {
+    const rpc = TraceProcessorRpc.decode(rpcMsgEncoded);
+    this.loadingTracker.endLoading();
+
+    if (rpc.fatalError !== undefined && rpc.fatalError.length > 0) {
+      throw new Error(`${rpc.fatalError}`);
+    }
+
+    // Allow restarting sequences from zero (when reloading the browser).
+    if (rpc.seq !== this.rxSeqId + 1 && this.rxSeqId !== 0 && rpc.seq !== 0) {
+      // "(ERR:rpc_seq)" is intercepted by error_dialog.ts to show a more
+      // graceful and actionable error.
+      throw new Error(`RPC sequence id mismatch cur=${rpc.seq} last=${
+          this.rxSeqId} (ERR:rpc_seq)`);
+    }
+
+    this.rxSeqId = rpc.seq;
+
+    switch (rpc.response) {
+      case TPM.TPM_APPEND_TRACE_DATA:
+        const appendResult = assertExists(rpc.appendResult);
+        const pendingPromise = assertExists(this.pendingParses.shift());
+        if (appendResult.error && appendResult.error.length > 0) {
+          pendingPromise.reject(appendResult.error);
+        } else {
+          pendingPromise.resolve();
+        }
+        break;
+      case TPM.TPM_FINALIZE_TRACE_DATA:
+        assertExists(this.pendingEOFs.shift()).resolve();
+        break;
+      case TPM.TPM_RESTORE_INITIAL_TABLES:
+        assertExists(this.pendingRestoreTables.shift()).resolve();
+        break;
+      case TPM.TPM_QUERY_STREAMING:
+        // TODO(primiano): In the next CLs wire up the streaming query decoder.
+        break;
+      case TPM.TPM_QUERY_RAW_DEPRECATED:
+        const queryRes = assertExists(rpc.rawQueryResult) as RawQueryResult;
+        assertExists(this.pendingRawQueries.shift()).resolve(queryRes);
+        break;
+      case TPM.TPM_COMPUTE_METRIC:
+        const metricRes = assertExists(rpc.metricResult) as ComputeMetricResult;
+        if (metricRes.error && metricRes.error.length > 0) {
+          throw new QueryError(`ComputeMetric() error: ${metricRes.error}`);
+        }
+        assertExists(this.pendingComputeMetrics.shift()).resolve(metricRes);
+        break;
+      default:
+        console.log(
+            'Unexpected TraceProcessor response received: ', rpc.response);
+        break;
+    }  // switch(rpc.response);
+  }
+
+  /**
+   * TraceProcessor methods below this point.
+   * The methods below are called by the various controllers in the UI and
+   * deal with marshalling / unmarshaling requests to/from TraceProcessor.
+   */
+
+
+  /**
    * Push trace data into the engine. The engine is supposed to automatically
    * figure out the type of the trace (JSON vs Protobuf).
    */
-  abstract parse(data: Uint8Array): Promise<void>;
+  parse(data: Uint8Array): Promise<void> {
+    const asyncRes = defer<void>();
+    this.pendingParses.push(asyncRes);
+    const rpc = TraceProcessorRpc.create();
+    rpc.request = TPM.TPM_APPEND_TRACE_DATA;
+    rpc.appendTraceData = data;
+    this.rpcSendRequest(rpc);
+    return asyncRes;  // Linearize with the worker.
+  }
 
   /**
-   * Notify the engine no more data is coming.
+   * Notify the engine that we reached the end of the trace.
+   * Called after the last parse() call.
    */
-  abstract notifyEof(): void;
+  notifyEof(): Promise<void> {
+    const asyncRes = defer<void>();
+    this.pendingEOFs.push(asyncRes);
+    const rpc = TraceProcessorRpc.create();
+    rpc.request = TPM.TPM_FINALIZE_TRACE_DATA;
+    this.rpcSendRequest(rpc);
+    return asyncRes;  // Linearize with the worker.
+  }
 
   /**
    * Resets the trace processor state by destroying any table/views created by
    * the UI after loading.
    */
-  abstract restoreInitialTables(): void;
-
-  /*
-   * Performs a SQL query and retruns a proto-encoded RawQueryResult object.
-   */
-  abstract rawQuery(rawQueryArgs: Uint8Array): Promise<Uint8Array>;
-
-  /*
-   * Performs computation of metrics and returns metric result and any errors.
-   * Metric result is a proto binary or text encoded TraceMetrics object.
-   */
-  abstract rawComputeMetric(computeMetricArgs: Uint8Array): Promise<Uint8Array>;
+  restoreInitialTables(): Promise<void> {
+    const asyncRes = defer<void>();
+    this.pendingRestoreTables.push(asyncRes);
+    const rpc = TraceProcessorRpc.create();
+    rpc.request = TPM.TPM_RESTORE_INITIAL_TABLES;
+    this.rpcSendRequest(rpc);
+    return asyncRes;  // Linearize with the worker.
+  }
 
   /**
-   * Shorthand for sending a SQL query to the engine.
-   * Deals with {,un}marshalling of request/response args.
+   * Shorthand for sending a compute metrics request to the engine.
+   */
+  async computeMetric(metrics: string[]): Promise<ComputeMetricResult> {
+    const asyncRes = defer<ComputeMetricResult>();
+    this.pendingComputeMetrics.push(asyncRes);
+    const rpc = TraceProcessorRpc.create();
+    rpc.request = TPM.TPM_COMPUTE_METRIC;
+    const args = rpc.computeMetricArgs = new ComputeMetricArgs();
+    args.metricNames = metrics;
+    args.format = ComputeMetricArgs.ResultFormat.TEXTPROTO;
+    this.rpcSendRequest(rpc);
+    return asyncRes;
+  }
+
+  /**
+   * Runs a SQL query and throws if the query failed.
+   * Queries performed by the controller logic should use this.
    */
   async query(sqlQuery: string): Promise<RawQueryResult> {
     const result = await this.uncheckedQuery(sqlQuery);
@@ -90,38 +231,21 @@
     return result;
   }
 
-  // This method is for noncritical queries that shouldn't throw an error
-  // on failure. The caller must handle the failure.
-  async uncheckedQuery(sqlQuery: string): Promise<RawQueryResult> {
-    this.loadingTracker.beginLoading();
-    try {
-      const args = new RawQueryArgs();
-      args.sqlQuery = sqlQuery;
-      args.timeQueuedNs = Math.floor(performance.now() * 1e6);
-      const argsEncoded = RawQueryArgs.encode(args).finish();
-      const respEncoded = await this.rawQuery(argsEncoded);
-      const result = RawQueryResult.decode(respEncoded);
-      return result;
-    } finally {
-      this.loadingTracker.endLoading();
-    }
-  }
-
   /**
-   * Shorthand for sending a compute metrics request to the engine.
-   * Deals with {,un}marshalling of request/response args.
+   * Runs a SQL query. Does not throw if the query fails.
+   * The caller must handle this failure. This is so this function can be safely
+   * used for user-typed SQL.
    */
-  async computeMetric(metrics: string[]): Promise<ComputeMetricResult> {
-    const args = new ComputeMetricArgs();
-    args.metricNames = metrics;
-    args.format = ComputeMetricArgs.ResultFormat.TEXTPROTO;
-    const argsEncoded = ComputeMetricArgs.encode(args).finish();
-    const respEncoded = await this.rawComputeMetric(argsEncoded);
-    const result = ComputeMetricResult.decode(respEncoded);
-    if (result.error.length > 0) {
-      throw new QueryError(result.error);
-    }
-    return result;
+  uncheckedQuery(sqlQuery: string): Promise<RawQueryResult> {
+    const asyncRes = defer<RawQueryResult>();
+    this.pendingRawQueries.push(asyncRes);
+    const rpc = TraceProcessorRpc.create();
+    rpc.request = TPM.TPM_QUERY_RAW_DEPRECATED;
+    rpc.rawQueryArgs = new RawQueryArgs();
+    rpc.rawQueryArgs.sqlQuery = sqlQuery;
+    rpc.rawQueryArgs.timeQueuedNs = Math.floor(performance.now() * 1e6);
+    this.rpcSendRequest(rpc);
+    return asyncRes;
   }
 
   async queryOneRow(query: string): Promise<number[]> {
@@ -142,6 +266,21 @@
     return res;
   }
 
+  /**
+   * Marshals the TraceProcessorRpc request arguments and sends the request
+   * to the concrete Engine (Wasm or HTTP).
+   */
+  private rpcSendRequest(rpc: TraceProcessorRpc) {
+    rpc.seq = this.txSeqId++;
+    // Each message is wrapped in a TraceProcessorRpcStream to add the varint
+    // preamble with the size, which allows tokenization on the other end.
+    const outerProto = TraceProcessorRpcStream.create();
+    outerProto.msg.push(rpc);
+    const buf = TraceProcessorRpcStream.encode(outerProto).finish();
+    this.loadingTracker.beginLoading();
+    this.rpcSendRequestBytes(buf);
+  }
+
   // TODO(hjd): When streaming must invalidate this somehow.
   async getCpus(): Promise<number[]> {
     if (!this._cpus) {
diff --git a/ui/src/common/http_rpc_engine.ts b/ui/src/common/http_rpc_engine.ts
index 0d50b42..498d086 100644
--- a/ui/src/common/http_rpc_engine.ts
+++ b/ui/src/common/http_rpc_engine.ts
@@ -12,9 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-import {defer, Deferred} from '../base/deferred';
 import {fetchWithTimeout} from '../base/http_utils';
-import {assertExists, assertTrue} from '../base/logging';
+import {assertTrue} from '../base/logging';
 import {StatusResult} from '../common/protos';
 
 import {Engine, LoadingTracker} from './engine';
@@ -28,19 +27,10 @@
   failure?: string;
 }
 
-interface QueuedRequest {
-  methodName: string;
-  reqData?: Uint8Array;
-  resp: Deferred<Uint8Array>;
-  id: number;
-}
-
 export class HttpRpcEngine extends Engine {
   readonly id: string;
-  private nextReqId = 0;
-  private sessionId?: string = undefined;
-  private requestQueue = new Array<QueuedRequest>();
-  private pendingRequest?: QueuedRequest = undefined;
+  private requestQueue = new Array<Uint8Array>();
+  private requestPending = false;
   errorHandler: (err: string) => void = () => {};
 
   constructor(id: string, loadingTracker?: LoadingTracker) {
@@ -48,50 +38,17 @@
     this.id = id;
   }
 
-  async parse(data: Uint8Array): Promise<void> {
-    await this.enqueueRequest('parse', data);
-  }
-
-  async notifyEof(): Promise<void> {
-    await this.enqueueRequest('notify_eof');
-  }
-
-  async restoreInitialTables(): Promise<void> {
-    await this.enqueueRequest('restore_initial_tables');
-  }
-
-  rawQuery(rawQueryArgs: Uint8Array): Promise<Uint8Array> {
-    return this.enqueueRequest('raw_query', rawQueryArgs);
-  }
-
-  rawComputeMetric(rawComputeMetricArgs: Uint8Array): Promise<Uint8Array> {
-    return this.enqueueRequest('compute_metric', rawComputeMetricArgs);
-  }
-
-  async enableMetatrace(): Promise<void> {
-    await this.enqueueRequest('enable_metatrace');
-  }
-
-  disableAndReadMetatrace(): Promise<Uint8Array> {
-    return this.enqueueRequest('disable_and_read_metatrace');
-  }
-
-  enqueueRequest(methodName: string, data?: Uint8Array): Promise<Uint8Array> {
-    const resp = defer<Uint8Array>();
-    const req:
-        QueuedRequest = {methodName, reqData: data, resp, id: this.nextReqId++};
-    if (this.pendingRequest === undefined) {
-      this.beginFetch(req);
+  rpcSendRequestBytes(data: Uint8Array): void {
+    if (!this.requestPending && this.requestQueue.length === 0) {
+      this.beginFetch(data);
     } else {
-      this.requestQueue.push(req);
+      this.requestQueue.push(data);
     }
-    return resp;
   }
 
-  private beginFetch(req: QueuedRequest) {
-    assertTrue(this.pendingRequest === undefined);
-    this.pendingRequest = req;
-    const methodName = req.methodName.toLowerCase();
+  private beginFetch(data: Uint8Array) {
+    assertTrue(!this.requestPending);
+    this.requestPending = true;
     // Deliberately not using fetchWithTimeout() here. These queries can be
     // arbitrarily long.
     // Deliberately not setting cache: no-cache. Doing so invalidates also the
@@ -99,69 +56,33 @@
     // no-cache is also useless because trace-processor's replies are already
     // marked as no-cache and browsers generally already assume that POST
     // requests are not idempotent.
-    fetch(RPC_URL + methodName, {
+    fetch(RPC_URL + 'rpc', {
       method: 'post',
-      headers: {
-        'Content-Type': 'application/x-protobuf',
-        'X-Seq-Id': `${req.id}`,  // Used only for debugging.
-      },
-      body: req.reqData || new Uint8Array(),
+      headers: {'Content-Type': 'application/x-protobuf'},
+      body: data,
     })
-        .then(resp => this.endFetch(resp, req.id))
+        .then(resp => this.endFetch(resp))
         .catch(err => this.errorHandler(err));
   }
 
-  private endFetch(resp: Response, expectedReqId: number) {
-    const req = assertExists(this.pendingRequest);
-    this.pendingRequest = undefined;
-    assertTrue(expectedReqId === req.id);
+  private endFetch(resp: Response) {
+    assertTrue(this.requestPending);
     if (resp.status !== 200) {
-      req.resp.reject(`HTTP ${resp.status} - ${resp.statusText}`);
-      return;
+      throw new Error(`HTTP ${resp.status} - ${resp.statusText}`);
     }
-
-    if (req.methodName === 'restore_initial_tables') {
-      // restore_initial_tables resets the trace processor session id
-      // so make sure to also reset on our end for future queries.
-      this.sessionId = undefined;
-    } else {
-      const sessionId = resp.headers.get('X-TP-Session-ID') || undefined;
-      if (this.sessionId !== undefined && sessionId !== this.sessionId) {
-        req.resp.reject(
-            `The trace processor HTTP session does not match the initally seen
-             ID.
-
-             This can happen when using a HTTP trace processor instance and
-             either accidentally sharing this between multiple tabs or
-             restarting the trace processor while still in use by UI.
-
-             Please refresh this tab and ensure that trace processor is used by
-             at most one tab at a time.
-
-             Technical details:
-             Expected session id: ${this.sessionId}
-             Actual session id: ${sessionId}`);
-        return;
-      }
-      this.sessionId = sessionId;
-    }
-
     resp.arrayBuffer().then(arrBuf => {
       // Note: another request can sneak in via enqueueRequest() between the
       // arrayBuffer() call and this continuation. At this point
       // this.pendingRequest might be set again.
       // If not (the most common case) submit the next queued request, if any.
-      this.maybeSubmitNextQueuedRequest();
-      req.resp.resolve(new Uint8Array(arrBuf));
+      this.requestPending = false;
+      if (this.requestQueue.length > 0) {
+        this.beginFetch(this.requestQueue.shift()!);
+      }
+      super.onRpcResponseBytes(new Uint8Array(arrBuf));
     });
   }
 
-  private maybeSubmitNextQueuedRequest() {
-    if (this.pendingRequest === undefined && this.requestQueue.length > 0) {
-      this.beginFetch(this.requestQueue.shift()!);
-    }
-  }
-
   static async checkConnection(): Promise<HttpRpcState> {
     const httpRpcState: HttpRpcState = {connected: false};
     console.info(
diff --git a/ui/src/common/wasm_engine_proxy.ts b/ui/src/common/wasm_engine_proxy.ts
index 43f7b85..aabcc12 100644
--- a/ui/src/common/wasm_engine_proxy.ts
+++ b/ui/src/common/wasm_engine_proxy.ts
@@ -12,10 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-import {defer, Deferred} from '../base/deferred';
 import {assertTrue} from '../base/logging';
-import {WasmBridgeRequest, WasmBridgeResponse} from '../engine/wasm_bridge';
-
 import {Engine, LoadingTracker} from './engine';
 
 const activeWorkers = new Map<string, Worker>();
@@ -63,11 +60,6 @@
   warmWorker = createWorker();
 }
 
-interface PendingRequest {
-  id: number;
-  respHandler: Deferred<Uint8Array>;
-}
-
 /**
  * This implementation of Engine uses a WASM backend hosted in a separate
  * worker thread.
@@ -75,8 +67,6 @@
 export class WasmEngineProxy extends Engine {
   readonly id: string;
   private readonly worker: Worker;
-  private pendingRequests = new Array<PendingRequest>();
-  private nextRequestId = 0;
 
   constructor(id: string, worker: Worker, loadingTracker?: LoadingTracker) {
     super(loadingTracker);
@@ -85,72 +75,15 @@
     this.worker.onmessage = this.onMessage.bind(this);
   }
 
-  async parse(reqData: Uint8Array): Promise<void> {
-    // We don't care about the response data (the method is actually a void). We
-    // just want to linearize and wait for the call to have been completed on
-    // the worker.
-    await this.queueRequest('trace_processor_parse', reqData);
-  }
-
-  async notifyEof(): Promise<void> {
-    // We don't care about the response data (the method is actually a void). We
-    // just want to linearize and wait for the call to have been completed on
-    // the worker.
-    await this.queueRequest('trace_processor_notify_eof', new Uint8Array());
-  }
-
-  restoreInitialTables(): Promise<void> {
-    // We should never get here, restoreInitialTables() should be called only
-    // when using the HttpRpcEngine.
-    throw new Error('restoreInitialTables() not supported by the WASM engine');
-  }
-
-  rawQuery(rawQueryArgs: Uint8Array): Promise<Uint8Array> {
-    return this.queueRequest('trace_processor_raw_query', rawQueryArgs);
-  }
-
-  rawComputeMetric(rawComputeMetric: Uint8Array): Promise<Uint8Array> {
-    return this.queueRequest(
-        'trace_processor_compute_metric', rawComputeMetric);
-  }
-
-  async enableMetatrace(): Promise<void> {
-    await this.queueRequest(
-        'trace_processor_enable_metatrace', new Uint8Array());
-  }
-
-  disableAndReadMetatrace(): Promise<Uint8Array> {
-    return this.queueRequest(
-        'trace_processor_disable_and_read_metatrace', new Uint8Array());
-  }
-
-  // Enqueues a request to the worker queue via postMessage(). The returned
-  // promised will be resolved once the worker replies to the postMessage()
-  // with the paylad of the response, a proto-encoded object which wraps the
-  // method return value (e.g., RawQueryResult for SQL query results).
-  private queueRequest(methodName: string, reqData: Uint8Array):
-      Deferred<Uint8Array> {
-    const respHandler = defer<Uint8Array>();
-    const id = this.nextRequestId++;
-    const request: WasmBridgeRequest = {id, methodName, data: reqData};
-    this.pendingRequests.push({id, respHandler});
-    this.worker.postMessage(request);
-    return respHandler;
-  }
-
   onMessage(m: MessageEvent) {
-    const response = m.data as WasmBridgeResponse;
-    assertTrue(this.pendingRequests.length > 0);
-    const request = this.pendingRequests.shift()!;
+    assertTrue(m.data instanceof Uint8Array);
+    super.onRpcResponseBytes(m.data as Uint8Array);
+  }
 
-    // Requests should be executed and ACKed by the worker in the same order
-    // they came in.
-    assertTrue(request.id === response.id);
-
-    // If the Wasm call fails (e.g. hits a PERFETTO_CHECK) it will throw an
-    // error in wasm_bridge.ts and show the crash dialog. In no case we can
-    // gracefully handle a Wasm crash, so we fail fast there rather than
-    // propagating the error here rejecting the promise.
-    request.respHandler.resolve(response.data);
+  rpcSendRequestBytes(data: Uint8Array): void {
+    // We deliberately don't use a transfer list because protobufjs reuses the
+    // same buffer when encoding messages (which is good, because creating a new
+    // TypedArray for each decode operation would be too expensive).
+    this.worker.postMessage(data);
   }
 }
diff --git a/ui/src/engine/index.ts b/ui/src/engine/index.ts
index 7c4916b..3a4020d 100644
--- a/ui/src/engine/index.ts
+++ b/ui/src/engine/index.ts
@@ -12,25 +12,25 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+import {assertTrue} from '../base/logging';
 import * as init_trace_processor from '../gen/trace_processor';
 
-import {WasmBridge, WasmBridgeRequest} from './wasm_bridge';
+import {WasmBridge} from './wasm_bridge';
 
-// tslint:disable no-any
-// Proxy all messages to WasmBridge#callWasm.
-const anySelf = (self as any);
+const selfWorker = self as {} as Worker;
 
-// Messages can arrive before we are initialized, queue these for later.
+// Messages can arrive before the Wasm module initialization is complete.
+// Queue these for later.
 const msgQueue: MessageEvent[] = [];
-anySelf.onmessage = (msg: MessageEvent) => {
+selfWorker.onmessage = (msg: MessageEvent) => {
   msgQueue.push(msg);
 };
 
-const bridge = new WasmBridge(init_trace_processor);
+const bridge = new WasmBridge(init_trace_processor, selfWorker);
 bridge.whenInitialized.then(() => {
   const handleMsg = (msg: MessageEvent) => {
-    const request: WasmBridgeRequest = msg.data;
-    anySelf.postMessage(bridge.callWasm(request));
+    assertTrue(msg.data instanceof Uint8Array);
+    bridge.onRpcDataReceived(msg.data as Uint8Array);
   };
 
   // Dispatch queued messages.
@@ -39,5 +39,5 @@
     handleMsg(msg);
   }
 
-  anySelf.onmessage = handleMsg;
+  selfWorker.onmessage = handleMsg;
 });
diff --git a/ui/src/engine/wasm_bridge.ts b/ui/src/engine/wasm_bridge.ts
index fe896b1..92813e2 100644
--- a/ui/src/engine/wasm_bridge.ts
+++ b/ui/src/engine/wasm_bridge.ts
@@ -13,7 +13,6 @@
 // limitations under the License.
 
 import {defer} from '../base/deferred';
-import {assertExists, assertTrue} from '../base/logging';
 import * as init_trace_processor from '../gen/trace_processor';
 
 // The Initialize() call will allocate a buffer of REQ_BUF_SIZE bytes which
@@ -23,31 +22,34 @@
 // HEAPU8[reqBufferAddr, +REQ_BUFFER_SIZE].
 const REQ_BUF_SIZE = 32 * 1024 * 1024;
 
-export interface WasmBridgeRequest {
-  id: number;
-  methodName: string;
-  data: Uint8Array;
+// The common denominator between Worker and MessageChannel. Used to send
+// messages, regardless of Worker vs a dedicated MessagePort.
+export interface PostMessageChannel {
+  postMessage(message: Uint8Array, transfer: Transferable[]): void;
 }
 
-export interface WasmBridgeResponse {
-  id: number;
-  data: Uint8Array;
-}
-
+// The end-to-end interaction between JS and Wasm is as follows:
+// - [JS] Inbound data received by the worker (onmessage() in engine/index.ts).
+//   - [JS] onRpcDataReceived() (this file)
+//     - [C++] trace_processor_on_rpc_request (wasm_bridge.cc)
+//       - [C++] some TraceProcessor::method()
+//         for (batch in result_rows)
+//           - [C++] RpcResponseFunction(bytes) (wasm_bridge.cc)
+//             - [JS] onReply() (this file)
+//               - [JS] postMessage() (this file)
 export class WasmBridge {
   // When this promise has resolved it is safe to call callWasm.
   whenInitialized: Promise<void>;
 
   private aborted: boolean;
-  private currentRequestResult: WasmBridgeResponse|null;
   private connection: init_trace_processor.Module;
   private reqBufferAddr = 0;
   private lastStderr: string[] = [];
+  private postMessageChannel: PostMessageChannel;
 
-  constructor(init: init_trace_processor.InitWasm) {
+  constructor(init: init_trace_processor.InitWasm, chan: PostMessageChannel) {
     this.aborted = false;
-    this.currentRequestResult = null;
-
+    this.postMessageChannel = chan;
     const deferredRuntimeInitialized = defer<void>();
     this.connection = init({
       locateFile: (s: string) => s,
@@ -58,49 +60,50 @@
     this.whenInitialized = deferredRuntimeInitialized.then(() => {
       const fn = this.connection.addFunction(this.onReply.bind(this), 'vii');
       this.reqBufferAddr = this.connection.ccall(
-          'Initialize',
+          'trace_processor_rpc_init',
           /*return=*/ 'number',
           /*args=*/['number', 'number'],
           [fn, REQ_BUF_SIZE]);
     });
   }
 
-  callWasm(req: WasmBridgeRequest): WasmBridgeResponse {
+  onRpcDataReceived(data: Uint8Array) {
     if (this.aborted) {
       throw new Error('Wasm module crashed');
     }
-    assertTrue(req.data.length <= REQ_BUF_SIZE);
-    const endAddr = this.reqBufferAddr + req.data.length;
-    this.connection.HEAPU8.subarray(this.reqBufferAddr, endAddr).set(req.data);
-    try {
-      this.connection.ccall(
-          req.methodName,    // C method name.
-          'void',            // Return type.
-          ['number'],        // Arg types.
-          [req.data.length]  // Args.
-      );
-      const result = assertExists(this.currentRequestResult);
-      this.currentRequestResult = null;
-      result.id = req.id;
-      return result;
-    } catch (err) {
-      this.aborted = true;
-      let abortReason = `${err}`;
-      if (err instanceof Error) {
-        abortReason = `${err.name}: ${err.message}\n${err.stack}`;
+    let wrSize = 0;
+    // If the request data is larger than our JS<>Wasm interop buffer, split it
+    // into multiple writes. The RPC channel is byte-oriented and is designed to
+    // deal with arbitrary fragmentations.
+    while (wrSize < data.length) {
+      const sliceLen = Math.min(data.length - wrSize, REQ_BUF_SIZE);
+      const dataSlice = data.subarray(wrSize, wrSize + sliceLen);
+      this.connection.HEAPU8.set(dataSlice, this.reqBufferAddr);
+      wrSize += sliceLen;
+      try {
+        this.connection.ccall(
+            'trace_processor_on_rpc_request',  // C function name.
+            'void',                            // Return type.
+            ['number'],                        // Arg types.
+            [sliceLen]                         // Args.
+        );
+      } catch (err) {
+        this.aborted = true;
+        let abortReason = `${err}`;
+        if (err instanceof Error) {
+          abortReason = `${err.name}: ${err.message}\n${err.stack}`;
+        }
+        abortReason += '\n\nstderr: \n' + this.lastStderr.join('\n');
+        throw new Error(abortReason);
       }
-      abortReason += '\n\nstderr: \n' + this.lastStderr.join('\n');
-      throw new Error(abortReason);
-    }
+    }  // while(wrSize < data.length)
   }
 
-  // This is invoked from ccall in the same call stack as callWasm.
+  // This function is bound and passed to Initialize and is called by the C++
+  // code while in the ccall(trace_processor_on_rpc_request).
   private onReply(heapPtr: number, size: number) {
     const data = this.connection.HEAPU8.slice(heapPtr, heapPtr + size);
-    this.currentRequestResult = {
-      id: 0,  // Will be set by callWasm()'s epilogue.
-      data,
-    };
+    this.postMessageChannel.postMessage(data, [data.buffer]);
   }
 
   private appendAndLogErr(line: string) {
diff --git a/ui/src/frontend/error_dialog.ts b/ui/src/frontend/error_dialog.ts
index 9fba9a0..aa763bd 100644
--- a/ui/src/frontend/error_dialog.ts
+++ b/ui/src/frontend/error_dialog.ts
@@ -52,6 +52,11 @@
     return;
   }
 
+  if (errLog.includes('(ERR:rpc_seq)')) {
+    showRpcSequencingError();
+    return;
+  }
+
   if (timeLastReport > 0 && now - timeLastReport <= MIN_REPORT_PERIOD_MS) {
     queuedErrors.unshift(errLog);
     if (queuedErrors.length > ERR_QUEUE_MAX_LEN) queuedErrors.pop();
@@ -250,3 +255,19 @@
     buttons: []
   });
 }
+
+function showRpcSequencingError() {
+  showModal({
+    title: 'A TraceProcessor RPC error occurred',
+    content: m(
+        'div',
+        m('p', 'The trace processor RPC sequence ID was broken'),
+        m('p', `This can happen when using a HTTP trace processor instance and
+either accidentally sharing this between multiple tabs or
+restarting the trace processor while still in use by UI.`),
+        m('p', `Please refresh this tab and ensure that trace processor is used
+at most one tab at a time.`),
+        ),
+    buttons: []
+  });
+}