Merge "tp: return an explicit error if we failed to build the flamegraph" into main
diff --git a/BUILD.gn b/BUILD.gn
index cff5360..0a93081 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -39,6 +39,9 @@
   if (enable_perfetto_traced_probes) {
     all_targets += [ "src/traced/probes:traced_probes" ]
   }
+  if (enable_perfetto_traced_relay) {
+    all_targets += [ "src/traced_relay:traced_relay" ]
+  }
 }
 
 if (enable_perfetto_trace_processor && enable_perfetto_trace_processor_sqlite) {
diff --git a/gn/perfetto.gni b/gn/perfetto.gni
index b23aa06..ecc7252 100644
--- a/gn/perfetto.gni
+++ b/gn/perfetto.gni
@@ -266,6 +266,10 @@
   # extremely low.
   enable_perfetto_traced_probes = enable_perfetto_platform_services && !is_win
 
+  # The relay service is enabled when platform services are enabled.
+  # TODO(chinglinyu) check if we can enable on Windows.
+  enable_perfetto_traced_relay = enable_perfetto_platform_services && !is_win
+
   # Whether info-level logging is enabled.
   perfetto_verbose_logs_enabled =
       !build_with_chromium || perfetto_force_dlog == "on"
diff --git a/include/perfetto/ext/base/file_utils.h b/include/perfetto/ext/base/file_utils.h
index d2412d0..42e9a28e 100644
--- a/include/perfetto/ext/base/file_utils.h
+++ b/include/perfetto/ext/base/file_utils.h
@@ -93,6 +93,11 @@
 base::Status ListFilesRecursive(const std::string& dir_path,
                                 std::vector<std::string>& output);
 
+// Sets |path|'s owner group to |group_name| (a group name or numeric GID) and
+// permission mode bits to |mode_bits| (a 4-digit octal string, e.g. "0660").
+base::Status SetFilePermissions(const std::string& path,
+                                const std::string& group_name,
+                                const std::string& mode_bits);
 }  // namespace base
 }  // namespace perfetto
 
diff --git a/include/perfetto/ext/tracing/ipc/default_socket.h b/include/perfetto/ext/tracing/ipc/default_socket.h
index c173ebf..86d7b78 100644
--- a/include/perfetto/ext/tracing/ipc/default_socket.h
+++ b/include/perfetto/ext/tracing/ipc/default_socket.h
@@ -31,6 +31,11 @@
     const char* producer_socket_names);
 PERFETTO_EXPORT_COMPONENT const char* GetProducerSocket();
 
+// Optionally returns the relay socket name (nullable). The relay socket is used
+// for forwarding the IPC messages between the local producers and the remote
+// tracing service.
+PERFETTO_EXPORT_COMPONENT const char* GetRelaySocket();
+
 }  // namespace perfetto
 
 #endif  // INCLUDE_PERFETTO_EXT_TRACING_IPC_DEFAULT_SOCKET_H_
diff --git a/protos/perfetto/ipc/wire_protocol.proto b/protos/perfetto/ipc/wire_protocol.proto
index 038d4bb..8117316 100644
--- a/protos/perfetto/ipc/wire_protocol.proto
+++ b/protos/perfetto/ipc/wire_protocol.proto
@@ -65,6 +65,17 @@
   // Host -> Client.
   message RequestError { optional string error = 1; }
 
+  // Client (relay service) -> Host. This is generated by the relay service to
+  // fill the producer identity in the guest. This message is sent to the host
+  // service *before* any IPCFrame from a local producer is relayed. This is
+  // accepted only on AF_VSOCK and AF_INET sockets, where we cannot validate the
+  // endpoint of the connection. For AF_UNIX sockets, this is ignored and traced
+  // uses SO_PEERCRED instead.
+  message SetPeerIdentity {
+    optional int32 pid = 1;
+    optional int32 uid = 2;
+  }
+
   // The client is expected to send requests with monotonically increasing
   // request_id. The host will match the request_id sent from the client.
   // In the case of a Streaming response (has_more = true) the host will send
@@ -77,6 +88,7 @@
     InvokeMethod msg_invoke_method = 5;
     InvokeMethodReply msg_invoke_method_reply = 6;
     RequestError msg_request_error = 7;
+    SetPeerIdentity set_peer_identity = 8;
   }
 
   // Used only in unittests to generate a parsable message of arbitrary size.
diff --git a/src/base/file_utils.cc b/src/base/file_utils.cc
index e7d4c94..0969515 100644
--- a/src/base/file_utils.cc
+++ b/src/base/file_utils.cc
@@ -26,11 +26,13 @@
 #include <vector>
 
 #include "perfetto/base/build_config.h"
+#include "perfetto/base/compiler.h"
 #include "perfetto/base/logging.h"
 #include "perfetto/base/platform_handle.h"
 #include "perfetto/base/status.h"
 #include "perfetto/ext/base/platform.h"
 #include "perfetto/ext/base/scoped_file.h"
+#include "perfetto/ext/base/string_utils.h"
 #include "perfetto/ext/base/utils.h"
 
 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
@@ -43,6 +45,17 @@
 #include <unistd.h>
 #endif
 
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) ||   \
+    PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) || \
+    PERFETTO_BUILDFLAG(PERFETTO_OS_APPLE)
+#define PERFETTO_SET_FILE_PERMISSIONS
+#include <fcntl.h>
+#include <grp.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+#endif
+
 namespace perfetto {
 namespace base {
 namespace {
@@ -346,5 +359,54 @@
   return filename.substr(ext_idx);
 }
 
+// Sets the owner of |file_path| to the effective UID and |group_name_or_id| (a
+// group name or numeric GID), then chmods it to the octal string |mode_bits|.
+base::Status SetFilePermissions(const std::string& file_path,
+                                const std::string& group_name_or_id,
+                                const std::string& mode_bits) {
+#ifdef PERFETTO_SET_FILE_PERMISSIONS
+  PERFETTO_CHECK(!file_path.empty());
+  PERFETTO_CHECK(!group_name_or_id.empty());
+  // Default |group_id| to -1 for not changing the group ownership.
+  gid_t group_id = static_cast<gid_t>(-1);
+  auto maybe_group_id = base::StringToUInt32(group_name_or_id);
+  if (maybe_group_id) {  // A numerical group ID.
+    group_id = *maybe_group_id;
+  } else {  // A group name.
+    struct group* file_group = nullptr;
+    // Query the group ID of |group_name_or_id|, retrying if interrupted.
+    do {
+      errno = 0;  // A failed lookup may leave errno untouched (stale EINTR).
+      file_group = getgrnam(group_name_or_id.c_str());
+    } while (file_group == nullptr && errno == EINTR);
+    if (file_group == nullptr) {
+      return base::ErrStatus("Failed to get group information of %s ",
+                             group_name_or_id.c_str());
+    }
+    group_id = file_group->gr_gid;
+  }
+  if (PERFETTO_EINTR(chown(file_path.c_str(), geteuid(), group_id))) {
+    return base::ErrStatus("Failed to chown %s ", file_path.c_str());
+  }
+  // |mode_bits| accepts values like "0660" as "rw-rw----" mode bits.
+  auto mode_value = base::StringToInt32(mode_bits, 8);
+  if (!(mode_bits.size() == 4 && mode_value.has_value())) {
+    return base::ErrStatus(
+        "The chmod mode bits must be a 4-digit octal number, e.g. 0660");
+  }
+  if (PERFETTO_EINTR(
+          chmod(file_path.c_str(), static_cast<mode_t>(mode_value.value())))) {
+    return base::ErrStatus("Failed to chmod %s", file_path.c_str());
+  }
+  return base::OkStatus();
+#else
+  base::ignore_result(file_path);
+  base::ignore_result(group_name_or_id);
+  base::ignore_result(mode_bits);
+  return base::ErrStatus(
+      "Setting file permissions is not supported on this platform");
+#endif
+}
+
 }  // namespace base
 }  // namespace perfetto
diff --git a/src/ipc/BUILD.gn b/src/ipc/BUILD.gn
index 39843f1..1cd1a8d 100644
--- a/src/ipc/BUILD.gn
+++ b/src/ipc/BUILD.gn
@@ -141,6 +141,7 @@
 perfetto_component("perfetto_ipc") {
   public_deps = [
     ":client",
+    ":common",
     ":host",
     "../../gn:default_deps",
   ]
diff --git a/src/ipc/host_impl.cc b/src/ipc/host_impl.cc
index 0b852c0..029b450 100644
--- a/src/ipc/host_impl.cc
+++ b/src/ipc/host_impl.cc
@@ -21,9 +21,12 @@
 #include <utility>
 
 #include "perfetto/base/build_config.h"
+#include "perfetto/base/logging.h"
 #include "perfetto/base/task_runner.h"
 #include "perfetto/base/time.h"
 #include "perfetto/ext/base/crash_keys.h"
+#include "perfetto/ext/base/sys_types.h"
+#include "perfetto/ext/base/unix_socket.h"
 #include "perfetto/ext/base/utils.h"
 #include "perfetto/ext/ipc/service.h"
 #include "perfetto/ext/ipc/service_descriptor.h"
@@ -41,8 +44,9 @@
     kUseTCPSocket ? base::SockFamily::kInet : base::SockFamily::kUnix;
 
 base::CrashKey g_crash_key_uid("ipc_uid");
+}  // namespace
 
-uid_t GetPosixPeerUid(base::UnixSocket* sock) {
+uid_t HostImpl::ClientConnection::GetPosixPeerUid() const {
 #if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) ||   \
     PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) || \
     PERFETTO_BUILDFLAG(PERFETTO_OS_APPLE)
@@ -50,22 +54,23 @@
     return sock->peer_uid_posix();
 #endif
 
-  // Unsupported. Must be != kInvalidUid or the PacketValidator will fail.
-  base::ignore_result(sock);
+  // For non-unix sockets, check if the UID is set in OnSetPeerIdentity().
+  if (uid_override != base::kInvalidUid)
+    return uid_override;
+  // Must be != kInvalidUid or the PacketValidator will fail.
   return 0;
 }
 
