blob: 4b82971a1fbe031895dd9b932d0e8891fbf64db8 [file] [log] [blame] [edit]
#!/usr/bin/env python3
# Copyright (C) 2025 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import argparse
import fnmatch
import os
import re
import subprocess
import sys
import time
from typing import List, Optional, Tuple, Iterable
# The script can be executed as a git hook symlink. Using `git` to find the
# repo root is more resilient for worktree usage as a git hook.
try:
  # `--show-toplevel` prints the absolute path of the working-tree root.
  REPO_ROOT = subprocess.check_output(['git', 'rev-parse', '--show-toplevel'],
                                      text=True,
                                      stderr=subprocess.PIPE).strip()
except (subprocess.CalledProcessError, FileNotFoundError) as e:
  # Not inside a git checkout, or git is not installed. Nothing below can
  # work without a repo root, so bail out immediately.
  print(
      f"Error: Could not determine repository root via git. {e}",
      file=sys.stderr)
  sys.exit(1)

# True on any Windows Python build; used below to append '.exe' when
# locating the protoc binary.
IS_WIN = sys.platform.startswith('win')
def RunAndReportIfLong(func, *args, **kargs):
  """Runs `func(*args, **kargs)` and warns on stderr if it runs too long.

  Returns whatever `func` returns. The warning threshold is fixed at 3s.
  """
  limit = 3.0  # seconds
  # time.monotonic() is immune to wall-clock adjustments (NTP, DST), which
  # makes it the correct clock for measuring a duration; time.time() is not.
  start = time.monotonic()
  results = func(*args, **kargs)
  runtime = time.monotonic() - start
  if runtime > limit:
    print(
        f"{func.__name__} took >{limit:.2f}s ({runtime:.2f}s)",
        file=sys.stderr)
  return results
def run_command(args: List[str],
                check: bool = True,
                **kwargs) -> subprocess.CompletedProcess:
  """Runs a subprocess, capturing text output by default.

  If the caller supplies its own `stdout` or `stderr` (e.g. DEVNULL),
  capture_output is disabled for both streams so the two mechanisms never
  conflict (subprocess.run raises if both are given).

  Exits the whole program when the executable is missing. When `check` is
  True, a failing command raises subprocess.CalledProcessError to the
  caller (the original `except ... raise e` clause was redundant and reset
  the traceback; plain propagation preserves it).
  """
  use_capture_output = 'stdout' not in kwargs and 'stderr' not in kwargs
  try:
    if use_capture_output:
      # Default case: capture stdout/stderr as text.
      return subprocess.run(
          args, capture_output=True, text=True, check=check, **kwargs)
    # A stream was redirected explicitly; ensure text mode unless overridden
    # and drop any stray capture_output flag to avoid the conflict.
    kwargs.setdefault('text', True)
    kwargs.pop('capture_output', None)
    return subprocess.run(args, check=check, **kwargs)
  except FileNotFoundError:
    # The executable itself is missing; no per-call recovery is useful.
    print(
        f"Error: Command '{args[0]}' not found. Please ensure it's installed "
        f"and in your PATH.",
        file=sys.stderr)
    sys.exit(1)
def get_changed_files(merge_base: Optional[str]) -> List[str]:
  """Gets list of files changed since merge_base, or all relevant files if
  merge_base is None.

  Returns sorted, normalized repo-relative paths. Exits the program on git
  failure.
  """
  try:
    if merge_base:
      # Status checks modified/added files, diff checks files changed since
      # merge-base
      status_result = run_command(['git', 'status', '--porcelain'])
      # Ignore deleted
      # NOTE(review): `split()[-1]` picks the last whitespace-separated
      # token, which breaks for paths containing spaces — presumably none
      # exist in this repo; confirm if that ever changes.
      staged_or_modified = [
          line.split()[-1]
          for line in status_result.stdout.splitlines()
          if not line.startswith(' D')
      ]
      # Find the merge-base between merge_base and HEAD to avoid picking up
      # other changes if merge_base has moved on.
      try:
        merge_base_result = run_command(
            ['git', 'merge-base', merge_base, 'HEAD'])
        merge_base_commit = merge_base_result.stdout.strip()
      except Exception as _:
        # On the CI, the merge base cannot be found for PRs from external
        # contributors since we do a shallow clone. In that case, use the
        # `merge_base` directly.
        merge_base_commit = merge_base
      # Lowercase letters in --diff-filter exclude those change types:
      # copied, renamed and deleted entries are dropped from the listing.
      diff_result = run_command([
          'git', 'diff', '--name-only', '--diff-filter=crd', merge_base_commit,
          'HEAD'
      ])
      diff_files = diff_result.stdout.splitlines()
      # Combine and make unique, relative to repo root
      all_changed = set(staged_or_modified) | set(diff_files)
      # Normalize paths
      return sorted([os.path.normpath(f) for f in all_changed])
    else:
      # No merge base, get all tracked files
      result = run_command(['git', 'ls-files'])
      return sorted([os.path.normpath(f) for f in result.stdout.splitlines()])
  except (subprocess.CalledProcessError, FileNotFoundError) as e:
    print(f"Error getting changed files: {e}", file=sys.stderr)
    sys.exit(1)
