blob: 1442ae9c3fec03c8c65e77bf96a033728f141bc5 [file] [log] [blame] [edit]
#!/usr/bin/env python3
"""
Script to synchronize (local>remote and viceversa) test data files from/to GCS.
//test/data files are not checked in the codebase because they are large binary
file and change frequently. Instead we check-in only xxx.sha256 files, which
contain the SHA-256 of the actual binary file, and sync them from a GCS bucket.
File in the GCS bucket are content-indexed as gs://bucket/file_name-a1b2c3f4 .
Usage:
./test_data status # Prints the status of new & modified files.
./test_data download # To sync remote>local (used by install-build-deps).
./test_data upload # To upload newly created and modified files.
"""
import argparse
import logging
import os
import sys
import hashlib
import subprocess
from multiprocessing.pool import ThreadPool
from collections import namedtuple, defaultdict
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
BUCKET = 'gs://perfetto/test_data'
SUFFIX = '.sha256'
FS_MATCH = 'matches'
FS_NEW_FILE = 'needs upload'
FS_MODIFIED = 'modified'
FS_MISSING = 'needs download'
FileStat = namedtuple('FileStat',
['path', 'status', 'actual_digest', 'expected_digest'])
args = None
def relpath(path):
return os.path.relpath(path, ROOT_DIR)
def download(url, out_file):
subprocess.check_call(['curl', '-L', '-s', '-o', out_file, url])
def list_files(path, scan_new_files=False):
""" List files recursively in path.
If scan_new_files=False, returns only files with a maching xxx.sha256 tracker.
If scan_new_files=True returns all files including untracked ones.
"""
seen = set()
for root, _, files in os.walk(path):
for fname in files:
if fname.endswith('.swp'):
continue # Temporary files left around if CTRL-C-ing while downloading.
if fname in ["OWNERS", "README.md"]:
continue # OWNERS or README.md file should not be uploaded.
fpath = os.path.join(root, fname)
if not os.path.isfile(fpath) or fname.startswith('.'):
continue
if fpath.endswith(SUFFIX):
fpath = fpath[:-len(SUFFIX)]
elif not scan_new_files:
continue
if fpath not in seen:
seen.add(fpath)
yield fpath
def hash_file(fpath):
hasher = hashlib.sha256()
with open(fpath, 'rb') as f:
for chunk in iter(lambda: f.read(32768), b''):
hasher.update(chunk)
return hasher.hexdigest()
def map_concurrently(fn, files):
done = 0
for fs in ThreadPool(args.jobs).imap_unordered(fn, files):
assert (isinstance(fs, FileStat))
done += 1
if not args.quiet:
print(
'[%d/%d] %-60s' % (done, len(files), relpath(fs.path)[-60:]),
end='\r')
if not args.quiet:
print('')
def get_file_status(fpath):
sha_file = fpath + SUFFIX
sha_exists = os.path.exists(sha_file)
file_exists = os.path.exists(fpath)
actual_digest = None
expected_digest = None
if sha_exists:
with open(sha_file, 'r') as f:
expected_digest = f.readline().strip()
if file_exists:
actual_digest = hash_file(fpath)
if sha_exists and not file_exists:
status = FS_MISSING
elif not sha_exists and file_exists:
status = FS_NEW_FILE
elif not sha_exists and not file_exists:
raise Exception(fpath)
elif expected_digest == actual_digest:
status = FS_MATCH
else:
status = FS_MODIFIED
return FileStat(fpath, status, actual_digest, expected_digest)
def cmd_upload(dir):
all_files = list_files(dir, scan_new_files=True)
files_to_upload = []
for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, all_files):
if fs.status in (FS_NEW_FILE, FS_MODIFIED):
files_to_upload.append(fs)
if len(files_to_upload) == 0:
if not args.quiet:
print('No modified or new files require uploading')
return 0
if args.dry_run:
return 0
if not args.quiet:
print('\n'.join(relpath(f.path) for f in files_to_upload))
print('')
print('About to upload %d files' % len(files_to_upload))
input('Press a key to continue or CTRL-C to abort')
def upload_one_file(fs):
assert (fs.actual_digest is not None)
dst_name = '%s/%s-%s' % (args.bucket, os.path.basename(
fs.path), fs.actual_digest)
cmd = ['gsutil', '-q', 'cp', '-n', '-a', 'public-read', fs.path, dst_name]
logging.debug(' '.join(cmd))
subprocess.check_call(cmd)
with open(fs.path + SUFFIX + '.swp', 'w') as f:
f.write(fs.actual_digest)
os.replace(fs.path + SUFFIX + '.swp', fs.path + SUFFIX)
return fs
map_concurrently(upload_one_file, files_to_upload)
return 0
def cmd_clean(dir):
all_files = list_files(dir, scan_new_files=True)
files_to_clean = []
for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, all_files):
if fs.status in (FS_NEW_FILE, FS_MODIFIED):
files_to_clean.append(fs.path)
if len(files_to_clean) == 0:
if not args.quiet:
print('No modified or new files require cleaning')
return 0
if args.dry_run:
return 0
if not args.quiet:
print('\n'.join(relpath(f) for f in files_to_clean))
print('')
print('About to remove %d files' % len(files_to_clean))
input('Press a key to continue or CTRL-C to abort')
list(map(os.remove, files_to_clean))
return 0
def cmd_download(dir, overwrite_locally_modified=False):
files_to_download = []
modified = []
all_files = list_files(dir, scan_new_files=False)
for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, all_files):
if fs.status == FS_MISSING:
files_to_download.append(fs)
elif fs.status == FS_MODIFIED:
modified.append(fs)
if len(modified) > 0 and not overwrite_locally_modified:
print('WARNING: The following files diverged locally and will NOT be ' +
'overwritten if you continue')
print('\n'.join(relpath(f.path) for f in modified))
print('')
print('Re run `download --overwrite` to overwrite locally modified files')
print('or `upload` to sync them on the GCS bucket')
print('')
input('Press a key to continue or CTRL-C to abort')
elif overwrite_locally_modified:
files_to_download += modified
if len(files_to_download) == 0:
if not args.quiet:
print('Nothing to do, all files are synced')
return 0
if not args.quiet:
print('Downloading %d files in //%s' %
(len(files_to_download), relpath(args.dir)))
if args.dry_run:
print('\n'.join(files_to_download))
return
def download_one_file(fs):
assert (fs.expected_digest is not None)
uri = '%s/%s-%s' % (args.bucket, os.path.basename(
fs.path), fs.expected_digest)
uri = uri.replace('gs://', 'https://storage.googleapis.com/')
logging.debug(uri)
tmp_path = fs.path + '.swp'
download(uri, tmp_path)
digest = hash_file(tmp_path)
if digest != fs.expected_digest:
raise Exception('Mismatching digest for %s. expected=%s, actual=%s' %
(uri, fs.expected_digest, digest))
os.replace(tmp_path, fs.path)
return fs
map_concurrently(download_one_file, files_to_download)
return 0
def cmd_status(dir):
files = list_files(dir, scan_new_files=True)
file_by_status = defaultdict(list)
num_files = 0
num_out_of_sync = 0
for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, files):
file_by_status[fs.status].append(relpath(fs.path))
num_files += 1
for status, rpaths in sorted(file_by_status.items()):
if status == FS_NEW_FILE and args.ignore_new:
continue
if status != FS_MATCH:
for rpath in rpaths:
num_out_of_sync += 1
if not args.quiet:
print('%-15s: %s' % (status, rpath))
if num_out_of_sync == 0:
if not args.quiet:
print('Scanned %d files in //%s, everything in sync.' %
(num_files, relpath(dir)))
return 0
return 1
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--dir', default=os.path.join(ROOT_DIR, 'test/data'))
parser.add_argument('--overwrite', action='store_true')
parser.add_argument('--bucket', default=BUCKET)
parser.add_argument('--jobs', '-j', default=10, type=int)
parser.add_argument('--dry-run', '-n', action='store_true')
parser.add_argument('--quiet', '-q', action='store_true')
parser.add_argument('--verbose', '-v', action='store_true')
parser.add_argument('--ignore-new', action='store_true')
parser.add_argument('cmd', choices=['status', 'download', 'upload', 'clean'])
global args
args = parser.parse_args()
logging.basicConfig(
format='%(asctime)s %(levelname).1s %(message)s',
level=logging.DEBUG if args.verbose else logging.INFO,
datefmt=r'%H:%M:%S')
if args.cmd == 'status':
return cmd_status(args.dir)
if args.cmd == 'download':
return cmd_download(args.dir, overwrite_locally_modified=args.overwrite)
if args.cmd == 'upload':
return cmd_upload(args.dir)
if args.cmd == 'clean':
return cmd_clean(args.dir)
print('Unknown command: %s' % args.cmd)
if __name__ == '__main__':
sys.exit(main())