Cache locking in Cocoon for access tokens (#2389)

* Revert will wait for required checks to complete.

* Revert "Revert will wait for required checks to complete."

This reverts commit 0555407ad4eaf001167273e70ccdb313be4da21d.

* Add locking to the cocoon cache.

* Formatting

* Updated to add methods for locking along side the regular methods.

* Remove duplicated code by lock around calls to self.

* Format

* Updated code.

* Using standard mutex instead of read/write since we dont user the read lock.
diff --git a/app_dart/lib/src/request_handling/subscription_handler.dart b/app_dart/lib/src/request_handling/subscription_handler.dart
index 977ccec..d35ff95 100644
--- a/app_dart/lib/src/request_handling/subscription_handler.dart
+++ b/app_dart/lib/src/request_handling/subscription_handler.dart
@@ -109,7 +109,11 @@
 
     final String messageId = envelope.message!.messageId!;
 
-    final Uint8List? messageLock = await cache.getOrCreate(subscriptionName, messageId);
+    final Uint8List? messageLock = await cache.getOrCreate(
+      subscriptionName,
+      messageId,
+      createFn: null,
+    );
     if (messageLock != null) {
       // No-op - There's already a write lock for this message
       final HttpResponse response = request.response
diff --git a/app_dart/lib/src/service/cache_service.dart b/app_dart/lib/src/service/cache_service.dart
index 4059e5a..5639334 100644
--- a/app_dart/lib/src/service/cache_service.dart
+++ b/app_dart/lib/src/service/cache_service.dart
@@ -8,6 +8,8 @@
 import 'package:meta/meta.dart';
 import 'package:neat_cache/cache_provider.dart';
 import 'package:neat_cache/neat_cache.dart';
+import 'package:mutex/mutex.dart';
+import 'package:retry/retry.dart';
 
 /// Service for reading and writing values to a cache for quick access of data.
 ///
@@ -20,6 +22,8 @@
   }) : _provider =
             inMemory ? Cache.inMemoryCacheProvider(inMemoryMaxNumberEntries) : Cache.redisCacheProvider(memorystoreUri);
 
+  final Mutex m = Mutex();
+
   final CacheProvider<List<int>> _provider;
 
   Cache<Uint8List> get cache => cacheValue ?? Cache<List<int>>(_provider).withCodec<Uint8List>(const _CacheCodec());
@@ -49,35 +53,78 @@
   Future<Uint8List?> getOrCreate(
     String subcacheName,
     String key, {
-    int attempt = 1,
-    Future<Uint8List> Function()? createFn,
+    required Future<Uint8List> Function()? createFn,
     Duration ttl = const Duration(minutes: 1),
   }) async {
-    final Cache<Uint8List> subcache = cache.withPrefix(subcacheName);
-    Uint8List? value;
-
-    try {
-      value = await subcache[key].get();
-    } catch (e) {
-      if (attempt < maxCacheGetAttempts) {
-        return getOrCreate(
-          subcacheName,
-          key,
-          attempt: ++attempt,
-          createFn: createFn,
-          ttl: ttl,
-        );
-      } else {
-        // Give up on trying to get the value from the cache.
-        value = null;
-      }
-    }
+    Uint8List? value = await _readValue(subcacheName, key);
 
     // If given createFn, update the cache value if the value returned was null.
     if (createFn != null && value == null) {
       // Try creating the value
       value = await createFn();
-      await set(subcacheName, key, value, ttl: ttl);
+      await set(
+        subcacheName,
+        key,
+        value,
+        ttl: ttl,
+      );
+    }
+
+    return value;
+  }
+
+  /// This method is the same as the [getOrCreate] method above except that it
+  /// enforces locking access.
+  ///
+  /// Note: these methods are intended to prevent issues around race conditions
+  /// when storing and retrieving github tokens locally only for this instance.
+  /// Care should be taken to use the locking methods together when accessing
+  /// data from an entity using the cache.
+  Future<Uint8List?> getOrCreateWithLocking(
+    String subcacheName,
+    String key, {
+    required Future<Uint8List> Function()? createFn,
+    Duration ttl = const Duration(minutes: 1),
+  }) async {
+    Uint8List? value = await _readValue(subcacheName, key);
+
+    // If given createFn, update the cache value if the value returned was null.
+    if (createFn != null && value == null) {
+      // Try creating the value
+      value = await createFn();
+      await setWithLocking(
+        subcacheName,
+        key,
+        value,
+        ttl: ttl,
+      );
+    }
+
+    return value;
+  }
+
+  Future<Uint8List?> _readValue(
+    String subcacheName,
+    String key,
+  ) async {
+    final Cache<Uint8List> subcache = cache.withPrefix(subcacheName);
+    Uint8List? value;
+
+    const RetryOptions r = RetryOptions(
+      maxAttempts: maxCacheGetAttempts,
+      delayFactor: Duration(milliseconds: 50),
+    );
+
+    try {
+      await r.retry(
+        () async {
+          value = await subcache[key].get();
+        },
+      );
+    } catch (e) {
+      // If the last retry is unsuccessful on an exception we do not want to die
+      // here.
+      value = null;
     }
 
     return value;
@@ -95,10 +142,46 @@
     return entry.set(value, ttl);
   }
 
