Split large TracePackets over IPC

Allows sending TracePackets that are larger than the
IPC buffer size by spreading their slices over several
messages. This relies on the fact that a packet's slice
will be << IPC buffer size, because of paging/chunking of
the SharedMemoryABI.

Bug: 76155349
Change-Id: Ia658bd349de17151a2b095b45e5c2cd4259a3b30
diff --git a/include/perfetto/ipc/basic_types.h b/include/perfetto/ipc/basic_types.h
index 1d8612f..10c4dc7 100644
--- a/include/perfetto/ipc/basic_types.h
+++ b/include/perfetto/ipc/basic_types.h
@@ -17,6 +17,7 @@
 #ifndef INCLUDE_PERFETTO_IPC_BASIC_TYPES_H_
 #define INCLUDE_PERFETTO_IPC_BASIC_TYPES_H_
 
+#include <stddef.h>
 #include <stdint.h>
 
 namespace google {
@@ -34,6 +35,10 @@
 using ClientID = uint64_t;
 using RequestID = uint64_t;
 
+// This determines the maximum size allowed for an IPC message. Trying to send
+// or receive a larger message will hit DCHECK(s) and auto-disconnect.
+constexpr size_t kIPCBufferSize = 128 * 1024;
+
 }  // namespace ipc
 }  // namespace perfetto
 
diff --git a/include/perfetto/tracing/core/shared_memory_abi.h b/include/perfetto/tracing/core/shared_memory_abi.h
index 4d85f1b..303deff 100644
--- a/include/perfetto/tracing/core/shared_memory_abi.h
+++ b/include/perfetto/tracing/core/shared_memory_abi.h
@@ -142,6 +142,9 @@
 
 class SharedMemoryABI {
  public:
+  // This is due to Chunk::size being 16 bits.
+  static constexpr size_t kMaxPageSize = 64 * 1024;
+
   // "14" is the max number that can be encoded in a 32 bit atomic word using
   // 2 state bits per Chunk and leaving 4 bits for the page layout.
   // See PageLayout below.
diff --git a/include/perfetto/tracing/core/slice.h b/include/perfetto/tracing/core/slice.h
index f845ebc..954e58d 100644
--- a/include/perfetto/tracing/core/slice.h
+++ b/include/perfetto/tracing/core/slice.h
@@ -21,6 +21,7 @@
 #include <string.h>
 
 #include <memory>
+#include <string>
 #include <vector>
 
 #include "perfetto/base/logging.h"
@@ -32,6 +33,11 @@
 struct Slice {
   Slice() : start(nullptr), size(0) {}
   Slice(const void* st, size_t sz) : start(st), size(sz) {}
+
+  // Used to inherit ownership of a buffer from a protobuf via release_str().
+  explicit Slice(std::unique_ptr<std::string> str)
+      : start(&(*str)[0]), size(str->size()), moved_str_data_(std::move(str)) {}
+
   Slice(Slice&& other) noexcept = default;
 
   // Create a Slice which owns |size| bytes of memory.
@@ -56,6 +62,7 @@
   void operator=(const Slice&) = delete;
 
   std::unique_ptr<uint8_t[]> own_data_;
+  std::unique_ptr<std::string> moved_str_data_;
 };
 
 // TODO(primiano): most TracePacket(s) fit in a slice or two. We need something
diff --git a/include/perfetto/tracing/core/trace_packet.h b/include/perfetto/tracing/core/trace_packet.h
index 14c4e0d..0536d70 100644
--- a/include/perfetto/tracing/core/trace_packet.h
+++ b/include/perfetto/tracing/core/trace_packet.h
@@ -88,6 +88,10 @@
   size_t size_ = 0;   // SUM(slice.size for slice in slices_).
   char preamble_[8];  // Deliberately not initialized.
   std::unique_ptr<DecodedTracePacket> decoded_packet_;
+
+  // Remember to update the move operators and their unittest if adding new
+  // fields. ConsumerIPCClientImpl::OnReadBuffersResponse() relies on
+  // std::move(TracePacket) to clear up the moved-from instance.
 };
 
 }  // namespace perfetto
