// Copyright (c) 2014, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

part of '../../db.dart';

/// Signature of the callback passed to [DatastoreDB.withTransaction].
///
/// The callback receives a [Transaction] object which can be used to make
/// lookups/queries and to queue modifications (inserts/updates/deletes).
///
/// The future returned by the callback becomes the result of
/// [DatastoreDB.withTransaction]. That method neither commits nor rolls back
/// on the callback's behalf, so the callback has to call
/// [Transaction.commit] or [Transaction.rollback] itself.
typedef TransactionHandler<T> = Future<T> Function(Transaction transaction);

/// A datastore transaction.
///
/// It can be used for making lookups/queries and for queueing modifications
/// (inserts/updates/deletes). Finally the transaction can be either committed
/// or rolled back.
///
/// Once the transaction has been committed or rolled back, [queueMutations],
/// [query], [commit] and [rollback] throw a [StateError]. After a failed
/// commit only [rollback] is still permitted.
class Transaction {
  /// The transaction is open and accepts mutations, queries, commit and
  /// rollback.
  static const int _transactionStarted = 0;

  /// [rollback] has been called.
  static const int _transactionRolledBack = 1;

  /// [commit] has been called (and has not been observed to fail).
  static const int _transactionCommitted = 2;

  /// [commit] has been called and failed; only [rollback] is allowed.
  static const int _transactionCommitFailed = 3;

  /// The database this transaction operates on.
  final DatastoreDB db;

  /// The underlying low-level datastore transaction.
  final ds.Transaction _datastoreTransaction;

  /// Models queued via [queueMutations], written at [commit] time.
  final List<Model> _inserts = [];

  /// Keys queued via [queueMutations], deleted at [commit] time.
  final List<Key> _deletes = [];

  /// One of the `_transaction*` state constants.
  int _state = _transactionStarted;

  Transaction(this.db, this._datastoreTransaction);

  /// Looks up [keys] within this transaction.
  ///
  /// The returned list contains one (possibly null) entry per looked up key.
  Future<List<T?>> lookup<T extends Model>(List<Key> keys) {
    return _lookupHelper<T>(
      db,
      keys,
      datastoreTransaction: _datastoreTransaction,
    );
  }

  /// Looks up a single [key] within this transaction, and returns the
  /// associated [Model] object.
  ///
  /// If [orElse] is specified, then it will be consulted to provide a default
  /// value for the model object in the event that [key] was not found within
  /// the transaction.
  ///
  /// If the [key] is not found within the transaction and [orElse] was not
  /// specified, then a [KeyNotFoundException] will be thrown.
  Future<T> lookupValue<T extends Model>(Key key,
      {T Function()? orElse}) async {
    final values = await lookup<T>(<Key>[key]);
    assert(values.length == 1);
    var value = values.single;
    if (value == null) {
      if (orElse != null) {
        value = orElse();
      } else {
        throw KeyNotFoundException(key);
      }
    }
    return value;
  }

  /// Looks up a single [key] within this transaction, and returns the
  /// associated [Model] object.
  ///
  /// If the [key] is not found, null will be returned.
  Future<T?> lookupOrNull<T extends Model>(Key key) async {
    final values = await lookup<T>(<Key>[key]);
    assert(values.length == 1);
    return values.single;
  }

  /// Enqueues [inserts] and [deletes] which should be committed at commit time.
  ///
  /// Throws a [StateError] if the transaction was already committed, rolled
  /// back, or has failed to commit.
  void queueMutations({List<Model>? inserts, List<Key>? deletes}) {
    _checkSealed();
    if (inserts != null) {
      _inserts.addAll(inserts);
    }
    if (deletes != null) {
      _deletes.addAll(deletes);
    }
  }

  /// Query for [kind] models with [ancestorKey].
  ///
  /// Note that [ancestorKey] is required, since a transaction is not allowed to
  /// touch/look at an arbitrary number of rows.
  ///
  /// Throws an [ArgumentError] if [partition] is given and differs from the
  /// partition of [ancestorKey], and a [StateError] if the transaction was
  /// already committed, rolled back, or has failed to commit.
  Query<T> query<T extends Model>(Key ancestorKey, {Partition? partition}) {
    // TODO(#25): The `partition` element is redundant and should be removed.
    if (partition == null) {
      partition = ancestorKey.partition;
    } else if (ancestorKey.partition != partition) {
      throw ArgumentError(
          'Ancestor queries must have the same partition in the ancestor key '
          'as the partition where the query executes in.');
    }
    _checkSealed();
    return Query<T>(db,
        partition: partition,
        ancestorKey: ancestorKey,
        datastoreTransaction: _datastoreTransaction);
  }

