blob: 9f820c57d4bb8810e20447339213ae926500bacb [file]
#!/usr/bin/env python3
# Copyright (C) 2025 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Unified stdlib parser library for Perfetto SQL standard library.
This module provides functions to parse stdlib SQL files and generate
structured output for consumption by various tools.
"""
import os
from pathlib import Path
from typing import List, Tuple, Optional
from python.generators.sql_processing.docs_parse import DocParseOptions, ParsedModule, parse_file
from python.generators.sql_processing.utils import is_internal
from python.generators.sql_processing.stdlib_tags import get_tags, get_table_importance
from python.perfetto.trace_data_checks import check_to_query, MODULE_DATA_CHECK_SQL, TABLE_DATA_CHECK_SQL
# Directory four levels above this file's absolute path. find_stdlib_path()
# joins "src/trace_processor/perfetto_sql/stdlib" onto it, i.e. it is treated
# as the root of the repository checkout.
ROOT_DIR = os.path.dirname(
    os.path.dirname(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
def find_stdlib_path():
    """Locate the PerfettoSQL stdlib directory inside this repository.

    Returns:
        A ``Path`` pointing at ``src/trace_processor/perfetto_sql/stdlib``
        under ``ROOT_DIR``.

    Raises:
        ValueError: If that location does not exist on disk.
    """
    candidate = os.path.join(
        ROOT_DIR, "src", "trace_processor", "perfetto_sql", "stdlib")
    if os.path.exists(candidate):
        return Path(candidate)
    raise ValueError(f"stdlib path not found: {candidate}")
def get_module_name(rel_path: str) -> str:
    """Derive the dotted module name from a stdlib-relative SQL file path.

    A trailing ".sql" is dropped (if present) and every OS path separator
    is turned into a dot.

    Args:
        rel_path: Relative path from stdlib root (e.g., "slices/stack.sql")

    Returns:
        Module name (e.g., "slices.stack")
    """
    # Splitting on the separator and re-joining with dots is the same as
    # replacing each separator by a dot.
    return '.'.join(rel_path.removesuffix('.sql').split(os.sep))
def parse_all_modules(
stdlib_path: str,
include_internal: bool = False,
name_filter: Optional[str] = None
) -> List[Tuple[str, str, str, ParsedModule]]:
"""Parse all SQL modules in the stdlib.
Args:
stdlib_path: Path to stdlib directory
include_internal: Whether to include internal (private) artifacts
name_filter: Optional regex to filter module names
Returns:
List of tuples: (abs_path, rel_path, module_name, parsed_module)
"""
import re
modules = []
for root, _, files in os.walk(stdlib_path, topdown=True):
for f in files:
abs_path = os.path.join(root, f)
if not abs_path.endswith(".sql"):
continue
rel_path = os.path.relpath(abs_path, stdlib_path)
module_name = get_module_name(rel_path)
# Apply name filter if provided
if name_filter is not None:
try:
pattern = re.compile(name_filter)
except re.error as e:
raise ValueError(f"Invalid regex pattern '{name_filter}': {e}")
if not pattern.match(rel_path):
continue
# Read and parse the file
with open(abs_path, 'r', encoding='utf-8') as f:
sql = f.read()
parsed = parse_file(
rel_path,
sql,
options=DocParseOptions(
enforce_every_column_set_is_documented=True,
include_internal=include_internal),
)
# Some modules (i.e. `deprecated`) should not generate output
if not parsed:
continue
modules.append((abs_path, rel_path, module_name, parsed))
return modules
def format_entities(modules: List[Tuple[str, str, str, ParsedModule]]) -> dict:
    """Build the entity map used for dependency checking.

    For every parsed module this records the names of its tables/views,
    functions, table functions and macros (in that order), each flagged via
    ``is_internal``, together with the modules it includes. A reverse index
    from entity name to owning module is built alongside; if the same name
    occurs more than once, the last occurrence wins.

    Args:
        modules: Tuples of (abs_path, rel_path, module_name, parsed_module);
            only the last two elements are used.

    Returns:
        A dict of the following shape:
        {
          "modules": {
            "slices.stack": {
              "entities": [
                {"name": "stack_from_stack_profile_callsite", "is_internal": false},
                {"name": "_intervals_flatten", "is_internal": true}
              ],
              "includes": ["slices.with_context", "graphs.search"]
            },
            ...
          },
          "entity_to_module": {
            "stack_from_stack_profile_callsite": "slices.stack",
            "_intervals_flatten": "slices.stack",
            ...
          }
        }
    """
    per_module = {}
    owner_of = {}

    for _abs_path, _rel_path, module_name, parsed in modules:
        # The group order fixes the order of the emitted "entities" list.
        artifact_groups = (
            parsed.table_views,
            parsed.functions,
            parsed.table_functions,
            parsed.macros,
        )

        described = []
        for group in artifact_groups:
            for artifact in group:
                described.append({
                    "name": artifact.name,
                    "is_internal": is_internal(artifact.name),
                })
                owner_of[artifact.name] = module_name

        per_module[module_name] = {
            "entities": described,
            # inc.module already holds the full module name, e.g.
            # inc.module = "android.suspend" while inc.package = "android".
            "includes": [inc.module for inc in parsed.includes],
        }

    return {"modules": per_module, "entity_to_module": owner_of}
def format_metadata(modules: List[Tuple[str, str, str, ParsedModule]]) -> dict:
    """Emit the per-module metadata that the TP table functions do not expose.

    The TP already exposes module names, packages, table/function/macro
    names, descriptions, types, columns and args. What is produced here is
    the complementary information living outside the SQL syntax:
      - module level: tags, includes (INCLUDE PERFETTO MODULE directives)
        and the data-availability check SQL
      - table level: importance and the data-availability check SQL

    Args:
        modules: Tuples of (abs_path, rel_path, module_name, parsed_module);
            only the last two elements are used.

    Returns:
        A dict keyed by module name (so the UI can look up by key):
        {
          "android.memory": {
            "tags": ["android"],
            "includes": ["android.memory.heap"],
            "data_check_sql": "SELECT EXISTS(...) AS has_data",  // null if absent
            "tables": {                                          // omitted if empty
              "android_heap_profile_allocation": {
                "importance": "high",                             // null if absent
                "data_check_sql": "SELECT EXISTS(...) AS has_data"  // null if absent
              }
            }
          },
          ...
        }
    """
    metadata = {}

    for _abs_path, _rel_path, module_name, parsed in modules:
        entry = {
            'tags': get_tags(module_name),
            'includes': [inc.module for inc in parsed.includes],
        }
        if module_name in MODULE_DATA_CHECK_SQL:
            entry['data_check_sql'] = check_to_query(
                MODULE_DATA_CHECK_SQL[module_name])
        else:
            entry['data_check_sql'] = None

        table_meta = {}
        for table in parsed.table_views:
            importance = get_table_importance(table.name)
            if table.name in TABLE_DATA_CHECK_SQL:
                check_sql = check_to_query(TABLE_DATA_CHECK_SQL[table.name])
            else:
                check_sql = None

            # Tables with neither piece of metadata are left out entirely.
            if importance is None and check_sql is None:
                continue
            table_meta[table.name] = {
                'importance': importance,
                'data_check_sql': check_sql,
            }

        # The 'tables' key is only present when at least one table has data.
        if table_meta:
            entry['tables'] = table_meta

        metadata[module_name] = entry

    return metadata