blob: 200e0fb8294badb0a640d899fd7b16c9f07708f0 [file] [log] [blame]
// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
import 'dart:async';
import 'dart:io';
import 'package:process/process.dart';
import 'common/logging.dart';
import 'common/network.dart';
import 'dart/dart_vm.dart';
import 'runners/ssh_command_runner.dart';
// Textual forms of the loopback addresses, used when building local Dart VM
// URIs and SSH port forwarding specifications.
final String _ipv4Loopback = InternetAddress.loopbackIPv4.address;
final String _ipv6Loopback = InternetAddress.loopbackIPv6.address;
// Runs the `ssh` subprocesses that set up and cancel port forwarding.
const ProcessManager _processManager = LocalProcessManager();
// Default overall time allowed for finding a matching Isolate.
const Duration _kIsolateFindTimeout = Duration(minutes: 1);
// Default time allowed for connecting to a single Dart VM.
const Duration _kDartVmConnectionTimeout = Duration(seconds: 9);
// Delay between two successive polls for Dart VM changes.
const Duration _kVmPollInterval = Duration(milliseconds: 1500);
// Logger shared by everything in this library.
final Logger _log = Logger('FuchsiaRemoteConnection');
/// Signature of a function that forwards a port on the local machine to a
/// port on a remote device.
///
/// Takes the remote `address` and the target device's `remotePort`, plus an
/// optional `interface` and `configFile`. The config file is used primarily
/// for the default SSH port forwarding configuration.
///
/// Completes with the [PortForwarder] that represents the forwarding; call
/// [PortForwarder.stop] on it to shut the forwarding down.
typedef PortForwardingFunction = Future<PortForwarder> Function(
String address,
int remotePort, [
String? interface,
String? configFile,
]);
/// The function used to forward the local machine's ports to a remote Fuchsia
/// device.
///
/// Can be overwritten in the event that a different method is required (use
/// [restoreFuchsiaPortForwardingFunction] to undo that).
/// Defaults to using SSH port forwarding.
PortForwardingFunction fuchsiaPortForwardingFunction = _SshPortForwarder.start;
/// Resets [fuchsiaPortForwardingFunction] to the default SSH port forwarding
/// implementation.
///
/// Forwarders that were already created by a replacement function are not
/// affected; only subsequent forwarding requests use the default again.
void restoreFuchsiaPortForwardingFunction() {
fuchsiaPortForwardingFunction = _SshPortForwarder.start;
}
/// The error thrown when an operation on a [FuchsiaRemoteConnection] cannot
/// be carried out.
class FuchsiaRemoteConnectionError extends Error {
  /// Creates an error whose cause is described by `message`.
  FuchsiaRemoteConnectionError(this.message);

  /// A description of what went wrong.
  final String message;

  /// Renders the error as the type name followed by [message].
  @override
  String toString() => '$FuchsiaRemoteConnectionError: $message';
}
/// The kinds of state change a Dart VM can report through a [DartVmEvent].
enum DartVmEventType {
/// The Dart VM has started.
started,
/// The Dart VM has stopped.
///
/// This can mean either the host machine cannot be connected to, the VM
/// service has shut down cleanly, or the VM service has crashed.
stopped,
}
/// Describes a change in the state of a Dart VM.
///
/// Carries the kind of change ([eventType]: started or stopped), the service
/// port of the VM, and a [uri] for connecting to it.
class DartVmEvent {
  DartVmEvent._({
    required this.eventType,
    required this.servicePort,
    required this.uri,
  });

  /// Whether this instance of the Dart VM has started or has stopped.
  final DartVmEventType eventType;

  /// The service port that the Dart VM is/was running on.
  final int servicePort;