def filter_files(files: List[str],
                 files_to_check: Optional[Iterable[str]] = None,
                 files_to_skip: Optional[Iterable[str]] = None) -> List[str]:
  """Filters `files` with fnmatch glob patterns.

  A file is kept when it matches at least one include pattern (default:
  everything) and no skip pattern. Each pattern is tried against both the
  normalized full path and the bare filename. The original (unnormalized)
  paths are returned.
  """
  include = list(files_to_check) if files_to_check else ['*']
  exclude = list(files_to_skip) if files_to_skip else []

  def matches_any(path: str, patterns: List[str]) -> bool:
    # Try the full (normalized) path first, then just the basename, so
    # patterns like 'Makefile' work regardless of directory.
    base = os.path.basename(path)
    return any(
        fnmatch.fnmatch(path, pat) or fnmatch.fnmatch(base, pat)
        for pat in patterns)

  kept = []
  for original in files:
    candidate = os.path.normpath(original)
    if matches_any(candidate, exclude):
      continue
    if matches_any(candidate, include):
      kept.append(original)
  return kept
def read_file_content(filepath: str) -> Optional[List[str]]:
  """Returns the lines of `filepath` (relative to the repo root), or None.

  Undecodable bytes are ignored rather than raising.
  """
  try:
    abs_path = os.path.join(REPO_ROOT, filepath)
    with open(abs_path, 'r', encoding='utf-8', errors='ignore') as fh:
      return fh.readlines()
  except FileNotFoundError:
    # A file can appear in the diff yet be gone by the time we read it
    # (deleted or moved in the working tree).
    print(f"Warning: File not found during read {filepath}", file=sys.stderr)
    return None
  except Exception as e:
    print(f"Error reading file {filepath}: {e}", file=sys.stderr)
    return None
def get_git_file_content(ref: str, filepath: str) -> Optional[List[str]]:
  """Returns the lines of `filepath` as of git ref `ref`.

  Returns [] when the file did not exist at that ref, and None on a git
  error (so callers can distinguish "absent" from "failed").
  """
  try:
    # check=False: a non-zero exit simply means the path is absent at `ref`.
    proc = run_command(['git', 'show', f'{ref}:{filepath}'], check=False)
  except (subprocess.CalledProcessError, FileNotFoundError) as e:
    print(
        f"Error getting git content for {ref}:{filepath}: {e}", file=sys.stderr)
    return None
  if proc.returncode != 0:
    return []
  return proc.stdout.splitlines(keepends=True)
def get_presubmit_commit_message() -> str:
  """Returns the commit message the presubmit checks should inspect.

  On GitHub CI a pull request is tested as a temporary merge commit, so the
  interesting message lives on the PR branch — the second parent (HEAD^2).
  Locally HEAD is not a merge commit, so HEAD itself is used. Returns ""
  on git errors.
  """
  try:
    # Probe for a second parent; check=False lets a plain commit fail softly.
    probe = run_command(
        ['git', 'rev-parse', '--verify', 'HEAD^2'],
        check=False,
        stdout=subprocess.PIPE,  # We read the resolved hash below.
        stderr=subprocess.DEVNULL)
    if probe.returncode != 0:
      # Not a merge commit (e.g. a local run); fall back to HEAD.
      ref = 'HEAD'
    else:
      ref = probe.stdout.strip()
      print("Detected merge commit. Reading message from pull request HEAD "
            f"({ref[:8]}).")
    return run_command(['git', 'log', '-1', '--pretty=%B', ref]).stdout
  except (subprocess.CalledProcessError, FileNotFoundError) as e:
    print(f"Error getting commit message: {e}", file=sys.stderr)
    return ""
def CheckDoNotSubmit(changed_files: List[str]) -> List[str]:
  """Flags any 'DO NOT SUBMIT' marker left in the changed files."""
  marker = re.compile(r'DO NOT SUBMIT', re.IGNORECASE)
  # This script mentions the phrase itself, so exclude it from the scan.
  findings = []
  for path in filter_files(
      changed_files, files_to_skip=['tools/run_presubmit']):
    lines = read_file_content(path)
    if not lines:
      continue
    for lineno, text in enumerate(lines, start=1):
      if marker.search(text):
        findings.append(f"{path}:{lineno}: Found 'DO NOT SUBMIT'")
  return findings
def CheckChangeHasNoTabs(changed_files: List[str]) -> List[str]:
  """Reports files containing a tab character (first occurrence per file)."""
  # Skip common binary/data files where tabs might be ok; C++ files are
  # already covered by clang-format.
  skip_patterns = [
      '*.cc',
      '*.h',
      '*.md',
      '*.pb',
      '*.png',
      '*.jpg',
      '*/test/data/*',
      'Makefile',
      'rules',
      '*.descriptor',
  ]
  findings = []
  for path in filter_files(changed_files, files_to_skip=skip_patterns):
    lines = read_file_content(path)
    if not lines:
      continue
    for lineno, text in enumerate(lines, start=1):
      if '\t' in text:
        findings.append(f"{path}:{lineno}: Found tab character")
        break  # One report per file is enough.
  return findings
