TraceProcessor: move from pull (BlobReader) to push architecture
So far the TraceProcessor was based on a pull architecture, where
the caller exposes a seekable pull interface (BlobReader) and the
TraceProcessor drives everything. This has two drawbacks:
- It's unnecessary: contrary to what we thought initially, we
process the trace in one linear scan and never seek back.
- For the 'load from network' case, it requires fully buffering
the trace in JS/TS before passing it to the TP. This clearly
doesn't work for big traces, both in terms of memory and
latency.
This CL switches to a push model, where the caller simply calls
Parse(data, length) on the TP whenever a new trace chunk is
available (the caller doesn't have to care about boundaries).
This has the following advantages:
- Gets rid of a lot of complexity.
- Allows streaming chunks of the trace directly as they are
loaded from the network. Not only does this save memory in the
JS/TS engine, it also hides the network latency, because
packets are parsed in a streaming fashion as they are
received from the network.
- In the trace_processor_shell we can hide some I/O latencies
in a similar fashion, by pre-fetching the next chunk while
the trace processor is parsing the current one.
- Bumps the trace_processor_shell performance to 450 MB/s.
Change-Id: I8192e83731daaffe9f3fe8e6344188de1adffbf3
diff --git a/src/trace_processor/trace_processor_shell.cc b/src/trace_processor/trace_processor_shell.cc
index 383df52..2782e34 100644
--- a/src/trace_processor/trace_processor_shell.cc
+++ b/src/trace_processor/trace_processor_shell.cc
@@ -14,6 +14,9 @@
* limitations under the License.
*/
+#include <aio.h>
+#include <fcntl.h>
+#include <sys/stat.h>
#include <unistd.h>
#include <functional>
@@ -21,8 +24,6 @@
#include "perfetto/base/build_config.h"
#include "perfetto/base/logging.h"
#include "perfetto/base/time.h"
-#include "perfetto/base/unix_task_runner.h"
-#include "src/trace_processor/file_reader.h"
#include "src/trace_processor/trace_processor.h"
#include "perfetto/trace_processor/raw_query.pb.h"
@@ -116,40 +117,73 @@
trace_file_path = argv[i];
}
- base::UnixTaskRunner task_runner;
- FileReader reader(trace_file_path, /*print_progress=*/true);
- TraceProcessor tp(&task_runner);
+ // Load the trace file into the trace processor.
+ TraceProcessor tp;
+ base::ScopedFile fd;
+ fd.reset(open(trace_file_path, O_RDONLY));
+ PERFETTO_CHECK(fd);
+
+ // Load the trace in chunks using async IO. We create a simple pipeline where,
+ // at each iteration, we parse the current chunk and asynchronously start
+ // reading the next chunk.
+
+ // 1MB chunk size seems the best tradeoff on a MacBook Pro 2013 - i7 2.8 GHz.
+ constexpr size_t kChunkSize = 1024 * 1024;
+ struct aiocb cb {};
+ cb.aio_nbytes = kChunkSize;
+ cb.aio_fildes = *fd;
+
+ // The control block has ownership of the buffer while the read is in-flight.
+ cb.aio_buf = new uint8_t[kChunkSize];
+
+ PERFETTO_CHECK(aio_read(&cb) == 0);
+ struct aiocb* aio_list[1] = {&cb};
+
+ uint64_t file_size = 0;
+ auto t_load_start = base::GetWallTimeMs();
+ for (int i = 0;; i++) {
+ if (i % 128 == 0)
+ fprintf(stderr, "\rLoading trace: %.2f MB\r", file_size / 1E6);
+
+ // Block waiting for the pending read to complete.
+ PERFETTO_CHECK(aio_suspend(aio_list, 1, nullptr) == 0);
+ auto rsize = aio_return(&cb);
+ if (rsize <= 0)
+ break;
+ file_size += static_cast<uint64_t>(rsize);
+
+ // Take ownership of the completed buffer and enqueue a new async read
+ // with a fresh buffer.
+ std::unique_ptr<uint8_t[]> buf(
+ reinterpret_cast<uint8_t*>(const_cast<void*>(cb.aio_buf)));
+ cb.aio_buf = new uint8_t[kChunkSize];
+ cb.aio_offset += rsize;
+ PERFETTO_CHECK(aio_read(&cb) == 0);
+
+ // Parse the completed buffer while the async read is in-flight.
+ tp.Parse(std::move(buf), static_cast<size_t>(rsize));
+ }
+ double t_load = (base::GetWallTimeMs() - t_load_start).count() / 1E3;
+ double size_mb = file_size / 1E6;
+ PERFETTO_ILOG("Trace loaded: %.2f MB (%.1f MB/s)", size_mb, size_mb / t_load);
g_tp = &tp;
- task_runner.PostTask([&reader]() {
- auto t_start = base::GetWallTimeMs();
- auto on_trace_loaded = [t_start, &reader] {
#if PERFETTO_HAS_SIGNAL_H()
- signal(SIGINT, [](int) { g_tp->InterruptQuery(); });
+ signal(SIGINT, [](int) { g_tp->InterruptQuery(); });
#endif
- double s = (base::GetWallTimeMs() - t_start).count() / 1000.0;
- double size_mb = reader.file_size() / 1000000.0;
- PERFETTO_ILOG("Trace loaded: %.2f MB (%.1f MB/s)", size_mb, size_mb / s);
- PrintPrompt();
- };
- g_tp->LoadTrace(&reader, on_trace_loaded);
- });
- task_runner.AddFileDescriptorWatch(STDIN_FILENO, [&task_runner] {
+ for (;;) {
+ PrintPrompt();
char line[1024];
- if (!fgets(line, sizeof(line) - 1, stdin) || strcmp(line, "q\n") == 0) {
- task_runner.Quit();
- return;
- }
+ if (!fgets(line, sizeof(line) - 1, stdin) || strcmp(line, "q\n") == 0)
+ return 0;
+ if (strcmp(line, "\n") == 0)
+ continue;
protos::RawQueryArgs query;
query.set_sql_query(line);
base::TimeNanos t_start = base::GetWallTimeNs();
g_tp->ExecuteQuery(query, [t_start](const protos::RawQueryResult& res) {
OnQueryResult(t_start, res);
});
- PrintPrompt();
- });
-
- task_runner.Run();
- return 0;
+ }
}