Add ability to record & replay VMService connection (#8355)

diff --git a/packages/flutter_tools/lib/src/base/process.dart b/packages/flutter_tools/lib/src/base/process.dart
index c8edfbf..b22671e 100644
--- a/packages/flutter_tools/lib/src/base/process.dart
+++ b/packages/flutter_tools/lib/src/base/process.dart
@@ -27,8 +27,10 @@
   _shutdownHooks.clear();
   _shutdownHooksRunning = true;
   try {
+    List<Future<dynamic>> futures = <Future<dynamic>>[];
     for (ShutdownHook shutdownHook in hooks)
-      await shutdownHook();
+      futures.add(shutdownHook());
+    await Future.wait<dynamic>(futures);
   } finally {
     _shutdownHooksRunning = false;
   }
diff --git a/packages/flutter_tools/lib/src/base/utils.dart b/packages/flutter_tools/lib/src/base/utils.dart
index 608fa7e..cfc216c 100644
--- a/packages/flutter_tools/lib/src/base/utils.dart
+++ b/packages/flutter_tools/lib/src/base/utils.dart
@@ -61,6 +61,7 @@
 }
 
 File getUniqueFile(Directory dir, String baseName, String ext) {
+  FileSystem fs = dir.fileSystem;
   int i = 1;
 
   while (true) {
diff --git a/packages/flutter_tools/lib/src/runner/flutter_command_runner.dart b/packages/flutter_tools/lib/src/runner/flutter_command_runner.dart
index 5cb6ca9..09cc64f 100644
--- a/packages/flutter_tools/lib/src/runner/flutter_command_runner.dart
+++ b/packages/flutter_tools/lib/src/runner/flutter_command_runner.dart
@@ -22,6 +22,7 @@
 import '../globals.dart';
 import '../usage.dart';
 import '../version.dart';
+import '../vmservice.dart';
 
 const String kFlutterRootEnvironmentVariableName = 'FLUTTER_ROOT'; // should point to //flutter/ (root of flutter/flutter repo)
 const String kFlutterEngineEnvironmentVariableName = 'FLUTTER_ENGINE'; // should point to //engine/src/ (root of flutter/engine repo)
@@ -169,6 +170,7 @@
       enableRecordingProcessManager(recordTo);
       enableRecordingFileSystem(recordTo);
       await enableRecordingPlatform(recordTo);
+      VMService.enableRecordingConnection(recordTo);
     }
 
     if (globalResults['replay-from'] != null) {
@@ -178,6 +180,7 @@
       await enableReplayProcessManager(replayFrom);
       enableReplayFileSystem(replayFrom);
       await enableReplayPlatform(replayFrom);
+      VMService.enableReplayConnection(replayFrom);
     }
 
     logger.quiet = globalResults['quiet'];
diff --git a/packages/flutter_tools/lib/src/vmservice.dart b/packages/flutter_tools/lib/src/vmservice.dart
index b3114d8..cc6ded4 100644
--- a/packages/flutter_tools/lib/src/vmservice.dart
+++ b/packages/flutter_tools/lib/src/vmservice.dart
@@ -5,9 +5,9 @@
 import 'dart:async';
 import 'dart:convert' show BASE64;
 
+import 'package:file/file.dart';
 import 'package:json_rpc_2/error_code.dart' as rpc_error_code;
 import 'package:json_rpc_2/json_rpc_2.dart' as rpc;
-import 'package:meta/meta.dart';
 import 'package:stream_channel/stream_channel.dart';
 import 'package:web_socket_channel/io.dart';
 import 'package:web_socket_channel/web_socket_channel.dart';
@@ -15,14 +15,17 @@
 import 'base/common.dart';
 import 'base/file_system.dart';
 import 'globals.dart';
+import 'vmservice_record_replay.dart';
 
 /// A function that opens a two-way communication channel to the specified [uri].
-typedef StreamChannel<dynamic> OpenChannel(Uri uri);
+typedef StreamChannel<String> _OpenChannel(Uri uri);
 
-OpenChannel _openChannel = _defaultOpenChannel;
+_OpenChannel _openChannel = _defaultOpenChannel;
 
-StreamChannel<dynamic> _defaultOpenChannel(Uri uri) =>
-    new IOWebSocketChannel.connect(uri.toString());
+const String _kRecordingType = 'vmservice';
+
+StreamChannel<String> _defaultOpenChannel(Uri uri) =>
+    new IOWebSocketChannel.connect(uri.toString()).cast();
 
 /// The default VM service request timeout.
 const Duration kDefaultRequestTimeout = const Duration(seconds: 10);
@@ -43,9 +46,29 @@
     });
   }
 
