Trace Processor: cleanup RPC interface
Minor cleanups to the RPC interface for the --httpd feature.
- Makes the RPC vs original TraceProcessor interfaces more
uniform and code-search friendly.
- Adds a Get/SetCurrentTraceName() method pair.
- Adds the ability to the Rpc class to adopt an existing instance.
This will be used for trace_processor_shell -D trace_from_cmdline.
- Makes method naming more consistent.
- Moved trace_processor.proto definitions within the
perfetto.trace_processor namespace
Bug: 143074239
Change-Id: I29e5b6f5216c3a10afbe036c60f758cbe5dfa172
diff --git a/include/perfetto/trace_processor/trace_processor.h b/include/perfetto/trace_processor/trace_processor.h
index 0d36204..d469937 100644
--- a/include/perfetto/trace_processor/trace_processor.h
+++ b/include/perfetto/trace_processor/trace_processor.h
@@ -105,10 +105,18 @@
// Interrupts the current query. Typically used by Ctrl-C handler.
virtual void InterruptQuery() = 0;
- // Deletes all tables and view that have been create (by the UI or user) after
- // the trace was loaded. It preserves the built-in tables/view created by the
- // loading process. Returns the number of table/views deleted.
+ // Deletes all tables and views that have been created (by the UI or user)
+ // after the trace was loaded. It preserves the built-in tables/view created
+ // by the ingestion process. Returns the number of table/views deleted.
virtual size_t RestoreInitialTables() = 0;
+
+ // Sets/returns the name of the currently loaded trace or an empty string if
+ // no trace is fully loaded yet. This has no effect on the Trace Processor
+ // functionality and is used for UI purposes only.
+ // The returned name is NOT a path and will contain extra text w.r.t. the
+ // argument originally passed to SetCurrentTraceName(), e.g., "file (42 MB)".
+ virtual std::string GetCurrentTraceName() = 0;
+ virtual void SetCurrentTraceName(const std::string&) = 0;
};
// When set, logs SQLite actions on the console.
diff --git a/protos/perfetto/trace_processor/trace_processor.proto b/protos/perfetto/trace_processor/trace_processor.proto
index 904b902..45804b7 100644
--- a/protos/perfetto/trace_processor/trace_processor.proto
+++ b/protos/perfetto/trace_processor/trace_processor.proto
@@ -17,8 +17,24 @@
syntax = "proto2";
option optimize_for = LITE_RUNTIME;
-package perfetto.protos;
+package perfetto.trace_processor.protos;
+// This file defines the schema for {,un}marshalling arguments and return values
+// when interfacing to the trace processor binary interface.
+
+// The Trace Processor can be used in three modes:
+// 1. Fully native from C++ or directly using trace_processor_shell.
+// In this case, this file isn't really relevant because no binary
+// marshalling is involved. Look at include/trace_processor/trace_processor.h
+// for the public C++ API definition.
+// 2. Using WASM within the HTML ui. In this case these messages are used to
+// {,un}marshall calls made through the JS<>WASM interop in
+// src/trace_processor/rpc/wasm_bridge.cc .
+// 3. Using the HTTP+RPC interface, by running trace_processor_shell -D.
+// In this case these messages are used to {,un}marshall HTTP requests and
+// response made through src/trace_processor/rpc/httpd.cc .
+
+// Input for the /raw_query endpoint.
message RawQueryArgs {
optional string sql_query = 1;
@@ -26,6 +42,7 @@
optional uint64 time_queued_ns = 2;
}
+// Output for the /raw_query endpoint.
message RawQueryResult {
message ColumnDesc {
optional string name = 1;
@@ -55,3 +72,14 @@
optional string error = 4;
optional uint64 execution_time_ns = 5;
}
+
+// Input for the /status endpoint.
+message StatusArgs {}
+
+// Output for the /status endpoint.
+message StatusResult {
+ // If present and not empty, a trace is already loaded already. This happens
+ // when using the HTTP+RPC mode nad passing a trace file to the shell, via
+ // trace_processor_shell -D trace_file.pftrace .
+ optional string loaded_trace_name = 1;
+}
diff --git a/src/trace_processor/read_trace.cc b/src/trace_processor/read_trace.cc
index 8ba8f16..0d73cf9 100644
--- a/src/trace_processor/read_trace.cc
+++ b/src/trace_processor/read_trace.cc
@@ -113,6 +113,7 @@
#endif // PERFETTO_HAS_AIO_H()
tp->NotifyEndOfFile();
+ tp->SetCurrentTraceName(filename);
if (progress_callback)
progress_callback(file_size);
diff --git a/src/trace_processor/rpc/BUILD.gn b/src/trace_processor/rpc/BUILD.gn
index f6c188d..02ef12f 100644
--- a/src/trace_processor/rpc/BUILD.gn
+++ b/src/trace_processor/rpc/BUILD.gn
@@ -47,6 +47,7 @@
"../../../protos/perfetto/trace_processor:zero",
"../../base",
"../../base:unix_socket",
+ "../../protozero",
]
}
}
diff --git a/src/trace_processor/rpc/httpd.cc b/src/trace_processor/rpc/httpd.cc
index c7c35bf..73c1bca 100644
--- a/src/trace_processor/rpc/httpd.cc
+++ b/src/trace_processor/rpc/httpd.cc
@@ -28,6 +28,7 @@
#include "perfetto/ext/base/string_view.h"
#include "perfetto/ext/base/unix_socket.h"
#include "perfetto/ext/base/unix_task_runner.h"
+#include "perfetto/protozero/scattered_heap_buffer.h"
#include "perfetto/trace_processor/trace_processor.h"
#include "src/trace_processor/rpc/rpc.h"
@@ -65,6 +66,7 @@
class HttpServer : public base::UnixSocket::EventListener {
public:
+ explicit HttpServer(std::unique_ptr<TraceProcessor>);
~HttpServer() override;
void Run();
@@ -120,6 +122,8 @@
sock->Shutdown(/*notify=*/true);
}
+HttpServer::HttpServer(std::unique_ptr<TraceProcessor> preloaded_instance)
+ : trace_processor_rpc_(std::move(preloaded_instance)) {}
HttpServer::~HttpServer() = default;
void HttpServer::Run() {
@@ -262,18 +266,22 @@
}
if (req.uri == "/parse") {
- trace_processor_rpc_.LoadTrace(
- reinterpret_cast<const uint8_t*>(req.body.data()), req.body.size(),
- /*eof=*/false);
+ trace_processor_rpc_.Parse(
+ reinterpret_cast<const uint8_t*>(req.body.data()), req.body.size());
return HttpReply(client->sock.get(), "200 OK", headers);
}
- if (req.uri == "/notifyeof") {
- trace_processor_rpc_.LoadTrace(nullptr, 0, /*eof=*/true);
+ if (req.uri == "/notify_eof") {
+ trace_processor_rpc_.NotifyEndOfFile();
return HttpReply(client->sock.get(), "200 OK", headers);
}
- if (req.uri == "/rawquery") {
+ if (req.uri == "/restore_initial_tables") {
+ trace_processor_rpc_.RestoreInitialTables();
+ return HttpReply(client->sock.get(), "200 OK", headers);
+ }
+
+ if (req.uri == "/raw_query") {
PERFETTO_CHECK(req.body.size() > 0u);
std::vector<uint8_t> response = trace_processor_rpc_.RawQuery(
reinterpret_cast<const uint8_t*>(req.body.data()), req.body.size());
@@ -282,7 +290,12 @@
}
if (req.uri == "/status") {
- return HttpReply(client->sock.get(), "200 OK", headers, nullptr, 0);
+ protozero::HeapBuffered<protos::pbzero::StatusResult> res;
+ res->set_loaded_trace_name(
+ trace_processor_rpc_.GetCurrentTraceName().c_str());
+ std::vector<uint8_t> buf = res.SerializeAsArray();
+ return HttpReply(client->sock.get(), "200 OK", headers, buf.data(),
+ buf.size());
}
return HttpReply(client->sock.get(), "404 Not Found", headers);
@@ -290,8 +303,8 @@
} // namespace
-void RunHttpRPCServer() {
- HttpServer srv;
+void RunHttpRPCServer(std::unique_ptr<TraceProcessor> preloaded_instance) {
+ HttpServer srv(std::move(preloaded_instance));
srv.Run();
}
diff --git a/src/trace_processor/rpc/httpd.h b/src/trace_processor/rpc/httpd.h
index 235e083..b145da8 100644
--- a/src/trace_processor/rpc/httpd.h
+++ b/src/trace_processor/rpc/httpd.h
@@ -17,12 +17,19 @@
#ifndef SRC_TRACE_PROCESSOR_RPC_HTTPD_H_
#define SRC_TRACE_PROCESSOR_RPC_HTTPD_H_
+#include <memory>
+
namespace perfetto {
namespace trace_processor {
+class TraceProcessor;
+
// Starts a RPC server that handles requests using protobuf-over-HTTP.
// It takes control of the calling thread and does not return.
-void RunHttpRPCServer();
+// The unique_ptr argument is optional. If non-null, the HTTP server will adopt
+// an existing instance with a pre-loaded trace. If null, it will create a new
+// instance when pushing data into the /parse endpoint.
+void RunHttpRPCServer(std::unique_ptr<TraceProcessor>);
} // namespace trace_processor
} // namespace perfetto
diff --git a/src/trace_processor/rpc/rpc.cc b/src/trace_processor/rpc/rpc.cc
index a006b19..78566bb 100644
--- a/src/trace_processor/rpc/rpc.cc
+++ b/src/trace_processor/rpc/rpc.cc
@@ -32,9 +32,12 @@
// Writes a "Loading trace ..." update every N bytes.
constexpr size_t kProgressUpdateBytes = 50 * 1000 * 1000;
+Rpc::Rpc(std::unique_ptr<TraceProcessor> preloaded_instance)
+ : trace_processor_(std::move(preloaded_instance)) {}
+
Rpc::~Rpc() = default;
-util::Status Rpc::LoadTrace(const uint8_t* data, size_t len, bool eof) {
+util::Status Rpc::Parse(const uint8_t* data, size_t len) {
if (eof_) {
// Reset the trace processor state if this is either the first call ever or
// if another trace has been previously fully loaded.
@@ -42,26 +45,37 @@
bytes_parsed_ = bytes_last_progress_ = 0;
t_parse_started_ = base::GetWallTimeNs().count();
}
- eof_ = eof;
+
+ eof_ = false;
bytes_parsed_ += len;
- if (eof || bytes_parsed_ - bytes_last_progress_ > kProgressUpdateBytes) {
+ MaybePrintProgress();
+
+ if (len == 0)
+ return util::OkStatus();
+
+ // TraceProcessor needs take ownership of the memory chunk.
+ std::unique_ptr<uint8_t[]> data_copy(new uint8_t[len]);
+ memcpy(data_copy.get(), data, len);
+ return trace_processor_->Parse(std::move(data_copy), len);
+}
+
+void Rpc::NotifyEndOfFile() {
+ if (!trace_processor_)
+ return;
+ trace_processor_->NotifyEndOfFile();
+ eof_ = true;
+ MaybePrintProgress();
+}
+
+void Rpc::MaybePrintProgress() {
+ if (eof_ || bytes_parsed_ - bytes_last_progress_ > kProgressUpdateBytes) {
bytes_last_progress_ = bytes_parsed_;
auto t_load_s = (base::GetWallTimeNs().count() - t_parse_started_) / 1e9;
fprintf(stderr, "\rLoading trace %.2f MB (%.1f MB/s)%s",
bytes_parsed_ / 1e6, bytes_parsed_ / 1e6 / t_load_s,
- (eof ? "\n" : ""));
+ (eof_ ? "\n" : ""));
fflush(stderr);
}
- util::Status res;
- if (len) {
- // TraceProcessor needs take ownership of the memory chunk.
- std::unique_ptr<uint8_t[]> data_copy(new uint8_t[len]);
- memcpy(data_copy.get(), data, len);
- res = trace_processor_->Parse(std::move(data_copy), len);
- }
- if (eof)
- trace_processor_->NotifyEndOfFile();
- return res;
}
std::vector<uint8_t> Rpc::RawQuery(const uint8_t* args, size_t len) {
@@ -69,6 +83,14 @@
protos::pbzero::RawQueryArgs::Decoder query(args, len);
std::string sql_query = query.sql_query().ToStdString();
PERFETTO_DLOG("[RPC] RawQuery < %s", sql_query.c_str());
+
+ if (!trace_processor_) {
+ static const char kErr[] = "RawQuery() called before Parse()";
+ PERFETTO_ELOG("[RPC] %s", kErr);
+ result->set_error(kErr);
+ return result.SerializeAsArray();
+ }
+
auto it = trace_processor_->ExecuteQuery(sql_query.c_str());
// This vector contains a standalone protozero message per column. The problem
@@ -187,5 +209,16 @@
return result.SerializeAsArray();
}
+std::string Rpc::GetCurrentTraceName() {
+ if (!trace_processor_)
+ return "";
+ return trace_processor_->GetCurrentTraceName();
+}
+
+void Rpc::RestoreInitialTables() {
+ if (trace_processor_)
+ trace_processor_->RestoreInitialTables();
+}
+
} // namespace trace_processor
} // namespace perfetto
diff --git a/src/trace_processor/rpc/rpc.h b/src/trace_processor/rpc/rpc.h
index 6cf91c2..115b818 100644
--- a/src/trace_processor/rpc/rpc.h
+++ b/src/trace_processor/rpc/rpc.h
@@ -46,32 +46,27 @@
// lifetime is tied to the lifetime of the Rpc instance.
class Rpc {
public:
+ // The unique_ptr argument is optional. If non-null it will adopt the passed
+ // instance and allow to directly query that. If null, a new instanace will be
+ // created internally by calling Parse().
+ explicit Rpc(std::unique_ptr<TraceProcessor>);
+ Rpc() : Rpc(nullptr) {}
~Rpc();
- // Loads a trace into the trace processor. Chunked loading is supported for
- // avoid buffering multi-GB traces in memory.
- // Args:
- // (data, len): a protobuf-encoded buffer for of a Trace, or just a portion
- // of it (if eof=false). In the case of eof=false, the passed buffer does
- // NOT need to be chunked on TracePacket boundaries. The internals deal
- // with stitching packets together. This is to allow clients to easily chunk
- // large trace files with fixed arbitrary chunk sizes.
- // eof: if true, this is the last chunk of the trace. The TraceProcessor
- // will flush its internal state and reflect all the data ingested until now
- // into the SQL tables. A further call to LoadTrace() after eof=true will
- // completely reset the TraceProcessor state and restart from scratch.
- util::Status LoadTrace(const uint8_t* data, size_t len, bool eof = true);
+ // The methods of this class are mirrors (modulo {un,}marshalling of args) of
+ // the corresponding names in trace_processor.h . See that header for docs.
- // Executes a SQL query and returns the results.
- // Args: RawQueryArgs proto-encoded bytes.
- // Returns: RawQueryResult proto-encoded bytes.
- // See trace_processor.proto for the proto schema.
- // If the query fails the |error| RawQueryResult.field is set accordingly
+ util::Status Parse(const uint8_t* data, size_t len);
+ void NotifyEndOfFile();
std::vector<uint8_t> RawQuery(const uint8_t* args, size_t len);
+ void RestoreInitialTables();
+ std::string GetCurrentTraceName();
private:
+ void MaybePrintProgress();
+
std::unique_ptr<TraceProcessor> trace_processor_;
- bool eof_ = true; // Reset when calling LoadTrace(..., eof).
+ bool eof_ = true; // Reset when calling Parse().
int64_t t_parse_started_ = 0;
size_t bytes_last_progress_ = 0;
size_t bytes_parsed_ = 0;
diff --git a/src/trace_processor/rpc/wasm_bridge.cc b/src/trace_processor/rpc/wasm_bridge.cc
index 8ebca83..82ae660 100644
--- a/src/trace_processor/rpc/wasm_bridge.cc
+++ b/src/trace_processor/rpc/wasm_bridge.cc
@@ -62,11 +62,10 @@
// Ingests trace data.
void EMSCRIPTEN_KEEPALIVE trace_processor_parse(uint32_t);
void trace_processor_parse(size_t size) {
- // TODO(primiano): LoadTrace() makes a copy of the data, which is unfortunate.
+ // TODO(primiano): Parse() makes a copy of the data, which is unfortunate.
// Ideally there should be a way to take the Blob coming from JS and move it.
// See https://github.com/WebAssembly/design/issues/1162.
- auto status =
- g_trace_processor_rpc->LoadTrace(g_req_buf, size, /*eof=*/false);
+ auto status = g_trace_processor_rpc->Parse(g_req_buf, size);
if (status.ok()) {
g_reply("", 0);
} else {
@@ -79,7 +78,7 @@
// arguments for simplicity.
void EMSCRIPTEN_KEEPALIVE trace_processor_notify_eof(uint32_t);
void trace_processor_notify_eof(uint32_t /* size, not used. */) {
- g_trace_processor_rpc->LoadTrace(nullptr, 0, /*eof=*/true);
+ g_trace_processor_rpc->NotifyEndOfFile();
g_reply("", 0);
}
diff --git a/src/trace_processor/trace_processor_impl.cc b/src/trace_processor/trace_processor_impl.cc
index 95f3718..64c558a 100644
--- a/src/trace_processor/trace_processor_impl.cc
+++ b/src/trace_processor/trace_processor_impl.cc
@@ -364,10 +364,25 @@
util::Status TraceProcessorImpl::Parse(std::unique_ptr<uint8_t[]> data,
size_t size) {
+ bytes_parsed_ += size;
return TraceProcessorStorageImpl::Parse(std::move(data), size);
}
+std::string TraceProcessorImpl::GetCurrentTraceName() {
+ if (current_trace_name_.empty())
+ return "";
+ auto size = " (" + std::to_string(bytes_parsed_ / 1024 / 1024) + " MB)";
+ return current_trace_name_ + size;
+}
+
+void TraceProcessorImpl::SetCurrentTraceName(const std::string& name) {
+ current_trace_name_ = name;
+}
+
void TraceProcessorImpl::NotifyEndOfFile() {
+ if (current_trace_name_.empty())
+ current_trace_name_ = "Unnamed trace";
+
TraceProcessorStorageImpl::NotifyEndOfFile();
BuildBoundsTable(*db_, context_.storage->GetTraceTimestampBoundsNs());
diff --git a/src/trace_processor/trace_processor_impl.h b/src/trace_processor/trace_processor_impl.h
index 22bbbf0..3149225 100644
--- a/src/trace_processor/trace_processor_impl.h
+++ b/src/trace_processor/trace_processor_impl.h
@@ -70,6 +70,9 @@
size_t RestoreInitialTables() override;
+ std::string GetCurrentTraceName() override;
+ void SetCurrentTraceName(const std::string&) override;
+
private:
// Needed for iterators to be able to delete themselves from the vector.
friend class IteratorImpl;
@@ -91,6 +94,9 @@
// by RestoreInitialTables() to delete all the tables/view that have been
// created after that point.
std::vector<std::string> initial_tables_;
+
+ std::string current_trace_name_;
+ uint64_t bytes_parsed_ = 0;
};
// The pointer implementation of TraceProcessor::Iterator.
diff --git a/src/trace_processor/trace_processor_shell.cc b/src/trace_processor/trace_processor_shell.cc
index 3745634..f7a2877 100644
--- a/src/trace_processor/trace_processor_shell.cc
+++ b/src/trace_processor/trace_processor_shell.cc
@@ -839,14 +839,15 @@
exit(1);
}
- // Ensure that we have the tracefile argument only at the end.
- if (optind != argc - 1 || argv[optind] == nullptr) {
+ // The only case where we allow omitting the trace file path is when running
+ // in --http mode. In all other cases, the last argument must be the trace
+ // file.
+ if (optind == argc - 1 && argv[optind]) {
+ command_line_options.trace_file_path = argv[optind];
+ } else if (!command_line_options.enable_httpd) {
PrintUsage(argv);
exit(1);
}
-
- command_line_options.trace_file_path = argv[optind];
-
return command_line_options;
}
@@ -892,38 +893,41 @@
int TraceProcessorMain(int argc, char** argv) {
CommandLineOptions options = ParseCommandLineOptions(argc, argv);
-#if PERFETTO_BUILDFLAG(PERFETTO_TP_HTTPD)
- if (options.enable_httpd) {
- RunHttpRPCServer();
- return 0;
- }
-#endif
-
// Load the trace file into the trace processor.
Config config;
config.force_full_sort = options.force_full_sort;
std::unique_ptr<TraceProcessor> tp = TraceProcessor::CreateInstance(config);
-
- auto t_load_start = base::GetWallTimeNs();
- double size_mb = 0;
- util::Status read_status =
- ReadTrace(tp.get(), options.trace_file_path.c_str(),
- [&size_mb](size_t parsed_size) {
- size_mb = parsed_size / 1E6;
- fprintf(stderr, "\rLoading trace: %.2f MB\r", size_mb);
- });
- if (!read_status.ok()) {
- PERFETTO_ELOG("Could not read trace file (path: %s): %s",
- options.trace_file_path.c_str(), read_status.c_message());
- return 1;
- }
- auto t_load = base::GetWallTimeNs() - t_load_start;
- double t_load_s = t_load.count() / 1E9;
- PERFETTO_ILOG("Trace loaded: %.2f MB (%.1f MB/s)", size_mb,
- size_mb / t_load_s);
g_tp = tp.get();
+ base::TimeNanos t_load{};
+ if (!options.trace_file_path.empty()) {
+ auto t_load_start = base::GetWallTimeNs();
+ double size_mb = 0;
+ util::Status read_status =
+ ReadTrace(tp.get(), options.trace_file_path.c_str(),
+ [&size_mb](size_t parsed_size) {
+ size_mb = parsed_size / 1E6;
+ fprintf(stderr, "\rLoading trace: %.2f MB\r", size_mb);
+ });
+ if (!read_status.ok()) {
+ PERFETTO_ELOG("Could not read trace file (path: %s): %s",
+ options.trace_file_path.c_str(), read_status.c_message());
+ return 1;
+ }
+ t_load = base::GetWallTimeNs() - t_load_start;
+ double t_load_s = t_load.count() / 1E9;
+ PERFETTO_ILOG("Trace loaded: %.2f MB (%.1f MB/s)", size_mb,
+ size_mb / t_load_s);
+ } // if (!trace_file_path.empty())
+
+#if PERFETTO_BUILDFLAG(PERFETTO_TP_HTTPD)
+ if (options.enable_httpd) {
+ RunHttpRPCServer(std::move(tp));
+ return 0;
+ }
+#endif
+
#if PERFETTO_HAS_SIGNAL_H()
signal(SIGINT, [](int) { g_tp->InterruptQuery(); });
#endif
diff --git a/ui/src/common/protos.ts b/ui/src/common/protos.ts
index a3b7866..49533d8 100644
--- a/ui/src/common/protos.ts
+++ b/ui/src/common/protos.ts
@@ -33,18 +33,21 @@
import IAndroidPowerConfig = protos.perfetto.protos.IAndroidPowerConfig;
import IBufferConfig = protos.perfetto.protos.TraceConfig.IBufferConfig;
import IProcessStatsConfig = protos.perfetto.protos.IProcessStatsConfig;
-import IRawQueryArgs = protos.perfetto.protos.IRawQueryArgs;
import ISysStatsConfig = protos.perfetto.protos.ISysStatsConfig;
import ITraceConfig = protos.perfetto.protos.ITraceConfig;
import MeminfoCounters = protos.perfetto.protos.MeminfoCounters;
import ProcessStatsConfig = protos.perfetto.protos.ProcessStatsConfig;
-import RawQueryArgs = protos.perfetto.protos.RawQueryArgs;
-import RawQueryResult = protos.perfetto.protos.RawQueryResult;
import StatCounters = protos.perfetto.protos.SysStatsConfig.StatCounters;
import SysStatsConfig = protos.perfetto.protos.SysStatsConfig;
import TraceConfig = protos.perfetto.protos.TraceConfig;
import VmstatCounters = protos.perfetto.protos.VmstatCounters;
+// Trace Processor protos.
+import IRawQueryArgs = protos.perfetto.trace_processor.protos.IRawQueryArgs;
+import RawQueryArgs = protos.perfetto.trace_processor.protos.RawQueryArgs;
+import RawQueryResult = protos.perfetto.trace_processor.protos.RawQueryResult;
+import StatusResult = protos.perfetto.trace_processor.protos.StatusResult;
+
// TODO(hjd): Maybe these should go in their own file.
export interface Row { [key: string]: number|string; }
@@ -218,6 +221,7 @@
ProcessStatsConfig,
RawQueryArgs,
RawQueryResult,
+ StatusResult,
StatCounters,
SysStatsConfig,
TraceConfig,