diff --git a/protos/perfetto/ipc/consumer_port.proto b/protos/perfetto/ipc/consumer_port.proto
index abad343..d9b1db5 100644
--- a/protos/perfetto/ipc/consumer_port.proto
+++ b/protos/perfetto/ipc/consumer_port.proto
@@ -77,22 +77,21 @@
 message ReadBuffersResponse {
   // TODO: uint32 buffer_id = 1;
 
-  // Each streaming reply returns one or more trace packets (see
-  // trace_packet.proto).
-  // Why "bytes" below? If we just return the full TracePacket object, that will
-  // force the Consumer to deserialize it. In many occasions, the Consumer will
-  // not consume the TracePacket(s) locally but will just forward them over
-  // the network or save them to a file. Deserializing them on-device would be
-  // a waste of time, memory and energy.
+  // Each streaming reply returns one or more slices for one or more trace
+  // packets, or even just a portion of it (if it's too big to fit within one
+  // IPC). The returned slices are ordered and contiguous: packets' slices are
+  // not interleaved and slices are sent only once all slices for a packet are
+  // available (i.e. the consumer will never see any gap).
+  message Slice {
+    optional bytes data = 1;
 
-  // TODO: in the past we agreed that a TracePacket can be very large (MBs).
-  // However here it will hit the limit of the IPC layer in order to keep
-  // the socket buffer bounded. On one side we could upgrade this protocol to
-  // support chunks, so we could directly propagate the chunked TracePacket
-  // stored in the log buffer. On the other side, this will likely just move
-  // the problem on the consumer, that will need larger buffers for reassembly.
-  // Perhaps we should just cap the size of a TracePacket to a lower size?
-  repeated bytes trace_packets = 2;
+    // When true, this is the last slice for the packet. A ReadBuffersResponse
+    // might have no slices marked as |last_slice_for_packet|==true, in the case
+    // of a very large packet that gets chunked into several IPCs (in which case
+    // only the last IPC for the packet will have this flag set).
+    optional bool last_slice_for_packet = 2;
+  }
+  repeated Slice slices = 2;
 }
 
 // Arguments for rpc FreeBuffers().
diff --git a/src/ipc/buffered_frame_deserializer.cc b/src/ipc/buffered_frame_deserializer.cc
index 9665398..472b302 100644
--- a/src/ipc/buffered_frame_deserializer.cc
+++ b/src/ipc/buffered_frame_deserializer.cc
@@ -184,6 +184,8 @@
   frame.AppendToString(&buf);
   const uint32_t payload_size = static_cast<uint32_t>(buf.size() - kHeaderSize);
   PERFETTO_DCHECK(payload_size == static_cast<uint32_t>(frame.GetCachedSize()));
+  // Don't send messages larger than what the receiver can handle.
+  PERFETTO_DCHECK(kHeaderSize + payload_size <= kIPCBufferSize);
   char header[kHeaderSize];
   memcpy(header, base::AssumeLittleEndian(&payload_size), kHeaderSize);
   buf.replace(0, kHeaderSize, header, kHeaderSize);
diff --git a/src/ipc/buffered_frame_deserializer.h b/src/ipc/buffered_frame_deserializer.h
index 40e1a64..52a5f5f 100644
--- a/src/ipc/buffered_frame_deserializer.h
+++ b/src/ipc/buffered_frame_deserializer.h
@@ -25,6 +25,7 @@
 #include <sys/mman.h>
 
 #include "perfetto/base/page_allocator.h"
