Merge "Check trace packet contents in trace_writer_impl_unittest"
diff --git a/src/tracing/core/trace_writer_impl_unittest.cc b/src/tracing/core/trace_writer_impl_unittest.cc
index c822e50..6750880 100644
--- a/src/tracing/core/trace_writer_impl_unittest.cc
+++ b/src/tracing/core/trace_writer_impl_unittest.cc
@@ -16,10 +16,14 @@
 
 #include "src/tracing/core/trace_writer_impl.h"
 
+#include <vector>
+
 #include "perfetto/ext/base/utils.h"
 #include "perfetto/ext/tracing/core/commit_data_request.h"
+#include "perfetto/ext/tracing/core/shared_memory_abi.h"
 #include "perfetto/ext/tracing/core/trace_writer.h"
 #include "perfetto/ext/tracing/core/tracing_service.h"
+#include "perfetto/protozero/proto_utils.h"
 #include "src/base/test/gtest_test_suite.h"
 #include "src/base/test/test_task_runner.h"
 #include "src/tracing/core/shared_memory_arbiter_impl.h"
@@ -27,12 +31,15 @@
 #include "src/tracing/test/mock_producer_endpoint.h"
 #include "test/gtest_and_gmock.h"
 
+#include "protos/perfetto/trace/test_event.gen.h"
 #include "protos/perfetto/trace/test_event.pbzero.h"
+#include "protos/perfetto/trace/trace_packet.gen.h"
 #include "protos/perfetto/trace/trace_packet.pbzero.h"
 
 namespace perfetto {
 namespace {
 
+using ChunkHeader = SharedMemoryABI::ChunkHeader;
 using ::testing::AllOf;
 using ::testing::MockFunction;
 using ::testing::Ne;
@@ -43,6 +50,14 @@
 
 class TraceWriterImplTest : public AlignedBufferTest {
  public:
+  struct PatchKey {  // Ordered lexicographically by (writer_id, chunk_id).
+    uint32_t writer_id;
+    uint32_t chunk_id;
+    bool operator<(const PatchKey& rhs) const {
+      return writer_id != rhs.writer_id ? writer_id < rhs.writer_id
+                                        : chunk_id < rhs.chunk_id;
+    }
+  };
   void SetUp() override {
     SharedMemoryArbiterImpl::set_default_layout_for_testing(
         SharedMemoryABI::PageLayout::kPageDiv4);
@@ -56,6 +71,10 @@
                            MockProducerEndpoint::CommitDataCallback cb) {
           last_commit_ = req;
           last_commit_callback_ = cb;
+          for (const CommitDataRequest::ChunkToPatch& c :
+               req.chunks_to_patch()) {
+            patches_[PatchKey{c.writer_id(), c.chunk_id()}] = c.patches();
+          }
         });
   }
 
@@ -64,8 +83,97 @@
     task_runner_.reset();
   }
 
