blob: 76860557c0df18e39bce1ab8932cf0f23859f912 [file] [log] [blame]
#!/usr/bin/env python3
# Copyright (C) 2022 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABC
from dataclasses import dataclass
import re
import sys
from typing import Dict, List, Optional, Set, NamedTuple
from python.generators.sql_processing.docs_extractor import DocsExtractor
from python.generators.sql_processing.utils import ObjKind
from python.generators.sql_processing.utils import ALLOWED_PREFIXES
from python.generators.sql_processing.utils import OBJECT_NAME_ALLOWLIST
from python.generators.sql_processing.utils import COLUMN_ANNOTATION_PATTERN
from python.generators.sql_processing.utils import ANY_PATTERN
from python.generators.sql_processing.utils import ARG_DEFINITION_PATTERN
from python.generators.sql_processing.utils import ARG_ANNOTATION_PATTERN
def _is_internal(name: str) -> bool:
return re.match(r'^_.*', name, re.IGNORECASE) is not None
def _is_snake_case(s: str) -> bool:
return re.fullmatch(r'^[a-z_0-9]*$', s) is not None
def parse_comment(comment: str) -> str:
  """Flattens a multi-line SQL comment into a single line of text.

  Every line is stripped of surrounding whitespace, of its leading dashes and
  of the whitespace following them; the lines are then joined with single
  spaces. For example, the two lines "-- Foo" and "-- bar." become "Foo bar.".
  """
  cleaned_lines = []
  for raw_line in comment.strip().split('\n'):
    # Note: lstrip() treats its argument as a set of characters, so every
    # leading dash is removed, not just the first two.
    without_dashes = raw_line.strip().lstrip('--')
    cleaned_lines.append(without_dashes.lstrip())
  return ' '.join(cleaned_lines)
def get_module_prefix_error(name: str, path: str, module: str) -> Optional[str]:
  """Checks that `name` carries a prefix permitted for its module and path.

  The prefix is the lowercased part of `name` before the first underscore.

  Returns:
    None if the name is acceptable, otherwise a human-readable error message.
  """
  prefix = name.lower().split('_')[0]
  has_module_prefix = prefix == module

  # These modules are the exception: their objects must NOT repeat the module
  # name as a prefix, and any other prefix is accepted.
  if module in ["common", "prelude", "deprecated"]:
    if not has_module_prefix:
      return None
    return (f'Names of tables/views/functions in the "{module}" module '
            f'should not start with {module}')

  # Everywhere else the module name itself is always a valid prefix.
  if has_module_prefix:
    return None

  # Collect the extra prefixes that are permitted for this path, accepting the
  # name as soon as one of them matches.
  allowed_prefixes = [module]
  for path_prefix, allowed_name_prefix in ALLOWED_PREFIXES.items():
    if not path.startswith(path_prefix):
      continue
    if prefix == allowed_name_prefix:
      return None
    allowed_prefixes.append(allowed_name_prefix)

  # Individual object names can be exempted per file.
  if path in OBJECT_NAME_ALLOWLIST and name in OBJECT_NAME_ALLOWLIST[path]:
    return None

  return (
      f'Names of tables/views/functions at path "{path}" should be prefixed '
      f'with one of following names: {", ".join(allowed_prefixes)}')
class Arg(NamedTuple):
  """The type and the description of a single documented argument or column."""
  # TODO(b/307926059): the type is missing on old-style documentation for
  # tables. Make it "str" after stdlib is migrated.
  type: Optional[str]
  description: str
