trace_processor: better error handling while parsing traces

Build upon the Status class to improve error handling and propagation in
trace processor during trace parsing

Change-Id: Ice57e67783cfee05f7a390029ff77fe6fa4fede4
diff --git a/src/trace_processor/chunked_trace_reader.h b/src/trace_processor/chunked_trace_reader.h
index 26a6cc3..bbf93a2 100644
--- a/src/trace_processor/chunked_trace_reader.h
+++ b/src/trace_processor/chunked_trace_reader.h
@@ -22,6 +22,8 @@
 
 #include <memory>
 
+#include "perfetto/trace_processor/basic_types.h"
+
 namespace perfetto {
 namespace trace_processor {
 
@@ -35,9 +37,7 @@
   // caller to match line/protos boundaries. The parser class has to deal with
   // intermediate buffering lines/protos that span across different chunks.
   // The buffer size is guaranteed to be > 0.
-  // Returns true if the data has been succesfully parsed, false if some
-  // unrecoverable parsing error happened and no more chunks should be pushed.
-  virtual bool Parse(std::unique_ptr<uint8_t[]>, size_t) = 0;
+  virtual util::Status Parse(std::unique_ptr<uint8_t[]>, size_t) = 0;
 };
 
 }  // namespace trace_processor
diff --git a/src/trace_processor/fuchsia_trace_tokenizer.cc b/src/trace_processor/fuchsia_trace_tokenizer.cc
index 0c4d216..db90da2 100644
--- a/src/trace_processor/fuchsia_trace_tokenizer.cc
+++ b/src/trace_processor/fuchsia_trace_tokenizer.cc
@@ -69,8 +69,8 @@
 
 FuchsiaTraceTokenizer::~FuchsiaTraceTokenizer() = default;
 
-bool FuchsiaTraceTokenizer::Parse(std::unique_ptr<uint8_t[]> data,
-                                  size_t size) {
+util::Status FuchsiaTraceTokenizer::Parse(std::unique_ptr<uint8_t[]> data,
+                                          size_t size) {
   // The relevant internal state is |leftover_bytes_|. Each call to Parse should
   // maintain the following properties, unless a fatal error occurs in which
   // case it should return false and no assumptions should be made about the
@@ -96,7 +96,7 @@
     // record, so just add the new bytes to |leftover_bytes_| and return.
     leftover_bytes_.insert(leftover_bytes_.end(), data.get() + byte_offset,
                            data.get() + size);
-    return true;
+    return util::OkStatus();
   }
   if (leftover_bytes_.size() > 0) {
     // There is a record starting from leftover bytes.
@@ -141,7 +141,7 @@
       // have to leftover_bytes_ and wait for more.
       leftover_bytes_.insert(leftover_bytes_.end(), data.get() + byte_offset,
                              data.get() + byte_offset + size);
-      return true;
+      return util::OkStatus();
     }
   }
 
@@ -157,10 +157,8 @@
     uint32_t record_len_bytes =
         fuchsia_trace_utils::ReadField<uint32_t>(header, 4, 15) *
         sizeof(uint64_t);
-    if (record_len_bytes == 0) {
-      PERFETTO_DLOG("Unexpected record of size 0");
-      return false;
-    }
+    if (record_len_bytes == 0)
+      return util::ErrStatus("Unexpected record of size 0");
 
     if (record_offset + record_len_bytes > size)
       break;
@@ -175,7 +173,7 @@
   leftover_bytes_.insert(leftover_bytes_.end(),
                          full_view.data() + record_offset,
                          full_view.data() + size);
-  return true;
+  return util::OkStatus();
 }
 
 // Most record types are read and recorded in |TraceStorage| here directly.
diff --git a/src/trace_processor/fuchsia_trace_tokenizer.h b/src/trace_processor/fuchsia_trace_tokenizer.h
index 9f5bad9..be2f7cb 100644
--- a/src/trace_processor/fuchsia_trace_tokenizer.h
+++ b/src/trace_processor/fuchsia_trace_tokenizer.h
@@ -35,7 +35,7 @@
   ~FuchsiaTraceTokenizer() override;
 
   // ChunkedTraceReader implementation
