Primiano Tucci | 3d4217d | 2021-11-05 11:11:51 +0000 | [diff] [blame] | 1 | #!/usr/bin/env python3 |
| 2 | """ |
| 3 | Script to synchronize (local>remote and viceversa) test data files from/to GCS. |
| 4 | |
//test/data files are not checked in the codebase because they are large binary
files and change frequently. Instead we check-in only xxx.sha256 files, which
contain the SHA-256 of the actual binary file, and sync them from a GCS bucket.
| 8 | |
Files in the GCS bucket are content-indexed as gs://bucket/file_name-a1b2c3f4 .
| 10 | |
| 11 | Usage: |
| 12 | ./test_data status # Prints the status of new & modified files. |
| 13 | ./test_data download # To sync remote>local (used by install-build-deps). |
| 14 | ./test_data upload # To upload newly created and modified files. |
| 15 | """ |
| 16 | |
| 17 | import argparse |
| 18 | import logging |
| 19 | import os |
| 20 | import sys |
| 21 | import hashlib |
| 22 | import subprocess |
| 23 | |
| 24 | from multiprocessing.pool import ThreadPool |
| 25 | from collections import namedtuple, defaultdict |
| 26 | |
# Repository root: this script lives in a direct subdirectory of the root.
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Default GCS bucket; files are stored content-indexed as
# gs://bucket/file_name-<sha256> (overridable via --bucket).
BUCKET = 'gs://perfetto/test_data'
# Extension of the checked-in tracker files holding each file's SHA-256.
SUFFIX = '.sha256'

# Possible sync states of a file, as computed by get_file_status().
FS_MATCH = 'matches'  # Local file matches the checked-in digest.
FS_NEW_FILE = 'needs upload'  # Local file exists but has no .sha256 tracker.
FS_MODIFIED = 'modified'  # Local file differs from the checked-in digest.
FS_MISSING = 'needs download'  # Tracker exists but the local file is absent.

# Snapshot of one file's sync state. The two digests are hex SHA-256 strings,
# or None when the local file / tracker file respectively does not exist.
FileStat = namedtuple('FileStat',
                      ['path', 'status', 'actual_digest', 'expected_digest'])
# Parsed command-line arguments; populated by main().
args = None
| 39 | |
| 40 | |
def relpath(path):
  """Returns |path| expressed relative to the repository root."""
  return os.path.relpath(path, start=ROOT_DIR)
| 43 | |
| 44 | |
def download(url, out_file):
  """Fetches |url| into |out_file| via curl, raising on any failure.

  -f/--fail makes curl exit non-zero on HTTP errors (e.g. 404) instead of
  silently writing the error page body into |out_file|; check_call then
  raises CalledProcessError, rather than relying solely on the caller's
  digest check to detect the bad content.
  """
  subprocess.check_call(['curl', '-f', '-L', '-s', '-o', out_file, url])
| 47 | |
| 48 | |
def list_files(path, scan_new_files=False):
  """Recursively yields data-file paths under |path|, without duplicates.

  If scan_new_files=False, yields only files with a matching xxx.sha256
  tracker (tracker files are reported via their tracked file's path, i.e.
  with the suffix stripped). If scan_new_files=True, untracked files are
  yielded as well.
  """
  emitted = set()
  for cur_dir, _, dir_files in os.walk(path):
    for name in dir_files:
      # Temporary files left around if CTRL-C-ing while downloading.
      if name.endswith('.swp'):
        continue
      # OWNERS or README.md file should not be uploaded.
      if name in ('OWNERS', 'README.md'):
        continue
      full_path = os.path.join(cur_dir, name)
      if name.startswith('.') or not os.path.isfile(full_path):
        continue
      if full_path.endswith(SUFFIX):
        full_path = full_path[:-len(SUFFIX)]
      elif not scan_new_files:
        continue
      if full_path in emitted:
        continue
      emitted.add(full_path)
      yield full_path
| 72 | |
| 73 | |
def hash_file(fpath):
  """Returns the hex SHA-256 digest of the file at |fpath|."""
  digest = hashlib.sha256()
  with open(fpath, 'rb') as f:
    # Read in 32 KB chunks to bound memory usage on large files.
    while True:
      chunk = f.read(32768)
      if not chunk:
        break
      digest.update(chunk)
  return digest.hexdigest()
