ftrace: write "last read event timestamp" to assist parsing of ring buffer traces

-- Problem: --

Ftrace data is recorded per-cpu, grouping series of events from a given
per-cpu kernel buffer into an ftrace_event_bundle packet. When a
perfetto trace uses RING_BUFFER, wrapping around and discarding the
oldest trace packets can leave us with very imbalanced per-cpu state,
as entire bundles of events are overwritten.

This in turn is problematic when the interpretation of certain ftrace
events requires pairing them up (e.g. slice begin/end) or otherwise
tracking transitions, and the related events are not constrained to a
single cpu.

Consider the following scenario of a {begin, end} pair of events that
represent a tracing slice. Assuming there's one bundle per cpu:
  cpu0: [e...E..e..e]    <- bundle
             ^event end
  cpu1: [.B.....e..eee.] <- bundle
          ^event begin

It is possible for us to observe cpu1's events, while cpu0's have been
overwritten in the ring buffer. In such a case, a naive parsing of all
events would conclude that the slice began and never ended.

The above is just the simplest example, things get more complicated once
we start reasoning about different kernel tracing bandwidth across cpus
and userspace transcoding breaking up reads in terms of KBs of tracing
data instead of timestamps.

-- Solution: --

Since traced_probes is the exclusive reader of per-cpu kernel ring
buffers, it knows the latest "consumed" event's timestamp for that
buffer. By putting this timestamp into the *next* bundle, we're
recording that "all ftrace data in this bundle is guaranteed to cover
the time range starting from where the last read stopped".

Example sketch:
  Initial state of the per-cpu ring buffers:
    cpu0: .eeee.e.
    cpu1: ee...ee.
    cpu2: ..ee.ee.
  After a read pass, some data has been consumed from each buf:
    cpu0: XXXee.e.
    cpu1: XXXXXXe.
    cpu2: XXXe.ee.
  Now, on the *next* bundle per cpu, we'll be emitting how far the
  consumed ('X') area extends.

In turn, from the trace_processor perspective, we can drop events that
precede the earliest timestamp covered by all per-cpu event bundles.
This is the maximum of all per-cpu "valid from" timestamps, i.e. of the
last_read_event_timestamp recorded in the oldest surviving bundle of
each cpu.

Bug: 192586066
Change-Id: Iebc6952a2f2a6a399b8cde64f210e28a69c04111
diff --git a/protos/perfetto/trace/ftrace/ftrace_event_bundle.proto b/protos/perfetto/trace/ftrace/ftrace_event_bundle.proto
index 5bfab9d..a884396 100644
--- a/protos/perfetto/trace/ftrace/ftrace_event_bundle.proto
+++ b/protos/perfetto/trace/ftrace/ftrace_event_bundle.proto
@@ -112,6 +112,15 @@
     optional FtraceParseStatus status = 2;
   }
   repeated FtraceError error = 8;
+
+  // The timestamp (boottime) of the last event consumed from this per-cpu
+  // kernel buffer prior to starting this bundle. In other words: the last
+  // event in the previous bundle.
+  // Lets the trace processing find an initial timestamp after which ftrace
+  // data is known to be valid across all cpus. Of particular importance when
+  // the perfetto trace buffer is a ring buffer as well, as the overwriting of
+  // oldest bundles can skew the first valid timestamp per cpu significantly.
+  optional uint64 last_read_event_timestamp = 9;
 }
 
 enum FtraceClock {
diff --git a/protos/perfetto/trace/perfetto_trace.proto b/protos/perfetto/trace/perfetto_trace.proto
index a166cae..0bae4d2 100644
--- a/protos/perfetto/trace/perfetto_trace.proto
+++ b/protos/perfetto/trace/perfetto_trace.proto
@@ -10805,6 +10805,15 @@
     optional FtraceParseStatus status = 2;
   }
   repeated FtraceError error = 8;
+
+  // The timestamp (boottime) of the last event consumed from this per-cpu
+  // kernel buffer prior to starting this bundle. In other words: the last
+  // event in the previous bundle.
+  // Lets the trace processing find an initial timestamp after which ftrace
+  // data is known to be valid across all cpus. Of particular importance when
+  // the perfetto trace buffer is a ring buffer as well, as the overwriting of
+  // oldest bundles can skew the first valid timestamp per cpu significantly.
+  optional uint64 last_read_event_timestamp = 9;
 }
 
 enum FtraceClock {
diff --git a/src/traced/probes/ftrace/cpu_reader.cc b/src/traced/probes/ftrace/cpu_reader.cc
index c161203..462c992 100644
--- a/src/traced/probes/ftrace/cpu_reader.cc
+++ b/src/traced/probes/ftrace/cpu_reader.cc
@@ -114,8 +114,7 @@
 }
 
 // Reads a signed ftrace value as an int64_t, sign extending if necessary.
-static int64_t ReadSignedFtraceValue(const uint8_t* ptr,
-                                     FtraceFieldType ftrace_type) {
+int64_t ReadSignedFtraceValue(const uint8_t* ptr, FtraceFieldType ftrace_type) {
   if (ftrace_type == kFtraceInt32) {
     int32_t value;
     memcpy(&value, reinterpret_cast<const void*>(ptr), sizeof(value));
@@ -302,30 +301,36 @@
     ProcessPagesForDataSource(
         data_source->trace_writer(), data_source->mutable_metadata(), cpu_,
         data_source->parsing_config(), data_source->mutable_parse_errors(),
-        parsing_buf, pages_read, compact_sched_buf, table_, symbolizer_,
-        ftrace_clock_snapshot_, ftrace_clock_);
+        &last_read_event_ts_, parsing_buf, pages_read, compact_sched_buf,
+        table_, symbolizer_, ftrace_clock_snapshot_, ftrace_clock_);
   }
 
   return pages_read;
 }
 
