UnixSocket: use poll() for send timeout.

SO_SNDTIMEO doesn't behave as expected. In rare cases
(if the receiver consumes input very slowly) a sendmsg()
can block for longer than the timeout.
This behavior can be reproduced reliably on macOS
(see UnixSocketTest.BlockingSendTimeout). On Linux it's
harder to reproduce and seems to require a suspend/resume in
the middle of the send.
This CL changes the implementation of SendMsgAllPosix(). When
the socket is in blocking mode and a timeout is set, we switch
to a combination of non-blocking send + poll().

Bug: 193234818
Change-Id: Ic7fc19107d683325589cd336703cddd8ad341a88
diff --git a/src/base/unix_socket.cc b/src/base/unix_socket.cc
index 5986917..3088399 100644
--- a/src/base/unix_socket.cc
+++ b/src/base/unix_socket.cc
@@ -36,6 +36,7 @@
 #include <netdb.h>
 #include <netinet/in.h>
 #include <netinet/tcp.h>
+#include <poll.h>
 #include <sys/socket.h>
 #include <sys/un.h>
 #include <unistd.h>
@@ -51,6 +52,7 @@
 #include "perfetto/base/build_config.h"
 #include "perfetto/base/logging.h"
 #include "perfetto/base/task_runner.h"
+#include "perfetto/base/time.h"
 #include "perfetto/ext/base/string_utils.h"
 #include "perfetto/ext/base/utils.h"
 
@@ -66,16 +68,6 @@
 
 namespace {
 
-// MSG_NOSIGNAL is not supported on Mac OS X, but in that case the socket is
-// created with SO_NOSIGPIPE (See InitializeSocket()).
-// On Windows this does't apply as signals don't exist.
-#if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
-#elif PERFETTO_BUILDFLAG(PERFETTO_OS_APPLE)
-constexpr int kNoSigPipe = 0;
-#else
-constexpr int kNoSigPipe = MSG_NOSIGNAL;
-#endif
-
 // Android takes an int instead of socklen_t for the control buffer size.
 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
 using CBufLenType = size_t;
@@ -431,19 +423,60 @@
   // This does not make sense on non-blocking sockets.
   PERFETTO_DCHECK(fd_);
 
+  const bool is_blocking_with_timeout =
+      tx_timeout_ms_ > 0 && ((fcntl(*fd_, F_GETFL, 0) & O_NONBLOCK) == 0);
+  const int64_t start_ms = GetWallTimeMs().count();
+
+  // Waits until some space is available in the tx buffer.
+  // Returns true if some buffer space is available, false if it times out.
+  auto poll_or_timeout = [&] {
+    PERFETTO_DCHECK(is_blocking_with_timeout);
+    const int64_t deadline = start_ms + tx_timeout_ms_;
+    const int64_t now_ms = GetWallTimeMs().count();
+    if (now_ms >= deadline)
+      return false;  // Timed out
+    const int timeout_ms = static_cast<int>(deadline - now_ms);
+    pollfd pfd{*fd_, POLLOUT, 0};
+    return PERFETTO_EINTR(poll(&pfd, 1, timeout_ms)) > 0;
+  };
+
+// We implement blocking sends that require a timeout as non-blocking + poll.
+// This is because SO_SNDTIMEO doesn't work as expected (b/193234818). On Linux
+// we can just pass MSG_DONTWAIT to force the send to be non-blocking. On Mac,
+// instead we need to flip the O_NONBLOCK flag back and forth.
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_APPLE)
+  // MSG_NOSIGNAL is not supported on Mac OS X, but in that case the socket is
+  // created with SO_NOSIGPIPE (See InitializeSocket()).
+  int send_flags = 0;
+
+  if (is_blocking_with_timeout)
+    SetBlocking(false);
+
+  auto reset_nonblock_on_exit = OnScopeExit([&] {
+    if (is_blocking_with_timeout)
+      SetBlocking(true);
+  });
+#else
+  int send_flags = MSG_NOSIGNAL | (is_blocking_with_timeout ? MSG_DONTWAIT : 0);
+#endif
+
   ssize_t total_sent = 0;
   while (msg->msg_iov) {
-    ssize_t sent = PERFETTO_EINTR(sendmsg(*fd_, msg, kNoSigPipe));
-    if (sent <= 0) {
-      if (sent == -1 && IsAgain(errno))
-        return total_sent;
-      return sent;
+    ssize_t send_res = PERFETTO_EINTR(sendmsg(*fd_, msg, send_flags));
+    if (send_res == -1 && IsAgain(errno)) {
+      if (is_blocking_with_timeout && poll_or_timeout()) {
+        continue;  // Tx buffer unblocked, repeat the loop.
+      }
+      return total_sent;
+    } else if (send_res <= 0) {
+      return send_res;  // An error occurred.
+    } else {
+      total_sent += send_res;
+      ShiftMsgHdrPosix(static_cast<size_t>(send_res), msg);
+      // Only send the ancillary data with the first sendmsg call.
+      msg->msg_control = nullptr;
+      msg->msg_controllen = 0;
     }
-    total_sent += sent;
-    ShiftMsgHdrPosix(static_cast<size_t>(sent), msg);
-    // Only send the ancillary data with the first sendmsg call.
-    msg->msg_control = nullptr;
-    msg->msg_controllen = 0;
   }
   return total_sent;
 }