class AbstractDocParser(ABC):
  """Base class holding the validation helpers shared by all doc parsers.

  Subclasses assign `self.name` from the parsed statement and then call the
  `_parse_*` helpers below. Problems are not raised: they are appended to
  `self.errors` as strings via `_error`.
  """

  @dataclass
  class Column:
    pass

  def __init__(self, path: str, module: str):
    self.path = path
    self.module = module
    # Assigned by the subclass' parse() before any helper is used.
    self.name: Optional[str] = None
    self.errors: List[str] = []

  def _parse_name(self, upper: bool = False):
    """Checks the module prefix of `self.name` and returns the stripped name.

    `upper` is not used by this method.
    """
    assert self.name
    assert isinstance(self.name, str)
    module_prefix_error = get_module_prefix_error(self.name, self.path,
                                                  self.module)
    if module_prefix_error is not None:
      self._error(module_prefix_error)
    return self.name.strip()

  def _parse_desc_not_empty(self, desc: str):
    """Returns `desc` stripped, recording an error if it is empty or None."""
    if not desc:
      self._error('Description of the table/view/function/macro is missing')
      # `desc` can be None as well as ''. Return '' for both instead of
      # crashing on None right after the problem has been reported.
      return ''
    return desc.strip()

  def _validate_only_contains_annotations(self,
                                          ans: List[DocsExtractor.Annotation],
                                          ans_types: Set[str]):
    """Records an error for each annotation key of `ans` not in `ans_types`."""
    used_ans_types = {a.key for a in ans}
    # Sorted so that the order of the reported errors does not depend on set
    # iteration order.
    for unknown_type in sorted(used_ans_types.difference(ans_types)):
      self._error(f'Unknown documentation annotation {unknown_type}')

  def _parse_columns(self, ans: List[DocsExtractor.Annotation],
                     schema: Optional[str]) -> Dict[str, Arg]:
    """Combines `@column` annotations with the columns declared in `schema`.

    Without a schema, the annotations alone are returned (with a None type).
    With a schema, the declared columns are returned; each of them has to be
    described exactly once, either by a comment inside the schema or by an
    `@column` annotation. Violations are recorded as errors.
    """
    column_annotations = {}
    for t in ans:
      if t.key != '@column':
        continue
      m = re.match(COLUMN_ANNOTATION_PATTERN, t.value)
      if not m:
        self._error(f'@column annotation value {t.value} does not match '
                    f'pattern {COLUMN_ANNOTATION_PATTERN}')
        continue
      column_annotations[m.group(1)] = Arg(None, m.group(2).strip())

    if not schema:
      # If we don't have schema, we have to accept annotations as the source of
      # truth.
      return column_annotations

    columns = self._parse_args_definition(schema)

    for column in columns:
      inline_comment = columns[column].description
      if not inline_comment and column not in column_annotations:
        self._error(f'Column "{column}" is missing a description. Please add a '
                    'comment in front of the column definition')
        continue

      if column not in column_annotations:
        continue
      annotation = column_annotations[column].description
      if inline_comment and annotation:
        self._error(f'Column "{column}" is documented twice. Please remove the '
                    '@column annotation')
      if not inline_comment and annotation:
        # Absorb old-style annotations.
        columns[column] = Arg(columns[column].type, annotation)

    # Check that the annotations match existing columns.
    for annotation in column_annotations:
      if annotation not in columns:
        self._error(f'Column "{annotation}" is documented but does not exist '
                    'in table definition')
    return columns

  def _parse_args(self, ans: List[DocsExtractor.Annotation],
                  sql_args_str: str) -> Dict[str, Arg]:
    """Combines `@arg` annotations with the args declared in `sql_args_str`.

    Returns the declared args. Each of them has to be described exactly once,
    either by a comment inside the definition or by an `@arg` annotation;
    violations, and annotations for args that are not declared, are recorded
    as errors.
    """
    args = self._parse_args_definition(sql_args_str)

    arg_annotations = {}
    for an in ans:
      if an.key != '@arg':
        continue
      m = re.match(ARG_ANNOTATION_PATTERN, an.value)
      if m is None:
        self._error(f'Expected arg documentation "{an.value}" to match pattern '
                    f'{ARG_ANNOTATION_PATTERN}')
        continue
      arg_annotations[m.group(1)] = Arg(m.group(2), m.group(3).strip())

    for arg in args:
      if not args[arg].description and arg not in arg_annotations:
        self._error(f'Arg "{arg}" is missing a description. '
                    'Please add a comment in front of the arg definition.')
      if args[arg].description and arg in arg_annotations:
        self._error(f'Arg "{arg}" is documented twice. '
                    'Please remove the @arg annotation')
      if not args[arg].description and arg in arg_annotations:
        # Absorb old-style annotations.
        # TODO(b/307926059): Remove it once stdlib is migrated.
        args[arg] = Arg(args[arg].type, arg_annotations[arg].description)

    for arg in arg_annotations:
      if arg not in args:
        self._error(
            f'Arg "{arg}" is documented but not found in function definition.')
    return args

  # Parse function argument definition list or a table schema, e.g.
  # arg1 INT, arg2 STRING, including their comments.
  def _parse_args_definition(self, args_str: str) -> Dict[str, Arg]:
    """Parses `args_str` into a dict mapping each name to its `Arg`.

    Definitions are consumed one at a time from the front of the string. If
    the remaining text does not match the expected format, an error is
    recorded and the definitions parsed so far are returned.
    """
    result = {}
    remaining_args = args_str.strip()
    while remaining_args:
      m = re.match(fr'^{ARG_DEFINITION_PATTERN}({ANY_PATTERN})', remaining_args)
      if not m:
        # Every literal that interpolates a value needs its own f prefix:
        # adjacent string literals are concatenated, but each one is
        # formatted on its own.
        self._error(f'Expected "{args_str}" to correspond to '
                    '"-- Comment\n arg_name TYPE" format '
                    f'({ARG_DEFINITION_PATTERN})')
        return result
      groups = m.groups()
      # The first group is the comment (None when absent); the last three are
      # the name, the type and the text that is still to be parsed.
      comment = '' if groups[0] is None else parse_comment(groups[0])
      name = groups[-3]
      arg_type = groups[-2]
      result[name] = Arg(arg_type, comment)
      # Strip whitespace and comma and parse the next arg.
      remaining_args = groups[-1].lstrip().lstrip(',').lstrip()

    return result

  def _error(self, error: str):
    """Appends `error`, prefixed with the object name and file path."""
    self.errors.append(
        f'Error while parsing documentation for "{self.name}" in {self.path}: '
        f'{error}')