-void CpuReader::Bundler::StartNewPacket(bool lost_events) {
+void CpuReader::Bundler::StartNewPacket(bool lost_events,
+                                        uint64_t last_read_event_timestamp) {
   FinalizeAndRunSymbolizer();
   packet_ = trace_writer_->NewTracePacket();
   bundle_ = packet_->set_ftrace_events();
-  if (ftrace_clock_) {
-    bundle_->set_ftrace_clock(ftrace_clock_);
-
-    if (ftrace_clock_snapshot_ && ftrace_clock_snapshot_->ftrace_clock_ts) {
-      bundle_->set_ftrace_timestamp(ftrace_clock_snapshot_->ftrace_clock_ts);
-      bundle_->set_boot_timestamp(ftrace_clock_snapshot_->boot_clock_ts);
-    }
-  }
 
   bundle_->set_cpu(static_cast<uint32_t>(cpu_));
   if (lost_events) {
     bundle_->set_lost_events(true);
   }
+
+  // note: set-to-zero is valid and expected for the first bundle per cpu
+  // (outside of concurrent tracing), with the effective meaning of "all data is
+  // valid since the data source was started".
+  bundle_->set_last_read_event_timestamp(last_read_event_timestamp);
+
+  if (ftrace_clock_) {
+    bundle_->set_ftrace_clock(ftrace_clock_);
+    if (ftrace_clock_snapshot_ && ftrace_clock_snapshot_->ftrace_clock_ts) {
+      bundle_->set_ftrace_timestamp(ftrace_clock_snapshot_->ftrace_clock_ts);
+      bundle_->set_boot_timestamp(ftrace_clock_snapshot_->boot_clock_ts);
+    }
+  }
 }
 
 void CpuReader::Bundler::FinalizeAndRunSymbolizer() {
@@ -409,6 +414,7 @@
     size_t cpu,
     const FtraceDataSourceConfig* ds_config,
     base::FlatSet<protos::pbzero::FtraceParseStatus>* parse_errors,
+    uint64_t* last_read_event_ts,
     const uint8_t* parsing_buf,
     const size_t pages_read,
     CompactSchedBuffer* compact_sched_buf,
@@ -420,7 +426,7 @@
   Bundler bundler(trace_writer, metadata,
                   ds_config->symbolize_ksyms ? symbolizer : nullptr, cpu,
                   ftrace_clock_snapshot, ftrace_clock, compact_sched_buf,
-                  ds_config->compact_sched.enabled);
+                  ds_config->compact_sched.enabled, *last_read_event_ts);
 
   bool success = true;
   size_t pages_parsed = 0;
@@ -456,11 +462,14 @@
             kCompactSchedInternerThreshold;
 
     if (page_header->lost_events || interner_past_threshold) {
-      bundler.StartNewPacket(page_header->lost_events);
+      // pass in an updated last_read_event_ts since we're starting a new
+      // bundle, which needs to reference the last timestamp from the prior one.
+      bundler.StartNewPacket(page_header->lost_events, *last_read_event_ts);
     }
 
-    FtraceParseStatus status = ParsePagePayload(
-        parse_pos, &page_header.value(), table, ds_config, &bundler, metadata);
+    FtraceParseStatus status =
+        ParsePagePayload(parse_pos, &page_header.value(), table, ds_config,
+                         &bundler, metadata, last_read_event_ts);
 
     if (status != FtraceParseStatus::FTRACE_STATUS_OK) {
       WriteAndSetParseError(&bundler, parse_errors, page_header->timestamp,
@@ -540,11 +549,13 @@
     const ProtoTranslationTable* table,
     const FtraceDataSourceConfig* ds_config,
     Bundler* bundler,
-    FtraceMetadata* metadata) {
+    FtraceMetadata* metadata,
+    uint64_t* last_read_event_ts) {
   const uint8_t* ptr = start_of_payload;
   const uint8_t* const end = ptr + page_header->size;
 
   uint64_t timestamp = page_header->timestamp;
+  uint64_t last_data_record_ts = 0;
 
   while (ptr < end) {
     EventHeader event_header;
@@ -674,11 +685,13 @@
             }
           }
         }
-        // Jump to next event.
-        ptr = next;
+        last_data_record_ts = timestamp;
+        ptr = next;  // jump to next event
       }  // default case
     }    // switch (event_header.type_or_length)
   }      // while (ptr < end)
+  if (last_data_record_ts)
+    *last_read_event_ts = last_data_record_ts;
   return FtraceParseStatus::FTRACE_STATUS_OK;
 }
 
