perfetto: Add high-performance Java data source and ProtoWriter

Add a zero-allocation Java data source API for writing custom trace
data from performance-critical code paths (e.g., frame rendering).

The core of this is ProtoWriter, a pure Java protobuf encoder that
writes directly to a pre-allocated thread-local byte buffer. Like C
protozero, it reserves a fixed 4-byte (redundantly encoded) varint
for each nested message's length field and patches it in once the
message is complete. This enables single-pass encoding without size
pre-computation.

Architecture:
- ProtoWriter: zero-alloc proto encoder (varint, fixed, string,
  nested messages). ASCII fast path for strings. UTF-8 fallback
  with pre-allocated scratch buffer.
- PerfettoDataSource: base class with volatile enabled flag (~1ns
  disabled check) and ThreadLocal TraceContext.
- TraceContext: per-thread writer + optional InternPool. Single
  JNI call (nativeWritePacketToAllInstances) writes encoded bytes
  to all active tracing session instances via the C SDK stream
  writer, which handles chunk boundaries and patching.
- InternPool: optional string interning with lazy incremental
  state checking (no JNI overhead when interning is unused).

The JNI layer correctly handles:
- Multi-instance: iterates all active sessions, writes to each.
- Enabled flag: uses native enabled_ptr to avoid disabling when
  other instances remain active.
- Incremental state: tracks per-instance clear flags for interning.
- Large packets: delegates to PerfettoStreamWriterAppendBytes
  which handles cross-chunk writes and size field patching.

Benchmarks (Java program, real JNI, real Perfetto shmem, Linux):
  Full E2E encode + JNI + shmem (basic):         ~112 ns/op
  Full E2E encode + JNI + shmem (+ arg):         ~123 ns/op
  ProtoWriter encode only:                        ~14 ns/op
  JNI + shmem only (pre-encoded):                 ~95 ns/op
Comparison points:
  C SDK raw data source (same-size packet):       ~77 ns/op
    (Java full E2E basic is ~1.45x this)
  C SDK track event, low-level (LL) API:         ~192 ns/op
  Existing Java track event, high-level (HL) API: ~179 ns/op

Bug: N/A
Test: perfetto_integrationtests --gtest_filter="JavaDataSourceTest.*"
Test: ProtoWriterTest, InternPoolTest, FullTraceE2ETest (JUnit4)
Change-Id: I0000000000000000000000000000000000000000
diff --git a/Android.bp b/Android.bp
index 7493b53..912a292 100644
--- a/Android.bp
+++ b/Android.bp
@@ -2919,6 +2919,7 @@
         ":perfetto_src_ipc_common",
         ":perfetto_src_ipc_host",
         ":perfetto_src_ipc_perfetto_ipc",
+        ":perfetto_src_java_datasource_integrationtests",
         ":perfetto_src_kallsyms_kallsyms",
         ":perfetto_src_kernel_utils_kernel_wakelock_errors",
         ":perfetto_src_kernel_utils_syscall_table",
@@ -13871,6 +13872,14 @@
     ],
 }
 
