Introduce TraceWriter and SharedMemoryArbiter for Producer(s)
These classes allow a Producer to get writer instances to allow
writing from several threads sharing the same per-Producer
SharedMemoryBuffer.
Test: perfetto_tests --gtest_filter=*SharedMemoryArbiter*
Test: perfetto_tests --gtest_filter=*TraceWriterImpl*
Bug: 70284518
Bug: 68854243
Change-Id: I56c4c699d9722bd34a8c5b32f514c6faa9cb95c5
diff --git a/Android.bp b/Android.bp
index 1379d31..c5411f1 100644
--- a/Android.bp
+++ b/Android.bp
@@ -169,6 +169,24 @@
],
}
+// GN target: //protos:zero_gen
+genrule {
+ name: "perfetto_protos_zero_gen",
+ srcs: [
+ "protos/test_event.proto",
+ "protos/trace_packet.proto",
+ ],
+ tools: [
+ "aprotoc",
+ "perfetto_src_protozero_protoc_plugin_protoc_plugin___gn_standalone_toolchain_gcc_like_host_",
+ ],
+ cmd: "mkdir -p $(genDir)/external/perfetto && $(location aprotoc) --cpp_out=$(genDir)/external/perfetto --proto_path=external/perfetto --plugin=protoc-gen-plugin=$(location perfetto_src_protozero_protoc_plugin_protoc_plugin___gn_standalone_toolchain_gcc_like_host_) --plugin_out=wrapper_namespace=pbzero:$(genDir)/external/perfetto $(in)",
+ out: [
+ "external/perfetto/protos/test_event.pbzero.cc",
+ "external/perfetto/protos/trace_packet.pbzero.cc",
+ ],
+}
+
// GN target: //src/ipc:test_messages_gen
genrule {
name: "perfetto_src_ipc_test_messages_gen",
@@ -196,6 +214,7 @@
genrule {
name: "perfetto_protos_lite_gen_headers",
srcs: [
+ "protos/test_event.proto",
"protos/trace_packet.proto",
],
tools: [
@@ -203,6 +222,7 @@
],
cmd: "mkdir -p $(genDir)/external/perfetto && $(location aprotoc) --cpp_out=$(genDir)/external/perfetto --proto_path=external/perfetto $(in)",
out: [
+ "external/perfetto/protos/test_event.pb.h",
"external/perfetto/protos/trace_packet.pb.h",
],
export_include_dirs: [
@@ -225,6 +245,44 @@
],
}
+// GN target: //protos:zero_gen
+genrule {
+ name: "perfetto_protos_zero_gen_headers",
+ srcs: [
+ "protos/test_event.proto",
+ "protos/trace_packet.proto",
+ ],
+ tools: [
+ "aprotoc",
+ "perfetto_src_protozero_protoc_plugin_protoc_plugin___gn_standalone_toolchain_gcc_like_host_",
+ ],
+ cmd: "mkdir -p $(genDir)/external/perfetto && $(location aprotoc) --cpp_out=$(genDir)/external/perfetto --proto_path=external/perfetto --plugin=protoc-gen-plugin=$(location perfetto_src_protozero_protoc_plugin_protoc_plugin___gn_standalone_toolchain_gcc_like_host_) --plugin_out=wrapper_namespace=pbzero:$(genDir)/external/perfetto $(in)",
+ out: [
+ "external/perfetto/protos/test_event.pbzero.h",
+ "external/perfetto/protos/trace_packet.pbzero.h",
+ ],
+ export_include_dirs: [
+ ".",
+ ],
+}
+
+// GN target: //protos:lite_gen
+genrule {
+ name: "perfetto_protos_lite_gen",
+ srcs: [
+ "protos/test_event.proto",
+ "protos/trace_packet.proto",
+ ],
+ tools: [
+ "aprotoc",
+ ],
+ cmd: "mkdir -p $(genDir)/external/perfetto && $(location aprotoc) --cpp_out=$(genDir)/external/perfetto --proto_path=external/perfetto $(in)",
+ out: [
+ "external/perfetto/protos/test_event.pb.cc",
+ "external/perfetto/protos/trace_packet.pb.cc",
+ ],
+}
+
// GN target: //src/ftrace_reader:ftrace_reader_test_messages_zero_gen
genrule {
name: "perfetto_src_ftrace_reader_ftrace_reader_test_messages_zero_gen_headers",
@@ -244,21 +302,6 @@
],
}
-// GN target: //protos:lite_gen
-genrule {
- name: "perfetto_protos_lite_gen",
- srcs: [
- "protos/trace_packet.proto",
- ],
- tools: [
- "aprotoc",
- ],
- cmd: "mkdir -p $(genDir)/external/perfetto && $(location aprotoc) --cpp_out=$(genDir)/external/perfetto --proto_path=external/perfetto $(in)",
- out: [
- "external/perfetto/protos/trace_packet.pb.cc",
- ],
-}
-
// GN target: //protos/ftrace:zero_gen
genrule {
name: "perfetto_protos_ftrace_zero_gen",
@@ -328,6 +371,7 @@
":perfetto_protos_lite_gen",
":perfetto_protos_tracing_service_lite_gen",
":perfetto_protos_tracing_service_tracing_service_gen",
+ ":perfetto_protos_zero_gen",
":perfetto_src_ftrace_reader_ftrace_reader_test_messages_lite_gen",
":perfetto_src_ftrace_reader_ftrace_reader_test_messages_zero_gen",
":perfetto_src_ipc_test_messages_gen",
@@ -388,15 +432,20 @@
"src/tracing/core/service_impl_unittest.cc",
"src/tracing/core/shared_memory_abi.cc",
"src/tracing/core/shared_memory_abi_unittest.cc",
+ "src/tracing/core/shared_memory_arbiter.cc",
+ "src/tracing/core/shared_memory_arbiter_unittest.cc",
"src/tracing/core/trace_config.cc",
"src/tracing/core/trace_packet.cc",
"src/tracing/core/trace_packet_unittest.cc",
+ "src/tracing/core/trace_writer_impl.cc",
+ "src/tracing/core/trace_writer_impl_unittest.cc",
"src/tracing/ipc/consumer/consumer_ipc_client_impl.cc",
"src/tracing/ipc/posix_shared_memory.cc",
"src/tracing/ipc/posix_shared_memory_unittest.cc",
"src/tracing/ipc/producer/producer_ipc_client_impl.cc",
"src/tracing/ipc/service/producer_ipc_service.cc",
"src/tracing/ipc/service/service_ipc_host_impl.cc",
+ "src/tracing/test/aligned_buffer_test.cc",
"src/tracing/test/test_shared_memory.cc",
"tools/sanitizers_unittests/sanitizers_unittest.cc",
],
@@ -414,6 +463,7 @@
"perfetto_protos_lite_gen_headers",
"perfetto_protos_tracing_service_lite_gen_headers",
"perfetto_protos_tracing_service_tracing_service_gen_headers",
+ "perfetto_protos_zero_gen_headers",
"perfetto_src_ftrace_reader_ftrace_reader_test_messages_lite_gen_headers",
"perfetto_src_ftrace_reader_ftrace_reader_test_messages_zero_gen_headers",
"perfetto_src_ipc_test_messages_gen_headers",
diff --git a/include/perfetto/tracing/core/BUILD.gn b/include/perfetto/tracing/core/BUILD.gn
index 8d83c55..1f895ac 100644
--- a/include/perfetto/tracing/core/BUILD.gn
+++ b/include/perfetto/tracing/core/BUILD.gn
@@ -28,5 +28,6 @@
"shared_memory_abi.h",
"trace_config.h",
"trace_packet.h",
+ "trace_writer.h",
]
}
diff --git a/include/perfetto/tracing/core/trace_writer.h b/include/perfetto/tracing/core/trace_writer.h
new file mode 100644
index 0000000..8b64b3e
--- /dev/null
+++ b/include/perfetto/tracing/core/trace_writer.h
@@ -0,0 +1,67 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INCLUDE_PERFETTO_TRACING_CORE_TRACE_WRITER_H_
+#define INCLUDE_PERFETTO_TRACING_CORE_TRACE_WRITER_H_
+
+#include "perfetto/protozero/protozero_message_handle.h"
+
+namespace perfetto {
+
+namespace protos {
+namespace pbzero {
+class TracePacket;
+} // namespace pbzero
+} // namespace protos
+
+// This is a single-thread write interface that allows to write protobufs
+// directly into the tracing shared buffer without making any copies.
+// It takes care of acquiring and releasing chunks from the
+// SharedMemoryArbiter and splitting protos over chunks.
+// The idea is that each data source creates one (or more) TraceWriter for each
+// thread it wants to write from. Each TraceWriter will get its own dedicated
+// chunk and will write into the shared buffer without any locking most of the
+// time. Locking will happen only when a chunk is exhausted and a new one is
+// acquired from the arbiter.
+
+// TODO: TraceWriter needs to keep the shared memory buffer alive (refcount?).
+// Otherwise if the shared memory buffer goes away (e.g. the Service crashes)
+// the TraceWriter will keep writing into unmapped memory.
+
+class TraceWriter {
+ public:
+ using TracePacketHandle =
+ protozero::ProtoZeroMessageHandle<protos::pbzero::TracePacket>;
+
+ TraceWriter();
+ virtual ~TraceWriter();
+
+ // Returns a handle to the root proto message for the trace. The message will
+ // be finalized either by calling directly handle.Finalize() or by letting the
+ // handle go out of scope. The returned handle can be std::move()'d but cannot
+ // be used after either: (i) the TraceWriter instance is destroyed, (ii) a
+ // subsequence NewTracePacket() call is made on the same TraceWriter instance.
+ virtual TracePacketHandle NewTracePacket() = 0;
+ virtual WriterID writer_id() const = 0;
+
+ private:
+ TraceWriter(const TraceWriter&) = delete;
+ TraceWriter& operator=(const TraceWriter&) = delete;
+};
+
+} // namespace perfetto
+
+#endif // INCLUDE_PERFETTO_TRACING_CORE_TRACE_WRITER_H_
diff --git a/protos/BUILD.gn b/protos/BUILD.gn
index 077d57d..3ba9e45 100644
--- a/protos/BUILD.gn
+++ b/protos/BUILD.gn
@@ -16,7 +16,10 @@
import("../gn/perfetto.gni")
import("../gn/proto_library.gni")
-proto_sources = [ "trace_packet.proto" ]
+proto_sources = [
+ "test_event.proto",
+ "trace_packet.proto",
+]
# Protozero generated stubs, for writers.
protozero_library("zero") {
diff --git a/protos/test_event.proto b/protos/test_event.proto
new file mode 100644
index 0000000..bac6322
--- /dev/null
+++ b/protos/test_event.proto
@@ -0,0 +1,22 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto2";
+option optimize_for = LITE_RUNTIME;
+
+package perfetto.protos;
+
+message TestEvent {}
diff --git a/protos/trace_packet.proto b/protos/trace_packet.proto
index 5db2b1b..e59e991 100644
--- a/protos/trace_packet.proto
+++ b/protos/trace_packet.proto
@@ -18,12 +18,16 @@
option optimize_for = LITE_RUNTIME;
import "protos/ftrace/ftrace_event_bundle.proto";
+import "protos/test_event.proto";
package perfetto.protos;
// The root object emitted by Perfetto. A perfetto trace is just a stream of
// TracePacket(s).
message TracePacket {
- oneof data { FtraceEventBundle ftrace_events = 1; }
+ oneof data {
+ FtraceEventBundle ftrace_events = 1;
+ TestEvent test_event = 536870911; // 2^29 - 1, max field id for protos.
+ }
optional string test = 2;
}
diff --git a/src/tracing/BUILD.gn b/src/tracing/BUILD.gn
index 33697e5..bbfa964 100644
--- a/src/tracing/BUILD.gn
+++ b/src/tracing/BUILD.gn
@@ -23,6 +23,7 @@
deps = [
"../../gn:default_deps",
"../../protos:lite",
+ "../../protos:zero",
"../base",
]
sources = [
@@ -35,8 +36,12 @@
"core/service_impl.cc",
"core/service_impl.h",
"core/shared_memory_abi.cc",
+ "core/shared_memory_arbiter.cc",
+ "core/shared_memory_arbiter.h",
"core/trace_config.cc",
"core/trace_packet.cc",
+ "core/trace_writer_impl.cc",
+ "core/trace_writer_impl.h",
]
}
@@ -76,6 +81,7 @@
"../../gn:default_deps",
"../../gn:gtest_deps",
"../../protos:lite",
+ "../../protos:zero",
"../base",
"../base:test_support",
]
@@ -84,8 +90,12 @@
"core/id_allocator_unittest.cc",
"core/service_impl_unittest.cc",
"core/shared_memory_abi_unittest.cc",
+ "core/shared_memory_arbiter_unittest.cc",
"core/trace_packet_unittest.cc",
+ "core/trace_writer_impl_unittest.cc",
"ipc/posix_shared_memory_unittest.cc",
+ "test/aligned_buffer_test.cc",
+ "test/aligned_buffer_test.h",
"test/test_shared_memory.cc",
"test/test_shared_memory.h",
]
diff --git a/src/tracing/core/shared_memory_abi_unittest.cc b/src/tracing/core/shared_memory_abi_unittest.cc
index d460759..dd75534 100644
--- a/src/tracing/core/shared_memory_abi_unittest.cc
+++ b/src/tracing/core/shared_memory_abi_unittest.cc
@@ -17,7 +17,7 @@
#include "perfetto/tracing/core/shared_memory_abi.h"
#include "gtest/gtest.h"
-#include "perfetto/base/utils.h"
+#include "src/tracing/test/aligned_buffer_test.h"
namespace perfetto {
namespace {
@@ -26,35 +26,18 @@
using Chunk = SharedMemoryABI::Chunk;
using ChunkHeader = SharedMemoryABI::ChunkHeader;
-class SharedMemoryABITest : public ::testing::TestWithParam<size_t> {
- public:
- void SetUp() override {
- page_size_ = GetParam();
- buf_size_ = page_size_ * kNumPages;
- void* mem = nullptr;
- PERFETTO_CHECK(posix_memalign(&mem, page_size_, buf_size_) == 0);
- buf_.reset(reinterpret_cast<uint8_t*>(mem));
- memset(buf_.get(), 0, buf_size_);
- }
-
- void TearDown() override { buf_.reset(); }
-
- const size_t kNumPages = 10;
- std::unique_ptr<uint8_t, base::FreeDeleter> buf_;
- size_t buf_size_;
- size_t page_size_;
-};
+using SharedMemoryABITest = AlignedBufferTest;
size_t const kPageSizes[] = {4096, 8192, 16384, 32768, 65536};
INSTANTIATE_TEST_CASE_P(PageSize, SharedMemoryABITest, ValuesIn(kPageSizes));
TEST_P(SharedMemoryABITest, NominalCases) {
- SharedMemoryABI abi(buf_.get(), buf_size_, page_size_);
+ SharedMemoryABI abi(buf(), buf_size(), page_size());
- ASSERT_EQ(buf_.get(), abi.start());
- ASSERT_EQ(buf_.get() + buf_size_, abi.end());
- ASSERT_EQ(buf_size_, abi.size());
- ASSERT_EQ(page_size_, abi.page_size());
+ ASSERT_EQ(buf(), abi.start());
+ ASSERT_EQ(buf() + buf_size(), abi.end());
+ ASSERT_EQ(buf_size(), abi.size());
+ ASSERT_EQ(page_size(), abi.page_size());
ASSERT_EQ(kNumPages, abi.num_pages());
for (size_t i = 0; i < kNumPages; i++) {
@@ -94,8 +77,8 @@
uint8_t* last_chunk_end = nullptr;
for (size_t page_idx = 0; page_idx <= 4; page_idx++) {
- uint8_t* const page_start = buf_.get() + page_idx * page_size_;
- uint8_t* const page_end = page_start + page_size_;
+ uint8_t* const page_start = buf() + page_idx * page_size();
+ uint8_t* const page_end = page_start + page_size();
const size_t num_chunks =
SharedMemoryABI::GetNumChunksForLayout(abi.page_layout_dbg(page_idx));
const size_t target_buffer = 10 + page_idx;
@@ -132,7 +115,7 @@
// Sanity check chunk bounds.
size_t expected_chunk_size =
- (page_size_ - sizeof(SharedMemoryABI::PageHeader)) / num_chunks;
+ (page_size() - sizeof(SharedMemoryABI::PageHeader)) / num_chunks;
expected_chunk_size = expected_chunk_size - (expected_chunk_size % 4);
ASSERT_EQ(expected_chunk_size, chunk.size());
ASSERT_GT(chunk.begin(), page_start);
@@ -224,7 +207,7 @@
}
TEST_P(SharedMemoryABITest, BatchAcquireAndRelease) {
- SharedMemoryABI abi(buf_.get(), buf_size_, page_size_);
+ SharedMemoryABI abi(buf(), buf_size(), page_size());
ChunkHeader header{};
// TryAcquire on a non-partitioned page should fail.
diff --git a/src/tracing/core/shared_memory_arbiter.cc b/src/tracing/core/shared_memory_arbiter.cc
new file mode 100644
index 0000000..e2a059d
--- /dev/null
+++ b/src/tracing/core/shared_memory_arbiter.cc
@@ -0,0 +1,164 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/tracing/core/shared_memory_arbiter.h"
+
+#include "perfetto/base/logging.h"
+#include "perfetto/base/task_runner.h"
+#include "src/tracing/core/trace_writer_impl.h"
+
+#include <limits>
+
+namespace perfetto {
+
+using Chunk = SharedMemoryABI::Chunk;
+
+// static
+SharedMemoryABI::PageLayout SharedMemoryArbiter::default_page_layout =
+ SharedMemoryABI::PageLayout::kPageDiv1;
+
+SharedMemoryArbiter::SharedMemoryArbiter(void* start,
+ size_t size,
+ size_t page_size,
+ OnPageCompleteCallback callback,
+ base::TaskRunner* task_runner)
+ : task_runner_(task_runner),
+ on_page_complete_callback_(std::move(callback)),
+ shmem_abi_(reinterpret_cast<uint8_t*>(start), size, page_size),
+ active_writer_ids_(SharedMemoryABI::kMaxWriterID + 1) {}
+
+Chunk SharedMemoryArbiter::GetNewChunk(
+ const SharedMemoryABI::ChunkHeader& header,
+ BufferID target_buffer,
+ size_t size_hint) {
+ PERFETTO_DCHECK(size_hint == 0); // Not implemented yet.
+
+ for (;;) {
+ // TODO(primiano): Probably this lock is not really required and this code
+ // could be rewritten leveraging only the Try* atomic operations in
+ // SharedMemoryABI. But let's not be too adventurous for the moment.
+ {
+ std::lock_guard<std::mutex> scoped_lock(lock_);
+ const size_t initial_page_idx = page_idx_;
+ // TODO(primiano): instead of scanning, we could maintain a bitmap of
+ // free chunks for each |target_buffer| and one for fully free pages.
+ for (size_t i = 0; i < shmem_abi_.num_pages(); i++) {
+ page_idx_ = (initial_page_idx + i) % shmem_abi_.num_pages();
+ bool is_new_page = false;
+
+ // TODO(primiano): make the page layout dynamic.
+ auto layout = SharedMemoryArbiter::default_page_layout;
+
+ if (shmem_abi_.is_page_free(page_idx_)) {
+ // TODO(primiano): Use the |size_hint| here to decide the layout.
+ is_new_page =
+ shmem_abi_.TryPartitionPage(page_idx_, layout, target_buffer);
+ }
+ uint32_t free_chunks;
+ size_t tbuf;
+ if (is_new_page) {
+ free_chunks = (1 << SharedMemoryABI::kNumChunksForLayout[layout]) - 1;
+ tbuf = target_buffer;
+ } else {
+ free_chunks = shmem_abi_.GetFreeChunks(page_idx_);
+
+ // |tbuf| here is advisory only and could change at any point, before
+ // or after this read. The only use of |tbuf| here is to to skip pages
+ // that are more likely to belong to other target_buffers, avoiding
+ // the more epxensive atomic operations in those cases. The
+ // authoritative check on |tbuf| happens atomically in the
+ // TryAcquireChunkForWriting() call below.
+ tbuf = shmem_abi_.page_header(page_idx_)->target_buffer.load(
+ std::memory_order_relaxed);
+ }
+ PERFETTO_DLOG("Free chunks for page %zu: %x. Target buffer: %zu",
+ page_idx_, free_chunks, tbuf);
+
+ if (tbuf != target_buffer)
+ continue;
+
+ for (uint32_t chunk_idx = 0; free_chunks;
+ chunk_idx++, free_chunks >>= 1) {
+ if (!(free_chunks & 1))
+ continue;
+ // We found a free chunk.
+ Chunk chunk = shmem_abi_.TryAcquireChunkForWriting(
+ page_idx_, chunk_idx, tbuf, &header);
+ if (!chunk.is_valid())
+ continue;
+ PERFETTO_DLOG("Acquired chunk %zu:%u", page_idx_, chunk_idx);
+ return chunk;
+ }
+ // TODO: we should have some policy to guarantee fairness of the SMB
+ // page allocator w.r.t |target_buffer|? Or is the SMB best-effort. All
+ // chunks in the page are busy (either kBeingRead or kBeingWritten), or
+ // all the pages are assigned to a different target buffer. Try with the
+ // next page.
+ }
+ } // std::lock_guard<std::mutex>
+ // All chunks are taken (either kBeingWritten by us or kBeingRead by the
+ // Service). TODO: at this point we should return a bankrupcy chunk, not
+ // crash the process.
+ PERFETTO_ELOG("Shared memory buffer overrun! Stalling");
+ usleep(250000);
+ }
+}
+
+void SharedMemoryArbiter::ReturnCompletedChunk(Chunk chunk) {
+ bool should_post_callback = false;
+ {
+ std::lock_guard<std::mutex> scoped_lock(lock_);
+ size_t page_index = shmem_abi_.ReleaseChunkAsComplete(std::move(chunk));
+ if (page_index != SharedMemoryABI::kInvalidPageIdx) {
+ should_post_callback = pages_to_notify_.empty();
+ pages_to_notify_.push_back(static_cast<uint32_t>(page_index));
+ }
+ }
+ if (should_post_callback) {
+ // TODO what happens if the arbiter gets destroyed?
+ task_runner_->PostTask(
+ std::bind(&SharedMemoryArbiter::InvokeOnPageCompleteCallback, this));
+ }
+}
+
+// This is always invoked on the |task_runner_| thread.
+void SharedMemoryArbiter::InvokeOnPageCompleteCallback() {
+ std::vector<uint32_t> pages_to_notify;
+ {
+ std::lock_guard<std::mutex> scoped_lock(lock_);
+ pages_to_notify = std::move(pages_to_notify_);
+ pages_to_notify_.clear();
+ }
+ on_page_complete_callback_(pages_to_notify);
+}
+
+std::unique_ptr<TraceWriter> SharedMemoryArbiter::CreateTraceWriter(
+ BufferID target_buffer) {
+ WriterID id;
+ {
+ std::lock_guard<std::mutex> scoped_lock(lock_);
+ id = static_cast<WriterID>(active_writer_ids_.Allocate());
+ }
+ return std::unique_ptr<TraceWriter>(
+ id ? new TraceWriterImpl(this, id, target_buffer) : nullptr);
+}
+
+void SharedMemoryArbiter::ReleaseWriterID(WriterID id) {
+ std::lock_guard<std::mutex> scoped_lock(lock_);
+ active_writer_ids_.Free(id);
+}
+
+} // namespace perfetto
diff --git a/src/tracing/core/shared_memory_arbiter.h b/src/tracing/core/shared_memory_arbiter.h
new file mode 100644
index 0000000..34267bf
--- /dev/null
+++ b/src/tracing/core/shared_memory_arbiter.h
@@ -0,0 +1,112 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_H_
+#define SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_H_
+
+#include <stdint.h>
+
+#include <functional>
+#include <mutex>
+#include <vector>
+
+#include "perfetto/tracing/core/basic_types.h"
+#include "perfetto/tracing/core/shared_memory_abi.h"
+#include "src/tracing/core/id_allocator.h"
+
+namespace perfetto {
+
+class TraceWriter;
+
+namespace base {
+class TaskRunner;
+} // namespace base
+
+// This class handles the shared memory buffer on the producer side. It is used
+// to obtain thread-local chunks and to partition pages from several threads.
+// There is one arbiter instance per Producer.
+// This class is thread-safe and uses locks to do so. Data sources are supposed
+// to interact with this sporadically, only when they run out of space on their
+// current thread-local chunk.
+class SharedMemoryArbiter {
+ public:
+ using OnPageCompleteCallback =
+ std::function<void(const std::vector<uint32_t>& /*page_indexes*/)>;
+
+ // Args:
+ // |start|,|size|: boundaries of the shared memory buffer.
+ // |page_size|: a multiple of 4KB that defines the granularity of tracing
+ // pages. See tradeoff considerations in shared_memory_abi.h.
+ // |OnPageCompleteCallback|: a callback that will be posted on the passed
+ // |TaskRunner| when one or more pages are complete (and hence the Producer
+ // should send a NotifySharedMemoryUpdate() to the Service).
+ SharedMemoryArbiter(void* start,
+ size_t size,
+ size_t page_size,
+ OnPageCompleteCallback,
+ base::TaskRunner*);
+
+ // Creates a new TraceWriter and assigns it a new WriterID. The WriterID is
+ // written in each chunk header owned by a given TraceWriter and is used by
+ // the Service to reconstruct TracePackets written by the same TraceWriter.
+ // Returns nullptr if all WriterID slots are exhausted.
+ // TODO(primiano): instead of nullptr this should return a NoopWriter.
+ std::unique_ptr<TraceWriter> CreateTraceWriter(BufferID target_buffer = 0);
+
+ // Returns a new Chunk to write tracing data. The call always returns a valid
+ // Chunk. TODO(primiano): right now this blocks if there are no free chunks
+ // in the SMB. In the long term the caller should be allowed to pick a policy
+ // and handle the retry itself asynchronously.
+ SharedMemoryABI::Chunk GetNewChunk(const SharedMemoryABI::ChunkHeader&,
+ BufferID target_buffer,
+ size_t size_hint = 0);
+
+ void ReturnCompletedChunk(SharedMemoryABI::Chunk chunk);
+
+ SharedMemoryABI* shmem_abi_for_testing() { return &shmem_abi_; }
+
+ static void set_default_layout_for_testing(SharedMemoryABI::PageLayout l) {
+ default_page_layout = l;
+ }
+
+ private:
+ friend class TraceWriterImpl;
+
+ static SharedMemoryABI::PageLayout default_page_layout;
+
+ SharedMemoryArbiter(const SharedMemoryArbiter&) = delete;
+ SharedMemoryArbiter& operator=(const SharedMemoryArbiter&) = delete;
+
+ // Called by the TraceWriter destructor.
+ void ReleaseWriterID(WriterID);
+
+ void InvokeOnPageCompleteCallback();
+
+ base::TaskRunner* const task_runner_;
+ OnPageCompleteCallback on_page_complete_callback_;
+
+ // --- Begin lock-protected members ---
+ std::mutex lock_;
+ SharedMemoryABI shmem_abi_;
+ size_t page_idx_ = 0;
+ IdAllocator active_writer_ids_;
+ std::vector<uint32_t> pages_to_notify_;
+ // --- End lock-protected members ---
+};
+
+} // namespace perfetto
+
+#endif // SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_H_
diff --git a/src/tracing/core/shared_memory_arbiter_unittest.cc b/src/tracing/core/shared_memory_arbiter_unittest.cc
new file mode 100644
index 0000000..d2b511b
--- /dev/null
+++ b/src/tracing/core/shared_memory_arbiter_unittest.cc
@@ -0,0 +1,174 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/tracing/core/shared_memory_arbiter.h"
+
+#include "gtest/gtest.h"
+#include "perfetto/base/utils.h"
+#include "perfetto/tracing/core/basic_types.h"
+#include "perfetto/tracing/core/shared_memory_abi.h"
+#include "perfetto/tracing/core/trace_writer.h"
+#include "src/base/test/test_task_runner.h"
+#include "src/tracing/test/aligned_buffer_test.h"
+
+namespace perfetto {
+namespace {
+
+constexpr size_t kMaxWriterID = SharedMemoryABI::kMaxWriterID;
+
+class SharedMemoryArbiterTest : public AlignedBufferTest {
+ public:
+ void SetUp() override {
+ AlignedBufferTest::SetUp();
+ auto callback = [this](const std::vector<uint32_t>& arg) {
+ if (on_page_complete_)
+ on_page_complete_(arg);
+ };
+ task_runner_.reset(new base::TestTaskRunner());
+ arbiter_.reset(new SharedMemoryArbiter(buf(), buf_size(), page_size(),
+ callback, task_runner_.get()));
+ }
+
+ void TearDown() override {
+ arbiter_.reset();
+ task_runner_.reset();
+ }
+
+ std::unique_ptr<base::TestTaskRunner> task_runner_;
+ std::unique_ptr<SharedMemoryArbiter> arbiter_;
+ std::function<void(const std::vector<uint32_t>&)> on_page_complete_;
+};
+
+size_t const kPageSizes[] = {4096, 65536};
+INSTANTIATE_TEST_CASE_P(PageSize,
+ SharedMemoryArbiterTest,
+ ::testing::ValuesIn(kPageSizes));
+
+// Checks that chunks that target different buffer IDs are placed in different
+// pages.
+TEST_P(SharedMemoryArbiterTest, ChunksAllocationByTargetBufferID) {
+ SharedMemoryArbiter::set_default_layout_for_testing(
+ SharedMemoryABI::PageLayout::kPageDiv4);
+ SharedMemoryABI::Chunk chunks[8];
+ chunks[0] = arbiter_->GetNewChunk({}, 1 /* target buffer id */, 0);
+ chunks[1] = arbiter_->GetNewChunk({}, 1 /* target buffer id */, 0);
+ chunks[2] = arbiter_->GetNewChunk({}, 1 /* target buffer id */, 0);
+ chunks[3] = arbiter_->GetNewChunk({}, 2 /* target buffer id */, 0);
+ chunks[4] = arbiter_->GetNewChunk({}, 1 /* target buffer id */, 0);
+ chunks[5] = arbiter_->GetNewChunk({}, 1 /* target buffer id */, 0);
+ chunks[6] = arbiter_->GetNewChunk({}, 3 /* target buffer id */, 0);
+ chunks[7] = arbiter_->GetNewChunk({}, 3 /* target buffer id */, 0);
+
+ // "first" == "page index", "second" == "chunk index".
+ std::pair<size_t, size_t> idx[base::ArraySize(chunks)];
+ for (size_t i = 0; i < base::ArraySize(chunks); i++)
+ idx[i] = arbiter_->shmem_abi_for_testing()->GetPageAndChunkIndex(chunks[i]);
+
+ // The first three chunks should lay in the same page, as they target the same
+ // buffer id (1).
+ EXPECT_EQ(idx[0].first, idx[1].first);
+ EXPECT_EQ(idx[0].first, idx[2].first);
+
+ // Check also that the chunk IDs are different.
+ EXPECT_NE(idx[0].second, idx[1].second);
+ EXPECT_NE(idx[1].second, idx[2].second);
+ EXPECT_NE(idx[0].second, idx[2].second);
+
+ // The next one instead should be given a dedicated page because it targets
+ // a different buffer id (2);
+ EXPECT_NE(idx[2].first, idx[3].first);
+
+ // Hoever the next two chunks should be able to fit back into the same page.
+ EXPECT_EQ(idx[4].first, idx[5].first);
+ EXPECT_NE(idx[4].second, idx[5].second);
+
+ // Similarly the last two chunks should be able to share the same page, but
+ // not any page of the previous chunks.
+ EXPECT_NE(idx[0].first, idx[6].first);
+ EXPECT_NE(idx[3].first, idx[6].first);
+ EXPECT_EQ(idx[6].first, idx[7].first);
+ EXPECT_NE(idx[6].second, idx[7].second);
+
+ // TODO(primiano): check that after saturating all the pages, the arbiter
+ // goes back and reuses free chunks of previous pages. e.g., at some point
+ // a chunk targeting buffer id == 1 should be placed into (page:0, chunk:3).
+}
+
+// The buffer has 14 pages (kNumPages), each will be partitioned in 14 chunks.
+// The test requests all 14 * 14 chunks, alternating amongst 14 target buf IDs.
+// Because a chunk can share a page only if all other chunks in the page have
+// the same target buffer ID, there is only one possible final distribution:
+// each page is filled with chunks that all belong to the same buffer ID.
+TEST_P(SharedMemoryArbiterTest, GetAndReturnChunks) {
+ SharedMemoryArbiter::set_default_layout_for_testing(
+ SharedMemoryABI::PageLayout::kPageDiv14);
+ static constexpr size_t kTotChunks = kNumPages * 14;
+ SharedMemoryABI::Chunk chunks[kTotChunks];
+ for (size_t i = 0; i < kTotChunks; i++) {
+ BufferID target_buffer = i % 14;
+ chunks[i] = arbiter_->GetNewChunk({}, target_buffer, 0 /*size_hint*/);
+ ASSERT_TRUE(chunks[i].is_valid());
+ }
+
+ SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
+ for (size_t page_idx = 0; page_idx < kNumPages; page_idx++) {
+ ASSERT_FALSE(abi->is_page_free(page_idx));
+ ASSERT_EQ(0u, abi->GetFreeChunks(page_idx));
+ const uint32_t page_layout = abi->page_layout_dbg(page_idx);
+ ASSERT_EQ(14u, SharedMemoryABI::GetNumChunksForLayout(page_layout));
+ ASSERT_EQ(page_idx % 14, abi->page_header(page_idx)->target_buffer.load());
+ for (size_t chunk_idx = 0; chunk_idx < 14; chunk_idx++) {
+ auto chunk = abi->GetChunkUnchecked(page_idx, page_layout, chunk_idx);
+ ASSERT_TRUE(chunk.is_valid());
+ }
+ }
+
+ // Finally return just two pages marking all their chunks as complete, and
+ // check that the notification callback is posted.
+
+ auto on_callback = task_runner_->CreateCheckpoint("on_callback");
+ on_page_complete_ =
+ [on_callback](const std::vector<uint32_t>& completed_pages) {
+ ASSERT_EQ(2u, completed_pages.size());
+ ASSERT_EQ(0u, completed_pages[0]);
+ ASSERT_EQ(3u, completed_pages[1]);
+ on_callback();
+ };
+ for (size_t i = 0; i < 14; i++) {
+ arbiter_->ReturnCompletedChunk(std::move(chunks[14 * i]));
+ arbiter_->ReturnCompletedChunk(std::move(chunks[14 * i + 3]));
+ }
+ task_runner_->RunUntilCheckpoint("on_callback");
+}
+
+// Check that we can actually create up to kMaxWriterID TraceWriter(s).
+TEST_P(SharedMemoryArbiterTest, WriterIDsAllocation) {
+ std::map<WriterID, std::unique_ptr<TraceWriter>> writers;
+ for (size_t i = 0; i < kMaxWriterID; i++) {
+ std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(0);
+ ASSERT_TRUE(writer);
+ WriterID writer_id = writer->writer_id();
+ ASSERT_TRUE(writers.emplace(writer_id, std::move(writer)).second);
+ }
+
+ // A further call should fail as we exhausted writer IDs.
+ ASSERT_EQ(nullptr, arbiter_->CreateTraceWriter(0).get());
+}
+
+// TODO(primiano): add multi-threaded tests.
+
+} // namespace
+} // namespace perfetto
diff --git a/src/tracing/core/trace_writer_impl.cc b/src/tracing/core/trace_writer_impl.cc
new file mode 100644
index 0000000..a17e543
--- /dev/null
+++ b/src/tracing/core/trace_writer_impl.cc
@@ -0,0 +1,172 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/tracing/core/trace_writer_impl.h"
+
+#include <string.h>
+
+#include <type_traits>
+#include <utility>
+
+#include "perfetto/base/logging.h"
+#include "perfetto/protozero/proto_utils.h"
+#include "src/tracing/core/shared_memory_arbiter.h"
+
+#include "protos/trace_packet.pbzero.h"
+
+// TODO(primiano): right now this class is accumulating a patchlist but not
+// sending it to the service.
+
+using protozero::proto_utils::kMessageLengthFieldSize;
+using protozero::proto_utils::WriteRedundantVarInt;
+using ChunkHeader = perfetto::SharedMemoryABI::ChunkHeader;
+
+namespace perfetto {
+
+namespace {
+constexpr size_t kPacketHeaderSize = SharedMemoryABI::kPacketHeaderSize;
+} // namespace
+
+TraceWriterImpl::TraceWriterImpl(SharedMemoryArbiter* shmem_arbiter,
+ WriterID id,
+ BufferID target_buffer)
+ : shmem_arbiter_(shmem_arbiter),
+ id_(id),
+ target_buffer_(target_buffer),
+ protobuf_stream_writer_(this) {
+ // TODO(primiano): we could handle the case of running out of TraceWriterID(s)
+ // more gracefully and always return a no-op TracePacket in NewTracePacket().
+ PERFETTO_CHECK(id_ != 0);
+
+ cur_packet_.reset(new protos::pbzero::TracePacket());
+ cur_packet_->Finalize(); // To avoid the DCHECK in NewTracePacket().
+}
+
+TraceWriterImpl::~TraceWriterImpl() {
+ // TODO(primiano): this should also return the current chunk. Add tests.
+ shmem_arbiter_->ReleaseWriterID(id_);
+}
+
+TraceWriterImpl::TracePacketHandle TraceWriterImpl::NewTracePacket() {
+ // If we hit this, the caller is calling NewTracePacket() without having
+ // finalized the previous packet.
+ PERFETTO_DCHECK(cur_packet_->is_finalized());
+
+ fragmenting_packet_ = false;
+
+ // TODO: hack to get a new page every time and reduce fragmentation (that
+ // requires stitching support in the service).
+ protobuf_stream_writer_.Reset(GetNewBuffer());
+
+ // Reserve space for the size of the message. Note: this call might re-enter
+ // into this class invoking GetNewBuffer() if there isn't enough space or if
+ // this is the very first call to NewTracePacket().
+ static_assert(
+ kPacketHeaderSize == kMessageLengthFieldSize,
+ "The packet header must match the ProtoZeroMessage header size");
+ cur_packet_->Reset(&protobuf_stream_writer_);
+ uint8_t* header = protobuf_stream_writer_.ReserveBytes(kPacketHeaderSize);
+ memset(header, 0, kPacketHeaderSize);
+ cur_packet_->set_size_field(header);
+ cur_chunk_.IncrementPacketCount();
+ TracePacketHandle handle(cur_packet_.get());
+ cur_fragment_start_ = protobuf_stream_writer_.write_ptr();
+ fragmenting_packet_ = true;
+ return handle;
+}
+
+// Called by the ProtoZeroMessage. We can get here in two cases:
+// 1. In the middle of writing a ProtoZeroMessage,
+// when |fragmenting_packet_| == true. In this case we want to update the
+// chunk header with a partial packet and start a new partial packet in the
+// new chunk.
+// 2. While calling ReserveBytes() for the packet header in NewTracePacket().
+// In this case |fragmenting_packet_| == false and we just want a new chunk
+// without creating any fragments.
+protozero::ContiguousMemoryRange TraceWriterImpl::GetNewBuffer() {
+ if (fragmenting_packet_) {
+ uint8_t* const wptr = protobuf_stream_writer_.write_ptr();
+ PERFETTO_DCHECK(wptr >= cur_fragment_start_);
+ uint32_t partial_size = static_cast<uint32_t>(wptr - cur_fragment_start_);
+ PERFETTO_DCHECK(partial_size < cur_chunk_.size());
+
+ // Backfill the packet header with the fragment size.
+ cur_packet_->inc_size_already_written(partial_size);
+ cur_chunk_.SetFlag(ChunkHeader::kLastPacketContinuesOnNextChunk);
+ WriteRedundantVarInt(partial_size, cur_packet_->size_field());
+
+ // Descend in the stack of non-finalized nested submessages (if any) and
+ // detour their |size_field| into the |patch_list_|. At this point we have
+ // to release the chunk and they cannot write anymore into that.
+ // TODO(primiano): add tests to cover this logic.
+ for (auto* nested_msg = cur_packet_->nested_message(); nested_msg;
+ nested_msg = nested_msg->nested_message()) {
+ uint8_t* const cur_hdr = nested_msg->size_field();
+ PERFETTO_DCHECK(cur_hdr >= cur_chunk_.payload_begin() &&
+ cur_hdr + kMessageLengthFieldSize <= cur_chunk_.end());
+ auto cur_hdr_offset = static_cast<uint16_t>(cur_hdr - cur_chunk_.begin());
+ patch_list_.emplace_front(cur_chunk_id_, cur_hdr_offset);
+ Patch& patch = patch_list_.front();
+ nested_msg->set_size_field(patch.size_field);
+ PERFETTO_DLOG("Created new patchlist entry for protobuf nested message");
+ }
+ }
+
+ if (cur_chunk_.is_valid())
+ shmem_arbiter_->ReturnCompletedChunk(std::move(cur_chunk_));
+
+ // Start a new chunk.
+ ChunkHeader::Identifier identifier = {};
+ identifier.writer_id = id_;
+ identifier.chunk_id = cur_chunk_id_++;
+
+ ChunkHeader::PacketsState packets_state = {};
+ if (fragmenting_packet_) {
+ packets_state.count = 1;
+ packets_state.flags = ChunkHeader::kFirstPacketContinuesFromPrevChunk;
+ }
+
+ // The memory order of the stores below doesn't really matter. This |header|
+ // is just a local temporary object. The GetNewChunk() call below will copy it
+ // into the shared buffer with the proper barriers.
+ ChunkHeader header = {};
+ header.identifier.store(identifier, std::memory_order_relaxed);
+ header.packets_state.store(packets_state, std::memory_order_relaxed);
+
+ cur_chunk_ = shmem_arbiter_->GetNewChunk(header, target_buffer_);
+ uint8_t* payload_begin = cur_chunk_.payload_begin();
+ if (fragmenting_packet_) {
+ cur_packet_->set_size_field(payload_begin);
+ memset(payload_begin, 0, kPacketHeaderSize);
+ payload_begin += kPacketHeaderSize;
+ cur_fragment_start_ = payload_begin;
+ }
+
+ return protozero::ContiguousMemoryRange{payload_begin, cur_chunk_.end()};
+}
+
+WriterID TraceWriterImpl::writer_id() const {
+ return id_;
+};
+
+TraceWriterImpl::Patch::Patch(uint16_t cid, uint16_t offset)
+ : chunk_id(cid), offset_in_chunk(offset) {}
+
+// Base class ctor/dtor definition.
+TraceWriter::TraceWriter() = default;
+TraceWriter::~TraceWriter() = default;
+
+} // namespace perfetto
diff --git a/src/tracing/core/trace_writer_impl.h b/src/tracing/core/trace_writer_impl.h
new file mode 100644
index 0000000..3c7121d
--- /dev/null
+++ b/src/tracing/core/trace_writer_impl.h
@@ -0,0 +1,118 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACING_CORE_TRACE_WRITER_IMPL_H_
+#define SRC_TRACING_CORE_TRACE_WRITER_IMPL_H_
+
+#include <forward_list>
+
+#include "perfetto/protozero/protozero_message_handle.h"
+#include "perfetto/protozero/scattered_stream_writer.h"
+#include "perfetto/tracing/core/basic_types.h"
+#include "perfetto/tracing/core/shared_memory_abi.h"
+#include "perfetto/tracing/core/trace_writer.h"
+
+namespace perfetto {
+
+class SharedMemoryArbiter;
+
+// See //include/perfetto/tracing/core/trace_writer.h for docs.
+class TraceWriterImpl : public TraceWriter,
+ public protozero::ScatteredStreamWriter::Delegate {
+ public:
+ // TracePacketHandle is defined in trace_writer.h
+
+ TraceWriterImpl(SharedMemoryArbiter*, WriterID, BufferID);
+ ~TraceWriterImpl() override;
+
+ // TraceWriter implementation. See documentation in trace_writer.h .
+ TracePacketHandle NewTracePacket() override;
+ WriterID writer_id() const override;
+
+ private:
+ // Used to handle the backfilling of the headers (the |size_field|) of nested
+ // messages when a proto is fragmented over several chunks. This patchlist is
+ // sent out of band to the tracing service, after having returned the initial
+ // chunks of the fragment.
+ struct Patch {
+ Patch(uint16_t chunk_id, uint16_t offset);
+ Patch(const Patch&) = delete;
+ Patch& operator=(const Patch&) = delete;
+ Patch(Patch&&) noexcept = delete;
+ Patch& operator=(Patch&&) = delete;
+
+ const uint16_t chunk_id;
+ const uint16_t offset_in_chunk;
+ uint8_t size_field[SharedMemoryABI::kPacketHeaderSize] = {};
+ };
+ TraceWriterImpl(const TraceWriterImpl&) = delete;
+ TraceWriterImpl& operator=(const TraceWriterImpl&) = delete;
+
+ // ScatteredStreamWriter::Delegate implementation.
+ protozero::ContiguousMemoryRange GetNewBuffer() override;
+
+ // The per-producer arbiter that coordinates access to the shared memory
+ // buffer from several threads.
+ SharedMemoryArbiter* const shmem_arbiter_;
+
+ // ID of the current writer.
+ const WriterID id_;
+
+ // This is just copied back into the chunk header.
+ // See comments in data_source_config.proto for |target_buffer|.
+ const BufferID target_buffer_;
+
+ // Monotonic (% wrapping) sequence id of the chunk. Together with the WriterID
+ // this allows the Service to reconstruct the linear sequence of packets.
+ uint16_t cur_chunk_id_ = 0;
+
+ // The chunk we are holding onto (if any).
+ SharedMemoryABI::Chunk cur_chunk_;
+
+ // Passed to protozero message to write directly into |cur_chunk_|. It
+ // keeps track of the write pointer. It calls us back (GetNewBuffer()) when
+ // |cur_chunk_| is filled.
+ protozero::ScatteredStreamWriter protobuf_stream_writer_;
+
+ // The packet returned via NewTracePacket(). Its owned by this class,
+ // TracePacketHandle has just a pointer to it.
+ std::unique_ptr<protos::pbzero::TracePacket> cur_packet_;
+
+ // The start address of |cur_packet_| within |cur_chunk_|. Used to figure out
+ // fragments sizes when a TracePacket write is interrupted by GetNewBuffer().
+ uint8_t* cur_fragment_start_ = nullptr;
+
+ // true if we received a call to GetNewBuffer() after NewTracePacket(),
+ // false if GetNewBuffer() happened during NewTracePacket() prologue, while
+ // starting the TracePacket header.
+ bool fragmenting_packet_ = false;
+
+ // When a packet is fragmented across different chunks, the |size_field| of
+ // the outstanding nested protobuf messages is redirected onto Patch entries
+ // in this list at the time the Chunk is returned (because at that point we
+ // have to release the ownership of the current Chunk). This list will be
+ // later sent out-of-band to the tracing service, who will patch the required
+ // chunks, if they are still around.
+ // Note: the ProtoZeroMessage will take pointers to the |size_field| of these
+ // entries. This container must guarantee that the Patch objects are never
+ // moved around (i.e. cannot be a vector because of reallocations can change
+ // addresses of pre-existing entries).
+ std::forward_list<Patch> patch_list_;
+};
+
+} // namespace perfetto
+
+#endif // SRC_TRACING_CORE_TRACE_WRITER_IMPL_H_
diff --git a/src/tracing/core/trace_writer_impl_unittest.cc b/src/tracing/core/trace_writer_impl_unittest.cc
new file mode 100644
index 0000000..15b46a8
--- /dev/null
+++ b/src/tracing/core/trace_writer_impl_unittest.cc
@@ -0,0 +1,90 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/tracing/core/trace_writer_impl.h"
+
+#include "gtest/gtest.h"
+#include "perfetto/base/utils.h"
+#include "perfetto/tracing/core/trace_writer.h"
+#include "src/base/test/test_task_runner.h"
+#include "src/tracing/core/shared_memory_arbiter.h"
+#include "src/tracing/test/aligned_buffer_test.h"
+
+#include "protos/trace_packet.pbzero.h"
+
+namespace perfetto {
+namespace {
+
+class TraceWriterImplTest : public AlignedBufferTest {
+ public:
+ void SetUp() override {
+ SharedMemoryArbiter::set_default_layout_for_testing(
+ SharedMemoryABI::PageLayout::kPageDiv4);
+ AlignedBufferTest::SetUp();
+ auto callback = [](const std::vector<uint32_t>& arg) {};
+ task_runner_.reset(new base::TestTaskRunner());
+ arbiter_.reset(new SharedMemoryArbiter(buf(), buf_size(), page_size(),
+ callback, task_runner_.get()));
+ }
+
+ void TearDown() override {
+ arbiter_.reset();
+ task_runner_.reset();
+ }
+
+ std::unique_ptr<base::TestTaskRunner> task_runner_;
+ std::unique_ptr<SharedMemoryArbiter> arbiter_;
+ std::function<void(const std::vector<uint32_t>&)> on_page_complete_;
+};
+
+size_t const kPageSizes[] = {4096, 65536};
+INSTANTIATE_TEST_CASE_P(PageSize,
+ TraceWriterImplTest,
+ ::testing::ValuesIn(kPageSizes));
+
+TEST_P(TraceWriterImplTest, SingleWriter) {
+ const BufferID tgt_buf_id = 42;
+ std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(tgt_buf_id);
+ for (int i = 0; i < 32; i++) {
+ auto packet = writer->NewTracePacket();
+ char str[16];
+ sprintf(str, "foobar %d", i);
+ packet->set_test(str);
+ }
+ writer->NewTracePacket()->set_test("workaround for returing the last chunk");
+ writer.reset();
+
+ SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
+ size_t packets_count = 0;
+ for (size_t page_idx = 0; page_idx < kNumPages; page_idx++) {
+ uint32_t page_layout = abi->page_layout_dbg(page_idx);
+ size_t num_chunks = SharedMemoryABI::GetNumChunksForLayout(page_layout);
+ for (size_t chunk_idx = 0; chunk_idx < num_chunks; chunk_idx++) {
+ auto chunk =
+ abi->TryAcquireChunkForReading(page_idx, chunk_idx, tgt_buf_id);
+ if (!chunk.is_valid())
+ continue;
+ packets_count += chunk.header()->packets_state.load().count;
+ }
+ }
+ EXPECT_EQ(32u, packets_count);
+ // TODO(primiano): check also the content of the packets decoding the protos.
+}
+
+// TODO(primiano): add multi-writer test.
+
+} // namespace
+} // namespace perfetto
diff --git a/src/tracing/test/aligned_buffer_test.cc b/src/tracing/test/aligned_buffer_test.cc
new file mode 100644
index 0000000..6a603ea
--- /dev/null
+++ b/src/tracing/test/aligned_buffer_test.cc
@@ -0,0 +1,35 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/tracing/test/aligned_buffer_test.h"
+
+#include "perfetto/base/logging.h"
+
+namespace perfetto {
+
+// static
+constexpr size_t AlignedBufferTest::kNumPages;
+
+void AlignedBufferTest::SetUp() {
+ page_size_ = GetParam();
+ buf_.reset(new TestSharedMemory(page_size_ * kNumPages));
+}
+
+void AlignedBufferTest::TearDown() {
+ buf_.reset();
+}
+
+} // namespace perfetto
diff --git a/src/tracing/test/aligned_buffer_test.h b/src/tracing/test/aligned_buffer_test.h
new file mode 100644
index 0000000..23c8b1c
--- /dev/null
+++ b/src/tracing/test/aligned_buffer_test.h
@@ -0,0 +1,51 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACING_TEST_ALIGNED_BUFFER_TEST_H_
+#define SRC_TRACING_TEST_ALIGNED_BUFFER_TEST_H_
+
+#include <stdlib.h>
+
+#include <memory>
+
+#include "gtest/gtest.h"
+#include "perfetto/base/utils.h"
+#include "src/tracing/test/test_shared_memory.h"
+
+namespace perfetto {
+
+// Base parametrized test for unittests that require an aligned buffer.
+class AlignedBufferTest : public ::testing::TestWithParam<size_t> {
+ public:
+ static constexpr size_t kNumPages = 14;
+ void SetUp() override;
+ void TearDown() override;
+
+ uint8_t* buf() const { return reinterpret_cast<uint8_t*>(buf_->start()); }
+ size_t buf_size() const { return buf_->size(); }
+ size_t page_size() const { return page_size_; }
+
+ private:
+ size_t page_size_ = 0;
+
+ // This doesn't need any sharing. TestSharedMemory just happens to have the
+ // right harness to create a zeroed page-aligned buffer.
+ std::unique_ptr<TestSharedMemory> buf_;
+};
+
+} // namespace perfetto
+
+#endif // SRC_TRACING_TEST_ALIGNED_BUFFER_TEST_H_