Merge "Update for frame data containing MapInfo object."
diff --git a/bazel/standalone/perfetto_cfg.bzl b/bazel/standalone/perfetto_cfg.bzl
index 5725041..c9c89f1 100644
--- a/bazel/standalone/perfetto_cfg.bzl
+++ b/bazel/standalone/perfetto_cfg.bzl
@@ -53,12 +53,9 @@
 
         # The Python targets are empty on the standalone build because we assume
         # any relevant deps are installed on the system or are not applicable.
-        tp_init_py = [],
-        gfile_py = [],
         protobuf_py = [],
-        protobuf_descriptor_pb2_py = [],
-        pyglib_py = [],
         pandas_py = [],
+        tp_vendor_py = [],
     ),
 
     # This struct allows embedders to customize the cc_opts for Perfetto
diff --git a/include/perfetto/ext/ipc/client_info.h b/include/perfetto/ext/ipc/client_info.h
index 6988d18..4537916 100644
--- a/include/perfetto/ext/ipc/client_info.h
+++ b/include/perfetto/ext/ipc/client_info.h
@@ -28,8 +28,8 @@
 class ClientInfo {
  public:
   ClientInfo() = default;
-  ClientInfo(ClientID client_id, uid_t uid)
-      : client_id_(client_id), uid_(uid) {}
+  ClientInfo(ClientID client_id, uid_t uid, pid_t pid)
+      : client_id_(client_id), uid_(uid), pid_(pid) {}
 
   bool operator==(const ClientInfo& other) const {
     return (client_id_ == other.client_id_ && uid_ == other.uid_);
@@ -50,9 +50,13 @@
   // Posix User ID. Comes from the kernel, can be trusted.
   uid_t uid() const { return uid_; }
 
+  // Posix process ID. Comes from the kernel and can be trusted.
+  int32_t pid() const { return pid_; }
+
  private:
   ClientID client_id_ = 0;
   uid_t uid_ = kInvalidUid;
+  pid_t pid_ = base::kInvalidPid;
 };
 
 }  // namespace ipc
diff --git a/include/perfetto/ext/tracing/core/tracing_service.h b/include/perfetto/ext/tracing/core/tracing_service.h
index 022520f..57c9a26 100644
--- a/include/perfetto/ext/tracing/core/tracing_service.h
+++ b/include/perfetto/ext/tracing/core/tracing_service.h
@@ -321,6 +321,7 @@
   virtual std::unique_ptr<ProducerEndpoint> ConnectProducer(
       Producer*,
       uid_t uid,
+      pid_t pid,
       const std::string& name,
       size_t shared_memory_size_hint_bytes = 0,
       bool in_process = false,
diff --git a/protos/perfetto/trace/perfetto_trace.proto b/protos/perfetto/trace/perfetto_trace.proto
index b14a0f4..9d4a7d8 100644
--- a/protos/perfetto/trace/perfetto_trace.proto
+++ b/protos/perfetto/trace/perfetto_trace.proto
@@ -9142,7 +9142,7 @@
 // See the [Buffers and Dataflow](/docs/concepts/buffers.md) doc for details.
 //
 // Next reserved id: 14 (up to 15).
-// Next id: 79.
+// Next id: 80.
 message TracePacket {
   // The timestamp of the TracePacket.
   // By default this timestamps refers to the trace clock (CLOCK_BOOTTIME on
@@ -9264,6 +9264,10 @@
     uint32 trusted_packet_sequence_id = 10;
   }
 
+  // Trusted process id of the producer which generated this packet, written by
+  // the service.
+  optional int32 trusted_pid = 79;
+
   // Incrementally emitted interned data, valid only on the packet's sequence
   // (packets with the same |trusted_packet_sequence_id|). The writer will
   // usually emit new interned data in the same TracePacket that first refers to
diff --git a/protos/perfetto/trace/trace_packet.proto b/protos/perfetto/trace/trace_packet.proto
index 2428427..7a806a9 100644
--- a/protos/perfetto/trace/trace_packet.proto
+++ b/protos/perfetto/trace/trace_packet.proto
@@ -85,7 +85,7 @@
 // See the [Buffers and Dataflow](/docs/concepts/buffers.md) doc for details.
 //
 // Next reserved id: 14 (up to 15).
-// Next id: 79.
+// Next id: 80.
 message TracePacket {
   // The timestamp of the TracePacket.
   // By default this timestamps refers to the trace clock (CLOCK_BOOTTIME on
@@ -207,6 +207,10 @@
     uint32 trusted_packet_sequence_id = 10;
   }
 
+  // Trusted process id of the producer which generated this packet, written by
+  // the service.
+  optional int32 trusted_pid = 79;
+
   // Incrementally emitted interned data, valid only on the packet's sequence
   // (packets with the same |trusted_packet_sequence_id|). The writer will
   // usually emit new interned data in the same TracePacket that first refers to
diff --git a/python/BUILD b/python/BUILD
index f330ba8..39a0a7e 100644
--- a/python/BUILD
+++ b/python/BUILD
@@ -38,11 +38,10 @@
         "perfetto/trace_processor/trace_processor.descriptor",
         "perfetto/trace_processor/metrics.descriptor",
         PERFETTO_CONFIG.root + ":trace_processor_shell",
-    ] + PERFETTO_CONFIG.deps.tp_init_py,
-    deps = PERFETTO_CONFIG.deps.gfile_py +
+    ],
+    deps = PERFETTO_CONFIG.deps.tp_vendor_py +
         PERFETTO_CONFIG.deps.protobuf_py +
-        PERFETTO_CONFIG.deps.protobuf_descriptor_pb2_py +
-        PERFETTO_CONFIG.deps.pyglib_py,
+        PERFETTO_CONFIG.deps.pandas_py,
     imports = [
         ".",
     ],
@@ -79,7 +78,8 @@
     ]),
     deps = [
         ":trace_processor_py",
-    ] + PERFETTO_CONFIG.deps.pandas_py,
+    ] + PERFETTO_CONFIG.deps.pandas_py +
+        PERFETTO_CONFIG.deps.tp_vendor_py,
     imports = [
         ".",
     ],
diff --git a/python/perfetto/batch_trace_processor/api.py b/python/perfetto/batch_trace_processor/api.py
index caf9f74..b320090 100644
--- a/python/perfetto/batch_trace_processor/api.py
+++ b/python/perfetto/batch_trace_processor/api.py
@@ -14,18 +14,25 @@
 # limitations under the License.
 """Contains classes for BatchTraceProcessor API."""
 
-from concurrent.futures.thread import ThreadPoolExecutor
+import concurrent.futures as cf
 import dataclasses as dc
 import multiprocessing
-from typing import Any, Callable, Dict, Optional, Tuple, Union, List
-from numpy.lib.npyio import load
+from typing import Any, Callable, Dict, Tuple, Union, List
 
 import pandas as pd
 
-from perfetto.trace_processor import LoadableTrace
-from perfetto.trace_processor import TraceProcessor
-from perfetto.trace_processor import TraceProcessorException
-from perfetto.trace_processor import TraceProcessorConfig
+from perfetto.trace_processor.api import LoadableTrace
+from perfetto.trace_processor.api import TraceProcessor
+from perfetto.trace_processor.api import TraceProcessorException
+from perfetto.trace_processor.api import TraceProcessorConfig
+from perfetto.batch_trace_processor.platform import PlatformDelegate
+
+# Defining this field as a module variable means this can be changed by
+# implementations at startup and used for all BatchTraceProcessor objects
+# without having to specify on each one.
+# In Google3, this field is rewritten using Copybara to an implementation
+# which integrates with internal infra.
+PLATFORM_DELEGATE = PlatformDelegate
 
 
 @dc.dataclass
@@ -36,45 +43,11 @@
 
 @dc.dataclass
 class BatchTraceProcessorConfig:
-  TraceProvider = Callable[[str], List[
-      Union[LoadableTrace, BatchLoadableTrace]]]
-
   tp_config: TraceProcessorConfig
 
