blob: 9a3c5f47399faa614b675b356089321b9678d0a4 [file] [log] [blame]
// Copyright 2016 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
import 'dart:async';
import 'dart:convert' show utf8;
import 'dart:io';
// TODO(dnfield): Move to replacement once we make it more reasonable to
// upgrade the agent.
// ignore: deprecated_member_use
import 'package:vm_service_client/vm_service_client.dart';
import 'agent.dart';
import 'utils.dart';
/// Timeout applied to a task that does not specify its own.
///
/// Keep in sync with `_kDefaultTaskTimeout` defined in https://github.com/flutter/flutter/blob/master/dev/devicelab/lib/framework/framework.dart
const Duration _kDefaultTaskTimeout = Duration(minutes: 15);

/// Additional time granted on top of the task timeout so the devicelab task
/// can finish or time out by itself before it is forcefully quit.
const Duration _kGracePeriod = Duration(minutes: 1);

/// Buffered log length, in characters, above which a chunk is sent
/// (roughly 10KB per chunk).
const int _kLogChunkSize = 10000;
/// The outcome of running one task.
///
/// Normally, including when the task itself fails, the outcome is decoded
/// with [TaskResult.parse] from the JSON that the task runner process returns
/// over the VM service. When the runner process is too broken to report
/// anything, build an unsuccessful outcome by hand with [TaskResult.failure].
class TaskResult {
  /// Whether the task succeeded.
  final bool succeeded;

  /// Task-specific JSON data.
  final Map<String, dynamic> data;

  /// Keys in [data] whose values are scores to be submitted to Golem.
  ///
  /// Golem tracks each benchmark under a name made of [Task.name] and the
  /// key, joined by a dot. For instance, a task named
  /// `"complex_layout__start_up"` with the score key
  /// `"engineEnterTimestampMicros"` is submitted as
  /// `"complex_layout__start_up.engineEnterTimestampMicros"`.
  ///
  /// Thanks to this convention very little configuration is needed to submit
  /// benchmark scores to Golem.
  final dynamic benchmarkScoreKeys;

  /// Explains the failure reason if [failed].
  final String reason;

  /// Decodes a task result from its [json] representation.
  ///
  /// A missing `benchmarkScoreKeys` entry is replaced by an empty list.
  TaskResult.parse(Map<String, dynamic> json)
      : succeeded = json['success'] as bool,
        data = json['data'] as Map<String, dynamic>,
        benchmarkScoreKeys = json['benchmarkScoreKeys'] ?? const <String>[],
        reason = json['reason'] as String;

  /// Creates an unsuccessful result carrying only the failure [reason].
  TaskResult.failure(this.reason)
      : succeeded = false,
        data = const <String, dynamic>{},
        benchmarkScoreKeys = const <String>[];

  /// Whether the task failed.
  bool get failed => !succeeded;