+#include "perfetto/ipc/basic_types.h"
 
 namespace perfetto {
 namespace ipc {
@@ -81,7 +82,8 @@
     size_t size;
   };
 
-  explicit BufferedFrameDeserializer(size_t max_capacity = 128 * 1024);
+  // |max_capacity| is overridable only for tests.
+  explicit BufferedFrameDeserializer(size_t max_capacity = kIPCBufferSize);
   ~BufferedFrameDeserializer();
 
   // This function doesn't really belong here as it does Serialization, unlike
diff --git a/src/ipc/unix_socket.cc b/src/ipc/unix_socket.cc
index 463682b..fa5fc2e 100644
--- a/src/ipc/unix_socket.cc
+++ b/src/ipc/unix_socket.cc
@@ -351,15 +351,16 @@
   if (blocking_mode == BlockingMode::kBlocking)
     SetBlockingIO(false);
 
-  if (sz >= 0) {
-    // There should be no way a non-blocking socket returns < |len|.
-    // If the queueing fails, sendmsg() must return -1 + errno = EWOULDBLOCK.
-    PERFETTO_CHECK(static_cast<size_t>(sz) == len);
+  if (sz == static_cast<ssize_t>(len)) {
     last_error_ = 0;
     return true;
   }
 
-  if (errno == EAGAIN || errno == EWOULDBLOCK) {
+  // If sendmsg() succeeds but the returned size is < |len| it means that the
+  // endpoint disconnected in the middle of the read, and we managed to send
+  // only a portion of the buffer. In this case we should just give up.
+
+  if (sz < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
     // A genuine out-of-buffer. The client should retry or give up.
     // Man pages specify that EAGAIN and EWOULDBLOCK have the same semantic here
     // and clients should check for both.
diff --git a/src/ipc/unix_socket_unittest.cc b/src/ipc/unix_socket_unittest.cc
index 0c07094..056353b 100644
--- a/src/ipc/unix_socket_unittest.cc
+++ b/src/ipc/unix_socket_unittest.cc
@@ -478,6 +478,55 @@
   tx_thread.join();
 }
 
+// Regression test for b/76155349 . If the receiver end disconnects while the
+// sender is in the middle of a large send(), the socket should gracefully give
+// up (i.e. Shutdown()) but not crash.
+TEST_F(UnixSocketTest, ReceiverDisconnectsDuringSend) {
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  ASSERT_TRUE(srv->is_listening());
+  const int kTimeoutMs = 30000;
+
+  auto receive_done = task_runner_.CreateCheckpoint("receive_done");
+  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+      .WillOnce(Invoke([this, receive_done](UnixSocket*, UnixSocket* srv_conn) {
+        EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn))
+            .WillOnce(Invoke([receive_done](UnixSocket* s) {
+              char buf[1024];
+              size_t res = s->Receive(buf, sizeof(buf));
+              ASSERT_EQ(1024u, res);
+              s->Shutdown(false /*notify*/);
+              receive_done();
+            }));
+      }));
+
+  // Perform the blocking send from another thread.
+  std::thread tx_thread([] {
+    base::TestTaskRunner tx_task_runner;
+    MockEventListener tx_events;
+    auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner);
+
+    auto cli_connected = tx_task_runner.CreateCheckpoint("cli_connected");
+    EXPECT_CALL(tx_events, OnConnect(cli.get(), true))
+        .WillOnce(InvokeWithoutArgs(cli_connected));
+    tx_task_runner.RunUntilCheckpoint("cli_connected");
+
+    auto send_done = tx_task_runner.CreateCheckpoint("send_done");
+    // Deliberately large; presumably so that Send() can't finish in one go.
+    static constexpr size_t kBufSize = 32 * 1024 * 1024;
+    std::unique_ptr<char[]> buf(new char[kBufSize]);
+    tx_task_runner.PostTask([&cli, &buf, send_done] {
+      bool send_res = cli->Send(buf.get(), kBufSize, -1 /*fd*/,
+                                UnixSocket::BlockingMode::kBlocking);
+      ASSERT_FALSE(send_res);
+      send_done();
+    });
+
+    tx_task_runner.RunUntilCheckpoint("send_done", kTimeoutMs);
+  });
+  task_runner_.RunUntilCheckpoint("receive_done", kTimeoutMs);
+  tx_thread.join();
+}
+
 // TODO(primiano): add a test to check that in the case of a peer sending a fd
 // and the other end just doing a recv (without taking it), the fd is closed and
 // not left around.
