tp: add config objects for trace processor and batch trace processor

This CL replaces the ad-hoc parameters passed to the constructor with a
config object which can override various functionality. This will
replace loader_vendor once G3 migrates to using just this.

Change-Id: Ibbf3960106726a00cb332bb3043b75e00681a324
Bug: 180499808
diff --git a/src/trace_processor/python/example.py b/src/trace_processor/python/example.py
index b3925a4..ffdd9ac 100644
--- a/src/trace_processor/python/example.py
+++ b/src/trace_processor/python/example.py
@@ -15,7 +15,7 @@
 
 import argparse
 
-from perfetto.trace_processor import TraceProcessor
+from perfetto.trace_processor import TraceProcessor, TraceProcessorConfig
 
 
 def main():
@@ -34,16 +34,17 @@
   parser.add_argument("-f", "--file", help="Absolute path to trace", type=str)
   args = parser.parse_args()
 
+  config = TraceProcessorConfig(bin_path=args.binary)
+
   # Pass arguments into api to construct the trace processor and load the trace
   if args.address is None and args.file is None:
     raise Exception("You must specify an address or a file path to trace")
   elif args.address is None:
-    tp = TraceProcessor(file_path=args.file, bin_path=args.binary)
+    tp = TraceProcessor(trace=args.file, config=config)
   elif args.file is None:
-    tp = TraceProcessor(addr=args.address)
+    tp = TraceProcessor(addr=args.address, config=config)
   else:
-    tp = TraceProcessor(
-        addr=args.address, file_path=args.file, bin_path=args.binary)
+    tp = TraceProcessor(trace=args.file, addr=args.address, config=config)
 
   # Iterate through QueryResultIterator
   res_it = tp.query('select * from slice limit 10')
diff --git a/src/trace_processor/python/perfetto/trace_processor/__init__.py b/src/trace_processor/python/perfetto/trace_processor/__init__.py
index 7106a6c..ad09ce6 100644
--- a/src/trace_processor/python/perfetto/trace_processor/__init__.py
+++ b/src/trace_processor/python/perfetto/trace_processor/__init__.py
@@ -13,5 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from .api import TraceProcessor, TraceProcessorException
+from .api import LoadableTrace, TraceProcessor, TraceProcessorConfig, TraceProcessorException
 from .http import TraceProcessorHttp
diff --git a/src/trace_processor/python/perfetto/trace_processor/api.py b/src/trace_processor/python/perfetto/trace_processor/api.py
index c52e628..0b67632 100644
--- a/src/trace_processor/python/perfetto/trace_processor/api.py
+++ b/src/trace_processor/python/perfetto/trace_processor/api.py
@@ -12,8 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import dataclasses as dc
+from enum import unique
 from urllib.parse import urlparse
-from typing import BinaryIO, Generator, List, Optional, Union
+from typing import BinaryIO, Callable, Generator, List, Optional, Tuple, Union
 
 from .http import TraceProcessorHttp
 from .loader import get_loader
@@ -31,6 +33,53 @@
     super().__init__(message)
 
 
+@dc.dataclass
+class TraceProcessorConfig:
+  bin_path: Optional[str]
+  unique_port: bool
+  verbose: bool
+
+  read_tp_descriptor: Callable[[], bytes]
+  read_metrics_descriptor: Callable[[], bytes]
+  parse_file: Callable[[TraceProcessorHttp, str], TraceProcessorHttp]
+  get_shell_path: Callable[[str], None]
+  get_free_port: Callable[[bool], Tuple[str, str]]
+
+  def __init__(
+      self,
+      bin_path: Optional[str] = None,
+      unique_port: bool = True,
+      verbose: bool = False,
+      read_tp_descriptor: Callable[[], bytes] = get_loader().read_tp_descriptor,
+      read_metrics_descriptor: Callable[[], bytes] = get_loader(
+      ).read_metrics_descriptor,
+      parse_file: Callable[[TraceProcessorHttp, str],
+                           TraceProcessorHttp] = get_loader().parse_file,
+      get_shell_path: Callable[[str], None] = get_loader().get_shell_path,
+      get_free_port: Callable[[bool], Tuple[str, str]] = get_loader(
+      ).get_free_port):
+    self.bin_path = bin_path
+    self.unique_port = unique_port
+    self.verbose = verbose
+
+    self.read_tp_descriptor = read_tp_descriptor
+    self.read_metrics_descriptor = read_metrics_descriptor
+    self.parse_file = parse_file
+    self.get_shell_path = get_shell_path
+    self.get_free_port = get_free_port
+
+    try:
+      # This is the only place in trace processor which should import
+      # from a "vendor" namespace - the purpose of this code is to allow
+      # for users to set their own "default" config for trace processor
+      # without needing to specify the config in every place when trace
+      # processor is used.
+      from .vendor import override_default_tp_config
+      return override_default_tp_config(self)
+    except ModuleNotFoundError:
+      pass
+
+
 class TraceProcessor:
 
   # Values of these constants correspond to the QueryResponse message at