  /// Encodes this result using the same keys that [TaskResult.parse] reads.
  Map<String, dynamic> toJson() => <String, dynamic>{
        'success': succeeded,
        'data': data,
        'benchmarkScoreKeys': benchmarkScoreKeys,
        'reason': reason,
      };
}
/// Runs [task] in a separate Dart VM and collects the result using the VM
/// service protocol.
///
/// The task executable is expected at `bin/tasks/<task name>.dart` inside
/// the `dev/devicelab` directory of the configured Flutter checkout. While
/// the task runs, its stdout and stderr are logged locally and uploaded
/// through [agent] in chunks.
///
/// Returns the [TaskResult] parsed from the runner's response, or a
/// [TaskResult.failure] if a [TimeoutException] occurs while connecting to
/// the runner, waiting for the task to complete, or waiting for the runner
/// process to exit.
///
/// Throws a [String] if the task executable does not exist.
Future<TaskResult> runTask(Agent agent, CocoonTask task) async {
  String devicelabPath = '${config.flutterDirectory.path}/dev/devicelab';
  String taskExecutable = 'bin/tasks/${task.name}.dart';

  if (!file('$devicelabPath/$taskExecutable').existsSync())
    throw 'Executable Dart file not found: $taskExecutable';

  int vmServicePort = await _findAvailablePort();
  Process runner;
  await inDirectory(devicelabPath, () async {
    runner = await startProcess(
        dartBin,
        <String>[
          '--enable-vm-service=$vmServicePort',
          '--no-pause-isolates-on-exit',
          '--disable-service-auth-codes',
          taskExecutable,
          '--cloud-auth-token=${task.cloudAuthToken}',
        ],
        silent: true);
  });

  bool runnerFinished = false;

  // ignore: unawaited_futures
  runner.exitCode.then((_) {
    runnerFinished = true;
  });

  StringBuffer buffer = StringBuffer();

  // Buffers [message], logs it locally, and uploads the buffered text once it
  // grows past the chunk size or when [flush] is requested.
  Future<void> sendLog(String message, {bool flush = false}) async {
    buffer.write(toLogString(message));
    logger.info('[task runner] [${task.name}] $message');
    // Send a chunk at a time, or upon request.
    if (flush || buffer.length > _kLogChunkSize) {
      String chunk = buffer.toString();
      buffer = StringBuffer();
      await agent.uploadLogChunk(task.key, chunk);
    }
  }

  try {
    await sendLog('Agent ID: ${agent.agentId}', flush: true);
  } catch (_) {
    // The runner process is already running but the cleanup in the `finally`
    // block below has not been armed yet; do not leave the process behind
    // when this first upload fails.
    if (!runnerFinished) runner.kill(ProcessSignal.sigkill);
    rethrow;
  }

  var stdoutSub =
      runner.stdout.transform(utf8.decoder).listen((String message) async {
    await sendLog(message);
  });
  var stderrSub =
      runner.stderr.transform(utf8.decoder).listen((String message) async {
    await sendLog(message);
  });

  // Names the phase in progress so a timeout can report what it waited for.
  String waitingFor = 'connection';
  try {
    VMIsolateRef isolate = await _connectToRunnerIsolate(vmServicePort);
    waitingFor = 'task completion';
    Duration taskTimeout = task.timeoutInMinutes != 0
        ? Duration(minutes: task.timeoutInMinutes)
        : _kDefaultTaskTimeout;
    // The runner is told the task timeout; this side waits for the grace
    // period on top of it before giving up.
    Map<String, dynamic> taskResult = await isolate.invokeExtension(
        'ext.cocoonRunTask', <String, String>{
      'timeoutInMinutes': '${taskTimeout.inMinutes}'
    }).timeout(taskTimeout + _kGracePeriod) as Map<String, dynamic>;

    waitingFor = 'task process to exit';
    final Future<dynamic> whenProcessExits = Future.wait<void>([
      runner.exitCode,
      stdoutSub.asFuture(),
      stderrSub.asFuture(),
    ]);
    await whenProcessExits.timeout(const Duration(seconds: 1));
    return TaskResult.parse(taskResult);
  } on TimeoutException catch (timeout) {
    runner.kill(ProcessSignal.sigint);
    return TaskResult.failure(
        'Timeout waiting for $waitingFor: ${timeout.message}');
  } finally {
    try {
      await stdoutSub.cancel();
      await stderrSub.cancel();
      await sendLog('Task execution finished', flush: true);
    } finally {
      // Runs even when cancelling or the final log upload throws, so a failed
      // upload cannot leave processes behind.
      // Force-quit the task runner process.
      if (!runnerFinished) runner.kill(ProcessSignal.sigkill);
      // Force-quit dangling local processes (such as adb commands).
      await forceQuitRunningProcesses();
    }
  }
}
/// Connects to the VM service listening on local port [vmServicePort] and
/// returns the task runner's isolate once it answers `ready` to the
/// `ext.cocoonRunnerReady` extension call.
///
/// A failed attempt is retried after a 200 millisecond pause. Throws a
/// [TimeoutException] when an attempt fails more than 10 seconds after this
/// function was called.
Future<VMIsolateRef> _connectToRunnerIsolate(int vmServicePort) async {
  const Duration connectionTimeout = Duration(seconds: 10);
  const Duration pauseBetweenRetries = Duration(milliseconds: 200);

  String url = 'ws://localhost:$vmServicePort/ws';
  // A stopwatch is monotonic, so adjustments to the system clock cannot
  // shorten or extend the connection timeout.
  final Stopwatch elapsed = Stopwatch()..start();

  // TODO(yjbanov): due to lack of imagination at the moment the handshake with
  //                the task process is very rudimentary and requires this small
  //                delay to let the task process open up the VM service port.
  //                Otherwise we almost always hit the non-ready case first and
  //                wait a whole 1 second, which is annoying.
  await Future<void>.delayed(const Duration(milliseconds: 100));

  while (true) {
    try {
      // Make sure VM server is up by successfully opening and closing a socket.
      await (await WebSocket.connect(url)).close();

      // Look up the isolate.
      VMServiceClient client = VMServiceClient.connect(url);
      try {
        VM vm = await client.getVM();
        VMIsolateRef isolate = vm.isolates.single;
        String response =
            await isolate.invokeExtension('ext.cocoonRunnerReady') as String;
        if (response != 'ready') throw 'not ready yet';
        // On success the client stays open: the caller keeps using the
        // returned isolate.
        return isolate;
      } catch (_) {
        // This attempt is abandoned; close its client so that retries do not
        // accumulate open connections to the VM service.
        await client.close();
        rethrow;
      }
    } catch (error) {
      if (elapsed.elapsed > connectionTimeout) {
        throw TimeoutException(
          'Failed to connect to the task runner process',
          connectionTimeout,
        );
      }
      logger.info('VM service not ready yet: $error');
      logger.info('Will retry in $pauseBetweenRetries.');
      await Future<void>.delayed(pauseBetweenRetries);
    }
  }
}
/// Returns the lowest port in the range 20000-65535 that can currently be
/// bound on the IPv4 loopback interface.
///
/// Each candidate is probed by binding a [ServerSocket] to it and closing the
/// socket again, so the returned port is free at the time of the probe only;
/// another process may still claim it before the caller uses it.
///
/// Throws a [SocketException] if no port in the range can be bound.
Future<int> _findAvailablePort() async {
  const int firstPort = 20000;
  // Highest valid TCP port; probing beyond it can never succeed.
  const int lastPort = 65535;

  for (int port = firstPort; port <= lastPort; port++) {
    try {
      final ServerSocket socket =
          await ServerSocket.bind(InternetAddress.loopbackIPv4, port);
      await socket.close();
      return port;
    } on SocketException {
      // The port could not be bound (typically already in use); try the next.
    }
  }
  throw SocketException(
      'No available port found between $firstPort and $lastPort');
}