Merge "Add TCP/IP support to UnixSocket"
diff --git a/include/perfetto/ext/base/unix_socket.h b/include/perfetto/ext/base/unix_socket.h
index b4e8e04..4bb9b04 100644
--- a/include/perfetto/ext/base/unix_socket.h
+++ b/include/perfetto/ext/base/unix_socket.h
@@ -37,8 +37,9 @@
 
 // Use arbitrarily high values to avoid that some code accidentally ends up
 // assuming that these enum values match the sysroot's SOCK_xxx defines rather
-// than using GetUnixSockType().
+// than using GetSockType() / GetSockFamily().
 enum class SockType { kStream = 100, kDgram, kSeqPacket };
+enum class SockFamily { kUnix = 200, kInet };
 
 // UnixSocketRaw is a basic wrapper around UNIX sockets. It exposes wrapper
 // methods that take care of most common pitfalls (e.g., marking fd as
@@ -47,17 +48,20 @@
 class UnixSocketRaw {
  public:
   // Creates a new unconnected unix socket.
-  static UnixSocketRaw CreateMayFail(SockType t) { return UnixSocketRaw(t); }
+  static UnixSocketRaw CreateMayFail(SockFamily family, SockType type) {
+    return UnixSocketRaw(family, type);
+  }
 
   // Crates a pair of connected sockets.
-  static std::pair<UnixSocketRaw, UnixSocketRaw> CreatePair(SockType);
+  static std::pair<UnixSocketRaw, UnixSocketRaw> CreatePair(SockFamily,
+                                                            SockType);
 
   // Creates an uninitialized unix socket.
   UnixSocketRaw();
 
   // Creates a unix socket adopting an existing file descriptor. This is
   // typically used to inherit fds from init via environment variables.
-  UnixSocketRaw(ScopedFile, SockType);
+  UnixSocketRaw(ScopedFile, SockFamily, SockType);
 
   ~UnixSocketRaw() = default;
   UnixSocketRaw(UnixSocketRaw&&) noexcept = default;
@@ -73,6 +77,7 @@
   bool IsBlocking() const;
   void RetainOnExec();
   SockType type() const { return type_; }
+  SockFamily family() const { return family_; }
   int fd() const { return *fd_; }
   explicit operator bool() const { return !!fd_; }
 
@@ -98,13 +103,14 @@
   static void ShiftMsgHdr(size_t n, struct msghdr* msg);
 
  private:
-  explicit UnixSocketRaw(SockType);
+  UnixSocketRaw(SockFamily, SockType);
 
   UnixSocketRaw(const UnixSocketRaw&) = delete;
   UnixSocketRaw& operator=(const UnixSocketRaw&) = delete;
 
   ScopedFile fd_;
-  SockType type_{SockType::kStream};
+  SockFamily family_ = SockFamily::kUnix;
+  SockType type_ = SockType::kStream;
 };
 
 // A non-blocking UNIX domain socket. Allows also to transfer file descriptors.
@@ -179,22 +185,26 @@
 
   enum class BlockingMode { kNonBlocking, kBlocking };
 
-  // Creates a Unix domain socket and starts listening. If |socket_name|
-  // starts with a '@', an abstract socket will be created (Linux/Android only).
+  // Creates a socket and starts listening. If SockFamily::kUnix and
+  // |socket_name| starts with a '@', an abstract UNIX domain socket will be
+  // created instead of a filesystem-linked UNIX socket (Linux/Android only).
+  // If SockFamily::kInet, |socket_name| is host:port (e.g., "1.2.3.4:8000").
   // Returns always an instance. In case of failure (e.g., another socket
   // with the same name is  already listening) the returned socket will have
   // is_listening() == false and last_error() will contain the failure reason.
   static std::unique_ptr<UnixSocket> Listen(const std::string& socket_name,
                                             EventListener*,
                                             TaskRunner*,
-                                            SockType = SockType::kStream);
+                                            SockFamily,
+                                            SockType);
 
   // Attaches to a pre-existing socket. The socket must have been created in
   // SOCK_STREAM mode and the caller must have called bind() on it.
   static std::unique_ptr<UnixSocket> Listen(ScopedFile,
                                             EventListener*,
                                             TaskRunner*,
-                                            SockType = SockType::kStream);
+                                            SockFamily,
+                                            SockType);
 
   // Creates a Unix domain socket and connects to the listening endpoint.
   // Returns always an instance. EventListener::OnConnect(bool success) will
@@ -202,14 +212,15 @@
   static std::unique_ptr<UnixSocket> Connect(const std::string& socket_name,
                                              EventListener*,
                                              TaskRunner*,
-                                             SockType = SockType::kStream);
+                                             SockFamily,
+                                             SockType);
 
   // Constructs a UnixSocket using the given connected socket.
-  static std::unique_ptr<UnixSocket> AdoptConnected(
-      ScopedFile fd,
-      EventListener* event_listener,
-      TaskRunner* task_runner,
-      SockType sock_type);
+  static std::unique_ptr<UnixSocket> AdoptConnected(ScopedFile,
+                                                    EventListener*,
+                                                    TaskRunner*,
+                                                    SockFamily,
+                                                    SockType);
 
   UnixSocket(const UnixSocket&) = delete;
   UnixSocket& operator=(const UnixSocket&) = delete;
@@ -303,8 +314,13 @@
   UnixSocketRaw ReleaseSocket();
 
  private:
-  UnixSocket(EventListener*, TaskRunner*, SockType);
-  UnixSocket(EventListener*, TaskRunner*, ScopedFile, State, SockType);
+  UnixSocket(EventListener*, TaskRunner*, SockFamily, SockType);
+  UnixSocket(EventListener*,
+             TaskRunner*,
+             ScopedFile,
+             State,
+             SockFamily,
+             SockType);
 
   // Called once by the corresponding public static factory methods.
   void DoConnect(const std::string& socket_name);
