| // Copyright 2014 The Flutter Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| import 'dart:async'; |
| import 'dart:typed_data'; |
| |
| import 'package:meta/meta.dart'; |
| |
| import 'base/common.dart'; |
| import 'base/io.dart'; |
| import 'base/logger.dart'; |
| import 'base/utils.dart'; |
| import 'convert.dart'; |
| |
| /// A single message passed through the [DaemonConnection]. |
| class DaemonMessage { |
| DaemonMessage(this.data, [this.binary]); |
| |
| /// Content of the JSON message in the message. |
| final Map<String, Object?> data; |
| |
| /// Stream of the binary content of the message. |
| /// |
| /// Must be listened to if binary data is present. |
| final Stream<List<int>>? binary; |
| } |
| |
| /// Data of an event passed through the [DaemonConnection]. |
| class DaemonEventData { |
| DaemonEventData(this.eventName, this.data, [this.binary]); |
| |
| /// The name of the event. |
| final String eventName; |
| |
| /// The data of the event. |
| final Object? data; |
| |
| /// Stream of the binary content of the event. |
| /// |
| /// Must be listened to if binary data is present. |
| final Stream<List<int>>? binary; |
| } |
| |
| const String _binaryLengthKey = '_binaryLength'; |
| |
| enum _InputStreamParseState { |
| json, |
| binary, |
| } |
| |
| /// Converts a binary stream to a stream of [DaemonMessage]. |
| /// |
| /// The daemon JSON-RPC protocol is defined as follows: every single line of |
| /// text that starts with `[{` and ends with `}]` will be parsed as a JSON |
| /// message. The array should contain only one single object which contains the |
| /// message data. |
| /// |
| /// If the JSON object contains the key [_binaryLengthKey] with an integer |
| /// value (will be refered to as N), the following N bytes after the newline |
| /// character will contain the binary part of the message. |
| @visibleForTesting |
| class DaemonInputStreamConverter { |
| DaemonInputStreamConverter(this.inputStream) { |
| // Lazily listen to the input stream. |
| _controller.onListen = () { |
| final StreamSubscription<List<int>> subscription = inputStream.listen((List<int> chunk) { |
| _processChunk(chunk); |
| }, onError: (Object error, StackTrace stackTrace) { |
| _controller.addError(error, stackTrace); |
| }, onDone: () { |
| unawaited(_controller.close()); |
| }); |
| |
| _controller.onCancel = subscription.cancel; |
| // We should not handle onPause or onResume. When the stream is paused, we |
| // still need to read from the input stream. |
| }; |
| } |
| |
| final Stream<List<int>> inputStream; |
| |
| final StreamController<DaemonMessage> _controller = StreamController<DaemonMessage>(); |
| Stream<DaemonMessage> get convertedStream => _controller.stream; |
| |
| // Internal states |
| /// The current parse state, whether we are expecting JSON or binary data. |
| _InputStreamParseState state = _InputStreamParseState.json; |
| |
| /// The binary stream that is being transferred. |
| late StreamController<List<int>> currentBinaryStream; |
| |
| /// Remaining length in bytes that have to be sent to the binary stream. |
| int remainingBinaryLength = 0; |
| |
| /// Buffer to hold the current line of input data. |
| final BytesBuilder bytesBuilder = BytesBuilder(copy: false); |
| |
| // Processes a single chunk received in the input stream. |
| void _processChunk(List<int> chunk) { |
| |
| int start = 0; |
| while (start < chunk.length) { |
| if (state == _InputStreamParseState.json) { |
| start += _processChunkInJsonMode(chunk, start); |
| } else if (state == _InputStreamParseState.binary) { |
| final int bytesSent = _addBinaryChunk(chunk, start, remainingBinaryLength); |
| start += bytesSent; |
| remainingBinaryLength -= bytesSent; |
| |
| if (remainingBinaryLength <= 0) { |
| assert(remainingBinaryLength == 0); |
| |
| unawaited(currentBinaryStream.close()); |
| state = _InputStreamParseState.json; |
| } |
| } |
| } |
| } |
| |
| /// Processes a chunk in JSON mode, and returns the number of bytes processed. |
| int _processChunkInJsonMode(List<int> chunk, int start) { |
| const int LF = 10; // The '\n' character |
| |
| // Search for newline character. |
| final int indexOfNewLine = chunk.indexOf(LF, start); |
| if (indexOfNewLine < 0) { |
| bytesBuilder.add(chunk.sublist(start)); |
| return chunk.length - start; |
| } |
| |
| bytesBuilder.add(chunk.sublist(start, indexOfNewLine + 1)); |
| |
| // Process chunk here |
| final Uint8List combinedChunk = bytesBuilder.takeBytes(); |
| String jsonString = utf8.decode(combinedChunk).trim(); |
| if (jsonString.startsWith('[{') && jsonString.endsWith('}]')) { |
| jsonString = jsonString.substring(1, jsonString.length - 1); |
| final Map<String, Object?>? value = castStringKeyedMap(json.decode(jsonString)); |
| if (value != null) { |
| // Check if we need to consume another binary blob. |
| if (value[_binaryLengthKey] != null) { |
| remainingBinaryLength = value[_binaryLengthKey]! as int; |
| currentBinaryStream = StreamController<List<int>>(); |
| state = _InputStreamParseState.binary; |
| _controller.add(DaemonMessage(value, currentBinaryStream.stream)); |
| } else { |
| _controller.add(DaemonMessage(value)); |
| } |
| } |
| } |
| |
| return indexOfNewLine + 1 - start; |
| } |
| |
| int _addBinaryChunk(List<int> chunk, int start, int maximumSizeToRead) { |
| if (start == 0 && chunk.length <= remainingBinaryLength) { |
| currentBinaryStream.add(chunk); |
| return chunk.length; |
| } else { |
| final int chunkRemainingLength = chunk.length - start; |
| final int sizeToRead = chunkRemainingLength < remainingBinaryLength ? chunkRemainingLength : remainingBinaryLength; |
| currentBinaryStream.add(chunk.sublist(start, start + sizeToRead)); |
| return sizeToRead; |
| } |
| } |
| } |
| |
| /// A stream that a [DaemonConnection] uses to communicate with each other. |
| class DaemonStreams { |
| DaemonStreams( |
| Stream<List<int>> rawInputStream, |
| StreamSink<List<int>> outputSink, { |
| required Logger logger, |
| }) : |
| _outputSink = outputSink, |
| inputStream = DaemonInputStreamConverter(rawInputStream).convertedStream, |
| _logger = logger; |
| |
| /// Creates a [DaemonStreams] that uses stdin and stdout as the underlying streams. |
| DaemonStreams.fromStdio(Stdio stdio, { required Logger logger }) |
| : this(stdio.stdin, stdio.stdout, logger: logger); |
| |
| /// Creates a [DaemonStreams] that uses [Socket] as the underlying streams. |
| DaemonStreams.fromSocket(Socket socket, { required Logger logger }) |
| : this(socket, socket, logger: logger); |
| |
| /// Connects to a server and creates a [DaemonStreams] from the connection as the underlying streams. |
| factory DaemonStreams.connect(String host, int port, { required Logger logger }) { |
| final Future<Socket> socketFuture = Socket.connect(host, port); |
| final StreamController<List<int>> inputStreamController = StreamController<List<int>>(); |
| final StreamController<List<int>> outputStreamController = StreamController<List<int>>(); |
| socketFuture.then((Socket socket) { |
| inputStreamController.addStream(socket); |
| socket.addStream(outputStreamController.stream); |
| }).onError((Object error, StackTrace stackTrace) { |
| logger.printError('Socket error: $error'); |
| logger.printTrace('$stackTrace'); |
| // Propagate the error to the streams. |
| inputStreamController.addError(error, stackTrace); |
| unawaited(outputStreamController.close()); |
| }); |
| return DaemonStreams(inputStreamController.stream, outputStreamController.sink, logger: logger); |
| } |
| |
| final StreamSink<List<int>> _outputSink; |
| final Logger _logger; |
| |
| /// Stream that contains input to the [DaemonConnection]. |
| final Stream<DaemonMessage> inputStream; |
| |
| /// Outputs a message through the connection. |
| void send(Map<String, Object?> message, [ List<int>? binary ]) { |
| try { |
| if (binary != null) { |
| message[_binaryLengthKey] = binary.length; |
| } |
| _outputSink.add(utf8.encode('[${json.encode(message)}]\n')); |
| if (binary != null) { |
| _outputSink.add(binary); |
| } |
| } on StateError catch (error) { |
| _logger.printError('Failed to write daemon command response: $error'); |
| // Failed to send, close the connection |
| _outputSink.close(); |
| } on IOException catch (error) { |
| _logger.printError('Failed to write daemon command response: $error'); |
| // Failed to send, close the connection |
| _outputSink.close(); |
| } |
| } |
| |
| /// Cleans up any resources used. |
| Future<void> dispose() async { |
| unawaited(_outputSink.close()); |
| } |
| } |
| |
| /// Connection between a flutter daemon and a client. |
| class DaemonConnection { |
| DaemonConnection({ |
| required DaemonStreams daemonStreams, |
| required Logger logger, |
| }): _logger = logger, |
| _daemonStreams = daemonStreams { |
| _commandSubscription = daemonStreams.inputStream.listen( |
| _handleMessage, |
| onError: (Object error, StackTrace stackTrace) { |
| // We have to listen for on error otherwise the error on the socket |
| // will end up in the Zone error handler. |
| // Do nothing here and let the stream close handlers handle shutting |
| // down the daemon. |
| } |
| ); |
| } |
| |
| final DaemonStreams _daemonStreams; |
| |
| final Logger _logger; |
| |
| late final StreamSubscription<DaemonMessage> _commandSubscription; |
| |
| int _outgoingRequestId = 0; |
| final Map<String, Completer<Object?>> _outgoingRequestCompleters = <String, Completer<Object?>>{}; |
| |
| final StreamController<DaemonEventData> _events = StreamController<DaemonEventData>.broadcast(); |
| final StreamController<DaemonMessage> _incomingCommands = StreamController<DaemonMessage>(); |
| |
| /// A stream that contains all the incoming requests. |
| Stream<DaemonMessage> get incomingCommands => _incomingCommands.stream; |
| |
| /// Listens to the event with the event name [eventToListen]. |
| Stream<DaemonEventData> listenToEvent(String eventToListen) { |
| return _events.stream |
| .where((DaemonEventData event) => event.eventName == eventToListen); |
| } |
| |
| /// Sends a request to the other end of the connection. |
| /// |
| /// Returns a [Future] that resolves with the content. |
| Future<Object?> sendRequest(String method, [Object? params, List<int>? binary]) async { |
| final String id = '${++_outgoingRequestId}'; |
| final Completer<Object?> completer = Completer<Object?>(); |
| _outgoingRequestCompleters[id] = completer; |
| final Map<String, Object?> data = <String, Object?>{ |
| 'id': id, |
| 'method': method, |
| if (params != null) 'params': params, |
| }; |
| _logger.printTrace('-> Sending to daemon, id = $id, method = $method'); |
| _daemonStreams.send(data, binary); |
| return completer.future; |
| } |
| |
| /// Sends a response to the other end of the connection. |
| void sendResponse(Object id, [Object? result]) { |
| _daemonStreams.send(<String, Object?>{ |
| 'id': id, |
| if (result != null) 'result': result, |
| }); |
| } |
| |
| /// Sends an error response to the other end of the connection. |
| void sendErrorResponse(Object id, Object? error, StackTrace trace) { |
| _daemonStreams.send(<String, Object?>{ |
| 'id': id, |
| 'error': error, |
| 'trace': '$trace', |
| }); |
| } |
| |
| /// Sends an event to the client. |
| void sendEvent(String name, [ Object? params, List<int>? binary ]) { |
| _daemonStreams.send(<String, Object?>{ |
| 'event': name, |
| if (params != null) 'params': params, |
| }, binary); |
| } |
| |
| /// Handles the input from the stream. |
| /// |
| /// There are three kinds of data: Request, Response, Event. |
| /// |
| /// Request: |
| /// {"id": <Object>. "method": <String>, "params": <optional, Object?>} |
| /// |
| /// Response: |
| /// {"id": <Object>. "result": <optional, Object?>} for a successful response. |
| /// {"id": <Object>. "error": <Object>, "stackTrace": <String>} for an error response. |
| /// |
| /// Event: |
| /// {"event": <String>. "params": <optional, Object?>} |
| void _handleMessage(DaemonMessage message) { |
| final Map<String, Object?> data = message.data; |
| if (data['id'] != null) { |
| if (data['method'] == null) { |
| // This is a response to previously sent request. |
| final String id = data['id']! as String; |
| if (data['error'] != null) { |
| // This is an error response. |
| _logger.printTrace('<- Error response received from daemon, id = $id'); |
| final Object error = data['error']!; |
| final String stackTrace = data['stackTrace'] as String? ?? ''; |
| _outgoingRequestCompleters.remove(id)?.completeError(error, StackTrace.fromString(stackTrace)); |
| } else { |
| _logger.printTrace('<- Response received from daemon, id = $id'); |
| final Object? result = data['result']; |
| _outgoingRequestCompleters.remove(id)?.complete(result); |
| } |
| } else { |
| _incomingCommands.add(message); |
| } |
| } else if (data['event'] != null) { |
| // This is an event |
| _logger.printTrace('<- Event received: ${data['event']}'); |
| final Object? eventName = data['event']; |
| if (eventName is String) { |
| _events.add(DaemonEventData( |
| eventName, |
| data['params'], |
| message.binary, |
| )); |
| } else { |
| throwToolExit('event name received is not string!'); |
| } |
| } else { |
| _logger.printError('Unknown data received from daemon'); |
| } |
| } |
| |
| /// Cleans up any resources used in the connection. |
| Future<void> dispose() async { |
| await _commandSubscription.cancel(); |
| await _daemonStreams.dispose(); |
| unawaited(_events.close()); |
| unawaited(_incomingCommands.close()); |
| } |
| } |