Prevent traced from blocking on a unresponsive producer.
Previously traced would block indefinitely if a producer
stop draining the IPC channel. This would result in eventually
the sendmsg call blocking and android's Watchdog killing traced
for being unresponsive, if some producer in a long trace was
unresponsive.
We add an integration test which exposed this issue to prevent
regressions, and set a timeout on the producer sockets and disconnect
the producer on timeouts.
We don't set the timeout on the consumer socket on the assumption
that the consumer is trusted and to minimize changes to our
behaviour as S is finalizing.
Bug: 169051440
Change-Id: I63988f79b63977ae27eedd126a2bbdcaaa520e78
diff --git a/Android.bp b/Android.bp
index ba600ad..faca278 100644
--- a/Android.bp
+++ b/Android.bp
@@ -1003,6 +1003,7 @@
":perfetto_src_ipc_client",
":perfetto_src_ipc_common",
":perfetto_src_ipc_host",
+ ":perfetto_src_ipc_perfetto_ipc",
":perfetto_src_kallsyms_kallsyms",
":perfetto_src_perfetto_cmd_perfetto_atoms",
":perfetto_src_protozero_protozero",
@@ -1251,6 +1252,7 @@
":perfetto_src_ipc_client",
":perfetto_src_ipc_common",
":perfetto_src_ipc_host",
+ ":perfetto_src_ipc_perfetto_ipc",
":perfetto_src_kallsyms_kallsyms",
":perfetto_src_perfetto_cmd_perfetto_atoms",
":perfetto_src_protozero_protozero",
@@ -1634,6 +1636,7 @@
":perfetto_src_ipc_client",
":perfetto_src_ipc_common",
":perfetto_src_ipc_host",
+ ":perfetto_src_ipc_perfetto_ipc",
":perfetto_src_kallsyms_kallsyms",
":perfetto_src_perfetto_cmd_perfetto_atoms",
":perfetto_src_profiling_common_callstack_trie",
@@ -6310,6 +6313,11 @@
],
}
+// GN: //src/ipc:perfetto_ipc
+filegroup {
+ name: "perfetto_src_ipc_perfetto_ipc",
+}
+
// GN: //src/ipc:test_messages_cpp
genrule {
name: "perfetto_src_ipc_test_messages_cpp_gen",
diff --git a/CHANGELOG b/CHANGELOG
index 5e22ca6..67a2e8f 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,7 @@
Unreleased:
Tracing service and probes:
- *
+ * Fixed crash of tracing service if a client is unresponsive on the IPC
+ channel. Clients are disconnected if they don't respond to IPCs for 10s.
Trace Processor:
*
UI:
diff --git a/include/perfetto/ext/base/unix_socket.h b/include/perfetto/ext/base/unix_socket.h
index 5731237..beb02e6 100644
--- a/include/perfetto/ext/base/unix_socket.h
+++ b/include/perfetto/ext/base/unix_socket.h
@@ -236,6 +236,12 @@
// be reused with Listen() or Connect().
void Shutdown(bool notify);
+ void SetTxTimeout(uint32_t timeout_ms) {
+ PERFETTO_CHECK(sock_raw_.SetTxTimeout(timeout_ms));
+ }
+ void SetRxTimeout(uint32_t timeout_ms) {
+ PERFETTO_CHECK(sock_raw_.SetRxTimeout(timeout_ms));
+ }
// Returns true is the message was queued, false if there was no space in the
// output buffer, in which case the client should retry or give up.
// If any other error happens the socket will be shutdown and
diff --git a/src/ipc/client_impl.h b/src/ipc/client_impl.h
index 3947823..fafd756 100644
--- a/src/ipc/client_impl.h
+++ b/src/ipc/client_impl.h
@@ -67,6 +67,8 @@
base::WeakPtr<ServiceProxy>,
int fd = -1);
+ base::UnixSocket* GetUnixSocketForTesting() { return sock_.get(); }
+
private:
struct QueuedRequest {
QueuedRequest();
diff --git a/src/ipc/host_impl.cc b/src/ipc/host_impl.cc
index 2806a53..c6cf8c1 100644
--- a/src/ipc/host_impl.cc
+++ b/src/ipc/host_impl.cc
@@ -92,6 +92,8 @@
clients_by_socket_[new_conn.get()] = client.get();
client->id = client_id;
client->sock = std::move(new_conn);
+ // Watchdog is 30 seconds, so set the socket timeout to 10 seconds.
+ client->sock->SetTxTimeout(10000);
clients_[client_id] = std::move(client);
}
@@ -232,11 +234,14 @@
void HostImpl::SendFrame(ClientConnection* client, const Frame& frame, int fd) {
std::string buf = BufferedFrameDeserializer::Serialize(frame);
- // TODO(primiano): this should do non-blocking I/O. But then what if the
- // socket buffer is full? We might want to either drop the request or throttle
- // the send and PostTask the reply later? Right now we are making Send()
- // blocking as a workaround. Propagate bakpressure to the caller instead.
+ // When a new Client connects in OnNewClientConnection we set a timeout on
+ // Send (see call to SetTxTimeout).
+ //
+ // The old behaviour was to do a blocking I/O call, which caused crashes from
+ // misbehaving producers (see b/169051440).
bool res = client->sock->Send(buf.data(), buf.size(), fd);
+ // If we timeout |res| will be false, but the UnixSocket will have called
+ // UnixSocket::ShutDown() and thus |is_connected()| is false.
PERFETTO_CHECK(res || !client->sock->is_connected());
}
diff --git a/src/tracing/ipc/producer/producer_ipc_client_impl.h b/src/tracing/ipc/producer/producer_ipc_client_impl.h
index 53af401..78b4165 100644
--- a/src/tracing/ipc/producer/producer_ipc_client_impl.h
+++ b/src/tracing/ipc/producer/producer_ipc_client_impl.h
@@ -91,6 +91,8 @@
void OnConnect() override;
void OnDisconnect() override;
+ ipc::Client* GetClientForTesting() { return ipc_channel_.get(); }
+
private:
// Invoked soon after having established the connection with the service.
void OnConnectionInitialized(bool connection_succeeded,
diff --git a/test/BUILD.gn b/test/BUILD.gn
index 772ec42..1b81bad 100644
--- a/test/BUILD.gn
+++ b/test/BUILD.gn
@@ -111,6 +111,7 @@
"../protos/perfetto/config:cpp",
"../protos/perfetto/trace:zero",
"../src/base:test_support",
+ "../src/ipc:perfetto_ipc",
"../src/tracing/ipc:common",
]
sources = [
diff --git a/test/end_to_end_integrationtest.cc b/test/end_to_end_integrationtest.cc
index 1dd1eca..6b99fb7 100644
--- a/test/end_to_end_integrationtest.cc
+++ b/test/end_to_end_integrationtest.cc
@@ -584,6 +584,75 @@
}
}
+// This is a regression test see b/169051440 for context.
+//
+// In this test we ensure that traced will not crash if a Producer stops
+// responding or draining the socket (i.e. after we fill up the IPC buffer
+// traced doesn't block on trying to write to the IPC buffer and watchdog
+// doesn't kill it).
+TEST_F(PerfettoTest, UnresponsiveProducer) {
+ base::TestTaskRunner task_runner;
+
+ TestHelper helper(&task_runner);
+ helper.StartServiceIfRequired();
+ auto* producer = helper.ConnectFakeProducer();
+ helper.ConnectConsumer();
+ helper.WaitForConsumerConnect();
+
+ TraceConfig trace_config;
+ trace_config.add_buffers()->set_size_kb(4096 * 10);
+ trace_config.set_duration_ms(100);
+ trace_config.set_flush_timeout_ms(1);
+ trace_config.set_data_source_stop_timeout_ms(1);
+
+ auto* ds_config = trace_config.add_data_sources()->mutable_config();
+ ds_config->set_name("android.perfetto.FakeProducer");
+
+ static constexpr size_t kNumPackets = 1;
+ static constexpr uint32_t kRandomSeed = 42;
+ static constexpr uint32_t kMsgSize = 1024 * 1024 - 42;
+ ds_config->mutable_for_testing()->set_seed(kRandomSeed);
+ ds_config->mutable_for_testing()->set_message_count(kNumPackets);
+ ds_config->mutable_for_testing()->set_message_size(kMsgSize);
+ ds_config->mutable_for_testing()->set_send_batch_on_register(true);
+
+ // This string is just used to make the StartDataSource IPC larger.
+ ds_config->set_legacy_config(std::string(8192, '.'));
+ ds_config->set_target_buffer(0);
+
+ // Run one legit trace, this ensures that the producer above is
+ // valid and correct and mirrors real life producers.
+ helper.StartTracing(trace_config);
+ helper.WaitForProducerEnabled();
+ helper.WaitForTracingDisabled();
+
+ helper.ReadData();
+ helper.WaitForReadData(/* read_count */ 0, /* timeout_ms */ 10000);
+
+ const auto& packets = helper.trace();
+ ASSERT_EQ(packets.size(), 1u);
+ ASSERT_TRUE(packets[0].has_for_testing());
+ ASSERT_FALSE(packets[0].for_testing().str().empty());
+ helper.FreeBuffers();
+
+ // Switch the producer to ignoring the IPC socket. On a pixel 4 it took 13
+ // traces to fill up the IPC buffer and cause traced to block (and eventually
+ // watchdog to kill it).
+ helper.producer_thread()->get()->RemoveFileDescriptorWatch(
+ producer->unix_socket_fd());
+
+ trace_config.set_duration_ms(1);
+ for (uint32_t i = 0u; i < 15u; i++) {
+ helper.StartTracing(trace_config, base::ScopedFile());
+ helper.WaitForTracingDisabled(/* timeout_ms = */ 20000);
+ helper.FreeBuffers();
+ }
+ // We need to readd the FileDescriptor (otherwise when the UnixSocket attempts
+ // to remove it a the FakeProducer is destroyed will hit a CHECK failure.
+ helper.producer_thread()->get()->AddFileDescriptorWatch(
+ producer->unix_socket_fd(), []() {});
+}
+
TEST_F(PerfettoTest, DetachAndReattach) {
base::TestTaskRunner task_runner;
diff --git a/test/fake_producer.cc b/test/fake_producer.cc
index fae6f91..d35a674 100644
--- a/test/fake_producer.cc
+++ b/test/fake_producer.cc
@@ -26,6 +26,8 @@
#include "perfetto/ext/tracing/core/trace_packet.h"
#include "perfetto/ext/tracing/core/trace_writer.h"
#include "perfetto/tracing/core/data_source_config.h"
+#include "src/ipc/client_impl.h"
+#include "src/tracing/ipc/producer/producer_ipc_client_impl.h"
#include "protos/perfetto/config/test_config.gen.h"
#include "protos/perfetto/trace/test_event.pbzero.h"
@@ -159,6 +161,15 @@
endpoint_->NotifyFlushComplete(flush_request_id);
}
+int FakeProducer::unix_socket_fd() {
+ // Since FakeProducer is only used in tests we can include and assume the
+ // implementation.
+ auto* producer = static_cast<ProducerIPCClientImpl*>(endpoint_.get());
+ auto* ipc_client =
+ static_cast<ipc::ClientImpl*>(producer->GetClientForTesting());
+ return ipc_client->GetUnixSocketForTesting()->fd();
+}
+
void FakeProducer::SetupFromConfig(const protos::gen::TestConfig& config) {
rnd_engine_ = std::minstd_rand0(config.seed());
message_count_ = config.message_count();
diff --git a/test/fake_producer.h b/test/fake_producer.h
index c38e40f..c248051 100644
--- a/test/fake_producer.h
+++ b/test/fake_producer.h
@@ -81,6 +81,9 @@
void ClearIncrementalState(const DataSourceInstanceID* /*data_source_ids*/,
size_t /*num_data_sources*/) override {}
+ // For testing, access to the fd used to communicate with the TracingService.
+ int unix_socket_fd();
+
private:
void SetupFromConfig(const protos::gen::TestConfig& config);
void EmitEventBatchOnTaskRunner(std::function<void()> callback);
diff --git a/test/test_helper.cc b/test/test_helper.cc
index 2ffdd8c..b3000b5 100644
--- a/test/test_helper.cc
+++ b/test/test_helper.cc
@@ -56,6 +56,7 @@
void TestHelper::OnTracingDisabled(const std::string& /*error*/) {
std::move(on_stop_tracing_callback_)();
+ on_stop_tracing_callback_ = nullptr;
}
void TestHelper::OnTraceData(std::vector<TracePacket> packets, bool has_more) {
@@ -141,8 +142,10 @@
void TestHelper::StartTracing(const TraceConfig& config,
base::ScopedFile file) {
+ PERFETTO_CHECK(!on_stop_tracing_callback_);
trace_.clear();
- on_stop_tracing_callback_ = CreateCheckpoint("stop.tracing");
+ on_stop_tracing_callback_ =
+ CreateCheckpoint("stop.tracing" + std::to_string(++trace_count_));
endpoint_->EnableTracing(config, std::move(file));
}
@@ -164,6 +167,10 @@
endpoint_->ReadBuffers();
}
+void TestHelper::FreeBuffers() {
+ endpoint_->FreeBuffers();
+}
+
void TestHelper::WaitForConsumerConnect() {
RunUntilCheckpoint("consumer.connected." + std::to_string(cur_consumer_num_));
}
@@ -177,7 +184,8 @@
}
void TestHelper::WaitForTracingDisabled(uint32_t timeout_ms) {
- RunUntilCheckpoint("stop.tracing", timeout_ms);
+ RunUntilCheckpoint(std::string("stop.tracing") + std::to_string(trace_count_),
+ timeout_ms);
}
void TestHelper::WaitForReadData(uint32_t read_count, uint32_t timeout_ms) {
diff --git a/test/test_helper.h b/test/test_helper.h
index 5989a67..e052ec5 100644
--- a/test/test_helper.h
+++ b/test/test_helper.h
@@ -196,6 +196,7 @@
void DisableTracing();
void FlushAndWait(uint32_t timeout_ms);
void ReadData(uint32_t read_count = 0);
+ void FreeBuffers();
void DetachConsumer(const std::string& key);
bool AttachConsumer(const std::string& key);
void CreateProducerProvidedSmb();
@@ -240,6 +241,7 @@
uint64_t instance_num_;
base::TestTaskRunner* task_runner_ = nullptr;
int cur_consumer_num_ = 0;
+ uint64_t trace_count_ = 0;
std::function<void()> on_connect_callback_;
std::function<void()> on_packets_finished_callback_;