diff --git a/src/base/unix_socket.cc b/src/base/unix_socket.cc
index 663cb14..e63f978 100644
--- a/src/base/unix_socket.cc
+++ b/src/base/unix_socket.cc
@@ -18,6 +18,8 @@
 
 #include <errno.h>
 #include <fcntl.h>
+#include <netdb.h>
+#include <netinet/in.h>
 #include <stdlib.h>
 #include <string.h>
 #include <sys/socket.h>
@@ -32,6 +34,7 @@
 #include "perfetto/base/build_config.h"
 #include "perfetto/base/logging.h"
 #include "perfetto/base/task_runner.h"
+#include "perfetto/ext/base/string_utils.h"
 #include "perfetto/ext/base/utils.h"
 
 #if PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
@@ -48,6 +51,7 @@
 #endif
 
 namespace {
+
 // MSG_NOSIGNAL is not supported on Mac OS X, but in that case the socket is
 // created with SO_NOSIGPIPE (See InitializeSocket()).
 #if PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
@@ -63,7 +67,36 @@
 using CBufLenType = socklen_t;
 #endif
 
-inline int GetUnixSockType(SockType type) {
+// A wrapper around variable-size sockaddr structs.
+// This is solving the following problem: when calling connect() or bind(), the
+// caller needs to take care to allocate the right struct (sockaddr_un for
+// AF_UNIX, sockaddr_in for AF_INET). Those structs have different sizes and,
+// more importantly, are bigger than the base struct sockaddr.
+struct SockaddrAny {
+  SockaddrAny() : size() {}
+  SockaddrAny(const void* addr, socklen_t sz) : data(new char[sz]), size(sz) {
+    memcpy(data.get(), addr, static_cast<size_t>(size));
+  }
+
+  const struct sockaddr* addr() const {
+    return reinterpret_cast<const struct sockaddr*>(data.get());
+  }
+
+  std::unique_ptr<char[]> data;
+  socklen_t size;
+};
+
+inline int GetSockFamily(SockFamily family) {
+  switch (family) {
+    case SockFamily::kUnix:
+      return AF_UNIX;
+    case SockFamily::kInet:
+      return AF_INET;
+  }
+  PERFETTO_CHECK(false);  // For GCC.
+}
+
+inline int GetSockType(SockType type) {
   switch (type) {
     case SockType::kStream:
       return SOCK_STREAM;
@@ -72,25 +105,42 @@
     case SockType::kSeqPacket:
       return SOCK_SEQPACKET;
   }
-  PERFETTO_CHECK(false);
+  PERFETTO_CHECK(false);  // For GCC.
 }
 
-bool MakeSockAddr(const std::string& socket_name,
-                  sockaddr_un* addr,
-                  socklen_t* addr_size) {
-  memset(addr, 0, sizeof(*addr));
-  const size_t name_len = socket_name.size();
-  if (name_len >= sizeof(addr->sun_path)) {
-    errno = ENAMETOOLONG;
-    return false;
+SockaddrAny MakeSockAddr(SockFamily family, const std::string& socket_name) {
+  switch (family) {
+    case SockFamily::kUnix: {
+      struct sockaddr_un saddr {};
+      const size_t name_len = socket_name.size();
+      if (name_len >= sizeof(saddr.sun_path)) {
+        errno = ENAMETOOLONG;
+        return SockaddrAny();
+      }
+      memcpy(saddr.sun_path, socket_name.data(), name_len);
+      if (saddr.sun_path[0] == '@')
+        saddr.sun_path[0] = '\0';
+      saddr.sun_family = AF_UNIX;
+      auto size = static_cast<socklen_t>(
+          __builtin_offsetof(sockaddr_un, sun_path) + name_len + 1);
+      PERFETTO_CHECK(static_cast<size_t>(size) <= sizeof(saddr));
+      return SockaddrAny(&saddr, size);
+    }
+    case SockFamily::kInet: {
+      auto parts = SplitString(socket_name, ":");
+      PERFETTO_CHECK(parts.size() == 2);
+      struct addrinfo* addr_info = nullptr;
+      struct addrinfo hints {};
+      hints.ai_family = AF_INET;
+      PERFETTO_CHECK(getaddrinfo(parts[0].c_str(), parts[1].c_str(), &hints,
+                                 &addr_info) == 0);
+      PERFETTO_CHECK(addr_info->ai_family == AF_INET);
+      SockaddrAny res(addr_info->ai_addr, addr_info->ai_addrlen);
+      freeaddrinfo(addr_info);
+      return res;
+    }
   }
-  memcpy(addr->sun_path, socket_name.data(), name_len);
-  if (addr->sun_path[0] == '@')
-    addr->sun_path[0] = '\0';
-  addr->sun_family = AF_UNIX;
-  *addr_size = static_cast<socklen_t>(
-      __builtin_offsetof(sockaddr_un, sun_path) + name_len + 1);
-  return true;
+  PERFETTO_CHECK(false);  // For GCC.
 }
 
 }  // namespace
@@ -122,29 +172,39 @@
 }
 
 // static
-std::pair<UnixSocketRaw, UnixSocketRaw> UnixSocketRaw::CreatePair(SockType t) {
+std::pair<UnixSocketRaw, UnixSocketRaw> UnixSocketRaw::CreatePair(
+    SockFamily family,
+    SockType type) {
   int fds[2];
-  if (socketpair(AF_UNIX, GetUnixSockType(t), 0, fds) != 0)
+  if (socketpair(GetSockFamily(family), GetSockType(type), 0, fds) != 0)
     return std::make_pair(UnixSocketRaw(), UnixSocketRaw());
 
-  return std::make_pair(UnixSocketRaw(ScopedFile(fds[0]), t),
-                        UnixSocketRaw(ScopedFile(fds[1]), t));
+  return std::make_pair(UnixSocketRaw(ScopedFile(fds[0]), family, type),
+                        UnixSocketRaw(ScopedFile(fds[1]), family, type));
 }
 
 UnixSocketRaw::UnixSocketRaw() = default;
 
-UnixSocketRaw::UnixSocketRaw(SockType type)
-    : UnixSocketRaw(ScopedFile(socket(AF_UNIX, GetUnixSockType(type), 0)),
-                    type) {}
+UnixSocketRaw::UnixSocketRaw(SockFamily family, SockType type)
+    : UnixSocketRaw(
+          ScopedFile(socket(GetSockFamily(family), GetSockType(type), 0)),
+          family,
+          type) {}
 
-UnixSocketRaw::UnixSocketRaw(ScopedFile fd, SockType type)
-    : fd_(std::move(fd)), type_(type) {
+UnixSocketRaw::UnixSocketRaw(ScopedFile fd, SockFamily family, SockType type)
+    : fd_(std::move(fd)), family_(family), type_(type) {
   PERFETTO_CHECK(fd_);
 #if PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
   const int no_sigpipe = 1;
   setsockopt(*fd_, SOL_SOCKET, SO_NOSIGPIPE, &no_sigpipe, sizeof(no_sigpipe));
 #endif
 
+  if (family == SockFamily::kInet) {
+    int flag = 1;
+    PERFETTO_CHECK(
+        !setsockopt(*fd_, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag)));
+  }
+
   // There is no reason why a socket should outlive the process in case of
   // exec() by default, this is just working around a broken unix design.
   int fcntl_res = fcntl(*fd_, F_SETFD, FD_CLOEXEC);
@@ -178,12 +238,11 @@
 
 bool UnixSocketRaw::Bind(const std::string& socket_name) {
   PERFETTO_DCHECK(fd_);
-  sockaddr_un addr;
-  socklen_t addr_size;
-  if (!MakeSockAddr(socket_name, &addr, &addr_size))
+  SockaddrAny addr = MakeSockAddr(family_, socket_name);
+  if (addr.size == 0)
     return false;
 
-  if (bind(*fd_, reinterpret_cast<sockaddr*>(&addr), addr_size)) {
+  if (bind(*fd_, addr.addr(), addr.size)) {
     PERFETTO_DPLOG("bind(%s)", socket_name.c_str());
     return false;
   }
@@ -199,13 +258,11 @@
 
 bool UnixSocketRaw::Connect(const std::string& socket_name) {
   PERFETTO_DCHECK(fd_);
-  sockaddr_un addr;
-  socklen_t addr_size;
-  if (!MakeSockAddr(socket_name, &addr, &addr_size))
+  SockaddrAny addr = MakeSockAddr(family_, socket_name);
+  if (addr.size == 0)
     return false;
 
-  int res = PERFETTO_EINTR(
-      connect(*fd_, reinterpret_cast<sockaddr*>(&addr), addr_size));
+  int res = PERFETTO_EINTR(connect(*fd_, addr.addr(), addr.size));
   if (res && errno != EINPROGRESS)
     return false;
 
@@ -371,32 +428,36 @@
 std::unique_ptr<UnixSocket> UnixSocket::Listen(const std::string& socket_name,
                                                EventListener* event_listener,
                                                TaskRunner* task_runner,
+                                               SockFamily sock_family,
                                                SockType sock_type) {
-  auto sock_raw = UnixSocketRaw::CreateMayFail(sock_type);
+  auto sock_raw = UnixSocketRaw::CreateMayFail(sock_family, sock_type);
   if (!sock_raw || !sock_raw.Bind(socket_name))
     return nullptr;
 
   // Forward the call to the Listen() overload below.
-  return Listen(sock_raw.ReleaseFd(), event_listener, task_runner, sock_type);
+  return Listen(sock_raw.ReleaseFd(), event_listener, task_runner, sock_family,
+                sock_type);
 }
 
 // static
 std::unique_ptr<UnixSocket> UnixSocket::Listen(ScopedFile fd,
                                                EventListener* event_listener,
                                                TaskRunner* task_runner,
+                                               SockFamily sock_family,
                                                SockType sock_type) {
   return std::unique_ptr<UnixSocket>(
       new UnixSocket(event_listener, task_runner, std::move(fd),
-                     State::kListening, sock_type));
+                     State::kListening, sock_family, sock_type));
 }
 
 // static
 std::unique_ptr<UnixSocket> UnixSocket::Connect(const std::string& socket_name,
                                                 EventListener* event_listener,
                                                 TaskRunner* task_runner,
+                                                SockFamily sock_family,
                                                 SockType sock_type) {
   std::unique_ptr<UnixSocket> sock(
-      new UnixSocket(event_listener, task_runner, sock_type));
+      new UnixSocket(event_listener, task_runner, sock_family, sock_type));
   sock->DoConnect(socket_name);
   return sock;
 }
@@ -406,25 +467,29 @@
     ScopedFile fd,
     EventListener* event_listener,
     TaskRunner* task_runner,
+    SockFamily sock_family,
     SockType sock_type) {
   return std::unique_ptr<UnixSocket>(
       new UnixSocket(event_listener, task_runner, std::move(fd),
-                     State::kConnected, sock_type));
+                     State::kConnected, sock_family, sock_type));
 }
 
 UnixSocket::UnixSocket(EventListener* event_listener,
                        TaskRunner* task_runner,
+                       SockFamily sock_family,
                        SockType sock_type)
     : UnixSocket(event_listener,
                  task_runner,
                  ScopedFile(),
                  State::kDisconnected,
+                 sock_family,
                  sock_type) {}
 
 UnixSocket::UnixSocket(EventListener* event_listener,
                        TaskRunner* task_runner,
                        ScopedFile adopt_fd,
                        State adopt_state,
+                       SockFamily sock_family,
                        SockType sock_type)
     : event_listener_(event_listener),
       task_runner_(task_runner),
@@ -432,14 +497,14 @@
   state_ = State::kDisconnected;
   if (adopt_state == State::kDisconnected) {
     PERFETTO_DCHECK(!adopt_fd);
-    sock_raw_ = UnixSocketRaw::CreateMayFail(sock_type);
+    sock_raw_ = UnixSocketRaw::CreateMayFail(sock_family, sock_type);
     if (!sock_raw_) {
       last_error_ = errno;
       return;
     }
   } else if (adopt_state == State::kConnected) {
     PERFETTO_DCHECK(adopt_fd);
-    sock_raw_ = UnixSocketRaw(std::move(adopt_fd), sock_type);
+    sock_raw_ = UnixSocketRaw(std::move(adopt_fd), sock_family, sock_type);
     state_ = State::kConnected;
     ReadPeerCredentials();
   } else if (adopt_state == State::kListening) {
@@ -451,7 +516,7 @@
       return;
     }
 
-    sock_raw_ = UnixSocketRaw(std::move(adopt_fd), sock_type);
+    sock_raw_ = UnixSocketRaw(std::move(adopt_fd), sock_family, sock_type);
     if (!sock_raw_.Listen()) {
       last_error_ = errno;
       PERFETTO_DPLOG("listen()");
@@ -468,6 +533,7 @@
   sock_raw_.SetBlocking(false);
 
   WeakPtr<UnixSocket> weak_ptr = weak_ptr_factory_.GetWeakPtr();
+
   task_runner_->AddFileDescriptorWatch(sock_raw_.fd(), [weak_ptr] {
     if (weak_ptr)
       weak_ptr->OnEvent();
@@ -522,6 +588,10 @@
 }
 
 void UnixSocket::ReadPeerCredentials() {
+  // Peer credentials are supported only on AF_UNIX sockets.
+  if (sock_raw_.family() != SockFamily::kUnix)
+    return;
+
 #if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
     PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
   struct ucred user_cred;
@@ -554,6 +624,7 @@
     socklen_t err_len = sizeof(sock_err);
     int res =
         getsockopt(sock_raw_.fd(), SOL_SOCKET, SO_ERROR, &sock_err, &err_len);
+
     if (res == 0 && sock_err == EINPROGRESS)
       return;  // Not connected yet, just a spurious FD watch wakeup.
     if (res == 0 && sock_err == 0) {
@@ -561,6 +632,7 @@
       state_ = State::kConnected;
       return event_listener_->OnConnect(this, true /* connected */);
     }
+    PERFETTO_DLOG("Connection error: %s", strerror(sock_err));
     last_error_ = sock_err;
     Shutdown(false);
     return event_listener_->OnConnect(this, false /* connected */);
@@ -571,15 +643,15 @@
     // There could be more than one incoming connection behind each FD watch
     // notification. Drain'em all.
     for (;;) {
-      sockaddr_un cli_addr = {};
+      struct sockaddr_in cli_addr {};
       socklen_t size = sizeof(cli_addr);
       ScopedFile new_fd(PERFETTO_EINTR(accept(
           sock_raw_.fd(), reinterpret_cast<sockaddr*>(&cli_addr), &size)));
       if (!new_fd)
         return;
-      std::unique_ptr<UnixSocket> new_sock(
-          new UnixSocket(event_listener_, task_runner_, std::move(new_fd),
-                         State::kConnected, sock_raw_.type()));
+      std::unique_ptr<UnixSocket> new_sock(new UnixSocket(
+          event_listener_, task_runner_, std::move(new_fd), State::kConnected,
+          sock_raw_.family(), sock_raw_.type()));
       event_listener_->OnNewIncomingConnection(this, std::move(new_sock));
     }
   }
diff --git a/src/base/unix_socket_unittest.cc b/src/base/unix_socket_unittest.cc
index 8f54cab..138f631 100644
--- a/src/base/unix_socket_unittest.cc
+++ b/src/base/unix_socket_unittest.cc
@@ -84,7 +84,8 @@
 };
 
 TEST_F(UnixSocketTest, ConnectionFailureIfUnreachable) {
-  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+                                 SockFamily::kUnix, SockType::kStream);
   ASSERT_FALSE(cli->is_connected());
   auto checkpoint = task_runner_.CreateCheckpoint("failure");
   EXPECT_CALL(event_listener_, OnConnect(cli.get(), false))
@@ -95,7 +96,8 @@
 // Both server and client should see an OnDisconnect() if the server drops
 // incoming connections immediately as they are created.
 TEST_F(UnixSocketTest, ConnectionImmediatelyDroppedByServer) {
-  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+                                SockFamily::kUnix, SockType::kStream);
   ASSERT_TRUE(srv->is_listening());
 
   // The server will immediately shutdown the connection upon
@@ -110,7 +112,8 @@
           }));
 
   auto checkpoint = task_runner_.CreateCheckpoint("cli_connected");
