tp: Use CEngine when comparing int against double column

Bug:307482437
Change-Id: Id0d74fef6a338eef5ee53cd86f9d0d1ebfcd5d89
diff --git a/Android.bp b/Android.bp
index cebcd7b..6cb6274 100644
--- a/Android.bp
+++ b/Android.bp
@@ -10948,6 +10948,11 @@
     ],
 }
 
+// GN: //src/trace_processor/db:compare
+filegroup {
+    name: "perfetto_src_trace_processor_db_compare",
+}
+
 // GN: //src/trace_processor/db:db
 filegroup {
     name: "perfetto_src_trace_processor_db_db",
@@ -13813,6 +13818,7 @@
         ":perfetto_src_shared_lib_unittests",
         ":perfetto_src_trace_processor_containers_containers",
         ":perfetto_src_trace_processor_containers_unittests",
+        ":perfetto_src_trace_processor_db_compare",
         ":perfetto_src_trace_processor_db_db",
         ":perfetto_src_trace_processor_db_storage_fake_storage",
         ":perfetto_src_trace_processor_db_storage_storage",
diff --git a/src/trace_processor/db/BUILD.gn b/src/trace_processor/db/BUILD.gn
index 6b02d28..5dc4901 100644
--- a/src/trace_processor/db/BUILD.gn
+++ b/src/trace_processor/db/BUILD.gn
@@ -48,6 +48,14 @@
   ]
 }
 
+source_set("compare") {
+  sources = [ "compare.h" ]
+  deps = [
+    "../../../include/perfetto/trace_processor",
+    "../containers",
+  ]
+}
+
 perfetto_tp_tables("view_unittest") {
   sources = [ "view_unittest.py" ]
 }
diff --git a/src/trace_processor/db/query_executor.cc b/src/trace_processor/db/query_executor.cc
index 3c51d7a..a1df9f6 100644
--- a/src/trace_processor/db/query_executor.cc
+++ b/src/trace_processor/db/query_executor.cc
@@ -173,12 +173,6 @@
     use_legacy = use_legacy || (col.overlay().size() != column_size &&
                                 col.overlay().row_map().IsRange());
 
-    // Double column is filtered on with value different than double.
-    use_legacy =
-        use_legacy ||
-        (c.op != FilterOp::kIsNotNull && c.op != FilterOp::kIsNull &&
-         col.type() == SqlValue::kDouble && c.value.type != SqlValue::kDouble);
-
     // Extrinsically sorted columns.
     use_legacy = use_legacy ||
                  (col.IsSorted() && col.overlay().row_map().IsIndexVector());
diff --git a/src/trace_processor/db/storage/BUILD.gn b/src/trace_processor/db/storage/BUILD.gn
index d13b530..065d787 100644
--- a/src/trace_processor/db/storage/BUILD.gn
+++ b/src/trace_processor/db/storage/BUILD.gn
@@ -81,6 +81,7 @@
   deps = [
     ":fake_storage",
     ":storage",
+    "../:compare",
     "../../../../gn:default_deps",
     "../../../../gn:gtest_and_gmock",
     "../../../../include/perfetto/trace_processor:basic_types",
diff --git a/src/trace_processor/db/storage/numeric_storage.cc b/src/trace_processor/db/storage/numeric_storage.cc
index 4dc3679..8bd2fa8 100644
--- a/src/trace_processor/db/storage/numeric_storage.cc
+++ b/src/trace_processor/db/storage/numeric_storage.cc
@@ -19,6 +19,7 @@
 
 #include <cmath>
 #include <cstddef>
+#include <cstdint>
 #include <functional>
 #include <string>
 
@@ -194,11 +195,10 @@
   }
 }
 
