tp: Extract Flush logic from NotifyEndOfFile

Change-Id: I96c63de7d298f9c8a079bbd2ecdf4add8d9be7f8
diff --git a/Android.bp b/Android.bp
index 89c806c..b8c49bc 100644
--- a/Android.bp
+++ b/Android.bp
@@ -8900,6 +8900,7 @@
     srcs: [
         "src/trace_processor/iterator_impl.cc",
         "src/trace_processor/read_trace.cc",
+        "src/trace_processor/read_trace_internal.cc",
         "src/trace_processor/trace_processor.cc",
         "src/trace_processor/trace_processor_impl.cc",
     ],
diff --git a/BUILD b/BUILD
index c7c272d..a7605eb 100644
--- a/BUILD
+++ b/BUILD
@@ -1545,6 +1545,8 @@
         "src/trace_processor/iterator_impl.cc",
         "src/trace_processor/iterator_impl.h",
         "src/trace_processor/read_trace.cc",
+        "src/trace_processor/read_trace_internal.cc",
+        "src/trace_processor/read_trace_internal.h",
         "src/trace_processor/trace_processor.cc",
         "src/trace_processor/trace_processor_impl.cc",
         "src/trace_processor/trace_processor_impl.h",
diff --git a/include/perfetto/trace_processor/trace_processor_storage.h b/include/perfetto/trace_processor/trace_processor_storage.h
index d166b68..631e34c 100644
--- a/include/perfetto/trace_processor/trace_processor_storage.h
+++ b/include/perfetto/trace_processor/trace_processor_storage.h
@@ -51,10 +51,16 @@
   // For compatibility with older API clients.
   util::Status Parse(std::unique_ptr<uint8_t[]> buf, size_t size);
 
-  // When parsing a bounded file (as opposite to streaming from a device) this
-  // function should be called when the last chunk of the file has been passed
-  // into Parse(). This allows to flush the events queued in the ordering stage,
-  // without having to wait for their time window to expire.
+  // Forces all data in the trace to be pushed to tables without buffering data
+  // in sorting queues. This is useful if queries need to be performed to
+  // compute post-processing data (e.g. deobfuscation, symbolization etc) which
+  // will be appended to the trace in a future call to Parse.
+  virtual void Flush() = 0;
+
+  // Calls Flush and finishes all of the actions required for parsing the trace.
+  // Should only be called once: in v28, calling this function multiple times
+  // will simply log an error but in subsequent versions, this will become
+  // undefined behaviour.
   virtual void NotifyEndOfFile() = 0;
 };
 
diff --git a/src/trace_processor/BUILD.gn b/src/trace_processor/BUILD.gn
index 17d5982..49214c3 100644
--- a/src/trace_processor/BUILD.gn
+++ b/src/trace_processor/BUILD.gn
@@ -334,6 +334,8 @@
       "iterator_impl.cc",
       "iterator_impl.h",
       "read_trace.cc",
+      "read_trace_internal.cc",
+      "read_trace_internal.h",
       "trace_processor.cc",
       "trace_processor_impl.cc",
       "trace_processor_impl.h",
diff --git a/src/trace_processor/read_trace.cc b/src/trace_processor/read_trace.cc
index c4cb67a..64a400b 100644
--- a/src/trace_processor/read_trace.cc
+++ b/src/trace_processor/read_trace.cc
@@ -28,6 +28,7 @@
 #include "src/trace_processor/forwarding_trace_parser.h"
 #include "src/trace_processor/importers/gzip/gzip_trace_parser.h"
 #include "src/trace_processor/importers/proto/proto_trace_tokenizer.h"
+#include "src/trace_processor/read_trace_internal.h"
 #include "src/trace_processor/util/gzip_utils.h"
 #include "src/trace_processor/util/status_macros.h"
 