# eq=False keeps the default identity-based equality and hashing; the decorator
# only generates __init__ (same parameter order as the fields) and __repr__.
@dataclass(eq=False)
class TableOrView:
  """Parsed documentation of a single table or view."""
  # Name of the table or view.
  name: str
  # Kind of the object, as captured from the statement.
  type: str
  # Description of the table or view.
  desc: str
  # Documented columns, keyed by column name.
  cols: Dict[str, Arg]
class TableViewDocParser(AbstractDocParser):
  """Parses documentation for CREATE TABLE and CREATE VIEW statements."""

  def __init__(self, path: str, module: str):
    super().__init__(path, module)

  def parse(self, doc: DocsExtractor.Extract) -> Optional[TableOrView]:
    """Validates one extracted table/view and returns its documentation.

    Returns None for internal (underscore-prefixed) objects. All problems
    found are accumulated in `self.errors`.
    """
    assert doc.obj_kind == ObjKind.table_view

    (or_replace, perfetto_or_virtual, obj_type, self.name,
     schema) = doc.obj_match

    if or_replace is not None:
      self._error(
          f'{obj_type} "{self.name}": CREATE OR REPLACE is not allowed in stdlib '
          f'as standard library modules can only included once. Please just '
          f'use CREATE instead.')

    # Internal objects are not documented.
    if _is_internal(self.name):
      return None

    is_perfetto_table_or_view = (
        perfetto_or_virtual and perfetto_or_virtual.lower() == 'perfetto')
    if is_perfetto_table_or_view and not schema:
      self._error(
          f'{obj_type} "{self.name}": schema is missing for a non-internal stdlib'
          f' perfetto table or view')

    self._validate_only_contains_annotations(doc.annotations, {'@column'})

    # Name, description and columns are checked in this order, which is also
    # the order in which their errors get recorded.
    parsed_name = self._parse_name()
    parsed_desc = self._parse_desc_not_empty(doc.description)
    parsed_cols = self._parse_columns(doc.annotations, schema)
    return TableOrView(
        name=parsed_name,
        type=obj_type,
        desc=parsed_desc,
        cols=parsed_cols,
    )