diff --git a/src/tracing/core/service_impl.cc b/src/tracing/core/service_impl.cc
index 552898e..cb22c4f 100644
--- a/src/tracing/core/service_impl.cc
+++ b/src/tracing/core/service_impl.cc
@@ -53,7 +53,6 @@
 namespace {
 constexpr size_t kDefaultShmSize = 256 * 1024ul;
 constexpr size_t kMaxShmSize = 4096 * 1024 * 512ul;
-constexpr size_t kMaxShmPageSizeKb = 16ul;
 constexpr size_t kDefaultShmPageSizeKb = base::kPageSize / 1024ul;
 constexpr int kMaxBuffersPerConsumer = 128;
 constexpr base::TimeMillis kClockSnapshotInterval(10 * 1000);
@@ -730,7 +729,7 @@
     producer->shared_buffer_page_size_kb_ = std::min<size_t>(
         (producer_config.page_size_kb() == 0) ? kDefaultShmPageSizeKb
                                               : producer_config.page_size_kb(),
-        kMaxShmPageSizeKb);
+        SharedMemoryABI::kMaxPageSize / 1024);  // Bytes -> KB.
 
     size_t shm_size =
         std::min<size_t>(producer_config.shm_size_kb() * 1024, kMaxShmSize);
diff --git a/src/tracing/core/shared_memory_abi.cc b/src/tracing/core/shared_memory_abi.cc
index 70021a8..da49199 100644
--- a/src/tracing/core/shared_memory_abi.cc
+++ b/src/tracing/core/shared_memory_abi.cc
@@ -51,6 +51,7 @@
 constexpr size_t SharedMemoryABI::kNumChunksForLayout[];
 constexpr const char* SharedMemoryABI::kChunkStateStr[];
 constexpr const size_t SharedMemoryABI::kInvalidPageIdx;
+constexpr const size_t SharedMemoryABI::kMaxPageSize;
 
 SharedMemoryABI::SharedMemoryABI() = default;
 
@@ -109,9 +110,10 @@
   chunk_header.writer_id = -1;
   PERFETTO_CHECK(kMaxWriterID <= chunk_header.writer_id);
 
-  PERFETTO_CHECK(page_size >= 4096);
-  PERFETTO_CHECK(page_size % 4096 == 0);
-  PERFETTO_CHECK(reinterpret_cast<uintptr_t>(start) % 4096 == 0);
+  PERFETTO_CHECK(page_size >= base::kPageSize);
+  PERFETTO_CHECK(page_size <= kMaxPageSize);
+  PERFETTO_CHECK(page_size % base::kPageSize == 0);
+  PERFETTO_CHECK(reinterpret_cast<uintptr_t>(start) % base::kPageSize == 0);
   PERFETTO_CHECK(size % page_size == 0);
 }
 
diff --git a/src/tracing/core/trace_packet.cc b/src/tracing/core/trace_packet.cc
index 9ebcc47..5e72695 100644
--- a/src/tracing/core/trace_packet.cc
+++ b/src/tracing/core/trace_packet.cc
@@ -28,8 +28,20 @@
 TracePacket::TracePacket() = default;
 TracePacket::~TracePacket() = default;
 
-TracePacket::TracePacket(TracePacket&&) noexcept = default;
-TracePacket& TracePacket::operator=(TracePacket&&) = default;
+TracePacket::TracePacket(TracePacket&& other) noexcept {
+  *this = std::move(other);
+}
+
+TracePacket& TracePacket::operator=(TracePacket&& other) {
+  slices_ = std::move(other.slices_);
+  other.slices_.clear();
+
+  size_ = other.size_;
+  other.size_ = 0;
+
+  decoded_packet_ = std::move(other.decoded_packet_);
+  return *this;
+}
 
 bool TracePacket::Decode() {
   if (decoded_packet_)
diff --git a/src/tracing/core/trace_packet_unittest.cc b/src/tracing/core/trace_packet_unittest.cc
index ef5c41b..9e3e4ed 100644
--- a/src/tracing/core/trace_packet_unittest.cc
+++ b/src/tracing/core/trace_packet_unittest.cc
@@ -43,16 +43,6 @@
   ASSERT_NE(nullptr, tp.operator->());
   ASSERT_EQ(proto.for_testing().str(), tp->for_testing().str());
   ASSERT_EQ(proto.for_testing().str(), (*tp).for_testing().str());
-
-  // Check move operators.
-  TracePacket moved_tp(std::move(tp));
-  ASSERT_NE(nullptr, moved_tp.operator->());
-  ASSERT_EQ(proto.for_testing().str(), moved_tp->for_testing().str());
-
-  TracePacket moved_tp_2;
-  moved_tp_2 = std::move(moved_tp);
-  ASSERT_NE(nullptr, moved_tp_2.operator->());
-  ASSERT_EQ(proto.for_testing().str(), moved_tp_2->for_testing().str());
 }
 
 TEST(TracePacketTest, Sliced) {
@@ -131,5 +121,29 @@
   ASSERT_EQ(payload, trace.packet(0).for_testing().str());
 }
 
