tp: support remote clock sync

Implement injestion of the RemoteClockSync message for multi-machine
tracing sessions. A RemoteClockSync message consists of multiple
SyncedClocks messages, each of which contains ClockSnapshot taken on the
remote client and host sides:

```
  remote_clock_sync {
    synced_clocks {
      client_clocks {
        clocks {
          clock_id: 6 # CLOCK_BOOTTIME in the client.
          timestamp: 10000
        }
        # Other clocks domains
      }
      host_clocks {
        clocks {
          clock_id: 6 # CLOCK_BOOTTIME in the host
          timestamp: 111000
        }
      }
    }
    # more synced_clocks {}
  }
```

A round of clock synchronization consists of 4 snapshots taken in the
order of client, host, client, host. The process is repeated with an
interval of 30 seconds.

The proto trace reader injests this message to make an estimation of
clock offset between client and host for converting timestamps from
client to the host clock domains. The trace processor takes an average
of the estimated offset in the RemoteClockSync message to provide a more
stable and predictable offset value. For more details, refer to section
"Clock Synchronization" of go/crosetto-vm-tracing.

Bug: 347623660
Change-Id: Ide96e8fb8101b76464416effe971f57ac3c33374
diff --git a/Android.bp b/Android.bp
index fe5dbfa..333828d 100644
--- a/Android.bp
+++ b/Android.bp
@@ -12591,6 +12591,7 @@
         "src/trace_processor/importers/proto/perf_sample_tracker_unittest.cc",
         "src/trace_processor/importers/proto/profile_packet_sequence_state_unittest.cc",
         "src/trace_processor/importers/proto/proto_trace_parser_impl_unittest.cc",
+        "src/trace_processor/importers/proto/proto_trace_reader_unittest.cc",
         "src/trace_processor/importers/proto/proto_trace_tokenizer_unittest.cc",
         "src/trace_processor/importers/proto/string_encoding_utils_unittests.cc",
     ],
diff --git a/src/trace_processor/importers/common/clock_tracker.h b/src/trace_processor/importers/common/clock_tracker.h
index 117e0e1..babfca6 100644
--- a/src/trace_processor/importers/common/clock_tracker.h
+++ b/src/trace_processor/importers/common/clock_tracker.h
@@ -28,9 +28,12 @@
 #include <vector>
 
 #include "perfetto/base/logging.h"
+#include "perfetto/ext/base/flat_hash_map.h"
 #include "perfetto/ext/base/status_or.h"
+#include "perfetto/public/compiler.h"
 #include "src/trace_processor/importers/common/metadata_tracker.h"
 #include "src/trace_processor/types/trace_processor_context.h"
+#include "src/trace_processor/util/status_macros.h"
 
 namespace perfetto {
 namespace trace_processor {
@@ -159,6 +162,27 @@
   // Returns the internal snapshot id of this set of clocks.
   base::StatusOr<uint32_t> AddSnapshot(const std::vector<ClockTimestamp>&);
 
+  // Sets clock offset for the given clock domain to convert to the host trace
+  // time. This is typically called by the code that reads the RemoteClockSync
+  // packet. Typically only the offset of |trace_time_clock_id_| (which is
+  // CLOCK_BOOTTIME) is used.
+  void SetClockOffset(ClockId clock_id, int64_t offset) {
+    clock_offsets_[clock_id] = offset;
+  }
+
+  // Apply the clock offset to convert remote trace times to host trace time.
+  int64_t ToHostTraceTime(int64_t timestamp) {
+    if (PERFETTO_LIKELY(!context_->machine_id())) {
+      // No need to convert host timestamps.
+      return timestamp;
+    }
+
+    // Find the offset for |trace_time_clock_id_| and apply the offset, or
+    // default offset 0 if not offset is found for |trace_time_clock_id_|.
+    int64_t clock_offset = clock_offsets_[trace_time_clock_id_];
+    return timestamp - clock_offset;
+  }
+
   base::StatusOr<int64_t> ToTraceTime(ClockId clock_id, int64_t timestamp) {
     if (PERFETTO_UNLIKELY(!trace_time_clock_id_used_for_conversion_)) {
       context_->metadata_tracker->SetMetadata(
@@ -167,9 +191,13 @@
       trace_time_clock_id_used_for_conversion_ = true;
     }
     trace_time_clock_id_used_for_conversion_ = true;
+
     if (clock_id == trace_time_clock_id_)
-      return timestamp;
-    return Convert(clock_id, timestamp, trace_time_clock_id_);
+      return ToHostTraceTime(timestamp);
+
+    ASSIGN_OR_RETURN(int64_t ts,
+                     Convert(clock_id, timestamp, trace_time_clock_id_));
+    return ToHostTraceTime(ts);
   }
 
   // If trace clock and source clock are available in the snapshot will return
@@ -196,6 +224,10 @@
     cache_lookups_disabled_for_testing_ = v;
   }
 
+  const base::FlatHashMap<ClockId, int64_t>& clock_offsets_for_testing() {
+    return clock_offsets_;
+  }
+
  private:
   using SnapshotHash = uint32_t;
 
@@ -339,6 +371,7 @@
   std::minstd_rand rnd_;  // For cache eviction.
   uint32_t cur_snapshot_id_ = 0;
   bool trace_time_clock_id_used_for_conversion_ = false;
+  base::FlatHashMap<ClockId, int64_t> clock_offsets_;
 };
 
 }  // namespace trace_processor