-  query_executor: Optional[ThreadPoolExecutor]
-  load_executor: Optional[ThreadPoolExecutor]
-
-  trace_provider: TraceProvider
-
-  def __default_trace_provider(custom_string: str):
-    del custom_string
-    raise TraceProcessorException(
-        'Passed a string to batch trace processor constructor without '
-        'a trace provider being registered.')
-
-  def __init__(self,
-               tp_config: TraceProcessorConfig = TraceProcessorConfig(),
-               query_executor: Optional[ThreadPoolExecutor] = None,
-               load_executor: Optional[ThreadPoolExecutor] = None,
-               trace_provider: TraceProvider = __default_trace_provider):
+  def __init__(self, tp_config: TraceProcessorConfig = TraceProcessorConfig()):
     self.tp_config = tp_config
 
-    self.query_executor = query_executor
-    self.load_executor = load_executor
-
-    self.trace_provider = trace_provider
-
-    try:
-      # This is the only place in batch trace processor which should import
-      # from a "vendor" namespace - the purpose of this code is to allow
-      # for users to set their own "default" config for batch trace processor
-      # without needing to specify the config in every place when batch
-      # trace processor is used.
-      from .vendor import override_batch_tp_config
-      override_batch_tp_config(self)
-    except ModuleNotFoundError:
-      pass
-
 
 class BatchTraceProcessor:
   """Run ad-hoc SQL queries across many Perfetto traces.
@@ -96,21 +69,13 @@
     Python across many traces.
 
     Args:
-      traces: Either a list of traces or a custom string which will be
-        converted to a list of traces.
-
-        If a list, each item can be one of the following types:
+      traces: A list of traces where each item is one of the following types:
         1) path to a trace file to open and read
         2) a file like object (file, io.BytesIO or similar) to read
         3) a generator yielding bytes
         4) a BatchLoadableTrace object; this is basically a wrapper around
            one of the above types plus an args field; see |query_and_flatten|
            for the motivation for the args field.
-
-        If a string, it is passed to BatchTraceProcessorConfig.trace_provider to
-        convert to a list of traces; the default implementation of this
-        function just throws an exception so an implementation must be provided
-        if strings will be passed.
       config: configuration options which customize functionality of batch
         trace processor and underlying trace processors.
     """
@@ -121,27 +86,27 @@
         return x
       return BatchLoadableTrace(trace=x, args={})
 
-    def create_tp(trace: BatchLoadableTrace) -> TraceProcessor:
+    def _create_tp(trace: BatchLoadableTrace) -> TraceProcessor:
       return TraceProcessor(trace=trace.trace, config=config.tp_config)
 
-    if isinstance(traces, str):
-      trace_list = config.trace_provider(traces)
-    else:
-      trace_list = traces
+    batch_traces = [_create_batch_trace(t) for t in traces]
+    trace_count = len(batch_traces)
 
-    batch_traces = [_create_batch_trace(t) for t in trace_list]
+    self.platform_delegate = PLATFORM_DELEGATE()
 
     # As trace processor is completely CPU bound, it makes sense to just
     # max out the CPUs available.
-    query_executor = config.query_executor or ThreadPoolExecutor(
-        max_workers=multiprocessing.cpu_count())
-    load_exectuor = config.load_executor or query_executor
+    query_executor = self.platform_delegate.create_query_executor(
+        trace_count) or cf.ThreadPoolExecutor(
+            max_workers=multiprocessing.cpu_count())
+    load_exectuor = self.platform_delegate.create_load_executor(
+        trace_count) or query_executor
 
     self.tps = None
     self.closed = False
     self.query_executor = query_executor
     self.args = [t.args for t in batch_traces]
-    self.tps = list(load_exectuor.map(create_tp, batch_traces))
+    self.tps = list(load_exectuor.map(_create_tp, batch_traces))
 
   def metric(self, metrics: List[str]):
     """Computes the provided metrics.
diff --git a/python/perfetto/batch_trace_processor/platform.py b/python/perfetto/batch_trace_processor/platform.py
new file mode 100644
index 0000000..63f1d6c
--- /dev/null
+++ b/python/perfetto/batch_trace_processor/platform.py
@@ -0,0 +1,42 @@
+# Copyright (C) 2022 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import concurrent.futures as cf
+from typing import Optional
+
+
+class PlatformDelegate:
+  """Abstracts operations which can vary based on platform.
+
+  The default implementation returns None from both factories, in which case
+  BatchTraceProcessor falls back to its own default executors.
+  """
+
+  def create_query_executor(
+      self, trace_count: int) -> Optional[cf.ThreadPoolExecutor]:
+    """Returns the executor used to run queries, or None for the default.
+
+    Args:
+      trace_count: number of traces being loaded.
+    """
+    return None
+
+  def create_load_executor(
+      self, trace_count: int) -> Optional[cf.ThreadPoolExecutor]:
+    """Returns the executor used to load traces, or None for the default.
+
+    Args:
+      trace_count: number of traces being loaded.
+    """
+    return None
diff --git a/python/perfetto/trace_processor/api.py b/python/perfetto/trace_processor/api.py
index 3f24a13..45bcde0 100644
--- a/python/perfetto/trace_processor/api.py
+++ b/python/perfetto/trace_processor/api.py
@@ -13,19 +13,24 @@
 # limitations under the License.
 
 import dataclasses as dc
-from enum import unique
 from urllib.parse import urlparse
-from typing import BinaryIO, Callable, Generator, List, Optional, Tuple, Union
+from typing import BinaryIO, Generator, List, Optional, Union
 
 from perfetto.trace_processor.http import TraceProcessorHttp
-from perfetto.trace_processor.loader import get_loader
+from perfetto.trace_processor.platform import PlatformDelegate
 from perfetto.trace_processor.protos import ProtoFactory
 from perfetto.trace_processor.shell import load_shell
 
+# Defining this field as a module variable means this can be changed by
+# implementations at startup and used for all TraceProcessor objects
+# without having to specify on each one.
+# In Google3, this field is rewritten using Copybara to an implementation
+# which integrates with internal infra.
+PLATFORM_DELEGATE = PlatformDelegate
+
 # Union of types supported for a trace which can be loaded by shell.
 LoadableTrace = Union[None, str, BinaryIO, Generator[bytes, None, None]]
 
-
 # Custom exception raised if any trace_processor functions return a
 # response with an error defined
 class TraceProcessorException(Exception):
@@ -40,46 +45,14 @@
   unique_port: bool
   verbose: bool
 
-  read_tp_descriptor: Callable[[], bytes]
-  read_metrics_descriptor: Callable[[], bytes]
-  parse_file: Callable[[TraceProcessorHttp, str], TraceProcessorHttp]
-  get_shell_path: Callable[[str], None]
-  get_free_port: Callable[[bool], Tuple[str, str]]
-
-  def __init__(
-      self,
-      bin_path: Optional[str] = None,
-      unique_port: bool = True,
-      verbose: bool = False,
-      read_tp_descriptor: Callable[[], bytes] = get_loader().read_tp_descriptor,
-      read_metrics_descriptor: Callable[[], bytes] = get_loader(
-      ).read_metrics_descriptor,
-      parse_file: Callable[[TraceProcessorHttp, str],
-                           TraceProcessorHttp] = get_loader().parse_file,
-      get_shell_path: Callable[[str], None] = get_loader().get_shell_path,
-      get_free_port: Callable[[bool], Tuple[str, str]] = get_loader(
-      ).get_free_port):
+  def __init__(self,
+               bin_path: Optional[str] = None,
+               unique_port: bool = True,
+               verbose: bool = False):
     self.bin_path = bin_path
     self.unique_port = unique_port
     self.verbose = verbose
 
-    self.read_tp_descriptor = read_tp_descriptor
-    self.read_metrics_descriptor = read_metrics_descriptor
-    self.parse_file = parse_file
-    self.get_shell_path = get_shell_path
-    self.get_free_port = get_free_port
-
-    try:
-      # This is the only place in trace processor which should import
-      # from a "vendor" namespace - the purpose of this code is to allow
-      # for users to set their own "default" config for trace processor
-      # without needing to specify the config in every place when trace
-      # processor is used.
-      from .vendor import override_default_tp_config
-      return override_default_tp_config(self)
-    except ModuleNotFoundError:
-      pass
-
 
 class TraceProcessor:
 
@@ -241,11 +214,6 @@
         1) path to a trace file to open and read
         2) a file like object (file, io.BytesIO or similar) to read
         3) a generator yielding bytes
-        4) a custom string format which can be understood by
-           TraceProcessorConfig.parse_file function. The default
-           implementation of this function only supports file paths (i.e. option
-           1) but callers can choose to change the implementation to parse
-           a custom string format and use that to retrieve a race.
       addr: address of a running trace processor instance. Useful to query an
         already loaded trace.
       config: configuration options which customize functionality of trace
@@ -255,30 +223,19 @@
         an exception to be thrown.
     """
 
