| /* |
| * Copyright (C) 2017 The Android Open Source Project |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "src/tracing/ipc/producer/producer_ipc_client_impl.h" |
| |
| #include <cinttypes> |
| |
| #include <string.h> |
| |
| #include "perfetto/base/logging.h" |
| #include "perfetto/base/task_runner.h" |
| #include "perfetto/ext/base/unix_socket.h" |
| #include "perfetto/ext/base/version.h" |
| #include "perfetto/ext/ipc/client.h" |
| #include "perfetto/ext/tracing/core/commit_data_request.h" |
| #include "perfetto/ext/tracing/core/producer.h" |
| #include "perfetto/ext/tracing/core/shared_memory_abi.h" |
| #include "perfetto/ext/tracing/core/shared_memory_arbiter.h" |
| #include "perfetto/ext/tracing/core/trace_writer.h" |
| #include "perfetto/tracing/core/data_source_config.h" |
| #include "perfetto/tracing/core/data_source_descriptor.h" |
| #include "perfetto/tracing/core/trace_config.h" |
| #include "src/tracing/core/in_process_shared_memory.h" |
| |
| #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) |
| #include "src/tracing/ipc/shared_memory_windows.h" |
| #else |
| #include "src/tracing/ipc/posix_shared_memory.h" |
| #endif |
| |
// TODO(fmayer): think about what happens when ProducerIPCClientImpl gets
// destroyed w.r.t. the Producer pointer. Also think about the lifetime of the
// Producer* during the callbacks.
| |
| namespace perfetto { |
| |
| // static. (Declared in include/tracing/ipc/producer_ipc_client.h). |
| std::unique_ptr<TracingService::ProducerEndpoint> ProducerIPCClient::Connect( |
| const char* service_sock_name, |
| Producer* producer, |
| const std::string& producer_name, |
| base::TaskRunner* task_runner, |
| TracingService::ProducerSMBScrapingMode smb_scraping_mode, |
| size_t shared_memory_size_hint_bytes, |
| size_t shared_memory_page_size_hint_bytes, |
| std::unique_ptr<SharedMemory> shm, |
| std::unique_ptr<SharedMemoryArbiter> shm_arbiter, |
| ConnectionFlags conn_flags) { |
| return std::unique_ptr<TracingService::ProducerEndpoint>( |
| new ProducerIPCClientImpl( |
| {service_sock_name, |
| conn_flags == |
| ProducerIPCClient::ConnectionFlags::kRetryIfUnreachable}, |
| producer, producer_name, task_runner, smb_scraping_mode, |
| shared_memory_size_hint_bytes, shared_memory_page_size_hint_bytes, |
| std::move(shm), std::move(shm_arbiter), nullptr)); |
| } |
| |
| // static. (Declared in include/tracing/ipc/producer_ipc_client.h). |
| std::unique_ptr<TracingService::ProducerEndpoint> ProducerIPCClient::Connect( |
| ipc::Client::ConnArgs conn_args, |
| Producer* producer, |
| const std::string& producer_name, |
| base::TaskRunner* task_runner, |
| TracingService::ProducerSMBScrapingMode smb_scraping_mode, |
| size_t shared_memory_size_hint_bytes, |
| size_t shared_memory_page_size_hint_bytes, |
| std::unique_ptr<SharedMemory> shm, |
| std::unique_ptr<SharedMemoryArbiter> shm_arbiter, |
| CreateSocketAsync create_socket_async) { |
| return std::unique_ptr<TracingService::ProducerEndpoint>( |
| new ProducerIPCClientImpl( |
| std::move(conn_args), producer, producer_name, task_runner, |
| smb_scraping_mode, shared_memory_size_hint_bytes, |
| shared_memory_page_size_hint_bytes, std::move(shm), |
| std::move(shm_arbiter), create_socket_async)); |
| } |
| |
| ProducerIPCClientImpl::ProducerIPCClientImpl( |
| ipc::Client::ConnArgs conn_args, |
| Producer* producer, |
| const std::string& producer_name, |
| base::TaskRunner* task_runner, |
| TracingService::ProducerSMBScrapingMode smb_scraping_mode, |
| size_t shared_memory_size_hint_bytes, |
| size_t shared_memory_page_size_hint_bytes, |
| std::unique_ptr<SharedMemory> shm, |
| std::unique_ptr<SharedMemoryArbiter> shm_arbiter, |
| CreateSocketAsync create_socket_async) |
| : producer_(producer), |
| task_runner_(task_runner), |
| receive_shmem_fd_cb_fuchsia_( |
| std::move(conn_args.receive_shmem_fd_cb_fuchsia)), |
| producer_port_( |
| new protos::gen::ProducerPortProxy(this /* event_listener */)), |
| shared_memory_(std::move(shm)), |
| shared_memory_arbiter_(std::move(shm_arbiter)), |
| name_(producer_name), |
| shared_memory_page_size_hint_bytes_(shared_memory_page_size_hint_bytes), |
| shared_memory_size_hint_bytes_(shared_memory_size_hint_bytes), |
| smb_scraping_mode_(smb_scraping_mode) { |
| // Check for producer-provided SMB (used by Chrome for startup tracing). |
| if (shared_memory_) { |
| // We also expect a valid (unbound) arbiter. Bind it to this endpoint now. |
| PERFETTO_CHECK(shared_memory_arbiter_); |
| shared_memory_arbiter_->BindToProducerEndpoint(this, task_runner_); |
| |
| // If the service accepts our SMB, then it must match our requested page |
| // layout. The protocol doesn't allow the service to change the size and |
| // layout when the SMB is provided by the producer. |
| shared_buffer_page_size_kb_ = shared_memory_page_size_hint_bytes_ / 1024; |
| } |
| |
| if (create_socket_async) { |
| PERFETTO_DCHECK(conn_args.socket_name); |
| auto weak_this = weak_factory_.GetWeakPtr(); |
| create_socket_async( |
| [weak_this, task_runner = task_runner_](base::SocketHandle fd) { |
| task_runner->PostTask([weak_this, fd] { |
| base::ScopedSocketHandle handle(fd); |
| if (!weak_this) { |
| return; |
| } |
| ipc::Client::ConnArgs args(std::move(handle)); |
| weak_this->ipc_channel_ = ipc::Client::CreateInstance( |
| std::move(args), weak_this->task_runner_); |
| weak_this->ipc_channel_->BindService( |
| weak_this->producer_port_->GetWeakPtr()); |
| }); |
| }); |
| } else { |
| ipc_channel_ = |
| ipc::Client::CreateInstance(std::move(conn_args), task_runner); |
| ipc_channel_->BindService(producer_port_->GetWeakPtr()); |
| } |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| } |
| |
| ProducerIPCClientImpl::~ProducerIPCClientImpl() { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| } |
| |
| void ProducerIPCClientImpl::Disconnect() { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| if (!producer_port_) |
| return; |
| // Reset the producer port so that no further IPCs are received and IPC |
| // callbacks are no longer executed. Also reset the IPC channel so that the |
| // service is notified of the disconnection. |
| producer_port_.reset(); |
| ipc_channel_.reset(); |
| // Perform disconnect synchronously. |
| OnDisconnect(); |
| } |
| |
| // Called by the IPC layer if the BindService() succeeds. |
| void ProducerIPCClientImpl::OnConnect() { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| connected_ = true; |
| |
| // The IPC layer guarantees that any outstanding callback will be dropped on |
| // the floor if producer_port_ is destroyed between the request and the reply. |
| // Binding |this| is hence safe. |
| ipc::Deferred<protos::gen::InitializeConnectionResponse> on_init; |
| on_init.Bind( |
| [this](ipc::AsyncResult<protos::gen::InitializeConnectionResponse> resp) { |
| OnConnectionInitialized( |
| resp.success(), |
| resp.success() ? resp->using_shmem_provided_by_producer() : false, |
| resp.success() ? resp->direct_smb_patching_supported() : false, |
| resp.success() ? resp->use_shmem_emulation() : false); |
| }); |
| protos::gen::InitializeConnectionRequest req; |
| req.set_producer_name(name_); |
| req.set_shared_memory_size_hint_bytes( |
| static_cast<uint32_t>(shared_memory_size_hint_bytes_)); |
| req.set_shared_memory_page_size_hint_bytes( |
| static_cast<uint32_t>(shared_memory_page_size_hint_bytes_)); |
| switch (smb_scraping_mode_) { |
| case TracingService::ProducerSMBScrapingMode::kDefault: |
| // No need to set the mode, it defaults to use the service default if |
| // unspecified. |
| break; |
| case TracingService::ProducerSMBScrapingMode::kEnabled: |
| req.set_smb_scraping_mode( |
| protos::gen::InitializeConnectionRequest::SMB_SCRAPING_ENABLED); |
| break; |
| case TracingService::ProducerSMBScrapingMode::kDisabled: |
| req.set_smb_scraping_mode( |
| protos::gen::InitializeConnectionRequest::SMB_SCRAPING_DISABLED); |
| break; |
| } |
| |
| int shm_fd = -1; |
| if (shared_memory_) { |
| req.set_producer_provided_shmem(true); |
| #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) |
| auto key = static_cast<SharedMemoryWindows*>(shared_memory_.get())->key(); |
| req.set_shm_key_windows(key); |
| #else |
| shm_fd = static_cast<PosixSharedMemory*>(shared_memory_.get())->fd(); |
| #endif |
| } |
| |
| req.set_sdk_version(base::GetVersionString()); |
| producer_port_->InitializeConnection(req, std::move(on_init), shm_fd); |
| |
| // Create the back channel to receive commands from the Service. |
| ipc::Deferred<protos::gen::GetAsyncCommandResponse> on_cmd; |
| on_cmd.Bind( |
| [this](ipc::AsyncResult<protos::gen::GetAsyncCommandResponse> resp) { |
| if (!resp) |
| return; // The IPC channel was closed and |resp| was auto-rejected. |
| OnServiceRequest(*resp); |
| }); |
| producer_port_->GetAsyncCommand(protos::gen::GetAsyncCommandRequest(), |
| std::move(on_cmd)); |
| |
| // If there are pending Sync() requests, send them now. |
| for (auto& pending_sync : pending_sync_reqs_) |
| Sync(std::move(pending_sync)); |
| pending_sync_reqs_.clear(); |
| } |
| |
| void ProducerIPCClientImpl::OnDisconnect() { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| PERFETTO_DLOG("Tracing service connection failure"); |
| connected_ = false; |
| data_sources_setup_.clear(); |
| producer_->OnDisconnect(); // Note: may delete |this|. |
| } |
| |
| void ProducerIPCClientImpl::ScheduleDisconnect() { |
| // |ipc_channel| doesn't allow disconnection in the middle of handling |
| // an IPC call, so the connection drop must take place over two phases. |
| |
| // First, synchronously drop the |producer_port_| so that no more IPC |
| // messages are handled. |
| producer_port_.reset(); |
| |
| // Then schedule an async task for performing the remainder of the |
| // disconnection operations outside the context of the IPC method handler. |
| auto weak_this = weak_factory_.GetWeakPtr(); |
| task_runner_->PostTask([weak_this]() { |
| if (weak_this) { |
| weak_this->Disconnect(); |
| } |
| }); |
| } |
| |
| void ProducerIPCClientImpl::OnConnectionInitialized( |
| bool connection_succeeded, |
| bool using_shmem_provided_by_producer, |
| bool direct_smb_patching_supported, |
| bool use_shmem_emulation) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| // If connection_succeeded == false, the OnDisconnect() call will follow next |
| // and there we'll notify the |producer_|. TODO: add a test for this. |
| if (!connection_succeeded) |
| return; |
| is_shmem_provided_by_producer_ = using_shmem_provided_by_producer; |
| direct_smb_patching_supported_ = direct_smb_patching_supported; |
| // The tracing service may reject using shared memory and tell the client to |
| // commit data over the socket. This can happen when the client connects to |
| // the service via a relay service: |
| // client <-Unix socket-> relay service <- vsock -> tracing service. |
| use_shmem_emulation_ = use_shmem_emulation; |
| producer_->OnConnect(); |
| |
| // Bail out if the service failed to adopt our producer-allocated SMB. |
| // TODO(eseckler): Handle adoption failure more gracefully. |
| if (shared_memory_ && !is_shmem_provided_by_producer_) { |
| PERFETTO_DLOG("Service failed adopt producer-provided SMB, disconnecting."); |
| Disconnect(); |
| return; |
| } |
| } |
| |
| void ProducerIPCClientImpl::OnServiceRequest( |
| const protos::gen::GetAsyncCommandResponse& cmd) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| |
| // This message is sent only when connecting to a service running Android Q+. |
| // See comment below in kStartDataSource. |
| if (cmd.has_setup_data_source()) { |
| const auto& req = cmd.setup_data_source(); |
| const DataSourceInstanceID dsid = req.new_instance_id(); |
| data_sources_setup_.insert(dsid); |
| producer_->SetupDataSource(dsid, req.config()); |
| return; |
| } |
| |
| if (cmd.has_start_data_source()) { |
| const auto& req = cmd.start_data_source(); |
| const DataSourceInstanceID dsid = req.new_instance_id(); |
| const DataSourceConfig& cfg = req.config(); |
| if (!data_sources_setup_.count(dsid)) { |
| // When connecting with an older (Android P) service, the service will not |
| // send a SetupDataSource message. We synthesize it here in that case. |
| producer_->SetupDataSource(dsid, cfg); |
| } |
| producer_->StartDataSource(dsid, cfg); |
| return; |
| } |
| |
| if (cmd.has_stop_data_source()) { |
| const DataSourceInstanceID dsid = cmd.stop_data_source().instance_id(); |
| producer_->StopDataSource(dsid); |
| data_sources_setup_.erase(dsid); |
| return; |
| } |
| |
| if (cmd.has_setup_tracing()) { |
| std::unique_ptr<SharedMemory> ipc_shared_memory; |
| #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) |
| const std::string& shm_key = cmd.setup_tracing().shm_key_windows(); |
| if (!shm_key.empty()) |
| ipc_shared_memory = SharedMemoryWindows::Attach(shm_key); |
| #elif PERFETTO_BUILDFLAG(PERFETTO_OS_FUCHSIA) |
| // On Fuchsia, the embedder is responsible for routing the shared memory |
| // FD, which is provided to this code via a blocking callback. |
| PERFETTO_CHECK(receive_shmem_fd_cb_fuchsia_); |
| |
| base::ScopedFile shmem_fd(receive_shmem_fd_cb_fuchsia_()); |
| if (!shmem_fd) { |
| // Failure to get a shared memory buffer is a protocol violation and |
| // therefore we should drop the Protocol connection. |
| PERFETTO_ELOG("Could not get shared memory FD from embedder."); |
| ScheduleDisconnect(); |
| return; |
| } |
| |
| ipc_shared_memory = |
| PosixSharedMemory::AttachToFd(std::move(shmem_fd), |
| /*require_seals_if_supported=*/false); |
| #else |
| base::ScopedFile shmem_fd = ipc_channel_->TakeReceivedFD(); |
| if (shmem_fd) { |
| // TODO(primiano): handle mmap failure in case of OOM. |
| ipc_shared_memory = |
| PosixSharedMemory::AttachToFd(std::move(shmem_fd), |
| /*require_seals_if_supported=*/false); |
| } |
| #endif |
| if (use_shmem_emulation_) { |
| PERFETTO_CHECK(!ipc_shared_memory); |
| // Need to create an emulated shmem buffer when the transport deosn't |
| // support it. |
| // TODO(chinglinyu): Let the tracing service decide on the shmem size and |
| // propagate the size in InitializeConnectionResponse. |
| ipc_shared_memory = InProcessSharedMemory::Create( |
| /*size=*/InProcessSharedMemory::kDefaultSize); |
| } |
| if (ipc_shared_memory) { |
| auto shmem_mode = use_shmem_emulation_ |
| ? SharedMemoryABI::ShmemMode::kShmemEmulation |
| : SharedMemoryABI::ShmemMode::kDefault; |
| // This is the nominal case used in most configurations, where the service |
| // provides the SMB. |
| PERFETTO_CHECK(!is_shmem_provided_by_producer_ && !shared_memory_); |
| shared_memory_ = std::move(ipc_shared_memory); |
| shared_buffer_page_size_kb_ = |
| cmd.setup_tracing().shared_buffer_page_size_kb(); |
| shared_memory_arbiter_ = SharedMemoryArbiter::CreateInstance( |
| shared_memory_.get(), shared_buffer_page_size_kb_ * 1024, shmem_mode, |
| this, task_runner_); |
| if (direct_smb_patching_supported_) |
| shared_memory_arbiter_->SetDirectSMBPatchingSupportedByService(); |
| } else { |
| // Producer-provided SMB (used by Chrome for startup tracing). |
| PERFETTO_CHECK(is_shmem_provided_by_producer_ && shared_memory_ && |
| shared_memory_arbiter_); |
| } |
| producer_->OnTracingSetup(); |
| return; |
| } |
| |
| if (cmd.has_flush()) { |
| // This cast boilerplate is required only because protobuf uses its own |
| // uint64 and not stdint's uint64_t. On some 64 bit archs they differ on the |
| // type (long vs long long) even though they have the same size. |
| const auto* data_source_ids = cmd.flush().data_source_ids().data(); |
| static_assert(sizeof(data_source_ids[0]) == sizeof(DataSourceInstanceID), |
| "data_source_ids should be 64-bit"); |
| |
| FlushFlags flags(cmd.flush().flags()); |
| producer_->Flush( |
| cmd.flush().request_id(), |
| reinterpret_cast<const DataSourceInstanceID*>(data_source_ids), |
| static_cast<size_t>(cmd.flush().data_source_ids().size()), flags); |
| return; |
| } |
| |
| if (cmd.has_clear_incremental_state()) { |
| const auto* data_source_ids = |
| cmd.clear_incremental_state().data_source_ids().data(); |
| static_assert(sizeof(data_source_ids[0]) == sizeof(DataSourceInstanceID), |
| "data_source_ids should be 64-bit"); |
| producer_->ClearIncrementalState( |
| reinterpret_cast<const DataSourceInstanceID*>(data_source_ids), |
| static_cast<size_t>( |
| cmd.clear_incremental_state().data_source_ids().size())); |
| return; |
| } |
| |
| PERFETTO_DFATAL("Unknown async request received from tracing service"); |
| } |
| |
| void ProducerIPCClientImpl::RegisterDataSource( |
| const DataSourceDescriptor& descriptor) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| if (!connected_) { |
| PERFETTO_DLOG( |
| "Cannot RegisterDataSource(), not connected to tracing service"); |
| } |
| protos::gen::RegisterDataSourceRequest req; |
| *req.mutable_data_source_descriptor() = descriptor; |
| ipc::Deferred<protos::gen::RegisterDataSourceResponse> async_response; |
| async_response.Bind( |
| [](ipc::AsyncResult<protos::gen::RegisterDataSourceResponse> response) { |
| if (!response) |
| PERFETTO_DLOG("RegisterDataSource() failed: connection reset"); |
| }); |
| producer_port_->RegisterDataSource(req, std::move(async_response)); |
| } |
| |
| void ProducerIPCClientImpl::UpdateDataSource( |
| const DataSourceDescriptor& descriptor) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| if (!connected_) { |
| PERFETTO_DLOG( |
| "Cannot UpdateDataSource(), not connected to tracing service"); |
| } |
| protos::gen::UpdateDataSourceRequest req; |
| *req.mutable_data_source_descriptor() = descriptor; |
| ipc::Deferred<protos::gen::UpdateDataSourceResponse> async_response; |
| async_response.Bind( |
| [](ipc::AsyncResult<protos::gen::UpdateDataSourceResponse> response) { |
| if (!response) |
| PERFETTO_DLOG("UpdateDataSource() failed: connection reset"); |
| }); |
| producer_port_->UpdateDataSource(req, std::move(async_response)); |
| } |
| |
| void ProducerIPCClientImpl::UnregisterDataSource(const std::string& name) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| if (!connected_) { |
| PERFETTO_DLOG( |
| "Cannot UnregisterDataSource(), not connected to tracing service"); |
| return; |
| } |
| protos::gen::UnregisterDataSourceRequest req; |
| req.set_data_source_name(name); |
| producer_port_->UnregisterDataSource( |
| req, ipc::Deferred<protos::gen::UnregisterDataSourceResponse>()); |
| } |
| |
| void ProducerIPCClientImpl::RegisterTraceWriter(uint32_t writer_id, |
| uint32_t target_buffer) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| if (!connected_) { |
| PERFETTO_DLOG( |
| "Cannot RegisterTraceWriter(), not connected to tracing service"); |
| return; |
| } |
| protos::gen::RegisterTraceWriterRequest req; |
| req.set_trace_writer_id(writer_id); |
| req.set_target_buffer(target_buffer); |
| producer_port_->RegisterTraceWriter( |
| req, ipc::Deferred<protos::gen::RegisterTraceWriterResponse>()); |
| } |
| |
| void ProducerIPCClientImpl::UnregisterTraceWriter(uint32_t writer_id) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| if (!connected_) { |
| PERFETTO_DLOG( |
| "Cannot UnregisterTraceWriter(), not connected to tracing service"); |
| return; |
| } |
| protos::gen::UnregisterTraceWriterRequest req; |
| req.set_trace_writer_id(writer_id); |
| producer_port_->UnregisterTraceWriter( |
| req, ipc::Deferred<protos::gen::UnregisterTraceWriterResponse>()); |
| } |
| |
| void ProducerIPCClientImpl::CommitData(const CommitDataRequest& req, |
| CommitDataCallback callback) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| if (!connected_) { |
| PERFETTO_DLOG("Cannot CommitData(), not connected to tracing service"); |
| return; |
| } |
| ipc::Deferred<protos::gen::CommitDataResponse> async_response; |
| // TODO(primiano): add a test that destroys ProducerIPCClientImpl soon after |
| // this call and checks that the callback is dropped. |
| if (callback) { |
| async_response.Bind( |
| [callback](ipc::AsyncResult<protos::gen::CommitDataResponse> response) { |
| if (!response) { |
| PERFETTO_DLOG("CommitData() failed: connection reset"); |
| return; |
| } |
| callback(); |
| }); |
| } |
| producer_port_->CommitData(req, std::move(async_response)); |
| } |
| |
| void ProducerIPCClientImpl::NotifyDataSourceStarted(DataSourceInstanceID id) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| if (!connected_) { |
| PERFETTO_DLOG( |
| "Cannot NotifyDataSourceStarted(), not connected to tracing service"); |
| return; |
| } |
| protos::gen::NotifyDataSourceStartedRequest req; |
| req.set_data_source_id(id); |
| producer_port_->NotifyDataSourceStarted( |
| req, ipc::Deferred<protos::gen::NotifyDataSourceStartedResponse>()); |
| } |
| |
| void ProducerIPCClientImpl::NotifyDataSourceStopped(DataSourceInstanceID id) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| if (!connected_) { |
| PERFETTO_DLOG( |
| "Cannot NotifyDataSourceStopped(), not connected to tracing service"); |
| return; |
| } |
| protos::gen::NotifyDataSourceStoppedRequest req; |
| req.set_data_source_id(id); |
| producer_port_->NotifyDataSourceStopped( |
| req, ipc::Deferred<protos::gen::NotifyDataSourceStoppedResponse>()); |
| } |
| |
| void ProducerIPCClientImpl::ActivateTriggers( |
| const std::vector<std::string>& triggers) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| if (!connected_) { |
| PERFETTO_DLOG( |
| "Cannot ActivateTriggers(), not connected to tracing service"); |
| return; |
| } |
| protos::gen::ActivateTriggersRequest proto_req; |
| for (const auto& name : triggers) { |
| *proto_req.add_trigger_names() = name; |
| } |
| producer_port_->ActivateTriggers( |
| proto_req, ipc::Deferred<protos::gen::ActivateTriggersResponse>()); |
| } |
| |
| void ProducerIPCClientImpl::Sync(std::function<void()> callback) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| if (!connected_) { |
| pending_sync_reqs_.emplace_back(std::move(callback)); |
| return; |
| } |
| ipc::Deferred<protos::gen::SyncResponse> resp; |
| resp.Bind([callback](ipc::AsyncResult<protos::gen::SyncResponse>) { |
| // Here we ACK the callback even if the service replies with a failure |
| // (i.e. the service is too old and doesn't understand Sync()). In that |
| // case the service has still seen the request, the IPC roundtrip is |
| // still a (weaker) linearization fence. |
| callback(); |
| }); |
| producer_port_->Sync(protos::gen::SyncRequest(), std::move(resp)); |
| } |
| |
| std::unique_ptr<TraceWriter> ProducerIPCClientImpl::CreateTraceWriter( |
| BufferID target_buffer, |
| BufferExhaustedPolicy buffer_exhausted_policy) { |
| // This method can be called by different threads. |shared_memory_arbiter_| is |
| // thread-safe but be aware of accessing any other state in this function. |
| return shared_memory_arbiter_->CreateTraceWriter(target_buffer, |
| buffer_exhausted_policy); |
| } |
| |
| SharedMemoryArbiter* ProducerIPCClientImpl::MaybeSharedMemoryArbiter() { |
| return shared_memory_arbiter_.get(); |
| } |
| |
| bool ProducerIPCClientImpl::IsShmemProvidedByProducer() const { |
| return is_shmem_provided_by_producer_; |
| } |
| |
| void ProducerIPCClientImpl::NotifyFlushComplete(FlushRequestID req_id) { |
| return shared_memory_arbiter_->NotifyFlushComplete(req_id); |
| } |
| |
| SharedMemory* ProducerIPCClientImpl::shared_memory() const { |
| return shared_memory_.get(); |
| } |
| |
| size_t ProducerIPCClientImpl::shared_buffer_page_size_kb() const { |
| return shared_buffer_page_size_kb_; |
| } |
| |
| } // namespace perfetto |