@@ -44,36 +45,6 @@
 namespace trace_processor {
 namespace {
 
-// 1MB chunk size seems the best tradeoff on a MacBook Pro 2013 - i7 2.8 GHz.
-constexpr size_t kChunkSize = 1024 * 1024;
-
-util::Status ReadTraceUsingRead(
-    TraceProcessor* tp,
-    int fd,
-    uint64_t* file_size,
-    const std::function<void(uint64_t parsed_size)>& progress_callback) {
-  // Load the trace in chunks using ordinary read().
-  for (int i = 0;; i++) {
-    if (progress_callback && i % 128 == 0)
-      progress_callback(*file_size);
-
-    TraceBlob blob = TraceBlob::Allocate(kChunkSize);
-    auto rsize = base::Read(fd, blob.data(), blob.size());
-    if (rsize == 0)
-      break;
-
-    if (rsize < 0) {
-      return util::ErrStatus("Reading trace file failed (errno: %d, %s)", errno,
-                             strerror(errno));
-    }
-
-    *file_size += static_cast<uint64_t>(rsize);
-    TraceBlobView blob_view(std::move(blob), 0, static_cast<size_t>(rsize));
-    RETURN_IF_ERROR(tp->Parse(std::move(blob_view)));
-  }
-  return util::OkStatus();
-}
-
 class SerializingProtoTraceReader : public ChunkedTraceReader {
  public:
   explicit SerializingProtoTraceReader(std::vector<uint8_t>* output)
@@ -111,49 +82,8 @@
     TraceProcessor* tp,
     const char* filename,
     const std::function<void(uint64_t parsed_size)>& progress_callback) {
-  base::ScopedFile fd(base::OpenFile(filename, O_RDONLY));
-  if (!fd)
-    return util::ErrStatus("Could not open trace file (path: %s)", filename);
-
-  uint64_t bytes_read = 0;
-
-#if TRACE_PROCESSOR_HAS_MMAP()
-  char* no_mmap = getenv("TRACE_PROCESSOR_NO_MMAP");
-  uint64_t whole_size_64 = static_cast<uint64_t>(lseek(*fd, 0, SEEK_END));
-  lseek(*fd, 0, SEEK_SET);
-  bool use_mmap = !no_mmap || *no_mmap != '1';
-  if (sizeof(size_t) < 8 && whole_size_64 > 2147483648ULL)
-    use_mmap = false;  // Cannot use mmap on 32-bit systems for files > 2GB.
-
-  if (use_mmap) {
-    const size_t whole_size = static_cast<size_t>(whole_size_64);
-    void* file_mm = mmap(nullptr, whole_size, PROT_READ, MAP_PRIVATE, *fd, 0);
-    if (file_mm != MAP_FAILED) {
-      TraceBlobView whole_mmap(TraceBlob::FromMmap(file_mm, whole_size));
-      // Parse the file in chunks so we get some status update on stdio.
-      static constexpr size_t kMmapChunkSize = 128ul * 1024 * 1024;
-      while (bytes_read < whole_size_64) {
-        progress_callback(bytes_read);
-        const size_t bytes_read_z = static_cast<size_t>(bytes_read);
-        size_t slice_size = std::min(whole_size - bytes_read_z, kMmapChunkSize);
-        TraceBlobView slice = whole_mmap.slice_off(bytes_read_z, slice_size);
-        RETURN_IF_ERROR(tp->Parse(std::move(slice)));
-        bytes_read += slice_size;
-      }  // while (slices)
-    }    // if (!MAP_FAILED)
-  }      // if (use_mmap)
-  if (bytes_read == 0)
-    PERFETTO_LOG("Cannot use mmap on this system. Falling back on read()");
-#endif  // TRACE_PROCESSOR_HAS_MMAP()
-  if (bytes_read == 0) {
-    RETURN_IF_ERROR(
-        ReadTraceUsingRead(tp, *fd, &bytes_read, progress_callback));
-  }
+  ReadTraceUnfinalized(tp, filename, progress_callback);
   tp->NotifyEndOfFile();
-  tp->SetCurrentTraceName(filename);
-
-  if (progress_callback)
-    progress_callback(bytes_read);
   return util::OkStatus();
 }
 
diff --git a/src/trace_processor/read_trace_internal.cc b/src/trace_processor/read_trace_internal.cc
new file mode 100644
index 0000000..4f56cc0
--- /dev/null
+++ b/src/trace_processor/read_trace_internal.cc
@@ -0,0 +1,126 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/trace_processor/read_trace_internal.h"
+
+#include "perfetto/base/logging.h"
+#include "perfetto/ext/base/file_utils.h"
+#include "perfetto/ext/base/scoped_file.h"
+#include "perfetto/ext/base/utils.h"
+#include "perfetto/protozero/proto_utils.h"
+#include "perfetto/trace_processor/trace_processor.h"
+
+#include "perfetto/trace_processor/trace_blob.h"
+#include "perfetto/trace_processor/trace_blob_view.h"
+#include "src/trace_processor/forwarding_trace_parser.h"
+#include "src/trace_processor/importers/gzip/gzip_trace_parser.h"
+#include "src/trace_processor/importers/proto/proto_trace_tokenizer.h"
+#include "src/trace_processor/util/gzip_utils.h"
+#include "src/trace_processor/util/status_macros.h"
+
+#include "protos/perfetto/trace/trace.pbzero.h"
+#include "protos/perfetto/trace/trace_packet.pbzero.h"
+
+#if TRACE_PROCESSOR_HAS_MMAP()
+#include <stdlib.h>
+#include <sys/mman.h>
+#include <unistd.h>
+#endif
+namespace perfetto {
+namespace trace_processor {
+namespace {
+
+// 1MB chunk size seems the best tradeoff on a MacBook Pro 2013 - i7 2.8 GHz.
+constexpr size_t kChunkSize = 1024 * 1024;
+
+util::Status ReadTraceUsingRead(
+    TraceProcessor* tp,
+    int fd,
+    uint64_t* file_size,
+    const std::function<void(uint64_t parsed_size)>& progress_callback) {
+  // Load the trace in chunks using ordinary read().
+  for (int i = 0;; i++) {
+    if (progress_callback && i % 128 == 0)
+      progress_callback(*file_size);
+
+    TraceBlob blob = TraceBlob::Allocate(kChunkSize);
+    auto rsize = base::Read(fd, blob.data(), blob.size());
+    if (rsize == 0)
+      break;
+
+    if (rsize < 0) {
+      return util::ErrStatus("Reading trace file failed (errno: %d, %s)", errno,
+                             strerror(errno));
+    }
+
+    *file_size += static_cast<uint64_t>(rsize);
+    TraceBlobView blob_view(std::move(blob), 0, static_cast<size_t>(rsize));
+    RETURN_IF_ERROR(tp->Parse(std::move(blob_view)));
+  }
+  return util::OkStatus();
+}
+}  // namespace
+
+util::Status ReadTraceUnfinalized(
+    TraceProcessor* tp,
+    const char* filename,
+    const std::function<void(uint64_t parsed_size)>& progress_callback) {
+  base::ScopedFile fd(base::OpenFile(filename, O_RDONLY));
+  if (!fd)
+    return util::ErrStatus("Could not open trace file (path: %s)", filename);
+
+  uint64_t bytes_read = 0;
+
+#if TRACE_PROCESSOR_HAS_MMAP()
+  char* no_mmap = getenv("TRACE_PROCESSOR_NO_MMAP");
+  uint64_t whole_size_64 = static_cast<uint64_t>(lseek(*fd, 0, SEEK_END));
+  lseek(*fd, 0, SEEK_SET);
+  bool use_mmap = !no_mmap || *no_mmap != '1';
+  if (sizeof(size_t) < 8 && whole_size_64 > 2147483648ULL)
+    use_mmap = false;  // Cannot use mmap on 32-bit systems for files > 2GB.
+
+  if (use_mmap) {
+    const size_t whole_size = static_cast<size_t>(whole_size_64);
+    void* file_mm = mmap(nullptr, whole_size, PROT_READ, MAP_PRIVATE, *fd, 0);
+    if (file_mm != MAP_FAILED) {
+      TraceBlobView whole_mmap(TraceBlob::FromMmap(file_mm, whole_size));
+      // Parse the file in chunks so we get some status update on stdio.
+      static constexpr size_t kMmapChunkSize = 128ul * 1024 * 1024;
+      while (bytes_read < whole_size_64) {
+        progress_callback(bytes_read);
+        const size_t bytes_read_z = static_cast<size_t>(bytes_read);
+        size_t slice_size = std::min(whole_size - bytes_read_z, kMmapChunkSize);
+        TraceBlobView slice = whole_mmap.slice_off(bytes_read_z, slice_size);
+        RETURN_IF_ERROR(tp->Parse(std::move(slice)));
+        bytes_read += slice_size;
+      }  // while (slices)
+    }    // if (!MAP_FAILED)
+  }      // if (use_mmap)
+  if (bytes_read == 0)
+    PERFETTO_LOG("Cannot use mmap on this system. Falling back on read()");
+#endif  // TRACE_PROCESSOR_HAS_MMAP()
+  if (bytes_read == 0) {
+    RETURN_IF_ERROR(
+        ReadTraceUsingRead(tp, *fd, &bytes_read, progress_callback));
+  }
+  tp->SetCurrentTraceName(filename);
+
+  if (progress_callback)
+    progress_callback(bytes_read);
+  return util::OkStatus();
+}
+}  // namespace trace_processor
+}  // namespace perfetto
diff --git a/src/trace_processor/read_trace_internal.h b/src/trace_processor/read_trace_internal.h
new file mode 100644
index 0000000..f4bb56e
--- /dev/null
+++ b/src/trace_processor/read_trace_internal.h
@@ -0,0 +1,40 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACE_PROCESSOR_READ_TRACE_INTERNAL_H_
+#define SRC_TRACE_PROCESSOR_READ_TRACE_INTERNAL_H_
+
+#include <functional>
+#include <vector>
+
+#include "perfetto/base/export.h"
+#include "perfetto/trace_processor/status.h"
+
+namespace perfetto {
+namespace trace_processor {
+
+class TraceProcessor;
+
+// Reads trace without Flushing the data at the end.
+util::Status PERFETTO_EXPORT_COMPONENT ReadTraceUnfinalized(
+    TraceProcessor* tp,
+    const char* filename,
+    const std::function<void(uint64_t parsed_size)>& progress_callback = {});
+
+}  // namespace trace_processor
+}  // namespace perfetto
+
+#endif  // SRC_TRACE_PROCESSOR_READ_TRACE_INTERNAL_H_
diff --git a/src/trace_processor/trace_processor_impl.cc b/src/trace_processor/trace_processor_impl.cc
index 75f46cd..212af21 100644
--- a/src/trace_processor/trace_processor_impl.cc
+++ b/src/trace_processor/trace_processor_impl.cc
@@ -1086,20 +1086,35 @@
   current_trace_name_ = name;
 }
 
-void TraceProcessorImpl::NotifyEndOfFile() {
-  if (current_trace_name_.empty())
-    current_trace_name_ = "Unnamed trace";
-
-  TraceProcessorStorageImpl::NotifyEndOfFile();
+void TraceProcessorImpl::Flush() {
+  TraceProcessorStorageImpl::Flush();
 
   context_.metadata_tracker->SetMetadata(
       metadata::trace_size_bytes,
       Variadic::Integer(static_cast<int64_t>(bytes_parsed_)));
   BuildBoundsTable(*db_, context_.storage->GetTraceTimestampBoundsNs());
+}
 
-  // Create a snapshot of all tables and views created so far. This is so later
-  // we can drop all extra tables created by the UI and reset to the original
-  // state (see RestoreInitialTables).
+void TraceProcessorImpl::NotifyEndOfFile() {
+  if (notify_eof_called_) {
+    PERFETTO_ELOG(
+        "NotifyEndOfFile should only be called once. Try calling Flush instead "
+        "if trying to commit the contents of the trace to tables.");
+    PERFETTO_DCHECK(!notify_eof_called_);
+  }
+  notify_eof_called_ = true;
+
+  if (current_trace_name_.empty())
+    current_trace_name_ = "Unnamed trace";
+
+  // Last opportunity to flush all pending data.
+  Flush();
+
+  TraceProcessorStorageImpl::NotifyEndOfFile();
+
+  // Create a snapshot list of all tables and views created so far. This is so
+  // later we can drop all extra tables created by the UI and reset to the
+  // original state (see RestoreInitialTables).
   initial_tables_.clear();
   auto it = ExecuteQuery(kAllTablesQuery);
   while (it.Next()) {
diff --git a/src/trace_processor/trace_processor_impl.h b/src/trace_processor/trace_processor_impl.h
index 3fab656..17dfab2 100644
--- a/src/trace_processor/trace_processor_impl.h
+++ b/src/trace_processor/trace_processor_impl.h
@@ -59,6 +59,7 @@
 
   // TraceProcessorStorage implementation:
   base::Status Parse(TraceBlobView) override;
+  void Flush() override;
   void NotifyEndOfFile() override;
 
   // TraceProcessor implementation:
@@ -140,6 +141,10 @@
 
   std::string current_trace_name_;
   uint64_t bytes_parsed_ = 0;
+
+  // NotifyEndOfFile should only be called once. Set to true whenever it is
+  // called.
+  bool notify_eof_called_ = false;
 };
 
 }  // namespace trace_processor
diff --git a/src/trace_processor/trace_processor_shell.cc b/src/trace_processor/trace_processor_shell.cc
index 1232a90..3998b78 100644
--- a/src/trace_processor/trace_processor_shell.cc
+++ b/src/trace_processor/trace_processor_shell.cc
@@ -46,6 +46,7 @@
 #include "src/trace_processor/metrics/all_chrome_metrics.descriptor.h"
 #include "src/trace_processor/metrics/metrics.descriptor.h"
 #include "src/trace_processor/metrics/metrics.h"
+#include "src/trace_processor/read_trace_internal.h"
 #include "src/trace_processor/util/proto_to_json.h"
 #include "src/trace_processor/util/status_macros.h"
 
@@ -910,11 +911,12 @@
 }
 
 base::Status LoadTrace(const std::string& trace_file_path, double* size_mb) {
-  base::Status read_status =
-      ReadTrace(g_tp, trace_file_path.c_str(), [&size_mb](size_t parsed_size) {
+  base::Status read_status = ReadTraceUnfinalized(
+      g_tp, trace_file_path.c_str(), [&size_mb](size_t parsed_size) {
         *size_mb = static_cast<double>(parsed_size) / 1E6;
         fprintf(stderr, "\rLoading trace: %.2f MB\r", *size_mb);
       });
+  g_tp->Flush();
   if (!read_status.ok()) {
     return base::ErrStatus("Could not read trace file (path: %s): %s",
                            trace_file_path.c_str(), read_status.c_message());
@@ -936,7 +938,7 @@
             return;
           }
         });
-    g_tp->NotifyEndOfFile();
+    g_tp->Flush();
   }
 
   auto maybe_map = profiling::GetPerfettoProguardMapPath();
@@ -953,6 +955,7 @@
           }
         });
   }