+TEST(TracePacketTest, MoveOperators) {
+  char buf1[5]{};
+  char buf2[7]{};
+
+  TracePacket tp;
+  tp.AddSlice(buf1, sizeof(buf1));
+  tp.AddSlice(buf2, sizeof(buf2));
+  tp.AddSlice(Slice::Allocate(11));
+  tp.AddSlice(Slice(std::unique_ptr<std::string>(new std::string("foobar"))));
+
+  TracePacket moved_tp(std::move(tp));
+  ASSERT_EQ(0u, tp.size());
+  ASSERT_TRUE(tp.slices().empty());
+  ASSERT_EQ(4u, moved_tp.slices().size());
+  ASSERT_EQ(5u + 7u + 11u + 6u, moved_tp.size());
+
+  TracePacket moved_tp_2;
+  moved_tp_2 = std::move(moved_tp);
+  ASSERT_EQ(0u, moved_tp.size());
+  ASSERT_TRUE(moved_tp.slices().empty());
+  ASSERT_EQ(4u, moved_tp_2.slices().size());
+  ASSERT_EQ(5u + 7u + 11u + 6u, moved_tp_2.size());
+}
+
 }  // namespace
 }  // namespace perfetto
diff --git a/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc b/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
index a0db4e5..3b90301 100644
--- a/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
+++ b/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
@@ -23,7 +23,6 @@
 #include "perfetto/ipc/client.h"
 #include "perfetto/tracing/core/consumer.h"
 #include "perfetto/tracing/core/trace_config.h"
-#include "perfetto/tracing/core/trace_packet.h"
 
 // TODO(fmayer): Add a test to check to what happens when ConsumerIPCClientImpl
 // gets destroyed w.r.t. the Consumer pointer. Also think to lifetime of the
@@ -132,16 +131,15 @@
     PERFETTO_DLOG("ReadBuffers() failed");
     return;
   }
-  // TODO(primiano): We have to guarantee that the log buffer stays alive at
-  // least as long as these requests are on flights.
   std::vector<TracePacket> trace_packets;
-  trace_packets.reserve(response->trace_packets().size());
-  for (const std::string& bytes : response->trace_packets()) {
-    trace_packets.emplace_back();
-    trace_packets.back().AddSlice(
-        Slice(reinterpret_cast<const void*>(bytes.data()), bytes.size()));
+  for (auto& resp_slice : *response->mutable_slices()) {
+    partial_packet_.AddSlice(
+        Slice(std::unique_ptr<std::string>(resp_slice.release_data())));
+    if (resp_slice.last_slice_for_packet())
+      trace_packets.emplace_back(std::move(partial_packet_));
   }
-  consumer_->OnTraceData(std::move(trace_packets), response.has_more());
+  if (!trace_packets.empty() || !response.has_more())
+    consumer_->OnTraceData(std::move(trace_packets), response.has_more());
 }
 
 void ConsumerIPCClientImpl::FreeBuffers() {
diff --git a/src/tracing/ipc/consumer/consumer_ipc_client_impl.h b/src/tracing/ipc/consumer/consumer_ipc_client_impl.h
index 57038ce..d94b658 100644
--- a/src/tracing/ipc/consumer/consumer_ipc_client_impl.h
+++ b/src/tracing/ipc/consumer/consumer_ipc_client_impl.h
@@ -26,6 +26,7 @@
 #include "perfetto/ipc/service_proxy.h"
 #include "perfetto/tracing/core/basic_types.h"
 #include "perfetto/tracing/core/service.h"
+#include "perfetto/tracing/core/trace_packet.h"
 #include "perfetto/tracing/ipc/consumer_ipc_client.h"
 
 #include "perfetto/ipc/consumer_port.ipc.h"
@@ -84,6 +85,13 @@
 
   bool connected_ = false;
 
+  // When a packet is too big to fit into a ReadBuffersResponse IPC, the service
+  // will chunk it into several IPCs, each containing few slices of the packet
+  // (a packet's slice is always guaranteed to be << kIPCBufferSize). When
+  // chunking happens this field accumulates the slices received until the
+  // one with |last_slice_for_packet| == true is received.
+  TracePacket partial_packet_;
+
   base::WeakPtrFactory<ConsumerIPCClientImpl> weak_ptr_factory_;
 };
 