-    def create_tp_http(protos: ProtoFactory) -> TraceProcessorHttp:
-      if addr:
-        p = urlparse(addr)
-        return TraceProcessorHttp(
-            p.netloc if p.netloc else p.path, protos=protos)
-
-      url, self.subprocess = load_shell(
-          bin_path=config.bin_path,
-          unique_port=config.unique_port,
-          verbose=config.verbose)
-      return TraceProcessorHttp(url, protos=protos)
-
     if trace and file_path:
       raise TraceProcessorException(
           "trace and file_path cannot both be specified.")
 
-    self.protos = ProtoFactory(config.read_tp_descriptor(),
-                               config.read_metrics_descriptor())
-    self.http = create_tp_http(self.protos)
+    self.config = config
+    self.platform_delegate = PLATFORM_DELEGATE()
+    self.protos = ProtoFactory(self.platform_delegate)
+    self.http = self._create_tp_http(addr)
 
     if file_path:
-      config.parse_file(self.http, file_path)
+      self.platform_delegate.parse_file(self.http, file_path)
     elif isinstance(trace, str):
-      config.parse_file(self.http, trace)
+      self.platform_delegate.parse_file(self.http, trace)
     elif hasattr(trace, 'read'):
       while True:
         chunk = trace.read(32 * 1024 * 1024)
@@ -346,6 +303,18 @@
 
     return response.metatrace
 
+  def _create_tp_http(self, addr: Optional[str]) -> TraceProcessorHttp:
+    if addr:  # Talk to the instance at |addr|; no shell is started here.
+      p = urlparse(addr)
+      parsed = p.netloc if p.netloc else p.path  # No netloc: use the path.
+      return TraceProcessorHttp(parsed, protos=self.protos)
+
+    url, self.subprocess = load_shell(self.config.bin_path,
+                                      self.config.unique_port,
+                                      self.config.verbose,
+                                      self.platform_delegate)
+    return TraceProcessorHttp(url, protos=self.protos)  # Shell started above.
+
   def __enter__(self):
     return self
 
diff --git a/python/perfetto/trace_processor/loader.py b/python/perfetto/trace_processor/loader.py
deleted file mode 100644
index 0a1b16b..0000000
--- a/python/perfetto/trace_processor/loader.py
+++ /dev/null
@@ -1,90 +0,0 @@
-#!/usr/bin/env python3
-# Copyright (C) 2020 The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import socket
-import subprocess
-import tempfile
-from urllib import request
-
-
-# This class contains all functions that first try to use a vendor to fulfil
-# their function
-class LoaderStandalone:
-  # Limit parsing file to 32MB to maintain parity with the UI
-  MAX_BYTES_LOADED = 32 * 1024 * 1024
-
-  # URL to download script to run trace_processor
-  SHELL_URL = 'http://get.perfetto.dev/trace_processor'
-
-  # Default port that trace_processor_shell runs on
-  TP_PORT = '9001'
-
-  def read_tp_descriptor():
-    ws = os.path.dirname(__file__)
-    with open(os.path.join(ws, 'trace_processor.descriptor'), 'rb') as x:
-      return x.read()
-
-  def read_metrics_descriptor():
-    ws = os.path.dirname(__file__)
-    with open(os.path.join(ws, 'metrics.descriptor'), 'rb') as x:
-      return x.read()
-
-  def parse_file(tp_http, file_path):
-    with open(file_path, 'rb') as f:
-      f_size = os.path.getsize(file_path)
-      bytes_read = 0
-      while (bytes_read < f_size):
-        chunk = f.read(LoaderStandalone.MAX_BYTES_LOADED)
-        tp_http.parse(chunk)
-        bytes_read += len(chunk)
-    tp_http.notify_eof()
-    return tp_http
-
-  def get_shell_path(bin_path):
-    # Try to use preexisting binary before attempting to download
-    # trace_processor
-    if bin_path is None:
-      with tempfile.NamedTemporaryFile(delete=False) as file:
-        req = request.Request(LoaderStandalone.SHELL_URL)
-        with request.urlopen(req) as req:
-          file.write(req.read())
-      subprocess.check_output(['chmod', '+x', file.name])
-      return file.name
-    else:
-      if not os.path.isfile(bin_path):
-        raise Exception('Path to binary is not valid')
-      return bin_path
-
-  def get_free_port(unique_port):
-    if not unique_port:
-      return LoaderStandalone.TP_PORT, f'localhost:{LoaderStandalone.TP_PORT}'
-    free_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    free_socket.bind(('', 0))
-    free_socket.listen(5)
-    port = free_socket.getsockname()[1]
-    free_socket.close()
-    return str(port), f"localhost:{str(port)}"
-
-
-# Return vendor class if it exists before falling back on LoaderStandalone
-# TODO(lalitm): remove this after migrating all consumers to
-# TraceProcessorConfig.
-def get_loader():
-  try:
-    from .loader_vendor import LoaderVendor
-    return LoaderVendor
-  except ModuleNotFoundError:
-    return LoaderStandalone
diff --git a/python/perfetto/trace_processor/platform.py b/python/perfetto/trace_processor/platform.py
new file mode 100644
index 0000000..69b6b26
--- /dev/null
+++ b/python/perfetto/trace_processor/platform.py
@@ -0,0 +1,91 @@
+#!/usr/bin/env python3
+# Copyright (C) 2022 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import socket
+import subprocess
+import tempfile
+from typing import Optional, Tuple
+from urllib import request
+
+# Limit parsing file to 32MB to maintain parity with the UI
+MAX_BYTES_LOADED = 32 * 1024 * 1024
+
+# URL to download script to run trace_processor
+SHELL_URL = 'http://get.perfetto.dev/trace_processor'
+
+
+class PlatformDelegate:
+  """Abstracts operations which can vary based on platform."""
+
+  def get_resource(self, file: str) -> bytes:
+    """Returns the contents of |file|, looked up next to this module."""
+    ws = os.path.dirname(__file__)
+    with open(os.path.join(ws, file), 'rb') as x:
+      return x.read()
+
+  # TODO(lalitm): when we add trace resolving in future CL, remove this
+  # function.
+  def parse_file(self, tp_http, file_path: str):
+    """Feeds |file_path| to |tp_http| in chunks, then signals EOF.
+
+    Returns |tp_http|.
+    """
+    with open(file_path, 'rb') as f:
+      # Read until EOF rather than trusting a size sampled up front: if the
+      # file shrinks while being read, a size-based loop would never finish.
+      while True:
+        chunk = f.read(MAX_BYTES_LOADED)
+        if not chunk:
+          break
+        tp_http.parse(chunk)
+    tp_http.notify_eof()
+    return tp_http
+
+  def get_shell_path(self, bin_path: Optional[str]) -> str:
+    """Returns the path of the trace processor shell to execute.
+
+    If |bin_path| is given it must point to an existing file, otherwise an
+    Exception is raised. If it is None, the content served at SHELL_URL is
+    downloaded into a temporary file which is then made executable.
+    """
+    if bin_path is not None:
+      if not os.path.isfile(bin_path):
+        raise Exception('Path to binary is not valid')
+      return bin_path
+
+    # delete=False: the file has to outlive this function so it can be run.
+    with tempfile.NamedTemporaryFile(delete=False) as file:
+      req = request.Request(SHELL_URL)
+      with request.urlopen(req) as resp:
+        file.write(resp.read())
+    subprocess.check_output(['chmod', '+x', file.name])
+    return file.name
+
+  def get_bind_addr(self, port: int) -> Tuple[str, int]:
+    """Returns the (host, port) pair the shell should be started on.
+
+    A non-zero |port| is returned as is; for 0 a free port is looked up.
+    """
+    if port:
+      return 'localhost', port
+
+    # Binding to port 0 asks the OS for an unused port. The socket is closed
+    # again (also on error) so that the port can be bound by the shell.
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as free_socket:
+      free_socket.bind(('', 0))
+      free_socket.listen(5)
+      port = free_socket.getsockname()[1]
+    return 'localhost', port
diff --git a/python/perfetto/trace_processor/protos.py b/python/perfetto/trace_processor/protos.py
index 0053fdb..3a844ea 100644
--- a/python/perfetto/trace_processor/protos.py
+++ b/python/perfetto/trace_processor/protos.py
@@ -16,23 +16,27 @@
 from google.protobuf import message_factory
 from google.protobuf.descriptor_pool import DescriptorPool
 
