Add ability to mock VMService's WebSocket connection (#8145)
diff --git a/packages/flutter_tools/lib/src/commands/trace.dart b/packages/flutter_tools/lib/src/commands/trace.dart
index 75edcc2..43b59c3 100644
--- a/packages/flutter_tools/lib/src/commands/trace.dart
+++ b/packages/flutter_tools/lib/src/commands/trace.dart
@@ -60,7 +60,7 @@
Tracing tracing;
try {
- tracing = await Tracing.connect(observatoryUri);
+ tracing = Tracing.connect(observatoryUri);
} catch (error) {
throwToolExit('Error connecting to observatory: $error');
}
@@ -102,8 +102,9 @@
class Tracing {
Tracing(this.vmService);
- static Future<Tracing> connect(Uri uri) {
- return VMService.connect(uri).then((VMService observatory) => new Tracing(observatory));
+ static Tracing connect(Uri uri) {
+ VMService observatory = VMService.connect(uri);
+ return new Tracing(observatory);
}
final VMService vmService;
diff --git a/packages/flutter_tools/lib/src/resident_runner.dart b/packages/flutter_tools/lib/src/resident_runner.dart
index 235ccdc..5d81a1c 100644
--- a/packages/flutter_tools/lib/src/resident_runner.dart
+++ b/packages/flutter_tools/lib/src/resident_runner.dart
@@ -194,7 +194,7 @@
if (!debuggingOptions.debuggingEnabled) {
return new Future<Null>.error('Error the service protocol is not enabled.');
}
- vmService = await VMService.connect(uri);
+ vmService = VMService.connect(uri);
printTrace('Connected to service protocol: $uri');
await vmService.getVM();
diff --git a/packages/flutter_tools/lib/src/vmservice.dart b/packages/flutter_tools/lib/src/vmservice.dart
index 63632cf..95a9bb6 100644
--- a/packages/flutter_tools/lib/src/vmservice.dart
+++ b/packages/flutter_tools/lib/src/vmservice.dart
@@ -7,13 +7,23 @@
import 'package:json_rpc_2/error_code.dart' as rpc_error_code;
import 'package:json_rpc_2/json_rpc_2.dart' as rpc;
+import 'package:meta/meta.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:web_socket_channel/io.dart';
+import 'package:web_socket_channel/web_socket_channel.dart';
+import 'base/common.dart';
import 'base/file_system.dart';
-import 'base/io.dart';
import 'globals.dart';
+/// A function that opens a two-way communication channel to the specified [uri].
+typedef StreamChannel<dynamic> OpenChannel(Uri uri);
+
+OpenChannel _openChannel = _defaultOpenChannel;
+
+StreamChannel<dynamic> _defaultOpenChannel(Uri uri) =>
+ new IOWebSocketChannel.connect(uri.toString());
+
/// The default VM service request timeout.
const Duration kDefaultRequestTimeout = const Duration(seconds: 10);
@@ -22,35 +32,41 @@
/// A connection to the Dart VM Service.
class VMService {
- VMService._(this.peer, this.httpAddress, this.wsAddress, this._requestTimeout) {
+ VMService._(this._peer, this.httpAddress, this.wsAddress, this._requestTimeout) {
_vm = new VM._empty(this);
+ _peer.listen().catchError((dynamic e, StackTrace stackTrace) {
+ _connectionError.completeError(e, stackTrace);
+ });
- peer.registerMethod('streamNotify', (rpc.Parameters event) {
+ _peer.registerMethod('streamNotify', (rpc.Parameters event) {
_handleStreamNotify(event.asMap);
});
}
+ @visibleForTesting
+ static void setOpenChannelForTesting(OpenChannel openChannel) {
+ _openChannel = openChannel;
+ }
+
/// Connect to a Dart VM Service at [httpUri].
///
/// Requests made via the returns [VMService] time out after [requestTimeout]
/// amount of time, which is [kDefaultRequestTimeout] by default.
- static Future<VMService> connect(Uri httpUri, { Duration requestTimeout: kDefaultRequestTimeout }) async {
+ static VMService connect(
+ Uri httpUri, {
+ Duration requestTimeout: kDefaultRequestTimeout,
+ }) {
Uri wsUri = httpUri.replace(scheme: 'ws', path: fs.path.join(httpUri.path, 'ws'));
- WebSocket ws;
- try {
- ws = await WebSocket.connect(wsUri.toString());
- } catch (e) {
- return new Future<VMService>.error('Failed to connect to $wsUri\n $e');
- }
- rpc.Peer peer = new rpc.Peer.withoutJson(jsonDocument.bind(new IOWebSocketChannel(ws).cast()));
- peer.listen();
+ StreamChannel<dynamic> channel = _openChannel(wsUri);
+ rpc.Peer peer = new rpc.Peer.withoutJson(jsonDocument.bind(channel.cast()));
return new VMService._(peer, httpUri, wsUri, requestTimeout);
}
final Uri httpAddress;
final Uri wsAddress;
- final rpc.Peer peer;
+ final rpc.Peer _peer;
final Duration _requestTimeout;
+ final Completer<Map<String, dynamic>> _connectionError = new Completer<Map<String, dynamic>>();
VM _vm;
/// The singleton [VM] object. Owns [Isolate] and [FlutterView] objects.
@@ -61,8 +77,9 @@
Set<String> _listeningFor = new Set<String>();
- bool get isClosed => peer.isClosed;
- Future<Null> get done => peer.done;
+ /// Whether our connection to the VM service has been closed;
+ bool get isClosed => _peer.isClosed;
+ Future<Null> get done => _peer.done;
// Events
Stream<ServiceEvent> get onDebugEvent => onEvent('Debug');
@@ -78,6 +95,16 @@
return _getEventController(streamId).stream;
}
+ Future<Map<String, dynamic>> _sendRequest(
+ String method,
+ Map<String, dynamic> params,
+ ) {
+ return Future.any(<Future<Map<String, dynamic>>>[
+ _peer.sendRequest(method, params),
+ _connectionError.future,
+ ]);
+ }
+
StreamController<ServiceEvent> _getEventController(String eventName) {
StreamController<ServiceEvent> controller = _eventControllers[eventName];
if (controller == null) {
@@ -114,8 +141,7 @@
Future<Null> _streamListen(String streamId) async {
if (!_listeningFor.contains(streamId)) {
_listeningFor.add(streamId);
- await peer.sendRequest('streamListen',
- <String, dynamic>{ 'streamId': streamId });
+ await _sendRequest('streamListen', <String, dynamic>{ 'streamId': streamId });
}
}
@@ -299,7 +325,7 @@
_inProgressReload = null;
}
- return _inProgressReload;
+ return await _inProgressReload;
}
/// Update [this] using [map] as a source. [map] can be a service reference.
@@ -578,8 +604,8 @@
assert(params != null);
timeout ??= _vmService._requestTimeout;
try {
- Map<String, dynamic> result = await _vmService.peer
- .sendRequest(method, params)
+ Map<String, dynamic> result = await _vmService
+ ._sendRequest(method, params)
.timeout(timeout);
return result;
} on TimeoutException {
@@ -587,6 +613,9 @@
if (timeoutFatal)
throw new TimeoutException('Request to Dart VM Service timed out: $method($params)');
return null;
+ } on WebSocketChannelException catch (error) {
+ throwToolExit('Error connecting to observatory: $error');
+ return null;
}
}
diff --git a/packages/flutter_tools/pubspec.yaml b/packages/flutter_tools/pubspec.yaml
index 1e70c2b..8b8fc38 100644
--- a/packages/flutter_tools/pubspec.yaml
+++ b/packages/flutter_tools/pubspec.yaml
@@ -15,7 +15,9 @@
file: 2.0.1
http: ^0.11.3
intl: '>=0.14.0 <0.15.0'
- json_rpc_2: ^2.0.0
+ # TODO(tvolkert): Change to ^2.0.0 after manually vetting 2.0.4 release,
+ # which contains https://github.com/dart-lang/json_rpc_2/pull/19
+ json_rpc_2: 2.0.3
json_schema: 1.0.6
linter: 0.1.30-alpha.1
meta: ^1.0.4