| // Copyright 2019 The Flutter Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| import 'dart:async'; |
| import 'dart:math'; |
| |
| import 'package:gcloud/datastore.dart' as gcloud_datastore; |
| import 'package:gcloud/db.dart'; |
| import 'package:github/github.dart'; |
| import 'package:meta/meta.dart'; |
| import 'package:grpc/grpc.dart'; |
| import 'package:retry/retry.dart'; |
| |
| import '../model/appengine/commit.dart'; |
| import '../model/appengine/github_build_status_update.dart'; |
| import '../model/appengine/github_gold_status_update.dart'; |
| import '../model/appengine/stage.dart'; |
| import '../model/appengine/task.dart'; |
| import '../model/appengine/time_series.dart'; |
| import '../model/appengine/time_series_value.dart'; |
| |
/// Default upper bound on the number of entity groups handled per transaction.
///
/// Per the docs in [DatastoreDB.withTransaction], a single transaction may
/// touch at most 5 entity groups; exceeding that makes the backing datastore
/// throw an error.
const int defaultMaxEntityGroups = 5;

/// Default number of time series values fetched by a single query.
///
/// This value is inherited from the old Go backend and is subject to change
/// if necessary.
const int defaultTimeSeriesValuesNumber = 1500;
| |
/// Signature of a factory that builds a [DatastoreService] on top of `db`.
typedef DatastoreServiceProvider = DatastoreService Function(DatastoreDB db);

/// Signature of a zero-argument callback that is executed with retries.
///
/// The return type is intentionally left dynamic; see
/// [runTransactionWithRetries] for how the callback is invoked.
typedef RetryHandler = dynamic Function();
| |
/// Runs [retryHandler] (typically a db transaction), retrying on failure.
///
/// When [retryOptions] is not supplied, at most 3 attempts are made and the
/// delay between attempts is capped at 10 seconds. Delays follow the
/// exponential backoff implemented by [RetryOptions] (200ms base delay factor
/// by default in `package:retry`).
///
/// Only [gcloud_datastore.TransactionAbortedError] and [GrpcError] trigger a
/// retry; any other exception is propagated immediately.
///
/// For context please read https://github.com/flutter/flutter/issues/54615.
Future<void> runTransactionWithRetries(RetryHandler retryHandler,
    {RetryOptions retryOptions}) {
  // Used only when the caller does not provide its own configuration.
  const RetryOptions fallbackOptions = RetryOptions(
    maxDelay: Duration(seconds: 10),
    maxAttempts: 3,
  );

  // Decides whether a failed attempt is worth repeating.
  bool isRetryable(Exception exception) {
    return exception is GrpcError ||
        exception is gcloud_datastore.TransactionAbortedError;
  }

  return (retryOptions ?? fallbackOptions).retry(
    retryHandler,
    retryIf: isRetryable,
  );
}
| |
/// Service class for interacting with App Engine cloud datastore.
///
/// This service exists to provide an API for common datastore queries made by
/// the Cocoon backend. Its insert and lookup helpers run inside datastore
/// transactions that are retried according to [retryOptions].
@immutable
class DatastoreService {
  /// Creates a new [DatastoreService].
  ///
  /// The [db] and [maxEntityGroups] arguments must not be null.
  ///
  /// If [retryOptions] is omitted, transactions are attempted at most 3 times
  /// with the delay between attempts capped at 10 seconds.
  const DatastoreService(this.db, this.maxEntityGroups,
      {RetryOptions retryOptions})
      // Each condition needs its own assert: the second positional argument
      // of `assert` is the failure message, not an additional condition.
      : assert(db != null),
        assert(maxEntityGroups != null),
        retryOptions = retryOptions ??
            const RetryOptions(
              maxDelay: Duration(seconds: 10),
              maxAttempts: 3,
            );

  /// Maximum number of entity groups to process at once.
  ///
  /// Used by [shard] as the size of each sublist.
  final int maxEntityGroups;

  /// The backing [DatastoreDB] object. Guaranteed to be non-null.
  final DatastoreDB db;

  /// Transaction retry configurations.
  final RetryOptions retryOptions;

  /// Creates and returns a [DatastoreService] using [db] and
  /// [defaultMaxEntityGroups].
  ///
  /// If [db] is null, the ambient [dbService] is used instead.
  static DatastoreService defaultProvider(DatastoreDB db) {
    return DatastoreService(db ?? dbService, defaultMaxEntityGroups);
  }

  /// Queries for recent commits on [branch].
  ///
  /// The [limit] argument specifies the maximum number of commits to retrieve.
  ///
  /// Only commits whose [Commit.timestamp] is strictly less than [timestamp]
  /// (milliseconds since epoch) are returned. A null [timestamp] defaults to
  /// the current time and a null [branch] defaults to `master`.
  ///
  /// The returned commits will be ordered by most recent [Commit.timestamp].
  Stream<Commit> queryRecentCommits(
      {int limit = 100, int timestamp, String branch}) {
    timestamp ??= DateTime.now().millisecondsSinceEpoch;
    branch ??= 'master';
    final Query<Commit> query = db.query<Commit>()
      ..limit(limit)
      ..filter('branch =', branch)
      ..order('-timestamp')
      ..filter('timestamp <', timestamp);
    return query.run();
  }

