[improve][broker] Cache publish metadata and avoid repeated parsing in the broker send path#25555
[improve][broker] Cache publish metadata and avoid repeated parsing in the broker send path#25555dao-jun wants to merge 3 commits into
Conversation
lhotari
left a comment
There was a problem hiding this comment.
Yes, this is a very useful change. I've also been thinking of this when I added a similar optimization for the dispatcher side.
A few review comments.
|
@lhotari Thanks for your review, the most of pulsar downstream projects(kop, rop, mop...) are not maintained anymore now, maybe some private forks are active, but it's not our duty to maintain their downward compatibility. |
| if (brokerInterceptor != null) { | ||
| brokerInterceptor | ||
| .onMessagePublish(this, headersAndPayload, messagePublishContext); | ||
| brokerInterceptor.onMessagePublish(this, headersAndPayload, messagePublishContext); |
There was a problem hiding this comment.
The PR body says the cached metadata is invalidated after BrokerInterceptor.onMessagePublish(...), but I don't see that in the final diff.
This changes behavior when metadata has already been cached before the interceptor runs. For example, on an encryption-required topic, checkAndStartPublish(...) calls publishContext.getMessageMetadata(...). If an interceptor then mutates the message metadata in headersAndPayload, PersistentTopic.isExceedMaximumDeliveryDelay(...) and MessageDeduplication will reuse the pre-interceptor cached metadata. In current master those later checks reparse headersAndPayload after the interceptor, so they observe the post-interceptor metadata.
Could we add a MessagePublishContext#clearMessageMetadata()/invalidateMessageMetadata() and call it immediately after BrokerInterceptor.onMessagePublish(...) in both the normal publish path and publishTxnMessage?
If multiple interceptors are executed in series and the first interceptor modifies metadata, and the second interceptor reads metadata through publishContext.getMessageMetadata(...), then clearing the cache only after the entire BrokerInterceptors returns cannot solve the visibility within the interceptor chain?
| if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, position, messagePublishContext)) { | ||
| publishMessageToTopic(headersAndPayload, messagePublishContext); | ||
| } else { | ||
| messagePublishContext.recycle(); |
There was a problem hiding this comment.
A related lifecycle optimization: after topic.publishMessage(...) / topic.publishTxnMessage(...) returns, the cached MessageMetadata appears no longer needed by the current publish flow. PersistentTopic consumes it synchronously for max-delivery-delay validation and deduplication before asyncAddEntry/transaction append. The later async completion path and messageProduced interceptor don't receive headersAndPayload, so keeping the parsed MessageMetadata attached to MessagePublishContext until recycle extends its lifetime to the full in-flight write duration.
If we add clearMessageMetadata() for the interceptor correctness issue above, it would be useful to also clear it after the topic publish call returns, preferably in a finally block, so high-throughput or high-latency writes don't retain parsed metadata objects longer than necessary.
Motivation
ServerCnx.handleSend sits on the hot publish path, but the same MessageMetadata can be parsed multiple times for a single message. In the persistent-topic flow, producer-side encryption validation, delayed-delivery checks, and deduplication/chunk handling may all reparse the same payload. That adds avoidable CPU overhead
to a latency-sensitive path.
This change reduces that overhead by parsing metadata once per publish attempt and reusing it across the broker publish flow. It also keeps the behavior safe by invalidating the cached metadata after broker interceptors run, so any interceptor-side buffer mutation does not reuse stale data.
Modifications
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes