Merge "Add TCP/IP support to UnixSocket"
diff --git a/include/perfetto/ext/base/unix_socket.h b/include/perfetto/ext/base/unix_socket.h
index b4e8e04..4bb9b04 100644
--- a/include/perfetto/ext/base/unix_socket.h
+++ b/include/perfetto/ext/base/unix_socket.h
@@ -37,8 +37,9 @@
// Use arbitrarily high values to avoid that some code accidentally ends up
// assuming that these enum values match the sysroot's SOCK_xxx defines rather
-// than using GetUnixSockType().
+// than using GetSockType() / GetSockFamily().
enum class SockType { kStream = 100, kDgram, kSeqPacket };
+enum class SockFamily { kUnix = 200, kInet };
// UnixSocketRaw is a basic wrapper around UNIX sockets. It exposes wrapper
// methods that take care of most common pitfalls (e.g., marking fd as
@@ -47,17 +48,20 @@
class UnixSocketRaw {
public:
// Creates a new unconnected unix socket.
- static UnixSocketRaw CreateMayFail(SockType t) { return UnixSocketRaw(t); }
+ static UnixSocketRaw CreateMayFail(SockFamily family, SockType type) {
+ return UnixSocketRaw(family, type);
+ }
// Crates a pair of connected sockets.
- static std::pair<UnixSocketRaw, UnixSocketRaw> CreatePair(SockType);
+ static std::pair<UnixSocketRaw, UnixSocketRaw> CreatePair(SockFamily,
+ SockType);
// Creates an uninitialized unix socket.
UnixSocketRaw();
// Creates a unix socket adopting an existing file descriptor. This is
// typically used to inherit fds from init via environment variables.
- UnixSocketRaw(ScopedFile, SockType);
+ UnixSocketRaw(ScopedFile, SockFamily, SockType);
~UnixSocketRaw() = default;
UnixSocketRaw(UnixSocketRaw&&) noexcept = default;
@@ -73,6 +77,7 @@
bool IsBlocking() const;
void RetainOnExec();
SockType type() const { return type_; }
+ SockFamily family() const { return family_; }
int fd() const { return *fd_; }
explicit operator bool() const { return !!fd_; }
@@ -98,13 +103,14 @@
static void ShiftMsgHdr(size_t n, struct msghdr* msg);
private:
- explicit UnixSocketRaw(SockType);
+ UnixSocketRaw(SockFamily, SockType);
UnixSocketRaw(const UnixSocketRaw&) = delete;
UnixSocketRaw& operator=(const UnixSocketRaw&) = delete;
ScopedFile fd_;
- SockType type_{SockType::kStream};
+ SockFamily family_ = SockFamily::kUnix;
+ SockType type_ = SockType::kStream;
};
// A non-blocking UNIX domain socket. Allows also to transfer file descriptors.
@@ -179,22 +185,26 @@
enum class BlockingMode { kNonBlocking, kBlocking };
- // Creates a Unix domain socket and starts listening. If |socket_name|
- // starts with a '@', an abstract socket will be created (Linux/Android only).
+ // Creates a socket and starts listening. If SockFamily::kUnix and
+ // |socket_name| starts with a '@', an abstract UNIX domain socket will be
+ // created instead of a filesystem-linked UNIX socket (Linux/Android only).
+ // If SockFamily::kInet, |socket_name| is host:port (e.g., "1.2.3.4:8000").
// Returns always an instance. In case of failure (e.g., another socket
// with the same name is already listening) the returned socket will have
// is_listening() == false and last_error() will contain the failure reason.
static std::unique_ptr<UnixSocket> Listen(const std::string& socket_name,
EventListener*,
TaskRunner*,
- SockType = SockType::kStream);
+ SockFamily,
+ SockType);
// Attaches to a pre-existing socket. The socket must have been created in
// SOCK_STREAM mode and the caller must have called bind() on it.
static std::unique_ptr<UnixSocket> Listen(ScopedFile,
EventListener*,
TaskRunner*,
- SockType = SockType::kStream);
+ SockFamily,
+ SockType);
// Creates a Unix domain socket and connects to the listening endpoint.
// Returns always an instance. EventListener::OnConnect(bool success) will
@@ -202,14 +212,15 @@
static std::unique_ptr<UnixSocket> Connect(const std::string& socket_name,
EventListener*,
TaskRunner*,
- SockType = SockType::kStream);
+ SockFamily,
+ SockType);
// Constructs a UnixSocket using the given connected socket.
- static std::unique_ptr<UnixSocket> AdoptConnected(
- ScopedFile fd,
- EventListener* event_listener,
- TaskRunner* task_runner,
- SockType sock_type);
+ static std::unique_ptr<UnixSocket> AdoptConnected(ScopedFile,
+ EventListener*,
+ TaskRunner*,
+ SockFamily,
+ SockType);
UnixSocket(const UnixSocket&) = delete;
UnixSocket& operator=(const UnixSocket&) = delete;
@@ -303,8 +314,13 @@
UnixSocketRaw ReleaseSocket();
private:
- UnixSocket(EventListener*, TaskRunner*, SockType);
- UnixSocket(EventListener*, TaskRunner*, ScopedFile, State, SockType);
+ UnixSocket(EventListener*, TaskRunner*, SockFamily, SockType);
+ UnixSocket(EventListener*,
+ TaskRunner*,
+ ScopedFile,
+ State,
+ SockFamily,
+ SockType);
// Called once by the corresponding public static factory methods.
void DoConnect(const std::string& socket_name);
diff --git a/src/base/unix_socket.cc b/src/base/unix_socket.cc
index 663cb14..e63f978 100644
--- a/src/base/unix_socket.cc
+++ b/src/base/unix_socket.cc
@@ -18,6 +18,8 @@
#include <errno.h>
#include <fcntl.h>
+#include <netdb.h>
+#include <netinet/in.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
@@ -32,6 +34,7 @@
#include "perfetto/base/build_config.h"
#include "perfetto/base/logging.h"
#include "perfetto/base/task_runner.h"
+#include "perfetto/ext/base/string_utils.h"
#include "perfetto/ext/base/utils.h"
#if PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
@@ -48,6 +51,7 @@
#endif
namespace {
+
// MSG_NOSIGNAL is not supported on Mac OS X, but in that case the socket is
// created with SO_NOSIGPIPE (See InitializeSocket()).
#if PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
@@ -63,7 +67,36 @@
using CBufLenType = socklen_t;
#endif
-inline int GetUnixSockType(SockType type) {
+// A wrapper around variable-size sockaddr structs.
+// This is solving the following problem: when calling connect() or bind(), the
+// caller needs to take care to allocate the right struct (sockaddr_un for
+// AF_UNIX, sockaddr_in for AF_INET). Those structs have different sizes and,
+// more importantly, can be bigger than the base struct sockaddr (sockaddr_un).
+struct SockaddrAny {
+ SockaddrAny() : size() {}
+ SockaddrAny(const void* addr, socklen_t sz) : data(new char[sz]), size(sz) {
+ memcpy(data.get(), addr, static_cast<size_t>(size));
+ }
+
+ const struct sockaddr* addr() const {
+ return reinterpret_cast<const struct sockaddr*>(data.get());
+ }
+
+ std::unique_ptr<char[]> data;
+ socklen_t size;
+};
+
+inline int GetSockFamily(SockFamily family) {
+ switch (family) {
+ case SockFamily::kUnix:
+ return AF_UNIX;
+ case SockFamily::kInet:
+ return AF_INET;
+ }
+ PERFETTO_CHECK(false); // For GCC.
+}
+
+inline int GetSockType(SockType type) {
switch (type) {
case SockType::kStream:
return SOCK_STREAM;
@@ -72,25 +105,42 @@
case SockType::kSeqPacket:
return SOCK_SEQPACKET;
}
- PERFETTO_CHECK(false);
+ PERFETTO_CHECK(false); // For GCC.
}
-bool MakeSockAddr(const std::string& socket_name,
- sockaddr_un* addr,
- socklen_t* addr_size) {
- memset(addr, 0, sizeof(*addr));
- const size_t name_len = socket_name.size();
- if (name_len >= sizeof(addr->sun_path)) {
- errno = ENAMETOOLONG;
- return false;
+SockaddrAny MakeSockAddr(SockFamily family, const std::string& socket_name) {
+ switch (family) {
+ case SockFamily::kUnix: {
+ struct sockaddr_un saddr {};
+ const size_t name_len = socket_name.size();
+ if (name_len >= sizeof(saddr.sun_path)) {
+ errno = ENAMETOOLONG;
+ return SockaddrAny();
+ }
+ memcpy(saddr.sun_path, socket_name.data(), name_len);
+ if (saddr.sun_path[0] == '@')
+ saddr.sun_path[0] = '\0';
+ saddr.sun_family = AF_UNIX;
+ auto size = static_cast<socklen_t>(
+ __builtin_offsetof(sockaddr_un, sun_path) + name_len + 1);
+ PERFETTO_CHECK(static_cast<size_t>(size) <= sizeof(saddr));
+ return SockaddrAny(&saddr, size);
+ }
+ case SockFamily::kInet: {
+ auto parts = SplitString(socket_name, ":");
+ PERFETTO_CHECK(parts.size() == 2);
+ struct addrinfo* addr_info = nullptr;
+ struct addrinfo hints {};
+ hints.ai_family = AF_INET;
+ PERFETTO_CHECK(getaddrinfo(parts[0].c_str(), parts[1].c_str(), &hints,
+ &addr_info) == 0);
+ PERFETTO_CHECK(addr_info->ai_family == AF_INET);
+ SockaddrAny res(addr_info->ai_addr, addr_info->ai_addrlen);
+ freeaddrinfo(addr_info);
+ return res;
+ }
}
- memcpy(addr->sun_path, socket_name.data(), name_len);
- if (addr->sun_path[0] == '@')
- addr->sun_path[0] = '\0';
- addr->sun_family = AF_UNIX;
- *addr_size = static_cast<socklen_t>(
- __builtin_offsetof(sockaddr_un, sun_path) + name_len + 1);
- return true;
+ PERFETTO_CHECK(false); // For GCC.
}
} // namespace
@@ -122,29 +172,39 @@
}
// static
-std::pair<UnixSocketRaw, UnixSocketRaw> UnixSocketRaw::CreatePair(SockType t) {
+std::pair<UnixSocketRaw, UnixSocketRaw> UnixSocketRaw::CreatePair(
+ SockFamily family,
+ SockType type) {
int fds[2];
- if (socketpair(AF_UNIX, GetUnixSockType(t), 0, fds) != 0)
+ if (socketpair(GetSockFamily(family), GetSockType(type), 0, fds) != 0)
return std::make_pair(UnixSocketRaw(), UnixSocketRaw());
- return std::make_pair(UnixSocketRaw(ScopedFile(fds[0]), t),
- UnixSocketRaw(ScopedFile(fds[1]), t));
+ return std::make_pair(UnixSocketRaw(ScopedFile(fds[0]), family, type),
+ UnixSocketRaw(ScopedFile(fds[1]), family, type));
}
UnixSocketRaw::UnixSocketRaw() = default;
-UnixSocketRaw::UnixSocketRaw(SockType type)
- : UnixSocketRaw(ScopedFile(socket(AF_UNIX, GetUnixSockType(type), 0)),
- type) {}
+UnixSocketRaw::UnixSocketRaw(SockFamily family, SockType type)
+ : UnixSocketRaw(
+ ScopedFile(socket(GetSockFamily(family), GetSockType(type), 0)),
+ family,
+ type) {}
-UnixSocketRaw::UnixSocketRaw(ScopedFile fd, SockType type)
- : fd_(std::move(fd)), type_(type) {
+UnixSocketRaw::UnixSocketRaw(ScopedFile fd, SockFamily family, SockType type)
+ : fd_(std::move(fd)), family_(family), type_(type) {
PERFETTO_CHECK(fd_);
#if PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
const int no_sigpipe = 1;
setsockopt(*fd_, SOL_SOCKET, SO_NOSIGPIPE, &no_sigpipe, sizeof(no_sigpipe));
#endif
+ if (family == SockFamily::kInet) {
+ int flag = 1;
+ PERFETTO_CHECK(
+ !setsockopt(*fd_, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag)));
+ }
+
// There is no reason why a socket should outlive the process in case of
// exec() by default, this is just working around a broken unix design.
int fcntl_res = fcntl(*fd_, F_SETFD, FD_CLOEXEC);
@@ -178,12 +238,11 @@
bool UnixSocketRaw::Bind(const std::string& socket_name) {
PERFETTO_DCHECK(fd_);
- sockaddr_un addr;
- socklen_t addr_size;
- if (!MakeSockAddr(socket_name, &addr, &addr_size))
+ SockaddrAny addr = MakeSockAddr(family_, socket_name);
+ if (addr.size == 0)
return false;
- if (bind(*fd_, reinterpret_cast<sockaddr*>(&addr), addr_size)) {
+ if (bind(*fd_, addr.addr(), addr.size)) {
PERFETTO_DPLOG("bind(%s)", socket_name.c_str());
return false;
}
@@ -199,13 +258,11 @@
bool UnixSocketRaw::Connect(const std::string& socket_name) {
PERFETTO_DCHECK(fd_);
- sockaddr_un addr;
- socklen_t addr_size;
- if (!MakeSockAddr(socket_name, &addr, &addr_size))
+ SockaddrAny addr = MakeSockAddr(family_, socket_name);
+ if (addr.size == 0)
return false;
- int res = PERFETTO_EINTR(
- connect(*fd_, reinterpret_cast<sockaddr*>(&addr), addr_size));
+ int res = PERFETTO_EINTR(connect(*fd_, addr.addr(), addr.size));
if (res && errno != EINPROGRESS)
return false;
@@ -371,32 +428,36 @@
std::unique_ptr<UnixSocket> UnixSocket::Listen(const std::string& socket_name,
EventListener* event_listener,
TaskRunner* task_runner,
+ SockFamily sock_family,
SockType sock_type) {
- auto sock_raw = UnixSocketRaw::CreateMayFail(sock_type);
+ auto sock_raw = UnixSocketRaw::CreateMayFail(sock_family, sock_type);
if (!sock_raw || !sock_raw.Bind(socket_name))
return nullptr;
// Forward the call to the Listen() overload below.
- return Listen(sock_raw.ReleaseFd(), event_listener, task_runner, sock_type);
+ return Listen(sock_raw.ReleaseFd(), event_listener, task_runner, sock_family,
+ sock_type);
}
// static
std::unique_ptr<UnixSocket> UnixSocket::Listen(ScopedFile fd,
EventListener* event_listener,
TaskRunner* task_runner,
+ SockFamily sock_family,
SockType sock_type) {
return std::unique_ptr<UnixSocket>(
new UnixSocket(event_listener, task_runner, std::move(fd),
- State::kListening, sock_type));
+ State::kListening, sock_family, sock_type));
}
// static
std::unique_ptr<UnixSocket> UnixSocket::Connect(const std::string& socket_name,
EventListener* event_listener,
TaskRunner* task_runner,
+ SockFamily sock_family,
SockType sock_type) {
std::unique_ptr<UnixSocket> sock(
- new UnixSocket(event_listener, task_runner, sock_type));
+ new UnixSocket(event_listener, task_runner, sock_family, sock_type));
sock->DoConnect(socket_name);
return sock;
}
@@ -406,25 +467,29 @@
ScopedFile fd,
EventListener* event_listener,
TaskRunner* task_runner,
+ SockFamily sock_family,
SockType sock_type) {
return std::unique_ptr<UnixSocket>(
new UnixSocket(event_listener, task_runner, std::move(fd),
- State::kConnected, sock_type));
+ State::kConnected, sock_family, sock_type));
}
UnixSocket::UnixSocket(EventListener* event_listener,
TaskRunner* task_runner,
+ SockFamily sock_family,
SockType sock_type)
: UnixSocket(event_listener,
task_runner,
ScopedFile(),
State::kDisconnected,
+ sock_family,
sock_type) {}
UnixSocket::UnixSocket(EventListener* event_listener,
TaskRunner* task_runner,
ScopedFile adopt_fd,
State adopt_state,
+ SockFamily sock_family,
SockType sock_type)
: event_listener_(event_listener),
task_runner_(task_runner),
@@ -432,14 +497,14 @@
state_ = State::kDisconnected;
if (adopt_state == State::kDisconnected) {
PERFETTO_DCHECK(!adopt_fd);
- sock_raw_ = UnixSocketRaw::CreateMayFail(sock_type);
+ sock_raw_ = UnixSocketRaw::CreateMayFail(sock_family, sock_type);
if (!sock_raw_) {
last_error_ = errno;
return;
}
} else if (adopt_state == State::kConnected) {
PERFETTO_DCHECK(adopt_fd);
- sock_raw_ = UnixSocketRaw(std::move(adopt_fd), sock_type);
+ sock_raw_ = UnixSocketRaw(std::move(adopt_fd), sock_family, sock_type);
state_ = State::kConnected;
ReadPeerCredentials();
} else if (adopt_state == State::kListening) {
@@ -451,7 +516,7 @@
return;
}
- sock_raw_ = UnixSocketRaw(std::move(adopt_fd), sock_type);
+ sock_raw_ = UnixSocketRaw(std::move(adopt_fd), sock_family, sock_type);
if (!sock_raw_.Listen()) {
last_error_ = errno;
PERFETTO_DPLOG("listen()");
@@ -468,6 +533,7 @@
sock_raw_.SetBlocking(false);
WeakPtr<UnixSocket> weak_ptr = weak_ptr_factory_.GetWeakPtr();
+
task_runner_->AddFileDescriptorWatch(sock_raw_.fd(), [weak_ptr] {
if (weak_ptr)
weak_ptr->OnEvent();
@@ -522,6 +588,10 @@
}
void UnixSocket::ReadPeerCredentials() {
+ // Peer credentials are supported only on AF_UNIX sockets.
+ if (sock_raw_.family() != SockFamily::kUnix)
+ return;
+
#if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
struct ucred user_cred;
@@ -554,6 +624,7 @@
socklen_t err_len = sizeof(sock_err);
int res =
getsockopt(sock_raw_.fd(), SOL_SOCKET, SO_ERROR, &sock_err, &err_len);
+
if (res == 0 && sock_err == EINPROGRESS)
return; // Not connected yet, just a spurious FD watch wakeup.
if (res == 0 && sock_err == 0) {
@@ -561,6 +632,7 @@
state_ = State::kConnected;
return event_listener_->OnConnect(this, true /* connected */);
}
+ PERFETTO_DLOG("Connection error: %s", strerror(sock_err));
last_error_ = sock_err;
Shutdown(false);
return event_listener_->OnConnect(this, false /* connected */);
@@ -571,15 +643,15 @@
// There could be more than one incoming connection behind each FD watch
// notification. Drain'em all.
for (;;) {
- sockaddr_un cli_addr = {};
+ struct sockaddr_in cli_addr {};
socklen_t size = sizeof(cli_addr);
ScopedFile new_fd(PERFETTO_EINTR(accept(
sock_raw_.fd(), reinterpret_cast<sockaddr*>(&cli_addr), &size)));
if (!new_fd)
return;
- std::unique_ptr<UnixSocket> new_sock(
- new UnixSocket(event_listener_, task_runner_, std::move(new_fd),
- State::kConnected, sock_raw_.type()));
+ std::unique_ptr<UnixSocket> new_sock(new UnixSocket(
+ event_listener_, task_runner_, std::move(new_fd), State::kConnected,
+ sock_raw_.family(), sock_raw_.type()));
event_listener_->OnNewIncomingConnection(this, std::move(new_sock));
}
}
diff --git a/src/base/unix_socket_unittest.cc b/src/base/unix_socket_unittest.cc
index 8f54cab..138f631 100644
--- a/src/base/unix_socket_unittest.cc
+++ b/src/base/unix_socket_unittest.cc
@@ -84,7 +84,8 @@
};
TEST_F(UnixSocketTest, ConnectionFailureIfUnreachable) {
- auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+ auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
ASSERT_FALSE(cli->is_connected());
auto checkpoint = task_runner_.CreateCheckpoint("failure");
EXPECT_CALL(event_listener_, OnConnect(cli.get(), false))
@@ -95,7 +96,8 @@
// Both server and client should see an OnDisconnect() if the server drops
// incoming connections immediately as they are created.
TEST_F(UnixSocketTest, ConnectionImmediatelyDroppedByServer) {
- auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
ASSERT_TRUE(srv->is_listening());
// The server will immediately shutdown the connection upon
@@ -110,7 +112,8 @@
}));
auto checkpoint = task_runner_.CreateCheckpoint("cli_connected");
- auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+ auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
.WillOnce(InvokeWithoutArgs(checkpoint));
task_runner_.RunUntilCheckpoint("cli_connected");
@@ -125,10 +128,12 @@
}
TEST_F(UnixSocketTest, ClientAndServerExchangeData) {
- auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
ASSERT_TRUE(srv->is_listening());
- auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+ auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
auto srv_disconnected = task_runner_.CreateCheckpoint("srv_disconnected");
@@ -184,10 +189,12 @@
constexpr char srv_str[] = "srv>cli";
TEST_F(UnixSocketTest, ClientAndServerExchangeFDs) {
- auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
ASSERT_TRUE(srv->is_listening());
- auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+ auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
auto srv_disconnected = task_runner_.CreateCheckpoint("srv_disconnected");
@@ -266,13 +273,16 @@
}
TEST_F(UnixSocketTest, ListenWithPassedFileDescriptor) {
- auto sock_raw = UnixSocketRaw::CreateMayFail(SockType::kStream);
+ auto sock_raw =
+ UnixSocketRaw::CreateMayFail(SockFamily::kUnix, SockType::kStream);
ASSERT_TRUE(sock_raw.Bind(kSocketName));
auto fd = sock_raw.ReleaseFd();
- auto srv = UnixSocket::Listen(std::move(fd), &event_listener_, &task_runner_);
+ auto srv = UnixSocket::Listen(std::move(fd), &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
ASSERT_TRUE(srv->is_listening());
- auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+ auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
auto srv_disconnected = task_runner_.CreateCheckpoint("srv_disconnected");
@@ -296,7 +306,8 @@
// Mostly a stress tests. Connects kNumClients clients to the same server and
// tests that all can exchange data and can see the expected sequence of events.
TEST_F(UnixSocketTest, SeveralClients) {
- auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
ASSERT_TRUE(srv->is_listening());
constexpr size_t kNumClients = 32;
std::unique_ptr<UnixSocket> cli[kNumClients];
@@ -312,7 +323,8 @@
}));
for (size_t i = 0; i < kNumClients; i++) {
- cli[i] = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+ cli[i] = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
EXPECT_CALL(event_listener_, OnConnect(cli[i].get(), true))
.WillOnce(Invoke([](UnixSocket* s, bool success) {
ASSERT_TRUE(success);
@@ -352,7 +364,8 @@
ASSERT_NE(nullptr, mem);
memcpy(mem, "shm rocks", 10);
- auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
ASSERT_TRUE(srv->is_listening());
// Signal the other process that it can connect.
ASSERT_EQ(1, base::WriteAll(*pipe.wr, ".", 1));
@@ -377,8 +390,8 @@
char sync_cmd = '\0';
ASSERT_EQ(1, PERFETTO_EINTR(read(*pipe.rd, &sync_cmd, 1)));
ASSERT_EQ('.', sync_cmd);
- auto cli =
- UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+ auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
auto checkpoint = task_runner_.CreateCheckpoint("change_seen_by_client");
EXPECT_CALL(event_listener_, OnDataAvailable(cli.get()))
@@ -412,7 +425,8 @@
// layer needs to rely on this to validate messages received immediately before
// a client disconnects.
TEST_F(UnixSocketTest, PeerCredentialsRetainedAfterDisconnect) {
- auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
ASSERT_TRUE(srv->is_listening());
UnixSocket* srv_client_conn = nullptr;
auto srv_connected = task_runner_.CreateCheckpoint("srv_connected");
@@ -428,7 +442,8 @@
srv_connected();
}));
auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
- auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+ auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
.WillOnce(InvokeWithoutArgs(cli_connected));
@@ -458,7 +473,8 @@
}
TEST_F(UnixSocketTest, BlockingSend) {
- auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
ASSERT_TRUE(srv->is_listening());
auto all_frames_done = task_runner_.CreateCheckpoint("all_frames_done");
@@ -485,7 +501,8 @@
std::thread tx_thread([] {
TestTaskRunner tx_task_runner;
MockEventListener tx_events;
- auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner);
+ auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner,
+ SockFamily::kUnix, SockType::kStream);
auto cli_connected = tx_task_runner.CreateCheckpoint("cli_connected");
EXPECT_CALL(tx_events, OnConnect(cli.get(), true))
@@ -510,7 +527,8 @@
// sender is in the middle of a large send(), the socket should gracefully give
// up (i.e. Shutdown()) but not crash.
TEST_F(UnixSocketTest, ReceiverDisconnectsDuringSend) {
- auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
ASSERT_TRUE(srv->is_listening());
const int kTimeoutMs = 30000;
@@ -531,7 +549,8 @@
std::thread tx_thread([] {
TestTaskRunner tx_task_runner;
MockEventListener tx_events;
- auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner);
+ auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner,
+ SockFamily::kUnix, SockType::kStream);
auto cli_connected = tx_task_runner.CreateCheckpoint("cli_connected");
EXPECT_CALL(tx_events, OnConnect(cli.get(), true))
@@ -644,7 +663,8 @@
TEST_F(UnixSocketTest, PartialSendMsgAll) {
UnixSocketRaw send_sock;
UnixSocketRaw recv_sock;
- std::tie(send_sock, recv_sock) = UnixSocketRaw::CreatePair(SockType::kStream);
+ std::tie(send_sock, recv_sock) =
+ UnixSocketRaw::CreatePair(SockFamily::kUnix, SockType::kStream);
ASSERT_TRUE(send_sock);
ASSERT_TRUE(recv_sock);
@@ -717,7 +737,8 @@
}
TEST_F(UnixSocketTest, ReleaseSocket) {
- auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
ASSERT_TRUE(srv->is_listening());
auto connected = task_runner_.CreateCheckpoint("connected");
UnixSocket* peer = nullptr;
@@ -727,7 +748,8 @@
connected();
}));
- auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+ auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_,
+ SockFamily::kUnix, SockType::kStream);
EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
task_runner_.RunUntilCheckpoint("connected");
srv->Shutdown(true);
@@ -747,6 +769,57 @@
ASSERT_STREQ(buf, "test");
}
+TEST_F(UnixSocketTest, TcpStream) {
+ char host_and_port[32];
+ int attempt = 0;
+ std::unique_ptr<UnixSocket> srv;
+
+ // Try listening on a random port in [10000, 19999]. Some ports might be taken
+ // by other system services, so retry on different ports before giving up.
+ do {
+ sprintf(host_and_port, "127.0.0.1:%d", 10000 + (rand() % 10000));
+ srv = UnixSocket::Listen(host_and_port, &event_listener_, &task_runner_,
+ SockFamily::kInet, SockType::kStream);
+ } while ((!srv || !srv->is_listening()) && attempt++ < 10);
+ ASSERT_TRUE(srv && srv->is_listening());  // |srv| is null if bind() failed.
+
+ constexpr size_t kNumClients = 3;
+ std::unique_ptr<UnixSocket> cli[kNumClients];
+ EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+ .Times(kNumClients)
+ .WillRepeatedly(Invoke([&](UnixSocket*, UnixSocket* s) {
+ // OnDisconnect() might spuriously happen depending on the dtor order.
+ EXPECT_CALL(event_listener_, OnDisconnect(s)).Times(AtLeast(0));
+ EXPECT_CALL(event_listener_, OnDataAvailable(s))
+ .WillRepeatedly(Invoke([](UnixSocket* cli_sock) {
+ cli_sock->ReceiveString(); // Read connection EOF.
+ }));
+ ASSERT_TRUE(s->Send("welcome", kBlocking));
+ }));
+
+ for (size_t i = 0; i < kNumClients; i++) {
+ cli[i] = UnixSocket::Connect(host_and_port, &event_listener_, &task_runner_,
+ SockFamily::kInet, SockType::kStream);
+ // One checkpoint per client, reached once its "welcome" message is read.
+ auto checkpoint = task_runner_.CreateCheckpoint(std::to_string(i));
+ EXPECT_CALL(event_listener_, OnDisconnect(cli[i].get())).Times(AtLeast(0));
+ EXPECT_CALL(event_listener_, OnConnect(cli[i].get(), true));
+ EXPECT_CALL(event_listener_, OnDataAvailable(cli[i].get()))
+ .WillRepeatedly(Invoke([checkpoint](UnixSocket* s) {
+ auto str = s->ReceiveString();
+ if (str == "")
+ return; // Connection EOF.
+ ASSERT_EQ("welcome", str);
+ checkpoint();
+ }));
+ }
+
+ for (size_t i = 0; i < kNumClients; i++) {
+ task_runner_.RunUntilCheckpoint(std::to_string(i));
+ ASSERT_TRUE(Mock::VerifyAndClearExpectations(cli[i].get()));
+ }
+}
+
// TODO(primiano): add a test to check that in the case of a peer sending a fd
// and the other end just doing a recv (without taking it), the fd is closed and
// not left around.
diff --git a/src/ipc/client_impl.cc b/src/ipc/client_impl.cc
index c3bd830..b563d95 100644
--- a/src/ipc/client_impl.cc
+++ b/src/ipc/client_impl.cc
@@ -43,7 +43,9 @@
ClientImpl::ClientImpl(const char* socket_name, base::TaskRunner* task_runner)
: task_runner_(task_runner), weak_ptr_factory_(this) {
- sock_ = base::UnixSocket::Connect(socket_name, this, task_runner);
+ sock_ = base::UnixSocket::Connect(socket_name, this, task_runner,
+ base::SockFamily::kUnix,
+ base::SockType::kStream);
}
ClientImpl::~ClientImpl() {
diff --git a/src/ipc/client_impl_unittest.cc b/src/ipc/client_impl_unittest.cc
index 84a9dd1..e8e634e 100644
--- a/src/ipc/client_impl_unittest.cc
+++ b/src/ipc/client_impl_unittest.cc
@@ -103,7 +103,9 @@
explicit FakeHost(base::TaskRunner* task_runner) {
DESTROY_TEST_SOCK(kSockName);
- listening_sock = base::UnixSocket::Listen(kSockName, this, task_runner);
+ listening_sock = base::UnixSocket::Listen(kSockName, this, task_runner,
+ base::SockFamily::kUnix,
+ base::SockType::kStream);
EXPECT_TRUE(listening_sock->is_listening());
}
~FakeHost() override { DESTROY_TEST_SOCK(kSockName); }
diff --git a/src/ipc/host_impl.cc b/src/ipc/host_impl.cc
index 7a7d95c..0a56b85 100644
--- a/src/ipc/host_impl.cc
+++ b/src/ipc/host_impl.cc
@@ -55,13 +55,17 @@
HostImpl::HostImpl(base::ScopedFile socket_fd, base::TaskRunner* task_runner)
: task_runner_(task_runner), weak_ptr_factory_(this) {
PERFETTO_DCHECK_THREAD(thread_checker_);
- sock_ = base::UnixSocket::Listen(std::move(socket_fd), this, task_runner_);
+ sock_ = base::UnixSocket::Listen(std::move(socket_fd), this, task_runner_,
+ base::SockFamily::kUnix,
+ base::SockType::kStream);
}
HostImpl::HostImpl(const char* socket_name, base::TaskRunner* task_runner)
: task_runner_(task_runner), weak_ptr_factory_(this) {
PERFETTO_DCHECK_THREAD(thread_checker_);
- sock_ = base::UnixSocket::Listen(socket_name, this, task_runner_);
+ sock_ = base::UnixSocket::Listen(socket_name, this, task_runner_,
+ base::SockFamily::kUnix,
+ base::SockType::kStream);
}
HostImpl::~HostImpl() = default;
diff --git a/src/ipc/host_impl_unittest.cc b/src/ipc/host_impl_unittest.cc
index 70215ff..0624cd7 100644
--- a/src/ipc/host_impl_unittest.cc
+++ b/src/ipc/host_impl_unittest.cc
@@ -88,7 +88,9 @@
MOCK_METHOD0(OnRequestError, void());
explicit FakeClient(base::TaskRunner* task_runner) {
- sock_ = base::UnixSocket::Connect(kSockName, this, task_runner);
+ sock_ = base::UnixSocket::Connect(kSockName, this, task_runner,
+ base::SockFamily::kUnix,
+ base::SockType::kStream);
}
~FakeClient() override = default;
@@ -334,15 +336,15 @@
sizeof(kFileContent))),
sizeof(kFileContent));
EXPECT_CALL(*fake_service, OnFakeMethod1(_, _))
- .WillOnce(Invoke([on_reply_sent, &tx_file](const RequestProto&,
- DeferredBase* reply) {
- std::unique_ptr<ReplyProto> reply_args(new ReplyProto());
- auto async_res = AsyncResult<ProtoMessage>(
- std::unique_ptr<ProtoMessage>(reply_args.release()));
- async_res.set_fd(tx_file.fd());
- reply->Resolve(std::move(async_res));
- on_reply_sent();
- }));
+ .WillOnce(Invoke(
+ [on_reply_sent, &tx_file](const RequestProto&, DeferredBase* reply) {
+ std::unique_ptr<ReplyProto> reply_args(new ReplyProto());
+ auto async_res = AsyncResult<ProtoMessage>(
+ std::unique_ptr<ProtoMessage>(reply_args.release()));
+ async_res.set_fd(tx_file.fd());
+ reply->Resolve(std::move(async_res));
+ on_reply_sent();
+ }));
task_runner_->RunUntilCheckpoint("on_reply_sent");
tx_file.ReleaseFD();
diff --git a/src/profiling/memory/client.cc b/src/profiling/memory/client.cc
index e3a0e7d..8065733 100644
--- a/src/profiling/memory/client.cc
+++ b/src/profiling/memory/client.cc
@@ -101,7 +101,8 @@
// static
base::Optional<base::UnixSocketRaw> Client::ConnectToHeapprofd(
const std::string& sock_name) {
- auto sock = base::UnixSocketRaw::CreateMayFail(base::SockType::kStream);
+ auto sock = base::UnixSocketRaw::CreateMayFail(base::SockFamily::kUnix,
+ base::SockType::kStream);
if (!sock || !sock.Connect(sock_name)) {
PERFETTO_PLOG("Failed to connect to %s", sock_name.c_str());
return base::nullopt;
diff --git a/src/profiling/memory/heapprofd_producer.cc b/src/profiling/memory/heapprofd_producer.cc
index d1024cc..c10adc3 100644
--- a/src/profiling/memory/heapprofd_producer.cc
+++ b/src/profiling/memory/heapprofd_producer.cc
@@ -137,7 +137,7 @@
PERFETTO_DCHECK(mode_ == HeapprofdMode::kChild);
auto socket = base::UnixSocket::AdoptConnected(
std::move(inherited_fd_), &socket_delegate_, task_runner_,
- base::SockType::kStream);
+ base::SockFamily::kUnix, base::SockType::kStream);
HandleClientConnection(std::move(socket), target_process_);
}
@@ -610,7 +610,6 @@
}
DataSource& data_source = it->second;
-
for (std::pair<const pid_t, ProcessState>& pid_and_process_state :
data_source.process_states) {
pid_t pid = pid_and_process_state.first;
diff --git a/src/profiling/memory/main.cc b/src/profiling/memory/main.cc
index 7df39e7..ff5ac97 100644
--- a/src/profiling/memory/main.cc
+++ b/src/profiling/memory/main.cc
@@ -159,9 +159,9 @@
HeapprofdProducer producer(HeapprofdMode::kCentral, &task_runner);
int listening_raw_socket = GetListeningSocket();
- auto listening_socket =
- base::UnixSocket::Listen(base::ScopedFile(listening_raw_socket),
- &producer.socket_delegate(), &task_runner);
+ auto listening_socket = base::UnixSocket::Listen(
+ base::ScopedFile(listening_raw_socket), &producer.socket_delegate(),
+ &task_runner, base::SockFamily::kUnix, base::SockType::kStream);
struct sigaction action = {};
action.sa_handler = [](int) { g_dump_evt->Notify(); };
diff --git a/src/profiling/memory/malloc_hooks.cc b/src/profiling/memory/malloc_hooks.cc
index 2eb4cd4..f696e58 100644
--- a/src/profiling/memory/malloc_hooks.cc
+++ b/src/profiling/memory/malloc_hooks.cc
@@ -256,7 +256,7 @@
perfetto::base::UnixSocketRaw parent_sock;
perfetto::base::UnixSocketRaw child_sock;
std::tie(parent_sock, child_sock) = perfetto::base::UnixSocketRaw::CreatePair(
- perfetto::base::SockType::kStream);
+ perfetto::base::SockFamily::kUnix, perfetto::base::SockType::kStream);
if (!parent_sock || !child_sock) {
PERFETTO_PLOG("Failed to create socketpair.");
diff --git a/src/profiling/memory/unwinding.cc b/src/profiling/memory/unwinding.cc
index 59a4dca..ced87f9 100644
--- a/src/profiling/memory/unwinding.cc
+++ b/src/profiling/memory/unwinding.cc
@@ -374,7 +374,7 @@
void UnwindingWorker::HandleHandoffSocket(HandoffData handoff_data) {
auto sock = base::UnixSocket::AdoptConnected(
handoff_data.sock.ReleaseFd(), this, this->thread_task_runner_.get(),
- base::SockType::kStream);
+ base::SockFamily::kUnix, base::SockType::kStream);
pid_t peer_pid = sock->peer_pid();
UnwindingMetadata metadata(peer_pid, std::move(handoff_data.maps_fd),
diff --git a/src/profiling/memory/wire_protocol_unittest.cc b/src/profiling/memory/wire_protocol_unittest.cc
index a8d623e..462ea06 100644
--- a/src/profiling/memory/wire_protocol_unittest.cc
+++ b/src/profiling/memory/wire_protocol_unittest.cc
@@ -56,8 +56,10 @@
int sv[2];
PERFETTO_CHECK(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
base::UnixSocketRaw send_sock(base::ScopedFile(sv[0]),
+ base::SockFamily::kUnix,
base::SockType::kStream);
base::UnixSocketRaw recv_sock(base::ScopedFile(sv[1]),
+ base::SockFamily::kUnix,
base::SockType::kStream);
char msg[] = "a";
PERFETTO_CHECK(send_sock.Send(msg, sizeof(msg), &fd, 1));
diff --git a/src/traced/probes/android_log/android_log_data_source.cc b/src/traced/probes/android_log/android_log_data_source.cc
index 73e84db..c49bec6 100644
--- a/src/traced/probes/android_log/android_log_data_source.cc
+++ b/src/traced/probes/android_log/android_log_data_source.cc
@@ -148,7 +148,8 @@
}
base::UnixSocketRaw AndroidLogDataSource::ConnectLogdrSocket() {
- auto socket = base::UnixSocketRaw::CreateMayFail(base::SockType::kSeqPacket);
+ auto socket = base::UnixSocketRaw::CreateMayFail(base::SockFamily::kUnix,
+ base::SockType::kSeqPacket);
if (!socket || !socket.Connect(kLogdrSocket)) {
PERFETTO_PLOG("Failed to connect to %s", kLogdrSocket);
return base::UnixSocketRaw();
diff --git a/src/traced/probes/android_log/android_log_data_source_unittest.cc b/src/traced/probes/android_log/android_log_data_source_unittest.cc
index ee38ced..1c90204 100644
--- a/src/traced/probes/android_log/android_log_data_source_unittest.cc
+++ b/src/traced/probes/android_log/android_log_data_source_unittest.cc
@@ -62,8 +62,8 @@
base::UnixSocketRaw recv_sock;
// In theory this should be a kSeqPacket. We use kDgram here so that the
// test can run also on MacOS (which doesn't support SOCK_SEQPACKET).
- const auto kSockType = base::SockType::kDgram;
- std::tie(send_sock, recv_sock) = base::UnixSocketRaw::CreatePair(kSockType);
+ std::tie(send_sock, recv_sock) = base::UnixSocketRaw::CreatePair(
+ base::SockFamily::kUnix, base::SockType::kDgram);
ASSERT_TRUE(send_sock);
ASSERT_TRUE(recv_sock);
diff --git a/test/producer_socket_fuzzer.cc b/test/producer_socket_fuzzer.cc
index 430b2de..ac7227f 100644
--- a/test/producer_socket_fuzzer.cc
+++ b/test/producer_socket_fuzzer.cc
@@ -63,7 +63,8 @@
data, size, task_runner.CreateCheckpoint("data_sent"));
std::unique_ptr<base::UnixSocket> sock = base::UnixSocket::Connect(
- helper.GetProducerSocketName(), &fake_event_listener, &task_runner);
+ helper.GetProducerSocketName(), &fake_event_listener, &task_runner,
+ base::SockFamily::kUnix, base::SockType::kStream);
task_runner.RunUntilCheckpoint("data_sent");
return 0;