def CheckCodeFormatted(merge_base: str, skip_formatters: str) -> List[str]:
  """Delegates to tools/format-sources in check-only mode.

  format-sources runs its own `git diff` to detect changed files. We
  deliberately do not pass a file list, so its diff logic stays the single
  source of truth and cannot diverge from this script's.
  """
  tool = os.path.join(REPO_ROOT, 'tools/format-sources')
  cmd = [
      tool,
      '--quiet',
      '--check-only',
      '--skip=' + skip_formatters,
      '--upstream=' + merge_base,
  ]
  try:
    run_command(cmd, stderr=subprocess.STDOUT)
  except subprocess.CalledProcessError:
    return [f'Please run {tool} to format sources.']
  return []
def CheckIncludeGuards(changed_files: List[str]) -> List[str]:
  """Verifies C++ include guards via tools/fix_include_guards."""
  if sys.platform == 'win32':
    return []
  rel_tool = 'tools/fix_include_guards'
  tool = os.path.join(REPO_ROOT, rel_tool)
  relevant = filter_files(
      changed_files, files_to_check=['*.cc', '*.h', rel_tool])
  if not relevant:
    return []
  # Re-run when either the checker itself or any checked source changed.
  tool_touched = rel_tool in {os.path.normpath(f) for f in changed_files}
  sources_touched = any(f != rel_tool for f in relevant)
  if not (tool_touched or sources_touched):
    return []
  try:
    run_command([sys.executable, tool, '--check-only'], cwd=REPO_ROOT)
  except FileNotFoundError:
    return [f"Tool not found: {tool}"]
  except subprocess.CalledProcessError:
    return [f'Please run python {tool} to fix include guards.']
  return []
def CheckIncludeViolations(changed_files: List[str]) -> List[str]:
  """Runs tools/check_include_violations on changed public headers."""
  if sys.platform == 'win32':
    return []
  rel_tool = 'tools/check_include_violations'
  tool = os.path.join(REPO_ROOT, rel_tool)
  relevant = filter_files(
      changed_files, files_to_check=['include/*.h', rel_tool])
  if not relevant:
    return []
  # Re-run when either the checker itself or any checked header changed.
  tool_touched = rel_tool in {os.path.normpath(f) for f in changed_files}
  headers_touched = any(f != rel_tool for f in relevant)
  if not (tool_touched or headers_touched):
    return []
  try:
    run_command([sys.executable, tool], cwd=REPO_ROOT)
  except FileNotFoundError:
    return [f"Tool not found: {tool}"]
  except subprocess.CalledProcessError:
    return [f'{tool} failed.']
  return []
def CheckIncludePaths(changed_files: List[str]) -> List[str]:
  """Validates `#include "..."` paths in changed C++ sources and headers.

  Flags (a) redundant 'include/perfetto' prefixes and (b) bare relative
  includes with no directory component. Lines carrying the marker comment
  '// no-include-violation-check' are exempt.
  """
  # Uses fnmatch patterns
  files_to_check_list = filter_files(
      changed_files,
      files_to_check=['*.h', '*.cc'],
      files_to_skip=['examples/sdk/example.cc'])
  error_lines = []
  # Only quoted includes ending in .h are inspected; <...> includes pass.
  pattern = re.compile(r'^#include "(.*\.h)"')
  for f_path in files_to_check_list:
    content = read_file_content(f_path)
    if not content:
      continue
    for i, line in enumerate(content):
      match = pattern.search(line)
      if not match:
        continue
      # Explicit per-line escape hatch.
      if '// no-include-violation-check' in line:
        continue
      inc_hdr = match.group(1)
      # Normalize path separators for comparison
      norm_inc_hdr = os.path.normpath(inc_hdr)
      # Check if it starts with 'include/perfetto' or 'include\perfetto'
      if norm_inc_hdr.startswith(os.path.join('include', 'perfetto')):
        error_lines.append(
            f' {f_path}:{i+1}: Redundant "include/" in #include path'
            f' "{inc_hdr}"')
      # Check for relative paths (no directory separator in the normalized path)
      # Exclude paths already starting with 'perfetto/' which is allowed
      # relative to include/
      has_no_os_sep = os.path.sep not in norm_inc_hdr
      has_no_fwd_slash = '/' not in norm_inc_hdr
      does_not_start_with_perfetto = not norm_inc_hdr.startswith('perfetto/')
      # NOTE(review): when the path has no separator at all it cannot start
      # with 'perfetto/' or 'src/', so the two startswith() guards here and
      # below look vacuous — presumably kept for defensiveness; confirm
      # before simplifying.
      if has_no_os_sep and has_no_fwd_slash and does_not_start_with_perfetto:
        # Also allow includes relative to 'src/' for internal headers
        if not norm_inc_hdr.startswith('src/'):
          error_lines.append(
              f' {f_path}:{i+1}: relative #include "{inc_hdr}" not allowed, '
              f'use full path from include dir (perfetto/...) or project root '
              f'(src/...) ')
  return [] if not error_lines else [
      'Invalid #include paths detected:\n' + '\n'.join(error_lines)
  ]
