Introduce TraceWriter and SharedMemoryArbiter for Producer(s)

These classes allow a Producer to get writer instances to allow
writing from several threads sharing the same per-Producer
SharedMemoryBuffer.

Test: perfetto_tests --gtest_filter=*SharedMemoryArbiter*
Test: perfetto_tests --gtest_filter=*TraceWriterImpl*
Bug: 70284518
Bug: 68854243
Change-Id: I56c4c699d9722bd34a8c5b32f514c6faa9cb95c5
diff --git a/Android.bp b/Android.bp
index 1379d31..c5411f1 100644
--- a/Android.bp
+++ b/Android.bp
@@ -169,6 +169,24 @@
   ],
 }
 
+// GN target: //protos:zero_gen
+genrule {
+  name: "perfetto_protos_zero_gen",
+  srcs: [
+    "protos/test_event.proto",
+    "protos/trace_packet.proto",
+  ],
+  tools: [
+    "aprotoc",
+    "perfetto_src_protozero_protoc_plugin_protoc_plugin___gn_standalone_toolchain_gcc_like_host_",
+  ],
+  cmd: "mkdir -p $(genDir)/external/perfetto && $(location aprotoc) --cpp_out=$(genDir)/external/perfetto --proto_path=external/perfetto --plugin=protoc-gen-plugin=$(location perfetto_src_protozero_protoc_plugin_protoc_plugin___gn_standalone_toolchain_gcc_like_host_) --plugin_out=wrapper_namespace=pbzero:$(genDir)/external/perfetto $(in)",
+  out: [
+    "external/perfetto/protos/test_event.pbzero.cc",
+    "external/perfetto/protos/trace_packet.pbzero.cc",
+  ],
+}
+
 // GN target: //src/ipc:test_messages_gen
 genrule {
   name: "perfetto_src_ipc_test_messages_gen",
@@ -196,6 +214,7 @@
 genrule {
   name: "perfetto_protos_lite_gen_headers",
   srcs: [
+    "protos/test_event.proto",
     "protos/trace_packet.proto",
   ],
   tools: [
@@ -203,6 +222,7 @@
   ],
   cmd: "mkdir -p $(genDir)/external/perfetto && $(location aprotoc) --cpp_out=$(genDir)/external/perfetto --proto_path=external/perfetto $(in)",
   out: [
+    "external/perfetto/protos/test_event.pb.h",
     "external/perfetto/protos/trace_packet.pb.h",
   ],
   export_include_dirs: [
@@ -225,6 +245,44 @@
   ],
 }
 
+// GN target: //protos:zero_gen
+genrule {
+  name: "perfetto_protos_zero_gen_headers",
+  srcs: [
+    "protos/test_event.proto",
+    "protos/trace_packet.proto",
+  ],
+  tools: [
+    "aprotoc",
+    "perfetto_src_protozero_protoc_plugin_protoc_plugin___gn_standalone_toolchain_gcc_like_host_",
+  ],
+  cmd: "mkdir -p $(genDir)/external/perfetto && $(location aprotoc) --cpp_out=$(genDir)/external/perfetto --proto_path=external/perfetto --plugin=protoc-gen-plugin=$(location perfetto_src_protozero_protoc_plugin_protoc_plugin___gn_standalone_toolchain_gcc_like_host_) --plugin_out=wrapper_namespace=pbzero:$(genDir)/external/perfetto $(in)",
+  out: [
+    "external/perfetto/protos/test_event.pbzero.h",
+    "external/perfetto/protos/trace_packet.pbzero.h",
+  ],
+  export_include_dirs: [
+    ".",
+  ],
+}
+
+// GN target: //protos:lite_gen
+genrule {
+  name: "perfetto_protos_lite_gen",
+  srcs: [
+    "protos/test_event.proto",
+    "protos/trace_packet.proto",
+  ],
+  tools: [
+    "aprotoc",
+  ],
+  cmd: "mkdir -p $(genDir)/external/perfetto && $(location aprotoc) --cpp_out=$(genDir)/external/perfetto --proto_path=external/perfetto $(in)",
+  out: [
+    "external/perfetto/protos/test_event.pb.cc",
+    "external/perfetto/protos/trace_packet.pb.cc",
+  ],
+}
+
 // GN target: //src/ftrace_reader:ftrace_reader_test_messages_zero_gen
 genrule {
   name: "perfetto_src_ftrace_reader_ftrace_reader_test_messages_zero_gen_headers",
@@ -244,21 +302,6 @@
   ],
 }
 
-// GN target: //protos:lite_gen
-genrule {
-  name: "perfetto_protos_lite_gen",
-  srcs: [
-    "protos/trace_packet.proto",
-  ],
-  tools: [
-    "aprotoc",
-  ],
-  cmd: "mkdir -p $(genDir)/external/perfetto && $(location aprotoc) --cpp_out=$(genDir)/external/perfetto --proto_path=external/perfetto $(in)",
-  out: [
-    "external/perfetto/protos/trace_packet.pb.cc",
-  ],
-}
-
 // GN target: //protos/ftrace:zero_gen
 genrule {
   name: "perfetto_protos_ftrace_zero_gen",
@@ -328,6 +371,7 @@
     ":perfetto_protos_lite_gen",
     ":perfetto_protos_tracing_service_lite_gen",
     ":perfetto_protos_tracing_service_tracing_service_gen",
+    ":perfetto_protos_zero_gen",
     ":perfetto_src_ftrace_reader_ftrace_reader_test_messages_lite_gen",
     ":perfetto_src_ftrace_reader_ftrace_reader_test_messages_zero_gen",
     ":perfetto_src_ipc_test_messages_gen",
@@ -388,15 +432,20 @@
     "src/tracing/core/service_impl_unittest.cc",
     "src/tracing/core/shared_memory_abi.cc",
     "src/tracing/core/shared_memory_abi_unittest.cc",
+    "src/tracing/core/shared_memory_arbiter.cc",
+    "src/tracing/core/shared_memory_arbiter_unittest.cc",
     "src/tracing/core/trace_config.cc",
     "src/tracing/core/trace_packet.cc",
     "src/tracing/core/trace_packet_unittest.cc",
+    "src/tracing/core/trace_writer_impl.cc",
+    "src/tracing/core/trace_writer_impl_unittest.cc",
     "src/tracing/ipc/consumer/consumer_ipc_client_impl.cc",
     "src/tracing/ipc/posix_shared_memory.cc",
     "src/tracing/ipc/posix_shared_memory_unittest.cc",
     "src/tracing/ipc/producer/producer_ipc_client_impl.cc",
     "src/tracing/ipc/service/producer_ipc_service.cc",
     "src/tracing/ipc/service/service_ipc_host_impl.cc",
+    "src/tracing/test/aligned_buffer_test.cc",
     "src/tracing/test/test_shared_memory.cc",
     "tools/sanitizers_unittests/sanitizers_unittest.cc",
   ],
@@ -414,6 +463,7 @@
     "perfetto_protos_lite_gen_headers",
     "perfetto_protos_tracing_service_lite_gen_headers",
     "perfetto_protos_tracing_service_tracing_service_gen_headers",
