Merge "perfetto-ui: A number of small ui fixes"
diff --git a/Android.bp b/Android.bp
index ec143a5..f3df9e5 100644
--- a/Android.bp
+++ b/Android.bp
@@ -89,11 +89,13 @@
     "src/tracing/core/shared_memory_arbiter_impl.cc",
     "src/tracing/core/sliced_protobuf_input_stream.cc",
     "src/tracing/core/startup_trace_writer.cc",
+    "src/tracing/core/startup_trace_writer_registry.cc",
     "src/tracing/core/sys_stats_config.cc",
     "src/tracing/core/test_config.cc",
     "src/tracing/core/trace_buffer.cc",
     "src/tracing/core/trace_config.cc",
     "src/tracing/core/trace_packet.cc",
+    "src/tracing/core/trace_stats.cc",
     "src/tracing/core/trace_writer_impl.cc",
     "src/tracing/core/tracing_service_impl.cc",
     "src/tracing/core/virtual_destructors.cc",
@@ -280,11 +282,13 @@
     "src/tracing/core/shared_memory_arbiter_impl.cc",
     "src/tracing/core/sliced_protobuf_input_stream.cc",
     "src/tracing/core/startup_trace_writer.cc",
+    "src/tracing/core/startup_trace_writer_registry.cc",
     "src/tracing/core/sys_stats_config.cc",
     "src/tracing/core/test_config.cc",
     "src/tracing/core/trace_buffer.cc",
     "src/tracing/core/trace_config.cc",
     "src/tracing/core/trace_packet.cc",
+    "src/tracing/core/trace_stats.cc",
     "src/tracing/core/trace_writer_impl.cc",
     "src/tracing/core/tracing_service_impl.cc",
     "src/tracing/core/virtual_destructors.cc",
@@ -424,11 +428,13 @@
     "src/tracing/core/shared_memory_arbiter_impl.cc",
     "src/tracing/core/sliced_protobuf_input_stream.cc",
     "src/tracing/core/startup_trace_writer.cc",
+    "src/tracing/core/startup_trace_writer_registry.cc",
     "src/tracing/core/sys_stats_config.cc",
     "src/tracing/core/test_config.cc",
     "src/tracing/core/trace_buffer.cc",
     "src/tracing/core/trace_config.cc",
     "src/tracing/core/trace_packet.cc",
+    "src/tracing/core/trace_stats.cc",
     "src/tracing/core/trace_writer_impl.cc",
     "src/tracing/core/tracing_service_impl.cc",
     "src/tracing/core/virtual_destructors.cc",
@@ -621,11 +627,13 @@
     "src/tracing/core/shared_memory_arbiter_impl.cc",
     "src/tracing/core/sliced_protobuf_input_stream.cc",
     "src/tracing/core/startup_trace_writer.cc",
+    "src/tracing/core/startup_trace_writer_registry.cc",
     "src/tracing/core/sys_stats_config.cc",
     "src/tracing/core/test_config.cc",
     "src/tracing/core/trace_buffer.cc",
     "src/tracing/core/trace_config.cc",
     "src/tracing/core/trace_packet.cc",
+    "src/tracing/core/trace_stats.cc",
     "src/tracing/core/trace_writer_impl.cc",
     "src/tracing/core/tracing_service_impl.cc",
     "src/tracing/core/virtual_destructors.cc",
@@ -697,6 +705,7 @@
     "protos/perfetto/common/android_log_constants.proto",
     "protos/perfetto/common/commit_data_request.proto",
     "protos/perfetto/common/sys_stats_counters.proto",
+    "protos/perfetto/common/trace_stats.proto",
   ],
   tools: [
     "aprotoc",
@@ -706,6 +715,7 @@
     "external/perfetto/protos/perfetto/common/android_log_constants.pb.cc",
     "external/perfetto/protos/perfetto/common/commit_data_request.pb.cc",
     "external/perfetto/protos/perfetto/common/sys_stats_counters.pb.cc",
+    "external/perfetto/protos/perfetto/common/trace_stats.pb.cc",
   ],
 }
 
@@ -716,6 +726,7 @@
     "protos/perfetto/common/android_log_constants.proto",
     "protos/perfetto/common/commit_data_request.proto",
     "protos/perfetto/common/sys_stats_counters.proto",
+    "protos/perfetto/common/trace_stats.proto",
   ],
   tools: [
     "aprotoc",
@@ -725,6 +736,7 @@
     "external/perfetto/protos/perfetto/common/android_log_constants.pb.h",
     "external/perfetto/protos/perfetto/common/commit_data_request.pb.h",
     "external/perfetto/protos/perfetto/common/sys_stats_counters.pb.h",
+    "external/perfetto/protos/perfetto/common/trace_stats.pb.h",
   ],
   export_include_dirs: [
     "protos",
@@ -738,6 +750,7 @@
     "protos/perfetto/common/android_log_constants.proto",
     "protos/perfetto/common/commit_data_request.proto",
     "protos/perfetto/common/sys_stats_counters.proto",
+    "protos/perfetto/common/trace_stats.proto",
   ],
   tools: [
     "aprotoc",
@@ -748,6 +761,7 @@
     "external/perfetto/protos/perfetto/common/android_log_constants.pbzero.cc",
     "external/perfetto/protos/perfetto/common/commit_data_request.pbzero.cc",
     "external/perfetto/protos/perfetto/common/sys_stats_counters.pbzero.cc",
+    "external/perfetto/protos/perfetto/common/trace_stats.pbzero.cc",
   ],
 }
 
@@ -758,6 +772,7 @@
     "protos/perfetto/common/android_log_constants.proto",
     "protos/perfetto/common/commit_data_request.proto",
     "protos/perfetto/common/sys_stats_counters.proto",
+    "protos/perfetto/common/trace_stats.proto",
   ],
   tools: [
     "aprotoc",
@@ -768,6 +783,7 @@
     "external/perfetto/protos/perfetto/common/android_log_constants.pbzero.h",
     "external/perfetto/protos/perfetto/common/commit_data_request.pbzero.h",
     "external/perfetto/protos/perfetto/common/sys_stats_counters.pbzero.h",
+    "external/perfetto/protos/perfetto/common/trace_stats.pbzero.h",
   ],
   export_include_dirs: [
     "protos",
@@ -1531,7 +1547,6 @@
   name: "perfetto_protos_perfetto_trace_minimal_lite_gen",
   srcs: [
     "protos/perfetto/trace/clock_snapshot.proto",
-    "protos/perfetto/trace/trace_stats.proto",
   ],
   tools: [
     "aprotoc",
@@ -1539,7 +1554,6 @@
   cmd: "mkdir -p $(genDir)/external/perfetto/protos && $(location aprotoc) --cpp_out=$(genDir)/external/perfetto/protos --proto_path=external/perfetto/protos $(in)",
   out: [
     "external/perfetto/protos/perfetto/trace/clock_snapshot.pb.cc",
-    "external/perfetto/protos/perfetto/trace/trace_stats.pb.cc",
   ],
 }
 
@@ -1548,7 +1562,6 @@
   name: "perfetto_protos_perfetto_trace_minimal_lite_gen_headers",
   srcs: [
     "protos/perfetto/trace/clock_snapshot.proto",
-    "protos/perfetto/trace/trace_stats.proto",
   ],
   tools: [
     "aprotoc",
@@ -1556,7 +1569,6 @@
   cmd: "mkdir -p $(genDir)/external/perfetto/protos && $(location aprotoc) --cpp_out=$(genDir)/external/perfetto/protos --proto_path=external/perfetto/protos $(in)",
   out: [
     "external/perfetto/protos/perfetto/trace/clock_snapshot.pb.h",
-    "external/perfetto/protos/perfetto/trace/trace_stats.pb.h",
   ],
   export_include_dirs: [
     "protos",
@@ -1884,7 +1896,6 @@
     "protos/perfetto/trace/test_event.proto",
     "protos/perfetto/trace/trace.proto",
     "protos/perfetto/trace/trace_packet.proto",
-    "protos/perfetto/trace/trace_stats.proto",
   ],
   tools: [
     "aprotoc",
@@ -1896,7 +1907,6 @@
     "external/perfetto/protos/perfetto/trace/test_event.pbzero.cc",
     "external/perfetto/protos/perfetto/trace/trace.pbzero.cc",
     "external/perfetto/protos/perfetto/trace/trace_packet.pbzero.cc",
-    "external/perfetto/protos/perfetto/trace/trace_stats.pbzero.cc",
   ],
 }
 
@@ -1908,7 +1918,6 @@
     "protos/perfetto/trace/test_event.proto",
     "protos/perfetto/trace/trace.proto",
     "protos/perfetto/trace/trace_packet.proto",
-    "protos/perfetto/trace/trace_stats.proto",
   ],
   tools: [
     "aprotoc",
@@ -1920,7 +1929,6 @@
     "external/perfetto/protos/perfetto/trace/test_event.pbzero.h",
     "external/perfetto/protos/perfetto/trace/trace.pbzero.h",
     "external/perfetto/protos/perfetto/trace/trace_packet.pbzero.h",
-    "external/perfetto/protos/perfetto/trace/trace_stats.pbzero.h",
   ],
   export_include_dirs: [
     "protos",
@@ -2344,11 +2352,13 @@
     "src/tracing/core/shared_memory_arbiter_impl.cc",
     "src/tracing/core/sliced_protobuf_input_stream.cc",
     "src/tracing/core/startup_trace_writer.cc",
+    "src/tracing/core/startup_trace_writer_registry.cc",
     "src/tracing/core/sys_stats_config.cc",
     "src/tracing/core/test_config.cc",
     "src/tracing/core/trace_buffer.cc",
     "src/tracing/core/trace_config.cc",
     "src/tracing/core/trace_packet.cc",
+    "src/tracing/core/trace_stats.cc",
     "src/tracing/core/trace_writer_impl.cc",
     "src/tracing/core/tracing_service_impl.cc",
     "src/tracing/core/virtual_destructors.cc",
@@ -2676,6 +2686,7 @@
     "src/tracing/core/sliced_protobuf_input_stream.cc",
     "src/tracing/core/sliced_protobuf_input_stream_unittest.cc",
     "src/tracing/core/startup_trace_writer.cc",
+    "src/tracing/core/startup_trace_writer_registry.cc",
     "src/tracing/core/startup_trace_writer_unittest.cc",
     "src/tracing/core/sys_stats_config.cc",
     "src/tracing/core/test_config.cc",
@@ -2684,6 +2695,7 @@
     "src/tracing/core/trace_config.cc",
     "src/tracing/core/trace_packet.cc",
     "src/tracing/core/trace_packet_unittest.cc",
+    "src/tracing/core/trace_stats.cc",
     "src/tracing/core/trace_writer_for_testing.cc",
     "src/tracing/core/trace_writer_impl.cc",
     "src/tracing/core/trace_writer_impl_unittest.cc",
diff --git a/include/perfetto/tracing/core/BUILD.gn b/include/perfetto/tracing/core/BUILD.gn
index b09026c..d9e375c 100644
--- a/include/perfetto/tracing/core/BUILD.gn
+++ b/include/perfetto/tracing/core/BUILD.gn
@@ -28,8 +28,10 @@
     "shared_memory_arbiter.h",
     "slice.h",
     "startup_trace_writer.h",
+    "startup_trace_writer_registry.h",
     "trace_config.h",
     "trace_packet.h",
+    "trace_stats.h",
     "trace_writer.h",
     "tracing_service.h",
   ]
diff --git a/include/perfetto/tracing/core/consumer.h b/include/perfetto/tracing/core/consumer.h
index e6a2614..6e5e779 100644
--- a/include/perfetto/tracing/core/consumer.h
+++ b/include/perfetto/tracing/core/consumer.h
@@ -27,6 +27,7 @@
 
 class TraceConfig;
 class TracePacket;
+class TraceStats;
 
 class PERFETTO_EXPORT Consumer {
  public:
@@ -68,6 +69,10 @@
   // Called back by the Service (or transport layer) after invoking
   // TracingService::ConsumerEndpoint::Attach().
   virtual void OnAttach(bool success, const TraceConfig&) = 0;
+
+  // Called back by the Service (or transport layer) after invoking
+  // TracingService::ConsumerEndpoint::GetTraceStats().
+  virtual void OnTraceStats(bool success, const TraceStats&) = 0;
 };
 
 }  // namespace perfetto
diff --git a/include/perfetto/tracing/core/shared_memory_arbiter.h b/include/perfetto/tracing/core/shared_memory_arbiter.h
index e285648..5dc4c03 100644
--- a/include/perfetto/tracing/core/shared_memory_arbiter.h
+++ b/include/perfetto/tracing/core/shared_memory_arbiter.h
@@ -35,6 +35,7 @@
 
 class CommitDataRequest;
 class StartupTraceWriter;
+class StartupTraceWriterRegistry;
 class SharedMemory;
 class TraceWriter;
 
@@ -51,14 +52,24 @@
   virtual std::unique_ptr<TraceWriter> CreateTraceWriter(
       BufferID target_buffer) = 0;
 
-  // Binds the provided unbound StartupTraceWriter to a new TraceWriter
-  // associated with the arbiter's SMB. Returns |false| if binding failed
-  // because the writer is concurrently writing data to its temporary buffer. In
-  // this case, the caller should retry (it is free to try again immediately or
-  // schedule a wakeup to retry later).
-  virtual bool BindStartupTraceWriter(StartupTraceWriter* writer,
-                                      BufferID target_buffer)
-      PERFETTO_WARN_UNUSED_RESULT = 0;
+  // Binds the provided unbound StartupTraceWriterRegistry to the arbiter's SMB.
+  // Normally this happens when the perfetto service has been initialized and we
+  // want to rebind all the writers created in the early startup phase.
+  //
+  // All StartupTraceWriters created by the registry are bound to the arbiter
+  // and the given target buffer. The writers may not be bound immediately if
+  // they are concurrently being written to. The registry will retry on the
+  // arbiter's TaskRunner until all writers were bound successfully.
+  //
+  // Should only be called on the arbiter's TaskRunner's sequence. By calling
+  // this method, the registry's ownership is transferred to the arbiter. The
+  // arbiter will delete the registry once all writers were bound.
+  //
+  // TODO(eseckler): Make target buffer assignment more flexible (i.e. per
+  // writer). For now, embedders can use multiple registries instead.
+  virtual void BindStartupTraceWriterRegistry(
+      std::unique_ptr<StartupTraceWriterRegistry>,
+      BufferID target_buffer) = 0;
 
   // Notifies the service that all data for the given FlushRequestID has been
   // committed in the shared memory buffer.
diff --git a/include/perfetto/tracing/core/startup_trace_writer.h b/include/perfetto/tracing/core/startup_trace_writer.h
index b93e2f1..d1ad82c 100644
--- a/include/perfetto/tracing/core/startup_trace_writer.h
+++ b/include/perfetto/tracing/core/startup_trace_writer.h
@@ -19,6 +19,7 @@
 
 #include <memory>
 #include <mutex>
+#include <set>
 #include <vector>
 
 #include "perfetto/base/export.h"