  /// The URL used to connect to the Dart VM.
  final Uri uri;
}
/// Manages a remote connection to a Fuchsia Device.
///
/// Provides affordances to observe and connect to Flutter views, isolates, and
/// perform actions on the Fuchsia device's various VM services.
///
/// This class can be connected to several instances of the Fuchsia device's
/// Dart VM at any given time.
class FuchsiaRemoteConnection {
// Creates an instance with polling disabled; the flag is switched on at the
// end of [_forwardOpenPortsToDeviceServicePorts].
FuchsiaRemoteConnection._(this._useIpV6, this._sshCommandRunner)
: _pollDartVms = false;
// Whether the loop that polls for Dart VM changes should keep running.
bool _pollDartVms;
// Forwarders shut down and cleared by [stop]. NOTE(review): nothing in the
// visible code ever adds to this list — confirm whether it is still needed.
final List<PortForwarder> _forwardedVmServicePorts = <PortForwarder>[];
// Runs commands on the target device and supplies the address, interface, and
// SSH config path used when forwarding ports.
final SshCommandRunner _sshCommandRunner;
// Selects the IPv6 loopback (instead of IPv4) for Dart VM URIs when a
// forwarder does not report an address of its own.
final bool _useIpV6;
/// A mapping of Dart VM ports (as seen on the target machine), to
/// [PortForwarder] instances mapping from the local machine to the target
/// machine.
final Map<int, PortForwarder> _dartVmPortMap = <int, PortForwarder>{};
/// Tracks stale ports so as not to reconnect while polling.
final Set<int> _stalePorts = <int>{};
/// A broadcast stream that emits events relating to Dart VM's as they update.
Stream<DartVmEvent> get onDartVmEvent => _onDartVmEvent;
// Assigned in [connectWithSshCommandRunner] before the instance is returned.
late Stream<DartVmEvent> _onDartVmEvent;
// Source of the events exposed through [onDartVmEvent].
final StreamController<DartVmEvent> _dartVmEventController =
StreamController<DartVmEvent>();
/// VM service cache to avoid repeating handshakes across function
/// calls. Keys a URI to a DartVm connection instance.
///
/// A key mapped to null is treated as cached by [_getDartVm], which then
/// returns null without attempting a new connection.
final Map<Uri, DartVm?> _dartVmCache = <Uri, DartVm?>{};
/// Connects to a Fuchsia device through the supplied [SshCommandRunner].
///
/// Behaves like [FuchsiaRemoteConnection.connect], except that the caller
/// provides the `commandRunner` rather than having one built from an address.
static Future<FuchsiaRemoteConnection> connectWithSshCommandRunner(SshCommandRunner commandRunner) async {
  final bool useIpV6 = isIpV6Address(commandRunner.address);
  final FuchsiaRemoteConnection connection =
      FuchsiaRemoteConnection._(useIpV6, commandRunner);
  await connection._forwardOpenPortsToDeviceServicePorts();

  final StreamController<DartVmEvent> controller =
      connection._dartVmEventController;
  // Polling starts when the event stream gets its first listener, and the
  // controller is closed once polling has been switched off.
  controller.onListen = () async {
    while (connection._pollDartVms) {
      await connection._pollVms();
      await Future<void>.delayed(_kVmPollInterval);
    }
    controller.close();
  };
  connection._onDartVmEvent = controller.stream.asBroadcastStream();
  return connection;
}
/// Opens a connection to a Fuchsia device.
///
/// Accepts the `address` of a Fuchsia device, and optionally a
/// `sshConfigPath` naming the ssh_config to use for port forwarding.
///
/// When `address` is omitted, the environment variable `FUCHSIA_DEVICE_URL`
/// is used instead; when `sshConfigPath` is omitted, `FUCHSIA_SSH_CONFIG` is
/// used if it is set.
///
/// If `address` is IPv6 link local (usually starts with `fe80::`), then
/// `interface` will probably need to be set in order to connect successfully
/// (that being the outgoing interface of your machine, not the interface on
/// the target machine). The interface may also be embedded in the address,
/// like the following:
///
/// ```
/// fe80::1%eth0
/// ```
///
/// Whenever the address contains `%`, the text after it replaces the
/// `interface` argument.
///
/// Throws a [FuchsiaRemoteConnectionError] if no address is supplied and
/// `FUCHSIA_DEVICE_URL` is not set. A malformed address is expected to be
/// rejected with an [ArgumentError] by the code this delegates to (not
/// visible here — confirm).
///
/// The returned [FuchsiaRemoteConnection] keeps its Dart VM connections open
/// until [stop] is called.
static Future<FuchsiaRemoteConnection> connect([
  String? address,
  String interface = '',
  String? sshConfigPath,
]) async {
  address ??= Platform.environment['FUCHSIA_DEVICE_URL'];
  sshConfigPath ??= Platform.environment['FUCHSIA_SSH_CONFIG'];
  if (address == null) {
    throw FuchsiaRemoteConnectionError(
        r'No address supplied, and $FUCHSIA_DEVICE_URL not found.');
  }

  // An address of the form `host%iface` carries its own interface.
  const String interfaceDelimiter = '%';
  if (address.contains(interfaceDelimiter)) {
    final List<String> parts = address.split(interfaceDelimiter);
    address = parts[0];
    interface = parts[1];
  }

  final SshCommandRunner commandRunner = SshCommandRunner(
    address: address,
    interface: interface,
    sshConfigPath: sshConfigPath,
  );
  return FuchsiaRemoteConnection.connectWithSshCommandRunner(commandRunner);
}
/// Closes all open connections.
///
/// Any objects that this class returns (including any child objects from
/// those objects) will subsequently have its connection closed as well, so
/// behavior for them will be undefined.
///
/// Also clears all tracked forwarders and cached VM connections, and switches
/// off polling for Dart VM changes.
Future<void> stop() async {
  // Work on a snapshot: every `await` below yields to other code (such as the
  // polling loop) that may add or remove tracked forwarders, and mutating a
  // collection while it is being iterated throws a
  // [ConcurrentModificationError].
  final List<PortForwarder> forwarders = <PortForwarder>[
    ..._forwardedVmServicePorts,
    ..._dartVmPortMap.values,
  ];
  for (final PortForwarder pf in forwarders) {
    // Closes VM service first to ensure that the connection is closed cleanly
    // on the target before shutting down the forwarding itself.
    final Uri uri = _getDartVmUri(pf);
    final DartVm? vmService = _dartVmCache[uri];
    // Keep the key (mapped to null) while shutting down, so that a concurrent
    // [_getDartVm] call returns null instead of reconnecting.
    _dartVmCache[uri] = null;
    await vmService?.stop();
    await pf.stop();
  }
  _dartVmCache.clear();
  _forwardedVmServicePorts.clear();
  _dartVmPortMap.clear();
  _pollDartVms = false;
}
/// Helper method for [getMainIsolatesByPattern].
///
/// Called when either there are no Isolates that exist that match
/// `pattern`, or there are not yet any active Dart VM's on the system
/// (possible when the Isolate we're attempting to connect to is in the only
/// instance of the Dart VM and its service port has not yet opened).
///
/// Listens to [onDartVmEvent] and completes with the matching Isolates of the
/// first newly started Dart VM that has any. Each connection attempt to a new
/// VM is limited by `vmConnectionTimeout`. Throws a [TimeoutException] if no
/// match is found within `timeout`.
Future<List<IsolateRef>> _waitForMainIsolatesByPattern([
  Pattern? pattern,
  Duration timeout = _kIsolateFindTimeout,
  Duration vmConnectionTimeout = _kDartVmConnectionTimeout,
]) async {
  final Completer<List<IsolateRef>> completer = Completer<List<IsolateRef>>();
  final StreamSubscription<DartVmEvent> subscription = _onDartVmEvent.listen(
    (DartVmEvent event) async {
      if (event.eventType != DartVmEventType.started) {
        return;
      }
      _log.fine('New VM found on port: ${event.servicePort}. Searching '
          'for Isolate: $pattern');
      final DartVm? vmService =
          await _getDartVm(event.uri, timeout: vmConnectionTimeout);
      // If the VM service is null, set the result to the empty list.
      // NOTE(review): `pattern!` fails inside this listener when no pattern
      // was supplied; every caller in this file passes one.
      final List<IsolateRef> result =
          await vmService?.getMainIsolatesByPattern(pattern!) ??
              <IsolateRef>[];
      if (result.isEmpty) {
        return;
      }
      if (!completer.isCompleted) {
        completer.complete(result);
      } else {
        _log.warning('Found more than one Dart VM containing Isolates '
            'that match the pattern "$pattern".');
      }
    },
    onDone: () {
      if (!completer.isCompleted) {
        _log.warning('Terminating isolate search for "$pattern"'
            ' before timeout reached.');
      }
    },
  );
  try {
    return await completer.future.timeout(timeout);
  } finally {
    // Stop listening once a result was found or the search timed out, so
    // that finished searches do not keep probing every VM that starts later.
    await subscription.cancel();
  }
}
/// Returns all Isolates running `main()` as matched by the [Pattern].
///
/// If there are no live Dart VM's or the Isolate cannot be found, waits until
/// either `timeout` is reached, or a Dart VM starts up with a name that
/// matches `pattern`.
///
/// `vmConnectionTimeout` limits each individual attempt to connect to a Dart
/// VM; VMs that cannot be connected to are skipped.
Future<List<IsolateRef>> getMainIsolatesByPattern(
  Pattern pattern, {
  Duration timeout = _kIsolateFindTimeout,
  Duration vmConnectionTimeout = _kDartVmConnectionTimeout,
}) async {
  // If for some reason there are no Dart VM's that are alive, wait for one to
  // start with the Isolate in question.
  if (_dartVmPortMap.isEmpty) {
    _log.fine('No live Dart VMs found. Awaiting new VM startup');
    return _waitForMainIsolatesByPattern(
        pattern, timeout, vmConnectionTimeout);
  }
  // Accumulate a list of eventual IsolateRef lists so that they can be loaded
  // simultaneously via Future.wait.
  final List<Future<List<IsolateRef>>> isolates =
      <Future<List<IsolateRef>>>[];
  // Iterate over a snapshot: the `await` in the loop yields to the polling
  // code, which may add or remove entries of the map in the meantime, and
  // iterating a map that is being modified throws a
  // [ConcurrentModificationError].
  final List<PortForwarder> forwarders = _dartVmPortMap.values.toList();
  for (final PortForwarder fp in forwarders) {
    final DartVm? vmService =
        await _getDartVm(_getDartVmUri(fp), timeout: vmConnectionTimeout);
    if (vmService == null) {
      continue;
    }
    isolates.add(vmService.getMainIsolatesByPattern(pattern));
  }
  final List<List<IsolateRef>> listOfLists =
      await Future.wait<List<IsolateRef>>(isolates).timeout(timeout);
  // Flattens the per-VM lists into one list.
  final List<IsolateRef> result = <IsolateRef>[
    for (final List<IsolateRef> list in listOfLists) ...list,
  ];
  // If no VM instance anywhere has this, it's possible it hasn't spun up
  // anywhere.
  //
  // For the time being one Flutter Isolate runs at a time in each VM, so for
  // now this will wait until the timer runs out or a new Dart VM starts that
  // contains the Isolate in question.
  //
  // TODO(awdavies): Set this up to handle multiple Isolates per Dart VM.
  if (result.isEmpty) {
    _log.fine('No instance of the Isolate found. Awaiting new VM startup');
    return _waitForMainIsolatesByPattern(
        pattern, timeout, vmConnectionTimeout);
  }
  return result;
}
/// Collects the [FlutterView] objects of every connected Dart VM.
///
/// Queries all Dart VM connections managed by this class and returns their
/// views as a single list.
Future<List<FlutterView>> getFlutterViews() async {
  if (_dartVmPortMap.isEmpty) {
    return <FlutterView>[];
  }
  final List<List<FlutterView>> viewsPerVm =
      await _invokeForAllVms<List<FlutterView>>(
    (DartVm vmService) async => vmService.getAllFlutterViews(),
  );
  // Flatten the per-VM lists and hand back a read-only copy.
  return List<FlutterView>.unmodifiable(
    viewsPerVm.expand<FlutterView>((List<FlutterView> views) => views),
  );
}
// Calls `vmFunction` on all Dart VM's, returning a list of results.
//
// A side effect of this function is that internally tracked port forwarding
// will be updated in the event that ports are found to be broken/stale: they
// will be shut down and removed from tracking. Unless `queueEvents` is false,
// a stopped event is queued for every forwarder that is shut down.
Future<List<E>> _invokeForAllVms<E>(
  Future<E> Function(DartVm vmService) vmFunction, [
  bool queueEvents = true,
]) async {
  final List<E> result = <E>[];

  // Stops a forwarder whose VM can no longer be reached and marks its port
  // as stale.
  Future<void> shutDownPortForwarder(PortForwarder pf) async {
    await pf.stop();
    _stalePorts.add(pf.remotePort);
    // Adding to a closed controller throws, so skip the event once the
    // controller has been closed.
    if (queueEvents && !_dartVmEventController.isClosed) {
      _dartVmEventController.add(DartVmEvent._(
        eventType: DartVmEventType.stopped,
        servicePort: pf.remotePort,
        uri: _getDartVmUri(pf),
      ));
    }
  }

  // Iterate over a snapshot: the `await`s in the loop yield to other code
  // that may add or remove entries of the map, and iterating a map that is
  // being modified throws a [ConcurrentModificationError].
  final List<PortForwarder> forwarders = _dartVmPortMap.values.toList();
  for (final PortForwarder pf in forwarders) {
    final DartVm? service = await _getDartVm(_getDartVmUri(pf));
    if (service == null) {
      await shutDownPortForwarder(pf);
    } else {
      result.add(await vmFunction(service));
    }
  }
  _stalePorts.forEach(_dartVmPortMap.remove);
  return result;
}
// Builds the HTTP URI at which the Dart VM behind `pf` can be reached.
//
// Uses the address reported by the forwarder, or the loopback address
// matching [_useIpV6] when it reports none. IPv6 addresses are wrapped in
// square brackets.
Uri _getDartVmUri(PortForwarder pf) {
  final String? openAddress = pf.openPortAddress;
  final String host;
  if (openAddress == null) {
    host = _useIpV6 ? '[$_ipv6Loopback]' : _ipv4Loopback;
  } else {
    host = isIpV6Address(openAddress) ? '[$openAddress]' : openAddress;
  }
  return Uri.http('$host:${pf.port}', '/');
}
/// Returns the connection to the Dart VM at `uri`, connecting if necessary.
///
/// An entry already present in the cache is returned as is, even when it is
/// null. Otherwise a connection is attempted within `timeout` and cached on
/// success. Returns null if the attempt ends in an [HttpException] or a
/// [TimeoutException].
Future<DartVm?> _getDartVm(
  Uri uri, {
  Duration timeout = _kDartVmConnectionTimeout,
}) async {
  if (_dartVmCache.containsKey(uri)) {
    return _dartVmCache[uri];
  }
  // An HttpException means that there is no instance of the Dart VM to
  // communicate with. A TimeoutException is raised when the Dart VM instance
  // is shut down in the middle of communicating.
  try {
    final DartVm connected = await DartVm.connect(uri, timeout: timeout);
    _dartVmCache[uri] = connected;
    return connected;
  } on HttpException {
    _log.warning('HTTP Exception encountered connecting to new VM');
    return null;
  } on TimeoutException {
    _log.warning('TimeoutException encountered connecting to new VM');
    return null;
  }
}
/// Looks for changes in the set of Dart VM instances on the device.
///
/// Stale connections are cleared out first. Every service port that is
/// neither stale nor already tracked is then forwarded, and a started event
/// is queued for it.
Future<void> _pollVms() async {
  await _checkPorts();
  for (final int servicePort in await getDeviceServicePorts()) {
    final bool alreadyHandled = _stalePorts.contains(servicePort) ||
        _dartVmPortMap.containsKey(servicePort);
    if (alreadyHandled) {
      continue;
    }
    final PortForwarder forwarder = await fuchsiaPortForwardingFunction(
      _sshCommandRunner.address,
      servicePort,
      _sshCommandRunner.interface,
      _sshCommandRunner.sshConfigPath,
    );
    _dartVmPortMap[servicePort] = forwarder;
    _dartVmEventController.add(DartVmEvent._(
      eventType: DartVmEventType.started,
      servicePort: servicePort,
      uri: _getDartVmUri(forwarder),
    ));
  }
}
/// Pings every Dart VM instance as a heartbeat.
///
/// The results are ignored; the call exists for the side effect of
/// [_invokeForAllVms], which drops ports whose VM can no longer be reached.
/// `queueEvents` is passed through to it.
Future<void> _checkPorts([ bool queueEvents = true ]) async {
  Future<void> heartbeat(DartVm vmService) async {
    await vmService.ping();
  }

  await _invokeForAllVms<void>(heartbeat, queueEvents);
}
/// Sets up port forwarding for every Dart VM service port on the device.
///
/// All existing forwarded ports and connections are reset by way of [stop]
/// first. Once the new forwarders are tracked and checked, polling for Dart
/// VM changes is enabled.
Future<void> _forwardOpenPortsToDeviceServicePorts() async {
  await stop();
  final List<int> servicePorts = await getDeviceServicePorts();
  // Start all forwarding requests before waiting on any of them.
  final List<Future<PortForwarder>> pending = <Future<PortForwarder>>[
    for (final int deviceServicePort in servicePorts)
      fuchsiaPortForwardingFunction(
        _sshCommandRunner.address,
        deviceServicePort,
        _sshCommandRunner.interface,
        _sshCommandRunner.sshConfigPath,
      ),
  ];
  final List<PortForwarder> forwarders =
      await Future.wait<PortForwarder>(pending);
  for (final PortForwarder pf in forwarders) {
    // TODO(awdavies): Handle duplicates.
    _dartVmPortMap[pf.remotePort] = pf;
  }
  // Don't queue events, since this is the initial forwarding.
  await _checkPorts(false);
  _pollDartVms = true;
}
/// Lists the open Dart VM service ports on the remote Fuchsia device.
///
/// The ports are discovered over SSH: every `vmservice-port` entry found
/// under `/hub` is listed, and each listed name that parses as an integer is
/// taken as a port. An empty result means that no Dart VM instances could be
/// found. Errors raised while running the remote commands are not caught
/// here.
Future<List<int>> getDeviceServicePorts() async {
  final List<String> portPaths =
      await _sshCommandRunner.run('/bin/find /hub -name vmservice-port');
  final List<int> ports = <int>[];
  for (final String path in portPaths) {
    if (path.isEmpty) {
      continue;
    }
    final List<String> lsOutput =
        await _sshCommandRunner.run('/bin/ls $path');
    ports.addAll(
      lsOutput
          .where((String line) => line.isNotEmpty)
          .map<int?>((String line) => int.tryParse(line))
          .whereType<int>(),
    );
  }
  return ports;
}
}
/// Defines an interface for port forwarding.
///
/// When a port forwarder is initialized, it is intended to save a port through
/// which a connection is persisted along the lifetime of this object.
///
/// To shut down a port forwarder you must call the [stop] function.
abstract class PortForwarder {
/// The port on the local side of the forwarding; this is the port used when
/// building the URI for connecting to the forwarded service.
int get port;
/// The address on which the open port is accessible. Defaults to null to
/// indicate local loopback.
String? get openPortAddress => null;
/// The destination port on the other end of the port forwarding tunnel.
int get remotePort;
/// Shuts down and cleans up port forwarding.
Future<void> stop();
}
/// Instances of this class represent a running SSH tunnel.
///
/// The SSH tunnel is from the host to a VM service running on a Fuchsia device.
class _SshPortForwarder implements PortForwarder {
// Wraps forwarding that has already been set up; use [start] to create one.
_SshPortForwarder._(
this._remoteAddress,
this._remotePort,
this._localSocket,
this._interface,
this._sshConfigPath,
this._ipV6,
);
// Address of the device at the other end of the tunnel.
final String _remoteAddress;
// Port on the device that the tunnel leads to.
final int _remotePort;
// Socket bound to the local port that is forwarded; closed by [stop].
final ServerSocket _localSocket;
// Optional ssh_config file, passed to `ssh` with `-F`.
final String? _sshConfigPath;
// Optional interface, appended to an IPv6 address as `%interface`.
final String? _interface;
// Whether [_remoteAddress] is an IPv6 address.
final bool _ipV6;
@override
int get port => _localSocket.port;
// Reports the loopback address of the same IP family as the remote address.
@override
String get openPortAddress => _ipV6 ? _ipv6Loopback : _ipv4Loopback;
@override
int get remotePort => _remotePort;
/// Starts SSH forwarding through a subprocess, and returns an instance of
/// [_SshPortForwarder].
///
/// Forwards a free local port to `remotePort` on the device at `address`.
/// `interface`, when given and non-empty, is appended to an IPv6 `address`
/// as `%interface`. `sshConfigPath`, when given, is passed to `ssh` with
/// `-F`.
///
/// Throws a [StateError] if no local port could be reserved or if the `ssh`
/// command exits with a non-zero exit code. The reserved local socket is
/// closed again whenever setting up the forwarding fails.
static Future<_SshPortForwarder> start(
  String address,
  int remotePort, [
  String? interface,
  String? sshConfigPath,
]) async {
  final bool isIpV6 = isIpV6Address(address);
  final ServerSocket? localSocket = await _createLocalSocket();
  if (localSocket == null || localSocket.port == 0) {
    _log.warning('_SshPortForwarder failed to find a local port for '
        '$address:$remotePort');
    // A socket without a usable port is of no use; release it.
    await localSocket?.close();
    throw StateError('Unable to create a socket or no available ports.');
  }
  // TODO(awdavies): The square-bracket enclosure for using the IPv6 loopback
  // didn't appear to work, but when assigning to the IPv4 loopback device,
  // netstat shows that the local port is actually being used on the IPv6
  // loopback (::1). Therefore, while the IPv4 loopback can be used for
  // forwarding to the destination IPv6 interface, when connecting to the
  // websocket, the IPV6 loopback should be used.
  final String formattedForwardingUrl =
      '${localSocket.port}:$_ipv4Loopback:$remotePort';
  // `interface` is optional, so it may be null as well as empty.
  final bool hasInterface = interface != null && interface.isNotEmpty;
  final String targetAddress =
      isIpV6 && hasInterface ? '$address%$interface' : address;
  const String dummyRemoteCommand = 'true';
  final List<String> command = <String>[
    'ssh',
    if (isIpV6) '-6',
    if (sshConfigPath != null)
      ...<String>['-F', sshConfigPath],
    '-nNT',
    '-f',
    '-L',
    formattedForwardingUrl,
    targetAddress,
    dummyRemoteCommand,
  ];
  _log.fine("_SshPortForwarder running '${command.join(' ')}'");
  try {
    // Must await for the port forwarding function to complete here, as
    // forwarding must be completed before surfacing VM events (as the user
    // may want to connect immediately after an event is surfaced).
    final ProcessResult processResult = await _processManager.run(command);
    _log.fine("'${command.join(' ')}' exited with exit code "
        '${processResult.exitCode}');
    if (processResult.exitCode != 0) {
      throw StateError('Unable to start port forwarding');
    }
  } catch (_) {
    // No forwarder will own the socket, so release the reserved port before
    // reporting the failure.
    await localSocket.close();
    rethrow;
  }
  final _SshPortForwarder result = _SshPortForwarder._(
      address, remotePort, localSocket, interface, sshConfigPath, isIpV6);
  _log.fine('Set up forwarding from ${localSocket.port} '
      'to $address port $remotePort');
  return result;
}
/// Runs the SSH 'cancel' command to shut down the port forwarding, then
/// closes the local socket.
///
/// A failing cancel command is logged as a warning rather than thrown. The
/// local socket is closed in every case, including when the command cannot
/// be run at all.
@override
Future<void> stop() async {
  // Cancel the forwarding request. See [start] for commentary about why this
  // uses the IPv4 loopback.
  final String formattedForwardingUrl =
      '${_localSocket.port}:$_ipv4Loopback:$_remotePort';
  // The interface is optional, so it may be null as well as empty.
  final String? interface = _interface;
  final bool hasInterface = interface != null && interface.isNotEmpty;
  final String targetAddress =
      _ipV6 && hasInterface ? '$_remoteAddress%$interface' : _remoteAddress;
  final String? sshConfigPath = _sshConfigPath;
  final List<String> command = <String>[
    'ssh',
    if (sshConfigPath != null)
      ...<String>['-F', sshConfigPath],
    '-O',
    'cancel',
    '-L',
    formattedForwardingUrl,
    targetAddress,
  ];
  _log.fine(
      'Shutting down SSH forwarding with command: ${command.join(' ')}');
  try {
    final ProcessResult result = await _processManager.run(command);
    if (result.exitCode != 0) {
      _log.warning('Command failed:\nstdout: ${result.stdout}'
          '\nstderr: ${result.stderr}');
    }
  } finally {
    // Wait for the socket to be released so that the port is free again by
    // the time the returned future completes.
    await _localSocket.close();
  }
}
/// Reserves an available local port by binding a socket to it.
///
/// Binds to the IPv4 loopback on a system-chosen port. Returns the bound
/// [ServerSocket] (which must be closed later), or null when binding fails.
static Future<ServerSocket?> _createLocalSocket() async {
  ServerSocket? socket;
  try {
    socket = await ServerSocket.bind(_ipv4Loopback, 0);
  } catch (e) {
    // We should not be catching all errors arbitrarily here, this might hide real errors.
    // TODO(ianh): Determine which exceptions to catch here.
    _log.warning('_createLocalSocket failed: $e');
  }
  return socket;
}
}