Cache locking in Cocoon for access tokens (#2389)
* Revert will wait for required checks to complete.
* Revert "Revert will wait for required checks to complete."
This reverts commit 0555407ad4eaf001167273e70ccdb313be4da21d.
* Add locking to the cocoon cache.
* Formatting
* Updated to add methods for locking along side the regular methods.
* Remove duplicated code by lock around calls to self.
* Format
* Updated code.
* Using standard mutex instead of read/write since we dont user the read lock.
diff --git a/app_dart/lib/src/request_handling/subscription_handler.dart b/app_dart/lib/src/request_handling/subscription_handler.dart
index 977ccec..d35ff95 100644
--- a/app_dart/lib/src/request_handling/subscription_handler.dart
+++ b/app_dart/lib/src/request_handling/subscription_handler.dart
@@ -109,7 +109,11 @@
final String messageId = envelope.message!.messageId!;
- final Uint8List? messageLock = await cache.getOrCreate(subscriptionName, messageId);
+ final Uint8List? messageLock = await cache.getOrCreate(
+ subscriptionName,
+ messageId,
+ createFn: null,
+ );
if (messageLock != null) {
// No-op - There's already a write lock for this message
final HttpResponse response = request.response
diff --git a/app_dart/lib/src/service/cache_service.dart b/app_dart/lib/src/service/cache_service.dart
index 4059e5a..5639334 100644
--- a/app_dart/lib/src/service/cache_service.dart
+++ b/app_dart/lib/src/service/cache_service.dart
@@ -8,6 +8,8 @@
import 'package:meta/meta.dart';
import 'package:neat_cache/cache_provider.dart';
import 'package:neat_cache/neat_cache.dart';
+import 'package:mutex/mutex.dart';
+import 'package:retry/retry.dart';
/// Service for reading and writing values to a cache for quick access of data.
///
@@ -20,6 +22,8 @@
}) : _provider =
inMemory ? Cache.inMemoryCacheProvider(inMemoryMaxNumberEntries) : Cache.redisCacheProvider(memorystoreUri);
+ final Mutex m = Mutex();
+
final CacheProvider<List<int>> _provider;
Cache<Uint8List> get cache => cacheValue ?? Cache<List<int>>(_provider).withCodec<Uint8List>(const _CacheCodec());
@@ -49,35 +53,78 @@
Future<Uint8List?> getOrCreate(
String subcacheName,
String key, {
- int attempt = 1,
- Future<Uint8List> Function()? createFn,
+ required Future<Uint8List> Function()? createFn,
Duration ttl = const Duration(minutes: 1),
}) async {
- final Cache<Uint8List> subcache = cache.withPrefix(subcacheName);
- Uint8List? value;
-
- try {
- value = await subcache[key].get();
- } catch (e) {
- if (attempt < maxCacheGetAttempts) {
- return getOrCreate(
- subcacheName,
- key,
- attempt: ++attempt,
- createFn: createFn,
- ttl: ttl,
- );
- } else {
- // Give up on trying to get the value from the cache.
- value = null;
- }
- }
+ Uint8List? value = await _readValue(subcacheName, key);
// If given createFn, update the cache value if the value returned was null.
if (createFn != null && value == null) {
// Try creating the value
value = await createFn();
- await set(subcacheName, key, value, ttl: ttl);
+ await set(
+ subcacheName,
+ key,
+ value,
+ ttl: ttl,
+ );
+ }
+
+ return value;
+ }
+
+ /// This method is the same as the [getOrCreate] method above except that it
+ /// enforces locking access.
+ ///
+ /// Note: these methods are intended to prevent issues around race conditions
+ /// when storing and retrieving github tokens locally only for this instance.
+ /// Care should be taken to use the locking methods together when accessing
+ /// data from an entity using the cache.
+ Future<Uint8List?> getOrCreateWithLocking(
+ String subcacheName,
+ String key, {
+ required Future<Uint8List> Function()? createFn,
+ Duration ttl = const Duration(minutes: 1),
+ }) async {
+ Uint8List? value = await _readValue(subcacheName, key);
+
+ // If given createFn, update the cache value if the value returned was null.
+ if (createFn != null && value == null) {
+ // Try creating the value
+ value = await createFn();
+ await setWithLocking(
+ subcacheName,
+ key,
+ value,
+ ttl: ttl,
+ );
+ }
+
+ return value;
+ }
+
+ Future<Uint8List?> _readValue(
+ String subcacheName,
+ String key,
+ ) async {
+ final Cache<Uint8List> subcache = cache.withPrefix(subcacheName);
+ Uint8List? value;
+
+ const RetryOptions r = RetryOptions(
+ maxAttempts: maxCacheGetAttempts,
+ delayFactor: Duration(milliseconds: 50),
+ );
+
+ try {
+ await r.retry(
+ () async {
+ value = await subcache[key].get();
+ },
+ );
+ } catch (e) {
+ // If the last retry is unsuccessful on an exception we do not want to die
+ // here.
+ value = null;
}
return value;
@@ -95,10 +142,46 @@
return entry.set(value, ttl);
}
+ /// Set [value] for [key] in the subcache [subcacheName] with [ttl] but
+ /// enforce locking accessing.
+ ///
+ /// Note: these methods are intended to prevent issues around race conditions
+ /// when storing and retrieving github tokens. Care should be taken to use the
+ /// locking methods together when accessing data from an entity using the
+ /// cache.
+ Future<Uint8List?> setWithLocking(
+ String subcacheName,
+ String key,
+ Uint8List? value, {
+ Duration ttl = const Duration(minutes: 1),
+ }) async {
+ await m.acquire();
+ try {
+ return set(
+ subcacheName,
+ key,
+ value,
+ ttl: ttl,
+ );
+ } finally {
+ m.release();
+ }
+ }
+
/// Clear the value stored in subcache [subcacheName] for key [key].
- Future<void> purge(String subcacheName, String key) {
- final Cache<Uint8List> subcache = cache.withPrefix(subcacheName);
- return subcache[key].purge(retries: maxCacheGetAttempts);
+ ///
+ /// Note: these methods are intended to prevent issues around race conditions
+ /// when storing and retrieving github tokens. Care should be taken to use the
+ /// locking methods together when accessing data from an entity using the
+ /// cache.
+ Future<void> purge(String subcacheName, String key) async {
+ await m.acquire();
+ try {
+ final Cache<Uint8List> subcache = cache.withPrefix(subcacheName);
+ return subcache[key].purge(retries: maxCacheGetAttempts);
+ } finally {
+ m.release();
+ }
}
void dispose() {
diff --git a/app_dart/lib/src/service/config.dart b/app_dart/lib/src/service/config.dart
index 5b1a89f..663d345 100644
--- a/app_dart/lib/src/service/config.dart
+++ b/app_dart/lib/src/service/config.dart
@@ -326,7 +326,7 @@
Future<String> generateGithubToken(gh.RepositorySlug slug) async {
// GitHub's secondary rate limits are run into very frequently when making auth tokens.
- final Uint8List? cacheValue = await _cache.getOrCreate(
+ final Uint8List? cacheValue = await _cache.getOrCreateWithLocking(
configCacheName,
'githubToken-${slug.fullName}',
createFn: () => _generateGithubToken(slug),
diff --git a/app_dart/pubspec.lock b/app_dart/pubspec.lock
index ef0f5c0..9f1ecb2 100644
--- a/app_dart/pubspec.lock
+++ b/app_dart/pubspec.lock
@@ -497,6 +497,14 @@
url: "https://pub.dev"
source: hosted
version: "5.3.2"
+ mutex:
+ dependency: "direct main"
+ description:
+ name: mutex
+ sha256: "03116a4e46282a671b46c12de649d72c0ed18188ffe12a8d0fc63e83f4ad88f4"
+ url: "https://pub.dev"
+ source: hosted
+ version: "3.0.1"
neat_cache:
dependency: "direct main"
description:
diff --git a/app_dart/pubspec.yaml b/app_dart/pubspec.yaml
index dcd383e..a6eecbf 100644
--- a/app_dart/pubspec.yaml
+++ b/app_dart/pubspec.yaml
@@ -31,6 +31,7 @@
logging: ^1.1.0
meta: ^1.8.0
mime: ^1.0.2
+ mutex: ^3.0.1
neat_cache: ^2.0.2
path: ^1.8.2
process: ^4.2.4
diff --git a/app_dart/test/request_handlers/flush_cache_test.dart b/app_dart/test/request_handlers/flush_cache_test.dart
index 21246a8..d7c0d99 100644
--- a/app_dart/test/request_handlers/flush_cache_test.dart
+++ b/app_dart/test/request_handlers/flush_cache_test.dart
@@ -48,7 +48,7 @@
);
await tester.get(handler);
- expect(await cache.getOrCreate(Config.configCacheName, cacheKey), null);
+ expect(await cache.getOrCreate(Config.configCacheName, cacheKey, createFn: null), null);
});
test('raises error if cache key not passed', () async {
diff --git a/app_dart/test/request_handling/subscription_handler_test.dart b/app_dart/test/request_handling/subscription_handler_test.dart
index 04b4684..bee1304 100644
--- a/app_dart/test/request_handling/subscription_handler_test.dart
+++ b/app_dart/test/request_handling/subscription_handler_test.dart
@@ -95,19 +95,19 @@
expect(response.statusCode, HttpStatus.ok);
// 2. Empty message is returned as this was already processed
expect(responseBody, '123 was already processed');
- expect(await cache.getOrCreate(subscription.subscriptionName, '123'), isNotNull);
+ expect(await cache.getOrCreate(subscription.subscriptionName, '123', createFn: null), isNotNull);
});
test('ensure messages can be retried', () async {
final CacheService cache = CacheService(inMemory: true);
subscription = ErrorTest(cache);
HttpClientResponse response = await issueRequest(body: jsonEncode(testEnvelope));
- Uint8List? messageLock = await cache.getOrCreate('error', '123');
+ Uint8List? messageLock = await cache.getOrCreate('error', '123', createFn: null);
expect(response.statusCode, HttpStatus.internalServerError);
expect(messageLock, isNull);
response = await issueRequest(body: jsonEncode(testEnvelope));
- messageLock = await cache.getOrCreate('error', '123');
+ messageLock = await cache.getOrCreate('error', '123', createFn: null);
expect(response.statusCode, HttpStatus.internalServerError);
expect(messageLock, isNull);
});
diff --git a/app_dart/test/service/cache_service_test.dart b/app_dart/test/service/cache_service_test.dart
index 165d4ea..d2d3310 100644
--- a/app_dart/test/service/cache_service_test.dart
+++ b/app_dart/test/service/cache_service_test.dart
@@ -2,6 +2,7 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
+import 'dart:async';
import 'dart:typed_data';
import 'package:cocoon_service/src/service/cache_service.dart';
@@ -22,7 +23,11 @@
});
test('returns null when no value exists', () async {
- final Uint8List? value = await cache.getOrCreate(testSubcacheName, 'abc');
+ final Uint8List? value = await cache.getOrCreate(
+ testSubcacheName,
+ 'abc',
+ createFn: null,
+ );
expect(value, isNull);
});
@@ -33,7 +38,11 @@
await cache.set(testSubcacheName, testKey, expectedValue);
- final Uint8List? value = await cache.getOrCreate(testSubcacheName, testKey);
+ final Uint8List? value = await cache.getOrCreate(
+ testSubcacheName,
+ testKey,
+ createFn: null,
+ );
expect(value, expectedValue);
});
@@ -47,10 +56,18 @@
await cache.set(testSubcacheName, testKey1, expectedValue1);
await cache.set(testSubcacheName, testKey2, expectedValue2);
- final Uint8List? value1 = await cache.getOrCreate(testSubcacheName, testKey1);
+ final Uint8List? value1 = await cache.getOrCreate(
+ testSubcacheName,
+ testKey1,
+ createFn: null,
+ );
expect(value1, null);
- final Uint8List? value2 = await cache.getOrCreate(testSubcacheName, testKey2);
+ final Uint8List? value2 = await cache.getOrCreate(
+ testSubcacheName,
+ testKey2,
+ createFn: null,
+ );
expect(value2, expectedValue2);
});
@@ -68,7 +85,11 @@
cache.cacheValue = mockMainCache;
- final Uint8List? value = await cache.getOrCreate(testSubcacheName, 'does not matter');
+ final Uint8List? value = await cache.getOrCreate(
+ testSubcacheName,
+ 'does not matter',
+ createFn: null,
+ );
verify(mockTestSubcache['does not matter']).called(2);
expect(value, Uint8List.fromList('abc123'.codeUnits));
});
@@ -88,7 +109,11 @@
cache.cacheValue = mockMainCache;
- final Uint8List? value = await cache.getOrCreate(testSubcacheName, 'does not matter');
+ final Uint8List? value = await cache.getOrCreate(
+ testSubcacheName,
+ 'does not matter',
+ createFn: null,
+ );
verify(mockTestSubcache['does not matter']).called(CacheService.maxCacheGetAttempts);
expect(value, isNull);
});
@@ -108,13 +133,21 @@
await cache.set(testSubcacheName, testKey, expectedValue);
- final Uint8List? value = await cache.getOrCreate(testSubcacheName, testKey);
+ final Uint8List? value = await cache.getOrCreate(
+ testSubcacheName,
+ testKey,
+ createFn: null,
+ );
expect(value, expectedValue);
await cache.purge(testSubcacheName, testKey);
- final Uint8List? valueAfterPurge = await cache.getOrCreate(testSubcacheName, testKey);
+ final Uint8List? valueAfterPurge = await cache.getOrCreate(
+ testSubcacheName,
+ testKey,
+ createFn: null,
+ );
expect(valueAfterPurge, isNull);
});
@@ -157,6 +190,64 @@
);
verify(entry.set(any, testDuration)).called(1);
});
+
+ test('set does not block read attempt', () async {
+ const String testKey = 'abc';
+ final Uint8List expectedValue = Uint8List.fromList('123'.codeUnits);
+
+ final cacheWrite = cache.setWithLocking(testSubcacheName, testKey, expectedValue);
+ Uint8List? valueAfterSet = await cache.getOrCreateWithLocking(
+ testSubcacheName,
+ testKey,
+ createFn: null,
+ );
+
+ expect(valueAfterSet, null);
+ await cacheWrite;
+ valueAfterSet = await cache.getOrCreateWithLocking(
+ testSubcacheName,
+ testKey,
+ createFn: null,
+ );
+ expect(valueAfterSet, expectedValue);
+ });
+
+ test('read locks are not blocking', () async {
+ const String testKey = 'abc';
+ final Uint8List expectedValue = Uint8List.fromList('123'.codeUnits);
+
+ await cache.setWithLocking(testSubcacheName, testKey, expectedValue);
+ final Future<Uint8List?> valueAfterSet = cache.getOrCreateWithLocking(
+ testSubcacheName,
+ testKey,
+ createFn: null,
+ );
+ final Uint8List? valueAfterSet2 = await cache.getOrCreateWithLocking(
+ testSubcacheName,
+ testKey,
+ createFn: null,
+ );
+
+ expect(valueAfterSet2, expectedValue);
+ await valueAfterSet.then((value) => expect(value, expectedValue));
+ });
+
+ test('write locks are blocking', () async {
+ const String testKey = 'abc';
+ final Uint8List expectedValue = Uint8List.fromList('123'.codeUnits);
+ final Uint8List newValue = Uint8List.fromList('345'.codeUnits);
+
+ final cacheWrite = cache.setWithLocking(testSubcacheName, testKey, expectedValue);
+ final cacheWrite2 = cache.setWithLocking(testSubcacheName, testKey, newValue);
+ await cacheWrite;
+ final Uint8List? readValue = await cache.getOrCreateWithLocking(
+ testSubcacheName,
+ testKey,
+ createFn: null,
+ );
+ expect(readValue, expectedValue);
+ await cacheWrite2;
+ });
});
}
diff --git a/app_dart/test/src/utilities/mocks.mocks.dart b/app_dart/test/src/utilities/mocks.mocks.dart
index 0e33d34..762708a 100644
--- a/app_dart/test/src/utilities/mocks.mocks.dart
+++ b/app_dart/test/src/utilities/mocks.mocks.dart
@@ -7152,6 +7152,11 @@
returnValue: '',
) as String);
@override
+ String get version => (super.noSuchMethod(
+ Invocation.getter(#version),
+ returnValue: '',
+ ) as String);
+ @override
_i2.Client get client => (super.noSuchMethod(
Invocation.getter(#client),
returnValue: _FakeClient_0(
diff --git a/auto_submit/pubspec.lock b/auto_submit/pubspec.lock
index 20f4db1..9ea70d2 100644
--- a/auto_submit/pubspec.lock
+++ b/auto_submit/pubspec.lock
@@ -333,7 +333,7 @@
dependency: "direct main"
description:
name: graphql
- sha256: "68bb34f7d239b7c9530c37de754ccb4b9e19927d838fd103da297444c11dad62"
+ sha256: "2a04c92d1fa897de21cd76e13ed212e7da1b0c2036ed54bd148d0b1679fbd72e"
url: "https://pub.dev"
source: hosted
version: "5.1.2-beta.6"