@@ -33,6 +34,7 @@
 namespace perfetto {
 
 class SharedMemoryArbiterImpl;
+class StartupTraceWriterRegistryHandle;
 
 namespace protos {
 namespace pbzero {
@@ -44,8 +46,9 @@
 // when the perfetto service is not available yet.
 //
 // Until the service is available, producer threads instantiate an unbound
-// StartupTraceWriter instance and use it to emit trace events. Each writer will
-// record the serialized trace events into a temporary local memory buffer.
+// StartupTraceWriter instance (via a StartupTraceWriterRegistry) and use it to
+// emit trace events. Each writer will record the serialized trace events into a
+// temporary local memory buffer.
 //
 // Once the service is available, the producer binds each StartupTraceWriter to
 // the SMB by calling SharedMemoryArbiter::BindStartupTraceWriter(). The data in
@@ -66,10 +69,6 @@
     : public TraceWriter,
       public protozero::MessageHandleBase::FinalizationListener {
  public:
-  // Create an unbound StartupTraceWriter that can later be bound by calling
-  // BindToTraceWriter().
-  StartupTraceWriter();
-
   // Create a StartupTraceWriter bound to |trace_writer|. Should only be called
   // on the writer thread.
   explicit StartupTraceWriter(std::unique_ptr<TraceWriter> trace_writer);
@@ -87,19 +86,6 @@
 
   uint64_t written() const override;
 
-  // Bind this StartupTraceWriter to the provided SharedMemoryArbiterImpl.
-  // Called by SharedMemoryArbiterImpl::BindStartupTraceWriter().
-  //
-  // This method can be called on any thread. If any data was written locally
-  // before the writer was bound, BindToArbiter() will copy this data into
-  // chunks in the provided target buffer via the SMB. Any future packets will
-  // be directly written into the SMB via a newly obtained TraceWriter from the
-  // arbiter.
-  //
-  // Will fail and return |false| if a concurrent write is in progress.
-  bool BindToArbiter(SharedMemoryArbiterImpl*,
-                     BufferID target_buffer) PERFETTO_WARN_UNUSED_RESULT;
-
   // Returns |true| if the writer thread has observed that the writer was bound
   // to an SMB. Should only be called on the writer thread.
   //
@@ -116,6 +102,31 @@
   size_t used_buffer_size();
 
  private:
+  friend class StartupTraceWriterRegistry;
+  friend class StartupTraceWriterTest;
+
+  // Create an unbound StartupTraceWriter associated with the registry pointed
+  // to by the handle. The writer can later be bound by calling
+  // BindToArbiter(). The registry handle may be nullptr in tests.
+  StartupTraceWriter(std::shared_ptr<StartupTraceWriterRegistryHandle>);
+
+  StartupTraceWriter(const StartupTraceWriter&) = delete;
+  StartupTraceWriter& operator=(const StartupTraceWriter&) = delete;
+
+  // Bind this StartupTraceWriter to the provided SharedMemoryArbiterImpl.
+  // Called by StartupTraceWriterRegistry::BindToArbiter().
+  //
+  // This method can be called on any thread. If any data was written locally
+  // before the writer was bound, BindToArbiter() will copy this data into
+  // chunks in the provided target buffer via the SMB. Any future packets will
+  // be directly written into the SMB via a newly obtained TraceWriter from the
+  // arbiter.
+  //
+  // Will fail and return |false| if a concurrent write is in progress. Returns
+  // |true| if successfully bound and should then not be called again.
+  bool BindToArbiter(SharedMemoryArbiterImpl*,
+                     BufferID target_buffer) PERFETTO_WARN_UNUSED_RESULT;
+
   // protozero::MessageHandleBase::FinalizationListener implementation.
   void OnMessageFinalized(protozero::Message* message) override;
 
@@ -124,6 +135,8 @@
 
   PERFETTO_THREAD_CHECKER(writer_thread_checker_)
 
+  std::shared_ptr<StartupTraceWriterRegistryHandle> registry_handle_;
+
   // Only set and accessed from the writer thread. The writer thread flips this
   // bit when it sees that trace_writer_ is set (while holding the lock).
   // Caching this fact in this variable avoids the need to acquire the lock to
@@ -141,7 +154,6 @@
   std::unique_ptr<protozero::ScatteredStreamWriter> memory_stream_writer_;
 
   std::vector<uint32_t> packet_sizes_;
-  size_t total_payload_size = 0;
 
   // Whether the writer thread is currently writing a TracePacket.
   bool write_in_progress_ = false;
diff --git a/include/perfetto/tracing/core/startup_trace_writer_registry.h b/include/perfetto/tracing/core/startup_trace_writer_registry.h
new file mode 100644
index 0000000..db19862
--- /dev/null
+++ b/include/perfetto/tracing/core/startup_trace_writer_registry.h
@@ -0,0 +1,149 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INCLUDE_PERFETTO_TRACING_CORE_STARTUP_TRACE_WRITER_REGISTRY_H_
+#define INCLUDE_PERFETTO_TRACING_CORE_STARTUP_TRACE_WRITER_REGISTRY_H_
+
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <set>
+#include <vector>
+
+#include "perfetto/base/export.h"
+#include "perfetto/base/weak_ptr.h"
+#include "perfetto/tracing/core/basic_types.h"
+
+namespace perfetto {
+
+class SharedMemoryArbiterImpl;
+class StartupTraceWriter;
+class StartupTraceWriterRegistry;
+
+namespace base {
+class TaskRunner;
+}  // namespace base
+
+// Notifies the registry about the destruction of a StartupTraceWriter, provided
+// the registry itself wasn't deleted yet. The indirection via the handle is
+// necessary to avoid potential deadlocks caused by lock order inversion. These
+// issues are avoided by locking on the handle's common lock in the destructors
+// of the registry and writer.
+class StartupTraceWriterRegistryHandle {
+ public:
+  explicit StartupTraceWriterRegistryHandle(StartupTraceWriterRegistry*);
+
+  // Called by StartupTraceWriter destructor.
+  void OnWriterDestroyed(StartupTraceWriter*);
+
+  // Called by StartupTraceWriterRegistry destructor.
+  void OnRegistryDestroyed();
+
+ private:
+  StartupTraceWriterRegistryHandle(const StartupTraceWriterRegistryHandle&) =
+      delete;
+  StartupTraceWriterRegistryHandle& operator=(
+      const StartupTraceWriterRegistryHandle&) = delete;
+
+  std::mutex lock_;
+  StartupTraceWriterRegistry* registry_;
+};
+
+// Embedders can use this registry to create unbound StartupTraceWriters during
+// startup, and later bind them all safely to an arbiter and target buffer.
+class PERFETTO_EXPORT StartupTraceWriterRegistry {
+ public:
+  StartupTraceWriterRegistry();
+  ~StartupTraceWriterRegistry();
+
+  // Returns a new unbound StartupTraceWriter. Should only be called while
+  // unbound. Usually called on a writer thread.
+  std::unique_ptr<StartupTraceWriter> CreateUnboundTraceWriter();
+
+  // Return an unbound StartupTraceWriter back to the registry before it could
+  // be bound (usually called when the writer's thread is destroyed). The
+  // registry will keep this writer alive until the registry is bound to an
+  // arbiter (or destroyed itself). This way, its buffered data is retained.
+  // Should only be called while unbound. All packets written to the passed
+  // writer should have been completed and it should no longer be used to write
+  // data after calling this method.
+  void ReturnUnboundTraceWriter(std::unique_ptr<StartupTraceWriter>);
+
+  // Binds all StartupTraceWriters created by this registry to the given arbiter
+  // and target buffer. Should only be called once and on the passed
+  // TaskRunner's sequence. See
+  // SharedMemoryArbiter::BindStartupTraceWriterRegistry() for details.
+  //
+  // Note that the writers may not be bound synchronously if they are
+  // concurrently being written to. The registry will retry on the passed
+  // TaskRunner until all writers were bound successfully.
+  //
+  // Calls |on_bound_callback| asynchronously on the passed TaskRunner once all
+  // writers were bound.
+  void BindToArbiter(
+      SharedMemoryArbiterImpl*,
+      BufferID target_buffer,
+      base::TaskRunner*,
+      std::function<void(StartupTraceWriterRegistry*)> on_bound_callback);
+
+ private:
+  friend class StartupTraceWriterRegistryHandle;
+  friend class StartupTraceWriterTest;
+
+  StartupTraceWriterRegistry(const StartupTraceWriterRegistry&) = delete;
+  StartupTraceWriterRegistry& operator=(const StartupTraceWriterRegistry&) =
+      delete;
+
+  // Called by StartupTraceWriterRegistryHandle.
+  void OnStartupTraceWriterDestroyed(StartupTraceWriter*);
+
+  // Try to bind the remaining unbound writers and post a continuation to
+  // |task_runner_| if any writers could not be bound.
+  void TryBindWriters();
+
+  // Notifies the arbiter when we have bound all writers. May delete |this|.
+  void OnUnboundWritersRemovedLocked();
+
+  std::shared_ptr<StartupTraceWriterRegistryHandle> handle_;
+
+  // Begin lock-protected members.
+  std::mutex lock_;
+
+  // Unbound writers that we handed out to writer threads. These writers may be
+  // concurrently written to by the writer threads.
+  std::set<StartupTraceWriter*> unbound_writers_;
+
+  // Unbound writers that writer threads returned to the registry by calling
+  // ReturnUnboundTraceWriter(). Writers are removed from |unbound_writers_|
+  // when they are added to |unbound_owned_writers_|. No new data can be written
+  // to these writers.
+  std::vector<std::unique_ptr<StartupTraceWriter>> unbound_owned_writers_;
+
+  SharedMemoryArbiterImpl* arbiter_ = nullptr;  // |nullptr| while unbound.
+  BufferID target_buffer_ = 0;
+  base::TaskRunner* task_runner_;
+  std::function<void(StartupTraceWriterRegistry*)> on_bound_callback_ = nullptr;
+
+  // Keep at the end. Initialized during |BindToArbiter()|, like |task_runner_|.
+  // Weak pointers are only valid on |task_runner_|'s thread/sequence.
+  std::unique_ptr<base::WeakPtrFactory<StartupTraceWriterRegistry>>
+      weak_ptr_factory_;
+  // End lock-protected members.
+};
+
+}  // namespace perfetto
+
+#endif  // INCLUDE_PERFETTO_TRACING_CORE_STARTUP_TRACE_WRITER_REGISTRY_H_
diff --git a/include/perfetto/tracing/core/trace_stats.h b/include/perfetto/tracing/core/trace_stats.h
new file mode 100644
index 0000000..5d02163
--- /dev/null
+++ b/include/perfetto/tracing/core/trace_stats.h
@@ -0,0 +1,204 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*******************************************************************************
+ * AUTOGENERATED - DO NOT EDIT
+ *******************************************************************************
+ * This file has been generated from the protobuf message
+ * perfetto/common/trace_stats.proto
+ * by
+ * ../../tools/proto_to_cpp/proto_to_cpp.cc.
+ * If you need to make changes here, change the .proto file and then run
+ * ./tools/gen_tracing_cpp_headers_from_protos
+ */
+
+#ifndef INCLUDE_PERFETTO_TRACING_CORE_TRACE_STATS_H_
+#define INCLUDE_PERFETTO_TRACING_CORE_TRACE_STATS_H_
+
+#include <stdint.h>
+#include <string>
+#include <type_traits>
+#include <vector>
+
+#include "perfetto/base/export.h"
+
+// Forward declarations for protobuf types.
+namespace perfetto {
+namespace protos {
+class TraceStats;
+class TraceStats_BufferStats;
+}  // namespace protos
+}  // namespace perfetto
+
+namespace perfetto {
+
+class PERFETTO_EXPORT TraceStats {
+ public:
+  class PERFETTO_EXPORT BufferStats {
+   public:
+    BufferStats();
+    ~BufferStats();
+    BufferStats(BufferStats&&) noexcept;
+    BufferStats& operator=(BufferStats&&);
+    BufferStats(const BufferStats&);
+    BufferStats& operator=(const BufferStats&);
+
+    // Conversion methods from/to the corresponding protobuf types.
+    void FromProto(const perfetto::protos::TraceStats_BufferStats&);
+    void ToProto(perfetto::protos::TraceStats_BufferStats*) const;
+
+    uint64_t buffer_size() const { return buffer_size_; }
+    void set_buffer_size(uint64_t value) { buffer_size_ = value; }
+
+    uint64_t bytes_written() const { return bytes_written_; }
+    void set_bytes_written(uint64_t value) { bytes_written_ = value; }
+
+    uint64_t bytes_overwritten() const { return bytes_overwritten_; }
+    void set_bytes_overwritten(uint64_t value) { bytes_overwritten_ = value; }
+
+    uint64_t bytes_read() const { return bytes_read_; }
+    void set_bytes_read(uint64_t value) { bytes_read_ = value; }
+
+    uint64_t padding_bytes_written() const { return padding_bytes_written_; }
+    void set_padding_bytes_written(uint64_t value) {
+      padding_bytes_written_ = value;
+    }
+
+    uint64_t padding_bytes_cleared() const { return padding_bytes_cleared_; }
+    void set_padding_bytes_cleared(uint64_t value) {
+      padding_bytes_cleared_ = value;
+    }
+
+    uint64_t chunks_written() const { return chunks_written_; }
+    void set_chunks_written(uint64_t value) { chunks_written_ = value; }
+
+    uint64_t chunks_rewritten() const { return chunks_rewritten_; }
+    void set_chunks_rewritten(uint64_t value) { chunks_rewritten_ = value; }
+
+    uint64_t chunks_overwritten() const { return chunks_overwritten_; }
+    void set_chunks_overwritten(uint64_t value) { chunks_overwritten_ = value; }
+
+    uint64_t chunks_read() const { return chunks_read_; }
+    void set_chunks_read(uint64_t value) { chunks_read_ = value; }
+
+    uint64_t chunks_committed_out_of_order() const {
+      return chunks_committed_out_of_order_;
+    }
+    void set_chunks_committed_out_of_order(uint64_t value) {
+      chunks_committed_out_of_order_ = value;
+    }
+
+    uint64_t write_wrap_count() const { return write_wrap_count_; }
+    void set_write_wrap_count(uint64_t value) { write_wrap_count_ = value; }
+
+    uint64_t patches_succeeded() const { return patches_succeeded_; }
+    void set_patches_succeeded(uint64_t value) { patches_succeeded_ = value; }
+
+    uint64_t patches_failed() const { return patches_failed_; }
+    void set_patches_failed(uint64_t value) { patches_failed_ = value; }
+
+    uint64_t readaheads_succeeded() const { return readaheads_succeeded_; }
+    void set_readaheads_succeeded(uint64_t value) {
+      readaheads_succeeded_ = value;
+    }
+
+    uint64_t readaheads_failed() const { return readaheads_failed_; }
+    void set_readaheads_failed(uint64_t value) { readaheads_failed_ = value; }
+
+    uint64_t abi_violations() const { return abi_violations_; }
+    void set_abi_violations(uint64_t value) { abi_violations_ = value; }
+
+   private:
+    uint64_t buffer_size_ = {};
+    uint64_t bytes_written_ = {};
+    uint64_t bytes_overwritten_ = {};
+    uint64_t bytes_read_ = {};
+    uint64_t padding_bytes_written_ = {};
+    uint64_t padding_bytes_cleared_ = {};
+    uint64_t chunks_written_ = {};
+    uint64_t chunks_rewritten_ = {};
+    uint64_t chunks_overwritten_ = {};
+    uint64_t chunks_read_ = {};
+    uint64_t chunks_committed_out_of_order_ = {};
+    uint64_t write_wrap_count_ = {};
+    uint64_t patches_succeeded_ = {};
+    uint64_t patches_failed_ = {};
+    uint64_t readaheads_succeeded_ = {};
+    uint64_t readaheads_failed_ = {};
+    uint64_t abi_violations_ = {};
+
+    // Allows to preserve unknown protobuf fields for compatibility
+    // with future versions of .proto files.
+    std::string unknown_fields_;
+  };
+
+  TraceStats();
+  ~TraceStats();
+  TraceStats(TraceStats&&) noexcept;
+  TraceStats& operator=(TraceStats&&);
+  TraceStats(const TraceStats&);
+  TraceStats& operator=(const TraceStats&);
+
+  // Conversion methods from/to the corresponding protobuf types.
+  void FromProto(const perfetto::protos::TraceStats&);
+  void ToProto(perfetto::protos::TraceStats*) const;
+
+  int buffer_stats_size() const {
+    return static_cast<int>(buffer_stats_.size());
+  }
+  const std::vector<BufferStats>& buffer_stats() const { return buffer_stats_; }
+  BufferStats* add_buffer_stats() {
+    buffer_stats_.emplace_back();
+    return &buffer_stats_.back();
+  }
+
+  uint32_t producers_connected() const { return producers_connected_; }
+  void set_producers_connected(uint32_t value) { producers_connected_ = value; }
+
+  uint64_t producers_seen() const { return producers_seen_; }
+  void set_producers_seen(uint64_t value) { producers_seen_ = value; }
+
+  uint32_t data_sources_registered() const { return data_sources_registered_; }
+  void set_data_sources_registered(uint32_t value) {
+    data_sources_registered_ = value;
+  }
+
+  uint64_t data_sources_seen() const { return data_sources_seen_; }
+  void set_data_sources_seen(uint64_t value) { data_sources_seen_ = value; }
+
+  uint32_t tracing_sessions() const { return tracing_sessions_; }
+  void set_tracing_sessions(uint32_t value) { tracing_sessions_ = value; }
+
+  uint32_t total_buffers() const { return total_buffers_; }
+  void set_total_buffers(uint32_t value) { total_buffers_ = value; }
+
+ private:
+  std::vector<BufferStats> buffer_stats_;
+  uint32_t producers_connected_ = {};
+  uint64_t producers_seen_ = {};
+  uint32_t data_sources_registered_ = {};
+  uint64_t data_sources_seen_ = {};
+  uint32_t tracing_sessions_ = {};
+  uint32_t total_buffers_ = {};
+
+  // Allows to preserve unknown protobuf fields for compatibility
+  // with future versions of .proto files.
+  std::string unknown_fields_;
+};
+
+}  // namespace perfetto
+
+#endif  // INCLUDE_PERFETTO_TRACING_CORE_TRACE_STATS_H_
diff --git a/include/perfetto/tracing/core/tracing_service.h b/include/perfetto/tracing/core/tracing_service.h
index c4f369e..b80f2a8 100644
--- a/include/perfetto/tracing/core/tracing_service.h
+++ b/include/perfetto/tracing/core/tracing_service.h
@@ -167,6 +167,9 @@
 
     // Will call OnAttach().
     virtual void Attach(const std::string& key) = 0;
+
+    // Will call OnTraceStats().
+    virtual void GetTraceStats() = 0;
   };  // class ConsumerEndpoint.
 
   // Implemented in src/core/tracing_service_impl.cc .
diff --git a/protos/perfetto/common/BUILD.gn b/protos/perfetto/common/BUILD.gn
index e2b22b8..fb34919 100644
--- a/protos/perfetto/common/BUILD.gn
+++ b/protos/perfetto/common/BUILD.gn
@@ -20,6 +20,7 @@
   "commit_data_request.proto",
   "android_log_constants.proto",
   "sys_stats_counters.proto",
+  "trace_stats.proto",
 ]
 
 # Proto messages that are required by the IPC service definitions but have also
diff --git a/protos/perfetto/trace/trace_stats.proto b/protos/perfetto/common/trace_stats.proto
similarity index 63%
rename from protos/perfetto/trace/trace_stats.proto
rename to protos/perfetto/common/trace_stats.proto
index cd69e2b..5993e3e 100644
--- a/protos/perfetto/trace/trace_stats.proto
+++ b/protos/perfetto/common/trace_stats.proto
@@ -23,11 +23,45 @@
 message TraceStats {
   // From TraceBuffer::Stats.
   //
-  // NEXT ID: 12
+  // Next id: 18.
   message BufferStats {
-    // Num. bytes written into the circular buffer.
+    // Size of the circular buffer in bytes.
+    optional uint64 buffer_size = 12;
+
+    // Num. bytes written into the circular buffer, including chunk headers.
     optional uint64 bytes_written = 1;
 
+    // Num. bytes overwritten before they have been read (i.e. loss of data).
+    optional uint64 bytes_overwritten = 13;
+
+    // Total size of chunks that were fully read from the circular buffer by the
+    // consumer. This may not be equal to |bytes_written| either in the middle
+    // of tracing, or if |chunks_overwritten| is non-zero. Note that this is the
+    // size of the chunks read from the buffer, including chunk headers, which
+    // will be different from the total size of packets returned to the
+    // consumer.
+    //
+    // The current utilization of the trace buffer (mid-tracing) can be obtained
+    // by subtracting |bytes_read| and |bytes_overwritten| from |bytes_written|,
+    // adding the difference of |padding_bytes_written| and
+    // |padding_bytes_cleared|, and comparing this sum to the |buffer_size|.
+    // Note that this represents the total size of buffered data in the buffer,
+    // yet this data may be spread non-contiguously through the buffer and may
+    // be overwritten before the utilization reaches 100%.
+    optional uint64 bytes_read = 14;
+
+    // Num. bytes that were allocated as padding between chunks in the circular
+    // buffer.
+    optional uint64 padding_bytes_written = 15;
+
+    // Num. of padding bytes that were removed from the circular buffer when
+    // they were overwritten.
+    //
+    // The difference between |padding_bytes_written| and
+    // |padding_bytes_cleared| denotes the total size of padding currently
+    // present in the buffer.
+    optional uint64 padding_bytes_cleared = 16;
+
     // Num. chunks (!= packets) written into the buffer.
     optional uint64 chunks_written = 2;
 
@@ -38,6 +72,11 @@
     // Num. chunks overwritten before they have been read (i.e. loss of data).
     optional uint64 chunks_overwritten = 3;
 
+    // Num. chunks (!= packets) that were fully read from the circular buffer by
+    // the consumer. This may not be equal to |chunks_written| either in the
+    // middle of tracing, or if |chunks_overwritten| is non-zero.
+    optional uint64 chunks_read = 17;
+
     // Num. chunks that were committed out of order.
     optional uint64 chunks_committed_out_of_order = 11;
 
@@ -81,7 +120,7 @@
   // Num. data sources registered for all trace sessions.
   optional uint32 data_sources_registered = 4;
 
-  // Num. data sources ever seen for all trace sessions since startupb
+  // Num. data sources ever seen for all trace sessions since startup.
   optional uint64 data_sources_seen = 5;
 
   // Num. concurrently active tracing sessions.
diff --git a/protos/perfetto/ipc/consumer_port.proto b/protos/perfetto/ipc/consumer_port.proto
index 944090e..6e7affc 100644
--- a/protos/perfetto/ipc/consumer_port.proto
+++ b/protos/perfetto/ipc/consumer_port.proto
@@ -17,6 +17,7 @@
 syntax = "proto2";
 option optimize_for = LITE_RUNTIME;
 
+import "perfetto/common/trace_stats.proto";
 import "perfetto/config/trace_config.proto";
 
 package perfetto.protos;
@@ -85,6 +86,10 @@
   // in the standard non-detached case.
   rpc Attach(AttachRequest) returns (AttachResponse) {}
 
+  // Allows the consumer to obtain statistics about the current tracing session,
+  // such as buffer usage stats. Intended for debugging or UI use.
+  rpc GetTraceStats(GetTraceStatsRequest) returns (GetTraceStatsResponse) {}
+
   // TODO rpc ListDataSources(), for the UI.
 }
 
@@ -161,12 +166,22 @@
 message DetachRequest {
   optional string key = 1;
 }
+
 message DetachResponse {}
 
 // Arguments for rpc Attach.
 message AttachRequest {
   optional string key = 1;
 }
+
 message AttachResponse {
   optional protos.TraceConfig trace_config = 1;
 }
+
+// Arguments for rpc GetTraceStats.
+
+message GetTraceStatsRequest {}
+
+message GetTraceStatsResponse {
+  optional TraceStats trace_stats = 1;
+}
diff --git a/protos/perfetto/trace/BUILD.gn b/protos/perfetto/trace/BUILD.gn
index ad3a35b..3dbef21 100644
--- a/protos/perfetto/trace/BUILD.gn
+++ b/protos/perfetto/trace/BUILD.gn
@@ -18,10 +18,7 @@
 
 # Common protos used by both the ":minimal_lite" target (for the service) and
 # the generic ":lite" target
-proto_sources_minimal = [
-  "clock_snapshot.proto",
-  "trace_stats.proto",
-]
+proto_sources_minimal = [ "clock_snapshot.proto" ]
 
 proto_sources_trusted = [ "trusted_packet.proto" ]
 
diff --git a/protos/perfetto/trace/chrome/chrome_trace_packet.proto b/protos/perfetto/trace/chrome/chrome_trace_packet.proto
index 76eab46..0c72eae 100644
--- a/protos/perfetto/trace/chrome/chrome_trace_packet.proto
+++ b/protos/perfetto/trace/chrome/chrome_trace_packet.proto
@@ -26,10 +26,10 @@
 syntax = "proto3";
 option optimize_for = LITE_RUNTIME;
 
+import "perfetto/common/trace_stats.proto";
 import "perfetto/config/trace_config.proto";
 import "perfetto/trace/chrome/chrome_trace_event.proto";
 import "perfetto/trace/clock_snapshot.proto";
-import "perfetto/trace/trace_stats.proto";
 
 package perfetto.protos;
 
diff --git a/protos/perfetto/trace/trace_packet.proto b/protos/perfetto/trace/trace_packet.proto
index 6a118d1..9a25982 100644
--- a/protos/perfetto/trace/trace_packet.proto
+++ b/protos/perfetto/trace/trace_packet.proto
@@ -17,6 +17,7 @@
 syntax = "proto2";
 option optimize_for = LITE_RUNTIME;
 
+import "perfetto/common/trace_stats.proto";
 import "perfetto/config/trace_config.proto";
 import "perfetto/trace/android/android_log.proto";
 import "perfetto/trace/chrome/chrome_trace_event.proto";
@@ -30,7 +31,6 @@
 import "perfetto/trace/ps/process_tree.proto";
 import "perfetto/trace/sys_stats/sys_stats.proto";
 import "perfetto/trace/test_event.proto";
-import "perfetto/trace/trace_stats.proto";
 
 package perfetto.protos;
 
diff --git a/protos/perfetto/trace/trusted_packet.proto b/protos/perfetto/trace/trusted_packet.proto
index 758b3ca..84c6a52 100644
--- a/protos/perfetto/trace/trusted_packet.proto
+++ b/protos/perfetto/trace/trusted_packet.proto
@@ -26,9 +26,9 @@
 syntax = "proto3";
 option optimize_for = LITE_RUNTIME;
 
+import "perfetto/common/trace_stats.proto";
 import "perfetto/config/trace_config.proto";
 import "perfetto/trace/clock_snapshot.proto";
-import "perfetto/trace/trace_stats.proto";
 
 package perfetto.protos;
 
diff --git a/src/perfetto_cmd/perfetto_cmd.cc b/src/perfetto_cmd/perfetto_cmd.cc
index ede5c5b..bf890b7 100644
--- a/src/perfetto_cmd/perfetto_cmd.cc
+++ b/src/perfetto_cmd/perfetto_cmd.cc
@@ -706,6 +706,11 @@
   }
 }
 
+void PerfettoCmd::OnTraceStats(bool /*success*/,
+                               const TraceStats& /*trace_stats*/) {
+  // TODO(eseckler): Support GetTraceStats().
+}
+
 int __attribute__((visibility("default")))
 PerfettoCmdMain(int argc, char** argv) {
   g_consumer_cmd = new perfetto::PerfettoCmd();
diff --git a/src/perfetto_cmd/perfetto_cmd.h b/src/perfetto_cmd/perfetto_cmd.h
index 943f204..8c6fbc7 100644
--- a/src/perfetto_cmd/perfetto_cmd.h
+++ b/src/perfetto_cmd/perfetto_cmd.h
@@ -60,6 +60,7 @@
   void OnTraceData(std::vector<TracePacket>, bool has_more) override;
   void OnDetach(bool) override;
   void OnAttach(bool, const TraceConfig&) override;
+  void OnTraceStats(bool, const TraceStats&) override;
 
   void SignalCtrlC() { ctrl_c_evt_.Notify(); }
 
diff --git a/src/tracing/BUILD.gn b/src/tracing/BUILD.gn
index ae49d44..06969cc 100644
--- a/src/tracing/BUILD.gn
+++ b/src/tracing/BUILD.gn
@@ -55,12 +55,14 @@
     "core/sliced_protobuf_input_stream.cc",
     "core/sliced_protobuf_input_stream.h",
     "core/startup_trace_writer.cc",
+    "core/startup_trace_writer_registry.cc",
     "core/sys_stats_config.cc",
     "core/test_config.cc",
     "core/trace_buffer.cc",
     "core/trace_buffer.h",
     "core/trace_config.cc",
     "core/trace_packet.cc",
+    "core/trace_stats.cc",
     "core/trace_writer_impl.cc",
     "core/trace_writer_impl.h",
     "core/tracing_service_impl.cc",
diff --git a/src/tracing/api_impl/consumer_api.cc b/src/tracing/api_impl/consumer_api.cc
index 438f00d..544926e 100644
--- a/src/tracing/api_impl/consumer_api.cc
+++ b/src/tracing/api_impl/consumer_api.cc
@@ -91,6 +91,7 @@
   void OnTraceData(std::vector<TracePacket>, bool has_more) override;
   void OnDetach(bool) override;
   void OnAttach(bool, const TraceConfig&) override;
+  void OnTraceStats(bool, const TraceStats&) override;
 
  private:
   TracingSession(const TracingSession&) = delete;
@@ -235,6 +236,11 @@
   PERFETTO_DCHECK(false);  // Should never be called, Attach() is not used here.
 }
 
+void TracingSession::OnTraceStats(bool, const TraceStats&) {
+  // Should never be called, GetTraceStats() is not used here.
+  PERFETTO_DCHECK(false);
+}
+
 void TracingSession::DestroyConnection() {
   // Destroys the connection in a separate task. This is to avoid destroying
   // the IPC connection directly from within the IPC callback.
diff --git a/src/tracing/core/service_impl_unittest.cc b/src/tracing/core/service_impl_unittest.cc
index 2a23b15..1cf8ff6 100644
--- a/src/tracing/core/service_impl_unittest.cc
+++ b/src/tracing/core/service_impl_unittest.cc
@@ -1374,4 +1374,33 @@
   consumer->WaitForTracingDisabled(5000);
 }
 
+TEST_F(TracingServiceImplTest, GetTraceStats) {
+  std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+  consumer->Connect(svc.get());
+
+  consumer->GetTraceStats();
+  consumer->WaitForTraceStats(false);
+
+  std::unique_ptr<MockProducer> producer = CreateMockProducer();
+  producer->Connect(svc.get(), "mock_producer");
+  producer->RegisterDataSource("data_source");
+
+  TraceConfig trace_config;
+  trace_config.add_buffers()->set_size_kb(128);
+  auto* ds_config = trace_config.add_data_sources()->mutable_config();
+  ds_config->set_name("data_source");
+
+  consumer->EnableTracing(trace_config);
+  producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
+  producer->WaitForDataSourceStart("data_source");
+
+  consumer->GetTraceStats();
+  consumer->WaitForTraceStats(true);
+
+  consumer->DisableTracing();
+  producer->WaitForDataSourceStop("data_source");
+  consumer->WaitForTracingDisabled();
+}
+
 }  // namespace perfetto
