traced_probes ftrace: switch back to single-threaded, nonblocking read(2)-only approach
Per the recent performance measurements (go/perfetto-bg-cpu), we no longer
think that the multi-threaded, blocking/nonblocking splice/read approach is
worthwhile in its current state. Not only is it highly complex, it also has an
unnecessarily high cpu% overhead.
The reading is now guided by a single repeating task that reads & parses the
contents of all per-cpu ftrace pipes.
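A condensed sketch of the new control flow (error handling and the weak-pointer
plumbing elided; see ftrace_controller.cc / cpu_reader.cc in the diff below):
```
void FtraceController::ReadTick(int generation) {
  if (started_data_sources_.empty() || generation != generation_)
    return;  // stale task from a previous tracing session

  // Reads at most kMaxPagesPerCpuPerReadTick pages per cpu, in batches of
  // kParsingBufferSizePages (see CpuReader::ReadCycle).
  bool all_cpus_caught_up = ReadAllCpuBuffers(kMaxPagesPerCpuPerReadTick);

  if (!all_cpus_caught_up) {
    // Hit the per-tick page limit on some cpu: repost at the end of the
    // immediate queue, yielding to other tasks before reading more.
    task_runner_->PostTask([this, generation] { ReadTick(generation); });
  } else {
    // Caught up on all cpus: sleep until the next drain period boundary.
    uint32_t drain_period_ms = GetDrainPeriodMs();
    task_runner_->PostDelayedTask(
        [this, generation] { ReadTick(generation); },
        drain_period_ms - (NowMs() % drain_period_ms));
  }
}
```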
What we lose in this version:
* ability to sleep until a page of ftrace events is filled (with blocking
splice). This would only make a difference for tracing sessions with truly
low-frequency events (not important to optimize for atm).
* scalability for many-core machines. This version works well for an 8 core
phone, but is likely to struggle on a 64 core workstation. Let's treat this
patch as a reset for complexity, and reintroduce it only as necessary.
Update: ran on my 72 core dev workstation as a smoke test; it kept up fine.
* possibly splice efficiency? Haven't tried a single-thread splicer, but the
bigger immediate wins are probably in the parsing code (this version is at
5:1 utime:stime ratio according to my measurements).
Rough measurements of traced_probes cpu% on a crosshatch (standalone ndk build
+ tmux script), with the methodology as in go/perfetto-bg-cpu:
tuned cfg: 32k page (i.e. chunk) size, 1s ftrace drain period.
idle device, default cfg: 2.4%
idle device, tuned cfg: <1%
video rec, default cfg: 13.5%
video rec, tuned cfg: 6%
So we're doing much better with a tuned config, waking up for ~60ms once a
second to process all cores (60ms of work per 1s period is also where the ~6%
figure under load comes from).
Unfortunately this patch is lacking in programmatic tests; I'm not really sure
which ones would be worthwhile with the existing mock-heavy test setups. Making
this code more unit-testable will likely require a separate pass (in a separate
cl); it'd be nice to test the cpu_reader loop stop/continue logic.
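For instance, one way to get there would be to factor the stop/continue
decision out of ReadCycle into a pure helper (hypothetical sketch, not part of
this patch):
```
// Hypothetical pure helper capturing ReadCycle's stop/continue decision,
// testable without real ftrace fds.
bool ShouldContinueReading(size_t pages_read_in_batch, size_t batch_size,
                           size_t total_pages_read, size_t max_pages) {
  if (pages_read_in_batch < batch_size)
    return false;  // partial batch: caught up to the writer (or read error)
  if (total_pages_read >= max_pages)
    return false;  // hit the per-cycle work limit
  return true;
}

TEST(CpuReaderTest, StopsWhenCaughtUpOrAtWorkLimit) {
  EXPECT_FALSE(ShouldContinueReading(7, 32, 7, 256));     // partial batch
  EXPECT_TRUE(ShouldContinueReading(32, 32, 32, 256));    // full batch
  EXPECT_FALSE(ShouldContinueReading(32, 32, 256, 256));  // work limit hit
}
```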
Bug: 133312949
Change-Id: Ia79a267f43214f336b5396f4dd5789bc49ab1e67
diff --git a/Android.bp b/Android.bp
index 924cf8c..2ceb40d 100644
--- a/Android.bp
+++ b/Android.bp
@@ -310,7 +310,6 @@
"src/traced/probes/ftrace/ftrace_metadata.cc",
"src/traced/probes/ftrace/ftrace_procfs.cc",
"src/traced/probes/ftrace/ftrace_stats.cc",
- "src/traced/probes/ftrace/page_pool.cc",
"src/traced/probes/ftrace/proto_translation_table.cc",
"src/traced/probes/metatrace/metatrace_data_source.cc",
"src/traced/probes/packages_list/packages_list_data_source.cc",
@@ -846,7 +845,6 @@
"src/traced/probes/ftrace/ftrace_procfs.cc",
"src/traced/probes/ftrace/ftrace_procfs_integrationtest.cc",
"src/traced/probes/ftrace/ftrace_stats.cc",
- "src/traced/probes/ftrace/page_pool.cc",
"src/traced/probes/ftrace/proto_translation_table.cc",
"src/traced/probes/ftrace/test/cpu_reader_support.cc",
"src/traced/probes/metatrace/metatrace_data_source.cc",
@@ -3520,8 +3518,6 @@
"src/traced/probes/ftrace/ftrace_procfs.cc",
"src/traced/probes/ftrace/ftrace_procfs_unittest.cc",
"src/traced/probes/ftrace/ftrace_stats.cc",
- "src/traced/probes/ftrace/page_pool.cc",
- "src/traced/probes/ftrace/page_pool_unittest.cc",
"src/traced/probes/ftrace/proto_translation_table.cc",
"src/traced/probes/ftrace/proto_translation_table_unittest.cc",
"src/traced/probes/ftrace/test/cpu_reader_support.cc",
diff --git a/docs/ftrace.md b/docs/ftrace.md
index c35d95c..6e29fc5 100644
--- a/docs/ftrace.md
+++ b/docs/ftrace.md
@@ -12,41 +12,5 @@
- Describe how to generate ftrace protos (`tools/pull_ftrace_format_files.py`,
`tools/udate_protos.py`)
- Describe how session multiplexing works.
-- Describe the page-by-page scheduling algorithm that uses vmsplice()
Code lives in [/src/traced/probes/ftrace](/src/traced/probes/ftrace/).
-
-From https://android-review.googlesource.com/c/platform/external/perfetto/+/603793/
-```
-
- main thread [drain] [unblock]
- /: |
- post .-----' : |
- / : v
- worker #0 [splice ...] [wakeup] [block ............] [splice]
- :
- worker #1 [splice ...] [wakeup] [block ........] [splice]
- :
- worker #2 [splice ..........................................]
- :
- :
- drain period (100ms)
-
-In other words, the splice(2) system call is used to move data from
-the raw kernel ftrace pipe into an intermediate pipe at a page
-granularity. This call allows every per-cpu worker to sleep until there
-is at least one page of data available.
-
-When a worker wakes up, it will attempt to move as many pages as
-possible to its staging pipe (up to 64K, depending on the
-system's pipe buffer size) in a non-blocking way. After this, it
-will notify the main thread that data is available. This notification
-will block the calling worker until the main thread has drained the
-data.
-
-When at least one worker has woken up, we schedule a drain operation
-on the main thread for the next drain period (every 100ms by default).
-The drain operation parses ftrace data from the staging pipes of
-every worker having pending data. After this, each waiting worker is
-allowed to issue another call to splice(), restarting the cycle.
-```
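For reference, a rough sketch of the single-threaded flow that replaces the
removed diagram:
```
 main thread  [ReadTick: read+parse cpu0 | cpu1 | ... | cpuN]      [ReadTick]
              |                                                    |
              '------------- drain period (default 100ms) ---------'

 If any cpu hits the per-tick page limit, ReadTick reposts itself immediately
 (yielding to other queued tasks first) instead of waiting a full period.
```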
diff --git a/include/perfetto/ext/base/metatrace_events.h b/include/perfetto/ext/base/metatrace_events.h
index f356313..1f2aea6 100644
--- a/include/perfetto/ext/base/metatrace_events.h
+++ b/include/perfetto/ext/base/metatrace_events.h
@@ -39,24 +39,27 @@
// DO NOT remove or reshuffle items in this list, only append. The IDs of these
// events are an ABI, the trace processor relies on these to open old traces.
#define PERFETTO_METATRACE_EVENTS(F) \
- F(EVENT_ZERO_UNUSED),\
- F(FTRACE_CPU_READER_READ), \
- F(FTRACE_DRAIN_CPUS), \
- F(FTRACE_UNBLOCK_READERS), \
- F(FTRACE_CPU_READ_NONBLOCK), \
- F(FTRACE_CPU_READ_BLOCK), \
- F(FTRACE_CPU_SPLICE_NONBLOCK), \
- F(FTRACE_CPU_SPLICE_BLOCK), \
- F(FTRACE_CPU_WAIT_CMD), \
- F(FTRACE_CPU_RUN_CYCLE), \
+ F(EVENT_ZERO_UNUSED), \
+ F(FTRACE_CPU_READER_READ), /*unused*/ \
+ F(FTRACE_DRAIN_CPUS), /*unused*/ \
+ F(FTRACE_UNBLOCK_READERS), /*unused*/ \
+ F(FTRACE_CPU_READ_NONBLOCK), /*unused*/ \
+ F(FTRACE_CPU_READ_BLOCK), /*unused*/ \
+ F(FTRACE_CPU_SPLICE_NONBLOCK), /*unused*/ \
+ F(FTRACE_CPU_SPLICE_BLOCK), /*unused*/ \
+ F(FTRACE_CPU_WAIT_CMD), /*unused*/ \
+ F(FTRACE_CPU_RUN_CYCLE), /*unused*/ \
F(FTRACE_CPU_FLUSH), \
- F(FTRACE_CPU_DRAIN), \
+ F(FTRACE_CPU_DRAIN), /*unused*/ \
F(READ_SYS_STATS), \
F(PS_WRITE_ALL_PROCESSES), \
F(PS_ON_PIDS), \
F(PS_ON_RENAME_PIDS), \
F(PS_WRITE_ALL_PROCESS_STATS), \
- F(TRACE_WRITER_COMMIT_STARTUP_WRITER_BATCH)
+ F(TRACE_WRITER_COMMIT_STARTUP_WRITER_BATCH), \
+ F(FTRACE_READ_TICK), \
+ F(FTRACE_CPU_READ_CYCLE), \
+ F(FTRACE_CPU_READ_BATCH)
// Append only, see above.
#define PERFETTO_METATRACE_COUNTERS(F) \
diff --git a/src/traced/probes/ftrace/BUILD.gn b/src/traced/probes/ftrace/BUILD.gn
index 17aa2f0..a38dbfe 100644
--- a/src/traced/probes/ftrace/BUILD.gn
+++ b/src/traced/probes/ftrace/BUILD.gn
@@ -64,7 +64,6 @@
"ftrace_config_unittest.cc",
"ftrace_controller_unittest.cc",
"ftrace_procfs_unittest.cc",
- "page_pool_unittest.cc",
"proto_translation_table_unittest.cc",
]
}
@@ -148,8 +147,6 @@
"ftrace_procfs.h",
"ftrace_stats.cc",
"ftrace_stats.h",
- "page_pool.cc",
- "page_pool.h",
"proto_translation_table.cc",
"proto_translation_table.h",
]
diff --git a/src/traced/probes/ftrace/cpu_reader.cc b/src/traced/probes/ftrace/cpu_reader.cc
index dbd7a75..23276df 100644
--- a/src/traced/probes/ftrace/cpu_reader.cc
+++ b/src/traced/probes/ftrace/cpu_reader.cc
@@ -31,7 +31,6 @@
#include "perfetto/ext/base/utils.h"
#include "src/traced/probes/ftrace/ftrace_controller.h"
#include "src/traced/probes/ftrace/ftrace_data_source.h"
-#include "src/traced/probes/ftrace/ftrace_thread_sync.h"
#include "src/traced/probes/ftrace/proto_translation_table.h"
#include "perfetto/trace/ftrace/ftrace_event.pbzero.h"
@@ -115,6 +114,11 @@
return fcntl(fd, F_SETFL, flags) == 0;
}
+// TODO(rsavitski): |overwrite| extraction seems wrong, the top part of the
+// second ("commit") field is a bit mask (see RB_MISSED_FLAGS in kernel
+// sources), not the direct count. The kernel conditionally appends the missed
+// events count to the end of the page (if there's space), which we might
+// also need to account for when parsing the page.
base::Optional<PageHeader> ParsePageHeader(const uint8_t** ptr,
uint16_t page_header_size_len) {
const uint8_t* end_of_page = *ptr + base::kPageSize;
@@ -149,286 +153,144 @@
using protos::pbzero::GenericFtraceEvent;
CpuReader::CpuReader(const ProtoTranslationTable* table,
- FtraceThreadSync* thread_sync,
size_t cpu,
- int generation,
- base::ScopedFile fd)
- : table_(table),
- thread_sync_(thread_sync),
- cpu_(cpu),
- trace_fd_(std::move(fd)) {
- // Make reads from the raw pipe blocking so that splice() can sleep.
+ base::ScopedFile trace_fd)
+ : table_(table), cpu_(cpu), trace_fd_(std::move(trace_fd)) {
PERFETTO_CHECK(trace_fd_);
- PERFETTO_CHECK(SetBlocking(*trace_fd_, true));
-
- // We need a non-default SIGPIPE handler to make it so that the blocking
- // splice() is woken up when the ~CpuReader() dtor destroys the pipes.
- // Just masking out the signal would cause an implicit syscall restart and
- // hence make the join() in the dtor unreliable.
- struct sigaction current_act = {};
- PERFETTO_CHECK(sigaction(SIGPIPE, nullptr, &current_act) == 0);
-#pragma GCC diagnostic push
-#if defined(__clang__)
-#pragma GCC diagnostic ignored "-Wdisabled-macro-expansion"
-#endif
- if (current_act.sa_handler == SIG_DFL || current_act.sa_handler == SIG_IGN) {
- struct sigaction act = {};
- act.sa_sigaction = [](int, siginfo_t*, void*) {};
- PERFETTO_CHECK(sigaction(SIGPIPE, &act, nullptr) == 0);
- }
-#pragma GCC diagnostic pop
-
- worker_thread_ = std::thread(std::bind(&RunWorkerThread, cpu_, generation,
- *trace_fd_, &pool_, thread_sync_,
- table->page_header_size_len()));
+ PERFETTO_CHECK(SetBlocking(*trace_fd_, false));
}
-CpuReader::~CpuReader() {
-// FtraceController (who owns this) is supposed to issue a kStop notification
-// to the thread sync object before destroying the CpuReader.
-#if PERFETTO_DCHECK_IS_ON()
+CpuReader::~CpuReader() = default;
+
+size_t CpuReader::ReadCycle(
+ uint8_t* parsing_buf,
+ size_t parsing_buf_size_pages,
+ size_t max_pages,
+ const std::set<FtraceDataSource*>& started_data_sources) {
+ PERFETTO_DCHECK(max_pages >= parsing_buf_size_pages &&
+ max_pages % parsing_buf_size_pages == 0);
+ metatrace::ScopedEvent evt(metatrace::TAG_FTRACE,
+ metatrace::FTRACE_CPU_READ_CYCLE);
+
+ // Work in batches to keep cache locality, and limit memory usage.
+ size_t total_pages_read = 0;
+ for (bool is_first_batch = true;; is_first_batch = false) {
+ size_t pages_read =
+ ReadAndProcessBatch(parsing_buf, parsing_buf_size_pages, is_first_batch,
+ started_data_sources);
+
+ PERFETTO_DCHECK(pages_read <= parsing_buf_size_pages);
+ total_pages_read += pages_read;
+
+ // Check whether we've caught up to the writer, or are giving up on this
+ // attempt due to some error.
+ if (pages_read != parsing_buf_size_pages)
+ break;
+ // Check if we've hit the limit of work for this cycle.
+ if (total_pages_read >= max_pages)
+ break;
+ }
+ PERFETTO_METATRACE_COUNTER(TAG_FTRACE, FTRACE_PAGES_DRAINED,
+ total_pages_read);
+ return total_pages_read;
+}
+
+// metatrace note: mark the reading phase as FTRACE_CPU_READ_BATCH, but let the
+// parsing time be implied (by the difference between the caller's span, and
+// this reading span). Makes it easier to estimate the read/parse ratio when
+// looking at the trace in the UI.
+size_t CpuReader::ReadAndProcessBatch(
+ uint8_t* parsing_buf,
+ size_t max_pages,
+ bool first_batch_in_cycle,
+ const std::set<FtraceDataSource*>& started_data_sources) {
+ size_t pages_read = 0;
{
- std::lock_guard<std::mutex> lock(thread_sync_->mutex);
- PERFETTO_DCHECK(thread_sync_->cmd == FtraceThreadSync::kQuit);
- }
-#endif
-
- // The kernel's splice implementation for the trace pipe doesn't generate a
- // SIGPIPE if the output pipe is closed (b/73807072). Instead, the call to
- // close() on the pipe hangs forever. To work around this, we first close the
- // trace fd (which prevents another splice from starting), raise SIGPIPE and
- // wait for the worker to exit (i.e., to guarantee no splice is in progress)
- // and only then close the staging pipe.
- trace_fd_.reset();
- InterruptWorkerThreadWithSignal();
- worker_thread_.join();
-}
-
-void CpuReader::InterruptWorkerThreadWithSignal() {
- pthread_kill(worker_thread_.native_handle(), SIGPIPE);
-}
-
-// The worker thread reads data from the ftrace trace_pipe_raw and moves it to
-// the page |pool| allowing the main thread to read and decode that.
-// See //docs/ftrace.md for the design of the ftrace worker scheduler.
-// static
-void CpuReader::RunWorkerThread(size_t cpu,
- int generation,
- int trace_fd,
- PagePool* pool,
- FtraceThreadSync* thread_sync,
- uint16_t header_size_len) {
-// Before attempting any changes to this function, think twice. The kernel
-// ftrace pipe code is full of caveats and bugs. This code carefully works
-// around those bugs. See b/120188810 and b/119805587 for the full narrative.
-#if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
- PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
- char thread_name[16];
- snprintf(thread_name, sizeof(thread_name), "traced_probes%zu", cpu);
- pthread_setname_np(pthread_self(), thread_name);
-
- // When using splice() the target fd needs to be an actual pipe. This pipe is
- // used only within this thread and is mainly for synchronization purposes.
- // A blocking splice() is the only way to block and wait for a new page of
- // ftrace data.
- base::Pipe sync_pipe = base::Pipe::Create(base::Pipe::kBothNonBlock);
-
- enum ReadMode { kRead, kSplice };
- enum Block { kBlock, kNonBlock };
- constexpr auto kPageSize = base::kPageSize;
-
- // This lambda function reads the ftrace raw pipe using either read() or
- // splice(), either in blocking or non-blocking mode.
- // Returns the number of ftrace bytes read, or -1 in case of failure.
- auto read_ftrace_pipe = [&sync_pipe, trace_fd, pool, header_size_len](
- ReadMode mode, Block block) -> int {
- auto eid = mode == kRead
- ? (block == kNonBlock ? metatrace::FTRACE_CPU_READ_NONBLOCK
- : metatrace::FTRACE_CPU_READ_BLOCK)
- : (block == kNonBlock ? metatrace::FTRACE_CPU_SPLICE_NONBLOCK
- : metatrace::FTRACE_CPU_SPLICE_BLOCK);
- metatrace::ScopedEvent evt(metatrace::TAG_FTRACE, eid);
-
- uint8_t* pool_page = pool->BeginWrite();
- PERFETTO_DCHECK(pool_page);
-
- ssize_t res;
- int err = 0;
- if (mode == kSplice) {
- uint32_t flg = SPLICE_F_MOVE | ((block == kNonBlock) * SPLICE_F_NONBLOCK);
- res = splice(trace_fd, nullptr, *sync_pipe.wr, nullptr, kPageSize, flg);
- err = errno;
- if (res > 0) {
- // If the splice() succeeded, read back from the other end of our own
- // pipe and copy the data into the pool.
- ssize_t rdres = read(*sync_pipe.rd, pool_page, kPageSize);
- PERFETTO_DCHECK(rdres == res);
+ metatrace::ScopedEvent evt(metatrace::TAG_FTRACE,
+ metatrace::FTRACE_CPU_READ_BATCH);
+ for (; pages_read < max_pages;) {
+ uint8_t* curr_page = parsing_buf + (pages_read * base::kPageSize);
+ ssize_t res =
+ PERFETTO_EINTR(read(*trace_fd_, curr_page, base::kPageSize));
+ if (res < 0) {
+ // Expected errors:
+ // EAGAIN: no data (since we're in non-blocking mode).
// ENOMEM, EBUSY: temporary ftrace failures (they happen).
+ if (errno != EAGAIN && errno != ENOMEM && errno != EBUSY)
+ PERFETTO_PLOG("Unexpected error on raw ftrace read");
+ break; // stop reading regardless of errno
}
- } else {
- if (block == kNonBlock)
- SetBlocking(trace_fd, false);
- res = read(trace_fd, pool_page, kPageSize);
- err = errno;
- if (res > 0) {
- // Need to copy the ptr, ParsePageHeader() advances the passed ptr arg.
- const uint8_t* ptr = pool_page;
- // The caller of this function wants to have a sufficient approximation
- // of how many bytes of ftrace data have been read. Unfortunately the
- // return value of read() is a lie. The problem is that the ftrace
- // read() implementation, for good reasons, always reconstructs a whole
- // ftrace page, copying the events over and zero-filling at the end.
- // This is nice, because we always get a valid ftrace header, but also
- // causes read to always returns 4096. The only way to have a good
- // indication of how many bytes of ftrace data have been read is to
- // parse the ftrace header.
- // Note: |header_size_len| is *not* an indication on how many bytes are
- // available form |ptr|. It's just an independent piece of information
- // that needs to be passed to ParsePageHeader() (a static function) in
- // order to work.
- base::Optional<PageHeader> hdr = ParsePageHeader(&ptr, header_size_len);
- PERFETTO_DCHECK(hdr && hdr->size > 0 && hdr->size <= base::kPageSize);
- res = hdr.has_value() ? static_cast<int>(hdr->size) : -1;
- }
- if (block == kNonBlock)
- SetBlocking(trace_fd, true);
- }
-
- if (res > 0) {
- // splice() should return full pages, read can return < a page.
- PERFETTO_DCHECK(res == base::kPageSize || mode == kRead);
- pool->EndWrite();
- return static_cast<int>(res);
- }
-
- // It is fine to leave the BeginWrite() unpaired in the error case.
-
- if (res && err != EAGAIN && err != ENOMEM && err != EBUSY && err != EINTR &&
- err != EBADF) {
- // EAGAIN: no data when in non-blocking mode.
- // ENONMEM, EBUSY: temporary ftrace failures (they happen).
- // EINTR: signal interruption, likely from main thread to issue a new cmd.
- // EBADF: the main thread has closed the fd (happens during dtor).
- PERFETTO_PLOG("Unexpected %s() err", mode == kRead ? "read" : "splice");
- }
- return -1;
- };
-
- uint64_t last_cmd_id = 0;
- ReadMode cur_mode = kSplice;
- for (bool run_loop = true; run_loop;) {
- FtraceThreadSync::Cmd cmd;
- // Wait for a new command from the main thread issued by FtraceController.
- // The FtraceController issues also a signal() after every new command. This
- // is not necessary for the condition variable itself, but it's necessary to
- // unblock us if we are in a blocking read() or splice().
- // Commands are tagged with an ID, every new command has a new |cmd_id|, so
- // we can distinguish spurious wakeups from actual cmd requests.
- {
- PERFETTO_METATRACE_SCOPED(TAG_FTRACE, FTRACE_CPU_WAIT_CMD);
- std::unique_lock<std::mutex> lock(thread_sync->mutex);
- while (thread_sync->cmd_id == last_cmd_id)
- thread_sync->cond.wait(lock);
- cmd = thread_sync->cmd;
- last_cmd_id = thread_sync->cmd_id;
- }
-
- // An empirical threshold (bytes read/spliced from the raw pipe) to make an
- // educated guess on whether we should read/splice more. If we read fewer
- // bytes it means that we caught up with the write pointer and we started
- // consuming ftrace events in real-time. This cannot be just 4096 because
- // it needs to account for fragmentation, i.e. for the fact that the last
- // trace event didn't fit in the current page and hence the current page
- // was terminated prematurely.
- constexpr int kRoughlyAPage = 4096 - 512;
-
- switch (cmd) {
- case FtraceThreadSync::kQuit:
- run_loop = false;
- break;
-
- case FtraceThreadSync::kRun: {
- PERFETTO_METATRACE_SCOPED(TAG_FTRACE, FTRACE_CPU_RUN_CYCLE);
-
- // Do a blocking read/splice. This can fail for a variety of reasons:
- // - FtraceController interrupts us with a signal for a new cmd
- // (e.g. it wants us to quit or do a flush).
- // - A temporary read/splice() failure occurred (it has been observed
- // to happen if the system is under high load).
- // In all these cases the most useful thing we can do is skip the
- // current cycle and try again later.
- if (read_ftrace_pipe(cur_mode, kBlock) <= 0)
- break; // Wait for next command.
-
- // If we are in read mode (because of a previous flush) check if the
- // in-kernel read cursor is page-aligned again. If a non-blocking splice
- // succeeds, it means that we can safely switch back to splice mode
- // (See b/120188810).
- if (cur_mode == kRead && read_ftrace_pipe(kSplice, kNonBlock) > 0)
- cur_mode = kSplice;
-
- // Do as many non-blocking read/splice as we can.
- while (read_ftrace_pipe(cur_mode, kNonBlock) > kRoughlyAPage) {
- }
- size_t num_pages = pool->CommitWrittenPages();
- PERFETTO_METATRACE_COUNTER(TAG_FTRACE, FTRACE_PAGES_DRAINED, num_pages);
- FtraceController::OnCpuReaderRead(cpu, generation, thread_sync);
+ // As long as all of our reads are for a single page, the kernel should
+ // return exactly a well-formed raw ftrace page (if not in the steady
+ // state of reading out fully-written pages, the kernel will construct
+ // pages as necessary, copying over events and zero-filling at the end).
+ // A sub-page read() is therefore not expected in practice (unless
+ // there's a concurrent reader requesting less than a page?). Crash if
+ // encountering this situation. Kernel source pointer: see usage of
+ // |info->read| within |tracing_buffers_read|.
+ // TODO(rsavitski): don't crash, throw away the partial read & pipe
+ // through an error signal.
+ if (res == 0) {
+ // Very rare, but possible. Stop for now, should recover.
+ PERFETTO_DLOG("[cpu%zu]: 0-sized read from ftrace pipe.", cpu_);
break;
}
+ PERFETTO_CHECK(res == static_cast<ssize_t>(base::kPageSize));
- case FtraceThreadSync::kFlush: {
- PERFETTO_METATRACE_SCOPED(TAG_FTRACE, FTRACE_CPU_FLUSH);
- cur_mode = kRead;
- while (read_ftrace_pipe(cur_mode, kNonBlock) > kRoughlyAPage) {
- }
- size_t num_pages = pool->CommitWrittenPages();
- PERFETTO_METATRACE_COUNTER(TAG_FTRACE, FTRACE_PAGES_DRAINED, num_pages);
- FtraceController::OnCpuReaderFlush(cpu, generation, thread_sync);
+ pages_read += 1;
+
+ // Compare the amount of ftrace data read against an empirical threshold
+ // to make an educated guess on whether we should read more. To figure
+ // out the amount of ftrace data, we need to parse the page header (since
+ // the read always returns a page, zero-filled at the end). If we read
+ // fewer bytes than the threshold, it means that we caught up with the
+ // write pointer and we started consuming ftrace events in real-time.
+ // This cannot be just 4096 because it needs to account for
+ // fragmentation, i.e. for the fact that the last trace event didn't fit
+ // in the current page and hence the current page was terminated
+ // prematurely.
+ static constexpr size_t kRoughlyAPage = base::kPageSize - 512;
+ const uint8_t* scratch_ptr = curr_page;
+ base::Optional<PageHeader> hdr =
+ ParsePageHeader(&scratch_ptr, table_->page_header_size_len());
+ PERFETTO_DCHECK(hdr && hdr->size > 0 && hdr->size <= base::kPageSize);
+ if (!hdr.has_value()) {
+ PERFETTO_ELOG("[cpu%zu]: can't parse page header", cpu_);
break;
}
- } // switch(cmd)
- } // for(run_loop)
- PERFETTO_DPLOG("Terminating CPUReader thread for CPU %zd.", cpu);
-#else
- base::ignore_result(cpu);
- base::ignore_result(generation);
- base::ignore_result(trace_fd);
- base::ignore_result(pool);
- base::ignore_result(thread_sync);
- base::ignore_result(header_size_len);
- PERFETTO_ELOG("Supported only on Linux/Android");
-#endif
-}
-
-// Invoked on the main thread by FtraceController, |drain_rate_ms| after the
-// first CPU wakes up from the blocking read()/splice().
-void CpuReader::Drain(const std::set<FtraceDataSource*>& data_sources) {
- PERFETTO_DCHECK_THREAD(thread_checker_);
- PERFETTO_METATRACE_SCOPED(TAG_FTRACE, FTRACE_CPU_DRAIN);
-
- auto page_blocks = pool_.BeginRead();
- for (const auto& page_block : page_blocks) {
- for (size_t i = 0; i < page_block.size(); i++) {
- const uint8_t* page = page_block.At(i);
-
- for (FtraceDataSource* data_source : data_sources) {
- auto packet = data_source->trace_writer()->NewTracePacket();
- auto* bundle = packet->set_ftrace_events();
- auto* metadata = data_source->mutable_metadata();
- auto* filter = data_source->event_filter();
-
- // Note: The fastpath in proto_trace_parser.cc speculates on the fact
- // that the cpu field is the first field of the proto message. If this
- // changes, change proto_trace_parser.cc accordingly.
- bundle->set_cpu(static_cast<uint32_t>(cpu_));
-
- size_t evt_size = ParsePage(page, filter, bundle, table_, metadata);
- PERFETTO_DCHECK(evt_size);
- bundle->set_overwrite_count(metadata->overwrite_count);
+ // Note that it is normal for the first read of a cycle to be small: it
+ // means that we're being given the remainder of events from a page that we
+ // partially consumed during the last read of the previous cycle (having
+ // caught up to the writer).
+ if (hdr->size < kRoughlyAPage &&
+ !(first_batch_in_cycle && pages_read == 1)) {
+ break;
}
}
+ } // end of metatrace::FTRACE_CPU_READ_BATCH
+
+ // Parse the pages and write to the trace for all relevant data
+ // sources.
+ for (size_t i = 0; i < pages_read; i++) {
+ uint8_t* curr_page = parsing_buf + (i * base::kPageSize);
+ for (FtraceDataSource* data_source : started_data_sources) {
+ auto packet = data_source->trace_writer()->NewTracePacket();
+ auto* bundle = packet->set_ftrace_events();
+ auto* metadata = data_source->mutable_metadata();
+ auto* filter = data_source->event_filter();
+
+ // Note: The fastpath in proto_trace_parser.cc speculates on the fact
+ // that the cpu field is the first field of the proto message. If this
+ // changes, change proto_trace_parser.cc accordingly.
+ bundle->set_cpu(static_cast<uint32_t>(cpu_));
+
+ size_t evt_size = ParsePage(curr_page, filter, bundle, table_, metadata);
+ PERFETTO_DCHECK(evt_size);
+ bundle->set_overwrite_count(metadata->overwrite_count);
+ }
}
- pool_.EndRead(std::move(page_blocks));
+ return pages_read;
}
// The structure of a raw trace buffer page is as follows:
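To make the caught-up heuristic above concrete: read() on the raw pipe always
returns a full, zero-filled page, so it's the parsed header size (not the
read() return value) that says how much real payload the page carried. A
minimal sketch of the decision, assuming 4 KiB pages:
```
// Sketch of ReadAndProcessBatch's caught-up check, assuming 4 KiB pages.
// The 512 byte slack accounts for fragmentation: a full page can report
// somewhat less than 4096 bytes of payload if the last event didn't fit.
bool LikelyCaughtUpToWriter(size_t page_payload_bytes) {
  constexpr size_t kPageSize = 4096;
  constexpr size_t kRoughlyAPage = kPageSize - 512;
  // E.g. 4080 bytes -> page was full, keep reading; 900 bytes -> caught up.
  return page_payload_bytes < kRoughlyAPage;
}
```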
diff --git a/src/traced/probes/ftrace/cpu_reader.h b/src/traced/probes/ftrace/cpu_reader.h
index 9ee0a61..4d14c52 100644
--- a/src/traced/probes/ftrace/cpu_reader.h
+++ b/src/traced/probes/ftrace/cpu_reader.h
@@ -34,13 +34,11 @@
#include "perfetto/protozero/message.h"
#include "perfetto/protozero/message_handle.h"
#include "src/traced/probes/ftrace/ftrace_metadata.h"
-#include "src/traced/probes/ftrace/page_pool.h"
#include "src/traced/probes/ftrace/proto_translation_table.h"
namespace perfetto {
class FtraceDataSource;
-struct FtraceThreadSync;
class ProtoTranslationTable;
namespace protos {
@@ -49,24 +47,23 @@
} // namespace pbzero
} // namespace protos
-
-// Reads raw ftrace data for a cpu and writes that into the perfetto userspace
-// buffer.
+// Reads raw ftrace data for a cpu, parses it, and writes it into the perfetto
+// tracing buffers.
class CpuReader {
public:
using FtraceEventBundle = protos::pbzero::FtraceEventBundle;
- CpuReader(const ProtoTranslationTable*,
- FtraceThreadSync*,
+ CpuReader(const ProtoTranslationTable* table,
size_t cpu,
- int generation,
- base::ScopedFile fd);
+ base::ScopedFile trace_fd);
~CpuReader();
- // Drains all available data into the buffer of the passed data sources.
- void Drain(const std::set<FtraceDataSource*>&);
-
- void InterruptWorkerThreadWithSignal();
+ // Reads and parses all ftrace data for this cpu (in batches), until we catch
+ // up to the writer, or hit |max_pages|. Returns number of pages read.
+ size_t ReadCycle(uint8_t* parsing_buf,
+ size_t parsing_buf_size_pages,
+ size_t max_pages,
+ const std::set<FtraceDataSource*>& started_data_sources);
template <typename T>
static bool ReadAndAdvance(const uint8_t** ptr, const uint8_t* end, T* out) {
@@ -178,25 +175,23 @@
FtraceMetadata* metadata);
private:
- static void RunWorkerThread(size_t cpu,
- int generation,
- int trace_fd,
- PagePool*,
- FtraceThreadSync*,
- uint16_t header_size_len);
-
CpuReader(const CpuReader&) = delete;
CpuReader& operator=(const CpuReader&) = delete;
- const ProtoTranslationTable* const table_;
- FtraceThreadSync* const thread_sync_;
- const size_t cpu_;
- PagePool pool_;
- base::ScopedFile trace_fd_;
- std::thread worker_thread_;
- PERFETTO_THREAD_CHECKER(thread_checker_)
-};
+ // Reads at most |max_pages| of ftrace data, parses it, and writes it
+ // into |started_data_sources|. Returns number of pages read.
+ // See comment on ftrace_controller.cc:kMaxParsingWorkingSetPages for
+ // rationale behind the batching.
+ size_t ReadAndProcessBatch(
+ uint8_t* parsing_buf,
+ size_t max_pages,
+ bool first_batch_in_cycle,
+ const std::set<FtraceDataSource*>& started_data_sources);
+ const ProtoTranslationTable* const table_;
+ const size_t cpu_;
+ base::ScopedFile trace_fd_;
+};
} // namespace perfetto
diff --git a/src/traced/probes/ftrace/ftrace_config_muxer.cc b/src/traced/probes/ftrace/ftrace_config_muxer.cc
index 21c1c7a..8ec4fd5 100644
--- a/src/traced/probes/ftrace/ftrace_config_muxer.cc
+++ b/src/traced/probes/ftrace/ftrace_config_muxer.cc
@@ -32,9 +32,6 @@
// trace_clocks in preference order.
constexpr const char* kClocks[] = {"boot", "global", "local"};
-constexpr int kDefaultPerCpuBufferSizeKb = 2 * 1024; // 2mb
-constexpr int kMaxPerCpuBufferSizeKb = 64 * 1024; // 64mb
-
void AddEventGroup(const ProtoTranslationTable* table,
const std::string& group,
std::set<GroupAndName>* to) {
diff --git a/src/traced/probes/ftrace/ftrace_config_muxer.h b/src/traced/probes/ftrace/ftrace_config_muxer.h
index 9bffd45..a8f92d5 100644
--- a/src/traced/probes/ftrace/ftrace_config_muxer.h
+++ b/src/traced/probes/ftrace/ftrace_config_muxer.h
@@ -27,6 +27,9 @@
namespace perfetto {
+constexpr int kDefaultPerCpuBufferSizeKb = 2 * 1024; // 2mb
+constexpr int kMaxPerCpuBufferSizeKb = 64 * 1024; // 64mb
+
// Ftrace is a bunch of globally modifiable persistent state.
// Given a number of FtraceConfigs we need to find the best union of all
// the settings to make everyone happy while also watching out for anybody
diff --git a/src/traced/probes/ftrace/ftrace_controller.cc b/src/traced/probes/ftrace/ftrace_controller.cc
index 907e6f1..e2f60fe 100644
--- a/src/traced/probes/ftrace/ftrace_controller.cc
+++ b/src/traced/probes/ftrace/ftrace_controller.cc
@@ -48,10 +48,24 @@
namespace {
constexpr int kDefaultDrainPeriodMs = 100;
-constexpr int kControllerFlushTimeoutMs = 500;
constexpr int kMinDrainPeriodMs = 1;
constexpr int kMaxDrainPeriodMs = 1000 * 60;
+// When reading and parsing data for a particular cpu, we do it in batches of
+// this many pages. In other words, we'll read up to
+// |kParsingBufferSizePages| pages into memory, parse them, and then repeat if
+// we still haven't caught up to the writer. A working set of 32 pages is 128k
+// of data, which should comfortably fit in the cpu's caches. Furthermore, the
+// batching limits the memory usage of traced_probes.
+constexpr size_t kParsingBufferSizePages = 32;
+
+// Read at most this many pages of data per cpu per read task. If we hit this
+// limit on at least one cpu, we stop and repost the read task, letting other
+// tasks get some cpu time before continuing reading.
+// TODO(rsavitski): propagate the knowledge of the current per-cpu kernel buffer
+// size to this controller, and select the lower of the two.
+constexpr size_t kMaxPagesPerCpuPerReadTick = 256; // 1 MB per cpu
+
uint32_t ClampDrainPeriodMs(uint32_t drain_period_ms) {
if (drain_period_ms == 0) {
return kDefaultDrainPeriodMs;
@@ -135,16 +149,12 @@
Observer* observer)
: task_runner_(task_runner),
observer_(observer),
- thread_sync_(task_runner),
ftrace_procfs_(std::move(ftrace_procfs)),
table_(std::move(table)),
ftrace_config_muxer_(std::move(model)),
- weak_factory_(this) {
- thread_sync_.trace_controller_weak = GetWeakPtr();
-}
+ weak_factory_(this) {}
FtraceController::~FtraceController() {
- PERFETTO_DCHECK_THREAD(thread_checker_);
for (const auto* data_source : data_sources_)
ftrace_config_muxer_->RemoveConfig(data_source->config_id());
data_sources_.clear();
@@ -156,161 +166,86 @@
return static_cast<uint64_t>(base::GetWallTimeMs().count());
}
-// The OnCpuReader* methods below are called on the CpuReader worker threads.
-// Lifetime is guaranteed to be valid, because the FtraceController dtor
-// (that happens on the main thread) joins the worker threads.
-
-// static
-void FtraceController::OnCpuReaderRead(size_t cpu,
- int generation,
- FtraceThreadSync* thread_sync) {
- PERFETTO_METATRACE_SCOPED(TAG_FTRACE, FTRACE_CPU_READER_READ);
- {
- std::lock_guard<std::mutex> lock(thread_sync->mutex);
- // If this was the first CPU to wake up, schedule a drain for the next
- // drain interval.
- bool post_drain_task = thread_sync->cpus_to_drain.none();
- thread_sync->cpus_to_drain[cpu] = true;
- if (!post_drain_task)
- return;
- } // lock(thread_sync_.mutex)
-
- base::WeakPtr<FtraceController> weak_ctl = thread_sync->trace_controller_weak;
- base::TaskRunner* task_runner = thread_sync->task_runner;
-
- // The nested PostTask is used because the FtraceController (and hence
- // GetDrainPeriodMs()) can be called only on the main thread.
- task_runner->PostTask([weak_ctl, task_runner, generation] {
-
- if (!weak_ctl)
- return;
- uint32_t drain_period_ms = weak_ctl->GetDrainPeriodMs();
-
- task_runner->PostDelayedTask(
- [weak_ctl, generation] {
- if (weak_ctl)
- weak_ctl->DrainCPUs(generation);
- },
- drain_period_ms - (weak_ctl->NowMs() % drain_period_ms));
-
- });
-}
-
-// static
-void FtraceController::OnCpuReaderFlush(size_t cpu,
- int generation,
- FtraceThreadSync* thread_sync) {
- // In the case of a flush, we want to drain the data as quickly as possible to
- // minimize the flush latency, at the cost of more tasks / wakeups (eventually
- // one task per cpu). Flushes are not supposed to happen too frequently.
- {
- std::lock_guard<std::mutex> lock(thread_sync->mutex);
- thread_sync->cpus_to_drain[cpu] = true;
- thread_sync->flush_acks[cpu] = true;
- } // lock(thread_sync_.mutex)
-
- base::WeakPtr<FtraceController> weak_ctl = thread_sync->trace_controller_weak;
- thread_sync->task_runner->PostTask([weak_ctl, generation] {
- if (weak_ctl)
- weak_ctl->DrainCPUs(generation);
- });
-}
-
-void FtraceController::DrainCPUs(int generation) {
- PERFETTO_DCHECK_THREAD(thread_checker_);
- PERFETTO_METATRACE_SCOPED(TAG_FTRACE, FTRACE_DRAIN_CPUS);
-
- if (generation != generation_)
- return;
-
- const size_t num_cpus = ftrace_procfs_->NumberOfCpus();
- PERFETTO_DCHECK(cpu_readers_.size() == num_cpus);
- FlushRequestID ack_flush_request_id = 0;
- std::bitset<base::kMaxCpus> cpus_to_drain;
- {
- std::lock_guard<std::mutex> lock(thread_sync_.mutex);
- std::swap(cpus_to_drain, thread_sync_.cpus_to_drain);
-
- // Check also if a flush is pending and if all cpus have acked. If that's
- // the case, ack the overall Flush() request at the end of this function.
- if (cur_flush_request_id_ && thread_sync_.flush_acks.count() >= num_cpus) {
- thread_sync_.flush_acks.reset();
- ack_flush_request_id = cur_flush_request_id_;
- cur_flush_request_id_ = 0;
- }
- }
-
- for (size_t cpu = 0; cpu < num_cpus; cpu++) {
- if (!cpus_to_drain[cpu])
- continue;
- // This method reads the pipe and converts the raw ftrace data into
- // protobufs using the |data_source|'s TraceWriter.
- cpu_readers_[cpu]->Drain(started_data_sources_);
- OnDrainCpuForTesting(cpu);
- }
-
- // If we filled up any SHM pages while draining the data, we will have posted
- // a task to notify traced about this. Only unblock the readers after this
- // notification is sent to make it less likely that they steal CPU time away
- // from traced. Also, don't unblock the readers until all of them have replied
- // to the flush.
- if (!cur_flush_request_id_) {
- base::WeakPtr<FtraceController> weak_this = weak_factory_.GetWeakPtr();
- task_runner_->PostTask([weak_this] {
- if (weak_this)
- weak_this->UnblockReaders();
- });
- }
-
- observer_->OnFtraceDataWrittenIntoDataSourceBuffers();
-
- if (ack_flush_request_id) {
- // Flush completed, all CpuReader(s) acked.
-
- IssueThreadSyncCmd(FtraceThreadSync::kRun); // Switch back to reading mode.
-
- // This will call FtraceDataSource::OnFtraceFlushComplete(), which in turn
- // will flush the userspace buffers and ack the flush to the ProbesProducer
- // which in turn will ack the flush to the tracing service.
- NotifyFlushCompleteToStartedDataSources(ack_flush_request_id);
- }
-}
-
-void FtraceController::UnblockReaders() {
- PERFETTO_METATRACE_SCOPED(TAG_FTRACE, FTRACE_UNBLOCK_READERS);
-
- // If a flush or a quit is pending, do nothing.
- std::unique_lock<std::mutex> lock(thread_sync_.mutex);
- if (thread_sync_.cmd != FtraceThreadSync::kRun)
- return;
-
- // Unblock all waiting readers to start moving more data into their
- // respective staging pipes.
- IssueThreadSyncCmd(FtraceThreadSync::kRun, std::move(lock));
-}
-
void FtraceController::StartIfNeeded() {
if (started_data_sources_.size() > 1)
return;
PERFETTO_DCHECK(!started_data_sources_.empty());
PERFETTO_DCHECK(cpu_readers_.empty());
- base::WeakPtr<FtraceController> weak_this = weak_factory_.GetWeakPtr();
- {
- std::lock_guard<std::mutex> lock(thread_sync_.mutex);
- thread_sync_.cpus_to_drain.reset();
- thread_sync_.cmd = FtraceThreadSync::kRun;
- thread_sync_.cmd_id++;
+ // Lazily allocate the memory used for reading & parsing ftrace.
+ if (!parsing_mem_.IsValid()) {
+ parsing_mem_ =
+ base::PagedMemory::Allocate(base::kPageSize * kParsingBufferSizePages);
}
- generation_++;
cpu_readers_.clear();
cpu_readers_.reserve(ftrace_procfs_->NumberOfCpus());
for (size_t cpu = 0; cpu < ftrace_procfs_->NumberOfCpus(); cpu++) {
cpu_readers_.emplace_back(
- new CpuReader(table_.get(), &thread_sync_, cpu, generation_,
- ftrace_procfs_->OpenPipeForCpu(cpu)));
+ new CpuReader(table_.get(), cpu, ftrace_procfs_->OpenPipeForCpu(cpu)));
}
+
+ // Start the repeating read tasks.
+ auto generation = ++generation_;
+ auto drain_period_ms = GetDrainPeriodMs();
+ auto weak_this = weak_factory_.GetWeakPtr();
+ task_runner_->PostDelayedTask(
+ [weak_this, generation] {
+ if (weak_this)
+ weak_this->ReadTick(generation);
+ },
+ drain_period_ms - (NowMs() % drain_period_ms));
+}
+
+void FtraceController::ReadTick(int generation) {
+ metatrace::ScopedEvent evt(metatrace::TAG_FTRACE,
+ metatrace::FTRACE_READ_TICK);
+ if (started_data_sources_.empty() || generation != generation_) {
+ return;
+ }
+
+ bool all_cpus_caught_up = ReadAllCpuBuffers(kMaxPagesPerCpuPerReadTick);
+
+ // A given cpu will read at most |kMaxPagesPerCpuPerReadTick| pages per
+ // tick. If we hit this limit on at least one cpu, repost the task at the
+ // end of the immediate queue, letting the other tasks get some cpu time
+ // before we continue catching up with the event stream.
+ auto weak_this = weak_factory_.GetWeakPtr();
+ if (!all_cpus_caught_up) {
+ PERFETTO_DLOG("Reposting immediate ReadTick as there's more work.");
+ task_runner_->PostTask([weak_this, generation] {
+ if (weak_this)
+ weak_this->ReadTick(generation);
+ });
+ } else {
+ // Done until next drain period.
+ auto drain_period_ms = GetDrainPeriodMs();
+ task_runner_->PostDelayedTask(
+ [weak_this, generation] {
+ if (weak_this)
+ weak_this->ReadTick(generation);
+ },
+ drain_period_ms - (NowMs() % drain_period_ms));
+ }
+}
+
+bool FtraceController::ReadAllCpuBuffers(size_t max_pages) {
+ PERFETTO_DCHECK(parsing_mem_.IsValid() &&
+ parsing_mem_.size() ==
+ base::kPageSize * kParsingBufferSizePages);
+
+ bool all_cpus_caught_up = true;
+ auto* parsing_buf = reinterpret_cast<uint8_t*>(parsing_mem_.Get());
+ for (auto& cpu_reader : cpu_readers_) {
+ size_t pages_read = cpu_reader->ReadCycle(
+ parsing_buf, kParsingBufferSizePages, max_pages, started_data_sources_);
+
+ // ReadCycle gave up early as it was doing too much work.
+ if (pages_read == max_pages)
+ all_cpus_caught_up = false;
+ }
+ observer_->OnFtraceDataWrittenIntoDataSourceBuffers();
+ return all_cpus_caught_up;
}
uint32_t FtraceController::GetDrainPeriodMs() {
@@ -337,45 +272,19 @@
}
void FtraceController::Flush(FlushRequestID flush_id) {
- PERFETTO_DCHECK_THREAD(thread_checker_);
+ metatrace::ScopedEvent evt(metatrace::TAG_FTRACE,
+ metatrace::FTRACE_CPU_FLUSH);
- if (flush_id == cur_flush_request_id_)
- return; // Already dealing with this flush request.
+ // Read all cpus in one go, limiting the amount read per cpu to make sure
+ // we don't get stuck chasing the writer if there's a very high bandwidth of
+ // events.
+ // TODO(rsavitski): use the active buffer size as the limit, instead of the
+ // largest one we allow.
+ size_t max_pages_per_cpu = kMaxPerCpuBufferSizeKb / (base::kPageSize / 1024);
+ ReadAllCpuBuffers(max_pages_per_cpu);
- cur_flush_request_id_ = flush_id;
- {
- std::unique_lock<std::mutex> lock(thread_sync_.mutex);
- thread_sync_.flush_acks.reset();
- IssueThreadSyncCmd(FtraceThreadSync::kFlush, std::move(lock));
- }
-
- base::WeakPtr<FtraceController> weak_this = weak_factory_.GetWeakPtr();
- task_runner_->PostDelayedTask(
- [weak_this, flush_id] {
- if (weak_this)
- weak_this->OnFlushTimeout(flush_id);
- },
- kControllerFlushTimeoutMs);
-}
-
-void FtraceController::OnFlushTimeout(FlushRequestID flush_request_id) {
- if (flush_request_id != cur_flush_request_id_)
- return;
-
- std::string acks; // For debugging purposes only.
- {
- // Unlock the cpu readers and move on.
- std::unique_lock<std::mutex> lock(thread_sync_.mutex);
- acks = thread_sync_.flush_acks.to_string();
- thread_sync_.flush_acks.reset();
- if (thread_sync_.cmd == FtraceThreadSync::kFlush)
- IssueThreadSyncCmd(FtraceThreadSync::kRun, std::move(lock));
- }
-
- PERFETTO_ELOG("Ftrace flush(%" PRIu64 ") timed out. Acked cpus mask: [%s]",
- flush_request_id, acks.c_str());
- cur_flush_request_id_ = 0;
- NotifyFlushCompleteToStartedDataSources(flush_request_id);
+ for (FtraceDataSource* data_source : started_data_sources_)
+ data_source->OnFtraceFlushComplete(flush_id);
}
void FtraceController::StopIfNeeded() {
@@ -386,15 +295,14 @@
// ask for an explicit flush before stopping, unless it needs to perform a
// non-graceful stop.
- IssueThreadSyncCmd(FtraceThreadSync::kQuit);
-
- // Destroying the CpuReader(s) will join on their worker threads.
cpu_readers_.clear();
- generation_++;
+
+ if (parsing_mem_.IsValid()) {
+ parsing_mem_.AdviseDontNeed(parsing_mem_.Get(), parsing_mem_.size());
+ }
}
bool FtraceController::AddDataSource(FtraceDataSource* data_source) {
- PERFETTO_DCHECK_THREAD(thread_checker_);
if (!ValidConfig(data_source->config()))
return false;
@@ -410,7 +318,6 @@
}
bool FtraceController::StartDataSource(FtraceDataSource* data_source) {
- PERFETTO_DCHECK_THREAD(thread_checker_);
PERFETTO_DCHECK(data_sources_.count(data_source) > 0);
FtraceConfigId config_id = data_source->config_id();
@@ -425,7 +332,6 @@
}
void FtraceController::RemoveDataSource(FtraceDataSource* data_source) {
- PERFETTO_DCHECK_THREAD(thread_checker_);
started_data_sources_.erase(data_source);
size_t removed = data_sources_.erase(data_source);
if (!removed)
@@ -438,43 +344,6 @@
DumpAllCpuStats(ftrace_procfs_.get(), stats);
}
-void FtraceController::IssueThreadSyncCmd(
- FtraceThreadSync::Cmd cmd,
- std::unique_lock<std::mutex> pass_lock_from_caller) {
- PERFETTO_DCHECK_THREAD(thread_checker_);
- {
- std::unique_lock<std::mutex> lock(std::move(pass_lock_from_caller));
- if (!lock.owns_lock())
- lock = std::unique_lock<std::mutex>(thread_sync_.mutex);
-
- if (thread_sync_.cmd == FtraceThreadSync::kQuit &&
- cmd != FtraceThreadSync::kQuit) {
- // If in kQuit state, we should never issue any other commands.
- return;
- }
-
- thread_sync_.cmd = cmd;
- thread_sync_.cmd_id++;
- }
-
- // Send a SIGPIPE to all worker threads to wake them up if they are sitting in
- // a blocking splice(). If they are not and instead they are sitting in the
- // cond-variable.wait(), this, together with the one below, will have at best
- // the same effect of a spurious wakeup, depending on the implementation of
- // the condition variable.
- for (const auto& cpu_reader : cpu_readers_)
- cpu_reader->InterruptWorkerThreadWithSignal();
-
- thread_sync_.cond.notify_all();
-}
-
-void FtraceController::NotifyFlushCompleteToStartedDataSources(
- FlushRequestID flush_request_id) {
- PERFETTO_DCHECK_THREAD(thread_checker_);
- for (FtraceDataSource* data_source : started_data_sources_)
- data_source->OnFtraceFlushComplete(flush_request_id);
-}
-
FtraceController::Observer::~Observer() = default;
} // namespace perfetto
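A detail worth calling out from the scheduling code above: the delay
expression `drain_period_ms - (NowMs() % drain_period_ms)` aligns each tick to
the next wall-clock multiple of the drain period, rather than using a fixed
delay from now. A minimal illustration:
```
// Next-tick delay, aligned to wall-clock multiples of the period.
// E.g. period = 100ms, now = 1230ms -> returns 70, so the tick lands at
// t = 1300ms. Presumably this keeps periodic wakeups from different
// components clustered together rather than smeared across the period.
uint64_t DelayToNextTickMs(uint64_t now_ms, uint64_t period_ms) {
  return period_ms - (now_ms % period_ms);
}
```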
diff --git a/src/traced/probes/ftrace/ftrace_controller.h b/src/traced/probes/ftrace/ftrace_controller.h
index 63514b7..bee7d27 100644
--- a/src/traced/probes/ftrace/ftrace_controller.h
+++ b/src/traced/probes/ftrace/ftrace_controller.h
@@ -28,11 +28,11 @@
#include <string>
#include "perfetto/base/task_runner.h"
+#include "perfetto/ext/base/paged_memory.h"
#include "perfetto/ext/base/utils.h"
#include "perfetto/ext/base/weak_ptr.h"
#include "perfetto/ext/tracing/core/basic_types.h"
#include "src/traced/probes/ftrace/ftrace_config_utils.h"
-#include "src/traced/probes/ftrace/ftrace_thread_sync.h"
namespace perfetto {
@@ -61,10 +61,6 @@
static std::unique_ptr<FtraceController> Create(base::TaskRunner*, Observer*);
virtual ~FtraceController();
- // These two methods are called by CpuReader(s) from their worker threads.
- static void OnCpuReaderRead(size_t cpu, int generation, FtraceThreadSync*);
- static void OnCpuReaderFlush(size_t cpu, int generation, FtraceThreadSync*);
-
void DisableAllEvents();
void WriteTraceMarker(const std::string& s);
void ClearTrace();
@@ -92,8 +88,6 @@
base::TaskRunner*,
Observer*);
- virtual void OnDrainCpuForTesting(size_t /*cpu*/) {}
-
// Protected and virtual for testing.
virtual uint64_t NowMs() const;
@@ -103,12 +97,12 @@
FtraceController(const FtraceController&) = delete;
FtraceController& operator=(const FtraceController&) = delete;
- void OnFlushTimeout(FlushRequestID);
- void DrainCPUs(int generation);
- void UnblockReaders();
- void NotifyFlushCompleteToStartedDataSources(FlushRequestID);
- void IssueThreadSyncCmd(FtraceThreadSync::Cmd,
- std::unique_lock<std::mutex> = {});
+ // Periodic task that reads all per-cpu ftrace buffers.
+ void ReadTick(int generation);
+ // Returns true if we've caught up with the ftrace events for all cpus. False
+ // if we hit |max_pages| on at least one cpu, and are therefore stopping
+ // early (to not hog the thread for too long).
+ bool ReadAllCpuBuffers(size_t max_pages);
uint32_t GetDrainPeriodMs();
@@ -117,17 +111,15 @@
base::TaskRunner* const task_runner_;
Observer* const observer_;
- FtraceThreadSync thread_sync_;
+ base::PagedMemory parsing_mem_;
std::unique_ptr<FtraceProcfs> ftrace_procfs_;
std::unique_ptr<ProtoTranslationTable> table_;
std::unique_ptr<FtraceConfigMuxer> ftrace_config_muxer_;
int generation_ = 0;
- FlushRequestID cur_flush_request_id_ = 0;
bool atrace_running_ = false;
std::vector<std::unique_ptr<CpuReader>> cpu_readers_;
std::set<FtraceDataSource*> data_sources_;
std::set<FtraceDataSource*> started_data_sources_;
- PERFETTO_THREAD_CHECKER(thread_checker_)
base::WeakPtrFactory<FtraceController> weak_factory_; // Keep last.
};
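A note on the |parsing_mem_| member declared above: it's allocated lazily on
the first session and kept across sessions; StopIfNeeded only madvises the
pages away. A sketch of the lifecycle, assuming base::PagedMemory wraps
mmap()/madvise() (as its usage in this patch suggests):
```
struct ParsingBufferLifecycle {
  base::PagedMemory parsing_mem_;

  void OnStart() {
    // Allocate once, reuse across tracing sessions.
    if (!parsing_mem_.IsValid()) {
      parsing_mem_ = base::PagedMemory::Allocate(
          base::kPageSize * 32 /* kParsingBufferSizePages */);
    }
  }

  void OnStop() {
    // Keep the virtual mapping but let the kernel reclaim the physical
    // pages; they're faulted back in lazily on the next session.
    if (parsing_mem_.IsValid())
      parsing_mem_.AdviseDontNeed(parsing_mem_.Get(), parsing_mem_.size());
  }
};
```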
diff --git a/src/traced/probes/ftrace/ftrace_controller_unittest.cc b/src/traced/probes/ftrace/ftrace_controller_unittest.cc
index abf6aa6..ac55677 100644
--- a/src/traced/probes/ftrace/ftrace_controller_unittest.cc
+++ b/src/traced/probes/ftrace/ftrace_controller_unittest.cc
@@ -38,13 +38,14 @@
using testing::_;
using testing::AnyNumber;
using testing::ByMove;
-using testing::Invoke;
-using testing::NiceMock;
-using testing::MatchesRegex;
-using testing::Return;
-using testing::IsEmpty;
using testing::ElementsAre;
+using testing::Invoke;
+using testing::IsEmpty;
+using testing::MatchesRegex;
+using testing::Mock;
+using testing::NiceMock;
using testing::Pair;
+using testing::Return;
using Table = perfetto::ProtoTranslationTable;
using FtraceEventBundle = perfetto::protos::pbzero::FtraceEventBundle;
@@ -58,47 +59,11 @@
class MockTaskRunner : public base::TaskRunner {
public:
- MockTaskRunner() {
- ON_CALL(*this, PostTask(_))
- .WillByDefault(Invoke(this, &MockTaskRunner::OnPostTask));
- ON_CALL(*this, PostDelayedTask(_, _))
- .WillByDefault(Invoke(this, &MockTaskRunner::OnPostDelayedTask));
- }
-
- void OnPostTask(std::function<void()> task) {
- std::unique_lock<std::mutex> lock(lock_);
- EXPECT_FALSE(task_);
- task_ = std::move(task);
- }
-
- void OnPostDelayedTask(std::function<void()> task, int /*delay*/) {
- std::unique_lock<std::mutex> lock(lock_);
- EXPECT_FALSE(task_);
- task_ = std::move(task);
- }
-
- void RunLastTask() {
- auto task = TakeTask();
- if (task)
- task();
- }
-
- std::function<void()> TakeTask() {
- std::unique_lock<std::mutex> lock(lock_);
- auto task(std::move(task_));
- task_ = std::function<void()>();
- return task;
- }
-
MOCK_METHOD1(PostTask, void(std::function<void()>));
MOCK_METHOD2(PostDelayedTask, void(std::function<void()>, uint32_t delay_ms));
MOCK_METHOD2(AddFileDescriptorWatch, void(int fd, std::function<void()>));
MOCK_METHOD1(RemoveFileDescriptorWatch, void(int fd));
MOCK_CONST_METHOD0(RunsTasksOnCurrentThread, bool());
-
- private:
- std::mutex lock_;
- std::function<void()> task_;
};
std::unique_ptr<Table> FakeTable(FtraceProcfs* ftrace) {
@@ -214,34 +179,11 @@
runner_(std::move(runner)),
procfs_(raw_procfs) {}
- MOCK_METHOD1(OnDrainCpuForTesting, void(size_t cpu));
-
MockTaskRunner* runner() { return runner_.get(); }
MockFtraceProcfs* procfs() { return procfs_; }
-
uint64_t NowMs() const override { return now_ms; }
-
uint32_t drain_period_ms() { return GetDrainPeriodMs(); }
- std::function<void()> GetDataAvailableCallback(size_t cpu) {
- int generation = generation_;
- auto* thread_sync = &thread_sync_;
- return [cpu, generation, thread_sync] {
- FtraceController::OnCpuReaderRead(cpu, generation, thread_sync);
- };
- }
-
- void WaitForData(size_t cpu) {
- for (;;) {
- {
- std::unique_lock<std::mutex> lock(thread_sync_.mutex);
- if (thread_sync_.cpus_to_drain[cpu])
- return;
- }
- usleep(5000);
- }
- }
-
std::unique_ptr<FtraceDataSource> AddFakeDataSource(const FtraceConfig& cfg) {
std::unique_ptr<FtraceDataSource> data_source(new FtraceDataSource(
GetWeakPtr(), 0 /* session id */, cfg, nullptr /* trace_writer */));
@@ -265,15 +207,10 @@
namespace {
std::unique_ptr<TestFtraceController> CreateTestController(
- bool runner_is_nice_mock,
bool procfs_is_nice_mock,
size_t cpu_count = 1) {
- std::unique_ptr<MockTaskRunner> runner;
- if (runner_is_nice_mock) {
- runner = std::unique_ptr<MockTaskRunner>(new NiceMock<MockTaskRunner>());
- } else {
- runner = std::unique_ptr<MockTaskRunner>(new MockTaskRunner());
- }
+ std::unique_ptr<MockTaskRunner> runner =
+ std::unique_ptr<MockTaskRunner>(new NiceMock<MockTaskRunner>());
std::unique_ptr<MockFtraceProcfs> ftrace_procfs;
if (procfs_is_nice_mock) {
@@ -297,16 +234,14 @@
} // namespace
TEST(FtraceControllerTest, NonExistentEventsDontCrash) {
- auto controller =
- CreateTestController(true /* nice runner */, true /* nice procfs */);
+ auto controller = CreateTestController(true /* nice procfs */);
FtraceConfig config = CreateFtraceConfig({"not_an_event"});
EXPECT_TRUE(controller->AddFakeDataSource(config));
}
TEST(FtraceControllerTest, RejectsBadEventNames) {
- auto controller =
- CreateTestController(true /* nice runner */, true /* nice procfs */);
+ auto controller = CreateTestController(true /* nice procfs */);
FtraceConfig config = CreateFtraceConfig({"../try/to/escape"});
EXPECT_FALSE(controller->AddFakeDataSource(config));
@@ -317,8 +252,10 @@
}
TEST(FtraceControllerTest, OneSink) {
- auto controller =
- CreateTestController(true /* nice runner */, false /* nice procfs */);
+ auto controller = CreateTestController(false /* nice procfs */);
+
+ // No read tasks posted as part of adding the data source.
+ EXPECT_CALL(*controller->runner(), PostDelayedTask(_, _)).Times(0);
FtraceConfig config = CreateFtraceConfig({"group/foo"});
@@ -327,9 +264,18 @@
auto data_source = controller->AddFakeDataSource(config);
ASSERT_TRUE(data_source);
+ // Verify that no read tasks have been posted. And set up expectation that
+ // a single recurring read task will be posted as part of starting the data
+ // source.
+ Mock::VerifyAndClearExpectations(controller->runner());
+ EXPECT_CALL(*controller->runner(), PostDelayedTask(_, _)).Times(1);
+
EXPECT_CALL(*controller->procfs(), WriteToFile("/root/tracing_on", "1"));
ASSERT_TRUE(controller->StartDataSource(data_source.get()));
+ // Verify single posted read task.
+ Mock::VerifyAndClearExpectations(controller->runner());
+
EXPECT_CALL(*controller->procfs(), WriteToFile("/root/buffer_size_kb", "0"));
EXPECT_CALL(*controller->procfs(), ClearFile("/root/trace"))
.WillOnce(Return(true));
@@ -346,22 +292,33 @@
}
TEST(FtraceControllerTest, MultipleSinks) {
- auto controller =
- CreateTestController(false /* nice runner */, false /* nice procfs */);
+ auto controller = CreateTestController(false /* nice procfs */);
FtraceConfig configA = CreateFtraceConfig({"group/foo"});
FtraceConfig configB = CreateFtraceConfig({"group/foo", "group/bar"});
+ // No read tasks posted as part of adding the data sources.
+ EXPECT_CALL(*controller->runner(), PostDelayedTask(_, _)).Times(0);
+
EXPECT_CALL(*controller->procfs(), WriteToFile("/root/buffer_size_kb", _));
EXPECT_CALL(*controller->procfs(), WriteToFile(kFooEnablePath, "1"));
auto data_sourceA = controller->AddFakeDataSource(configA);
EXPECT_CALL(*controller->procfs(), WriteToFile(kBarEnablePath, "1"));
auto data_sourceB = controller->AddFakeDataSource(configB);
+ // Verify that no read tasks have been posted. And set up expectation that
+ // a single recurring read task will be posted as part of starting the data
+ // sources.
+ Mock::VerifyAndClearExpectations(controller->runner());
+ EXPECT_CALL(*controller->runner(), PostDelayedTask(_, _)).Times(1);
+
EXPECT_CALL(*controller->procfs(), WriteToFile("/root/tracing_on", "1"));
ASSERT_TRUE(controller->StartDataSource(data_sourceA.get()));
ASSERT_TRUE(controller->StartDataSource(data_sourceB.get()));
+ // Verify single posted read task.
+ Mock::VerifyAndClearExpectations(controller->runner());
+
data_sourceA.reset();
EXPECT_CALL(*controller->procfs(), WriteToFile(kFooEnablePath, "0"));
@@ -376,8 +333,7 @@
}
TEST(FtraceControllerTest, ControllerMayDieFirst) {
- auto controller =
- CreateTestController(false /* nice runner */, false /* nice procfs */);
+ auto controller = CreateTestController(false /* nice procfs */);
FtraceConfig config = CreateFtraceConfig({"group/foo"});
@@ -401,51 +357,8 @@
data_source.reset();
}
-TEST(FtraceControllerTest, BackToBackEnableDisable) {
- auto controller =
- CreateTestController(false /* nice runner */, false /* nice procfs */);
-
- // For this test we don't care about calls to WriteToFile/ClearFile.
- EXPECT_CALL(*controller->procfs(), WriteToFile(_, _)).Times(AnyNumber());
- EXPECT_CALL(*controller->procfs(), ClearFile(_)).Times(AnyNumber());
- EXPECT_CALL(*controller->procfs(), ReadOneCharFromFile("/root/tracing_on"))
- .Times(AnyNumber());
-
- EXPECT_CALL(*controller->runner(), PostTask(_)).Times(2);
- EXPECT_CALL(*controller->runner(), PostDelayedTask(_, 100)).Times(2);
- FtraceConfig config = CreateFtraceConfig({"group/foo"});
- auto data_source = controller->AddFakeDataSource(config);
- ASSERT_TRUE(controller->StartDataSource(data_source.get()));
-
- auto on_data_available = controller->GetDataAvailableCallback(0u);
- std::thread worker([on_data_available] { on_data_available(); });
- controller->WaitForData(0u);
-
- // Disable the first data source and run the delayed task that it generated.
- // It should be a no-op.
- worker.join();
- data_source.reset();
- controller->runner()->RunLastTask();
- controller->runner()->RunLastTask();
-
- // Register another data source and wait for it to generate data.
- data_source = controller->AddFakeDataSource(config);
- ASSERT_TRUE(controller->StartDataSource(data_source.get()));
-
- on_data_available = controller->GetDataAvailableCallback(0u);
- std::thread worker2([on_data_available] { on_data_available(); });
- controller->WaitForData(0u);
-
- // This drain should also be a no-op after the data source is unregistered.
- worker2.join();
- data_source.reset();
- controller->runner()->RunLastTask();
- controller->runner()->RunLastTask();
-}
-
TEST(FtraceControllerTest, BufferSize) {
- auto controller =
- CreateTestController(true /* nice runner */, false /* nice procfs */);
+ auto controller = CreateTestController(false /* nice procfs */);
// For this test we don't care about most calls to WriteToFile/ClearFile.
EXPECT_CALL(*controller->procfs(), WriteToFile(_, _)).Times(AnyNumber());
@@ -514,8 +427,7 @@
}
TEST(FtraceControllerTest, PeriodicDrainConfig) {
- auto controller =
- CreateTestController(true /* nice runner */, false /* nice procfs */);
+ auto controller = CreateTestController(false /* nice procfs */);
// For this test we don't care about calls to WriteToFile/ClearFile.
EXPECT_CALL(*controller->procfs(), WriteToFile(_, _)).Times(AnyNumber());
diff --git a/src/traced/probes/ftrace/ftrace_thread_sync.h b/src/traced/probes/ftrace/ftrace_thread_sync.h
deleted file mode 100644
index fbfa3fd..0000000
--- a/src/traced/probes/ftrace/ftrace_thread_sync.h
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright (C) 2018 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef SRC_TRACED_PROBES_FTRACE_FTRACE_THREAD_SYNC_H_
-#define SRC_TRACED_PROBES_FTRACE_FTRACE_THREAD_SYNC_H_
-
-#include <stdint.h>
-
-#include <bitset>
-#include <condition_variable>
-#include <mutex>
-
-#include "perfetto/ext/base/utils.h"
-#include "perfetto/ext/base/weak_ptr.h"
-
-namespace perfetto {
-
-namespace base {
-class TaskRunner;
-} // namespace base
-
-class FtraceController;
-
-// This struct is accessed both by the FtraceController on the main thread and
-// by the CpuReader(s) on their worker threads. It is used to synchronize
-// handshakes between FtraceController and CpuReader(s). There is only *ONE*
-// instance of this state, owned by the FtraceController and shared with all
-// CpuReader(s).
-struct FtraceThreadSync {
- explicit FtraceThreadSync(base::TaskRunner* tr) : task_runner(tr) {}
-
- // These variables are set upon initialization time and never changed. Can
- // be accessed outside of the |mutex|.
- base::TaskRunner* const task_runner; // Where the FtraceController lives.
- base::WeakPtr<FtraceController> trace_controller_weak;
-
- // Mutex & condition variable shared by main thread and all per-cpu workers.
- // All fields below are read and modified holding |mutex|.
- std::mutex mutex;
-
- // Used to suspend CpuReader(s) between cycles and to wake them up at the
- // same time.
- std::condition_variable cond;
-
- // |cmd| and |cmd_id| are written only by FtraceController. On each cycle,
- // FtraceController increases the |cmd_id| monotonic counter and issues the
- // new command. |cmd_id| is used by the CpuReader(s) to distinguish a new
- // command from a spurious wakeup.
- enum Cmd { kRun = 0, kFlush, kQuit };
- Cmd cmd = kRun;
- uint64_t cmd_id = 0;
-
- // This bitmap is cleared by the FtraceController before every kRun command
- // and is optionally set by OnDataAvailable() if a CpuReader did fetch any
- // ftrace data during the read cycle.
- std::bitset<base::kMaxCpus> cpus_to_drain;
-
- // This bitmap is cleared by the FtraceController before issuing a kFlush
- // command and set by each CpuReader after they have completed the flush.
- std::bitset<base::kMaxCpus> flush_acks;
-};
-
-} // namespace perfetto
-
-#endif // SRC_TRACED_PROBES_FTRACE_FTRACE_THREAD_SYNC_H_
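For context on the deletion above: FtraceThreadSync boiled down to the classic
"monotonic command id" handshake. The controller bumps a counter under the
mutex and notifies; a worker treats a wakeup as genuine only if the counter
moved since it last looked. A minimal standalone sketch of that pattern
(simplified names, illustration only, not part of this patch):

#include <condition_variable>
#include <cstdint>
#include <mutex>

struct Sync {
  std::mutex mutex;
  std::condition_variable cond;
  enum Cmd { kRun, kFlush, kQuit } cmd = kRun;
  uint64_t cmd_id = 0;  // Monotonic; used to discard spurious wakeups.
};

// Main thread: publish a new command and wake all workers.
void IssueCmd(Sync* sync, Sync::Cmd cmd) {
  {
    std::lock_guard<std::mutex> lock(sync->mutex);
    sync->cmd = cmd;
    sync->cmd_id++;
  }
  sync->cond.notify_all();
}

// Worker thread: block until a command newer than |last_seen_id| arrives.
Sync::Cmd WaitForCmd(Sync* sync, uint64_t* last_seen_id) {
  std::unique_lock<std::mutex> lock(sync->mutex);
  sync->cond.wait(lock, [&] { return sync->cmd_id != *last_seen_id; });
  *last_seen_id = sync->cmd_id;
  return sync->cmd;
}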
diff --git a/src/traced/probes/ftrace/page_pool.cc b/src/traced/probes/ftrace/page_pool.cc
deleted file mode 100644
index f66309d..0000000
--- a/src/traced/probes/ftrace/page_pool.cc
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Copyright (C) 2018 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "src/traced/probes/ftrace/page_pool.h"
-
-#include <array>
-
-namespace perfetto {
-
-namespace {
-constexpr size_t kMaxFreelistBlocks = 128; // 128 * 32 * 4KB = 16MB.
-}
-
-void PagePool::NewPageBlock() {
- std::lock_guard<std::mutex> lock(mutex_);
- if (freelist_.empty()) {
- write_queue_.emplace_back(PageBlock::Create());
- } else {
- write_queue_.emplace_back(std::move(freelist_.back()));
- freelist_.pop_back();
- }
- PERFETTO_DCHECK(write_queue_.back().size() == 0);
-}
-
-void PagePool::EndRead(std::vector<PageBlock> page_blocks) {
- PERFETTO_DCHECK_THREAD(reader_thread_);
- for (PageBlock& page_block : page_blocks)
- page_block.Clear();
-
- std::lock_guard<std::mutex> lock(mutex_);
- freelist_.insert(freelist_.end(),
- std::make_move_iterator(page_blocks.begin()),
- std::make_move_iterator(page_blocks.end()));
-
- // Even if blocks in the freelist don't waste any resident memory (because
- // the Clear() call above madvise()s them), avoid pathological cases where
- // we keep accumulating virtual address space reservations.
- if (freelist_.size() > kMaxFreelistBlocks)
- freelist_.erase(freelist_.begin() + kMaxFreelistBlocks, freelist_.end());
-}
-
-} // namespace perfetto
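Note on the freelist cap in EndRead() above: it guards virtual address space,
not resident memory. Clear() already madvise()s each block, which releases the
physical pages while keeping the mapping reserved. A plain-POSIX sketch of what
base::PagedMemory's dont-need advice amounts to on Linux (my assumption about
the underlying call; illustration only):

#include <stddef.h>
#include <sys/mman.h>

// Lets the kernel reclaim the physical pages backing [mem, mem + size); the
// virtual range stays mapped and refaults as zero-filled pages on next write.
// This is why an unbounded freelist would still grow the address space.
void DropResidentPages(void* mem, size_t size) {
  madvise(mem, size, MADV_DONTNEED);
}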
diff --git a/src/traced/probes/ftrace/page_pool.h b/src/traced/probes/ftrace/page_pool.h
deleted file mode 100644
index de5c250..0000000
--- a/src/traced/probes/ftrace/page_pool.h
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Copyright (C) 2018 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef SRC_TRACED_PROBES_FTRACE_PAGE_POOL_H_
-#define SRC_TRACED_PROBES_FTRACE_PAGE_POOL_H_
-
-#include <stdint.h>
-
-#include <mutex>
-#include <vector>
-
-#include "perfetto/base/logging.h"
-#include "perfetto/ext/base/optional.h"
-#include "perfetto/ext/base/paged_memory.h"
-#include "perfetto/ext/base/thread_checker.h"
-#include "perfetto/ext/base/utils.h"
-
-namespace perfetto {
-
-// This class is a page pool tailored around the needs of the ftrace CpuReader.
-// It has two responsibilities:
-// 1) A cheap bump-pointer page allocator for the writing side of CpuReader.
-// 2) A thread-safe producer/consumer queue to synchronize the read/write
-// threads of CpuReader.
-// For context, CpuReader (and hence this class) is used on two threads:
-// (1) A worker thread that writes into the buffer and (2) the main thread which
-// reads all the content in big batches and turns them into protos.
-// There is at most one thread writing and at most one thread reading. In rare
-// circumstances they can be active at the same time.
-// This class is optimized for the following use case:
-// - Most of the time CpuReader wants to write 4096 bytes. In some rare cases
-// (read() during flush) it wants to write < 4096 bytes.
-// - Even when it writes < 4096 bytes, CpuReader can figure out the size of the
-// payload from the ftrace header. We don't need extra tracking to tell how
-// much of each page is used.
-// - Doing a syscall for each page write is overkill. In most cases
-// CpuReader writes bursts of several pages in one go.
-// - We can't really predict upfront how big the write bursts will be, hence we
-// cannot predict the size of the pool, unless we accept a very high bound.
-// In extreme, yet rare, conditions, CpuReader will read the whole per-cpu
-// ftrace buffer, while the reader is still reading the previous batch.
-// - Write bursts should not be too frequent, so once they are over it's worth
-// spending some extra cycles to release the memory.
-// - The reader side always wants to read *all* the written pages in one batch.
-//   While this happens though, the writer might want to write more.
-//
-// The architecture of this class is as follows. Pages are organized in
-// PageBlock(s). A PageBlock is simply an array of pages and is the elementary
-// unit of memory allocation and release. Pages within one block are cheaply
-// allocated with a simple bump-pointer allocator.
-//
-// [ Writer (thread worker) ] | [ Reader (main thread) ]
-// ~~~~~~~~~~~~~~~~~~~~~
-// +---> write queue ------------> ready queue --+
-// | |
-// +------------------------------- freelist <---+
-// ~~~~~~~~~~~~~~~~~~~~~
-// ~ mutex protected ~
-// ~~~~~~~~~~~~~~~~~~~~~
-class PagePool {
- public:
- class PageBlock {
- public:
- static constexpr size_t kPagesPerBlock = 32; // 32 * 4KB = 128 KB.
- static constexpr size_t kBlockSize = kPagesPerBlock * base::kPageSize;
-
- // This factory method exists so that we don't accidentally create extra
- // blocks by inadvertently triggering the default constructor in containers.
- static PageBlock Create() { return PageBlock(); }
-
- PageBlock(PageBlock&&) noexcept = default;
- PageBlock& operator=(PageBlock&&) = default;
-
- size_t size() const { return size_; }
- bool IsFull() const { return size_ >= kPagesPerBlock; }
-
- // Returns the pointer to the contents of the i-th page in the block.
- uint8_t* At(size_t i) const {
- PERFETTO_DCHECK(i < kPagesPerBlock);
- return reinterpret_cast<uint8_t*>(mem_.Get()) + i * base::kPageSize;
- }
-
- uint8_t* CurPage() const { return At(size_); }
-
- void NextPage() {
- PERFETTO_DCHECK(!IsFull());
- size_++;
- }
-
- // Releases memory of the block and marks it available for reuse.
- void Clear() {
- size_ = 0;
- mem_.AdviseDontNeed(mem_.Get(), kBlockSize);
- }
-
- private:
- PageBlock(const PageBlock&) = delete;
- PageBlock& operator=(const PageBlock&) = delete;
- PageBlock() { mem_ = base::PagedMemory::Allocate(kBlockSize); }
-
- base::PagedMemory mem_;
- size_t size_ = 0;
- };
-
- PagePool() {
- PERFETTO_DETACH_FROM_THREAD(writer_thread_);
- PERFETTO_DETACH_FROM_THREAD(reader_thread_);
- }
-
- // Grabs a new page, allocating a whole new PageBlock if needed.
- // If contents are written to the page, the caller must call EndWrite().
- // If no data is written, it is okay to leave the BeginWrite() unpaired
- // (e.g., in case of a non-blocking read returning no data) and call
- // BeginWrite() again in the future.
- uint8_t* BeginWrite() {
- PERFETTO_DCHECK_THREAD(writer_thread_);
- if (write_queue_.empty() || write_queue_.back().IsFull())
- NewPageBlock(); // Slowpath. Tries the freelist first, then allocates.
- return write_queue_.back().CurPage();
- }
-
- // Marks the last page as written and bumps the write pointer.
- void EndWrite() {
- PERFETTO_DCHECK_THREAD(writer_thread_);
- PERFETTO_DCHECK(!write_queue_.empty() && !write_queue_.back().IsFull());
- write_queue_.back().NextPage();
- }
-
- // Makes all written pages available to the reader.
- // Returns an upper bound on the number of pages written.
- size_t CommitWrittenPages() {
- PERFETTO_DCHECK_THREAD(writer_thread_);
- size_t size = write_queue_.size() * PagePool::PageBlock::kPagesPerBlock;
- std::lock_guard<std::mutex> lock(mutex_);
- read_queue_.insert(read_queue_.end(),
- std::make_move_iterator(write_queue_.begin()),
- std::make_move_iterator(write_queue_.end()));
- write_queue_.clear();
- return size;
- }
-
- // Moves ownership of all the page blocks in the read queue to the caller.
- // The caller is expected to move them back after reading through EndRead().
- // PageBlocks will be freed if the caller doesn't call EndRead().
- std::vector<PageBlock> BeginRead() {
- PERFETTO_DCHECK_THREAD(reader_thread_);
- std::lock_guard<std::mutex> lock(mutex_);
- auto res = std::move(read_queue_);
- read_queue_.clear();
- return res;
- }
-
- // Returns the page blocks borrowed for read and makes them available for
- // reuse. This allows the writer to avoid doing syscalls after the initial
- // writes.
- void EndRead(std::vector<PageBlock> page_blocks);
-
- size_t freelist_size_for_testing() const { return freelist_.size(); }
-
- private:
- PagePool(const PagePool&) = delete;
- PagePool& operator=(const PagePool&) = delete;
- void NewPageBlock();
-
- PERFETTO_THREAD_CHECKER(writer_thread_)
- std::vector<PageBlock> write_queue_; // Accessed exclusively by the writer.
-
- std::mutex mutex_; // Protects both the read queue and the freelist.
-
- PERFETTO_THREAD_CHECKER(reader_thread_)
- std::vector<PageBlock> read_queue_; // Accessed by both threads.
- std::vector<PageBlock> freelist_; // Accessed by both threads.
-};
-
-} // namespace perfetto
-
-#endif // SRC_TRACED_PROBES_FTRACE_PAGE_POOL_H_
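Putting the deleted interface together, the two threads were meant to drive it
roughly as follows (a sketch against the API above; ReadOnePage() and
ParsePage() are hypothetical stand-ins for the CpuReader read and parsing
logic, not real functions):

#include <cstddef>
#include <cstdint>
#include <utility>

#include "src/traced/probes/ftrace/page_pool.h"  // The header deleted above.

bool ReadOnePage(uint8_t* page);      // Hypothetical: fills one 4KB page.
void ParsePage(const uint8_t* page);  // Hypothetical: turns a page into protos.

// Worker thread: cheap bump-pointer writes, committed in bursts.
void WriterBurst(perfetto::PagePool* pool) {
  for (;;) {
    uint8_t* page = pool->BeginWrite();  // May reuse a freelist block.
    if (!ReadOnePage(page))
      break;  // No data: leaving this BeginWrite() unpaired is allowed.
    pool->EndWrite();  // Bumps the write pointer past the filled page.
  }
  pool->CommitWrittenPages();  // Makes the burst visible to the reader.
}

// Main thread: drain everything in one batch, then recycle the blocks.
void ReaderDrain(perfetto::PagePool* pool) {
  auto blocks = pool->BeginRead();  // Takes ownership of the read queue.
  for (const auto& block : blocks) {
    for (size_t i = 0; i < block.size(); i++)
      ParsePage(block.At(i));
  }
  pool->EndRead(std::move(blocks));  // Clear()s blocks back to the freelist.
}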
diff --git a/src/traced/probes/ftrace/page_pool_unittest.cc b/src/traced/probes/ftrace/page_pool_unittest.cc
deleted file mode 100644
index e99b7cb..0000000
--- a/src/traced/probes/ftrace/page_pool_unittest.cc
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Copyright (C) 2018 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "src/traced/probes/ftrace/page_pool.h"
-
-#include <array>
-#include <mutex>
-#include <random>
-#include <thread>
-#include <vector>
-
-#include <gtest/gtest.h>
-
-namespace perfetto {
-namespace {
-
-TEST(PagePoolTest, SingleThreaded) {
- PagePool pool;
- for (int i = 0; i < 2; i++)
- ASSERT_TRUE(pool.BeginRead().empty());
-
- for (int repeat = 0; repeat < 3; repeat++) {
- for (uint32_t seed = 0; seed < 6; seed++) {
- uint8_t* page = pool.BeginWrite();
- std::minstd_rand0 rnd_engine(seed);
- std::generate(page, page + base::kPageSize, rnd_engine);
- // Deliberately make it so page 3 is overwritten, so we should see only
- // pages 0, 1, 2, 4, 5.
- if (seed != 3)
- pool.EndWrite();
- }
-
- // No write should be visible until the CommitWrittenPages() call.
- ASSERT_TRUE(pool.BeginRead().empty());
-
- pool.CommitWrittenPages();
-
- auto blocks = pool.BeginRead();
- ASSERT_EQ(blocks.size(), 1);
- ASSERT_EQ(blocks[0].size(), 5);
- for (uint32_t i = 0; i < blocks[0].size(); i++) {
- auto seed = std::array<uint32_t, 5>{{0, 1, 2, 4, 5}}[i];
- const char* page = reinterpret_cast<const char*>(blocks[0].At(i));
- char expected[base::kPageSize];
- std::minstd_rand0 rnd_engine(seed);
- std::generate(expected, expected + base::kPageSize, rnd_engine);
- EXPECT_STREQ(page, expected);
- }
-
- pool.EndRead(std::move(blocks));
- ASSERT_EQ(pool.freelist_size_for_testing(), 1);
- }
-}
-
-TEST(PagePoolTest, MultiThreaded) {
- PagePool pool;
-
- // Generate some random content.
- std::vector<std::string> expected_pages;
- std::minstd_rand0 rnd_engine(0);
- for (int i = 0; i < 1000; i++) {
- expected_pages.emplace_back();
- std::string& page = expected_pages.back();
- page.resize(base::kPageSize);
- std::generate(page.begin(), page.end(), rnd_engine);
- }
-
- auto writer_fn = [&pool, &expected_pages] {
- std::minstd_rand0 rnd(0);
- for (const std::string& expected_page : expected_pages) {
- uint8_t* dst = pool.BeginWrite();
- memcpy(dst, expected_page.data(), base::kPageSize);
- pool.EndWrite();
- if (rnd() % 16 == 0)
- pool.CommitWrittenPages();
- }
- pool.CommitWrittenPages();
- };
-
- auto reader_fn = [&pool, &expected_pages] {
- for (size_t page_idx = 0; page_idx < expected_pages.size();) {
- auto blocks = pool.BeginRead();
- for (const auto& block : blocks) {
- for (size_t i = 0; i < block.size(); i++) {
- const char* page = reinterpret_cast<const char*>(block.At(i));
- EXPECT_EQ(expected_pages[page_idx],
- std::string(page, base::kPageSize));
- page_idx++;
- }
- }
- pool.EndRead(std::move(blocks));
- }
- };
-
- std::thread writer(writer_fn);
- std::thread reader(reader_fn);
- writer.join();
- reader.join();
-}
-
-} // namespace
-} // namespace perfetto