Eagerly wait for the driver extension on FlutterDriver.connect() (#56428)

diff --git a/packages/flutter_driver/lib/src/driver/vmservice_driver.dart b/packages/flutter_driver/lib/src/driver/vmservice_driver.dart
index af9a4fb..10859f5 100644
--- a/packages/flutter_driver/lib/src/driver/vmservice_driver.dart
+++ b/packages/flutter_driver/lib/src/driver/vmservice_driver.dart
@@ -8,7 +8,6 @@
 
 import 'package:file/file.dart' as f;
 import 'package:fuchsia_remote_debug_protocol/fuchsia_remote_debug_protocol.dart' as fuchsia;
-import 'package:json_rpc_2/error_code.dart' as error_code;
 import 'package:json_rpc_2/json_rpc_2.dart' as rpc;
 import 'package:meta/meta.dart';
 import 'package:path/path.dart' as p;
@@ -150,11 +149,37 @@
     }
 
     /// Waits for a signal from the VM service that the extension is registered.
-    /// Returns [_flutterExtensionMethodName]
-    Future<String> waitForServiceExtension() {
-      return isolate.onExtensionAdded.firstWhere((String extension) {
-        return extension == _flutterExtensionMethodName;
-      });
+    ///
+    /// Looks at the list of loaded extensions for the current [isolateRef], as
+    /// well as the stream of added extensions.
+    Future<void> waitForServiceExtension() async {
+      final Future<void> extensionAlreadyAdded = isolateRef
+        .loadRunnable()
+        .then((VMIsolate isolate) async {
+          if (isolate.extensionRpcs.contains(_flutterExtensionMethodName)) {
+            return;
+          }
+          // Never complete. Rely on the stream listener to find the service
+          // extension instead.
+          return Completer<void>().future;
+        });
+
+      final Completer<void> extensionAdded = Completer<void>();
+      StreamSubscription<String> isolateAddedSubscription;
+      isolateAddedSubscription = isolate.onExtensionAdded.listen(
+        (String extensionName) {
+          if (extensionName == _flutterExtensionMethodName) {
+            extensionAdded.complete();
+            isolateAddedSubscription.cancel();
+          }
+        },
+        onError: extensionAdded.completeError,
+        cancelOnError: true);
+
+      await Future.any(<Future<void>>[
+        extensionAlreadyAdded,
+        extensionAdded.future,
+      ]);
     }
 
     /// Tells the Dart VM Service to notify us about "Isolate" events.
@@ -174,25 +199,7 @@
     if (isolate.pauseEvent is VMPauseStartEvent) {
       _log('Isolate is paused at start.');
 
-      // If the isolate is paused at the start, e.g. via the --start-paused
-      // option, then the VM service extension is not registered yet. Wait for
-      // it to be registered.
-      await enableIsolateStreams();
-      final Future<String> whenServiceExtensionReady = waitForServiceExtension();
-      final Future<dynamic> whenResumed = resumeLeniently();
-      await whenResumed;
-
-      _log('Waiting for service extension');
-      // We will never receive the extension event if the user does not
-      // register it. If that happens, show a message but continue waiting.
-      await _warnIfSlow<String>(
-        future: whenServiceExtensionReady,
-        timeout: kUnusuallyLongTimeout,
-        message: 'Flutter Driver extension is taking a long time to become available. '
-            'Ensure your test app (often "lib/main.dart") imports '
-            '"package:flutter_driver/driver_extension.dart" and '
-            'calls enableFlutterDriverExtension() as the first call in main().',
-      );
+      await resumeLeniently();
     } else if (isolate.pauseEvent is VMPauseExitEvent ||
         isolate.pauseEvent is VMPauseBreakpointEvent ||
         isolate.pauseEvent is VMPauseExceptionEvent ||
@@ -210,27 +217,20 @@
       );
     }
 
