#!/usr/bin/env python3
"""
Script to synchronize (local>remote and viceversa) test data files from/to GCS.
//test/data files are not checked in the codebase because they are large binary
file and change frequently. Instead we check-in only xxx.sha256 files, which
contain the SHA-256 of the actual binary file, and sync them from a GCS bucket.
File in the GCS bucket are content-indexed as gs://bucket/file_name-a1b2c3f4 .
Usage:
./test_data status # Prints the status of new & modified files.
./test_data download # To sync remote>local (used by install-build-deps).
./test_data upload # To upload newly created and modified files.
"""
import argparse
import hashlib
import logging
import os
import random
import shutil
import subprocess
import sys
from multiprocessing.pool import ThreadPool
from collections import namedtuple, defaultdict

ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
BUCKET = 'gs://perfetto/test_data'
SUFFIX = '.sha256'
CACHE_DIR = os.getenv('PERFETTO_TEST_DATA_CACHE')
CACHE_LIMIT = 1_000_000_000 # 1GB
FS_MATCH = 'matches'
FS_NEW_FILE = 'needs upload'
FS_MODIFIED = 'modified'
FS_MISSING = 'needs download'
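
# FileStat describes the sync state of one test-data file: its path, one of
# the four FS_* statuses above, the SHA-256 of the local file (if present) and
# the digest recorded in its .sha256 tracker (if present).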
FileStat = namedtuple('FileStat',
['path', 'status', 'actual_digest', 'expected_digest'])

args = None


def relpath(path):
  return os.path.relpath(path, ROOT_DIR)


def download(url, out_file):
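  # -L follows redirects, -s silences progress output, -o writes to out_file.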
subprocess.check_call(['curl', '-L', '-s', '-o', out_file, url])


def list_files(path, scan_new_files=False):
  """Lists files recursively under |path|.

  If scan_new_files=False, yields only files that have a matching xxx.sha256
  tracker. If scan_new_files=True, yields all files, including untracked ones.
  """
seen = set()
for root, _, files in os.walk(path):
for fname in files:
if fname.endswith('.swp'):
continue # Temporary files left around if CTRL-C-ing while downloading.
if fname in ["OWNERS", "README.md"]:
        continue  # OWNERS and README.md files should not be uploaded.
fpath = os.path.join(root, fname)
if not os.path.isfile(fpath) or fname.startswith('.'):
continue
if fpath.endswith(SUFFIX):
fpath = fpath[:-len(SUFFIX)]
elif not scan_new_files:
continue
if fpath not in seen:
seen.add(fpath)
yield fpath


def hash_file(fpath):
hasher = hashlib.sha256()
with open(fpath, 'rb') as f:
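    # Stream the file in 32 KB chunks so large binaries never have to be held
    # in memory all at once.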
for chunk in iter(lambda: f.read(32768), b''):
hasher.update(chunk)
return hasher.hexdigest()
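

# Runs |fn| over |files| on a pool of args.jobs threads, printing a one-line
# progress indicator. Threads are enough here: the heavy lifting happens in
# curl/gsutil subprocesses and in file I/O, which release the GIL.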
def map_concurrently(fn, files):
done = 0
for fs in ThreadPool(args.jobs).imap_unordered(fn, files):
    assert isinstance(fs, FileStat)
done += 1
if not args.quiet:
print(
'[%d/%d] %-60s' % (done, len(files), relpath(fs.path)[-60:]),
end='\r')
if not args.quiet:
print('')
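

# Classifies |fpath| against its .sha256 tracker:
#   tracker only          -> FS_MISSING  (needs download)
#   file only             -> FS_NEW_FILE (needs upload)
#   both, digests match   -> FS_MATCH
#   both, digests differ  -> FS_MODIFIED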
def get_file_status(fpath):
sha_file = fpath + SUFFIX
sha_exists = os.path.exists(sha_file)
file_exists = os.path.exists(fpath)
actual_digest = None
expected_digest = None
if sha_exists:
with open(sha_file, 'r') as f:
expected_digest = f.readline().strip()
if file_exists:
actual_digest = hash_file(fpath)
if sha_exists and not file_exists:
status = FS_MISSING
elif not sha_exists and file_exists:
status = FS_NEW_FILE
elif not sha_exists and not file_exists:
    raise Exception('Neither %s nor its %s tracker exists' % (fpath, SUFFIX))
elif expected_digest == actual_digest:
status = FS_MATCH
else:
status = FS_MODIFIED
return FileStat(fpath, status, actual_digest, expected_digest)


def cmd_upload(dir):
all_files = list_files(dir, scan_new_files=True)
files_to_upload = []
for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, all_files):
if fs.status in (FS_NEW_FILE, FS_MODIFIED):
files_to_upload.append(fs)
  if len(files_to_upload) == 0:
    if not args.quiet:
      print('No modified or new files require uploading')
    return 0
  if not args.quiet:
    print('About to upload %d files:' % len(files_to_upload))
    print('\n'.join(relpath(f.path) for f in files_to_upload))
    print('')
  if args.dry_run:
    return 0
  if not args.quiet:
    input('Press Enter to continue or CTRL-C to abort')

  def upload_one_file(fs):
    assert fs.actual_digest is not None
dst_name = '%s/%s-%s' % (args.bucket, os.path.basename(
fs.path), fs.actual_digest)
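    # gsutil cp -n skips objects that already exist (content-addressed names
    # never change once uploaded); -a public-read makes the object
    # world-readable.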
cmd = ['gsutil', '-q', 'cp', '-n', '-a', 'public-read', fs.path, dst_name]
logging.debug(' '.join(cmd))
subprocess.check_call(cmd)
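    # Update the .sha256 tracker atomically: write a .swp sidecar first, then
    # os.replace() it over the tracker, so an interrupted run cannot leave a
    # truncated digest behind.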
with open(fs.path + SUFFIX + '.swp', 'w') as f:
f.write(fs.actual_digest)
os.replace(fs.path + SUFFIX + '.swp', fs.path + SUFFIX)
return fs

  map_concurrently(upload_one_file, files_to_upload)
return 0


def cmd_download(dir, overwrite_locally_modified=False):
files_to_download = []
modified = []
all_files = list_files(dir, scan_new_files=False)
for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, all_files):
if fs.status == FS_MISSING:
files_to_download.append(fs)
elif fs.status == FS_MODIFIED:
modified.append(fs)
if len(modified) > 0 and not overwrite_locally_modified:
print('WARNING: The following files diverged locally and will NOT be ' +
'overwritten if you continue')
print('\n'.join(relpath(f.path) for f in modified))
print('')
    print('Re-run `download --overwrite` to overwrite locally modified files')
    print('or `upload` to sync them to the GCS bucket')
print('')
    input('Press Enter to continue or CTRL-C to abort')
elif overwrite_locally_modified:
files_to_download += modified
if len(files_to_download) == 0:
if not args.quiet:
print('Nothing to do, all files are synced')
return 0
  if not args.quiet:
    print('Downloading %d files in //%s' %
          (len(files_to_download), relpath(dir)))
  if args.dry_run:
    print('\n'.join(relpath(f.path) for f in files_to_download))
    return 0

  def download_one_file(fs):
    assert fs.expected_digest is not None
tmp_path = fs.path + '.swp'
uri = '%s/%s-%s' % (args.bucket, os.path.basename(
fs.path), fs.expected_digest)
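    # Rewrite the gs:// URI to plain HTTPS so downloads only need curl, not
    # gsutil.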
uri = uri.replace('gs://', 'https://storage.googleapis.com/')
cache_path = None
cache_hit = False
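    # The cache is keyed by digest alone: any cached blob with the expected
    # SHA-256 is a valid copy, regardless of which file it came from.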
if CACHE_DIR:
cache_path = os.path.join(CACHE_DIR, fs.expected_digest)
if cache_path and os.path.exists(cache_path):
shutil.copy(cache_path, tmp_path)
cache_hit = True
else:
logging.debug(uri)
download(uri, tmp_path)
digest = hash_file(tmp_path)
if digest != fs.expected_digest:
if cache_path and os.path.exists(cache_path):
os.remove(cache_path)
raise Exception('Mismatching digest for %s. expected=%s, actual=%s' %
(uri, fs.expected_digest, digest))
if cache_path and not cache_hit:
shutil.copy(tmp_path, cache_path)
os.replace(tmp_path, fs.path)
return fs

  map_concurrently(download_one_file, files_to_download)
  # Keep the cache bounded: evict random entries until the total size is
  # within CACHE_LIMIT. Random eviction is deliberately simple; it avoids
  # having to track access times.
if CACHE_DIR:
files = [os.path.join(CACHE_DIR, x) for x in os.listdir(CACHE_DIR)]
fsizes = dict(map(lambda fpath: (fpath, os.path.getsize(fpath)), files))
while sum(fsizes.values()) > CACHE_LIMIT:
fpath = random.choice(list(fsizes.keys()))
os.remove(fpath)
del fsizes[fpath]
return 0


def cmd_status(dir):
files = list_files(dir, scan_new_files=True)
file_by_status = defaultdict(list)
num_files = 0
num_out_of_sync = 0
for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, files):
file_by_status[fs.status].append(relpath(fs.path))
num_files += 1
for status, rpaths in sorted(file_by_status.items()):
if status == FS_NEW_FILE and args.ignore_new:
continue
if status != FS_MATCH:
for rpath in rpaths:
num_out_of_sync += 1
if not args.quiet:
print('%-15s: %s' % (status, rpath))
if num_out_of_sync == 0:
if not args.quiet:
print('Scanned %d files in //%s, everything in sync.' %
(num_files, relpath(dir)))
return 0
return 1


def main():
parser = argparse.ArgumentParser()
parser.add_argument('--dir', default=os.path.join(ROOT_DIR, 'test/data'))
parser.add_argument('--overwrite', action='store_true')
parser.add_argument('--bucket', default=BUCKET)
parser.add_argument('--jobs', '-j', default=10, type=int)
parser.add_argument('--dry-run', '-n', action='store_true')
parser.add_argument('--quiet', '-q', action='store_true')
parser.add_argument('--verbose', '-v', action='store_true')
parser.add_argument('--ignore-new', action='store_true')
parser.add_argument('cmd', choices=['status', 'download', 'upload'])
global args
args = parser.parse_args()
logging.basicConfig(
format='%(asctime)s %(levelname).1s %(message)s',
level=logging.DEBUG if args.verbose else logging.INFO,
datefmt=r'%H:%M:%S')
if args.cmd == 'status':
return cmd_status(args.dir)
if args.cmd == 'download':
return cmd_download(args.dir, overwrite_locally_modified=args.overwrite)
if args.cmd == 'upload':
return cmd_upload(args.dir)
  print('Unknown command: %s' % args.cmd)
  return 1


if __name__ == '__main__':
sys.exit(main())