@@ -181,52 +230,54 @@
   def __init__(self,
                trace: LoadableTrace = None,
                addr: Optional[str] = None,
-               bin_path: Optional[str] = None,
-               unique_port: bool = True,
-               verbose: bool = False,
+               config: TraceProcessorConfig = TraceProcessorConfig(),
                file_path: Optional[str] = None):
     """Create a trace processor instance.
 
     Args:
-      trace: Trace to be loaded into the trace processor instance. One of
+      trace: trace to be loaded into the trace processor instance. One of
         three types of argument is supported:
         1) path to a trace file to open and read
         2) a file like object (file, io.BytesIO or similar) to read
         3) a generator yielding bytes
-      addr: address of a running trace processor instance. For advanced
-        use only.
-      bin_path: path to a trace processor shell binary. For advanced use
-        only.
-      unique_port: whether the trace processor shell instance should be
-        be started on a unique port. Only used when |addr| is not set.
-        For advanced use only.
-      verbose: whether trace processor shell should emit verbose logs;
-        can be very spammy. For advanced use only.
-      file_path (deprecated): path to a trace file to load. Please use
+        4) a custom string format which can be understood by
+           TraceProcessorConfig.parse_file function. The default
+           implementation of this function only supports file paths (i.e. option
+           1) but callers can choose to change the implementation to parse
+           a custom string format and use that to retrieve a trace.
+      addr: address of a running trace processor instance. Useful to query an
+        already loaded trace.
+      config: configuration options which customize functionality of trace
+        processor and the Python binding.
+      file_path (deprecated): path to a trace file to load. Use
         |trace| instead of this field: specifying both will cause
         an exception to be thrown.
     """
 
-    def create_tp_http():
+    def create_tp_http(protos: ProtoFactory) -> TraceProcessorHttp:
       if addr:
         p = urlparse(addr)
-        return TraceProcessorHttp(p.netloc if p.netloc else p.path)
+        return TraceProcessorHttp(
+            p.netloc if p.netloc else p.path, protos=protos)
 
       url, self.subprocess = load_shell(
-          bin_path=bin_path, unique_port=unique_port, verbose=verbose)
-      return TraceProcessorHttp(url)
+          bin_path=config.bin_path,
+          unique_port=config.unique_port,
+          verbose=config.verbose)
+      return TraceProcessorHttp(url, protos=protos)
 
     if trace and file_path:
       raise TraceProcessorException(
           "trace and file_path cannot both be specified.")
 
-    self.http = create_tp_http()
-    self.protos = ProtoFactory()
+    self.protos = ProtoFactory(config.read_tp_descriptor(),
+                               config.read_metrics_descriptor())
+    self.http = create_tp_http(self.protos)
 
     if file_path:
-      get_loader().parse_file(self.http, file_path)
+      config.parse_file(self.http, file_path)
     elif isinstance(trace, str):
-      get_loader().parse_file(self.http, trace)
+      config.parse_file(self.http, trace)
     elif hasattr(trace, 'read'):
       while True:
         chunk = trace.read(32 * 1024 * 1024)
diff --git a/src/trace_processor/python/perfetto/trace_processor/http.py b/src/trace_processor/python/perfetto/trace_processor/http.py
index bf751f9..f3cbfb5 100644
--- a/src/trace_processor/python/perfetto/trace_processor/http.py
+++ b/src/trace_processor/python/perfetto/trace_processor/http.py
@@ -14,17 +14,18 @@
 # limitations under the License.
 
 import http.client
