| // Copyright (C) 2021 The Android Open Source Project |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| import {assertTrue} from '../base/logging'; |
| |
| // This class is the TypeScript equivalent of the identically-named C++ class in |
| // //protozero/proto_ring_buffer.h. See comments in that header for a detailed |
| // description. The architecture is identical. |
| |
// Initial size of the internal buffer and the granularity (in bytes) by which
// it is enlarged when an append() does not fit: 128 KiB.
const kGrowBytes = 128 << 10;
// Messages whose declared payload length is >= this value (1 GiB) are rejected
// as framing errors.
const kMaxMsgSize = 1 << 30;
| |
/**
 * Reassembles length-delimited proto messages out of a byte stream that is
 * delivered in arbitrary chunks.
 *
 * Each message on the wire is framed as: a one-byte tag (wire type 2, i.e.
 * length-delimited), a varint payload length, then the payload itself.
 *
 * Usage: call append() with each received chunk, then call readMessage() in a
 * loop until it returns undefined.
 */
export class ProtoRingBuffer {
  // Backing storage. Bytes in [rd, wr) have been received but not yet consumed.
  private buf = new Uint8Array(kGrowBytes);
  // Set by append() when the chunk was exactly one whole message; handed out
  // (and cleared) by the next readMessage() without touching |buf|.
  private fastpath?: Uint8Array;
  private rd = 0;
  private wr = 0;

  /**
   * Feeds a chunk of the stream into the buffer.
   *
   * The caller must drain readMessage() after each append() call. |data| is
   * either copied into the internal buffer, or (fastpath) a subarray() of it
   * is handed as-is to the next readMessage() call.
   *
   * @param data the received bytes. An empty array is a no-op.
   * @throws Error on a framing error, when the buffer is empty and |data| is
   *     probed for the fastpath (see tryReadMessage()).
   */
  append(data: Uint8Array): void {
    assertTrue(this.wr <= this.buf.length);
    assertTrue(this.rd <= this.wr);

    // If the last readMessage() consumed everything and no incomplete message
    // is pending, restart from the beginning of the buffer rather than keep
    // advancing. This is the most common case.
    if (this.rd === this.wr) {
      this.rd = this.wr = 0;
    }

    const dataLen = data.length;
    if (dataLen === 0) return;

    // A pending fastpath message means the caller skipped readMessage().
    assertTrue(this.fastpath === undefined);
    if (this.rd === this.wr) {
      const msg = ProtoRingBuffer.tryReadMessage(data, 0, dataLen);
      if (
        msg !== undefined &&
        msg.byteOffset + msg.length === data.byteOffset + dataLen
      ) {
        // Fastpath: |data| holds exactly one whole message and nothing else.
        // Skip the buffer roundtrip and keep a view of the caller's array
        // (a subarray that skips the proto header) for readMessage().
        this.fastpath = msg;
        return;
      }
    }

    let avail = this.buf.length - this.wr;
    if (dataLen > avail) {
      // This whole section should be hit extremely rarely.

      // First try recompacting by moving the unread bytes to the start. This
      // helps if we received "a message and a bit" on each append() call.
      this.buf.copyWithin(0, this.rd, this.wr);
      avail += this.rd;
      this.wr -= this.rd;
      this.rd = 0;
      if (dataLen > avail) {
        // Still not enough: grow to the smallest multiple of kGrowBytes that
        // fits the pending bytes plus |data|. (The buffer size is always a
        // multiple of kGrowBytes.)
        const needed = this.wr + dataLen;
        const newSize = Math.ceil(needed / kGrowBytes) * kGrowBytes;
        assertTrue(newSize <= kMaxMsgSize * 2);
        const newBuf = new Uint8Array(newSize);
        // Only [0, wr) holds live data after the compaction above.
        newBuf.set(this.buf.subarray(0, this.wr));
        this.buf = newBuf;
        // No need to touch rd / wr.
      }
    }

    // Append the received data after the bytes already buffered.
    this.buf.set(data, this.wr);
    this.wr += dataLen;
  }

  /**
   * Tries to extract the next message.
   *
   * The caller is expected to call this in a loop until it returns undefined:
   * a single append() can yield more than one message.
   *
   * @returns the message payload (without tag and length header), or
   *     undefined if there is no message or the current one is incomplete.
   *     In the fastpath case the result is a view of the array passed to
   *     append(); otherwise it is a fresh copy.
   * @throws Error on a framing error (see tryReadMessage()).
   */
  readMessage(): Uint8Array | undefined {
    if (this.fastpath !== undefined) {
      assertTrue(this.rd === this.wr);
      const msg = this.fastpath;
      this.fastpath = undefined;
      return msg;
    }
    assertTrue(this.rd <= this.wr);
    if (this.rd >= this.wr) {
      return undefined; // Completely empty.
    }
    const msg = ProtoRingBuffer.tryReadMessage(this.buf, this.rd, this.wr);
    if (msg === undefined) return undefined;
    assertTrue(msg.buffer === this.buf.buffer);
    assertTrue(this.buf.byteOffset === 0);
    // |buf| starts at byte 0 of its ArrayBuffer, so the view's byteOffset is
    // also an index into |buf|.
    this.rd = msg.byteOffset + msg.length;

    // Deliberately return a copy via slice(): callers may hold onto the
    // returned buffer, while |msg| is a view of storage that later append()
    // calls will overwrite.
    return msg.slice();
  }

  /**
   * Parses one framed message from data[dataStart, dataEnd).
   *
   * @returns a subarray() view of the payload, or undefined if the range is
   *     empty or does not yet contain the whole header and payload.
   * @throws Error if the tag is not a one-byte length-delimited tag, if the
   *     length varint is longer than 10 bytes, or if the declared length is
   *     >= kMaxMsgSize.
   */
  private static tryReadMessage(
    data: Uint8Array,
    dataStart: number,
    dataEnd: number,
  ): Uint8Array | undefined {
    assertTrue(dataEnd <= data.length);
    let pos = dataStart;
    if (pos >= dataEnd) return undefined;
    const tag = data[pos++]; // Assume one-byte tag.
    if (tag >= 0x80 || (tag & 0x07) !== 2 /* len delimited */) {
      throw new Error(
        `RPC framing error, unexpected tag ${tag} @ offset ${pos - 1}`,
      );
    }

    // Decode the length varint with plain arithmetic instead of << and |=.
    // Bitwise operators work on signed 32-bit integers and take the shift
    // count modulo 32, so lengths >= 2^31 would turn negative or wrap around
    // and slip past the size check below.
    const varintStart = pos;
    const maxShift = 63; // A protobuf varint is at most 10 bytes long.
    let len = 0;
    for (let shift = 0; ; shift += 7) {
      if (shift > maxShift) {
        throw new Error(
          `RPC framing error, varint too long @ offset ${varintStart}`,
        );
      }
      if (pos >= dataEnd) {
        return undefined; // Not enough data to read varint.
      }
      const val = data[pos++];
      len += (val & 0x7f) * 2 ** shift;
      if (val < 0x80) break;
    }

    if (len >= kMaxMsgSize) {
      throw new Error(
        `RPC framing error, message too large (${len} >= ${kMaxMsgSize})`,
      );
    }
    const end = pos + len;
    if (end > dataEnd) return undefined;

    // This is a subarray() and not a slice() because in the |fastpath| case
    // we want to just return the original buffer pushed by append().
    // In the slow-path case readMessage() makes the copy via slice().
    return data.subarray(pos, end);
  }
}