@@ -715,7 +728,7 @@
                         protos::pbzero::FtraceEvent::kGenericFieldNumber)) {
     nested->AppendString(GenericFtraceEvent::kEventNameFieldNumber, info.name);
     for (const Field& field : info.fields) {
-      auto generic_field = nested->BeginNestedMessage<protozero::Message>(
+      auto* generic_field = nested->BeginNestedMessage<protozero::Message>(
           GenericFtraceEvent::kFieldFieldNumber);
       generic_field->AppendString(GenericFtraceEvent::Field::kNameFieldNumber,
                                   field.ftrace_name);
diff --git a/src/traced/probes/ftrace/cpu_reader.h b/src/traced/probes/ftrace/cpu_reader.h
index 9282901..821ba48 100644
--- a/src/traced/probes/ftrace/cpu_reader.h
+++ b/src/traced/probes/ftrace/cpu_reader.h
@@ -17,8 +17,8 @@
 #ifndef SRC_TRACED_PROBES_FTRACE_CPU_READER_H_
 #define SRC_TRACED_PROBES_FTRACE_CPU_READER_H_
 
-#include <stdint.h>
 #include <string.h>
+#include <cstdint>
 
 #include <optional>
 #include <set>
@@ -115,7 +115,8 @@
             const FtraceClockSnapshot* ftrace_clock_snapshot,
             protos::pbzero::FtraceClock ftrace_clock,
             CompactSchedBuffer* compact_sched_buf,
-            bool compact_sched_enabled)
+            bool compact_sched_enabled,
+            uint64_t last_read_event_ts)
         : trace_writer_(trace_writer),
           metadata_(metadata),
           symbolizer_(symbolizer),
@@ -123,7 +124,8 @@
           ftrace_clock_snapshot_(ftrace_clock_snapshot),
           ftrace_clock_(ftrace_clock),
           compact_sched_enabled_(compact_sched_enabled),
-          compact_sched_buf_(compact_sched_buf) {
+          compact_sched_buf_(compact_sched_buf),
+          initial_last_read_event_ts_(last_read_event_ts) {
       if (compact_sched_enabled_)
         compact_sched_buf_->Reset();
     }
@@ -132,13 +134,13 @@
 
     protos::pbzero::FtraceEventBundle* GetOrCreateBundle() {
       if (!bundle_) {
-        StartNewPacket(false);
+        StartNewPacket(false, initial_last_read_event_ts_);
       }
       return bundle_;
     }
 
     // Forces the creation of a new TracePacket.
-    void StartNewPacket(bool lost_events);
+    void StartNewPacket(bool lost_events, uint64_t last_read_event_timestamp);
 
     // This function is called after the contents of a FtraceBundle are written.
     void FinalizeAndRunSymbolizer();
@@ -158,10 +160,11 @@
     const FtraceClockSnapshot* const ftrace_clock_snapshot_;
     protos::pbzero::FtraceClock const ftrace_clock_;
     const bool compact_sched_enabled_;
+    CompactSchedBuffer* const compact_sched_buf_;
+    uint64_t initial_last_read_event_ts_;
 
     TraceWriter::TracePacketHandle packet_;
     protos::pbzero::FtraceEventBundle* bundle_ = nullptr;
-    CompactSchedBuffer* const compact_sched_buf_;
   };
 
   struct PageHeader {
@@ -302,7 +305,8 @@
       const ProtoTranslationTable* table,
       const FtraceDataSourceConfig* ds_config,
       Bundler* bundler,
-      FtraceMetadata* metadata);
+      FtraceMetadata* metadata,
+      uint64_t* last_read_event_ts);
 
   // Parse a single raw ftrace event beginning at |start| and ending at |end|
   // and write it into the provided bundle as a proto.
@@ -370,8 +374,9 @@
       size_t cpu,
       const FtraceDataSourceConfig* ds_config,
       base::FlatSet<protos::pbzero::FtraceParseStatus>* parse_errors,
+      uint64_t* last_read_event_ts,
       const uint8_t* parsing_buf,
-      const size_t pages_read,
+      size_t pages_read,
       CompactSchedBuffer* compact_sched_buf,
       const ProtoTranslationTable* table,
       LazyKernelSymbolizer* symbolizer,
@@ -397,6 +402,7 @@
   const ProtoTranslationTable* table_;
   LazyKernelSymbolizer* symbolizer_;
   base::ScopedFile trace_fd_;
+  uint64_t last_read_event_ts_ = 0;
   protos::pbzero::FtraceClock ftrace_clock_{};
   const FtraceClockSnapshot* ftrace_clock_snapshot_;
 };
diff --git a/src/traced/probes/ftrace/cpu_reader_benchmark.cc b/src/traced/probes/ftrace/cpu_reader_benchmark.cc
index b9a08db..ae5de7d 100644
--- a/src/traced/probes/ftrace/cpu_reader_benchmark.cc
+++ b/src/traced/probes/ftrace/cpu_reader_benchmark.cc
@@ -840,7 +840,7 @@
       &writer, &metadata, /*symbolizer=*/nullptr, /*cpu=*/0,
       /*ftrace_clock_snapshot=*/nullptr,
       protos::pbzero::FTRACE_CLOCK_UNSPECIFIED, compact_sched_buf.get(),
-      /*compact_sched_enabled=*/false);
+      /*compact_sched_enabled=*/false, /*last_read_event_ts=*/0);
 
   ProtoTranslationTable* table = GetTable(test_case.name);
   auto page = PageFromXxd(test_case.data);
@@ -874,8 +874,10 @@
     if (!page_header.has_value())
       return;
 
+    uint64_t last_read_event_ts = 0;
     CpuReader::ParsePagePayload(parse_pos, &page_header.value(), table,
-                                &ds_config, &bundler, &metadata);
+                                &ds_config, &bundler, &metadata,
+                                &last_read_event_ts);
 
     metadata.Clear();
     bundler.FinalizeAndRunSymbolizer();
@@ -967,13 +969,15 @@
 
   FtraceMetadata metadata{};
   auto compact_sched_buf = std::make_unique<CompactSchedBuffer>();
+  uint64_t last_read_event_ts = 0;
   base::FlatSet<protos::pbzero::FtraceParseStatus> parse_errors;
   while (state.KeepRunning()) {
     CpuReader::ProcessPagesForDataSource(
         &writer, &metadata, /*cpu=*/0, &ds_config, &parse_errors,
-        repeated_pages.get(), page_repetition, compact_sched_buf.get(), table,
+        &last_read_event_ts, repeated_pages.get(), page_repetition,
+        compact_sched_buf.get(), table,
         /*symbolizer=*/nullptr, /*ftrace_clock_snapshot=*/nullptr,
-        /*ftrace_clock=*/protos::pbzero::FTRACE_CLOCK_UNSPECIFIED);
+        protos::pbzero::FTRACE_CLOCK_UNSPECIFIED);
 
     metadata.Clear();
   }