diff --git a/src/tracing/ipc/service/consumer_ipc_service.cc b/src/tracing/ipc/service/consumer_ipc_service.cc
index e84d83b..24643d0 100644
--- a/src/tracing/ipc/service/consumer_ipc_service.cc
+++ b/src/tracing/ipc/service/consumer_ipc_service.cc
@@ -21,8 +21,10 @@
 #include "perfetto/base/logging.h"
 #include "perfetto/base/scoped_file.h"
 #include "perfetto/base/task_runner.h"
+#include "perfetto/ipc/basic_types.h"
 #include "perfetto/ipc/host.h"
 #include "perfetto/tracing/core/service.h"
+#include "perfetto/tracing/core/shared_memory_abi.h"
 #include "perfetto/tracing/core/slice.h"
 #include "perfetto/tracing/core/trace_config.h"
 #include "perfetto/tracing/core/trace_packet.h"
@@ -120,16 +122,49 @@
     return;
 
   auto result = ipc::AsyncResult<protos::ReadBuffersResponse>::Create();
-  result.set_has_more(has_more);
-  // TODO(primiano): Expose the slices to the Consumer rather than stitching
-  // them and wasting cpu time to hide this detail.
+
+  // A TracePacket might be too big to fit into a single IPC message (max
+  // kIPCBufferSize). However a TracePacket is made of slices and each slice
+  // is way smaller than kIPCBufferSize (a slice size is effectively bounded by
+  // the max chunk size of the SharedMemoryABI). When sending a TracePacket,
+  // if its slices don't fit within one IPC, chunk them over several contiguous
+  // IPCs using the |last_slice_for_packet| for glueing on the other side.
+  static_assert(ipc::kIPCBufferSize >= SharedMemoryABI::kMaxPageSize * 2,
+                "kIPCBufferSize too small given the max possible slice size");
+
+  auto send_ipc_reply = [this, &result](bool more) {
+    result.set_has_more(more);
+    read_buffers_response.Resolve(std::move(result));
+    result = ipc::AsyncResult<protos::ReadBuffersResponse>::Create();
+  };
+
+  size_t approx_reply_size = 0;
   for (const TracePacket& trace_packet : trace_packets) {
-    std::string* dst = result->add_trace_packets();
-    dst->reserve(trace_packet.size());
-    for (const Slice& slice : trace_packet.slices())
-      dst->append(reinterpret_cast<const char*>(slice.start), slice.size);
+    size_t num_slices_left_for_packet = trace_packet.slices().size();
+    for (const Slice& slice : trace_packet.slices()) {
+      // Check if this slice would cause the IPC to overflow its max size and,
+      // if that is the case, split the IPCs. The "16" and "64" below are
+      // over-estimations of, respectively:
+      // 16: the preamble that prefixes each slice (there are 2 x size fields
+      //     in the proto + the |last_slice_for_packet| bool).
+      // 64: the overhead of the IPC InvokeMethodReply + wire_protocol's frame.
+      // If these estimations are wrong, BufferedFrameDeserializer::Serialize()
+      // will hit a DCHECK anyways.
+      const size_t approx_slice_size = slice.size + 16;
+      if (approx_reply_size + approx_slice_size > ipc::kIPCBufferSize - 64) {
+        // If we hit this CHECK we got a single slice that is > kIPCBufferSize.
+        PERFETTO_CHECK(result->slices_size() > 0);
+        send_ipc_reply(/*has_more=*/true);
+        approx_reply_size = 0;
+      }
+      approx_reply_size += approx_slice_size;
+
+      auto* res_slice = result->add_slices();
+      res_slice->set_last_slice_for_packet(--num_slices_left_for_packet == 0);
+      res_slice->set_data(slice.start, slice.size);
+    }
   }