+    "perfetto_protos_zero_gen_headers",
     "perfetto_src_ftrace_reader_ftrace_reader_test_messages_lite_gen_headers",
     "perfetto_src_ftrace_reader_ftrace_reader_test_messages_zero_gen_headers",
     "perfetto_src_ipc_test_messages_gen_headers",
diff --git a/include/perfetto/tracing/core/BUILD.gn b/include/perfetto/tracing/core/BUILD.gn
index 8d83c55..1f895ac 100644
--- a/include/perfetto/tracing/core/BUILD.gn
+++ b/include/perfetto/tracing/core/BUILD.gn
@@ -28,5 +28,6 @@
     "shared_memory_abi.h",
     "trace_config.h",
     "trace_packet.h",
+    "trace_writer.h",
   ]
 }
diff --git a/include/perfetto/tracing/core/trace_writer.h b/include/perfetto/tracing/core/trace_writer.h
new file mode 100644
index 0000000..8b64b3e
--- /dev/null
+++ b/include/perfetto/tracing/core/trace_writer.h
@@ -0,0 +1,67 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INCLUDE_PERFETTO_TRACING_CORE_TRACE_WRITER_H_
+#define INCLUDE_PERFETTO_TRACING_CORE_TRACE_WRITER_H_
+
+#include "perfetto/protozero/protozero_message_handle.h"
+
+namespace perfetto {
+
+namespace protos {
+namespace pbzero {
+class TracePacket;
+}  // namespace pbzero
+}  // namespace protos
+
+// This is a single-thread write interface that allows writing protobufs
+// directly into the tracing shared buffer without making any copies.
+// It takes care of acquiring and releasing chunks from the
+// SharedMemoryArbiter and splitting protos over chunks.
+// The idea is that each data source creates one (or more) TraceWriter for each
+// thread it wants to write from. Each TraceWriter will get its own dedicated
+// chunk and will write into the shared buffer without any locking most of the
+// time. Locking will happen only when a chunk is exhausted and a new one is
+// acquired from the arbiter.
+
+// TODO: TraceWriter needs to keep the shared memory buffer alive (refcount?).
+// Otherwise if the shared memory buffer goes away (e.g. the Service crashes)
+// the TraceWriter will keep writing into unmapped memory.
+
+class TraceWriter {
+ public:
+  using TracePacketHandle =
+      protozero::ProtoZeroMessageHandle<protos::pbzero::TracePacket>;
+
+  TraceWriter();
+  virtual ~TraceWriter();
+
+  // Returns a handle to the root proto message for the trace. The message will
+  // be finalized either by calling directly handle.Finalize() or by letting the
+  // handle go out of scope. The returned handle can be std::move()'d but cannot
+  // be used after either: (i) the TraceWriter instance is destroyed, (ii) a
+  // subsequent NewTracePacket() call is made on the same TraceWriter instance.
+  virtual TracePacketHandle NewTracePacket() = 0;
+  virtual WriterID writer_id() const = 0;
+
+ private:
+  TraceWriter(const TraceWriter&) = delete;
+  TraceWriter& operator=(const TraceWriter&) = delete;
+};
+
+}  // namespace perfetto
+
+#endif  // INCLUDE_PERFETTO_TRACING_CORE_TRACE_WRITER_H_
diff --git a/protos/BUILD.gn b/protos/BUILD.gn
index 077d57d..3ba9e45 100644
--- a/protos/BUILD.gn
+++ b/protos/BUILD.gn
@@ -16,7 +16,10 @@
 import("../gn/perfetto.gni")
 import("../gn/proto_library.gni")
 
-proto_sources = [ "trace_packet.proto" ]
+proto_sources = [
+  "test_event.proto",
+  "trace_packet.proto",
+]
 
 # Protozero generated stubs, for writers.
 protozero_library("zero") {
diff --git a/protos/test_event.proto b/protos/test_event.proto
new file mode 100644
index 0000000..bac6322
--- /dev/null
+++ b/protos/test_event.proto
@@ -0,0 +1,22 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto2";
+option optimize_for = LITE_RUNTIME;
+
+package perfetto.protos;
+
+message TestEvent {}
diff --git a/protos/trace_packet.proto b/protos/trace_packet.proto
index 5db2b1b..e59e991 100644
--- a/protos/trace_packet.proto
+++ b/protos/trace_packet.proto
@@ -18,12 +18,16 @@
 option optimize_for = LITE_RUNTIME;
 
 import "protos/ftrace/ftrace_event_bundle.proto";
+import "protos/test_event.proto";
 
 package perfetto.protos;
 
 // The root object emitted by Perfetto. A perfetto trace is just a stream of
 // TracePacket(s).
 message TracePacket {
-  oneof data { FtraceEventBundle ftrace_events = 1; }
+  oneof data {
+    FtraceEventBundle ftrace_events = 1;
+    TestEvent test_event = 536870911;  // 2^29 - 1, max field id for protos.
+  }
   optional string test = 2;
 }
diff --git a/src/tracing/BUILD.gn b/src/tracing/BUILD.gn
index 33697e5..bbfa964 100644
--- a/src/tracing/BUILD.gn
+++ b/src/tracing/BUILD.gn
@@ -23,6 +23,7 @@
   deps = [
     "../../gn:default_deps",
     "../../protos:lite",
+    "../../protos:zero",
     "../base",
   ]
   sources = [
@@ -35,8 +36,12 @@
     "core/service_impl.cc",
     "core/service_impl.h",
     "core/shared_memory_abi.cc",
+    "core/shared_memory_arbiter.cc",
+    "core/shared_memory_arbiter.h",
     "core/trace_config.cc",
     "core/trace_packet.cc",
+    "core/trace_writer_impl.cc",
+    "core/trace_writer_impl.h",
   ]
 }
 
@@ -76,6 +81,7 @@
     "../../gn:default_deps",
     "../../gn:gtest_deps",
     "../../protos:lite",
+    "../../protos:zero",
     "../base",
     "../base:test_support",
   ]
@@ -84,8 +90,12 @@
     "core/id_allocator_unittest.cc",
     "core/service_impl_unittest.cc",
     "core/shared_memory_abi_unittest.cc",
+    "core/shared_memory_arbiter_unittest.cc",
     "core/trace_packet_unittest.cc",
+    "core/trace_writer_impl_unittest.cc",
     "ipc/posix_shared_memory_unittest.cc",
+    "test/aligned_buffer_test.cc",
+    "test/aligned_buffer_test.h",
     "test/test_shared_memory.cc",
     "test/test_shared_memory.h",
   ]
diff --git a/src/tracing/core/shared_memory_abi_unittest.cc b/src/tracing/core/shared_memory_abi_unittest.cc
index d460759..dd75534 100644
--- a/src/tracing/core/shared_memory_abi_unittest.cc
+++ b/src/tracing/core/shared_memory_abi_unittest.cc
@@ -17,7 +17,7 @@
 #include "perfetto/tracing/core/shared_memory_abi.h"
 
 #include "gtest/gtest.h"
