traced_probes ftrace: switch back to single-threaded, nonblocking read(2)-only approach

Per the recent performance measurements (go/perfetto-bg-cpu), we no longer
think that the multi-threaded, blocking/nonblocking splice/read approach is
worthwhile in its current state. Besides being highly complex, it also has an
unnecessarily high cpu% overhead.

The reading is now guided by a single repeating task that reads & parses the
contents of all per-cpu ftrace pipes.
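
As a rough illustration of the single-threaded idea (not the code in this
patch), here is a minimal sketch that drains several non-blocking fds in turn
on one thread. SetNonBlocking, DrainOnce and ReadTick are hypothetical names,
and any readable fd (e.g. an ordinary pipe) stands in for the per-cpu raw
ftrace pipes; the real reader works on whole pages and parses them.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <fcntl.h>
#include <unistd.h>
#include <vector>

// Hypothetical helper: switches |fd| to non-blocking mode.
bool SetNonBlocking(int fd) {
  int flags = fcntl(fd, F_GETFL, 0);
  if (flags < 0)
    return false;
  return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0;
}

// Hypothetical helper: reads up to |max_chunks| chunks of |chunk_size| bytes
// from a non-blocking |fd|, appending them to |out|. Stops early once the fd
// has no more data. Returns the number of bytes read.
size_t DrainOnce(int fd,
                 size_t chunk_size,
                 size_t max_chunks,
                 std::vector<uint8_t>* out) {
  assert(chunk_size > 0);
  std::vector<uint8_t> buf(chunk_size);
  size_t total = 0;
  for (size_t i = 0; i < max_chunks; i++) {
    ssize_t res;
    do {  // Retry if interrupted by a signal.
      res = read(fd, buf.data(), buf.size());
    } while (res < 0 && errno == EINTR);
    if (res <= 0)
      break;  // EAGAIN (no data), EOF, or another error: stop for this tick.
    out->insert(out->end(), buf.begin(), buf.begin() + res);
    total += static_cast<size_t>(res);
  }
  return total;
}

// One "tick" of the repeating task: visit every fd on the calling thread.
size_t ReadTick(const std::vector<int>& fds,
                size_t chunk_size,
                size_t max_chunks,
                std::vector<uint8_t>* out) {
  size_t total = 0;
  for (int fd : fds)
    total += DrainOnce(fd, chunk_size, max_chunks, out);
  return total;
}
```

A caller would repost ReadTick on its task runner every drain period. The
|max_chunks| cap is what lets other tasks run between ticks when a pipe is
very busy.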

What we lose in this version:
* ability to sleep until a page of ftrace events is filled (with blocking
  splice). This would only make a difference for tracing sessions with truly
  low-frequency events (not important to optimize for atm).
* scalability for many-core machines. This version works well for an 8 core
  phone, but is likely to struggle on a 64 core workstation. Let's treat this
  patch as a complexity reset, and reintroduce multi-threading only as
  necessary.
  Update: ran on my 72 core dev workstation as a smoke test, it kept up fine.
* possibly splice efficiency? Haven't tried a single-thread splicer, but the
  bigger immediate wins are probably in the parsing code (this version is at
  5:1 utime:stime ratio according to my measurements).

Rough measurements of traced_probes cpu% on a crosshatch (standalone ndk build
+ tmux script), with the methodology as in go/perfetto-bg-cpu:

tuned cfg: 32k page (i.e. chunk) size, 1s ftrace drain period.

idle device, default cfg: 2.4%
idle device, tuned cfg: <1%

video rec, default cfg: 13.5%
video rec, tuned cfg: 6%

So we're doing much better with a tuned config, waking up for 60ms to process
all cores once a second.

Unfortunately this patch is lacking in programmatic tests; I'm not really sure
which ones would be worthwhile with the existing mock-heavy test setups. The
code will likely need a separate pass (in a separate cl) to become more
unit-testable (it'd be nice to test the cpu_reader loop stop/continue logic).

Bug: 133312949
Change-Id: Ia79a267f43214f336b5396f4dd5789bc49ab1e67
diff --git a/Android.bp b/Android.bp
index 924cf8c..2ceb40d 100644
--- a/Android.bp
+++ b/Android.bp
@@ -310,7 +310,6 @@
     "src/traced/probes/ftrace/ftrace_metadata.cc",
     "src/traced/probes/ftrace/ftrace_procfs.cc",
     "src/traced/probes/ftrace/ftrace_stats.cc",
-    "src/traced/probes/ftrace/page_pool.cc",
     "src/traced/probes/ftrace/proto_translation_table.cc",
     "src/traced/probes/metatrace/metatrace_data_source.cc",
     "src/traced/probes/packages_list/packages_list_data_source.cc",
@@ -846,7 +845,6 @@
     "src/traced/probes/ftrace/ftrace_procfs.cc",
     "src/traced/probes/ftrace/ftrace_procfs_integrationtest.cc",
     "src/traced/probes/ftrace/ftrace_stats.cc",
-    "src/traced/probes/ftrace/page_pool.cc",
     "src/traced/probes/ftrace/proto_translation_table.cc",
     "src/traced/probes/ftrace/test/cpu_reader_support.cc",
     "src/traced/probes/metatrace/metatrace_data_source.cc",
@@ -3520,8 +3518,6 @@
     "src/traced/probes/ftrace/ftrace_procfs.cc",
     "src/traced/probes/ftrace/ftrace_procfs_unittest.cc",
     "src/traced/probes/ftrace/ftrace_stats.cc",
-    "src/traced/probes/ftrace/page_pool.cc",
-    "src/traced/probes/ftrace/page_pool_unittest.cc",
     "src/traced/probes/ftrace/proto_translation_table.cc",
     "src/traced/probes/ftrace/proto_translation_table_unittest.cc",
     "src/traced/probes/ftrace/test/cpu_reader_support.cc",
diff --git a/docs/ftrace.md b/docs/ftrace.md
index c35d95c..6e29fc5 100644
--- a/docs/ftrace.md
+++ b/docs/ftrace.md
@@ -12,41 +12,5 @@
 - Describe how to generate ftrace protos (`tools/pull_ftrace_format_files.py`,
   `tools/udate_protos.py`)
 - Describe how session multiplexing works.
-- Describe the page-by-page scheduling algorithm that uses vmsplice()
 
 Code lives in [/src/traced/probes/ftrace](/src/traced/probes/ftrace/).
-
-From https://android-review.googlesource.com/c/platform/external/perfetto/+/603793/
-```
-
-  main thread                           [drain] [unblock]
-                                        /:              |
-                            post .-----' :              |
-                                /        :              v
-  worker #0  [splice ...] [wakeup] [block ............] [splice]
-                                         :
-  worker #1  [splice ...]     [wakeup] [block ........] [splice]
-                                         :
-  worker #2  [splice ..........................................]
-                                         :
-                                         :
-                                    drain period (100ms)
-
-In other words, the splice(2) system call is used to move data from
-the raw kernel ftrace pipe into an intermediate pipe at a page
-granularity. This call allows every per-cpu worker to sleep until there
-is at least one page of data available.
-
-When a worker wakes up, it will attempt to move as many pages as
-possible to its staging pipe (up to 64K, depending on the
-system's pipe buffer size) in a non-blocking way. After this, it
-will notify the main thread that data is available. This notification
-will block the calling worker until the main thread has drained the
-data.
-
-When at least one worker has woken up, we schedule a drain operation
-on the main thread for the next drain period (every 100ms by default).
-The drain operation parses ftrace data from the staging pipes of
-every worker having pending data. After this, each waiting worker is
-allowed to issue another call to splice(), restarting the cycle.
-```
diff --git a/include/perfetto/ext/base/metatrace_events.h b/include/perfetto/ext/base/metatrace_events.h
index f356313..1f2aea6 100644
--- a/include/perfetto/ext/base/metatrace_events.h
+++ b/include/perfetto/ext/base/metatrace_events.h
@@ -39,24 +39,27 @@
 // DO NOT remove or reshuffle items in this list, only append. The ID of these
 // events are an ABI, the trace processor relies on these to open old traces.
 #define PERFETTO_METATRACE_EVENTS(F) \
-  F(EVENT_ZERO_UNUSED),\
-  F(FTRACE_CPU_READER_READ), \
-  F(FTRACE_DRAIN_CPUS), \
-  F(FTRACE_UNBLOCK_READERS), \
-  F(FTRACE_CPU_READ_NONBLOCK), \
-  F(FTRACE_CPU_READ_BLOCK), \
-  F(FTRACE_CPU_SPLICE_NONBLOCK), \
-  F(FTRACE_CPU_SPLICE_BLOCK), \
-  F(FTRACE_CPU_WAIT_CMD), \
-  F(FTRACE_CPU_RUN_CYCLE), \
+  F(EVENT_ZERO_UNUSED), \
+  F(FTRACE_CPU_READER_READ), /*unused*/ \
+  F(FTRACE_DRAIN_CPUS), /*unused*/ \
+  F(FTRACE_UNBLOCK_READERS), /*unused*/ \
+  F(FTRACE_CPU_READ_NONBLOCK), /*unused*/ \
+  F(FTRACE_CPU_READ_BLOCK), /*unused*/ \
+  F(FTRACE_CPU_SPLICE_NONBLOCK), /*unused*/ \
+  F(FTRACE_CPU_SPLICE_BLOCK), /*unused*/ \
+  F(FTRACE_CPU_WAIT_CMD), /*unused*/ \
+  F(FTRACE_CPU_RUN_CYCLE), /*unused*/ \
   F(FTRACE_CPU_FLUSH), \
-  F(FTRACE_CPU_DRAIN), \
+  F(FTRACE_CPU_DRAIN), /*unused*/ \
   F(READ_SYS_STATS), \
   F(PS_WRITE_ALL_PROCESSES), \
   F(PS_ON_PIDS), \
   F(PS_ON_RENAME_PIDS), \
   F(PS_WRITE_ALL_PROCESS_STATS), \
-  F(TRACE_WRITER_COMMIT_STARTUP_WRITER_BATCH)
+  F(TRACE_WRITER_COMMIT_STARTUP_WRITER_BATCH), \
+  F(FTRACE_READ_TICK), \
+  F(FTRACE_CPU_READ_CYCLE), \
+  F(FTRACE_CPU_READ_BATCH)
 
 // Append only, see above.
 #define PERFETTO_METATRACE_COUNTERS(F) \
diff --git a/src/traced/probes/ftrace/BUILD.gn b/src/traced/probes/ftrace/BUILD.gn
index 17aa2f0..a38dbfe 100644
--- a/src/traced/probes/ftrace/BUILD.gn
+++ b/src/traced/probes/ftrace/BUILD.gn
@@ -64,7 +64,6 @@
     "ftrace_config_unittest.cc",
     "ftrace_controller_unittest.cc",
     "ftrace_procfs_unittest.cc",
-    "page_pool_unittest.cc",
     "proto_translation_table_unittest.cc",
   ]
 }
@@ -148,8 +147,6 @@
     "ftrace_procfs.h",
     "ftrace_stats.cc",
     "ftrace_stats.h",
-    "page_pool.cc",
-    "page_pool.h",
     "proto_translation_table.cc",
     "proto_translation_table.h",
   ]
diff --git a/src/traced/probes/ftrace/cpu_reader.cc b/src/traced/probes/ftrace/cpu_reader.cc
index dbd7a75..23276df 100644
--- a/src/traced/probes/ftrace/cpu_reader.cc
+++ b/src/traced/probes/ftrace/cpu_reader.cc
@@ -31,7 +31,6 @@
 #include "perfetto/ext/base/utils.h"
 #include "src/traced/probes/ftrace/ftrace_controller.h"
 #include "src/traced/probes/ftrace/ftrace_data_source.h"
-#include "src/traced/probes/ftrace/ftrace_thread_sync.h"
 #include "src/traced/probes/ftrace/proto_translation_table.h"
 
 #include "perfetto/trace/ftrace/ftrace_event.pbzero.h"
@@ -115,6 +114,11 @@
   return fcntl(fd, F_SETFL, flags) == 0;
 }
 
+// TODO(rsavitski): |overwrite| extraction seems wrong, the top part of the
+// second ("commit") field is a bit mask (see RB_MISSED_FLAGS in kernel
+// sources), not the direct count. The kernel conditionally appends the missed
+// events count to the end of the page (if there's space), which we might
+// also need to account for when parsing the page.
 base::Optional<PageHeader> ParsePageHeader(const uint8_t** ptr,
                                            uint16_t page_header_size_len) {
   const uint8_t* end_of_page = *ptr + base::kPageSize;
@@ -149,286 +153,144 @@
 using protos::pbzero::GenericFtraceEvent;
 
 CpuReader::CpuReader(const ProtoTranslationTable* table,
-                     FtraceThreadSync* thread_sync,
                      size_t cpu,
-                     int generation,
-                     base::ScopedFile fd)
-    : table_(table),
-      thread_sync_(thread_sync),
-      cpu_(cpu),
-      trace_fd_(std::move(fd)) {
-  // Make reads from the raw pipe blocking so that splice() can sleep.
+                     base::ScopedFile trace_fd)
+    : table_(table), cpu_(cpu), trace_fd_(std::move(trace_fd)) {
   PERFETTO_CHECK(trace_fd_);
-  PERFETTO_CHECK(SetBlocking(*trace_fd_, true));
-
-  // We need a non-default SIGPIPE handler to make it so that the blocking
-  // splice() is woken up when the ~CpuReader() dtor destroys the pipes.
-  // Just masking out the signal would cause an implicit syscall restart and
-  // hence make the join() in the dtor unreliable.
-  struct sigaction current_act = {};
-  PERFETTO_CHECK(sigaction(SIGPIPE, nullptr, &current_act) == 0);
-#pragma GCC diagnostic push
-#if defined(__clang__)
-#pragma GCC diagnostic ignored "-Wdisabled-macro-expansion"
-#endif
-  if (current_act.sa_handler == SIG_DFL || current_act.sa_handler == SIG_IGN) {
-    struct sigaction act = {};
-    act.sa_sigaction = [](int, siginfo_t*, void*) {};
-    PERFETTO_CHECK(sigaction(SIGPIPE, &act, nullptr) == 0);
-  }
-#pragma GCC diagnostic pop
-
-  worker_thread_ = std::thread(std::bind(&RunWorkerThread, cpu_, generation,
-                                         *trace_fd_, &pool_, thread_sync_,
-                                         table->page_header_size_len()));
+  PERFETTO_CHECK(SetBlocking(*trace_fd_, false));
 }
 
-CpuReader::~CpuReader() {
-// FtraceController (who owns this) is supposed to issue a kStop notification
-// to the thread sync object before destroying the CpuReader.
-#if PERFETTO_DCHECK_IS_ON()
+CpuReader::~CpuReader() = default;
+
+size_t CpuReader::ReadCycle(
+    uint8_t* parsing_buf,
+    size_t parsing_buf_size_pages,
+    size_t max_pages,
+    const std::set<FtraceDataSource*>& started_data_sources) {
+  PERFETTO_DCHECK(max_pages >= parsing_buf_size_pages &&
+                  max_pages % parsing_buf_size_pages == 0);
+  metatrace::ScopedEvent evt(metatrace::TAG_FTRACE,
+                             metatrace::FTRACE_CPU_READ_CYCLE);
+
+  // Work in batches to keep cache locality, and limit memory usage.
+  size_t total_pages_read = 0;
+  for (bool is_first_batch = true;; is_first_batch = false) {
+    size_t pages_read =
+        ReadAndProcessBatch(parsing_buf, parsing_buf_size_pages, is_first_batch,
+                            started_data_sources);
+
+    PERFETTO_DCHECK(pages_read <= parsing_buf_size_pages);
+    total_pages_read += pages_read;
+
+    // Check whether we've caught up to the writer, or possibly giving up on
+    // this attempt due to some error.
+    if (pages_read != parsing_buf_size_pages)
+      break;
+    // Check if we've hit the limit of work for this cycle.
+    if (total_pages_read >= max_pages)
+      break;
+  }
+  PERFETTO_METATRACE_COUNTER(TAG_FTRACE, FTRACE_PAGES_DRAINED,
+                             total_pages_read);
+  return total_pages_read;
+}
+
+// metatrace note: mark the reading phase as FTRACE_CPU_READ_BATCH, but let the
+// parsing time be implied (by the difference between the caller's span, and
+// this reading span). Makes it easier to estimate the read/parse ratio when
+// looking at the trace in the UI.
+size_t CpuReader::ReadAndProcessBatch(
+    uint8_t* parsing_buf,
+    size_t max_pages,
+    bool first_batch_in_cycle,
+    const std::set<FtraceDataSource*>& started_data_sources) {
+  size_t pages_read = 0;
   {
-    std::lock_guard<std::mutex> lock(thread_sync_->mutex);
-    PERFETTO_DCHECK(thread_sync_->cmd == FtraceThreadSync::kQuit);
-  }
-#endif
-
-  // The kernel's splice implementation for the trace pipe doesn't generate a
-  // SIGPIPE if the output pipe is closed (b/73807072). Instead, the call to
-  // close() on the pipe hangs forever. To work around this, we first close the
-  // trace fd (which prevents another splice from starting), raise SIGPIPE and
-  // wait for the worker to exit (i.e., to guarantee no splice is in progress)
-  // and only then close the staging pipe.
-  trace_fd_.reset();
-  InterruptWorkerThreadWithSignal();
-  worker_thread_.join();
-}
-
-void CpuReader::InterruptWorkerThreadWithSignal() {
-  pthread_kill(worker_thread_.native_handle(), SIGPIPE);
-}
-
-// The worker thread reads data from the ftrace trace_pipe_raw and moves it to
-// the page |pool| allowing the main thread to read and decode that.
-// See //docs/ftrace.md for the design of the ftrace worker scheduler.
-// static
-void CpuReader::RunWorkerThread(size_t cpu,
-                                int generation,
-                                int trace_fd,
-                                PagePool* pool,
-                                FtraceThreadSync* thread_sync,
-                                uint16_t header_size_len) {
-// Before attempting any changes to this function, think twice. The kernel
-// ftrace pipe code is full of caveats and bugs. This code carefully works
-// around those bugs. See b/120188810 and b/119805587 for the full narrative.
-#if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
-    PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
-  char thread_name[16];
-  snprintf(thread_name, sizeof(thread_name), "traced_probes%zu", cpu);
-  pthread_setname_np(pthread_self(), thread_name);
-
-  // When using splice() the target fd needs to be an actual pipe. This pipe is
-  // used only within this thread and is mainly for synchronization purposes.
-  // A blocking splice() is the only way to block and wait for a new page of
-  // ftrace data.
-  base::Pipe sync_pipe = base::Pipe::Create(base::Pipe::kBothNonBlock);
-
-  enum ReadMode { kRead, kSplice };
-  enum Block { kBlock, kNonBlock };
-  constexpr auto kPageSize = base::kPageSize;
-
-  // This lambda function reads the ftrace raw pipe using either read() or
-  // splice(), either in blocking or non-blocking mode.
-  // Returns the number of ftrace bytes read, or -1 in case of failure.
-  auto read_ftrace_pipe = [&sync_pipe, trace_fd, pool, header_size_len](
-                              ReadMode mode, Block block) -> int {
-    auto eid = mode == kRead
-                   ? (block == kNonBlock ? metatrace::FTRACE_CPU_READ_NONBLOCK
-                                         : metatrace::FTRACE_CPU_READ_BLOCK)
-                   : (block == kNonBlock ? metatrace::FTRACE_CPU_SPLICE_NONBLOCK
-                                         : metatrace::FTRACE_CPU_SPLICE_BLOCK);
-    metatrace::ScopedEvent evt(metatrace::TAG_FTRACE, eid);
-
-    uint8_t* pool_page = pool->BeginWrite();
-    PERFETTO_DCHECK(pool_page);
-
-    ssize_t res;
-    int err = 0;
-    if (mode == kSplice) {
-      uint32_t flg = SPLICE_F_MOVE | ((block == kNonBlock) * SPLICE_F_NONBLOCK);
-      res = splice(trace_fd, nullptr, *sync_pipe.wr, nullptr, kPageSize, flg);
-      err = errno;
-      if (res > 0) {
-        // If the splice() succeeded, read back from the other end of our own
-        // pipe and copy the data into the pool.
-        ssize_t rdres = read(*sync_pipe.rd, pool_page, kPageSize);
-        PERFETTO_DCHECK(rdres == res);
+    metatrace::ScopedEvent evt(metatrace::TAG_FTRACE,
+                               metatrace::FTRACE_CPU_READ_BATCH);
+    for (; pages_read < max_pages;) {
+      uint8_t* curr_page = parsing_buf + (pages_read * base::kPageSize);
+      ssize_t res =
+          PERFETTO_EINTR(read(*trace_fd_, curr_page, base::kPageSize));
+      if (res < 0) {
+        // Expected errors:
+        // EAGAIN: no data (since we're in non-blocking mode).
+        // ENOMEM, EBUSY: temporary ftrace failures (they happen).
+        if (errno != EAGAIN && errno != ENOMEM && errno != EBUSY)
+          PERFETTO_PLOG("Unexpected error on raw ftrace read");
+        break;  // stop reading regardless of errno
       }
-    } else {
-      if (block == kNonBlock)
-        SetBlocking(trace_fd, false);
-      res = read(trace_fd, pool_page, kPageSize);
-      err = errno;
-      if (res > 0) {
-        // Need to copy the ptr, ParsePageHeader() advances the passed ptr arg.
-        const uint8_t* ptr = pool_page;
 
-        // The caller of this function wants to have a sufficient approximation
-        // of how many bytes of ftrace data have been read. Unfortunately the
-        // return value of read() is a lie. The problem is that the ftrace
-        // read() implementation, for good reasons, always reconstructs a whole
-        // ftrace page, copying the events over and zero-filling at the end.
-        // This is nice, because we always get a valid ftrace header, but also
-        // causes read to always returns 4096. The only way to have a good
-        // indication of how many bytes of ftrace data have been read is to
-        // parse the ftrace header.
-        // Note: |header_size_len| is *not* an indication on how many bytes are
-        // available form |ptr|. It's just an independent piece of information
-        // that needs to be passed to ParsePageHeader() (a static function) in
-        // order to work.
-        base::Optional<PageHeader> hdr = ParsePageHeader(&ptr, header_size_len);
-        PERFETTO_DCHECK(hdr && hdr->size > 0 && hdr->size <= base::kPageSize);
-        res = hdr.has_value() ? static_cast<int>(hdr->size) : -1;
-      }
-      if (block == kNonBlock)
-        SetBlocking(trace_fd, true);
-    }
-
-    if (res > 0) {
-      // splice() should return full pages, read can return < a page.
-      PERFETTO_DCHECK(res == base::kPageSize || mode == kRead);
-      pool->EndWrite();
-      return static_cast<int>(res);
-    }
-
-    // It is fine to leave the BeginWrite() unpaired in the error case.
-
-    if (res && err != EAGAIN && err != ENOMEM && err != EBUSY && err != EINTR &&
-        err != EBADF) {
-      // EAGAIN: no data when in non-blocking mode.
-      // ENONMEM, EBUSY: temporary ftrace failures (they happen).
-      // EINTR: signal interruption, likely from main thread to issue a new cmd.
-      // EBADF: the main thread has closed the fd (happens during dtor).
-      PERFETTO_PLOG("Unexpected %s() err", mode == kRead ? "read" : "splice");
-    }
-    return -1;
-  };
-
-  uint64_t last_cmd_id = 0;
-  ReadMode cur_mode = kSplice;
-  for (bool run_loop = true; run_loop;) {
-    FtraceThreadSync::Cmd cmd;
-    // Wait for a new command from the main thread issued by FtraceController.
-    // The FtraceController issues also a signal() after every new command. This
-    // is not necessary for the condition variable itself, but it's necessary to
-    // unblock us if we are in a blocking read() or splice().
-    // Commands are tagged with an ID, every new command has a new |cmd_id|, so
-    // we can distinguish spurious wakeups from actual cmd requests.
-    {
-      PERFETTO_METATRACE_SCOPED(TAG_FTRACE, FTRACE_CPU_WAIT_CMD);
-      std::unique_lock<std::mutex> lock(thread_sync->mutex);
-      while (thread_sync->cmd_id == last_cmd_id)
-        thread_sync->cond.wait(lock);
-      cmd = thread_sync->cmd;
-      last_cmd_id = thread_sync->cmd_id;
-    }
-
-    // An empirical threshold (bytes read/spliced from the raw pipe) to make an
-    // educated guess on whether we should read/splice more. If we read fewer
-    // bytes it means that we caught up with the write pointer and we started
-    // consuming ftrace events in real-time. This cannot be just 4096 because
-    // it needs to account for fragmentation, i.e. for the fact that the last
-    // trace event didn't fit in the current page and hence the current page
-    // was terminated prematurely.
-    constexpr int kRoughlyAPage = 4096 - 512;
-
-    switch (cmd) {
-      case FtraceThreadSync::kQuit:
-        run_loop = false;
-        break;
-
-      case FtraceThreadSync::kRun: {
-        PERFETTO_METATRACE_SCOPED(TAG_FTRACE, FTRACE_CPU_RUN_CYCLE);
-
-        // Do a blocking read/splice. This can fail for a variety of reasons:
-        // - FtraceController interrupts us with a signal for a new cmd
-        //   (e.g. it wants us to quit or do a flush).
-        // - A temporary read/splice() failure occurred (it has been observed
-        //   to happen if the system is under high load).
-        // In all these cases the most useful thing we can do is skip the
-        // current cycle and try again later.
-        if (read_ftrace_pipe(cur_mode, kBlock) <= 0)
-          break;  // Wait for next command.
-
-        // If we are in read mode (because of a previous flush) check if the
-        // in-kernel read cursor is page-aligned again. If a non-blocking splice
-        // succeeds, it means that we can safely switch back to splice mode
-        // (See b/120188810).
-        if (cur_mode == kRead && read_ftrace_pipe(kSplice, kNonBlock) > 0)
-          cur_mode = kSplice;
-
-        // Do as many non-blocking read/splice as we can.
-        while (read_ftrace_pipe(cur_mode, kNonBlock) > kRoughlyAPage) {
-        }
-        size_t num_pages = pool->CommitWrittenPages();
-        PERFETTO_METATRACE_COUNTER(TAG_FTRACE, FTRACE_PAGES_DRAINED, num_pages);
-        FtraceController::OnCpuReaderRead(cpu, generation, thread_sync);
+      // As long as all of our reads are for a single page, the kernel should
+      // return exactly a well-formed raw ftrace page (if not in the steady
+      // state of reading out fully-written pages, the kernel will construct
+      // pages as necessary, copying over events and zero-filling at the end).
+      // A sub-page read() is therefore not expected in practice (unless
+      // there's a concurrent reader requesting less than a page?). Crash if
+      // encountering this situation. Kernel source pointer: see usage of
+      // |info->read| within |tracing_buffers_read|.
+      // TODO(rsavitski): don't crash, throw away the partial read & pipe
+      // through an error signal.
+      if (res == 0) {
+        // Very rare, but possible. Stop for now, should recover.
+        PERFETTO_DLOG("[cpu%zu]: 0-sized read from ftrace pipe.", cpu_);
         break;
       }
+      PERFETTO_CHECK(res == static_cast<ssize_t>(base::kPageSize));
 
-      case FtraceThreadSync::kFlush: {
-        PERFETTO_METATRACE_SCOPED(TAG_FTRACE, FTRACE_CPU_FLUSH);
-        cur_mode = kRead;
-        while (read_ftrace_pipe(cur_mode, kNonBlock) > kRoughlyAPage) {
-        }
-        size_t num_pages = pool->CommitWrittenPages();
-        PERFETTO_METATRACE_COUNTER(TAG_FTRACE, FTRACE_PAGES_DRAINED, num_pages);
-        FtraceController::OnCpuReaderFlush(cpu, generation, thread_sync);
+      pages_read += 1;
+
+      // Compare the amount of ftrace data read against an empirical threshold
+      // to make an educated guess on whether we should read more. To figure
+      // out the amount of ftrace data, we need to parse the page header (since
+      // the read always returns a page, zero-filled at the end). If we read
+      // fewer bytes than the threshold, it means that we caught up with the
+      // write pointer and we started consuming ftrace events in real-time.
+      // This cannot be just 4096 because it needs to account for
+      // fragmentation, i.e. for the fact that the last trace event didn't fit
+      // in the current page and hence the current page was terminated
+      // prematurely.
+      static constexpr size_t kRoughlyAPage = base::kPageSize - 512;
+      const uint8_t* scratch_ptr = curr_page;
+      base::Optional<PageHeader> hdr =
+          ParsePageHeader(&scratch_ptr, table_->page_header_size_len());
+      PERFETTO_DCHECK(hdr && hdr->size > 0 && hdr->size <= base::kPageSize);
+      if (!hdr.has_value()) {
+        PERFETTO_ELOG("[cpu%zu]: can't parse page header", cpu_);
         break;
       }
-    }  // switch(cmd)
-  }    // for(run_loop)
-  PERFETTO_DPLOG("Terminating CPUReader thread for CPU %zd.", cpu);
-#else
-  base::ignore_result(cpu);
-  base::ignore_result(generation);
-  base::ignore_result(trace_fd);
-  base::ignore_result(pool);
-  base::ignore_result(thread_sync);
-  base::ignore_result(header_size_len);
-  PERFETTO_ELOG("Supported only on Linux/Android");
-#endif
-}
-
-// Invoked on the main thread by FtraceController, |drain_rate_ms| after the
-// first CPU wakes up from the blocking read()/splice().
-void CpuReader::Drain(const std::set<FtraceDataSource*>& data_sources) {
-  PERFETTO_DCHECK_THREAD(thread_checker_);
-  PERFETTO_METATRACE_SCOPED(TAG_FTRACE, FTRACE_CPU_DRAIN);
-
-  auto page_blocks = pool_.BeginRead();
-  for (const auto& page_block : page_blocks) {
-    for (size_t i = 0; i < page_block.size(); i++) {
-      const uint8_t* page = page_block.At(i);
-
-      for (FtraceDataSource* data_source : data_sources) {
-        auto packet = data_source->trace_writer()->NewTracePacket();
-        auto* bundle = packet->set_ftrace_events();
-        auto* metadata = data_source->mutable_metadata();
-        auto* filter = data_source->event_filter();
-
-        // Note: The fastpath in proto_trace_parser.cc speculates on the fact
-        // that the cpu field is the first field of the proto message. If this
-        // changes, change proto_trace_parser.cc accordingly.
-        bundle->set_cpu(static_cast<uint32_t>(cpu_));
-
-        size_t evt_size = ParsePage(page, filter, bundle, table_, metadata);
-        PERFETTO_DCHECK(evt_size);
-        bundle->set_overwrite_count(metadata->overwrite_count);
+      // Note that the first read after starting the read cycle being small is
+      // normal. It means that we're given the remainder of events from a
+      // page that we've partially consumed during the last read of the previous
+      // cycle (having caught up to the writer).
+      if (hdr->size < kRoughlyAPage &&
+          !(first_batch_in_cycle && pages_read == 1)) {
+        break;
       }
     }
+  }  // end of metatrace::FTRACE_CPU_READ_BATCH
+
+  // Parse the pages and write to the trace for all relevant data
+  // sources.
+  for (size_t i = 0; i < pages_read; i++) {
+    uint8_t* curr_page = parsing_buf + (i * base::kPageSize);
+    for (FtraceDataSource* data_source : started_data_sources) {
+      auto packet = data_source->trace_writer()->NewTracePacket();
+      auto* bundle = packet->set_ftrace_events();
+      auto* metadata = data_source->mutable_metadata();
+      auto* filter = data_source->event_filter();
+
+      // Note: The fastpath in proto_trace_parser.cc speculates on the fact
+      // that the cpu field is the first field of the proto message. If this
+      // changes, change proto_trace_parser.cc accordingly.
+      bundle->set_cpu(static_cast<uint32_t>(cpu_));
+
+      size_t evt_size = ParsePage(curr_page, filter, bundle, table_, metadata);
+      PERFETTO_DCHECK(evt_size);
+      bundle->set_overwrite_count(metadata->overwrite_count);
+    }
   }
-  pool_.EndRead(std::move(page_blocks));
+  return pages_read;
 }
 
 // The structure of a raw trace buffer page is as follows:
diff --git a/src/traced/probes/ftrace/cpu_reader.h b/src/traced/probes/ftrace/cpu_reader.h
index 9ee0a61..4d14c52 100644
--- a/src/traced/probes/ftrace/cpu_reader.h
+++ b/src/traced/probes/ftrace/cpu_reader.h
@@ -34,13 +34,11 @@
 #include "perfetto/protozero/message.h"
 #include "perfetto/protozero/message_handle.h"
 #include "src/traced/probes/ftrace/ftrace_metadata.h"
-#include "src/traced/probes/ftrace/page_pool.h"
 #include "src/traced/probes/ftrace/proto_translation_table.h"
 
 namespace perfetto {
 
 class FtraceDataSource;
-struct FtraceThreadSync;
 class ProtoTranslationTable;
 
 namespace protos {
@@ -49,24 +47,23 @@
 }  // namespace pbzero
 }  // namespace protos
 
-
-// Reads raw ftrace data for a cpu and writes that into the perfetto userspace
-// buffer.
+// Reads raw ftrace data for a cpu, parses it, and writes it into the perfetto
+// tracing buffers.
 class CpuReader {
  public:
   using FtraceEventBundle = protos::pbzero::FtraceEventBundle;
 
-  CpuReader(const ProtoTranslationTable*,
-            FtraceThreadSync*,
+  CpuReader(const ProtoTranslationTable* table,
             size_t cpu,
-            int generation,
-            base::ScopedFile fd);
+            base::ScopedFile trace_fd);
   ~CpuReader();
 
-  // Drains all available data into the buffer of the passed data sources.
-  void Drain(const std::set<FtraceDataSource*>&);
-
-  void InterruptWorkerThreadWithSignal();
+  // Reads and parses all ftrace data for this cpu (in batches), until we catch
+  // up to the writer, or hit |max_pages|. Returns number of pages read.
+  size_t ReadCycle(uint8_t* parsing_buf,
+                   size_t parsing_buf_size_pages,
+                   size_t max_pages,
+                   const std::set<FtraceDataSource*>& started_data_sources);
 
   template <typename T>
   static bool ReadAndAdvance(const uint8_t** ptr, const uint8_t* end, T* out) {
@@ -178,25 +175,23 @@
                          FtraceMetadata* metadata);
 
  private:
-  static void RunWorkerThread(size_t cpu,
-                              int generation,
-                              int trace_fd,
-                              PagePool*,
-                              FtraceThreadSync*,
-                              uint16_t header_size_len);
-
   CpuReader(const CpuReader&) = delete;
   CpuReader& operator=(const CpuReader&) = delete;
 
-  const ProtoTranslationTable* const table_;
-  FtraceThreadSync* const thread_sync_;
-  const size_t cpu_;
-  PagePool pool_;
-  base::ScopedFile trace_fd_;
-  std::thread worker_thread_;
-  PERFETTO_THREAD_CHECKER(thread_checker_)
-};
+  // Reads at most |max_pages| of ftrace data, parses it, and writes it
+  // into |started_data_sources|. Returns number of pages read.
+  // See comment on ftrace_controller.cc:kParsingBufferSizePages for
+  // rationale behind the batching.
+  size_t ReadAndProcessBatch(
+      uint8_t* parsing_buf,
+      size_t max_pages,
+      bool first_batch_in_cycle,
+      const std::set<FtraceDataSource*>& started_data_sources);
 
+  const ProtoTranslationTable* const table_;
+  const size_t cpu_;
+  base::ScopedFile trace_fd_;
+};
 
 }  // namespace perfetto
 
diff --git a/src/traced/probes/ftrace/ftrace_config_muxer.cc b/src/traced/probes/ftrace/ftrace_config_muxer.cc
index 21c1c7a..8ec4fd5 100644
--- a/src/traced/probes/ftrace/ftrace_config_muxer.cc
+++ b/src/traced/probes/ftrace/ftrace_config_muxer.cc
@@ -32,9 +32,6 @@
 // trace_clocks in preference order.
 constexpr const char* kClocks[] = {"boot", "global", "local"};
 
-constexpr int kDefaultPerCpuBufferSizeKb = 2 * 1024;  // 2mb
-constexpr int kMaxPerCpuBufferSizeKb = 64 * 1024;  // 64mb
-
 void AddEventGroup(const ProtoTranslationTable* table,
                    const std::string& group,
                    std::set<GroupAndName>* to) {
diff --git a/src/traced/probes/ftrace/ftrace_config_muxer.h b/src/traced/probes/ftrace/ftrace_config_muxer.h
index 9bffd45..a8f92d5 100644
--- a/src/traced/probes/ftrace/ftrace_config_muxer.h
+++ b/src/traced/probes/ftrace/ftrace_config_muxer.h
@@ -27,6 +27,9 @@
 
 namespace perfetto {
 
+constexpr int kDefaultPerCpuBufferSizeKb = 2 * 1024;  // 2mb
+constexpr int kMaxPerCpuBufferSizeKb = 64 * 1024;     // 64mb
+
 // Ftrace is a bunch of globaly modifiable persistent state.
 // Given a number of FtraceConfig's we need to find the best union of all
 // the settings to make eveyone happy while also watching out for anybody
diff --git a/src/traced/probes/ftrace/ftrace_controller.cc b/src/traced/probes/ftrace/ftrace_controller.cc
index 907e6f1..e2f60fe 100644
--- a/src/traced/probes/ftrace/ftrace_controller.cc
+++ b/src/traced/probes/ftrace/ftrace_controller.cc
@@ -48,10 +48,24 @@
 namespace {
 
 constexpr int kDefaultDrainPeriodMs = 100;
-constexpr int kControllerFlushTimeoutMs = 500;
 constexpr int kMinDrainPeriodMs = 1;
 constexpr int kMaxDrainPeriodMs = 1000 * 60;
 
+// When reading and parsing data for a particular cpu, we do it in batches of
+// this many pages. In other words, we'll read up to
+// |kParsingBufferSizePages| into memory, parse them, and then repeat if we
+// still haven't caught up to the writer. A working set of 32 pages is 128k of
+// data, which should fit in a typical L2 cache. Furthermore, the batching
+// limits the memory usage of traced_probes.
+constexpr size_t kParsingBufferSizePages = 32;
+
+// Read at most this many pages of data per cpu per read task. If we hit this
+// limit on at least one cpu, we stop and repost the read task, letting other
+// tasks get some cpu time before continuing reading.
+// TODO(rsavitski): propagate the knowledge of the current per-cpu kernel buffer
+// size to this controller, and select the lower of the two.
+constexpr size_t kMaxPagesPerCpuPerReadTick = 256;  // 1 MB per cpu
+
 uint32_t ClampDrainPeriodMs(uint32_t drain_period_ms) {
   if (drain_period_ms == 0) {
     return kDefaultDrainPeriodMs;
@@ -135,16 +149,12 @@
                                    Observer* observer)
     : task_runner_(task_runner),
       observer_(observer),
-      thread_sync_(task_runner),
       ftrace_procfs_(std::move(ftrace_procfs)),
       table_(std::move(table)),
       ftrace_config_muxer_(std::move(model)),
-      weak_factory_(this) {
-  thread_sync_.trace_controller_weak = GetWeakPtr();
-}
+      weak_factory_(this) {}
 
 FtraceController::~FtraceController() {
-  PERFETTO_DCHECK_THREAD(thread_checker_);
   for (const auto* data_source : data_sources_)
     ftrace_config_muxer_->RemoveConfig(data_source->config_id());
   data_sources_.clear();
@@ -156,161 +166,86 @@
   return static_cast<uint64_t>(base::GetWallTimeMs().count());
 }
 
-// The OnCpuReader* methods below are called on the CpuReader worker threads.
-// Lifetime is guaranteed to be valid, because the FtraceController dtor
-// (that happens on the main thread) joins the worker threads.
-
-// static
-void FtraceController::OnCpuReaderRead(size_t cpu,
-                                       int generation,
-                                       FtraceThreadSync* thread_sync) {
-  PERFETTO_METATRACE_SCOPED(TAG_FTRACE, FTRACE_CPU_READER_READ);
-  {
-    std::lock_guard<std::mutex> lock(thread_sync->mutex);
-    // If this was the first CPU to wake up, schedule a drain for the next
-    // drain interval.
-    bool post_drain_task = thread_sync->cpus_to_drain.none();
-    thread_sync->cpus_to_drain[cpu] = true;
-    if (!post_drain_task)
-      return;
-  }  // lock(thread_sync_.mutex)
-
-  base::WeakPtr<FtraceController> weak_ctl = thread_sync->trace_controller_weak;
-  base::TaskRunner* task_runner = thread_sync->task_runner;
-
-  // The nested PostTask is used because the FtraceController (and hence
-  // GetDrainPeriodMs()) can be called only on the main thread.
-  task_runner->PostTask([weak_ctl, task_runner, generation] {
-
-    if (!weak_ctl)
-      return;
-    uint32_t drain_period_ms = weak_ctl->GetDrainPeriodMs();
-
-    task_runner->PostDelayedTask(
-        [weak_ctl, generation] {
-          if (weak_ctl)
-            weak_ctl->DrainCPUs(generation);
-        },
-        drain_period_ms - (weak_ctl->NowMs() % drain_period_ms));
-
-  });
-}
-
-// static
-void FtraceController::OnCpuReaderFlush(size_t cpu,
-                                        int generation,
-                                        FtraceThreadSync* thread_sync) {
-  // In the case of a flush, we want to drain the data as quickly as possible to
-  // minimize the flush latency, at the cost of more tasks / wakeups (eventually
-  // one task per cpu). Flushes are not supposed to happen too frequently.
-  {
-    std::lock_guard<std::mutex> lock(thread_sync->mutex);
-    thread_sync->cpus_to_drain[cpu] = true;
-    thread_sync->flush_acks[cpu] = true;
-  }  // lock(thread_sync_.mutex)
-
-  base::WeakPtr<FtraceController> weak_ctl = thread_sync->trace_controller_weak;
-  thread_sync->task_runner->PostTask([weak_ctl, generation] {
-    if (weak_ctl)
-      weak_ctl->DrainCPUs(generation);
-  });
-}
-
-void FtraceController::DrainCPUs(int generation) {
-  PERFETTO_DCHECK_THREAD(thread_checker_);
-  PERFETTO_METATRACE_SCOPED(TAG_FTRACE, FTRACE_DRAIN_CPUS);
-
-  if (generation != generation_)
-    return;
-
-  const size_t num_cpus = ftrace_procfs_->NumberOfCpus();
-  PERFETTO_DCHECK(cpu_readers_.size() == num_cpus);
-  FlushRequestID ack_flush_request_id = 0;
-  std::bitset<base::kMaxCpus> cpus_to_drain;
-  {
-    std::lock_guard<std::mutex> lock(thread_sync_.mutex);
-    std::swap(cpus_to_drain, thread_sync_.cpus_to_drain);
-
-    // Check also if a flush is pending and if all cpus have acked. If that's
-    // the case, ack the overall Flush() request at the end of this function.
-    if (cur_flush_request_id_ && thread_sync_.flush_acks.count() >= num_cpus) {
-      thread_sync_.flush_acks.reset();
-      ack_flush_request_id = cur_flush_request_id_;
-      cur_flush_request_id_ = 0;
-    }
-  }
-
-  for (size_t cpu = 0; cpu < num_cpus; cpu++) {
-    if (!cpus_to_drain[cpu])
-      continue;
-    // This method reads the pipe and converts the raw ftrace data into
-    // protobufs using the |data_source|'s TraceWriter.
-    cpu_readers_[cpu]->Drain(started_data_sources_);
-    OnDrainCpuForTesting(cpu);
-  }
-
-  // If we filled up any SHM pages while draining the data, we will have posted
-  // a task to notify traced about this. Only unblock the readers after this
-  // notification is sent to make it less likely that they steal CPU time away
-  // from traced. Also, don't unblock the readers until all of them have replied
-  // to the flush.
-  if (!cur_flush_request_id_) {
-    base::WeakPtr<FtraceController> weak_this = weak_factory_.GetWeakPtr();
-    task_runner_->PostTask([weak_this] {
-      if (weak_this)
-        weak_this->UnblockReaders();
-    });
-  }
-
-  observer_->OnFtraceDataWrittenIntoDataSourceBuffers();
-
-  if (ack_flush_request_id) {
-    // Flush completed, all CpuReader(s) acked.
-
-    IssueThreadSyncCmd(FtraceThreadSync::kRun);  // Switch back to reading mode.
-
-    // This will call FtraceDataSource::OnFtraceFlushComplete(), which in turn
-    // will flush the userspace buffers and ack the flush to the ProbesProducer
-    // which in turn will ack the flush to the tracing service.
-    NotifyFlushCompleteToStartedDataSources(ack_flush_request_id);
-  }
-}
-
-void FtraceController::UnblockReaders() {
-  PERFETTO_METATRACE_SCOPED(TAG_FTRACE, FTRACE_UNBLOCK_READERS);
-
-  // If a flush or a quit is pending, do nothing.
-  std::unique_lock<std::mutex> lock(thread_sync_.mutex);
-  if (thread_sync_.cmd != FtraceThreadSync::kRun)
-    return;
-
-  // Unblock all waiting readers to start moving more data into their
-  // respective staging pipes.
-  IssueThreadSyncCmd(FtraceThreadSync::kRun, std::move(lock));
-}
-
 void FtraceController::StartIfNeeded() {
   if (started_data_sources_.size() > 1)
     return;
   PERFETTO_DCHECK(!started_data_sources_.empty());
   PERFETTO_DCHECK(cpu_readers_.empty());
-  base::WeakPtr<FtraceController> weak_this = weak_factory_.GetWeakPtr();
 
-  {
-    std::lock_guard<std::mutex> lock(thread_sync_.mutex);
-    thread_sync_.cpus_to_drain.reset();
-    thread_sync_.cmd = FtraceThreadSync::kRun;
-    thread_sync_.cmd_id++;
+  // Lazily allocate the memory used for reading & parsing ftrace.
+  if (!parsing_mem_.IsValid()) {
+    parsing_mem_ =
+        base::PagedMemory::Allocate(base::kPageSize * kParsingBufferSizePages);
   }
 
-  generation_++;
   cpu_readers_.clear();
   cpu_readers_.reserve(ftrace_procfs_->NumberOfCpus());
   for (size_t cpu = 0; cpu < ftrace_procfs_->NumberOfCpus(); cpu++) {
     cpu_readers_.emplace_back(
-        new CpuReader(table_.get(), &thread_sync_, cpu, generation_,
-                      ftrace_procfs_->OpenPipeForCpu(cpu)));
+        new CpuReader(table_.get(), cpu, ftrace_procfs_->OpenPipeForCpu(cpu)));
   }
+
+  // Start the repeating read tasks.
+  auto generation = ++generation_;
+  auto drain_period_ms = GetDrainPeriodMs();
+  auto weak_this = weak_factory_.GetWeakPtr();
+  task_runner_->PostDelayedTask(
+      [weak_this, generation] {
+        if (weak_this)
+          weak_this->ReadTick(generation);
+      },
+      drain_period_ms - (NowMs() % drain_period_ms));
+}
+
+void FtraceController::ReadTick(int generation) {
+  metatrace::ScopedEvent evt(metatrace::TAG_FTRACE,
+                             metatrace::FTRACE_READ_TICK);
+  if (started_data_sources_.empty() || generation != generation_) {
+    return;
+  }
+
+  bool all_cpus_caught_up = ReadAllCpuBuffers(kMaxPagesPerCpuPerReadTick);
+
+  // The reading for a given cpu will read at most |kMaxPagesPerCpuPerReadTick|
+  // pages. If we hit this limit on at least one cpu, repost the task
+  // at the end of the immediate queue, letting the other tasks get some cpu
+  // time before we continue catching up with the event stream.
+  auto weak_this = weak_factory_.GetWeakPtr();
+  if (!all_cpus_caught_up) {
+    PERFETTO_DLOG("Reposting immediate ReadTick as there's more work.");
+    task_runner_->PostTask([weak_this, generation] {
+      if (weak_this)
+        weak_this->ReadTick(generation);
+    });
+  } else {
+    // Done until next drain period.
+    auto drain_period_ms = GetDrainPeriodMs();
+    task_runner_->PostDelayedTask(
+        [weak_this, generation] {
+          if (weak_this)
+            weak_this->ReadTick(generation);
+        },
+        drain_period_ms - (NowMs() % drain_period_ms));
+  }
+}
+
+bool FtraceController::ReadAllCpuBuffers(size_t max_pages) {
+  PERFETTO_DCHECK(parsing_mem_.IsValid() &&
+                  parsing_mem_.size() ==
+                      base::kPageSize * kParsingBufferSizePages);
+
+  bool all_cpus_caught_up = true;
+  auto* parsing_buf = reinterpret_cast<uint8_t*>(parsing_mem_.Get());
+  for (auto& cpu_reader : cpu_readers_) {
+    size_t pages_read = cpu_reader->ReadCycle(
+        parsing_buf, kParsingBufferSizePages, max_pages, started_data_sources_);
+
+    // ReadCycle stopped early because it hit the |max_pages| limit.
+    if (pages_read == max_pages)
+      all_cpus_caught_up = false;
+  }
+  observer_->OnFtraceDataWrittenIntoDataSourceBuffers();
+  return all_cpus_caught_up;
 }
 
 uint32_t FtraceController::GetDrainPeriodMs() {
@@ -337,45 +272,19 @@
 }
 
 void FtraceController::Flush(FlushRequestID flush_id) {
-  PERFETTO_DCHECK_THREAD(thread_checker_);
+  metatrace::ScopedEvent evt(metatrace::TAG_FTRACE,
+                             metatrace::FTRACE_CPU_FLUSH);
 
-  if (flush_id == cur_flush_request_id_)
-    return;  // Already dealing with this flush request.
+  // Read all cpus in one go, limiting the per-cpu read amount to make sure we
+  // don't get stuck chasing the writer if there's a very high bandwidth of
+  // events.
+  // TODO(rsavitski): use the active buffer size as the limit, instead of the
+  // largest one we allow.
+  size_t max_pages_per_cpu = kMaxPerCpuBufferSizeKb / (base::kPageSize / 1024);
+  ReadAllCpuBuffers(max_pages_per_cpu);
 
-  cur_flush_request_id_ = flush_id;
-  {
-    std::unique_lock<std::mutex> lock(thread_sync_.mutex);
-    thread_sync_.flush_acks.reset();
-    IssueThreadSyncCmd(FtraceThreadSync::kFlush, std::move(lock));
-  }
-
-  base::WeakPtr<FtraceController> weak_this = weak_factory_.GetWeakPtr();
-  task_runner_->PostDelayedTask(
-      [weak_this, flush_id] {
-        if (weak_this)
-          weak_this->OnFlushTimeout(flush_id);
-      },
-      kControllerFlushTimeoutMs);
-}
-
-void FtraceController::OnFlushTimeout(FlushRequestID flush_request_id) {
-  if (flush_request_id != cur_flush_request_id_)
-    return;
-
-  std::string acks;  // For debugging purposes only.
-  {
-    // Unlock the cpu readers and move on.
-    std::unique_lock<std::mutex> lock(thread_sync_.mutex);
-    acks = thread_sync_.flush_acks.to_string();
-    thread_sync_.flush_acks.reset();
-    if (thread_sync_.cmd == FtraceThreadSync::kFlush)
-      IssueThreadSyncCmd(FtraceThreadSync::kRun, std::move(lock));
-  }
-
-  PERFETTO_ELOG("Ftrace flush(%" PRIu64 ") timed out. Acked cpus mask: [%s]",
-                flush_request_id, acks.c_str());
-  cur_flush_request_id_ = 0;
-  NotifyFlushCompleteToStartedDataSources(flush_request_id);
+  for (FtraceDataSource* data_source : started_data_sources_)
+    data_source->OnFtraceFlushComplete(flush_id);
 }
 
 void FtraceController::StopIfNeeded() {
@@ -386,15 +295,14 @@
   // ask for an explicit flush before stopping, unless it needs to perform a
   // non-graceful stop.
 
-  IssueThreadSyncCmd(FtraceThreadSync::kQuit);
-
-  // Destroying the CpuReader(s) will join on their worker threads.
   cpu_readers_.clear();
-  generation_++;
+
+  if (parsing_mem_.IsValid()) {
+    parsing_mem_.AdviseDontNeed(parsing_mem_.Get(), parsing_mem_.size());
+  }
 }
 
 bool FtraceController::AddDataSource(FtraceDataSource* data_source) {
-  PERFETTO_DCHECK_THREAD(thread_checker_);
   if (!ValidConfig(data_source->config()))
     return false;
 
@@ -410,7 +318,6 @@
 }
 
 bool FtraceController::StartDataSource(FtraceDataSource* data_source) {
-  PERFETTO_DCHECK_THREAD(thread_checker_);
   PERFETTO_DCHECK(data_sources_.count(data_source) > 0);
 
   FtraceConfigId config_id = data_source->config_id();
@@ -425,7 +332,6 @@
 }
 
 void FtraceController::RemoveDataSource(FtraceDataSource* data_source) {
-  PERFETTO_DCHECK_THREAD(thread_checker_);
   started_data_sources_.erase(data_source);
   size_t removed = data_sources_.erase(data_source);
   if (!removed)
@@ -438,43 +344,6 @@
   DumpAllCpuStats(ftrace_procfs_.get(), stats);
 }
 
-void FtraceController::IssueThreadSyncCmd(
-    FtraceThreadSync::Cmd cmd,
-    std::unique_lock<std::mutex> pass_lock_from_caller) {
-  PERFETTO_DCHECK_THREAD(thread_checker_);
-  {
-    std::unique_lock<std::mutex> lock(std::move(pass_lock_from_caller));
-    if (!lock.owns_lock())
-      lock = std::unique_lock<std::mutex>(thread_sync_.mutex);
-
-    if (thread_sync_.cmd == FtraceThreadSync::kQuit &&
-        cmd != FtraceThreadSync::kQuit) {
-      // If in kQuit state, we should never issue any other commands.
-      return;
-    }
-
-    thread_sync_.cmd = cmd;
-    thread_sync_.cmd_id++;
-  }
-
-  // Send a SIGPIPE to all worker threads to wake them up if they are sitting in
-  // a blocking splice(). If they are not and instead they are sitting in the
-  // cond-variable.wait(), this, together with the one below, will have at best
-  // the same effect of a spurious wakeup, depending on the implementation of
-  // the condition variable.
-  for (const auto& cpu_reader : cpu_readers_)
-    cpu_reader->InterruptWorkerThreadWithSignal();
-
-  thread_sync_.cond.notify_all();
-}
-
-void FtraceController::NotifyFlushCompleteToStartedDataSources(
-    FlushRequestID flush_request_id) {
-  PERFETTO_DCHECK_THREAD(thread_checker_);
-  for (FtraceDataSource* data_source : started_data_sources_)
-    data_source->OnFtraceFlushComplete(flush_request_id);
-}
-
 FtraceController::Observer::~Observer() = default;
 
 }  // namespace perfetto
diff --git a/src/traced/probes/ftrace/ftrace_controller.h b/src/traced/probes/ftrace/ftrace_controller.h
index 63514b7..bee7d27 100644
--- a/src/traced/probes/ftrace/ftrace_controller.h
+++ b/src/traced/probes/ftrace/ftrace_controller.h
@@ -28,11 +28,11 @@
 #include <string>
 
 #include "perfetto/base/task_runner.h"
+#include "perfetto/ext/base/paged_memory.h"
 #include "perfetto/ext/base/utils.h"
 #include "perfetto/ext/base/weak_ptr.h"
 #include "perfetto/ext/tracing/core/basic_types.h"
 #include "src/traced/probes/ftrace/ftrace_config_utils.h"
-#include "src/traced/probes/ftrace/ftrace_thread_sync.h"
 
 namespace perfetto {
 
@@ -61,10 +61,6 @@
   static std::unique_ptr<FtraceController> Create(base::TaskRunner*, Observer*);
   virtual ~FtraceController();
 
-  // These two methods are called by CpuReader(s) from their worker threads.
-  static void OnCpuReaderRead(size_t cpu, int generation, FtraceThreadSync*);
-  static void OnCpuReaderFlush(size_t cpu, int generation, FtraceThreadSync*);
-
   void DisableAllEvents();
   void WriteTraceMarker(const std::string& s);
   void ClearTrace();
@@ -92,8 +88,6 @@
                    base::TaskRunner*,
                    Observer*);
 
-  virtual void OnDrainCpuForTesting(size_t /*cpu*/) {}
-
   // Protected and virtual for testing.
   virtual uint64_t NowMs() const;
 
@@ -103,12 +97,12 @@
   FtraceController(const FtraceController&) = delete;
   FtraceController& operator=(const FtraceController&) = delete;
 
-  void OnFlushTimeout(FlushRequestID);
-  void DrainCPUs(int generation);
-  void UnblockReaders();
-  void NotifyFlushCompleteToStartedDataSources(FlushRequestID);
-  void IssueThreadSyncCmd(FtraceThreadSync::Cmd,
-                          std::unique_lock<std::mutex> = {});
+  // Periodic task that reads all per-cpu ftrace buffers.
+  void ReadTick(int generation);
+  // Returns true if we've caught up with the ftrace events for all cpus. False
+  // if we hit |max_pages| on at least one cpu, and are therefore stopping
+  // early (to not hog the thread for too long).
+  bool ReadAllCpuBuffers(size_t max_pages);
 
   uint32_t GetDrainPeriodMs();
 
@@ -117,17 +111,15 @@
 
   base::TaskRunner* const task_runner_;
   Observer* const observer_;
-  FtraceThreadSync thread_sync_;
+  base::PagedMemory parsing_mem_;
   std::unique_ptr<FtraceProcfs> ftrace_procfs_;
   std::unique_ptr<ProtoTranslationTable> table_;
   std::unique_ptr<FtraceConfigMuxer> ftrace_config_muxer_;
   int generation_ = 0;
-  FlushRequestID cur_flush_request_id_ = 0;
   bool atrace_running_ = false;
   std::vector<std::unique_ptr<CpuReader>> cpu_readers_;
   std::set<FtraceDataSource*> data_sources_;
   std::set<FtraceDataSource*> started_data_sources_;
-  PERFETTO_THREAD_CHECKER(thread_checker_)
   base::WeakPtrFactory<FtraceController> weak_factory_;  // Keep last.
 };
 
diff --git a/src/traced/probes/ftrace/ftrace_controller_unittest.cc b/src/traced/probes/ftrace/ftrace_controller_unittest.cc
index abf6aa6..ac55677 100644
--- a/src/traced/probes/ftrace/ftrace_controller_unittest.cc
+++ b/src/traced/probes/ftrace/ftrace_controller_unittest.cc
@@ -38,13 +38,14 @@
 using testing::_;
 using testing::AnyNumber;
 using testing::ByMove;
-using testing::Invoke;
-using testing::NiceMock;
-using testing::MatchesRegex;
-using testing::Return;
-using testing::IsEmpty;
 using testing::ElementsAre;
+using testing::Invoke;
+using testing::IsEmpty;
+using testing::MatchesRegex;
+using testing::Mock;
+using testing::NiceMock;
 using testing::Pair;
+using testing::Return;
 
 using Table = perfetto::ProtoTranslationTable;
 using FtraceEventBundle = perfetto::protos::pbzero::FtraceEventBundle;
@@ -58,47 +59,11 @@
 
 class MockTaskRunner : public base::TaskRunner {
  public:
-  MockTaskRunner() {
-    ON_CALL(*this, PostTask(_))
-        .WillByDefault(Invoke(this, &MockTaskRunner::OnPostTask));
-    ON_CALL(*this, PostDelayedTask(_, _))
-        .WillByDefault(Invoke(this, &MockTaskRunner::OnPostDelayedTask));
-  }
-
-  void OnPostTask(std::function<void()> task) {
-    std::unique_lock<std::mutex> lock(lock_);
-    EXPECT_FALSE(task_);
-    task_ = std::move(task);
-  }
-
-  void OnPostDelayedTask(std::function<void()> task, int /*delay*/) {
-    std::unique_lock<std::mutex> lock(lock_);
-    EXPECT_FALSE(task_);
-    task_ = std::move(task);
-  }
-
-  void RunLastTask() {
-    auto task = TakeTask();
-    if (task)
-      task();
-  }
-
-  std::function<void()> TakeTask() {
-    std::unique_lock<std::mutex> lock(lock_);
-    auto task(std::move(task_));
-    task_ = std::function<void()>();
-    return task;
-  }
-
   MOCK_METHOD1(PostTask, void(std::function<void()>));
   MOCK_METHOD2(PostDelayedTask, void(std::function<void()>, uint32_t delay_ms));
   MOCK_METHOD2(AddFileDescriptorWatch, void(int fd, std::function<void()>));
   MOCK_METHOD1(RemoveFileDescriptorWatch, void(int fd));
   MOCK_CONST_METHOD0(RunsTasksOnCurrentThread, bool());
-
- private:
-  std::mutex lock_;
-  std::function<void()> task_;
 };
 
 std::unique_ptr<Table> FakeTable(FtraceProcfs* ftrace) {
@@ -214,34 +179,11 @@
         runner_(std::move(runner)),
         procfs_(raw_procfs) {}
 
-  MOCK_METHOD1(OnDrainCpuForTesting, void(size_t cpu));
-
   MockTaskRunner* runner() { return runner_.get(); }
   MockFtraceProcfs* procfs() { return procfs_; }
-
   uint64_t NowMs() const override { return now_ms; }
-
   uint32_t drain_period_ms() { return GetDrainPeriodMs(); }
 
-  std::function<void()> GetDataAvailableCallback(size_t cpu) {
-    int generation = generation_;
-    auto* thread_sync = &thread_sync_;
-    return [cpu, generation, thread_sync] {
-      FtraceController::OnCpuReaderRead(cpu, generation, thread_sync);
-    };
-  }
-
-  void WaitForData(size_t cpu) {
-    for (;;) {
-      {
-        std::unique_lock<std::mutex> lock(thread_sync_.mutex);
-        if (thread_sync_.cpus_to_drain[cpu])
-          return;
-      }
-      usleep(5000);
-    }
-  }
-
   std::unique_ptr<FtraceDataSource> AddFakeDataSource(const FtraceConfig& cfg) {
     std::unique_ptr<FtraceDataSource> data_source(new FtraceDataSource(
         GetWeakPtr(), 0 /* session id */, cfg, nullptr /* trace_writer */));
@@ -265,15 +207,10 @@
 namespace {
 
 std::unique_ptr<TestFtraceController> CreateTestController(
-    bool runner_is_nice_mock,
     bool procfs_is_nice_mock,
     size_t cpu_count = 1) {
-  std::unique_ptr<MockTaskRunner> runner;
-  if (runner_is_nice_mock) {
-    runner = std::unique_ptr<MockTaskRunner>(new NiceMock<MockTaskRunner>());
-  } else {
-    runner = std::unique_ptr<MockTaskRunner>(new MockTaskRunner());
-  }
+  std::unique_ptr<MockTaskRunner> runner =
+      std::unique_ptr<MockTaskRunner>(new NiceMock<MockTaskRunner>());
 
   std::unique_ptr<MockFtraceProcfs> ftrace_procfs;
   if (procfs_is_nice_mock) {
@@ -297,16 +234,14 @@
 }  // namespace
 
 TEST(FtraceControllerTest, NonExistentEventsDontCrash) {
-  auto controller =
-      CreateTestController(true /* nice runner */, true /* nice procfs */);
+  auto controller = CreateTestController(true /* nice procfs */);
 
   FtraceConfig config = CreateFtraceConfig({"not_an_event"});
   EXPECT_TRUE(controller->AddFakeDataSource(config));
 }
 
 TEST(FtraceControllerTest, RejectsBadEventNames) {
-  auto controller =
-      CreateTestController(true /* nice runner */, true /* nice procfs */);
+  auto controller = CreateTestController(true /* nice procfs */);
 
   FtraceConfig config = CreateFtraceConfig({"../try/to/escape"});
   EXPECT_FALSE(controller->AddFakeDataSource(config));
@@ -317,8 +252,10 @@
 }
 
 TEST(FtraceControllerTest, OneSink) {
-  auto controller =
-      CreateTestController(true /* nice runner */, false /* nice procfs */);
+  auto controller = CreateTestController(false /* nice procfs */);
+
+  // No read tasks posted as part of adding the data source.
+  EXPECT_CALL(*controller->runner(), PostDelayedTask(_, _)).Times(0);
 
   FtraceConfig config = CreateFtraceConfig({"group/foo"});
 
@@ -327,9 +264,18 @@
   auto data_source = controller->AddFakeDataSource(config);
   ASSERT_TRUE(data_source);
 
+  // Verify that no read tasks have been posted, and set up the expectation
+  // that a single recurring read task will be posted as part of starting the
+  // data source.
+  Mock::VerifyAndClearExpectations(controller->runner());
+  EXPECT_CALL(*controller->runner(), PostDelayedTask(_, _)).Times(1);
+
   EXPECT_CALL(*controller->procfs(), WriteToFile("/root/tracing_on", "1"));
   ASSERT_TRUE(controller->StartDataSource(data_source.get()));
 
+  // Verify single posted read task.
+  Mock::VerifyAndClearExpectations(controller->runner());
+
   EXPECT_CALL(*controller->procfs(), WriteToFile("/root/buffer_size_kb", "0"));
   EXPECT_CALL(*controller->procfs(), ClearFile("/root/trace"))
       .WillOnce(Return(true));
@@ -346,22 +292,33 @@
 }
 
 TEST(FtraceControllerTest, MultipleSinks) {
-  auto controller =
-      CreateTestController(false /* nice runner */, false /* nice procfs */);
+  auto controller = CreateTestController(false /* nice procfs */);
 
   FtraceConfig configA = CreateFtraceConfig({"group/foo"});
   FtraceConfig configB = CreateFtraceConfig({"group/foo", "group/bar"});
 
+  // No read tasks posted as part of adding the data sources.
+  EXPECT_CALL(*controller->runner(), PostDelayedTask(_, _)).Times(0);
+
   EXPECT_CALL(*controller->procfs(), WriteToFile("/root/buffer_size_kb", _));
   EXPECT_CALL(*controller->procfs(), WriteToFile(kFooEnablePath, "1"));
   auto data_sourceA = controller->AddFakeDataSource(configA);
   EXPECT_CALL(*controller->procfs(), WriteToFile(kBarEnablePath, "1"));
   auto data_sourceB = controller->AddFakeDataSource(configB);
 
+  // Verify that no read tasks have been posted, and set up the expectation
+  // that a single recurring read task will be posted as part of starting the
+  // data sources.
+  Mock::VerifyAndClearExpectations(controller->runner());
+  EXPECT_CALL(*controller->runner(), PostDelayedTask(_, _)).Times(1);
+
   EXPECT_CALL(*controller->procfs(), WriteToFile("/root/tracing_on", "1"));
   ASSERT_TRUE(controller->StartDataSource(data_sourceA.get()));
   ASSERT_TRUE(controller->StartDataSource(data_sourceB.get()));
 
+  // Verify single posted read task.
+  Mock::VerifyAndClearExpectations(controller->runner());
+
   data_sourceA.reset();
 
   EXPECT_CALL(*controller->procfs(), WriteToFile(kFooEnablePath, "0"));
@@ -376,8 +333,7 @@
 }
 
 TEST(FtraceControllerTest, ControllerMayDieFirst) {
-  auto controller =
-      CreateTestController(false /* nice runner */, false /* nice procfs */);
+  auto controller = CreateTestController(false /* nice procfs */);
 
   FtraceConfig config = CreateFtraceConfig({"group/foo"});
 
@@ -401,51 +357,8 @@
   data_source.reset();
 }
 
-TEST(FtraceControllerTest, BackToBackEnableDisable) {
-  auto controller =
-      CreateTestController(false /* nice runner */, false /* nice procfs */);
-
-  // For this test we don't care about calls to WriteToFile/ClearFile.
-  EXPECT_CALL(*controller->procfs(), WriteToFile(_, _)).Times(AnyNumber());
-  EXPECT_CALL(*controller->procfs(), ClearFile(_)).Times(AnyNumber());
-  EXPECT_CALL(*controller->procfs(), ReadOneCharFromFile("/root/tracing_on"))
-      .Times(AnyNumber());
-
-  EXPECT_CALL(*controller->runner(), PostTask(_)).Times(2);
-  EXPECT_CALL(*controller->runner(), PostDelayedTask(_, 100)).Times(2);
-  FtraceConfig config = CreateFtraceConfig({"group/foo"});
-  auto data_source = controller->AddFakeDataSource(config);
-  ASSERT_TRUE(controller->StartDataSource(data_source.get()));
-
-  auto on_data_available = controller->GetDataAvailableCallback(0u);
-  std::thread worker([on_data_available] { on_data_available(); });
-  controller->WaitForData(0u);
-
-  // Disable the first data source and run the delayed task that it generated.
-  // It should be a no-op.
-  worker.join();
-  data_source.reset();
-  controller->runner()->RunLastTask();
-  controller->runner()->RunLastTask();
-
-  // Register another data source and wait for it to generate data.
-  data_source = controller->AddFakeDataSource(config);
-  ASSERT_TRUE(controller->StartDataSource(data_source.get()));
-
-  on_data_available = controller->GetDataAvailableCallback(0u);
-  std::thread worker2([on_data_available] { on_data_available(); });
-  controller->WaitForData(0u);
-
-  // This drain should also be a no-op after the data source is unregistered.
-  worker2.join();
-  data_source.reset();
-  controller->runner()->RunLastTask();
-  controller->runner()->RunLastTask();
-}
-
 TEST(FtraceControllerTest, BufferSize) {
-  auto controller =
-      CreateTestController(true /* nice runner */, false /* nice procfs */);
+  auto controller = CreateTestController(false /* nice procfs */);
 
   // For this test we don't care about most calls to WriteToFile/ClearFile.
   EXPECT_CALL(*controller->procfs(), WriteToFile(_, _)).Times(AnyNumber());
@@ -514,8 +427,7 @@
 }
 
 TEST(FtraceControllerTest, PeriodicDrainConfig) {
-  auto controller =
-      CreateTestController(true /* nice runner */, false /* nice procfs */);
+  auto controller = CreateTestController(false /* nice procfs */);
 
   // For this test we don't care about calls to WriteToFile/ClearFile.
   EXPECT_CALL(*controller->procfs(), WriteToFile(_, _)).Times(AnyNumber());
diff --git a/src/traced/probes/ftrace/ftrace_thread_sync.h b/src/traced/probes/ftrace/ftrace_thread_sync.h
deleted file mode 100644
index fbfa3fd..0000000
--- a/src/traced/probes/ftrace/ftrace_thread_sync.h
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright (C) 2018 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef SRC_TRACED_PROBES_FTRACE_FTRACE_THREAD_SYNC_H_
-#define SRC_TRACED_PROBES_FTRACE_FTRACE_THREAD_SYNC_H_
-
-#include <stdint.h>
-
-#include <bitset>
-#include <condition_variable>
-#include <mutex>
-
-#include "perfetto/ext/base/utils.h"
-#include "perfetto/ext/base/weak_ptr.h"
-
-namespace perfetto {
-
-namespace base {
-class TaskRunner;
-}  // namespace base
-
-class FtraceController;
-
-// This struct is accessed both by the FtraceController on the main thread and
-// by the CpuReader(s) on their worker threads. It is used to synchronize
-// handshakes between FtraceController and CpuReader(s). There is only *ONE*
-// instance of this state, owned by the FtraceController and shared with all
-// CpuReader(s).
-struct FtraceThreadSync {
-  explicit FtraceThreadSync(base::TaskRunner* tr) : task_runner(tr) {}
-
-  // These variables are set upon initialization time and never changed. Can
-  // be accessed outside of the |mutex|.
-  base::TaskRunner* const task_runner;  // Where the FtraceController lives.
-  base::WeakPtr<FtraceController> trace_controller_weak;
-
-  // Mutex & condition variable shared by main thread and all per-cpu workers.
-  // All fields below are read and modified holding |mutex|.
-  std::mutex mutex;
-
-  // Used to suspend CpuReader(s) between cycles and to wake them up at the
-  // same time.
-  std::condition_variable cond;
-
-  // |cmd| and |cmd_id| are written only by FtraceController. On each cycle,
-  // FtraceController increases the |cmd_id| monotonic counter and issues the
-  // new command. |cmd_id| is used by the CpuReader(s) to distinguish a new
-  // command from a spurious wakeup.
-  enum Cmd { kRun = 0, kFlush, kQuit };
-  Cmd cmd = kRun;
-  uint64_t cmd_id = 0;
-
-  // This bitmap is cleared by the FtraceController before every kRun command
-  // and is optionally set by OnDataAvailable() if a CpuReader did fetch any
-  // ftrace data during the read cycle.
-  std::bitset<base::kMaxCpus> cpus_to_drain;
-
-  // This bitmap is cleared by the FtraceController before issuing a kFlush
-  // command and set by each CpuReader after they have completed the flush.
-  std::bitset<base::kMaxCpus> flush_acks;
-};
-
-}  // namespace perfetto
-
-#endif  // SRC_TRACED_PROBES_FTRACE_FTRACE_THREAD_SYNC_H_
diff --git a/src/traced/probes/ftrace/page_pool.cc b/src/traced/probes/ftrace/page_pool.cc
deleted file mode 100644
index f66309d..0000000
--- a/src/traced/probes/ftrace/page_pool.cc
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Copyright (C) 2018 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "src/traced/probes/ftrace/page_pool.h"
-
-#include <array>
-
-namespace perfetto {
-
-namespace {
-constexpr size_t kMaxFreelistBlocks = 128;  // 128 * 32 * 4KB = 16MB.
-}
-
-void PagePool::NewPageBlock() {
-  std::lock_guard<std::mutex> lock(mutex_);
-  if (freelist_.empty()) {
-    write_queue_.emplace_back(PageBlock::Create());
-  } else {
-    write_queue_.emplace_back(std::move(freelist_.back()));
-    freelist_.pop_back();
-  }
-  PERFETTO_DCHECK(write_queue_.back().size() == 0);
-}
-
-void PagePool::EndRead(std::vector<PageBlock> page_blocks) {
-  PERFETTO_DCHECK_THREAD(reader_thread_);
-  for (PageBlock& page_block : page_blocks)
-    page_block.Clear();
-
-  std::lock_guard<std::mutex> lock(mutex_);
-  freelist_.insert(freelist_.end(),
-                   std::make_move_iterator(page_blocks.begin()),
-                   std::make_move_iterator(page_blocks.end()));
-
-  // Even if blocks in the freelist don't waste any resident memory (because
-  // the Clear() call above madvise()s them), let's avoid accumulating virtual
-  // address space reservations in pathological cases.
-  if (freelist_.size() > kMaxFreelistBlocks)
-    freelist_.erase(freelist_.begin() + kMaxFreelistBlocks, freelist_.end());
-}
-
-}  // namespace perfetto
diff --git a/src/traced/probes/ftrace/page_pool.h b/src/traced/probes/ftrace/page_pool.h
deleted file mode 100644
index de5c250..0000000
--- a/src/traced/probes/ftrace/page_pool.h
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Copyright (C) 2018 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef SRC_TRACED_PROBES_FTRACE_PAGE_POOL_H_
-#define SRC_TRACED_PROBES_FTRACE_PAGE_POOL_H_
-
-#include <stdint.h>
-
-#include <mutex>
-#include <vector>
-
-#include "perfetto/base/logging.h"
-#include "perfetto/ext/base/optional.h"
-#include "perfetto/ext/base/paged_memory.h"
-#include "perfetto/ext/base/thread_checker.h"
-#include "perfetto/ext/base/utils.h"
-
-namespace perfetto {
-
-// This class is a page pool tailored around the needs of the ftrace CpuReader.
-// It has two responsibilities:
-// 1) A cheap bump-pointer page allocator for the writing side of CpuReader.
-// 2) A thread-safe producer/consumer queue to synchronize the read/write
-//    threads of CpuReader.
-// For context, CpuReader (and hence this class) is used on two threads:
-// (1) A worker thread that writes into the buffer and (2) the main thread which
-// reads all the content in big batches and turns it into protos.
-// There is at most one thread writing and at most one thread reading. In rare
-// circumstances they can be active at the same time.
-// This class is optimized for the following use case:
-// - Most of the time CpuReader wants to write 4096 bytes. In some rare cases
-//   (read() during flush) it wants to write < 4096 bytes.
-// - Even when it writes < 4096 bytes, CpuReader can figure out the size of the
-//   payload from the ftrace header. We don't need extra tracking to tell how
-//   much of each page is used.
-// - Doing a syscall for each page write is overkill. In most occasions
-//   CpuReader writes bursts of several pages in one go.
-// - We can't really predict upfront how big the write bursts will be, hence we
-//   cannot predict the size of the pool, unless we accept a very high bound.
-//   In extreme, yet rare, conditions, CpuReader will read the whole per-cpu
-//   ftrace buffer, while the reader is still reading the previous batch.
-// - Write bursts should not be too frequent, so once they are over it's worth
-//   spending some extra cycles to release the memory.
-// - The reader side always wants to read *all* the written pages in one batch.
-//   While this happens though, the writer might want to write more.
-//
-// The architecture of this class is as follows. Pages are organized in
-// PageBlock(s). A PageBlock is simply an array of pages and is the elementary
-// unit of memory allocation and frees. Pages within one block are cheaply
-// allocated with a simple bump-pointer allocator.
-//
-//      [      Writer (thread worker)    ] | [    Reader (main thread)   ]
-//                                  ~~~~~~~~~~~~~~~~~~~~~
-//      +---> write queue ------------> ready queue --+
-//      |                                             |
-//      +------------------------------- freelist <---+
-//                                  ~~~~~~~~~~~~~~~~~~~~~
-//                                  ~  mutex protected  ~
-//                                  ~~~~~~~~~~~~~~~~~~~~~
-class PagePool {
- public:
-  class PageBlock {
-   public:
-    static constexpr size_t kPagesPerBlock = 32;  // 32 * 4KB = 128 KB.
-    static constexpr size_t kBlockSize = kPagesPerBlock * base::kPageSize;
-
-    // This factory method exists just so that we don't accidentally create
-    // extra blocks by triggering the default constructor in containers.
-    static PageBlock Create() { return PageBlock(); }
-
-    PageBlock(PageBlock&&) noexcept = default;
-    PageBlock& operator=(PageBlock&&) = default;
-
-    size_t size() const { return size_; }
-    bool IsFull() const { return size_ >= kPagesPerBlock; }
-
-    // Returns the pointer to the contents of the i-th page in the block.
-    uint8_t* At(size_t i) const {
-      PERFETTO_DCHECK(i < kPagesPerBlock);
-      return reinterpret_cast<uint8_t*>(mem_.Get()) + i * base::kPageSize;
-    }
-
-    uint8_t* CurPage() const { return At(size_); }
-
-    void NextPage() {
-      PERFETTO_DCHECK(!IsFull());
-      size_++;
-    }
-
-    // Releases memory of the block and marks it available for reuse.
-    void Clear() {
-      size_ = 0;
-      mem_.AdviseDontNeed(mem_.Get(), kBlockSize);
-    }
-
-   private:
-    PageBlock(const PageBlock&) = delete;
-    PageBlock& operator=(const PageBlock&) = delete;
-    PageBlock() { mem_ = base::PagedMemory::Allocate(kBlockSize); }
-
-    base::PagedMemory mem_;
-    size_t size_ = 0;
-  };
-
-  PagePool() {
-    PERFETTO_DETACH_FROM_THREAD(writer_thread_);
-    PERFETTO_DETACH_FROM_THREAD(reader_thread_);
-  }
-
-  // Grabs a new page, allocating a whole new PageBlock if necessary.
-  // If contents are written to the page, the caller must call EndWrite().
-  // If no data is written, it is okay to leave the BeginWrite() unpaired
-  // (e.g., in case of a non-blocking read returning no data) and call
-  // BeginWrite() again in the future.
-  uint8_t* BeginWrite() {
-    PERFETTO_DCHECK_THREAD(writer_thread_);
-    if (write_queue_.empty() || write_queue_.back().IsFull())
-      NewPageBlock();  // Slowpath. Tries the freelist first, then allocates.
-    return write_queue_.back().CurPage();
-  }
-
-  // Marks the last page as written and bumps the write pointer.
-  void EndWrite() {
-    PERFETTO_DCHECK_THREAD(writer_thread_);
-    PERFETTO_DCHECK(!write_queue_.empty() && !write_queue_.back().IsFull());
-    write_queue_.back().NextPage();
-  }
-
-  // Makes all written pages available to the reader.
-  // Returns an upper bound on the number of pages written.
-  size_t CommitWrittenPages() {
-    PERFETTO_DCHECK_THREAD(writer_thread_);
-    size_t size = write_queue_.size() * PagePool::PageBlock::kPagesPerBlock;
-    std::lock_guard<std::mutex> lock(mutex_);
-    read_queue_.insert(read_queue_.end(),
-                       std::make_move_iterator(write_queue_.begin()),
-                       std::make_move_iterator(write_queue_.end()));
-    write_queue_.clear();
-    return size;
-  }
-
-  // Moves ownership of all the page blocks in the read queue to the caller.
-  // The caller is expected to move them back after reading through EndRead().
-  // PageBlocks will be freed if the caller doesn't call EndRead().
-  std::vector<PageBlock> BeginRead() {
-    PERFETTO_DCHECK_THREAD(reader_thread_);
-    std::lock_guard<std::mutex> lock(mutex_);
-    auto res = std::move(read_queue_);
-    read_queue_.clear();
-    return res;
-  }
-
-  // Returns the page blocks borrowed for read and makes them available for
-  // reuse. This allows the writer to avoid doing syscalls after the initial
-  // writes.
-  void EndRead(std::vector<PageBlock> page_blocks);
-
-  size_t freelist_size_for_testing() const { return freelist_.size(); }
-
- private:
-  PagePool(const PagePool&) = delete;
-  PagePool& operator=(const PagePool&) = delete;
-  void NewPageBlock();
-
-  PERFETTO_THREAD_CHECKER(writer_thread_)
-  std::vector<PageBlock> write_queue_;  // Accessed exclusively by the writer.
-
-  std::mutex mutex_;  // Protects both the read queue and the freelist.
-
-  PERFETTO_THREAD_CHECKER(reader_thread_)
-  std::vector<PageBlock> read_queue_;  // Accessed by both threads.
-  std::vector<PageBlock> freelist_;    // Accessed by both threads.
-};
-
-}  // namespace perfetto
-
-#endif  // SRC_TRACED_PROBES_FTRACE_PAGE_POOL_H_
diff --git a/src/traced/probes/ftrace/page_pool_unittest.cc b/src/traced/probes/ftrace/page_pool_unittest.cc
deleted file mode 100644
index e99b7cb..0000000
--- a/src/traced/probes/ftrace/page_pool_unittest.cc
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Copyright (C) 2018 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "src/traced/probes/ftrace/page_pool.h"
-
-#include <array>
-#include <mutex>
-#include <random>
-#include <thread>
-#include <vector>
-
-#include <gtest/gtest.h>
-
-namespace perfetto {
-namespace {
-
-TEST(PagePoolTest, SingleThreaded) {
-  PagePool pool;
-  for (int i = 0; i < 2; i++)
-    ASSERT_TRUE(pool.BeginRead().empty());
-
-  for (int repeat = 0; repeat < 3; repeat++) {
-    for (uint32_t seed = 0; seed < 6; seed++) {
-      uint8_t* page = pool.BeginWrite();
-      std::minstd_rand0 rnd_engine(seed);
-      std::generate(page, page + base::kPageSize, rnd_engine);
-      // Deliberately make it so page 3 is overwritten, so we should see only
-      // pages 0, 1, 2, 4, 5.
-      if (seed != 3)
-        pool.EndWrite();
-    }
-
-    // No write should be visible until the CommitWrittenPages() call.
-    ASSERT_TRUE(pool.BeginRead().empty());
-
-    pool.CommitWrittenPages();
-
-    auto blocks = pool.BeginRead();
-    ASSERT_EQ(blocks.size(), 1);
-    ASSERT_EQ(blocks[0].size(), 5);
-    for (uint32_t i = 0; i < blocks[0].size(); i++) {
-      auto seed = std::array<uint32_t, 5>{{0, 1, 2, 4, 5}}[i];
-      const char* page = reinterpret_cast<const char*>(blocks[0].At(i));
-      char expected[base::kPageSize];
-      std::minstd_rand0 rnd_engine(seed);
-      std::generate(expected, expected + base::kPageSize, rnd_engine);
-      EXPECT_STREQ(page, expected);
-    }
-
-    pool.EndRead(std::move(blocks));
-    ASSERT_EQ(pool.freelist_size_for_testing(), 1);
-  }
-}
-
-TEST(PagePoolTest, MultiThreaded) {
-  PagePool pool;
-
-  // Generate some random content.
-  std::vector<std::string> expected_pages;
-  std::minstd_rand0 rnd_engine(0);
-  for (int i = 0; i < 1000; i++) {
-    expected_pages.emplace_back();
-    std::string& page = expected_pages.back();
-    page.resize(base::kPageSize);
-    std::generate(page.begin(), page.end(), rnd_engine);
-  }
-
-  auto writer_fn = [&pool, &expected_pages] {
-    std::minstd_rand0 rnd(0);
-    for (const std::string& expected_page : expected_pages) {
-      uint8_t* dst = pool.BeginWrite();
-      memcpy(dst, expected_page.data(), base::kPageSize);
-      pool.EndWrite();
-      if (rnd() % 16 == 0)
-        pool.CommitWrittenPages();
-    }
-    pool.CommitWrittenPages();
-  };
-
-  auto reader_fn = [&pool, &expected_pages] {
-    for (size_t page_idx = 0; page_idx < expected_pages.size();) {
-      auto blocks = pool.BeginRead();
-      for (const auto& block : blocks) {
-        for (size_t i = 0; i < block.size(); i++) {
-          const char* page = reinterpret_cast<const char*>(block.At(i));
-          EXPECT_EQ(expected_pages[page_idx],
-                    std::string(page, base::kPageSize));
-          page_idx++;
-        }
-      }
-      pool.EndRead(std::move(blocks));
-    }
-  };
-
-  std::thread writer(writer_fn);
-  std::thread reader(reader_fn);
-  writer.join();
-  reader.join();
-}
-
-}  // namespace
-}  // namespace perfetto