| /* |
| * Copyright (C) 2020 The Android Open Source Project |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <math.h> |
| #include <stdint.h> |
| |
| #include <algorithm> |
| #include <atomic> |
| #include <chrono> |
| #include <list> |
| #include <random> |
| #include <thread> |
| |
| #include "perfetto/base/time.h" |
| #include "perfetto/ext/base/file_utils.h" |
| #include "perfetto/ext/base/string_utils.h" |
| #include "perfetto/tracing.h" |
| |
| #include "protos/perfetto/config/stress_test_config.gen.h" |
| #include "protos/perfetto/trace/test_event.pbzero.h" |
| |
| using StressTestConfig = perfetto::protos::gen::StressTestConfig; |
| |
| namespace perfetto { |
| namespace { |
| |
| StressTestConfig* g_cfg; |
| |
| class StressTestDataSource : public DataSource<StressTestDataSource> { |
| public: |
| constexpr static BufferExhaustedPolicy kBufferExhaustedPolicy = |
| BufferExhaustedPolicy::kStall; |
| |
| void OnSetup(const SetupArgs& args) override; |
| void OnStart(const StartArgs&) override; |
| void OnStop(const StopArgs&) override; |
| |
| private: |
| class Worker { |
| public: |
| explicit Worker(uint32_t id) : id_(id) {} |
| void Start(); |
| void Stop(); |
| ~Worker() { Stop(); } |
| |
| private: |
| void WorkerMain(uint32_t worker_id); |
| void FillPayload(const StressTestConfig::WriterTiming&, |
| uint32_t seq, |
| uint32_t nesting, |
| protos::pbzero::TestEvent::TestPayload*); |
| |
| const uint32_t id_; |
| std::thread thread_; |
| std::atomic<bool> quit_; |
| std::minstd_rand0 rnd_seq_; |
| |
| // Use a different engine for the generation of random value, keep rnd_seq_ |
| // dedicated to generating deterministic sequences. |
| std::minstd_rand0 rnd_gen_; |
| }; |
| |
| std::list<Worker> workers_; |
| }; |
| |
| // Called before the tracing session starts. |
| void StressTestDataSource::OnSetup(const SetupArgs&) { |
| for (uint32_t i = 0; i < std::max(g_cfg->num_threads(), 1u); ++i) |
| workers_.emplace_back(i); |
| } |
| |
| // Called when the tracing session starts. |
| void StressTestDataSource::OnStart(const StartArgs&) { |
| for (auto& worker : workers_) |
| worker.Start(); |
| } |
| |
| // Called when the tracing session ends. |
| void StressTestDataSource::OnStop(const StopArgs&) { |
| for (auto& worker : workers_) |
| worker.Stop(); |
| workers_.clear(); |
| } |
| |
| void StressTestDataSource::Worker::Start() { |
| quit_.store(false); |
| thread_ = std::thread(&StressTestDataSource::Worker::WorkerMain, this, id_); |
| } |
| |
| void StressTestDataSource::Worker::Stop() { |
| if (!thread_.joinable() || quit_) |
| return; |
| PERFETTO_DLOG("Stopping worker %u", id_); |
| quit_.store(true); |
| thread_.join(); |
| } |
| |
| void StressTestDataSource::Worker::WorkerMain(uint32_t worker_id) { |
| PERFETTO_DLOG("Worker %u starting", worker_id); |
| rnd_seq_ = std::minstd_rand0(0); |
| int64_t t_start = base::GetBootTimeNs().count(); |
| int64_t num_msgs = 0; |
| |
| const int64_t max_msgs = g_cfg->max_events() |
| ? static_cast<int64_t>(g_cfg->max_events()) |
| : INT64_MAX; |
| bool is_last = false; |
| while (!is_last) { |
| is_last = quit_ || ++num_msgs >= max_msgs; |
| |
| const int64_t now = base::GetBootTimeNs().count(); |
| const auto elapsed_ms = static_cast<uint64_t>((now - t_start) / 1000000); |
| |
| const auto* timings = &g_cfg->steady_state_timings(); |
| if (g_cfg->burst_period_ms() && |
| elapsed_ms % g_cfg->burst_period_ms() > |
| (g_cfg->burst_period_ms() - g_cfg->burst_duration_ms())) { |
| timings = &g_cfg->burst_timings(); |
| } |
| std::normal_distribution<> rate_dist{timings->rate_mean(), |
| timings->rate_stddev()}; |
| |
| double period_ns = 1e9 / rate_dist(rnd_gen_); |
| period_ns = isnan(period_ns) || period_ns == 0.0 ? 1 : period_ns; |
| double expected_msgs = static_cast<double>(now - t_start) / period_ns; |
| int64_t delay_ns = 0; |
| if (static_cast<int64_t>(expected_msgs) < num_msgs) |
| delay_ns = static_cast<int64_t>(period_ns); |
| std::this_thread::sleep_for( |
| std::chrono::nanoseconds(static_cast<int64_t>(delay_ns))); |
| |
| StressTestDataSource::Trace([&](StressTestDataSource::TraceContext ctx) { |
| const uint32_t seq = static_cast<uint32_t>(rnd_seq_()); |
| auto packet = ctx.NewTracePacket(); |
| packet->set_timestamp(static_cast<uint64_t>(now)); |
| auto* test_event = packet->set_for_testing(); |
| test_event->set_seq_value(seq); |
| test_event->set_counter(static_cast<uint64_t>(num_msgs)); |
| if (is_last) |
| test_event->set_is_last(true); |
| |
| FillPayload(*timings, seq, g_cfg->nesting(), test_event->set_payload()); |
| }); // Trace(). |
| |
| } // while (!quit) |
| PERFETTO_DLOG("Worker done"); |
| } |
| |
| void StressTestDataSource::Worker::FillPayload( |
| const StressTestConfig::WriterTiming& timings, |
| uint32_t seq, |
| uint32_t nesting, |
| protos::pbzero::TestEvent::TestPayload* payload) { |
| // Write the payload in two halves, optionally with some delay in the |
| // middle. |
| std::normal_distribution<> msg_size_dist{timings.payload_mean(), |
| timings.payload_stddev()}; |
| auto payload_size = |
| static_cast<uint32_t>(std::max(std::round(msg_size_dist(rnd_gen_)), 0.0)); |
| std::string buf; |
| buf.resize(payload_size / 2); |
| for (size_t i = 0; i < buf.size(); ++i) { |
| buf[i] = static_cast<char>(33 + ((seq + i) % 64)); // Stay ASCII. |
| } |
| payload->add_str(buf); |
| payload->set_remaining_nesting_depth(nesting); |
| if (timings.payload_write_time_ms() > 0) { |
| std::this_thread::sleep_for( |
| std::chrono::milliseconds(timings.payload_write_time_ms())); |
| } |
| |
| if (nesting > 0) |
| FillPayload(timings, seq, nesting - 1, payload->add_nested()); |
| |
| payload->add_str(buf); |
| } |
| } // namespace |
| |
| PERFETTO_DECLARE_DATA_SOURCE_STATIC_MEMBERS(StressTestDataSource); |
| PERFETTO_DEFINE_DATA_SOURCE_STATIC_MEMBERS(StressTestDataSource); |
| |
| } // namespace perfetto |
| |
| int main() { |
| perfetto::TracingInitArgs args; |
| args.backends = perfetto::kSystemBackend; |
| |
| std::string config_blob; |
| if (isatty(fileno(stdin))) |
| PERFETTO_LOG("Reading StressTestConfig proto from stdin"); |
| perfetto::base::ReadFileStream(stdin, &config_blob); |
| |
| StressTestConfig cfg; |
| perfetto::g_cfg = &cfg; |
| if (config_blob.empty() || !cfg.ParseFromString(config_blob)) |
| PERFETTO_FATAL("A StressTestConfig blob must be passed into stdin"); |
| |
| if (cfg.shmem_page_size_kb()) |
| args.shmem_page_size_hint_kb = cfg.shmem_page_size_kb(); |
| if (cfg.shmem_size_kb()) |
| args.shmem_page_size_hint_kb = cfg.shmem_size_kb(); |
| |
| perfetto::Tracing::Initialize(args); |
| perfetto::DataSourceDescriptor dsd; |
| dsd.set_name("perfetto.stress_test"); |
| perfetto::StressTestDataSource::Register(dsd); |
| |
| for (;;) { |
| std::this_thread::sleep_for(std::chrono::seconds(30)); |
| } |
| } |