base: port UnixSocket to Windows

This change makes both AF_UNIX and TCP/IP sockets
work on Windows. AF_UNIX support is quite recent and works
only on Windows 10, see [1].
I haven't figured out yet if we should use AF_UNIX or just
a TCP socket on Windows for the IPC layer. In any case, it feels
like the code can support both with very little effort as
AF_UNIX doesn't add any extra complexity vs SOCK_STREAM.

[1] https://devblogs.microsoft.com/commandline/af_unix-comes-to-windows/

Bug: 174454879
Change-Id: I9ef645678e8da7c739ccb82e656e49f13441f2a4
diff --git a/src/base/unix_socket_unittest.cc b/src/base/unix_socket_unittest.cc
index 3039b0f..0398608 100644
--- a/src/base/unix_socket_unittest.cc
+++ b/src/base/unix_socket_unittest.cc
@@ -17,13 +17,16 @@
 #include "perfetto/ext/base/unix_socket.h"
 
 #include <signal.h>
-#include <sys/mman.h>
-#include <sys/socket.h>
 #include <sys/types.h>
-#include <sys/un.h>
 #include <list>
 #include <thread>
 
+#if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
+#include <sys/mman.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#endif
+
 #include "perfetto/base/build_config.h"
 #include "perfetto/base/logging.h"
 #include "perfetto/ext/base/file_utils.h"
@@ -44,7 +47,7 @@
 using ::testing::InvokeWithoutArgs;
 using ::testing::Mock;
 
-constexpr char kSocketName[] = TEST_SOCK_NAME("unix_socket_unittest");
+constexpr char kSocketName[] = TEST_SOCK_NAME("unix_socket_unittest.sock");
 
 class MockEventListener : public UnixSocket::EventListener {
  public:
@@ -122,7 +125,11 @@
   auto cli_disconnected = task_runner_.CreateCheckpoint("cli_disconnected");
   EXPECT_CALL(event_listener_, OnDisconnect(cli.get()))
       .WillOnce(InvokeWithoutArgs(cli_disconnected));
-  EXPECT_FALSE(cli->Send("whatever"));
+
+  // On Windows the first send immediately after the disconnection succeeds, the
+  // kernel will detect the disconnection only later.
+  cli->Send(".");
+  EXPECT_FALSE(cli->Send("should_fail_both_on_win_and_unix"));
   task_runner_.RunUntilCheckpoint("cli_disconnected");
 }
 
@@ -133,16 +140,19 @@
 
   auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
                                  SockFamily::kUnix, SockType::kStream);
-  EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
   auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
+  EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
+      .WillOnce(InvokeWithoutArgs(cli_connected));
+  auto srv_conn_seen = task_runner_.CreateCheckpoint("srv_conn_seen");
   auto srv_disconnected = task_runner_.CreateCheckpoint("srv_disconnected");
   EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
