diff --git a/src/trace_redaction/BUILD.gn b/src/trace_redaction/BUILD.gn
index e876fac..08abf3c 100644
--- a/src/trace_redaction/BUILD.gn
+++ b/src/trace_redaction/BUILD.gn
@@ -84,6 +84,7 @@
     "../../protos/perfetto/trace/ftrace:zero",
     "../../protos/perfetto/trace/ps:zero",
     "../trace_processor:storage_minimal",
+    "../trace_processor/util:util",
   ]
 }
 
diff --git a/src/trace_redaction/build_timeline.cc b/src/trace_redaction/build_timeline.cc
index e94ddaa..8c44f82 100644
--- a/src/trace_redaction/build_timeline.cc
+++ b/src/trace_redaction/build_timeline.cc
@@ -109,9 +109,8 @@
 
 }  // namespace
 
-base::StatusOr<CollectPrimitive::ContinueCollection> BuildTimeline::Collect(
-    const TracePacket::Decoder& packet,
-    Context* context) const {
+base::Status BuildTimeline::Collect(const TracePacket::Decoder& packet,
+                                    Context* context) const {
   // TODO(vaage): This should only be true on the first call. However, that
   // means a branch is called N times when N-1 times it will be false. This may
   // be common across Collect primitives. Having a "begin" and "end" end-points.
@@ -129,16 +128,14 @@
     AppendEvents(packet.timestamp(),
                  ProcessTree::Decoder(packet.process_tree()),
                  context->timeline.get());
-    return ContinueCollection::kNextPacket;
   }
 
   if (packet.has_ftrace_events()) {
     AppendEvents(FtraceEventBundle::Decoder(packet.ftrace_events()),
                  context->timeline.get());
-    return ContinueCollection::kNextPacket;
   }
 
-  return ContinueCollection::kNextPacket;
+  return base::OkStatus();
 }
 
 }  // namespace perfetto::trace_redaction
diff --git a/src/trace_redaction/build_timeline.h b/src/trace_redaction/build_timeline.h
index 49b92bf..d39474f 100644
--- a/src/trace_redaction/build_timeline.h
+++ b/src/trace_redaction/build_timeline.h
@@ -17,7 +17,6 @@
 #ifndef SRC_TRACE_REDACTION_BUILD_TIMELINE_H_
 #define SRC_TRACE_REDACTION_BUILD_TIMELINE_H_
 
-#include "perfetto/ext/base/status_or.h"
 #include "src/trace_redaction/trace_redaction_framework.h"
 
 #include "protos/perfetto/trace/trace_packet.pbzero.h"
@@ -28,9 +27,8 @@
 // packets and stores them in a timeline.
 class BuildTimeline : public CollectPrimitive {
  public:
-  base::StatusOr<ContinueCollection> Collect(
-      const protos::pbzero::TracePacket::Decoder& packet,
-      Context* context) const override;
+  base::Status Collect(const protos::pbzero::TracePacket::Decoder& packet,
+                       Context* context) const override;
 };
 
 }  // namespace perfetto::trace_redaction
