Support retrying Datastore operations. (#168)

* Support retrying Datastore operations.

* Expose only maxAttempts

* add more errors
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2efd48d..bb60415 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,7 @@
-## 0.8.10-wip
+## 0.8.10
 
 - Widen the SDK constraint to support Dart 3.0
+- Support retrying Datastore operations.
 
 ## 0.8.9
 
diff --git a/lib/datastore.dart b/lib/datastore.dart
index 3b05eea..53ca4eb 100644
--- a/lib/datastore.dart
+++ b/lib/datastore.dart
@@ -12,10 +12,12 @@
 import 'dart:async';
 
 import 'package:http/http.dart' as http;
+import 'package:retry/retry.dart';
 
 import 'common.dart' show Page;
 import 'service_scope.dart' as ss;
 import 'src/datastore_impl.dart' show DatastoreImpl;
+import 'src/retry_datastore_impl.dart';
 
 const Symbol _datastoreKey = #gcloud.datastore;
 
@@ -391,6 +393,22 @@
     return DatastoreImpl(client, project);
   }
 
+  /// Wraps [delegate] so that operations failing with an error that looks
+  /// transient are retried.
+  ///
+  /// Every operation is forwarded to [delegate], the configured [Datastore]
+  /// implementation.
+  ///
+  /// Each operation is attempted at most [maxAttempts] times; when it is
+  /// null, 3 is used.
+  factory Datastore.withRetry(
+    Datastore delegate, {
+    int? maxAttempts,
+  }) {
+    final options = RetryOptions(maxAttempts: maxAttempts ?? 3);
+    return RetryDatastoreImpl(delegate, options);
+  }
+
   /// Allocate integer IDs for the partially populated [keys] given as argument.
   ///
   /// The returned [Key]s will be fully populated with the allocated IDs.
