Add transaction retries. (#683)

* Add transaction retries.

Most of the 500 errors in the backend are related to failing
transactions. This change adds retries with quadratic backoffs to remove
the flakiness.

Bugs:

https://github.com/flutter/flutter/issues/42524
https://github.com/flutter/flutter/issues/43112
https://github.com/flutter/flutter/issues/49673
https://github.com/flutter/flutter/issues/49672

* Add tests and also retry on grpc errors.
diff --git a/app_dart/lib/src/foundation/utils.dart b/app_dart/lib/src/foundation/utils.dart
new file mode 100644
index 0000000..e69746e
--- /dev/null
+++ b/app_dart/lib/src/foundation/utils.dart
@@ -0,0 +1,23 @@
+// Copyright 2020 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+import 'package:gcloud/datastore.dart';
+import 'package:retry/retry.dart';
+import 'package:grpc/grpc.dart';
+
+typedef RetryHandler = Function();
+
+// Runs a db transaction with retries.
+//
+// It uses quadratic backoff starting with 50ms and 3 max attempts.
+Future<void> runTransactionWithRetries(RetryHandler retryHandler,
+    {int delayMilliseconds = 50, int maxAttempts = 3}) {
+  final RetryOptions r = RetryOptions(
+      delayFactor: Duration(milliseconds: delayMilliseconds),
+      maxAttempts: maxAttempts);
+  return r.retry(
+    retryHandler,
+    retryIf: (Exception e) => e is TransactionAbortedError || e is GrpcError,
+  );
+}
diff --git a/app_dart/lib/src/request_handlers/push_engine_status_to_github.dart b/app_dart/lib/src/request_handlers/push_engine_status_to_github.dart
index d038d46..45206e3 100644
--- a/app_dart/lib/src/request_handlers/push_engine_status_to_github.dart
+++ b/app_dart/lib/src/request_handlers/push_engine_status_to_github.dart
@@ -9,6 +9,7 @@
 import 'package:meta/meta.dart';
 
 import '../datastore/cocoon_config.dart';
+import '../foundation/utils.dart';
 import '../model/appengine/github_build_status_update.dart';
 import '../model/appengine/task.dart';
 import '../request_handling/api_request_handler.dart';
@@ -93,10 +94,13 @@
 
     final int maxEntityGroups = config.maxEntityGroups;
     for (int i = 0; i < updates.length; i += maxEntityGroups) {
-      await datastore.db.withTransaction<void>((Transaction transaction) async {
-        transaction.queueMutations(
-            inserts: updates.skip(i).take(maxEntityGroups).toList());
-        await transaction.commit();
+      await runTransactionWithRetries(() async {
+        await datastore.db
+            .withTransaction<void>((Transaction transaction) async {
+          transaction.queueMutations(
+              inserts: updates.skip(i).take(maxEntityGroups).toList());
+          await transaction.commit();
+        });
       });
     }
     log.debug('Committed all updates');
diff --git a/app_dart/lib/src/request_handlers/refresh_chromebot_status.dart b/app_dart/lib/src/request_handlers/refresh_chromebot_status.dart
index bb3f50c..40bc941 100644
--- a/app_dart/lib/src/request_handlers/refresh_chromebot_status.dart
+++ b/app_dart/lib/src/request_handlers/refresh_chromebot_status.dart
@@ -8,6 +8,7 @@
 import 'package:meta/meta.dart';
 
 import '../datastore/cocoon_config.dart';
+import '../foundation/utils.dart';
 import '../model/appengine/task.dart';
 import '../request_handling/api_request_handler.dart';
 import '../request_handling/authentication.dart';
@@ -49,25 +50,26 @@
     );
 
     for (LuciBuilder builder in luciTasks.keys) {
-      await config.db.withTransaction<void>((Transaction transaction) async {
-        try {
-          await for (FullTask task
-              in datastore.queryRecentTasks(taskName: builder.taskName)) {
-            for (LuciTask luciTask in luciTasks[builder]) {
-              if (luciTask.commitSha == task.commit.sha) {
-                final Task update = task.task;
-                update.status = luciTask.status;
-                transaction.queueMutations(inserts: <Task>[update]);
-                // Stop updating task whenever we find the latest status of the same commit.
-                break;
+      await runTransactionWithRetries(() async {
+        await config.db.withTransaction<void>((Transaction transaction) async {
+          try {
+            await for (FullTask task
+                in datastore.queryRecentTasks(taskName: builder.taskName)) {
+              for (LuciTask luciTask in luciTasks[builder]) {
+                if (luciTask.commitSha == task.commit.sha) {
+                  final Task update = task.task;
+                  update.status = luciTask.status;
+                  transaction.queueMutations(inserts: <Task>[update]);
+                  // Stop updating task whenever we find the latest status of the same commit.
+                  break;
+                }
               }
             }
+            await transaction.commit();
+          } catch (error) {
+            rethrow;
           }
-          await transaction.commit();
-        } catch (error) {
-          await transaction.rollback();
-          rethrow;
-        }
+        });
       });
     }
 
diff --git a/app_dart/lib/src/request_handlers/reserve_task.dart b/app_dart/lib/src/request_handlers/reserve_task.dart
index ecb9c38..9af62f2 100644
--- a/app_dart/lib/src/request_handlers/reserve_task.dart
+++ b/app_dart/lib/src/request_handlers/reserve_task.dart
@@ -10,6 +10,7 @@
 import 'package:meta/meta.dart';
 
 import '../datastore/cocoon_config.dart';
+import '../foundation/utils.dart';
 import '../model/appengine/agent.dart';
 import '../model/appengine/commit.dart';
 import '../model/appengine/key_helper.dart';
@@ -240,20 +241,21 @@
     assert(task != null);
     assert(agentId != null);
     try {
-      return config.db.withTransaction<void>((Transaction transaction) async {
-        final Task lockedTask = await transaction.lookupValue<Task>(task.key);
+      return runTransactionWithRetries(() async {
+        await config.db.withTransaction<void>((Transaction transaction) async {
+          final Task lockedTask = await transaction.lookupValue<Task>(task.key);
+          if (lockedTask.status != Task.statusNew) {
+            // Another reservation beat us in a race.
+            throw const ReservationLostException();
+          }
 
-        if (lockedTask.status != Task.statusNew) {
-          // Another reservation beat us in a race.
-          throw const ReservationLostException();
-        }
-
-        lockedTask.status = Task.statusInProgress;
-        lockedTask.attempts += 1;
-        lockedTask.startTimestamp = DateTime.now().millisecondsSinceEpoch;
-        lockedTask.reservedForAgentId = agentId;
-        transaction.queueMutations(inserts: <Task>[lockedTask]);
-        await transaction.commit();
+          lockedTask.status = Task.statusInProgress;
+          lockedTask.attempts += 1;
+          lockedTask.startTimestamp = DateTime.now().millisecondsSinceEpoch;
+          lockedTask.reservedForAgentId = agentId;
+          transaction.queueMutations(inserts: <Task>[lockedTask]);
+          await transaction.commit();
+        });
       });
     } catch (error) {
       throw const ReservationLostException();
diff --git a/app_dart/lib/src/request_handlers/update_task_status.dart b/app_dart/lib/src/request_handlers/update_task_status.dart
index 35a8fcf..363c6c7 100644
--- a/app_dart/lib/src/request_handlers/update_task_status.dart
+++ b/app_dart/lib/src/request_handlers/update_task_status.dart
@@ -10,6 +10,7 @@
 import 'package:meta/meta.dart';
 
 import '../datastore/cocoon_config.dart';
+import '../foundation/utils.dart';
 import '../model/appengine/commit.dart';
 import '../model/appengine/key_helper.dart';
 import '../model/appengine/task.dart';
@@ -96,9 +97,11 @@
       task.endTimestamp = DateTime.now().millisecondsSinceEpoch;
     }
 
-    await datastore.db.withTransaction<void>((Transaction transaction) async {
-      transaction.queueMutations(inserts: <Task>[task]);
-      await transaction.commit();
+    await runTransactionWithRetries(() async {
+      await datastore.db.withTransaction<void>((Transaction transaction) async {
+        transaction.queueMutations(inserts: <Task>[task]);
+        await transaction.commit();
+      });
     });
 
     if (task.endTimestamp > 0) {
@@ -109,22 +112,24 @@
 
     if (newStatus == Task.statusSucceeded && scoreKeys.isNotEmpty) {
       for (String scoreKey in scoreKeys) {
-        await datastore.db
-            .withTransaction<void>((Transaction transaction) async {
-          final TimeSeries series =
-              await _getOrCreateTimeSeries(transaction, task, scoreKey);
-          final num value = resultData[scoreKey] as num;
+        await runTransactionWithRetries(() async {
+          await datastore.db
+              .withTransaction<void>((Transaction transaction) async {
+            final TimeSeries series =
+                await _getOrCreateTimeSeries(transaction, task, scoreKey);
+            final num value = resultData[scoreKey] as num;
 
-          final TimeSeriesValue seriesValue = TimeSeriesValue(
-            key: series.key.append(TimeSeriesValue),
-            createTimestamp: DateTime.now().millisecondsSinceEpoch,
-            revision: commit.sha,
-            taskKey: task.key,
-            value: value.toDouble(),
-          );
+            final TimeSeriesValue seriesValue = TimeSeriesValue(
+              key: series.key.append(TimeSeriesValue),
+              createTimestamp: DateTime.now().millisecondsSinceEpoch,
+              revision: commit.sha,
+              taskKey: task.key,
+              value: value.toDouble(),
+            );
 
-          transaction.queueMutations(inserts: <TimeSeriesValue>[seriesValue]);
-          await transaction.commit();
+            transaction.queueMutations(inserts: <TimeSeriesValue>[seriesValue]);
+            await transaction.commit();
+          });
         });
       }
     }
diff --git a/app_dart/test/foundation/utils_test.dart b/app_dart/test/foundation/utils_test.dart
new file mode 100644
index 0000000..b2e0c00
--- /dev/null
+++ b/app_dart/test/foundation/utils_test.dart
@@ -0,0 +1,56 @@
+// Copyright 2020 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+import 'package:test/test.dart';
+import 'package:gcloud/datastore.dart';
+import 'package:grpc/grpc.dart';
+
+import 'package:cocoon_service/src/foundation/utils.dart';
+
+class Counter {
+  int count = 0;
+  void increase() {
+    count = count + 1;
+  }
+
+  int value() {
+    return count;
+  }
+}
+
+void main() {
+  group('RunTransactionWithRetry', () {
+    test('retriesOnGrpcError', () async {
+      final Counter counter = Counter();
+      try {
+        await runTransactionWithRetries(() async {
+          counter.increase();
+          throw GrpcError.aborted();
+        });
+      } catch (e) {
+        expect(e, isA<GrpcError>());
+      }
+      expect(counter.value(), greaterThan(1));
+    });
+    test('retriesTransactionAbortedError', () async {
+      final Counter counter = Counter();
+      try {
+        await runTransactionWithRetries(() async {
+          counter.increase();
+          throw TransactionAbortedError();
+        });
+      } catch (e) {
+        expect(e, isA<TransactionAbortedError>());
+      }
+      expect(counter.value(), greaterThan(1));
+    });
+    test('DoesNotRetryOnSuccess', () async {
+      final Counter counter = Counter();
+      await runTransactionWithRetries(() async {
+        counter.increase();
+      });
+      expect(counter.value(), equals(1));
+    });
+  });
+}