diff --git a/src/trace_redaction/build_timeline_unittest.cc b/src/trace_redaction/build_timeline_unittest.cc
index 39eb877..6041afd 100644
--- a/src/trace_redaction/build_timeline_unittest.cc
+++ b/src/trace_redaction/build_timeline_unittest.cc
@@ -90,8 +90,7 @@
 class BuildTimelineTest : public testing::Test,
                           public testing::WithParamInterface<TestParams> {
  protected:
-  base::StatusOr<CollectPrimitive::ContinueCollection> PushProcessTreePacket(
-      uint64_t timestamp) {
+  base::Status PushProcessTreePacket(uint64_t timestamp) {
     protos::gen::TracePacket packet;
     packet.set_trusted_uid(9999);
     packet.set_timestamp(timestamp);
@@ -124,8 +123,7 @@
                           &context_);
   }
 
-  base::StatusOr<CollectPrimitive::ContinueCollection>
-  PushSchedProcessFreePacket(uint64_t timestamp) {
+  base::Status PushSchedProcessFreePacket(uint64_t timestamp) {
     protos::gen::TracePacket packet;
 
     packet.set_trusted_uid(9999);
@@ -159,7 +157,7 @@
   auto params = GetParam();
 
   auto result = PushProcessTreePacket(kProcessTreeTimestamp);
-  ASSERT_OK(result) << result.status().message();
+  ASSERT_OK(result) << result.message();
 
   context_.timeline->Sort();
 
@@ -194,10 +192,10 @@
   auto params = GetParam();
 
   auto result = PushProcessTreePacket(kProcessTreeTimestamp);
-  ASSERT_OK(result) << result.status().message();
+  ASSERT_OK(result) << result.message();
 
   result = PushSchedProcessFreePacket(kThreadFreeTimestamp);
-  ASSERT_OK(result) << result.status().message();
+  ASSERT_OK(result) << result.message();
 
   context_.timeline->Sort();
 
diff --git a/src/trace_redaction/find_package_uid.cc b/src/trace_redaction/find_package_uid.cc
index 4ede90a..7b5f200 100644
--- a/src/trace_redaction/find_package_uid.cc
+++ b/src/trace_redaction/find_package_uid.cc
@@ -15,7 +15,6 @@
  */
 
 #include "src/trace_redaction/find_package_uid.h"
-#include "perfetto/ext/base/status_or.h"
 #include "perfetto/ext/base/string_view.h"
 #include "src/trace_redaction/trace_redaction_framework.h"
 
@@ -24,16 +23,29 @@
 
 namespace perfetto::trace_redaction {
 
-base::StatusOr<CollectPrimitive::ContinueCollection> FindPackageUid::Collect(
-    const protos::pbzero::TracePacket::Decoder& packet,
-    Context* context) const {
+base::Status FindPackageUid::Begin(Context* context) const {
   if (context->package_name.empty()) {
     return base::ErrStatus("FindPackageUid: missing package name.");
   }
 
+  if (context->package_uid.has_value()) {
+    return base::ErrStatus("FindPackageUid: package uid already found.");
+  }
+
+  return base::OkStatus();
+}
+
+base::Status FindPackageUid::Collect(
+    const protos::pbzero::TracePacket::Decoder& packet,
+    Context* context) const {
+  // If a package has been found in a pervious iteration, stop.
+  if (context->package_uid.has_value()) {
+    return base::OkStatus();
+  }
+
   // Skip package and move onto the next packet.
   if (!packet.has_packages_list()) {
-    return ContinueCollection::kNextPacket;
+    return base::OkStatus();
   }
 
   protos::pbzero::PackagesList::Decoder packages_list_decoder(
@@ -53,12 +65,20 @@
       if (base::StringView(context->package_name)
               .CaseInsensitiveEq(name.as_string())) {
         context->package_uid = NormalizeUid(uid.as_uint64());
-        return ContinueCollection::kRetire;
+        return base::OkStatus();
       }
     }
   }
 
-  return ContinueCollection::kNextPacket;
+  return base::OkStatus();
+}
+
+base::Status FindPackageUid::End(Context* context) const {
+  if (!context->package_uid.has_value()) {
+    return base::ErrStatus("FindPackageUid: did not find package uid.");
+  }
+
+  return base::OkStatus();
 }
 
 }  // namespace perfetto::trace_redaction
diff --git a/src/trace_redaction/find_package_uid.h b/src/trace_redaction/find_package_uid.h
index 3ec814b..70c7baf 100644
--- a/src/trace_redaction/find_package_uid.h
+++ b/src/trace_redaction/find_package_uid.h
@@ -17,7 +17,6 @@
 #ifndef SRC_TRACE_REDACTION_FIND_PACKAGE_UID_H_
 #define SRC_TRACE_REDACTION_FIND_PACKAGE_UID_H_
 
-#include "perfetto/ext/base/status_or.h"
 #include "src/trace_redaction/trace_redaction_framework.h"
 
 #include "protos/perfetto/trace/trace_packet.pbzero.h"
@@ -30,9 +29,12 @@
 // need to report the failure.
 class FindPackageUid final : public CollectPrimitive {
  public:
-  base::StatusOr<ContinueCollection> Collect(
-      const protos::pbzero::TracePacket::Decoder& packet,
-      Context* context) const override;
+  base::Status Begin(Context*) const override;
+
+  base::Status Collect(const protos::pbzero::TracePacket::Decoder& packet,
+                       Context* context) const override;
+
+  base::Status End(Context*) const override;
 };
 
 }  // namespace perfetto::trace_redaction
diff --git a/src/trace_redaction/find_package_uid_unittest.cc b/src/trace_redaction/find_package_uid_unittest.cc
index f5015d6..dd8e44f 100644
--- a/src/trace_redaction/find_package_uid_unittest.cc
+++ b/src/trace_redaction/find_package_uid_unittest.cc
@@ -186,8 +186,14 @@
 
   const auto decoder = protos::pbzero::TracePacket::Decoder(packet);
 
-  ASSERT_OK_AND_ASSIGN(auto status, find.Collect(decoder, &context));
-  ASSERT_EQ(status, CollectPrimitive::ContinueCollection::kRetire);
+  base::Status status = find.Begin(&context);
+  ASSERT_OK(status) << status.message();
+
+  status = find.Collect(decoder, &context);
+  ASSERT_OK(status) << status.message();
+
+  status = find.End(&context);
+  ASSERT_OK(status) << status.message();
 
   ASSERT_TRUE(context.package_uid.has_value());
   ASSERT_EQ(NormalizeUid(context.package_uid.value()), NormalizeUid(10205));
@@ -203,8 +209,15 @@
 
   const auto decoder = protos::pbzero::TracePacket::Decoder(packet);
 
-  ASSERT_OK_AND_ASSIGN(auto status, find.Collect(decoder, &context));
-  ASSERT_EQ(status, CollectPrimitive::ContinueCollection::kNextPacket);
+  base::Status status = find.Begin(&context);
+  ASSERT_OK(status) << status.message();
+
+  status = find.Collect(decoder, &context);
+  ASSERT_OK(status) << status.message();
+
+  // The should not have been found; End() should return an error.
+  status = find.End(&context);
+  ASSERT_FALSE(status.ok()) << status.message();
 
   ASSERT_FALSE(context.package_uid.has_value());
 }
@@ -219,8 +232,15 @@
 
   const auto decoder = protos::pbzero::TracePacket::Decoder(packet);
 
-  ASSERT_OK_AND_ASSIGN(auto status, find.Collect(decoder, &context));
-  ASSERT_EQ(status, CollectPrimitive::ContinueCollection::kNextPacket);
+  base::Status status = find.Begin(&context);
+  ASSERT_OK(status) << status.message();
+
+  status = find.Collect(decoder, &context);
+  ASSERT_OK(status) << status.message();
+
+  // The should not have been found; End() should return an error.
+  status = find.End(&context);
+  ASSERT_FALSE(status.ok()) << status.message();
 
   ASSERT_FALSE(context.package_uid.has_value());
 }
