blob: 987db5877d3a1a5282c62cd5c805565d38fe3026 [file] [log] [blame]
/*
* Copyright (C) 2022 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "src/traced/probes/statsd_client/statsd_data_source.h"
#include <stdlib.h>
#include "perfetto/base/task_runner.h"
#include "perfetto/base/time.h"
#include "perfetto/ext/base/string_utils.h"
#include "perfetto/ext/base/subprocess.h"
#include "perfetto/protozero/scattered_heap_buffer.h"
#include "perfetto/tracing/core/data_source_config.h"
#include "protos/perfetto/config/statsd/statsd_tracing_config.pbzero.h"
#include "protos/perfetto/trace/statsd/statsd_atom.pbzero.h"
#include "protos/perfetto/trace/trace_packet.pbzero.h"
#include "protos/third_party/statsd/shell_config.pbzero.h"
using ::perfetto::protos::pbzero::StatsdPullAtomConfig;
using ::perfetto::protos::pbzero::StatsdShellSubscription;
using ::perfetto::protos::pbzero::StatsdTracingConfig;
namespace perfetto {
namespace {
// Size in bytes of the length prefix that precedes every message exchanged
// with statsd: a native size_t.
constexpr size_t kHeaderSize = sizeof(size_t);
// Appends one "pulled" entry to |msg| for every atom id yielded by |it|.
// Each entry carries the pull frequency from |cfg| (5000 ms when |cfg| does
// not set one), every package listed in |cfg|, and a matcher for the atom id.
void AddPullAtoms(const StatsdPullAtomConfig::Decoder& cfg,
                  protozero::RepeatedFieldIterator<int32_t> it,
                  StatsdShellSubscription* msg) {
  constexpr int32_t kDefaultPullFreqMs = 5000;
  const int32_t freq_ms = cfg.has_pull_frequency_ms()
                              ? cfg.pull_frequency_ms()
                              : kDefaultPullFreqMs;
  while (it) {
    auto* pulled = msg->add_pulled();
    pulled->set_freq_millis(freq_ms);
    // The package list is per-config, so it is repeated on every entry.
    for (auto pkg = cfg.packages(); pkg; ++pkg) {
      pulled->add_packages(*pkg);
    }
    pulled->set_matcher()->set_atom_id(*it);
    ++it;
  }
}
// Appends one "pushed" matcher to |msg| for every atom id yielded by |it|.
void AddPushAtoms(protozero::RepeatedFieldIterator<int32_t> it,
                  StatsdShellSubscription* msg) {
  while (it) {
    msg->add_pushed()->set_atom_id(*it);
    ++it;
  }
}
// Exec "cmd stats data-subscribe" and read/write stdin/stdout. This is
// the only way to make this work when side loading but for in tree
// builds this causes too many denials:
// avc: denied { execute_no_trans } for comm="traced_probes"
// path="/system/bin/cmd" dev="dm-0" ino=200 scontext=u:r:traced_probes:s0
// tcontext=u:object_r:system_file:s0 tclass=file permissive=1 avc: denied {
// call } for comm="cmd" scontext=u:r:traced_probes:s0 tcontext=u:r:statsd:s0
// tclass=binder permissive=1 avc: denied { use } for comm="cmd"
// path="pipe:[51149]" dev="pipefs" ino=51149 scontext=u:r:statsd:s0
// tcontext=u:r:traced_probes:s0 tclass=fd permissive=1 avc: denied { read } for
// comm="cmd" path="pipe:[51149]" dev="pipefs" ino=51149 scontext=u:r:statsd:s0
// tcontext=u:r:traced_probes:s0 tclass=fifo_file permissive=1 avc: denied {
// write } for comm="cmd" path="pipe:[51148]" dev="pipefs" ino=51148
// scontext=u:r:statsd:s0 tcontext=u:r:traced_probes:s0 tclass=fifo_file
// permissive=1 avc: denied { transfer } for comm="cmd"
// scontext=u:r:traced_probes:s0 tcontext=u:r:statsd:s0 tclass=binder
// permissive=1
class ExecStatsdBackend : public StatsdBackend {
public:
ExecStatsdBackend(std::string input, base::ScopedFile output_wr);
virtual ~ExecStatsdBackend() override;
private:
base::Subprocess subprocess_;
};
// Spawns "/system/bin/cmd stats data-subscribe", feeding it the stored input
// on stdin and routing its stdout to the stored output descriptor.
ExecStatsdBackend::ExecStatsdBackend(std::string input,
                                     base::ScopedFile output_wr)
    : StatsdBackend(std::move(input), std::move(output_wr)),
      subprocess_({"/system/bin/cmd", "stats", "data-subscribe"}) {
  auto& args = subprocess_.args;

  // stdin: the buffered request held by the base class.
  args.stdin_mode = base::Subprocess::InputMode::kBuffer;
  args.input = std::move(input_);

  // stdout goes to the caller-supplied descriptor; stderr is inherited.
  args.stdout_mode = base::Subprocess::OutputMode::kFd;
  args.out_fd = std::move(output_wr_);
  args.stderr_mode = base::Subprocess::OutputMode::kInherit;

  subprocess_.Start();

  // Have to Poll at least once so the subprocess has a chance to
  // consume the input.
  // TODO(hjd): Might not manage to push the whole stdin here in which
  // case we can be stuck here forever. We should re-posttask the Poll
  // until the whole stdin is consumed.
  subprocess_.Poll();
}
// virtual
// Defined out of line; destroying the backend destroys the owned
// base::Subprocess member along with it.
ExecStatsdBackend::~ExecStatsdBackend() = default;
// Factory for the StatsdBackend used by this file: always an
// ExecStatsdBackend, which takes ownership of |in| and |out|.
std::unique_ptr<StatsdBackend> GetStatsdBackend(std::string in,
                                                base::ScopedFile&& out) {
  std::unique_ptr<StatsdBackend> backend(
      new ExecStatsdBackend(std::move(in), std::move(out)));
  return backend;
}
} // namespace
// Takes ownership of the request bytes (|input|) and of the descriptor the
// backend should write statsd output to (|output_wr|). Nothing is started
// here; subclasses decide how to use the two members.
StatsdBackend::StatsdBackend(std::string input, base::ScopedFile output_wr)
: input_(std::move(input)), output_wr_(std::move(output_wr)) {}
// Out-of-line defaulted destructor for the backend base class.
StatsdBackend::~StatsdBackend() = default;
// Constructs a reader with a default-constructed RingBufferMessageReader
// base; the framing itself is implemented in TryReadMessage().
SizetPrefixedMessageReader::SizetPrefixedMessageReader()
: RingBufferMessageReader() {}
// Out-of-line defaulted destructor; the reader has nothing of its own to
// release.
SizetPrefixedMessageReader::~SizetPrefixedMessageReader() = default;
// Tries to parse one message framed as <size_t length><payload> out of the
// byte range [start, end).
//
// Returns a value-initialized Message when the range does not yet hold a full
// header plus payload. Otherwise returns a Message whose |start| points just
// past the header, whose |len| is the payload length and whose |field_id|
// is 0. A zero-length payload is still reported with a non-null |start|.
SizetPrefixedMessageReader::Message SizetPrefixedMessageReader::TryReadMessage(
    const uint8_t* start,
    const uint8_t* end) {
  SizetPrefixedMessageReader::Message result{};
  const size_t available_bytes = static_cast<size_t>(end - start);

  // The length prefix itself is not complete yet.
  if (available_bytes < kHeaderSize) {
    return result;
  }

  size_t sz = 0;
  static_assert(sizeof(sz) == kHeaderSize, "kHeaderSize must match size_t");
  memcpy(&sz, start, kHeaderSize);

  // The payload has not fully arrived yet. sz == 0 passes this test, which
  // is intended: empty messages are valid.
  if (kHeaderSize + sz > available_bytes) {
    return result;
  }

  // Abort if the sum above wrapped around.
  PERFETTO_CHECK(kHeaderSize + sz > sz);

  result.start = start + kHeaderSize;
  result.len = static_cast<uint32_t>(sz);
  result.field_id = 0;
  return result;
}
// static
// Registration record for this data source: it is exposed under the name
// "android.statsd", sets the kHandlesIncrementalState flag and provides no
// descriptor-filling callback.
const ProbesDataSource::Descriptor StatsdDataSource::descriptor = {
/*name*/ "android.statsd",
/*flags*/ Descriptor::kHandlesIncrementalState,
/*fill_descriptor_func*/ nullptr,
};
// Sets up everything needed to talk to statsd without contacting it yet:
//  - |task_runner| is kept for fd watches and posted tasks,
//  - |writer| is the sink for the generated trace packets,
//  - a pipe is created with the kRdNonBlock flag (presumably making the read
//    end non-blocking) to receive statsd output,
//  - the serialized shell subscription is derived from |ds_config| up front.
// The backend itself is only created in Start().
StatsdDataSource::StatsdDataSource(base::TaskRunner* task_runner,
TracingSessionID session_id,
std::unique_ptr<TraceWriter> writer,
const DataSourceConfig& ds_config)
: ProbesDataSource(session_id, &descriptor),
task_runner_(task_runner),
writer_(std::move(writer)),
output_(base::Pipe::Create(base::Pipe::Flags::kRdNonBlock)),
shell_subscription_(GenerateShellConfig(ds_config)),
weak_factory_(this) {}
// Stops watching the read end of the output pipe, provided that end is still
// a valid descriptor.
StatsdDataSource::~StatsdDataSource() {
  if (!output_.rd) {
    return;
  }
  task_runner_->RemoveFileDescriptorWatch(output_.rd.get());
}
// Connects to statsd: frames the precomputed shell subscription, hands it to
// a freshly created backend together with the write end of the output pipe,
// and starts watching the read end for incoming data. Does nothing when the
// subscription is empty.
void StatsdDataSource::Start() {
  // Don't bother actually connecting to statsd if no pull/push atoms
  // were configured:
  if (shell_subscription_.empty()) {
    PERFETTO_LOG("Empty statsd config. Not connecting to statsd.");
    return;
  }

  // The binary protocol for talking to statsd is to write 'size_t'
  // followed by a proto encoded ShellConfig. For now we assume that
  // us and statsd are the same bitness & endianness.
  // The framed request is built straight from the member; no intermediate
  // copy of the serialized config is needed.
  const size_t size = shell_subscription_.size();
  static_assert(sizeof(size) == kHeaderSize, "kHeaderSize must match size_t");
  std::string input(sizeof(size) + size, '\0');
  memcpy(&input[0], &size, sizeof(size));
  memcpy(&input[0] + sizeof(size), shell_subscription_.data(), size);

  backend_ = GetStatsdBackend(std::move(input), std::move(output_.wr));

  // Watch is removed on destruction. The weak pointer keeps the callback
  // from touching a destroyed data source.
  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->AddFileDescriptorWatch(output_.rd.get(), [weak_this] {
    if (weak_this) {
      weak_this->OnStatsdWakeup();
    }
  });
}
// Called when the statsd pipe becomes readable. The pipe has to be drained,
// but the draining is spread over several tasks so that a chatty statsd
// cannot keep us busy long enough to hit the timeout. We also never want two
// overlapping read cycles (everything is single threaded, so this is not a
// real race, but the state could still get confused). Hence:
// - The first wakeup sets read_in_progress_ and calls DoRead.
// - Wakeups that arrive while read_in_progress_ is set are ignored.
// - DoRead performs one read and then either:
//   - got no data: the cycle is over, read_in_progress_ is cleared.
//   - got some data: posts a task for another DoRead.
void StatsdDataSource::OnStatsdWakeup() {
  if (!read_in_progress_) {
    read_in_progress_ = true;
    DoRead();
  }
}
// Do a single read. If there is potentially more data to read schedule
// another DoRead.
//
// Must only run while read_in_progress_ is set. Every complete
// size_t-prefixed message that is buffered after the read is emitted as a
// TracePacket carrying a statsd_atom; zero-length messages are skipped.
// The read cycle ends (read_in_progress_ is cleared and nothing is reposted)
// on a read error, on EAGAIN and on EOF.
void StatsdDataSource::DoRead() {
  PERFETTO_CHECK(read_in_progress_);
  uint8_t data[4098];
  // Read into the on-stack scratch buffer.
  ssize_t rd = PERFETTO_EINTR(read(output_.rd.get(), &data, sizeof(data)));
  if (rd < 0) {
    if (!base::IsAgain(errno)) {
      PERFETTO_PLOG("Failed to read statsd pipe (ret: %zd)", rd);
    }
    // EAGAIN or otherwise we're done so re-enable the fd watch.
    read_in_progress_ = false;
    return;
  } else if (rd == 0) {
    // EOF so clean everything up.
    read_in_progress_ = false;
    task_runner_->RemoveFileDescriptorWatch(output_.rd.get());
    backend_.reset();
    // Stop here: nothing was read, and reposting DoRead after clearing
    // read_in_progress_ would trip the PERFETTO_CHECK at the top of the
    // next invocation.
    return;
  }
  buffer_.Append(data, static_cast<size_t>(rd));
  TraceWriter::TracePacketHandle packet;
  for (;;) {
    SizetPrefixedMessageReader::Message msg = buffer_.ReadMessage();
    // The whole packet is not available so we're done.
    if (!msg.valid()) {
      break;
    }
    // A heart beat packet with no content
    if (msg.len == 0) {
      continue;
    }
    packet = writer_->NewTracePacket();
    // This is late. It's already been >=2 IPC hops since the client
    // code actually produced the atom however we don't get any time
    // stamp from statsd/the client so this is the best we can do:
    packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count()));
    auto* atom = packet->set_statsd_atom();
    atom->AppendRawProtoBytes(msg.start, msg.len);
    packet->Finalize();
  }
  // Potentially more to read so repost:
  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_this] {
    if (weak_this) {
      weak_this->DoRead();
    }
  });
}
// static
std::string StatsdDataSource::GenerateShellConfig(
const DataSourceConfig& config) {
StatsdTracingConfig::Decoder cfg(config.statsd_tracing_config_raw());
protozero::HeapBuffered<StatsdShellSubscription> msg;
for (auto pull_it = cfg.pull_config(); pull_it; ++pull_it) {
StatsdPullAtomConfig::Decoder pull_cfg(*pull_it);
AddPullAtoms(pull_cfg, pull_cfg.raw_pull_atom_id(), msg.get());
AddPullAtoms(pull_cfg, pull_cfg.pull_atom_id(), msg.get());
}
AddPushAtoms(cfg.push_atom_id(), msg.get());
AddPushAtoms(cfg.raw_push_atom_id(), msg.get());
return msg.SerializeAsString();
}
// Returns a weak pointer to this data source, obtained from weak_factory_.
base::WeakPtr<StatsdDataSource> StatsdDataSource::GetWeakPtr() const {
return weak_factory_.GetWeakPtr();
}
// Flushes the trace writer, passing |callback| on to it. The flush request
// id is not used.
void StatsdDataSource::Flush(FlushRequestID, std::function<void()> callback) {
  // |callback| is a by-value sink parameter: hand it over instead of copying.
  writer_->Flush(std::move(callback));
}
// No-op in this implementation.
void StatsdDataSource::ClearIncrementalState() {}
} // namespace perfetto