-  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+                                 SockFamily::kUnix, SockType::kStream);
   EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
       .WillOnce(InvokeWithoutArgs(checkpoint));
   task_runner_.RunUntilCheckpoint("cli_connected");
@@ -125,10 +128,12 @@
 }
 
 TEST_F(UnixSocketTest, ClientAndServerExchangeData) {
-  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+                                SockFamily::kUnix, SockType::kStream);
   ASSERT_TRUE(srv->is_listening());
 
-  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+                                 SockFamily::kUnix, SockType::kStream);
   EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
   auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
   auto srv_disconnected = task_runner_.CreateCheckpoint("srv_disconnected");
@@ -184,10 +189,12 @@
 constexpr char srv_str[] = "srv>cli";
 
 TEST_F(UnixSocketTest, ClientAndServerExchangeFDs) {
-  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+                                SockFamily::kUnix, SockType::kStream);
   ASSERT_TRUE(srv->is_listening());
 
-  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+                                 SockFamily::kUnix, SockType::kStream);
   EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
   auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
   auto srv_disconnected = task_runner_.CreateCheckpoint("srv_disconnected");
@@ -266,13 +273,16 @@
 }
 
 TEST_F(UnixSocketTest, ListenWithPassedFileDescriptor) {
-  auto sock_raw = UnixSocketRaw::CreateMayFail(SockType::kStream);
+  auto sock_raw =
+      UnixSocketRaw::CreateMayFail(SockFamily::kUnix, SockType::kStream);
   ASSERT_TRUE(sock_raw.Bind(kSocketName));
   auto fd = sock_raw.ReleaseFd();
-  auto srv = UnixSocket::Listen(std::move(fd), &event_listener_, &task_runner_);
+  auto srv = UnixSocket::Listen(std::move(fd), &event_listener_, &task_runner_,
+                                SockFamily::kUnix, SockType::kStream);
   ASSERT_TRUE(srv->is_listening());
 
-  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+                                 SockFamily::kUnix, SockType::kStream);
   EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
   auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
   auto srv_disconnected = task_runner_.CreateCheckpoint("srv_disconnected");
