[improve][broker] PIP-473: MetadataTransactionBuffer for segment topics #25768
P3 of the metadata-driven transactions stack. Adds a TransactionBuffer implementation for segment:// topics that reads truth from the metadata-store layout shipped in P2.

- MetadataTransactionBuffer: header-cache + per-segment first-position tracking for maxReadPosition. appendBufferToTxn does a cache-first header authorization, then ML append, then a /txn-op write; all three must succeed before the producer is ack'd. State transitions are driven by /txn-segment-events notifications: on each event we re-read headers for every currently-open txn (cheap, since the number of open txns K is small), so the design inherently tolerates subscribeSequence's collapse semantics.
- Recovery rebuilds in-memory state from listWritesBySegment + header reads (Option C from the plan): no persisted event offset, no per-event replay; cost bounded by active load on the segment, not by history.
- MetadataTransactionBufferProvider builds the buffer from the broker's local MetadataStore.
- DispatchingTransactionBufferProvider routes segment:// topics to the metadata buffer and falls through to TopicTransactionBuffer for the legacy persistent:// path. This is now the configured default so segment topics pick up the new buffer out of the box.
- TxnIds helper (<most>-<least> round-trip) + TxnPaths.txnIdFromOpPath + TxnMetadataStore.deleteWriteOpsForSegmentAndTxn cover what the TB needs from the metadata layer; the same helpers will be reused by P4 (PendingAckStore) and P5 (TC v5).

Tests: 7 MetadataTransactionBuffer scenarios (open/commit/abort, concurrent open txns, TxnConflict on terminal header, recovery from pre-populated metadata, isTxnAborted on unknown txn) + 2 routing tests for the dispatcher. All run against the in-memory MetadataStore via mocked PersistentTopic + ManagedLedger.
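The "all three must succeed before the producer is ack'd" ordering described above can be sketched with plain CompletableFuture chaining. This is a minimal illustration, not the PR's code: the class name, the method name, and the three functional parameters are hypothetical stand-ins for the header check, the managed-ledger append, and the /txn-op write.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch; none of these names come from the PR.
final class AppendPipelineSketch {

    /**
     * Chains the three steps so the returned future (standing in for the
     * producer ack) completes only after all of them have succeeded.
     */
    static <P> CompletableFuture<P> append(
            Supplier<CompletableFuture<Boolean>> isTxnOpen,      // cache-first header check
            Supplier<CompletableFuture<P>> appendToLedger,       // managed-ledger append
            Function<P, CompletableFuture<Void>> recordTxnOp) {  // /txn-op write
        CompletableFuture<P> appended = isTxnOpen.get().thenCompose(open -> {
            if (!open) {
                // Terminal or unknown txn: fail before touching the ledger.
                return CompletableFuture.<P>failedFuture(
                        new IllegalStateException("txn is not OPEN"));
            }
            return appendToLedger.get();
        });
        // Only after the op record is written do we surface the position.
        return appended.thenCompose(position ->
                recordTxnOp.apply(position).thenApply(ignored -> position));
    }
}
```

A failure in any step short-circuits the chain, so later steps never run and the caller sees an exceptionally completed future instead of an ack.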
Local review findings from Claude Code — not a substitute for human review, but flagging for the author's consideration. 1. [BUG]
lhotari left a comment
LGTM, please check local Claude Code review findings
…eak, etc.

Findings 1-6 from local Claude Code review on PR apache#25768 (@lhotari):

1. TxnIds.fromKey round-tripped on negative mostSigBits incorrectly because indexOf('-') hits the leading sign. Replace the separator with '_' (Java's Long.parseLong rejects underscores, so the split is unambiguous for any pair of long values: positive, negative, or MIN/MAX). +7 unit tests cover the matrix and malformed-input rejection.
2. recover() now closes the segment-event subscription if the scan or any header read fails. Previously, a recovery error left the listener wired to the metadata store with no path to clean it up (closeAsync may never be called on a TB that never recovered).
3. After recoveryFuture completes successfully, do an explicit reconcile pass. The listener is registered before the scan to avoid a race with events that fire mid-recovery, but triggerReconcile short-circuits while recovery is in progress, so a state change whose only notification landed in that window would never get picked up otherwise.
4. recordOp now guards the firstPosition update on entry.state == OPEN. A concurrent reconcile that flipped the entry to terminal between the cache-first authorization read and recordOp shouldn't get clobbered back to OPEN.
5. Class javadoc documents (a) the publish-side TOCTOU window between header authorization and ML append, and the TC ordering it relies on, and (b) the txns-map growth tradeoff (cache pruning is deferred to P5/P6; evicting committed entries would break isTxnAborted for valid messages).
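To illustrate finding 1, here is a minimal sketch of a key codec that uses '_' as the separator. The class and method names are hypothetical and the validation rules are an assumption; the actual TxnIds helper may be shaped differently. The point it shows is that a '-' separator collides with the sign of a negative value (for example, "-5-7".indexOf('-') is 0), while '_' can never appear in the output of Long.toString and is rejected by Long.parseLong.

```java
// Hypothetical sketch of an unambiguous "<most>_<least>" key codec; the
// real TxnIds helper in the PR may differ in naming and validation.
final class TxnKeySketch {
    private static final char SEPARATOR = '_';

    static String toKey(long mostSigBits, long leastSigBits) {
        return Long.toString(mostSigBits) + SEPARATOR + Long.toString(leastSigBits);
    }

    /** Returns {mostSigBits, leastSigBits}; throws IllegalArgumentException on malformed input. */
    static long[] fromKey(String key) {
        int idx = key.indexOf(SEPARATOR);
        // Require exactly one separator with text on both sides.
        if (idx <= 0 || idx == key.length() - 1 || idx != key.lastIndexOf(SEPARATOR)) {
            throw new IllegalArgumentException("Malformed txn key: " + key);
        }
        try {
            return new long[] {
                Long.parseLong(key.substring(0, idx)),
                Long.parseLong(key.substring(idx + 1))
            };
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Malformed txn key: " + key, e);
        }
    }
}
```

With this shape, toKey(-5L, 7L) yields "-5_7" and parses back to the same pair, while an old-style key such as "1-2" is rejected instead of being split at the wrong character.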
V5TransactionTest.testAbortMovesTransactionToAbortedState was failing with "Transaction is already ABORTED — TxnConflict" because the v4 transaction coordinator (still in use today; the v5 TC arrives in P5) stores txn state in MLTransactionMetadataStore, not at /txn/<txnId>. MetadataTransactionBuffer's cache-first header read then returns empty and defaults to ABORTED, which rejects every publish on a segment topic. Revert the default transactionBufferProviderClassName back to TopicTransactionBufferProvider. The DispatchingTransactionBufferProvider class stays available so operators (and P5) can opt segment topics into the metadata-driven buffer once the TC writes the headers it expects.
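The failure mode described above can be reproduced in a few lines. This is a simplified sketch under stated assumptions: the Map stands in for the /txn/<txnId> headers in the metadata store, and the class, enum, and method names are hypothetical. It only shows why "missing header defaults to ABORTED" rejects every publish when the coordinator never writes headers.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical model of the header lookup; not the PR's actual types.
final class HeaderDefaultSketch {
    enum TxnState { OPEN, COMMITTED, ABORTED }

    /** A missing header is treated as ABORTED, mirroring the behaviour described above. */
    static TxnState resolve(Optional<TxnState> header) {
        return header.orElse(TxnState.ABORTED);
    }

    /** Publishing is allowed only while the resolved state is OPEN. */
    static boolean mayPublish(Map<String, TxnState> headerStore, String txnKey) {
        return resolve(Optional.ofNullable(headerStore.get(txnKey))) == TxnState.OPEN;
    }
}
```

With an empty store, which models a v4 coordinator that keeps its state elsewhere, mayPublish returns false for every transaction; it returns true only once an OPEN header exists for the key.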
Summary
P3 of the metadata-driven transactions stack (PIP-473), building on the data layer landed in #25754. Adds a TransactionBuffer implementation for segment:// topics that reads truth from the metadata-store layout rather than from a per-topic snapshot log.

- MetadataTransactionBuffer: header-cache + per-segment first-position tracking for maxReadPosition. appendBufferToTxn does a cache-first header authorization, then ML append, then a /txn-op write; all three must succeed before the producer is ack'd. State transitions are driven by /txn-segment-events notifications: on each event we re-read headers for every currently-open txn (K, the number of open txns, is small, so this is cheap and inherently tolerates subscribeSequence's collapse semantics).
- Recovery rebuilds in-memory state from listWritesBySegment + per-txn header reads ("Option C" from the impl plan): no persisted event offset, no per-event replay. Cost bounded by active load on the segment, not by history.
- MetadataTransactionBufferProvider constructs the buffer from the broker's local MetadataStore.
- DispatchingTransactionBufferProvider routes segment:// topics to the metadata buffer and falls through to TopicTransactionBuffer for the legacy persistent:// path. This is now the configured default so segment topics pick up the new buffer out of the box.
- Metadata-layer helpers: TxnIds (<most>-<least> round-trip for TxnID), TxnPaths.txnIdFromOpPath, TxnMetadataStore.deleteWriteOpsForSegmentAndTxn.

Test plan
- :pulsar-broker:test --tests MetadataTransactionBufferTest: 7 scenarios on the in-memory MetadataStore (open/commit/abort, concurrent open txns, TxnConflict on terminal header, recovery from pre-populated metadata, isTxnAborted on unknown txn)
- :pulsar-broker:test --tests DispatchingTransactionBufferProviderTest: 2 routing tests covering segment:// ↔ persistent://
- :pulsar-broker:checkstyleMain + :pulsar-broker:checkstyleTest clean
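
The routing behaviour exercised by the two dispatcher tests can be pictured as a scheme check on the topic name. The sketch below is an assumption about the shape of that decision, with hypothetical names; the real DispatchingTransactionBufferProvider presumably works on Pulsar's own topic-name types rather than raw strings.

```java
// Hypothetical sketch of scheme-based routing; not the PR's implementation.
final class RoutingSketch {
    enum BufferKind { METADATA, TOPIC }

    /** segment:// topics get the metadata-driven buffer; everything else falls through. */
    static BufferKind route(String topicName) {
        return topicName.startsWith("segment://") ? BufferKind.METADATA : BufferKind.TOPIC;
    }
}
```

Under this sketch, a name such as segment://tenant/ns/topic maps to the metadata buffer and persistent://tenant/ns/topic maps to the legacy one.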