Merge "trace_processor: Use more precise timestamps for atoms if possible"
diff --git a/src/trace_processor/importers/proto/statsd_module.cc b/src/trace_processor/importers/proto/statsd_module.cc
index 35bcb36..a3cce7a 100644
--- a/src/trace_processor/importers/proto/statsd_module.cc
+++ b/src/trace_processor/importers/proto/statsd_module.cc
@@ -16,12 +16,15 @@
 #include "src/trace_processor/importers/proto/statsd_module.h"
 
 #include "perfetto/ext/base/string_utils.h"
+#include "perfetto/protozero/scattered_heap_buffer.h"
 #include "protos/perfetto/trace/statsd/statsd_atom.pbzero.h"
 #include "protos/perfetto/trace/trace_packet.pbzero.h"
 #include "src/trace_processor/importers/common/async_track_set_tracker.h"
 #include "src/trace_processor/importers/common/slice_tracker.h"
 #include "src/trace_processor/importers/common/track_tracker.h"
+#include "src/trace_processor/importers/proto/packet_sequence_state.h"
 #include "src/trace_processor/sorter/trace_sorter.h"
+#include "src/trace_processor/storage/stats.h"
 #include "src/trace_processor/storage/trace_storage.h"
 #include "src/trace_processor/util/descriptors.h"
 
@@ -140,6 +143,7 @@
 
 }  // namespace
 
+using perfetto::protos::pbzero::StatsdAtom;
 using perfetto::protos::pbzero::TracePacket;
 
 PoolAndDescriptor::PoolAndDescriptor(const uint8_t* data,
@@ -163,6 +167,44 @@
 
 StatsdModule::~StatsdModule() = default;
 
+ModuleResult StatsdModule::TokenizePacket(const TracePacket::Decoder& decoder,
+                                          TraceBlobView* /*packet*/,
+                                          int64_t packet_timestamp,
+                                          PacketSequenceState* state,
+                                          uint32_t field_id) {
+  if (field_id != TracePacket::kStatsdAtomFieldNumber) {
+    return ModuleResult::Ignored();  // not a statsd_atom packet
+  }
+  const auto& atoms_wrapper = StatsdAtom::Decoder(decoder.statsd_atom());
+  auto it_timestamps = atoms_wrapper.timestamp_nanos();
+  for (auto it = atoms_wrapper.atom(); it; ++it) {  // one output packet per atom
+    int64_t atom_timestamp;
+    // Prefer the atom's own timestamp_nanos entry; else fall back (and count).
+    if (it_timestamps) {
+      atom_timestamp = *it_timestamps++;
+    } else {
+      context_->storage->IncrementStats(stats::atom_timestamp_missing);
+      atom_timestamp = packet_timestamp;
+    }
+    // Forge a TracePacket holding just this atom and its timestamp.
+    protozero::HeapBuffered<TracePacket> forged;
+    // Stamp it with the timestamp chosen above.
+    forged->set_timestamp(static_cast<uint64_t>(atom_timestamp));
+    // Append the atom's raw bytes as the sole `atom` of a new statsd_atom.
+    auto* statsd = forged->set_statsd_atom();
+    statsd->AppendBytes(StatsdAtom::kAtomFieldNumber, (*it).data, (*it).size);
+    // Serialize, then copy the bytes into a TraceBlob for the sorter.
+    std::vector<uint8_t> vec = forged.SerializeAsArray();
+    TraceBlob blob = TraceBlob::CopyFrom(vec.data(), vec.size());
+    // NOTE(review): atom_timestamp is pushed unconverted; confirm its clock.
+    context_->sorter->PushTracePacket(atom_timestamp,
+                                      state->current_generation(),
+                                      TraceBlobView(std::move(blob)));
+  }
+  // presumably Handled() ends processing of the original packet - verify.
+  return ModuleResult::Handled();
+}
+
 void StatsdModule::ParseTracePacketData(const TracePacket::Decoder& decoder,
                                         int64_t ts,
                                         const TracePacketData&,
@@ -170,11 +212,13 @@
   if (field_id != TracePacket::kStatsdAtomFieldNumber) {
     return;
   }
-  const auto& atoms_wrapper =
-      protos::pbzero::StatsdAtom::Decoder(decoder.statsd_atom());
-  for (auto it = atoms_wrapper.atom(); it; ++it) {
-    ParseAtom(ts, *it);
-  }
+  const auto& atoms_wrapper = StatsdAtom::Decoder(decoder.statsd_atom());
+  auto it = atoms_wrapper.atom();
+  // There should be exactly one atom per trace packet at this point.
+  // If not, something has gone wrong in tokenization above.
+  PERFETTO_CHECK(it);
+  ParseAtom(ts, *it++);
+  PERFETTO_CHECK(!it);
 }
 
 void StatsdModule::ParseAtom(int64_t ts, protozero::ConstBytes nested_bytes) {
@@ -211,12 +255,14 @@
   StringId* cached_name = atom_names_.Find(atom_field_id);
   if (cached_name == nullptr) {
     if (pool_.descriptor() == nullptr) {
+      context_->storage->IncrementStats(stats::atom_unknown);
       return context_->storage->InternString("Could not load atom descriptor");
     }
 
     const auto& fields = pool_.descriptor()->fields();
     const auto& field_it = fields.find(atom_field_id);
     if (field_it == fields.end()) {
+      context_->storage->IncrementStats(stats::atom_unknown);
       return context_->storage->InternString("Unknown atom");
     }
 
diff --git a/src/trace_processor/importers/proto/statsd_module.h b/src/trace_processor/importers/proto/statsd_module.h
index 653c1eb..07a524d 100644
--- a/src/trace_processor/importers/proto/statsd_module.h
+++ b/src/trace_processor/importers/proto/statsd_module.h
@@ -63,7 +63,13 @@
 
   ~StatsdModule() override;
 
-  void ParseTracePacketData(const protos::pbzero::TracePacket_Decoder& decoder,
+  ModuleResult TokenizePacket(const protos::pbzero::TracePacket::Decoder&,
+                              TraceBlobView* packet,
+                              int64_t packet_timestamp,
+                              PacketSequenceState* state,
+                              uint32_t field_id) override;
+
+  void ParseTracePacketData(const protos::pbzero::TracePacket::Decoder& decoder,
                             int64_t ts,
                             const TracePacketData&,
                             uint32_t field_id) override;
diff --git a/src/trace_processor/storage/stats.h b/src/trace_processor/storage/stats.h
index caea76b..d7b08f4 100644
--- a/src/trace_processor/storage/stats.h
+++ b/src/trace_processor/storage/stats.h
@@ -248,7 +248,12 @@
       "missing some arguments. You may need a newer version of trace "         \
       "processor to parse them."),                                             \
   F(network_trace_intern_errors,          kSingle,  kInfo,     kAnalysis, ""), \
-  F(network_trace_parse_errors,           kSingle,  kInfo,     kAnalysis, "")
+  F(network_trace_parse_errors,           kSingle,  kInfo,     kAnalysis, ""), \
+  F(atom_timestamp_missing,               kSingle,  kError,    kTrace,         \
+      "The corresponding timestamp_nanos entry for a StatsdAtom was "          \
+      "missing. Defaulted to inaccurate packet timestamp."),                   \
+  F(atom_unknown,                         kSingle,  kInfo,     kAnalysis,      \
+      "Unknown statsd atom. Atom descriptor may need to be updated")
 // clang-format on
 
 enum Type {