blob: 874d16d87c9a52357c7cbacb40f6f63cccebc60d [file] [log] [blame]
// Copyright 2019 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
import 'dart:convert';
import 'package:meta/meta.dart';
import 'package:retry/retry.dart';
import '../../../cocoon_service.dart';
import '../../model/luci/buildbucket.dart';
import '../../request_handling/exceptions.dart';
import '../../request_handling/subscription_handler.dart';
import '../../service/logging.dart';
/// Subscription for making requests to BuildBucket.
///
/// The PubSub subscription is set up here:
/// https://console.cloud.google.com/cloudpubsub/subscription/detail/scheduler-requests?project=flutter-dashboard
///
/// This endpoint allows Cocoon to defer BuildBucket requests off the main request loop. This is critical when new
/// commits are pushed, and they can schedule 100+ builds at once.
///
/// This endpoint takes in a POST request with the JSON of a [BatchRequest]. In practice, the
/// [BatchRequest] should contain a single request.
@immutable
class SchedulerRequestSubscription extends SubscriptionHandler {
  /// Creates a subscription for sending BuildBucket requests.
  const SchedulerRequestSubscription({
    required super.cache,
    required super.config,
    required this.buildBucketClient,
    super.authProvider,
    this.retryOptions = Config.schedulerRetry,
  }) : super(subscriptionName: 'scheduler-requests');

  /// Client used to send the decoded [BatchRequest] to BuildBucket.
  final BuildBucketClient buildBucketClient;

  /// Retry policy applied while some requests of a batch remain unconfirmed.
  final RetryOptions retryOptions;

  /// Decodes the pub/sub message into a [BatchRequest] and sends it to BuildBucket.
  ///
  /// The message data is parsed as JSON; if that fails with a [FormatException]
  /// it is treated as base64-encoded UTF-8 JSON instead.
  ///
  /// Returns [Body.empty] when every request was confirmed, or a body describing
  /// the unscheduled builders when scheduling still fails after retrying.
  ///
  /// Throws [BadRequestException] when the message cannot be turned into a
  /// [BatchRequest].
  @override
  Future<Body> post() async {
    BatchRequest request;
    try {
      final String data = message.data!;
      Map<String, dynamic> jsonData;
      log.info('rawJson: $data');
      try {
        jsonData = jsonDecode(data) as Map<String, dynamic>;
      } on FormatException {
        // Not plain JSON: fall back to base64. The bytes are decoded as UTF-8
        // (the JSON text encoding) so that non-ASCII characters survive.
        jsonData = jsonDecode(utf8.decode(base64.decode(data))) as Map<String, dynamic>;
      }
      request = BatchRequest.fromJson(jsonData);
    } catch (e) {
      log.severe('Failed to construct BatchRequest from message');
      log.severe(e);
      throw BadRequestException(e.toString());
    }

    // Retry scheduling builds as configured by [retryOptions].
    //
    // Log error message when still failing after retry. Avoid endless rescheduling
    // by acking the pub/sub message without throwing an exception.
    String? unScheduledBuilds;
    try {
      await retryOptions.retry(
        () async {
          final List<Request> requestsToRetry = await _sendBatchRequest(request);
          // The next attempt only resends what was not confirmed by this one.
          request = BatchRequest(requests: requestsToRetry);
          // Null-aware access: a request without a scheduleBuild must not crash
          // the bookkeeping that reports which builders are still pending.
          unScheduledBuilds = requestsToRetry.map((Request e) => e.scheduleBuild?.builderId.builder).toString();
          if (requestsToRetry.isNotEmpty) {
            throw InternalServerError('Failed to schedule builds: $unScheduledBuilds.');
          }
        },
        retryIf: (Exception e) => e is InternalServerError,
      );
    } catch (e) {
      log.warning('Failed to schedule builds: $unScheduledBuilds.');
      // Surface the underlying cause too; it may be an exception thrown by the
      // BuildBucket client rather than the retry-exhausted error above.
      log.warning('Giving up on scheduling after exception: $e');
      return Body.forString('Failed to schedule builds: $unScheduledBuilds.');
    }
    return Body.empty;
  }

  /// Wrapper around [BuildBucketClient.batch] to ensure all requests are made.
  ///
  /// Returns [List<Request>] of requests that need to be retried. The list is a
  /// copy; [request] itself is not modified.
  Future<List<Request>> _sendBatchRequest(BatchRequest request) async {
    final BatchResponse response = await buildBucketClient.batch(request);
    log.fine('Made ${request.requests?.length} and received ${response.responses?.length}');
    log.fine('Responses: ${response.responses}');

    // By default, retry everything. Then remove requests with a verified response.
    final List<Request> retry = List<Request>.of(request.requests ?? const <Request>[]);
    for (final Response subresponse in response.responses ?? const <Response>[]) {
      if (subresponse.scheduleBuild != null) {
        // Requests are matched to responses by builder id.
        retry.removeWhere(
          (Request pending) => pending.scheduleBuild?.builderId == subresponse.scheduleBuild!.builderId,
        );
      } else {
        log.warning('Response does not have schedule build: $subresponse');
      }
      // Only report a grpc code when the response actually carries an error.
      if (subresponse.error != null && subresponse.error?.code != 0) {
        log.fine('Non-zero grpc code: $subresponse');
      }
    }
    return retry;
  }
}