@@ -296,7 +306,8 @@
 // Mostly a stress tests. Connects kNumClients clients to the same server and
 // tests that all can exchange data and can see the expected sequence of events.
 TEST_F(UnixSocketTest, SeveralClients) {
-  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+                                SockFamily::kUnix, SockType::kStream);
   ASSERT_TRUE(srv->is_listening());
   constexpr size_t kNumClients = 32;
   std::unique_ptr<UnixSocket> cli[kNumClients];
@@ -312,7 +323,8 @@
       }));
 
   for (size_t i = 0; i < kNumClients; i++) {
-    cli[i] = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+    cli[i] = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+                                 SockFamily::kUnix, SockType::kStream);
     EXPECT_CALL(event_listener_, OnConnect(cli[i].get(), true))
         .WillOnce(Invoke([](UnixSocket* s, bool success) {
           ASSERT_TRUE(success);
@@ -352,7 +364,8 @@
     ASSERT_NE(nullptr, mem);
     memcpy(mem, "shm rocks", 10);
 
-    auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+    auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+                                  SockFamily::kUnix, SockType::kStream);
     ASSERT_TRUE(srv->is_listening());
     // Signal the other process that it can connect.
     ASSERT_EQ(1, base::WriteAll(*pipe.wr, ".", 1));
@@ -377,8 +390,8 @@
     char sync_cmd = '\0';
     ASSERT_EQ(1, PERFETTO_EINTR(read(*pipe.rd, &sync_cmd, 1)));
     ASSERT_EQ('.', sync_cmd);
-    auto cli =
-        UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+    auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+                                   SockFamily::kUnix, SockType::kStream);
     EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
     auto checkpoint = task_runner_.CreateCheckpoint("change_seen_by_client");
     EXPECT_CALL(event_listener_, OnDataAvailable(cli.get()))
@@ -412,7 +425,8 @@
 // layer needs to rely on this to validate messages received immediately before
 // a client disconnects.
 TEST_F(UnixSocketTest, PeerCredentialsRetainedAfterDisconnect) {
-  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+                                SockFamily::kUnix, SockType::kStream);
   ASSERT_TRUE(srv->is_listening());
   UnixSocket* srv_client_conn = nullptr;
   auto srv_connected = task_runner_.CreateCheckpoint("srv_connected");
@@ -428,7 +442,8 @@
             srv_connected();
           }));
   auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
