[ Tool ] Add `Stream.transformWithCallSite` to provide more useful stack traces (#177470)

Exceptions thrown within a stream transformer don't provide any context
as to where the call to `transform(...)` occurred within the program,
often resulting in stack traces consisting of only Dart SDK sources.

This change adds a new extension method on `Stream` called
`transformWithCallSite`, which captures the current `StackTrace` upon
invocation. This stack trace is reported in the case of an error in
order to provide context for better error reporting.

Example issue: https://github.com/flutter/flutter/issues/81666
diff --git a/packages/flutter_tools/lib/src/android/android_device.dart b/packages/flutter_tools/lib/src/android/android_device.dart
index d197321..9e597d0 100644
--- a/packages/flutter_tools/lib/src/android/android_device.dart
+++ b/packages/flutter_tools/lib/src/android/android_device.dart
@@ -15,6 +15,7 @@
 import '../base/logger.dart';
 import '../base/platform.dart';
 import '../base/process.dart';
+import '../base/utils.dart';
 import '../build_info.dart';
 import '../convert.dart';
 import '../device.dart';
@@ -1118,13 +1119,10 @@
     // We expect logcat streams to occasionally contain invalid utf-8,
     // see: https://github.com/flutter/flutter/pull/8864.
     const decoder = Utf8Decoder(reportErrors: false);
-    _adbProcess.stdout
-        .transform<String>(decoder)
-        .transform<String>(const LineSplitter())
-        .listen(_onLine);
+    _adbProcess.stdout.transform(utf8LineDecoder).listen(_onLine);
     _adbProcess.stderr
-        .transform<String>(decoder)
-        .transform<String>(const LineSplitter())
+        .transformWithCallSite(decoder)
+        .transform(const LineSplitter())
         .listen(_onLine);
     unawaited(
       _adbProcess.exitCode.whenComplete(() {
diff --git a/packages/flutter_tools/lib/src/android/android_emulator.dart b/packages/flutter_tools/lib/src/android/android_emulator.dart
index 126a7d5..e25304c 100644
--- a/packages/flutter_tools/lib/src/android/android_emulator.dart
+++ b/packages/flutter_tools/lib/src/android/android_emulator.dart
@@ -11,7 +11,7 @@
 import '../base/io.dart';
 import '../base/logger.dart';
 import '../base/process.dart';
-import '../convert.dart';
+import '../base/utils.dart';
 import '../device.dart';
 import '../emulator.dart';
 import 'android_sdk.dart';
@@ -159,12 +159,10 @@
     final stdoutList = <String>[];
     final stderrList = <String>[];
     final StreamSubscription<String> stdoutSubscription = process.stdout
-        .transform<String>(utf8.decoder)
-        .transform<String>(const LineSplitter())
+        .transform(utf8LineDecoder)
         .listen(stdoutList.add);
     final StreamSubscription<String> stderrSubscription = process.stderr
-        .transform<String>(utf8.decoder)
-        .transform<String>(const LineSplitter())
+        .transform(utf8LineDecoder)
         .listen(stderrList.add);
     final Future<void> stdioFuture = Future.wait<void>(<Future<void>>[
       stdoutSubscription.asFuture<void>(),
diff --git a/packages/flutter_tools/lib/src/base/process.dart b/packages/flutter_tools/lib/src/base/process.dart
index 5e8fabf..187f5bf 100644
--- a/packages/flutter_tools/lib/src/base/process.dart
+++ b/packages/flutter_tools/lib/src/base/process.dart
@@ -13,6 +13,7 @@
 import 'exit.dart';
 import 'io.dart';
 import 'logger.dart';
+import 'utils.dart';
 
 typedef StringConverter = String? Function(String string);
 
@@ -561,8 +562,7 @@
       environment: environment,
     );
     final StreamSubscription<String> stdoutSubscription = process.stdout
-        .transform<String>(utf8.decoder)
-        .transform<String>(const LineSplitter())
+        .transform(utf8LineDecoder)
         .where((String line) => filter == null || filter.hasMatch(line))
         .listen((String line) {
           String? mappedLine = line;
@@ -581,8 +581,7 @@
           }
         });
     final StreamSubscription<String> stderrSubscription = process.stderr
-        .transform<String>(utf8.decoder)
-        .transform<String>(const LineSplitter())
+        .transform(utf8LineDecoder)
         .where((String line) => filter == null || filter.hasMatch(line))
         .listen((String line) {
           String? mappedLine = line;
diff --git a/packages/flutter_tools/lib/src/base/utils.dart b/packages/flutter_tools/lib/src/base/utils.dart
index 043a801..43bb34d 100644
--- a/packages/flutter_tools/lib/src/base/utils.dart
+++ b/packages/flutter_tools/lib/src/base/utils.dart
@@ -12,6 +12,7 @@
 import 'package:intl/intl.dart';
 import 'package:meta/meta.dart';
 import 'package:path/path.dart' as path; // flutter_ignore: package_path_import
+import 'package:stack_trace/stack_trace.dart';
 
 import '../convert.dart';
 import 'platform.dart';
@@ -580,3 +581,26 @@
     return Uri(scheme: scheme, userInfo: userInfo, host: host, port: port, path: this.path);
   }
 }
+
+extension StackTraceTransform<T> on Stream<T> {
+  /// A custom implementation of [transform] that captures the
+  /// stack trace at the point of invocation.
+  Stream<S> transformWithCallSite<S>(StreamTransformer<T, S> transformer) {
+    // Don't include this frame with the stack trace as it adds no value.
+    final callSiteTrace = Trace.current(1);
+    return transform(transformer).transform(
+      StreamTransformer.fromHandlers(
+        handleData: (data, sink) {
+          sink.add(data);
+        },
+        handleError: (error, stackTrace, sink) {
+          sink.addError(error, callSiteTrace);
+        },
+      ),
+    );
+  }
+}
+
+final utf8LineDecoder = StreamTransformer<List<int>, String>.fromBind(
+  (stream) => stream.transformWithCallSite(utf8.decoder).transform(const LineSplitter()),
+);
diff --git a/packages/flutter_tools/lib/src/commands/symbolize.dart b/packages/flutter_tools/lib/src/commands/symbolize.dart
index 92177ed..54bc92d 100644
--- a/packages/flutter_tools/lib/src/commands/symbolize.dart
+++ b/packages/flutter_tools/lib/src/commands/symbolize.dart
@@ -11,6 +11,7 @@
 import '../base/common.dart';
 import '../base/file_system.dart';
 import '../base/io.dart';
+import '../base/utils.dart';
 import '../convert.dart';
 import '../runner/flutter_command.dart';
 
@@ -175,7 +176,7 @@
       output = outputFile.openWrite();
     } else {
       final outputController = StreamController<List<int>>();
-      outputController.stream.transform(utf8.decoder).listen(_stdio.stdoutWrite);
+      outputController.stream.transformWithCallSite(utf8.decoder).listen(_stdio.stdoutWrite);
       output = IOSink(outputController);
     }
 
@@ -299,9 +300,8 @@
     StreamSubscription<void>? subscription;
     subscription = input
         .cast<List<int>>()
-        .transform(const Utf8Decoder())
-        .transform(const LineSplitter())
-        .transform(unitSymbolsTransformer(unitSymbols))
+        .transform(utf8LineDecoder)
+        .transformWithCallSite(unitSymbolsTransformer(unitSymbols))
         .listen(
           (String line) {
             try {
diff --git a/packages/flutter_tools/lib/src/compile.dart b/packages/flutter_tools/lib/src/compile.dart
index e710151..c878069 100644
--- a/packages/flutter_tools/lib/src/compile.dart
+++ b/packages/flutter_tools/lib/src/compile.dart
@@ -17,6 +17,7 @@
 import 'base/logger.dart';
 import 'base/platform.dart';
 import 'base/process.dart';
+import 'base/utils.dart';
 import 'build_info.dart';
 import 'convert.dart';
 
@@ -378,10 +379,7 @@
     final Process server = await _processManager.start(command);
 
     server.stderr.transform<String>(utf8.decoder).listen(_logger.printError);
-    server.stdout
-        .transform<String>(utf8.decoder)
-        .transform<String>(const LineSplitter())
-        .listen(_stdoutHandler.handler);
+    server.stdout.transform(utf8LineDecoder).listen(_stdoutHandler.handler);
     final int exitCode = await server.exitCode;
     if (exitCode == 0) {
       return _stdoutHandler.compilerOutput?.future;
@@ -893,8 +891,7 @@
     _logger.printTrace(command.join(' '));
     _server = await _processManager.start(command);
     _server?.stdout
-        .transform<String>(utf8.decoder)
-        .transform<String>(const LineSplitter())
+        .transform(utf8LineDecoder)
         .listen(
           _stdoutHandler.handler,
           onDone: () {
@@ -907,10 +904,7 @@
           },
         );
 
-    _server?.stderr
-        .transform<String>(utf8.decoder)
-        .transform<String>(const LineSplitter())
-        .listen(_logger.printError);
+    _server?.stderr.transform(utf8LineDecoder).listen(_logger.printError);
 
     unawaited(
       _server?.exitCode.then((int code) {
diff --git a/packages/flutter_tools/lib/src/dart/analysis.dart b/packages/flutter_tools/lib/src/dart/analysis.dart
index 4c2a3d6..e4bddf3 100644
--- a/packages/flutter_tools/lib/src/dart/analysis.dart
+++ b/packages/flutter_tools/lib/src/dart/analysis.dart
@@ -75,14 +75,10 @@
     // This callback hookup can't throw.
     unawaited(_process!.exitCode.whenComplete(() => _process = null));
 
-    final Stream<String> errorStream = _process!.stderr
-        .transform<String>(utf8.decoder)
-        .transform<String>(const LineSplitter());
+    final Stream<String> errorStream = _process!.stderr.transform(utf8LineDecoder);
     errorStream.listen(_handleError);
 
-    final Stream<String> inStream = _process!.stdout
-        .transform<String>(utf8.decoder)
-        .transform<String>(const LineSplitter());
+    final Stream<String> inStream = _process!.stdout.transform(utf8LineDecoder);
     inStream.listen(_handleServerResponse);
 
     _sendCommand('server.setSubscriptions', <String, dynamic>{
diff --git a/packages/flutter_tools/lib/src/debug_adapters/flutter_base_adapter.dart b/packages/flutter_tools/lib/src/debug_adapters/flutter_base_adapter.dart
index c3e5bfe..a93354b 100644
--- a/packages/flutter_tools/lib/src/debug_adapters/flutter_base_adapter.dart
+++ b/packages/flutter_tools/lib/src/debug_adapters/flutter_base_adapter.dart
@@ -10,6 +10,7 @@
 import '../base/file_system.dart';
 import '../base/io.dart';
 import '../base/platform.dart';
+import '../base/utils.dart';
 import '../cache.dart';
 import '../convert.dart';
 import 'flutter_adapter_args.dart';
@@ -165,8 +166,8 @@
         }(executable, processArgs, env: env);
     this.process = process;
 
-    process.stdout.transform(ByteToLineTransformer()).listen(handleStdout);
-    process.stderr.transform(utf8.decoder).listen(handleStderr);
+    process.stdout.transformWithCallSite(ByteToLineTransformer()).listen(handleStdout);
+    process.stderr.transformWithCallSite(utf8.decoder).listen(handleStderr);
     unawaited(process.exitCode.then(handleExitCode));
   }
 
diff --git a/packages/flutter_tools/lib/src/desktop_device.dart b/packages/flutter_tools/lib/src/desktop_device.dart
index e080ac2..e485068 100644
--- a/packages/flutter_tools/lib/src/desktop_device.dart
+++ b/packages/flutter_tools/lib/src/desktop_device.dart
@@ -11,8 +11,8 @@
 import 'base/io.dart';
 import 'base/logger.dart';
 import 'base/os.dart';
+import 'base/utils.dart';
 import 'build_info.dart';
-import 'convert.dart';
 import 'devfs.dart';
 import 'device.dart';
 import 'device_port_forwarder.dart';
@@ -362,7 +362,7 @@
 
   @override
   Stream<String> get logLines {
-    return _inputController.stream.transform(utf8.decoder).transform(const LineSplitter());
+    return _inputController.stream.transform(utf8LineDecoder);
   }
 
   @override
diff --git a/packages/flutter_tools/lib/src/devtools_launcher.dart b/packages/flutter_tools/lib/src/devtools_launcher.dart
index 23ff51e..24d4fd1 100644
--- a/packages/flutter_tools/lib/src/devtools_launcher.dart
+++ b/packages/flutter_tools/lib/src/devtools_launcher.dart
@@ -12,7 +12,7 @@
 import 'base/common.dart';
 import 'base/io.dart' as io;
 import 'base/logger.dart';
-import 'convert.dart';
+import 'base/utils.dart';
 import 'resident_runner.dart';
 
 /// An implementation of the devtools launcher that uses `dart devtools` to
@@ -66,9 +66,7 @@
       _processStartCompleter.complete();
 
       final devToolsCompleter = Completer<Uri>();
-      _devToolsProcess!.stdout.transform(utf8.decoder).transform(const LineSplitter()).listen((
-        String line,
-      ) {
+      _devToolsProcess!.stdout.transform(utf8LineDecoder).listen((String line) {
         final Match? dtdMatch = _serveDtdPattern.firstMatch(line);
         if (dtdMatch != null) {
           final String uri = dtdMatch[1]!;
@@ -80,10 +78,7 @@
           devToolsCompleter.complete(Uri.parse(url));
         }
       });
-      _devToolsProcess!.stderr
-          .transform(utf8.decoder)
-          .transform(const LineSplitter())
-          .listen(_logger.printError);
+      _devToolsProcess!.stderr.transform(utf8LineDecoder).listen(_logger.printError);
 
       final bool runningOnBot = await _botDetector.isRunningOnBot;
       devToolsProcessExit = _devToolsProcess!.exitCode.then((int exitCode) {
diff --git a/packages/flutter_tools/lib/src/ios/core_devices.dart b/packages/flutter_tools/lib/src/ios/core_devices.dart
index 0083b3e..7607e8e 100644
--- a/packages/flutter_tools/lib/src/ios/core_devices.dart
+++ b/packages/flutter_tools/lib/src/ios/core_devices.dart
@@ -13,6 +13,7 @@
 import '../base/logger.dart';
 import '../base/process.dart';
 import '../base/template.dart';
+import '../base/utils.dart';
 import '../convert.dart';
 import '../device.dart';
 import '../macos/xcode.dart';
@@ -767,8 +768,7 @@
       coreDeviceLogForwarder.launchProcess = launchProcess;
 
       final StreamSubscription<String> stdoutSubscription = launchProcess.stdout
-          .transform<String>(utf8.decoder)
-          .transform<String>(const LineSplitter())
+          .transform(utf8LineDecoder)
           .listen((String line) {
             if (launchCompleter.isCompleted && !_ignoreLog(line)) {
               coreDeviceLogForwarder.addLog(line);
@@ -782,8 +782,7 @@
           });
 
       final StreamSubscription<String> stderrSubscription = launchProcess.stderr
-          .transform<String>(utf8.decoder)
-          .transform<String>(const LineSplitter())
+          .transform(utf8LineDecoder)
           .listen((String line) {
             if (launchCompleter.isCompleted && !_ignoreLog(line)) {
               coreDeviceLogForwarder.addLog(line);
diff --git a/packages/flutter_tools/lib/src/ios/devices.dart b/packages/flutter_tools/lib/src/ios/devices.dart
index 871a69d..69e2d51 100644
--- a/packages/flutter_tools/lib/src/ios/devices.dart
+++ b/packages/flutter_tools/lib/src/ios/devices.dart
@@ -1709,14 +1709,8 @@
       return;
     }
     _iMobileDevice.startLogger(_deviceId, _isWirelesslyConnected).then<void>((Process process) {
-      process.stdout
-          .transform<String>(utf8.decoder)
-          .transform<String>(const LineSplitter())
-          .listen(_newSyslogLineHandler());
-      process.stderr
-          .transform<String>(utf8.decoder)
-          .transform<String>(const LineSplitter())
-          .listen(_newSyslogLineHandler());
+      process.stdout.transform(utf8LineDecoder).listen(_newSyslogLineHandler());
+      process.stderr.transform(utf8LineDecoder).listen(_newSyslogLineHandler());
       process.exitCode.whenComplete(() {
         if (!linesController.hasListener) {
           return;
diff --git a/packages/flutter_tools/lib/src/ios/ios_deploy.dart b/packages/flutter_tools/lib/src/ios/ios_deploy.dart
index 2ec3210..e9de491 100644
--- a/packages/flutter_tools/lib/src/ios/ios_deploy.dart
+++ b/packages/flutter_tools/lib/src/ios/ios_deploy.dart
@@ -13,8 +13,8 @@
 import '../base/logger.dart';
 import '../base/platform.dart';
 import '../base/process.dart';
+import '../base/utils.dart';
 import '../cache.dart';
-import '../convert.dart';
 import '../device.dart';
 import 'code_signing.dart';
 
@@ -316,158 +316,157 @@
     try {
       _iosDeployProcess = await _processUtils.start(_launchCommand, environment: _iosDeployEnv);
       String? lastLineFromDebugger;
-      final StreamSubscription<String> stdoutSubscription = _iosDeployProcess!.stdout
-          .transform<String>(utf8.decoder)
-          .transform<String>(const LineSplitter())
-          .listen((String line) {
-            _monitorIOSDeployFailure(line, _logger);
+      final StreamSubscription<String>
+      stdoutSubscription = _iosDeployProcess!.stdout.transform(utf8LineDecoder).listen((
+        String line,
+      ) {
+        _monitorIOSDeployFailure(line, _logger);
 
-            // (lldb)    platform select remote-'ios' --sysroot
-            // Use the configurable custom lldb prompt in the regex. The developer can set this prompt to anything.
-            // For example `settings set prompt "(mylldb)"` in ~/.lldbinit results in:
-            // "(mylldb)    platform select remote-'ios' --sysroot"
-            if (_lldbPlatformSelect.hasMatch(line)) {
-              final String platformSelect = _lldbPlatformSelect.stringMatch(line) ?? '';
-              if (platformSelect.isEmpty) {
-                return;
-              }
-              final int promptEndIndex = line.indexOf(platformSelect);
-              if (promptEndIndex == -1) {
-                return;
-              }
-              final String prompt = line.substring(0, promptEndIndex);
-              lldbRun = RegExp(RegExp.escape(prompt) + r'\s*run');
-              _logger.printTrace(line);
-              return;
-            }
+        // (lldb)    platform select remote-'ios' --sysroot
+        // Use the configurable custom lldb prompt in the regex. The developer can set this prompt to anything.
+        // For example `settings set prompt "(mylldb)"` in ~/.lldbinit results in:
+        // "(mylldb)    platform select remote-'ios' --sysroot"
+        if (_lldbPlatformSelect.hasMatch(line)) {
+          final String platformSelect = _lldbPlatformSelect.stringMatch(line) ?? '';
+          if (platformSelect.isEmpty) {
+            return;
+          }
+          final int promptEndIndex = line.indexOf(platformSelect);
+          if (promptEndIndex == -1) {
+            return;
+          }
+          final String prompt = line.substring(0, promptEndIndex);
+          lldbRun = RegExp(RegExp.escape(prompt) + r'\s*run');
+          _logger.printTrace(line);
+          return;
+        }
 
-            // Symbol Path: /Users/swarming/Library/Developer/Xcode/iOS DeviceSupport/16.2 (20C65) arm64e/Symbols
-            if (_symbolsPathPattern.hasMatch(line)) {
-              _logger.printTrace('Detected path to iOS debug symbols: "$line"');
-              final String prefix = _symbolsPathPattern.stringMatch(line) ?? '';
-              if (prefix.isEmpty) {
-                return;
-              }
-              symbolsDirectoryPath = line.substring(prefix.length);
-              return;
-            }
+        // Symbol Path: /Users/swarming/Library/Developer/Xcode/iOS DeviceSupport/16.2 (20C65) arm64e/Symbols
+        if (_symbolsPathPattern.hasMatch(line)) {
+          _logger.printTrace('Detected path to iOS debug symbols: "$line"');
+          final String prefix = _symbolsPathPattern.stringMatch(line) ?? '';
+          if (prefix.isEmpty) {
+            return;
+          }
+          symbolsDirectoryPath = line.substring(prefix.length);
+          return;
+        }
 
-            // (lldb)     run
-            // success
-            // 2020-09-15 13:42:25.185474-0700 Runner[477:181141] flutter: The Dart VM service is listening on http://127.0.0.1:57782/
-            if (lldbRun.hasMatch(line)) {
-              _logger.printTrace(line);
-              _debuggerState = _IOSDeployDebuggerState.launching;
-              return;
-            }
-            // Next line after "run" must be "success", or the attach failed.
-            // Example: "error: process launch failed"
-            if (_debuggerState == _IOSDeployDebuggerState.launching) {
-              _logger.printTrace(line);
-              final attachSuccess = line == 'success';
-              _debuggerState = attachSuccess
-                  ? _IOSDeployDebuggerState.attached
-                  : _IOSDeployDebuggerState.detached;
-              if (!debuggerCompleter.isCompleted) {
-                debuggerCompleter.complete(attachSuccess);
-              }
-              return;
-            }
+        // (lldb)     run
+        // success
+        // 2020-09-15 13:42:25.185474-0700 Runner[477:181141] flutter: The Dart VM service is listening on http://127.0.0.1:57782/
+        if (lldbRun.hasMatch(line)) {
+          _logger.printTrace(line);
+          _debuggerState = _IOSDeployDebuggerState.launching;
+          return;
+        }
+        // Next line after "run" must be "success", or the attach failed.
+        // Example: "error: process launch failed"
+        if (_debuggerState == _IOSDeployDebuggerState.launching) {
+          _logger.printTrace(line);
+          final attachSuccess = line == 'success';
+          _debuggerState = attachSuccess
+              ? _IOSDeployDebuggerState.attached
+              : _IOSDeployDebuggerState.detached;
+          if (!debuggerCompleter.isCompleted) {
+            debuggerCompleter.complete(attachSuccess);
+          }
+          return;
+        }
 
-            // (lldb) process signal SIGSTOP
-            // or
-            // process signal SIGSTOP
-            if (line.contains(_signalStop)) {
-              // The app is about to be stopped. Only show in verbose mode.
-              _logger.printTrace(line);
-              return;
-            }
+        // (lldb) process signal SIGSTOP
+        // or
+        // process signal SIGSTOP
+        if (line.contains(_signalStop)) {
+          // The app is about to be stopped. Only show in verbose mode.
+          _logger.printTrace(line);
+          return;
+        }
 
-            // error: Failed to send signal 17: failed to send signal 17
-            if (line.contains(_signalStopError)) {
-              // The stop signal failed, force exit.
-              exit();
-              return;
-            }
+        // error: Failed to send signal 17: failed to send signal 17
+        if (line.contains(_signalStopError)) {
+          // The stop signal failed, force exit.
+          exit();
+          return;
+        }
 
-            if (line == _backTraceAll) {
-              // The app is stopped and the backtrace for all threads will be printed.
-              _logger.printTrace(line);
-              // Even though we're not "detached", just stopped, mark as detached so the backtrace
-              // is only show in verbose.
-              _debuggerState = _IOSDeployDebuggerState.detached;
+        if (line == _backTraceAll) {
+          // The app is stopped and the backtrace for all threads will be printed.
+          _logger.printTrace(line);
+          // Even though we're not "detached", just stopped, mark as detached so the backtrace
+          // is only show in verbose.
+          _debuggerState = _IOSDeployDebuggerState.detached;
 
-              // If we paused the app and are waiting to resume it, complete the completer
-              final Completer<void>? processResumeCompleter = _processResumeCompleter;
-              if (processResumeCompleter != null) {
-                _processResumeCompleter = null;
-                processResumeCompleter.complete();
-              }
-              return;
-            }
+          // If we paused the app and are waiting to resume it, complete the completer
+          final Completer<void>? processResumeCompleter = _processResumeCompleter;
+          if (processResumeCompleter != null) {
+            _processResumeCompleter = null;
+            processResumeCompleter.complete();
+          }
+          return;
+        }
 
-            if (line.contains('PROCESS_STOPPED') || _lldbProcessStopped.hasMatch(line)) {
-              // The app has been stopped. Dump the backtrace, and detach.
-              _logger.printTrace(line);
-              _iosDeployProcess?.stdin.writeln(_backTraceAll);
-              if (_processResumeCompleter == null) {
-                detach();
-              }
-              return;
-            }
+        if (line.contains('PROCESS_STOPPED') || _lldbProcessStopped.hasMatch(line)) {
+          // The app has been stopped. Dump the backtrace, and detach.
+          _logger.printTrace(line);
+          _iosDeployProcess?.stdin.writeln(_backTraceAll);
+          if (_processResumeCompleter == null) {
+            detach();
+          }
+          return;
+        }
 
-            if (line.contains('PROCESS_EXITED') || _lldbProcessExit.hasMatch(line)) {
-              // The app exited or crashed, so exit. Continue passing debugging
-              // messages to the log reader until it exits to capture crash dumps.
-              _logger.printTrace(line);
-              if (line.contains(_lostConnectionPattern)) {
-                _lostConnection = true;
-              }
-              exit();
-              return;
-            }
-            if (_lldbProcessDetached.hasMatch(line)) {
-              // The debugger has detached from the app, and there will be no more debugging messages.
-              // Kill the ios-deploy process.
-              _logger.printTrace(line);
-              exit();
-              return;
-            }
+        if (line.contains('PROCESS_EXITED') || _lldbProcessExit.hasMatch(line)) {
+          // The app exited or crashed, so exit. Continue passing debugging
+          // messages to the log reader until it exits to capture crash dumps.
+          _logger.printTrace(line);
+          if (line.contains(_lostConnectionPattern)) {
+            _lostConnection = true;
+          }
+          exit();
+          return;
+        }
+        if (_lldbProcessDetached.hasMatch(line)) {
+          // The debugger has detached from the app, and there will be no more debugging messages.
+          // Kill the ios-deploy process.
+          _logger.printTrace(line);
+          exit();
+          return;
+        }
 
-            if (_lldbProcessResuming.hasMatch(line)) {
-              _logger.printTrace(line);
-              // we marked this detached when we received [_backTraceAll]
-              _debuggerState = _IOSDeployDebuggerState.attached;
-              return;
-            }
+        if (_lldbProcessResuming.hasMatch(line)) {
+          _logger.printTrace(line);
+          // we marked this detached when we received [_backTraceAll]
+          _debuggerState = _IOSDeployDebuggerState.attached;
+          return;
+        }
 
-            if (_debuggerState != _IOSDeployDebuggerState.attached) {
-              _logger.printTrace(line);
-              return;
-            }
-            if (lastLineFromDebugger != null && lastLineFromDebugger!.isNotEmpty && line.isEmpty) {
-              // The lldb console stream from ios-deploy is separated lines by an extra \r\n.
-              // To avoid all lines being double spaced, if the last line from the
-              // debugger was not an empty line, skip this empty line.
-              // This will still cause "legit" logged newlines to be doubled...
-            } else if (!_debuggerOutput.isClosed) {
-              _debuggerOutput.add(line);
+        if (_debuggerState != _IOSDeployDebuggerState.attached) {
+          _logger.printTrace(line);
+          return;
+        }
+        if (lastLineFromDebugger != null && lastLineFromDebugger!.isNotEmpty && line.isEmpty) {
+          // The lldb console stream from ios-deploy is separated lines by an extra \r\n.
+          // To avoid all lines being double spaced, if the last line from the
+          // debugger was not an empty line, skip this empty line.
+          // This will still cause "legit" logged newlines to be doubled...
+        } else if (!_debuggerOutput.isClosed) {
+          _debuggerOutput.add(line);
 
-              // Sometimes the `ios-deploy` process does not return logs from the
-              // application after attaching, such as the Dart VM url. In CI,
-              // `idevicesyslog` is used as a fallback to get logs. Print a
-              // message to indicate whether logs were received from `ios-deploy`
-              // to help with debugging.
-              if (!receivedLogs) {
-                _logger.printTrace('Received logs from ios-deploy.');
-                receivedLogs = true;
-              }
-            }
-            lastLineFromDebugger = line;
-          });
+          // Sometimes the `ios-deploy` process does not return logs from the
+          // application after attaching, such as the Dart VM url. In CI,
+          // `idevicesyslog` is used as a fallback to get logs. Print a
+          // message to indicate whether logs were received from `ios-deploy`
+          // to help with debugging.
+          if (!receivedLogs) {
+            _logger.printTrace('Received logs from ios-deploy.');
+            receivedLogs = true;
+          }
+        }
+        lastLineFromDebugger = line;
+      });
       final StreamSubscription<String> stderrSubscription = _iosDeployProcess!.stderr
-          .transform<String>(utf8.decoder)
-          .transform<String>(const LineSplitter())
+          .transform(utf8LineDecoder)
           .listen((String line) {
             _monitorIOSDeployFailure(line, _logger);
             _logger.printTrace(line);
diff --git a/packages/flutter_tools/lib/src/ios/lldb.dart b/packages/flutter_tools/lib/src/ios/lldb.dart
index 8da71ee..35d0df3 100644
--- a/packages/flutter_tools/lib/src/ios/lldb.dart
+++ b/packages/flutter_tools/lib/src/ios/lldb.dart
@@ -10,7 +10,7 @@
 import '../base/io.dart';
 import '../base/logger.dart';
 import '../base/process.dart';
-import '../convert.dart';
+import '../base/utils.dart';
 
 /// LLDB is the default debugger in Xcode on macOS. Once the application has
 /// launched on a physical iOS device, you can attach to it using LLDB.
@@ -148,8 +148,7 @@
       );
 
       final StreamSubscription<String> stdoutSubscription = _lldbProcess!.stdout
-          .transform<String>(utf8.decoder)
-          .transform<String>(const LineSplitter())
+          .transform(utf8LineDecoder)
           .listen((String line) {
             if (_isAttached && !_ignoreLog(line)) {
               // Only forwards logs after LLDB is attached. All logs before then are part of the
@@ -163,8 +162,7 @@
           });
 
       final StreamSubscription<String> stderrSubscription = _lldbProcess!.stderr
-          .transform<String>(utf8.decoder)
-          .transform<String>(const LineSplitter())
+          .transform(utf8LineDecoder)
           .listen((String line) {
             _monitorError(line);
             if (_isAttached && !_ignoreLog(line)) {
diff --git a/packages/flutter_tools/lib/src/ios/simulators.dart b/packages/flutter_tools/lib/src/ios/simulators.dart
index 266eeb4..a3deb00 100644
--- a/packages/flutter_tools/lib/src/ios/simulators.dart
+++ b/packages/flutter_tools/lib/src/ios/simulators.dart
@@ -834,40 +834,22 @@
     // Unified logging iOS 11 and greater (introduced in iOS 10).
     if (await device.sdkMajorVersion >= 11) {
       _deviceProcess = await launchDeviceUnifiedLogging(device, _appName);
-      _deviceProcess?.stdout
-          .transform<String>(utf8.decoder)
-          .transform<String>(const LineSplitter())
-          .listen(_onUnifiedLoggingLine);
-      _deviceProcess?.stderr
-          .transform<String>(utf8.decoder)
-          .transform<String>(const LineSplitter())
-          .listen(_onUnifiedLoggingLine);
+      _deviceProcess?.stdout.transform(utf8LineDecoder).listen(_onUnifiedLoggingLine);
+      _deviceProcess?.stderr.transform(utf8LineDecoder).listen(_onUnifiedLoggingLine);
     } else {
       // Fall back to syslog parsing.
       await device.ensureLogsExists();
       _deviceProcess = await launchDeviceSystemLogTool(device);
-      _deviceProcess?.stdout
-          .transform<String>(utf8.decoder)
-          .transform<String>(const LineSplitter())
-          .listen(_onSysLogDeviceLine);
-      _deviceProcess?.stderr
-          .transform<String>(utf8.decoder)
-          .transform<String>(const LineSplitter())
-          .listen(_onSysLogDeviceLine);
+      _deviceProcess?.stdout.transform(utf8LineDecoder).listen(_onSysLogDeviceLine);
+      _deviceProcess?.stderr.transform(utf8LineDecoder).listen(_onSysLogDeviceLine);
     }
 
     // Track system.log crashes.
     // ReportCrash[37965]: Saved crash report for FlutterRunner[37941]...
     _systemProcess = await launchSystemLogTool(device);
     if (_systemProcess != null) {
-      _systemProcess?.stdout
-          .transform<String>(utf8.decoder)
-          .transform<String>(const LineSplitter())
-          .listen(_onSystemLine);
-      _systemProcess?.stderr
-          .transform<String>(utf8.decoder)
-          .transform<String>(const LineSplitter())
-          .listen(_onSystemLine);
+      _systemProcess?.stdout.transform(utf8LineDecoder).listen(_onSystemLine);
+      _systemProcess?.stderr.transform(utf8LineDecoder).listen(_onSystemLine);
     }
 
     // We don't want to wait for the process or its callback. Best effort
diff --git a/packages/flutter_tools/lib/src/ios/xcode_debug.dart b/packages/flutter_tools/lib/src/ios/xcode_debug.dart
index d6bbc12..cc7b615 100644
--- a/packages/flutter_tools/lib/src/ios/xcode_debug.dart
+++ b/packages/flutter_tools/lib/src/ios/xcode_debug.dart
@@ -16,6 +16,7 @@
 import '../base/logger.dart';
 import '../base/process.dart';
 import '../base/template.dart';
+import '../base/utils.dart';
 import '../build_info.dart';
 import '../convert.dart';
 import '../macos/xcode.dart';
@@ -106,36 +107,34 @@
       ]);
 
       final stdoutBuffer = StringBuffer();
-      stdoutSubscription = startDebugActionProcess!.stdout
-          .transform<String>(utf8.decoder)
-          .transform<String>(const LineSplitter())
-          .listen((String line) {
-            _logger.printTrace(line);
-            stdoutBuffer.write(line);
-          });
+      stdoutSubscription = startDebugActionProcess!.stdout.transform(utf8LineDecoder).listen((
+        String line,
+      ) {
+        _logger.printTrace(line);
+        stdoutBuffer.write(line);
+      });
 
       final stderrBuffer = StringBuffer();
       var permissionWarningPrinted = false;
       // console.log from the script are found in the stderr
-      stderrSubscription = startDebugActionProcess!.stderr
-          .transform<String>(utf8.decoder)
-          .transform<String>(const LineSplitter())
-          .listen((String line) {
-            _logger.printTrace('stderr: $line');
-            stderrBuffer.write(line);
+      stderrSubscription = startDebugActionProcess!.stderr.transform(utf8LineDecoder).listen((
+        String line,
+      ) {
+        _logger.printTrace('stderr: $line');
+        stderrBuffer.write(line);
 
-            // This error may occur if Xcode automation has not been allowed.
-            // Example: Failed to get workspace: Error: An error occurred.
-            if (!permissionWarningPrinted &&
-                line.contains('Failed to get workspace') &&
-                line.contains('An error occurred')) {
-              _logger.printError(
-                'There was an error finding the project in Xcode. Ensure permission '
-                'has been given to control Xcode in Settings > Privacy & Security > Automation.',
-              );
-              permissionWarningPrinted = true;
-            }
-          });
+        // This error may occur if Xcode automation has not been allowed.
+        // Example: Failed to get workspace: Error: An error occurred.
+        if (!permissionWarningPrinted &&
+            line.contains('Failed to get workspace') &&
+            line.contains('An error occurred')) {
+          _logger.printError(
+            'There was an error finding the project in Xcode. Ensure permission '
+            'has been given to control Xcode in Settings > Privacy & Security > Automation.',
+          );
+          permissionWarningPrinted = true;
+        }
+      });
 
       final int exitCode = await startDebugActionProcess!.exitCode.whenComplete(() async {
         await stdoutSubscription?.cancel();
diff --git a/packages/flutter_tools/lib/src/macos/xcdevice.dart b/packages/flutter_tools/lib/src/macos/xcdevice.dart
index 86e7f64..3397a95 100644
--- a/packages/flutter_tools/lib/src/macos/xcdevice.dart
+++ b/packages/flutter_tools/lib/src/macos/xcdevice.dart
@@ -14,6 +14,7 @@
 import '../base/logger.dart';
 import '../base/platform.dart';
 import '../base/process.dart';
+import '../base/utils.dart';
 import '../base/version.dart';
 import '../build_info.dart';
 import '../cache.dart';
@@ -287,8 +288,7 @@
     final Process process = await _processUtils.start(cmd);
 
     final StreamSubscription<String> stdoutSubscription = process.stdout
-        .transform<String>(utf8.decoder)
-        .transform<String>(const LineSplitter())
+        .transform(utf8LineDecoder)
         .listen((String line) {
           String? mappedLine = line;
           if (mapFunction != null) {
@@ -300,8 +300,7 @@
           }
         });
     final StreamSubscription<String> stderrSubscription = process.stderr
-        .transform<String>(utf8.decoder)
-        .transform<String>(const LineSplitter())
+        .transform(utf8LineDecoder)
         .listen((String line) {
           String? mappedLine = line;
           if (mapFunction != null) {
diff --git a/packages/flutter_tools/lib/src/protocol_discovery.dart b/packages/flutter_tools/lib/src/protocol_discovery.dart
index 776e680..cd2d4a9 100644
--- a/packages/flutter_tools/lib/src/protocol_discovery.dart
+++ b/packages/flutter_tools/lib/src/protocol_discovery.dart
@@ -6,6 +6,7 @@
 
 import 'base/io.dart';
 import 'base/logger.dart';
+import 'base/utils.dart';
 import 'device.dart';
 import 'device_port_forwarder.dart';
 import 'globals.dart' as globals;
@@ -85,7 +86,7 @@
   /// Port forwarding is only attempted when this is invoked,
   /// for each VM Service URL in the stream.
   Stream<Uri> get uris {
-    final Stream<Uri> uriStream = _uriStreamController.stream.transform(
+    final Stream<Uri> uriStream = _uriStreamController.stream.transformWithCallSite(
       _throttle<Uri>(waitDuration: throttleDuration),
     );
     return uriStream.asyncMap<Uri>(_forwardPort);
diff --git a/packages/flutter_tools/lib/src/test/flutter_tester_device.dart b/packages/flutter_tools/lib/src/test/flutter_tester_device.dart
index 815e574..b3a59c7 100644
--- a/packages/flutter_tools/lib/src/test/flutter_tester_device.dart
+++ b/packages/flutter_tools/lib/src/test/flutter_tester_device.dart
@@ -14,7 +14,7 @@
 import '../base/io.dart';
 import '../base/logger.dart';
 import '../base/platform.dart';
-import '../convert.dart';
+import '../base/utils.dart';
 import '../device.dart';
 import '../globals.dart' as globals;
 import '../native_assets.dart';
@@ -297,8 +297,7 @@
   }) {
     for (final stream in <Stream<List<int>>>[process.stderr, process.stdout]) {
       stream
-          .transform<String>(utf8.decoder)
-          .transform<String>(const LineSplitter())
+          .transform(utf8LineDecoder)
           .listen(
             (String line) async {
               logger.printTrace('test $id: Shell: $line');
diff --git a/packages/flutter_tools/lib/src/test/test_golden_comparator.dart b/packages/flutter_tools/lib/src/test/test_golden_comparator.dart
index a021c9b..fdecea3 100644
--- a/packages/flutter_tools/lib/src/test/test_golden_comparator.dart
+++ b/packages/flutter_tools/lib/src/test/test_golden_comparator.dart
@@ -11,6 +11,7 @@
 import '../base/file_system.dart';
 import '../base/io.dart';
 import '../base/logger.dart';
+import '../base/utils.dart';
 import '../convert.dart';
 import 'test_compiler.dart';
 import 'test_config.dart';
@@ -273,8 +274,7 @@
     // Also parse stdout as a stream of JSON objects.
     streamIterator = StreamIterator<Map<String, dynamic>>(
       process.stdout
-          .transform<String>(utf8.decoder)
-          .transform<String>(const LineSplitter())
+          .transform(utf8LineDecoder)
           .where((String line) {
             logger.printTrace('<<< $line');
             return line.isNotEmpty && line[0] == '{';
@@ -283,9 +283,7 @@
           .cast<Map<String, dynamic>>(),
     );
 
-    process.stderr.transform<String>(utf8.decoder).transform<String>(const LineSplitter()).forEach((
-      String line,
-    ) {
+    process.stderr.transform(utf8LineDecoder).forEach((String line) {
       logger.printError('<<< $line');
     });
   }
diff --git a/packages/flutter_tools/lib/src/web/chrome.dart b/packages/flutter_tools/lib/src/web/chrome.dart
index 3fe8b23..66933c6 100644
--- a/packages/flutter_tools/lib/src/web/chrome.dart
+++ b/packages/flutter_tools/lib/src/web/chrome.dart
@@ -16,7 +16,7 @@
 import '../base/logger.dart';
 import '../base/os.dart';
 import '../base/platform.dart';
-import '../convert.dart';
+import '../base/utils.dart';
 
 /// An environment variable used to override the location of Google Chrome.
 const kChromeEnvironment = 'CHROME_EXECUTABLE';
@@ -303,7 +303,7 @@
     while (true) {
       final Process process = await _processManager.start(args);
 
-      process.stdout.transform(utf8.decoder).transform(const LineSplitter()).listen((String line) {
+      process.stdout.transform(utf8LineDecoder).listen((String line) {
         _logger.printTrace('[CHROME]: $line');
       });
 
@@ -313,8 +313,7 @@
       var shouldRetry = false;
       final errors = <String>[];
       await process.stderr
-          .transform(utf8.decoder)
-          .transform(const LineSplitter())
+          .transform(utf8LineDecoder)
           .map((String line) {
             _logger.printTrace('[CHROME]: $line');
             errors.add('[CHROME]:$line');
diff --git a/packages/flutter_tools/lib/src/widget_preview/dtd_services.dart b/packages/flutter_tools/lib/src/widget_preview/dtd_services.dart
index 64222a5..0487850 100644
--- a/packages/flutter_tools/lib/src/widget_preview/dtd_services.dart
+++ b/packages/flutter_tools/lib/src/widget_preview/dtd_services.dart
@@ -17,6 +17,7 @@
 import '../base/logger.dart';
 import '../base/platform.dart';
 import '../base/process.dart';
+import '../base/utils.dart';
 import '../convert.dart';
 import '../dart/package_map.dart';
 import '../project.dart';
@@ -175,7 +176,7 @@
     // Wait for the DTD connection information.
     final dtdUri = Completer<Uri>();
     late final StreamSubscription<String> sub;
-    sub = _dtdProcess!.stdout.transform(const Utf8Decoder()).listen((String data) async {
+    sub = _dtdProcess!.stdout.transformWithCallSite(utf8.decoder).listen((String data) async {
       await sub.cancel();
       final jsonData = json.decode(data) as Map<String, Object?>;
       if (jsonData case {'tooling_daemon_details': {'uri': final String dtdUriString}}) {
diff --git a/packages/flutter_tools/test/general.shard/utils_test.dart b/packages/flutter_tools/test/general.shard/utils_test.dart
index e55c589..6122452 100644
--- a/packages/flutter_tools/test/general.shard/utils_test.dart
+++ b/packages/flutter_tools/test/general.shard/utils_test.dart
@@ -2,10 +2,14 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
+import 'dart:async';
+import 'dart:convert';
+
 import 'package:flutter_tools/src/base/platform.dart';
 import 'package:flutter_tools/src/base/terminal.dart';
 import 'package:flutter_tools/src/base/utils.dart';
 import 'package:flutter_tools/src/base/version.dart';
+import 'package:stack_trace/stack_trace.dart';
 
 import '../src/common.dart';
 
@@ -521,4 +525,47 @@
       '10.0MB',
     );
   });
+
+  testWithoutContext('Stream.transformWithCallSite', () async {
+    final inputController = StreamController<String>();
+    const jsonMap = <String, Object?>{'foo': 123};
+    const invalidJson = 'Hello world!';
+
+    final validCompleter = Completer<void>();
+    final errorCompleter = Completer<void>();
+
+    // Wrap the callsite of `transformWithCallSite` with a named function to be more confident
+    // that the top frame is the location we expect without actually needing to check exact line
+    // and column numbers.
+    void listenToStream() {
+      inputController.stream
+          .transformWithCallSite(json.decoder)
+          .listen(
+            (result) {
+              expect(validCompleter.isCompleted, false);
+              expect(result, jsonMap);
+              validCompleter.complete();
+            },
+            onError: (Object e, StackTrace st) {
+              expect(errorCompleter.isCompleted, false);
+              expect(e, isA<FormatException>());
+              final trace = Trace.from(st);
+              // Validate that the top stack frame corresponds to where `transformWithCallSite`
+              // was invoked.
+              expect(trace.frames.first.member, 'main.<fn>.listenToStream');
+              errorCompleter.complete();
+            },
+          );
+    }
+
+    listenToStream();
+
+    // Write both valid and invalid JSON to the stream.
+    inputController.add(json.encode(jsonMap));
+    inputController.add(invalidJson);
+
+    await inputController.sink.close();
+    await validCompleter.future;
+    await errorCompleter.future;
+  });
 }