diff --git a/src/tracing/core/shared_memory_arbiter_impl.cc b/src/tracing/core/shared_memory_arbiter_impl.cc
index e4d0e2d..9a3f2d6 100644
--- a/src/tracing/core/shared_memory_arbiter_impl.cc
+++ b/src/tracing/core/shared_memory_arbiter_impl.cc
@@ -21,7 +21,7 @@
 #include "perfetto/base/time.h"
 #include "perfetto/tracing/core/commit_data_request.h"
 #include "perfetto/tracing/core/shared_memory.h"
-#include "perfetto/tracing/core/startup_trace_writer.h"
+#include "perfetto/tracing/core/startup_trace_writer_registry.h"
 #include "src/tracing/core/null_trace_writer.h"
 #include "src/tracing/core/trace_writer_impl.h"
 
@@ -276,9 +276,35 @@
       new TraceWriterImpl(this, id, target_buffer));
 }
 
-bool SharedMemoryArbiterImpl::BindStartupTraceWriter(StartupTraceWriter* writer,
-                                                     BufferID target_buffer) {
-  return writer->BindToArbiter(this, target_buffer);
+void SharedMemoryArbiterImpl::BindStartupTraceWriterRegistry(
+    std::unique_ptr<StartupTraceWriterRegistry> registry,
+    BufferID target_buffer) {
+  // The registry will be owned by the arbiter, so it's safe to capture |this|
+  // in the callback.
+  auto on_bound_callback = [this](StartupTraceWriterRegistry* bound_registry) {
+    std::unique_ptr<StartupTraceWriterRegistry> registry_to_delete;
+    {
+      std::lock_guard<std::mutex> scoped_lock(lock_);
+
+      for (auto it = startup_trace_writer_registries_.begin();
+           it != startup_trace_writer_registries_.end(); it++) {
+        if (it->get() == bound_registry) {
+          // We can't delete the registry while the arbiter's lock is held
+          // (to avoid lock inversion).
+          registry_to_delete = std::move(*it);
+          startup_trace_writer_registries_.erase(it);
+          break;
+        }
+      }
+    }
+
+    // The registry should have been in |startup_trace_writer_registries_|.
+    PERFETTO_DCHECK(registry_to_delete);
+    registry_to_delete.reset();
+  };
+  registry->BindToArbiter(this, target_buffer, task_runner_, on_bound_callback);
+  std::lock_guard<std::mutex> scoped_lock(lock_);
+  startup_trace_writer_registries_.push_back(std::move(registry));
 }
 
 void SharedMemoryArbiterImpl::NotifyFlushComplete(FlushRequestID req_id) {
diff --git a/src/tracing/core/shared_memory_arbiter_impl.h b/src/tracing/core/shared_memory_arbiter_impl.h
index 0ef8abe..0707bcf 100644
--- a/src/tracing/core/shared_memory_arbiter_impl.h
+++ b/src/tracing/core/shared_memory_arbiter_impl.h
@@ -29,6 +29,7 @@
 #include "perfetto/tracing/core/basic_types.h"
 #include "perfetto/tracing/core/shared_memory_abi.h"
 #include "perfetto/tracing/core/shared_memory_arbiter.h"
+#include "perfetto/tracing/core/startup_trace_writer_registry.h"
 #include "src/tracing/core/id_allocator.h"
 
 namespace perfetto {
@@ -105,13 +106,15 @@
   // See include/perfetto/tracing/core/shared_memory_arbiter.h for comments.
   std::unique_ptr<TraceWriter> CreateTraceWriter(
       BufferID target_buffer) override;
-  bool BindStartupTraceWriter(StartupTraceWriter* writer,
-                              BufferID target_buffer) override;
+  void BindStartupTraceWriterRegistry(
+      std::unique_ptr<StartupTraceWriterRegistry>,
+      BufferID target_buffer) override;
 
   void NotifyFlushComplete(FlushRequestID) override;
 
  private:
   friend class TraceWriterImpl;
+  friend class StartupTraceWriterTest;
 
   static SharedMemoryABI::PageLayout default_page_layout;
 
@@ -137,6 +140,10 @@
   std::unique_ptr<CommitDataRequest> commit_data_req_;
   size_t bytes_pending_commit_ = 0;  // SUM(chunk.size() : commit_data_req_).
   IdAllocator<WriterID> active_writer_ids_;
+  // Registries whose BindToArbiter() call is still in progress. Each registry
+  // is destroyed once its binding completes or when the arbiter is destroyed.
+  std::vector<std::unique_ptr<StartupTraceWriterRegistry>>
+      startup_trace_writer_registries_;
   // --- End lock-protected members ---
 
   // Keep at the end.
diff --git a/src/tracing/core/startup_trace_writer.cc b/src/tracing/core/startup_trace_writer.cc
index eb1d320..f4f7f05 100644
--- a/src/tracing/core/startup_trace_writer.cc
+++ b/src/tracing/core/startup_trace_writer.cc
@@ -16,9 +16,13 @@
 
 #include "perfetto/tracing/core/startup_trace_writer.h"
 
+#include <numeric>
+
 #include "perfetto/base/logging.h"
+#include "perfetto/protozero/proto_utils.h"
 #include "perfetto/trace/trace_packet.pbzero.h"
 #include "perfetto/tracing/core/shared_memory_abi.h"
+#include "perfetto/tracing/core/startup_trace_writer_registry.h"
 #include "src/tracing/core/patch_list.h"
 #include "src/tracing/core/shared_memory_arbiter_impl.h"
 
@@ -54,9 +58,12 @@
   LocalBufferReader(protozero::ScatteredHeapBuffer* buffer)
       : buffer_slices_(buffer->slices()), cur_slice_(buffer_slices_.begin()) {}
 
-  size_t ReadBytes(SharedMemoryABI::Chunk* target_chunk, size_t num_bytes) {
-    PERFETTO_CHECK(target_chunk->payload_size() >= num_bytes);
-    uint8_t* chunk_payload = target_chunk->payload_begin();
+  size_t ReadBytes(SharedMemoryABI::Chunk* target_chunk,
+                   size_t num_bytes,
+                   size_t cur_payload_size) {
+    PERFETTO_CHECK(target_chunk->payload_size() >=
+                   num_bytes + cur_payload_size);
+    uint8_t* target_ptr = target_chunk->payload_begin() + cur_payload_size;
     size_t bytes_read = 0;
     while (bytes_read < num_bytes) {
       if (cur_slice_ == buffer_slices_.end())
@@ -72,8 +79,8 @@
 
       size_t read_size = std::min(num_bytes - bytes_read,
                                   cur_slice_range.size() - cur_slice_offset_);
-      memcpy(chunk_payload + bytes_read,
-             cur_slice_range.begin + cur_slice_offset_, read_size);
+      memcpy(target_ptr + bytes_read, cur_slice_range.begin + cur_slice_offset_,
+             read_size);
       cur_slice_offset_ += read_size;
       bytes_read += read_size;
 
@@ -84,6 +91,23 @@
     return bytes_read;
   }
 
+  size_t TotalUsedSize() const {
+    size_t used_size = 0;
+    for (const auto& slice : buffer_slices_) {
+      used_size += slice.GetUsedRange().size();
+    }
+    return used_size;
+  }
+
+  bool DidReadAllData() const {
+    if (cur_slice_ == buffer_slices_.end())
+      return true;
+
+    const auto next_slice = cur_slice_ + 1;
+    return next_slice == buffer_slices_.end() &&
+           cur_slice_->GetUsedRange().size() == cur_slice_offset_;
+  }
+
  private:
   const std::vector<protozero::ScatteredHeapBuffer::Slice>& buffer_slices_;
 
@@ -96,8 +120,10 @@
 
 }  // namespace
 
-StartupTraceWriter::StartupTraceWriter()
-    : memory_buffer_(new protozero::ScatteredHeapBuffer()),
+StartupTraceWriter::StartupTraceWriter(
+    std::shared_ptr<StartupTraceWriterRegistryHandle> registry_handle)
+    : registry_handle_(std::move(registry_handle)),
+      memory_buffer_(new protozero::ScatteredHeapBuffer()),
       memory_stream_writer_(
           new protozero::ScatteredStreamWriter(memory_buffer_.get())) {
   memory_buffer_->set_writer(memory_stream_writer_.get());
@@ -108,7 +134,10 @@
     std::unique_ptr<TraceWriter> trace_writer)
     : was_bound_(true), trace_writer_(std::move(trace_writer)) {}
 
-StartupTraceWriter::~StartupTraceWriter() = default;
+StartupTraceWriter::~StartupTraceWriter() {
+  if (registry_handle_)
+    registry_handle_->OnWriterDestroyed(this);
+}
 
 bool StartupTraceWriter::BindToArbiter(SharedMemoryArbiterImpl* arbiter,
                                        BufferID target_buffer) {
@@ -126,18 +155,18 @@
     PERFETTO_DCHECK(cur_packet_->is_finalized());
     cur_packet_.reset();
   }
-  memory_stream_writer_.reset();
 
   trace_writer_ = arbiter->CreateTraceWriter(target_buffer);
-
   ChunkID next_chunk_id = CommitLocalBufferChunks(
       arbiter, trace_writer_->writer_id(), target_buffer);
-  memory_buffer_.reset();
 
   // The real TraceWriter should start writing at the subsequent chunk ID.
   bool success = trace_writer_->SetFirstChunkId(next_chunk_id);
   PERFETTO_DCHECK(success);
 
+  memory_stream_writer_.reset();
+  memory_buffer_.reset();
+
   return true;
 }
 
@@ -195,7 +224,8 @@
   }
 
   // Can't flush while unbound.
-  callback();
+  if (callback)
+    callback();
 }
 
 WriterID StartupTraceWriter::writer_id() const {
@@ -245,7 +275,6 @@
   // Finalize() is a no-op because the packet is already finalized.
   uint32_t packet_size = cur_packet_->Finalize();
   packet_sizes_.push_back(packet_size);
-  total_payload_size += packet_size;
 
   // Write is complete, reset the flag to allow binding.
   std::lock_guard<std::mutex> lock(lock_);
@@ -266,8 +295,13 @@
   if (packet_sizes_.empty() || !writer_id)
     return 0;
 
+  memory_buffer_->AdjustUsedSizeOfCurrentSlice();
   LocalBufferReader local_buffer_reader(memory_buffer_.get());
 
+  PERFETTO_DCHECK(local_buffer_reader.TotalUsedSize() ==
+                  std::accumulate(packet_sizes_.begin(), packet_sizes_.end(),
+                                  static_cast<size_t>(0u)));
+
   ChunkID next_chunk_id = 0;
   SharedMemoryABI::Chunk cur_chunk =
       NewChunk(arbiter, writer_id, next_chunk_id++, false);
@@ -284,29 +318,41 @@
     do {
       uint32_t fragment_size = static_cast<uint32_t>(
           std::min(static_cast<size_t>(remaining_packet_size),
-                   max_payload_size - cur_payload_size));
+                   max_payload_size - cur_payload_size -
+                       SharedMemoryABI::kPacketHeaderSize));
+      // Write packet header, i.e. the fragment size.
+      protozero::proto_utils::WriteRedundantVarInt(
+          fragment_size, cur_chunk.payload_begin() + cur_payload_size);
+      cur_payload_size += SharedMemoryABI::kPacketHeaderSize;
+
+      // Copy packet content into the chunk.
+      size_t bytes_read = local_buffer_reader.ReadBytes(
+          &cur_chunk, fragment_size, cur_payload_size);
+      PERFETTO_DCHECK(bytes_read == fragment_size);
+
       cur_payload_size += fragment_size;
       remaining_packet_size -= fragment_size;
 
       bool last_write =
           packet_idx == total_num_packets - 1 && remaining_packet_size == 0;
 
-      // Find num_packets that we should copy into current chunk and their
-      // payload_size.
-      bool write_chunk = cur_num_packets == ChunkHeader::Packets::kMaxCount ||
-                         cur_payload_size == max_payload_size || last_write;
+      // Return the current chunk if it has no room for another fragment, holds
+      // the maximum number of packets, or we wrote everything we wanted to.
+      bool return_chunk =
+          cur_payload_size >=
+              max_payload_size - SharedMemoryABI::kPacketHeaderSize ||
+          cur_num_packets == ChunkHeader::Packets::kMaxCount || last_write;
 
-      if (write_chunk) {
-        // Write chunk payload.
-        local_buffer_reader.ReadBytes(&cur_chunk, cur_payload_size);
-
+      if (return_chunk) {
         auto new_packet_count =
             cur_chunk.IncreasePacketCountTo(cur_num_packets);
         PERFETTO_DCHECK(new_packet_count == cur_num_packets);
 
         bool is_fragmenting = remaining_packet_size > 0;
-        if (is_fragmenting)
+        if (is_fragmenting) {
+          PERFETTO_DCHECK(cur_payload_size == max_payload_size);
           cur_chunk.SetFlag(ChunkHeader::kLastPacketContinuesOnNextChunk);
+        }
 
         arbiter->ReturnCompletedChunk(std::move(cur_chunk), target_buffer,
                                       &empty_patch_list);
@@ -327,6 +373,8 @@
 
   // The last chunk should have been returned.
   PERFETTO_DCHECK(!cur_chunk.is_valid());
+  // We should have read all data from the local buffer.
+  PERFETTO_DCHECK(local_buffer_reader.DidReadAllData());
 
   return next_chunk_id;
 }
diff --git a/src/tracing/core/startup_trace_writer_registry.cc b/src/tracing/core/startup_trace_writer_registry.cc
new file mode 100644
index 0000000..6de4ab0
--- /dev/null
+++ b/src/tracing/core/startup_trace_writer_registry.cc
@@ -0,0 +1,148 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/tracing/core/startup_trace_writer_registry.h"
+
+#include <functional>
+
+#include "perfetto/base/logging.h"
+#include "perfetto/base/task_runner.h"
+#include "perfetto/tracing/core/startup_trace_writer.h"
+#include "src/tracing/core/shared_memory_arbiter_impl.h"
+
+using ChunkHeader = perfetto::SharedMemoryABI::ChunkHeader;
+
+namespace perfetto {
+
+StartupTraceWriterRegistryHandle::StartupTraceWriterRegistryHandle(
+    StartupTraceWriterRegistry* registry)
+    : registry_(registry) {}
+
+void StartupTraceWriterRegistryHandle::OnWriterDestroyed(
+    StartupTraceWriter* writer) {
+  std::lock_guard<std::mutex> lock(lock_);
+  if (registry_)
+    registry_->OnStartupTraceWriterDestroyed(writer);
+}
+
+void StartupTraceWriterRegistryHandle::OnRegistryDestroyed() {
+  std::lock_guard<std::mutex> lock(lock_);
+  registry_ = nullptr;
+}
+
+StartupTraceWriterRegistry::StartupTraceWriterRegistry()
+    : handle_(std::make_shared<StartupTraceWriterRegistryHandle>(this)) {}
+
+StartupTraceWriterRegistry::~StartupTraceWriterRegistry() {
+  handle_->OnRegistryDestroyed();
+}
+
+std::unique_ptr<StartupTraceWriter>
+StartupTraceWriterRegistry::CreateUnboundTraceWriter() {
+  std::lock_guard<std::mutex> lock(lock_);
+  PERFETTO_DCHECK(!arbiter_);  // Should only be called while unbound.
+  std::unique_ptr<StartupTraceWriter> writer(new StartupTraceWriter(handle_));
+  unbound_writers_.insert(writer.get());
+  return writer;
+}
+
+void StartupTraceWriterRegistry::ReturnUnboundTraceWriter(
+    std::unique_ptr<StartupTraceWriter> trace_writer) {
+  std::lock_guard<std::mutex> lock(lock_);
+  PERFETTO_DCHECK(!arbiter_);  // Should only be called while unbound.
+  PERFETTO_DCHECK(!trace_writer->write_in_progress_);
+  PERFETTO_DCHECK(unbound_writers_.count(trace_writer.get()));
+  unbound_writers_.erase(trace_writer.get());
+  unbound_owned_writers_.push_back(std::move(trace_writer));
+}
+
+void StartupTraceWriterRegistry::BindToArbiter(
+    SharedMemoryArbiterImpl* arbiter,
+    BufferID target_buffer,
+    base::TaskRunner* task_runner,
+    std::function<void(StartupTraceWriterRegistry*)> on_bound_callback) {
+  std::vector<std::unique_ptr<StartupTraceWriter>> unbound_owned_writers;
+  {
+    std::lock_guard<std::mutex> lock(lock_);
+    PERFETTO_DCHECK(!arbiter_);
+    arbiter_ = arbiter;
+    target_buffer_ = target_buffer;
+    task_runner_ = task_runner;
+    weak_ptr_factory_.reset(
+        new base::WeakPtrFactory<StartupTraceWriterRegistry>(this));
+    on_bound_callback_ = std::move(on_bound_callback);
+    // We can't destroy the writers while holding |lock_|, so we swap them out
+    // here instead. After we are bound, no more writers can be added to the
+    // list.
+    unbound_owned_writers.swap(unbound_owned_writers_);
+  }
+
+  // Bind and destroy the owned writers.
+  for (const auto& writer : unbound_owned_writers) {
+    // This should succeed since nobody can write to these writers concurrently.
+    bool success = writer->BindToArbiter(arbiter_, target_buffer_);
+    PERFETTO_DCHECK(success);
+  }
+  unbound_owned_writers.clear();
+
+  TryBindWriters();
+}
+
+void StartupTraceWriterRegistry::TryBindWriters() {
+  std::lock_guard<std::mutex> lock(lock_);
+  for (auto it = unbound_writers_.begin(); it != unbound_writers_.end();) {
+    if ((*it)->BindToArbiter(arbiter_, target_buffer_)) {
+      it = unbound_writers_.erase(it);
+    } else {
+      it++;
+    }
+  }
+  if (!unbound_writers_.empty()) {
+    auto weak_this = weak_ptr_factory_->GetWeakPtr();
+    task_runner_->PostTask([weak_this] {
+      if (weak_this)
+        weak_this->TryBindWriters();
+    });
+  }
+  OnUnboundWritersRemovedLocked();
+}
+
+void StartupTraceWriterRegistry::OnStartupTraceWriterDestroyed(
+    StartupTraceWriter* trace_writer) {
+  std::lock_guard<std::mutex> lock(lock_);
+  if (unbound_writers_.erase(trace_writer) > 0)
+    OnUnboundWritersRemovedLocked();
+}
+
+void StartupTraceWriterRegistry::OnUnboundWritersRemovedLocked() {
+  if (!unbound_writers_.empty() || !task_runner_ || !on_bound_callback_)
+    return;
+
+  PERFETTO_DCHECK(weak_ptr_factory_);
+  auto weak_this = weak_ptr_factory_->GetWeakPtr();
+  // Run callback in PostTask() since the callback may delete |this| and thus
+  // might otherwise cause a deadlock.
+  auto callback = on_bound_callback_;
+  on_bound_callback_ = nullptr;
+  task_runner_->PostTask([weak_this, callback]() {
+    if (!weak_this)
+      return;
+    // Note: callback may delete |this|.
+    callback(weak_this.get());
+  });
+}
+
+}  // namespace perfetto
diff --git a/src/tracing/core/startup_trace_writer_unittest.cc b/src/tracing/core/startup_trace_writer_unittest.cc
index fd9ea45..2566455 100644
--- a/src/tracing/core/startup_trace_writer_unittest.cc
+++ b/src/tracing/core/startup_trace_writer_unittest.cc
@@ -17,17 +17,21 @@
 #include "perfetto/tracing/core/startup_trace_writer.h"
 
 #include "gtest/gtest.h"
+#include "perfetto/tracing/core/startup_trace_writer_registry.h"
+#include "perfetto/tracing/core/trace_packet.h"
 #include "perfetto/tracing/core/tracing_service.h"
 #include "src/base/test/test_task_runner.h"
 #include "src/tracing/core/shared_memory_arbiter_impl.h"
+#include "src/tracing/core/sliced_protobuf_input_stream.h"
+#include "src/tracing/core/trace_buffer.h"
 #include "src/tracing/test/aligned_buffer_test.h"
 #include "src/tracing/test/fake_producer_endpoint.h"
 
 #include "perfetto/trace/test_event.pbzero.h"
+#include "perfetto/trace/trace_packet.pb.h"
 #include "perfetto/trace/trace_packet.pbzero.h"
 
 namespace perfetto {
-namespace {
 
 class StartupTraceWriterTest : public AlignedBufferTest {
  public:
@@ -46,16 +50,29 @@
     task_runner_.reset();
   }
 
+  std::unique_ptr<StartupTraceWriter> CreateUnboundWriter() {
+    std::shared_ptr<StartupTraceWriterRegistryHandle> registry;
+    return std::unique_ptr<StartupTraceWriter>(
+        new StartupTraceWriter(registry));
+  }
+
+  bool BindWriter(StartupTraceWriter* writer) {
+    const BufferID kBufId = 42;
+    return writer->BindToArbiter(arbiter_.get(), kBufId);
+  }
+
   void WritePackets(StartupTraceWriter* writer, size_t packet_count) {
     for (size_t i = 0; i < packet_count; i++) {
       auto packet = writer->NewTracePacket();
-      packet->set_for_testing()->set_str("foo");
+      packet->set_for_testing()->set_str(kPacketPayload);
     }
   }
 
-  void VerifyPacketCount(size_t expected_count) {
+  void VerifyPackets(size_t expected_count) {
     SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
-    size_t packets_count = 0;
+    auto buffer = TraceBuffer::Create(abi->size());
+
+    size_t total_packets_count = 0;
     ChunkID current_max_chunk_id = 0;
     for (size_t page_idx = 0; page_idx < kNumPages; page_idx++) {
       uint32_t page_layout = abi->GetPageLayout(page_idx);
@@ -77,19 +94,74 @@
         current_max_chunk_id = std::max(current_max_chunk_id, chunk_id);
 
         auto packets_header = chunk.header()->packets.load();
-        packets_count += packets_header.count;
+        total_packets_count += packets_header.count;
         if (packets_header.flags &
             SharedMemoryABI::ChunkHeader::kFirstPacketContinuesFromPrevChunk) {
           // Don't count fragmented packets twice.
-          packets_count--;
+          total_packets_count--;
         }
+
+        buffer->CopyChunkUntrusted(
+            /*producer_id_trusted=*/1, /*producer_uid_trusted=*/1,
+            chunk.header()->writer_id.load(), chunk_id, packets_header.count,
+            packets_header.flags, /*chunk_complete=*/true,
+            chunk.payload_begin(), chunk.payload_size());
         abi->ReleaseChunkAsFree(std::move(chunk));
       }
     }
     last_read_max_chunk_id_ = current_max_chunk_id;
-    EXPECT_EQ(expected_count, packets_count);
+    EXPECT_EQ(expected_count, total_packets_count);
+
+    // Now verify chunk and packet contents.
+    buffer->BeginRead();
+    size_t num_packets_read = 0;
+    while (true) {
+      TracePacket packet;
+      uid_t producer_uid = kInvalidUid;
+      if (!buffer->ReadNextTracePacket(&packet, &producer_uid))
+        break;
+      EXPECT_EQ(static_cast<uid_t>(1), producer_uid);
+
+      SlicedProtobufInputStream stream(&packet.slices());
+      size_t size = 0;
+      for (const Slice& slice : packet.slices())
+        size += slice.size;
+      protos::TracePacket parsed_packet;
+      bool success = parsed_packet.ParseFromBoundedZeroCopyStream(
+          &stream, static_cast<int>(size));
+      EXPECT_TRUE(success);
+      if (!success)
+        break;
+      EXPECT_TRUE(parsed_packet.has_for_testing());
+      EXPECT_EQ(kPacketPayload, parsed_packet.for_testing().str());
+      num_packets_read++;
+    }
+    EXPECT_EQ(expected_count, num_packets_read);
   }
 
+  size_t GetUnboundWriterCount(
+      const StartupTraceWriterRegistry& registry) const {
+    return registry.unbound_writers_.size() +
+           registry.unbound_owned_writers_.size();
+  }
+
+  size_t GetBindingRegistriesCount(
+      const SharedMemoryArbiterImpl& arbiter) const {
+    return arbiter.startup_trace_writer_registries_.size();
+  }
+
+  size_t GetUnboundWriterCount(const SharedMemoryArbiterImpl& arbiter) const {
+    size_t count = 0u;
+    for (const auto& reg : arbiter.startup_trace_writer_registries_) {
+      count += reg->unbound_writers_.size();
+      count += reg->unbound_owned_writers_.size();
+    }
+    return count;
+  }
+
+ protected:
+  static constexpr char kPacketPayload[] = "foo";
+
   FakeProducerEndpoint fake_producer_endpoint_;
   std::unique_ptr<base::TestTaskRunner> task_runner_;
   std::unique_ptr<SharedMemoryArbiterImpl> arbiter_;
@@ -98,25 +170,27 @@
   ChunkID last_read_max_chunk_id_ = 0;
 };
 
+constexpr char StartupTraceWriterTest::kPacketPayload[];
+
+namespace {
+
 size_t const kPageSizes[] = {4096, 65536};
 INSTANTIATE_TEST_CASE_P(PageSize,
                         StartupTraceWriterTest,
                         ::testing::ValuesIn(kPageSizes));
 
 TEST_P(StartupTraceWriterTest, CreateUnboundAndBind) {
-  // Create an unbound writer.
-  std::unique_ptr<StartupTraceWriter> writer(new StartupTraceWriter());
+  auto writer = CreateUnboundWriter();
 
-  // Bind it right away without having written any data before.
-  const BufferID kBufId = 42;
-  EXPECT_TRUE(arbiter_->BindStartupTraceWriter(writer.get(), kBufId));
+  // Bind writer right away without having written any data before.
+  EXPECT_TRUE(BindWriter(writer.get()));
 
   const size_t kNumPackets = 32;
   WritePackets(writer.get(), kNumPackets);
   // Finalizes the last packet and returns the chunk.
   writer.reset();
 
-  VerifyPacketCount(kNumPackets);
+  VerifyPackets(kNumPackets);
 }
 
 TEST_P(StartupTraceWriterTest, CreateBound) {
@@ -130,12 +204,11 @@
   // Finalizes the last packet and returns the chunk.
   writer.reset();
 
-  VerifyPacketCount(kNumPackets);
+  VerifyPackets(kNumPackets);
 }
 
 TEST_P(StartupTraceWriterTest, WriteWhileUnboundAndDiscard) {
-  // Create an unbound writer.
-  std::unique_ptr<StartupTraceWriter> writer(new StartupTraceWriter());
+  auto writer = CreateUnboundWriter();
 
   const size_t kNumPackets = 32;
   WritePackets(writer.get(), kNumPackets);
@@ -143,22 +216,20 @@
   // Should discard the written data.
   writer.reset();
 
-  VerifyPacketCount(0);
+  VerifyPackets(0);
 }
 
 TEST_P(StartupTraceWriterTest, WriteWhileUnboundAndBind) {
-  // Create an unbound writer.
-  std::unique_ptr<StartupTraceWriter> writer(new StartupTraceWriter());
+  auto writer = CreateUnboundWriter();
 
   const size_t kNumPackets = 32;
   WritePackets(writer.get(), kNumPackets);
 
   // Binding the writer should cause the previously written packets to be
   // written to the SMB and committed.
-  const BufferID kBufId = 42;
-  EXPECT_TRUE(arbiter_->BindStartupTraceWriter(writer.get(), kBufId));
+  EXPECT_TRUE(BindWriter(writer.get()));
 
-  VerifyPacketCount(kNumPackets);
+  VerifyPackets(kNumPackets);
 
   // Any further packets should be written to the SMB directly.
   const size_t kNumAdditionalPackets = 16;
@@ -166,12 +237,11 @@
   // Finalizes the last packet and returns the chunk.
   writer.reset();
 
-  VerifyPacketCount(kNumAdditionalPackets);
+  VerifyPackets(kNumAdditionalPackets);
 }
 
 TEST_P(StartupTraceWriterTest, WriteMultipleChunksWhileUnboundAndBind) {
-  // Create an unbound writer.
-  std::unique_ptr<StartupTraceWriter> writer(new StartupTraceWriter());
+  auto writer = CreateUnboundWriter();
 
   // Write a single packet to determine its size in the buffer.
   WritePackets(writer.get(), 1);
@@ -183,10 +253,9 @@
 
   // Binding the writer should cause the previously written packets to be
   // written to the SMB and committed.
-  const BufferID kBufId = 42;
-  EXPECT_TRUE(arbiter_->BindStartupTraceWriter(writer.get(), kBufId));
+  EXPECT_TRUE(BindWriter(writer.get()));
 
-  VerifyPacketCount(kNumPackets + 1);
+  VerifyPackets(kNumPackets + 1);
 
   // Any further packets should be written to the SMB directly.
   const size_t kNumAdditionalPackets = 16;
@@ -194,25 +263,62 @@
   // Finalizes the last packet and returns the chunk.
   writer.reset();
 
-  VerifyPacketCount(kNumAdditionalPackets);
+  VerifyPackets(kNumAdditionalPackets);
 }
 
 TEST_P(StartupTraceWriterTest, BindingWhileWritingFails) {
-  // Create an unbound writer.
-  std::unique_ptr<StartupTraceWriter> writer(new StartupTraceWriter());
+  auto writer = CreateUnboundWriter();
 
-  const BufferID kBufId = 42;
   {
-    // Begin a write by opening a TracePacket
+    // Begin a write by opening a TracePacket.
     auto packet = writer->NewTracePacket();
+    packet->set_for_testing()->set_str(kPacketPayload);
 
     // Binding while writing should fail.
-    EXPECT_FALSE(arbiter_->BindStartupTraceWriter(writer.get(), kBufId));
+    EXPECT_FALSE(BindWriter(writer.get()));
   }
 
   // Packet was completed, so binding should work now and emit the packet.
-  EXPECT_TRUE(arbiter_->BindStartupTraceWriter(writer.get(), kBufId));
-  VerifyPacketCount(1);
+  EXPECT_TRUE(BindWriter(writer.get()));
+  VerifyPackets(1);
+}
+
+TEST_P(StartupTraceWriterTest, CreateAndBindViaRegistry) {
+  std::unique_ptr<StartupTraceWriterRegistry> registry(
+      new StartupTraceWriterRegistry());
+
+  // Create unbound writers.
+  auto writer1 = registry->CreateUnboundTraceWriter();
+  auto writer2 = registry->CreateUnboundTraceWriter();
+
+  EXPECT_EQ(2u, GetUnboundWriterCount(*registry));
+
+  // Return |writer2|. It should be kept alive until the registry is bound.
+  registry->ReturnUnboundTraceWriter(std::move(writer2));
+
+  {
+    // Begin a write by opening a TracePacket on |writer1|.
+    auto packet = writer1->NewTracePacket();
+
+    // Binding |writer1| mid-write should fail, but |writer2| should be bound.
+    const BufferID kBufId = 42;
+    arbiter_->BindStartupTraceWriterRegistry(std::move(registry), kBufId);
+    EXPECT_EQ(1u, GetUnboundWriterCount(*arbiter_));
+  }
+
+  // Wait for |writer1| to be bound and the registry to be deleted.
+  auto checkpoint_name = "all_bound";
+  auto all_bound = task_runner_->CreateCheckpoint(checkpoint_name);
+  std::function<void()> task;
+  task = [&task, &all_bound, this]() {
+    if (!GetBindingRegistriesCount(*arbiter_)) {
+      all_bound();
+      return;
+    }
+    task_runner_->PostDelayedTask(task, 1);
+  };
+  task_runner_->PostDelayedTask(task, 1);
+  task_runner_->RunUntilCheckpoint(checkpoint_name);
 }
 
 }  // namespace
diff --git a/src/tracing/core/trace_buffer.cc b/src/tracing/core/trace_buffer.cc
index 4d990e1..5dfc62b 100644
--- a/src/tracing/core/trace_buffer.cc
+++ b/src/tracing/core/trace_buffer.cc
@@ -90,6 +90,7 @@
     return false;
   }
   size_ = size;
+  stats_.set_buffer_size(size);
   max_chunk_size_ = std::min(size, ChunkRecord::kMaxSize);
   wptr_ = begin();
   index_.clear();
@@ -115,7 +116,7 @@
   const size_t record_size =
       base::AlignUp<sizeof(ChunkRecord)>(size + sizeof(ChunkRecord));
   if (PERFETTO_UNLIKELY(record_size > max_chunk_size_)) {
-    stats_.abi_violations++;
+    stats_.set_abi_violations(stats_.abi_violations() + 1);
     PERFETTO_DCHECK(suppress_sanity_dchecks_for_testing_);
     return;
   }
@@ -162,7 +163,7 @@
                           prev->size != record_size ||
                           prev->num_fragments > num_fragments ||
                           (prev->flags & chunk_flags) != prev->flags)) {
-      stats_.abi_violations++;
+      stats_.set_abi_violations(stats_.abi_violations() + 1);
       PERFETTO_DCHECK(suppress_sanity_dchecks_for_testing_);
       return;
     }