-      .WillOnce(Invoke([this, cli_connected, srv_disconnected](
+      .WillOnce(Invoke([this, srv_conn_seen, srv_disconnected](
                            UnixSocket*, UnixSocket* srv_conn) {
         EXPECT_CALL(event_listener_, OnDisconnect(srv_conn))
             .WillOnce(InvokeWithoutArgs(srv_disconnected));
-        cli_connected();
+        srv_conn_seen();
       }));
+  task_runner_.RunUntilCheckpoint("srv_conn_seen");
   task_runner_.RunUntilCheckpoint("cli_connected");
 
   auto srv_conn = event_listener_.GetIncomingConnection();
@@ -184,10 +194,361 @@
   task_runner_.RunUntilCheckpoint("srv_disconnected");
 }
 
-constexpr char cli_str[] = "cli>srv";
-constexpr char srv_str[] = "srv>cli";
+TEST_F(UnixSocketTest, ListenWithPassedSocketHandle) {
+  auto sock_raw =
+      UnixSocketRaw::CreateMayFail(SockFamily::kUnix, SockType::kStream);
+  ASSERT_TRUE(sock_raw.Bind(kSocketName));
+  auto fd = sock_raw.ReleaseFd();
+  auto srv = UnixSocket::Listen(std::move(fd), &event_listener_, &task_runner_,
+                                SockFamily::kUnix, SockType::kStream);
+  ASSERT_TRUE(srv->is_listening());
+
+  auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
+  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+                                 SockFamily::kUnix, SockType::kStream);
+  EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
+      .WillOnce(InvokeWithoutArgs(cli_connected));
+  auto srv_connected = task_runner_.CreateCheckpoint("srv_connected");
+  auto srv_disconnected = task_runner_.CreateCheckpoint("srv_disconnected");
+  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+      .WillOnce(Invoke([this, srv_connected, srv_disconnected](
+                           UnixSocket*, UnixSocket* srv_conn) {
+        // An empty OnDataAvailable might be raised to signal the EOF state.
+        EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn))
+            .WillRepeatedly(
+                InvokeWithoutArgs([srv_conn] { srv_conn->ReceiveString(); }));
+        EXPECT_CALL(event_listener_, OnDisconnect(srv_conn))
+            .WillOnce(InvokeWithoutArgs(srv_disconnected));
+        srv_connected();
+      }));
+  task_runner_.RunUntilCheckpoint("srv_connected");
+  task_runner_.RunUntilCheckpoint("cli_connected");
+  ASSERT_TRUE(cli->is_connected());
+  cli.reset();
+  task_runner_.RunUntilCheckpoint("srv_disconnected");
+}
+
+// Mostly a stress test. Connects kNumClients clients to the same server and
+// tests that all can exchange data and can see the expected sequence of events.
+TEST_F(UnixSocketTest, SeveralClients) {
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+                                SockFamily::kUnix, SockType::kStream);
+  ASSERT_TRUE(srv->is_listening());
+  constexpr size_t kNumClients = 32;
+  std::unique_ptr<UnixSocket> cli[kNumClients];
+
+  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+      .Times(kNumClients)
+      .WillRepeatedly(Invoke([this](UnixSocket*, UnixSocket* s) {
+        EXPECT_CALL(event_listener_, OnDataAvailable(s))
+            .WillOnce(Invoke([](UnixSocket* t) {
+              ASSERT_EQ("PING", t->ReceiveString());
+              ASSERT_TRUE(t->Send("PONG"));
+            }));
+      }));
+
+  for (size_t i = 0; i < kNumClients; i++) {
+    cli[i] = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+                                 SockFamily::kUnix, SockType::kStream);
+    EXPECT_CALL(event_listener_, OnConnect(cli[i].get(), true))
+        .WillOnce(Invoke([](UnixSocket* s, bool success) {
+          ASSERT_TRUE(success);
+          ASSERT_TRUE(s->Send("PING"));
+        }));
+
+    auto checkpoint = task_runner_.CreateCheckpoint(std::to_string(i));
+    EXPECT_CALL(event_listener_, OnDataAvailable(cli[i].get()))
+        .WillOnce(Invoke([checkpoint](UnixSocket* s) {
+          ASSERT_EQ("PONG", s->ReceiveString());
+          checkpoint();
+        }));
+  }
+
+  for (size_t i = 0; i < kNumClients; i++) {
+    task_runner_.RunUntilCheckpoint(std::to_string(i));
+    ASSERT_TRUE(Mock::VerifyAndClearExpectations(cli[i].get()));
+  }
+}
+
+// Tests the SockPeerCredMode::kIgnore logic.
+TEST_F(UnixSocketTest, IgnorePeerCredentials) {
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+                                SockFamily::kUnix, SockType::kStream);
+  ASSERT_TRUE(srv->is_listening());
+  auto cli1_connected = task_runner_.CreateCheckpoint("cli1_connected");
+  auto cli1 = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+                                  SockFamily::kUnix, SockType::kStream,
+                                  SockPeerCredMode::kIgnore);
+  EXPECT_CALL(event_listener_, OnConnect(cli1.get(), true))
+      .WillOnce(InvokeWithoutArgs(cli1_connected));
+
+  auto cli2_connected = task_runner_.CreateCheckpoint("cli2_connected");
+  auto cli2 = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+                                  SockFamily::kUnix, SockType::kStream,
+                                  SockPeerCredMode::kReadOnConnect);
+  EXPECT_CALL(event_listener_, OnConnect(cli2.get(), true))
+      .WillOnce(InvokeWithoutArgs(cli2_connected));
+
+  task_runner_.RunUntilCheckpoint("cli1_connected");
+  task_runner_.RunUntilCheckpoint("cli2_connected");
+
+#if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
+  ASSERT_EQ(cli1->peer_uid_posix(/*skip_check_for_testing=*/true), kInvalidUid);
+  ASSERT_EQ(cli2->peer_uid_posix(), geteuid());
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
+    PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+  ASSERT_EQ(cli1->peer_pid_linux(/*skip_check_for_testing=*/true), kInvalidPid);
+  ASSERT_EQ(cli2->peer_pid_linux(), getpid());
+#endif
+#endif
+}
+
+TEST_F(UnixSocketTest, BlockingSend) {
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+                                SockFamily::kUnix, SockType::kStream);
+  ASSERT_TRUE(srv->is_listening());
+
+  auto all_frames_done = task_runner_.CreateCheckpoint("all_frames_done");
+  size_t total_bytes_received = 0;
+  static constexpr size_t kTotalBytes = 1024 * 1024 * 4;
+  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+      .WillOnce(Invoke([this, &total_bytes_received, all_frames_done](
+                           UnixSocket*, UnixSocket* srv_conn) {
+        EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn))
+            .WillRepeatedly(
+                Invoke([&total_bytes_received, all_frames_done](UnixSocket* s) {
+                  char buf[1024];
+                  size_t res = s->Receive(buf, sizeof(buf));
+                  total_bytes_received += res;
+                  if (total_bytes_received == kTotalBytes)
+                    all_frames_done();
+                }));
+      }));
+
+  // Override default timeout as this test can take time on the emulator.
+  static constexpr int kTimeoutMs = 60000 * 3;
+
+  // Perform the blocking send from another thread.
+  std::thread tx_thread([] {
+    TestTaskRunner tx_task_runner;
+    MockEventListener tx_events;
+    auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner,
+                                   SockFamily::kUnix, SockType::kStream);
+
+    auto cli_connected = tx_task_runner.CreateCheckpoint("cli_connected");
+    EXPECT_CALL(tx_events, OnConnect(cli.get(), true))
+        .WillOnce(InvokeWithoutArgs(cli_connected));
+    tx_task_runner.RunUntilCheckpoint("cli_connected");
+
+    auto all_sent = tx_task_runner.CreateCheckpoint("all_sent");
+    char buf[1024 * 32] = {};
+    tx_task_runner.PostTask([&cli, &buf, all_sent] {
+      for (size_t i = 0; i < kTotalBytes / sizeof(buf); i++)
+        cli->Send(buf, sizeof(buf));
+      all_sent();
+    });
+    tx_task_runner.RunUntilCheckpoint("all_sent", kTimeoutMs);
+  });
+
+  task_runner_.RunUntilCheckpoint("all_frames_done", kTimeoutMs);
+  tx_thread.join();
+}
+
+// Regression test for b/76155349 . If the receiver end disconnects while the
+// sender is in the middle of a large send(), the socket should gracefully give
+// up (i.e. Shutdown()) but not crash.
+TEST_F(UnixSocketTest, ReceiverDisconnectsDuringSend) {
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+                                SockFamily::kUnix, SockType::kStream);
+  ASSERT_TRUE(srv->is_listening());
+  static constexpr int kTimeoutMs = 30000;
+
+  auto receive_done = task_runner_.CreateCheckpoint("receive_done");
+  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+      .WillOnce(Invoke([this, receive_done](UnixSocket*, UnixSocket* srv_conn) {
+        EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn))
+            .WillOnce(Invoke([receive_done](UnixSocket* s) {
+              char buf[1024];
+              size_t res = s->Receive(buf, sizeof(buf));
+              ASSERT_EQ(1024u, res);
+              s->Shutdown(false /*notify*/);
+              receive_done();
+            }));
+      }));
+
+  // Perform the blocking send from another thread.
+  std::thread tx_thread([] {
+    TestTaskRunner tx_task_runner;
+    MockEventListener tx_events;
+    auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner,
+                                   SockFamily::kUnix, SockType::kStream);
+
+    auto cli_connected = tx_task_runner.CreateCheckpoint("cli_connected");
+    EXPECT_CALL(tx_events, OnConnect(cli.get(), true))
+        .WillOnce(InvokeWithoutArgs(cli_connected));
+    tx_task_runner.RunUntilCheckpoint("cli_connected");
+
+    auto send_done = tx_task_runner.CreateCheckpoint("send_done");
+    static constexpr size_t kBufSize = 32 * 1024 * 1024;
+    std::unique_ptr<char[]> buf(new char[kBufSize]());
+    tx_task_runner.PostTask([&cli, &buf, send_done] {
+      cli->Send(buf.get(), kBufSize);
+      send_done();
+    });
+
+    tx_task_runner.RunUntilCheckpoint("send_done", kTimeoutMs);
+  });
+  task_runner_.RunUntilCheckpoint("receive_done", kTimeoutMs);
+  tx_thread.join();
+}
+
+TEST_F(UnixSocketTest, ReleaseSocket) {
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+                                SockFamily::kUnix, SockType::kStream);
+  ASSERT_TRUE(srv->is_listening());
+  auto srv_connected = task_runner_.CreateCheckpoint("srv_connected");
+  UnixSocket* peer = nullptr;
+  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+      .WillOnce(
+          Invoke([srv_connected, &peer](UnixSocket*, UnixSocket* new_conn) {
+            peer = new_conn;
+            srv_connected();
+          }));
+
+  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+                                 SockFamily::kUnix, SockType::kStream);
+  auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
+  EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
+      .WillOnce(InvokeWithoutArgs(cli_connected));
+  task_runner_.RunUntilCheckpoint("srv_connected");
+  task_runner_.RunUntilCheckpoint("cli_connected");
+  srv->Shutdown(true);
+
+  cli->Send("test");
+
+  ASSERT_NE(peer, nullptr);
+  auto raw_sock = peer->ReleaseSocket();
+
+  EXPECT_CALL(event_listener_, OnDataAvailable(_)).Times(0);
+  task_runner_.RunUntilIdle();
+
+  char buf[sizeof("test")];
+  ASSERT_TRUE(raw_sock);
+  ASSERT_EQ(raw_sock.Receive(buf, sizeof(buf)),
+            static_cast<ssize_t>(sizeof(buf)));
+  ASSERT_STREQ(buf, "test");
+}
+
+TEST_F(UnixSocketTest, TcpStream) {
+  char host_and_port[32];
+  int attempt = 0;
+  std::unique_ptr<UnixSocket> srv;
+
+  // Try listening on a random port. Some ports might be taken by other system
+  // services. Do a bunch of attempts on different ports before giving up.
+  do {
+    sprintf(host_and_port, "127.0.0.1:%d", 10000 + (rand() % 10000));
+    srv = UnixSocket::Listen(host_and_port, &event_listener_, &task_runner_,
+                             SockFamily::kInet, SockType::kStream);
+  } while ((!srv || !srv->is_listening()) && attempt++ < 10);
+  ASSERT_TRUE(srv && srv->is_listening());  // |srv| may still be null here.
+
+  constexpr size_t kNumClients = 3;
+  std::unique_ptr<UnixSocket> cli[kNumClients];
+  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+      .Times(kNumClients)
+      .WillRepeatedly(Invoke([&](UnixSocket*, UnixSocket* s) {
+        // OnDisconnect() might spuriously happen depending on the dtor order.
+        EXPECT_CALL(event_listener_, OnDisconnect(s)).Times(AtLeast(0));
+        EXPECT_CALL(event_listener_, OnDataAvailable(s))
+            .WillRepeatedly(Invoke([](UnixSocket* cli_sock) {
+              cli_sock->ReceiveString();  // Read connection EOF;
+            }));
+        ASSERT_TRUE(s->Send("welcome"));
+      }));
+
+  for (size_t i = 0; i < kNumClients; i++) {
+    cli[i] = UnixSocket::Connect(host_and_port, &event_listener_, &task_runner_,
+                                 SockFamily::kInet, SockType::kStream);
+    auto checkpoint = task_runner_.CreateCheckpoint(std::to_string(i));
+    EXPECT_CALL(event_listener_, OnDisconnect(cli[i].get())).Times(AtLeast(0));
+    EXPECT_CALL(event_listener_, OnConnect(cli[i].get(), true));
+    EXPECT_CALL(event_listener_, OnDataAvailable(cli[i].get()))
+        .WillRepeatedly(Invoke([checkpoint](UnixSocket* s) {
+          auto str = s->ReceiveString();
+          if (str == "")
+            return;  // Connection EOF.
+          ASSERT_EQ("welcome", str);
+          checkpoint();
+        }));
+  }
+
+  for (size_t i = 0; i < kNumClients; i++) {
+    task_runner_.RunUntilCheckpoint(std::to_string(i));
+    ASSERT_TRUE(Mock::VerifyAndClearExpectations(cli[i].get()));
+  }
+}
+
+// ---------------------------------
+// Posix-only tests below this point
+// ---------------------------------
+
+#if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
+// Checks that the peer_uid() is retained after the client disconnects. The IPC
+// layer needs to rely on this to validate messages received immediately before
+// a client disconnects.
+TEST_F(UnixSocketTest, PeerCredentialsRetainedAfterDisconnect) {
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+                                SockFamily::kUnix, SockType::kStream);
+  ASSERT_TRUE(srv->is_listening());
+  UnixSocket* srv_client_conn = nullptr;
+  auto srv_connected = task_runner_.CreateCheckpoint("srv_connected");
+  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+      .WillOnce(Invoke([&srv_client_conn, srv_connected](UnixSocket*,
+                                                         UnixSocket* srv_conn) {
+        srv_client_conn = srv_conn;
+        EXPECT_EQ(geteuid(), static_cast<uint32_t>(srv_conn->peer_uid_posix()));
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
+    PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+        EXPECT_EQ(getpid(), static_cast<pid_t>(srv_conn->peer_pid_linux()));
+#endif
+        srv_connected();
+      }));
+  auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
+  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+                                 SockFamily::kUnix, SockType::kStream);
+  EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
+      .WillOnce(InvokeWithoutArgs(cli_connected));
+
+  task_runner_.RunUntilCheckpoint("cli_connected");
+  task_runner_.RunUntilCheckpoint("srv_connected");
+  ASSERT_NE(nullptr, srv_client_conn);
+  ASSERT_TRUE(srv_client_conn->is_connected());
+
+  auto cli_disconnected = task_runner_.CreateCheckpoint("cli_disconnected");
+  EXPECT_CALL(event_listener_, OnDisconnect(srv_client_conn))
+      .WillOnce(InvokeWithoutArgs(cli_disconnected));
+
+  // TODO(primiano): when a peer disconnects, the other end receives a
+  // spurious OnDataAvailable() that needs to be acked with a Receive() to read
+  // the EOF. See b/69536434.
+  EXPECT_CALL(event_listener_, OnDataAvailable(srv_client_conn))
+      .WillOnce(Invoke([](UnixSocket* sock) { sock->ReceiveString(); }));
+
+  cli.reset();
+  task_runner_.RunUntilCheckpoint("cli_disconnected");
+  ASSERT_FALSE(srv_client_conn->is_connected());
+  EXPECT_EQ(geteuid(),
+            static_cast<uint32_t>(srv_client_conn->peer_uid_posix()));
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
+    PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+  EXPECT_EQ(getpid(), static_cast<pid_t>(srv_client_conn->peer_pid_linux()));
+#endif
+}
 
 TEST_F(UnixSocketTest, ClientAndServerExchangeFDs) {
+  static constexpr char cli_str[] = "cli>srv";
+  static constexpr char srv_str[] = "srv>cli";
   auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
                                 SockFamily::kUnix, SockType::kStream);
   ASSERT_TRUE(srv->is_listening());
