// blob: d6cd5e12ea76d6daeac10308c7e184fb17efbd92
// Copyright 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:meta/meta.dart';
/// The library-wide [SharedStdIn] instance wrapping `dart:io`'s [stdin].
///
/// In contrast to listening to [stdin] directly, subscribers of [sharedStdIn]
/// may be swapped over time: a new subscriber can start listening once the
/// previous subscriber has cancelled.
///
/// Remember to call [SharedStdIn.terminate] when done. That call is what
/// closes the underlying connection to [stdin]; without it the program may
/// hang instead of exiting automatically.
final SharedStdIn sharedStdIn = SharedStdIn();
/// A singleton wrapper around `stdin` that allows new subscribers.
///
/// This class is visible in order to be used as a test harness for mock
/// implementations of `stdin`. In normal programs, [sharedStdIn] should be
/// used directly.
@visibleForTesting
class SharedStdIn extends Stream<List<int>> {
  /// Controller feeding the current subscriber.
  ///
  /// Created lazily by [_getCurrent] and reset to `null` when its subscriber
  /// cancels, so the next subscriber gets a fresh controller.
  StreamController<List<int>>? _current;

  /// Subscription to the wrapped stream; `null` once [terminate] was called.
  StreamSubscription<List<int>>? _sub;

  /// Creates a wrapper that immediately starts listening to [stream].
  ///
  /// Falls back to `dart:io`'s [stdin] when [stream] is omitted or `null`.
  SharedStdIn([Stream<List<int>>? stream]) {
    _sub = (stream ?? stdin).listen(_onInput);
  }

  /// Returns a future that completes with the next line.
  ///
  /// This is similar to the standard [Stdin.readLineSync], but asynchronous.
  /// Bytes are decoded with [encoding].
  Future<String> nextLine({Encoding encoding = systemEncoding}) =>
      lines(encoding: encoding).first;

  /// Returns the stream decoded with [encoding] and split at line breaks.
  ///
  /// This is similar to synchronous code using [Stdin.readLineSync]:
  /// ```dart
  /// while (true) {
  ///   var line = stdin.readLineSync();
  ///   // ...
  /// }
  /// ```
  ///
  /// ... but asynchronous.
  ///
  /// Like any other subscription, listening to the returned stream fails with
  /// a [StateError] while another subscriber is active (see [listen]).
  Stream<String> lines({Encoding encoding = systemEncoding}) =>
      // Decode with the requested encoding rather than a hard-coded UTF-8
      // decoder, so the [encoding] argument is actually honored.
      transform(encoding.decoder).transform(const LineSplitter());

  /// Forwards a chunk from the wrapped stream to the current controller.
  ///
  /// When nobody is subscribed, the controller buffers the chunk until the
  /// next subscriber starts listening.
  void _onInput(List<int> event) => _getCurrent().add(event);

  /// Returns the active controller, creating one if there is none.
  StreamController<List<int>> _getCurrent() =>
      _current ??= StreamController<List<int>>(
        // Dropping the controller on cancel is what allows a later subscriber
        // to listen: it is handed a brand-new single-subscription stream.
        onCancel: () {
          _current = null;
        },
        sync: true,
      );

  /// Subscribes to input from the wrapped stream.
  ///
  /// Throws a [StateError] if [terminate] has already been called, or if
  /// another subscriber is still listening; the existing subscriber has to
  /// cancel before a new one may be added.
  @override
  StreamSubscription<List<int>> listen(
    void Function(List<int> event)? onData, {
    Function? onError,
    void Function()? onDone,
    bool? cancelOnError,
  }) {
    if (_sub == null) {
      throw StateError('Stdin has already been terminated.');
    }
    // ignore: close_sinks
    final controller = _getCurrent();
    if (controller.hasListener) {
      throw StateError(''
          'Subscriber already listening. The existing subscriber must cancel '
          'before another may be added.');
    }
    return controller.stream.listen(
      onData,
      onDone: onDone,
      onError: onError,
      cancelOnError: cancelOnError,
    );
  }

  /// Terminates the connection to `stdin`, closing all subscriptions.
  ///
  /// The returned future completes with a [StateError] if this instance has
  /// already been terminated (or is currently terminating).
  Future<void> terminate() async {
    final sub = _sub;
    if (sub == null) {
      throw StateError('Stdin has already been terminated.');
    }
    // Flip the state before the first `await`, so that overlapping calls to
    // `terminate` and calls to `listen` made while terminating fail fast.
    _sub = null;
    await sub.cancel();

    final controller = _current;
    if (controller == null) return;
    if (controller.hasListener) {
      // Completes once the subscriber has received the done event.
      await controller.close();
    } else {
      // Input was buffered while nobody was listening. The future returned by
      // `close` only completes after a listener has received the done event,
      // so awaiting it here would hang forever.
      unawaited(controller.close());
    }
  }
}