tp: add rowdataframe class to easily pass many rows between C++/SQL

Change-Id: I4a572936e46f6ea692f5956aa3075b0a751fdd10
diff --git a/Android.bp b/Android.bp
index a28df2b..7bc9bd8 100644
--- a/Android.bp
+++ b/Android.bp
@@ -12909,12 +12909,10 @@
 filegroup {
     name: "perfetto_src_trace_processor_perfetto_sql_intrinsics_functions_functions",
     srcs: [
-        "src/trace_processor/perfetto_sql/intrinsics/functions/array.cc",
         "src/trace_processor/perfetto_sql/intrinsics/functions/base64.cc",
         "src/trace_processor/perfetto_sql/intrinsics/functions/create_function.cc",
         "src/trace_processor/perfetto_sql/intrinsics/functions/create_view_function.cc",
         "src/trace_processor/perfetto_sql/intrinsics/functions/dominator_tree.cc",
-        "src/trace_processor/perfetto_sql/intrinsics/functions/graph_helpers.cc",
         "src/trace_processor/perfetto_sql/intrinsics/functions/graph_traversal.cc",
         "src/trace_processor/perfetto_sql/intrinsics/functions/import.cc",
         "src/trace_processor/perfetto_sql/intrinsics/functions/layout_functions.cc",
@@ -12922,9 +12920,9 @@
         "src/trace_processor/perfetto_sql/intrinsics/functions/pprof_functions.cc",
         "src/trace_processor/perfetto_sql/intrinsics/functions/sqlite3_str_split.cc",
         "src/trace_processor/perfetto_sql/intrinsics/functions/stack_functions.cc",
-        "src/trace_processor/perfetto_sql/intrinsics/functions/struct.cc",
         "src/trace_processor/perfetto_sql/intrinsics/functions/structural_tree_partition.cc",
         "src/trace_processor/perfetto_sql/intrinsics/functions/to_ftrace.cc",
+        "src/trace_processor/perfetto_sql/intrinsics/functions/type_builders.cc",
     ],
 }
 