  /// Queries for recent commits without considering branches.
  ///
  /// Behaves like [queryRecentCommits] but applies no branch filter: at most
  /// [limit] commits with a [Commit.timestamp] strictly less than [timestamp]
  /// (defaulting to the current time) are returned, most recent first.
  Stream<Commit> queryRecentCommitsNoBranch({int limit = 100, int timestamp}) {
    timestamp ??= DateTime.now().millisecondsSinceEpoch;
    final Query<Commit> query = db.query<Commit>()
      ..limit(limit)
      ..order('-timestamp')
      ..filter('timestamp <', timestamp);
    return query.run();
  }

  /// Fetches the latest benchmark results of [timeSeries] on [branch].
  ///
  /// At most [limit] values created strictly before [timestamp] (milliseconds
  /// since epoch, defaulting to the current time) are returned, ordered by
  /// most recent creation time first.
  ///
  /// The [startFrom] argument is not implemented yet and is currently
  /// ignored; results always start from the latest available record.
  Stream<TimeSeriesValue> queryRecentTimeSeriesValues(TimeSeries timeSeries,
      {int limit = defaultTimeSeriesValuesNumber,
      String startFrom,
      String branch = 'master',
      int timestamp}) {
    timestamp ??= DateTime.now().millisecondsSinceEpoch;
    final Query<TimeSeriesValue> query =
        db.query<TimeSeriesValue>(ancestorKey: timeSeries.key)
          ..filter('branch =', branch)
          ..filter('createTimestamp <', timestamp)
          ..limit(limit)
          ..order('-createTimestamp');
    return query.run();
  }

  /// Queries for recent tasks that meet the specified criteria.
  ///
  /// Since each task belongs to a commit, this query implicitly includes a
  /// query of the most recent N commits (see [queryRecentCommits]). The
  /// [commitLimit] argument specifies how many commits to consider when
  /// retrieving the list of recent tasks, and [branch] selects the branch
  /// those commits are taken from.
  ///
  /// The [taskLimit] argument specifies how many tasks to retrieve for each
  /// commit that is considered.
  ///
  /// If [taskName] is specified, only tasks whose [Task.name] matches the
  /// specified value will be returned. By default, tasks will be returned
  /// regardless of their name.
  ///
  /// The returned tasks will be ordered by most recent [Commit.timestamp]
  /// first, then by most recent [Task.createTimestamp].
  Stream<FullTask> queryRecentTasks(
      {String taskName,
      int commitLimit = 20,
      int taskLimit = 20,
      String branch = 'master'}) async* {
    assert(commitLimit != null);
    assert(taskLimit != null);
    await for (Commit commit
        in queryRecentCommits(limit: commitLimit, branch: branch)) {
      // Tasks are children of their commit, hence the ancestor query.
      final Query<Task> query = db.query<Task>(ancestorKey: commit.key)
        ..limit(taskLimit)
        ..order('-createTimestamp');
      if (taskName != null) {
        query.filter('name =', taskName);
      }
      yield* query.run().map<FullTask>((Task task) => FullTask(task, commit));
    }
  }

  /// Finds all tasks owned by the specified [commit] and partitions them into
  /// stages.
  ///
  /// The returned list of stages will be ordered by the natural ordering of
  /// [Stage].
  Future<List<Stage>> queryTasksGroupedByStage(Commit commit) async {
    final Query<Task> query = db.query<Task>(ancestorKey: commit.key)
      ..order('-stageName');
    final Map<String, StageBuilder> stages = <String, StageBuilder>{};
    await for (Task task in query.run()) {
      // Lazily create one builder per stage name, then append the task.
      final StageBuilder builder = stages.putIfAbsent(
        task.stageName,
        () => StageBuilder()
          ..commit = commit
          ..name = task.stageName,
      );
      builder.tasks.add(task);
    }
    final List<Stage> result = stages.values
        .map<Stage>((StageBuilder stage) => stage.build())
        .toList();
    return result..sort();
  }

  /// Queries the build status update stored for [pr] of [slug] at the pull
  /// request's current head sha.
  ///
  /// If nothing has been stored yet, a fresh [GithubBuildStatusUpdate] with a
  /// null status and zero updates is returned (it is not written to the
  /// datastore here).
  ///
  /// Throws a [StateError] if more than one matching entry exists.
  Future<GithubBuildStatusUpdate> queryLastStatusUpdate(
    RepositorySlug slug,
    PullRequest pr,
  ) async {
    final Query<GithubBuildStatusUpdate> query = db
        .query<GithubBuildStatusUpdate>()
      ..filter('repository =', slug.fullName)
      ..filter('pr =', pr.number)
      ..filter('head =', pr.head.sha);
    final List<GithubBuildStatusUpdate> previousStatusUpdates =
        await query.run().toList();

    if (previousStatusUpdates.isEmpty) {
      return GithubBuildStatusUpdate(
        repository: slug.fullName,
        pr: pr.number,
        head: pr.head.sha,
        status: null,
        updates: 0,
      );
    }
    if (previousStatusUpdates.length > 1) {
      throw StateError(
          'GithubBuildStatusUpdate should have no more than one entries on '
          'repository ${slug.fullName}, pr ${pr.number}, head ${pr.head.sha}');
    }
    return previousStatusUpdates.single;
  }