-  bool Parse(std::unique_ptr<uint8_t[]>, size_t) override;
+  util::Status Parse(std::unique_ptr<uint8_t[]>, size_t) override;
 
  private:
   struct ProviderInfo {
diff --git a/src/trace_processor/json_trace_tokenizer.cc b/src/trace_processor/json_trace_tokenizer.cc
index f6e3de6..274d908 100644
--- a/src/trace_processor/json_trace_tokenizer.cc
+++ b/src/trace_processor/json_trace_tokenizer.cc
@@ -98,7 +98,8 @@
     : context_(ctx) {}
 JsonTraceTokenizer::~JsonTraceTokenizer() = default;
 
-bool JsonTraceTokenizer::Parse(std::unique_ptr<uint8_t[]> data, size_t size) {
+util::Status JsonTraceTokenizer::Parse(std::unique_ptr<uint8_t[]> data,
+                                       size_t size) {
   buffer_.insert(buffer_.end(), data.get(), data.get() + size);
   const char* buf = buffer_.data();
   const char* next = buf;
@@ -113,10 +114,8 @@
     while (next != end && *next != '[') {
       next++;
     }
-    if (next == end) {
-      PERFETTO_ELOG("Failed to parse: first chunk missing opening [");
-      return false;
-    }
+    if (next == end)
+      return util::ErrStatus("Failed to parse: first chunk missing opening [");
     next++;
   }
 
@@ -126,7 +125,7 @@
     std::unique_ptr<Json::Value> value(new Json::Value());
     const auto res = ReadOneJsonDict(next, end, value.get(), &next);
     if (res == kFatalError)
-      return false;
+      return util::ErrStatus("Encountered fatal error while parsing JSON");
     if (res == kEndOfTrace || res == kNeedsMoreData)
       break;
 
@@ -143,7 +142,7 @@
 
   offset_ += static_cast<uint64_t>(next - buf);
   buffer_.erase(buffer_.begin(), buffer_.begin() + (next - buf));
-  return true;
+  return util::OkStatus();
 }
 
 }  // namespace trace_processor
diff --git a/src/trace_processor/json_trace_tokenizer.h b/src/trace_processor/json_trace_tokenizer.h
index c2c4ab5..499342a 100644
--- a/src/trace_processor/json_trace_tokenizer.h
+++ b/src/trace_processor/json_trace_tokenizer.h
@@ -48,7 +48,7 @@
   ~JsonTraceTokenizer() override;
 
   // ChunkedTraceReader implementation.
-  bool Parse(std::unique_ptr<uint8_t[]>, size_t) override;
+  util::Status Parse(std::unique_ptr<uint8_t[]>, size_t) override;
 
  private:
   TraceProcessorContext* const context_;
diff --git a/src/trace_processor/metrics/metrics.cc b/src/trace_processor/metrics/metrics.cc
index ac4399f..9de5423 100644
--- a/src/trace_processor/metrics/metrics.cc
+++ b/src/trace_processor/metrics/metrics.cc
@@ -118,15 +118,13 @@
   }
 }
 
