| #!/usr/bin/env python3 |
| # Copyright (C) 2019 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| ''' Worker main loop. Pulls jobs from the DB and runs them in the sandbox |
| |
| It also handles timeouts and graceful container termination. |
| ''' |
| |
| import logging |
| import os |
| import random |
| import signal |
| import socket |
| import subprocess |
| import threading |
| import time |
| import traceback |
| |
| from config import DB, JOB_TIMEOUT_SEC |
| from common_utils import req, utc_now_iso, init_logging |
| from common_utils import ConcurrentModificationError, SCOPES |
| |
# Directory containing this script; used to locate the sibling run_job.py.
CUR_DIR = os.path.dirname(__file__)
# Extra OAuth scopes required to read/write the Firebase realtime DB.
SCOPES.append('https://www.googleapis.com/auth/firebase.database')
SCOPES.append('https://www.googleapis.com/auth/userinfo.email')
# Worker identity, e.g. "2-hostname": the trailing token of WORKER_HOST
# (presumably the worker number on the VM — confirm against deployment config)
# joined with the container hostname.
WORKER_NAME = '%s-%s' % (os.getenv(
    'WORKER_HOST', 'local').split('-')[-1], socket.gethostname())
# Set by sig_handler on SIGTERM/SIGINT; polled by the loops below to exit.
sigterm = threading.Event()
| |
| |
def try_acquire_job(job_id):
  '''Transactionally acquire the given job.

  Implements a compare-and-swap: reads the job together with its ETag, then
  PUTs the STARTED state back conditionally on that ETag.

  Returns the job JSON object if it managed to acquire and put it into the
  STARTED state, None if another worker got there first.
  '''
  logging.debug('Trying to acquire job %s', job_id)

  uri = '%s/jobs/%s.json' % (DB, job_id)
  job, etag = req('GET', uri, req_etag=True)
  if job['status'] != 'QUEUED':
    return None  # Somebody else took it
  job['status'] = 'STARTED'
  job['time_started'] = utc_now_iso()
  job['worker'] = WORKER_NAME
  try:
    req('PUT', uri, body=job, etag=etag)
  except ConcurrentModificationError:
    # Lost the race: another worker updated the job between our GET and PUT.
    return None
  return job
| |
| |
def make_worker_obj(status, job_id=None):
  '''Builds the JSON object describing this worker's state for the DB.'''
  worker_obj = {'job_id': job_id, 'status': status}
  worker_obj['last_update'] = utc_now_iso()
  worker_obj['host'] = os.getenv('WORKER_HOST', '')
  return worker_obj