@@ -179,7 +180,7 @@
     const auto subsequent_it = index_.find(subsequent_key);
     if (subsequent_it != index_.end() &&
         subsequent_it->second.num_fragments_read > 0) {
-      stats_.abi_violations++;
+      stats_.set_abi_violations(stats_.abi_violations() + 1);
       PERFETTO_DCHECK(suppress_sanity_dchecks_for_testing_);
       return;
     }
@@ -217,7 +218,7 @@
                       wptr - begin() + record_size, record_size);
     WriteChunkRecord(wptr, record, src, size);
     TRACE_BUFFER_DLOG("Chunk raw: %s", HexDump(wptr, record_size).c_str());
-    stats_.chunks_rewritten++;
+    stats_.set_chunks_rewritten(stats_.chunks_rewritten() + 1);
     return;
   }
 
@@ -229,7 +230,7 @@
     PERFETTO_DCHECK(res <= cached_size_to_end);
     AddPaddingRecord(cached_size_to_end);
     wptr_ = begin();
-    stats_.write_wrap_count++;
+    stats_.set_write_wrap_count(stats_.write_wrap_count() + 1);
     PERFETTO_DCHECK(size_to_end() >= record_size);
   }
 
@@ -256,8 +257,8 @@
   size_t padding_size = DeleteNextChunksFor(record_size);
 
   // Now first insert the new chunk. At the end, if necessary, add the padding.