+  g_tp->NotifyEndOfFile();
   return base::OkStatus();
 }
 
diff --git a/src/trace_processor/trace_processor_storage_impl.cc b/src/trace_processor/trace_processor_storage_impl.cc
index c503ed2..25087d5 100644
--- a/src/trace_processor/trace_processor_storage_impl.cc
+++ b/src/trace_processor/trace_processor_storage_impl.cc
@@ -117,21 +117,27 @@
   return status;
 }
 
+void TraceProcessorStorageImpl::Flush() {
+  if (unrecoverable_parse_error_)
+    return;
+
+  if (context_.sorter)
+    context_.sorter->ExtractEventsForced();
+}
+
 void TraceProcessorStorageImpl::NotifyEndOfFile() {
   if (unrecoverable_parse_error_ || !context_.chunk_reader)
     return;
-
+  Flush();
   context_.chunk_reader->NotifyEndOfFile();
-  if (context_.sorter)
-    context_.sorter->ExtractEventsForced();
-  context_.event_tracker->FlushPendingEvents();
-  context_.slice_tracker->FlushPendingSlices();
-  context_.heap_profile_tracker->NotifyEndOfFile();
   for (std::unique_ptr<ProtoImporterModule>& module : context_.modules) {
     module->NotifyEndOfFile();
   }
-  context_.process_tracker->NotifyEndOfFile();
+  context_.event_tracker->FlushPendingEvents();
+  context_.slice_tracker->FlushPendingSlices();
+  context_.heap_profile_tracker->NotifyEndOfFile();
   context_.args_tracker->Flush();
+  context_.process_tracker->NotifyEndOfFile();
 }
 
 }  // namespace trace_processor