| 80 | |
| 81 | |
def map_concurrently(fn, files):
  """Runs |fn| over |files| on a thread pool of args.jobs workers.

  |fn| must return a FileStat. Prints a one-line progress indicator unless
  --quiet was passed.
  """
  total = len(files)
  pool = ThreadPool(args.jobs)
  for idx, file_stat in enumerate(pool.imap_unordered(fn, files), start=1):
    assert isinstance(file_stat, FileStat)
    if not args.quiet:
      print('[%d/%d] %-60s' % (idx, total, relpath(file_stat.path)[-60:]),
            end='\r')
  if not args.quiet:
    print('')
| 93 | |
| 94 | |
def get_file_status(fpath):
  """Computes the sync state of |fpath| against its xxx.sha256 tracker.

  Returns a FileStat whose status is one of FS_MATCH, FS_NEW_FILE,
  FS_MODIFIED or FS_MISSING. Raises if neither the file nor its tracker
  exists (list_files() should never yield such a path).
  """
  sha_file = fpath + SUFFIX
  sha_exists = os.path.exists(sha_file)
  file_exists = os.path.exists(fpath)
  actual_digest = None  # SHA-256 of the local file, if present.
  expected_digest = None  # SHA-256 recorded in the tracker, if present.
  if sha_exists:
    with open(sha_file, 'r') as f:
      expected_digest = f.readline().strip()
  if file_exists:
    actual_digest = hash_file(fpath)
  if sha_exists and not file_exists:
    status = FS_MISSING
  elif not sha_exists and file_exists:
    status = FS_NEW_FILE
  elif not sha_exists and not file_exists:
    # BUGFIX(readability): previously raised Exception(fpath) with no
    # explanation of what went wrong.
    raise Exception(
        'Neither %s nor its %s tracker file exists' % (fpath, SUFFIX))
  elif expected_digest == actual_digest:
    status = FS_MATCH
  else:
    status = FS_MODIFIED
  return FileStat(fpath, status, actual_digest, expected_digest)
| 117 | |
| 118 | |
def cmd_upload(dir):
  """Uploads new and modified files under |dir| to the GCS bucket.

  After each successful upload the xxx.sha256 tracker is rewritten
  (atomically, via a .swp temp file + rename) with the new digest.
  Returns a shell exit code (0 on success).
  """
  all_files = list_files(dir, scan_new_files=True)
  files_to_upload = []
  for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, all_files):
    if fs.status in (FS_NEW_FILE, FS_MODIFIED):
      files_to_upload.append(fs)
  if len(files_to_upload) == 0:
    if not args.quiet:
      print('No modified or new files require uploading')
    return 0
  if not args.quiet:
    print('About to upload %d files:' % len(files_to_upload))
    print('\n'.join(relpath(f.path) for f in files_to_upload))
    print('')
  if args.dry_run:
    # BUGFIX: previously returned before listing anything, so `upload -n`
    # printed nothing at all. Now the candidate list above is shown first.
    return 0
  if not args.quiet:
    input('Press a key to continue or CTRL-C to abort')

  def upload_one_file(fs):
    # Objects are content-indexed in the bucket as <name>-<sha256>.
    assert (fs.actual_digest is not None)
    dst_name = '%s/%s-%s' % (args.bucket, os.path.basename(fs.path),
                             fs.actual_digest)
    # -n: no-clobber; identical content already in the bucket is skipped.
    cmd = ['gsutil', '-q', 'cp', '-n', '-a', 'public-read', fs.path, dst_name]
    logging.debug(' '.join(cmd))
    subprocess.check_call(cmd)
    # Write the tracker via temp file + rename so it is never half-written.
    with open(fs.path + SUFFIX + '.swp', 'w') as f:
      f.write(fs.actual_digest)
    os.replace(fs.path + SUFFIX + '.swp', fs.path + SUFFIX)
    return fs

  map_concurrently(upload_one_file, files_to_upload)
  return 0