-  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+                                 SockFamily::kUnix, SockType::kStream);
   EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
       .WillOnce(InvokeWithoutArgs(cli_connected));
 
@@ -458,7 +473,8 @@
 }
 
 TEST_F(UnixSocketTest, BlockingSend) {
-  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+                                SockFamily::kUnix, SockType::kStream);
   ASSERT_TRUE(srv->is_listening());
 
   auto all_frames_done = task_runner_.CreateCheckpoint("all_frames_done");
@@ -485,7 +501,8 @@
   std::thread tx_thread([] {
     TestTaskRunner tx_task_runner;
     MockEventListener tx_events;
-    auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner);
+    auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner,
+                                   SockFamily::kUnix, SockType::kStream);
 
     auto cli_connected = tx_task_runner.CreateCheckpoint("cli_connected");
     EXPECT_CALL(tx_events, OnConnect(cli.get(), true))
@@ -510,7 +527,8 @@
 // sender is in the middle of a large send(), the socket should gracefully give
 // up (i.e. Shutdown()) but not crash.
 TEST_F(UnixSocketTest, ReceiverDisconnectsDuringSend) {
-  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+                                SockFamily::kUnix, SockType::kStream);
   ASSERT_TRUE(srv->is_listening());
   const int kTimeoutMs = 30000;
 
