# Copyright 2019 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Launch and retry swarming jobs until they pass or we hit max attempts."""

import itertools
from urllib.parse import urlparse

import attr
from recipe_engine import recipe_api

from RECIPE_MODULES.fuchsia.utils import pluralize

DEFAULT_MAX_ATTEMPTS = 2


@attr.s
class Attempt:
  """References a specific attempt of a task."""

  task_id = attr.ib(type=str)
  index = attr.ib(type=int, default=None)  # Number of prior attempts.
  host = attr.ib(type=str, default=None)
  task_ui_link = attr.ib(type=str, default=None)
  # api.swarming.TaskResult from api.swarming.collect() call.
  result = attr.ib(default=None)
  # This attribute should be set by overrides of Task.process_result(). It
  # indicates that even though at the swarming level the task may have
  # passed something failed inside that larger task.
  failure_reason = attr.ib(type=str, default="")
  has_flakes = attr.ib(type=bool, default=False)
  task_outputs_link = attr.ib(type=str, default=None)
  logs = attr.ib(type=dict, default=attr.Factory(dict))

  def __attrs_post_init__(self):
    # The led module gives the host and the id, but the swarming module
    # gives the link and the id. Require the id (since it has no default
    # above) and require either the host or task_ui_link attributes.
    assert self.host or self.task_ui_link
    if not self.task_ui_link:
      self.task_ui_link = f"https://{self.host}/task?id={self.task_id}"
    elif not self.host:
      self.host = urlparse(self.task_ui_link).netloc

  @property
  def bot_id(self):
    return self.result.bot_id if self.result else None

  @property
  def bot_ui_link(self):
    return f"https://{self.host}/bot?id={self.bot_id}" if self.bot_id else None

  @property
  def name(self):
    return f"attempt {int(self.index)}"

  # TODO(mohrr) add hook for pass/fail beyond swarming task level.
  # In some cases may need to examine task outputs to determine pass/fail.
  @property
  def success(self):
    if self.failure_reason:
      return False

    if not self.result:  # pragma: nocover
      return False

    try:
      self.result.analyze()
      return True
    except recipe_api.StepFailure:
      return False


