[improve][broker] PIP-473: MetadataTransactionBuffer for segment topics #25768

Merged: merlimat merged 3 commits into apache:master from merlimat:mmerli/pip-473-txn-buffer on May 14, 2026

@merlimat

Summary

P3 of the metadata-driven transactions stack (PIP-473), building on the data layer landed in #25754. Adds a TransactionBuffer implementation for segment:// topics that reads truth from the metadata-store layout rather than from a per-topic snapshot log.

  • MetadataTransactionBuffer — header-cache + per-segment first-position tracking for maxReadPosition. appendBufferToTxn does a cache-first header authorization, then ML append, then a /txn-op write; all three must succeed before the producer is ack'd. State transitions are driven by /txn-segment-events notifications — on each event we re-read headers for every currently-open txn (K is small, so this is cheap and inherently tolerates subscribeSequence's collapse semantics).
  • Recovery rebuilds state from listWritesBySegment + per-txn header reads ("Option C" from the impl plan): no persisted event offset, no per-event replay. Cost bounded by active load on the segment, not by history.
  • MetadataTransactionBufferProvider constructs the buffer from the broker's local MetadataStore.
  • DispatchingTransactionBufferProvider routes segment:// topics to the metadata buffer and falls through to TopicTransactionBuffer for the legacy persistent:// path. This is now the configured default so segment topics pick up the new buffer out of the box.
  • Helpers shared with P4 / P5: TxnIds (<most>-<least> round-trip for TxnID), TxnPaths.txnIdFromOpPath, TxnMetadataStore.deleteWriteOpsForSegmentAndTxn.
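
The three-step publish path described for appendBufferToTxn can be sketched as a chain of futures. This is a minimal illustration, not the PR's code: the class name `AppendFlowSketch`, the `TxnState` enum, and the three functional parameters are hypothetical stand-ins for the cache-first header read, the managed-ledger append, and the /txn-op write.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical sketch of the three-step publish path; names are assumptions. */
final class AppendFlowSketch {
    enum TxnState { OPEN, COMMITTED, ABORTED }

    /**
     * Runs header authorization, then the ledger append, then the op-record write.
     * The returned future completes normally only if all three steps succeed,
     * which is the point at which the producer could be acked.
     */
    static CompletableFuture<Long> appendBufferToTxn(
            Supplier<CompletableFuture<TxnState>> readHeader,        // stand-in: cache-first header read
            Supplier<CompletableFuture<Long>> appendToLedger,         // stand-in: ML append, yields a position
            Function<Long, CompletableFuture<Void>> writeOpRecord) {  // stand-in: /txn-op write
        CompletableFuture<Long> appended = readHeader.get().thenCompose(state -> {
            if (state != TxnState.OPEN) {
                // A terminal header rejects the publish before anything is appended.
                return CompletableFuture.<Long>failedFuture(
                        new IllegalStateException("Transaction is " + state));
            }
            return appendToLedger.get();
        });
        // The op record is written only after the append has produced a position.
        return appended.thenCompose(position ->
                writeOpRecord.apply(position).thenApply(ignored -> position));
    }
}
```

A failure in any step propagates through the chain, so later steps are skipped and the caller sees a single failed future.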

Test plan

  • :pulsar-broker:test --tests MetadataTransactionBufferTest — 7 scenarios on the in-memory MetadataStore: open/commit/abort, concurrent open txns, TxnConflict on terminal header, recovery from pre-populated metadata, isTxnAborted on unknown txn
  • :pulsar-broker:test --tests DispatchingTransactionBufferProviderTest — 2 routing tests covering segment:// and persistent://
  • :pulsar-broker:checkstyleMain + :pulsar-broker:checkstyleTest clean
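
The routing behavior that the two dispatcher tests exercise might look roughly like the following. This is a sketch under assumptions: `DispatchSketch` and its factory parameters are hypothetical, and matching on the `segment://` string prefix is a simplification (the real provider may inspect the parsed topic name instead).

```java
import java.util.function.Function;

/** Hypothetical sketch of scheme-based routing between two buffer factories. */
final class DispatchSketch {
    /**
     * Routes segment:// topics to the metadata-driven factory and lets every
     * other topic fall through to the legacy factory.
     */
    static <B> B newBuffer(String topicName,
                           Function<String, B> metadataFactory,
                           Function<String, B> legacyFactory) {
        if (topicName.startsWith("segment://")) {
            return metadataFactory.apply(topicName);
        }
        return legacyFactory.apply(topicName);
    }
}
```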

P3 of the metadata-driven transactions stack. Adds a TransactionBuffer
implementation for segment:// topics that reads truth from the metadata-
store layout shipped in P2.

- MetadataTransactionBuffer: header-cache + per-segment first-position
  tracking for maxReadPosition. appendBufferToTxn does a cache-first
  header authorization, then ML append, then a /txn-op write; all three
  must succeed before the producer is ack'd. State transitions are
  driven by /txn-segment-events notifications — on each event we
  re-read headers for every currently-open txn (cheap, K is small), so
  the design inherently tolerates subscribeSequence's collapse semantics.
- Recovery rebuilds in-memory state from listWritesBySegment + header
  reads (Option C from the plan): no persisted event offset, no per-event
  replay; cost bounded by active load on the segment, not by history.
- MetadataTransactionBufferProvider builds the buffer from the broker's
  local MetadataStore.
- DispatchingTransactionBufferProvider routes segment:// topics to the
  metadata buffer and falls through to TopicTransactionBuffer for the
  legacy persistent:// path. This is now the configured default so
  segment topics pick up the new buffer out of the box.
- TxnIds helper (<most>-<least> round-trip) + TxnPaths.txnIdFromOpPath
  + TxnMetadataStore.deleteWriteOpsForSegmentAndTxn cover what the TB
  needs from the metadata layer; the same helpers will be reused by P4
  (PendingAckStore) and P5 (TC v5).

Tests: 7 MetadataTransactionBuffer scenarios (open/commit/abort,
concurrent open txns, TxnConflict on terminal header, recovery from
pre-populated metadata, isTxnAborted on unknown txn) + 2 routing tests
for the dispatcher. All run against the in-memory MetadataStore via
mocked PersistentTopic + ManagedLedger.
@lhotari

lhotari commented May 13, 2026

Local review findings from Claude Code — not a substitute for human review, but flagging for the author's consideration.

1. [BUG] TxnIds.fromKey cannot round-trip a negative mostSigBits (TxnIds.java)

toKey(new TxnID(-1, 1)) produces "-1-1". fromKey then runs key.indexOf('-') → returns 0 (the leading sign), trips the dash <= 0 guard, and throws IllegalArgumentException. Same for Long.MIN_VALUE mostSigBits. The javadoc claim "round-trips losslessly" is false. In current Pulsar code TxnID mostSigBits is normally non-negative, so this is latent — but the contract should match reality. The companion TxnPaths.txnIdFromOpPath already uses lastIndexOf('-'), so the on-disk encoding is consistent; only fromKey is wrong.
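
The failure is easy to reproduce in isolation. The sketch below is a stand-alone illustration, not the code in TxnIds.java: `TxnKeySketch` and its method names are hypothetical, and it works on raw `long` pairs rather than `TxnID`. It contrasts a first-dash split with an underscore separator, which is the separator the follow-up commit on this PR says it adopted.

```java
/** Hypothetical stand-alone reproduction of the key round-trip problem. */
final class TxnKeySketch {
    /** Encoding described in the PR summary: <most>-<least>. */
    static String toKey(long most, long least) {
        return most + "-" + least;
    }

    /** Splits at the first dash, so a leading minus sign is mistaken for the separator. */
    static long[] fromKeyFirstDash(String key) {
        int dash = key.indexOf('-');
        if (dash <= 0) {
            throw new IllegalArgumentException("Malformed txn key: " + key);
        }
        return new long[] {
            Long.parseLong(key.substring(0, dash)),
            Long.parseLong(key.substring(dash + 1))
        };
    }

    /** Alternative encoding: '_' never appears in the decimal form of a long. */
    static String toKeyUnderscore(long most, long least) {
        return most + "_" + least;
    }

    static long[] fromKeyUnderscore(String key) {
        int sep = key.indexOf('_');
        if (sep <= 0 || sep == key.length() - 1) {
            throw new IllegalArgumentException("Malformed txn key: " + key);
        }
        // Long.parseLong rejects any stray '_' left in either half.
        return new long[] {
            Long.parseLong(key.substring(0, sep)),
            Long.parseLong(key.substring(sep + 1))
        };
    }
}
```

With this sketch, `fromKeyFirstDash(toKey(-1, 1))` throws `IllegalArgumentException`, while the underscore variant round-trips any pair of longs, including `Long.MIN_VALUE`.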

2. [BUG] recover() leaks the segment-events subscription on the failure path — MetadataTransactionBuffer.java:115-167

subscribeSegmentEvents is called first and subscription is set. If the subsequent listWritesBySegment or per-txn header reads fail, recoveryFuture is completed exceptionally and the method returns without closing subscription. Nothing else closes it until/unless closeAsync() is later invoked — but a TB whose recovery failed may never get a clean shutdown call, and the listener stays wired to the metadata store until then. Close subscription in the error branch of the terminal whenComplete.
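
One way to structure the suggested fix is shown below. It is a sketch, assuming the subscription can be modelled as an `AutoCloseable` and the scan as a single future; `RecoverySketch` and its parameters are hypothetical and do not mirror the real method signatures.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical sketch: release the event subscription when recovery fails. */
final class RecoverySketch {
    private final CompletableFuture<Void> recoveryFuture = new CompletableFuture<>();
    private volatile AutoCloseable subscription;

    CompletableFuture<Void> recover(Supplier<AutoCloseable> subscribe,
                                    Supplier<CompletableFuture<Void>> scan) {
        // Subscribe first, as the PR does, so no event is missed during the scan.
        subscription = subscribe.get();
        scan.get().whenComplete((ignored, error) -> {
            if (error != null) {
                // Failure path: unhook the listener before reporting the error,
                // because closeAsync() may never be called on this buffer.
                closeQuietly(subscription);
                subscription = null;
                recoveryFuture.completeExceptionally(error);
            } else {
                recoveryFuture.complete(null);
            }
        });
        return recoveryFuture;
    }

    private static void closeQuietly(AutoCloseable closeable) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        } catch (Exception e) {
            // Best effort: the recovery error is the one worth surfacing.
        }
    }
}
```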

3. [QUALITY] txns map grows unbounded — MetadataTransactionBuffer.java:94, 286-308

Terminal txns are never removed from txns; their firstPosition is nulled but the entry stays in the map for the life of the topic. cleanupOpRecords only deletes the on-disk /txn-op/... records. For a long-running segment with high txn turnover this is an in-memory leak. Either evict from txns once cleanupOpRecords completes, or cap the cache. If you evict, isTxnAborted would then start returning true for evicted txns (the "unknown → aborted" default) — that matches what readers should do after physical cleanup, but it should be tested.
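
The interaction between eviction and the "unknown → aborted" default can be seen in a few lines. This is an illustrative sketch only: `TxnCacheSketch`, its string keys, and its methods are hypothetical and much simpler than the real txns map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of a txn-state cache with an "unknown means aborted" default. */
final class TxnCacheSketch {
    enum TxnState { OPEN, COMMITTED, ABORTED }

    private final Map<String, TxnState> txns = new ConcurrentHashMap<>();

    void put(String txnKey, TxnState state) {
        txns.put(txnKey, state);
    }

    /** Drops the cached entry, for example after the on-disk op records are cleaned up. */
    void evict(String txnKey) {
        txns.remove(txnKey);
    }

    /** A txn with no cached entry is reported as aborted. */
    boolean isTxnAborted(String txnKey) {
        TxnState state = txns.get(txnKey);
        return state == null || state == TxnState.ABORTED;
    }
}
```

In this sketch a COMMITTED txn answers `false` before eviction and `true` afterwards, which is the behavior change any eviction policy would need a test for.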

4. [QUALITY] Events fired during recovery are dropped, with no post-recovery rescan — MetadataTransactionBuffer.java:115-167, 262

subscribeSegmentEvents is set up before the scan, but triggerReconcile short-circuits while !recoveryFuture.isDone(). Any event fired between subscription start and recovery completion is silently ignored. If no further event arrives for an affected txn, its cached state stays at the recovery-snapshot value (potentially OPEN when the header has already moved to terminal). The design intent (events as wake-ups, header as truth) relies on a strong "any event will re-read all open headers", but that does not protect against a quiet txn whose only event landed in the recovery window. Either queue events arriving during recovery and replay them after recoveryFuture completes, or do an unconditional reconcile pass once recovery finishes.
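
The second option, an unconditional reconcile pass after recovery, can be sketched as follows. The class `ReconcileSketch` and the pass counter are hypothetical; a real reconcile would re-read the headers of the open txns rather than bump a counter.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: events during recovery are dropped, so reconcile once afterwards. */
final class ReconcileSketch {
    final CompletableFuture<Void> recoveryFuture = new CompletableFuture<>();
    final AtomicInteger reconcilePasses = new AtomicInteger();

    ReconcileSketch() {
        // Runs only when recovery completes normally; covers any event that
        // arrived while triggerReconcile was still short-circuiting.
        recoveryFuture.thenRun(this::reconcile);
    }

    /** Called by the segment-events listener. */
    void onSegmentEvent() {
        triggerReconcile();
    }

    void triggerReconcile() {
        if (!recoveryFuture.isDone()) {
            return; // recovery in progress: the event is ignored
        }
        reconcile();
    }

    private void reconcile() {
        // Stand-in for "re-read headers for every currently-open txn".
        reconcilePasses.incrementAndGet();
    }
}
```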

5. [QUALITY] recordOp can mutate state on a concurrently-reconciled terminal txn — MetadataTransactionBuffer.java:240-257

Between the cache-first header check and the lock re-acquisition in recordOp, a reconcile may have flipped the entry to a terminal state with firstPosition = null. The current code then does entry.firstPosition = position (because firstPosition == null), mutating a terminal entry. recomputeMaxReadPositionLocked ignores non-OPEN entries so it is benign for read-position correctness today, but the state is misleading. Worse, the entry == null branch adds the txn back as OPEN with firstPosition = position — that genuinely undoes a reconcile decision (though that branch only triggers if some path actually removes entries, which today nothing does — see also finding 3). Guard the mutation on entry.state == OPEN.
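
A guarded recordOp could look like the sketch below. The names (`RecordOpSketch`, `Entry`, `markTerminal`) are hypothetical, a single intrinsic lock stands in for whatever locking the real class uses, and the sketch keeps the reviewed behavior of creating an OPEN entry for a txn it has not seen before.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: only OPEN entries may have their first position recorded. */
final class RecordOpSketch {
    enum TxnState { OPEN, COMMITTED, ABORTED }

    static final class Entry {
        TxnState state = TxnState.OPEN;
        Long firstPosition; // null until the first write, and again once terminal
    }

    private final Map<String, Entry> txns = new HashMap<>();

    /** Stand-in for a reconcile that observed a terminal header. */
    synchronized void markTerminal(String txnKey, TxnState terminal) {
        Entry entry = txns.get(txnKey);
        if (entry != null) {
            entry.state = terminal;
            entry.firstPosition = null;
        }
    }

    /** Returns false, without mutating anything, if the txn is no longer OPEN. */
    synchronized boolean recordOp(String txnKey, long position) {
        Entry entry = txns.computeIfAbsent(txnKey, ignored -> new Entry());
        if (entry.state != TxnState.OPEN) {
            return false;
        }
        if (entry.firstPosition == null) {
            entry.firstPosition = position;
        }
        return true;
    }

    synchronized Long firstPosition(String txnKey) {
        Entry entry = txns.get(txnKey);
        return entry == null ? null : entry.firstPosition;
    }
}
```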

6. [QUALITY] TOCTOU between header authorization and ML append — MetadataTransactionBuffer.java:197-205

The cache-first header read happens before appendToLedger. Between them, the TC may commit or abort the txn. The entry is still written to the managed ledger, and a /txn-op record is then written too. On COMMITTED, the spurious write becomes visible since isTxnAborted returns false. On ABORTED, it is filtered. The legacy TopicTransactionBuffer presumably has a similar window so this may be accepted as a TC-ordering concern, but a sentence in the class javadoc on what the TC must guarantee here would help future maintainers.

@lhotari lhotari left a comment

LGTM, please check local Claude Code review findings

merlimat added 2 commits May 13, 2026 13:45
…eak, etc.

Findings 1-6 from local Claude Code review on PR apache#25768 (@lhotari):

1. TxnIds.fromKey round-tripped on negative mostSigBits incorrectly because
   indexOf('-') hits the leading sign. Replace the separator with '_' (Java's
   Long.parseLong rejects underscores, so the split is unambiguous for any
   pair of long values — positive, negative, or MIN/MAX). +7 unit tests
   cover the matrix and malformed-input rejection.

2. recover() now closes the segment-event subscription if the scan or any
   header read fails — previously, a recovery error left the listener wired
   to the metadata store with no path to clean it up (closeAsync may never
   be called on a TB that never recovered).

3. After recoveryFuture completes successfully, do an explicit reconcile
   pass. The listener is registered before the scan to avoid a race with
   events that fire mid-recovery, but triggerReconcile short-circuits while
   recovery is in progress — so a state change whose only notification
   landed in that window would never get picked up otherwise.

4. recordOp now guards the firstPosition update on entry.state == OPEN. A
   concurrent reconcile that flipped the entry to terminal between the
   cache-first authorization read and recordOp shouldn't get clobbered
   back to OPEN.

5. Class javadoc documents (a) the publish-side TOCTOU window between
   header authorization and ML append, and the TC ordering it relies on,
   and (b) the txns-map growth tradeoff (cache pruning is deferred to
   P5/P6 — evicting committed entries would break isTxnAborted for valid
   messages).

V5TransactionTest.testAbortMovesTransactionToAbortedState was failing with
"Transaction is already ABORTED — TxnConflict" because the v4 transaction
coordinator (still in use today; the v5 TC arrives in P5) stores txn state
in MLTransactionMetadataStore, not at /txn/<txnId>. MetadataTransactionBuffer's
cache-first header read then returns empty and defaults to ABORTED, which
rejects every publish on a segment topic.

Revert the default transactionBufferProviderClassName back to
TopicTransactionBufferProvider. The DispatchingTransactionBufferProvider
class stays available so operators (and P5) can opt segment topics into the
metadata-driven buffer once the TC writes the headers it expects.
@merlimat merlimat merged commit fd0961d into apache:master May 14, 2026
43 checks passed
@merlimat merlimat deleted the mmerli/pip-473-txn-buffer branch May 14, 2026 00:50