TraceProcessor RPC: add binary-pipe interface
This CL introduces support for a bidirectional channel with a
remote TraceProcessor instance.
All it needs is a byte-oriented pipe (e.g., a TCP socket, a
pipe(2) between two processes or a postmessage channel in the
JS+Wasm case).
The messages exchanged on these pipes are TraceProcessorRpc
protos (defined in trace_processor.proto).
Bug: 159142289
Change-Id: I5e802e81c01a467a5969883ad5f63a30906a6e05
diff --git a/protos/perfetto/trace_processor/trace_processor.proto b/protos/perfetto/trace_processor/trace_processor.proto
index f32745b..efebc38 100644
--- a/protos/perfetto/trace_processor/trace_processor.proto
+++ b/protos/perfetto/trace_processor/trace_processor.proto
@@ -35,6 +35,94 @@
// In this case these messages are used to {,un}marshall HTTP requests and
// response made through src/trace_processor/rpc/httpd.cc .
+// At lowest level, the wire-format of the RPC procol is a linear sequence of
+// TraceProcessorRpc messages on each side of the byte pipe
+// Each message is prefixed by a tag (field = 1, type = length delimited) and a
+// varint encoding its size (this is so the whole stream can also be read /
+// written as if it was a repeated field of TraceProcessorRpcStream).
+
+message TraceProcessorRpcStream {
+ repeated TraceProcessorRpc msg = 1;
+}
+
+message TraceProcessorRpc {
+ // A monotonic counter used only for debugging purposes, to detect if the
+ // underlying stream is missing or duping data. The counter starts at 0 on
+ // each side of the pipe and is incremented on each message.
+ // Do NOT expect that a response has the same |seq| of its corresponding
+ // request: some requests (e.g., a query returning many rows) can yield more
+ // than one response message, bringing the tx and rq seq our of sync.
+ optional int64 seq = 1;
+
+ enum TraceProcessorMethod {
+ TPM_UNSPECIFIED = 0;
+ TPM_APPEND_TRACE_DATA = 1;
+ TPM_FINALIZE_TRACE_DATA = 2;
+ TPM_QUERY_STREAMING = 3;
+ TPM_QUERY_RAW_DEPRECATED = 4;
+ TPM_COMPUTE_METRIC = 5;
+ TPM_GET_METRIC_DESCRIPTORS = 6;
+ TPM_RESTORE_INITIAL_TABLES = 7;
+ TPM_ENABLE_METATRACE = 8;
+ TPM_DISABLE_AND_READ_METATRACE = 9;
+ }
+
+ oneof type {
+ // Client -> TraceProcessor requests.
+ TraceProcessorMethod request = 2;
+
+ // TraceProcessor -> Client responses.
+ TraceProcessorMethod response = 3;
+
+ // This is sent back instead of filling |response| when the client sends a
+ // |request| which is not known by the TraceProcessor service. This can
+ // happen when the client is newer than the service.
+ TraceProcessorMethod invalid_request = 4;
+ }
+
+ // Request/Response arguments.
+ // Not all requests / responses require an argument.
+
+ oneof args {
+ // TraceProcessorMethod request args.
+
+ // For TPM_APPEND_TRACE_DATA.
+ bytes append_trace_data = 101;
+ // For TPM_QUERY_STREAMING.
+ QueryArgs query_args = 103;
+ // For TPM_QUERY_RAW_DEPRECATED.
+ RawQueryArgs raw_query_args = 104;
+ // For TPM_COMPUTE_METRIC.
+ ComputeMetricArgs compute_metric_args = 105;
+
+ // TraceProcessorMethod response args.
+ // For TPM_APPEND_TRACE_DATA.
+ AppendTraceDataResult append_result = 201;
+ // For TPM_QUERY_STREAMING.
+ QueryResult query_result = 203;
+ // For TPM_QUERY_RAW_DEPRECATED.
+ RawQueryResult raw_query_result = 204;
+ // For TPM_COMPUTE_METRIC.
+ ComputeMetricResult metric_result = 205;
+ // For TPM_GET_METRIC_DESCRIPTORS.
+ DescriptorSet metric_descriptors = 206;
+ // For TPM_DISABLE_AND_READ_METATRACE.
+ DisableAndReadMetatraceResult metatrace = 209;
+ }
+}
+
+message AppendTraceDataResult {
+ optional int64 total_bytes_parsed = 1;
+ optional string error = 2;
+}
+
+message QueryArgs {
+ optional string sql_query = 1;
+
+ // Wall time when the query was queued. Used only for query stats.
+ optional uint64 time_queued_ns = 2;
+}
+
// Input for the /raw_query endpoint.
message RawQueryArgs {
optional string sql_query = 1;
@@ -117,7 +205,6 @@
// NUL-terminated. This is because JS incurs into a non-negligible overhead
// when decoding strings and one decode + split('\0') is measurably faster
// than decoding N strings. See goto.google.com/postmessage-benchmark .
- // \0-concatenated.
optional string string_cells = 5;
// If true this is the last batch for the query result.
@@ -189,11 +276,3 @@
message DescriptorSet {
repeated DescriptorProto descriptors = 1;
}
-
-// Input for the /get_metric_descriptors endpoint.
-message GetMetricDescriptorsArgs {}
-
-// Output for the /get_metric_descriptors endpoint.
-message GetMetricDescriptorsResult {
- optional DescriptorSet descriptor_set = 1;
-}
diff --git a/src/protozero/proto_ring_buffer.cc b/src/protozero/proto_ring_buffer.cc
index 92fe153..631d355 100644
--- a/src/protozero/proto_ring_buffer.cc
+++ b/src/protozero/proto_ring_buffer.cc
@@ -106,7 +106,7 @@
size_t avail = buf_.size() - wr_;
if (data_len > avail) {
- // This whole section should be hit extremely rare.
+ // This whole section should be hit extremely rarely.
// Try first just recompacting the buffer by moving everything to the left.
// This can happen if we received "a message and a bit" on each Append call
diff --git a/src/protozero/proto_ring_buffer.h b/src/protozero/proto_ring_buffer.h
index 06ef539..d71a231 100644
--- a/src/protozero/proto_ring_buffer.h
+++ b/src/protozero/proto_ring_buffer.h
@@ -60,10 +60,10 @@
// Internally this is similar to a ring-buffer, with the caveat that it never
// wraps, it only expands. Expansions are rare. The deal is that in most cases
// the read cursor follows very closely the write cursor. For instance, if the
-// uderlying behaves as a dgram socket, after each Append, the read cursor will
-// chase completely the write cursor. Even if the underyling stream is not
-// always atomic, the expectation is that the read cursor will eventually reach
-// the write one within few messages.
+// underlying transport behaves as a dgram socket, after each Append, the read
+// cursor will chase completely the write cursor. Even if the underlying stream
+// is not always atomic, the expectation is that the read cursor will eventually
+// reach the write one within few messages.
// A visual example, imagine we have four messages: 2it 4will 2be 4fine
// Visually:
//
diff --git a/src/trace_processor/python/perfetto/trace_processor/trace_processor.descriptor b/src/trace_processor/python/perfetto/trace_processor/trace_processor.descriptor
index 3d2bf28..c3672b2 100644
--- a/src/trace_processor/python/perfetto/trace_processor/trace_processor.descriptor
+++ b/src/trace_processor/python/perfetto/trace_processor/trace_processor.descriptor
Binary files differ
diff --git a/src/trace_processor/python/perfetto/trace_processor/trace_processor.descriptor.sha1 b/src/trace_processor/python/perfetto/trace_processor/trace_processor.descriptor.sha1
index 5bbd54a..5f5b733 100644
--- a/src/trace_processor/python/perfetto/trace_processor/trace_processor.descriptor.sha1
+++ b/src/trace_processor/python/perfetto/trace_processor/trace_processor.descriptor.sha1
@@ -2,5 +2,5 @@
// SHA1(tools/gen_binary_descriptors)
// 9fc6d77de57ec76a80b76aa282f4c7cf5ce55eec
// SHA1(protos/perfetto/trace_processor/trace_processor.proto)
-// 8320f306d6d5bbcb5ef6ba8cd62cc70a0994d102
+// ee875dc7384617e22ecb9e9d4ac03df4bba82252
\ No newline at end of file
diff --git a/src/trace_processor/rpc/httpd.cc b/src/trace_processor/rpc/httpd.cc
index 9412279..edc4568 100644
--- a/src/trace_processor/rpc/httpd.cc
+++ b/src/trace_processor/rpc/httpd.cc
@@ -71,6 +71,9 @@
~HttpServer() override;
void Run(const char*, const char*);
+ // This is non-null only while serving an HTTP request.
+ Client* active_client() { return active_client_; }
+
private:
size_t ParseOneHttpRequest(Client* client);
void HandleRequest(Client*, const HttpRequest&);
@@ -85,9 +88,12 @@
base::UnixTaskRunner task_runner_;
std::unique_ptr<base::UnixSocket> sock4_;
std::unique_ptr<base::UnixSocket> sock6_;
- std::vector<Client> clients_;
+ std::list<Client> clients_;
+ Client* active_client_ = nullptr;
};
+HttpServer* g_httpd_instance;
+
void Append(std::vector<char>& buf, const char* str) {
buf.insert(buf.end(), str, str + strlen(str));
}
@@ -197,7 +203,9 @@
// At this point |rxbuf| can contain a partial HTTP request, a full one or
// more (in case of HTTP Keepalive pipelining).
for (;;) {
+ active_client_ = client;
size_t bytes_consumed = ParseOneHttpRequest(client);
+ active_client_ = nullptr;
if (bytes_consumed == 0)
break;
memmove(rxbuf, &rxbuf[bytes_consumed], client->rxbuf_used - bytes_consumed);
@@ -306,6 +314,37 @@
});
}
+ if (req.uri == "/rpc") {
+ // Start the chunked reply.
+ strncpy(transfer_encoding_hdr, "Transfer-Encoding: chunked",
+ sizeof(transfer_encoding_hdr));
+ base::UnixSocket* cli_sock = client->sock.get();
+ HttpReply(cli_sock, "200 OK", headers, nullptr, kOmitContentLength);
+
+ static auto resp_fn = [](const void* data, uint32_t len) {
+ char chunk_hdr[32];
+ auto hdr_len = static_cast<size_t>(sprintf(chunk_hdr, "%x\r\n", len));
+ auto* http_client = g_httpd_instance->active_client();
+ PERFETTO_CHECK(http_client);
+ if (data == nullptr) {
+ // Unrecoverable RPC error case.
+ http_client->sock->Shutdown(/*notify=*/true);
+ return;
+ }
+ http_client->sock->Send(chunk_hdr, hdr_len);
+ http_client->sock->Send(data, len);
+ http_client->sock->Send("\r\n", 2);
+ };
+
+ trace_processor_rpc_.SetRpcResponseFunction(resp_fn);
+ trace_processor_rpc_.OnRpcRequest(req.body.data(), req.body.size());
+ trace_processor_rpc_.SetRpcResponseFunction(nullptr);
+
+ // Terminate chunked stream.
+ cli_sock->Send("0\r\n\r\n", 5);
+ return;
+ }
+
if (req.uri == "/parse") {
trace_processor_rpc_.Parse(
reinterpret_cast<const uint8_t*>(req.body.data()), req.body.size());
@@ -381,13 +420,6 @@
res.size());
}
- if (req.uri == "/get_metric_descriptors") {
- std::vector<uint8_t> res = trace_processor_rpc_.GetMetricDescriptors(
- reinterpret_cast<const uint8_t*>(req.body.data()), req.body.size());
- return HttpReply(client->sock.get(), "200 OK", headers, res.data(),
- res.size());
- }
-
if (req.uri == "/enable_metatrace") {
trace_processor_rpc_.EnableMetatrace();
return HttpReply(client->sock.get(), "200 OK", headers);
@@ -407,6 +439,7 @@
void RunHttpRPCServer(std::unique_ptr<TraceProcessor> preloaded_instance,
std::string port_number) {
HttpServer srv(std::move(preloaded_instance));
+ g_httpd_instance = &srv;
std::string port = port_number.empty() ? kBindPort : port_number;
std::string ipv4_addr = "127.0.0.1:" + port;
std::string ipv6_addr = "[::1]:" + port;
diff --git a/src/trace_processor/rpc/query_result_serializer.h b/src/trace_processor/rpc/query_result_serializer.h
index 9c05e0b..c29d66d 100644
--- a/src/trace_processor/rpc/query_result_serializer.h
+++ b/src/trace_processor/rpc/query_result_serializer.h
@@ -53,6 +53,7 @@
// chunked-encoded HTTP response, or through a repetition of Wasm calls.
class QueryResultSerializer {
public:
+ static constexpr uint32_t kDefaultBatchSplitThreshold = 128 * 1024;
explicit QueryResultSerializer(Iterator);
~QueryResultSerializer();
@@ -92,7 +93,7 @@
// the limit (it splits on the next row *after* the limit is hit).
// Overridable for testing only.
uint32_t cells_per_batch_ = 50000;
- uint32_t batch_split_threshold_ = 1024 * 128;
+ uint32_t batch_split_threshold_ = kDefaultBatchSplitThreshold;
};
} // namespace trace_processor
diff --git a/src/trace_processor/rpc/rpc.cc b/src/trace_processor/rpc/rpc.cc
index c63e5cb..a0d4a05 100644
--- a/src/trace_processor/rpc/rpc.cc
+++ b/src/trace_processor/rpc/rpc.cc
@@ -16,39 +16,238 @@
#include "src/trace_processor/rpc/rpc.h"
+#include <string.h>
+
#include <vector>
+#include "perfetto/base/logging.h"
#include "perfetto/base/time.h"
+#include "perfetto/ext/base/utils.h"
#include "perfetto/protozero/scattered_heap_buffer.h"
+#include "perfetto/protozero/scattered_stream_writer.h"
#include "perfetto/trace_processor/trace_processor.h"
-#include "protos/perfetto/trace_processor/trace_processor.pbzero.h"
+#include "src/protozero/proto_ring_buffer.h"
#include "src/trace_processor/rpc/query_result_serializer.h"
#include "src/trace_processor/tp_metatrace.h"
+#include "protos/perfetto/trace_processor/trace_processor.pbzero.h"
+
namespace perfetto {
namespace trace_processor {
-using ColumnValues = protos::pbzero::RawQueryResult::ColumnValues;
-using ColumnDesc = protos::pbzero::RawQueryResult::ColumnDesc;
-
+namespace {
// Writes a "Loading trace ..." update every N bytes.
constexpr size_t kProgressUpdateBytes = 50 * 1000 * 1000;
+using TraceProcessorRpcStream = protos::pbzero::TraceProcessorRpcStream;
+using RpcProto = protos::pbzero::TraceProcessorRpc;
+
+// Most RPC messages are either very small or a query results.
+// QueryResultSerializer splits rows into batches of approximately 128KB. Try
+// avoid extra heap allocations for the nominal case.
+constexpr auto kSliceSize =
+ QueryResultSerializer::kDefaultBatchSplitThreshold + 4096;
+
+// Holds a trace_processor::TraceProcessorRpc pbzero message. Avoids extra
+// copies by doing direct scattered calls from the fragmented heap buffer onto
+// the RpcResponseFunction (the receiver is expected to deal with arbitrary
+// fragmentation anyways). It also takes care of prefixing each message with
+// the proto preamble and varint size.
+class Response {
+ public:
+ Response(int64_t seq, int method);
+ Response(const Response&) = delete;
+ Response& operator=(const Response&) = delete;
+ RpcProto* operator->() { return msg_; }
+ void Send(Rpc::RpcResponseFunction);
+
+ private:
+ RpcProto* msg_ = nullptr;
+
+ // The reason why we use TraceProcessorRpcStream as root message is because
+ // the RPC wire protocol expects each message to be prefixed with a proto
+ // preamble and varint size. This happens to be the same serialization of a
+ // repeated field (this is really the same trick we use between
+ // Trace and TracePacket in trace.proto)
+ protozero::HeapBuffered<TraceProcessorRpcStream> buf_;
+};
+
+Response::Response(int64_t seq, int method) : buf_(kSliceSize, kSliceSize) {
+ msg_ = buf_->add_msg();
+ msg_->set_seq(seq);
+ msg_->set_response(static_cast<RpcProto::TraceProcessorMethod>(method));
+}
+
+void Response::Send(Rpc::RpcResponseFunction send_fn) {
+ buf_->Finalize();
+ for (const auto& slice : buf_.GetSlices()) {
+ auto range = slice.GetUsedRange();
+ send_fn(range.begin, static_cast<uint32_t>(range.size()));
+ }
+}
+
+} // namespace
Rpc::Rpc(std::unique_ptr<TraceProcessor> preloaded_instance)
: trace_processor_(std::move(preloaded_instance)),
- session_id_(base::Uuidv4()) {}
+ session_id_(base::Uuidv4()) {
+ if (!trace_processor_)
+ ResetTraceProcessor();
+}
Rpc::Rpc() : Rpc(nullptr) {}
-
Rpc::~Rpc() = default;
+void Rpc::ResetTraceProcessor() {
+ trace_processor_ = TraceProcessor::CreateInstance(Config());
+ bytes_parsed_ = bytes_last_progress_ = 0;
+ t_parse_started_ = base::GetWallTimeNs().count();
+ // Deliberately not resetting the RPC channel state (rxbuf_, {tx,rx}_seq_id_).
+ // This is invoked from the same client to clear the current trace state
+ // before loading a new one. The IPC channel is orthogonal to that and the
+ // message numbering continues regardless of the reset.
+}
+
+void Rpc::OnRpcRequest(const void* data, size_t len) {
+ rxbuf_.Append(data, len);
+ for (;;) {
+ auto msg = rxbuf_.ReadMessage();
+ if (!msg.valid()) {
+ if (msg.fatal_framing_error)
+ rpc_response_fn_(nullptr, 0); // Disconnect.
+ break;
+ }
+ ParseRpcRequest(msg.start, msg.len);
+ }
+}
+
+// [data, len] here is a tokenized TraceProcessorRpc proto message, without the
+// size header.
+void Rpc::ParseRpcRequest(const uint8_t* data, size_t len) {
+ RpcProto::Decoder req(data, len);
+
+ // We allow restarting the sequence from 0. This happens when refreshing the
+ // browser while using the external trace_processor_shell --httpd.
+ if (req.seq() != 0 && rx_seq_id_ != 0 && req.seq() != rx_seq_id_ + 1) {
+ PERFETTO_ELOG("RPC request out of order. Expected %" PRId64
+ ", got %" PRId64,
+ rx_seq_id_ + 1, req.seq());
+ rpc_response_fn_(nullptr, 0); // Disconnect.
+ return;
+ }
+ rx_seq_id_ = req.seq();
+
+ // The static cast is to prevent that the compiler breaks future proofness.
+ const int req_type = static_cast<int>(req.request());
+ static const char kErrFieldNotSet[] = "RPC error: request field not set";
+ switch (req_type) {
+ case RpcProto::TPM_APPEND_TRACE_DATA: {
+ Response resp(tx_seq_id_++, req_type);
+ auto* result = resp->set_append_result();
+ if (!req.has_append_trace_data()) {
+ result->set_error(kErrFieldNotSet);
+ } else {
+ protozero::ConstBytes byte_range = req.append_trace_data();
+ util::Status res = Parse(byte_range.data, byte_range.size);
+ if (!res.ok()) {
+ result->set_error(res.message());
+ }
+ }
+ resp.Send(rpc_response_fn_);
+ break;
+ }
+ case RpcProto::TPM_FINALIZE_TRACE_DATA: {
+ Response resp(tx_seq_id_++, req_type);
+ NotifyEndOfFile();
+ resp.Send(rpc_response_fn_);
+ break;
+ }
+ case RpcProto::TPM_QUERY_STREAMING: {
+ if (!req.has_query_args()) {
+ Response resp(tx_seq_id_++, req_type);
+ auto* result = resp->set_query_result();
+ result->set_error(kErrFieldNotSet);
+ resp.Send(rpc_response_fn_);
+ } else {
+ protozero::ConstBytes args = req.query_args();
+ auto it = QueryInternal(args.data, args.size);
+ QueryResultSerializer serializer(std::move(it));
+ for (bool has_more = true; has_more;) {
+ Response resp(tx_seq_id_++, req_type);
+ has_more = serializer.Serialize(resp->set_query_result());
+ resp.Send(rpc_response_fn_);
+ }
+ }
+ break;
+ }
+ case RpcProto::TPM_QUERY_RAW_DEPRECATED: {
+ Response resp(tx_seq_id_++, req_type);
+ auto* result = resp->set_raw_query_result();
+ if (!req.has_raw_query_args()) {
+ result->set_error(kErrFieldNotSet);
+ } else {
+ protozero::ConstBytes args = req.raw_query_args();
+ RawQueryInternal(args.data, args.size, result);
+ }
+ resp.Send(rpc_response_fn_);
+ break;
+ }
+ case RpcProto::TPM_COMPUTE_METRIC: {
+ Response resp(tx_seq_id_++, req_type);
+ auto* result = resp->set_metric_result();
+ if (!req.has_compute_metric_args()) {
+ result->set_error(kErrFieldNotSet);
+ } else {
+ protozero::ConstBytes args = req.compute_metric_args();
+ ComputeMetricInternal(args.data, args.size, result);
+ }
+ resp.Send(rpc_response_fn_);
+ break;
+ }
+ case RpcProto::TPM_GET_METRIC_DESCRIPTORS: {
+ Response resp(tx_seq_id_++, req_type);
+ auto descriptor_set = trace_processor_->GetMetricDescriptors();
+ auto* result = resp->set_metric_descriptors();
+ result->AppendRawProtoBytes(descriptor_set.data(), descriptor_set.size());
+ resp.Send(rpc_response_fn_);
+ break;
+ }
+ case RpcProto::TPM_RESTORE_INITIAL_TABLES: {
+ trace_processor_->RestoreInitialTables();
+ Response resp(tx_seq_id_++, req_type);
+ resp.Send(rpc_response_fn_);
+ break;
+ }
+ case RpcProto::TPM_ENABLE_METATRACE: {
+ trace_processor_->EnableMetatrace();
+ Response resp(tx_seq_id_++, req_type);
+ resp.Send(rpc_response_fn_);
+ break;
+ }
+ case RpcProto::TPM_DISABLE_AND_READ_METATRACE: {
+ Response resp(tx_seq_id_++, req_type);
+ DisableAndReadMetatraceInternal(resp->set_metatrace());
+ resp.Send(rpc_response_fn_);
+ break;
+ }
+ default: {
+ // This can legitimately happen if the client is newer. We reply with a
+ // generic "unkown request" response, so the client can do feature
+ // detection
+ PERFETTO_DLOG("[RPC] Uknown request type (%d), size=%zu", req_type, len);
+ Response resp(tx_seq_id_++, req_type);
+ resp->set_invalid_request(
+ static_cast<RpcProto::TraceProcessorMethod>(req_type));
+ resp.Send(rpc_response_fn_);
+ break;
+ }
+ } // switch(req_type)
+}
+
util::Status Rpc::Parse(const uint8_t* data, size_t len) {
if (eof_) {
- // Reset the trace processor state if this is either the first call ever or
- // if another trace has been previously fully loaded.
- trace_processor_ = TraceProcessor::CreateInstance(Config());
- bytes_parsed_ = bytes_last_progress_ = 0;
- t_parse_started_ = base::GetWallTimeNs().count();
+ // Reset the trace processor state if another trace has been previously
+ // loaded.
+ ResetTraceProcessor();
}
eof_ = false;
@@ -65,8 +264,6 @@
}
void Rpc::NotifyEndOfFile() {
- if (!trace_processor_)
- return;
trace_processor_->NotifyEndOfFile();
eof_ = true;
MaybePrintProgress();
@@ -89,23 +286,7 @@
void Rpc::Query(const uint8_t* args,
size_t len,
QueryResultBatchCallback result_callback) {
- protos::pbzero::RawQueryArgs::Decoder query(args, len);
- std::string sql = query.sql_query().ToStdString();
- PERFETTO_DLOG("[RPC] Query < %s", sql.c_str());
- PERFETTO_TP_TRACE("RPC_QUERY",
- [&](metatrace::Record* r) { r->AddArg("SQL", sql); });
-
- if (!trace_processor_) {
- static const char kErr[] = "Query() called before Parse()";
- PERFETTO_ELOG("[RPC] %s", kErr);
- protozero::HeapBuffered<protos::pbzero::QueryResult> result;
- result->set_error(kErr);
- auto vec = result.SerializeAsArray();
- result_callback(vec.data(), vec.size(), /*has_more=*/false);
- return;
- }
-
- auto it = trace_processor_->ExecuteQuery(sql.c_str());
+ auto it = QueryInternal(args, len);
QueryResultSerializer serializer(std::move(it));
std::vector<uint8_t> res;
@@ -116,21 +297,34 @@
}
}
+Iterator Rpc::QueryInternal(const uint8_t* args, size_t len) {
+ protos::pbzero::RawQueryArgs::Decoder query(args, len);
+ std::string sql = query.sql_query().ToStdString();
+ PERFETTO_DLOG("[RPC] Query < %s", sql.c_str());
+ PERFETTO_TP_TRACE("RPC_QUERY",
+ [&](metatrace::Record* r) { r->AddArg("SQL", sql); });
+
+ return trace_processor_->ExecuteQuery(sql.c_str());
+}
+
std::vector<uint8_t> Rpc::RawQuery(const uint8_t* args, size_t len) {
protozero::HeapBuffered<protos::pbzero::RawQueryResult> result;
+ RawQueryInternal(args, len, result.get());
+ return result.SerializeAsArray();
+}
+
+void Rpc::RawQueryInternal(const uint8_t* args,
+ size_t len,
+ protos::pbzero::RawQueryResult* result) {
+ using ColumnValues = protos::pbzero::RawQueryResult::ColumnValues;
+ using ColumnDesc = protos::pbzero::RawQueryResult::ColumnDesc;
+
protos::pbzero::RawQueryArgs::Decoder query(args, len);
std::string sql = query.sql_query().ToStdString();
PERFETTO_DLOG("[RPC] RawQuery < %s", sql.c_str());
PERFETTO_TP_TRACE("RPC_RAW_QUERY",
[&](metatrace::Record* r) { r->AddArg("SQL", sql); });
- if (!trace_processor_) {
- static const char kErr[] = "RawQuery() called before Parse()";
- PERFETTO_ELOG("[RPC] %s", kErr);
- result->set_error(kErr);
- return result.SerializeAsArray();
- }
-
auto it = trace_processor_->ExecuteQuery(sql.c_str());
// This vector contains a standalone protozero message per column. The problem
@@ -245,29 +439,26 @@
if (!status.ok())
result->set_error(status.c_message());
PERFETTO_DLOG("[RPC] RawQuery > %d rows (err: %d)", rows, !status.ok());
-
- return result.SerializeAsArray();
}
std::string Rpc::GetCurrentTraceName() {
- if (!trace_processor_)
- return "";
return trace_processor_->GetCurrentTraceName();
}
void Rpc::RestoreInitialTables() {
- if (trace_processor_)
- trace_processor_->RestoreInitialTables();
+ trace_processor_->RestoreInitialTables();
session_id_ = base::Uuidv4();
}
-std::vector<uint8_t> Rpc::ComputeMetric(const uint8_t* data, size_t len) {
+std::vector<uint8_t> Rpc::ComputeMetric(const uint8_t* args, size_t len) {
protozero::HeapBuffered<protos::pbzero::ComputeMetricResult> result;
- if (!trace_processor_) {
- result->set_error("Null trace processor instance");
- return result.SerializeAsArray();
- }
+ ComputeMetricInternal(args, len, result.get());
+ return result.SerializeAsArray();
+}
+void Rpc::ComputeMetricInternal(const uint8_t* data,
+ size_t len,
+ protos::pbzero::ComputeMetricResult* result) {
protos::pbzero::ComputeMetricArgs::Decoder args(data, len);
std::vector<std::string> metric_names;
for (auto it = args.metric_names(); it; ++it) {
@@ -281,15 +472,16 @@
}
});
+ PERFETTO_DLOG("[RPC] ComputeMetrics(%zu, %s), format=%d", metric_names.size(),
+ metric_names.empty() ? "" : metric_names.front().c_str(),
+ args.format());
switch (args.format()) {
case protos::pbzero::ComputeMetricArgs::BINARY_PROTOBUF: {
std::vector<uint8_t> metrics_proto;
util::Status status =
trace_processor_->ComputeMetric(metric_names, &metrics_proto);
if (status.ok()) {
- result->AppendBytes(
- protos::pbzero::ComputeMetricResult::kMetricsFieldNumber,
- metrics_proto.data(), metrics_proto.size());
+ result->set_metrics(metrics_proto.data(), metrics_proto.size());
} else {
result->set_error(status.message());
}
@@ -301,44 +493,27 @@
metric_names, TraceProcessor::MetricResultFormat::kProtoText,
&metrics_string);
if (status.ok()) {
- result->AppendString(
- protos::pbzero::ComputeMetricResult::kMetricsAsPrototextFieldNumber,
- metrics_string);
+ result->set_metrics_as_prototext(metrics_string);
} else {
result->set_error(status.message());
}
break;
}
}
- return result.SerializeAsArray();
-}
-
-std::vector<uint8_t> Rpc::GetMetricDescriptors(const uint8_t*, size_t) {
- protozero::HeapBuffered<protos::pbzero::GetMetricDescriptorsResult> result;
- if (!trace_processor_) {
- return result.SerializeAsArray();
- }
- std::vector<uint8_t> descriptor_set =
- trace_processor_->GetMetricDescriptors();
- result->AppendBytes(
- protos::pbzero::GetMetricDescriptorsResult::kDescriptorSetFieldNumber,
- descriptor_set.data(), descriptor_set.size());
- return result.SerializeAsArray();
}
void Rpc::EnableMetatrace() {
- if (!trace_processor_)
- return;
trace_processor_->EnableMetatrace();
}
std::vector<uint8_t> Rpc::DisableAndReadMetatrace() {
protozero::HeapBuffered<protos::pbzero::DisableAndReadMetatraceResult> result;
- if (!trace_processor_) {
- result->set_error("Null trace processor instance");
- return result.SerializeAsArray();
- }
+ DisableAndReadMetatraceInternal(result.get());
+ return result.SerializeAsArray();
+}
+void Rpc::DisableAndReadMetatraceInternal(
+ protos::pbzero::DisableAndReadMetatraceResult* result) {
std::vector<uint8_t> trace_proto;
util::Status status = trace_processor_->DisableAndReadMetatrace(&trace_proto);
if (status.ok()) {
@@ -346,7 +521,6 @@
} else {
result->set_error(status.message());
}
- return result.SerializeAsArray();
}
std::string Rpc::GetSessionId() {
diff --git a/src/trace_processor/rpc/rpc.h b/src/trace_processor/rpc/rpc.h
index 3448937..7bca56d 100644
--- a/src/trace_processor/rpc/rpc.h
+++ b/src/trace_processor/rpc/rpc.h
@@ -26,10 +26,21 @@
#include "perfetto/ext/base/uuid.h"
#include "perfetto/trace_processor/status.h"
+#include "src/protozero/proto_ring_buffer.h"
namespace perfetto {
+
+namespace protos {
+namespace pbzero {
+class RawQueryResult;
+class ComputeMetricResult;
+class DisableAndReadMetatraceResult;
+} // namespace pbzero
+} // namespace protos
+
namespace trace_processor {
+class Iterator;
class TraceProcessor;
// This class handles the binary {,un}marshalling for the Trace Processor RPC
@@ -55,6 +66,38 @@
Rpc();
~Rpc();
+ // 1. TraceProcessor byte-pipe RPC interface.
+ // This is a bidirectional channel with a remote TraceProcessor instance. All
+ // it needs is a byte-oriented pipe (e.g., a TCP socket, a pipe(2) between two
+ // processes or a postmessage channel in the JS+Wasm case). The messages
+ // exchanged on these pipes are TraceProcessorRpc protos (defined in
+ // trace_processor.proto). This has been introduced in Perfetto v15.
+
+ // Pushes data received by the RPC channel into the parser. Inbound messages
+ // are tokenized and turned into TraceProcessor method invocations. |data|
+ // does not need to be a whole TraceProcessorRpc message. It can be a portion
+ // of it or a union of >1 messages.
+ // Responses are sent throught the RpcResponseFunction (below).
+ void OnRpcRequest(const void* data, size_t len);
+
+ // The size argument is a uint32_t and not size_t to avoid ABI mismatches
+ // with Wasm, where size_t = uint32_t.
+ // (nullptr, 0) has the semantic of "close the channel" and is issued when an
+ // unrecoverable wire-protocol framing error is detected.
+ using RpcResponseFunction = void (*)(const void* /*data*/, uint32_t /*len*/);
+ void SetRpcResponseFunction(RpcResponseFunction f) { rpc_response_fn_ = f; }
+
+ // 2. TraceProcessor legacy RPC endpoints.
+ // The methods below are exposed for the old RPC interfaces, where each RPC
+ // implementation deals with the method demuxing: (i) wasm_bridge.cc has one
+ // exported C function per method (going away soon); (ii) httpd.cc has one
+ // REST endpoint per method. Over time this turned out to have too much
+ // duplicated boilerplate and we moved to the byte-pipe model above.
+ // We still keep these endpoints around, because httpd.cc still exposes the
+ // individual REST endpoints to legacy clients (TP's Python API). The
+ // mainteinance cost of those is very low. Both the new byte-pipe and the
+ // old endpoints run exactly the same code. The {de,}serialization format is
+ // the same, the only difference is only who does the method demuxing.
// The methods of this class are mirrors (modulo {un,}marshalling of args) of
// the corresponding names in trace_processor.h . See that header for docs.
@@ -62,7 +105,6 @@
void NotifyEndOfFile();
std::string GetCurrentTraceName();
std::vector<uint8_t> ComputeMetric(const uint8_t* data, size_t len);
- std::vector<uint8_t> GetMetricDescriptors(const uint8_t* data, size_t len);
void EnableMetatrace();
std::vector<uint8_t> DisableAndReadMetatrace();
@@ -113,10 +155,25 @@
std::vector<uint8_t> RawQuery(const uint8_t* args, size_t len);
private:
+ void ParseRpcRequest(const uint8_t* data, size_t len);
+ void ResetTraceProcessor();
void MaybePrintProgress();
+ Iterator QueryInternal(const uint8_t* args, size_t len);
+ void RawQueryInternal(const uint8_t* args,
+ size_t len,
+ protos::pbzero::RawQueryResult*);
+ void ComputeMetricInternal(const uint8_t* args,
+ size_t len,
+ protos::pbzero::ComputeMetricResult*);
+ void DisableAndReadMetatraceInternal(
+ protos::pbzero::DisableAndReadMetatraceResult*);
std::unique_ptr<TraceProcessor> trace_processor_;
- bool eof_ = true; // Reset when calling Parse().
+ RpcResponseFunction rpc_response_fn_;
+ protozero::ProtoRingBuffer rxbuf_;
+ int64_t tx_seq_id_ = 0;
+ int64_t rx_seq_id_ = 0;
+ bool eof_ = false;
int64_t t_parse_started_ = 0;
size_t bytes_last_progress_ = 0;
size_t bytes_parsed_ = 0;
diff --git a/src/trace_processor/rpc/wasm_bridge.cc b/src/trace_processor/rpc/wasm_bridge.cc
index 79e5dae..4d2decf 100644
--- a/src/trace_processor/rpc/wasm_bridge.cc
+++ b/src/trace_processor/rpc/wasm_bridge.cc
@@ -97,14 +97,6 @@
static_cast<uint32_t>(res.size()));
}
-void EMSCRIPTEN_KEEPALIVE trace_processor_get_metric_descriptors(uint32_t);
-void trace_processor_get_metric_descriptors(uint32_t size) {
- std::vector<uint8_t> res =
- g_trace_processor_rpc->GetMetricDescriptors(g_req_buf, size);
- g_reply(reinterpret_cast<const char*>(res.data()),
- static_cast<uint32_t>(res.size()));
-}
-
void EMSCRIPTEN_KEEPALIVE trace_processor_enable_metatrace(uint32_t);
void trace_processor_enable_metatrace(uint32_t) {
g_trace_processor_rpc->EnableMetatrace();