@@ -541,8 +574,14 @@
 
 bool UnixSocketRaw::SetTxTimeout(uint32_t timeout_ms) {
   PERFETTO_DCHECK(fd_);
+  // On Unix-based systems, SO_SNDTIMEO isn't used for Send() because it's
+  // unreliable (b/193234818). Instead we use non-blocking sendmsg() + poll().
+  // See SendMsgAllPosix(). We still make the setsockopt call because
+  // SO_SNDTIMEO also affects connect().
+  tx_timeout_ms_ = timeout_ms;
 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
   DWORD timeout = timeout_ms;
+  ignore_result(tx_timeout_ms_);
 #else
   struct timeval timeout {};
   uint32_t timeout_sec = timeout_ms / 1000;
diff --git a/src/base/unix_socket_unittest.cc b/src/base/unix_socket_unittest.cc
index 86a6bb5..c838663 100644
--- a/src/base/unix_socket_unittest.cc
+++ b/src/base/unix_socket_unittest.cc
@@ -30,6 +30,7 @@
 #include "perfetto/base/build_config.h"
 #include "perfetto/base/logging.h"
 #include "perfetto/ext/base/file_utils.h"
+#include "perfetto/ext/base/periodic_task.h"
 #include "perfetto/ext/base/pipe.h"
 #include "perfetto/ext/base/temp_file.h"
 #include "perfetto/ext/base/utils.h"
@@ -331,6 +332,53 @@
   tx_thread.join();
 }
 
+// Regression test for b/193234818. SO_SNDTIMEO is unreliable on most systems.
+// It doesn't guarantee that the whole send() call blocks for at most X, as the
+// kernel rearms the timeout if the send buffer frees up and allows a partial
+// send. This test reproduces the issue 100% on Mac. Unfortunately on Linux the
+// repro seems to happen only when a suspend happens in the middle.
+TEST_F(UnixSocketTest, BlockingSendTimeout) {
+  TestTaskRunner ttr;
+  UnixSocketRaw send_sock;
+  UnixSocketRaw recv_sock;
+  std::tie(send_sock, recv_sock) =
+      UnixSocketRaw::CreatePairPosix(kTestSocket.family(), SockType::kStream);
+
+  auto blocking_send_done = ttr.CreateCheckpoint("blocking_send_done");
+
+  std::thread tx_thread([&] {
+    // Fill the tx buffer in non-blocking mode.
+    send_sock.SetBlocking(false);
+    char buf[1024 * 16]{};
+    while (send_sock.Send(buf, sizeof(buf)) > 0) {
+    }
+
+    // Then do a blocking send. It should return a partial value within the tx
+    // timeout.
+    send_sock.SetBlocking(true);
+    send_sock.SetTxTimeout(10);
+    ASSERT_LT(send_sock.Send(buf, sizeof(buf)),
+              static_cast<ssize_t>(sizeof(buf)));
+    ttr.PostTask(blocking_send_done);
+  });
+
+  // This task needs to be slow enough so that it doesn't unblock the send, but
+  // fast enough so that within a blocking cycle, the send re-attempts and
+  // re-arms the timeout.
+  PeriodicTask read_slowly_task(&ttr);
+  PeriodicTask::Args args;
+  args.period_ms = 1;  // Read 1 byte every ms (1 KiB/s).
+  args.task = [&] {
+    char rxbuf[1]{};
+    recv_sock.Receive(rxbuf, sizeof(rxbuf));
+  };
+  read_slowly_task.Start(args);
+
+  ttr.RunUntilCheckpoint("blocking_send_done");
+  read_slowly_task.Reset();
+  tx_thread.join();
+}
+
 // Regression test for b/76155349 . If the receiver end disconnects while the
 // sender is in the middle of a large send(), the socket should gracefully give
 // up (i.e. Shutdown()) but not crash.