+  // Returns a copy of |chunk|'s payload with the patches recorded for it in
+  // |patches_| applied; checks kChunkNeedsPatching matches their presence.
+  std::vector<uint8_t> CopyPayloadAndApplyPatches(
+      SharedMemoryABI::Chunk& chunk) const {
+    const uint8_t* const src = chunk.payload_begin();
+    std::vector<uint8_t> payload(src, src + chunk.payload_size());
+    const ChunkHeader::Packets hdr = chunk.header()->packets.load();
+    const auto found = patches_.find(PatchKey{
+        chunk.header()->writer_id.load(), chunk.header()->chunk_id.load()});
+    if (found == patches_.end()) {
+      EXPECT_FALSE(hdr.flags & ChunkHeader::kChunkNeedsPatching);
+      return payload;
+    }
+    EXPECT_TRUE(hdr.flags & ChunkHeader::kChunkNeedsPatching);
+    for (const CommitDataRequest::ChunkToPatch::Patch& patch : found->second) {
+      const auto& bytes = patch.data();
+      const size_t offset = patch.offset();
+      if (offset + bytes.size() > payload.size()) {
+        ADD_FAILURE() << "Patch out of bounds";  // Skip it, keep going.
+        continue;
+      }
+      for (size_t i = 0; i < bytes.size(); i++)
+        payload[offset + i] = reinterpret_cast<const uint8_t*>(bytes.data())[i];
+    }
+    return payload;
+  }
+
+  // Returns copies of the trace packets found in the shared memory buffer,
+  // with the received patches applied and cross-chunk fragments joined. The
+  // producer writing to the buffer (i.e. the trace writer) must be destroyed.
+  std::vector<std::string> GetPacketsFromShmemAndPatches() {
+    std::vector<std::string> packets;
+    SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
+    // True when the next fragment continues the packet in packets.back().
+    bool was_fragmenting = false;
+    for (size_t page_idx = 0; page_idx < abi->num_pages(); page_idx++) {
+      uint32_t page_layout = abi->GetPageLayout(page_idx);
+      size_t num_chunks = SharedMemoryABI::GetNumChunksForLayout(page_layout);
+      for (size_t chunk_idx = 0; chunk_idx < num_chunks; chunk_idx++) {
+        SharedMemoryABI::ChunkState chunk_state =
+            abi->GetChunkState(page_idx, chunk_idx);
+        if (chunk_state != SharedMemoryABI::kChunkFree &&
+            chunk_state != SharedMemoryABI::kChunkComplete) {
+          ADD_FAILURE() << "Page " << page_idx << " chunk " << chunk_idx
+                        << " unexpected state: " << chunk_state;
+          continue;
+        }
+        SharedMemoryABI::Chunk chunk =
+            abi->TryAcquireChunkForReading(page_idx, chunk_idx);
+        if (!chunk.is_valid())
+          continue;
+        ChunkHeader::Packets p = chunk.header()->packets.load();
+        EXPECT_EQ(
+            was_fragmenting,
+            static_cast<bool>(p.flags &
+                              ChunkHeader::kFirstPacketContinuesFromPrevChunk));
+        std::vector<uint8_t> payload = CopyPayloadAndApplyPatches(chunk);
+        const uint8_t* read_ptr = payload.data();
+        const uint8_t* const end_read_ptr = payload.data() + payload.size();
+
+        // Each fragment is encoded as a varint length followed by its bytes.
+        size_t num_fragments = p.count;
+        for (; num_fragments && read_ptr < end_read_ptr; num_fragments--) {
+          uint64_t len = 0;
+          read_ptr =
+              protozero::proto_utils::ParseVarInt(read_ptr, end_read_ptr, &len);
+          if (len > static_cast<uint64_t>(end_read_ptr - read_ptr)) {
+            ADD_FAILURE() << "Page " << page_idx << " chunk " << chunk_idx
+                          << " malformed chunk";
+            break;  // Never read past the end of the payload copy.
+          }
+          // Only the first fragment of a chunk can continue an earlier packet.
+          if (!was_fragmenting || packets.empty())
+            packets.emplace_back();
+          was_fragmenting = false;
+          packets.back().append(reinterpret_cast<const char*>(read_ptr),
+                                static_cast<size_t>(len));
+          read_ptr += len;
+        }
+        EXPECT_EQ(num_fragments, 0u);
+        if (p.flags & ChunkHeader::kLastPacketContinuesOnNextChunk)
+          was_fragmenting = true;
+      }
+    }
+    return packets;
+  }
+
   CommitDataRequest last_commit_;
   ProducerEndpoint::CommitDataCallback last_commit_callback_;
+  std::map<PatchKey, std::vector<CommitDataRequest::ChunkToPatch::Patch>>
+      patches_;
   NiceMock<MockProducerEndpoint> mock_producer_endpoint_;
 
   std::unique_ptr<base::TestTaskRunner> task_runner_;
@@ -75,7 +183,7 @@
 size_t const kPageSizes[] = {4096, 65536};
 INSTANTIATE_TEST_SUITE_P(PageSize, TraceWriterImplTest, ValuesIn(kPageSizes));
 