@@ -531,7 +549,8 @@
   std::thread tx_thread([] {
     TestTaskRunner tx_task_runner;
     MockEventListener tx_events;
-    auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner);
+    auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner,
+                                   SockFamily::kUnix, SockType::kStream);
 
     auto cli_connected = tx_task_runner.CreateCheckpoint("cli_connected");
     EXPECT_CALL(tx_events, OnConnect(cli.get(), true))
@@ -644,7 +663,8 @@
 TEST_F(UnixSocketTest, PartialSendMsgAll) {
   UnixSocketRaw send_sock;
   UnixSocketRaw recv_sock;
-  std::tie(send_sock, recv_sock) = UnixSocketRaw::CreatePair(SockType::kStream);
+  std::tie(send_sock, recv_sock) =
+      UnixSocketRaw::CreatePair(SockFamily::kUnix, SockType::kStream);
   ASSERT_TRUE(send_sock);
   ASSERT_TRUE(recv_sock);
 
@@ -717,7 +737,8 @@
 }
 
 TEST_F(UnixSocketTest, ReleaseSocket) {
-  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+                                SockFamily::kUnix, SockType::kStream);
   ASSERT_TRUE(srv->is_listening());
   auto connected = task_runner_.CreateCheckpoint("connected");
   UnixSocket* peer = nullptr;
@@ -727,7 +748,8 @@
         connected();
       }));
 
-  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+                                 SockFamily::kUnix, SockType::kStream);
   EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
   task_runner_.RunUntilCheckpoint("connected");
   srv->Shutdown(true);
@@ -747,6 +769,57 @@
   ASSERT_STREQ(buf, "test");
 }
 
+TEST_F(UnixSocketTest, TcpStream) {
+  // Smoke test for SockFamily::kInet: a loopback server greets every client.
+  char host_and_port[32];
+  int attempt = 0;
+  std::unique_ptr<UnixSocket> srv;
+  // Try listening on a random port. Some ports might be taken by other system
+  // services, in which case Listen() returns nullptr. Retry on other ports.
+  do {
+    snprintf(host_and_port, sizeof(host_and_port), "127.0.0.1:%d",
+             10000 + (rand() % 10000));
+    srv = UnixSocket::Listen(host_and_port, &event_listener_, &task_runner_,
+                             SockFamily::kInet, SockType::kStream);
+  } while ((!srv || !srv->is_listening()) && attempt++ < 10);
+  ASSERT_TRUE(srv && srv->is_listening());
+
+  constexpr size_t kNumClients = 3;
+  std::unique_ptr<UnixSocket> cli[kNumClients];
+  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+      .Times(kNumClients)
+      .WillRepeatedly(Invoke([&](UnixSocket*, UnixSocket* s) {
+        // OnDisconnect() might spuriously happen depending on the dtor order.
+        EXPECT_CALL(event_listener_, OnDisconnect(s)).Times(AtLeast(0));
+        EXPECT_CALL(event_listener_, OnDataAvailable(s))
+            .WillRepeatedly(Invoke([](UnixSocket* cli_sock) {
+              cli_sock->ReceiveString();  // Read connection EOF.
+            }));
+        ASSERT_TRUE(s->Send("welcome", kBlocking));
+      }));
+
+  for (size_t i = 0; i < kNumClients; i++) {
+    cli[i] = UnixSocket::Connect(host_and_port, &event_listener_, &task_runner_,
+                                 SockFamily::kInet, SockType::kStream);
+    auto checkpoint = task_runner_.CreateCheckpoint(std::to_string(i));
+    EXPECT_CALL(event_listener_, OnDisconnect(cli[i].get())).Times(AtLeast(0));
+    EXPECT_CALL(event_listener_, OnConnect(cli[i].get(), true));
+    EXPECT_CALL(event_listener_, OnDataAvailable(cli[i].get()))
+        .WillRepeatedly(Invoke([checkpoint](UnixSocket* s) {
+          auto str = s->ReceiveString();
+          if (str == "")
+            return;  // Connection EOF.
+          ASSERT_EQ("welcome", str);
+          checkpoint();
+        }));
+  }
+
+  for (size_t i = 0; i < kNumClients; i++) {
+    task_runner_.RunUntilCheckpoint(std::to_string(i));
+    ASSERT_TRUE(Mock::VerifyAndClearExpectations(cli[i].get()));
+  }
+}
+
 // TODO(primiano): add a test to check that in the case of a peer sending a fd
 // and the other end just doing a recv (without taking it), the fd is closed and
 // not left around.
