blob: e1dc333b5ee4214c14464fb01f45235db63517d1 [file] [log] [blame]
# Copyright 2019 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Launch and retry swarming jobs until they pass or we hit max attempts."""
import itertools
from urllib.parse import urlparse
import attr
from recipe_engine import recipe_api
from PB.go.chromium.org.luci.buildbucket.proto import common as common_pb2
from RECIPE_MODULES.fuchsia.utils import pluralize
DEFAULT_MAX_ATTEMPTS = 2
@attr.s
class Attempt:
"""References a specific attempt of a task."""
task_id = attr.ib(type=str)
index = attr.ib(type=int, default=None) # Number of prior attempts.
host = attr.ib(type=str, default=None)
task_ui_link = attr.ib(type=str, default=None)
result = attr.ib(default=None)
# This attribute should be set by overrides of Task.process_result(). It
# indicates that even though at the swarming level the task may have
# passed something failed inside that larger task.
failure_reason = attr.ib(type=str, default="")
has_flakes = attr.ib(type=bool, default=False)
task_outputs_link = attr.ib(type=str, default=None)
logs = attr.ib(type=dict, default=attr.Factory(dict))
def __attrs_post_init__(self):
# The led module gives the host and the id, but the swarming module
# gives the link and the id. Require the id (since it has no default
# above) and require either the host or task_ui_link attributes.
assert self.host or self.task_ui_link
@property
def name(self):
return f"attempt {int(self.index)}"
@property
def success(self):
if self.failure_reason:
return False
if not self.result: # pragma: nocover
return False
return self.result.status == common_pb2.Status.SUCCESS
class TaskTracker:
"""TaskTracker tracks state about attempts to run a task.
TaskTracker runs the task until we get run_count successes. Usually
run_count is 1, for running regular tests, but run_count may be >1 when
gathering results of performance tests.
"""
# States returned by _get_state()
_LAUNCH_MORE = "launch_more"
_IN_PROGRESS = "in_progress"
_OVERALL_SUCCESS = "overall_success"
_OVERALL_FAILURE = "overall_failure"
def __init__(self, api, task, run_count):
"""
Args:
api: recipe_api.RecipeApiPlain object.
task: Task object.
run_count: number of successful runs we want to get for the task.
"""
self._api = api
self._task = task
self._attempts = []
self._in_progress_attempts = []
self._successes_required = run_count
self._successes_got = 0
self._failures_got = 0
self._flakes_got = 0
@property
def name(self):
return self._task.name
@property
def abort_early_if_failed(self):
return self._task.abort_early_if_failed
# Returns a pair (state, number_to_launch), where number_to_launch
# is the number of new task attempts to be launched.
def _get_state(self):
if len(self._in_progress_attempts) != 0:
return self._IN_PROGRESS, 0
if self._successes_got >= self._successes_required:
return self._OVERALL_SUCCESS, 0
# We treat the max_attempts parameter as a multiplier, basically
# "max attempts per successful run needed", so that the same
# max_attempts value can be used for both perfcompare and regular
# builders.
attempts_allowed = self._task.max_attempts * self._successes_required
remaining_needed = self._successes_required - self._successes_got
remaining_allowed = attempts_allowed - len(self._attempts)
if remaining_needed > remaining_allowed:
return self._OVERALL_FAILURE, 0
# Apply the "no futile retries" strategy: If we need multiple
# successful runs but we see no successes in the first batch of
# attempts, don't do any retries, on the grounds that the build
# we're testing is probably bad (i.e. it won't pass if retried).
# This is intended to avoid wasting time and infra capacity.
if (self._successes_required > 1 and self._successes_got == 0 and
len(self._attempts) >= self._successes_required):
return self._OVERALL_FAILURE, 0
return self._LAUNCH_MORE, remaining_needed
def should_launch(self):
_, number_to_launch = self._get_state()
return number_to_launch > 0
# Launch one or more task attempts. This assumes that should_launch()
# was previously called and returned True.
def launch(self):
state, number_to_launch = self._get_state()
assert state == self._LAUNCH_MORE, state
assert number_to_launch > 0
# Don't increase the priority if we need multiple successful runs (used
# for perfcompare mode).
if self._successes_required > 1:
priority_boost_amount = 0
else:
# Boost the priority by the number of previous attempts. This means
# that second attempts will take priority over first attempts, third
# attempts will take priority over second attempts, etc.
#
# This means that if there is a long queue for Swarming tasks to run,
# only the first attempts should wait. Subsequent attempts should
# jump ahead in the queue.
priority_boost_amount = len(self._attempts)
task_ids = []
for _ in range(number_to_launch):
attempt_index = len(self._attempts)
task_name = f"{self.name} (attempt {int(attempt_index)})"
with self._api.step.nest(task_name) as presentation:
attempt = self._task.launch(priority_boost_amount)
attempt.index = attempt_index
self._attempts.append(attempt)
self._in_progress_attempts.append(attempt)
task_ids.append(attempt.task_id)
presentation.links["Swarming task"] = attempt.task_ui_link
return task_ids
@property
def attempts(self): # pragma: no cover
return self._attempts[:]
@property
def in_progress(self):
state, _ = self._get_state()
return state == self._IN_PROGRESS
@property
def success(self):
state, _ = self._get_state()
return state == self._OVERALL_SUCCESS
@property
def failed(self):
return not self.success and not self.in_progress
def failed_after_max_attempts(self):
state, _ = self._get_state()
return state == self._OVERALL_FAILURE
def has_flakes(self):
return self._flakes_got > 0 or (
self._successes_got > 0 and self._failures_got > 0
)
def process_result(self, attempt, result):
with self._api.step.nest(result.builder.builder):
self._in_progress_attempts.remove(attempt)
attempt.result = result
try:
self._task.process_result(attempt)
except recipe_api.StepFailure as e:
error_step = self._api.step.empty("exception")
error_step.presentation.step_summary_text = str(e)
attempt.failure_reason = "exception during result processing"
if e.name and e.exc_result:
# The error name generally contains the name of the step
# that failed. The full step name will already be namespaced
# by task name, so present everything after the task name
# since the failure_reason will only be presented in the
# context of this task.
attempt.failure_reason += f": {e.name.split(self.name + '.')[-1]} (retcode {e.exc_result.retcode})"
trace_lines = self._api.utils.traceback_format_exc().splitlines()
attempt.logs["exception"] = trace_lines
error_step.presentation.logs["exception"] = trace_lines
if attempt.success:
self._successes_got += 1
if attempt.has_flakes:
self._flakes_got += 1
else:
self._failures_got += 1
def present(self, **kwargs):
"""Present this task when summarizing results at the end of the run.
Args:
**kwargs (Dict): passed through to present_attempt()
Returns:
None
"""
with self._api.step.nest(self.name) as task_step_presentation:
for attempt in self._attempts:
self._task.present_attempt(task_step_presentation, attempt, **kwargs)
# Show incomplete tasks in green so as not to be confused with
# actual failures.
if self.success or self.in_progress:
task_step_presentation.status = self._api.step.SUCCESS
else:
task_step_presentation.status = self._api.step.FAILURE
class Task:
"""A Task object describes:
* How to launch a task.
* How to process and present the results from a task.
This class is meant to be subclassed. Subclasses must define a launch()
method.
In most cases Task.max_attempts should be left alone. If the caller wants
to ensure a task has a larger or smaller number of max attempts than the
default for other tasks, set max_attempts to that number.
"""
def __init__(self, api, name):
"""Initializer.
Args:
api: recipe_api.RecipeApiPlain object.
name: str, human readable name of this task
"""
self._api = api
self.name = name
self.max_attempts = None
self.abort_early_if_failed = False
def process_result(self, attempt):
"""Examine the result in the given attempt for failures.
Subclasses can set attempt.failure_reason if they find a failure inside
attempt.result. failure_reason should be a short summary of the failure
(< 50 chars).
This is invoked shortly after api.swarming.collect() returns that a
task completed. It cannot assume the swarming task completed
successfully.
This is a no-op here but can be overridden by subclasses.
Returns:
None
"""
def present_attempt(self, task_step_presentation, attempt, **kwargs):
"""Present an Attempt when summarizing results at the end of the run.
Args:
task_step_presentation (StepPresentation): assuming present() was not
overridden, this will always be for a step titled after the current
task
attempt (Attempt): the Attempt to present
**kwargs (Dict): pass-through arguments for subclasses
This method will be invoked to show details of an Attempt. This base
class method just creates a link to the swarming results from the task,
but subclasses are free to create a step with much more elaborate
details of results.
Returns:
None
"""
del kwargs # Unused.
name = f"{attempt.name} ({'pass' if attempt.success else 'fail'})"
task_step_presentation.links[name] = attempt.task_ui_link
def launch(self, priority_boost_amount):
"""Launch the task (using Swarming, led, or something else).
Args:
priority_boost_amount (int): Non-negative integer specifying how much
the priority of the task should be increased from the default.
Returns:
Attempt object, with the task_id or host property filled out from
from the Swarming or led result.
"""
assert False, "Subclasses must define launch() method." # pragma: no cover
class LedTask(Task):
def __init__(self, led_data, *args, **kwargs):
super().__init__(*args, **kwargs)
self._led_data = led_data
build = led_data.result.buildbucket.bbagent_args.build
self._backend_config = build.infra.backend.config
self._original_priority = self._backend_config['priority']
def launch(self, priority_boost_amount):
assert self._led_data
# For Swarming tasks, numerically lower priority values are logically
# higher priorities, so use subtraction here.
new_priority = self._original_priority - priority_boost_amount
# Although we modify this data structure in place, one launch()
# invocation should not affect later launch() invocations because this
# 'priority' field is always overwritten.
self._backend_config['priority'] = new_priority
if priority_boost_amount != 0:
with self._api.step.nest("increase priority") as pres:
pres.step_summary_text = (
f"from {int(self._original_priority)} to {int(new_priority)}"
)
res = self._led_data.then("launch", "-real-build")
host = res.launch_result.swarming_hostname
task_id = res.launch_result.task_id or str(res.launch_result.build_id)
build_url_swarming = 'https://ci.chromium.org/swarming/task/%s?server=%s' % (
task_id,
res.launch_result.swarming_hostname,
)
build_url_bb = 'https://%s/build/%s' % (
res.launch_result.buildbucket_hostname, task_id
)
build_url = build_url_swarming if res.launch_result.task_id else build_url_bb
return self._api.swarming_retry.Attempt(
host=host,
task_id=task_id,
# Use Milo since this task is running a recipe.
task_ui_link=build_url,
)
class RetrySwarmingApi(recipe_api.RecipeApi):
"""Launch and retry swarming jobs until they pass or we hit max attempts."""
Task = Task # pylint: disable=invalid-name
LedTask = LedTask # pylint: disable=invalid-name
Attempt = Attempt # pylint: disable=invalid-name
DEFAULT_MAX_ATTEMPTS = DEFAULT_MAX_ATTEMPTS
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._task_ids_seen = set()
def run_and_present_tasks(self, tasks, **kwargs):
tasks = self.run_tasks(tasks, **kwargs)
self.present_tasks(tasks)
self.raise_failures(tasks)
def _is_complete(self, result):
return result.status not in {
common_pb2.Status.STATUS_UNSPECIFIED,
common_pb2.Status.SCHEDULED,
common_pb2.Status.STARTED,
}
def _get_tasks_to_launch(self, tasks):
if any(task.failed_after_max_attempts() for task in tasks):
# The build has failed overall, so disable launching any
# further task attempts.
return []
return [task for task in tasks if task.should_launch()]
def _launch(self, tasks):
for task in tasks:
task_ids = task.launch()
# Check whether we got any duplicate task IDs. This is just a
# rationality check for testing. With the current testing
# framework, it is easy for multiple launch attempts to return
# the same ID, because the default testing output always
# returns the same task ID value.
for task_id in task_ids:
assert (
task_id not in self._task_ids_seen
), f"Duplicate task ID seen: {repr(task_id)}"
self._task_ids_seen.add(task_id)
def _launch_and_collect(
self, tasks, collect_output_dir, summary_presentation
):
"""Launch necessary tasks and process those that complete.
Launch any tasks that are not currently running, have not passed,
and have not exceeded max_attempts.
After launching tasks, wait for the first task to complete (of the tasks
just launched as well as those that have been running for awhile).
Summarize the jobs that have just passed or failed as well as those still
running (with swarming links).
This function is mostly stateless. The caller must pass in the
same arguments for each invocation, and state is kept inside the
tasks themselves.
Args:
tasks (list[Task]): tasks to execute
collect_output_dir (Path or None): output directory to pass to
api.swarming.collect()
summary_presentation (StepPresentation): where to attach the
summary for this round of launch/collect.
Returns:
Number of jobs still running or to be relaunched. As long as this
is positive the caller should continue calling this method.
"""
summary = []
def summary_entry(list_name, task_list):
if len(task_list) == 1:
count_or_name = task_list[0].name
else:
count_or_name = len(task_list)
return f"{count_or_name} {list_name}"
to_launch = self._get_tasks_to_launch(tasks)
if to_launch:
with self.m.step.nest("launch"):
self._launch(to_launch)
# Wait on tasks that are in-progress.
tasks_by_id = {}
for task in tasks:
for attempt in task._in_progress_attempts:
assert attempt.task_id not in tasks_by_id
tasks_by_id[attempt.task_id] = (task, attempt)
results = {}
if tasks_by_id:
results = self.m.buildbucket.collect_builds(
sorted([int(build_id) for build_id in tasks_by_id]),
mirror_status=False
)
# 'collect' takes a list of build IDs and returns a list specifying
# whether each task has completed or is still running. However,
# sometimes the list it returns misses tasks that haven't
# completed. That makes no difference here because we only look at
# the completed tasks.
completed_results = list(filter(self._is_complete, results.values()))
passed_tasks = []
failed_tasks = []
if completed_results:
with self.m.step.nest("process results") as process_results_presentation:
for result in completed_results:
task, attempt = tasks_by_id[str(result.id)]
task.process_result(attempt, result)
if attempt.success:
passed_tasks.append((task, attempt))
else:
failed_tasks.append((task, attempt))
# Prevent failure states from the child log steps being
# propagated up the log step tree by the recipe log step
# system. This is desirable because although
# task.process_result() may internally catch and suppress
# an exception, the exception will still be reported
# through recipe log step system.
process_results_presentation.status = self.m.step.SUCCESS
for list_name, task_list in [
("passed", passed_tasks),
("failed", failed_tasks),
]:
if not task_list:
continue
links = []
for task, attempt in task_list:
name = f"{task.name} ({attempt.name})"
links.append((name, attempt.task_ui_link))
with self.m.step.nest(f"{list_name} tasks") as list_step_presentation:
list_step_presentation.links.update(links)
summary.append(summary_entry(list_name, [task for task, _ in task_list]))
incomplete_tasks = [task for task in tasks if task.in_progress]
# Do minimal presentation of all in-progress Attempts.
links = []
for task in tasks:
for attempt in task._in_progress_attempts:
name = f"{task.name} ({attempt.name})"
links.append((name, attempt.task_ui_link))
if links:
with self.m.step.nest("incomplete tasks") as list_step_presentation:
list_step_presentation.links.update(links)
summary.append(summary_entry("incomplete", incomplete_tasks))
to_be_relaunched = self._get_tasks_to_launch(tasks)
failed_after_max_attempts = [
task for task in tasks if task.failed_after_max_attempts()
]
if failed_after_max_attempts:
summary.append(
summary_entry("failed after max attempts", failed_after_max_attempts)
)
summary_presentation.step_summary_text = ", ".join(summary)
# Check if all abort_early_if_failed tasks are finished. If one or more
# fail, don't wait on remaining tasks. They will be automatically
# forcibly terminated when the build's Swarming task completes.
abort_early_tasks = [task for task in tasks if task.abort_early_if_failed]
abort_early_tasks_in_progress = [
task for task in abort_early_tasks if task.in_progress
]
if not abort_early_tasks_in_progress:
failed_abort_early_tasks = [
task for task in abort_early_tasks if task.failed_after_max_attempts()
]
if failed_abort_early_tasks:
return 0
return len(to_be_relaunched) + len(incomplete_tasks)
def run_tasks(
self,
tasks,
max_attempts=0,
collect_output_dir=None,
run_count=1,
):
"""Launch all tasks, retry until max_attempts reached.
Args:
tasks (seq[Task]): tasks to execute
max_attempts (int): maximum number of attempts per task (0 means
DEFAULT_MAX_ATTEMPTS)
collect_output_dir (Path or None): output directory to pass to
api.swarming.collect()
"""
max_attempts = max_attempts or DEFAULT_MAX_ATTEMPTS
for task in tasks:
if not task.max_attempts:
task.max_attempts = max_attempts
tasks = [TaskTracker(self.m, task, run_count) for task in tasks]
with self.m.step.nest("launch/collect"), self.m.context(infra_steps=True):
for i in itertools.count(0):
with self.m.step.nest(str(i)) as presentation:
if not self._launch_and_collect(
tasks=tasks,
collect_output_dir=collect_output_dir,
summary_presentation=presentation,
):
break
return tasks
def present_tasks(self, tasks):
"""Present results as steps.
Examine tasks for pass/fail status and create step data for displaying
that status. Group all passes under one step and all failures under
another step. Passes that failed at least once are also listed as
flakes.
Args:
tasks (seq[Task]): tasks to examine
"""
# TODO(mohrr) add hooks to include task-specific data beyond pass/fail.
passed_tasks = [x for x in tasks if x.success]
# Some tasks may be incomplete if the launch_and_collect loop exited
# early due to failures.
incomplete_tasks = [x for x in tasks if x.in_progress]
failed_tasks = [x for x in tasks if x.failed]
flaked_tasks = [x for x in tasks if x.has_flakes()]
with self.m.step.nest("passes") as step_presentation:
for task in passed_tasks:
task.present(category="passes")
step_presentation.step_summary_text = f"{len(passed_tasks)} passed"
with self.m.step.nest("flakes") as step_presentation:
for task in flaked_tasks:
task.present(category="flakes")
step_presentation.step_summary_text = f"{len(flaked_tasks)} flaked"
with self.m.step.nest("failures") as step_presentation:
for task in failed_tasks:
task.present(category="failures")
step_presentation.step_summary_text = f"{len(failed_tasks)} failed"
if incomplete_tasks:
with self.m.step.nest("incomplete") as step_presentation:
for task in incomplete_tasks:
task.present(category="incomplete")
step_presentation.step_summary_text = (
f"{len(incomplete_tasks)} incomplete"
)
if not failed_tasks and not incomplete_tasks:
self.m.step.empty("all tasks passed") # pragma: no cover
def raise_failures(self, tasks):
"""Raise an exception if any tasks failed.
Examine tasks for pass/fail status. If any failed, raise a StepFailure.
Args:
tasks (seq[Task]): tasks to examine
"""
failed = [x for x in tasks if x.failed]
if failed:
raise self.m.step.StepFailure(
f"{pluralize('task', failed)} failed: {', '.join(x.name for x in failed)}"
)