blob: 5c2f888fa9de96b72e1c5f72ad001a4f5542a9b5 [file] [log] [blame]
// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
part of '../../db.dart';
/// A function definition for transactional functions.
///
/// The function will be given a [Transaction] object which can be used to make
/// lookups/queries and queue modifications (inserts/updates/deletes).
typedef TransactionHandler<T> = Future<T> Function(Transaction transaction);
/// A datastore transaction.
///
/// It can be used for making lookups/queries and queue modifications
/// (inserts/updates/deletes). Finally the transaction can be either committed
/// or rolled back.
class Transaction {
static const int _transactionStarted = 0;
static const int _transactionRolledBack = 1;
static const int _transactionCommitted = 2;
static const int _transactionCommitFailed = 3;
final DatastoreDB db;
final ds.Transaction _datastoreTransaction;
final List<Model> _inserts = [];
final List<Key> _deletes = [];
int _state = _transactionStarted;
Transaction(this.db, this._datastoreTransaction);
/// Looks up [keys] within this transaction.
Future<List<T?>> lookup<T extends Model>(List<Key> keys) {
return _lookupHelper<T>(
db,
keys,
datastoreTransaction: _datastoreTransaction,
);
}
/// Looks up a single [key] within this transaction, and returns the
/// associated [Model] object.
///
/// If [orElse] is specified, then it will be consulted to provide a default
/// value for the model object in the event that [key] was not found within
/// the transaction.
///
/// If the [key] is not found within the transaction and [orElse] was not
/// specified, then a [KeyNotFoundException] will be thrown.
Future<T> lookupValue<T extends Model>(Key key,
{T Function()? orElse}) async {
final values = await lookup<T>(<Key>[key]);
assert(values.length == 1);
var value = values.single;
if (value == null) {
if (orElse != null) {
value = orElse();
} else {
throw KeyNotFoundException(key);
}
}
return value;
}
/// Looks up a single [key] in the datastore, and returns the associated
/// [Model] object.
///
/// If the [key] is not found in the datastore, null will be returned.
Future<T?> lookupOrNull<T extends Model>(Key key) async {
final values = await lookup<T>(<Key>[key]);
assert(values.length == 1);
return values.single;
}
/// Enqueues [inserts] and [deletes] which should be committed at commit time.
void queueMutations({List<Model>? inserts, List<Key>? deletes}) {
_checkSealed();
if (inserts != null) {
_inserts.addAll(inserts);
}
if (deletes != null) {
_deletes.addAll(deletes);
}
}
/// Query for [kind] models with [ancestorKey].
///
/// Note that [ancestorKey] is required, since a transaction is not allowed to
/// touch/look at an arbitrary number of rows.
Query<T> query<T extends Model>(Key ancestorKey, {Partition? partition}) {
// TODO(#25): The `partition` element is redundant and should be removed.
if (partition == null) {
partition = ancestorKey.partition;
} else if (ancestorKey.partition != partition) {
throw ArgumentError(
'Ancestor queries must have the same partition in the ancestor key '
'as the partition where the query executes in.');
}
_checkSealed();
return Query<T>(db,
partition: partition,
ancestorKey: ancestorKey,
datastoreTransaction: _datastoreTransaction);
}
/// Rolls this transaction back.
Future rollback() {
_checkSealed(changeState: _transactionRolledBack, allowFailed: true);
return db.datastore.rollback(_datastoreTransaction);
}
/// Commits this transaction including all of the queued mutations.
Future commit() {
_checkSealed(changeState: _transactionCommitted);
try {
return _commitHelper(db,
inserts: _inserts,
deletes: _deletes,
datastoreTransaction: _datastoreTransaction);
} catch (error) {
_state = _transactionCommitFailed;
rethrow;
}
}
void _checkSealed({int? changeState, bool allowFailed = false}) {
if (_state == _transactionCommitted) {
throw StateError('The transaction has already been committed.');
} else if (_state == _transactionRolledBack) {
throw StateError('The transaction has already been rolled back.');
} else if (_state == _transactionCommitFailed && !allowFailed) {
throw StateError('The transaction has attempted commit and failed.');
}
if (changeState != null) {
_state = changeState;
}
}
}
class Query<T extends Model> {
final _relationMapping = const <String, ds.FilterRelation>{
'<': ds.FilterRelation.LessThan,
'<=': ds.FilterRelation.LessThanOrEqual,
'>': ds.FilterRelation.GreaterThan,
'>=': ds.FilterRelation.GreaterThanOrEqual,
'=': ds.FilterRelation.Equal,
};
final DatastoreDB _db;
final ds.Transaction? _transaction;
final String _kind;
final Partition? _partition;
final Key? _ancestorKey;
final List<ds.Filter> _filters = [];
final List<ds.Order> _orders = [];
int? _offset;
int? _limit;
Query(DatastoreDB dbImpl,
{Partition? partition,
Key? ancestorKey,
ds.Transaction? datastoreTransaction})
: _db = dbImpl,
_kind = dbImpl.modelDB.kindName(T),
_partition = partition,
_ancestorKey = ancestorKey,
_transaction = datastoreTransaction;
/// Adds a filter to this [Query].
///
/// [filterString] has form "name OP" where 'name' is a fieldName of the
/// model and OP is an operator. The following operators are supported:
///
/// * '<' (less than)
/// * '<=' (less than or equal)
/// * '>' (greater than)
/// * '>=' (greater than or equal)
/// * '=' (equal)
///
/// [comparisonObject] is the object for comparison.
void filter(String filterString, Object? comparisonObject) {
var parts = filterString.split(' ');
if (parts.length != 2 || !_relationMapping.containsKey(parts[1])) {
throw ArgumentError("Invalid filter string '$filterString'.");
}
var name = parts[0];
var comparison = parts[1];
var propertyName = _convertToDatastoreName(name);
// This is for backwards compatibility: We allow [datastore.Key]s for now.
// TODO: We should remove the condition in a major version update of
// `package:gcloud`.
if (comparisonObject is! ds.Key) {
comparisonObject = _db.modelDB
.toDatastoreValue(_kind, name, comparisonObject, forComparison: true);
}
_filters.add(ds.Filter(
_relationMapping[comparison]!, propertyName, comparisonObject!));
}
/// Adds an order to this [Query].
///
/// [orderString] has the form "-name" where 'name' is a fieldName of the
/// model and the optional '-' says whether the order is descending or
/// ascending.
void order(String orderString) {
// TODO: validate [orderString] (e.g. is name valid)
if (orderString.startsWith('-')) {
_orders.add(ds.Order(ds.OrderDirection.Descending,
_convertToDatastoreName(orderString.substring(1))));
} else {
_orders.add(ds.Order(
ds.OrderDirection.Ascending, _convertToDatastoreName(orderString)));
}
}
/// Sets the [offset] of this [Query].
///
/// When running this query, [offset] results will be skipped.
void offset(int offset) {
_offset = offset;
}
/// Sets the [limit] of this [Query].
///
/// When running this query, a maximum of [limit] results will be returned.
void limit(int limit) {
_limit = limit;
}
/// Execute this [Query] on the datastore.
///
/// Outside of transactions this method might return stale data or may not
/// return the newest updates performed on the datastore since updates
/// will be reflected in the indices in an eventual consistent way.
Stream<T> run() {
ds.Key? ancestorKey;
if (_ancestorKey != null) {
ancestorKey = _db.modelDB.toDatastoreKey(_ancestorKey!);
}
var query = ds.Query(
ancestorKey: ancestorKey,
kind: _kind,
filters: _filters,
orders: _orders,
offset: _offset,
limit: _limit);
ds.Partition? partition;
if (_partition != null) {
partition = ds.Partition(_partition!.namespace);
}
return StreamFromPages<ds.Entity>((int pageSize) {
if (_transaction != null) {
if (partition != null) {
return _db.datastore
.query(query, transaction: _transaction!, partition: partition);
}
return _db.datastore.query(query, transaction: _transaction!);
}
if (partition != null) {
return _db.datastore.query(query, partition: partition);
}
return _db.datastore.query(query);
}).stream.map<T>((e) => _db.modelDB.fromDatastoreEntity(e)!);
}
// TODO:
// - add runPaged() returning Page<Model>
// - add run*() method once we have EntityResult{Entity,Cursor} in low-level
// API.
String _convertToDatastoreName(String name) {
var propertyName = _db.modelDB.fieldNameToPropertyName(_kind, name);
if (propertyName == null) {
throw ArgumentError('Field $name is not available for kind $_kind');
}
return propertyName;
}
}
class DatastoreDB {
final ds.Datastore datastore;
final ModelDB _modelDB;
final Partition _defaultPartition;
DatastoreDB(this.datastore, {ModelDB? modelDB, Partition? defaultPartition})
: _modelDB = modelDB ?? ModelDBImpl(),
_defaultPartition = defaultPartition ?? Partition(null);
/// The [ModelDB] used to serialize/deserialize objects.
ModelDB get modelDB => _modelDB;
/// Gets the empty key using the default [Partition].
///
/// Model keys with parent set to [emptyKey] will create their own entity
/// groups.
Key get emptyKey => defaultPartition.emptyKey;
/// Gets the default [Partition].
Partition get defaultPartition => _defaultPartition;
/// Creates a new [Partition] with namespace [namespace].
Partition newPartition(String namespace) {
return Partition(namespace);
}
/// Begins a new a new transaction.
///
/// A transaction can touch only a limited number of entity groups. This limit
/// is currently 5.
// TODO: Add retries and/or auto commit/rollback.
Future<T> withTransaction<T>(TransactionHandler<T> transactionHandler) {
return datastore
.beginTransaction(crossEntityGroup: true)
.then((datastoreTransaction) {
var transaction = Transaction(this, datastoreTransaction);
return transactionHandler(transaction);
});
}
/// Build a query for [kind] models.
Query<T> query<T extends Model>({Partition? partition, Key? ancestorKey}) {
// TODO(#26): There is only one case where `partition` is not redundant
// Namely if `ancestorKey == null` and `partition != null`. We could
// say we get rid of `partition` and enforce `ancestorKey` to
// be `Partition.emptyKey`?
if (partition == null) {
if (ancestorKey != null) {
partition = ancestorKey.partition;
} else {
partition = defaultPartition;
}
} else if (ancestorKey != null && partition != ancestorKey.partition) {
throw ArgumentError(
'Ancestor queries must have the same partition in the ancestor key '
'as the partition where the query executes in.');
}
return Query<T>(this, partition: partition, ancestorKey: ancestorKey);
}
/// Looks up [keys] in the datastore and returns a list of [Model] objects.
///
/// Any key that is not found in the datastore will have a corresponding
/// value of null in the list of model objects that is returned.
///
/// For transactions, please use [beginTransaction] and call the [lookup]
/// method on it's returned [Transaction] object.
///
/// See also:
///
/// * [lookupValue], which looks a single value up by its key, requiring a
/// successful lookup.
Future<List<T?>> lookup<T extends Model>(List<Key> keys) {
return _lookupHelper<T>(this, keys);
}
/// Looks up a single [key] in the datastore, and returns the associated
/// [Model] object.
///
/// If [orElse] is specified, then it will be consulted to provide a default
/// value for the model object in the event that [key] was not found in the
/// datastore.
///
/// If the [key] is not found in the datastore and [orElse] was not
/// specified, then a [KeyNotFoundException] will be thrown.
Future<T> lookupValue<T extends Model>(Key key,
{T Function()? orElse}) async {
final values = await lookup<T>(<Key>[key]);
assert(values.length == 1);
var value = values.single;
if (value == null) {
if (orElse != null) {
value = orElse();
} else {
throw KeyNotFoundException(key);
}
}
return value;
}
/// Looks up a single [key] in the datastore, and returns the associated
/// [Model] object.
///
/// If the [key] is not found in the datastore, null will be returned.
Future<T?> lookupOrNull<T extends Model>(Key key) async {
final values = await lookup<T>(<Key>[key]);
assert(values.length == 1);
return values.single;
}
/// Add [inserts] to the datastore and remove [deletes] from it.
///
/// The order of inserts and deletes is not specified. When the commit is done
/// direct lookups will see the effect but non-ancestor queries will see the
/// change in an eventual consistent way.
///
/// The inserts are done as upserts unless the provided model does not have an
/// id, in which case an autoId will be generated.
///
/// For transactions, please use `beginTransaction` and it's returned
/// [Transaction] object.
Future commit({List<Model>? inserts, List<Key>? deletes}) {
return _commitHelper(this, inserts: inserts, deletes: deletes);
}
}
Future _commitHelper(DatastoreDB db,
{List<Model>? inserts,
List<Key>? deletes,
ds.Transaction? datastoreTransaction}) {
List<ds.Entity>? entityInserts, entityAutoIdInserts;
List<ds.Key>? entityDeletes;
late List<Model> autoIdModelInserts;
if (inserts != null) {
entityInserts = <ds.Entity>[];
entityAutoIdInserts = <ds.Entity>[];
autoIdModelInserts = <Model>[];
for (var model in inserts) {
// If parent was not explicitly set, we assume this model will map to
// its own entity group.
model.parentKey ??= db.defaultPartition.emptyKey;
if (model.id == null) {
autoIdModelInserts.add(model);
entityAutoIdInserts.add(db.modelDB.toDatastoreEntity(model));
} else {
entityInserts.add(db.modelDB.toDatastoreEntity(model));
}
}
}
if (deletes != null) {
entityDeletes = deletes.map(db.modelDB.toDatastoreKey).toList();
}
Future<ds.CommitResult> r;
if (datastoreTransaction != null) {
r = db.datastore.commit(
inserts: entityInserts ?? [],
autoIdInserts: entityAutoIdInserts ?? [],
deletes: entityDeletes ?? [],
transaction: datastoreTransaction);
} else {
r = db.datastore.commit(
inserts: entityInserts ?? [],
autoIdInserts: entityAutoIdInserts ?? [],
deletes: entityDeletes ?? []);
}
return r.then((ds.CommitResult result) {
if (entityAutoIdInserts != null && entityAutoIdInserts.isNotEmpty) {
for (var i = 0; i < result.autoIdInsertKeys.length; i++) {
var key = db.modelDB.fromDatastoreKey(result.autoIdInsertKeys[i]);
autoIdModelInserts[i].parentKey = key.parent;
autoIdModelInserts[i].id = key.id;
}
}
});
}
Future<List<T?>> _lookupHelper<T extends Model>(DatastoreDB db, List<Key> keys,
{ds.Transaction? datastoreTransaction}) {
var entityKeys = keys.map(db.modelDB.toDatastoreKey).toList();
if (datastoreTransaction != null) {
return db.datastore
.lookup(entityKeys, transaction: datastoreTransaction)
.then((List<ds.Entity?> entities) {
return entities.map<T?>(db.modelDB.fromDatastoreEntity).toList();
});
}
return db.datastore.lookup(entityKeys).then((List<ds.Entity?> entities) {
return entities.map<T?>(db.modelDB.fromDatastoreEntity).toList();
});
}