class TaskTracker:
  """TaskTracker tracks state about attempts to run a task.

    TaskTracker runs the task until we get run_count successes. Usually
    run_count is 1, for running regular tests, but run_count may be >1 when
    gathering results of performance tests.
    """

  # States returned by _get_state()
  _LAUNCH_MORE = "launch_more"
  _IN_PROGRESS = "in_progress"
  _OVERALL_SUCCESS = "overall_success"
  _OVERALL_FAILURE = "overall_failure"

  def __init__(self, api, task, run_count):
    """
        Args:
          api: recipe_api.RecipeApiPlain object.
          task: Task object.
          run_count: number of successful runs we want to get for the task.
        """
    self._api = api
    self._task = task
    self._attempts = []
    self._in_progress_attempts = []
    self._successes_required = run_count
    self._successes_got = 0
    self._failures_got = 0
    self._flakes_got = 0

  @property
  def name(self):
    return self._task.name

  @property
  def abort_early_if_failed(self):
    return self._task.abort_early_if_failed

  # Returns a pair (state, number_to_launch), where number_to_launch
  # is the number of new task attempts to be launched.
  def _get_state(self):
    if len(self._in_progress_attempts) != 0:
      return self._IN_PROGRESS, 0

    if self._successes_got >= self._successes_required:
      return self._OVERALL_SUCCESS, 0
    # We treat the max_attempts parameter as a multiplier, basically
    # "max attempts per successful run needed", so that the same
    # max_attempts value can be used for both perfcompare and regular
    # builders.
    attempts_allowed = self._task.max_attempts * self._successes_required
    remaining_needed = self._successes_required - self._successes_got
    remaining_allowed = attempts_allowed - len(self._attempts)
    if remaining_needed > remaining_allowed:
      return self._OVERALL_FAILURE, 0
    # Apply the "no futile retries" strategy: If we need multiple
    # successful runs but we see no successes in the first batch of
    # attempts, don't do any retries, on the grounds that the build
    # we're testing is probably bad (i.e. it won't pass if retried).
    # This is intended to avoid wasting time and infra capacity.
    if (self._successes_required > 1 and self._successes_got == 0 and
        len(self._attempts) >= self._successes_required):
      return self._OVERALL_FAILURE, 0
    return self._LAUNCH_MORE, remaining_needed

  def should_launch(self):
    _, number_to_launch = self._get_state()
    return number_to_launch > 0

  # Launch one or more task attempts.  This assumes that should_launch()
  # was previously called and returned True.
  def launch(self):
    state, number_to_launch = self._get_state()
    assert state == self._LAUNCH_MORE, state
    assert number_to_launch > 0

    # Don't increase the priority if we need multiple successful runs (used
    # for perfcompare mode).
    if self._successes_required > 1:
      priority_boost_amount = 0
    else:
      # Boost the priority by the number of previous attempts.  This means
      # that second attempts will take priority over first attempts, third
      # attempts will take priority over second attempts, etc.
      #
      # This means that if there is a long queue for Swarming tasks to run,
      # only the first attempts should wait.  Subsequent attempts should
      # jump ahead in the queue.
      priority_boost_amount = len(self._attempts)

    task_ids = []
    for _ in range(number_to_launch):
      attempt_index = len(self._attempts)
      task_name = f"{self.name} (attempt {int(attempt_index)})"
      with self._api.step.nest(task_name) as presentation:
        attempt = self._task.launch(priority_boost_amount)
        attempt.index = attempt_index
        self._attempts.append(attempt)
        self._in_progress_attempts.append(attempt)
        task_ids.append(attempt.task_id)
        presentation.links["Swarming task"] = attempt.task_ui_link
    return task_ids

  @property
  def attempts(self):  # pragma: no cover
    return self._attempts[:]

  @property
  def in_progress(self):
    state, _ = self._get_state()
    return state == self._IN_PROGRESS

  @property
  def success(self):
    state, _ = self._get_state()
    return state == self._OVERALL_SUCCESS

  @property
  def failed(self):
    return not self.success and not self.in_progress

  def failed_after_max_attempts(self):
    state, _ = self._get_state()
    return state == self._OVERALL_FAILURE

  def has_flakes(self):
    return self._flakes_got > 0 or (
        self._successes_got > 0 and self._failures_got > 0
    )

  def process_result(self, attempt, result):
    with self._api.step.nest(result.name):
      self._in_progress_attempts.remove(attempt)
      attempt.result = result
      try:
        self._task.process_result(attempt)
      except recipe_api.StepFailure as e:
        error_step = self._api.step.empty("exception")
        error_step.presentation.step_summary_text = str(e)
        attempt.failure_reason = "exception during result processing"
        if e.name and e.exc_result:
          # The error name generally contains the name of the step
          # that failed. The full step name will already be namespaced
          # by task name, so present everything after the task name
          # since the failure_reason will only be presented in the
          # context of this task.
          attempt.failure_reason += f": {e.name.split(self.name + '.')[-1]} (retcode {e.exc_result.retcode})"
        trace_lines = self._api.utils.traceback_format_exc().splitlines()
        attempt.logs["exception"] = trace_lines
        error_step.presentation.logs["exception"] = trace_lines
      if attempt.success:
        self._successes_got += 1
        if attempt.has_flakes:
          self._flakes_got += 1
      else:
        self._failures_got += 1

  def present(self, **kwargs):
    """Present this task when summarizing results at the end of the run.

        Args:
          **kwargs (Dict): passed through to present_attempt()

        Returns:
          None
        """
    with self._api.step.nest(self.name) as task_step_presentation:
      for attempt in self._attempts:
        self._task.present_attempt(task_step_presentation, attempt, **kwargs)

      # Show incomplete tasks in green so as not to be confused with
      # actual failures.
      if self.success or self.in_progress:
        task_step_presentation.status = self._api.step.SUCCESS
      else:
        task_step_presentation.status = self._api.step.FAILURE


