blob: 9af62f28eb668f13a011289acc3d61cfaf66f83b [file] [log] [blame]
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
import 'dart:async';
import 'package:appengine/appengine.dart';
import 'package:gcloud/db.dart';
import 'package:googleapis_auth/auth_io.dart';
import 'package:meta/meta.dart';
import '../datastore/cocoon_config.dart';
import '../foundation/utils.dart';
import '../model/appengine/agent.dart';
import '../model/appengine/commit.dart';
import '../model/appengine/key_helper.dart';
import '../model/appengine/stage.dart';
import '../model/appengine/task.dart';
import '../request_handling/api_request_handler.dart';
import '../request_handling/authentication.dart';
import '../request_handling/body.dart';
import '../request_handling/exceptions.dart';
import '../service/access_token_provider.dart';
import '../service/datastore.dart';
/// Reserves a pending task so that an agent may run the task.
@immutable
class ReserveTask extends ApiRequestHandler<ReserveTaskResponse> {
ReserveTask(
Config config,
AuthenticationProvider authenticationProvider, {
@visibleForTesting TaskProvider taskProvider,
@visibleForTesting ReservationProvider reservationProvider,
@visibleForTesting AccessTokenProvider accessTokenProvider,
}) : taskProvider = taskProvider ??
TaskProvider(datastore: DatastoreService(db: config.db)),
reservationProvider =
reservationProvider ?? ReservationProvider(config),
accessTokenProvider =
accessTokenProvider ?? AccessTokenProvider(config),
super(config: config, authenticationProvider: authenticationProvider);
final TaskProvider taskProvider;
final ReservationProvider reservationProvider;
final AccessTokenProvider accessTokenProvider;
@override
Future<ReserveTaskResponse> post() async {
final Map<String, dynamic> params = requestData;
Agent agent = authContext.agent;
if (agent != null) {
if (agent.agentId != params['AgentID']) {
throw BadRequestException(
'Authenticated agent (${agent.agentId}) does not match agent '
'supplied in the request (${params['AgentID']})',
);
}
} else {
final String agentId = params['AgentID'] as String;
if (agentId == null) {
throw const BadRequestException('AgentID not specified in request');
}
final Key key = config.db.emptyKey.append(Agent, id: agentId);
agent = await config.db.lookupValue<Agent>(key, orElse: () {
throw BadRequestException('Invalid agent ID: $agentId');
});
}
const int maxAttempts = 3;
for (int attempt = 0; attempt < maxAttempts; attempt++) {
final FullTask task = await taskProvider.findNextTask(agent);
if (task == null) {
return const ReserveTaskResponse.empty();
}
try {
await reservationProvider.secureReservation(
task.task, agent.id as String);
final ClientContext clientContext = authContext.clientContext;
final AccessToken token = await accessTokenProvider.createAccessToken(
scopes: const <String>[
'https://www.googleapis.com/auth/devstorage.read_write'
],
);
final KeyHelper keyHelper =
KeyHelper(applicationContext: clientContext.applicationContext);
return ReserveTaskResponse(task.task, task.commit, token, keyHelper);
} on ReservationLostException {
// Keep looking for another task.
log.debug(
'Reservation lost for task ${task.task.name} on commit ${task.commit.sha}');
continue;
}
}
log.warning('Could not secure reservation after $maxAttempts; giving up');
return const ReserveTaskResponse.empty();
}
}
@immutable
class ReserveTaskResponse extends JsonBody {
const ReserveTaskResponse(
this.task, this.commit, this.accessToken, this.keyHelper)
: assert(task != null),
assert(commit != null),
assert(accessToken != null),
assert(keyHelper != null);
const ReserveTaskResponse.empty()
: task = null,
commit = null,
accessToken = null,
keyHelper = null;
/// The task that was reserved.
///
/// The existence of this task in this response does not mean the task
/// reservation has been secured in the cloud datastore yet. It is up to
/// callers to manage the consistency of the reservation with the cloud
/// datastore.
///
/// This may be null, which indicates that no task was available to be
/// reserved.
///
/// See also:
///
/// * [ReserveTask.secureReservation], which secures consistency of the
/// reservation with the cloud datastore.
final Task task;
/// The commit that triggered the creation of [task].
///
/// This commit "owns" [task] and represents the instantiation of the commit
/// referenced by [Task.commitKey].
///
/// This may be null, which indicates that no task was available.
final Commit commit;
/// The OAuth 2.0 access token that the receiver of this response may use to
/// make authenticated requests back to App Engine.
///
/// This may be null. Generally, when [task] is non-null, callers will want
/// to return a response that contains an access token.
///
/// See also:
///
/// * [withAccessToken], which is used to add an access token to a response
/// that otherwise did not have an access token.
final AccessToken accessToken;
/// Used to serialize keys in the response.
final KeyHelper keyHelper;
@override
Map<String, dynamic> toJson() {
// package:json_serializable would work here, but only if we adjust the
// agent to match what's output here. Since the agent needs to work against
// the Go backend as well (temporarily), we hand-code the JSON format here.
final Map<String, dynamic> taskMap = task == null
? null
: <String, dynamic>{
'Task': <String, dynamic>{
'TimeoutInMinutes': task.timeoutInMinutes,
'Name': task.name,
},
'Key': keyHelper.encode(task.key),
};
final Map<String, dynamic> commitMap = commit == null
? null
: <String, dynamic>{
'Checklist': <String, dynamic>{
'Commit': <String, dynamic>{
'Sha': commit.sha,
},
},
};
return <String, dynamic>{
'TaskEntity': taskMap,
'ChecklistEntity': commitMap,
'CloudAuthToken': accessToken?.data,
};
}
}
@visibleForTesting
class TaskProvider {
TaskProvider({
@required this.datastore,
}) : assert(datastore != null);
/// The backing datastore. Guaranteed to be non-null.
final DatastoreService datastore;
Future<FullTask> findNextTask(Agent agent) async {
await for (Commit commit in datastore.queryRecentCommits()) {
final List<Stage> stages =
await datastore.queryTasksGroupedByStage(commit);
for (Stage stage in stages) {
if (!stage.isManagedByDeviceLab) {
continue;
}
for (Task task in List<Task>.from(stage.tasks)..sort(Task.byAttempts)) {
if (task.requiredCapabilities.isEmpty) {
throw InvalidTaskException(
'Task ${task.name} has no required capabilities');
}
if (task.status == Task.statusNew &&
agent.isCapableOfPerformingTask(task)) {
return FullTask(task, commit);
}
}
}
}
return null;
}
}
@visibleForTesting
class InvalidTaskException implements Exception {
const InvalidTaskException(this.message);
final String message;
@override
String toString() => message;
}
@visibleForTesting
class ReservationProvider {
const ReservationProvider(this.config);
final Config config;
/// If another agent has obtained the reservation on the task before we've
/// been able to secure our reservation, ths will throw a
/// [ReservationLostException]
Future<void> secureReservation(Task task, String agentId) {
assert(task != null);
assert(agentId != null);
try {
return runTransactionWithRetries(() async {
await config.db.withTransaction<void>((Transaction transaction) async {
final Task lockedTask = await transaction.lookupValue<Task>(task.key);
if (lockedTask.status != Task.statusNew) {
// Another reservation beat us in a race.
throw const ReservationLostException();
}
lockedTask.status = Task.statusInProgress;
lockedTask.attempts += 1;
lockedTask.startTimestamp = DateTime.now().millisecondsSinceEpoch;
lockedTask.reservedForAgentId = agentId;
transaction.queueMutations(inserts: <Task>[lockedTask]);
await transaction.commit();
});
});
} catch (error) {
throw const ReservationLostException();
}
}
}
/// Exception representing an attempt to secure a task reservation that was
/// preempted by another reservation holder.
@visibleForTesting
class ReservationLostException implements Exception {
/// Creates a new [ReservationLostException].
const ReservationLostException();
}