TraceProcessor RPC: add binary-pipe interface

This CL introduces support for a bidirectional channel with a
remote TraceProcessor instance.
All it needs is a byte-oriented pipe (e.g., a TCP socket, a
pipe(2) between two processes or a postmessage channel in the
JS+Wasm case).
The messages exchanged on these pipes are TraceProcessorRpc
protos (defined in trace_processor.proto).

Bug: 159142289
Change-Id: I5e802e81c01a467a5969883ad5f63a30906a6e05
diff --git a/protos/perfetto/trace_processor/trace_processor.proto b/protos/perfetto/trace_processor/trace_processor.proto
index f32745b..efebc38 100644
--- a/protos/perfetto/trace_processor/trace_processor.proto
+++ b/protos/perfetto/trace_processor/trace_processor.proto
@@ -35,6 +35,94 @@
 //    In this case these messages are used to {,un}marshall HTTP requests and
 //    response made through src/trace_processor/rpc/httpd.cc .
 
+// At the lowest level, the wire format of the RPC protocol is a linear
+// sequence of TraceProcessorRpc messages on each side of the byte pipe.
+// Each message is prefixed by a tag (field = 1, type = length delimited) and a
+// varint encoding its size (this is so the whole stream can also be read /
+// written as if it was a repeated field of TraceProcessorRpcStream).
+
+message TraceProcessorRpcStream {
+  repeated TraceProcessorRpc msg = 1;
+}
+
+message TraceProcessorRpc {
+  // A monotonic counter used only for debugging purposes, to detect if the
+  // underlying stream is missing or duping data. The counter starts at 0 on
+  // each side of the pipe and is incremented on each message.
+  // Do NOT expect that a response has the same |seq| of its corresponding
+  // request: some requests (e.g., a query returning many rows) can yield more
+  // than one response message, bringing the tx and rx seq out of sync.
+  optional int64 seq = 1;
+
+  enum TraceProcessorMethod {
+    TPM_UNSPECIFIED = 0;
+    TPM_APPEND_TRACE_DATA = 1;
+    TPM_FINALIZE_TRACE_DATA = 2;
+    TPM_QUERY_STREAMING = 3;
+    TPM_QUERY_RAW_DEPRECATED = 4;
+    TPM_COMPUTE_METRIC = 5;
+    TPM_GET_METRIC_DESCRIPTORS = 6;
+    TPM_RESTORE_INITIAL_TABLES = 7;
+    TPM_ENABLE_METATRACE = 8;
+    TPM_DISABLE_AND_READ_METATRACE = 9;
+  }
+
+  oneof type {
+    // Client -> TraceProcessor requests.
+    TraceProcessorMethod request = 2;
+
+    // TraceProcessor -> Client responses.
+    TraceProcessorMethod response = 3;
+
+    // This is sent back instead of filling |response| when the client sends a
+    // |request| which is not known by the TraceProcessor service. This can
+    // happen when the client is newer than the service.
+    TraceProcessorMethod invalid_request = 4;
+  }
+
+  // Request/Response arguments.
+  // Not all requests / responses require an argument.
+
+  oneof args {
+    // TraceProcessorMethod request args.
+
+    // For TPM_APPEND_TRACE_DATA.
+    bytes append_trace_data = 101;
+    // For TPM_QUERY_STREAMING.
+    QueryArgs query_args = 103;
+    // For TPM_QUERY_RAW_DEPRECATED.
+    RawQueryArgs raw_query_args = 104;
+    // For TPM_COMPUTE_METRIC.
+    ComputeMetricArgs compute_metric_args = 105;
+
+    // TraceProcessorMethod response args.
+    // For TPM_APPEND_TRACE_DATA.
+    AppendTraceDataResult append_result = 201;
+    // For TPM_QUERY_STREAMING.
+    QueryResult query_result = 203;
+    // For TPM_QUERY_RAW_DEPRECATED.
+    RawQueryResult raw_query_result = 204;
+    // For TPM_COMPUTE_METRIC.
+    ComputeMetricResult metric_result = 205;
+    // For TPM_GET_METRIC_DESCRIPTORS.
+    DescriptorSet metric_descriptors = 206;
+    // For TPM_DISABLE_AND_READ_METATRACE.
+    DisableAndReadMetatraceResult metatrace = 209;
+  }
+}
+
+message AppendTraceDataResult {
+  optional int64 total_bytes_parsed = 1;
+  optional string error = 2;
+}
+
+message QueryArgs {
+  optional string sql_query = 1;
+
+  // Wall time when the query was queued. Used only for query stats.
+  optional uint64 time_queued_ns = 2;
+}
+
 // Input for the /raw_query endpoint.
 message RawQueryArgs {
   optional string sql_query = 1;
@@ -117,7 +205,6 @@
     // NUL-terminated. This is because JS incurs into a non-negligible overhead
     // when decoding strings and one decode + split('\0') is measurably faster
     // than decoding N strings. See goto.google.com/postmessage-benchmark .
-    // \0-concatenated.
     optional string string_cells = 5;
 
     // If true this is the last batch for the query result.
@@ -189,11 +276,3 @@
 message DescriptorSet {
   repeated DescriptorProto descriptors = 1;
 }
-
-// Input for the /get_metric_descriptors endpoint.
-message GetMetricDescriptorsArgs {}
-
-// Output for the /get_metric_descriptors endpoint.
-message GetMetricDescriptorsResult {
-  optional DescriptorSet descriptor_set = 1;
-}
diff --git a/src/protozero/proto_ring_buffer.cc b/src/protozero/proto_ring_buffer.cc
index 92fe153..631d355 100644
--- a/src/protozero/proto_ring_buffer.cc
+++ b/src/protozero/proto_ring_buffer.cc
@@ -106,7 +106,7 @@
 
   size_t avail = buf_.size() - wr_;
   if (data_len > avail) {
-    // This whole section should be hit extremely rare.
+    // This whole section should be hit extremely rarely.
 
     // Try first just recompacting the buffer by moving everything to the left.
     // This can happen if we received "a message and a bit" on each Append call
diff --git a/src/protozero/proto_ring_buffer.h b/src/protozero/proto_ring_buffer.h
index 06ef539..d71a231 100644
--- a/src/protozero/proto_ring_buffer.h
+++ b/src/protozero/proto_ring_buffer.h
@@ -60,10 +60,10 @@
 // Internally this is similar to a ring-buffer, with the caveat that it never
 // wraps, it only expands. Expansions are rare. The deal is that in most cases
 // the read cursor follows very closely the write cursor. For instance, if the
-// uderlying behaves as a dgram socket, after each Append, the read cursor will
-// chase completely the write cursor. Even if the underyling stream is not
-// always atomic, the expectation is that the read cursor will eventually reach
-// the write one within few messages.
+// underlying transport behaves as a dgram socket, after each Append, the read
+// cursor will chase completely the write cursor. Even if the underlying stream
+// is not always atomic, the expectation is that the read cursor will eventually
+// reach the write one within a few messages.
 // A visual example, imagine we have four messages: 2it 4will 2be 4fine
 // Visually:
 //
diff --git a/src/trace_processor/python/perfetto/trace_processor/trace_processor.descriptor b/src/trace_processor/python/perfetto/trace_processor/trace_processor.descriptor
index 3d2bf28..c3672b2 100644
--- a/src/trace_processor/python/perfetto/trace_processor/trace_processor.descriptor
+++ b/src/trace_processor/python/perfetto/trace_processor/trace_processor.descriptor
Binary files differ
diff --git a/src/trace_processor/python/perfetto/trace_processor/trace_processor.descriptor.sha1 b/src/trace_processor/python/perfetto/trace_processor/trace_processor.descriptor.sha1
index 5bbd54a..5f5b733 100644
--- a/src/trace_processor/python/perfetto/trace_processor/trace_processor.descriptor.sha1
+++ b/src/trace_processor/python/perfetto/trace_processor/trace_processor.descriptor.sha1
@@ -2,5 +2,5 @@
 // SHA1(tools/gen_binary_descriptors)
 // 9fc6d77de57ec76a80b76aa282f4c7cf5ce55eec
 // SHA1(protos/perfetto/trace_processor/trace_processor.proto)
-// 8320f306d6d5bbcb5ef6ba8cd62cc70a0994d102
+// ee875dc7384617e22ecb9e9d4ac03df4bba82252
   
\ No newline at end of file
diff --git a/src/trace_processor/rpc/httpd.cc b/src/trace_processor/rpc/httpd.cc
index 9412279..edc4568 100644
--- a/src/trace_processor/rpc/httpd.cc
+++ b/src/trace_processor/rpc/httpd.cc
@@ -71,6 +71,9 @@
   ~HttpServer() override;
   void Run(const char*, const char*);
 
+  // This is non-null only while serving an HTTP request.
+  Client* active_client() { return active_client_; }
+
  private:
   size_t ParseOneHttpRequest(Client* client);
   void HandleRequest(Client*, const HttpRequest&);
@@ -85,9 +88,12 @@
   base::UnixTaskRunner task_runner_;
   std::unique_ptr<base::UnixSocket> sock4_;
   std::unique_ptr<base::UnixSocket> sock6_;
-  std::vector<Client> clients_;
+  std::list<Client> clients_;
+  Client* active_client_ = nullptr;
 };
 
+HttpServer* g_httpd_instance;
+
 void Append(std::vector<char>& buf, const char* str) {
   buf.insert(buf.end(), str, str + strlen(str));
 }
@@ -197,7 +203,9 @@
   // At this point |rxbuf| can contain a partial HTTP request, a full one or
   // more (in case of HTTP Keepalive pipelining).
   for (;;) {
+    active_client_ = client;
     size_t bytes_consumed = ParseOneHttpRequest(client);
+    active_client_ = nullptr;
     if (bytes_consumed == 0)
       break;
     memmove(rxbuf, &rxbuf[bytes_consumed], client->rxbuf_used - bytes_consumed);
@@ -306,6 +314,37 @@
                      });
   }
 
