[app_dart] Move release logic to dedicated subscription (#2167)

diff --git a/app_dart/bin/server.dart b/app_dart/bin/server.dart
index e3e8b05..36b641f 100644
--- a/app_dart/bin/server.dart
+++ b/app_dart/bin/server.dart
@@ -73,10 +73,14 @@
         config: config,
         pubsub: const PubSub(),
       ),
-      '/api/github/webhook-subscription': GithubWebhookSubscription(
+      '/api/github/webhook-branch-subscription': GithubBranchWebhookSubscription(
         config: config,
         cache: cache,
         branchService: branchService,
+      ),
+      '/api/github/webhook-subscription': GithubWebhookSubscription(
+        config: config,
+        cache: cache,
         githubChecksService: githubChecksService,
         scheduler: scheduler,
       ),
diff --git a/app_dart/lib/cocoon_service.dart b/app_dart/lib/cocoon_service.dart
index a3c0fa7..e6c5770 100644
--- a/app_dart/lib/cocoon_service.dart
+++ b/app_dart/lib/cocoon_service.dart
@@ -16,6 +16,7 @@
 export 'src/request_handlers/get_green_commits.dart';
 export 'src/request_handlers/github_rate_limit_status.dart';
 export 'src/request_handlers/github_webhook.dart';
+export 'src/request_handlers/github/branch_subscription.dart';
 export 'src/request_handlers/github/webhook_subscription.dart';
 export 'src/request_handlers/postsubmit_luci_subscription.dart';
 export 'src/request_handlers/presubmit_luci_subscription.dart';
diff --git a/app_dart/lib/src/request_handlers/github/branch_subscription.dart b/app_dart/lib/src/request_handlers/github/branch_subscription.dart
new file mode 100644
index 0000000..2f54085
--- /dev/null
+++ b/app_dart/lib/src/request_handlers/github/branch_subscription.dart
@@ -0,0 +1,55 @@
+// Copyright 2019 The Flutter Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+import 'dart:convert';
+
+import 'package:github/hooks.dart';
+import 'package:meta/meta.dart';
+
+import '../../../protos.dart' as pb;
+import '../../request_handling/body.dart';
+import '../../request_handling/subscription_handler.dart';
+import '../../service/branch_service.dart';
+import '../../service/config.dart';
+import '../../service/logging.dart';
+
+const String kWebhookCreateEvent = 'create';
+
+/// Subscription for processing GitHub webhooks relating to branches.
+///
+/// This subscription processes branch events on GitHub into Cocoon.
+@immutable
+class GithubBranchWebhookSubscription extends SubscriptionHandler {
+  /// Creates a subscription for processing GitHub webhooks.
+  const GithubBranchWebhookSubscription({
+    required super.cache,
+    required super.config,
+    required this.branchService,
+  }) : super(subscriptionName: 'github-webhook-branches');
+
+  final BranchService branchService;
+
+  @override
+  Future<Body> post() async {
+    if (message.data == null || message.data!.isEmpty) {
+      log.warning('GitHub webhook message was empty. No-oping');
+      return Body.empty;
+    }
+
+    final pb.GithubWebhookMessage webhook = pb.GithubWebhookMessage.fromJson(message.data!);
+    if (webhook.event != kWebhookCreateEvent) {
+      return Body.empty;
+    }
+
+    log.fine('Processing ${webhook.event}');
+    final CreateEvent createEvent = CreateEvent.fromJson(json.decode(webhook.payload) as Map<String, dynamic>);
+    await branchService.handleCreateRequest(createEvent);
+    if (createEvent.repository?.slug() == Config.flutterSlug) {
+      await branchService.branchFlutterRecipes(createEvent.ref!);
+      log.fine('Created flutter/recipes branch for ${createEvent.ref!}');
+    }
+
+    return Body.empty;
+  }
+}
diff --git a/app_dart/lib/src/request_handlers/github/webhook_subscription.dart b/app_dart/lib/src/request_handlers/github/webhook_subscription.dart
index 5fc0931..fadc313 100644
--- a/app_dart/lib/src/request_handlers/github/webhook_subscription.dart
+++ b/app_dart/lib/src/request_handlers/github/webhook_subscription.dart
@@ -13,7 +13,6 @@
 import '../../request_handling/body.dart';
 import '../../request_handling/exceptions.dart';
 import '../../request_handling/subscription_handler.dart';
-import '../../service/branch_service.dart';
 import '../../service/config.dart';
 import '../../service/datastore.dart';
 import '../../service/github_checks_service.dart';
@@ -56,9 +55,8 @@
     required this.scheduler,
     this.githubChecksService,
     this.datastoreProvider = DatastoreService.defaultProvider,
-    required this.branchService,
     super.authProvider,
-  }) : super(topicName: 'github-webhooks');
+  }) : super(subscriptionName: 'github-webhooks-sub');
 
   /// Cocoon scheduler to trigger tasks against changes from GitHub.
   final Scheduler scheduler;
