blob: 8602abdab3c7665a7f175fcce6c86d831d05342c [file] [log] [blame]
/*
* Copyright (C) 2018 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "src/profiling/memory/heapprofd_producer.h"
#include <inttypes.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "perfetto/base/file_utils.h"
#include "perfetto/tracing/core/data_source_config.h"
#include "perfetto/tracing/core/data_source_descriptor.h"
#include "perfetto/tracing/core/trace_writer.h"
#include "perfetto/tracing/ipc/producer_ipc_client.h"
namespace perfetto {
namespace profiling {
namespace {
constexpr char kHeapprofdDataSource[] = "android.heapprofd";
constexpr size_t kUnwinderQueueSize = 1000;
constexpr size_t kBookkeepingQueueSize = 1000;
constexpr size_t kUnwinderThreads = 5;
constexpr int kHeapprofdSignal = 36;
constexpr uint32_t kInitialConnectionBackoffMs = 100;
constexpr uint32_t kMaxConnectionBackoffMs = 30 * 1000;
ClientConfiguration MakeClientConfiguration(const DataSourceConfig& cfg) {
ClientConfiguration client_config;
client_config.interval = cfg.heapprofd_config().sampling_interval_bytes();
return client_config;
}
template <typename Fn>
void ForEachPid(const char* file, Fn callback) {
base::ScopedDir proc_dir(opendir("/proc"));
if (!proc_dir) {
PERFETTO_DFATAL("Failed to open /proc");
return;
}
struct dirent* entry;
while ((entry = readdir(*proc_dir))) {
char filename_buf[128];
ssize_t written = snprintf(filename_buf, sizeof(filename_buf),
"/proc/%s/%s", entry->d_name, file);
if (written < 0 || static_cast<size_t>(written) >= sizeof(filename_buf)) {
if (written < 0)
PERFETTO_DFATAL("Failed to concatenate cmdline file.");
else
PERFETTO_DFATAL("Overflow when concatenating cmdline file.");
continue;
}
char* end;
long int pid = strtol(entry->d_name, &end, 10);
if (*end != '\0')
continue;
callback(static_cast<pid_t>(pid), filename_buf);
}
}
void FindAllProfilablePids(std::vector<pid_t>* pids) {
ForEachPid("cmdline", [pids](pid_t pid, const char* filename_buf) {
if (pid == getpid())
return;
struct stat statbuf;
// Check if we have permission to the process.
if (stat(filename_buf, &statbuf) == 0)
pids->emplace_back(pid);
});
}
void FindPidsForCmdlines(const std::vector<std::string>& cmdlines,
std::vector<pid_t>* pids) {
ForEachPid("cmdline", [&cmdlines, pids](pid_t pid, const char* filename_buf) {
if (pid == getpid())
return;
std::string process_cmdline;
process_cmdline.reserve(128);
if (!base::ReadFile(filename_buf, &process_cmdline))
return;
if (process_cmdline.empty())
return;
// Strip everything after @ for Java processes.
// Otherwise, strip newline at EOF.
size_t endpos = process_cmdline.find('@');
if (endpos == std::string::npos)
endpos = process_cmdline.size();
if (endpos < 1)
return;
process_cmdline.resize(endpos);
for (const std::string& cmdline : cmdlines) {
if (process_cmdline == cmdline)
pids->emplace_back(static_cast<pid_t>(pid));
}
});
}
} // namespace
// We create kUnwinderThreads unwinding threads and one bookeeping thread.
// The bookkeeping thread is singleton in order to avoid expensive and
// complicated synchronisation in the bookkeeping.
//
// We wire up the system by creating BoundedQueues between the threads. The main
// thread runs the TaskRunner driving the SocketListener. The unwinding thread
// takes the data received by the SocketListener and if it is a malloc does
// stack unwinding, and if it is a free just forwards the content of the record
// to the bookkeeping thread.
//
// +--------------+
// |SocketListener|
// +------+-------+
// |
// +--UnwindingRecord -+
// | |
// +--------v-------+ +-------v--------+
// |Unwinding Thread| |Unwinding Thread|
// +--------+-------+ +-------+--------+
// | |
// +-BookkeepingRecord +
// |
// +--------v---------+
// |Bookkeeping Thread|
// +------------------+
HeapprofdProducer::HeapprofdProducer(base::TaskRunner* task_runner)
: task_runner_(task_runner),
bookkeeping_queue_(kBookkeepingQueueSize),
bookkeeping_th_([this] { bookkeeping_thread_.Run(&bookkeeping_queue_); }),
unwinder_queues_(MakeUnwinderQueues(kUnwinderThreads)),
unwinding_threads_(MakeUnwindingThreads(kUnwinderThreads)),
socket_listener_(MakeSocketListenerCallback(), &bookkeeping_thread_),
socket_(MakeSocket()),
weak_factory_(this) {}
HeapprofdProducer::~HeapprofdProducer() {
bookkeeping_queue_.Shutdown();
for (auto& queue : unwinder_queues_) {
queue.Shutdown();
}
bookkeeping_th_.join();
for (std::thread& th : unwinding_threads_) {
th.join();
}
}
void HeapprofdProducer::OnConnect() {
// TODO(fmayer): Delete once we have generic reconnect logic.
PERFETTO_DCHECK(state_ == kConnecting);
state_ = kConnected;
ResetConnectionBackoff();
PERFETTO_LOG("Connected to the service");
DataSourceDescriptor desc;
desc.set_name(kHeapprofdDataSource);
endpoint_->RegisterDataSource(desc);
}
// TODO(fmayer): Delete once we have generic reconnect logic.
void HeapprofdProducer::OnDisconnect() {
PERFETTO_DCHECK(state_ == kConnected || state_ == kConnecting);
PERFETTO_LOG("Disconnected from tracing service");
if (state_ == kConnected)
return task_runner_->PostTask([this] { this->Restart(); });
state_ = kNotConnected;
IncreaseConnectionBackoff();
task_runner_->PostDelayedTask([this] { this->Connect(); },
connection_backoff_ms_);
}
void HeapprofdProducer::SetupDataSource(DataSourceInstanceID id,
const DataSourceConfig& cfg) {
PERFETTO_DLOG("Setting up data source.");
if (cfg.name() != kHeapprofdDataSource) {
PERFETTO_DLOG("Invalid data source name.");
return;
}
auto it = data_sources_.find(id);
if (it != data_sources_.end()) {
PERFETTO_DFATAL("Received duplicated data source instance id: %" PRIu64,
id);
return;
}
DataSource data_source;
ClientConfiguration client_config = MakeClientConfiguration(cfg);
if (cfg.heapprofd_config().all()) {
FindAllProfilablePids(&data_source.pids);
if (!cfg.heapprofd_config().pid().empty())
PERFETTO_ELOG("Got all and pid. Ignoring pid.");
if (!cfg.heapprofd_config().process_cmdline().empty())
PERFETTO_ELOG("Got all and process_cmdline. Ignoring process_cmdline.");
} else {
for (uint64_t pid : cfg.heapprofd_config().pid())
data_source.pids.emplace_back(static_cast<pid_t>(pid));
FindPidsForCmdlines(cfg.heapprofd_config().process_cmdline(),
&data_source.pids);
}
auto pid_it = data_source.pids.begin();
while (pid_it != data_source.pids.end()) {
auto profiling_session = socket_listener_.ExpectPID(*pid_it, client_config);
if (!profiling_session) {
PERFETTO_DFATAL("No enabling duplicate profiling session for %d",
*pid_it);
pid_it = data_source.pids.erase(pid_it);
continue;
}
data_source.sessions.emplace_back(std::move(profiling_session));
++pid_it;
}
if (data_source.pids.empty()) {
// TODO(fmayer): Whole system profiling.
PERFETTO_DLOG("No valid pids given. Not setting up data source.");
return;
}
auto buffer_id = static_cast<BufferID>(cfg.target_buffer());
data_source.trace_writer = endpoint_->CreateTraceWriter(buffer_id);
data_sources_.emplace(id, std::move(data_source));
PERFETTO_DLOG("Set up data source.");
}
void HeapprofdProducer::DoContinuousDump(DataSourceInstanceID id,
uint32_t dump_interval) {
if (!Dump(id, 0 /* flush_id */, false /* is_flush */))
return;
auto weak_producer = weak_factory_.GetWeakPtr();
task_runner_->PostDelayedTask(
[weak_producer, id, dump_interval] {
if (!weak_producer)
return;
weak_producer->DoContinuousDump(id, dump_interval);
},
dump_interval);
}
void HeapprofdProducer::StartDataSource(DataSourceInstanceID id,
const DataSourceConfig& cfg) {
PERFETTO_DLOG("Start DataSource");
auto it = data_sources_.find(id);
if (it == data_sources_.end()) {
PERFETTO_DFATAL("Received invalid data source instance to start: %" PRIu64,
id);
return;
}
const DataSource& data_source = it->second;
for (pid_t pid : data_source.pids) {
PERFETTO_DLOG("Sending %d to %d", kHeapprofdSignal, pid);
if (kill(pid, kHeapprofdSignal) != 0) {
PERFETTO_DPLOG("kill");
}
}
const auto continuous_dump_config =
cfg.heapprofd_config().continuous_dump_config();
uint32_t dump_interval = continuous_dump_config.dump_interval_ms();
if (dump_interval) {
auto weak_producer = weak_factory_.GetWeakPtr();
task_runner_->PostDelayedTask(
[weak_producer, id, dump_interval] {
if (!weak_producer)
return;
weak_producer->DoContinuousDump(id, dump_interval);
},
continuous_dump_config.dump_phase_ms());
}
PERFETTO_DLOG("Started DataSource");
}
void HeapprofdProducer::StopDataSource(DataSourceInstanceID id) {
// DataSource holds ProfilingSession handles which on being destructed tear
// down the profiling on the client.
if (data_sources_.erase(id) != 1)
PERFETTO_DFATAL("Trying to stop non existing data source: %" PRIu64, id);
}
void HeapprofdProducer::OnTracingSetup() {}
bool HeapprofdProducer::Dump(DataSourceInstanceID id,
FlushRequestID flush_id,
bool has_flush_id) {
PERFETTO_DLOG("Dumping %" PRIu64 ", flush: %d", id, has_flush_id);
auto it = data_sources_.find(id);
if (it == data_sources_.end()) {
return false;
}
const DataSource& data_source = it->second;
BookkeepingRecord record{};
record.record_type = BookkeepingRecord::Type::Dump;
DumpRecord& dump_record = record.dump_record;
dump_record.pids = data_source.pids;
dump_record.trace_writer = data_source.trace_writer;
auto weak_producer = weak_factory_.GetWeakPtr();
base::TaskRunner* task_runner = task_runner_;
if (has_flush_id) {
dump_record.callback = [task_runner, weak_producer, flush_id] {
task_runner->PostTask([weak_producer, flush_id] {
if (!weak_producer)
return weak_producer->FinishDataSourceFlush(flush_id);
});
};
} else {
dump_record.callback = [] {};
}
bookkeeping_queue_.Add(std::move(record));
return true;
}
void HeapprofdProducer::Flush(FlushRequestID flush_id,
const DataSourceInstanceID* ids,
size_t num_ids) {
if (num_ids == 0)
return;
size_t& flush_in_progress = flushes_in_progress_[flush_id];
PERFETTO_DCHECK(flush_in_progress == 0);
flush_in_progress = num_ids;
for (size_t i = 0; i < num_ids; ++i)
Dump(ids[i], flush_id, true);
}
void HeapprofdProducer::FinishDataSourceFlush(FlushRequestID flush_id) {
auto it = flushes_in_progress_.find(flush_id);
if (it == flushes_in_progress_.end()) {
PERFETTO_DFATAL("FinishDataSourceFlush id invalid: %" PRIu64, flush_id);
return;
}
size_t& flush_in_progress = it->second;
if (--flush_in_progress == 0) {
endpoint_->NotifyFlushComplete(flush_id);
flushes_in_progress_.erase(flush_id);
}
}
std::function<void(UnwindingRecord)>
HeapprofdProducer::MakeSocketListenerCallback() {
return [this](UnwindingRecord record) {
unwinder_queues_[static_cast<size_t>(record.pid) % kUnwinderThreads].Add(
std::move(record));
};
}
std::vector<BoundedQueue<UnwindingRecord>>
HeapprofdProducer::MakeUnwinderQueues(size_t n) {
std::vector<BoundedQueue<UnwindingRecord>> ret(n);
for (size_t i = 0; i < n; ++i)
ret[i].SetCapacity(kUnwinderQueueSize);
return ret;
}
std::vector<std::thread> HeapprofdProducer::MakeUnwindingThreads(size_t n) {
std::vector<std::thread> ret;
for (size_t i = 0; i < n; ++i) {
ret.emplace_back([this, i] {
UnwindingMainLoop(&unwinder_queues_[i], &bookkeeping_queue_);
});
}
return ret;
}
std::unique_ptr<base::UnixSocket> HeapprofdProducer::MakeSocket() {
const char* sock_fd = getenv(kHeapprofdSocketEnvVar);
if (sock_fd == nullptr) {
unlink(kHeapprofdSocketFile);
return base::UnixSocket::Listen(kHeapprofdSocketFile, &socket_listener_,
task_runner_);
}
char* end;
int raw_fd = static_cast<int>(strtol(sock_fd, &end, 10));
if (*end != '\0')
PERFETTO_FATAL("Invalid %s. Expected decimal integer.",
kHeapprofdSocketEnvVar);
return base::UnixSocket::Listen(base::ScopedFile(raw_fd), &socket_listener_,
task_runner_);
}
// TODO(fmayer): Delete these and used ReconnectingProducer once submitted
void HeapprofdProducer::Restart() {
// We lost the connection with the tracing service. At this point we need
// to reset all the data sources. Trying to handle that manually is going to
// be error prone. What we do here is simply desroying the instance and
// recreating it again.
// TODO(hjd): Add e2e test for this.
base::TaskRunner* task_runner = task_runner_;
const char* socket_name = socket_name_;
// Invoke destructor and then the constructor again.
this->~HeapprofdProducer();
new (this) HeapprofdProducer(task_runner);
ConnectWithRetries(socket_name);
}
void HeapprofdProducer::ConnectWithRetries(const char* socket_name) {
PERFETTO_DCHECK(state_ == kNotStarted);
state_ = kNotConnected;
ResetConnectionBackoff();
socket_name_ = socket_name;
Connect();
}
void HeapprofdProducer::DumpAll() {
for (const auto& id_and_data_source : data_sources_) {
if (!Dump(id_and_data_source.first, 0 /* flush_id */, false /* is_flush */))
PERFETTO_DLOG("Failed to dump %" PRIu64, id_and_data_source.first);
}
}
void HeapprofdProducer::Connect() {
PERFETTO_DCHECK(state_ == kNotConnected);
state_ = kConnecting;
endpoint_ = ProducerIPCClient::Connect(socket_name_, this,
"android.heapprofd", task_runner_);
}
void HeapprofdProducer::IncreaseConnectionBackoff() {
connection_backoff_ms_ *= 2;
if (connection_backoff_ms_ > kMaxConnectionBackoffMs)
connection_backoff_ms_ = kMaxConnectionBackoffMs;
}
void HeapprofdProducer::ResetConnectionBackoff() {
connection_backoff_ms_ = kInitialConnectionBackoffMs;
}
} // namespace profiling
} // namespace perfetto