diff --git a/src/trace_processor/trace_processor_storage_impl.h b/src/trace_processor/trace_processor_storage_impl.h
index 901256b..fda951a 100644
--- a/src/trace_processor/trace_processor_storage_impl.h
+++ b/src/trace_processor/trace_processor_storage_impl.h
@@ -34,6 +34,7 @@
   ~TraceProcessorStorageImpl() override;
 
   util::Status Parse(TraceBlobView) override;
+  void Flush() override;
   void NotifyEndOfFile() override;
 
   TraceProcessorContext* context() { return &context_; }
diff --git a/src/traceconv/symbolize_profile.cc b/src/traceconv/symbolize_profile.cc
index 1bb2cc3..00cc7b9 100644
--- a/src/traceconv/symbolize_profile.cc
+++ b/src/traceconv/symbolize_profile.cc
@@ -51,14 +51,16 @@
   std::unique_ptr<trace_processor::TraceProcessor> tp =
       trace_processor::TraceProcessor::CreateInstance(config);
 
-  if (!ReadTrace(tp.get(), input))
+  if (!ReadTraceUnfinalized(tp.get(), input))
     PERFETTO_FATAL("Failed to read trace.");
 
-  tp->NotifyEndOfFile();
+  tp->Flush();
 
   SymbolizeDatabase(
       tp.get(), symbolizer.get(),
       [output](const std::string& trace_proto) { *output << trace_proto; });