diff --git a/src/traced/probes/ftrace/cpu_reader_fuzzer.cc b/src/traced/probes/ftrace/cpu_reader_fuzzer.cc
index afd5bc4..f92aab9 100644
--- a/src/traced/probes/ftrace/cpu_reader_fuzzer.cc
+++ b/src/traced/probes/ftrace/cpu_reader_fuzzer.cc
@@ -15,15 +15,13 @@
  */
 
 #include <stddef.h>
-#include <stdint.h>
+#include <cstdint>
 
 #include <algorithm>
 
 #include "perfetto/base/flat_set.h"
 #include "perfetto/base/logging.h"
 #include "perfetto/ext/base/utils.h"
-#include "perfetto/protozero/scattered_stream_null_delegate.h"
-#include "perfetto/protozero/scattered_stream_writer.h"
 #include "src/traced/probes/ftrace/cpu_reader.h"
 #include "src/traced/probes/ftrace/ftrace_config_muxer.h"
 #include "src/traced/probes/ftrace/test/cpu_reader_support.h"
@@ -69,8 +67,10 @@
   NullTraceWriter null_writer;
   auto compact_sched_buf = std::make_unique<CompactSchedBuffer>();
   base::FlatSet<protos::pbzero::FtraceParseStatus> parse_errors;
+  uint64_t last_read_event_ts = 0;
   CpuReader::ProcessPagesForDataSource(
-      &null_writer, &metadata, /*cpu=*/0, &ds_config, &parse_errors, g_page,
+      &null_writer, &metadata, /*cpu=*/0, &ds_config, &parse_errors,
+      &last_read_event_ts, g_page,
       /*pages_read=*/1, compact_sched_buf.get(), table, /*symbolizer*/ nullptr,
       /*ftrace_clock_snapshot=*/nullptr,
       protos::pbzero::FTRACE_CLOCK_UNSPECIFIED);
diff --git a/src/traced/probes/ftrace/cpu_reader_unittest.cc b/src/traced/probes/ftrace/cpu_reader_unittest.cc
index 2d4671c..33a8ca3 100644
--- a/src/traced/probes/ftrace/cpu_reader_unittest.cc
+++ b/src/traced/probes/ftrace/cpu_reader_unittest.cc
@@ -405,7 +405,8 @@
                      /*cpu=*/0,
                      /*ftrace_clock_snapshot=*/nullptr,
                      protos::pbzero::FTRACE_CLOCK_UNSPECIFIED,
-                     compact_sched_buf_.get(), ds_config.compact_sched.enabled);
+                     compact_sched_buf_.get(), ds_config.compact_sched.enabled,
+                     /*last_read_event_ts=*/0);
     return &bundler_.value();
   }
 
@@ -433,6 +434,7 @@
   std::optional<TraceWriterForTesting> writer_;
   std::unique_ptr<CompactSchedBuffer> compact_sched_buf_;
   std::optional<CpuReader::Bundler> bundler_;
+  uint64_t last_read_event_ts_ = 0;
 };
 
 TEST_F(CpuReaderParsePagePayloadTest, ParseSinglePrint) {
@@ -457,7 +459,7 @@
 
   FtraceParseStatus status = CpuReader::ParsePagePayload(
       parse_pos, &page_header.value(), table, &ds_config,
-      CreateBundler(ds_config), &metadata_);
+      CreateBundler(ds_config), &metadata_, &last_read_event_ts_);
 
   EXPECT_EQ(status, FtraceParseStatus::FTRACE_STATUS_OK);
 
@@ -578,7 +580,8 @@
   EXPECT_LE(parse_pos + page_header->size, page_end);
 
   CpuReader::ParsePagePayload(parse_pos, &page_header.value(), table,
-                              &ds_config, CreateBundler(ds_config), &metadata_);
+                              &ds_config, CreateBundler(ds_config), &metadata_,
+                              &last_read_event_ts_);
 
   auto bundle = GetBundle();
   const protos::gen::FtraceEvent& long_print = bundle.event()[0];
@@ -622,7 +625,7 @@
 
   FtraceParseStatus status = CpuReader::ParsePagePayload(
       parse_pos, &page_header.value(), table, &ds_config,
-      CreateBundler(ds_config), &metadata_);
+      CreateBundler(ds_config), &metadata_, &last_read_event_ts_);
 
   EXPECT_EQ(status, FtraceParseStatus::FTRACE_STATUS_OK);
 
@@ -666,7 +669,7 @@
 
   FtraceParseStatus status = CpuReader::ParsePagePayload(
       parse_pos, &page_header.value(), table, &ds_config,
-      CreateBundler(ds_config), &metadata_);
+      CreateBundler(ds_config), &metadata_, &last_read_event_ts_);
 
   EXPECT_EQ(status, FtraceParseStatus::FTRACE_STATUS_OK);
 
@@ -696,7 +699,7 @@
 
   FtraceParseStatus status = CpuReader::ParsePagePayload(
       parse_pos, &page_header.value(), table, &ds_config,
-      CreateBundler(ds_config), &metadata_);
+      CreateBundler(ds_config), &metadata_, &last_read_event_ts_);
 
   EXPECT_EQ(status, FtraceParseStatus::FTRACE_STATUS_OK);
 
@@ -758,7 +761,7 @@
 
   FtraceParseStatus status = CpuReader::ParsePagePayload(
       parse_pos, &page_header.value(), table, &ds_config,
-      CreateBundler(ds_config), &metadata_);
+      CreateBundler(ds_config), &metadata_, &last_read_event_ts_);
 
   EXPECT_EQ(status, FtraceParseStatus::FTRACE_STATUS_OK);
 