+  /// Set [value] for [key] in the subcache [subcacheName] with [ttl] but
+  /// enforce locking accessing.
+  ///
+  /// Note: these methods are intended to prevent issues around race conditions
+  /// when storing and retrieving github tokens. Care should be taken to use the
+  /// locking methods together when accessing data from an entity using the
+  /// cache.
+  Future<Uint8List?> setWithLocking(
+    String subcacheName,
+    String key,
+    Uint8List? value, {
+    Duration ttl = const Duration(minutes: 1),
+  }) async {
+    await m.acquire();
+    try {
+      return set(
+        subcacheName,
+        key,
+        value,
+        ttl: ttl,
+      );
+    } finally {
+      m.release();
+    }
+  }
+
   /// Clear the value stored in subcache [subcacheName] for key [key].
-  Future<void> purge(String subcacheName, String key) {
-    final Cache<Uint8List> subcache = cache.withPrefix(subcacheName);
-    return subcache[key].purge(retries: maxCacheGetAttempts);
+  ///
+  /// Note: these methods are intended to prevent issues around race conditions
+  /// when storing and retrieving github tokens. Care should be taken to use the
+  /// locking methods together when accessing data from an entity using the
+  /// cache.
+  Future<void> purge(String subcacheName, String key) async {
+    await m.acquire();
+    try {
+      final Cache<Uint8List> subcache = cache.withPrefix(subcacheName);
+      return subcache[key].purge(retries: maxCacheGetAttempts);
+    } finally {
+      m.release();
+    }
   }
 
   void dispose() {
diff --git a/app_dart/lib/src/service/config.dart b/app_dart/lib/src/service/config.dart
index 5b1a89f..663d345 100644
--- a/app_dart/lib/src/service/config.dart
+++ b/app_dart/lib/src/service/config.dart
@@ -326,7 +326,7 @@
 
   Future<String> generateGithubToken(gh.RepositorySlug slug) async {
     // GitHub's secondary rate limits are run into very frequently when making auth tokens.
-    final Uint8List? cacheValue = await _cache.getOrCreate(
+    final Uint8List? cacheValue = await _cache.getOrCreateWithLocking(
       configCacheName,
       'githubToken-${slug.fullName}',
       createFn: () => _generateGithubToken(slug),
diff --git a/app_dart/pubspec.lock b/app_dart/pubspec.lock
index ef0f5c0..9f1ecb2 100644
--- a/app_dart/pubspec.lock
+++ b/app_dart/pubspec.lock
@@ -497,6 +497,14 @@
       url: "https://pub.dev"
     source: hosted
     version: "5.3.2"
+  mutex:
+    dependency: "direct main"
+    description:
+      name: mutex
+      sha256: "03116a4e46282a671b46c12de649d72c0ed18188ffe12a8d0fc63e83f4ad88f4"
+      url: "https://pub.dev"
+    source: hosted
+    version: "3.0.1"
   neat_cache:
     dependency: "direct main"
     description:
diff --git a/app_dart/pubspec.yaml b/app_dart/pubspec.yaml
index dcd383e..a6eecbf 100644
--- a/app_dart/pubspec.yaml
+++ b/app_dart/pubspec.yaml
@@ -31,6 +31,7 @@
   logging: ^1.1.0
   meta: ^1.8.0
   mime: ^1.0.2
+  mutex: ^3.0.1
   neat_cache: ^2.0.2
   path: ^1.8.2
   process: ^4.2.4
diff --git a/app_dart/test/request_handlers/flush_cache_test.dart b/app_dart/test/request_handlers/flush_cache_test.dart
index 21246a8..d7c0d99 100644
--- a/app_dart/test/request_handlers/flush_cache_test.dart
+++ b/app_dart/test/request_handlers/flush_cache_test.dart
@@ -48,7 +48,7 @@
       );
       await tester.get(handler);
 
-      expect(await cache.getOrCreate(Config.configCacheName, cacheKey), null);
+      expect(await cache.getOrCreate(Config.configCacheName, cacheKey, createFn: null), null);
     });
 
     test('raises error if cache key not passed', () async {
diff --git a/app_dart/test/request_handling/subscription_handler_test.dart b/app_dart/test/request_handling/subscription_handler_test.dart
index 04b4684..bee1304 100644
--- a/app_dart/test/request_handling/subscription_handler_test.dart
+++ b/app_dart/test/request_handling/subscription_handler_test.dart
@@ -95,19 +95,19 @@
       expect(response.statusCode, HttpStatus.ok);
       // 2. Empty message is returned as this was already processed
       expect(responseBody, '123 was already processed');
-      expect(await cache.getOrCreate(subscription.subscriptionName, '123'), isNotNull);
+      expect(await cache.getOrCreate(subscription.subscriptionName, '123', createFn: null), isNotNull);
     });
 
     test('ensure messages can be retried', () async {
       final CacheService cache = CacheService(inMemory: true);
       subscription = ErrorTest(cache);
       HttpClientResponse response = await issueRequest(body: jsonEncode(testEnvelope));
-      Uint8List? messageLock = await cache.getOrCreate('error', '123');
+      Uint8List? messageLock = await cache.getOrCreate('error', '123', createFn: null);
       expect(response.statusCode, HttpStatus.internalServerError);
       expect(messageLock, isNull);
 
       response = await issueRequest(body: jsonEncode(testEnvelope));
-      messageLock = await cache.getOrCreate('error', '123');
+      messageLock = await cache.getOrCreate('error', '123', createFn: null);
       expect(response.statusCode, HttpStatus.internalServerError);
       expect(messageLock, isNull);
     });
diff --git a/app_dart/test/service/cache_service_test.dart b/app_dart/test/service/cache_service_test.dart
index 165d4ea..d2d3310 100644
--- a/app_dart/test/service/cache_service_test.dart
+++ b/app_dart/test/service/cache_service_test.dart
@@ -2,6 +2,7 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
+import 'dart:async';
 import 'dart:typed_data';
 
 import 'package:cocoon_service/src/service/cache_service.dart';
@@ -22,7 +23,11 @@
     });
 
     test('returns null when no value exists', () async {
-      final Uint8List? value = await cache.getOrCreate(testSubcacheName, 'abc');
+      final Uint8List? value = await cache.getOrCreate(
+        testSubcacheName,
+        'abc',
+        createFn: null,
+      );
 
       expect(value, isNull);
     });
