Merge "perfetto-ui: Use table name in cpu track"
diff --git a/Android.bp b/Android.bp
index f6762ed..0946c40 100644
--- a/Android.bp
+++ b/Android.bp
@@ -37,6 +37,7 @@
     "src/base/file_utils.cc",
     "src/base/metatrace.cc",
     "src/base/paged_memory.cc",
+    "src/base/pipe.cc",
     "src/base/string_splitter.cc",
     "src/base/string_utils.cc",
     "src/base/temp_file.cc",
@@ -137,6 +138,7 @@
     "src/base/file_utils.cc",
     "src/base/metatrace.cc",
     "src/base/paged_memory.cc",
+    "src/base/pipe.cc",
     "src/base/string_splitter.cc",
     "src/base/string_utils.cc",
     "src/base/temp_file.cc",
@@ -191,6 +193,7 @@
     "src/base/file_utils.cc",
     "src/base/metatrace.cc",
     "src/base/paged_memory.cc",
+    "src/base/pipe.cc",
     "src/base/string_splitter.cc",
     "src/base/string_utils.cc",
     "src/base/temp_file.cc",
@@ -322,6 +325,7 @@
     "src/base/file_utils.cc",
     "src/base/metatrace.cc",
     "src/base/paged_memory.cc",
+    "src/base/pipe.cc",
     "src/base/string_splitter.cc",
     "src/base/string_utils.cc",
     "src/base/temp_file.cc",
@@ -469,6 +473,7 @@
     "src/base/file_utils.cc",
     "src/base/metatrace.cc",
     "src/base/paged_memory.cc",
+    "src/base/pipe.cc",
     "src/base/string_splitter.cc",
     "src/base/string_utils.cc",
     "src/base/temp_file.cc",
@@ -4286,6 +4291,7 @@
     "src/base/file_utils.cc",
     "src/base/metatrace.cc",
     "src/base/paged_memory.cc",
+    "src/base/pipe.cc",
     "src/base/string_splitter.cc",
     "src/base/string_utils.cc",
     "src/base/temp_file.cc",
@@ -4486,6 +4492,7 @@
     "src/base/optional_unittest.cc",
     "src/base/paged_memory.cc",
     "src/base/paged_memory_unittest.cc",
+    "src/base/pipe.cc",
     "src/base/scoped_file_unittest.cc",
     "src/base/string_splitter.cc",
     "src/base/string_splitter_unittest.cc",
@@ -4725,6 +4732,7 @@
     "src/base/file_utils.cc",
     "src/base/metatrace.cc",
     "src/base/paged_memory.cc",
+    "src/base/pipe.cc",
     "src/base/string_splitter.cc",
     "src/base/string_utils.cc",
     "src/base/temp_file.cc",
diff --git a/BUILD.gn b/BUILD.gn
index f5a0eb1..ca8a9c3 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -66,6 +66,7 @@
         "tools/ftrace_proto_gen:ftrace_proto_gen",
         "tools/proto_to_cpp",
         "tools/trace_to_text",