def CheckProtoComments(changed_files: List[str]) -> List[str]:
  """Runs tools/check_proto_comments on changed perfetto .proto files."""
  if sys.platform == 'win32':
    return []
  rel_tool = 'tools/check_proto_comments'
  tool = os.path.join(REPO_ROOT, rel_tool)
  relevant = filter_files(
      changed_files, files_to_check=['protos/perfetto/*.proto', rel_tool])
  if not relevant:
    return []
  # Re-run when either the checker itself or any proto changed.
  tool_touched = rel_tool in {os.path.normpath(f) for f in changed_files}
  protos_touched = any(f != rel_tool for f in relevant)
  if not (tool_touched or protos_touched):
    return []
  try:
    run_command([sys.executable, tool], cwd=REPO_ROOT)
  except FileNotFoundError:
    return [f"Tool not found: {tool}"]
  except subprocess.CalledProcessError:
    return [f'{tool} failed']
  return []
def CheckBuild(changed_files: List[str]) -> List[str]:
  """Checks generated Bazel BUILD files are in sync via tools/gen_bazel."""
  if sys.platform == 'win32':
    return []
  rel_tool = 'tools/gen_bazel'
  tool = os.path.join(REPO_ROOT, rel_tool)
  relevant = filter_files(
      changed_files,
      files_to_check=['*BUILD.gn', '*.gni', 'BUILD.extras', rel_tool])
  if not relevant:
    return []
  # Re-run when either the generator itself or any GN input changed.
  tool_touched = rel_tool in {os.path.normpath(f) for f in changed_files}
  inputs_touched = any(f != rel_tool for f in relevant)
  if not (tool_touched or inputs_touched):
    return []
  try:
    run_command([sys.executable, tool, '--check-only'], cwd=REPO_ROOT)
  except FileNotFoundError:
    return [f"Tool not found: {tool}"]
  except subprocess.CalledProcessError:
    return [
        f'Bazel BUILD(s) are out of date. Run python {tool} to update them.'
    ]
  return []
def CheckAndroidBlueprint(changed_files: List[str]) -> List[str]:
  """Checks Android.bp is in sync via tools/gen_android_bp."""
  if sys.platform == 'win32':
    return []
  rel_tool = 'tools/gen_android_bp'
  tool = os.path.join(REPO_ROOT, rel_tool)
  relevant = filter_files(
      changed_files,
      files_to_check=['*BUILD.gn', '*.gni', rel_tool],
      # The chrome stdlib BUILD.gn is rolled by Copybara and not an input.
      files_to_skip=['src/trace_processor/perfetto_sql/stdlib/chrome/BUILD.gn'])
  if not relevant:
    return []
  # Re-run when either the generator itself or any GN input changed.
  tool_touched = rel_tool in {os.path.normpath(f) for f in changed_files}
  inputs_touched = any(f != rel_tool for f in relevant)
  if not (tool_touched or inputs_touched):
    return []
  try:
    run_command([sys.executable, tool, '--check-only'], cwd=REPO_ROOT)
  except FileNotFoundError:
    return [f"Tool not found: {tool}"]
  except subprocess.CalledProcessError:
    return [
        f'Android build files are out of date. Run python {tool} to update '
        'them.'
    ]
  return []
def EnsureProtoBinaryExists():
  """Builds (if not already cached) and returns the path to `protoc`.

  The binary is built once into out/presubmits and reused on later runs.
  """
  out_dir = os.path.join(REPO_ROOT, 'out', 'presubmits')
  protoc_path = os.path.join(out_dir, 'protoc') + ('.exe' if IS_WIN else '')
  if os.path.exists(protoc_path):
    return protoc_path  # Cached from a previous presubmit run.
  gn_path = os.path.join(REPO_ROOT, 'tools', 'gn')
  ninja_path = os.path.join(REPO_ROOT, 'tools', 'ninja')
  subprocess.check_call(
      [sys.executable, gn_path, 'gen', '--args=is_debug=false', out_dir],
      cwd=REPO_ROOT)
  subprocess.check_call(
      [sys.executable, ninja_path, '-C', out_dir, 'protoc'])
  return protoc_path
