Add TCP/IP support to UnixSocket
Adds the ability to create TCP sockets. This
will be used by the trace processor HTTP interface
(See bug).
Arguably the class should be renamed to just Socket,
but that is a larger change and perhaps not worth it given
the more niche use case for TCP.
This CL also gets rid of the default args and makes all
call-sites explicit to avoid bugs in the future.
Bug: 143074239
Change-Id: Ib3a66e1f33ff2c025defb0ecd066b0e0e491e2e9
diff --git a/src/base/unix_socket.cc b/src/base/unix_socket.cc
index 663cb14..e63f978 100644
--- a/src/base/unix_socket.cc
+++ b/src/base/unix_socket.cc
@@ -18,6 +18,8 @@
#include <errno.h>
#include <fcntl.h>
+#include <netdb.h>
+#include <netinet/in.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
@@ -32,6 +34,7 @@
#include "perfetto/base/build_config.h"
#include "perfetto/base/logging.h"
#include "perfetto/base/task_runner.h"
+#include "perfetto/ext/base/string_utils.h"
#include "perfetto/ext/base/utils.h"
#if PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
@@ -48,6 +51,7 @@
#endif
namespace {
+
// MSG_NOSIGNAL is not supported on Mac OS X, but in that case the socket is
// created with SO_NOSIGPIPE (See InitializeSocket()).
#if PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
@@ -63,7 +67,36 @@
using CBufLenType = socklen_t;
#endif
-inline int GetUnixSockType(SockType type) {
+// A wrapper around variable-size sockaddr structs.
+// This is solving the following problem: when calling connect() or bind(), the
+// caller needs to take care to allocate the right struct (sockaddr_un for
+// AF_UNIX, sockaddr_in for AF_INET). Those structs have different sizes and,
+// more importantly, are bigger than the base struct sockaddr.
+struct SockaddrAny {
+ SockaddrAny() : size() {}
+ SockaddrAny(const void* addr, socklen_t sz) : data(new char[sz]), size(sz) {
+ memcpy(data.get(), addr, static_cast<size_t>(size));
+ }
+
+ const struct sockaddr* addr() const {
+ return reinterpret_cast<const struct sockaddr*>(data.get());
+ }
+
+ std::unique_ptr<char[]> data;
+ socklen_t size;
+};
+
+inline int GetSockFamily(SockFamily family) {
+ switch (family) {
+ case SockFamily::kUnix:
+ return AF_UNIX;
+ case SockFamily::kInet:
+ return AF_INET;
+ }
+ PERFETTO_CHECK(false); // For GCC.
+}
+
+inline int GetSockType(SockType type) {
switch (type) {
case SockType::kStream:
return SOCK_STREAM;
@@ -72,25 +105,42 @@
case SockType::kSeqPacket:
return SOCK_SEQPACKET;
}
- PERFETTO_CHECK(false);
+ PERFETTO_CHECK(false); // For GCC.
}
-bool MakeSockAddr(const std::string& socket_name,
- sockaddr_un* addr,
- socklen_t* addr_size) {
- memset(addr, 0, sizeof(*addr));
- const size_t name_len = socket_name.size();
- if (name_len >= sizeof(addr->sun_path)) {
- errno = ENAMETOOLONG;
- return false;
+SockaddrAny MakeSockAddr(SockFamily family, const std::string& socket_name) {
+ switch (family) {
+ case SockFamily::kUnix: {
+ struct sockaddr_un saddr {};
+ const size_t name_len = socket_name.size();
+ if (name_len >= sizeof(saddr.sun_path)) {
+ errno = ENAMETOOLONG;
+ return SockaddrAny();
+ }
+ memcpy(saddr.sun_path, socket_name.data(), name_len);
+ if (saddr.sun_path[0] == '@')
+ saddr.sun_path[0] = '\0';
+ saddr.sun_family = AF_UNIX;
+ auto size = static_cast<socklen_t>(
+ __builtin_offsetof(sockaddr_un, sun_path) + name_len + 1);
+ PERFETTO_CHECK(static_cast<size_t>(size) <= sizeof(saddr));
+ return SockaddrAny(&saddr, size);
+ }
+ case SockFamily::kInet: {
+ auto parts = SplitString(socket_name, ":");
+ PERFETTO_CHECK(parts.size() == 2);
+ struct addrinfo* addr_info = nullptr;
+ struct addrinfo hints {};
+ hints.ai_family = AF_INET;
+ PERFETTO_CHECK(getaddrinfo(parts[0].c_str(), parts[1].c_str(), &hints,
+ &addr_info) == 0);
+ PERFETTO_CHECK(addr_info->ai_family == AF_INET);
+ SockaddrAny res(addr_info->ai_addr, addr_info->ai_addrlen);
+ freeaddrinfo(addr_info);
+ return res;
+ }
}
- memcpy(addr->sun_path, socket_name.data(), name_len);
- if (addr->sun_path[0] == '@')
- addr->sun_path[0] = '\0';
- addr->sun_family = AF_UNIX;
- *addr_size = static_cast<socklen_t>(
- __builtin_offsetof(sockaddr_un, sun_path) + name_len + 1);
- return true;
+ PERFETTO_CHECK(false); // For GCC.
}
} // namespace
@@ -122,29 +172,39 @@
}
// static
-std::pair<UnixSocketRaw, UnixSocketRaw> UnixSocketRaw::CreatePair(SockType t) {
+std::pair<UnixSocketRaw, UnixSocketRaw> UnixSocketRaw::CreatePair(
+ SockFamily family,
+ SockType type) {
int fds[2];
- if (socketpair(AF_UNIX, GetUnixSockType(t), 0, fds) != 0)
+ if (socketpair(GetSockFamily(family), GetSockType(type), 0, fds) != 0)
return std::make_pair(UnixSocketRaw(), UnixSocketRaw());
- return std::make_pair(UnixSocketRaw(ScopedFile(fds[0]), t),
- UnixSocketRaw(ScopedFile(fds[1]), t));
+ return std::make_pair(UnixSocketRaw(ScopedFile(fds[0]), family, type),
+ UnixSocketRaw(ScopedFile(fds[1]), family, type));
}
UnixSocketRaw::UnixSocketRaw() = default;
-UnixSocketRaw::UnixSocketRaw(SockType type)
- : UnixSocketRaw(ScopedFile(socket(AF_UNIX, GetUnixSockType(type), 0)),
- type) {}
+UnixSocketRaw::UnixSocketRaw(SockFamily family, SockType type)
+ : UnixSocketRaw(
+ ScopedFile(socket(GetSockFamily(family), GetSockType(type), 0)),
+ family,
+ type) {}
-UnixSocketRaw::UnixSocketRaw(ScopedFile fd, SockType type)
- : fd_(std::move(fd)), type_(type) {
+UnixSocketRaw::UnixSocketRaw(ScopedFile fd, SockFamily family, SockType type)
+ : fd_(std::move(fd)), family_(family), type_(type) {
PERFETTO_CHECK(fd_);
#if PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
const int no_sigpipe = 1;
setsockopt(*fd_, SOL_SOCKET, SO_NOSIGPIPE, &no_sigpipe, sizeof(no_sigpipe));
#endif
+ if (family == SockFamily::kInet) {
+ int flag = 1;
+ PERFETTO_CHECK(
+ !setsockopt(*fd_, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag)));
+ }
+
// There is no reason why a socket should outlive the process in case of
// exec() by default, this is just working around a broken unix design.
int fcntl_res = fcntl(*fd_, F_SETFD, FD_CLOEXEC);
@@ -178,12 +238,11 @@
bool UnixSocketRaw::Bind(const std::string& socket_name) {
PERFETTO_DCHECK(fd_);
- sockaddr_un addr;
- socklen_t addr_size;
- if (!MakeSockAddr(socket_name, &addr, &addr_size))
+ SockaddrAny addr = MakeSockAddr(family_, socket_name);
+ if (addr.size == 0)
return false;
- if (bind(*fd_, reinterpret_cast<sockaddr*>(&addr), addr_size)) {
+ if (bind(*fd_, addr.addr(), addr.size)) {
PERFETTO_DPLOG("bind(%s)", socket_name.c_str());
return false;
}
@@ -199,13 +258,11 @@
bool UnixSocketRaw::Connect(const std::string& socket_name) {
PERFETTO_DCHECK(fd_);
- sockaddr_un addr;
- socklen_t addr_size;
- if (!MakeSockAddr(socket_name, &addr, &addr_size))
+ SockaddrAny addr = MakeSockAddr(family_, socket_name);
+ if (addr.size == 0)
return false;
- int res = PERFETTO_EINTR(
- connect(*fd_, reinterpret_cast<sockaddr*>(&addr), addr_size));
+ int res = PERFETTO_EINTR(connect(*fd_, addr.addr(), addr.size));
if (res && errno != EINPROGRESS)
return false;
@@ -371,32 +428,36 @@
std::unique_ptr<UnixSocket> UnixSocket::Listen(const std::string& socket_name,
EventListener* event_listener,
TaskRunner* task_runner,
+ SockFamily sock_family,
SockType sock_type) {
- auto sock_raw = UnixSocketRaw::CreateMayFail(sock_type);
+ auto sock_raw = UnixSocketRaw::CreateMayFail(sock_family, sock_type);
if (!sock_raw || !sock_raw.Bind(socket_name))
return nullptr;
// Forward the call to the Listen() overload below.
- return Listen(sock_raw.ReleaseFd(), event_listener, task_runner, sock_type);
+ return Listen(sock_raw.ReleaseFd(), event_listener, task_runner, sock_family,
+ sock_type);
}
// static
std::unique_ptr<UnixSocket> UnixSocket::Listen(ScopedFile fd,
EventListener* event_listener,
TaskRunner* task_runner,
+ SockFamily sock_family,
SockType sock_type) {
return std::unique_ptr<UnixSocket>(
new UnixSocket(event_listener, task_runner, std::move(fd),
- State::kListening, sock_type));
+ State::kListening, sock_family, sock_type));
}
// static
std::unique_ptr<UnixSocket> UnixSocket::Connect(const std::string& socket_name,
EventListener* event_listener,
TaskRunner* task_runner,
+ SockFamily sock_family,
SockType sock_type) {
std::unique_ptr<UnixSocket> sock(
- new UnixSocket(event_listener, task_runner, sock_type));
+ new UnixSocket(event_listener, task_runner, sock_family, sock_type));
sock->DoConnect(socket_name);
return sock;
}
@@ -406,25 +467,29 @@
ScopedFile fd,
EventListener* event_listener,
TaskRunner* task_runner,
+ SockFamily sock_family,
SockType sock_type) {
return std::unique_ptr<UnixSocket>(
new UnixSocket(event_listener, task_runner, std::move(fd),
- State::kConnected, sock_type));
+ State::kConnected, sock_family, sock_type));
}
UnixSocket::UnixSocket(EventListener* event_listener,
TaskRunner* task_runner,
+ SockFamily sock_family,
SockType sock_type)
: UnixSocket(event_listener,
task_runner,
ScopedFile(),
State::kDisconnected,
+ sock_family,
sock_type) {}
UnixSocket::UnixSocket(EventListener* event_listener,
TaskRunner* task_runner,
ScopedFile adopt_fd,
State adopt_state,
+ SockFamily sock_family,
SockType sock_type)
: event_listener_(event_listener),
task_runner_(task_runner),
@@ -432,14 +497,14 @@
state_ = State::kDisconnected;
if (adopt_state == State::kDisconnected) {
PERFETTO_DCHECK(!adopt_fd);
- sock_raw_ = UnixSocketRaw::CreateMayFail(sock_type);
+ sock_raw_ = UnixSocketRaw::CreateMayFail(sock_family, sock_type);
if (!sock_raw_) {
last_error_ = errno;
return;
}
} else if (adopt_state == State::kConnected) {
PERFETTO_DCHECK(adopt_fd);
- sock_raw_ = UnixSocketRaw(std::move(adopt_fd), sock_type);
+ sock_raw_ = UnixSocketRaw(std::move(adopt_fd), sock_family, sock_type);
state_ = State::kConnected;
ReadPeerCredentials();
} else if (adopt_state == State::kListening) {
@@ -451,7 +516,7 @@
return;
}
- sock_raw_ = UnixSocketRaw(std::move(adopt_fd), sock_type);
+ sock_raw_ = UnixSocketRaw(std::move(adopt_fd), sock_family, sock_type);
if (!sock_raw_.Listen()) {
last_error_ = errno;
PERFETTO_DPLOG("listen()");
@@ -468,6 +533,7 @@
sock_raw_.SetBlocking(false);
WeakPtr<UnixSocket> weak_ptr = weak_ptr_factory_.GetWeakPtr();
+
task_runner_->AddFileDescriptorWatch(sock_raw_.fd(), [weak_ptr] {
if (weak_ptr)
weak_ptr->OnEvent();
@@ -522,6 +588,10 @@
}
void UnixSocket::ReadPeerCredentials() {
+ // Peer credentials are supported only on AF_UNIX sockets.
+ if (sock_raw_.family() != SockFamily::kUnix)
+ return;
+
#if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
struct ucred user_cred;
@@ -554,6 +624,7 @@
socklen_t err_len = sizeof(sock_err);
int res =
getsockopt(sock_raw_.fd(), SOL_SOCKET, SO_ERROR, &sock_err, &err_len);
+
if (res == 0 && sock_err == EINPROGRESS)
return; // Not connected yet, just a spurious FD watch wakeup.
if (res == 0 && sock_err == 0) {
@@ -561,6 +632,7 @@
state_ = State::kConnected;
return event_listener_->OnConnect(this, true /* connected */);
}
+ PERFETTO_DLOG("Connection error: %s", strerror(sock_err));
last_error_ = sock_err;
Shutdown(false);
return event_listener_->OnConnect(this, false /* connected */);
@@ -571,15 +643,15 @@
// There could be more than one incoming connection behind each FD watch
// notification. Drain'em all.
for (;;) {
- sockaddr_un cli_addr = {};
+ struct sockaddr_in cli_addr {};
socklen_t size = sizeof(cli_addr);
ScopedFile new_fd(PERFETTO_EINTR(accept(
sock_raw_.fd(), reinterpret_cast<sockaddr*>(&cli_addr), &size)));
if (!new_fd)
return;
- std::unique_ptr<UnixSocket> new_sock(
- new UnixSocket(event_listener_, task_runner_, std::move(new_fd),
- State::kConnected, sock_raw_.type()));
+ std::unique_ptr<UnixSocket> new_sock(new UnixSocket(
+ event_listener_, task_runner_, std::move(new_fd), State::kConnected,
+ sock_raw_.family(), sock_raw_.type()));
event_listener_->OnNewIncomingConnection(this, std::move(new_sock));
}
}
diff --git a/src/base/unix_socket_unittest.cc b/src/base/unix_socket_unittest.cc
index 8f54cab..138f631 100644
--- a/src/base/unix_socket_unittest.cc
+++ b/src/base/unix_socket_unittest.cc
@@ -84,7 +84,8 @@
};
TEST_F(UnixSocketTest, ConnectionFailureIfUnreachable) {
- auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+ auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
ASSERT_FALSE(cli->is_connected());
auto checkpoint = task_runner_.CreateCheckpoint("failure");
EXPECT_CALL(event_listener_, OnConnect(cli.get(), false))
@@ -95,7 +96,8 @@
// Both server and client should see an OnDisconnect() if the server drops
// incoming connections immediately as they are created.
TEST_F(UnixSocketTest, ConnectionImmediatelyDroppedByServer) {
- auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
ASSERT_TRUE(srv->is_listening());
// The server will immediately shutdown the connection upon
@@ -110,7 +112,8 @@
}));
auto checkpoint = task_runner_.CreateCheckpoint("cli_connected");
- auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+ auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
.WillOnce(InvokeWithoutArgs(checkpoint));
task_runner_.RunUntilCheckpoint("cli_connected");
@@ -125,10 +128,12 @@
}
TEST_F(UnixSocketTest, ClientAndServerExchangeData) {
- auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
ASSERT_TRUE(srv->is_listening());
- auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+ auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
auto srv_disconnected = task_runner_.CreateCheckpoint("srv_disconnected");
@@ -184,10 +189,12 @@
constexpr char srv_str[] = "srv>cli";
TEST_F(UnixSocketTest, ClientAndServerExchangeFDs) {
- auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
ASSERT_TRUE(srv->is_listening());
- auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+ auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
auto srv_disconnected = task_runner_.CreateCheckpoint("srv_disconnected");
@@ -266,13 +273,16 @@
}
TEST_F(UnixSocketTest, ListenWithPassedFileDescriptor) {
- auto sock_raw = UnixSocketRaw::CreateMayFail(SockType::kStream);
+ auto sock_raw =
+ UnixSocketRaw::CreateMayFail(SockFamily::kUnix, SockType::kStream);
ASSERT_TRUE(sock_raw.Bind(kSocketName));
auto fd = sock_raw.ReleaseFd();
- auto srv = UnixSocket::Listen(std::move(fd), &event_listener_, &task_runner_);
+ auto srv = UnixSocket::Listen(std::move(fd), &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
ASSERT_TRUE(srv->is_listening());
- auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+ auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
auto srv_disconnected = task_runner_.CreateCheckpoint("srv_disconnected");
@@ -296,7 +306,8 @@
// Mostly a stress tests. Connects kNumClients clients to the same server and
// tests that all can exchange data and can see the expected sequence of events.
TEST_F(UnixSocketTest, SeveralClients) {
- auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
ASSERT_TRUE(srv->is_listening());
constexpr size_t kNumClients = 32;
std::unique_ptr<UnixSocket> cli[kNumClients];
@@ -312,7 +323,8 @@
}));
for (size_t i = 0; i < kNumClients; i++) {
- cli[i] = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+ cli[i] = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
EXPECT_CALL(event_listener_, OnConnect(cli[i].get(), true))
.WillOnce(Invoke([](UnixSocket* s, bool success) {
ASSERT_TRUE(success);
@@ -352,7 +364,8 @@
ASSERT_NE(nullptr, mem);
memcpy(mem, "shm rocks", 10);
- auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
ASSERT_TRUE(srv->is_listening());
// Signal the other process that it can connect.
ASSERT_EQ(1, base::WriteAll(*pipe.wr, ".", 1));
@@ -377,8 +390,8 @@
char sync_cmd = '\0';
ASSERT_EQ(1, PERFETTO_EINTR(read(*pipe.rd, &sync_cmd, 1)));
ASSERT_EQ('.', sync_cmd);
- auto cli =
- UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+ auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
auto checkpoint = task_runner_.CreateCheckpoint("change_seen_by_client");
EXPECT_CALL(event_listener_, OnDataAvailable(cli.get()))
@@ -412,7 +425,8 @@
// layer needs to rely on this to validate messages received immediately before
// a client disconnects.
TEST_F(UnixSocketTest, PeerCredentialsRetainedAfterDisconnect) {
- auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
ASSERT_TRUE(srv->is_listening());
UnixSocket* srv_client_conn = nullptr;
auto srv_connected = task_runner_.CreateCheckpoint("srv_connected");
@@ -428,7 +442,8 @@
srv_connected();
}));
auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
- auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+ auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
.WillOnce(InvokeWithoutArgs(cli_connected));
@@ -458,7 +473,8 @@
}
TEST_F(UnixSocketTest, BlockingSend) {
- auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
ASSERT_TRUE(srv->is_listening());
auto all_frames_done = task_runner_.CreateCheckpoint("all_frames_done");
@@ -485,7 +501,8 @@
std::thread tx_thread([] {
TestTaskRunner tx_task_runner;
MockEventListener tx_events;
- auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner);
+ auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner,
+ SockFamily::kUnix, SockType::kStream);
auto cli_connected = tx_task_runner.CreateCheckpoint("cli_connected");
EXPECT_CALL(tx_events, OnConnect(cli.get(), true))
@@ -510,7 +527,8 @@
// sender is in the middle of a large send(), the socket should gracefully give
// up (i.e. Shutdown()) but not crash.
TEST_F(UnixSocketTest, ReceiverDisconnectsDuringSend) {
- auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
ASSERT_TRUE(srv->is_listening());
const int kTimeoutMs = 30000;
@@ -531,7 +549,8 @@
std::thread tx_thread([] {
TestTaskRunner tx_task_runner;
MockEventListener tx_events;
- auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner);
+ auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner,
+ SockFamily::kUnix, SockType::kStream);
auto cli_connected = tx_task_runner.CreateCheckpoint("cli_connected");
EXPECT_CALL(tx_events, OnConnect(cli.get(), true))
@@ -644,7 +663,8 @@
TEST_F(UnixSocketTest, PartialSendMsgAll) {
UnixSocketRaw send_sock;
UnixSocketRaw recv_sock;
- std::tie(send_sock, recv_sock) = UnixSocketRaw::CreatePair(SockType::kStream);
+ std::tie(send_sock, recv_sock) =
+ UnixSocketRaw::CreatePair(SockFamily::kUnix, SockType::kStream);
ASSERT_TRUE(send_sock);
ASSERT_TRUE(recv_sock);
@@ -717,7 +737,8 @@
}
TEST_F(UnixSocketTest, ReleaseSocket) {
- auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
ASSERT_TRUE(srv->is_listening());
auto connected = task_runner_.CreateCheckpoint("connected");
UnixSocket* peer = nullptr;
@@ -727,7 +748,8 @@
connected();
}));
- auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+ auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
task_runner_.RunUntilCheckpoint("connected");
srv->Shutdown(true);
@@ -747,6 +769,57 @@
ASSERT_STREQ(buf, "test");
}
+TEST_F(UnixSocketTest, TcpStream) {
+ char host_and_port[32];
+ int attempt = 0;
+ std::unique_ptr<UnixSocket> srv;
+
+ // Try listening on a random port. Some ports might be taken by other system
+ // services. Do a bunch of attempts on different ports before giving up.
+ do {
+ sprintf(host_and_port, "127.0.0.1:%d", 10000 + (rand() % 10000));
+ srv = UnixSocket::Listen(host_and_port, &event_listener_, &task_runner_,
+ SockFamily::kInet, SockType::kStream);
+ } while ((!srv || !srv->is_listening()) && attempt++ < 10);
+ ASSERT_TRUE(srv->is_listening());
+
+ constexpr size_t kNumClients = 3;
+ std::unique_ptr<UnixSocket> cli[kNumClients];
+ EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+ .Times(kNumClients)
+ .WillRepeatedly(Invoke([&](UnixSocket*, UnixSocket* s) {
+ // OnDisconnect() might spuriously happen depending on the dtor order.
+ EXPECT_CALL(event_listener_, OnDisconnect(s)).Times(AtLeast(0));
+ EXPECT_CALL(event_listener_, OnDataAvailable(s))
+ .WillRepeatedly(Invoke([](UnixSocket* cli_sock) {
+ cli_sock->ReceiveString(); // Read connection EOF;
+ }));
+ ASSERT_TRUE(s->Send("welcome", kBlocking));
+ }));
+
+ for (size_t i = 0; i < kNumClients; i++) {
+ cli[i] = UnixSocket::Connect(host_and_port, &event_listener_, &task_runner_,
+ SockFamily::kInet, SockType::kStream);
+ // PERFETTO_ILOG("cli : %p", reinterpret_cast<void*>(cli[i].get()));
+ auto checkpoint = task_runner_.CreateCheckpoint(std::to_string(i));
+ EXPECT_CALL(event_listener_, OnDisconnect(cli[i].get())).Times(AtLeast(0));
+ EXPECT_CALL(event_listener_, OnConnect(cli[i].get(), true));
+ EXPECT_CALL(event_listener_, OnDataAvailable(cli[i].get()))
+ .WillRepeatedly(Invoke([checkpoint](UnixSocket* s) {
+ auto str = s->ReceiveString();
+ if (str == "")
+ return; // Connection EOF.
+ ASSERT_EQ("welcome", str);
+ checkpoint();
+ }));
+ }
+
+ for (size_t i = 0; i < kNumClients; i++) {
+ task_runner_.RunUntilCheckpoint(std::to_string(i));
+ ASSERT_TRUE(Mock::VerifyAndClearExpectations(cli[i].get()));
+ }
+}
+
// TODO(primiano): add a test to check that in the case of a peer sending a fd
// and the other end just doing a recv (without taking it), the fd is closed and
// not left around.