class Task:
  """A Task object describes:

     * How to launch a task.
     * How to process and present the results from a task.

    This class is meant to be subclassed. Subclasses must define a launch()
    method.

    In most cases Task.max_attempts should be left alone. If the caller wants
    to ensure a task has a larger or smaller number of max attempts than the
    default for other tasks, set max_attempts to that number.
    """

  def __init__(self, api, name):
    """Initializer.

        Args:
          api: recipe_api.RecipeApiPlain object.
          name: str, human readable name of this task
        """
    self._api = api
    self.name = name
    self.max_attempts = None
    self.abort_early_if_failed = False

  def process_result(self, attempt):
    """Examine the result in the given attempt for failures.

        Subclasses can set attempt.failure_reason if they find a failure inside
        attempt.result. failure_reason should be a short summary of the failure
        (< 50 chars).

        This is invoked shortly after api.swarming.collect() returns that a
        task completed. It cannot assume the swarming task completed
        successfully.

        This is a no-op here but can be overridden by subclasses.

        Returns:
          None
        """

  def present_attempt(self, task_step_presentation, attempt, **kwargs):
    """Present an Attempt when summarizing results at the end of the run.

        Args:
          task_step_presentation (StepPresentation): assuming present() was not
            overridden, this will always be for a step titled after the current
            task
          attempt (Attempt): the Attempt to present
          **kwargs (Dict): pass-through arguments for subclasses

        This method will be invoked to show details of an Attempt. This base
        class method just creates a link to the swarming results from the task,
        but subclasses are free to create a step with much more elaborate
        details of results.

        Returns:
          None
        """
    del kwargs  # Unused.
    name = f"{attempt.name} ({'pass' if attempt.success else 'fail'})"
    task_step_presentation.links[name] = attempt.task_ui_link
    task_step_presentation.links[attempt.bot_id] = attempt.bot_ui_link

  def launch(self, priority_boost_amount):
    """Launch the task (using Swarming, led, or something else).

        Args:
          priority_boost_amount (int): Non-negative integer specifying how much
            the priority of the task should be increased from the default.

        Returns:
          Attempt object, with the task_id or host property filled out from
          from the Swarming or led result.
        """
    assert False, "Subclasses must define launch() method."  # pragma: no cover


class TriggeredTask(Task):

  def __init__(self, request, api, **kwargs):
    super().__init__(api, request.name, **kwargs)
    self._request = request

  def launch(self, priority_boost_amount):
    assert self._request

    request = self._request
    if priority_boost_amount != 0:
      old_priority = request.priority
      # For Swarming tasks, numerically lower priority values are logically
      # higher priorities, so use subtraction here.
      request = request.with_priority(old_priority - priority_boost_amount)
      with self._api.step.nest("increase priority") as pres:
        pres.step_summary_text = (
            f"from {int(old_priority)} to {int(request.priority)}"
        )

    metadata = self._api.swarming.trigger("trigger", [request])
    assert len(metadata) == 1
    metadata = metadata[0]

    return self._api.swarming_retry.Attempt(
        task_ui_link=metadata.task_ui_link,
        task_id=metadata.id,
    )