def CheckBinaryDescriptors(changed_files: List[str]) -> List[str]:
  """Checks generated binary descriptors via tools/gen_binary_descriptors."""
  if sys.platform == 'win32':
    return []
  rel_tool = 'tools/gen_binary_descriptors'
  tool = os.path.join(REPO_ROOT, rel_tool)
  relevant = filter_files(
      changed_files,
      files_to_check=['protos/perfetto/*.proto', '*.h', rel_tool])
  if not relevant:
    return []
  # Re-run when either the generator itself or any input changed.
  tool_touched = rel_tool in {os.path.normpath(f) for f in changed_files}
  inputs_touched = any(f != rel_tool for f in relevant)
  if not (tool_touched or inputs_touched):
    return []
  # The generator needs a host protoc; build it on demand.
  protoc = EnsureProtoBinaryExists()
  try:
    run_command([sys.executable, tool, '--check-only', f'--protoc={protoc}'],
                cwd=REPO_ROOT)
  except FileNotFoundError:
    return [f"Tool not found: {tool}"]
  except subprocess.CalledProcessError:
    return [f'Please run python {tool} to update binary descriptors.']
  return []
def CheckMergedTraceConfigProto(changed_files: List[str]) -> List[str]:
  """Checks the merged proto files are in sync via tools/gen_merged_protos."""
  if sys.platform == 'win32':
    return []
  rel_tool = 'tools/gen_merged_protos'
  tool = os.path.join(REPO_ROOT, rel_tool)
  relevant = filter_files(
      changed_files, files_to_check=['protos/perfetto/*.proto', rel_tool])
  if not relevant:
    return []
  # Re-run when either the generator itself or any proto changed.
  tool_touched = rel_tool in {os.path.normpath(f) for f in changed_files}
  protos_touched = any(f != rel_tool for f in relevant)
  if not (tool_touched or protos_touched):
    return []
  try:
    run_command([sys.executable, tool, '--check-only'], cwd=REPO_ROOT)
  except FileNotFoundError:
    return [f"Tool not found: {tool}"]
  except subprocess.CalledProcessError:
    return [
        'perfetto_config.proto or perfetto_trace.proto is out of '
        f'date. Please run python {tool} to update it.'
    ]
  return []
def CheckProtoEventList(changed_files: List[str],
                        merge_base: Optional[str]) -> List[str]:
  """Ensures the ftrace event_list only grows or marks lines 'removed'.

  Compares the file line-by-line against its version at `merge_base`.
  Allowed edits: appending new lines, or replacing an existing line with
  one starting with 'removed'. Skipped entirely when the file is untouched
  or no merge-base is available.
  """
  target_file = 'src/tools/ftrace_proto_gen/event_list'
  norm_target_file = os.path.normpath(target_file)
  if norm_target_file not in [os.path.normpath(f) for f in changed_files]:
    return []
  if not merge_base:
    print(
        f"Warning: Cannot check {target_file} history accurately without "
        f"merge-base.",
        file=sys.stderr)
    return []  # Skip check if we don't have history baseline
  old_content = get_git_file_content(merge_base, target_file)
  new_content = read_file_content(target_file)
  if old_content is None or new_content is None:
    return [f"Error reading old or new content for {target_file}"]
  # Allow adding lines or replacing lines with 'removed...'
  errors = []
  # Use zip_longest to handle lines added/removed at the end
  from itertools import zip_longest
  for i, (old_line_raw,
          new_line_raw) in enumerate(zip_longest(old_content, new_content)):
    # zip_longest pads the shorter side with None, encoding "line added"
    # (old is None) / "line removed" (new is None) in the branches below.
    old_line = old_line_raw.rstrip('\n\r') if old_line_raw is not None else None
    new_line = new_line_raw.rstrip('\n\r') if new_line_raw is not None else None
    if old_line == new_line:
      continue
    elif old_line is None and new_line is not None:
      # Line added - this is okay
      continue
    elif old_line is not None and new_line is None:
      # Line removed - check if it should have been replaced
      if old_line.strip() and not old_line.strip().startswith(
          '#'):  # Ignore empty/comment lines removal
        errors.append(
            f"{target_file}:{i+1}: Line removed without being replaced by "
            f"'removed': '{old_line}'")
    elif old_line is not None and new_line is not None:
      # Line changed - check if replacement is valid
      if not new_line.strip().startswith('removed'):
        errors.append(
            f"{target_file}:{i+1}: Invalid change. Only appending or replacing "
            f"with 'removed' is allowed. Got '{new_line}' from '{old_line}'")
  if errors:
    return [
        f'{target_file} only has two supported changes: '
        f'appending a new line, and replacing a line content starting with '
        f'"removed".\n' + "\n".join(errors)
    ]
  return []
