UI: spawn worker from the frontend
This CL spawns the engine worker (The thing that
hosts the TraceProcessor Wasm module) directly from
the frontend, rather than using a nested worker.
This simplifies further lots of the Wasm-related
boilerplate.
It also avoids relying on nested worker which
doesn't work with Safari.
Test: manual on Safari and Firefox.
Bug: 159142289
Bug: 143973344
Change-Id: I92fe949ae5dc69ac34e2e638ed5d7279978e3c26
diff --git a/ui/src/common/wasm_engine_proxy.ts b/ui/src/common/wasm_engine_proxy.ts
index aabcc12..5f7715d 100644
--- a/ui/src/common/wasm_engine_proxy.ts
+++ b/ui/src/common/wasm_engine_proxy.ts
@@ -15,64 +15,19 @@
import {assertTrue} from '../base/logging';
import {Engine, LoadingTracker} from './engine';
-const activeWorkers = new Map<string, Worker>();
-let warmWorker: null|Worker = null;
-
-function createWorker(): Worker {
- return new Worker('engine_bundle.js');
-}
-
-// Take the warm engine and start creating a new WASM engine in the background
-// for the next call.
-export function createWasmEngine(id: string): Worker {
- if (warmWorker === null) {
- throw new Error('warmupWasmEngine() not called');
- }
- if (activeWorkers.has(id)) {
- throw new Error(`Duplicate worker ID ${id}`);
- }
- const activeWorker = warmWorker;
- warmWorker = createWorker();
- activeWorkers.set(id, activeWorker);
- return activeWorker;
-}
-
-export function destroyWasmEngine(id: string) {
- if (!activeWorkers.has(id)) {
- throw new Error(`Cannot find worker ID ${id}`);
- }
- activeWorkers.get(id)!.terminate();
- activeWorkers.delete(id);
-}
-
-/**
- * It's quite slow to compile WASM and (in Chrome) this happens every time
- * a worker thread attempts to load a WASM module since there is no way to
- * cache the compiled code currently. To mitigate this we can always keep a
- * WASM backend 'ready to go' just waiting to be provided with a trace file.
- * warmupWasmEngineWorker (together with getWasmEngineWorker)
- * implement this behaviour.
- */
-export function warmupWasmEngine(): void {
- if (warmWorker !== null) {
- throw new Error('warmupWasmEngine() already called');
- }
- warmWorker = createWorker();
-}
-
/**
* This implementation of Engine uses a WASM backend hosted in a separate
* worker thread.
*/
export class WasmEngineProxy extends Engine {
readonly id: string;
- private readonly worker: Worker;
+ private port: MessagePort;
- constructor(id: string, worker: Worker, loadingTracker?: LoadingTracker) {
+ constructor(id: string, port: MessagePort, loadingTracker?: LoadingTracker) {
super(loadingTracker);
this.id = id;
- this.worker = worker;
- this.worker.onmessage = this.onMessage.bind(this);
+ this.port = port;
+ this.port.onmessage = this.onMessage.bind(this);
}
onMessage(m: MessageEvent) {
@@ -84,6 +39,6 @@
// We deliberately don't use a transfer list because protobufjs reuses the
// same buffer when encoding messages (which is good, because creating a new
// TypedArray for each decode operation would be too expensive).
- this.worker.postMessage(data);
+ this.port.postMessage(data);
}
}
diff --git a/ui/src/common/worker_messages.ts b/ui/src/common/worker_messages.ts
new file mode 100644
index 0000000..b60a656
--- /dev/null
+++ b/ui/src/common/worker_messages.ts
@@ -0,0 +1,48 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// This file defines the API of messages exchanged between frontend and
+// {engine, controller} worker when bootstrapping the workers.
+// Those messages are sent only once. The rest of the communication happens
+// over the MessagePort(s) that are sent in the init message.
+
+// This is so we can create all the workers in a central place in the frontend
+// (Safari still doesn't spawning workers from other workers) but then let them
+// communicate by sending the right MessagePort to them.
+
+// Frontend -> Engine initialization message.
+export interface EngineWorkerInitMessage {
+ // The port used to receive engine messages (e.g., query commands).
+ // The controller owns the other end of the MessageChannel
+ // (see resetEngineWorker()).
+ enginePort: MessagePort;
+}
+
+// Frontend -> Controller initialization message.
+export interface ControllerWorkerInitMessage {
+ // For receiving dispatch() commands from the frontend. This is where most of
+ // the frontend <> controller interaction happens.
+ controllerPort: MessagePort;
+
+ // For publishing results back to the frontend. This is used for one-way
+ // non-retained publish() operations (e.g. track data after a query).
+ frontendPort: MessagePort;
+
+ // For controller <> Chrome extension communication.
+ extensionPort: MessagePort;
+
+ // For reporting errors back to the frontend. This is a dedicated port to
+ // reduce depdencies on the business logic behind the other ports.
+ errorReportingPort: MessagePort;
+}
diff --git a/ui/src/controller/globals.ts b/ui/src/controller/globals.ts
index 8c5be70..d47af46 100644
--- a/ui/src/controller/globals.ts
+++ b/ui/src/controller/globals.ts
@@ -100,6 +100,19 @@
.send<void>(`publish${what}`, [data], transferList);
}
+ // Returns the port of the MessageChannel that can be used to communicate with
+ // the Wasm Engine (issue SQL queries and retrieve results).
+ resetEngineWorker() {
+ const chan = new MessageChannel();
+ const port = chan.port1;
+ // Invokes resetEngineWorker() in frontend/index.ts. It will spawn a new
+ // worker and assign it the passed |port|.
+ assertExists(this._frontend).send<void>('resetEngineWorker', [port], [
+ port
+ ]);
+ return chan.port2;
+ }
+
get state(): State {
return assertExists(this._state);
}
diff --git a/ui/src/controller/index.ts b/ui/src/controller/index.ts
index bf655b4..9f5add7 100644
--- a/ui/src/controller/index.ts
+++ b/ui/src/controller/index.ts
@@ -16,30 +16,21 @@
import {reportError, setErrorHandler} from '../base/logging';
import {Remote} from '../base/remote';
-import {warmupWasmEngine} from '../common/wasm_engine_proxy';
-
+import {ControllerWorkerInitMessage} from '../common/worker_messages';
import {AppController} from './app_controller';
import {globals} from './globals';
-interface OnMessageArg {
- data: {
- frontendPort: MessagePort; controllerPort: MessagePort;
- extensionPort: MessagePort;
- errorReportingPort: MessagePort;
- };
-}
-
function main() {
self.addEventListener('error', e => reportError(e));
self.addEventListener('unhandledrejection', e => reportError(e));
- warmupWasmEngine();
let initialized = false;
- self.onmessage = ({data}: OnMessageArg) => {
+ self.onmessage = (e: MessageEvent) => {
if (initialized) {
console.error('Already initialized');
return;
}
initialized = true;
+ const data = e.data as ControllerWorkerInitMessage;
const frontendPort = data.frontendPort;
const controllerPort = data.controllerPort;
const extensionPort = data.extensionPort;
diff --git a/ui/src/controller/trace_controller.ts b/ui/src/controller/trace_controller.ts
index 1b3580e..8427b7c 100644
--- a/ui/src/controller/trace_controller.ts
+++ b/ui/src/controller/trace_controller.ts
@@ -26,11 +26,7 @@
import {EngineMode} from '../common/state';
import {toNs, toNsCeil, toNsFloor} from '../common/time';
import {TimeSpan} from '../common/time';
-import {
- createWasmEngine,
- destroyWasmEngine,
- WasmEngineProxy
-} from '../common/wasm_engine_proxy';
+import {WasmEngineProxy} from '../common/wasm_engine_proxy';
import {QuantizedLoad, ThreadDesc} from '../frontend/globals';
import {
@@ -98,12 +94,6 @@
this.engineId = engineId;
}
- onDestroy() {
- if (this.engine instanceof WasmEngineProxy) {
- destroyWasmEngine(this.engine.id);
- }
- }
-
run() {
const engineCfg = assertExists(globals.state.engines[this.engineId]);
switch (this.state) {
@@ -231,10 +221,9 @@
} else {
console.log('Opening trace using built-in WASM engine');
engineMode = 'WASM';
+ const enginePort = globals.resetEngineWorker();
this.engine = new WasmEngineProxy(
- this.engineId,
- createWasmEngine(this.engineId),
- LoadingManager.getInstance);
+ this.engineId, enginePort, LoadingManager.getInstance);
}
globals.dispatch(Actions.setEngineReady({
diff --git a/ui/src/engine/index.ts b/ui/src/engine/index.ts
index 3a4020d..8a7c27e 100644
--- a/ui/src/engine/index.ts
+++ b/ui/src/engine/index.ts
@@ -12,32 +12,30 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-import {assertTrue} from '../base/logging';
-import * as init_trace_processor from '../gen/trace_processor';
-
+import {assertExists} from '../base/logging';
+import {EngineWorkerInitMessage} from '../common/worker_messages';
import {WasmBridge} from './wasm_bridge';
const selfWorker = self as {} as Worker;
+const wasmBridge = new WasmBridge();
-// Messages can arrive before the Wasm module initialization is complete.
-// Queue these for later.
-const msgQueue: MessageEvent[] = [];
+// There are two message handlers here:
+// 1. The Worker (self.onmessage) handler.
+// 2. The MessagePort handler.
+// When the app bootstraps, frontend/index.ts creates a MessageChannel and sends
+// one end to the controller (the other worker) and the other end to us, so that
+// the controller can interact with the Wasm worker without roundtrips through
+// the frontend.
+// The sequence of actions is the following:
+// 1. The frontend does one postMessage({port: MessagePort}) on the Worker
+// scope. This message transfers the MessagePort (whose other end is
+// connected to the Conotroller). This is the only postMessage we'll ever
+// receive here.
+// 2. All the other messages (i.e. the TraceProcessor RPC binary pipe) will be
+// received on the MessagePort.
+
+// Receives the boostrap message from the frontend with the MessagePort.
selfWorker.onmessage = (msg: MessageEvent) => {
- msgQueue.push(msg);
+ const port = assertExists((msg.data as EngineWorkerInitMessage).enginePort);
+ wasmBridge.initialize(port);
};
-
-const bridge = new WasmBridge(init_trace_processor, selfWorker);
-bridge.whenInitialized.then(() => {
- const handleMsg = (msg: MessageEvent) => {
- assertTrue(msg.data instanceof Uint8Array);
- bridge.onRpcDataReceived(msg.data as Uint8Array);
- };
-
- // Dispatch queued messages.
- let msg;
- while (msg = msgQueue.shift()) {
- handleMsg(msg);
- }
-
- selfWorker.onmessage = handleMsg;
-});
diff --git a/ui/src/engine/wasm_bridge.ts b/ui/src/engine/wasm_bridge.ts
index 92813e2..79e7e65 100644
--- a/ui/src/engine/wasm_bridge.ts
+++ b/ui/src/engine/wasm_bridge.ts
@@ -13,6 +13,7 @@
// limitations under the License.
import {defer} from '../base/deferred';
+import {assertExists, assertTrue} from '../base/logging';
import * as init_trace_processor from '../gen/trace_processor';
// The Initialize() call will allocate a buffer of REQ_BUF_SIZE bytes which
@@ -22,12 +23,6 @@
// HEAPU8[reqBufferAddr, +REQ_BUFFER_SIZE].
const REQ_BUF_SIZE = 32 * 1024 * 1024;
-// The common denominator between Worker and MessageChannel. Used to send
-// messages, regardless of Worker vs a dedicated MessagePort.
-export interface PostMessageChannel {
- postMessage(message: Uint8Array, transfer: Transferable[]): void;
-}
-
// The end-to-end interaction between JS and Wasm is as follows:
// - [JS] Inbound data received by the worker (onmessage() in engine/index.ts).
// - [JS] onRpcDataReceived() (this file)
@@ -45,13 +40,12 @@
private connection: init_trace_processor.Module;
private reqBufferAddr = 0;
private lastStderr: string[] = [];
- private postMessageChannel: PostMessageChannel;
+ private messagePort?: MessagePort;
- constructor(init: init_trace_processor.InitWasm, chan: PostMessageChannel) {
+ constructor() {
this.aborted = false;
- this.postMessageChannel = chan;
const deferredRuntimeInitialized = defer<void>();
- this.connection = init({
+ this.connection = init_trace_processor({
locateFile: (s: string) => s,
print: (line: string) => console.log(line),
printErr: (line: string) => this.appendAndLogErr(line),
@@ -67,10 +61,21 @@
});
}
- onRpcDataReceived(data: Uint8Array) {
+ initialize(port: MessagePort) {
+ // Ensure that initialize() is called only once.
+ assertTrue(this.messagePort === undefined);
+ this.messagePort = port;
+ // Note: setting .onmessage implicitly calls port.start() and dispatches the
+ // queued messages. addEventListener('message') doesn't.
+ this.messagePort.onmessage = this.onMessage.bind(this);
+ }
+
+ onMessage(msg: MessageEvent) {
if (this.aborted) {
throw new Error('Wasm module crashed');
}
+ assertTrue(msg.data instanceof Uint8Array);
+ const data = msg.data as Uint8Array;
let wrSize = 0;
// If the request data is larger than our JS<>Wasm interop buffer, split it
// into multiple writes. The RPC channel is byte-oriented and is designed to
@@ -103,7 +108,7 @@
// code while in the ccall(trace_processor_on_rpc_request).
private onReply(heapPtr: number, size: number) {
const data = this.connection.HEAPU8.slice(heapPtr, heapPtr + size);
- this.postMessageChannel.postMessage(data, [data.buffer]);
+ assertExists(this.messagePort).postMessage(data, [data.buffer]);
}
private appendAndLogErr(line: string) {
diff --git a/ui/src/frontend/index.ts b/ui/src/frontend/index.ts
index 7d6ec9b..80712cf 100644
--- a/ui/src/frontend/index.ts
+++ b/ui/src/frontend/index.ts
@@ -31,6 +31,10 @@
} from '../common/logs';
import {MetricResult} from '../common/metric_data';
import {CurrentSearchResults, SearchSummary} from '../common/search_data';
+import {
+ ControllerWorkerInitMessage,
+ EngineWorkerInitMessage
+} from '../common/worker_messages';
import {AnalyzePage} from './analyze_page';
import {loadAndroidBugToolInfo} from './android_bug_tool';
@@ -66,6 +70,9 @@
return ['127.0.0.1', 'localhost'].includes((new URL(url)).hostname);
}
+let idleWasmWorker: Worker;
+let activeWasmWorker: Worker;
+
/**
* The API the main thread exposes to the controller.
*/
@@ -265,6 +272,24 @@
this.redraw();
}
+ // This method is called by the controller via the Remote<> interface whenver
+ // a new trace is loaded. This creates a new worker and passes it the
+ // MessagePort received by the controller. This is because on Safari, all
+ // workers must be spawned from the main thread.
+ resetEngineWorker(port: MessagePort) {
+ // We keep always an idle worker around, the first one is created by the
+ // main() below, so we can hide the latency of the Wasm initialization.
+ if (activeWasmWorker !== undefined) {
+ activeWasmWorker.terminate();
+ }
+ // Swap the active worker with the idle one and create a new idle worker
+ // for the next trace.
+ activeWasmWorker = assertExists(idleWasmWorker);
+ const msg: EngineWorkerInitMessage = {enginePort: port};
+ activeWasmWorker.postMessage(msg, [port]);
+ idleWasmWorker = new Worker(globals.root + 'engine_bundle.js');
+ }
+
private redraw(): void {
if (globals.state.route &&
globals.state.route !== this.router.getRouteFromHash()) {
@@ -356,6 +381,7 @@
window.addEventListener('unhandledrejection', e => reportError(e));
const controller = new Worker(globals.root + 'controller_bundle.js');
+ idleWasmWorker = new Worker(globals.root + 'engine_bundle.js');
const frontendChannel = new MessageChannel();
const controllerChannel = new MessageChannel();
const extensionLocalChannel = new MessageChannel();
@@ -364,20 +390,18 @@
errorReportingChannel.port2.onmessage = (e) =>
maybeShowErrorDialog(`${e.data}`);
- controller.postMessage(
- {
- frontendPort: frontendChannel.port1,
- controllerPort: controllerChannel.port1,
- extensionPort: extensionLocalChannel.port1,
- errorReportingPort: errorReportingChannel.port1,
- },
- [
- frontendChannel.port1,
- controllerChannel.port1,
- extensionLocalChannel.port1,
- errorReportingChannel.port1,
- ]);
-
+ const msg: ControllerWorkerInitMessage = {
+ frontendPort: frontendChannel.port1,
+ controllerPort: controllerChannel.port1,
+ extensionPort: extensionLocalChannel.port1,
+ errorReportingPort: errorReportingChannel.port1,
+ };
+ controller.postMessage(msg, [
+ msg.frontendPort,
+ msg.controllerPort,
+ msg.extensionPort,
+ msg.errorReportingPort,
+ ]);
const dispatch =
controllerChannel.port2.postMessage.bind(controllerChannel.port2);
globals.initialize(dispatch, controller);