+
+  tp->NotifyEndOfFile();
   return 0;
 }
 
diff --git a/src/traceconv/trace_to_hprof.cc b/src/traceconv/trace_to_hprof.cc
index 113922a..d60e193 100644
--- a/src/traceconv/trace_to_hprof.cc
+++ b/src/traceconv/trace_to_hprof.cc
@@ -328,7 +328,7 @@
   trace_processor::Config config;
   std::unique_ptr<trace_processor::TraceProcessor> tp =
       trace_processor::TraceProcessor::CreateInstance(config);
-  if (!ReadTrace(tp.get(), input))
+  if (!ReadTraceUnfinalized(tp.get(), input))
     return false;
   tp->NotifyEndOfFile();
   return TraceToHprof(tp.get(), output, pid, timestamps[0]);
diff --git a/src/traceconv/trace_to_json.cc b/src/traceconv/trace_to_json.cc
index dfad4f2..429b4d7 100644
--- a/src/traceconv/trace_to_json.cc
+++ b/src/traceconv/trace_to_json.cc
@@ -91,7 +91,7 @@
   std::unique_ptr<trace_processor::TraceProcessor> tp =
       trace_processor::TraceProcessor::CreateInstance(config);
 
-  if (!ReadTrace(tp.get(), input))
+  if (!ReadTraceUnfinalized(tp.get(), input))
     return 1;
   tp->NotifyEndOfFile();
 
