blob: 5f8dd4fb3ee0c5bca3876138c93d2d0ad7bb13a6 [file] [log] [blame]
Primiano Tucci5944dd72019-05-21 23:56:17 +01001/*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Primiano Tucci2c5488f2019-06-01 03:27:28 +010017#include "src/tracing/internal/tracing_muxer_impl.h"
Primiano Tucci5944dd72019-05-21 23:56:17 +010018
19#include <algorithm>
20#include <atomic>
Sami Kyostila389861f2019-07-09 20:52:43 +010021#include <mutex>
Primiano Tucci5944dd72019-05-21 23:56:17 +010022#include <vector>
23
Primiano Tucci6dca1a32019-06-14 11:50:00 +010024#include "perfetto/base/build_config.h"
Primiano Tucci5944dd72019-05-21 23:56:17 +010025#include "perfetto/base/logging.h"
26#include "perfetto/base/task_runner.h"
Primiano Tucci2c5488f2019-06-01 03:27:28 +010027#include "perfetto/ext/base/thread_checker.h"
Sami Kyostila389861f2019-07-09 20:52:43 +010028#include "perfetto/ext/base/waitable_event.h"
Primiano Tucci2c5488f2019-06-01 03:27:28 +010029#include "perfetto/ext/tracing/core/trace_packet.h"
30#include "perfetto/ext/tracing/core/trace_writer.h"
31#include "perfetto/ext/tracing/core/tracing_service.h"
Florian Mayer974a8b82019-10-03 10:04:27 +010032#include "perfetto/tracing/buffer_exhausted_policy.h"
Primiano Tucci0f9e0222019-06-05 09:36:41 +010033#include "perfetto/tracing/core/data_source_config.h"
Primiano Tucci2c5488f2019-06-01 03:27:28 +010034#include "perfetto/tracing/data_source.h"
35#include "perfetto/tracing/internal/data_source_internal.h"
36#include "perfetto/tracing/trace_writer_base.h"
37#include "perfetto/tracing/tracing.h"
38#include "perfetto/tracing/tracing_backend.h"
39#include "src/tracing/internal/in_process_tracing_backend.h"
40#include "src/tracing/internal/system_tracing_backend.h"
Primiano Tucci5944dd72019-05-21 23:56:17 +010041
42namespace perfetto {
43namespace internal {
44
Primiano Tuccidd5ebc92019-07-25 01:09:37 +010045namespace {
46
47class StopArgsImpl : public DataSourceBase::StopArgs {
48 public:
49 std::function<void()> HandleStopAsynchronously() const override {
50 auto closure = std::move(async_stop_closure);
51 async_stop_closure = std::function<void()>();
52 return closure;
53 }
54
55 mutable std::function<void()> async_stop_closure;
56};
57
58} // namespace
59
Primiano Tucci5944dd72019-05-21 23:56:17 +010060// ----- Begin of TracingMuxerImpl::ProducerImpl
61TracingMuxerImpl::ProducerImpl::ProducerImpl(TracingMuxerImpl* muxer,
62 TracingBackendId backend_id)
63 : muxer_(muxer), backend_id_(backend_id) {}
64TracingMuxerImpl::ProducerImpl::~ProducerImpl() = default;
65
66void TracingMuxerImpl::ProducerImpl::Initialize(
67 std::unique_ptr<ProducerEndpoint> endpoint) {
68 service_ = std::move(endpoint);
69}
70
71void TracingMuxerImpl::ProducerImpl::OnConnect() {
72 PERFETTO_DLOG("Producer connected");
73 PERFETTO_DCHECK_THREAD(thread_checker_);
74 PERFETTO_DCHECK(!connected_);
75 connected_ = true;
76 muxer_->UpdateDataSourcesOnAllBackends();
77}
78
79void TracingMuxerImpl::ProducerImpl::OnDisconnect() {
80 PERFETTO_DCHECK_THREAD(thread_checker_);
81 connected_ = false;
Florian Mayer49781632019-10-08 17:23:28 +010082 // TODO: handle more graceful.
83 PERFETTO_ELOG("Cannot connect to traced. Is it running?");
Primiano Tucci5944dd72019-05-21 23:56:17 +010084}
85
86void TracingMuxerImpl::ProducerImpl::OnTracingSetup() {
87 PERFETTO_DCHECK_THREAD(thread_checker_);
88}
89
90void TracingMuxerImpl::ProducerImpl::SetupDataSource(
91 DataSourceInstanceID id,
92 const DataSourceConfig& cfg) {
93 PERFETTO_DCHECK_THREAD(thread_checker_);
94 muxer_->SetupDataSource(backend_id_, id, cfg);
95}
96
97void TracingMuxerImpl::ProducerImpl::StartDataSource(DataSourceInstanceID id,
98 const DataSourceConfig&) {
99 PERFETTO_DCHECK_THREAD(thread_checker_);
100 muxer_->StartDataSource(backend_id_, id);
101 service_->NotifyDataSourceStarted(id);
102}
103
104void TracingMuxerImpl::ProducerImpl::StopDataSource(DataSourceInstanceID id) {
105 PERFETTO_DCHECK_THREAD(thread_checker_);
Primiano Tuccidd5ebc92019-07-25 01:09:37 +0100106 muxer_->StopDataSource_AsyncBegin(backend_id_, id);
Primiano Tucci5944dd72019-05-21 23:56:17 +0100107}
108
109void TracingMuxerImpl::ProducerImpl::Flush(FlushRequestID flush_id,
110 const DataSourceInstanceID*,
111 size_t) {
112 // Flush is not plumbed for now, we just ack straight away.
113 PERFETTO_DCHECK_THREAD(thread_checker_);
114 service_->NotifyFlushComplete(flush_id);
115}
116
117void TracingMuxerImpl::ProducerImpl::ClearIncrementalState(
118 const DataSourceInstanceID*,
119 size_t) {
120 PERFETTO_DCHECK_THREAD(thread_checker_);
Sami Kyostila490cfbc2019-08-13 17:46:57 +0100121 // TODO(skyostil): Mark each affected data source's incremental state as
122 // needing to be cleared.
Primiano Tucci5944dd72019-05-21 23:56:17 +0100123}
124// ----- End of TracingMuxerImpl::ProducerImpl methods.
125
126// ----- Begin of TracingMuxerImpl::ConsumerImpl
127TracingMuxerImpl::ConsumerImpl::ConsumerImpl(TracingMuxerImpl* muxer,
128 TracingBackendId backend_id,
129 TracingSessionGlobalID session_id)
130 : muxer_(muxer), backend_id_(backend_id), session_id_(session_id) {}
131
132TracingMuxerImpl::ConsumerImpl::~ConsumerImpl() = default;
133
134void TracingMuxerImpl::ConsumerImpl::Initialize(
135 std::unique_ptr<ConsumerEndpoint> endpoint) {
136 PERFETTO_DCHECK_THREAD(thread_checker_);
137 service_ = std::move(endpoint);
Sami Kyostila389861f2019-07-09 20:52:43 +0100138 // Observe data source instance events so we get notified when tracing starts.
139 service_->ObserveEvents(ConsumerEndpoint::kDataSourceInstances);
Primiano Tucci5944dd72019-05-21 23:56:17 +0100140}
141
142void TracingMuxerImpl::ConsumerImpl::OnConnect() {
143 PERFETTO_DCHECK_THREAD(thread_checker_);
144 PERFETTO_DCHECK(!connected_);
145 connected_ = true;
146
147 // If the API client configured and started tracing before we connected,
148 // tell the backend about it now.
149 if (trace_config_) {
150 muxer_->SetupTracingSession(session_id_, trace_config_);
151 if (start_pending_)
152 muxer_->StartTracingSession(session_id_);
Sami Kyostila23bde0d2019-08-02 11:53:37 +0100153 if (stop_pending_)
154 muxer_->StopTracingSession(session_id_);
Primiano Tucci5944dd72019-05-21 23:56:17 +0100155 }
156}
157
158void TracingMuxerImpl::ConsumerImpl::OnDisconnect() {
159 PERFETTO_DCHECK_THREAD(thread_checker_);
160 // It shouldn't be necessary to call StopTracingSession. If we get this call
161 // it means that the service did shutdown before us, so there is no point
162 // trying it to ask it to stop the session. We should just remember to cleanup
163 // the consumer vector.
164 connected_ = false;
165
166 // TODO notify the client somehow.
167}
168
169void TracingMuxerImpl::ConsumerImpl::OnTracingDisabled() {
170 PERFETTO_DCHECK_THREAD(thread_checker_);
Sami Kyostila389861f2019-07-09 20:52:43 +0100171 PERFETTO_DCHECK(!stopped_);
172 stopped_ = true;
173 // If we're still waiting for the start event, fire it now. This may happen if
174 // there are no active data sources in the session.
Sami Kyostila23bde0d2019-08-02 11:53:37 +0100175 NotifyStartComplete();
176 NotifyStopComplete();
177}
178
179void TracingMuxerImpl::ConsumerImpl::NotifyStartComplete() {
180 PERFETTO_DCHECK_THREAD(thread_checker_);
Sami Kyostila389861f2019-07-09 20:52:43 +0100181 if (blocking_start_complete_callback_) {
182 muxer_->task_runner_->PostTask(
183 std::move(blocking_start_complete_callback_));
184 blocking_start_complete_callback_ = nullptr;
185 }
Sami Kyostila23bde0d2019-08-02 11:53:37 +0100186}
187
188void TracingMuxerImpl::ConsumerImpl::NotifyStopComplete() {
189 PERFETTO_DCHECK_THREAD(thread_checker_);
190 if (stop_complete_callback_) {
191 muxer_->task_runner_->PostTask(std::move(stop_complete_callback_));
192 stop_complete_callback_ = nullptr;
193 }
Sami Kyostila389861f2019-07-09 20:52:43 +0100194 if (blocking_stop_complete_callback_) {
195 muxer_->task_runner_->PostTask(std::move(blocking_stop_complete_callback_));
196 blocking_stop_complete_callback_ = nullptr;
197 }
Primiano Tucci5944dd72019-05-21 23:56:17 +0100198}
199
200void TracingMuxerImpl::ConsumerImpl::OnTraceData(
201 std::vector<TracePacket> packets,
202 bool has_more) {
203 PERFETTO_DCHECK_THREAD(thread_checker_);
204 if (!read_trace_callback_)
205 return;
206
207 size_t capacity = 0;
208 for (const auto& packet : packets) {
209 // 16 is an over-estimation of the proto preamble size
210 capacity += packet.size() + 16;
211 }
212
213 // The shared_ptr is to avoid making a copy of the buffer when PostTask-ing.
214 std::shared_ptr<std::vector<char>> buf(new std::vector<char>());
215 buf->reserve(capacity);
216 for (auto& packet : packets) {
217 char* start;
218 size_t size;
219 std::tie(start, size) = packet.GetProtoPreamble();
220 buf->insert(buf->end(), start, start + size);
221 for (auto& slice : packet.slices()) {
222 const auto* slice_data = reinterpret_cast<const char*>(slice.start);
223 buf->insert(buf->end(), slice_data, slice_data + slice.size);
224 }
225 }
226
227 auto callback = read_trace_callback_;
228 muxer_->task_runner_->PostTask([callback, buf, has_more] {
229 TracingSession::ReadTraceCallbackArgs callback_arg{};
230 callback_arg.data = &(*buf)[0];
231 callback_arg.size = buf->size();
232 callback_arg.has_more = has_more;
233 callback(callback_arg);
234 });
235
236 if (!has_more)
237 read_trace_callback_ = nullptr;
238}
239
Sami Kyostila389861f2019-07-09 20:52:43 +0100240void TracingMuxerImpl::ConsumerImpl::OnObservableEvents(
241 const ObservableEvents& events) {
242 if (events.instance_state_changes_size()) {
243 for (const auto& state_change : events.instance_state_changes()) {
244 DataSourceHandle handle{state_change.producer_name(),
245 state_change.data_source_name()};
246 data_source_states_[handle] =
247 state_change.state() ==
Primiano Tucci57dd66b2019-10-15 23:09:04 +0100248 ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED;
Sami Kyostila389861f2019-07-09 20:52:43 +0100249 }
250 // Data sources are first reported as being stopped before starting, so once
251 // all the data sources we know about have started we can declare tracing
252 // begun.
253 if (blocking_start_complete_callback_) {
254 bool all_data_sources_started = std::all_of(
255 data_source_states_.cbegin(), data_source_states_.cend(),
256 [](std::pair<DataSourceHandle, bool> state) { return state.second; });
Sami Kyostila23bde0d2019-08-02 11:53:37 +0100257 if (all_data_sources_started)
258 NotifyStartComplete();
Sami Kyostila389861f2019-07-09 20:52:43 +0100259 }
260 }
261}
262
Primiano Tucci5944dd72019-05-21 23:56:17 +0100263// The callbacks below are not used.
264void TracingMuxerImpl::ConsumerImpl::OnDetach(bool) {}
265void TracingMuxerImpl::ConsumerImpl::OnAttach(bool, const TraceConfig&) {}
266void TracingMuxerImpl::ConsumerImpl::OnTraceStats(bool, const TraceStats&) {}
Primiano Tucci5944dd72019-05-21 23:56:17 +0100267// ----- End of TracingMuxerImpl::ConsumerImpl
268
269// ----- Begin of TracingMuxerImpl::TracingSessionImpl
270
271// TracingSessionImpl is the RAII object returned to API clients when they
272// invoke Tracing::CreateTracingSession. They use it for starting/stopping
273// tracing.
274
275TracingMuxerImpl::TracingSessionImpl::TracingSessionImpl(
276 TracingMuxerImpl* muxer,
277 TracingSessionGlobalID session_id)
278 : muxer_(muxer), session_id_(session_id) {}
279
280// Can be destroyed from any thread.
281TracingMuxerImpl::TracingSessionImpl::~TracingSessionImpl() {
282 auto* muxer = muxer_;
283 auto session_id = session_id_;
284 muxer->task_runner_->PostTask(
285 [muxer, session_id] { muxer->DestroyTracingSession(session_id); });
286}
287
288// Can be called from any thread.
Sami Kyostilac09ca292019-07-26 19:10:15 +0100289void TracingMuxerImpl::TracingSessionImpl::Setup(const TraceConfig& cfg,
290 int fd) {
Primiano Tucci5944dd72019-05-21 23:56:17 +0100291 auto* muxer = muxer_;
292 auto session_id = session_id_;
293 std::shared_ptr<TraceConfig> trace_config(new TraceConfig(cfg));
Sami Kyostilac09ca292019-07-26 19:10:15 +0100294 if (fd >= 0) {
295 trace_config->set_write_into_file(true);
296 fd = dup(fd);
297 }
298 muxer->task_runner_->PostTask([muxer, session_id, trace_config, fd] {
299 muxer->SetupTracingSession(session_id, trace_config, base::ScopedFile(fd));
Primiano Tucci5944dd72019-05-21 23:56:17 +0100300 });
301}
302
303// Can be called from any thread.
304void TracingMuxerImpl::TracingSessionImpl::Start() {
305 auto* muxer = muxer_;
306 auto session_id = session_id_;
307 muxer->task_runner_->PostTask(
308 [muxer, session_id] { muxer->StartTracingSession(session_id); });
309}
310
Sami Kyostila389861f2019-07-09 20:52:43 +0100311// Can be called from any thread except the service thread.
312void TracingMuxerImpl::TracingSessionImpl::StartBlocking() {
313 PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
314 auto* muxer = muxer_;
315 auto session_id = session_id_;
316 base::WaitableEvent tracing_started;
317 muxer->task_runner_->PostTask([muxer, session_id, &tracing_started] {
318 auto* consumer = muxer->FindConsumer(session_id);
319 PERFETTO_DCHECK(!consumer->blocking_start_complete_callback_);
320 consumer->blocking_start_complete_callback_ = [&] {
321 tracing_started.Notify();
322 };
323 muxer->StartTracingSession(session_id);
324 });
325 tracing_started.Wait();
326}
327
Primiano Tucci5944dd72019-05-21 23:56:17 +0100328// Can be called from any thread.
329void TracingMuxerImpl::TracingSessionImpl::Stop() {
330 auto* muxer = muxer_;
331 auto session_id = session_id_;
332 muxer->task_runner_->PostTask(
333 [muxer, session_id] { muxer->StopTracingSession(session_id); });
334}
335
Sami Kyostila389861f2019-07-09 20:52:43 +0100336// Can be called from any thread except the service thread.
337void TracingMuxerImpl::TracingSessionImpl::StopBlocking() {
338 PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
339 auto* muxer = muxer_;
340 auto session_id = session_id_;
341 base::WaitableEvent tracing_stopped;
342 muxer->task_runner_->PostTask([muxer, session_id, &tracing_stopped] {
343 auto* consumer = muxer->FindConsumer(session_id);
344 PERFETTO_DCHECK(!consumer->blocking_stop_complete_callback_);
345 consumer->blocking_stop_complete_callback_ = [&] {
346 tracing_stopped.Notify();
347 };
348 muxer->StopTracingSession(session_id);
349 });
350 tracing_stopped.Wait();
351}
352
Primiano Tucci5944dd72019-05-21 23:56:17 +0100353// Can be called from any thread.
354void TracingMuxerImpl::TracingSessionImpl::ReadTrace(ReadTraceCallback cb) {
355 auto* muxer = muxer_;
356 auto session_id = session_id_;
357 muxer->task_runner_->PostTask([muxer, session_id, cb] {
358 muxer->ReadTracingSessionData(session_id, std::move(cb));
359 });
360}
361
362// Can be called from any thread.
363void TracingMuxerImpl::TracingSessionImpl::SetOnStopCallback(
364 std::function<void()> cb) {
365 auto* muxer = muxer_;
366 auto session_id = session_id_;
367 muxer->task_runner_->PostTask([muxer, session_id, cb] {
368 auto* consumer = muxer->FindConsumer(session_id);
369 consumer->stop_complete_callback_ = cb;
370 });
371}
372// ----- End of TracingMuxerImpl::TracingSessionImpl
373
374// static
375TracingMuxer* TracingMuxer::instance_ = nullptr;
376
377// This is called by perfetto::Tracing::Initialize().
378// Can be called on any thread. Typically, but not necessarily, that will be
379// the embedder's main thread.
380TracingMuxerImpl::TracingMuxerImpl(const TracingInitArgs& args)
381 : TracingMuxer(args.platform ? args.platform
382 : Platform::GetDefaultPlatform()) {
383 PERFETTO_DETACH_FROM_THREAD(thread_checker_);
384
385 // Create the thread where muxer, producers and service will live.
386 task_runner_ = platform_->CreateTaskRunner({});
387
388 // Run the initializer on that thread.
389 task_runner_->PostTask([this, args] { Initialize(args); });
390}
391
392void TracingMuxerImpl::Initialize(const TracingInitArgs& args) {
393 PERFETTO_DCHECK_THREAD(thread_checker_); // Rebind the thread checker.
394
Primiano Tucci696fa9c2019-08-22 16:33:52 +0200395 auto add_backend = [this, &args](TracingBackend* backend, BackendType type) {
Primiano Tucci5944dd72019-05-21 23:56:17 +0100396 TracingBackendId backend_id = backends_.size();
397 backends_.emplace_back();
398 RegisteredBackend& rb = backends_.back();
399 rb.backend = backend;
400 rb.id = backend_id;
401 rb.type = type;
402 rb.producer.reset(new ProducerImpl(this, backend_id));
403 TracingBackend::ConnectProducerArgs conn_args;
404 conn_args.producer = rb.producer.get();
405 conn_args.producer_name = platform_->GetCurrentProcessName();
406 conn_args.task_runner = task_runner_.get();
Primiano Tucci696fa9c2019-08-22 16:33:52 +0200407 conn_args.shmem_size_hint_bytes = args.shmem_size_hint_kb * 1024;
408 conn_args.shmem_page_size_hint_bytes = args.shmem_page_size_hint_kb * 1024;
Primiano Tucci5944dd72019-05-21 23:56:17 +0100409 rb.producer->Initialize(rb.backend->ConnectProducer(conn_args));
410 };
411
Primiano Tucci6dca1a32019-06-14 11:50:00 +0100412 if (args.backends & kSystemBackend) {
Primiano Tucci2bf82842019-08-27 07:10:55 +0200413#if (PERFETTO_BUILDFLAG(PERFETTO_IPC))
Primiano Tucci5944dd72019-05-21 23:56:17 +0100414 add_backend(SystemTracingBackend::GetInstance(), kSystemBackend);
Primiano Tucci6dca1a32019-06-14 11:50:00 +0100415#else
416 PERFETTO_ELOG("System backend not supporteed in the current configuration");
417#endif
418 }
Primiano Tucci5944dd72019-05-21 23:56:17 +0100419
420 if (args.backends & kInProcessBackend)
421 add_backend(InProcessTracingBackend::GetInstance(), kInProcessBackend);
422
423 if (args.backends & kCustomBackend) {
424 PERFETTO_CHECK(args.custom_backend);
425 add_backend(args.custom_backend, kCustomBackend);
426 }
427
428 if (args.backends & ~(kSystemBackend | kInProcessBackend | kCustomBackend)) {
429 PERFETTO_FATAL("Unsupported tracing backend type");
430 }
431}
432
Sami Kyostilabf10bbe2019-08-01 18:06:43 +0100433// Can be called from any thread (but not concurrently).
Primiano Tucci5944dd72019-05-21 23:56:17 +0100434bool TracingMuxerImpl::RegisterDataSource(
435 const DataSourceDescriptor& descriptor,
436 DataSourceFactory factory,
437 DataSourceStaticState* static_state) {
Sami Kyostilabf10bbe2019-08-01 18:06:43 +0100438 // Ignore repeated registrations.
439 if (static_state->index != kMaxDataSources)
440 return true;
441
Primiano Tucci5944dd72019-05-21 23:56:17 +0100442 static std::atomic<uint32_t> last_id{};
443 uint32_t new_index = last_id++;
444 if (new_index >= kMaxDataSources - 1) {
445 PERFETTO_DLOG(
446 "RegisterDataSource failed: too many data sources already registered");
447 return false;
448 }
449
450 // Initialize the static state.
451 static_assert(sizeof(static_state->instances[0]) >= sizeof(DataSourceState),
452 "instances[] size mismatch");
453 for (size_t i = 0; i < static_state->instances.size(); i++)
454 new (&static_state->instances[i]) DataSourceState{};
455
456 static_state->index = new_index;
457
458 task_runner_->PostTask([this, descriptor, factory, static_state] {
459 data_sources_.emplace_back();
460 RegisteredDataSource& rds = data_sources_.back();
461 rds.descriptor = descriptor;
462 rds.factory = factory;
463 rds.static_state = static_state;
464 UpdateDataSourcesOnAllBackends();
465 });
466 return true;
467}
468
469// Called by the service of one of the backends.
470void TracingMuxerImpl::SetupDataSource(TracingBackendId backend_id,
471 DataSourceInstanceID instance_id,
472 const DataSourceConfig& cfg) {
473 PERFETTO_DCHECK_THREAD(thread_checker_);
474 PERFETTO_DLOG("Setting up data source %" PRIu64 " %s", instance_id,
475 cfg.name().c_str());
476
477 for (const auto& rds : data_sources_) {
478 if (rds.descriptor.name() != cfg.name())
479 continue;
480
481 DataSourceStaticState& static_state = *rds.static_state;
482 for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
483 // Find a free slot.
484 if (static_state.TryGet(i))
485 continue;
486
487 auto* internal_state =
488 reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
Primiano Tucci0cb4a1a2019-08-08 11:28:18 +0200489 std::lock_guard<std::recursive_mutex> guard(internal_state->lock);
Primiano Tucci5944dd72019-05-21 23:56:17 +0100490 static_assert(
491 std::is_same<decltype(internal_state->data_source_instance_id),
492 DataSourceInstanceID>::value,
493 "data_source_instance_id type mismatch");
494 internal_state->backend_id = backend_id;
495 internal_state->data_source_instance_id = instance_id;
496 internal_state->buffer_id =
497 static_cast<internal::BufferId>(cfg.target_buffer());
498 internal_state->data_source = rds.factory();
499
500 // This must be made at the end. See matching acquire-load in
501 // DataSource::Trace().
502 static_state.valid_instances.fetch_or(1 << i, std::memory_order_acq_rel);
503
504 DataSourceBase::SetupArgs setup_args;
505 setup_args.config = &cfg;
506 internal_state->data_source->OnSetup(setup_args);
Sami Kyostilaa6e50b82019-07-31 17:45:50 +0100507 return;
Primiano Tucci5944dd72019-05-21 23:56:17 +0100508 }
Florian Mayer3e39cf92019-07-31 12:03:51 +0100509 PERFETTO_ELOG(
510 "Maximum number of data source instances exhausted. "
511 "Dropping data source %" PRIu64,
512 instance_id);
Primiano Tucci5944dd72019-05-21 23:56:17 +0100513 }
514}
515
516// Called by the service of one of the backends.
517void TracingMuxerImpl::StartDataSource(TracingBackendId backend_id,
518 DataSourceInstanceID instance_id) {
519 PERFETTO_DLOG("Starting data source %" PRIu64, instance_id);
520 PERFETTO_DCHECK_THREAD(thread_checker_);
Primiano Tucci5944dd72019-05-21 23:56:17 +0100521
Primiano Tuccidd5ebc92019-07-25 01:09:37 +0100522 auto ds = FindDataSource(backend_id, instance_id);
523 if (!ds) {
524 PERFETTO_ELOG("Could not find data source to start");
525 return;
Primiano Tucci5944dd72019-05-21 23:56:17 +0100526 }
Primiano Tuccidd5ebc92019-07-25 01:09:37 +0100527
Primiano Tucci0cb4a1a2019-08-08 11:28:18 +0200528 std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
Primiano Tuccidd5ebc92019-07-25 01:09:37 +0100529 ds.internal_state->trace_lambda_enabled = true;
530 ds.internal_state->data_source->OnStart(DataSourceBase::StartArgs{});
Primiano Tucci5944dd72019-05-21 23:56:17 +0100531}
532
533// Called by the service of one of the backends.
Primiano Tuccidd5ebc92019-07-25 01:09:37 +0100534void TracingMuxerImpl::StopDataSource_AsyncBegin(
535 TracingBackendId backend_id,
536 DataSourceInstanceID instance_id) {
Primiano Tucci5944dd72019-05-21 23:56:17 +0100537 PERFETTO_DLOG("Stopping data source %" PRIu64, instance_id);
538 PERFETTO_DCHECK_THREAD(thread_checker_);
Primiano Tucci5944dd72019-05-21 23:56:17 +0100539
Primiano Tuccidd5ebc92019-07-25 01:09:37 +0100540 auto ds = FindDataSource(backend_id, instance_id);
541 if (!ds) {
542 PERFETTO_ELOG("Could not find data source to stop");
543 return;
Primiano Tucci5944dd72019-05-21 23:56:17 +0100544 }
Primiano Tuccidd5ebc92019-07-25 01:09:37 +0100545
546 StopArgsImpl stop_args{};
547 stop_args.async_stop_closure = [this, backend_id, instance_id] {
548 // TracingMuxerImpl is long lived, capturing |this| is okay.
549 // The notification closure can be moved out of the StopArgs by the
550 // embedder to handle stop asynchronously. The embedder might then
551 // call the closure on a different thread than the current one, hence
552 // this nested PostTask().
553 task_runner_->PostTask([this, backend_id, instance_id] {
554 StopDataSource_AsyncEnd(backend_id, instance_id);
555 });
556 };
557
558 {
Primiano Tucci0cb4a1a2019-08-08 11:28:18 +0200559 std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
Primiano Tuccidd5ebc92019-07-25 01:09:37 +0100560 ds.internal_state->data_source->OnStop(stop_args);
561 }
562
563 // If the embedder hasn't called StopArgs.HandleStopAsynchronously() run the
564 // async closure here. In theory we could avoid the PostTask and call
565 // straight into CompleteDataSourceAsyncStop(). We keep that to reduce
566 // divergencies between the deferred-stop vs non-deferred-stop code paths.
567 if (stop_args.async_stop_closure)
568 std::move(stop_args.async_stop_closure)();
569}
570
571void TracingMuxerImpl::StopDataSource_AsyncEnd(
572 TracingBackendId backend_id,
573 DataSourceInstanceID instance_id) {
574 PERFETTO_DLOG("Ending async stop of data source %" PRIu64, instance_id);
575 PERFETTO_DCHECK_THREAD(thread_checker_);
576
577 auto ds = FindDataSource(backend_id, instance_id);
578 if (!ds) {
579 PERFETTO_ELOG(
580 "Async stop of data source %" PRIu64
581 " failed. This might be due to calling the async_stop_closure twice.",
582 instance_id);
583 return;
584 }
585
586 const uint32_t mask = ~(1 << ds.instance_idx);
587 ds.static_state->valid_instances.fetch_and(mask, std::memory_order_acq_rel);
588
589 // Take the mutex to prevent that the data source is in the middle of
590 // a Trace() execution where it called GetDataSourceLocked() while we
591 // destroy it.
592 {
Primiano Tucci0cb4a1a2019-08-08 11:28:18 +0200593 std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
Primiano Tuccidd5ebc92019-07-25 01:09:37 +0100594 ds.internal_state->trace_lambda_enabled = false;
595 ds.internal_state->data_source.reset();
596 }
597
598 // The other fields of internal_state are deliberately *not* cleared.
599 // See races-related comments of DataSource::Trace().
600
601 TracingMuxer::generation_++;
602
603 // |backends_| is append-only, Backend instances are always valid.
604 PERFETTO_CHECK(backend_id < backends_.size());
605 ProducerImpl* producer = backends_[backend_id].producer.get();
606 if (producer && producer->connected_)
607 producer->service_->NotifyDataSourceStopped(instance_id);
Primiano Tucci5944dd72019-05-21 23:56:17 +0100608}
609
610void TracingMuxerImpl::DestroyStoppedTraceWritersForCurrentThread() {
611 // Iterate across all possible data source types.
612 auto cur_generation = generation_.load(std::memory_order_acquire);
613 auto* root_tls = GetOrCreateTracingTLS();
614 for (size_t ds_idx = 0; ds_idx < kMaxDataSources; ds_idx++) {
615 // |tls| has a vector of per-data-source-instance thread-local state.
616 DataSourceThreadLocalState& tls = root_tls->data_sources_tls[ds_idx];
617 DataSourceStaticState* static_state = tls.static_state;
618 if (!static_state)
619 continue; // Slot not used.
620
621 // Iterate across all possible instances for this data source.
622 for (uint32_t inst = 0; inst < kMaxDataSourceInstances; inst++) {
623 DataSourceInstanceThreadLocalState& ds_tls = tls.per_instance[inst];
624 if (!ds_tls.trace_writer)
625 continue;
626
627 DataSourceState* ds_state = static_state->TryGet(inst);
628 if (ds_state && ds_state->backend_id == ds_tls.backend_id &&
629 ds_state->buffer_id == ds_tls.buffer_id) {
630 continue;
631 }
632
633 // The DataSource instance has been destroyed or recycled.
634 ds_tls.Reset(); // Will also destroy the |ds_tls.trace_writer|.
635 }
636 }
637 root_tls->generation = cur_generation;
638}
639
640// Called both when a new data source is registered or when a new backend
641// connects. In both cases we want to be sure we reflected the data source
642// registrations on the backends.
643void TracingMuxerImpl::UpdateDataSourcesOnAllBackends() {
644 PERFETTO_DCHECK_THREAD(thread_checker_);
645 for (RegisteredDataSource& rds : data_sources_) {
646 for (RegisteredBackend& backend : backends_) {
647 // We cannot call RegisterDataSource on the backend before it connects.
648 if (!backend.producer->connected_)
649 continue;
650
651 PERFETTO_DCHECK(rds.static_state->index < kMaxDataSourceInstances);
652 if (backend.producer->registered_data_sources_.test(
653 rds.static_state->index))
654 continue;
655
656 rds.descriptor.set_will_notify_on_start(true);
657 rds.descriptor.set_will_notify_on_stop(true);
658 backend.producer->service_->RegisterDataSource(rds.descriptor);
659 backend.producer->registered_data_sources_.set(rds.static_state->index);
660 }
661 }
662}
663
664void TracingMuxerImpl::SetupTracingSession(
665 TracingSessionGlobalID session_id,
Sami Kyostilac09ca292019-07-26 19:10:15 +0100666 const std::shared_ptr<TraceConfig>& trace_config,
667 base::ScopedFile trace_fd) {
Primiano Tucci5944dd72019-05-21 23:56:17 +0100668 PERFETTO_DCHECK_THREAD(thread_checker_);
Sami Kyostilac09ca292019-07-26 19:10:15 +0100669 PERFETTO_CHECK(!trace_fd || trace_config->write_into_file());
Primiano Tucci5944dd72019-05-21 23:56:17 +0100670
671 auto* consumer = FindConsumer(session_id);
Primiano Tucci5944dd72019-05-21 23:56:17 +0100672 if (!consumer)
673 return;
674
675 consumer->trace_config_ = trace_config;
Sami Kyostilac09ca292019-07-26 19:10:15 +0100676 if (trace_fd)
677 consumer->trace_fd_ = std::move(trace_fd);
Primiano Tucci5944dd72019-05-21 23:56:17 +0100678
679 if (!consumer->connected_)
680 return;
681
682 // Only used in the deferred start mode.
Sami Kyostilac09ca292019-07-26 19:10:15 +0100683 if (trace_config->deferred_start()) {
684 consumer->service_->EnableTracing(*trace_config,
685 std::move(consumer->trace_fd_));
686 }
Primiano Tucci5944dd72019-05-21 23:56:17 +0100687}
688
689void TracingMuxerImpl::StartTracingSession(TracingSessionGlobalID session_id) {
690 PERFETTO_DCHECK_THREAD(thread_checker_);
691
692 auto* consumer = FindConsumer(session_id);
693
694 if (!consumer)
695 return;
696
697 if (!consumer->trace_config_) {
698 PERFETTO_ELOG("Must call Setup(config) first");
699 return;
700 }
701
702 if (!consumer->connected_) {
703 consumer->start_pending_ = true;
704 return;
705 }
706
Sami Kyostila23bde0d2019-08-02 11:53:37 +0100707 consumer->start_pending_ = false;
Primiano Tucci5944dd72019-05-21 23:56:17 +0100708 if (consumer->trace_config_->deferred_start()) {
709 consumer->service_->StartTracing();
710 } else {
Sami Kyostilac09ca292019-07-26 19:10:15 +0100711 consumer->service_->EnableTracing(*consumer->trace_config_,
712 std::move(consumer->trace_fd_));
Primiano Tucci5944dd72019-05-21 23:56:17 +0100713 }
714
715 // TODO implement support for the deferred-start + fast-triggering case.
716}
717
718void TracingMuxerImpl::StopTracingSession(TracingSessionGlobalID session_id) {
719 PERFETTO_DCHECK_THREAD(thread_checker_);
720 auto* consumer = FindConsumer(session_id);
721 if (!consumer)
722 return;
723
724 if (!consumer->trace_config_) {
725 PERFETTO_ELOG("Must call Setup(config) and Start() first");
726 return;
727 }
728
Sami Kyostila23bde0d2019-08-02 11:53:37 +0100729 if (consumer->start_pending_) {
730 // If the session hasn't started yet, wait until it does before stopping.
731 consumer->stop_pending_ = true;
732 return;
733 }
734
735 consumer->stop_pending_ = false;
Sami Kyostila389861f2019-07-09 20:52:43 +0100736 if (consumer->stopped_) {
Sami Kyostila23bde0d2019-08-02 11:53:37 +0100737 // If the session was already stopped (e.g., it failed to start), don't try
738 // stopping again.
739 consumer->NotifyStopComplete();
Sami Kyostila389861f2019-07-09 20:52:43 +0100740 } else {
741 consumer->service_->DisableTracing();
742 }
743
Primiano Tucci5944dd72019-05-21 23:56:17 +0100744 consumer->trace_config_.reset();
745}
746
747void TracingMuxerImpl::DestroyTracingSession(
748 TracingSessionGlobalID session_id) {
749 PERFETTO_DCHECK_THREAD(thread_checker_);
750 for (RegisteredBackend& backend : backends_) {
751 auto pred = [session_id](const std::unique_ptr<ConsumerImpl>& consumer) {
752 return consumer->session_id_ == session_id;
753 };
Greg Kaiser64dbd4f2019-05-23 07:17:35 -0700754 backend.consumers.erase(std::remove_if(backend.consumers.begin(),
755 backend.consumers.end(), pred),
756 backend.consumers.end());
Primiano Tucci5944dd72019-05-21 23:56:17 +0100757 }
758}
759
760void TracingMuxerImpl::ReadTracingSessionData(
761 TracingSessionGlobalID session_id,
762 std::function<void(TracingSession::ReadTraceCallbackArgs)> callback) {
763 PERFETTO_DCHECK_THREAD(thread_checker_);
764 auto* consumer = FindConsumer(session_id);
765 if (!consumer)
766 return;
767 PERFETTO_DCHECK(!consumer->read_trace_callback_);
768 consumer->read_trace_callback_ = std::move(callback);
769 consumer->service_->ReadBuffers();
770}
771
772TracingMuxerImpl::ConsumerImpl* TracingMuxerImpl::FindConsumer(
773 TracingSessionGlobalID session_id) {
774 PERFETTO_DCHECK_THREAD(thread_checker_);
775 for (RegisteredBackend& backend : backends_) {
776 for (auto& consumer : backend.consumers) {
777 if (consumer->session_id_ == session_id)
778 return consumer.get();
779 }
780 }
781 return nullptr;
782}
783
Primiano Tuccidd5ebc92019-07-25 01:09:37 +0100784TracingMuxerImpl::FindDataSourceRes TracingMuxerImpl::FindDataSource(
785 TracingBackendId backend_id,
786 DataSourceInstanceID instance_id) {
787 PERFETTO_DCHECK_THREAD(thread_checker_);
788 for (const auto& rds : data_sources_) {
789 DataSourceStaticState* static_state = rds.static_state;
790 for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
791 auto* internal_state = static_state->TryGet(i);
792 if (internal_state && internal_state->backend_id == backend_id &&
793 internal_state->data_source_instance_id == instance_id) {
794 return FindDataSourceRes(static_state, internal_state, i);
795 }
796 }
797 }
798 return FindDataSourceRes();
799}
800
Primiano Tucci5944dd72019-05-21 23:56:17 +0100801// Can be called from any thread.
802std::unique_ptr<TraceWriterBase> TracingMuxerImpl::CreateTraceWriter(
Florian Mayer974a8b82019-10-03 10:04:27 +0100803 DataSourceState* data_source,
804 BufferExhaustedPolicy buffer_exhausted_policy) {
Primiano Tucci5944dd72019-05-21 23:56:17 +0100805 ProducerImpl* producer = backends_[data_source->backend_id].producer.get();
Eric Seckler3d99b0f2019-08-09 15:15:19 +0100806 return producer->service_->CreateTraceWriter(data_source->buffer_id,
Florian Mayer974a8b82019-10-03 10:04:27 +0100807 buffer_exhausted_policy);
Primiano Tucci5944dd72019-05-21 23:56:17 +0100808}
809
810// This is called via the public API Tracing::NewTrace().
811// Can be called from any thread.
812std::unique_ptr<TracingSession> TracingMuxerImpl::CreateTracingSession(
813 BackendType backend_type) {
814 TracingSessionGlobalID session_id = ++next_tracing_session_id_;
815
816 // |backend_type| can only specify one backend, not an OR-ed mask.
817 PERFETTO_CHECK((backend_type & (backend_type - 1)) == 0);
818
819 // Capturing |this| is fine because the TracingMuxer is a leaky singleton.
820 task_runner_->PostTask([this, backend_type, session_id] {
821 for (RegisteredBackend& backend : backends_) {
822 if (backend_type && backend.type != backend_type)
823 continue;
824
825 backend.consumers.emplace_back(
826 new ConsumerImpl(this, backend.id, session_id));
827 auto& consumer = backend.consumers.back();
828 TracingBackend::ConnectConsumerArgs conn_args;
829 conn_args.consumer = consumer.get();
830 conn_args.task_runner = task_runner_.get();
831 consumer->Initialize(backend.backend->ConnectConsumer(conn_args));
832 return;
833 }
834 PERFETTO_ELOG(
835 "Cannot create tracing session, no tracing backend ready for type=%d",
836 backend_type);
837 });
838
839 return std::unique_ptr<TracingSession>(
840 new TracingSessionImpl(this, session_id));
841}
842
843void TracingMuxerImpl::InitializeInstance(const TracingInitArgs& args) {
844 if (instance_)
845 PERFETTO_FATAL("Tracing already initialized");
846 instance_ = new TracingMuxerImpl(args);
847}
848
849TracingMuxer::~TracingMuxer() = default;
850
851static_assert(std::is_same<internal::BufferId, BufferID>::value,
852 "public's BufferId and tracing/core's BufferID diverged");
853
854} // namespace internal
855} // namespace perfetto