@@ -229,10 +249,28 @@
   const auto packet = CreatePackageListPacket();
 
   Context context;
+
   const FindPackageUid find;
 
   const auto decoder = protos::pbzero::TracePacket::Decoder(packet);
-  ASSERT_FALSE(find.Collect(decoder, &context).ok());
+
+  base::Status status = find.Begin(&context);
+  ASSERT_FALSE(status.ok()) << status.message();
+}
+
+TEST(FindPackageUidTest, FailsIfUidStartsInitialized) {
+  const auto packet = CreatePackageListPacket();
+
+  Context context;
+  context.package_name = "com.google.android.uvexposurereporter";
+  context.package_uid = 1000;
+
+  const FindPackageUid find;
+
+  const auto decoder = protos::pbzero::TracePacket::Decoder(packet);
+
+  base::Status status = find.Begin(&context);
+  ASSERT_FALSE(status.ok()) << status.message();
 }
 
 }  // namespace perfetto::trace_redaction
diff --git a/src/trace_redaction/trace_redaction_framework.cc b/src/trace_redaction/trace_redaction_framework.cc
index e3b729a..477f9ce 100644
--- a/src/trace_redaction/trace_redaction_framework.cc
+++ b/src/trace_redaction/trace_redaction_framework.cc
@@ -20,6 +20,14 @@
 
 CollectPrimitive::~CollectPrimitive() = default;
 
+base::Status CollectPrimitive::Begin(Context*) const {
+  return base::OkStatus();
+}
+
+base::Status CollectPrimitive::End(Context*) const {
+  return base::OkStatus();
+}
+
 BuildPrimitive::~BuildPrimitive() = default;
 
 TransformPrimitive::~TransformPrimitive() = default;
diff --git a/src/trace_redaction/trace_redaction_framework.h b/src/trace_redaction/trace_redaction_framework.h
index 6304fe7..c1c7e45 100644
--- a/src/trace_redaction/trace_redaction_framework.h
+++ b/src/trace_redaction/trace_redaction_framework.h
@@ -178,25 +178,29 @@
   std::unique_ptr<ProcessThreadTimeline> timeline;
 };
 