@@ -812,7 +815,7 @@
 
     FtraceParseStatus status = CpuReader::ParsePagePayload(
         parse_pos, &page_header.value(), table, &ds_config_no_filter,
-        CreateBundler(ds_config_no_filter), &metadata_);
+        CreateBundler(ds_config_no_filter), &metadata_, &last_read_event_ts_);
     EXPECT_EQ(status, FtraceParseStatus::FTRACE_STATUS_OK);
 
     auto bundle = GetBundle();
@@ -845,7 +848,7 @@
 
     FtraceParseStatus status = CpuReader::ParsePagePayload(
         parse_pos, &page_header.value(), table, &ds_config_with_filter,
-        CreateBundler(ds_config_with_filter), &metadata_);
+        CreateBundler(ds_config_with_filter), &metadata_, &last_read_event_ts_);
     EXPECT_EQ(status, FtraceParseStatus::FTRACE_STATUS_OK);
 
     auto bundle = GetBundle();
@@ -898,9 +901,11 @@
 
     TraceWriterForTesting trace_writer;
     base::FlatSet<protos::pbzero::FtraceParseStatus> parse_errors;
+    uint64_t last_read_event_ts = 0;
     bool success = CpuReader::ProcessPagesForDataSource(
         &trace_writer, &metadata, /*cpu=*/1, &with_filter, &parse_errors,
-        buf.get(), kTestPages, compact_sched_buf.get(), table,
+        &last_read_event_ts, buf.get(), kTestPages, compact_sched_buf.get(),
+        table,
         /*symbolizer=*/nullptr,
         /*ftrace_clock_snapshot=*/nullptr,
         protos::pbzero::FTRACE_CLOCK_UNSPECIFIED);
@@ -920,9 +925,11 @@
 
     TraceWriterForTesting trace_writer;
     base::FlatSet<protos::pbzero::FtraceParseStatus> parse_errors;
+    uint64_t last_read_event_ts = 0;
     bool success = CpuReader::ProcessPagesForDataSource(
         &trace_writer, &metadata, /*cpu=*/1, &without_filter, &parse_errors,
-        buf.get(), kTestPages, compact_sched_buf.get(), table,
+        &last_read_event_ts, buf.get(), kTestPages, compact_sched_buf.get(),
+        table,
         /*symbolizer=*/nullptr,
         /*ftrace_clock_snapshot=*/nullptr,
         protos::pbzero::FTRACE_CLOCK_UNSPECIFIED);
@@ -1008,13 +1015,13 @@
 
   FtraceParseStatus status = CpuReader::ParsePagePayload(
       parse_pos, &page_header.value(), table, &ds_config,
-      CreateBundler(ds_config), &metadata_);
+      CreateBundler(ds_config), &metadata_, &last_read_event_ts_);
 
   EXPECT_EQ(status, FtraceParseStatus::FTRACE_STATUS_OK);
+  EXPECT_EQ(last_read_event_ts_, 1'045'157'726'697'236ULL);
 
   auto bundle = GetBundle();
   ASSERT_EQ(bundle.event().size(), 6u);
-
   {
     const protos::gen::FtraceEvent& event = bundle.event()[1];
     EXPECT_EQ(event.pid(), 3733ul);
@@ -1057,9 +1064,10 @@
 
   FtraceParseStatus status = CpuReader::ParsePagePayload(
       parse_pos, &page_header.value(), table, &ds_config,
-      CreateBundler(ds_config), &metadata_);
+      CreateBundler(ds_config), &metadata_, &last_read_event_ts_);
 
   EXPECT_EQ(status, FtraceParseStatus::FTRACE_STATUS_OK);
+  EXPECT_EQ(last_read_event_ts_, 1'045'157'726'697'236ULL);
 
   // sched switch fields were buffered:
   EXPECT_LT(0u, bundler_->compact_sched_buf()->sched_switch().size());
@@ -1172,7 +1180,7 @@
 
   FtraceParseStatus status = CpuReader::ParsePagePayload(
       parse_pos, &page_header.value(), table, &ds_config,
-      CreateBundler(ds_config), &metadata_);
+      CreateBundler(ds_config), &metadata_, &last_read_event_ts_);
 
   EXPECT_EQ(status, FtraceParseStatus::FTRACE_STATUS_OK);
 
@@ -1642,9 +1650,11 @@
   TraceWriterForTesting trace_writer;
   auto compact_sched_buf = std::make_unique<CompactSchedBuffer>();
   base::FlatSet<protos::pbzero::FtraceParseStatus> parse_errors;
+  uint64_t last_read_event_ts = 0;
   bool success = CpuReader::ProcessPagesForDataSource(
-      &trace_writer, &metadata, /*cpu=*/1, &ds_config, &parse_errors, buf.get(),
-      kTestPages, compact_sched_buf.get(), table, /*symbolizer=*/nullptr,
+      &trace_writer, &metadata, /*cpu=*/1, &ds_config, &parse_errors,
+      &last_read_event_ts, buf.get(), kTestPages, compact_sched_buf.get(),
+      table, /*symbolizer=*/nullptr,
       /*ftrace_clock_snapshot=*/nullptr,
       protos::pbzero::FTRACE_CLOCK_UNSPECIFIED);
 
@@ -1695,9 +1705,11 @@
   TraceWriterForTesting trace_writer;
   auto compact_sched_buf = std::make_unique<CompactSchedBuffer>();
   base::FlatSet<protos::pbzero::FtraceParseStatus> parse_errors;
+  uint64_t last_read_event_ts = 0;
   bool success = CpuReader::ProcessPagesForDataSource(
-      &trace_writer, &metadata, /*cpu=*/1, &ds_config, &parse_errors, buf.get(),
-      kTestPages, compact_sched_buf.get(), table, /*symbolizer=*/nullptr,
+      &trace_writer, &metadata, /*cpu=*/1, &ds_config, &parse_errors,
+      &last_read_event_ts, buf.get(), kTestPages, compact_sched_buf.get(),
+      table, /*symbolizer=*/nullptr,
       /*ftrace_clock_snapshot=*/nullptr,
       protos::pbzero::FTRACE_CLOCK_UNSPECIFIED);
 
@@ -1926,7 +1938,7 @@
 
   FtraceParseStatus status = CpuReader::ParsePagePayload(
       parse_pos, &page_header.value(), table, &ds_config,
-      CreateBundler(ds_config), &metadata_);
+      CreateBundler(ds_config), &metadata_, &last_read_event_ts_);
 
   EXPECT_EQ(status, FtraceParseStatus::FTRACE_STATUS_OK);
 
