Call pub/sub pull API multiple times to return more messages (#2035)

diff --git a/auto_submit/lib/requests/check_pull_request.dart b/auto_submit/lib/requests/check_pull_request.dart
index 3b71355..7dcb8a9 100644
--- a/auto_submit/lib/requests/check_pull_request.dart
+++ b/auto_submit/lib/requests/check_pull_request.dart
@@ -37,13 +37,23 @@
 
   @override
   Future<Response> get() async {
+    // Loops [config.kPubsubPullNumber] times to try pulling/processing as many
+    // messages as possible.
+    for (int i = 0; i < config.kPubsubPullNumber; i++) {
+      await checkPullRequest();
+    }
+    return Response.ok('Finished processing changes');
+  }
+
+  /// Pulls Pub/Sub messages and processes pull requests.
+  Future<void> checkPullRequest() async {
     final Set<int> processingLog = <int>{};
-    final pub.PullResponse pullResponse = await pubsub.pull('auto-submit-queue-sub', kPullMesssageBatchSize);
     final ApproverService approver = approverProvider(config);
+    final pub.PullResponse pullResponse = await pubsub.pull('auto-submit-queue-sub', kPullMesssageBatchSize);
     final List<pub.ReceivedMessage>? receivedMessages = pullResponse.receivedMessages;
     if (receivedMessages == null) {
       log.info('There are no requests in the queue');
-      return Response.ok('No requests in the queue.');
+      return;
     }
     log.info('Processing ${receivedMessages.length} messages');
     ValidationService validationService = ValidationService(config);
@@ -53,6 +63,8 @@
       final String messageData = message.message!.data!;
       final rawBody = json.decode(String.fromCharCodes(base64.decode(messageData))) as Map<String, dynamic>;
       final PullRequest pullRequest = PullRequest.fromJson(rawBody);
+      log.info('Processing message ackId: ${message.ackId}');
+      log.info('Processing mesageId: ${message.message!.messageId}');
       log.info('Processing PR: $rawBody');
       if (processingLog.contains(pullRequest.number)) {
         // Ack duplicate.
@@ -67,6 +79,5 @@
       futures.add(validationService.processMessage(pullRequest, message.ackId!, pubsub));
     }
     await Future.wait(futures);
-    return Response.ok('Finished processing changes');
   }
 }
diff --git a/auto_submit/lib/service/config.dart b/auto_submit/lib/service/config.dart
index a7a9c5b..d5e83ea 100644
--- a/auto_submit/lib/service/config.dart
+++ b/auto_submit/lib/service/config.dart
@@ -175,6 +175,12 @@
   /// The autosubmit label.
   String get autosubmitLabel => 'autosubmit';
 