-int ComputeMetrics(TraceProcessor* tp,
-                   const std::vector<std::string>& metric_names,
-                   std::vector<uint8_t>* metrics_proto) {
+util::Status ComputeMetrics(TraceProcessor* tp,
+                            const std::vector<std::string>& metric_names,
+                            std::vector<uint8_t>* metrics_proto) {
   // TODO(lalitm): stop hardcoding android.mem metric and read the proto
   // descriptor for this logic instead.
-  if (metric_names.size() != 1 || metric_names[0] != "android.mem") {
-    PERFETTO_ELOG("Only android.mem metric is currently supported");
-    return 1;
-  }
+  if (metric_names.size() != 1 || metric_names[0] != "android.mem")
+    return util::ErrStatus("Only android.mem metric is currently supported");
 
   auto queries = base::SplitString(sql_metrics::kAndroidMem, ";\n");
   for (const auto& query : queries) {
@@ -135,10 +133,8 @@
     prep_it.Next();
 
     util::Status status = prep_it.Status();
-    if (!status.ok()) {
-      PERFETTO_ELOG("SQLite error: %s", status.c_message());
-      return 1;
-    }
+    if (!status.ok())
+      return status;
   }
 
   protozero::ScatteredHeapBuffer delegate;
@@ -153,10 +149,9 @@
   auto it = tp->ExecuteQuery("SELECT COUNT(*) from lmk_by_score;");
   auto has_next = it.Next();
   util::Status status = it.Status();
-  if (!status.ok()) {
-    PERFETTO_ELOG("SQLite error: %s", status.c_message());
-    return 1;
-  }
+  if (!status.ok())
+    return status;
+
   PERFETTO_CHECK(has_next);
   PERFETTO_CHECK(it.Get(0).type == SqlValue::Type::kLong);
 
@@ -180,14 +175,12 @@
     anon->set_avg(it.Get(3).AsDouble());
   }
   status = it.Status();
-  if (!status.ok()) {
-    PERFETTO_ELOG("SQLite error: %s", status.c_message());
-    return 1;
-  }
+  if (!status.ok())
+    return status;
 
   metrics.Finalize();
   *metrics_proto = delegate.StitchSlices();
-  return 0;
+  return util::OkStatus();
 }
 
 }  // namespace metrics
diff --git a/src/trace_processor/metrics/metrics.h b/src/trace_processor/metrics/metrics.h
index 4c85ecb..2183947 100644
--- a/src/trace_processor/metrics/metrics.h
+++ b/src/trace_processor/metrics/metrics.h
@@ -40,9 +40,9 @@
 // This function implements the RUN_METRIC SQL function.
 void RunMetric(sqlite3_context* ctx, int argc, sqlite3_value** argv);
 
-int ComputeMetrics(TraceProcessor* impl,
-                   const std::vector<std::string>& metric_names,
-                   std::vector<uint8_t>* metrics_proto);
+util::Status ComputeMetrics(TraceProcessor* impl,
+                            const std::vector<std::string>& metric_names,
+                            std::vector<uint8_t>* metrics_proto);
 
 }  // namespace metrics
 }  // namespace trace_processor
diff --git a/src/trace_processor/proto_trace_tokenizer.cc b/src/trace_processor/proto_trace_tokenizer.cc
index 41e175e..9b71331 100644
--- a/src/trace_processor/proto_trace_tokenizer.cc
+++ b/src/trace_processor/proto_trace_tokenizer.cc
@@ -103,8 +103,8 @@
     : context_(ctx) {}
 ProtoTraceTokenizer::~ProtoTraceTokenizer() = default;
 
-bool ProtoTraceTokenizer::Parse(std::unique_ptr<uint8_t[]> owned_buf,
-                                size_t size) {
+util::Status ProtoTraceTokenizer::Parse(std::unique_ptr<uint8_t[]> owned_buf,
+                                        size_t size) {
   uint8_t* data = &owned_buf[0];
   if (!partial_buf_.empty()) {
     // It takes ~5 bytes for a proto preamble + the varint size.
@@ -113,7 +113,7 @@
       size_t missing_len = std::min(kHeaderBytes - partial_buf_.size(), size);
       partial_buf_.insert(partial_buf_.end(), &data[0], &data[missing_len]);
       if (partial_buf_.size() < kHeaderBytes)
-        return true;
+        return util::OkStatus();
       data += missing_len;
       size -= missing_len;
     }
@@ -129,8 +129,8 @@
     bool parse_failed = next == pos;
     pos = next;
     if (proto_field_tag != kTracePacketTag || field_size == 0 || parse_failed) {
-      PERFETTO_ELOG("Failed parsing a TracePacket from the partial buffer");
-      return false;  // Unrecoverable error, stop parsing.
+      return util::ErrStatus(
+          "Failed parsing a TracePacket from the partial buffer");
     }
 
     // At this point we know how big the TracePacket is.
@@ -157,26 +157,22 @@
       size -= size_missing;
       partial_buf_.clear();
       uint8_t* buf_start = &buf[0];  // Note that buf is std::moved below.
-      bool success = ParseInternal(std::move(buf), buf_start, size_incl_header);
-      if (PERFETTO_UNLIKELY(!success)) {
-        PERFETTO_ELOG(
-            "Failed to parse trace.  Check if the trace is corrupted.");
-        return false;
-      }
+      util::Status status =
+          ParseInternal(std::move(buf), buf_start, size_incl_header);
+      if (PERFETTO_UNLIKELY(!status.ok()))
+        return status;
     } else {
       partial_buf_.insert(partial_buf_.end(), data, &data[size]);
-      return true;
+      return util::OkStatus();
     }
   }