-  @visibleForTesting
-  static void setOpenChannelForTesting(OpenChannel openChannel) {
-    _openChannel = openChannel;
+  /// Turns on recording of VMService JSON-rpc activity.
+  ///
+  /// From this call on, each channel that is opened is wrapped in a
+  /// [RecordingVMServiceChannel].
+  ///
+  /// Activity will be recorded in a subdirectory of the base recording
+  /// [location] named `"vmservice"`. It is permissible for [location] to
+  /// represent an existing non-empty directory as long as there is no collision
+  /// with the `"vmservice"` subdirectory.
+  static void enableRecordingConnection(String location) {
+    Directory recordingDir = getRecordingSink(location, _kRecordingType);
+    _openChannel = (Uri uri) =>
+        new RecordingVMServiceChannel(_defaultOpenChannel(uri), recordingDir);
+  }
+
+  /// Enables VMService JSON-rpc replay mode.
+  ///
+  /// [location] must represent a directory to which VMService JSON-rpc
+  /// activity has been recorded (i.e. the result of having been previously
+  /// passed to [enableRecordingConnection]), or a [ToolExit] will be thrown.
+  static void enableReplayConnection(String location) {
+    Directory dir = getReplaySource(location, _kRecordingType);
+    _openChannel = (Uri uri) => new ReplayVMServiceChannel(dir); // The URI is ignored: replay never opens a real connection.
   }
 
   /// Connect to a Dart VM Service at [httpUri].
@@ -57,8 +80,8 @@
     Duration requestTimeout: kDefaultRequestTimeout,
   }) {
     Uri wsUri = httpUri.replace(scheme: 'ws', path: fs.path.join(httpUri.path, 'ws'));
-    StreamChannel<dynamic> channel = _openChannel(wsUri);
-    rpc.Peer peer = new rpc.Peer.withoutJson(jsonDocument.bind(channel.cast()));
+    StreamChannel<String> channel = _openChannel(wsUri);
+    rpc.Peer peer = new rpc.Peer.withoutJson(jsonDocument.bind(channel));
     return new VMService._(peer, httpUri, wsUri, requestTimeout);
   }
 