-pid_t GetLinuxPeerPid(base::UnixSocket* sock) {
+pid_t HostImpl::ClientConnection::GetLinuxPeerPid() const {
 #if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
     PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
   if (sock->family() == base::SockFamily::kUnix)
     return sock->peer_pid_linux();
 #endif
-  base::ignore_result(sock);
-  return base::kInvalidPid;  // Unsupported.
-}
 
-}  // namespace
+  // For non-unix sockets, return the PID set in OnSetPeerIdentity().
+  return pid_override;
+}
 
 // static
 std::unique_ptr<Host> Host::CreateInstance(const char* socket_name,
@@ -179,7 +184,7 @@
   ClientConnection* client = it->second;
   BufferedFrameDeserializer& frame_deserializer = client->frame_deserializer;
 
-  auto peer_uid = GetPosixPeerUid(client->sock.get());
+  auto peer_uid = client->GetPosixPeerUid();
   auto scoped_key = g_crash_key_uid.SetScoped(static_cast<int64_t>(peer_uid));
 
   size_t rsize;
@@ -209,6 +214,8 @@
     return OnBindService(client, req_frame);
   if (req_frame.has_msg_invoke_method())
     return OnInvokeMethod(client, req_frame);
+  if (req_frame.has_set_peer_identity())
+    return OnSetPeerIdentity(client, req_frame);
 
   PERFETTO_DLOG("Received invalid RPC frame from client %" PRIu64, client->id);
   Frame reply_frame;
@@ -276,16 +283,35 @@
     });
   }
 
-  auto peer_uid = GetPosixPeerUid(client->sock.get());
+  auto peer_uid = client->GetPosixPeerUid();
   auto scoped_key = g_crash_key_uid.SetScoped(static_cast<int64_t>(peer_uid));
   service->client_info_ =
-      ClientInfo(client->id, peer_uid, GetLinuxPeerPid(client->sock.get()));
+      ClientInfo(client->id, peer_uid, client->GetLinuxPeerPid());
   service->received_fd_ = &client->received_fd;
   method.invoker(service, *decoded_req_args, std::move(deferred_reply));
   service->received_fd_ = nullptr;
   service->client_info_ = ClientInfo();
 }
 
+void HostImpl::OnSetPeerIdentity(ClientConnection* client,
+                                 const Frame& req_frame) {
+  if (client->sock->family() == base::SockFamily::kUnix) {
+    PERFETTO_DLOG("SetPeerIdentity is ignored for unix socket connections.");
+    return;
+  }
+
+  // This can only be set once by the relay service.
+  if (client->pid_override != base::kInvalidPid ||
+      client->uid_override != base::kInvalidUid) {
+    PERFETTO_DLOG("Already received SetPeerIdentity.");
+    return;
+  }
+
+  client->pid_override = req_frame.set_peer_identity().pid();
+  client->uid_override =
+      static_cast<uid_t>(req_frame.set_peer_identity().uid());
+}
+
 void HostImpl::ReplyToMethodInvocation(ClientID client_id,
                                        RequestID request_id,
                                        AsyncResult<ProtoMessage> reply) {
@@ -312,7 +338,7 @@
 
 // static
 void HostImpl::SendFrame(ClientConnection* client, const Frame& frame, int fd) {
-  auto peer_uid = GetPosixPeerUid(client->sock.get());
+  auto peer_uid = client->GetPosixPeerUid();
   auto scoped_key = g_crash_key_uid.SetScoped(static_cast<int64_t>(peer_uid));
 
   std::string buf = BufferedFrameDeserializer::Serialize(frame);
@@ -345,10 +371,11 @@
   auto it = clients_by_socket_.find(sock);
   if (it == clients_by_socket_.end())
     return;
-  ClientID client_id = it->second->id;
+  auto* client = it->second;
+  ClientID client_id = client->id;
 
-  ClientInfo client_info(client_id, GetPosixPeerUid(sock),
-                         GetLinuxPeerPid(sock));
+  ClientInfo client_info(client_id, client->GetPosixPeerUid(),
+                         client->GetLinuxPeerPid());
   clients_by_socket_.erase(it);
   PERFETTO_DCHECK(clients_.count(client_id));
   clients_.erase(client_id);
diff --git a/src/ipc/host_impl.h b/src/ipc/host_impl.h
index 788b81a..8738459 100644
--- a/src/ipc/host_impl.h
+++ b/src/ipc/host_impl.h
@@ -24,6 +24,7 @@
 
 #include "perfetto/base/task_runner.h"
 #include "perfetto/ext/base/scoped_file.h"
+#include "perfetto/ext/base/sys_types.h"
 #include "perfetto/ext/base/thread_checker.h"
 #include "perfetto/ext/base/unix_socket.h"
 #include "perfetto/ext/ipc/deferred.h"
@@ -66,6 +67,14 @@
     BufferedFrameDeserializer frame_deserializer;
     base::ScopedFile received_fd;
     std::function<bool(int)> send_fd_cb_fuchsia;
+    // Peer identity set using IPCFrame sent by the client. These 2 fields
+    // should be used only for non-AF_UNIX connections. AF_UNIX connections
+    // should only rely on the peer identity obtained from the socket.
+    uid_t uid_override = base::kInvalidUid;
+    pid_t pid_override = base::kInvalidPid;
+
+    pid_t GetLinuxPeerPid() const;
+    uid_t GetPosixPeerUid() const;
   };
   struct ExposedService {
     ExposedService(ServiceID, const std::string&, std::unique_ptr<Service>);
@@ -85,6 +94,8 @@
   void OnReceivedFrame(ClientConnection*, const Frame&);
   void OnBindService(ClientConnection*, const Frame&);
   void OnInvokeMethod(ClientConnection*, const Frame&);
+  void OnSetPeerIdentity(ClientConnection*, const Frame&);
+
   void ReplyToMethodInvocation(ClientID, RequestID, AsyncResult<ProtoMessage>);
   const ExposedService* GetServiceByName(const std::string&);
 
diff --git a/src/trace_processor/perfetto_sql/engine/perfetto_sql_engine.cc b/src/trace_processor/perfetto_sql/engine/perfetto_sql_engine.cc
index b34f556..1066192 100644
--- a/src/trace_processor/perfetto_sql/engine/perfetto_sql_engine.cc
+++ b/src/trace_processor/perfetto_sql/engine/perfetto_sql_engine.cc
@@ -91,6 +91,11 @@
   return status;
 }
 
+SqlSource RewriteToDummySql(const SqlSource& source) {
+  return source.RewriteAllIgnoreExisting(
+      SqlSource::FromTraceProcessorImplementation("SELECT 0 WHERE 0"));
+}
+
 }  // namespace
 
 PerfettoSqlEngine::PerfettoSqlEngine(StringPool* pool)
@@ -202,15 +207,13 @@
           RegisterRuntimeTable(cst->name, cst->sql), parser.statement_sql()));
       // Since the rest of the code requires a statement, just use a no-value
       // dummy statement.
-      source = parser.statement_sql().FullRewrite(
-          SqlSource::FromTraceProcessorImplementation("SELECT 0 WHERE 0"));
+      source = RewriteToDummySql(parser.statement_sql());
     } else if (auto* include = std::get_if<PerfettoSqlParser::Include>(
                    &parser.statement())) {
       RETURN_IF_ERROR(ExecuteInclude(*include, parser));
       // Since the rest of the code requires a statement, just use a no-value
       // dummy statement.
-      source = parser.statement_sql().FullRewrite(
-          SqlSource::FromTraceProcessorImplementation("SELECT 0 WHERE 0"));
+      source = RewriteToDummySql(parser.statement_sql());
     } else {
       // If none of the above matched, this must just be an SQL statement
       // directly executable by SQLite.
@@ -450,8 +453,7 @@
 
     // Since the rest of the code requires a statement, just use a no-value
     // dummy statement.
-    return parser.statement_sql().FullRewrite(
-        SqlSource::FromTraceProcessorImplementation("SELECT 0 WHERE 0"));
+    return RewriteToDummySql(parser.statement_sql());
   }
 
   RuntimeTableFunction::State state{cf.prototype, cf.sql, {}, {}, std::nullopt};
@@ -565,7 +567,7 @@
 
   base::StackString<1024> create(
       "CREATE VIRTUAL TABLE %s USING runtime_table_function", fn_name.c_str());
-  return cf.sql.FullRewrite(
+  return cf.sql.RewriteAllIgnoreExisting(
       SqlSource::FromTraceProcessorImplementation(create.ToStdString()));
 }
 
diff --git a/src/trace_processor/sqlite/sql_source.cc b/src/trace_processor/sqlite/sql_source.cc
index 5e12cd1..449e638 100644
--- a/src/trace_processor/sqlite/sql_source.cc
+++ b/src/trace_processor/sqlite/sql_source.cc
@@ -24,6 +24,7 @@
 #include <string>
 #include <string_view>
 #include <utility>
+#include <vector>
 
 #include "perfetto/base/logging.h"
 #include "perfetto/ext/base/string_utils.h"
@@ -140,9 +141,15 @@
   return source;
 }
 
