Merge changes I729e7762,I9ac46f5f into main

* changes:
  tp: implement structs in trace processor
  tp: implement array_agg aggregate function
diff --git a/src/trace_processor/containers/row_map.cc b/src/trace_processor/containers/row_map.cc
index 3ae6ee7..5855b44 100644
--- a/src/trace_processor/containers/row_map.cc
+++ b/src/trace_processor/containers/row_map.cc
@@ -153,11 +153,14 @@
   return RowMap(row_map_algorithms::SelectIvWithIv(iv, selector));
 }
 
+// O(N), but 64 times faster than doing it bit by bit, as we compare words in
+// BitVectors.
 Variant IntersectInternal(BitVector& first, const BitVector& second) {
   first.And(second);
   return std::move(first);
 }
 
+// O(1) complexity.
 Variant IntersectInternal(Range first, Range second) {
   // If both RowMaps have ranges, we can just take the smallest intersection
   // of them as the new RowMap.
@@ -168,6 +171,8 @@
   return Range{start, end};
 }
 
+// O(N + k) complexity, where N is the size of |second| and k is the number of
+// elements that have to be removed from |first|.
 Variant IntersectInternal(std::vector<OutputIndex>& first,
                           const std::vector<OutputIndex>& second) {
   std::unordered_set<OutputIndex> lookup(second.begin(), second.end());
@@ -179,6 +184,7 @@
   return std::move(first);
 }
 
+// O(1) complexity.
 Variant IntersectInternal(Range range, const BitVector& bv) {
   return bv.IntersectRange(range.start, range.end);
 }
diff --git a/src/trace_processor/containers/row_map.h b/src/trace_processor/containers/row_map.h
index 88fe7a1..4443308 100644
--- a/src/trace_processor/containers/row_map.h
+++ b/src/trace_processor/containers/row_map.h
@@ -360,16 +360,8 @@
     return SelectRowsSlow(selector);
   }
 
-  // Intersects the range [start_index, end_index) with |this| writing the
-  // result into |this|. By "intersect", we mean to keep only the indices
-  // present in both this RowMap and in the Range [start_index, end_index). The
-  // order of the preserved indices will be the same as |this|.
-  //
-  // Conceptually, we are performing the following algorithm:
-  // for (idx : this)
-  //   if (start_index <= idx && idx < end_index)
-  //     continue;
-  //   Remove(idx)
+  // Intersects |this| with |second| independent of underlying structure of both
+  // RowMaps. Modifies |this| to only contain indices present in |second|.
   void Intersect(const RowMap& second);
 
   // Intersects this RowMap with |index|. If this RowMap contained |index|, then
diff --git a/src/trace_processor/perfetto_sql/engine/perfetto_sql_engine.cc b/src/trace_processor/perfetto_sql/engine/perfetto_sql_engine.cc
index 3048e68..81c7164 100644
--- a/src/trace_processor/perfetto_sql/engine/perfetto_sql_engine.cc
+++ b/src/trace_processor/perfetto_sql/engine/perfetto_sql_engine.cc
@@ -30,6 +30,7 @@
 #include <variant>
 #include <vector>
 
+#include "perfetto/base/compiler.h"
 #include "perfetto/base/logging.h"
 #include "perfetto/base/status.h"
 #include "perfetto/ext/base/flat_hash_map.h"
@@ -313,6 +314,12 @@
       auto sql = macro->sql;
       RETURN_IF_ERROR(ExecuteCreateMacro(*macro));
       source = RewriteToDummySql(sql);
