blob: 79e7e6580493241162fa1bd8d0977d1e4aca9907 [file] [log] [blame]
// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import {defer} from '../base/deferred';
import {assertExists, assertTrue} from '../base/logging';
import * as init_trace_processor from '../gen/trace_processor';
// The Initialize() call will allocate a buffer of REQ_BUF_SIZE bytes which
// will be used to copy the input request data. This is to avoid passing the
// input data on the stack, which has a limited (~1MB) size.
// The buffer will be allocated by the C++ side and reachable at
// HEAPU8[reqBufferAddr, +REQ_BUFFER_SIZE].
const REQ_BUF_SIZE = 32 * 1024 * 1024;
// The end-to-end interaction between JS and Wasm is as follows:
// - [JS] Inbound data received by the worker (onmessage() in engine/index.ts).
// - [JS] onRpcDataReceived() (this file)
// - [C++] trace_processor_on_rpc_request (wasm_bridge.cc)
// - [C++] some TraceProcessor::method()
// for (batch in result_rows)
// - [C++] RpcResponseFunction(bytes) (wasm_bridge.cc)
// - [JS] onReply() (this file)
// - [JS] postMessage() (this file)
export class WasmBridge {
// When this promise has resolved it is safe to call callWasm.
whenInitialized: Promise<void>;
private aborted: boolean;
private connection: init_trace_processor.Module;
private reqBufferAddr = 0;
private lastStderr: string[] = [];
private messagePort?: MessagePort;
constructor() {
this.aborted = false;
const deferredRuntimeInitialized = defer<void>();
this.connection = init_trace_processor({
locateFile: (s: string) => s,
print: (line: string) => console.log(line),
printErr: (line: string) => this.appendAndLogErr(line),
onRuntimeInitialized: () => deferredRuntimeInitialized.resolve(),
});
this.whenInitialized = deferredRuntimeInitialized.then(() => {
const fn = this.connection.addFunction(this.onReply.bind(this), 'vii');
this.reqBufferAddr = this.connection.ccall(
'trace_processor_rpc_init',
/*return=*/ 'number',
/*args=*/['number', 'number'],
[fn, REQ_BUF_SIZE]);
});
}
initialize(port: MessagePort) {
// Ensure that initialize() is called only once.
assertTrue(this.messagePort === undefined);
this.messagePort = port;
// Note: setting .onmessage implicitly calls port.start() and dispatches the
// queued messages. addEventListener('message') doesn't.
this.messagePort.onmessage = this.onMessage.bind(this);
}
onMessage(msg: MessageEvent) {
if (this.aborted) {
throw new Error('Wasm module crashed');
}
assertTrue(msg.data instanceof Uint8Array);
const data = msg.data as Uint8Array;
let wrSize = 0;
// If the request data is larger than our JS<>Wasm interop buffer, split it
// into multiple writes. The RPC channel is byte-oriented and is designed to
// deal with arbitrary fragmentations.
while (wrSize < data.length) {
const sliceLen = Math.min(data.length - wrSize, REQ_BUF_SIZE);
const dataSlice = data.subarray(wrSize, wrSize + sliceLen);
this.connection.HEAPU8.set(dataSlice, this.reqBufferAddr);
wrSize += sliceLen;
try {
this.connection.ccall(
'trace_processor_on_rpc_request', // C function name.
'void', // Return type.
['number'], // Arg types.
[sliceLen] // Args.
);
} catch (err) {
this.aborted = true;
let abortReason = `${err}`;
if (err instanceof Error) {
abortReason = `${err.name}: ${err.message}\n${err.stack}`;
}
abortReason += '\n\nstderr: \n' + this.lastStderr.join('\n');
throw new Error(abortReason);
}
} // while(wrSize < data.length)
}
// This function is bound and passed to Initialize and is called by the C++
// code while in the ccall(trace_processor_on_rpc_request).
private onReply(heapPtr: number, size: number) {
const data = this.connection.HEAPU8.slice(heapPtr, heapPtr + size);
assertExists(this.messagePort).postMessage(data, [data.buffer]);
}
private appendAndLogErr(line: string) {
console.warn(line);
// Keep the last N lines in the |lastStderr| buffer.
this.lastStderr.push(line);
if (this.lastStderr.length > 512) {
this.lastStderr.shift();
}
}
}