@@ -271,79 +632,6 @@
   task_runner_.RunUntilCheckpoint("cli_disconnected");
 }
 
-TEST_F(UnixSocketTest, ListenWithPassedFileDescriptor) {
-  auto sock_raw =
-      UnixSocketRaw::CreateMayFail(SockFamily::kUnix, SockType::kStream);
-  ASSERT_TRUE(sock_raw.Bind(kSocketName));
-  auto fd = sock_raw.ReleaseFd();
-  auto srv = UnixSocket::Listen(std::move(fd), &event_listener_, &task_runner_,
-                                SockFamily::kUnix, SockType::kStream);
-  ASSERT_TRUE(srv->is_listening());
-
-  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
-                                 SockFamily::kUnix, SockType::kStream);
-  EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
-  auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
-  auto srv_disconnected = task_runner_.CreateCheckpoint("srv_disconnected");
-  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
-      .WillOnce(Invoke([this, cli_connected, srv_disconnected](
-                           UnixSocket*, UnixSocket* srv_conn) {
-        // Read the EOF state.
-        EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn))
-            .WillOnce(
-                InvokeWithoutArgs([srv_conn] { srv_conn->ReceiveString(); }));
-        EXPECT_CALL(event_listener_, OnDisconnect(srv_conn))
-            .WillOnce(InvokeWithoutArgs(srv_disconnected));
-        cli_connected();
-      }));
-  task_runner_.RunUntilCheckpoint("cli_connected");
-  ASSERT_TRUE(cli->is_connected());
-  cli.reset();
-  task_runner_.RunUntilCheckpoint("srv_disconnected");
-}
-
-// Mostly a stress tests. Connects kNumClients clients to the same server and
-// tests that all can exchange data and can see the expected sequence of events.
-TEST_F(UnixSocketTest, SeveralClients) {
-  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
-                                SockFamily::kUnix, SockType::kStream);
-  ASSERT_TRUE(srv->is_listening());
-  constexpr size_t kNumClients = 32;
-  std::unique_ptr<UnixSocket> cli[kNumClients];
-
-  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
-      .Times(kNumClients)
-      .WillRepeatedly(Invoke([this](UnixSocket*, UnixSocket* s) {
-        EXPECT_CALL(event_listener_, OnDataAvailable(s))
-            .WillOnce(Invoke([](UnixSocket* t) {
-              ASSERT_EQ("PING", t->ReceiveString());
-              ASSERT_TRUE(t->Send("PONG"));
-            }));
-      }));
-
-  for (size_t i = 0; i < kNumClients; i++) {
-    cli[i] = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
-                                 SockFamily::kUnix, SockType::kStream);
-    EXPECT_CALL(event_listener_, OnConnect(cli[i].get(), true))
-        .WillOnce(Invoke([](UnixSocket* s, bool success) {
-          ASSERT_TRUE(success);
-          ASSERT_TRUE(s->Send("PING"));
-        }));
-
-    auto checkpoint = task_runner_.CreateCheckpoint(std::to_string(i));
-    EXPECT_CALL(event_listener_, OnDataAvailable(cli[i].get()))
-        .WillOnce(Invoke([checkpoint](UnixSocket* s) {
-          ASSERT_EQ("PONG", s->ReceiveString());
-          checkpoint();
-        }));
-  }
-
-  for (size_t i = 0; i < kNumClients; i++) {
-    task_runner_.RunUntilCheckpoint(std::to_string(i));
-    ASSERT_TRUE(Mock::VerifyAndClearExpectations(cli[i].get()));
-  }
-}
-
 // Creates two processes. The server process creates a file and passes it over
 // the socket to the client. Both processes mmap the file in shared mode and
 // check that they see the same contents.