def CheckBannedCpp(changed_files: List[str]) -> List[str]:
  """Flags banned C++ constructs in changed .h/.cc files.

  Each entry pairs a regex for the banned construct with the message shown
  to the author. Lines that are entirely `//` comments are skipped.
  """
  # NOTE: the std::stol/stoul/stoll messages previously all read
  # 'std::stoull throws ...' due to a copy-paste error; each message now
  # names the function its regex actually bans.
  bad_cpp: List[Tuple[str, str]] = [
      (r'\bstd::stoi\b',
       'std::stoi throws exceptions prefer base::StringToInt32()'),
      (r'\bstd::stol\b',
       'std::stol throws exceptions prefer base::StringToInt32()'),
      (r'\bstd::stoul\b',
       'std::stoul throws exceptions prefer base::StringToUint32()'),
      (r'\bstd::stoll\b',
       'std::stoll throws exceptions prefer base::StringToInt64()'),
      (r'\bstd::stoull\b',
       'std::stoull throws exceptions prefer base::StringToUint64()'),
      (r'\bstd::stof\b',
       'std::stof throws exceptions prefer base::StringToDouble()'),
      (r'\bstd::stod\b',
       'std::stod throws exceptions prefer base::StringToDouble()'),
      (r'\bstd::stold\b',
       'std::stold throws exceptions prefer base::StringToDouble()'),
      (r'\bstrncpy\b',
       'strncpy does not null-terminate if src > dst. Use base::StringCopy'),
      (r'[(=]\s*snprintf\(',
       'snprintf can return > dst_size. Use base::SprintfTrunc'),
      (r'//.*\bDNS\b',
       '// DNS (Do Not Ship) found. Did you mean to remove some testing code?'),
      (r'\bPERFETTO_EINTR\(close\(',
       'close(2) must not be retried on EINTR on Linux and other OSes '
       'that we run on, as the fd will be closed.'),
      (r'^#include <inttypes.h>', 'Use <cinttypes> rather than <inttypes.h>. ' +
       'See https://github.com/google/perfetto/issues/146'),
  ]
  # Use fnmatch patterns for filtering files
  files_to_check_list = filter_files(
      changed_files, files_to_check=['*.h', '*.cc'])
  errors = []
  comment_pattern = re.compile(r'^\s*//')
  # Compile each banned pattern once instead of re.search()-ing raw strings
  # for every line of every file.
  compiled = [(re.compile(regex_str), msg) for regex_str, msg in bad_cpp]
  for f_path in files_to_check_list:
    content = read_file_content(f_path)
    if not content:
      continue
    for i, line in enumerate(content):
      if comment_pattern.search(line):
        continue  # Skip comments
      for regex, message in compiled:
        if regex.search(line):
          errors.append(f'Banned pattern:\n {f_path}:{i+1}: {message}')
  return errors
def CheckSqlModules(changed_files: List[str]) -> List[str]:
  """Runs tools/check_sql_modules.py on changed stdlib SQL modules."""
  if sys.platform == 'win32':
    return []
  rel_tool = 'tools/check_sql_modules.py'
  tool = os.path.join(REPO_ROOT, rel_tool)
  relevant = filter_files(
      changed_files,
      files_to_check=[
          'src/trace_processor/perfetto_sql/stdlib/*.sql', rel_tool
      ])
  if not relevant:
    return []
  # Re-run when either the checker itself or any SQL module changed.
  tool_touched = rel_tool in {os.path.normpath(f) for f in changed_files}
  sql_touched = any(f != rel_tool for f in relevant)
  if not (tool_touched or sql_touched):
    return []
  try:
    run_command([sys.executable, tool], cwd=REPO_ROOT)
  except FileNotFoundError:
    return [f"Tool not found: {tool}"]
  except subprocess.CalledProcessError:
    return [f'{tool} failed']
  return []
def CheckStdlibIncludes(changed_files: List[str]) -> List[str]:
  """Runs tools/check_stdlib_includes on changed stdlib SQL modules."""
  if sys.platform == 'win32':
    return []
  rel_tool = 'tools/check_stdlib_includes'
  tool = os.path.join(REPO_ROOT, rel_tool)
  relevant = filter_files(
      changed_files,
      files_to_check=[
          'src/trace_processor/perfetto_sql/stdlib/*.sql', rel_tool
      ])
  if not relevant:
    return []
  # Re-run when either the checker itself or any SQL module changed.
  tool_touched = rel_tool in {os.path.normpath(f) for f in changed_files}
  sql_touched = any(f != rel_tool for f in relevant)
  if not (tool_touched or sql_touched):
    return []
  try:
    run_command([sys.executable, tool, '--quiet'], cwd=REPO_ROOT)
  except FileNotFoundError:
    return [f"Tool not found: {tool}"]
  except subprocess.CalledProcessError:
    return [f'{tool} failed']
  return []
def CheckSqlMetrics(changed_files: List[str]) -> List[str]:
  """Runs tools/check_sql_metrics.py on changed metric SQL files."""
  if sys.platform == 'win32':
    return []
  rel_tool = 'tools/check_sql_metrics.py'
  tool = os.path.join(REPO_ROOT, rel_tool)
  relevant = filter_files(
      changed_files,
      files_to_check=['src/trace_processor/metrics/*.sql', rel_tool])
  if not relevant:
    return []
  # Re-run when either the checker itself or any metric SQL changed.
  tool_touched = rel_tool in {os.path.normpath(f) for f in changed_files}
  sql_touched = any(f != rel_tool for f in relevant)
  if not (tool_touched or sql_touched):
    return []
  try:
    run_command([sys.executable, tool], cwd=REPO_ROOT)
  except FileNotFoundError:
    return [f"Tool not found: {tool}"]
  except subprocess.CalledProcessError:
    return [f'{tool} failed']
  return []
