Correctly handle the case where sendmsg sends fewer bytes than requested.
Disallow kNonBlocking in UnixSocket as it does not work as expected.
Change-Id: I64747acb832b999e31e7b5cca0c583e0e7974532
Bug: 117139237
diff --git a/src/base/unix_socket_unittest.cc b/src/base/unix_socket_unittest.cc
index 5cdd6ab..382f9bc 100644
--- a/src/base/unix_socket_unittest.cc
+++ b/src/base/unix_socket_unittest.cc
@@ -41,6 +41,7 @@
using ::testing::Mock;
constexpr char kSocketName[] = TEST_SOCK_NAME("unix_socket_unittest");
+constexpr auto kBlocking = UnixSocket::BlockingMode::kBlocking;
class MockEventListener : public UnixSocket::EventListener {
public:
@@ -115,7 +116,7 @@
auto cli_disconnected = task_runner_.CreateCheckpoint("cli_disconnected");
EXPECT_CALL(event_listener_, OnDisconnect(cli.get()))
.WillOnce(InvokeWithoutArgs(cli_disconnected));
- EXPECT_FALSE(cli->Send("whatever"));
+ EXPECT_FALSE(cli->Send("whatever", kBlocking));
task_runner_.RunUntilCheckpoint("cli_disconnected");
}
@@ -153,8 +154,8 @@
ASSERT_EQ("cli>srv", s->ReceiveString());
srv_did_recv();
}));
- ASSERT_TRUE(cli->Send("cli>srv"));
- ASSERT_TRUE(srv_conn->Send("srv>cli"));
+ ASSERT_TRUE(cli->Send("cli>srv", kBlocking));
+ ASSERT_TRUE(srv_conn->Send("srv>cli", kBlocking));
task_runner_.RunUntilCheckpoint("cli_did_recv");
task_runner_.RunUntilCheckpoint("srv_did_recv");
@@ -168,8 +169,8 @@
ASSERT_EQ("", cli->ReceiveString());
ASSERT_EQ(0u, srv_conn->Receive(&msg, sizeof(msg)));
ASSERT_EQ("", srv_conn->ReceiveString());
- ASSERT_FALSE(cli->Send("foo"));
- ASSERT_FALSE(srv_conn->Send("bar"));
+ ASSERT_FALSE(cli->Send("foo", kBlocking));
+ ASSERT_FALSE(srv_conn->Send("bar", kBlocking));
srv->Shutdown(true);
task_runner_.RunUntilCheckpoint("cli_disconnected");
task_runner_.RunUntilCheckpoint("srv_disconnected");
@@ -244,10 +245,10 @@
int buf_fd[2] = {null_fd.get(), zero_fd.get()};
- ASSERT_TRUE(
- cli->Send(cli_str, sizeof(cli_str), buf_fd, base::ArraySize(buf_fd)));
+ ASSERT_TRUE(cli->Send(cli_str, sizeof(cli_str), buf_fd,
+ base::ArraySize(buf_fd), kBlocking));
ASSERT_TRUE(srv_conn->Send(srv_str, sizeof(srv_str), buf_fd,
- base::ArraySize(buf_fd)));
+ base::ArraySize(buf_fd), kBlocking));
task_runner_.RunUntilCheckpoint("srv_did_recv");
task_runner_.RunUntilCheckpoint("cli_did_recv");
@@ -300,7 +301,7 @@
EXPECT_CALL(event_listener_, OnDataAvailable(s))
.WillOnce(Invoke([](UnixSocket* t) {
ASSERT_EQ("PING", t->ReceiveString());
- ASSERT_TRUE(t->Send("PONG"));
+ ASSERT_TRUE(t->Send("PONG", kBlocking));
}));
}));
@@ -309,7 +310,7 @@
EXPECT_CALL(event_listener_, OnConnect(cli[i].get(), true))
.WillOnce(Invoke([](UnixSocket* s, bool success) {
ASSERT_TRUE(success);
- ASSERT_TRUE(s->Send("PING"));
+ ASSERT_TRUE(s->Send("PING", kBlocking));
}));
auto checkpoint = task_runner_.CreateCheckpoint(std::to_string(i));
@@ -356,7 +357,7 @@
.WillOnce(Invoke(
[this, tmp_fd, checkpoint, mem](UnixSocket*, UnixSocket* new_conn) {
ASSERT_EQ(geteuid(), static_cast<uint32_t>(new_conn->peer_uid()));
- ASSERT_TRUE(new_conn->Send("txfd", 5, tmp_fd));
+ ASSERT_TRUE(new_conn->Send("txfd", 5, tmp_fd, kBlocking));
// Wait for the client to change this again.
EXPECT_CALL(event_listener_, OnDataAvailable(new_conn))
.WillOnce(Invoke([checkpoint, mem](UnixSocket* s) {
@@ -391,7 +392,7 @@
// Now change the shared memory and ping the other process.
memcpy(mem, "rock more", 10);
- ASSERT_TRUE(s->Send("change notify"));
+ ASSERT_TRUE(s->Send("change notify", kBlocking));
checkpoint();
}));
task_runner_.RunUntilCheckpoint("change_seen_by_client");
@@ -409,7 +410,7 @@
int num_frame) {
char buf[kAtomicWrites_FrameSize];
memset(buf, static_cast<char>(num_frame), sizeof(buf));
- if (s->Send(buf, sizeof(buf)))
+ if (s->Send(buf, sizeof(buf), -1, kBlocking))
return true;
task_runner->PostTask(
std::bind(&AtomicWrites_SendAttempt, s, task_runner, num_frame));
@@ -560,8 +561,7 @@
char buf[1024 * 32] = {};
tx_task_runner.PostTask([&cli, &buf, all_sent] {
for (size_t i = 0; i < kTotalBytes / sizeof(buf); i++)
- cli->Send(buf, sizeof(buf), -1 /*fd*/,
- UnixSocket::BlockingMode::kBlocking);
+ cli->Send(buf, sizeof(buf), -1 /*fd*/, kBlocking);
all_sent();
});
tx_task_runner.RunUntilCheckpoint("all_sent", kTimeoutMs);
@@ -608,8 +608,7 @@
static constexpr size_t kBufSize = 32 * 1024 * 1024;
std::unique_ptr<char[]> buf(new char[kBufSize]());
tx_task_runner.PostTask([&cli, &buf, send_done] {
- bool send_res = cli->Send(buf.get(), kBufSize, -1 /*fd*/,
- UnixSocket::BlockingMode::kBlocking);
+ bool send_res = cli->Send(buf.get(), kBufSize, -1 /*fd*/, kBlocking);
ASSERT_FALSE(send_res);
send_done();
});
@@ -620,6 +619,166 @@
tx_thread.join();
}
+TEST_F(UnixSocketTest, ShiftMsgHdrSendPartialFirst) {
+  // Send a part of the first iov, then send the rest.
+  struct iovec iov[2] = {};
+  char hello[] = "hello";
+  char world[] = "world";
+  iov[0].iov_base = &hello[0];
+  iov[0].iov_len = base::ArraySize(hello);
+
+  iov[1].iov_base = &world[0];
+  iov[1].iov_len = base::ArraySize(world);
+
+  struct msghdr hdr = {};
+  hdr.msg_iov = iov;
+  hdr.msg_iovlen = base::ArraySize(iov);
+
+  // Step 1: shift by a single byte, i.e. only part of the first iov.
+  ShiftMsgHdr(1, &hdr);
+  // These two checks are fatal on purpose: the expectations that follow index
+  // msg_iov[0] and msg_iov[1], so a null pointer or a shorter vector must stop
+  // the test here instead of crashing the whole test binary.
+  ASSERT_NE(hdr.msg_iov, nullptr);
+  ASSERT_EQ(hdr.msg_iovlen, 2);
+  EXPECT_EQ(hdr.msg_iov[0].iov_base, &hello[1]);
+  EXPECT_EQ(hdr.msg_iov[1].iov_base, &world[0]);
+  EXPECT_STREQ(reinterpret_cast<char*>(hdr.msg_iov[0].iov_base), "ello");
+  EXPECT_EQ(iov[0].iov_len, base::ArraySize(hello) - 1);
+
+  // Step 2: shift by what is left of the first iov; only the second remains.
+  ShiftMsgHdr(base::ArraySize(hello) - 1, &hdr);
+  // Fatal for the same reason: msg_iov[0] is dereferenced right below.
+  ASSERT_EQ(hdr.msg_iov, &iov[1]);
+  EXPECT_EQ(hdr.msg_iovlen, 1);
+  EXPECT_STREQ(reinterpret_cast<char*>(hdr.msg_iov[0].iov_base), world);
+  EXPECT_EQ(hdr.msg_iov[0].iov_len, base::ArraySize(world));
+
+  // Step 3: shift by the whole second iov; the header must end up empty.
+  ShiftMsgHdr(base::ArraySize(world), &hdr);
+  EXPECT_EQ(hdr.msg_iov, nullptr);
+  EXPECT_EQ(hdr.msg_iovlen, 0);
+}
+
+TEST_F(UnixSocketTest, ShiftMsgHdrSendFirstAndPartial) {
+  // Send first iov and part of the second iov, then send the rest.
+  struct iovec iov[2] = {};
+  char hello[] = "hello";
+  char world[] = "world";
+  iov[0].iov_base = &hello[0];
+  iov[0].iov_len = base::ArraySize(hello);
+
+  iov[1].iov_base = &world[0];
+  iov[1].iov_len = base::ArraySize(world);
+
+  struct msghdr hdr = {};
+  hdr.msg_iov = iov;
+  hdr.msg_iovlen = base::ArraySize(iov);
+
+  // Step 1: shift past the whole first iov plus one byte of the second.
+  ShiftMsgHdr(base::ArraySize(hello) + 1, &hdr);
+  // Fatal on purpose: msg_iov[0] is dereferenced by the checks below, so a
+  // null pointer must fail the test rather than crash the test binary.
+  ASSERT_NE(hdr.msg_iov, nullptr);
+  EXPECT_EQ(hdr.msg_iovlen, 1);
+  EXPECT_STREQ(reinterpret_cast<char*>(hdr.msg_iov[0].iov_base), "orld");
+  EXPECT_EQ(hdr.msg_iov[0].iov_len, base::ArraySize(world) - 1);
+
+  // Step 2: shift by the remainder of the second iov; nothing must be left.
+  ShiftMsgHdr(base::ArraySize(world) - 1, &hdr);
+  EXPECT_EQ(hdr.msg_iov, nullptr);
+  EXPECT_EQ(hdr.msg_iovlen, 0);
+}
+
+TEST_F(UnixSocketTest, ShiftMsgHdrSendEverything) {
+  // A single shift that covers both buffers must leave the header empty.
+  char hello[] = "hello";
+  char world[] = "world";
+
+  struct iovec iov[2] = {};
+  struct iovec& first = iov[0];
+  struct iovec& second = iov[1];
+  first.iov_base = &hello[0];
+  first.iov_len = base::ArraySize(hello);
+  second.iov_base = &world[0];
+  second.iov_len = base::ArraySize(world);
+
+  struct msghdr hdr = {};
+  hdr.msg_iovlen = base::ArraySize(iov);
+  hdr.msg_iov = iov;
+
+  // Shift by the combined length of both iovs in one call.
+  ShiftMsgHdr(base::ArraySize(world) + base::ArraySize(hello), &hdr);
+
+  EXPECT_EQ(hdr.msg_iov, nullptr);
+  EXPECT_EQ(hdr.msg_iovlen, 0);
+}
+
+// Signal handler that intentionally does nothing. PartialSendMsgAll installs
+// it for SIGWINCH so that delivering the signal interrupts the blocked sender
+// thread instead of applying the signal's default disposition.
+void Handler(int) {}
+
+// Re-installs |act| as the SIGWINCH disposition and forwards the result of
+// sigaction(). Used as the cleanup function of a base::ScopedResource so the
+// previous handler is put back when the test scope ends.
+int RollbackSigaction(const struct sigaction* act) {
+  const int result = sigaction(SIGWINCH, act, nullptr);
+  return result;
+}
+
+// Checks that SendMsgAll() delivers the whole payload even when the
+// underlying blocking send is interrupted by a signal part-way through.
+TEST_F(UnixSocketTest, PartialSendMsgAll) {
+  int sv[2];
+  ASSERT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, sv), 0);
+  base::ScopedFile send_socket(sv[0]);
+  base::ScopedFile recv_socket(sv[1]);
+
+  // Set bufsize to minimum.
+  int bufsize = 1024;
+  ASSERT_EQ(setsockopt(*send_socket, SOL_SOCKET, SO_SNDBUF, &bufsize,
+                       sizeof(bufsize)),
+            0);
+  ASSERT_EQ(setsockopt(*recv_socket, SOL_SOCKET, SO_RCVBUF, &bufsize,
+                       sizeof(bufsize)),
+            0);
+
+  // Send something larger than send + recv kernel buffers combined to make
+  // sendmsg block.
+  char send_buf[8192];
+  // Make MSAN happy.
+  for (size_t i = 0; i < sizeof(send_buf); ++i)
+    send_buf[i] = static_cast<char>(i % 256);
+  char recv_buf[sizeof(send_buf)];
+
+  // Need to install signal handler to cause the interrupt to happen.
+  // man 3 pthread_kill:
+  //   Signal dispositions are process-wide: if a signal handler is
+  //   installed, the handler will be invoked in the thread thread, but if
+  //   the disposition of the signal is "stop", "continue", or "terminate",
+  //   this action will affect the whole process.
+  struct sigaction oldact;
+  struct sigaction newact = {};
+  newact.sa_handler = Handler;
+  ASSERT_EQ(sigaction(SIGWINCH, &newact, &oldact), 0);
+  base::ScopedResource<const struct sigaction*, RollbackSigaction, nullptr>
+      rollback(&oldact);
+
+  auto blocked_thread = pthread_self();
+  std::thread th([blocked_thread, &recv_socket, &recv_buf] {
+    ssize_t rd = PERFETTO_EINTR(read(*recv_socket, recv_buf, 1));
+    ASSERT_EQ(rd, 1);
+    // The sender has written at least one byte by now; signal it while it is
+    // (most likely) blocked in the send so that the send gets interrupted.
+    ASSERT_EQ(pthread_kill(blocked_thread, SIGWINCH), 0);
+    // Drain the socket to allow SendMsgAll to succeed.
+    size_t offset = 1;
+    while (offset < sizeof(recv_buf)) {
+      rd = PERFETTO_EINTR(
+          read(*recv_socket, recv_buf + offset, sizeof(recv_buf) - offset));
+      // A return of 0 means the sender closed its end before the whole
+      // payload arrived. Treat that as a failure too: accepting it would keep
+      // |offset| unchanged and spin in this loop forever.
+      ASSERT_GT(rd, 0);
+      offset += static_cast<size_t>(rd);
+    }
+  });
+
+  // Test sending the send_buf in several chunks as an iov to exercise the
+  // more complicated code-paths of SendMsgAll.
+  struct msghdr hdr = {};
+  struct iovec iov[4];
+  static_assert(sizeof(send_buf) % base::ArraySize(iov) == 0,
+                "Cannot split buffer into even pieces.");
+  constexpr size_t kChunkSize = sizeof(send_buf) / base::ArraySize(iov);
+  for (size_t i = 0; i < base::ArraySize(iov); ++i) {
+    iov[i].iov_base = send_buf + i * kChunkSize;
+    iov[i].iov_len = kChunkSize;
+  }
+  hdr.msg_iov = iov;
+  hdr.msg_iovlen = base::ArraySize(iov);
+
+  // Close the socket and join the reader *before* any fatal assertion: a
+  // failing ASSERT_* returns from the test body, and destroying a still
+  // joinable std::thread would call std::terminate().
+  const auto sent = SendMsgAll(*send_socket, &hdr, 0);
+  send_socket.reset();
+  th.join();
+  ASSERT_EQ(sent, sizeof(send_buf));
+  // Make sure the re-entry logic was actually triggered.
+  ASSERT_EQ(hdr.msg_iov, nullptr);
+  ASSERT_EQ(memcmp(send_buf, recv_buf, sizeof(send_buf)), 0);
+}
+
// TODO(primiano): add a test to check that in the case of a peer sending a fd
// and the other end just doing a recv (without taking it), the fd is closed and
// not left around.