-TEST_P(TraceWriterImplTest, SingleWriter) {
+TEST_P(TraceWriterImplTest, NewTracePacket) {
   const BufferID kBufId = 42;
   std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);
   const size_t kNumPackets = 32;
@@ -89,31 +197,21 @@
   // and the chunk to be put back in the kChunkComplete state.
   writer.reset();
 
-  SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
-  size_t packets_count = 0;
-  for (size_t page_idx = 0; page_idx < abi->num_pages(); page_idx++) {
-    uint32_t page_layout = abi->GetPageLayout(page_idx);
-    size_t num_chunks = SharedMemoryABI::GetNumChunksForLayout(page_layout);
-    for (size_t chunk_idx = 0; chunk_idx < num_chunks; chunk_idx++) {
-      auto chunk_state = abi->GetChunkState(page_idx, chunk_idx);
-      ASSERT_TRUE(chunk_state == SharedMemoryABI::kChunkFree ||
-                  chunk_state == SharedMemoryABI::kChunkComplete);
-      auto chunk = abi->TryAcquireChunkForReading(page_idx, chunk_idx);
-      if (!chunk.is_valid())
-        continue;
-      packets_count += chunk.header()->packets.load().count;
-    }
+  std::vector<std::string> packets = GetPacketsFromShmemAndPatches();
+  ASSERT_THAT(packets, SizeIs(kNumPackets));
+  for (size_t i = 0; i < kNumPackets; i++) {
+    protos::gen::TracePacket packet;
+    packet.ParseFromString(packets[i]);
+    EXPECT_EQ(packet.for_testing().str(), "foobar " + std::to_string(i));
   }
-  EXPECT_EQ(packets_count, kNumPackets);
-  // TODO(primiano): check also the content of the packets decoding the protos.
 }
 
 TEST_P(TraceWriterImplTest, FragmentingPacketWithProducerAndServicePatching) {
   const BufferID kBufId = 42;
   std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);
 
-  // Write a packet that's guaranteed to span more than a single chunk, but less
-  // than two chunks.
+  // Write a packet that's guaranteed to span more than a single chunk, but
+  // less than two chunks.
   auto packet = writer->NewTracePacket();
   size_t chunk_size = page_size() / 4;
   std::string large_string(chunk_size, 'x');
@@ -127,8 +225,8 @@
   EXPECT_EQ(last_commit_.chunks_to_move()[0].target_buffer(), kBufId);
   EXPECT_THAT(last_commit_.chunks_to_patch(), SizeIs(0));
 
-  // We will simulate a batching cycle by first setting the batching period to a
-  // very large value and then force-flushing when we are done writing data.
+  // We will simulate a batching cycle by first setting the batching period to
+  // a very large value and then force-flushing when we are done writing data.
   arbiter_->SetDirectSMBPatchingSupportedByService();
   ASSERT_TRUE(arbiter_->EnableDirectSMBPatching());
   arbiter_->SetBatchCommitsDuration(UINT32_MAX);
@@ -154,8 +252,8 @@
   SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
 
   // The first allocated chunk should be complete but need patching, since the
-  // packet extended past the chunk and no patches for the packet size or string
-  // field size were applied yet.
+  // packet extended past the chunk and no patches for the packet size or
+  // string field size were applied yet.
   ASSERT_EQ(abi->GetChunkState(0u, 0u), SharedMemoryABI::kChunkComplete);
   auto chunk = abi->TryAcquireChunkForReading(0u, 0u);
   ASSERT_TRUE(chunk.is_valid());
@@ -193,9 +291,9 @@
 }
 
 TEST_P(TraceWriterImplTest, FragmentingPacketWithoutEnablingProducerPatching) {
-  // We will simulate a batching cycle by first setting the batching period to a
-  // very large value and will force flush to simulate a flush happening when we
-  // believe it should - in this case when a patch is encountered.
+  // We will simulate a batching cycle by first setting the batching period to
+  // a very large value and will force flush to simulate a flush happening
+  // when we believe it should - in this case when a patch is encountered.
   //
   // Note: direct producer-side patching should be disabled by default.
   arbiter_->SetBatchCommitsDuration(UINT32_MAX);
@@ -216,8 +314,8 @@
   arbiter_->FlushPendingCommitDataRequests();
 
   // The first allocated chunk should be complete but need patching, since the
-  // packet extended past the chunk and no patches for the packet size or string
-  // field size were applied in the producer.
+  // packet extended past the chunk and no patches for the packet size or
+  // string field size were applied in the producer.
   SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
   ASSERT_EQ(abi->GetChunkState(0u, 0u), SharedMemoryABI::kChunkComplete);
   auto chunk = abi->TryAcquireChunkForReading(0u, 0u);
@@ -244,9 +342,9 @@
   EXPECT_THAT(last_commit_.chunks_to_patch()[0].patches(), SizeIs(1));
 }
 
