blob: ca64f648c3364867d7354a2a6c8d54e0abe7b0e3 [file] [log] [blame]
// Copyright 2013 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
import 'package:googleapis/storage/v1.dart';
import 'package:googleapis_auth/googleapis_auth.dart';
/// Global (in terms of earth) mutex using Google Cloud Storage.
class GcsLock {
/// Create a lock with an authenticated client and a GCS bucket name.
///
/// The client is used to communicate with Google Cloud Storage APIs.
GcsLock(this._client, this._bucketName)
: assert(_client != null),
assert(_bucketName != null) {
_api = StorageApi(_client);
}
/// Create a temporary lock file in GCS, and use it as a mutex mechanism to
/// run a piece of code exclusively.
///
/// There must be no existing lock file with the same name in order to
/// proceed. If multiple [GcsLock]s with the same `bucketName` and
/// `lockFileName` try [protectedRun] simultaneously, only one will proceed
/// and create the lock file. All others will be blocked.
///
/// When [protectedRun] finishes, the lock file is deleted, and other blocked
/// [protectedRun] may proceed.
///
/// If the lock file is stuck (e.g., `_unlock` is interrupted unexpectedly),
/// one may need to manually delete the lock file from GCS to unblock any
/// [protectedRun] that may depend on it.
Future<void> protectedRun(
String lockFileName, Future<void> Function() f) async {
await _lock(lockFileName);
try {
await f();
} catch (e, stacktrace) {
print(stacktrace);
rethrow;
} finally {
await _unlock(lockFileName);
}
}
Future<void> _lock(String lockFileName) async {
final Object object = Object();
object.bucket = _bucketName;
object.name = lockFileName;
final Media content = Media(const Stream<List<int>>.empty(), 0);
Duration waitPeriod = const Duration(milliseconds: 10);
bool locked = false;
while (!locked) {
try {
await _api.objects.insert(object, _bucketName,
ifGenerationMatch: '0', uploadMedia: content);
locked = true;
} on DetailedApiRequestError catch (e) {
if (e.status == 412) {
// Status 412 means that the lock file already exists. Wait until
// that lock file is deleted.
await Future<void>.delayed(waitPeriod);
waitPeriod *= 2;
if (waitPeriod >= _kWarningThreshold) {
print(
'The lock is waiting for a long time: $waitPeriod. '
'If the lock file $lockFileName in bucket $_bucketName '
'seems to be stuck (i.e., it was created a long time ago and '
'no one seems to be owning it currently), delete it manually '
'to unblock this.',
);
}
} else {
rethrow;
}
}
}
}
Future<void> _unlock(String lockFileName) async {
await _api.objects.delete(_bucketName, lockFileName);
}
late StorageApi _api;
final String _bucketName;
final AuthClient _client;
static const Duration _kWarningThreshold = Duration(seconds: 10);
}