Add transaction retries. (#683)
* Add transaction retries.
Most of the 500 errors in the backend are related to failing
transactions. This change adds retries with quadratic backoffs to remove
the flakiness.
Bugs:
https://github.com/flutter/flutter/issues/42524
https://github.com/flutter/flutter/issues/43112
https://github.com/flutter/flutter/issues/49673
https://github.com/flutter/flutter/issues/49672
* Add tests and also retry on grpc errors.
diff --git a/app_dart/lib/src/foundation/utils.dart b/app_dart/lib/src/foundation/utils.dart
new file mode 100644
index 0000000..e69746e
--- /dev/null
+++ b/app_dart/lib/src/foundation/utils.dart
@@ -0,0 +1,23 @@
+// Copyright 2020 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+import 'package:gcloud/datastore.dart';
+import 'package:retry/retry.dart';
+import 'package:grpc/grpc.dart';
+
+typedef RetryHandler = Function();
+
+// Runs a db transaction with retries.
+//
+// It uses quadratic backoff starting with 50ms and 3 max attempts.
+Future<void> runTransactionWithRetries(RetryHandler retryHandler,
+ {int delayMilliseconds = 50, int maxAttempts = 3}) {
+ final RetryOptions r = RetryOptions(
+ delayFactor: Duration(milliseconds: delayMilliseconds),
+ maxAttempts: maxAttempts);
+ return r.retry(
+ retryHandler,
+ retryIf: (Exception e) => e is TransactionAbortedError || e is GrpcError,
+ );
+}
diff --git a/app_dart/lib/src/request_handlers/push_engine_status_to_github.dart b/app_dart/lib/src/request_handlers/push_engine_status_to_github.dart
index d038d46..45206e3 100644
--- a/app_dart/lib/src/request_handlers/push_engine_status_to_github.dart
+++ b/app_dart/lib/src/request_handlers/push_engine_status_to_github.dart
@@ -9,6 +9,7 @@
import 'package:meta/meta.dart';
import '../datastore/cocoon_config.dart';
+import '../foundation/utils.dart';
import '../model/appengine/github_build_status_update.dart';
import '../model/appengine/task.dart';
import '../request_handling/api_request_handler.dart';
@@ -93,10 +94,13 @@
final int maxEntityGroups = config.maxEntityGroups;
for (int i = 0; i < updates.length; i += maxEntityGroups) {
- await datastore.db.withTransaction<void>((Transaction transaction) async {
- transaction.queueMutations(
- inserts: updates.skip(i).take(maxEntityGroups).toList());
- await transaction.commit();
+ await runTransactionWithRetries(() async {
+ await datastore.db
+ .withTransaction<void>((Transaction transaction) async {
+ transaction.queueMutations(
+ inserts: updates.skip(i).take(maxEntityGroups).toList());
+ await transaction.commit();
+ });
});
}
log.debug('Committed all updates');
diff --git a/app_dart/lib/src/request_handlers/refresh_chromebot_status.dart b/app_dart/lib/src/request_handlers/refresh_chromebot_status.dart
index bb3f50c..40bc941 100644
--- a/app_dart/lib/src/request_handlers/refresh_chromebot_status.dart
+++ b/app_dart/lib/src/request_handlers/refresh_chromebot_status.dart
@@ -8,6 +8,7 @@
import 'package:meta/meta.dart';
import '../datastore/cocoon_config.dart';
+import '../foundation/utils.dart';
import '../model/appengine/task.dart';
import '../request_handling/api_request_handler.dart';
import '../request_handling/authentication.dart';
@@ -49,25 +50,26 @@
);
for (LuciBuilder builder in luciTasks.keys) {
- await config.db.withTransaction<void>((Transaction transaction) async {
- try {
- await for (FullTask task
- in datastore.queryRecentTasks(taskName: builder.taskName)) {
- for (LuciTask luciTask in luciTasks[builder]) {
- if (luciTask.commitSha == task.commit.sha) {
- final Task update = task.task;
- update.status = luciTask.status;
- transaction.queueMutations(inserts: <Task>[update]);
- // Stop updating task whenever we find the latest status of the same commit.
- break;
+ await runTransactionWithRetries(() async {
+ await config.db.withTransaction<void>((Transaction transaction) async {
+ try {
+ await for (FullTask task
+ in datastore.queryRecentTasks(taskName: builder.taskName)) {
+ for (LuciTask luciTask in luciTasks[builder]) {
+ if (luciTask.commitSha == task.commit.sha) {
+ final Task update = task.task;
+ update.status = luciTask.status;
+ transaction.queueMutations(inserts: <Task>[update]);
+ // Stop updating task whenever we find the latest status of the same commit.
+ break;
+ }
}
}
+ await transaction.commit();
+ } catch (error) {
+ rethrow;
}
- await transaction.commit();
- } catch (error) {
- await transaction.rollback();
- rethrow;
- }
+ });
});
}
diff --git a/app_dart/lib/src/request_handlers/reserve_task.dart b/app_dart/lib/src/request_handlers/reserve_task.dart
index ecb9c38..9af62f2 100644
--- a/app_dart/lib/src/request_handlers/reserve_task.dart
+++ b/app_dart/lib/src/request_handlers/reserve_task.dart
@@ -10,6 +10,7 @@
import 'package:meta/meta.dart';
import '../datastore/cocoon_config.dart';
+import '../foundation/utils.dart';
import '../model/appengine/agent.dart';
import '../model/appengine/commit.dart';
import '../model/appengine/key_helper.dart';
@@ -240,20 +241,21 @@
assert(task != null);
assert(agentId != null);
try {
- return config.db.withTransaction<void>((Transaction transaction) async {
- final Task lockedTask = await transaction.lookupValue<Task>(task.key);
+ return runTransactionWithRetries(() async {
+ await config.db.withTransaction<void>((Transaction transaction) async {
+ final Task lockedTask = await transaction.lookupValue<Task>(task.key);
+ if (lockedTask.status != Task.statusNew) {
+ // Another reservation beat us in a race.
+ throw const ReservationLostException();
+ }
- if (lockedTask.status != Task.statusNew) {
- // Another reservation beat us in a race.
- throw const ReservationLostException();
- }
-
- lockedTask.status = Task.statusInProgress;
- lockedTask.attempts += 1;
- lockedTask.startTimestamp = DateTime.now().millisecondsSinceEpoch;
- lockedTask.reservedForAgentId = agentId;
- transaction.queueMutations(inserts: <Task>[lockedTask]);
- await transaction.commit();
+ lockedTask.status = Task.statusInProgress;
+ lockedTask.attempts += 1;
+ lockedTask.startTimestamp = DateTime.now().millisecondsSinceEpoch;
+ lockedTask.reservedForAgentId = agentId;
+ transaction.queueMutations(inserts: <Task>[lockedTask]);
+ await transaction.commit();
+ });
});
} catch (error) {
throw const ReservationLostException();
diff --git a/app_dart/lib/src/request_handlers/update_task_status.dart b/app_dart/lib/src/request_handlers/update_task_status.dart
index 35a8fcf..363c6c7 100644
--- a/app_dart/lib/src/request_handlers/update_task_status.dart
+++ b/app_dart/lib/src/request_handlers/update_task_status.dart
@@ -10,6 +10,7 @@
import 'package:meta/meta.dart';
import '../datastore/cocoon_config.dart';
+import '../foundation/utils.dart';
import '../model/appengine/commit.dart';
import '../model/appengine/key_helper.dart';
import '../model/appengine/task.dart';
@@ -96,9 +97,11 @@
task.endTimestamp = DateTime.now().millisecondsSinceEpoch;
}
- await datastore.db.withTransaction<void>((Transaction transaction) async {
- transaction.queueMutations(inserts: <Task>[task]);
- await transaction.commit();
+ await runTransactionWithRetries(() async {
+ await datastore.db.withTransaction<void>((Transaction transaction) async {
+ transaction.queueMutations(inserts: <Task>[task]);
+ await transaction.commit();
+ });
});
if (task.endTimestamp > 0) {
@@ -109,22 +112,24 @@
if (newStatus == Task.statusSucceeded && scoreKeys.isNotEmpty) {
for (String scoreKey in scoreKeys) {
- await datastore.db
- .withTransaction<void>((Transaction transaction) async {
- final TimeSeries series =
- await _getOrCreateTimeSeries(transaction, task, scoreKey);
- final num value = resultData[scoreKey] as num;
+ await runTransactionWithRetries(() async {
+ await datastore.db
+ .withTransaction<void>((Transaction transaction) async {
+ final TimeSeries series =
+ await _getOrCreateTimeSeries(transaction, task, scoreKey);
+ final num value = resultData[scoreKey] as num;
- final TimeSeriesValue seriesValue = TimeSeriesValue(
- key: series.key.append(TimeSeriesValue),
- createTimestamp: DateTime.now().millisecondsSinceEpoch,
- revision: commit.sha,
- taskKey: task.key,
- value: value.toDouble(),
- );
+ final TimeSeriesValue seriesValue = TimeSeriesValue(
+ key: series.key.append(TimeSeriesValue),
+ createTimestamp: DateTime.now().millisecondsSinceEpoch,
+ revision: commit.sha,
+ taskKey: task.key,
+ value: value.toDouble(),
+ );
- transaction.queueMutations(inserts: <TimeSeriesValue>[seriesValue]);
- await transaction.commit();
+ transaction.queueMutations(inserts: <TimeSeriesValue>[seriesValue]);
+ await transaction.commit();
+ });
});
}
}
diff --git a/app_dart/test/foundation/utils_test.dart b/app_dart/test/foundation/utils_test.dart
new file mode 100644
index 0000000..b2e0c00
--- /dev/null
+++ b/app_dart/test/foundation/utils_test.dart
@@ -0,0 +1,56 @@
+// Copyright 2020 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+import 'package:test/test.dart';
+import 'package:gcloud/datastore.dart';
+import 'package:grpc/grpc.dart';
+
+import 'package:cocoon_service/src/foundation/utils.dart';
+
+class Counter {
+ int count = 0;
+ void increase() {
+ count = count + 1;
+ }
+
+ int value() {
+ return count;
+ }
+}
+
+void main() {
+ group('RunTransactionWithRetry', () {
+ test('retriesOnGrpcError', () async {
+ final Counter counter = Counter();
+ try {
+ await runTransactionWithRetries(() async {
+ counter.increase();
+ throw GrpcError.aborted();
+ });
+ } catch (e) {
+ expect(e, isA<GrpcError>());
+ }
+ expect(counter.value(), greaterThan(1));
+ });
+ test('retriesTransactionAbortedError', () async {
+ final Counter counter = Counter();
+ try {
+ await runTransactionWithRetries(() async {
+ counter.increase();
+ throw TransactionAbortedError();
+ });
+ } catch (e) {
+ expect(e, isA<TransactionAbortedError>());
+ }
+ expect(counter.value(), greaterThan(1));
+ });
+ test('DoesNotRetryOnSuccess', () async {
+ final Counter counter = Counter();
+ await runTransactionWithRetries(() async {
+ counter.increase();
+ });
+ expect(counter.value(), equals(1));
+ });
+ });
+}