  /// Rolls this transaction back.
  ///
  /// This is permitted on a started transaction and on one whose [commit]
  /// failed. Throws a [StateError] if the transaction was already committed
  /// or rolled back.
  Future rollback() {
    _checkSealed(changeState: _transactionRolledBack, allowFailed: true);
    return db.datastore.rollback(_datastoreTransaction);
  }

  /// Commits this transaction including all of the queued mutations.
  ///
  /// Throws a [StateError] synchronously if the transaction was already
  /// committed, rolled back, or has failed to commit.
  ///
  /// If the commit fails, either synchronously while the request is being
  /// prepared or asynchronously through the returned future, the transaction
  /// is marked as failed so that [rollback] remains possible, and the
  /// original error is propagated.
  Future commit() {
    _checkSealed(changeState: _transactionCommitted);
    try {
      // The async wrapper never throws synchronously, so the `catch` below
      // only sees errors raised while `_commitHelper` builds the request.
      return _markFailedOnError(_commitHelper(db,
          inserts: _inserts,
          deletes: _deletes,
          datastoreTransaction: _datastoreTransaction));
    } catch (_) {
      _state = _transactionCommitFailed;
      rethrow;
    }
  }

  /// Awaits [pendingCommit] and records a failed commit in [_state].
  ///
  /// A plain `try`/`catch` around a returned future does not observe errors
  /// the future completes with; awaiting it here does.
  Future _markFailedOnError(Future pendingCommit) async {
    try {
      return await pendingCommit;
    } catch (_) {
      _state = _transactionCommitFailed;
      rethrow;
    }
  }

  /// Throws a [StateError] if this transaction may no longer be used.
  ///
  /// A committed or rolled back transaction is always rejected. A transaction
  /// whose commit failed is rejected unless [allowFailed] is true. If the
  /// check passes and [changeState] is given, it becomes the new state.
  void _checkSealed({int? changeState, bool allowFailed = false}) {
    if (_state == _transactionCommitted) {
      throw StateError('The transaction has already been committed.');
    } else if (_state == _transactionRolledBack) {
      throw StateError('The transaction has already been rolled back.');
    } else if (_state == _transactionCommitFailed && !allowFailed) {
      throw StateError('The transaction has attempted commit and failed.');
    }
    if (changeState != null) {
      _state = changeState;
    }
  }
}

/// A query for models of type [T], optionally restricted to an ancestor key
/// and/or a partition, and optionally bound to a datastore transaction.
///
/// Filters, orders, offset and limit are accumulated on this object and are
/// handed to the datastore when [run] is called.
class Query<T extends Model> {
  /// Maps the textual operators accepted by [filter] to datastore relations.
  final _relationMapping = const <String, ds.FilterRelation>{
    '<': ds.FilterRelation.LessThan,
    '<=': ds.FilterRelation.LessThanOrEqual,
    '>': ds.FilterRelation.GreaterThan,
    '>=': ds.FilterRelation.GreaterThanOrEqual,
    '=': ds.FilterRelation.Equal,
  };

  final DatastoreDB _db;
  final ds.Transaction? _transaction;

  /// The kind name which the model database reports for [T].
  final String _kind;

  final Partition? _partition;
  final Key? _ancestorKey;

  final List<ds.Filter> _filters = [];
  final List<ds.Order> _orders = [];
  int? _offset;
  int? _limit;

  /// Creates a query for [T] models against [dbImpl].
  ///
  /// [partition], [ancestorKey] and [datastoreTransaction] are all optional
  /// and are stored as given.
  Query(DatastoreDB dbImpl,
      {Partition? partition,
      Key? ancestorKey,
      ds.Transaction? datastoreTransaction})
      : _db = dbImpl,
        _kind = dbImpl.modelDB.kindName(T),
        _partition = partition,
        _ancestorKey = ancestorKey,
        _transaction = datastoreTransaction;

