Add ability to mock VMService's WebSocket connection (#8145)

diff --git a/packages/flutter_tools/lib/src/commands/trace.dart b/packages/flutter_tools/lib/src/commands/trace.dart
index 75edcc2..43b59c3 100644
--- a/packages/flutter_tools/lib/src/commands/trace.dart
+++ b/packages/flutter_tools/lib/src/commands/trace.dart
@@ -60,7 +60,7 @@
     Tracing tracing;
 
     try {
-      tracing = await Tracing.connect(observatoryUri);
+      tracing = Tracing.connect(observatoryUri);
     } catch (error) {
       throwToolExit('Error connecting to observatory: $error');
     }
@@ -102,8 +102,9 @@
 class Tracing {
   Tracing(this.vmService);
 
-  static Future<Tracing> connect(Uri uri) {
-    return VMService.connect(uri).then((VMService observatory) => new Tracing(observatory));
+  static Tracing connect(Uri uri) {
+    VMService observatory = VMService.connect(uri);
+    return new Tracing(observatory);
   }
 
   final VMService vmService;
diff --git a/packages/flutter_tools/lib/src/resident_runner.dart b/packages/flutter_tools/lib/src/resident_runner.dart
index 235ccdc..5d81a1c 100644
--- a/packages/flutter_tools/lib/src/resident_runner.dart
+++ b/packages/flutter_tools/lib/src/resident_runner.dart
@@ -194,7 +194,7 @@
     if (!debuggingOptions.debuggingEnabled) {
       return new Future<Null>.error('Error the service protocol is not enabled.');
     }
-    vmService = await VMService.connect(uri);
+    vmService = VMService.connect(uri);
     printTrace('Connected to service protocol: $uri');
     await vmService.getVM();
 
diff --git a/packages/flutter_tools/lib/src/vmservice.dart b/packages/flutter_tools/lib/src/vmservice.dart
index 63632cf..95a9bb6 100644
--- a/packages/flutter_tools/lib/src/vmservice.dart
+++ b/packages/flutter_tools/lib/src/vmservice.dart
@@ -7,13 +7,23 @@
 
 import 'package:json_rpc_2/error_code.dart' as rpc_error_code;
 import 'package:json_rpc_2/json_rpc_2.dart' as rpc;
+import 'package:meta/meta.dart';
 import 'package:stream_channel/stream_channel.dart';
 import 'package:web_socket_channel/io.dart';
+import 'package:web_socket_channel/web_socket_channel.dart';
 
+import 'base/common.dart';
 import 'base/file_system.dart';
-import 'base/io.dart';
 import 'globals.dart';
 
+/// A function that opens a two-way communication channel to the specified [uri].
+typedef StreamChannel<dynamic> OpenChannel(Uri uri);
+
+OpenChannel _openChannel = _defaultOpenChannel;
+
+StreamChannel<dynamic> _defaultOpenChannel(Uri uri) =>
+    new IOWebSocketChannel.connect(uri.toString());
+
 /// The default VM service request timeout.
 const Duration kDefaultRequestTimeout = const Duration(seconds: 10);
 
@@ -22,35 +32,41 @@
 
 /// A connection to the Dart VM Service.
 class VMService {
-  VMService._(this.peer, this.httpAddress, this.wsAddress, this._requestTimeout) {
+  VMService._(this._peer, this.httpAddress, this.wsAddress, this._requestTimeout) {
     _vm = new VM._empty(this);
+    _peer.listen().catchError((dynamic e, StackTrace stackTrace) {
+      _connectionError.completeError(e, stackTrace);
+    });
 
-    peer.registerMethod('streamNotify', (rpc.Parameters event) {
+    _peer.registerMethod('streamNotify', (rpc.Parameters event) {
       _handleStreamNotify(event.asMap);
     });
   }
 
+  @visibleForTesting
+  static void setOpenChannelForTesting(OpenChannel openChannel) {
+    _openChannel = openChannel;
+  }
+
   /// Connect to a Dart VM Service at [httpUri].
   ///
   /// Requests made via the returns [VMService] time out after [requestTimeout]
   /// amount of time, which is [kDefaultRequestTimeout] by default.
-  static Future<VMService> connect(Uri httpUri, { Duration requestTimeout: kDefaultRequestTimeout }) async {
+  static VMService connect(
+    Uri httpUri, {
+    Duration requestTimeout: kDefaultRequestTimeout,
+  }) {
     Uri wsUri = httpUri.replace(scheme: 'ws', path: fs.path.join(httpUri.path, 'ws'));
-    WebSocket ws;
-    try {
-      ws = await WebSocket.connect(wsUri.toString());
-    } catch (e) {
-      return new Future<VMService>.error('Failed to connect to $wsUri\n  $e');
-    }
-    rpc.Peer peer = new rpc.Peer.withoutJson(jsonDocument.bind(new IOWebSocketChannel(ws).cast()));
-    peer.listen();
+    StreamChannel<dynamic> channel = _openChannel(wsUri);
+    rpc.Peer peer = new rpc.Peer.withoutJson(jsonDocument.bind(channel.cast()));
     return new VMService._(peer, httpUri, wsUri, requestTimeout);
   }
 
   final Uri httpAddress;
   final Uri wsAddress;
-  final rpc.Peer peer;
+  final rpc.Peer _peer;
   final Duration _requestTimeout;
+  final Completer<Map<String, dynamic>> _connectionError = new Completer<Map<String, dynamic>>();
 
   VM _vm;
   /// The singleton [VM] object. Owns [Isolate] and [FlutterView] objects.
@@ -61,8 +77,9 @@
 
   Set<String> _listeningFor = new Set<String>();
 
-  bool get isClosed => peer.isClosed;
-  Future<Null> get done => peer.done;
+  /// Whether our connection to the VM service has been closed;
+  bool get isClosed => _peer.isClosed;
+  Future<Null> get done => _peer.done;
 
   // Events
   Stream<ServiceEvent> get onDebugEvent => onEvent('Debug');
@@ -78,6 +95,16 @@
     return _getEventController(streamId).stream;
   }
 
+  Future<Map<String, dynamic>> _sendRequest(
+    String method,
+    Map<String, dynamic> params,
+  ) {
+    return Future.any(<Future<Map<String, dynamic>>>[
+      _peer.sendRequest(method, params),
+      _connectionError.future,
+    ]);
+  }
+
   StreamController<ServiceEvent> _getEventController(String eventName) {
     StreamController<ServiceEvent> controller = _eventControllers[eventName];
     if (controller == null) {
@@ -114,8 +141,7 @@
   Future<Null> _streamListen(String streamId) async {
     if (!_listeningFor.contains(streamId)) {
       _listeningFor.add(streamId);
-      await peer.sendRequest('streamListen',
-                             <String, dynamic>{ 'streamId': streamId });
+      await _sendRequest('streamListen', <String, dynamic>{ 'streamId': streamId });
     }
   }
 
@@ -299,7 +325,7 @@
       _inProgressReload = null;
     }
 
-    return _inProgressReload;
+    return await _inProgressReload;
   }
 
   /// Update [this] using [map] as a source. [map] can be a service reference.
@@ -578,8 +604,8 @@
     assert(params != null);
     timeout ??= _vmService._requestTimeout;
     try {
-      Map<String, dynamic> result = await _vmService.peer
-          .sendRequest(method, params)
+      Map<String, dynamic> result = await _vmService
+          ._sendRequest(method, params)
           .timeout(timeout);
       return result;
     } on TimeoutException {
@@ -587,6 +613,9 @@
       if (timeoutFatal)
         throw new TimeoutException('Request to Dart VM Service timed out: $method($params)');
       return null;
+    } on WebSocketChannelException catch (error) {
+      throwToolExit('Error connecting to observatory: $error');
+      return null;
     }
   }
 
diff --git a/packages/flutter_tools/pubspec.yaml b/packages/flutter_tools/pubspec.yaml
index 1e70c2b..8b8fc38 100644
--- a/packages/flutter_tools/pubspec.yaml
+++ b/packages/flutter_tools/pubspec.yaml
@@ -15,7 +15,9 @@
   file: 2.0.1
   http: ^0.11.3
   intl: '>=0.14.0 <0.15.0'
-  json_rpc_2: ^2.0.0
+  # TODO(tvolkert): Change to ^2.0.0 after manually vetting 2.0.4 release,
+  # which contains https://github.com/dart-lang/json_rpc_2/pull/19
+  json_rpc_2: 2.0.3
   json_schema: 1.0.6
   linter: 0.1.30-alpha.1
   meta: ^1.0.4