diff --git a/lib/src/retry_datastore_impl.dart b/lib/src/retry_datastore_impl.dart
new file mode 100644
index 0000000..e57410c
--- /dev/null
+++ b/lib/src/retry_datastore_impl.dart
@@ -0,0 +1,159 @@
+// Copyright (c) 2023, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:retry/retry.dart';
+
+import '../common.dart';
+import '../datastore.dart' as datastore;
+
+/// A [datastore.Datastore] that forwards every call to a wrapped delegate and
+/// retries failed calls, except for failures that [_retryIf] rejects.
+class RetryDatastoreImpl implements datastore.Datastore {
+  final datastore.Datastore _delegate;
+  final RetryOptions _retryOptions;
+
+  RetryDatastoreImpl(this._delegate, this._retryOptions);
+
+  /// Calls the delegate's `allocateIds` for [keys], with retries.
+  @override
+  Future<List<datastore.Key>> allocateIds(List<datastore.Key> keys) async {
+    Future<List<datastore.Key>> allocate() => _delegate.allocateIds(keys);
+    return await _retryOptions.retry(allocate, retryIf: _retryIf);
+  }
+
+  /// Calls the delegate's `beginTransaction`, with retries.
+  @override
+  Future<datastore.Transaction> beginTransaction({
+    bool crossEntityGroup = false,
+  }) async {
+    Future<datastore.Transaction> begin() =>
+        _delegate.beginTransaction(crossEntityGroup: crossEntityGroup);
+    return await _retryOptions.retry(begin, retryIf: _retryIf);
+  }
+
+  /// Commits through the delegate.
+  ///
+  /// A commit with [autoIdInserts] and no [transaction] is attempted exactly
+  /// once; every other commit is retried.
+  @override
+  Future<datastore.CommitResult> commit({
+    List<datastore.Entity> inserts = const [],
+    List<datastore.Entity> autoIdInserts = const [],
+    List<datastore.Key> deletes = const [],
+    datastore.Transaction? transaction,
+  }) async {
+    final inTransaction = transaction != null;
+    // `transaction` is only handed to the delegate when one was supplied.
+    Future<datastore.CommitResult> attempt() async {
+      return inTransaction
+          ? await _delegate.commit(
+              inserts: inserts,
+              autoIdInserts: autoIdInserts,
+              deletes: deletes,
+              transaction: transaction,
+            )
+          : await _delegate.commit(
+              inserts: inserts,
+              autoIdInserts: autoIdInserts,
+              deletes: deletes,
+            );
+    }
+
+    if (autoIdInserts.isNotEmpty && !inTransaction) {
+      return await attempt();
+    }
+    return await _retryOptions.retry(attempt, retryIf: _retryIf);
+  }
+
+  /// Calls the delegate's `lookup` for [keys], with retries.
+  @override
+  Future<List<datastore.Entity?>> lookup(
+    List<datastore.Key> keys, {
+    datastore.Transaction? transaction,
+  }) async {
+    Future<List<datastore.Entity?>> attempt() async {
+      return transaction == null
+          ? await _delegate.lookup(keys)
+          : await _delegate.lookup(keys, transaction: transaction);
+    }
+
+    return await _retryOptions.retry(attempt, retryIf: _retryIf);
+  }
+
+  /// Forwards [query] to the delegate's `query`, with retries.
+  ///
+  /// The page handed back is a [_RetryPage] around the delegate's page.
+  @override
+  Future<Page<datastore.Entity>> query(
+    datastore.Query query, {
+    datastore.Partition? partition,
+    datastore.Transaction? transaction,
+  }) async {
+    // Only the optional arguments that were supplied are passed on.
+    Future<Page<datastore.Entity>> attempt() async {
+      if (partition == null) {
+        return transaction == null
+            ? await _delegate.query(query)
+            : await _delegate.query(query, transaction: transaction);
+      }
+      if (transaction == null) {
+        return await _delegate.query(query, partition: partition);
+      }
+      return await _delegate.query(
+        query,
+        partition: partition,
+        transaction: transaction,
+      );
+    }
+
+    return await _retryOptions.retry(
+      () async => _RetryPage(await attempt(), _retryOptions),
+      retryIf: _retryIf,
+    );
+  }
+
+  /// Calls the delegate's `rollback` for [transaction], with retries.
+  @override
+  Future rollback(datastore.Transaction transaction) async {
+    Future undo() => _delegate.rollback(transaction);
+    return await _retryOptions.retry(undo, retryIf: _retryIf);
+  }
+}
+
+/// A [Page] whose [next] retries failed fetches; following pages do too.
+class _RetryPage<K> implements Page<K> {
+  final Page<K> _delegate;
+  final RetryOptions _retryOptions;
+
+  _RetryPage(this._delegate, this._retryOptions);
+
+  @override
+  bool get isLast => _delegate.isLast;
+
+  @override
+  List<K> get items => _delegate.items;
+
+  /// Fetches the next page with retries and wraps it, so that the pages
+  /// after it are retried as well.
+  @override
+  Future<Page<K>> next({int? pageSize}) async {
+    final nextPage = await _retryOptions.retry(
+      () async => pageSize == null
+          ? await _delegate.next()
+          : await _delegate.next(pageSize: pageSize),
+      retryIf: _retryIf,
+    );
+    return _RetryPage(nextPage, _retryOptions);
+  }
+}
+
+/// Whether an operation that failed with [e] is retried: false for the
+/// error types checked below, true for every other exception.
+bool _retryIf(Exception e) {
+  final rejected = e is datastore.TransactionAbortedError ||
+      e is datastore.NeedIndexError ||
+      e is datastore.QuotaExceededError ||
+      e is datastore.PermissionDeniedError;
+  return !rejected;
+}
diff --git a/pubspec.yaml b/pubspec.yaml
index e5e48a3..7a58f69 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: gcloud
-version: 0.8.10-wip
+version: 0.8.10
 description: >-
   High level idiomatic Dart API for Google Cloud Storage, Pub-Sub and Datastore.
 repository: https://github.com/dart-lang/gcloud
@@ -16,6 +16,7 @@
   googleapis: '>=3.0.0 <12.0.0'
   http: '>=0.13.5 <2.0.0'
   meta: ^1.3.0
+  retry: ^3.1.1
 
 dev_dependencies:
   dart_flutter_team_lints: ^1.0.0
diff --git a/test/db_all_e2e_test.dart b/test/db_all_e2e_test.dart
index f66f515..af37670 100644
--- a/test/db_all_e2e_test.dart
+++ b/test/db_all_e2e_test.dart
@@ -10,6 +10,7 @@
 import 'dart:async';
 import 'dart:io';
 
+import 'package:gcloud/datastore.dart';
 import 'package:gcloud/db.dart' as db;
 import 'package:gcloud/src/datastore_impl.dart' as datastore_impl;
 import 'package:http/http.dart';
@@ -25,12 +26,13 @@
   var now = DateTime.now().millisecondsSinceEpoch;
   var namespace = '${Platform.operatingSystem}$now';
 
-  late datastore_impl.DatastoreImpl datastore;
+  late Datastore datastore;
   late db.DatastoreDB datastoreDB;
   Client? client;
 
   await withAuthClient(scopes, (String project, httpClient) async {
-    datastore = datastore_impl.DatastoreImpl(httpClient, project);
+    datastore =
+        Datastore.withRetry(datastore_impl.DatastoreImpl(httpClient, project));
     datastoreDB = db.DatastoreDB(datastore);
     client = httpClient;
   });