# eq=False keeps the default identity-based equality and hashing; the decorator
# only generates __init__ (same parameter order as the fields) and __repr__.
@dataclass(eq=False)
class Function:
  """Parsed documentation of a single function."""
  # Name of the function.
  name: str
  # Description of the function.
  desc: str
  # Documented arguments, keyed by argument name.
  args: Dict[str, Arg]
  # Return type, as captured from the statement.
  return_type: str
  # Description of the returned value; None when the statement has no return
  # comment.
  return_desc: Optional[str]
class FunctionDocParser(AbstractDocParser):
  """Parses documentation for CREATE_FUNCTION statements."""

  def __init__(self, path: str, module: str):
    super().__init__(path, module)

  def parse(self, doc: DocsExtractor.Extract) -> Optional[Function]:
    """Validates one extracted function and returns its documentation.

    Returns None for internal (underscore-prefixed) functions. All problems
    found are accumulated in `self.errors`.
    """
    or_replace, self.name, args, ret_comment, ret_type = doc.obj_match

    if or_replace is not None:
      self._error(
          f'Function "{self.name}": CREATE OR REPLACE is not allowed in stdlib '
          f'as standard library modules can only included once. Please just '
          f'use CREATE instead.')

    # Internal functions are not documented.
    if _is_internal(self.name):
      return None

    name = self._parse_name()
    if not _is_snake_case(name):
      self._error(f'Function name "{name}" is not snake_case'
                  f' (should be {name.casefold()})')

    # A missing return comment and an empty one are both reported.
    if ret_comment is None:
      ret_desc = None
    else:
      ret_desc = parse_comment(ret_comment)
    if not ret_desc:
      self._error(f'Function "{name}": return description is missing')

    # The description is checked before the args, which is also the order in
    # which their errors get recorded.
    parsed_desc = self._parse_desc_not_empty(doc.description)
    parsed_args = self._parse_args(doc.annotations, args)
    return Function(
        name=name,
        desc=parsed_desc,
        args=parsed_args,
        return_type=ret_type,
        return_desc=ret_desc,
    )
# eq=False keeps the default identity-based equality and hashing; the decorator
# only generates __init__ (same parameter order as the fields) and __repr__.
@dataclass(eq=False)
class TableFunction:
  """Parsed documentation of a single table function."""
  # Name of the table function.
  name: str
  # Description of the table function.
  desc: str
  # Documented output columns, keyed by column name.
  cols: Dict[str, Arg]
  # Documented arguments, keyed by argument name.
  args: Dict[str, Arg]
class TableFunctionDocParser(AbstractDocParser):
  """Parses documentation for table function statements."""

  def __init__(self, path: str, module: str):
    super().__init__(path, module)

  def parse(self, doc: DocsExtractor.Extract) -> Optional[TableFunction]:
    """Validates one extracted table function and returns its documentation.

    Returns None for internal (underscore-prefixed) functions. All problems
    found are accumulated in `self.errors`.
    """
    # The fourth captured value (the return comment) is not used here.
    or_replace, self.name, args, _ret_comment, columns = doc.obj_match

    if or_replace is not None:
      self._error(
          f'Function "{self.name}": CREATE OR REPLACE is not allowed in stdlib '
          f'as standard library modules can only included once. Please just '
          f'use CREATE instead.')

    # Internal functions are not documented.
    if _is_internal(self.name):
      return None

    allowed_annotations = {'@arg', '@column'}
    self._validate_only_contains_annotations(doc.annotations,
                                             allowed_annotations)

    name = self._parse_name()
    if not _is_snake_case(name):
      self._error(f'Function name "{name}" is not snake_case'
                  f' (should be "{name.casefold()}")')

    # Description, columns and args are checked in this order, which is also
    # the order in which their errors get recorded.
    parsed_desc = self._parse_desc_not_empty(doc.description)
    parsed_cols = self._parse_columns(doc.annotations, columns)
    parsed_args = self._parse_args(doc.annotations, args)
    return TableFunction(
        name=name,
        desc=parsed_desc,
        cols=parsed_cols,
        args=parsed_args,
    )
