base: port UnixSocket to Windows
This change makes both AF_UNIX and TCP/IP sockets
work on Windows. AF_UNIX support is quite recent and works
only on Windows 10, see [1].
I haven't figured out yet if we should use AF_UNIX or just
a TCP socket on Windows for the IPC layer. In any case, it feels
like the code can support both with very little effort as
AF_UNIX doesn't add any extra complexity vs SOCK_STREAM.
[1] https://devblogs.microsoft.com/commandline/af_unix-comes-to-windows/
Bug: 174454879
Change-Id: I9ef645678e8da7c739ccb82e656e49f13441f2a4
diff --git a/src/base/unix_socket_unittest.cc b/src/base/unix_socket_unittest.cc
index 3039b0f..0398608 100644
--- a/src/base/unix_socket_unittest.cc
+++ b/src/base/unix_socket_unittest.cc
@@ -17,13 +17,16 @@
#include "perfetto/ext/base/unix_socket.h"
#include <signal.h>
-#include <sys/mman.h>
-#include <sys/socket.h>
#include <sys/types.h>
-#include <sys/un.h>
#include <list>
#include <thread>
+#if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
+#include <sys/mman.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#endif
+
#include "perfetto/base/build_config.h"
#include "perfetto/base/logging.h"
#include "perfetto/ext/base/file_utils.h"
@@ -44,7 +47,7 @@
using ::testing::InvokeWithoutArgs;
using ::testing::Mock;
-constexpr char kSocketName[] = TEST_SOCK_NAME("unix_socket_unittest");
+constexpr char kSocketName[] = TEST_SOCK_NAME("unix_socket_unittest.sock");
class MockEventListener : public UnixSocket::EventListener {
public:
@@ -122,7 +125,11 @@
auto cli_disconnected = task_runner_.CreateCheckpoint("cli_disconnected");
EXPECT_CALL(event_listener_, OnDisconnect(cli.get()))
.WillOnce(InvokeWithoutArgs(cli_disconnected));
- EXPECT_FALSE(cli->Send("whatever"));
+
+ // On Windows the first send immediately after the disconnection succeeds, the
+ // kernel will detect the disconnection only later.
+ cli->Send(".");
+ EXPECT_FALSE(cli->Send("should_fail_both_on_win_and_unix"));
task_runner_.RunUntilCheckpoint("cli_disconnected");
}
@@ -133,16 +140,19 @@
auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
SockFamily::kUnix, SockType::kStream);
- EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
+ EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
+ .WillOnce(InvokeWithoutArgs(cli_connected));
+ auto srv_conn_seen = task_runner_.CreateCheckpoint("srv_conn_seen");
auto srv_disconnected = task_runner_.CreateCheckpoint("srv_disconnected");
EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
- .WillOnce(Invoke([this, cli_connected, srv_disconnected](
+ .WillOnce(Invoke([this, srv_conn_seen, srv_disconnected](
UnixSocket*, UnixSocket* srv_conn) {
EXPECT_CALL(event_listener_, OnDisconnect(srv_conn))
.WillOnce(InvokeWithoutArgs(srv_disconnected));
- cli_connected();
+ srv_conn_seen();
}));
+ task_runner_.RunUntilCheckpoint("srv_conn_seen");
task_runner_.RunUntilCheckpoint("cli_connected");
auto srv_conn = event_listener_.GetIncomingConnection();
@@ -184,10 +194,361 @@
task_runner_.RunUntilCheckpoint("srv_disconnected");
}
-constexpr char cli_str[] = "cli>srv";
-constexpr char srv_str[] = "srv>cli";
+TEST_F(UnixSocketTest, ListenWithPassedSocketHandle) {
+ auto sock_raw =
+ UnixSocketRaw::CreateMayFail(SockFamily::kUnix, SockType::kStream);
+ ASSERT_TRUE(sock_raw.Bind(kSocketName));
+ auto fd = sock_raw.ReleaseFd();
+ auto srv = UnixSocket::Listen(std::move(fd), &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
+ ASSERT_TRUE(srv->is_listening());
+
+ auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
+ auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
+ EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
+ .WillOnce(InvokeWithoutArgs(cli_connected));
+ auto srv_connected = task_runner_.CreateCheckpoint("srv_connected");
+ auto srv_disconnected = task_runner_.CreateCheckpoint("srv_disconnected");
+ EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+ .WillOnce(Invoke([this, srv_connected, srv_disconnected](
+ UnixSocket*, UnixSocket* srv_conn) {
+ // An empty OnDataAvailable might be raised to signal the EOF state.
+ EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn))
+ .WillRepeatedly(
+ InvokeWithoutArgs([srv_conn] { srv_conn->ReceiveString(); }));
+ EXPECT_CALL(event_listener_, OnDisconnect(srv_conn))
+ .WillOnce(InvokeWithoutArgs(srv_disconnected));
+ srv_connected();
+ }));
+ task_runner_.RunUntilCheckpoint("srv_connected");
+ task_runner_.RunUntilCheckpoint("cli_connected");
+ ASSERT_TRUE(cli->is_connected());
+ cli.reset();
+ task_runner_.RunUntilCheckpoint("srv_disconnected");
+}
+
+// Mostly a stress test. Connects kNumClients clients to the same server and
+// tests that all can exchange data and can see the expected sequence of events.
+TEST_F(UnixSocketTest, SeveralClients) {
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
+ ASSERT_TRUE(srv->is_listening());
+ constexpr size_t kNumClients = 32;
+ std::unique_ptr<UnixSocket> cli[kNumClients];
+
+ EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+ .Times(kNumClients)
+ .WillRepeatedly(Invoke([this](UnixSocket*, UnixSocket* s) {
+ EXPECT_CALL(event_listener_, OnDataAvailable(s))
+ .WillOnce(Invoke([](UnixSocket* t) {
+ ASSERT_EQ("PING", t->ReceiveString());
+ ASSERT_TRUE(t->Send("PONG"));
+ }));
+ }));
+
+ for (size_t i = 0; i < kNumClients; i++) {
+ cli[i] = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
+ EXPECT_CALL(event_listener_, OnConnect(cli[i].get(), true))
+ .WillOnce(Invoke([](UnixSocket* s, bool success) {
+ ASSERT_TRUE(success);
+ ASSERT_TRUE(s->Send("PING"));
+ }));
+
+ auto checkpoint = task_runner_.CreateCheckpoint(std::to_string(i));
+ EXPECT_CALL(event_listener_, OnDataAvailable(cli[i].get()))
+ .WillOnce(Invoke([checkpoint](UnixSocket* s) {
+ ASSERT_EQ("PONG", s->ReceiveString());
+ checkpoint();
+ }));
+ }
+
+ for (size_t i = 0; i < kNumClients; i++) {
+ task_runner_.RunUntilCheckpoint(std::to_string(i));
+ ASSERT_TRUE(Mock::VerifyAndClearExpectations(cli[i].get()));
+ }
+}
+
+// Tests the SockPeerCredMode::kIgnore logic.
+TEST_F(UnixSocketTest, IgnorePeerCredentials) {
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
+ ASSERT_TRUE(srv->is_listening());
+ auto cli1_connected = task_runner_.CreateCheckpoint("cli1_connected");
+ auto cli1 = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream,
+ SockPeerCredMode::kIgnore);
+ EXPECT_CALL(event_listener_, OnConnect(cli1.get(), true))
+ .WillOnce(InvokeWithoutArgs(cli1_connected));
+
+ auto cli2_connected = task_runner_.CreateCheckpoint("cli2_connected");
+ auto cli2 = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream,
+ SockPeerCredMode::kReadOnConnect);
+ EXPECT_CALL(event_listener_, OnConnect(cli2.get(), true))
+ .WillOnce(InvokeWithoutArgs(cli2_connected));
+
+ task_runner_.RunUntilCheckpoint("cli1_connected");
+ task_runner_.RunUntilCheckpoint("cli2_connected");
+
+#if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
+ ASSERT_EQ(cli1->peer_uid_posix(/*skip_check_for_testing=*/true), kInvalidUid);
+ ASSERT_EQ(cli2->peer_uid_posix(), geteuid());
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
+ PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+ ASSERT_EQ(cli1->peer_pid_linux(/*skip_check_for_testing=*/true), kInvalidPid);
+ ASSERT_EQ(cli2->peer_pid_linux(), getpid());
+#endif
+#endif
+}
+
+TEST_F(UnixSocketTest, BlockingSend) {
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
+ ASSERT_TRUE(srv->is_listening());
+
+ auto all_frames_done = task_runner_.CreateCheckpoint("all_frames_done");
+ size_t total_bytes_received = 0;
+ static constexpr size_t kTotalBytes = 1024 * 1024 * 4;
+ EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+ .WillOnce(Invoke([this, &total_bytes_received, all_frames_done](
+ UnixSocket*, UnixSocket* srv_conn) {
+ EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn))
+ .WillRepeatedly(
+ Invoke([&total_bytes_received, all_frames_done](UnixSocket* s) {
+ char buf[1024];
+ size_t res = s->Receive(buf, sizeof(buf));
+ total_bytes_received += res;
+ if (total_bytes_received == kTotalBytes)
+ all_frames_done();
+ }));
+ }));
+
+ // Override default timeout as this test can take time on the emulator.
+ static constexpr int kTimeoutMs = 60000 * 3;
+
+ // Perform the blocking send from another thread.
+ std::thread tx_thread([] {
+ TestTaskRunner tx_task_runner;
+ MockEventListener tx_events;
+ auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner,
+ SockFamily::kUnix, SockType::kStream);
+
+ auto cli_connected = tx_task_runner.CreateCheckpoint("cli_connected");
+ EXPECT_CALL(tx_events, OnConnect(cli.get(), true))
+ .WillOnce(InvokeWithoutArgs(cli_connected));
+ tx_task_runner.RunUntilCheckpoint("cli_connected");
+
+ auto all_sent = tx_task_runner.CreateCheckpoint("all_sent");
+ char buf[1024 * 32] = {};
+ tx_task_runner.PostTask([&cli, &buf, all_sent] {
+ for (size_t i = 0; i < kTotalBytes / sizeof(buf); i++)
+ cli->Send(buf, sizeof(buf));
+ all_sent();
+ });
+ tx_task_runner.RunUntilCheckpoint("all_sent", kTimeoutMs);
+ });
+
+ task_runner_.RunUntilCheckpoint("all_frames_done", kTimeoutMs);
+ tx_thread.join();
+}
+
+// Regression test for b/76155349 . If the receiver end disconnects while the
+// sender is in the middle of a large send(), the socket should gracefully give
+// up (i.e. Shutdown()) but not crash.
+TEST_F(UnixSocketTest, ReceiverDisconnectsDuringSend) {
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
+ ASSERT_TRUE(srv->is_listening());
+ static constexpr int kTimeoutMs = 30000;
+
+ auto receive_done = task_runner_.CreateCheckpoint("receive_done");
+ EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+ .WillOnce(Invoke([this, receive_done](UnixSocket*, UnixSocket* srv_conn) {
+ EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn))
+ .WillOnce(Invoke([receive_done](UnixSocket* s) {
+ char buf[1024];
+ size_t res = s->Receive(buf, sizeof(buf));
+ ASSERT_EQ(1024u, res);
+ s->Shutdown(false /*notify*/);
+ receive_done();
+ }));
+ }));
+
+ // Perform the blocking send from another thread.
+ std::thread tx_thread([] {
+ TestTaskRunner tx_task_runner;
+ MockEventListener tx_events;
+ auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner,
+ SockFamily::kUnix, SockType::kStream);
+
+ auto cli_connected = tx_task_runner.CreateCheckpoint("cli_connected");
+ EXPECT_CALL(tx_events, OnConnect(cli.get(), true))
+ .WillOnce(InvokeWithoutArgs(cli_connected));
+ tx_task_runner.RunUntilCheckpoint("cli_connected");
+
+ auto send_done = tx_task_runner.CreateCheckpoint("send_done");
+ static constexpr size_t kBufSize = 32 * 1024 * 1024;
+ std::unique_ptr<char[]> buf(new char[kBufSize]());
+ tx_task_runner.PostTask([&cli, &buf, send_done] {
+ cli->Send(buf.get(), kBufSize);
+ send_done();
+ });
+
+ tx_task_runner.RunUntilCheckpoint("send_done", kTimeoutMs);
+ });
+ task_runner_.RunUntilCheckpoint("receive_done", kTimeoutMs);
+ tx_thread.join();
+}
+
+TEST_F(UnixSocketTest, ReleaseSocket) {
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
+ ASSERT_TRUE(srv->is_listening());
+ auto srv_connected = task_runner_.CreateCheckpoint("srv_connected");
+ UnixSocket* peer = nullptr;
+ EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+ .WillOnce(
+ Invoke([srv_connected, &peer](UnixSocket*, UnixSocket* new_conn) {
+ peer = new_conn;
+ srv_connected();
+ }));
+
+ auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
+ auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
+ EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
+ .WillOnce(InvokeWithoutArgs(cli_connected));
+ task_runner_.RunUntilCheckpoint("srv_connected");
+ task_runner_.RunUntilCheckpoint("cli_connected");
+ srv->Shutdown(true);
+
+ cli->Send("test");
+
+ ASSERT_NE(peer, nullptr);
+ auto raw_sock = peer->ReleaseSocket();
+
+ EXPECT_CALL(event_listener_, OnDataAvailable(_)).Times(0);
+ task_runner_.RunUntilIdle();
+
+ char buf[sizeof("test")];
+ ASSERT_TRUE(raw_sock);
+ ASSERT_EQ(raw_sock.Receive(buf, sizeof(buf)),
+ static_cast<ssize_t>(sizeof(buf)));
+ ASSERT_STREQ(buf, "test");
+}
+
+TEST_F(UnixSocketTest, TcpStream) {
+ char host_and_port[32];
+ int attempt = 0;
+ std::unique_ptr<UnixSocket> srv;
+
+ // Try listening on a random port. Some ports might be taken by other system
+ // services. Do a bunch of attempts on different ports before giving up.
+ do {
+ sprintf(host_and_port, "127.0.0.1:%d", 10000 + (rand() % 10000));
+ srv = UnixSocket::Listen(host_and_port, &event_listener_, &task_runner_,
+ SockFamily::kInet, SockType::kStream);
+ } while ((!srv || !srv->is_listening()) && attempt++ < 10);
+ ASSERT_TRUE(srv->is_listening());
+
+ constexpr size_t kNumClients = 3;
+ std::unique_ptr<UnixSocket> cli[kNumClients];
+ EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+ .Times(kNumClients)
+ .WillRepeatedly(Invoke([&](UnixSocket*, UnixSocket* s) {
+ // OnDisconnect() might spuriously happen depending on the dtor order.
+ EXPECT_CALL(event_listener_, OnDisconnect(s)).Times(AtLeast(0));
+ EXPECT_CALL(event_listener_, OnDataAvailable(s))
+ .WillRepeatedly(Invoke([](UnixSocket* cli_sock) {
+ cli_sock->ReceiveString(); // Read connection EOF;
+ }));
+ ASSERT_TRUE(s->Send("welcome"));
+ }));
+
+ for (size_t i = 0; i < kNumClients; i++) {
+ cli[i] = UnixSocket::Connect(host_and_port, &event_listener_, &task_runner_,
+ SockFamily::kInet, SockType::kStream);
+ auto checkpoint = task_runner_.CreateCheckpoint(std::to_string(i));
+ EXPECT_CALL(event_listener_, OnDisconnect(cli[i].get())).Times(AtLeast(0));
+ EXPECT_CALL(event_listener_, OnConnect(cli[i].get(), true));
+ EXPECT_CALL(event_listener_, OnDataAvailable(cli[i].get()))
+ .WillRepeatedly(Invoke([checkpoint](UnixSocket* s) {
+ auto str = s->ReceiveString();
+ if (str == "")
+ return; // Connection EOF.
+ ASSERT_EQ("welcome", str);
+ checkpoint();
+ }));
+ }
+
+ for (size_t i = 0; i < kNumClients; i++) {
+ task_runner_.RunUntilCheckpoint(std::to_string(i));
+ ASSERT_TRUE(Mock::VerifyAndClearExpectations(cli[i].get()));
+ }
+}
+
+// ---------------------------------
+// Posix-only tests below this point
+// ---------------------------------
+
+#if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
+// Checks that the peer_uid() is retained after the client disconnects. The IPC
+// layer needs to rely on this to validate messages received immediately before
+// a client disconnects.
+TEST_F(UnixSocketTest, PeerCredentialsRetainedAfterDisconnect) {
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
+ ASSERT_TRUE(srv->is_listening());
+ UnixSocket* srv_client_conn = nullptr;
+ auto srv_connected = task_runner_.CreateCheckpoint("srv_connected");
+ EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+ .WillOnce(Invoke([&srv_client_conn, srv_connected](UnixSocket*,
+ UnixSocket* srv_conn) {
+ srv_client_conn = srv_conn;
+ EXPECT_EQ(geteuid(), static_cast<uint32_t>(srv_conn->peer_uid_posix()));
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
+ PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+ EXPECT_EQ(getpid(), static_cast<pid_t>(srv_conn->peer_pid_linux()));
+#endif
+ srv_connected();
+ }));
+ auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
+ auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
+ EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
+ .WillOnce(InvokeWithoutArgs(cli_connected));
+
+ task_runner_.RunUntilCheckpoint("cli_connected");
+ task_runner_.RunUntilCheckpoint("srv_connected");
+ ASSERT_NE(nullptr, srv_client_conn);
+ ASSERT_TRUE(srv_client_conn->is_connected());
+
+ auto cli_disconnected = task_runner_.CreateCheckpoint("cli_disconnected");
+ EXPECT_CALL(event_listener_, OnDisconnect(srv_client_conn))
+ .WillOnce(InvokeWithoutArgs(cli_disconnected));
+
+ // TODO(primiano): when a peer disconnects, the other end receives a
+ // spurious OnDataAvailable() that needs to be acked with a Receive() to read
+ // the EOF. See b/69536434.
+ EXPECT_CALL(event_listener_, OnDataAvailable(srv_client_conn))
+ .WillOnce(Invoke([](UnixSocket* sock) { sock->ReceiveString(); }));
+
+ cli.reset();
+ task_runner_.RunUntilCheckpoint("cli_disconnected");
+ ASSERT_FALSE(srv_client_conn->is_connected());
+ EXPECT_EQ(geteuid(),
+ static_cast<uint32_t>(srv_client_conn->peer_uid_posix()));
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
+ PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+ EXPECT_EQ(getpid(), static_cast<pid_t>(srv_client_conn->peer_pid_linux()));
+#endif
+}
TEST_F(UnixSocketTest, ClientAndServerExchangeFDs) {
+ static constexpr char cli_str[] = "cli>srv";
+ static constexpr char srv_str[] = "srv>cli";
auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
SockFamily::kUnix, SockType::kStream);
ASSERT_TRUE(srv->is_listening());
@@ -271,79 +632,6 @@
task_runner_.RunUntilCheckpoint("cli_disconnected");
}
-TEST_F(UnixSocketTest, ListenWithPassedFileDescriptor) {
- auto sock_raw =
- UnixSocketRaw::CreateMayFail(SockFamily::kUnix, SockType::kStream);
- ASSERT_TRUE(sock_raw.Bind(kSocketName));
- auto fd = sock_raw.ReleaseFd();
- auto srv = UnixSocket::Listen(std::move(fd), &event_listener_, &task_runner_,
- SockFamily::kUnix, SockType::kStream);
- ASSERT_TRUE(srv->is_listening());
-
- auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
- SockFamily::kUnix, SockType::kStream);
- EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
- auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
- auto srv_disconnected = task_runner_.CreateCheckpoint("srv_disconnected");
- EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
- .WillOnce(Invoke([this, cli_connected, srv_disconnected](
- UnixSocket*, UnixSocket* srv_conn) {
- // Read the EOF state.
- EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn))
- .WillOnce(
- InvokeWithoutArgs([srv_conn] { srv_conn->ReceiveString(); }));
- EXPECT_CALL(event_listener_, OnDisconnect(srv_conn))
- .WillOnce(InvokeWithoutArgs(srv_disconnected));
- cli_connected();
- }));
- task_runner_.RunUntilCheckpoint("cli_connected");
- ASSERT_TRUE(cli->is_connected());
- cli.reset();
- task_runner_.RunUntilCheckpoint("srv_disconnected");
-}
-
-// Mostly a stress tests. Connects kNumClients clients to the same server and
-// tests that all can exchange data and can see the expected sequence of events.
-TEST_F(UnixSocketTest, SeveralClients) {
- auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
- SockFamily::kUnix, SockType::kStream);
- ASSERT_TRUE(srv->is_listening());
- constexpr size_t kNumClients = 32;
- std::unique_ptr<UnixSocket> cli[kNumClients];
-
- EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
- .Times(kNumClients)
- .WillRepeatedly(Invoke([this](UnixSocket*, UnixSocket* s) {
- EXPECT_CALL(event_listener_, OnDataAvailable(s))
- .WillOnce(Invoke([](UnixSocket* t) {
- ASSERT_EQ("PING", t->ReceiveString());
- ASSERT_TRUE(t->Send("PONG"));
- }));
- }));
-
- for (size_t i = 0; i < kNumClients; i++) {
- cli[i] = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
- SockFamily::kUnix, SockType::kStream);
- EXPECT_CALL(event_listener_, OnConnect(cli[i].get(), true))
- .WillOnce(Invoke([](UnixSocket* s, bool success) {
- ASSERT_TRUE(success);
- ASSERT_TRUE(s->Send("PING"));
- }));
-
- auto checkpoint = task_runner_.CreateCheckpoint(std::to_string(i));
- EXPECT_CALL(event_listener_, OnDataAvailable(cli[i].get()))
- .WillOnce(Invoke([checkpoint](UnixSocket* s) {
- ASSERT_EQ("PONG", s->ReceiveString());
- checkpoint();
- }));
- }
-
- for (size_t i = 0; i < kNumClients; i++) {
- task_runner_.RunUntilCheckpoint(std::to_string(i));
- ASSERT_TRUE(Mock::VerifyAndClearExpectations(cli[i].get()));
- }
-}
-
// Creates two processes. The server process creates a file and passes it over
// the socket to the client. Both processes mmap the file in shared mode and
// check that they see the same contents.
@@ -372,7 +660,8 @@
EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
.WillOnce(Invoke(
[this, tmp_fd, checkpoint, mem](UnixSocket*, UnixSocket* new_conn) {
- ASSERT_EQ(geteuid(), static_cast<uint32_t>(new_conn->peer_uid()));
+ ASSERT_EQ(geteuid(),
+ static_cast<uint32_t>(new_conn->peer_uid_posix()));
ASSERT_TRUE(new_conn->Send("txfd", 5, tmp_fd));
// Wait for the client to change this again.
EXPECT_CALL(event_listener_, OnDataAvailable(new_conn))
@@ -420,189 +709,6 @@
}
}
-// Checks that the peer_uid() is retained after the client disconnects. The IPC
-// layer needs to rely on this to validate messages received immediately before
-// a client disconnects.
-TEST_F(UnixSocketTest, PeerCredentialsRetainedAfterDisconnect) {
- auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
- SockFamily::kUnix, SockType::kStream);
- ASSERT_TRUE(srv->is_listening());
- UnixSocket* srv_client_conn = nullptr;
- auto srv_connected = task_runner_.CreateCheckpoint("srv_connected");
- EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
- .WillOnce(Invoke(
- [&srv_client_conn, srv_connected](UnixSocket*, UnixSocket* srv_conn) {
- srv_client_conn = srv_conn;
- EXPECT_EQ(geteuid(), static_cast<uint32_t>(srv_conn->peer_uid()));
-#if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
- PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
- EXPECT_EQ(getpid(), static_cast<pid_t>(srv_conn->peer_pid()));
-#endif
- srv_connected();
- }));
- auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
- auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
- SockFamily::kUnix, SockType::kStream);
- EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
- .WillOnce(InvokeWithoutArgs(cli_connected));
-
- task_runner_.RunUntilCheckpoint("cli_connected");
- task_runner_.RunUntilCheckpoint("srv_connected");
- ASSERT_NE(nullptr, srv_client_conn);
- ASSERT_TRUE(srv_client_conn->is_connected());
-
- auto cli_disconnected = task_runner_.CreateCheckpoint("cli_disconnected");
- EXPECT_CALL(event_listener_, OnDisconnect(srv_client_conn))
- .WillOnce(InvokeWithoutArgs(cli_disconnected));
-
- // TODO(primiano): when the a peer disconnects, the other end receives a
- // spurious OnDataAvailable() that needs to be acked with a Receive() to read
- // the EOF. See b/69536434.
- EXPECT_CALL(event_listener_, OnDataAvailable(srv_client_conn))
- .WillOnce(Invoke([](UnixSocket* sock) { sock->ReceiveString(); }));
-
- cli.reset();
- task_runner_.RunUntilCheckpoint("cli_disconnected");
- ASSERT_FALSE(srv_client_conn->is_connected());
- EXPECT_EQ(geteuid(), static_cast<uint32_t>(srv_client_conn->peer_uid()));
-#if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
- PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
- EXPECT_EQ(getpid(), static_cast<pid_t>(srv_client_conn->peer_pid()));
-#endif
-}
-
-// Tests the SockPeerCredMode::kIgnore logic.
-TEST_F(UnixSocketTest, IgnorePeerCredentials) {
- auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
- SockFamily::kUnix, SockType::kStream);
- ASSERT_TRUE(srv->is_listening());
- auto cli1_connected = task_runner_.CreateCheckpoint("cli1_connected");
- auto cli1 = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
- SockFamily::kUnix, SockType::kStream,
- SockPeerCredMode::kIgnore);
- EXPECT_CALL(event_listener_, OnConnect(cli1.get(), true))
- .WillOnce(InvokeWithoutArgs(cli1_connected));
-
- auto cli2_connected = task_runner_.CreateCheckpoint("cli2_connected");
- auto cli2 = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
- SockFamily::kUnix, SockType::kStream,
- SockPeerCredMode::kReadOnConnect);
- EXPECT_CALL(event_listener_, OnConnect(cli2.get(), true))
- .WillOnce(InvokeWithoutArgs(cli2_connected));
-
- task_runner_.RunUntilCheckpoint("cli1_connected");
- task_runner_.RunUntilCheckpoint("cli2_connected");
-
- ASSERT_EQ(cli1->peer_uid(/*skip_check_for_testing=*/true), kInvalidUid);
- ASSERT_EQ(cli2->peer_uid(), geteuid());
-#if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
- PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
- ASSERT_EQ(cli1->peer_pid(/*skip_check_for_testing=*/true), kInvalidPid);
- ASSERT_EQ(cli2->peer_pid(), getpid());
-#endif
-}
-
-TEST_F(UnixSocketTest, BlockingSend) {
- auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
- SockFamily::kUnix, SockType::kStream);
- ASSERT_TRUE(srv->is_listening());
-
- auto all_frames_done = task_runner_.CreateCheckpoint("all_frames_done");
- size_t total_bytes_received = 0;
- constexpr size_t kTotalBytes = 1024 * 1024 * 4;
- EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
- .WillOnce(Invoke([this, &total_bytes_received, all_frames_done](
- UnixSocket*, UnixSocket* srv_conn) {
- EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn))
- .WillRepeatedly(
- Invoke([&total_bytes_received, all_frames_done](UnixSocket* s) {
- char buf[1024];
- size_t res = s->Receive(buf, sizeof(buf));
- total_bytes_received += res;
- if (total_bytes_received == kTotalBytes)
- all_frames_done();
- }));
- }));
-
- // Override default timeout as this test can take time on the emulator.
- const int kTimeoutMs = 60000 * 3;
-
- // Perform the blocking send form another thread.
- std::thread tx_thread([] {
- TestTaskRunner tx_task_runner;
- MockEventListener tx_events;
- auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner,
- SockFamily::kUnix, SockType::kStream);
-
- auto cli_connected = tx_task_runner.CreateCheckpoint("cli_connected");
- EXPECT_CALL(tx_events, OnConnect(cli.get(), true))
- .WillOnce(InvokeWithoutArgs(cli_connected));
- tx_task_runner.RunUntilCheckpoint("cli_connected");
-
- auto all_sent = tx_task_runner.CreateCheckpoint("all_sent");
- char buf[1024 * 32] = {};
- tx_task_runner.PostTask([&cli, &buf, all_sent] {
- for (size_t i = 0; i < kTotalBytes / sizeof(buf); i++)
- cli->Send(buf, sizeof(buf));
- all_sent();
- });
- tx_task_runner.RunUntilCheckpoint("all_sent", kTimeoutMs);
- });
-
- task_runner_.RunUntilCheckpoint("all_frames_done", kTimeoutMs);
- tx_thread.join();
-}
-
-// Regression test for b/76155349 . If the receiver end disconnects while the
-// sender is in the middle of a large send(), the socket should gracefully give
-// up (i.e. Shutdown()) but not crash.
-TEST_F(UnixSocketTest, ReceiverDisconnectsDuringSend) {
- auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
- SockFamily::kUnix, SockType::kStream);
- ASSERT_TRUE(srv->is_listening());
- const int kTimeoutMs = 30000;
-
- auto receive_done = task_runner_.CreateCheckpoint("receive_done");
- EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
- .WillOnce(Invoke([this, receive_done](UnixSocket*, UnixSocket* srv_conn) {
- EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn))
- .WillOnce(Invoke([receive_done](UnixSocket* s) {
- char buf[1024];
- size_t res = s->Receive(buf, sizeof(buf));
- ASSERT_EQ(1024u, res);
- s->Shutdown(false /*notify*/);
- receive_done();
- }));
- }));
-
- // Perform the blocking send form another thread.
- std::thread tx_thread([] {
- TestTaskRunner tx_task_runner;
- MockEventListener tx_events;
- auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner,
- SockFamily::kUnix, SockType::kStream);
-
- auto cli_connected = tx_task_runner.CreateCheckpoint("cli_connected");
- EXPECT_CALL(tx_events, OnConnect(cli.get(), true))
- .WillOnce(InvokeWithoutArgs(cli_connected));
- tx_task_runner.RunUntilCheckpoint("cli_connected");
-
- auto send_done = tx_task_runner.CreateCheckpoint("send_done");
- // We need a
- static constexpr size_t kBufSize = 32 * 1024 * 1024;
- std::unique_ptr<char[]> buf(new char[kBufSize]());
- tx_task_runner.PostTask([&cli, &buf, send_done] {
- bool send_res = cli->Send(buf.get(), kBufSize);
- ASSERT_FALSE(send_res);
- send_done();
- });
-
- tx_task_runner.RunUntilCheckpoint("send_done", kTimeoutMs);
- });
- task_runner_.RunUntilCheckpoint("receive_done", kTimeoutMs);
- tx_thread.join();
-}
-
TEST_F(UnixSocketTest, ShiftMsgHdrSendPartialFirst) {
// Send a part of the first iov, then send the rest.
struct iovec iov[2] = {};
@@ -618,7 +724,7 @@
hdr.msg_iov = iov;
hdr.msg_iovlen = base::ArraySize(iov);
- UnixSocketRaw::ShiftMsgHdr(1, &hdr);
+ UnixSocketRaw::ShiftMsgHdrPosix(1, &hdr);
EXPECT_NE(hdr.msg_iov, nullptr);
EXPECT_EQ(hdr.msg_iov[0].iov_base, &hello[1]);
EXPECT_EQ(hdr.msg_iov[1].iov_base, &world[0]);
@@ -626,13 +732,13 @@
EXPECT_STREQ(reinterpret_cast<char*>(hdr.msg_iov[0].iov_base), "ello");
EXPECT_EQ(iov[0].iov_len, base::ArraySize(hello) - 1);
- UnixSocketRaw::ShiftMsgHdr(base::ArraySize(hello) - 1, &hdr);
+ UnixSocketRaw::ShiftMsgHdrPosix(base::ArraySize(hello) - 1, &hdr);
EXPECT_EQ(hdr.msg_iov, &iov[1]);
EXPECT_EQ(static_cast<int>(hdr.msg_iovlen), 1);
EXPECT_STREQ(reinterpret_cast<char*>(hdr.msg_iov[0].iov_base), world);
EXPECT_EQ(hdr.msg_iov[0].iov_len, base::ArraySize(world));
- UnixSocketRaw::ShiftMsgHdr(base::ArraySize(world), &hdr);
+ UnixSocketRaw::ShiftMsgHdrPosix(base::ArraySize(world), &hdr);
EXPECT_EQ(hdr.msg_iov, nullptr);
EXPECT_EQ(static_cast<int>(hdr.msg_iovlen), 0);
}
@@ -652,13 +758,13 @@
hdr.msg_iov = iov;
hdr.msg_iovlen = base::ArraySize(iov);
- UnixSocketRaw::ShiftMsgHdr(base::ArraySize(hello) + 1, &hdr);
+ UnixSocketRaw::ShiftMsgHdrPosix(base::ArraySize(hello) + 1, &hdr);
EXPECT_NE(hdr.msg_iov, nullptr);
EXPECT_EQ(static_cast<int>(hdr.msg_iovlen), 1);
EXPECT_STREQ(reinterpret_cast<char*>(hdr.msg_iov[0].iov_base), "orld");
EXPECT_EQ(hdr.msg_iov[0].iov_len, base::ArraySize(world) - 1);
- UnixSocketRaw::ShiftMsgHdr(base::ArraySize(world) - 1, &hdr);
+ UnixSocketRaw::ShiftMsgHdrPosix(base::ArraySize(world) - 1, &hdr);
EXPECT_EQ(hdr.msg_iov, nullptr);
EXPECT_EQ(static_cast<int>(hdr.msg_iovlen), 0);
}
@@ -678,14 +784,13 @@
hdr.msg_iov = iov;
hdr.msg_iovlen = base::ArraySize(iov);
- UnixSocketRaw::ShiftMsgHdr(base::ArraySize(world) + base::ArraySize(hello),
- &hdr);
+ UnixSocketRaw::ShiftMsgHdrPosix(
+ base::ArraySize(world) + base::ArraySize(hello), &hdr);
EXPECT_EQ(hdr.msg_iov, nullptr);
EXPECT_EQ(static_cast<int>(hdr.msg_iovlen), 0);
}
-void Handler(int) {}
-
+// For use in PartialSendMsgAll template argument. Cannot be a lambda.
int RollbackSigaction(const struct sigaction* act) {
return sigaction(SIGWINCH, act, nullptr);
}
@@ -694,7 +799,7 @@
UnixSocketRaw send_sock;
UnixSocketRaw recv_sock;
std::tie(send_sock, recv_sock) =
- UnixSocketRaw::CreatePair(SockFamily::kUnix, SockType::kStream);
+ UnixSocketRaw::CreatePairPosix(SockFamily::kUnix, SockType::kStream);
ASSERT_TRUE(send_sock);
ASSERT_TRUE(recv_sock);
@@ -723,7 +828,7 @@
// this action will affect the whole process.
struct sigaction oldact;
struct sigaction newact = {};
- newact.sa_handler = Handler;
+ newact.sa_handler = [](int) {};
ASSERT_EQ(sigaction(SIGWINCH, &newact, &oldact), 0);
base::ScopedResource<const struct sigaction*, RollbackSigaction, nullptr>
rollback(&oldact);
@@ -734,7 +839,7 @@
ASSERT_EQ(rd, 1);
// We are now sure the other thread is in sendmsg, interrupt send.
ASSERT_EQ(pthread_kill(blocked_thread, SIGWINCH), 0);
- // Drain the socket to allow SendMsgAll to succeed.
+ // Drain the socket to allow SendMsgAllPosix to succeed.
size_t offset = 1;
while (offset < sizeof(recv_buf)) {
rd = PERFETTO_EINTR(
@@ -745,7 +850,7 @@
});
// Test sending the send_buf in several chunks as an iov to exercise the
- // more complicated code-paths of SendMsgAll.
+ // more complicated code-paths of SendMsgAllPosix.
struct msghdr hdr = {};
struct iovec iov[4];
static_assert(sizeof(send_buf) % base::ArraySize(iov) == 0,
@@ -758,110 +863,15 @@
hdr.msg_iov = iov;
hdr.msg_iovlen = base::ArraySize(iov);
- ASSERT_EQ(send_sock.SendMsgAll(&hdr), static_cast<ssize_t>(sizeof(send_buf)));
+ ASSERT_EQ(send_sock.SendMsgAllPosix(&hdr),
+ static_cast<ssize_t>(sizeof(send_buf)));
send_sock.Shutdown();
th.join();
// Make sure the re-entry logic was actually triggered.
ASSERT_EQ(hdr.msg_iov, nullptr);
ASSERT_EQ(memcmp(send_buf, recv_buf, sizeof(send_buf)), 0);
}
-
-TEST_F(UnixSocketTest, ReleaseSocket) {
- auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
- SockFamily::kUnix, SockType::kStream);
- ASSERT_TRUE(srv->is_listening());
- auto connected = task_runner_.CreateCheckpoint("connected");
- UnixSocket* peer = nullptr;
- EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
- .WillOnce(Invoke([connected, &peer](UnixSocket*, UnixSocket* new_conn) {
- peer = new_conn;
- connected();
- }));
-
- auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
- SockFamily::kUnix, SockType::kStream);
- EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
- task_runner_.RunUntilCheckpoint("connected");
- srv->Shutdown(true);
-
- cli->Send("test");
-
- ASSERT_NE(peer, nullptr);
- auto raw_sock = peer->ReleaseSocket();
-
- EXPECT_CALL(event_listener_, OnDataAvailable(_)).Times(0);
- task_runner_.RunUntilIdle();
-
- char buf[sizeof("test")];
- ASSERT_TRUE(raw_sock);
- ASSERT_EQ(raw_sock.Receive(buf, sizeof(buf)),
- static_cast<ssize_t>(sizeof(buf)));
- ASSERT_STREQ(buf, "test");
-}
-
-TEST_F(UnixSocketTest, TcpStream) {
- char host_and_port[32];
- int attempt = 0;
- std::unique_ptr<UnixSocket> srv;
-
- // Try listening on a random port. Some ports might be taken by other syste
- // services. Do a bunch of attempts on different ports before giving up.
- do {
- sprintf(host_and_port, "127.0.0.1:%d", 10000 + (rand() % 10000));
- srv = UnixSocket::Listen(host_and_port, &event_listener_, &task_runner_,
- SockFamily::kInet, SockType::kStream);
- } while ((!srv || !srv->is_listening()) && attempt++ < 10);
- ASSERT_TRUE(srv->is_listening());
-
- constexpr size_t kNumClients = 3;
- std::unique_ptr<UnixSocket> cli[kNumClients];
- EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
- .Times(kNumClients)
- .WillRepeatedly(Invoke([&](UnixSocket*, UnixSocket* s) {
- // OnDisconnect() might spuriously happen depending on the dtor order.
- EXPECT_CALL(event_listener_, OnDisconnect(s)).Times(AtLeast(0));
- EXPECT_CALL(event_listener_, OnDataAvailable(s))
- .WillRepeatedly(Invoke([](UnixSocket* cli_sock) {
- cli_sock->ReceiveString(); // Read connection EOF;
- }));
- ASSERT_TRUE(s->Send("welcome"));
- }));
-
- for (size_t i = 0; i < kNumClients; i++) {
- cli[i] = UnixSocket::Connect(host_and_port, &event_listener_, &task_runner_,
- SockFamily::kInet, SockType::kStream);
- // PERFETTO_ILOG("cli : %p", reinterpret_cast<void*>(cli[i].get()));
- auto checkpoint = task_runner_.CreateCheckpoint(std::to_string(i));
- EXPECT_CALL(event_listener_, OnDisconnect(cli[i].get())).Times(AtLeast(0));
- EXPECT_CALL(event_listener_, OnConnect(cli[i].get(), true));
- EXPECT_CALL(event_listener_, OnDataAvailable(cli[i].get()))
- .WillRepeatedly(Invoke([checkpoint](UnixSocket* s) {
- auto str = s->ReceiveString();
- if (str == "")
- return; // Connection EOF.
- ASSERT_EQ("welcome", str);
- checkpoint();
- }));
- }
-
- for (size_t i = 0; i < kNumClients; i++) {
- task_runner_.RunUntilCheckpoint(std::to_string(i));
- ASSERT_TRUE(Mock::VerifyAndClearExpectations(cli[i].get()));
- }
-}
-
-// TODO(primiano): add a test to check that in the case of a peer sending a fd
-// and the other end just doing a recv (without taking it), the fd is closed and
-// not left around.
-
-// TODO(primiano); add a test to check that a socket can be reused after
-// Shutdown(),
-
-// TODO(primiano): add a test to check that OnDisconnect() is called in all
-// possible cases.
-
-// TODO(primiano): add tests that destroy the socket in all possible stages and
-// verify that no spurious EventListener callback is received.
+#endif // !OS_WIN
} // namespace
} // namespace base