@@ -67,7 +65,6 @@
   final GithubChecksService? githubChecksService;
 
   final DatastoreServiceProvider datastoreProvider;
-  final BranchService branchService;
 
   @override
   Future<Body> post() async {
@@ -91,13 +88,6 @@
           throw InternalServerError('Failed to process $checkRunEvent');
         }
         break;
-      case 'create':
-        final CreateEvent createEvent = CreateEvent.fromJson(json.decode(webhook.payload) as Map<String, dynamic>);
-        await branchService.handleCreateRequest(createEvent);
-        if (createEvent.repository?.slug() == Config.flutterSlug) {
-          await branchService.branchFlutterRecipes(createEvent.ref!);
-        }
-        break;
     }
 
     return Body.empty;
diff --git a/app_dart/lib/src/request_handlers/postsubmit_luci_subscription.dart b/app_dart/lib/src/request_handlers/postsubmit_luci_subscription.dart
index c13b124..6be1cd4 100644
--- a/app_dart/lib/src/request_handlers/postsubmit_luci_subscription.dart
+++ b/app_dart/lib/src/request_handlers/postsubmit_luci_subscription.dart
@@ -35,7 +35,7 @@
     @visibleForTesting this.datastoreProvider = DatastoreService.defaultProvider,
     required this.luciBuildService,
     required this.scheduler,
-  }) : super(topicName: 'luci-postsubmit');
+  }) : super(subscriptionName: 'luci-postsubmit');
 
   final DatastoreServiceProvider datastoreProvider;
   final LuciBuildService luciBuildService;
diff --git a/app_dart/lib/src/request_handlers/presubmit_luci_subscription.dart b/app_dart/lib/src/request_handlers/presubmit_luci_subscription.dart
index 81632d4..8b35a98 100644
--- a/app_dart/lib/src/request_handlers/presubmit_luci_subscription.dart
+++ b/app_dart/lib/src/request_handlers/presubmit_luci_subscription.dart
@@ -38,7 +38,7 @@
     required this.luciBuildService,
     required this.githubChecksService,
     AuthenticationProvider? authProvider,
-  }) : super(topicName: 'github-updater');
+  }) : super(subscriptionName: 'github-updater');
 
   final BuildBucketClient buildBucketClient;
   final LuciBuildService luciBuildService;
diff --git a/app_dart/lib/src/request_handlers/scheduler/request_subscription.dart b/app_dart/lib/src/request_handlers/scheduler/request_subscription.dart
index 317279f..89231a7 100644
--- a/app_dart/lib/src/request_handlers/scheduler/request_subscription.dart
+++ b/app_dart/lib/src/request_handlers/scheduler/request_subscription.dart
@@ -32,7 +32,7 @@
     required this.buildBucketClient,
     super.authProvider,
     this.retryOptions = Config.schedulerRetry,
-  }) : super(topicName: 'scheduler-requests');
+  }) : super(subscriptionName: 'scheduler-requests');
 
   final BuildBucketClient buildBucketClient;
 
diff --git a/app_dart/lib/src/request_handling/subscription_handler.dart b/app_dart/lib/src/request_handling/subscription_handler.dart
index 6cc40a6..a80b3f6 100644
--- a/app_dart/lib/src/request_handling/subscription_handler.dart
+++ b/app_dart/lib/src/request_handling/subscription_handler.dart
@@ -31,7 +31,7 @@
     required this.cache,
     required super.config,
     this.authProvider,