@@ -2410,7 +2422,7 @@
 
   FtraceParseStatus status = CpuReader::ParsePagePayload(
       parse_pos, &page_header.value(), table, &ds_config,
-      CreateBundler(ds_config), &metadata_);
+      CreateBundler(ds_config), &metadata_, &last_read_event_ts_);
 
   EXPECT_EQ(status, FtraceParseStatus::FTRACE_STATUS_OK);
 
@@ -2443,6 +2455,8 @@
 //            <...>-9290  [000] ....  1352.724567: suspend_resume: resume_console[1] begin
 //            <...>-9290  [000] ....  1352.724570: suspend_resume: resume_console[1] end
 //            <...>-9290  [000] ....  1352.724574: suspend_resume: thaw_processes[0] begin
+// clang-format on
+
 static ExamplePage g_suspend_resume {
     "synthetic",
     R"(00000000: edba 155a 3201 0000 7401 0000 0000 0000  ...Z2...t.......
@@ -2488,8 +2502,9 @@
       CpuReader::ParsePageHeader(&parse_pos, table->page_header_size_len());
   ASSERT_TRUE(page_header.has_value());
 
-  CpuReader::ParsePagePayload(
-      parse_pos, &page_header.value(), table, &ds_config, CreateBundler(ds_config), &metadata_);
+  CpuReader::ParsePagePayload(parse_pos, &page_header.value(), table,
+                              &ds_config, CreateBundler(ds_config), &metadata_,
+                              &last_read_event_ts_);
   auto bundle = GetBundle();
   ASSERT_EQ(bundle.event().size(), 13u);
   EXPECT_EQ(bundle.event()[0].suspend_resume().action(), "sync_filesystems");
@@ -2933,7 +2948,7 @@
 
   FtraceParseStatus status = CpuReader::ParsePagePayload(
       parse_pos, &page_header.value(), table, &ds_config,
-      CreateBundler(ds_config), &metadata_);
+      CreateBundler(ds_config), &metadata_, &last_read_event_ts_);
 
   EXPECT_EQ(status, FtraceParseStatus::FTRACE_STATUS_OK);
 
@@ -3039,7 +3054,7 @@
 
   FtraceParseStatus status = CpuReader::ParsePagePayload(
       parse_pos, &page_header.value(), table, &ds_config,
-      CreateBundler(ds_config), &metadata_);
+      CreateBundler(ds_config), &metadata_, &last_read_event_ts_);
 
   // successfully parsed the whole 32 byte event
   ASSERT_EQ(32u, page_header->size);
@@ -3321,7 +3336,7 @@
 // the header says there's valid data, but the contents are a run of zeros
 // (which doesn't decode to valid events per the ring buffer ABI). Confirm that
 // the error is reported in the ftrace event bundle.
-TEST_F(CpuReaderParsePagePayloadTest, ZeroPaddedPageWorkaround) {
+TEST_F(CpuReaderParsePagePayloadTest, InvalidZeroPaddedPage) {
   const ExamplePage* test_case = &g_zero_padded;
   ProtoTranslationTable* table = GetTable(test_case->name);
   auto page = PageFromXxd(test_case->data);
@@ -3341,7 +3356,7 @@
 
   FtraceParseStatus status = CpuReader::ParsePagePayload(
       parse_pos, &page_header.value(), table, &ds_config,
-      CreateBundler(ds_config), &metadata_);
+      CreateBundler(ds_config), &metadata_, &last_read_event_ts_);
 
   EXPECT_EQ(0xff0u, page_header->size);
   EXPECT_EQ(status, FtraceParseStatus::FTRACE_STATUS_ABI_ZERO_DATA_LENGTH);
@@ -3375,7 +3390,7 @@
 
   FtraceParseStatus status = CpuReader::ParsePagePayload(
       parse_pos, &page_header.value(), table, &ds_config,
-      CreateBundler(ds_config), &metadata_);
+      CreateBundler(ds_config), &metadata_, &last_read_event_ts_);
 
   EXPECT_EQ(4u, page_header->size);
   EXPECT_EQ(status, FtraceParseStatus::FTRACE_STATUS_ABI_SHORT_DATA_LENGTH);
@@ -3420,7 +3435,7 @@
 
   FtraceParseStatus status = CpuReader::ParsePagePayload(
       parse_pos, &page_header.value(), table, &ds_config,
-      CreateBundler(ds_config), &metadata_);
+      CreateBundler(ds_config), &metadata_, &last_read_event_ts_);
 
   EXPECT_EQ(status, FtraceParseStatus::FTRACE_STATUS_OK);
 
@@ -3474,7 +3489,7 @@
 
   FtraceParseStatus status = CpuReader::ParsePagePayload(
       parse_pos, &page_header.value(), table, &ds_config,
-      CreateBundler(ds_config), &metadata_);
+      CreateBundler(ds_config), &metadata_, &last_read_event_ts_);
 
   EXPECT_EQ(status, FtraceParseStatus::FTRACE_STATUS_OK);
 
