blob: b31d331b7c57cee1de37e86728900ff578f728a3 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2013 The Flutter Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
#
# Usage: scan_deps.py --osv-lockfile <lockfile> --output <parsed lockfile>
#
# This script parses the dependencies provided in lockfile format for
# osv-scanner so that the common ancestor commits from the mirrored and
# upstream for each dependency are provided in the lockfile
# It is expected that the osv-lockfile input is updated by this script
# and then uploaded using GitHub actions to be used by the osv-scanner
# reusable action.
import argparse
import json
import os
import shutil
import subprocess
import sys
from compatibility_helper import byte_str_decode
SCRIPT_DIR = os.path.dirname(sys.argv[0])
CHECKOUT_ROOT = os.path.realpath(os.path.join(SCRIPT_DIR, '..'))
DEP_CLONE_DIR = CHECKOUT_ROOT + '/clone-test'
DEPS = os.path.join(CHECKOUT_ROOT, 'DEPS')
UPSTREAM_PREFIX = 'upstream_'
failed_deps = [] # Deps which fail to be cloned or git-merge based.
def parse_deps_file(lockfile, output_file):
"""
Takes input of fully qualified dependencies,
for each dep find the common ancestor commit SHA
from the upstream and query OSV API using that SHA
If the commit cannot be found or the dep cannot be
compared to an upstream, prints list of those deps
"""
deps_list = []
with open(DEPS, 'r') as file:
local_scope = {}
global_scope = {'Var': lambda x: x} # Dummy lambda.
# Read the content.
deps_content = file.read()
# Eval the content.
exec(deps_content, global_scope, local_scope)
# Extract the deps and filter.
deps_list = local_scope.get('vars')
with open(lockfile, 'r') as file:
data = json.load(file)
results = data['results']
if not os.path.exists(DEP_CLONE_DIR):
os.mkdir(DEP_CLONE_DIR) # Clone deps with upstream into temporary dir.
# Extract commit hash, save in dictionary.
for result in results:
packages = result['packages']
for package in packages:
mirror_url = package['package']['name']
commit = package['package']['commit']
ancestor_result = get_common_ancestor([mirror_url, commit], deps_list)
if ancestor_result:
common_commit, upstream = ancestor_result
package['package']['commit'] = common_commit
package['package']['name'] = upstream
try:
# Clean up cloned upstream dependency directory.
shutil.rmtree(
DEP_CLONE_DIR
) # Use shutil.rmtree since dir could be non-empty.
except OSError as clone_dir_error:
print(
'Error cleaning up clone directory: %s : %s' %
(DEP_CLONE_DIR, clone_dir_error.strerror)
)
# Write common ancestor commit data to new file to be
# used in next github action step with osv-scanner.
# The output_file name defaults to converted-osv-lockfile.json
with open(output_file, 'w') as file:
json.dump(data, file)
def get_common_ancestor(dep, deps_list):
"""
Given an input of a mirrored dep,
compare to the mapping of deps to their upstream
in DEPS and find a common ancestor
commit SHA value.
This is done by first cloning the mirrored dep,
then a branch which tracks the upstream.
From there, git merge-base operates using the HEAD
commit SHA of the upstream branch and the pinned
SHA value of the mirrored branch
"""
# dep[0] contains the mirror repo.
# dep[1] contains the mirror's pinned SHA.
# upstream is the origin repo.
dep_name = dep[0].split('/')[-1].split('.')[0]
if UPSTREAM_PREFIX + dep_name not in deps_list:
print('did not find dep: ' + dep_name)
return None
try:
# Get the upstream URL from the mapping in DEPS file.
upstream = deps_list.get(UPSTREAM_PREFIX + dep_name)
temp_dep_dir = DEP_CLONE_DIR + '/' + dep_name
# Clone dependency from mirror.
subprocess.check_output(['git', 'clone', '--quiet', '--', dep[0], dep_name],
cwd=DEP_CLONE_DIR)
# Create branch that will track the upstream dep.
print(
'attempting to add upstream remote from: {upstream}'.format(
upstream=upstream
)
)
subprocess.check_output(['git', 'remote', 'add', 'upstream', upstream],
cwd=temp_dep_dir)
subprocess.check_output(['git', 'fetch', '--quiet', 'upstream'],
cwd=temp_dep_dir)
# Get name of the default branch for upstream (e.g. main/master/etc.).
default_branch = subprocess.check_output(
'git remote show upstream ' + "| sed -n \'/HEAD branch/s/.*: //p\'",
cwd=temp_dep_dir,
shell=True
)
default_branch = byte_str_decode(default_branch)
default_branch = default_branch.strip()
# Make upstream branch track the upstream dep.
subprocess.check_output([
'git', 'checkout', '--force', '-b', 'upstream', '--track',
'upstream/' + default_branch
],
cwd=temp_dep_dir)
# Get the most recent commit from default branch of upstream.
commit = subprocess.check_output(
'git for-each-ref ' +
"--format=\'%(objectname:short)\' refs/heads/upstream",
cwd=temp_dep_dir,
shell=True
)
commit = byte_str_decode(commit)
commit = commit.strip()
# Perform merge-base on most recent default branch commit and pinned mirror commit.
ancestor_commit = subprocess.check_output(
'git merge-base {commit} {depUrl}'.format(commit=commit, depUrl=dep[1]),
cwd=temp_dep_dir,
shell=True
)
ancestor_commit = byte_str_decode(ancestor_commit)
ancestor_commit = ancestor_commit.strip()
print('Ancestor commit: ' + ancestor_commit)
return ancestor_commit, upstream
except subprocess.CalledProcessError as error:
print(
"Subprocess command '{0}' failed with exit code: {1}.".format(
error.cmd, str(error.returncode)
)
)
if error.output:
print("Subprocess error output: '{0}'".format(error.output))
return None
def parse_args(args):
args = args[1:]
parser = argparse.ArgumentParser(
description='A script to find common ancestor commit SHAs'
)
parser.add_argument(
'--osv-lockfile',
'-d',
type=str,
help='Input osv-scanner compatible lockfile of dependencies to parse.',
default=os.path.join(CHECKOUT_ROOT, 'osv-lockfile.json')
)
parser.add_argument(
'--output',
'-o',
type=str,
help='Output osv-scanner compatible deps file.',
default=os.path.join(CHECKOUT_ROOT, 'converted-osv-lockfile.json')
)
return parser.parse_args(args)
def main(argv):
args = parse_args(argv)
parse_deps_file(args.osv_lockfile, args.output)
return 0
if __name__ == '__main__':
sys.exit(main(sys.argv))