@@ -33,7 +38,11 @@
 
       await cache.set(testSubcacheName, testKey, expectedValue);
 
-      final Uint8List? value = await cache.getOrCreate(testSubcacheName, testKey);
+      final Uint8List? value = await cache.getOrCreate(
+        testSubcacheName,
+        testKey,
+        createFn: null,
+      );
 
       expect(value, expectedValue);
     });
@@ -47,10 +56,18 @@
       await cache.set(testSubcacheName, testKey1, expectedValue1);
       await cache.set(testSubcacheName, testKey2, expectedValue2);
 
-      final Uint8List? value1 = await cache.getOrCreate(testSubcacheName, testKey1);
+      final Uint8List? value1 = await cache.getOrCreate(
+        testSubcacheName,
+        testKey1,
+        createFn: null,
+      );
       expect(value1, null);
 
-      final Uint8List? value2 = await cache.getOrCreate(testSubcacheName, testKey2);
+      final Uint8List? value2 = await cache.getOrCreate(
+        testSubcacheName,
+        testKey2,
+        createFn: null,
+      );
       expect(value2, expectedValue2);
     });
 
@@ -68,7 +85,11 @@
 
       cache.cacheValue = mockMainCache;
 
-      final Uint8List? value = await cache.getOrCreate(testSubcacheName, 'does not matter');
+      final Uint8List? value = await cache.getOrCreate(
+        testSubcacheName,
+        'does not matter',
+        createFn: null,
+      );
       verify(mockTestSubcache['does not matter']).called(2);
       expect(value, Uint8List.fromList('abc123'.codeUnits));
     });
@@ -88,7 +109,11 @@
 
       cache.cacheValue = mockMainCache;
 
-      final Uint8List? value = await cache.getOrCreate(testSubcacheName, 'does not matter');
+      final Uint8List? value = await cache.getOrCreate(
+        testSubcacheName,
+        'does not matter',
+        createFn: null,
+      );
       verify(mockTestSubcache['does not matter']).called(CacheService.maxCacheGetAttempts);
       expect(value, isNull);
     });
