Move unix socket to base.

Change-Id: Ic84986efb90ee95a45abc4558fa472f890acd885
diff --git a/src/base/BUILD.gn b/src/base/BUILD.gn
index 68939a9..019cd7b 100644
--- a/src/base/BUILD.gn
+++ b/src/base/BUILD.gn
@@ -88,13 +88,13 @@
 
 if (!build_with_chromium) {
   # This cannot be in :base as it does not build on WASM.
-  source_set("sock_utils") {
+  source_set("unix_socket") {
     deps = [
       "../../gn:default_deps",
       "../../include/perfetto/base",
     ]
     sources = [
-      "sock_utils.cc",
+      "unix_socket.cc",
     ]
   }
 }
@@ -153,7 +153,10 @@
       "utils_unittest.cc",
     ]
   }
-  if (!build_with_chromium && (is_linux || is_android)) {
-    sources += [ "watchdog_unittest.cc" ]
+  if (!build_with_chromium) {
+    sources += [ "unix_socket_unittest.cc" ]
+    if (is_linux || is_android) {
+      sources += [ "watchdog_unittest.cc" ]
+    }
   }
 }
diff --git a/src/base/sock_utils.cc b/src/base/sock_utils.cc
deleted file mode 100644
index bbe00fd..0000000
--- a/src/base/sock_utils.cc
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Copyright (C) 2018 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "perfetto/base/sock_utils.h"
-
-#include <sys/socket.h>
-#include <sys/un.h>
-
-namespace perfetto {
-namespace base {
-namespace {
-// MSG_NOSIGNAL is not supported on Mac OS X, but in that case the socket is
-// created with SO_NOSIGPIPE (See InitializeSocket()).
-#if PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
-constexpr int kNoSigPipe = 0;
-#else
-constexpr int kNoSigPipe = MSG_NOSIGNAL;
-#endif
-
-// Android takes an int instead of socklen_t for the control buffer size.
-#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
-using CBufLenType = size_t;
-#else
-using CBufLenType = socklen_t;
-#endif
-}
-
-// The CMSG_* macros use NULL instead of nullptr.
-#pragma GCC diagnostic push
-#if !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
-#pragma GCC diagnostic ignored "-Wzero-as-null-pointer-constant"
-#endif
-
-ssize_t Send(int fd,
-             const void* msg,
-             size_t len,
-             const int* send_fds,
-             size_t num_fds) {
-  msghdr msg_hdr = {};
-  iovec iov = {const_cast<void*>(msg), len};
-  msg_hdr.msg_iov = &iov;
-  msg_hdr.msg_iovlen = 1;
-  alignas(cmsghdr) char control_buf[256];
-
-  if (num_fds > 0) {
-    const CBufLenType control_buf_len =
-        static_cast<CBufLenType>(CMSG_SPACE(num_fds * sizeof(int)));
-    PERFETTO_CHECK(control_buf_len <= sizeof(control_buf));
-    memset(control_buf, 0, sizeof(control_buf));
-    msg_hdr.msg_control = control_buf;
-    msg_hdr.msg_controllen = control_buf_len;
-    struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg_hdr);
-    cmsg->cmsg_level = SOL_SOCKET;
-    cmsg->cmsg_type = SCM_RIGHTS;
-    cmsg->cmsg_len = static_cast<CBufLenType>(CMSG_LEN(num_fds * sizeof(int)));
-    memcpy(CMSG_DATA(cmsg), send_fds, num_fds * sizeof(int));
-    msg_hdr.msg_controllen = cmsg->cmsg_len;
-  }
-
-  return PERFETTO_EINTR(sendmsg(fd, &msg_hdr, kNoSigPipe));
-}
-
-ssize_t Receive(int fd,
-                void* msg,
-                size_t len,
-                base::ScopedFile* fd_vec,
-                size_t max_files) {
-  msghdr msg_hdr = {};
-  iovec iov = {msg, len};
-  msg_hdr.msg_iov = &iov;
-  msg_hdr.msg_iovlen = 1;
-  alignas(cmsghdr) char control_buf[256];
-
-  if (max_files > 0) {
-    msg_hdr.msg_control = control_buf;
-    msg_hdr.msg_controllen =
-        static_cast<CBufLenType>(CMSG_SPACE(max_files * sizeof(int)));
-    PERFETTO_CHECK(msg_hdr.msg_controllen <= sizeof(control_buf));
-  }
-  const ssize_t sz = PERFETTO_EINTR(recvmsg(fd, &msg_hdr, kNoSigPipe));
-  if (sz <= 0) {
-    return sz;
-  }
-  PERFETTO_CHECK(static_cast<size_t>(sz) <= len);
-
-  int* fds = nullptr;
-  uint32_t fds_len = 0;
-
-  if (max_files > 0) {
-    for (cmsghdr* cmsg = CMSG_FIRSTHDR(&msg_hdr); cmsg;
-         cmsg = CMSG_NXTHDR(&msg_hdr, cmsg)) {
-      const size_t payload_len = cmsg->cmsg_len - CMSG_LEN(0);
-      if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
-        PERFETTO_DCHECK(payload_len % sizeof(int) == 0u);
-        PERFETTO_CHECK(fds == nullptr);
-        fds = reinterpret_cast<int*>(CMSG_DATA(cmsg));
-        fds_len = static_cast<uint32_t>(payload_len / sizeof(int));
-      }
-    }
-  }
-
-  if (msg_hdr.msg_flags & MSG_TRUNC || msg_hdr.msg_flags & MSG_CTRUNC) {
-    for (size_t i = 0; fds && i < fds_len; ++i)
-      close(fds[i]);
-    errno = EMSGSIZE;
-    return -1;
-  }
-
-  for (size_t i = 0; fds && i < fds_len; ++i) {
-    if (i < max_files)
-      fd_vec[i].reset(fds[i]);
-    else
-      close(fds[i]);
-  }
-
-  return sz;
-}
-
-#pragma GCC diagnostic pop
-
-bool MakeSockAddr(const std::string& socket_name,
-                  sockaddr_un* addr,
-                  socklen_t* addr_size) {
-  memset(addr, 0, sizeof(*addr));
-  const size_t name_len = socket_name.size();
-  if (name_len >= sizeof(addr->sun_path)) {
-    errno = ENAMETOOLONG;
-    return false;
-  }
-  memcpy(addr->sun_path, socket_name.data(), name_len);
-  if (addr->sun_path[0] == '@')
-    addr->sun_path[0] = '\0';
-  addr->sun_family = AF_UNIX;
-  *addr_size = static_cast<socklen_t>(
-      __builtin_offsetof(sockaddr_un, sun_path) + name_len + 1);
-  return true;
-}
-
-base::ScopedFile CreateSocket() {
-  return base::ScopedFile(socket(AF_UNIX, SOCK_STREAM, 0));
-}
-
-}  // namespace base
-}  // namespace perfetto
diff --git a/src/base/unix_socket.cc b/src/base/unix_socket.cc
new file mode 100644
index 0000000..e3ed5f2
--- /dev/null
+++ b/src/base/unix_socket.cc
@@ -0,0 +1,543 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/base/unix_socket.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+#include <algorithm>
+#include <memory>
+
+#include "perfetto/base/build_config.h"
+#include "perfetto/base/logging.h"
+#include "perfetto/base/task_runner.h"
+#include "perfetto/base/utils.h"
+
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
+#include <sys/ucred.h>
+#endif
+
+namespace perfetto {
+namespace base {
+
+namespace {
+// MSG_NOSIGNAL is not supported on Mac OS X, but in that case the socket is
+// configured with SO_NOSIGPIPE (see the UnixSocket constructor).
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
+constexpr int kNoSigPipe = 0;
+#else
+constexpr int kNoSigPipe = MSG_NOSIGNAL;
+#endif
+
+// Android takes a size_t instead of socklen_t for the control buffer size.
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+using CBufLenType = size_t;
+#else
+using CBufLenType = socklen_t;
+#endif
+}
+
+// The CMSG_* macros use NULL instead of nullptr.
+#pragma GCC diagnostic push
+#if !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
+#pragma GCC diagnostic ignored "-Wzero-as-null-pointer-constant"
+#endif
+
+ssize_t SockSend(int fd,
+                 const void* msg,
+                 size_t len,
+                 const int* send_fds,
+                 size_t num_fds) {
+  msghdr msg_hdr = {};
+  iovec iov = {const_cast<void*>(msg), len};
+  msg_hdr.msg_iov = &iov;
+  msg_hdr.msg_iovlen = 1;
+  alignas(cmsghdr) char control_buf[256];
+
+  if (num_fds > 0) {
+    const CBufLenType control_buf_len =
+        static_cast<CBufLenType>(CMSG_SPACE(num_fds * sizeof(int)));
+    PERFETTO_CHECK(control_buf_len <= sizeof(control_buf));
+    memset(control_buf, 0, sizeof(control_buf));
+    msg_hdr.msg_control = control_buf;
+    msg_hdr.msg_controllen = control_buf_len;
+    struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg_hdr);
+    cmsg->cmsg_level = SOL_SOCKET;
+    cmsg->cmsg_type = SCM_RIGHTS;
+    cmsg->cmsg_len = static_cast<CBufLenType>(CMSG_LEN(num_fds * sizeof(int)));
+    memcpy(CMSG_DATA(cmsg), send_fds, num_fds * sizeof(int));
+    msg_hdr.msg_controllen = cmsg->cmsg_len;
+  }
+
+  return PERFETTO_EINTR(sendmsg(fd, &msg_hdr, kNoSigPipe));
+}
+
+ssize_t SockReceive(int fd,
+                    void* msg,
+                    size_t len,
+                    ScopedFile* fd_vec,
+                    size_t max_files) {
+  msghdr msg_hdr = {};
+  iovec iov = {msg, len};
+  msg_hdr.msg_iov = &iov;
+  msg_hdr.msg_iovlen = 1;
+  alignas(cmsghdr) char control_buf[256];
+
+  if (max_files > 0) {
+    msg_hdr.msg_control = control_buf;
+    msg_hdr.msg_controllen =
+        static_cast<CBufLenType>(CMSG_SPACE(max_files * sizeof(int)));
+    PERFETTO_CHECK(msg_hdr.msg_controllen <= sizeof(control_buf));
+  }
+  const ssize_t sz = PERFETTO_EINTR(recvmsg(fd, &msg_hdr, kNoSigPipe));
+  if (sz <= 0) {
+    return sz;
+  }
+  PERFETTO_CHECK(static_cast<size_t>(sz) <= len);
+
+  int* fds = nullptr;
+  uint32_t fds_len = 0;
+
+  if (max_files > 0) {
+    for (cmsghdr* cmsg = CMSG_FIRSTHDR(&msg_hdr); cmsg;
+         cmsg = CMSG_NXTHDR(&msg_hdr, cmsg)) {
+      const size_t payload_len = cmsg->cmsg_len - CMSG_LEN(0);
+      if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
+        PERFETTO_DCHECK(payload_len % sizeof(int) == 0u);
+        PERFETTO_CHECK(fds == nullptr);
+        fds = reinterpret_cast<int*>(CMSG_DATA(cmsg));
+        fds_len = static_cast<uint32_t>(payload_len / sizeof(int));
+      }
+    }
+  }
+
+  if (msg_hdr.msg_flags & MSG_TRUNC || msg_hdr.msg_flags & MSG_CTRUNC) {
+    for (size_t i = 0; fds && i < fds_len; ++i)
+      close(fds[i]);
+    errno = EMSGSIZE;
+    return -1;
+  }
+
+  for (size_t i = 0; fds && i < fds_len; ++i) {
+    if (i < max_files)
+      fd_vec[i].reset(fds[i]);
+    else
+      close(fds[i]);
+  }
+
+  return sz;
+}
+
+#pragma GCC diagnostic pop
+
+bool MakeSockAddr(const std::string& socket_name,
+                  sockaddr_un* addr,
+                  socklen_t* addr_size) {
+  memset(addr, 0, sizeof(*addr));
+  const size_t name_len = socket_name.size();
+  if (name_len >= sizeof(addr->sun_path)) {
+    errno = ENAMETOOLONG;
+    return false;
+  }
+  memcpy(addr->sun_path, socket_name.data(), name_len);
+  if (addr->sun_path[0] == '@')
+    addr->sun_path[0] = '\0';
+  addr->sun_family = AF_UNIX;
+  *addr_size = static_cast<socklen_t>(
+      __builtin_offsetof(sockaddr_un, sun_path) + name_len + 1);
+  return true;
+}
+
+ScopedFile CreateSocket() {
+  return ScopedFile(socket(AF_UNIX, SOCK_STREAM, 0));
+}
+
+// TODO(primiano): Add ThreadChecker to methods of this class.
+
+// static
+ScopedFile UnixSocket::CreateAndBind(const std::string& socket_name) {
+  ScopedFile fd = CreateSocket();
+  if (!fd)
+    return fd;
+
+  sockaddr_un addr;
+  socklen_t addr_size;
+  if (!MakeSockAddr(socket_name, &addr, &addr_size)) {
+    return ScopedFile();
+  }
+
+  if (bind(*fd, reinterpret_cast<sockaddr*>(&addr), addr_size)) {
+    PERFETTO_DPLOG("bind()");
+    return ScopedFile();
+  }
+
+  return fd;
+}
+
+// static
+std::unique_ptr<UnixSocket> UnixSocket::Listen(const std::string& socket_name,
+                                               EventListener* event_listener,
+                                               TaskRunner* task_runner) {
+  // Forward the call to the Listen() overload below.
+  return Listen(CreateAndBind(socket_name), event_listener, task_runner);
+}
+
+// static
+std::unique_ptr<UnixSocket> UnixSocket::Listen(ScopedFile socket_fd,
+                                               EventListener* event_listener,
+                                               TaskRunner* task_runner) {
+  std::unique_ptr<UnixSocket> sock(new UnixSocket(
+      event_listener, task_runner, std::move(socket_fd), State::kListening));
+  return sock;
+}
+
+// static
+std::unique_ptr<UnixSocket> UnixSocket::Connect(const std::string& socket_name,
+                                                EventListener* event_listener,
+                                                TaskRunner* task_runner) {
+  std::unique_ptr<UnixSocket> sock(new UnixSocket(event_listener, task_runner));
+  sock->DoConnect(socket_name);
+  return sock;
+}
+
+UnixSocket::UnixSocket(EventListener* event_listener, TaskRunner* task_runner)
+    : UnixSocket(event_listener,
+                 task_runner,
+                 ScopedFile(),
+                 State::kDisconnected) {}
+
+UnixSocket::UnixSocket(EventListener* event_listener,
+                       TaskRunner* task_runner,
+                       ScopedFile adopt_fd,
+                       State adopt_state)
+    : event_listener_(event_listener),
+      task_runner_(task_runner),
+      weak_ptr_factory_(this) {
+  state_ = State::kDisconnected;
+  if (adopt_state == State::kDisconnected) {
+    // We get here from the two-argument ctor, which Connect() uses.
+    PERFETTO_DCHECK(!adopt_fd);
+    fd_ = CreateSocket();
+    if (!fd_) {
+      last_error_ = errno;
+      return;
+    }
+  } else if (adopt_state == State::kConnected) {
+    // We get here from OnNewIncomingConnection().
+    PERFETTO_DCHECK(adopt_fd);
+    fd_ = std::move(adopt_fd);
+    state_ = State::kConnected;
+    ReadPeerCredentials();
+  } else if (adopt_state == State::kListening) {
+    // We get here from Listen().
+
+    // |adopt_fd| might genuinely be invalid if the bind() failed.
+    if (!adopt_fd) {
+      last_error_ = errno;
+      return;
+    }
+
+    fd_ = std::move(adopt_fd);
+    if (listen(*fd_, SOMAXCONN)) {
+      last_error_ = errno;
+      PERFETTO_DPLOG("listen()");
+      return;
+    }
+    state_ = State::kListening;
+  } else {
+    PERFETTO_CHECK(false);  // Unfeasible.
+  }
+
+  PERFETTO_DCHECK(fd_);
+  last_error_ = 0;
+
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
+  const int no_sigpipe = 1;
+  setsockopt(*fd_, SOL_SOCKET, SO_NOSIGPIPE, &no_sigpipe, sizeof(no_sigpipe));
+#endif
+  // There is no reason why a socket should outlive the process in case of
+  // exec() by default, this is just working around a broken unix design.
+  int fcntl_res = fcntl(*fd_, F_SETFD, FD_CLOEXEC);
+  PERFETTO_CHECK(fcntl_res == 0);
+
+  SetBlockingIO(false);
+
+  WeakPtr<UnixSocket> weak_ptr = weak_ptr_factory_.GetWeakPtr();
+  task_runner_->AddFileDescriptorWatch(*fd_, [weak_ptr]() {
+    if (weak_ptr)
+      weak_ptr->OnEvent();
+  });
+}
+
+UnixSocket::~UnixSocket() {
+  // The implicit dtor of |weak_ptr_factory_| will no-op pending callbacks.
+  Shutdown(true);
+}
+
+// Called only by the Connect() static constructor.
+void UnixSocket::DoConnect(const std::string& socket_name) {
+  PERFETTO_DCHECK(state_ == State::kDisconnected);
+
+  // This is the only thing that can gracefully fail in the ctor.
+  if (!fd_)
+    return NotifyConnectionState(false);
+
+  sockaddr_un addr;
+  socklen_t addr_size;
+  if (!MakeSockAddr(socket_name, &addr, &addr_size)) {
+    last_error_ = errno;
+    return NotifyConnectionState(false);
+  }
+
+  int res = PERFETTO_EINTR(
+      connect(*fd_, reinterpret_cast<sockaddr*>(&addr), addr_size));
+  if (res && errno != EINPROGRESS) {
+    last_error_ = errno;
+    return NotifyConnectionState(false);
+  }
+
+  // At this point either |res| == 0 (the connect() succeeded) or started
+  // asynchronously (EINPROGRESS).
+  last_error_ = 0;
+  state_ = State::kConnecting;
+
+  // Even if the socket is non-blocking, connecting to a UNIX socket can be
+  // acknowledged straight away rather than returning EINPROGRESS. In this case
+  // just trigger an OnEvent without waiting for the FD watch. That will poll
+  // the SO_ERROR and evolve the state into either kConnected or kDisconnected.
+  if (res == 0) {
+    WeakPtr<UnixSocket> weak_ptr = weak_ptr_factory_.GetWeakPtr();
+    task_runner_->PostTask([weak_ptr]() {
+      if (weak_ptr)
+        weak_ptr->OnEvent();
+    });
+  }
+}
+
+void UnixSocket::ReadPeerCredentials() {
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
+    PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+  struct ucred user_cred;
+  socklen_t len = sizeof(user_cred);
+  int res = getsockopt(*fd_, SOL_SOCKET, SO_PEERCRED, &user_cred, &len);
+  PERFETTO_CHECK(res == 0);
+  peer_uid_ = user_cred.uid;
+  peer_pid_ = user_cred.pid;
+#else
+  struct xucred user_cred;
+  socklen_t len = sizeof(user_cred);
+  int res = getsockopt(*fd_, 0, LOCAL_PEERCRED, &user_cred, &len);
+  PERFETTO_CHECK(res == 0 && user_cred.cr_version == XUCRED_VERSION);
+  peer_uid_ = static_cast<uid_t>(user_cred.cr_uid);
+// There is no pid in the LOCAL_PEERCRED credentials on MacOS / FreeBSD.
+#endif
+}
+
+void UnixSocket::OnEvent() {
+  if (state_ == State::kDisconnected)
+    return;  // Some spurious event, typically queued just before Shutdown().
+
+  if (state_ == State::kConnected)
+    return event_listener_->OnDataAvailable(this);
+
+  if (state_ == State::kConnecting) {
+    PERFETTO_DCHECK(fd_);
+    int sock_err = EINVAL;
+    socklen_t err_len = sizeof(sock_err);
+    int res = getsockopt(*fd_, SOL_SOCKET, SO_ERROR, &sock_err, &err_len);
+    if (res == 0 && sock_err == EINPROGRESS)
+      return;  // Not connected yet, just a spurious FD watch wakeup.
+    if (res == 0 && sock_err == 0) {
+      ReadPeerCredentials();
+      state_ = State::kConnected;
+      return event_listener_->OnConnect(this, true /* connected */);
+    }
+    last_error_ = sock_err;
+    Shutdown(false);
+    return event_listener_->OnConnect(this, false /* connected */);
+  }
+
+  // New incoming connection.
+  if (state_ == State::kListening) {
+    // There could be more than one incoming connection behind each FD watch
+    // notification. Drain'em all.
+    for (;;) {
+      sockaddr_un cli_addr = {};
+      socklen_t size = sizeof(cli_addr);
+      ScopedFile new_fd(PERFETTO_EINTR(
+          accept(*fd_, reinterpret_cast<sockaddr*>(&cli_addr), &size)));
+      if (!new_fd)
+        return;
+      std::unique_ptr<UnixSocket> new_sock(new UnixSocket(
+          event_listener_, task_runner_, std::move(new_fd), State::kConnected));
+      event_listener_->OnNewIncomingConnection(this, std::move(new_sock));
+    }
+  }
+}
+
+bool UnixSocket::Send(const std::string& msg) {
+  return Send(msg.c_str(), msg.size() + 1);
+}
+
+bool UnixSocket::Send(const void* msg,
+                      size_t len,
+                      int send_fd,
+                      BlockingMode blocking_mode) {
+  if (send_fd != -1)
+    return Send(msg, len, &send_fd, 1, blocking_mode);
+  return Send(msg, len, nullptr, 0, blocking_mode);
+}
+
+bool UnixSocket::Send(const void* msg,
+                      size_t len,
+                      const int* send_fds,
+                      size_t num_fds,
+                      BlockingMode blocking_mode) {
+  if (state_ != State::kConnected) {
+    errno = last_error_ = ENOTCONN;
+    return false;
+  }
+
+  if (blocking_mode == BlockingMode::kBlocking)
+    SetBlockingIO(true);
+  const ssize_t sz = SockSend(*fd_, msg, len, send_fds, num_fds);
+  if (blocking_mode == BlockingMode::kBlocking)
+    SetBlockingIO(false);
+
+  if (sz == static_cast<ssize_t>(len)) {
+    last_error_ = 0;
+    return true;
+  }
+
+  // If sendmsg() succeeds but the returned size is < |len| it means that the
+  // endpoint disconnected in the middle of the send, and we managed to send
+  // only a portion of the buffer. In this case we should just give up.
+
+  if (sz < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+    // A genuine out-of-buffer. The client should retry or give up.
+    // Man pages specify that EAGAIN and EWOULDBLOCK have the same semantic here
+    // and clients should check for both.
+    last_error_ = EAGAIN;
+    return false;
+  }
+
+  // Either the other endpoint disconnected (ECONNRESET) or some other error
+  // happened.
+  last_error_ = errno;
+  PERFETTO_DPLOG("sendmsg() failed");
+  Shutdown(true);
+  return false;
+}
+
+void UnixSocket::Shutdown(bool notify) {
+  WeakPtr<UnixSocket> weak_ptr = weak_ptr_factory_.GetWeakPtr();
+  if (notify) {
+    if (state_ == State::kConnected) {
+      task_runner_->PostTask([weak_ptr]() {
+        if (weak_ptr)
+          weak_ptr->event_listener_->OnDisconnect(weak_ptr.get());
+      });
+    } else if (state_ == State::kConnecting) {
+      task_runner_->PostTask([weak_ptr]() {
+        if (weak_ptr)
+          weak_ptr->event_listener_->OnConnect(weak_ptr.get(), false);
+      });
+    }
+  }
+
+  if (fd_) {
+    shutdown(*fd_, SHUT_RDWR);
+    task_runner_->RemoveFileDescriptorWatch(*fd_);
+    fd_.reset();
+  }
+  state_ = State::kDisconnected;
+}
+
+size_t UnixSocket::Receive(void* msg, size_t len) {
+  return Receive(msg, len, nullptr, 0);
+}
+
+size_t UnixSocket::Receive(void* msg,
+                           size_t len,
+                           ScopedFile* fd_vec,
+                           size_t max_files) {
+  if (state_ != State::kConnected) {
+    last_error_ = ENOTCONN;
+    return 0;
+  }
+
+  const ssize_t sz = SockReceive(*fd_, msg, len, fd_vec, max_files);
+  if (sz < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+    last_error_ = EAGAIN;
+    return 0;
+  }
+  if (sz <= 0) {
+    last_error_ = errno;
+    Shutdown(true);
+    return 0;
+  }
+  PERFETTO_CHECK(static_cast<size_t>(sz) <= len);
+  return static_cast<size_t>(sz);
+}
+
+std::string UnixSocket::ReceiveString(size_t max_length) {
+  std::unique_ptr<char[]> buf(new char[max_length + 1]);
+  size_t rsize = Receive(buf.get(), max_length);
+  PERFETTO_CHECK(static_cast<size_t>(rsize) <= max_length);
+  buf[static_cast<size_t>(rsize)] = '\0';
+  return std::string(buf.get());
+}
+
+void UnixSocket::NotifyConnectionState(bool success) {
+  if (!success)
+    Shutdown(false);
+
+  WeakPtr<UnixSocket> weak_ptr = weak_ptr_factory_.GetWeakPtr();
+  task_runner_->PostTask([weak_ptr, success]() {
+    if (weak_ptr)
+      weak_ptr->event_listener_->OnConnect(weak_ptr.get(), success);
+  });
+}
+
+// Toggles O_NONBLOCK on the socket: |is_blocking| == false makes I/O
+// non-blocking. CHECK-fails if either fcntl() call reports an error.
+void UnixSocket::SetBlockingIO(bool is_blocking) {
+  int flags = fcntl(*fd_, F_GETFL, 0);
+  PERFETTO_CHECK(flags != -1);  // Never feed an error value to F_SETFL.
+  flags = is_blocking ? (flags & ~static_cast<int>(O_NONBLOCK))
+                      : (flags | O_NONBLOCK);
+  int fcntl_res = fcntl(fd(), F_SETFL, flags);  // int, not bool: -1 on error.
+  PERFETTO_CHECK(fcntl_res == 0);
+}
+
+UnixSocket::EventListener::~EventListener() {}
+void UnixSocket::EventListener::OnNewIncomingConnection(
+    UnixSocket*,
+    std::unique_ptr<UnixSocket>) {}
+void UnixSocket::EventListener::OnConnect(UnixSocket*, bool) {}
+void UnixSocket::EventListener::OnDisconnect(UnixSocket*) {}
+void UnixSocket::EventListener::OnDataAvailable(UnixSocket*) {}
+
+}  // namespace base
+}  // namespace perfetto
diff --git a/src/base/unix_socket_unittest.cc b/src/base/unix_socket_unittest.cc
new file mode 100644
index 0000000..5cdd6ab
--- /dev/null
+++ b/src/base/unix_socket_unittest.cc
@@ -0,0 +1,638 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/base/unix_socket.h"
+
+#include <sys/mman.h>
+
+#include <list>
+#include <thread>
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+#include "perfetto/base/build_config.h"
+#include "perfetto/base/logging.h"
+#include "perfetto/base/temp_file.h"
+#include "perfetto/base/utils.h"
+#include "src/base/test/test_task_runner.h"
+#include "src/ipc/test/test_socket.h"
+
+namespace perfetto {
+namespace base {
+namespace {
+
+using ::testing::_;
+using ::testing::AtLeast;
+using ::testing::Invoke;
+using ::testing::InvokeWithoutArgs;
+using ::testing::Mock;
+
+constexpr char kSocketName[] = TEST_SOCK_NAME("unix_socket_unittest");
+
+class MockEventListener : public UnixSocket::EventListener {
+ public:
+  MOCK_METHOD2(OnNewIncomingConnection, void(UnixSocket*, UnixSocket*));
+  MOCK_METHOD2(OnConnect, void(UnixSocket*, bool));
+  MOCK_METHOD1(OnDisconnect, void(UnixSocket*));
+  MOCK_METHOD1(OnDataAvailable, void(UnixSocket*));
+
+  // GMock doesn't support mocking methods with non-copyable args.
+  void OnNewIncomingConnection(
+      UnixSocket* self,
+      std::unique_ptr<UnixSocket> new_connection) override {
+    incoming_connections_.emplace_back(std::move(new_connection));
+    OnNewIncomingConnection(self, incoming_connections_.back().get());
+  }
+
+  std::unique_ptr<UnixSocket> GetIncomingConnection() {
+    if (incoming_connections_.empty())
+      return nullptr;
+    std::unique_ptr<UnixSocket> sock = std::move(incoming_connections_.front());
+    incoming_connections_.pop_front();
+    return sock;
+  }
+
+ private:
+  std::list<std::unique_ptr<UnixSocket>> incoming_connections_;
+};
+
+class UnixSocketTest : public ::testing::Test {
+ protected:
+  void SetUp() override { DESTROY_TEST_SOCK(kSocketName); }
+  void TearDown() override { DESTROY_TEST_SOCK(kSocketName); }
+
+  TestTaskRunner task_runner_;
+  MockEventListener event_listener_;
+};
+
+TEST_F(UnixSocketTest, ConnectionFailureIfUnreachable) {
+  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+  ASSERT_FALSE(cli->is_connected());
+  auto checkpoint = task_runner_.CreateCheckpoint("failure");
+  EXPECT_CALL(event_listener_, OnConnect(cli.get(), false))
+      .WillOnce(InvokeWithoutArgs(checkpoint));
+  task_runner_.RunUntilCheckpoint("failure");
+}
+
+// Both server and client should see an OnDisconnect() if the server drops
+// incoming connections immediately as they are created.
+TEST_F(UnixSocketTest, ConnectionImmediatelyDroppedByServer) {
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  ASSERT_TRUE(srv->is_listening());
+
+  // The server will immediately shutdown the connection upon
+  // OnNewIncomingConnection().
+  auto srv_did_shutdown = task_runner_.CreateCheckpoint("srv_did_shutdown");
+  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+      .WillOnce(
+          Invoke([this, srv_did_shutdown](UnixSocket*, UnixSocket* new_conn) {
+            EXPECT_CALL(event_listener_, OnDisconnect(new_conn));
+            new_conn->Shutdown(true);
+            srv_did_shutdown();
+          }));
+
+  auto checkpoint = task_runner_.CreateCheckpoint("cli_connected");
+  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+  EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
+      .WillOnce(InvokeWithoutArgs(checkpoint));
+  task_runner_.RunUntilCheckpoint("cli_connected");
+  task_runner_.RunUntilCheckpoint("srv_did_shutdown");
+
+  // Trying to send something will trigger the disconnection notification.
+  auto cli_disconnected = task_runner_.CreateCheckpoint("cli_disconnected");
+  EXPECT_CALL(event_listener_, OnDisconnect(cli.get()))
+      .WillOnce(InvokeWithoutArgs(cli_disconnected));
+  EXPECT_FALSE(cli->Send("whatever"));
+  task_runner_.RunUntilCheckpoint("cli_disconnected");
+}
+
+TEST_F(UnixSocketTest, ClientAndServerExchangeData) {
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  ASSERT_TRUE(srv->is_listening());
+
+  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+  EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
+  auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
+  auto srv_disconnected = task_runner_.CreateCheckpoint("srv_disconnected");
+  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+      .WillOnce(Invoke([this, cli_connected, srv_disconnected](
+                           UnixSocket*, UnixSocket* srv_conn) {
+        EXPECT_CALL(event_listener_, OnDisconnect(srv_conn))
+            .WillOnce(InvokeWithoutArgs(srv_disconnected));
+        cli_connected();
+      }));
+  task_runner_.RunUntilCheckpoint("cli_connected");
+
+  auto srv_conn = event_listener_.GetIncomingConnection();
+  ASSERT_TRUE(srv_conn);
+  ASSERT_TRUE(cli->is_connected());
+
+  auto cli_did_recv = task_runner_.CreateCheckpoint("cli_did_recv");
+  EXPECT_CALL(event_listener_, OnDataAvailable(cli.get()))
+      .WillOnce(Invoke([cli_did_recv](UnixSocket* s) {
+        ASSERT_EQ("srv>cli", s->ReceiveString());
+        cli_did_recv();
+      }));
+
+  auto srv_did_recv = task_runner_.CreateCheckpoint("srv_did_recv");
+  EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn.get()))
+      .WillOnce(Invoke([srv_did_recv](UnixSocket* s) {
+        ASSERT_EQ("cli>srv", s->ReceiveString());
+        srv_did_recv();
+      }));
+  ASSERT_TRUE(cli->Send("cli>srv"));
+  ASSERT_TRUE(srv_conn->Send("srv>cli"));
+  task_runner_.RunUntilCheckpoint("cli_did_recv");
+  task_runner_.RunUntilCheckpoint("srv_did_recv");
+
+  // Check that Send/Receive() fails gracefully once the socket is closed.
+  auto cli_disconnected = task_runner_.CreateCheckpoint("cli_disconnected");
+  EXPECT_CALL(event_listener_, OnDisconnect(cli.get()))
+      .WillOnce(InvokeWithoutArgs(cli_disconnected));
+  cli->Shutdown(true);
+  char msg[4];
+  ASSERT_EQ(0u, cli->Receive(&msg, sizeof(msg)));
+  ASSERT_EQ("", cli->ReceiveString());
+  ASSERT_EQ(0u, srv_conn->Receive(&msg, sizeof(msg)));
+  ASSERT_EQ("", srv_conn->ReceiveString());
+  ASSERT_FALSE(cli->Send("foo"));
+  ASSERT_FALSE(srv_conn->Send("bar"));
+  srv->Shutdown(true);
+  task_runner_.RunUntilCheckpoint("cli_disconnected");
+  task_runner_.RunUntilCheckpoint("srv_disconnected");
+}
+
+constexpr char cli_str[] = "cli>srv";
+constexpr char srv_str[] = "srv>cli";
+
+TEST_F(UnixSocketTest, ClientAndServerExchangeFDs) {
+  // Both peers send a string plus two fds and verify what the other end sent.
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  ASSERT_TRUE(srv->is_listening());
+  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+  EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
+  auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
+  auto srv_disconnected = task_runner_.CreateCheckpoint("srv_disconnected");
+  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+      .WillOnce(Invoke([this, cli_connected, srv_disconnected](
+                           UnixSocket*, UnixSocket* srv_conn) {
+        EXPECT_CALL(event_listener_, OnDisconnect(srv_conn))
+            .WillOnce(InvokeWithoutArgs(srv_disconnected));
+        cli_connected();
+      }));
+  task_runner_.RunUntilCheckpoint("cli_connected");
+
+  auto srv_conn = event_listener_.GetIncomingConnection();
+  ASSERT_TRUE(srv_conn);
+  ASSERT_TRUE(cli->is_connected());
+
+  ScopedFile null_fd(open("/dev/null", O_RDONLY));
+  ScopedFile zero_fd(open("/dev/zero", O_RDONLY));
+  ASSERT_TRUE(null_fd && zero_fd);
+
+  auto cli_did_recv = task_runner_.CreateCheckpoint("cli_did_recv");
+  EXPECT_CALL(event_listener_, OnDataAvailable(cli.get()))
+      .WillRepeatedly(Invoke([cli_did_recv](UnixSocket* s) {
+        ScopedFile fd_buf[3];
+        char buf[sizeof(srv_str)];  // The client receives the server's string.
+        if (!s->Receive(buf, sizeof(buf), fd_buf, ArraySize(fd_buf)))
+          return;  // Nothing was read; wait for the next notification.
+        ASSERT_STREQ(srv_str, buf);
+        ASSERT_NE(*fd_buf[0], -1);
+        ASSERT_NE(*fd_buf[1], -1);
+        ASSERT_EQ(*fd_buf[2], -1);  // Only two fds were sent.
+
+        char rd_buf[1];
+        // Reading from /dev/null hits EOF right away.
+        ASSERT_EQ(read(*fd_buf[0], rd_buf, sizeof(rd_buf)), 0);
+        // Reading from /dev/zero always yields data.
+        ASSERT_EQ(read(*fd_buf[1], rd_buf, sizeof(rd_buf)), 1);
+        cli_did_recv();
+      }));
+
+  auto srv_did_recv = task_runner_.CreateCheckpoint("srv_did_recv");
+  EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn.get()))
+      .WillRepeatedly(Invoke([srv_did_recv](UnixSocket* s) {
+        ScopedFile fd_buf[3];
+        char buf[sizeof(cli_str)];  // The server receives the client's string.
+        if (!s->Receive(buf, sizeof(buf), fd_buf, ArraySize(fd_buf)))
+          return;  // Nothing was read; wait for the next notification.
+        ASSERT_STREQ(cli_str, buf);
+        ASSERT_NE(*fd_buf[0], -1);
+        ASSERT_NE(*fd_buf[1], -1);
+        ASSERT_EQ(*fd_buf[2], -1);  // Only two fds were sent.
+
+        char rd_buf[1];
+        // Reading from /dev/null hits EOF right away.
+        ASSERT_EQ(read(*fd_buf[0], rd_buf, sizeof(rd_buf)), 0);
+        // Reading from /dev/zero always yields data.
+        ASSERT_EQ(read(*fd_buf[1], rd_buf, sizeof(rd_buf)), 1);
+        srv_did_recv();
+      }));
+
+  int buf_fd[2] = {null_fd.get(), zero_fd.get()};
+
+  ASSERT_TRUE(cli->Send(cli_str, sizeof(cli_str), buf_fd, ArraySize(buf_fd)));
+  ASSERT_TRUE(
+      srv_conn->Send(srv_str, sizeof(srv_str), buf_fd, ArraySize(buf_fd)));
+  task_runner_.RunUntilCheckpoint("srv_did_recv");
+  task_runner_.RunUntilCheckpoint("cli_did_recv");
+
+  auto cli_disconnected = task_runner_.CreateCheckpoint("cli_disconnected");
+  EXPECT_CALL(event_listener_, OnDisconnect(cli.get()))
+      .WillOnce(InvokeWithoutArgs(cli_disconnected));
+  cli->Shutdown(true);
+  srv->Shutdown(true);
+  task_runner_.RunUntilCheckpoint("srv_disconnected");
+  task_runner_.RunUntilCheckpoint("cli_disconnected");
+}
+
+TEST_F(UnixSocketTest, ListenWithPassedFileDescriptor) {
+  // Listen() is handed an already-bound fd rather than a socket name.
+  auto bound_fd = UnixSocket::CreateAndBind(kSocketName);
+  auto srv =
+      UnixSocket::Listen(std::move(bound_fd), &event_listener_, &task_runner_);
+  ASSERT_TRUE(srv->is_listening());
+  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+  EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
+  auto on_connected = task_runner_.CreateCheckpoint("cli_connected");
+  auto on_srv_disconnect = task_runner_.CreateCheckpoint("srv_disconnected");
+  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+      .WillOnce(Invoke([this, on_connected, on_srv_disconnect](
+                           UnixSocket*, UnixSocket* accepted) {
+        // Consume the EOF that the client's departure leaves on the socket.
+        EXPECT_CALL(event_listener_, OnDataAvailable(accepted))
+            .WillOnce(Invoke([](UnixSocket* s) { s->ReceiveString(); }));
+        EXPECT_CALL(event_listener_, OnDisconnect(accepted))
+            .WillOnce(InvokeWithoutArgs(on_srv_disconnect));
+        on_connected();
+      }));
+  task_runner_.RunUntilCheckpoint("cli_connected");
+  ASSERT_TRUE(cli->is_connected());
+  cli.reset();  // Destroying the client is what the server has to notice.
+  task_runner_.RunUntilCheckpoint("srv_disconnected");
+}
+
+// Mostly a stress test. Connects kNumClients clients to the same server and
+// tests that all can exchange data and can see the expected sequence of events.
+TEST_F(UnixSocketTest, SeveralClients) {
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  ASSERT_TRUE(srv->is_listening());
+  constexpr size_t kNumClients = 32;
+  std::unique_ptr<UnixSocket> cli[kNumClients];
+
+  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+      .Times(kNumClients)  // Exactly one accepted connection per client.
+      .WillRepeatedly(Invoke([this](UnixSocket*, UnixSocket* s) {
+        EXPECT_CALL(event_listener_, OnDataAvailable(s))
+            .WillOnce(Invoke([](UnixSocket* t) {
+              ASSERT_EQ("PING", t->ReceiveString());
+              ASSERT_TRUE(t->Send("PONG"));
+            }));
+      }));
+
+  for (size_t i = 0; i < kNumClients; i++) {
+    cli[i] = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+    EXPECT_CALL(event_listener_, OnConnect(cli[i].get(), true))
+        .WillOnce(Invoke([](UnixSocket* s, bool success) {
+          ASSERT_TRUE(success);
+          ASSERT_TRUE(s->Send("PING"));
+        }));
+
+    auto checkpoint = task_runner_.CreateCheckpoint(std::to_string(i));
+    EXPECT_CALL(event_listener_, OnDataAvailable(cli[i].get()))
+        .WillOnce(Invoke([checkpoint](UnixSocket* s) {
+          ASSERT_EQ("PONG", s->ReceiveString());
+          checkpoint();  // The i-th client completed its PING/PONG round trip.
+        }));
+  }
+
+  for (size_t i = 0; i < kNumClients; i++)
+    task_runner_.RunUntilCheckpoint(std::to_string(i));
+  // Expectations live on the mock listener, not on the sockets: verify it once.
+  ASSERT_TRUE(Mock::VerifyAndClearExpectations(&event_listener_));
+}
+
+// Creates two processes. The server process creates a file and passes it over
+// the socket to the client. Both processes mmap the file in shared mode and
+// check that they see the same contents.
+TEST_F(UnixSocketTest, SharedMemory) {
+  int pipes[2];
+  ASSERT_EQ(0, pipe(pipes));
+
+  pid_t pid = fork();
+  ASSERT_GE(pid, 0);
+  constexpr size_t kTmpSize = 4096;
+
+  if (pid == 0) {
+    // Child process: acts as the server and owns the shared file.
+    TempFile scoped_tmp = TempFile::CreateUnlinked();
+    int tmp_fd = scoped_tmp.fd();
+    ASSERT_FALSE(ftruncate(tmp_fd, kTmpSize));
+    char* mem = reinterpret_cast<char*>(
+        mmap(nullptr, kTmpSize, PROT_READ | PROT_WRITE, MAP_SHARED, tmp_fd, 0));
+    ASSERT_NE(MAP_FAILED, static_cast<void*>(mem));  // Not nullptr on failure.
+    memcpy(mem, "shm rocks", 10);  // 9 characters plus the NUL terminator.
+
+    auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+    ASSERT_TRUE(srv->is_listening());
+    // Signal the other process that it can connect.
+    ASSERT_EQ(1, PERFETTO_EINTR(write(pipes[1], ".", 1)));
+    auto checkpoint = task_runner_.CreateCheckpoint("change_seen_by_server");
+    EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+        .WillOnce(Invoke(
+            [this, tmp_fd, checkpoint, mem](UnixSocket*, UnixSocket* new_conn) {
+              ASSERT_EQ(geteuid(), static_cast<uint32_t>(new_conn->peer_uid()));
+              ASSERT_TRUE(new_conn->Send("txfd", 5, tmp_fd));
+              // Wait for the client to modify the shared memory and notify us.
+              EXPECT_CALL(event_listener_, OnDataAvailable(new_conn))
+                  .WillOnce(Invoke([checkpoint, mem](UnixSocket* s) {
+                    ASSERT_EQ("change notify", s->ReceiveString());
+                    ASSERT_STREQ("rock more", mem);
+                    checkpoint();
+                  }));
+            }));
+    task_runner_.RunUntilCheckpoint("change_seen_by_server");
+    ASSERT_TRUE(Mock::VerifyAndClearExpectations(&event_listener_));
+    _exit(0);  // Never return into the test runner from the child.
+  } else {  // Parent process: acts as the client.
+    char sync_cmd = '\0';
+    ASSERT_EQ(1, PERFETTO_EINTR(read(pipes[0], &sync_cmd, 1)));
+    ASSERT_EQ('.', sync_cmd);
+    auto cli =
+        UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+    EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
+    auto checkpoint = task_runner_.CreateCheckpoint("change_seen_by_client");
+    EXPECT_CALL(event_listener_, OnDataAvailable(cli.get()))
+        .WillOnce(Invoke([checkpoint](UnixSocket* s) {
+          char msg[32];
+          ScopedFile fd;
+          ASSERT_EQ(5u, s->Receive(msg, sizeof(msg), &fd));
+          ASSERT_STREQ("txfd", msg);
+          ASSERT_TRUE(fd);
+          char* mem = reinterpret_cast<char*>(mmap(
+              nullptr, kTmpSize, PROT_READ | PROT_WRITE, MAP_SHARED, *fd, 0));
+          ASSERT_NE(MAP_FAILED, static_cast<void*>(mem));
+          mem[9] = '\0';  // Just to get a clean error in case of test failure.
+          ASSERT_STREQ("shm rocks", mem);
+
+          // Now change the shared memory and ping the other process.
+          memcpy(mem, "rock more", 10);
+          ASSERT_TRUE(s->Send("change notify"));
+          checkpoint();
+        }));
+    task_runner_.RunUntilCheckpoint("change_seen_by_client");
+    int st = 0;
+    PERFETTO_EINTR(waitpid(pid, &st, 0));
+    ASSERT_FALSE(WIFSIGNALED(st)) << "Server died with signal " << WTERMSIG(st);
+    EXPECT_TRUE(WIFEXITED(st));
+    ASSERT_EQ(0, WEXITSTATUS(st));
+  }
+}
+
+constexpr size_t kAtomicWrites_FrameSize = 1123;
+bool AtomicWrites_SendAttempt(UnixSocket* s,
+                              TaskRunner* task_runner,
+                              int num_frame) {
+  char frame[kAtomicWrites_FrameSize];  // Every byte holds the frame number.
+  memset(frame, static_cast<char>(num_frame), sizeof(frame));
+  const bool sent = s->Send(frame, sizeof(frame));
+  if (!sent)  // Keep re-posting the very same frame until it goes through.
+    task_runner->PostTask(
+        [=] { AtomicWrites_SendAttempt(s, task_runner, num_frame); });
+  return sent;
+}
+
+// Creates a client-server pair where the client keeps pushing data to the
+// server. Every Send() attempt carries a buffer that is memset() with a unique
+// number (0 to kNumFrames). Filling up the socket output buffer is deliberate,
+// hence some of these Send()s are expected to fail. The client is extremely
+// aggressive: whenever a Send() fails, it just keeps re-posting it with the
+// same unique number. The server verifies that each buffer is received once
+// and only once, without any gaps or truncation.
+TEST_F(UnixSocketTest, DISABLED_SendIsAtomic) {
+  static constexpr int kNumFrames = 127;
+
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  ASSERT_TRUE(srv->is_listening());
+
+  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+
+  auto frames_done = task_runner_.CreateCheckpoint("all_frames_done");
+  std::set<int> seen_frames;
+  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+      .WillOnce(Invoke([this, &seen_frames, frames_done](
+                           UnixSocket*, UnixSocket* conn) {
+        EXPECT_CALL(event_listener_, OnDataAvailable(conn))
+            .WillRepeatedly(
+                Invoke([&seen_frames, frames_done](UnixSocket* s) {
+                  char frame[kAtomicWrites_FrameSize];
+                  const size_t rsize = s->Receive(frame, sizeof(frame));
+                  if (rsize == 0)
+                    return;  // Spurious select(), could happen.
+                  ASSERT_EQ(kAtomicWrites_FrameSize, rsize);
+                  // A frame with mixed bytes would be two truncated frames.
+                  for (char byte : frame)
+                    ASSERT_EQ(frame[0], byte);
+                  // No frame number may ever be delivered twice.
+                  ASSERT_TRUE(seen_frames.insert(frame[0]).second);
+                  if (seen_frames.size() == kNumFrames)
+                    frames_done();
+                }));
+      }));
+
+  auto connected = task_runner_.CreateCheckpoint("cli_connected");
+  EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
+      .WillOnce(InvokeWithoutArgs(connected));
+  task_runner_.RunUntilCheckpoint("cli_connected");
+  ASSERT_TRUE(cli->is_connected());
+  ASSERT_EQ(geteuid(), static_cast<uint32_t>(cli->peer_uid()));
+
+  bool requeued = false;
+  for (int n = 0; n < kNumFrames; n++)
+    requeued |= !AtomicWrites_SendAttempt(cli.get(), &task_runner_, n);
+
+  // Unless at least one of the kNumFrames overflowed the socket buffer and
+  // got re-posted, this entire test would be pointless.
+  ASSERT_TRUE(requeued);
+
+  task_runner_.RunUntilCheckpoint("all_frames_done");
+}
+
+// Checks that peer_uid() stays available once the client has disconnected. The
+// IPC layer needs to rely on this to validate messages that were received
+// immediately before a client disconnects.
+TEST_F(UnixSocketTest, PeerCredentialsRetainedAfterDisconnect) {
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  ASSERT_TRUE(srv->is_listening());
+  UnixSocket* accepted = nullptr;
+  auto on_accepted = task_runner_.CreateCheckpoint("srv_connected");
+  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+      .WillOnce(Invoke(
+          [&accepted, on_accepted](UnixSocket*, UnixSocket* new_conn) {
+            accepted = new_conn;
+            EXPECT_EQ(geteuid(), static_cast<uint32_t>(new_conn->peer_uid()));
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
+    PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+            EXPECT_EQ(getpid(), static_cast<uint32_t>(new_conn->peer_pid()));
+#endif
+            on_accepted();
+          }));
+  auto on_connected = task_runner_.CreateCheckpoint("cli_connected");
+  auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+  EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
+      .WillOnce(InvokeWithoutArgs(on_connected));
+
+  task_runner_.RunUntilCheckpoint("cli_connected");
+  task_runner_.RunUntilCheckpoint("srv_connected");
+  ASSERT_NE(nullptr, accepted);
+  ASSERT_TRUE(accepted->is_connected());
+
+  auto on_disconnected = task_runner_.CreateCheckpoint("cli_disconnected");
+  EXPECT_CALL(event_listener_, OnDisconnect(accepted))
+      .WillOnce(InvokeWithoutArgs(on_disconnected));
+
+  // TODO(primiano): when a peer disconnects, the other end gets a spurious
+  // OnDataAvailable() which has to be acked with a Receive() in order to read
+  // the EOF. See b/69536434.
+  EXPECT_CALL(event_listener_, OnDataAvailable(accepted))
+      .WillOnce(Invoke([](UnixSocket* s) { s->ReceiveString(); }));
+  // Drop the client: its credentials must remain readable on the server end.
+  cli.reset();
+  task_runner_.RunUntilCheckpoint("cli_disconnected");
+  ASSERT_FALSE(accepted->is_connected());
+  EXPECT_EQ(geteuid(), static_cast<uint32_t>(accepted->peer_uid()));
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
+    PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+  EXPECT_EQ(getpid(), static_cast<uint32_t>(accepted->peer_pid()));
+#endif
+}
+
+TEST_F(UnixSocketTest, BlockingSend) {
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  ASSERT_TRUE(srv->is_listening());
+  // The server side just counts the bytes it gets until kTotalBytes are in.
+  auto all_frames_done = task_runner_.CreateCheckpoint("all_frames_done");
+  size_t total_bytes_received = 0;
+  constexpr size_t kTotalBytes = 1024 * 1024 * 4;
+  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+      .WillOnce(Invoke([this, &total_bytes_received, all_frames_done](
+                           UnixSocket*, UnixSocket* srv_conn) {
+        EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn))
+            .WillRepeatedly(
+                Invoke([&total_bytes_received, all_frames_done](UnixSocket* s) {
+                  char buf[1024];
+                  size_t res = s->Receive(buf, sizeof(buf));
+                  total_bytes_received += res;
+                  if (total_bytes_received == kTotalBytes)
+                    all_frames_done();
+                }));
+      }));
+
+  // Override default timeout as this test can take time on the emulator.
+  const int kTimeoutMs = 60000 * 3;
+
+  // Perform the blocking send from another thread.
+  std::thread tx_thread([] {
+    TestTaskRunner tx_task_runner;
+    MockEventListener tx_events;
+    auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner);
+
+    auto cli_connected = tx_task_runner.CreateCheckpoint("cli_connected");
+    EXPECT_CALL(tx_events, OnConnect(cli.get(), true))
+        .WillOnce(InvokeWithoutArgs(cli_connected));
+    tx_task_runner.RunUntilCheckpoint("cli_connected");
+
+    auto all_sent = tx_task_runner.CreateCheckpoint("all_sent");
+    char buf[1024 * 32] = {};
+    tx_task_runner.PostTask([&cli, &buf, all_sent] {
+      for (size_t i = 0; i < kTotalBytes / sizeof(buf); i++)
+        EXPECT_TRUE(cli->Send(buf, sizeof(buf), -1 /*fd*/,
+                              UnixSocket::BlockingMode::kBlocking));
+      all_sent();  // Reached even after a failed Send(), which is only flagged.
+    });
+    tx_task_runner.RunUntilCheckpoint("all_sent", kTimeoutMs);
+  });
+
+  task_runner_.RunUntilCheckpoint("all_frames_done", kTimeoutMs);
+  tx_thread.join();
+}
+
+// Regression test for b/76155349: when the receiving end goes away while the
+// sender is busy with a large send(), the socket has to give up gracefully
+// (i.e. Shutdown()) instead of crashing.
+TEST_F(UnixSocketTest, ReceiverDisconnectsDuringSend) {
+  auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+  ASSERT_TRUE(srv->is_listening());
+  const int kTimeoutMs = 30000;
+
+  auto rx_done = task_runner_.CreateCheckpoint("receive_done");
+  EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+      .WillOnce(Invoke([this, rx_done](UnixSocket*, UnixSocket* conn) {
+        EXPECT_CALL(event_listener_, OnDataAvailable(conn))
+            .WillOnce(Invoke([rx_done](UnixSocket* s) {
+              // Take a single chunk, then hang up on the sender mid-transfer.
+              char chunk[1024];
+              ASSERT_EQ(1024u, s->Receive(chunk, sizeof(chunk)));
+              s->Shutdown(false /*notify*/);
+              rx_done();
+            }));
+      }));
+
+  // Perform the blocking send from another thread.
+  std::thread tx_thread([] {
+    TestTaskRunner tx_task_runner;
+    MockEventListener tx_events;
+    auto sender = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner);
+
+    auto connected = tx_task_runner.CreateCheckpoint("cli_connected");
+    EXPECT_CALL(tx_events, OnConnect(sender.get(), true))
+        .WillOnce(InvokeWithoutArgs(connected));
+    tx_task_runner.RunUntilCheckpoint("cli_connected");
+
+    auto tx_done = tx_task_runner.CreateCheckpoint("send_done");
+    // The payload is huge so that the receiver quits while Send() is under way.
+    static constexpr size_t kBufSize = 32 * 1024 * 1024;
+    std::unique_ptr<char[]> payload(new char[kBufSize]());
+    tx_task_runner.PostTask([&sender, &payload, tx_done] {
+      // With the peer gone, the blocking Send() is expected to report failure.
+      ASSERT_FALSE(sender->Send(payload.get(), kBufSize, -1 /*fd*/,
+                                UnixSocket::BlockingMode::kBlocking));
+      tx_done();
+    });
+
+    tx_task_runner.RunUntilCheckpoint("send_done", kTimeoutMs);
+  });
+  task_runner_.RunUntilCheckpoint("receive_done", kTimeoutMs);
+  tx_thread.join();
+}
+
+// TODO(primiano): add a test to check that in the case of a peer sending a fd
+// and the other end just doing a recv (without taking it), the fd is closed and
+// not left around.
+
+// TODO(primiano): add a test to check that a socket can be reused after
+// Shutdown().
+
+// TODO(primiano): add a test to check that OnDisconnect() is called in all
+// possible cases.
+
+// TODO(primiano): add tests that destroy the socket in all possible stages and
+// verify that no spurious EventListener callback is received.
+
+}  // namespace
+}  // namespace base
+}  // namespace perfetto