diff --git a/src/trace_processor/importers/common/clock_tracker_unittest.cc b/src/trace_processor/importers/common/clock_tracker_unittest.cc
index 51bd020..f94c58a 100644
--- a/src/trace_processor/importers/common/clock_tracker_unittest.cc
+++ b/src/trace_processor/importers/common/clock_tracker_unittest.cc
@@ -19,6 +19,7 @@
 #include <optional>
 #include <random>
 
+#include "src/trace_processor/importers/common/machine_tracker.h"
 #include "src/trace_processor/importers/common/metadata_tracker.h"
 #include "src/trace_processor/storage/trace_storage.h"
 #include "src/trace_processor/types/trace_processor_context.h"
@@ -303,6 +304,119 @@
   }
 }
 
+// Test clock conversion with offset to the host.
+TEST_F(ClockTrackerTest, ClockOffset) {
+  EXPECT_FALSE(ct_.ToTraceTime(REALTIME, 0).ok());
+
+  context_.machine_tracker =
+      std::make_unique<MachineTracker>(&context_, 0x1001);
+
+  // Client-to-host BOOTTIME offset is -10000 ns.
+  ct_.SetClockOffset(BOOTTIME, -10000);
+
+  ct_.AddSnapshot({{REALTIME, 10}, {BOOTTIME, 10010}});
+  ct_.AddSnapshot({{REALTIME, 20}, {BOOTTIME, 20220}});
+  ct_.AddSnapshot({{REALTIME, 30}, {BOOTTIME, 30030}});
+  ct_.AddSnapshot({{MONOTONIC, 1000}, {BOOTTIME, 100000}});
+
+  auto seq_clock_1 = ct_.SequenceToGlobalClock(1, 64);
+  auto seq_clock_2 = ct_.SequenceToGlobalClock(2, 64);
+  ct_.AddSnapshot({{MONOTONIC, 2000}, {seq_clock_1, 1200}});
+  ct_.AddSnapshot({{seq_clock_1, 1300}, {seq_clock_2, 2000, 10, false}});
+
+  EXPECT_EQ(*ct_.ToTraceTime(REALTIME, 0), 20000);
+  EXPECT_EQ(*ct_.ToTraceTime(REALTIME, 1), 20001);
+  EXPECT_EQ(*ct_.ToTraceTime(REALTIME, 9), 20009);
+  EXPECT_EQ(*ct_.ToTraceTime(REALTIME, 10), 20010);
+  EXPECT_EQ(*ct_.ToTraceTime(REALTIME, 11), 20011);
+  EXPECT_EQ(*ct_.ToTraceTime(REALTIME, 19), 20019);
+  EXPECT_EQ(*ct_.ToTraceTime(REALTIME, 20), 30220);
+  EXPECT_EQ(*ct_.ToTraceTime(REALTIME, 21), 30221);
+  EXPECT_EQ(*ct_.ToTraceTime(REALTIME, 29), 30229);
+  EXPECT_EQ(*ct_.ToTraceTime(REALTIME, 30), 40030);
+  EXPECT_EQ(*ct_.ToTraceTime(REALTIME, 40), 40040);
+
+  EXPECT_EQ(*ct_.ToTraceTime(MONOTONIC, 0), 100000 - 1000 + 10000);
+  EXPECT_EQ(*ct_.ToTraceTime(MONOTONIC, 999), 100000 - 1 + 10000);
+  EXPECT_EQ(*ct_.ToTraceTime(MONOTONIC, 1000), 100000 + 10000);
+  EXPECT_EQ(*ct_.ToTraceTime(MONOTONIC, 1e6),
+            static_cast<int64_t>(100000 - 1000 + 1e6 + 10000));
+
+  // seq_clock_1 -> MONOTONIC -> BOOTTIME -> apply offset.
+  EXPECT_EQ(*ct_.ToTraceTime(seq_clock_1, 1100), -100 + 1000 + 100000 + 10000);
+  // seq_clock_2 -> seq_clock_1 -> MONOTONIC -> BOOTTIME -> apply offset.
+  EXPECT_EQ(*ct_.ToTraceTime(seq_clock_2, 2100),
+            100 * 10 + 100 + 1000 + 100000 + 10000);
+}
+
+// Test conversion of remote machine timestamps without offset. This can happen
+// if timestamp conversion for remote machines is done by trace data
+// post-processing.
+TEST_F(ClockTrackerTest, RemoteNoClockOffset) {
+  context_.machine_tracker =
+      std::make_unique<MachineTracker>(&context_, 0x1001);
+
+  ct_.AddSnapshot({{REALTIME, 10}, {BOOTTIME, 10010}});
+  ct_.AddSnapshot({{REALTIME, 20}, {BOOTTIME, 20220}});
+  ct_.AddSnapshot({{MONOTONIC, 1000}, {BOOTTIME, 100000}});
+
+  auto seq_clock_1 = ct_.SequenceToGlobalClock(1, 64);
+  auto seq_clock_2 = ct_.SequenceToGlobalClock(2, 64);
+  ct_.AddSnapshot({{MONOTONIC, 2000}, {seq_clock_1, 1200}});
+  ct_.AddSnapshot({{seq_clock_1, 1300}, {seq_clock_2, 2000, 10, false}});
+
+  EXPECT_EQ(*ct_.ToTraceTime(REALTIME, 0), 10000);
+  EXPECT_EQ(*ct_.ToTraceTime(REALTIME, 9), 10009);
+  EXPECT_EQ(*ct_.ToTraceTime(REALTIME, 10), 10010);
+  EXPECT_EQ(*ct_.ToTraceTime(REALTIME, 11), 10011);
+  EXPECT_EQ(*ct_.ToTraceTime(REALTIME, 19), 10019);
+  EXPECT_EQ(*ct_.ToTraceTime(REALTIME, 20), 20220);
+  EXPECT_EQ(*ct_.ToTraceTime(REALTIME, 21), 20221);
+
+  EXPECT_EQ(*ct_.ToTraceTime(MONOTONIC, 0), 100000 - 1000);
+  EXPECT_EQ(*ct_.ToTraceTime(MONOTONIC, 999), 100000 - 1);
+  EXPECT_EQ(*ct_.ToTraceTime(MONOTONIC, 1000), 100000);
+  EXPECT_EQ(*ct_.ToTraceTime(MONOTONIC, 1e6),
+            static_cast<int64_t>(100000 - 1000 + 1e6));
+
+  // seq_clock_1 -> MONOTONIC -> BOOTTIME.
+  EXPECT_EQ(*ct_.ToTraceTime(seq_clock_1, 1100), -100 + 1000 + 100000);
+  // seq_clock_2 -> seq_clock_1 -> MONOTONIC -> BOOTTIME.
+  EXPECT_EQ(*ct_.ToTraceTime(seq_clock_2, 2100),
+            100 * 10 + 100 + 1000 + 100000);
+}
+
+// Test clock offset of non-defualt trace time clock domain.
+TEST_F(ClockTrackerTest, NonDefaultTraceTimeClock) {
+  context_.machine_tracker =
+      std::make_unique<MachineTracker>(&context_, 0x1001);
+
+  ct_.SetTraceTimeClock(MONOTONIC);
+  ct_.SetClockOffset(MONOTONIC, -2000);
+  ct_.SetClockOffset(BOOTTIME, -10000);  // This doesn't take effect.
+
+  ct_.AddSnapshot({{REALTIME, 10}, {BOOTTIME, 10010}});
+  ct_.AddSnapshot({{MONOTONIC, 1000}, {BOOTTIME, 100000}});
+
+  auto seq_clock_1 = ct_.SequenceToGlobalClock(1, 64);
+  ct_.AddSnapshot({{MONOTONIC, 2000}, {seq_clock_1, 1200}});
+
+  int64_t realtime_to_trace_time_delta = -10 + 10010 - 100000 + 1000 - (-2000);
+  EXPECT_EQ(*ct_.ToTraceTime(REALTIME, 9), 9 + realtime_to_trace_time_delta);
+  EXPECT_EQ(*ct_.ToTraceTime(REALTIME, 10), 10 + realtime_to_trace_time_delta);
+  EXPECT_EQ(*ct_.ToTraceTime(REALTIME, 20), 20 + realtime_to_trace_time_delta);
+
+  int64_t mono_to_trace_time_delta = -2000;
+  EXPECT_EQ(*ct_.ToTraceTime(MONOTONIC, 0), 0 - mono_to_trace_time_delta);
+  EXPECT_EQ(*ct_.ToTraceTime(MONOTONIC, 999), 999 - mono_to_trace_time_delta);
+  EXPECT_EQ(*ct_.ToTraceTime(MONOTONIC, 1000), 1000 - mono_to_trace_time_delta);
+  EXPECT_EQ(*ct_.ToTraceTime(MONOTONIC, 1e6),
+            static_cast<int64_t>(1e6) - mono_to_trace_time_delta);
+
+  // seq_clock_1 -> MONOTONIC.
+  EXPECT_EQ(*ct_.ToTraceTime(seq_clock_1, 1100), 1100 - 1200 + 2000 - (-2000));
+}
+
 }  // namespace
 }  // namespace trace_processor
 }  // namespace perfetto