+  /// Number of Pub/Sub pull calls in each cron job run.
+  ///
+  /// TODO(keyonghan): monitor and optimize this number based on response time
+  /// https://github.com/flutter/cocoon/pull/2035/files#r938143840.
+  int get kPubsubPullNumber => 5;
+
   /// Get the webhook key
   Future<String> getWebhookKey() async {
     final Uint8List? cacheValue = await cache[kWebHookKey].get(
diff --git a/auto_submit/test/requests/check_pull_request_test.dart b/auto_submit/test/requests/check_pull_request_test.dart
index 0132331..437b381 100644
--- a/auto_submit/test/requests/check_pull_request_test.dart
+++ b/auto_submit/test/requests/check_pull_request_test.dart
@@ -5,23 +5,23 @@
 // ignore_for_file: constant_identifier_names
 import 'package:auto_submit/service/config.dart';
 
-import 'package:auto_submit/service/log.dart';
-import 'package:logging/logging.dart';
 import 'package:auto_submit/requests/check_pull_request.dart';
 import 'package:auto_submit/requests/check_pull_request_queries.dart';
+import 'package:auto_submit/service/log.dart';
 import 'package:github/github.dart';
+import 'package:graphql/client.dart' hide Request, Response;
+import 'package:logging/logging.dart';
 import 'package:mockito/mockito.dart';
 import 'package:test/test.dart';
-import 'package:graphql/client.dart' hide Request, Response;
 
-import '../utilities/mocks.dart';
-import '../utilities/utils.dart' hide createQueryResult;
 import './github_webhook_test_data.dart';
 import '../src/request_handling/fake_pubsub.dart';
 import '../src/request_handling/fake_authentication.dart';
 import '../src/service/fake_config.dart';
 import '../src/service/fake_github_service.dart';
 import '../src/service/fake_graphql_client.dart';
+import '../utilities/mocks.dart';
+import '../utilities/utils.dart' hide createQueryResult;
 
 const String oid = '6dcb09b5b57875f334f61aebed695e2e4193db5e';
 const String title = 'some_title';
@@ -35,7 +35,7 @@
     final FakeGithubService githubService = FakeGithubService();
     late MockPullRequestsService pullRequests;
     final MockGitHub gitHub = MockGitHub();
-    final FakePubSub pubsub = FakePubSub();
+    late FakePubSub pubsub;
     late PullRequestHelper flutterRequest;
     late PullRequestHelper cocoonRequest;
     late List<QueryOptions> expectedOptions;
@@ -50,6 +50,7 @@
     setUp(() {
       githubGraphQLClient = FakeGraphQLClient();
       auth = FakeCronAuthProvider();
+      pubsub = FakePubSub();
       expectedOptions = <QueryOptions>[];
 
       githubGraphQLClient.mutateResultForOptions = (MutationOptions options) => createFakeQueryResult();
@@ -102,7 +103,7 @@
 
     test('Multiple identical messages are processed once', () async {
       final PullRequest pullRequest1 = generatePullRequest(prNumber: 0, repoName: cocoonRepo);
-      for (int i = 0; i < 3; i++) {
+      for (int i = 0; i < 2; i++) {
         pubsub.publish('auto-submit-queue-sub', pullRequest1);
       }
 
@@ -124,7 +125,7 @@
     test('Closed PRs are not processed', () async {
       final PullRequest pullRequest1 = generatePullRequest(prNumber: 0, repoName: cocoonRepo, state: 'close');
       when(pullRequests.get(any, any)).thenAnswer((_) async => PullRequest(number: 0, state: 'close'));
-      for (int i = 0; i < 3; i++) {
+      for (int i = 0; i < 2; i++) {
         pubsub.publish('auto-submit-queue-sub', pullRequest1);
       }
 
@@ -474,6 +475,18 @@
       verifyQueries(expectedOptions);
       assert(pubsub.messagesQueue.isEmpty);
     });
+
+    test('Multiple pull calls are executed.', () async {
+      config.kPubsubPullNumberValue = 2;
+      final PullRequest pullRequest1 = generatePullRequest(prNumber: 0, repoName: cocoonRepo);
+      for (int i = 0; i < 3; i++) {
+        pubsub.publish('auto-submit-queue-sub', pullRequest1);
+      }
+      checkPullRequest = CheckPullRequest(config: config, pubsub: pubsub, cronAuthProvider: auth);
+      cocoonRequest = PullRequestHelper(prNumber: 0, lastCommitHash: oid);
+      await checkPullRequest.get();
+      expect(0, pubsub.messagesQueue.length);
+    });
   });
 }
 
diff --git a/auto_submit/test/src/request_handling/fake_pubsub.dart b/auto_submit/test/src/request_handling/fake_pubsub.dart
index aead39e..5a6a25d 100644
--- a/auto_submit/test/src/request_handling/fake_pubsub.dart
+++ b/auto_submit/test/src/request_handling/fake_pubsub.dart
@@ -10,6 +10,9 @@
 
 class FakePubSub extends PubSub {
   List<dynamic> messagesQueue = <dynamic>[];
+  // Number of messages in each Pub/Sub pull call. This mocks the API
+  // returning random number of messages each time.
+  int messageSize = 2;
 
   @override
   Future<void> publish(String topicName, dynamic json) async {
@@ -25,7 +28,10 @@
     List<ReceivedMessage> receivedMessages = <ReceivedMessage>[];
     if (messagesQueue.isNotEmpty) {
       int i = 0;
-      while (i < min(100, messagesQueue.length)) {
+      // Returns only allowed max number of messages. The number should not be greater than
+      // `maxMessages`, the available messages, and the number allowed in each call. The
+      // last number is to mock real `pull` API call.
+      while (i < min(min(maxMessages, messagesQueue.length), messageSize)) {
         receivedMessages.add(ReceivedMessage(message: PubsubMessage(data: messagesQueue[i] as String), ackId: '1'));
         i++;
       }
diff --git a/auto_submit/test/src/service/fake_config.dart b/auto_submit/test/src/service/fake_config.dart
index b1bf0f5..b028bef 100644
--- a/auto_submit/test/src/service/fake_config.dart
+++ b/auto_submit/test/src/service/fake_config.dart
@@ -23,6 +23,7 @@
     this.overrideTreeStatusLabelValue,
     this.autosubmitLabelValue,
     this.webhookKey,
+    this.kPubsubPullNumberValue,
   }) : super(
           cacheProvider: Cache.inMemoryCacheProvider(4),
           secretManager: LocalSecretManager(),
@@ -35,6 +36,10 @@
   String? overrideTreeStatusLabelValue;
   String? autosubmitLabelValue;
   String? webhookKey;
+  int? kPubsubPullNumberValue;
+
+  @override
+  int get kPubsubPullNumber => kPubsubPullNumberValue ?? 1;
 
   @override
   Future<GitHub> createGithubClient(RepositorySlug slug) async => githubClient!;