| 151 | |
| 152 | |
def cmd_download(dir, overwrite_locally_modified=False):
  """Downloads missing files under |dir| from the GCS bucket.

  Locally modified files are left untouched (after a warning prompt) unless
  overwrite_locally_modified=True. Returns a shell exit code (0 on success).
  """
  files_to_download = []
  modified = []
  all_files = list_files(dir, scan_new_files=False)
  for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, all_files):
    if fs.status == FS_MISSING:
      files_to_download.append(fs)
    elif fs.status == FS_MODIFIED:
      modified.append(fs)

  if len(modified) > 0 and not overwrite_locally_modified:
    print('WARNING: The following files diverged locally and will NOT be ' +
          'overwritten if you continue')
    print('\n'.join(relpath(f.path) for f in modified))
    print('')
    print('Re run `download --overwrite` to overwrite locally modified files')
    print('or `upload` to sync them on the GCS bucket')
    print('')
    input('Press a key to continue or CTRL-C to abort')
  elif overwrite_locally_modified:
    files_to_download += modified

  if len(files_to_download) == 0:
    if not args.quiet:
      print('Nothing to do, all files are synced')
    return 0

  if not args.quiet:
    print('Downloading %d files in //%s' %
          (len(files_to_download), relpath(args.dir)))
  if args.dry_run:
    # BUGFIX: was '\n'.join(files_to_download), which raised TypeError
    # because the list holds FileStat tuples, not strings. Also return 0
    # explicitly (was a bare `return` -> None) for consistency.
    print('\n'.join(relpath(f.path) for f in files_to_download))
    return 0

  def download_one_file(fs):
    assert (fs.expected_digest is not None)
    uri = '%s/%s-%s' % (args.bucket, os.path.basename(fs.path),
                        fs.expected_digest)
    # Fetch over plain HTTPS so no gsutil/gcloud auth is required.
    uri = uri.replace('gs://', 'https://storage.googleapis.com/')
    logging.debug(uri)
    tmp_path = fs.path + '.swp'
    download(uri, tmp_path)
    digest = hash_file(tmp_path)
    if digest != fs.expected_digest:
      raise Exception('Mismatching digest for %s. expected=%s, actual=%s' %
                      (uri, fs.expected_digest, digest))
    # Atomic rename: a CTRL-C never leaves a truncated data file behind.
    os.replace(tmp_path, fs.path)
    return fs

  map_concurrently(download_one_file, files_to_download)
  return 0
| 204 | |
| 205 | |
def cmd_status(dir):
  """Prints files that are out of sync; returns 0 iff everything matches."""
  tracked = list_files(dir, scan_new_files=True)
  by_status = defaultdict(list)
  total = 0
  for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, tracked):
    by_status[fs.status].append(relpath(fs.path))
    total += 1
  out_of_sync = 0
  for status, rpaths in sorted(by_status.items()):
    if status == FS_NEW_FILE and args.ignore_new:
      continue  # --ignore-new: untracked files don't count as out-of-sync.
    if status == FS_MATCH:
      continue
    for rpath in rpaths:
      out_of_sync += 1
      if not args.quiet:
        print('%-15s: %s' % (status, rpath))
  if out_of_sync > 0:
    return 1
  if not args.quiet:
    print('Scanned %d files in //%s, everything in sync.' %
          (total, relpath(dir)))
  return 0
| 228 | |
| 229 | |
def main():
  """Parses arguments and dispatches to the requested command.

  Returns the command's exit code (propagated to the shell by the
  __main__ guard).
  """
  parser = argparse.ArgumentParser()
  parser.add_argument('--dir', default=os.path.join(ROOT_DIR, 'test/data'))
  parser.add_argument('--overwrite', action='store_true')
  parser.add_argument('--bucket', default=BUCKET)
  parser.add_argument('--jobs', '-j', default=10, type=int)
  parser.add_argument('--dry-run', '-n', action='store_true')
  parser.add_argument('--quiet', '-q', action='store_true')
  parser.add_argument('--verbose', '-v', action='store_true')
  parser.add_argument('--ignore-new', action='store_true')
  parser.add_argument('cmd', choices=['status', 'download', 'upload'])
  global args
  args = parser.parse_args()
  logging.basicConfig(
      format='%(asctime)s %(levelname).1s %(message)s',
      level=logging.DEBUG if args.verbose else logging.INFO,
      datefmt=r'%H:%M:%S')
  if args.cmd == 'status':
    return cmd_status(args.dir)
  if args.cmd == 'download':
    return cmd_download(args.dir, overwrite_locally_modified=args.overwrite)
  if args.cmd == 'upload':
    return cmd_upload(args.dir)
  # Unreachable given the `choices` above, but be defensive.
  print('Unknown command: %s' % args.cmd)
  # BUGFIX: previously fell through returning None, i.e. exit status 0
  # despite having just printed an error message.
  return 1
| 254 | |
| 255 | |
if __name__ == '__main__':
  # Propagate the command's return value as the process exit status.
  sys.exit(main())