  /// Adds a filter to this [Query].
  ///
  /// [filterString] has form "name OP" (separated by exactly one space) where
  /// 'name' is a fieldName of the model and OP is an operator. The following
  /// operators are supported:
  ///
  ///   * '<' (less than)
  ///   * '<=' (less than or equal)
  ///   * '>' (greater than)
  ///   * '>=' (greater than or equal)
  ///   * '=' (equal)
  ///
  /// [comparisonObject] is the object for comparison.
  ///
  /// Throws an [ArgumentError] if [filterString] is malformed or names an
  /// unknown field.
  void filter(String filterString, Object? comparisonObject) {
    final tokens = filterString.split(' ');
    final relation = tokens.length == 2 ? _relationMapping[tokens[1]] : null;
    if (relation == null) {
      throw ArgumentError("Invalid filter string '$filterString'.");
    }

    final fieldName = tokens[0];
    final propertyName = _convertToDatastoreName(fieldName);

    // This is for backwards compatibility: We allow [datastore.Key]s for now.
    // TODO: We should remove the condition in a major version update of
    // `package:gcloud`.
    var datastoreValue = comparisonObject;
    if (datastoreValue is! ds.Key) {
      datastoreValue = _db.modelDB.toDatastoreValue(
          _kind, fieldName, datastoreValue,
          forComparison: true);
    }
    _filters.add(ds.Filter(relation, propertyName, datastoreValue!));
  }

  /// Adds an order to this [Query].
  ///
  /// [orderString] has the form "-name" where 'name' is a fieldName of the
  /// model and the optional leading '-' requests descending instead of
  /// ascending order.
  void order(String orderString) {
    // TODO: validate [orderString] (e.g. is name valid)
    final descending = orderString.startsWith('-');
    final fieldName = descending ? orderString.substring(1) : orderString;
    _orders.add(ds.Order(
        descending
            ? ds.OrderDirection.Descending
            : ds.OrderDirection.Ascending,
        _convertToDatastoreName(fieldName)));
  }

  /// Sets the [offset] of this [Query].
  ///
  /// When running this query, [offset] results will be skipped.
  void offset(int offset) => _offset = offset;

  /// Sets the [limit] of this [Query].
  ///
  /// When running this query, a maximum of [limit] results will be returned.
  void limit(int limit) => _limit = limit;

  /// Execute this [Query] on the datastore.
  ///
  /// Outside of transactions this method might return stale data or may not
  /// return the newest updates performed on the datastore since updates
  /// will be reflected in the indices in an eventual consistent way.
  Stream<T> run() {
    final ancestor = _ancestorKey;
    final datastoreQuery = ds.Query(
        ancestorKey:
            ancestor == null ? null : _db.modelDB.toDatastoreKey(ancestor),
        kind: _kind,
        filters: _filters,
        orders: _orders,
        offset: _offset,
        limit: _limit);

    final modelPartition = _partition;
    final datastorePartition = modelPartition == null
        ? null
        : ds.Partition(modelPartition.namespace);
    final transaction = _transaction;

    // The transaction and partition arguments are only passed along when
    // they are actually set.
    return StreamFromPages<ds.Entity>((int pageSize) {
      if (transaction == null) {
        return datastorePartition == null
            ? _db.datastore.query(datastoreQuery)
            : _db.datastore
                .query(datastoreQuery, partition: datastorePartition);
      }
      return datastorePartition == null
          ? _db.datastore.query(datastoreQuery, transaction: transaction)
          : _db.datastore.query(datastoreQuery,
              transaction: transaction, partition: datastorePartition);
    }).stream.map<T>((e) => _db.modelDB.fromDatastoreEntity(e)!);
  }

  // TODO:
  // - add runPaged() returning Page<Model>
  // - add run*() method once we have EntityResult{Entity,Cursor} in low-level
  //   API.

  /// Translates the model field [name] into its datastore property name.
  ///
  /// Throws an [ArgumentError] if the model database knows no such field for
  /// this query's kind.
  String _convertToDatastoreName(String name) {
    final propertyName = _db.modelDB.fieldNameToPropertyName(_kind, name);
    if (propertyName != null) {
      return propertyName;
    }
    throw ArgumentError('Field $name is not available for kind $_kind');
  }
}