diff --git a/BUILD b/BUILD
index d682b2d..ad83cbe 100644
--- a/BUILD
+++ b/BUILD
@@ -2335,8 +2335,6 @@
 perfetto_filegroup(
     name = "src_trace_processor_perfetto_sql_intrinsics_functions_functions",
     srcs = [
-        "src/trace_processor/perfetto_sql/intrinsics/functions/array.cc",
-        "src/trace_processor/perfetto_sql/intrinsics/functions/array.h",
         "src/trace_processor/perfetto_sql/intrinsics/functions/base64.cc",
         "src/trace_processor/perfetto_sql/intrinsics/functions/base64.h",
         "src/trace_processor/perfetto_sql/intrinsics/functions/clock_functions.h",
@@ -2346,8 +2344,6 @@
         "src/trace_processor/perfetto_sql/intrinsics/functions/create_view_function.h",
         "src/trace_processor/perfetto_sql/intrinsics/functions/dominator_tree.cc",
         "src/trace_processor/perfetto_sql/intrinsics/functions/dominator_tree.h",
-        "src/trace_processor/perfetto_sql/intrinsics/functions/graph_helpers.cc",
-        "src/trace_processor/perfetto_sql/intrinsics/functions/graph_helpers.h",
         "src/trace_processor/perfetto_sql/intrinsics/functions/graph_traversal.cc",
         "src/trace_processor/perfetto_sql/intrinsics/functions/graph_traversal.h",
         "src/trace_processor/perfetto_sql/intrinsics/functions/import.cc",
@@ -2362,12 +2358,12 @@
         "src/trace_processor/perfetto_sql/intrinsics/functions/sqlite3_str_split.h",
         "src/trace_processor/perfetto_sql/intrinsics/functions/stack_functions.cc",
         "src/trace_processor/perfetto_sql/intrinsics/functions/stack_functions.h",
-        "src/trace_processor/perfetto_sql/intrinsics/functions/struct.cc",
-        "src/trace_processor/perfetto_sql/intrinsics/functions/struct.h",
         "src/trace_processor/perfetto_sql/intrinsics/functions/structural_tree_partition.cc",
         "src/trace_processor/perfetto_sql/intrinsics/functions/structural_tree_partition.h",
         "src/trace_processor/perfetto_sql/intrinsics/functions/to_ftrace.cc",
         "src/trace_processor/perfetto_sql/intrinsics/functions/to_ftrace.h",
+        "src/trace_processor/perfetto_sql/intrinsics/functions/type_builders.cc",
+        "src/trace_processor/perfetto_sql/intrinsics/functions/type_builders.h",
         "src/trace_processor/perfetto_sql/intrinsics/functions/utils.h",
         "src/trace_processor/perfetto_sql/intrinsics/functions/window_functions.h",
     ],
@@ -2472,6 +2468,7 @@
     srcs = [
         "src/trace_processor/perfetto_sql/intrinsics/types/array.h",
         "src/trace_processor/perfetto_sql/intrinsics/types/node.h",
+        "src/trace_processor/perfetto_sql/intrinsics/types/row_dataframe.h",
         "src/trace_processor/perfetto_sql/intrinsics/types/struct.h",
         "src/trace_processor/perfetto_sql/intrinsics/types/value.h",
     ],
diff --git a/src/trace_processor/perfetto_sql/intrinsics/functions/BUILD.gn b/src/trace_processor/perfetto_sql/intrinsics/functions/BUILD.gn
index 7d5c234..7f8dd9a 100644
--- a/src/trace_processor/perfetto_sql/intrinsics/functions/BUILD.gn
+++ b/src/trace_processor/perfetto_sql/intrinsics/functions/BUILD.gn
@@ -19,8 +19,6 @@
 
 source_set("functions") {
   sources = [
-    "array.cc",
-    "array.h",
     "base64.cc",
     "base64.h",
     "clock_functions.h",
@@ -30,8 +28,6 @@
     "create_view_function.h",
     "dominator_tree.cc",
     "dominator_tree.h",
-    "graph_helpers.cc",
-    "graph_helpers.h",
     "graph_traversal.cc",
     "graph_traversal.h",
     "import.cc",
@@ -46,12 +42,12 @@
     "sqlite3_str_split.h",
     "stack_functions.cc",
     "stack_functions.h",
-    "struct.cc",
-    "struct.h",
     "structural_tree_partition.cc",
     "structural_tree_partition.h",
     "to_ftrace.cc",
     "to_ftrace.h",
+    "type_builders.cc",
+    "type_builders.h",
     "utils.h",
     "window_functions.h",
   ]
diff --git a/src/trace_processor/perfetto_sql/intrinsics/functions/array.cc b/src/trace_processor/perfetto_sql/intrinsics/functions/array.cc
deleted file mode 100644
index 5639d3f..0000000
--- a/src/trace_processor/perfetto_sql/intrinsics/functions/array.cc
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Copyright (C) 2024 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "src/trace_processor/perfetto_sql/intrinsics/functions/array.h"
-
-#include <cstdint>
-#include <memory>
-#include <optional>
-#include <string>
-#include <utility>
-#include <variant>
-#include <vector>
-
-#include "perfetto/base/logging.h"
-#include "perfetto/base/status.h"
-#include "perfetto/public/compiler.h"
-#include "src/trace_processor/perfetto_sql/engine/function_util.h"
-#include "src/trace_processor/perfetto_sql/engine/perfetto_sql_engine.h"
-#include "src/trace_processor/perfetto_sql/intrinsics/types/array.h"
-#include "src/trace_processor/sqlite/bindings/sqlite_aggregate_function.h"
-#include "src/trace_processor/sqlite/bindings/sqlite_result.h"
-#include "src/trace_processor/sqlite/bindings/sqlite_type.h"
-#include "src/trace_processor/sqlite/bindings/sqlite_value.h"
-
-namespace perfetto::trace_processor {
-namespace {
-
-using Array = std::variant<perfetto_sql::IntArray,
-                           perfetto_sql::DoubleArray,
-                           perfetto_sql::StringArray>;
-
-struct AggCtx : SqliteAggregateContext<AggCtx> {
-  template <typename T>
-  void Push(sqlite3_context* ctx, T value) {
-    if (PERFETTO_UNLIKELY(!array)) {
-      array = std::vector<T>{std::move(value)};
-      return;
-    }
-    auto* a = std::get_if<std::vector<T>>(&*array);
-    if (!a) {
-      return sqlite::result::Error(
-          ctx, "ARRAY_AGG: all values must have the same type");
-    }
-    a->emplace_back(std::move(value));
-  }
-  template <typename T>
-  void Result(sqlite3_context* ctx, const char* type) {
-    auto res = std::make_unique<std::vector<T>>(
-        std::get<std::vector<T>>(std::move(*array)));
-    return sqlite::result::RawPointer(ctx, res.release(), type, [](void* ptr) {
-      std::unique_ptr<std::vector<T>>(static_cast<std::vector<T>*>(ptr));
-    });
-  }
-
-  std::optional<Array> array;
-};
-
-// An SQL aggregate-function which creates an array.
-struct ArrayAgg : public SqliteAggregateFunction<ArrayAgg> {
-  static constexpr char kName[] = "__intrinsic_array_agg";
-  static constexpr int kArgCount = 1;
-
-  static void Step(sqlite3_context*, int argc, sqlite3_value** argv);
-  static void Final(sqlite3_context* ctx);
-};
-
-void ArrayAgg::Step(sqlite3_context* ctx, int argc, sqlite3_value** argv) {
-  PERFETTO_DCHECK(argc == kArgCount);
-
-  auto& agg_ctx = AggCtx::GetOrCreateContextForStep(ctx);
-  switch (sqlite::value::Type(argv[0])) {
-    case sqlite::Type::kInteger:
-      return agg_ctx.Push(ctx, sqlite::value::Int64(argv[0]));
-    case sqlite::Type::kText:
-      return agg_ctx.Push<std::string>(ctx, sqlite::value::Text(argv[0]));
-    case sqlite::Type::kFloat:
-      return agg_ctx.Push(ctx, sqlite::value::Double(argv[0]));
-    case sqlite::Type::kNull:
-      return sqlite::result::Error(
-          ctx,
-          "ARRAY_AGG: nulls are not supported. They should be filtered out "
-          "before calling ARRAY_AGG.");
-    case sqlite::Type::kBlob:
-      return sqlite::result::Error(ctx, "ARRAY_AGG: blobs are not supported.");
-  }
-}
-
-void ArrayAgg::Final(sqlite3_context* ctx) {
-  auto raw_agg_ctx = AggCtx::GetContextOrNullForFinal(ctx);
-  if (!raw_agg_ctx) {
-    return sqlite::result::Null(ctx);
-  }
-
-  auto& array = *raw_agg_ctx.get()->array;
-  switch (array.index()) {
-    case 0 /* int64_t */:
-      return raw_agg_ctx.get()->Result<int64_t>(ctx, "ARRAY<LONG>");
-    case 1 /* double */:
-      return raw_agg_ctx.get()->Result<double>(ctx, "ARRAY<DOUBLE>");
-    case 2 /* std::string */:
-      return raw_agg_ctx.get()->Result<std::string>(ctx, "ARRAY<STRING>");
-    default:
-      PERFETTO_FATAL("%zu is not a valid index", array.index());
-  }
-}
-
-}  // namespace
-
-base::Status RegisterArrayFunctions(PerfettoSqlEngine& engine) {
-  return engine.RegisterSqliteAggregateFunction<ArrayAgg>(nullptr);
-}
-
-}  // namespace perfetto::trace_processor
diff --git a/src/trace_processor/perfetto_sql/intrinsics/functions/array.h b/src/trace_processor/perfetto_sql/intrinsics/functions/array.h
deleted file mode 100644
index 8afc13d..0000000
--- a/src/trace_processor/perfetto_sql/intrinsics/functions/array.h
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright (C) 2024 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_FUNCTIONS_ARRAY_H_
-#define SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_FUNCTIONS_ARRAY_H_
-
-#include "perfetto/base/status.h"
-
-namespace perfetto::trace_processor {
-
-class PerfettoSqlEngine;
-
-// Registers the following array related functions with SQLite:
-//  * __intrinsic_array_agg: an aggregate function which allows building
-//    arrays from a table.
-// TODO(lalitm): once we have some stability here, expand the comments
-// here.
-base::Status RegisterArrayFunctions(PerfettoSqlEngine& engine);
-
-}  // namespace perfetto::trace_processor
-
-#endif  // SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_FUNCTIONS_ARRAY_H_
diff --git a/src/trace_processor/perfetto_sql/intrinsics/functions/graph_helpers.cc b/src/trace_processor/perfetto_sql/intrinsics/functions/graph_helpers.cc
deleted file mode 100644
index 7e6b576..0000000
--- a/src/trace_processor/perfetto_sql/intrinsics/functions/graph_helpers.cc
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright (C) 2024 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "src/trace_processor/perfetto_sql/intrinsics/functions/graph_helpers.h"
-
-#include <algorithm>
-#include <cstddef>
-#include <cstdint>
-#include <memory>
-#include <utility>
-#include <vector>
-
-#include "perfetto/base/logging.h"
-#include "perfetto/base/status.h"
-#include "src/trace_processor/containers/string_pool.h"
-#include "src/trace_processor/perfetto_sql/engine/perfetto_sql_engine.h"
-#include "src/trace_processor/perfetto_sql/intrinsics/types/node.h"
-#include "src/trace_processor/sqlite/bindings/sqlite_aggregate_function.h"
-#include "src/trace_processor/sqlite/bindings/sqlite_result.h"
-#include "src/trace_processor/sqlite/bindings/sqlite_value.h"
-
-namespace perfetto::trace_processor {
-namespace {
-
-struct AggCtx : SqliteAggregateContext<AggCtx> {
-  perfetto_sql::Graph graph;
-};
-
-struct NodeAgg : public SqliteAggregateFunction<NodeAgg> {
-  static constexpr char kName[] = "__intrinsic_graph_agg";
-  static constexpr int kArgCount = 2;
-  using UserDataContext = void;
-
-  static void Step(sqlite3_context*, int argc, sqlite3_value** argv);
-  static void Final(sqlite3_context* ctx);
-};
-
-void NodeAgg::Step(sqlite3_context* ctx, int argc, sqlite3_value** argv) {
-  PERFETTO_DCHECK(argc == kArgCount);
-
-  auto source_id = static_cast<uint32_t>(sqlite::value::Int64(argv[0]));
-  auto target_id = static_cast<uint32_t>(sqlite::value::Int64(argv[1]));
-  uint32_t max_id = std::max(source_id, target_id);
-  auto& agg_ctx = AggCtx::GetOrCreateContextForStep(ctx);
-  if (max_id >= agg_ctx.graph.size()) {
-    agg_ctx.graph.resize(max_id + 1);
-  }
-  agg_ctx.graph[source_id].outgoing_edges.push_back(target_id);
-}
-
-void NodeAgg::Final(sqlite3_context* ctx) {
-  auto raw_agg_ctx = AggCtx::GetContextOrNullForFinal(ctx);
-  if (!raw_agg_ctx.get()) {
-    return;
-  }
-  auto nodes = std::make_unique<perfetto_sql::Graph>(
-      std::move(raw_agg_ctx.get()->graph));
-  return sqlite::result::UniquePointer(ctx, std::move(nodes), "GRAPH");
-}
-
-}  // namespace
-
-base::Status RegisterGraphHelperFunctions(PerfettoSqlEngine& engine,
-                                          StringPool& pool) {
-  return engine.RegisterSqliteAggregateFunction<NodeAgg>(&pool);
-}
-
-}  // namespace perfetto::trace_processor
diff --git a/src/trace_processor/perfetto_sql/intrinsics/functions/graph_helpers.h b/src/trace_processor/perfetto_sql/intrinsics/functions/graph_helpers.h
deleted file mode 100644
index e958dce..0000000
--- a/src/trace_processor/perfetto_sql/intrinsics/functions/graph_helpers.h
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright (C) 2024 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_FUNCTIONS_GRAPH_HELPERS_H_
-#define SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_FUNCTIONS_GRAPH_HELPERS_H_
-
-#include "perfetto/base/status.h"
-
-namespace perfetto::trace_processor {
-
-class PerfettoSqlEngine;
-class StringPool;
-
-// Registers the following array related functions with SQLite:
-//  * __intrinsic_node_agg: an aggregate function which groups together
-//    the edges of a graph corresponding to a node.
-// TODO(lalitm): once we have some stability here, expand the comments
-// here.
-base::Status RegisterGraphHelperFunctions(PerfettoSqlEngine& engine,
-                                          StringPool& string_pool);
-
-}  // namespace perfetto::trace_processor
-
-#endif  // SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_FUNCTIONS_GRAPH_HELPERS_H_
diff --git a/src/trace_processor/perfetto_sql/intrinsics/functions/struct.cc b/src/trace_processor/perfetto_sql/intrinsics/functions/struct.cc
deleted file mode 100644
index 33dcde3..0000000
--- a/src/trace_processor/perfetto_sql/intrinsics/functions/struct.cc
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Copyright (C) 2024 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "src/trace_processor/perfetto_sql/intrinsics/functions/struct.h"
-
-#include <cstddef>
-#include <cstdint>
-#include <cstdlib>
-#include <cstring>
-#include <memory>
-#include <variant>
-
-#include "perfetto/base/status.h"
-#include "src/trace_processor/perfetto_sql/engine/function_util.h"
-#include "src/trace_processor/perfetto_sql/engine/perfetto_sql_engine.h"
-#include "src/trace_processor/perfetto_sql/intrinsics/types/struct.h"
-#include "src/trace_processor/sqlite/bindings/sqlite_function.h"
-#include "src/trace_processor/sqlite/bindings/sqlite_result.h"
-#include "src/trace_processor/sqlite/bindings/sqlite_type.h"
-#include "src/trace_processor/sqlite/bindings/sqlite_value.h"
-#include "src/trace_processor/sqlite/sqlite_utils.h"
-
-namespace perfetto::trace_processor {
-namespace {
-
-// An SQL scalar function which creates an struct.
-// TODO(lalitm): once we have some stability here, expand the comments
-// here.
-struct Struct : public SqliteFunction<Struct> {
-  static constexpr char kName[] = "__intrinsic_struct";
-  static constexpr int kArgCount = -1;
-
-  static void Step(sqlite3_context*, int argc, sqlite3_value** argv);
-};
-
-void Struct::Step(sqlite3_context* ctx, int rargc, sqlite3_value** argv) {
-  auto argc = static_cast<uint32_t>(rargc);
-  if (argc % 2 != 0) {
-    return sqlite::result::Error(
-        ctx, "STRUCT: must have an even number of arguments");
-  }
-  if (argc / 2 > perfetto_sql::Struct::kMaxFields) {
-    return sqlite::utils::SetError(
-        ctx, base::ErrStatus("STRUCT: only at most %d fields are supported",
-                             perfetto_sql::Struct::kMaxFields));
-  }
-
-  auto s = std::make_unique<perfetto_sql::Struct>();
-  s->field_count = argc / 2;
-  for (uint32_t i = 0; i < s->field_count; ++i) {
-    if (sqlite::value::Type(argv[i]) != sqlite::Type::kText) {
-      return sqlite::result::Error(ctx, "STRUCT: field names must be strings");
-    }
-    auto& field = s->fields[i];
-    field.first = sqlite::value::Text(argv[i]);
-    switch (sqlite::value::Type(argv[s->field_count + i])) {
-      case sqlite::Type::kText:
-        field.second = sqlite::value::Text(argv[s->field_count + i]);
-        break;
-      case sqlite::Type::kInteger:
-        field.second = sqlite::value::Int64(argv[s->field_count + i]);
-        break;
-      case sqlite::Type::kFloat:
-        field.second = sqlite::value::Double(argv[s->field_count + i]);
-        break;
-      case sqlite::Type::kNull:
-        field.second = std::monostate();
-        break;
-      case sqlite::Type::kBlob:
-        return sqlite::result::Error(ctx, "STRUCT: blob fields not supported");
-    }
-  }
-  sqlite::result::RawPointer(ctx, s.release(), "STRUCT", [](void* ptr) {
-    std::unique_ptr<perfetto_sql::Struct>(
-        static_cast<perfetto_sql::Struct*>(ptr));
-  });
-}
-
-}  // namespace
-
-base::Status RegisterStructFunctions(PerfettoSqlEngine& engine) {
-  return engine.RegisterSqliteFunction<Struct>(nullptr);
-}
-
-}  // namespace perfetto::trace_processor
diff --git a/src/trace_processor/perfetto_sql/intrinsics/functions/type_builders.cc b/src/trace_processor/perfetto_sql/intrinsics/functions/type_builders.cc
new file mode 100644
index 0000000..bd99fa3
--- /dev/null
+++ b/src/trace_processor/perfetto_sql/intrinsics/functions/type_builders.cc
@@ -0,0 +1,283 @@
+/*
+ * Copyright (C) 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/trace_processor/perfetto_sql/intrinsics/functions/type_builders.h"
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <optional>
+#include <string>
+#include <utility>
+#include <variant>
+#include <vector>
+
+#include "perfetto/base/logging.h"
+#include "perfetto/base/status.h"
+#include "perfetto/public/compiler.h"
+#include "src/trace_processor/perfetto_sql/engine/perfetto_sql_engine.h"
+#include "src/trace_processor/perfetto_sql/intrinsics/types/array.h"
+#include "src/trace_processor/perfetto_sql/intrinsics/types/node.h"
+#include "src/trace_processor/perfetto_sql/intrinsics/types/row_dataframe.h"
+#include "src/trace_processor/perfetto_sql/intrinsics/types/struct.h"
+#include "src/trace_processor/sqlite/bindings/sqlite_aggregate_function.h"
+#include "src/trace_processor/sqlite/bindings/sqlite_function.h"
+#include "src/trace_processor/sqlite/bindings/sqlite_result.h"
+#include "src/trace_processor/sqlite/bindings/sqlite_type.h"
+#include "src/trace_processor/sqlite/bindings/sqlite_value.h"
+#include "src/trace_processor/sqlite/sqlite_utils.h"
+#include "src/trace_processor/util/status_macros.h"
+
+namespace perfetto::trace_processor {
+namespace {
+
+using Array = std::variant<perfetto_sql::IntArray,
+                           perfetto_sql::DoubleArray,
+                           perfetto_sql::StringArray>;
+
+// An SQL aggregate-function which creates an array.
+struct ArrayAgg : public SqliteAggregateFunction<ArrayAgg> {
+  static constexpr char kName[] = "__intrinsic_array_agg";
+  static constexpr int kArgCount = 1;
+  struct AggCtx : SqliteAggregateContext<AggCtx> {
+    template <typename T>
+    void Push(sqlite3_context* ctx, T value) {
+      if (PERFETTO_UNLIKELY(!array)) {
+        array = std::vector<T>{std::move(value)};
+        return;
+      }
+      auto* a = std::get_if<std::vector<T>>(&*array);
+      if (!a) {
+        return sqlite::result::Error(
+            ctx, "ARRAY_AGG: all values must have the same type");
+      }
+      a->emplace_back(std::move(value));
+    }
+    template <typename T>
+    void Result(sqlite3_context* ctx, const char* type) {
+      auto res = std::make_unique<std::vector<T>>(
+          std::get<std::vector<T>>(std::move(*array)));
+      return sqlite::result::UniquePointer(ctx, std::move(res), type);
+    }
+
+    std::optional<Array> array;
+  };
+
+  static void Step(sqlite3_context* ctx, int argc, sqlite3_value** argv) {
+    PERFETTO_DCHECK(argc == kArgCount);
+
+    auto& agg_ctx = AggCtx::GetOrCreateContextForStep(ctx);
+    switch (sqlite::value::Type(argv[0])) {
+      case sqlite::Type::kInteger:
+        return agg_ctx.Push(ctx, sqlite::value::Int64(argv[0]));
+      case sqlite::Type::kText:
+        return agg_ctx.Push<std::string>(ctx, sqlite::value::Text(argv[0]));
+      case sqlite::Type::kFloat:
+        return agg_ctx.Push(ctx, sqlite::value::Double(argv[0]));
+      case sqlite::Type::kNull:
+        return sqlite::result::Error(
+            ctx,
+            "ARRAY_AGG: nulls are not supported. They should be filtered out "
+            "before calling ARRAY_AGG.");
+      case sqlite::Type::kBlob:
+        return sqlite::result::Error(ctx,
+                                     "ARRAY_AGG: blobs are not supported.");
+    }
+  }
+  static void Final(sqlite3_context* ctx) {
+    auto raw_agg_ctx = AggCtx::GetContextOrNullForFinal(ctx);
+    if (!raw_agg_ctx) {
+      return sqlite::result::Null(ctx);
+    }
+
+    auto& array = *raw_agg_ctx.get()->array;
+    switch (array.index()) {
+      case 0 /* int64_t */:
+        return raw_agg_ctx.get()->Result<int64_t>(ctx, "ARRAY<LONG>");
+      case 1 /* double */:
+        return raw_agg_ctx.get()->Result<double>(ctx, "ARRAY<DOUBLE>");
+      case 2 /* std::string */:
+        return raw_agg_ctx.get()->Result<std::string>(ctx, "ARRAY<STRING>");
+      default:
+        PERFETTO_FATAL("%zu is not a valid index", array.index());
+    }
+  }
+};
+
+// An SQL aggregate function which creates a graph.
+struct NodeAgg : public SqliteAggregateFunction<NodeAgg> {
+  static constexpr char kName[] = "__intrinsic_graph_agg";
+  static constexpr int kArgCount = 2;
+  struct AggCtx : SqliteAggregateContext<AggCtx> {
+    perfetto_sql::Graph graph;
+  };
+
+  static void Step(sqlite3_context* ctx, int argc, sqlite3_value** argv) {
+    PERFETTO_DCHECK(argc == kArgCount);
+
+    auto source_id = static_cast<uint32_t>(sqlite::value::Int64(argv[0]));
+    auto target_id = static_cast<uint32_t>(sqlite::value::Int64(argv[1]));
+    uint32_t max_id = std::max(source_id, target_id);
+    auto& agg_ctx = AggCtx::GetOrCreateContextForStep(ctx);
+    if (max_id >= agg_ctx.graph.size()) {
+      agg_ctx.graph.resize(max_id + 1);
+    }
+    agg_ctx.graph[source_id].outgoing_edges.push_back(target_id);
+  }
+  static void Final(sqlite3_context* ctx) {
+    auto raw_agg_ctx = AggCtx::GetContextOrNullForFinal(ctx);
+    if (!raw_agg_ctx.get()) {
+      return;
+    }
+    auto nodes = std::make_unique<perfetto_sql::Graph>(
+        std::move(raw_agg_ctx.get()->graph));
+    return sqlite::result::UniquePointer(ctx, std::move(nodes), "GRAPH");
+  }
+};
+
+// An SQL scalar function which creates an struct.
+struct Struct : public SqliteFunction<Struct> {
+  static constexpr char kName[] = "__intrinsic_struct";
+  static constexpr int kArgCount = -1;
+
+  static void Step(sqlite3_context* ctx, int rargc, sqlite3_value** argv) {
+    auto argc = static_cast<uint32_t>(rargc);
+    if (argc % 2 != 0) {
+      return sqlite::result::Error(
+          ctx, "STRUCT: must have an even number of arguments");
+    }
+    if (argc / 2 > perfetto_sql::Struct::kMaxFields) {
+      return sqlite::utils::SetError(
+          ctx, base::ErrStatus("STRUCT: only at most %d fields are supported",
+                               perfetto_sql::Struct::kMaxFields));
+    }
+
+    auto s = std::make_unique<perfetto_sql::Struct>();
+    s->field_count = argc / 2;
+    for (uint32_t i = 0; i < s->field_count; ++i) {
+      if (sqlite::value::Type(argv[i]) != sqlite::Type::kText) {
+        return sqlite::result::Error(ctx,
+                                     "STRUCT: field names must be strings");
+      }
+      auto& field = s->fields[i];
+      field.first = sqlite::value::Text(argv[i]);
+      switch (sqlite::value::Type(argv[s->field_count + i])) {
+        case sqlite::Type::kText:
+          field.second = sqlite::value::Text(argv[s->field_count + i]);
+          break;
+        case sqlite::Type::kInteger:
+          field.second = sqlite::value::Int64(argv[s->field_count + i]);
+          break;
+        case sqlite::Type::kFloat:
+          field.second = sqlite::value::Double(argv[s->field_count + i]);
+          break;
+        case sqlite::Type::kNull:
+          field.second = std::monostate();
+          break;
+        case sqlite::Type::kBlob:
+          return sqlite::result::Error(ctx,
+                                       "STRUCT: blob fields not supported");
+      }
+    }
+    return sqlite::result::UniquePointer(ctx, std::move(s), "STRUCT");
+  }
+};
+
+// An SQL aggregate function which creates a RowDataframe.
+struct RowDataframeAgg : public SqliteAggregateFunction<Struct> {
+  static constexpr char kName[] = "__intrinsic_row_dataframe_agg";
+  static constexpr int kArgCount = -1;
+  struct AggCtx : SqliteAggregateContext<AggCtx> {
+    perfetto_sql::RowDataframe dataframe;
+    std::optional<uint32_t> argc_index;
+  };
+
+  static void Step(sqlite3_context* ctx, int rargc, sqlite3_value** argv) {
+    auto argc = static_cast<uint32_t>(rargc);
+    if (argc % 2 != 0) {
+      return sqlite::result::Error(
+          ctx, "ROW_DATAFRAME_AGG: must have an even number of arguments");
+    }
+
+    auto& agg_ctx = AggCtx::GetOrCreateContextForStep(ctx);
+    auto& df = agg_ctx.dataframe;
+    if (df.column_names.empty()) {
+      for (uint32_t i = 0; i < argc; i += 2) {
+        df.column_names.emplace_back(sqlite::value::Text(argv[i]));
+        if (df.column_names.back() == "id") {
+          df.id_column_index = i / 2;
+          agg_ctx.argc_index = i + 1;
+        }
+      }
+    }
+
+    if (agg_ctx.argc_index) {
+      auto id = static_cast<uint32_t>(
+          sqlite::value::Int64(argv[*agg_ctx.argc_index]));
+      if (id >= df.id_to_cell_index.size()) {
+        df.id_to_cell_index.resize(id + 1,
+                                   std::numeric_limits<uint32_t>::max());
+      }
+      df.id_to_cell_index[id] = static_cast<uint32_t>(df.cells.size());
+    }
+
+    for (uint32_t i = 1; i < argc; i += 2) {
+      switch (sqlite::value::Type(argv[i])) {
+        case sqlite::Type::kText:
+          df.cells.emplace_back(sqlite::value::Text(argv[i]));
+          break;
+        case sqlite::Type::kInteger:
+          df.cells.emplace_back(sqlite::value::Int64(argv[i]));
+          break;
+        case sqlite::Type::kFloat:
+          df.cells.emplace_back(sqlite::value::Double(argv[i]));
+          break;
+        case sqlite::Type::kNull:
+          df.cells.emplace_back(std::monostate());
+          break;
+        case sqlite::Type::kBlob:
+          return sqlite::result::Error(
+              ctx, "ROW_DATAFRAME_AGG: blob fields not supported");
+      }
+    }
+  }
+
+  static void Final(sqlite3_context* ctx) {
+    auto raw_agg_ctx = AggCtx::GetContextOrNullForFinal(ctx);
+    if (!raw_agg_ctx) {
+      return sqlite::result::Null(ctx);
+    }
+    return sqlite::result::UniquePointer(
+        ctx,
+        std::make_unique<perfetto_sql::RowDataframe>(
+            std::move(raw_agg_ctx.get()->dataframe)),
+        "ROW_DATAFRAME");
+  }
+};
+
+}  // namespace
+
+base::Status RegisterTypeBuilderFunctions(PerfettoSqlEngine& engine) {
+  RETURN_IF_ERROR(engine.RegisterSqliteAggregateFunction<ArrayAgg>(nullptr));
+  RETURN_IF_ERROR(engine.RegisterSqliteFunction<Struct>(nullptr));
+  RETURN_IF_ERROR(
+      engine.RegisterSqliteAggregateFunction<RowDataframeAgg>(nullptr));
+  return engine.RegisterSqliteAggregateFunction<NodeAgg>(nullptr);
+}
+
+}  // namespace perfetto::trace_processor
diff --git a/src/trace_processor/perfetto_sql/intrinsics/functions/struct.h b/src/trace_processor/perfetto_sql/intrinsics/functions/type_builders.h
similarity index 61%
rename from src/trace_processor/perfetto_sql/intrinsics/functions/struct.h
rename to src/trace_processor/perfetto_sql/intrinsics/functions/type_builders.h
index 0c5f49d..a357bc4 100644
--- a/src/trace_processor/perfetto_sql/intrinsics/functions/struct.h
+++ b/src/trace_processor/perfetto_sql/intrinsics/functions/type_builders.h
@@ -14,22 +14,28 @@
  * limitations under the License.
  */
 