+from perfetto.trace_processor.platform import PlatformDelegate
+
 
 class ProtoFactory:
 
-  def __init__(self, tp_descriptor: bytes, metrics_descriptor: bytes):
+  def __init__(self, platform_delegate: PlatformDelegate):
     # Declare descriptor pool
     self.descriptor_pool = DescriptorPool()
 
     # Load trace processor descriptor and add to descriptor pool
+    tp_desc = platform_delegate.get_resource('trace_processor.descriptor')
     tp_file_desc_set_pb2 = descriptor_pb2.FileDescriptorSet()
-    tp_file_desc_set_pb2.MergeFromString(tp_descriptor)
+    tp_file_desc_set_pb2.MergeFromString(tp_desc)
 
     for f_desc_pb2 in tp_file_desc_set_pb2.file:
       self.descriptor_pool.Add(f_desc_pb2)
 
     # Load metrics descriptor and add to descriptor pool
+    metrics_desc = platform_delegate.get_resource('metrics.descriptor')
     metrics_file_desc_set_pb2 = descriptor_pb2.FileDescriptorSet()
-    metrics_file_desc_set_pb2.MergeFromString(metrics_descriptor)
+    metrics_file_desc_set_pb2.MergeFromString(metrics_desc)
 
     for f_desc_pb2 in metrics_file_desc_set_pb2.file:
       self.descriptor_pool.Add(f_desc_pb2)
diff --git a/python/perfetto/trace_processor/shell.py b/python/perfetto/trace_processor/shell.py
index 8daa956..b0e3950 100644
--- a/python/perfetto/trace_processor/shell.py
+++ b/python/perfetto/trace_processor/shell.py
@@ -13,18 +13,25 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import os
 import subprocess
 import time
 from urllib import request, error
 
-from perfetto.trace_processor.loader import get_loader
+from perfetto.trace_processor.platform import PlatformDelegate
+
+# Default port that trace_processor_shell runs on
+TP_PORT = 9001
 
 
-def load_shell(bin_path, unique_port, verbose):
-  shell_path = get_loader().get_shell_path(bin_path=bin_path)
-  port, url = get_loader().get_free_port(unique_port=unique_port)
-  p = subprocess.Popen([shell_path, '-D', '--http-port', port],
+def load_shell(bin_path: str, unique_port: bool, verbose: bool,
+               platform_delegate: PlatformDelegate):
+  addr, port = platform_delegate.get_bind_addr(
+      port=0 if unique_port else TP_PORT)
+  url = f'{addr}:{str(port)}'
+
+  shell_path = platform_delegate.get_shell_path(bin_path=bin_path)
+  p = subprocess.Popen([shell_path, '-D', '--http-port',
+                        str(port)],
                        stdout=subprocess.DEVNULL,
                        stderr=None if verbose else subprocess.DEVNULL)
 
@@ -38,7 +45,7 @@
         raise Exception(
             "Trace processor failed to start, please file a bug at https://goto.google.com/perfetto-bug"
         )
-      req = request.urlretrieve(f'http://{url}/status')
+      _ = request.urlretrieve(f'http://{url}/status')
       time.sleep(1)
       break
     except error.URLError:
diff --git a/python/test/api_unittest.py b/python/test/api_unittest.py
index 732438a..f742916 100755
--- a/python/test/api_unittest.py
+++ b/python/test/api_unittest.py
@@ -17,13 +17,10 @@
 
 from perfetto.trace_processor.api import TraceProcessor
 from perfetto.trace_processor.api import TraceProcessorException
-from perfetto.trace_processor.api import TraceProcessorConfig
+from perfetto.trace_processor.api import PLATFORM_DELEGATE
 from perfetto.trace_processor.protos import ProtoFactory
 
-TP_CONFIG = TraceProcessorConfig()
-PROTO_FACTORY = ProtoFactory(
-    tp_descriptor=TP_CONFIG.read_tp_descriptor(),
-    metrics_descriptor=TP_CONFIG.read_metrics_descriptor())
+PROTO_FACTORY = ProtoFactory(PLATFORM_DELEGATE())
 
 
 class TestQueryResultIterator(unittest.TestCase):
diff --git a/src/base/file_utils.cc b/src/base/file_utils.cc
index e3139bf..09782fe 100644
--- a/src/base/file_utils.cc
+++ b/src/base/file_utils.cc
@@ -49,7 +49,7 @@
 // Wrap FindClose to: (1) make the return unix-style; (2) deal with stdcall.
 int CloseFindHandle(HANDLE h) {
   return FindClose(h) ? 0 : -1;
-};
+}
 #endif
 
 }  // namespace
diff --git a/src/ipc/host_impl.cc b/src/ipc/host_impl.cc
index fa85c2b..4141f28 100644
--- a/src/ipc/host_impl.cc
+++ b/src/ipc/host_impl.cc
@@ -65,6 +65,16 @@
 #endif
 }
 
+pid_t GetLinuxPeerPid(base::UnixSocket* sock) {
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
+    PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+  return sock->peer_pid_linux();  // Only compiled on Linux and Android.
+#else
+  base::ignore_result(sock);  // |sock| is otherwise unused on this branch.
+  return base::kInvalidPid;  // Unsupported.
+#endif
+}
+
 }  // namespace
 
 // static
@@ -240,7 +250,8 @@
 
   auto peer_uid = GetPosixPeerUid(client->sock.get());
   auto scoped_key = g_crash_key_uid.SetScoped(static_cast<int64_t>(peer_uid));
-  service->client_info_ = ClientInfo(client->id, peer_uid);
+  service->client_info_ =
+      ClientInfo(client->id, peer_uid, GetLinuxPeerPid(client->sock.get()));
   service->received_fd_ = &client->received_fd;
   method.invoker(service, *decoded_req_args, std::move(deferred_reply));
   service->received_fd_ = nullptr;
@@ -307,7 +318,8 @@
     return;
   ClientID client_id = it->second->id;
 
-  ClientInfo client_info(client_id, GetPosixPeerUid(sock));
+  ClientInfo client_info(client_id, GetPosixPeerUid(sock),
+                         GetLinuxPeerPid(sock));
   clients_by_socket_.erase(it);
   PERFETTO_DCHECK(clients_.count(client_id));
   clients_.erase(client_id);
diff --git a/src/ipc/host_impl_unittest.cc b/src/ipc/host_impl_unittest.cc
index a04ff22..e1bbee7 100644
--- a/src/ipc/host_impl_unittest.cc
+++ b/src/ipc/host_impl_unittest.cc
@@ -478,6 +478,39 @@
   task_runner_->RunUntilCheckpoint("on_reply_received");
 }
 
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
+    PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+// Checks that the service sees this process' uid and pid in its ClientInfo.
+TEST_F(HostImplTest, ServiceClientInfo) {
+  FakeService* fake_service = new FakeService("FakeService");
+  ASSERT_TRUE(host_->ExposeService(std::unique_ptr<Service>(fake_service)));
+  auto on_bind = task_runner_->CreateCheckpoint("on_bind");
+  cli_->BindService("FakeService");
+  EXPECT_CALL(*cli_, OnServiceBound(_)).WillOnce(InvokeWithoutArgs(on_bind));
+  task_runner_->RunUntilCheckpoint("on_bind");
+
+  RequestProto req_args;
+  req_args.set_data("foo");
+  cli_->InvokeMethod(cli_->last_bound_service_id_, 1, req_args);
+  EXPECT_CALL(*fake_service, OnFakeMethod1(_, _))
+      .WillOnce(
+          Invoke([fake_service](const RequestProto& req, DeferredBase* reply) {
+            ASSERT_EQ("foo", req.data());
+            std::unique_ptr<ReplyProto> reply_args(new ReplyProto());
+            reply_args->set_data("bar");
+            reply->Resolve(AsyncResult<ProtoMessage>(
+                std::unique_ptr<ProtoMessage>(reply_args.release())));
+            // Verifies the pid() and uid() values in ClientInfo.
+            const auto& client_info = fake_service->client_info();
+            ASSERT_EQ(client_info.uid(), getuid());
+            ASSERT_EQ(client_info.pid(), getpid());
+          }));
+
+  EXPECT_CALL(*cli_, OnInvokeMethodReply(_)).WillOnce(Return());
+  task_runner_->RunUntilIdle();
+}
+#endif  // PERFETTO_OS_LINUX || PERFETTO_OS_ANDROID
+
 // TODO(primiano): add the tests below in next CLs.
 // TEST(HostImplTest, ManyClients) {}
 // TEST(HostImplTest, OverlappingRequstsOutOfOrder) {}