-    // Invoked checkHealth and try to fix delays in the registration of Service
-    // extensions
-    Future<Health> checkHealth() async {
-      try {
-        // At this point the service extension must be installed. Verify it.
-        return await driver.checkHealth();
-      } on rpc.RpcException catch (e) {
-        if (e.code != error_code.METHOD_NOT_FOUND) {
-          rethrow;
-        }
-        _log(
-            'Check Health failed, try to wait for the service extensions to be '
-                'registered.'
-        );
-        await enableIsolateStreams();
-        await waitForServiceExtension();
-        return driver.checkHealth();
-      }
-    }
+    await enableIsolateStreams();
 
-    final Health health = await checkHealth();
+    // We will never receive the extension event if the user does not register
+    // it. If that happens, show a message but continue waiting.
+    await _warnIfSlow<void>(
+      future: waitForServiceExtension(),
+      timeout: kUnusuallyLongTimeout,
+      message: 'Flutter Driver extension is taking a long time to become available. '
+          'Ensure your test app (often "lib/main.dart") imports '
+          '"package:flutter_driver/driver_extension.dart" and '
+          'calls enableFlutterDriverExtension() as the first call in main().',
+    );
+
+    final Health health = await driver.checkHealth();
     if (health.status != HealthStatus.ok) {
       await client.close();
       throw DriverError('Flutter application health check failed.');
diff --git a/packages/flutter_driver/test/flutter_driver_test.dart b/packages/flutter_driver/test/flutter_driver_test.dart
index f60ad1b..b0d66d4 100644
--- a/packages/flutter_driver/test/flutter_driver_test.dart
+++ b/packages/flutter_driver/test/flutter_driver_test.dart
@@ -49,6 +49,10 @@
       when(mockClient.getVM()).thenAnswer((_) => Future<MockVM>.value(mockVM));
       when(mockVM.isolates).thenReturn(<VMRunnableIsolate>[mockIsolate]);
       when(mockIsolate.loadRunnable()).thenAnswer((_) => Future<MockIsolate>.value(mockIsolate));
+      when(mockIsolate.extensionRpcs).thenReturn(<String>[]);
+      when(mockIsolate.onExtensionAdded).thenAnswer((Invocation invocation) {
+        return Stream<String>.fromIterable(<String>['ext.flutter.driver']);
+      });
       when(mockIsolate.invokeExtension(any, any)).thenAnswer(
           (Invocation invocation) => makeMockResponse(<String, dynamic>{'status': 'ok'}));
       vmServiceConnectFunction = (String url, {Map<String, dynamic> headers}) {
@@ -81,7 +85,7 @@
       final FlutterDriver driver = await FlutterDriver.connect(dartVmServiceUrl: '');
       expect(driver, isNotNull);
       expectLogContains('Isolate is paused at start');
-      expect(connectionLog, <String>['streamListen', 'onExtensionAdded', 'resume']);
+      expect(connectionLog, <String>['resume', 'streamListen', 'onExtensionAdded']);
     });
 
     test('connects to isolate paused mid-flight', () async {
@@ -117,6 +121,18 @@
       expectLogContains('Isolate is not paused. Assuming application is ready.');
     });
 
+    test('connects to unpaused when onExtensionAdded does not contain the '
+      'driver extension', () async {
+      when(mockIsolate.pauseEvent).thenReturn(MockVMResumeEvent());
+      when(mockIsolate.extensionRpcs).thenReturn(<String>['ext.flutter.driver']);
+      when(mockIsolate.onExtensionAdded).thenAnswer((Invocation invocation) {
+        return const Stream<String>.empty();
+      });
+      final FlutterDriver driver = await FlutterDriver.connect(dartVmServiceUrl: '');
+      expect(driver, isNotNull);
+      expectLogContains('Isolate is not paused. Assuming application is ready.');
+    });
+
     test('connects with headers', () async {
       Map<String, dynamic> actualHeaders;
       vmServiceConnectFunction = (String url, {Map<String, dynamic> headers}) {