tp: migrate window table to new module based api

Change-Id: I1c07ba8902fc08461b6e9e6a2656725f5a0f868a
diff --git a/src/trace_processor/perfetto_sql/intrinsics/operators/window_operator.cc b/src/trace_processor/perfetto_sql/intrinsics/operators/window_operator.cc
index d0712ff..eb31bb3 100644
--- a/src/trace_processor/perfetto_sql/intrinsics/operators/window_operator.cc
+++ b/src/trace_processor/perfetto_sql/intrinsics/operators/window_operator.cc
@@ -16,144 +16,204 @@
 
 #include "src/trace_processor/perfetto_sql/intrinsics/operators/window_operator.h"
 
-#include "perfetto/base/status.h"
+#include <sqlite3.h>
+#include <cstdint>
+#include <memory>
+
+#include "perfetto/base/logging.h"
 #include "src/trace_processor/sqlite/bindings/sqlite_result.h"
 #include "src/trace_processor/sqlite/sqlite_utils.h"
 
-namespace perfetto {
-namespace trace_processor {
+namespace perfetto::trace_processor {
 
-WindowOperatorTable::WindowOperatorTable(sqlite3*, const TraceStorage*) {}
-WindowOperatorTable::~WindowOperatorTable() = default;
-
-base::Status WindowOperatorTable::Init(int,
-                                       const char* const*,
-                                       Schema* schema) {
-  const bool kHidden = true;
-  *schema = Schema(
-      {
-          // These are the operator columns:
-          SqliteTableLegacy::Column(Column::kRowId, "rowid",
-                                    SqlValue::Type::kLong, kHidden),
-          SqliteTableLegacy::Column(Column::kQuantum, "quantum",
-                                    SqlValue::Type::kLong, kHidden),
-          SqliteTableLegacy::Column(Column::kWindowStart, "window_start",
-                                    SqlValue::Type::kLong, kHidden),
-          SqliteTableLegacy::Column(Column::kWindowDur, "window_dur",
-                                    SqlValue::Type::kLong, kHidden),
-          // These are the ouput columns:
-          SqliteTableLegacy::Column(Column::kTs, "ts", SqlValue::Type::kLong),
-          SqliteTableLegacy::Column(Column::kDuration, "dur",
-                                    SqlValue::Type::kLong),
-          SqliteTableLegacy::Column(Column::kQuantumTs, "quantum_ts",
-                                    SqlValue::Type::kLong),
-      },
-      {Column::kRowId});
-  return base::OkStatus();
+namespace {
+constexpr char kSchema[] = R"(
+    CREATE TABLE x(
+      rowid BIGINT HIDDEN,
+      quantum BIGINT HIDDEN,
+      window_start BIGINT HIDDEN,
+      window_dur BIGINT HIDDEN,
+      ts BIGINT,
+      dur BIGINT,
+      quantum_ts BIGINT,
+      PRIMARY KEY(rowid)
+    ) WITHOUT ROWID
+  )";
 }
 
-std::unique_ptr<SqliteTableLegacy::BaseCursor>
-WindowOperatorTable::CreateCursor() {
-  return std::unique_ptr<SqliteTableLegacy::BaseCursor>(new Cursor(this));
-}
+int WindowOperatorModule::Create(sqlite3* db,
+                                 void* raw_ctx,
+                                 int argc,
+                                 const char* const* argv,
+                                 sqlite3_vtab** vtab,
+                                 char**) {
+  PERFETTO_CHECK(argc == 3);
+  if (int ret = sqlite3_declare_vtab(db, kSchema); ret != SQLITE_OK) {
+    return ret;
+  }
+  auto* ctx = GetContext(raw_ctx);
+  auto it_and_inserted = ctx->state_by_name.Insert(argv[2], nullptr);
+  PERFETTO_CHECK(
+      it_and_inserted.second ||
+      (it_and_inserted.first && it_and_inserted.first->get()->disconnected));
+  *it_and_inserted.first = std::make_unique<State>();
 
-int WindowOperatorTable::BestIndex(const QueryConstraints&, BestIndexInfo*) {
+  std::unique_ptr<Vtab> res = std::make_unique<Vtab>();
+  res->context = ctx;
+  res->name = argv[2];
+  res->state = it_and_inserted.first->get();
+  *vtab = res.release();
   return SQLITE_OK;
 }
 
-base::Status WindowOperatorTable::ModifyConstraints(QueryConstraints* qc) {
-  // Remove ordering on timestamp if it is the only ordering as we are already
-  // sorted on TS. This makes span joining significantly faster.
-  const auto& ob = qc->order_by();
-  if (ob.size() == 1 && ob[0].iColumn == Column::kTs && !ob[0].desc) {
-    qc->mutable_order_by()->clear();
-  }
-  return base::OkStatus();
+int WindowOperatorModule::Destroy(sqlite3_vtab* vtab) {
+  auto* tab = GetVtab(vtab);
+  PERFETTO_CHECK(tab->context->state_by_name.Erase(tab->name));
+  delete tab;
+  return SQLITE_OK;
 }
 
-base::Status WindowOperatorTable::Update(int argc,
-                                         sqlite3_value** argv,
-                                         sqlite3_int64*) {
-  // We only support updates to ts and dur. Disallow deletes (argc == 1) and
-  // inserts (argv[0] == null).
-  if (argc < 2 || sqlite3_value_type(argv[0]) == SQLITE_NULL) {
-    return base::ErrStatus(
-        "Invalid number/value of arguments when updating window table");
+int WindowOperatorModule::Connect(sqlite3* db,
+                                  void* raw_ctx,
+                                  int argc,
+                                  const char* const* argv,
+                                  sqlite3_vtab** vtab,
+                                  char**) {
+  PERFETTO_CHECK(argc == 3);
+  if (int ret = sqlite3_declare_vtab(db, kSchema); ret != SQLITE_OK) {
+    return ret;
   }
+  auto* ctx = GetContext(raw_ctx);
+  auto* ptr = ctx->state_by_name.Find(argv[2]);
+  PERFETTO_CHECK(ptr);
+  ptr->get()->disconnected = false;
 
-  int64_t new_quantum = sqlite3_value_int64(argv[3]);
-  int64_t new_start = sqlite3_value_int64(argv[4]);
-  int64_t new_dur = sqlite3_value_int64(argv[5]);
-  if (new_dur == 0) {
-    return base::ErrStatus("Cannot set duration of window table to zero.");
-  }
-
-  quantum_ = new_quantum;
-  window_start_ = new_start;
-  window_dur_ = new_dur;
-
-  return base::OkStatus();
+  std::unique_ptr<Vtab> res = std::make_unique<Vtab>();
+  res->context = ctx;
+  res->name = argv[2];
+  res->state = ptr->get();
+  *vtab = res.release();
+  return SQLITE_OK;
 }
 
-WindowOperatorTable::Cursor::Cursor(WindowOperatorTable* table)
-    : SqliteTableLegacy::BaseCursor(table), table_(table) {}
-WindowOperatorTable::Cursor::~Cursor() = default;
+int WindowOperatorModule::Disconnect(sqlite3_vtab* vtab) {
+  auto* tab = GetVtab(vtab);
+  auto* ptr = tab->context->state_by_name.Find(tab->name);
+  PERFETTO_CHECK(ptr);
+  ptr->get()->disconnected = true;
+  delete tab;
+  return SQLITE_OK;
+}
 
-base::Status WindowOperatorTable::Cursor::Filter(const QueryConstraints& qc,
-                                                 sqlite3_value** argv,
-                                                 FilterHistory) {
-  *this = Cursor(table_);
-  window_start_ = table_->window_start_;
-  window_end_ = table_->window_start_ + table_->window_dur_;
-  step_size_ = table_->quantum_ == 0 ? table_->window_dur_ : table_->quantum_;
-
-  current_ts_ = window_start_;
+int WindowOperatorModule::BestIndex(sqlite3_vtab*, sqlite3_index_info* info) {
+  info->orderByConsumed = info->nOrderBy == 1 &&
+                          info->aOrderBy[0].iColumn == Column::kTs &&
+                          !info->aOrderBy[0].desc;
 
   // Set return first if there is a equals constraint on the row id asking to
   // return the first row.
-  bool return_first = qc.constraints().size() == 1 &&
-                      qc.constraints()[0].column == Column::kRowId &&
-                      sqlite::utils::IsOpEq(qc.constraints()[0].op) &&
-                      sqlite3_value_int(argv[0]) == 0;
-  if (return_first) {
-    filter_type_ = FilterType::kReturnFirst;
+  bool is_row_id_constraint = info->nConstraint == 1 &&
+                              info->aConstraint[0].iColumn == Column::kRowId &&
+                              info->aConstraint[0].usable &&
+                              sqlite::utils::IsOpEq(info->aConstraint[0].op);
+  if (is_row_id_constraint) {
+    info->idxNum = 1;
+    info->aConstraintUsage[0].argvIndex = 1;
   } else {
-    filter_type_ = FilterType::kReturnAll;
+    info->idxNum = 0;
   }
-  return base::OkStatus();
+  return SQLITE_OK;
 }
 
-base::Status WindowOperatorTable::Cursor::Column(sqlite3_context* context,
-                                                 int N) {
+int WindowOperatorModule::Open(sqlite3_vtab*, sqlite3_vtab_cursor** cursor) {
+  std::unique_ptr<Cursor> c = std::make_unique<Cursor>();
+  *cursor = c.release();
+  return SQLITE_OK;
+}
+
+int WindowOperatorModule::Close(sqlite3_vtab_cursor* cursor) {
+  delete GetCursor(cursor);
+  return SQLITE_OK;
+}
+
+int WindowOperatorModule::Filter(sqlite3_vtab_cursor* cursor,
+                                 int is_row_id_constraint,
+                                 const char*,
+                                 int argc,
+                                 sqlite3_value** argv) {
+  auto* t = GetVtab(cursor->pVtab);
+  auto* c = GetCursor(cursor);
+
+  c->window_start = t->state->window_start;
+  c->window_end = t->state->window_start + t->state->window_dur;
+  c->step_size =
+      t->state->quantum == 0 ? t->state->window_dur : t->state->quantum;
+
+  c->current_ts = c->window_start;
+
+  if (is_row_id_constraint) {
+    PERFETTO_CHECK(argc == 1);
+    c->filter_type = sqlite3_value_int(argv[0]) == 0 ? FilterType::kReturnFirst
+                                                     : FilterType::kReturnAll;
+  } else {
+    c->filter_type = FilterType::kReturnAll;
+  }
+  return SQLITE_OK;
+}
+
+int WindowOperatorModule::Next(sqlite3_vtab_cursor* cursor) {
+  auto* c = GetCursor(cursor);
+  switch (c->filter_type) {
+    case FilterType::kReturnFirst:
+      c->current_ts = c->window_end;
+      break;
+    case FilterType::kReturnAll:
+      c->current_ts += c->step_size;
+      c->quantum_ts++;
+      break;
+  }
+  c->row_id++;
+  return SQLITE_OK;
+}
+
+int WindowOperatorModule::Eof(sqlite3_vtab_cursor* cursor) {
+  auto* c = GetCursor(cursor);
+  return c->current_ts >= c->window_end;
+}
+
+int WindowOperatorModule::Column(sqlite3_vtab_cursor* cursor,
+                                 sqlite3_context* ctx,
+                                 int N) {
+  auto* t = GetVtab(cursor->pVtab);
+  auto* c = GetCursor(cursor);
   switch (N) {
     case Column::kQuantum: {
-      sqlite::result::Long(context,
-                           static_cast<sqlite_int64>(table_->quantum_));
+      sqlite::result::Long(ctx, static_cast<sqlite_int64>(t->state->quantum));
       break;
     }
     case Column::kWindowStart: {
-      sqlite::result::Long(context,
-                           static_cast<sqlite_int64>(table_->window_start_));
+      sqlite::result::Long(ctx,
+                           static_cast<sqlite_int64>(t->state->window_start));
       break;
     }
     case Column::kWindowDur: {
-      sqlite::result::Long(context, static_cast<int>(table_->window_dur_));
+      sqlite::result::Long(ctx, static_cast<int>(t->state->window_dur));
       break;
     }
     case Column::kTs: {
-      sqlite::result::Long(context, static_cast<sqlite_int64>(current_ts_));
+      sqlite::result::Long(ctx, static_cast<sqlite_int64>(c->current_ts));
       break;
     }
     case Column::kDuration: {
-      sqlite::result::Long(context, static_cast<sqlite_int64>(step_size_));
+      sqlite::result::Long(ctx, static_cast<sqlite_int64>(c->step_size));
       break;
     }
     case Column::kQuantumTs: {
-      sqlite::result::Long(context, static_cast<sqlite_int64>(quantum_ts_));
+      sqlite::result::Long(ctx, static_cast<sqlite_int64>(c->quantum_ts));
       break;
     }
     case Column::kRowId: {
-      sqlite::result::Long(context, static_cast<sqlite_int64>(row_id_));
+      sqlite::result::Long(ctx, static_cast<sqlite_int64>(c->row_id));
       break;
     }
     default: {
@@ -161,26 +221,39 @@
       break;
     }
   }
-  return base::OkStatus();
+  return SQLITE_OK;
 }
 
-base::Status WindowOperatorTable::Cursor::Next() {
-  switch (filter_type_) {
-    case FilterType::kReturnFirst:
-      current_ts_ = window_end_;
-      break;
-    case FilterType::kReturnAll:
-      current_ts_ += step_size_;
-      quantum_ts_++;
-      break;
+int WindowOperatorModule::Rowid(sqlite3_vtab_cursor*, sqlite_int64*) {
+  return SQLITE_ERROR;
+}
+
+int WindowOperatorModule::Update(sqlite3_vtab* tab,
+                                 int argc,
+                                 sqlite3_value** argv,
+                                 sqlite_int64*) {
+  auto* t = GetVtab(tab);
+
+  // We only support updates to ts and dur. Disallow deletes (argc == 1) and
+  // inserts (argv[0] == null).
+  if (argc < 2 || sqlite3_value_type(argv[0]) == SQLITE_NULL) {
+    return sqlite::utils::SetError(
+        tab, "Invalid number/value of arguments when updating window table");
   }
-  row_id_++;
-  return base::OkStatus();
+
+  int64_t new_quantum = sqlite3_value_int64(argv[3]);
+  int64_t new_start = sqlite3_value_int64(argv[4]);
+  int64_t new_dur = sqlite3_value_int64(argv[5]);
+  if (new_dur == 0) {
+    return sqlite::utils::SetError(
+        tab, "Cannot set duration of window table to zero.");
+  }
+
+  t->state->quantum = new_quantum;
+  t->state->window_start = new_start;
+  t->state->window_dur = new_dur;
+
+  return SQLITE_OK;
 }
 
-bool WindowOperatorTable::Cursor::Eof() {
-  return current_ts_ >= window_end_;
-}
-
-}  // namespace trace_processor
-}  // namespace perfetto
+}  // namespace perfetto::trace_processor
diff --git a/src/trace_processor/perfetto_sql/intrinsics/operators/window_operator.h b/src/trace_processor/perfetto_sql/intrinsics/operators/window_operator.h
index f9c2414..d023042 100644
--- a/src/trace_processor/perfetto_sql/intrinsics/operators/window_operator.h
+++ b/src/trace_processor/perfetto_sql/intrinsics/operators/window_operator.h
@@ -17,20 +17,55 @@
 #ifndef SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_OPERATORS_WINDOW_OPERATOR_H_
 #define SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_OPERATORS_WINDOW_OPERATOR_H_
 
+#include <cstdint>
 #include <limits>
 #include <memory>
+#include <string>
 
-#include "perfetto/base/status.h"
-#include "src/trace_processor/sqlite/sqlite_table.h"
+#include "perfetto/ext/base/flat_hash_map.h"
+#include "src/trace_processor/sqlite/bindings/sqlite_module.h"
 
-namespace perfetto {
-namespace trace_processor {
+namespace perfetto::trace_processor {
 
 class TraceStorage;
 
-class WindowOperatorTable final
-    : public TypedSqliteTable<WindowOperatorTable, const TraceStorage*> {
- public:
+// Operator table which can emit spans of a configurable duration.
+struct WindowOperatorModule : sqlite::Module<WindowOperatorModule> {
+  // Defines the data to be generated by the table.
+  enum FilterType {
+    // Returns all the spans.
+    kReturnAll = 0,
+    // Only returns the first span of the table. Useful for UPDATE operations.
+    kReturnFirst = 1,
+  };
+  struct State {
+    bool disconnected = false;
+    int64_t quantum = 0;
+    int64_t window_start = 0;
+
+    // max of int64_t because SQLite technically only supports int64s and not
+    // uint64s.
+    int64_t window_dur = std::numeric_limits<int64_t>::max();
+  };
+  struct Context {
+    base::FlatHashMap<std::string, std::unique_ptr<State>> state_by_name;
+  };
+  struct Vtab : sqlite::Module<WindowOperatorModule>::Vtab {
+    Context* context;
+    std::string name;
+    State* state = nullptr;
+  };
+  struct Cursor : sqlite::Module<WindowOperatorModule>::Cursor {
+    int64_t window_start = 0;
+    int64_t window_end = 0;
+    int64_t step_size = 0;
+
+    int64_t current_ts = 0;
+    int64_t quantum_ts = 0;
+    int64_t row_id = 0;
+
+    FilterType filter_type = FilterType::kReturnAll;
+  };
   enum Column {
     kRowId = 0,
     kQuantum = 1,
@@ -40,64 +75,44 @@
     kDuration = 5,
     kQuantumTs = 6
   };
-  class Cursor final : public SqliteTableLegacy::BaseCursor {
-   public:
-    explicit Cursor(WindowOperatorTable*);
-    ~Cursor() final;
 
-    Cursor(Cursor&&) = default;
-    Cursor& operator=(Cursor&&) = default;
+  static constexpr auto kType = kCreateOnly;
+  static constexpr bool kDoesOverloadFunctions = false;
 
-    // Implementation of SqliteTableLegacy::Cursor.
-    base::Status Filter(const QueryConstraints& qc,
-                        sqlite3_value**,
-                        FilterHistory);
-    base::Status Next();
-    bool Eof();
-    base::Status Column(sqlite3_context*, int N);
+  static int Create(sqlite3*,
+                    void*,
+                    int,
+                    const char* const*,
+                    sqlite3_vtab**,
+                    char**);
+  static int Destroy(sqlite3_vtab*);
 
-   private:
-    // Defines the data to be generated by the table.
-    enum FilterType {
-      // Returns all the spans.
-      kReturnAll = 0,
-      // Only returns the first span of the table. Useful for UPDATE operations.
-      kReturnFirst = 1,
-    };
+  static int Connect(sqlite3*,
+                     void*,
+                     int,
+                     const char* const*,
+                     sqlite3_vtab**,
+                     char**);
+  static int Disconnect(sqlite3_vtab*);
 
-    int64_t window_start_ = 0;
-    int64_t window_end_ = 0;
-    int64_t step_size_ = 0;
+  static int BestIndex(sqlite3_vtab*, sqlite3_index_info*);
 
-    int64_t current_ts_ = 0;
-    int64_t quantum_ts_ = 0;
-    int64_t row_id_ = 0;
+  static int Open(sqlite3_vtab*, sqlite3_vtab_cursor**);
+  static int Close(sqlite3_vtab_cursor*);
 
-    FilterType filter_type_ = FilterType::kReturnAll;
+  static int Filter(sqlite3_vtab_cursor*,
+                    int,
+                    const char*,
+                    int,
+                    sqlite3_value**);
+  static int Next(sqlite3_vtab_cursor*);
+  static int Eof(sqlite3_vtab_cursor*);
+  static int Column(sqlite3_vtab_cursor*, sqlite3_context*, int);
+  static int Rowid(sqlite3_vtab_cursor*, sqlite_int64*);
 
-    WindowOperatorTable* table_ = nullptr;
-  };
-
-  WindowOperatorTable(sqlite3*, const TraceStorage*);
-  ~WindowOperatorTable() final;
-
-  // Table implementation.
-  base::Status Init(int, const char* const*, Schema* schema) final;
-  std::unique_ptr<SqliteTableLegacy::BaseCursor> CreateCursor() final;
-  int BestIndex(const QueryConstraints&, BestIndexInfo*) final;
-  base::Status ModifyConstraints(QueryConstraints* qc) final;
-  base::Status Update(int, sqlite3_value**, sqlite3_int64*) final;
-
- private:
-  int64_t quantum_ = 0;
-  int64_t window_start_ = 0;
-
-  // max of int64_t because SQLite technically only supports int64s and not
-  // uint64s.
-  int64_t window_dur_ = std::numeric_limits<int64_t>::max();
+  static int Update(sqlite3_vtab*, int, sqlite3_value**, sqlite_int64*);
 };
 
-}  // namespace trace_processor
-}  // namespace perfetto
+}  // namespace perfetto::trace_processor
 
 #endif  // SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_OPERATORS_WINDOW_OPERATOR_H_
diff --git a/src/trace_processor/sqlite/sqlite_engine.h b/src/trace_processor/sqlite/sqlite_engine.h
index 8dfb9a0..55e991f 100644
--- a/src/trace_processor/sqlite/sqlite_engine.h
+++ b/src/trace_processor/sqlite/sqlite_engine.h
@@ -133,6 +133,11 @@
                                   typename Module::Context* ctx);
 
   // Registers a SQLite virtual table module with the given name.
+  template <typename Module>
+  void RegisterVirtualTableModule(const std::string& module_name,
+                                  std::unique_ptr<typename Module::Context>);
+
+  // Registers a SQLite virtual table module with the given name.
   template <typename Vtab, typename Context>
   void RegisterVirtualTableModule(const std::string& module_name,
                                   Context ctx,
@@ -205,6 +210,18 @@
   PERFETTO_CHECK(res == SQLITE_OK);
 }
 
+template <typename Module>
+void SqliteEngine::RegisterVirtualTableModule(
+    const std::string& module_name,
+    std::unique_ptr<typename Module::Context> ctx) {
+  static_assert(std::is_base_of_v<sqlite::Module<Module>, Module>,
+                "Must subclass sqlite::Module");
+  int res = sqlite3_create_module_v2(
+      db_.get(), module_name.c_str(), &Module::kModule, ctx.release(),
+      [](void* arg) { delete static_cast<typename Module::Context*>(arg); });
+  PERFETTO_CHECK(res == SQLITE_OK);
+}
+
 template <typename Vtab, typename Context>
 void SqliteEngine::RegisterVirtualTableModule(
     const std::string& module_name,
diff --git a/src/trace_processor/sqlite/sqlite_utils.h b/src/trace_processor/sqlite/sqlite_utils.h
index 458398d..f3196a8 100644
--- a/src/trace_processor/sqlite/sqlite_utils.h
+++ b/src/trace_processor/sqlite/sqlite_utils.h
@@ -137,6 +137,12 @@
   }
 }
 
+inline int SetError(sqlite3_vtab* tab, const char* status) {
+  sqlite3_free(tab->zErrMsg);
+  tab->zErrMsg = sqlite3_mprintf("%s", status);
+  return SQLITE_ERROR;
+}
+
 inline void SetError(sqlite3_context* ctx, const base::Status& status) {
   PERFETTO_CHECK(!status.ok());
   sqlite::result::Error(ctx, status.c_message());
diff --git a/src/trace_processor/trace_processor_impl.cc b/src/trace_processor/trace_processor_impl.cc
index ddbee39..bfd34d0 100644
--- a/src/trace_processor/trace_processor_impl.cc
+++ b/src/trace_processor/trace_processor_impl.cc
@@ -742,8 +742,8 @@
   engine_->sqlite_engine()->RegisterVirtualTableModule<SpanJoinOperatorTable>(
       "span_outer_join", engine_.get(),
       SqliteTableLegacy::TableType::kExplicitCreate, false);
-  engine_->sqlite_engine()->RegisterVirtualTableModule<WindowOperatorTable>(
-      "window", storage, SqliteTableLegacy::TableType::kExplicitCreate, true);
+  engine_->sqlite_engine()->RegisterVirtualTableModule<WindowOperatorModule>(
+      "window", std::make_unique<WindowOperatorModule::Context>());
 
   // Initalize the tables and views in the prelude.
   InitializePreludeTablesViews(db);