diff --git a/src/traced/service/builtin_producer.cc b/src/traced/service/builtin_producer.cc
index 4bd945b..20d9849 100644
--- a/src/traced/service/builtin_producer.cc
+++ b/src/traced/service/builtin_producer.cc
@@ -20,6 +20,7 @@
 
 #include "perfetto/base/build_config.h"
 #include "perfetto/base/logging.h"
+#include "perfetto/base/proc_utils.h"
 #include "perfetto/ext/base/metatrace.h"
 #include "perfetto/ext/base/utils.h"
 #include "perfetto/ext/base/weak_ptr.h"
@@ -67,7 +68,7 @@
 
 void BuiltinProducer::ConnectInProcess(TracingService* svc) {
   endpoint_ = svc->ConnectProducer(
-      this, base::GetCurrentUserId(), "traced",
+      this, base::GetCurrentUserId(), base::GetProcessId(), "traced",
       /*shared_memory_size_hint_bytes=*/16 * 1024, /*in_process=*/true,
       TracingService::ProducerSMBScrapingMode::kDisabled,
       /*shared_memory_page_size_hint_bytes=*/4096);
diff --git a/src/tracing/core/packet_stream_validator.cc b/src/tracing/core/packet_stream_validator.cc
index 50d1db3..d84eed3 100644
--- a/src/tracing/core/packet_stream_validator.cc
+++ b/src/tracing/core/packet_stream_validator.cc
@@ -39,6 +39,7 @@
     protos::pbzero::TracePacket::kTraceStatsFieldNumber,
     protos::pbzero::TracePacket::kCompressedPacketsFieldNumber,
     protos::pbzero::TracePacket::kSynchronizationMarkerFieldNumber,
+    protos::pbzero::TracePacket::kTrustedPidFieldNumber,
 };
 
 // This translation unit is quite subtle and perf-sensitive. Remember to check
diff --git a/src/tracing/core/packet_stream_validator_unittest.cc b/src/tracing/core/packet_stream_validator_unittest.cc
index fab2a60..74de04c 100644
--- a/src/tracing/core/packet_stream_validator_unittest.cc
+++ b/src/tracing/core/packet_stream_validator_unittest.cc
@@ -151,6 +151,76 @@
   }
 }
 
+// Validation is expected to fail for a packet that only sets trusted_pid.
+TEST(PacketStreamValidatorTest, SimplePacketWithPid) {
+  protos::gen::TracePacket packet;
+  packet.set_trusted_pid(123);
+  std::string serialized = packet.SerializeAsString();
+  Slices slices;
+  slices.emplace_back(&serialized[0], serialized.size());
+  EXPECT_FALSE(PacketStreamValidator::Validate(slices));
+}
+
+// Validation is expected to fail even when trusted_pid is 0.
+TEST(PacketStreamValidatorTest, SimplePacketWithZeroPid) {
+  protos::gen::TracePacket packet;
+  packet.set_trusted_pid(0);
+  std::string serialized = packet.SerializeAsString();
+  Slices slices;
+  slices.emplace_back(&serialized[0], serialized.size());
+  EXPECT_FALSE(PacketStreamValidator::Validate(slices));
+}
+
+// Validation is expected to fail when trusted_pid is -1.
+TEST(PacketStreamValidatorTest, SimplePacketWithNegativeOnePid) {
+  protos::gen::TracePacket packet;
+  packet.set_trusted_pid(-1);
+  std::string serialized = packet.SerializeAsString();
+  Slices slices;
+  slices.emplace_back(&serialized[0], serialized.size());
+  EXPECT_FALSE(PacketStreamValidator::Validate(slices));
+}
+
+// Validation is expected to fail when trusted_pid accompanies other payload.
+TEST(PacketStreamValidatorTest, ComplexPacketWithPid) {
+  protos::gen::TracePacket packet;
+  packet.mutable_for_testing()->set_str("string field");
+  packet.mutable_ftrace_events()->set_cpu(0);
+  auto* event = packet.mutable_ftrace_events()->add_event();
+  event->set_pid(42);
+  event->mutable_sched_switch()->set_prev_comm("tom");
+  event->mutable_sched_switch()->set_prev_pid(123);
+  event->mutable_sched_switch()->set_next_comm("jerry");
+  event->mutable_sched_switch()->set_next_pid(456);
+  packet.set_trusted_pid(123);
+  std::string serialized = packet.SerializeAsString();
+  Slices slices;
+  slices.emplace_back(&serialized[0], serialized.size());
+  EXPECT_FALSE(PacketStreamValidator::Validate(slices));
+}
+
+// Validation is expected to fail for every two-slice split of the packet.
+TEST(PacketStreamValidatorTest, FragmentedPacketWithPid) {
+  protos::gen::TracePacket packet;
+  packet.mutable_for_testing()->set_str("string field");
+  packet.set_trusted_pid(123);
+  packet.mutable_ftrace_events()->set_cpu(0);
+  auto* event = packet.mutable_ftrace_events()->add_event();
+  event->set_pid(42);
+  event->mutable_sched_switch()->set_prev_comm("tom");
+  event->mutable_sched_switch()->set_prev_pid(123);
+  event->mutable_sched_switch()->set_next_comm("jerry");
+  event->mutable_sched_switch()->set_next_pid(456);
+  packet.mutable_for_testing()->set_str("foo");
+  std::string serialized = packet.SerializeAsString();
+  for (size_t split = 0; split < serialized.size(); ++split) {
+    Slices slices;
+    slices.emplace_back(&serialized[0], split);
+    slices.emplace_back(&serialized[split], serialized.size() - split);
+    EXPECT_FALSE(PacketStreamValidator::Validate(slices));
+  }
+}
+
 TEST(PacketStreamValidatorTest, TruncatedPacket) {
   protos::gen::TracePacket proto;
   proto.mutable_for_testing()->set_str("string field");
diff --git a/src/tracing/core/trace_buffer.cc b/src/tracing/core/trace_buffer.cc
index acdcaa9..41633aa 100644
--- a/src/tracing/core/trace_buffer.cc
+++ b/src/tracing/core/trace_buffer.cc
@@ -88,6 +88,7 @@
 // while we execute here. Don't do any processing on it other than memcpy().
 void TraceBuffer::CopyChunkUntrusted(ProducerID producer_id_trusted,
                                      uid_t producer_uid_trusted,
+                                     pid_t producer_pid_trusted,
                                      WriterID writer_id,
                                      ChunkID chunk_id,
                                      uint16_t num_fragments,
@@ -258,7 +259,7 @@
   stats_.set_bytes_written(stats_.bytes_written() + record_size);
   auto it_and_inserted = index_.emplace(
       key, ChunkMeta(GetChunkRecordAt(wptr_), num_fragments, chunk_complete,
-                     chunk_flags, producer_uid_trusted));
+                     chunk_flags, producer_uid_trusted, producer_pid_trusted));
   PERFETTO_DCHECK(it_and_inserted.second);
   TRACE_BUFFER_DLOG("  copying @ [%lu - %lu] %zu", wptr_ - begin(),
                     uintptr_t(wptr_ - begin()) + record_size, record_size);
@@ -535,7 +536,7 @@
   TRACE_BUFFER_DLOG("ReadNextTracePacket()");
 
   // Just in case we forget to initialize these below.
-  *sequence_properties = {0, kInvalidUid, 0};
+  *sequence_properties = {0, kInvalidUid, base::kInvalidPid, 0};
   *previous_packet_on_sequence_dropped = false;
 
   // At the start of each sequence iteration, we consider the last read packet
@@ -575,6 +576,7 @@
     const ProducerID trusted_producer_id = read_iter_.producer_id();
     const WriterID writer_id = read_iter_.writer_id();
     const uid_t trusted_uid = chunk_meta->trusted_uid;
