UI; unify File/ArrayBuffer/fetch() into TraceStream
This CL unifies the three code paths for loading traces
introducing a TraceStream class. This class exposes
a unified interface for loading a trace in chunks and
provides three implementations for opening a trace from:
(1) A File object; (2) An ArrayBuffer (for window.open());
(3) A url.
Also this CL passes WASM input on a dedicated buffer
rather than on the WASM stack. This allows the bump the
chunk size when loading from file from 1MB to 32MB, upping
significantly the trace load speed because of hitting
JS <> WASM rundtrips less frequently.
Bug: 143074239
Change-Id: I97660b47d26378428be514fc79667cf9408dbdfe
diff --git a/gn/standalone/wasm_typescript_declaration.d.ts b/gn/standalone/wasm_typescript_declaration.d.ts
index 4ac92e9..9fa3538 100644
--- a/gn/standalone/wasm_typescript_declaration.d.ts
+++ b/gn/standalone/wasm_typescript_declaration.d.ts
@@ -52,7 +52,7 @@
returnType: string,
argTypes: string[],
args: any[],
- ): void;
+ ): number;
HEAPU8: Uint8Array;
FS: FileSystem;
}
diff --git a/src/trace_processor/rpc/wasm_bridge.cc b/src/trace_processor/rpc/wasm_bridge.cc
index f78eeb9..0f80a65 100644
--- a/src/trace_processor/rpc/wasm_bridge.cc
+++ b/src/trace_processor/rpc/wasm_bridge.cc
@@ -42,27 +42,35 @@
namespace {
Rpc* g_trace_processor_rpc;
ReplyFunction g_reply;
+
+// The buffer used to pass the request arguments. The caller (JS) decides how
+// big this buffer should be in the Initialize() call.
+uint8_t* g_req_buf;
+
} // namespace
+
// +---------------------------------------------------------------------------+
// | Exported functions called by the JS/TS running in the worker. |
// +---------------------------------------------------------------------------+
extern "C" {
-void EMSCRIPTEN_KEEPALIVE Initialize(ReplyFunction);
-void Initialize(ReplyFunction reply_function) {
- PERFETTO_ILOG("Initializing WASM bridge");
+// Returns the address of the allocated request buffer.
+uint8_t* EMSCRIPTEN_KEEPALIVE Initialize(ReplyFunction, uint32_t);
+uint8_t* Initialize(ReplyFunction reply_function, uint32_t req_buffer_size) {
g_trace_processor_rpc = new Rpc();
g_reply = reply_function;
+ g_req_buf = new uint8_t[req_buffer_size];
+ return g_req_buf;
}
-void EMSCRIPTEN_KEEPALIVE trace_processor_parse(RequestID,
- const uint8_t*,
- uint32_t);
-void trace_processor_parse(RequestID id, const uint8_t* data, size_t size) {
+// Ingests trace data.
+void EMSCRIPTEN_KEEPALIVE trace_processor_parse(RequestID, uint32_t);
+void trace_processor_parse(RequestID id, size_t size) {
// TODO(primiano): LoadTrace() makes a copy of the data, which is unfortunate.
// Ideally there should be a way to take the Blob coming from JS and move it.
// See https://github.com/WebAssembly/design/issues/1162.
- auto status = g_trace_processor_rpc->LoadTrace(data, size, /*eof=*/false);
+ auto status =
+ g_trace_processor_rpc->LoadTrace(g_req_buf, size, /*eof=*/false);
if (status.ok()) {
g_reply(id, true, "", 0);
} else {
@@ -73,23 +81,16 @@
// We keep the same signature as other methods even though we don't take input
// arguments for simplicity.
-void EMSCRIPTEN_KEEPALIVE trace_processor_notifyEof(RequestID,
- const uint8_t*,
- uint32_t);
-void trace_processor_notifyEof(RequestID id, const uint8_t*, uint32_t size) {
+void EMSCRIPTEN_KEEPALIVE trace_processor_notifyEof(RequestID, uint32_t);
+void trace_processor_notifyEof(RequestID id, uint32_t size) {
PERFETTO_DCHECK(!size);
g_trace_processor_rpc->LoadTrace(nullptr, 0, /*eof=*/true);
g_reply(id, true, "", 0);
}
-void EMSCRIPTEN_KEEPALIVE trace_processor_rawQuery(RequestID,
- const uint8_t*,
- int);
-void trace_processor_rawQuery(RequestID id,
- const uint8_t* query_data,
- int len) {
- std::vector<uint8_t> res =
- g_trace_processor_rpc->RawQuery(query_data, static_cast<size_t>(len));
+void EMSCRIPTEN_KEEPALIVE trace_processor_rawQuery(RequestID, uint32_t);
+void trace_processor_rawQuery(RequestID id, uint32_t len) {
+ std::vector<uint8_t> res = g_trace_processor_rpc->RawQuery(g_req_buf, len);
g_reply(id, true, reinterpret_cast<const char*>(res.data()),
static_cast<uint32_t>(res.size()));
diff --git a/ui/src/common/engine.ts b/ui/src/common/engine.ts
index cb99f09..5d5ab0b 100644
--- a/ui/src/common/engine.ts
+++ b/ui/src/common/engine.ts
@@ -22,7 +22,6 @@
export class NullLoadingTracker implements LoadingTracker {
beginLoading(): void {}
-
endLoading(): void {}
}
@@ -51,7 +50,7 @@
* Push trace data into the engine. The engine is supposed to automatically
* figure out the type of the trace (JSON vs Protobuf).
*/
- abstract parse(data: Uint8Array): void;
+ abstract parse(data: Uint8Array): Promise<void>;
/**
* Notify the engine no more data is coming.
diff --git a/ui/src/common/wasm_engine_proxy.ts b/ui/src/common/wasm_engine_proxy.ts
index 8194894..52ea141 100644
--- a/ui/src/common/wasm_engine_proxy.ts
+++ b/ui/src/common/wasm_engine_proxy.ts
@@ -71,19 +71,18 @@
* worker thread.
*/
export class WasmEngineProxy extends Engine {
+ readonly id: string;
private readonly worker: Worker;
private readonly traceProcessor_: TraceProcessor;
private pendingCallbacks: Map<number, protobufjs.RPCImplCallback>;
private nextRequestId: number;
- readonly id: string;
- constructor(
- args: {id: string, loadingTracker?: LoadingTracker, worker: Worker}) {
- super(args.loadingTracker);
+ constructor(id: string, worker: Worker, loadingTracker?: LoadingTracker) {
+ super(loadingTracker);
this.nextRequestId = 0;
this.pendingCallbacks = new Map();
- this.id = args.id;
- this.worker = args.worker;
+ this.id = id;
+ this.worker = worker;
this.worker.onmessage = this.onMessage.bind(this);
this.traceProcessor_ =
TraceProcessor.create(this.rpcImpl.bind(this, 'trace_processor'));
diff --git a/ui/src/controller/globals.ts b/ui/src/controller/globals.ts
index 244c6cc..e268932 100644
--- a/ui/src/controller/globals.ts
+++ b/ui/src/controller/globals.ts
@@ -17,16 +17,8 @@
import {assertExists} from '../base/logging';
import {Remote} from '../base/remote';
import {DeferredAction, StateActions} from '../common/actions';
-import {Engine} from '../common/engine';
import {createEmptyState, State} from '../common/state';
-import {
- createWasmEngine,
- destroyWasmEngine,
- WasmEngineProxy
-} from '../common/wasm_engine_proxy';
-
import {ControllerAny} from './controller';
-import {LoadingManager} from './loading_manager';
type PublishKinds =
'OverviewData'|'TrackData'|'Threads'|'QueryResult'|'LegacyTrace'|
@@ -93,19 +85,6 @@
assertExists(this._frontend).send<void>('patchState', [patches]);
}
- createEngine(): Engine {
- const id = new Date().toUTCString();
- return new WasmEngineProxy({
- id,
- worker: createWasmEngine(id),
- loadingTracker: LoadingManager.getInstance,
- });
- }
-
- destroyEngine(id: string): void {
- destroyWasmEngine(id);
- }
-
// TODO: this needs to be cleaned up.
publish(what: PublishKinds, data: {}, transferList?: Transferable[]) {
assertExists(this._frontend)
diff --git a/ui/src/controller/trace_controller.ts b/ui/src/controller/trace_controller.ts
index d91232f..c503ac2 100644
--- a/ui/src/controller/trace_controller.ts
+++ b/ui/src/controller/trace_controller.ts
@@ -27,6 +27,11 @@
import {SCROLLING_TRACK_GROUP} from '../common/state';
import {toNs, toNsCeil, toNsFloor} from '../common/time';
import {TimeSpan} from '../common/time';
+import {
+ createWasmEngine,
+ destroyWasmEngine,
+ WasmEngineProxy
+} from '../common/wasm_engine_proxy';
import {QuantizedLoad, ThreadDesc} from '../frontend/globals';
import {ANDROID_LOGS_TRACK_KIND} from '../tracks/android_log/common';
import {SLICE_TRACK_KIND} from '../tracks/chrome_slices/common';
@@ -45,6 +50,7 @@
import {Child, Children, Controller} from './controller';
import {globals} from './globals';
+import {LoadingManager} from './loading_manager';
import {LogsController} from './logs_controller';
import {QueryController, QueryControllerArgs} from './query_controller';
import {SearchController} from './search_controller';
@@ -52,15 +58,16 @@
SelectionController,
SelectionControllerArgs
} from './selection_controller';
+import {
+ TraceBufferStream,
+ TraceFileStream,
+ TraceHttpStream,
+ TraceStream
+} from './trace_stream';
import {TrackControllerArgs, trackControllerRegistry} from './track_controller';
type States = 'init'|'loading_trace'|'ready';
-declare interface FileReaderSync { readAsArrayBuffer(blob: Blob): ArrayBuffer; }
-
-declare var FileReaderSync:
- {prototype: FileReaderSync; new (): FileReaderSync;};
-
interface ThreadSliceTrack {
maxDepth: number;
trackId: number;
@@ -80,7 +87,9 @@
}
onDestroy() {
- if (this.engine !== undefined) globals.destroyEngine(this.engine.id);
+ if (this.engine instanceof WasmEngineProxy) {
+ destroyWasmEngine(this.engine.id);
+ }
}
run() {
@@ -91,12 +100,19 @@
engineId: this.engineId,
ready: false,
}));
- this.loadTrace().then(() => {
- globals.dispatch(Actions.setEngineReady({
- engineId: this.engineId,
- ready: true,
- }));
- });
+ this.loadTrace()
+ .then(() => {
+ globals.dispatch(Actions.setEngineReady({
+ engineId: this.engineId,
+ ready: true,
+ }));
+ })
+ .catch(err => {
+ this.updateStatus(`${err}`);
+ this.setState('init');
+ console.error(err);
+ return;
+ });
this.updateStatus('Opening trace');
this.setState('loading_trace');
break;
@@ -155,63 +171,37 @@
private async loadTrace() {
this.updateStatus('Creating trace processor');
const engineCfg = assertExists(globals.state.engines[this.engineId]);
- this.engine = globals.createEngine();
- const statusHeader = 'Opening trace';
+ console.log('Opening trace using built-in WASM engine');
+ this.engine = new WasmEngineProxy(
+ this.engineId,
+ createWasmEngine(this.engineId),
+ LoadingManager.getInstance);
+ let traceStream: TraceStream;
if (engineCfg.source instanceof File) {
- const blob = engineCfg.source as Blob;
- const reader = new FileReaderSync();
- const SLICE_SIZE = 1024 * 1024;
- for (let off = 0; off < blob.size; off += SLICE_SIZE) {
- const slice = blob.slice(off, off + SLICE_SIZE);
- const arrBuf = reader.readAsArrayBuffer(slice);
- await this.engine.parse(new Uint8Array(arrBuf));
- const progress = Math.round((off + slice.size) / blob.size * 100);
- this.updateStatus(`${statusHeader} ${progress} %`);
- }
+ traceStream = new TraceFileStream(engineCfg.source);
} else if (engineCfg.source instanceof ArrayBuffer) {
- this.updateStatus(`${statusHeader} 0 %`);
- const buffer = new Uint8Array(engineCfg.source);
- const SLICE_SIZE = 1024 * 1024;
- for (let off = 0; off < buffer.byteLength; off += SLICE_SIZE) {
- const slice = buffer.subarray(off, off + SLICE_SIZE);
- await this.engine.parse(slice);
- const progress =
- Math.round((off + slice.byteLength) / buffer.byteLength * 100);
- this.updateStatus(`${statusHeader} ${progress} %`);
- }
+ traceStream = new TraceBufferStream(engineCfg.source);
} else {
- const resp = await fetch(engineCfg.source);
- if (resp.status !== 200) {
- this.updateStatus(`HTTP error ${resp.status}`);
- throw new Error(`fetch() failed with HTTP error ${resp.status}`);
- }
- // tslint:disable-next-line no-any
- const rd = (resp.body as any).getReader() as ReadableStreamReader;
- const tStartMs = performance.now();
- let tLastUpdateMs = 0;
- for (let off = 0;;) {
- const readRes = await rd.read() as {value: Uint8Array, done: boolean};
- if (readRes.value !== undefined) {
- off += readRes.value.length;
- await this.engine.parse(readRes.value);
- }
- // For traces loaded from the network there doesn't seem to be a
- // reliable way to compute the %. The content-length exposed by GCS is
- // before compression (which is handled transparently by the browser).
- const nowMs = performance.now();
- if (nowMs - tLastUpdateMs > 100) {
- tLastUpdateMs = nowMs;
- const mb = off / 1e6;
- const tElapsed = (nowMs - tStartMs) / 1e3;
- let status = `${statusHeader} ${mb.toFixed(1)} MB `;
- status += `(${(mb / tElapsed).toFixed(1)} MB/s)`;
- this.updateStatus(status);
- }
- if (readRes.done) break;
- }
+ traceStream = new TraceHttpStream(engineCfg.source);
}
+ const tStart = performance.now();
+ for (;;) {
+ const res = await traceStream.readChunk();
+ await this.engine.parse(res.data);
+ const elapsed = (performance.now() - tStart) / 1000;
+ let status = 'Loading trace ';
+ if (res.bytesTotal > 0) {
+ const progress = Math.round(res.bytesRead / res.bytesTotal * 100);
+ status += `${progress}%`;
+ } else {
+ status += `${Math.round(res.bytesRead / 1e6)} MB`;
+ }
+ status += ` - ${Math.ceil(res.bytesRead / elapsed / 1e6)} MB/s`;
+ this.updateStatus(status);
+ if (res.eof) break;
+ }
await this.engine.notifyEof();
const traceTime = await this.engine.getTraceTimeBounds();
diff --git a/ui/src/controller/trace_stream.ts b/ui/src/controller/trace_stream.ts
new file mode 100644
index 0000000..75ef8d4
--- /dev/null
+++ b/ui/src/controller/trace_stream.ts
@@ -0,0 +1,134 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+import {defer, Deferred} from '../base/deferred';
+import {assertExists, assertTrue} from '../base/logging';
+
+const SLICE_SIZE = 32 * 1024 * 1024;
+
+// The object returned by TraceStream.readChunk() promise.
+export interface TraceChunk {
+ data: Uint8Array;
+ eof: boolean;
+ bytesRead: number;
+ bytesTotal: number;
+}
+
+// Base interface for loading trace data in chunks.
+// The caller has to call readChunk() until TraceChunk.eof == true.
+export interface TraceStream {
+ readChunk(): Promise<TraceChunk>;
+}
+
+// Loads a trace from a File object. For the "open file" use case.
+export class TraceFileStream implements TraceStream {
+ private traceFile: Blob;
+ private reader: FileReader;
+ private pendingRead?: Deferred<TraceChunk>;
+ private bytesRead = 0;
+
+ constructor(traceFile: Blob) {
+ this.traceFile = traceFile;
+ this.reader = new FileReader();
+ this.reader.onloadend = () => this.onLoad();
+ }
+
+ onLoad() {
+ const res = assertExists(this.reader.result) as ArrayBuffer;
+ const pendingRead = assertExists(this.pendingRead);
+ this.pendingRead = undefined;
+ if (this.reader.error) {
+ pendingRead.reject(this.reader.error);
+ return;
+ }
+ this.bytesRead += res.byteLength;
+ pendingRead.resolve({
+ data: new Uint8Array(res),
+ eof: this.bytesRead >= this.traceFile.size,
+ bytesRead: this.bytesRead,
+ bytesTotal: this.traceFile.size,
+ });
+ }
+
+ readChunk(): Promise<TraceChunk> {
+ const sliceEnd = Math.min(this.bytesRead + SLICE_SIZE, this.traceFile.size);
+ const slice = this.traceFile.slice(this.bytesRead, sliceEnd);
+ this.pendingRead = defer<TraceChunk>();
+ this.reader.readAsArrayBuffer(slice);
+ return this.pendingRead;
+ }
+}
+
+// Loads a trace from an ArrayBuffer. For the window.open() + postMessage
+// use-case, used by other dashboards (see post_message_handler.ts).
+export class TraceBufferStream implements TraceStream {
+ private traceBuf: ArrayBuffer;
+ private bytesRead = 0;
+
+ constructor(traceBuf: ArrayBuffer) {
+ this.traceBuf = traceBuf;
+ }
+
+ readChunk(): Promise<TraceChunk> {
+ assertTrue(this.bytesRead <= this.traceBuf.byteLength);
+ const len = Math.min(SLICE_SIZE, this.traceBuf.byteLength - this.bytesRead);
+ const data = new Uint8Array(this.traceBuf, this.bytesRead, len);
+ this.bytesRead += len;
+ return Promise.resolve({
+ data,
+ eof: this.bytesRead >= this.traceBuf.byteLength,
+ bytesRead: this.bytesRead,
+ bytesTotal: this.traceBuf.byteLength,
+ });
+ }
+}
+
+// Loads a stream from a URL via fetch(). For the permalink (?s=UUID) and
+// open url (?url=http://...) cases.
+export class TraceHttpStream implements TraceStream {
+ private bytesRead = 0;
+ private bytesTotal = 0;
+ private uri: string;
+ private httpStream?: ReadableStreamReader;
+
+ constructor(uri: string) {
+ assertTrue(uri.startsWith('http://') || uri.startsWith('https://'));
+ this.uri = uri;
+ }
+
+ async readChunk(): Promise<TraceChunk> {
+ // Initialize the fetch() job on the first read request.
+ if (this.httpStream === undefined) {
+ const response = await fetch(this.uri);
+ if (response.status !== 200) {
+ throw new Error(`HTTP ${response.status} - ${response.statusText}`);
+ }
+ const len = response.headers.get('Content-Length');
+ this.bytesTotal = len ? Number.parseInt(len, 10) : 0;
+ // tslint:disable-next-line no-any
+ this.httpStream = (response.body as any).getReader();
+ }
+
+ const res =
+ (await this.httpStream!.read()) as {value?: Uint8Array, done: boolean};
+ const data = res.value ? res.value : new Uint8Array();
+ this.bytesRead += data.length;
+ return {
+ data,
+ eof: res.done,
+ bytesRead: this.bytesRead,
+ bytesTotal: this.bytesTotal,
+ };
+ }
+}
diff --git a/ui/src/engine/wasm_bridge.ts b/ui/src/engine/wasm_bridge.ts
index 129432f..86e39ff 100644
--- a/ui/src/engine/wasm_bridge.ts
+++ b/ui/src/engine/wasm_bridge.ts
@@ -16,6 +16,13 @@
import {assertExists, assertTrue} from '../base/logging';
import * as init_trace_processor from '../gen/trace_processor';
+// The Initialize() call will allocate a buffer of REQ_BUF_SIZE bytes which
+// will be used to copy the input request data. This is to avoid passing the
+// input data on the stack, which has a limited (~1MB) size.
+// The buffer will be allocated by the C++ side and reachable at
+// HEAPU8[reqBufferAddr, +REQ_BUFFER_SIZE].
+const REQ_BUF_SIZE = 32 * 1024 * 1024;
+
function writeToUIConsole(line: string) {
console.log(line);
}
@@ -40,6 +47,7 @@
private aborted: boolean;
private currentRequestResult: WasmBridgeResponse|null;
private connection: init_trace_processor.Module;
+ private reqBufferAddr = 0;
constructor(init: init_trace_processor.InitWasm) {
this.aborted = false;
@@ -55,7 +63,8 @@
});
this.whenInitialized = deferredRuntimeInitialized.then(() => {
const fn = this.connection.addFunction(this.onReply.bind(this), 'viiii');
- this.connection.ccall('Initialize', 'void', ['number'], [fn]);
+ this.reqBufferAddr = this.connection.ccall(
+ 'Initialize', 'number', ['number', 'number'], [fn, REQ_BUF_SIZE]);
});
}
@@ -70,12 +79,15 @@
// TODO(b/124805622): protoio can generate CamelCase names - normalize.
const methodName = req.methodName;
const name = methodName.charAt(0).toLowerCase() + methodName.slice(1);
+ assertTrue(req.data.length <= REQ_BUF_SIZE);
+ const endAddr = this.reqBufferAddr + req.data.length;
+ this.connection.HEAPU8.subarray(this.reqBufferAddr, endAddr).set(req.data);
this.connection.ccall(
- `${req.serviceName}_${name}`, // C method name.
- 'void', // Return type.
- ['number', 'array', 'number'], // Input args.
- [req.id, req.data, req.data.length] // Args.
- );
+ `${req.serviceName}_${name}`, // C method name.
+ 'void', // Return type.
+ ['number', 'number'], // Input args.
+ [req.id, req.data.length] // Args.
+ );
const result = assertExists(this.currentRequestResult);
assertTrue(req.id === result.id);
diff --git a/ui/src/query/index.ts b/ui/src/query/index.ts
index aff94bc..bbb1f2c 100644
--- a/ui/src/query/index.ts
+++ b/ui/src/query/index.ts
@@ -169,10 +169,7 @@
if (this.engine) {
destroyWasmEngine(kEngineId);
}
- this.engine = new WasmEngineProxy({
- id: 'engine',
- worker: createWasmEngine(kEngineId),
- });
+ this.engine = new WasmEngineProxy('engine', createWasmEngine(kEngineId));
this.file = input.file;
this.readNextSlice(0);
@@ -297,7 +294,7 @@
isResult(q) ? `${q.executionTimeNs / 1000000}ms` : ''),
isResult(q) ? m('.query-content', renderTable(q.result)) : null,
isError(q) ? m('.query-content', q.error) : null,
- isPending(q) ? m('.query-content') : null, ))),
+ isPending(q) ? m('.query-content') : null))),
]);
}
@@ -307,7 +304,7 @@
m('tr', columns(result).map(c => m('th', c))),
rows(result, 0, 1000).map(r => {
return m('tr', Object.values(r).map(d => m('td', d)));
- }), );
+ }));
}
function main() {