-  stats_.chunks_written++;
-  stats_.bytes_written += size;
+  stats_.set_chunks_written(stats_.chunks_written() + 1);
+  stats_.set_bytes_written(stats_.bytes_written() + record_size);
   auto it_and_inserted = index_.emplace(
       key, ChunkMeta(GetChunkRecordAt(wptr_), num_fragments, chunk_complete,
                      chunk_flags, producer_uid_trusted));
@@ -270,7 +271,7 @@
   if (wptr_ >= end()) {
     PERFETTO_DCHECK(padding_size == 0);
     wptr_ = begin();
-    stats_.write_wrap_count++;
+    stats_.set_write_wrap_count(stats_.write_wrap_count() + 1);
   }
   DcheckIsAlignedAndWithinBounds(wptr_);
 
@@ -293,7 +294,8 @@
   if (chunk_id - last_chunk_id < kMaxChunkID / 2) {
     last_chunk_id = chunk_id;
   } else {
-    stats_.chunks_committed_out_of_order++;
+    stats_.set_chunks_committed_out_of_order(
+        stats_.chunks_committed_out_of_order() + 1);
   }
 
   if (padding_size)
@@ -335,8 +337,11 @@
       bool removed = false;
       if (PERFETTO_LIKELY(it != index_.end())) {
         const ChunkMeta& meta = it->second;
-        if (PERFETTO_UNLIKELY(meta.num_fragments_read < meta.num_fragments))
-          stats_.chunks_overwritten++;
+        if (PERFETTO_UNLIKELY(meta.num_fragments_read < meta.num_fragments)) {
+          stats_.set_chunks_overwritten(stats_.chunks_overwritten() + 1);
+          stats_.set_bytes_overwritten(stats_.bytes_overwritten() +
+                                       next_chunk.size);
+        }
         index_.erase(it);
         removed = true;
       }
@@ -346,6 +351,9 @@
                         next_chunk_ptr - begin(),
                         next_chunk_ptr - begin() + next_chunk.size, removed);
       PERFETTO_DCHECK(removed);
+    } else {
+      stats_.set_padding_bytes_cleared(stats_.padding_bytes_cleared() +
+                                       next_chunk.size);
     }
 
     next_chunk_ptr += next_chunk.size;
@@ -367,6 +375,7 @@
   TRACE_BUFFER_DLOG("AddPaddingRecord @ [%lu - %lu] %zu", wptr_ - begin(),
                     wptr_ - begin() + size, size);
   WriteChunkRecord(wptr_, record, nullptr, size - sizeof(ChunkRecord));
+  stats_.set_padding_bytes_written(stats_.padding_bytes_written() + size);
   // |wptr_| is deliberately not advanced when writing a padding record.
 }
 
@@ -379,7 +388,7 @@
   ChunkMeta::Key key(producer_id, writer_id, chunk_id);
   auto it = index_.find(key);
   if (it == index_.end()) {
-    stats_.patches_failed++;
+    stats_.set_patches_failed(stats_.patches_failed() + 1);
     return false;
   }
   ChunkMeta& chunk_meta = it->second;
@@ -409,7 +418,7 @@
         ptr > chunk_end - Patch::kSize) {
       // Either the IPC was so slow and in the meantime the writer managed to
       // wrap over |chunk_id| or the producer sent a malicious IPC.
-      stats_.patches_failed++;
+      stats_.set_patches_failed(stats_.patches_failed() + 1);
       return false;
     }
 
@@ -425,7 +434,7 @@
       "Chunk raw (after patch): %s",
       HexDump(chunk_begin, chunk_meta.chunk_record->size).c_str());
 
-  stats_.patches_succeeded += patches_size;
+  stats_.set_patches_succeeded(stats_.patches_succeeded() + patches_size);
   if (!other_patches_pending) {
     chunk_meta.flags &= ~kChunkNeedsPatching;
     chunk_meta.chunk_record->flags = chunk_meta.flags;
@@ -617,7 +626,7 @@
         // In extremely rare cases (producer bugged / malicious) the chunk might
         // contain an invalid fragment. In such case we don't want to stall the
         // sequence but just skip the chunk and move on.
-        stats_.abi_violations++;
+        stats_.set_abi_violations(stats_.abi_violations() + 1);
         PERFETTO_DCHECK(suppress_sanity_dchecks_for_testing_);
         break;
       }
@@ -625,7 +634,7 @@
       PERFETTO_DCHECK(action == kTryReadAhead);
       ReadAheadResult ra_res = ReadAhead(packet);
       if (ra_res == ReadAheadResult::kSucceededReturnSlices) {
-        stats_.readaheads_succeeded++;
+        stats_.set_readaheads_succeeded(stats_.readaheads_succeeded() + 1);
         *producer_uid = trusted_uid;
         return true;
       }
@@ -633,7 +642,7 @@
       if (ra_res == ReadAheadResult::kFailedMoveToNextSequence) {
         // readahead didn't find a contigous packet sequence. We'll try again
         // on the next ReadPacket() call.
-        stats_.readaheads_failed++;
+        stats_.set_readaheads_failed(stats_.readaheads_failed() + 1);
 
         // TODO(primiano): optimization: this MoveToEnd() is the reason why
         // MoveNext() (that is called in the outer for(;;MoveNext)) needs to
@@ -723,7 +732,7 @@
     PERFETTO_DCHECK(read_iter_.cur == it.cur);
 
     if (PERFETTO_UNLIKELY(packet_corruption)) {
-      stats_.abi_violations++;
+      stats_.set_abi_violations(stats_.abi_violations() + 1);
       PERFETTO_DCHECK(suppress_sanity_dchecks_for_testing_);
       *packet = TracePacket();  // clear.
       return ReadAheadResult::kFailedStayOnSameSequence;
@@ -749,7 +758,7 @@
                         packet_begin >= record_end)) {
     // The producer has a bug or is malicious and did declare that the chunk
     // contains more packets beyond its boundaries.
-    stats_.abi_violations++;
+    stats_.set_abi_violations(stats_.abi_violations() + 1);
     PERFETTO_DCHECK(suppress_sanity_dchecks_for_testing_);
     return false;
   }
