Merge "Do not hardcode tracing timeout."
diff --git a/test/end_to_end_shared_memory_fuzzer.cc b/test/end_to_end_shared_memory_fuzzer.cc
index a75edd7..22902d3 100644
--- a/test/end_to_end_shared_memory_fuzzer.cc
+++ b/test/end_to_end_shared_memory_fuzzer.cc
@@ -46,8 +46,11 @@
 // consumer.
 class FakeProducer : public Producer {
  public:
-  FakeProducer(std::string name, const uint8_t* data, size_t size)
-      : name_(std::move(name)), data_(data), size_(size) {}
+  FakeProducer(std::string name,
+               const uint8_t* data,
+               size_t size,
+               FakeConsumer* consumer)
+      : name_(std::move(name)), data_(data), size_(size), consumer_(consumer) {}
 
   void Connect(const char* socket_name, base::TaskRunner* task_runner) {
     endpoint_ = ProducerIPCClient::Connect(socket_name, this, task_runner);
@@ -75,6 +78,7 @@
     auto end_packet = trace_writer->NewTracePacket();
     end_packet->set_for_testing()->set_str("end");
     end_packet->Finalize();
+    consumer_->BusyWaitReadBuffers();
   }
 
   void TearDownDataSourceInstance(DataSourceInstanceID) override {}
@@ -85,17 +89,18 @@
   const size_t size_;
   DataSourceID id_ = 0;
   std::unique_ptr<Service::ProducerEndpoint> endpoint_;
+  FakeConsumer* consumer_;
 };
 
 class FakeProducerDelegate : public ThreadDelegate {
  public:
-  FakeProducerDelegate(const uint8_t* data, size_t size)
-      : data_(data), size_(size) {}
+  FakeProducerDelegate(const uint8_t* data, size_t size, FakeConsumer* consumer)
+      : data_(data), size_(size), consumer_(consumer) {}
   ~FakeProducerDelegate() override = default;
 
   void Initialize(base::TaskRunner* task_runner) override {
-    producer_.reset(
-        new FakeProducer("android.perfetto.FakeProducer", data_, size_));
+    producer_.reset(new FakeProducer("android.perfetto.FakeProducer", data_,
+                                     size_, consumer_));
     producer_->Connect(PRODUCER_SOCKET, task_runner);
   }
 
@@ -103,6 +108,7 @@
   std::unique_ptr<FakeProducer> producer_;
   const uint8_t* data_;
   const size_t size_;
+  FakeConsumer* consumer_;
 };
 
 class ServiceDelegate : public ThreadDelegate {
@@ -128,14 +134,10 @@
   TaskRunnerThread service_thread;
   service_thread.Start(std::unique_ptr<ServiceDelegate>(new ServiceDelegate()));
 
-  TaskRunnerThread producer_thread;
-  producer_thread.Start(std::unique_ptr<FakeProducerDelegate>(
-      new FakeProducerDelegate(data, size)));
-
   // Setup the TraceConfig for the consumer.
   TraceConfig trace_config;
   trace_config.add_buffers()->set_size_kb(8);
-  trace_config.set_duration_ms(10);
+  trace_config.set_duration_ms(1000);
 
   // Create the buffer for ftrace.
   auto* ds_config = trace_config.add_data_sources()->mutable_config();
@@ -154,6 +156,11 @@
   };
   FakeConsumer consumer(trace_config, std::move(function), &task_runner);
   consumer.Connect(CONSUMER_SOCKET);
+
+  TaskRunnerThread producer_thread;
+  producer_thread.Start(std::unique_ptr<FakeProducerDelegate>(
+      new FakeProducerDelegate(data, size, &consumer)));
+
   task_runner.RunUntilCheckpoint("no.more.packets");
   return 0;
 }
diff --git a/test/fake_consumer.cc b/test/fake_consumer.cc
index 54bf7db..f8f63ab 100644
--- a/test/fake_consumer.cc
+++ b/test/fake_consumer.cc
@@ -59,4 +59,14 @@
   packet_callback_(std::move(data), has_more);
 }
 
+void FakeConsumer::BusyWaitReadBuffers() {  // Polls ReadBuffers() repeatedly.
+  task_runner_->PostDelayedTask(
+      std::bind([this]() {
+        endpoint_->ReadBuffers();
+        task_runner_->PostDelayedTask(  // Re-arm; no stop condition here.
+            std::bind([this]() { BusyWaitReadBuffers(); }), 1);
+      }),
+      1);  // Delay argument; presumably milliseconds -- TODO confirm.
+}
+
 }  // namespace perfetto
diff --git a/test/fake_consumer.h b/test/fake_consumer.h
index f50b9da..eec29c6 100644
--- a/test/fake_consumer.h
+++ b/test/fake_consumer.h
@@ -39,6 +39,7 @@
 
   void Connect(const char* socket_name);
   void ReadTraceData();
+  void BusyWaitReadBuffers();
 
   // Consumer implementation.
   void OnConnect() override;