blob: 7c09984b8452f834c98cc3b9bb0a728ccc06661b [file] [log] [blame]
/*
* Copyright (C) 2024 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <mutex>
#include <thread>
#include <grpcpp/server_context.h>
#include <grpcpp/support/status.h>
#include "perfetto/base/time.h"
#include "perfetto/ext/trace_processor/rpc/query_result_serializer.h"
#include "perfetto/trace_processor/trace_processor.h"
#include "src/bigtrace/worker/worker_impl.h"
namespace perfetto::bigtrace {
grpc::Status WorkerImpl::QueryTrace(
grpc::ServerContext* server_context,
const protos::BigtraceQueryTraceArgs* args,
protos::BigtraceQueryTraceResponse* response) {
std::mutex mutex;
bool is_thread_done = false;
std::string args_trace = args->trace();
if (args_trace.empty()) {
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
"Empty trace name is not valid");
}
if (args_trace[0] != '/') {
return grpc::Status(
grpc::StatusCode::INVALID_ARGUMENT,
"Trace path must contain and begin with / for the prefix");
}
std::string prefix = args_trace.substr(0, args_trace.find("/", 1));
if (registry_.find(prefix) == registry_.end()) {
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
"Path prefix does not exist in registry");
}
if (prefix.length() == args_trace.length()) {
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
"Empty path is invalid");
}
std::string path = args_trace.substr(prefix.length() + 1);
base::StatusOr<std::unique_ptr<trace_processor::TraceProcessor>> tp_or =
registry_[prefix]->LoadTraceProcessor(path);
if (!tp_or.ok()) {
const std::string& error_message = tp_or.status().message();
return grpc::Status(grpc::StatusCode::INTERNAL, error_message);
}
std::unique_ptr<trace_processor::TraceProcessor> tp = std::move(*tp_or);
std::optional<trace_processor::Iterator> iterator;
std::thread execute_query_thread([&]() {
iterator = tp->ExecuteQuery(args->sql_query());
std::lock_guard<std::mutex> lk(mutex);
is_thread_done = true;
});
for (;;) {
if (server_context->IsCancelled()) {
// If the thread is cancelled, we need to propagate the information to the
// trace processor thread and we do this by attempting to interrupt the
// trace processor every 10ms until the trace processor thread returns.
//
// A loop is necessary here because, due to scheduling delay, it is
// possible we are cancelled before trace processor even started running.
// InterruptQuery is ignored if it happens before entering TraceProcessor
// which can cause the query to not be interrupted at all.
while (!execute_query_thread.joinable()) {
base::SleepMicroseconds(10000);
tp->InterruptQuery();
}
execute_query_thread.join();
return grpc::Status::CANCELLED;
}
std::lock_guard<std::mutex> lk(mutex);
if (is_thread_done) {
execute_query_thread.join();
trace_processor::QueryResultSerializer serializer =
trace_processor::QueryResultSerializer(*std::move(iterator));
std::vector<uint8_t> serialized;
for (bool has_more = true; has_more;) {
serialized.clear();
has_more = serializer.Serialize(&serialized);
response->add_result()->ParseFromArray(
serialized.data(), static_cast<int>(serialized.size()));
}
response->set_trace(args->trace());
return grpc::Status::OK;
}
}
}
} // namespace perfetto::bigtrace