@@ -767,18 +776,30 @@
   const uint8_t* next_packet = packet_data + packet_size;
   if (PERFETTO_UNLIKELY(next_packet <= packet_begin ||
                         next_packet > record_end)) {
-    stats_.abi_violations++;
+    stats_.set_abi_violations(stats_.abi_violations() + 1);
     PERFETTO_DCHECK(suppress_sanity_dchecks_for_testing_);
     chunk_meta->cur_fragment_offset = 0;
     chunk_meta->num_fragments_read = chunk_meta->num_fragments;
+    if (PERFETTO_LIKELY(chunk_meta->is_complete)) {
+      stats_.set_chunks_read(stats_.chunks_read() + 1);
+      stats_.set_bytes_read(stats_.bytes_read() +
+                            chunk_meta->chunk_record->size);
+    }
     return false;
   }
   chunk_meta->cur_fragment_offset =
       static_cast<uint16_t>(next_packet - packets_begin);
   chunk_meta->num_fragments_read++;
 
+  if (PERFETTO_UNLIKELY(chunk_meta->num_fragments_read ==
+                            chunk_meta->num_fragments &&
+                        chunk_meta->is_complete)) {
+    stats_.set_chunks_read(stats_.chunks_read() + 1);
+    stats_.set_bytes_read(stats_.bytes_read() + chunk_meta->chunk_record->size);
+  }
+
   if (PERFETTO_UNLIKELY(packet_size == 0)) {
-    stats_.abi_violations++;
+    stats_.set_abi_violations(stats_.abi_violations() + 1);
     PERFETTO_DCHECK(suppress_sanity_dchecks_for_testing_);
     return false;
   }
diff --git a/src/tracing/core/trace_buffer.h b/src/tracing/core/trace_buffer.h
index 7c33f13..8b17e1c 100644
--- a/src/tracing/core/trace_buffer.h
+++ b/src/tracing/core/trace_buffer.h
@@ -29,6 +29,7 @@
 #include "perfetto/base/paged_memory.h"
 #include "perfetto/tracing/core/basic_types.h"
 #include "perfetto/tracing/core/slice.h"
+#include "perfetto/tracing/core/trace_stats.h"
 
 namespace perfetto {
 
@@ -128,22 +129,6 @@
  public:
   static const size_t InlineChunkHeaderSize;  // For test/fake_packet.{cc,h}.
 
-  // Maintain these fields consistent with trace_stats.proto. See comments in
-  // the .proto for the semantic of these fields.
-  struct Stats {
-    uint64_t bytes_written = 0;
-    uint64_t chunks_written = 0;
-    uint64_t chunks_rewritten = 0;
-    uint64_t chunks_overwritten = 0;
-    uint64_t chunks_committed_out_of_order = 0;
-    uint64_t write_wrap_count = 0;
-    uint64_t patches_succeeded = 0;
-    uint64_t patches_failed = 0;
-    uint64_t readaheads_succeeded = 0;
-    uint64_t readaheads_failed = 0;
-    uint64_t abi_violations = 0;
-  };
-
   // Argument for out-of-band patches applied through TryPatchChunkContents().
   struct Patch {
     // From SharedMemoryABI::kPacketHeaderSize.
@@ -232,7 +217,7 @@
   //   P1, P5, P7, P4 (P4 cannot come after P5)
   bool ReadNextTracePacket(TracePacket*, uid_t* producer_uid);
 
-  const Stats& stats() const { return stats_; }
+  const TraceStats::BufferStats& stats() const { return stats_; }
   size_t size() const { return size_; }
 
  private:
@@ -559,7 +544,7 @@
   std::map<std::pair<ProducerID, WriterID>, ChunkID> last_chunk_id_written_;
 
   // Statistics about buffer usage.
-  Stats stats_;
+  TraceStats::BufferStats stats_;
 
 #if PERFETTO_DCHECK_IS_ON()
   bool changed_since_last_read_ = false;
diff --git a/src/tracing/core/trace_buffer_unittest.cc b/src/tracing/core/trace_buffer_unittest.cc
index 9f1c7a8..d6eef84 100644
--- a/src/tracing/core/trace_buffer_unittest.cc
+++ b/src/tracing/core/trace_buffer_unittest.cc
@@ -159,6 +159,14 @@
     trace_buffer()->BeginRead();
     ASSERT_THAT(ReadPacket(), ElementsAre(FakePacketFragment(42, seed)));
     ASSERT_THAT(ReadPacket(), IsEmpty());
+    EXPECT_EQ(chunk_id + 1u, trace_buffer()->stats().chunks_written());
+    EXPECT_EQ(trace_buffer()->stats().chunks_written(),
+              trace_buffer()->stats().chunks_read());
+    EXPECT_LT(0u, trace_buffer()->stats().bytes_written());
+    EXPECT_EQ(trace_buffer()->stats().bytes_written(),
+              trace_buffer()->stats().bytes_read());
+    EXPECT_EQ(0u, trace_buffer()->stats().padding_bytes_written());
+    EXPECT_EQ(0u, trace_buffer()->stats().padding_bytes_cleared());
   }
 }
 
@@ -250,6 +258,21 @@
   ASSERT_THAT(ReadPacket(), ElementsAre(FakePacketFragment(2048 - 16, 'e')));
   ASSERT_THAT(ReadPacket(), ElementsAre(FakePacketFragment(512 - 16, 'f')));
   ASSERT_THAT(ReadPacket(), IsEmpty());
+
+  EXPECT_EQ(6u, trace_buffer()->stats().chunks_written());
+  EXPECT_EQ(3u, trace_buffer()->stats().chunks_overwritten());
+  EXPECT_EQ(3u, trace_buffer()->stats().chunks_read());
+  EXPECT_EQ(4480u, trace_buffer()->stats().bytes_written());
+  EXPECT_EQ(896u, trace_buffer()->stats().bytes_overwritten());
+  EXPECT_EQ(3584u, trace_buffer()->stats().bytes_read());
+  EXPECT_EQ(512u, trace_buffer()->stats().padding_bytes_written());
+  EXPECT_EQ(0u, trace_buffer()->stats().padding_bytes_cleared());
+
+  // Adding another chunk should clear some of the padding.
+  ASSERT_EQ(128u, CreateChunk(ProducerID(1), WriterID(1), ChunkID(6))
+                      .AddPacket(128 - 16, 'g')
+                      .CopyIntoTraceBuffer());
+  EXPECT_EQ(384u, trace_buffer()->stats().padding_bytes_cleared());
 }
 
 // Like ReadWrite_Padding, but this time the padding introduced is the minimum
@@ -488,7 +511,7 @@
   CreateChunk(ProducerID(1), WriterID(1), ChunkID(2))
       .AddPacket(30, 'c')
       .CopyIntoTraceBuffer();
-  EXPECT_EQ(0u, trace_buffer()->stats().chunks_committed_out_of_order);
+  EXPECT_EQ(0u, trace_buffer()->stats().chunks_committed_out_of_order());
   trace_buffer()->BeginRead();
   ASSERT_THAT(ReadPacket(), ElementsAre(FakePacketFragment(10, 'a')));
   ASSERT_THAT(ReadPacket(), IsEmpty());
@@ -496,7 +519,7 @@
   CreateChunk(ProducerID(1), WriterID(1), ChunkID(1))
       .AddPacket(20, 'b')
       .CopyIntoTraceBuffer();
-  EXPECT_EQ(1u, trace_buffer()->stats().chunks_committed_out_of_order);
+  EXPECT_EQ(1u, trace_buffer()->stats().chunks_committed_out_of_order());
   trace_buffer()->BeginRead();
   ASSERT_THAT(ReadPacket(), ElementsAre(FakePacketFragment(20, 'b')));
   ASSERT_THAT(ReadPacket(), ElementsAre(FakePacketFragment(30, 'c')));
@@ -1285,7 +1308,7 @@
       .AddPacket(100, 'b')
       .PadTo(512)
       .CopyIntoTraceBuffer(/*chunk_complete=*/false);
-  EXPECT_EQ(0u, trace_buffer()->stats().chunks_rewritten);
+  EXPECT_EQ(0u, trace_buffer()->stats().chunks_rewritten());
   CreateChunk(ProducerID(1), WriterID(1), ChunkID(0))
       .AddPacket(100, 'a')
       .AddPacket(100, 'b')
@@ -1294,7 +1317,7 @@
       .PadTo(512)
       .CopyIntoTraceBuffer();
   trace_buffer()->BeginRead();
-  EXPECT_EQ(1u, trace_buffer()->stats().chunks_rewritten);
+  EXPECT_EQ(1u, trace_buffer()->stats().chunks_rewritten());
   ASSERT_THAT(ReadPacket(), ElementsAre(FakePacketFragment(100, 'a')));
   ASSERT_THAT(ReadPacket(), ElementsAre(FakePacketFragment(100, 'b')));
   ASSERT_THAT(ReadPacket(), ElementsAre(FakePacketFragment(100, 'c')));
diff --git a/src/tracing/core/trace_stats.cc b/src/tracing/core/trace_stats.cc
new file mode 100644
index 0000000..4350995
--- /dev/null
+++ b/src/tracing/core/trace_stats.cc
@@ -0,0 +1,330 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*******************************************************************************
+ * AUTOGENERATED - DO NOT EDIT
+ *******************************************************************************
+ * This file has been generated from the protobuf message
+ * perfetto/common/trace_stats.proto
+ * by
+ * ../../tools/proto_to_cpp/proto_to_cpp.cc.
+ * If you need to make changes here, change the .proto file and then run
+ * ./tools/gen_tracing_cpp_headers_from_protos
+ */
+
+#include "perfetto/tracing/core/trace_stats.h"
+
+#include "perfetto/common/trace_stats.pb.h"
+
+namespace perfetto {
+
+TraceStats::TraceStats() = default;
+TraceStats::~TraceStats() = default;
+TraceStats::TraceStats(const TraceStats&) = default;
+TraceStats& TraceStats::operator=(const TraceStats&) = default;
+TraceStats::TraceStats(TraceStats&&) noexcept = default;
+TraceStats& TraceStats::operator=(TraceStats&&) = default;
+
+void TraceStats::FromProto(const perfetto::protos::TraceStats& proto) {
+  buffer_stats_.clear();
+  for (const auto& field : proto.buffer_stats()) {
+    buffer_stats_.emplace_back();
+    buffer_stats_.back().FromProto(field);
+  }
+
+  static_assert(
+      sizeof(producers_connected_) == sizeof(proto.producers_connected()),
+      "size mismatch");
+  producers_connected_ =
+      static_cast<decltype(producers_connected_)>(proto.producers_connected());
+
+  static_assert(sizeof(producers_seen_) == sizeof(proto.producers_seen()),
+                "size mismatch");
+  producers_seen_ =
+      static_cast<decltype(producers_seen_)>(proto.producers_seen());
+
+  static_assert(sizeof(data_sources_registered_) ==
+                    sizeof(proto.data_sources_registered()),
+                "size mismatch");
+  data_sources_registered_ = static_cast<decltype(data_sources_registered_)>(
+      proto.data_sources_registered());
+
+  static_assert(sizeof(data_sources_seen_) == sizeof(proto.data_sources_seen()),
+                "size mismatch");
+  data_sources_seen_ =
+      static_cast<decltype(data_sources_seen_)>(proto.data_sources_seen());
+
+  static_assert(sizeof(tracing_sessions_) == sizeof(proto.tracing_sessions()),
+                "size mismatch");
+  tracing_sessions_ =
+      static_cast<decltype(tracing_sessions_)>(proto.tracing_sessions());
+
+  static_assert(sizeof(total_buffers_) == sizeof(proto.total_buffers()),
+                "size mismatch");
+  total_buffers_ = static_cast<decltype(total_buffers_)>(proto.total_buffers());
+  unknown_fields_ = proto.unknown_fields();
+}
+
+void TraceStats::ToProto(perfetto::protos::TraceStats* proto) const {
+  proto->Clear();
+
+  for (const auto& it : buffer_stats_) {
+    auto* entry = proto->add_buffer_stats();
+    it.ToProto(entry);
+  }
+
+  static_assert(
+      sizeof(producers_connected_) == sizeof(proto->producers_connected()),
+      "size mismatch");
+  proto->set_producers_connected(
+      static_cast<decltype(proto->producers_connected())>(
+          producers_connected_));
+
+  static_assert(sizeof(producers_seen_) == sizeof(proto->producers_seen()),
+                "size mismatch");
+  proto->set_producers_seen(
+      static_cast<decltype(proto->producers_seen())>(producers_seen_));
+
+  static_assert(sizeof(data_sources_registered_) ==
+                    sizeof(proto->data_sources_registered()),
+                "size mismatch");
+  proto->set_data_sources_registered(
+      static_cast<decltype(proto->data_sources_registered())>(
+          data_sources_registered_));
+
+  static_assert(
+      sizeof(data_sources_seen_) == sizeof(proto->data_sources_seen()),
+      "size mismatch");
+  proto->set_data_sources_seen(
+      static_cast<decltype(proto->data_sources_seen())>(data_sources_seen_));
+
+  static_assert(sizeof(tracing_sessions_) == sizeof(proto->tracing_sessions()),
+                "size mismatch");
+  proto->set_tracing_sessions(
+      static_cast<decltype(proto->tracing_sessions())>(tracing_sessions_));
+
+  static_assert(sizeof(total_buffers_) == sizeof(proto->total_buffers()),
+                "size mismatch");
+  proto->set_total_buffers(
+      static_cast<decltype(proto->total_buffers())>(total_buffers_));
+  *(proto->mutable_unknown_fields()) = unknown_fields_;
+}
+
+TraceStats::BufferStats::BufferStats() = default;
+TraceStats::BufferStats::~BufferStats() = default;
+TraceStats::BufferStats::BufferStats(const TraceStats::BufferStats&) = default;
+TraceStats::BufferStats& TraceStats::BufferStats::operator=(
+    const TraceStats::BufferStats&) = default;
+TraceStats::BufferStats::BufferStats(TraceStats::BufferStats&&) noexcept =
+    default;
+TraceStats::BufferStats& TraceStats::BufferStats::operator=(
+    TraceStats::BufferStats&&) = default;
+
+void TraceStats::BufferStats::FromProto(
+    const perfetto::protos::TraceStats_BufferStats& proto) {
+  static_assert(sizeof(buffer_size_) == sizeof(proto.buffer_size()),
+                "size mismatch");
+  buffer_size_ = static_cast<decltype(buffer_size_)>(proto.buffer_size());
+
+  static_assert(sizeof(bytes_written_) == sizeof(proto.bytes_written()),
+                "size mismatch");
+  bytes_written_ = static_cast<decltype(bytes_written_)>(proto.bytes_written());
+
+  static_assert(sizeof(bytes_overwritten_) == sizeof(proto.bytes_overwritten()),
+                "size mismatch");
+  bytes_overwritten_ =
+      static_cast<decltype(bytes_overwritten_)>(proto.bytes_overwritten());
+
+  static_assert(sizeof(bytes_read_) == sizeof(proto.bytes_read()),
+                "size mismatch");
+  bytes_read_ = static_cast<decltype(bytes_read_)>(proto.bytes_read());
+
+  static_assert(
+      sizeof(padding_bytes_written_) == sizeof(proto.padding_bytes_written()),
+      "size mismatch");
+  padding_bytes_written_ = static_cast<decltype(padding_bytes_written_)>(
+      proto.padding_bytes_written());
+
+  static_assert(
+      sizeof(padding_bytes_cleared_) == sizeof(proto.padding_bytes_cleared()),
+      "size mismatch");
+  padding_bytes_cleared_ = static_cast<decltype(padding_bytes_cleared_)>(
+      proto.padding_bytes_cleared());
+
+  static_assert(sizeof(chunks_written_) == sizeof(proto.chunks_written()),
+                "size mismatch");
+  chunks_written_ =
+      static_cast<decltype(chunks_written_)>(proto.chunks_written());
+
+  static_assert(sizeof(chunks_rewritten_) == sizeof(proto.chunks_rewritten()),
+                "size mismatch");
+  chunks_rewritten_ =
+      static_cast<decltype(chunks_rewritten_)>(proto.chunks_rewritten());
+
+  static_assert(
+      sizeof(chunks_overwritten_) == sizeof(proto.chunks_overwritten()),
+      "size mismatch");
+  chunks_overwritten_ =
+      static_cast<decltype(chunks_overwritten_)>(proto.chunks_overwritten());
+
+  static_assert(sizeof(chunks_read_) == sizeof(proto.chunks_read()),
+                "size mismatch");
+  chunks_read_ = static_cast<decltype(chunks_read_)>(proto.chunks_read());
+
+  static_assert(sizeof(chunks_committed_out_of_order_) ==
+                    sizeof(proto.chunks_committed_out_of_order()),
+                "size mismatch");
+  chunks_committed_out_of_order_ =
+      static_cast<decltype(chunks_committed_out_of_order_)>(
+          proto.chunks_committed_out_of_order());
+
+  static_assert(sizeof(write_wrap_count_) == sizeof(proto.write_wrap_count()),
+                "size mismatch");
+  write_wrap_count_ =
+      static_cast<decltype(write_wrap_count_)>(proto.write_wrap_count());
+
+  static_assert(sizeof(patches_succeeded_) == sizeof(proto.patches_succeeded()),
+                "size mismatch");
+  patches_succeeded_ =
+      static_cast<decltype(patches_succeeded_)>(proto.patches_succeeded());
+
+  static_assert(sizeof(patches_failed_) == sizeof(proto.patches_failed()),
+                "size mismatch");
+  patches_failed_ =
+      static_cast<decltype(patches_failed_)>(proto.patches_failed());
+
+  static_assert(
+      sizeof(readaheads_succeeded_) == sizeof(proto.readaheads_succeeded()),
+      "size mismatch");
+  readaheads_succeeded_ = static_cast<decltype(readaheads_succeeded_)>(
+      proto.readaheads_succeeded());
+
+  static_assert(sizeof(readaheads_failed_) == sizeof(proto.readaheads_failed()),
+                "size mismatch");
+  readaheads_failed_ =
+      static_cast<decltype(readaheads_failed_)>(proto.readaheads_failed());
+
+  static_assert(sizeof(abi_violations_) == sizeof(proto.abi_violations()),
+                "size mismatch");
+  abi_violations_ =
+      static_cast<decltype(abi_violations_)>(proto.abi_violations());
+  unknown_fields_ = proto.unknown_fields();
+}
+
+void TraceStats::BufferStats::ToProto(
+    perfetto::protos::TraceStats_BufferStats* proto) const {
+  proto->Clear();
+
+  static_assert(sizeof(buffer_size_) == sizeof(proto->buffer_size()),
+                "size mismatch");
+  proto->set_buffer_size(
+      static_cast<decltype(proto->buffer_size())>(buffer_size_));
+
+  static_assert(sizeof(bytes_written_) == sizeof(proto->bytes_written()),
+                "size mismatch");
+  proto->set_bytes_written(
+      static_cast<decltype(proto->bytes_written())>(bytes_written_));
+
+  static_assert(
+      sizeof(bytes_overwritten_) == sizeof(proto->bytes_overwritten()),
+      "size mismatch");
+  proto->set_bytes_overwritten(
+      static_cast<decltype(proto->bytes_overwritten())>(bytes_overwritten_));
+
+  static_assert(sizeof(bytes_read_) == sizeof(proto->bytes_read()),
+                "size mismatch");
+  proto->set_bytes_read(
+      static_cast<decltype(proto->bytes_read())>(bytes_read_));
+
+  static_assert(
+      sizeof(padding_bytes_written_) == sizeof(proto->padding_bytes_written()),
+      "size mismatch");
+  proto->set_padding_bytes_written(
+      static_cast<decltype(proto->padding_bytes_written())>(
+          padding_bytes_written_));
+
+  static_assert(
+      sizeof(padding_bytes_cleared_) == sizeof(proto->padding_bytes_cleared()),
+      "size mismatch");
+  proto->set_padding_bytes_cleared(
+      static_cast<decltype(proto->padding_bytes_cleared())>(
+          padding_bytes_cleared_));
+
+  static_assert(sizeof(chunks_written_) == sizeof(proto->chunks_written()),
+                "size mismatch");
+  proto->set_chunks_written(
+      static_cast<decltype(proto->chunks_written())>(chunks_written_));
+
+  static_assert(sizeof(chunks_rewritten_) == sizeof(proto->chunks_rewritten()),
+                "size mismatch");
+  proto->set_chunks_rewritten(
+      static_cast<decltype(proto->chunks_rewritten())>(chunks_rewritten_));
+
+  static_assert(
+      sizeof(chunks_overwritten_) == sizeof(proto->chunks_overwritten()),
+      "size mismatch");
+  proto->set_chunks_overwritten(
+      static_cast<decltype(proto->chunks_overwritten())>(chunks_overwritten_));
+
+  static_assert(sizeof(chunks_read_) == sizeof(proto->chunks_read()),
+                "size mismatch");
+  proto->set_chunks_read(
+      static_cast<decltype(proto->chunks_read())>(chunks_read_));
+
+  static_assert(sizeof(chunks_committed_out_of_order_) ==
+                    sizeof(proto->chunks_committed_out_of_order()),
+                "size mismatch");
+  proto->set_chunks_committed_out_of_order(
+      static_cast<decltype(proto->chunks_committed_out_of_order())>(
+          chunks_committed_out_of_order_));
+
+  static_assert(sizeof(write_wrap_count_) == sizeof(proto->write_wrap_count()),
+                "size mismatch");
+  proto->set_write_wrap_count(
+      static_cast<decltype(proto->write_wrap_count())>(write_wrap_count_));
+
+  static_assert(
+      sizeof(patches_succeeded_) == sizeof(proto->patches_succeeded()),
+      "size mismatch");
+  proto->set_patches_succeeded(
+      static_cast<decltype(proto->patches_succeeded())>(patches_succeeded_));
+
+  static_assert(sizeof(patches_failed_) == sizeof(proto->patches_failed()),
+                "size mismatch");
+  proto->set_patches_failed(
+      static_cast<decltype(proto->patches_failed())>(patches_failed_));
+
+  static_assert(
+      sizeof(readaheads_succeeded_) == sizeof(proto->readaheads_succeeded()),
+      "size mismatch");
+  proto->set_readaheads_succeeded(
+      static_cast<decltype(proto->readaheads_succeeded())>(
+          readaheads_succeeded_));
+
+  static_assert(
+      sizeof(readaheads_failed_) == sizeof(proto->readaheads_failed()),
+      "size mismatch");
+  proto->set_readaheads_failed(
+      static_cast<decltype(proto->readaheads_failed())>(readaheads_failed_));
+
+  static_assert(sizeof(abi_violations_) == sizeof(proto->abi_violations()),
+                "size mismatch");
+  proto->set_abi_violations(
+      static_cast<decltype(proto->abi_violations())>(abi_violations_));
+  *(proto->mutable_unknown_fields()) = unknown_fields_;
+}
+
+}  // namespace perfetto
diff --git a/src/tracing/core/tracing_service_impl.cc b/src/tracing/core/tracing_service_impl.cc
index 9eb44d8..4a4be28 100644
--- a/src/tracing/core/tracing_service_impl.cc
+++ b/src/tracing/core/tracing_service_impl.cc
@@ -1598,15 +1598,23 @@
   packet.set_trusted_uid(static_cast<int32_t>(uid_));
 
   protos::TraceStats* trace_stats = packet.mutable_trace_stats();
