Add TCP/IP support to UnixSocket

Adds the ability to create TCP sockets. This
will be used by the trace processor HTTP interface
(See bug).
Arguably the class should be renamed to just Socket,
but that is a larger change and perhaps not worth it given
the niche use case for TCP.
This CL also gets rid of the default args and makes all
call-sites explicit to avoid bugs in the future.

Bug: 143074239
Change-Id: Ib3a66e1f33ff2c025defb0ecd066b0e0e491e2e9
diff --git a/src/base/unix_socket_unittest.cc b/src/base/unix_socket_unittest.cc
index 8f54cab..138f631 100644
--- a/src/base/unix_socket_unittest.cc
+++ b/src/base/unix_socket_unittest.cc
@@ -84,7 +84,8 @@
 };
 
 TEST_F(UnixSocketTest, ConnectionFailureIfUnreachable) {
-  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+                                 SockFamily::kUnix, SockType::kStream);
   ASSERT_FALSE(cli->is_connected());
   auto checkpoint = task_runner_.CreateCheckpoint("failure");
   EXPECT_CALL(event_listener_, OnConnect(cli.get(), false))
@@ -95,7 +96,8 @@
 // Both server and client should see an OnDisconnect() if the server drops
 // incoming connections immediately as they are created.
 TEST_F(UnixSocketTest, ConnectionImmediatelyDroppedByServer) {
-  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+                                SockFamily::kUnix, SockType::kStream);
   ASSERT_TRUE(srv->is_listening());
 
   // The server will immediately shutdown the connection upon
@@ -110,7 +112,8 @@
           }));
 
   auto checkpoint = task_runner_.CreateCheckpoint("cli_connected");
-  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+                                 SockFamily::kUnix, SockType::kStream);
   EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
       .WillOnce(InvokeWithoutArgs(checkpoint));
   task_runner_.RunUntilCheckpoint("cli_connected");
@@ -125,10 +128,12 @@
 }
 
 TEST_F(UnixSocketTest, ClientAndServerExchangeData) {
-  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+                                SockFamily::kUnix, SockType::kStream);
   ASSERT_TRUE(srv->is_listening());
 
-  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+                                 SockFamily::kUnix, SockType::kStream);
   EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
   auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
   auto srv_disconnected = task_runner_.CreateCheckpoint("srv_disconnected");
@@ -184,10 +189,12 @@
 constexpr char srv_str[] = "srv>cli";
 
 TEST_F(UnixSocketTest, ClientAndServerExchangeFDs) {
-  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+                                SockFamily::kUnix, SockType::kStream);
   ASSERT_TRUE(srv->is_listening());
 
-  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+                                 SockFamily::kUnix, SockType::kStream);
   EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
   auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
   auto srv_disconnected = task_runner_.CreateCheckpoint("srv_disconnected");
@@ -266,13 +273,16 @@
 }
 
 TEST_F(UnixSocketTest, ListenWithPassedFileDescriptor) {
-  auto sock_raw = UnixSocketRaw::CreateMayFail(SockType::kStream);
+  auto sock_raw =
+      UnixSocketRaw::CreateMayFail(SockFamily::kUnix, SockType::kStream);
   ASSERT_TRUE(sock_raw.Bind(kSocketName));
   auto fd = sock_raw.ReleaseFd();
-  auto srv = UnixSocket::Listen(std::move(fd), &event_listener_, &task_runner_);
+  auto srv = UnixSocket::Listen(std::move(fd), &event_listener_, &task_runner_,
+                                SockFamily::kUnix, SockType::kStream);
   ASSERT_TRUE(srv->is_listening());
 
-  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+                                 SockFamily::kUnix, SockType::kStream);
   EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
   auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
   auto srv_disconnected = task_runner_.CreateCheckpoint("srv_disconnected");