@@ -372,7 +660,8 @@
     EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
         .WillOnce(Invoke(
             [this, tmp_fd, checkpoint, mem](UnixSocket*, UnixSocket* new_conn) {
-              ASSERT_EQ(geteuid(), static_cast<uint32_t>(new_conn->peer_uid()));
+              ASSERT_EQ(geteuid(),
+                        static_cast<uint32_t>(new_conn->peer_uid_posix()));
               ASSERT_TRUE(new_conn->Send("txfd", 5, tmp_fd));
               // Wait for the client to change this again.
               EXPECT_CALL(event_listener_, OnDataAvailable(new_conn))
@@ -420,189 +709,6 @@
   }
 }
 
-// Checks that the peer_uid() is retained after the client disconnects. The IPC
-// layer needs to rely on this to validate messages received immediately before
-// a client disconnects.
-TEST_F(UnixSocketTest, PeerCredentialsRetainedAfterDisconnect) {
-  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
-                                SockFamily::kUnix, SockType::kStream);
-  ASSERT_TRUE(srv->is_listening());
-  UnixSocket* srv_client_conn = nullptr;
-  auto srv_connected = task_runner_.CreateCheckpoint("srv_connected");
-  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
-      .WillOnce(Invoke(
-          [&srv_client_conn, srv_connected](UnixSocket*, UnixSocket* srv_conn) {
-            srv_client_conn = srv_conn;
-            EXPECT_EQ(geteuid(), static_cast<uint32_t>(srv_conn->peer_uid()));
-#if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
-    PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
-            EXPECT_EQ(getpid(), static_cast<pid_t>(srv_conn->peer_pid()));
-#endif
-            srv_connected();
-          }));
-  auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
-  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
-                                 SockFamily::kUnix, SockType::kStream);
-  EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
-      .WillOnce(InvokeWithoutArgs(cli_connected));
-
-  task_runner_.RunUntilCheckpoint("cli_connected");
-  task_runner_.RunUntilCheckpoint("srv_connected");
-  ASSERT_NE(nullptr, srv_client_conn);
-  ASSERT_TRUE(srv_client_conn->is_connected());
-
-  auto cli_disconnected = task_runner_.CreateCheckpoint("cli_disconnected");
-  EXPECT_CALL(event_listener_, OnDisconnect(srv_client_conn))
-      .WillOnce(InvokeWithoutArgs(cli_disconnected));
-
-  // TODO(primiano): when the a peer disconnects, the other end receives a
-  // spurious OnDataAvailable() that needs to be acked with a Receive() to read
-  // the EOF. See b/69536434.
-  EXPECT_CALL(event_listener_, OnDataAvailable(srv_client_conn))
-      .WillOnce(Invoke([](UnixSocket* sock) { sock->ReceiveString(); }));
-
-  cli.reset();
-  task_runner_.RunUntilCheckpoint("cli_disconnected");
-  ASSERT_FALSE(srv_client_conn->is_connected());
-  EXPECT_EQ(geteuid(), static_cast<uint32_t>(srv_client_conn->peer_uid()));
-#if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
-    PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
-  EXPECT_EQ(getpid(), static_cast<pid_t>(srv_client_conn->peer_pid()));
-#endif
-}
-
-// Tests the SockPeerCredMode::kIgnore logic.
-TEST_F(UnixSocketTest, IgnorePeerCredentials) {
-  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
-                                SockFamily::kUnix, SockType::kStream);
-  ASSERT_TRUE(srv->is_listening());
-  auto cli1_connected = task_runner_.CreateCheckpoint("cli1_connected");
-  auto cli1 = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
-                                  SockFamily::kUnix, SockType::kStream,
-                                  SockPeerCredMode::kIgnore);
-  EXPECT_CALL(event_listener_, OnConnect(cli1.get(), true))
-      .WillOnce(InvokeWithoutArgs(cli1_connected));
-
-  auto cli2_connected = task_runner_.CreateCheckpoint("cli2_connected");
-  auto cli2 = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
-                                  SockFamily::kUnix, SockType::kStream,
-                                  SockPeerCredMode::kReadOnConnect);
-  EXPECT_CALL(event_listener_, OnConnect(cli2.get(), true))
-      .WillOnce(InvokeWithoutArgs(cli2_connected));
-
-  task_runner_.RunUntilCheckpoint("cli1_connected");
-  task_runner_.RunUntilCheckpoint("cli2_connected");
-
-  ASSERT_EQ(cli1->peer_uid(/*skip_check_for_testing=*/true), kInvalidUid);
-  ASSERT_EQ(cli2->peer_uid(), geteuid());
-#if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
-    PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
-  ASSERT_EQ(cli1->peer_pid(/*skip_check_for_testing=*/true), kInvalidPid);
-  ASSERT_EQ(cli2->peer_pid(), getpid());
-#endif
-}
-
-TEST_F(UnixSocketTest, BlockingSend) {
-  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
-                                SockFamily::kUnix, SockType::kStream);
-  ASSERT_TRUE(srv->is_listening());
-
-  auto all_frames_done = task_runner_.CreateCheckpoint("all_frames_done");
-  size_t total_bytes_received = 0;
-  constexpr size_t kTotalBytes = 1024 * 1024 * 4;
-  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
-      .WillOnce(Invoke([this, &total_bytes_received, all_frames_done](
-                           UnixSocket*, UnixSocket* srv_conn) {
-        EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn))
-            .WillRepeatedly(
-                Invoke([&total_bytes_received, all_frames_done](UnixSocket* s) {
-                  char buf[1024];
-                  size_t res = s->Receive(buf, sizeof(buf));
-                  total_bytes_received += res;
-                  if (total_bytes_received == kTotalBytes)
-                    all_frames_done();
-                }));
-      }));
-
-  // Override default timeout as this test can take time on the emulator.
-  const int kTimeoutMs = 60000 * 3;
-
-  // Perform the blocking send form another thread.
-  std::thread tx_thread([] {
-    TestTaskRunner tx_task_runner;
-    MockEventListener tx_events;
-    auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner,
-                                   SockFamily::kUnix, SockType::kStream);
-
-    auto cli_connected = tx_task_runner.CreateCheckpoint("cli_connected");
-    EXPECT_CALL(tx_events, OnConnect(cli.get(), true))
-        .WillOnce(InvokeWithoutArgs(cli_connected));
-    tx_task_runner.RunUntilCheckpoint("cli_connected");
-
-    auto all_sent = tx_task_runner.CreateCheckpoint("all_sent");
-    char buf[1024 * 32] = {};
-    tx_task_runner.PostTask([&cli, &buf, all_sent] {
-      for (size_t i = 0; i < kTotalBytes / sizeof(buf); i++)
-        cli->Send(buf, sizeof(buf));
-      all_sent();
-    });
-    tx_task_runner.RunUntilCheckpoint("all_sent", kTimeoutMs);
-  });
-
-  task_runner_.RunUntilCheckpoint("all_frames_done", kTimeoutMs);
-  tx_thread.join();
-}
-
-// Regression test for b/76155349 . If the receiver end disconnects while the
-// sender is in the middle of a large send(), the socket should gracefully give
-// up (i.e. Shutdown()) but not crash.
-TEST_F(UnixSocketTest, ReceiverDisconnectsDuringSend) {
-  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
-                                SockFamily::kUnix, SockType::kStream);
-  ASSERT_TRUE(srv->is_listening());
-  const int kTimeoutMs = 30000;
-
-  auto receive_done = task_runner_.CreateCheckpoint("receive_done");
-  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
-      .WillOnce(Invoke([this, receive_done](UnixSocket*, UnixSocket* srv_conn) {
-        EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn))
-            .WillOnce(Invoke([receive_done](UnixSocket* s) {
-              char buf[1024];
-              size_t res = s->Receive(buf, sizeof(buf));
-              ASSERT_EQ(1024u, res);
-              s->Shutdown(false /*notify*/);
-              receive_done();
-            }));
-      }));
-
-  // Perform the blocking send form another thread.
-  std::thread tx_thread([] {
-    TestTaskRunner tx_task_runner;
-    MockEventListener tx_events;
-    auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner,
-                                   SockFamily::kUnix, SockType::kStream);
-
-    auto cli_connected = tx_task_runner.CreateCheckpoint("cli_connected");
-    EXPECT_CALL(tx_events, OnConnect(cli.get(), true))
-        .WillOnce(InvokeWithoutArgs(cli_connected));
-    tx_task_runner.RunUntilCheckpoint("cli_connected");
-
-    auto send_done = tx_task_runner.CreateCheckpoint("send_done");
-    // We need a
-    static constexpr size_t kBufSize = 32 * 1024 * 1024;
-    std::unique_ptr<char[]> buf(new char[kBufSize]());
-    tx_task_runner.PostTask([&cli, &buf, send_done] {
-      bool send_res = cli->Send(buf.get(), kBufSize);
-      ASSERT_FALSE(send_res);
-      send_done();
-    });
-
-    tx_task_runner.RunUntilCheckpoint("send_done", kTimeoutMs);
-  });
-  task_runner_.RunUntilCheckpoint("receive_done", kTimeoutMs);
-  tx_thread.join();
-}
-
 TEST_F(UnixSocketTest, ShiftMsgHdrSendPartialFirst) {
   // Send a part of the first iov, then send the rest.
   struct iovec iov[2] = {};
@@ -618,7 +724,7 @@
   hdr.msg_iov = iov;
   hdr.msg_iovlen = base::ArraySize(iov);
 
-  UnixSocketRaw::ShiftMsgHdr(1, &hdr);
+  UnixSocketRaw::ShiftMsgHdrPosix(1, &hdr);
   EXPECT_NE(hdr.msg_iov, nullptr);
   EXPECT_EQ(hdr.msg_iov[0].iov_base, &hello[1]);
   EXPECT_EQ(hdr.msg_iov[1].iov_base, &world[0]);
@@ -626,13 +732,13 @@
   EXPECT_STREQ(reinterpret_cast<char*>(hdr.msg_iov[0].iov_base), "ello");
   EXPECT_EQ(iov[0].iov_len, base::ArraySize(hello) - 1);
 
-  UnixSocketRaw::ShiftMsgHdr(base::ArraySize(hello) - 1, &hdr);
+  UnixSocketRaw::ShiftMsgHdrPosix(base::ArraySize(hello) - 1, &hdr);
   EXPECT_EQ(hdr.msg_iov, &iov[1]);
   EXPECT_EQ(static_cast<int>(hdr.msg_iovlen), 1);
   EXPECT_STREQ(reinterpret_cast<char*>(hdr.msg_iov[0].iov_base), world);
   EXPECT_EQ(hdr.msg_iov[0].iov_len, base::ArraySize(world));
 
-  UnixSocketRaw::ShiftMsgHdr(base::ArraySize(world), &hdr);
+  UnixSocketRaw::ShiftMsgHdrPosix(base::ArraySize(world), &hdr);
   EXPECT_EQ(hdr.msg_iov, nullptr);
   EXPECT_EQ(static_cast<int>(hdr.msg_iovlen), 0);
 }
