Call pub/sub pull API multiple times to return more messages (#2035)
diff --git a/auto_submit/lib/requests/check_pull_request.dart b/auto_submit/lib/requests/check_pull_request.dart
index 3b71355..7dcb8a9 100644
--- a/auto_submit/lib/requests/check_pull_request.dart
+++ b/auto_submit/lib/requests/check_pull_request.dart
@@ -37,13 +37,23 @@
@override
Future<Response> get() async {
+ // Loops [config.kPubsubPullNumber] times to try pulling/processing as many
+ // messages as possible.
+ for (int i = 0; i < config.kPubsubPullNumber; i++) {
+ await checkPullRequest();
+ }
+ return Response.ok('Finished processing changes');
+ }
+
+ /// Pulls Pub/Sub messages and processes pull requests.
+ Future<void> checkPullRequest() async {
final Set<int> processingLog = <int>{};
- final pub.PullResponse pullResponse = await pubsub.pull('auto-submit-queue-sub', kPullMesssageBatchSize);
final ApproverService approver = approverProvider(config);
+ final pub.PullResponse pullResponse = await pubsub.pull('auto-submit-queue-sub', kPullMesssageBatchSize);
final List<pub.ReceivedMessage>? receivedMessages = pullResponse.receivedMessages;
if (receivedMessages == null) {
log.info('There are no requests in the queue');
- return Response.ok('No requests in the queue.');
+ return;
}
log.info('Processing ${receivedMessages.length} messages');
ValidationService validationService = ValidationService(config);
@@ -53,6 +63,8 @@
final String messageData = message.message!.data!;
final rawBody = json.decode(String.fromCharCodes(base64.decode(messageData))) as Map<String, dynamic>;
final PullRequest pullRequest = PullRequest.fromJson(rawBody);
+ log.info('Processing message ackId: ${message.ackId}');
+ log.info('Processing mesageId: ${message.message!.messageId}');
log.info('Processing PR: $rawBody');
if (processingLog.contains(pullRequest.number)) {
// Ack duplicate.
@@ -67,6 +79,5 @@
futures.add(validationService.processMessage(pullRequest, message.ackId!, pubsub));
}
await Future.wait(futures);
- return Response.ok('Finished processing changes');
}
}
diff --git a/auto_submit/lib/service/config.dart b/auto_submit/lib/service/config.dart
index a7a9c5b..d5e83ea 100644
--- a/auto_submit/lib/service/config.dart
+++ b/auto_submit/lib/service/config.dart
@@ -175,6 +175,12 @@
/// The autosubmit label.
String get autosubmitLabel => 'autosubmit';
+ /// Number of Pub/Sub pull calls in each cron job run.
+ ///
+ /// TODO(keyonghan): monitor and optimize this number based on response time
+ /// https://github.com/flutter/cocoon/pull/2035/files#r938143840.
+ int get kPubsubPullNumber => 5;
+
/// Get the webhook key
Future<String> getWebhookKey() async {
final Uint8List? cacheValue = await cache[kWebHookKey].get(
diff --git a/auto_submit/test/requests/check_pull_request_test.dart b/auto_submit/test/requests/check_pull_request_test.dart
index 0132331..437b381 100644
--- a/auto_submit/test/requests/check_pull_request_test.dart
+++ b/auto_submit/test/requests/check_pull_request_test.dart
@@ -5,23 +5,23 @@
// ignore_for_file: constant_identifier_names
import 'package:auto_submit/service/config.dart';
-import 'package:auto_submit/service/log.dart';
-import 'package:logging/logging.dart';
import 'package:auto_submit/requests/check_pull_request.dart';
import 'package:auto_submit/requests/check_pull_request_queries.dart';
+import 'package:auto_submit/service/log.dart';
import 'package:github/github.dart';
+import 'package:graphql/client.dart' hide Request, Response;
+import 'package:logging/logging.dart';
import 'package:mockito/mockito.dart';
import 'package:test/test.dart';
-import 'package:graphql/client.dart' hide Request, Response;
-import '../utilities/mocks.dart';
-import '../utilities/utils.dart' hide createQueryResult;
import './github_webhook_test_data.dart';
import '../src/request_handling/fake_pubsub.dart';
import '../src/request_handling/fake_authentication.dart';
import '../src/service/fake_config.dart';
import '../src/service/fake_github_service.dart';
import '../src/service/fake_graphql_client.dart';
+import '../utilities/mocks.dart';
+import '../utilities/utils.dart' hide createQueryResult;
const String oid = '6dcb09b5b57875f334f61aebed695e2e4193db5e';
const String title = 'some_title';
@@ -35,7 +35,7 @@
final FakeGithubService githubService = FakeGithubService();
late MockPullRequestsService pullRequests;
final MockGitHub gitHub = MockGitHub();
- final FakePubSub pubsub = FakePubSub();
+ late FakePubSub pubsub;
late PullRequestHelper flutterRequest;
late PullRequestHelper cocoonRequest;
late List<QueryOptions> expectedOptions;
@@ -50,6 +50,7 @@
setUp(() {
githubGraphQLClient = FakeGraphQLClient();
auth = FakeCronAuthProvider();
+ pubsub = FakePubSub();
expectedOptions = <QueryOptions>[];
githubGraphQLClient.mutateResultForOptions = (MutationOptions options) => createFakeQueryResult();
@@ -102,7 +103,7 @@
test('Multiple identical messages are processed once', () async {
final PullRequest pullRequest1 = generatePullRequest(prNumber: 0, repoName: cocoonRepo);
- for (int i = 0; i < 3; i++) {
+ for (int i = 0; i < 2; i++) {
pubsub.publish('auto-submit-queue-sub', pullRequest1);
}
@@ -124,7 +125,7 @@
test('Closed PRs are not processed', () async {
final PullRequest pullRequest1 = generatePullRequest(prNumber: 0, repoName: cocoonRepo, state: 'close');
when(pullRequests.get(any, any)).thenAnswer((_) async => PullRequest(number: 0, state: 'close'));
- for (int i = 0; i < 3; i++) {
+ for (int i = 0; i < 2; i++) {
pubsub.publish('auto-submit-queue-sub', pullRequest1);
}
@@ -474,6 +475,18 @@
verifyQueries(expectedOptions);
assert(pubsub.messagesQueue.isEmpty);
});
+
+ test('Multiple pull calls are executed.', () async {
+ config.kPubsubPullNumberValue = 2;
+ final PullRequest pullRequest1 = generatePullRequest(prNumber: 0, repoName: cocoonRepo);
+ for (int i = 0; i < 3; i++) {
+ pubsub.publish('auto-submit-queue-sub', pullRequest1);
+ }
+ checkPullRequest = CheckPullRequest(config: config, pubsub: pubsub, cronAuthProvider: auth);
+ cocoonRequest = PullRequestHelper(prNumber: 0, lastCommitHash: oid);
+ await checkPullRequest.get();
+ expect(0, pubsub.messagesQueue.length);
+ });
});
}
diff --git a/auto_submit/test/src/request_handling/fake_pubsub.dart b/auto_submit/test/src/request_handling/fake_pubsub.dart
index aead39e..5a6a25d 100644
--- a/auto_submit/test/src/request_handling/fake_pubsub.dart
+++ b/auto_submit/test/src/request_handling/fake_pubsub.dart
@@ -10,6 +10,9 @@
class FakePubSub extends PubSub {
List<dynamic> messagesQueue = <dynamic>[];
+ // Number of messages in each Pub/Sub pull call. This mocks the API
+ // returning random number of messages each time.
+ int messageSize = 2;
@override
Future<void> publish(String topicName, dynamic json) async {
@@ -25,7 +28,10 @@
List<ReceivedMessage> receivedMessages = <ReceivedMessage>[];
if (messagesQueue.isNotEmpty) {
int i = 0;
- while (i < min(100, messagesQueue.length)) {
+ // Returns only allowed max number of messages. The number should not be greater than
+ // `maxMessages`, the available messages, and the number allowed in each call. The
+ // last number is to mock real `pull` API call.
+ while (i < min(min(maxMessages, messagesQueue.length), messageSize)) {
receivedMessages.add(ReceivedMessage(message: PubsubMessage(data: messagesQueue[i] as String), ackId: '1'));
i++;
}
diff --git a/auto_submit/test/src/service/fake_config.dart b/auto_submit/test/src/service/fake_config.dart
index b1bf0f5..b028bef 100644
--- a/auto_submit/test/src/service/fake_config.dart
+++ b/auto_submit/test/src/service/fake_config.dart
@@ -23,6 +23,7 @@
this.overrideTreeStatusLabelValue,
this.autosubmitLabelValue,
this.webhookKey,
+ this.kPubsubPullNumberValue,
}) : super(
cacheProvider: Cache.inMemoryCacheProvider(4),
secretManager: LocalSecretManager(),
@@ -35,6 +36,10 @@
String? overrideTreeStatusLabelValue;
String? autosubmitLabelValue;
String? webhookKey;
+ int? kPubsubPullNumberValue;
+
+ @override
+ int get kPubsubPullNumber => kPubsubPullNumberValue ?? 1;
@override
Future<GitHub> createGithubClient(RepositorySlug slug) async => githubClient!;