-  bool success = ParseInternal(std::move(owned_buf), data, size);
-  if (!success)
-    PERFETTO_ELOG("Failed to parse trace. Check if the trace is corrupted.");
-  return success;
+  return ParseInternal(std::move(owned_buf), data, size);
 }
 
-bool ProtoTraceTokenizer::ParseInternal(std::unique_ptr<uint8_t[]> owned_buf,
-                                        uint8_t* data,
-                                        size_t size) {
+util::Status ProtoTraceTokenizer::ParseInternal(
+    std::unique_ptr<uint8_t[]> owned_buf,
+    uint8_t* data,
+    size_t size) {
   PERFETTO_DCHECK(data >= &owned_buf[0]);
   const uint8_t* start = &owned_buf[0];
   const size_t data_off = static_cast<size_t>(data - start);
@@ -185,9 +181,10 @@
   protos::pbzero::Trace::Decoder decoder(data, size);
   for (auto it = decoder.packet(); it; ++it) {
     size_t field_offset = whole_buf.offset_of(it->data());
-    bool success = ParsePacket(whole_buf.slice(field_offset, it->size()));
-    if (PERFETTO_UNLIKELY(!success))
-      return false;
+    util::Status status =
+        ParsePacket(whole_buf.slice(field_offset, it->size()));
+    if (PERFETTO_UNLIKELY(!status.ok()))
+      return status;
   }
 
   const size_t bytes_left = decoder.bytes_left();
@@ -196,13 +193,14 @@
     partial_buf_.insert(partial_buf_.end(), &data[decoder.read_offset()],
                         &data[decoder.read_offset() + bytes_left]);
   }
-  return true;
+  return util::OkStatus();
 }
 