-#ifndef SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_FUNCTIONS_STRUCT_H_
-#define SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_FUNCTIONS_STRUCT_H_
+#ifndef SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_FUNCTIONS_TYPE_BUILDERS_H_
+#define SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_FUNCTIONS_TYPE_BUILDERS_H_
 
 #include "perfetto/base/status.h"
 
 namespace perfetto::trace_processor {
 
 class PerfettoSqlEngine;
+class StringPool;
 
-// Registers the following struct related functions with SQLite:
+// Registers the following PerfettoSQL type related functions with SQLite:
+//  * __intrinsic_graph_agg: an aggregate function which builds a graph.
+//  * __intrinsic_array_agg: an aggregate function which allows building
+//    arrays from tables.
 //  * __intrinsic_struct: a scalar function which allows creating a
 //    struct from its component fields.
+//  * __intrinsic_row_dataframe_agg: an aggregate function which
+//    creates a data structure allowing efficient lookups of rows by id.
 // TODO(lalitm): once we have some stability here, expand the comments
 // here.
-base::Status RegisterStructFunctions(PerfettoSqlEngine& engine);
+base::Status RegisterTypeBuilderFunctions(PerfettoSqlEngine& engine);
 
 }  // namespace perfetto::trace_processor
 
-#endif  // SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_FUNCTIONS_STRUCT_H_
+#endif  // SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_FUNCTIONS_TYPE_BUILDERS_H_
diff --git a/src/trace_processor/perfetto_sql/intrinsics/types/BUILD.gn b/src/trace_processor/perfetto_sql/intrinsics/types/BUILD.gn
index c6f95f6..0993dfe 100644
--- a/src/trace_processor/perfetto_sql/intrinsics/types/BUILD.gn
+++ b/src/trace_processor/perfetto_sql/intrinsics/types/BUILD.gn
@@ -21,6 +21,7 @@
   sources = [
     "array.h",
     "node.h",
+    "row_dataframe.h",
     "struct.h",
     "value.h",
   ]
