TraceProcessor: move from pull (BlobReader) to push architecture

So far the TraceProcessor was based on a pull architecture, where
the caller exposes a seekable pull interface (BlobReader) and the
TraceProcessor drives everything. This has two drawbacks:
- It's unnecessary: contrary to what we thought initially, we
  process the trace in one linear scan and never seek back.
- For the 'load from network' case, it requires fully buffering
  the trace in JS/TS before passing it to the TP. This clearly
  doesn't work for big traces, both in terms of memory and
  latency.

This CL switches to a push model, where the caller simply calls
Parse(data, length) on the TP whenever a new trace chunk is
available (the caller doesn't have to care about boundaries).

This has the following advantages:
- Allows us to get rid of a lot of complexity.
- Allows us to directly stream chunks of the trace as they are
  loaded from the network. Not only does this save memory in the
  JS/TS engine, but it also hides the network latency because
  packets are parsed in a streaming fashion as they are received
  from the network.
- In the trace_processor_shell we can hide some I/O latencies
  in a similar fashion, by pre-fetching the next chunk while
  the trace processor is parsing the current one.
- Bumps the trace_processor_shell performance to 450 MB/s.

Change-Id: I8192e83731daaffe9f3fe8e6344188de1adffbf3
diff --git a/src/trace_processor/trace_processor_shell.cc b/src/trace_processor/trace_processor_shell.cc
index 383df52..2782e34 100644
--- a/src/trace_processor/trace_processor_shell.cc
+++ b/src/trace_processor/trace_processor_shell.cc
@@ -14,6 +14,9 @@
  * limitations under the License.
  */
 
+#include <aio.h>
+#include <fcntl.h>
+#include <sys/stat.h>
 #include <unistd.h>
 
 #include <functional>
@@ -21,8 +24,6 @@
 #include "perfetto/base/build_config.h"
 #include "perfetto/base/logging.h"
 #include "perfetto/base/time.h"
-#include "perfetto/base/unix_task_runner.h"
-#include "src/trace_processor/file_reader.h"
 #include "src/trace_processor/trace_processor.h"
 
 #include "perfetto/trace_processor/raw_query.pb.h"
@@ -116,40 +117,73 @@
     trace_file_path = argv[i];
   }
 
-  base::UnixTaskRunner task_runner;
-  FileReader reader(trace_file_path, /*print_progress=*/true);
-  TraceProcessor tp(&task_runner);
+  // Load the trace file into the trace processor.
+  TraceProcessor tp;
+  base::ScopedFile fd;
+  fd.reset(open(trace_file_path, O_RDONLY));
+  PERFETTO_CHECK(fd);
+
+  // Load the trace in chunks using async IO. We create a simple pipeline where,
+  // at each iteration, we parse the current chunk and asynchronously start
+  // reading the next chunk.
+
+  // 1MB chunk size seems the best tradeoff on a MacBook Pro 2013 - i7 2.8 GHz.
+  constexpr size_t kChunkSize = 1024 * 1024;
+  struct aiocb cb {};
+  cb.aio_nbytes = kChunkSize;
+  cb.aio_fildes = *fd;
+
+  // The control block has ownership of the buffer while the read is in-flight.
+  cb.aio_buf = new uint8_t[kChunkSize];
+
+  PERFETTO_CHECK(aio_read(&cb) == 0);
+  struct aiocb* aio_list[1] = {&cb};
+
+  uint64_t file_size = 0;
+  auto t_load_start = base::GetWallTimeMs();
+  for (int i = 0;; i++) {
+    if (i % 128 == 0)
+      fprintf(stderr, "\rLoading trace: %.2f MB\r", file_size / 1E6);
+
+    // Block waiting for the pending read to complete.
+    PERFETTO_CHECK(aio_suspend(aio_list, 1, nullptr) == 0);
+    auto rsize = aio_return(&cb);
+    if (rsize <= 0)
+      break;
+    file_size += static_cast<uint64_t>(rsize);
+
+    // Take ownership of the completed buffer and enqueue a new async read
+    // with a fresh buffer.
+    std::unique_ptr<uint8_t[]> buf(
+        reinterpret_cast<uint8_t*>(const_cast<void*>(cb.aio_buf)));
+    cb.aio_buf = new uint8_t[kChunkSize];
+    cb.aio_offset += rsize;
+    PERFETTO_CHECK(aio_read(&cb) == 0);
+
+    // Parse the completed buffer while the async read is in-flight.
+    tp.Parse(std::move(buf), static_cast<size_t>(rsize));
+  }
+  double t_load = (base::GetWallTimeMs() - t_load_start).count() / 1E3;
+  double size_mb = file_size / 1E6;
+  PERFETTO_ILOG("Trace loaded: %.2f MB (%.1f MB/s)", size_mb, size_mb / t_load);
   g_tp = &tp;
 
-  task_runner.PostTask([&reader]() {
-    auto t_start = base::GetWallTimeMs();
-    auto on_trace_loaded = [t_start, &reader] {
 #if PERFETTO_HAS_SIGNAL_H()
-      signal(SIGINT, [](int) { g_tp->InterruptQuery(); });
+  signal(SIGINT, [](int) { g_tp->InterruptQuery(); });
 #endif
-      double s = (base::GetWallTimeMs() - t_start).count() / 1000.0;
-      double size_mb = reader.file_size() / 1000000.0;
-      PERFETTO_ILOG("Trace loaded: %.2f MB (%.1f MB/s)", size_mb, size_mb / s);
-      PrintPrompt();
-    };
-    g_tp->LoadTrace(&reader, on_trace_loaded);
-  });
 
-  task_runner.AddFileDescriptorWatch(STDIN_FILENO, [&task_runner] {
+  for (;;) {
+    PrintPrompt();
     char line[1024];
-    if (!fgets(line, sizeof(line) - 1, stdin) || strcmp(line, "q\n") == 0) {
-      task_runner.Quit();
-      return;
-    }
+    if (!fgets(line, sizeof(line) - 1, stdin) || strcmp(line, "q\n") == 0)
+      return 0;
+    if (strcmp(line, "\n") == 0)
+      continue;
     protos::RawQueryArgs query;
     query.set_sql_query(line);
     base::TimeNanos t_start = base::GetWallTimeNs();
     g_tp->ExecuteQuery(query, [t_start](const protos::RawQueryResult& res) {
       OnQueryResult(t_start, res);
     });
-    PrintPrompt();
-  });
-
-  task_runner.Run();
-  return 0;
+  }
 }