diff --git a/src/ipc/client_impl.cc b/src/ipc/client_impl.cc
index c3bd830..b563d95 100644
--- a/src/ipc/client_impl.cc
+++ b/src/ipc/client_impl.cc
@@ -43,7 +43,9 @@
 
 ClientImpl::ClientImpl(const char* socket_name, base::TaskRunner* task_runner)
     : task_runner_(task_runner), weak_ptr_factory_(this) {
-  sock_ = base::UnixSocket::Connect(socket_name, this, task_runner);
+  sock_ = base::UnixSocket::Connect(socket_name, this, task_runner,
+                                    base::SockFamily::kUnix,
+                                    base::SockType::kStream);
 }
 
 ClientImpl::~ClientImpl() {
diff --git a/src/ipc/client_impl_unittest.cc b/src/ipc/client_impl_unittest.cc
index 84a9dd1..e8e634e 100644
--- a/src/ipc/client_impl_unittest.cc
+++ b/src/ipc/client_impl_unittest.cc
@@ -103,7 +103,9 @@
 
   explicit FakeHost(base::TaskRunner* task_runner) {
     DESTROY_TEST_SOCK(kSockName);
-    listening_sock = base::UnixSocket::Listen(kSockName, this, task_runner);
+    listening_sock = base::UnixSocket::Listen(kSockName, this, task_runner,
+                                              base::SockFamily::kUnix,
+                                              base::SockType::kStream);
     EXPECT_TRUE(listening_sock->is_listening());
   }
   ~FakeHost() override { DESTROY_TEST_SOCK(kSockName); }
diff --git a/src/ipc/host_impl.cc b/src/ipc/host_impl.cc
index 7a7d95c..0a56b85 100644
--- a/src/ipc/host_impl.cc
+++ b/src/ipc/host_impl.cc
@@ -55,13 +55,17 @@
 HostImpl::HostImpl(base::ScopedFile socket_fd, base::TaskRunner* task_runner)
     : task_runner_(task_runner), weak_ptr_factory_(this) {
   PERFETTO_DCHECK_THREAD(thread_checker_);
-  sock_ = base::UnixSocket::Listen(std::move(socket_fd), this, task_runner_);
+  sock_ = base::UnixSocket::Listen(std::move(socket_fd), this, task_runner_,
+                                   base::SockFamily::kUnix,
+                                   base::SockType::kStream);
 }
 
 HostImpl::HostImpl(const char* socket_name, base::TaskRunner* task_runner)
     : task_runner_(task_runner), weak_ptr_factory_(this) {
   PERFETTO_DCHECK_THREAD(thread_checker_);
-  sock_ = base::UnixSocket::Listen(socket_name, this, task_runner_);
+  sock_ = base::UnixSocket::Listen(socket_name, this, task_runner_,
+                                   base::SockFamily::kUnix,
+                                   base::SockType::kStream);
 }
 
 HostImpl::~HostImpl() = default;
diff --git a/src/ipc/host_impl_unittest.cc b/src/ipc/host_impl_unittest.cc
index 70215ff..0624cd7 100644
--- a/src/ipc/host_impl_unittest.cc
+++ b/src/ipc/host_impl_unittest.cc
@@ -88,7 +88,9 @@
   MOCK_METHOD0(OnRequestError, void());
 
   explicit FakeClient(base::TaskRunner* task_runner) {
-    sock_ = base::UnixSocket::Connect(kSockName, this, task_runner);
+    sock_ = base::UnixSocket::Connect(kSockName, this, task_runner,
+                                      base::SockFamily::kUnix,
+                                      base::SockType::kStream);
   }
 
   ~FakeClient() override = default;
@@ -334,15 +336,15 @@
                                                sizeof(kFileContent))),
             sizeof(kFileContent));
   EXPECT_CALL(*fake_service, OnFakeMethod1(_, _))
-      .WillOnce(Invoke([on_reply_sent, &tx_file](const RequestProto&,
-                                                 DeferredBase* reply) {
-        std::unique_ptr<ReplyProto> reply_args(new ReplyProto());
-        auto async_res = AsyncResult<ProtoMessage>(
-            std::unique_ptr<ProtoMessage>(reply_args.release()));
-        async_res.set_fd(tx_file.fd());
-        reply->Resolve(std::move(async_res));
-        on_reply_sent();
-      }));
+      .WillOnce(Invoke(
+          [on_reply_sent, &tx_file](const RequestProto&, DeferredBase* reply) {
+            std::unique_ptr<ReplyProto> reply_args(new ReplyProto());
+            auto async_res = AsyncResult<ProtoMessage>(
+                std::unique_ptr<ProtoMessage>(reply_args.release()));
+            async_res.set_fd(tx_file.fd());
+            reply->Resolve(std::move(async_res));
+            on_reply_sent();
+          }));
   task_runner_->RunUntilCheckpoint("on_reply_sent");
   tx_file.ReleaseFD();
 
diff --git a/src/profiling/memory/client.cc b/src/profiling/memory/client.cc
index e3a0e7d..8065733 100644
--- a/src/profiling/memory/client.cc
+++ b/src/profiling/memory/client.cc
@@ -101,7 +101,8 @@
 // static
 base::Optional<base::UnixSocketRaw> Client::ConnectToHeapprofd(
     const std::string& sock_name) {
-  auto sock = base::UnixSocketRaw::CreateMayFail(base::SockType::kStream);
+  auto sock = base::UnixSocketRaw::CreateMayFail(base::SockFamily::kUnix,
+                                                 base::SockType::kStream);
   if (!sock || !sock.Connect(sock_name)) {
     PERFETTO_PLOG("Failed to connect to %s", sock_name.c_str());
     return base::nullopt;
diff --git a/src/profiling/memory/heapprofd_producer.cc b/src/profiling/memory/heapprofd_producer.cc
index d1024cc..c10adc3 100644
--- a/src/profiling/memory/heapprofd_producer.cc
+++ b/src/profiling/memory/heapprofd_producer.cc
@@ -137,7 +137,7 @@
   PERFETTO_DCHECK(mode_ == HeapprofdMode::kChild);
   auto socket = base::UnixSocket::AdoptConnected(
       std::move(inherited_fd_), &socket_delegate_, task_runner_,
-      base::SockType::kStream);
+      base::SockFamily::kUnix, base::SockType::kStream);
 
   HandleClientConnection(std::move(socket), target_process_);
 }
@@ -610,7 +610,6 @@
   }
   DataSource& data_source = it->second;
 
