Support retrying Datastore operations. (#168)
* Support retrying Datastore operations.
* Expose only maxAttempts
* add more errors
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2efd48d..bb60415 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,7 @@
-## 0.8.10-wip
+## 0.8.10
- Widen the SDK constraint to support Dart 3.0
+- Support retrying Datastore operations.
## 0.8.9
diff --git a/lib/datastore.dart b/lib/datastore.dart
index 3b05eea..53ca4eb 100644
--- a/lib/datastore.dart
+++ b/lib/datastore.dart
@@ -12,10 +12,12 @@
import 'dart:async';
import 'package:http/http.dart' as http;
+import 'package:retry/retry.dart';
import 'common.dart' show Page;
import 'service_scope.dart' as ss;
import 'src/datastore_impl.dart' show DatastoreImpl;
+import 'src/retry_datastore_impl.dart';
const Symbol _datastoreKey = #gcloud.datastore;
@@ -391,6 +393,22 @@
return DatastoreImpl(client, project);
}
+  /// Wraps [delegate] so that operations failing with a seemingly transient
+  /// error are retried.
+  ///
+  /// All calls are forwarded to [delegate], which must be an already
+  /// configured [Datastore] implementation.
+  ///
+  /// An operation is attempted at most [maxAttempts] times; when [maxAttempts]
+  /// is omitted or `null`, 3 is used.
+  factory Datastore.withRetry(
+    Datastore delegate, {
+    int? maxAttempts,
+  }) {
+    final options = RetryOptions(maxAttempts: maxAttempts ?? 3);
+    return RetryDatastoreImpl(delegate, options);
+  }
+
/// Allocate integer IDs for the partially populated [keys] given as argument.
///
/// The returned [Key]s will be fully populated with the allocated IDs.
diff --git a/lib/src/retry_datastore_impl.dart b/lib/src/retry_datastore_impl.dart
new file mode 100644
index 0000000..e57410c
--- /dev/null
+++ b/lib/src/retry_datastore_impl.dart
@@ -0,0 +1,159 @@
+// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:retry/retry.dart';
+
+import '../common.dart';
+import '../datastore.dart' as datastore;
+
+/// A [datastore.Datastore] decorator that forwards every call to a wrapped
+/// instance, running most operations through [RetryOptions.retry] so that a
+/// failure accepted by [_retryIf] leads to another attempt.
+class RetryDatastoreImpl implements datastore.Datastore {
+  final datastore.Datastore _delegate;
+  final RetryOptions _retryOptions;
+
+  RetryDatastoreImpl(this._delegate, this._retryOptions);
+
+  /// Forwards to the delegate's `allocateIds`, with retries.
+  @override
+  Future<List<datastore.Key>> allocateIds(List<datastore.Key> keys) async {
+    Future<List<datastore.Key>> attempt() => _delegate.allocateIds(keys);
+    return await _retryOptions.retry(attempt, retryIf: _retryIf);
+  }
+
+  /// Forwards to the delegate's `beginTransaction`, with retries.
+  @override
+  Future<datastore.Transaction> beginTransaction({
+    bool crossEntityGroup = false,
+  }) async {
+    Future<datastore.Transaction> attempt() =>
+        _delegate.beginTransaction(crossEntityGroup: crossEntityGroup);
+    return await _retryOptions.retry(attempt, retryIf: _retryIf);
+  }
+
+  /// Forwards to the delegate's `commit`.
+  ///
+  /// The commit is sent exactly once, without retries, when [autoIdInserts]
+  /// is non-empty and no [transaction] is given. In every other case it goes
+  /// through the retry policy.
+  @override
+  Future<datastore.CommitResult> commit({
+    List<datastore.Entity> inserts = const [],
+    List<datastore.Entity> autoIdInserts = const [],
+    List<datastore.Key> deletes = const [],
+    datastore.Transaction? transaction,
+  }) async {
+    // The `transaction` argument is left out of the delegate call entirely
+    // when the caller did not supply one.
+    Future<datastore.CommitResult> attempt() async => transaction == null
+        ? await _delegate.commit(
+            inserts: inserts,
+            autoIdInserts: autoIdInserts,
+            deletes: deletes,
+          )
+        : await _delegate.commit(
+            inserts: inserts,
+            autoIdInserts: autoIdInserts,
+            deletes: deletes,
+            transaction: transaction,
+          );
+
+    // NOTE(review): presumably repeating a non-transactional commit with
+    // auto-ID inserts could create the entities twice - confirm.
+    if (autoIdInserts.isNotEmpty && transaction == null) {
+      return await attempt();
+    }
+    return await _retryOptions.retry(attempt, retryIf: _retryIf);
+  }
+
+  /// Forwards to the delegate's `lookup`, with retries.
+  ///
+  /// [transaction] is only passed on when it is non-null.
+  @override
+  Future<List<datastore.Entity?>> lookup(
+    List<datastore.Key> keys, {
+    datastore.Transaction? transaction,
+  }) async {
+    Future<List<datastore.Entity?>> attempt() async => transaction == null
+        ? await _delegate.lookup(keys)
+        : await _delegate.lookup(keys, transaction: transaction);
+    return await _retryOptions.retry(attempt, retryIf: _retryIf);
+  }
+
+  /// Forwards to the delegate's `query`, with retries.
+  ///
+  /// The resulting page is wrapped in a [_RetryPage] that carries the same
+  /// retry options.
+  @override
+  Future<Page<datastore.Entity>> query(
+    datastore.Query query, {
+    datastore.Partition? partition,
+    datastore.Transaction? transaction,
+  }) async {
+    // Only the optional arguments that were actually supplied are passed on.
+    Future<Page<datastore.Entity>> fetchFirstPage() async {
+      if (partition == null) {
+        return transaction == null
+            ? await _delegate.query(query)
+            : await _delegate.query(query, transaction: transaction);
+      }
+      if (transaction == null) {
+        return await _delegate.query(query, partition: partition);
+      }
+      return await _delegate.query(
+        query,
+        partition: partition,
+        transaction: transaction,
+      );
+    }
+
+    Future<Page<datastore.Entity>> attempt() async =>
+        _RetryPage(await fetchFirstPage(), _retryOptions);
+    return await _retryOptions.retry(attempt, retryIf: _retryIf);
+  }
+
+  /// Forwards to the delegate's `rollback`, with retries.
+  @override
+  Future rollback(datastore.Transaction transaction) async {
+    Future attempt() => _delegate.rollback(transaction);
+    return await _retryOptions.retry(attempt, retryIf: _retryIf);
+  }
+}
+
+/// A [Page] decorator that retries fetching of the following page.
+///
+/// [isLast] and [items] are read straight from the wrapped page. The page
+/// returned by [next] is wrapped again, so the retry behavior applies to
+/// every later page as well, not only to the first [next] call.
+class _RetryPage<K> implements Page<K> {
+  final Page<K> _delegate;
+  final RetryOptions _retryOptions;
+
+  _RetryPage(this._delegate, this._retryOptions);
+
+  @override
+  bool get isLast => _delegate.isLast;
+
+  @override
+  List<K> get items => _delegate.items;
+
+  /// Fetches the next page from the wrapped page, retrying on failures
+  /// accepted by [_retryIf].
+  ///
+  /// [pageSize] is only forwarded when it is non-null.
+  @override
+  Future<Page<K>> next({int? pageSize}) async {
+    return await _retryOptions.retry(
+      () async {
+        final following = pageSize == null
+            ? await _delegate.next()
+            : await _delegate.next(pageSize: pageSize);
+        // Re-wrap the result; returning the raw page would silently drop
+        // retries for every fetch after this one.
+        return _RetryPage<K>(following, _retryOptions);
+      },
+      retryIf: _retryIf,
+    );
+  }
+}
+
+/// Decides whether a failed call should be attempted again.
+///
+/// Returns `false` for [datastore.TransactionAbortedError],
+/// [datastore.NeedIndexError], [datastore.QuotaExceededError] and
+/// [datastore.PermissionDeniedError]; every other exception yields `true`.
+bool _retryIf(Exception e) {
+  final excluded = e is datastore.TransactionAbortedError ||
+      e is datastore.NeedIndexError ||
+      e is datastore.QuotaExceededError ||
+      e is datastore.PermissionDeniedError;
+  return !excluded;
+}
diff --git a/pubspec.yaml b/pubspec.yaml
index e5e48a3..7a58f69 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: gcloud
-version: 0.8.10-wip
+version: 0.8.10
description: >-
High level idiomatic Dart API for Google Cloud Storage, Pub-Sub and Datastore.
repository: https://github.com/dart-lang/gcloud
@@ -16,6 +16,7 @@
googleapis: '>=3.0.0 <12.0.0'
http: '>=0.13.5 <2.0.0'
meta: ^1.3.0
+ retry: ^3.1.1
dev_dependencies:
dart_flutter_team_lints: ^1.0.0
diff --git a/test/db_all_e2e_test.dart b/test/db_all_e2e_test.dart
index f66f515..af37670 100644
--- a/test/db_all_e2e_test.dart
+++ b/test/db_all_e2e_test.dart
@@ -10,6 +10,7 @@
import 'dart:async';
import 'dart:io';
+import 'package:gcloud/datastore.dart';
import 'package:gcloud/db.dart' as db;
import 'package:gcloud/src/datastore_impl.dart' as datastore_impl;
import 'package:http/http.dart';
@@ -25,12 +26,13 @@
var now = DateTime.now().millisecondsSinceEpoch;
var namespace = '${Platform.operatingSystem}$now';
- late datastore_impl.DatastoreImpl datastore;
+ late Datastore datastore;
late db.DatastoreDB datastoreDB;
Client? client;
await withAuthClient(scopes, (String project, httpClient) async {
- datastore = datastore_impl.DatastoreImpl(httpClient, project);
+ datastore =
+ Datastore.withRetry(datastore_impl.DatastoreImpl(httpClient, project));
datastoreDB = db.DatastoreDB(datastore);
client = httpClient;
});