// blob: b7cea2e562bfa70102ddc7aafc68f068c23b7127 [file] [log] [blame]
// Copyright 2019 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';
import 'package:meta/meta.dart';
import '../model/luci/push_message.dart';
import '../service/cache_service.dart';
import '../service/logging.dart';
import 'api_request_handler.dart';
import 'authentication.dart';
import 'body.dart';
import 'exceptions.dart';
import 'pubsub_authentication.dart';
import 'request_handler.dart';
/// An [ApiRequestHandler] that handles PubSub subscription messages.
///
/// Messages adhere to a specific contract, as follows:
///
///  * All requests must be authenticated per [AuthenticationProvider].
///  * Request body is passed following the format of [PushMessageEnvelope].
///
/// Before a message is handed to the underlying [RequestHandler], a lock keyed
/// by its message id is written to [cache] under [subscriptionName]. A message
/// whose lock already exists is acknowledged with `200 OK` without being
/// processed again. The lock is purged if processing reports an
/// [HttpStatusException].
@immutable
abstract class SubscriptionHandler extends RequestHandler<Body> {
  /// Creates a new [SubscriptionHandler].
  const SubscriptionHandler({
    required this.cache,
    required super.config,
    this.authProvider,
    required this.subscriptionName,
  });

  /// Cache holding the per-message write locks used for de-duplication.
  final CacheService cache;

  /// Service responsible for authenticating this [HttpRequest].
  ///
  /// When null, a [PubsubAuthenticationProvider] built from `config` is used.
  final AuthenticationProvider? authProvider;

  /// Unique identifier of the PubSub subscription in this cloud project.
  ///
  /// Also used as the cache subcache name for the message locks.
  final String subscriptionName;

  /// The authentication context associated with the HTTP request.
  ///
  /// This is guaranteed to be non-null. If the request was unauthenticated,
  /// the request will be denied.
  @protected
  AuthenticatedContext get authContext => getValue<AuthenticatedContext>(ApiKey.authContext)!;

  /// The [PushMessage] from this [HttpRequest].
  @protected
  PushMessage get message => getValue<PushMessage>(PubSubKey.message)!;

  /// Writes [body] to the response of [request] with the given [statusCode],
  /// then flushes and closes the response.
  static Future<void> _sendResponse(HttpRequest request, int statusCode, Object? body) async {
    final HttpResponse response = request.response
      ..statusCode = statusCode
      ..write(body);
    await response.flush();
    await response.close();
  }

  /// Authenticates [request], decodes its [PushMessageEnvelope], and processes
  /// the contained message unless a lock for it already exists in [cache].
  ///
  /// Responses written directly by this method:
  ///
  ///  * `401` when authentication throws [Unauthenticated].
  ///  * `500` when the body cannot be read, decoded as UTF-8, or parsed as a
  ///    [PushMessageEnvelope].
  ///  * `200` when the message id is already locked in [cache].
  ///
  /// Throws a [BadRequestException] when the body is empty, or when the
  /// envelope carries no message or no message id.
  ///
  /// If processing reports an [HttpStatusException], the lock is purged and
  /// then [onError], when provided, is invoked with that exception.
  @override
  Future<void> service(
    HttpRequest request, {
    Future<void> Function(HttpStatusException)? onError,
  }) async {
    AuthenticatedContext authContext;
    final AuthenticationProvider auth = authProvider ?? PubsubAuthenticationProvider(config: config);
    try {
      authContext = await auth.authenticate(request);
    } on Unauthenticated catch (error) {
      await _sendResponse(request, HttpStatus.unauthorized, error.message);
      return;
    }

    List<int> body;
    try {
      body = await request.expand<int>((List<int> chunk) => chunk).toList();
    } catch (error) {
      await _sendResponse(request, HttpStatus.internalServerError, '$error');
      return;
    }

    PushMessageEnvelope? envelope;
    try {
      // Decode exactly once and inside the guard: malformed UTF-8 makes
      // utf8.decode throw, which must produce a response rather than escape.
      final String bodyText = utf8.decode(body);
      log.fine('Request body: $bodyText');
      if (body.isNotEmpty) {
        final Map<String, dynamic> json = jsonDecode(bodyText) as Map<String, dynamic>;
        envelope = PushMessageEnvelope.fromJson(json);
      }
    } catch (error) {
      await _sendResponse(request, HttpStatus.internalServerError, '$error');
      return;
    }

    if (envelope == null) {
      throw const BadRequestException('Failed to get message');
    }
    log.finer(envelope.toJson());

    // Validate the fields used below instead of null-asserting them, so an
    // incomplete envelope surfaces as a bad request rather than a TypeError.
    final PushMessage? pushMessage = envelope.message;
    final String? messageId = pushMessage?.messageId;
    if (pushMessage == null || messageId == null) {
      throw const BadRequestException('Failed to get message id');
    }
    log.fine('PubsubMessage publishTime: ${pushMessage.publishTime}');

    // NOTE(review): the lookup and the write below are two separate cache
    // calls; confirm whether concurrent deliveries of the same message can
    // both pass this check.
    final Uint8List? messageLock = await cache.getOrCreate(
      subscriptionName,
      messageId,
      createFn: null,
    );
    if (messageLock != null) {
      // No-op - There's already a write lock for this message
      await _sendResponse(request, HttpStatus.ok, '$messageId was already processed');
      return;
    }

    // Create a write lock in the cache to ensure requests are only processed once
    final Uint8List lockValue = Uint8List.fromList('l'.codeUnits);
    await cache.set(
      subscriptionName,
      messageId,
      lockValue,
      ttl: const Duration(days: 1),
    );

    log.info('Processing message $messageId');
    await runZoned<Future<void>>(
      () async => super.service(
        request,
        onError: (HttpStatusException exception) async {
          log.warning('Failed to process $pushMessage. (${exception.statusCode}) ${exception.message}');
          // Release the lock so a redelivery of this message can be processed.
          await cache.purge(subscriptionName, messageId);
          log.info('Purged write lock from cache');
          // Forward to the caller-supplied handler, which was previously dropped.
          await onError?.call(exception);
        },
      ),
      zoneValues: <RequestKey<dynamic>, Object?>{
        PubSubKey.message: pushMessage,
        ApiKey.authContext: authContext,
      },
    );
  }
}
/// Typed [RequestKey]s for values that [SubscriptionHandler] places in the
/// request zone.
@visibleForTesting
class PubSubKey<T> extends RequestKey<T> {
  /// Key for the [PushMessage] carried by the current request.
  static const PubSubKey<PushMessage> message = PubSubKey._('message');

  const PubSubKey._(super.name);
}