-  trace_stats->set_producers_connected(
-      static_cast<uint32_t>(producers_.size()));
-  trace_stats->set_producers_seen(last_producer_id_);
-  trace_stats->set_data_sources_registered(
+  GetTraceStats(tracing_session).ToProto(trace_stats);
+  Slice slice = Slice::Allocate(static_cast<size_t>(packet.ByteSize()));
+  PERFETTO_CHECK(packet.SerializeWithCachedSizesToArray(slice.own_data()));
+  packets->emplace_back();
+  packets->back().AddSlice(std::move(slice));
+}
+
+TraceStats TracingServiceImpl::GetTraceStats(TracingSession* tracing_session) {
+  TraceStats trace_stats;
+  trace_stats.set_producers_connected(static_cast<uint32_t>(producers_.size()));
+  trace_stats.set_producers_seen(last_producer_id_);
+  trace_stats.set_data_sources_registered(
       static_cast<uint32_t>(data_sources_.size()));
-  trace_stats->set_data_sources_seen(last_data_source_instance_id_);
-  trace_stats->set_tracing_sessions(
+  trace_stats.set_data_sources_seen(last_data_source_instance_id_);
+  trace_stats.set_tracing_sessions(
       static_cast<uint32_t>(tracing_sessions_.size()));
-  trace_stats->set_total_buffers(static_cast<uint32_t>(buffers_.size()));
+  trace_stats.set_total_buffers(static_cast<uint32_t>(buffers_.size()));
 
   for (BufferID buf_id : tracing_session->buffers_index) {
     TraceBuffer* buf = GetBufferByID(buf_id);
@@ -1614,25 +1622,9 @@
       PERFETTO_DFATAL("Buffer not found.");
       continue;
     }
-    auto* buf_stats_proto = trace_stats->add_buffer_stats();
-    const TraceBuffer::Stats& buf_stats = buf->stats();
-    buf_stats_proto->set_bytes_written(buf_stats.bytes_written);
-    buf_stats_proto->set_chunks_written(buf_stats.chunks_written);
-    buf_stats_proto->set_chunks_rewritten(buf_stats.chunks_rewritten);
-    buf_stats_proto->set_chunks_overwritten(buf_stats.chunks_overwritten);
-    buf_stats_proto->set_chunks_committed_out_of_order(
-        buf_stats.chunks_committed_out_of_order);
-    buf_stats_proto->set_write_wrap_count(buf_stats.write_wrap_count);
-    buf_stats_proto->set_patches_succeeded(buf_stats.patches_succeeded);
-    buf_stats_proto->set_patches_failed(buf_stats.patches_failed);
-    buf_stats_proto->set_readaheads_succeeded(buf_stats.readaheads_succeeded);
-    buf_stats_proto->set_readaheads_failed(buf_stats.readaheads_failed);
-    buf_stats_proto->set_abi_violations(buf_stats.abi_violations);
+    *trace_stats.add_buffer_stats() = buf->stats();
   }  // for (buf in session).
-  Slice slice = Slice::Allocate(static_cast<size_t>(packet.ByteSize()));
-  PERFETTO_CHECK(packet.SerializeWithCachedSizesToArray(slice.own_data()));
-  packets->emplace_back();
-  packets->back().AddSlice(std::move(slice));
+  return trace_stats;
 }
 
 void TracingServiceImpl::MaybeEmitTraceConfig(
@@ -1762,6 +1754,22 @@
   });
 }
 
+void TracingServiceImpl::ConsumerEndpointImpl::GetTraceStats() {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  bool success = false;
+  TraceStats stats;
+  TracingSession* session = service_->GetTracingSession(tracing_session_id_);
+  if (session) {
+    success = true;
+    stats = service_->GetTraceStats(session);
+  }
+  auto weak_this = GetWeakPtr();
+  task_runner_->PostTask([weak_this, success, stats] {
+    if (weak_this)
+      weak_this->consumer_->OnTraceStats(success, stats);
+  });
+}
+
 base::WeakPtr<TracingServiceImpl::ConsumerEndpointImpl>
 TracingServiceImpl::ConsumerEndpointImpl::GetWeakPtr() {
   PERFETTO_DCHECK_THREAD(thread_checker_);
diff --git a/src/tracing/core/tracing_service_impl.h b/src/tracing/core/tracing_service_impl.h
index f16aaf9..7822b58 100644
--- a/src/tracing/core/tracing_service_impl.h
+++ b/src/tracing/core/tracing_service_impl.h
@@ -32,6 +32,7 @@
 #include "perfetto/tracing/core/data_source_descriptor.h"
 #include "perfetto/tracing/core/shared_memory_abi.h"
 #include "perfetto/tracing/core/trace_config.h"
+#include "perfetto/tracing/core/trace_stats.h"
 #include "perfetto/tracing/core/tracing_service.h"
 #include "src/tracing/core/id_allocator.h"
 
@@ -162,6 +163,7 @@
     void Flush(uint32_t timeout_ms, FlushCallback) override;
     void Detach(const std::string& key) override;
     void Attach(const std::string& key) override;
+    void GetTraceStats() override;
 
    private:
     friend class TracingServiceImpl;
@@ -377,6 +379,7 @@
   void SnapshotSyncMarker(std::vector<TracePacket>*);
   void SnapshotClocks(std::vector<TracePacket>*);
   void SnapshotStats(TracingSession*, std::vector<TracePacket>*);
+  TraceStats GetTraceStats(TracingSession* tracing_session);
   void MaybeEmitTraceConfig(TracingSession*, std::vector<TracePacket>*);
   void OnFlushTimeout(TracingSessionID, FlushRequestID);
   void OnDisableTracingTimeout(TracingSessionID);
diff --git a/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc b/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
index a37bacd..f7db5a6 100644
--- a/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
+++ b/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
@@ -23,6 +23,7 @@
 #include "perfetto/ipc/client.h"
 #include "perfetto/tracing/core/consumer.h"
 #include "perfetto/tracing/core/trace_config.h"
+#include "perfetto/tracing/core/trace_stats.h"
 
 // TODO(fmayer): Add a test to check to what happens when ConsumerIPCClientImpl
 // gets destroyed w.r.t. the Consumer pointer. Also think to lifetime of the
@@ -253,4 +254,29 @@
   }
 }
 
+void ConsumerIPCClientImpl::GetTraceStats() {
+  if (!connected_) {
+    PERFETTO_DLOG("Cannot GetTraceStats(), not connected to tracing service");
+    return;
+  }
+
+  protos::GetTraceStatsRequest req;
+  ipc::Deferred<protos::GetTraceStatsResponse> async_response;
+  auto weak_this = weak_ptr_factory_.GetWeakPtr();
+
+  async_response.Bind(
+      [weak_this](ipc::AsyncResult<protos::GetTraceStatsResponse> response) {
+        if (!weak_this)
+          return;
+        TraceStats trace_stats;
+        if (!response) {
+          weak_this->consumer_->OnTraceStats(/*success=*/false, trace_stats);
+          return;
+        }
+        trace_stats.FromProto(response->trace_stats());
+        weak_this->consumer_->OnTraceStats(/*success=*/true, trace_stats);
+      });
+  consumer_port_.GetTraceStats(req, std::move(async_response));
+}
+
 }  // namespace perfetto
diff --git a/src/tracing/ipc/consumer/consumer_ipc_client_impl.h b/src/tracing/ipc/consumer/consumer_ipc_client_impl.h
index a41b200..0210dc3 100644
--- a/src/tracing/ipc/consumer/consumer_ipc_client_impl.h
+++ b/src/tracing/ipc/consumer/consumer_ipc_client_impl.h
@@ -67,6 +67,7 @@
   void Flush(uint32_t timeout_ms, FlushCallback) override;
   void Detach(const std::string& key) override;
   void Attach(const std::string& key) override;
+  void GetTraceStats() override;
 
   // ipc::ServiceProxy::EventListener implementation.
   // These methods are invoked by the IPC layer, which knows nothing about
diff --git a/src/tracing/ipc/service/consumer_ipc_service.cc b/src/tracing/ipc/service/consumer_ipc_service.cc
index 125b0da..5eade92 100644
--- a/src/tracing/ipc/service/consumer_ipc_service.cc
+++ b/src/tracing/ipc/service/consumer_ipc_service.cc
@@ -27,6 +27,7 @@
 #include "perfetto/tracing/core/slice.h"
 #include "perfetto/tracing/core/trace_config.h"
 #include "perfetto/tracing/core/trace_packet.h"
+#include "perfetto/tracing/core/trace_stats.h"
 #include "perfetto/tracing/core/tracing_service.h"
 
 namespace perfetto {
@@ -137,6 +138,15 @@
   remote_consumer->service_endpoint->Attach(req.key());
 }
 
+// Called by the IPC layer.
+void ConsumerIPCService::GetTraceStats(const protos::GetTraceStatsRequest&,
+                                       DeferredGetTraceStatsResponse resp) {
+  // OnTraceStats() will resolve the |get_trace_stats_response|.
+  RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
+  remote_consumer->get_trace_stats_response = std::move(resp);
+  remote_consumer->service_endpoint->GetTraceStats();
+}
+
 // Called by the service in response to a service_endpoint->Flush() request.
 void ConsumerIPCService::OnFlushCallback(
     bool success,
@@ -247,4 +257,15 @@
   std::move(attach_response).Resolve(std::move(response));
 }
 
+void ConsumerIPCService::RemoteConsumer::OnTraceStats(bool success,
+                                                      const TraceStats& stats) {
+  if (!success) {
+    std::move(get_trace_stats_response).Reject();
+    return;
+  }
+  auto response = ipc::AsyncResult<protos::GetTraceStatsResponse>::Create();
+  stats.ToProto(response->mutable_trace_stats());
+  std::move(get_trace_stats_response).Resolve(std::move(response));
+}
+
 }  // namespace perfetto
diff --git a/src/tracing/ipc/service/consumer_ipc_service.h b/src/tracing/ipc/service/consumer_ipc_service.h
index a3fa030..6a3782b 100644
--- a/src/tracing/ipc/service/consumer_ipc_service.h
+++ b/src/tracing/ipc/service/consumer_ipc_service.h
@@ -57,6 +57,8 @@
   void Flush(const protos::FlushRequest&, DeferredFlushResponse) override;
   void Detach(const protos::DetachRequest&, DeferredDetachResponse) override;
   void Attach(const protos::AttachRequest&, DeferredAttachResponse) override;
+  void GetTraceStats(const protos::GetTraceStatsRequest&,
+                     DeferredGetTraceStatsResponse) override;
   void OnClientDisconnected() override;
 
  private:
@@ -76,6 +78,7 @@
     void OnTraceData(std::vector<TracePacket>, bool has_more) override;
     void OnDetach(bool) override;
     void OnAttach(bool, const TraceConfig&) override;
+    void OnTraceStats(bool, const TraceStats&) override;
 
     // The interface obtained from the core service business logic through
     // TracingService::ConnectConsumer(this). This allows to invoke methods for
@@ -96,6 +99,9 @@
 
     // As above, but for the Attach() case.
     DeferredAttachResponse attach_response;
