[improve][broker] PIP-473: MetadataPendingAckStore for segment subscriptions#25772
Open
merlimat wants to merge 1 commit into
Open
[improve][broker] PIP-473: MetadataPendingAckStore for segment subscriptions#25772merlimat wants to merge 1 commit into
merlimat wants to merge 1 commit into
Conversation
…iptions P4 of the metadata-driven transactions stack. Adds a PendingAckStore implementation for subscriptions on segment:// topics that reads truth from the metadata-store layout shipped in P2 and used by P3. - MetadataPendingAckStore: appendIndividualAck / appendCumulativeAck write TxnOp(ACK, segment, sub, ledgerId, entryId, cumulative) records via the TxnMetadataStore facade. appendCommitMark / appendAbortMark are no-ops — in v5 the TC owns the lifecycle, the store consumes its events. - Subscription-event listener applies the same header-truthing reconcile pattern as P3: on each notification, re-read headers for every currently- open txn this subscription is involved in; for those that went terminal, call PendingAckHandleImpl.commitTxn / abortTxn (no SPI-shape change for the handle) and then delete the corresponding /txn-op ack records. - Recovery via scan (Option C): subscribe → scan listAcksBySegmentSubscription → group by txnId → fetch headers → seed open-txn set → terminal txns discovered mid-scan are processed during recovery → drain post-recovery reconcile to catch any events that fired in the window. - MetadataPendingAckStoreProvider builds the store from the local MetadataStore. DispatchingTransactionPendingAckStoreProvider routes by TopicName.isSegment() and falls through to the legacy MLPendingAckStore for persistent:// subscriptions. - Default config (transactionPendingAckStoreProviderClassName) is unchanged — stays legacy until P5.4 flips everything together (TC v5 + dispatching TB provider + dispatching PendingAckStore provider). Supporting changes: - TxnOp gains a nullable Boolean cumulative field, only set on cumulative acks. WRITE records (and individual acks) leave it null. - TxnMetadataStore.deleteAckOpsForSegmentSubscriptionAndTxn mirrors the P3 write-op cleanup helper. The two flavors share a private scanAndDeleteOpsForTxn. Tests: 6 MetadataPendingAckStoreTest scenarios (individual ack append, cumulative ack append, no-op marks, commit/abort events drive handle callbacks + cleanup, recovery rebuilds open-txn set and applies terminal txns mid-scan) + 2 routing tests for the dispatcher. The segment-aware pending-ack workaround in MLPendingAckStore stays in place — segment topics still use the legacy store by default until P5.4 flips and that PR removes the workaround.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
P4 of the PIP-473 metadata-driven transactions stack. Adds the ack-side complement to #25768 — a
PendingAckStoreimplementation for subscriptions onsegment://topics that reads truth from the metadata-store layout.MetadataPendingAckStore:appendIndividualAck/appendCumulativeAckwriteTxnOp(ACK, segment, sub, ledgerId, entryId, cumulative)records via theTxnMetadataStorefaçade.appendCommitMark/appendAbortMarkare no-ops — in v5 the TC owns the lifecycle, the store consumes its events./txn-subscription-events/<seg>:<sub>-*) uses the same header-truthing reconcile pattern as the P3 TB: re-read headers for every currently-open txn this subscription is involved in, then for terminals callPendingAckHandleImpl.commitTxn/abortTxn(no SPI change) and delete the corresponding/txn-opack records.listAcksBySegmentSubscription→ group by txnId → fetch headers → seed open-txn set → terminal txns discovered mid-scan are processed inline → drain post-recovery reconcile.MetadataPendingAckStoreProvider+DispatchingTransactionPendingAckStoreProvider(routes byTopicName.isSegment(), falls through to legacyMLPendingAckStoreotherwise).Supporting changes
TxnOpgains a nullableBoolean cumulativefield (only set on cumulative acks).TxnMetadataStore.deleteAckOpsForSegmentSubscriptionAndTxnmirrors the P3 write-op cleanup helper; the two flavors share a privatescanAndDeleteOpsForTxn.Not in this PR (deferred to P5.4)
transactionPendingAckStoreProviderClassNamestaysMLPendingAckStoreProvider. The new store is opt-in until the v5 TC is wired up.Test plan
pulsar-broker:test --tests MetadataPendingAckStoreTest— 6 cases on the metadata store (Memory backend, mockedPersistentSubscription+PendingAckHandleImpl): individual ack append, cumulative ack append, no-op marks, commit event driveshandle.commitTxn+ ack-op cleanup, abort event driveshandle.abortTxn+ cleanup, recovery rebuilds state + processes terminal txns discovered mid-scan.pulsar-broker:test --tests DispatchingTransactionPendingAckStoreProviderTest— 2 routing tests.TxnOp.cumulativeaddition.pulsar-broker.