/// A model-level view of a [ds.Datastore].
///
/// Offers lookups, queries, commits and transactions in terms of [Model] and
/// [Key] objects, using a [ModelDB] to convert to and from datastore values.
class DatastoreDB {
  /// The low-level datastore all operations are forwarded to.
  final ds.Datastore datastore;
  final ModelDB _modelDB;
  final Partition _defaultPartition;

  /// Creates a database on top of [datastore].
  ///
  /// If [modelDB] is omitted a [ModelDBImpl] is created; if
  /// [defaultPartition] is omitted, `Partition(null)` is used.
  DatastoreDB(this.datastore, {ModelDB? modelDB, Partition? defaultPartition})
      : _modelDB = modelDB ?? ModelDBImpl(),
        _defaultPartition = defaultPartition ?? Partition(null);

  /// The [ModelDB] used to serialize/deserialize objects.
  ModelDB get modelDB => _modelDB;

  /// Gets the empty key using the default [Partition].
  ///
  /// Model keys with parent set to [emptyKey] will create their own entity
  /// groups.
  Key get emptyKey => defaultPartition.emptyKey;

  /// Gets the default [Partition].
  Partition get defaultPartition => _defaultPartition;

  /// Creates a new [Partition] with namespace [namespace].
  Partition newPartition(String namespace) => Partition(namespace);

  /// Begins a new transaction and runs [transactionHandler] with it.
  ///
  /// The handler is responsible for calling [Transaction.commit] or
  /// [Transaction.rollback]; neither happens automatically. The returned
  /// future completes with the handler's result.
  ///
  /// A transaction can touch only a limited number of entity groups. This limit
  /// is currently 5.
  // TODO: Add retries and/or auto commit/rollback.
  Future<T> withTransaction<T>(TransactionHandler<T> transactionHandler) {
    return datastore.beginTransaction(crossEntityGroup: true).then(
        (datastoreTransaction) =>
            transactionHandler(Transaction(this, datastoreTransaction)));
  }

  /// Build a query for [kind] models.
  ///
  /// Without an explicit [partition] the query runs in the partition of
  /// [ancestorKey], or in [defaultPartition] when no ancestor is given.
  ///
  /// Throws an [ArgumentError] if both are given and [partition] differs from
  /// the partition of [ancestorKey].
  Query<T> query<T extends Model>({Partition? partition, Key? ancestorKey}) {
    // TODO(#26): There is only one case where `partition` is not redundant
    // Namely if `ancestorKey == null` and `partition != null`. We could
    // say we get rid of `partition` and enforce `ancestorKey` to
    // be `Partition.emptyKey`?
    if (partition != null) {
      if (ancestorKey != null && partition != ancestorKey.partition) {
        throw ArgumentError(
            'Ancestor queries must have the same partition in the ancestor key '
            'as the partition where the query executes in.');
      }
      return Query<T>(this, partition: partition, ancestorKey: ancestorKey);
    }
    return Query<T>(this,
        partition:
            ancestorKey != null ? ancestorKey.partition : defaultPartition,
        ancestorKey: ancestorKey);
  }

  /// Looks up [keys] in the datastore and returns a list of [Model] objects.
  ///
  /// Any key that is not found in the datastore will have a corresponding
  /// value of null in the list of model objects that is returned.
  ///
  /// For transactions, please use [withTransaction] and call
  /// [Transaction.lookup] on the [Transaction] object given to the handler.
  ///
  /// See also:
  ///
  ///  * [lookupValue], which looks a single value up by its key, requiring a
  ///    successful lookup.
  Future<List<T?>> lookup<T extends Model>(List<Key> keys) =>
      _lookupHelper<T>(this, keys);

  /// Looks up a single [key] in the datastore, and returns the associated
  /// [Model] object.
  ///
  /// If [orElse] is specified, then it will be consulted to provide a default
  /// value for the model object in the event that [key] was not found in the
  /// datastore.
  ///
  /// If the [key] is not found in the datastore and [orElse] was not
  /// specified, then a [KeyNotFoundException] will be thrown.
  Future<T> lookupValue<T extends Model>(Key key,
      {T Function()? orElse}) async {
    final results = await lookup<T>([key]);
    assert(results.length == 1);
    final found = results.single;
    if (found != null) {
      return found;
    }
    if (orElse == null) {
      throw KeyNotFoundException(key);
    }
    return orElse();
  }