@@ -652,13 +758,13 @@
   hdr.msg_iov = iov;
   hdr.msg_iovlen = base::ArraySize(iov);
 
-  UnixSocketRaw::ShiftMsgHdr(base::ArraySize(hello) + 1, &hdr);
+  UnixSocketRaw::ShiftMsgHdrPosix(base::ArraySize(hello) + 1, &hdr);
   EXPECT_NE(hdr.msg_iov, nullptr);
   EXPECT_EQ(static_cast<int>(hdr.msg_iovlen), 1);
   EXPECT_STREQ(reinterpret_cast<char*>(hdr.msg_iov[0].iov_base), "orld");
   EXPECT_EQ(hdr.msg_iov[0].iov_len, base::ArraySize(world) - 1);
 
-  UnixSocketRaw::ShiftMsgHdr(base::ArraySize(world) - 1, &hdr);
+  UnixSocketRaw::ShiftMsgHdrPosix(base::ArraySize(world) - 1, &hdr);
   EXPECT_EQ(hdr.msg_iov, nullptr);
   EXPECT_EQ(static_cast<int>(hdr.msg_iovlen), 0);
 }
@@ -678,14 +784,13 @@
   hdr.msg_iov = iov;
   hdr.msg_iovlen = base::ArraySize(iov);
 
