| // Copyright (C) 2018 The Android Open Source Project |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| import {defer, Deferred} from '../base/deferred'; |
| import {assertExists, assertTrue} from '../base/logging'; |
| import { |
| ComputeMetricArgs, |
| ComputeMetricResult, |
| DisableAndReadMetatraceResult, |
| EnableMetatraceArgs, |
| MetatraceCategories, |
| QueryArgs, |
| QueryResult as ProtoQueryResult, |
| ResetTraceProcessorArgs, |
| TraceProcessorRpc, |
| TraceProcessorRpcStream, |
| } from '../protos'; |
| |
| import {ProtoRingBuffer} from './proto_ring_buffer'; |
| import { |
| createQueryResult, |
| QueryError, |
| QueryResult, |
| WritableQueryResult, |
| } from './query_result'; |
| |
| import TPM = TraceProcessorRpc.TraceProcessorMethod; |
| import {Disposable} from '../base/disposable'; |
| import {Result} from '../base/utils'; |
| |
| export interface LoadingTracker { |
| beginLoading(): void; |
| endLoading(): void; |
| } |
| |
| export class NullLoadingTracker implements LoadingTracker { |
| beginLoading(): void {} |
| endLoading(): void {} |
| } |
| |
| // This is used to skip the decoding of queryResult from protobufjs and deal |
| // with it ourselves. See the comment below around `QueryResult.decode = ...`. |
| interface QueryResultBypass { |
| rawQueryResult: Uint8Array; |
| } |
| |
| export interface TraceProcessorConfig { |
| cropTrackEvents: boolean; |
| ingestFtraceInRawTable: boolean; |
| analyzeTraceProtoContent: boolean; |
| ftraceDropUntilAllCpusValid: boolean; |
| } |
| |
| export interface Engine { |
| /** |
| * Execute a query against the database, returning a promise that resolves |
| * when the query has completed but rejected when the query fails for whatever |
| * reason. On success, the promise will only resolve once all the resulting |
| * rows have been received. |
| * |
| * The promise will be rejected if the query fails. |
| * |
| * @param sql The query to execute. |
| * @param tag An optional tag used to trace the origin of the query. |
| */ |
| query(sql: string, tag?: string): Promise<QueryResult>; |
| |
| /** |
| * Execute a query against the database, returning a promise that resolves |
| * when the query has completed or failed. The promise will never get |
| * rejected, it will always successfully resolve. Use the returned wrapper |
| * object to determine whether the query completed successfully. |
| * |
| * The promise will only resolve once all the resulting rows have been |
| * received. |
| * |
| * @param sql The query to execute. |
| * @param tag An optional tag used to trace the origin of the query. |
| */ |
| tryQuery(sql: string, tag?: string): Promise<Result<QueryResult, Error>>; |
| |
| /** |
| * Execute one or more metric and get the result. |
| * |
| * @param metrics The metrics to run. |
| * @param format The format of the response. |
| */ |
| computeMetric( |
| metrics: string[], |
| format: 'json' | 'prototext' | 'proto', |
| ): Promise<string | Uint8Array>; |
| } |
| |
| // Abstract interface of a trace proccessor. |
| // This is the TypeScript equivalent of src/trace_processor/rpc.h. |
| // There are two concrete implementations: |
| // 1. WasmEngineProxy: creates a Wasm module and interacts over postMessage(). |
| // 2. HttpRpcEngine: connects to an external `trace_processor_shell --httpd`. |
| // and interacts via fetch(). |
| // In both cases, we have a byte-oriented pipe to interact with TraceProcessor. |
| // The derived class is only expected to deal with these two functions: |
| // 1. Implement the abstract rpcSendRequestBytes() function, sending the |
| // proto-encoded TraceProcessorRpc requests to the TraceProcessor instance. |
| // 2. Call onRpcResponseBytes() when response data is received. |
| export abstract class EngineBase implements Engine { |
| abstract readonly id: string; |
| private loadingTracker: LoadingTracker; |
| private txSeqId = 0; |
| private rxSeqId = 0; |
| private rxBuf = new ProtoRingBuffer(); |
| private pendingParses = new Array<Deferred<void>>(); |
| private pendingEOFs = new Array<Deferred<void>>(); |
| private pendingResetTraceProcessors = new Array<Deferred<void>>(); |
| private pendingQueries = new Array<WritableQueryResult>(); |
| private pendingRestoreTables = new Array<Deferred<void>>(); |
| private pendingComputeMetrics = new Array<Deferred<string | Uint8Array>>(); |
| private pendingReadMetatrace?: Deferred<DisableAndReadMetatraceResult>; |
| private _isMetatracingEnabled = false; |
| |
| constructor(tracker?: LoadingTracker) { |
| this.loadingTracker = tracker ? tracker : new NullLoadingTracker(); |
| } |
| |
| // Called to send data to the TraceProcessor instance. This turns into a |
| // postMessage() or a HTTP request, depending on the Engine implementation. |
| abstract rpcSendRequestBytes(data: Uint8Array): void; |
| |
| // Called when an inbound message is received by the Engine implementation |
| // (e.g. onmessage for the Wasm case, on when HTTP replies are received for |
| // the HTTP+RPC case). |
| onRpcResponseBytes(dataWillBeRetained: Uint8Array) { |
| // Note: when hitting the fastpath inside ProtoRingBuffer, the |data| buffer |
| // is returned back by readMessage() (% subarray()-ing it) and held onto by |
| // other classes (e.g., QueryResult). For both fetch() and Wasm we are fine |
| // because every response creates a new buffer. |
| this.rxBuf.append(dataWillBeRetained); |
| for (;;) { |
| const msg = this.rxBuf.readMessage(); |
| if (msg === undefined) break; |
| this.onRpcResponseMessage(msg); |
| } |
| } |
| |
| // Parses a response message. |
| // |rpcMsgEncoded| is a sub-array to to the start of a TraceProcessorRpc |
| // proto-encoded message (without the proto preamble and varint size). |
| private onRpcResponseMessage(rpcMsgEncoded: Uint8Array) { |
| // Here we override the protobufjs-generated code to skip the parsing of the |
| // new streaming QueryResult and instead passing it through like a buffer. |
| // This is the overall problem: All trace processor responses are wrapped |
| // into a perfetto.protos.TraceProcessorRpc proto message. In all cases % |
| // TPM_QUERY_STREAMING, we want protobufjs to decode the proto bytes and |
| // give us a structured object. In the case of TPM_QUERY_STREAMING, instead, |
| // we want to deal with the proto parsing ourselves using the new |
| // QueryResult.appendResultBatch() method, because that handled streaming |
| // results more efficiently and skips several copies. |
| // By overriding the decode method below, we achieve two things: |
| // 1. We avoid protobufjs decoding the TraceProcessorRpc.query_result field. |
| // 2. We stash (a view of) the original buffer into the |rawQueryResult| so |
| // the `case TPM_QUERY_STREAMING` below can take it. |
| ProtoQueryResult.decode = (reader: protobuf.Reader, length: number) => { |
| const res = ProtoQueryResult.create() as {} as QueryResultBypass; |
| res.rawQueryResult = reader.buf.subarray(reader.pos, reader.pos + length); |
| // All this works only if protobufjs returns the original ArrayBuffer |
| // from |rpcMsgEncoded|. It should be always the case given the |
| // current implementation. This check mainly guards against future |
| // behavioral changes of protobufjs. We don't want to accidentally |
| // hold onto some internal protobufjs buffer. We are fine holding |
| // onto |rpcMsgEncoded| because those come from ProtoRingBuffer which |
| // is buffer-retention-friendly. |
| assertTrue(res.rawQueryResult.buffer === rpcMsgEncoded.buffer); |
| reader.pos += length; |
| return res as {} as ProtoQueryResult; |
| }; |
| |
| const rpc = TraceProcessorRpc.decode(rpcMsgEncoded); |
| |
| if (rpc.fatalError !== undefined && rpc.fatalError.length > 0) { |
| throw new Error(`${rpc.fatalError}`); |
| } |
| |
| // Allow restarting sequences from zero (when reloading the browser). |
| if (rpc.seq !== this.rxSeqId + 1 && this.rxSeqId !== 0 && rpc.seq !== 0) { |
| // "(ERR:rpc_seq)" is intercepted by error_dialog.ts to show a more |
| // graceful and actionable error. |
| throw new Error( |
| `RPC sequence id mismatch cur=${rpc.seq} last=${this.rxSeqId} (ERR:rpc_seq)`, |
| ); |
| } |
| |
| this.rxSeqId = rpc.seq; |
| |
| let isFinalResponse = true; |
| |
| switch (rpc.response) { |
| case TPM.TPM_APPEND_TRACE_DATA: |
| const appendResult = assertExists(rpc.appendResult); |
| const pendingPromise = assertExists(this.pendingParses.shift()); |
| if (appendResult.error && appendResult.error.length > 0) { |
| pendingPromise.reject(appendResult.error); |
| } else { |
| pendingPromise.resolve(); |
| } |
| break; |
| case TPM.TPM_FINALIZE_TRACE_DATA: |
| assertExists(this.pendingEOFs.shift()).resolve(); |
| break; |
| case TPM.TPM_RESET_TRACE_PROCESSOR: |
| assertExists(this.pendingResetTraceProcessors.shift()).resolve(); |
| break; |
| case TPM.TPM_RESTORE_INITIAL_TABLES: |
| assertExists(this.pendingRestoreTables.shift()).resolve(); |
| break; |
| case TPM.TPM_QUERY_STREAMING: |
| const qRes = assertExists(rpc.queryResult) as {} as QueryResultBypass; |
| const pendingQuery = assertExists(this.pendingQueries[0]); |
| pendingQuery.appendResultBatch(qRes.rawQueryResult); |
| if (pendingQuery.isComplete()) { |
| this.pendingQueries.shift(); |
| } else { |
| isFinalResponse = false; |
| } |
| break; |
| case TPM.TPM_COMPUTE_METRIC: |
| const metricRes = assertExists(rpc.metricResult) as ComputeMetricResult; |
| const pendingComputeMetric = assertExists( |
| this.pendingComputeMetrics.shift(), |
| ); |
| if (metricRes.error && metricRes.error.length > 0) { |
| const error = new QueryError( |
| `ComputeMetric() error: ${metricRes.error}`, |
| { |
| query: 'COMPUTE_METRIC', |
| }, |
| ); |
| pendingComputeMetric.reject(error); |
| } else { |
| const result = |
| metricRes.metricsAsPrototext || |
| metricRes.metricsAsJson || |
| metricRes.metrics || |
| ''; |
| pendingComputeMetric.resolve(result); |
| } |
| break; |
| case TPM.TPM_DISABLE_AND_READ_METATRACE: |
| const metatraceRes = assertExists( |
| rpc.metatrace, |
| ) as DisableAndReadMetatraceResult; |
| assertExists(this.pendingReadMetatrace).resolve(metatraceRes); |
| this.pendingReadMetatrace = undefined; |
| break; |
| default: |
| console.log( |
| 'Unexpected TraceProcessor response received: ', |
| rpc.response, |
| ); |
| break; |
| } // switch(rpc.response); |
| |
| if (isFinalResponse) { |
| this.loadingTracker.endLoading(); |
| } |
| } |
| |
| // TraceProcessor methods below this point. |
| // The methods below are called by the various controllers in the UI and |
| // deal with marshalling / unmarshaling requests to/from TraceProcessor. |
| |
| // Push trace data into the engine. The engine is supposed to automatically |
| // figure out the type of the trace (JSON vs Protobuf). |
| parse(data: Uint8Array): Promise<void> { |
| const asyncRes = defer<void>(); |
| this.pendingParses.push(asyncRes); |
| const rpc = TraceProcessorRpc.create(); |
| rpc.request = TPM.TPM_APPEND_TRACE_DATA; |
| rpc.appendTraceData = data; |
| this.rpcSendRequest(rpc); |
| return asyncRes; // Linearize with the worker. |
| } |
| |
| // Notify the engine that we reached the end of the trace. |
| // Called after the last parse() call. |
| notifyEof(): Promise<void> { |
| const asyncRes = defer<void>(); |
| this.pendingEOFs.push(asyncRes); |
| const rpc = TraceProcessorRpc.create(); |
| rpc.request = TPM.TPM_FINALIZE_TRACE_DATA; |
| this.rpcSendRequest(rpc); |
| return asyncRes; // Linearize with the worker. |
| } |
| |
| // Updates the TraceProcessor Config. This method creates a new |
| // TraceProcessor instance, so it should be called before passing any trace |
| // data. |
| resetTraceProcessor({ |
| cropTrackEvents, |
| ingestFtraceInRawTable, |
| analyzeTraceProtoContent, |
| ftraceDropUntilAllCpusValid, |
| }: TraceProcessorConfig): Promise<void> { |
| const asyncRes = defer<void>(); |
| this.pendingResetTraceProcessors.push(asyncRes); |
| const rpc = TraceProcessorRpc.create(); |
| rpc.request = TPM.TPM_RESET_TRACE_PROCESSOR; |
| const args = (rpc.resetTraceProcessorArgs = new ResetTraceProcessorArgs()); |
| args.dropTrackEventDataBefore = cropTrackEvents |
| ? ResetTraceProcessorArgs.DropTrackEventDataBefore |
| .TRACK_EVENT_RANGE_OF_INTEREST |
| : ResetTraceProcessorArgs.DropTrackEventDataBefore.NO_DROP; |
| args.ingestFtraceInRawTable = ingestFtraceInRawTable; |
| args.analyzeTraceProtoContent = analyzeTraceProtoContent; |
| args.ftraceDropUntilAllCpusValid = ftraceDropUntilAllCpusValid; |
| this.rpcSendRequest(rpc); |
| return asyncRes; |
| } |
| |
| // Resets the trace processor state by destroying any table/views created by |
| // the UI after loading. |
| restoreInitialTables(): Promise<void> { |
| const asyncRes = defer<void>(); |
| this.pendingRestoreTables.push(asyncRes); |
| const rpc = TraceProcessorRpc.create(); |
| rpc.request = TPM.TPM_RESTORE_INITIAL_TABLES; |
| this.rpcSendRequest(rpc); |
| return asyncRes; // Linearize with the worker. |
| } |
| |
| // Shorthand for sending a compute metrics request to the engine. |
| async computeMetric( |
| metrics: string[], |
| format: 'json' | 'prototext' | 'proto', |
| ): Promise<string | Uint8Array> { |
| const asyncRes = defer<string | Uint8Array>(); |
| this.pendingComputeMetrics.push(asyncRes); |
| const rpc = TraceProcessorRpc.create(); |
| rpc.request = TPM.TPM_COMPUTE_METRIC; |
| const args = (rpc.computeMetricArgs = new ComputeMetricArgs()); |
| args.metricNames = metrics; |
| if (format === 'json') { |
| args.format = ComputeMetricArgs.ResultFormat.JSON; |
| } else if (format === 'prototext') { |
| args.format = ComputeMetricArgs.ResultFormat.TEXTPROTO; |
| } else if (format === 'proto') { |
| args.format = ComputeMetricArgs.ResultFormat.BINARY_PROTOBUF; |
| } else { |
| throw new Error(`Unknown compute metric format ${format}`); |
| } |
| this.rpcSendRequest(rpc); |
| return asyncRes; |
| } |
| |
| // Issues a streaming query and retrieve results in batches. |
| // The returned QueryResult object will be populated over time with batches |
| // of rows (each batch conveys ~128KB of data and a variable number of rows). |
| // The caller can decide whether to wait that all batches have been received |
| // (by awaiting the returned object or calling result.waitAllRows()) or handle |
| // the rows incrementally. |
| // |
| // Example usage: |
| // const res = engine.execute('SELECT foo, bar FROM table'); |
| // console.log(res.numRows()); // Will print 0 because we didn't await. |
| // await(res.waitAllRows()); |
| // console.log(res.numRows()); // Will print the total number of rows. |
| // |
| // for (const it = res.iter({foo: NUM, bar:STR}); it.valid(); it.next()) { |
| // console.log(it.foo, it.bar); |
| // } |
| // |
| // Optional |tag| (usually a component name) can be provided to allow |
| // attributing trace processor workload to different UI components. |
| private streamingQuery( |
| sqlQuery: string, |
| tag?: string, |
| ): Promise<QueryResult> & QueryResult { |
| const rpc = TraceProcessorRpc.create(); |
| rpc.request = TPM.TPM_QUERY_STREAMING; |
| rpc.queryArgs = new QueryArgs(); |
| rpc.queryArgs.sqlQuery = sqlQuery; |
| if (tag) { |
| rpc.queryArgs.tag = tag; |
| } |
| const result = createQueryResult({ |
| query: sqlQuery, |
| }); |
| this.pendingQueries.push(result); |
| this.rpcSendRequest(rpc); |
| return result; |
| } |
| |
| // Wraps .streamingQuery(), captures errors and re-throws with current stack. |
| // |
| // Note: This function is less flexible than .execute() as it only returns a |
| // promise which must be unwrapped before the QueryResult may be accessed. |
| async query(sqlQuery: string, tag?: string): Promise<QueryResult> { |
| try { |
| return await this.streamingQuery(sqlQuery, tag); |
| } catch (e) { |
| // Replace the error's stack trace with the one from here |
| // Note: It seems only V8 can trace the stack up the promise chain, so its |
| // likely this stack won't be useful on !V8. |
| // See |
| // https://docs.google.com/document/d/13Sy_kBIJGP0XT34V1CV3nkWya4TwYx9L3Yv45LdGB6Q |
| captureStackTrace(e); |
| throw e; |
| } |
| } |
| |
| async tryQuery( |
| sql: string, |
| tag?: string, |
| ): Promise<Result<QueryResult, Error>> { |
| try { |
| const result = await this.query(sql, tag); |
| return {success: true, result}; |
| } catch (error: unknown) { |
| // We know we only throw Error type objects so we can type assert safely |
| return {success: false, error: error as Error}; |
| } |
| } |
| |
| isMetatracingEnabled(): boolean { |
| return this._isMetatracingEnabled; |
| } |
| |
| enableMetatrace(categories?: MetatraceCategories) { |
| const rpc = TraceProcessorRpc.create(); |
| rpc.request = TPM.TPM_ENABLE_METATRACE; |
| if (categories) { |
| rpc.enableMetatraceArgs = new EnableMetatraceArgs(); |
| rpc.enableMetatraceArgs.categories = categories; |
| } |
| this._isMetatracingEnabled = true; |
| this.rpcSendRequest(rpc); |
| } |
| |
| stopAndGetMetatrace(): Promise<DisableAndReadMetatraceResult> { |
| // If we are already finalising a metatrace, ignore the request. |
| if (this.pendingReadMetatrace) { |
| return Promise.reject(new Error('Already finalising a metatrace')); |
| } |
| |
| const result = defer<DisableAndReadMetatraceResult>(); |
| |
| const rpc = TraceProcessorRpc.create(); |
| rpc.request = TPM.TPM_DISABLE_AND_READ_METATRACE; |
| this._isMetatracingEnabled = false; |
| this.pendingReadMetatrace = result; |
| this.rpcSendRequest(rpc); |
| return result; |
| } |
| |
| // Marshals the TraceProcessorRpc request arguments and sends the request |
| // to the concrete Engine (Wasm or HTTP). |
| private rpcSendRequest(rpc: TraceProcessorRpc) { |
| rpc.seq = this.txSeqId++; |
| // Each message is wrapped in a TraceProcessorRpcStream to add the varint |
| // preamble with the size, which allows tokenization on the other end. |
| const outerProto = TraceProcessorRpcStream.create(); |
| outerProto.msg.push(rpc); |
| const buf = TraceProcessorRpcStream.encode(outerProto).finish(); |
| this.loadingTracker.beginLoading(); |
| this.rpcSendRequestBytes(buf); |
| } |
| |
| getProxy(tag: string): EngineProxy { |
| return new EngineProxy(this, tag); |
| } |
| } |
| |
| // Lightweight engine proxy which annotates all queries with a tag |
| export class EngineProxy implements Engine, Disposable { |
| private engine: EngineBase; |
| private tag: string; |
| private _isAlive: boolean; |
| |
| constructor(engine: EngineBase, tag: string) { |
| this.engine = engine; |
| this.tag = tag; |
| this._isAlive = true; |
| } |
| |
| async query(query: string, tag?: string): Promise<QueryResult> { |
| if (!this._isAlive) { |
| throw new Error(`EngineProxy ${this.tag} was disposed.`); |
| } |
| return await this.engine.query(query, tag); |
| } |
| |
| async tryQuery( |
| query: string, |
| tag?: string, |
| ): Promise<Result<QueryResult, Error>> { |
| if (!this._isAlive) { |
| return { |
| success: false, |
| error: new Error(`EngineProxy ${this.tag} was disposed.`), |
| }; |
| } |
| return await this.engine.tryQuery(query, tag); |
| } |
| |
| async computeMetric( |
| metrics: string[], |
| format: 'json' | 'prototext' | 'proto', |
| ): Promise<string | Uint8Array> { |
| if (!this._isAlive) { |
| return Promise.reject(new Error(`EngineProxy ${this.tag} was disposed.`)); |
| } |
| return this.engine.computeMetric(metrics, format); |
| } |
| |
| get engineId(): string { |
| return this.engine.id; |
| } |
| |
| dispose() { |
| this._isAlive = false; |
| } |
| } |
| |
| // Capture stack trace and attach to the given error object |
| function captureStackTrace(e: Error): void { |
| const stack = new Error().stack; |
| if ('captureStackTrace' in Error) { |
| // V8 specific |
| Error.captureStackTrace(e, captureStackTrace); |
| } else { |
| // Generic |
| Object.defineProperty(e, 'stack', { |
| value: stack, |
| writable: true, |
| configurable: true, |
| }); |
| } |
| } |