Fix #144 (#147)

* Fix #144

* Remove deadcode from test

* Removed unused import from test
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e47410c..71ade45 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 0.8.7
+
+- Fix `Bucket.write` when size is below 1MB.
+
 ## 0.8.6
 
 - Throttle streams piped into `Bucket.write` when the size is not known
diff --git a/lib/src/storage_impl.dart b/lib/src/storage_impl.dart
index 448c5f1..6251c0c 100644
--- a/lib/src/storage_impl.dart
+++ b/lib/src/storage_impl.dart
@@ -604,7 +604,8 @@
     if (_state == _stateProbingLength) {
       // As the data is already cached don't bother to wait on somebody
       // listening on the stream before adding the data.
-      _startNormalUpload(Stream.value(_buffer.takeBytes()), _buffer.length);
+      final length = _buffer.length;
+      _startNormalUpload(Stream.value(_buffer.takeBytes()), length);
     } else {
       _resumableController.close();
     }
@@ -628,18 +629,22 @@
     _doneCompleter.completeError(e, s);
   }
 
-  void _startNormalUpload(Stream<List<int>> stream, int? length) {
+  void _startNormalUpload(Stream<List<int>> stream, int? length) async {
     var contentType = _object.contentType ?? 'application/octet-stream';
     var media = storage_api.Media(stream, length, contentType: contentType);
-    _api.objects
-        .insert(_object, _bucketName,
-            name: _objectName,
-            predefinedAcl: _predefinedAcl,
-            uploadMedia: media,
-            uploadOptions: storage_api.UploadOptions.defaultOptions)
-        .then((response) {
+    try {
+      final response = await _api.objects.insert(
+        _object,
+        _bucketName,
+        name: _objectName,
+        predefinedAcl: _predefinedAcl,
+        uploadMedia: media,
+        uploadOptions: storage_api.UploadOptions.defaultOptions,
+      );
       _doneCompleter.complete(_ObjectInfoImpl(response));
-    }, onError: _completeError);
+    } catch (e, st) {
+      _completeError(e, st);
+    }
   }
 
   void _startResumableUpload(Stream<List<int>> stream, int? length) {
diff --git a/pubspec.yaml b/pubspec.yaml
index f42336e..3cf8142 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: gcloud
-version: 0.8.6
+version: 0.8.7
 description: >-
   High level idiomatic Dart API for Google Cloud Storage, Pub-Sub and Datastore.
 repository: https://github.com/dart-lang/gcloud
diff --git a/test/db/e2e/db_test_impl.dart b/test/db/e2e/db_test_impl.dart
index 9e55c54..39cfe26 100644
--- a/test/db/e2e/db_test_impl.dart
+++ b/test/db/e2e/db_test_impl.dart
@@ -47,7 +47,6 @@
 
 import 'package:gcloud/db.dart' as db;
 import 'package:gcloud/src/datastore_impl.dart' as datastore_impl;
-import 'package:http/http.dart';
 import 'package:test/test.dart';
 
 import '../../common_e2e.dart';
@@ -733,7 +732,6 @@
 
 Future main() async {
   late db.DatastoreDB store;
-  BaseClient? client;
 
   var scopes = datastore_impl.DatastoreImpl.scopes;
   await withAuthClient(scopes, (String project, httpClient) {
@@ -743,9 +741,5 @@
     });
   });
 
-  tearDownAll(() {
-    client?.close();
-  });
-
   runTests(store, null);
 }
diff --git a/test/storage/e2e_test.dart b/test/storage/e2e_test.dart
index 7da0deb..ffd8ab2 100644
--- a/test/storage/e2e_test.dart
+++ b/test/storage/e2e_test.dart
@@ -113,32 +113,58 @@
   group('object', () {
     // Run all object tests in the same bucket to try to avoid the rate-limit
     // for creating and deleting buckets while testing.
-    Future withTestBucket(Future Function(Bucket bucket) function) {
-      return function(testBucket).whenComplete(() {
+    Future<T> withTestBucket<T>(Future<T> Function(Bucket bucket) fn) async {
+      try {
+        return await fn(testBucket);
+      } finally {
         // TODO: Clean the bucket.
+      }
+    }
+
+    void testWithBucket(
+      String name,
+      FutureOr<void> Function(Bucket bucket) fn,
+    ) {
+      test(name, () async {
+        try {
+          await fn(testBucket);
+        } finally {
+          // TODO: Clean the bucket.
+        }
       });
     }
 
-    test('create-read-delete', () {
-      Future test(name, List<int> bytes) {
-        return withTestBucket((Bucket bucket) {
-          return bucket.writeBytes('test', bytes).then(expectAsync1((info) {
-            expect(info, isNotNull);
-            return bucket.read('test').fold<List<int>>(
-                [], (p, e) => p..addAll(e)).then(expectAsync1((result) {
-              expect(result, bytes);
-              return bucket.delete('test').then(expectAsync1((result) {
-                expect(result, isNull);
-              }));
-            }));
-          }));
+    group('create-read-delete', () {
+      void testCreateReadDelete(String name, List<int> bytes) {
+        testWithBucket(name, (bucket) async {
+          final info = await bucket.writeBytes('test', bytes);
+          expect(info, isNotNull);
+          final result = await bucket
+              .read('test')
+              .fold<List<int>>([], (p, e) => p..addAll(e));
+          expect(result, bytes);
+          await bucket.delete('test');
         });
       }
 
-      return Future.forEach([
-        () => test('test-1', [1, 2, 3]),
-        () => test('test-2', bytesResumableUpload)
-      ], (Function f) => f().then(expectAsync1((_) {})));
+      testCreateReadDelete('test-1', [1, 2, 3]);
+      testCreateReadDelete('test-2', bytesResumableUpload);
+    });
+
+    group('create-read-delete-streaming', () {
+      void testCreateReadDelete(String name, List<int> bytes) {
+        testWithBucket(name, (bucket) async {
+          await Stream.value(bytes).pipe(bucket.write('test'));
+          final result = await bucket
+              .read('test')
+              .fold<List<int>>([], (p, e) => p..addAll(e));
+          expect(result, bytes);
+          await bucket.delete('test');
+        });
+      }
+
+      testCreateReadDelete('test-1', [1, 2, 3, 5, 6, 7, 8, 9]);
+      testCreateReadDelete('test-2', bytesResumableUpload);
     });
 
     test('create-with-predefined-acl-delete', () {