@@ -296,7 +306,8 @@
 // Mostly a stress tests. Connects kNumClients clients to the same server and
 // tests that all can exchange data and can see the expected sequence of events.
 TEST_F(UnixSocketTest, SeveralClients) {
-  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+                                SockFamily::kUnix, SockType::kStream);
   ASSERT_TRUE(srv->is_listening());
   constexpr size_t kNumClients = 32;
   std::unique_ptr<UnixSocket> cli[kNumClients];
@@ -312,7 +323,8 @@
       }));
 
   for (size_t i = 0; i < kNumClients; i++) {
-    cli[i] = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+    cli[i] = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+                                 SockFamily::kUnix, SockType::kStream);
     EXPECT_CALL(event_listener_, OnConnect(cli[i].get(), true))
         .WillOnce(Invoke([](UnixSocket* s, bool success) {
           ASSERT_TRUE(success);
@@ -352,7 +364,8 @@
     ASSERT_NE(nullptr, mem);
     memcpy(mem, "shm rocks", 10);
 
-    auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+    auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+                                  SockFamily::kUnix, SockType::kStream);
     ASSERT_TRUE(srv->is_listening());
     // Signal the other process that it can connect.
     ASSERT_EQ(1, base::WriteAll(*pipe.wr, ".", 1));
@@ -377,8 +390,8 @@
     char sync_cmd = '\0';
     ASSERT_EQ(1, PERFETTO_EINTR(read(*pipe.rd, &sync_cmd, 1)));
     ASSERT_EQ('.', sync_cmd);
-    auto cli =
-        UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+    auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+                                   SockFamily::kUnix, SockType::kStream);
     EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
     auto checkpoint = task_runner_.CreateCheckpoint("change_seen_by_client");
     EXPECT_CALL(event_listener_, OnDataAvailable(cli.get()))
@@ -412,7 +425,8 @@
 // layer needs to rely on this to validate messages received immediately before
 // a client disconnects.
 TEST_F(UnixSocketTest, PeerCredentialsRetainedAfterDisconnect) {
-  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+                                SockFamily::kUnix, SockType::kStream);
   ASSERT_TRUE(srv->is_listening());
   UnixSocket* srv_client_conn = nullptr;
   auto srv_connected = task_runner_.CreateCheckpoint("srv_connected");
@@ -428,7 +442,8 @@
             srv_connected();
           }));
   auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
-  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+                                 SockFamily::kUnix, SockType::kStream);
   EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
       .WillOnce(InvokeWithoutArgs(cli_connected));
 
@@ -458,7 +473,8 @@
 }
 
 TEST_F(UnixSocketTest, BlockingSend) {
-  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+                                SockFamily::kUnix, SockType::kStream);
   ASSERT_TRUE(srv->is_listening());
 
   auto all_frames_done = task_runner_.CreateCheckpoint("all_frames_done");
@@ -485,7 +501,8 @@
   std::thread tx_thread([] {
     TestTaskRunner tx_task_runner;
     MockEventListener tx_events;
-    auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner);
+    auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner,
+                                   SockFamily::kUnix, SockType::kStream);
 
     auto cli_connected = tx_task_runner.CreateCheckpoint("cli_connected");
     EXPECT_CALL(tx_events, OnConnect(cli.get(), true))
@@ -510,7 +527,8 @@
 // sender is in the middle of a large send(), the socket should gracefully give
 // up (i.e. Shutdown()) but not crash.
 TEST_F(UnixSocketTest, ReceiverDisconnectsDuringSend) {
-  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+                                SockFamily::kUnix, SockType::kStream);
   ASSERT_TRUE(srv->is_listening());
   const int kTimeoutMs = 30000;
 
@@ -531,7 +549,8 @@
   std::thread tx_thread([] {
     TestTaskRunner tx_task_runner;
     MockEventListener tx_events;
-    auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner);
+    auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner,
+                                   SockFamily::kUnix, SockType::kStream);
 
     auto cli_connected = tx_task_runner.CreateCheckpoint("cli_connected");
     EXPECT_CALL(tx_events, OnConnect(cli.get(), true))