diff --git a/src/trace_processor/importers/ftrace/ftrace_tokenizer.cc b/src/trace_processor/importers/ftrace/ftrace_tokenizer.cc
index 1e61a46..39c01ab 100644
--- a/src/trace_processor/importers/ftrace/ftrace_tokenizer.cc
+++ b/src/trace_processor/importers/ftrace/ftrace_tokenizer.cc
@@ -56,7 +56,8 @@
     ClockTracker::ClockId clock_id,
     int64_t ts) {
   // On most traces (i.e. P+), the clock should be BOOTTIME.
-  if (PERFETTO_LIKELY(clock_id == BuiltinClock::BUILTIN_CLOCK_BOOTTIME))
+  if (PERFETTO_LIKELY(clock_id == BuiltinClock::BUILTIN_CLOCK_BOOTTIME &&
+                      !context->machine_id()))
     return ts;
   return context->clock_tracker->ToTraceTime(clock_id, ts);
 }
diff --git a/src/trace_processor/importers/proto/BUILD.gn b/src/trace_processor/importers/proto/BUILD.gn
index 1551d9b..3c9f299 100644
--- a/src/trace_processor/importers/proto/BUILD.gn
+++ b/src/trace_processor/importers/proto/BUILD.gn
@@ -278,6 +278,7 @@
     "perf_sample_tracker_unittest.cc",
     "profile_packet_sequence_state_unittest.cc",
     "proto_trace_parser_impl_unittest.cc",