+    const pid_t trusted_pid = chunk_meta->trusted_pid;
 
     // At this point we have a chunk in |chunk_meta| that has not been fully
     // read. We don't know yet whether we have enough data to read the full
@@ -648,7 +650,8 @@
         ReadPacketResult result = ReadNextPacketInChunk(chunk_meta, packet);
 
         if (PERFETTO_LIKELY(result == ReadPacketResult::kSucceeded)) {
-          *sequence_properties = {trusted_producer_id, trusted_uid, writer_id};
+          *sequence_properties = {trusted_producer_id, trusted_uid, trusted_pid,
+                                  writer_id};
           *previous_packet_on_sequence_dropped = previous_packet_dropped;
           return true;
         } else if (result == ReadPacketResult::kFailedEmptyPacket) {
@@ -673,7 +676,8 @@
       ReadAheadResult ra_res = ReadAhead(packet);
       if (ra_res == ReadAheadResult::kSucceededReturnSlices) {
         stats_.set_readaheads_succeeded(stats_.readaheads_succeeded() + 1);
-        *sequence_properties = {trusted_producer_id, trusted_uid, writer_id};
+        *sequence_properties = {trusted_producer_id, trusted_uid, trusted_pid,
+                                writer_id};
         *previous_packet_on_sequence_dropped = previous_packet_dropped;
         return true;
       }
diff --git a/src/tracing/core/trace_buffer.h b/src/tracing/core/trace_buffer.h
index 5196623..415c769 100644
--- a/src/tracing/core/trace_buffer.h
+++ b/src/tracing/core/trace_buffer.h
@@ -158,6 +158,7 @@
   struct PacketSequenceProperties {
     ProducerID producer_id_trusted;
     uid_t producer_uid_trusted;
+    pid_t producer_pid_trusted;
     WriterID writer_id;
   };
 
@@ -190,6 +191,7 @@
   // TODO(eseckler): Pass in a PacketStreamProperties instead of individual IDs.
   void CopyChunkUntrusted(ProducerID producer_id_trusted,
                           uid_t producer_uid_trusted,
+                          pid_t producer_pid_trusted,
                           WriterID writer_id,
                           ChunkID chunk_id,
                           uint16_t num_fragments,
@@ -377,8 +379,17 @@
       kLastReadPacketSkipped = 1 << 1
     };
 
-    ChunkMeta(ChunkRecord* r, uint16_t p, bool complete, uint8_t f, uid_t u)
-        : chunk_record{r}, trusted_uid{u}, flags{f}, num_fragments{p} {
+    ChunkMeta(ChunkRecord* r,
+              uint16_t p,
+              bool complete,
+              uint8_t f,
+              uid_t u,
+              pid_t pid)
+        : chunk_record{r},
+          trusted_uid{u},
+          trusted_pid(pid),
+          flags{f},
+          num_fragments{p} {
       if (complete)
         index_flags = kComplete;
     }
@@ -407,6 +418,7 @@
 
     ChunkRecord* const chunk_record;  // Addr of ChunkRecord within |data_|.
     const uid_t trusted_uid;          // uid of the producer.
+    const pid_t trusted_pid;          // pid of the producer.
 
     // Flags set by TraceBuffer to track the state of the chunk in the index.
     uint8_t index_flags = 0;
diff --git a/src/tracing/core/trace_buffer_unittest.cc b/src/tracing/core/trace_buffer_unittest.cc
index a7bf27a..dbba1e9 100644
--- a/src/tracing/core/trace_buffer_unittest.cc
+++ b/src/tracing/core/trace_buffer_unittest.cc
@@ -985,8 +985,9 @@
 
   uint8_t valid_ptr = 0;
   trace_buffer()->CopyChunkUntrusted(
-      ProducerID(1), uid_t(0), WriterID(1), ChunkID(1), 1 /* num packets */,
-      0 /* flags */, true /* chunk_complete */, &valid_ptr, sizeof(valid_ptr));
+      ProducerID(1), uid_t(0), pid_t(0), WriterID(1), ChunkID(1),
+      1 /* num packets */, 0 /* flags */, true /* chunk_complete */, &valid_ptr,
+      sizeof(valid_ptr));
 
   CreateChunk(ProducerID(1), WriterID(1), ChunkID(2))
       .AddPacket(32, 'b')
@@ -1090,8 +1091,9 @@
   chunk.insert(chunk.end(), 128 - sizeof(ChunkRecord), 0xff);
   chunk.back() = 0x7f;
   trace_buffer()->CopyChunkUntrusted(
-      ProducerID(4), uid_t(0), WriterID(1), ChunkID(1), 1 /* num packets */,
-      0 /* flags*/, true /* chunk_complete */, chunk.data(), chunk.size());
+      ProducerID(4), uid_t(0), pid_t(0), WriterID(1), ChunkID(1),
+      1 /* num packets */, 0 /* flags*/, true /* chunk_complete */,
+      chunk.data(), chunk.size());
 
   // Add a valid chunk.
   CreateChunk(ProducerID(1), WriterID(1), ChunkID(1))
@@ -1115,8 +1117,9 @@
   chunk.back() = 0x7f;
   for (int i = 0; i < 3; i++) {
     trace_buffer()->CopyChunkUntrusted(
-        ProducerID(1), uid_t(0), WriterID(1), ChunkID(1), 1 /* num packets */,
-        0 /* flags */, true /* chunk_complete */, chunk.data(), chunk.size());
+        ProducerID(1), uid_t(0), pid_t(0), WriterID(1), ChunkID(1),
+        1 /* num packets */, 0 /* flags */, true /* chunk_complete */,
+        chunk.data(), chunk.size());
   }
 
   trace_buffer()->BeginRead();
diff --git a/src/tracing/core/tracing_service_impl.cc b/src/tracing/core/tracing_service_impl.cc
index b898a1a..9117a8a 100644
--- a/src/tracing/core/tracing_service_impl.cc
+++ b/src/tracing/core/tracing_service_impl.cc
@@ -363,6 +363,7 @@
 std::unique_ptr<TracingService::ProducerEndpoint>
 TracingServiceImpl::ConnectProducer(Producer* producer,
                                     uid_t uid,
+                                    pid_t pid,
                                     const std::string& producer_name,
                                     size_t shared_memory_size_hint_bytes,
                                     bool in_process,
@@ -398,7 +399,7 @@
   }
 
   std::unique_ptr<ProducerEndpointImpl> endpoint(new ProducerEndpointImpl(
-      id, uid, this, task_runner_, producer, producer_name, sdk_version,
+      id, uid, pid, this, task_runner_, producer, producer_name, sdk_version,
       in_process, smb_scraping_enabled));
   auto it_and_inserted = producers_.emplace(id, endpoint.get());
   PERFETTO_DCHECK(it_and_inserted.second);
@@ -1848,9 +1849,9 @@
           chunk.header()->chunk_id.load(std::memory_order_relaxed);
 
       CopyProducerPageIntoLogBuffer(
-          producer->id_, producer->uid_, writer_id, chunk_id, *target_buffer_id,
-          packet_count, flags, chunk_complete, chunk.payload_begin(),
-          chunk.payload_size());
+          producer->id_, producer->uid_, producer->pid_, writer_id, chunk_id,
+          *target_buffer_id, packet_count, flags, chunk_complete,
+          chunk.payload_begin(), chunk.payload_size());
     }
   }
 }
@@ -2133,6 +2134,9 @@
       PERFETTO_DCHECK(sequence_properties.producer_id_trusted != 0);
       PERFETTO_DCHECK(sequence_properties.writer_id != 0);
       PERFETTO_DCHECK(sequence_properties.producer_uid_trusted != kInvalidUid);
+      // Not checking sequence_properties.producer_pid_trusted: it is
+      // base::kInvalidPid if the platform doesn't support it.
+
       PERFETTO_DCHECK(packet.size() > 0);
       if (!PacketStreamValidator::Validate(packet.slices())) {
         tracing_session->invalid_packets++;
@@ -2157,6 +2161,11 @@
           tracing_session->GetPacketSequenceID(
               sequence_properties.producer_id_trusted,
               sequence_properties.writer_id));