  /// Queries the gold status update stored for [pr] of [slug].
  ///
  /// Unlike [queryLastStatusUpdate], the head sha is not part of the filter.
  ///
  /// If nothing has been stored yet, a fresh [GithubGoldStatusUpdate] with
  /// empty head, status and description and zero updates is returned (it is
  /// not written to the datastore here).
  ///
  /// Throws a [StateError] if more than one matching entry exists.
  Future<GithubGoldStatusUpdate> queryLastGoldUpdate(
    RepositorySlug slug,
    PullRequest pr,
  ) async {
    final Query<GithubGoldStatusUpdate> query = db
        .query<GithubGoldStatusUpdate>()
      ..filter('repository =', slug.fullName)
      ..filter('pr =', pr.number);
    final List<GithubGoldStatusUpdate> previousStatusUpdates =
        await query.run().toList();

    if (previousStatusUpdates.isEmpty) {
      return GithubGoldStatusUpdate(
        pr: pr.number,
        head: '',
        status: '',
        updates: 0,
        description: '',
        repository: slug.fullName,
      );
    }
    if (previousStatusUpdates.length > 1) {
      throw StateError(
          'GithubGoldStatusUpdate should have no more than one entry on '
          'repository ${slug.fullName}, pr ${pr.number}.');
    }
    return previousStatusUpdates.single;
  }

  /// Shards [rows] into several sublists of size [maxEntityGroups].
  ///
  /// The last sublist holds the remainder and may be shorter. An empty [rows]
  /// yields an empty list of shards.
  Future<List<List<Model>>> shard(List<Model> rows) async {
    final List<List<Model>> shards = <List<Model>>[];
    for (int i = 0; i < rows.length; i += maxEntityGroups) {
      // Clamp the end index so the final shard stops at the end of [rows].
      shards.add(rows.sublist(i, min<int>(i + maxEntityGroups, rows.length)));
    }
    return shards;
  }

  /// Inserts [rows] into datastore sharding the inserts if needed.
  ///
  /// Each shard (see [shard]) is committed in its own transaction, run with
  /// retries per [retryOptions]. Shards are processed sequentially, so shards
  /// committed before a failing one stay committed.
  Future<void> insert(List<Model> rows) async {
    final List<List<Model>> shards = await shard(rows);
    for (List<Model> shard in shards) {
      await runTransactionWithRetries(() async {
        await db.withTransaction<void>((Transaction transaction) async {
          transaction.queueMutations(inserts: shard);
          await transaction.commit();
        });
      }, retryOptions: retryOptions);
    }
  }

  /// Looks up registers by [keys].
  ///
  /// The lookup runs inside a transaction that is retried per [retryOptions].
  Future<List<T>> lookupByKey<T extends Model>(List<Key> keys) async {
    List<T> results = <T>[];
    await runTransactionWithRetries(() async {
      await db.withTransaction<void>((Transaction transaction) async {
        results = await transaction.lookup<T>(keys);
        // The datastore does not finish transactions automatically; commit
        // so this read-only transaction is not left open.
        await transaction.commit();
      });
    }, retryOptions: retryOptions);
    return results;
  }

  /// Looks up registers by value using a single [key].
  ///
  /// The [orElse] callback is forwarded to [DatastoreDB.lookupValue]. The
  /// lookup is retried per [retryOptions].
  Future<T> lookupByValue<T extends Model>(Key key,
      {T Function() orElse}) async {
    T result;
    await runTransactionWithRetries(() async {
      await db.withTransaction<void>((Transaction transaction) async {
        // NOTE(review): the read goes through `db` rather than `transaction`,
        // so it is presumably not part of the transaction - confirm whether
        // that is intended.
        result = await db.lookupValue<T>(key, orElse: orElse);
        // The datastore does not finish transactions automatically; commit
        // so the transaction is not left open.
        await transaction.commit();
      });
    }, retryOptions: retryOptions);
    return result;
  }

  /// Runs a function inside a transaction providing a [Transaction] parameter.
  ///
  /// The whole [handler] invocation is retried per [retryOptions], and the
  /// value from its last run is returned. This method neither commits nor
  /// rolls back the transaction itself; [handler] is expected to do so.
  Future<T> withTransaction<T>(Future<T> Function(Transaction) handler) async {
    T result;
    await runTransactionWithRetries(() async {
      await db.withTransaction<void>((Transaction transaction) async {
        result = await handler(transaction);
      });
    }, retryOptions: retryOptions);
    return result;
  }
}