blob: 0c80599a3b4f8f274f80cc08c5b863f3f12a91ce [file] [log] [blame]
// Copyright 2019 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
import 'package:cocoon_service/src/foundation/utils.dart';
import 'package:cocoon_service/src/model/appengine/task.dart';
import 'package:cocoon_service/src/request_handling/body.dart';
import 'package:cocoon_service/src/service/datastore.dart';
import 'package:cocoon_service/src/service/scheduler/policy.dart';
import 'package:gcloud/db.dart';
import 'package:github/github.dart';
import 'package:meta/meta.dart';
import 'package:retry/retry.dart';
import '../../model/ci_yaml/ci_yaml.dart';
import '../../model/ci_yaml/target.dart';
import '../../request_handling/exceptions.dart';
import '../../request_handling/request_handler.dart';
import '../../service/config.dart';
import '../../service/logging.dart';
import '../../service/luci_build_service.dart';
import '../../service/scheduler.dart';
/// Cron request handler for scheduling targets when capacity becomes available.
///
/// Targets that have a [BatchPolicy] need to have backfilling enabled to ensure that ToT is always being tested.
@immutable
class BatchBackfiller extends RequestHandler {
/// Creates a subscription for sending BuildBucket requests.
const BatchBackfiller({
required super.config,
required this.scheduler,
@visibleForTesting this.datastoreProvider = DatastoreService.defaultProvider,
});
final DatastoreServiceProvider datastoreProvider;
final Scheduler scheduler;
@override
Future<Body> get() async {
final List<Future<void>> futures = <Future<void>>[];
for (RepositorySlug slug in config.supportedRepos) {
futures.add(backfillRepository(slug));
}
// Process all repos asynchronously
await Future.wait<void>(futures);
return Body.empty;
}
Future<void> backfillRepository(RepositorySlug slug) async {
final DatastoreService datastore = datastoreProvider(config.db);
final List<FullTask> tasks =
await (datastore.queryRecentTasks(slug: slug, commitLimit: config.backfillerCommitLimit)).toList();
// Construct Task columns to scan for backfilling
final Map<String, List<FullTask>> taskMap = <String, List<FullTask>>{};
for (FullTask fullTask in tasks) {
if (taskMap.containsKey(fullTask.task.name)) {
taskMap[fullTask.task.name]!.add(fullTask);
} else {
taskMap[fullTask.task.name!] = <FullTask>[fullTask];
}
}
// Check if should be scheduled (there is no yellow runs). Run the most recent gray.
List<Tuple<Target, FullTask, int>> backfill = <Tuple<Target, FullTask, int>>[];
for (List<FullTask> taskColumn in taskMap.values) {
final FullTask task = taskColumn.first;
final CiYaml ciYaml = await scheduler.getCiYaml(task.commit);
final List<Target> ciYamlTargets = ciYaml.backfillTargets;
// Skips scheduling if the task is not in TOT commit anymore.
final bool taskInToT = ciYamlTargets.map((Target target) => target.value.name).toList().contains(task.task.name);
if (!taskInToT) {
continue;
}
final Target target = ciYamlTargets.singleWhere((target) => target.value.name == task.task.name);
if (target.schedulerPolicy is! BatchPolicy) {
continue;
}
final FullTask? backfillTask = _backfillTask(target, taskColumn);
final int? priority = backfillPriority(taskColumn.map((e) => e.task).toList());
if (priority != null && backfillTask != null) {
backfill.add(Tuple<Target, FullTask, int>(target, backfillTask, priority));
}
}
// Get the number of targets to be backfilled in each cycle.
backfill = getFilteredBackfill(backfill);
log.fine('Backfilling ${backfill.length} builds');
log.fine(backfill.map<String>((Tuple<Target, FullTask, int> tuple) => tuple.first.value.name));
// Update tasks status as in progress to avoid duplicate scheduling.
final List<Task> backfillTasks = backfill.map((Tuple<Target, FullTask, int> tuple) => tuple.second.task).toList();
try {
await datastore.withTransaction<void>((Transaction transaction) async {
transaction.queueMutations(inserts: backfillTasks);
await transaction.commit();
log.fine(
'Updated ${backfillTasks.length} tasks: ${backfillTasks.map((e) => e.name).toList()} when backfilling.',
);
});
// Schedule all builds asynchronously.
// Schedule after db updates to avoid duplicate scheduling when db update fails.
await _scheduleWithRetries(backfill);
} catch (error) {
log.severe('Failed to update tasks when backfilling: $error');
}
}
/// Filters [config.backfillerTargetLimit] targets to backfill.
///
/// High priority targets will be guranteed to get back filled first. If more targets
/// than [config.backfillerTargetLimit], pick the limited number of targets after a
/// shuffle. This is to make sure all targets are picked with the same chance.
List<Tuple<Target, FullTask, int>> getFilteredBackfill(List<Tuple<Target, FullTask, int>> backfill) {
if (backfill.length <= config.backfillerTargetLimit) {
return backfill;
}
final List<Tuple<Target, FullTask, int>> filteredBackfill = <Tuple<Target, FullTask, int>>[];
final List<Tuple<Target, FullTask, int>> highPriorityBackfill =
backfill.where((element) => element.third == LuciBuildService.kRerunPriority).toList();
final List<Tuple<Target, FullTask, int>> normalPriorityBackfill =
backfill.where((element) => element.third != LuciBuildService.kRerunPriority).toList();
if (highPriorityBackfill.length >= config.backfillerTargetLimit) {
highPriorityBackfill.shuffle();
filteredBackfill.addAll(highPriorityBackfill.sublist(0, config.backfillerTargetLimit));
} else {
filteredBackfill.addAll(highPriorityBackfill);
normalPriorityBackfill.shuffle();
filteredBackfill
.addAll(normalPriorityBackfill.sublist(0, config.backfillerTargetLimit - highPriorityBackfill.length));
}
return filteredBackfill;
}
/// Schedules tasks with retry when hitting pub/sub server errors.
Future<void> _scheduleWithRetries(List<Tuple<Target, FullTask, int>> backfill) async {
const RetryOptions retryOptions = Config.schedulerRetry;
try {
await retryOptions.retry(
() async {
final List<List<Tuple<Target, Task, int>>> tupleLists =
await Future.wait<List<Tuple<Target, Task, int>>>(backfillRequestList(backfill));
if (tupleLists.any((List<Tuple<Target, Task, int>> tupleList) => tupleList.isNotEmpty)) {
final int nonEmptyListLenght = tupleLists.where((element) => element.isNotEmpty).toList().length;
log.info('Backfill fails and retry backfilling $nonEmptyListLenght targets.');
backfill = _updateBackfill(backfill, tupleLists);
throw InternalServerError('Failed to backfill ${backfill.length} targets.');
}
},
retryIf: (Exception e) => e is InternalServerError,
);
} catch (error) {
log.severe('Failed to backfill ${backfill.length} targets due to error: $error');
}
}
/// Updates the [backfill] list with those that fail to get scheduled.
///
/// [tupleLists] maintains the same tuple order as those in [backfill].
/// Each element from [backfill] is encapsulated as a list in [tupleLists] to prepare for
/// [scheduler.luciBuildService.schedulePostsubmitBuilds].
List<Tuple<Target, FullTask, int>> _updateBackfill(
List<Tuple<Target, FullTask, int>> backfill,
List<List<Tuple<Target, Task, int>>> tupleLists,
) {
final List<Tuple<Target, FullTask, int>> updatedBackfill = <Tuple<Target, FullTask, int>>[];
for (int i = 0; i < tupleLists.length; i++) {
if (tupleLists[i].isNotEmpty) {
updatedBackfill.add(backfill[i]);
}
}
return updatedBackfill;
}
/// Creates a list of backfill requests.
List<Future<List<Tuple<Target, Task, int>>>> backfillRequestList(List<Tuple<Target, FullTask, int>> backfill) {
final List<Future<List<Tuple<Target, Task, int>>>> futures = <Future<List<Tuple<Target, Task, int>>>>[];
for (Tuple<Target, FullTask, int> tuple in backfill) {
// TODO(chillers): The backfill priority is always going to be low. If this is a ToT task, we should run it at the default priority.
final Tuple<Target, Task, int> toBeScheduled = Tuple(
tuple.first,
tuple.second.task,
tuple.third,
);
futures.add(
scheduler.luciBuildService.schedulePostsubmitBuilds(
commit: tuple.second.commit,
toBeScheduled: [toBeScheduled],
),
);
}
return futures;
}
/// Returns priority for back filled targets.
///
/// Skips scheduling newly created targets whose available entries are
/// less than `BatchPolicy.kBatchSize`.
///
/// Uses a higher priority if there is an earlier failed build. Otherwise,
/// uses default `LuciBuildService.kBackfillPriority`
int? backfillPriority(List<Task> tasks) {
if (tasks.length < BatchPolicy.kBatchSize) {
return null;
}
if (shouldRerunPriority(tasks, BatchPolicy.kBatchSize)) {
return LuciBuildService.kRerunPriority;
}
return LuciBuildService.kBackfillPriority;
}
/// Returns the most recent [FullTask] to backfill.
///
/// A [FullTask] is only returned iff:
/// 1. There are no running builds (yellow)
/// 2. There are tasks that haven't been run (gray)
///
/// This is naive, and doesn't rely on knowing the actual Flutter infra capacity.
///
/// Otherwise, returns null indicating nothing should be backfilled.
FullTask? _backfillTask(Target target, List<FullTask> tasks) {
final List<FullTask> relevantTasks = tasks.where((FullTask task) => task.task.name == target.value.name).toList();
if (relevantTasks.any((FullTask task) => task.task.status == Task.statusInProgress)) {
// Don't schedule more builds where there is already a running task
return null;
}
final List<FullTask> backfillTask =
relevantTasks.where((FullTask task) => task.task.status == Task.statusNew).toList();
if (backfillTask.isEmpty) {
return null;
}
// First item in the list is guranteed to be most recent.
// Mark task as in progress to ensure it isn't scheduled over
backfillTask.first.task.status = Task.statusInProgress;
return backfillTask.first;
}
}