@@ -108,13 +133,21 @@
 
       await cache.set(testSubcacheName, testKey, expectedValue);
 
-      final Uint8List? value = await cache.getOrCreate(testSubcacheName, testKey);
+      final Uint8List? value = await cache.getOrCreate(
+        testSubcacheName,
+        testKey,
+        createFn: null,
+      );
 
       expect(value, expectedValue);
 
       await cache.purge(testSubcacheName, testKey);
 
-      final Uint8List? valueAfterPurge = await cache.getOrCreate(testSubcacheName, testKey);
+      final Uint8List? valueAfterPurge = await cache.getOrCreate(
+        testSubcacheName,
+        testKey,
+        createFn: null,
+      );
       expect(valueAfterPurge, isNull);
     });
 
@@ -157,6 +190,64 @@
       );
       verify(entry.set(any, testDuration)).called(1);
     });
+
+    test('set does not block read attempt', () async {
+      const String testKey = 'abc';
+      final Uint8List expectedValue = Uint8List.fromList('123'.codeUnits);
+
+      final cacheWrite = cache.setWithLocking(testSubcacheName, testKey, expectedValue);
+      Uint8List? valueAfterSet = await cache.getOrCreateWithLocking(
+        testSubcacheName,
+        testKey,
+        createFn: null,
+      );
+
+      expect(valueAfterSet, null);
+      await cacheWrite;
+      valueAfterSet = await cache.getOrCreateWithLocking(
+        testSubcacheName,
+        testKey,
+        createFn: null,
+      );
+      expect(valueAfterSet, expectedValue);
+    });
+
+    test('read locks are not blocking', () async {
+      const String testKey = 'abc';
+      final Uint8List expectedValue = Uint8List.fromList('123'.codeUnits);
+
+      await cache.setWithLocking(testSubcacheName, testKey, expectedValue);
+      final Future<Uint8List?> valueAfterSet = cache.getOrCreateWithLocking(
+        testSubcacheName,
+        testKey,
+        createFn: null,
+      );
+      final Uint8List? valueAfterSet2 = await cache.getOrCreateWithLocking(
+        testSubcacheName,
+        testKey,
+        createFn: null,
+      );
+
+      expect(valueAfterSet2, expectedValue);
+      await valueAfterSet.then((value) => expect(value, expectedValue));
+    });
+
+    test('write locks are blocking', () async {
+      const String testKey = 'abc';
+      final Uint8List expectedValue = Uint8List.fromList('123'.codeUnits);
+      final Uint8List newValue = Uint8List.fromList('345'.codeUnits);
+
+      final cacheWrite = cache.setWithLocking(testSubcacheName, testKey, expectedValue);
+      final cacheWrite2 = cache.setWithLocking(testSubcacheName, testKey, newValue);
+      await cacheWrite;
+      final Uint8List? readValue = await cache.getOrCreateWithLocking(
+        testSubcacheName,
+        testKey,
+        createFn: null,
+      );
+      expect(readValue, expectedValue);
+      await cacheWrite2;
+    });
   });
 }
 
diff --git a/app_dart/test/src/utilities/mocks.mocks.dart b/app_dart/test/src/utilities/mocks.mocks.dart
index 0e33d34..762708a 100644
--- a/app_dart/test/src/utilities/mocks.mocks.dart
+++ b/app_dart/test/src/utilities/mocks.mocks.dart
@@ -7152,6 +7152,11 @@
         returnValue: '',
       ) as String);
   @override
+  String get version => (super.noSuchMethod(
+        Invocation.getter(#version),
+        returnValue: '',
+      ) as String);
+  @override
   _i2.Client get client => (super.noSuchMethod(
         Invocation.getter(#client),
         returnValue: _FakeClient_0(
diff --git a/auto_submit/pubspec.lock b/auto_submit/pubspec.lock
index 20f4db1..9ea70d2 100644
--- a/auto_submit/pubspec.lock
+++ b/auto_submit/pubspec.lock
@@ -333,7 +333,7 @@
     dependency: "direct main"
     description:
       name: graphql
-      sha256: "68bb34f7d239b7c9530c37de754ccb4b9e19927d838fd103da297444c11dad62"
+      sha256: "2a04c92d1fa897de21cd76e13ed212e7da1b0c2036ed54bd148d0b1679fbd72e"
       url: "https://pub.dev"
     source: hosted
     version: "5.1.2-beta.6"