[stdlib]: Weight bounded DFS search

A DFS graph search algorithm with an early exit when a reachability tree exceeds
a target edge weight sum.

Test: tools/diff_test_trace_processor.py out/android/trace_processor_shell --name-filter '.*weight_bounded.*'
Change-Id: Id222f4733f0d3f7369cfe671e35544cfd9c91ee7
diff --git a/Android.bp b/Android.bp
index d5da1dd..47fe183 100644
--- a/Android.bp
+++ b/Android.bp
@@ -12221,6 +12221,7 @@
         "src/trace_processor/perfetto_sql/intrinsics/table_functions/connected_flow.cc",
         "src/trace_processor/perfetto_sql/intrinsics/table_functions/descendant.cc",
         "src/trace_processor/perfetto_sql/intrinsics/table_functions/dfs.cc",
+        "src/trace_processor/perfetto_sql/intrinsics/table_functions/dfs_weight_bounded.cc",
         "src/trace_processor/perfetto_sql/intrinsics/table_functions/dominator_tree.cc",
         "src/trace_processor/perfetto_sql/intrinsics/table_functions/experimental_annotated_stack.cc",
         "src/trace_processor/perfetto_sql/intrinsics/table_functions/experimental_counter_dur.cc",
diff --git a/BUILD b/BUILD
index 8eedd8d..026ca3f 100644
--- a/BUILD
+++ b/BUILD
@@ -2304,6 +2304,8 @@
         "src/trace_processor/perfetto_sql/intrinsics/table_functions/descendant.h",
         "src/trace_processor/perfetto_sql/intrinsics/table_functions/dfs.cc",
         "src/trace_processor/perfetto_sql/intrinsics/table_functions/dfs.h",
+        "src/trace_processor/perfetto_sql/intrinsics/table_functions/dfs_weight_bounded.cc",
+        "src/trace_processor/perfetto_sql/intrinsics/table_functions/dfs_weight_bounded.h",
         "src/trace_processor/perfetto_sql/intrinsics/table_functions/dominator_tree.cc",
         "src/trace_processor/perfetto_sql/intrinsics/table_functions/dominator_tree.h",
         "src/trace_processor/perfetto_sql/intrinsics/table_functions/experimental_annotated_stack.cc",
diff --git a/src/trace_processor/perfetto_sql/intrinsics/table_functions/BUILD.gn b/src/trace_processor/perfetto_sql/intrinsics/table_functions/BUILD.gn
index 6edd50b..814b6ba 100644
--- a/src/trace_processor/perfetto_sql/intrinsics/table_functions/BUILD.gn
+++ b/src/trace_processor/perfetto_sql/intrinsics/table_functions/BUILD.gn
@@ -27,6 +27,8 @@
     "descendant.h",
     "dfs.cc",
     "dfs.h",
+    "dfs_weight_bounded.cc",
+    "dfs_weight_bounded.h",
     "dominator_tree.cc",
     "dominator_tree.h",
     "experimental_annotated_stack.cc",
diff --git a/src/trace_processor/perfetto_sql/intrinsics/table_functions/dfs.h b/src/trace_processor/perfetto_sql/intrinsics/table_functions/dfs.h
index 8dad16d..43e3239 100644
--- a/src/trace_processor/perfetto_sql/intrinsics/table_functions/dfs.h
+++ b/src/trace_processor/perfetto_sql/intrinsics/table_functions/dfs.h
@@ -35,9 +35,9 @@
 //
 // Arguments:
 //  1) |source_node_ids|: RepeatedBuilderResult proto containing a column of
-//     int64 values corresponding to the source of edges.
+//     uint32 values corresponding to the source of edges.
 //  2) |dest_node_ids|:  RepeatedBuilderResult proto containing a column of
-//     int64 values corresponding to the destination of edges. This number of
+//     uint32 values corresponding to the destination of edges. This number of
 //     values should be the same as |source_node_ids| with each index in
 //     |source_node_ids| acting as the source for the corresponding index in
 //     |dest_node_ids|.
@@ -47,7 +47,7 @@
 // Returns:
 //  A table with the nodes reachable from the start node and their "parent" in
 //  the tree generated by the DFS. The schema of the table