+from typing import List
 
 from .protos import ProtoFactory
 
 
 class TraceProcessorHttp:
 
-  def __init__(self, url):
-    self.protos = ProtoFactory()
+  def __init__(self, url: str, protos: ProtoFactory):
+    self.protos = protos
     self.conn = http.client.HTTPConnection(url)
 
-  def execute_query(self, query):
+  def execute_query(self, query: str):
     args = self.protos.RawQueryArgs()
     args.sql_query = query
     byte_data = args.SerializeToString()
@@ -34,7 +35,7 @@
       result.ParseFromString(f.read())
       return result
 
-  def compute_metric(self, metrics):
+  def compute_metric(self, metrics: List[str]):
     args = self.protos.ComputeMetricArgs()
     args.metric_names.extend(metrics)
     byte_data = args.SerializeToString()
@@ -44,7 +45,7 @@
       result.ParseFromString(f.read())
       return result
 
-  def parse(self, chunk):
+  def parse(self, chunk: bytes):
     self.conn.request('POST', '/parse', body=chunk)
     with self.conn.getresponse() as f:
       return f.read()
diff --git a/src/trace_processor/python/perfetto/trace_processor/loader.py b/src/trace_processor/python/perfetto/trace_processor/loader.py
index e57145f..0a1b16b 100644
--- a/src/trace_processor/python/perfetto/trace_processor/loader.py
+++ b/src/trace_processor/python/perfetto/trace_processor/loader.py
@@ -53,7 +53,7 @@
     tp_http.notify_eof()
     return tp_http
 
-  def get_shell_path(bin_path=None):
+  def get_shell_path(bin_path):
     # Try to use preexisting binary before attempting to download
     # trace_processor
     if bin_path is None:
@@ -68,7 +68,7 @@
         raise Exception('Path to binary is not valid')
       return bin_path
 
-  def get_free_port(unique_port=False):
+  def get_free_port(unique_port):
     if not unique_port:
       return LoaderStandalone.TP_PORT, f'localhost:{LoaderStandalone.TP_PORT}'
     free_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -80,6 +80,8 @@
 
 
 # Return vendor class if it exists before falling back on LoaderStandalone
+# TODO(lalitm): remove this after migrating all consumers to
+# TraceProcessorConfig.
 def get_loader():
   try:
     from .loader_vendor import LoaderVendor
diff --git a/src/trace_processor/python/perfetto/trace_processor/protos.py b/src/trace_processor/python/perfetto/trace_processor/protos.py
index b5e3700..37be4f2 100644
--- a/src/trace_processor/python/perfetto/trace_processor/protos.py
+++ b/src/trace_processor/python/perfetto/trace_processor/protos.py
@@ -16,27 +16,24 @@
 from google.protobuf import message_factory
 from google.protobuf.descriptor_pool import DescriptorPool
 
-from .loader import get_loader
-
 
 class ProtoFactory:
 
-  def __init__(self):
+  def __init__(self, tp_descriptor: bytes,
+               metrics_descriptor: bytes):
     # Declare descriptor pool
     self.descriptor_pool = DescriptorPool()
 
     # Load trace processor descriptor and add to descriptor pool
-    tp_descriptor_bytes = get_loader().read_tp_descriptor()
     tp_file_desc_set_pb2 = descriptor_pb2.FileDescriptorSet()
-    tp_file_desc_set_pb2.MergeFromString(tp_descriptor_bytes)
+    tp_file_desc_set_pb2.MergeFromString(tp_descriptor)
 
     for f_desc_pb2 in tp_file_desc_set_pb2.file:
       self.descriptor_pool.Add(f_desc_pb2)
 
     # Load metrics descriptor and add to descriptor pool
-    metrics_descriptor_bytes = get_loader().read_metrics_descriptor()
     metrics_file_desc_set_pb2 = descriptor_pb2.FileDescriptorSet()
-    metrics_file_desc_set_pb2.MergeFromString(metrics_descriptor_bytes)
+    metrics_file_desc_set_pb2.MergeFromString(metrics_descriptor)
 
     for f_desc_pb2 in metrics_file_desc_set_pb2.file:
       self.descriptor_pool.Add(f_desc_pb2)