Retry devfs uploads in case they fail. (#41406)

* Retry each failed devfs upload up to 10 times, waiting 500 ms between attempts, before reporting the error.

Fixes #34959.

diff --git a/packages/flutter_tools/lib/src/devfs.dart b/packages/flutter_tools/lib/src/devfs.dart
index c4ba45c..8795c4d 100644
--- a/packages/flutter_tools/lib/src/devfs.dart
+++ b/packages/flutter_tools/lib/src/devfs.dart
@@ -289,7 +289,7 @@
     while ((_inFlight < kMaxInFlight) && (!_completer.isCompleted) && _outstanding.isNotEmpty) {
       final Uri deviceUri = _outstanding.keys.first;
       final DevFSContent content = _outstanding.remove(deviceUri);
-      _startWrite(deviceUri, content);
+      _startWrite(deviceUri, content, retry: 10);
       _inFlight += 1;
     }
     if ((_inFlight == 0) && (!_completer.isCompleted) && _outstanding.isEmpty) {
@@ -299,22 +299,33 @@
 
   Future<void> _startWrite(
     Uri deviceUri,
-    DevFSContent content, [
+    DevFSContent content, {
     int retry = 0,
-  ]) async {
-    try {
-      final HttpClientRequest request = await _client.putUrl(httpAddress);
-      request.headers.removeAll(HttpHeaders.acceptEncodingHeader);
-      request.headers.add('dev_fs_name', fsName);
-      request.headers.add('dev_fs_uri_b64', base64.encode(utf8.encode('$deviceUri')));
-      final Stream<List<int>> contents = content.contentsAsCompressedStream();
-      await request.addStream(contents);
-      final HttpClientResponse response = await request.close();
-      await response.drain<void>();
-    } catch (error, trace) {
-      if (!_completer.isCompleted) {
-        printTrace('Error writing "$deviceUri" to DevFS: $error');
-        _completer.completeError(error, trace);
+  }) async {
+    while(true) {
+      try {
+        final HttpClientRequest request = await _client.putUrl(httpAddress);
+        request.headers.removeAll(HttpHeaders.acceptEncodingHeader);
+        request.headers.add('dev_fs_name', fsName);
+        request.headers.add('dev_fs_uri_b64', base64.encode(utf8.encode('$deviceUri')));
+        final Stream<List<int>> contents = content.contentsAsCompressedStream();
+        await request.addStream(contents);
+        final HttpClientResponse response = await request.close();
+        response.listen((_) => null,
+            onError: (dynamic error) { printTrace('error: $error'); },
+            cancelOnError: true);
+        break;
+      } catch (error, trace) {
+        if (!_completer.isCompleted) {
+          printTrace('Error writing "$deviceUri" to DevFS: $error');
+          if (retry > 0) {
+            retry--;
+            printTrace('trying again in a few - $retry more attempts left');
+            await Future<void>.delayed(const Duration(milliseconds: 500));
+            continue;
+          }
+          _completer.completeError(error, trace);
+        }
       }
     }
     _inFlight -= 1;
diff --git a/packages/flutter_tools/test/general.shard/devfs_test.dart b/packages/flutter_tools/test/general.shard/devfs_test.dart
index 90d919a..351d6c8 100644
--- a/packages/flutter_tools/test/general.shard/devfs_test.dart
+++ b/packages/flutter_tools/test/general.shard/devfs_test.dart
@@ -92,6 +92,75 @@
     }, skip: Platform.isWindows); // TODO(jonahwilliams): fix or disable this functionality.
   });
 
+  group('mocked http client', () {
+    HttpOverrides savedHttpOverrides;
+    HttpClient httpClient;
+
+    setUpAll(() {
+      tempDir = _newTempDir(fs);
+      basePath = tempDir.path;
+      savedHttpOverrides = HttpOverrides.current;
+      httpClient = MockOddlyFailingHttpClient();
+      HttpOverrides.global = MyHttpOverrides(httpClient);
+    });
+
+    tearDownAll(() async {
+      HttpOverrides.global = savedHttpOverrides;
+    });
+
+    testUsingContext('retry uploads when failure', () async {
+      final File file = fs.file(fs.path.join(basePath, filePath));
+      await file.parent.create(recursive: true);
+      file.writeAsBytesSync(<int>[1, 2, 3]);
+      // simulate package
+      await _createPackage(fs, 'somepkg', 'somefile.txt');
+
+      final RealMockVMService vmService = RealMockVMService();
+      final RealMockVM vm = RealMockVM();
+      final Map<String, dynamic> response =  <String, dynamic>{ 'uri': 'file://abc' };
+      when(vm.createDevFS(any)).thenAnswer((Invocation invocation) {
+        return Future<Map<String, dynamic>>.value(response);
+      });
+      when(vmService.vm).thenReturn(vm);
+
+      reset(httpClient);
+
+      final MockHttpClientRequest httpRequest = MockHttpClientRequest();
+      when(httpRequest.headers).thenReturn(MockHttpHeaders());
+      when(httpClient.putUrl(any)).thenAnswer((Invocation invocation) {
+        return Future<HttpClientRequest>.value(httpRequest);
+      });
+      final MockHttpClientResponse httpClientResponse = MockHttpClientResponse();
+      int nRequest = 0;
+      const int kFailedAttempts = 5;
+      when(httpRequest.close()).thenAnswer((Invocation invocation) {
+        if (nRequest++ < kFailedAttempts) {
+          throw 'Connection reset by peer';
+        }
+        return Future<HttpClientResponse>.value(httpClientResponse);
+      });
+
+      devFS = DevFS(vmService, 'test', tempDir);
+      await devFS.create();
+
+      final MockResidentCompiler residentCompiler = MockResidentCompiler();
+      final UpdateFSReport report = await devFS.update(
+        mainPath: 'lib/foo.txt',
+        generator: residentCompiler,
+        pathToReload: 'lib/foo.txt.dill',
+        trackWidgetCreation: false,
+        invalidatedFiles: <Uri>[],
+      );
+
+      expect(report.syncedBytes, 22);
+      expect(report.success, isTrue);
+      verify(httpClient.putUrl(any)).called(kFailedAttempts + 1);
+      verify(httpRequest.close()).called(kFailedAttempts + 1);
+    }, overrides: <Type, Generator>{
+      FileSystem: () => fs,
+    });
+  });
+
   group('devfs remote', () {
     MockVMService vmService;
     final MockResidentCompiler residentCompiler = MockResidentCompiler();
@@ -200,7 +269,6 @@
     }, overrides: <Type, Generator>{
       FileSystem: () => fs,
     });
-
   });
 }
 
@@ -326,3 +394,25 @@
   fs.file(fs.path.join(_tempDirs[0].path, '.packages')).writeAsStringSync(sb.toString());
 }
 
+class RealMockVM extends Mock implements VM {
+
+}
+
+class RealMockVMService extends Mock implements VMService {
+
+}
+
+class MyHttpOverrides extends HttpOverrides {
+  MyHttpOverrides(this._httpClient);
+  @override
+  HttpClient createHttpClient(SecurityContext context) {
+    return _httpClient;
+  }
+
+  final HttpClient _httpClient;
+}
+
+class MockOddlyFailingHttpClient extends Mock implements HttpClient {}
+class MockHttpClientRequest extends Mock implements HttpClientRequest {}
+class MockHttpHeaders extends Mock implements HttpHeaders {}
+class MockHttpClientResponse extends Mock implements HttpClientResponse {}
\ No newline at end of file