-// Responsible for extracting low-level data from the trace and storing it in
-// the context.
+// Extracts low-level data from the trace and writes it into the context. The
+// life cycle of a collect primitive is:
+//
+//  primitive.Begin(&context);
+//
+//  for (auto& packet : packets) {
+//    primitive.Collect(packet, &context);
+//  }
+//
+//  primitive.End(&context);
 class CollectPrimitive {
  public:
-  // When a collect primitive has collected all necessary information, it can
-  // stop processing packets by returning kRetire. If the primitives wants to
-  // continue processing packets, it will return kNextPacket.
-  //
-  // If a collector encounters an unrecoverable error, base::ErrStatus() is
-  // returned.
-  enum class ContinueCollection : bool { kRetire = false, kNextPacket = true };
-
   virtual ~CollectPrimitive();
 
-  // Processes a packet and writes low-level data to the context. Returns
-  // kContinue if the primitive wants more data (i.e. next packet).
-  virtual base::StatusOr<ContinueCollection> Collect(
-      const protos::pbzero::TracePacket::Decoder& packet,
-      Context* context) const = 0;
+  // Called once before the first call to Collect(...).
+  virtual base::Status Begin(Context*) const;
+
+  // Reads a trace packet and updates the context.
+  virtual base::Status Collect(const protos::pbzero::TracePacket::Decoder&,
+                               Context*) const = 0;
+
+  // Called once after the last call to Collect(...).
+  virtual base::Status End(Context*) const;
 };
 
 // Responsible for converting low-level data from the context and storing it in
diff --git a/src/trace_redaction/trace_redactor.cc b/src/trace_redaction/trace_redactor.cc
index 2f6d7f9..fa56af4 100644
--- a/src/trace_redaction/trace_redactor.cc
+++ b/src/trace_redaction/trace_redactor.cc
@@ -19,7 +19,6 @@
 #include <cstddef>
 #include <string>
 #include <string_view>
-#include <vector>
 
 #include "perfetto/base/status.h"
 #include "perfetto/ext/base/file_utils.h"
@@ -28,6 +27,7 @@
 #include "perfetto/protozero/scattered_heap_buffer.h"
 #include "perfetto/trace_processor/trace_blob.h"
 #include "perfetto/trace_processor/trace_blob_view.h"
+#include "src/trace_processor/util/status_macros.h"
 #include "src/trace_redaction/trace_redaction_framework.h"
 
 #include "protos/perfetto/trace/trace.pbzero.h"
@@ -55,9 +55,8 @@
   trace_processor::TraceBlobView whole_view(
       trace_processor::TraceBlob::FromMmap(std::move(mapped)));
 
-  if (auto status = Collect(context, whole_view); !status.ok()) {
-    return status;
-  }
+  // TODO(vaage): Update other status code to use RETURN_IF_ERROR.
+  RETURN_IF_ERROR(Collect(context, whole_view));
 
   if (auto status = Build(context); !status.ok()) {
     return status;
@@ -74,13 +73,8 @@
 base::Status TraceRedactor::Collect(
     Context* context,
     const trace_processor::TraceBlobView& view) const {
-  // Mask, marking which collectors should be ran. When a collector no longer
-  // needs to run, the value will be null.
-  std::vector<const CollectPrimitive*> collectors;
-  collectors.reserve(collectors_.size());
-
   for (const auto& collector : collectors_) {
-    collectors.push_back(collector.get());
+    RETURN_IF_ERROR(collector->Begin(context));
   }
 
   const Trace::Decoder trace_decoder(view.data(), view.length());
@@ -88,28 +82,13 @@
   for (auto packet_it = trace_decoder.packet(); packet_it; ++packet_it) {
     const TracePacket::Decoder packet(packet_it->as_bytes());
 
-    for (auto cit = collectors.begin(); cit != collectors.end();) {
-      auto status = (*cit)->Collect(packet, context);
-
-      if (!status.ok()) {
-        return status.status();
-      }
-
-      // If this collector has returned `kStop`, it means that it (and it alone)
-      // no longer needs to run. The driver (TraceRedactor) should not invoke it
-      // on any future packets.
-      if (status.value() == CollectPrimitive::ContinueCollection::kRetire) {
-        cit = collectors.erase(cit);
-      } else {
-        ++cit;
-      }
+    for (auto& collector : collectors_) {
+      RETURN_IF_ERROR(collector->Collect(packet, context));
     }
+  }
 
-    // If all the collectors have found what they were looking for, then there
-    // is no reason to continue through the trace.
-    if (collectors.empty()) {
-      break;
-    }
+  for (const auto& collector : collectors_) {
+    RETURN_IF_ERROR(collector->End(context));
   }
 
   return base::OkStatus();