-  read_buffers_response.Resolve(std::move(result));
+  send_ipc_reply(has_more);
 }
 
 }  // namespace perfetto
diff --git a/test/end_to_end_integrationtest.cc b/test/end_to_end_integrationtest.cc
index daff489..acaeb12 100644
--- a/test/end_to_end_integrationtest.cc
+++ b/test/end_to_end_integrationtest.cc
@@ -174,12 +174,12 @@
   ds_config->mutable_for_testing()->set_message_size(kMessageSizeBytes);
 
   // Create the random generator with the same seed.
-  std::minstd_rand0 random(kRandomSeed);
+  std::minstd_rand0 rnd_engine(kRandomSeed);
 
   // Create the function to handle packets as they come in.
   uint64_t total = 0;
   auto on_readback_complete = task_runner.CreateCheckpoint("readback.complete");
-  auto on_consumer_data = [&total, &on_readback_complete, &random](
+  auto on_consumer_data = [&total, &on_readback_complete, &rnd_engine](
                               std::vector<TracePacket> packets, bool has_more) {
     for (auto& packet : packets) {
       ASSERT_TRUE(packet.Decode());
@@ -188,7 +188,7 @@
       ASSERT_TRUE(packet->has_for_testing());
       ASSERT_EQ(protos::TracePacket::kTrustedUid,
                 packet->optional_trusted_uid_case());
-      ASSERT_EQ(packet->for_testing().seq_value(), random());
+      ASSERT_EQ(packet->for_testing().seq_value(), rnd_engine());
     }
     total += packets.size();
 
@@ -226,4 +226,87 @@
   consumer.Disconnect();
 }
 
+TEST(PerfettoTest, VeryLargePackets) {
+  base::TestTaskRunner task_runner;
+
+#if PERFETTO_BUILDFLAG(PERFETTO_START_DAEMONS)
+  TaskRunnerThread service_thread("perfetto.svc");
+  service_thread.Start(std::unique_ptr<ServiceDelegate>(
+      new ServiceDelegate(TEST_PRODUCER_SOCK_NAME, TEST_CONSUMER_SOCK_NAME)));
+#endif
+
+  auto on_producer_enabled = task_runner.CreateCheckpoint("producer.enabled");
+  auto posted_on_producer_enabled = [&task_runner, &on_producer_enabled] {
+    task_runner.PostTask(on_producer_enabled);
+  };
+  TaskRunnerThread producer_thread("perfetto.prd");
+  std::unique_ptr<FakeProducerDelegate> producer_delegate(
+      new FakeProducerDelegate(TEST_PRODUCER_SOCK_NAME,
+                               posted_on_producer_enabled));
+  FakeProducerDelegate* producer_delegate_cached = producer_delegate.get();
+  producer_thread.Start(std::move(producer_delegate));
+
+  // Setup the TraceConfig for the consumer.
+  TraceConfig trace_config;
+  trace_config.add_buffers()->set_size_kb(4096 * 10);
+
+  static constexpr int kNumPackets = 5;
+  static constexpr uint32_t kRandomSeed = 42;
+  static constexpr uint32_t kMsgSize = 1024 * 1024 - 42;
+  std::minstd_rand0 rnd_engine(kRandomSeed);
+
+  auto* ds_config = trace_config.add_data_sources()->mutable_config();
+  ds_config->set_name("android.perfetto.FakeProducer");
+  ds_config->set_target_buffer(0);
+  ds_config->mutable_for_testing()->set_seed(kRandomSeed);
+  ds_config->mutable_for_testing()->set_message_count(kNumPackets);
+  ds_config->mutable_for_testing()->set_message_size(kMsgSize);
+
+  auto on_readback_complete = task_runner.CreateCheckpoint("readback.complete");
+  int packets_seen = 0;
+  auto on_consumer_data = [&on_readback_complete, &rnd_engine, &packets_seen](
+                              std::vector<TracePacket> packets, bool has_more) {
+    for (auto& packet : packets) {
+      ASSERT_TRUE(packet.Decode());
+      if (!packet->has_for_testing())
+        continue;
+      packets_seen++;
+      ASSERT_EQ(packet->for_testing().seq_value(), rnd_engine());
+      size_t msg_size = packet->for_testing().str().size();
+      ASSERT_EQ(kMsgSize, msg_size);
+      for (size_t i = 0; i < msg_size; i++)
+        ASSERT_EQ(i < msg_size - 1 ? '.' : 0, packet->for_testing().str()[i]);
+    }
+
+    if (!has_more)
+      on_readback_complete();
+  };
+
+  auto on_connect = task_runner.CreateCheckpoint("consumer.connected");
+  FakeConsumer consumer(trace_config, std::move(on_connect),
+                        std::move(on_consumer_data), &task_runner);
+
+  consumer.Connect(TEST_CONSUMER_SOCK_NAME);
+  task_runner.RunUntilCheckpoint("consumer.connected");
+
+  consumer.EnableTracing();
+  task_runner.RunUntilCheckpoint("producer.enabled");
+
+  auto on_produced_and_committed =
+      task_runner.CreateCheckpoint("produced.and.committed");
+  auto posted_on_produced_and_committed = [&task_runner,
+                                           &on_produced_and_committed] {
+    task_runner.PostTask(on_produced_and_committed);
+  };
+  FakeProducer* producer = producer_delegate_cached->producer();
+  producer->ProduceEventBatch(posted_on_produced_and_committed);
+  task_runner.RunUntilCheckpoint("produced.and.committed");
+
+  consumer.ReadTraceData();
+  task_runner.RunUntilCheckpoint("readback.complete");
+  ASSERT_EQ(kNumPackets, packets_seen);
+
+  consumer.Disconnect();
+}
+
 }  // namespace perfetto
