blob: f829a6be920acf42cdc79e7c35278b5da6608892 [file] [log] [blame]
/*
* Copyright (C) 2023 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "src/traced_relay/socket_relay_handler.h"
#include <fcntl.h>
#include <sys/poll.h>
#include <algorithm>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include "perfetto/base/logging.h"
#include "perfetto/base/platform_handle.h"
#include "perfetto/ext/base/thread_checker.h"
#include "perfetto/ext/base/utils.h"
#include "perfetto/ext/base/watchdog.h"
namespace perfetto {
namespace {
// Use the default watchdog timeout for task runners.
static constexpr int kWatchdogTimeoutMs = 30000;
// Timeout of the epoll_wait() call.
static constexpr int kPollTimeoutMs = 30000;
} // namespace
FdPoller::Watcher::~Watcher() = default;
FdPoller::FdPoller(Watcher* watcher) : watcher_(watcher) {
WatchForRead(notify_fd_.fd());
// This is done last in the ctor because WatchForRead() asserts using
// |thread_checker_|.
PERFETTO_DETACH_FROM_THREAD(thread_checker_);
}
void FdPoller::Poll() {
PERFETTO_DCHECK_THREAD(thread_checker_);
int num_fds = PERFETTO_EINTR(poll(
&poll_fds_[0], static_cast<nfds_t>(poll_fds_.size()), kPollTimeoutMs));
if (num_fds == -1 && base::IsAgain(errno))
return; // Poll again.
PERFETTO_DCHECK(num_fds <= static_cast<int>(poll_fds_.size()));
// Make a copy of |poll_fds_| so it's safe to watch and unwatch while
// notifying the watcher.
const auto poll_fds(poll_fds_);
for (const auto& event : poll_fds) {
if (!event.revents) // This event isn't active.
continue;
// Check whether the poller needs to break the polling loop for updates.
if (event.fd == notify_fd_.fd()) {
notify_fd_.Clear();
continue;
}
// Notify the callers on fd events.
if (event.revents & POLLOUT) {
watcher_->OnFdWritable(event.fd);
} else if (event.revents & POLLIN) {
watcher_->OnFdReadable(event.fd);
} else {
PERFETTO_DLOG("poll() returns events %d on fd %d", event.events,
event.fd);
} // Other events like POLLHUP or POLLERR are ignored.
}
}
void FdPoller::Notify() {
// Can be called from any thread.
notify_fd_.Notify();
}
std::vector<pollfd>::iterator FdPoller::FindPollEvent(base::PlatformHandle fd) {
PERFETTO_DCHECK_THREAD(thread_checker_);
return std::find_if(poll_fds_.begin(), poll_fds_.end(),
[fd](const pollfd& item) { return fd == item.fd; });
}
void FdPoller::WatchFd(base::PlatformHandle fd, WatchEvents events) {
auto it = FindPollEvent(fd);
if (it == poll_fds_.end()) {
poll_fds_.push_back({fd, events, 0});
} else {
it->events |= events;
}
}
void FdPoller::UnwatchFd(base::PlatformHandle fd, WatchEvents events) {
auto it = FindPollEvent(fd);
PERFETTO_CHECK(it != poll_fds_.end());
it->events &= ~events;
}
void FdPoller::RemoveWatch(base::PlatformHandle fd) {
auto it = FindPollEvent(fd);
PERFETTO_CHECK(it != poll_fds_.end());
poll_fds_.erase(it);
}
SocketRelayHandler::SocketRelayHandler() : fd_poller_(this) {
PERFETTO_DETACH_FROM_THREAD(io_thread_checker_);
io_thread_ = std::thread([this]() { this->Run(); });
}
SocketRelayHandler::~SocketRelayHandler() {
RunOnIOThread([this]() { this->exited_ = true; });
io_thread_.join();
}
void SocketRelayHandler::AddSocketPair(
std::unique_ptr<SocketPair> socket_pair) {
RunOnIOThread([this, socket_pair = std::move(socket_pair)]() mutable {
PERFETTO_DCHECK_THREAD(io_thread_checker_);
base::PlatformHandle fd1 = socket_pair->first.sock.fd();
base::PlatformHandle fd2 = socket_pair->second.sock.fd();
auto* ptr = socket_pair.get();
socket_pairs_.emplace_back(std::move(socket_pair));
fd_poller_.WatchForRead(fd1);
fd_poller_.WatchForRead(fd2);
socket_pairs_by_fd_[fd1] = ptr;
socket_pairs_by_fd_[fd2] = ptr;
});
}
void SocketRelayHandler::Run() {
PERFETTO_DCHECK_THREAD(io_thread_checker_);
while (!exited_) {
fd_poller_.Poll();
auto handle = base::Watchdog::GetInstance()->CreateFatalTimer(
kWatchdogTimeoutMs, base::WatchdogCrashReason::kTaskRunnerHung);
std::deque<std::packaged_task<void()>> pending_tasks;
{
std::lock_guard<std::mutex> lock(mutex_);
pending_tasks = std::move(pending_tasks_);
}
while (!pending_tasks.empty()) {
auto task = std::move(pending_tasks.front());
pending_tasks.pop_front();
task();
}
}
}
void SocketRelayHandler::OnFdReadable(base::PlatformHandle fd) {
PERFETTO_DCHECK_THREAD(io_thread_checker_);
auto socket_pair = GetSocketPair(fd);
if (!socket_pair)
return; // Already removed.
auto [fd_sock, peer_sock] = *socket_pair;
// Buffer some bytes.
auto peer_fd = peer_sock.sock.fd();
while (fd_sock.available_bytes() > 0) {
auto rsize =
fd_sock.sock.Receive(fd_sock.buffer(), fd_sock.available_bytes());
if (rsize > 0) {
fd_sock.EnqueueData(static_cast<size_t>(rsize));
continue;
}
if (rsize == 0 || (rsize == -1 && !base::IsAgain(errno))) {
// TODO(chinglinyu): flush the remaining data to |peer_sock|.
RemoveSocketPair(fd_sock, peer_sock);
return;
}
// If there is any buffered data that needs to be sent to |peer_sock|, arm
// the write watcher.
if (fd_sock.data_size() > 0) {
fd_poller_.WatchForWrite(peer_fd);
}
return;
}
// We are not bufferable: need to turn off POLLIN to avoid spinning.
fd_poller_.UnwatchForRead(fd);
PERFETTO_DCHECK(fd_sock.data_size() > 0);
// Watching for POLLOUT will cause an OnFdWritable() event of
// |peer_sock|.
fd_poller_.WatchForWrite(peer_fd);
}
void SocketRelayHandler::OnFdWritable(base::PlatformHandle fd) {
PERFETTO_DCHECK_THREAD(io_thread_checker_);
auto socket_pair = GetSocketPair(fd);
if (!socket_pair)
return; // Already removed.
auto [fd_sock, peer_sock] = *socket_pair;
// |fd_sock| can be written to without blocking. Now we can transfer from the
// buffer in |peer_sock|.
while (peer_sock.data_size() > 0) {
auto wsize = fd_sock.sock.Send(peer_sock.data(), peer_sock.data_size());
if (wsize > 0) {
peer_sock.DequeueData(static_cast<size_t>(wsize));
continue;
}
if (wsize == -1 && !base::IsAgain(errno)) {
RemoveSocketPair(fd_sock, peer_sock);
}
// errno == EAGAIN and we still have data to send: continue watching for
// read.
return;
}
// We don't have buffered data to send. Disable watching for write.
fd_poller_.UnwatchForWrite(fd);
auto peer_fd = peer_sock.sock.fd();
if (peer_sock.available_bytes())
fd_poller_.WatchForRead(peer_fd);
}
std::optional<std::tuple<SocketWithBuffer&, SocketWithBuffer&>>
SocketRelayHandler::GetSocketPair(base::PlatformHandle fd) {
PERFETTO_DCHECK_THREAD(io_thread_checker_);
auto* socket_pair = socket_pairs_by_fd_.Find(fd);
if (!socket_pair)
return std::nullopt;
PERFETTO_DCHECK(fd == (*socket_pair)->first.sock.fd() ||
fd == (*socket_pair)->second.sock.fd());
if (fd == (*socket_pair)->first.sock.fd())
return std::tie((*socket_pair)->first, (*socket_pair)->second);
return std::tie((*socket_pair)->second, (*socket_pair)->first);
}
void SocketRelayHandler::RemoveSocketPair(SocketWithBuffer& sock1,
SocketWithBuffer& sock2) {
PERFETTO_DCHECK_THREAD(io_thread_checker_);
auto fd1 = sock1.sock.fd();
auto fd2 = sock2.sock.fd();
fd_poller_.RemoveWatch(fd1);
fd_poller_.RemoveWatch(fd2);
auto* ptr1 = socket_pairs_by_fd_.Find(fd1);
auto* ptr2 = socket_pairs_by_fd_.Find(fd2);
PERFETTO_DCHECK(ptr1 && ptr2);
PERFETTO_DCHECK(*ptr1 == *ptr2);
auto* socket_pair_ptr = *ptr1;
socket_pairs_by_fd_.Erase(fd1);
socket_pairs_by_fd_.Erase(fd2);
socket_pairs_.erase(
std::remove_if(
socket_pairs_.begin(), socket_pairs_.end(),
[socket_pair_ptr](const std::unique_ptr<SocketPair>& item) {
return item.get() == socket_pair_ptr;
}),
socket_pairs_.end());
}
} // namespace perfetto