+        "tools/trace_to_text:trace_to_text_lite_host($host_toolchain)",
       ]
       if (is_linux || is_android) {
         deps += [ "tools/skippy" ]
diff --git a/include/perfetto/base/BUILD.gn b/include/perfetto/base/BUILD.gn
index 05baf78..16c56ff 100644
--- a/include/perfetto/base/BUILD.gn
+++ b/include/perfetto/base/BUILD.gn
@@ -26,6 +26,7 @@
     "metatrace.h",
     "optional.h",
     "paged_memory.h",
+    "pipe.h",
     "scoped_file.h",
     "small_set.h",
     "string_splitter.h",
diff --git a/include/perfetto/base/optional.h b/include/perfetto/base/optional.h
index 1f51582..3967727 100644
--- a/include/perfetto/base/optional.h
+++ b/include/perfetto/base/optional.h
@@ -160,7 +160,7 @@
   // Define it explicitly.
   OptionalStorage() = default;
 
-  OptionalStorage(const OptionalStorage& other) {
+  OptionalStorage(const OptionalStorage& other) : OptionalStorageBase<T>() {
     if (other.is_populated_)
       Init(other.value_);
   }
diff --git a/include/perfetto/base/pipe.h b/include/perfetto/base/pipe.h
new file mode 100644
index 0000000..a1fde3c
--- /dev/null
+++ b/include/perfetto/base/pipe.h
@@ -0,0 +1,47 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INCLUDE_PERFETTO_BASE_PIPE_H_
+#define INCLUDE_PERFETTO_BASE_PIPE_H_
+
+#include "perfetto/base/scoped_file.h"
+
+namespace perfetto {
+namespace base {
+
+class Pipe {
+ public:
+  enum Flags {
+    kBothBlock = 0,
+    kBothNonBlock,
+    kRdNonBlock,
+    kWrNonBlock,
+  };
+
+  static Pipe Create(Flags = kBothBlock);
+
+  Pipe();
+  Pipe(Pipe&&) noexcept;
+  Pipe& operator=(Pipe&&);
+
+  ScopedFile rd;
+  ScopedFile wr;
+};
+
+}  // namespace base
+}  // namespace perfetto
+
+#endif  // INCLUDE_PERFETTO_BASE_PIPE_H_
diff --git a/protos/third_party/pprof/profile.proto b/protos/third_party/pprof/profile.proto
index 9540940..2c4acca 100644
--- a/protos/third_party/pprof/profile.proto
+++ b/protos/third_party/pprof/profile.proto
@@ -43,6 +43,11 @@
 option java_package = "com.google.perftools.profiles";
 option java_outer_classname = "ProfileProto";
 
+// Perfetto Changes ===========================================================
+// 1. Add optimize_for = LITE_RUNTIME
+option optimize_for = LITE_RUNTIME;
+// ============================================================================
+
 message Profile {
   // A description of the samples associated with each Sample.value.
   // For a cpu profile this might be:
diff --git a/src/base/BUILD.gn b/src/base/BUILD.gn
index 3dd1ecb..06c88b5 100644
--- a/src/base/BUILD.gn
+++ b/src/base/BUILD.gn
@@ -38,6 +38,7 @@
   if (!is_win) {
     sources += [
       "event.cc",
+      "pipe.cc",
       "temp_file.cc",
       "unix_task_runner.cc",
     ]
diff --git a/src/base/event.cc b/src/base/event.cc
index e2ced94..4fa6ec0 100644
--- a/src/base/event.cc
+++ b/src/base/event.cc
@@ -19,6 +19,7 @@
 
 #include "perfetto/base/event.h"
 #include "perfetto/base/logging.h"
+#include "perfetto/base/pipe.h"
 
 #if PERFETTO_USE_EVENTFD()
 #include <sys/eventfd.h>
@@ -32,19 +33,11 @@
   fd_.reset(eventfd(/* start value */ 0, EFD_CLOEXEC | EFD_NONBLOCK));
   PERFETTO_CHECK(fd_);
 #else
-  int pipe_fds[2];
-  PERFETTO_CHECK(pipe(pipe_fds) == 0);
-
   // Make the pipe non-blocking so that we never block the waking thread (either
   // the main thread or another one) when scheduling a wake-up.
-  for (auto fd : pipe_fds) {
-    int flags = fcntl(fd, F_GETFL, 0);
-    PERFETTO_CHECK(flags != -1);
-    PERFETTO_CHECK(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
-    PERFETTO_CHECK(fcntl(fd, F_SETFD, FD_CLOEXEC) == 0);
-  }
-  fd_.reset(pipe_fds[0]);
-  write_fd_.reset(pipe_fds[1]);
+  Pipe pipe = Pipe::Create(Pipe::kBothNonBlock);
+  fd_ = std::move(pipe.rd);
+  write_fd_ = std::move(pipe.wr);
 #endif  // !PERFETTO_USE_EVENTFD()
 }
 
diff --git a/src/base/pipe.cc b/src/base/pipe.cc
new file mode 100644
index 0000000..db7fd3a
--- /dev/null
+++ b/src/base/pipe.cc
@@ -0,0 +1,57 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/base/pipe.h"
+
+#include <fcntl.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "perfetto/base/logging.h"
+
+namespace perfetto {
+namespace base {
+
+Pipe::Pipe() = default;
+Pipe::Pipe(Pipe&&) noexcept = default;
+Pipe& Pipe::operator=(Pipe&&) = default;
+
+Pipe Pipe::Create(Flags flags) {
+  int fds[2];
+  PERFETTO_CHECK(pipe(fds) == 0);
+  Pipe p;
+  p.rd.reset(fds[0]);
+  p.wr.reset(fds[1]);
+
+  PERFETTO_CHECK(fcntl(*p.rd, F_SETFD, FD_CLOEXEC) == 0);
+  PERFETTO_CHECK(fcntl(*p.wr, F_SETFD, FD_CLOEXEC) == 0);
+
+  if (flags == kBothNonBlock || flags == kRdNonBlock) {
+    int cur_flags = fcntl(*p.rd, F_GETFL, 0);
+    PERFETTO_CHECK(cur_flags >= 0);
+    PERFETTO_CHECK(fcntl(*p.rd, F_SETFL, cur_flags | O_NONBLOCK) == 0);
+  }
+
+  if (flags == kBothNonBlock || flags == kWrNonBlock) {
+    int cur_flags = fcntl(*p.wr, F_GETFL, 0);
+    PERFETTO_CHECK(cur_flags >= 0);
+    PERFETTO_CHECK(fcntl(*p.wr, F_SETFL, cur_flags | O_NONBLOCK) == 0);
+  }
+  return p;
+}
+
+}  // namespace base
+}  // namespace perfetto
diff --git a/src/base/task_runner_unittest.cc b/src/base/task_runner_unittest.cc
index 0bde467..eb6309a 100644
--- a/src/base/task_runner_unittest.cc
+++ b/src/base/task_runner_unittest.cc
@@ -28,6 +28,7 @@
 #include <thread>
 
 #include "perfetto/base/file_utils.h"
+#include "perfetto/base/pipe.h"
 
 namespace perfetto {
 namespace base {
@@ -47,28 +48,21 @@
 #endif
 TYPED_TEST_CASE(TaskRunnerTest, TaskRunnerTypes);
 
-struct Pipe {
-  Pipe() {
-    int pipe_fds[2];
-    PERFETTO_DCHECK(pipe(pipe_fds) == 0);
-    read_fd.reset(pipe_fds[0]);
-    write_fd.reset(pipe_fds[1]);
+struct TestPipe : Pipe {
+  TestPipe() : Pipe(Pipe::Create()) {
     // Make the pipe initially readable.
     Write();
   }
 
   void Read() {
     char b;
-    PERFETTO_DCHECK(read(read_fd.get(), &b, 1) == 1);
+    PERFETTO_CHECK(read(*this->rd, &b, 1) == 1);
   }
 
   void Write() {
     const char b = '?';
-    PERFETTO_DCHECK(WriteAll(write_fd.get(), &b, 1) == 1);
+    PERFETTO_CHECK(WriteAll(*this->wr, &b, 1) == 1);
   }
-
-  ScopedFile read_fd;
-  ScopedFile write_fd;
 };
 
 TYPED_TEST(TaskRunnerTest, PostImmediateTask) {
@@ -141,20 +135,20 @@
 
 TYPED_TEST(TaskRunnerTest, AddFileDescriptorWatch) {
   auto& task_runner = this->task_runner;
-  Pipe pipe;
-  task_runner.AddFileDescriptorWatch(pipe.read_fd.get(),
+  TestPipe pipe;
+  task_runner.AddFileDescriptorWatch(pipe.rd.get(),
                                      [&task_runner] { task_runner.Quit(); });
   task_runner.Run();
 }
 
 TYPED_TEST(TaskRunnerTest, RemoveFileDescriptorWatch) {
   auto& task_runner = this->task_runner;
-  Pipe pipe;
+  TestPipe pipe;
 
   bool watch_ran = false;
-  task_runner.AddFileDescriptorWatch(pipe.read_fd.get(),
+  task_runner.AddFileDescriptorWatch(pipe.rd.get(),
                                      [&watch_ran] { watch_ran = true; });
-  task_runner.RemoveFileDescriptorWatch(pipe.read_fd.get());
+  task_runner.RemoveFileDescriptorWatch(pipe.rd.get());
   task_runner.PostDelayedTask([&task_runner] { task_runner.Quit(); }, 10);
   task_runner.Run();
 
@@ -163,13 +157,13 @@
 
 TYPED_TEST(TaskRunnerTest, RemoveFileDescriptorWatchFromTask) {
   auto& task_runner = this->task_runner;
-  Pipe pipe;
+  TestPipe pipe;
 
   bool watch_ran = false;
   task_runner.PostTask([&task_runner, &pipe] {
-    task_runner.RemoveFileDescriptorWatch(pipe.read_fd.get());
+    task_runner.RemoveFileDescriptorWatch(pipe.rd.get());
   });
-  task_runner.AddFileDescriptorWatch(pipe.read_fd.get(),
+  task_runner.AddFileDescriptorWatch(pipe.rd.get(),
                                      [&watch_ran] { watch_ran = true; });
   task_runner.PostDelayedTask([&task_runner] { task_runner.Quit(); }, 10);
   task_runner.Run();
@@ -179,30 +173,30 @@
 
 TYPED_TEST(TaskRunnerTest, AddFileDescriptorWatchFromAnotherWatch) {
   auto& task_runner = this->task_runner;
-  Pipe pipe;
-  Pipe pipe2;
+  TestPipe pipe;
+  TestPipe pipe2;
 
   task_runner.AddFileDescriptorWatch(
-      pipe.read_fd.get(), [&task_runner, &pipe, &pipe2] {
+      pipe.rd.get(), [&task_runner, &pipe, &pipe2] {
         pipe.Read();
         task_runner.AddFileDescriptorWatch(
-            pipe2.read_fd.get(), [&task_runner] { task_runner.Quit(); });
+            pipe2.rd.get(), [&task_runner] { task_runner.Quit(); });
       });
   task_runner.Run();
 }
 
 TYPED_TEST(TaskRunnerTest, RemoveFileDescriptorWatchFromAnotherWatch) {
   auto& task_runner = this->task_runner;
-  Pipe pipe;
-  Pipe pipe2;
+  TestPipe pipe;
+  TestPipe pipe2;
 
   bool watch_ran = false;
   task_runner.AddFileDescriptorWatch(
-      pipe.read_fd.get(), [&task_runner, &pipe, &pipe2] {
+      pipe.rd.get(), [&task_runner, &pipe, &pipe2] {
         pipe.Read();
-        task_runner.RemoveFileDescriptorWatch(pipe2.read_fd.get());
+        task_runner.RemoveFileDescriptorWatch(pipe2.rd.get());
       });
-  task_runner.AddFileDescriptorWatch(pipe2.read_fd.get(),
+  task_runner.AddFileDescriptorWatch(pipe2.rd.get(),
                                      [&watch_ran] { watch_ran = true; });
   task_runner.PostDelayedTask([&task_runner] { task_runner.Quit(); }, 10);
   task_runner.Run();
@@ -212,17 +206,16 @@
 
 TYPED_TEST(TaskRunnerTest, ReplaceFileDescriptorWatchFromAnotherWatch) {
   auto& task_runner = this->task_runner;
-  Pipe pipe;
-  Pipe pipe2;
+  TestPipe pipe;
+  TestPipe pipe2;
 
   bool watch_ran = false;
-  task_runner.AddFileDescriptorWatch(
-      pipe.read_fd.get(), [&task_runner, &pipe2] {
-        task_runner.RemoveFileDescriptorWatch(pipe2.read_fd.get());
-        task_runner.AddFileDescriptorWatch(
-            pipe2.read_fd.get(), [&task_runner] { task_runner.Quit(); });
-      });
-  task_runner.AddFileDescriptorWatch(pipe2.read_fd.get(),
+  task_runner.AddFileDescriptorWatch(pipe.rd.get(), [&task_runner, &pipe2] {
+    task_runner.RemoveFileDescriptorWatch(pipe2.rd.get());
+    task_runner.AddFileDescriptorWatch(pipe2.rd.get(),
+                                       [&task_runner] { task_runner.Quit(); });
+  });
+  task_runner.AddFileDescriptorWatch(pipe2.rd.get(),
                                      [&watch_ran] { watch_ran = true; });
   task_runner.Run();
 
@@ -231,10 +224,10 @@
 
 TYPED_TEST(TaskRunnerTest, AddFileDescriptorWatchFromAnotherThread) {
   auto& task_runner = this->task_runner;
-  Pipe pipe;
+  TestPipe pipe;
 
   std::thread thread([&task_runner, &pipe] {
-    task_runner.AddFileDescriptorWatch(pipe.read_fd.get(),
+    task_runner.AddFileDescriptorWatch(pipe.rd.get(),
                                        [&task_runner] { task_runner.Quit(); });
   });
   task_runner.Run();
@@ -243,10 +236,10 @@
 
 TYPED_TEST(TaskRunnerTest, FileDescriptorWatchWithMultipleEvents) {
   auto& task_runner = this->task_runner;
-  Pipe pipe;
+  TestPipe pipe;
 
   int event_count = 0;
-  task_runner.AddFileDescriptorWatch(pipe.read_fd.get(),
+  task_runner.AddFileDescriptorWatch(pipe.rd.get(),
                                      [&task_runner, &pipe, &event_count] {
                                        if (++event_count == 3) {
                                          task_runner.Quit();
@@ -261,13 +254,9 @@
 
 TYPED_TEST(TaskRunnerTest, FileDescriptorClosedEvent) {
   auto& task_runner = this->task_runner;
-  int pipe_fds[2];
-  PERFETTO_DCHECK(pipe(pipe_fds) == 0);
-  ScopedFile read_fd(pipe_fds[0]);
-  ScopedFile write_fd(pipe_fds[1]);
-
-  write_fd.reset();
-  task_runner.AddFileDescriptorWatch(read_fd.get(),
+  Pipe pipe = Pipe::Create();
+  pipe.wr.reset();
+  task_runner.AddFileDescriptorWatch(pipe.rd.get(),
                                      [&task_runner] { task_runner.Quit(); });
   task_runner.Run();
 }
@@ -305,9 +294,9 @@
 
 TYPED_TEST(TaskRunnerTest, FileDescriptorWatchesNotStarved) {
   auto& task_runner = this->task_runner;
-  Pipe pipe;
+  TestPipe pipe;
   task_runner.PostTask(std::bind(&RepeatingTask<TypeParam>, &task_runner));
-  task_runner.AddFileDescriptorWatch(pipe.read_fd.get(),
+  task_runner.AddFileDescriptorWatch(pipe.rd.get(),
                                      [&task_runner] { task_runner.Quit(); });
   task_runner.Run();
 }
@@ -324,15 +313,14 @@
 
 TYPED_TEST(TaskRunnerTest, NoDuplicateFileDescriptorWatchCallbacks) {
   auto& task_runner = this->task_runner;
-  Pipe pipe;
+  TestPipe pipe;
   bool watch_called = 0;
   int counter = 10;
-  task_runner.AddFileDescriptorWatch(pipe.read_fd.get(),
-                                     [&pipe, &watch_called] {
-                                       ASSERT_FALSE(watch_called);
-                                       pipe.Read();
-                                       watch_called = true;
-                                     });
+  task_runner.AddFileDescriptorWatch(pipe.rd.get(), [&pipe, &watch_called] {
+    ASSERT_FALSE(watch_called);
+    pipe.Read();
+    watch_called = true;
+  });
   task_runner.PostTask(
       std::bind(&CountdownTask<TypeParam>, &task_runner, &counter));
   task_runner.Run();
@@ -340,16 +328,16 @@
 
 TYPED_TEST(TaskRunnerTest, ReplaceFileDescriptorWatchFromOtherThread) {
   auto& task_runner = this->task_runner;
-  Pipe pipe;
+  TestPipe pipe;
 
   // The two watch tasks here race each other. We don't particularly care which
   // wins as long as one of them runs.
-  task_runner.AddFileDescriptorWatch(pipe.read_fd.get(),
+  task_runner.AddFileDescriptorWatch(pipe.rd.get(),
                                      [&task_runner] { task_runner.Quit(); });
 
   std::thread thread([&task_runner, &pipe] {
-    task_runner.RemoveFileDescriptorWatch(pipe.read_fd.get());
-    task_runner.AddFileDescriptorWatch(pipe.read_fd.get(),
+    task_runner.RemoveFileDescriptorWatch(pipe.rd.get());
+    task_runner.AddFileDescriptorWatch(pipe.rd.get(),
                                        [&task_runner] { task_runner.Quit(); });
   });
 
diff --git a/src/base/unix_socket_unittest.cc b/src/base/unix_socket_unittest.cc
index 9e59757..feff166 100644
--- a/src/base/unix_socket_unittest.cc
+++ b/src/base/unix_socket_unittest.cc
@@ -27,6 +27,7 @@
 #include "perfetto/base/build_config.h"
 #include "perfetto/base/file_utils.h"
 #include "perfetto/base/logging.h"
+#include "perfetto/base/pipe.h"
 #include "perfetto/base/temp_file.h"
 #include "perfetto/base/utils.h"
 #include "src/base/test/test_task_runner.h"
@@ -333,9 +334,7 @@
 // the socket to the client. Both processes mmap the file in shared mode and
 // check that they see the same contents.
 TEST_F(UnixSocketTest, SharedMemory) {
-  int pipes[2];
-  ASSERT_EQ(0, pipe(pipes));
-
+  Pipe pipe = Pipe::Create();
   pid_t pid = fork();
   ASSERT_GE(pid, 0);
   constexpr size_t kTmpSize = 4096;
@@ -353,7 +352,7 @@
     auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
     ASSERT_TRUE(srv->is_listening());
     // Signal the other process that it can connect.
-    ASSERT_EQ(1, base::WriteAll(pipes[1], ".", 1));
+    ASSERT_EQ(1, base::WriteAll(*pipe.wr, ".", 1));
     auto checkpoint = task_runner_.CreateCheckpoint("change_seen_by_server");
     EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
         .WillOnce(Invoke(
@@ -373,7 +372,7 @@
     _exit(0);
   } else {
     char sync_cmd = '\0';
-    ASSERT_EQ(1, PERFETTO_EINTR(read(pipes[0], &sync_cmd, 1)));
+    ASSERT_EQ(1, PERFETTO_EINTR(read(*pipe.rd, &sync_cmd, 1)));
     ASSERT_EQ('.', sync_cmd);
     auto cli =
         UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
diff --git a/src/base/utils_unittest.cc b/src/base/utils_unittest.cc
index 1f556d6..4fb35bf 100644
--- a/src/base/utils_unittest.cc
+++ b/src/base/utils_unittest.cc
@@ -24,6 +24,7 @@
 #include "gtest/gtest.h"
 
 #include "perfetto/base/file_utils.h"
+#include "perfetto/base/pipe.h"
 
 namespace perfetto {
 namespace base {
@@ -58,10 +59,8 @@
   EXPECT_EQ(4u, ArraySize(bar_4));
 }
 
-int pipe_fd[2];
-
 TEST(UtilsTest, EintrWrapper) {
-  ASSERT_EQ(0, pipe(pipe_fd));
+  Pipe pipe = Pipe::Create();
 
   struct sigaction sa = {};
   struct sigaction old_sa = {};
@@ -81,21 +80,21 @@
   if (pid == 0 /* child */) {
     usleep(5000);
     kill(parent_pid, SIGUSR2);
-    ignore_result(WriteAll(pipe_fd[1], "foo\0", 4));
+    ignore_result(WriteAll(*pipe.wr, "foo\0", 4));
     _exit(0);
   }
 
   char buf[6] = {};
-  EXPECT_EQ(4, PERFETTO_EINTR(read(pipe_fd[0], buf, sizeof(buf))));
-  EXPECT_STREQ("foo", buf);
-  EXPECT_EQ(0, PERFETTO_EINTR(close(pipe_fd[0])));
-  EXPECT_EQ(0, PERFETTO_EINTR(close(pipe_fd[1])));
+  EXPECT_EQ(4, PERFETTO_EINTR(read(*pipe.rd, buf, sizeof(buf))));
+  EXPECT_EQ(0, PERFETTO_EINTR(close(*pipe.rd)));
+  pipe.wr.reset();
 
   // A 2nd close should fail with the proper errno.
-  int res = close(pipe_fd[0]);
+  int res = close(*pipe.rd);
   auto err = errno;
   EXPECT_EQ(-1, res);
   EXPECT_EQ(EBADF, err);
+  pipe.rd.release();
 
   // Restore the old handler.
   sigaction(SIGUSR2, &old_sa, nullptr);
diff --git a/src/perfetto_cmd/perfetto_cmd.cc b/src/perfetto_cmd/perfetto_cmd.cc
index 54ab538..8131633 100644
--- a/src/perfetto_cmd/perfetto_cmd.cc
+++ b/src/perfetto_cmd/perfetto_cmd.cc
@@ -306,8 +306,14 @@
     return 1;
 
   if (background) {
-    PERFETTO_CHECK(daemon(0 /*nochdir*/, 0 /*noclose*/) == 0);
+    PERFETTO_CHECK(daemon(0 /*nochdir*/, 1 /*noclose*/) == 0);
     PERFETTO_DLOG("Continuing in background");
+    printf("pid: %d\n", getpid());
+    base::ScopedFile null = base::OpenFile("/dev/null", O_RDWR);
+    PERFETTO_CHECK(null);
+    PERFETTO_CHECK(dup2(*null, STDIN_FILENO) != -1);
+    PERFETTO_CHECK(dup2(*null, STDOUT_FILENO) != -1);
+    PERFETTO_CHECK(dup2(*null, STDERR_FILENO) != -1);
   }
 
   RateLimiter::Args args{};
@@ -464,10 +470,7 @@
 
 void PerfettoCmd::SetupCtrlCSignalHandler() {
   // Setup the pipe used to deliver the CTRL-C notification from signal handler.
-  int pipe_fds[2];
-  PERFETTO_CHECK(pipe(pipe_fds) == 0);
-  ctrl_c_pipe_rd_.reset(pipe_fds[0]);
-  ctrl_c_pipe_wr_.reset(pipe_fds[1]);
+  ctrl_c_pipe_ = base::Pipe::Create();
 
   // Setup signal handler.
   struct sigaction sa {};
@@ -488,7 +491,7 @@
   sigaction(SIGINT, &sa, nullptr);
 
   task_runner_.AddFileDescriptorWatch(
-      *ctrl_c_pipe_rd_, [this] { consumer_endpoint_->DisableTracing(); });
+      *ctrl_c_pipe_.rd, [this] { consumer_endpoint_->DisableTracing(); });
 }
 
 int __attribute__((visibility("default")))
diff --git a/src/perfetto_cmd/perfetto_cmd.h b/src/perfetto_cmd/perfetto_cmd.h
index f96b73f..972664f 100644
--- a/src/perfetto_cmd/perfetto_cmd.h
+++ b/src/perfetto_cmd/perfetto_cmd.h
@@ -24,6 +24,7 @@
 #include <time.h>
 
 #include "perfetto/base/build_config.h"
+#include "perfetto/base/pipe.h"
 #include "perfetto/base/scoped_file.h"
 #include "perfetto/base/unix_task_runner.h"
 #include "perfetto/tracing/core/consumer.h"
@@ -58,7 +59,7 @@
   void OnTracingDisabled() override;
   void OnTraceData(std::vector<TracePacket>, bool has_more) override;
 
-  int ctrl_c_pipe_wr() const { return *ctrl_c_pipe_wr_; }
+  int ctrl_c_pipe_wr() const { return *ctrl_c_pipe_.wr; }
 
  private:
   bool OpenOutputFile();
@@ -73,8 +74,7 @@
   std::unique_ptr<TraceConfig> trace_config_;
   base::ScopedFstream trace_out_stream_;
   std::string trace_out_path_;
-  base::ScopedFile ctrl_c_pipe_wr_;
-  base::ScopedFile ctrl_c_pipe_rd_;
+  base::Pipe ctrl_c_pipe_;
   std::string dropbox_tag_;
   bool did_process_full_trace_ = false;
   uint64_t bytes_written_ = 0;
diff --git a/src/profiling/memory/interner.h b/src/profiling/memory/interner.h
index 5de578f..379b197 100644
--- a/src/profiling/memory/interner.h
+++ b/src/profiling/memory/interner.h
@@ -20,6 +20,8 @@
 #include <stddef.h>
 #include <set>
 
+#include "perfetto/base/logging.h"
+
 namespace perfetto {
 namespace profiling {
 
@@ -29,7 +31,10 @@
 class Interner {
  private:
   struct Entry {
-    Entry(T d, Interner<T>* in) : data(std::move(d)), interner(in) {}
+    template <typename... U>
+    Entry(Interner<T>* in, U&&... args)
+        : data(std::forward<U>(args)...), interner(in) {}
+
     bool operator<(const Entry& other) const { return data < other.data; }
 
     const T data;
@@ -85,12 +90,16 @@
     Interner::Entry* entry_;
   };
 
-  Interned Intern(const T& data) {
-    auto itr = entries_.emplace(data, this);
+  template <typename... U>
+  Interned Intern(U&&... args) {
+    auto itr = entries_.emplace(this, std::forward<U>(args)...);
     Entry& entry = const_cast<Entry&>(*itr.first);
     entry.ref_count++;
     return Interned(&entry);
   }
+
+  ~Interner() { PERFETTO_DCHECK(entries_.empty()); }
+
   size_t entry_count_for_testing() { return entries_.size(); }
 
  private:
diff --git a/src/profiling/memory/interner_unittest.cc b/src/profiling/memory/interner_unittest.cc
index d94fa17..b282ce4 100644
--- a/src/profiling/memory/interner_unittest.cc
+++ b/src/profiling/memory/interner_unittest.cc
@@ -109,6 +109,25 @@
   }
 }
 
+class NoCopyOrMove {
+ public:
+  NoCopyOrMove(const NoCopyOrMove&) = delete;
+  NoCopyOrMove& operator=(const NoCopyOrMove&) = delete;
+  NoCopyOrMove(const NoCopyOrMove&&) = delete;
+  NoCopyOrMove& operator=(const NoCopyOrMove&&) = delete;
+  NoCopyOrMove(int d) : data(d) {}
+  ~NoCopyOrMove() {}
+  bool operator<(const NoCopyOrMove& other) const { return data < other.data; }
+
+ private:
+  int data;
+};
+
+TEST(InternerStringTest, NoCopyOrMove) {
+  Interner<NoCopyOrMove> interner;
+  Interner<NoCopyOrMove>::Interned interned_str = interner.Intern(1);
+}
+
 }  // namespace
 }  // namespace profiling
 }  // namespace perfetto
diff --git a/src/trace_processor/counters_table.cc b/src/trace_processor/counters_table.cc
index fa351df..1e8d665 100644
--- a/src/trace_processor/counters_table.cc
+++ b/src/trace_processor/counters_table.cc
@@ -31,6 +31,7 @@
   ref_types_[RefType::kIrq] = "irq";
   ref_types_[RefType::kSoftIrq] = "softirq";
   ref_types_[RefType::kUpid] = "upid";
+  ref_types_[RefType::kUtidLookupUpid] = "upid";
 }
 
 void CountersTable::RegisterTable(sqlite3* db, const TraceStorage* storage) {
@@ -39,6 +40,7 @@
 
 Table::Schema CountersTable::CreateSchema(int, const char* const*) {
   const auto& counters = storage_->counters();
+
   std::unique_ptr<StorageSchema::Column> cols[] = {
       StorageSchema::NumericColumnPtr("ts", &counters.timestamps(),
                                       false /* hidden */, true /* ordered */),
@@ -48,7 +50,7 @@
       StorageSchema::NumericColumnPtr("dur", &counters.durations()),
       StorageSchema::TsEndPtr("ts_end", &counters.timestamps(),
                               &counters.durations()),
-      StorageSchema::NumericColumnPtr("ref", &counters.refs()),
+      std::unique_ptr<RefColumn>(new RefColumn("ref", storage_)),
       StorageSchema::StringColumnPtr("ref_type", &counters.types(),
                                      &ref_types_)};
   schema_ = StorageSchema({
@@ -85,5 +87,83 @@
   return SQLITE_OK;
 }
 
+CountersTable::RefColumn::RefColumn(std::string col_name,
+                                    const TraceStorage* storage)
+    : Column(col_name, false), storage_(storage) {}
+
+void CountersTable::RefColumn::ReportResult(sqlite3_context* ctx,
+                                            uint32_t row) const {
+  auto ref = storage_->counters().refs()[row];
+  auto type = storage_->counters().types()[row];
+  if (type == RefType::kUtidLookupUpid) {
+    auto upid = storage_->GetThread(static_cast<uint32_t>(ref)).upid;
+    if (upid.has_value()) {
+      sqlite_utils::ReportSqliteResult(ctx, upid.value());
+    } else {
+      sqlite3_result_null(ctx);
+    }
+  } else {
+    sqlite_utils::ReportSqliteResult(ctx, ref);
+  }
+}
+
+CountersTable::RefColumn::Bounds CountersTable::RefColumn::BoundFilter(
+    int,
+    sqlite3_value*) const {
+  return Bounds{};
+}
+
+CountersTable::RefColumn::Predicate CountersTable::RefColumn::Filter(
+    int op,
+    sqlite3_value* value) const {
+  auto binary_op = sqlite_utils::GetPredicateForOp<int64_t>(op);
+  int64_t extracted = sqlite_utils::ExtractSqliteValue<int64_t>(value);
+  return [this, binary_op, extracted](uint32_t idx) {
+    auto ref = storage_->counters().refs()[idx];
+    auto type = storage_->counters().types()[idx];
+    if (type == RefType::kUtidLookupUpid) {
+      auto upid = storage_->GetThread(static_cast<uint32_t>(ref)).upid;
+      // Trying to filter null with any operation we currently handle
+      // should return false.
+      return upid.has_value() && binary_op(upid.value(), extracted);
+    }
+    return binary_op(ref, extracted);
+  };
+}
+
+CountersTable::RefColumn::Comparator CountersTable::RefColumn::Sort(
+    const QueryConstraints::OrderBy& ob) const {
+  if (ob.desc) {
+    return [this](uint32_t f, uint32_t s) { return -CompareRefsAsc(f, s); };
+  }
+  return [this](uint32_t f, uint32_t s) { return CompareRefsAsc(f, s); };
+}
+
+int CountersTable::RefColumn::CompareRefsAsc(uint32_t f, uint32_t s) const {
+  // Maps a row to the value this column reports for it: the thread's upid for
+  // kUtidLookupUpid rows, the raw ref otherwise. Returns false if the row is
+  // reported as NULL (i.e. the thread has no upid).
+  auto resolve = [this](uint32_t row, int64_t* out) {
+    const auto& counters = storage_->counters();
+    auto ref = counters.refs()[row];
+    *out = static_cast<int64_t>(ref);
+    if (counters.types()[row] != RefType::kUtidLookupUpid)
+      return true;
+    auto upid = storage_->GetThread(static_cast<uint32_t>(ref)).upid;
+    if (upid.has_value())
+      *out = static_cast<int64_t>(upid.value());
+    return upid.has_value();
+  };
+
+  int64_t val_f = 0;
+  int64_t val_s = 0;
+  const bool has_f = resolve(f, &val_f);
+  const bool has_s = resolve(s, &val_s);
+  // Both sides are resolved the same way; NULLs sort before any value.
+  if (!has_f || !has_s)
+    return has_f == has_s ? 0 : (has_f ? 1 : -1);
+  return sqlite_utils::CompareValuesAsc(val_f, val_s);
+}
+
 }  // namespace trace_processor
 }  // namespace perfetto
diff --git a/src/trace_processor/counters_table.h b/src/trace_processor/counters_table.h
index e165606..6703f56 100644
--- a/src/trace_processor/counters_table.h
+++ b/src/trace_processor/counters_table.h
@@ -37,6 +37,30 @@
   int BestIndex(const QueryConstraints&, BestIndexInfo*) override;
 
  private:
+  class RefColumn final : public StorageSchema::Column {
+   public:
+    RefColumn(std::string col_name, const TraceStorage* storage);
+
+    void ReportResult(sqlite3_context* ctx, uint32_t row) const override;
+
+    Bounds BoundFilter(int op, sqlite3_value* sqlite_val) const override;
+
+    Predicate Filter(int op, sqlite3_value* value) const override;
+
+    Comparator Sort(const QueryConstraints::OrderBy& ob) const override;
+
+    bool IsNaturallyOrdered() const override { return false; }
+
+    Table::ColumnType GetType() const override {
+      return Table::ColumnType::kLong;
+    }
+
+   private:
+    int CompareRefsAsc(uint32_t f, uint32_t s) const;
+
+    const TraceStorage* storage_ = nullptr;
+  };
+
   std::deque<std::string> ref_types_;
   StorageSchema schema_;
   const TraceStorage* const storage_;
diff --git a/src/trace_processor/counters_table_unittest.cc b/src/trace_processor/counters_table_unittest.cc
index 77af0d2..6f84a87 100644
--- a/src/trace_processor/counters_table_unittest.cc
+++ b/src/trace_processor/counters_table_unittest.cc
@@ -16,6 +16,7 @@
 
 #include "src/trace_processor/counters_table.h"
 #include "src/trace_processor/event_tracker.h"
+#include "src/trace_processor/process_tracker.h"
 #include "src/trace_processor/scoped_db.h"
 #include "src/trace_processor/trace_processor_context.h"
 
@@ -35,6 +36,7 @@
 
     context_.storage.reset(new TraceStorage());
     context_.event_tracker.reset(new EventTracker(&context_));
+    context_.process_tracker.reset(new ProcessTracker(&context_));
 
     CountersTable::RegisterTable(db_.get(), context_.storage.get());
   }
@@ -113,6 +115,80 @@
   ASSERT_EQ(sqlite3_step(*stmt_), SQLITE_DONE);
 }
 
+TEST_F(CountersTableUnittest, UtidLookupUpid) {
+  uint64_t timestamp = 1000;
+  uint32_t value = 3000;
+  uint32_t name_id = 1;
+
+  uint32_t utid = context_.process_tracker->UpdateThread(timestamp, 1, 0);
+
+  context_.storage->mutable_counters()->AddCounter(
+      timestamp, 0 /* dur */, name_id, value, utid, RefType::kUtidLookupUpid);
+
+  PrepareValidStatement("SELECT value, ref, ref_type FROM counters");
+
+  ASSERT_EQ(sqlite3_step(*stmt_), SQLITE_ROW);
+  ASSERT_EQ(sqlite3_column_int(*stmt_, 0), value);
+  ASSERT_EQ(sqlite3_column_type(*stmt_, 1), SQLITE_NULL);
+  ASSERT_STREQ(reinterpret_cast<const char*>(sqlite3_column_text(*stmt_, 2)),
+               "upid");
+
+  ASSERT_EQ(sqlite3_step(*stmt_), SQLITE_DONE);
+
+  // Simulate some other processes to avoid a situation where utid == upid,
+  // which would make the ref assertion below pass even without the lookup.
+  context_.process_tracker->UpdateProcess(4);
+  context_.process_tracker->UpdateProcess(10);
+  context_.process_tracker->UpdateProcess(11);
+
+  auto* thread = context_.storage->GetMutableThread(utid);
+  thread->upid = context_.process_tracker->UpdateProcess(1);
+
+  PrepareValidStatement("SELECT value, ref, ref_type FROM counters");
+
+  ASSERT_EQ(sqlite3_step(*stmt_), SQLITE_ROW);
+  ASSERT_EQ(sqlite3_column_int(*stmt_, 0), value);
+  ASSERT_EQ(sqlite3_column_int(*stmt_, 1), thread->upid.value());
+  ASSERT_STREQ(reinterpret_cast<const char*>(sqlite3_column_text(*stmt_, 2)),
+               "upid");
+
+  ASSERT_EQ(sqlite3_step(*stmt_), SQLITE_DONE);
+}
+
+TEST_F(CountersTableUnittest, UtidLookupUpidSort) {
+  uint64_t timestamp = 1000;
+  uint32_t value = 3000;
+  uint32_t name_id = 1;
+
+  uint32_t utid_a = context_.process_tracker->UpdateThread(timestamp, 100, 0);
+  uint32_t utid_b = context_.process_tracker->UpdateThread(timestamp, 200, 0);
+
+  auto* thread_a = context_.storage->GetMutableThread(utid_a);
+  thread_a->upid = context_.process_tracker->UpdateProcess(100);
+
+  context_.storage->mutable_counters()->AddCounter(
+      timestamp, 0 /* dur */, name_id, value, utid_a, RefType::kUtidLookupUpid);
+  context_.storage->mutable_counters()->AddCounter(timestamp + 1, 0 /* dur */,
+                                                   name_id, value, utid_b,
+                                                   RefType::kUtidLookupUpid);
+
+  PrepareValidStatement("SELECT ts, ref, ref_type FROM counters ORDER BY ref");
+
+  ASSERT_EQ(sqlite3_step(*stmt_), SQLITE_ROW);
+  ASSERT_EQ(sqlite3_column_int(*stmt_, 0), timestamp + 1);
+  ASSERT_EQ(sqlite3_column_type(*stmt_, 1), SQLITE_NULL);
+  ASSERT_STREQ(reinterpret_cast<const char*>(sqlite3_column_text(*stmt_, 2)),
+               "upid");
+
+  ASSERT_EQ(sqlite3_step(*stmt_), SQLITE_ROW);
+  ASSERT_EQ(sqlite3_column_int(*stmt_, 0), timestamp);
+  ASSERT_EQ(sqlite3_column_int(*stmt_, 1), thread_a->upid.value());
+  ASSERT_STREQ(reinterpret_cast<const char*>(sqlite3_column_text(*stmt_, 2)),
+               "upid");
+
+  ASSERT_EQ(sqlite3_step(*stmt_), SQLITE_DONE);
+}
+
 }  // namespace
 }  // namespace trace_processor
 }  // namespace perfetto
diff --git a/src/trace_processor/proto_trace_parser.cc b/src/trace_processor/proto_trace_parser.cc
index f765a33..540b151 100644
--- a/src/trace_processor/proto_trace_parser.cc
+++ b/src/trace_processor/proto_trace_parser.cc
@@ -443,14 +443,6 @@
   }
 
   UniqueTid utid = context_->process_tracker->UpdateThread(ts, pid, 0);
-  auto opt_upid = context_->storage->GetThread(utid).upid;
-  if (!opt_upid.has_value()) {
-    PERFETTO_DLOG("Could not find process associated with utid %" PRIu32
-                  " when parsing mem counters.",
-                  utid);
-    context_->storage->mutable_stats()->mem_counter_no_process++;
-    return;
-  }
 
   // Skip field_id 0 (invalid) and 1 (pid).
   for (size_t field_id = 2; field_id < counter_values.size(); field_id++) {
@@ -461,8 +453,8 @@
     // pre-cached |proc_mem_counter_names_| map.
     StringId name = proc_mem_counter_names_[field_id];
     uint64_t value = counter_values[field_id];
-    context_->event_tracker->PushCounter(ts, value, name, opt_upid.value(),
-                                         RefType::kUpid);
+    context_->event_tracker->PushCounter(ts, value, name, utid,
+                                         RefType::kUtidLookupUpid);
   }
 
   PERFETTO_DCHECK(decoder.IsEndOfBuffer());
@@ -667,17 +659,9 @@
     return;
   }
   UniqueTid utid = context_->process_tracker->UpdateThread(timestamp, pid, 0);
-  auto opt_upid = context_->storage->GetThread(utid).upid;
-  if (!opt_upid.has_value()) {
-    PERFETTO_DLOG("Could not find process associated with utid %" PRIu32
-                  " when parsing rss stat.",
-                  utid);
-    context_->storage->mutable_stats()->rss_stat_no_process++;
-    return;
-  }
 
   context_->event_tracker->PushCounter(timestamp, size, rss_members_[member],
-                                       opt_upid.value(), RefType::kUpid);
+                                       utid, RefType::kUtidLookupUpid);
   PERFETTO_DCHECK(decoder.IsEndOfBuffer());
 }
 
diff --git a/src/trace_processor/sqlite_utils.h b/src/trace_processor/sqlite_utils.h
index 53d1e19..4540a25 100644
--- a/src/trace_processor/sqlite_utils.h
+++ b/src/trace_processor/sqlite_utils.h
@@ -17,6 +17,7 @@
 #ifndef SRC_TRACE_PROCESSOR_SQLITE_UTILS_H_
 #define SRC_TRACE_PROCESSOR_SQLITE_UTILS_H_
 
+#include <math.h>
 #include <sqlite3.h>
 
 #include <functional>
@@ -137,6 +138,99 @@
 #endif  // PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
 
 template <typename T>
+using is_float =
+    typename std::enable_if<std::is_floating_point<T>::value, T>::type;
+
+template <typename T>
+using is_int = typename std::enable_if<std::is_integral<T>::value, T>::type;
+
+// Greater bound for floating point numbers.
+template <typename T, typename sqlite_utils::is_float<T>* = nullptr>
+T FindGtBound(bool is_eq, sqlite3_value* sqlite_val) {
+  constexpr auto kMax = static_cast<long double>(std::numeric_limits<T>::max());
+  auto type = sqlite3_value_type(sqlite_val);
+  if (type != SQLITE_INTEGER && type != SQLITE_FLOAT) {
+    return kMax;
+  }
+
+  // If this is a strict gt bound then just get the next highest float
+  // after value.
+  auto value = ExtractSqliteValue<T>(sqlite_val);
+  return is_eq ? value : nexttoward(value, kMax);
+}
+
+template <typename T, typename sqlite_utils::is_int<T>* = nullptr>
+T FindGtBound(bool is_eq, sqlite3_value* sqlite_val) {
+  auto type = sqlite3_value_type(sqlite_val);
+  if (type == SQLITE_INTEGER) {
+    auto value = ExtractSqliteValue<T>(sqlite_val);
+    return is_eq ? value : value + 1;
+  } else if (type == SQLITE_FLOAT) {
+    auto value = ExtractSqliteValue<double>(sqlite_val);
+    auto above = ceil(value);
+    auto cast = static_cast<T>(above);
+    return value < above ? cast : (is_eq ? cast : cast + 1);
+  } else {
+    return std::numeric_limits<T>::max();
+  }
+}
+
+template <typename T, typename sqlite_utils::is_float<T>* = nullptr>
+T FindLtBound(bool is_eq, sqlite3_value* sqlite_val) {
+  constexpr auto kMin =
+      static_cast<long double>(std::numeric_limits<T>::lowest());
+  auto type = sqlite3_value_type(sqlite_val);
+  if (type != SQLITE_INTEGER && type != SQLITE_FLOAT) {
+    return kMin;
+  }
+
+  // If this is a strict lt bound then just get the next lowest float
+  // before value.
+  auto value = ExtractSqliteValue<T>(sqlite_val);
+  return is_eq ? value : nexttoward(value, kMin);
+}
+
+template <typename T, typename sqlite_utils::is_int<T>* = nullptr>
+T FindLtBound(bool is_eq, sqlite3_value* sqlite_val) {
+  auto type = sqlite3_value_type(sqlite_val);
+  if (type == SQLITE_INTEGER) {
+    auto value = ExtractSqliteValue<T>(sqlite_val);
+    return is_eq ? value : value - 1;
+  } else if (type == SQLITE_FLOAT) {
+    auto value = ExtractSqliteValue<double>(sqlite_val);
+    auto below = floor(value);
+    auto cast = static_cast<T>(below);
+    return value > below ? cast : (is_eq ? cast : cast - 1);
+  } else {
+    return std::numeric_limits<T>::max();
+  }
+}
+
+template <typename T, typename sqlite_utils::is_float<T>* = nullptr>
+T FindEqBound(sqlite3_value* sqlite_val) {
+  auto type = sqlite3_value_type(sqlite_val);
+  if (type != SQLITE_INTEGER && type != SQLITE_FLOAT) {
+    return std::numeric_limits<T>::max();
+  }
+  return ExtractSqliteValue<T>(sqlite_val);
+}
+
+template <typename T, typename sqlite_utils::is_int<T>* = nullptr>
+T FindEqBound(sqlite3_value* sqlite_val) {
+  auto type = sqlite3_value_type(sqlite_val);
+  if (type == SQLITE_INTEGER) {
+    return ExtractSqliteValue<T>(sqlite_val);
+  } else if (type == SQLITE_FLOAT) {
+    auto value = ExtractSqliteValue<double>(sqlite_val);
+    auto below = floor(value);
+    auto cast = static_cast<T>(below);
+    return value > below ? std::numeric_limits<T>::max() : cast;
+  } else {
+    return std::numeric_limits<T>::max();
+  }
+}
+
+template <typename T>
 void ReportSqliteResult(sqlite3_context*, T value);
 
 template <>
@@ -236,6 +330,16 @@
   return columns;
 }
 
+template <typename T>
+int CompareValuesAsc(const T& f, const T& s) {
+  return f < s ? -1 : (f > s ? 1 : 0);
+}
+
+template <typename T>
+int CompareValuesDesc(const T& f, const T& s) {
+  return -CompareValuesAsc(f, s);
+}
+
 }  // namespace sqlite_utils
 }  // namespace trace_processor
 }  // namespace perfetto
diff --git a/src/trace_processor/storage_schema.cc b/src/trace_processor/storage_schema.cc
index e381d1f..63b25d5 100644
--- a/src/trace_processor/storage_schema.cc
+++ b/src/trace_processor/storage_schema.cc
@@ -86,13 +86,13 @@
     return [this](uint32_t f, uint32_t s) {
       uint64_t a = (*ts_start_)[f] + (*dur_)[f];
       uint64_t b = (*ts_start_)[s] + (*dur_)[s];
-      return a > b ? -1 : (a < b ? 1 : 0);
+      return sqlite_utils::CompareValuesDesc(a, b);
     };
   }
   return [this](uint32_t f, uint32_t s) {
     uint64_t a = (*ts_start_)[f] + (*dur_)[f];
     uint64_t b = (*ts_start_)[s] + (*dur_)[s];
-    return a < b ? -1 : (a > b ? 1 : 0);
+    return sqlite_utils::CompareValuesAsc(a, b);
   };
 }
 
diff --git a/src/trace_processor/storage_schema.h b/src/trace_processor/storage_schema.h
index ebca90f..60f9607 100644
--- a/src/trace_processor/storage_schema.h
+++ b/src/trace_processor/storage_schema.h
@@ -98,26 +98,24 @@
       if (!is_naturally_ordered_)
         return bounds;
 
-      auto min = std::numeric_limits<T>::min();
-      auto max = std::numeric_limits<T>::max();
-
       // Makes the below code much more readable.
       using namespace sqlite_utils;
 
-      // Try and bound the min and max value based on the constraints.
-      auto value = sqlite_utils::ExtractSqliteValue<T>(sqlite_val);
+      T min = kTMin;
+      T max = kTMax;
       if (IsOpGe(op) || IsOpGt(op)) {
-        min = IsOpGe(op) ? value : value + 1;
+        min = FindGtBound<T>(IsOpGe(op), sqlite_val);
       } else if (IsOpLe(op) || IsOpLt(op)) {
-        max = IsOpLe(op) ? value : value - 1;
+        max = FindLtBound<T>(IsOpLe(op), sqlite_val);
       } else if (IsOpEq(op)) {
-        min = value;
-        max = value;
-      } else {
-        // We cannot bound on this constraint.
-        return bounds;
+        auto val = FindEqBound<T>(sqlite_val);
+        min = val;
+        max = val;
       }
 
+      if (min <= kTMin && max >= kTMax)
+        return bounds;
+
       // Convert the values into indices into the deque.
       auto min_it = std::lower_bound(deque_->begin(), deque_->end(), min);
       bounds.min_idx =
@@ -144,15 +142,11 @@
     Comparator Sort(const QueryConstraints::OrderBy& ob) const override {
       if (ob.desc) {
         return [this](uint32_t f, uint32_t s) {
-          T a = (*deque_)[f];
-          T b = (*deque_)[s];
-          return a > b ? -1 : (a < b ? 1 : 0);
+          return sqlite_utils::CompareValuesDesc((*deque_)[f], (*deque_)[s]);
         };
       }
       return [this](uint32_t f, uint32_t s) {
-        T a = (*deque_)[f];
-        T b = (*deque_)[s];
-        return a < b ? -1 : (a > b ? 1 : 0);
+        return sqlite_utils::CompareValuesAsc((*deque_)[f], (*deque_)[s]);
       };
     }
 
@@ -175,6 +169,9 @@
     }
 
    private:
+    T kTMin = std::numeric_limits<T>::lowest();
+    T kTMax = std::numeric_limits<T>::max();
+
     template <typename C>
     Predicate FilterWithCast(int op, sqlite3_value* value) const {
       auto binary_op = sqlite_utils::GetPredicateForOp<C>(op);
@@ -223,13 +220,13 @@
         return [this](uint32_t f, uint32_t s) {
           const std::string& a = (*string_map_)[(*deque_)[f]];
           const std::string& b = (*string_map_)[(*deque_)[s]];
-          return a > b ? -1 : (a < b ? 1 : 0);
+          return sqlite_utils::CompareValuesDesc(a, b);
         };
       }
       return [this](uint32_t f, uint32_t s) {
         const std::string& a = (*string_map_)[(*deque_)[f]];
         const std::string& b = (*string_map_)[(*deque_)[s]];
-        return a < b ? -1 : (a > b ? 1 : 0);
+        return sqlite_utils::CompareValuesAsc(a, b);
       };
     }
 
diff --git a/src/trace_processor/trace_storage.h b/src/trace_processor/trace_storage.h
index 9100c34..3cceb00 100644
--- a/src/trace_processor/trace_storage.h
+++ b/src/trace_processor/trace_storage.h
@@ -51,7 +51,8 @@
   kIrq = 3,
   kSoftIrq = 4,
   kUpid = 5,
-  kMax = kUpid + 1
+  kUtidLookupUpid = 6,
+  kMax = kUtidLookupUpid + 1
 };
 
 // Stores a data inside a trace file in a columnar form. This makes it efficient
@@ -294,7 +295,7 @@
   virtual StringId InternString(base::StringView);
 
   Process* GetMutableProcess(UniquePid upid) {
-    PERFETTO_DCHECK(upid > 0 && upid < unique_processes_.size());
+    PERFETTO_DCHECK(upid < unique_processes_.size());
     return &unique_processes_[upid];
   }
 
@@ -310,7 +311,7 @@
   }
 
   const Process& GetProcess(UniquePid upid) const {
-    PERFETTO_DCHECK(upid > 0 && upid < unique_processes_.size());
+    PERFETTO_DCHECK(upid < unique_processes_.size());
     return unique_processes_[upid];
   }
 
diff --git a/src/traced/probes/filesystem/inode_file_data_source.cc b/src/traced/probes/filesystem/inode_file_data_source.cc
index 9d3a098..02eed04 100644
--- a/src/traced/probes/filesystem/inode_file_data_source.cc
+++ b/src/traced/probes/filesystem/inode_file_data_source.cc
@@ -182,9 +182,10 @@
     PERFETTO_DLOG("%" PRIu64 " inodes found in cache", cache_found_count);
 }
 
-void InodeFileDataSource::Flush() {
+void InodeFileDataSource::Flush(FlushRequestID,
+                                std::function<void()> callback) {
   ResetTracePacket();
-  writer_->Flush();
+  writer_->Flush(callback);
 }
 
 void InodeFileDataSource::OnInodes(
diff --git a/src/traced/probes/filesystem/inode_file_data_source.h b/src/traced/probes/filesystem/inode_file_data_source.h
index 143df66..2afdeed 100644
--- a/src/traced/probes/filesystem/inode_file_data_source.h
+++ b/src/traced/probes/filesystem/inode_file_data_source.h
@@ -85,7 +85,7 @@
 
   // ProbesDataSource implementation.
   void Start() override;
-  void Flush() override;
+  void Flush(FlushRequestID, std::function<void()> callback) override;
 
  protected:
   std::multimap<BlockDeviceID, std::string> mount_points_;
diff --git a/src/traced/probes/ftrace/atrace_wrapper.cc b/src/traced/probes/ftrace/atrace_wrapper.cc
index 5987d46..e0e63f8 100644
--- a/src/traced/probes/ftrace/atrace_wrapper.cc
+++ b/src/traced/probes/ftrace/atrace_wrapper.cc
@@ -17,6 +17,7 @@
 #include "src/traced/probes/ftrace/atrace_wrapper.h"
 
 #include <fcntl.h>
+#include <poll.h>
 #include <stdint.h>
 #include <string.h>
 #include <sys/stat.h>
@@ -25,6 +26,8 @@
 #include <unistd.h>
 
 #include "perfetto/base/logging.h"
+#include "perfetto/base/pipe.h"
+#include "perfetto/base/time.h"
 
 namespace perfetto {
 
@@ -45,50 +48,91 @@
   argv.push_back(nullptr);
 
   // Create the pipe for the child process to return stderr.
-  int filedes[2];
-  if (pipe(filedes) == -1)
-    return false;
+  base::Pipe err_pipe = base::Pipe::Create(base::Pipe::kRdNonBlock);
 
   pid_t pid = fork();
   PERFETTO_CHECK(pid >= 0);
   if (pid == 0) {
     // Duplicate the write end of the pipe into stderr.
-    if ((dup2(filedes[1], STDERR_FILENO) == -1)) {
+    if ((dup2(*err_pipe.wr, STDERR_FILENO) == -1)) {
       const char kError[] = "Unable to duplicate stderr fd";
-      write(filedes[1], kError, sizeof(kError));
+      base::ignore_result(write(*err_pipe.wr, kError, sizeof(kError)));
       _exit(1);
     }
 
     // Close stdin/out + any file descriptor that we might have mistakenly
-    // not marked as FD_CLOEXEC.
+    // not marked as FD_CLOEXEC. |err_pipe| is FD_CLOEXEC and will be
+    // automatically closed on exec.
     for (int i = 0; i < 128; i++) {
       if (i != STDERR_FILENO)
         close(i);
     }
 
-    // Close the read and write end of the pipe fds.
-    close(filedes[1]);
-    close(filedes[0]);
-
     execv("/system/bin/atrace", &argv[0]);
+
     // Reached only if execv fails.
     _exit(1);
   }
 
   // Close the write end of the pipe.
-  close(filedes[1]);
+  err_pipe.wr.reset();
 
   // Collect the output from child process.
-  std::string error;
   char buffer[4096];
-  while (true) {
-    ssize_t count = PERFETTO_EINTR(read(filedes[0], buffer, sizeof(buffer)));
-    if (count == 0 || count == -1)
+  std::string error;
+
+  // Set up polling on the read end of the pipe.
+  constexpr uint8_t kFdCount = 1;
+  struct pollfd fds[kFdCount]{};
+  fds[0].fd = *err_pipe.rd;
+  fds[0].events = POLLIN;
+
+  // Store the start time of atrace and setup the timeout.
+  constexpr auto timeout = base::TimeMillis(7500);
+  auto start = base::GetWallTimeMs();
+  for (;;) {
+    // Check if we are below the timeout and update the poll timeout to
+    // the time remaining.
+    auto now = base::GetWallTimeMs();
+    auto remaining = timeout - (now - start);
+    auto timeout_ms = static_cast<int>(remaining.count());
+    if (timeout_ms <= 0) {
+      // Kill atrace.
+      kill(pid, SIGKILL);
+
+      std::string cmdline = "/system/bin/atrace";
+      for (const auto& arg : args) {
+        cmdline += " " + arg;
+      }
+      error.append("Timed out waiting for atrace (cmdline: " + cmdline + ")");
       break;
+    }
+
+    // Wait for the value of the timeout.
+    auto ret = poll(fds, kFdCount, timeout_ms);
+    if (ret == 0 || (ret < 0 && errno == EINTR)) {
+      // Either timeout occurred in poll (in which case continue so that this
+      // will be picked up by our own timeout logic) or we received an EINTR and
+      // we should try again.
+      continue;
+    } else if (ret < 0) {
+      error.append("Error while polling atrace stderr");
+      break;
+    }
+
+    // Data is available to be read from the fd.
+    int64_t count = PERFETTO_EINTR(read(*err_pipe.rd, buffer, sizeof(buffer)));
+    if (ret < 0 && errno == EAGAIN) {
+      continue;
+    } else if (count < 0) {
+      error.append("Error while reading atrace stderr");
+      break;
+    } else if (count == 0) {
+      // EOF so we can exit this loop.
+      break;
+    }
     error.append(buffer, static_cast<size_t>(count));
   }
-  // Close the read end of the pipe.
-  close(filedes[0]);
 
   // Wait until the child process exits fully.
   PERFETTO_EINTR(waitpid(pid, &status, 0));
@@ -96,7 +140,7 @@
   bool ok = WIFEXITED(status) && WEXITSTATUS(status) == 0;
   if (!ok) {
     // TODO(lalitm): use the stderr result from atrace.
-    base::ignore_result(error);
+    PERFETTO_ELOG("%s", error.c_str());
   }
   return ok;
 }
diff --git a/src/traced/probes/ftrace/cpu_reader.cc b/src/traced/probes/ftrace/cpu_reader.cc
index e607cf7..baa89aa 100644
--- a/src/traced/probes/ftrace/cpu_reader.cc
+++ b/src/traced/probes/ftrace/cpu_reader.cc
@@ -136,23 +136,16 @@
                      base::ScopedFile fd,
                      std::function<void()> on_data_available)
     : table_(table), cpu_(cpu), trace_fd_(std::move(fd)) {
-  int pipe_fds[2];
-  PERFETTO_CHECK(pipe(&pipe_fds[0]) == 0);
-  staging_read_fd_.reset(pipe_fds[0]);
-  staging_write_fd_.reset(pipe_fds[1]);
+  // Both reads and writes from/to the staging pipe are always non-blocking.
+  // Note: O_NONBLOCK seems to be ignored by splice() on the target pipe. The
+  // blocking vs non-blocking behavior is controlled solely by the
+  // SPLICE_F_NONBLOCK flag passed to splice().
+  staging_pipe_ = base::Pipe::Create(base::Pipe::kBothNonBlock);
 
   // Make reads from the raw pipe blocking so that splice() can sleep.
   PERFETTO_CHECK(trace_fd_);
   SetBlocking(*trace_fd_, true);
 
-  // Reads from the staging pipe are always non-blocking.
-  SetBlocking(*staging_read_fd_, false);
-
-  // Note: O_NONBLOCK seems to be ignored by splice() on the target pipe. The
-  // blocking vs non-blocking behavior is controlled solely by the
-  // SPLICE_F_NONBLOCK flag passed to splice().
-  SetBlocking(*staging_write_fd_, false);
-
   // We need a non-default SIGPIPE handler to make it so that the blocking
   // splice() is woken up when the ~CpuReader() dtor destroys the pipes.
   // Just masking out the signal would cause an implicit syscall restart and
@@ -172,7 +165,7 @@
 
   worker_thread_ =
       std::thread(std::bind(&RunWorkerThread, cpu_, *trace_fd_,
-                            *staging_write_fd_, on_data_available, &cmd_));
+                            *staging_pipe_.wr, on_data_available, &cmd_));
 }
 
 CpuReader::~CpuReader() {
@@ -271,7 +264,7 @@
   for (;;) {
     uint8_t* buffer = GetBuffer();
     long bytes =
-        PERFETTO_EINTR(read(*staging_read_fd_, buffer, base::kPageSize));
+        PERFETTO_EINTR(read(*staging_pipe_.rd, buffer, base::kPageSize));
     if (bytes == -1 && errno == EAGAIN)
       break;
     PERFETTO_CHECK(static_cast<size_t>(bytes) == base::kPageSize);
diff --git a/src/traced/probes/ftrace/cpu_reader.h b/src/traced/probes/ftrace/cpu_reader.h
index bdd067f..7cedd50 100644
--- a/src/traced/probes/ftrace/cpu_reader.h
+++ b/src/traced/probes/ftrace/cpu_reader.h
@@ -28,6 +28,7 @@
 
 #include "perfetto/base/gtest_prod_util.h"
 #include "perfetto/base/paged_memory.h"
+#include "perfetto/base/pipe.h"
 #include "perfetto/base/scoped_file.h"
 #include "perfetto/base/thread_checker.h"
 #include "perfetto/protozero/message.h"
@@ -193,8 +194,7 @@
   const ProtoTranslationTable* const table_;
   const size_t cpu_;
   base::ScopedFile trace_fd_;
-  base::ScopedFile staging_read_fd_;
-  base::ScopedFile staging_write_fd_;
+  base::Pipe staging_pipe_;
   base::PagedMemory buffer_;
   std::thread worker_thread_;
   std::atomic<ThreadCtl> cmd_{kRun};
diff --git a/src/traced/probes/ftrace/ftrace_data_source.cc b/src/traced/probes/ftrace/ftrace_data_source.cc
index d7bc843..0272cd4 100644
--- a/src/traced/probes/ftrace/ftrace_data_source.cc
+++ b/src/traced/probes/ftrace/ftrace_data_source.cc
@@ -65,7 +65,7 @@
     controller_weak_->DumpFtraceStats(stats);
 }
 
-void FtraceDataSource::Flush() {
+void FtraceDataSource::Flush(FlushRequestID, std::function<void()> callback) {
   // TODO(primiano): this still doesn't flush data from the kernel ftrace
   // buffers (see b/73886018). We should do that and delay the
   // NotifyFlushComplete() until the ftrace data has been drained from the
@@ -73,7 +73,7 @@
   if (!writer_)
     return;
   WriteStats();
-  writer_->Flush();
+  writer_->Flush(callback);
 }
 
 void FtraceDataSource::WriteStats() {
diff --git a/src/traced/probes/ftrace/ftrace_data_source.h b/src/traced/probes/ftrace/ftrace_data_source.h
index e576a5a..f15c1d1 100644
--- a/src/traced/probes/ftrace/ftrace_data_source.h
+++ b/src/traced/probes/ftrace/ftrace_data_source.h
@@ -67,7 +67,7 @@
 
   // Flushes the ftrace buffers into the userspace trace buffers and writes
   // also ftrace stats.
-  void Flush() override;
+  void Flush(FlushRequestID, std::function<void()> callback) override;
 
   FtraceConfigId config_id() const { return config_id_; }
   const FtraceConfig& config() const { return config_; }
diff --git a/src/traced/probes/probes_data_source.h b/src/traced/probes/probes_data_source.h
index 13d47e1..1e0e8cb 100644
--- a/src/traced/probes/probes_data_source.h
+++ b/src/traced/probes/probes_data_source.h
@@ -17,6 +17,8 @@
 #ifndef SRC_TRACED_PROBES_PROBES_DATA_SOURCE_H_
 #define SRC_TRACED_PROBES_PROBES_DATA_SOURCE_H_
 
+#include <functional>
+
 #include "perfetto/tracing/core/basic_types.h"
 
 namespace perfetto {
@@ -29,7 +31,7 @@
   virtual ~ProbesDataSource();
 
   virtual void Start() = 0;
-  virtual void Flush() = 0;
+  virtual void Flush(FlushRequestID, std::function<void()> callback) = 0;
 
   const TracingSessionID tracing_session_id;
   const int type_id;
diff --git a/src/traced/probes/probes_producer.cc b/src/traced/probes/probes_producer.cc
index c309180..b7d666c 100644
--- a/src/traced/probes/probes_producer.cc
+++ b/src/traced/probes/probes_producer.cc
@@ -47,6 +47,10 @@
 
 constexpr uint32_t kInitialConnectionBackoffMs = 100;
 constexpr uint32_t kMaxConnectionBackoffMs = 30 * 1000;
+
+// Should be larger than FtraceController::kFlushTimeoutMs.
+constexpr uint32_t kFlushTimeoutMs = 1000;
+
 constexpr char kFtraceSourceName[] = "linux.ftrace";
 constexpr char kProcessStatsSourceName[] = "linux.process_stats";
 constexpr char kInodeMapSourceName[] = "linux.inode_file_map";
@@ -280,12 +284,64 @@
 void ProbesProducer::Flush(FlushRequestID flush_request_id,
                            const DataSourceInstanceID* data_source_ids,
                            size_t num_data_sources) {
+  PERFETTO_DCHECK(flush_request_id);
+  auto weak_this = weak_factory_.GetWeakPtr();
+
+  // Issue a Flush() to all started data sources.
+  bool flush_queued = false;
   for (size_t i = 0; i < num_data_sources; i++) {
-    auto it = data_sources_.find(data_source_ids[i]);
+    DataSourceInstanceID ds_id = data_source_ids[i];
+    auto it = data_sources_.find(ds_id);
     if (it == data_sources_.end() || !it->second->started)
       continue;
-    it->second->Flush();
+    pending_flushes_.emplace(flush_request_id, ds_id);
+    flush_queued = true;
+    auto flush_callback = [weak_this, flush_request_id, ds_id] {
+      if (weak_this)
+        weak_this->OnDataSourceFlushComplete(flush_request_id, ds_id);
+    };
+    it->second->Flush(flush_request_id, flush_callback);
   }
+
+  // If there is nothing to flush, ack immediately.
+  if (!flush_queued) {
+    endpoint_->NotifyFlushComplete(flush_request_id);
+    return;
+  }
+
+  // Otherwise, post the timeout task.
+  task_runner_->PostDelayedTask(
+      [weak_this, flush_request_id] {
+        if (weak_this)
+          weak_this->OnFlushTimeout(flush_request_id);
+      },
+      kFlushTimeoutMs);
+}
+
+void ProbesProducer::OnDataSourceFlushComplete(FlushRequestID flush_request_id,
+                                               DataSourceInstanceID ds_id) {
+  PERFETTO_DLOG("Flush %" PRIu64 " acked by data source %" PRIu64,
+                flush_request_id, ds_id);
+  auto range = pending_flushes_.equal_range(flush_request_id);
+  for (auto it = range.first; it != range.second; it++) {
+    if (it->second == ds_id) {
+      pending_flushes_.erase(it);
+      break;
+    }
+  }
+
+  if (pending_flushes_.count(flush_request_id))
+    return;  // Still waiting for other data sources to ack.
+
+  PERFETTO_DLOG("All data sources acked to flush %" PRIu64, flush_request_id);
+  endpoint_->NotifyFlushComplete(flush_request_id);
+}
+
+void ProbesProducer::OnFlushTimeout(FlushRequestID flush_request_id) {
+  if (pending_flushes_.count(flush_request_id) == 0)
+    return;  // All acked.
+  PERFETTO_ELOG("Flush(%" PRIu64 ") timed out", flush_request_id);
+  pending_flushes_.erase(flush_request_id);
   endpoint_->NotifyFlushComplete(flush_request_id);
 }
 
diff --git a/src/traced/probes/probes_producer.h b/src/traced/probes/probes_producer.h
index 14737d6..44ac19e 100644
--- a/src/traced/probes/probes_producer.h
+++ b/src/traced/probes/probes_producer.h
@@ -95,6 +95,8 @@
   void Restart();
   void ResetConnectionBackoff();
   void IncreaseConnectionBackoff();
+  void OnDataSourceFlushComplete(FlushRequestID, DataSourceInstanceID);
+  void OnFlushTimeout(FlushRequestID);
 
   State state_ = kNotStarted;
   base::TaskRunner* task_runner_ = nullptr;
@@ -112,6 +114,9 @@
   std::unordered_multimap<TracingSessionID, ProbesDataSource*>
       session_data_sources_;
 
+  std::unordered_multimap<FlushRequestID, DataSourceInstanceID>
+      pending_flushes_;
+
   std::unordered_map<DataSourceInstanceID, base::Watchdog::Timer> watchdogs_;
   LRUInodeCache cache_{kLRUInodeCacheSize};
   std::map<BlockDeviceID, std::unordered_map<Inode, InodeMapValue>>
diff --git a/src/traced/probes/ps/process_stats_data_source.cc b/src/traced/probes/ps/process_stats_data_source.cc
index ddff880..858f25f 100644
--- a/src/traced/probes/ps/process_stats_data_source.cc
+++ b/src/traced/probes/ps/process_stats_data_source.cc
@@ -161,11 +161,12 @@
   FinalizeCurPacket();
 }
 
-void ProcessStatsDataSource::Flush() {
+void ProcessStatsDataSource::Flush(FlushRequestID,
+                                   std::function<void()> callback) {
   // We shouldn't get this in the middle of WriteAllProcesses() or OnPids().
   PERFETTO_DCHECK(!cur_ps_tree_);
   PERFETTO_DCHECK(!cur_ps_stats_);
-  writer_->Flush();
+  writer_->Flush(callback);
 }
 
 void ProcessStatsDataSource::WriteProcessOrThread(int32_t pid) {
diff --git a/src/traced/probes/ps/process_stats_data_source.h b/src/traced/probes/ps/process_stats_data_source.h
index 65b6e2e..f5329cf 100644
--- a/src/traced/probes/ps/process_stats_data_source.h
+++ b/src/traced/probes/ps/process_stats_data_source.h
@@ -57,7 +57,7 @@
 
   // ProbesDataSource implementation.
   void Start() override;
-  void Flush() override;
+  void Flush(FlushRequestID, std::function<void()> callback) override;
 
   bool on_demand_dumps_enabled() const { return enable_on_demand_dumps_; }
 
diff --git a/src/traced/probes/ps/process_stats_data_source_unittest.cc b/src/traced/probes/ps/process_stats_data_source_unittest.cc
index 4587cda..b985bcc 100644
--- a/src/traced/probes/ps/process_stats_data_source_unittest.cc
+++ b/src/traced/probes/ps/process_stats_data_source_unittest.cc
@@ -169,7 +169,7 @@
 
   data_source->Start();
   task_runner_.RunUntilCheckpoint("all_done");
-  data_source->Flush();
+  data_source->Flush(1 /* FlushRequestId */, []() {});
 
   // |packet| will contain the merge of all kNumIter packets written.
   std::unique_ptr<protos::TracePacket> packet = writer_raw_->ParseProto();
diff --git a/src/traced/probes/sys_stats/sys_stats_data_source.cc b/src/traced/probes/sys_stats/sys_stats_data_source.cc
index 61d869d..befe396 100644
--- a/src/traced/probes/sys_stats/sys_stats_data_source.cc
+++ b/src/traced/probes/sys_stats/sys_stats_data_source.cc
@@ -297,8 +297,8 @@
   return weak_factory_.GetWeakPtr();
 }
 
-void SysStatsDataSource::Flush() {
-  writer_->Flush();
+void SysStatsDataSource::Flush(FlushRequestID, std::function<void()> callback) {
+  writer_->Flush(callback);
 }
 
 size_t SysStatsDataSource::ReadFile(base::ScopedFile* fd, const char* path) {
diff --git a/src/traced/probes/sys_stats/sys_stats_data_source.h b/src/traced/probes/sys_stats/sys_stats_data_source.h
index bdb5e10..0fa085c 100644
--- a/src/traced/probes/sys_stats/sys_stats_data_source.h
+++ b/src/traced/probes/sys_stats/sys_stats_data_source.h
@@ -56,7 +56,7 @@
 
   // ProbesDataSource implementation.
   void Start() override;
-  void Flush() override;
+  void Flush(FlushRequestID, std::function<void()> callback) override;
 
   base::WeakPtr<SysStatsDataSource> GetWeakPtr() const;
 
diff --git a/src/tracing/core/trace_writer_impl.cc b/src/tracing/core/trace_writer_impl.cc
index bfdf1f4..0d0e3e2 100644
--- a/src/tracing/core/trace_writer_impl.cc
+++ b/src/tracing/core/trace_writer_impl.cc
@@ -68,10 +68,12 @@
   if (cur_chunk_.is_valid()) {
     shmem_arbiter_->ReturnCompletedChunk(std::move(cur_chunk_), target_buffer_,
                                          &patch_list_);
-    shmem_arbiter_->FlushPendingCommitDataRequests(callback);
   } else {
     PERFETTO_DCHECK(patch_list_.empty());
   }
+  // Always issue the Flush request, even if there is nothing to flush, just
+  // for the sake of getting the callback posted back.
+  shmem_arbiter_->FlushPendingCommitDataRequests(callback);
   protobuf_stream_writer_.Reset({nullptr, nullptr});
 }
 
diff --git a/tools/ftrace_proto_gen/ftrace_proto_gen.cc b/tools/ftrace_proto_gen/ftrace_proto_gen.cc
index 4a1747b..7c6d4ac 100644
--- a/tools/ftrace_proto_gen/ftrace_proto_gen.cc
+++ b/tools/ftrace_proto_gen/ftrace_proto_gen.cc
@@ -25,6 +25,7 @@
 
 #include "perfetto/base/file_utils.h"
 #include "perfetto/base/logging.h"
+#include "perfetto/base/pipe.h"
 #include "perfetto/base/string_splitter.h"
 
 namespace perfetto {
@@ -43,30 +44,17 @@
   return haystack.find(needle) != std::string::npos;
 }
 
-int SetNonBlocking(int fd) {
-  int flags = fcntl(fd, F_GETFL, 0);
-  if (flags == -1)
-    return -1;
-  return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
-}
-
 std::string RunClangFmt(const std::string& input) {
   std::string output;
   pid_t pid;
-  int input_pipes[2];
-  int output_pipes[2];
-  PERFETTO_CHECK(pipe(input_pipes) != -1);
-  PERFETTO_CHECK(SetNonBlocking(input_pipes[0]) != -1);
-  PERFETTO_CHECK(SetNonBlocking(input_pipes[1]) != -1);
-  PERFETTO_CHECK(pipe(output_pipes) != -1);
-  PERFETTO_CHECK(SetNonBlocking(output_pipes[0]) != -1);
-  PERFETTO_CHECK(SetNonBlocking(output_pipes[1]) != -1);
+  base::Pipe input_pipe = base::Pipe::Create(base::Pipe::kBothNonBlock);
+  base::Pipe output_pipe = base::Pipe::Create(base::Pipe::kBothNonBlock);
   if ((pid = fork()) == 0) {
     // Child
-    PERFETTO_CHECK(dup2(input_pipes[0], STDIN_FILENO) != -1);
-    PERFETTO_CHECK(dup2(output_pipes[1], STDOUT_FILENO) != -1);
-    close(input_pipes[1]);
-    close(output_pipes[0]);
+    PERFETTO_CHECK(dup2(*input_pipe.rd, STDIN_FILENO) != -1);
+    PERFETTO_CHECK(dup2(*output_pipe.wr, STDOUT_FILENO) != -1);
+    input_pipe.wr.reset();
+    output_pipe.rd.reset();
     PERFETTO_CHECK(execl("buildtools/linux64/clang-format", "clang-format",
                          nullptr) != -1);
   }
@@ -74,15 +62,15 @@
   // Parent
   size_t written = 0;
   size_t bytes_read = 0;
-  close(input_pipes[0]);
-  close(output_pipes[1]);
+  input_pipe.rd.reset();
+  output_pipe.wr.reset();
   // This cannot be left uninitialized because there's as continue statement
   // before the first assignment to this in the loop.
   ssize_t r = -1;
   do {
     if (written < input.size()) {
       ssize_t w =
-          write(input_pipes[1], &(input[written]), input.size() - written);
+          write(*input_pipe.wr, &(input[written]), input.size() - written);
       if (w == -1) {
         if (errno == EAGAIN || errno == EINTR)
           continue;
@@ -90,12 +78,12 @@
       }
       written += static_cast<size_t>(w);
       if (written == input.size())
-        close(input_pipes[1]);
+        input_pipe.wr.reset();
     }
 
     if (bytes_read + base::kPageSize > output.size())
       output.resize(output.size() + base::kPageSize);
-    r = read(output_pipes[0], &(output[bytes_read]), base::kPageSize);
+    r = read(*output_pipe.rd, &(output[bytes_read]), base::kPageSize);
     if (r == -1) {
       if (errno == EAGAIN || errno == EINTR)
         continue;
diff --git a/tools/install-build-deps b/tools/install-build-deps
index e886ebc..49b949e 100755
--- a/tools/install-build-deps
+++ b/tools/install-build-deps
@@ -149,13 +149,13 @@
   # This is the amalgamated source whose compiled output is meant to be faster.
   # We still pull the full source for the extensions (not amalgamated).
   ('buildtools/sqlite.zip',
-   'https://storage.googleapis.com/perfetto/sqlite-amalgamation-3230100.zip',
-   '98729f2c2d57d166e3d6d9862744c1d66388e286',
+   'https://storage.googleapis.com/perfetto/sqlite-amalgamation-3250300.zip',
+   'b78c2cb0d2c9182686c582312479f96a82bf5380',
    'all'
   ),
   ('buildtools/sqlite_src.zip',
-   'https://storage.googleapis.com/perfetto/sqlite-src-3230100.zip',
-   '90bea4e46a01e9c27356b19c3150a8bf146c09fc',
+   'https://storage.googleapis.com/perfetto/sqlite-src-3250300.zip',
+   'd1af2883bb800852946f9bf8ab6055e7698e18ee',
    'all'
   ),
 
diff --git a/tools/pipestats.cc b/tools/pipestats.cc
index 6279efc..df83569 100644
--- a/tools/pipestats.cc
+++ b/tools/pipestats.cc
@@ -25,6 +25,7 @@
 #include <unistd.h>  // pipe
 
 #include "perfetto/base/logging.h"
+#include "perfetto/base/pipe.h"
 #include "perfetto/base/scoped_file.h"
 #include "perfetto/base/time.h"
 #include "perfetto/base/utils.h"
@@ -51,24 +52,17 @@
   PERFETTO_CHECK(trace_fd);
   std::thread reader(ReadLoop, trace_fd.get());
 
-  int pipe_fds[2];
-  PERFETTO_CHECK(pipe(&pipe_fds[0]) == 0);
-  base::ScopedFile staging_read_fd(pipe_fds[0]);
-  base::ScopedFile staging_write_fd(pipe_fds[1]);
+  // Reads from the staging pipe are always non-blocking.
+  // Note: O_NONBLOCK seems to be ignored by splice() on the target pipe. The
+  // blocking vs non-blocking behavior is controlled solely by the
+  // SPLICE_F_NONBLOCK flag passed to splice().
+  base::Pipe pipe = base::Pipe::Create(base::Pipe::kBothNonBlock);
 
   // Make reads from the raw pipe blocking so that splice() can sleep.
   SetBlocking(*trace_fd, true);
 
-  // Reads from the staging pipe are always non-blocking.
-  SetBlocking(*staging_read_fd, false);
-
-  // Note: O_NONBLOCK seems to be ignored by splice() on the target pipe. The
-  // blocking vs non-blocking behavior is controlled solely by the
-  // SPLICE_F_NONBLOCK flag passed to splice().
-  SetBlocking(*staging_write_fd, false);
-
   for (;;) {
-    ssize_t splice_res = splice(*trace_fd, nullptr, *staging_write_fd, nullptr,
+    ssize_t splice_res = splice(*trace_fd, nullptr, *pipe.wr, nullptr,
                                 base::kPageSize, SPLICE_F_MOVE);
     if (splice_res > 0) {
       auto cur = base::GetWallTimeNs();
diff --git a/tools/trace_to_text/BUILD.gn b/tools/trace_to_text/BUILD.gn
index 019b244..97ada9c 100644
--- a/tools/trace_to_text/BUILD.gn
+++ b/tools/trace_to_text/BUILD.gn
@@ -80,6 +80,15 @@
       "../../gn:default_deps",
     ]
   }
+
+  # WASM is too permissive; build a normal version of the binary to test for
+  # missing symbols.
+  executable("trace_to_text_lite_host") {
+    deps = [
+      ":lite",
+      "../../gn:default_deps",
+    ]
+  }
 }
 
 wasm_lib("trace_to_text_wasm") {
diff --git a/tools/trace_to_text/trace_to_profile.cc b/tools/trace_to_text/trace_to_profile.cc
index 9849090..4ddb9b7 100644
--- a/tools/trace_to_text/trace_to_profile.cc
+++ b/tools/trace_to_text/trace_to_profile.cc
@@ -20,6 +20,7 @@
 
 #include <algorithm>
 #include <map>
+#include <set>
 #include <vector>
 
 #include "tools/trace_to_text/utils.h"