Implement clock sync in multi-machine tracing
This revision implements the RelayPort exposed on the producer socket
for synchronization of clocks in multi-machine tracing:
* The RelayPort defines the IPC methods between the tracing service and
the relay service for operations that are specific to multi-machine
tracing (clock synchronization).
* ServiceIPCHostImpl exposes a second IPC service that implements
RelayPort to producer sockets (perfetto::RelayIPCService).
* The relay service connects to the RelayPort using the IPC client
implementation (perfetto::RelayIPCClient). On connected, the relay
service invokes the SyncClock() rpc methods with the tracing service to
capture clock snapshots on the host and client.
* In a tracing session, the tracing service emits the ClockSnapshots
packet for the remote machine and also the ClockSync packet for the
clock offsets of supported clocks.
Bug: 284258446
Test: unit tests and integration tests.
Change-Id: I95015c1e0bc8bec7b82b0fdac897900e0de923b7
diff --git a/Android.bp b/Android.bp
index f4559d8..aeecbbb 100644
--- a/Android.bp
+++ b/Android.bp
@@ -2419,6 +2419,7 @@
":perfetto_src_tracing_ipc_consumer_consumer",
":perfetto_src_tracing_ipc_default_socket",
":perfetto_src_tracing_ipc_producer_producer",
+ ":perfetto_src_tracing_ipc_producer_relay",
":perfetto_src_tracing_ipc_service_service",
":perfetto_src_tracing_platform_impl",
":perfetto_src_tracing_service_service",
@@ -4920,6 +4921,7 @@
srcs: [
"protos/perfetto/ipc/consumer_port.proto",
"protos/perfetto/ipc/producer_port.proto",
+ "protos/perfetto/ipc/relay_port.proto",
],
}
@@ -4951,6 +4953,7 @@
out: [
"external/perfetto/protos/perfetto/ipc/consumer_port.gen.cc",
"external/perfetto/protos/perfetto/ipc/producer_port.gen.cc",
+ "external/perfetto/protos/perfetto/ipc/relay_port.gen.cc",
],
}
@@ -4982,6 +4985,7 @@
out: [
"external/perfetto/protos/perfetto/ipc/consumer_port.gen.h",
"external/perfetto/protos/perfetto/ipc/producer_port.gen.h",
+ "external/perfetto/protos/perfetto/ipc/relay_port.gen.h",
],
export_include_dirs: [
".",
@@ -4995,6 +4999,7 @@
srcs: [
"protos/perfetto/ipc/consumer_port.proto",
"protos/perfetto/ipc/producer_port.proto",
+ "protos/perfetto/ipc/relay_port.proto",
],
}
@@ -5028,6 +5033,7 @@
out: [
"external/perfetto/protos/perfetto/ipc/consumer_port.ipc.cc",
"external/perfetto/protos/perfetto/ipc/producer_port.ipc.cc",
+ "external/perfetto/protos/perfetto/ipc/relay_port.ipc.cc",
],
}
@@ -5061,6 +5067,7 @@
out: [
"external/perfetto/protos/perfetto/ipc/consumer_port.ipc.h",
"external/perfetto/protos/perfetto/ipc/producer_port.ipc.h",
+ "external/perfetto/protos/perfetto/ipc/relay_port.ipc.h",
],
export_include_dirs: [
".",
@@ -5969,6 +5976,7 @@
"protos/perfetto/trace/profiling/smaps.proto",
"protos/perfetto/trace/ps/process_stats.proto",
"protos/perfetto/trace/ps/process_tree.proto",
+ "protos/perfetto/trace/remote_clock_sync.proto",
"protos/perfetto/trace/statsd/statsd_atom.proto",
"protos/perfetto/trace/sys_stats/sys_stats.proto",
"protos/perfetto/trace/system_info.proto",
@@ -7606,6 +7614,7 @@
srcs: [
"protos/perfetto/trace/extension_descriptor.proto",
"protos/perfetto/trace/memory_graph.proto",
+ "protos/perfetto/trace/remote_clock_sync.proto",
"protos/perfetto/trace/test_event.proto",
"protos/perfetto/trace/test_extensions.proto",
"protos/perfetto/trace/trace.proto",
@@ -7660,6 +7669,7 @@
out: [
"external/perfetto/protos/perfetto/trace/extension_descriptor.gen.cc",
"external/perfetto/protos/perfetto/trace/memory_graph.gen.cc",
+ "external/perfetto/protos/perfetto/trace/remote_clock_sync.gen.cc",
"external/perfetto/protos/perfetto/trace/test_event.gen.cc",
"external/perfetto/protos/perfetto/trace/test_extensions.gen.cc",
"external/perfetto/protos/perfetto/trace/trace.gen.cc",
@@ -7714,6 +7724,7 @@
out: [
"external/perfetto/protos/perfetto/trace/extension_descriptor.gen.h",
"external/perfetto/protos/perfetto/trace/memory_graph.gen.h",
+ "external/perfetto/protos/perfetto/trace/remote_clock_sync.gen.h",
"external/perfetto/protos/perfetto/trace/test_event.gen.h",
"external/perfetto/protos/perfetto/trace/test_extensions.gen.h",
"external/perfetto/protos/perfetto/trace/trace.gen.h",
@@ -7733,6 +7744,7 @@
srcs: [
"protos/perfetto/trace/extension_descriptor.proto",
"protos/perfetto/trace/memory_graph.proto",
+ "protos/perfetto/trace/remote_clock_sync.proto",
"protos/perfetto/trace/test_event.proto",
"protos/perfetto/trace/test_extensions.proto",
"protos/perfetto/trace/trace.proto",
@@ -7786,6 +7798,7 @@
out: [
"external/perfetto/protos/perfetto/trace/extension_descriptor.pb.cc",
"external/perfetto/protos/perfetto/trace/memory_graph.pb.cc",
+ "external/perfetto/protos/perfetto/trace/remote_clock_sync.pb.cc",
"external/perfetto/protos/perfetto/trace/test_event.pb.cc",
"external/perfetto/protos/perfetto/trace/test_extensions.pb.cc",
"external/perfetto/protos/perfetto/trace/trace.pb.cc",
@@ -7839,6 +7852,7 @@
out: [
"external/perfetto/protos/perfetto/trace/extension_descriptor.pb.h",
"external/perfetto/protos/perfetto/trace/memory_graph.pb.h",
+ "external/perfetto/protos/perfetto/trace/remote_clock_sync.pb.h",
"external/perfetto/protos/perfetto/trace/test_event.pb.h",
"external/perfetto/protos/perfetto/trace/test_extensions.pb.h",
"external/perfetto/protos/perfetto/trace/trace.pb.h",
@@ -7858,6 +7872,7 @@
srcs: [
"protos/perfetto/trace/extension_descriptor.proto",
"protos/perfetto/trace/memory_graph.proto",
+ "protos/perfetto/trace/remote_clock_sync.proto",
"protos/perfetto/trace/test_event.proto",
"protos/perfetto/trace/test_extensions.proto",
"protos/perfetto/trace/trace.proto",
@@ -7912,6 +7927,7 @@
out: [
"external/perfetto/protos/perfetto/trace/extension_descriptor.pbzero.cc",
"external/perfetto/protos/perfetto/trace/memory_graph.pbzero.cc",
+ "external/perfetto/protos/perfetto/trace/remote_clock_sync.pbzero.cc",
"external/perfetto/protos/perfetto/trace/test_event.pbzero.cc",
"external/perfetto/protos/perfetto/trace/test_extensions.pbzero.cc",
"external/perfetto/protos/perfetto/trace/trace.pbzero.cc",
@@ -7966,6 +7982,7 @@
out: [
"external/perfetto/protos/perfetto/trace/extension_descriptor.pbzero.h",
"external/perfetto/protos/perfetto/trace/memory_graph.pbzero.h",
+ "external/perfetto/protos/perfetto/trace/remote_clock_sync.pbzero.h",
"external/perfetto/protos/perfetto/trace/test_event.pbzero.h",
"external/perfetto/protos/perfetto/trace/test_extensions.pbzero.h",
"external/perfetto/protos/perfetto/trace/trace.pbzero.h",
@@ -13037,6 +13054,7 @@
filegroup {
name: "perfetto_src_tracing_core_core",
srcs: [
+ "src/tracing/core/clock_snapshots.cc",
"src/tracing/core/id_allocator.cc",
"src/tracing/core/in_process_shared_memory.cc",
"src/tracing/core/null_trace_writer.cc",
@@ -13120,12 +13138,21 @@
],
}
+// GN: //src/tracing/ipc/producer:relay
+filegroup {
+ name: "perfetto_src_tracing_ipc_producer_relay",
+ srcs: [
+ "src/tracing/ipc/producer/relay_ipc_client.cc",
+ ],
+}
+
// GN: //src/tracing/ipc/service:service
filegroup {
name: "perfetto_src_tracing_ipc_service_service",
srcs: [
"src/tracing/ipc/service/consumer_ipc_service.cc",
"src/tracing/ipc/service/producer_ipc_service.cc",
+ "src/tracing/ipc/service/relay_ipc_service.cc",
"src/tracing/ipc/service/service_ipc_host_impl.cc",
],
}
@@ -13423,6 +13450,7 @@
"protos/perfetto/trace/profiling/smaps.proto",
"protos/perfetto/trace/ps/process_stats.proto",
"protos/perfetto/trace/ps/process_tree.proto",
+ "protos/perfetto/trace/remote_clock_sync.proto",
"protos/perfetto/trace/statsd/statsd_atom.proto",
"protos/perfetto/trace/sys_stats/sys_stats.proto",
"protos/perfetto/trace/system_info.proto",
@@ -13952,6 +13980,7 @@
":perfetto_src_tracing_ipc_consumer_consumer",
":perfetto_src_tracing_ipc_default_socket",
":perfetto_src_tracing_ipc_producer_producer",
+ ":perfetto_src_tracing_ipc_producer_relay",
":perfetto_src_tracing_ipc_service_service",
":perfetto_src_tracing_ipc_unittests",
":perfetto_src_tracing_platform_impl",
diff --git a/BUILD b/BUILD
index 09790fd..75f048c 100644
--- a/BUILD
+++ b/BUILD
@@ -886,6 +886,7 @@
name = "include_perfetto_tracing_core_core",
srcs = [
"include/perfetto/tracing/core/chrome_config.h",
+ "include/perfetto/tracing/core/clock_snapshots.h",
"include/perfetto/tracing/core/data_source_config.h",
"include/perfetto/tracing/core/data_source_descriptor.h",
"include/perfetto/tracing/core/flush_flags.h",
@@ -3190,6 +3191,7 @@
perfetto_filegroup(
name = "src_tracing_core_core",
srcs = [
+ "src/tracing/core/clock_snapshots.cc",
"src/tracing/core/id_allocator.cc",
"src/tracing/core/id_allocator.h",
"src/tracing/core/in_process_shared_memory.cc",
@@ -3233,6 +3235,8 @@
"src/tracing/ipc/service/consumer_ipc_service.h",
"src/tracing/ipc/service/producer_ipc_service.cc",
"src/tracing/ipc/service/producer_ipc_service.h",
+ "src/tracing/ipc/service/relay_ipc_service.cc",
+ "src/tracing/ipc/service/relay_ipc_service.h",
"src/tracing/ipc/service/service_ipc_host_impl.cc",
"src/tracing/ipc/service/service_ipc_host_impl.h",
],
@@ -4229,6 +4233,7 @@
srcs = [
"protos/perfetto/ipc/consumer_port.proto",
"protos/perfetto/ipc/producer_port.proto",
+ "protos/perfetto/ipc/relay_port.proto",
],
visibility = [
PERFETTO_CONFIG.proto_library_visibility,
@@ -4783,6 +4788,7 @@
srcs = [
"protos/perfetto/trace/extension_descriptor.proto",
"protos/perfetto/trace/memory_graph.proto",
+ "protos/perfetto/trace/remote_clock_sync.proto",
"protos/perfetto/trace/test_event.proto",
"protos/perfetto/trace/test_extensions.proto",
"protos/perfetto/trace/trace.proto",
diff --git a/include/perfetto/ext/tracing/core/tracing_service.h b/include/perfetto/ext/tracing/core/tracing_service.h
index 2fa846c..2b33fee 100644
--- a/include/perfetto/ext/tracing/core/tracing_service.h
+++ b/include/perfetto/ext/tracing/core/tracing_service.h
@@ -30,6 +30,7 @@
#include "perfetto/ext/tracing/core/shared_memory.h"
#include "perfetto/ext/tracing/core/trace_packet.h"
#include "perfetto/tracing/buffer_exhausted_policy.h"
+#include "perfetto/tracing/core/clock_snapshots.h"
#include "perfetto/tracing/core/flush_flags.h"
#include "perfetto/tracing/core/forward_decls.h"
@@ -283,6 +284,29 @@
// compressed ones.
using CompressorFn = void (*)(std::vector<TracePacket>*);
CompressorFn compressor_fn = nullptr;
+
+ // Whether the relay endpoint is enabled on producer transport(s).
+ bool enable_relay_endpoint = false;
+};
+
+// The API for the Relay port of the Service. Subclassed by the
+// tracing_service_impl.cc business logic when returning it in response to the
+// ConnectRelayClient() method.
+class PERFETTO_EXPORT_COMPONENT RelayEndpoint {
+ public:
+ virtual ~RelayEndpoint();
+
+ // A snapshot of client and host clocks.
+ struct SyncClockSnapshot {
+ ClockSnapshotVector client_clock_snapshots;
+ ClockSnapshotVector host_clock_snapshots;
+ };
+
+ enum class SyncMode : uint32_t { PING = 1, UPDATE = 2 };
+ virtual void SyncClocks(SyncMode sync_mode,
+ ClockSnapshotVector client_clocks,
+ ClockSnapshotVector host_clocks) = 0;
+ virtual void Disconnect() = 0;
};
// The public API of the tracing Service business logic.
@@ -299,6 +323,7 @@
public:
using ProducerEndpoint = perfetto::ProducerEndpoint;
using ConsumerEndpoint = perfetto::ConsumerEndpoint;
+ using RelayEndpoint = perfetto::RelayEndpoint;
using InitOpts = TracingServiceInitOpts;
// Default sizes used by the service implementation and client library.
@@ -398,6 +423,18 @@
//
// This feature is currently used by Chrome.
virtual void SetSMBScrapingEnabled(bool enabled) = 0;
+
+ using RelayClientID = std::pair<base::MachineID, /*client ID*/ uint64_t>;
+ // Connects a remote RelayClient instance and obtains a RelayEndpoint, which
+ // is a 1:1 channel between one RelayClient and the Service. To disconnect
+ // just call Disconnect() of the RelayEndpoint instance. The relay client is
+ // connected using an identifier of MachineID and client ID. The service
+ // doesn't hold an object that represents the client because the relay port
+ // only has a client-to-host SyncClock() method.
+ //
+ // TODO(chinglinyu): connect the relay client using a RelayClient* object when
+ // we need host-to-client RPC method.
+ virtual std::unique_ptr<RelayEndpoint> ConnectRelayClient(RelayClientID) = 0;
};
} // namespace perfetto
diff --git a/include/perfetto/tracing/core/BUILD.gn b/include/perfetto/tracing/core/BUILD.gn
index 9ea0e2c..63c77ca 100644
--- a/include/perfetto/tracing/core/BUILD.gn
+++ b/include/perfetto/tracing/core/BUILD.gn
@@ -20,6 +20,7 @@
]
sources = [
"chrome_config.h",
+ "clock_snapshots.h",
"data_source_config.h",
"data_source_descriptor.h",
"flush_flags.h",
diff --git a/include/perfetto/tracing/core/clock_snapshots.h b/include/perfetto/tracing/core/clock_snapshots.h
new file mode 100644
index 0000000..49d4c67
--- /dev/null
+++ b/include/perfetto/tracing/core/clock_snapshots.h
@@ -0,0 +1,42 @@
+/*
+ * Copyright (C) 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INCLUDE_PERFETTO_TRACING_CORE_CLOCK_SNAPSHOTS_H_
+#define INCLUDE_PERFETTO_TRACING_CORE_CLOCK_SNAPSHOTS_H_
+
+#include <cstdint>
+#include <vector>
+
+namespace perfetto {
+struct ClockReading {
+ ClockReading(uint32_t _clock_id, uint64_t _timestamp)
+ : clock_id(_clock_id), timestamp(_timestamp) {}
+ ClockReading() = default;
+
+ // Identifier of the clock domain (of type protos::pbzero::BuiltinClock).
+ uint32_t clock_id = 0;
+ // Clock reading as uint64_t.
+ uint64_t timestamp = 0;
+};
+
+using ClockSnapshotVector = std::vector<ClockReading>;
+
+// Takes snapshots of clock readings of all supported built-in clocks.
+ClockSnapshotVector CaptureClockSnapshots();
+
+} // namespace perfetto
+
+#endif // INCLUDE_PERFETTO_TRACING_CORE_CLOCK_SNAPSHOTS_H_
diff --git a/include/perfetto/tracing/core/forward_decls.h b/include/perfetto/tracing/core/forward_decls.h
index 8d9ceb2..e3fc5f2 100644
--- a/include/perfetto/tracing/core/forward_decls.h
+++ b/include/perfetto/tracing/core/forward_decls.h
@@ -59,6 +59,8 @@
class TraceStats;
class TracingServiceCapabilities;
class TracingServiceState;
+class SyncClockRequest;
+class SyncClockResponse;
} // namespace gen
} // namespace protos
@@ -73,6 +75,8 @@
using TracingServiceCapabilities =
::perfetto::protos::gen::TracingServiceCapabilities;
using TracingServiceState = ::perfetto::protos::gen::TracingServiceState;
+using SyncClockRequest = ::perfetto::protos::gen::SyncClockRequest;
+using SyncClockResponse = ::perfetto::protos::gen::SyncClockResponse;
} // namespace perfetto
diff --git a/protos/perfetto/ipc/BUILD.gn b/protos/perfetto/ipc/BUILD.gn
index fb9bfcf..6d7eccb 100644
--- a/protos/perfetto/ipc/BUILD.gn
+++ b/protos/perfetto/ipc/BUILD.gn
@@ -26,6 +26,7 @@
sources = [
"consumer_port.proto",
"producer_port.proto",
+ "relay_port.proto",
]
deps = [
# Deliberately :cpp and not :@TYPE@. In both cases (:gen and :cpp target)
diff --git a/protos/perfetto/ipc/relay_port.proto b/protos/perfetto/ipc/relay_port.proto
new file mode 100644
index 0000000..2b9c014
--- /dev/null
+++ b/protos/perfetto/ipc/relay_port.proto
@@ -0,0 +1,58 @@
+/*
+ * Copyright (C) 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto2";
+
+package perfetto.protos;
+
+// IPC interface definition for serving the requests from the relay service.
+service RelayPort {
+ // Synchronize the clocks with a guest. This is used for multi-machine tracing
+ // with a VM guest or a remote machine. The client may make consecutive calls
+ // of this method to get better results.
+ rpc SyncClock(SyncClockRequest) returns (SyncClockResponse) {}
+}
+
+// For the client to send its clock readings to the host.
+message SyncClockRequest {
+ // Relay service synchronizes its clocks with the host using round-trip
+ // messages of clock snapshots on both sides for an estimation of clock
+ // offsets of the built-in clocks.
+ enum Phase {
+ // Clock synchronization starts with the client (relay service) sends its
+ // clock snapshots in the PING phase. The host also snapshots its clocks on
+ // receiving the request and acks the client.
+ PING = 1;
+ // Clock synchronization completes with the client sending the second clock
+ // snapshot request after the round-trip of the PING request. The host
+ // estimates the clock offsets using 2 consecutive clock snapshots on both
+ // sides.
+ UPDATE = 2;
+ };
+
+ message Clock {
+ // The clock ID enumerated in builtin_clocks.proto.
+ optional uint32 clock_id = 1;
+ // The clock reading value (e.g. in nanoseconds for BUILTIN_CLOCK_BOOTTIME).
+ optional uint64 timestamp = 2;
+ }
+
+ optional Phase phase = 1;
+ repeated Clock clocks = 2;
+}
+
+// The host doesn't send any information back to the client.
+message SyncClockResponse {}
diff --git a/protos/perfetto/trace/BUILD.gn b/protos/perfetto/trace/BUILD.gn
index b131e7d..629b843 100644
--- a/protos/perfetto/trace/BUILD.gn
+++ b/protos/perfetto/trace/BUILD.gn
@@ -27,6 +27,7 @@
# By default add new protos here.
proto_sources_non_minimal = [
+ "remote_clock_sync.proto",
"trace_packet_defaults.proto",
"test_event.proto",
"test_extensions.proto",
diff --git a/protos/perfetto/trace/perfetto_trace.proto b/protos/perfetto/trace/perfetto_trace.proto
index dde9fdb..3654a9f 100644
--- a/protos/perfetto/trace/perfetto_trace.proto
+++ b/protos/perfetto/trace/perfetto_trace.proto
@@ -13535,6 +13535,24 @@
// End of protos/perfetto/trace/ps/process_tree.proto
+// Begin of protos/perfetto/trace/remote_clock_sync.proto
+
+// Records the parameters for aligning clock readings between machines.
+message RemoteClockSync {
+ // Synchronized clock snapshots taken on both sides of the relay port (the
+ // tracing service and the relay service). A round of clock synchronization
+ // IPC takes emits 2 SyncedClocks messages, i.e., client snapshot -> host
+ // snapshot -> client snapshot -> host snapshot.
+ message SyncedClocks {
+ optional ClockSnapshot client_clocks = 2;
+ optional ClockSnapshot host_clocks = 3;
+ }
+
+ repeated SyncedClocks synced_clocks = 1;
+}
+
+// End of protos/perfetto/trace/remote_clock_sync.proto
+
// Begin of protos/perfetto/trace/statsd/statsd_atom.proto
// Deliberate empty message. See comment on StatsdAtom#atom below.
@@ -14367,7 +14385,7 @@
// See the [Buffers and Dataflow](/docs/concepts/buffers.md) doc for details.
//
// Next reserved id: 14 (up to 15).
-// Next id: 107.
+// Next id: 108.
message TracePacket {
// The timestamp of the TracePacket.
// By default this timestamps refers to the trace clock (CLOCK_BOOTTIME on
@@ -14503,6 +14521,9 @@
// InputFlinger traces
AndroidInputEvent android_input_event = 106;
+ // Clock synchronization with remote machines.
+ RemoteClockSync remote_clock_sync = 107;
+
// This field is only used for testing.
// In previous versions of this proto this field had the id 268435455
// This caused many problems:
diff --git a/protos/perfetto/trace/remote_clock_sync.proto b/protos/perfetto/trace/remote_clock_sync.proto
new file mode 100644
index 0000000..a9d078c
--- /dev/null
+++ b/protos/perfetto/trace/remote_clock_sync.proto
@@ -0,0 +1,35 @@
+/*
+ * Copyright (C) 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto2";
+
+import "protos/perfetto/trace/clock_snapshot.proto";
+
+package perfetto.protos;
+
+// Records the parameters for aligning clock readings between machines.
+message RemoteClockSync {
+ // Synchronized clock snapshots taken on both sides of the relay port (the
+ // tracing service and the relay service). A round of clock synchronization
+ // IPC takes emits 2 SyncedClocks messages, i.e., client snapshot -> host
+ // snapshot -> client snapshot -> host snapshot.
+ message SyncedClocks {
+ optional ClockSnapshot client_clocks = 2;
+ optional ClockSnapshot host_clocks = 3;
+ }
+
+ repeated SyncedClocks synced_clocks = 1;
+}
diff --git a/protos/perfetto/trace/trace_packet.proto b/protos/perfetto/trace/trace_packet.proto
index 8a03395..2547824 100644
--- a/protos/perfetto/trace/trace_packet.proto
+++ b/protos/perfetto/trace/trace_packet.proto
@@ -64,6 +64,7 @@
import "protos/perfetto/trace/profiling/smaps.proto";
import "protos/perfetto/trace/ps/process_stats.proto";
import "protos/perfetto/trace/ps/process_tree.proto";
+import "protos/perfetto/trace/remote_clock_sync.proto";
import "protos/perfetto/trace/sys_stats/sys_stats.proto";
import "protos/perfetto/trace/system_info.proto";
import "protos/perfetto/trace/system_info/cpu_info.proto";
@@ -101,7 +102,7 @@
// See the [Buffers and Dataflow](/docs/concepts/buffers.md) doc for details.
//
// Next reserved id: 14 (up to 15).
-// Next id: 107.
+// Next id: 108.
message TracePacket {
// The timestamp of the TracePacket.
// By default this timestamps refers to the trace clock (CLOCK_BOOTTIME on
@@ -237,6 +238,9 @@
// InputFlinger traces
AndroidInputEvent android_input_event = 106;
+ // Clock synchronization with remote machines.
+ RemoteClockSync remote_clock_sync = 107;
+
// This field is only used for testing.
// In previous versions of this proto this field had the id 268435455
// This caused many problems:
diff --git a/src/traced/service/service.cc b/src/traced/service/service.cc
index 25afe60..745fac5 100644
--- a/src/traced/service/service.cc
+++ b/src/traced/service/service.cc
@@ -26,6 +26,7 @@
#include "perfetto/ext/base/version.h"
#include "perfetto/ext/base/watchdog.h"
#include "perfetto/ext/traced/traced.h"
+#include "perfetto/ext/tracing/core/tracing_service.h"
#include "perfetto/ext/tracing/ipc/service_ipc_host.h"
#include "perfetto/tracing/default_socket.h"
#include "src/traced/service/builtin_producer.h"
@@ -53,6 +54,9 @@
<prod_mode> is the mode bits (e.g. 0660) for chmod the produce socket,
<cons_group> is the group name for chgrp the consumer socket, and
<cons_mode> is the mode bits (e.g. 0660) for chmod the consumer socket.
+ --enable-relay-endpoint : enables the relay endpoint on producer socket(s)
+ for traced_relay to communicate with traced in a multiple-machine
+ tracing session.
Example:
%s --set-socket-permissions traced-producer:0660:traced-consumer:0660
@@ -69,15 +73,19 @@
OPT_VERSION = 1000,
OPT_SET_SOCKET_PERMISSIONS = 1001,
OPT_BACKGROUND,
+ OPT_ENABLE_RELAY_ENDPOINT
};
bool background = false;
+ bool enable_relay_endpoint = false;
static const option long_options[] = {
{"background", no_argument, nullptr, OPT_BACKGROUND},
{"version", no_argument, nullptr, OPT_VERSION},
{"set-socket-permissions", required_argument, nullptr,
OPT_SET_SOCKET_PERMISSIONS},
+ {"enable-relay-endpoint", no_argument, nullptr,
+ OPT_ENABLE_RELAY_ENDPOINT},
{nullptr, 0, nullptr, 0}};
std::string producer_socket_group, consumer_socket_group,
@@ -107,6 +115,9 @@
consumer_socket_mode = parts[3];
break;
}
+ case OPT_ENABLE_RELAY_ENDPOINT:
+ enable_relay_endpoint = true;
+ break;
default:
PrintUsage(argv[0]);
return 1;
@@ -123,6 +134,8 @@
#if PERFETTO_BUILDFLAG(PERFETTO_ZLIB)
init_opts.compressor_fn = &ZlibCompressFn;
#endif
+ if (enable_relay_endpoint)
+ init_opts.enable_relay_endpoint = true;
svc = ServiceIPCHost::CreateInstance(&task_runner, init_opts);
// When built as part of the Android tree, the two socket are created and
diff --git a/src/traced_relay/BUILD.gn b/src/traced_relay/BUILD.gn
index 9acf970..6799e34 100644
--- a/src/traced_relay/BUILD.gn
+++ b/src/traced_relay/BUILD.gn
@@ -44,6 +44,7 @@
"../../protos/perfetto/ipc:wire_protocol_cpp",
"../base",
"../ipc:perfetto_ipc",
+ "../tracing/ipc/producer:relay", # For relay_ipc_client.h
]
}
diff --git a/src/traced_relay/relay_service.cc b/src/traced_relay/relay_service.cc
index 1a67439..f8bd4a0 100644
--- a/src/traced_relay/relay_service.cc
+++ b/src/traced_relay/relay_service.cc
@@ -16,6 +16,7 @@
#include "src/traced_relay/relay_service.h"
+#include <functional>
#include <memory>
#include "perfetto/base/build_config.h"
@@ -26,9 +27,13 @@
#include "perfetto/ext/base/string_utils.h"
#include "perfetto/ext/base/unix_socket.h"
#include "perfetto/ext/base/utils.h"
+#include "perfetto/ext/ipc/client.h"
+#include "perfetto/tracing/core/clock_snapshots.h"
+#include "perfetto/tracing/core/forward_decls.h"
#include "protos/perfetto/ipc/wire_protocol.gen.h"
#include "src/ipc/buffered_frame_deserializer.h"
#include "src/traced_relay/socket_relay_handler.h"
+#include "src/tracing/ipc/producer/relay_ipc_client.h"
#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) || \
PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
@@ -43,6 +48,137 @@
using ::perfetto::protos::gen::IPCFrame;
namespace perfetto {
+namespace {
+
+std::string GenerateSetPeerIdentityRequest(int32_t pid,
+ int32_t uid,
+ const std::string& machine_id_hint) {
+ IPCFrame ipc_frame;
+ ipc_frame.set_request_id(0);
+
+ auto* set_peer_identity = ipc_frame.mutable_set_peer_identity();
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
+ PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+ set_peer_identity->set_pid(pid);
+#else
+ base::IgnoreResult(pid);
+#endif
+ set_peer_identity->set_uid(uid);
+ set_peer_identity->set_machine_id_hint(machine_id_hint);
+
+ return ipc::BufferedFrameDeserializer::Serialize(ipc_frame);
+}
+
+} // Anonymous namespace.
+
+RelayClient::~RelayClient() = default;
+RelayClient::RelayClient(const std::string& client_sock_name,
+ const std::string& machine_id_hint,
+ base::TaskRunner* task_runner,
+ OnErrorCallback on_error_callback)
+ : task_runner_(task_runner),
+ on_error_callback_(on_error_callback),
+ client_sock_name_(client_sock_name),
+ machine_id_hint_(machine_id_hint) {
+ Connect();
+}
+
+void RelayClient::Connect() {
+ auto sock_family = base::GetSockFamily(client_sock_name_.c_str());
+ client_sock_ =
+ base::UnixSocket::Connect(client_sock_name_, this, task_runner_,
+ sock_family, base::SockType::kStream);
+}
+
+void RelayClient::NotifyError() {
+ if (!on_error_callback_)
+ return;
+
+ // Can only notify once.
+ on_error_callback_();
+ on_error_callback_ = nullptr;
+}
+
+void RelayClient::OnConnect(base::UnixSocket* self, bool connected) {
+ if (!connected) {
+ return NotifyError();
+ }
+
+ // The RelayClient needs to send its peer identity to the host.
+ auto req = GenerateSetPeerIdentityRequest(
+ getpid(), static_cast<int32_t>(geteuid()), machine_id_hint_);
+ if (!self->SendStr(req)) {
+ return NotifyError();
+ }
+
+ // Create the IPC client with a connected socket.
+ PERFETTO_DCHECK(self == client_sock_.get());
+ auto sock_fd = client_sock_->ReleaseSocket().ReleaseFd();
+ client_sock_ = nullptr;
+ relay_ipc_client_ = std::make_unique<RelayIPCClient>(
+ ipc::Client::ConnArgs(std::move(sock_fd)),
+ weak_factory_for_ipc_client.GetWeakPtr(), task_runner_);
+}
+
+void RelayClient::OnServiceConnected() {
+ phase_ = Phase::PING;
+ SendSyncClockRequest();
+}
+
+void RelayClient::OnServiceDisconnected() {
+ NotifyError();
+}
+
+void RelayClient::SendSyncClockRequest() {
+ protos::gen::SyncClockRequest request;
+ switch (phase_) {
+ case Phase::CONNECTING:
+ PERFETTO_DFATAL("Should be unreachable.");
+ return;
+ case Phase::PING:
+ request.set_phase(SyncClockRequest::PING);
+ break;
+ case Phase::UPDATE:
+ request.set_phase(SyncClockRequest::UPDATE);
+ break;
+ }
+
+ ClockSnapshotVector snapshot_data = CaptureClockSnapshots();
+ for (auto& clock : snapshot_data) {
+ auto* clock_proto = request.add_clocks();
+ clock_proto->set_clock_id(clock.clock_id);
+ clock_proto->set_timestamp(clock.timestamp);
+ }
+
+ relay_ipc_client_->SyncClock(request);
+}
+
+void RelayClient::OnSyncClockResponse(const protos::gen::SyncClockResponse&) {
+ static constexpr uint32_t kSyncClockIntervalMs = 30000; // 30 Sec.
+ switch (phase_) {
+ case Phase::CONNECTING:
+ PERFETTO_DFATAL("Should be unreachable.");
+ break;
+ case Phase::PING:
+ phase_ = Phase::UPDATE;
+ SendSyncClockRequest();
+ break;
+ case Phase::UPDATE:
+ // The client finished one run of clock sync. Schedule for next sync after
+ // 30 sec.
+ clock_synced_with_service_for_testing_ = true;
+ task_runner_->PostDelayedTask(
+ [self = weak_factory_.GetWeakPtr()]() {
+ if (!self)
+ return;
+
+ self->phase_ = Phase::PING;
+ self->SendSyncClockRequest();
+ },
+ kSyncClockIntervalMs);
+ break;
+ }
+}
RelayService::RelayService(base::TaskRunner* task_runner)
: task_runner_(task_runner), machine_id_hint_(GetMachineIdHint()) {}
@@ -62,6 +198,8 @@
// Save |client_socket_name| for opening new client connection to remote
// service when a local producer connects.
client_socket_name_ = client_socket_name;
+
+ ConnectRelayClient();
}
void RelayService::OnNewIncomingConnection(
@@ -82,20 +220,15 @@
// This code pretends that we received a SetPeerIdentity frame from the
// connecting producer (while instead we are just forging it). The host traced
// will only accept only one SetPeerIdentity request pre-queued here.
- IPCFrame ipc_frame;
- ipc_frame.set_request_id(0);
- auto* set_peer_identity = ipc_frame.mutable_set_peer_identity();
+ int32_t pid = base::kInvalidPid;
#if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
- set_peer_identity->set_pid(server_conn->peer_pid_linux());
+ pid = server_conn->peer_pid_linux();
#endif
- set_peer_identity->set_uid(
- static_cast<int32_t>(server_conn->peer_uid_posix()));
-
- set_peer_identity->set_machine_id_hint(machine_id_hint_);
-
+ auto req = GenerateSetPeerIdentityRequest(
+ pid, static_cast<int32_t>(server_conn->peer_uid_posix()),
+ machine_id_hint_);
// Buffer the SetPeerIdentity request.
- auto req = ipc::BufferedFrameDeserializer::Serialize(ipc_frame);
SocketWithBuffer server, client;
PERFETTO_CHECK(server.available_bytes() >= req.size());
memcpy(server.buffer(), req.data(), req.size());
@@ -140,9 +273,29 @@
PERFETTO_DFATAL("Should be unreachable.");
}
+void RelayService::ReconnectRelayClient() {
+ static constexpr uint32_t kMaxRelayClientRetryDelayMs = 30000;
+ task_runner_->PostDelayedTask([this]() { this->ConnectRelayClient(); },
+ relay_client_retry_delay_ms_);
+ relay_client_retry_delay_ms_ =
+ relay_client_->ipc_client_connected()
+ ? kDefaultRelayClientRetryDelayMs
+ : std::min(kMaxRelayClientRetryDelayMs,
+ relay_client_retry_delay_ms_ * 2);
+}
+
+void RelayService::ConnectRelayClient() {
+ if (relay_client_disabled_for_testing_)
+ return;
+
+ relay_client_ = std::make_unique<RelayClient>(
+ client_socket_name_, machine_id_hint_, task_runner_,
+ [this]() { this->ReconnectRelayClient(); });
+}
+
std::string RelayService::GetMachineIdHint(
bool use_pseudo_boot_id_for_testing) {
- // Gets kernel boot ID if possible.
+ // Gets kernel boot ID if available.
std::string boot_id;
if (!use_pseudo_boot_id_for_testing &&
base::ReadFile("/proc/sys/kernel/random/boot_id", &boot_id)) {
diff --git a/src/traced_relay/relay_service.h b/src/traced_relay/relay_service.h
index 1760df3..096961e 100644
--- a/src/traced_relay/relay_service.h
+++ b/src/traced_relay/relay_service.h
@@ -21,7 +21,9 @@
#include <vector>
#include "perfetto/ext/base/unix_socket.h"
+#include "perfetto/ext/tracing/core/tracing_service.h"
#include "src/traced_relay/socket_relay_handler.h"
+#include "src/tracing/ipc/producer/relay_ipc_client.h"
namespace perfetto {
@@ -29,6 +31,74 @@
class TaskRunner;
} // namespace base.
+// RelayClient provides a service that is independent of the relayed producers
+// and is global in the machine. This class implements time synchronization with
+// the host machine:
+// 1. Connects to the host machine using the client socket name (e.g.
+// vsock://2:10001) to port 10001 of VMADDR_CID_HOST.
+// 2. After the socket is connected, send the SetPeerIdentity message to let the
+// tracing service know the identity (machine ID) of this RelayClient.
+// 3. Then hand over the socket to RelayIPCClient, which is the client
+// implementation of the RelayPort RPC service.
+// 4. On any socket error, the RelayClient notifies its user using
+// OnErrorCallback so the user (class RelayService) may retry the connection.
+class RelayClient : private base::UnixSocket::EventListener,
+ private RelayIPCClient::EventListener {
+ public:
+ using OnErrorCallback = std::function<void()>;
+ RelayClient(const std::string& client_sock_name,
+ const std::string& machine_id_hint,
+ base::TaskRunner* task_runner,
+ OnErrorCallback on_destroy_callback);
+ ~RelayClient() override;
+
+ bool ipc_client_connected() const { return phase_ != Phase::CONNECTING; }
+ bool clock_synced_with_service_for_testing() const {
+ return clock_synced_with_service_for_testing_;
+ }
+
+ private:
+ // UnixSocket::EventListener implementation for connecting to the client
+ // socket.
+ void OnNewIncomingConnection(base::UnixSocket*,
+ std::unique_ptr<base::UnixSocket>) override {
+ // This class doesn't open a socket in listening mode.
+ PERFETTO_DFATAL("Should be unreachable.");
+ }
+ void OnConnect(base::UnixSocket* self, bool connected) override;
+ void OnDisconnect(base::UnixSocket*) override { NotifyError(); }
+ void OnDataAvailable(base::UnixSocket*) override {
+ // Should be handled in the IPC client.
+ PERFETTO_DFATAL("Should be unreachable.");
+ }
+
+ void NotifyError();
+ void Connect();
+ void SendSyncClockRequest();
+
+ // RelayIPCClient::EventListener implementation.
+ void OnServiceConnected() override;
+ void OnServiceDisconnected() override;
+ void OnSyncClockResponse(const protos::gen::SyncClockResponse& resp) override;
+
+ enum class Phase : uint32_t { CONNECTING = 1, PING, UPDATE };
+ Phase phase_ = Phase::CONNECTING;
+ bool clock_synced_with_service_for_testing_ = false;
+
+ base::TaskRunner* task_runner_;
+ OnErrorCallback on_error_callback_;
+
+ std::string client_sock_name_;
+ // A hint to the host traced for inferring the identifier of this machine.
+ std::string machine_id_hint_;
+ std::unique_ptr<base::UnixSocket> client_sock_;
+ std::unique_ptr<RelayIPCClient> relay_ipc_client_;
+
+ base::WeakPtrFactory<RelayIPCClient::EventListener>
+ weak_factory_for_ipc_client{this};
+ base::WeakPtrFactory<RelayClient> weak_factory_{this};
+};
+
// A class for relaying the producer data between the local producers and the
// remote tracing service.
class RelayService : public base::UnixSocket::EventListener {
@@ -43,9 +113,13 @@
static std::string GetMachineIdHint(
bool use_pseudo_boot_id_for_testing = false);
+ void SetRelayClientDisabledForTesting(bool disabled) {
+ relay_client_disabled_for_testing_ = disabled;
+ }
void SetMachineIdHintForTesting(std::string machine_id_hint) {
machine_id_hint_ = machine_id_hint;
}
+ RelayClient* relay_client_for_testing() { return relay_client_.get(); }
private:
struct PendingConnection {
@@ -65,6 +139,9 @@
void OnDisconnect(base::UnixSocket* self) override;
void OnDataAvailable(base::UnixSocket* self) override;
+ void ReconnectRelayClient();
+ void ConnectRelayClient();
+
base::TaskRunner* const task_runner_ = nullptr;
// A hint to the host traced for inferring the identifier of this machine.
@@ -78,6 +155,12 @@
std::vector<PendingConnection> pending_connections_;
SocketRelayHandler socket_relay_handler_;
+
+ std::unique_ptr<RelayClient> relay_client_;
+ // On RelayClient connection error, how long should we wait before retrying.
+ static constexpr uint32_t kDefaultRelayClientRetryDelayMs = 1000;
+ uint32_t relay_client_retry_delay_ms_ = kDefaultRelayClientRetryDelayMs;
+ bool relay_client_disabled_for_testing_ = false;
};
} // namespace perfetto
diff --git a/src/traced_relay/relay_service_integrationtest.cc b/src/traced_relay/relay_service_integrationtest.cc
index bb83ecf..9b34656 100644
--- a/src/traced_relay/relay_service_integrationtest.cc
+++ b/src/traced_relay/relay_service_integrationtest.cc
@@ -18,6 +18,7 @@
#include <string>
#include <vector>
#include "perfetto/ext/base/unix_socket.h"
+#include "protos/perfetto/trace/remote_clock_sync.gen.h"
#include "src/traced_relay/relay_service.h"
#include "src/base/test/test_task_runner.h"
@@ -59,11 +60,13 @@
}
TestHelper helper(&task_runner, TestHelper::Mode::kStartDaemons,
- sock_name.c_str());
+ sock_name.c_str(), /*enable_relay_endpoint=*/true);
ASSERT_EQ(helper.num_producers(), 1u);
helper.StartServiceIfRequired();
auto relay_service = std::make_unique<RelayService>(&task_runner);
+ // Don't let RelayClient interfere with the testing of relayed producers.
+ relay_service->SetRelayClientDisabledForTesting(true);
relay_service->Start("@traced_relay", sock_name.c_str());
@@ -152,10 +155,12 @@
for (auto& param : test_params) {
param.relay_service->Start(param.unix_sock_name.c_str(),
param.tcp_sock_name.c_str());
+ // Don't let RelayClient interfere with the testing of relayed producers.
+ param.relay_service->SetRelayClientDisabledForTesting(true);
}
TestHelper helper(&task_runner, TestHelper::Mode::kStartDaemons,
- relay_sock_name.c_str());
+ relay_sock_name.c_str(), /*enable_relay_endpoint=*/true);
ASSERT_EQ(helper.num_producers(), 2u);
helper.StartServiceIfRequired();
@@ -238,5 +243,82 @@
}
}
+TEST(TracedRelayIntegrationTest, RelayClient) {
+ base::TestTaskRunner task_runner;
+
+ std::string sock_name;
+ {
+ // Set up a server UnixSocket to find an unused TCP port.
+ base::UnixSocket::EventListener event_listener;
+ auto srv = base::UnixSocket::Listen("127.0.0.1:0", &event_listener,
+ &task_runner, base::SockFamily::kInet,
+ base::SockType::kStream);
+ ASSERT_TRUE(srv->is_listening());
+ sock_name = srv->GetSockAddr();
+ // Shut down |srv| here to free the port. It's unlikely that the port will
+ // be taken by another process so quickly before we reach the code below.
+ }
+
+ TestHelper helper(&task_runner, TestHelper::Mode::kStartDaemons,
+ sock_name.c_str(), /*enable_relay_endpoint=*/true);
+ ASSERT_EQ(helper.num_producers(), 1u);
+ helper.StartServiceIfRequired();
+
+ auto relay_service = std::make_unique<RelayService>(&task_runner);
+ // This also starts the RelayClient.
+ relay_service->Start("@traced_relay", sock_name.c_str());
+ ASSERT_TRUE(!!relay_service->relay_client_for_testing());
+
+ auto producer_connected =
+ task_runner.CreateCheckpoint("perfetto.FakeProducer.connected");
+ auto noop = []() {};
+ auto connected = [&]() { task_runner.PostTask(producer_connected); };
+
+ // We won't use the built-in fake producer and will start our own.
+ auto producer_thread = std::make_unique<FakeProducerThread>(
+ "@traced_relay", connected, noop, noop, "perfetto.FakeProducer");
+ producer_thread->Connect();
+ task_runner.RunUntilCheckpoint("perfetto.FakeProducer.connected");
+
+ helper.ConnectConsumer();
+ helper.WaitForConsumerConnect();
+
+ while (!relay_service->relay_client_for_testing()
+ ->clock_synced_with_service_for_testing())
+ task_runner.RunUntilIdle();
+
+ TraceConfig trace_config;
+ trace_config.add_buffers()->set_size_kb(1024);
+ trace_config.set_duration_ms(200);
+
+ // // Enable the producer.
+ auto* ds_config = trace_config.add_data_sources()->mutable_config();
+ ds_config->set_name("perfetto.FakeProducer");
+ ds_config->set_target_buffer(0);
+
+ helper.StartTracing(trace_config);
+ helper.WaitForTracingDisabled();
+
+ helper.ReadData();
+ helper.WaitForReadData();
+
+ const auto& packets = helper.trace();
+
+ bool clock_sync_packet_seen = false;
+ for (auto& packet : packets) {
+ if (!packet.has_remote_clock_sync())
+ continue;
+ clock_sync_packet_seen = true;
+
+ auto& synced_clocks = packet.remote_clock_sync().synced_clocks();
+ ASSERT_FALSE(synced_clocks.empty());
+ for (auto& clock_offset : synced_clocks) {
+ ASSERT_TRUE(clock_offset.has_client_clocks());
+ ASSERT_TRUE(clock_offset.has_host_clocks());
+ }
+ }
+ ASSERT_TRUE(clock_sync_packet_seen);
+}
+
} // namespace
} // namespace perfetto
diff --git a/src/traced_relay/relay_service_unittest.cc b/src/traced_relay/relay_service_unittest.cc
index 43e58f3..e27184d 100644
--- a/src/traced_relay/relay_service_unittest.cc
+++ b/src/traced_relay/relay_service_unittest.cc
@@ -61,6 +61,8 @@
TEST(RelayServiceTest, SetPeerIdentity) {
base::TestTaskRunner task_runner;
auto relay_service = std::make_unique<RelayService>(&task_runner);
+ // Disable the extra socket connection created by RelayClient.
+ relay_service->SetRelayClientDisabledForTesting(true);
// Set up a server UnixSocket to find an unused TCP port.
// The TCP connection emulates the socket to the host traced.
@@ -160,6 +162,105 @@
EXPECT_EQ(hint2, hint4);
}
+// Test that the RelayClient notifies its usr with the callback on
+// connection errors.
+TEST(RelayClientTest, OnErrorCallback) {
+ base::TestTaskRunner task_runner;
+
+ // Set up a server UnixSocket to find an unused TCP port.
+ // The TCP connection emulates the socket to the host traced.
+ TestEventListener tcp_listener;
+ auto tcp_server = base::UnixSocket::Listen(
+ "127.0.0.1:0", &tcp_listener, &task_runner, base::SockFamily::kInet,
+ base::SockType::kStream);
+ ASSERT_TRUE(tcp_server->is_listening());
+ auto tcp_sock_name = tcp_server->GetSockAddr();
+
+ auto on_relay_client_error =
+ task_runner.CreateCheckpoint("on_relay_client_error");
+ auto on_error_callback = [&]() { on_relay_client_error(); };
+ auto relay_client = std::make_unique<RelayClient>(
+ tcp_sock_name, "fake_machine_id_hint", &task_runner, on_error_callback);
+
+ base::UnixSocket* tcp_client_connection = nullptr;
+ auto tcp_client_connected =
+ task_runner.CreateCheckpoint("tcp_client_connected");
+ EXPECT_CALL(tcp_listener, OnNewIncomingConnection(_))
+ .WillOnce(Invoke([&](base::UnixSocket* client) {
+ tcp_client_connection = client;
+ tcp_client_connected();
+ }));
+ task_runner.RunUntilCheckpoint("tcp_client_connected");
+
+ // Just drain the data passed over the socket.
+ EXPECT_CALL(tcp_listener, OnDataAvailable(_))
+ .WillRepeatedly(Invoke([&](base::UnixSocket* tcp_conn) {
+ ::testing::IgnoreResult(tcp_conn->ReceiveString());
+ }));
+
+ EXPECT_FALSE(relay_client->clock_synced_with_service_for_testing());
+ // Shutdown the connected connection. The RelayClient should notice this
+ // error.
+ tcp_client_connection->Shutdown(true);
+ task_runner.RunUntilCheckpoint("on_relay_client_error");
+
+ // Shutdown the server. The RelayClient should notice that the connection is
+ // refused.
+ tcp_server->Shutdown(true);
+ on_relay_client_error =
+ task_runner.CreateCheckpoint("on_relay_client_error_2");
+ relay_client = std::make_unique<RelayClient>(
+ tcp_sock_name, "fake_machine_id_hint", &task_runner,
+ [&]() { on_relay_client_error(); });
+ task_runner.RunUntilCheckpoint("on_relay_client_error_2");
+}
+
+TEST(RelayClientTest, SetPeerIdentity) {
+ base::TestTaskRunner task_runner;
+ // Set up a server UnixSocket to find an unused TCP port.
+ // The TCP connection emulates the socket to the host traced.
+ TestEventListener tcp_listener;
+ auto tcp_server = base::UnixSocket::Listen(
+ "127.0.0.1:0", &tcp_listener, &task_runner, base::SockFamily::kInet,
+ base::SockType::kStream);
+ ASSERT_TRUE(tcp_server->is_listening());
+ auto tcp_sock_name = tcp_server->GetSockAddr();
+ auto on_error_callback = [&]() { FAIL() << "Should not be called"; };
+ auto relay_service = std::make_unique<RelayClient>(
+ tcp_sock_name, "fake_machine_id_hint", &task_runner, on_error_callback);
+
+ base::UnixSocket* tcp_client_connection = nullptr;
+ auto tcp_client_connected =
+ task_runner.CreateCheckpoint("tcp_client_connected");
+ EXPECT_CALL(tcp_listener, OnNewIncomingConnection(_))
+ .WillOnce(Invoke([&](base::UnixSocket* client) {
+ tcp_client_connection = client;
+ tcp_client_connected();
+ }));
+ task_runner.RunUntilCheckpoint("tcp_client_connected");
+
+ // Asserts that we can receive the SetPeerIdentity message.
+ auto peer_identity_recv = task_runner.CreateCheckpoint("peer_identity_recv");
+ ipc::BufferedFrameDeserializer deserializer;
+ EXPECT_CALL(tcp_listener, OnDataAvailable(_))
+ .WillRepeatedly(Invoke([&](base::UnixSocket* tcp_conn) {
+ auto buf = deserializer.BeginReceive();
+ auto rsize = tcp_conn->Receive(buf.data, buf.size);
+ EXPECT_TRUE(deserializer.EndReceive(rsize));
+
+ auto frame = deserializer.PopNextFrame();
+ EXPECT_TRUE(frame->has_set_peer_identity());
+
+ const auto& set_peer_identity = frame->set_peer_identity();
+ EXPECT_EQ(set_peer_identity.pid(), getpid());
+ EXPECT_EQ(set_peer_identity.uid(), static_cast<int32_t>(geteuid()));
+ EXPECT_EQ(set_peer_identity.machine_id_hint(), "fake_machine_id_hint");
+
+ peer_identity_recv();
+ }));
+ task_runner.RunUntilCheckpoint("peer_identity_recv");
+}
+
} // namespace
} // namespace perfetto
diff --git a/src/tracing/core/BUILD.gn b/src/tracing/core/BUILD.gn
index 53deb50..1441c97 100644
--- a/src/tracing/core/BUILD.gn
+++ b/src/tracing/core/BUILD.gn
@@ -29,6 +29,7 @@
"../../base",
]
sources = [
+ "clock_snapshots.cc",
"id_allocator.cc",
"id_allocator.h",
"in_process_shared_memory.cc",
diff --git a/src/tracing/core/clock_snapshots.cc b/src/tracing/core/clock_snapshots.cc
new file mode 100644
index 0000000..b32ac65
--- /dev/null
+++ b/src/tracing/core/clock_snapshots.cc
@@ -0,0 +1,72 @@
+/*
+ * Copyright (C) 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/tracing/core/clock_snapshots.h"
+
+#include "perfetto/base/build_config.h"
+#include "perfetto/base/time.h"
+#include "protos/perfetto/common/builtin_clock.pbzero.h"
+
+namespace perfetto {
+
+ClockSnapshotVector CaptureClockSnapshots() {
+ ClockSnapshotVector snapshot_data;
+#if !PERFETTO_BUILDFLAG(PERFETTO_OS_APPLE) && \
+ !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) && \
+ !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
+ struct {
+ clockid_t id;
+ protos::pbzero::BuiltinClock type;
+ struct timespec ts;
+ } clocks[] = {
+ {CLOCK_BOOTTIME, protos::pbzero::BUILTIN_CLOCK_BOOTTIME, {0, 0}},
+ {CLOCK_REALTIME_COARSE,
+ protos::pbzero::BUILTIN_CLOCK_REALTIME_COARSE,
+ {0, 0}},
+ {CLOCK_MONOTONIC_COARSE,
+ protos::pbzero::BUILTIN_CLOCK_MONOTONIC_COARSE,
+ {0, 0}},
+ {CLOCK_REALTIME, protos::pbzero::BUILTIN_CLOCK_REALTIME, {0, 0}},
+ {CLOCK_MONOTONIC, protos::pbzero::BUILTIN_CLOCK_MONOTONIC, {0, 0}},
+ {CLOCK_MONOTONIC_RAW,
+ protos::pbzero::BUILTIN_CLOCK_MONOTONIC_RAW,
+ {0, 0}},
+ };
+ // First snapshot all the clocks as atomically as we can.
+ for (auto& clock : clocks) {
+ if (clock_gettime(clock.id, &clock.ts) == -1)
+ PERFETTO_DLOG("clock_gettime failed for clock %d", clock.id);
+ }
+ for (auto& clock : clocks) {
+ snapshot_data.push_back(ClockReading(
+ static_cast<uint32_t>(clock.type),
+ static_cast<uint64_t>(base::FromPosixTimespec(clock.ts).count())));
+ }
+#else // OS_APPLE || OS_WIN && OS_NACL
+ auto wall_time_ns = static_cast<uint64_t>(base::GetWallTimeNs().count());
+ // The default trace clock is boot time, so we always need to emit a path to
+ // it. However since we don't actually have a boot time source on these
+ // platforms, pretend that wall time equals boot time.
+ new_snapshot_data.push_back(
+ ClockReading(protos::pbzero::BUILTIN_CLOCK_BOOTTIME, wall_time_ns));
+ new_snapshot_data.push_back(
+ ClockReading(protos::pbzero::BUILTIN_CLOCK_MONOTONIC, wall_time_ns));
+#endif
+
+ return snapshot_data;
+}
+
+} // namespace perfetto
diff --git a/src/tracing/core/virtual_destructors.cc b/src/tracing/core/virtual_destructors.cc
index 9b3d250..e8ee09c 100644
--- a/src/tracing/core/virtual_destructors.cc
+++ b/src/tracing/core/virtual_destructors.cc
@@ -34,6 +34,7 @@
TracingService::~TracingService() = default;
ConsumerEndpoint::~ConsumerEndpoint() = default;
ProducerEndpoint::~ProducerEndpoint() = default;
+RelayEndpoint::~RelayEndpoint() = default;
SharedMemory::~SharedMemory() = default;
SharedMemory::Factory::~Factory() = default;
SharedMemoryArbiter::~SharedMemoryArbiter() = default;
diff --git a/src/tracing/ipc/producer/BUILD.gn b/src/tracing/ipc/producer/BUILD.gn
index d634eee..1a57b36 100644
--- a/src/tracing/ipc/producer/BUILD.gn
+++ b/src/tracing/ipc/producer/BUILD.gn
@@ -43,3 +43,21 @@
deps += [ "../../../ipc:client" ]
}
}
+
+# Relay port client implementation, used by the relay service.
+source_set("relay") {
+ public_deps = [ "../../../../protos/perfetto/ipc" ]
+ sources = [
+ "relay_ipc_client.cc",
+ "relay_ipc_client.h",
+ ]
+ deps = [
+ "..:common",
+ "../../../../gn:default_deps",
+ ]
+ if (perfetto_component_type == "static_library") {
+ deps += [ "../../../ipc:perfetto_ipc" ]
+ } else {
+ deps += [ "../../../ipc:client" ]
+ }
+}
diff --git a/src/tracing/ipc/producer/relay_ipc_client.cc b/src/tracing/ipc/producer/relay_ipc_client.cc
new file mode 100644
index 0000000..52a6c1c
--- /dev/null
+++ b/src/tracing/ipc/producer/relay_ipc_client.cc
@@ -0,0 +1,79 @@
+/*
+ * Copyright (C) 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/tracing/ipc/producer/relay_ipc_client.h"
+
+#include "perfetto/base/task_runner.h"
+#include "perfetto/tracing/core/forward_decls.h"
+#include "protos/perfetto/ipc/relay_port.ipc.h"
+
+namespace perfetto {
+
+RelayIPCClient::EventListener::~EventListener() = default;
+
+RelayIPCClient::RelayIPCClient(ipc::Client::ConnArgs conn_args,
+ base::WeakPtr<EventListener> listener,
+ base::TaskRunner* task_runner)
+ : listener_(std::move(listener)),
+ task_runner_(task_runner),
+ ipc_channel_(
+ ipc::Client::CreateInstance(std::move(conn_args), task_runner)),
+ relay_proxy_(new protos::gen::RelayPortProxy(this /* event_listener */)) {
+ ipc_channel_->BindService(relay_proxy_->GetWeakPtr());
+ PERFETTO_DCHECK_THREAD(thread_checker_);
+}
+
+RelayIPCClient::~RelayIPCClient() = default;
+
+void RelayIPCClient::OnConnect() {
+ PERFETTO_DCHECK_THREAD(thread_checker_);
+ connected_ = true;
+
+ if (listener_)
+ listener_->OnServiceConnected();
+}
+
+void RelayIPCClient::OnDisconnect() {
+ PERFETTO_DCHECK_THREAD(thread_checker_);
+ connected_ = false;
+
+ if (listener_)
+ listener_->OnServiceDisconnected();
+}
+
+void RelayIPCClient::SyncClock(const SyncClockRequest& sync_clock_request) {
+ PERFETTO_DCHECK_THREAD(thread_checker_);
+ if (!connected_) {
+ return task_runner_->PostTask([listener = listener_]() {
+ if (listener)
+ listener->OnServiceDisconnected();
+ });
+ }
+
+ SyncClockResponse resp;
+ ipc::Deferred<protos::gen::SyncClockResponse> async_resp;
+ async_resp.Bind(
+ [listener = listener_](ipc::AsyncResult<SyncClockResponse> resp) {
+ if (!listener)
+ return;
+ if (!resp)
+ return listener->OnServiceDisconnected();
+ listener->OnSyncClockResponse(*resp);
+ });
+ relay_proxy_->SyncClock(sync_clock_request, std::move(async_resp), -1);
+}
+
+} // namespace perfetto
diff --git a/src/tracing/ipc/producer/relay_ipc_client.h b/src/tracing/ipc/producer/relay_ipc_client.h
new file mode 100644
index 0000000..098e5c9
--- /dev/null
+++ b/src/tracing/ipc/producer/relay_ipc_client.h
@@ -0,0 +1,86 @@
+/*
+ * Copyright (C) 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACING_IPC_PRODUCER_RELAY_IPC_CLIENT_H_
+#define SRC_TRACING_IPC_PRODUCER_RELAY_IPC_CLIENT_H_
+
+#include "perfetto/ext/base/thread_checker.h"
+#include "perfetto/ext/base/weak_ptr.h"
+#include "perfetto/ext/ipc/client.h"
+#include "perfetto/ext/ipc/service_proxy.h"
+#include "perfetto/ext/tracing/core/tracing_service.h"
+
+#include "protos/perfetto/ipc/relay_port.ipc.h"
+
+namespace perfetto {
+
+namespace base {
+class TaskRunner;
+} // namespace base
+
+// Exposes a Service endpoint to the relay service, proxying all requests
+// through an IPC channel to the remote Service. This class is the glue layer
+// between the generic Service interface exposed to the clients of the library
+// and the actual IPC transport.
+class RelayIPCClient : public ipc::ServiceProxy::EventListener {
+ public:
+ class EventListener {
+ public:
+ virtual ~EventListener();
+ // Called when the client receives the response of the SyncClock() request.
+ virtual void OnSyncClockResponse(const SyncClockResponse&) = 0;
+ // Called when the IPC service is connected and is ready for the SyncClock()
+ // requests.
+ virtual void OnServiceConnected() = 0;
+ // Called when the IPC service is disconnected.
+ virtual void OnServiceDisconnected() = 0;
+ };
+
+ using SyncClockCallback = std::function<void(const SyncClockResponse&)>;
+ using OnDisconnectCallback = std::function<void()>;
+
+ RelayIPCClient(ipc::Client::ConnArgs,
+ base::WeakPtr<EventListener>,
+ base::TaskRunner*);
+ ~RelayIPCClient() override;
+
+ void SyncClock(const SyncClockRequest&);
+
+ // ipc::ServiceProxy::EventListener implementation.
+ // These methods are invoked by the IPC layer, which knows nothing about
+ // tracing, producers and consumers.
+ void OnConnect() override;
+ void OnDisconnect() override;
+
+ private:
+ base::WeakPtr<EventListener> listener_;
+
+ base::TaskRunner* const task_runner_;
+
+ // The object that owns the client socket and takes care of IPC traffic.
+ std::unique_ptr<ipc::Client> ipc_channel_;
+
+ // The proxy interface for the producer port of the service. It is bound
+ // to |ipc_channel_| and (de)serializes method invocations over the wire.
+ std::unique_ptr<protos::gen::RelayPortProxy> relay_proxy_;
+
+ bool connected_ = false;
+ PERFETTO_THREAD_CHECKER(thread_checker_)
+};
+
+} // namespace perfetto
+
+#endif // SRC_TRACING_IPC_PRODUCER_RELAY_IPC_CLIENT_H_
diff --git a/src/tracing/ipc/service/BUILD.gn b/src/tracing/ipc/service/BUILD.gn
index 0e102ac..171d677 100644
--- a/src/tracing/ipc/service/BUILD.gn
+++ b/src/tracing/ipc/service/BUILD.gn
@@ -29,6 +29,8 @@
"consumer_ipc_service.h",
"producer_ipc_service.cc",
"producer_ipc_service.h",
+ "relay_ipc_service.cc",
+ "relay_ipc_service.h",
"service_ipc_host_impl.cc",
"service_ipc_host_impl.h",
]
@@ -37,6 +39,7 @@
"../../../../gn:default_deps",
"../../../../protos/perfetto/ipc",
"../../../base",
+ "../../core:core",
"../../service",
]
if (perfetto_component_type == "static_library") {
diff --git a/src/tracing/ipc/service/relay_ipc_service.cc b/src/tracing/ipc/service/relay_ipc_service.cc
new file mode 100644
index 0000000..c305f3b
--- /dev/null
+++ b/src/tracing/ipc/service/relay_ipc_service.cc
@@ -0,0 +1,85 @@
+/*
+ * Copyright (C) 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/tracing/ipc/service/relay_ipc_service.h"
+
+#include <cinttypes>
+#include <utility>
+
+#include "perfetto/base/logging.h"
+#include "perfetto/ext/ipc/service.h"
+#include "perfetto/ext/tracing/core/tracing_service.h"
+#include "perfetto/tracing/core/clock_snapshots.h"
+#include "perfetto/tracing/core/forward_decls.h"
+
+namespace perfetto {
+
+RelayIPCService::RelayIPCService(TracingService* core_service)
+ : core_service_(core_service), weak_ptr_factory_(this) {}
+
+TracingService::RelayEndpoint* RelayIPCService::GetRelayEndpoint(
+ ipc::ClientID client_id) {
+ auto* endpoint = relay_endpoints_.Find(client_id);
+ if (!endpoint)
+ return nullptr;
+ return endpoint->get();
+}
+
+void RelayIPCService::OnClientDisconnected() {
+ auto client_id = ipc::Service::client_info().client_id();
+ PERFETTO_DLOG("Relay endpoint %" PRIu64 "disconnected ", client_id);
+
+ auto* endpoint = GetRelayEndpoint(client_id);
+ if (!endpoint)
+ return;
+
+ endpoint->Disconnect();
+ relay_endpoints_.Erase(client_id);
+}
+
+void RelayIPCService::SyncClock(const protos::gen::SyncClockRequest& req,
+ DeferredSyncClockResponse resp) {
+ auto host_clock_snapshots = CaptureClockSnapshots();
+
+ // Send the response to client to reduce RTT.
+ auto async_resp = ipc::AsyncResult<protos::gen::SyncClockResponse>::Create();
+ resp.Resolve(std::move(async_resp));
+
+ ClockSnapshotVector client_clock_snapshots;
+ for (size_t i = 0; i < req.clocks().size(); i++) {
+ auto& client_clock = req.clocks()[i];
+ client_clock_snapshots.emplace_back(client_clock.clock_id(),
+ client_clock.timestamp());
+ }
+
+ // Handle the request in the core service.
+ auto machine_id = ipc::Service::client_info().machine_id();
+ auto client_id = ipc::Service::client_info().client_id();
+ auto* endpoint = GetRelayEndpoint(client_id);
+ if (!endpoint) {
+ auto ep = core_service_->ConnectRelayClient(
+ std::make_pair(machine_id, client_id));
+ endpoint = ep.get();
+ relay_endpoints_.Insert(client_id, std::move(ep));
+ }
+
+ RelayEndpoint::SyncMode mode = req.phase() == SyncClockRequest::PING
+ ? RelayEndpoint::SyncMode::PING
+ : RelayEndpoint::SyncMode::UPDATE;
+ endpoint->SyncClocks(mode, std::move(client_clock_snapshots),
+ std::move(host_clock_snapshots));
+}
+} // namespace perfetto
diff --git a/src/tracing/ipc/service/relay_ipc_service.h b/src/tracing/ipc/service/relay_ipc_service.h
new file mode 100644
index 0000000..e1139cd
--- /dev/null
+++ b/src/tracing/ipc/service/relay_ipc_service.h
@@ -0,0 +1,69 @@
+/*
+ * Copyright (C) 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACING_IPC_SERVICE_RELAY_IPC_SERVICE_H_
+#define SRC_TRACING_IPC_SERVICE_RELAY_IPC_SERVICE_H_
+
+#include <limits>
+#include <list>
+
+#include "perfetto/ext/base/flat_hash_map.h"
+#include "perfetto/ext/base/sys_types.h"
+#include "perfetto/ext/base/weak_ptr.h"
+#include "perfetto/ext/ipc/basic_types.h"
+#include "perfetto/ext/tracing/core/tracing_service.h"
+
+#include "protos/perfetto/ipc/relay_port.ipc.h"
+
+namespace perfetto {
+
+// Implements the RelayPort IPC service.
+class RelayIPCService : public protos::gen::RelayPort {
+ public:
+ explicit RelayIPCService(TracingService* core_service);
+ ~RelayIPCService() override = default;
+
+ void OnClientDisconnected() override;
+ void SyncClock(const protos::gen::SyncClockRequest&,
+ DeferredSyncClockResponse) override;
+
+ private:
+ TracingService* const core_service_;
+
+ using ClockSnapshots =
+ base::FlatHashMap<uint32_t, std::pair<uint64_t, uint64_t>>;
+ struct ClockSnapshotRecords {
+ base::MachineID machine_id = base::kDefaultMachineID;
+
+ // Keep track of most recent clock snapshots, ordered by local timestamps
+ // (CLOCK_BOOTTIME).
+ std::list<ClockSnapshots> clock_snapshots;
+
+ uint64_t min_rtt = std::numeric_limits<uint64_t>::max();
+ };
+
+ TracingService::RelayEndpoint* GetRelayEndpoint(ipc::ClientID);
+
+ base::FlatHashMap<ipc::ClientID,
+ std::unique_ptr<TracingService::RelayEndpoint>>
+ relay_endpoints_;
+
+ base::WeakPtrFactory<RelayIPCService> weak_ptr_factory_; // Keep last.
+};
+
+} // namespace perfetto
+
+#endif // SRC_TRACING_IPC_SERVICE_RELAY_IPC_SERVICE_H_
diff --git a/src/tracing/ipc/service/service_ipc_host_impl.cc b/src/tracing/ipc/service/service_ipc_host_impl.cc
index b566832..51cbb63 100644
--- a/src/tracing/ipc/service/service_ipc_host_impl.cc
+++ b/src/tracing/ipc/service/service_ipc_host_impl.cc
@@ -18,11 +18,11 @@
#include "perfetto/base/logging.h"
#include "perfetto/base/task_runner.h"
-#include "perfetto/ext/base/string_splitter.h"
#include "perfetto/ext/ipc/host.h"
#include "perfetto/ext/tracing/core/tracing_service.h"
#include "src/tracing/ipc/service/consumer_ipc_service.h"
#include "src/tracing/ipc/service/producer_ipc_service.h"
+#include "src/tracing/ipc/service/relay_ipc_service.h"
#if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
#include "src/tracing/ipc/shared_memory_windows.h"
@@ -126,6 +126,14 @@
bool producer_service_exposed = producer_ipc_port->ExposeService(
std::unique_ptr<ipc::Service>(new ProducerIPCService(svc_.get())));
PERFETTO_CHECK(producer_service_exposed);
+
+ if (!init_opts_.enable_relay_endpoint)
+ continue;
+ // Expose a secondary service for sync with remote relay service
+ // if requested.
+ bool relay_service_exposed = producer_ipc_port->ExposeService(
+ std::unique_ptr<ipc::Service>(new RelayIPCService(svc_.get())));
+ PERFETTO_CHECK(relay_service_exposed);
}
bool consumer_service_exposed = consumer_ipc_port_->ExposeService(
diff --git a/src/tracing/service/tracing_service_impl.cc b/src/tracing/service/tracing_service_impl.cc
index 75c50f2..a72754c 100644
--- a/src/tracing/service/tracing_service_impl.cc
+++ b/src/tracing/service/tracing_service_impl.cc
@@ -18,6 +18,7 @@
#include <errno.h>
#include <limits.h>
+#include <stdint.h>
#include <string.h>
#include <cinttypes>
@@ -27,6 +28,7 @@
#include <unordered_set>
#include "perfetto/base/time.h"
#include "perfetto/ext/tracing/core/client_identity.h"
+#include "perfetto/tracing/core/clock_snapshots.h"
#if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) && \
!PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
@@ -88,6 +90,7 @@
#include "protos/perfetto/config/trace_config.pbzero.h"
#include "protos/perfetto/trace/clock_snapshot.pbzero.h"
#include "protos/perfetto/trace/perfetto/tracing_service_event.pbzero.h"
+#include "protos/perfetto/trace/remote_clock_sync.pbzero.h"
#include "protos/perfetto/trace/system_info.pbzero.h"
#include "protos/perfetto/trace/trace_packet.pbzero.h"
#include "protos/perfetto/trace/trace_uuid.pbzero.h"
@@ -530,6 +533,24 @@
return true;
}
+std::unique_ptr<TracingService::RelayEndpoint>
+TracingServiceImpl::ConnectRelayClient(RelayClientID relay_client_id) {
+ PERFETTO_DCHECK_THREAD(thread_checker_);
+
+ auto endpoint = std::make_unique<RelayEndpointImpl>(relay_client_id, this);
+ relay_clients_[relay_client_id] = endpoint.get();
+
+ return std::move(endpoint);
+}
+
+void TracingServiceImpl::DisconnectRelayClient(RelayClientID relay_client_id) {
+ PERFETTO_DCHECK_THREAD(thread_checker_);
+
+ if (relay_clients_.find(relay_client_id) == relay_clients_.end())
+ return;
+ relay_clients_.erase(relay_client_id);
+}
+
bool TracingServiceImpl::AttachConsumer(ConsumerEndpointImpl* consumer,
const std::string& key) {
PERFETTO_DCHECK_THREAD(thread_checker_);
@@ -2322,6 +2343,11 @@
if (!tracing_session->config.builtin_data_sources().disable_service_events())
EmitLifecycleEvents(tracing_session, &packets);
+ // In a multi-machine tracing session, emit clock synchronization messages for
+ // remote machines.
+ if (!relay_clients_.empty())
+ MaybeEmitRemoteClockSync(tracing_session, &packets);
+
size_t packets_bytes = 0; // SUM(slice.size() for each slice in |packets|).
// Add up size for packets added by the Maybe* calls above.
@@ -3271,50 +3297,7 @@
// been emitted into the trace yet (see comment below).
static constexpr int64_t kSignificantDriftNs = 10 * 1000 * 1000; // 10 ms
- TracingSession::ClockSnapshotData new_snapshot_data;
-
-#if !PERFETTO_BUILDFLAG(PERFETTO_OS_APPLE) && \
- !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) && \
- !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
- struct {
- clockid_t id;
- protos::pbzero::BuiltinClock type;
- struct timespec ts;
- } clocks[] = {
- {CLOCK_BOOTTIME, protos::pbzero::BUILTIN_CLOCK_BOOTTIME, {0, 0}},
- {CLOCK_REALTIME_COARSE,
- protos::pbzero::BUILTIN_CLOCK_REALTIME_COARSE,
- {0, 0}},
- {CLOCK_MONOTONIC_COARSE,
- protos::pbzero::BUILTIN_CLOCK_MONOTONIC_COARSE,
- {0, 0}},
- {CLOCK_REALTIME, protos::pbzero::BUILTIN_CLOCK_REALTIME, {0, 0}},
- {CLOCK_MONOTONIC, protos::pbzero::BUILTIN_CLOCK_MONOTONIC, {0, 0}},
- {CLOCK_MONOTONIC_RAW,
- protos::pbzero::BUILTIN_CLOCK_MONOTONIC_RAW,
- {0, 0}},
- };
- // First snapshot all the clocks as atomically as we can.
- for (auto& clock : clocks) {
- if (clock_gettime(clock.id, &clock.ts) == -1)
- PERFETTO_DLOG("clock_gettime failed for clock %d", clock.id);
- }
- for (auto& clock : clocks) {
- new_snapshot_data.push_back(std::make_pair(
- static_cast<uint32_t>(clock.type),
- static_cast<uint64_t>(base::FromPosixTimespec(clock.ts).count())));
- }
-#else // OS_APPLE || OS_WIN && OS_NACL
- auto wall_time_ns = static_cast<uint64_t>(base::GetWallTimeNs().count());
- // The default trace clock is boot time, so we always need to emit a path to
- // it. However since we don't actually have a boot time source on these
- // platforms, pretend that wall time equals boot time.
- new_snapshot_data.push_back(
- std::make_pair(protos::pbzero::BUILTIN_CLOCK_BOOTTIME, wall_time_ns));
- new_snapshot_data.push_back(
- std::make_pair(protos::pbzero::BUILTIN_CLOCK_MONOTONIC, wall_time_ns));
-#endif
-
+ TracingSession::ClockSnapshotData new_snapshot_data = CaptureClockSnapshots();
// If we're about to update a session's latest clock snapshot that hasn't been
// emitted into the trace yet, check whether the clocks have drifted enough to
// warrant overriding the current snapshot values. The older snapshot would be
@@ -3324,18 +3307,18 @@
// we try to keep it if we can.
if (!snapshot_data->empty()) {
PERFETTO_DCHECK(snapshot_data->size() == new_snapshot_data.size());
- PERFETTO_DCHECK((*snapshot_data)[0].first ==
+ PERFETTO_DCHECK((*snapshot_data)[0].clock_id ==
protos::gen::BUILTIN_CLOCK_BOOTTIME);
bool update_snapshot = false;
- uint64_t old_boot_ns = (*snapshot_data)[0].second;
- uint64_t new_boot_ns = new_snapshot_data[0].second;
+ uint64_t old_boot_ns = (*snapshot_data)[0].timestamp;
+ uint64_t new_boot_ns = new_snapshot_data[0].timestamp;
int64_t boot_diff =
static_cast<int64_t>(new_boot_ns) - static_cast<int64_t>(old_boot_ns);
for (size_t i = 1; i < snapshot_data->size(); i++) {
- uint64_t old_ns = (*snapshot_data)[i].second;
- uint64_t new_ns = new_snapshot_data[i].second;
+ uint64_t old_ns = (*snapshot_data)[i].timestamp;
+ uint64_t new_ns = new_snapshot_data[i].timestamp;
int64_t diff =
static_cast<int64_t>(new_ns) - static_cast<int64_t>(old_ns);
@@ -3374,8 +3357,8 @@
for (auto& clock_id_and_ts : snapshot_data) {
auto* c = snapshot->add_clocks();
- c->set_clock_id(clock_id_and_ts.first);
- c->set_timestamp(clock_id_and_ts.second);
+ c->set_clock_id(clock_id_and_ts.clock_id);
+ c->set_timestamp(clock_id_and_ts.timestamp);
}
packet->set_trusted_uid(static_cast<int32_t>(uid_));
@@ -3594,6 +3577,57 @@
SerializeAndAppendPacket(packets, std::move(pair.second));
}
+void TracingServiceImpl::MaybeEmitRemoteClockSync(
+ TracingSession* tracing_session,
+ std::vector<TracePacket>* packets) {
+ if (tracing_session->did_emit_remote_clock_sync_)
+ return;
+
+ std::unordered_set<MachineID> did_emit_machines;
+ for (const auto& id_and_relay_client : relay_clients_) {
+ const auto& relay_client = id_and_relay_client.second;
+ auto machine_id = relay_client->machine_id();
+ if (did_emit_machines.find(machine_id) != did_emit_machines.end())
+ continue; // Already emitted for the machine (e.g. multiple clients).
+
+ auto& sync_clock_snapshots = relay_client->synced_clocks();
+ if (sync_clock_snapshots.empty()) {
+ PERFETTO_DLOG("Clock not synchronized for machine ID = %" PRIu32,
+ machine_id);
+ continue;
+ }
+
+ // Don't emit twice for the same machine.
+ did_emit_machines.insert(machine_id);
+
+ protozero::HeapBuffered<protos::pbzero::TracePacket> sync_packet;
+ sync_packet->set_machine_id(machine_id);
+ sync_packet->set_trusted_uid(static_cast<int32_t>(uid_));
+ auto* remote_clock_sync = sync_packet->set_remote_clock_sync();
+ for (const auto& sync_exchange : relay_client->synced_clocks()) {
+ auto* sync_exchange_msg = remote_clock_sync->add_synced_clocks();
+
+ auto* client_snapshots = sync_exchange_msg->set_client_clocks();
+ for (auto& client_clock : sync_exchange.client_clocks) {
+ auto* clock = client_snapshots->add_clocks();
+ clock->set_clock_id(client_clock.clock_id);
+ clock->set_timestamp(client_clock.timestamp);
+ }
+
+ auto* host_snapshots = sync_exchange_msg->set_host_clocks();
+ for (auto& host_clock : sync_exchange.host_clocks) {
+ auto* clock = host_snapshots->add_clocks();
+ clock->set_clock_id(host_clock.clock_id);
+ clock->set_timestamp(host_clock.timestamp);
+ }
+ }
+
+ SerializeAndAppendPacket(packets, sync_packet.SerializeAsArray());
+ }
+
+ tracing_session->did_emit_remote_clock_sync_ = true;
+}
+
void TracingServiceImpl::MaybeEmitReceivedTriggers(
TracingSession* tracing_session,
std::vector<TracePacket>* packets) {
@@ -4280,8 +4314,8 @@
if (s.config.has_bugreport_filename())
session->set_bugreport_filename(s.config.bugreport_filename());
for (const auto& snap_kv : s.initial_clock_snapshot) {
- if (snap_kv.first == protos::pbzero::BUILTIN_CLOCK_REALTIME)
- session->set_start_realtime_ns(static_cast<int64_t>(snap_kv.second));
+ if (snap_kv.clock_id == protos::pbzero::BUILTIN_CLOCK_REALTIME)
+ session->set_start_realtime_ns(static_cast<int64_t>(snap_kv.timestamp));
}
for (const auto& buf : s.config.buffers())
session->add_buffer_size_kb(buf.size_kb());
@@ -4687,4 +4721,30 @@
64 /* max_size */);
}
+////////////////////////////////////////////////////////////////////////////////
+// TracingServiceImpl::RelayEndpointImpl implementation
+////////////////////////////////////////////////////////////////////////////////
+TracingServiceImpl::RelayEndpointImpl::RelayEndpointImpl(
+ RelayClientID relay_client_id,
+ TracingServiceImpl* service)
+ : relay_client_id_(relay_client_id), service_(service) {}
+TracingServiceImpl::RelayEndpointImpl::~RelayEndpointImpl() = default;
+
+void TracingServiceImpl::RelayEndpointImpl::SyncClocks(
+ SyncMode sync_mode,
+ ClockSnapshotVector client_clocks,
+ ClockSnapshotVector host_clocks) {
+ // We keep only the most recent 5 clock sync snapshots.
+ static constexpr size_t kNumSyncClocks = 5;
+ if (synced_clocks_.size() >= kNumSyncClocks)
+ synced_clocks_.pop_front();
+
+ synced_clocks_.emplace_back(sync_mode, std::move(client_clocks),
+ std::move(host_clocks));
+}
+
+void TracingServiceImpl::RelayEndpointImpl::Disconnect() {
+ service_->DisconnectRelayClient(relay_client_id_);
+}
+
} // namespace perfetto
diff --git a/src/tracing/service/tracing_service_impl.h b/src/tracing/service/tracing_service_impl.h
index 3a5eb08..5a583ba 100644
--- a/src/tracing/service/tracing_service_impl.h
+++ b/src/tracing/service/tracing_service_impl.h
@@ -268,6 +268,47 @@
base::WeakPtrFactory<ConsumerEndpointImpl> weak_ptr_factory_; // Keep last.
};
+ class RelayEndpointImpl : public TracingService::RelayEndpoint {
+ public:
+ using SyncMode = RelayEndpoint::SyncMode;
+
+ struct SyncedClockSnapshots {
+ SyncedClockSnapshots(SyncMode _sync_mode,
+ ClockSnapshotVector _client_clocks,
+ ClockSnapshotVector _host_clocks)
+ : sync_mode(_sync_mode),
+ client_clocks(std::move(_client_clocks)),
+ host_clocks(std::move(_host_clocks)) {}
+ SyncMode sync_mode;
+ ClockSnapshotVector client_clocks;
+ ClockSnapshotVector host_clocks;
+ };
+
+ explicit RelayEndpointImpl(RelayClientID relay_client_id,
+ TracingServiceImpl* service);
+ ~RelayEndpointImpl() override;
+ void SyncClocks(SyncMode sync_mode,
+ ClockSnapshotVector client_clocks,
+ ClockSnapshotVector host_clocks) override;
+ void Disconnect() override;
+
+ MachineID machine_id() const { return relay_client_id_.first; }
+
+ base::CircularQueue<SyncedClockSnapshots>& synced_clocks() {
+ return synced_clocks_;
+ }
+
+ private:
+ RelayEndpointImpl(const RelayEndpointImpl&) = delete;
+ RelayEndpointImpl& operator=(const RelayEndpointImpl&) = delete;
+
+ RelayClientID relay_client_id_;
+ TracingServiceImpl* const service_;
+ base::CircularQueue<SyncedClockSnapshots> synced_clocks_;
+
+ PERFETTO_THREAD_CHECKER(thread_checker_)
+ };
+
explicit TracingServiceImpl(std::unique_ptr<SharedMemory::Factory>,
base::TaskRunner*,
InitOpts = {});
@@ -358,6 +399,11 @@
Consumer*,
uid_t) override;
+ std::unique_ptr<TracingService::RelayEndpoint> ConnectRelayClient(
+ RelayClientID) override;
+
+ void DisconnectRelayClient(RelayClientID);
+
// Set whether SMB scraping should be enabled by default or not. Producers can
// override this setting for their own SMBs.
void SetSMBScrapingEnabled(bool enabled) override {
@@ -590,6 +636,9 @@
// etc.) into the trace output yet.
bool did_emit_initial_packets = false;
+ // Whether we emitted clock offsets for relay clients yet.
+ bool did_emit_remote_clock_sync_ = false;
+
// Whether we should compress TracePackets after reading them.
bool compress_deflate = false;
@@ -631,8 +680,7 @@
};
std::vector<LifecycleEvent> lifecycle_events;
- using ClockSnapshotData =
- std::vector<std::pair<uint32_t /*clock_id*/, uint64_t /*ts*/>>;
+ using ClockSnapshotData = ClockSnapshotVector;
// Initial clock snapshot, captured at trace start time (when state goes to
// TracingSession::STARTED). Emitted into the trace when the consumer first
@@ -741,6 +789,7 @@
void MaybeEmitTraceConfig(TracingSession*, std::vector<TracePacket>*);
void EmitSystemInfo(std::vector<TracePacket>*);
void MaybeEmitReceivedTriggers(TracingSession*, std::vector<TracePacket>*);
+ void MaybeEmitRemoteClockSync(TracingSession*, std::vector<TracePacket>*);
void MaybeNotifyAllDataSourcesStarted(TracingSession*);
void OnFlushTimeout(TracingSessionID, FlushRequestID);
void OnDisableTracingTimeout(TracingSessionID);
@@ -834,6 +883,7 @@
std::multimap<std::string /*name*/, RegisteredDataSource> data_sources_;
std::map<ProducerID, ProducerEndpointImpl*> producers_;
std::set<ConsumerEndpointImpl*> consumers_;
+ std::map<RelayClientID, RelayEndpointImpl*> relay_clients_;
std::map<TracingSessionID, TracingSession> tracing_sessions_;
std::map<BufferID, std::unique_ptr<TraceBuffer>> buffers_;
std::map<std::string, int64_t> session_to_last_trace_s_;
diff --git a/src/tracing/service/tracing_service_impl_unittest.cc b/src/tracing/service/tracing_service_impl_unittest.cc
index ab6770f..8193bb6 100644
--- a/src/tracing/service/tracing_service_impl_unittest.cc
+++ b/src/tracing/service/tracing_service_impl_unittest.cc
@@ -31,6 +31,10 @@
#include "perfetto/ext/tracing/core/trace_writer.h"
#include "perfetto/tracing/core/data_source_config.h"
#include "perfetto/tracing/core/data_source_descriptor.h"
+#include "perfetto/tracing/core/forward_decls.h"
+#include "protos/perfetto/common/builtin_clock.gen.h"
+#include "protos/perfetto/trace/clock_snapshot.gen.h"
+#include "protos/perfetto/trace/remote_clock_sync.gen.h"
#include "src/base/test/test_task_runner.h"
#include "src/protozero/filtering/filter_bytecode_generator.h"
#include "src/tracing/core/shared_memory_arbiter_impl.h"
@@ -5624,4 +5628,130 @@
task_runner.RunUntilIdle();
}
+TEST_F(TracingServiceImplTest, RelayEndpointClockSync) {
+ std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+ consumer->Connect(svc.get());
+
+ std::unique_ptr<MockProducer> producer = CreateMockProducer();
+ producer->Connect(svc.get(), "mock_producer");
+
+ auto relay_client = svc->ConnectRelayClient(
+ std::make_pair<uint32_t, uint64_t>(/*base::MachineID=*/0x103, 1));
+
+ uint32_t clock_id =
+ static_cast<uint32_t>(protos::gen::BuiltinClock::BUILTIN_CLOCK_BOOTTIME);
+
+ relay_client->SyncClocks(RelayEndpoint::SyncMode::PING,
+ /*client_clocks=*/{{clock_id, 100}},
+ /*host_clocks=*/{{clock_id, 1000}});
+ relay_client->SyncClocks(RelayEndpoint::SyncMode::UPDATE,
+ /*client_clocks=*/{{clock_id, 300}},
+ /*host_clocks=*/{{clock_id, 1200}});
+
+ producer->RegisterDataSource("ds");
+
+ TraceConfig trace_config;
+ trace_config.add_buffers()->set_size_kb(128);
+ auto* ds_cfg = trace_config.add_data_sources()->mutable_config();
+ ds_cfg->set_name("ds");
+
+ consumer->EnableTracing(trace_config);
+ producer->WaitForTracingSetup();
+ producer->WaitForDataSourceSetup("ds");
+ producer->WaitForDataSourceStart("ds");
+
+ auto writer1 = producer->CreateTraceWriter("ds");
+
+ consumer->DisableTracing();
+ producer->WaitForDataSourceStop("ds");
+ consumer->WaitForTracingDisabled();
+
+ task_runner.RunUntilIdle();
+
+ auto trace_packets = consumer->ReadBuffers();
+ bool clock_sync_packet_seen = false;
+ for (auto& packet : trace_packets) {
+ if (!packet.has_remote_clock_sync())
+ continue;
+ clock_sync_packet_seen = true;
+
+ auto& remote_clock_sync = packet.remote_clock_sync();
+ ASSERT_EQ(remote_clock_sync.synced_clocks_size(), 2);
+
+ auto& snapshots = remote_clock_sync.synced_clocks();
+ ASSERT_TRUE(snapshots[0].has_client_clocks());
+ auto* snapshot = &snapshots[0].client_clocks();
+ ASSERT_EQ(snapshot->clocks_size(), 1);
+ ASSERT_EQ(snapshot->clocks()[0].clock_id(), clock_id);
+ ASSERT_EQ(snapshot->clocks()[0].timestamp(), 100u);
+
+ snapshot = &snapshots[0].host_clocks();
+ ASSERT_EQ(snapshot->clocks_size(), 1);
+ ASSERT_EQ(snapshot->clocks()[0].clock_id(), clock_id);
+ ASSERT_EQ(snapshot->clocks()[0].timestamp(), 1000u);
+
+ snapshot = &snapshots[1].client_clocks();
+ ASSERT_EQ(snapshot->clocks_size(), 1);
+ ASSERT_EQ(snapshot->clocks()[0].clock_id(), clock_id);
+ ASSERT_EQ(snapshot->clocks()[0].timestamp(), 300u);
+
+ snapshot = &snapshots[1].host_clocks();
+ ASSERT_EQ(snapshot->clocks_size(), 1);
+ ASSERT_EQ(snapshot->clocks()[0].clock_id(), clock_id);
+ ASSERT_EQ(snapshot->clocks()[0].timestamp(), 1200u);
+ }
+ ASSERT_TRUE(clock_sync_packet_seen);
+}
+
+TEST_F(TracingServiceImplTest, RelayEndpointDisconnect) {
+ std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+ consumer->Connect(svc.get());
+
+ std::unique_ptr<MockProducer> producer = CreateMockProducer();
+ producer->Connect(svc.get(), "mock_producer");
+
+ auto relay_client = svc->ConnectRelayClient(
+ std::make_pair<uint32_t, uint64_t>(/*base::MachineID=*/0x103, 1));
+ uint32_t clock_id =
+ static_cast<uint32_t>(protos::gen::BuiltinClock::BUILTIN_CLOCK_BOOTTIME);
+
+ relay_client->SyncClocks(RelayEndpoint::SyncMode::PING,
+ /*client_clocks=*/{{clock_id, 100}},
+ /*host_clocks=*/{{clock_id, 1000}});
+ relay_client->SyncClocks(RelayEndpoint::SyncMode::UPDATE,
+ /*client_clocks=*/{{clock_id, 300}},
+ /*host_clocks=*/{{clock_id, 1200}});
+
+ relay_client->Disconnect();
+
+ producer->RegisterDataSource("ds");
+
+ TraceConfig trace_config;
+ trace_config.add_buffers()->set_size_kb(128);
+ auto* ds_cfg = trace_config.add_data_sources()->mutable_config();
+ ds_cfg->set_name("ds");
+
+ consumer->EnableTracing(trace_config);
+ producer->WaitForTracingSetup();
+ producer->WaitForDataSourceSetup("ds");
+ producer->WaitForDataSourceStart("ds");
+
+ auto writer1 = producer->CreateTraceWriter("ds");
+
+ consumer->DisableTracing();
+ producer->WaitForDataSourceStop("ds");
+ consumer->WaitForTracingDisabled();
+
+ task_runner.RunUntilIdle();
+
+ auto trace_packets = consumer->ReadBuffers();
+ bool clock_sync_packet_seen = false;
+ for (auto& packet : trace_packets) {
+ if (!packet.has_remote_clock_sync())
+ continue;
+ clock_sync_packet_seen = true;
+ }
+ ASSERT_FALSE(clock_sync_packet_seen);
+}
+
} // namespace perfetto
diff --git a/test/test_helper.cc b/test/test_helper.cc
index 155a3aa..85e6829 100644
--- a/test/test_helper.cc
+++ b/test/test_helper.cc
@@ -69,13 +69,16 @@
TestHelper::TestHelper(base::TestTaskRunner* task_runner,
Mode mode,
- const char* producer_socket)
+ const char* producer_socket,
+ bool enable_relay_endpoint)
: instance_num_(next_instance_num_++),
task_runner_(task_runner),
mode_(mode),
producer_socket_(producer_socket),
consumer_socket_(ConsumerSocketForMode(mode)),
- service_thread_(producer_socket_, consumer_socket_) {
+ service_thread_(producer_socket_,
+ consumer_socket_,
+ enable_relay_endpoint) {
auto producer_sockets = TokenizeProducerSockets(producer_socket_);
static constexpr const char* kDefaultFakeProducerName =
"android.perfetto.FakeProducer";
diff --git a/test/test_helper.h b/test/test_helper.h
index 462861d..589cdc2 100644
--- a/test/test_helper.h
+++ b/test/test_helper.h
@@ -30,6 +30,7 @@
#include "perfetto/ext/tracing/core/consumer.h"
#include "perfetto/ext/tracing/core/shared_memory_arbiter.h"
#include "perfetto/ext/tracing/core/trace_packet.h"
+#include "perfetto/ext/tracing/core/tracing_service.h"
#include "perfetto/ext/tracing/ipc/consumer_ipc_client.h"
#include "perfetto/ext/tracing/ipc/service_ipc_host.h"
#include "perfetto/tracing/core/trace_config.h"
@@ -118,8 +119,11 @@
class ServiceThread {
public:
ServiceThread(const std::string& producer_socket,
- const std::string& consumer_socket)
- : producer_socket_(producer_socket), consumer_socket_(consumer_socket) {}
+ const std::string& consumer_socket,
+ bool enable_relay_endpoint = false)
+ : producer_socket_(producer_socket),
+ consumer_socket_(consumer_socket),
+ enable_relay_endpoint_(enable_relay_endpoint) {}
~ServiceThread() { Stop(); }
@@ -128,7 +132,10 @@
{"PERFETTO_PRODUCER_SOCK_NAME", "PERFETTO_CONSUMER_SOCK_NAME"});
runner_ = base::ThreadTaskRunner::CreateAndStart("perfetto.svc");
runner_->PostTaskAndWaitForTesting([this]() {
- svc_ = ServiceIPCHost::CreateInstance(runner_->get());
+ TracingService::InitOpts init_opts = {};
+ if (enable_relay_endpoint_)
+ init_opts.enable_relay_endpoint = true;
+ svc_ = ServiceIPCHost::CreateInstance(runner_->get(), init_opts);
auto producer_sockets = TokenizeProducerSockets(producer_socket_.c_str());
for (const auto& producer_socket : producer_sockets) {
// In some cases the socket is a TCP or abstract unix.
@@ -169,6 +176,7 @@
std::string producer_socket_;
std::string consumer_socket_;
+ bool enable_relay_endpoint_ = false;
std::unique_ptr<ServiceIPCHost> svc_;
};
@@ -289,7 +297,8 @@
explicit TestHelper(base::TestTaskRunner* task_runner,
Mode mode,
- const char* producer_socket);
+ const char* producer_socket,
+ bool enable_relay_endpoint = false);
// Consumer implementation.
void OnConnect() override;