tp: migrate window table to new module based api
Change-Id: I1c07ba8902fc08461b6e9e6a2656725f5a0f868a
diff --git a/src/trace_processor/perfetto_sql/intrinsics/operators/window_operator.cc b/src/trace_processor/perfetto_sql/intrinsics/operators/window_operator.cc
index d0712ff..eb31bb3 100644
--- a/src/trace_processor/perfetto_sql/intrinsics/operators/window_operator.cc
+++ b/src/trace_processor/perfetto_sql/intrinsics/operators/window_operator.cc
@@ -16,144 +16,204 @@
#include "src/trace_processor/perfetto_sql/intrinsics/operators/window_operator.h"
-#include "perfetto/base/status.h"
+#include <sqlite3.h>
+#include <cstdint>
+#include <memory>
+
+#include "perfetto/base/logging.h"
#include "src/trace_processor/sqlite/bindings/sqlite_result.h"
#include "src/trace_processor/sqlite/sqlite_utils.h"
-namespace perfetto {
-namespace trace_processor {
+namespace perfetto::trace_processor {
-WindowOperatorTable::WindowOperatorTable(sqlite3*, const TraceStorage*) {}
-WindowOperatorTable::~WindowOperatorTable() = default;
-
-base::Status WindowOperatorTable::Init(int,
- const char* const*,
- Schema* schema) {
- const bool kHidden = true;
- *schema = Schema(
- {
- // These are the operator columns:
- SqliteTableLegacy::Column(Column::kRowId, "rowid",
- SqlValue::Type::kLong, kHidden),
- SqliteTableLegacy::Column(Column::kQuantum, "quantum",
- SqlValue::Type::kLong, kHidden),
- SqliteTableLegacy::Column(Column::kWindowStart, "window_start",
- SqlValue::Type::kLong, kHidden),
- SqliteTableLegacy::Column(Column::kWindowDur, "window_dur",
- SqlValue::Type::kLong, kHidden),
- // These are the ouput columns:
- SqliteTableLegacy::Column(Column::kTs, "ts", SqlValue::Type::kLong),
- SqliteTableLegacy::Column(Column::kDuration, "dur",
- SqlValue::Type::kLong),
- SqliteTableLegacy::Column(Column::kQuantumTs, "quantum_ts",
- SqlValue::Type::kLong),
- },
- {Column::kRowId});
- return base::OkStatus();
+namespace {
+constexpr char kSchema[] = R"(
+ CREATE TABLE x(
+ rowid BIGINT HIDDEN,
+ quantum BIGINT HIDDEN,
+ window_start BIGINT HIDDEN,
+ window_dur BIGINT HIDDEN,
+ ts BIGINT,
+ dur BIGINT,
+ quantum_ts BIGINT,
+ PRIMARY KEY(rowid)
+ ) WITHOUT ROWID
+ )";
}
-std::unique_ptr<SqliteTableLegacy::BaseCursor>
-WindowOperatorTable::CreateCursor() {
- return std::unique_ptr<SqliteTableLegacy::BaseCursor>(new Cursor(this));
-}
+int WindowOperatorModule::Create(sqlite3* db,
+ void* raw_ctx,
+ int argc,
+ const char* const* argv,
+ sqlite3_vtab** vtab,
+ char**) {
+ PERFETTO_CHECK(argc == 3);
+ if (int ret = sqlite3_declare_vtab(db, kSchema); ret != SQLITE_OK) {
+ return ret;
+ }
+ auto* ctx = GetContext(raw_ctx);
+ auto it_and_inserted = ctx->state_by_name.Insert(argv[2], nullptr);
+ PERFETTO_CHECK(
+ it_and_inserted.second ||
+ (it_and_inserted.first && it_and_inserted.first->get()->disconnected));
+ *it_and_inserted.first = std::make_unique<State>();
-int WindowOperatorTable::BestIndex(const QueryConstraints&, BestIndexInfo*) {
+ std::unique_ptr<Vtab> res = std::make_unique<Vtab>();
+ res->context = ctx;
+ res->name = argv[2];
+ res->state = it_and_inserted.first->get();
+ *vtab = res.release();
return SQLITE_OK;
}
-base::Status WindowOperatorTable::ModifyConstraints(QueryConstraints* qc) {
- // Remove ordering on timestamp if it is the only ordering as we are already
- // sorted on TS. This makes span joining significantly faster.
- const auto& ob = qc->order_by();
- if (ob.size() == 1 && ob[0].iColumn == Column::kTs && !ob[0].desc) {
- qc->mutable_order_by()->clear();
- }
- return base::OkStatus();
+int WindowOperatorModule::Destroy(sqlite3_vtab* vtab) {
+ auto* tab = GetVtab(vtab);
+ PERFETTO_CHECK(tab->context->state_by_name.Erase(tab->name));
+ delete tab;
+ return SQLITE_OK;
}
-base::Status WindowOperatorTable::Update(int argc,
- sqlite3_value** argv,
- sqlite3_int64*) {
- // We only support updates to ts and dur. Disallow deletes (argc == 1) and
- // inserts (argv[0] == null).
- if (argc < 2 || sqlite3_value_type(argv[0]) == SQLITE_NULL) {
- return base::ErrStatus(
- "Invalid number/value of arguments when updating window table");
+int WindowOperatorModule::Connect(sqlite3* db,
+ void* raw_ctx,
+ int argc,
+ const char* const* argv,
+ sqlite3_vtab** vtab,
+ char**) {
+ PERFETTO_CHECK(argc == 3);
+ if (int ret = sqlite3_declare_vtab(db, kSchema); ret != SQLITE_OK) {
+ return ret;
}
+ auto* ctx = GetContext(raw_ctx);
+ auto* ptr = ctx->state_by_name.Find(argv[2]);
+ PERFETTO_CHECK(ptr);
+ ptr->get()->disconnected = false;
- int64_t new_quantum = sqlite3_value_int64(argv[3]);
- int64_t new_start = sqlite3_value_int64(argv[4]);
- int64_t new_dur = sqlite3_value_int64(argv[5]);
- if (new_dur == 0) {
- return base::ErrStatus("Cannot set duration of window table to zero.");
- }
-
- quantum_ = new_quantum;
- window_start_ = new_start;
- window_dur_ = new_dur;
-
- return base::OkStatus();
+ std::unique_ptr<Vtab> res = std::make_unique<Vtab>();
+ res->context = ctx;
+ res->name = argv[2];
+ res->state = ptr->get();
+ *vtab = res.release();
+ return SQLITE_OK;
}
-WindowOperatorTable::Cursor::Cursor(WindowOperatorTable* table)
- : SqliteTableLegacy::BaseCursor(table), table_(table) {}
-WindowOperatorTable::Cursor::~Cursor() = default;
+int WindowOperatorModule::Disconnect(sqlite3_vtab* vtab) {
+ auto* tab = GetVtab(vtab);
+ auto* ptr = tab->context->state_by_name.Find(tab->name);
+ PERFETTO_CHECK(ptr);
+ ptr->get()->disconnected = true;
+ delete tab;
+ return SQLITE_OK;
+}
-base::Status WindowOperatorTable::Cursor::Filter(const QueryConstraints& qc,
- sqlite3_value** argv,
- FilterHistory) {
- *this = Cursor(table_);
- window_start_ = table_->window_start_;
- window_end_ = table_->window_start_ + table_->window_dur_;
- step_size_ = table_->quantum_ == 0 ? table_->window_dur_ : table_->quantum_;
-
- current_ts_ = window_start_;
+int WindowOperatorModule::BestIndex(sqlite3_vtab*, sqlite3_index_info* info) {
+ info->orderByConsumed = info->nOrderBy == 1 &&
+ info->aOrderBy[0].iColumn == Column::kTs &&
+ !info->aOrderBy[0].desc;
// Set return first if there is a equals constraint on the row id asking to
// return the first row.
- bool return_first = qc.constraints().size() == 1 &&
- qc.constraints()[0].column == Column::kRowId &&
- sqlite::utils::IsOpEq(qc.constraints()[0].op) &&
- sqlite3_value_int(argv[0]) == 0;
- if (return_first) {
- filter_type_ = FilterType::kReturnFirst;
+ bool is_row_id_constraint = info->nConstraint == 1 &&
+ info->aConstraint[0].iColumn == Column::kRowId &&
+ info->aConstraint[0].usable &&
+ sqlite::utils::IsOpEq(info->aConstraint[0].op);
+ if (is_row_id_constraint) {
+ info->idxNum = 1;
+ info->aConstraintUsage[0].argvIndex = 1;
} else {
- filter_type_ = FilterType::kReturnAll;
+ info->idxNum = 0;
}
- return base::OkStatus();
+ return SQLITE_OK;
}
-base::Status WindowOperatorTable::Cursor::Column(sqlite3_context* context,
- int N) {
+int WindowOperatorModule::Open(sqlite3_vtab*, sqlite3_vtab_cursor** cursor) {
+ std::unique_ptr<Cursor> c = std::make_unique<Cursor>();
+ *cursor = c.release();
+ return SQLITE_OK;
+}
+
+int WindowOperatorModule::Close(sqlite3_vtab_cursor* cursor) {
+ delete GetCursor(cursor);
+ return SQLITE_OK;
+}
+
+int WindowOperatorModule::Filter(sqlite3_vtab_cursor* cursor,
+ int is_row_id_constraint,
+ const char*,
+ int argc,
+ sqlite3_value** argv) {
+ auto* t = GetVtab(cursor->pVtab);
+ auto* c = GetCursor(cursor);
+
+ c->window_start = t->state->window_start;
+ c->window_end = t->state->window_start + t->state->window_dur;
+ c->step_size =
+ t->state->quantum == 0 ? t->state->window_dur : t->state->quantum;
+
+ c->current_ts = c->window_start;
+
+ if (is_row_id_constraint) {
+ PERFETTO_CHECK(argc == 1);
+ c->filter_type = sqlite3_value_int(argv[0]) == 0 ? FilterType::kReturnFirst
+ : FilterType::kReturnAll;
+ } else {
+ c->filter_type = FilterType::kReturnAll;
+ }
+ return SQLITE_OK;
+}
+
+int WindowOperatorModule::Next(sqlite3_vtab_cursor* cursor) {
+ auto* c = GetCursor(cursor);
+ switch (c->filter_type) {
+ case FilterType::kReturnFirst:
+ c->current_ts = c->window_end;
+ break;
+ case FilterType::kReturnAll:
+ c->current_ts += c->step_size;
+ c->quantum_ts++;
+ break;
+ }
+ c->row_id++;
+ return SQLITE_OK;
+}
+
+int WindowOperatorModule::Eof(sqlite3_vtab_cursor* cursor) {
+ auto* c = GetCursor(cursor);
+ return c->current_ts >= c->window_end;
+}
+
+int WindowOperatorModule::Column(sqlite3_vtab_cursor* cursor,
+ sqlite3_context* ctx,
+ int N) {
+ auto* t = GetVtab(cursor->pVtab);
+ auto* c = GetCursor(cursor);
switch (N) {
case Column::kQuantum: {
- sqlite::result::Long(context,
- static_cast<sqlite_int64>(table_->quantum_));
+ sqlite::result::Long(ctx, static_cast<sqlite_int64>(t->state->quantum));
break;
}
case Column::kWindowStart: {
- sqlite::result::Long(context,
- static_cast<sqlite_int64>(table_->window_start_));
+ sqlite::result::Long(ctx,
+ static_cast<sqlite_int64>(t->state->window_start));
break;
}
case Column::kWindowDur: {
- sqlite::result::Long(context, static_cast<int>(table_->window_dur_));
+ sqlite::result::Long(ctx, static_cast<int>(t->state->window_dur));
break;
}
case Column::kTs: {
- sqlite::result::Long(context, static_cast<sqlite_int64>(current_ts_));
+ sqlite::result::Long(ctx, static_cast<sqlite_int64>(c->current_ts));
break;
}
case Column::kDuration: {
- sqlite::result::Long(context, static_cast<sqlite_int64>(step_size_));
+ sqlite::result::Long(ctx, static_cast<sqlite_int64>(c->step_size));
break;
}
case Column::kQuantumTs: {
- sqlite::result::Long(context, static_cast<sqlite_int64>(quantum_ts_));
+ sqlite::result::Long(ctx, static_cast<sqlite_int64>(c->quantum_ts));
break;
}
case Column::kRowId: {
- sqlite::result::Long(context, static_cast<sqlite_int64>(row_id_));
+ sqlite::result::Long(ctx, static_cast<sqlite_int64>(c->row_id));
break;
}
default: {
@@ -161,26 +221,39 @@
break;
}
}
- return base::OkStatus();
+ return SQLITE_OK;
}
-base::Status WindowOperatorTable::Cursor::Next() {
- switch (filter_type_) {
- case FilterType::kReturnFirst:
- current_ts_ = window_end_;
- break;
- case FilterType::kReturnAll:
- current_ts_ += step_size_;
- quantum_ts_++;
- break;
+int WindowOperatorModule::Rowid(sqlite3_vtab_cursor*, sqlite_int64*) {
+ return SQLITE_ERROR;
+}
+
+int WindowOperatorModule::Update(sqlite3_vtab* tab,
+ int argc,
+ sqlite3_value** argv,
+ sqlite_int64*) {
+ auto* t = GetVtab(tab);
+
+ // We only support updates to ts and dur. Disallow deletes (argc == 1) and
+ // inserts (argv[0] == null).
+ if (argc < 2 || sqlite3_value_type(argv[0]) == SQLITE_NULL) {
+ return sqlite::utils::SetError(
+ tab, "Invalid number/value of arguments when updating window table");
}
- row_id_++;
- return base::OkStatus();
+
+ int64_t new_quantum = sqlite3_value_int64(argv[3]);
+ int64_t new_start = sqlite3_value_int64(argv[4]);
+ int64_t new_dur = sqlite3_value_int64(argv[5]);
+ if (new_dur == 0) {
+ return sqlite::utils::SetError(
+ tab, "Cannot set duration of window table to zero.");
+ }
+
+ t->state->quantum = new_quantum;
+ t->state->window_start = new_start;
+ t->state->window_dur = new_dur;
+
+ return SQLITE_OK;
}
-bool WindowOperatorTable::Cursor::Eof() {
- return current_ts_ >= window_end_;
-}
-
-} // namespace trace_processor
-} // namespace perfetto
+} // namespace perfetto::trace_processor
diff --git a/src/trace_processor/perfetto_sql/intrinsics/operators/window_operator.h b/src/trace_processor/perfetto_sql/intrinsics/operators/window_operator.h
index f9c2414..d023042 100644
--- a/src/trace_processor/perfetto_sql/intrinsics/operators/window_operator.h
+++ b/src/trace_processor/perfetto_sql/intrinsics/operators/window_operator.h
@@ -17,20 +17,55 @@
#ifndef SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_OPERATORS_WINDOW_OPERATOR_H_
#define SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_OPERATORS_WINDOW_OPERATOR_H_
+#include <cstdint>
#include <limits>
#include <memory>
+#include <string>
-#include "perfetto/base/status.h"
-#include "src/trace_processor/sqlite/sqlite_table.h"
+#include "perfetto/ext/base/flat_hash_map.h"
+#include "src/trace_processor/sqlite/bindings/sqlite_module.h"
-namespace perfetto {
-namespace trace_processor {
+namespace perfetto::trace_processor {
class TraceStorage;
-class WindowOperatorTable final
- : public TypedSqliteTable<WindowOperatorTable, const TraceStorage*> {
- public:
+// Operator table which can emit spans of a configurable duration.
+struct WindowOperatorModule : sqlite::Module<WindowOperatorModule> {
+ // Defines the data to be generated by the table.
+ enum FilterType {
+ // Returns all the spans.
+ kReturnAll = 0,
+ // Only returns the first span of the table. Useful for UPDATE operations.
+ kReturnFirst = 1,
+ };
+ struct State {
+ bool disconnected = false;
+ int64_t quantum = 0;
+ int64_t window_start = 0;
+
+ // max of int64_t because SQLite technically only supports int64s and not
+ // uint64s.
+ int64_t window_dur = std::numeric_limits<int64_t>::max();
+ };
+ struct Context {
+ base::FlatHashMap<std::string, std::unique_ptr<State>> state_by_name;
+ };
+ struct Vtab : sqlite::Module<WindowOperatorModule>::Vtab {
+ Context* context;
+ std::string name;
+ State* state = nullptr;
+ };
+ struct Cursor : sqlite::Module<WindowOperatorModule>::Cursor {
+ int64_t window_start = 0;
+ int64_t window_end = 0;
+ int64_t step_size = 0;
+
+ int64_t current_ts = 0;
+ int64_t quantum_ts = 0;
+ int64_t row_id = 0;
+
+ FilterType filter_type = FilterType::kReturnAll;
+ };
enum Column {
kRowId = 0,
kQuantum = 1,
@@ -40,64 +75,44 @@
kDuration = 5,
kQuantumTs = 6
};
- class Cursor final : public SqliteTableLegacy::BaseCursor {
- public:
- explicit Cursor(WindowOperatorTable*);
- ~Cursor() final;
- Cursor(Cursor&&) = default;
- Cursor& operator=(Cursor&&) = default;
+ static constexpr auto kType = kCreateOnly;
+ static constexpr bool kDoesOverloadFunctions = false;
- // Implementation of SqliteTableLegacy::Cursor.
- base::Status Filter(const QueryConstraints& qc,
- sqlite3_value**,
- FilterHistory);
- base::Status Next();
- bool Eof();
- base::Status Column(sqlite3_context*, int N);
+ static int Create(sqlite3*,
+ void*,
+ int,
+ const char* const*,
+ sqlite3_vtab**,
+ char**);
+ static int Destroy(sqlite3_vtab*);
- private:
- // Defines the data to be generated by the table.
- enum FilterType {
- // Returns all the spans.
- kReturnAll = 0,
- // Only returns the first span of the table. Useful for UPDATE operations.
- kReturnFirst = 1,
- };
+ static int Connect(sqlite3*,
+ void*,
+ int,
+ const char* const*,
+ sqlite3_vtab**,
+ char**);
+ static int Disconnect(sqlite3_vtab*);
- int64_t window_start_ = 0;
- int64_t window_end_ = 0;
- int64_t step_size_ = 0;
+ static int BestIndex(sqlite3_vtab*, sqlite3_index_info*);
- int64_t current_ts_ = 0;
- int64_t quantum_ts_ = 0;
- int64_t row_id_ = 0;
+ static int Open(sqlite3_vtab*, sqlite3_vtab_cursor**);
+ static int Close(sqlite3_vtab_cursor*);
- FilterType filter_type_ = FilterType::kReturnAll;
+ static int Filter(sqlite3_vtab_cursor*,
+ int,
+ const char*,
+ int,
+ sqlite3_value**);
+ static int Next(sqlite3_vtab_cursor*);
+ static int Eof(sqlite3_vtab_cursor*);
+ static int Column(sqlite3_vtab_cursor*, sqlite3_context*, int);
+ static int Rowid(sqlite3_vtab_cursor*, sqlite_int64*);
- WindowOperatorTable* table_ = nullptr;
- };
-
- WindowOperatorTable(sqlite3*, const TraceStorage*);
- ~WindowOperatorTable() final;
-
- // Table implementation.
- base::Status Init(int, const char* const*, Schema* schema) final;
- std::unique_ptr<SqliteTableLegacy::BaseCursor> CreateCursor() final;
- int BestIndex(const QueryConstraints&, BestIndexInfo*) final;
- base::Status ModifyConstraints(QueryConstraints* qc) final;
- base::Status Update(int, sqlite3_value**, sqlite3_int64*) final;
-
- private:
- int64_t quantum_ = 0;
- int64_t window_start_ = 0;
-
- // max of int64_t because SQLite technically only supports int64s and not
- // uint64s.
- int64_t window_dur_ = std::numeric_limits<int64_t>::max();
+ static int Update(sqlite3_vtab*, int, sqlite3_value**, sqlite_int64*);
};
-} // namespace trace_processor
-} // namespace perfetto
+} // namespace perfetto::trace_processor
#endif // SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_OPERATORS_WINDOW_OPERATOR_H_
diff --git a/src/trace_processor/sqlite/sqlite_engine.h b/src/trace_processor/sqlite/sqlite_engine.h
index 8dfb9a0..55e991f 100644
--- a/src/trace_processor/sqlite/sqlite_engine.h
+++ b/src/trace_processor/sqlite/sqlite_engine.h
@@ -133,6 +133,11 @@
typename Module::Context* ctx);
// Registers a SQLite virtual table module with the given name.
+ template <typename Module>
+ void RegisterVirtualTableModule(const std::string& module_name,
+ std::unique_ptr<typename Module::Context>);
+
+ // Registers a SQLite virtual table module with the given name.
template <typename Vtab, typename Context>
void RegisterVirtualTableModule(const std::string& module_name,
Context ctx,
@@ -205,6 +210,18 @@
PERFETTO_CHECK(res == SQLITE_OK);
}
+template <typename Module>
+void SqliteEngine::RegisterVirtualTableModule(
+ const std::string& module_name,
+ std::unique_ptr<typename Module::Context> ctx) {
+ static_assert(std::is_base_of_v<sqlite::Module<Module>, Module>,
+ "Must subclass sqlite::Module");
+ int res = sqlite3_create_module_v2(
+ db_.get(), module_name.c_str(), &Module::kModule, ctx.release(),
+ [](void* arg) { delete static_cast<typename Module::Context*>(arg); });
+ PERFETTO_CHECK(res == SQLITE_OK);
+}
+
template <typename Vtab, typename Context>
void SqliteEngine::RegisterVirtualTableModule(
const std::string& module_name,
diff --git a/src/trace_processor/sqlite/sqlite_utils.h b/src/trace_processor/sqlite/sqlite_utils.h
index 458398d..f3196a8 100644
--- a/src/trace_processor/sqlite/sqlite_utils.h
+++ b/src/trace_processor/sqlite/sqlite_utils.h
@@ -137,6 +137,12 @@
}
}
+inline int SetError(sqlite3_vtab* tab, const char* status) {
+ sqlite3_free(tab->zErrMsg);
+ tab->zErrMsg = sqlite3_mprintf("%s", status);
+ return SQLITE_ERROR;
+}
+
inline void SetError(sqlite3_context* ctx, const base::Status& status) {
PERFETTO_CHECK(!status.ok());
sqlite::result::Error(ctx, status.c_message());
diff --git a/src/trace_processor/trace_processor_impl.cc b/src/trace_processor/trace_processor_impl.cc
index ddbee39..bfd34d0 100644
--- a/src/trace_processor/trace_processor_impl.cc
+++ b/src/trace_processor/trace_processor_impl.cc
@@ -742,8 +742,8 @@
engine_->sqlite_engine()->RegisterVirtualTableModule<SpanJoinOperatorTable>(
"span_outer_join", engine_.get(),
SqliteTableLegacy::TableType::kExplicitCreate, false);
- engine_->sqlite_engine()->RegisterVirtualTableModule<WindowOperatorTable>(
- "window", storage, SqliteTableLegacy::TableType::kExplicitCreate, true);
+ engine_->sqlite_engine()->RegisterVirtualTableModule<WindowOperatorModule>(
+ "window", std::make_unique<WindowOperatorModule::Context>());
// Initalize the tables and views in the prelude.
InitializePreludeTablesViews(db);