Allow for fuse/free races in `upb_Arena`.

Implementation is by kfm@, I only added the portability code around it.
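
For reference, the portability code is the new `upb/port/atomic.h` wrapper header that the diff below includes and uses (`upb_Atomic_Init`, `upb_Atomic_LoadAcquire`, `upb_Atomic_CompareExchangeStrongAcqRel`, and friends).  The header itself is not part of this diff; as a rough sketch, and assuming a `UPB_USE_C11_ATOMICS` feature guard, the C11 branch presumably maps each wrapper onto `<stdatomic.h>` roughly like this:

```
// Hypothetical sketch of upb/port/atomic.h (not part of this diff).  Only the
// wrapper names are taken from the code below; the mapping to <stdatomic.h>
// and the feature guard are assumptions.
#ifdef UPB_USE_C11_ATOMICS

#include <stdatomic.h>

#define upb_Atomic_Init(addr, val) atomic_init(addr, val)
#define upb_Atomic_LoadAcquire(addr) \
  atomic_load_explicit(addr, memory_order_acquire)
#define upb_Atomic_StoreRelaxed(addr, val) \
  atomic_store_explicit(addr, val, memory_order_relaxed)
#define upb_Atomic_AddRelease(addr, val) \
  atomic_fetch_add_explicit(addr, val, memory_order_release)
#define upb_Atomic_SubRelease(addr, val) \
  atomic_fetch_sub_explicit(addr, val, memory_order_release)
#define upb_Atomic_ExchangeAcqRel(addr, val) \
  atomic_exchange_explicit(addr, val, memory_order_acq_rel)
#define upb_Atomic_CompareExchangeStrongAcqRel(addr, expected, desired) \
  atomic_compare_exchange_strong_explicit(                              \
      addr, expected, desired, memory_order_acq_rel, memory_order_acquire)

#else
// A fallback for toolchains without C11 atomics would go here; with plain
// (non-atomic) operations, fuse/free races would not actually be safe.
#endif
```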

`upb_Arena` was designed to be only thread-compatible.  However, fusing arenas muddies the waters somewhat, because two distinct `upb_Arena` objects end up sharing state once fused.  As a result, a call to `upb_Arena_Free(a)` can interfere with a concurrent `upb_Arena_Fuse(b, c)` if `a` and `b` were previously fused.

It turns out that we can use atomics to fix this with about a 35% regression in fuse performance (see below).  Arena create+free does not regress, thanks to special-case logic in Free().
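
The atomics all operate on a single `parent_or_count` word per arena, which replaces the old `parent` and `refcount` fields.  The helpers that tag and untag that word live in `upb/mem/arena_internal.h` and are not shown in this diff; judging from how they are used below (`_upb_Arena_TaggedFromRefcount(1)`, the `<< 1` refcount arithmetic in `upb_Arena_Fuse()`, the raw pointer exchange), the encoding is presumably something like:

```
// Sketch of the tagged encoding implied by the diff below; the real helpers
// live in upb/mem/arena_internal.h, which is not part of this change's hunk.
//
//   bit 0 == 1: refcount mode, count stored in the remaining bits
//   bit 0 == 0: parent-pointer mode (upb_Arena* is suitably aligned)

static bool _upb_Arena_IsTaggedRefcount(uintptr_t poc) { return (poc & 1) == 1; }
static bool _upb_Arena_IsTaggedPointer(uintptr_t poc) { return (poc & 1) == 0; }

static uintptr_t _upb_Arena_RefCountFromTagged(uintptr_t poc) {
  UPB_ASSERT(_upb_Arena_IsTaggedRefcount(poc));
  return poc >> 1;
}

static uintptr_t _upb_Arena_TaggedFromRefcount(uintptr_t refcount) {
  return (refcount << 1) | 1;
}

static upb_Arena* _upb_Arena_PointerFromTagged(uintptr_t poc) {
  UPB_ASSERT(_upb_Arena_IsTaggedPointer(poc));
  return (upb_Arena*)poc;
}

static uintptr_t _upb_Arena_TaggedFromPointer(upb_Arena* a) {
  uintptr_t poc = (uintptr_t)a;
  UPB_ASSERT(_upb_Arena_IsTaggedPointer(poc));
  return poc;
}
```

With an encoding like this, adding `refcount << 1` to a tagged refcount word (as `upb_Arena_Fuse()` does below) bumps the count without disturbing the tag bit, and the refcount==1 fast path in `upb_Arena_Free()` needs only an acquire load rather than an RMW.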

`upb_Arena` is still a thread-compatible type, and it is still never safe to call `upb_Arena_xxx(a)` and `upb_Arena_yyy(a)` in parallel.  However, you can now at least call `upb_Arena_Free(a)` and `upb_Arena_Fuse(b, c)` in parallel, even if `a` and `b` were previously fused.

Note that calling `upb_Arena_Fuse(a, b)` and `upb_Arena_Fuse(c, d)` in parallel is still not allowed if `b` and `c` were previously fused.  In practice this means that fuses must still be single-threaded within a single fused group.
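
As a concrete illustration, here is a minimal sketch of the pattern this change makes legal.  The thread scaffolding (C11 `<threads.h>`) is purely illustrative; only the `upb_Arena_*` calls come from the behavior described above:

```
#include <threads.h>

#include "upb/mem/arena.h"

static upb_Arena *a, *b, *c;

static int free_a(void* arg) {
  (void)arg;
  upb_Arena_Free(a);  // Races with the fuse on the other thread; now safe.
  return 0;
}

static int fuse_bc(void* arg) {
  (void)arg;
  upb_Arena_Fuse(b, c);  // Safe even though `a` and `b` share a fused group.
  return 0;
}

int main(void) {
  a = upb_Arena_New();
  b = upb_Arena_New();
  c = upb_Arena_New();
  upb_Arena_Fuse(a, b);  // Fuses within a group remain single-threaded.

  thrd_t t1, t2;
  thrd_create(&t1, free_a, NULL);
  thrd_create(&t2, fuse_bc, NULL);
  thrd_join(t1, NULL);
  thrd_join(t2, NULL);

  // Still NOT allowed: two upb_Arena_Fuse() calls racing on arenas that
  // already belong to the same fused group.

  upb_Arena_Free(b);
  upb_Arena_Free(c);
  return 0;
}
```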

Performance results:

```
name                                           old cpu/op  new cpu/op  delta
BM_ArenaOneAlloc                                 18.6ns ± 1%  18.6ns ± 1%     ~     (p=0.726 n=18+17)
BM_ArenaInitialBlockOneAlloc                     6.28ns ± 1%  5.73ns ± 1%   -8.68%  (p=0.000 n=17+20)
BM_ArenaFuseUnbalanced/2                         44.1ns ± 2%  60.4ns ± 1%  +37.05%  (p=0.000 n=18+19)
BM_ArenaFuseUnbalanced/8                          370ns ± 2%   500ns ± 1%  +35.12%  (p=0.000 n=19+20)
BM_ArenaFuseUnbalanced/64                        3.52µs ± 1%  4.71µs ± 1%  +33.80%  (p=0.000 n=18+19)
BM_ArenaFuseUnbalanced/128                       7.20µs ± 1%  9.72µs ± 2%  +34.93%  (p=0.000 n=16+19)
BM_ArenaFuseBalanced/2                           44.4ns ± 2%  61.4ns ± 1%  +38.23%  (p=0.000 n=20+17)
BM_ArenaFuseBalanced/8                            373ns ± 2%   509ns ± 1%  +36.57%  (p=0.000 n=19+17)
BM_ArenaFuseBalanced/64                          3.55µs ± 2%  4.79µs ± 1%  +34.80%  (p=0.000 n=19+19)
BM_ArenaFuseBalanced/128                         7.26µs ± 1%  9.76µs ± 1%  +34.45%  (p=0.000 n=17+19)
BM_LoadAdsDescriptor_Upb<NoLayout>               5.66ms ± 1%  5.69ms ± 1%   +0.57%  (p=0.013 n=18+20)
BM_LoadAdsDescriptor_Upb<WithLayout>             6.30ms ± 1%  6.36ms ± 1%   +0.90%  (p=0.000 n=19+18)
BM_LoadAdsDescriptor_Proto2<NoLayout>            12.1ms ± 1%  12.1ms ± 1%     ~     (p=0.118 n=18+18)
BM_LoadAdsDescriptor_Proto2<WithLayout>          12.2ms ± 1%  12.3ms ± 1%   +0.50%  (p=0.006 n=18+18)
BM_Parse_Upb_FileDesc<UseArena, Copy>            12.7µs ± 1%  12.7µs ± 1%     ~     (p=0.194 n=20+19)
BM_Parse_Upb_FileDesc<UseArena, Alias>           11.6µs ± 1%  11.6µs ± 1%     ~     (p=0.192 n=20+20)
BM_Parse_Upb_FileDesc<InitBlock, Copy>           12.5µs ± 1%  12.5µs ± 0%     ~     (p=0.750 n=18+14)
BM_Parse_Upb_FileDesc<InitBlock, Alias>          11.4µs ± 1%  11.3µs ± 1%   -0.34%  (p=0.046 n=19+19)
BM_Parse_Proto2<FileDesc, NoArena, Copy>         25.4µs ± 1%  25.7µs ± 2%   +1.37%  (p=0.000 n=18+18)
BM_Parse_Proto2<FileDesc, UseArena, Copy>        12.1µs ± 2%  12.1µs ± 1%     ~     (p=0.143 n=18+18)
BM_Parse_Proto2<FileDesc, InitBlock, Copy>       11.9µs ± 3%  11.9µs ± 1%     ~     (p=0.076 n=17+19)
BM_Parse_Proto2<FileDescSV, InitBlock, Alias>    13.2µs ± 1%  13.2µs ± 1%     ~     (p=0.053 n=19+19)
BM_SerializeDescriptor_Proto2                    5.97µs ± 4%  5.90µs ± 4%     ~     (p=0.093 n=17+19)
BM_SerializeDescriptor_Upb                       10.4µs ± 1%  10.4µs ± 1%     ~     (p=0.909 n=17+18)

name                                           old time/op             new time/op             delta
BM_ArenaOneAlloc                                 18.7ns ± 2%             18.6ns ± 0%     ~           (p=0.607 n=18+17)
BM_ArenaInitialBlockOneAlloc                     6.29ns ± 1%             5.74ns ± 1%   -8.71%        (p=0.000 n=17+19)
BM_ArenaFuseUnbalanced/2                         44.1ns ± 1%             60.6ns ± 1%  +37.21%        (p=0.000 n=17+19)
BM_ArenaFuseUnbalanced/8                          371ns ± 2%              500ns ± 1%  +35.02%        (p=0.000 n=19+16)
BM_ArenaFuseUnbalanced/64                        3.53µs ± 1%             4.72µs ± 1%  +33.85%        (p=0.000 n=18+19)
BM_ArenaFuseUnbalanced/128                       7.22µs ± 1%             9.73µs ± 2%  +34.87%        (p=0.000 n=16+19)
BM_ArenaFuseBalanced/2                           44.5ns ± 2%             61.5ns ± 1%  +38.22%        (p=0.000 n=20+17)
BM_ArenaFuseBalanced/8                            373ns ± 2%              510ns ± 1%  +36.58%        (p=0.000 n=19+16)
BM_ArenaFuseBalanced/64                          3.56µs ± 2%             4.80µs ± 1%  +34.87%        (p=0.000 n=19+19)
BM_ArenaFuseBalanced/128                         7.27µs ± 1%             9.77µs ± 1%  +34.40%        (p=0.000 n=17+19)
BM_LoadAdsDescriptor_Upb<NoLayout>               5.67ms ± 1%             5.71ms ± 1%   +0.60%        (p=0.011 n=18+20)
BM_LoadAdsDescriptor_Upb<WithLayout>             6.32ms ± 1%             6.37ms ± 1%   +0.87%        (p=0.000 n=19+18)
BM_LoadAdsDescriptor_Proto2<NoLayout>            12.1ms ± 1%             12.2ms ± 1%     ~           (p=0.126 n=18+19)
BM_LoadAdsDescriptor_Proto2<WithLayout>          12.2ms ± 1%             12.3ms ± 1%   +0.51%        (p=0.002 n=18+18)
BM_Parse_Upb_FileDesc<UseArena, Copy>            12.7µs ± 1%             12.7µs ± 1%     ~           (p=0.149 n=20+19)
BM_Parse_Upb_FileDesc<UseArena, Alias>           11.6µs ± 1%             11.6µs ± 1%     ~           (p=0.211 n=20+20)
BM_Parse_Upb_FileDesc<InitBlock, Copy>           12.5µs ± 1%             12.5µs ± 1%     ~           (p=0.986 n=18+15)
BM_Parse_Upb_FileDesc<InitBlock, Alias>          11.4µs ± 1%             11.3µs ± 1%     ~           (p=0.081 n=19+18)
BM_Parse_Proto2<FileDesc, NoArena, Copy>         25.4µs ± 1%             25.8µs ± 2%   +1.41%        (p=0.000 n=18+18)
BM_Parse_Proto2<FileDesc, UseArena, Copy>        12.1µs ± 2%             12.1µs ± 1%     ~           (p=0.558 n=19+18)
BM_Parse_Proto2<FileDesc, InitBlock, Copy>       12.0µs ± 3%             11.9µs ± 1%     ~           (p=0.165 n=17+19)
BM_Parse_Proto2<FileDescSV, InitBlock, Alias>    13.2µs ± 1%             13.2µs ± 1%     ~           (p=0.070 n=19+19)
BM_SerializeDescriptor_Proto2                    5.98µs ± 4%             5.92µs ± 3%     ~           (p=0.138 n=17+19)
BM_SerializeDescriptor_Upb                       10.4µs ± 1%             10.4µs ± 1%     ~           (p=0.858 n=17+18)
```

PiperOrigin-RevId: 518573683
diff --git a/upb/mem/arena.c b/upb/mem/arena.c
index 23124a6..4161cb0 100644
--- a/upb/mem/arena.c
+++ b/upb/mem/arena.c
@@ -26,6 +26,7 @@
  */
 
 #include "upb/mem/arena_internal.h"
+#include "upb/port/atomic.h"
 
 // Must be last.
 #include "upb/port/def.inc"
@@ -58,19 +59,42 @@
 static const size_t memblock_reserve =
     UPB_ALIGN_UP(sizeof(_upb_MemBlock), UPB_MALLOC_ALIGN);
 
-static upb_Arena* arena_findroot(upb_Arena* a) {
-  /* Path splitting keeps time complexity down, see:
-   *   https://en.wikipedia.org/wiki/Disjoint-set_data_structure */
-  while (a->parent != a) {
-    upb_Arena* next = a->parent;
-    a->parent = next->parent;
+static upb_Arena* _upb_Arena_FindRoot(upb_Arena* a) {
+  uintptr_t poc = upb_Atomic_LoadAcquire(&a->parent_or_count);
+  while (_upb_Arena_IsTaggedPointer(poc)) {
+    upb_Arena* next = _upb_Arena_PointerFromTagged(poc);
+    uintptr_t next_poc = upb_Atomic_LoadAcquire(&next->parent_or_count);
+
+    if (_upb_Arena_IsTaggedPointer(next_poc)) {
+      // To keep complexity down, we lazily collapse levels of the tree.  This
+      // keeps it flat in the final case, but doesn't cost much incrementally.
+      //
+      // Path splitting keeps time complexity down, see:
+      //   https://en.wikipedia.org/wiki/Disjoint-set_data_structure
+      //
+      // We can safely use a relaxed atomic here because all threads doing this
+      // will converge on the same value and we don't need memory orderings to
+      // be visible.
+      //
+      // This is true because:
+      // - If no fuses occur, this will eventually become the root.
+      // - If fuses are actively occurring, the root may change, but the
+      //   invariant is that `parent_or_count` merely points to *a* parent.
+      //
+      // In other words, it is moving towards "the" root, and that root may move
+      // further away over time, but the path towards that root will continue to
+      // be valid and the creation of the path carries all the memory orderings
+      // required.
+      upb_Atomic_StoreRelaxed(&a->parent_or_count, next_poc);
+    }
     a = next;
+    poc = next_poc;
   }
   return a;
 }
 
 size_t upb_Arena_SpaceAllocated(upb_Arena* arena) {
-  arena = arena_findroot(arena);
+  arena = _upb_Arena_FindRoot(arena);
   size_t memsize = 0;
 
   _upb_MemBlock* block = arena->freelist;
@@ -83,8 +107,15 @@
   return memsize;
 }
 
-uint32_t upb_Arena_DebugRefCount(upb_Arena* arena) {
-  return arena_findroot(arena)->refcount;
+uint32_t upb_Arena_DebugRefCount(upb_Arena* a) {
+  // These loads could probably be relaxed, but given that this is debug-only,
+  // it's not worth introducing a new variant for it.
+  uintptr_t poc = upb_Atomic_LoadAcquire(&a->parent_or_count);
+  while (_upb_Arena_IsTaggedPointer(poc)) {
+    a = _upb_Arena_PointerFromTagged(poc);
+    poc = upb_Atomic_LoadAcquire(&a->parent_or_count);
+  }
+  return _upb_Arena_RefCountFromTagged(poc);
 }
 
 static void upb_Arena_addblock(upb_Arena* a, upb_Arena* root, void* ptr,
@@ -108,7 +139,7 @@
 }
 
 static bool upb_Arena_Allocblock(upb_Arena* a, size_t size) {
-  upb_Arena* root = arena_findroot(a);
+  upb_Arena* root = _upb_Arena_FindRoot(a);
   size_t block_size = UPB_MAX(size, a->last_size * 2) + memblock_reserve;
   _upb_MemBlock* block = upb_malloc(root->block_alloc, block_size);
 
@@ -139,8 +170,7 @@
   n -= sizeof(*a);
 
   a->block_alloc = alloc;
-  a->parent = a;
-  a->refcount = 1;
+  upb_Atomic_Init(&a->parent_or_count, _upb_Arena_TaggedFromRefcount(1));
   a->freelist = NULL;
   a->freelist_tail = NULL;
   a->cleanup_metadata = upb_cleanup_metadata(NULL, false);
@@ -172,8 +202,7 @@
   a = UPB_PTR_AT(mem, n - sizeof(*a), upb_Arena);
 
   a->block_alloc = alloc;
-  a->parent = a;
-  a->refcount = 1;
+  upb_Atomic_Init(&a->parent_or_count, _upb_Arena_TaggedFromRefcount(1));
   a->last_size = UPB_MAX(128, n);
   a->head.ptr = mem;
   a->head.end = UPB_PTR_AT(mem, n - sizeof(*a), char);
@@ -186,8 +215,7 @@
 
 static void arena_dofree(upb_Arena* a) {
   _upb_MemBlock* block = a->freelist;
-  UPB_ASSERT(a->parent == a);
-  UPB_ASSERT(a->refcount == 0);
+  UPB_ASSERT(_upb_Arena_RefCountFromTagged(a->parent_or_count) == 1);
 
   while (block) {
     /* Load first since we are deleting block. */
@@ -208,8 +236,32 @@
 }
 
 void upb_Arena_Free(upb_Arena* a) {
-  a = arena_findroot(a);
-  if (--a->refcount == 0) arena_dofree(a);
+  uintptr_t poc = upb_Atomic_LoadAcquire(&a->parent_or_count);
+retry:
+  while (_upb_Arena_IsTaggedPointer(poc)) {
+    a = _upb_Arena_PointerFromTagged(poc);
+    poc = upb_Atomic_LoadAcquire(&a->parent_or_count);
+  }
+
+  // compare_exchange or fetch_sub are RMW operations, which are more
+  // expensive than direct loads.  As an optimization, we only do RMW ops
+  // when we need to update things for other threads to see.
+  if (poc == _upb_Arena_TaggedFromRefcount(1)) {
+    arena_dofree(a);
+    return;
+  }
+
+  if (upb_Atomic_CompareExchangeStrongAcqRel(
+          &a->parent_or_count, &poc,
+          _upb_Arena_TaggedFromRefcount(_upb_Arena_RefCountFromTagged(poc) -
+                                        1))) {
+    // We were >1 and we decremented it successfully, so we are done.
+    return;
+  }
+
+  // We failed our update, so someone else changed the value; retry the whole
+  // process (the failed exchange already reloaded `poc` for us).
+  goto retry;
 }
 
 bool upb_Arena_AddCleanup(upb_Arena* a, void* ud, upb_CleanupFunc* func) {
@@ -234,33 +286,80 @@
 }
 
 bool upb_Arena_Fuse(upb_Arena* a1, upb_Arena* a2) {
-  upb_Arena* r1 = arena_findroot(a1);
-  upb_Arena* r2 = arena_findroot(a2);
+  // SAFE IN THE PRESENCE OF FUSE/FREE RACES BUT NOT IN THE
+  // PRESENCE OF FUSE/FUSE RACES!!!
+  //
+  // `parent_or_count` has two distinct modes:
+  // -  parent pointer mode
+  // -  refcount mode
+  //
+  // In parent pointer mode, it may change what pointer it refers to in the
+  // tree, but it will always approach a root.  Any operation that walks the
+  // tree to the root may collapse levels of the tree concurrently.
+  //
+  // In refcount mode, any free operation may lower the refcount.
+  //
+  // Only a fuse operation may increase the refcount.
+  // Only a fuse operation may switch `parent_or_count` from parent mode to
+  // refcount mode.
+  //
+  // Given that we do not allow fuse/fuse races, we may rely on the invariant
+  // that only refcounts can change once we have found the root.  Because the
+  // threads doing the fuse must hold references, we can guarantee that no
+  // refcounts will reach zero concurrently.
+  upb_Arena* r1 = _upb_Arena_FindRoot(a1);
+  upb_Arena* r2 = _upb_Arena_FindRoot(a2);
 
-  if (r1 == r2) return true; /* Already fused. */
+  if (r1 == r2) return true;  // Already fused.
 
-  /* Do not fuse initial blocks since we cannot lifetime extend them. */
+  // Do not fuse initial blocks since we cannot lifetime extend them.
   if (upb_cleanup_has_initial_block(r1->cleanup_metadata)) return false;
   if (upb_cleanup_has_initial_block(r2->cleanup_metadata)) return false;
 
-  /* Only allow fuse with a common allocator */
+  // Only allow fuse with a common allocator
   if (r1->block_alloc != r2->block_alloc) return false;
 
-  /* We want to join the smaller tree to the larger tree.
-   * So swap first if they are backwards. */
-  if (r1->refcount < r2->refcount) {
+  uintptr_t r1_poc = upb_Atomic_LoadAcquire(&r1->parent_or_count);
+  uintptr_t r2_poc = upb_Atomic_LoadAcquire(&r2->parent_or_count);
+  UPB_ASSERT(_upb_Arena_IsTaggedRefcount(r1_poc));
+  UPB_ASSERT(_upb_Arena_IsTaggedRefcount(r2_poc));
+
+  // Keep the tree shallow by joining the smaller tree to the larger.
+  if (_upb_Arena_RefCountFromTagged(r1_poc) <
+      _upb_Arena_RefCountFromTagged(r2_poc)) {
     upb_Arena* tmp = r1;
     r1 = r2;
     r2 = tmp;
+
+    uintptr_t tmp_poc = r1_poc;
+    r1_poc = r2_poc;
+    r2_poc = tmp_poc;
   }
 
-  /* r1 takes over r2's freelist and refcount. */
-  r1->refcount += r2->refcount;
+  // r1 takes over r2's freelist; this must happen before we update
+  // refcounts, since the refcount carries the memory dependencies.
   if (r2->freelist_tail) {
     UPB_ASSERT(r2->freelist_tail->next == NULL);
     r2->freelist_tail->next = r1->freelist;
     r1->freelist = r2->freelist;
   }
-  r2->parent = r1;
+
+  // The moment we install `r1` as the parent for `r2`, all racing frees may
+  // immediately begin decrementing `r1`'s refcount.  So we must install all the
+  // refcounts that we know about first to prevent a premature unref to zero.
+  uint32_t r2_refcount = _upb_Arena_RefCountFromTagged(r2_poc);
+  upb_Atomic_AddRelease(&r1->parent_or_count, ((uintptr_t)r2_refcount) << 1);
+
+  // When installing `r1` as the parent for `r2`, racing frees may have changed
+  // the refcount for `r2`, so we need to capture the old value to fix up `r1`'s
+  // refcount based on the delta from what we saw the first time.
+  r2_poc = upb_Atomic_ExchangeAcqRel(&r2->parent_or_count,
+                                     _upb_Arena_TaggedFromPointer(r1));
+  UPB_ASSERT(_upb_Arena_IsTaggedRefcount(r2_poc));
+  uint32_t delta_refcount = r2_refcount - _upb_Arena_RefCountFromTagged(r2_poc);
+  if (delta_refcount != 0) {
+    upb_Atomic_SubRelease(&r1->parent_or_count, ((uintptr_t)delta_refcount)
+                                                    << 1);
+  }
   return true;
 }