// Copyright 2019 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';
import 'package:meta/meta.dart';

import '../model/luci/push_message.dart';
import '../service/cache_service.dart';
import '../service/logging.dart';
import 'api_request_handler.dart';
import 'authentication.dart';
import 'body.dart';
import 'exceptions.dart';
import 'pubsub_authentication.dart';
import 'request_handler.dart';

/// An [ApiRequestHandler] that handles PubSub subscription messages.
///
/// Messages adhere to a specific contract, as follows:
///
///  * All requests must be authenticated per [AuthenticationProvider].
///  * Request body is passed following the format of [PushMessageEnvelope].
@immutable
abstract class SubscriptionHandler extends RequestHandler<Body> {
  /// Creates a new [SubscriptionHandler].
  const SubscriptionHandler({
    required this.cache,
    required super.config,
    this.authProvider,
    required this.subscriptionName,
  });

  final CacheService cache;

  /// Service responsible for authenticating this [HttpRequest].
  final AuthenticationProvider? authProvider;

  /// Unique identifier of the PubSub subscription in this cloud project.
  final String subscriptionName;

  /// The authentication context associated with the HTTP request.
  ///
  /// This is guaranteed to be non-null. If the request was unauthenticated,
  /// the request will be denied.
  @protected
  AuthenticatedContext get authContext => getValue<AuthenticatedContext>(ApiKey.authContext)!;

  /// The [PushMessage] from this [HttpRequest].
  @protected
  PushMessage get message => getValue<PushMessage>(PubSubKey.message)!;

  @override
  Future<void> service(
    HttpRequest request, {
    Future<void> Function(HttpStatusException)? onError,
  }) async {
    AuthenticatedContext authContext;
    final AuthenticationProvider auth = authProvider ?? PubsubAuthenticationProvider(config: config);
    try {
      authContext = await auth.authenticate(request);
    } on Unauthenticated catch (error) {
      final HttpResponse response = request.response;
      response
        ..statusCode = HttpStatus.unauthorized
        ..write(error.message);
      await response.flush();
      await response.close();
      return;
    }

    List<int> body;
    try {
      body = await request.expand<int>((List<int> chunk) => chunk).toList();
    } catch (error) {
      final HttpResponse response = request.response;
      response
        ..statusCode = HttpStatus.internalServerError
        ..write('$error');
      await response.flush();
      await response.close();
      return;
    }

    log.fine('Request body: ${utf8.decode(body)}');
    PushMessageEnvelope? envelope;
    if (body.isNotEmpty) {
      try {
        final Map<String, dynamic> json = jsonDecode(utf8.decode(body)) as Map<String, dynamic>;
        envelope = PushMessageEnvelope.fromJson(json);
      } catch (error) {
        final HttpResponse response = request.response;
        response
          ..statusCode = HttpStatus.internalServerError
          ..write('$error');
        await response.flush();
        await response.close();
        return;
      }
    }

    if (envelope == null) {
      throw const BadRequestException('Failed to get message');
    }
    log.finer(envelope.toJson());
    log.fine('PubsubMessage publishTime: ${envelope.message!.publishTime}');

    final String messageId = envelope.message!.messageId!;

    final Uint8List? messageLock = await cache.getOrCreate(
      subscriptionName,
      messageId,
      createFn: null,
    );
    if (messageLock != null) {
      // No-op - There's already a write lock for this message
      final HttpResponse response = request.response
        ..statusCode = HttpStatus.ok
        ..write('$messageId was already processed');
      await response.flush();
      await response.close();
      return;
    }

    // Create a write lock in the cache to ensure requests are only processed once
    final Uint8List lockValue = Uint8List.fromList('l'.codeUnits);
    await cache.set(
      subscriptionName,
      messageId,
      lockValue,
      ttl: const Duration(days: 1),
    );

    log.info('Processing message $messageId');
    await runZoned<Future<void>>(
      () async => super.service(
        request,
        onError: (HttpStatusException exception) async {
          log.warning('Failed to process $message. (${exception.statusCode}) ${exception.message}');
          await cache.purge(subscriptionName, messageId);
          log.info('Purged write lock from cache');
        },
      ),
      zoneValues: <RequestKey<dynamic>, Object?>{
        PubSubKey.message: envelope.message,
        ApiKey.authContext: authContext,
      },
    );
  }
}

@visibleForTesting
class PubSubKey<T> extends RequestKey<T> {
  const PubSubKey._(super.name);

  static const PubSubKey<PushMessage> message = PubSubKey<PushMessage>._('message');
}
