UI: spawn worker from the frontend

This CL spawns the engine worker (The thing that
hosts the TraceProcessor Wasm module) directly from
the frontend, rather than using a nested worker.
This simplifies further lots of the Wasm-related
boilerplate.
It also avoids relying on nested worker which
doesn't work with Safari.

Test: manual on Safari and Firefox.
Bug: 159142289
Bug: 143973344
Change-Id: I92fe949ae5dc69ac34e2e638ed5d7279978e3c26
diff --git a/ui/src/common/wasm_engine_proxy.ts b/ui/src/common/wasm_engine_proxy.ts
index aabcc12..5f7715d 100644
--- a/ui/src/common/wasm_engine_proxy.ts
+++ b/ui/src/common/wasm_engine_proxy.ts
@@ -15,64 +15,19 @@
 import {assertTrue} from '../base/logging';
 import {Engine, LoadingTracker} from './engine';
 
-const activeWorkers = new Map<string, Worker>();
-let warmWorker: null|Worker = null;
-
-function createWorker(): Worker {
-  return new Worker('engine_bundle.js');
-}
-
-// Take the warm engine and start creating a new WASM engine in the background
-// for the next call.
-export function createWasmEngine(id: string): Worker {
-  if (warmWorker === null) {
-    throw new Error('warmupWasmEngine() not called');
-  }
-  if (activeWorkers.has(id)) {
-    throw new Error(`Duplicate worker ID ${id}`);
-  }
-  const activeWorker = warmWorker;
-  warmWorker = createWorker();
-  activeWorkers.set(id, activeWorker);
-  return activeWorker;
-}
-
-export function destroyWasmEngine(id: string) {
-  if (!activeWorkers.has(id)) {
-    throw new Error(`Cannot find worker ID ${id}`);
-  }
-  activeWorkers.get(id)!.terminate();
-  activeWorkers.delete(id);
-}
-
-/**
- * It's quite slow to compile WASM and (in Chrome) this happens every time
- * a worker thread attempts to load a WASM module since there is no way to
- * cache the compiled code currently. To mitigate this we can always keep a
- * WASM backend 'ready to go' just waiting to be provided with a trace file.
- * warmupWasmEngineWorker (together with getWasmEngineWorker)
- * implement this behaviour.
- */
-export function warmupWasmEngine(): void {
-  if (warmWorker !== null) {
-    throw new Error('warmupWasmEngine() already called');
-  }
-  warmWorker = createWorker();
-}
-
 /**
  * This implementation of Engine uses a WASM backend hosted in a separate
  * worker thread.
  */
 export class WasmEngineProxy extends Engine {
   readonly id: string;
-  private readonly worker: Worker;
+  private port: MessagePort;
 
-  constructor(id: string, worker: Worker, loadingTracker?: LoadingTracker) {
+  constructor(id: string, port: MessagePort, loadingTracker?: LoadingTracker) {
     super(loadingTracker);
     this.id = id;
-    this.worker = worker;
-    this.worker.onmessage = this.onMessage.bind(this);
+    this.port = port;
+    this.port.onmessage = this.onMessage.bind(this);
   }
 
   onMessage(m: MessageEvent) {
@@ -84,6 +39,6 @@
     // We deliberately don't use a transfer list because protobufjs reuses the
     // same buffer when encoding messages (which is good, because creating a new
     // TypedArray for each decode operation would be too expensive).
-    this.worker.postMessage(data);
+    this.port.postMessage(data);
   }
 }
diff --git a/ui/src/common/worker_messages.ts b/ui/src/common/worker_messages.ts
new file mode 100644
index 0000000..b60a656
--- /dev/null
+++ b/ui/src/common/worker_messages.ts
@@ -0,0 +1,48 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// This file defines the API of messages exchanged between frontend and
+// {engine, controller} worker when bootstrapping the workers.
+// Those messages are sent only once. The rest of the communication happens
+// over the MessagePort(s) that are sent in the init message.
+
+// This is so we can create all the workers in a central place in the frontend
+// (Safari still doesn't spawning workers from other workers) but then let them
+// communicate by sending the right MessagePort to them.
+
+// Frontend -> Engine initialization message.
+export interface EngineWorkerInitMessage {
+  // The port used to receive engine messages (e.g., query commands).
+  // The controller owns the other end of the MessageChannel
+  // (see resetEngineWorker()).
+  enginePort: MessagePort;
+}
+
+// Frontend -> Controller initialization message.
+export interface ControllerWorkerInitMessage {
+  // For receiving dispatch() commands from the frontend. This is where most of
+  // the frontend <> controller interaction happens.
+  controllerPort: MessagePort;
+
+  // For publishing results back to the frontend. This is used for one-way
+  // non-retained publish() operations (e.g. track data after a query).
+  frontendPort: MessagePort;
+
+  // For controller <> Chrome extension communication.
+  extensionPort: MessagePort;
+
+  // For reporting errors back to the frontend. This is a dedicated port to
+  // reduce depdencies on the business logic behind the other ports.
+  errorReportingPort: MessagePort;
+}
diff --git a/ui/src/controller/globals.ts b/ui/src/controller/globals.ts
index 8c5be70..d47af46 100644
--- a/ui/src/controller/globals.ts
+++ b/ui/src/controller/globals.ts
@@ -100,6 +100,19 @@
         .send<void>(`publish${what}`, [data], transferList);
   }
 