-  UnixSocketRaw::ShiftMsgHdr(base::ArraySize(world) + base::ArraySize(hello),
-                             &hdr);
+  UnixSocketRaw::ShiftMsgHdrPosix(
+      base::ArraySize(world) + base::ArraySize(hello), &hdr);
   EXPECT_EQ(hdr.msg_iov, nullptr);
   EXPECT_EQ(static_cast<int>(hdr.msg_iovlen), 0);
 }
 
-void Handler(int) {}
-
+// Template argument of ScopedResource in PartialSendMsgAll; can't be a lambda.
 int RollbackSigaction(const struct sigaction* act) {
   return sigaction(SIGWINCH, act, nullptr);
 }
@@ -694,7 +799,7 @@
   UnixSocketRaw send_sock;
   UnixSocketRaw recv_sock;
   std::tie(send_sock, recv_sock) =
-      UnixSocketRaw::CreatePair(SockFamily::kUnix, SockType::kStream);
+      UnixSocketRaw::CreatePairPosix(SockFamily::kUnix, SockType::kStream);
   ASSERT_TRUE(send_sock);
   ASSERT_TRUE(recv_sock);
 
@@ -723,7 +828,7 @@
   //   this action will affect the whole process.
   struct sigaction oldact;
   struct sigaction newact = {};