diff --git a/src/trace_processor/perfetto_sql/intrinsics/types/row_dataframe.h b/src/trace_processor/perfetto_sql/intrinsics/types/row_dataframe.h
new file mode 100644
index 0000000..43f546c
--- /dev/null
+++ b/src/trace_processor/perfetto_sql/intrinsics/types/row_dataframe.h
@@ -0,0 +1,58 @@
+/*
+ * Copyright (C) 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_TYPES_ROW_DATAFRAME_H_
+#define SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_TYPES_ROW_DATAFRAME_H_
+
+#include <algorithm>
+#include <cstdint>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "src/trace_processor/perfetto_sql/intrinsics/types/array.h"
+#include "src/trace_processor/perfetto_sql/intrinsics/types/value.h"
+
+namespace perfetto::trace_processor::perfetto_sql {
+
+// Data structure to allow easy exchange of "table-like" data between SQL and
+// C++ code. Allows fast lookup of rows by id (if an id column exists).
+struct RowDataframe {
+  perfetto_sql::StringArray column_names;
+  std::vector<uint32_t> id_to_cell_index;
+  // Cell = a value at a row + column index.
+  std::vector<perfetto_sql::Value> cells;
+  std::optional<uint32_t> id_column_index;
+
+  const perfetto_sql::Value* RowForId(uint32_t id) const {
+    return cells.data() + id_to_cell_index[id];
+  }
+
+  std::optional<uint32_t> FindColumnWithName(const std::string& name) {
+    auto it = std::find(column_names.begin(), column_names.end(), name);
+    return it == column_names.end() ? std::nullopt
+                                    : std::make_optional(static_cast<uint32_t>(
+                                          it - column_names.begin()));
+  }
+
+  uint32_t size() const {
+    return static_cast<uint32_t>(cells.size() / column_names.size());
+  }
+};
+
+}  // namespace perfetto::trace_processor::perfetto_sql
+
+#endif  // SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_TYPES_ROW_DATAFRAME_H_
diff --git a/src/trace_processor/trace_processor_impl.cc b/src/trace_processor/trace_processor_impl.cc
index 6aba335..b80b7b9 100644
--- a/src/trace_processor/trace_processor_impl.cc
+++ b/src/trace_processor/trace_processor_impl.cc
@@ -68,13 +68,11 @@
 #include "src/trace_processor/metrics/sql/amalgamated_sql_metrics.h"
 #include "src/trace_processor/perfetto_sql/engine/perfetto_sql_engine.h"
 #include "src/trace_processor/perfetto_sql/engine/table_pointer_module.h"
-#include "src/trace_processor/perfetto_sql/intrinsics/functions/array.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/functions/base64.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/functions/clock_functions.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/functions/create_function.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/functions/create_view_function.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/functions/dominator_tree.h"
-#include "src/trace_processor/perfetto_sql/intrinsics/functions/graph_helpers.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/functions/graph_traversal.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/functions/import.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/functions/layout_functions.h"
@@ -82,9 +80,9 @@
 #include "src/trace_processor/perfetto_sql/intrinsics/functions/pprof_functions.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/functions/sqlite3_str_split.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/functions/stack_functions.h"
-#include "src/trace_processor/perfetto_sql/intrinsics/functions/struct.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/functions/structural_tree_partition.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/functions/to_ftrace.h"
+#include "src/trace_processor/perfetto_sql/intrinsics/functions/type_builders.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/functions/utils.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/functions/window_functions.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/operators/counter_mipmap_operator.h"
@@ -751,12 +749,7 @@
       PERFETTO_FATAL("%s", status.c_message());
   }
   {
-    base::Status status = RegisterArrayFunctions(*engine_);
-    if (!status.ok())
-      PERFETTO_FATAL("%s", status.c_message());
-  }
-  {
-    base::Status status = RegisterStructFunctions(*engine_);
+    base::Status status = RegisterTypeBuilderFunctions(*engine_);
     if (!status.ok())
       PERFETTO_FATAL("%s", status.c_message());
   }
@@ -766,12 +759,6 @@
     if (!status.ok())
       PERFETTO_FATAL("%s", status.c_message());
   }
-  {
-    base::Status status = RegisterGraphHelperFunctions(
-        *engine_, *context_.storage->mutable_string_pool());
-    if (!status.ok())
-      PERFETTO_FATAL("%s", status.c_message());
-  }
 
   TraceStorage* storage = context_.storage.get();