-#include "perfetto/base/utils.h"
+#include "src/tracing/test/aligned_buffer_test.h"
 
 namespace perfetto {
 namespace {
@@ -26,35 +26,18 @@
 using Chunk = SharedMemoryABI::Chunk;
 using ChunkHeader = SharedMemoryABI::ChunkHeader;
 
-class SharedMemoryABITest : public ::testing::TestWithParam<size_t> {
- public:
-  void SetUp() override {
-    page_size_ = GetParam();
-    buf_size_ = page_size_ * kNumPages;
-    void* mem = nullptr;
-    PERFETTO_CHECK(posix_memalign(&mem, page_size_, buf_size_) == 0);
-    buf_.reset(reinterpret_cast<uint8_t*>(mem));
-    memset(buf_.get(), 0, buf_size_);
-  }
-
-  void TearDown() override { buf_.reset(); }
-
-  const size_t kNumPages = 10;
-  std::unique_ptr<uint8_t, base::FreeDeleter> buf_;
-  size_t buf_size_;
-  size_t page_size_;
-};
+using SharedMemoryABITest = AlignedBufferTest;
 
 size_t const kPageSizes[] = {4096, 8192, 16384, 32768, 65536};
 INSTANTIATE_TEST_CASE_P(PageSize, SharedMemoryABITest, ValuesIn(kPageSizes));
 
 TEST_P(SharedMemoryABITest, NominalCases) {
-  SharedMemoryABI abi(buf_.get(), buf_size_, page_size_);
+  SharedMemoryABI abi(buf(), buf_size(), page_size());
 
-  ASSERT_EQ(buf_.get(), abi.start());
-  ASSERT_EQ(buf_.get() + buf_size_, abi.end());
-  ASSERT_EQ(buf_size_, abi.size());
-  ASSERT_EQ(page_size_, abi.page_size());
+  ASSERT_EQ(buf(), abi.start());
+  ASSERT_EQ(buf() + buf_size(), abi.end());
+  ASSERT_EQ(buf_size(), abi.size());
+  ASSERT_EQ(page_size(), abi.page_size());
   ASSERT_EQ(kNumPages, abi.num_pages());
 
   for (size_t i = 0; i < kNumPages; i++) {
@@ -94,8 +77,8 @@
   uint8_t* last_chunk_end = nullptr;
 
   for (size_t page_idx = 0; page_idx <= 4; page_idx++) {
-    uint8_t* const page_start = buf_.get() + page_idx * page_size_;
-    uint8_t* const page_end = page_start + page_size_;
+    uint8_t* const page_start = buf() + page_idx * page_size();
+    uint8_t* const page_end = page_start + page_size();
     const size_t num_chunks =
         SharedMemoryABI::GetNumChunksForLayout(abi.page_layout_dbg(page_idx));
     const size_t target_buffer = 10 + page_idx;
@@ -132,7 +115,7 @@
 
       // Sanity check chunk bounds.
       size_t expected_chunk_size =
-          (page_size_ - sizeof(SharedMemoryABI::PageHeader)) / num_chunks;
+          (page_size() - sizeof(SharedMemoryABI::PageHeader)) / num_chunks;
       expected_chunk_size = expected_chunk_size - (expected_chunk_size % 4);
       ASSERT_EQ(expected_chunk_size, chunk.size());
       ASSERT_GT(chunk.begin(), page_start);
@@ -224,7 +207,7 @@
 }
 
 TEST_P(SharedMemoryABITest, BatchAcquireAndRelease) {
-  SharedMemoryABI abi(buf_.get(), buf_size_, page_size_);
+  SharedMemoryABI abi(buf(), buf_size(), page_size());
   ChunkHeader header{};
 
   // TryAcquire on a non-partitioned page should fail.
diff --git a/src/tracing/core/shared_memory_arbiter.cc b/src/tracing/core/shared_memory_arbiter.cc
new file mode 100644
index 0000000..e2a059d
--- /dev/null
+++ b/src/tracing/core/shared_memory_arbiter.cc
@@ -0,0 +1,164 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/tracing/core/shared_memory_arbiter.h"
+
+#include "perfetto/base/logging.h"
+#include "perfetto/base/task_runner.h"
+#include "src/tracing/core/trace_writer_impl.h"
+
+#include <limits>
+
+namespace perfetto {
+
+using Chunk = SharedMemoryABI::Chunk;
+
+// static
+SharedMemoryABI::PageLayout SharedMemoryArbiter::default_page_layout =
+    SharedMemoryABI::PageLayout::kPageDiv1;
+
+SharedMemoryArbiter::SharedMemoryArbiter(void* start,
+                                         size_t size,
+                                         size_t page_size,
+                                         OnPageCompleteCallback callback,
+                                         base::TaskRunner* task_runner)
+    : task_runner_(task_runner),
+      on_page_complete_callback_(std::move(callback)),
+      shmem_abi_(reinterpret_cast<uint8_t*>(start), size, page_size),
+      active_writer_ids_(SharedMemoryABI::kMaxWriterID + 1) {}
+
+Chunk SharedMemoryArbiter::GetNewChunk(
+    const SharedMemoryABI::ChunkHeader& header,
+    BufferID target_buffer,
+    size_t size_hint) {
+  PERFETTO_DCHECK(size_hint == 0);  // Not implemented yet.
+
+  for (;;) {
+    // TODO(primiano): Probably this lock is not really required and this code
+    // could be rewritten leveraging only the Try* atomic operations in
+    // SharedMemoryABI. But let's not be too adventurous for the moment.
+    {
+      std::lock_guard<std::mutex> scoped_lock(lock_);
+      const size_t initial_page_idx = page_idx_;
+      // TODO(primiano): instead of scanning, we could maintain a bitmap of
+      // free chunks for each |target_buffer| and one for fully free pages.
+      for (size_t i = 0; i < shmem_abi_.num_pages(); i++) {
+        page_idx_ = (initial_page_idx + i) % shmem_abi_.num_pages();
+        bool is_new_page = false;
+
+        // TODO(primiano): make the page layout dynamic.
+        auto layout = SharedMemoryArbiter::default_page_layout;
+
+        if (shmem_abi_.is_page_free(page_idx_)) {
+          // TODO(primiano): Use the |size_hint| here to decide the layout.
+          is_new_page =
+              shmem_abi_.TryPartitionPage(page_idx_, layout, target_buffer);
+        }
+        uint32_t free_chunks;
+        size_t tbuf;
+        if (is_new_page) {
+          free_chunks = (1 << SharedMemoryABI::kNumChunksForLayout[layout]) - 1;
+          tbuf = target_buffer;
+        } else {
+          free_chunks = shmem_abi_.GetFreeChunks(page_idx_);
+
+          // |tbuf| here is advisory only and could change at any point, before
+          // or after this read. The only use of |tbuf| here is to skip pages
+          // that are more likely to belong to other target_buffers, avoiding
+          // the more expensive atomic operations in those cases. The
+          // authoritative check on |tbuf| happens atomically in the
+          // TryAcquireChunkForWriting() call below.
+          tbuf = shmem_abi_.page_header(page_idx_)->target_buffer.load(
+              std::memory_order_relaxed);
+        }
+        PERFETTO_DLOG("Free chunks for page %zu: %x. Target buffer: %zu",
+                      page_idx_, free_chunks, tbuf);
+
+        if (tbuf != target_buffer)
+          continue;
+
+        for (uint32_t chunk_idx = 0; free_chunks;
+             chunk_idx++, free_chunks >>= 1) {
+          if (!(free_chunks & 1))
+            continue;
+          // We found a free chunk.
+          Chunk chunk = shmem_abi_.TryAcquireChunkForWriting(
+              page_idx_, chunk_idx, tbuf, &header);
+          if (!chunk.is_valid())
+            continue;
+          PERFETTO_DLOG("Acquired chunk %zu:%u", page_idx_, chunk_idx);
+          return chunk;
+        }
+        // TODO: we should have some policy to guarantee fairness of the SMB
+        // page allocator w.r.t |target_buffer|? Or is the SMB best-effort? All
+        // chunks in the page are busy (either kBeingRead or kBeingWritten), or
+        // all the pages are assigned to a different target buffer. Try with the
+        // next page.
+      }
+    }  // std::lock_guard<std::mutex>
+    // All chunks are taken (either kBeingWritten by us or kBeingRead by the
+    // Service). TODO: at this point we should return a bankruptcy chunk, not
+    // stall the calling thread.
+    PERFETTO_ELOG("Shared memory buffer overrun! Stalling");
+    usleep(250000);
+  }
+}
+
+void SharedMemoryArbiter::ReturnCompletedChunk(Chunk chunk) {
+  bool should_post_callback = false;
+  {
+    std::lock_guard<std::mutex> scoped_lock(lock_);
+    size_t page_index = shmem_abi_.ReleaseChunkAsComplete(std::move(chunk));
+    if (page_index != SharedMemoryABI::kInvalidPageIdx) {
+      should_post_callback = pages_to_notify_.empty();
+      pages_to_notify_.push_back(static_cast<uint32_t>(page_index));
+    }
+  }
+  if (should_post_callback) {
+    // TODO what happens if the arbiter gets destroyed?
+    task_runner_->PostTask(
+        std::bind(&SharedMemoryArbiter::InvokeOnPageCompleteCallback, this));
+  }
+}
+
+// This is always invoked on the |task_runner_| thread.
+void SharedMemoryArbiter::InvokeOnPageCompleteCallback() {
+  std::vector<uint32_t> pages_to_notify;
+  {
+    std::lock_guard<std::mutex> scoped_lock(lock_);
+    pages_to_notify = std::move(pages_to_notify_);
+    pages_to_notify_.clear();
+  }
+  on_page_complete_callback_(pages_to_notify);
+}
+
+std::unique_ptr<TraceWriter> SharedMemoryArbiter::CreateTraceWriter(
+    BufferID target_buffer) {
+  WriterID id;
+  {
+    std::lock_guard<std::mutex> scoped_lock(lock_);
+    id = static_cast<WriterID>(active_writer_ids_.Allocate());
+  }
+  return std::unique_ptr<TraceWriter>(
+      id ? new TraceWriterImpl(this, id, target_buffer) : nullptr);
+}
+
+void SharedMemoryArbiter::ReleaseWriterID(WriterID id) {
+  std::lock_guard<std::mutex> scoped_lock(lock_);
+  active_writer_ids_.Free(id);
+}
+
+}  // namespace perfetto
diff --git a/src/tracing/core/shared_memory_arbiter.h b/src/tracing/core/shared_memory_arbiter.h
new file mode 100644
index 0000000..34267bf
--- /dev/null
+++ b/src/tracing/core/shared_memory_arbiter.h
@@ -0,0 +1,112 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_H_
+#define SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_H_
+
+#include <stdint.h>
+
+#include <functional>
+#include <mutex>
+#include <vector>
+
+#include "perfetto/tracing/core/basic_types.h"
+#include "perfetto/tracing/core/shared_memory_abi.h"
+#include "src/tracing/core/id_allocator.h"
+
+namespace perfetto {
+
+class TraceWriter;
+
+namespace base {
+class TaskRunner;
+}  // namespace base
+
+// This class handles the shared memory buffer on the producer side. It is used
+// to obtain thread-local chunks and to partition pages from several threads.
+// There is one arbiter instance per Producer.
+// This class is thread-safe and uses locks to do so. Data sources are supposed
+// to interact with this sporadically, only when they run out of space on their
+// current thread-local chunk.
+class SharedMemoryArbiter {
+ public:
+  using OnPageCompleteCallback =
+      std::function<void(const std::vector<uint32_t>& /*page_indexes*/)>;
+
+  // Args:
+  // |start|,|size|: boundaries of the shared memory buffer.
+  // |page_size|: a multiple of 4KB that defines the granularity of tracing
+  // pages. See tradeoff considerations in shared_memory_abi.h.
+  // |OnPageCompleteCallback|: a callback that will be posted on the passed
+  // |TaskRunner| when one or more pages are complete (and hence the Producer
+  // should send a NotifySharedMemoryUpdate() to the Service).
+  SharedMemoryArbiter(void* start,
+                      size_t size,
+                      size_t page_size,
+                      OnPageCompleteCallback,
+                      base::TaskRunner*);
+
+  // Creates a new TraceWriter and assigns it a new WriterID. The WriterID is
+  // written in each chunk header owned by a given TraceWriter and is used by
+  // the Service to reconstruct TracePackets written by the same TraceWriter.
+  // Returns nullptr if all WriterID slots are exhausted.
+  // TODO(primiano): instead of nullptr this should return a NoopWriter.
+  std::unique_ptr<TraceWriter> CreateTraceWriter(BufferID target_buffer = 0);
+
+  // Returns a new Chunk to write tracing data. The call always returns a valid
+  // Chunk. TODO(primiano): right now this blocks if there are no free chunks
+  // in the SMB. In the long term the caller should be allowed to pick a policy
+  // and handle the retry itself asynchronously.
+  SharedMemoryABI::Chunk GetNewChunk(const SharedMemoryABI::ChunkHeader&,
+                                     BufferID target_buffer,
+                                     size_t size_hint = 0);
+
+  void ReturnCompletedChunk(SharedMemoryABI::Chunk chunk);
+
+  SharedMemoryABI* shmem_abi_for_testing() { return &shmem_abi_; }
+
+  static void set_default_layout_for_testing(SharedMemoryABI::PageLayout l) {
+    default_page_layout = l;
+  }
+
+ private:
+  friend class TraceWriterImpl;
+
+  static SharedMemoryABI::PageLayout default_page_layout;
+
+  SharedMemoryArbiter(const SharedMemoryArbiter&) = delete;
+  SharedMemoryArbiter& operator=(const SharedMemoryArbiter&) = delete;
+
+  // Called by the TraceWriter destructor.
+  void ReleaseWriterID(WriterID);
+
+  void InvokeOnPageCompleteCallback();
+
+  base::TaskRunner* const task_runner_;
+  OnPageCompleteCallback on_page_complete_callback_;
+
+  // --- Begin lock-protected members ---
+  std::mutex lock_;
+  SharedMemoryABI shmem_abi_;
+  size_t page_idx_ = 0;
+  IdAllocator active_writer_ids_;
+  std::vector<uint32_t> pages_to_notify_;
+  // --- End lock-protected members ---
+};
+
+}  // namespace perfetto
+
+#endif  // SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_H_
diff --git a/src/tracing/core/shared_memory_arbiter_unittest.cc b/src/tracing/core/shared_memory_arbiter_unittest.cc
new file mode 100644
index 0000000..d2b511b
--- /dev/null
+++ b/src/tracing/core/shared_memory_arbiter_unittest.cc
@@ -0,0 +1,174 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/tracing/core/shared_memory_arbiter.h"
+
+#include "gtest/gtest.h"
+#include "perfetto/base/utils.h"
+#include "perfetto/tracing/core/basic_types.h"
+#include "perfetto/tracing/core/shared_memory_abi.h"
+#include "perfetto/tracing/core/trace_writer.h"
+#include "src/base/test/test_task_runner.h"
+#include "src/tracing/test/aligned_buffer_test.h"
+
+namespace perfetto {
+namespace {
+
+constexpr size_t kMaxWriterID = SharedMemoryABI::kMaxWriterID;
+
+class SharedMemoryArbiterTest : public AlignedBufferTest {
+ public:
+  void SetUp() override {
+    AlignedBufferTest::SetUp();
+    auto callback = [this](const std::vector<uint32_t>& arg) {
+      if (on_page_complete_)
+        on_page_complete_(arg);
+    };
+    task_runner_.reset(new base::TestTaskRunner());
+    arbiter_.reset(new SharedMemoryArbiter(buf(), buf_size(), page_size(),
+                                           callback, task_runner_.get()));
+  }
+
+  void TearDown() override {
+    arbiter_.reset();
+    task_runner_.reset();
+  }
+
+  std::unique_ptr<base::TestTaskRunner> task_runner_;
+  std::unique_ptr<SharedMemoryArbiter> arbiter_;
+  std::function<void(const std::vector<uint32_t>&)> on_page_complete_;
+};
+
+size_t const kPageSizes[] = {4096, 65536};
+INSTANTIATE_TEST_CASE_P(PageSize,
+                        SharedMemoryArbiterTest,
+                        ::testing::ValuesIn(kPageSizes));
+
+// Checks that chunks that target different buffer IDs are placed in different
+// pages.
+TEST_P(SharedMemoryArbiterTest, ChunksAllocationByTargetBufferID) {
+  SharedMemoryArbiter::set_default_layout_for_testing(
+      SharedMemoryABI::PageLayout::kPageDiv4);
+  SharedMemoryABI::Chunk chunks[8];
+  chunks[0] = arbiter_->GetNewChunk({}, 1 /* target buffer id */, 0);
+  chunks[1] = arbiter_->GetNewChunk({}, 1 /* target buffer id */, 0);
+  chunks[2] = arbiter_->GetNewChunk({}, 1 /* target buffer id */, 0);
+  chunks[3] = arbiter_->GetNewChunk({}, 2 /* target buffer id */, 0);
+  chunks[4] = arbiter_->GetNewChunk({}, 1 /* target buffer id */, 0);
+  chunks[5] = arbiter_->GetNewChunk({}, 1 /* target buffer id */, 0);
+  chunks[6] = arbiter_->GetNewChunk({}, 3 /* target buffer id */, 0);
+  chunks[7] = arbiter_->GetNewChunk({}, 3 /* target buffer id */, 0);
+
+  // "first" == "page index", "second" == "chunk index".
+  std::pair<size_t, size_t> idx[base::ArraySize(chunks)];
+  for (size_t i = 0; i < base::ArraySize(chunks); i++)
+    idx[i] = arbiter_->shmem_abi_for_testing()->GetPageAndChunkIndex(chunks[i]);
+
+  // The first three chunks should lie in the same page, as they target the same
+  // buffer id (1).
+  EXPECT_EQ(idx[0].first, idx[1].first);
+  EXPECT_EQ(idx[0].first, idx[2].first);
+
+  // Check also that the chunk IDs are different.
+  EXPECT_NE(idx[0].second, idx[1].second);
+  EXPECT_NE(idx[1].second, idx[2].second);
+  EXPECT_NE(idx[0].second, idx[2].second);
+
+  // The next one instead should be given a dedicated page because it targets
+  // a different buffer id (2);
+  EXPECT_NE(idx[2].first, idx[3].first);
+
+  // However the next two chunks should be able to fit back into the same page.
+  EXPECT_EQ(idx[4].first, idx[5].first);
+  EXPECT_NE(idx[4].second, idx[5].second);
+
+  // Similarly the last two chunks should be able to share the same page, but
+  // not any page of the previous chunks.
+  EXPECT_NE(idx[0].first, idx[6].first);
+  EXPECT_NE(idx[3].first, idx[6].first);
+  EXPECT_EQ(idx[6].first, idx[7].first);
+  EXPECT_NE(idx[6].second, idx[7].second);
+
+  // TODO(primiano): check that after saturating all the pages, the arbiter
+  // goes back and reuses free chunks of previous pages. e.g., at some point
+  // a chunk targeting buffer id == 1 should be placed into (page:0, chunk:3).
+}
+
+// The buffer has 14 pages (kNumPages), each will be partitioned in 14 chunks.
+// The test requests all 14 * 14 chunks, alternating amongst 14 target buf IDs.
+// Because a chunk can share a page only if all other chunks in the page have
+// the same target buffer ID, there is only one possible final distribution:
+// each page is filled with chunks that all belong to the same buffer ID.
+TEST_P(SharedMemoryArbiterTest, GetAndReturnChunks) {
+  SharedMemoryArbiter::set_default_layout_for_testing(
+      SharedMemoryABI::PageLayout::kPageDiv14);
+  static constexpr size_t kTotChunks = kNumPages * 14;
+  SharedMemoryABI::Chunk chunks[kTotChunks];
+  for (size_t i = 0; i < kTotChunks; i++) {
+    BufferID target_buffer = i % 14;
+    chunks[i] = arbiter_->GetNewChunk({}, target_buffer, 0 /*size_hint*/);
+    ASSERT_TRUE(chunks[i].is_valid());
+  }
+
+  SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
+  for (size_t page_idx = 0; page_idx < kNumPages; page_idx++) {
+    ASSERT_FALSE(abi->is_page_free(page_idx));
+    ASSERT_EQ(0u, abi->GetFreeChunks(page_idx));
+    const uint32_t page_layout = abi->page_layout_dbg(page_idx);
+    ASSERT_EQ(14u, SharedMemoryABI::GetNumChunksForLayout(page_layout));
+    ASSERT_EQ(page_idx % 14, abi->page_header(page_idx)->target_buffer.load());
+    for (size_t chunk_idx = 0; chunk_idx < 14; chunk_idx++) {
+      auto chunk = abi->GetChunkUnchecked(page_idx, page_layout, chunk_idx);
+      ASSERT_TRUE(chunk.is_valid());
+    }
+  }
+
+  // Finally return just two pages marking all their chunks as complete, and
+  // check that the notification callback is posted.
+
+  auto on_callback = task_runner_->CreateCheckpoint("on_callback");
+  on_page_complete_ =
+      [on_callback](const std::vector<uint32_t>& completed_pages) {
+        ASSERT_EQ(2u, completed_pages.size());
+        ASSERT_EQ(0u, completed_pages[0]);
+        ASSERT_EQ(3u, completed_pages[1]);
+        on_callback();
+      };
+  for (size_t i = 0; i < 14; i++) {
+    arbiter_->ReturnCompletedChunk(std::move(chunks[14 * i]));
+    arbiter_->ReturnCompletedChunk(std::move(chunks[14 * i + 3]));
+  }
+  task_runner_->RunUntilCheckpoint("on_callback");
+}
+
+// Check that we can actually create up to kMaxWriterID TraceWriter(s).
+TEST_P(SharedMemoryArbiterTest, WriterIDsAllocation) {
+  std::map<WriterID, std::unique_ptr<TraceWriter>> writers;
+  for (size_t i = 0; i < kMaxWriterID; i++) {
+    std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(0);
+    ASSERT_TRUE(writer);
+    WriterID writer_id = writer->writer_id();
+    ASSERT_TRUE(writers.emplace(writer_id, std::move(writer)).second);
+  }
+
+  // A further call should fail as we exhausted writer IDs.
+  ASSERT_EQ(nullptr, arbiter_->CreateTraceWriter(0).get());
+}
+
+// TODO(primiano): add multi-threaded tests.
+
+}  // namespace
+}  // namespace perfetto
diff --git a/src/tracing/core/trace_writer_impl.cc b/src/tracing/core/trace_writer_impl.cc
new file mode 100644
index 0000000..a17e543
--- /dev/null
+++ b/src/tracing/core/trace_writer_impl.cc
@@ -0,0 +1,172 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/tracing/core/trace_writer_impl.h"
+
+#include <string.h>
+
+#include <type_traits>
+#include <utility>
+
+#include "perfetto/base/logging.h"
+#include "perfetto/protozero/proto_utils.h"
+#include "src/tracing/core/shared_memory_arbiter.h"
+
+#include "protos/trace_packet.pbzero.h"
+
+// TODO(primiano): right now this class is accumulating a patchlist but not
+// sending it to the service.
+
+using protozero::proto_utils::kMessageLengthFieldSize;
+using protozero::proto_utils::WriteRedundantVarInt;
+using ChunkHeader = perfetto::SharedMemoryABI::ChunkHeader;
+
+namespace perfetto {
+
+namespace {
+constexpr size_t kPacketHeaderSize = SharedMemoryABI::kPacketHeaderSize;  // Alias.
+}  // namespace
+
+TraceWriterImpl::TraceWriterImpl(SharedMemoryArbiter* shmem_arbiter,
+                                 WriterID id,
+                                 BufferID target_buffer)
+    : shmem_arbiter_(shmem_arbiter),
+      id_(id),
+      target_buffer_(target_buffer),
+      protobuf_stream_writer_(this) {  // |this| acts as the writer's Delegate.
+  // TODO(primiano): we could handle the case of running out of TraceWriterID(s)
+  // more gracefully and always return a no-op TracePacket in NewTracePacket().
+  PERFETTO_CHECK(id_ != 0);  // NOTE(review): 0 presumably means "no ID left".
+
+  cur_packet_.reset(new protos::pbzero::TracePacket());
+  cur_packet_->Finalize();  // So the first NewTracePacket() passes its DCHECK.
+}
+
+TraceWriterImpl::~TraceWriterImpl() {
+  // TODO(primiano): this should also return the current chunk. Add tests.
+  shmem_arbiter_->ReleaseWriterID(id_);  // |cur_chunk_| is NOT returned here.
+}
+
+TraceWriterImpl::TracePacketHandle TraceWriterImpl::NewTracePacket() {
+  // If we hit this, the caller is calling NewTracePacket() without having
+  // finalized the previous packet.
+  PERFETTO_DCHECK(cur_packet_->is_finalized());
+
+  fragmenting_packet_ = false;  // No packet is being written yet.
+
+  // TODO: hack, every packet starts in a new chunk to reduce fragmentation
+  // (which requires stitching support in the service).
+  protobuf_stream_writer_.Reset(GetNewBuffer());
+
+  // Reserve space for the size of the message. Note: ReserveBytes() might
+  // re-enter this class via GetNewBuffer() if there isn't enough space; since
+  // |fragmenting_packet_| is still false, no fragment is created in that case.
+  static_assert(
+      kPacketHeaderSize == kMessageLengthFieldSize,
+      "The packet header must match the ProtoZeroMessage header size");
+  cur_packet_->Reset(&protobuf_stream_writer_);
+  uint8_t* header = protobuf_stream_writer_.ReserveBytes(kPacketHeaderSize);
+  memset(header, 0, kPacketHeaderSize);
+  cur_packet_->set_size_field(header);  // Backfilled later with the size.
+  cur_chunk_.IncrementPacketCount();
+  TracePacketHandle handle(cur_packet_.get());
+  cur_fragment_start_ = protobuf_stream_writer_.write_ptr();  // After header.
+  fragmenting_packet_ = true;  // From now on GetNewBuffer() creates fragments.
+  return handle;
+}
+
+// ScatteredStreamWriter::Delegate implementation: returns |cur_chunk_| (if
+// valid) to the arbiter and hands out the payload of a new chunk. Two cases:
+// 1. While writing a packet (|fragmenting_packet_| == true): the chunk being
+//    returned is flagged as ending with a partial packet and the packet
+//    continues as a new fragment in the new chunk.
+// 2. From the NewTracePacket() prologue (|fragmenting_packet_| == false): we
+//    just want a new chunk without creating any fragments.
+protozero::ContiguousMemoryRange TraceWriterImpl::GetNewBuffer() {
+  if (fragmenting_packet_) {
+    uint8_t* const wptr = protobuf_stream_writer_.write_ptr();
+    PERFETTO_DCHECK(wptr >= cur_fragment_start_);
+    uint32_t partial_size = static_cast<uint32_t>(wptr - cur_fragment_start_);
+    PERFETTO_DCHECK(partial_size < cur_chunk_.size());
+
+    // Backfill the packet header with the fragment size.
+    cur_packet_->inc_size_already_written(partial_size);
+    cur_chunk_.SetFlag(ChunkHeader::kLastPacketContinuesOnNextChunk);
+    WriteRedundantVarInt(partial_size, cur_packet_->size_field());
+
+    // Descend in the stack of non-finalized nested submessages (if any) and
+    // redirect their |size_field| to |patch_list_| entries: the chunk is about
+    // to be released and they cannot write into it anymore. |cur_chunk_id_| was
+    // already advanced past this chunk when it was acquired, hence the -1.
+    // TODO(primiano): add tests to cover this logic.
+    const auto patched_chunk_id = static_cast<uint16_t>(cur_chunk_id_ - 1);
+    for (auto* nested_msg = cur_packet_->nested_message(); nested_msg;
+         nested_msg = nested_msg->nested_message()) {
+      uint8_t* const cur_hdr = nested_msg->size_field();
+      PERFETTO_DCHECK(cur_hdr >= cur_chunk_.payload_begin() &&
+                      cur_hdr + kMessageLengthFieldSize <= cur_chunk_.end());
+      auto cur_hdr_offset = static_cast<uint16_t>(cur_hdr - cur_chunk_.begin());
+      patch_list_.emplace_front(patched_chunk_id, cur_hdr_offset);
+      nested_msg->set_size_field(patch_list_.front().size_field);
+      PERFETTO_DLOG("Created new patchlist entry for protobuf nested message");
+    }
+  }
+
+  if (cur_chunk_.is_valid())
+    shmem_arbiter_->ReturnCompletedChunk(std::move(cur_chunk_));
+
+  // Start a new chunk.
+  ChunkHeader::Identifier identifier = {};
+  identifier.writer_id = id_;
+  identifier.chunk_id = cur_chunk_id_++;  // Now holds the ID of the next chunk.
+
+  ChunkHeader::PacketsState packets_state = {};
+  if (fragmenting_packet_) {
+    packets_state.count = 1;  // The continuation of |cur_packet_|.
+    packets_state.flags = ChunkHeader::kFirstPacketContinuesFromPrevChunk;
+  }
+
+  // The memory order of the stores below doesn't really matter. This |header|
+  // is just a local temporary object. The GetNewChunk() call below will copy it
+  // into the shared buffer with the proper barriers.
+  ChunkHeader header = {};
+  header.identifier.store(identifier, std::memory_order_relaxed);
+  header.packets_state.store(packets_state, std::memory_order_relaxed);
+
+  cur_chunk_ = shmem_arbiter_->GetNewChunk(header, target_buffer_);
+  uint8_t* payload_begin = cur_chunk_.payload_begin();
+  if (fragmenting_packet_) {
+    cur_packet_->set_size_field(payload_begin);  // Header of the new fragment.
+    memset(payload_begin, 0, kPacketHeaderSize);
+    payload_begin += kPacketHeaderSize;
+    cur_fragment_start_ = payload_begin;
+  }
+
+  return protozero::ContiguousMemoryRange{payload_begin, cur_chunk_.end()};
+}
+
+WriterID TraceWriterImpl::writer_id() const {
+  return id_;  // The ID this writer was constructed with (|id_| is const).
+}
+
+TraceWriterImpl::Patch::Patch(uint16_t cid, uint16_t offset)
+    : chunk_id(cid), offset_in_chunk(offset) {}  // |size_field| stays zeroed.
+
+// Out-of-line definitions of the TraceWriter base class ctor/dtor.
+TraceWriter::TraceWriter() = default;
+TraceWriter::~TraceWriter() = default;
+
+}  // namespace perfetto
diff --git a/src/tracing/core/trace_writer_impl.h b/src/tracing/core/trace_writer_impl.h
new file mode 100644
index 0000000..3c7121d
--- /dev/null
+++ b/src/tracing/core/trace_writer_impl.h
@@ -0,0 +1,118 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACING_CORE_TRACE_WRITER_IMPL_H_
+#define SRC_TRACING_CORE_TRACE_WRITER_IMPL_H_
+
+#include <forward_list>
+
+#include "perfetto/protozero/protozero_message_handle.h"
+#include "perfetto/protozero/scattered_stream_writer.h"
+#include "perfetto/tracing/core/basic_types.h"
+#include "perfetto/tracing/core/shared_memory_abi.h"
+#include "perfetto/tracing/core/trace_writer.h"
+
+namespace perfetto {
+
+class SharedMemoryArbiter;
+
+// See //include/perfetto/tracing/core/trace_writer.h for docs.
+class TraceWriterImpl : public TraceWriter,
+                        public protozero::ScatteredStreamWriter::Delegate {
+ public:
+  // TracePacketHandle is defined in trace_writer.h
+
+  TraceWriterImpl(SharedMemoryArbiter*, WriterID, BufferID);
+  ~TraceWriterImpl() override;
+
+  // TraceWriter implementation. See documentation in trace_writer.h .
+  TracePacketHandle NewTracePacket() override;
+  WriterID writer_id() const override;
+
+ private:
+  // Used to handle the backfilling of the headers (the |size_field|) of nested
+  // messages when a proto is fragmented over several chunks. This patchlist is
+  // meant to be sent out of band to the tracing service, after having returned
+  // the initial chunks of the fragment (sending is not implemented yet).
+  struct Patch {
+    Patch(uint16_t chunk_id, uint16_t offset);
+    Patch(const Patch&) = delete;
+    Patch& operator=(const Patch&) = delete;
+    Patch(Patch&&) noexcept = delete;
+    Patch& operator=(Patch&&) = delete;
+
+    const uint16_t chunk_id;  // Chunk that contains the field to patch.
+    const uint16_t offset_in_chunk;  // Offset from the chunk's begin().
+    uint8_t size_field[SharedMemoryABI::kPacketHeaderSize] = {};
+  };
+  TraceWriterImpl(const TraceWriterImpl&) = delete;
+  TraceWriterImpl& operator=(const TraceWriterImpl&) = delete;
+
+  // ScatteredStreamWriter::Delegate implementation.
+  protozero::ContiguousMemoryRange GetNewBuffer() override;
+
+  // The per-producer arbiter that coordinates access to the shared memory
+  // buffer from several threads.
+  SharedMemoryArbiter* const shmem_arbiter_;
+
+  // ID of the current writer.
+  const WriterID id_;
+
+  // This is just copied back into the chunk header.
+  // See comments in data_source_config.proto for |target_buffer|.
+  const BufferID target_buffer_;
+
+  // Monotonic (% wrapping) sequence id of the chunk. Together with the WriterID
+  // this allows the Service to reconstruct the linear sequence of packets.
+  uint16_t cur_chunk_id_ = 0;
+
+  // The chunk we are holding onto (if any).
+  SharedMemoryABI::Chunk cur_chunk_;
+
+  // Passed to protozero message to write directly into |cur_chunk_|. It
+  // keeps track of the write pointer. It calls us back (GetNewBuffer()) when
+  // |cur_chunk_| is filled.
+  protozero::ScatteredStreamWriter protobuf_stream_writer_;
+
+  // The packet returned via NewTracePacket(). It's owned by this class,
+  // TracePacketHandle has just a pointer to it.
+  std::unique_ptr<protos::pbzero::TracePacket> cur_packet_;
+
+  // Start of the payload of the current |cur_packet_| fragment in |cur_chunk_|.
+  // Used to size the fragment when a write is interrupted by GetNewBuffer().
+  uint8_t* cur_fragment_start_ = nullptr;
+
+  // true after NewTracePacket() has set up a packet: a GetNewBuffer() call
+  // then splits the packet into fragments. false during the NewTracePacket()
+  // prologue, where GetNewBuffer() just hands out a new chunk.
+  bool fragmenting_packet_ = false;
+
+  // When a packet is fragmented across different chunks, the |size_field| of
+  // the outstanding nested protobuf messages is redirected onto Patch entries
+  // in this list at the time the Chunk is returned (because at that point we
+  // have to release the ownership of the current Chunk). This list will be
+  // later sent out-of-band to the tracing service, who will patch the required
+  // chunks, if they are still around.
+  // Note: the ProtoZeroMessage will take pointers to the |size_field| of these
+  // entries. This container must guarantee that the Patch objects are never
+  // moved around (i.e. cannot be a vector because reallocations can change
+  // the addresses of pre-existing entries).
+  std::forward_list<Patch> patch_list_;
+};
+
+}  // namespace perfetto
+
+#endif  // SRC_TRACING_CORE_TRACE_WRITER_IMPL_H_
diff --git a/src/tracing/core/trace_writer_impl_unittest.cc b/src/tracing/core/trace_writer_impl_unittest.cc
new file mode 100644
index 0000000..15b46a8
--- /dev/null
+++ b/src/tracing/core/trace_writer_impl_unittest.cc
@@ -0,0 +1,90 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/tracing/core/trace_writer_impl.h"
+
+#include "gtest/gtest.h"
+#include "perfetto/base/utils.h"
+#include "perfetto/tracing/core/trace_writer.h"
+#include "src/base/test/test_task_runner.h"
+#include "src/tracing/core/shared_memory_arbiter.h"
+#include "src/tracing/test/aligned_buffer_test.h"
+
+#include "protos/trace_packet.pbzero.h"
+
+namespace perfetto {
+namespace {
+
+class TraceWriterImplTest : public AlignedBufferTest {  // Arbiter fixture.
+ public:
+  void SetUp() override {
+    SharedMemoryArbiter::set_default_layout_for_testing(
+        SharedMemoryABI::PageLayout::kPageDiv4);
+    AlignedBufferTest::SetUp();  // Allocates buf(); must precede the arbiter.
+    auto callback = [](const std::vector<uint32_t>&) {};  // Intentional no-op.
+    task_runner_.reset(new base::TestTaskRunner());
+    arbiter_.reset(new SharedMemoryArbiter(buf(), buf_size(), page_size(),
+                                           callback, task_runner_.get()));
+  }
+
+  void TearDown() override {
+    arbiter_.reset();  // Goes first: it points to the task runner and to buf().
+    task_runner_.reset();
+    AlignedBufferTest::TearDown();  // Mirrors the chaining done in SetUp().
+  }
+
+  std::unique_ptr<base::TestTaskRunner> task_runner_;
+  std::unique_ptr<SharedMemoryArbiter> arbiter_;
+};
+
+size_t const kPageSizes[] = {4096, 65536};  // Each test runs once per value.
+INSTANTIATE_TEST_CASE_P(PageSize,
+                        TraceWriterImplTest,
+                        ::testing::ValuesIn(kPageSizes));
+
+TEST_P(TraceWriterImplTest, SingleWriter) {  // One writer, 32 counted packets.
+  const BufferID tgt_buf_id = 42;
+  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(tgt_buf_id);
+  for (int i = 0; i < 32; i++) {
+    auto packet = writer->NewTracePacket();
+    char str[16];
+    snprintf(str, sizeof(str), "foobar %d", i);  // Bounded by the buffer size.
+    packet->set_test(str);
+  }
+  writer->NewTracePacket()->set_test("workaround for returing the last chunk");
+  writer.reset();  // The chunk of the extra packet above is never returned.
+
+  SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
+  size_t packets_count = 0;
+  for (size_t page_idx = 0; page_idx < kNumPages; page_idx++) {
+    uint32_t page_layout = abi->page_layout_dbg(page_idx);
+    size_t num_chunks = SharedMemoryABI::GetNumChunksForLayout(page_layout);
+    for (size_t chunk_idx = 0; chunk_idx < num_chunks; chunk_idx++) {
+      auto chunk =
+          abi->TryAcquireChunkForReading(page_idx, chunk_idx, tgt_buf_id);
+      if (!chunk.is_valid())  // Could not be acquired for reading: skip it.
+        continue;
+      packets_count += chunk.header()->packets_state.load().count;
+    }
+  }
+  EXPECT_EQ(32u, packets_count);  // The 33rd (workaround) packet is not seen.
+  // TODO(primiano): check also the content of the packets decoding the protos.
+}
+
+// TODO(primiano): add multi-writer test.
+
+}  // namespace
+}  // namespace perfetto
diff --git a/src/tracing/test/aligned_buffer_test.cc b/src/tracing/test/aligned_buffer_test.cc
new file mode 100644
index 0000000..6a603ea
--- /dev/null
+++ b/src/tracing/test/aligned_buffer_test.cc
@@ -0,0 +1,35 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/tracing/test/aligned_buffer_test.h"
+
+#include "perfetto/base/logging.h"
+
+namespace perfetto {
+
+// static: out-of-class definition, required pre-C++17 if kNumPages is odr-used.
+constexpr size_t AlignedBufferTest::kNumPages;
+
+void AlignedBufferTest::SetUp() {
+  page_size_ = GetParam();  // The test parameter is the page size.
+  buf_.reset(new TestSharedMemory(page_size_ * kNumPages));  // kNumPages pages.
+}
+
+void AlignedBufferTest::TearDown() {
+  buf_.reset();  // Releases the buffer created in SetUp().
+}
+
+}  // namespace perfetto
diff --git a/src/tracing/test/aligned_buffer_test.h b/src/tracing/test/aligned_buffer_test.h
new file mode 100644
index 0000000..23c8b1c
--- /dev/null
+++ b/src/tracing/test/aligned_buffer_test.h
@@ -0,0 +1,51 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACING_TEST_ALIGNED_BUFFER_TEST_H_
+#define SRC_TRACING_TEST_ALIGNED_BUFFER_TEST_H_
+
+#include <stdlib.h>
+
+#include <memory>
+
+#include "gtest/gtest.h"
+#include "perfetto/base/utils.h"
+#include "src/tracing/test/test_shared_memory.h"
+
+namespace perfetto {
+
+// Base fixture for tests needing an aligned buffer, parametrized by page size.
+class AlignedBufferTest : public ::testing::TestWithParam<size_t> {
+ public:
+  static constexpr size_t kNumPages = 14;  // Buffer size, in pages.
+  void SetUp() override;
+  void TearDown() override;
+
+  uint8_t* buf() const { return reinterpret_cast<uint8_t*>(buf_->start()); }
+  size_t buf_size() const { return buf_->size(); }  // Needs SetUp() first.
+  size_t page_size() const { return page_size_; }
+
+ private:
+  size_t page_size_ = 0;  // Set from GetParam() in SetUp().
+
+  // This doesn't need any sharing. TestSharedMemory just happens to have the
+  // right harness to create a zeroed page-aligned buffer.
+  std::unique_ptr<TestSharedMemory> buf_;
+};
+
+}  // namespace perfetto
+
+#endif  // SRC_TRACING_TEST_ALIGNED_BUFFER_TEST_H_