@@ -3491,5 +3506,132 @@
   EXPECT_EQ(event.f2fs_truncate_partial_nodes().err(), 3);
 }
 
+// Page holding a single ftrace/print event ("first_print").
+char g_last_ts_test_page_0[] = R"(
+    00000000: cd79 fb3a 2fa4 0400 2c00 0000 0000 0000  .y.:/...,.......
+    00000010: 7eb6 e5eb 8f11 0000 0800 0000 0500 0000  ~...............
+    00000020: 1e83 1400 42ab e0af ffff ffff 6669 7273  ....B.......firs
+    00000030: 745f 7072 696e 740a 0000 0000 0000 0000  t_print.........
+  )";
+
+// Page holding a single ftrace/print event ("second_print").
+char g_last_ts_test_page_1[] = R"(
+    00000000: 3c11 d579 99a5 0400 2c00 0000 0000 0000  <..y....,.......
+    00000010: 3ed1 6315 3701 0000 0800 0000 0500 0000  >.c.7...........
+    00000020: 9e8c 1400 42ab e0af ffff ffff 7365 636f  ....B.......seco
+    00000030: 6e64 5f70 7269 6e74 0a00 0000 0000 0000  nd_print........
+  )";
+
+// data loss marker ("since last read") + multiple sched_switch + one print
+char g_last_ts_test_page_2[] = R"(
+    00000000: 8ac6 cb70 a8a5 0400 4c02 0080 ffff ffff  ...p....L.......
+    00000010: 1000 0000 4701 0102 01b1 0f00 636f 6465  ....G.......code
+    00000020: 0000 0000 0000 0000 0000 0000 01b1 0f00  ................
+    00000030: 7800 0000 0100 0000 0000 0000 7377 6170  x...........swap
+    00000040: 7065 722f 3000 0000 0000 0000 0000 0000  per/0...........
+    00000050: 7800 0000 b0e3 f602 4701 0102 0000 0000  x.......G.......
+    00000060: 7377 6170 7065 722f 3000 0000 0000 0000  swapper/0.......
+    00000070: 0000 0000 7800 0000 0000 0000 0000 0000  ....x...........
+    00000080: 6b77 6f72 6b65 722f 303a 3500 0000 0000  kworker/0:5.....
+    00000090: ac85 1400 7800 0000 1002 0300 4701 0102  ....x.......G...
+    000000a0: ac85 1400 6b77 6f72 6b65 722f 303a 3500  ....kworker/0:5.
+    000000b0: 0000 0000 ac85 1400 7800 0000 8000 0000  ........x.......
+    000000c0: 0000 0000 7377 6170 7065 722f 3000 0000  ....swapper/0...
+    000000d0: 0000 0000 0000 0000 7800 0000 f086 7106  ........x.....q.
+    000000e0: 4701 0102 0000 0000 7377 6170 7065 722f  G.......swapper/
+    000000f0: 3000 0000 0000 0000 0000 0000 7800 0000  0...........x...
+    00000100: 0000 0000 0000 0000 6f62 6e6f 2d64 6573  ........obno-des
+    00000110: 6b74 6f70 2d6e 6f00 d513 0000 7800 0000  ktop-no.....x...
+    00000120: 3013 1000 4701 0102 d513 0000 6f62 6e6f  0...G.......obno
+    00000130: 2d64 6573 6b74 6f70 2d6e 6f00 d513 0000  -desktop-no.....
+    00000140: 7800 0000 0100 0000 0000 0000 7377 6170  x...........swap
+    00000150: 7065 722f 3000 0000 0000 0000 0000 0000  per/0...........
+    00000160: 7800 0000 10b0 2703 4701 0102 0000 0000  x.....'.G.......
+    00000170: 7377 6170 7065 722f 3000 0000 0000 0000  swapper/0.......
+    00000180: 0000 0000 7800 0000 0000 0000 0000 0000  ....x...........
+    00000190: 6b77 6f72 6b65 722f 303a 3500 0000 0000  kworker/0:5.....
+    000001a0: ac85 1400 7800 0000 70e7 0200 4701 0102  ....x...p...G...
+    000001b0: ac85 1400 6b77 6f72 6b65 722f 303a 3500  ....kworker/0:5.
+    000001c0: 0000 0000 ac85 1400 7800 0000 8000 0000  ........x.......
+    000001d0: 0000 0000 6b73 6f66 7469 7271 642f 3000  ....ksoftirqd/0.
+    000001e0: 0000 0000 0f00 0000 7800 0000 10a4 0200  ........x.......
+    000001f0: 4701 0102 0f00 0000 6b73 6f66 7469 7271  G.......ksoftirq
+    00000200: 642f 3000 0000 0000 0f00 0000 7800 0000  d/0.........x...
+    00000210: 0100 0000 0000 0000 7377 6170 7065 722f  ........swapper/
+    00000220: 3000 0000 0000 0000 0000 0000 7800 0000  0...........x...
+    00000230: fef2 0a4d 7500 0000 0800 0000 0500 0000  ...Mu...........
+    00000240: 1a8d 1400 42ab e0af ffff ffff 7468 6972  ....B.......thir
+    00000250: 645f 7072 696e 740a 0000 0000 0000 0000  d_print.........
+  )";
+
+// Tests that |last_read_event_timestamp| is correctly updated in cases where a
+// single ProcessPagesForDataSource call produces multiple ftrace bundle packets
+// (due to splitting on data loss markers).
+TEST(CpuReaderTest, LastReadEventTimestampWithSplitBundles) {
+  // Lay out the 3 test pages back to back in one buffer, one system page each.
+  ProtoTranslationTable* table = GetTable("synthetic");
+  std::vector<std::unique_ptr<uint8_t[]>> test_pages;
+  test_pages.emplace_back(PageFromXxd(g_last_ts_test_page_0));
+  test_pages.emplace_back(PageFromXxd(g_last_ts_test_page_1));
+  test_pages.emplace_back(PageFromXxd(g_last_ts_test_page_2));
+  size_t num_pages = test_pages.size();
+  size_t page_sz = base::GetSysPageSize();
+  auto buf = std::make_unique<uint8_t[]>(page_sz * num_pages);
+  for (size_t i = 0; i < num_pages; i++) {
+    void* dest = buf.get() + (i * page_sz);
+    memcpy(dest, static_cast<const void*>(test_pages[i].get()), page_sz);
+  }
+
+  // Data source config with the ftrace/print event enabled in its filter.
+  auto compact_sched_buf = std::make_unique<CompactSchedBuffer>();
+  FtraceMetadata metadata{};
+  FtraceDataSourceConfig ftrace_cfg = EmptyConfig();
+  ftrace_cfg.event_filter.AddEnabledEvent(
+      table->EventToFtraceId(GroupAndName("ftrace", "print")));
+
+  // Parse all pages in one call, starting from a zero last-read timestamp.
+  TraceWriterForTesting trace_writer;
+  base::FlatSet<protos::pbzero::FtraceParseStatus> parse_errors;
+  uint64_t last_read_event_ts = 0;
+  bool success = CpuReader::ProcessPagesForDataSource(
+      &trace_writer, &metadata, /*cpu=*/0, &ftrace_cfg, &parse_errors,
+      &last_read_event_ts, buf.get(), num_pages, compact_sched_buf.get(), table,
+      /*symbolizer=*/nullptr,
+      /*ftrace_clock_snapshot=*/nullptr,
+      protos::pbzero::FTRACE_CLOCK_UNSPECIFIED);
+
+  EXPECT_TRUE(success);
+
+  // We've read three pages, one print event on each. There is a data loss
+  // marker on the third page, indicating that the kernel overwrote events
+  // between the 2nd and 3rd pages (imagine our daemon getting cpu starved
+  // between those reads).
+  //
+  // Therefore we expect two bundles, as we start a new one whenever we
+  // encounter data loss (to set the |lost_events| field in the bundle proto).
+  //
+  // In terms of |last_read_event_timestamp|, the first bundle will emit zero
+  // since that's our initial input. The second bundle needs to emit the
+  // timestamp of the last event in the first bundle.
+  auto packets = trace_writer.GetAllTracePackets();
+  ASSERT_EQ(2u, packets.size());
+
+  // First bundle: both prints from pages 0 and 1, no loss, initial (zero) ts.
+  auto const& first_bundle = packets[0].ftrace_events();
+  EXPECT_FALSE(first_bundle.lost_events());
+  ASSERT_EQ(2u, first_bundle.event().size());
+  EXPECT_TRUE(first_bundle.has_last_read_event_timestamp());
+  EXPECT_EQ(0u, first_bundle.last_read_event_timestamp());
+
+  const uint64_t kSecondPrintTs = 1308020252356549ULL;  // page 1 print ts
+  EXPECT_EQ(kSecondPrintTs, first_bundle.event()[1].timestamp());
+
+  // Second bundle: 1 print, lost_events set, last-read ts == kSecondPrintTs.
+  auto const& second_bundle = packets[1].ftrace_events();
+  EXPECT_TRUE(second_bundle.lost_events());
+  EXPECT_EQ(1u, second_bundle.event().size());
+  EXPECT_EQ(kSecondPrintTs, second_bundle.last_read_event_timestamp());
+}
+
 }  // namespace
 }  // namespace perfetto