  /// Looks up a single [key] in the datastore, and returns the associated
  /// [Model] object.
  ///
  /// If the [key] is not found in the datastore, null will be returned.
  Future<T?> lookupOrNull<T extends Model>(Key key) async {
    final results = await lookup<T>([key]);
    assert(results.length == 1);
    return results.single;
  }

  /// Add [inserts] to the datastore and remove [deletes] from it.
  ///
  /// The order of inserts and deletes is not specified. When the commit is done
  /// direct lookups will see the effect but non-ancestor queries will see the
  /// change in an eventual consistent way.
  ///
  /// The inserts are done as upserts unless the provided model does not have an
  /// id, in which case an autoId will be generated.
  ///
  /// For transactions, please use [withTransaction] and the [Transaction]
  /// object given to its handler.
  Future commit({List<Model>? inserts, List<Key>? deletes}) =>
      _commitHelper(this, inserts: inserts, deletes: deletes);
}

/// Writes [inserts] to and removes [deletes] from the datastore of [db],
/// inside [datastoreTransaction] when one is given.
///
/// Models without a parent key get the empty key of the default partition as
/// their parent. Models without an id are sent as auto-id inserts; once the
/// commit succeeds, the keys reported by the datastore are written back to
/// those models (`parentKey` and `id`).
///
/// Conversion errors are thrown synchronously, because this function is
/// deliberately not `async`.
Future _commitHelper(DatastoreDB db,
    {List<Model>? inserts,
    List<Key>? deletes,
    ds.Transaction? datastoreTransaction}) {
  final keyedEntities = <ds.Entity>[];
  final autoIdEntities = <ds.Entity>[];
  // Kept index-aligned with `autoIdEntities` so that generated keys can be
  // assigned back to the right model afterwards.
  final autoIdModels = <Model>[];

  for (final model in inserts ?? const <Model>[]) {
    // If parent was not explicitly set, we assume this model will map to
    // its own entity group.
    model.parentKey ??= db.defaultPartition.emptyKey;
    if (model.id != null) {
      keyedEntities.add(db.modelDB.toDatastoreEntity(model));
    } else {
      autoIdModels.add(model);
      autoIdEntities.add(db.modelDB.toDatastoreEntity(model));
    }
  }

  final keysToDelete =
      deletes?.map(db.modelDB.toDatastoreKey).toList() ?? <ds.Key>[];

  // The transaction argument is only passed along when one is present.
  final Future<ds.CommitResult> pendingCommit = datastoreTransaction == null
      ? db.datastore.commit(
          inserts: keyedEntities,
          autoIdInserts: autoIdEntities,
          deletes: keysToDelete)
      : db.datastore.commit(
          inserts: keyedEntities,
          autoIdInserts: autoIdEntities,
          deletes: keysToDelete,
          transaction: datastoreTransaction);

  return pendingCommit.then((ds.CommitResult commitResult) {
    if (autoIdEntities.isNotEmpty) {
      final generatedKeys = commitResult.autoIdInsertKeys;
      for (var index = 0; index < generatedKeys.length; index++) {
        final modelKey = db.modelDB.fromDatastoreKey(generatedKeys[index]);
        final model = autoIdModels[index];
        model.parentKey = modelKey.parent;
        model.id = modelKey.id;
      }
    }
  });
}

/// Looks up [keys] in the datastore of [db], inside [datastoreTransaction]
/// when one is given, and converts the resulting entities to models.
///
/// The returned list has one entry per entity returned by the datastore; the
/// entry type is nullable because the datastore may return null entities.
Future<List<T?>> _lookupHelper<T extends Model>(DatastoreDB db, List<Key> keys,
    {ds.Transaction? datastoreTransaction}) {
  final modelDB = db.modelDB;
  final datastoreKeys = [for (final key in keys) modelDB.toDatastoreKey(key)];

  // The transaction argument is only passed along when one is present.
  final pendingLookup = datastoreTransaction == null
      ? db.datastore.lookup(datastoreKeys)
      : db.datastore.lookup(datastoreKeys, transaction: datastoreTransaction);

  return pendingLookup.then((List<ds.Entity?> entities) =>
      entities.map<T?>(modelDB.fromDatastoreEntity).toList());
}