+    } else if (auto* index = std::get_if<PerfettoSqlParser::CreateIndex>(
+                   &parser.statement())) {
+      // TODO(mayzner): Enable.
+      base::ignore_result(index);
+      return base::ErrStatus("CREATE PERFETTO INDEX not implemented");
+      // source = RewriteToDummySql(parser.statement_sql());
     } else {
       // If none of the above matched, this must just be an SQL statement
       // directly executable by SQLite.
diff --git a/src/trace_processor/perfetto_sql/engine/perfetto_sql_parser.cc b/src/trace_processor/perfetto_sql/engine/perfetto_sql_parser.cc
index a246c6a..20be21f 100644
--- a/src/trace_processor/perfetto_sql/engine/perfetto_sql_parser.cc
+++ b/src/trace_processor/perfetto_sql/engine/perfetto_sql_parser.cc
@@ -24,6 +24,7 @@
 #include <utility>
 #include <vector>
 
+#include "perfetto/base/compiler.h"
 #include "perfetto/base/logging.h"
 #include "perfetto/base/status.h"
 #include "perfetto/ext/base/flat_hash_map.h"
@@ -213,6 +214,9 @@
         if (TokenIsCustomKeyword("macro", token)) {
           return ParseCreatePerfettoMacro(replace);
         }
+        if (TokenIsSqliteKeyword("index", token)) {
+          return ParseCreatePerfettoIndex(replace, *first_non_space_token);
+        }
         base::StackString<1024> err(
             "Expected 'FUNCTION', 'TABLE' or 'MACRO' after 'CREATE PERFETTO', "
             "received '%*s'.",
@@ -298,6 +302,66 @@
   return true;
 }
 
+bool PerfettoSqlParser::ParseCreatePerfettoIndex(bool replace,
+                                                 Token first_non_space_token) {
+  base::ignore_result(replace, first_non_space_token);
+  Token index_name_tok = tokenizer_.NextNonWhitespace();
+  if (index_name_tok.token_type != SqliteTokenType::TK_ID) {
+    base::StackString<1024> err("Invalid index name %.*s",
+                                static_cast<int>(index_name_tok.str.size()),
+                                index_name_tok.str.data());
+    return ErrorAtToken(index_name_tok, err.c_str());
+  }
+  std::string index_name(index_name_tok.str);
+
+  auto token = tokenizer_.NextNonWhitespace();
+  if (!TokenIsSqliteKeyword("on", token)) {
+    base::StackString<1024> err("Expected 'ON' after index name, received %*s.",
+                                static_cast<int>(token.str.size()),
+                                token.str.data());
+    return ErrorAtToken(token, err.c_str());
+  }
+
+  Token table_name_tok = tokenizer_.NextNonWhitespace();
+  if (table_name_tok.token_type != SqliteTokenType::TK_ID) {
+    base::StackString<1024> err("Invalid table name %.*s",
+                                static_cast<int>(table_name_tok.str.size()),
+                                table_name_tok.str.data());
+    return ErrorAtToken(table_name_tok, err.c_str());
+  }
+  std::string table_name(table_name_tok.str);
+
+  token = tokenizer_.NextNonWhitespace();
+  if (token.token_type != SqliteTokenType::TK_LP) {
+    base::StackString<1024> err(
+        "Expected parenthesis after table name, received %*s.",
+        static_cast<int>(token.str.size()), token.str.data());
+    return ErrorAtToken(token, err.c_str());
+  }
+
+  Token col_name_tok = tokenizer_.NextNonWhitespace();
+  if (col_name_tok.token_type != SqliteTokenType::TK_ID) {
+    base::StackString<1024> err("Invalid column name %.*s",
+                                static_cast<int>(col_name_tok.str.size()),
+                                col_name_tok.str.data());
+    return ErrorAtToken(col_name_tok, err.c_str());
+  }
+  std::string col_name(col_name_tok.str);
+
+  token = tokenizer_.NextNonWhitespace();
+  if (token.token_type != SqliteTokenType::TK_RP) {
+    base::StackString<1024> err("Expected closed parenthesis, received %*s.",
+                                static_cast<int>(token.str.size()),
+                                token.str.data());
+    return ErrorAtToken(token, err.c_str());
+  }
+
+  Token terminal = tokenizer_.NextTerminal();
+  statement_sql_ = tokenizer_.Substr(first_non_space_token, terminal);
+  statement_ = CreateIndex{replace, index_name, table_name, col_name};
+  return true;
+}
+
 bool PerfettoSqlParser::ParseCreatePerfettoFunction(
     bool replace,
     Token first_non_space_token) {
diff --git a/src/trace_processor/perfetto_sql/engine/perfetto_sql_parser.h b/src/trace_processor/perfetto_sql/engine/perfetto_sql_parser.h
index a9864e7..3e9671a 100644
--- a/src/trace_processor/perfetto_sql/engine/perfetto_sql_parser.h
+++ b/src/trace_processor/perfetto_sql/engine/perfetto_sql_parser.h
@@ -77,6 +77,14 @@
     SqlSource create_view_sql;
     std::vector<sql_argument::ArgumentDefinition> schema;
   };
+  // Indicates that the specified SQL was a CREATE PERFETTO INDEX statement
+  // with the following parameters.
+  struct CreateIndex {
+    bool replace;
+    std::string name;
+    std::string table_name;
+    std::string col_name;
+  };
   // Indicates that the specified SQL was a INCLUDE PERFETTO MODULE statement
   // with the following parameter.
   struct Include {
@@ -95,6 +103,7 @@
                                  CreateFunction,
                                  CreateTable,
                                  CreateView,
+                                 CreateIndex,
                                  Include,
                                  CreateMacro>;
 
@@ -162,6 +171,9 @@
 
   bool ParseCreatePerfettoMacro(bool replace);
 
+  bool ParseCreatePerfettoIndex(bool replace,
+                                SqliteTokenizer::Token first_non_space_token);
+
   // Convert a "raw" argument (i.e. one that points to specific tokens) to the
   // argument definition consumed by the rest of the SQL code.
   // Guarantees to call ErrorAtToken if std::nullopt is returned.
diff --git a/src/trace_processor/perfetto_sql/engine/perfetto_sql_parser_unittest.cc b/src/trace_processor/perfetto_sql/engine/perfetto_sql_parser_unittest.cc
index e3653ca..bc26039 100644
--- a/src/trace_processor/perfetto_sql/engine/perfetto_sql_parser_unittest.cc
+++ b/src/trace_processor/perfetto_sql/engine/perfetto_sql_parser_unittest.cc
@@ -37,6 +37,7 @@
 using CreateView = PerfettoSqlParser::CreateView;
 using Include = PerfettoSqlParser::Include;
 using CreateMacro = PerfettoSqlParser::CreateMacro;
+using CreateIndex = PerfettoSqlParser::CreateIndex;
 
 namespace {
 
diff --git a/src/trace_processor/perfetto_sql/engine/perfetto_sql_test_utils.h b/src/trace_processor/perfetto_sql/engine/perfetto_sql_test_utils.h
index 5a46950..2bcc6c0 100644
--- a/src/trace_processor/perfetto_sql/engine/perfetto_sql_test_utils.h
+++ b/src/trace_processor/perfetto_sql/engine/perfetto_sql_test_utils.h
@@ -61,6 +61,12 @@
          std::tie(b.replace, b.name, b.sql, b.args);
 }
 
+constexpr bool operator==(const PerfettoSqlParser::CreateIndex& a,
+                          const PerfettoSqlParser::CreateIndex& b) {
+  return std::tie(a.replace, a.name, a.table_name, a.col_name) ==
+         std::tie(b.replace, b.name, b.table_name, b.col_name);
+}
+
 inline std::ostream& operator<<(std::ostream& stream, const SqlSource& sql) {
   return stream << "SqlSource(sql=" << testing::PrintToString(sql.sql()) << ")";
 }