+    "proto_trace_reader_unittest.cc",
     "proto_trace_tokenizer_unittest.cc",
     "string_encoding_utils_unittests.cc",
   ]
diff --git a/src/trace_processor/importers/proto/proto_trace_reader.cc b/src/trace_processor/importers/proto/proto_trace_reader.cc
index d958c0b..1c06b17 100644
--- a/src/trace_processor/importers/proto/proto_trace_reader.cc
+++ b/src/trace_processor/importers/proto/proto_trace_reader.cc
@@ -16,8 +16,10 @@
 
 #include "src/trace_processor/importers/proto/proto_trace_reader.h"
 
+#include <numeric>
 #include <optional>
 #include <string>
+#include <vector>
 
 #include "perfetto/base/build_config.h"
 #include "perfetto/base/logging.h"
@@ -48,6 +50,7 @@
 #include "protos/perfetto/trace/extension_descriptor.pbzero.h"
 #include "protos/perfetto/trace/perfetto/tracing_service_event.pbzero.h"
 #include "protos/perfetto/trace/profiling/profile_common.pbzero.h"
+#include "protos/perfetto/trace/remote_clock_sync.pbzero.h"
 #include "protos/perfetto/trace/trace.pbzero.h"
 #include "protos/perfetto/trace/trace_packet.pbzero.h"
 
@@ -150,6 +153,11 @@
     ParseTraceStats(decoder.trace_stats());
   }
 
+  if (decoder.has_remote_clock_sync()) {
+    PERFETTO_DCHECK(context_->machine_id());
+    return ParseRemoteClockSync(decoder.remote_clock_sync());
+  }
+
   if (decoder.has_service_event()) {
     PERFETTO_DCHECK(decoder.has_timestamp());
     int64_t ts = static_cast<int64_t>(decoder.timestamp());
@@ -444,6 +452,126 @@
   return util::OkStatus();
 }
 