+      if (sequence_properties.producer_pid_trusted != base::kInvalidPid) {
+        // Not supported on all platforms.
+        trusted_packet->set_trusted_pid(
+            static_cast<int32_t>(sequence_properties.producer_pid_trusted));
+      }
       if (previous_packet_dropped)
         trusted_packet->set_previous_packet_dropped(previous_packet_dropped);
       slice.size = trusted_packet.Finalize();
@@ -2695,6 +2704,7 @@
 void TracingServiceImpl::CopyProducerPageIntoLogBuffer(
     ProducerID producer_id_trusted,
     uid_t producer_uid_trusted,
+    pid_t producer_pid_trusted,
     WriterID writer_id,
     ChunkID chunk_id,
     BufferID buffer_id,
@@ -2749,9 +2759,10 @@
     return;
   }
 
-  buf->CopyChunkUntrusted(producer_id_trusted, producer_uid_trusted, writer_id,
-                          chunk_id, num_fragments, chunk_flags, chunk_complete,
-                          src, size);
+  buf->CopyChunkUntrusted(producer_id_trusted, producer_uid_trusted,
+                          producer_pid_trusted, writer_id, chunk_id,
+                          num_fragments, chunk_flags, chunk_complete, src,
+                          size);
 }
 
 void TracingServiceImpl::ApplyChunkPatches(
@@ -3701,6 +3712,7 @@
 TracingServiceImpl::ProducerEndpointImpl::ProducerEndpointImpl(
     ProducerID id,
     uid_t uid,
+    pid_t pid,
     TracingServiceImpl* service,
     base::TaskRunner* task_runner,
     Producer* producer,
@@ -3710,6 +3722,7 @@
     bool smb_scraping_enabled)
     : id_(id),
       uid_(uid),
+      pid_(pid),
       service_(service),
       task_runner_(task_runner),
       producer_(producer),
@@ -3799,7 +3812,8 @@
     uint8_t chunk_flags = packets.flags;
 
     service_->CopyProducerPageIntoLogBuffer(
-        id_, uid_, writer_id, chunk_id, buffer_id, num_fragments, chunk_flags,
+        id_, uid_, pid_, writer_id, chunk_id, buffer_id, num_fragments,
+        chunk_flags,
         /*chunk_complete=*/true, chunk.payload_begin(), chunk.payload_size());
 
     // This one has release-store semantics.
diff --git a/src/tracing/core/tracing_service_impl.h b/src/tracing/core/tracing_service_impl.h
index 8100968..5c4e650 100644
--- a/src/tracing/core/tracing_service_impl.h
+++ b/src/tracing/core/tracing_service_impl.h
@@ -86,6 +86,7 @@
    public:
     ProducerEndpointImpl(ProducerID,
                          uid_t uid,
+                         pid_t pid,
                          TracingServiceImpl*,
                          base::TaskRunner*,
                          Producer*,
@@ -149,6 +150,7 @@
 
     ProducerID const id_;
     const uid_t uid_;
+    const pid_t pid_;
     TracingServiceImpl* const service_;
     base::TaskRunner* const task_runner_;
     Producer* producer_;
@@ -255,6 +257,7 @@
   void UnregisterDataSource(ProducerID, const std::string& name);
   void CopyProducerPageIntoLogBuffer(ProducerID,
                                      uid_t,
+                                     pid_t,
                                      WriterID,
                                      ChunkID,
                                      BufferID,
@@ -315,6 +318,7 @@
   std::unique_ptr<TracingService::ProducerEndpoint> ConnectProducer(
       Producer*,
       uid_t uid,
+      pid_t pid,
       const std::string& producer_name,
       size_t shared_memory_size_hint_bytes = 0,
       bool in_process = false,
diff --git a/src/tracing/core/tracing_service_impl_unittest.cc b/src/tracing/core/tracing_service_impl_unittest.cc
index 6ee5c62..41ef5d8 100644
--- a/src/tracing/core/tracing_service_impl_unittest.cc
+++ b/src/tracing/core/tracing_service_impl_unittest.cc
@@ -1416,9 +1416,9 @@
   producer->WaitForDataSourceStart("data_source");
 
   std::unique_ptr<MockProducer> producer_otheruid = CreateMockProducer();
-  auto x =
-      svc->ConnectProducer(producer_otheruid.get(),
-                           base::GetCurrentUserId() + 1, "mock_producer_ouid");
+  auto x = svc->ConnectProducer(producer_otheruid.get(),
+                                base::GetCurrentUserId() + 1,
+                                base::GetProcessId(), "mock_producer_ouid");
   EXPECT_CALL(*producer_otheruid, OnConnect()).Times(0);
   task_runner.RunUntilIdle();
   Mock::VerifyAndClearExpectations(producer_otheruid.get());
@@ -1786,7 +1786,7 @@
     auto name = "mock_producer_" + std::to_string(i);
     producer[i] = CreateMockProducer();
     producer[i]->Connect(svc.get(), name, base::GetCurrentUserId(),
-                         kSizes[i].hint_size_kb * 1024,
+                         base::GetProcessId(), kSizes[i].hint_size_kb * 1024,
                          kSizes[i].hint_page_size_kb * 1024);
     producer[i]->RegisterDataSource("data_source");
   }
@@ -2427,11 +2427,13 @@
   consumer->Connect(svc.get());
 
   std::unique_ptr<MockProducer> producer1 = CreateMockProducer();
-  producer1->Connect(svc.get(), "mock_producer1", 123u /* uid */);
+  producer1->Connect(svc.get(), "mock_producer1", 123u /* uid */,
+                     1001 /* pid */);
   producer1->RegisterDataSource("data_source");
 
   std::unique_ptr<MockProducer> producer2 = CreateMockProducer();
-  producer2->Connect(svc.get(), "mock_producer2", 456u /* uid */);
+  producer2->Connect(svc.get(), "mock_producer2", 456u /* uid */,
+                     2002 /* pid */);
   producer2->RegisterDataSource("data_source");
 
   TraceConfig trace_config;
@@ -2482,6 +2484,7 @@
           Property(&protos::gen::TracePacket::for_testing,
                    Property(&protos::gen::TestEvent::str, Eq("payload1a1"))),
           Property(&protos::gen::TracePacket::trusted_uid, Eq(123)),
+          Property(&protos::gen::TracePacket::trusted_pid, Eq(1001)),
           Property(&protos::gen::TracePacket::trusted_packet_sequence_id,
                    Eq(2u)))));
   EXPECT_THAT(
@@ -2490,6 +2493,7 @@
           Property(&protos::gen::TracePacket::for_testing,
                    Property(&protos::gen::TestEvent::str, Eq("payload1a2"))),
           Property(&protos::gen::TracePacket::trusted_uid, Eq(123)),
+          Property(&protos::gen::TracePacket::trusted_pid, Eq(1001)),
           Property(&protos::gen::TracePacket::trusted_packet_sequence_id,
                    Eq(2u)))));
   EXPECT_THAT(
@@ -2498,6 +2502,7 @@
           Property(&protos::gen::TracePacket::for_testing,
                    Property(&protos::gen::TestEvent::str, Eq("payload1b1"))),
           Property(&protos::gen::TracePacket::trusted_uid, Eq(123)),
+          Property(&protos::gen::TracePacket::trusted_pid, Eq(1001)),
           Property(&protos::gen::TracePacket::trusted_packet_sequence_id,
                    Eq(3u)))));
   EXPECT_THAT(
@@ -2506,6 +2511,7 @@
           Property(&protos::gen::TracePacket::for_testing,
                    Property(&protos::gen::TestEvent::str, Eq("payload1b2"))),
           Property(&protos::gen::TracePacket::trusted_uid, Eq(123)),
+          Property(&protos::gen::TracePacket::trusted_pid, Eq(1001)),
           Property(&protos::gen::TracePacket::trusted_packet_sequence_id,
                    Eq(3u)))));
   EXPECT_THAT(
@@ -2514,6 +2520,7 @@
           Property(&protos::gen::TracePacket::for_testing,
                    Property(&protos::gen::TestEvent::str, Eq("payload2a1"))),
           Property(&protos::gen::TracePacket::trusted_uid, Eq(456)),
+          Property(&protos::gen::TracePacket::trusted_pid, Eq(2002)),
           Property(&protos::gen::TracePacket::trusted_packet_sequence_id,
                    Eq(4u)))));
 }