+  // Returns the port of the MessageChannel that can be used to communicate with
+  // the Wasm Engine (issue SQL queries and retrieve results).
+  resetEngineWorker() {
+    const chan = new MessageChannel();
+    const port = chan.port1;
+    // Invokes resetEngineWorker() in frontend/index.ts. It will spawn a new
+    // worker and assign it the passed |port|.
+    assertExists(this._frontend).send<void>('resetEngineWorker', [port], [
+      port
+    ]);
+    return chan.port2;
+  }
+
   get state(): State {
     return assertExists(this._state);
   }
diff --git a/ui/src/controller/index.ts b/ui/src/controller/index.ts
index bf655b4..9f5add7 100644
--- a/ui/src/controller/index.ts
+++ b/ui/src/controller/index.ts
@@ -16,30 +16,21 @@
 
 import {reportError, setErrorHandler} from '../base/logging';
 import {Remote} from '../base/remote';
-import {warmupWasmEngine} from '../common/wasm_engine_proxy';
-
+import {ControllerWorkerInitMessage} from '../common/worker_messages';
 import {AppController} from './app_controller';
 import {globals} from './globals';
 
-interface OnMessageArg {
-  data: {
-    frontendPort: MessagePort; controllerPort: MessagePort;
-    extensionPort: MessagePort;
-    errorReportingPort: MessagePort;
-  };
-}
-
 function main() {
   self.addEventListener('error', e => reportError(e));
   self.addEventListener('unhandledrejection', e => reportError(e));
-  warmupWasmEngine();
   let initialized = false;
-  self.onmessage = ({data}: OnMessageArg) => {
+  self.onmessage = (e: MessageEvent) => {
     if (initialized) {
       console.error('Already initialized');
       return;
     }
     initialized = true;
+    const data = e.data as ControllerWorkerInitMessage;
     const frontendPort = data.frontendPort;
     const controllerPort = data.controllerPort;
     const extensionPort = data.extensionPort;
diff --git a/ui/src/controller/trace_controller.ts b/ui/src/controller/trace_controller.ts
index 1b3580e..8427b7c 100644
--- a/ui/src/controller/trace_controller.ts
+++ b/ui/src/controller/trace_controller.ts
@@ -26,11 +26,7 @@
 import {EngineMode} from '../common/state';
 import {toNs, toNsCeil, toNsFloor} from '../common/time';
 import {TimeSpan} from '../common/time';
-import {
-  createWasmEngine,
-  destroyWasmEngine,
-  WasmEngineProxy
-} from '../common/wasm_engine_proxy';
+import {WasmEngineProxy} from '../common/wasm_engine_proxy';
 import {QuantizedLoad, ThreadDesc} from '../frontend/globals';
 
 import {
@@ -98,12 +94,6 @@
     this.engineId = engineId;
   }
 
-  onDestroy() {
-    if (this.engine instanceof WasmEngineProxy) {
-      destroyWasmEngine(this.engine.id);
-    }
-  }
-
   run() {
     const engineCfg = assertExists(globals.state.engines[this.engineId]);
     switch (this.state) {
@@ -231,10 +221,9 @@
     } else {
       console.log('Opening trace using built-in WASM engine');
       engineMode = 'WASM';
+      const enginePort = globals.resetEngineWorker();
       this.engine = new WasmEngineProxy(
-          this.engineId,
-          createWasmEngine(this.engineId),
-          LoadingManager.getInstance);
+          this.engineId, enginePort, LoadingManager.getInstance);
     }
 
     globals.dispatch(Actions.setEngineReady({
diff --git a/ui/src/engine/index.ts b/ui/src/engine/index.ts
index 3a4020d..8a7c27e 100644
--- a/ui/src/engine/index.ts
+++ b/ui/src/engine/index.ts
@@ -12,32 +12,30 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-import {assertTrue} from '../base/logging';
-import * as init_trace_processor from '../gen/trace_processor';
-
+import {assertExists} from '../base/logging';
+import {EngineWorkerInitMessage} from '../common/worker_messages';
 import {WasmBridge} from './wasm_bridge';
 
 const selfWorker = self as {} as Worker;
+const wasmBridge = new WasmBridge();
 
-// Messages can arrive before the Wasm module initialization is complete.
-// Queue these for later.
-const msgQueue: MessageEvent[] = [];
+// There are two message handlers here:
+// 1. The Worker (self.onmessage) handler.
+// 2. The MessagePort handler.
+// When the app bootstraps, frontend/index.ts creates a MessageChannel and sends
+// one end to the controller (the other worker) and the other end to us, so that
+// the controller can interact with the Wasm worker without roundtrips through
+// the frontend.
+// The sequence of actions is the following:
+// 1. The frontend does one postMessage({port: MessagePort}) on the Worker
+//    scope. This message transfers the MessagePort (whose other end is
+//    connected to the Conotroller). This is the only postMessage we'll ever
+//    receive here.
+// 2. All the other messages (i.e. the TraceProcessor RPC binary pipe) will be
+//    received on the MessagePort.
+
+// Receives the boostrap message from the frontend with the MessagePort.
 selfWorker.onmessage = (msg: MessageEvent) => {
-  msgQueue.push(msg);
+  const port = assertExists((msg.data as EngineWorkerInitMessage).enginePort);
+  wasmBridge.initialize(port);
 };
-
-const bridge = new WasmBridge(init_trace_processor, selfWorker);
-bridge.whenInitialized.then(() => {
-  const handleMsg = (msg: MessageEvent) => {
-    assertTrue(msg.data instanceof Uint8Array);
-    bridge.onRpcDataReceived(msg.data as Uint8Array);
-  };
-
-  // Dispatch queued messages.
-  let msg;
-  while (msg = msgQueue.shift()) {
-    handleMsg(msg);
-  }
-
-  selfWorker.onmessage = handleMsg;
-});
diff --git a/ui/src/engine/wasm_bridge.ts b/ui/src/engine/wasm_bridge.ts
index 92813e2..79e7e65 100644
--- a/ui/src/engine/wasm_bridge.ts
+++ b/ui/src/engine/wasm_bridge.ts
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 import {defer} from '../base/deferred';
+import {assertExists, assertTrue} from '../base/logging';
 import * as init_trace_processor from '../gen/trace_processor';
 
 // The Initialize() call will allocate a buffer of REQ_BUF_SIZE bytes which
@@ -22,12 +23,6 @@
 // HEAPU8[reqBufferAddr, +REQ_BUFFER_SIZE].
 const REQ_BUF_SIZE = 32 * 1024 * 1024;
 
-// The common denominator between Worker and MessageChannel. Used to send
-// messages, regardless of Worker vs a dedicated MessagePort.
-export interface PostMessageChannel {
-  postMessage(message: Uint8Array, transfer: Transferable[]): void;
-}
-
 // The end-to-end interaction between JS and Wasm is as follows:
 // - [JS] Inbound data received by the worker (onmessage() in engine/index.ts).
 //   - [JS] onRpcDataReceived() (this file)
@@ -45,13 +40,12 @@
   private connection: init_trace_processor.Module;
   private reqBufferAddr = 0;
   private lastStderr: string[] = [];
-  private postMessageChannel: PostMessageChannel;
+  private messagePort?: MessagePort;
 
-  constructor(init: init_trace_processor.InitWasm, chan: PostMessageChannel) {
+  constructor() {
     this.aborted = false;
-    this.postMessageChannel = chan;
     const deferredRuntimeInitialized = defer<void>();
-    this.connection = init({
+    this.connection = init_trace_processor({
       locateFile: (s: string) => s,
       print: (line: string) => console.log(line),
       printErr: (line: string) => this.appendAndLogErr(line),
@@ -67,10 +61,21 @@
     });
   }
 
-  onRpcDataReceived(data: Uint8Array) {
+  initialize(port: MessagePort) {
+    // Ensure that initialize() is called only once.
+    assertTrue(this.messagePort === undefined);
+    this.messagePort = port;
+    // Note: setting .onmessage implicitly calls port.start() and dispatches the
+    // queued messages. addEventListener('message') doesn't.
+    this.messagePort.onmessage = this.onMessage.bind(this);
+  }
+
+  onMessage(msg: MessageEvent) {
     if (this.aborted) {
       throw new Error('Wasm module crashed');
     }
+    assertTrue(msg.data instanceof Uint8Array);
+    const data = msg.data as Uint8Array;
     let wrSize = 0;
     // If the request data is larger than our JS<>Wasm interop buffer, split it
     // into multiple writes. The RPC channel is byte-oriented and is designed to
@@ -103,7 +108,7 @@
   // code while in the ccall(trace_processor_on_rpc_request).
   private onReply(heapPtr: number, size: number) {
     const data = this.connection.HEAPU8.slice(heapPtr, heapPtr + size);
-    this.postMessageChannel.postMessage(data, [data.buffer]);
+    assertExists(this.messagePort).postMessage(data, [data.buffer]);
   }
 
   private appendAndLogErr(line: string) {
diff --git a/ui/src/frontend/index.ts b/ui/src/frontend/index.ts
index 7d6ec9b..80712cf 100644
--- a/ui/src/frontend/index.ts
+++ b/ui/src/frontend/index.ts
@@ -31,6 +31,10 @@
 } from '../common/logs';
 import {MetricResult} from '../common/metric_data';
 import {CurrentSearchResults, SearchSummary} from '../common/search_data';
+import {
+  ControllerWorkerInitMessage,
+  EngineWorkerInitMessage
+} from '../common/worker_messages';
 
 import {AnalyzePage} from './analyze_page';
 import {loadAndroidBugToolInfo} from './android_bug_tool';
@@ -66,6 +70,9 @@
   return ['127.0.0.1', 'localhost'].includes((new URL(url)).hostname);
 }
 
+let idleWasmWorker: Worker;
+let activeWasmWorker: Worker;
+
 /**
  * The API the main thread exposes to the controller.
  */
@@ -265,6 +272,24 @@
     this.redraw();
   }
 
+  // This method is called by the controller via the Remote<> interface whenver
+  // a new trace is loaded. This creates a new worker and passes it the
+  // MessagePort received by the controller. This is because on Safari, all
+  // workers must be spawned from the main thread.
+  resetEngineWorker(port: MessagePort) {
+    // We keep always an idle worker around, the first one is created by the
+    // main() below, so we can hide the latency of the Wasm initialization.
+    if (activeWasmWorker !== undefined) {
+      activeWasmWorker.terminate();
+    }
+    // Swap the active worker with the idle one and create a new idle worker
+    // for the next trace.
+    activeWasmWorker = assertExists(idleWasmWorker);
+    const msg: EngineWorkerInitMessage = {enginePort: port};
+    activeWasmWorker.postMessage(msg, [port]);
+    idleWasmWorker = new Worker(globals.root + 'engine_bundle.js');
+  }
+
   private redraw(): void {
     if (globals.state.route &&
         globals.state.route !== this.router.getRouteFromHash()) {
@@ -356,6 +381,7 @@
   window.addEventListener('unhandledrejection', e => reportError(e));
 
   const controller = new Worker(globals.root + 'controller_bundle.js');
+  idleWasmWorker = new Worker(globals.root + 'engine_bundle.js');
   const frontendChannel = new MessageChannel();
   const controllerChannel = new MessageChannel();
   const extensionLocalChannel = new MessageChannel();
@@ -364,20 +390,18 @@
   errorReportingChannel.port2.onmessage = (e) =>
       maybeShowErrorDialog(`${e.data}`);
 
-  controller.postMessage(
-      {
-        frontendPort: frontendChannel.port1,
-        controllerPort: controllerChannel.port1,
-        extensionPort: extensionLocalChannel.port1,
-        errorReportingPort: errorReportingChannel.port1,
-      },
-      [
-        frontendChannel.port1,
-        controllerChannel.port1,
-        extensionLocalChannel.port1,
-        errorReportingChannel.port1,
-      ]);
-
+  const msg: ControllerWorkerInitMessage = {
+    frontendPort: frontendChannel.port1,
+    controllerPort: controllerChannel.port1,
+    extensionPort: extensionLocalChannel.port1,
+    errorReportingPort: errorReportingChannel.port1,
+  };
+  controller.postMessage(msg, [
+    msg.frontendPort,
+    msg.controllerPort,
+    msg.extensionPort,
+    msg.errorReportingPort,
+  ]);
   const dispatch =
       controllerChannel.port2.postMessage.bind(controllerChannel.port2);
   globals.initialize(dispatch, controller);