tp: migrate QueryExecutor to using the newly added DataLayer in Table

Instead of creating DataLayer/DataLayerChains every time, use the
ones available in the Table instead.

Change-Id: I1248286899ea566d812d0c4262c7f7a4773a9ca9
diff --git a/python/generators/trace_processor_table/serialize.py b/python/generators/trace_processor_table/serialize.py
index 2d871bc..e4a9d96 100644
--- a/python/generators/trace_processor_table/serialize.py
+++ b/python/generators/trace_processor_table/serialize.py
@@ -289,11 +289,13 @@
       return f'''{self.name}_storage_layer_(
           new column::NumericStorage<ColumnType::{self.name}::non_optional_stored_type>(
             &{self.name}_.non_null_vector(),
-            ColumnTypeHelper<ColumnType::{self.name}::stored_type>::ToColumnType()))'''
+            ColumnTypeHelper<ColumnType::{self.name}::stored_type>::ToColumnType(),
+            {str(ColumnFlag.SORTED in self.flags).lower()}))'''
     return f'''{self.name}_storage_layer_(
         new column::NumericStorage<ColumnType::{self.name}::non_optional_stored_type>(
           &{self.name}_.vector(),
-          ColumnTypeHelper<ColumnType::{self.name}::stored_type>::ToColumnType()))'''
+          ColumnTypeHelper<ColumnType::{self.name}::stored_type>::ToColumnType(),
+          {str(ColumnFlag.SORTED in self.flags).lower()}))'''
 
   def null_layer_init(self) -> str:
     if self.is_ancestor:
diff --git a/src/trace_processor/db/column/dense_null_overlay_unittest.cc b/src/trace_processor/db/column/dense_null_overlay_unittest.cc
index 9838a34..e838555 100644
--- a/src/trace_processor/db/column/dense_null_overlay_unittest.cc
+++ b/src/trace_processor/db/column/dense_null_overlay_unittest.cc
@@ -39,8 +39,8 @@
 
 TEST(DenseNullOverlay, NoFilteringSearch) {
   std::vector<uint32_t> data{0, 1, 0, 1, 0};
-  auto numeric =
-      std::make_unique<NumericStorage<uint32_t>>(&data, ColumnType::kUint32);
+  auto numeric = std::make_unique<NumericStorage<uint32_t>>(
+      &data, ColumnType::kUint32, false);
 
   BitVector bv{0, 1, 0, 1, 0};
   DenseNullOverlay storage(&bv);
@@ -52,8 +52,8 @@
 
 TEST(DenseNullOverlay, RestrictInputSearch) {
   std::vector<uint32_t> data{0, 1, 0, 1, 0};
-  auto numeric =
-      std::make_unique<NumericStorage<uint32_t>>(&data, ColumnType::kUint32);
+  auto numeric = std::make_unique<NumericStorage<uint32_t>>(
+      &data, ColumnType::kUint32, false);
 
   BitVector bv{0, 1, 0, 1, 0};
   DenseNullOverlay storage(&bv);
@@ -98,8 +98,8 @@
 
 TEST(DenseNullOverlay, IndexSearch) {
   std::vector<uint32_t> data{1, 0, 0, 1, 1, 1};
-  auto numeric =
-      std::make_unique<NumericStorage<uint32_t>>(&data, ColumnType::kUint32);
+  auto numeric = std::make_unique<NumericStorage<uint32_t>>(
+      &data, ColumnType::kUint32, false);
 
   BitVector bv{1, 0, 0, 1, 1, 1};
   DenseNullOverlay storage(&bv);
diff --git a/src/trace_processor/db/column/numeric_storage.h b/src/trace_processor/db/column/numeric_storage.h
index 63d20ee..676b440 100644
--- a/src/trace_processor/db/column/numeric_storage.h
+++ b/src/trace_processor/db/column/numeric_storage.h
@@ -89,9 +89,7 @@
 template <typename T>
 class NumericStorage final : public NumericStorageBase {
  public:
-  NumericStorage(const std::vector<T>* vec,
-                 ColumnType type,
-                 bool is_sorted = false)
+  NumericStorage(const std::vector<T>* vec, ColumnType type, bool is_sorted)
       : NumericStorageBase(type, is_sorted), vector_(vec) {}
 
   std::unique_ptr<DataLayerChain> MakeChain() override {
diff --git a/src/trace_processor/db/column/numeric_storage_unittest.cc b/src/trace_processor/db/column/numeric_storage_unittest.cc
index 256f57d..a4e664e 100644
--- a/src/trace_processor/db/column/numeric_storage_unittest.cc
+++ b/src/trace_processor/db/column/numeric_storage_unittest.cc
@@ -45,7 +45,7 @@
 TEST(NumericStorage, InvalidSearchConstraintsGeneralChecks) {
   std::vector<uint32_t> data_vec(128);
   std::iota(data_vec.begin(), data_vec.end(), 0);
-  NumericStorage<uint32_t> storage(&data_vec, ColumnType::kUint32);
+  NumericStorage<uint32_t> storage(&data_vec, ColumnType::kUint32, false);
   auto chain = storage.MakeChain();
 
   Range test_range(20, 100);
@@ -75,7 +75,7 @@
 TEST(NumericStorage, InvalidValueBoundsUint32) {
   std::vector<uint32_t> data_vec(128);
   std::iota(data_vec.begin(), data_vec.end(), 0);
-  NumericStorage<uint32_t> storage(&data_vec, ColumnType::kUint32);
+  NumericStorage<uint32_t> storage(&data_vec, ColumnType::kUint32, false);
   auto chain = storage.MakeChain();
 
   SqlValue max_val = SqlValue::Long(
@@ -114,7 +114,7 @@
 TEST(NumericStorage, InvalidValueBoundsInt32) {
   std::vector<int32_t> data_vec(128);
   std::iota(data_vec.begin(), data_vec.end(), 0);
-  NumericStorage<int32_t> storage(&data_vec, ColumnType::kInt32);
+  NumericStorage<int32_t> storage(&data_vec, ColumnType::kInt32, false);
   auto chain = storage.MakeChain();
 
   SqlValue max_val = SqlValue::Long(
@@ -154,7 +154,7 @@
   std::vector<uint32_t> data_vec{0, 1, 2, 0, 1, 2, 0, 1, 2};
   std::vector<uint32_t> out = {0, 1, 2, 3, 4, 5, 6, 7, 8};
 
-  NumericStorage<uint32_t> storage(&data_vec, ColumnType::kUint32);
+  NumericStorage<uint32_t> storage(&data_vec, ColumnType::kUint32, false);
   auto chain = storage.MakeChain();
   RowMap rm(0, 9);
   chain->StableSort(out.data(), 9);
@@ -167,7 +167,7 @@
   std::vector<uint32_t> data_vec{0, 1, 2, 0, 1, 2, 0, 1, 2};
   std::vector<uint32_t> out = {1, 7, 4, 0, 6, 3, 2, 5, 8};
 
-  NumericStorage<uint32_t> storage(&data_vec, ColumnType::kUint32);
+  NumericStorage<uint32_t> storage(&data_vec, ColumnType::kUint32, false);
   auto chain = storage.MakeChain();
   RowMap rm(0, 9);
   chain->StableSort(out.data(), 9);
@@ -178,7 +178,7 @@
 
 TEST(NumericStorage, Search) {
   std::vector<int32_t> data_vec{-5, 5, -4, 4, -3, 3, 0};
-  NumericStorage<int32_t> storage(&data_vec, ColumnType::kInt32);
+  NumericStorage<int32_t> storage(&data_vec, ColumnType::kInt32, false);
   auto chain = storage.MakeChain();
   Range test_range(1, 5);
   SqlValue val = SqlValue::Long(4);
@@ -204,7 +204,7 @@
 
 TEST(NumericStorage, SearchCompareWithNegative) {
   std::vector<int32_t> data_vec{-5, 5, -4, 4, -3, 3, 0};
-  NumericStorage<int32_t> storage(&data_vec, ColumnType::kInt32);
+  NumericStorage<int32_t> storage(&data_vec, ColumnType::kInt32, false);
   auto chain = storage.MakeChain();
   Range test_range(1, 5);
   SqlValue val = SqlValue::Long(-3);
@@ -230,7 +230,7 @@
 
 TEST(NumericStorage, IndexSearch) {
   std::vector<int32_t> data_vec{-5, 5, -4, 4, -3, 3, 0};
-  NumericStorage<int32_t> storage(&data_vec, ColumnType::kInt32);
+  NumericStorage<int32_t> storage(&data_vec, ColumnType::kInt32, false);
   auto chain = storage.MakeChain();
 
   // -5, -3, -3, 3, 5, 0
@@ -259,7 +259,7 @@
 
 TEST(NumericStorage, IndexSearchCompareWithNegative) {
   std::vector<int32_t> data_vec{-5, 5, -4, 4, -3, 3, 0};
-  NumericStorage<int32_t> storage(&data_vec, ColumnType::kInt32);
+  NumericStorage<int32_t> storage(&data_vec, ColumnType::kInt32, false);
   auto chain = storage.MakeChain();
 
   // -5, -3, -3, 3, 5, 0
@@ -289,7 +289,7 @@
 TEST(NumericStorage, SearchFast) {
   std::vector<uint32_t> data_vec(128);
   std::iota(data_vec.begin(), data_vec.end(), 0);
-  NumericStorage<uint32_t> storage(&data_vec, ColumnType::kUint32);
+  NumericStorage<uint32_t> storage(&data_vec, ColumnType::kUint32, false);
   auto chain = storage.MakeChain();
   RangeOrBitVector range_or_bv =
       chain->Search(FilterOp::kGe, SqlValue::Long(100), Range(0, 128));
@@ -341,7 +341,7 @@
   Indices sorted_order{sorted_order_vec.data(), 10,
                        Indices::State::kNonmonotonic};
 
-  NumericStorage<uint32_t> storage(&data_vec, ColumnType::kUint32);
+  NumericStorage<uint32_t> storage(&data_vec, ColumnType::kUint32, false);
   auto chain = storage.MakeChain();
 
   Range range = chain->OrderedIndexSearch(FilterOp::kEq, SqlValue::Long(60),
@@ -377,7 +377,7 @@
 
 TEST(NumericStorage, SearchWithIntAsDouble) {
   std::vector<int32_t> data_vec{-5, 5, -4, 4, -3, 3, 0};
-  NumericStorage<int32_t> storage(&data_vec, ColumnType::kInt32);
+  NumericStorage<int32_t> storage(&data_vec, ColumnType::kInt32, false);
   auto chain = storage.MakeChain();
   Range test_range(1, 5);
   SqlValue val = SqlValue::Double(4);
@@ -403,7 +403,7 @@
 
 TEST(NumericStorage, IndexSearchWithIntAsDouble) {
   std::vector<int32_t> data_vec{-5, 5, -4, 4, -3, 3, 0};
-  NumericStorage<int32_t> storage(&data_vec, ColumnType::kInt32);
+  NumericStorage<int32_t> storage(&data_vec, ColumnType::kInt32, false);
   auto chain = storage.MakeChain();
 
   // -5, -3, -3, 3, 5, 0
@@ -432,7 +432,7 @@
 
 TEST(NumericStorage, SearchInt32WithDouble) {
   std::vector<int32_t> data_vec{-5, 5, -4, 4, -3, 3, 0};
-  NumericStorage<int32_t> storage(&data_vec, ColumnType::kInt32);
+  NumericStorage<int32_t> storage(&data_vec, ColumnType::kInt32, false);
   auto chain = storage.MakeChain();
   Range test_range(1, 5);
   SqlValue val = SqlValue::Double(3.5);
@@ -458,7 +458,7 @@
 
 TEST(NumericStorage, SearchInt32WithNegDouble) {
   std::vector<int32_t> data_vec{-5, 5, -4, 4, -3, 3, 0};
-  NumericStorage<int32_t> storage(&data_vec, ColumnType::kInt32);
+  NumericStorage<int32_t> storage(&data_vec, ColumnType::kInt32, false);
   auto chain = storage.MakeChain();
   Range test_range(1, 5);
   SqlValue val = SqlValue::Double(-3.5);
@@ -484,7 +484,7 @@
 
 TEST(NumericStorage, IndexSearchInt32WithDouble) {
   std::vector<int32_t> data_vec{-5, 5, -4, 4, -3, 3, 0};
-  NumericStorage<int32_t> storage(&data_vec, ColumnType::kInt32);
+  NumericStorage<int32_t> storage(&data_vec, ColumnType::kInt32, false);
   auto chain = storage.MakeChain();
 
   // -5, -3, -3, 3, 5, 0
@@ -513,7 +513,7 @@
 
 TEST(NumericStorage, IndexSearchInt32WithNegDouble) {
   std::vector<int32_t> data_vec{-5, 5, -4, 4, -3, 3, 0};
-  NumericStorage<int32_t> storage(&data_vec, ColumnType::kInt32);
+  NumericStorage<int32_t> storage(&data_vec, ColumnType::kInt32, false);
   auto chain = storage.MakeChain();
 
   // -5, -3, -3, 3, 5, 0
@@ -542,7 +542,7 @@
 
 TEST(NumericStorage, SearchUint32WithNegDouble) {
   std::vector<uint32_t> data_vec{0, 1, 2, 3, 4, 5};
-  NumericStorage<uint32_t> storage(&data_vec, ColumnType::kInt32);
+  NumericStorage<uint32_t> storage(&data_vec, ColumnType::kInt32, false);
   auto chain = storage.MakeChain();
   Range test_range(1, 5);
   SqlValue val = SqlValue::Double(-3.5);
@@ -568,7 +568,7 @@
 
 TEST(NumericStorage, IndexSearchUint32WithNegDouble) {
   std::vector<uint32_t> data_vec{0, 1, 2, 3, 4, 5, 6};
-  NumericStorage<uint32_t> storage(&data_vec, ColumnType::kInt32);
+  NumericStorage<uint32_t> storage(&data_vec, ColumnType::kInt32, false);
   auto chain = storage.MakeChain();
 
   std::vector<uint32_t> indices_vec{0, 4, 4, 5, 1, 6};
@@ -607,7 +607,7 @@
   ASSERT_TRUE(compare::LongToDouble(not_rep_i, data_vec[0]) > 0);
   ASSERT_TRUE(compare::LongToDouble(not_rep_i, data_vec[1]) < 0);
 
-  NumericStorage<double> storage(&data_vec, ColumnType::kDouble);
+  NumericStorage<double> storage(&data_vec, ColumnType::kDouble, false);
   auto chain = storage.MakeChain();
   Range test_range(0, 2);
 
@@ -643,7 +643,7 @@
   ASSERT_TRUE(compare::LongToDouble(not_rep_i, data_vec[0]) < 0);
   ASSERT_TRUE(compare::LongToDouble(not_rep_i, data_vec[1]) > 0);
 
-  NumericStorage<double> storage(&data_vec, ColumnType::kDouble);
+  NumericStorage<double> storage(&data_vec, ColumnType::kDouble, false);
   auto chain = storage.MakeChain();
   Range test_range(0, 2);
 
diff --git a/src/trace_processor/db/query_executor.cc b/src/trace_processor/db/query_executor.cc
index 239c833..f97c8c5 100644
--- a/src/trace_processor/db/query_executor.cc
+++ b/src/trace_processor/db/query_executor.cc
@@ -16,29 +16,16 @@
 
 #include <algorithm>
 #include <cstdint>
-#include <memory>
 #include <optional>
 #include <utility>
 #include <vector>
 
 #include "perfetto/base/logging.h"
-#include "perfetto/ext/base/small_vector.h"
 #include "perfetto/trace_processor/basic_types.h"
 #include "src/trace_processor/containers/bit_vector.h"
 #include "src/trace_processor/containers/row_map.h"
-#include "src/trace_processor/containers/string_pool.h"
 #include "src/trace_processor/db/column.h"
-#include "src/trace_processor/db/column/arrangement_overlay.h"
 #include "src/trace_processor/db/column/data_layer.h"
-#include "src/trace_processor/db/column/dense_null_overlay.h"
-#include "src/trace_processor/db/column/dummy_storage.h"
-#include "src/trace_processor/db/column/id_storage.h"
-#include "src/trace_processor/db/column/null_overlay.h"
-#include "src/trace_processor/db/column/numeric_storage.h"
-#include "src/trace_processor/db/column/range_overlay.h"
-#include "src/trace_processor/db/column/selector_overlay.h"
-#include "src/trace_processor/db/column/set_id_storage.h"
-#include "src/trace_processor/db/column/string_storage.h"
 #include "src/trace_processor/db/column/types.h"
 #include "src/trace_processor/db/query_executor.h"
 #include "src/trace_processor/db/table.h"
@@ -155,13 +142,7 @@
                                    const std::vector<Constraint>& c_vec) {
   RowMap rm(0, table->row_count());
   for (const auto& c : c_vec) {
-    if (rm.empty()) {
-      return rm;
-    }
-
     const ColumnLegacy& col = table->columns()[c.col_idx];
-    uint32_t column_size =
-        col.IsId() ? col.overlay().row_map().Max() : col.storage_base().size();
 
     // RowMap size is 1.
     bool use_legacy = rm.size() == 1;
@@ -170,135 +151,7 @@
       col.FilterInto(c.op, c.value, &rm);
       continue;
     }
-
-    // Create storage
-    base::SmallVector<std::unique_ptr<column::DataLayer>, 4> data_layers;
-    std::unique_ptr<column::DataLayerChain> chain;
-    if (col.IsSetId()) {
-      if (col.IsNullable()) {
-        data_layers.emplace_back(std::make_unique<column::SetIdStorage>(
-            &col.storage<std::optional<uint32_t>>().non_null_vector()));
-        chain = data_layers.back()->MakeChain();
-      } else {
-        data_layers.emplace_back(std::make_unique<column::SetIdStorage>(
-            &col.storage<uint32_t>().vector()));
-        chain = data_layers.back()->MakeChain();
-      }
-    } else {
-      switch (col.col_type()) {
-        case ColumnType::kDummy:
-          data_layers.emplace_back(std::make_unique<column::DummyStorage>());
-          chain = data_layers.back()->MakeChain();
-          break;
-        case ColumnType::kId:
-          data_layers.emplace_back(std::make_unique<column::IdStorage>());
-          chain = data_layers.back()->MakeChain();
-          break;
-        case ColumnType::kString:
-          data_layers.emplace_back(std::make_unique<column::StringStorage>(
-              table->string_pool(), &col.storage<StringPool::Id>().vector(),
-              col.IsSorted()));
-          chain = data_layers.back()->MakeChain();
-          break;
-        case ColumnType::kInt64:
-          if (col.IsNullable()) {
-            data_layers.emplace_back(
-                std::make_unique<column::NumericStorage<int64_t>>(
-                    &col.storage<std::optional<int64_t>>().non_null_vector(),
-                    col.col_type(), col.IsSorted()));
-            chain = data_layers.back()->MakeChain();
-
-          } else {
-            data_layers.emplace_back(
-                std::make_unique<column::NumericStorage<int64_t>>(
-                    &col.storage<int64_t>().vector(), col.col_type(),
-                    col.IsSorted()));
-            chain = data_layers.back()->MakeChain();
-          }
-          break;
-        case ColumnType::kUint32:
-          if (col.IsNullable()) {
-            data_layers.emplace_back(
-                std::make_unique<column::NumericStorage<uint32_t>>(
-                    &col.storage<std::optional<uint32_t>>().non_null_vector(),
-                    col.col_type(), col.IsSorted()));
-            chain = data_layers.back()->MakeChain();
-          } else {
-            data_layers.emplace_back(
-                std::make_unique<column::NumericStorage<uint32_t>>(
-                    &col.storage<uint32_t>().vector(), col.col_type(),
-                    col.IsSorted()));
-            chain = data_layers.back()->MakeChain();
-          }
-          break;
-        case ColumnType::kInt32:
-          if (col.IsNullable()) {
-            data_layers.emplace_back(
-                std::make_unique<column::NumericStorage<int32_t>>(
-                    &col.storage<std::optional<int32_t>>().non_null_vector(),
-                    col.col_type(), col.IsSorted()));
-            chain = data_layers.back()->MakeChain();
-          } else {
-            data_layers.emplace_back(
-                std::make_unique<column::NumericStorage<int32_t>>(
-                    &col.storage<int32_t>().vector(), col.col_type(),
-                    col.IsSorted()));
-            chain = data_layers.back()->MakeChain();
-          }
-          break;
-        case ColumnType::kDouble:
-          if (col.IsNullable()) {
-            data_layers.emplace_back(
-                std::make_unique<column::NumericStorage<double>>(
-                    &col.storage<std::optional<double>>().non_null_vector(),
-                    col.col_type(), col.IsSorted()));
-            chain = data_layers.back()->MakeChain();
-          } else {
-            data_layers.emplace_back(
-                std::make_unique<column::NumericStorage<double>>(
-                    &col.storage<double>().vector(), col.col_type(),
-                    col.IsSorted()));
-            chain = data_layers.back()->MakeChain();
-          }
-      }
-    }
-
-    if (col.IsNullable()) {
-      // String columns are inherently nullable: null values are signified
-      // with Id::Null().
-      PERFETTO_CHECK(col.col_type() != ColumnType::kString);
-      if (col.IsDense()) {
-        data_layers.emplace_back(std::make_unique<column::DenseNullOverlay>(
-            col.storage_base().bv()));
-        chain = data_layers.back()->MakeChain(std::move(chain));
-      } else {
-        data_layers.emplace_back(
-            std::make_unique<column::NullOverlay>(col.storage_base().bv()));
-        chain = data_layers.back()->MakeChain(std::move(chain));
-      }
-    }
-    if (col.overlay().row_map().IsIndexVector()) {
-      data_layers.emplace_back(std::make_unique<column::ArrangementOverlay>(
-          col.overlay().row_map().GetIfIndexVector(),
-          Indices::State::kNonmonotonic));
-      chain = data_layers.back()->MakeChain(
-          std::move(chain),
-          column::DataLayer::ChainCreationArgs(col.IsSorted()));
-    }
-    if (col.overlay().row_map().IsBitVector()) {
-      data_layers.emplace_back(std::make_unique<column::SelectorOverlay>(
-          col.overlay().row_map().GetIfBitVector()));
-      chain = data_layers.back()->MakeChain(std::move(chain));
-    }
-    if (col.overlay().row_map().IsRange() &&
-        col.overlay().size() != column_size) {
-      data_layers.emplace_back(std::make_unique<column::RangeOverlay>(
-          col.overlay().row_map().GetIfIRange()));
-      chain = data_layers.back()->MakeChain(std::move(chain));
-    }
-    uint32_t pre_count = rm.size();
-    FilterColumn(c, *chain, &rm);
-    PERFETTO_DCHECK(rm.size() <= pre_count);
+    FilterColumn(c, table->ChainForColumn(c.col_idx), &rm);
   }
   return rm;
 }
diff --git a/src/trace_processor/db/query_executor_unittest.cc b/src/trace_processor/db/query_executor_unittest.cc
index 53196da..a1a5ab3 100644
--- a/src/trace_processor/db/query_executor_unittest.cc
+++ b/src/trace_processor/db/query_executor_unittest.cc
@@ -54,7 +54,8 @@
 
 TEST(QueryExecutor, OnlyStorageRange) {
   std::vector<int64_t> storage_data{1, 2, 3, 4, 5};
-  column::NumericStorage<int64_t> storage(&storage_data, ColumnType::kInt64);
+  column::NumericStorage<int64_t> storage(&storage_data, ColumnType::kInt64,
+                                          false);
   auto chain = storage.MakeChain();
 
   Constraint c{0, FilterOp::kGe, SqlValue::Long(3)};
@@ -67,7 +68,8 @@
 
 TEST(QueryExecutor, OnlyStorageRangeIsNull) {
   std::vector<int64_t> storage_data{1, 2, 3, 4, 5};
-  column::NumericStorage<int64_t> storage(&storage_data, ColumnType::kInt64);
+  column::NumericStorage<int64_t> storage(&storage_data, ColumnType::kInt64,
+                                          false);
   auto chain = storage.MakeChain();
 
   Constraint c{0, FilterOp::kIsNull, SqlValue()};
@@ -83,7 +85,8 @@
   std::iota(storage_data.begin(), storage_data.end(), 0);
   std::transform(storage_data.begin(), storage_data.end(), storage_data.begin(),
                  [](int64_t n) { return n % 5; });
-  column::NumericStorage<int64_t> storage(&storage_data, ColumnType::kInt64);
+  column::NumericStorage<int64_t> storage(&storage_data, ColumnType::kInt64,
+                                          false);
   auto chain = storage.MakeChain();
 
   Constraint c{0, FilterOp::kLt, SqlValue::Long(2)};
@@ -99,7 +102,8 @@
 
 TEST(QueryExecutor, OnlyStorageIndexIsNull) {
   std::vector<int64_t> storage_data{1, 2, 3, 4, 5};
-  column::NumericStorage<int64_t> storage(&storage_data, ColumnType::kInt64);
+  column::NumericStorage<int64_t> storage(&storage_data, ColumnType::kInt64,
+                                          false);
   auto chain = storage.MakeChain();
 
   Constraint c{0, FilterOp::kIsNull, SqlValue()};
@@ -113,7 +117,7 @@
   std::vector<int64_t> storage_data(5);
   std::iota(storage_data.begin(), storage_data.end(), 0);
   auto numeric = std::make_unique<column::NumericStorage<int64_t>>(
-      &storage_data, ColumnType::kInt64);
+      &storage_data, ColumnType::kInt64, false);
   BitVector bv{1, 1, 0, 1, 1, 0, 0, 0, 1, 0};
   column::NullOverlay storage(&bv);
   auto chain = storage.MakeChain(numeric->MakeChain());
@@ -131,7 +135,7 @@
   std::vector<int64_t> storage_data(5);
   std::iota(storage_data.begin(), storage_data.end(), 0);
   auto numeric = std::make_unique<column::NumericStorage<int64_t>>(
-      &storage_data, ColumnType::kInt64);
+      &storage_data, ColumnType::kInt64, false);
 
   BitVector bv{1, 1, 0, 1, 1, 0, 0, 0, 1, 0};
   column::NullOverlay storage(&bv);
@@ -155,7 +159,7 @@
   std::transform(storage_data.begin(), storage_data.end(), storage_data.begin(),
                  [](int64_t n) { return n % 3; });
   auto numeric = std::make_unique<column::NumericStorage<int64_t>>(
-      &storage_data, ColumnType::kInt64);
+      &storage_data, ColumnType::kInt64, false);
 
   BitVector bv{1, 1, 0, 1, 1, 0, 1, 0, 0, 1};
   column::NullOverlay storage(&bv);
@@ -176,7 +180,7 @@
   std::vector<int64_t> storage_data(5);
   std::iota(storage_data.begin(), storage_data.end(), 0);
   auto numeric = std::make_unique<column::NumericStorage<int64_t>>(
-      &storage_data, ColumnType::kInt64);
+      &storage_data, ColumnType::kInt64, false);
 
   BitVector bv{1, 1, 0, 1, 1, 0, 0, 0, 1, 0};
   column::NullOverlay storage(&bv);
@@ -198,7 +202,7 @@
   std::vector<int64_t> storage_data(5);
   std::iota(storage_data.begin(), storage_data.end(), 0);
   auto numeric = std::make_unique<column::NumericStorage<int64_t>>(
-      &storage_data, ColumnType::kInt64);
+      &storage_data, ColumnType::kInt64, false);
 
   BitVector bv{1, 1, 0, 0, 1};
   SelectorOverlay storage(&bv);
@@ -217,7 +221,7 @@
   std::transform(storage_data.begin(), storage_data.end(), storage_data.begin(),
                  [](int64_t n) { return n % 5; });
   auto numeric = std::make_unique<column::NumericStorage<int64_t>>(
-      &storage_data, ColumnType::kInt64);
+      &storage_data, ColumnType::kInt64, false);
 
   BitVector bv{1, 1, 0, 1, 1, 0, 1, 0, 0, 1};
   SelectorOverlay storage(&bv);
@@ -234,7 +238,7 @@
   std::vector<int64_t> storage_data(5);
   std::iota(storage_data.begin(), storage_data.end(), 0);
   auto numeric = std::make_unique<column::NumericStorage<int64_t>>(
-      &storage_data, ColumnType::kInt64);
+      &storage_data, ColumnType::kInt64, false);
 
   std::vector<uint32_t> arrangement{4, 1, 2, 2, 3};
   ArrangementOverlay storage(&arrangement, Indices::State::kNonmonotonic);
@@ -279,7 +283,7 @@
   std::vector<int64_t> storage_data(5);
   std::iota(storage_data.begin(), storage_data.end(), 0);
   auto numeric = std::make_unique<column::NumericStorage<int64_t>>(
-      &storage_data, ColumnType::kInt64);
+      &storage_data, ColumnType::kInt64, false);
 
   std::vector<uint32_t> arrangement{4, 1, 2, 2, 3};
   ArrangementOverlay storage(&arrangement, Indices::State::kNonmonotonic);
@@ -294,7 +298,8 @@
 
 TEST(QueryExecutor, MismatchedTypeNullWithOtherOperations) {
   std::vector<int64_t> storage_data{0, 1, 2, 3, 0, 1, 2, 3};
-  column::NumericStorage<int64_t> storage(&storage_data, ColumnType::kInt64);
+  column::NumericStorage<int64_t> storage(&storage_data, ColumnType::kInt64,
+                                          false);
   auto chain = storage.MakeChain();
 
   // Filter.
@@ -308,7 +313,7 @@
 TEST(QueryExecutor, SingleConstraintWithNullAndSelector) {
   std::vector<int64_t> storage_data{0, 1, 2, 3, 0, 1, 2, 3};
   auto numeric = std::make_unique<column::NumericStorage<int64_t>>(
-      &storage_data, ColumnType::kInt64);
+      &storage_data, ColumnType::kInt64, false);
 
   // Current vector
   // 0, 1, NULL, 2, 3, 0, NULL, NULL, 1, 2, 3, NULL
@@ -334,7 +339,7 @@
 TEST(QueryExecutor, SingleConstraintWithNullAndArrangement) {
   std::vector<int64_t> storage_data{0, 1, 2, 3, 0, 1, 2, 3};
   auto numeric = std::make_unique<column::NumericStorage<int64_t>>(
-      &storage_data, ColumnType::kInt64);
+      &storage_data, ColumnType::kInt64, false);
 
   // Current vector
   // 0, 1, NULL, 2, 3, 0, NULL, NULL, 1, 2, 3, NULL
@@ -360,7 +365,7 @@
 TEST(QueryExecutor, IsNullWithSelector) {
   std::vector<int64_t> storage_data{0, 1, 2, 3, 0, 1, 2, 3};
   auto numeric = std::make_unique<column::NumericStorage<int64_t>>(
-      &storage_data, ColumnType::kInt64);
+      &storage_data, ColumnType::kInt64, false);
 
   // Current vector
   // 0, 1, NULL, 2, 3, 0, NULL, NULL, 1, 2, 3, NULL
diff --git a/src/trace_processor/db/runtime_table.cc b/src/trace_processor/db/runtime_table.cc
index 4a5db52..38938b8 100644
--- a/src/trace_processor/db/runtime_table.cc
+++ b/src/trace_processor/db/runtime_table.cc
@@ -206,7 +206,7 @@
         columns.emplace_back(col_names_[i].c_str(), ints,
                              ColumnLegacy::Flag::kNoFlag, i, 0);
         storage_layers[i].reset(new column::NumericStorage<int64_t>(
-            &ints->non_null_vector(), ColumnType::kInt64));
+            &ints->non_null_vector(), ColumnType::kInt64, false));
         null_layers[i].reset(
             new column::NullOverlay(&ints->non_null_bit_vector()));
       }
@@ -236,7 +236,7 @@
         columns.emplace_back(col_names_[i].c_str(), doubles,
                              ColumnLegacy::Flag::kNoFlag, i, 0);
         storage_layers[i].reset(new column::NumericStorage<double>(
-            &doubles->non_null_vector(), ColumnType::kDouble));
+            &doubles->non_null_vector(), ColumnType::kDouble, false));
         null_layers[i].reset(
             new column::NullOverlay(&doubles->non_null_bit_vector()));
       }
diff --git a/src/trace_processor/db/table.cc b/src/trace_processor/db/table.cc
index 8f746d5..96d2570 100644
--- a/src/trace_processor/db/table.cc
+++ b/src/trace_processor/db/table.cc
@@ -23,10 +23,14 @@
 #include <vector>
 
 #include "perfetto/base/logging.h"
+#include "perfetto/trace_processor/ref_counted.h"
 #include "src/trace_processor/containers/row_map.h"
 #include "src/trace_processor/containers/string_pool.h"
 #include "src/trace_processor/db/column.h"
+#include "src/trace_processor/db/column/arrangement_overlay.h"
 #include "src/trace_processor/db/column/data_layer.h"
+#include "src/trace_processor/db/column/range_overlay.h"
+#include "src/trace_processor/db/column/selector_overlay.h"
 #include "src/trace_processor/db/column/types.h"
 #include "src/trace_processor/db/column_storage_overlay.h"
 
@@ -156,7 +160,6 @@
     table.overlays_.emplace_back(overlay.SelectRows(rm));
     PERFETTO_DCHECK(table.overlays_.back().size() == table.row_count());
   }
-  table.OnConstructionCompleted(storage_layers_, null_layers_, overlay_layers_);
 
   // Remove the sorted and row set flags from all the columns.
   for (auto& col : table.columns_) {
@@ -169,7 +172,53 @@
   if (!ob.front().desc) {
     table.columns_[ob.front().col_idx].flags_ |= ColumnLegacy::Flag::kSorted;
   }
+
+  std::vector<RefPtr<column::DataLayer>> overlay_layers(table.overlays_.size());
+  for (uint32_t i = 0; i < table.overlays_.size(); ++i) {
+    if (table.overlays_[i].row_map().IsIndexVector()) {
+      overlay_layers[i].reset(new column::ArrangementOverlay(
+          table.overlays_[i].row_map().GetIfIndexVector(),
+          Indices::State::kNonmonotonic));
+    } else if (table.overlays_[i].row_map().IsBitVector()) {
+      overlay_layers[i].reset(new column::SelectorOverlay(
+          table.overlays_[i].row_map().GetIfBitVector()));
+    } else if (table.overlays_[i].row_map().IsRange()) {
+      overlay_layers[i].reset(
+          new column::RangeOverlay(table.overlays_[i].row_map().GetIfIRange()));
+    }
+  }
+  table.OnConstructionCompleted(storage_layers_, null_layers_,
+                                std::move(overlay_layers));
   return table;
 }
 
+void Table::OnConstructionCompleted(
+    std::vector<RefPtr<column::DataLayer>> storage_layers,
+    std::vector<RefPtr<column::DataLayer>> null_layers,
+    std::vector<RefPtr<column::DataLayer>> overlay_layers) {
+  for (ColumnLegacy& col : columns_) {
+    col.BindToTable(this, string_pool_);
+  }
+  PERFETTO_CHECK(storage_layers.size() == columns_.size());
+  PERFETTO_CHECK(null_layers.size() == columns_.size());
+  PERFETTO_CHECK(overlay_layers.size() == overlays_.size());
+  storage_layers_ = std::move(storage_layers);
+  null_layers_ = std::move(null_layers);
+  overlay_layers_ = std::move(overlay_layers);
+
+  for (uint32_t i = 0; i < columns_.size(); ++i) {
+    auto chain = storage_layers_[i]->MakeChain();
+    if (const auto& null_overlay = null_layers_[i]; null_overlay.get()) {
+      chain = null_overlay->MakeChain(std::move(chain));
+    }
+    const auto& oly_idx = columns_[i].overlay_index();
+    if (const auto& overlay = overlay_layers_[oly_idx]; overlay.get()) {
+      chain = overlay->MakeChain(
+          std::move(chain),
+          column::DataLayer::ChainCreationArgs{columns_[i].IsSorted()});
+    }
+    chains_.emplace_back(std::move(chain));
+  }
+}
+
 }  // namespace perfetto::trace_processor
diff --git a/src/trace_processor/db/table.h b/src/trace_processor/db/table.h
index 5931724..83ff7c5 100644
--- a/src/trace_processor/db/table.h
+++ b/src/trace_processor/db/table.h
@@ -23,6 +23,7 @@
 #include <utility>
 #include <vector>
 
+#include "perfetto/base/compiler.h"
 #include "perfetto/base/logging.h"
 #include "perfetto/trace_processor/basic_types.h"
 #include "perfetto/trace_processor/ref_counted.h"
@@ -126,6 +127,11 @@
   Table(Table&& other) noexcept { *this = std::move(other); }
   Table& operator=(Table&& other) noexcept;
 
+  // Return a chain corresponding to a given column.
+  const column::DataLayerChain& ChainForColumn(uint32_t col_idx) const {
+    return *chains_[col_idx];
+  }
+
   // Filters and sorts the tables with the arguments specified, returning the
   // result as a RowMap.
   RowMap QueryToRowMap(
@@ -184,29 +190,7 @@
   void OnConstructionCompleted(
       std::vector<RefPtr<column::DataLayer>> storage_layers,
       std::vector<RefPtr<column::DataLayer>> null_layers,
-      std::vector<RefPtr<column::DataLayer>> overlay_layers) {
-    for (ColumnLegacy& col : columns_) {
-      col.BindToTable(this, string_pool_);
-    }
-    PERFETTO_CHECK(storage_layers.size() == columns_.size());
-    PERFETTO_CHECK(null_layers.size() == columns_.size());
-    PERFETTO_CHECK(overlay_layers.size() == overlays_.size());
-    storage_layers_ = std::move(storage_layers);
-    null_layers_ = std::move(null_layers);
-    overlay_layers_ = std::move(overlay_layers);
-
-    for (uint32_t i = 0; i < columns_.size(); ++i) {
-      auto chain = storage_layers_[i]->MakeChain();
-      if (const auto& null_overlay = null_layers_[i]; null_overlay.get()) {
-        chain = null_overlay->MakeChain(std::move(chain));
-      }
-      const auto& oly_idx = columns_[i].overlay_index();
-      if (const auto& overlay = overlay_layers_[oly_idx]; overlay.get()) {
-        chain = overlay->MakeChain(std::move(chain));
-      }
-      chains_.emplace_back(std::move(chain));
-    }
-  }
+      std::vector<RefPtr<column::DataLayer>> overlay_layers);
 
   ColumnLegacy* GetColumn(uint32_t index) { return &columns_[index]; }
 
@@ -217,7 +201,7 @@
  private:
   friend class ColumnLegacy;
 
-  RowMap FilterToRowMap(
+  PERFETTO_ALWAYS_INLINE RowMap FilterToRowMap(
       const std::vector<Constraint>& cs,
       RowMap::OptimizeFor optimize_for = RowMap::OptimizeFor::kMemory) const {
     if (cs.empty()) {
@@ -247,7 +231,6 @@
   std::vector<RefPtr<column::DataLayer>> storage_layers_;
   std::vector<RefPtr<column::DataLayer>> null_layers_;
   std::vector<RefPtr<column::DataLayer>> overlay_layers_;
-
   std::vector<std::unique_ptr<column::DataLayerChain>> chains_;
 };
 
diff --git a/src/trace_processor/tables/macros_internal.h b/src/trace_processor/tables/macros_internal.h
index 628b7d6..ebb76bb 100644
--- a/src/trace_processor/tables/macros_internal.h
+++ b/src/trace_processor/tables/macros_internal.h
@@ -19,18 +19,15 @@
 
 #include <cstddef>
 #include <cstdint>
-#include <memory>
 #include <type_traits>
 #include <utility>
 #include <vector>
 
 #include "perfetto/base/logging.h"
 #include "src/trace_processor/containers/bit_vector.h"
-#include "perfetto/trace_processor/ref_counted.h"
 #include "src/trace_processor/containers/row_map.h"
 #include "src/trace_processor/containers/string_pool.h"
 #include "src/trace_processor/db/column.h"
-#include "src/trace_processor/db/column/data_layer.h"
 #include "src/trace_processor/db/column_storage.h"
 #include "src/trace_processor/db/column_storage_overlay.h"
 #include "src/trace_processor/db/table.h"