+  if (req.uri == "/rpc") {
+    // Start the chunked reply.
+    strncpy(transfer_encoding_hdr, "Transfer-Encoding: chunked",
+            sizeof(transfer_encoding_hdr));
+    base::UnixSocket* cli_sock = client->sock.get();
+    HttpReply(cli_sock, "200 OK", headers, nullptr, kOmitContentLength);
+
+    static auto resp_fn = [](const void* data, uint32_t len) {
+      char chunk_hdr[32];
+      auto hdr_len = static_cast<size_t>(sprintf(chunk_hdr, "%x\r\n", len));
+      auto* http_client = g_httpd_instance->active_client();
+      PERFETTO_CHECK(http_client);
+      if (data == nullptr) {
+        // Unrecoverable RPC error case.
+        http_client->sock->Shutdown(/*notify=*/true);
+        return;
+      }
+      http_client->sock->Send(chunk_hdr, hdr_len);
+      http_client->sock->Send(data, len);
+      http_client->sock->Send("\r\n", 2);
+    };
+
+    trace_processor_rpc_.SetRpcResponseFunction(resp_fn);
+    trace_processor_rpc_.OnRpcRequest(req.body.data(), req.body.size());
+    trace_processor_rpc_.SetRpcResponseFunction(nullptr);
+
+    // Terminate chunked stream.
+    cli_sock->Send("0\r\n\r\n", 5);
+    return;
+  }
+
   if (req.uri == "/parse") {
     trace_processor_rpc_.Parse(
         reinterpret_cast<const uint8_t*>(req.body.data()), req.body.size());
@@ -381,13 +420,6 @@
                      res.size());
   }
 