-    required this.topicName,
+    required this.subscriptionName,
   });
 
   final CacheService cache;
@@ -39,8 +39,8 @@
   /// Service responsible for authenticating this [HttpRequest].
   final AuthenticationProvider? authProvider;
 
-  /// Unique identifier of the PubSub in this cloud project.
-  final String topicName;
+  /// Unique identifier of the PubSub subscription in this cloud project.
+  final String subscriptionName;
 
   /// The authentication context associated with the HTTP request.
   ///
@@ -109,7 +109,7 @@
 
     final String messageId = envelope.message!.messageId!;
 
-    final Uint8List? messageLock = await cache.getOrCreate(topicName, messageId);
+    final Uint8List? messageLock = await cache.getOrCreate(subscriptionName, messageId);
     if (messageLock != null) {
       // No-op - There's already a write lock for this message
       final HttpResponse response = request.response
@@ -123,7 +123,7 @@
     // Create a write lock in the cache to ensure requests are only processed once
     final Uint8List lockValue = Uint8List.fromList('l'.codeUnits);
     await cache.set(
-      topicName,
+      subscriptionName,
       messageId,
       lockValue,
       ttl: const Duration(days: 1),
@@ -135,7 +135,7 @@
         request,
         onError: (_) async {
           log.warning('Failed to process $message');
-          await cache.purge(topicName, messageId);
+          await cache.purge(subscriptionName, messageId);
           log.info('Purged write lock from cache');
         },
       ),
diff --git a/app_dart/test/request_handlers/github/branch_subscription_test.dart b/app_dart/test/request_handlers/github/branch_subscription_test.dart
new file mode 100644
index 0000000..1e0d751
--- /dev/null
+++ b/app_dart/test/request_handlers/github/branch_subscription_test.dart
@@ -0,0 +1,61 @@
+// Copyright 2019 The Flutter Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+import 'package:cocoon_service/src/request_handlers/github/branch_subscription.dart';
+import 'package:cocoon_service/src/request_handling/body.dart';
+import 'package:cocoon_service/src/service/cache_service.dart';
+import 'package:cocoon_service/src/service/config.dart';
+
+import 'package:mockito/mockito.dart';
+import 'package:test/test.dart';
+
+import '../../src/datastore/fake_config.dart';
+import '../../src/request_handling/fake_http.dart';
+import '../../src/request_handling/subscription_tester.dart';
+import '../../src/utilities/mocks.dart';
+import '../../src/utilities/webhook_generators.dart';
+
+void main() {
+  group('GithubBranchWebhookSubscription', () {
+    late GithubBranchWebhookSubscription webhook;
+    late FakeHttpRequest request;
+    late MockBranchService branchService;
+    late SubscriptionTester tester;
+
+    /// Name of an example release base branch name.
+    const String kReleaseBaseRef = 'flutter-2.12-candidate.4';
+
+    setUp(() {
+      request = FakeHttpRequest();
+      branchService = MockBranchService();
+      tester = SubscriptionTester(request: request);
+
+      webhook = GithubBranchWebhookSubscription(
+        config: FakeConfig(),
+        cache: CacheService(inMemory: true),
+        branchService: branchService,
+      );
+    });
+    test('process create branch event', () async {
+      tester.message = generateCreateBranchMessage(kReleaseBaseRef, Config.flutterSlug.fullName);
+      await tester.post(webhook);
+
+      verify(branchService.branchFlutterRecipes(kReleaseBaseRef));
+    });
+
+    test('do not create recipe branches on non-flutter/flutter branches', () async {
+      tester.message = generateCreateBranchMessage(kReleaseBaseRef, Config.engineSlug.fullName);
+      await tester.post(webhook);
+
+      verifyNever(branchService.branchFlutterRecipes(any));
+    });
+
+    test('do not process non-create messages', () async {
+      tester.message = generateGithubWebhookMessage();
+      expect(await tester.post(webhook), Body.empty);
+
+      verifyNever(branchService.branchFlutterRecipes(any));
+    });
+  });
+}
diff --git a/app_dart/test/request_handlers/github/webhook_subscription_test.dart b/app_dart/test/request_handlers/github/webhook_subscription_test.dart
index 6e428be..78afbf5 100644
--- a/app_dart/test/request_handlers/github/webhook_subscription_test.dart
+++ b/app_dart/test/request_handlers/github/webhook_subscription_test.dart
@@ -33,7 +33,6 @@
   late FakeGithubService githubService;
   late FakeHttpRequest request;
   late FakeScheduler scheduler;
-  late MockBranchService branchService;
   late MockGitHub gitHubClient;
   late MockGithubChecksUtil mockGithubChecksUtil;
   late MockGithubChecksService mockGithubChecksService;
@@ -70,7 +69,6 @@
       wrongHeadBranchPullRequestMessageValue: 'wrongHeadBranchPullRequestMessage',
       wrongBaseBranchPullRequestMessageValue: '{{target_branch}} -> {{default_branch}}',
     );