@@ -3728,7 +3735,7 @@
   SharedMemory* shm_raw = shm.get();
 
   // Service should adopt the SMB provided by the producer.
-  producer->Connect(svc.get(), "mock_producer", /*uid=*/42,
+  producer->Connect(svc.get(), "mock_producer", /*uid=*/42, /*pid=*/1025,
                     /*shared_memory_size_hint_bytes=*/0, kShmPageSizeBytes,
                     std::move(shm));
   EXPECT_TRUE(producer->endpoint()->IsShmemProvidedByProducer());
@@ -3783,7 +3790,7 @@
 
   // Service should not adopt the SMB provided by the producer, because the SMB
   // size isn't a multiple of the page size.
-  producer->Connect(svc.get(), "mock_producer", /*uid=*/42,
+  producer->Connect(svc.get(), "mock_producer", /*uid=*/42, /*pid=*/1025,
                     /*shared_memory_size_hint_bytes=*/0, kShmPageSizeBytes,
                     std::move(shm));
   EXPECT_FALSE(producer->endpoint()->IsShmemProvidedByProducer());
diff --git a/src/tracing/internal/in_process_tracing_backend.cc b/src/tracing/internal/in_process_tracing_backend.cc
index d71a62f..ee1283d 100644
--- a/src/tracing/internal/in_process_tracing_backend.cc
+++ b/src/tracing/internal/in_process_tracing_backend.cc
@@ -86,7 +86,7 @@
     const ConnectProducerArgs& args) {
   PERFETTO_DCHECK(args.task_runner->RunsTasksOnCurrentThread());
   return GetOrCreateService(args.task_runner)
-      ->ConnectProducer(args.producer, /*uid=*/0, args.producer_name,
+      ->ConnectProducer(args.producer, /*uid=*/0, /*pid=*/0, args.producer_name,
                         args.shmem_size_hint_bytes,
                         /*in_process=*/true,
                         TracingService::ProducerSMBScrapingMode::kEnabled,
diff --git a/src/tracing/ipc/service/producer_ipc_service.cc b/src/tracing/ipc/service/producer_ipc_service.cc
index e15badb..5f9a387 100644
--- a/src/tracing/ipc/service/producer_ipc_service.cc
+++ b/src/tracing/ipc/service/producer_ipc_service.cc
@@ -117,7 +117,7 @@
 
   // ConnectProducer will call OnConnect() on the next task.
   producer->service_endpoint = core_service_->ConnectProducer(
-      producer.get(), client_info.uid(), req.producer_name(),
+      producer.get(), client_info.uid(), client_info.pid(), req.producer_name(),
       req.shared_memory_size_hint_bytes(),
       /*in_process=*/false, smb_scraping_mode,
       req.shared_memory_page_size_hint_bytes(), std::move(shmem),
diff --git a/src/tracing/test/fake_packet.cc b/src/tracing/test/fake_packet.cc
index eef70df..d86d1b3 100644
--- a/src/tracing/test/fake_packet.cc
+++ b/src/tracing/test/fake_packet.cc
@@ -158,7 +158,7 @@
 }
 
 size_t FakeChunk::CopyIntoTraceBuffer(bool chunk_complete) {
-  trace_buffer_->CopyChunkUntrusted(producer_id, uid, writer_id, chunk_id,
+  trace_buffer_->CopyChunkUntrusted(producer_id, uid, pid, writer_id, chunk_id,
                                     num_packets, flags, chunk_complete,
                                     data.data(), data.size());
   return data.size() + TraceBuffer::InlineChunkHeaderSize;
diff --git a/src/tracing/test/fake_packet.h b/src/tracing/test/fake_packet.h
index 117df5c..76ad63e 100644
--- a/src/tracing/test/fake_packet.h
+++ b/src/tracing/test/fake_packet.h
@@ -24,6 +24,7 @@
 #include <string>
 #include <vector>
 
+#include "perfetto/ext/base/utils.h"
 #include "perfetto/ext/tracing/core/basic_types.h"
 
 namespace perfetto {
@@ -85,6 +86,7 @@
   uint8_t flags = 0;
   uint16_t num_packets = 0;
   uid_t uid = kInvalidUid;
+  pid_t pid = base::kInvalidPid;
   std::vector<uint8_t> data;
 };
 
diff --git a/src/tracing/test/mock_producer.cc b/src/tracing/test/mock_producer.cc
index 8f339d6..ff9f9b9 100644
--- a/src/tracing/test/mock_producer.cc
+++ b/src/tracing/test/mock_producer.cc
@@ -69,12 +69,13 @@
 void MockProducer::Connect(TracingService* svc,
                            const std::string& producer_name,
                            uid_t uid,
+                           pid_t pid,
                            size_t shared_memory_size_hint_bytes,
                            size_t shared_memory_page_size_hint_bytes,
                            std::unique_ptr<SharedMemory> shm) {
   producer_name_ = producer_name;
   service_endpoint_ = svc->ConnectProducer(
-      this, uid, producer_name, shared_memory_size_hint_bytes,
+      this, uid, pid, producer_name, shared_memory_size_hint_bytes,
       /*in_process=*/true, TracingService::ProducerSMBScrapingMode::kDefault,
       shared_memory_page_size_hint_bytes, std::move(shm));
   auto checkpoint_name = "on_producer_connect_" + producer_name;
diff --git a/src/tracing/test/mock_producer.h b/src/tracing/test/mock_producer.h
index fd8968c..59ff7c2 100644
--- a/src/tracing/test/mock_producer.h
+++ b/src/tracing/test/mock_producer.h
@@ -48,6 +48,7 @@
   void Connect(TracingService* svc,
                const std::string& producer_name,
                uid_t uid = 42,
+               pid_t pid = 1025,
                size_t shared_memory_size_hint_bytes = 0,
                size_t shared_memory_page_size_hint_bytes = 0,
                std::unique_ptr<SharedMemory> shm = nullptr);
diff --git a/tools/record_android_trace b/tools/record_android_trace
index ccd8473..db0cffe 100755
--- a/tools/record_android_trace
+++ b/tools/record_android_trace
@@ -219,8 +219,25 @@
   device_file = device_dir + fname
 
   cmd = [perfetto_cmd, '--background', '--txt', '-o', device_file]
+  on_device_config = None
+  on_host_config = None
   if args.config is not None:
     cmd += ['-c', '-']
+    if api_level < 24:
+      # adb shell does not redirect stdin. Push the config to a temporary file
+      # on the device.
+      mktmp = adb('shell', 'mktemp', '--tmpdir', '/data/local/tmp', stdout=subprocess.PIPE)
+      on_device_config = mktmp.communicate()[0].decode().strip().strip()
+      if mktmp.returncode != 0:
+        prt('Failed to create config on device', ANSI.RED)
+        sys.exit(1)
+      exit_code = adb('push', '--sync', args.config, on_device_config).wait()
+      if exit_code != 0:
+        prt('Failed to push config on device', ANSI.RED)
+        sys.exit(1)
+      cmd = ['cat', on_device_config, '|'] + cmd
+    else:
+      on_host_config = args.config
   else:
     cmd += ['-t', args.time, '-b', args.buffer]
     for app in args.app:
@@ -243,10 +260,12 @@
   if not os.path.exists(host_dir):
     shutil.os.makedirs(host_dir)
 
-  with open(args.config or os.devnull, 'rb') as f:
+  with open(on_host_config or os.devnull, 'rb') as f:
     print('Running ' + ' '.join(cmd))
     proc = adb('shell', *cmd, stdin=f, stdout=subprocess.PIPE)
     proc_out = proc.communicate()[0].decode().strip()
+    if on_device_config is not None:
+      adb('shell', 'rm', on_device_config).wait()
     # On older versions of Android (x86_64 emulator running API 22) the output
     # looks like:
     #   WARNING: linker: /data/local/tmp/tracebox: unused DT entry: ...
diff --git a/ui/release/channels.json b/ui/release/channels.json
index d329006..90350bb 100644
--- a/ui/release/channels.json
+++ b/ui/release/channels.json
@@ -6,7 +6,7 @@
     },
     {
       "name": "canary",
-      "rev": "4a40a87235f5715a909c0d082ca6a1b318c9b210"
+      "rev": "3ff938b95864694fcb3df4cd0741889dbe36edd8"
     },
     {
       "name": "autopush",