Reland: Attach looks at future observatory URIs (#45307)

diff --git a/packages/flutter_tools/lib/src/android/android_device.dart b/packages/flutter_tools/lib/src/android/android_device.dart
index a47581d..38959ff 100644
--- a/packages/flutter_tools/lib/src/android/android_device.dart
+++ b/packages/flutter_tools/lib/src/android/android_device.dart
@@ -855,25 +855,9 @@
   @override
   String get name => device.name;
 
-  DateTime _timeOrigin;
-
-  DateTime _adbTimestampToDateTime(String adbTimestamp) {
-    // The adb timestamp format is: mm-dd hours:minutes:seconds.milliseconds
-    // Dart's DateTime parse function accepts this format so long as we provide
-    // the year, resulting in:
-    // yyyy-mm-dd hours:minutes:seconds.milliseconds.
-    return DateTime.parse('${DateTime.now().year}-$adbTimestamp');
-  }
-
   void _start() {
-    // Start the adb logcat process.
-    final List<String> args = <String>['shell', '-x', 'logcat', '-v', 'time'];
-    final String lastTimestamp = device.lastLogcatTimestamp;
-    if (lastTimestamp != null) {
-      _timeOrigin = _adbTimestampToDateTime(lastTimestamp);
-    } else {
-      _timeOrigin = null;
-    }
+    // Start the adb logcat process and filter logs by the "flutter" tag.
+    final List<String> args = <String>['shell', '-x', 'logcat', '-v', 'time', '-s', 'flutter'];
     processUtils.start(device.adbCommandForDevice(args)).then<void>((Process process) {
       _process = process;
       // We expect logcat streams to occasionally contain invalid utf-8,
@@ -923,18 +907,7 @@
   // mm-dd hh:mm:ss.milliseconds Priority/Tag( PID): ....
   void _onLine(String line) {
     final Match timeMatch = AndroidDevice._timeRegExp.firstMatch(line);
-    if (timeMatch == null) {
-      return;
-    }
-    if (_timeOrigin != null) {
-      final String timestamp = timeMatch.group(0);
-      final DateTime time = _adbTimestampToDateTime(timestamp);
-      if (!time.isAfter(_timeOrigin)) {
-        // Ignore log messages before the origin.
-        return;
-      }
-    }
-    if (line.length == timeMatch.end) {
+    if (timeMatch == null || line.length == timeMatch.end) {
       return;
     }
     // Chop off the time.
diff --git a/packages/flutter_tools/lib/src/commands/attach.dart b/packages/flutter_tools/lib/src/commands/attach.dart
index 1e39758..c82a2bd 100644
--- a/packages/flutter_tools/lib/src/commands/attach.dart
+++ b/packages/flutter_tools/lib/src/commands/attach.dart
@@ -4,6 +4,8 @@
 
 import 'dart:async';
 
+import 'package:meta/meta.dart';
+
 import '../artifacts.dart';
 import '../base/common.dart';
 import '../base/context.dart';
@@ -200,16 +202,14 @@
             notifyingLogger: NotifyingLogger(), logToStdout: true)
       : null;
 
-    Uri observatoryUri;
+    Stream<Uri> observatoryUri;
     bool usesIpv6 = ipv6;
     final String ipv6Loopback = InternetAddress.loopbackIPv6.address;
     final String ipv4Loopback = InternetAddress.loopbackIPv4.address;
     final String hostname = usesIpv6 ? ipv6Loopback : ipv4Loopback;
 
-    bool attachLogger = false;
     if (devicePort == null && debugUri == null) {
       if (device is FuchsiaDevice) {
-        attachLogger = true;
         final String module = stringArg('module');
         if (module == null) {
           throwToolExit('\'--module\' is required for attaching to a Fuchsia device');
@@ -218,8 +218,7 @@
         FuchsiaIsolateDiscoveryProtocol isolateDiscoveryProtocol;
         try {
           isolateDiscoveryProtocol = device.getIsolateDiscoveryProtocol(module);
-          observatoryUri = await isolateDiscoveryProtocol.uri;
-          printStatus('Done.'); // FYI, this message is used as a sentinel in tests.
+          observatoryUri = Stream<Uri>.fromFuture(isolateDiscoveryProtocol.uri).asBroadcastStream();
         } catch (_) {
           isolateDiscoveryProtocol?.dispose();
           final List<ForwardedPort> ports = device.portForwarder.forwardedPorts.toList();
@@ -229,85 +228,55 @@
           rethrow;
         }
       } else if ((device is IOSDevice) || (device is IOSSimulator)) {
-        observatoryUri = await MDnsObservatoryDiscovery.instance.getObservatoryUri(
-          appId,
-          device,
-          usesIpv6: usesIpv6,
-          deviceVmservicePort: deviceVmservicePort,
-        );
+        observatoryUri = Stream<Uri>
+          .fromFuture(
+            MDnsObservatoryDiscovery.instance.getObservatoryUri(
+              appId,
+              device,
+              usesIpv6: usesIpv6,
+              deviceVmservicePort: deviceVmservicePort,
+            )
+          ).asBroadcastStream();
       }
       // If MDNS discovery fails or we're not on iOS, fallback to ProtocolDiscovery.
       if (observatoryUri == null) {
-        ProtocolDiscovery observatoryDiscovery;
-        try {
-          observatoryDiscovery = ProtocolDiscovery.observatory(
+        final ProtocolDiscovery observatoryDiscovery =
+          ProtocolDiscovery.observatory(
             device.getLogReader(),
             portForwarder: device.portForwarder,
             ipv6: ipv6,
             devicePort: deviceVmservicePort,
             hostPort: hostVmservicePort,
           );
-          printStatus('Waiting for a connection from Flutter on ${device.name}...');
-          observatoryUri = await observatoryDiscovery.uri;
-          // Determine ipv6 status from the scanned logs.
-          usesIpv6 = observatoryDiscovery.ipv6;
-          printStatus('Done.'); // FYI, this message is used as a sentinel in tests.
-        } catch (error) {
-          throwToolExit('Failed to establish a debug connection with ${device.name}: $error');
-        } finally {
-          await observatoryDiscovery?.cancel();
-        }
+        printStatus('Waiting for a connection from Flutter on ${device.name}...');
+        observatoryUri = observatoryDiscovery.uris;
+        // Determine ipv6 status from the scanned logs.
+        usesIpv6 = observatoryDiscovery.ipv6;
       }
     } else {
-      observatoryUri = await buildObservatoryUri(
-        device,
-        debugUri?.host ?? hostname,
-        devicePort ?? debugUri.port,
-        hostVmservicePort,
-        debugUri?.path,
-      );
-    }
-    try {
-      final bool useHot = getBuildInfo().isDebug;
-      final FlutterDevice flutterDevice = await FlutterDevice.create(
-        device,
-        flutterProject: flutterProject,
-        trackWidgetCreation: boolArg('track-widget-creation'),
-        fileSystemRoots: stringsArg('filesystem-root'),
-        fileSystemScheme: stringArg('filesystem-scheme'),
-        viewFilter: stringArg('isolate-filter'),
-        target: stringArg('target'),
-        targetModel: TargetModel(stringArg('target-model')),
-        buildMode: getBuildMode(),
-        dartDefines: dartDefines,
-      );
-      flutterDevice.observatoryUris = <Uri>[ observatoryUri ];
-      final List<FlutterDevice> flutterDevices =  <FlutterDevice>[flutterDevice];
-      final DebuggingOptions debuggingOptions = DebuggingOptions.enabled(getBuildInfo());
-      terminal.usesTerminalUi = daemon == null;
-      final ResidentRunner runner = useHot ?
-          hotRunnerFactory.build(
-            flutterDevices,
-            target: targetFile,
-            debuggingOptions: debuggingOptions,
-            packagesFilePath: globalResults['packages'] as String,
-            projectRootPath: stringArg('project-root'),
-            dillOutputPath: stringArg('output-dill'),
-            ipv6: usesIpv6,
-            flutterProject: flutterProject,
+      observatoryUri = Stream<Uri>
+        .fromFuture(
+          buildObservatoryUri(
+            device,
+            debugUri?.host ?? hostname,
+            devicePort ?? debugUri.port,
+            hostVmservicePort,
+            debugUri?.path,
           )
-        : ColdRunner(
-            flutterDevices,
-            target: targetFile,
-            debuggingOptions: debuggingOptions,
-            ipv6: usesIpv6,
-          );
-      if (attachLogger) {
-        flutterDevice.startEchoingDeviceLog();
-      }
+        ).asBroadcastStream();
+    }
 
+    terminal.usesTerminalUi = daemon == null;
+
+    try {
       int result;
       if (daemon != null) {
+        final ResidentRunner runner = await createResidentRunner(
+          observatoryUris: observatoryUri,
+          device: device,
+          flutterProject: flutterProject,
+          usesIpv6: usesIpv6,
+        );
         AppInstance app;
         try {
           app = await daemon.appDomain.launch(
@@ -324,20 +293,34 @@
         }
         result = await app.runner.waitForAppToFinish();
         assert(result != null);
-      } else {
+        return;
+      }
+      while (true) {
+        final ResidentRunner runner = await createResidentRunner(
+          observatoryUris: observatoryUri,
+          device: device,
+          flutterProject: flutterProject,
+          usesIpv6: usesIpv6,
+        );
         final Completer<void> onAppStart = Completer<void>.sync();
+        TerminalHandler terminalHandler;
         unawaited(onAppStart.future.whenComplete(() {
-          TerminalHandler(runner)
+          terminalHandler = TerminalHandler(runner)
             ..setupTerminal()
             ..registerSignalHandlers();
         }));
         result = await runner.attach(
           appStartedCompleter: onAppStart,
         );
+        if (result != 0) {
+          throwToolExit(null, exitCode: result);
+        }
+        terminalHandler?.stop();
         assert(result != null);
-      }
-      if (result != 0) {
-        throwToolExit(null, exitCode: result);
+        if (runner.exited || !runner.isWaitingForObservatory) {
+          break;
+        }
+        printStatus('Waiting for a new connection from Flutter on ${device.name}...');
       }
     } finally {
       final List<ForwardedPort> ports = device.portForwarder.forwardedPorts.toList();
@@ -347,6 +330,52 @@
     }
   }
 
+  Future<ResidentRunner> createResidentRunner({
+    @required Stream<Uri> observatoryUris,
+    @required Device device,
+    @required FlutterProject flutterProject,
+    @required bool usesIpv6,
+  }) async {
+    assert(observatoryUris != null);
+    assert(device != null);
+    assert(flutterProject != null);
+    assert(usesIpv6 != null);
+
+    final FlutterDevice flutterDevice = await FlutterDevice.create(
+      device,
+      flutterProject: flutterProject,
+      trackWidgetCreation: boolArg('track-widget-creation'),
+      fileSystemRoots: stringsArg('filesystem-root'),
+      fileSystemScheme: stringArg('filesystem-scheme'),
+      viewFilter: stringArg('isolate-filter'),
+      target: stringArg('target'),
+      targetModel: TargetModel(stringArg('target-model')),
+      buildMode: getBuildMode(),
+      dartDefines: dartDefines,
+    );
+    flutterDevice.observatoryUris = observatoryUris;
+    final List<FlutterDevice> flutterDevices =  <FlutterDevice>[flutterDevice];
+    final DebuggingOptions debuggingOptions = DebuggingOptions.enabled(getBuildInfo());
+
+    return getBuildInfo().isDebug
+      ? hotRunnerFactory.build(
+          flutterDevices,
+          target: targetFile,
+          debuggingOptions: debuggingOptions,
+          packagesFilePath: globalResults['packages'] as String,
+          projectRootPath: stringArg('project-root'),
+          dillOutputPath: stringArg('output-dill'),
+          ipv6: usesIpv6,
+          flutterProject: flutterProject,
+        )
+      : ColdRunner(
+          flutterDevices,
+          target: targetFile,
+          debuggingOptions: debuggingOptions,
+          ipv6: usesIpv6,
+        );
+  }
+
   Future<void> _validateArguments() async { }
 }
 
diff --git a/packages/flutter_tools/lib/src/devfs.dart b/packages/flutter_tools/lib/src/devfs.dart
index 2bfc891..9bba2d7 100644
--- a/packages/flutter_tools/lib/src/devfs.dart
+++ b/packages/flutter_tools/lib/src/devfs.dart
@@ -11,6 +11,7 @@
 import 'base/context.dart';
 import 'base/file_system.dart';
 import 'base/io.dart';
+import 'base/net.dart';
 import 'build_info.dart';
 import 'bundle.dart';
 import 'compile.dart';
@@ -265,17 +266,20 @@
 
 class _DevFSHttpWriter {
   _DevFSHttpWriter(this.fsName, VMService serviceProtocol)
-    : httpAddress = serviceProtocol.httpAddress;
+    : httpAddress = serviceProtocol.httpAddress,
+      _client = (context.get<HttpClientFactory>() == null)
+        ? HttpClient()
+        : context.get<HttpClientFactory>()();
 
   final String fsName;
   final Uri httpAddress;
+  final HttpClient _client;
 
   static const int kMaxInFlight = 6;
 
   int _inFlight = 0;
   Map<Uri, DevFSContent> _outstanding;
   Completer<void> _completer;
-  final HttpClient _client = HttpClient();
 
   Future<void> write(Map<Uri, DevFSContent> entries) async {
     _client.maxConnectionsPerHost = kMaxInFlight;
diff --git a/packages/flutter_tools/lib/src/protocol_discovery.dart b/packages/flutter_tools/lib/src/protocol_discovery.dart
index 803c957..796203d 100644
--- a/packages/flutter_tools/lib/src/protocol_discovery.dart
+++ b/packages/flutter_tools/lib/src/protocol_discovery.dart
@@ -17,16 +17,23 @@
     this.logReader,
     this.serviceName, {
     this.portForwarder,
+    this.throttleDuration,
     this.hostPort,
     this.devicePort,
     this.ipv6,
-  }) : assert(logReader != null) {
-    _deviceLogSubscription = logReader.logLines.listen(_handleLine);
+  }) : assert(logReader != null)
+  {
+    _deviceLogSubscription = logReader.logLines.listen(
+      _handleLine,
+      onDone: _stopScrapingLogs,
+    );
+    _uriStreamController = _BufferedStreamController<Uri>();
   }
 
   factory ProtocolDiscovery.observatory(
     DeviceLogReader logReader, {
     DevicePortForwarder portForwarder,
+    Duration throttleDuration = const Duration(milliseconds: 200),
     @required int hostPort,
     @required int devicePort,
     @required bool ipv6,
@@ -36,6 +43,7 @@
       logReader,
       kObservatoryService,
       portForwarder: portForwarder,
+      throttleDuration: throttleDuration,
       hostPort: hostPort,
       devicePort: devicePort,
       ipv6: ipv6,
@@ -49,50 +57,70 @@
   final int devicePort;
   final bool ipv6;
 
-  final Completer<Uri> _completer = Completer<Uri>();
+  /// The time to wait before forwarding a new observatory URIs from [logReader].
+  final Duration throttleDuration;
 
   StreamSubscription<String> _deviceLogSubscription;
+  _BufferedStreamController<Uri> _uriStreamController;
 
   /// The discovered service URI.
+  /// Use [uris] instead.
+  // TODO(egarciad): replace `uri` for `uris`.
+  Future<Uri> get uri {
+    return uris.first;
+  }
+
+  /// The discovered service URIs.
   ///
-  /// Port forwarding is only attempted when this is invoked, in case we never
-  /// need to port forward.
-  Future<Uri> get uri async {
-    final Uri rawUri = await _completer.future;
-    return await _forwardPort(rawUri);
+  /// When a new observatory URI is available in [logReader],
+  /// the URIs are forwarded at most once every [throttleDuration].
+  ///
+  /// Port forwarding is only attempted when this is invoked,
+  /// for each observatory URI in the stream.
+  Stream<Uri> get uris {
+    return _uriStreamController.stream
+      .transform(_throttle<Uri>(
+        waitDuration: throttleDuration,
+      ))
+      .asyncMap<Uri>(_forwardPort);
   }
 
   Future<void> cancel() => _stopScrapingLogs();
 
   Future<void> _stopScrapingLogs() async {
+    await _uriStreamController?.close();
     await _deviceLogSubscription?.cancel();
     _deviceLogSubscription = null;
   }
 
+  Match _getPatternMatch(String line) {
+    final RegExp r = RegExp('${RegExp.escape(serviceName)} listening on ((http|\/\/)[a-zA-Z0-9:/=_\\-\.\\[\\]]+)');
+    return r.firstMatch(line);
+  }
+
+  Uri _getObservatoryUri(String line) {
+    final Match match = _getPatternMatch(line);
+    if (match != null) {
+      return Uri.parse(match[1]);
+    }
+    return null;
+  }
+
   void _handleLine(String line) {
     Uri uri;
-    final RegExp r = RegExp('${RegExp.escape(serviceName)} listening on ((http|\/\/)[a-zA-Z0-9:/=_\\-\.\\[\\]]+)');
-    final Match match = r.firstMatch(line);
-
-    if (match != null) {
-      try {
-        uri = Uri.parse(match[1]);
-      } on FormatException catch (error, stackTrace) {
-        _stopScrapingLogs();
-        _completer.completeError(error, stackTrace);
-      }
+    try {
+      uri = _getObservatoryUri(line);
+    } on FormatException catch(error, stackTrace) {
+      _uriStreamController.addError(error, stackTrace);
     }
     if (uri == null) {
       return;
     }
-    if (devicePort != null  &&  uri.port != devicePort) {
+    if (devicePort != null && uri.port != devicePort) {
       printTrace('skipping potential observatory $uri due to device port mismatch');
       return;
     }
-
-    assert(!_completer.isCompleted);
-    _stopScrapingLogs();
-    _completer.complete(uri);
+    _uriStreamController.add(uri);
   }
 
   Future<Uri> _forwardPort(Uri deviceUri) async {
@@ -110,7 +138,101 @@
     if (ipv6) {
       hostUri = hostUri.replace(host: InternetAddress.loopbackIPv6.host);
     }
-
     return hostUri;
   }
 }
+
+/// Provides a broadcast stream controller that buffers the events
+/// if there isn't a listener attached.
+/// The events are then delivered when a listener is attached to the stream.
+class _BufferedStreamController<T> {
+  _BufferedStreamController() : _events = <dynamic>[];
+
+  /// The stream that this controller is controlling.
+  Stream<T> get stream {
+    return _streamController.stream;
+  }
+
+  StreamController<T> _streamControllerInstance;
+
+  StreamController<T> get _streamController {
+    _streamControllerInstance ??= StreamController<T>.broadcast(onListen: () {
+      for (dynamic event in _events) {
+        assert(!(T is List));
+        if (event is T) {
+          _streamControllerInstance.add(event);
+        } else {
+          _streamControllerInstance.addError(
+            event.first as Object,
+            event.last as StackTrace,
+          );
+        }
+      }
+      _events.clear();
+    });
+    return _streamControllerInstance;
+  }
+
+  final List<dynamic> _events;
+
+  /// Sends [event] if there is a listener attached to the broadcast stream.
+  /// Otherwise, it enqueues [event] until a listener is attached.
+  void add(T event) {
+    if (_streamController.hasListener) {
+      _streamController.add(event);
+    } else {
+      _events.add(event);
+    }
+  }
+
+  /// Sends or enqueues an error event.
+  void addError(Object error, [StackTrace stackTrace]) {
+    if (_streamController.hasListener) {
+      _streamController.addError(error, stackTrace);
+    } else {
+      _events.add(<dynamic>[error, stackTrace]);
+    }
+  }
+
+  /// Closes the stream.
+  Future<void> close() {
+    return _streamController.close();
+  }
+}
+
+/// This transformer will produce an event at most once every [waitDuration].
+///
+/// For example, consider a `waitDuration` of `10ms`, and list of event names
+/// and arrival times: `a (0ms), b (5ms), c (11ms), d (21ms)`.
+/// The events `c` and `d` will be produced as a result.
+StreamTransformer<S, S> _throttle<S>({
+  @required Duration waitDuration,
+}) {
+  assert(waitDuration != null);
+
+  S latestLine;
+  int lastExecution;
+  Future<void> throttleFuture;
+
+  return StreamTransformer<S, S>
+    .fromHandlers(
+      handleData: (S value, EventSink<S> sink) {
+        latestLine = value;
+
+        final int currentTime = DateTime.now().millisecondsSinceEpoch;
+        lastExecution ??= currentTime;
+        final int remainingTime = currentTime - lastExecution;
+        final int nextExecutionTime = remainingTime > waitDuration.inMilliseconds
+          ? 0
+          : waitDuration.inMilliseconds - remainingTime;
+
+        throttleFuture ??= Future<void>
+          .delayed(Duration(milliseconds: nextExecutionTime))
+          .whenComplete(() {
+            sink.add(latestLine);
+            throttleFuture = null;
+            lastExecution = DateTime.now().millisecondsSinceEpoch;
+          });
+      }
+    );
+}
diff --git a/packages/flutter_tools/lib/src/resident_runner.dart b/packages/flutter_tools/lib/src/resident_runner.dart
index 9b3ebb6..ea34ae1 100644
--- a/packages/flutter_tools/lib/src/resident_runner.dart
+++ b/packages/flutter_tools/lib/src/resident_runner.dart
@@ -131,16 +131,20 @@
 
   final Device device;
   final ResidentCompiler generator;
-  List<Uri> observatoryUris;
+  Stream<Uri> observatoryUris;
   List<VMService> vmServices;
   DevFS devFS;
   ApplicationPackage package;
   List<String> fileSystemRoots;
   String fileSystemScheme;
   StreamSubscription<String> _loggingSubscription;
+  bool _isListeningForObservatoryUri;
   final String viewFilter;
   final bool trackWidgetCreation;
 
+  /// Whether the stream [observatoryUris] is still open.
+  bool get isWaitingForObservatory => _isListeningForObservatoryUri ?? false;
+
   /// If the [reloadSources] parameter is not null the 'reloadSources' service
   /// will be registered.
   /// The 'reloadSources' service can be used by other Service Protocol clients
@@ -154,23 +158,50 @@
     ReloadSources reloadSources,
     Restart restart,
     CompileExpression compileExpression,
-  }) async {
-    if (vmServices != null) {
-      return;
-    }
-    final List<VMService> localVmServices = List<VMService>(observatoryUris.length);
-    for (int i = 0; i < observatoryUris.length; i += 1) {
-      printTrace('Connecting to service protocol: ${observatoryUris[i]}');
-      localVmServices[i] = await VMService.connect(
-        observatoryUris[i],
-        reloadSources: reloadSources,
-        restart: restart,
-        compileExpression: compileExpression,
-      );
-      printTrace('Successfully connected to service protocol: ${observatoryUris[i]}');
-    }
-    vmServices = localVmServices;
-    device.getLogReader(app: package).connectedVMServices = vmServices;
+  }) {
+    final Completer<void> completer = Completer<void>();
+    StreamSubscription<void> subscription;
+    bool isWaitingForVm = false;
+
+    subscription = observatoryUris.listen((Uri observatoryUri) async {
+      // FYI, this message is used as a sentinel in tests.
+      printTrace('Connecting to service protocol: $observatoryUri');
+      isWaitingForVm = true;
+      VMService service;
+
+      try {
+        service = await VMService.connect(
+          observatoryUri,
+          reloadSources: reloadSources,
+          restart: restart,
+          compileExpression: compileExpression,
+        );
+      } on Exception catch (exception) {
+        printTrace('Fail to connect to service protocol: $observatoryUri: $exception');
+        if (!completer.isCompleted && !_isListeningForObservatoryUri) {
+          completer.completeError('failed to connect to $observatoryUri');
+        }
+        return;
+      }
+      if (completer.isCompleted) {
+        return;
+      }
+      printTrace('Successfully connected to service protocol: $observatoryUri');
+
+      vmServices = <VMService>[service];
+      device.getLogReader(app: package).connectedVMServices = vmServices;
+      completer.complete();
+      await subscription.cancel();
+    }, onError: (dynamic error) {
+      printTrace('Fail to handle observatory URI: $error');
+    }, onDone: () {
+      _isListeningForObservatoryUri = false;
+      if (!completer.isCompleted && !isWaitingForVm) {
+        completer.completeError('connection to device ended too early');
+      }
+    });
+    _isListeningForObservatoryUri = true;
+    return completer.future;
   }
 
   Future<void> refreshViews() async {
@@ -221,6 +252,7 @@
     if (flutterViews.any((FlutterView view) {
       return view != null &&
              view.uiIsolate != null &&
+             view.uiIsolate.pauseEvent != null &&
              view.uiIsolate.pauseEvent.isPauseEvent;
       }
     )) {
@@ -431,9 +463,13 @@
       return 2;
     }
     if (result.hasObservatory) {
-      observatoryUris = <Uri>[result.observatoryUri];
+      observatoryUris = Stream<Uri>
+        .value(result.observatoryUri)
+        .asBroadcastStream();
     } else {
-      observatoryUris = <Uri>[];
+      observatoryUris = const Stream<Uri>
+        .empty()
+        .asBroadcastStream();
     }
     return 0;
   }
@@ -491,9 +527,13 @@
       return 2;
     }
     if (result.hasObservatory) {
-      observatoryUris = <Uri>[result.observatoryUri];
+      observatoryUris = Stream<Uri>
+        .value(result.observatoryUri)
+        .asBroadcastStream();
     } else {
-      observatoryUris = <Uri>[];
+      observatoryUris = const Stream<Uri>
+        .empty()
+        .asBroadcastStream();
     }
     return 0;
   }
@@ -613,14 +653,21 @@
   /// The parent location of the incremental artifacts.
   @visibleForTesting
   final Directory artifactDirectory;
-  final Completer<int> _finished = Completer<int>();
   final String packagesFilePath;
   final String projectRootPath;
   final String mainPath;
   final AssetBundle assetBundle;
 
   bool _exited = false;
-  bool hotMode ;
+  Completer<int> _finished = Completer<int>();
+  bool hotMode;
+
+  /// Returns true if every device is streaming observatory URIs.
+  bool get isWaitingForObservatory {
+    return flutterDevices.every((FlutterDevice device) {
+      return device.isWaitingForObservatory;
+    });
+  }
 
   String get dillOutputPath => _dillOutputPath ?? fs.path.join(artifactDirectory.path, 'app.dill');
   String getReloadPath({ bool fullRestart }) => mainPath + (fullRestart ? '' : '.incremental') + '.dill';
@@ -631,6 +678,9 @@
   bool get isRunningRelease => debuggingOptions.buildInfo.isRelease;
   bool get supportsServiceProtocol => isRunningDebug || isRunningProfile;
 
+  /// Returns [true] if the resident runner exited after invoking [exit()].
+  bool get exited => _exited;
+
   /// Whether this runner can hot restart.
   ///
   /// To prevent scenarios where only a subset of devices are hot restarted,
@@ -862,6 +912,8 @@
       throw 'The service protocol is not enabled.';
     }
 
+    _finished = Completer<int>();
+
     bool viewFound = false;
     for (FlutterDevice device in flutterDevices) {
       await device.connect(
@@ -1045,15 +1097,33 @@
     subscription = terminal.keystrokes.listen(processTerminalInput);
   }
 
+
+  final Map<io.ProcessSignal, Object> _signalTokens = <io.ProcessSignal, Object>{};
+
+  void _addSignalHandler(io.ProcessSignal signal, SignalHandler handler) {
+    _signalTokens[signal] = signals.addHandler(signal, handler);
+  }
+
   void registerSignalHandlers() {
     assert(residentRunner.stayResident);
-    signals.addHandler(io.ProcessSignal.SIGINT, _cleanUp);
-    signals.addHandler(io.ProcessSignal.SIGTERM, _cleanUp);
+
+    _addSignalHandler(io.ProcessSignal.SIGINT, _cleanUp);
+    _addSignalHandler(io.ProcessSignal.SIGTERM, _cleanUp);
     if (!residentRunner.supportsServiceProtocol || !residentRunner.supportsRestart) {
       return;
     }
-    signals.addHandler(io.ProcessSignal.SIGUSR1, _handleSignal);
-    signals.addHandler(io.ProcessSignal.SIGUSR2, _handleSignal);
+    _addSignalHandler(io.ProcessSignal.SIGUSR1, _handleSignal);
+    _addSignalHandler(io.ProcessSignal.SIGUSR2, _handleSignal);
+  }
+
+  /// Unregisters terminal signal and keystroke handlers.
+  void stop() {
+    assert(residentRunner.stayResident);
+    for (MapEntry<io.ProcessSignal, Object> entry in _signalTokens.entries) {
+      signals.removeHandler(entry.key, entry.value);
+    }
+    _signalTokens.clear();
+    subscription.cancel();
   }
 
   /// Returns [true] if the input has been handled by this function.
diff --git a/packages/flutter_tools/lib/src/run_cold.dart b/packages/flutter_tools/lib/src/run_cold.dart
index 21f0776..98499b8 100644
--- a/packages/flutter_tools/lib/src/run_cold.dart
+++ b/packages/flutter_tools/lib/src/run_cold.dart
@@ -83,7 +83,7 @@
     if (flutterDevices.first.observatoryUris != null) {
       // For now, only support one debugger connection.
       connectionInfoCompleter?.complete(DebugConnectionInfo(
-        httpUri: flutterDevices.first.observatoryUris.first,
+        httpUri: flutterDevices.first.vmServices.first.httpAddress,
         wsUri: flutterDevices.first.vmServices.first.wsAddress,
       ));
     }
@@ -183,10 +183,9 @@
     bool haveAnything = false;
     for (FlutterDevice device in flutterDevices) {
       final String dname = device.device.name;
-      if (device.observatoryUris != null) {
-        for (Uri uri in device.observatoryUris) {
-          printStatus('An Observatory debugger and profiler on $dname is available at $uri');
-          haveAnything = true;
+      if (device.vmServices != null) {
+        for (VMService vm in device.vmServices) {
+          printStatus('An Observatory debugger and profiler on $dname is available at: ${vm.httpAddress}');
         }
       }
     }
diff --git a/packages/flutter_tools/lib/src/run_hot.dart b/packages/flutter_tools/lib/src/run_hot.dart
index 1886aa1..649e326 100644
--- a/packages/flutter_tools/lib/src/run_hot.dart
+++ b/packages/flutter_tools/lib/src/run_hot.dart
@@ -180,7 +180,7 @@
         // Only handle one debugger connection.
         connectionInfoCompleter.complete(
           DebugConnectionInfo(
-            httpUri: flutterDevices.first.observatoryUris.first,
+            httpUri: flutterDevices.first.vmServices.first.httpAddress,
             wsUri: flutterDevices.first.vmServices.first.wsAddress,
             baseUri: baseUris.first.toString(),
           ),
@@ -987,8 +987,8 @@
     printStatus(message);
     for (FlutterDevice device in flutterDevices) {
       final String dname = device.device.name;
-      for (Uri uri in device.observatoryUris) {
-        printStatus('An Observatory debugger and profiler on $dname is available at: $uri');
+      for (VMService vm in device.vmServices) {
+        printStatus('An Observatory debugger and profiler on $dname is available at: ${vm.httpAddress}');
       }
     }
     final String quitMessage = _didAttach
diff --git a/packages/flutter_tools/test/commands.shard/hermetic/attach_test.dart b/packages/flutter_tools/test/commands.shard/hermetic/attach_test.dart
index 7a99e5b..e100bc6 100644
--- a/packages/flutter_tools/test/commands.shard/hermetic/attach_test.dart
+++ b/packages/flutter_tools/test/commands.shard/hermetic/attach_test.dart
@@ -7,24 +7,31 @@
 import 'package:file/memory.dart';
 import 'package:flutter_tools/src/base/common.dart';
 import 'package:flutter_tools/src/base/file_system.dart';
+import 'package:flutter_tools/src/base/io.dart';
 import 'package:flutter_tools/src/base/logger.dart';
+import 'package:flutter_tools/src/base/net.dart';
 import 'package:flutter_tools/src/base/platform.dart';
 import 'package:flutter_tools/src/base/terminal.dart';
 import 'package:flutter_tools/src/cache.dart';
 import 'package:flutter_tools/src/commands/attach.dart';
+import 'package:flutter_tools/src/convert.dart';
 import 'package:flutter_tools/src/device.dart';
 import 'package:flutter_tools/src/ios/devices.dart';
 import 'package:flutter_tools/src/mdns_discovery.dart';
+import 'package:flutter_tools/src/project.dart';
 import 'package:flutter_tools/src/resident_runner.dart';
 import 'package:flutter_tools/src/run_hot.dart';
+import 'package:flutter_tools/src/vmservice.dart';
 import 'package:meta/meta.dart';
 import 'package:mockito/mockito.dart';
 import 'package:process/process.dart';
+import 'package:quiver/testing/async.dart';
 
 import '../../src/common.dart';
 import '../../src/context.dart';
 import '../../src/mocks.dart';
 
+
 void main() {
   group('attach', () {
     StreamLogger logger;
@@ -49,11 +56,16 @@
       MockDeviceLogReader mockLogReader;
       MockPortForwarder portForwarder;
       MockAndroidDevice device;
+      MockProcessManager mockProcessManager;
+      MockHttpClient httpClient;
+      Completer<void> vmServiceDoneCompleter;
 
       setUp(() {
+        mockProcessManager = MockProcessManager();
         mockLogReader = MockDeviceLogReader();
         portForwarder = MockPortForwarder();
         device = MockAndroidDevice();
+        vmServiceDoneCompleter = Completer<void>();
         when(device.portForwarder)
           .thenReturn(portForwarder);
         when(portForwarder.forward(devicePort, hostPort: anyNamed('hostPort')))
@@ -63,6 +75,14 @@
         when(portForwarder.unforward(any))
           .thenAnswer((_) async => null);
 
+        final HttpClientRequest httpClientRequest = MockHttpClientRequest();
+        httpClient = MockHttpClient();
+        when(httpClient.putUrl(any))
+          .thenAnswer((_) => Future<HttpClientRequest>.value(httpClientRequest));
+        when(httpClientRequest.headers).thenReturn(MockHttpHeaders());
+        when(httpClientRequest.close())
+          .thenAnswer((_) => Future<HttpClientResponse>.value(MockHttpClientResponse()));
+
         // We cannot add the device to a device manager because that is
         // only enabled by the context of each testUsingContext call.
         //
@@ -77,17 +97,15 @@
       testUsingContext('finds observatory port and forwards', () async {
         when(device.getLogReader()).thenAnswer((_) {
           // Now that the reader is used, start writing messages to it.
-          Timer.run(() {
-            mockLogReader.addLine('Foo');
-            mockLogReader.addLine('Observatory listening on http://127.0.0.1:$devicePort');
-          });
+          mockLogReader.addLine('Foo');
+          mockLogReader.addLine('Observatory listening on http://127.0.0.1:$devicePort');
           return mockLogReader;
         });
         testDeviceManager.addDevice(device);
         final Completer<void> completer = Completer<void>();
         final StreamSubscription<String> loggerSubscription = logger.stream.listen((String message) {
-          if (message == '[stdout] Done.') {
-            // The "Done." message is output by the AttachCommand when it's done.
+          if (message == '[verbose] Observatory URL on device: http://127.0.0.1:$devicePort') {
+            // The "Observatory URL on device" message is output by the ProtocolDiscovery when it found the observatory.
             completer.complete();
           }
         });
@@ -96,6 +114,7 @@
         verify(
           portForwarder.forward(devicePort, hostPort: anyNamed('hostPort')),
         ).called(1);
+        await mockLogReader.dispose();
         await expectLoggerInterruptEndsTask(task, logger);
         await loggerSubscription.cancel();
       }, overrides: <Type, Generator>{
@@ -104,13 +123,113 @@
         Logger: () => logger,
       });
 
+      testUsingContext('finds all observatory ports and forwards them', () async {
+        testFileSystem.file(testFileSystem.path.join('.packages')).createSync();
+        testFileSystem.file(testFileSystem.path.join('lib', 'main.dart')).createSync();
+        testFileSystem
+          .file(testFileSystem.path.join('build', 'flutter_assets', 'AssetManifest.json'))
+          ..createSync(recursive: true)
+          ..writeAsStringSync('{}');
+
+        when(device.name).thenReturn('MockAndroidDevice');
+        when(device.getLogReader()).thenReturn(mockLogReader);
+
+        final Process dartProcess = MockProcess();
+        final StreamController<List<int>> compilerStdoutController = StreamController<List<int>>();
+
+        when(dartProcess.stdout).thenAnswer((_) => compilerStdoutController.stream);
+        when(dartProcess.stderr)
+          .thenAnswer((_) => Stream<List<int>>.fromFuture(Future<List<int>>.value(const <int>[])));
+
+        when(dartProcess.stdin).thenAnswer((_) => MockStdIn());
+
+        final Completer<int> dartProcessExitCode = Completer<int>();
+        when(dartProcess.exitCode).thenAnswer((_) => dartProcessExitCode.future);
+        when(mockProcessManager.start(any)).thenAnswer((_) => Future<Process>.value(dartProcess));
+
+        testDeviceManager.addDevice(device);
+
+        final List<String> observatoryLogs = <String>[];
+
+        await FakeAsync().run((FakeAsync time) {
+          unawaited(runZoned(() async {
+            final StreamSubscription<String> loggerSubscription = logger.stream.listen((String message) {
+              // The "Observatory URL on device" message is output by the ProtocolDiscovery when it found the observatory.
+              if (message.startsWith('[verbose] Observatory URL on device')) {
+                observatoryLogs.add(message);
+              }
+              if (message == '[stdout] Waiting for a connection from Flutter on MockAndroidDevice...') {
+                observatoryLogs.add(message);
+              }
+              if (message == '[stdout] Lost connection to device.') {
+                observatoryLogs.add(message);
+              }
+              if (message.contains('To hot reload changes while running, press "r". To hot restart (and rebuild state), press "R".')) {
+                observatoryLogs.add(message);
+              }
+            });
+
+            final TestHotRunnerFactory testHotRunnerFactory = TestHotRunnerFactory();
+            final Future<void> task = createTestCommandRunner(
+                AttachCommand(hotRunnerFactory: testHotRunnerFactory)
+              ).run(<String>['attach']);
+
+            // First iteration of the attach loop.
+            mockLogReader.addLine('Observatory listening on http://127.0.0.1:0001');
+            mockLogReader.addLine('Observatory listening on http://127.0.0.1:1234');
+
+            time.elapse(const Duration(milliseconds: 200));
+
+            compilerStdoutController
+              .add(utf8.encode('result abc\nline1\nline2\nabc\nabc /path/to/main.dart.dill 0\n'));
+            time.flushMicrotasks();
+
+            // Second iteration of the attach loop.
+            mockLogReader.addLine('Observatory listening on http://127.0.0.1:0002');
+            mockLogReader.addLine('Observatory listening on http://127.0.0.1:1235');
+
+            time.elapse(const Duration(milliseconds: 200));
+
+            compilerStdoutController
+              .add(utf8.encode('result abc\nline1\nline2\nabc\nabc /path/to/main.dart.dill 0\n'));
+            time.flushMicrotasks();
+
+            dartProcessExitCode.complete(0);
+
+            await loggerSubscription.cancel();
+            await testHotRunnerFactory.exitApp();
+            await task;
+          }));
+        });
+
+        expect(observatoryLogs.length, 7);
+        expect(observatoryLogs[0], '[stdout] Waiting for a connection from Flutter on MockAndroidDevice...');
+        expect(observatoryLogs[1], '[verbose] Observatory URL on device: http://127.0.0.1:1234');
+        expect(observatoryLogs[2], '[stdout] Lost connection to device.');
+        expect(observatoryLogs[3].contains('To hot reload changes while running, press "r". To hot restart (and rebuild state), press "R"'), isTrue);
+        expect(observatoryLogs[4], '[verbose] Observatory URL on device: http://127.0.0.1:1235');
+        expect(observatoryLogs[5], '[stdout] Lost connection to device.');
+        expect(observatoryLogs[6].contains('To hot reload changes while running, press "r". To hot restart (and rebuild state), press "R"'), isTrue);
+
+        verify(portForwarder.forward(1234, hostPort: anyNamed('hostPort'))).called(1);
+        verify(portForwarder.forward(1235, hostPort: anyNamed('hostPort'))).called(1);
+
+      }, overrides: <Type, Generator>{
+        FileSystem: () => testFileSystem,
+        HttpClientFactory: () => () => httpClient,
+        ProcessManager: () => mockProcessManager,
+        Logger: () => logger,
+        VMServiceConnector: () => getFakeVmServiceFactory(
+          vmServiceDoneCompleter: vmServiceDoneCompleter,
+        ),
+      });
+
       testUsingContext('Fails with tool exit on bad Observatory uri', () async {
         when(device.getLogReader()).thenAnswer((_) {
           // Now that the reader is used, start writing messages to it.
-          Timer.run(() {
-            mockLogReader.addLine('Foo');
-            mockLogReader.addLine('Observatory listening on http:/:/127.0.0.1:$devicePort');
-          });
+          mockLogReader.addLine('Foo');
+          mockLogReader.addLine('Observatory listening on http:/:/127.0.0.1:$devicePort');
+          mockLogReader.dispose();
           return mockLogReader;
         });
         testDeviceManager.addDevice(device);
@@ -125,10 +244,8 @@
       testUsingContext('accepts filesystem parameters', () async {
         when(device.getLogReader()).thenAnswer((_) {
           // Now that the reader is used, start writing messages to it.
-          Timer.run(() {
-            mockLogReader.addLine('Foo');
-            mockLogReader.addLine('Observatory listening on http://127.0.0.1:$devicePort');
-          });
+          mockLogReader.addLine('Foo');
+          mockLogReader.addLine('Observatory listening on http://127.0.0.1:$devicePort');
           return mockLogReader;
         });
         testDeviceManager.addDevice(device);
@@ -141,6 +258,8 @@
         final MockHotRunner mockHotRunner = MockHotRunner();
         when(mockHotRunner.attach(appStartedCompleter: anyNamed('appStartedCompleter')))
             .thenAnswer((_) async => 0);
+        when(mockHotRunner.exited).thenReturn(false);
+        when(mockHotRunner.isWaitingForObservatory).thenReturn(false);
 
         final MockHotRunnerFactory mockHotRunnerFactory = MockHotRunnerFactory();
         when(
@@ -204,10 +323,8 @@
       testUsingContext('exits when ipv6 is specified and debug-port is not', () async {
         when(device.getLogReader()).thenAnswer((_) {
           // Now that the reader is used, start writing messages to it.
-          Timer.run(() {
-            mockLogReader.addLine('Foo');
-            mockLogReader.addLine('Observatory listening on http://127.0.0.1:$devicePort');
-          });
+          mockLogReader.addLine('Foo');
+          mockLogReader.addLine('Observatory listening on http://127.0.0.1:$devicePort');
           return mockLogReader;
         });
         testDeviceManager.addDevice(device);
@@ -228,10 +345,8 @@
       testUsingContext('exits when observatory-port is specified and debug-port is not', () async {
         when(device.getLogReader()).thenAnswer((_) {
           // Now that the reader is used, start writing messages to it.
-          Timer.run(() {
-            mockLogReader.addLine('Foo');
-            mockLogReader.addLine('Observatory listening on http://127.0.0.1:$devicePort');
-          });
+          mockLogReader.addLine('Foo');
+          mockLogReader.addLine('Observatory listening on http://127.0.0.1:$devicePort');
           return mockLogReader;
         });
         testDeviceManager.addDevice(device);
@@ -250,7 +365,6 @@
       },);
     });
 
-
     testUsingContext('selects specified target', () async {
       const int devicePort = 499;
       const int hostPort = 42;
@@ -277,16 +391,16 @@
         flutterProject: anyNamed('flutterProject'),
         ipv6: false,
       )).thenReturn(mockHotRunner);
+      when(mockHotRunner.exited).thenReturn(false);
+      when(mockHotRunner.isWaitingForObservatory).thenReturn(false);
 
       testDeviceManager.addDevice(device);
       when(device.getLogReader())
         .thenAnswer((_) {
           // Now that the reader is used, start writing messages to it.
-          Timer.run(() {
-            mockLogReader.addLine('Foo');
-            mockLogReader.addLine(
-                'Observatory listening on http://127.0.0.1:$devicePort');
-          });
+          mockLogReader.addLine('Foo');
+          mockLogReader.addLine(
+              'Observatory listening on http://127.0.0.1:$devicePort');
           return mockLogReader;
         });
       final File foo = fs.file('lib/foo.dart')
@@ -570,3 +684,84 @@
     expect(error.exitCode, 2); // ...with exit code 2.
   }
 }
+
+VMServiceConnector getFakeVmServiceFactory({
+  @required Completer<void> vmServiceDoneCompleter,
+}) {
+  assert(vmServiceDoneCompleter != null);
+
+  return (
+    Uri httpUri, {
+    ReloadSources reloadSources,
+    Restart restart,
+    CompileExpression compileExpression,
+    CompressionOptions compression,
+  }) async {
+    final VMService vmService = VMServiceMock();
+    final VM vm = VMMock();
+
+    when(vmService.vm).thenReturn(vm);
+    when(vmService.isClosed).thenReturn(false);
+    when(vmService.done).thenAnswer((_) {
+      return Future<void>.value(null);
+    });
+
+    when(vm.refreshViews(waitForViews: anyNamed('waitForViews')))
+      .thenAnswer((_) => Future<void>.value(null));
+    when(vm.views)
+      .thenReturn(<FlutterView>[FlutterViewMock()]);
+    when(vm.createDevFS(any))
+      .thenAnswer((_) => Future<Map<String, dynamic>>.value(<String, dynamic>{'uri': '/',}));
+
+    return vmService;
+  };
+}
+
+class TestHotRunnerFactory extends HotRunnerFactory {
+  HotRunner _runner;
+
+  @override
+  HotRunner build(
+    List<FlutterDevice> devices, {
+    String target,
+    DebuggingOptions debuggingOptions,
+    bool benchmarkMode = false,
+    File applicationBinary,
+    bool hostIsIde = false,
+    String projectRootPath,
+    String packagesFilePath,
+    String dillOutputPath,
+    bool stayResident = true,
+    bool ipv6 = false,
+    FlutterProject flutterProject,
+  }) {
+    _runner ??= HotRunner(
+      devices,
+      target: target,
+      debuggingOptions: debuggingOptions,
+      benchmarkMode: benchmarkMode,
+      applicationBinary: applicationBinary,
+      hostIsIde: hostIsIde,
+      projectRootPath: projectRootPath,
+      packagesFilePath: packagesFilePath,
+      dillOutputPath: dillOutputPath,
+      stayResident: stayResident,
+      ipv6: ipv6,
+    );
+    return _runner;
+  }
+
+  Future<void> exitApp() async {
+    assert(_runner != null);
+    await _runner.exit();
+  }
+}
+
+class VMMock extends Mock implements VM {}
+class VMServiceMock extends Mock implements VMService {}
+class FlutterViewMock extends Mock implements FlutterView {}
+class MockProcessManager extends Mock implements ProcessManager {}
+class MockProcess extends Mock implements Process {}
+class MockHttpClientRequest extends Mock implements HttpClientRequest {}
+class MockHttpClientResponse extends Mock implements HttpClientResponse {}
+class MockHttpHeaders extends Mock implements HttpHeaders {}
\ No newline at end of file
diff --git a/packages/flutter_tools/test/general.shard/devfs_test.dart b/packages/flutter_tools/test/general.shard/devfs_test.dart
index bd5a5a6..0298b16 100644
--- a/packages/flutter_tools/test/general.shard/devfs_test.dart
+++ b/packages/flutter_tools/test/general.shard/devfs_test.dart
@@ -10,6 +10,7 @@
 import 'package:file/memory.dart';
 import 'package:flutter_tools/src/base/file_system.dart';
 import 'package:flutter_tools/src/base/io.dart';
+import 'package:flutter_tools/src/base/net.dart';
 import 'package:flutter_tools/src/compile.dart';
 import 'package:flutter_tools/src/devfs.dart';
 import 'package:flutter_tools/src/vmservice.dart';
@@ -159,6 +160,7 @@
       verify(httpRequest.close()).called(kFailedAttempts + 1);
     }, overrides: <Type, Generator>{
       FileSystem: () => fs,
+      HttpClientFactory: () => () => httpClient,
       ProcessManager: () => FakeProcessManager.any(),
     });
   });
@@ -208,6 +210,7 @@
       expect(report.success, true);
     }, overrides: <Type, Generator>{
       FileSystem: () => fs,
+      HttpClient: () => () => HttpClient(),
       ProcessManager: () => FakeProcessManager.any(),
     });
 
@@ -310,6 +313,7 @@
       expect(devFS.lastCompiled, isNot(previousCompile));
     }, overrides: <Type, Generator>{
       FileSystem: () => fs,
+      HttpClient: () => () => HttpClient(),
       ProcessManager: () => FakeProcessManager.any(),
     });
   });
diff --git a/packages/flutter_tools/test/general.shard/protocol_discovery_test.dart b/packages/flutter_tools/test/general.shard/protocol_discovery_test.dart
index db074a0..b8cacfa 100644
--- a/packages/flutter_tools/test/general.shard/protocol_discovery_test.dart
+++ b/packages/flutter_tools/test/general.shard/protocol_discovery_test.dart
@@ -6,6 +6,7 @@
 
 import 'package:flutter_tools/src/device.dart';
 import 'package:flutter_tools/src/protocol_discovery.dart';
+import 'package:quiver/testing/async.dart';
 
 import '../src/common.dart';
 import '../src/context.dart';
@@ -16,40 +17,47 @@
     MockDeviceLogReader logReader;
     ProtocolDiscovery discoverer;
 
+    /// Performs test set-up functionality that must be performed as part of
+    /// the `test()` pass and not part of the `setUp()` pass.
+    ///
+    /// This exists to make sure we're not creating an error that tries to
+    /// cross an error-zone boundary. Our use of `testUsingContext()` runs the
+    /// test code inside an error zone, but the `setUp()` code is not run in
+    /// any zone. This creates the potential for errors that try to cross
+    /// error-zone boundaries, which are considered uncaught.
+    ///
+    /// This also exists for cases where our initialization requires access to
+    /// a `Context` object, which is only set up inside the zone.
+    ///
+    /// These issues do not pertain to real code and are a test-only concern,
+    /// because in real code, the zone is set up in `main()`.
+    ///
+    /// See also: [runZoned]
+    void initialize({
+      int devicePort,
+      Duration throttleDuration = const Duration(milliseconds: 200),
+    }) {
+      logReader = MockDeviceLogReader();
+      discoverer = ProtocolDiscovery.observatory(
+        logReader,
+        ipv6: false,
+        hostPort: null,
+        devicePort: devicePort,
+        throttleDuration: throttleDuration,
+      );
+    }
+
+    testUsingContext('returns non-null uri future', () async {
+      initialize();
+      expect(discoverer.uri, isNotNull);
+    });
+
     group('no port forwarding', () {
-      int devicePort;
-
-      /// Performs test set-up functionality that must be performed as part of
-      /// the `test()` pass and not part of the `setUp()` pass.
-      ///
-      /// This exists to make sure we're not creating an error that tries to
-      /// cross an error-zone boundary. Our use of `testUsingContext()` runs the
-      /// test code inside an error zone, but the `setUp()` code is not run in
-      /// any zone. This creates the potential for errors that try to cross
-      /// error-zone boundaries, which are considered uncaught.
-      ///
-      /// This also exists for cases where our initialization requires access to
-      /// a `Context` object, which is only set up inside the zone.
-      ///
-      /// These issues do not pertain to real code and are a test-only concern,
-      /// because in real code, the zone is set up in `main()`.
-      ///
-      /// See also: [runZoned]
-      void initialize() {
-        logReader = MockDeviceLogReader();
-        discoverer = ProtocolDiscovery.observatory(logReader, ipv6: false, hostPort: null, devicePort: devicePort);
-      }
-
       tearDown(() {
         discoverer.cancel();
         logReader.dispose();
       });
 
-      testUsingContext('returns non-null uri future', () async {
-        initialize();
-        expect(discoverer.uri, isNotNull);
-      });
-
       testUsingContext('discovers uri if logs already produced output', () async {
         initialize();
         logReader.addLine('HELLO WORLD');
@@ -59,6 +67,28 @@
         expect('$uri', 'http://127.0.0.1:9999');
       });
 
+      testUsingContext('discovers uri if logs already produced output and no listener is attached', () async {
+        initialize();
+        logReader.addLine('HELLO WORLD');
+        logReader.addLine('Observatory listening on http://127.0.0.1:9999');
+
+        await Future<void>.delayed(Duration.zero);
+
+        final Uri uri = await discoverer.uri;
+        expect(uri, isNotNull);
+        expect(uri.port, 9999);
+        expect('$uri', 'http://127.0.0.1:9999');
+      });
+
+      testUsingContext('uri throws if logs produce bad line and no listener is attached', () async {
+        initialize();
+        logReader.addLine('Observatory listening on http://127.0.0.1:apple');
+
+        await Future<void>.delayed(Duration.zero);
+
+        expect(discoverer.uri, throwsA(isFormatException));
+      });
+
       testUsingContext('discovers uri if logs not yet produced output', () async {
         initialize();
         final Future<Uri> uriFuture = discoverer.uri;
@@ -78,9 +108,7 @@
 
       testUsingContext('uri throws if logs produce bad line', () async {
         initialize();
-        Timer.run(() {
-          logReader.addLine('Observatory listening on http://127.0.0.1:apple');
-        });
+        logReader.addLine('Observatory listening on http://127.0.0.1:apple');
         expect(discoverer.uri, throwsA(isFormatException));
       });
 
@@ -123,8 +151,7 @@
       });
 
       testUsingContext('skips uri if port does not match the requested vmservice - requested last', () async {
-        devicePort = 12346;
-        initialize();
+        initialize(devicePort: 12346);
         final Future<Uri> uriFuture = discoverer.uri;
         logReader.addLine('I/flutter : Observatory listening on http://127.0.0.1:12345/PTwjm8Ii8qg=/');
         logReader.addLine('I/flutter : Observatory listening on http://127.0.0.1:12346/PTwjm8Ii8qg=/');
@@ -134,8 +161,7 @@
       });
 
       testUsingContext('skips uri if port does not match the requested vmservice - requested first', () async {
-        devicePort = 12346;
-        initialize();
+        initialize(devicePort: 12346);
         final Future<Uri> uriFuture = discoverer.uri;
         logReader.addLine('I/flutter : Observatory listening on http://127.0.0.1:12346/PTwjm8Ii8qg=/');
         logReader.addLine('I/flutter : Observatory listening on http://127.0.0.1:12345/PTwjm8Ii8qg=/');
@@ -143,6 +169,86 @@
         expect(uri.port, 12346);
         expect('$uri', 'http://127.0.0.1:12346/PTwjm8Ii8qg=/');
       });
+
+      testUsingContext('first uri in the stream is the last one from the log', () async {
+        initialize();
+        logReader.addLine('I/flutter : Observatory listening on http://127.0.0.1:12346/PTwjm8Ii8qg=/');
+        logReader.addLine('I/flutter : Observatory listening on http://127.0.0.1:12345/PTwjm8Ii8qg=/');
+        final Uri uri = await discoverer.uris.first;
+        expect(uri.port, 12345);
+        expect('$uri', 'http://127.0.0.1:12345/PTwjm8Ii8qg=/');
+      });
+
+      testUsingContext('first uri in the stream is the last one from the log that matches the port', () async {
+        initialize(devicePort: 12345);
+        logReader.addLine('I/flutter : Observatory listening on http://127.0.0.1:12346/PTwjm8Ii8qg=/');
+        logReader.addLine('I/flutter : Observatory listening on http://127.0.0.1:12345/PTwjm8Ii8qg=/');
+        logReader.addLine('I/flutter : Observatory listening on http://127.0.0.1:12344/PTwjm8Ii8qg=/');
+        final Uri uri = await discoverer.uris.first;
+        expect(uri.port, 12345);
+        expect('$uri', 'http://127.0.0.1:12345/PTwjm8Ii8qg=/');
+      });
+
+      testUsingContext('uris in the stream are throttled', () async {
+        const Duration kThrottleDuration = Duration(milliseconds: 10);
+
+        FakeAsync().run((FakeAsync time) {
+          initialize(throttleDuration: kThrottleDuration);
+
+          final List<Uri> discoveredUris = <Uri>[];
+          discoverer.uris.listen((Uri uri) {
+            discoveredUris.add(uri);
+          });
+
+          logReader.addLine('I/flutter : Observatory listening on http://127.0.0.1:12346/PTwjm8Ii8qg=/');
+          logReader.addLine('I/flutter : Observatory listening on http://127.0.0.1:12345/PTwjm8Ii8qg=/');
+
+          time.elapse(kThrottleDuration);
+
+          logReader.addLine('I/flutter : Observatory listening on http://127.0.0.1:12344/PTwjm8Ii8qg=/');
+          logReader.addLine('I/flutter : Observatory listening on http://127.0.0.1:12343/PTwjm8Ii8qg=/');
+
+          time.elapse(kThrottleDuration);
+
+          expect(discoveredUris.length, 2);
+          expect(discoveredUris[0].port, 12345);
+          expect('${discoveredUris[0]}', 'http://127.0.0.1:12345/PTwjm8Ii8qg=/');
+          expect(discoveredUris[1].port, 12343);
+          expect('${discoveredUris[1]}', 'http://127.0.0.1:12343/PTwjm8Ii8qg=/');
+        });
+      });
+
+      testUsingContext('uris in the stream are throttled when they match the port', () async {
+        const Duration kThrottleTimeInMilliseconds = Duration(milliseconds: 10);
+
+        FakeAsync().run((FakeAsync time) {
+          initialize(
+            devicePort: 12345,
+            throttleDuration: kThrottleTimeInMilliseconds,
+          );
+
+          final List<Uri> discoveredUris = <Uri>[];
+          discoverer.uris.listen((Uri uri) {
+            discoveredUris.add(uri);
+          });
+
+          logReader.addLine('I/flutter : Observatory listening on http://127.0.0.1:12346/PTwjm8Ii8qg=/');
+          logReader.addLine('I/flutter : Observatory listening on http://127.0.0.1:12345/PTwjm8Ii8qg=/');
+
+          time.elapse(kThrottleTimeInMilliseconds);
+
+          logReader.addLine('I/flutter : Observatory listening on http://127.0.0.1:12345/PTwjm8Ii8qc=/');
+          logReader.addLine('I/flutter : Observatory listening on http://127.0.0.1:12344/PTwjm8Ii8qf=/');
+
+          time.elapse(kThrottleTimeInMilliseconds);
+
+          expect(discoveredUris.length, 2);
+          expect(discoveredUris[0].port, 12345);
+          expect('${discoveredUris[0]}', 'http://127.0.0.1:12345/PTwjm8Ii8qg=/');
+          expect(discoveredUris[1].port, 12345);
+          expect('${discoveredUris[1]}', 'http://127.0.0.1:12345/PTwjm8Ii8qc=/');
+        });
+      });
     });
 
     group('port forwarding', () {
@@ -164,7 +270,7 @@
         expect('$uri', 'http://127.0.0.1:99/PTwjm8Ii8qg=/');
 
         await discoverer.cancel();
-        logReader.dispose();
+        await logReader.dispose();
       });
 
       testUsingContext('specified port', () async {
@@ -185,7 +291,7 @@
         expect('$uri', 'http://127.0.0.1:1243/PTwjm8Ii8qg=/');
 
         await discoverer.cancel();
-        logReader.dispose();
+        await logReader.dispose();
       });
 
       testUsingContext('specified port zero', () async {
@@ -206,7 +312,7 @@
         expect('$uri', 'http://127.0.0.1:99/PTwjm8Ii8qg=/');
 
         await discoverer.cancel();
-        logReader.dispose();
+        await logReader.dispose();
       });
 
       testUsingContext('ipv6', () async {
@@ -227,7 +333,7 @@
         expect('$uri', 'http://[::1]:54777/PTwjm8Ii8qg=/');
 
         await discoverer.cancel();
-        logReader.dispose();
+        await logReader.dispose();
       });
 
       testUsingContext('ipv6 with Ascii Escape code', () async {
@@ -248,7 +354,7 @@
         expect('$uri', 'http://[::1]:54777/PTwjm8Ii8qg=/');
 
         await discoverer.cancel();
-        logReader.dispose();
+        await logReader.dispose();
       });
     });
   });
diff --git a/packages/flutter_tools/test/general.shard/resident_runner_test.dart b/packages/flutter_tools/test/general.shard/resident_runner_test.dart
index ceeb55d..00d0e28 100644
--- a/packages/flutter_tools/test/general.shard/resident_runner_test.dart
+++ b/packages/flutter_tools/test/general.shard/resident_runner_test.dart
@@ -95,9 +95,7 @@
     when(mockFlutterView.uiIsolate).thenReturn(mockIsolate);
     when(mockFlutterView.runFromSource(any, any, any)).thenAnswer((Invocation invocation) async {});
     when(mockFlutterDevice.stopEchoingDeviceLog()).thenAnswer((Invocation invocation) async { });
-    when(mockFlutterDevice.observatoryUris).thenReturn(<Uri>[
-      testUri,
-    ]);
+    when(mockFlutterDevice.observatoryUris).thenAnswer((_) => Stream<Uri>.value(testUri));
     when(mockFlutterDevice.connect(
       reloadSources: anyNamed('reloadSources'),
       restart: anyNamed('restart'),
@@ -636,7 +634,7 @@
     final TestFlutterDevice flutterDevice = TestFlutterDevice(
       mockDevice,
       <FlutterView>[],
-      observatoryUris: <Uri>[ testUri ]
+      observatoryUris: Stream<Uri>.value(testUri),
     );
 
     await flutterDevice.connect();
@@ -657,7 +655,7 @@
 class MockProcessManager extends Mock implements ProcessManager {}
 class MockServiceEvent extends Mock implements ServiceEvent {}
 class TestFlutterDevice extends FlutterDevice {
-  TestFlutterDevice(Device device, this.views, { List<Uri> observatoryUris })
+  TestFlutterDevice(Device device, this.views, { Stream<Uri> observatoryUris })
     : super(device, buildMode: BuildMode.debug, trackWidgetCreation: false) {
     _observatoryUris = observatoryUris;
   }
@@ -666,8 +664,8 @@
   final List<FlutterView> views;
 
   @override
-  List<Uri> get observatoryUris => _observatoryUris;
-  List<Uri> _observatoryUris;
+  Stream<Uri> get observatoryUris => _observatoryUris;
+  Stream<Uri> _observatoryUris;
 }
 
 class ThrowingForwardingFileSystem extends ForwardingFileSystem {
diff --git a/packages/flutter_tools/test/src/mocks.dart b/packages/flutter_tools/test/src/mocks.dart
index 9f6c9f1..cde2e80 100644
--- a/packages/flutter_tools/test/src/mocks.dart
+++ b/packages/flutter_tools/test/src/mocks.dart
@@ -534,6 +534,12 @@
   bool isSupported() => true;
 
   @override
+  bool get supportsHotRestart => true;
+
+  @override
+  bool get supportsFlutterExit => false;
+
+  @override
   bool isSupportedForProject(FlutterProject flutterProject) => true;
 }
 
@@ -563,16 +569,33 @@
   @override
   String get name => 'MockLogReader';
 
-  final StreamController<String> _linesController = StreamController<String>.broadcast();
+  StreamController<String> _cachedLinesController;
+
+  final List<String> _lineQueue = <String>[];
+  StreamController<String> get _linesController {
+    _cachedLinesController ??= StreamController<String>
+      .broadcast(onListen: () {
+        _lineQueue.forEach(_linesController.add);
+        _lineQueue.clear();
+     });
+    return _cachedLinesController;
+  }
 
   @override
   Stream<String> get logLines => _linesController.stream;
 
-  void addLine(String line) => _linesController.add(line);
+  void addLine(String line) {
+    if (_linesController.hasListener) {
+      _linesController.add(line);
+    } else {
+      _lineQueue.add(line);
+    }
+  }
 
   @override
-  void dispose() {
-    _linesController.close();
+  Future<void> dispose() async {
+    _lineQueue.clear();
+    await _linesController.close();
   }
 }