-    branchService = MockBranchService();
     issuesService = MockIssuesService();
     when(issuesService.addLabelsToIssue(any, any, any)).thenAnswer((_) async => <IssueLabel>[]);
     when(issuesService.createComment(any, any, any)).thenAnswer((_) async => IssueComment());
@@ -107,7 +105,6 @@
       datastoreProvider: (_) => DatastoreService(config.db, 5),
       githubChecksService: mockGithubChecksService,
       scheduler: scheduler,
-      branchService: branchService,
     );
   });
 
@@ -2435,22 +2432,6 @@
     });
   });
 
-  group('github webhook create branch event', () {
-    test('process create branch event', () async {
-      tester.message = generateCreateBranchMessage('flutter-2.12-candidate.4', Config.flutterSlug.fullName);
-      await tester.post(webhook);
-
-      verify(branchService.branchFlutterRecipes('flutter-2.12-candidate.4'));
-    });
-
-    test('do not create recipe branches on non-flutter/flutter branches', () async {
-      tester.message = generateCreateBranchMessage('flutter-2.12-candidate.4', Config.engineSlug.fullName);
-      await tester.post(webhook);
-
-      verifyNever(branchService.branchFlutterRecipes(any));
-    });
-  });
-
   group('github webhook check_run event', () {
     test('processes check run event', () async {
       tester.message = generateCheckRunEvent();
diff --git a/app_dart/test/request_handling/subscription_handler_test.dart b/app_dart/test/request_handling/subscription_handler_test.dart
index 30f7fea..04b4684 100644
--- a/app_dart/test/request_handling/subscription_handler_test.dart
+++ b/app_dart/test/request_handling/subscription_handler_test.dart
@@ -95,7 +95,7 @@
       expect(response.statusCode, HttpStatus.ok);
       // 2. Empty message is returned as this was already processed
       expect(responseBody, '123 was already processed');
-      expect(await cache.getOrCreate(subscription.topicName, '123'), isNotNull);
+      expect(await cache.getOrCreate(subscription.subscriptionName, '123'), isNotNull);
     });
 
     test('ensure messages can be retried', () async {
@@ -121,7 +121,7 @@
           cache: CacheService(inMemory: true),
           config: FakeConfig(),
           authProvider: FakeAuthenticationProvider(authenticated: false),
-          topicName: 'unauth',
+          subscriptionName: 'unauth',
         );
 
   @override
@@ -135,7 +135,7 @@
           cache: CacheService(inMemory: true),
           config: FakeConfig(),
           authProvider: FakeAuthenticationProvider(),
-          topicName: 'auth',
+          subscriptionName: 'auth',
         );
 
   @override
@@ -149,7 +149,7 @@
           cache: cache ?? CacheService(inMemory: true),
           config: FakeConfig(),
           authProvider: FakeAuthenticationProvider(),
-          topicName: 'error',
+          subscriptionName: 'error',
         );
 
   @override
@@ -163,7 +163,7 @@
           cache: cache ?? CacheService(inMemory: true),
           config: FakeConfig(),
           authProvider: FakeAuthenticationProvider(),
-          topicName: 'read',
+          subscriptionName: 'read',
         );
 
   @override