diff --git a/src/traceconv/trace_to_profile.cc b/src/traceconv/trace_to_profile.cc
index 234165a..6211f02 100644
--- a/src/traceconv/trace_to_profile.cc
+++ b/src/traceconv/trace_to_profile.cc
@@ -76,7 +76,7 @@
                                [tp](const std::string& trace_proto) {
                                  IngestTraceOrDie(tp, trace_proto);
                                });
-  tp->NotifyEndOfFile();
+  tp->Flush();
 }
 
 void MaybeDeobfuscate(trace_processor::TraceProcessor* tp) {
@@ -88,7 +88,7 @@
       maybe_map, [tp](const std::string& trace_proto) {
         IngestTraceOrDie(tp, trace_proto);
       });
-  tp->NotifyEndOfFile();
+  tp->Flush();
 }
 
 int TraceToProfile(
@@ -105,15 +105,15 @@
   std::unique_ptr<trace_processor::TraceProcessor> tp =
       trace_processor::TraceProcessor::CreateInstance(config);
 
-  if (!ReadTrace(tp.get(), input))
+  if (!ReadTraceUnfinalized(tp.get(), input))
     return -1;
-
-  tp->NotifyEndOfFile();
+  tp->Flush();
   MaybeSymbolize(tp.get());
   MaybeDeobfuscate(tp.get());
 
   TraceToPprof(tp.get(), &profiles, conversion_mode, conversion_flags, pid,
                timestamps);
+  tp->NotifyEndOfFile();
   if (profiles.empty()) {
     return 0;
   }
diff --git a/src/traceconv/trace_to_systrace.cc b/src/traceconv/trace_to_systrace.cc
index 51a6140..d89f0d6 100644
--- a/src/traceconv/trace_to_systrace.cc
+++ b/src/traceconv/trace_to_systrace.cc
@@ -286,7 +286,7 @@
   std::unique_ptr<trace_processor::TraceProcessor> tp =
       trace_processor::TraceProcessor::CreateInstance(config);
 
-  if (!ReadTrace(tp.get(), input))
+  if (!ReadTraceUnfinalized(tp.get(), input))
     return 1;
   tp->NotifyEndOfFile();
 
diff --git a/src/traceconv/utils.cc b/src/traceconv/utils.cc
index 5611fb9..d223d8d 100644
--- a/src/traceconv/utils.cc
+++ b/src/traceconv/utils.cc
@@ -48,7 +48,8 @@
 
 }  // namespace
 
-bool ReadTrace(trace_processor::TraceProcessor* tp, std::istream* input) {
+bool ReadTraceUnfinalized(trace_processor::TraceProcessor* tp,
+                          std::istream* input) {
   // 1MB chunk size seems the best tradeoff on a MacBook Pro 2013 - i7 2.8 GHz.
   constexpr size_t kChunkSize = 1024 * 1024;
 
diff --git a/src/traceconv/utils.h b/src/traceconv/utils.h
index 45f8daa..ce9f8ce 100644
--- a/src/traceconv/utils.h
+++ b/src/traceconv/utils.h
@@ -53,7 +53,8 @@
 constexpr char kProgressChar = '\r';
 #endif
 
-bool ReadTrace(trace_processor::TraceProcessor* tp, std::istream* input);
+bool ReadTraceUnfinalized(trace_processor::TraceProcessor* tp,
+                          std::istream* input);
 void IngestTraceOrDie(trace_processor::TraceProcessor* tp,
                       const std::string& trace_proto);