blob: a96308a01a41a611c8fe7191f8eeae2c738ea503 [file] [log] [blame]
/*
* Copyright (C) 2023 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "src/traced_relay/socket_relay_handler.h"

#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <cstring>
#include <memory>
#include <mutex>
#include <random>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "perfetto/ext/base/threading/thread_pool.h"
#include "perfetto/ext/base/unix_socket.h"
#include "test/gtest_and_gmock.h"
using testing::Values;
namespace perfetto {
namespace {
// A pair of raw unix sockets. Used below for the two endpoint sockets that a
// test client reads from and writes to.
using RawSocketPair = std::pair<base::UnixSocketRaw, base::UnixSocketRaw>;
// Value type produced by the PRNG; also the element type of the request
// payloads built in the randomized test.
using RngValueType = std::minstd_rand0::result_type;
// State of one test client whose traffic goes through the SocketRelayHandler.
// NOTE: the fixture brace-initializes this struct positionally, so the field
// order must not change.
struct TestClient {
// The two endpoint sockets returned by SetUpEndToEndSockets(); the test writes
// into one and reads from the other.
RawSocketPair endpoint_sockets;
// Source of all randomness for this client: the number of requests, the
// request sizes, the payload values and the inter-request delays.
std::minstd_rand0 data_prng;
// Thread that runs this client's request/response loop; default-constructed
// by the fixture and assigned (and joined) by the test body.
std::thread client_thread;
};
// Test fixture parameterized by the number of client connections that are
// relayed through one SocketRelayHandler instance.
class SocketRelayHandlerTest : public ::testing::TestWithParam<uint32_t> {
 protected:
  // Creates the relay handler and one TestClient per requested connection.
  void SetUp() override {
    socket_relay_handler_ = std::make_unique<SocketRelayHandler>();

    const uint32_t num_clients = GetParam();
    for (uint32_t seed = 0; seed < num_clients; ++seed) {
      // Every client gets its own PRNG, seeded with the client index. The
      // client thread is left default-constructed; the test starts it.
      test_clients_.push_back(
          TestClient{SetUpEndToEndSockets(), std::minstd_rand0(seed), {}});
    }
  }

  // Destroys the relay handler.
  void TearDown() override { socket_relay_handler_.reset(); }

  // Builds one relayed connection out of two socket pairs:
  //   near_end <-> relay_in <-> SocketRelayHandler <-> relay_out <-> far_end
  // The two inner sockets are made non-blocking and transferred to the
  // SocketRelayHandler. The two outer sockets are returned so the test can
  // read and write bidirectionally through the relay.
  RawSocketPair SetUpEndToEndSockets() {
    auto [near_end, relay_in] = base::UnixSocketRaw::CreatePairPosix(
        base::SockFamily::kUnix, base::SockType::kStream);
    relay_in.SetBlocking(false);

    auto [relay_out, far_end] = base::UnixSocketRaw::CreatePairPosix(
        base::SockFamily::kUnix, base::SockType::kStream);
    relay_out.SetBlocking(false);

    auto relayed_pair = std::make_unique<SocketPair>();
    relayed_pair->first.sock = std::move(relay_in);
    relayed_pair->second.sock = std::move(relay_out);
    socket_relay_handler_->AddSocketPair(std::move(relayed_pair));

    return RawSocketPair(std::move(near_end), std::move(far_end));
  }

  // The handler under test.
  std::unique_ptr<SocketRelayHandler> socket_relay_handler_;
  // One entry per relayed connection; filled in SetUp().
  std::vector<TestClient> test_clients_;
  // Pool that runs the blocking receives: one thread, plus one more for every
  // 10 connections.
  base::ThreadPool receiver_thread_pool_{1 + GetParam() / 10};
};
// Checks that SocketWithBuffer reports the right amount of buffered data after
// EnqueueData() and DequeueData(), and that data() points at the bytes that
// have not been dequeued yet.
TEST(SocketWithBufferTest, EnqueueDequeue) {
  SocketWithBuffer socket_with_buffer;
  // No data initially.
  EXPECT_EQ(socket_with_buffer.data_size(), 0u);

  // Has room for writing some bytes into. This is a fatal assertion: the
  // memcpy() below relies on that room being available, so the test must not
  // continue if it is not.
  const std::string data = "12345678901234567890";
  ASSERT_GT(socket_with_buffer.available_bytes(), data.size());
  memcpy(socket_with_buffer.buffer(), data.data(), data.size());
  socket_with_buffer.EnqueueData(data.size());
  // Fatal as well: the dequeue below presumes the whole payload is buffered.
  ASSERT_EQ(socket_with_buffer.data_size(), data.size());

  // Dequeue some bytes.
  socket_with_buffer.DequeueData(5);
  // Fatal: data_size() is used as the length of the read below.
  ASSERT_EQ(socket_with_buffer.data_size(), data.size() - 5);
  const std::string buffered_data(
      reinterpret_cast<const char*>(socket_with_buffer.data()),
      socket_with_buffer.data_size());
  // The first 5 bytes are gone; the rest is unchanged.
  EXPECT_EQ(buffered_data, "678901234567890");
}
// Test the SocketRelayHandler with randomized request and response data.
//
// Every client gets its own thread, which performs a PRNG-chosen number of
// exchanges. In each exchange a random payload is written into one endpoint
// socket and must come out of the other endpoint unchanged. The direction is
// flipped after every exchange to emulate the response.
TEST_P(SocketRelayHandlerTest, RandomizedRequestResponse) {
  // The max message size in the number of RNG calls.
  constexpr size_t kMaxMsgSizeRng = 1 << 20;

  // Create the threads for sending and receiving data through the
  // SocketRelayHandler.
  for (auto& client : test_clients_) {
    auto* thread_pool = &receiver_thread_pool_;
    auto thread_func = [&client, thread_pool]() {
      auto& rng = client.data_prng;
      // The number of requests for this client, in the range [0, 50).
      const size_t num_requests = rng() % 50;
      for (size_t j = 0; j < num_requests; j++) {
        // Re-bound on every iteration: the sockets themselves are swapped at
        // the end of the loop body.
        auto& send_endpoint = client.endpoint_sockets.first;
        auto& receive_endpoint = client.endpoint_sockets.second;
        auto req_size = rng() % kMaxMsgSizeRng;

        // Generate the random request.
        std::vector<RngValueType> request;
        request.reserve(req_size);
        for (size_t r = 0; r < req_size; r++) {
          request.emplace_back(rng());
        }
        const size_t bytes_to_send = request.size() * sizeof(RngValueType);

        // Create a buffer for receiving the request.
        std::vector<RngValueType> received_request(request.size());

        // |done| is guarded by |mutex|. The lock is taken before the receive
        // task is posted and is only released inside cv.wait(), so the task
        // sets |done| and notifies while this thread is waiting. The task
        // notifies with the mutex held, which keeps |mutex| and |cv| alive
        // until the task no longer touches them.
        std::mutex mutex;
        std::condition_variable cv;
        std::unique_lock<std::mutex> lock(mutex);
        bool done = false;

        // Blocking receive on the thread pool.
        thread_pool->PostTask([&]() {
          const size_t bytes_to_receive =
              received_request.size() * sizeof(RngValueType);
          uint8_t* receive_buffer =
              reinterpret_cast<uint8_t*>(received_request.data());
          size_t bytes_received = 0;
          // Perform a blocking read until we received the expected bytes.
          while (bytes_received < bytes_to_receive) {
            ssize_t rsize = PERFETTO_EINTR(
                receive_endpoint.Receive(receive_buffer + bytes_received,
                                         bytes_to_receive - bytes_received));
            // Error or end-of-stream: give up; the integrity check below
            // reports the incomplete payload.
            if (rsize <= 0)
              break;
            bytes_received += static_cast<size_t>(rsize);
            std::this_thread::yield();  // Adds some scheduling randomness.
          }
          std::lock_guard<std::mutex> inner_lock(mutex);
          done = true;
          cv.notify_one();
        });

        // Perform a blocking send of the request data. The result is checked
        // so that a failed or short send is reported as such, rather than
        // only showing up as a payload mismatch.
        const ssize_t bytes_sent =
            PERFETTO_EINTR(send_endpoint.Send(request.data(), bytes_to_send));
        EXPECT_EQ(bytes_sent, static_cast<ssize_t>(bytes_to_send));

        // Wait until the request is fully received.
        cv.wait(lock, [&done] { return done; });

        // Check data integrity.
        EXPECT_EQ(request, received_request);

        // Add some randomness to timing.
        std::this_thread::sleep_for(std::chrono::microseconds(rng() % 1000));

        // Emulate the response by reversing the data flow direction.
        std::swap(send_endpoint, receive_endpoint);
      }
    };
    client.client_thread = std::thread(std::move(thread_func));
  }

  // Wait for all clients to finish their exchanges.
  for (auto& client : test_clients_) {
    client.client_thread.join();
  }
}
// Instantiates the SocketRelayHandlerTest suite with parameter values 1, 5 and
// 50. The fixture uses the parameter as the number of test clients to create
// and to size the receiver thread pool.
INSTANTIATE_TEST_SUITE_P(ByConnections,
SocketRelayHandlerTest,
Values(1, 5, 50));
} // namespace
} // namespace perfetto