-SqlSource SqlSource::FullRewrite(SqlSource source) const {
-  SqlSource::Rewriter rewriter(*this);
-  rewriter.Rewrite(0, static_cast<uint32_t>(sql().size()), source);
+SqlSource SqlSource::RewriteAllIgnoreExisting(SqlSource source) const {
+  // Reset any rewrites.
+  SqlSource copy = *this;
+  copy.root_.rewritten_sql = copy.root_.original_sql;
+  copy.root_.rewrites.clear();
+
+  SqlSource::Rewriter rewriter(std::move(copy));
+  rewriter.Rewrite(0, static_cast<uint32_t>(root_.original_sql.size()),
+                   std::move(source));
   return std::move(rewriter).Build();
 }
 
@@ -162,6 +169,7 @@
 }
 
 std::string SqlSource::Node::AsTraceback(uint32_t rewritten_offset) const {
+  PERFETTO_CHECK(rewritten_offset <= rewritten_sql.size());
   uint32_t original_offset = RewrittenOffsetToOriginalOffset(rewritten_offset);
   std::string res = SelfTraceback(original_offset);
   if (auto opt_idx = RewriteForOriginalOffset(original_offset); opt_idx) {
@@ -175,6 +183,7 @@
 }
 
 std::string SqlSource::Node::SelfTraceback(uint32_t original_offset) const {
+  PERFETTO_DCHECK(original_offset <= original_sql.size());
   auto [o_context, o_caret_pos] =
       SqlContextAndCaretPos(original_sql, original_offset);
   std::string header;
@@ -193,17 +202,54 @@
 }
 
 SqlSource::Node SqlSource::Node::Substr(uint32_t offset, uint32_t len) const {
-  PERFETTO_CHECK(rewrites.empty());
-  auto line_and_col =
-      GetLineAndColumnForOffset(rewritten_sql, line, col, offset);
+  uint32_t offset_end = offset + len;
+  PERFETTO_CHECK(offset_end <= rewritten_sql.size());
+
+  uint32_t original_offset_start = RewrittenOffsetToOriginalOffset(offset);
+  uint32_t original_offset_end = RewrittenOffsetToOriginalOffset(offset_end);
+  std::vector<Rewrite> new_rewrites;
+  for (const Rewrite& rewrite : rewrites) {
+    if (offset >= rewrite.rewritten_sql_end) {
+      continue;
+    }
+    if (offset_end < rewrite.rewritten_sql_start) {
+      break;
+    }
+    // Special case: when the end of the substr is in the middle of a rewrite,
+    // we actually want to capture the original SQL up to the end of the
+    // rewrite, not just to the start as |ChildRewrittenOffset| returns.
+    if (offset_end < rewrite.rewritten_sql_end) {
+      original_offset_end = rewrite.original_sql_end;
+    }
+    uint32_t bounded_start = std::max(offset, rewrite.rewritten_sql_start);
+    uint32_t bounded_end = std::min(offset_end, rewrite.rewritten_sql_end);
+
+    uint32_t nested_start = bounded_start - rewrite.rewritten_sql_start;
+    uint32_t nested_len = bounded_end - bounded_start;
+
+    new_rewrites.push_back(Rewrite{
+        rewrite.original_sql_start - original_offset_start,
+        rewrite.original_sql_end - original_offset_start,
+        bounded_start - offset,
+        bounded_end - offset,
+        rewrite.rewrite_node.Substr(nested_start, nested_len),
+    });
+  }
+  std::string new_original = original_sql.substr(
+      original_offset_start, original_offset_end - original_offset_start);
+  std::string new_rewritten = rewritten_sql.substr(offset, len);
+  PERFETTO_DCHECK(ApplyRewrites(new_original, new_rewrites) == new_rewritten);
+
+  auto line_and_col = GetLineAndColumnForOffset(rewritten_sql, line, col,
+                                                original_offset_start);
   return Node{
       name,
       include_traceback_header,
       line_and_col.first,
       line_and_col.second,
-      original_sql.substr(offset, len),
-      {},
-      rewritten_sql.substr(offset, len),
+      new_original,
+      std::move(new_rewrites),
+      new_rewritten,
   };
 }
 
diff --git a/src/trace_processor/sqlite/sql_source.h b/src/trace_processor/sqlite/sql_source.h
index b4b9870..a91ce50 100644
--- a/src/trace_processor/sqlite/sql_source.h
+++ b/src/trace_processor/sqlite/sql_source.h
@@ -78,15 +78,12 @@
   // at |offset| with |len| characters.
   SqlSource Substr(uint32_t offset, uint32_t len) const;
 
-  // Creates a SqlSource instance with the execution SQL rewritten to
-  // |rewrite_sql| but preserving the context from |this|.
+  // Rewrites the SQL backing |this| to SQL from |source| ignoring any existing
+  // rewrites in |this|.
   //
   // This is useful when PerfettoSQL statements are transpiled into SQLite
   // statements but we want to preserve the context of the original statement.
-  //
-  // Note: this function should only be called if |this| has not already been
-  // rewritten (i.e. it is undefined behaviour if |IsRewritten()| returns true).
-  SqlSource FullRewrite(SqlSource) const;
+  SqlSource RewriteAllIgnoreExisting(SqlSource source) const;
 
   // Returns the SQL string backing this SqlSource instance;
   const std::string& sql() const { return root_.rewritten_sql; }
diff --git a/src/trace_processor/sqlite/sql_source_unittest.cc b/src/trace_processor/sqlite/sql_source_unittest.cc
index 945144e..ef8500b 100644
--- a/src/trace_processor/sqlite/sql_source_unittest.cc
+++ b/src/trace_processor/sqlite/sql_source_unittest.cc
@@ -53,10 +53,10 @@
             "          ^\n");
 }
 