class LedTask(Task):

  def __init__(self, led_data, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self._led_data = led_data

    build = led_data.result.buildbucket.bbagent_args.build
    self._swarming_pb = build.infra.swarming
    self._original_priority = self._swarming_pb.priority

  def launch(self, priority_boost_amount):
    assert self._led_data

    # For Swarming tasks, numerically lower priority values are logically
    # higher priorities, so use subtraction here.
    new_priority = self._original_priority - priority_boost_amount
    # Although we modify this data structure in place, one launch()
    # invocation should not affect later launch() invocations because this
    # 'priority' field is always overwritten.
    self._swarming_pb.priority = new_priority
    if priority_boost_amount != 0:
      with self._api.step.nest("increase priority") as pres:
        pres.step_summary_text = (
            f"from {int(self._original_priority)} to {int(new_priority)}"
        )

    res = self._led_data.then("launch", "-modernize", "-real-build")
    host = res.launch_result.swarming_hostname
    task_id = res.launch_result.task_id or str(res.launch_result.build_id)
    build_url_swarming = 'https://ci.chromium.org/swarming/task/%s?server=%s' % (
        task_id,
        res.launch_result.swarming_hostname,
    )
    build_url_bb = 'https://%s/build/%s' % (
        res.launch_result.buildbucket_hostname, task_id
    )
    build_url = build_url_swarming if res.launch_result.task_id else build_url_bb
    return self._api.swarming_retry.Attempt(
        host=host,
        task_id=task_id,
        # Use Milo since this task is running a recipe.
        task_ui_link=build_url,
    )


class RetrySwarmingApi(recipe_api.RecipeApi):
  """Launch and retry swarming jobs until they pass or we hit max attempts."""

  Task = Task  # pylint: disable=invalid-name
  LedTask = LedTask  # pylint: disable=invalid-name
  TriggeredTask = TriggeredTask  # pylint: disable=invalid-name

  Attempt = Attempt  # pylint: disable=invalid-name

  DEFAULT_MAX_ATTEMPTS = DEFAULT_MAX_ATTEMPTS

  def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self._task_ids_seen = set()

  def run_and_present_tasks(self, tasks, **kwargs):
    tasks = self.run_tasks(tasks, **kwargs)
    self.present_tasks(tasks)
    self.raise_failures(tasks)

  def _is_complete(self, result):
    # At the moment results have a bunch of fields set to None if incomplete.
    # On the assumption this will be changed at some point I'm also checking
    # the state explicitly.
    if result.name is None:
      return False

    return result.state not in {
        self.m.swarming.TaskState.RUNNING,
        self.m.swarming.TaskState.PENDING,
    }

  def _get_tasks_to_launch(self, tasks):
    if any(task.failed_after_max_attempts() for task in tasks):
      # The build has failed overall, so disable launching any
      # further task attempts.
      return []
    return [task for task in tasks if task.should_launch()]

  def _launch(self, tasks):
    for task in tasks:
      task_ids = task.launch()
      # Check whether we got any duplicate task IDs.  This is just a
      # rationality check for testing.  With the current testing
      # framework, it is easy for multiple launch attempts to return
      # the same ID, because the default testing output always
      # returns the same task ID value.
      for task_id in task_ids:
        assert (
            task_id not in self._task_ids_seen
        ), f"Duplicate task ID seen: {repr(task_id)}"
        self._task_ids_seen.add(task_id)

  def _launch_and_collect(
      self, tasks, collect_output_dir, summary_presentation
  ):
    """Launch necessary tasks and process those that complete.

        Launch any tasks that are not currently running, have not passed,
        and have not exceeded max_attempts.

        After launching tasks, wait for the first task to complete (of the tasks
        just launched as well as those that have been running for awhile).
        Summarize the jobs that have just passed or failed as well as those still
        running (with swarming links).

        This function is mostly stateless. The caller must pass in the
        same arguments for each invocation, and state is kept inside the
        tasks themselves.

        Args:
          tasks (list[Task]): tasks to execute
          collect_output_dir (Path or None): output directory to pass to
            api.swarming.collect()
          summary_presentation (StepPresentation): where to attach the
            summary for this round of launch/collect.

        Returns:
          Number of jobs still running or to be relaunched. As long as this
          is positive the caller should continue calling this method.
        """
    summary = []

    def summary_entry(list_name, task_list):
      if len(task_list) == 1:
        count_or_name = task_list[0].name
      else:
        count_or_name = len(task_list)
      return f"{count_or_name} {list_name}"

    to_launch = self._get_tasks_to_launch(tasks)
    if to_launch:
      with self.m.step.nest("launch"):
        self._launch(to_launch)

    # Wait on tasks that are in-progress.
    tasks_by_id = {}
    for task in tasks:
      for attempt in task._in_progress_attempts:
        assert attempt.task_id not in tasks_by_id
        tasks_by_id[attempt.task_id] = (task, attempt)
    results = []
    if tasks_by_id:
      results = self.m.swarming.collect(
          "collect",
          sorted(tasks_by_id.keys()),
          output_dir=collect_output_dir,
          eager=True,
          verbose=True,
      )

    # 'collect' takes a list of task IDs and returns a list specifying
    # whether each task has completed or is still running.  However,
    # sometimes the list it returns misses tasks that haven't
    # completed.  That makes no difference here because we only look at
    # the completed tasks.
    completed_results = [
        result for result in results if self._is_complete(result)
    ]

    passed_tasks = []
    failed_tasks = []
    if completed_results:
      with self.m.step.nest("process results") as process_results_presentation:
        for result in completed_results:
          task, attempt = tasks_by_id[result.id]
          task.process_result(attempt, result)
          if attempt.success:
            passed_tasks.append((task, attempt))
          else:
            failed_tasks.append((task, attempt))

        # Prevent failure states from the child log steps being
        # propagated up the log step tree by the recipe log step
        # system.  This is desirable because although
        # task.process_result() may internally catch and suppress
        # an exception, the exception will still be reported
        # through recipe log step system.
        process_results_presentation.status = self.m.step.SUCCESS

    for list_name, task_list in [
        ("passed", passed_tasks),
        ("failed", failed_tasks),
    ]:
      if not task_list:
        continue
      links = []
      for task, attempt in task_list:
        name = f"{task.name} ({attempt.name})"
        links.append((name, attempt.task_ui_link))
      with self.m.step.nest(f"{list_name} tasks") as list_step_presentation:
        list_step_presentation.links.update(links)

      summary.append(summary_entry(list_name, [task for task, _ in task_list]))

    incomplete_tasks = [task for task in tasks if task.in_progress]
    # Do minimal presentation of all in-progress Attempts.
    links = []
    for task in tasks:
      for attempt in task._in_progress_attempts:
        name = f"{task.name} ({attempt.name})"
        links.append((name, attempt.task_ui_link))
    if links:
      with self.m.step.nest("incomplete tasks") as list_step_presentation:
        list_step_presentation.links.update(links)
      summary.append(summary_entry("incomplete", incomplete_tasks))

    to_be_relaunched = self._get_tasks_to_launch(tasks)
    failed_after_max_attempts = [
        task for task in tasks if task.failed_after_max_attempts()
    ]
    if failed_after_max_attempts:
      summary.append(
          summary_entry("failed after max attempts", failed_after_max_attempts)
      )

    summary_presentation.step_summary_text = ", ".join(summary)

    # Check if all abort_early_if_failed tasks are finished. If one or more
    # fail, don't wait on remaining tasks. They will be automatically
    # forcibly terminated when the build's Swarming task completes.
    abort_early_tasks = [task for task in tasks if task.abort_early_if_failed]
    abort_early_tasks_in_progress = [
        task for task in abort_early_tasks if task.in_progress
    ]
    if not abort_early_tasks_in_progress:
      failed_abort_early_tasks = [
          task for task in abort_early_tasks if task.failed_after_max_attempts()
      ]
      if failed_abort_early_tasks:
        return 0

    return len(to_be_relaunched) + len(incomplete_tasks)

  def run_tasks(
      self,
      tasks,
      max_attempts=0,
      collect_output_dir=None,
      run_count=1,
  ):
    """Launch all tasks, retry until max_attempts reached.

        Args:
          tasks (seq[Task]): tasks to execute
          max_attempts (int): maximum number of attempts per task (0 means
            DEFAULT_MAX_ATTEMPTS)
          collect_output_dir (Path or None): output directory to pass to
            api.swarming.collect()
        """

    max_attempts = max_attempts or DEFAULT_MAX_ATTEMPTS

    for task in tasks:
      if not task.max_attempts:
        task.max_attempts = max_attempts

    tasks = [TaskTracker(self.m, task, run_count) for task in tasks]

    with self.m.step.nest("launch/collect"), self.m.context(infra_steps=True):
      for i in itertools.count(0):
        with self.m.step.nest(str(i)) as presentation:
          if not self._launch_and_collect(
              tasks=tasks,
              collect_output_dir=collect_output_dir,
              summary_presentation=presentation,
          ):
            break

    return tasks

  def present_tasks(self, tasks):
    """Present results as steps.

        Examine tasks for pass/fail status and create step data for displaying
        that status. Group all passes under one step and all failures under
        another step. Passes that failed at least once are also listed as
        flakes.

        Args:
          tasks (seq[Task]): tasks to examine
        """
    # TODO(mohrr) add hooks to include task-specific data beyond pass/fail.
    passed_tasks = [x for x in tasks if x.success]
    # Some tasks may be incomplete if the launch_and_collect loop exited
    # early due to failures.
    incomplete_tasks = [x for x in tasks if x.in_progress]
    failed_tasks = [x for x in tasks if x.failed]
    flaked_tasks = [x for x in tasks if x.has_flakes()]

    with self.m.step.nest("passes") as step_presentation:
      for task in passed_tasks:
        task.present(category="passes")
      step_presentation.step_summary_text = f"{len(passed_tasks)} passed"

    with self.m.step.nest("flakes") as step_presentation:
      for task in flaked_tasks:
        task.present(category="flakes")
      step_presentation.step_summary_text = f"{len(flaked_tasks)} flaked"

    with self.m.step.nest("failures") as step_presentation:
      for task in failed_tasks:
        task.present(category="failures")
      step_presentation.step_summary_text = f"{len(failed_tasks)} failed"

    if incomplete_tasks:
      with self.m.step.nest("incomplete") as step_presentation:
        for task in incomplete_tasks:
          task.present(category="incomplete")
        step_presentation.step_summary_text = (
            f"{len(incomplete_tasks)} incomplete"
        )

    if not failed_tasks and not incomplete_tasks:
      self.m.step.empty("all tasks passed")  # pragma: no cover

  def raise_failures(self, tasks):
    """Raise an exception if any tasks failed.

        Examine tasks for pass/fail status. If any failed, raise a StepFailure.

        Args:
          tasks (seq[Task]): tasks to examine
        """
    failed = [x for x in tasks if x.failed]
    if failed:
      raise self.m.step.StepFailure(
          f"{pluralize('task', failed)} failed: {', '.join(x.name for x in failed)}"
      )
