IPC plumbing for TraceWriter and service-side Consumer port.
This CL fixes most of the IPC plumbing after TraceWriter and
related methods are landed. This mainly reduces the need of
keeping .proto and core objects in sync. Also exposes the
service-side implementation of the Consumer port.
Finally gets rid of the ObserverForTesting, which at the end
turned out to be unneeded.
Bug: 70284518
Bug: 68854243
Change-Id: I5e9156fee2945dc8bd42d7016046afb7da643411
diff --git a/Android.bp b/Android.bp
index c5411f1..024a3e5 100644
--- a/Android.bp
+++ b/Android.bp
@@ -443,6 +443,7 @@
"src/tracing/ipc/posix_shared_memory.cc",
"src/tracing/ipc/posix_shared_memory_unittest.cc",
"src/tracing/ipc/producer/producer_ipc_client_impl.cc",
+ "src/tracing/ipc/service/consumer_ipc_service.cc",
"src/tracing/ipc/service/producer_ipc_service.cc",
"src/tracing/ipc/service/service_ipc_host_impl.cc",
"src/tracing/test/aligned_buffer_test.cc",
diff --git a/include/perfetto/tracing/core/service.h b/include/perfetto/tracing/core/service.h
index 2e6e15b..1acb2b3 100644
--- a/include/perfetto/tracing/core/service.h
+++ b/include/perfetto/tracing/core/service.h
@@ -36,6 +36,7 @@
class DataSourceDescriptor;
class Producer;
class TraceConfig;
+class TraceWriter;
// TODO: for the moment this assumes that all the calls hapen on the same
// thread/sequence. Not sure this will be the case long term in Chrome.
@@ -76,8 +77,22 @@
virtual void NotifySharedMemoryUpdate(
const std::vector<uint32_t>& changed_pages) = 0;
- // Returns the SharedMemory buffer for this Producer.
+ // TODO(primiano): remove this, we shouldn't be exposing the raw
+ // SHM object but only the TraceWriter (below).
virtual SharedMemory* shared_memory() const = 0;
+
+ // Creates a trace writer, which allows to create events, handling the
+ // underying shared memory buffer and signalling to the Service. This method
+ // is thread-safe but the returned object is not. A TraceWriter should be
+ // used only from a single thread, or the caller has to handle sequencing
+ // via a mutex or equivalent.
+ // Args:
+ // |target_buffer| is the target buffer ID where the data produced by the
+ // writer should be stored by the tracing service. This value is passed
+ // upon creation of the data source (CreateDataSourceInstance()) in the
+ // DataSourceConfig.target_buffer().
+ virtual std::unique_ptr<TraceWriter> CreateTraceWriter(
+ BufferID target_buffer) = 0;
}; // class ProducerEndpoint.
// The API for the Consumer port of the Service.
@@ -126,24 +141,6 @@
// To disconnect just destroy the returned ConsumerEndpoint object. It is safe
// to destroy the Consumer once the Consumer::OnDisconnect() has been invoked.
virtual std::unique_ptr<ConsumerEndpoint> ConnectConsumer(Consumer*) = 0;
-
- public: // Testing-only
- class ObserverForTesting {
- public:
- virtual ~ObserverForTesting() {}
- virtual void OnProducerConnected(ProducerID) {}
- virtual void OnProducerDisconnected(ProducerID) {}
- virtual void OnDataSourceRegistered(ProducerID, DataSourceID) {}
- virtual void OnDataSourceUnregistered(ProducerID, DataSourceID) {}
- virtual void OnDataSourceInstanceCreated(ProducerID,
- DataSourceID,
- DataSourceInstanceID) {}
- virtual void OnDataSourceInstanceDestroyed(ProducerID,
- DataSourceID,
- DataSourceInstanceID) {}
- };
-
- virtual void set_observer_for_testing(ObserverForTesting*) = 0;
};
} // namespace perfetto
diff --git a/include/perfetto/tracing/core/trace_packet.h b/include/perfetto/tracing/core/trace_packet.h
index dbaa823..f1c703b 100644
--- a/include/perfetto/tracing/core/trace_packet.h
+++ b/include/perfetto/tracing/core/trace_packet.h
@@ -67,6 +67,9 @@
// Mutator, used only by the service and tests.
void AddChunk(Chunk);
+ // Total size of all chunks.
+ size_t size() const { return size_; }
+
private:
TracePacket(const TracePacket&) = delete;
TracePacket& operator=(const TracePacket&) = delete;
@@ -74,6 +77,7 @@
// TODO(primiano): who owns the memory of the chunks? Figure out later.
ChunkSequence chunks_; // Not owned.
+ size_t size_ = 0; // SUM(chunk.size for chunk in chunks_).
std::unique_ptr<DecodedTracePacket> decoded_packet_;
};
diff --git a/include/perfetto/tracing/ipc/service_ipc_host.h b/include/perfetto/tracing/ipc/service_ipc_host.h
index f6e4246..9a3380e 100644
--- a/include/perfetto/tracing/ipc/service_ipc_host.h
+++ b/include/perfetto/tracing/ipc/service_ipc_host.h
@@ -40,7 +40,8 @@
// Start listening on the Producer & Consumer ports. Returns false in case of
// failure (e.g., something else is listening on |socket_name|).
- virtual bool Start(const char* producer_socket_name) = 0;
+ virtual bool Start(const char* producer_socket_name,
+ const char* consumer_socket_name) = 0;
protected:
ServiceIPCHost();
diff --git a/src/tracing/BUILD.gn b/src/tracing/BUILD.gn
index bbfa964..e805494 100644
--- a/src/tracing/BUILD.gn
+++ b/src/tracing/BUILD.gn
@@ -59,6 +59,8 @@
"ipc/posix_shared_memory.h",
"ipc/producer/producer_ipc_client_impl.cc",
"ipc/producer/producer_ipc_client_impl.h",
+ "ipc/service/consumer_ipc_service.cc",
+ "ipc/service/consumer_ipc_service.h",
"ipc/service/producer_ipc_service.cc",
"ipc/service/producer_ipc_service.h",
"ipc/service/service_ipc_host_impl.cc",
diff --git a/src/tracing/core/service_impl.cc b/src/tracing/core/service_impl.cc
index a039d52..6785bdb 100644
--- a/src/tracing/core/service_impl.cc
+++ b/src/tracing/core/service_impl.cc
@@ -73,16 +73,12 @@
auto it_and_inserted = producers_.emplace(id, endpoint.get());
PERFETTO_DCHECK(it_and_inserted.second);
task_runner_->PostTask(std::bind(&Producer::OnConnect, endpoint->producer()));
- if (observer_)
- observer_->OnProducerConnected(id);
return std::move(endpoint);
}
void ServiceImpl::DisconnectProducer(ProducerID id) {
PERFETTO_DCHECK(producers_.count(id));
producers_.erase(id);
- if (observer_)
- observer_->OnProducerDisconnected(id);
}
ServiceImpl::ProducerEndpointImpl* ServiceImpl::GetProducer(
@@ -187,16 +183,12 @@
const DataSourceID dsid = ++last_data_source_id_;
task_runner_->PostTask(std::bind(std::move(callback), dsid));
// TODO implement the bookkeeping logic.
- if (service_->observer_)
- service_->observer_->OnDataSourceRegistered(id_, dsid);
}
void ServiceImpl::ProducerEndpointImpl::UnregisterDataSource(
DataSourceID dsid) {
PERFETTO_CHECK(dsid);
// TODO implement the bookkeeping logic.
- if (service_->observer_)
- service_->observer_->OnDataSourceUnregistered(id_, dsid);
}
void ServiceImpl::ProducerEndpointImpl::NotifySharedMemoryUpdate(
@@ -205,8 +197,13 @@
return;
}
-void ServiceImpl::set_observer_for_testing(ObserverForTesting* observer) {
- observer_ = observer;
+std::unique_ptr<TraceWriter>
+ServiceImpl::ProducerEndpointImpl::CreateTraceWriter(BufferID) {
+ // TODO(primiano): not implemented yet.
+ // This code path is hit only in in-process configuration, where tracing
+ // Service and Producer are hosted in the same process. It's a use case we
+ // want to support, but not too interesting right now.
+ PERFETTO_CHECK(false);
}
SharedMemory* ServiceImpl::ProducerEndpointImpl::shared_memory() const {
diff --git a/src/tracing/core/service_impl.h b/src/tracing/core/service_impl.h
index 24a39bb..91db0fb 100644
--- a/src/tracing/core/service_impl.h
+++ b/src/tracing/core/service_impl.h
@@ -57,10 +57,9 @@
void RegisterDataSource(const DataSourceDescriptor&,
RegisterDataSourceCallback) override;
void UnregisterDataSource(DataSourceID) override;
-
void NotifySharedMemoryUpdate(
const std::vector<uint32_t>& changed_pages) override;
-
+ std::unique_ptr<TraceWriter> CreateTraceWriter(BufferID) override;
SharedMemory* shared_memory() const override;
private:
@@ -121,8 +120,6 @@
std::unique_ptr<Service::ConsumerEndpoint> ConnectConsumer(
Consumer*) override;
- void set_observer_for_testing(ObserverForTesting*) override;
-
// Exposed mainly for testing.
size_t num_producers() const { return producers_.size(); }
ProducerEndpointImpl* GetProducer(ProducerID) const;
@@ -136,7 +133,6 @@
ProducerID last_producer_id_ = 0;
std::map<ProducerID, ProducerEndpointImpl*> producers_;
std::set<ConsumerEndpointImpl*> consumers_;
- ObserverForTesting* observer_ = nullptr;
};
} // namespace perfetto
diff --git a/src/tracing/core/shared_memory_arbiter.cc b/src/tracing/core/shared_memory_arbiter.cc
index e2a059d..de08dc0 100644
--- a/src/tracing/core/shared_memory_arbiter.cc
+++ b/src/tracing/core/shared_memory_arbiter.cc
@@ -33,10 +33,10 @@
SharedMemoryArbiter::SharedMemoryArbiter(void* start,
size_t size,
size_t page_size,
- OnPageCompleteCallback callback,
+ OnPagesCompleteCallback callback,
base::TaskRunner* task_runner)
: task_runner_(task_runner),
- on_page_complete_callback_(std::move(callback)),
+ on_pages_complete_callback_(std::move(callback)),
shmem_abi_(reinterpret_cast<uint8_t*>(start), size, page_size),
active_writer_ids_(SharedMemoryABI::kMaxWriterID + 1) {}
@@ -130,19 +130,19 @@
if (should_post_callback) {
// TODO what happens if the arbiter gets destroyed?
task_runner_->PostTask(
- std::bind(&SharedMemoryArbiter::InvokeOnPageCompleteCallback, this));
+ std::bind(&SharedMemoryArbiter::InvokeOnPagesCompleteCallback, this));
}
}
// This is always invoked on the |task_runner_| thread.
-void SharedMemoryArbiter::InvokeOnPageCompleteCallback() {
+void SharedMemoryArbiter::InvokeOnPagesCompleteCallback() {
std::vector<uint32_t> pages_to_notify;
{
std::lock_guard<std::mutex> scoped_lock(lock_);
pages_to_notify = std::move(pages_to_notify_);
pages_to_notify_.clear();
}
- on_page_complete_callback_(pages_to_notify);
+ on_pages_complete_callback_(pages_to_notify);
}
std::unique_ptr<TraceWriter> SharedMemoryArbiter::CreateTraceWriter(
diff --git a/src/tracing/core/shared_memory_arbiter.h b/src/tracing/core/shared_memory_arbiter.h
index 34267bf..050f2fc 100644
--- a/src/tracing/core/shared_memory_arbiter.h
+++ b/src/tracing/core/shared_memory_arbiter.h
@@ -43,20 +43,20 @@
// current thread-local chunk.
class SharedMemoryArbiter {
public:
- using OnPageCompleteCallback =
+ using OnPagesCompleteCallback =
std::function<void(const std::vector<uint32_t>& /*page_indexes*/)>;
// Args:
// |start|,|size|: boundaries of the shared memory buffer.
// |page_size|: a multiple of 4KB that defines the granularity of tracing
// pages. See tradeoff considerations in shared_memory_abi.h.
- // |OnPageCompleteCallback|: a callback that will be posted on the passed
+ // |OnPagesCompleteCallback|: a callback that will be posted on the passed
// |TaskRunner| when one or more pages are complete (and hence the Producer
// should send a NotifySharedMemoryUpdate() to the Service).
SharedMemoryArbiter(void* start,
size_t size,
size_t page_size,
- OnPageCompleteCallback,
+ OnPagesCompleteCallback,
base::TaskRunner*);
// Creates a new TraceWriter and assigns it a new WriterID. The WriterID is
@@ -93,10 +93,10 @@
// Called by the TraceWriter destructor.
void ReleaseWriterID(WriterID);
- void InvokeOnPageCompleteCallback();
+ void InvokeOnPagesCompleteCallback();
base::TaskRunner* const task_runner_;
- OnPageCompleteCallback on_page_complete_callback_;
+ OnPagesCompleteCallback on_pages_complete_callback_;
// --- Begin lock-protected members ---
std::mutex lock_;
diff --git a/src/tracing/core/shared_memory_arbiter_unittest.cc b/src/tracing/core/shared_memory_arbiter_unittest.cc
index d2b511b..9acd260 100644
--- a/src/tracing/core/shared_memory_arbiter_unittest.cc
+++ b/src/tracing/core/shared_memory_arbiter_unittest.cc
@@ -34,8 +34,8 @@
void SetUp() override {
AlignedBufferTest::SetUp();
auto callback = [this](const std::vector<uint32_t>& arg) {
- if (on_page_complete_)
- on_page_complete_(arg);
+ if (on_pages_complete_)
+ on_pages_complete_(arg);
};
task_runner_.reset(new base::TestTaskRunner());
arbiter_.reset(new SharedMemoryArbiter(buf(), buf_size(), page_size(),
@@ -49,7 +49,7 @@
std::unique_ptr<base::TestTaskRunner> task_runner_;
std::unique_ptr<SharedMemoryArbiter> arbiter_;
- std::function<void(const std::vector<uint32_t>&)> on_page_complete_;
+ std::function<void(const std::vector<uint32_t>&)> on_pages_complete_;
};
size_t const kPageSizes[] = {4096, 65536};
@@ -140,7 +140,7 @@
// check that the notification callback is posted.
auto on_callback = task_runner_->CreateCheckpoint("on_callback");
- on_page_complete_ =
+ on_pages_complete_ =
[on_callback](const std::vector<uint32_t>& completed_pages) {
ASSERT_EQ(2u, completed_pages.size());
ASSERT_EQ(0u, completed_pages[0]);
diff --git a/src/tracing/core/trace_packet.cc b/src/tracing/core/trace_packet.cc
index 8a3637c..8943984 100644
--- a/src/tracing/core/trace_packet.cc
+++ b/src/tracing/core/trace_packet.cc
@@ -42,6 +42,7 @@
void TracePacket::AddChunk(Chunk chunk) {
chunks_.push_back(chunk);
+ size_ += chunk.size;
}
} // namespace perfetto
diff --git a/src/tracing/core/trace_packet_unittest.cc b/src/tracing/core/trace_packet_unittest.cc
index 5f71a39..941290e 100644
--- a/src/tracing/core/trace_packet_unittest.cc
+++ b/src/tracing/core/trace_packet_unittest.cc
@@ -62,6 +62,7 @@
tp.AddChunk({ser_buf.data(), 3});
tp.AddChunk({ser_buf.data() + 3, 5});
tp.AddChunk({ser_buf.data() + 3 + 5, ser_buf.size() - 3 - 5});
+ ASSERT_EQ(ser_buf.size(), tp.size());
auto chunk = tp.begin();
ASSERT_NE(tp.end(), chunk);
diff --git a/src/tracing/core/trace_writer_impl_unittest.cc b/src/tracing/core/trace_writer_impl_unittest.cc
index 15b46a8..3b86acf 100644
--- a/src/tracing/core/trace_writer_impl_unittest.cc
+++ b/src/tracing/core/trace_writer_impl_unittest.cc
@@ -47,7 +47,7 @@
std::unique_ptr<base::TestTaskRunner> task_runner_;
std::unique_ptr<SharedMemoryArbiter> arbiter_;
- std::function<void(const std::vector<uint32_t>&)> on_page_complete_;
+ std::function<void(const std::vector<uint32_t>&)> on_pages_complete_;
};
size_t const kPageSizes[] = {4096, 65536};
diff --git a/src/tracing/ipc/producer/producer_ipc_client_impl.cc b/src/tracing/ipc/producer/producer_ipc_client_impl.cc
index 548c767..277a9b5 100644
--- a/src/tracing/ipc/producer/producer_ipc_client_impl.cc
+++ b/src/tracing/ipc/producer/producer_ipc_client_impl.cc
@@ -24,6 +24,8 @@
#include "perfetto/tracing/core/data_source_config.h"
#include "perfetto/tracing/core/data_source_descriptor.h"
#include "perfetto/tracing/core/producer.h"
+#include "perfetto/tracing/core/trace_writer.h"
+#include "src/tracing/core/shared_memory_arbiter.h"
#include "src/tracing/ipc/posix_shared_memory.h"
// TODO think to what happens when ProducerIPCClientImpl gets destroyed
@@ -49,12 +51,14 @@
ipc_channel_(ipc::Client::CreateInstance(service_sock_name, task_runner)),
producer_port_(this /* event_listener */) {
ipc_channel_->BindService(producer_port_.GetWeakPtr());
+ PERFETTO_DCHECK_THREAD(thread_checker_);
}
ProducerIPCClientImpl::~ProducerIPCClientImpl() = default;
// Called by the IPC layer if the BindService() succeeds.
void ProducerIPCClientImpl::OnConnect() {
+ PERFETTO_DCHECK_THREAD(thread_checker_);
connected_ = true;
// The IPC layer guarantees that any outstanding callback will be dropped on
@@ -78,12 +82,14 @@
}
void ProducerIPCClientImpl::OnDisconnect() {
+ PERFETTO_DCHECK_THREAD(thread_checker_);
PERFETTO_DLOG("Tracing service connection failure");
connected_ = false;
producer_->OnDisconnect();
}
void ProducerIPCClientImpl::OnConnectionInitialized(bool connection_succeeded) {
+ PERFETTO_DCHECK_THREAD(thread_checker_);
// If connection_succeeded == false, the OnDisconnect() call will follow next
// and there we'll notify the |producer_|. TODO: add a test for this.
if (!connection_succeeded)
@@ -91,19 +97,47 @@
base::ScopedFile shmem_fd = ipc_channel_->TakeReceivedFD();
PERFETTO_CHECK(shmem_fd);
+
+ // TODO(primiano): handle mmap failure in case of OOM.
shared_memory_ = PosixSharedMemory::AttachToFd(std::move(shmem_fd));
+
+ auto on_pages_complete = [this](const std::vector<uint32_t>& changed_pages) {
+ OnPagesComplete(changed_pages);
+ };
+ shared_memory_arbiter_.reset(
+ new SharedMemoryArbiter(shared_memory_->start(), shared_memory_->size(),
+ 4096 /* TODO where does this come from? */,
+ on_pages_complete, task_runner_));
+
producer_->OnConnect();
}
+// Called by SharedMemoryArbiter when some chunks are complete and we need to
+// notify the service about that.
+void ProducerIPCClientImpl::OnPagesComplete(
+ const std::vector<uint32_t>& changed_pages) {
+ PERFETTO_DCHECK_THREAD(thread_checker_);
+ if (!connected_) {
+ PERFETTO_DLOG("Cannot OnPagesComplete(), not connected to tracing service");
+ return;
+ }
+ NotifySharedMemoryUpdateRequest req;
+ for (uint32_t page_idx : changed_pages)
+ req.add_changed_pages(page_idx);
+
+ producer_port_.NotifySharedMemoryUpdate(
+ req, ipc::Deferred<NotifySharedMemoryUpdateResponse>());
+}
+
void ProducerIPCClientImpl::OnServiceRequest(
const GetAsyncCommandResponse& cmd) {
+ PERFETTO_DCHECK_THREAD(thread_checker_);
if (cmd.cmd_case() == GetAsyncCommandResponse::kStartDataSource) {
// Keep this in sync with chages in data_source_config.proto.
const auto& req = cmd.start_data_source();
const DataSourceInstanceID dsid = req.new_instance_id();
- const protos::DataSourceConfig& proto_cfg = req.config();
DataSourceConfig cfg;
- cfg.set_trace_category_filters(proto_cfg.trace_category_filters());
+ cfg.FromProto(req.config());
producer_->CreateDataSourceInstance(dsid, cfg);
return;
}
@@ -121,15 +155,14 @@
void ProducerIPCClientImpl::RegisterDataSource(
const DataSourceDescriptor& descriptor,
RegisterDataSourceCallback callback) {
+ PERFETTO_DCHECK_THREAD(thread_checker_);
if (!connected_) {
PERFETTO_DLOG(
"Cannot RegisterDataSource(), not connected to tracing service");
return task_runner_->PostTask(std::bind(callback, 0));
}
- // Keep this in sync with changes in data_source_descriptor.proto.
RegisterDataSourceRequest req;
- auto* proto_descriptor = req.mutable_data_source_descriptor();
- proto_descriptor->set_name(descriptor.name());
+ descriptor.ToProto(req.mutable_data_source_descriptor());
ipc::Deferred<RegisterDataSourceResponse> async_response;
// TODO: add a test that destroys the IPC channel soon after this call and
// checks that the callback(0) is invoked.
@@ -151,6 +184,7 @@
}
void ProducerIPCClientImpl::UnregisterDataSource(DataSourceID id) {
+ PERFETTO_DCHECK_THREAD(thread_checker_);
if (!connected_) {
PERFETTO_DLOG(
"Cannot UnregisterDataSource(), not connected to tracing service");
@@ -164,6 +198,7 @@
void ProducerIPCClientImpl::NotifySharedMemoryUpdate(
const std::vector<uint32_t>& changed_pages) {
+ PERFETTO_DCHECK_THREAD(thread_checker_);
if (!connected_) {
PERFETTO_DLOG(
"Cannot NotifySharedMemoryUpdate(), not connected to tracing service");
@@ -176,6 +211,13 @@
req, ipc::Deferred<NotifySharedMemoryUpdateResponse>());
}
+std::unique_ptr<TraceWriter> ProducerIPCClientImpl::CreateTraceWriter(
+ BufferID target_buffer) {
+ // This method can be called by different threads. |shared_memory_arbiter_| is
+ // thread-safe but be aware of accessing any other state in this function.
+ return shared_memory_arbiter_->CreateTraceWriter(target_buffer);
+}
+
SharedMemory* ProducerIPCClientImpl::shared_memory() const {
return shared_memory_.get();
}
diff --git a/src/tracing/ipc/producer/producer_ipc_client_impl.h b/src/tracing/ipc/producer/producer_ipc_client_impl.h
index 020032c..745b786 100644
--- a/src/tracing/ipc/producer/producer_ipc_client_impl.h
+++ b/src/tracing/ipc/producer/producer_ipc_client_impl.h
@@ -21,6 +21,7 @@
#include <vector>
+#include "perfetto/base/thread_checker.h"
#include "perfetto/ipc/service_proxy.h"
#include "perfetto/tracing/core/basic_types.h"
#include "perfetto/tracing/core/service.h"
@@ -40,6 +41,7 @@
class Producer;
class PosixSharedMemory;
+class SharedMemoryArbiter;
// Exposes a Service endpoint to Producer(s), proxying all requests through a
// IPC channel to the remote Service. This class is the glue layer between the
@@ -61,6 +63,8 @@
void UnregisterDataSource(DataSourceID) override;
void NotifySharedMemoryUpdate(
const std::vector<uint32_t>& changed_pages) override;
+ std::unique_ptr<TraceWriter> CreateTraceWriter(
+ BufferID target_buffer) override;
SharedMemory* shared_memory() const override;
// ipc::ServiceProxy::EventListener implementation.
@@ -77,6 +81,9 @@
// (e.g. start/stop a data source).
void OnServiceRequest(const GetAsyncCommandResponse&);
+ // Callback passed to SharedMemoryArbiter.
+ void OnPagesComplete(const std::vector<uint32_t>&);
+
// TODO think to destruction order, do we rely on any specific dtor sequence?
Producer* const producer_;
base::TaskRunner* const task_runner_;
@@ -89,7 +96,9 @@
ProducerPortProxy producer_port_;
std::unique_ptr<PosixSharedMemory> shared_memory_;
+ std::unique_ptr<SharedMemoryArbiter> shared_memory_arbiter_;
bool connected_ = false;
+ PERFETTO_THREAD_CHECKER(thread_checker_)
};
} // namespace perfetto
diff --git a/src/tracing/ipc/service/consumer_ipc_service.cc b/src/tracing/ipc/service/consumer_ipc_service.cc
new file mode 100644
index 0000000..5371e61
--- /dev/null
+++ b/src/tracing/ipc/service/consumer_ipc_service.cc
@@ -0,0 +1,124 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/tracing/ipc/service/consumer_ipc_service.h"
+
+#include <inttypes.h>
+
+#include "perfetto/base/logging.h"
+#include "perfetto/base/task_runner.h"
+#include "perfetto/ipc/host.h"
+#include "perfetto/tracing/core/chunk.h"
+#include "perfetto/tracing/core/service.h"
+#include "perfetto/tracing/core/trace_config.h"
+#include "perfetto/tracing/core/trace_packet.h"
+
+namespace perfetto {
+
+ConsumerIPCService::ConsumerIPCService(Service* core_service)
+ : core_service_(core_service), weak_ptr_factory_(this) {}
+
+ConsumerIPCService::~ConsumerIPCService() = default;
+
+ConsumerIPCService::RemoteConsumer*
+ConsumerIPCService::GetConsumerForCurrentRequest() {
+ const ipc::ClientID ipc_client_id = ipc::Service::client_info().client_id();
+ PERFETTO_CHECK(ipc_client_id);
+ auto it = consumers_.find(ipc_client_id);
+ if (it == consumers_.end()) {
+ auto* remote_consumer = new RemoteConsumer();
+ consumers_[ipc_client_id].reset(remote_consumer);
+ remote_consumer->service_endpoint =
+ core_service_->ConnectConsumer(remote_consumer);
+ return remote_consumer;
+ }
+ return it->second.get();
+}
+
+// Called by the IPC layer.
+void ConsumerIPCService::OnClientDisconnected() {
+ ipc::ClientID client_id = ipc::Service::client_info().client_id();
+ PERFETTO_DLOG("Consumer %" PRIu64 " disconnected", client_id);
+ consumers_.erase(client_id);
+}
+
+// Called by the IPC layer.
+void ConsumerIPCService::EnableTracing(const EnableTracingRequest& req,
+ DeferredEnableTracingResponse resp) {
+ TraceConfig trace_config;
+ trace_config.FromProto(req.trace_config());
+ GetConsumerForCurrentRequest()->service_endpoint->EnableTracing(trace_config);
+ resp.Resolve(ipc::AsyncResult<EnableTracingResponse>::Create());
+}
+
+// Called by the IPC layer.
+void ConsumerIPCService::DisableTracing(const DisableTracingRequest& req,
+ DeferredDisableTracingResponse resp) {
+ GetConsumerForCurrentRequest()->service_endpoint->DisableTracing();
+ resp.Resolve(ipc::AsyncResult<DisableTracingResponse>::Create());
+}
+
+// Called by the IPC layer.
+void ConsumerIPCService::ReadBuffers(const ReadBuffersRequest& req,
+ DeferredReadBuffersResponse resp) {
+ RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
+ remote_consumer->read_buffers_response = std::move(resp);
+ remote_consumer->service_endpoint->ReadBuffers();
+}
+
+// Called by the IPC layer.
+void ConsumerIPCService::FreeBuffers(const FreeBuffersRequest& req,
+ DeferredFreeBuffersResponse resp) {
+ GetConsumerForCurrentRequest()->service_endpoint->FreeBuffers();
+ resp.Resolve(ipc::AsyncResult<FreeBuffersResponse>::Create());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// RemoteConsumer methods
+////////////////////////////////////////////////////////////////////////////////
+
+ConsumerIPCService::RemoteConsumer::RemoteConsumer() = default;
+ConsumerIPCService::RemoteConsumer::~RemoteConsumer() = default;
+
+// Invoked by the |core_service_| business logic after the ConnectConsumer()
+// call. There is nothing to do here, we really expected the ConnectConsumer()
+// to just work in the local case.
+void ConsumerIPCService::RemoteConsumer::OnConnect() {}
+
+// Invoked by the |core_service_| business logic after we destroy the
+// |service_endpoint| (in the RemoteConsumer dtor).
+void ConsumerIPCService::RemoteConsumer::OnDisconnect() {}
+
+void ConsumerIPCService::RemoteConsumer::OnTraceData(
+ const std::vector<TracePacket>& trace_packets,
+ bool has_more) {
+ if (!read_buffers_response.IsBound())
+ return;
+
+ auto result = ipc::AsyncResult<ReadBuffersResponse>::Create();
+ result.set_has_more(has_more);
+ // TODO(primiano): Expose the chunks to the Consumer rather than stitching
+ // them and wasting cpu time to hide this detail.
+ for (const TracePacket& trace_packet : trace_packets) {
+ std::string* dst = result->add_trace_packets();
+ dst->reserve(trace_packet.size());
+ for (const Chunk& chunk : trace_packet)
+ dst->append(reinterpret_cast<const char*>(chunk.start), chunk.size);
+ }
+ read_buffers_response.Resolve(std::move(result));
+}
+
+} // namespace perfetto
diff --git a/src/tracing/ipc/service/consumer_ipc_service.h b/src/tracing/ipc/service/consumer_ipc_service.h
new file mode 100644
index 0000000..ad6e203
--- /dev/null
+++ b/src/tracing/ipc/service/consumer_ipc_service.h
@@ -0,0 +1,100 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACING_IPC_SERVICE_CONSUMER_IPC_SERVICE_H_
+#define SRC_TRACING_IPC_SERVICE_CONSUMER_IPC_SERVICE_H_
+
+#include <map>
+#include <memory>
+#include <string>
+
+#include "perfetto/base/weak_ptr.h"
+#include "perfetto/ipc/basic_types.h"
+#include "perfetto/tracing/core/consumer.h"
+#include "perfetto/tracing/core/service.h"
+
+#include "protos/tracing_service/consumer_port.ipc.h"
+
+namespace perfetto {
+
+namespace ipc {
+class Host;
+} // namespace ipc
+
+// Implements the Consumer port of the IPC service. This class proxies requests
+// and responses between the core service logic (|svc_|) and remote Consumer(s)
+// on the IPC socket, through the methods overriddden from ConsumerPort.
+class ConsumerIPCService : public ConsumerPort /* from consumer_port.proto */ {
+ public:
+ using Service = ::perfetto::Service; // To avoid collisions w/ ipc::Service.
+ explicit ConsumerIPCService(Service* core_service);
+ ~ConsumerIPCService() override;
+
+ // ConsumerPort implementation (from .proto IPC definition).
+ void EnableTracing(const EnableTracingRequest&,
+ DeferredEnableTracingResponse) override;
+ void DisableTracing(const DisableTracingRequest&,
+ DeferredDisableTracingResponse) override;
+ void ReadBuffers(const ReadBuffersRequest&,
+ DeferredReadBuffersResponse) override;
+ void FreeBuffers(const FreeBuffersRequest&,
+ DeferredFreeBuffersResponse) override;
+ void OnClientDisconnected() override;
+
+ private:
+ // Acts like a Consumer with the core Service business logic (which doesn't
+ // know anything about the remote transport), but all it does is proxying
+ // methods to the remote Consumer on the other side of the IPC channel.
+ class RemoteConsumer : public Consumer {
+ public:
+ RemoteConsumer();
+ ~RemoteConsumer() override;
+
+ // These methods are called by the |core_service_| business logic. There is
+ // no connection here, these methods are posted straight away.
+ void OnConnect() override;
+ void OnDisconnect() override;
+ void OnTraceData(const std::vector<TracePacket>&, bool has_more) override;
+
+ // The interface obtained from the core service business logic through
+ // Service::ConnectConsumer(this). This allows to invoke methods for a
+ // specific Consumer on the Service business logic.
+ std::unique_ptr<Service::ConsumerEndpoint> service_endpoint;
+
+ // After DisableTracing() is invoked, this binds the async callback that
+ // allows to stream trace packets back to the client.
+ DeferredReadBuffersResponse read_buffers_response;
+ };
+
+ ConsumerIPCService(const ConsumerIPCService&) = delete;
+ ConsumerIPCService& operator=(const ConsumerIPCService&) = delete;
+
+ // Returns the ConsumerEndpoint in the core business logic that corresponds to
+ // the current IPC request.
+ RemoteConsumer* GetConsumerForCurrentRequest();
+
+ Service* const core_service_;
+
+ // Maps IPC clients to ConsumerEndpoint instances registered on the
+ // |core_service_| business logic.
+ std::map<ipc::ClientID, std::unique_ptr<RemoteConsumer>> consumers_;
+
+ base::WeakPtrFactory<ConsumerIPCService> weak_ptr_factory_;
+};
+
+} // namespace perfetto
+
+#endif // SRC_TRACING_IPC_SERVICE_CONSUMER_IPC_SERVICE_H_
diff --git a/src/tracing/ipc/service/producer_ipc_service.cc b/src/tracing/ipc/service/producer_ipc_service.cc
index 46bd166..ce9ad1a 100644
--- a/src/tracing/ipc/service/producer_ipc_service.cc
+++ b/src/tracing/ipc/service/producer_ipc_service.cc
@@ -231,11 +231,7 @@
auto cmd = ipc::AsyncResult<GetAsyncCommandResponse>::Create();
cmd.set_has_more(true);
cmd->mutable_start_data_source()->set_new_instance_id(dsid);
-
- // Keep this in sync with data_source_config.proto.
- cmd->mutable_start_data_source()
- ->mutable_config()
- ->set_trace_category_filters(cfg.trace_category_filters());
+ cfg.ToProto(cmd->mutable_start_data_source()->mutable_config());
async_producer_commands.Resolve(std::move(cmd));
}
diff --git a/src/tracing/ipc/service/service_ipc_host_impl.cc b/src/tracing/ipc/service/service_ipc_host_impl.cc
index 218bb4d..443a516 100644
--- a/src/tracing/ipc/service/service_ipc_host_impl.cc
+++ b/src/tracing/ipc/service/service_ipc_host_impl.cc
@@ -21,6 +21,7 @@
#include "perfetto/ipc/host.h"
#include "perfetto/tracing/core/service.h"
#include "src/tracing/ipc/posix_shared_memory.h"
+#include "src/tracing/ipc/service/consumer_ipc_service.h"
#include "src/tracing/ipc/service/producer_ipc_service.h"
namespace perfetto {
@@ -39,7 +40,8 @@
ServiceIPCHostImpl::~ServiceIPCHostImpl() {}
-bool ServiceIPCHostImpl::Start(const char* producer_socket_name) {
+bool ServiceIPCHostImpl::Start(const char* producer_socket_name,
+ const char* consumer_socket_name) {
PERFETTO_CHECK(!svc_); // Check if already started.
// Create and initialize the platform-independent tracing business logic.
@@ -50,14 +52,28 @@
// Initialize the IPC transport.
producer_ipc_port_ =
ipc::Host::CreateInstance(producer_socket_name, task_runner_);
- if (!producer_ipc_port_)
+ if (!producer_ipc_port_) {
+ Shutdown();
return false;
+ }
+
+ consumer_ipc_port_ =
+ ipc::Host::CreateInstance(consumer_socket_name, task_runner_);
+ if (!producer_ipc_port_) {
+ Shutdown();
+ return false;
+ }
// TODO: add a test that destroyes the ServiceIPCHostImpl soon after Start()
// and checks that no spurious callbacks are issued.
bool producer_service_exposed = producer_ipc_port_->ExposeService(
std::unique_ptr<ipc::Service>(new ProducerIPCService(svc_.get())));
PERFETTO_CHECK(producer_service_exposed);
+
+ bool consumer_service_exposed = consumer_ipc_port_->ExposeService(
+ std::unique_ptr<ipc::Service>(new ConsumerIPCService(svc_.get())));
+ PERFETTO_CHECK(consumer_service_exposed);
+
return true;
}
@@ -65,6 +81,14 @@
return svc_.get();
}
+void ServiceIPCHostImpl::Shutdown() {
+ // TODO(primiano): add a test that causes the Shutdown() and checks that no
+ // spurious callbacks are issued.
+ producer_ipc_port_.reset();
+ consumer_ipc_port_.reset();
+ svc_.reset();
+}
+
// Definitions for the base class ctor/dtor.
ServiceIPCHost::ServiceIPCHost() = default;
ServiceIPCHost::~ServiceIPCHost() = default;
diff --git a/src/tracing/ipc/service/service_ipc_host_impl.h b/src/tracing/ipc/service/service_ipc_host_impl.h
index 6dfc551..d584c28 100644
--- a/src/tracing/ipc/service/service_ipc_host_impl.h
+++ b/src/tracing/ipc/service/service_ipc_host_impl.h
@@ -37,17 +37,23 @@
~ServiceIPCHostImpl() override;
// ServiceIPCHost implementation.
- bool Start(const char* producer_socket_name) override;
+ bool Start(const char* producer_socket_name,
+ const char* consumer_socket_name) override;
Service* service_for_testing() const;
private:
+ void Shutdown();
+
base::TaskRunner* const task_runner_;
std::unique_ptr<Service> svc_; // The service business logic.
// The IPC host that listens on the Producer socket. It owns the
// PosixServiceProducerPort instance which deals with all producers' IPC(s).
std::unique_ptr<ipc::Host> producer_ipc_port_;
+
+ // As above, but for the Consumer port.
+ std::unique_ptr<ipc::Host> consumer_ipc_port_;
};
} // namespace perfetto