#!/usr/bin/env python3
# Copyright (C) 2025 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import fnmatch
import os
import re
import subprocess
import sys
import time
from typing import List, Optional, Tuple, Iterable
# This script can be invoked via a git hook symlink, which is why we also
# resolve the real path.
REPO_ROOT = os.path.dirname(
os.path.dirname(os.path.abspath(os.path.realpath(__file__))))
def RunAndReportIfLong(func, *args, **kargs):
"""Runs function, reports runtime if it exceeds a limit."""
start = time.time()
results = func(*args, **kargs)
end = time.time()
limit = 3.0 # seconds
name = func.__name__
runtime = end - start
if runtime > limit:
print(f"{name} took >{limit:.2f}s ({runtime:.2f}s)", file=sys.stderr)
return results
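# Example use of RunAndReportIfLong (illustrative): wrap a check so that slow
# runs are reported to stderr:
#   errors = RunAndReportIfLong(CheckDoNotSubmit, changed_files)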
def run_command(args: List[str],
check: bool = True,
**kwargs) -> subprocess.CompletedProcess:
"""Helper to run a subprocess. Handles capture_output conflict."""
# Decide whether to use capture_output based on kwargs
use_capture_output = 'stdout' not in kwargs and 'stderr' not in kwargs
try:
if use_capture_output:
# Default case: capture stdout/stderr using capture_output
return subprocess.run(
args, capture_output=True, text=True, check=check, **kwargs)
else:
# Case where stdout or stderr is explicitly provided (e.g., DEVNULL)
# Don't use capture_output=True. Ensure text=True if not overridden.
kwargs.setdefault('text', True)
# Filter out capture_output if it was somehow passed in kwargs
kwargs.pop('capture_output', None)
return subprocess.run(args, check=check, **kwargs)
except FileNotFoundError:
# Specific handling for command not found
print(
f"Error: Command '{args[0]}' not found. Please ensure it's installed "
f"and in your PATH.",
file=sys.stderr)
sys.exit(1)
  except subprocess.CalledProcessError:
    # Re-raise for the caller to handle, or handle more specifically if needed.
    raise
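# Illustrative uses of run_command (assumed, for documentation only). By
# default stdout/stderr are captured as text; passing an explicit stream
# (e.g. subprocess.DEVNULL) disables capture_output:
#   head = run_command(['git', 'rev-parse', 'HEAD']).stdout.strip()
#   run_command(['git', 'fetch'], check=False, stdout=subprocess.DEVNULL)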
def get_merge_base() -> Optional[str]:
result = run_command(['git', 'symbolic-ref', '--short', 'HEAD'], check=True)
branch = result.stdout.strip()
# First check if there's a tracking branch
try:
# More direct way to get the upstream branch
args = ['git', 'rev-parse', '--abbrev-ref', f'{branch}@{{upstream}}']
tracking_result = run_command(args, check=True)
tracking_branch = tracking_result.stdout.strip()
# If we have a tracking branch, check if it's local
if tracking_branch:
# Check if this is a local branch by trying to get its ref in refs/heads/
try:
_ = run_command(
['git', 'show-ref', '--verify', f'refs/heads/{tracking_branch}'],
check=True)
# If we get here without an exception, it's a local branch
return tracking_branch
except subprocess.CalledProcessError:
# Not a local branch, continue to fallback
pass
except subprocess.CalledProcessError:
# No tracking branch, continue to fallback
pass
# Fallback to origin/main
try:
# Check if origin/main exists
_ = run_command(['git', 'show-ref', '--verify', 'refs/remotes/origin/main'],
check=True)
return 'origin/main'
except subprocess.CalledProcessError:
# origin/main doesn't exist, return None
return None
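# get_merge_base resolution order: the branch's upstream (only if it is a
# local branch), then origin/main, then None if neither exists.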
def get_changed_files(merge_base: Optional[str]) -> List[str]:
"""Gets list of files changed since merge_base, or all relevant files if
merge_base is None."""
try:
if merge_base:
# Status checks modified/added files, diff checks files changed since
# merge-base
status_result = run_command(['git', 'status', '--porcelain'])
# Ignore deleted
staged_or_modified = [
line.split()[-1]
for line in status_result.stdout.splitlines()
if not line.startswith(' D')
]
diff_result = run_command([
'git', 'diff', '--name-only', '--diff-filter=crd',
f'{merge_base}...HEAD'
])
diff_files = diff_result.stdout.splitlines()
# Combine and make unique, relative to repo root
all_changed = set(staged_or_modified) | set(diff_files)
# Normalize paths
return sorted([os.path.normpath(f) for f in all_changed])
else:
# No merge base, get all tracked files
result = run_command(['git', 'ls-files'])
return sorted([os.path.normpath(f) for f in result.stdout.splitlines()])
except (subprocess.CalledProcessError, FileNotFoundError) as e:
print(f"Error getting changed files: {e}", file=sys.stderr)
sys.exit(1)
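# Illustrative `git status --porcelain` lines and how get_changed_files
# treats them (assumed typical output):
#   ' M src/a.cc'   -> kept (modified in the worktree)
#   'A  src/b.cc'   -> kept (newly added to the index)
#   ' D src/c.cc'   -> skipped (deleted in the worktree)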
def filter_files(files: List[str],
files_to_check: Optional[Iterable[str]] = None,
files_to_skip: Optional[Iterable[str]] = None) -> List[str]:
"""Filters a list of files based on include/exclude patterns (fnmatch)."""
filtered = []
# Use glob patterns (fnmatch) directly
  # Default: check all files.
  check_patterns = list(files_to_check) if files_to_check else ['*']
skip_patterns = list(files_to_skip) if files_to_skip else []
# fnmatch generally works well with forward slashes even on Windows
# No need to normalize patterns unless specific issues arise
for f in files:
# Use os.path.normpath for consistent separators in the file path being
# checked
norm_f = os.path.normpath(f)
# Check skip patterns first
if any(
fnmatch.fnmatch(norm_f, pattern) or
fnmatch.fnmatch(os.path.basename(norm_f), pattern)
for pattern in skip_patterns):
# Check against full path and just the filename for flexibility
continue
# Check include patterns
if any(
fnmatch.fnmatch(norm_f, pattern) or
fnmatch.fnmatch(os.path.basename(norm_f), pattern)
for pattern in check_patterns):
filtered.append(f) # Return original path 'f'
return filtered
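# Illustrative filter_files behavior (fnmatch patterns match either the full
# path or just the basename):
#   filter_files(['src/a.cc', 'docs/b.md'], files_to_check=['*.cc'])
#     -> ['src/a.cc']
#   filter_files(['src/a.cc'], files_to_skip=['src/*'])  -> []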
def read_file_content(filepath: str) -> Optional[List[str]]:
"""Reads a file's content, returns lines or None on error."""
try:
# Use absolute path for reading
abs_path = os.path.join(REPO_ROOT, filepath)
with open(abs_path, 'r', encoding='utf-8', errors='ignore') as f:
return f.readlines()
except FileNotFoundError:
# This can happen if a file listed in diff was deleted or moved before
# reading
print(f"Warning: File not found during read {filepath}", file=sys.stderr)
return None
except Exception as e:
print(f"Error reading file {filepath}: {e}", file=sys.stderr)
return None
def get_git_file_content(ref: str, filepath: str) -> Optional[List[str]]:
"""Gets file content from a specific git ref."""
try:
# Use relative path for git show
result = run_command(['git', 'show', f'{ref}:{filepath}'],
check=False) # Don't fail if file didn't exist in ref
if result.returncode == 0:
return result.stdout.splitlines(keepends=True)
return [] # File didn't exist or was deleted, treat as empty old content
except (subprocess.CalledProcessError, FileNotFoundError) as e:
print(
f"Error getting git content for {ref}:{filepath}: {e}", file=sys.stderr)
return None # Indicate error
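# get_git_file_content returns a list of lines, [] if the file does not exist
# at `ref`, or None if the git invocation itself failed.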
def get_commit_message() -> str:
"""Gets the commit message of the HEAD commit."""
try:
result = run_command(['git', 'log', '-1', '--pretty=%B'])
return result.stdout
except (subprocess.CalledProcessError, FileNotFoundError) as e:
print(f"Error getting commit message: {e}", file=sys.stderr)
return "" # Return empty string on error
def CheckDoNotSubmit(changed_files: List[str]) -> List[str]:
errors = []
pattern = re.compile(r'DO NOT SUBMIT', re.IGNORECASE)
files_to_check_list = filter_files(
changed_files, files_to_skip=['tools/run_presubmit'])
for f_path in files_to_check_list:
content = read_file_content(f_path)
if content:
for i, line in enumerate(content):
if pattern.search(line):
errors.append(f"{f_path}:{i+1}: Found 'DO NOT SUBMIT'")
return errors
def CheckChangeHasNoTabs(changed_files: List[str]) -> List[str]:
errors = []
  # Basic filter: skip common binary/data files where tabs may be OK;
  # C++ files are already handled by clang-format.
files_to_check_list = filter_files(
changed_files,
files_to_skip=[
'*.cc',
'*.h',
'*.pb',
'*.png',
'*.jpg',
'*/test/data/*',
'Makefile',
'*.descriptor',
])
for f_path in files_to_check_list:
content = read_file_content(f_path)
if content:
for i, line in enumerate(content):
if '\t' in line:
errors.append(f"{f_path}:{i+1}: Found tab character")
break # Only report once per file
return errors
def CheckCodeFormatted() -> List[str]:
  # NOTE: format-sources does its own invocation of git diff ... to detect
  # changed files. We rely on that, rather than passing the list here, to
  # avoid a situation where run_presubmit fails but tools/format-sources
  # succeeds because the two diverge in their diff logic.
tool = os.path.join(REPO_ROOT, 'tools/format-sources')
try:
run_command([tool, '--quiet', '--check-only'], stderr=subprocess.STDOUT)
except subprocess.CalledProcessError:
return [f'Please run {tool} to format sources.']
return []
def CheckIncludeGuards(changed_files: List[str]) -> List[str]:
if sys.platform == 'win32':
return []
tool_rel_path = 'tools/fix_include_guards'
tool = os.path.join(REPO_ROOT, tool_rel_path)
# Use fnmatch patterns
files_to_check_list = filter_files(
changed_files, files_to_check=['*.cc', '*.h', tool_rel_path])
if not files_to_check_list:
return []
# Check if the tool itself changed
tool_changed = tool_rel_path in [os.path.normpath(f) for f in changed_files]
# Check if relevant source files changed
source_files_changed = any(f != tool_rel_path for f in files_to_check_list)
if not tool_changed and not source_files_changed:
return []
try:
# Run the tool relative to the repo root
run_command([sys.executable, tool, '--check-only'], cwd=REPO_ROOT)
except FileNotFoundError:
return [f"Tool not found: {tool}"]
except subprocess.CalledProcessError:
return [f'Please run python {tool} to fix include guards.']
return []
def CheckIncludeViolations(changed_files: List[str]) -> List[str]:
if sys.platform == 'win32':
return []
tool_rel_path = 'tools/check_include_violations'
tool = os.path.join(REPO_ROOT, tool_rel_path)
# Use fnmatch patterns
files_to_check_list = filter_files(
changed_files, files_to_check=['include/*.h', tool_rel_path])
if not files_to_check_list:
return []
tool_changed = tool_rel_path in [os.path.normpath(f) for f in changed_files]
source_files_changed = any(f != tool_rel_path for f in files_to_check_list)
if not tool_changed and not source_files_changed:
return []
try:
run_command([sys.executable, tool], cwd=REPO_ROOT)
except FileNotFoundError:
return [f"Tool not found: {tool}"]
except subprocess.CalledProcessError:
return [f'{tool} failed.']
return []
def CheckIncludePaths(changed_files: List[str]) -> List[str]:
# Uses fnmatch patterns
files_to_check_list = filter_files(
changed_files, files_to_check=['*.h', '*.cc'])
error_lines = []
pattern = re.compile(r'^#include "(.*\.h)"')
for f_path in files_to_check_list:
content = read_file_content(f_path)
if not content:
continue
for i, line in enumerate(content):
match = pattern.search(line)
if not match:
continue
if '// no-include-violation-check' in line:
continue
inc_hdr = match.group(1)
# Normalize path separators for comparison
norm_inc_hdr = os.path.normpath(inc_hdr)
# Check if it starts with 'include/perfetto' or 'include\perfetto'
if norm_inc_hdr.startswith(os.path.join('include', 'perfetto')):
error_lines.append(
f' {f_path}:{i+1}: Redundant "include/" in #include path'
f' "{inc_hdr}"')
# Check for relative paths (no directory separator in the normalized path)
# Exclude paths already starting with 'perfetto/' which is allowed
# relative to include/
has_no_os_sep = os.path.sep not in norm_inc_hdr
has_no_fwd_slash = '/' not in norm_inc_hdr
does_not_start_with_perfetto = not norm_inc_hdr.startswith('perfetto/')
if has_no_os_sep and has_no_fwd_slash and does_not_start_with_perfetto:
# Also allow includes relative to 'src/' for internal headers
if not norm_inc_hdr.startswith('src/'):
error_lines.append(
f' {f_path}:{i+1}: relative #include "{inc_hdr}" not allowed, '
f'use full path from include dir (perfetto/...) or project root '
f'(src/...) ')
return [] if not error_lines else [
'Invalid #include paths detected:\n' + '\n'.join(error_lines)
]
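# Illustrative #include lines and how CheckIncludePaths treats them (assumed
# examples, not taken from real sources):
#   #include "include/perfetto/base/build_config.h"  -> redundant "include/"
#   #include "utils.h"                        -> relative include, rejected
#   #include "perfetto/base/build_config.h"   -> OK (rooted at include/)
#   #include "src/base/utils.h"               -> OK (rooted at project root)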
def CheckProtoComments(changed_files: List[str]) -> List[str]:
if sys.platform == 'win32':
return []
tool_rel_path = 'tools/check_proto_comments'
tool = os.path.join(REPO_ROOT, tool_rel_path)
# Use fnmatch patterns
files_to_check_list = filter_files(
changed_files, files_to_check=['protos/perfetto/*.proto', tool_rel_path])
if not files_to_check_list:
return []
tool_changed = tool_rel_path in [os.path.normpath(f) for f in changed_files]
source_files_changed = any(f != tool_rel_path for f in files_to_check_list)
if not tool_changed and not source_files_changed:
return []
try:
run_command([sys.executable, tool], cwd=REPO_ROOT)
except FileNotFoundError:
return [f"Tool not found: {tool}"]
except subprocess.CalledProcessError:
return [f'{tool} failed']
return []
def CheckBuild(changed_files: List[str]) -> List[str]:
if sys.platform == 'win32':
return []
tool_rel_path = 'tools/gen_bazel'
tool = os.path.join(REPO_ROOT, tool_rel_path)
files_to_check_list = filter_files(
changed_files,
files_to_check=['*BUILD.gn', '*.gni', 'BUILD.extras', tool_rel_path])
if not files_to_check_list:
return []
tool_changed = tool_rel_path in [os.path.normpath(f) for f in changed_files]
source_files_changed = any(f != tool_rel_path for f in files_to_check_list)
if not tool_changed and not source_files_changed:
return []
try:
# Run the tool relative to the repo root
run_command([sys.executable, tool, '--check-only'], cwd=REPO_ROOT)
except FileNotFoundError:
return [f"Tool not found: {tool}"]
except subprocess.CalledProcessError:
return [
f'Bazel BUILD(s) are out of date. Run python {tool} to update them.'
]
return []
def CheckAndroidBlueprint(changed_files: List[str]) -> List[str]:
if sys.platform == 'win32':
return []
tool_rel_path = 'tools/gen_android_bp'
tool = os.path.join(REPO_ROOT, tool_rel_path)
# Use fnmatch patterns
files_to_check_list = filter_files(
changed_files,
files_to_check=['*BUILD.gn', '*.gni', tool_rel_path],
files_to_skip=['src/trace_processor/perfetto_sql/stdlib/chrome/BUILD.gn'])
if not files_to_check_list:
return []
tool_changed = tool_rel_path in [os.path.normpath(f) for f in changed_files]
source_files_changed = any(f != tool_rel_path for f in files_to_check_list)
if not tool_changed and not source_files_changed:
return []
try:
run_command([sys.executable, tool, '--check-only'], cwd=REPO_ROOT)
except FileNotFoundError:
return [f"Tool not found: {tool}"]
except subprocess.CalledProcessError:
return [
f'Android build files are out of date. Run python {tool} to update '
'them.'
]
return []
def CheckBinaryDescriptors(changed_files: List[str]) -> List[str]:
if sys.platform == 'win32':
return []
tool_rel_path = 'tools/gen_binary_descriptors'
tool = os.path.join(REPO_ROOT, tool_rel_path)
# Use fnmatch patterns
files_to_check_list = filter_files(
changed_files,
files_to_check=['protos/perfetto/*.proto', '*.h', tool_rel_path])
if not files_to_check_list:
return []
tool_changed = tool_rel_path in [os.path.normpath(f) for f in changed_files]
source_files_changed = any(f != tool_rel_path for f in files_to_check_list)
if not tool_changed and not source_files_changed:
return []
try:
run_command([sys.executable, tool, '--check-only'], cwd=REPO_ROOT)
except FileNotFoundError:
return [f"Tool not found: {tool}"]
except subprocess.CalledProcessError:
return [f'Please run python {tool} to update binary descriptors.']
return []
def CheckMergedTraceConfigProto(changed_files: List[str]) -> List[str]:
if sys.platform == 'win32':
return []
tool_rel_path = 'tools/gen_merged_protos'
tool = os.path.join(REPO_ROOT, tool_rel_path)
# Use fnmatch patterns
files_to_check_list = filter_files(
changed_files, files_to_check=['protos/perfetto/*.proto', tool_rel_path])
if not files_to_check_list:
return []
tool_changed = tool_rel_path in [os.path.normpath(f) for f in changed_files]
source_files_changed = any(f != tool_rel_path for f in files_to_check_list)
if not tool_changed and not source_files_changed:
return []
try:
run_command([sys.executable, tool, '--check-only'], cwd=REPO_ROOT)
except FileNotFoundError:
return [f"Tool not found: {tool}"]
except subprocess.CalledProcessError:
return [
'perfetto_config.proto or perfetto_trace.proto is out of ' +
f'date. Please run python {tool} to update it.'
]
return []
def CheckProtoEventList(changed_files: List[str],
merge_base: Optional[str]) -> List[str]:
target_file = 'src/tools/ftrace_proto_gen/event_list'
norm_target_file = os.path.normpath(target_file)
if norm_target_file not in [os.path.normpath(f) for f in changed_files]:
return []
if not merge_base:
print(
f"Warning: Cannot check {target_file} history accurately without "
f"merge-base.",
file=sys.stderr)
return [] # Skip check if we don't have history baseline
old_content = get_git_file_content(merge_base, target_file)
new_content = read_file_content(target_file)
if old_content is None or new_content is None:
return [f"Error reading old or new content for {target_file}"]
# Allow adding lines or replacing lines with 'removed...'
errors = []
# Use zip_longest to handle lines added/removed at the end
from itertools import zip_longest
for i, (old_line_raw,
new_line_raw) in enumerate(zip_longest(old_content, new_content)):
old_line = old_line_raw.rstrip('\n\r') if old_line_raw is not None else None
new_line = new_line_raw.rstrip('\n\r') if new_line_raw is not None else None
if old_line == new_line:
continue
elif old_line is None and new_line is not None:
# Line added - this is okay
continue
elif old_line is not None and new_line is None:
# Line removed - check if it should have been replaced
if old_line.strip() and not old_line.strip().startswith(
'#'): # Ignore empty/comment lines removal
errors.append(
f"{target_file}:{i+1}: Line removed without being replaced by "
f"'removed': '{old_line}'")
elif old_line is not None and new_line is not None:
# Line changed - check if replacement is valid
if not new_line.strip().startswith('removed'):
errors.append(
f"{target_file}:{i+1}: Invalid change. Only appending or replacing "
f"with 'removed' is allowed. Got '{new_line}' from '{old_line}'")
if errors:
return [
        f'{target_file} only supports two kinds of changes: '
        f'appending new lines, and replacing an existing line with one '
        f'starting with "removed".\n' + "\n".join(errors)
]
return []
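# Illustrative edits to the event_list and how CheckProtoEventList treats them
# (hypothetical entries):
#   a new line appended at the end                    -> OK
#   'group/some_event' replaced with 'removed'        -> OK
#   'group/some_event' changed to anything else       -> rejected
#   a non-comment, non-empty line deleted at the end  -> rejected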
def CheckBannedCpp(changed_files: List[str]) -> List[str]:
bad_cpp: List[Tuple[str, str]] = [
(r'\bstd::stoi\b',
'std::stoi throws exceptions prefer base::StringToInt32()'),
      (r'\bstd::stol\b',
       'std::stol throws exceptions prefer base::StringToInt32()'),
      (r'\bstd::stoul\b',
       'std::stoul throws exceptions prefer base::StringToUint32()'),
      (r'\bstd::stoll\b',
       'std::stoll throws exceptions prefer base::StringToInt64()'),
(r'\bstd::stoull\b',
'std::stoull throws exceptions prefer base::StringToUint64()'),
(r'\bstd::stof\b',
'std::stof throws exceptions prefer base::StringToDouble()'),
(r'\bstd::stod\b',
'std::stod throws exceptions prefer base::StringToDouble()'),
(r'\bstd::stold\b',
'std::stold throws exceptions prefer base::StringToDouble()'),
(r'\bstrncpy\b',
'strncpy does not null-terminate if src > dst. Use base::StringCopy'),
(r'[(=]\s*snprintf\(',
'snprintf can return > dst_size. Use base::SprintfTrunc'),
(r'//.*\bDNS\b',
'// DNS (Do Not Ship) found. Did you mean to remove some testing code?'),
(r'\bPERFETTO_EINTR\(close\(',
'close(2) must not be retried on EINTR on Linux and other OSes '
'that we run on, as the fd will be closed.'),
(r'^#include <inttypes.h>', 'Use <cinttypes> rather than <inttypes.h>. ' +
'See https://github.com/google/perfetto/issues/146'),
]
# Use fnmatch patterns for filtering files
files_to_check_list = filter_files(
changed_files, files_to_check=['*.h', '*.cc'])
errors = []
comment_pattern = re.compile(r'^\s*//') # Regex is fine here
for f_path in files_to_check_list:
content = read_file_content(f_path)
if content:
for i, line in enumerate(content):
if comment_pattern.search(line):
continue # Skip comments
for regex_str, message in bad_cpp:
# Use re.search for checking content
if re.search(regex_str, line):
errors.append(f'Banned pattern:\n {f_path}:{i+1}: {message}')
return errors
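# Example of a line CheckBannedCpp would flag (illustrative):
#   int value = std::stoi(input);  // -> "std::stoi throws exceptions ..."
# Lines that are themselves // comments are skipped before matching.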
def CheckSqlModules(changed_files: List[str]) -> List[str]:
if sys.platform == 'win32':
return []
tool_rel_path = 'tools/check_sql_modules.py'
tool = os.path.join(REPO_ROOT, tool_rel_path)
# Use fnmatch patterns
files_to_check_list = filter_files(
changed_files,
files_to_check=[
'src/trace_processor/perfetto_sql/stdlib/*.sql', tool_rel_path
])
if not files_to_check_list:
return []
tool_changed = tool_rel_path in [os.path.normpath(f) for f in changed_files]
source_files_changed = any(f != tool_rel_path for f in files_to_check_list)
if not tool_changed and not source_files_changed:
return []
try:
run_command([sys.executable, tool], cwd=REPO_ROOT)
except FileNotFoundError:
return [f"Tool not found: {tool}"]
except subprocess.CalledProcessError:
return [f'{tool} failed']
return []
def CheckSqlMetrics(changed_files: List[str]) -> List[str]:
if sys.platform == 'win32':
return []
tool_rel_path = 'tools/check_sql_metrics.py'
tool = os.path.join(REPO_ROOT, tool_rel_path)
# Use fnmatch patterns
files_to_check_list = filter_files(
changed_files,
files_to_check=['src/trace_processor/metrics/*.sql', tool_rel_path])
if not files_to_check_list:
return []
tool_changed = tool_rel_path in [os.path.normpath(f) for f in changed_files]
source_files_changed = any(f != tool_rel_path for f in files_to_check_list)
if not tool_changed and not source_files_changed:
return []
try:
run_command([sys.executable, tool], cwd=REPO_ROOT)
except FileNotFoundError:
return [f"Tool not found: {tool}"]
except subprocess.CalledProcessError:
return [f'{tool} failed']
return []
def CheckTestData(_: List[str]) -> List[str]:
tool_rel_path = 'tools/test_data'
tool = os.path.join(REPO_ROOT, tool_rel_path)
try:
# Assume tool needs execution permissions or use sys.executable
run_command([sys.executable, tool, 'status', '--quiet'], cwd=REPO_ROOT)
except FileNotFoundError:
# Try without sys.executable if it's meant to be directly executable
try:
run_command([tool, 'status', '--quiet'], cwd=REPO_ROOT)
except FileNotFoundError:
return [f"Tool not found: {tool}"]
except subprocess.CalledProcessError:
# Fall through to error message
pass
else:
return [] # Succeeded without sys.executable
except subprocess.CalledProcessError:
# Fall through to error message
pass
else: # If try block succeeded
return []
# If we reached here, it failed
return [
f'//test/data is out of sync. Run `{tool} status` for more.\n'
f'If you rebaselined UI tests or added a new test trace, run: `{tool} '
f'upload`.\n'
f'Otherwise run `tools/install-build-deps` or `{tool} download '
f'--overwrite` to sync local test_data'
]
def CheckChromeStdlib(changed_files: List[str]) -> List[str]:
# Use fnmatch patterns
stdlib_paths_patterns = ("src/trace_processor/perfetto_sql/stdlib/chrome/*",
"test/data/chrome/*",
"test/trace_processor/diff_tests/stdlib/chrome/*")
chrome_stdlib_files = filter_files(
changed_files, files_to_check=stdlib_paths_patterns)
if not chrome_stdlib_files:
return []
# Check commit message for exceptions
commit_message = get_commit_message()
# Use regex for more robust check in commit message
if re.search(r'COPYBARA_IMPORT', commit_message, re.IGNORECASE):
print(
"INFO: COPYBARA_IMPORT detected, skipping CheckChromeStdlib.",
file=sys.stderr)
return []
if re.search(r'CHROME_STDLIB_MANUAL_ROLL', commit_message, re.IGNORECASE):
print(
"INFO: CHROME_STDLIB_MANUAL_ROLL detected, skipping CheckChromeStdlib.",
file=sys.stderr)
return []
# If files changed and no exception found, return error
# Use relative paths from patterns for message
paths_str = ', '.join(p.replace('*', '') for p in stdlib_paths_patterns)
message = (
f'Files under {paths_str} '
'are rolled from the Chromium repository by a '
'Copybara service.\nYou should not modify these in '
'the Perfetto repository, please make your changes '
'in Chromium instead.\n'
'Affected files:\n' +
"\n".join([f" - {f}" for f in chrome_stdlib_files]) + "\n"
'If you want to do a manual roll, you must specify '
'CHROME_STDLIB_MANUAL_ROLL=<reason> in the CL description/commit message.'
)
return [message]
def CheckAmalgamatedPythonTools(changed_files: List[str]) -> List[str]:
if sys.platform == 'win32':
return []
tool_rel_path = 'tools/gen_amalgamated_python_tools'
tool = os.path.join(REPO_ROOT, tool_rel_path)
# Use fnmatch patterns
files_to_check_list = filter_files(
changed_files, files_to_check=['python/*', tool_rel_path])
if not files_to_check_list:
return []
tool_changed = tool_rel_path in [os.path.normpath(f) for f in changed_files]
source_files_changed = any(f != tool_rel_path for f in files_to_check_list)
if not tool_changed and not source_files_changed:
return []
try:
run_command([sys.executable, tool, '--check-only'], cwd=REPO_ROOT)
except FileNotFoundError:
return [f"Tool not found: {tool}"]
except subprocess.CalledProcessError:
return [
f'amalgamated python tools/ are out of date. Run python {tool} to '
f'update them.'
]
return []
def CheckAbsolutePathsInGn(changed_files: List[str]) -> List[str]:
# Use fnmatch patterns for filtering
files_to_check_list = filter_files(
changed_files,
files_to_check=['*.gni', '*.gn'], # Simplified include
files_to_skip=[
'.gn', # Skip root .gn file by name
'gn/*', # Skip files in root gn/ directory
'buildtools/*', # Skip files in root buildtools/ directory
])
error_lines = []
# Pattern to find "//" inside quotes. Regex is fine here.
abs_path_pattern = re.compile(r'"//[^"]')
nogncheck_pattern = re.compile(r'#\s*nogncheck', re.IGNORECASE)
comment_pattern = re.compile(r'^\s*#')
for f_path in files_to_check_list:
content = read_file_content(f_path)
if content:
for i, line in enumerate(content):
if nogncheck_pattern.search(line) or comment_pattern.match(line):
continue # Skip comments and '# nogncheck' lines
if abs_path_pattern.search(line):
error_lines.append(f' {f_path}:{i+1}: {line.strip()}')
if not error_lines:
return []
return [
'Use relative paths in GN rather than absolute ("//..."): Check these '
'lines:\n' + '\n'.join(error_lines)
]
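# Illustrative GN lines and how CheckAbsolutePathsInGn treats them (assumed
# examples):
#   deps = [ "//gn:default_deps" ]      -> flagged (absolute "//" path)
#   deps = [ "../../gn:default_deps" ]  -> OK (relative path)
#   # "//this is inside a GN comment"   -> skipped (comment line)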
def CheckUiImports(changed_files: List[str]) -> List[str]:
tool_rel_path = 'tools/check_imports'
tool = os.path.join(REPO_ROOT, tool_rel_path)
# Use fnmatch patterns
files_to_check_list = filter_files(
changed_files, files_to_check=['ui/*', tool_rel_path])
if not files_to_check_list:
return []
try:
run_command([sys.executable, tool], cwd=REPO_ROOT, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError:
return [f'Import violations detected in ui/ sources. See {tool_rel_path}']
return []
def CheckUiRatchet(changed_files: List[str]) -> List[str]:
tool_rel_path = 'tools/check_ratchet'
tool = os.path.join(REPO_ROOT, tool_rel_path)
# Use fnmatch patterns
files_to_check_list = filter_files(
changed_files, files_to_check=['ui/*', tool_rel_path])
if not files_to_check_list:
return []
try:
run_command([sys.executable, tool], cwd=REPO_ROOT, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError:
return [f'Bad patterns detected in ui/ sources. See {tool_rel_path}']
return []
def main():
# 1. Determine files to check
merge_base = get_merge_base()
if not merge_base:
print("No upstream branch found. Ensure that an upstream tracking branch "
"has been set.")
sys.exit(1)
changed_files = get_changed_files(merge_base)
if not changed_files:
print("No changed files detected relative to upstream or in working dir.")
sys.exit(0)
print(f"Checking {len(changed_files)} files...") # Optional verbosity
def long_line_sources_filter(files):
return filter_files(
files,
files_to_check=['*'], # Check all initially
files_to_skip=[
'Android.bp',
"buildtools/grpc/BUILD.gn",
'*.json',
'*.sql',
'*.out',
'test/trace_processor/*/tests*',
'*BUILD',
'WORKSPACE',
'*/Makefile',
'*/perfetto_build_flags.h',
"infra/luci/*",
"ui/*.[jt]s",
"ui/pnpm-lock.yaml",
'tools/install-build-deps',
])
# 3. Run all checks
all_results = []
# Use RunAndReportIfLong for all checks
checks_to_run = [
(CheckCodeFormatted, []),
(CheckDoNotSubmit, [changed_files]),
(CheckChangeHasNoTabs, [changed_files]),
(CheckIncludeGuards, [changed_files]),
(CheckIncludeViolations, [changed_files]),
(CheckIncludePaths, [changed_files]),
(CheckProtoComments, [changed_files]),
(CheckBuild, [changed_files]),
(CheckAndroidBlueprint, [changed_files]),
(CheckBinaryDescriptors, [changed_files]),
(CheckMergedTraceConfigProto, [changed_files]),
(CheckProtoEventList, [changed_files, merge_base]), # Needs merge_base
(CheckBannedCpp, [changed_files]),
(CheckSqlModules, [changed_files]),
(CheckSqlMetrics, [changed_files]),
      (CheckTestData, [changed_files]),  # File list is unused; passed anyway
(CheckAmalgamatedPythonTools, [changed_files]),
(CheckChromeStdlib, [changed_files]), # Checks commit msg
(CheckAbsolutePathsInGn, [changed_files]),
(CheckUiImports, [changed_files]),
(CheckUiRatchet, [changed_files]),
]
for func, args in checks_to_run:
all_results.extend(RunAndReportIfLong(func, *args))
# 4. Report results
if all_results:
print("\n--- Presubmit Errors Found ---", file=sys.stderr)
# Filter out potential None results if any check returns None on error
for error in filter(None, all_results):
print(f"- {error}", file=sys.stderr)
print("\n--- Presubmit Failed ---", file=sys.stderr)
sys.exit(1)
else:
print("\n--- Presubmit Succeeded ---")
sys.exit(0)
if __name__ == '__main__':
# Ensure we run from repo root for consistent paths
os.chdir(REPO_ROOT)
main()