| // Copyright (C) 2021 The Android Open Source Project |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| // This file deals with deserialization and iteration of the proto-encoded |
| // byte buffer that is returned by TraceProcessor when invoking the |
| // TPM_QUERY_STREAMING method. The returned |query_result| buffer is optimized |
// for being moved cheaply across workers and decoded on-the-fly as we step
| // through the iterator. |
| // See comments around QueryResult in trace_processor.proto for more details. |
| |
| // The classes in this file are organized as follows: |
| // |
| // QueryResultImpl: |
| // The object returned by the Engine.query(sql) method. |
// This object is a holder of row data. Batches of raw data get appended
| // incrementally as they are received by the remote TraceProcessor instance. |
| // QueryResultImpl also deals with asynchronicity of queries and allows callers |
| // to obtain a promise that waits for more (or all) rows. |
| // At any point in time the following objects hold a reference to QueryResult: |
| // - The Engine: for appending row batches. |
| // - UI code, typically controllers, who make queries. |
| // |
| // ResultBatch: |
// Holds the data returned by the remote TraceProcessor instance for a number
| // of rows (TP typically chunks the results in batches of 128KB). |
| // A QueryResultImpl holds exclusively ResultBatches for a given query. |
| // ResultBatch is not exposed externally, it's just an internal representation |
| // that helps with proto decoding. ResultBatch is immutable after it gets |
| // appended and decoded. The iteration state is held by the RowIteratorImpl. |
| // |
| // RowIteratorImpl: |
| // Decouples the data owned by QueryResultImpl (and its ResultBatch(es)) from |
| // the iteration state. The iterator effectively is the union of a ResultBatch |
| // and the row number in it. Rows within the batch are decoded as the user calls |
| // next(). When getting at the end of the batch, it takes care of switching to |
| // the next batch (if any) within the QueryResultImpl. |
| // This object is part of the API exposed to tracks / controllers. |
| |
| // Ensure protobuf is initialized. |
| import '../base/static_initializers'; |
| import protobuf from 'protobufjs/minimal'; |
| import {defer, Deferred} from '../base/deferred'; |
| import {assertExists, assertFalse, assertTrue} from '../base/logging'; |
| import {utf8Decode} from '../base/string_utils'; |
| import {Duration, duration, Time, time} from '../base/time'; |
| |
| export type SqlValue = string | number | bigint | null | Uint8Array; |
| // TODO(altimin): Replace ColumnType with SqlValue across the codebase and |
| // remove export here. |
| export type ColumnType = SqlValue; |
| |
| export const UNKNOWN: ColumnType = null; |
| export const NUM = 0; |
| export const STR = 'str'; |
| export const NUM_NULL: number | null = 1; |
| export const STR_NULL: string | null = 'str_null'; |
| export const BLOB: Uint8Array = new Uint8Array(); |
| export const BLOB_NULL: Uint8Array | null = new Uint8Array(); |
| export const LONG: bigint = 0n; |
| export const LONG_NULL: bigint | null = 1n; |
| |
| const SHIFT_32BITS = 32n; |
| |
| // Fast decode varint int64 into a bigint |
| // Inspired by |
| // https://github.com/protobufjs/protobuf.js/blob/56b1e64979dae757b67a21d326e16acee39f2267/src/reader.js#L123 |
| export function decodeInt64Varint(buf: Uint8Array, pos: number): bigint { |
| let hi: number = 0; |
| let lo: number = 0; |
| let i = 0; |
| |
| if (buf.length - pos > 4) { |
| // fast route (lo) |
| for (; i < 4; ++i) { |
| // 1st..4th |
| lo = (lo | ((buf[pos] & 127) << (i * 7))) >>> 0; |
| if (buf[pos++] < 128) { |
| return BigInt(lo); |
| } |
| } |
| // 5th |
| lo = (lo | ((buf[pos] & 127) << 28)) >>> 0; |
| hi = (hi | ((buf[pos] & 127) >> 4)) >>> 0; |
| if (buf[pos++] < 128) { |
| return (BigInt(hi) << SHIFT_32BITS) | BigInt(lo); |
| } |
| i = 0; |
| } else { |
| for (; i < 3; ++i) { |
| if (pos >= buf.length) { |
| throw Error('Index out of range'); |
| } |
| // 1st..3rd |
| lo = (lo | ((buf[pos] & 127) << (i * 7))) >>> 0; |
| if (buf[pos++] < 128) { |
| return BigInt(lo); |
| } |
| } |
| // 4th |
| lo = (lo | ((buf[pos++] & 127) << (i * 7))) >>> 0; |
| return (BigInt(hi) << SHIFT_32BITS) | BigInt(lo); |
| } |
| if (buf.length - pos > 4) { |
| // fast route (hi) |
| for (; i < 5; ++i) { |
| // 6th..10th |
| hi = (hi | ((buf[pos] & 127) << (i * 7 + 3))) >>> 0; |
| if (buf[pos++] < 128) { |
| const big = (BigInt(hi) << SHIFT_32BITS) | BigInt(lo); |
| return BigInt.asIntN(64, big); |
| } |
| } |
| } else { |
| for (; i < 5; ++i) { |
| if (pos >= buf.length) { |
| throw Error('Index out of range'); |
| } |
| // 6th..10th |
| hi = (hi | ((buf[pos] & 127) << (i * 7 + 3))) >>> 0; |
| if (buf[pos++] < 128) { |
| const big = (BigInt(hi) << SHIFT_32BITS) | BigInt(lo); |
| return BigInt.asIntN(64, big); |
| } |
| } |
| } |
| throw Error('invalid varint encoding'); |
| } |
| |
| // Info that could help debug a query error. For example the query |
| // in question, the stack where the query was issued, the active |
| // plugin etc. |
| export interface QueryErrorInfo { |
| query: string; |
| } |
| |
| export class QueryError extends Error { |
| readonly query: string; |
| |
| constructor(message: string, info: QueryErrorInfo) { |
| super(message); |
| this.query = info.query; |
| } |
| |
| toString() { |
| return `${super.toString()}\nQuery:\n${this.query}`; |
| } |
| } |
| |
| // One row extracted from an SQL result: |
| export interface Row { |
| [key: string]: ColumnType; |
| } |
| |
| // The methods that any iterator has to implement. |
| export interface RowIteratorBase { |
| valid(): boolean; |
| next(): void; |
| |
| // Reflection support for cases where the column names are not known upfront |
| // (e.g. the query result table for user-provided SQL queries). |
| // It throws if the passed column name doesn't exist. |
| // Example usage: |
| // for (const it = queryResult.iter({}); it.valid(); it.next()) { |
| // for (const columnName : queryResult.columns()) { |
| // console.log(it.get(columnName)); |
| get(columnName: string): ColumnType; |
| } |
| |
| // A RowIterator is a type that has all the fields defined in the query spec |
| // plus the valid() and next() operators. This is to ultimately allow the |
| // clients to do: |
| // const result = await engine.query("select name, surname, id from people;"); |
| // const iter = queryResult.iter({name: STR, surname: STR, id: NUM}); |
| // for (; iter.valid(); iter.next()) |
| // console.log(iter.name, iter.surname); |
| export type RowIterator<T extends Row> = RowIteratorBase & T; |
| |
| function columnTypeToString(t: ColumnType): string { |
| switch (t) { |
| case NUM: |
| return 'NUM'; |
| case NUM_NULL: |
| return 'NUM_NULL'; |
| case STR: |
| return 'STR'; |
| case STR_NULL: |
| return 'STR_NULL'; |
| case BLOB: |
| return 'BLOB'; |
| case BLOB_NULL: |
| return 'BLOB_NULL'; |
| case LONG: |
| return 'LONG'; |
| case LONG_NULL: |
| return 'LONG_NULL'; |
| case UNKNOWN: |
| return 'UNKNOWN'; |
| default: |
| return `INVALID(${t})`; |
| } |
| } |
| |
| function isCompatible(actual: CellType, expected: ColumnType): boolean { |
| switch (actual) { |
| case CellType.CELL_NULL: |
| return ( |
| expected === NUM_NULL || |
| expected === STR_NULL || |
| expected === BLOB_NULL || |
| expected === LONG_NULL || |
| expected === UNKNOWN |
| ); |
| case CellType.CELL_VARINT: |
| return ( |
| expected === NUM || |
| expected === NUM_NULL || |
| expected === LONG || |
| expected === LONG_NULL || |
| expected === UNKNOWN |
| ); |
| case CellType.CELL_FLOAT64: |
| return expected === NUM || expected === NUM_NULL || expected === UNKNOWN; |
| case CellType.CELL_STRING: |
| return expected === STR || expected === STR_NULL || expected === UNKNOWN; |
| case CellType.CELL_BLOB: |
| return ( |
| expected === BLOB || expected === BLOB_NULL || expected === UNKNOWN |
| ); |
| default: |
| throw new Error(`Unknown CellType ${actual}`); |
| } |
| } |
| |
| // This has to match CellType in trace_processor.proto. |
| enum CellType { |
| CELL_NULL = 1, |
| CELL_VARINT = 2, |
| CELL_FLOAT64 = 3, |
| CELL_STRING = 4, |
| CELL_BLOB = 5, |
| } |
| |
| const CELL_TYPE_NAMES = [ |
| 'UNKNOWN', |
| 'NULL', |
| 'VARINT', |
| 'FLOAT64', |
| 'STRING', |
| 'BLOB', |
| ]; |
| |
| const TAG_LEN_DELIM = 2; |
| |
| // This is the interface exposed to readers (e.g. tracks). The underlying object |
| // (QueryResultImpl) owns the result data. This allows to obtain iterators on |
| // that. In future it will allow to wait for incremental updates (new rows being |
| // fetched) for streaming queries. |
| export interface QueryResult { |
| // Obtains an iterator. |
| // TODO(primiano): this should have an option to destruct data as we read. In |
| // the case of a long query (e.g. `SELECT * FROM sched` in the query prompt) |
| // we don't want to accumulate everything in memory. OTOH UI tracks want to |
| // keep the data around so they can redraw them on each animation frame. For |
| // now we keep everything in memory in the QueryResultImpl object. |
| // iter<T extends Row>(spec: T): RowIterator<T>; |
| iter<T extends Row>(spec: T): RowIterator<T>; |
| |
| // Like iter() for queries that expect only one row. It embeds the valid() |
| // check (i.e. throws if no rows are available) and returns directly the |
| // first result. |
| firstRow<T extends Row>(spec: T): T; |
| |
| // If != undefined the query errored out and error() contains the message. |
| error(): string | undefined; |
| |
| // Returns the number of rows accumulated so far. Note that this number can |
| // change over time as more batches are received. It becomes stable only |
| // when isComplete() returns true or after waitAllRows() is resolved. |
| numRows(): number; |
| |
| // If true all rows have been fetched. Calling iter() will iterate through the |
| // last row. If false, iter() will return an iterator which might iterate |
| // through some rows (or none) but will surely not reach the end. |
| isComplete(): boolean; |
| |
| // Returns a promise that is resolved only when all rows (i.e. all batches) |
| // have been fetched. The promise return value is always the object itself. |
| waitAllRows(): Promise<QueryResult>; |
| |
| // Returns a promise that is resolved when either: |
| // - more rows are available |
| // - all rows are available |
| // The promise return value is always the object iself. |
| waitMoreRows(): Promise<QueryResult>; |
| |
| // Can return an empty array if called before the first batch is resolved. |
| // This should be called only after having awaited for at least one batch. |
| columns(): string[]; |
| |
| // Returns the number of SQL statements in the query |
| // (e.g. 2 'if SELECT 1; SELECT 2;') |
| statementCount(): number; |
| |
| // Returns the number of SQL statement that produced output rows. This number |
| // is <= statementCount(). |
| statementWithOutputCount(): number; |
| |
| // Returns the last SQL statement. |
| lastStatementSql(): string; |
| } |
| |
| // Interface exposed to engine.ts to pump in the data as new row batches arrive. |
| export interface WritableQueryResult extends QueryResult { |
| // |resBytes| is a proto-encoded trace_processor.QueryResult message. |
| // The overall flow looks as follows: |
| // - The user calls engine.query('select ...') and gets a QueryResult back. |
| // - The query call posts a message to the worker that runs the SQL engine ( |
| // or sends a HTTP request in case of the RPC+HTTP interface). |
| // - The returned QueryResult object is initially empty. |
| // - Over time, the sql engine will postMessage() back results in batches. |
| // - Each bach will end up calling this appendResultBatch() method. |
| // - If there is any pending promise (e.g. the caller called |
| // queryResult.waitAllRows()), this call will awake them (if this is the |
| // last batch). |
| appendResultBatch(resBytes: Uint8Array): void; |
| } |
| |
| // The actual implementation, which bridges together the reader side and the |
| // writer side (the one exposed to the Engine). This is the same object so that |
| // when the engine pumps new row batches we can resolve pending promises that |
| // readers (e.g. track code) are waiting for. |
| class QueryResultImpl implements QueryResult, WritableQueryResult { |
| columnNames: string[] = []; |
| private _error?: string; |
| private _numRows = 0; |
| private _isComplete = false; |
| private _errorInfo: QueryErrorInfo; |
| private _statementCount = 0; |
| private _statementWithOutputCount = 0; |
| private _lastStatementSql = ''; |
| |
| constructor(errorInfo: QueryErrorInfo) { |
| this._errorInfo = errorInfo; |
| } |
| |
| // --- QueryResult implementation. |
| |
| // TODO(primiano): for the moment new batches are appended but old batches |
| // are never removed. This won't work with abnormally large result sets, as |
| // it will stash all rows in memory. We could switch to a model where the |
| // iterator is destructive and deletes batch objects once iterating past the |
| // end of each batch. If we do that, than we need to assign monotonic IDs to |
| // batches. Also if we do that, we should prevent creating more than one |
| // iterator for a QueryResult. |
| batches: ResultBatch[] = []; |
| |
| // Promise awaiting on waitAllRows(). This should be resolved only when the |
| // last result batch has been been retrieved. |
| private allRowsPromise?: Deferred<QueryResult>; |
| |
| // Promise awaiting on waitMoreRows(). This resolved when the next |
| // batch is appended via appendResultBatch. |
| private moreRowsPromise?: Deferred<QueryResult>; |
| |
| isComplete(): boolean { |
| return this._isComplete; |
| } |
| numRows(): number { |
| return this._numRows; |
| } |
| error(): string | undefined { |
| return this._error; |
| } |
| columns(): string[] { |
| return this.columnNames; |
| } |
| statementCount(): number { |
| return this._statementCount; |
| } |
| statementWithOutputCount(): number { |
| return this._statementWithOutputCount; |
| } |
| lastStatementSql(): string { |
| return this._lastStatementSql; |
| } |
| |
| iter<T extends Row>(spec: T): RowIterator<T> { |
| const impl = new RowIteratorImplWithRowData(spec, this); |
| return impl as {} as RowIterator<T>; |
| } |
| |
| firstRow<T extends Row>(spec: T): T { |
| const impl = new RowIteratorImplWithRowData(spec, this); |
| assertTrue(impl.valid()); |
| return impl as {} as RowIterator<T> as T; |
| } |
| |
| // Can be called only once. |
| waitAllRows(): Promise<QueryResult> { |
| assertTrue(this.allRowsPromise === undefined); |
| this.allRowsPromise = defer<QueryResult>(); |
| if (this._isComplete) { |
| this.resolveOrReject(this.allRowsPromise, this); |
| } |
| return this.allRowsPromise; |
| } |
| |
| waitMoreRows(): Promise<QueryResult> { |
| if (this.moreRowsPromise !== undefined) { |
| return this.moreRowsPromise; |
| } |
| |
| const moreRowsPromise = defer<QueryResult>(); |
| if (this._isComplete) { |
| this.resolveOrReject(moreRowsPromise, this); |
| } else { |
| this.moreRowsPromise = moreRowsPromise; |
| } |
| return moreRowsPromise; |
| } |
| |
| // --- WritableQueryResult implementation. |
| |
| // Called by the engine when a new QueryResult is available. Note that a |
| // single Query() call can yield >1 QueryResult due to result batching |
| // if more than ~64K of data are returned, e.g. when returning O(M) rows. |
| // |resBytes| is a proto-encoded trace_processor.QueryResult message. |
| // It is fine to retain the resBytes without slicing a copy, because |
| // ProtoRingBuffer does the slice() for us (or passes through the buffer |
| // coming from postMessage() (Wasm case) of fetch() (HTTP+RPC case). |
| appendResultBatch(resBytes: Uint8Array) { |
| const reader = protobuf.Reader.create(resBytes); |
| assertTrue(reader.pos === 0); |
| const columnNamesEmptyAtStartOfBatch = this.columnNames.length === 0; |
| const columnNamesSet = new Set<string>(); |
| while (reader.pos < reader.len) { |
| const tag = reader.uint32(); |
| switch (tag >>> 3) { |
| case 1: // column_names |
| // Only the first batch should contain the column names. If this fires |
| // something is going wrong in the handling of the batch stream. |
| assertTrue(columnNamesEmptyAtStartOfBatch); |
| const origColName = reader.string(); |
| let colName = origColName; |
| // In some rare cases two columns can have the same name (b/194891824) |
| // e.g. `select 1 as x, 2 as x`. These queries don't happen in the |
| // UI code, but they can happen when the user types a query (e.g. |
| // with a join). The most practical thing we can do here is renaming |
| // the columns with a suffix. Keeping the same name will break when |
| // iterating, because column names become iterator object keys. |
| for (let i = 1; columnNamesSet.has(colName); ++i) { |
| colName = `${origColName}_${i}`; |
| assertTrue(i < 100); // Give up at some point; |
| } |
| columnNamesSet.add(colName); |
| this.columnNames.push(colName); |
| break; |
| case 2: // error |
| // The query has errored only if the |error| field is non-empty. |
| // In protos, we don't distinguish between non-present and empty. |
| // Make sure we don't propagate ambiguous empty strings to JS. |
| const err = reader.string(); |
| this._error = err !== undefined && err.length ? err : undefined; |
| break; |
| case 3: // batch |
| const batchLen = reader.uint32(); |
| const batchRaw = resBytes.subarray(reader.pos, reader.pos + batchLen); |
| reader.pos += batchLen; |
| |
| // The ResultBatch ctor parses the CellsBatch submessage. |
| const parsedBatch = new ResultBatch(batchRaw); |
| this.batches.push(parsedBatch); |
| this._isComplete = parsedBatch.isLastBatch; |
| |
| // In theory one could construct a valid proto serializing the column |
| // names after the cell batches. In practice the QueryResultSerializer |
| // doesn't do that so it's not worth complicating the code. |
| const numColumns = this.columnNames.length; |
| if (numColumns !== 0) { |
| assertTrue(parsedBatch.numCells % numColumns === 0); |
| this._numRows += parsedBatch.numCells / numColumns; |
| } else { |
| // numColumns == 0 is plausible for queries like CREATE TABLE ... . |
| assertTrue(parsedBatch.numCells === 0); |
| } |
| break; |
| |
| case 4: |
| this._statementCount = reader.uint32(); |
| break; |
| |
| case 5: |
| this._statementWithOutputCount = reader.uint32(); |
| break; |
| |
| case 6: |
| this._lastStatementSql = reader.string(); |
| break; |
| |
| default: |
| console.warn(`Unexpected QueryResult field ${tag >>> 3}`); |
| reader.skipType(tag & 7); |
| break; |
| } // switch (tag) |
| } // while (pos < end) |
| |
| if (this.moreRowsPromise !== undefined) { |
| this.resolveOrReject(this.moreRowsPromise, this); |
| this.moreRowsPromise = undefined; |
| } |
| |
| if (this._isComplete && this.allRowsPromise !== undefined) { |
| this.resolveOrReject(this.allRowsPromise, this); |
| } |
| } |
| |
| ensureAllRowsPromise(): Promise<QueryResult> { |
| if (this.allRowsPromise === undefined) { |
| this.waitAllRows(); // Will populate |this.allRowsPromise|. |
| } |
| return assertExists(this.allRowsPromise); |
| } |
| |
| get errorInfo(): QueryErrorInfo { |
| return this._errorInfo; |
| } |
| |
| private resolveOrReject(promise: Deferred<QueryResult>, arg: QueryResult) { |
| if (this._error === undefined) { |
| promise.resolve(arg); |
| } else { |
| promise.reject(new QueryError(this._error, this._errorInfo)); |
| } |
| } |
| } |
| |
| // This class holds onto a received result batch (a Uint8Array) and does some |
| // partial parsing to tokenize the various cell groups. This parsing mainly |
| // consists of identifying and caching the offsets of each cell group and |
| // initializing the varint decoders. This half parsing is done to keep the |
| // iterator's next() fast, without decoding everything into memory. |
| // This is an internal implementation detail and is not exposed outside. The |
| // RowIteratorImpl uses this class to iterate through batches (this class takes |
| // care of iterating within a batch, RowIteratorImpl takes care of switching |
| // batches when needed). |
| // Note: at any point in time there can be more than one ResultIterator |
| // referencing the same batch. The batch must be immutable. |
| class ResultBatch { |
| readonly isLastBatch: boolean = false; |
| readonly batchBytes: Uint8Array; |
| readonly cellTypesOff: number = 0; |
| readonly cellTypesLen: number = 0; |
| readonly varintOff: number = 0; |
| readonly varintLen: number = 0; |
| readonly float64Cells = new Float64Array(); |
| readonly blobCells: Uint8Array[] = []; |
| readonly stringCells: string[] = []; |
| |
| // batchBytes is a trace_processor.QueryResult.CellsBatch proto. |
| constructor(batchBytes: Uint8Array) { |
| this.batchBytes = batchBytes; |
| const reader = protobuf.Reader.create(batchBytes); |
| assertTrue(reader.pos === 0); |
| const end = reader.len; |
| |
| // Here we deconstruct the proto by hand. The CellsBatch is carefully |
| // designed to allow a very fast parsing from the TS side. We pack all cells |
| // of the same types together, so we can do only one call (per batch) to |
| // TextDecoder.decode(), we can overlay a memory-aligned typedarray for |
| // float values and can quickly tell and type-check the cell types. |
| // One row = N cells (we know the number upfront from the outer message). |
| // Each bach contains always an integer multiple of N cells (i.e. rows are |
| // never fragmented across different batches). |
| while (reader.pos < end) { |
| const tag = reader.uint32(); |
| switch (tag >>> 3) { |
| case 1: // cell types, a packed array containing one CellType per cell. |
| assertTrue((tag & 7) === TAG_LEN_DELIM); // Must be packed varint. |
| this.cellTypesLen = reader.uint32(); |
| this.cellTypesOff = reader.pos; |
| reader.pos += this.cellTypesLen; |
| break; |
| |
| case 2: // varint_cells, a packed varint buffer. |
| assertTrue((tag & 7) === TAG_LEN_DELIM); // Must be packed varint. |
| const packLen = reader.uint32(); |
| this.varintOff = reader.pos; |
| this.varintLen = packLen; |
| assertTrue(reader.buf === batchBytes); |
| assertTrue( |
| this.varintOff + this.varintLen <= |
| batchBytes.byteOffset + batchBytes.byteLength, |
| ); |
| reader.pos += packLen; |
| break; |
| |
| case 3: // float64_cells, a 64-bit aligned packed fixed64 buffer. |
| assertTrue((tag & 7) === TAG_LEN_DELIM); // Must be packed varint. |
| const f64Len = reader.uint32(); |
| assertTrue(f64Len % 8 === 0); |
| // Float64Array's constructor is evil: the offset is in bytes but the |
| // length is in 8-byte words. |
| const f64Words = f64Len / 8; |
| const f64Off = batchBytes.byteOffset + reader.pos; |
| if (f64Off % 8 === 0) { |
| this.float64Cells = new Float64Array( |
| batchBytes.buffer, |
| f64Off, |
| f64Words, |
| ); |
| } else { |
| // When using the production code in trace_processor's rpc.cc, the |
| // float64 should be 8-bytes aligned. The slow-path case is only for |
| // tests. |
| const slice = batchBytes.buffer.slice(f64Off, f64Off + f64Len); |
| this.float64Cells = new Float64Array(slice); |
| } |
| reader.pos += f64Len; |
| break; |
| |
| case 4: // blob_cells: one entry per blob. |
| assertTrue((tag & 7) === TAG_LEN_DELIM); |
| // protobufjs's bytes() under the hoods calls slice() and creates |
| // a copy. Fine here as blobs are rare and not a fastpath. |
| this.blobCells.push(new Uint8Array(reader.bytes())); |
| break; |
| |
| case 5: // string_cells: all the string cells concatenated with \0s. |
| assertTrue((tag & 7) === TAG_LEN_DELIM); |
| const strLen = reader.uint32(); |
| assertTrue(reader.pos + strLen <= end); |
| const subArr = batchBytes.subarray(reader.pos, reader.pos + strLen); |
| assertTrue(subArr.length === strLen); |
| // The reason why we do this split rather than creating one string |
| // per entry is that utf8 decoding has some non-negligible cost. See |
| // go/postmessage-benchmark . |
| this.stringCells = utf8Decode(subArr).split('\0'); |
| reader.pos += strLen; |
| break; |
| |
| case 6: // is_last_batch (boolean). |
| this.isLastBatch = !!reader.bool(); |
| break; |
| |
| case 7: // padding for realignment, skip silently. |
| reader.skipType(tag & 7); |
| break; |
| |
| default: |
| console.warn(`Unexpected QueryResult.CellsBatch field ${tag >>> 3}`); |
| reader.skipType(tag & 7); |
| break; |
| } // switch(tag) |
| } // while (pos < end) |
| } |
| |
| get numCells() { |
| return this.cellTypesLen; |
| } |
| } |
| |
| class RowIteratorImpl implements RowIteratorBase { |
| // The spec passed to the iter call containing the expected types, e.g.: |
| // {'colA': NUM, 'colB': NUM_NULL, 'colC': STRING}. |
| // This doesn't ever change. |
| readonly rowSpec: Row; |
| |
| // The object that holds the current row. This points to the parent |
| // RowIteratorImplWithRowData instance that created this class. |
| rowData: Row; |
| |
| // The QueryResult object we are reading data from. The engine will pump |
| // batches over time into this object. |
| private resultObj: QueryResultImpl; |
| |
| // All the member variables in the group below point to the identically-named |
| // members in result.batch[batchIdx]. This is to avoid indirection layers in |
| // the next() hotpath, so we can do this.float64Cells vs |
| // this.resultObj.batch[this.batchIdx].float64Cells. |
| // These are re-set every time tryMoveToNextBatch() is called (and succeeds). |
| private batchIdx = -1; // The batch index within |result.batches[]|. |
| private batchBytes = new Uint8Array(); |
| private columnNames: string[] = []; |
| private numColumns = 0; |
| private cellTypesEnd = -1; // -1 so the 1st next() hits tryMoveToNextBatch(). |
| private float64Cells = new Float64Array(); |
| private varIntReader = protobuf.Reader.create(this.batchBytes); |
| private blobCells: Uint8Array[] = []; |
| private stringCells: string[] = []; |
| |
| // These members instead are incremented as we read cells from next(). They |
| // are the mutable state of the iterator. |
| private nextCellTypeOff = 0; |
| private nextFloat64Cell = 0; |
| private nextStringCell = 0; |
| private nextBlobCell = 0; |
| private isValid = false; |
| |
| constructor(querySpec: Row, rowData: Row, res: QueryResultImpl) { |
| Object.assign(this, querySpec); |
| this.rowData = rowData; |
| this.rowSpec = {...querySpec}; // ... -> Copy all the key/value pairs. |
| this.resultObj = res; |
| this.next(); |
| } |
| |
| valid(): boolean { |
| return this.isValid; |
| } |
| |
| private makeError(message: string): QueryError { |
| return new QueryError(message, this.resultObj.errorInfo); |
| } |
| |
| get(columnName: string): ColumnType { |
| const res = this.rowData[columnName]; |
| if (res === undefined) { |
| throw this.makeError( |
| `Column '${columnName}' doesn't exist. ` + |
| `Actual columns: [${this.columnNames.join(',')}]`, |
| ); |
| } |
| return res; |
| } |
| |
| // Moves the cursor next by one row and updates |isValid|. |
| // When this fails to move, two cases are possible: |
| // 1. We reached the end of the result set (this is the case if |
| // QueryResult.isComplete() == true when this fails). |
| // 2. We reached the end of the current batch, but more rows might come later |
| // (if QueryResult.isComplete() == false). |
| next() { |
| // At some point we might reach the end of the current batch, but the next |
| // batch might be available already. In this case we want next() to |
| // transparently move on to the next batch. |
| while (this.nextCellTypeOff + this.numColumns > this.cellTypesEnd) { |
| // If TraceProcessor is behaving well, we should never end up in a |
| // situation where we have leftover cells. TP is expected to serialize |
| // whole rows in each QueryResult batch and NOT truncate them midway. |
| // If this assert fires the TP RPC logic has a bug. |
| assertTrue( |
| this.nextCellTypeOff === this.cellTypesEnd || this.cellTypesEnd === -1, |
| ); |
| if (!this.tryMoveToNextBatch()) { |
| this.isValid = false; |
| return; |
| } |
| } |
| |
| const rowData = this.rowData; |
| const numColumns = this.numColumns; |
| |
| // Read the current row. |
| for (let i = 0; i < numColumns; i++) { |
| const cellType = this.batchBytes[this.nextCellTypeOff++]; |
| const colName = this.columnNames[i]; |
| const expType = this.rowSpec[colName]; |
| |
| switch (cellType) { |
| case CellType.CELL_NULL: |
| rowData[colName] = null; |
| break; |
| |
| case CellType.CELL_VARINT: |
| if (expType === NUM || expType === NUM_NULL) { |
| // This is very subtle. The return type of int64 can be either a |
| // number or a Long.js {high:number, low:number} if Long.js is |
| // installed. The default state seems different in node and browser. |
| // We force-disable Long.js support in the top of this source file. |
| const val = this.varIntReader.int64(); |
| rowData[colName] = val as {} as number; |
| } else { |
| // LONG, LONG_NULL, or unspecified - return as bigint |
| const value = decodeInt64Varint( |
| this.batchBytes, |
| this.varIntReader.pos, |
| ); |
| rowData[colName] = value; |
| this.varIntReader.skip(); // Skips a varint |
| } |
| break; |
| |
| case CellType.CELL_FLOAT64: |
| rowData[colName] = this.float64Cells[this.nextFloat64Cell++]; |
| break; |
| |
| case CellType.CELL_STRING: |
| rowData[colName] = this.stringCells[this.nextStringCell++]; |
| break; |
| |
| case CellType.CELL_BLOB: |
| const blob = this.blobCells[this.nextBlobCell++]; |
| rowData[colName] = blob; |
| break; |
| |
| default: |
| throw this.makeError(`Invalid cell type ${cellType}`); |
| } |
| } // For (cells) |
| this.isValid = true; |
| } |
| |
  // Advances the iterator to the next batch in |resultObj.batches|, if any.
  // Returns false when there is no batch to move to (either we are past the
  // last one, or the next batch is the empty EOF-marker batch). On success,
  // resets all per-batch read cursors (cell types, varints, float64/string/
  // blob cells), validates the batch against |rowSpec| and returns true.
  private tryMoveToNextBatch(): boolean {
    const nextBatchIdx = this.batchIdx + 1;
    if (nextBatchIdx >= this.resultObj.batches.length) {
      return false;
    }

    // Refresh the column list from the result object. NOTE(review): this is
    // re-read on every batch switch; presumably the columns are identical
    // across batches of one query — confirm against the producer.
    this.columnNames = this.resultObj.columnNames;
    this.numColumns = this.columnNames.length;

    this.batchIdx = nextBatchIdx;
    const batch = assertExists(this.resultObj.batches[nextBatchIdx]);

    // Point the per-batch cursors at the new batch's data:
    // - |batchBytes| holds the raw batch; one cell-type byte per cell lives
    //   in [cellTypesOff, cellTypesOff + cellTypesLen).
    // - |varIntReader| is positioned over the varint-encoded cells region.
    this.batchBytes = batch.batchBytes;
    this.nextCellTypeOff = batch.cellTypesOff;
    this.cellTypesEnd = batch.cellTypesOff + batch.cellTypesLen;
    this.float64Cells = batch.float64Cells;
    this.blobCells = batch.blobCells;
    this.stringCells = batch.stringCells;
    this.varIntReader = protobuf.Reader.create(batch.batchBytes);
    this.varIntReader.pos = batch.varintOff;
    this.varIntReader.len = batch.varintOff + batch.varintLen;
    this.nextFloat64Cell = 0;
    this.nextStringCell = 0;
    this.nextBlobCell = 0;

    // Check that all the expected columns are present.
    for (const expectedCol of Object.keys(this.rowSpec)) {
      if (this.columnNames.indexOf(expectedCol) < 0) {
        throw this.makeError(
          `Column ${expectedCol} not found in the SQL result ` +
            `set {${this.columnNames.join(' ')}}`,
        );
      }
    }

    // Check that the cell types are consistent.
    const numColumns = this.numColumns;
    if (batch.numCells === 0) {
      // This can happen if the query result contains just an error. In this
      // case an empty batch with isLastBatch=true is appended as an EOF
      // marker. In theory TraceProcessor could return an empty batch in the
      // middle and that would be fine from a protocol viewpoint. In practice,
      // no code path does that today so it doesn't make sense trying to
      // support it with a recursive call to tryMoveToNextBatch().
      assertTrue(batch.isLastBatch);
      return false;
    }

    // Validate the whole batch upfront: every cell-type byte must be
    // compatible with the type the caller declared in |rowSpec| for the
    // corresponding column.
    assertTrue(numColumns > 0);
    for (let i = this.nextCellTypeOff; i < this.cellTypesEnd; i++) {
      // Cells are laid out row-major: the column index cycles every
      // |numColumns| cell-type bytes.
      const col = (i - this.nextCellTypeOff) % numColumns;
      const colName = this.columnNames[col];
      const actualType = this.batchBytes[i] as CellType;
      const expType = this.rowSpec[colName];

      // If undefined, the caller doesn't want to read this column at all, so
      // it can be whatever.
      if (expType === undefined) continue;

      let err = '';
      if (!isCompatible(actualType, expType)) {
        if (actualType === CellType.CELL_NULL) {
          err =
            'SQL value is NULL but that was not expected' +
            ` (expected type: ${columnTypeToString(expType)}). ` +
            'Did you mean NUM_NULL, LONG_NULL, STR_NULL or BLOB_NULL?';
        } else {
          err = `Incompatible cell type. Expected: ${columnTypeToString(
            expType,
          )} actual: ${CELL_TYPE_NAMES[actualType]}`;
        }
      }
      if (err.length > 0) {
        // NOTE(review): |row| is derived from the absolute offset |i|, not
        // from (i - nextCellTypeOff); when cellTypesOff > 0 the row number in
        // this error message may be shifted — confirm intended.
        const row = Math.floor(i / numColumns);
        const message = `Error @ row: ${row} col: '${colName}': ${err}`;
        throw this.makeError(message);
      }
    }
    return true;
  }
| } |
| |
| // This is the object ultimately returned to the client when calling |
| // QueryResult.iter(...). |
| // The only reason why this is disjoint from RowIteratorImpl is to avoid |
// naming collisions between the member variables required by RowIteratorImpl
| // and the column names returned by the iterator. |
| class RowIteratorImplWithRowData implements RowIteratorBase { |
| private _impl: RowIteratorImpl; |
| |
| next: () => void; |
| valid: () => boolean; |
| get: (columnName: string) => ColumnType; |
| |
| constructor(querySpec: Row, res: QueryResultImpl) { |
| const thisAsRow = this as {} as Row; |
| Object.assign(thisAsRow, querySpec); |
| this._impl = new RowIteratorImpl(querySpec, thisAsRow, res); |
| this.next = this._impl.next.bind(this._impl); |
| this.valid = this._impl.valid.bind(this._impl); |
| this.get = this._impl.get.bind(this._impl); |
| } |
| } |
| |
| // This is a proxy object that wraps QueryResultImpl, adding await-ability. |
| // This is so that: |
| // 1. Clients that just want to await for the full result set can just call |
| // await engine.query('...') and will get a QueryResult that is guaranteed |
| // to be complete. |
| // 2. Clients that know how to handle the streaming can use it straight away. |
| class WaitableQueryResultImpl |
| implements QueryResult, WritableQueryResult, PromiseLike<QueryResult> |
| { |
| private impl: QueryResultImpl; |
| private thenCalled = false; |
| |
| constructor(errorInfo: QueryErrorInfo) { |
| this.impl = new QueryResultImpl(errorInfo); |
| } |
| |
| // QueryResult implementation. Proxies all calls to the impl object. |
| iter<T extends Row>(spec: T) { |
| return this.impl.iter(spec); |
| } |
| firstRow<T extends Row>(spec: T) { |
| return this.impl.firstRow(spec); |
| } |
| waitAllRows() { |
| return this.impl.waitAllRows(); |
| } |
| waitMoreRows() { |
| return this.impl.waitMoreRows(); |
| } |
| isComplete() { |
| return this.impl.isComplete(); |
| } |
| numRows() { |
| return this.impl.numRows(); |
| } |
| columns() { |
| return this.impl.columns(); |
| } |
| error() { |
| return this.impl.error(); |
| } |
| statementCount() { |
| return this.impl.statementCount(); |
| } |
| statementWithOutputCount() { |
| return this.impl.statementWithOutputCount(); |
| } |
| lastStatementSql() { |
| return this.impl.lastStatementSql(); |
| } |
| |
| // WritableQueryResult implementation. |
| appendResultBatch(resBytes: Uint8Array) { |
| return this.impl.appendResultBatch(resBytes); |
| } |
| |
| // PromiseLike<QueryResult> implementaton. |
| |
| // eslint-disable-next-line @typescript-eslint/no-explicit-any |
| then(onfulfilled: any, onrejected: any): any { |
| assertFalse(this.thenCalled); |
| this.thenCalled = true; |
| return this.impl.ensureAllRowsPromise().then(onfulfilled, onrejected); |
| } |
| |
| // eslint-disable-next-line @typescript-eslint/no-explicit-any |
| catch(error: any): any { |
| return this.impl.ensureAllRowsPromise().catch(error); |
| } |
| |
| // eslint-disable-next-line @typescript-eslint/no-explicit-any |
| finally(callback: () => void): any { |
| return this.impl.ensureAllRowsPromise().finally(callback); |
| } |
| |
| // eslint and clang-format disagree on how to format get[foo](). Let |
| // clang-format win: |
| get [Symbol.toStringTag](): string { |
| return 'Promise<WaitableQueryResult>'; |
| } |
| } |
| |
| export function createQueryResult( |
| errorInfo: QueryErrorInfo, |
| ): QueryResult & Promise<QueryResult> & WritableQueryResult { |
| return new WaitableQueryResultImpl(errorInfo); |
| } |
| |
| // Throws if the value cannot be reasonably converted to a bigint. |
| // Assumes value is in native time units. |
| export function timeFromSql(value: ColumnType): time { |
| if (typeof value === 'bigint') { |
| return Time.fromRaw(value); |
| } else if (typeof value === 'number') { |
| return Time.fromRaw(BigInt(Math.floor(value))); |
| } else if (value === null) { |
| return Time.ZERO; |
| } else { |
| throw Error(`Refusing to create time from unrelated type ${value}`); |
| } |
| } |
| |
| // Throws if the value cannot be reasonably converted to a bigint. |
| // Assumes value is in nanoseconds. |
| export function durationFromSql(value: ColumnType): duration { |
| if (typeof value === 'bigint') { |
| return value; |
| } else if (typeof value === 'number') { |
| return BigInt(Math.floor(value)); |
| } else if (value === null) { |
| return Duration.ZERO; |
| } else { |
| throw Error(`Refusing to create duration from unrelated type ${value}`); |
| } |
| } |