| /* |
| * Copyright (C) 2017 The Android Open Source Project |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "src/tracing/ipc/service/consumer_ipc_service.h" |
| |
| #include <inttypes.h> |
| |
| #include "perfetto/base/logging.h" |
| #include "perfetto/base/task_runner.h" |
| #include "perfetto/ext/base/scoped_file.h" |
| #include "perfetto/ext/ipc/basic_types.h" |
| #include "perfetto/ext/ipc/host.h" |
| #include "perfetto/ext/tracing/core/shared_memory_abi.h" |
| #include "perfetto/ext/tracing/core/slice.h" |
| #include "perfetto/ext/tracing/core/trace_config.h" |
| #include "perfetto/ext/tracing/core/trace_packet.h" |
| #include "perfetto/ext/tracing/core/trace_stats.h" |
| #include "perfetto/ext/tracing/core/tracing_service.h" |
| #include "perfetto/ext/tracing/core/tracing_service_state.h" |
| |
| namespace perfetto { |
| |
| ConsumerIPCService::ConsumerIPCService(TracingService* core_service) |
| : core_service_(core_service), weak_ptr_factory_(this) {} |
| |
| ConsumerIPCService::~ConsumerIPCService() = default; |
| |
| ConsumerIPCService::RemoteConsumer* |
| ConsumerIPCService::GetConsumerForCurrentRequest() { |
| const ipc::ClientID ipc_client_id = ipc::Service::client_info().client_id(); |
| const uid_t uid = ipc::Service::client_info().uid(); |
| PERFETTO_CHECK(ipc_client_id); |
| auto it = consumers_.find(ipc_client_id); |
| if (it == consumers_.end()) { |
| auto* remote_consumer = new RemoteConsumer(); |
| consumers_[ipc_client_id].reset(remote_consumer); |
| remote_consumer->service_endpoint = |
| core_service_->ConnectConsumer(remote_consumer, uid); |
| return remote_consumer; |
| } |
| return it->second.get(); |
| } |
| |
| // Called by the IPC layer. |
| void ConsumerIPCService::OnClientDisconnected() { |
| ipc::ClientID client_id = ipc::Service::client_info().client_id(); |
| consumers_.erase(client_id); |
| } |
| |
| // Called by the IPC layer. |
| void ConsumerIPCService::EnableTracing(const protos::EnableTracingRequest& req, |
| DeferredEnableTracingResponse resp) { |
| RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest(); |
| if (req.attach_notification_only()) { |
| remote_consumer->enable_tracing_response = std::move(resp); |
| return; |
| } |
| TraceConfig trace_config; |
| trace_config.FromProto(req.trace_config()); |
| base::ScopedFile fd; |
| if (trace_config.write_into_file()) |
| fd = ipc::Service::TakeReceivedFD(); |
| remote_consumer->service_endpoint->EnableTracing(trace_config, std::move(fd)); |
| remote_consumer->enable_tracing_response = std::move(resp); |
| } |
| |
| // Called by the IPC layer. |
| void ConsumerIPCService::StartTracing(const protos::StartTracingRequest&, |
| DeferredStartTracingResponse resp) { |
| RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest(); |
| remote_consumer->service_endpoint->StartTracing(); |
| resp.Resolve(ipc::AsyncResult<protos::StartTracingResponse>::Create()); |
| } |
| |
| // Called by the IPC layer. |
| void ConsumerIPCService::ChangeTraceConfig( |
| const protos::ChangeTraceConfigRequest& req, |
| DeferredChangeTraceConfigResponse resp) { |
| RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest(); |
| TraceConfig trace_config; |
| trace_config.FromProto(req.trace_config()); |
| remote_consumer->service_endpoint->ChangeTraceConfig(trace_config); |
| resp.Resolve(ipc::AsyncResult<protos::ChangeTraceConfigResponse>::Create()); |
| } |
| |
| // Called by the IPC layer. |
| void ConsumerIPCService::DisableTracing(const protos::DisableTracingRequest&, |
| DeferredDisableTracingResponse resp) { |
| GetConsumerForCurrentRequest()->service_endpoint->DisableTracing(); |
| resp.Resolve(ipc::AsyncResult<protos::DisableTracingResponse>::Create()); |
| } |
| |
| // Called by the IPC layer. |
| void ConsumerIPCService::ReadBuffers(const protos::ReadBuffersRequest&, |
| DeferredReadBuffersResponse resp) { |
| RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest(); |
| remote_consumer->read_buffers_response = std::move(resp); |
| remote_consumer->service_endpoint->ReadBuffers(); |
| } |
| |
| // Called by the IPC layer. |
| void ConsumerIPCService::FreeBuffers(const protos::FreeBuffersRequest&, |
| DeferredFreeBuffersResponse resp) { |
| GetConsumerForCurrentRequest()->service_endpoint->FreeBuffers(); |
| resp.Resolve(ipc::AsyncResult<protos::FreeBuffersResponse>::Create()); |
| } |
| |
| // Called by the IPC layer. |
| void ConsumerIPCService::Flush(const protos::FlushRequest& req, |
| DeferredFlushResponse resp) { |
| auto it = pending_flush_responses_.insert(pending_flush_responses_.end(), |
| std::move(resp)); |
| auto weak_this = weak_ptr_factory_.GetWeakPtr(); |
| auto callback = [weak_this, it](bool success) { |
| if (weak_this) |
| weak_this->OnFlushCallback(success, std::move(it)); |
| }; |
| GetConsumerForCurrentRequest()->service_endpoint->Flush(req.timeout_ms(), |
| std::move(callback)); |
| } |
| |
| // Called by the IPC layer. |
| void ConsumerIPCService::Detach(const protos::DetachRequest& req, |
| DeferredDetachResponse resp) { |
| // OnDetach() will resolve the |detach_response|. |
| RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest(); |
| remote_consumer->detach_response = std::move(resp); |
| remote_consumer->service_endpoint->Detach(req.key()); |
| } |
| |
| // Called by the IPC layer. |
| void ConsumerIPCService::Attach(const protos::AttachRequest& req, |
| DeferredAttachResponse resp) { |
| // OnAttach() will resolve the |attach_response|. |
| RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest(); |
| remote_consumer->attach_response = std::move(resp); |
| remote_consumer->service_endpoint->Attach(req.key()); |
| } |
| |
| // Called by the IPC layer. |
| void ConsumerIPCService::GetTraceStats(const protos::GetTraceStatsRequest&, |
| DeferredGetTraceStatsResponse resp) { |
| // OnTraceStats() will resolve the |get_trace_stats_response|. |
| RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest(); |
| remote_consumer->get_trace_stats_response = std::move(resp); |
| remote_consumer->service_endpoint->GetTraceStats(); |
| } |
| |
| // Called by the IPC layer. |
| void ConsumerIPCService::ObserveEvents(const protos::ObserveEventsRequest& req, |
| DeferredObserveEventsResponse resp) { |
| RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest(); |
| |
| // If there's a prior stream, close it so that client can clean it up. |
| remote_consumer->CloseObserveEventsResponseStream(); |
| |
| remote_consumer->observe_events_response = std::move(resp); |
| |
| bool observe_instances = false; |
| for (const auto& type : req.events_to_observe()) { |
| switch (type) { |
| case protos::ObservableEvents::TYPE_DATA_SOURCES_INSTANCES: |
| observe_instances = true; |
| break; |
| default: |
| PERFETTO_DFATAL("Unknown ObservableEvent type: %d", type); |
| break; |
| } |
| } |
| remote_consumer->service_endpoint->ObserveEvents(observe_instances); |
| |
| // If no events are to be observed, close the stream immediately so that the |
| // client can clean up. |
| if (req.events_to_observe().size() == 0) |
| remote_consumer->CloseObserveEventsResponseStream(); |
| } |
| |
| // Called by the IPC layer. |
| void ConsumerIPCService::QueryServiceState( |
| const protos::QueryServiceStateRequest&, |
| DeferredQueryServiceStateResponse resp) { |
| RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest(); |
| auto it = pending_query_service_responses_.insert( |
| pending_query_service_responses_.end(), std::move(resp)); |
| auto weak_this = weak_ptr_factory_.GetWeakPtr(); |
| auto callback = [weak_this, it](bool success, |
| const TracingServiceState& svc_state) { |
| if (weak_this) |
| weak_this->OnQueryServiceCallback(success, svc_state, std::move(it)); |
| }; |
| remote_consumer->service_endpoint->QueryServiceState(callback); |
| } |
| |
| // Called by the service in response to a service_endpoint->Flush() request. |
| void ConsumerIPCService::OnFlushCallback( |
| bool success, |
| PendingFlushResponses::iterator pending_response_it) { |
| DeferredFlushResponse response(std::move(*pending_response_it)); |
| pending_flush_responses_.erase(pending_response_it); |
| if (success) { |
| response.Resolve(ipc::AsyncResult<protos::FlushResponse>::Create()); |
| } else { |
| response.Reject(); |
| } |
| } |
| |
| // Called by the service in response to service_endpoint->QueryServiceState(). |
| void ConsumerIPCService::OnQueryServiceCallback( |
| bool success, |
| const TracingServiceState& svc_state, |
| PendingQuerySvcResponses::iterator pending_response_it) { |
| DeferredQueryServiceStateResponse response(std::move(*pending_response_it)); |
| pending_query_service_responses_.erase(pending_response_it); |
| if (success) { |
| auto resp = ipc::AsyncResult<protos::QueryServiceStateResponse>::Create(); |
| svc_state.ToProto(resp->mutable_service_state()); |
| response.Resolve(std::move(resp)); |
| } else { |
| response.Reject(); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // RemoteConsumer methods |
| //////////////////////////////////////////////////////////////////////////////// |
| |
| ConsumerIPCService::RemoteConsumer::RemoteConsumer() = default; |
| ConsumerIPCService::RemoteConsumer::~RemoteConsumer() = default; |
| |
| // Invoked by the |core_service_| business logic after the ConnectConsumer() |
| // call. There is nothing to do here, we really expected the ConnectConsumer() |
| // to just work in the local case. |
| void ConsumerIPCService::RemoteConsumer::OnConnect() {} |
| |
| // Invoked by the |core_service_| business logic after we destroy the |
| // |service_endpoint| (in the RemoteConsumer dtor). |
| void ConsumerIPCService::RemoteConsumer::OnDisconnect() {} |
| |
| void ConsumerIPCService::RemoteConsumer::OnTracingDisabled() { |
| if (enable_tracing_response.IsBound()) { |
| auto result = ipc::AsyncResult<protos::EnableTracingResponse>::Create(); |
| result->set_disabled(true); |
| enable_tracing_response.Resolve(std::move(result)); |
| } |
| } |
| |
| void ConsumerIPCService::RemoteConsumer::OnTraceData( |
| std::vector<TracePacket> trace_packets, |
| bool has_more) { |
| if (!read_buffers_response.IsBound()) |
| return; |
| |
| auto result = ipc::AsyncResult<protos::ReadBuffersResponse>::Create(); |
| |
| // A TracePacket might be too big to fit into a single IPC message (max |
| // kIPCBufferSize). However a TracePacket is made of slices and each slice |
| // is way smaller than kIPCBufferSize (a slice size is effectively bounded by |
| // the max chunk size of the SharedMemoryABI). When sending a TracePacket, |
| // if its slices don't fit within one IPC, chunk them over several contiguous |
| // IPCs using the |last_slice_for_packet| for glueing on the other side. |
| static_assert(ipc::kIPCBufferSize >= SharedMemoryABI::kMaxPageSize * 2, |
| "kIPCBufferSize too small given the max possible slice size"); |
| |
| auto send_ipc_reply = [this, &result](bool more) { |
| result.set_has_more(more); |
| read_buffers_response.Resolve(std::move(result)); |
| result = ipc::AsyncResult<protos::ReadBuffersResponse>::Create(); |
| }; |
| |
| size_t approx_reply_size = 0; |
| for (const TracePacket& trace_packet : trace_packets) { |
| size_t num_slices_left_for_packet = trace_packet.slices().size(); |
| for (const Slice& slice : trace_packet.slices()) { |
| // Check if this slice would cause the IPC to overflow its max size and, |
| // if that is the case, split the IPCs. The "16" and "64" below are |
| // over-estimations of, respectively: |
| // 16: the preamble that prefixes each slice (there are 2 x size fields |
| // in the proto + the |last_slice_for_packet| bool). |
| // 64: the overhead of the IPC InvokeMethodReply + wire_protocol's frame. |
| // If these estimations are wrong, BufferedFrameDeserializer::Serialize() |
| // will hit a DCHECK anyways. |
| const size_t approx_slice_size = slice.size + 16; |
| if (approx_reply_size + approx_slice_size > ipc::kIPCBufferSize - 64) { |
| // If we hit this CHECK we got a single slice that is > kIPCBufferSize. |
| PERFETTO_CHECK(result->slices_size() > 0); |
| send_ipc_reply(/*has_more=*/true); |
| approx_reply_size = 0; |
| } |
| approx_reply_size += approx_slice_size; |
| |
| auto* res_slice = result->add_slices(); |
| res_slice->set_last_slice_for_packet(--num_slices_left_for_packet == 0); |
| res_slice->set_data(slice.start, slice.size); |
| } |
| } |
| send_ipc_reply(has_more); |
| } |
| |
| void ConsumerIPCService::RemoteConsumer::OnDetach(bool success) { |
| if (!success) { |
| std::move(detach_response).Reject(); |
| return; |
| } |
| auto resp = ipc::AsyncResult<protos::DetachResponse>::Create(); |
| std::move(detach_response).Resolve(std::move(resp)); |
| } |
| |
| void ConsumerIPCService::RemoteConsumer::OnAttach( |
| bool success, |
| const TraceConfig& trace_config) { |
| if (!success) { |
| std::move(attach_response).Reject(); |
| return; |
| } |
| auto response = ipc::AsyncResult<protos::AttachResponse>::Create(); |
| trace_config.ToProto(response->mutable_trace_config()); |
| std::move(attach_response).Resolve(std::move(response)); |
| } |
| |
| void ConsumerIPCService::RemoteConsumer::OnTraceStats(bool success, |
| const TraceStats& stats) { |
| if (!success) { |
| std::move(get_trace_stats_response).Reject(); |
| return; |
| } |
| auto response = ipc::AsyncResult<protos::GetTraceStatsResponse>::Create(); |
| stats.ToProto(response->mutable_trace_stats()); |
| std::move(get_trace_stats_response).Resolve(std::move(response)); |
| } |
| |
| void ConsumerIPCService::RemoteConsumer::OnObservableEvents( |
| const ObservableEvents& events) { |
| if (!observe_events_response.IsBound()) |
| return; |
| |
| auto result = ipc::AsyncResult<protos::ObserveEventsResponse>::Create(); |
| result.set_has_more(true); |
| events.ToProto(result->mutable_events()); |
| observe_events_response.Resolve(std::move(result)); |
| } |
| |
| void ConsumerIPCService::RemoteConsumer::CloseObserveEventsResponseStream() { |
| if (!observe_events_response.IsBound()) |
| return; |
| |
| auto result = ipc::AsyncResult<protos::ObserveEventsResponse>::Create(); |
| result.set_has_more(false); |
| observe_events_response.Resolve(std::move(result)); |
| } |
| |
| } // namespace perfetto |