+// GN: //src/java_datasource:integrationtests
+filegroup {
+    name: "perfetto_src_java_datasource_integrationtests",
+    srcs: [
+        "src/java_datasource/java_datasource_integrationtest.cc",
+    ],
+}
+
 // GN: //src/java_sdk/main:perfetto_lib
 android_library {
     name: "perfetto_src_java_sdk_main_perfetto_lib",
diff --git a/gn/perfetto_integrationtests.gni b/gn/perfetto_integrationtests.gni
index 7dfdece..0c9d102 100644
--- a/gn/perfetto_integrationtests.gni
+++ b/gn/perfetto_integrationtests.gni
@@ -17,6 +17,7 @@
 perfetto_integrationtests_targets = [
   "src/tracing/test:client_api_integrationtests",
   "src/shared_lib/test:integrationtests",
+  "src/java_datasource:integrationtests",
 ]
 
 if (enable_perfetto_ipc && enable_perfetto_system_consumer) {
diff --git a/src/java_datasource/BUILD.gn b/src/java_datasource/BUILD.gn
new file mode 100644
index 0000000..1cbb2f9
--- /dev/null
+++ b/src/java_datasource/BUILD.gn
@@ -0,0 +1,19 @@
+import("../../gn/perfetto.gni")
+
+if (enable_perfetto_integration_tests) {
+  source_set("integrationtests") {
+    testonly = true
+    deps = [
+      "../../gn:default_deps",
+      "../../gn:gtest_and_gmock",
+      "../../include/perfetto/public",
+      "../base",
+      "../shared_lib:for_testing",
+      "../shared_lib:shared_lib",
+      "../shared_lib/test:utils",
+      "../shared_lib/test/protos",
+    ]
+    defines = [ "PERFETTO_SDK_DISABLE_SHLIB_EXPORT" ]
+    sources = [ "java_datasource_integrationtest.cc" ]
+  }
+}
diff --git a/src/java_datasource/bench/PerfettoBench.java b/src/java_datasource/bench/PerfettoBench.java
new file mode 100644
index 0000000..449400c
--- /dev/null
+++ b/src/java_datasource/bench/PerfettoBench.java
@@ -0,0 +1,251 @@
+package dev.perfetto.sdk;
+
+import java.util.Arrays;
+
+/**
+ * End-to-end benchmark: Java ProtoWriter encoding + real JNI + real shmem.
+ *
+ * Loads a Linux-native JNI library that wraps the Perfetto C SDK,
+ * registers a custom data source, starts an in-process tracing session,
+ * and benchmarks the full path: encode in Java -> JNI -> AppendBytes -> shmem.
+ */
+public class PerfettoBench {
+    // JNI methods - real Perfetto C SDK underneath
+    static native void nativeInit();                       // PerfettoProducerInit
+    static native long nativeRegisterDs(String name);      // PerfettoDsImplCreate + Register
+    static native long nativeStartSession(String dsName);  // Start in-process session
+    static native void nativeStopSession(long session);    // Stop + destroy
+    static native void nativeWritePacket(long dsImpl, byte[] buf, int len); // iterate + AppendBytes
+
+    static {
+        System.loadLibrary("perfetto_bench_jni");
+    }
+
+    // TracePacket field numbers
+    static final int TP_TIMESTAMP = 8;
+    static final int TP_SEQ_ID = 10;
+    static final int TP_TRACK_EVENT = 11;
+    static final int TP_SEQ_FLAGS = 13;
+    static final int TE_TYPE = 9;
+    static final int TE_NAME_IID = 10;
+    static final int TE_TRACK_UUID = 11;
+    static final int TE_DEBUG_ANNOTATIONS = 4;
+    static final int DA_NAME_IID = 1;
+    static final int DA_UINT = 7;
+
+    static final int WARMUP = 100_000;
+    static final int ITERATIONS = 2_000_000;
+    static final int REPS = 5;
+
+    public static void main(String[] args) {
+        nativeInit();
+        String dsName = "dev.perfetto.java_bench";
+        long dsImpl = nativeRegisterDs(dsName);
+        if (dsImpl == 0) {
+            System.err.println("Failed to register data source");
+            return;
+        }
+
+        ProtoWriter w = new ProtoWriter(4096);
+
+        System.out.println("Java DataSource E2E Benchmark (real JNI + real shmem)");
+        System.out.println("=====================================================");
+        System.out.println();
+
+        // ---------- Benchmark 1: Encoding only (no session, no JNI) ----------
+        benchmarkRun("ProtoWriter encode only (no shmem)", REPS, () -> {
+            for (int i = 0; i < WARMUP; i++) {
+                w.reset();
+                encodeSliceBegin(w);
+            }
+            long start = System.nanoTime();
+            for (int i = 0; i < ITERATIONS; i++) {
+                w.reset();
+                encodeSliceBegin(w);
+            }
+            return System.nanoTime() - start;
+        });
+
+        // ---------- Benchmark 2: Full E2E - encode + JNI + shmem ----------
+        {
+            long session = startSession(dsName);
+            try {
+                // Warmup with real shmem writes
+                for (int i = 0; i < WARMUP; i++) {
+                    w.reset();
+                    encodeSliceBegin(w);
+                    nativeWritePacket(dsImpl, w.buffer(), w.position());
+                }
+
+                benchmarkRun("Full E2E: encode + JNI + shmem (basic)", REPS, () -> {
+                    long start = System.nanoTime();
+                    for (int i = 0; i < ITERATIONS; i++) {
+                        w.reset();
+                        encodeSliceBegin(w);
+                        nativeWritePacket(dsImpl, w.buffer(), w.position());
+                    }
+                    return System.nanoTime() - start;
+                });
+            } finally {
+                nativeStopSession(session);
+            }
+        }
+
+        // ---------- Benchmark 3: Full E2E with debug arg ----------
+        {
+            long session = startSession(dsName);
+            try {
+                for (int i = 0; i < WARMUP; i++) {
+                    w.reset();
+                    encodeSliceBeginWithArg(w);
+                    nativeWritePacket(dsImpl, w.buffer(), w.position());
+                }
+
+                benchmarkRun("Full E2E: encode + JNI + shmem (+ debug arg)", REPS, () -> {
+                    long start = System.nanoTime();
+                    for (int i = 0; i < ITERATIONS; i++) {
+                        w.reset();
+                        encodeSliceBeginWithArg(w);
+                        nativeWritePacket(dsImpl, w.buffer(), w.position());
+                    }
+                    return System.nanoTime() - start;
+                });
+            } finally {
+                nativeStopSession(session);
+            }
+        }
+
+        // ---------- Benchmark 4: JNI + shmem only (pre-encoded packet) ----------
+        {
+            long session = startSession(dsName);
+            try {
+                w.reset();
+                encodeSliceBegin(w);
+                byte[] preEncoded = Arrays.copyOf(w.buffer(), w.position());
+
+                for (int i = 0; i < WARMUP; i++) {
+                    nativeWritePacket(dsImpl, preEncoded, preEncoded.length);
+                }
+
+                benchmarkRun("JNI + shmem only (pre-encoded, no encode cost)", REPS, () -> {
+                    long start = System.nanoTime();
+                    for (int i = 0; i < ITERATIONS; i++) {
+                        nativeWritePacket(dsImpl, preEncoded, preEncoded.length);
+                    }
+                    return System.nanoTime() - start;
+                });
+            } finally {
+                nativeStopSession(session);
+            }
+        }
+
+        // ---------- Benchmark 5: Builder API encode only ----------
+        {
+            PacketBuilder pb = new PacketBuilder(null);
+            // Its numbers are only comparable with Benchmark 1 if the builder
+            // emits exactly the same bytes as encodeSliceBegin().
+            verifyBuilderMatchesRaw(pb, w);
+            benchmarkRun("Builder encode: 3 fields + nested", REPS, () -> {
+                for (int i = 0; i < WARMUP; i++) {
+                    w.reset();
+                    encodeWithBuilder(pb, w);
+                }
+                long start = System.nanoTime();
+                for (int i = 0; i < ITERATIONS; i++) {
+                    w.reset();
+                    encodeWithBuilder(pb, w);
+                }
+                return System.nanoTime() - start;
+            });
+        }
+
+        // Packet sizes
+        System.out.println("\nPacket sizes:");
+        w.reset(); encodeSliceBegin(w);
+        System.out.printf("  SliceBegin (raw):   %d bytes%n", w.position());
+        w.reset(); encodeSliceBeginWithArg(w);
+        System.out.printf("  SliceBegin+Arg:     %d bytes%n", w.position());
+    }
+
+    /**
+     * Starts an in-process tracing session that enables {@code dsName}.
+     *
+     * @throws IllegalStateException if the native layer returns a 0 handle,
+     *         so the benchmark never writes to or stops a missing session.
+     */
+    static long startSession(String dsName) {
+        long session = nativeStartSession(dsName);
+        if (session == 0) {
+            throw new IllegalStateException("Failed to start tracing session for " + dsName);
+        }
+        return session;
+    }
+
+    /** Throws if encodeWithBuilder() and encodeSliceBegin() emit different bytes. */
+    static void verifyBuilderMatchesRaw(PacketBuilder pb, ProtoWriter w) {
+        w.reset();
+        encodeSliceBegin(w);
+        byte[] raw = Arrays.copyOf(w.buffer(), w.position());
+        w.reset();
+        encodeWithBuilder(pb, w);
+        byte[] built = Arrays.copyOf(w.buffer(), w.position());
+        if (!Arrays.equals(raw, built)) {
+            throw new IllegalStateException(
+                    "PacketBuilder output differs from raw ProtoWriter output");
+        }
+    }
+
+    static void encodeSliceBegin(ProtoWriter w) {
+        w.writeVarInt(TP_TIMESTAMP, 123456789000L);
+        w.writeVarInt(TP_SEQ_ID, 1);
+        w.writeVarInt(TP_SEQ_FLAGS, 2);
+        int te = w.beginNested(TP_TRACK_EVENT);
+        w.writeVarInt(TE_TYPE, 1);
+        w.writeVarInt(TE_TRACK_UUID, 12345);
+        w.writeVarInt(TE_NAME_IID, 1);
+        w.endNested(te);
+    }
+
+    // Same packet via builder API; verifyBuilderMatchesRaw() checks the bytes
+    // are identical.
+    static void encodeWithBuilder(PacketBuilder pb, ProtoWriter w) {
+        pb.start(w)
+                .writeVarInt(TP_TIMESTAMP, 123456789000L)
+                .writeVarInt(TP_SEQ_ID, 1)
+                .writeVarInt(TP_SEQ_FLAGS, 2)
+                .beginNested(TP_TRACK_EVENT)
+                    .writeVarInt(TE_TYPE, 1)
+                    .writeVarInt(TE_TRACK_UUID, 12345)
+                    .writeVarInt(TE_NAME_IID, 1)
+                .endNested()
+                .commit();
+    }
+
+    static void encodeSliceBeginWithArg(ProtoWriter w) {
+        w.writeVarInt(TP_TIMESTAMP, 123456789000L);
+        w.writeVarInt(TP_SEQ_ID, 1);
+        w.writeVarInt(TP_SEQ_FLAGS, 2);
+        int te = w.beginNested(TP_TRACK_EVENT);
+        w.writeVarInt(TE_TYPE, 1);
+        w.writeVarInt(TE_TRACK_UUID, 12345);
+        w.writeVarInt(TE_NAME_IID, 1);
+        int da = w.beginNested(TE_DEBUG_ANNOTATIONS);
+        w.writeVarInt(DA_NAME_IID, 1);
+        w.writeVarInt(DA_UINT, 42);
+        w.endNested(da);
+        w.endNested(te);
+    }
+
+    interface BenchFn { long run(); }
+
+    static void benchmarkRun(String name, int reps, BenchFn fn) {
+        long[] times = new long[reps];
+        for (int r = 0; r < reps; r++) {
+            times[r] = fn.run();
+        }
+        Arrays.sort(times);
+        long median = times[reps / 2];
+        double nsPerOp = (double) median / ITERATIONS;
+        System.out.printf("  %-55s %7.1f ns/op%n", name, nsPerOp);
+    }
+}
diff --git a/src/java_datasource/bench/perfetto_bench_jni.cc b/src/java_datasource/bench/perfetto_bench_jni.cc
new file mode 100644
index 0000000..66fc85a
--- /dev/null
+++ b/src/java_datasource/bench/perfetto_bench_jni.cc
@@ -0,0 +1,205 @@
+// Linux JNI for Java DataSource benchmark.
+// Links against libperfetto_c, no Android deps.
+
+#include <jni.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <string.h>
+
+#include "perfetto/public/abi/atomic.h"
+#include "perfetto/public/abi/backend_type.h"
+#include "perfetto/public/abi/data_source_abi.h"
+#include "perfetto/public/abi/producer_abi.h"
+#include "perfetto/public/abi/stream_writer_abi.h"
+#include "perfetto/public/abi/tracing_session_abi.h"
+#include "perfetto/public/producer.h"
+#include "perfetto/public/stream_writer.h"
+#include "perfetto/public/tracing_session.h"
+
+// The hand-rolled proto encoders below write every length field as a single
+// byte, which is only a valid varint for values up to 127.
+static constexpr size_t kMaxSingleByteVarint = 127;
+
+// Size of the on-stack staging buffer in nativeWritePacket(). Larger packets
+// are rejected instead of overflowing the stack.
+static constexpr size_t kMaxPacketSize = 4096;
+
+static struct PerfettoDsImpl* g_ds_impl = nullptr;
+static PERFETTO_ATOMIC(bool) * g_enabled_ptr = nullptr;
+
+// Raises a Java exception of class |class_name| with message |msg|. If the
+// class lookup itself fails, the exception thrown by FindClass stays pending.
+static void ThrowJavaException(JNIEnv* env,
+                               const char* class_name,
+                               const char* msg) {
+  jclass cls = env->FindClass(class_name);
+  if (cls) {
+    env->ThrowNew(cls, msg);
+    env->DeleteLocalRef(cls);
+  }
+}
+
+extern "C" {
+
+JNIEXPORT void JNICALL Java_dev_perfetto_sdk_PerfettoBench_nativeInit(JNIEnv*,
+                                                                      jclass) {
+  struct PerfettoProducerInitArgs args = PERFETTO_PRODUCER_INIT_ARGS_INIT();
+  args.backends = PERFETTO_BACKEND_IN_PROCESS;
+  PerfettoProducerInit(args);
+}
+
+// Registers a data source named |name|. Returns the PerfettoDsImpl pointer,
+// or 0 if the name does not fit the one-byte length encoding or if
+// registration fails.
+JNIEXPORT jlong JNICALL
+Java_dev_perfetto_sdk_PerfettoBench_nativeRegisterDs(JNIEnv* env,
+                                                     jclass,
+                                                     jstring name) {
+  const char* name_c = env->GetStringUTFChars(name, nullptr);
+  if (!name_c) {
+    return 0;  // GetStringUTFChars failed; a Java exception is pending.
+  }
+  size_t name_len = strlen(name_c);
+  if (name_len > kMaxSingleByteVarint) {
+    env->ReleaseStringUTFChars(name, name_c);
+    return 0;
+  }
+
+  // Build DataSourceDescriptor: field 1 = name
+  uint8_t desc[2 + kMaxSingleByteVarint];
+  uint8_t* p = desc;
+  *p++ = (1 << 3) | 2;
+  *p++ = static_cast<uint8_t>(name_len);
+  memcpy(p, name_c, name_len);
+  p += name_len;
+  env->ReleaseStringUTFChars(name, name_c);
+
+  struct PerfettoDsImpl* ds_impl = PerfettoDsImplCreate();
+  PERFETTO_ATOMIC(bool)* enabled_ptr = nullptr;
+  bool ok = PerfettoDsImplRegister(ds_impl, &enabled_ptr, desc,
+                                   static_cast<size_t>(p - desc));
+  if (!ok) {
+    return 0;
+  }
+
+  g_ds_impl = ds_impl;
+  g_enabled_ptr = enabled_ptr;
+  return static_cast<jlong>(reinterpret_cast<uintptr_t>(ds_impl));
+}
+
+// Starts an in-process tracing session that enables data source |ds_name|.
+// Returns the session pointer, or 0 if the name does not fit the one-byte
+// length encoding or the session cannot be created.
+JNIEXPORT jlong JNICALL
+Java_dev_perfetto_sdk_PerfettoBench_nativeStartSession(JNIEnv* env,
+                                                       jclass,
+                                                       jstring ds_name) {
+  const char* name_c = env->GetStringUTFChars(ds_name, nullptr);
+  if (!name_c) {
+    return 0;  // GetStringUTFChars failed; a Java exception is pending.
+  }
+  size_t name_len = strlen(name_c);
+  // The DataSource length (name + two tag/length pairs) is the largest
+  // single-byte length written below, so it is the one to bound.
+  if (name_len + 4 > kMaxSingleByteVarint) {
+    env->ReleaseStringUTFChars(ds_name, name_c);
+    return 0;
+  }
+
+  // Build TraceConfig proto manually:
+  // TraceConfig {
+  //   buffers { size_kb: 4096 }
+  //   data_sources { config { name: "<ds_name>" } }
+  // }
+  uint8_t cfg[256];
+  size_t pos = 0;
+
+  // buffers (field 1, nested)
+  cfg[pos++] = (1 << 3) | 2;  // tag
+  cfg[pos++] = 3;             // len
+  // BufferConfig.size_kb (field 1, varint) = 4096
+  cfg[pos++] = (1 << 3) | 0;
+  cfg[pos++] = 0x80;
+  cfg[pos++] = 0x20;  // 4096
+
+  // data_sources (field 2, nested)
+  cfg[pos++] = (2 << 3) | 2;  // tag
+  size_t ds_len_pos = pos++;  // len placeholder
+
+  // DataSource.config (field 1, nested)
+  cfg[pos++] = (1 << 3) | 2;      // tag
+  size_t config_len_pos = pos++;  // len placeholder
+
+  // DataSourceConfig.name (field 1, string)
+  cfg[pos++] = (1 << 3) | 2;
+  cfg[pos++] = static_cast<uint8_t>(name_len);
+  memcpy(cfg + pos, name_c, name_len);
+  pos += name_len;
+
+  // Backfill lengths
+  cfg[config_len_pos] = static_cast<uint8_t>(pos - config_len_pos - 1);
+  cfg[ds_len_pos] = static_cast<uint8_t>(pos - ds_len_pos - 1);
+
+  env->ReleaseStringUTFChars(ds_name, name_c);
+
+  struct PerfettoTracingSessionImpl* session =
+      PerfettoTracingSessionCreate(PERFETTO_BACKEND_IN_PROCESS);
+  if (!session) {
+    return 0;
+  }
+  PerfettoTracingSessionSetup(session, cfg, pos);
+  PerfettoTracingSessionStartBlocking(session);
+
+  return static_cast<jlong>(reinterpret_cast<uintptr_t>(session));
+}
+
+JNIEXPORT void JNICALL
+Java_dev_perfetto_sdk_PerfettoBench_nativeStopSession(JNIEnv*,
+                                                      jclass,
+                                                      jlong session_ptr) {
+  if (session_ptr == 0) {
+    return;  // Nothing was started.
+  }
+  auto* session = reinterpret_cast<struct PerfettoTracingSessionImpl*>(
+      static_cast<uintptr_t>(session_ptr));
+  PerfettoTracingSessionStopBlocking(session);
+  PerfettoTracingSessionDestroy(session);
+}
+
+// Writes buf[0, len) as one packet to every active instance of the data
+// source. Throws IllegalArgumentException if len is negative or larger than
+// kMaxPacketSize; GetByteArrayRegion throws if len exceeds the array length.
+JNIEXPORT void JNICALL
+Java_dev_perfetto_sdk_PerfettoBench_nativeWritePacket(JNIEnv* env,
+                                                      jclass,
+                                                      jlong ds_ptr,
+                                                      jbyteArray buf,
+                                                      jint len) {
+  auto* ds_impl =
+      reinterpret_cast<struct PerfettoDsImpl*>(static_cast<uintptr_t>(ds_ptr));
+
+  // Same path as our real JNI: copy to stack, iterate, AppendBytes.
+  uint8_t stack_buf[kMaxPacketSize];
+  if (len < 0 || static_cast<size_t>(len) > sizeof(stack_buf)) {
+    ThrowJavaException(env, "java/lang/IllegalArgumentException",
+                       "packet length out of range");
+    return;
+  }
+  env->GetByteArrayRegion(buf, 0, len, reinterpret_cast<jbyte*>(stack_buf));
+  if (env->ExceptionCheck()) {
+    return;  // Region was out of bounds; stack_buf holds no valid data.
+  }
+
+  struct PerfettoDsImplTracerIterator it =
+      PerfettoDsImplTraceIterateBegin(ds_impl);
+  while (it.tracer) {
+    struct PerfettoStreamWriter writer =
+        PerfettoDsTracerImplPacketBegin(it.tracer);
+    PerfettoStreamWriterAppendBytes(&writer, stack_buf,
+                                    static_cast<size_t>(len));
+    PerfettoDsTracerImplPacketEnd(it.tracer, &writer);
+    PerfettoDsImplTraceIterateNext(ds_impl, &it);
+  }
+}
+
+}  // extern "C"
diff --git a/src/java_datasource/java/main/AndroidManifest.xml b/src/java_datasource/java/main/AndroidManifest.xml
new file mode 100644
index 0000000..ee5c2a8
--- /dev/null
+++ b/src/java_datasource/java/main/AndroidManifest.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+ * Copyright (C) 2026 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ -->
+
+<manifest xmlns:android="http://schemas.android.com/apk/res/android"
+    package="dev.perfetto.sdk">
+    <uses-sdk android:minSdkVersion="33" />
+
+    <application>
+        <!-- ... -->
+    </application>
+
+</manifest>
diff --git a/src/java_datasource/java/main/BUILD.gn b/src/java_datasource/java/main/BUILD.gn
new file mode 100644
index 0000000..2aad09e
--- /dev/null
+++ b/src/java_datasource/java/main/BUILD.gn
@@ -0,0 +1,20 @@
+import("../../../../gn/perfetto.gni")
+import("../../../../gn/perfetto_android_sdk.gni")
+
+assert(enable_perfetto_android_java_sdk)
+
+perfetto_android_library("perfetto_datasource_lib") {
+  sources = [
+    "dev/perfetto/sdk/InternPool.java",
+    "dev/perfetto/sdk/PacketBuilder.java",
+    "dev/perfetto/sdk/PerfettoDataSource.java",
+    "dev/perfetto/sdk/ProtoWriter.java",
+    "dev/perfetto/sdk/TraceContext.java",
+  ]
+  deps = [ "../../jni:libperfetto_datasource_jni" ]
+  manifest = "AndroidManifest.xml"
+  android_bp_java_target_name_suffix = "_java"
+  android_bp_copy_java_target_name_suffix = "_framework_java"
+  android_bp_copy_java_target_jarjar =
+      "perfetto-datasource-framework-jarjar-rules.txt"
+}
diff --git a/src/java_datasource/java/main/dev/perfetto/sdk/InternPool.java b/src/java_datasource/java/main/dev/perfetto/sdk/InternPool.java
new file mode 100644
index 0000000..15704d7
--- /dev/null
+++ b/src/java_datasource/java/main/dev/perfetto/sdk/InternPool.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright (C) 2026 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.perfetto.sdk;
+
+import java.util.HashMap;
+
+/**
+ * Per-thread string interning pool for Perfetto trace packets.
+ *
+ * Every interning type (event names, debug annotation names, ...) owns an
+ * independent string -> iid table. The first time a string is seen it is
+ * given the next free iid and reported as new, so the caller emits it once
+ * in an InternedData message; later packets refer to it by iid only.
+ *
+ * When the incremental state is cleared (e.g. after a flush), call reset():
+ * all tables are emptied and iid numbering restarts, so every string is
+ * re-emitted on its next use.
+ *
+ * Thread safety: none. Each thread gets its own pool via TraceContext.
+ */
+public final class InternPool {
+    /**
+     * Outcome of {@link InternPool#intern}: the iid and whether the string
+     * was added by that call (and must therefore be emitted).
+     *
+     * One instance is recycled by every intern() call on the owning pool, so
+     * lookups do not allocate a result. Read the public fields immediately;
+     * the next intern() call overwrites them.
+     */
+    public static final class InternResult {
+        public int iid;
+        public boolean isNew;
+
+        void set(int iid, boolean isNew) {
+            this.isNew = isNew;
+            this.iid = iid;
+        }
+    }
+
+    // Number of interning types; valid type indices are 0..15.
+    private static final int MAX_INTERN_TYPES = 16;
+
+    // iid 0 is reserved, so numbering starts at 1 for every type.
+    private static final int FIRST_IID = 1;
+
+    // Lazily created string -> iid table for each interning type.
+    @SuppressWarnings("unchecked")
+    private final HashMap<String, Integer>[] mTablesByType =
+            new HashMap[MAX_INTERN_TYPES];
+
+    // Next iid to hand out, per interning type.
+    private final int[] mNextIidByType = new int[MAX_INTERN_TYPES];
+
+    // The single recycled result object returned by intern().
+    private final InternResult mScratchResult = new InternResult();
+
+    // Incremented by every reset().
+    private int mGeneration;
+
+    public InternPool() {
+        restartIidNumbering();
+    }
+
+    /**
+     * Look up {@code value} in the table for {@code type}, assigning it the
+     * next iid if it has not been seen since the last reset().
+     *
+     * @param type Interning type index, 0 to 15 inclusive. Use constants for
+     *             event_names, debug_annotation_names, etc.
+     * @param value The string to intern.
+     * @return The pool's recycled InternResult. When {@code isNew} is true the
+     *         caller must emit an InternedData entry for {@code value}. Copy
+     *         the fields if they are needed after the next call.
+     */
+    public InternResult intern(int type, String value) {
+        HashMap<String, Integer> table = tableFor(type);
+        Integer known = table.get(value);
+        if (known == null) {
+            int fresh = mNextIidByType[type]++;
+            table.put(value, fresh);
+            mScratchResult.set(fresh, true);
+        } else {
+            mScratchResult.set(known, false);
+        }
+        return mScratchResult;
+    }
+
+    /**
+     * Forget every interned string and restart iid numbering. Call this when
+     * the incremental state is cleared; strings are re-emitted on next use.
+     */
+    public void reset() {
+        for (HashMap<String, Integer> table : mTablesByType) {
+            if (table != null) {
+                table.clear();
+            }
+        }
+        restartIidNumbering();
+        mGeneration++;
+    }
+
+    /** Number of reset() calls so far; changes on every reset(). */
+    public int generation() {
+        return mGeneration;
+    }
+
+    /** Returns the table for {@code type}, creating it on first use. */
+    private HashMap<String, Integer> tableFor(int type) {
+        HashMap<String, Integer> table = mTablesByType[type];
+        if (table == null) {
+            table = new HashMap<>();
+            mTablesByType[type] = table;
+        }
+        return table;
+    }
+
+    /** Points every interning type's next iid back at FIRST_IID. */
+    private void restartIidNumbering() {
+        for (int t = 0; t < mNextIidByType.length; t++) {
+            mNextIidByType[t] = FIRST_IID;
+        }
+    }
+}
diff --git a/src/java_datasource/java/main/dev/perfetto/sdk/PacketBuilder.java b/src/java_datasource/java/main/dev/perfetto/sdk/PacketBuilder.java
new file mode 100644
index 0000000..2c9b227
--- /dev/null
+++ b/src/java_datasource/java/main/dev/perfetto/sdk/PacketBuilder.java
@@ -0,0 +1,179 @@
+/*
+ * Copyright (C) 2026 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.perfetto.sdk;
+
+/**
+ * Fluent builder for constructing TracePacket proto messages.
+ *
+ * Zero allocations. Cached per-thread in TraceContext and reused across
+ * trace points. All methods write directly to the underlying ProtoWriter.
+ *
+ * This is a thin wrapper over ProtoWriter that provides:
+ * - Fluent chaining (all write methods return {@code this})
+ * - Managed nesting (LIFO endNested with auto-close on commit)
+ * - One-call commit to all active tracing sessions
+ *
+ * Usage:
+ *   TraceContext ctx = dataSource.trace();
+ *   if (ctx == null) return;
+ *   ctx.newPacket()
+ *       .writeVarInt(TIMESTAMP_FIELD, timestamp)
+ *       .beginNested(MY_PAYLOAD_FIELD)
+ *           .writeString(NAME_FIELD, name)
+ *           .writeVarInt(VALUE_FIELD, value)
+ *       .endNested()
+ *       .commit();
+ *
+ * Thread safety: not thread-safe. Each thread has its own instance.
+ */
+public final class PacketBuilder {
+    private static final int MAX_NESTING_DEPTH = 16;
+
+    private final TraceContext mCtx;
+    private ProtoWriter mWriter;
+    private final int[] mTokenStack = new int[MAX_NESTING_DEPTH];
+    private int mTokenDepth;
+
+    PacketBuilder(TraceContext ctx) {
+        mCtx = ctx;
+    }
+
+    /** Start building a new packet. Called by TraceContext.newPacket(). */
+    PacketBuilder start(ProtoWriter writer) {
+        mWriter = writer;
+        mTokenDepth = 0;
+        return this;
+    }
+
+    // ====================================================================
+    // Field writers (mirror ProtoWriter but return this for chaining)
+    // ====================================================================
+
+    /** Write a varint field (uint32/uint64/int32/int64/bool/enum). */
+    public PacketBuilder writeVarInt(int fieldId, long value) {
+        mWriter.writeVarInt(fieldId, value);
+        return this;
+    }
+
+    /** Write a sint32/sint64 field (zigzag encoded). */
+    public PacketBuilder writeSInt(int fieldId, long value) {
+        mWriter.writeSInt(fieldId, value);
+        return this;
+    }
+
+    /** Write a bool field. */
+    public PacketBuilder writeBool(int fieldId, boolean value) {
+        mWriter.writeBool(fieldId, value);
+        return this;
+    }
+
+    /** Write a fixed64/sfixed64 field. */
+    public PacketBuilder writeFixed64(int fieldId, long value) {
+        mWriter.writeFixed64(fieldId, value);
+        return this;
+    }
+
+    /** Write a fixed32/sfixed32 field. */
+    public PacketBuilder writeFixed32(int fieldId, int value) {
+        mWriter.writeFixed32(fieldId, value);
+        return this;
+    }
+
+    /** Write a double field. */
+    public PacketBuilder writeDouble(int fieldId, double value) {
+        mWriter.writeDouble(fieldId, value);
+        return this;
+    }
+
+    /** Write a float field. */
+    public PacketBuilder writeFloat(int fieldId, float value) {
+        mWriter.writeFloat(fieldId, value);
+        return this;
+    }
+
+    /** Write a string field. */
+    public PacketBuilder writeString(int fieldId, String value) {
+        mWriter.writeString(fieldId, value);
+        return this;
+    }
+
+    /** Write a bytes field. */
+    public PacketBuilder writeBytes(int fieldId, byte[] value) {
+        mWriter.writeBytes(fieldId, value);
+        return this;
+    }
+
+    /** Write a bytes field with offset and length. */
+    public PacketBuilder writeBytes(int fieldId, byte[] value, int offset, int length) {
+        mWriter.writeBytes(fieldId, value, offset, length);
+        return this;
+    }
+
+    // ====================================================================
+    // Nesting
+    // ====================================================================
+
+    /**
+     * Begin a nested message. Must be matched by endNested(), or closed
+     * implicitly by commit().
+     *
+     * @throws IllegalStateException if MAX_NESTING_DEPTH nested messages are
+     *         already open. Checked before anything is written, so the
+     *         underlying ProtoWriter is left untouched.
+     */
+    public PacketBuilder beginNested(int fieldId) {
+        if (mTokenDepth >= MAX_NESTING_DEPTH) {
+            throw new IllegalStateException(
+                    "PacketBuilder nesting depth exceeds " + MAX_NESTING_DEPTH);
+        }
+        mTokenStack[mTokenDepth++] = mWriter.beginNested(fieldId);
+        return this;
+    }
+
+    /**
+     * End the most recently opened nested message.
+     *
+     * @throws IllegalStateException if no nested message is open.
+     */
+    public PacketBuilder endNested() {
+        if (mTokenDepth == 0) {
+            throw new IllegalStateException(
+                    "endNested() called without a matching beginNested()");
+        }
+        mWriter.endNested(mTokenStack[--mTokenDepth]);
+        return this;
+    }
+
+    // ====================================================================
+    // Commit
+    // ====================================================================
+
+    /**
+     * Finalize the packet and write it to all active tracing sessions.
+     * Closes any unclosed nested messages and commits via TraceContext.
+     */
+    public void commit() {
+        while (mTokenDepth > 0) {
+            mWriter.endNested(mTokenStack[--mTokenDepth]);
+        }
+        // mCtx is null in standalone/test usage where the caller reads the
+        // encoded bytes from the ProtoWriter directly.
+        if (mCtx != null) {
+            mCtx.commitPacket();
+        }
+    }
+}
diff --git a/src/java_datasource/java/main/dev/perfetto/sdk/PerfettoDataSource.java b/src/java_datasource/java/main/dev/perfetto/sdk/PerfettoDataSource.java
new file mode 100644
index 0000000..24509da
--- /dev/null
+++ b/src/java_datasource/java/main/dev/perfetto/sdk/PerfettoDataSource.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright (C) 2026 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.perfetto.sdk;
+
+import dalvik.annotation.optimization.CriticalNative;
+import dalvik.annotation.optimization.FastNative;
+
+/**
+ * Base class for high-performance custom Perfetto data sources.
+ *
+ * Packets are encoded in Java by ProtoWriter into a buffer owned by
+ * the calling thread's TraceContext, then handed to native code with
+ * one JNI call that writes them to every active tracing session
+ * instance. Once a thread has its TraceContext, the hot path avoids
+ * heap allocation; ProtoWriter still allocates if its buffer must grow
+ * or a long non-ASCII string needs encoding.
+ *
+ * Usage:
+ *   // Define and register (once at startup):
+ *   class MyDataSource extends PerfettoDataSource {
+ *     static final MyDataSource INSTANCE = new MyDataSource();
+ *     static { INSTANCE.register("my.data_source"); }
+ *   }
+ *
+ *   // Builder API (fluent, no per-packet objects):
+ *   TraceContext ctx = MyDataSource.INSTANCE.trace();
+ *   if (ctx == null) return;
+ *   ctx.newPacket()
+ *       .writeVarInt(TIMESTAMP_FIELD, timestamp)
+ *       .beginNested(MY_PAYLOAD_FIELD)
+ *           .writeString(NAME_FIELD, name)
+ *           .writeVarInt(VALUE_FIELD, value)
+ *       .endNested()
+ *       .commit();
+ *
+ *   // Low-level ProtoWriter API (maximum control):
+ *   TraceContext ctx = MyDataSource.INSTANCE.trace();
+ *   if (ctx == null) return;
+ *   ProtoWriter w = ctx.getWriter();
+ *   w.writeVarInt(TIMESTAMP_FIELD, timestamp);
+ *   int payload = w.beginNested(MY_PAYLOAD_FIELD);
+ *   w.writeString(NAME_FIELD, name);
+ *   w.endNested(payload);
+ *   ctx.commitPacket();
+ *
+ * Thread safety: trace() may be called concurrently from any thread;
+ * each thread receives its own TraceContext.
+ */
+public abstract class PerfettoDataSource {
+    // Opaque handle from nativeRegister(); stays 0 until register() runs.
+    long mNativeDsPtr;
+
+    // The fast-path gate: trace() reads it exactly once. Native code sets
+    // it true on OnStart (any instance) and false on OnStop once no
+    // instance remains active (decided via the native enabled_ptr).
+    volatile boolean mEnabled;
+
+    // One TraceContext per thread, created lazily and then reused.
+    final ThreadLocal<TraceContext> mTlsContext = new ThreadLocal<>();
+
+    /**
+     * Registers this data source with the tracing service.
+     *
+     * @param name Data source name; must match the name used in the trace
+     *     config.
+     */
+    public final void register(String name) {
+        mNativeDsPtr = nativeRegister(this, name);
+    }
+
+    /**
+     * Starts a trace operation on the calling thread.
+     *
+     * The disabled path costs a single volatile read; the enabled path adds
+     * a ThreadLocal lookup. Finish the packet with ctx.commitPacket() or
+     * PacketBuilder.commit().
+     *
+     * @return this thread's TraceContext, reset and ready for a new packet,
+     *     or null if tracing is currently disabled for this data source.
+     */
+    public final TraceContext trace() {
+        if (!mEnabled) {
+            return null;
+        }
+        TraceContext context = currentThreadContext();
+        context.begin(mNativeDsPtr);
+        return context;
+    }
+
+    // Returns the calling thread's cached TraceContext, creating it on the
+    // first call from that thread.
+    private TraceContext currentThreadContext() {
+        TraceContext context = mTlsContext.get();
+        if (context == null) {
+            context = new TraceContext();
+            mTlsContext.set(context);
+        }
+        return context;
+    }
+
+    // ====================================================================
+    // Lifecycle callbacks (override in subclass if needed)
+    // ====================================================================
+
+    // Per-instance session callbacks; the defaults do nothing. They are
+    // presumably dispatched from the JNI layer -- TODO confirm.
+    protected void onSetup(int instanceIndex, byte[] config) {}
+    protected void onStart(int instanceIndex) {}
+    protected void onStop(int instanceIndex) {}
+    protected void onFlush(int instanceIndex) {}
+
+    // Called from JNI to publish the new enabled state to trace().
+    @SuppressWarnings("unused")
+    void onEnabledChanged(boolean enabled) {
+        mEnabled = enabled;
+    }
+
+    // ====================================================================
+    // JNI
+    // ====================================================================
+
+    @FastNative
+    static native long nativeRegister(PerfettoDataSource ds, String name);
+
+    @CriticalNative
+    static native boolean nativeCheckAnyIncrementalStateCleared(long dsPtr);
+
+    @FastNative
+    static native void nativeWritePacketToAllInstances(
+            long dsPtr, byte[] buf, int len);
+
+    @CriticalNative
+    static native void nativeFlush(long dsPtr);
+}
diff --git a/src/java_datasource/java/main/dev/perfetto/sdk/ProtoWriter.java b/src/java_datasource/java/main/dev/perfetto/sdk/ProtoWriter.java
new file mode 100644
index 0000000..a9e6810
--- /dev/null
+++ b/src/java_datasource/java/main/dev/perfetto/sdk/ProtoWriter.java
@@ -0,0 +1,363 @@
+/*
+ * Copyright (C) 2026 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.perfetto.sdk;
+
+/**
+ * Zero-allocation protobuf encoder. Pure Java equivalent of C protozero.
+ *
+ * Writes protobuf wire format directly to a pre-allocated byte buffer.
+ * Designed for Perfetto trace packet encoding on the Android frame rendering
+ * hot path.
+ *
+ * Nested messages use 4-byte redundant varint encoding for the length field,
+ * matching protozero's approach for single-pass encoding without size
+ * pre-computation. The redundant varint encodes small values in 4 bytes
+ * (e.g., 5 is encoded as 0x85 0x80 0x80 0x00) which is valid protobuf
+ * that all decoders handle correctly.
+ *
+ * Thread safety: not thread-safe. Each thread must use its own instance
+ * (typically via ThreadLocal in TraceContext).
+ */
+public final class ProtoWriter {
+    private static final int WIRE_TYPE_VARINT = 0;
+    private static final int WIRE_TYPE_FIXED64 = 1;
+    private static final int WIRE_TYPE_DELIMITED = 2;
+    private static final int WIRE_TYPE_FIXED32 = 5;
+
+    // Matches PROTOZERO_MESSAGE_LENGTH_FIELD_SIZE in pb_msg.h.
+    private static final int NESTED_LENGTH_FIELD_SIZE = 4;
+    // Largest payload a 4-byte redundant varint can describe (28 bits).
+    private static final int MAX_NESTED_MESSAGE_SIZE = 0x0FFFFFFF;
+    private static final int MAX_NESTING_DEPTH = 16;
+    private static final int UTF8_SCRATCH_SIZE = 512;
+
+    private byte[] mBuf;
+    private int mPos;
+    private final int[] mNestingStack = new int[MAX_NESTING_DEPTH];
+    private int mNestingDepth;
+    private final byte[] mUtf8Scratch = new byte[UTF8_SCRATCH_SIZE];
+
+    public ProtoWriter() {
+        this(32 * 1024);
+    }
+
+    public ProtoWriter(int bufferSize) {
+        mBuf = new byte[bufferSize];
+    }
+
+    /** Reset write position. No allocation. */
+    public void reset() {
+        mPos = 0;
+        mNestingDepth = 0;
+    }
+
+    /** Current write position (number of bytes written). */
+    public int position() {
+        return mPos;
+    }
+
+    /** The underlying buffer. Valid data from index 0 to position(). */
+    public byte[] buffer() {
+        return mBuf;
+    }
+
+    // ========================================================================
+    // Varint fields (wire type 0)
+    // ========================================================================
+
+    /** Write uint32/uint64/int32/int64/enum field. */
+    public void writeVarInt(int fieldId, long value) {
+        writeRawVarInt(makeTag(fieldId, WIRE_TYPE_VARINT));
+        writeRawVarInt(value);
+    }
+
+    /** Write sint32/sint64 field (zigzag encoded). */
+    public void writeSInt(int fieldId, long value) {
+        writeVarInt(fieldId, (value << 1) ^ (value >> 63));
+    }
+
+    /** Write bool field. */
+    public void writeBool(int fieldId, boolean value) {
+        writeRawVarInt(makeTag(fieldId, WIRE_TYPE_VARINT));
+        ensureCapacity(1);
+        mBuf[mPos++] = (byte) (value ? 1 : 0);
+    }
+
+    // ========================================================================
+    // Fixed-size fields
+    // ========================================================================
+
+    /** Write fixed64/sfixed64 field. */
+    public void writeFixed64(int fieldId, long value) {
+        writeRawVarInt(makeTag(fieldId, WIRE_TYPE_FIXED64));
+        putLongLE(value);
+    }
+
+    /** Write fixed32/sfixed32 field. */
+    public void writeFixed32(int fieldId, int value) {
+        writeRawVarInt(makeTag(fieldId, WIRE_TYPE_FIXED32));
+        putIntLE(value);
+    }
+
+    /** Write double field. */
+    public void writeDouble(int fieldId, double value) {
+        writeFixed64(fieldId, Double.doubleToRawLongBits(value));
+    }
+
+    /** Write float field. */
+    public void writeFloat(int fieldId, float value) {
+        writeFixed32(fieldId, Float.floatToRawIntBits(value));
+    }
+
+    // ========================================================================
+    // Length-delimited fields (wire type 2)
+    // ========================================================================
+
+    /**
+     * Write string field, UTF-8 encoded. Uses an ASCII fast path when all
+     * chars are <= 0x7F (common for trace event names, categories, arg keys).
+     * Unpaired UTF-16 surrogates are written as '?' (the same replacement
+     * String.getBytes(StandardCharsets.UTF_8) applies).
+     */
+    public void writeString(int fieldId, String value) {
+        writeRawVarInt(makeTag(fieldId, WIRE_TYPE_DELIMITED));
+        int len = value.length();
+
+        boolean ascii = true;
+        for (int i = 0; i < len; i++) {
+            if (value.charAt(i) > 0x7F) {
+                ascii = false;
+                break;
+            }
+        }
+
+        if (ascii) {
+            writeRawVarInt(len);
+            ensureCapacity(len);
+            for (int i = 0; i < len; i++) {
+                mBuf[mPos++] = (byte) value.charAt(i);
+            }
+        } else {
+            writeStringUtf8(value);
+        }
+    }
+
+    /** Write bytes field. */
+    public void writeBytes(int fieldId, byte[] value, int offset, int length) {
+        writeRawVarInt(makeTag(fieldId, WIRE_TYPE_DELIMITED));
+        writeRawVarInt(length);
+        ensureCapacity(length);
+        System.arraycopy(value, offset, mBuf, mPos, length);
+        mPos += length;
+    }
+
+    /** Write bytes field (full array). */
+    public void writeBytes(int fieldId, byte[] value) {
+        writeBytes(fieldId, value, 0, value.length);
+    }
+
+    // ========================================================================
+    // Nested messages
+    // ========================================================================
+
+    /**
+     * Begin a nested length-delimited message.
+     *
+     * Writes the field tag and reserves 4 bytes for the length (redundant
+     * varint). Returns a nesting token that must be passed to endNested().
+     *
+     * The 4-byte redundant varint supports nested messages up to 256MB
+     * (0x0FFFFFFF bytes). This matches protozero's limit.
+     *
+     * @throws IllegalStateException if MAX_NESTING_DEPTH messages are
+     *     already open. Nothing is written in that case.
+     */
+    public int beginNested(int fieldId) {
+        if (mNestingDepth >= MAX_NESTING_DEPTH) {
+            throw new IllegalStateException(
+                    "Proto nesting depth exceeds " + MAX_NESTING_DEPTH);
+        }
+        writeRawVarInt(makeTag(fieldId, WIRE_TYPE_DELIMITED));
+        ensureCapacity(NESTED_LENGTH_FIELD_SIZE);
+        int bookmark = mPos;
+        mPos += NESTED_LENGTH_FIELD_SIZE;
+        int token = mNestingDepth++;
+        mNestingStack[token] = bookmark;
+        return token;
+    }
+
+    /**
+     * End a nested message started with beginNested().
+     *
+     * Backfills the 4-byte redundant varint length at the bookmark position.
+     * The redundant encoding writes the size using exactly 4 bytes with
+     * leading-zero continuation bits:
+     *
+     *   byte 0: (size & 0x7F) | 0x80
+     *   byte 1: ((size >> 7) & 0x7F) | 0x80
+     *   byte 2: ((size >> 14) & 0x7F) | 0x80
+     *   byte 3: (size >> 21) & 0x7F
+     *
+     * This is valid protobuf (decoders must handle leading-zero varints)
+     * and avoids a double pass to compute sizes upfront.
+     *
+     * @throws IllegalStateException if the payload is larger than
+     *     MAX_NESTED_MESSAGE_SIZE bytes, which 4 varint bytes cannot
+     *     represent (the length would otherwise be silently truncated).
+     */
+    public void endNested(int token) {
+        int bookmark = mNestingStack[token];
+        int size = mPos - bookmark - NESTED_LENGTH_FIELD_SIZE;
+        if (size > MAX_NESTED_MESSAGE_SIZE) {
+            throw new IllegalStateException(
+                    "Nested message too large: " + size + " bytes");
+        }
+        mNestingDepth--;
+        mBuf[bookmark] = (byte) ((size & 0x7F) | 0x80);
+        mBuf[bookmark + 1] = (byte) (((size >> 7) & 0x7F) | 0x80);
+        mBuf[bookmark + 2] = (byte) (((size >> 14) & 0x7F) | 0x80);
+        mBuf[bookmark + 3] = (byte) ((size >> 21) & 0x7F);
+    }
+
+    // ========================================================================
+    // Append raw bytes (for pre-encoded data)
+    // ========================================================================
+
+    /** Append raw bytes to the output. */
+    public void appendRawBytes(byte[] data, int offset, int length) {
+        ensureCapacity(length);
+        System.arraycopy(data, offset, mBuf, mPos, length);
+        mPos += length;
+    }
+
+    // ========================================================================
+    // Internal methods
+    // ========================================================================
+
+    private static long makeTag(int fieldId, int wireType) {
+        return ((long) fieldId << 3) | wireType;
+    }
+
+    private void writeRawVarInt(long value) {
+        ensureCapacity(10);
+        while ((value & ~0x7FL) != 0) {
+            mBuf[mPos++] = (byte) ((value & 0x7F) | 0x80);
+            value >>>= 7;
+        }
+        mBuf[mPos++] = (byte) value;
+    }
+
+    private void putLongLE(long value) {
+        ensureCapacity(8);
+        mBuf[mPos++] = (byte) value;
+        mBuf[mPos++] = (byte) (value >> 8);
+        mBuf[mPos++] = (byte) (value >> 16);
+        mBuf[mPos++] = (byte) (value >> 24);
+        mBuf[mPos++] = (byte) (value >> 32);
+        mBuf[mPos++] = (byte) (value >> 40);
+        mBuf[mPos++] = (byte) (value >> 48);
+        mBuf[mPos++] = (byte) (value >> 56);
+    }
+
+    private void putIntLE(int value) {
+        ensureCapacity(4);
+        mBuf[mPos++] = (byte) value;
+        mBuf[mPos++] = (byte) (value >> 8);
+        mBuf[mPos++] = (byte) (value >> 16);
+        mBuf[mPos++] = (byte) (value >> 24);
+    }
+
+    private void ensureCapacity(int needed) {
+        if (mPos + needed <= mBuf.length) {
+            return;
+        }
+        int newSize = Math.max(mBuf.length * 2, mPos + needed);
+        byte[] newBuf = new byte[newSize];
+        System.arraycopy(mBuf, 0, newBuf, 0, mPos);
+        mBuf = newBuf;
+    }
+
+    private void writeStringUtf8(String s) {
+        int utf8Len = encodeUtf8(s, mUtf8Scratch);
+        if (utf8Len >= 0) {
+            writeRawVarInt(utf8Len);
+            ensureCapacity(utf8Len);
+            System.arraycopy(mUtf8Scratch, 0, mBuf, mPos, utf8Len);
+            mPos += utf8Len;
+        } else {
+            // Encoded form did not fit the scratch buffer. Rare path; allocates.
+            byte[] big = new byte[-utf8Len];
+            int actualLen = encodeUtf8(s, big);
+            writeRawVarInt(actualLen);
+            ensureCapacity(actualLen);
+            System.arraycopy(big, 0, mBuf, mPos, actualLen);
+            mPos += actualLen;
+        }
+    }
+
+    /**
+     * Encode string as UTF-8 into dst.
+     *
+     * Well-formed surrogate pairs become 4-byte sequences. Unpaired
+     * surrogates (which have no UTF-8 encoding) are replaced by '?'.
+     *
+     * Returns the byte count on success, or a negative value whose
+     * magnitude is an upper bound on the encoded size if dst is too small.
+     */
+    private static int encodeUtf8(String s, byte[] dst) {
+        int len = s.length();
+        // Every UTF-16 unit expands to at most 3 bytes (a surrogate pair is
+        // 2 units -> 4 bytes), so len * 3 always fits the encoded output.
+        int tooSmall = -(len * 3);
+        int dp = 0;
+        for (int i = 0; i < len; i++) {
+            char c = s.charAt(i);
+            if (c <= 0x7F) {
+                if (dp >= dst.length)
+                    return tooSmall;
+                dst[dp++] = (byte) c;
+            } else if (c <= 0x7FF) {
+                if (dp + 2 > dst.length)
+                    return tooSmall;
+                dst[dp++] = (byte) (0xC0 | (c >> 6));
+                dst[dp++] = (byte) (0x80 | (c & 0x3F));
+            } else if (!Character.isSurrogate(c)) {
+                if (dp + 3 > dst.length)
+                    return tooSmall;
+                dst[dp++] = (byte) (0xE0 | (c >> 12));
+                dst[dp++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+                dst[dp++] = (byte) (0x80 | (c & 0x3F));
+            } else if (Character.isHighSurrogate(c) && i + 1 < len
+                    && Character.isLowSurrogate(s.charAt(i + 1))) {
+                if (dp + 4 > dst.length)
+                    return tooSmall;
+                int cp = Character.toCodePoint(c, s.charAt(++i));
+                dst[dp++] = (byte) (0xF0 | (cp >> 18));
+                dst[dp++] = (byte) (0x80 | ((cp >> 12) & 0x3F));
+                dst[dp++] = (byte) (0x80 | ((cp >> 6) & 0x3F));
+                dst[dp++] = (byte) (0x80 | (cp & 0x3F));
+            } else {
+                // Lone high surrogate, or a low surrogate without a preceding
+                // high one: not representable in UTF-8.
+                if (dp >= dst.length)
+                    return tooSmall;
+                dst[dp++] = (byte) '?';
+            }
+        }
+        return dp;
+    }
+}
diff --git a/src/java_datasource/java/main/dev/perfetto/sdk/TraceContext.java b/src/java_datasource/java/main/dev/perfetto/sdk/TraceContext.java
new file mode 100644
index 0000000..6e08d91
--- /dev/null
+++ b/src/java_datasource/java/main/dev/perfetto/sdk/TraceContext.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright (C) 2026 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.perfetto.sdk;
+
+/**
+ * Per-thread tracing context for one PerfettoDataSource.
+ *
+ * Owns the thread's ProtoWriter, InternPool and PacketBuilder, and
+ * forwards each finished packet to every active tracing session instance
+ * with a single JNI call.
+ *
+ * Instances are created lazily by PerfettoDataSource (one per thread, kept
+ * in a ThreadLocal) and reused afterwards, so steady-state tracing creates
+ * no new context objects.
+ *
+ * Builder API:
+ *   ctx.newPacket()
+ *       .writeVarInt(TIMESTAMP_FIELD, timestamp)
+ *       .beginNested(MY_PAYLOAD_FIELD)
+ *           .writeString(NAME_FIELD, name)
+ *       .endNested()
+ *       .commit();
+ *
+ * Low-level ProtoWriter API:
+ *   ProtoWriter w = ctx.getWriter();
+ *   w.writeVarInt(TIMESTAMP_FIELD, timestamp);
+ *   ctx.commitPacket();
+ *
+ * Thread safety: not thread-safe; each instance is confined to the
+ * thread that created it.
+ */
+public final class TraceContext {
+    // Created together with the context and never replaced. Field
+    // initializers run in declaration order: writer, pool, then builder.
+    private final ProtoWriter mWriter = new ProtoWriter();
+    private final InternPool mInternPool = new InternPool();
+    private final PacketBuilder mPacketBuilder = new PacketBuilder(this);
+
+    // Native data source handle, refreshed by every begin().
+    private long mDsPtr;
+
+    // Armed by begin() and disarmed by the first
+    // resetIncrementalStateIfNeeded() afterwards, so the JNI query runs at
+    // most once per trace() and only if the caller asks for it.
+    private boolean mNeedsIncrStateCheck;
+
+    TraceContext() {
+        // All state is set up by the field initializers above.
+    }
+
+    /**
+     * Prepares this context for a new trace() call: records the data
+     * source handle, clears the writer, and re-arms the one-shot
+     * incremental-state check. Called by PerfettoDataSource.trace().
+     */
+    void begin(long dsPtr) {
+        mWriter.reset();
+        mNeedsIncrStateCheck = true;
+        mDsPtr = dsPtr;
+    }
+
+    /** Returns the ProtoWriter used to encode TracePacket fields. */
+    public ProtoWriter getWriter() {
+        return mWriter;
+    }
+
+    /**
+     * Returns this thread's interning pool.
+     *
+     * The pool is only valid once
+     * {@link #resetIncrementalStateIfNeeded()} has run for the current
+     * trace() call; interning without that check may produce broken
+     * traces when multiple tracing sessions are active.
+     */
+    public InternPool getInternPool() {
+        return mInternPool;
+    }
+
+    /**
+     * Checks whether any session cleared its incremental state and, if
+     * so, resets the InternPool.
+     *
+     * Call once per trace() before using the InternPool; it costs one
+     * JNI call covering all active session instances.
+     *
+     * A true result means:
+     * 1. The InternPool has been reset (all entries cleared).
+     * 2. The caller must write sequence_flags with
+     *    SEQ_INCREMENTAL_STATE_CLEARED in the packet.
+     * 3. All subsequent intern() calls return isNew=true until the pool
+     *    is warm again.
+     *
+     * A false result means the InternPool still holds valid entries from
+     * earlier trace points; nothing special is required.
+     *
+     * Only the first call after each trace() reaches native code; later
+     * calls in the same trace() return false. Callers that do not intern
+     * should not call this at all and pay no JNI cost.
+     */
+    public boolean resetIncrementalStateIfNeeded() {
+        if (mNeedsIncrStateCheck) {
+            mNeedsIncrStateCheck = false;
+            if (PerfettoDataSource.nativeCheckAnyIncrementalStateCleared(mDsPtr)) {
+                mInternPool.reset();
+                return true;
+            }
+        }
+        return false;
+    }
+
+    // ====================================================================
+    // Packet builder: fluent API for constructing proto messages.
+    // The PacketBuilder is created once per context and reused.
+    // ====================================================================
+
+    /**
+     * Starts a new TracePacket using the fluent builder.
+     *
+     * Example:
+     *   ctx.newPacket()
+     *       .writeVarInt(TIMESTAMP_FIELD, timestamp)
+     *       .beginNested(MY_PAYLOAD_FIELD)
+     *           .writeString(NAME_FIELD, name)
+     *           .writeVarInt(VALUE_FIELD, value)
+     *       .endNested()
+     *       .commit();
+     */
+    public PacketBuilder newPacket() {
+        return mPacketBuilder.start(mWriter);
+    }
+
+    // ====================================================================
+    // Low-level API: direct ProtoWriter access for maximum control.
+    // ====================================================================
+
+    /**
+     * Sends the bytes encoded so far to all active tracing session
+     * instances (skipped when nothing was written), then resets the
+     * writer for the next packet.
+     *
+     * One JNI call; the native stream writer handles chunk boundaries
+     * and size-field patching.
+     */
+    public void commitPacket() {
+        final ProtoWriter writer = mWriter;
+        final int encodedLength = writer.position();
+        if (encodedLength > 0) {
+            PerfettoDataSource.nativeWritePacketToAllInstances(
+                    mDsPtr, writer.buffer(), encodedLength);
+        }
+        writer.reset();
+    }
+}
diff --git a/src/java_datasource/java/main/perfetto-datasource-framework-jarjar-rules.txt b/src/java_datasource/java/main/perfetto-datasource-framework-jarjar-rules.txt
new file mode 100644
index 0000000..5a0086b
--- /dev/null
+++ b/src/java_datasource/java/main/perfetto-datasource-framework-jarjar-rules.txt
@@ -0,0 +1 @@
+rule dev.perfetto.sdk.** com.android.internal.dev.perfetto.sdk.@1
diff --git a/src/java_datasource/java/test/BUILD.gn b/src/java_datasource/java/test/BUILD.gn
new file mode 100644
index 0000000..5fa5dcb
--- /dev/null
+++ b/src/java_datasource/java/test/BUILD.gn
@@ -0,0 +1,20 @@
+import("../../../../gn/perfetto.gni")
+import("../../../../gn/perfetto_android_sdk.gni")
+
+assert(enable_perfetto_android_java_sdk)  # Only buildable with the Android Java SDK enabled.
+
+perfetto_android_library("perfetto_datasource_test") {
+  testonly = true
+  sources = [
+    "dev/perfetto/sdk/InternPoolTest.java",
+    "dev/perfetto/sdk/PacketBuilderTest.java",
+    "dev/perfetto/sdk/ProtoWriterTest.java",
+    "dev/perfetto/sdk/TraceEncodingTest.java",
+  ]
+  deps = [ "../main:perfetto_datasource_lib" ]  # Library under test.
+  manifest = "../main/AndroidManifest.xml"
+  android_bp_java_target_name_suffix = "_tests_java"
+  android_bp_copy_java_target_name_suffix = "_tests_framework_java"
+  android_bp_copy_java_target_jarjar =
+      "../main/perfetto-datasource-framework-jarjar-rules.txt"  # Relocates dev.perfetto.sdk.** to com.android.internal.dev.perfetto.sdk.**.
+}
diff --git a/src/java_datasource/java/test/dev/perfetto/sdk/InternPoolTest.java b/src/java_datasource/java/test/dev/perfetto/sdk/InternPoolTest.java
new file mode 100644
index 0000000..5c02399
--- /dev/null
+++ b/src/java_datasource/java/test/dev/perfetto/sdk/InternPoolTest.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright (C) 2026 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.perfetto.sdk;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+public class InternPoolTest {
+    // Checks both fields of the (shared, reused) result object.
+    private static void assertResult(
+            InternPool.InternResult result, boolean expectNew, long expectIid) {
+        assertEquals(expectNew, result.isNew);
+        assertEquals(expectIid, result.iid);
+    }
+
+    @Test
+    public void internAndCache() {
+        InternPool pool = new InternPool();
+        assertResult(pool.intern(0, "hello"), true, 1);   // first sighting
+        assertResult(pool.intern(0, "hello"), false, 1);  // cached
+        assertResult(pool.intern(0, "world"), true, 2);   // next iid
+    }
+
+    @Test
+    public void independentTypes() {
+        InternPool pool = new InternPool();
+        pool.intern(0, "hello");
+
+        // Each intern type has its own iid space, so the same string under
+        // type 1 is new and also starts at iid 1.
+        assertResult(pool.intern(1, "hello"), true, 1);
+    }
+
+    @Test
+    public void reset() {
+        InternPool pool = new InternPool();
+        pool.intern(0, "hello");
+        assertEquals(0, pool.generation());
+
+        pool.reset();
+        assertEquals(1, pool.generation());
+
+        // After a reset, previously seen strings are new again and iids
+        // restart from 1.
+        assertResult(pool.intern(0, "hello"), true, 1);
+    }
+
+    @Test
+    public void resultObjectReused() {
+        InternPool pool = new InternPool();
+        InternPool.InternResult first = pool.intern(0, "a");
+        InternPool.InternResult second = pool.intern(0, "b");
+        assertSame(first, second);
+    }
+}
diff --git a/src/java_datasource/java/test/dev/perfetto/sdk/PacketBuilderTest.java b/src/java_datasource/java/test/dev/perfetto/sdk/PacketBuilderTest.java
new file mode 100644
index 0000000..f7e6184
--- /dev/null
+++ b/src/java_datasource/java/test/dev/perfetto/sdk/PacketBuilderTest.java
@@ -0,0 +1,150 @@
+/*
+ * Copyright (C) 2026 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.perfetto.sdk;
+
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+import org.junit.Test;
+
+public class PacketBuilderTest {
+
+    static long readVarInt(byte[] data, int[] off) {
+        long r = 0;
+        int shift = 0;
+        while (off[0] < data.length) {
+            byte b = data[off[0]++];
+            r |= (long) (b & 0x7F) << shift;
+            if ((b & 0x80) == 0) return r;
+            shift += 7;
+        }
+        throw new RuntimeException("truncated varint");
+    }
+
+    static void skipField(byte[] data, int[] off, int wt) {
+        switch (wt) {
+            case 0: readVarInt(data, off); break;
+            case 1: off[0] += 8; break;
+            case 2: off[0] += (int) readVarInt(data, off); break;
+            case 5: off[0] += 4; break;
+        }
+    }
+
+    @Test
+    public void matchesRawProtoWriter() {
+        ProtoWriter w = new ProtoWriter(1024);
+        PacketBuilder pb = new PacketBuilder(null);
+
+        pb.start(w)
+                .writeVarInt(1, 42)
+                .beginNested(2)
+                    .writeString(3, "hello")
+                .endNested()
+                .commit();
+        byte[] builderOut = Arrays.copyOf(w.buffer(), w.position());
+
+        ProtoWriter raw = new ProtoWriter(1024);
+        raw.writeVarInt(1, 42);
+        int tok = raw.beginNested(2);
+        raw.writeString(3, "hello");
+        raw.endNested(tok);
+        byte[] rawOut = Arrays.copyOf(raw.buffer(), raw.position());
+
+        assertArrayEquals(rawOut, builderOut);
+    }
+
+    @Test
+    public void autoCloseOnCommit() {
+        ProtoWriter w = new ProtoWriter(1024);
+        PacketBuilder pb = new PacketBuilder(null);
+
+        pb.start(w)
+                .beginNested(1)
+                    .writeVarInt(2, 42)
+                // no endNested -- commit auto-closes
+                .commit();
+
+        byte[] b = Arrays.copyOf(w.buffer(), w.position());
+        int[] off = {0};
+        int tag = (int) readVarInt(b, off);
+        assertEquals(1, tag >>> 3);
+        assertEquals(2, tag & 0x7); // length-delimited
+        int len = (int) readVarInt(b, off);
+        int end = off[0] + len;
+        int innerTag = (int) readVarInt(b, off);
+        assertEquals(2, innerTag >>> 3);
+        assertEquals(0, innerTag & 0x7); // varint
+        assertEquals(42, readVarInt(b, off));
+        // The auto-closed length must cover exactly the inner field.
+        assertEquals(end, off[0]);
+        assertEquals(b.length, off[0]);
+    }
+
+    @Test
+    public void reuse() {
+        ProtoWriter w = new ProtoWriter(1024);
+        PacketBuilder pb = new PacketBuilder(null);
+
+        pb.start(w).writeVarInt(1, 10).commit();
+        byte[] first = Arrays.copyOf(w.buffer(), w.position());
+
+        w.reset();
+        pb.start(w).writeString(1, "second").commit();
+        byte[] second = Arrays.copyOf(w.buffer(), w.position());
+
+        // The second packet must not carry any bytes from the first one.
+        assertArrayEquals(new byte[] {0x08, 10}, first);
+        assertArrayEquals(
+                new byte[] {0x0A, 6, 's', 'e', 'c', 'o', 'n', 'd'}, second);
+    }
+
+    @Test
+    public void allFieldTypes() {
+        ProtoWriter w = new ProtoWriter(1024);
+        PacketBuilder pb = new PacketBuilder(null);
+
+        pb.start(w)
+                .writeVarInt(1, 42)
+                .writeSInt(2, -1)
+                .writeBool(3, true)
+                .writeFixed32(4, 0x12345678)
+                .writeFixed64(5, 0xDEADL)
+                .writeFloat(6, 1.5f)
+                .writeDouble(7, 3.14)
+                .writeString(8, "test")
+                .writeBytes(9, new byte[]{1, 2})
+                .commit();
+        byte[] builderOut = Arrays.copyOf(w.buffer(), w.position());
+
+        ProtoWriter raw = new ProtoWriter(1024);
+        raw.writeVarInt(1, 42);
+        raw.writeSInt(2, -1);
+        raw.writeBool(3, true);
+        raw.writeFixed32(4, 0x12345678);
+        raw.writeFixed64(5, 0xDEADL);
+        raw.writeFloat(6, 1.5f);
+        raw.writeDouble(7, 3.14);
+        raw.writeString(8, "test");
+        raw.writeBytes(9, new byte[]{1, 2});
+        byte[] rawOut = Arrays.copyOf(raw.buffer(), raw.position());
+
+        assertArrayEquals(rawOut, builderOut);
+        // 2 (varint) + 2 (sint) + 2 (bool) + 5 (fixed32) + 9 (fixed64)
+        // + 5 (float) + 9 (double) + 6 (string) + 4 (bytes) = 44 bytes.
+        assertEquals(44, builderOut.length);
+    }
+}
diff --git a/src/java_datasource/java/test/dev/perfetto/sdk/ProtoWriterTest.java b/src/java_datasource/java/test/dev/perfetto/sdk/ProtoWriterTest.java
new file mode 100644
index 0000000..6ff8d59
--- /dev/null
+++ b/src/java_datasource/java/test/dev/perfetto/sdk/ProtoWriterTest.java
@@ -0,0 +1,307 @@
+/*
+ * Copyright (C) 2026 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.perfetto.sdk;
+
+import static org.junit.Assert.*;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Checks ProtoWriter's output byte-by-byte against the protobuf wire format. */
+public class ProtoWriterTest {
+    private ProtoWriter w;
+
+    @Before
+    public void setUp() {
+        w = new ProtoWriter(1024);
+    }
+
+    /** Returns a copy of the bytes written to {@code w} so far. */
+    private byte[] out() { return Arrays.copyOf(w.buffer(), w.position()); }
+
+    /** Decodes the varint at off[0] and advances off[0]; throws if truncated or over 64 bits. */
+    static long readVarInt(byte[] data, int[] off) {
+        long r = 0;
+        int shift = 0;
+        while (off[0] < data.length && shift < 64) {  // Java masks shift counts >= 64
+            byte b = data[off[0]++];
+            r |= (long) (b & 0x7F) << shift;
+            if ((b & 0x80) == 0) return r;
+            shift += 7;
+        }
+        throw new RuntimeException("truncated or overlong varint");
+    }
+
+    static int fieldId(int tag) { return tag >>> 3; }
+    static int wireType(int tag) { return tag & 0x7; }
+
+    @Test
+    public void varInt() {
+        w.writeVarInt(1, 0);
+        w.writeVarInt(2, 42);
+        w.writeVarInt(3, 0xFFFFFFFFL);
+        w.writeVarInt(4, -1L);
+
+        byte[] b = out();
+        int[] off = {0};
+        assertEquals(1, fieldId((int) readVarInt(b, off))); assertEquals(0, readVarInt(b, off));
+        assertEquals(2, fieldId((int) readVarInt(b, off))); assertEquals(42, readVarInt(b, off));
+        assertEquals(3, fieldId((int) readVarInt(b, off))); assertEquals(0xFFFFFFFFL, readVarInt(b, off));
+        assertEquals(4, fieldId((int) readVarInt(b, off))); assertEquals(-1L, readVarInt(b, off));
+    }
+
+    @Test
+    public void sInt() {
+        w.writeSInt(1, 0);
+        w.writeSInt(2, -1);
+        w.writeSInt(3, 1);
+
+        byte[] b = out();
+        int[] off = {0};
+        readVarInt(b, off); assertEquals(0, readVarInt(b, off));   // zigzag(0) = 0
+        readVarInt(b, off); assertEquals(1, readVarInt(b, off));   // zigzag(-1) = 1
+        readVarInt(b, off); assertEquals(2, readVarInt(b, off));   // zigzag(1) = 2
+    }
+
+    @Test
+    public void boolField() {
+        w.writeBool(1, true);
+        w.writeBool(2, false);
+
+        byte[] b = out();
+        int[] off = {0};
+        readVarInt(b, off); assertEquals(1, readVarInt(b, off));
+        readVarInt(b, off); assertEquals(0, readVarInt(b, off));
+    }
+
+    @Test
+    public void fixed() {
+        w.writeFixed32(1, 0xDEADBEEF);
+        w.writeFixed64(2, 0xCAFEBABEDEADFEEDL);
+        w.writeFloat(3, 3.14f);
+        w.writeDouble(4, 2.71828);
+
+        byte[] b = out();
+        int[] off = {0};
+        int tag = (int) readVarInt(b, off);
+        assertEquals(5, wireType(tag));
+        assertEquals(0xDEADBEEFL, ByteBuffer.wrap(b, off[0], 4).order(ByteOrder.LITTLE_ENDIAN).getInt() & 0xFFFFFFFFL);
+        off[0] += 4;
+
+        tag = (int) readVarInt(b, off);
+        assertEquals(1, wireType(tag));
+        assertEquals(0xCAFEBABEDEADFEEDL, ByteBuffer.wrap(b, off[0], 8).order(ByteOrder.LITTLE_ENDIAN).getLong());
+        off[0] += 8;
+
+        assertEquals(5, wireType((int) readVarInt(b, off)));  // float is fixed32
+        assertEquals(3.14f, ByteBuffer.wrap(b, off[0], 4).order(ByteOrder.LITTLE_ENDIAN).getFloat(), 0);
+        off[0] += 4;
+
+        assertEquals(1, wireType((int) readVarInt(b, off)));  // double is fixed64
+        assertEquals(2.71828, ByteBuffer.wrap(b, off[0], 8).order(ByteOrder.LITTLE_ENDIAN).getDouble(), 0);
+    }
+
+    @Test
+    public void stringAscii() {
+        w.writeString(1, "hello");
+        byte[] b = out();
+        int[] off = {0};
+        assertEquals(1, fieldId((int) readVarInt(b, off)));
+        int len = (int) readVarInt(b, off);
+        assertEquals("hello", new String(b, off[0], len, java.nio.charset.StandardCharsets.UTF_8));
+        assertEquals(b.length, off[0] + len);
+    }
+
+    @Test
+    public void stringUtf8() {
+        // 2-byte, 3-byte, and 4-byte UTF-8 code points.
+        String s = "caf\u00e9\u4e16\uD83D\uDE00";
+        w.writeString(1, s);
+        byte[] b = out();
+        int[] off = {0};
+        readVarInt(b, off);
+        int len = (int) readVarInt(b, off);
+        assertEquals(s, new String(b, off[0], len, java.nio.charset.StandardCharsets.UTF_8));
+    }
+
+    @Test
+    public void bytesField() {
+        byte[] data = {0x01, 0x02, (byte) 0xFF};
+        w.writeBytes(1, data);
+        byte[] b = out();
+        int[] off = {0};
+        readVarInt(b, off);
+        int len = (int) readVarInt(b, off);
+        assertArrayEquals(data, Arrays.copyOfRange(b, off[0], off[0] + len));
+    }
+
+    @Test
+    public void nestedEmpty() {
+        int tok = w.beginNested(1);
+        w.endNested(tok);
+
+        byte[] b = out();
+        int[] off = {0};
+        readVarInt(b, off);
+        // Redundant varint: size 0 encoded as 4 bytes.
+        assertArrayEquals(new byte[] {(byte) 0x80, (byte) 0x80, (byte) 0x80, 0x00},
+                Arrays.copyOfRange(b, off[0], off[0] + 4));
+        assertEquals(0, (int) readVarInt(b, off));
+    }
+
+    @Test
+    public void nestedWithFields() {
+        int tok = w.beginNested(1);
+        w.writeVarInt(2, 100);
+        w.writeString(3, "test");
+        w.endNested(tok);
+
+        byte[] b = out();
+        int[] off = {0};
+        readVarInt(b, off);
+        int lenStart = off[0];
+        int len = (int) readVarInt(b, off);
+        assertEquals(lenStart + 4, off[0]); // redundant varint = 4 bytes
+        int end = off[0] + len;
+
+        assertEquals(2, fieldId((int) readVarInt(b, off)));
+        assertEquals(100, readVarInt(b, off));
+        assertEquals(3, fieldId((int) readVarInt(b, off)));
+        int slen = (int) readVarInt(b, off);
+        assertEquals("test", new String(b, off[0], slen, java.nio.charset.StandardCharsets.UTF_8));
+        off[0] += slen;
+        assertEquals(end, off[0]);
+    }
+
+    @Test
+    public void doublyNested() {
+        int o = w.beginNested(1);
+          int i = w.beginNested(2);
+            w.writeVarInt(3, 99);
+          w.endNested(i);
+          w.writeVarInt(4, 100);
+        w.endNested(o);
+
+        byte[] b = out();
+        int[] off = {0};
+
+        assertEquals(1, fieldId((int) readVarInt(b, off)));
+        int oLen = (int) readVarInt(b, off);
+        int oEnd = off[0] + oLen;
+
+        assertEquals(2, fieldId((int) readVarInt(b, off)));
+        int iLen = (int) readVarInt(b, off);
+        int iEnd = off[0] + iLen;
+
+        assertEquals(3, fieldId((int) readVarInt(b, off)));
+        assertEquals(99, readVarInt(b, off));
+        assertEquals(iEnd, off[0]);
+
+        assertEquals(4, fieldId((int) readVarInt(b, off)));
+        assertEquals(100, readVarInt(b, off));
+        assertEquals(oEnd, off[0]);
+    }
+
+    @Test
+    public void triplyNested() {
+        int l1 = w.beginNested(1);
+          int l2 = w.beginNested(2);
+            int l3 = w.beginNested(3);
+              w.writeVarInt(4, 42);
+            w.endNested(l3);
+          w.endNested(l2);
+        w.endNested(l1);
+
+        byte[] b = out();
+        int[] off = {0};
+        readVarInt(b, off); int s1 = (int) readVarInt(b, off); int e1 = off[0] + s1;
+        readVarInt(b, off); int s2 = (int) readVarInt(b, off); int e2 = off[0] + s2;
+        readVarInt(b, off); int s3 = (int) readVarInt(b, off); int e3 = off[0] + s3;
+        readVarInt(b, off); assertEquals(42, readVarInt(b, off));
+        assertEquals(e3, off[0]);
+        assertEquals(e2, off[0]);
+        assertEquals(e1, off[0]);
+    }
+
+    @Test
+    public void allPrimitivesInNested() {
+        int tok = w.beginNested(1);
+        w.writeVarInt(2, 42);
+        w.writeSInt(3, -100);
+        w.writeBool(4, true);
+        w.writeFixed32(5, 0x12345678);
+        w.writeFixed64(6, 0xCAFEBABEL);
+        w.writeFloat(7, 1.5f);
+        w.writeDouble(8, 9.81);
+        w.writeString(9, "test");
+        w.writeBytes(10, new byte[]{1, 2, 3});
+        w.endNested(tok);
+
+        byte[] b = out();
+        int[] off = {0};
+        readVarInt(b, off);
+        int len = (int) readVarInt(b, off);
+        assertTrue(len > 30);
+        assertEquals(b.length, off[0] + len);
+    }
+
+    @Test
+    public void resetReusesBuffer() {
+        byte[] before = w.buffer();
+        w.writeVarInt(1, 42);
+        w.reset();
+        assertSame(before, w.buffer());
+        assertEquals(0, w.position());
+    }
+
+    @Test
+    public void bufferGrowth() {
+        ProtoWriter small = new ProtoWriter(16);
+        for (int i = 0; i < 10; i++) {
+            small.writeVarInt(1, 0xFFFFFFFFL);
+        }
+        assertTrue(small.position() > 16);
+        // Verify output is still valid.
+        byte[] b = Arrays.copyOf(small.buffer(), small.position());
+        int[] off = {0};
+        for (int i = 0; i < 10; i++) {
+            readVarInt(b, off);
+            assertEquals(0xFFFFFFFFL, readVarInt(b, off));
+        }
+    }
+
+    @Test
+    public void largeRedundantVarInt() {
+        // Nested message > 127 bytes to exercise multi-byte redundant varint.
+        int tok = w.beginNested(1);
+        for (int i = 0; i < 40; i++) {
+            w.writeVarInt(2, 0xFFFFL);
+        }
+        w.endNested(tok);
+
+        byte[] b = out();
+        int[] off = {0};
+        readVarInt(b, off);
+        int lenStart = off[0];
+        int len = (int) readVarInt(b, off);
+        assertEquals(lenStart + 4, off[0]); // still 4-byte redundant varint
+        assertTrue(len > 127);
+    }
+}
diff --git a/src/java_datasource/java/test/dev/perfetto/sdk/TraceEncodingTest.java b/src/java_datasource/java/test/dev/perfetto/sdk/TraceEncodingTest.java
new file mode 100644
index 0000000..ddb9833
--- /dev/null
+++ b/src/java_datasource/java/test/dev/perfetto/sdk/TraceEncodingTest.java
@@ -0,0 +1,202 @@
+/*
+ * Copyright (C) 2026 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.perfetto.sdk;
+
+import static org.junit.Assert.*;
+
+import java.io.*;
+import java.nio.file.*;
+import java.util.Arrays;
+import org.junit.Test;
+
+/**
+ * Encodes a complete trace with ProtoWriter, writes it to a temp file and, when a built
+ * trace_processor_shell is found under out/, verifies the result with SQL queries.
+ */
+public class TraceEncodingTest {
+    static final int TP_TIMESTAMP = 8;
+    static final int TP_CLOCK_ID = 58;
+    static final int TP_TRACK_EVENT = 11;
+    static final int TP_SEQ_ID = 10;
+    static final int TP_SEQ_FLAGS = 13;
+    static final int TP_INTERNED_DATA = 12;
+    static final int TP_TRACK_DESCRIPTOR = 60;
+    static final int TD_UUID = 1;
+    static final int TD_NAME = 2;
+    static final int TD_PROCESS = 3;
+    static final int PD_PID = 1;
+    static final int ID_EVENT_NAMES = 2;
+    static final int EN_IID = 1;
+    static final int EN_NAME = 2;
+    static final int TE_NAME = 23;
+    static final int TE_TYPE = 9;
+    static final int TE_TRACK_UUID = 11;
+    static final int TE_NAME_IID = 10;
+    static final int TE_DEBUG_ANNOTATIONS = 4;
+    static final int DA_NAME = 10;
+    static final int DA_INT = 4;
+    static final int DA_DOUBLE = 5;
+    static final int DA_STRING = 6;
+    static final int DA_BOOL = 2;
+
+    @Test
+    public void traceWithSlicesAndArgs() throws Exception {
+        ProtoWriter trace = new ProtoWriter(16384);
+        ProtoWriter pkt = new ProtoWriter(1024);
+
+        // Track descriptor.
+        pkt.writeVarInt(TP_SEQ_ID, 1);
+        int td = pkt.beginNested(TP_TRACK_DESCRIPTOR);
+        pkt.writeVarInt(TD_UUID, 42);
+        pkt.writeString(TD_NAME, "TestTrack");
+        int pd = pkt.beginNested(TD_PROCESS);
+        pkt.writeVarInt(PD_PID, 1234);
+        pkt.endNested(pd);
+        pkt.endNested(td);
+        wrapPacket(trace, pkt);
+
+        // Slice begin with interned name + incremental state.
+        pkt.reset();
+        pkt.writeVarInt(TP_TIMESTAMP, 1000);
+        pkt.writeVarInt(TP_CLOCK_ID, 6);
+        pkt.writeVarInt(TP_SEQ_ID, 1);
+        pkt.writeVarInt(TP_SEQ_FLAGS, 3);
+        int id = pkt.beginNested(TP_INTERNED_DATA);
+        int en = pkt.beginNested(ID_EVENT_NAMES);
+        pkt.writeVarInt(EN_IID, 1);
+        pkt.writeString(EN_NAME, "doFrame");
+        pkt.endNested(en);
+        pkt.endNested(id);
+        int te = pkt.beginNested(TP_TRACK_EVENT);
+        pkt.writeVarInt(TE_TYPE, 1);
+        pkt.writeVarInt(TE_TRACK_UUID, 42);
+        pkt.writeVarInt(TE_NAME_IID, 1);
+        pkt.endNested(te);
+        wrapPacket(trace, pkt);
+
+        // Slice end.
+        pkt.reset();
+        pkt.writeVarInt(TP_TIMESTAMP, 3000);
+        pkt.writeVarInt(TP_CLOCK_ID, 6);
+        pkt.writeVarInt(TP_SEQ_ID, 1);
+        pkt.writeVarInt(TP_SEQ_FLAGS, 2);
+        te = pkt.beginNested(TP_TRACK_EVENT);
+        pkt.writeVarInt(TE_TYPE, 2);
+        pkt.writeVarInt(TE_TRACK_UUID, 42);
+        pkt.endNested(te);
+        wrapPacket(trace, pkt);
+
+        // Instant with debug annotations (int, string, double, bool).
+        pkt.reset();
+        pkt.writeVarInt(TP_TIMESTAMP, 4000);
+        pkt.writeVarInt(TP_CLOCK_ID, 6);
+        pkt.writeVarInt(TP_SEQ_ID, 1);
+        pkt.writeVarInt(TP_SEQ_FLAGS, 2);
+        te = pkt.beginNested(TP_TRACK_EVENT);
+        pkt.writeVarInt(TE_TYPE, 3);
+        pkt.writeVarInt(TE_TRACK_UUID, 42);
+        pkt.writeString(TE_NAME, "frameStats");
+        writeDebugArg(pkt, "frame_id", 42);
+        writeDebugArgStr(pkt, "layer", "com.example.app");
+        writeDebugArgDouble(pkt, "jank_pct", 12.345);
+        writeDebugArgBool(pkt, "is_janky", true);
+        pkt.endNested(te);
+        wrapPacket(trace, pkt);
+
+        byte[] traceBytes = Arrays.copyOf(trace.buffer(), trace.position());
+        Path traceFile = Files.createTempFile("perfetto_e2e_", ".pb");
+        traceFile.toFile().deleteOnExit();  // Also cleaned up on early return or failed asserts.
+        Files.write(traceFile, traceBytes);
+        String tpShell = findTraceProcessorShell();
+        if (tpShell == null) {
+            assertTrue(traceBytes.length > 100);
+            return;
+        }
+
+        String slices = runQuery(tpShell, traceFile,
+                "SELECT name, dur FROM slice WHERE dur > 0 ORDER BY ts;");
+        assertTrue(slices.contains("doFrame"));
+        assertTrue(slices.contains("2000"));
+
+        String args = runQuery(tpShell, traceFile,
+                "SELECT key, int_value, string_value, real_value FROM args "
+                + "WHERE arg_set_id IN "
+                + "(SELECT arg_set_id FROM slice WHERE name = 'frameStats') "
+                + "ORDER BY key;");
+        assertTrue(args.contains("frame_id") && args.contains("42"));
+        assertTrue(args.contains("jank_pct") && args.contains("12.345"));
+        assertTrue(args.contains("layer") && args.contains("com.example"));
+        assertTrue(args.contains("is_janky"));
+        Files.deleteIfExists(traceFile);
+    }
+
+    private void wrapPacket(ProtoWriter trace, ProtoWriter pkt) {
+        trace.writeBytes(1, pkt.buffer(), 0, pkt.position());  // Trace.packet is field 1.
+    }
+
+    private void writeDebugArg(ProtoWriter w, String name, long value) {
+        int da = w.beginNested(TE_DEBUG_ANNOTATIONS);
+        w.writeString(DA_NAME, name);
+        w.writeVarInt(DA_INT, value);
+        w.endNested(da);
+    }
+
+    private void writeDebugArgStr(ProtoWriter w, String name, String value) {
+        int da = w.beginNested(TE_DEBUG_ANNOTATIONS);
+        w.writeString(DA_NAME, name);
+        w.writeString(DA_STRING, value);
+        w.endNested(da);
+    }
+
+    private void writeDebugArgDouble(ProtoWriter w, String name, double value) {
+        int da = w.beginNested(TE_DEBUG_ANNOTATIONS);
+        w.writeString(DA_NAME, name);
+        w.writeDouble(DA_DOUBLE, value);
+        w.endNested(da);
+    }
+
+    private void writeDebugArgBool(ProtoWriter w, String name, boolean value) {
+        int da = w.beginNested(TE_DEBUG_ANNOTATIONS);
+        w.writeString(DA_NAME, name);
+        w.writeBool(DA_BOOL, value);
+        w.endNested(da);
+    }
+
+    private String runQuery(String tp, Path trace, String sql) throws Exception {
+        ProcessBuilder pb = new ProcessBuilder(tp, "--query-file", "/dev/stdin",
+                trace.toString());
+        pb.redirectErrorStream(true);
+        Process proc = pb.start();
+        proc.getOutputStream().write(sql.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        proc.getOutputStream().close();
+        String out = new String(proc.getInputStream().readAllBytes(), java.nio.charset.StandardCharsets.UTF_8);
+        assertEquals("query failed: " + out, 0, proc.waitFor());
+        return out;
+    }
+
+    private static String findTraceProcessorShell() {
+        File outDir = new File("out");  // Relative to the test's working directory.
+        if (!outDir.isDirectory()) return null;
+        File[] dirs = outDir.listFiles();
+        if (dirs == null) return null;
+        for (File d : dirs) {
+            File tp = new File(d, "trace_processor_shell");
+            if (tp.canExecute()) return tp.getPath();
+        }
+        return null;
+    }
+}
diff --git a/src/java_datasource/java_datasource_integrationtest.cc b/src/java_datasource/java_datasource_integrationtest.cc
new file mode 100644
index 0000000..b620273
--- /dev/null
+++ b/src/java_datasource/java_datasource_integrationtest.cc
@@ -0,0 +1,429 @@
+/*
+ * Copyright (C) 2026 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Integration test for the Java DataSource JNI path.
+//
+// This test exercises the exact same C ABI call sequence that
+// perfetto_datasource_jni.cc uses:
+//   1. PerfettoDsImplCreate + register callbacks + PerfettoDsImplRegister
+//   2. PerfettoDsImplTraceIterateBegin/Next (instance iteration)
+//   3. PerfettoDsTracerImplPacketBegin + PerfettoStreamWriterAppendBytes +
+//      PerfettoDsTracerImplPacketEnd (packet writing)
+//   4. PerfettoDsImplGetIncrementalState (incremental state check)
+//
+// The packet bytes are pre-encoded in the test (mimicking ProtoWriter output)
+// and written to the stream writer via AppendBytes, exactly as the JNI does.
+
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <string>
+#include <vector>
+
+#include "perfetto/public/abi/atomic.h"
+#include "perfetto/public/abi/data_source_abi.h"
+#include "perfetto/public/abi/stream_writer_abi.h"
+#include "perfetto/public/data_source.h"
+#include "perfetto/public/pb_utils.h"
+#include "perfetto/public/producer.h"
+#include "perfetto/public/protos/trace/test_event.pzc.h"
+#include "perfetto/public/protos/trace/trace.pzc.h"
+#include "perfetto/public/protos/trace/trace_packet.pzc.h"
+#include "perfetto/public/stream_writer.h"
+
+#include "src/shared_lib/reset_for_testing.h"
+#include "src/shared_lib/test/utils.h"
+
+#include "test/gtest_and_gmock.h"
+
+using ::perfetto::shlib::test_utils::FieldView;
+using ::perfetto::shlib::test_utils::IdFieldView;
+using ::perfetto::shlib::test_utils::MsgField;
+using ::perfetto::shlib::test_utils::PbField;
+using ::perfetto::shlib::test_utils::StringField;
+using ::perfetto::shlib::test_utils::TracingSession;
+using ::perfetto::shlib::test_utils::VarIntField;
+using ::testing::_;
+using ::testing::ElementsAre;
+
+namespace {
+
+constexpr char kDataSourceName[] = "dev.perfetto.java_datasource_test";
+
+// Per-instance incremental state, same as JNI IncrState.
+struct IncrState {
+  bool was_cleared = false;  // Set by OnCreateIncr/OnClearIncr; reset once read.
+};
+
+void* OnCreateIncr(struct PerfettoDsImpl* /*ds_impl*/,
+                   PerfettoDsInstanceIndex /*inst_id*/,
+                   struct PerfettoDsTracerImpl* /*tracer*/,
+                   void* /*user_arg*/) {
+  // A new state starts out "cleared". It is expected to be released through
+  // OnDeleteIncr, which is registered alongside this callback.
+  return new IncrState{true};
+}
+
+void OnDeleteIncr(void* incr_state) {
+  delete static_cast<IncrState*>(incr_state);  // Allocated by OnCreateIncr.
+}
+
+bool OnClearIncr(void* incr_state, void* /*user_arg*/) {
+  // Latch the flag; CheckAnyIncrementalStateCleared() consumes it later.
+  static_cast<IncrState*>(incr_state)->was_cleared = true;
+  return true;
+}
+
+// Helper: write `value` at buf as a base-128 varint (at most 10 bytes).
+// Returns the number of bytes written.
+size_t EncodeVarInt(uint64_t value, uint8_t* buf) {
+  uint8_t* out = buf;
+  for (; value > 0x7F; value >>= 7) {
+    *out++ = static_cast<uint8_t>(0x80 | (value & 0x7F));
+  }
+  *out++ = static_cast<uint8_t>(value);
+  return static_cast<size_t>(out - buf);
+}
+
+// Helper: encode a 4-byte redundant varint (same as ProtoWriter.endNested).
+// Only the low 28 bits of `value` fit; any higher bits are dropped.
+void EncodeRedundantVarInt(uint32_t value, uint8_t* buf) {
+  for (int i = 0; i < 4; ++i) {
+    buf[i] = static_cast<uint8_t>(((value >> (7 * i)) & 0x7Fu) | (i < 3 ? 0x80u : 0u));
+  }
+}
+
+// Helper: build a TracePacket with a for_testing.payload.str field,
+// using the same encoding ProtoWriter would produce: every nested-message
+// length is a 4-byte redundant varint that is backfilled once the nested
+// message is complete. Returns the encoded bytes.
+std::vector<uint8_t> BuildTestPacket(const std::string& test_string) {
+  std::vector<uint8_t> out;
+  out.reserve(256);
+
+  // Appends `v` as a plain (minimal-length) varint.
+  auto put_varint = [&out](uint64_t v) {
+    // 16 bytes is ample: a 64-bit varint needs at most 10.
+    uint8_t scratch[16];
+    const size_t n = EncodeVarInt(v, scratch);
+    out.insert(out.end(), scratch, scratch + n);
+  };
+
+  // Appends the tag of a length-delimited (wire type 2) field.
+  auto put_len_tag = [&put_varint](uint32_t field_id) {
+    put_varint((field_id << 3) | 2);
+  };
+
+  // Opens a nested message: writes its tag, then reserves 4 bytes for the
+  // length. Returns the offset of the reserved length bytes.
+  auto begin_nested = [&out, &put_len_tag](uint32_t field_id) -> size_t {
+    put_len_tag(field_id);
+    const size_t len_pos = out.size();
+    out.resize(len_pos + 4);
+    return len_pos;
+  };
+
+  // Closes a nested message: backfills its reserved length bytes with the
+  // number of bytes appended after them.
+  auto end_nested = [&out](size_t len_pos) {
+    const size_t data_start = len_pos + 4;
+    const uint32_t size = static_cast<uint32_t>(out.size() - data_start);
+    EncodeRedundantVarInt(size, out.data() + len_pos);
+  };
+
+  // TracePacket.for_testing (field 900) is a TestEvent message.
+  const size_t for_testing_len_pos = begin_nested(900);
+
+  // TestEvent.payload (field 5) is a TestEvent.TestPayload message.
+  const size_t payload_len_pos = begin_nested(5);
+
+  // TestEvent.TestPayload.str (field 1): length-prefixed string bytes.
+  put_len_tag(1);
+  put_varint(test_string.size());
+  out.insert(out.end(), test_string.begin(), test_string.end());
+
+  // Close both messages; each length runs from just after its reserved
+  // bytes to the current end of the buffer.
+  end_nested(payload_len_pos);
+  end_nested(for_testing_len_pos);
+
+  return out;
+}
+
+// Helper: write pre-encoded packet bytes to all active instances.
+// Intended to mirror nativeWritePacketToAllInstances in
+// perfetto_datasource_jni.cc.
+void WritePacketToAllInstances(struct PerfettoDsImpl* ds_impl,
+                               const uint8_t* data,
+                               size_t len) {
+  for (struct PerfettoDsImplTracerIterator it =
+           PerfettoDsImplTraceIterateBegin(ds_impl);
+       it.tracer != nullptr; PerfettoDsImplTraceIterateNext(ds_impl, &it)) {
+    // One complete packet per active instance, carrying all `len` bytes.
+    struct PerfettoStreamWriter writer =
+        PerfettoDsTracerImplPacketBegin(it.tracer);
+    PerfettoStreamWriterAppendBytes(&writer, data, len);
+    PerfettoDsTracerImplPacketEnd(it.tracer, &writer);
+  }
+}
+
+// Helper: true if any instance's incremental state is flagged as cleared.
+// Resets each set flag; mirrors nativeCheckAnyIncrementalStateCleared.
+bool CheckAnyIncrementalStateCleared(struct PerfettoDsImpl* ds_impl) {
+  bool any_cleared = false;
+  // No early exit: every instance's flag must be consumed.
+  for (struct PerfettoDsImplTracerIterator it =
+           PerfettoDsImplTraceIterateBegin(ds_impl);
+       it.tracer != nullptr; PerfettoDsImplTraceIterateNext(ds_impl, &it)) {
+    IncrState* incr = static_cast<IncrState*>(
+        PerfettoDsImplGetIncrementalState(ds_impl, it.tracer, it.inst_id));
+    if (incr != nullptr && incr->was_cleared) {
+      incr->was_cleared = false;
+      any_cleared = true;
+    }
+  }
+  return any_cleared;
+}
+
+// Fixture: registers the test data source with an in-process producer.
+class JavaDataSourceTest : public testing::Test {
+ protected:
+  void SetUp() override {
+    struct PerfettoProducerInitArgs args = PERFETTO_PRODUCER_INIT_ARGS_INIT();
+    args.backends = PERFETTO_BACKEND_IN_PROCESS;
+    PerfettoProducerInit(args);
+
+    ds_impl_ = PerfettoDsImplCreate();
+    PerfettoDsSetOnCreateIncr(ds_impl_, OnCreateIncr);
+    PerfettoDsSetOnDeleteIncr(ds_impl_, OnDeleteIncr);
+    PerfettoDsSetOnClearIncr(ds_impl_, OnClearIncr);
+
+    // DataSourceDescriptor proto: field 1 = name (varint length, any size).
+    const size_t name_len = strlen(kDataSourceName);
+    std::vector<uint8_t> desc;
+    desc.push_back(static_cast<uint8_t>((1 << 3) | 2));  // field 1, wire type 2
+    uint8_t len_buf[10];
+    const size_t len_bytes = EncodeVarInt(name_len, len_buf);
+    desc.insert(desc.end(), len_buf, len_buf + len_bytes);
+    desc.insert(desc.end(), kDataSourceName, kDataSourceName + name_len);
+
+    bool ok = PerfettoDsImplRegister(ds_impl_, &enabled_ptr_, desc.data(),
+                                     desc.size());
+    ASSERT_TRUE(ok);
+  }
+
+  void TearDown() override {
+    perfetto::shlib::ResetForTesting();
+    if (ds_impl_) {
+      perfetto::shlib::DsImplDestroy(ds_impl_);
+      ds_impl_ = nullptr;
+    }
+  }
+
+  struct PerfettoDsImpl* ds_impl_ = nullptr;
+  PERFETTO_ATOMIC(bool) * enabled_ptr_ = nullptr;
+};
+
+// Test: with no tracing session, the data source has no instance to run on.
+TEST_F(JavaDataSourceTest, DisabledNotExecuted) {
+  // The instance iterator must come back empty straight away.
+  auto iter = PerfettoDsImplTraceIterateBegin(ds_impl_);
+  EXPECT_EQ(iter.tracer, nullptr);
+}
+
+// Test: one packet written via PacketBegin/AppendBytes/PacketEnd is traced.
+TEST_F(JavaDataSourceTest, WritePacketViaAppendBytes) {
+  TracingSession tracing_session =
+      TracingSession::Builder().set_data_source_name(kDataSourceName).Build();
+  const std::string expected_str = "HELLO_FROM_JAVA";
+
+  // Encode for_testing.payload.str = expected_str and push it through
+  // WritePacketToAllInstances, the JNI-style write path.
+  std::vector<uint8_t> packet = BuildTestPacket(expected_str);
+  WritePacketToAllInstances(ds_impl_, packet.data(), packet.size());
+
+  tracing_session.StopBlocking();
+  std::vector<uint8_t> data = tracing_session.ReadBlocking();
+
+  // Every top-level field must be a TracePacket; each one that carries
+  // for_testing must hold exactly our payload string.
+  int matches = 0;
+  for (struct PerfettoPbDecoderField trace_field : FieldView(data)) {
+    ASSERT_THAT(trace_field, PbField(perfetto_protos_Trace_packet_field_number,
+                                     MsgField(_)));
+    IdFieldView for_testing(
+        trace_field, perfetto_protos_TracePacket_for_testing_field_number);
+    ASSERT_TRUE(for_testing.ok());
+    if (for_testing.size() > 0) {
+      ++matches;
+      ASSERT_EQ(for_testing.size(), 1u);
+      EXPECT_THAT(
+          FieldView(for_testing.front()),
+          ElementsAre(PbField(
+              perfetto_protos_TestEvent_payload_field_number,
+              MsgField(ElementsAre(PbField(
+                  perfetto_protos_TestEvent_TestPayload_str_field_number,
+                  StringField(expected_str)))))));
+    }
+  }
+  EXPECT_GT(matches, 0);
+}
+
+// Test: several packets written back-to-back all reach the trace.
+TEST_F(JavaDataSourceTest, WriteMultiplePackets) {
+  TracingSession tracing_session =
+      TracingSession::Builder().set_data_source_name(kDataSourceName).Build();
+
+  // Encode every packet first, then write them in order.
+  std::vector<std::vector<uint8_t>> packets;
+  for (const char* str : {"PACKET_ONE", "PACKET_TWO", "PACKET_THREE"}) {
+    packets.push_back(BuildTestPacket(str));
+  }
+  for (const std::vector<uint8_t>& pkt : packets) {
+    WritePacketToAllInstances(ds_impl_, pkt.data(), pkt.size());
+  }
+
+  tracing_session.StopBlocking();
+  std::vector<uint8_t> data = tracing_session.ReadBlocking();
+
+  // Count the TracePackets that carry a for_testing payload.
+  int found_count = 0;
+  for (struct PerfettoPbDecoderField trace_field : FieldView(data)) {
+    ASSERT_THAT(trace_field, PbField(perfetto_protos_Trace_packet_field_number,
+                                     MsgField(_)));
+    IdFieldView for_testing(
+        trace_field, perfetto_protos_TracePacket_for_testing_field_number);
+    ASSERT_TRUE(for_testing.ok());
+    found_count += (for_testing.size() > 0) ? 1 : 0;
+  }
+  EXPECT_EQ(found_count, 3);
+}
+
+// Test: the incremental-state "cleared" flag is reported once, then consumed.
+TEST_F(JavaDataSourceTest, IncrementalStateCleared) {
+  TracingSession tracing_session =
+      TracingSession::Builder().set_data_source_name(kDataSourceName).Build();
+
+  // OnCreateIncr() marks a brand-new incremental state as cleared, so the
+  // first check must report it.
+  const bool first_check = CheckAnyIncrementalStateCleared(ds_impl_);
+  EXPECT_TRUE(first_check);
+
+  // The first check reset the flag, so a second check must not report it.
+  const bool second_check = CheckAnyIncrementalStateCleared(ds_impl_);
+  EXPECT_FALSE(second_check);
+
+  // Writing after the checks must still work.
+  std::vector<uint8_t> pkt = BuildTestPacket("AFTER_INCR_CHECK");
+  WritePacketToAllInstances(ds_impl_, pkt.data(), pkt.size());
+
+  tracing_session.StopBlocking();
+  std::vector<uint8_t> data = tracing_session.ReadBlocking();
+
+  // The packet written above must be present in the trace.
+  bool found = false;
+  for (struct PerfettoPbDecoderField trace_field : FieldView(data)) {
+    ASSERT_THAT(trace_field, PbField(perfetto_protos_Trace_packet_field_number,
+                                     MsgField(_)));
+    IdFieldView for_testing(
+        trace_field, perfetto_protos_TracePacket_for_testing_field_number);
+    ASSERT_TRUE(for_testing.ok());
+    found = found || for_testing.size() > 0;
+  }
+  EXPECT_TRUE(found);
+}
+
+// Test: one write reaches every concurrent tracing session (multi-instance).
+TEST_F(JavaDataSourceTest, MultiInstance) {
+  TracingSession session1 =
+      TracingSession::Builder().set_data_source_name(kDataSourceName).Build();
+  TracingSession session2 =
+      TracingSession::Builder().set_data_source_name(kDataSourceName).Build();
+
+  // Two sessions should yield two active instances. Walk the iterator to the
+  // end rather than breaking out early.
+  int instance_count = 0;
+  for (struct PerfettoDsImplTracerIterator it =
+           PerfettoDsImplTraceIterateBegin(ds_impl_);
+       it.tracer != nullptr; PerfettoDsImplTraceIterateNext(ds_impl_, &it)) {
+    ++instance_count;
+  }
+  EXPECT_EQ(instance_count, 2);
+
+  // A single write must fan out to both sessions.
+  std::vector<uint8_t> pkt = BuildTestPacket("MULTI_INSTANCE");
+  WritePacketToAllInstances(ds_impl_, pkt.data(), pkt.size());
+
+  session1.StopBlocking();
+  session2.StopBlocking();
+
+  // Counts the top-level fields of a serialized trace that carry a
+  // for_testing payload.
+  auto count_test_packets = [](const std::vector<uint8_t>& trace) -> int {
+    int count = 0;
+    for (struct PerfettoPbDecoderField trace_field : FieldView(trace)) {
+      IdFieldView for_testing(
+          trace_field, perfetto_protos_TracePacket_for_testing_field_number);
+      count += (for_testing.ok() && for_testing.size() > 0) ? 1 : 0;
+    }
+    return count;
+  };
+
+  // Each session must hold exactly one copy of the packet.
+  std::vector<uint8_t> data1 = session1.ReadBlocking();
+  std::vector<uint8_t> data2 = session2.ReadBlocking();
+
+  EXPECT_EQ(count_test_packets(data1), 1);
+  EXPECT_EQ(count_test_packets(data2), 1);
+}
+
+// Test: a packet bigger than a typical chunk is read back intact.
+TEST_F(JavaDataSourceTest, LargePacket) {
+  TracingSession tracing_session =
+      TracingSession::Builder().set_data_source_name(kDataSourceName).Build();
+
+  // 8 KiB payload: larger than a typical chunk, so the appended bytes are
+  // expected to be split across chunks by the stream writer.
+  const std::string large_string(8192, 'X');
+  std::vector<uint8_t> pkt = BuildTestPacket(large_string);
+  WritePacketToAllInstances(ds_impl_, pkt.data(), pkt.size());
+
+  tracing_session.StopBlocking();
+  std::vector<uint8_t> data = tracing_session.ReadBlocking();
+
+  bool found = false;
+  for (struct PerfettoPbDecoderField trace_field : FieldView(data)) {
+    ASSERT_THAT(trace_field, PbField(perfetto_protos_Trace_packet_field_number,
+                                     MsgField(_)));
+    IdFieldView for_testing(
+        trace_field, perfetto_protos_TracePacket_for_testing_field_number);
+    ASSERT_TRUE(for_testing.ok());
+    if (for_testing.size() == 0) {
+      continue;
+    }
+    found = true;
+    // The payload string must survive the write path unchanged.
+    EXPECT_THAT(FieldView(for_testing.front()),
+                ElementsAre(PbField(
+                    perfetto_protos_TestEvent_payload_field_number,
+                    MsgField(ElementsAre(PbField(
+                        perfetto_protos_TestEvent_TestPayload_str_field_number,
+                        StringField(large_string)))))));
+  }
+  EXPECT_TRUE(found);
+}
+
+}  // namespace
diff --git a/src/java_datasource/jni/BUILD.gn b/src/java_datasource/jni/BUILD.gn
new file mode 100644
index 0000000..918c916
--- /dev/null
+++ b/src/java_datasource/jni/BUILD.gn
@@ -0,0 +1,35 @@
+import("../../../gn/perfetto.gni")
+import("../../../gn/perfetto_android_sdk.gni")
+
+# The JNI glue for dev.perfetto.sdk.PerfettoDataSource is only meaningful
+# when the Android Java SDK is enabled.
+assert(enable_perfetto_android_java_sdk)
+
+# Sources and deps shared by the two source_set flavours below.
+SOURCE_SET_SOURCES = [
+  "perfetto_datasource_jni.cc",
+  "perfetto_datasource_jni.h",
+]
+
+# NOTE(review): perfetto_datasource_jni.cc also includes perfetto/public/...
+# headers and src/android_sdk/jni/macros.h, whose targets are not listed here;
+# confirm this passes `gn check`.
+SOURCE_SET_DEPS = [
+  "../../../gn:default_deps",
+  "../../android_sdk/nativehelper:nativehelper",
+]
+
+# Default flavour: natives are registered on dev/perfetto/sdk/PerfettoDataSource.
+source_set("libperfetto_datasource_jni_src") {
+  sources = SOURCE_SET_SOURCES
+  deps = SOURCE_SET_DEPS
+}
+
+# Framework flavour: defines PERFETTO_JNI_JARJAR_PREFIX, which the .cc consumes
+# through TO_MAYBE_JAR_JAR_CLASS_NAME — presumably to target the jarjar-renamed
+# class under com/android/internal/ (macro body not visible here).
+source_set("libperfetto_datasource_framework_jni_src") {
+  sources = SOURCE_SET_SOURCES
+  deps = SOURCE_SET_DEPS
+  cflags = [ "-DPERFETTO_JNI_JARJAR_PREFIX=com/android/internal/" ]
+}
+
+# Loadable JNI library (its JNI_OnLoad registers the natives), linked against
+# the Perfetto C SDK.
+perfetto_android_jni_library("libperfetto_datasource_jni") {
+  deps = [
+    ":libperfetto_datasource_jni_src",
+    "../../../gn:default_deps",
+    "../../shared_lib:libperfetto_c",
+  ]
+  binary_name = "libperfetto_datasource_jni.so"
+  # liblog provides __android_log_print used by the .cc.
+  bazel_linkopts = "-llog"
+}
diff --git a/src/java_datasource/jni/perfetto_datasource_jni.cc b/src/java_datasource/jni/perfetto_datasource_jni.cc
new file mode 100644
index 0000000..8309fd5
--- /dev/null
+++ b/src/java_datasource/jni/perfetto_datasource_jni.cc
@@ -0,0 +1,347 @@
+/*
+ * Copyright (C) 2026 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/java_datasource/jni/perfetto_datasource_jni.h"
+
+#include <android/log.h>
+#include <jni.h>
+#include <stdint.h>
+#include <string.h>
+
+#include <memory>
+
+#include "perfetto/public/abi/atomic.h"
+#include "perfetto/public/abi/data_source_abi.h"
+#include "perfetto/public/abi/stream_writer_abi.h"
+#include "perfetto/public/stream_writer.h"
+#include "src/android_sdk/jni/macros.h"
+#include "src/android_sdk/nativehelper/JNIHelp.h"
+#include "src/android_sdk/nativehelper/scoped_utf_chars.h"
+
+#define LOG_TAG "PerfettoDataSourceJNI"
+
+namespace perfetto {
+namespace jni {
+
+namespace {
+
+// Per-data-source registration state. Stored as the user_arg for callbacks.
+struct DsState {
+  struct PerfettoDsImpl* ds_impl;
+  // Points to the native atomic enabled flag. True when any instance is active.
+  // Used by OnStop to decide whether to disable the Java fast path.
+  PERFETTO_ATOMIC(bool) * enabled_ptr;
+  // Global ref to the Java PerfettoDataSource object.
+  jobject java_ds_global_ref;
+  // Cached JVM pointer for callbacks from arbitrary threads.
+  JavaVM* jvm;
+  // Cached method IDs.
+  jmethodID on_enabled_changed_mid;
+  jmethodID on_setup_mid;
+  jmethodID on_start_mid;
+  jmethodID on_stop_mid;
+  jmethodID on_flush_mid;
+};
+
+// Per-instance incremental state. Tracks whether the state was cleared
+// so Java can know when to re-emit InternedData.
+struct IncrState {
+  bool was_cleared;
+};
+
+// Out-parameter adapter for JavaVM::AttachCurrentThread*(): Android's jni.h
+// declares the parameter as JNIEnv**, OpenJDK's as void**. Exactly one of the
+// conversions below matches whichever header is in use.
+struct JniEnvOutParam {
+  JNIEnv** env;
+  [[maybe_unused]] operator JNIEnv**() const { return env; }
+  [[maybe_unused]] operator void**() const {
+    return reinterpret_cast<void**>(env);
+  }
+};
+
+// Detaches the owning thread from the JVM when that thread exits. A
+// thread_local instance is only created on threads GetJNIEnv() attached.
+class ThreadDetacher {
+ public:
+  explicit ThreadDetacher(JavaVM* jvm) : jvm_(jvm) {}
+  ~ThreadDetacher() { jvm_->DetachCurrentThread(); }
+  ThreadDetacher(const ThreadDetacher&) = delete;
+  ThreadDetacher& operator=(const ThreadDetacher&) = delete;
+
+ private:
+  JavaVM* const jvm_;
+};
+
+// Returns a JNIEnv for the calling thread, or null on failure.
+//
+// The data source callbacks below may run on threads the JVM has never seen
+// (presumably the C SDK's own tracing thread -- confirm). There GetEnv()
+// reports JNI_EDETACHED, and without attaching, every callback would silently
+// do nothing. Such threads are attached as daemons, so they never block JVM
+// shutdown, and are detached again when they exit.
+JNIEnv* GetJNIEnv(JavaVM* jvm) {
+  JNIEnv* env = nullptr;
+  const jint res =
+      jvm->GetEnv(reinterpret_cast<void**>(&env), JNI_VERSION_1_6);
+  if (res == JNI_OK) {
+    return env;
+  }
+  if (res != JNI_EDETACHED) {
+    return nullptr;
+  }
+  if (jvm->AttachCurrentThreadAsDaemon(JniEnvOutParam{&env}, nullptr) !=
+      JNI_OK) {
+    __android_log_print(ANDROID_LOG_ERROR, LOG_TAG,
+                        "Failed to attach thread to the JVM");
+    return nullptr;
+  }
+  static thread_local ThreadDetacher detacher(jvm);
+  return env;
+}
+
+// Reports and clears an exception thrown by a Java callback. These callbacks
+// have no Java caller to propagate to, and making further JNI calls while an
+// exception is pending is not allowed.
+void ClearPendingJavaException(JNIEnv* env, const char* java_method) {
+  if (!env->ExceptionCheck()) {
+    return;
+  }
+  __android_log_print(ANDROID_LOG_ERROR, LOG_TAG,
+                      "Exception thrown by PerfettoDataSource.%s",
+                      java_method);
+  env->ExceptionDescribe();
+  env->ExceptionClear();
+}
+
+// ============================================================================
+// Data source callbacks
+// ============================================================================
+
+// Forwards the raw DataSourceConfig bytes to Java's onSetup(instance, config).
+void* OnSetup(struct PerfettoDsImpl*,
+              PerfettoDsInstanceIndex inst_id,
+              void* ds_config,
+              size_t ds_config_size,
+              void* user_arg,
+              struct PerfettoDsOnSetupArgs*) {
+  auto* state = static_cast<DsState*>(user_arg);
+  JNIEnv* env = GetJNIEnv(state->jvm);
+  if (!env) {
+    return nullptr;
+  }
+  const jsize config_len = static_cast<jsize>(ds_config_size);
+  jbyteArray config = env->NewByteArray(config_len);
+  if (!config) {
+    // NewByteArray leaves an OutOfMemoryError pending on failure.
+    ClearPendingJavaException(env, "onSetup");
+    return nullptr;
+  }
+  if (config_len > 0) {
+    env->SetByteArrayRegion(config, 0, config_len,
+                            reinterpret_cast<const jbyte*>(ds_config));
+  }
+  env->CallVoidMethod(state->java_ds_global_ref, state->on_setup_mid,
+                      static_cast<jint>(inst_id), config);
+  ClearPendingJavaException(env, "onSetup");
+  // Required: an attached native thread has no native frame that would
+  // release this local reference automatically.
+  env->DeleteLocalRef(config);
+  return nullptr;
+}
+
+// Enables the Java fast path, then notifies Java's onStart(instance).
+void OnStart(struct PerfettoDsImpl*,
+             PerfettoDsInstanceIndex inst_id,
+             void* user_arg,
+             void*,
+             struct PerfettoDsOnStartArgs*) {
+  auto* state = static_cast<DsState*>(user_arg);
+  JNIEnv* env = GetJNIEnv(state->jvm);
+  if (!env) {
+    return;
+  }
+  env->CallVoidMethod(state->java_ds_global_ref,
+                      state->on_enabled_changed_mid, JNI_TRUE);
+  ClearPendingJavaException(env, "onEnabledChanged");
+  env->CallVoidMethod(state->java_ds_global_ref, state->on_start_mid,
+                      static_cast<jint>(inst_id));
+  ClearPendingJavaException(env, "onStart");
+}
+
+// Disables the Java fast path once no instance is left, then notifies Java's
+// onStop(instance).
+void OnStop(struct PerfettoDsImpl*,
+            PerfettoDsInstanceIndex inst_id,
+            void* user_arg,
+            void*,
+            struct PerfettoDsOnStopArgs*) {
+  auto* state = static_cast<DsState*>(user_arg);
+  JNIEnv* env = GetJNIEnv(state->jvm);
+  if (!env) {
+    return;
+  }
+  // Only disable the Java fast path if no instances remain active.
+  // The native enabled_ptr is an atomic bool managed by the C SDK that
+  // is true when at least one instance is active.
+  // NOTE(review): this relies on the C SDK having already cleared the flag
+  // for the last instance by the time OnStop runs; if it is only cleared
+  // after this callback returns, Java stays enabled. Confirm the ordering.
+  if (!PERFETTO_ATOMIC_LOAD_EXPLICIT(*state->enabled_ptr,
+                                     PERFETTO_MEMORY_ORDER_RELAXED)) {
+    env->CallVoidMethod(state->java_ds_global_ref,
+                        state->on_enabled_changed_mid, JNI_FALSE);
+    ClearPendingJavaException(env, "onEnabledChanged");
+  }
+  env->CallVoidMethod(state->java_ds_global_ref, state->on_stop_mid,
+                      static_cast<jint>(inst_id));
+  ClearPendingJavaException(env, "onStop");
+}
+
+// Notifies Java's onFlush(instance).
+void OnFlush(struct PerfettoDsImpl*,
+             PerfettoDsInstanceIndex inst_id,
+             void* user_arg,
+             void*,
+             struct PerfettoDsOnFlushArgs*) {
+  auto* state = static_cast<DsState*>(user_arg);
+  JNIEnv* env = GetJNIEnv(state->jvm);
+  if (!env) {
+    return;
+  }
+  env->CallVoidMethod(state->java_ds_global_ref, state->on_flush_mid,
+                      static_cast<jint>(inst_id));
+  ClearPendingJavaException(env, "onFlush");
+}
+
+// Incremental state callbacks.
+
+void* OnCreateIncr(struct PerfettoDsImpl*,
+                   PerfettoDsInstanceIndex,
+                   struct PerfettoDsTracerImpl*,
+                   void*) {
+  auto* incr = new IncrState();
+  incr->was_cleared = true;  // New state counts as "cleared"
+  return incr;
+}
+
+void OnDeleteIncr(void* obj) {
+  delete static_cast<IncrState*>(obj);
+}
+
+bool OnClearIncr(void* obj, void*) {
+  auto* incr = static_cast<IncrState*>(obj);
+  incr->was_cleared = true;
+  return true;
+}
+
+}  // namespace
+
+// ============================================================================
+// JNI methods
+// ============================================================================
+
+// Registers a Perfetto data source named `name` whose lifecycle callbacks are
+// forwarded to `java_ds`. Returns the PerfettoDsImpl pointer as a jlong, or 0
+// on failure. Some failure paths leave a Java exception pending, e.g. a
+// NoSuchMethodError for a missing callback method.
+static jlong nativeRegister(JNIEnv* env,
+                            jclass,
+                            jobject java_ds,
+                            jstring name) {
+  ScopedUtfChars name_chars(env, name);
+  if (name_chars.c_str() == nullptr) {
+    return 0;
+  }
+
+  // The DataSourceDescriptor built below stores the name length as a
+  // single-byte varint, which limits names to 127 bytes. Validate before
+  // allocating anything so this error path has nothing to release.
+  const size_t name_len = strlen(name_chars.c_str());
+  if (name_len > 127) {
+    __android_log_print(ANDROID_LOG_ERROR, LOG_TAG,
+                        "Data source name too long (max 127): %s",
+                        name_chars.c_str());
+    return 0;
+  }
+
+  auto state = std::make_unique<DsState>();
+
+  // Resolve the Java callbacks up front: the native callbacks call them
+  // unconditionally, so a null jmethodID must never be stored. A failed
+  // lookup leaves NoSuchMethodError pending; the && short-circuit ensures no
+  // further GetMethodID call is made while it is pending.
+  jclass cls = env->GetObjectClass(java_ds);
+  if (cls == nullptr) {
+    return 0;
+  }
+  auto lookup = [env, cls](const char* method, const char* signature,
+                           jmethodID* out) {
+    *out = env->GetMethodID(cls, method, signature);
+    return *out != nullptr;
+  };
+  const bool methods_found =
+      lookup("onEnabledChanged", "(Z)V", &state->on_enabled_changed_mid) &&
+      lookup("onSetup", "(I[B)V", &state->on_setup_mid) &&
+      lookup("onStart", "(I)V", &state->on_start_mid) &&
+      lookup("onStop", "(I)V", &state->on_stop_mid) &&
+      lookup("onFlush", "(I)V", &state->on_flush_mid);
+  env->DeleteLocalRef(cls);
+  if (!methods_found) {
+    return 0;
+  }
+
+  if (env->GetJavaVM(&state->jvm) != JNI_OK) {
+    return 0;
+  }
+  state->java_ds_global_ref = env->NewGlobalRef(java_ds);
+  if (state->java_ds_global_ref == nullptr) {
+    return 0;
+  }
+
+  struct PerfettoDsImpl* ds_impl = PerfettoDsImplCreate();
+  state->ds_impl = ds_impl;
+
+  PerfettoDsSetOnSetupCallback(ds_impl, OnSetup);
+  PerfettoDsSetOnStartCallback(ds_impl, OnStart);
+  PerfettoDsSetOnStopCallback(ds_impl, OnStop);
+  PerfettoDsSetOnFlushCallback(ds_impl, OnFlush);
+  PerfettoDsSetOnCreateIncr(ds_impl, OnCreateIncr);
+  PerfettoDsSetOnDeleteIncr(ds_impl, OnDeleteIncr);
+  PerfettoDsSetOnClearIncr(ds_impl, OnClearIncr);
+  PerfettoDsSetCbUserArg(ds_impl, state.get());
+
+  // DataSourceDescriptor proto: field 1 (name) as a length-delimited string
+  // laid out as [tag][1-byte length][name bytes].
+  uint8_t desc[2 + 127];
+  uint8_t* p = desc;
+  *p++ = (1 << 3) | 2;  // field 1, wire type 2
+  *p++ = static_cast<uint8_t>(name_len);
+  memcpy(p, name_chars.c_str(), name_len);
+  p += name_len;
+  const size_t desc_size = static_cast<size_t>(p - desc);
+
+  const bool ok =
+      PerfettoDsImplRegister(ds_impl, &state->enabled_ptr, desc, desc_size);
+  if (!ok) {
+    __android_log_print(ANDROID_LOG_ERROR, LOG_TAG,
+                        "Failed to register data source: %s",
+                        name_chars.c_str());
+    // NOTE(review): ds_impl itself is not released on this path; no release
+    // call for it is used anywhere in this file.
+    env->DeleteGlobalRef(state->java_ds_global_ref);
+    return 0;
+  }
+
+  // From here on `state` is the callbacks' user_arg for as long as the data
+  // source stays registered, so it is intentionally never freed.
+  static_cast<void>(state.release());
+  return static_cast<jlong>(reinterpret_cast<uintptr_t>(ds_impl));
+}
+
+// Returns JNI_TRUE if the incremental state of at least one active instance
+// was (re)created or cleared since the previous call. Each such flag is reset
+// so every clear is reported once. No JNIEnv*/jclass parameters: this presumes
+// a @CriticalNative Java declaration (TODO confirm against the Java side).
+static jboolean nativeCheckAnyIncrementalStateCleared(jlong ds_ptr) {
+  auto* const impl =
+      reinterpret_cast<struct PerfettoDsImpl*>(static_cast<uintptr_t>(ds_ptr));
+  bool cleared = false;
+
+  for (struct PerfettoDsImplTracerIterator iter =
+           PerfettoDsImplTraceIterateBegin(impl);
+       iter.tracer != nullptr; PerfettoDsImplTraceIterateNext(impl, &iter)) {
+    auto* state = static_cast<IncrState*>(
+        PerfettoDsImplGetIncrementalState(impl, iter.tracer, iter.inst_id));
+    if (state == nullptr || !state->was_cleared) {
+      continue;
+    }
+    state->was_cleared = false;
+    cleared = true;
+  }
+
+  return cleared ? JNI_TRUE : JNI_FALSE;
+}
+
+// Writes one pre-encoded packet (the first `len` bytes of `buf`) to every
+// active instance of the data source through the C SDK stream writer.
+// If `len` exceeds the array length, nothing is written and the
+// ArrayIndexOutOfBoundsException raised by GetByteArrayRegion is left
+// pending for the Java caller.
+static void nativeWritePacketToAllInstances(JNIEnv* env,
+                                            jclass,
+                                            jlong ds_ptr,
+                                            jbyteArray buf,
+                                            jint len) {
+  if (buf == nullptr || len <= 0) {
+    return;
+  }
+
+  auto* ds_impl =
+      reinterpret_cast<struct PerfettoDsImpl*>(static_cast<uintptr_t>(ds_ptr));
+  const size_t size = static_cast<size_t>(len);
+
+  // Copy Java byte[] to a native buffer to avoid holding a JNI critical
+  // region across stream writer calls (which may involve IPC for chunk
+  // transitions). Small packets use the stack; larger ones use a heap buffer
+  // that is released automatically on every return path.
+  static constexpr size_t kStackBufSize = 4096;
+  uint8_t stack_buf[kStackBufSize];
+  std::unique_ptr<uint8_t[]> heap_buf;
+  uint8_t* data = stack_buf;
+  if (size > kStackBufSize) {
+    // Deliberately not zero-filled: it is fully overwritten just below.
+    heap_buf.reset(new uint8_t[size]);
+    data = heap_buf.get();
+  }
+
+  env->GetByteArrayRegion(buf, 0, len, reinterpret_cast<jbyte*>(data));
+  if (env->ExceptionCheck()) {
+    // Nothing was copied; writing `data` now would emit uninitialized bytes.
+    return;
+  }
+
+  // Iterate all active instances and write the packet to each.
+  struct PerfettoDsImplTracerIterator it =
+      PerfettoDsImplTraceIterateBegin(ds_impl);
+  while (it.tracer) {
+    struct PerfettoStreamWriter writer =
+        PerfettoDsTracerImplPacketBegin(it.tracer);
+    PerfettoStreamWriterAppendBytes(&writer, data, size);
+    PerfettoDsTracerImplPacketEnd(it.tracer, &writer);
+    PerfettoDsImplTraceIterateNext(ds_impl, &it);
+  }
+}
+
+// Requests a flush on every active instance of the data source. No completion
+// callback is supplied (callback and user arg are both null). No JNIEnv*/jclass
+// parameters: this presumes a @CriticalNative Java declaration (TODO confirm).
+static void nativeFlush(jlong ds_ptr) {
+  auto* const impl =
+      reinterpret_cast<struct PerfettoDsImpl*>(static_cast<uintptr_t>(ds_ptr));
+  for (struct PerfettoDsImplTracerIterator iter =
+           PerfettoDsImplTraceIterateBegin(impl);
+       iter.tracer != nullptr; PerfettoDsImplTraceIterateNext(impl, &iter)) {
+    PerfettoDsTracerImplFlush(iter.tracer, nullptr, nullptr);
+  }
+}
+
+// ============================================================================
+// JNI registration
+// ============================================================================
+
+// Native methods of dev.perfetto.sdk.PerfettoDataSource as
+// {name, JNI signature, implementation}.
+//
+// A class name embedded in a signature needs the same optional jarjar prefix
+// as the class passed to RegisterNatives. When PERFETTO_JNI_JARJAR_PREFIX is
+// set, the Java class is presumably renamed under that prefix, and an
+// unprefixed "Ldev/perfetto/sdk/PerfettoDataSource;" parameter type would not
+// match, making registration fail. This relies on TO_MAYBE_JAR_JAR_CLASS_NAME
+// expanding to a string literal (it is applied to a literal) so that adjacent
+// literals concatenate.
+static const JNINativeMethod gMethods[] = {
+    {"nativeRegister",
+     "(L" TO_MAYBE_JAR_JAR_CLASS_NAME(
+         "dev/perfetto/sdk/PerfettoDataSource") ";Ljava/lang/String;)J",
+     reinterpret_cast<void*>(nativeRegister)},
+    {"nativeCheckAnyIncrementalStateCleared", "(J)Z",
+     reinterpret_cast<void*>(nativeCheckAnyIncrementalStateCleared)},
+    {"nativeWritePacketToAllInstances", "(J[BI)V",
+     reinterpret_cast<void*>(nativeWritePacketToAllInstances)},
+    {"nativeFlush", "(J)V", reinterpret_cast<void*>(nativeFlush)},
+};
+
+// Binds gMethods to the (optionally jarjar-prefixed) Java class. A failed
+// registration is fatal; otherwise this always returns 0.
+int register_dev_perfetto_sdk_PerfettoDataSource(JNIEnv* env) {
+  const char* const class_name =
+      TO_MAYBE_JAR_JAR_CLASS_NAME("dev/perfetto/sdk/PerfettoDataSource");
+  const int status = jniRegisterNativeMethods(
+      env, class_name, gMethods, sizeof(gMethods) / sizeof(gMethods[0]));
+  LOG_ALWAYS_FATAL_IF(status < 0,
+                      "Unable to register PerfettoDataSource native methods.");
+  return 0;
+}
+
+}  // namespace jni
+}  // namespace perfetto
+
+// Library entry point called by the JVM when the library is loaded: binds the
+// PerfettoDataSource natives and reports the JNI version this library needs.
+JNIEXPORT jint JNI_OnLoad(JavaVM* vm, void* /* reserved */) {
+  JNIEnv* env = nullptr;
+  const jint status =
+      vm->GetEnv(reinterpret_cast<void**>(&env), JNI_VERSION_1_6);
+  if (status == JNI_OK) {
+    perfetto::jni::register_dev_perfetto_sdk_PerfettoDataSource(env);
+    return JNI_VERSION_1_6;
+  }
+  return JNI_ERR;
+}
diff --git a/src/java_datasource/jni/perfetto_datasource_jni.h b/src/java_datasource/jni/perfetto_datasource_jni.h
new file mode 100644
index 0000000..ba68fce
--- /dev/null
+++ b/src/java_datasource/jni/perfetto_datasource_jni.h
@@ -0,0 +1,30 @@
+/*
+ * Copyright (C) 2026 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_JAVA_DATASOURCE_JNI_PERFETTO_DATASOURCE_JNI_H_
+#define SRC_JAVA_DATASOURCE_JNI_PERFETTO_DATASOURCE_JNI_H_
+
+#include <jni.h>
+
+namespace perfetto::jni {
+
+// Registers the native methods of dev.perfetto.sdk.PerfettoDataSource
+// (presumably its jarjar-renamed equivalent in framework builds) on `env`.
+// Registration failure is fatal; on success the function returns 0.
+int register_dev_perfetto_sdk_PerfettoDataSource(JNIEnv* env);
+
+}  // namespace perfetto::jni
+
+#endif  // SRC_JAVA_DATASOURCE_JNI_PERFETTO_DATASOURCE_JNI_H_