def CheckTestData(_: List[str]) -> List[str]:
  """Checks //test/data is in sync via `tools/test_data status`.

  Runs unconditionally; the changed-files argument is ignored because
  test_data sync state is not visible through the git diff.
  """
  tool_rel_path = 'tools/test_data'
  tool = os.path.join(REPO_ROOT, tool_rel_path)
  try:
    # Assume tool needs execution permissions or use sys.executable
    run_command([sys.executable, tool, 'status', '--quiet'], cwd=REPO_ROOT)
  except FileNotFoundError:
    # NOTE(review): run_command() handles FileNotFoundError itself by
    # calling sys.exit(1), so this fallback appears unreachable — confirm
    # before relying on the direct-execution retry below.
    # Try without sys.executable if it's meant to be directly executable
    try:
      run_command([tool, 'status', '--quiet'], cwd=REPO_ROOT)
    except FileNotFoundError:
      return [f"Tool not found: {tool}"]
    except subprocess.CalledProcessError:
      # Fall through to error message
      pass
    else:
      return []  # Succeeded without sys.executable
  except subprocess.CalledProcessError:
    # Fall through to error message
    pass
  else:  # If try block succeeded
    return []
  # If we reached here, it failed
  return [
      f'//test/data is out of sync. Run `{tool} status` for more.\n'
      f'If you rebaselined UI tests or added a new test trace, run: `{tool} '
      f'upload`.\n'
      f'Otherwise run `tools/install-build-deps` or `{tool} download '
      f'--overwrite` to sync local test_data'
  ]
def CheckChromeStdlib(changed_files: List[str]) -> List[str]:
  """Blocks local edits to Chrome-stdlib files that Copybara rolls in.

  Two commit-message markers act as escape hatches: COPYBARA_IMPORT and
  CHROME_STDLIB_MANUAL_ROLL.
  """
  rolled_patterns = ("src/trace_processor/perfetto_sql/stdlib/chrome/*",
                     "test/data/chrome/*",
                     "test/trace_processor/diff_tests/stdlib/chrome/*")
  touched = filter_files(changed_files, files_to_check=rolled_patterns)
  if not touched:
    return []
  commit_message = get_presubmit_commit_message()
  if re.search(r'COPYBARA_IMPORT', commit_message, re.IGNORECASE):
    print(
        "INFO: COPYBARA_IMPORT detected, skipping CheckChromeStdlib.",
        file=sys.stderr)
    return []
  if re.search(r'CHROME_STDLIB_MANUAL_ROLL', commit_message, re.IGNORECASE):
    print(
        "INFO: CHROME_STDLIB_MANUAL_ROLL detected, skipping CheckChromeStdlib.",
        file=sys.stderr)
    return []
  # Neither escape hatch was present: report the touched files.
  paths_str = ', '.join(p.replace('*', '') for p in rolled_patterns)
  affected = "\n".join([f" - {f}" for f in touched])
  return [
      f'Files under {paths_str} '
      'are rolled from the Chromium repository by a '
      'Copybara service.\nYou should not modify these in '
      'the Perfetto repository, please make your changes '
      'in Chromium instead.\n'
      'Affected files:\n' + affected + '\n'
      'If you want to do a manual roll, you must specify '
      'CHROME_STDLIB_MANUAL_ROLL=<reason> in the CL description/commit message.'
  ]
def CheckAmalgamatedPythonTools(changed_files: List[str]) -> List[str]:
  """Checks the amalgamated python tools are in sync with python/ sources."""
  if sys.platform == 'win32':
    return []
  rel_tool = 'tools/gen_amalgamated_python_tools'
  tool = os.path.join(REPO_ROOT, rel_tool)
  relevant = filter_files(
      changed_files, files_to_check=['python/*', rel_tool])
  if not relevant:
    return []
  # Re-run when either the generator itself or any python source changed.
  tool_touched = rel_tool in {os.path.normpath(f) for f in changed_files}
  sources_touched = any(f != rel_tool for f in relevant)
  if not (tool_touched or sources_touched):
    return []
  try:
    run_command([sys.executable, tool, '--check-only'], cwd=REPO_ROOT)
  except FileNotFoundError:
    return [f"Tool not found: {tool}"]
  except subprocess.CalledProcessError:
    return [
        f'amalgamated python tools/ are out of date. Run python {tool} to '
        f'update them.'
    ]
  return []
def CheckAbsolutePathsInGn(changed_files: List[str]) -> List[str]:
  """Rejects absolute ("//...") references in GN files.

  Lines that are comments, or carry a '# nogncheck' waiver, are exempt.
  """
  gn_files = filter_files(
      changed_files,
      files_to_check=['*.gni', '*.gn'],
      files_to_skip=[
          '.gn',  # Skip root .gn file by name
          'gn/*',  # Skip files in root gn/ directory
          'buildtools/*',  # Skip files in root buildtools/ directory
      ])
  abs_ref = re.compile(r'"//[^"]')  # "//..." inside a quoted string
  nogncheck = re.compile(r'#\s*nogncheck', re.IGNORECASE)
  comment = re.compile(r'^\s*#')
  offenders = []
  for path in gn_files:
    lines = read_file_content(path)
    if not lines:
      continue
    for lineno, text in enumerate(lines, start=1):
      if nogncheck.search(text) or comment.match(text):
        continue
      if abs_ref.search(text):
        offenders.append(f' {path}:{lineno}: {text.strip()}')
  if not offenders:
    return []
  return [
      'Use relative paths in GN rather than absolute ("//..."): Check these '
      'lines:\n' + '\n'.join(offenders)
  ]
