| // Copyright (C) 2018 The Android Open Source Project |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| import {defer} from '../base/deferred'; |
| import {assertExists, assertTrue} from '../base/logging'; |
| import * as init_trace_processor from '../gen/trace_processor'; |
| |
| // The Initialize() call will allocate a buffer of REQ_BUF_SIZE bytes which |
| // will be used to copy the input request data. This is to avoid passing the |
| // input data on the stack, which has a limited (~1MB) size. |
| // The buffer will be allocated by the C++ side and reachable at |
| // HEAPU8[reqBufferAddr, +REQ_BUFFER_SIZE]. |
| const REQ_BUF_SIZE = 32 * 1024 * 1024; |
| |
| // The end-to-end interaction between JS and Wasm is as follows: |
| // - [JS] Inbound data received by the worker (onmessage() in engine/index.ts). |
| // - [JS] onRpcDataReceived() (this file) |
| // - [C++] trace_processor_on_rpc_request (wasm_bridge.cc) |
| // - [C++] some TraceProcessor::method() |
| // for (batch in result_rows) |
| // - [C++] RpcResponseFunction(bytes) (wasm_bridge.cc) |
| // - [JS] onReply() (this file) |
| // - [JS] postMessage() (this file) |
| export class WasmBridge { |
| // When this promise has resolved it is safe to call callWasm. |
| whenInitialized: Promise<void>; |
| |
| private aborted: boolean; |
| private connection: init_trace_processor.Module; |
| private reqBufferAddr = 0; |
| private lastStderr: string[] = []; |
| private messagePort?: MessagePort; |
| |
| constructor() { |
| this.aborted = false; |
| const deferredRuntimeInitialized = defer<void>(); |
| this.connection = init_trace_processor({ |
| locateFile: (s: string) => s, |
| print: (line: string) => console.log(line), |
| printErr: (line: string) => this.appendAndLogErr(line), |
| onRuntimeInitialized: () => deferredRuntimeInitialized.resolve(), |
| }); |
| this.whenInitialized = deferredRuntimeInitialized.then(() => { |
| const fn = this.connection.addFunction(this.onReply.bind(this), 'vii'); |
| this.reqBufferAddr = this.connection.ccall( |
| 'trace_processor_rpc_init', |
| /*return=*/ 'number', |
| /*args=*/['number', 'number'], |
| [fn, REQ_BUF_SIZE]); |
| }); |
| } |
| |
| initialize(port: MessagePort) { |
| // Ensure that initialize() is called only once. |
| assertTrue(this.messagePort === undefined); |
| this.messagePort = port; |
| // Note: setting .onmessage implicitly calls port.start() and dispatches the |
| // queued messages. addEventListener('message') doesn't. |
| this.messagePort.onmessage = this.onMessage.bind(this); |
| } |
| |
| onMessage(msg: MessageEvent) { |
| if (this.aborted) { |
| throw new Error('Wasm module crashed'); |
| } |
| assertTrue(msg.data instanceof Uint8Array); |
| const data = msg.data as Uint8Array; |
| let wrSize = 0; |
| // If the request data is larger than our JS<>Wasm interop buffer, split it |
| // into multiple writes. The RPC channel is byte-oriented and is designed to |
| // deal with arbitrary fragmentations. |
| while (wrSize < data.length) { |
| const sliceLen = Math.min(data.length - wrSize, REQ_BUF_SIZE); |
| const dataSlice = data.subarray(wrSize, wrSize + sliceLen); |
| this.connection.HEAPU8.set(dataSlice, this.reqBufferAddr); |
| wrSize += sliceLen; |
| try { |
| this.connection.ccall( |
| 'trace_processor_on_rpc_request', // C function name. |
| 'void', // Return type. |
| ['number'], // Arg types. |
| [sliceLen] // Args. |
| ); |
| } catch (err) { |
| this.aborted = true; |
| let abortReason = `${err}`; |
| if (err instanceof Error) { |
| abortReason = `${err.name}: ${err.message}\n${err.stack}`; |
| } |
| abortReason += '\n\nstderr: \n' + this.lastStderr.join('\n'); |
| throw new Error(abortReason); |
| } |
| } // while(wrSize < data.length) |
| } |
| |
| // This function is bound and passed to Initialize and is called by the C++ |
| // code while in the ccall(trace_processor_on_rpc_request). |
| private onReply(heapPtr: number, size: number) { |
| const data = this.connection.HEAPU8.slice(heapPtr, heapPtr + size); |
| assertExists(this.messagePort).postMessage(data, [data.buffer]); |
| } |
| |
| private appendAndLogErr(line: string) { |
| console.warn(line); |
| // Keep the last N lines in the |lastStderr| buffer. |
| this.lastStderr.push(line); |
| if (this.lastStderr.length > 512) { |
| this.lastStderr.shift(); |
| } |
| } |
| } |