+util::Status ProtoTraceReader::ParseRemoteClockSync(ConstBytes blob) {
+  protos::pbzero::RemoteClockSync::Decoder evt(blob.data, blob.size);
+
+  std::vector<SyncClockSnapshots> sync_clock_snapshots;
+  // Decode the RemoteClockSync message into a struct for calculating offsets.
+  for (auto it = evt.synced_clocks(); it; ++it) {
+    sync_clock_snapshots.emplace_back();
+    auto& sync_clocks = sync_clock_snapshots.back();
+
+    protos::pbzero::RemoteClockSync::SyncedClocks::Decoder synced_clocks(*it);
+    protos::pbzero::ClockSnapshot::ClockSnapshot::Decoder host_clocks(
+        synced_clocks.host_clocks());
+    for (auto clock_it = host_clocks.clocks(); clock_it; clock_it++) {
+      protos::pbzero::ClockSnapshot::ClockSnapshot::Clock::Decoder clock(
+          *clock_it);
+      sync_clocks[clock.clock_id()].first = clock.timestamp();
+    }
+
+    std::vector<ClockTracker::ClockTimestamp> clock_timestamps;
+    protos::pbzero::ClockSnapshot::ClockSnapshot::Decoder client_clocks(
+        synced_clocks.client_clocks());
+    for (auto clock_it = client_clocks.clocks(); clock_it; clock_it++) {
+      protos::pbzero::ClockSnapshot::ClockSnapshot::Clock::Decoder clock(
+          *clock_it);
+      sync_clocks[clock.clock_id()].second = clock.timestamp();
+      clock_timestamps.emplace_back(clock.clock_id(), clock.timestamp(), 1,
+                                    false);
+    }
+
+    // In addition for calculating clock offsets, client clock snapshots are
+    // also added to clock tracker to emulate tracing service taking periodical
+    // clock snapshots. This builds a clock conversion path from a local trace
+    // time (e.g. Chrome trace time) to client builtin clock (CLOCK_MONOTONIC)
+    // which can be converted to host trace time (CLOCK_BOOTTIME).
+    context_->clock_tracker->AddSnapshot(clock_timestamps);
+  }
+
+  // Calculate clock offsets and report to the ClockTracker.
+  auto clock_offsets = CalculateClockOffsets(sync_clock_snapshots);
+  for (auto it = clock_offsets.GetIterator(); it; ++it) {
+    context_->clock_tracker->SetClockOffset(it.key(), it.value());
+  }
+
+  return util::OkStatus();
+}
+
+base::FlatHashMap<int64_t /*Clock Id*/, int64_t /*Offset*/>
+ProtoTraceReader::CalculateClockOffsets(
+    std::vector<SyncClockSnapshots>& sync_clock_snapshots) {
+  base::FlatHashMap<int64_t /*Clock Id*/, int64_t /*Offset*/> clock_offsets;
+
+  // The RemoteClockSync message contains a sequence of |synced_clocks|
+  // messages. Each |synced_clocks| message contains pairs of ClockSnapshots
+  // taken on both the client and host sides.
+  //
+  // The "synced_clocks" messages are emitted periodically. A single round of
+  // data collection involves four snapshots:
+  //   1. Client snapshot
+  //   2. Host snapshot (triggered by client's IPC message)
+  //   3. Client snapshot (triggered by host's IPC message)
+  //   4. Host snapshot
+  //
+  // These four snapshots are used to estimate the clock offset between the
+  // client and host for each default clock domain present in the ClockSnapshot.
+  std::map<int64_t, std::vector<int64_t>> raw_clock_offsets;
+  // Remote clock syncs happen in an interval of 30 sec. 2 adjacent clock
+  // snapshots belong to the same round if they happen within 30 secs.
+  constexpr uint64_t clock_sync_interval_ns = 30lu * 1000000000;
+  for (size_t i = 1; i < sync_clock_snapshots.size(); i++) {
+    // Synced clocks are taken by client snapshot -> host snapshot.
+    auto& ping_clocks = sync_clock_snapshots[i - 1];
+    auto& update_clocks = sync_clock_snapshots[i];
+
+    auto ping_client =
+        ping_clocks[protos::pbzero::BuiltinClock::BUILTIN_CLOCK_BOOTTIME]
+            .second;
+    auto update_client =
+        update_clocks[protos::pbzero::BuiltinClock::BUILTIN_CLOCK_BOOTTIME]
+            .second;
+    // |ping_clocks| and |update_clocks| belong to 2 different rounds of remote
+    // clock sync rounds.
+    if (update_client - ping_client >= clock_sync_interval_ns)
+      continue;
+
+    for (auto it = ping_clocks.GetIterator(); it; ++it) {
+      const auto clock_id = it.key();
+      const auto [t1h, t1c] = it.value();
+      const auto [t2h, t2c] = update_clocks[clock_id];
+
+      if (!t1h || !t1c || !t2h || !t2c)
+        continue;
+
+      int64_t offset1 =
+          static_cast<int64_t>(t1c + t2c) / 2 - static_cast<int64_t>(t1h);
+      int64_t offset2 =
+          static_cast<int64_t>(t2c) - static_cast<int64_t>(t1h + t2h) / 2;
+
+      // Clock values are taken in the order of t1c, t1h, t2c, t2h. Offset
+      // calculation requires at least 3 timestamps as a round trip. We have 4,
+      // which can be treated as 2 round trips:
+      //   1. t1c, t1h, t2c as the round trip initiated by the client. Offset 1
+      //      = (t1c + t2c) / 2 - t1h
+      //   2. t1h, t2c, t2h as the round trip initiated by the host. Offset 2 =
+      //      t2c - (t1h + t2h) / 2
+      raw_clock_offsets[clock_id].push_back(offset1);
+      raw_clock_offsets[clock_id].push_back(offset2);
+    }
+
+    // Use the average of estimated clock offsets in the clock tracker.
+    for (const auto& [clock_id, offsets] : raw_clock_offsets) {
+      int64_t avg_offset =
+          std::accumulate(offsets.begin(), offsets.end(), 0LL) /
+          static_cast<int64_t>(offsets.size());
+      clock_offsets[clock_id] = avg_offset;
+    }
+  }
+
+  return clock_offsets;
+}
+
 std::optional<StringId> ProtoTraceReader::GetBuiltinClockNameOrNull(
     int64_t clock_id) {
   switch (clock_id) {
diff --git a/src/trace_processor/importers/proto/proto_trace_reader.h b/src/trace_processor/importers/proto/proto_trace_reader.h
index 4c4e2ca..0356f98 100644
--- a/src/trace_processor/importers/proto/proto_trace_reader.h
+++ b/src/trace_processor/importers/proto/proto_trace_reader.h
@@ -64,11 +64,23 @@
   util::Status Parse(TraceBlobView) override;
   void NotifyEndOfFile() override;
 
+  using SyncClockSnapshots = base::FlatHashMap<
+      int64_t,
+      std::pair</*host ts*/ uint64_t, /*client ts*/ uint64_t>>;
+  base::FlatHashMap<int64_t /*Clock Id*/, int64_t /*Offset*/>
+  CalculateClockOffsetsForTesting(
+      std::vector<SyncClockSnapshots>& sync_clock_snapshots) {
+    return CalculateClockOffsets(sync_clock_snapshots);
+  }
+
+  std::optional<StringId> GetBuiltinClockNameOrNull(int64_t clock_id);
+
  private:
   using ConstBytes = protozero::ConstBytes;
   util::Status ParsePacket(TraceBlobView);
   util::Status ParseServiceEvent(int64_t ts, ConstBytes);
   util::Status ParseClockSnapshot(ConstBytes blob, uint32_t seq_id);
+  util::Status ParseRemoteClockSync(ConstBytes blob);
   void HandleIncrementalStateCleared(
       const protos::pbzero::TracePacket_Decoder&);
   void HandleFirstPacketOnSequence(uint32_t packet_sequence_id);
@@ -80,7 +92,8 @@
   void ParseTraceConfig(ConstBytes);
   void ParseTraceStats(ConstBytes);
 
-  std::optional<StringId> GetBuiltinClockNameOrNull(int64_t clock_id);
+  base::FlatHashMap<int64_t /*Clock Id*/, int64_t /*Offset*/>
+  CalculateClockOffsets(std::vector<SyncClockSnapshots>&);
 
   PacketSequenceStateBuilder* GetIncrementalStateForPacketSequence(
       uint32_t sequence_id) {
diff --git a/src/trace_processor/importers/proto/proto_trace_reader_unittest.cc b/src/trace_processor/importers/proto/proto_trace_reader_unittest.cc
new file mode 100644
index 0000000..166640c
--- /dev/null
+++ b/src/trace_processor/importers/proto/proto_trace_reader_unittest.cc
@@ -0,0 +1,212 @@
+/*
+ * Copyright (C) 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/trace_processor/importers/proto/proto_trace_reader.h"
+#include <memory>
+
+#include "perfetto/protozero/scattered_heap_buffer.h"
+#include "protos/perfetto/common/builtin_clock.pbzero.h"
+#include "src/trace_processor/importers/common/clock_tracker.h"
+#include "src/trace_processor/importers/common/machine_tracker.h"
+#include "src/trace_processor/storage/trace_storage.h"
+
+#include "test/gtest_and_gmock.h"
+
+#include "protos/perfetto/trace/clock_snapshot.pbzero.h"
+#include "protos/perfetto/trace/remote_clock_sync.pbzero.h"
+
+namespace perfetto::trace_processor {
+namespace {
+
+constexpr auto REALTIME = protos::pbzero::BUILTIN_CLOCK_REALTIME;
+constexpr auto BOOTTIME = protos::pbzero::BUILTIN_CLOCK_BOOTTIME;
+
+class ProtoTraceReaderTest : public ::testing::Test {
+ public:
+  ProtoTraceReaderTest() {
+    context_.storage = std::make_unique<TraceStorage>();
+    context_.machine_tracker =
+        std::make_unique<MachineTracker>(&context_, 0x1001);
+    context_.clock_tracker = std::make_unique<ClockTracker>(&context_);
+    proto_trace_reader_ = std::make_unique<ProtoTraceReader>(&context_);
+  }
+
+  util::Status Tokenize() {
+    trace_->Finalize();
+    std::vector<uint8_t> trace_bytes = trace_.SerializeAsArray();
+    std::unique_ptr<uint8_t[]> raw_trace(new uint8_t[trace_bytes.size()]);
+    memcpy(raw_trace.get(), trace_bytes.data(), trace_bytes.size());
+    auto status = proto_trace_reader_->Parse(TraceBlobView(
+        TraceBlob::TakeOwnership(std::move(raw_trace), trace_bytes.size())));
+
+    trace_.Reset();
+    return status;
+  }
+
+ protected:
+  protozero::HeapBuffered<protos::pbzero::Trace> trace_;
+  TraceProcessorContext context_;
+  std::unique_ptr<ProtoTraceReader> proto_trace_reader_;
+};
+
+TEST_F(ProtoTraceReaderTest, RemoteClockSync_Valid) {
+  context_.machine_tracker =
+      std::make_unique<MachineTracker>(&context_, 0x1001);
+
+  auto* packet = trace_->add_packet();
+  packet->set_machine_id(0x1001);
+  auto* remote_clock_sync = packet->set_remote_clock_sync();
+  auto* synced_clocks = remote_clock_sync->add_synced_clocks();
+  auto* client_clocks = synced_clocks->set_client_clocks();
+
+  // First synced clock snapshots on both sides.
+  auto* clock = client_clocks->add_clocks();
+  clock->set_clock_id(BOOTTIME);
+  clock->set_timestamp(10000);
+
+  auto* host_clocks = synced_clocks->set_host_clocks();
+  clock = host_clocks->add_clocks();
+  clock->set_clock_id(BOOTTIME);
+  clock->set_timestamp(120000);
+
+  // Second synced clock snapshots on both sides.
+  synced_clocks = remote_clock_sync->add_synced_clocks();
+
+  client_clocks = synced_clocks->set_client_clocks();
+  clock = client_clocks->add_clocks();
+  clock->set_clock_id(BOOTTIME);
+  clock->set_timestamp(25000);
+
+  host_clocks = synced_clocks->set_host_clocks();
+  clock = host_clocks->add_clocks();
+  clock->set_clock_id(BOOTTIME);
+  clock->set_timestamp(135000);
+
+  ASSERT_TRUE(Tokenize().ok());
+  ASSERT_EQ(1u, context_.clock_tracker->clock_offsets_for_testing().size());
+}
+
+TEST_F(ProtoTraceReaderTest, RemoteClockSync_Incomplete) {
+  context_.machine_tracker =
+      std::make_unique<MachineTracker>(&context_, 0x1001);
+
+  auto* packet = trace_->add_packet();
+  packet->set_machine_id(0x1001);
+  auto* remote_clock_sync = packet->set_remote_clock_sync();
+  auto* synced_clocks = remote_clock_sync->add_synced_clocks();
+  auto* client_clocks = synced_clocks->set_client_clocks();
+
+  // First synced clock snapshots on both sides.
+  auto* clock = client_clocks->add_clocks();
+  clock->set_clock_id(BOOTTIME);
+  clock->set_timestamp(10000);
+
+  auto* host_clocks = synced_clocks->set_host_clocks();
+  clock = host_clocks->add_clocks();
+  clock->set_clock_id(BOOTTIME);
+  clock->set_timestamp(120000);
+
+  // Second synced clock snapshots on both sides.
+  synced_clocks = remote_clock_sync->add_synced_clocks();
+
+  client_clocks = synced_clocks->set_client_clocks();
+  clock = client_clocks->add_clocks();
+  clock->set_clock_id(BOOTTIME);
+  clock->set_timestamp(25000);
+
+  // Missing the second host CLOCK_BOOTTIME making it below the minimum
+  // requirement for using the remote_clock_sync for calculating clock offset.
+
+  ASSERT_TRUE(Tokenize().ok());
+  // No valid clock offset.
+  ASSERT_EQ(0u, context_.clock_tracker->clock_offsets_for_testing().size());
+}
+
+TEST_F(ProtoTraceReaderTest, CalculateClockOffset) {
+  std::vector<ProtoTraceReader::SyncClockSnapshots> sync_clock_snapshots;
+  ProtoTraceReader::SyncClockSnapshots snapshots;
+  snapshots[BOOTTIME] = {120000, 10000};
+  snapshots[REALTIME] = {135000, 25000};
+  sync_clock_snapshots.push_back(std::move(snapshots));
+
+  snapshots[BOOTTIME] = {140000, 20000};
+  snapshots[REALTIME] = {150000, 35000};
+  sync_clock_snapshots.push_back(std::move(snapshots));
+
+  auto clock_offsets = proto_trace_reader_->CalculateClockOffsetsForTesting(
+      sync_clock_snapshots);
+  ASSERT_EQ(2u, clock_offsets.size());
+  // Client 10000      20000
+  // Host     120000     140000
+  // Estimated offsets: (10000 + 20000)/2 - 120000 = -105000,
+  //                    20000 - (120000 + 140000) / 2 = -110000.
+  // Average = -107500.
+  ASSERT_EQ(-107500, clock_offsets[BOOTTIME]);
+  // Client 25000      35000
+  // Host     135000     150000
+  // Estimated offsets: (25000 + 35000)/2 - 135000 = -105000,
+  //                    35000 - (135000 + 150000) / 2 = -107500.
+  // Average = -106250.
+  ASSERT_EQ(-106250, clock_offsets[REALTIME]);
+}
+
+TEST_F(ProtoTraceReaderTest, CalculateClockOffset_AboveThreshold) {
+  std::vector<ProtoTraceReader::SyncClockSnapshots> sync_clock_snapshots;
+  ProtoTraceReader::SyncClockSnapshots snapshots;
+  snapshots[BOOTTIME] = {120000, 10000};
+  snapshots[REALTIME] = {135000, 25000};
+  sync_clock_snapshots.push_back(std::move(snapshots));
+
+  // 30 sec interval: the 2 clock snapshots will be considered 2 different
+  // rounds of clock synchronization IPC exchange and won't be used.
+  auto interval = 30ull * 1000 * 1000 * 1000;
+  snapshots[BOOTTIME] = {120000 + interval, 10000 + interval};
+  snapshots[REALTIME] = {135000 + interval, 25000 + interval};
+  sync_clock_snapshots.push_back(std::move(snapshots));
+
+  auto clock_offsets = proto_trace_reader_->CalculateClockOffsetsForTesting(
+      sync_clock_snapshots);
+  ASSERT_EQ(0u, clock_offsets.size());
+}
+
+TEST_F(ProtoTraceReaderTest, CalculateClockOffset_MultiRounds) {
+  std::vector<ProtoTraceReader::SyncClockSnapshots> sync_clock_snapshots;
+  ProtoTraceReader::SyncClockSnapshots snapshots;
+  // This emits clock offsets -105000, -110000.
+  snapshots[BOOTTIME] = {120000, 10000};
+  sync_clock_snapshots.push_back(std::move(snapshots));
+  snapshots[BOOTTIME] = {140000, 20000};
+  sync_clock_snapshots.push_back(std::move(snapshots));
+
+  // The interval works as a delimeter of IPC exchange.
+  auto interval = 30ull * 1000 * 1000 * 1000;
+
+  // This emits clock offsets: (30000 + 45000) / 2 - 160000 = -122500,
+  //                           45000 - (160000 + 170000) / 2 = -120000.
+  snapshots[BOOTTIME] = {160000 + interval, 30000 + interval};
+  sync_clock_snapshots.push_back(std::move(snapshots));
+  snapshots[BOOTTIME] = {170000 + interval, 45000 + interval};
+  sync_clock_snapshots.push_back(std::move(snapshots));
+
+  auto clock_offsets = proto_trace_reader_->CalculateClockOffsetsForTesting(
+      sync_clock_snapshots);
+  ASSERT_EQ(1u, clock_offsets.size());
+  // Average(-105000, -110000, -122500, -120000) = -114375.
+  ASSERT_EQ(-114375, clock_offsets[BOOTTIME]);
+}
+
+}  // namespace
+}  // namespace perfetto::trace_processor
diff --git a/test/data/multi_machine_trace.pb.sha256 b/test/data/multi_machine_trace.pb.sha256
new file mode 100644
index 0000000..2e7c04c
--- /dev/null
+++ b/test/data/multi_machine_trace.pb.sha256
@@ -0,0 +1 @@
+366c817527d84de98bac52943381965425fd959fa1c824dd856237a4cb3c9722
\ No newline at end of file
diff --git a/test/trace_processor/diff_tests/parser/parsing/tests.py b/test/trace_processor/diff_tests/parser/parsing/tests.py
index f798174..5ebee2d 100644
--- a/test/trace_processor/diff_tests/parser/parsing/tests.py
+++ b/test/trace_processor/diff_tests/parser/parsing/tests.py
@@ -1485,3 +1485,71 @@
       0,0,"[NULL]","AArch64 Processor rev 13 (aarch64)"
       1,0,"[NULL]","AArch64 Processor rev 13 (aarch64)"
       """))
+
+  # Test that the sched slices of a VM guest is ingested and not filtered
+  # because timestamp is far before the tracing session.
+  def test_sched_remote_clock_sync(self):
+    return DiffTestBlueprint(
+        trace=DataPath('multi_machine_trace.pb'),
+        query="""
+        SELECT ts, cpu.cpu, thread.name, thread.tid
+        FROM sched JOIN cpu USING(ucpu) JOIN thread USING(utid)
+        WHERE cpu.machine_id IS NOT NULL LIMIT 10
+        """,
+        out=Csv("""
+        "ts","cpu","name","tid"
+        5230310112669,5,"kworker/5:7",32536
+        5230310132355,5,"swapper",0
+        5230310284063,4,"traced_probes",550
+        5230310421518,1,"swapper",0
+        5230310428373,3,"swapper",0
+        5230310587630,1,"rcuog/4",49
+        5230310590258,3,"logd.klogd",246
+        5230310592868,1,"swapper",0
+        5230310659357,3,"swapper",0
+        5230310671279,5,"traced_relay",25171
+        """))
+
+  # A query that selects the sched slices of a host vcpu thread and the guest
+  # sched slices. If remote clock sync works, guest sched slices should not be
+  # far off from host vcpu slices, and the query should return both host and
+  # guest slices.
+  def test_sched_remote_clock_sync_vcpu0(self):
+    return DiffTestBlueprint(
+        trace=DataPath('multi_machine_trace.pb'),
+        query="""
+        SELECT ts, cpu.cpu, utid, machine_id
+        FROM sched JOIN cpu USING (ucpu)
+        WHERE ucpu = 4096
+        UNION
+        SELECT ts, cpu.cpu, utid, machine_id
+        FROM sched JOIN cpu USING (ucpu)
+        WHERE utid = (
+          SELECT utid
+          FROM thread
+          WHERE name = 'crosvm_vcpu0')
+        LIMIT 20
+        """,
+        out=Csv("""
+        "ts","cpu","utid","machine_id"
+        5230311628979,0,1,1
+        5230315517287,0,11,1
+        5230315524649,0,1,1
+        5230315676788,0,10,1
+        5230315684911,0,1,1
+        5230319663217,0,10,1
+        5230319684310,0,1,1
+        5230323692459,0,10,1
+        5230323726976,0,11,1
+        5230323764556,0,1,1
+        5230327702466,0,10,1
+        5230327736100,0,1,1
+        5230331761483,0,10,1
+        5230331800905,0,11,1
+        5230331837332,0,1,1
+        5230421799455,0,10,1
+        5230421810047,0,1,1
+        5230422048874,0,1306,"[NULL]"
+        5230422153284,0,1306,"[NULL]"
+        5230425693562,0,10,1
+        """))