Merge "Check trace packet contents in trace_writer_impl_unittest"
diff --git a/src/tracing/core/trace_writer_impl_unittest.cc b/src/tracing/core/trace_writer_impl_unittest.cc
index c822e50..6750880 100644
--- a/src/tracing/core/trace_writer_impl_unittest.cc
+++ b/src/tracing/core/trace_writer_impl_unittest.cc
@@ -16,10 +16,14 @@
#include "src/tracing/core/trace_writer_impl.h"
+#include <map>
+#include <string>
+#include <tuple>
+#include <vector>
+
#include "perfetto/ext/base/utils.h"
#include "perfetto/ext/tracing/core/commit_data_request.h"
+#include "perfetto/ext/tracing/core/shared_memory_abi.h"
#include "perfetto/ext/tracing/core/trace_writer.h"
#include "perfetto/ext/tracing/core/tracing_service.h"
+#include "perfetto/protozero/proto_utils.h"
#include "src/base/test/gtest_test_suite.h"
#include "src/base/test/test_task_runner.h"
#include "src/tracing/core/shared_memory_arbiter_impl.h"
@@ -27,12 +31,15 @@
#include "src/tracing/test/mock_producer_endpoint.h"
#include "test/gtest_and_gmock.h"
+#include "protos/perfetto/trace/test_event.gen.h"
#include "protos/perfetto/trace/test_event.pbzero.h"
+#include "protos/perfetto/trace/trace_packet.gen.h"
#include "protos/perfetto/trace/trace_packet.pbzero.h"
namespace perfetto {
namespace {
+using ChunkHeader = SharedMemoryABI::ChunkHeader;
using ::testing::AllOf;
using ::testing::MockFunction;
using ::testing::Ne;
@@ -43,6 +50,14 @@
class TraceWriterImplTest : public AlignedBufferTest {
public:
+  // Identifies a chunk by the id of the writer that produced it and by the
+  // chunk's own id. Keys are ordered lexicographically by
+  // (writer_id, chunk_id), which makes the type usable as a std::map key.
+  struct PatchKey {
+    uint32_t writer_id;
+    uint32_t chunk_id;
+    bool operator<(const PatchKey& other) const {
+      if (writer_id != other.writer_id)
+        return writer_id < other.writer_id;
+      return chunk_id < other.chunk_id;
+    }
+  };
void SetUp() override {
SharedMemoryArbiterImpl::set_default_layout_for_testing(
SharedMemoryABI::PageLayout::kPageDiv4);
@@ -56,6 +71,10 @@
MockProducerEndpoint::CommitDataCallback cb) {
last_commit_ = req;
last_commit_callback_ = cb;
+ for (const CommitDataRequest::ChunkToPatch& c :
+ req.chunks_to_patch()) {
+ patches_[PatchKey{c.writer_id(), c.chunk_id()}] = c.patches();
+ }
});
}
@@ -64,8 +83,97 @@
task_runner_.reset();
}
+  // Returns a copy of the payload of |chunk| on which the patches recorded in
+  // |patches_| for the chunk's (writer id, chunk id) have been applied.
+  // Also checks that the kChunkNeedsPatching flag in the chunk header is set
+  // if and only if patches were recorded for the chunk. Patches that do not
+  // fit in the payload are reported as test failures and skipped.
+  std::vector<uint8_t> CopyPayloadAndApplyPatches(
+      SharedMemoryABI::Chunk& chunk) const {
+    const auto* payload_start = chunk.payload_begin();
+    std::vector<uint8_t> patched(payload_start,
+                                 payload_start + chunk.payload_size());
+    ChunkHeader::Packets p = chunk.header()->packets.load();
+
+    const PatchKey key{chunk.header()->writer_id.load(),
+                       chunk.header()->chunk_id.load()};
+    const auto chunk_patches = patches_.find(key);
+    if (chunk_patches == patches_.end()) {
+      // No patch was recorded: the chunk must not be waiting for one.
+      EXPECT_FALSE(p.flags & ChunkHeader::kChunkNeedsPatching);
+      return patched;
+    }
+    EXPECT_TRUE(p.flags & ChunkHeader::kChunkNeedsPatching);
+
+    for (const auto& patch : chunk_patches->second) {
+      const auto& bytes = patch.data();
+      if (patch.offset() + bytes.size() > patched.size()) {
+        ADD_FAILURE() << "Patch out of bounds";
+        continue;
+      }
+      // Overwrite the payload bytes starting at the patch offset.
+      uint8_t* dst = patched.data() + patch.offset();
+      for (const auto byte : bytes) {
+        *dst++ = static_cast<uint8_t>(byte);
+      }
+    }
+    return patched;
+  }
+
+  // Extracts trace packets from the shared memory buffer, and returns copies of
+  // them (after applying the patches received). The producer that writes to the
+  // shared memory (i.e. the trace writer) must be destroyed.
+  //
+  // Chunks are visited in (page, chunk) index order and the fragments of a
+  // packet that spans several chunks are stitched back together in that order.
+  // Chunks in an unexpected state and malformed chunks are reported as test
+  // failures.
+  std::vector<std::string> GetPacketsFromShmemAndPatches() {
+    std::vector<std::string> packets;
+    SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
+    // True while the next fragment to be read is the continuation of the
+    // packet left unfinished by the previous chunk.
+    bool was_fragmenting = false;
+    for (size_t page_idx = 0; page_idx < abi->num_pages(); page_idx++) {
+      uint32_t page_layout = abi->GetPageLayout(page_idx);
+      size_t num_chunks = SharedMemoryABI::GetNumChunksForLayout(page_layout);
+      for (size_t chunk_idx = 0; chunk_idx < num_chunks; chunk_idx++) {
+        SharedMemoryABI::ChunkState chunk_state =
+            abi->GetChunkState(page_idx, chunk_idx);
+        if (chunk_state != SharedMemoryABI::kChunkFree &&
+            chunk_state != SharedMemoryABI::kChunkComplete) {
+          ADD_FAILURE() << "Page " << page_idx << " chunk " << chunk_idx
+                        << " unexpected state: " << chunk_state;
+          continue;
+        }
+        SharedMemoryABI::Chunk chunk =
+            abi->TryAcquireChunkForReading(page_idx, chunk_idx);
+        if (!chunk.is_valid())
+          continue;
+        ChunkHeader::Packets p = chunk.header()->packets.load();
+
+        EXPECT_EQ(
+            was_fragmenting,
+            static_cast<bool>(p.flags &
+                              ChunkHeader::kFirstPacketContinuesFromPrevChunk));
+
+        std::vector<uint8_t> payload = CopyPayloadAndApplyPatches(chunk);
+
+        const uint8_t* read_ptr = payload.data();
+        const uint8_t* const end_read_ptr = payload.data() + payload.size();
+
+        size_t num_fragments = p.count;
+        for (; num_fragments && read_ptr < end_read_ptr; num_fragments--) {
+          uint64_t len = 0;
+          const uint8_t* const fragment_begin =
+              protozero::proto_utils::ParseVarInt(read_ptr, end_read_ptr, &len);
+          // A length prefix takes at least one byte and the fragment it
+          // describes must fit in what is left of the payload. Otherwise stop
+          // parsing this chunk instead of reading past the end of |payload|.
+          if (fragment_begin == read_ptr ||
+              len > static_cast<uint64_t>(end_read_ptr - fragment_begin)) {
+            ADD_FAILURE() << "Page " << page_idx << " chunk " << chunk_idx
+                          << " malformed chunk";
+            break;
+          }
+          if (!was_fragmenting || packets.empty()) {
+            packets.push_back(std::string());
+          }
+          // Only the first fragment of a chunk can continue a packet from the
+          // previous chunk: every following fragment starts a new packet.
+          was_fragmenting = false;
+          packets.back().append(reinterpret_cast<const char*>(fragment_begin),
+                                static_cast<size_t>(len));
+          read_ptr = fragment_begin + len;
+        }
+        EXPECT_EQ(num_fragments, 0u);
+        if (p.flags & ChunkHeader::kLastPacketContinuesOnNextChunk) {
+          was_fragmenting = true;
+        }
+      }
+    }
+    return packets;
+  }
+
CommitDataRequest last_commit_;
ProducerEndpoint::CommitDataCallback last_commit_callback_;
+ // Patches copied from the chunks_to_patch() entries of the commit requests
+ // received by the commit callback, keyed by the (writer id, chunk id) of the
+ // chunk they apply to. A later request for the same key replaces the entry.
+ std::map<PatchKey, std::vector<CommitDataRequest::ChunkToPatch::Patch>>
+ patches_;
NiceMock<MockProducerEndpoint> mock_producer_endpoint_;
std::unique_ptr<base::TestTaskRunner> task_runner_;
@@ -75,7 +183,7 @@
size_t const kPageSizes[] = {4096, 65536};
INSTANTIATE_TEST_SUITE_P(PageSize, TraceWriterImplTest, ValuesIn(kPageSizes));
-TEST_P(TraceWriterImplTest, SingleWriter) {
+TEST_P(TraceWriterImplTest, NewTracePacket) {
const BufferID kBufId = 42;
std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);
const size_t kNumPackets = 32;
@@ -89,31 +197,21 @@
// and the chunk to be put back in the kChunkComplete state.
writer.reset();
- SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
- size_t packets_count = 0;
- for (size_t page_idx = 0; page_idx < abi->num_pages(); page_idx++) {
- uint32_t page_layout = abi->GetPageLayout(page_idx);
- size_t num_chunks = SharedMemoryABI::GetNumChunksForLayout(page_layout);
- for (size_t chunk_idx = 0; chunk_idx < num_chunks; chunk_idx++) {
- auto chunk_state = abi->GetChunkState(page_idx, chunk_idx);
- ASSERT_TRUE(chunk_state == SharedMemoryABI::kChunkFree ||
- chunk_state == SharedMemoryABI::kChunkComplete);
- auto chunk = abi->TryAcquireChunkForReading(page_idx, chunk_idx);
- if (!chunk.is_valid())
- continue;
- packets_count += chunk.header()->packets.load().count;
- }
+ std::vector<std::string> packets = GetPacketsFromShmemAndPatches();
+ ASSERT_THAT(packets, SizeIs(kNumPackets));
+ for (size_t i = 0; i < kNumPackets; i++) {
+ protos::gen::TracePacket packet;
+ packet.ParseFromString(packets[i]);
+ EXPECT_EQ(packet.for_testing().str(), "foobar " + std::to_string(i));
}
- EXPECT_EQ(packets_count, kNumPackets);
- // TODO(primiano): check also the content of the packets decoding the protos.
}
TEST_P(TraceWriterImplTest, FragmentingPacketWithProducerAndServicePatching) {
const BufferID kBufId = 42;
std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);
- // Write a packet that's guaranteed to span more than a single chunk, but less
- // than two chunks.
+ // Write a packet that's guaranteed to span more than a single chunk, but
+ // less than two chunks.
auto packet = writer->NewTracePacket();
size_t chunk_size = page_size() / 4;
std::string large_string(chunk_size, 'x');
@@ -127,8 +225,8 @@
EXPECT_EQ(last_commit_.chunks_to_move()[0].target_buffer(), kBufId);
EXPECT_THAT(last_commit_.chunks_to_patch(), SizeIs(0));
- // We will simulate a batching cycle by first setting the batching period to a
- // very large value and then force-flushing when we are done writing data.
+ // We will simulate a batching cycle by first setting the batching period to
+ // a very large value and then force-flushing when we are done writing data.
arbiter_->SetDirectSMBPatchingSupportedByService();
ASSERT_TRUE(arbiter_->EnableDirectSMBPatching());
arbiter_->SetBatchCommitsDuration(UINT32_MAX);
@@ -154,8 +252,8 @@
SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
// The first allocated chunk should be complete but need patching, since the
- // packet extended past the chunk and no patches for the packet size or string
- // field size were applied yet.
+ // packet extended past the chunk and no patches for the packet size or
+ // string field size were applied yet.
ASSERT_EQ(abi->GetChunkState(0u, 0u), SharedMemoryABI::kChunkComplete);
auto chunk = abi->TryAcquireChunkForReading(0u, 0u);
ASSERT_TRUE(chunk.is_valid());
@@ -193,9 +291,9 @@
}
TEST_P(TraceWriterImplTest, FragmentingPacketWithoutEnablingProducerPatching) {
- // We will simulate a batching cycle by first setting the batching period to a
- // very large value and will force flush to simulate a flush happening when we
- // believe it should - in this case when a patch is encountered.
+ // We will simulate a batching cycle by first setting the batching period to
+ // a very large value and will force flush to simulate a flush happening
+ // when we believe it should - in this case when a patch is encountered.
//
// Note: direct producer-side patching should be disabled by default.
arbiter_->SetBatchCommitsDuration(UINT32_MAX);
@@ -216,8 +314,8 @@
arbiter_->FlushPendingCommitDataRequests();
// The first allocated chunk should be complete but need patching, since the
- // packet extended past the chunk and no patches for the packet size or string
- // field size were applied in the producer.
+ // packet extended past the chunk and no patches for the packet size or
+ // string field size were applied in the producer.
SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
ASSERT_EQ(abi->GetChunkState(0u, 0u), SharedMemoryABI::kChunkComplete);
auto chunk = abi->TryAcquireChunkForReading(0u, 0u);
@@ -244,9 +342,9 @@
EXPECT_THAT(last_commit_.chunks_to_patch()[0].patches(), SizeIs(1));
}
-// Sets up a scenario in which the SMB is exhausted and TraceWriter fails to get
-// a new chunk while fragmenting a packet. Verifies that data is dropped until
-// the SMB is freed up and TraceWriter can get a new chunk.
+// Sets up a scenario in which the SMB is exhausted and TraceWriter fails to
+// get a new chunk while fragmenting a packet. Verifies that data is dropped
+// until the SMB is freed up and TraceWriter can get a new chunk.
TEST_P(TraceWriterImplTest, FragmentingPacketWhileBufferExhausted) {
const BufferID kBufId = 42;
std::unique_ptr<TraceWriter> writer =
@@ -268,8 +366,8 @@
->drop_packets_for_testing());
}
- // Write a packet that's guaranteed to span more than a single chunk, causing
- // |writer| to attempt to acquire a new chunk but fail to do so.
+ // Write a packet that's guaranteed to span more than a single chunk,
+ // causing |writer| to attempt to acquire a new chunk but fail to do so.
auto packet2 = writer->NewTracePacket();
size_t chunk_size = page_size() / 4;
std::string large_string(chunk_size, 'x');
@@ -314,8 +412,8 @@
EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(writer.get())
->drop_packets_for_testing());
- // The first packet in the chunk should have the previous_packet_dropped flag
- // set, so shouldn't be empty.
+ // The first packet in the chunk should have the previous_packet_dropped
+ // flag set, so shouldn't be empty.
EXPECT_GT(packet4->Finalize(), 0u);
// Flushing the writer causes the chunk to be released again.
@@ -325,7 +423,8 @@
EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 0u);
EXPECT_THAT(last_commit_.chunks_to_patch(), SizeIs(0));
- // Chunk should contain only |packet4| and not have any continuation flag set.
+ // Chunk should contain only |packet4| and not have any continuation flag
+ // set.
ASSERT_EQ(abi->GetChunkState(0u, 0u), SharedMemoryABI::kChunkComplete);
chunk = abi->TryAcquireChunkForReading(0u, 0u);
ASSERT_TRUE(chunk.is_valid());
@@ -340,8 +439,8 @@
}
// Verifies that a TraceWriter that is flushed before the SMB is full and then
-// acquires a garbage chunk later recovers and writes a previous_packet_dropped
-// marker into the trace.
+// acquires a garbage chunk later recovers and writes a
+// previous_packet_dropped marker into the trace.
TEST_P(TraceWriterImplTest, FlushBeforeBufferExhausted) {
const BufferID kBufId = 42;
std::unique_ptr<TraceWriter> writer =
@@ -408,8 +507,8 @@
EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(writer.get())
->drop_packets_for_testing());
- // The first packet in the chunk should have the previous_packet_dropped flag
- // set, so shouldn't be empty.
+ // The first packet in the chunk should have the previous_packet_dropped
+ // flag set, so shouldn't be empty.
EXPECT_GT(packet4->Finalize(), 0u);
// Flushing the writer causes the chunk to be released again.
@@ -419,7 +518,8 @@
EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 0u);
EXPECT_THAT(last_commit_.chunks_to_patch(), SizeIs(0));
- // Chunk should contain only |packet4| and not have any continuation flag set.
+ // Chunk should contain only |packet4| and not have any continuation flag
+ // set.
ASSERT_EQ(SharedMemoryABI::kChunkComplete, abi->GetChunkState(0u, 0u));
chunk = abi->TryAcquireChunkForReading(0u, 0u);
ASSERT_TRUE(chunk.is_valid());
@@ -433,9 +533,9 @@
SharedMemoryABI::ChunkHeader::kLastPacketContinuesOnNextChunk);
}
-// Regression test that verifies that flushing a TraceWriter while a fragmented
-// packet still has uncommitted patches doesn't hit a DCHECK / crash the writer
-// thread.
+// Regression test that verifies that flushing a TraceWriter while a
+// fragmented packet still has uncommitted patches doesn't hit a DCHECK /
+// crash the writer thread.
TEST_P(TraceWriterImplTest, FlushAfterFragmentingPacketWhileBufferExhausted) {
const BufferID kBufId = 42;
std::unique_ptr<TraceWriter> writer =
@@ -472,8 +572,8 @@
arbiter_->FlushPendingCommitDataRequests();
ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(2));
- // Flushing should succeed, even though some patches are still in the writer's
- // patch list.
+ // Flushing should succeed, even though some patches are still in the
+ // writer's patch list.
packet2->Finalize();
writer->Flush();
}