-TEST(SqlSourceTest, FullRewrite) {
+TEST(SqlSourceTest, RewriteAllIgnoreExisting) {
   SqlSource source =
       SqlSource::FromExecuteQuery("macro!()")
-          .FullRewrite(SqlSource::FromTraceProcessorImplementation(
+          .RewriteAllIgnoreExisting(SqlSource::FromTraceProcessorImplementation(
               "SELECT * FROM slice"));
   ASSERT_EQ(source.sql(), "SELECT * FROM slice");
 
@@ -81,12 +81,12 @@
 TEST(SqlSourceTest, NestedFullRewrite) {
   SqlSource nested =
       SqlSource::FromTraceProcessorImplementation("nested!()")
-          .FullRewrite(SqlSource::FromTraceProcessorImplementation(
+          .RewriteAllIgnoreExisting(SqlSource::FromTraceProcessorImplementation(
               "SELECT * FROM slice"));
   ASSERT_EQ(nested.sql(), "SELECT * FROM slice");
 
-  SqlSource source =
-      SqlSource::FromExecuteQuery("macro!()").FullRewrite(std::move(nested));
+  SqlSource source = SqlSource::FromExecuteQuery("macro!()")
+                         .RewriteAllIgnoreExisting(std::move(nested));
   ASSERT_EQ(source.sql(), "SELECT * FROM slice");
 
   ASSERT_EQ(source.AsTraceback(0),
@@ -113,6 +113,32 @@
             "           ^\n");
 }
 
+TEST(SqlSourceTest, RewriteAllIgnoresExistingCorrectly) {
+  SqlSource foo =
+      SqlSource::FromExecuteQuery("foo!()").RewriteAllIgnoreExisting(
+          SqlSource::FromTraceProcessorImplementation("SELECT * FROM slice"));
+  SqlSource source = foo.RewriteAllIgnoreExisting(
+      SqlSource::FromTraceProcessorImplementation("SELECT 0 WHERE 0"));
+  ASSERT_EQ(source.sql(), "SELECT 0 WHERE 0");
+
+  ASSERT_EQ(source.AsTraceback(0),
+            "Traceback (most recent call last):\n"
+            "  File \"stdin\" line 1 col 1\n"
+            "    foo!()\n"
+            "    ^\n"
+            "  Trace Processor Internal line 1 col 1\n"
+            "    SELECT 0 WHERE 0\n"
+            "    ^\n");
+  ASSERT_EQ(source.AsTraceback(4),
+            "Traceback (most recent call last):\n"
+            "  File \"stdin\" line 1 col 1\n"
+            "    foo!()\n"
+            "    ^\n"
+            "  Trace Processor Internal line 1 col 5\n"
+            "    SELECT 0 WHERE 0\n"
+            "        ^\n");
+}
+
 TEST(SqlSourceTest, Rewriter) {
   SqlSource::Rewriter rewriter(
       SqlSource::FromExecuteQuery("SELECT cols!() FROM slice"));
@@ -212,6 +238,91 @@
             "      ^\n");
 }
 
+TEST(SqlSourceTest, NestedRewriteSubstr) {
+  SqlSource::Rewriter nested_rewrite(
+      SqlSource::FromTraceProcessorImplementation(
+          "id, common_cols!(), other_cols!(), name"));
+  nested_rewrite.Rewrite(
+      4, 18, SqlSource::FromTraceProcessorImplementation("ts, dur"));
+  nested_rewrite.Rewrite(20, 33,
+                         SqlSource::FromTraceProcessorImplementation("depth"));
+
+  SqlSource::Rewriter rewriter(
+      SqlSource::FromExecuteQuery("SELECT cols!() FROM slice"));
+  rewriter.Rewrite(7, 14, std::move(nested_rewrite).Build());
+
+  SqlSource rewritten = std::move(rewriter).Build();
+  ASSERT_EQ(rewritten.sql(), "SELECT id, ts, dur, depth, name FROM slice");
+
+  // Full macro cover.
+  SqlSource cols = rewritten.Substr(7, 24);
+  ASSERT_EQ(cols.sql(), "id, ts, dur, depth, name");
+  ASSERT_EQ(cols.AsTraceback(0),
+            "Traceback (most recent call last):\n"
+            "  File \"stdin\" line 1 col 8\n"
+            "    cols!()\n"
+            "    ^\n"
+            "  Trace Processor Internal line 1 col 1\n"
+            "    id, common_cols!(), other_cols!(), name\n"
+            "    ^\n");
+  ASSERT_EQ(cols.AsTraceback(5),
+            "Traceback (most recent call last):\n"
+            "  File \"stdin\" line 1 col 8\n"
+            "    cols!()\n"
+            "    ^\n"
+            "  Trace Processor Internal line 1 col 5\n"
+            "    id, common_cols!(), other_cols!(), name\n"
+            "        ^\n"
+            "  Trace Processor Internal line 1 col 2\n"
+            "    ts, dur\n"
+            "     ^\n");
+  ASSERT_EQ(cols.AsTraceback(14),
+            "Traceback (most recent call last):\n"
+            "  File \"stdin\" line 1 col 8\n"
+            "    cols!()\n"
+            "    ^\n"
+            "  Trace Processor Internal line 1 col 21\n"
+            "    id, common_cols!(), other_cols!(), name\n"
+            "                        ^\n"
+            "  Trace Processor Internal line 1 col 2\n"
+            "    depth\n"
+            "     ^\n");
+
+  // Intersect with nested.
+  SqlSource intersect = rewritten.Substr(8, 13);
+  ASSERT_EQ(intersect.sql(), "d, ts, dur, d");
+  ASSERT_EQ(intersect.AsTraceback(0),
+            "Traceback (most recent call last):\n"
+            "  File \"stdin\" line 1 col 8\n"
+            "    cols!()\n"
+            "    ^\n"
+            "  Trace Processor Internal line 1 col 2\n"
+            "    d, common_cols!(), other_cols!()\n"
+            "    ^\n");
+  ASSERT_EQ(intersect.AsTraceback(4),
+            "Traceback (most recent call last):\n"
+            "  File \"stdin\" line 1 col 8\n"
+            "    cols!()\n"
+            "    ^\n"
+            "  Trace Processor Internal line 1 col 5\n"
+            "    d, common_cols!(), other_cols!()\n"
+            "       ^\n"
+            "  Trace Processor Internal line 1 col 2\n"
+            "    ts, dur\n"
+            "     ^\n");
+  ASSERT_EQ(intersect.AsTraceback(12),
+            "Traceback (most recent call last):\n"
+            "  File \"stdin\" line 1 col 8\n"
+            "    cols!()\n"
+            "    ^\n"
+            "  Trace Processor Internal line 1 col 21\n"
+            "    d, common_cols!(), other_cols!()\n"
+            "                       ^\n"
+            "  Trace Processor Internal line 1 col 1\n"
+            "    d\n"
+            "    ^\n");
+}
+
 }  // namespace
 }  // namespace trace_processor
 }  // namespace perfetto
diff --git a/src/traced/service/service.cc b/src/traced/service/service.cc
index edbc981..a9b6bb7 100644
--- a/src/traced/service/service.cc
+++ b/src/traced/service/service.cc
@@ -17,6 +17,7 @@
 #include <stdio.h>
 #include <algorithm>
 
+#include "perfetto/base/status.h"
 #include "perfetto/ext/base/file_utils.h"
 #include "perfetto/ext/base/getopt.h"
 #include "perfetto/ext/base/string_utils.h"
@@ -29,16 +30,6 @@
 #include "perfetto/ext/tracing/ipc/service_ipc_host.h"
 #include "src/traced/service/builtin_producer.h"
 
-#if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
-    PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
-#define PERFETTO_SET_SOCKET_PERMISSIONS
-#include <fcntl.h>
-#include <grp.h>
-#include <sys/stat.h>
-#include <sys/types.h>
-#include <unistd.h>
-#endif
-
 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
 #include <sys/system_properties.h>
 #endif
@@ -49,40 +40,6 @@
 
 namespace perfetto {
 namespace {
-#if defined(PERFETTO_SET_SOCKET_PERMISSIONS)
-void SetSocketPermissions(const std::string& socket_name,
-                          const std::string& group_name,
-                          const std::string& mode_bits) {
-  PERFETTO_CHECK(!socket_name.empty());
-  PERFETTO_CHECK(!group_name.empty());
-  struct group* socket_group = nullptr;
-  // Query the group ID of |group|.
-  do {
-    socket_group = getgrnam(group_name.c_str());
-  } while (socket_group == nullptr && errno == EINTR);
-  if (socket_group == nullptr) {
-    PERFETTO_FATAL("Failed to get group information of %s ",
-                   group_name.c_str());
-  }
-
-  if (PERFETTO_EINTR(
-          chown(socket_name.c_str(), geteuid(), socket_group->gr_gid))) {
-    PERFETTO_FATAL("Failed to chown %s ", socket_name.c_str());
-  }
-
-  // |mode| accepts values like "0660" as "rw-rw----" mode bits.
-  auto mode_value = base::StringToInt32(mode_bits, 8);
-  if (!(mode_bits.size() == 4 && mode_value.has_value())) {
-    PERFETTO_FATAL(
-        "The chmod option must be a 4-digit octal number, e.g. 0660");
-  }
-  if (PERFETTO_EINTR(chmod(socket_name.c_str(),
-                           static_cast<mode_t>(mode_value.value())))) {
-    PERFETTO_FATAL("Failed to chmod %s", socket_name.c_str());
-  }
-}
-#endif  // defined(PERFETTO_SET_SOCKET_PERMISSIONS)
-
 void PrintUsage(const char* prog_name) {
   fprintf(stderr, R"(
 Usage: %s [option] ...
@@ -192,18 +149,21 @@
     started = svc->Start(producer_sockets, GetConsumerSocket());
 
     if (!producer_socket_group.empty()) {
-#if defined(PERFETTO_SET_SOCKET_PERMISSIONS)
+      auto status = base::OkStatus();
       for (const auto& producer_socket : producer_sockets) {
-        SetSocketPermissions(producer_socket, producer_socket_group,
-                             producer_socket_mode);
+        status = base::SetFilePermissions(
+            producer_socket, producer_socket_group, producer_socket_mode);
+        if (!status.ok()) {
+          PERFETTO_ELOG("%s", status.c_message());
+          return 1;
+        }
       }
-      SetSocketPermissions(GetConsumerSocket(), consumer_socket_group,
-                           consumer_socket_mode);
-#else
-      PERFETTO_ELOG(
-          "Setting socket permissions is not supported on this platform");
-      return 1;
-#endif
+      status = base::SetFilePermissions(
+          GetConsumerSocket(), consumer_socket_group, consumer_socket_mode);
+      if (!status.ok()) {
+        PERFETTO_ELOG("%s", status.c_message());
+        return 1;
+      }
     }
   }
 
diff --git a/src/traced_relay/BUILD.gn b/src/traced_relay/BUILD.gn
new file mode 100644
index 0000000..d4e27cc
--- /dev/null
+++ b/src/traced_relay/BUILD.gn
@@ -0,0 +1,47 @@
+# Copyright (C) 2023 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import("../../gn/perfetto.gni")
+import("../../gn/perfetto_component.gni")
+
+executable("traced_relay") {
+  deps = [
+    ":lib",
+    "../../gn:default_deps",
+    "../../include/perfetto/ext/traced",
+    "../base",
+    "../base:unix_socket",
+    "../base:version",
+    "../ipc:perfetto_ipc",
+    "../tracing/ipc:default_socket",
+  ]
+  sources = [ "relay_service_main.cc" ]
+}
+
+source_set("lib") {
+  public_deps = [ "../../include/perfetto/ext/tracing/ipc" ]
+  sources = [
+    "relay_service.cc",
+    "relay_service.h",
+    "socket_relay_handler.cc",
+    "socket_relay_handler.h",
+  ]
+  deps = [
+    "../../gn:default_deps",
+    "../../protos/perfetto/ipc",
+    "../../protos/perfetto/ipc:wire_protocol_cpp",
+    "../base",
+    "//src/ipc:perfetto_ipc",
+  ]
+}
diff --git a/src/traced_relay/relay_service.cc b/src/traced_relay/relay_service.cc
new file mode 100644
index 0000000..4b1dc6b
--- /dev/null
+++ b/src/traced_relay/relay_service.cc
@@ -0,0 +1,123 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/traced_relay/relay_service.h"
+#include <memory>
+
+#include "perfetto/base/logging.h"
+#include "perfetto/base/task_runner.h"
+#include "perfetto/ext/base/unix_socket.h"
+#include "perfetto/ext/base/utils.h"
+#include "protos/perfetto/ipc/wire_protocol.gen.h"
+#include "src/ipc/buffered_frame_deserializer.h"
+#include "src/traced_relay/socket_relay_handler.h"
+
+using ::perfetto::protos::gen::IPCFrame;
+
+namespace perfetto {
+
+RelayService::RelayService(base::TaskRunner* task_runner)
+    : task_runner_(task_runner) {}
+
+void RelayService::Start(const char* listening_socket_name,
+                         const char* client_socket_name) {
+  auto sock_family = base::GetSockFamily(listening_socket_name);
+  listening_socket_ =
+      base::UnixSocket::Listen(listening_socket_name, this, task_runner_,
+                               sock_family, base::SockType::kStream);
+  bool producer_socket_listening =
+      listening_socket_ && listening_socket_->is_listening();
+  if (!producer_socket_listening) {
+    PERFETTO_FATAL("Failed to listen to socket %s", listening_socket_name);
+  }
+
+  // Save |client_socket_name| for opening new client connection to remote
+  // service when a local producer connects.
+  client_socket_name_ = client_socket_name;
+}
+
+void RelayService::OnNewIncomingConnection(
+    base::UnixSocket* listen_socket,
+    std::unique_ptr<base::UnixSocket> server_conn) {
+  PERFETTO_DCHECK(listen_socket == listening_socket_.get());
+
+  // Create a connection to the host to pair with |server_conn|.
+  auto sock_family = base::GetSockFamily(client_socket_name_.c_str());
+  auto client_conn =
+      base::UnixSocket::Connect(client_socket_name_, this, task_runner_,
+                                sock_family, base::SockType::kStream);
+
+  // Pre-queue the SetPeerIdentity request. By enqueueing it into the buffer,
+  // this will be sent out as the first frame as soon as we connect to the real
+  // traced.
+  //
+  // This code pretends that we received a SetPeerIdentity frame from the
+  // connecting producer (while instead we are just forging it). The host traced
+  // will accept only one SetPeerIdentity request: the one pre-queued here.
+  IPCFrame ipc_frame;
+  ipc_frame.set_request_id(0);
+  auto* set_peer_identity = ipc_frame.mutable_set_peer_identity();
+  set_peer_identity->set_pid(server_conn->peer_pid_linux());
+  set_peer_identity->set_uid(
+      static_cast<int32_t>(server_conn->peer_uid_posix()));
+
+  // Buffer the SetPeerIdentity request.
+  auto req = ipc::BufferedFrameDeserializer::Serialize(ipc_frame);
+  SocketWithBuffer server, client;
+  PERFETTO_CHECK(server.available_bytes() >= req.size());
+  memcpy(server.buffer(), req.data(), req.size());
+  server.EnqueueData(req.size());
+
+  // Shut down all callbacks associated with the socket in preparation for the
+  // transfer to |socket_relay_handler_|.
+  server.sock = server_conn->ReleaseSocket();
+  auto new_socket_pair =
+      std::make_unique<SocketPair>(std::move(server), std::move(client));
+  pending_connections_.push_back(
+      {std::move(new_socket_pair), std::move(client_conn)});
+}
+
+void RelayService::OnConnect(base::UnixSocket* self, bool connected) {
+  // This only happens when the client connection is connected or has failed.
+  auto it =
+      std::find_if(pending_connections_.begin(), pending_connections_.end(),
+                   [&](const PendingConnection& pending_conn) {
+                     return pending_conn.connecting_client_conn.get() == self;
+                   });
+  PERFETTO_CHECK(it != pending_connections_.end());
+  // Need to remove the element in |pending_connections_| regardless of
+  // |connected|.
+  auto remover = base::OnScopeExit([&]() { pending_connections_.erase(it); });
+
+  if (!connected)
+    return;  // This closes both sockets in PendingConnection.
+
+  // Shut down event handlers and pair with a server connection.
+  it->socket_pair->second.sock = self->ReleaseSocket();
+
+  // Transfer the socket pair to SocketRelayHandler.
+  socket_relay_handler_.AddSocketPair(std::move(it->socket_pair));
+}
+
+void RelayService::OnDisconnect(base::UnixSocket*) {
+  PERFETTO_DFATAL("Should be unreachable.");
+}
+
+void RelayService::OnDataAvailable(base::UnixSocket*) {
+  PERFETTO_DFATAL("Should be unreachable.");
+}
+
+}  // namespace perfetto
diff --git a/src/traced_relay/relay_service.h b/src/traced_relay/relay_service.h
new file mode 100644
index 0000000..8e5bf6d
--- /dev/null
+++ b/src/traced_relay/relay_service.h
@@ -0,0 +1,75 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACED_RELAY_RELAY_SERVICE_H_
+#define SRC_TRACED_RELAY_RELAY_SERVICE_H_
+
+#include <memory>
+#include <vector>
+
+#include "perfetto/ext/base/unix_socket.h"
+#include "src/traced_relay/socket_relay_handler.h"
+
+namespace perfetto {
+
+namespace base {
+class TaskRunner;
+}  // namespace base.
+
+// A class for relaying the producer data between the local producers and the
+// remote tracing service.
+class RelayService : public base::UnixSocket::EventListener {
+ public:
+  explicit RelayService(base::TaskRunner* task_runner);
+  ~RelayService() override = default;
+
+  // Starts the service relay that forwards messages between the
+  // |server_socket_name| and |client_socket_name| ports.
+  void Start(const char* server_socket_name, const char* client_socket_name);
+
+ private:
+  struct PendingConnection {
+    // This keeps a connected UnixSocketRaw server socket in its first element.
+    std::unique_ptr<SocketPair> socket_pair;
+    // This keeps the connecting client connection.
+    std::unique_ptr<base::UnixSocket> connecting_client_conn;
+  };
+
+  RelayService(const RelayService&) = delete;
+  RelayService& operator=(const RelayService&) = delete;
+
+  // UnixSocket::EventListener implementation.
+  void OnNewIncomingConnection(base::UnixSocket*,
+                               std::unique_ptr<base::UnixSocket>) override;
+  void OnConnect(base::UnixSocket* self, bool connected) override;
+  void OnDisconnect(base::UnixSocket* self) override;
+  void OnDataAvailable(base::UnixSocket* self) override;
+
+  base::TaskRunner* const task_runner_ = nullptr;
+
+  std::unique_ptr<base::UnixSocket> listening_socket_;
+  std::string client_socket_name_;
+
+  // Keeps the socket pairs while waiting for relay connections to be
+  // established.
+  std::vector<PendingConnection> pending_connections_;
+
+  SocketRelayHandler socket_relay_handler_;
+};
+
+}  // namespace perfetto
+
+#endif  // SRC_TRACED_RELAY_RELAY_SERVICE_H_
diff --git a/src/traced_relay/relay_service_main.cc b/src/traced_relay/relay_service_main.cc
new file mode 100644
index 0000000..9562b2d
--- /dev/null
+++ b/src/traced_relay/relay_service_main.cc
@@ -0,0 +1,134 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/ext/base/file_utils.h"
+#include "perfetto/ext/base/getopt.h"
+#include "perfetto/ext/base/string_utils.h"
+#include "perfetto/ext/base/unix_task_runner.h"
+#include "perfetto/ext/base/version.h"
+#include "perfetto/ext/base/watchdog.h"
+#include "perfetto/ext/traced/traced.h"
+#include "perfetto/ext/tracing/ipc/default_socket.h"
+#include "src/traced_relay/relay_service.h"
+
+namespace perfetto {
+namespace {
+void PrintUsage(const char* prog_name) {
+  fprintf(stderr, R"(
+Usage: %s [option] ...
+Options and arguments
+    --background : Exits immediately and continues running in the background
+    --version : print the version number and exit.
+    --set-socket-permissions <permissions> : sets group ownership and permission
+        mode bits of the listening socket.
+        <permissions> format: <prod_group>:<prod_mode>,
+        where <prod_group> is the group name for chgrp the listening socket,
+        <prod_mode> is the mode bits (e.g. 0660) for chmod the producer socket,
+
+Example:
+    %s --set-socket-permissions traced-producer:0660 starts the service and sets
+    the group ownership of the listening socket to "traced-producer". The
+    listening socket is chmod with 0660 (rw-rw----) mode bits. )",
+          prog_name, prog_name);
+}
+
+}  // namespace
+
+// Parses the command line, starts the relay service and runs its task runner.
+static int RelayServiceMain(int argc, char** argv) {
+  enum LongOption {
+    OPT_VERSION = 1000,
+    OPT_SET_SOCKET_PERMISSIONS = 1001,
+    OPT_BACKGROUND,
+  };
+
+  bool background = false;
+  static const option long_options[] = {
+      {"background", no_argument, nullptr, OPT_BACKGROUND},
+      {"version", no_argument, nullptr, OPT_VERSION},
+      {"set-socket-permissions", required_argument, nullptr,
+       OPT_SET_SOCKET_PERMISSIONS},
+      {nullptr, 0, nullptr, 0}};
+  std::string listen_socket_group, listen_socket_mode_bits;
+
+  for (;;) {
+    int option = getopt_long(argc, argv, "", long_options, nullptr);
+    if (option == -1)
+      break;
+    switch (option) {
+      case OPT_BACKGROUND:
+        background = true;
+        break;
+      case OPT_VERSION:
+        printf("%s\n", base::GetVersionString());
+        return 0;
+      case OPT_SET_SOCKET_PERMISSIONS: {
+        // Check that the socket permission argument is well formed.
+        auto parts = perfetto::base::SplitString(std::string(optarg), ":");
+        PERFETTO_CHECK(parts.size() == 2);
+        PERFETTO_CHECK(!parts[0].empty() && !parts[1].empty());
+        listen_socket_group = parts[0];
+        listen_socket_mode_bits = parts[1];
+        break;
+      }
+      default:
+        PrintUsage(argv[0]);
+        return 1;
+    }
+  }
+
+  // GetRelaySocket() is nullptr when PERFETTO_RELAY_SOCK_NAME is unset.
+  const char* relay_socket = GetRelaySocket();
+  if (!relay_socket) {
+    PERFETTO_ELOG("PERFETTO_RELAY_SOCK_NAME is not set");
+    return 1;
+  }
+
+  if (background)
+    base::Daemonize([] { return 0; });
+
+  auto listen_socket = GetProducerSocket();
+  remove(listen_socket);
+  base::UnixTaskRunner task_runner;
+  auto svc = std::make_unique<RelayService>(&task_runner);
+  svc->Start(listen_socket, relay_socket);
+
+  // Must follow Start(): the path removed above only exists again once bound.
+  if (!listen_socket_group.empty()) {
+    auto status = base::SetFilePermissions(listen_socket, listen_socket_group,
+                                           listen_socket_mode_bits);
+    if (!status.ok()) {
+      PERFETTO_ELOG("Failed to set socket permissions: %s", status.c_message());
+      return 1;
+    }
+  }
+
+  // Apply the generic CPU limit from watchdog.h and start the watchdog.
+  base::Watchdog* watchdog = base::Watchdog::GetInstance();
+  watchdog->SetCpuLimit(base::kWatchdogDefaultCpuLimit,
+                        base::kWatchdogDefaultCpuWindow);
+  watchdog->Start();
+
+  PERFETTO_ILOG("Started traced_relay, listening on %s, forwarding to %s",
+                listen_socket, relay_socket);
+  task_runner.Run();
+  return 0;
+}
+}  // namespace perfetto
+
+int main(int argc, char** argv) {
+  return perfetto::RelayServiceMain(argc, argv);
+}
diff --git a/src/traced_relay/socket_relay_handler.cc b/src/traced_relay/socket_relay_handler.cc
new file mode 100644
index 0000000..2930c86
--- /dev/null
+++ b/src/traced_relay/socket_relay_handler.cc
@@ -0,0 +1,287 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/traced_relay/socket_relay_handler.h"
+
+#include <fcntl.h>
+#include <sys/poll.h>
+#include <algorithm>
+#include <memory>
+#include <mutex>
+#include <thread>
+#include <utility>
+
+#include "perfetto/base/logging.h"
+#include "perfetto/base/platform_handle.h"
+#include "perfetto/ext/base/thread_checker.h"
+#include "perfetto/ext/base/utils.h"
+#include "perfetto/ext/base/watchdog.h"
+#include "perfetto/ext/base/watchdog_posix.h"
+
+namespace perfetto {
+namespace {
+// Use the default watchdog timeout for task runners.
+static constexpr int kWatchdogTimeoutMs = 30000;
+// Timeout of the poll() call.
+static constexpr int kPollTimeoutMs = 30000;
+}  // namespace
+
+FdPoller::Watcher::~Watcher() = default;
+
+FdPoller::FdPoller(Watcher* watcher) : watcher_(watcher) {
+  WatchForRead(notify_fd_.fd());
+
+  // This is done last in the ctor because WatchForRead() asserts using
+  // |thread_checker_|.
+  PERFETTO_DETACH_FROM_THREAD(thread_checker_);
+}
+
+void FdPoller::Poll() {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+
+  int num_fds =
+      PERFETTO_EINTR(poll(&poll_fds_[0], poll_fds_.size(), kPollTimeoutMs));
+  if (num_fds == -1 && base::IsAgain(errno))
+    return;  // Poll again.
+  PERFETTO_DCHECK(num_fds <= static_cast<int>(poll_fds_.size()));
+
+  // Iterate over a copy of |poll_fds_| so the watcher callbacks can safely
+  // watch and unwatch fds (which mutates |poll_fds_|) while being notified.
+  const auto poll_fds(poll_fds_);
+
+  for (const auto& event : poll_fds) {
+    if (!event.revents)  // This event isn't active.
+      continue;
+
+    // The notify fd only wakes up poll(): clear it, there is no one to notify.
+    if (event.fd == notify_fd_.fd()) {
+      notify_fd_.Clear();
+      continue;
+    }
+
+    // Dispatch to the watcher; POLLOUT is handled in preference to POLLIN.
+    if (event.revents & POLLOUT) {
+      watcher_->OnFdWritable(event.fd);
+    } else if (event.revents & POLLIN) {
+      watcher_->OnFdReadable(event.fd);
+    } else {
+      PERFETTO_DLOG("poll() returns events %d on fd %d", event.revents,
+                    event.fd);
+    }  // Other events like POLLHUP or POLLERR are only logged.
+  }
+}
+
+void FdPoller::Notify() {
+  // Can be called from any thread.
+  notify_fd_.Notify();
+}
+
+std::vector<pollfd>::iterator FdPoller::FindPollEvent(base::PlatformHandle fd) {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+
+  return std::find_if(poll_fds_.begin(), poll_fds_.end(),
+                      [fd](const pollfd& item) { return fd == item.fd; });
+}
+
+void FdPoller::WatchFd(base::PlatformHandle fd, WatchEvents events) {
+  auto it = FindPollEvent(fd);
+  if (it == poll_fds_.end()) {
+    poll_fds_.push_back({fd, events, 0});
+  } else {
+    it->events |= events;
+  }
+}
+
+void FdPoller::UnwatchFd(base::PlatformHandle fd, WatchEvents events) {
+  auto it = FindPollEvent(fd);
+  PERFETTO_CHECK(it != poll_fds_.end());
+  it->events &= ~events;
+}
+
+void FdPoller::RemoveWatch(base::PlatformHandle fd) {
+  auto it = FindPollEvent(fd);
+  PERFETTO_CHECK(it != poll_fds_.end());
+  poll_fds_.erase(it);
+}
+
+SocketRelayHandler::SocketRelayHandler() : fd_poller_(this) {
+  PERFETTO_DETACH_FROM_THREAD(io_thread_checker_);
+
+  io_thread_ = std::thread([this]() { this->Run(); });
+}
+
+SocketRelayHandler::~SocketRelayHandler() {
+  RunOnIOThread([this]() { this->exited_ = true; });
+  io_thread_.join();
+}
+
+void SocketRelayHandler::AddSocketPair(
+    std::unique_ptr<SocketPair> socket_pair) {
+  RunOnIOThread([this, socket_pair = std::move(socket_pair)]() mutable {
+    PERFETTO_DCHECK_THREAD(io_thread_checker_);
+
+    base::PlatformHandle fd1 = socket_pair->first.sock.fd();
+    base::PlatformHandle fd2 = socket_pair->second.sock.fd();
+    auto* ptr = socket_pair.get();
+    socket_pairs_.emplace_back(std::move(socket_pair));
+
+    fd_poller_.WatchForRead(fd1);
+    fd_poller_.WatchForRead(fd2);
+
+    socket_pairs_by_fd_[fd1] = ptr;
+    socket_pairs_by_fd_[fd2] = ptr;
+  });
+}
+
+void SocketRelayHandler::Run() {
+  PERFETTO_DCHECK_THREAD(io_thread_checker_);
+
+  while (!exited_) {
+    fd_poller_.Poll();
+
+    auto handle = base::Watchdog::GetInstance()->CreateFatalTimer(
+        kWatchdogTimeoutMs, base::WatchdogCrashReason::kTaskRunnerHung);
+
+    std::deque<std::packaged_task<void()>> pending_tasks;
+    {
+      std::lock_guard<std::mutex> lock(mutex_);
+      pending_tasks = std::move(pending_tasks_);
+    }
+    while (!pending_tasks.empty()) {
+      auto task = std::move(pending_tasks.front());
+      pending_tasks.pop_front();
+      task();
+    }
+  }
+}
+
+void SocketRelayHandler::OnFdReadable(base::PlatformHandle fd) {
+  PERFETTO_DCHECK_THREAD(io_thread_checker_);
+
+  auto socket_pair = GetSocketPair(fd);
+  if (!socket_pair)
+    return;  // Already removed.
+
+  auto [fd_sock, peer_sock] = *socket_pair;
+  // Buffer some bytes.
+  auto peer_fd = peer_sock.sock.fd();
+  while (fd_sock.available_bytes() > 0) {
+    auto rsize =
+        fd_sock.sock.Receive(fd_sock.buffer(), fd_sock.available_bytes());
+    if (rsize > 0) {
+      fd_sock.EnqueueData(static_cast<size_t>(rsize));
+      continue;
+    }
+
+    if (rsize == 0 || (rsize == -1 && !base::IsAgain(errno))) {
+      // TODO(chinglinyu): flush the remaining data to |peer_sock|.
+      RemoveSocketPair(fd_sock, peer_sock);
+      return;
+    }
+
+    // If there is any buffered data that needs to be sent to |peer_sock|, arm
+    // the write watcher.
+    if (fd_sock.data_size() > 0) {
+      fd_poller_.WatchForWrite(peer_fd);
+    }
+    return;
+  }
+  // We are not bufferable: need to turn off POLLIN to avoid spinning.
+  fd_poller_.UnwatchForRead(fd);
+  PERFETTO_DCHECK(fd_sock.data_size() > 0);
+  // Watching for POLLOUT will cause an OnFdWritable() event of
+  // |peer_sock|.
+  fd_poller_.WatchForWrite(peer_fd);
+}
+
+void SocketRelayHandler::OnFdWritable(base::PlatformHandle fd) {
+  PERFETTO_DCHECK_THREAD(io_thread_checker_);
+
+  auto socket_pair = GetSocketPair(fd);
+  if (!socket_pair)
+    return;  // Already removed.
+
+  auto [fd_sock, peer_sock] = *socket_pair;
+  // |fd_sock| can be written to without blocking. Now we can transfer from the
+  // buffer in |peer_sock|.
+  while (peer_sock.data_size() > 0) {
+    auto wsize = fd_sock.sock.Send(peer_sock.data(), peer_sock.data_size());
+    if (wsize > 0) {
+      peer_sock.DequeueData(static_cast<size_t>(wsize));
+      continue;
+    }
+
+    if (wsize == -1 && !base::IsAgain(errno)) {
+      RemoveSocketPair(fd_sock, peer_sock);
+    }
+    // errno == EAGAIN and we still have data to send: keep watching |fd| for
+    // write.
+    return;
+  }
+
+  // We don't have buffered data to send. Disable watching for write.
+  fd_poller_.UnwatchForWrite(fd);
+  auto peer_fd = peer_sock.sock.fd();
+  if (peer_sock.available_bytes())
+    fd_poller_.WatchForRead(peer_fd);
+}
+
+std::optional<std::tuple<SocketWithBuffer&, SocketWithBuffer&>>
+SocketRelayHandler::GetSocketPair(base::PlatformHandle fd) {
+  PERFETTO_DCHECK_THREAD(io_thread_checker_);
+
+  auto* socket_pair = socket_pairs_by_fd_.Find(fd);
+  if (!socket_pair)
+    return std::nullopt;
+
+  PERFETTO_DCHECK(fd == (*socket_pair)->first.sock.fd() ||
+                  fd == (*socket_pair)->second.sock.fd());
+
+  if (fd == (*socket_pair)->first.sock.fd())
+    return std::tie((*socket_pair)->first, (*socket_pair)->second);
+
+  return std::tie((*socket_pair)->second, (*socket_pair)->first);
+}
+
+void SocketRelayHandler::RemoveSocketPair(SocketWithBuffer& sock1,
+                                          SocketWithBuffer& sock2) {
+  PERFETTO_DCHECK_THREAD(io_thread_checker_);
+
+  auto fd1 = sock1.sock.fd();
+  auto fd2 = sock2.sock.fd();
+  fd_poller_.RemoveWatch(fd1);
+  fd_poller_.RemoveWatch(fd2);
+
+  auto* ptr1 = socket_pairs_by_fd_.Find(fd1);
+  auto* ptr2 = socket_pairs_by_fd_.Find(fd2);
+  PERFETTO_DCHECK(ptr1 && ptr2);
+  PERFETTO_DCHECK(*ptr1 == *ptr2);
+
+  auto* socket_pair_ptr = *ptr1;
+
+  socket_pairs_by_fd_.Erase(fd1);
+  socket_pairs_by_fd_.Erase(fd2);
+
+  socket_pairs_.erase(
+      std::remove_if(
+          socket_pairs_.begin(), socket_pairs_.end(),
+          [socket_pair_ptr](const std::unique_ptr<SocketPair>& item) {
+            return item.get() == socket_pair_ptr;
+          }),
+      socket_pairs_.end());
+}
+
+}  // namespace perfetto
diff --git a/src/traced_relay/socket_relay_handler.h b/src/traced_relay/socket_relay_handler.h
new file mode 100644
index 0000000..39579a9
--- /dev/null
+++ b/src/traced_relay/socket_relay_handler.h
@@ -0,0 +1,190 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACED_RELAY_SOCKET_RELAY_HANDLER_H_
+#define SRC_TRACED_RELAY_SOCKET_RELAY_HANDLER_H_
+
+#include <poll.h>
+
+#include <cstring>
+#include <deque>
+#include <future>
+#include <mutex>
+#include <optional>
+#include <thread>
+#include <tuple>
+
+#include "perfetto/base/platform_handle.h"
+#include "perfetto/ext/base/event_fd.h"
+#include "perfetto/ext/base/flat_hash_map.h"
+#include "perfetto/ext/base/thread_checker.h"
+#include "perfetto/ext/base/unix_socket.h"
+#include "perfetto/ext/ipc/basic_types.h"
+
+namespace perfetto {
+
+// FdPoller is a utility for waiting for IO events of a set of watched file
+// descriptors. It's used for multiplexing non-blocking IO operations.
+class FdPoller {
+ public:
+  // The interface class for observing IO events from the FdPoller class.
+  class Watcher {
+   public:
+    virtual ~Watcher();
+    // Called when |fd| can be read from without blocking. For a socket
+    // connection, this indicates the socket read buffer has some data.
+    virtual void OnFdReadable(base::PlatformHandle fd) = 0;
+    // Called when |fd| can be written to without blocking. For a socket
+    // connection, this indicates that the socket write buffer has some capacity
+    // for writing data into.
+    virtual void OnFdWritable(base::PlatformHandle fd) = 0;
+  };
+
+  using WatchEvents = decltype(pollfd::events);
+
+  explicit FdPoller(Watcher* watcher);
+
+  // Watch and unwatch IO event for a given file descriptor.
+  inline void WatchForRead(base::PlatformHandle fd) { WatchFd(fd, POLLIN); }
+  inline void WatchForWrite(base::PlatformHandle fd) { WatchFd(fd, POLLOUT); }
+  inline void UnwatchForRead(base::PlatformHandle fd) { UnwatchFd(fd, POLLIN); }
+  inline void UnwatchForWrite(base::PlatformHandle fd) {
+    UnwatchFd(fd, POLLOUT);
+  }
+
+  // Called when |fd| is no longer of interest (e.g. when |fd| is to be closed).
+  void RemoveWatch(base::PlatformHandle fd);
+
+  // Poll for all watched events previously added with WatchForRead() and
+  // WatchForWrite().
+  //
+  // Must be called on poller thread.
+  void Poll();
+
+  // Notifies the poller for pending updates. Calling Notify() will unblock the
+  // poller and make it return from Poll(). It is caller's responsibility to
+  // call Poll() again once the updates are complete.
+  //
+  // This can be (and typically is) called from any thread.
+  void Notify();
+
+ private:
+  std::vector<pollfd>::iterator FindPollEvent(base::PlatformHandle fd);
+  void WatchFd(base::PlatformHandle fd, WatchEvents events);
+  void UnwatchFd(base::PlatformHandle fd, WatchEvents events);
+
+  base::ThreadChecker thread_checker_;
+  Watcher* const watcher_;
+  base::EventFd notify_fd_;
+  std::vector<pollfd> poll_fds_;
+};
+
+// This class groups a UnixSocketRaw with an associated buffer. The buffer is
+// used as temporary storage for data *read* from the socket.
+class SocketWithBuffer {
+ public:
+  constexpr static size_t kBuffSize = ipc::kIPCBufferSize;
+
+  base::UnixSocketRaw sock;
+
+  // Points to the beginning of buffered data.
+  inline uint8_t* data() { return &buf_[0]; }
+  // Size of the buffered data.
+  inline size_t data_size() { return data_size_; }
+
+  // Points to the beginning of the free space for buffering new data.
+  inline uint8_t* buffer() { return &buf_[data_size_]; }
+  // Size of the free space.
+  inline size_t available_bytes() { return buf_.size() - data_size_; }
+
+  // Called when |bytes| of data is enqueued to the buffer.
+  void EnqueueData(size_t bytes) {
+    PERFETTO_CHECK(bytes <= available_bytes());
+    data_size_ += bytes;
+  }
+  // Called when |bytes| of data is dequeued from the buffer.
+  void DequeueData(size_t bytes) {
+    PERFETTO_CHECK(bytes <= data_size());
+    memmove(data(), data() + bytes, data_size() - bytes);
+    data_size_ -= bytes;
+  }
+
+  SocketWithBuffer() : buf_(kBuffSize) {}
+
+  // Movable only.
+  SocketWithBuffer(SocketWithBuffer&& other) = default;
+  SocketWithBuffer& operator=(SocketWithBuffer&& other) = default;
+  SocketWithBuffer(const SocketWithBuffer& other) = delete;
+  SocketWithBuffer& operator=(const SocketWithBuffer& other) = delete;
+
+ private:
+  std::vector<uint8_t> buf_;
+  size_t data_size_ = 0;
+};
+
+using SocketPair = std::pair<SocketWithBuffer, SocketWithBuffer>;
+
+// SocketRelayHandler bidirectionally forwards data between paired sockets.
+// Internally it multiplexes IO operations of the sockets using a FdPoller on a
+// dedicated thread.
+class SocketRelayHandler : public FdPoller::Watcher {
+ public:
+  SocketRelayHandler();
+  SocketRelayHandler(const SocketRelayHandler&) = delete;
+  SocketRelayHandler& operator=(const SocketRelayHandler&) = delete;
+  ~SocketRelayHandler() override;
+
+  // Transfer a pair of sockets to be relayed. Can be called from any thread.
+  void AddSocketPair(std::unique_ptr<SocketPair> socket_pair);
+
+  // The FdPoller::Watcher callbacks.
+  void OnFdReadable(base::PlatformHandle fd) override;
+  void OnFdWritable(base::PlatformHandle fd) override;
+
+ private:
+  void Run();
+  void RemoveSocketPair(SocketWithBuffer&, SocketWithBuffer&);
+
+  // A helper for running a callable object on |io_thread_|.
+  template <typename Callable>
+  void RunOnIOThread(Callable&& c) {
+    std::lock_guard<std::mutex> lock(mutex_);
+    pending_tasks_.emplace_back(std::forward<Callable>(c));
+    fd_poller_.Notify();
+  }
+
+  std::optional<std::tuple<SocketWithBuffer&, SocketWithBuffer&>> GetSocketPair(
+      base::PlatformHandle fd);
+
+  base::FlatHashMap<base::PlatformHandle, SocketPair*> socket_pairs_by_fd_;
+  std::vector<std::unique_ptr<SocketPair>> socket_pairs_;
+
+  FdPoller fd_poller_;
+
+  // The thread on which fd_poller_ polls for IO events. Most methods of this
+  // class assert that they are running on this thread.
+  std::thread io_thread_;
+  base::ThreadChecker io_thread_checker_;
+
+  bool exited_ = false;
+
+  //--------------- Member data with multi-thread access ------------------
+  std::mutex mutex_;
+  std::deque<std::packaged_task<void()>> pending_tasks_;
+};
+
+}  // namespace perfetto
+#endif  // SRC_TRACED_RELAY_SOCKET_RELAY_HANDLER_H_
diff --git a/src/tracing/ipc/default_socket.cc b/src/tracing/ipc/default_socket.cc
index a2c11d8..c445c6e 100644
--- a/src/tracing/ipc/default_socket.cc
+++ b/src/tracing/ipc/default_socket.cc
@@ -88,6 +88,11 @@
   return name;
 }
 
+const char* GetRelaySocket() {
+  // The relay socket is optional and is connected only when the env var is set.
+  return getenv("PERFETTO_RELAY_SOCK_NAME");
+}
+
 std::vector<std::string> TokenizeProducerSockets(
     const char* producer_socket_names) {
   return base::SplitString(producer_socket_names, ",");
diff --git a/test/trace_processor/diff_tests/chrome/tests_scroll_jank.py b/test/trace_processor/diff_tests/chrome/tests_scroll_jank.py
index a56194a..ef8ec6a 100644
--- a/test/trace_processor/diff_tests/chrome/tests_scroll_jank.py
+++ b/test/trace_processor/diff_tests/chrome/tests_scroll_jank.py
@@ -48,7 +48,7 @@
     return DiffTestBlueprint(
         trace=DataPath('chrome_input_with_frame_view.pftrace'),
         query="""
-        SELECT IMPORT('chrome.scroll_jank.scroll_jank_v3');
+        INCLUDE PERFETTO MODULE chrome.scroll_jank.scroll_jank_v3;
 
         SELECT
           cause_of_jank,
@@ -63,7 +63,7 @@
     return DiffTestBlueprint(
         trace=DataPath('chrome_input_with_frame_view.pftrace'),
         query="""
-        SELECT IMPORT('chrome.scroll_jank.scroll_jank_v3');
+        INCLUDE PERFETTO MODULE chrome.scroll_jank.scroll_jank_v3;
 
         SELECT
           delayed_frame_percentage
diff --git a/test/trace_processor/diff_tests/tables/tests.py b/test/trace_processor/diff_tests/tables/tests.py
index 4cf4e55..83775a3 100644
--- a/test/trace_processor/diff_tests/tables/tests.py
+++ b/test/trace_processor/diff_tests/tables/tests.py
@@ -268,7 +268,7 @@
     return DiffTestBlueprint(
       trace=DataPath('android_monitor_contention_trace.atr'),
       query="""
-      SELECT import('experimental.thread_state_flattened');
+      INCLUDE PERFETTO MODULE experimental.thread_state_flattened;
       select * from experimental_get_flattened_thread_state_aggregated(11155, NULL);
       """,
       out=Path('thread_state_flattened_aggregated_csv.out'))
@@ -277,7 +277,7 @@
     return DiffTestBlueprint(
       trace=DataPath('android_monitor_contention_trace.atr'),
       query="""
-      SELECT import('experimental.thread_state_flattened');
+      INCLUDE PERFETTO MODULE experimental.thread_state_flattened;
       select * from experimental_get_flattened_thread_state(11155, NULL);
       """,
       out=Path('thread_state_flattened_csv.out'))
diff --git a/test/trace_processor/diff_tests/time/tests.py b/test/trace_processor/diff_tests/time/tests.py
index 5731169..ee8771d 100644
--- a/test/trace_processor/diff_tests/time/tests.py
+++ b/test/trace_processor/diff_tests/time/tests.py
@@ -26,7 +26,7 @@
     return DiffTestBlueprint(
         trace=TextProto(""),
         query="""
-        SELECT IMPORT('common.timestamps');
+        INCLUDE PERFETTO MODULE common.timestamps;
         SELECT ns(4) as result;
       """,
         out=Csv("""
@@ -38,7 +38,7 @@
     return DiffTestBlueprint(
         trace=TextProto(""),
         query="""
-        SELECT IMPORT('common.timestamps');
+        INCLUDE PERFETTO MODULE common.timestamps;
         SELECT us(4) as result;
       """,
         out=Csv("""
@@ -50,7 +50,7 @@
     return DiffTestBlueprint(
         trace=TextProto(""),
         query="""
-        SELECT IMPORT('common.timestamps');
+        INCLUDE PERFETTO MODULE common.timestamps;
         SELECT ms(4) as result;
       """,
         out=Csv("""
@@ -62,7 +62,7 @@
     return DiffTestBlueprint(
         trace=TextProto(""),
         query="""
-        SELECT IMPORT('common.timestamps');
+        INCLUDE PERFETTO MODULE common.timestamps;
         SELECT seconds(4) as result;
       """,
         out=Csv("""
@@ -74,7 +74,7 @@
     return DiffTestBlueprint(
         trace=TextProto(""),
         query="""
-        SELECT IMPORT('common.timestamps');
+        INCLUDE PERFETTO MODULE common.timestamps;
         SELECT minutes(1) as result;
       """,
         out=Csv("""
@@ -86,7 +86,7 @@
     return DiffTestBlueprint(
         trace=TextProto(""),
         query="""
-        SELECT IMPORT('common.timestamps');
+        INCLUDE PERFETTO MODULE common.timestamps;
         SELECT hours(1) as result;
       """,
         out=Csv("""
@@ -98,7 +98,7 @@
     return DiffTestBlueprint(
         trace=TextProto(""),
         query="""
-        SELECT IMPORT('common.timestamps');
+        INCLUDE PERFETTO MODULE common.timestamps;
         SELECT days(1) as result;
       """,
         out=Csv("""
diff --git a/ui/src/controller/track_decider.ts b/ui/src/controller/track_decider.ts
index e6ae81d..04bac66 100644
--- a/ui/src/controller/track_decider.ts
+++ b/ui/src/controller/track_decider.ts
@@ -234,7 +234,7 @@
   async guessCpuSizes(): Promise<Map<number, string>> {
     const cpuToSize = new Map<number, string>();
     await this.engine.query(`
-      SELECT IMPORT('common.cpus');
+      INCLUDE PERFETTO MODULE common.cpus;
     `);
     const result = await this.engine.query(`
       SELECT cpu, GUESS_CPU_SIZE(cpu) as size FROM cpu_counter_track;
diff --git a/ui/src/frontend/chrome_slice_details_tab.ts b/ui/src/frontend/chrome_slice_details_tab.ts
index 965d0b6..93b5e15 100644
--- a/ui/src/frontend/chrome_slice_details_tab.ts
+++ b/ui/src/frontend/chrome_slice_details_tab.ts
@@ -99,7 +99,10 @@
     run: (slice: SliceDetails) => {
       const engine = getEngine();
       if (engine === undefined) return;
-        runQuery(`SELECT IMPORT('android.binder'); SELECT IMPORT('android.monitor_contention');`, engine)
+      runQuery(`
+        INCLUDE PERFETTO MODULE android.binder;
+        INCLUDE PERFETTO MODULE android.monitor_contention;
+      `, engine)
           .then(
               () => addDebugTrack(
                   engine,
diff --git a/ui/src/frontend/pivot_table_query_generator.ts b/ui/src/frontend/pivot_table_query_generator.ts
index 946f6b9..b59aebe 100644
--- a/ui/src/frontend/pivot_table_query_generator.ts
+++ b/ui/src/frontend/pivot_table_query_generator.ts
@@ -166,7 +166,7 @@
               .join(' and\n')}` :
       '';
   const text = `
-    select import('experimental.slices');
+    INCLUDE PERFETTO MODULE experimental.slices;
 
     select
       ${renderedPivots.concat(aggregations).join(',\n')}
diff --git a/ui/src/frontend/sql_table/state.ts b/ui/src/frontend/sql_table/state.ts
index 95c62f0..806c330 100644
--- a/ui/src/frontend/sql_table/state.ts
+++ b/ui/src/frontend/sql_table/state.ts
@@ -154,7 +154,7 @@
   private getSQLImports() {
     const tableImports = this.table.imports || [];
     return [...tableImports, ...this.additionalImports]
-        .map((i) => `SELECT IMPORT("${i}");`)
+        .map((i) => `INCLUDE PERFETTO MODULE ${i};`)
         .join('\n');
   }
 
diff --git a/ui/src/frontend/thread_state_tab.ts b/ui/src/frontend/thread_state_tab.ts
index 634a7c2..4e25723 100644
--- a/ui/src/frontend/thread_state_tab.ts
+++ b/ui/src/frontend/thread_state_tab.ts
@@ -291,7 +291,7 @@
       ), m(Button,
            {
           label: 'Critical path lite',
-          onclick: () => runQuery(`SELECT IMPORT('experimental.thread_executing_span');`, this.engine)
+          onclick: () => runQuery(`INCLUDE PERFETTO MODULE experimental.thread_executing_span;`, this.engine)
               .then(() => addDebugTrack(
               this.engine,
                   {
@@ -323,7 +323,7 @@
       ), m(Button,
            {
           label: 'Critical path',
-          onclick: () => runQuery(`SELECT IMPORT('experimental.thread_executing_span');`, this.engine)
+          onclick: () => runQuery(`INCLUDE PERFETTO MODULE experimental.thread_executing_span;`, this.engine)
               .then(() => addDebugTrack(
               this.engine,
                   {
diff --git a/ui/src/plugins/dev.perfetto.AndroidPerf/index.ts b/ui/src/plugins/dev.perfetto.AndroidPerf/index.ts
index 7f8d8b3..17aa796 100644
--- a/ui/src/plugins/dev.perfetto.AndroidPerf/index.ts
+++ b/ui/src/plugins/dev.perfetto.AndroidPerf/index.ts
@@ -26,7 +26,7 @@
       id: 'dev.perfetto.AndroidPerf#BinderSystemServerIncoming',
       name: 'Run query: system_server incoming binder graph',
       callback: () => viewer.tabs.openQuery(
-          `SELECT IMPORT('android.binder');
+          `INCLUDE PERFETTO MODULE android.binder;
            SELECT * FROM android_binder_incoming_graph((SELECT upid FROM process WHERE name = 'system_server'))`,
           'system_server incoming binder graph'),
     });
@@ -35,7 +35,7 @@
       id: 'dev.perfetto.AndroidPerf#BinderSystemServerOutgoing',
       name: 'Run query: system_server outgoing binder graph',
       callback: () => viewer.tabs.openQuery(
-          `SELECT IMPORT('android.binder');
+          `INCLUDE PERFETTO MODULE android.binder;
            SELECT * FROM android_binder_outgoing_graph((SELECT upid FROM process WHERE name = 'system_server'))`,
           'system_server outgoing binder graph'),
     });
@@ -44,7 +44,7 @@
       id: 'dev.perfetto.AndroidPerf#MonitorContentionSystemServer',
       name: 'Run query: system_server monitor_contention graph',
       callback: () => viewer.tabs.openQuery(
-          `SELECT IMPORT('android.monitor_contention');
+          `INCLUDE PERFETTO MODULE android.monitor_contention;
            SELECT * FROM android_monitor_contention_graph((SELECT upid FROM process WHERE name = 'system_server'))`,
           'system_server monitor_contention graph'),
     });
@@ -53,7 +53,7 @@
       id: 'dev.perfetto.AndroidPerf#BinderAll',
       name: 'Run query: all process binder graph',
       callback: () => viewer.tabs.openQuery(
-          `SELECT IMPORT('android.binder');
+          `INCLUDE PERFETTO MODULE android.binder;
            SELECT * FROM android_binder_graph(-1000, 1000, -1000, 1000)`,
           'all process binder graph'),
     });
diff --git a/ui/src/tracks/chrome_scroll_jank/event_latency_track.ts b/ui/src/tracks/chrome_scroll_jank/event_latency_track.ts
index 370c754..c69e9a2 100644
--- a/ui/src/tracks/chrome_scroll_jank/event_latency_track.ts
+++ b/ui/src/tracks/chrome_scroll_jank/event_latency_track.ts
@@ -183,7 +183,7 @@
     FROM latency_stages ls;`;
 
   await engine.query(
-      `SELECT IMPORT('chrome.scroll_jank.scroll_jank_intervals')`);
+      `INCLUDE PERFETTO MODULE chrome.scroll_jank.scroll_jank_intervals`);
   await engine.query(tableDefSql);
 
   result.tracksToAdd.push({
diff --git a/ui/src/tracks/chrome_scroll_jank/scroll_jank_v3_track.ts b/ui/src/tracks/chrome_scroll_jank/scroll_jank_v3_track.ts
index a2f83f7..6e554d4 100644
--- a/ui/src/tracks/chrome_scroll_jank/scroll_jank_v3_track.ts
+++ b/ui/src/tracks/chrome_scroll_jank/scroll_jank_v3_track.ts
@@ -134,7 +134,7 @@
   };
 
   await engine.query(
-      `SELECT IMPORT('chrome.scroll_jank.scroll_jank_intervals')`);
+      `INCLUDE PERFETTO MODULE chrome.scroll_jank.scroll_jank_intervals`);
 
   result.tracksToAdd.push({
     id: uuidv4(),
diff --git a/ui/src/tracks/cpu_slices/index.ts b/ui/src/tracks/cpu_slices/index.ts
index 40526a4..6694ccd 100644
--- a/ui/src/tracks/cpu_slices/index.ts
+++ b/ui/src/tracks/cpu_slices/index.ts
@@ -503,7 +503,7 @@
   async guessCpuSizes(engine: EngineProxy): Promise<Map<number, string>> {
     const cpuToSize = new Map<number, string>();
     await engine.query(`
-      SELECT IMPORT('common.cpus');
+      INCLUDE PERFETTO MODULE common.cpus;
     `);
     const result = await engine.query(`
       SELECT cpu, GUESS_CPU_SIZE(cpu) as size FROM cpu_counter_track;
diff --git a/ui/src/tracks/screenshots/index.ts b/ui/src/tracks/screenshots/index.ts
index 63ff916..c1eb5ed 100644
--- a/ui/src/tracks/screenshots/index.ts
+++ b/ui/src/tracks/screenshots/index.ts
@@ -68,7 +68,7 @@
     tracksToAdd: [],
   };
 
-  await engine.query(`SELECT IMPORT('android.screenshots')`);
+  await engine.query(`INCLUDE PERFETTO MODULE android.screenshots`);
 
   result.tracksToAdd.push({
     id: uuidv4(),