tp: C++ implementation of counter_leading_intervals

Time for `SELECT COUNT() FROM counter_leading_intervals!(counter)` decreased from 2.3s to 180 ms on a 20 MB trace.

Change-Id: Iba5085ce1089a951b213c3d23d9fabd864daf580
diff --git a/Android.bp b/Android.bp
index b689919..d0c7a89 100644
--- a/Android.bp
+++ b/Android.bp
@@ -13249,6 +13249,7 @@
     name: "perfetto_src_trace_processor_perfetto_sql_intrinsics_functions_functions",
     srcs: [
         "src/trace_processor/perfetto_sql/intrinsics/functions/base64.cc",
+        "src/trace_processor/perfetto_sql/intrinsics/functions/counter_intervals.cc",
         "src/trace_processor/perfetto_sql/intrinsics/functions/create_function.cc",
         "src/trace_processor/perfetto_sql/intrinsics/functions/create_view_function.cc",
         "src/trace_processor/perfetto_sql/intrinsics/functions/dominator_tree.cc",
diff --git a/BUILD b/BUILD
index bee4ad4..905fe46 100644
--- a/BUILD
+++ b/BUILD
@@ -2438,6 +2438,8 @@
         "src/trace_processor/perfetto_sql/intrinsics/functions/base64.cc",
         "src/trace_processor/perfetto_sql/intrinsics/functions/base64.h",
         "src/trace_processor/perfetto_sql/intrinsics/functions/clock_functions.h",
+        "src/trace_processor/perfetto_sql/intrinsics/functions/counter_intervals.cc",
+        "src/trace_processor/perfetto_sql/intrinsics/functions/counter_intervals.h",
         "src/trace_processor/perfetto_sql/intrinsics/functions/create_function.cc",
         "src/trace_processor/perfetto_sql/intrinsics/functions/create_function.h",
         "src/trace_processor/perfetto_sql/intrinsics/functions/create_view_function.cc",
@@ -2567,6 +2569,7 @@
     name = "src_trace_processor_perfetto_sql_intrinsics_types_types",
     srcs = [
         "src/trace_processor/perfetto_sql/intrinsics/types/array.h",
+        "src/trace_processor/perfetto_sql/intrinsics/types/counter.h",
         "src/trace_processor/perfetto_sql/intrinsics/types/node.h",
         "src/trace_processor/perfetto_sql/intrinsics/types/partitioned_intervals.h",
         "src/trace_processor/perfetto_sql/intrinsics/types/row_dataframe.h",
diff --git a/src/trace_processor/db/column_storage.h b/src/trace_processor/db/column_storage.h
index 5974081..eae4bef 100644
--- a/src/trace_processor/db/column_storage.h
+++ b/src/trace_processor/db/column_storage.h
@@ -137,6 +137,10 @@
     data_.insert(data_.end(), count, val);
     valid_.Resize(valid_.size() + static_cast<uint32_t>(count), true);
   }
+  void Append(const std::vector<T>& vals) {
+    data_.insert(data_.end(), vals.begin(), vals.end());
+    valid_.Resize(valid_.size() + static_cast<uint32_t>(vals.size()), true);
+  }
   void Set(uint32_t idx, T val) {
     if (mode_ == Mode::kDense) {
       valid_.Set(idx);
diff --git a/src/trace_processor/db/runtime_table.cc b/src/trace_processor/db/runtime_table.cc
index 9019d00..4370499 100644
--- a/src/trace_processor/db/runtime_table.cc
+++ b/src/trace_processor/db/runtime_table.cc
@@ -274,27 +274,31 @@
 }
 
 base::Status RuntimeTable::Builder::AddIntegers(uint32_t idx,
-                                                int64_t res,
+                                                int64_t val,
                                                 uint32_t count) {
   auto* col = storage_[idx].get();
   if (auto* leading_nulls_ptr = std::get_if<uint32_t>(col)) {
     *col = Fill<NullIntStorage>(*leading_nulls_ptr, std::nullopt);
   }
   if (auto* doubles = std::get_if<NullDoubleStorage>(col)) {
-    if (!IsPerfectlyRepresentableAsDouble(res)) {
+    if (!IsPerfectlyRepresentableAsDouble(val)) {
       return base::ErrStatus("Column %s contains %" PRId64
                              " which cannot be represented as a double",
-                             col_names_[idx].c_str(), res);
+                             col_names_[idx].c_str(), val);
     }
-    doubles->AppendMultiple(static_cast<double>(res), count);
+    doubles->AppendMultiple(static_cast<double>(val), count);
     return base::OkStatus();
   }
-  auto* ints = std::get_if<NullIntStorage>(col);
+  if (auto* null_ints = std::get_if<NullIntStorage>(col)) {
+    null_ints->AppendMultiple(val, count);
+    return base::OkStatus();
+  }
+  auto* ints = std::get_if<IntStorage>(col);
   if (!ints) {
     return base::ErrStatus("Column %s does not have consistent types",
                            col_names_[idx].c_str());
   }
-  ints->AppendMultiple(res, count);
+  ints->AppendMultiple(val, count);
   return base::OkStatus();
 }
 
@@ -369,9 +373,28 @@
   std::get<IntStorage>(*storage_[idx]).Append(res);
 }
 
+void RuntimeTable::Builder::AddNullIntegersUnchecked(
+    uint32_t idx,
+    const std::vector<int64_t>& res) {
+  std::get<NullIntStorage>(*storage_[idx]).Append(res);
+}
+
+void RuntimeTable::Builder::AddNonNullDoublesUnchecked(
+    uint32_t idx,
+    const std::vector<double>& vals) {
+  std::get<DoubleStorage>(*storage_[idx]).Append(vals);
+}
+
+void RuntimeTable::Builder::AddNullDoublesUnchecked(
+    uint32_t idx,
+    const std::vector<double>& vals) {
+  std::get<NullDoubleStorage>(*storage_[idx]).Append(vals);
+}
+
 base::StatusOr<std::unique_ptr<RuntimeTable>> RuntimeTable::Builder::Build(
     uint32_t rows) && {
-  std::vector<RefPtr<column::StorageLayer>> storage_layers(col_names_.size() + 1);
+  std::vector<RefPtr<column::StorageLayer>> storage_layers(col_names_.size() +
+                                                           1);
   std::vector<RefPtr<column::OverlayLayer>> null_layers(col_names_.size() + 1);
 
   std::vector<ColumnLegacy> legacy_columns;
@@ -422,12 +445,24 @@
           i, col_names_[i].c_str(), std::get_if<IntStorage>(col),
           storage_layers, overlay_layers, legacy_columns, legacy_overlays);
 
-    } else if (auto* doubles = std::get_if<NullDoubleStorage>(col)) {
-      // The doubles column.
+    } else if (auto* doubles = std::get_if<DoubleStorage>(col)) {
+      // The `doubles` column for tables where column types was provided before.
       PERFETTO_CHECK(doubles->size() == rows);
-      if (doubles->non_null_size() == doubles->size()) {
+      bool is_sorted =
+          std::is_sorted(doubles->vector().begin(), doubles->vector().end());
+      uint32_t flags =
+          is_sorted ? ColumnLegacy::Flag::kNonNull | ColumnLegacy::Flag::kSorted
+                    : ColumnLegacy::Flag::kNonNull;
+      legacy_columns.emplace_back(col_names_[i].c_str(), doubles, flags, i, 0);
+      storage_layers[i].reset(new column::NumericStorage<double>(
+          &doubles->vector(), ColumnType::kDouble, is_sorted));
+
+    } else if (auto* null_doubles = std::get_if<NullDoubleStorage>(col)) {
+      // The doubles column.
+      PERFETTO_CHECK(null_doubles->size() == rows);
+      if (null_doubles->non_null_size() == null_doubles->size()) {
         // The column is not nullable.
-        *col = DoubleStorage::CreateFromAssertNonNull(std::move(*doubles));
+        *col = DoubleStorage::CreateFromAssertNonNull(std::move(*null_doubles));
 
         auto* non_null_doubles = std::get_if<DoubleStorage>(col);
         bool is_sorted = std::is_sorted(non_null_doubles->vector().begin(),
@@ -441,12 +476,12 @@
             &non_null_doubles->vector(), ColumnType::kDouble, is_sorted));
       } else {
         // The column is nullable.
-        legacy_columns.emplace_back(col_names_[i].c_str(), doubles,
+        legacy_columns.emplace_back(col_names_[i].c_str(), null_doubles,
                                     ColumnLegacy::Flag::kNoFlag, i, 0);
         storage_layers[i].reset(new column::NumericStorage<double>(
-            &doubles->non_null_vector(), ColumnType::kDouble, false));
+            &null_doubles->non_null_vector(), ColumnType::kDouble, false));
         null_layers[i].reset(
-            new column::NullOverlay(&doubles->non_null_bit_vector()));
+            new column::NullOverlay(&null_doubles->non_null_bit_vector()));
       }
 
     } else if (auto* strings = std::get_if<StringStorage>(col)) {
diff --git a/src/trace_processor/db/runtime_table.h b/src/trace_processor/db/runtime_table.h
index 3dd2a78..ed93250 100644
--- a/src/trace_processor/db/runtime_table.h
+++ b/src/trace_processor/db/runtime_table.h
@@ -80,8 +80,10 @@
     void AddNonNullIntegerUnchecked(uint32_t idx, int64_t res) {
       std::get<IntStorage>(*storage_[idx]).Append(res);
     }
-    void AddNonNullIntegersUnchecked(uint32_t idx,
-                                     const std::vector<int64_t>& res);
+    void AddNonNullIntegersUnchecked(uint32_t idx, const std::vector<int64_t>&);
+    void AddNullIntegersUnchecked(uint32_t idx, const std::vector<int64_t>&);
+    void AddNonNullDoublesUnchecked(uint32_t idx, const std::vector<double>&);
+    void AddNullDoublesUnchecked(uint32_t idx, const std::vector<double>&);
 
     base::StatusOr<std::unique_ptr<RuntimeTable>> Build(uint32_t rows) &&;
 
diff --git a/src/trace_processor/perfetto_sql/intrinsics/functions/BUILD.gn b/src/trace_processor/perfetto_sql/intrinsics/functions/BUILD.gn
index 863c2e6..d7ae797 100644
--- a/src/trace_processor/perfetto_sql/intrinsics/functions/BUILD.gn
+++ b/src/trace_processor/perfetto_sql/intrinsics/functions/BUILD.gn
@@ -22,6 +22,8 @@
     "base64.cc",
     "base64.h",
     "clock_functions.h",
+    "counter_intervals.cc",
+    "counter_intervals.h",
     "create_function.cc",
     "create_function.h",
     "create_view_function.cc",
diff --git a/src/trace_processor/perfetto_sql/intrinsics/functions/counter_intervals.cc b/src/trace_processor/perfetto_sql/intrinsics/functions/counter_intervals.cc
new file mode 100644
index 0000000..f9c8d48
--- /dev/null
+++ b/src/trace_processor/perfetto_sql/intrinsics/functions/counter_intervals.cc
@@ -0,0 +1,169 @@
+/*
+ * Copyright (C) 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/trace_processor/perfetto_sql/intrinsics/functions/counter_intervals.h"
+
+#include <algorithm>
+#include <cinttypes>
+#include <cstdint>
+#include <iterator>
+#include <memory>
+#include <numeric>
+#include <string>
+#include <string_view>
+#include <utility>
+#include <variant>
+#include <vector>
+
+#include "perfetto/base/compiler.h"
+#include "perfetto/base/logging.h"
+#include "perfetto/base/status.h"
+#include "perfetto/ext/base/flat_hash_map.h"
+#include "perfetto/ext/base/status_or.h"
+#include "perfetto/ext/base/string_utils.h"
+#include "perfetto/trace_processor/basic_types.h"
+#include "src/trace_processor/containers/string_pool.h"
+#include "src/trace_processor/db/runtime_table.h"
+#include "src/trace_processor/perfetto_sql/engine/function_util.h"
+#include "src/trace_processor/perfetto_sql/engine/perfetto_sql_engine.h"
+#include "src/trace_processor/perfetto_sql/intrinsics/types/counter.h"
+#include "src/trace_processor/perfetto_sql/intrinsics/types/partitioned_intervals.h"
+#include "src/trace_processor/sqlite/bindings/sqlite_bind.h"
+#include "src/trace_processor/sqlite/bindings/sqlite_column.h"
+#include "src/trace_processor/sqlite/bindings/sqlite_function.h"
+#include "src/trace_processor/sqlite/bindings/sqlite_result.h"
+#include "src/trace_processor/sqlite/bindings/sqlite_stmt.h"
+#include "src/trace_processor/sqlite/bindings/sqlite_type.h"
+#include "src/trace_processor/sqlite/bindings/sqlite_value.h"
+#include "src/trace_processor/sqlite/sqlite_utils.h"
+#include "src/trace_processor/util/status_macros.h"
+
+namespace perfetto::trace_processor::perfetto_sql {
+namespace {
+
+struct CounterIntervals : public SqliteFunction<CounterIntervals> {
+  static constexpr char kName[] = "__intrinsic_counter_intervals";
+  static constexpr int kArgCount = 3;
+
+  struct UserDataContext {
+    PerfettoSqlEngine* engine;
+    StringPool* pool;
+  };
+
+  static void Step(sqlite3_context* ctx, int argc, sqlite3_value** argv) {
+    PERFETTO_DCHECK(argc == kArgCount);
+    const char* leading_str = sqlite::value::Text(argv[0]);
+    if (!leading_str) {
+      return sqlite::result::Error(
+          ctx, "interval intersect: column list cannot be null");
+    }
+
+    // TODO(mayzner): Support 'lagging'.
+    if (base::CaseInsensitiveEqual("lagging", leading_str)) {
+      return sqlite::result::Error(
+          ctx, "interval intersect: 'lagging' is not implemented");
+    }
+    if (!base::CaseInsensitiveEqual("leading", leading_str)) {
+      return sqlite::result::Error(ctx,
+                                   "interval intersect: second argument has to "
+                                   "be either 'leading' or 'lagging");
+    }
+
+    int64_t trace_end = sqlite::value::Int64(argv[1]);
+
+    // Get column names of return columns.
+    std::vector<std::string> ret_col_names{
+        "id", "ts", "dur", "track_id", "value", "next_value", "delta_value"};
+    std::vector<RuntimeTable::BuilderColumnType> col_types{
+        RuntimeTable::kInt,         // id
+        RuntimeTable::kInt,         // ts,
+        RuntimeTable::kInt,         // dur
+        RuntimeTable::kInt,         // track_id
+        RuntimeTable::kDouble,      // value
+        RuntimeTable::kNullDouble,  // next_value
+        RuntimeTable::kNullDouble,  // delta_value
+    };
+
+    auto partitioned_counter = sqlite::value::Pointer<PartitionedCounter>(
+        argv[2], PartitionedCounter::kName);
+    if (!partitioned_counter) {
+      SQLITE_ASSIGN_OR_RETURN(
+          ctx, std::unique_ptr<RuntimeTable> ret_table,
+          RuntimeTable::Builder(GetUserData(ctx)->pool, ret_col_names)
+              .Build(0));
+      return sqlite::result::UniquePointer(ctx, std::move(ret_table), "TABLE");
+    }
+
+    RuntimeTable::Builder builder(GetUserData(ctx)->pool, ret_col_names,
+                                  col_types);
+
+    uint32_t rows_count = 0;
+    for (auto track_counter = partitioned_counter->partitions_map.GetIterator();
+         track_counter; ++track_counter) {
+      int64_t track_id = track_counter.key();
+      const auto& cols = track_counter.value();
+      size_t r_count = cols.id.size();
+      rows_count += r_count;
+
+      // Id
+      builder.AddNonNullIntegersUnchecked(0, cols.id);
+      // Ts
+      builder.AddNonNullIntegersUnchecked(1, cols.ts);
+
+      // Dur
+      std::vector<int64_t> dur(r_count);
+      for (size_t i = 0; i < r_count - 1; i++) {
+        dur[i] = cols.ts[i + 1] - cols.ts[i];
+      }
+      dur[r_count - 1] = trace_end - cols.ts.back();
+      builder.AddNonNullIntegersUnchecked(2, dur);
+
+      // Track id
+      builder.AddIntegers(3, track_id, static_cast<uint32_t>(r_count));
+      // Value
+      builder.AddNonNullDoublesUnchecked(4, cols.val);
+
+      // Next value
+      std::vector<double> next_vals(cols.val.begin() + 1, cols.val.end());
+      builder.AddNullDoublesUnchecked(5, next_vals);
+      builder.AddNull(5);
+
+      // Delta value
+      std::vector<double> deltas(r_count - 1);
+      for (size_t i = 0; i < r_count - 1; i++) {
+        deltas[i] = cols.val[i + 1] - cols.val[i];
+      }
+      builder.AddNull(6);
+      builder.AddNullDoublesUnchecked(6, deltas);
+    }
+
+    SQLITE_ASSIGN_OR_RETURN(ctx, std::unique_ptr<RuntimeTable> ret_tab,
+                            std::move(builder).Build(rows_count));
+
+    return sqlite::result::UniquePointer(ctx, std::move(ret_tab), "TABLE");
+  }
+};
+
+}  // namespace
+
+base::Status RegisterCounterIntervalsFunctions(PerfettoSqlEngine& engine,
+                                               StringPool* pool) {
+  return engine.RegisterSqliteFunction<CounterIntervals>(
+      std::make_unique<CounterIntervals::UserDataContext>(
+          CounterIntervals::UserDataContext{&engine, pool}));
+}
+
+}  // namespace perfetto::trace_processor::perfetto_sql
diff --git a/src/trace_processor/perfetto_sql/intrinsics/functions/counter_intervals.h b/src/trace_processor/perfetto_sql/intrinsics/functions/counter_intervals.h
new file mode 100644
index 0000000..dc0249a
--- /dev/null
+++ b/src/trace_processor/perfetto_sql/intrinsics/functions/counter_intervals.h
@@ -0,0 +1,32 @@
+/*
+ * Copyright (C) 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_FUNCTIONS_COUNTER_INTERVALS_H_
+#define SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_FUNCTIONS_COUNTER_INTERVALS_H_
+
+#include "perfetto/base/status.h"
+#include "src/trace_processor/containers/string_pool.h"
+#include "src/trace_processor/perfetto_sql/engine/perfetto_sql_engine.h"
+
+namespace perfetto::trace_processor::perfetto_sql {
+
+// Registers all interval intersect related functions with |engine|.
+base::Status RegisterCounterIntervalsFunctions(PerfettoSqlEngine& engine,
+                                               StringPool* pool);
+
+}  // namespace perfetto::trace_processor::perfetto_sql
+
+#endif  // SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_FUNCTIONS_COUNTER_INTERVALS_H_
diff --git a/src/trace_processor/perfetto_sql/intrinsics/functions/type_builders.cc b/src/trace_processor/perfetto_sql/intrinsics/functions/type_builders.cc
index 4ce84d5..a9d9287 100644
--- a/src/trace_processor/perfetto_sql/intrinsics/functions/type_builders.cc
+++ b/src/trace_processor/perfetto_sql/intrinsics/functions/type_builders.cc
@@ -19,9 +19,11 @@
 #include <algorithm>
 #include <cstddef>
 #include <cstdint>
+#include <functional>
 #include <limits>
 #include <memory>
 #include <optional>
+#include <queue>
 #include <string>
 #include <utility>
 #include <variant>
@@ -31,11 +33,13 @@
 #include "perfetto/base/status.h"
 #include "perfetto/ext/base/flat_hash_map.h"
 #include "perfetto/ext/base/hash.h"
+#include "perfetto/ext/base/small_vector.h"
 #include "perfetto/public/compiler.h"
 #include "perfetto/trace_processor/basic_types.h"
 #include "src/trace_processor/containers/interval_intersector.h"
 #include "src/trace_processor/perfetto_sql/engine/perfetto_sql_engine.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/types/array.h"
+#include "src/trace_processor/perfetto_sql/intrinsics/types/counter.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/types/node.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/types/partitioned_intervals.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/types/row_dataframe.h"
@@ -407,6 +411,55 @@
   }
 };
 
+struct CounterPerTrackAgg
+    : public SqliteAggregateFunction<perfetto_sql::PartitionedCounter> {
+  static constexpr char kName[] = "__intrinsic_counter_per_track_agg";
+  static constexpr int kArgCount = 4;
+  struct AggCtx : SqliteAggregateContext<AggCtx> {
+    perfetto_sql::PartitionedCounter tracks;
+  };
+
+  static void Step(sqlite3_context* ctx, int rargc, sqlite3_value** argv) {
+    auto argc = static_cast<uint32_t>(rargc);
+    PERFETTO_DCHECK(argc == kArgCount);
+    auto& tracks = AggCtx::GetOrCreateContextForStep(ctx).tracks;
+
+    // Fetch columns.
+    int64_t id = sqlite::value::Int64(argv[0]);
+    int64_t ts = sqlite::value::Int64(argv[1]);
+    int64_t track_id = static_cast<uint32_t>(sqlite::value::Int64(argv[2]));
+    double val = sqlite::value::Double(argv[3]);
+
+    auto* new_rows_track = tracks.partitions_map.Find(track_id);
+    if (!new_rows_track) {
+      new_rows_track = tracks.partitions_map.Insert(track_id, {}).first;
+    } else if (std::equal_to<double>()(new_rows_track->val.back(), val)) {
+      // TODO(mayzner): This algorithm is focused on "leading" counters - if the
+      // counter before had the same value we can safely remove the new one as
+      // it adds no value. In the future we should also support "lagging" - if
+      // the next one has the same value as the previous, we should remove the
+      // previous.
+      return;
+    }
+
+    new_rows_track->id.push_back(id);
+    new_rows_track->ts.push_back(ts);
+    new_rows_track->val.push_back(val);
+  }
+
+  static void Final(sqlite3_context* ctx) {
+    auto raw_agg_ctx = AggCtx::GetContextOrNullForFinal(ctx);
+    if (!raw_agg_ctx) {
+      return sqlite::result::Null(ctx);
+    }
+    return sqlite::result::UniquePointer(
+        ctx,
+        std::make_unique<perfetto_sql::PartitionedCounter>(
+            std::move(raw_agg_ctx.get()->tracks)),
+        perfetto_sql::PartitionedCounter::kName);
+  }
+};
+
 }  // namespace
 
 base::Status RegisterTypeBuilderFunctions(PerfettoSqlEngine& engine) {
@@ -417,6 +470,8 @@
   RETURN_IF_ERROR(
       engine.RegisterSqliteAggregateFunction<IntervalTreeIntervalsAgg>(
           nullptr));
+  RETURN_IF_ERROR(
+      engine.RegisterSqliteAggregateFunction<CounterPerTrackAgg>(nullptr));
   return engine.RegisterSqliteAggregateFunction<NodeAgg>(nullptr);
 }
 
diff --git a/src/trace_processor/perfetto_sql/intrinsics/types/BUILD.gn b/src/trace_processor/perfetto_sql/intrinsics/types/BUILD.gn
index 628f05f..26486e83 100644
--- a/src/trace_processor/perfetto_sql/intrinsics/types/BUILD.gn
+++ b/src/trace_processor/perfetto_sql/intrinsics/types/BUILD.gn
@@ -20,6 +20,7 @@
 source_set("types") {
   sources = [
     "array.h",
+    "counter.h",
     "node.h",
     "partitioned_intervals.h",
     "row_dataframe.h",
diff --git a/src/trace_processor/perfetto_sql/intrinsics/types/counter.h b/src/trace_processor/perfetto_sql/intrinsics/types/counter.h
new file mode 100644
index 0000000..863bbc3
--- /dev/null
+++ b/src/trace_processor/perfetto_sql/intrinsics/types/counter.h
@@ -0,0 +1,44 @@
+/*
+ * Copyright (C) 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_TYPES_COUNTER_H_
+#define SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_TYPES_COUNTER_H_
+
+#include <cstdint>
+#include <limits>
+#include <string>
+#include <vector>
+#include "perfetto/ext/base/flat_hash_map.h"
+#include "perfetto/trace_processor/basic_types.h"
+
+namespace perfetto::trace_processor::perfetto_sql {
+
+struct CounterTrackPartition {
+  std::vector<int64_t> id;
+  std::vector<int64_t> ts;
+  std::vector<double> val;
+};
+
+struct PartitionedCounter {
+  static constexpr char kName[] = "COUNTER_TRACK_PARTITIONS";
+  base::
+      FlatHashMap<int64_t, CounterTrackPartition, base::AlreadyHashed<int64_t>>
+          partitions_map;
+};
+
+}  // namespace perfetto::trace_processor::perfetto_sql
+
+#endif  // SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_TYPES_COUNTER_H_
diff --git a/src/trace_processor/perfetto_sql/stdlib/counters/intervals.sql b/src/trace_processor/perfetto_sql/stdlib/counters/intervals.sql
index cf9e5a3..7eac359 100644
--- a/src/trace_processor/perfetto_sql/stdlib/counters/intervals.sql
+++ b/src/trace_processor/perfetto_sql/stdlib/counters/intervals.sql
@@ -49,23 +49,32 @@
 -- value DOUBLE, next_value DOUBLE, delta_value DOUBLE).
 RETURNS TableOrSubquery AS
 (
-  WITH base AS (
-    SELECT
-      id,
-      ts,
-      track_id,
-      value,
-      LAG(value) OVER (PARTITION BY track_id ORDER BY ts) AS lag_value
-    FROM $counter_table
-  )
   SELECT
-    id,
-    ts,
-    LEAD(ts, 1, trace_end()) OVER(PARTITION BY track_id ORDER BY ts) - ts AS dur,
-    track_id,
-    value,
-    LEAD(value) OVER(PARTITION BY track_id ORDER BY ts) AS next_value,
-    value - lag_value AS delta_value
-  FROM base
-  WHERE value != lag_value OR lag_value IS NULL
+    c0 AS id,
+    c1 AS ts,
+    c2 AS dur,
+    c3 AS track_id,
+    c4 AS value,
+    c5 AS next_value,
+    c6 AS delta_value
+  FROM __intrinsic_table_ptr(
+    __intrinsic_counter_intervals(
+      "leading", TRACE_END(),
+      (SELECT __intrinsic_counter_per_track_agg(
+        input.id,
+        input.ts,
+        input.track_id,
+        input.value
+      )
+      FROM (SELECT * FROM $counter_table ORDER BY ts) input)
+    )
+  )
+
+  WHERE __intrinsic_table_ptr_bind(c0, 'id')
+    AND __intrinsic_table_ptr_bind(c1, 'ts')
+    AND __intrinsic_table_ptr_bind(c2, 'dur')
+    AND __intrinsic_table_ptr_bind(c3, 'track_id')
+    AND __intrinsic_table_ptr_bind(c4, 'value')
+    AND __intrinsic_table_ptr_bind(c5, 'next_value')
+    AND __intrinsic_table_ptr_bind(c6, 'delta_value')
 );
diff --git a/src/trace_processor/trace_processor_impl.cc b/src/trace_processor/trace_processor_impl.cc
index 114fb82..f41f1c8 100644
--- a/src/trace_processor/trace_processor_impl.cc
+++ b/src/trace_processor/trace_processor_impl.cc
@@ -71,6 +71,7 @@
 #include "src/trace_processor/perfetto_sql/engine/table_pointer_module.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/functions/base64.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/functions/clock_functions.h"
+#include "src/trace_processor/perfetto_sql/intrinsics/functions/counter_intervals.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/functions/create_function.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/functions/create_view_function.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/functions/dominator_tree.h"
@@ -815,6 +816,10 @@
     base::Status status = perfetto_sql::RegisterIntervalIntersectFunctions(
         *engine_, context_.storage->mutable_string_pool());
   }
+  {
+    base::Status status = perfetto_sql::RegisterCounterIntervalsFunctions(
+        *engine_, context_.storage->mutable_string_pool());
+  }
 
   TraceStorage* storage = context_.storage.get();
 
diff --git a/test/trace_processor/diff_tests/stdlib/counters/tests.py b/test/trace_processor/diff_tests/stdlib/counters/tests.py
index d3ade3e..1bf51b4 100644
--- a/test/trace_processor/diff_tests/stdlib/counters/tests.py
+++ b/test/trace_processor/diff_tests/stdlib/counters/tests.py
@@ -27,24 +27,20 @@
         query="""
         INCLUDE PERFETTO MODULE counters.intervals;
 
-        WITH
-          foo AS (
-            SELECT 0 AS id, 0 AS ts, 10 AS value, 1 AS track_id
-            UNION ALL
-            SELECT 1 AS id, 0 AS ts, 10 AS value, 2 AS track_id
-            UNION ALL
-            SELECT 2 AS id, 10 AS ts, 10 AS value, 1 AS track_id
-            UNION ALL
-            SELECT 3 AS id, 10 AS ts, 20 AS value, 2 AS track_id
-            UNION ALL
-            SELECT 4 AS id, 20 AS ts, 30 AS value, 1 AS track_id
+          WITH data(id, ts, value, track_id) AS (
+            VALUES
+            (0, 0, 10, 1),
+            (1, 0, 10, 2),
+            (2, 10, 10, 1),
+            (3, 10, 20, 2),
+            (4, 20, 30, 1)
           )
-        SELECT * FROM counter_leading_intervals !(foo);
+          SELECT * FROM counter_leading_intervals!(data);
         """,
         out=Csv("""
         "id","ts","dur","track_id","value","next_value","delta_value"
-        0,0,20,1,10,30,"[NULL]"
-        4,20,19980,1,30,"[NULL]",20
-        1,0,10,2,10,20,"[NULL]"
-        3,10,19990,2,20,"[NULL]",10
+        0,0,20,1,10.000000,30.000000,"[NULL]"
+        4,20,19980,1,30.000000,"[NULL]",20.000000
+        1,0,10,2,10.000000,20.000000,"[NULL]"
+        3,10,19990,2,20.000000,"[NULL]",10.000000
         """))