-  newact.sa_handler = Handler;
+  newact.sa_handler = [](int) {};
   ASSERT_EQ(sigaction(SIGWINCH, &newact, &oldact), 0);
   base::ScopedResource<const struct sigaction*, RollbackSigaction, nullptr>
       rollback(&oldact);
@@ -734,7 +839,7 @@
     ASSERT_EQ(rd, 1);
     // We are now sure the other thread is in sendmsg, interrupt send.
     ASSERT_EQ(pthread_kill(blocked_thread, SIGWINCH), 0);
-    // Drain the socket to allow SendMsgAll to succeed.
+    // Drain the socket to allow SendMsgAllPosix to succeed.
     size_t offset = 1;
     while (offset < sizeof(recv_buf)) {
       rd = PERFETTO_EINTR(
@@ -745,7 +850,7 @@
   });
 
   // Test sending the send_buf in several chunks as an iov to exercise the
-  // more complicated code-paths of SendMsgAll.
+  // more complicated code-paths of SendMsgAllPosix.
   struct msghdr hdr = {};
   struct iovec iov[4];
   static_assert(sizeof(send_buf) % base::ArraySize(iov) == 0,
@@ -758,110 +863,15 @@
   hdr.msg_iov = iov;
   hdr.msg_iovlen = base::ArraySize(iov);
 
-  ASSERT_EQ(send_sock.SendMsgAll(&hdr), static_cast<ssize_t>(sizeof(send_buf)));
+  ASSERT_EQ(send_sock.SendMsgAllPosix(&hdr),
+            static_cast<ssize_t>(sizeof(send_buf)));
   send_sock.Shutdown();
   th.join();
   // Make sure the re-entry logic was actually triggered.
   ASSERT_EQ(hdr.msg_iov, nullptr);
   ASSERT_EQ(memcmp(send_buf, recv_buf, sizeof(send_buf)), 0);
 }