diff --git a/test/fake_producer.cc b/test/fake_producer.cc
index 9dadf52..5b2c0a0 100644
--- a/test/fake_producer.cc
+++ b/test/fake_producer.cc
@@ -40,7 +40,7 @@
   PERFETTO_DCHECK_THREAD(thread_checker_);
   task_runner_ = task_runner;
   endpoint_ = ProducerIPCClient::Connect(
-      socket_name, this, "com.google.perfetto.fake_producer", task_runner);
+      socket_name, this, "android.perfetto.FakeProducer", task_runner);
   on_create_data_source_instance_ = std::move(on_create_data_source_instance);
 }
 
@@ -78,18 +78,15 @@
 void FakeProducer::ProduceEventBatch(std::function<void()> callback) {
   task_runner_->PostTask([this, callback] {
     PERFETTO_CHECK(trace_writer_);
-
-    size_t payload_size = message_size_ - sizeof(uint32_t);
-    PERFETTO_CHECK(payload_size >= sizeof(char));
-
+    PERFETTO_CHECK(message_size_ > 1);
     std::unique_ptr<char, base::FreeDeleter> payload(
-        static_cast<char*>(malloc(payload_size)));
-    memset(payload.get(), '.', payload_size);
-    payload.get()[payload_size - 1] = 0;
+        static_cast<char*>(malloc(message_size_)));
+    memset(payload.get(), '.', message_size_);
+    payload.get()[message_size_ - 1] = 0;
     for (size_t i = 0; i < message_count_; i++) {
       auto handle = trace_writer_->NewTracePacket();
       handle->set_for_testing()->set_seq_value(rnd_engine_());
-      handle->set_for_testing()->set_str(payload.get(), payload_size);
+      handle->set_for_testing()->set_str(payload.get(), message_size_);
     }
     trace_writer_->Flush(callback);
   });