tp: refine handling of default vendor configs
Instead of having a override function, expect a default config to be
provided: this better aligns with the intended usage of this API.
Change-Id: I22edbe6f4fb03219d8ea2fe55666dc9d6042778d
diff --git a/bazel/standalone/perfetto_cfg.bzl b/bazel/standalone/perfetto_cfg.bzl
index 5725041..c9c89f1 100644
--- a/bazel/standalone/perfetto_cfg.bzl
+++ b/bazel/standalone/perfetto_cfg.bzl
@@ -53,12 +53,9 @@
# The Python targets are empty on the standalone build because we assume
# any relevant deps are installed on the system or are not applicable.
- tp_init_py = [],
- gfile_py = [],
protobuf_py = [],
- protobuf_descriptor_pb2_py = [],
- pyglib_py = [],
pandas_py = [],
+ tp_vendor_py = [],
),
# This struct allows embedders to customize the cc_opts for Perfetto
diff --git a/python/BUILD b/python/BUILD
index f330ba8..39a0a7e 100644
--- a/python/BUILD
+++ b/python/BUILD
@@ -38,11 +38,10 @@
"perfetto/trace_processor/trace_processor.descriptor",
"perfetto/trace_processor/metrics.descriptor",
PERFETTO_CONFIG.root + ":trace_processor_shell",
- ] + PERFETTO_CONFIG.deps.tp_init_py,
- deps = PERFETTO_CONFIG.deps.gfile_py +
+ ],
+ deps = PERFETTO_CONFIG.deps.tp_vendor_py +
PERFETTO_CONFIG.deps.protobuf_py +
- PERFETTO_CONFIG.deps.protobuf_descriptor_pb2_py +
- PERFETTO_CONFIG.deps.pyglib_py,
+ PERFETTO_CONFIG.deps.pandas_py,
imports = [
".",
],
@@ -79,7 +78,8 @@
]),
deps = [
":trace_processor_py",
- ] + PERFETTO_CONFIG.deps.pandas_py,
+ ] + PERFETTO_CONFIG.deps.pandas_py +
+ PERFETTO_CONFIG.deps.tp_vendor_py,
imports = [
".",
],
diff --git a/python/perfetto/batch_trace_processor/api.py b/python/perfetto/batch_trace_processor/api.py
index caf9f74..b320090 100644
--- a/python/perfetto/batch_trace_processor/api.py
+++ b/python/perfetto/batch_trace_processor/api.py
@@ -14,18 +14,25 @@
# limitations under the License.
"""Contains classes for BatchTraceProcessor API."""
-from concurrent.futures.thread import ThreadPoolExecutor
+import concurrent.futures as cf
import dataclasses as dc
import multiprocessing
-from typing import Any, Callable, Dict, Optional, Tuple, Union, List
-from numpy.lib.npyio import load
+from typing import Any, Callable, Dict, Tuple, Union, List
import pandas as pd
-from perfetto.trace_processor import LoadableTrace
-from perfetto.trace_processor import TraceProcessor
-from perfetto.trace_processor import TraceProcessorException
-from perfetto.trace_processor import TraceProcessorConfig
+from perfetto.trace_processor.api import LoadableTrace
+from perfetto.trace_processor.api import TraceProcessor
+from perfetto.trace_processor.api import TraceProcessorException
+from perfetto.trace_processor.api import TraceProcessorConfig
+from perfetto.batch_trace_processor.platform import PlatformDelegate
+
+# Defining this field as a module variable means this can be changed by
+# implementations at startup and used for all BatchTraceProcessor objects
+# without having to specify on each one.
+# In Google3, this field is rewritten using Copybara to a implementation
+# which can integrates with internal infra.
+PLATFORM_DELEGATE = PlatformDelegate
@dc.dataclass
@@ -36,45 +43,11 @@
@dc.dataclass
class BatchTraceProcessorConfig:
- TraceProvider = Callable[[str], List[
- Union[LoadableTrace, BatchLoadableTrace]]]
-
tp_config: TraceProcessorConfig
- query_executor: Optional[ThreadPoolExecutor]
- load_executor: Optional[ThreadPoolExecutor]
-
- trace_provider: TraceProvider
-
- def __default_trace_provider(custom_string: str):
- del custom_string
- raise TraceProcessorException(
- 'Passed a string to batch trace processor constructor without '
- 'a trace provider being registered.')
-
- def __init__(self,
- tp_config: TraceProcessorConfig = TraceProcessorConfig(),
- query_executor: Optional[ThreadPoolExecutor] = None,
- load_executor: Optional[ThreadPoolExecutor] = None,
- trace_provider: TraceProvider = __default_trace_provider):
+ def __init__(self, tp_config: TraceProcessorConfig = TraceProcessorConfig()):
self.tp_config = tp_config
- self.query_executor = query_executor
- self.load_executor = load_executor
-
- self.trace_provider = trace_provider
-
- try:
- # This is the only place in batch trace processor which should import
- # from a "vendor" namespace - the purpose of this code is to allow
- # for users to set their own "default" config for batch trace processor
- # without needing to specify the config in every place when batch
- # trace processor is used.
- from .vendor import override_batch_tp_config
- override_batch_tp_config(self)
- except ModuleNotFoundError:
- pass
-
class BatchTraceProcessor:
"""Run ad-hoc SQL queries across many Perfetto traces.
@@ -96,21 +69,13 @@
Python across many traces.
Args:
- traces: Either a list of traces or a custom string which will be
- converted to a list of traces.
-
- If a list, each item can be one of the following types:
+ traces: A list of traces where each item is one of the following types:
1) path to a trace file to open and read
2) a file like object (file, io.BytesIO or similar) to read
3) a generator yielding bytes
4) a BatchLoadableTrace object; this is basically a wrapper around
one of the above types plus an args field; see |query_and_flatten|
for the motivation for the args field.
-
- If a string, it is passed to BatchTraceProcessorConfig.trace_provider to
- convert to a list of traces; the default implementation of this
- function just throws an exception so an implementation must be provided
- if strings will be passed.
config: configuration options which customize functionality of batch
trace processor and underlying trace processors.
"""
@@ -121,27 +86,27 @@
return x
return BatchLoadableTrace(trace=x, args={})
- def create_tp(trace: BatchLoadableTrace) -> TraceProcessor:
+ def _create_tp(trace: BatchLoadableTrace) -> TraceProcessor:
return TraceProcessor(trace=trace.trace, config=config.tp_config)
- if isinstance(traces, str):
- trace_list = config.trace_provider(traces)
- else:
- trace_list = traces
+ batch_traces = [_create_batch_trace(t) for t in traces]
+ trace_count = len(batch_traces)
- batch_traces = [_create_batch_trace(t) for t in trace_list]
+ self.platform_delegate = PLATFORM_DELEGATE()
# As trace processor is completely CPU bound, it makes sense to just
# max out the CPUs available.
- query_executor = config.query_executor or ThreadPoolExecutor(
- max_workers=multiprocessing.cpu_count())
- load_exectuor = config.load_executor or query_executor
+ query_executor = self.platform_delegate.create_query_executor(
+ trace_count) or cf.ThreadPoolExecutor(
+ max_workers=multiprocessing.cpu_count())
+ load_exectuor = self.platform_delegate.create_load_executor(
+ trace_count) or query_executor
self.tps = None
self.closed = False
self.query_executor = query_executor
self.args = [t.args for t in batch_traces]
- self.tps = list(load_exectuor.map(create_tp, batch_traces))
+ self.tps = list(load_exectuor.map(_create_tp, batch_traces))
def metric(self, metrics: List[str]):
"""Computes the provided metrics.
diff --git a/python/perfetto/batch_trace_processor/platform.py b/python/perfetto/batch_trace_processor/platform.py
new file mode 100644
index 0000000..63f1d6c
--- /dev/null
+++ b/python/perfetto/batch_trace_processor/platform.py
@@ -0,0 +1,27 @@
+# Copyright (C) 2022 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import concurrent.futures as cf
+from typing import Optional
+
+
+class PlatformDelegate:
+ """Abstracts operations which can vary based on platform."""
+
+ def create_query_executor(trace_count: int
+ ) -> Optional[cf.ThreadPoolExecutor]:
+ return None
+
+ def create_load_executor(trace_count: int) -> Optional[cf.ThreadPoolExecutor]:
+ return None
diff --git a/python/perfetto/trace_processor/api.py b/python/perfetto/trace_processor/api.py
index 3f24a13..45bcde0 100644
--- a/python/perfetto/trace_processor/api.py
+++ b/python/perfetto/trace_processor/api.py
@@ -13,19 +13,24 @@
# limitations under the License.
import dataclasses as dc
-from enum import unique
from urllib.parse import urlparse
-from typing import BinaryIO, Callable, Generator, List, Optional, Tuple, Union
+from typing import BinaryIO, Generator, List, Optional, Union
from perfetto.trace_processor.http import TraceProcessorHttp
-from perfetto.trace_processor.loader import get_loader
+from perfetto.trace_processor.platform import PlatformDelegate
from perfetto.trace_processor.protos import ProtoFactory
from perfetto.trace_processor.shell import load_shell
+# Defining this field as a module variable means this can be changed by
+# implementations at startup and used for all TraceProcessor objects
+# without having to specify on each one.
+# In Google3, this field is rewritten using Copybara to a implementation
+# which can integrates with internal infra.
+PLATFORM_DELEGATE = PlatformDelegate
+
# Union of types supported for a trace which can be loaded by shell.
LoadableTrace = Union[None, str, BinaryIO, Generator[bytes, None, None]]
-
# Custom exception raised if any trace_processor functions return a
# response with an error defined
class TraceProcessorException(Exception):
@@ -40,46 +45,14 @@
unique_port: bool
verbose: bool
- read_tp_descriptor: Callable[[], bytes]
- read_metrics_descriptor: Callable[[], bytes]
- parse_file: Callable[[TraceProcessorHttp, str], TraceProcessorHttp]
- get_shell_path: Callable[[str], None]
- get_free_port: Callable[[bool], Tuple[str, str]]
-
- def __init__(
- self,
- bin_path: Optional[str] = None,
- unique_port: bool = True,
- verbose: bool = False,
- read_tp_descriptor: Callable[[], bytes] = get_loader().read_tp_descriptor,
- read_metrics_descriptor: Callable[[], bytes] = get_loader(
- ).read_metrics_descriptor,
- parse_file: Callable[[TraceProcessorHttp, str],
- TraceProcessorHttp] = get_loader().parse_file,
- get_shell_path: Callable[[str], None] = get_loader().get_shell_path,
- get_free_port: Callable[[bool], Tuple[str, str]] = get_loader(
- ).get_free_port):
+ def __init__(self,
+ bin_path: Optional[str] = None,
+ unique_port: bool = True,
+ verbose: bool = False):
self.bin_path = bin_path
self.unique_port = unique_port
self.verbose = verbose
- self.read_tp_descriptor = read_tp_descriptor
- self.read_metrics_descriptor = read_metrics_descriptor
- self.parse_file = parse_file
- self.get_shell_path = get_shell_path
- self.get_free_port = get_free_port
-
- try:
- # This is the only place in trace processor which should import
- # from a "vendor" namespace - the purpose of this code is to allow
- # for users to set their own "default" config for trace processor
- # without needing to specify the config in every place when trace
- # processor is used.
- from .vendor import override_default_tp_config
- return override_default_tp_config(self)
- except ModuleNotFoundError:
- pass
-
class TraceProcessor:
@@ -241,11 +214,6 @@
1) path to a trace file to open and read
2) a file like object (file, io.BytesIO or similar) to read
3) a generator yielding bytes
- 4) a custom string format which can be understood by
- TraceProcessorConfig.parse_file function. The default
- implementation of this function only supports file paths (i.e. option
- 1) but callers can choose to change the implementation to parse
- a custom string format and use that to retrieve a race.
addr: address of a running trace processor instance. Useful to query an
already loaded trace.
config: configuration options which customize functionality of trace
@@ -255,30 +223,19 @@
an exception to be thrown.
"""
- def create_tp_http(protos: ProtoFactory) -> TraceProcessorHttp:
- if addr:
- p = urlparse(addr)
- return TraceProcessorHttp(
- p.netloc if p.netloc else p.path, protos=protos)
-
- url, self.subprocess = load_shell(
- bin_path=config.bin_path,
- unique_port=config.unique_port,
- verbose=config.verbose)
- return TraceProcessorHttp(url, protos=protos)
-
if trace and file_path:
raise TraceProcessorException(
"trace and file_path cannot both be specified.")
- self.protos = ProtoFactory(config.read_tp_descriptor(),
- config.read_metrics_descriptor())
- self.http = create_tp_http(self.protos)
+ self.config = config
+ self.platform_delegate = PLATFORM_DELEGATE()
+ self.protos = ProtoFactory(self.platform_delegate)
+ self.http = self._create_tp_http(addr)
if file_path:
- config.parse_file(self.http, file_path)
+ self.platform_delegate.parse_file(self.http, file_path)
elif isinstance(trace, str):
- config.parse_file(self.http, trace)
+ self.platform_delegate.parse_file(self.http, trace)
elif hasattr(trace, 'read'):
while True:
chunk = trace.read(32 * 1024 * 1024)
@@ -346,6 +303,18 @@
return response.metatrace
+ def _create_tp_http(self, addr: str) -> TraceProcessorHttp:
+ if addr:
+ p = urlparse(addr)
+ parsed = p.netloc if p.netloc else p.path
+ return TraceProcessorHttp(parsed, protos=self.protos)
+
+ url, self.subprocess = load_shell(self.config.bin_path,
+ self.config.unique_port,
+ self.config.verbose,
+ self.platform_delegate)
+ return TraceProcessorHttp(url, protos=self.protos)
+
def __enter__(self):
return self
diff --git a/python/perfetto/trace_processor/loader.py b/python/perfetto/trace_processor/loader.py
deleted file mode 100644
index 0a1b16b..0000000
--- a/python/perfetto/trace_processor/loader.py
+++ /dev/null
@@ -1,90 +0,0 @@
-#!/usr/bin/env python3
-# Copyright (C) 2020 The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import socket
-import subprocess
-import tempfile
-from urllib import request
-
-
-# This class contains all functions that first try to use a vendor to fulfil
-# their function
-class LoaderStandalone:
- # Limit parsing file to 32MB to maintain parity with the UI
- MAX_BYTES_LOADED = 32 * 1024 * 1024
-
- # URL to download script to run trace_processor
- SHELL_URL = 'http://get.perfetto.dev/trace_processor'
-
- # Default port that trace_processor_shell runs on
- TP_PORT = '9001'
-
- def read_tp_descriptor():
- ws = os.path.dirname(__file__)
- with open(os.path.join(ws, 'trace_processor.descriptor'), 'rb') as x:
- return x.read()
-
- def read_metrics_descriptor():
- ws = os.path.dirname(__file__)
- with open(os.path.join(ws, 'metrics.descriptor'), 'rb') as x:
- return x.read()
-
- def parse_file(tp_http, file_path):
- with open(file_path, 'rb') as f:
- f_size = os.path.getsize(file_path)
- bytes_read = 0
- while (bytes_read < f_size):
- chunk = f.read(LoaderStandalone.MAX_BYTES_LOADED)
- tp_http.parse(chunk)
- bytes_read += len(chunk)
- tp_http.notify_eof()
- return tp_http
-
- def get_shell_path(bin_path):
- # Try to use preexisting binary before attempting to download
- # trace_processor
- if bin_path is None:
- with tempfile.NamedTemporaryFile(delete=False) as file:
- req = request.Request(LoaderStandalone.SHELL_URL)
- with request.urlopen(req) as req:
- file.write(req.read())
- subprocess.check_output(['chmod', '+x', file.name])
- return file.name
- else:
- if not os.path.isfile(bin_path):
- raise Exception('Path to binary is not valid')
- return bin_path
-
- def get_free_port(unique_port):
- if not unique_port:
- return LoaderStandalone.TP_PORT, f'localhost:{LoaderStandalone.TP_PORT}'
- free_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- free_socket.bind(('', 0))
- free_socket.listen(5)
- port = free_socket.getsockname()[1]
- free_socket.close()
- return str(port), f"localhost:{str(port)}"
-
-
-# Return vendor class if it exists before falling back on LoaderStandalone
-# TODO(lalitm): remove this after migrating all consumers to
-# TraceProcessorConfig.
-def get_loader():
- try:
- from .loader_vendor import LoaderVendor
- return LoaderVendor
- except ModuleNotFoundError:
- return LoaderStandalone
diff --git a/python/perfetto/trace_processor/platform.py b/python/perfetto/trace_processor/platform.py
new file mode 100644
index 0000000..69b6b26
--- /dev/null
+++ b/python/perfetto/trace_processor/platform.py
@@ -0,0 +1,73 @@
+#!/usr/bin/env python3
+# Copyright (C) 2022 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import socket
+import subprocess
+import tempfile
+from typing import Tuple
+from urllib import request
+
+# Limit parsing file to 32MB to maintain parity with the UI
+MAX_BYTES_LOADED = 32 * 1024 * 1024
+
+# URL to download script to run trace_processor
+SHELL_URL = 'http://get.perfetto.dev/trace_processor'
+
+
+class PlatformDelegate:
+ """Abstracts operations which can vary based on platform."""
+
+ def get_resource(self, file: str) -> bytes:
+ ws = os.path.dirname(__file__)
+ with open(os.path.join(ws, file), 'rb') as x:
+ return x.read()
+
+ # TODO(lalitm): when we add trace resolving in future CL, remove this
+ # function.
+ def parse_file(self, tp_http, file_path: str):
+ with open(file_path, 'rb') as f:
+ f_size = os.path.getsize(file_path)
+ bytes_read = 0
+ while (bytes_read < f_size):
+ chunk = f.read(MAX_BYTES_LOADED)
+ tp_http.parse(chunk)
+ bytes_read += len(chunk)
+ tp_http.notify_eof()
+ return tp_http
+
+ def get_shell_path(self, bin_path: str) -> str:
+ if bin_path is not None:
+ if not os.path.isfile(bin_path):
+ raise Exception('Path to binary is not valid')
+ return bin_path
+
+ with tempfile.NamedTemporaryFile(delete=False) as file:
+ req = request.Request(SHELL_URL)
+ with request.urlopen(req) as req:
+ file.write(req.read())
+ subprocess.check_output(['chmod', '+x', file.name])
+ return file.name
+
+ def get_bind_addr(self, port: int) -> Tuple[str, int]:
+ if port:
+ return 'localhost', port
+
+ free_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ free_socket.bind(('', 0))
+ free_socket.listen(5)
+ port = free_socket.getsockname()[1]
+ free_socket.close()
+ return 'localhost', port
diff --git a/python/perfetto/trace_processor/protos.py b/python/perfetto/trace_processor/protos.py
index 0053fdb..3a844ea 100644
--- a/python/perfetto/trace_processor/protos.py
+++ b/python/perfetto/trace_processor/protos.py
@@ -16,23 +16,27 @@
from google.protobuf import message_factory
from google.protobuf.descriptor_pool import DescriptorPool
+from perfetto.trace_processor.platform import PlatformDelegate
+
class ProtoFactory:
- def __init__(self, tp_descriptor: bytes, metrics_descriptor: bytes):
+ def __init__(self, platform_delegate: PlatformDelegate):
# Declare descriptor pool
self.descriptor_pool = DescriptorPool()
# Load trace processor descriptor and add to descriptor pool
+ tp_desc = platform_delegate.get_resource('trace_processor.descriptor')
tp_file_desc_set_pb2 = descriptor_pb2.FileDescriptorSet()
- tp_file_desc_set_pb2.MergeFromString(tp_descriptor)
+ tp_file_desc_set_pb2.MergeFromString(tp_desc)
for f_desc_pb2 in tp_file_desc_set_pb2.file:
self.descriptor_pool.Add(f_desc_pb2)
# Load metrics descriptor and add to descriptor pool
+ metrics_desc = platform_delegate.get_resource('metrics.descriptor')
metrics_file_desc_set_pb2 = descriptor_pb2.FileDescriptorSet()
- metrics_file_desc_set_pb2.MergeFromString(metrics_descriptor)
+ metrics_file_desc_set_pb2.MergeFromString(metrics_desc)
for f_desc_pb2 in metrics_file_desc_set_pb2.file:
self.descriptor_pool.Add(f_desc_pb2)
diff --git a/python/perfetto/trace_processor/shell.py b/python/perfetto/trace_processor/shell.py
index 8daa956..b0e3950 100644
--- a/python/perfetto/trace_processor/shell.py
+++ b/python/perfetto/trace_processor/shell.py
@@ -13,18 +13,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import os
import subprocess
import time
from urllib import request, error
-from perfetto.trace_processor.loader import get_loader
+from perfetto.trace_processor.platform import PlatformDelegate
+
+# Default port that trace_processor_shell runs on
+TP_PORT = 9001
-def load_shell(bin_path, unique_port, verbose):
- shell_path = get_loader().get_shell_path(bin_path=bin_path)
- port, url = get_loader().get_free_port(unique_port=unique_port)
- p = subprocess.Popen([shell_path, '-D', '--http-port', port],
+def load_shell(bin_path: str, unique_port: bool, verbose: bool,
+ platform_delegate: PlatformDelegate):
+ addr, port = platform_delegate.get_bind_addr(
+ port=0 if unique_port else TP_PORT)
+ url = f'{addr}:{str(port)}'
+
+ shell_path = platform_delegate.get_shell_path(bin_path=bin_path)
+ p = subprocess.Popen([shell_path, '-D', '--http-port',
+ str(port)],
stdout=subprocess.DEVNULL,
stderr=None if verbose else subprocess.DEVNULL)
@@ -38,7 +45,7 @@
raise Exception(
"Trace processor failed to start, please file a bug at https://goto.google.com/perfetto-bug"
)
- req = request.urlretrieve(f'http://{url}/status')
+ _ = request.urlretrieve(f'http://{url}/status')
time.sleep(1)
break
except error.URLError:
diff --git a/python/test/api_unittest.py b/python/test/api_unittest.py
index 732438a..f742916 100755
--- a/python/test/api_unittest.py
+++ b/python/test/api_unittest.py
@@ -17,13 +17,10 @@
from perfetto.trace_processor.api import TraceProcessor
from perfetto.trace_processor.api import TraceProcessorException
-from perfetto.trace_processor.api import TraceProcessorConfig
+from perfetto.trace_processor.api import PLATFORM_DELEGATE
from perfetto.trace_processor.protos import ProtoFactory
-TP_CONFIG = TraceProcessorConfig()
-PROTO_FACTORY = ProtoFactory(
- tp_descriptor=TP_CONFIG.read_tp_descriptor(),
- metrics_descriptor=TP_CONFIG.read_metrics_descriptor())
+PROTO_FACTORY = ProtoFactory(PLATFORM_DELEGATE())
class TestQueryResultIterator(unittest.TestCase):