-bool ProtoTraceTokenizer::ParsePacket(TraceBlobView packet) {
+util::Status ProtoTraceTokenizer::ParsePacket(TraceBlobView packet) {
   protos::pbzero::TracePacket::Decoder decoder(packet.data(), packet.length());
   if (PERFETTO_UNLIKELY(decoder.bytes_left()))
-    return false;
+    return util::ErrStatus(
+        "Failed to parse proto packet fully; the trace is probably corrupt.");
 
   auto timestamp = decoder.has_timestamp()
                        ? static_cast<int64_t>(decoder.timestamp())
@@ -225,24 +223,24 @@
     auto ftrace_field = decoder.ftrace_events();
     const size_t fld_off = packet.offset_of(ftrace_field.data);
     ParseFtraceBundle(packet.slice(fld_off, ftrace_field.size));
-    return true;
+    return util::OkStatus();
   }
 
   if (decoder.has_track_event()) {
     ParseTrackEventPacket(decoder, std::move(packet));
-    return true;
+    return util::OkStatus();
   }
 
   if (decoder.has_thread_descriptor()) {
     ParseThreadDescriptorPacket(decoder);
-    return true;
+    return util::OkStatus();
   }
 
   // Use parent data and length because we want to parse this again
   // later to get the exact type of the packet.
   context_->sorter->PushTracePacket(timestamp, std::move(packet));
 
-  return true;
+  return util::OkStatus();
 }
 
 void ProtoTraceTokenizer::HandleIncrementalStateCleared(
diff --git a/src/trace_processor/proto_trace_tokenizer.h b/src/trace_processor/proto_trace_tokenizer.h
index 3b68575..918c25a 100644
--- a/src/trace_processor/proto_trace_tokenizer.h
+++ b/src/trace_processor/proto_trace_tokenizer.h
@@ -55,13 +55,13 @@
   ~ProtoTraceTokenizer() override;
 
   // ChunkedTraceReader implementation.
-  bool Parse(std::unique_ptr<uint8_t[]>, size_t size) override;
+  util::Status Parse(std::unique_ptr<uint8_t[]>, size_t size) override;
 
  private:
-  bool ParseInternal(std::unique_ptr<uint8_t[]> owned_buf,
-                     uint8_t* data,
-                     size_t size);
-  bool ParsePacket(TraceBlobView);
+  util::Status ParseInternal(std::unique_ptr<uint8_t[]> owned_buf,
+                             uint8_t* data,
+                             size_t size);
+  util::Status ParsePacket(TraceBlobView);
   void HandleIncrementalStateCleared(
       const protos::pbzero::TracePacket::Decoder& packet_decoder);
   void HandlePreviousPacketDropped(
diff --git a/src/trace_processor/trace_processor_impl.cc b/src/trace_processor/trace_processor_impl.cc
index d5815f7..66918b4 100644
--- a/src/trace_processor/trace_processor_impl.cc
+++ b/src/trace_processor/trace_processor_impl.cc
@@ -357,10 +357,9 @@
 
   auto scoped_trace = context_.storage->TraceExecutionTimeIntoStats(
       stats::parse_trace_duration_ns);
-  bool success = context_.chunk_reader->Parse(std::move(data), size);
-  unrecoverable_parse_error_ |= !success;
-  return success ? util::OkStatus()
-                 : util::ErrStatus("Failed to parse the trace");
+  util::Status status = context_.chunk_reader->Parse(std::move(data), size);
+  unrecoverable_parse_error_ |= !status.ok();
+  return status;
 }
 
 void TraceProcessorImpl::NotifyEndOfFile() {
@@ -404,7 +403,7 @@
   sqlite3_interrupt(db_.get());
 }
 
-int TraceProcessorImpl::ComputeMetric(
+util::Status TraceProcessorImpl::ComputeMetric(
     const std::vector<std::string>& metric_names,
     std::vector<uint8_t>* metrics_proto) {
   return metrics::ComputeMetrics(this, metric_names, metrics_proto);
diff --git a/src/trace_processor/trace_processor_impl.h b/src/trace_processor/trace_processor_impl.h
index 51846d6..f0c765f 100644
--- a/src/trace_processor/trace_processor_impl.h
+++ b/src/trace_processor/trace_processor_impl.h
@@ -58,8 +58,8 @@
   Iterator ExecuteQuery(const std::string& sql,
                         int64_t time_queued = 0) override;
 
-  int ComputeMetric(const std::vector<std::string>& metric_names,
-                    std::vector<uint8_t>* metrics) override;
+  util::Status ComputeMetric(const std::vector<std::string>& metric_names,
+                             std::vector<uint8_t>* metrics) override;
 
   void InterruptQuery() override;
 
diff --git a/src/trace_processor/trace_processor_shell.cc b/src/trace_processor/trace_processor_shell.cc
index c71bd3b..2d0ad66 100644
--- a/src/trace_processor/trace_processor_shell.cc
+++ b/src/trace_processor/trace_processor_shell.cc
@@ -252,9 +252,9 @@
 
 int RunMetrics(const std::vector<std::string>& metric_names) {
   std::vector<uint8_t> metric_result;
-  int res = g_tp->ComputeMetric(metric_names, &metric_result);
-  if (res) {
-    PERFETTO_ELOG("Error when computing metrics");
+  util::Status status = g_tp->ComputeMetric(metric_names, &metric_result);
+  if (!status.ok()) {
+    PERFETTO_ELOG("Error when computing metrics: %s", status.c_message());
     return 1;
   }
   fwrite(metric_result.data(), sizeof(uint8_t), metric_result.size(), stdout);