-
   for (std::pair<const pid_t, ProcessState>& pid_and_process_state :
        data_source.process_states) {
     pid_t pid = pid_and_process_state.first;
diff --git a/src/profiling/memory/main.cc b/src/profiling/memory/main.cc
index 7df39e7..ff5ac97 100644
--- a/src/profiling/memory/main.cc
+++ b/src/profiling/memory/main.cc
@@ -159,9 +159,9 @@
   HeapprofdProducer producer(HeapprofdMode::kCentral, &task_runner);
 
   int listening_raw_socket = GetListeningSocket();
-  auto listening_socket =
-      base::UnixSocket::Listen(base::ScopedFile(listening_raw_socket),
-                               &producer.socket_delegate(), &task_runner);
+  auto listening_socket = base::UnixSocket::Listen(
+      base::ScopedFile(listening_raw_socket), &producer.socket_delegate(),
+      &task_runner, base::SockFamily::kUnix, base::SockType::kStream);
 
   struct sigaction action = {};
   action.sa_handler = [](int) { g_dump_evt->Notify(); };
diff --git a/src/profiling/memory/malloc_hooks.cc b/src/profiling/memory/malloc_hooks.cc
index 2eb4cd4..f696e58 100644
--- a/src/profiling/memory/malloc_hooks.cc
+++ b/src/profiling/memory/malloc_hooks.cc
@@ -256,7 +256,7 @@
   perfetto::base::UnixSocketRaw parent_sock;
   perfetto::base::UnixSocketRaw child_sock;
   std::tie(parent_sock, child_sock) = perfetto::base::UnixSocketRaw::CreatePair(
-      perfetto::base::SockType::kStream);
+      perfetto::base::SockFamily::kUnix, perfetto::base::SockType::kStream);
 
   if (!parent_sock || !child_sock) {
     PERFETTO_PLOG("Failed to create socketpair.");
diff --git a/src/profiling/memory/unwinding.cc b/src/profiling/memory/unwinding.cc
index 59a4dca..ced87f9 100644
--- a/src/profiling/memory/unwinding.cc
+++ b/src/profiling/memory/unwinding.cc
@@ -374,7 +374,7 @@
 void UnwindingWorker::HandleHandoffSocket(HandoffData handoff_data) {
   auto sock = base::UnixSocket::AdoptConnected(
       handoff_data.sock.ReleaseFd(), this, this->thread_task_runner_.get(),
-      base::SockType::kStream);
+      base::SockFamily::kUnix, base::SockType::kStream);
   pid_t peer_pid = sock->peer_pid();
 
   UnwindingMetadata metadata(peer_pid, std::move(handoff_data.maps_fd),
diff --git a/src/profiling/memory/wire_protocol_unittest.cc b/src/profiling/memory/wire_protocol_unittest.cc
index a8d623e..462ea06 100644
--- a/src/profiling/memory/wire_protocol_unittest.cc
+++ b/src/profiling/memory/wire_protocol_unittest.cc
@@ -56,8 +56,10 @@
   int sv[2];
   PERFETTO_CHECK(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
   base::UnixSocketRaw send_sock(base::ScopedFile(sv[0]),
+                                base::SockFamily::kUnix,
                                 base::SockType::kStream);
   base::UnixSocketRaw recv_sock(base::ScopedFile(sv[1]),
+                                base::SockFamily::kUnix,
                                 base::SockType::kStream);
   char msg[] = "a";
   PERFETTO_CHECK(send_sock.Send(msg, sizeof(msg), &fd, 1));
diff --git a/src/traced/probes/android_log/android_log_data_source.cc b/src/traced/probes/android_log/android_log_data_source.cc
index 73e84db..c49bec6 100644
--- a/src/traced/probes/android_log/android_log_data_source.cc
+++ b/src/traced/probes/android_log/android_log_data_source.cc
@@ -148,7 +148,8 @@
 }
 
 base::UnixSocketRaw AndroidLogDataSource::ConnectLogdrSocket() {
-  auto socket = base::UnixSocketRaw::CreateMayFail(base::SockType::kSeqPacket);
+  auto socket = base::UnixSocketRaw::CreateMayFail(base::SockFamily::kUnix,
+                                                   base::SockType::kSeqPacket);
   if (!socket || !socket.Connect(kLogdrSocket)) {
     PERFETTO_PLOG("Failed to connect to %s", kLogdrSocket);
     return base::UnixSocketRaw();
diff --git a/src/traced/probes/android_log/android_log_data_source_unittest.cc b/src/traced/probes/android_log/android_log_data_source_unittest.cc
index ee38ced..1c90204 100644
--- a/src/traced/probes/android_log/android_log_data_source_unittest.cc
+++ b/src/traced/probes/android_log/android_log_data_source_unittest.cc
@@ -62,8 +62,8 @@
     base::UnixSocketRaw recv_sock;
     // In theory this should be a kSeqPacket. We use kDgram here so that the
     // test can run also on MacOS (which doesn't support SOCK_SEQPACKET).
-    const auto kSockType = base::SockType::kDgram;
-    std::tie(send_sock, recv_sock) = base::UnixSocketRaw::CreatePair(kSockType);
+    std::tie(send_sock, recv_sock) = base::UnixSocketRaw::CreatePair(
+        base::SockFamily::kUnix, base::SockType::kDgram);
     ASSERT_TRUE(send_sock);
     ASSERT_TRUE(recv_sock);
 
diff --git a/test/producer_socket_fuzzer.cc b/test/producer_socket_fuzzer.cc
index 430b2de..ac7227f 100644
--- a/test/producer_socket_fuzzer.cc
+++ b/test/producer_socket_fuzzer.cc
@@ -63,7 +63,8 @@
       data, size, task_runner.CreateCheckpoint("data_sent"));
 
   std::unique_ptr<base::UnixSocket> sock = base::UnixSocket::Connect(
-      helper.GetProducerSocketName(), &fake_event_listener, &task_runner);
+      helper.GetProducerSocketName(), &fake_event_listener, &task_runner,
+      base::SockFamily::kUnix, base::SockType::kStream);
 
   task_runner.RunUntilCheckpoint("data_sent");
   return 0;