blob: 44f505b285106358d385413e58c90912b9ec8d78 [file] [log] [blame]
// Copyright 2019 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
import 'dart:convert';
import 'package:cocoon_service/src/request_handling/subscription_handler_v2.dart';
import 'package:meta/meta.dart';
import 'package:retry/retry.dart';
import '../../../cocoon_service.dart';
import '../../service/build_bucket_v2_client.dart';
import 'package:buildbucket/buildbucket_pb.dart' as bbv2;
import '../../request_handling/exceptions.dart';
import '../../service/logging.dart';
/// Subscription for making requests to BuildBucket.
///
/// The PubSub subscription is set up here:
/// https://console.cloud.google.com/cloudpubsub/subscription/detail/cocoon-scheduler-requests?project=flutter-dashboard
///
/// This endpoint allows Cocoon to defer BuildBucket requests off the main request loop. This is critical when new
/// commits are pushed, and they can schedule 100+ builds at once.
///
/// This endpoint takes in a POST request with the JSON of a [bbv2.BatchRequest]. In practice, the
/// [bbv2.BatchRequest] should contain a single request.
@immutable
class SchedulerRequestSubscriptionV2 extends SubscriptionHandlerV2 {
/// Creates a subscription for sending BuildBucket requests.
const SchedulerRequestSubscriptionV2({
required super.cache,
required super.config,
required this.buildBucketClient,
super.authProvider,
this.retryOptions = Config.schedulerRetry,
}) : super(subscriptionName: 'cocoon-scheduler-requests-sub');
final BuildBucketV2Client buildBucketClient;
final RetryOptions retryOptions;
@override
Future<Body> post() async {
if (message.data == null) {
log.info('no data in message');
throw const BadRequestException('no data in message');
}
// final String data = message.data!;
log.fine('attempting to read message ${message.data}');
final bbv2.BatchRequest batchRequest = bbv2.BatchRequest.create();
// Merge from json only works with the integer field names.
batchRequest.mergeFromProto3Json(jsonDecode(message.data!) as Map<String, dynamic>);
log.info('Read the following data: ${batchRequest.toProto3Json().toString()}');
/// Retry scheduling builds upto 3 times.
///
/// Log error message when still failing after retry. Avoid endless rescheduling
/// by acking the pub/sub message without throwing an exception.
String? unscheduledBuilds;
try {
await retryOptions.retry(
() async {
final List<bbv2.BatchRequest_Request> requestsToRetry = await _sendBatchRequest(batchRequest);
// Make a copy of the requests that are passed in as if simply access the list
// we make changes for all instances.
final List<bbv2.BatchRequest_Request> requestListCopy = [];
requestListCopy.addAll(requestsToRetry);
batchRequest.requests.clear();
batchRequest.requests.addAll(requestListCopy);
unscheduledBuilds = requestsToRetry.map((e) => e.scheduleBuild.builder).toString();
if (requestsToRetry.isNotEmpty) {
throw InternalServerError('Failed to schedule builds: $unscheduledBuilds.');
}
},
retryIf: (Exception e) => e is InternalServerError,
);
} catch (e) {
log.warning('Failed to schedule builds on exception: $unscheduledBuilds.');
return Body.forString('Failed to schedule builds: $unscheduledBuilds.');
}
return Body.empty;
}
/// Returns [List<bbv2.BatchRequest_Request>] of requests that need to be retried.
Future<List<bbv2.BatchRequest_Request>> _sendBatchRequest(bbv2.BatchRequest request) async {
log.info('Sending batch request for ${request.toProto3Json().toString()}');
bbv2.BatchResponse response;
try {
response = await buildBucketClient.batch(request);
} catch (e) {
log.severe('Exception making batch Requests.');
rethrow;
}
log.info('Made ${request.requests.length} and received ${response.responses.length}');
log.info('Responses: ${response.responses}');
// By default, retry everything. Then remove requests with a verified response.
// THese are the requests in the batch request object. Just requests.
final List<bbv2.BatchRequest_Request> retry = request.requests;
for (bbv2.BatchResponse_Response batchResponseResponse in response.responses) {
if (batchResponseResponse.hasScheduleBuild()) {
retry.removeWhere((element) => batchResponseResponse.scheduleBuild.builder == element.scheduleBuild.builder);
} else {
log.warning('Response does not have schedule build: $batchResponseResponse');
}
if (batchResponseResponse.hasError() && batchResponseResponse.error.code != 0) {
log.info('Non-zero grpc code: $batchResponseResponse');
}
}
return retry;
}
}