-  if (req.uri == "/get_metric_descriptors") {
-    std::vector<uint8_t> res = trace_processor_rpc_.GetMetricDescriptors(
-        reinterpret_cast<const uint8_t*>(req.body.data()), req.body.size());
-    return HttpReply(client->sock.get(), "200 OK", headers, res.data(),
-                     res.size());
-  }
-
   if (req.uri == "/enable_metatrace") {
     trace_processor_rpc_.EnableMetatrace();
     return HttpReply(client->sock.get(), "200 OK", headers);
@@ -407,6 +439,7 @@
 void RunHttpRPCServer(std::unique_ptr<TraceProcessor> preloaded_instance,
                       std::string port_number) {
   HttpServer srv(std::move(preloaded_instance));
+  g_httpd_instance = &srv;
   std::string port = port_number.empty() ? kBindPort : port_number;
   std::string ipv4_addr = "127.0.0.1:" + port;
   std::string ipv6_addr = "[::1]:" + port;
diff --git a/src/trace_processor/rpc/query_result_serializer.h b/src/trace_processor/rpc/query_result_serializer.h
index 9c05e0b..c29d66d 100644
--- a/src/trace_processor/rpc/query_result_serializer.h
+++ b/src/trace_processor/rpc/query_result_serializer.h
@@ -53,6 +53,7 @@
 // chunked-encoded HTTP response, or through a repetition of Wasm calls.
 class QueryResultSerializer {
  public:
+  static constexpr uint32_t kDefaultBatchSplitThreshold = 128 * 1024;
   explicit QueryResultSerializer(Iterator);
   ~QueryResultSerializer();
 
@@ -92,7 +93,7 @@
   // the limit (it splits on the next row *after* the limit is hit).
   // Overridable for testing only.
   uint32_t cells_per_batch_ = 50000;
-  uint32_t batch_split_threshold_ = 1024 * 128;
+  uint32_t batch_split_threshold_ = kDefaultBatchSplitThreshold;
 };
 
 }  // namespace trace_processor
diff --git a/src/trace_processor/rpc/rpc.cc b/src/trace_processor/rpc/rpc.cc
index c63e5cb..a0d4a05 100644
--- a/src/trace_processor/rpc/rpc.cc
+++ b/src/trace_processor/rpc/rpc.cc
@@ -16,39 +16,238 @@
 
 #include "src/trace_processor/rpc/rpc.h"
 
+#include <string.h>
+
 #include <vector>
 
+#include "perfetto/base/logging.h"
 #include "perfetto/base/time.h"
+#include "perfetto/ext/base/utils.h"
 #include "perfetto/protozero/scattered_heap_buffer.h"
+#include "perfetto/protozero/scattered_stream_writer.h"
 #include "perfetto/trace_processor/trace_processor.h"
-#include "protos/perfetto/trace_processor/trace_processor.pbzero.h"
+#include "src/protozero/proto_ring_buffer.h"
 #include "src/trace_processor/rpc/query_result_serializer.h"
 #include "src/trace_processor/tp_metatrace.h"
 
+#include "protos/perfetto/trace_processor/trace_processor.pbzero.h"
+
 namespace perfetto {
 namespace trace_processor {
 
-using ColumnValues = protos::pbzero::RawQueryResult::ColumnValues;
-using ColumnDesc = protos::pbzero::RawQueryResult::ColumnDesc;
-
+namespace {
 // Writes a "Loading trace ..." update every N bytes.
 constexpr size_t kProgressUpdateBytes = 50 * 1000 * 1000;
+using TraceProcessorRpcStream = protos::pbzero::TraceProcessorRpcStream;
+using RpcProto = protos::pbzero::TraceProcessorRpc;
+
+// Most RPC messages are either very small or query results.
+// QueryResultSerializer splits rows into batches of approximately 128KB. Try
+// to avoid extra heap allocations for the nominal case.
+constexpr auto kSliceSize =
+    QueryResultSerializer::kDefaultBatchSplitThreshold + 4096;
+
+// Holds a trace_processor::TraceProcessorRpc pbzero message. Avoids extra
+// copies by doing direct scattered calls from the fragmented heap buffer onto
+// the RpcResponseFunction (the receiver is expected to deal with arbitrary
+// fragmentation anyways). It also takes care of prefixing each message with
+// the proto preamble and varint size.
+class Response {
+ public:
+  Response(int64_t seq, int method);
+  Response(const Response&) = delete;
+  Response& operator=(const Response&) = delete;
+  RpcProto* operator->() { return msg_; }
+  void Send(Rpc::RpcResponseFunction);
+
+ private:
+  RpcProto* msg_ = nullptr;
+
+  // The reason why we use TraceProcessorRpcStream as root message is because
+  // the RPC wire protocol expects each message to be prefixed with a proto
+  // preamble and varint size. This happens to be the same serialization of a
+  // repeated field (this is really the same trick we use between
+  // Trace and TracePacket in trace.proto)
+  protozero::HeapBuffered<TraceProcessorRpcStream> buf_;
+};
+
+Response::Response(int64_t seq, int method) : buf_(kSliceSize, kSliceSize) {
+  msg_ = buf_->add_msg();
+  msg_->set_seq(seq);
+  msg_->set_response(static_cast<RpcProto::TraceProcessorMethod>(method));
+}
+
+void Response::Send(Rpc::RpcResponseFunction send_fn) {
+  buf_->Finalize();
+  for (const auto& slice : buf_.GetSlices()) {
+    auto range = slice.GetUsedRange();
+    send_fn(range.begin, static_cast<uint32_t>(range.size()));
+  }
+}
+
+}  // namespace
 
 Rpc::Rpc(std::unique_ptr<TraceProcessor> preloaded_instance)
     : trace_processor_(std::move(preloaded_instance)),
-      session_id_(base::Uuidv4()) {}
+      session_id_(base::Uuidv4()) {
+  if (!trace_processor_)
+    ResetTraceProcessor();
+}
 
 Rpc::Rpc() : Rpc(nullptr) {}
-
 Rpc::~Rpc() = default;
 
+void Rpc::ResetTraceProcessor() {
+  trace_processor_ = TraceProcessor::CreateInstance(Config());
+  bytes_parsed_ = bytes_last_progress_ = 0;
+  t_parse_started_ = base::GetWallTimeNs().count();
+  // Deliberately not resetting the RPC channel state (rxbuf_, {tx,rx}_seq_id_).
+  // This is invoked from the same client to clear the current trace state
+  // before loading a new one. The RPC channel is orthogonal to that and the
+  // message numbering continues regardless of the reset.
+}
+
+void Rpc::OnRpcRequest(const void* data, size_t len) {
+  rxbuf_.Append(data, len);
+  for (;;) {
+    auto msg = rxbuf_.ReadMessage();
+    if (!msg.valid()) {
+      if (msg.fatal_framing_error)
+        rpc_response_fn_(nullptr, 0);  // Disconnect.
+      break;
+    }
+    ParseRpcRequest(msg.start, msg.len);
+  }
+}
+
+// [data, len] here is a tokenized TraceProcessorRpc proto message, without the
+// size header.
+void Rpc::ParseRpcRequest(const uint8_t* data, size_t len) {
+  RpcProto::Decoder req(data, len);
+
+  // We allow restarting the sequence from 0. This happens when refreshing the
+  // browser while using the external trace_processor_shell --httpd.
+  if (req.seq() != 0 && rx_seq_id_ != 0 && req.seq() != rx_seq_id_ + 1) {
+    PERFETTO_ELOG("RPC request out of order. Expected %" PRId64
+                  ", got %" PRId64,
+                  rx_seq_id_ + 1, req.seq());
+    rpc_response_fn_(nullptr, 0);  // Disconnect.
+    return;
+  }
+  rx_seq_id_ = req.seq();
+
+  // Switch on a plain int so unknown (newer) methods reach the default case.
+  const int req_type = static_cast<int>(req.request());
+  static const char kErrFieldNotSet[] = "RPC error: request field not set";
+  switch (req_type) {
+    case RpcProto::TPM_APPEND_TRACE_DATA: {
+      Response resp(tx_seq_id_++, req_type);
+      auto* result = resp->set_append_result();
+      if (!req.has_append_trace_data()) {
+        result->set_error(kErrFieldNotSet);
+      } else {
+        protozero::ConstBytes byte_range = req.append_trace_data();
+        util::Status res = Parse(byte_range.data, byte_range.size);
+        if (!res.ok()) {
+          result->set_error(res.message());
+        }
+      }
+      resp.Send(rpc_response_fn_);
+      break;
+    }
+    case RpcProto::TPM_FINALIZE_TRACE_DATA: {
+      Response resp(tx_seq_id_++, req_type);
+      NotifyEndOfFile();
+      resp.Send(rpc_response_fn_);
+      break;
+    }
+    case RpcProto::TPM_QUERY_STREAMING: {
+      if (!req.has_query_args()) {
+        Response resp(tx_seq_id_++, req_type);
+        auto* result = resp->set_query_result();
+        result->set_error(kErrFieldNotSet);
+        resp.Send(rpc_response_fn_);
+      } else {
+        protozero::ConstBytes args = req.query_args();
+        auto it = QueryInternal(args.data, args.size);
+        QueryResultSerializer serializer(std::move(it));
+        for (bool has_more = true; has_more;) {
+          Response resp(tx_seq_id_++, req_type);
+          has_more = serializer.Serialize(resp->set_query_result());
+          resp.Send(rpc_response_fn_);
+        }
+      }
+      break;
+    }
+    case RpcProto::TPM_QUERY_RAW_DEPRECATED: {
+      Response resp(tx_seq_id_++, req_type);
+      auto* result = resp->set_raw_query_result();
+      if (!req.has_raw_query_args()) {
+        result->set_error(kErrFieldNotSet);
+      } else {
+        protozero::ConstBytes args = req.raw_query_args();
+        RawQueryInternal(args.data, args.size, result);
+      }
+      resp.Send(rpc_response_fn_);
+      break;
+    }
+    case RpcProto::TPM_COMPUTE_METRIC: {
+      Response resp(tx_seq_id_++, req_type);
+      auto* result = resp->set_metric_result();
+      if (!req.has_compute_metric_args()) {
+        result->set_error(kErrFieldNotSet);
+      } else {
+        protozero::ConstBytes args = req.compute_metric_args();
+        ComputeMetricInternal(args.data, args.size, result);
+      }
+      resp.Send(rpc_response_fn_);
+      break;
+    }
+    case RpcProto::TPM_GET_METRIC_DESCRIPTORS: {
+      Response resp(tx_seq_id_++, req_type);
+      auto descriptor_set = trace_processor_->GetMetricDescriptors();
+      auto* result = resp->set_metric_descriptors();
+      result->AppendRawProtoBytes(descriptor_set.data(), descriptor_set.size());
+      resp.Send(rpc_response_fn_);
+      break;
+    }
+    case RpcProto::TPM_RESTORE_INITIAL_TABLES: {
+      trace_processor_->RestoreInitialTables();
+      Response resp(tx_seq_id_++, req_type);
+      resp.Send(rpc_response_fn_);
+      break;
+    }
+    case RpcProto::TPM_ENABLE_METATRACE: {
+      trace_processor_->EnableMetatrace();
+      Response resp(tx_seq_id_++, req_type);
+      resp.Send(rpc_response_fn_);
+      break;
+    }
+    case RpcProto::TPM_DISABLE_AND_READ_METATRACE: {
+      Response resp(tx_seq_id_++, req_type);
+      DisableAndReadMetatraceInternal(resp->set_metatrace());
+      resp.Send(rpc_response_fn_);
+      break;
+    }
+    default: {
+      // This can legitimately happen if the client is newer. We reply with a
+      // generic "unknown request" response, so the client can do feature
+      // detection.
+      PERFETTO_DLOG("[RPC] Unknown request type (%d), size=%zu", req_type, len);
+      Response resp(tx_seq_id_++, req_type);
+      resp->set_invalid_request(
+          static_cast<RpcProto::TraceProcessorMethod>(req_type));
+      resp.Send(rpc_response_fn_);
+      break;
+    }
+  }  // switch(req_type)
+}
+
 util::Status Rpc::Parse(const uint8_t* data, size_t len) {
   if (eof_) {
-    // Reset the trace processor state if this is either the first call ever or
-    // if another trace has been previously fully loaded.
-    trace_processor_ = TraceProcessor::CreateInstance(Config());
-    bytes_parsed_ = bytes_last_progress_ = 0;
-    t_parse_started_ = base::GetWallTimeNs().count();
+    // Reset the trace processor state if another trace has been previously
+    // loaded.
+    ResetTraceProcessor();
   }
 
   eof_ = false;
@@ -65,8 +264,6 @@
 }
 
 void Rpc::NotifyEndOfFile() {
-  if (!trace_processor_)
-    return;
   trace_processor_->NotifyEndOfFile();
   eof_ = true;
   MaybePrintProgress();
@@ -89,23 +286,7 @@
 void Rpc::Query(const uint8_t* args,
                 size_t len,
                 QueryResultBatchCallback result_callback) {
-  protos::pbzero::RawQueryArgs::Decoder query(args, len);
-  std::string sql = query.sql_query().ToStdString();
-  PERFETTO_DLOG("[RPC] Query < %s", sql.c_str());
-  PERFETTO_TP_TRACE("RPC_QUERY",
-                    [&](metatrace::Record* r) { r->AddArg("SQL", sql); });
-
-  if (!trace_processor_) {
-    static const char kErr[] = "Query() called before Parse()";
-    PERFETTO_ELOG("[RPC] %s", kErr);
-    protozero::HeapBuffered<protos::pbzero::QueryResult> result;
-    result->set_error(kErr);
-    auto vec = result.SerializeAsArray();
-    result_callback(vec.data(), vec.size(), /*has_more=*/false);
-    return;
-  }
-
-  auto it = trace_processor_->ExecuteQuery(sql.c_str());
+  auto it = QueryInternal(args, len);
   QueryResultSerializer serializer(std::move(it));
 
   std::vector<uint8_t> res;
@@ -116,21 +297,34 @@
   }
 }
 
+Iterator Rpc::QueryInternal(const uint8_t* args, size_t len) {
+  protos::pbzero::RawQueryArgs::Decoder query(args, len);
+  std::string sql = query.sql_query().ToStdString();
+  PERFETTO_DLOG("[RPC] Query < %s", sql.c_str());
+  PERFETTO_TP_TRACE("RPC_QUERY",
+                    [&](metatrace::Record* r) { r->AddArg("SQL", sql); });
+
+  return trace_processor_->ExecuteQuery(sql.c_str());
+}
+
 std::vector<uint8_t> Rpc::RawQuery(const uint8_t* args, size_t len) {
   protozero::HeapBuffered<protos::pbzero::RawQueryResult> result;
+  RawQueryInternal(args, len, result.get());
+  return result.SerializeAsArray();
+}
+
+void Rpc::RawQueryInternal(const uint8_t* args,
+                           size_t len,
+                           protos::pbzero::RawQueryResult* result) {
+  using ColumnValues = protos::pbzero::RawQueryResult::ColumnValues;
+  using ColumnDesc = protos::pbzero::RawQueryResult::ColumnDesc;
+
   protos::pbzero::RawQueryArgs::Decoder query(args, len);
   std::string sql = query.sql_query().ToStdString();
   PERFETTO_DLOG("[RPC] RawQuery < %s", sql.c_str());
   PERFETTO_TP_TRACE("RPC_RAW_QUERY",
                     [&](metatrace::Record* r) { r->AddArg("SQL", sql); });
 
-  if (!trace_processor_) {
-    static const char kErr[] = "RawQuery() called before Parse()";
-    PERFETTO_ELOG("[RPC] %s", kErr);
-    result->set_error(kErr);
-    return result.SerializeAsArray();
-  }
-
   auto it = trace_processor_->ExecuteQuery(sql.c_str());
 
   // This vector contains a standalone protozero message per column. The problem
@@ -245,29 +439,26 @@
   if (!status.ok())
     result->set_error(status.c_message());
   PERFETTO_DLOG("[RPC] RawQuery > %d rows (err: %d)", rows, !status.ok());
-
-  return result.SerializeAsArray();
 }
 
 std::string Rpc::GetCurrentTraceName() {
-  if (!trace_processor_)
-    return "";
   return trace_processor_->GetCurrentTraceName();
 }
 
 void Rpc::RestoreInitialTables() {
-  if (trace_processor_)
-    trace_processor_->RestoreInitialTables();
+  trace_processor_->RestoreInitialTables();
   session_id_ = base::Uuidv4();
 }
 
-std::vector<uint8_t> Rpc::ComputeMetric(const uint8_t* data, size_t len) {
+std::vector<uint8_t> Rpc::ComputeMetric(const uint8_t* args, size_t len) {
   protozero::HeapBuffered<protos::pbzero::ComputeMetricResult> result;
-  if (!trace_processor_) {
-    result->set_error("Null trace processor instance");
-    return result.SerializeAsArray();
-  }
+  ComputeMetricInternal(args, len, result.get());
+  return result.SerializeAsArray();
+}
 
+void Rpc::ComputeMetricInternal(const uint8_t* data,
+                                size_t len,
+                                protos::pbzero::ComputeMetricResult* result) {
   protos::pbzero::ComputeMetricArgs::Decoder args(data, len);
   std::vector<std::string> metric_names;
   for (auto it = args.metric_names(); it; ++it) {
@@ -281,15 +472,16 @@
     }
   });
 
+  PERFETTO_DLOG("[RPC] ComputeMetrics(%zu, %s), format=%d", metric_names.size(),
+                metric_names.empty() ? "" : metric_names.front().c_str(),
+                args.format());
   switch (args.format()) {
     case protos::pbzero::ComputeMetricArgs::BINARY_PROTOBUF: {
       std::vector<uint8_t> metrics_proto;
       util::Status status =
           trace_processor_->ComputeMetric(metric_names, &metrics_proto);
       if (status.ok()) {
-        result->AppendBytes(
-            protos::pbzero::ComputeMetricResult::kMetricsFieldNumber,
-            metrics_proto.data(), metrics_proto.size());
+        result->set_metrics(metrics_proto.data(), metrics_proto.size());
       } else {
         result->set_error(status.message());
       }
@@ -301,44 +493,27 @@
           metric_names, TraceProcessor::MetricResultFormat::kProtoText,
           &metrics_string);
       if (status.ok()) {
-        result->AppendString(
-            protos::pbzero::ComputeMetricResult::kMetricsAsPrototextFieldNumber,
-            metrics_string);
+        result->set_metrics_as_prototext(metrics_string);
       } else {
         result->set_error(status.message());
       }
       break;
     }
   }
-  return result.SerializeAsArray();
-}
-
-std::vector<uint8_t> Rpc::GetMetricDescriptors(const uint8_t*, size_t) {
-  protozero::HeapBuffered<protos::pbzero::GetMetricDescriptorsResult> result;
-  if (!trace_processor_) {
-    return result.SerializeAsArray();
-  }
-  std::vector<uint8_t> descriptor_set =
-      trace_processor_->GetMetricDescriptors();
-  result->AppendBytes(
-      protos::pbzero::GetMetricDescriptorsResult::kDescriptorSetFieldNumber,
-      descriptor_set.data(), descriptor_set.size());
-  return result.SerializeAsArray();
 }
 
 void Rpc::EnableMetatrace() {
-  if (!trace_processor_)
-    return;
   trace_processor_->EnableMetatrace();
 }
 
 std::vector<uint8_t> Rpc::DisableAndReadMetatrace() {
   protozero::HeapBuffered<protos::pbzero::DisableAndReadMetatraceResult> result;
-  if (!trace_processor_) {
-    result->set_error("Null trace processor instance");
-    return result.SerializeAsArray();
-  }
+  DisableAndReadMetatraceInternal(result.get());
+  return result.SerializeAsArray();
+}
 
+void Rpc::DisableAndReadMetatraceInternal(
+    protos::pbzero::DisableAndReadMetatraceResult* result) {
   std::vector<uint8_t> trace_proto;
   util::Status status = trace_processor_->DisableAndReadMetatrace(&trace_proto);
   if (status.ok()) {
@@ -346,7 +521,6 @@
   } else {
     result->set_error(status.message());
   }
-  return result.SerializeAsArray();
 }
 
 std::string Rpc::GetSessionId() {
diff --git a/src/trace_processor/rpc/rpc.h b/src/trace_processor/rpc/rpc.h
index 3448937..7bca56d 100644
--- a/src/trace_processor/rpc/rpc.h
+++ b/src/trace_processor/rpc/rpc.h
@@ -26,10 +26,21 @@
 
 #include "perfetto/ext/base/uuid.h"
 #include "perfetto/trace_processor/status.h"
+#include "src/protozero/proto_ring_buffer.h"
 
 namespace perfetto {
+
+namespace protos {
+namespace pbzero {
+class RawQueryResult;
+class ComputeMetricResult;
+class DisableAndReadMetatraceResult;
+}  // namespace pbzero
+}  // namespace protos
+
 namespace trace_processor {
 
+class Iterator;
 class TraceProcessor;
 
 // This class handles the binary {,un}marshalling for the Trace Processor RPC
@@ -55,6 +66,38 @@
   Rpc();
   ~Rpc();
 
+  // 1. TraceProcessor byte-pipe RPC interface.
+  // This is a bidirectional channel with a remote TraceProcessor instance. All
+  // it needs is a byte-oriented pipe (e.g., a TCP socket, a pipe(2) between two
+  // processes or a postmessage channel in the JS+Wasm case). The messages
+  // exchanged on these pipes are TraceProcessorRpc protos (defined in
+  // trace_processor.proto). This has been introduced in Perfetto v15.
+
+  // Pushes data received by the RPC channel into the parser. Inbound messages
+  // are tokenized and turned into TraceProcessor method invocations. |data|
+  // does not need to be a whole TraceProcessorRpc message. It can be a portion
+  // of it or a union of >1 messages.
+  // Responses are sent through the RpcResponseFunction (below).
+  void OnRpcRequest(const void* data, size_t len);
+
+  // The size argument is a uint32_t and not size_t to avoid ABI mismatches
+  // with Wasm, where size_t = uint32_t.
+  // (nullptr, 0) has the semantic of "close the channel" and is issued when an
+  // unrecoverable wire-protocol framing error is detected.
+  using RpcResponseFunction = void (*)(const void* /*data*/, uint32_t /*len*/);
+  void SetRpcResponseFunction(RpcResponseFunction f) { rpc_response_fn_ = f; }
+
+  // 2. TraceProcessor legacy RPC endpoints.
+  // The methods below are exposed for the old RPC interfaces, where each RPC
+  // implementation deals with the method demuxing: (i) wasm_bridge.cc has one
+  // exported C function per method (going away soon); (ii) httpd.cc has one
+  // REST endpoint per method. Over time this turned out to have too much
+  // duplicated boilerplate and we moved to the byte-pipe model above.
+  // We still keep these endpoints around, because httpd.cc still exposes the
+  // individual REST endpoints to legacy clients (TP's Python API). The
+  // maintenance cost of those is very low. Both the new byte-pipe and the
+  // old endpoints run exactly the same code. The {de,}serialization format is
+  // the same, the only difference is who does the method demuxing.
   // The methods of this class are mirrors (modulo {un,}marshalling of args) of
   // the corresponding names in trace_processor.h . See that header for docs.
 
@@ -62,7 +105,6 @@
   void NotifyEndOfFile();
   std::string GetCurrentTraceName();
   std::vector<uint8_t> ComputeMetric(const uint8_t* data, size_t len);
-  std::vector<uint8_t> GetMetricDescriptors(const uint8_t* data, size_t len);
   void EnableMetatrace();
   std::vector<uint8_t> DisableAndReadMetatrace();
 
@@ -113,10 +155,25 @@
   std::vector<uint8_t> RawQuery(const uint8_t* args, size_t len);
 
  private:
+  void ParseRpcRequest(const uint8_t* data, size_t len);
+  void ResetTraceProcessor();
   void MaybePrintProgress();
+  Iterator QueryInternal(const uint8_t* args, size_t len);
+  void RawQueryInternal(const uint8_t* args,
+                        size_t len,
+                        protos::pbzero::RawQueryResult*);
+  void ComputeMetricInternal(const uint8_t* args,
+                             size_t len,
+                             protos::pbzero::ComputeMetricResult*);
+  void DisableAndReadMetatraceInternal(
+      protos::pbzero::DisableAndReadMetatraceResult*);
 
   std::unique_ptr<TraceProcessor> trace_processor_;
-  bool eof_ = true;  // Reset when calling Parse().
+  RpcResponseFunction rpc_response_fn_ = nullptr;  // Set by the transport.
+  protozero::ProtoRingBuffer rxbuf_;
+  int64_t tx_seq_id_ = 0;
+  int64_t rx_seq_id_ = 0;
+  bool eof_ = false;
   int64_t t_parse_started_ = 0;
   size_t bytes_last_progress_ = 0;
   size_t bytes_parsed_ = 0;
diff --git a/src/trace_processor/rpc/wasm_bridge.cc b/src/trace_processor/rpc/wasm_bridge.cc
index 79e5dae..4d2decf 100644
--- a/src/trace_processor/rpc/wasm_bridge.cc
+++ b/src/trace_processor/rpc/wasm_bridge.cc
@@ -97,14 +97,6 @@
           static_cast<uint32_t>(res.size()));
 }
 
-void EMSCRIPTEN_KEEPALIVE trace_processor_get_metric_descriptors(uint32_t);
-void trace_processor_get_metric_descriptors(uint32_t size) {
-  std::vector<uint8_t> res =
-      g_trace_processor_rpc->GetMetricDescriptors(g_req_buf, size);
-  g_reply(reinterpret_cast<const char*>(res.data()),
-          static_cast<uint32_t>(res.size()));
-}
-
 void EMSCRIPTEN_KEEPALIVE trace_processor_enable_metatrace(uint32_t);
 void trace_processor_enable_metatrace(uint32_t) {
   g_trace_processor_rpc->EnableMetatrace();