diff --git a/packages/flutter_tools/lib/src/vmservice_record_replay.dart b/packages/flutter_tools/lib/src/vmservice_record_replay.dart
new file mode 100644
index 0000000..8afe560
--- /dev/null
+++ b/packages/flutter_tools/lib/src/vmservice_record_replay.dart
@@ -0,0 +1,299 @@
+// Copyright 2017 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:convert';
+
+import 'package:file/file.dart';
+import 'package:stream_channel/stream_channel.dart';
+
+import 'base/io.dart';
+import 'base/process.dart';
+import 'globals.dart';
+
+const String _kManifest = 'MANIFEST.txt';
+const String _kRequest = 'request';
+const String _kResponse = 'response';
+const String _kId = 'id';
+const String _kType = 'type';
+const String _kData = 'data';
+
+/// A [StreamChannel] that expects VM service (JSON-rpc) protocol messages and
+/// serializes all such messages to the file system for later playback.
+///
+/// The recording is written to the manifest in `location` by a shutdown hook.
+class RecordingVMServiceChannel extends DelegatingStreamChannel<String> {
+  final List<_Message> _messages = <_Message>[];
+
+  _RecordingStream _streamRecorder;
+  _RecordingSink _sinkRecorder;
+
+  RecordingVMServiceChannel(StreamChannel<String> delegate, Directory location)
+      : super(delegate) {
+    addShutdownHook(() async {
+      // Sort the messages such that they are ordered
+      // `[request1, response1, request2, response2, ...]`. This serves no
+      // other purpose than to make the serialized format more human-readable.
+      _messages.sort((_Message message1, _Message message2) {
+        int id1 = message1.id;
+        int id2 = message2.id;
+        // Messages that carry no id (e.g. JSON-rpc notifications) cannot be
+        // compared numerically; group them after all messages that have one.
+        if (id1 == null || id2 == null)
+          return id1 == id2 ? 0 : (id1 == null ? 1 : -1);
+        int result = id1.compareTo(id2);
+        if (result != 0 || message1.type == message2.type)
+          return result;
+        return message1.type == _kRequest ? -1 : 1;
+      });
+
+      File file = _getManifest(location);
+      String json = new JsonEncoder.withIndent('  ').convert(_messages);
+      await file.writeAsString(json, flush: true);
+    });
+  }
+
+  @override
+  Stream<String> get stream {
+    if (_streamRecorder == null)
+      _streamRecorder = new _RecordingStream(super.stream, _messages);
+    return _streamRecorder.stream;
+  }
+
+  @override
+  StreamSink<String> get sink {
+    if (_sinkRecorder == null)
+      _sinkRecorder = new _RecordingSink(super.sink, _messages);
+    return _sinkRecorder;
+  }
+}
+
+/// A single JSON-rpc message, tagged as either a request or a response.
+abstract class _Message {
+  /// `"request"` for a [_Request], `"response"` for a [_Response].
+  final String type;
+  /// The decoded JSON-rpc payload.
+  final Map<String, dynamic> data;
+
+  _Message(this.type, this.data);
+
+  /// Rebuilds a message from one entry of a recording, as produced by [toJson].
+  factory _Message.fromRecording(Map<String, dynamic> recordingData) {
+    Map<String, dynamic> payload = recordingData[_kData];
+    if (recordingData[_kType] == _kRequest)
+      return new _Request(payload);
+    return new _Response(payload);
+  }
+
+  /// The value stored under `"id"` in [data].
+  int get id => data[_kId];
+
+  /// Lets [JsonEncoder] serialize this object (it looks for a `toJson` method).
+  Map<String, dynamic> toJson() => <String, dynamic>{_kType: type, _kData: data};
+}
+
+/// A JSON-rpc message travelling from the tool to the VM service.
+class _Request extends _Message {
+  _Request(Map<String, dynamic> payload) : super(_kRequest, payload);
+  _Request.fromString(String source) : this(JSON.decode(source));
+}
+
+/// A JSON-rpc message travelling from the VM service back to the tool.
+class _Response extends _Message {
+  _Response(Map<String, dynamic> payload) : super(_kResponse, payload);
+  _Response.fromString(String source) : this(JSON.decode(source));
+}
+
+/// A matching request/response pair.
+///
+/// A request and response match by virtue of having the same [_Message.id];
+/// either half stays null when the recording holds no message for it.
+class _Transaction {
+  _Request request; // Null until a request with this id is loaded.
+  _Response response; // Null until a response with this id is loaded.
+}
+
+/// A helper class that monitors a [Stream] of VM service JSON-rpc responses
+/// and saves the responses to a recording.
+class _RecordingStream {
+  final Stream<String> _delegate;
+  final StreamController<String> _controller;
+  final List<_Message> _recording;
+  StreamSubscription<String> _subscription;
+
+  _RecordingStream(Stream<String> stream, this._recording)
+      : _delegate = stream,
+        _controller = stream.isBroadcast
+            ? new StreamController<String>.broadcast()
+            : new StreamController<String>() {
+    _controller.onListen = () {
+      assert(_subscription == null);
+      _subscription = _listenToStream();
+    };
+    _controller.onCancel = () async {
+      assert(_subscription != null);
+      await _subscription.cancel();
+      _subscription = null;
+    };
+    if (stream.isBroadcast)
+      return; // Broadcast controllers reject pause/resume callbacks (UnsupportedError).
+    _controller.onPause = () {
+      assert(_subscription != null && !_subscription.isPaused);
+      _subscription.pause();
+    };
+    _controller.onResume = () {
+      assert(_subscription != null && _subscription.isPaused);
+      _subscription.resume();
+    };
+  }
+
+  StreamSubscription<String> _listenToStream() {
+    return _delegate.listen(
+      (String element) {
+        _recording.add(new _Response.fromString(element));
+        _controller.add(element);
+      },
+      onError: (dynamic error, StackTrace stackTrace) {
+        // We currently don't support recording of errors.
+        _controller.addError(error, stackTrace);
+      },
+      onDone: _controller.close,
+    );
+  }
+
+  /// The wrapped [Stream] to expose to callers.
+  Stream<String> get stream => _controller.stream;
+}
+
+/// A [StreamSink] that forwards VM service JSON-rpc requests to a delegate
+/// sink and appends a decoded copy of each request to a recording.
+class _RecordingSink implements StreamSink<String> {
+  final StreamSink<String> _delegate; // The real sink that requests are forwarded to.
+  final List<_Message> _recording; // Every request passed to [add] is appended here.
+
+  _RecordingSink(this._delegate, this._recording);
+
+  @override
+  Future<dynamic> close() => _delegate.close();
+
+  @override
+  Future<dynamic> get done => _delegate.done;
+
+  @override
+  void add(String data) {
+    _delegate.add(data); // Forwarded before the JSON decoding below, which can throw.
+    _recording.add(new _Request.fromString(data));
+  }
+
+  @override
+  void addError(dynamic errorEvent, [StackTrace stackTrace]) {
+    throw new UnimplementedError('Add support for this if the need ever arises');
+  }
+
+  @override
+  Future<dynamic> addStream(Stream<String> stream) {
+    throw new UnimplementedError('Add support for this if the need ever arises');
+  }
+}
+
+/// A [StreamChannel] that expects VM service (JSON-rpc) requests to be written
+/// to its [StreamChannel.sink], looks up those requests in a recording, and
+/// replays the corresponding responses back from the recording.
+class ReplayVMServiceChannel extends StreamChannelMixin<String> {
+  final Map<int, _Transaction> _transactions;
+  final StreamController<String> _controller = new StreamController<String>();
+  _ReplaySink _replaySink;
+
+  ReplayVMServiceChannel(Directory location) : _transactions = _loadTransactions(location);
+
+  static Map<int, _Transaction> _loadTransactions(Directory location) {
+    File file = _getManifest(location);
+    String json = file.readAsStringSync();
+    Iterable<_Message> messages = JSON.decoder.convert(json).map<_Message>(_toMessage);
+    Map<int, _Transaction> transactions = <int, _Transaction>{};
+    for (_Message message in messages) {
+      // Messages without an id (e.g. notifications) can never be matched to a
+      // request; keeping them would prevent the map from ever becoming empty.
+      if (message.id == null)
+        continue;
+      _Transaction transaction = transactions.putIfAbsent(message.id, () => new _Transaction());
+      if (message.type == _kRequest) {
+        assert(transaction.request == null);
+        transaction.request = message;
+      } else {
+        assert(transaction.response == null);
+        transaction.response = message;
+      }
+    }
+    return transactions;
+  }
+
+  static _Message _toMessage(Map<String, dynamic> jsonData) => new _Message.fromRecording(jsonData);
+
+  /// Replays the recorded response to [request] on [stream], or throws an
+  /// [ArgumentError] if no unconsumed transaction has the request's id.
+  void send(_Request request) {
+    if (request.id == null)
+      return; // A request without an id is a notification; it gets no response.
+    if (!_transactions.containsKey(request.id))
+      throw new ArgumentError('No matching invocation found');
+    _Transaction transaction = _transactions.remove(request.id);
+    // TODO(tvolkert): validate that `transaction.request` matches `request`
+    if (transaction.response == null) {
+      // This signals that when we were recording, the VM shut down before
+      // we received the response. This is typically due to the user quitting
+      // the app runner. We follow suit here and exit.
+      printStatus('Exiting due to dangling request');
+      exit(0);
+    } else {
+      _controller.add(JSON.encoder.convert(transaction.response.data));
+      if (_transactions.isEmpty)
+        _controller.close();
+    }
+  }
+
+  @override
+  StreamSink<String> get sink => _replaySink ??= new _ReplaySink(this);
+
+  @override
+  Stream<String> get stream => _controller.stream;
+}
+
+class _ReplaySink implements StreamSink<String> {
+  final ReplayVMServiceChannel channel; // Receives every request passed to [add].
+  final Completer<Null> _completer = new Completer<Null>(); // Completed by [close].
+
+  _ReplaySink(this.channel);
+
+  @override
+  Future<dynamic> close() {
+    if (!_completer.isCompleted) _completer.complete(); // Repeated close() is a no-op.
+    return _completer.future;
+  }
+
+  @override
+  Future<dynamic> get done => _completer.future;
+
+  @override
+  void add(String data) {
+    if (_completer.isCompleted)
+      throw new StateError('Sink already closed');
+    channel.send(new _Request.fromString(data));
+  }
+
+  @override
+  void addError(dynamic errorEvent, [StackTrace stackTrace]) {
+    throw new UnimplementedError('Add support for this if the need ever arises');
+  }
+
+  @override
+  Future<dynamic> addStream(Stream<String> stream) {
+    throw new UnimplementedError('Add support for this if the need ever arises');
+  }
+}
+
+/// Returns a handle to the `MANIFEST.txt` file directly inside [location].
+File _getManifest(Directory location) {
+  return location.fileSystem.file(location.fileSystem.path.join(location.path, _kManifest));
+}