+
+    // As above, but for GetTraceStats().
+    DeferredGetTraceStatsResponse get_trace_stats_response;
   };
 
   // This has to be a container that doesn't invalidate iterators.
diff --git a/src/tracing/test/mock_consumer.cc b/src/tracing/test/mock_consumer.cc
index 3822ebe..b92b274 100644
--- a/src/tracing/test/mock_consumer.cc
+++ b/src/tracing/test/mock_consumer.cc
@@ -17,6 +17,7 @@
 #include "src/tracing/test/mock_consumer.h"
 
 #include "perfetto/tracing/core/trace_config.h"
+#include "perfetto/tracing/core/trace_stats.h"
 #include "src/base/test/test_task_runner.h"
 
 using ::testing::_;
@@ -114,4 +115,27 @@
   return decoded_packets;
 }
 
+void MockConsumer::GetTraceStats() {
+  service_endpoint_->GetTraceStats();
+}
+
+void MockConsumer::WaitForTraceStats(bool success) {
+  static int i = 0;
+  auto checkpoint_name = "on_trace_stats_" + std::to_string(i++);
+  auto on_trace_stats = task_runner_->CreateCheckpoint(checkpoint_name);
+  auto result_callback = [on_trace_stats](bool, const TraceStats&) {
+    on_trace_stats();
+  };
+  if (success) {
+    EXPECT_CALL(*this,
+                OnTraceStats(true, testing::Property(&TraceStats::total_buffers,
+                                                     testing::Gt(0))))
+        .WillOnce(Invoke(result_callback));
+  } else {
+    EXPECT_CALL(*this, OnTraceStats(false, _))
+        .WillOnce(Invoke(result_callback));
+  }
+  task_runner_->RunUntilCheckpoint(checkpoint_name);
+}
+
 }  // namespace perfetto
diff --git a/src/tracing/test/mock_consumer.h b/src/tracing/test/mock_consumer.h
index e19729d..184e8da 100644
--- a/src/tracing/test/mock_consumer.h
+++ b/src/tracing/test/mock_consumer.h
@@ -54,6 +54,8 @@
   void WaitForTracingDisabled(uint32_t timeout_ms = 3000);
   FlushRequest Flush(uint32_t timeout_ms = 10000);
   std::vector<protos::TracePacket> ReadBuffers();
+  void GetTraceStats();
+  void WaitForTraceStats(bool success);
 
   TracingService::ConsumerEndpoint* endpoint() {
     return service_endpoint_.get();
@@ -67,6 +69,7 @@
                void(std::vector<TracePacket>* /*packets*/, bool /*has_more*/));
   MOCK_METHOD1(OnDetach, void(bool));
   MOCK_METHOD2(OnAttach, void(bool, const TraceConfig&));
+  MOCK_METHOD2(OnTraceStats, void(bool, const TraceStats&));
 
   // gtest doesn't support move-only types. This wrapper is here jut to pass
   // a pointer to the vector (rather than the vector itself) to the mock method.
diff --git a/src/tracing/test/tracing_integration_test.cc b/src/tracing/test/tracing_integration_test.cc
index 5478dcd..fdac291 100644
--- a/src/tracing/test/tracing_integration_test.cc
+++ b/src/tracing/test/tracing_integration_test.cc
@@ -26,6 +26,7 @@
 #include "perfetto/tracing/core/producer.h"
 #include "perfetto/tracing/core/trace_config.h"
 #include "perfetto/tracing/core/trace_packet.h"
+#include "perfetto/tracing/core/trace_stats.h"
 #include "perfetto/tracing/core/trace_writer.h"
 #include "perfetto/tracing/ipc/consumer_ipc_client.h"
 #include "perfetto/tracing/ipc/producer_ipc_client.h"
@@ -78,6 +79,7 @@
   MOCK_METHOD2(OnTracePackets, void(std::vector<TracePacket>*, bool));
   MOCK_METHOD1(OnDetach, void(bool));
   MOCK_METHOD2(OnAttach, void(bool, const TraceConfig&));
+  MOCK_METHOD2(OnTraceStats, void(bool, const TraceStats&));
 
   // Workaround, gmock doesn't support yet move-only types, passing a pointer.
   void OnTraceData(std::vector<TracePacket> packets, bool has_more) {
diff --git a/test/test_helper.cc b/test/test_helper.cc
index 46572e8..afdd8eb 100644
--- a/test/test_helper.cc
+++ b/test/test_helper.cc
@@ -175,6 +175,8 @@
     std::move(on_attach_callback_)(success);
 }
 
+void TestHelper::OnTraceStats(bool, const TraceStats&) {}
+
 // static
 const char* TestHelper::GetConsumerSocketName() {
   return TEST_CONSUMER_SOCK_NAME;
diff --git a/test/test_helper.h b/test/test_helper.h
index a5a1301..f2fe7dd 100644
--- a/test/test_helper.h
+++ b/test/test_helper.h
@@ -43,6 +43,7 @@
   void OnTraceData(std::vector<TracePacket> packets, bool has_more) override;
   void OnDetach(bool) override;
   void OnAttach(bool, const TraceConfig&) override;
+  void OnTraceStats(bool, const TraceStats&) override;
 
   void StartServiceIfRequired();
   FakeProducer* ConnectFakeProducer();
diff --git a/tools/gen_tracing_cpp_headers_from_protos b/tools/gen_tracing_cpp_headers_from_protos
index ce1cbe4..8c14b6f 100755
--- a/tools/gen_tracing_cpp_headers_from_protos
+++ b/tools/gen_tracing_cpp_headers_from_protos
@@ -21,6 +21,7 @@
   'perfetto/common/android_log_constants.proto',
   'perfetto/common/commit_data_request.proto',
   'perfetto/common/sys_stats_counters.proto',
+  'perfetto/common/trace_stats.proto',
   'perfetto/config/android/android_log_config.proto',
   'perfetto/config/chrome/chrome_config.proto',
   'perfetto/config/data_source_config.proto',
diff --git a/ui/src/base/logging.ts b/ui/src/base/logging.ts
index 5afd734..c3279c1 100644
--- a/ui/src/base/logging.ts
+++ b/ui/src/base/logging.ts
@@ -19,9 +19,12 @@
   return value;
 }
 
-export function assertTrue(value: boolean): boolean {
+export function assertTrue(value: boolean, optMsg?: string) {
   if (value !== true) {
-    throw new Error('Failed assertion');
+    throw new Error(optMsg ? optMsg : 'Failed assertion');
   }
-  return value;
+}
+
+export function assertFalse(value: boolean, optMsg?: string) {
+  assertTrue(!value, optMsg);
 }
diff --git a/ui/src/common/protos.ts b/ui/src/common/protos.ts
index 6f78d61..5017fd4 100644
--- a/ui/src/common/protos.ts
+++ b/ui/src/common/protos.ts
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+import {assertFalse, assertTrue} from '../base/logging';
 import * as protos from '../gen/protos';
 
 // Aliases protos to avoid the super nested namespaces.
@@ -34,16 +35,20 @@
 // TODO(hjd): Maybe these should go in their own file.
 export interface Row { [key: string]: number|string; }
 
+const COLUMN_TYPE_STR = RawQueryResult.ColumnDesc.Type.STRING;
+const COLUMN_TYPE_DOUBLE = RawQueryResult.ColumnDesc.Type.DOUBLE;
+const COLUMN_TYPE_LONG = RawQueryResult.ColumnDesc.Type.LONG;
+
 function getCell(result: RawQueryResult, column: number, row: number): number|
     string|null {
   const values = result.columns[column];
   if (values.isNulls![row]) return null;
   switch (result.columnDescriptors[column].type) {
-    case RawQueryResult.ColumnDesc.Type.LONG:
+    case COLUMN_TYPE_LONG:
       return +values.longValues![row];
-    case RawQueryResult.ColumnDesc.Type.DOUBLE:
+    case COLUMN_TYPE_DOUBLE:
       return +values.doubleValues![row];
-    case RawQueryResult.ColumnDesc.Type.STRING:
+    case COLUMN_TYPE_STR:
       return values.stringValues![row];
     default:
       throw new Error('Unhandled type!');
@@ -88,6 +93,96 @@
   }
 }
 
+export const NUM: number = 0;
+export const STR: string = 'str';
+export const NUM_NULL: number|null = 1;
+export const STR_NULL: string|null = 'str_null';
+
+/**
+ * This function allows for type safe use of RawQueryResults.
+ * The input is a RawQueryResult (|raw|) and a "spec".
+ * A spec is an object where the keys are column names and the values
+ * are constants representing the types. For example:
+ * {
+ *   upid: NUM,
+ *   pid: NUM_NULL,
+ *   processName: STR_NULL,
+ * }
+ * The output is a iterable of rows each row looks like the given spec:
+ * {
+ *   upid: 1,
+ *   pid: 42,
+ *   processName: null,
+ * }
+ * Each row has an appropriate typescript type based on the spec so there
+ * is no need to use ! or cast when using the result of rawQueryToRows.
+ * Note: type checking to ensure that the RawQueryResult matches the spec
+ * happens at runtime, so if a query can return null and this is not reflected
+ * in the spec this will still crash at runtime.
+ */
+export function*
+    rawQueryToRows<T>(raw: RawQueryResult, spec: T): IterableIterator<T> {
+  const allColumns = rawQueryResultColumns(raw);
+  const columns: Array<[string, (row: number) => string | number | null]> = [];
+  for (const [key, columnSpec] of Object.entries(spec)) {
+    const i = allColumns.indexOf(key);
+    assertTrue(i !== -1, `Expected column "${key}" (cols ${allColumns})`);
+
+    const column = raw.columns[i];
+    const isNulls = column.isNulls!;
+    const columnType = raw.columnDescriptors[i].type;
+
+    if (columnSpec === NUM || columnSpec === STR) {
+      for (let j = 0; j < raw.numRecords; j++) {
+        assertFalse(isNulls[j], `Unexpected null in ${key} row ${j}`);
+      }
+    }
+
+    if (columnSpec === NUM || columnSpec === NUM_NULL) {
+      if (columnType === COLUMN_TYPE_STR) {
+        throw new Error(`Expected numbers in column ${key} found strings`);
+      }
+    } else if (columnSpec === STR || columnSpec === STR_NULL) {
+      if (columnType === COLUMN_TYPE_LONG ||
+          columnType === COLUMN_TYPE_DOUBLE) {
+        throw new Error(`Expected strings in column ${key} found numbers`);
+      }
+    }
+
+    let accessor;
+    switch (columnType) {
+      case COLUMN_TYPE_LONG: {
+        const values = column.longValues!;
+        accessor = (i: number) => isNulls[i] ? null : +values[i];
+        break;
+      }
+      case COLUMN_TYPE_DOUBLE: {
+        const values = column.doubleValues!;
+        accessor = (i: number) => isNulls[i] ? null : values[i];
+        break;
+      }
+      case COLUMN_TYPE_STR: {
+        const values = column.stringValues!;
+        accessor = (i: number) => isNulls[i] ? null : values[i];
+        break;
+      }
+      default:
+        // We can only reach here if the column is completely null.
+        accessor = (_: number) => null;
+        break;
+    }
+    columns.push([key, accessor]);
+  }
+
+  for (let i = 0; i < raw.numRecords; i++) {
+    const row: {[_: string]: number | string | null} = {};
+    for (const [name, accessor] of columns) {
+      row[name] = accessor(i);
+    }
+    yield row as {} as T;
+  }
+}
+
 export {
   IAndroidPowerConfig,
   IProcessStatsConfig,
diff --git a/ui/src/controller/trace_controller.ts b/ui/src/controller/trace_controller.ts
index 6d60e12..21f8d6a 100644
--- a/ui/src/controller/trace_controller.ts
+++ b/ui/src/controller/trace_controller.ts
@@ -22,12 +22,13 @@
   DeferredAction,
 } from '../common/actions';
 import {Engine} from '../common/engine';
+import {NUM, NUM_NULL, rawQueryToRows, STR_NULL} from '../common/protos';
 import {SCROLLING_TRACK_GROUP} from '../common/state';
 import {TimeSpan} from '../common/time';
 import {QuantizedLoad, ThreadDesc} from '../frontend/globals';
 import {SLICE_TRACK_KIND} from '../tracks/chrome_slices/common';
-import {CPU_SLICE_TRACK_KIND} from '../tracks/cpu_slices/common';
 import {CPU_FREQ_TRACK_KIND} from '../tracks/cpu_freq/common';
+import {CPU_SLICE_TRACK_KIND} from '../tracks/cpu_slices/common';
 import {PROCESS_SUMMARY_TRACK} from '../tracks/process_summary/common';
 
 import {Child, Children, Controller} from './controller';
@@ -322,16 +323,23 @@
       utidToMaxDepth.set(utid, maxDepth);
     }
 
-    // Return all threads with parent process information
+    // Return all threads
     // sorted by:
     //  total cpu time *for the whole parent process*
     //  upid
     //  utid
     const threadQuery = await engine.query(`
-        select utid, tid, upid, pid, thread.name, process.name, total_dur
+        select
+          utid,
+          tid,
+          upid,
+          pid,
+          thread.name as threadName,
+          process.name as processName,
+          total_dur
         from
           thread
-          inner join process using(upid)
+          left join process using(upid)
           left join (select upid, sum(dur) as total_dur
               from sched join thread using(utid)
               group by upid
@@ -339,39 +347,56 @@
         order by total_dur desc, upid, utid`);
 
     const upidToUuid = new Map<number, string>();
+    const utidToUuid = new Map<number, string>();
     const addSummaryTrackActions: DeferredAction[] = [];
     const addTrackGroupActions: DeferredAction[] = [];
-    for (let i = 0; i < threadQuery.numRecords; i++) {
-      const utid = threadQuery.columns[0].longValues![i] as number;
-      const tid = threadQuery.columns[1].longValues![i] as number;
-      const upid = threadQuery.columns[2].longValues![i] as number;
-      const pid = threadQuery.columns[3].longValues![i] as number;
-      const threadName = threadQuery.columns[4].stringValues![i];
-      const processName = threadQuery.columns[5].stringValues![i];
 
-      const maxDepth = utidToMaxDepth.get(utid);
-      if (maxDepth === undefined && !counterUpids.has(upid) &&
+
+    for (const row of rawQueryToRows(threadQuery, {
+           utid: NUM,
+           upid: NUM_NULL,
+           tid: NUM_NULL,
+           pid: NUM_NULL,
+           threadName: STR_NULL,
+           processName: STR_NULL,
+         })) {
+      const utid = row.utid;
+      const tid = row.tid;
+      const upid = row.upid;
+      const pid = row.pid;
+      const threadName = row.threadName;
+      const processName = row.processName;
+
+      const maxDepth = utid === null ? undefined : utidToMaxDepth.get(utid);
+      if (maxDepth === undefined &&
+          (upid === null || !counterUpids.has(upid)) &&
           !counterUtids.has(utid)) {
         continue;
       }
 
-      let pUuid = upidToUuid.get(upid);
+      // Group by upid if present else by utid.
+      let pUuid = upid === null ? utidToUuid.get(utid) : upidToUuid.get(upid);
       if (pUuid === undefined) {
         pUuid = uuidv4();
         const summaryTrackId = uuidv4();
-        upidToUuid.set(upid, pUuid);
+        if (upid === null) {
+          utidToUuid.set(utid, pUuid);
+        } else {
+          upidToUuid.set(upid, pUuid);
+        }
         addSummaryTrackActions.push(Actions.addTrack({
           id: summaryTrackId,
           engineId: this.engineId,
           kind: PROCESS_SUMMARY_TRACK,
-          name: `${pid} summary`,
+          name: `${upid === null ? tid : pid} summary`,
           config: {upid, pid, maxDepth, utid},
         }));
 
         addTrackGroupActions.push(Actions.addTrackGroup({
           engineId: this.engineId,
           summaryTrackId,
-          name: `${processName} ${pid}`,
+          name: upid === null ? `${threadName} ${tid}` :
+                                `${processName} ${pid}`,
           id: pUuid,
           collapsed: true,
         }));
@@ -416,7 +441,7 @@
         addToTrackActions.push(Actions.addTrack({
           engineId: this.engineId,
           kind: SLICE_TRACK_KIND,
-          name: threadName + `[${tid}]`,
+          name: `${threadName} [${tid}]`,
           trackGroup: pUuid,
           config: {upid, utid, maxDepth},
         }));
diff --git a/ui/src/engine/index.ts b/ui/src/engine/index.ts
index 096376e..7c4916b 100644
--- a/ui/src/engine/index.ts
+++ b/ui/src/engine/index.ts
@@ -19,10 +19,25 @@
 // tslint:disable no-any
 // Proxy all messages to WasmBridge#callWasm.
 const anySelf = (self as any);
+
+// Messages can arrive before we are initialized, queue these for later.
+const msgQueue: MessageEvent[] = [];
+anySelf.onmessage = (msg: MessageEvent) => {
+  msgQueue.push(msg);
+};
+
 const bridge = new WasmBridge(init_trace_processor);
 bridge.whenInitialized.then(() => {
-  anySelf.onmessage = (msg: MessageEvent) => {
+  const handleMsg = (msg: MessageEvent) => {
     const request: WasmBridgeRequest = msg.data;
     anySelf.postMessage(bridge.callWasm(request));
   };
+
+  // Dispatch queued messages.
+  let msg;
+  while (msg = msgQueue.shift()) {
+    handleMsg(msg);
+  }
+
+  anySelf.onmessage = handleMsg;
 });