-
-TEST_F(UnixSocketTest, ReleaseSocket) {
-  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
-                                SockFamily::kUnix, SockType::kStream);
-  ASSERT_TRUE(srv->is_listening());
-  auto connected = task_runner_.CreateCheckpoint("connected");
-  UnixSocket* peer = nullptr;
-  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
-      .WillOnce(Invoke([connected, &peer](UnixSocket*, UnixSocket* new_conn) {
-        peer = new_conn;
-        connected();
-      }));
-
-  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
-                                 SockFamily::kUnix, SockType::kStream);
-  EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
-  task_runner_.RunUntilCheckpoint("connected");
-  srv->Shutdown(true);
-
-  cli->Send("test");
-
-  ASSERT_NE(peer, nullptr);
-  auto raw_sock = peer->ReleaseSocket();
-
-  EXPECT_CALL(event_listener_, OnDataAvailable(_)).Times(0);
-  task_runner_.RunUntilIdle();
-
-  char buf[sizeof("test")];
-  ASSERT_TRUE(raw_sock);
-  ASSERT_EQ(raw_sock.Receive(buf, sizeof(buf)),
-            static_cast<ssize_t>(sizeof(buf)));
-  ASSERT_STREQ(buf, "test");
-}
-
-TEST_F(UnixSocketTest, TcpStream) {
-  char host_and_port[32];
-  int attempt = 0;
-  std::unique_ptr<UnixSocket> srv;
-
-  // Try listening on a random port. Some ports might be taken by other syste
-  // services. Do a bunch of attempts on different ports before giving up.
-  do {
-    sprintf(host_and_port, "127.0.0.1:%d", 10000 + (rand() % 10000));
-    srv = UnixSocket::Listen(host_and_port, &event_listener_, &task_runner_,
-                             SockFamily::kInet, SockType::kStream);
-  } while ((!srv || !srv->is_listening()) && attempt++ < 10);
-  ASSERT_TRUE(srv->is_listening());
-
-  constexpr size_t kNumClients = 3;
-  std::unique_ptr<UnixSocket> cli[kNumClients];
-  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
-      .Times(kNumClients)
-      .WillRepeatedly(Invoke([&](UnixSocket*, UnixSocket* s) {
-        // OnDisconnect() might spuriously happen depending on the dtor order.
-        EXPECT_CALL(event_listener_, OnDisconnect(s)).Times(AtLeast(0));
-        EXPECT_CALL(event_listener_, OnDataAvailable(s))
-            .WillRepeatedly(Invoke([](UnixSocket* cli_sock) {
-              cli_sock->ReceiveString();  // Read connection EOF;
-            }));
-        ASSERT_TRUE(s->Send("welcome"));
-      }));
-
-  for (size_t i = 0; i < kNumClients; i++) {
-    cli[i] = UnixSocket::Connect(host_and_port, &event_listener_, &task_runner_,
-                                 SockFamily::kInet, SockType::kStream);
-    // PERFETTO_ILOG("cli : %p", reinterpret_cast<void*>(cli[i].get()));
-    auto checkpoint = task_runner_.CreateCheckpoint(std::to_string(i));
-    EXPECT_CALL(event_listener_, OnDisconnect(cli[i].get())).Times(AtLeast(0));
-    EXPECT_CALL(event_listener_, OnConnect(cli[i].get(), true));
-    EXPECT_CALL(event_listener_, OnDataAvailable(cli[i].get()))
-        .WillRepeatedly(Invoke([checkpoint](UnixSocket* s) {
-          auto str = s->ReceiveString();
-          if (str == "")
-            return;  // Connection EOF.
-          ASSERT_EQ("welcome", str);
-          checkpoint();
-        }));
-  }
-
-  for (size_t i = 0; i < kNumClients; i++) {
-    task_runner_.RunUntilCheckpoint(std::to_string(i));
-    ASSERT_TRUE(Mock::VerifyAndClearExpectations(cli[i].get()));
-  }
-}
-
-// TODO(primiano): add a test to check that in the case of a peer sending a fd
-// and the other end just doing a recv (without taking it), the fd is closed and
-// not left around.
-
-// TODO(primiano); add a test to check that a socket can be reused after
-// Shutdown(),
-
-// TODO(primiano): add a test to check that OnDisconnect() is called in all
-// possible cases.
-
-// TODO(primiano): add tests that destroy the socket in all possible stages and
-// verify that no spurious EventListener callback is received.
+#endif  // !OS_WIN
 
 }  // namespace
 }  // namespace base