@@ -644,7 +663,8 @@
 TEST_F(UnixSocketTest, PartialSendMsgAll) {
   UnixSocketRaw send_sock;
   UnixSocketRaw recv_sock;
-  std::tie(send_sock, recv_sock) = UnixSocketRaw::CreatePair(SockType::kStream);
+  std::tie(send_sock, recv_sock) =
+      UnixSocketRaw::CreatePair(SockFamily::kUnix, SockType::kStream);
   ASSERT_TRUE(send_sock);
   ASSERT_TRUE(recv_sock);
 
@@ -717,7 +737,8 @@
 }
 
 TEST_F(UnixSocketTest, ReleaseSocket) {
-  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+                                SockFamily::kUnix, SockType::kStream);
   ASSERT_TRUE(srv->is_listening());
   auto connected = task_runner_.CreateCheckpoint("connected");
   UnixSocket* peer = nullptr;
@@ -727,7 +748,8 @@
         connected();
       }));
 
-  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+                                 SockFamily::kUnix, SockType::kStream);
   EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
   task_runner_.RunUntilCheckpoint("connected");
   srv->Shutdown(true);
@@ -747,6 +769,57 @@
   ASSERT_STREQ(buf, "test");
 }
 
+TEST_F(UnixSocketTest, TcpStream) {
+  char host_and_port[32];
+  int attempt = 0;
+  std::unique_ptr<UnixSocket> srv;
+
+  // Try listening on a random port. Some ports might be taken by other system
+  // services. Do a bunch of attempts on different ports before giving up.
+  do {
+    sprintf(host_and_port, "127.0.0.1:%d", 10000 + (rand() % 10000));
+    srv = UnixSocket::Listen(host_and_port, &event_listener_, &task_runner_,
+                             SockFamily::kInet, SockType::kStream);
+  } while ((!srv || !srv->is_listening()) && attempt++ < 10);
+  ASSERT_TRUE(srv->is_listening());
+
+  constexpr size_t kNumClients = 3;
+  std::unique_ptr<UnixSocket> cli[kNumClients];
+  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+      .Times(kNumClients)
+      .WillRepeatedly(Invoke([&](UnixSocket*, UnixSocket* s) {
+        // OnDisconnect() might spuriously happen depending on the dtor order.
+        EXPECT_CALL(event_listener_, OnDisconnect(s)).Times(AtLeast(0));
+        EXPECT_CALL(event_listener_, OnDataAvailable(s))
+            .WillRepeatedly(Invoke([](UnixSocket* cli_sock) {
+              cli_sock->ReceiveString();  // Read connection EOF.
+            }));
+        ASSERT_TRUE(s->Send("welcome", kBlocking));
+      }));
+
+  for (size_t i = 0; i < kNumClients; i++) {
+    cli[i] = UnixSocket::Connect(host_and_port, &event_listener_, &task_runner_,
+                                 SockFamily::kInet, SockType::kStream);
+    // PERFETTO_ILOG("cli : %p", reinterpret_cast<void*>(cli[i].get()));
+    auto checkpoint = task_runner_.CreateCheckpoint(std::to_string(i));
+    EXPECT_CALL(event_listener_, OnDisconnect(cli[i].get())).Times(AtLeast(0));
+    EXPECT_CALL(event_listener_, OnConnect(cli[i].get(), true));
+    EXPECT_CALL(event_listener_, OnDataAvailable(cli[i].get()))
+        .WillRepeatedly(Invoke([checkpoint](UnixSocket* s) {
+          auto str = s->ReceiveString();
+          if (str == "")
+            return;  // Connection EOF.
+          ASSERT_EQ("welcome", str);
+          checkpoint();
+        }));
+  }
+
+  for (size_t i = 0; i < kNumClients; i++) {
+    task_runner_.RunUntilCheckpoint(std::to_string(i));
+    ASSERT_TRUE(Mock::VerifyAndClearExpectations(cli[i].get()));
+  }
+}
+
 // TODO(primiano): add a test to check that in the case of a peer sending a fd
 // and the other end just doing a recv (without taking it), the fd is closed and
 // not left around.