-//  is (node_id int64_t, parent_node_id optional<int64_t>).
+//  is (node_id uint32_t, parent_node_id optional<uint32_t>).
 //
 // Note: this function is not intended to be used directly from SQL: instead
 // macros exist in the standard library, wrapping it and making it
diff --git a/src/trace_processor/perfetto_sql/intrinsics/table_functions/dfs_weight_bounded.cc b/src/trace_processor/perfetto_sql/intrinsics/table_functions/dfs_weight_bounded.cc
new file mode 100644
index 0000000..4ff33cc
--- /dev/null
+++ b/src/trace_processor/perfetto_sql/intrinsics/table_functions/dfs_weight_bounded.cc
@@ -0,0 +1,288 @@
+/*
+ * Copyright (C) 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/trace_processor/perfetto_sql/intrinsics/table_functions/dfs_weight_bounded.h"
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "perfetto/base/logging.h"
+#include "perfetto/base/status.h"
+#include "perfetto/ext/base/status_or.h"
+#include "perfetto/protozero/proto_decoder.h"
+#include "perfetto/trace_processor/basic_types.h"
+#include "protos/perfetto/trace_processor/metrics_impl.pbzero.h"
+#include "src/trace_processor/containers/string_pool.h"
+#include "src/trace_processor/db/column.h"
+#include "src/trace_processor/db/table.h"
+#include "src/trace_processor/perfetto_sql/intrinsics/table_functions/tables_py.h"
+#include "src/trace_processor/util/status_macros.h"
+
+namespace perfetto::trace_processor {
+namespace tables {
+DfsWeightBoundedTable::~DfsWeightBoundedTable() = default;
+}  // namespace tables
+
+namespace {
+struct Edge {
+  uint32_t id;
+  uint32_t weight;
+};
+using Destinations = std::vector<Edge>;
+
+base::StatusOr<std::vector<Destinations>> ParseSourceToDestionationsMap(
+    protos::pbzero::RepeatedBuilderResult::Decoder& source,
+    protos::pbzero::RepeatedBuilderResult::Decoder& dest,
+    protos::pbzero::RepeatedBuilderResult::Decoder& weight) {
+  std::vector<Destinations> source_to_destinations_map;
+  bool parse_error = false;
+  auto source_node_ids = source.int_values(&parse_error);
+  auto dest_node_ids = dest.int_values(&parse_error);
+  auto edge_weights = weight.int_values(&parse_error);
+
+  for (; source_node_ids && dest_node_ids && edge_weights;
+       ++source_node_ids, ++dest_node_ids, ++edge_weights) {
+    source_to_destinations_map.resize(
+        std::max(source_to_destinations_map.size(),
+                 std::max(static_cast<size_t>(*source_node_ids + 1),
+                          static_cast<size_t>(*dest_node_ids + 1))));
+    source_to_destinations_map[static_cast<uint32_t>(*source_node_ids)]
+        .push_back(Edge{static_cast<uint32_t>(*dest_node_ids),
+                        static_cast<uint32_t>(*edge_weights)});
+  }
+  if (parse_error) {
+    return base::ErrStatus("Failed while parsing source or dest ids");
+  }
+  if (static_cast<bool>(source_node_ids) != static_cast<bool>(dest_node_ids)) {
+    return base::ErrStatus(
+        "dfs_weight_bounded: length of source and destination columns is not "
+        "the same");
+  }
+  return source_to_destinations_map;
+}
+
+base::StatusOr<std::vector<Edge>> ParseRootToMaxWeightMap(
+    protos::pbzero::RepeatedBuilderResult::Decoder& start,
+    protos::pbzero::RepeatedBuilderResult::Decoder& end) {
+  std::vector<Edge> roots;
+  bool parse_error = false;
+  auto root_node_ids = start.int_values(&parse_error);
+  auto max_weights = end.int_values(&parse_error);
+
+  for (; root_node_ids && max_weights; ++root_node_ids, ++max_weights) {
+    roots.push_back(Edge{static_cast<uint32_t>(*root_node_ids),
+                         static_cast<uint32_t>(*max_weights)});
+  }
+
+  if (parse_error) {
+    return base::ErrStatus(
+        "Failed while parsing root_node_ids or root_max_weights");
+  }
+  if (static_cast<bool>(root_node_ids) != static_cast<bool>(max_weights)) {
+    return base::ErrStatus(
+        "dfs_weight_bounded: length of root_node_ids and root_max_weights "
+        "columns is not the same");
+  }
+  return roots;
+}
+
+void DfsWeightBoundedImpl(
+    tables::DfsWeightBoundedTable* table,
+    const std::vector<Destinations>& source_to_destinations_map,
+    const std::vector<Edge>& roots) {
+  struct StackState {
+    uint32_t id;
+    uint32_t weight;
+    std::optional<uint32_t> parent_id;
+  };
+
+  std::vector<uint8_t> seen_node_ids(source_to_destinations_map.size());
+  std::vector<StackState> stack;
+
+  for (const auto& root : roots) {
+    stack.clear();
+    stack.push_back({root.id, 0, std::nullopt});
+    std::fill(seen_node_ids.begin(), seen_node_ids.end(), 0);
+
+    for (uint32_t total_weight = 0; !stack.empty();) {
+      StackState stack_state = stack.back();
+      stack.pop_back();
+
+      if (seen_node_ids[stack_state.id]) {
+        continue;
+      }
+      seen_node_ids[stack_state.id] = true;
+
+      // We want to greedily return all possible edges that are reachable within
+      // the target weight. If an edge already fails the requirement, skip it
+      // and don't include it's weight but continue the search, some other edges
+      // might fit.
+      if (total_weight + stack_state.weight > root.weight) {
+        continue;
+      }
+      total_weight += stack_state.weight;
+
+      tables::DfsWeightBoundedTable::Row row;
+      row.root_node_id = root.id;
+      row.node_id = stack_state.id;
+      row.parent_node_id = stack_state.parent_id;
+      table->Insert(row);
+
+      PERFETTO_DCHECK(stack_state.id < source_to_destinations_map.size());
+
+      const auto& children = source_to_destinations_map[stack_state.id];
+      for (auto it = children.rbegin(); it != children.rend(); ++it) {
+        stack.emplace_back(StackState{(*it).id, (*it).weight, stack_state.id});
+      }
+    }
+  }
+}
+}  // namespace
+
+DfsWeightBounded::DfsWeightBounded(StringPool* pool) : pool_(pool) {}
+DfsWeightBounded::~DfsWeightBounded() = default;
+
+Table::Schema DfsWeightBounded::CreateSchema() {
+  return tables::DfsWeightBoundedTable::ComputeStaticSchema();
+}
+
+std::string DfsWeightBounded::TableName() {
+  return tables::DfsWeightBoundedTable::Name();
+}
+
+uint32_t DfsWeightBounded::EstimateRowCount() {
+  // TODO(lalitm): improve this estimate.
+  return 1024;
+}
+
+base::StatusOr<std::unique_ptr<Table>> DfsWeightBounded::ComputeTable(
+    const std::vector<SqlValue>& arguments) {
+  PERFETTO_CHECK(arguments.size() == 5);
+
+  const SqlValue& raw_source_ids = arguments[0];
+  const SqlValue& raw_dest_ids = arguments[1];
+  const SqlValue& raw_edge_weights = arguments[2];
+  const SqlValue& raw_root_ids = arguments[3];
+  const SqlValue& raw_root_max_weights = arguments[4];
+
+  if (raw_source_ids.is_null() && raw_dest_ids.is_null() &&
+      raw_edge_weights.is_null() && raw_root_ids.is_null() &&
+      raw_root_max_weights.is_null()) {
+    return std::unique_ptr<Table>(
+        std::make_unique<tables::DfsWeightBoundedTable>(pool_));
+  }
+
+  if (raw_source_ids.is_null() || raw_dest_ids.is_null() ||
+      raw_edge_weights.is_null() || raw_root_ids.is_null() ||
+      raw_root_max_weights.is_null()) {
+    return base::ErrStatus(
+        "dfs_weight_bounded: either all arguments should be null or none "
+        "should be");
+  }
+  if (raw_source_ids.type != SqlValue::kBytes) {
+    return base::ErrStatus(
+        "dfs_weight_bounded: source_node_ids should be a repeated field");
+  }
+  if (raw_dest_ids.type != SqlValue::kBytes) {
+    return base::ErrStatus(
+        "dfs_weight_bounded: dest_node_ids should be a repeated field");
+  }
+  if (raw_edge_weights.type != SqlValue::kBytes) {
+    return base::ErrStatus(
+        "dfs_weight_bounded: edge_weights should be a repeated field");
+  }
+  if (raw_root_ids.type != SqlValue::kBytes) {
+    return base::ErrStatus(
+        "dfs_weight_bounded: root_ids should be a repeated field");
+  }
+  if (raw_root_max_weights.type != SqlValue::kBytes) {
+    return base::ErrStatus(
+        "dfs_weight_bounded: root_max_weights should be a repeated field");
+  }
+
+  protos::pbzero::ProtoBuilderResult::Decoder proto_source_ids(
+      static_cast<const uint8_t*>(raw_source_ids.AsBytes()),
+      raw_source_ids.bytes_count);
+  if (!proto_source_ids.is_repeated()) {
+    return base::ErrStatus(
+        "dfs_weight_bounded: source_node_ids is not generated by RepeatedField "
+        "function");
+  }
+  protos::pbzero::RepeatedBuilderResult::Decoder source_ids(
+      proto_source_ids.repeated());
+
+  protos::pbzero::ProtoBuilderResult::Decoder proto_dest_ids(
+      static_cast<const uint8_t*>(raw_dest_ids.AsBytes()),
+      raw_dest_ids.bytes_count);
+  if (!proto_dest_ids.is_repeated()) {
+    return base::ErrStatus(
+        "dfs_weight_bounded: dest_node_ids is not generated by RepeatedField "
+        "function");
+  }
+  protos::pbzero::RepeatedBuilderResult::Decoder dest_ids(
+      proto_dest_ids.repeated());
+
+  protos::pbzero::ProtoBuilderResult::Decoder proto_edge_weights(
+      static_cast<const uint8_t*>(raw_edge_weights.AsBytes()),
+      raw_edge_weights.bytes_count);
+  if (!proto_edge_weights.is_repeated()) {
+    return base::ErrStatus(
+        "dfs_weight_bounded: edge_weights is not generated by RepeatedField "
+        "function");
+  }
+  protos::pbzero::RepeatedBuilderResult::Decoder edge_weights(
+      proto_edge_weights.repeated());
+
+  protos::pbzero::ProtoBuilderResult::Decoder proto_root_ids(
+      static_cast<const uint8_t*>(raw_root_ids.AsBytes()),
+      raw_root_ids.bytes_count);
+  if (!proto_root_ids.is_repeated()) {
+    return base::ErrStatus(
+        "dfs_weight_bounded: root_ids is not generated by RepeatedField "
+        "function");
+  }
+  protos::pbzero::RepeatedBuilderResult::Decoder root_ids(
+      proto_root_ids.repeated());
+
+  protos::pbzero::ProtoBuilderResult::Decoder proto_root_max_weights(
+      static_cast<const uint8_t*>(raw_root_max_weights.AsBytes()),
+      raw_root_max_weights.bytes_count);
+  if (!proto_root_max_weights.is_repeated()) {
+    return base::ErrStatus(
+        "dfs_weight_bounded: root_max_weights is not generated by "
+        "RepeatedField function");
+  }
+  protos::pbzero::RepeatedBuilderResult::Decoder root_max_weights(
+      proto_root_max_weights.repeated());
+
+  ASSIGN_OR_RETURN(auto map, ParseSourceToDestionationsMap(source_ids, dest_ids,
+                                                           edge_weights));
+
+  ASSIGN_OR_RETURN(auto roots,
+                   ParseRootToMaxWeightMap(root_ids, root_max_weights));
+
+  auto table = std::make_unique<tables::DfsWeightBoundedTable>(pool_);
+  DfsWeightBoundedImpl(table.get(), map, roots);
+  return std::unique_ptr<Table>(std::move(table));
+}
+
+}  // namespace perfetto::trace_processor
diff --git a/src/trace_processor/perfetto_sql/intrinsics/table_functions/dfs_weight_bounded.h b/src/trace_processor/perfetto_sql/intrinsics/table_functions/dfs_weight_bounded.h
new file mode 100644
index 0000000..f1af772
--- /dev/null
+++ b/src/trace_processor/perfetto_sql/intrinsics/table_functions/dfs_weight_bounded.h
@@ -0,0 +1,85 @@
+/*
+ * Copyright (C) 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_TABLE_FUNCTIONS_DFS_WEIGHT_BOUNDED_H_
+#define SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_TABLE_FUNCTIONS_DFS_WEIGHT_BOUNDED_H_
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "perfetto/ext/base/status_or.h"
+#include "perfetto/trace_processor/basic_types.h"
+#include "src/trace_processor/containers/string_pool.h"
+#include "src/trace_processor/db/table.h"
+#include "src/trace_processor/perfetto_sql/intrinsics/table_functions/static_table_function.h"
+
+namespace perfetto::trace_processor {
+
+// An SQL table-function which performs a weight bounded DFS from a set of start
+// nodes in a graph and returns all the nodes which are reachable from each
+// start node independently.
+//
+// Arguments:
+//  1) |source_node_ids|: RepeatedBuilderResult proto containing a column of
+//     uint32 values corresponding to the source of edges.
+//  2) |dest_node_ids|:  RepeatedBuilderResult proto containing a column of
+//     uint32 values corresponding to the destination of edges. This number of
+//     values should be the same as |source_node_ids| with each index in
+//     |source_node_ids| acting as the source for the corresponding index in
+//     |dest_node_ids|.
+//  3) |edge_weights|:  RepeatedBuilderResult proto containing a column of
+//     uint32 values corresponding to the weight of edges. This number of
+//     values should be the same as |source_node_ids| with each index in
+//     |source_node_ids| acting as the source for the corresponding index in
+//     |edge_weights|.
+//  4) |root_node_ids|: RepeatedBuilderResult proto containing a column of
+//     uint32 values corresponding to the ID of the start nodes in the graph
+//     from which reachability should be computed.
+//  5) |root_max_weights|: RepeatedBuilderResult proto containing a column of
+//     uint32 values corresponding to the max sum of edge weights inclusive,
+//     at which point the DFS from the |root_node_ids| stops. This number of
+//     values should be the same as |root_node_ids|.
+//
+// Returns:
+//  A table with the nodes reachable from the start node, their "parent" in
+//  the tree generated by the DFS and the starting node itself "root". The
+//  schema of the table is (root_node_id uint32_t, node_id uint32_t,
+//  parent_node_id optional<uint32_t>).
+//
+// Note: this function is not intended to be used directly from SQL: instead
+// macros exist in the standard library, wrapping it and making it
+// user-friendly.
+class DfsWeightBounded : public StaticTableFunction {
+ public:
+  explicit DfsWeightBounded(StringPool*);
+  virtual ~DfsWeightBounded() override;
+
+  // StaticTableFunction implementation.
+  Table::Schema CreateSchema() override;
+  std::string TableName() override;
+  uint32_t EstimateRowCount() override;
+  base::StatusOr<std::unique_ptr<Table>> ComputeTable(
+      const std::vector<SqlValue>& arguments) override;
+
+ private:
+  StringPool* pool_ = nullptr;
+};
+
+}  // namespace perfetto::trace_processor
+
+#endif  // SRC_TRACE_PROCESSOR_PERFETTO_SQL_INTRINSICS_TABLE_FUNCTIONS_DFS_WEIGHT_BOUNDED_H_
diff --git a/src/trace_processor/perfetto_sql/intrinsics/table_functions/tables.py b/src/trace_processor/perfetto_sql/intrinsics/table_functions/tables.py
index 3c86e50..0570082 100644
--- a/src/trace_processor/perfetto_sql/intrinsics/table_functions/tables.py
+++ b/src/trace_processor/perfetto_sql/intrinsics/table_functions/tables.py
@@ -187,6 +187,27 @@
         C("in_right_durs", CppOptional(CppString()), flags=ColumnFlag.HIDDEN),
     ])
 
+DFS_WEIGHT_BOUNDED_TABLE = Table(
+    python_module=__file__,
+    class_name="DfsWeightBoundedTable",
+    sql_name="__intrinsic_dfs_weight_bounded",
+    columns=[
+        C("root_node_id", CppUint32()),
+        C("node_id", CppUint32()),
+        C("parent_node_id", CppOptional(CppUint32())),
+        C("in_source_node_ids",
+          CppOptional(CppUint32()),
+          flags=ColumnFlag.HIDDEN),
+        C("in_dest_node_ids", CppOptional(CppUint32()),
+          flags=ColumnFlag.HIDDEN),
+        C("in_edge_weights", CppOptional(CppUint32()), flags=ColumnFlag.HIDDEN),
+        C("in_root_node_ids", CppOptional(CppUint32()),
+          flags=ColumnFlag.HIDDEN),
+        C("in_root_max_weights",
+          CppOptional(CppUint32()),
+          flags=ColumnFlag.HIDDEN),
+    ])
+
 # Keep this list sorted.
 ALL_TABLES = [
     ANCESTOR_SLICE_BY_STACK_TABLE,
@@ -196,6 +217,7 @@
     DESCENDANT_SLICE_BY_STACK_TABLE,
     DESCENDANT_SLICE_TABLE,
     DFS_TABLE,
+    DFS_WEIGHT_BOUNDED_TABLE,
     DOMINATOR_TREE_TABLE,
     EXPERIMENTAL_ANNOTATED_CALLSTACK_TABLE,
     EXPERIMENTAL_COUNTER_DUR_TABLE,
diff --git a/src/trace_processor/perfetto_sql/stdlib/graphs/search.sql b/src/trace_processor/perfetto_sql/stdlib/graphs/search.sql
index 4bba25a..6214716 100644
--- a/src/trace_processor/perfetto_sql/stdlib/graphs/search.sql
+++ b/src/trace_processor/perfetto_sql/stdlib/graphs/search.sql
@@ -99,3 +99,82 @@
   SELECT node_id, lead(node_id) OVER (PARTITION BY node_parent_id ORDER BY sort_key) AS next_node_id
     FROM $graph_table
 );
+
+-- Computes the "reachable" set of nodes in a directed graph from a set of
+-- starting (root) nodes by performing a depth-first search from each root node on the graph.
+-- The search is bounded by the sum of edge weights on the path and the root node specifies the
+-- max weight (inclusive) allowed before stopping the search.
+-- The returned nodes are structured as a tree with parent-child relationships corresponding
+-- to the order in which nodes were encountered by the DFS. Each row also has the root node from
+-- which where the edge was encountered.
+--
+-- While this macro can be used directly by end users (hence being public),
+-- it is primarily intended as a lower-level building block upon which higher
+-- level functions/macros in the standard library can be built.
+--
+-- Example usage on traces with sched info:
+-- ```
+-- -- Compute the reachable nodes from a sched wakeup chain
+-- INCLUDE PERFETTO MODULE sched.thread_executing_spans;
+--
+-- SELECT *
+-- FROM
+--   graph_reachable_dfs_bounded
+--    !(
+--      (
+--        SELECT
+--          id AS source_node_id,
+--          COALESCE(parent_id, id) AS dest_node_id,
+--          id - COALESCE(parent_id, id) AS edge_weight
+--        FROM _wakeup_chain
+--      ),
+--      (
+--        SELECT
+--          id AS root_node_id,
+--          id - COALESCE(prev_id, id) AS root_max_weight
+--        FROM _wakeup_chain
+--      ));
+-- ```
+CREATE PERFETTO MACRO graph_reachable_weight_bounded_dfs(
+  -- A table/view/subquery corresponding to a directed graph on which the
+  -- reachability search should be performed. This table must have the columns
+  -- "source_node_id" and "dest_node_id" corresponding to the two nodes on
+  -- either end of the edges in the graph and an "edge_weight" corresponding to the
+  -- weight of the edge between the node.
+  --
+  -- Note: the columns must contain uint32 similar to ids in trace processor
+  -- tables (i.e. the values should be relatively dense and close to zero). The
+  -- implementation makes assumptions on this for performance reasons and, if
+  -- this criteria is not, can lead to enormous amounts of memory being
+  -- allocated.
+  graph_table TableOrSubquery,
+  -- A table/view/subquery corresponding to start nodes to |graph_table| which will be the
+  -- roots of the reachability trees. This table must have the columns
+  -- "root_node_id" and "root_max_weight" corresponding to the starting node id and the max
+  -- weight allowed on the tree.
+  --
+  -- Note: the columns must contain uint32 similar to ids in trace processor
+  -- tables (i.e. the values should be relatively dense and close to zero). The
+  -- implementation makes assumptions on this for performance reasons and, if
+  -- this criteria is not, can lead to enormous amounts of memory being
+  -- allocated.
+  root_table TableOrSubquery
+)
+-- The returned table has the schema (root_node_id, node_id UINT32, parent_node_id UINT32).
+-- |root_node_id| is the id of the starting node under which this edge was encountered.
+-- |node_id| is the id of the node from the input graph and |parent_node_id|
+-- is the id of the node which was the first encountered predecessor in a DFS
+-- search of the graph.
+RETURNS TableOrSubquery AS
+(
+  WITH __temp_graph_table AS (SELECT * FROM $graph_table),
+  __temp_root_table AS (SELECT * FROM $root_table)
+  SELECT dt.root_node_id, dt.node_id, dt.parent_node_id
+  FROM __intrinsic_dfs_weight_bounded(
+    (SELECT RepeatedField(source_node_id) FROM __temp_graph_table),
+    (SELECT RepeatedField(dest_node_id) FROM __temp_graph_table),
+    (SELECT RepeatedField(edge_weight) FROM __temp_graph_table),
+    (SELECT RepeatedField(root_node_id) FROM __temp_root_table),
+    (SELECT RepeatedField(root_max_weight) FROM __temp_root_table)
+  ) dt
+);
diff --git a/src/trace_processor/trace_processor_impl.cc b/src/trace_processor/trace_processor_impl.cc
index 9ac4a5c..60c66b2 100644
--- a/src/trace_processor/trace_processor_impl.cc
+++ b/src/trace_processor/trace_processor_impl.cc
@@ -84,6 +84,7 @@
 #include "src/trace_processor/perfetto_sql/intrinsics/table_functions/connected_flow.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/table_functions/descendant.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/table_functions/dfs.h"
+#include "src/trace_processor/perfetto_sql/intrinsics/table_functions/dfs_weight_bounded.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/table_functions/dominator_tree.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/table_functions/experimental_annotated_stack.h"
 #include "src/trace_processor/perfetto_sql/intrinsics/table_functions/experimental_counter_dur.h"
@@ -916,6 +917,8 @@
       context_.storage->mutable_string_pool()));
   engine_->RegisterStaticTableFunction(
       std::make_unique<Dfs>(context_.storage->mutable_string_pool()));
+  engine_->RegisterStaticTableFunction(std::make_unique<DfsWeightBounded>(
+      context_.storage->mutable_string_pool()));
 
   // Metrics.
   RegisterAllProtoBuilderFunctions(&pool_, engine_.get(), this);
diff --git a/test/trace_processor/diff_tests/stdlib/graphs/search_tests.py b/test/trace_processor/diff_tests/stdlib/graphs/search_tests.py
index 1e3b96b..48cc56f 100644
--- a/test/trace_processor/diff_tests/stdlib/graphs/search_tests.py
+++ b/test/trace_processor/diff_tests/stdlib/graphs/search_tests.py
@@ -153,3 +153,48 @@
         3,2
         2,"[NULL]"
         """))
+
+  def test_weight_bounded_dfs(self):
+    return DiffTestBlueprint(
+        trace=DataPath('counters.json'),
+        query="""
+          INCLUDE PERFETTO MODULE graphs.search;
+
+          CREATE PERFETTO TABLE foo AS
+          SELECT 0 AS source_node_id, 0 AS dest_node_id, 0 AS edge_weight
+          UNION ALL
+          VALUES (1, 2, 1)
+          UNION ALL
+          VALUES (1, 3, 1)
+          UNION ALL
+          VALUES (3, 4, 1)
+          UNION ALL
+          VALUES (3, 5, 0)
+          UNION ALL
+          VALUES (5, 6, 0);
+
+          CREATE PERFETTO TABLE roots AS
+          SELECT 0 AS root_node_id, 0 AS root_max_weight
+          UNION ALL
+          VALUES (1, 2)
+          UNION ALL
+          VALUES (3, 1)
+          UNION ALL
+          VALUES (2, 0);
+
+          SELECT * FROM graph_reachable_weight_bounded_dfs!(foo, roots);
+        """,
+        out=Csv("""
+        "root_node_id","node_id","parent_node_id"
+        0,0,"[NULL]"
+        1,1,"[NULL]"
+        1,2,1
+        1,3,1
+        1,5,3
+        1,6,5
+        3,3,"[NULL]"
+        3,4,3
+        3,5,3
+        3,6,5
+        2,2,"[NULL]"
+        """))