| |
| |
def worker_loop():
  ''' Pulls a job from the queue and runs it invoking run_job.py

  One iteration of the worker cycle: acquire at most one QUEUED job, spawn
  run_job.py for it, babysit the subprocess until it exits, times out or is
  cancelled, then record the final status in the DB.
  '''
  # Fetch the tail of the queue (job ids sort chronologically by key).
  uri = '%s/jobs_queued.json?orderBy="$key"&limitToLast=10' % DB
  jobs = req('GET', uri)
  if not jobs:
    return

  # Work out the worker number from the hostname. We try to distribute the load
  # (via the time.sleep below) so that we fill first all the worker-1 of each
  # vm, then worker-2 and so on. This is designed so that if there is only one
  # CL (hence N jobs) in the queue, each VM gets only one job, maximizing the
  # cpu efficiency of each VM.
  try:
    worker_num = int(socket.gethostname().split('-')[-1])
  except ValueError:
    # Hostname has no numeric suffix (e.g. local runs): behave as worker 1.
    worker_num = 1

  # Transactionally acquire a job. Deal with races (two workers trying to
  # acquire the same job).
  job = None
  job_id = None
  for job_id in sorted(jobs.keys(), reverse=True):
    job = try_acquire_job(job_id)
    if job is not None:
      break
    logging.info('Raced while trying to acquire job %s, retrying', job_id)
    # Back off proportionally to the worker number so lower-numbered workers
    # win the next race first (see the load-distribution comment above).
    time.sleep(worker_num * 2 + random.random())
  if job is None:
    logging.error('Failed to acquire a job')
    return

  logging.info('Starting job %s', job_id)

  # Update the db, move the job to the running queue.
  patch_obj = {
      'jobs_queued/' + job_id: {},  # = DELETE
      'jobs_running/' + job_id: {
          'worker': WORKER_NAME
      },
      'workers/' + WORKER_NAME: make_worker_obj('RUNNING', job_id=job_id)
  }
  req('PATCH', '%s.json' % DB, body=patch_obj)

  cmd = [os.path.join(CUR_DIR, 'run_job.py'), job_id]

  # Propagate the worker's PERFETTO_ vars and merge with the job-specific vars.
  env = dict(os.environ, **{k: str(v) for (k, v) in job['env'].items()})
  job_runner = subprocess.Popen(cmd, env=env)

  # Run the job in a python subprocess, to isolate the main loop from logs
  # uploader failures.
  res = None  # Exit code of run_job.py; None while it is still running.
  cancelled = False  # Job was cancelled via the DB or this worker got SIGTERM.
  timed_out = False  # Job exceeded JOB_TIMEOUT_SEC.
  time_started = time.time()
  time_last_db_poll = time_started
  polled_status = 'STARTED'  # Last job status read back from the DB.
  while res is None:
    time.sleep(0.25)
    res = job_runner.poll()
    now = time.time()
    if now - time_last_db_poll > 10:  # Throttle DB polling.
      polled_status = req('GET', '%s/jobs/%s/status.json' % (DB, job_id))
      time_last_db_poll = now
    if now - time_started > JOB_TIMEOUT_SEC:
      # NOTE(review): terminate() is re-invoked on every 0.25s tick after the
      # deadline; there is no SIGKILL escalation if the child ignores SIGTERM.
      logging.info('Job %s timed out, terminating', job_id)
      timed_out = True
      job_runner.terminate()
    if (sigterm.is_set() or polled_status != 'STARTED') and not cancelled:
      logging.info('Job %s cancelled, terminating', job_id)
      cancelled = True
      job_runner.terminate()

  # Precedence: worker interruption > DB-side cancellation > timeout > exit
  # code of run_job.py.
  status = ('INTERRUPTED' if sigterm.is_set() else 'CANCELLED' if cancelled else
            'TIMED_OUT' if timed_out else 'COMPLETED' if res == 0 else 'FAILED')
  logging.info('Job %s %s with code %s', job_id, status, res)

  # Update the DB, unless the job has been cancelled. The "is not None"
  # condition deals with a very niche case, that is, avoid creating a partial
  # job entry after doing a full clear of the DB (which is super rare, happens
  # only when re-deploying the CI).
  if polled_status is not None:
    patch = {
        'jobs/%s/status' % job_id: status,
        'jobs/%s/exit_code' % job_id: {} if res is None else res,
        'jobs/%s/time_ended' % job_id: utc_now_iso(),
        'jobs_running/%s' % job_id: {},  # = DELETE
    }
    req('PATCH', '%s.json' % (DB), body=patch)
| |
| |
def sig_handler(_signum, _frame):
  '''Handles SIGTERM/SIGINT by flagging the loops to shut down gracefully.'''
  logging.warning('Interrupted by signal, exiting worker')
  sigterm.set()
| |
| |
def main():
  '''Worker entry point: poll the queue forever until signalled to stop.

  Registers SIGTERM/SIGINT handlers, then repeatedly runs worker_loop() and
  reports this worker as IDLE between jobs. On shutdown it marks the worker
  TERMINATED in the DB so its jobs are not waited on forever.
  '''
  init_logging()
  logging.info('Worker started')
  signal.signal(signal.SIGTERM, sig_handler)
  signal.signal(signal.SIGINT, sig_handler)

  while not sigterm.is_set():
    logging.debug('Starting poll cycle')
    try:
      worker_loop()
      req('PUT',
          '%s/workers/%s.json' % (DB, WORKER_NAME),
          body=make_worker_obj('IDLE'))
    except Exception:  # pylint: disable=broad-except
      # Keep the worker alive across transient failures (e.g. DB hiccups),
      # but don't swallow SystemExit/KeyboardInterrupt as the previous bare
      # `except:` did.
      logging.error('Exception in worker loop:\n%s', traceback.format_exc())
    if sigterm.is_set():
      break

    # Synchronize sleeping with the wall clock. This is so all VMs wake up at
    # the same time. See comment on distributing load above in this file.
    poll_time_sec = 5
    time.sleep(poll_time_sec - (time.time() % poll_time_sec))

  # The use case here is the VM being terminated by the GCE infrastructure.
  # We mark the worker as terminated and the job as cancelled so we don't wait
  # forever for it.
  logging.warning('Exiting the worker loop, got signal: %s', sigterm.is_set())
  req('PUT',
      '%s/workers/%s.json' % (DB, WORKER_NAME),
      body=make_worker_obj('TERMINATED'))
| |
| |
# Standard script guard: run the worker when executed directly.
if __name__ == '__main__':
  main()