diff --git a/src/traced/probes/ftrace/cpu_stats_parser.cc b/src/traced/probes/ftrace/cpu_stats_parser.cc
index 1d1cf5a..b3aaf49 100644
--- a/src/traced/probes/ftrace/cpu_stats_parser.cc
+++ b/src/traced/probes/ftrace/cpu_stats_parser.cc
@@ -74,11 +74,13 @@
 }
 
 bool DumpAllCpuStats(FtraceProcfs* ftrace, FtraceStats* stats) {
-  stats->cpu_stats.resize(ftrace->NumberOfCpus(), {});
-  for (size_t cpu = 0; cpu < ftrace->NumberOfCpus(); cpu++) {
+  size_t num_cpus = ftrace->NumberOfCpus();
+  stats->cpu_stats.resize(num_cpus, {});
+  for (size_t cpu = 0; cpu < num_cpus; cpu++) {
     stats->cpu_stats[cpu].cpu = cpu;
-    if (!DumpCpuStats(ftrace->ReadCpuStats(cpu), &stats->cpu_stats[cpu]))
+    if (!DumpCpuStats(ftrace->ReadCpuStats(cpu), &stats->cpu_stats[cpu])) {
       return false;
+    }
   }
   return true;
 }
diff --git a/src/traced/probes/ftrace/ftrace_procfs.cc b/src/traced/probes/ftrace/ftrace_procfs.cc
index 5576627..5346c0d 100644
--- a/src/traced/probes/ftrace/ftrace_procfs.cc
+++ b/src/traced/probes/ftrace/ftrace_procfs.cc
@@ -397,7 +397,7 @@
   // We cannot use PERFETTO_CHECK as we might get a permission denied error
   // on Android. The permissions to these files are configured in
   // platform/framework/native/cmds/atrace/atrace.rc.
-  for (size_t cpu = 0; cpu < NumberOfCpus(); cpu++) {
+  for (size_t cpu = 0, num_cpus = NumberOfCpus(); cpu < num_cpus; cpu++) {
     ClearPerCpuTrace(cpu);
   }
 }