-// Sets up a scenario in which the SMB is exhausted and TraceWriter fails to get
-// a new chunk while fragmenting a packet. Verifies that data is dropped until
-// the SMB is freed up and TraceWriter can get a new chunk.
+// Sets up a scenario in which the SMB is exhausted and TraceWriter fails to
+// get a new chunk while fragmenting a packet. Verifies that data is dropped
+// until the SMB is freed up and TraceWriter can get a new chunk.
 TEST_P(TraceWriterImplTest, FragmentingPacketWhileBufferExhausted) {
   const BufferID kBufId = 42;
   std::unique_ptr<TraceWriter> writer =
@@ -268,8 +366,8 @@
                      ->drop_packets_for_testing());
   }
 
-  // Write a packet that's guaranteed to span more than a single chunk, causing
-  // |writer| to attempt to acquire a new chunk but fail to do so.
+  // Write a packet that's guaranteed to span more than a single chunk,
+  // causing |writer| to attempt to acquire a new chunk but fail to do so.
   auto packet2 = writer->NewTracePacket();
   size_t chunk_size = page_size() / 4;
   std::string large_string(chunk_size, 'x');
@@ -314,8 +412,8 @@
   EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(writer.get())
                    ->drop_packets_for_testing());
 
-  // The first packet in the chunk should have the previous_packet_dropped flag
-  // set, so shouldn't be empty.
+  // The first packet in the chunk should have the previous_packet_dropped
+  // flag set, so shouldn't be empty.
   EXPECT_GT(packet4->Finalize(), 0u);
 
   // Flushing the writer causes the chunk to be released again.
@@ -325,7 +423,8 @@
   EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 0u);
   EXPECT_THAT(last_commit_.chunks_to_patch(), SizeIs(0));
 
-  // Chunk should contain only |packet4| and not have any continuation flag set.
+  // Chunk should contain only |packet4| and not have any continuation flag
+  // set.
   ASSERT_EQ(abi->GetChunkState(0u, 0u), SharedMemoryABI::kChunkComplete);
   chunk = abi->TryAcquireChunkForReading(0u, 0u);
   ASSERT_TRUE(chunk.is_valid());
@@ -340,8 +439,8 @@
 }
 
 // Verifies that a TraceWriter that is flushed before the SMB is full and then
-// acquires a garbage chunk later recovers and writes a previous_packet_dropped
-// marker into the trace.
+// acquires a garbage chunk later recovers and writes a
+// previous_packet_dropped marker into the trace.
 TEST_P(TraceWriterImplTest, FlushBeforeBufferExhausted) {
   const BufferID kBufId = 42;
   std::unique_ptr<TraceWriter> writer =
@@ -408,8 +507,8 @@
   EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(writer.get())
                    ->drop_packets_for_testing());
 
-  // The first packet in the chunk should have the previous_packet_dropped flag
-  // set, so shouldn't be empty.
+  // The first packet in the chunk should have the previous_packet_dropped
+  // flag set, so shouldn't be empty.
   EXPECT_GT(packet4->Finalize(), 0u);
 
   // Flushing the writer causes the chunk to be released again.
@@ -419,7 +518,8 @@
   EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 0u);
   EXPECT_THAT(last_commit_.chunks_to_patch(), SizeIs(0));
 
-  // Chunk should contain only |packet4| and not have any continuation flag set.
+  // Chunk should contain only |packet4| and not have any continuation flag
+  // set.
   ASSERT_EQ(SharedMemoryABI::kChunkComplete, abi->GetChunkState(0u, 0u));
   chunk = abi->TryAcquireChunkForReading(0u, 0u);
   ASSERT_TRUE(chunk.is_valid());
@@ -433,9 +533,9 @@
                SharedMemoryABI::ChunkHeader::kLastPacketContinuesOnNextChunk);
 }
 
-// Regression test that verifies that flushing a TraceWriter while a fragmented
-// packet still has uncommitted patches doesn't hit a DCHECK / crash the writer
-// thread.
+// Regression test that verifies that flushing a TraceWriter while a
+// fragmented packet still has uncommitted patches doesn't hit a DCHECK /
+// crash the writer thread.
 TEST_P(TraceWriterImplTest, FlushAfterFragmentingPacketWhileBufferExhausted) {
   const BufferID kBufId = 42;
   std::unique_ptr<TraceWriter> writer =
@@ -472,8 +572,8 @@
   arbiter_->FlushPendingCommitDataRequests();
   ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(2));
 
-  // Flushing should succeed, even though some patches are still in the writer's
-  // patch list.
+  // Flushing should succeed, even though some patches are still in the
+  // writer's patch list.
   packet2->Finalize();
   writer->Flush();
 }