[fuchsia_importer] Add flow event support

Bug: fxbug.dev/23051
Change-Id: I73a57cff069ef9340fff942473f2186504d4fd70
diff --git a/src/trace_processor/importers/fuchsia/fuchsia_trace_parser.cc b/src/trace_processor/importers/fuchsia/fuchsia_trace_parser.cc
index c1c5361..d26a2fe 100644
--- a/src/trace_processor/importers/fuchsia/fuchsia_trace_parser.cc
+++ b/src/trace_processor/importers/fuchsia/fuchsia_trace_parser.cc
@@ -18,6 +18,7 @@
 
 #include "src/trace_processor/importers/common/args_tracker.h"
 #include "src/trace_processor/importers/common/event_tracker.h"
+#include "src/trace_processor/importers/common/flow_tracker.h"
 #include "src/trace_processor/importers/common/process_tracker.h"
 #include "src/trace_processor/importers/common/slice_tracker.h"
 #include "src/trace_processor/importers/common/track_tracker.h"
@@ -38,6 +39,9 @@
 constexpr uint32_t kAsyncBegin = 5;
 constexpr uint32_t kAsyncInstant = 6;
 constexpr uint32_t kAsyncEnd = 7;
+constexpr uint32_t kFlowBegin = 8;
+constexpr uint32_t kFlowStep = 9;
+constexpr uint32_t kFlowEnd = 10;
 
 // Argument Types
 constexpr uint32_t kNull = 0;
@@ -398,6 +402,45 @@
           slices->End(ts, track_id, cat, name);
           break;
         }
+        case kFlowBegin: {
+          uint64_t correlation_id;
+          if (!cursor.ReadUint64(&correlation_id)) {
+            context_->storage->IncrementStats(stats::fuchsia_invalid_event);
+            return;
+          }
+          UniqueTid utid =
+              procs->UpdateThread(static_cast<uint32_t>(tinfo.tid),
+                                  static_cast<uint32_t>(tinfo.pid));
+          TrackId track_id = context_->track_tracker->InternThreadTrack(utid);
+          context_->flow_tracker->Begin(track_id, correlation_id);
+          break;
+        }
+        case kFlowStep: {
+          uint64_t correlation_id;
+          if (!cursor.ReadUint64(&correlation_id)) {
+            context_->storage->IncrementStats(stats::fuchsia_invalid_event);
+            return;
+          }
+          UniqueTid utid =
+              procs->UpdateThread(static_cast<uint32_t>(tinfo.tid),
+                                  static_cast<uint32_t>(tinfo.pid));
+          TrackId track_id = context_->track_tracker->InternThreadTrack(utid);
+          context_->flow_tracker->Step(track_id, correlation_id);
+          break;
+        }
+        case kFlowEnd: {
+          uint64_t correlation_id;
+          if (!cursor.ReadUint64(&correlation_id)) {
+            context_->storage->IncrementStats(stats::fuchsia_invalid_event);
+            return;
+          }
+          UniqueTid utid =
+              procs->UpdateThread(static_cast<uint32_t>(tinfo.tid),
+                                  static_cast<uint32_t>(tinfo.pid));
+          TrackId track_id = context_->track_tracker->InternThreadTrack(utid);
+          context_->flow_tracker->End(track_id, correlation_id, true, true);
+          break;
+        }
       }
       break;
     }
diff --git a/test/trace_processor/fuchsia/fuchsia_smoke_flow.out b/test/trace_processor/fuchsia/fuchsia_smoke_flow.out
new file mode 100644
index 0000000..6726a35
--- /dev/null
+++ b/test/trace_processor/fuchsia/fuchsia_smoke_flow.out
@@ -0,0 +1,11 @@
+"id","slice_out","slice_in"
+0,0,1
+1,2,3
+2,4,5
+3,6,7
+4,8,9
+5,10,11
+6,12,13
+7,14,15
+8,16,17
+9,18,19
diff --git a/test/trace_processor/fuchsia/index b/test/trace_processor/fuchsia/index
index 538f588..9a1e503 100644
--- a/test/trace_processor/fuchsia/index
+++ b/test/trace_processor/fuchsia/index
@@ -5,6 +5,7 @@
 ../../data/fuchsia_trace.fxt ../common/smoke_slices.sql fuchsia_smoke_slices.out
 ../../data/fuchsia_trace.fxt smoke_instants.sql fuchsia_smoke_instants.out
 ../../data/fuchsia_trace.fxt smoke_counters.sql fuchsia_smoke_counters.out
+../../data/fuchsia_trace.fxt smoke_flow.sql fuchsia_smoke_flow.out
 
 # Smoke test a high-CPU trace.
 ../../data/fuchsia_workstation.fxt ../common/smoke_slices.sql fuchsia_workstation_smoke_slices.out
diff --git a/test/trace_processor/fuchsia/smoke_flow.sql b/test/trace_processor/fuchsia/smoke_flow.sql
new file mode 100644
index 0000000..e0b8af4
--- /dev/null
+++ b/test/trace_processor/fuchsia/smoke_flow.sql
@@ -0,0 +1,21 @@
+--
+-- Copyright 2020 The Android Open Source Project
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     https://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+select
+  id,
+  slice_out,
+  slice_in
+from flow
+limit 10;