# eq=False keeps the default identity-based equality and hashing; the decorator
# only generates __init__ (same parameter order as the fields) and __repr__.
@dataclass(eq=False)
class Macro:
  """Parsed documentation of a single macro."""
  # Name of the macro.
  name: str
  # Description of the macro.
  desc: str
  # Description of what the macro returns.
  return_desc: str
  # Return type, as captured from the statement.
  return_type: str
  # Documented arguments, keyed by argument name.
  args: Dict[str, Arg]
class MacroDocParser(AbstractDocParser):
  """Parses documentation for macro statements."""

  def __init__(self, path: str, module: str):
    super().__init__(path, module)

  def parse(self, doc: DocsExtractor.Extract) -> Optional[Macro]:
    """Validates one extracted macro and returns its documentation.

    Returns None for internal (underscore-prefixed) macros. All problems found
    are accumulated in `self.errors`.
    """
    or_replace, self.name, args, return_desc, return_type = doc.obj_match

    if or_replace is not None:
      # The object being parsed here is a macro, so the message names it as one.
      self._error(
          f'Macro "{self.name}": CREATE OR REPLACE is not allowed in stdlib '
          f'as standard library modules can only included once. Please just '
          f'use CREATE instead.')

    # Ignore internal macros.
    if _is_internal(self.name):
      return None

    # Macros do not accept any documentation annotations.
    self._validate_only_contains_annotations(doc.annotations, set())

    name = self._parse_name()

    if not _is_snake_case(name):
      self._error(f'Macro name "{name}" is not snake_case'
                  f' (should be "{name.casefold()}")')

    return Macro(
        name=name,
        desc=self._parse_desc_not_empty(doc.description),
        # A return comment that was not captured arrives as None; map it to an
        # empty description instead of crashing inside parse_comment().
        return_desc='' if return_desc is None else parse_comment(return_desc),
        return_type=return_type,
        args=self._parse_args(doc.annotations, args),
    )
# eq=False keeps the default identity-based equality and hashing; the decorator
# only generates __init__ (same parameter order as the fields) and __repr__.
# No field has a default: all five lists are always supplied by the caller, so
# no list object is ever shared through the class.
@dataclass(eq=False)
class ParsedFile:
  """Data class containing all of the documentation of a single SQL file."""
  # Error messages collected while extracting and parsing the docs.
  errors: List[str]
  # Documentation of the tables and views.
  table_views: List[TableOrView]
  # Documentation of the functions.
  functions: List[Function]
  # Documentation of the table functions.
  table_functions: List[TableFunction]
  # Documentation of the macros.
  macros: List[Macro]
def parse_file(path: str, sql: str) -> Optional[ParsedFile]:
  """Extracts and validates the documentation contained in one SQL file.

  Args:
    path: Path of the SQL file. The module name is taken from the first path
      component following "/stdlib/".
    sql: Contents of the SQL file.

  Returns:
    None for files of the `deprecated` module. Otherwise a ParsedFile with the
    parsed objects and the errors found; if extraction itself reports errors,
    the ParsedFile carries only those errors.
  """
  if sys.platform.startswith('win'):
    path = path.replace('\\', '/')

  # Get module name
  module_name = path.split('/stdlib/')[-1].split('/')[0]

  # Disable support for `deprecated` module
  if module_name == "deprecated":
    return None

  # Extract all the docs from the SQL.
  extractor = DocsExtractor(path, module_name, sql)
  docs = extractor.extract()
  if extractor.errors:
    return ParsedFile(extractor.errors, [], [], [], [])

  errors = []
  table_views = []
  functions = []
  table_functions = []
  macros = []

  # For every kind of object: the parser to use and the list collecting its
  # results.
  handlers = (
      (ObjKind.table_view, TableViewDocParser, table_views),
      (ObjKind.function, FunctionDocParser, functions),
      (ObjKind.table_function, TableFunctionDocParser, table_functions),
      (ObjKind.macro, MacroDocParser, macros),
  )

  # Parse the extracted docs.
  for doc in docs:
    for kind, parser_cls, parsed in handlers:
      if doc.obj_kind == kind:
        parser = parser_cls(path, module_name)
        res = parser.parse(doc)
        if res:
          parsed.append(res)
        # Errors are kept even when the parser returned nothing.
        errors += parser.errors

  return ParsedFile(errors, table_views, functions, table_functions, macros)