Split large TracePackets over IPC
Allows to send TracePackets that are larger than the
IPC buffer size, by separating its chunks over several
messages. This relies on the fact that a packet's slice
will be << IPC buffer size, because of paging/chunking of
the SharedMemoryABI.
Bug: 76155349
Change-Id: Ia658bd349de17151a2b095b45e5c2cd4259a3b30
diff --git a/include/perfetto/ipc/basic_types.h b/include/perfetto/ipc/basic_types.h
index 1d8612f..10c4dc7 100644
--- a/include/perfetto/ipc/basic_types.h
+++ b/include/perfetto/ipc/basic_types.h
@@ -17,6 +17,7 @@
#ifndef INCLUDE_PERFETTO_IPC_BASIC_TYPES_H_
#define INCLUDE_PERFETTO_IPC_BASIC_TYPES_H_
+#include <stddef.h>
#include <stdint.h>
namespace google {
@@ -34,6 +35,10 @@
using ClientID = uint64_t;
using RequestID = uint64_t;
+// This determines the maximum size allowed for an IPC message. Trying to send
+// or receive a larger message will hit DCHECK(s) and auto-disconnect.
+constexpr size_t kIPCBufferSize = 128 * 1024;
+
} // namespace ipc
} // namespace perfetto
diff --git a/include/perfetto/tracing/core/shared_memory_abi.h b/include/perfetto/tracing/core/shared_memory_abi.h
index 4d85f1b..303deff 100644
--- a/include/perfetto/tracing/core/shared_memory_abi.h
+++ b/include/perfetto/tracing/core/shared_memory_abi.h
@@ -142,6 +142,9 @@
class SharedMemoryABI {
public:
+ // This is due to Chunk::size being 16 bits.
+ static constexpr size_t kMaxPageSize = 64 * 1024;
+
// "14" is the max number that can be encoded in a 32 bit atomic word using
// 2 state bits per Chunk and leaving 4 bits for the page layout.
// See PageLayout below.
diff --git a/include/perfetto/tracing/core/slice.h b/include/perfetto/tracing/core/slice.h
index f845ebc..954e58d 100644
--- a/include/perfetto/tracing/core/slice.h
+++ b/include/perfetto/tracing/core/slice.h
@@ -21,6 +21,7 @@
#include <string.h>
#include <memory>
+#include <string>
#include <vector>
#include "perfetto/base/logging.h"
@@ -32,6 +33,11 @@
struct Slice {
Slice() : start(nullptr), size(0) {}
Slice(const void* st, size_t sz) : start(st), size(sz) {}
+
+ // Used to inherit ownership of a buffer from a protobuf via release_str().
+ explicit Slice(std::unique_ptr<std::string> str)
+ : start(&(*str)[0]), size(str->size()), moved_str_data_(std::move(str)) {}
+
Slice(Slice&& other) noexcept = default;
// Create a Slice which owns |size| bytes of memory.
@@ -56,6 +62,7 @@
void operator=(const Slice&) = delete;
std::unique_ptr<uint8_t[]> own_data_;
+ std::unique_ptr<std::string> moved_str_data_;
};
// TODO(primiano): most TracePacket(s) fit in a slice or two. We need something
diff --git a/include/perfetto/tracing/core/trace_packet.h b/include/perfetto/tracing/core/trace_packet.h
index 14c4e0d..0536d70 100644
--- a/include/perfetto/tracing/core/trace_packet.h
+++ b/include/perfetto/tracing/core/trace_packet.h
@@ -88,6 +88,10 @@
size_t size_ = 0; // SUM(slice.size for slice in slices_).
char preamble_[8]; // Deliberately not initialized.
std::unique_ptr<DecodedTracePacket> decoded_packet_;
+
+ // Remember to update the move operators and their unittest if adding new
+ // fields. ConsumerIPCClientImpl::OnReadBuffersResponse() relies on
+ // std::move(TracePacket) to clear up the moved-from instance.
};
} // namespace perfetto
diff --git a/protos/perfetto/ipc/consumer_port.proto b/protos/perfetto/ipc/consumer_port.proto
index abad343..d9b1db5 100644
--- a/protos/perfetto/ipc/consumer_port.proto
+++ b/protos/perfetto/ipc/consumer_port.proto
@@ -77,22 +77,21 @@
message ReadBuffersResponse {
// TODO: uint32 buffer_id = 1;
- // Each streaming reply returns one or more trace packets (see
- // trace_packet.proto).
- // Why "bytes" below? If we just return the full TracePacket object, that will
- // force the Consumer to deserialize it. In many occasions, the Consumer will
- // not consume the TracePacket(s) locally but will just forward them over
- // the network or save them to a file. Deserializing them on-device would be
- // a waste of time, memory and energy.
+ // Each streaming reply returns one or more slices for one or more trace
+ // packets, or even just a portion of it (if it's too big to fit within one
+ // IPC). The returned slices are ordered and contiguous: packets' slices are
+ // not interleaved and slices are sent only once all slices for a packet are
+ // available (i.e. the consumer will never see any gap).
+ message Slice {
+ optional bytes data = 1;
- // TODO: in the past we agreed that a TracePacket can be very large (MBs).
- // However here it will hit the limit of the IPC layer in order to keep
- // the socket buffer bounded. On one side we could upgrade this protocol to
- // support chunks, so we could directly propagate the chunked TracePacket
- // stored in the log buffer. On the other side, this will likely just move
- // the problem on the consumer, that will need larger buffers for reassembly.
- // Perhaps we should just cap the size of a TracePacket to a lower size?
- repeated bytes trace_packets = 2;
+ // When true, this is the last slice for the packet. A ReadBufferResponse
+ // might have no slices marked as |last_slice_for_packet|==true, in the case
+ // of a very large packet that gets chunked into several IPCs (in which case
+ // only the last IPC for the packet will have this flag set).
+ optional bool last_slice_for_packet = 2;
+ }
+ repeated Slice slices = 2;
}
// Arguments for rpc FreeBuffers().
diff --git a/src/ipc/buffered_frame_deserializer.cc b/src/ipc/buffered_frame_deserializer.cc
index 9665398..472b302 100644
--- a/src/ipc/buffered_frame_deserializer.cc
+++ b/src/ipc/buffered_frame_deserializer.cc
@@ -184,6 +184,8 @@
frame.AppendToString(&buf);
const uint32_t payload_size = static_cast<uint32_t>(buf.size() - kHeaderSize);
PERFETTO_DCHECK(payload_size == static_cast<uint32_t>(frame.GetCachedSize()));
+ // Don't send messages larger than what the receiver can handle.
+ PERFETTO_DCHECK(kHeaderSize + payload_size <= kIPCBufferSize);
char header[kHeaderSize];
memcpy(header, base::AssumeLittleEndian(&payload_size), kHeaderSize);
buf.replace(0, kHeaderSize, header, kHeaderSize);
diff --git a/src/ipc/buffered_frame_deserializer.h b/src/ipc/buffered_frame_deserializer.h
index 40e1a64..52a5f5f 100644
--- a/src/ipc/buffered_frame_deserializer.h
+++ b/src/ipc/buffered_frame_deserializer.h
@@ -25,6 +25,7 @@
#include <sys/mman.h>
#include "perfetto/base/page_allocator.h"
+#include "perfetto/ipc/basic_types.h"
namespace perfetto {
namespace ipc {
@@ -81,7 +82,8 @@
size_t size;
};
- explicit BufferedFrameDeserializer(size_t max_capacity = 128 * 1024);
+ // |max_capacity| is overridable only for tests.
+ explicit BufferedFrameDeserializer(size_t max_capacity = kIPCBufferSize);
~BufferedFrameDeserializer();
// This function doesn't really belong here as it does Serialization, unlike
diff --git a/src/ipc/unix_socket.cc b/src/ipc/unix_socket.cc
index 463682b..fa5fc2e 100644
--- a/src/ipc/unix_socket.cc
+++ b/src/ipc/unix_socket.cc
@@ -351,15 +351,16 @@
if (blocking_mode == BlockingMode::kBlocking)
SetBlockingIO(false);
- if (sz >= 0) {
- // There should be no way a non-blocking socket returns < |len|.
- // If the queueing fails, sendmsg() must return -1 + errno = EWOULDBLOCK.
- PERFETTO_CHECK(static_cast<size_t>(sz) == len);
+ if (sz == static_cast<ssize_t>(len)) {
last_error_ = 0;
return true;
}
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ // If sendmsg() succeds but the returned size is < |len| it means that the
+ // endpoint disconnected in the middle of the read, and we managed to send
+ // only a portion of the buffer. In this case we should just give up.
+
+ if (sz < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
// A genuine out-of-buffer. The client should retry or give up.
// Man pages specify that EAGAIN and EWOULDBLOCK have the same semantic here
// and clients should check for both.
diff --git a/src/ipc/unix_socket_unittest.cc b/src/ipc/unix_socket_unittest.cc
index 0c07094..056353b 100644
--- a/src/ipc/unix_socket_unittest.cc
+++ b/src/ipc/unix_socket_unittest.cc
@@ -478,6 +478,55 @@
tx_thread.join();
}
+// Regression test for b/76155349 . If the receiver end disconnects while the
+// sender is in the middle of a large send(), the socket should gracefully give
+// up (i.e. Shutdown()) but not crash.
+TEST_F(UnixSocketTest, ReceiverDisconnectsDuringSend) {
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+ ASSERT_TRUE(srv->is_listening());
+ const int kTimeoutMs = 30000;
+
+ auto receive_done = task_runner_.CreateCheckpoint("receive_done");
+ EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+ .WillOnce(Invoke([this, receive_done](UnixSocket*, UnixSocket* srv_conn) {
+ EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn))
+ .WillOnce(Invoke([receive_done](UnixSocket* s) {
+ char buf[1024];
+ size_t res = s->Receive(buf, sizeof(buf));
+ ASSERT_EQ(1024u, res);
+ s->Shutdown(false /*notify*/);
+ receive_done();
+ }));
+ }));
+
+ // Perform the blocking send form another thread.
+ std::thread tx_thread([] {
+ base::TestTaskRunner tx_task_runner;
+ MockEventListener tx_events;
+ auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner);
+
+ auto cli_connected = tx_task_runner.CreateCheckpoint("cli_connected");
+ EXPECT_CALL(tx_events, OnConnect(cli.get(), true))
+ .WillOnce(InvokeWithoutArgs(cli_connected));
+ tx_task_runner.RunUntilCheckpoint("cli_connected");
+
+ auto send_done = tx_task_runner.CreateCheckpoint("send_done");
+ // We need a
+ static constexpr size_t kBufSize = 32 * 1024 * 1024;
+ std::unique_ptr<char[]> buf(new char[kBufSize]);
+ tx_task_runner.PostTask([&cli, &buf, send_done] {
+ bool send_res = cli->Send(buf.get(), kBufSize, -1 /*fd*/,
+ UnixSocket::BlockingMode::kBlocking);
+ ASSERT_FALSE(send_res);
+ send_done();
+ });
+
+ tx_task_runner.RunUntilCheckpoint("send_done", kTimeoutMs);
+ });
+ task_runner_.RunUntilCheckpoint("receive_done", kTimeoutMs);
+ tx_thread.join();
+}
+
// TODO(primiano): add a test to check that in the case of a peer sending a fd
// and the other end just doing a recv (without taking it), the fd is closed and
// not left around.
diff --git a/src/tracing/core/service_impl.cc b/src/tracing/core/service_impl.cc
index 552898e..cb22c4f 100644
--- a/src/tracing/core/service_impl.cc
+++ b/src/tracing/core/service_impl.cc
@@ -53,7 +53,6 @@
namespace {
constexpr size_t kDefaultShmSize = 256 * 1024ul;
constexpr size_t kMaxShmSize = 4096 * 1024 * 512ul;
-constexpr size_t kMaxShmPageSizeKb = 16ul;
constexpr size_t kDefaultShmPageSizeKb = base::kPageSize / 1024ul;
constexpr int kMaxBuffersPerConsumer = 128;
constexpr base::TimeMillis kClockSnapshotInterval(10 * 1000);
@@ -730,7 +729,7 @@
producer->shared_buffer_page_size_kb_ = std::min<size_t>(
(producer_config.page_size_kb() == 0) ? kDefaultShmPageSizeKb
: producer_config.page_size_kb(),
- kMaxShmPageSizeKb);
+ SharedMemoryABI::kMaxPageSize);
size_t shm_size =
std::min<size_t>(producer_config.shm_size_kb() * 1024, kMaxShmSize);
diff --git a/src/tracing/core/shared_memory_abi.cc b/src/tracing/core/shared_memory_abi.cc
index 70021a8..da49199 100644
--- a/src/tracing/core/shared_memory_abi.cc
+++ b/src/tracing/core/shared_memory_abi.cc
@@ -51,6 +51,7 @@
constexpr size_t SharedMemoryABI::kNumChunksForLayout[];
constexpr const char* SharedMemoryABI::kChunkStateStr[];
constexpr const size_t SharedMemoryABI::kInvalidPageIdx;
+constexpr const size_t SharedMemoryABI::kMaxPageSize;
SharedMemoryABI::SharedMemoryABI() = default;
@@ -109,9 +110,10 @@
chunk_header.writer_id = -1;
PERFETTO_CHECK(kMaxWriterID <= chunk_header.writer_id);
- PERFETTO_CHECK(page_size >= 4096);
- PERFETTO_CHECK(page_size % 4096 == 0);
- PERFETTO_CHECK(reinterpret_cast<uintptr_t>(start) % 4096 == 0);
+ PERFETTO_CHECK(page_size >= base::kPageSize);
+ PERFETTO_CHECK(page_size <= kMaxPageSize);
+ PERFETTO_CHECK(page_size % base::kPageSize == 0);
+ PERFETTO_CHECK(reinterpret_cast<uintptr_t>(start) % base::kPageSize == 0);
PERFETTO_CHECK(size % page_size == 0);
}
diff --git a/src/tracing/core/trace_packet.cc b/src/tracing/core/trace_packet.cc
index 9ebcc47..5e72695 100644
--- a/src/tracing/core/trace_packet.cc
+++ b/src/tracing/core/trace_packet.cc
@@ -28,8 +28,20 @@
TracePacket::TracePacket() = default;
TracePacket::~TracePacket() = default;
-TracePacket::TracePacket(TracePacket&&) noexcept = default;
-TracePacket& TracePacket::operator=(TracePacket&&) = default;
+TracePacket::TracePacket(TracePacket&& other) noexcept {
+ *this = std::move(other);
+}
+
+TracePacket& TracePacket::operator=(TracePacket&& other) {
+ slices_ = std::move(other.slices_);
+ other.slices_.clear();
+
+ size_ = other.size_;
+ other.size_ = 0;
+
+ decoded_packet_ = std::move(other.decoded_packet_);
+ return *this;
+}
bool TracePacket::Decode() {
if (decoded_packet_)
diff --git a/src/tracing/core/trace_packet_unittest.cc b/src/tracing/core/trace_packet_unittest.cc
index ef5c41b..9e3e4ed 100644
--- a/src/tracing/core/trace_packet_unittest.cc
+++ b/src/tracing/core/trace_packet_unittest.cc
@@ -43,16 +43,6 @@
ASSERT_NE(nullptr, tp.operator->());
ASSERT_EQ(proto.for_testing().str(), tp->for_testing().str());
ASSERT_EQ(proto.for_testing().str(), (*tp).for_testing().str());
-
- // Check move operators.
- TracePacket moved_tp(std::move(tp));
- ASSERT_NE(nullptr, moved_tp.operator->());
- ASSERT_EQ(proto.for_testing().str(), moved_tp->for_testing().str());
-
- TracePacket moved_tp_2;
- moved_tp_2 = std::move(moved_tp);
- ASSERT_NE(nullptr, moved_tp_2.operator->());
- ASSERT_EQ(proto.for_testing().str(), moved_tp_2->for_testing().str());
}
TEST(TracePacketTest, Sliced) {
@@ -131,5 +121,29 @@
ASSERT_EQ(payload, trace.packet(0).for_testing().str());
}
+TEST(TracePacketTest, MoveOperators) {
+ char buf1[5]{};
+ char buf2[7]{};
+
+ TracePacket tp;
+ tp.AddSlice(buf1, sizeof(buf1));
+ tp.AddSlice(buf2, sizeof(buf2));
+ tp.AddSlice(Slice::Allocate(11));
+ tp.AddSlice(Slice(std::unique_ptr<std::string>(new std::string("foobar"))));
+
+ TracePacket moved_tp(std::move(tp));
+ ASSERT_EQ(0u, tp.size());
+ ASSERT_TRUE(tp.slices().empty());
+ ASSERT_EQ(4u, moved_tp.slices().size());
+ ASSERT_EQ(5u + 7u + 11u + 6u, moved_tp.size());
+
+ TracePacket moved_tp_2;
+ moved_tp_2 = std::move(moved_tp);
+ ASSERT_EQ(0u, moved_tp.size());
+ ASSERT_TRUE(moved_tp.slices().empty());
+ ASSERT_EQ(4u, moved_tp_2.slices().size());
+ ASSERT_EQ(5u + 7u + 11u + 6u, moved_tp_2.size());
+}
+
} // namespace
} // namespace perfetto
diff --git a/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc b/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
index a0db4e5..3b90301 100644
--- a/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
+++ b/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
@@ -23,7 +23,6 @@
#include "perfetto/ipc/client.h"
#include "perfetto/tracing/core/consumer.h"
#include "perfetto/tracing/core/trace_config.h"
-#include "perfetto/tracing/core/trace_packet.h"
// TODO(fmayer): Add a test to check to what happens when ConsumerIPCClientImpl
// gets destroyed w.r.t. the Consumer pointer. Also think to lifetime of the
@@ -132,16 +131,15 @@
PERFETTO_DLOG("ReadBuffers() failed");
return;
}
- // TODO(primiano): We have to guarantee that the log buffer stays alive at
- // least as long as these requests are on flights.
std::vector<TracePacket> trace_packets;
- trace_packets.reserve(response->trace_packets().size());
- for (const std::string& bytes : response->trace_packets()) {
- trace_packets.emplace_back();
- trace_packets.back().AddSlice(
- Slice(reinterpret_cast<const void*>(bytes.data()), bytes.size()));
+ for (auto& resp_slice : *response->mutable_slices()) {
+ partial_packet_.AddSlice(
+ Slice(std::unique_ptr<std::string>(resp_slice.release_data())));
+ if (resp_slice.last_slice_for_packet())
+ trace_packets.emplace_back(std::move(partial_packet_));
}
- consumer_->OnTraceData(std::move(trace_packets), response.has_more());
+ if (!trace_packets.empty() || !response.has_more())
+ consumer_->OnTraceData(std::move(trace_packets), response.has_more());
}
void ConsumerIPCClientImpl::FreeBuffers() {
diff --git a/src/tracing/ipc/consumer/consumer_ipc_client_impl.h b/src/tracing/ipc/consumer/consumer_ipc_client_impl.h
index 57038ce..d94b658 100644
--- a/src/tracing/ipc/consumer/consumer_ipc_client_impl.h
+++ b/src/tracing/ipc/consumer/consumer_ipc_client_impl.h
@@ -26,6 +26,7 @@
#include "perfetto/ipc/service_proxy.h"
#include "perfetto/tracing/core/basic_types.h"
#include "perfetto/tracing/core/service.h"
+#include "perfetto/tracing/core/trace_packet.h"
#include "perfetto/tracing/ipc/consumer_ipc_client.h"
#include "perfetto/ipc/consumer_port.ipc.h"
@@ -84,6 +85,13 @@
bool connected_ = false;
+ // When a packet is too big to fit into a ReadBuffersResponse IPC, the service
+ // will chunk it into several IPCs, each containing few slices of the packet
+ // (a packet's slice is always guaranteed to be << kIPCBufferSize). When
+ // chunking happens this field accumulates the slices received until the
+ // one with |last_slice_for_packet| == true is received.
+ TracePacket partial_packet_;
+
base::WeakPtrFactory<ConsumerIPCClientImpl> weak_ptr_factory_;
};
diff --git a/src/tracing/ipc/service/consumer_ipc_service.cc b/src/tracing/ipc/service/consumer_ipc_service.cc
index e84d83b..24643d0 100644
--- a/src/tracing/ipc/service/consumer_ipc_service.cc
+++ b/src/tracing/ipc/service/consumer_ipc_service.cc
@@ -21,8 +21,10 @@
#include "perfetto/base/logging.h"
#include "perfetto/base/scoped_file.h"
#include "perfetto/base/task_runner.h"
+#include "perfetto/ipc/basic_types.h"
#include "perfetto/ipc/host.h"
#include "perfetto/tracing/core/service.h"
+#include "perfetto/tracing/core/shared_memory_abi.h"
#include "perfetto/tracing/core/slice.h"
#include "perfetto/tracing/core/trace_config.h"
#include "perfetto/tracing/core/trace_packet.h"
@@ -120,16 +122,49 @@
return;
auto result = ipc::AsyncResult<protos::ReadBuffersResponse>::Create();
- result.set_has_more(has_more);
- // TODO(primiano): Expose the slices to the Consumer rather than stitching
- // them and wasting cpu time to hide this detail.
+
+ // A TracePacket might be too big to fit into a single IPC message (max
+ // kIPCBufferSize). However a TracePacket is made of slices and each slice
+ // is way smaller than kIPCBufferSize (a slice size is effectively bounded by
+ // the max chunk size of the SharedMemoryABI). When sending a TracePacket,
+ // if its slices don't fit within one IPC, chunk them over several contiguous
+ // IPCs using the |last_slice_for_packet| for glueing on the other side.
+ static_assert(ipc::kIPCBufferSize >= SharedMemoryABI::kMaxPageSize * 2,
+ "kIPCBufferSize too small given the max possible slice size");
+
+ auto send_ipc_reply = [this, &result](bool more) {
+ result.set_has_more(more);
+ read_buffers_response.Resolve(std::move(result));
+ result = ipc::AsyncResult<protos::ReadBuffersResponse>::Create();
+ };
+
+ size_t approx_reply_size = 0;
for (const TracePacket& trace_packet : trace_packets) {
- std::string* dst = result->add_trace_packets();
- dst->reserve(trace_packet.size());
- for (const Slice& slice : trace_packet.slices())
- dst->append(reinterpret_cast<const char*>(slice.start), slice.size);
+ size_t num_slices_left_for_packet = trace_packet.slices().size();
+ for (const Slice& slice : trace_packet.slices()) {
+ // Check if this slice would cause the IPC to overflow its max size and,
+ // if that is the case, split the IPCs. The "16" and "64" below are
+ // over-estimations of, respectively:
+ // 16: the preamble that prefixes each slice (there are 2 x size fields
+ // in the proto + the |last_slice_for_packet| bool).
+ // 64: the overhead of the IPC InvokeMethodReply + wire_protocol's frame.
+ // If these estimations are wrong, BufferedFrameDeserializer::Serialize()
+ // will hit a DCHECK anyways.
+ const size_t approx_slice_size = slice.size + 16;
+ if (approx_reply_size + approx_slice_size > ipc::kIPCBufferSize - 64) {
+ // If we hit this CHECK we got a single slice that is > kIPCBufferSize.
+ PERFETTO_CHECK(result->slices_size() > 0);
+ send_ipc_reply(/*has_more=*/true);
+ approx_reply_size = 0;
+ }
+ approx_reply_size += approx_slice_size;
+
+ auto* res_slice = result->add_slices();
+ res_slice->set_last_slice_for_packet(--num_slices_left_for_packet == 0);
+ res_slice->set_data(slice.start, slice.size);
+ }
}
- read_buffers_response.Resolve(std::move(result));
+ send_ipc_reply(has_more);
}
} // namespace perfetto
diff --git a/test/end_to_end_integrationtest.cc b/test/end_to_end_integrationtest.cc
index daff489..acaeb12 100644
--- a/test/end_to_end_integrationtest.cc
+++ b/test/end_to_end_integrationtest.cc
@@ -174,12 +174,12 @@
ds_config->mutable_for_testing()->set_message_size(kMessageSizeBytes);
// Create the random generator with the same seed.
- std::minstd_rand0 random(kRandomSeed);
+ std::minstd_rand0 rnd_engine(kRandomSeed);
// Create the function to handle packets as they come in.
uint64_t total = 0;
auto on_readback_complete = task_runner.CreateCheckpoint("readback.complete");
- auto on_consumer_data = [&total, &on_readback_complete, &random](
+ auto on_consumer_data = [&total, &on_readback_complete, &rnd_engine](
std::vector<TracePacket> packets, bool has_more) {
for (auto& packet : packets) {
ASSERT_TRUE(packet.Decode());
@@ -188,7 +188,7 @@
ASSERT_TRUE(packet->has_for_testing());
ASSERT_EQ(protos::TracePacket::kTrustedUid,
packet->optional_trusted_uid_case());
- ASSERT_EQ(packet->for_testing().seq_value(), random());
+ ASSERT_EQ(packet->for_testing().seq_value(), rnd_engine());
}
total += packets.size();
@@ -226,4 +226,87 @@
consumer.Disconnect();
}
+TEST(PerfettoTest, VeryLargePackets) {
+ base::TestTaskRunner task_runner;
+
+#if PERFETTO_BUILDFLAG(PERFETTO_START_DAEMONS)
+ TaskRunnerThread service_thread("perfetto.svc");
+ service_thread.Start(std::unique_ptr<ServiceDelegate>(
+ new ServiceDelegate(TEST_PRODUCER_SOCK_NAME, TEST_CONSUMER_SOCK_NAME)));
+#endif
+
+ auto on_producer_enabled = task_runner.CreateCheckpoint("producer.enabled");
+ auto posted_on_producer_enabled = [&task_runner, &on_producer_enabled] {
+ task_runner.PostTask(on_producer_enabled);
+ };
+ TaskRunnerThread producer_thread("perfetto.prd");
+ std::unique_ptr<FakeProducerDelegate> producer_delegate(
+ new FakeProducerDelegate(TEST_PRODUCER_SOCK_NAME,
+ posted_on_producer_enabled));
+ FakeProducerDelegate* producer_delegate_cached = producer_delegate.get();
+ producer_thread.Start(std::move(producer_delegate));
+
+ // Setup the TraceConfig for the consumer.
+ TraceConfig trace_config;
+ trace_config.add_buffers()->set_size_kb(4096 * 10);
+
+ static constexpr int kNumPackets = 5;
+ static constexpr uint32_t kRandomSeed = 42;
+ static constexpr uint32_t kMsgSize = 1024 * 1024 - 42;
+ std::minstd_rand0 rnd_engine(kRandomSeed);
+
+ auto* ds_config = trace_config.add_data_sources()->mutable_config();
+ ds_config->set_name("android.perfetto.FakeProducer");
+ ds_config->set_target_buffer(0);
+ ds_config->mutable_for_testing()->set_seed(kRandomSeed);
+ ds_config->mutable_for_testing()->set_message_count(kNumPackets);
+ ds_config->mutable_for_testing()->set_message_size(kMsgSize);
+
+ auto on_readback_complete = task_runner.CreateCheckpoint("readback.complete");
+ int packets_seen = 0;
+ auto on_consumer_data = [&on_readback_complete, &rnd_engine, &packets_seen](
+ std::vector<TracePacket> packets, bool has_more) {
+ for (auto& packet : packets) {
+ ASSERT_TRUE(packet.Decode());
+ if (!packet->has_for_testing())
+ continue;
+ packets_seen++;
+ ASSERT_EQ(packet->for_testing().seq_value(), rnd_engine());
+ size_t msg_size = packet->for_testing().str().size();
+ ASSERT_EQ(kMsgSize, msg_size);
+ for (size_t i = 0; i < msg_size; i++)
+ ASSERT_EQ(i < msg_size - 1 ? '.' : 0, packet->for_testing().str()[i]);
+ }
+
+ if (!has_more)
+ on_readback_complete();
+ };
+
+ auto on_connect = task_runner.CreateCheckpoint("consumer.connected");
+ FakeConsumer consumer(trace_config, std::move(on_connect),
+ std::move(on_consumer_data), &task_runner);
+
+ consumer.Connect(TEST_CONSUMER_SOCK_NAME);
+ task_runner.RunUntilCheckpoint("consumer.connected");
+
+ consumer.EnableTracing();
+ task_runner.RunUntilCheckpoint("producer.enabled");
+
+ auto on_produced_and_committed =
+ task_runner.CreateCheckpoint("produced.and.committed");
+ auto posted_on_produced_and_committed = [&task_runner,
+ &on_produced_and_committed] {
+ task_runner.PostTask(on_produced_and_committed);
+ };
+ FakeProducer* producer = producer_delegate_cached->producer();
+ producer->ProduceEventBatch(posted_on_produced_and_committed);
+ task_runner.RunUntilCheckpoint("produced.and.committed");
+
+ consumer.ReadTraceData();
+ task_runner.RunUntilCheckpoint("readback.complete");
+ ASSERT_EQ(kNumPackets, packets_seen);
+
+ consumer.Disconnect();
+}
+
} // namespace perfetto
diff --git a/test/fake_producer.cc b/test/fake_producer.cc
index 9dadf52..5b2c0a0 100644
--- a/test/fake_producer.cc
+++ b/test/fake_producer.cc
@@ -40,7 +40,7 @@
PERFETTO_DCHECK_THREAD(thread_checker_);
task_runner_ = task_runner;
endpoint_ = ProducerIPCClient::Connect(
- socket_name, this, "com.google.perfetto.fake_producer", task_runner);
+ socket_name, this, "android.perfetto.FakeProducer", task_runner);
on_create_data_source_instance_ = std::move(on_create_data_source_instance);
}
@@ -78,18 +78,15 @@
void FakeProducer::ProduceEventBatch(std::function<void()> callback) {
task_runner_->PostTask([this, callback] {
PERFETTO_CHECK(trace_writer_);
-
- size_t payload_size = message_size_ - sizeof(uint32_t);
- PERFETTO_CHECK(payload_size >= sizeof(char));
-
+ PERFETTO_CHECK(message_size_ > 1);
std::unique_ptr<char, base::FreeDeleter> payload(
- static_cast<char*>(malloc(payload_size)));
- memset(payload.get(), '.', payload_size);
- payload.get()[payload_size - 1] = 0;
+ static_cast<char*>(malloc(message_size_)));
+ memset(payload.get(), '.', message_size_);
+ payload.get()[message_size_ - 1] = 0;
for (size_t i = 0; i < message_count_; i++) {
auto handle = trace_writer_->NewTracePacket();
handle->set_for_testing()->set_seq_value(rnd_engine_());
- handle->set_for_testing()->set_str(payload.get(), payload_size);
+ handle->set_for_testing()->set_str(payload.get(), message_size_);
}
trace_writer_->Flush(callback);
});