def CheckUiImports(changed_files: List[str]) -> List[str]:
  """Runs tools/check_imports when any ui/ source (or the tool) changed."""
  rel_tool = 'tools/check_imports'
  tool = os.path.join(REPO_ROOT, rel_tool)
  if not filter_files(changed_files, files_to_check=['ui/*', rel_tool]):
    return []
  try:
    run_command([sys.executable, tool],
                cwd=REPO_ROOT,
                stderr=subprocess.STDOUT)
  except subprocess.CalledProcessError:
    return [f'Import violations detected in ui/ sources. See {rel_tool}']
  return []
def CheckUiRatchet(changed_files: List[str]) -> List[str]:
  """Runs tools/check_ratchet when any ui/ source (or the tool) changed."""
  rel_tool = 'tools/check_ratchet'
  tool = os.path.join(REPO_ROOT, rel_tool)
  if not filter_files(changed_files, files_to_check=['ui/*', rel_tool]):
    return []
  try:
    run_command([sys.executable, tool],
                cwd=REPO_ROOT,
                stderr=subprocess.STDOUT)
  except subprocess.CalledProcessError:
    return [f'Bad patterns detected in ui/ sources. See {rel_tool}']
  return []
def main():
  """Entry point: collects changed files and runs every presubmit check."""
  parser = argparse.ArgumentParser()
  parser.add_argument('--merge-base', default=None)
  parser.add_argument(
      '--skip-formatters',
      default='',
      help='Comma-separated list of code formatters to skip')
  # The pre-push hook passes extra cmdline arguments like 'origin git@...'.
  # Accept and ignore positional args.
  args, _ = parser.parse_known_args()

  # 1. Determine files to check.
  merge_base = args.merge_base
  if merge_base is None:
    merge_base = subprocess.check_output(
        ['git', 'merge-base', 'HEAD', 'origin/main'], text=True).strip()
  changed_files = get_changed_files(merge_base)
  if not changed_files:
    print("No changed files detected relative to upstream or in working dir.")
    sys.exit(0)
  print(f"Checking {len(changed_files)} files...")  # Optional verbosity

  # 2. Run all checks, timing each one via RunAndReportIfLong.
  all_results = []
  checks_to_run = [
      (CheckCodeFormatted, [merge_base, args.skip_formatters]),
      (CheckDoNotSubmit, [changed_files]),
      (CheckChangeHasNoTabs, [changed_files]),
      (CheckIncludeGuards, [changed_files]),
      (CheckIncludeViolations, [changed_files]),
      (CheckIncludePaths, [changed_files]),
      (CheckProtoComments, [changed_files]),
      (CheckBuild, [changed_files]),
      (CheckAndroidBlueprint, [changed_files]),
      (CheckBinaryDescriptors, [changed_files]),
      (CheckMergedTraceConfigProto, [changed_files]),
      (CheckProtoEventList, [changed_files, merge_base]),  # Needs merge_base
      (CheckBannedCpp, [changed_files]),
      (CheckSqlModules, [changed_files]),
      (CheckStdlibIncludes, [changed_files]),
      (CheckSqlMetrics, [changed_files]),
      (CheckTestData, [changed_files]),  # Ignores its argument
      (CheckAmalgamatedPythonTools, [changed_files]),
      (CheckChromeStdlib, [changed_files]),  # Checks commit msg
      (CheckAbsolutePathsInGn, [changed_files]),
      (CheckUiImports, [changed_files]),
      (CheckUiRatchet, [changed_files]),
  ]
  # Use distinct loop names: the previous `for func, args in ...` rebound
  # `args` and shadowed the argparse namespace parsed above.
  for check_func, check_args in checks_to_run:
    all_results.extend(RunAndReportIfLong(check_func, *check_args))

  # 3. Report results.
  if all_results:
    print("\n--- Presubmit Errors Found ---", file=sys.stderr)
    # Filter out potential None results if any check returns None on error
    for error in filter(None, all_results):
      print(f"- {error}", file=sys.stderr)
    print("\n--- Presubmit Failed ---", file=sys.stderr)
    sys.exit(1)
  else:
    print("\n--- Presubmit Succeeded ---")
    sys.exit(0)
if __name__ == '__main__':
  # Ensure we run from repo root for consistent paths: every check builds
  # paths relative to REPO_ROOT and passes cwd=REPO_ROOT to subprocesses.
  os.chdir(REPO_ROOT)
  main()