Merge "tp: add support for gzipped traces in DecompressTrace"
diff --git a/src/trace_processor/BUILD.gn b/src/trace_processor/BUILD.gn
index 671557a..5559f30 100644
--- a/src/trace_processor/BUILD.gn
+++ b/src/trace_processor/BUILD.gn
@@ -254,6 +254,7 @@
"storage",
"tables",
"types",
+ "util",
]
if (enable_perfetto_trace_processor_json) {
deps += [ "../../gn:jsoncpp" ]
@@ -312,6 +313,7 @@
"../../gn:default_deps",
"../../protos/perfetto/trace/ftrace:zero",
"../base",
+ "../protozero",
"analysis",
"db:lib",
"importers:common",
diff --git a/src/trace_processor/importers/gzip/gzip_trace_parser.cc b/src/trace_processor/importers/gzip/gzip_trace_parser.cc
index 89e8c1d..4a04074 100644
--- a/src/trace_processor/importers/gzip/gzip_trace_parser.cc
+++ b/src/trace_processor/importers/gzip/gzip_trace_parser.cc
@@ -22,10 +22,17 @@
#include "perfetto/ext/base/string_utils.h"
#include "perfetto/ext/base/string_view.h"
#include "src/trace_processor/forwarding_trace_parser.h"
+#include "src/trace_processor/util/status_macros.h"
namespace perfetto {
namespace trace_processor {
+namespace {
+
+// File-scope alias for the decompressor's result codes. It lives here rather
+// than inside a single function so that every function in this file can use
+// the short name.
+using ResultCode = GzipDecompressor::ResultCode;
+
+} // namespace
+
GzipTraceParser::GzipTraceParser(TraceProcessorContext* context)
: context_(context) {}
@@ -50,27 +57,37 @@
len -= strlen(kSystraceFileHeader) + offset;
}
}
- decompressor_.SetInput(start, len);
+
+ bool ignored = false;
+ return Parse(start, len, &decompressor_, inner_.get(), &ignored);
+}
+
+// Hands |size| bytes at |data| to |decompressor| and then decompresses in a
+// loop, forwarding each decompressed buffer to |reader|, until the
+// decompressor reports kEof.
+//
+// |needs_more_input| is reset to false on entry and set to true only when the
+// decompressor reports kNeedsMoreInput; in that case the function returns an
+// OK status and leaves it to the caller to decide whether that is an error.
+// A kError or kNoProgress result yields an error status, as does a failing
+// reader->Parse() call.
+util::Status GzipTraceParser::Parse(const uint8_t* data,
+ size_t size,
+ GzipDecompressor* decompressor,
+ ChunkedTraceReader* reader,
+ bool* needs_more_input) {
+ *needs_more_input = false;
+ decompressor->SetInput(data, size);
// Our default uncompressed buffer size is 32MB as it allows for good
// throughput.
- using ResultCode = GzipDecompressor::ResultCode;
constexpr size_t kUncompressedBufferSize = 32 * 1024 * 1024;
for (auto ret = ResultCode::kOk; ret != ResultCode::kEof;) {
std::unique_ptr<uint8_t[]> buffer(new uint8_t[kUncompressedBufferSize]);
auto result =
- decompressor_.Decompress(buffer.get(), kUncompressedBufferSize);
+ decompressor->Decompress(buffer.get(), kUncompressedBufferSize);
ret = result.ret;
if (ret == ResultCode::kError || ret == ResultCode::kNoProgress)
- return util::ErrStatus("Unable to decompress gzip/ctrace trace");
- if (ret == ResultCode::kNeedsMoreInput)
- break;
+ return util::ErrStatus("Failed to decompress trace chunk");
- util::Status status =
- inner_->Parse(std::move(buffer), result.bytes_written);
- if (!status.ok())
- return status;
+ // Not a failure at this level: flag it and return OK so that the caller
+ // can choose between supplying more data and rejecting the input.
+ if (ret == ResultCode::kNeedsMoreInput) {
+ *needs_more_input = true;
+ return util::OkStatus();
+ }
+
+ // Ownership of the buffer moves to the reader; a fresh one is allocated on
+ // the next iteration.
+ RETURN_IF_ERROR(reader->Parse(std::move(buffer), result.bytes_written));
}
return util::OkStatus();
}
diff --git a/src/trace_processor/importers/gzip/gzip_trace_parser.h b/src/trace_processor/importers/gzip/gzip_trace_parser.h
index c83416e..16a3679 100644
--- a/src/trace_processor/importers/gzip/gzip_trace_parser.h
+++ b/src/trace_processor/importers/gzip/gzip_trace_parser.h
@@ -34,6 +34,12 @@
util::Status Parse(std::unique_ptr<uint8_t[]>, size_t) override;
void NotifyEndOfFile() override;
+ // Stateless variant of Parse() which works on caller-provided objects:
+ // feeds the given bytes to the GzipDecompressor and forwards every
+ // decompressed buffer to the ChunkedTraceReader.
+ // |needs_more_input| is always written: true if the decompressor asked for
+ // more input before reaching the end of the stream (the returned status is
+ // still OK in that case), false otherwise.
+ static util::Status Parse(const uint8_t*,
+ size_t,
+ GzipDecompressor*,
+ ChunkedTraceReader*,
+ bool* needs_more_input);
+
private:
TraceProcessorContext* const context_;
GzipDecompressor decompressor_;
diff --git a/src/trace_processor/importers/proto/proto_trace_tokenizer.cc b/src/trace_processor/importers/proto/proto_trace_tokenizer.cc
index 874fb91..fdc7bcd 100644
--- a/src/trace_processor/importers/proto/proto_trace_tokenizer.cc
+++ b/src/trace_processor/importers/proto/proto_trace_tokenizer.cc
@@ -21,6 +21,8 @@
namespace perfetto {
namespace trace_processor {
+// Out-of-line definition of the default constructor declared in
+// proto_trace_tokenizer.h.
+ProtoTraceTokenizer::ProtoTraceTokenizer() = default;
+
util::Status ProtoTraceTokenizer::Decompress(TraceBlobView input,
TraceBlobView* output) {
PERFETTO_DCHECK(gzip::IsGzipSupported());
diff --git a/src/trace_processor/importers/proto/proto_trace_tokenizer.h b/src/trace_processor/importers/proto/proto_trace_tokenizer.h
index 5039e48..251043b 100644
--- a/src/trace_processor/importers/proto/proto_trace_tokenizer.h
+++ b/src/trace_processor/importers/proto/proto_trace_tokenizer.h
@@ -35,6 +35,8 @@
// (or subfields, for the case of ftrace) with their timestamps.
class ProtoTraceTokenizer {
public:
+ ProtoTraceTokenizer();
+
template <typename Callback = util::Status(TraceBlobView)>
util::Status Tokenize(std::unique_ptr<uint8_t[]> owned_buf,
size_t size,
diff --git a/src/trace_processor/read_trace.cc b/src/trace_processor/read_trace.cc
index c815a60..3ecc405 100644
--- a/src/trace_processor/read_trace.cc
+++ b/src/trace_processor/read_trace.cc
@@ -19,9 +19,13 @@
#include "perfetto/base/logging.h"
#include "perfetto/ext/base/scoped_file.h"
#include "perfetto/ext/base/utils.h"
+#include "perfetto/protozero/proto_utils.h"
#include "perfetto/trace_processor/trace_processor.h"
+#include "src/trace_processor/forwarding_trace_parser.h"
+#include "src/trace_processor/importers/gzip/gzip_trace_parser.h"
#include "src/trace_processor/importers/gzip/gzip_utils.h"
+#include "src/trace_processor/importers/proto/proto_trace_tokenizer.h"
#include "src/trace_processor/util/status_macros.h"
#include "protos/perfetto/trace/trace.pbzero.h"
@@ -72,6 +76,37 @@
return util::OkStatus();
}
+// ChunkedTraceReader which runs the proto tokenizer over the bytes it is given
+// and appends every blob the tokenizer hands back to |output|, framed as a
+// length-delimited Trace.packet field: tag varint, length varint, payload.
+class SerializingProtoTraceReader : public ChunkedTraceReader {
+ public:
+  // |output| is not owned; only the pointer is stored and Parse() appends to
+  // the vector it points to, so it must outlive this reader.
+  // The constructor is explicit so that a bare vector pointer can never be
+  // converted into a reader implicitly.
+  explicit SerializingProtoTraceReader(std::vector<uint8_t>* output)
+      : output_(output) {}
+
+  // Tokenizes |size| bytes of |data| and serializes each resulting packet.
+  // Returns the status produced by Tokenize(); the per-packet callback itself
+  // always reports success.
+  util::Status Parse(std::unique_ptr<uint8_t[]> data, size_t size) override {
+    return tokenizer_.Tokenize(
+        std::move(data), size, [this](TraceBlobView packet) {
+          // Scratch space for the field preamble (tag + length varints).
+          uint8_t buffer[protozero::proto_utils::kMaxSimpleFieldEncodedSize];
+
+          uint8_t* pos = buffer;
+          pos = protozero::proto_utils::WriteVarInt(kTracePacketTag, pos);
+          pos = protozero::proto_utils::WriteVarInt(packet.length(), pos);
+          output_->insert(output_->end(), buffer, pos);
+
+          // The payload follows the preamble unchanged.
+          output_->insert(output_->end(), packet.data(),
+                          packet.data() + packet.length());
+          return util::OkStatus();
+        });
+  }
+
+  // Intentionally a no-op.
+  void NotifyEndOfFile() override {}
+
+ private:
+  // Preamble tag for the repeated |packet| field of the Trace message.
+  static constexpr uint8_t kTracePacketTag =
+      protozero::proto_utils::MakeTagLengthDelimited(
+          protos::pbzero::Trace::kPacketFieldNumber);
+
+  ProtoTraceTokenizer tokenizer_;
+  std::vector<uint8_t>* output_;
+};
+
} // namespace
util::Status ReadTrace(
@@ -152,11 +187,28 @@
util::Status DecompressTrace(const uint8_t* data,
size_t size,
std::vector<uint8_t>* output) {
- if (!gzip::IsGzipSupported()) {
+ TraceType type = GuessTraceType(data, size);
+ if (type != TraceType::kGzipTraceType && type != TraceType::kProtoTraceType) {
return util::ErrStatus(
- "Cannot decompress trace in build where zlib is disabled");
+ "Only GZIP and proto trace types are supported by DecompressTrace");
}
+ if (type == TraceType::kGzipTraceType) {
+ GzipDecompressor decompressor;
+ SerializingProtoTraceReader reader(output);
+
+ bool needs_more_input = false;
+ RETURN_IF_ERROR(GzipTraceParser::Parse(data, size, &decompressor, &reader,
+ &needs_more_input));
+
+ if (needs_more_input)
+ return util::ErrStatus("Cannot decompress partial trace file");
+
+ return util::OkStatus();
+ }
+
+ PERFETTO_CHECK(type == TraceType::kProtoTraceType);
+
protos::pbzero::Trace::Decoder decoder(data, size);
GzipDecompressor decompressor;
if (size > 0 && !decoder.packet()) {
diff --git a/src/trace_processor/read_trace_integrationtest.cc b/src/trace_processor/read_trace_integrationtest.cc
index 43ea1c7..c4218af 100644
--- a/src/trace_processor/read_trace_integrationtest.cc
+++ b/src/trace_processor/read_trace_integrationtest.cc
@@ -28,10 +28,12 @@
namespace trace_processor {
namespace {
-TEST(ReadTraceIntegrationTest, CompressedTrace) {
- base::ScopedFstream f(fopen(
- base::GetTestDataPath(std::string("test/data/compressed.pb")).c_str(),
- "rb"));
+// Opens the test data file at |path| (resolved through
+// base::GetTestDataPath) for binary reading and wraps the handle in a
+// ScopedFstream. The result of fopen() is passed through unchecked.
+base::ScopedFstream OpenTestTrace(const std::string& path) {
+  const std::string resolved = base::GetTestDataPath(path);
+  FILE* handle = fopen(resolved.c_str(), "rb");
+  return base::ScopedFstream(handle);
+}
+
+std::vector<uint8_t> ReadAllData(const base::ScopedFstream& f) {
std::vector<uint8_t> raw_trace;
while (!feof(*f)) {
uint8_t buf[4096];
@@ -39,6 +41,12 @@
fread(reinterpret_cast<char*>(buf), 1, base::ArraySize(buf), *f);
raw_trace.insert(raw_trace.end(), buf, buf + rsize);
}
+ return raw_trace;
+}
+
+TEST(ReadTraceIntegrationTest, CompressedTrace) {
+ base::ScopedFstream f = OpenTestTrace("test/data/compressed.pb");
+ std::vector<uint8_t> raw_trace = ReadAllData(f);
std::vector<uint8_t> decompressed;
decompressed.reserve(raw_trace.size());
@@ -59,17 +67,8 @@
}
TEST(ReadTraceIntegrationTest, NonProtobufShouldNotDecompress) {
- base::ScopedFstream f(
- fopen(base::GetTestDataPath(std::string("test/data/unsorted_trace.json"))
- .c_str(),
- "rb"));
- std::vector<uint8_t> raw_trace;
- while (!feof(*f)) {
- uint8_t buf[4096];
- auto rsize =
- fread(reinterpret_cast<char*>(buf), 1, base::ArraySize(buf), *f);
- raw_trace.insert(raw_trace.end(), buf, buf + rsize);
- }
+ base::ScopedFstream f = OpenTestTrace("test/data/unsorted_trace.json");
+ std::vector<uint8_t> raw_trace = ReadAllData(f);
std::vector<uint8_t> decompressed;
util::Status status = trace_processor::DecompressTrace(
@@ -77,6 +76,44 @@
ASSERT_FALSE(status.ok());
}
+// Decompressing the gzipped example trace must succeed and yield exactly the
+// bytes of the .pb test file with the same base name.
+TEST(ReadTraceIntegrationTest, OuterGzipDecompressTrace) {
+  base::ScopedFstream gz_file =
+      OpenTestTrace("test/data/example_android_trace_30s.pb.gz");
+  const std::vector<uint8_t> gz_bytes = ReadAllData(gz_file);
+
+  std::vector<uint8_t> decompressed;
+  util::Status result = trace_processor::DecompressTrace(
+      gz_bytes.data(), gz_bytes.size(), &decompressed);
+  ASSERT_TRUE(result.ok());
+
+  base::ScopedFstream plain_file =
+      OpenTestTrace("test/data/example_android_trace_30s.pb");
+  const std::vector<uint8_t> expected = ReadAllData(plain_file);
+
+  // The size is compared first, before the byte-for-byte comparison.
+  ASSERT_EQ(decompressed.size(), expected.size());
+  ASSERT_EQ(decompressed, expected);
+}
+
+// Decompressing compressed.pb.gz must succeed and produce a trace of 2412
+// packets, none of which still has a compressed_packets field.
+TEST(ReadTraceIntegrationTest, DoubleGzipDecompressTrace) {
+  base::ScopedFstream gz_file = OpenTestTrace("test/data/compressed.pb.gz");
+  const std::vector<uint8_t> gz_bytes = ReadAllData(gz_file);
+
+  std::vector<uint8_t> inflated;
+  util::Status result = trace_processor::DecompressTrace(
+      gz_bytes.data(), gz_bytes.size(), &inflated);
+  ASSERT_TRUE(result.ok());
+
+  protos::pbzero::Trace::Decoder trace(inflated.data(), inflated.size());
+  uint32_t num_packets = 0;
+  auto packet_it = trace.packet();
+  while (packet_it) {
+    protos::pbzero::TracePacket::Decoder packet(*packet_it);
+    ASSERT_FALSE(packet.has_compressed_packets());
+    ++num_packets;
+    ++packet_it;
+  }
+  ASSERT_EQ(num_packets, 2412u);
+}
+
} // namespace
} // namespace trace_processor
} // namespace perfetto
diff --git a/tools/install-build-deps b/tools/install-build-deps
index 86d116c..e167c82 100755
--- a/tools/install-build-deps
+++ b/tools/install-build-deps
@@ -166,8 +166,8 @@
# Example traces for regression tests.
Dependency(
'buildtools/test_data.zip',
- 'https://storage.googleapis.com/perfetto/test-data-20200811-161047.zip',
- 'ffe5b473a3edfb94d1fa04610e86171d4cdc35de',
+ 'https://storage.googleapis.com/perfetto/test-data-20200910-141649.zip',
+ '8439e07570cbae4114f6495c68c4490c207bff58',
'all',
),