-inline SearchValidationResult IntColumnToDouble(SqlValue* sql_val,
-                                                FilterOp op) {
+SearchValidationResult IntColumnToDouble(SqlValue* sql_val, FilterOp op) {
   double double_val = sql_val->AsDouble();
 
-  // Case when |sql_val| can be interpreted as a SqlValue::Long.
+  // Case when |sql_val| can be interpreted as a SqlValue::Double.
   if (std::equal_to<double>()(
           static_cast<double>(static_cast<int64_t>(double_val)), double_val)) {
     *sql_val = SqlValue::Long(static_cast<int64_t>(double_val));
@@ -231,6 +231,44 @@
   PERFETTO_FATAL("For GCC");
 }
 
+SearchValidationResult DoubleColumnWithInt(SqlValue* sql_val, FilterOp op) {
+  int64_t i = sql_val->AsLong();
+  double i_as_d = static_cast<double>(i);
+
+  // Case when |sql_val| can be interpreted as a SqlValue::Long.
+  if (std::equal_to<int64_t>()(i, static_cast<int64_t>(i_as_d))) {
+    *sql_val = SqlValue::Double(i_as_d);
+    return SearchValidationResult::kOk;
+  }
+
+  // Logic for when the value can't be represented as double.
+  switch (op) {
+    case FilterOp::kEq:
+      return SearchValidationResult::kNoData;
+    case FilterOp::kNe:
+      return SearchValidationResult::kAllData;
+
+    case FilterOp::kLe:
+    case FilterOp::kGt:
+      // The first double value smaller than |i|.
+      *sql_val = SqlValue::Double(std::nextafter(i_as_d, i - 1));
+      return SearchValidationResult::kOk;
+
+    case FilterOp::kLt:
+    case FilterOp::kGe:
+      // The first double value bigger than |i|.
+      *sql_val = SqlValue::Double(std::nextafter(i_as_d, i + 1));
+      return SearchValidationResult::kOk;
+
+    case FilterOp::kIsNotNull:
+    case FilterOp::kIsNull:
+    case FilterOp::kGlob:
+    case FilterOp::kRegex:
+      PERFETTO_FATAL("Invalid filter operation");
+  }
+  PERFETTO_FATAL("For GCC");
+}
+
 }  // namespace
 
 SearchValidationResult NumericStorageBase::ValidateSearchConstraints(
@@ -291,7 +329,7 @@
                          ? static_cast<double_t>(val.AsLong())
                          : val.AsDouble();
 
-  switch (type_) {
+  switch (storage_type_) {
     case ColumnType::kDouble:
       // Any value would make a sensible comparison with a double.
     case ColumnType::kInt64:
@@ -353,7 +391,8 @@
                                 std::to_string(static_cast<uint32_t>(op)));
                     });
 
-  if (sql_val.type == SqlValue::kDouble && type_ != ColumnType::kDouble) {
+  if (sql_val.type == SqlValue::kDouble &&
+      storage_type_ != ColumnType::kDouble) {
     auto comp = IntColumnToDouble(&sql_val, op);
     switch (comp) {
       case SearchValidationResult::kOk:
@@ -365,7 +404,20 @@
     }
   }
 
-  NumericValue val = GetNumericTypeVariant(type_, sql_val);
+  if (sql_val.type != SqlValue::kDouble &&
+      storage_type_ == ColumnType::kDouble) {
+    auto comp = DoubleColumnWithInt(&sql_val, op);
+    switch (comp) {
+      case SearchValidationResult::kOk:
+        break;
+      case SearchValidationResult::kAllData:
+        return RangeOrBitVector(search_range);
+      case SearchValidationResult::kNoData:
+        return RangeOrBitVector(Range());
+    }
+  }
+
+  NumericValue val = GetNumericTypeVariant(storage_type_, sql_val);
 
   if (is_sorted_) {
     if (op != FilterOp::kNe) {
@@ -395,7 +447,8 @@
                                 std::to_string(static_cast<uint32_t>(op)));
                     });
 
-  if (sql_val.type == SqlValue::kDouble && type_ != ColumnType::kDouble) {
+  if (sql_val.type == SqlValue::kDouble &&
+      storage_type_ != ColumnType::kDouble) {
     auto comp = IntColumnToDouble(&sql_val, op);
     switch (comp) {
       case SearchValidationResult::kOk:
@@ -407,7 +460,20 @@
     }
   }
 
-  NumericValue val = GetNumericTypeVariant(type_, sql_val);
+  if (sql_val.type != SqlValue::kDouble &&
+      storage_type_ == ColumnType::kDouble) {
+    auto comp = DoubleColumnWithInt(&sql_val, op);
+    switch (comp) {
+      case SearchValidationResult::kOk:
+        break;
+      case SearchValidationResult::kAllData:
+        return RangeOrBitVector(Range(0, indices_size));
+      case SearchValidationResult::kNoData:
+        return RangeOrBitVector(Range());
+    }
+  }
+
+  NumericValue val = GetNumericTypeVariant(storage_type_, sql_val);
 
   if (sorted) {
     return RangeOrBitVector(
@@ -534,7 +600,7 @@
                            return first_val < second_val;
                          });
       },
-      GetNumericTypeVariant(type_, SqlValue::Long(0)));
+      GetNumericTypeVariant(storage_type_, SqlValue::Long(0)));
 }
 
 void NumericStorageBase::Sort(uint32_t*, uint32_t) const {
@@ -545,10 +611,10 @@
 void NumericStorageBase::Serialize(StorageProto* msg) const {
   auto* numeric_storage_msg = msg->set_numeric_storage();
   numeric_storage_msg->set_is_sorted(is_sorted_);
-  numeric_storage_msg->set_column_type(static_cast<uint32_t>(type_));
+  numeric_storage_msg->set_column_type(static_cast<uint32_t>(storage_type_));
 
   uint32_t type_size;
-  switch (type_) {
+  switch (storage_type_) {
     case ColumnType::kInt64:
       type_size = sizeof(int64_t);
       break;
diff --git a/src/trace_processor/db/storage/numeric_storage.h b/src/trace_processor/db/storage/numeric_storage.h
index 741a8e0..6af150a 100644
--- a/src/trace_processor/db/storage/numeric_storage.h
+++ b/src/trace_processor/db/storage/numeric_storage.h
@@ -60,7 +60,7 @@
                      uint32_t size,
                      ColumnType type,
                      bool is_sorted = false)
-      : size_(size), data_(data), type_(type), is_sorted_(is_sorted) {}
+      : size_(size), data_(data), storage_type_(type), is_sorted_(is_sorted) {}
 
  private:
   // All viable numeric values for ColumnTypes.
@@ -86,7 +86,7 @@
 
   const uint32_t size_ = 0;
   const void* data_ = nullptr;
-  const ColumnType type_ = ColumnType::kDummy;
+  const ColumnType storage_type_ = ColumnType::kDummy;
   const bool is_sorted_ = false;
 };
 
diff --git a/src/trace_processor/db/storage/numeric_storage_unittest.cc b/src/trace_processor/db/storage/numeric_storage_unittest.cc
index eceea34..5806f83 100644
--- a/src/trace_processor/db/storage/numeric_storage_unittest.cc
+++ b/src/trace_processor/db/storage/numeric_storage_unittest.cc
@@ -16,6 +16,8 @@
 #include "src/trace_processor/db/storage/numeric_storage.h"
 #include <cstdint>
 
+#include "perfetto/trace_processor/basic_types.h"
+#include "src/trace_processor/db/compare.h"
 #include "src/trace_processor/db/storage/types.h"
 #include "test/gtest_and_gmock.h"
 
@@ -578,6 +580,77 @@
   ASSERT_THAT(ToIndexVector(res), ElementsAre(0, 1, 2, 3, 4, 5));
 }
 
+TEST(NumericStorageUnittest, DoubleColumnWithIntThatCantBeRepresentedAsDouble) {
+  // Sanity check that this value can't be represented as double.
+  int64_t not_rep_i = 9007199254740993;
+  EXPECT_FALSE(std::nextafter(static_cast<double>(not_rep_i), 1.0) ==
+               static_cast<double>(not_rep_i));
+  SqlValue val = SqlValue::Long(not_rep_i);
+
+  std::vector<double> data_vec{9007199254740992.0, 9007199254740994.0};
+
+  // Whether LongToDouble has the expected results.
+  ASSERT_TRUE(compare::LongToDouble(not_rep_i, data_vec[0]) > 0);
+  ASSERT_TRUE(compare::LongToDouble(not_rep_i, data_vec[1]) < 0);
+
+  NumericStorage<double> storage(&data_vec, ColumnType::kDouble);
+  Range test_range(0, 2);
+
+  auto res = storage.Search(FilterOp::kEq, val, test_range);
+  ASSERT_THAT(ToIndexVector(res), IsEmpty());
+
+  res = storage.Search(FilterOp::kNe, val, test_range);
+  ASSERT_THAT(ToIndexVector(res), ElementsAre(0, 1));
+
+  res = storage.Search(FilterOp::kLt, val, test_range);
+  ASSERT_THAT(ToIndexVector(res), ElementsAre(0));
+
+  res = storage.Search(FilterOp::kLe, val, test_range);
+  ASSERT_THAT(ToIndexVector(res), ElementsAre(0));
+
+  res = storage.Search(FilterOp::kGt, val, test_range);
+  ASSERT_THAT(ToIndexVector(res), ElementsAre(1));
+
+  res = storage.Search(FilterOp::kGe, val, test_range);
+  ASSERT_THAT(ToIndexVector(res), ElementsAre(1));
+}
+
+TEST(NumericStorageUnittest,
+     DoubleColumnWithNegIntThatCantBeRepresentedAsDouble) {
+  // Sanity check that this value can't be represented as double.
+  int64_t not_rep_i = -9007199254740993;
+  EXPECT_FALSE(std::nextafter(static_cast<double>(not_rep_i), 1.0) ==
+               static_cast<double>(not_rep_i));
+  SqlValue val = SqlValue::Long(not_rep_i);
+
+  std::vector<double> data_vec{-9007199254740992.0, -9007199254740994.0};
+
+  // Whether LongToDouble has the expected results.
+  ASSERT_TRUE(compare::LongToDouble(not_rep_i, data_vec[0]) < 0);
+  ASSERT_TRUE(compare::LongToDouble(not_rep_i, data_vec[1]) > 0);
+
+  NumericStorage<double> storage(&data_vec, ColumnType::kDouble);
+  Range test_range(0, 2);
+
+  auto res = storage.Search(FilterOp::kEq, val, test_range);
+  ASSERT_THAT(ToIndexVector(res), IsEmpty());
+
+  res = storage.Search(FilterOp::kNe, val, test_range);
+  ASSERT_THAT(ToIndexVector(res), ElementsAre(0, 1));
+
+  res = storage.Search(FilterOp::kLt, val, test_range);
+  ASSERT_THAT(ToIndexVector(res), ElementsAre(1));
+
+  res = storage.Search(FilterOp::kLe, val, test_range);
+  ASSERT_THAT(ToIndexVector(res), ElementsAre(1));
+
+  res = storage.Search(FilterOp::kGt, val, test_range);
+  ASSERT_THAT(ToIndexVector(res), ElementsAre(0));
+
+  res = storage.Search(FilterOp::kGe, val, test_range);
+  ASSERT_THAT(ToIndexVector(res), ElementsAre(0));
+}
+
 }  // namespace
 }  // namespace storage
 }  // namespace trace_processor