#!/usr/bin/env python3
"""
Script to synchronize (local > remote and vice versa) test data files from/to
GCS.

//test/data files are not checked into the codebase because they are large
binary files and change frequently. Instead we check in only xxx.sha256 files,
which contain the SHA-256 of the actual binary file, and sync them from a GCS
bucket.

Files in the GCS bucket are content-addressed as gs://bucket/file_name-a1b2c3f4.

Usage:
./test_data status     # Prints the status of new & modified files.
./test_data download   # Syncs remote > local (used by install-build-deps).
./test_data upload     # Uploads newly created and modified files.
"""

import argparse
import hashlib
import logging
import os
import subprocess
import sys

from collections import defaultdict, namedtuple
from multiprocessing.pool import ThreadPool

ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
BUCKET = 'gs://perfetto/test_data'
SUFFIX = '.sha256'

FS_MATCH = 'matches'
FS_NEW_FILE = 'needs upload'
FS_MODIFIED = 'modified'
FS_MISSING = 'needs download'

FileStat = namedtuple('FileStat',
                      ['path', 'status', 'actual_digest', 'expected_digest'])
args = None


def relpath(path):
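  """Returns |path| relative to the repo root (ROOT_DIR)."""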
  return os.path.relpath(path, ROOT_DIR)


def download(url, out_file):
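  """Fetches |url| into |out_file| with curl (follows redirects, silent)."""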
  subprocess.check_call(['curl', '-L', '-s', '-o', out_file, url])


def list_files(path, scan_new_files=False):
  """Lists files recursively in |path|.

  If scan_new_files=False, returns only files with a matching xxx.sha256
  tracker. If scan_new_files=True, returns all files, including untracked ones.
  """
  seen = set()
  for root, _, files in os.walk(path):
    for fname in files:
      if fname.endswith('.swp'):
        continue  # Temporary files left around if CTRL-C-ing while downloading.
      if fname in ['OWNERS', 'README.md']:
        continue  # OWNERS and README.md files should not be uploaded.
      fpath = os.path.join(root, fname)
      if not os.path.isfile(fpath) or fname.startswith('.'):
        continue
      if fpath.endswith(SUFFIX):
        fpath = fpath[:-len(SUFFIX)]
      elif not scan_new_files:
        continue
      if fpath not in seen:
        seen.add(fpath)
        yield fpath


def hash_file(fpath):
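  """Returns the hex SHA-256 digest of |fpath|, read in 32 KiB chunks."""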
  hasher = hashlib.sha256()
  with open(fpath, 'rb') as f:
    for chunk in iter(lambda: f.read(32768), b''):
      hasher.update(chunk)
  return hasher.hexdigest()


def map_concurrently(fn, files):
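  """Runs |fn| over |files| on a thread pool of args.jobs workers.

  Prints a one-line progress indicator unless --quiet is passed.
  """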
  done = 0
  for fs in ThreadPool(args.jobs).imap_unordered(fn, files):
    assert isinstance(fs, FileStat)
    done += 1
    if not args.quiet:
      print(
          '[%d/%d] %-60s' % (done, len(files), relpath(fs.path)[-60:]),
          end='\r')
  if not args.quiet:
    print('')


def get_file_status(fpath):
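  """Compares |fpath| against its xxx.sha256 tracker.

  Returns a FileStat whose status is one of FS_MATCH, FS_NEW_FILE,
  FS_MODIFIED or FS_MISSING.
  """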
  sha_file = fpath + SUFFIX
  sha_exists = os.path.exists(sha_file)
  file_exists = os.path.exists(fpath)
  actual_digest = None
  expected_digest = None
  if sha_exists:
    with open(sha_file, 'r') as f:
      expected_digest = f.readline().strip()
  if file_exists:
    actual_digest = hash_file(fpath)
  if sha_exists and not file_exists:
    status = FS_MISSING
  elif not sha_exists and file_exists:
    status = FS_NEW_FILE
  elif not sha_exists and not file_exists:
    raise Exception(fpath)
  elif expected_digest == actual_digest:
    status = FS_MATCH
  else:
    status = FS_MODIFIED
  return FileStat(fpath, status, actual_digest, expected_digest)


def cmd_upload(dir):
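  """Uploads new and modified files to the GCS bucket, updating trackers."""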
  all_files = list_files(dir, scan_new_files=True)
  files_to_upload = []
  for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, all_files):
    if fs.status in (FS_NEW_FILE, FS_MODIFIED):
      files_to_upload.append(fs)
  if len(files_to_upload) == 0:
    if not args.quiet:
      print('No modified or new files require uploading')
    return 0
  if args.dry_run:
    return 0
  if not args.quiet:
    print('About to upload %d files:' % len(files_to_upload))
    print('\n'.join(relpath(f.path) for f in files_to_upload))
    print('')
    input('Press a key to continue or CTRL-C to abort')

  def upload_one_file(fs):
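    # Copies the file to the bucket as file_name-<digest> (-n skips objects
    # that already exist), then atomically rewrites the xxx.sha256 tracker.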
    assert fs.actual_digest is not None
    dst_name = '%s/%s-%s' % (args.bucket, os.path.basename(fs.path),
                             fs.actual_digest)
    cmd = ['gsutil', '-q', 'cp', '-n', '-a', 'public-read', fs.path, dst_name]
    logging.debug(' '.join(cmd))
    subprocess.check_call(cmd)
    with open(fs.path + SUFFIX + '.swp', 'w') as f:
      f.write(fs.actual_digest)
    os.replace(fs.path + SUFFIX + '.swp', fs.path + SUFFIX)
    return fs

  map_concurrently(upload_one_file, files_to_upload)
  return 0


def cmd_download(dir, overwrite_locally_modified=False):
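  """Downloads files that have a xxx.sha256 tracker but no local payload.

  Locally modified files are left untouched unless
  overwrite_locally_modified=True.
  """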
  files_to_download = []
  modified = []
  all_files = list_files(dir, scan_new_files=False)
  for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, all_files):
    if fs.status == FS_MISSING:
      files_to_download.append(fs)
    elif fs.status == FS_MODIFIED:
      modified.append(fs)

  if len(modified) > 0 and not overwrite_locally_modified:
    print('WARNING: The following files diverged locally and will NOT be ' +
          'overwritten if you continue')
    print('\n'.join(relpath(f.path) for f in modified))
    print('')
    print('Re-run `download --overwrite` to overwrite locally modified files')
    print('or `upload` to sync them to the GCS bucket')
    print('')
    input('Press a key to continue or CTRL-C to abort')
  elif overwrite_locally_modified:
    files_to_download += modified

  if len(files_to_download) == 0:
    if not args.quiet:
      print('Nothing to do, all files are synced')
    return 0

  if not args.quiet:
    print('Downloading %d files in //%s' %
          (len(files_to_download), relpath(dir)))
  if args.dry_run:
    print('\n'.join(relpath(f.path) for f in files_to_download))
    return 0

  def download_one_file(fs):
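    # Fetches the content-addressed object over HTTPS into a .swp temp file,
    # verifies its SHA-256, then atomically moves it into place.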
    assert fs.expected_digest is not None
    uri = '%s/%s-%s' % (args.bucket, os.path.basename(fs.path),
                        fs.expected_digest)
    uri = uri.replace('gs://', 'https://storage.googleapis.com/')
    logging.debug(uri)
    tmp_path = fs.path + '.swp'
    download(uri, tmp_path)
    digest = hash_file(tmp_path)
    if digest != fs.expected_digest:
      raise Exception('Mismatching digest for %s. expected=%s, actual=%s' %
                      (uri, fs.expected_digest, digest))
    os.replace(tmp_path, fs.path)
    return fs

  map_concurrently(download_one_file, files_to_download)
  return 0


def cmd_status(dir):
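  """Prints out-of-sync files; returns 1 if any are found, 0 otherwise."""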
  files = list_files(dir, scan_new_files=True)
  file_by_status = defaultdict(list)
  num_files = 0
  num_out_of_sync = 0
  for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, files):
    file_by_status[fs.status].append(relpath(fs.path))
    num_files += 1
  for status, rpaths in sorted(file_by_status.items()):
    if status == FS_NEW_FILE and args.ignore_new:
      continue
    if status != FS_MATCH:
      for rpath in rpaths:
        num_out_of_sync += 1
        if not args.quiet:
          print('%-15s: %s' % (status, rpath))
  if num_out_of_sync == 0:
    if not args.quiet:
      print('Scanned %d files in //%s, everything in sync.' %
            (num_files, relpath(dir)))
    return 0
  return 1


def main():
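  """Parses the command line and dispatches to the requested command."""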
  parser = argparse.ArgumentParser()
  parser.add_argument('--dir', default=os.path.join(ROOT_DIR, 'test/data'))
  parser.add_argument('--overwrite', action='store_true')
  parser.add_argument('--bucket', default=BUCKET)
  parser.add_argument('--jobs', '-j', default=10, type=int)
  parser.add_argument('--dry-run', '-n', action='store_true')
  parser.add_argument('--quiet', '-q', action='store_true')
  parser.add_argument('--verbose', '-v', action='store_true')
  parser.add_argument('--ignore-new', action='store_true')
  parser.add_argument('cmd', choices=['status', 'download', 'upload'])
  global args
  args = parser.parse_args()
  logging.basicConfig(
      format='%(asctime)s %(levelname).1s %(message)s',
      level=logging.DEBUG if args.verbose else logging.INFO,
      datefmt=r'%H:%M:%S')
  if args.cmd == 'status':
    return cmd_status(args.dir)
  if args.cmd == 'download':
    return cmd_download(args.dir, overwrite_locally_modified=args.overwrite)
  if args.cmd == 'upload':
    return cmd_upload(args.dir)
  print('Unknown command: %s' % args.cmd)
  return 1


if __name__ == '__main__':
  sys.exit(main())