Fix the handling of subscriptions with multiple topics and extra fields#2425
Fix the handling of subscriptions with multiple topics and extra fields#2425abalarev wants to merge 1 commit intoeclipse-ditto:masterfrom
Conversation
f26bbaf to
01fd8d8
Compare
|
@abalarev will you also provide a system-test which tests this fix - seems important enough to me to cover it with an integration-test |
|
@thjaeckle Sounds reasonable, I'll add some system test(s) |
|
@abalarev thanks for the added system tests 👍 |
|
@thjaeckle Please run the system tests again, I've pushed a fix. |
01fd8d8 to
340ea56
Compare
thjaeckle
left a comment
There was a problem hiding this comment.
Thanks for tackling this — the refactor direction looks right and the headline scenarios are well-covered. A few correctness concerns and polish items inline.
One overarching note on test coverage: the four correctness concerns flagged below (shared-root trimming, cross-streaming-type early return, non-deterministic findFirst over a Set, and catch-all / extras precedence) are not yet exercised by the new tests. Once the intended behavior for each is settled, please add regression tests that lock it in.
| final var builder = extra.toBuilder(); | ||
| allExtraFields.getPointers().stream() | ||
| .filter(pointer -> !neededFields.getPointers().contains(pointer)) | ||
| .forEach(pointer -> pointer.getRoot().ifPresent(builder::remove)); |
There was a problem hiding this comment.
The trim removes by pointer.getRoot(), which deletes the whole top-level object. With allExtraFields = ["/features/f1", "/features/f2", "/attributes"] and neededFields = ["/features/f1", "/attributes"], the loop hits /features/f2, removes the features root, and also wipes /features/f1 that the matched topic actually requested.
The trimming should operate at full-pointer depth (remove the value at the exact pointer, or rebuild the JSON containing only neededFields) rather than at the root segment.
| .map(FilteredTopic::getExtraFields) | ||
| .flatMap(Optional::stream) | ||
| .toList(); | ||
| boolean topicWithNoFilterNoExtraFieldsExists = topics.stream().anyMatch(topic -> topic.getFilter().isEmpty() && topic.getExtraFields().isEmpty()); |
There was a problem hiding this comment.
topics is target.getTopics() unfiltered by streaming type — pairTargetsWithTopics now passes all of a target's topics whenever any one of them has extra fields. So a target with e.g. a TWIN_EVENTS extras-topic plus a LIVE_MESSAGES topic with no filter and no extras would short-circuit enrichment for a TWIN_EVENTS signal, even though the LIVE_MESSAGES topic is irrelevant to that signal.
This anyMatch should apply the same streamingType == StreamingType.fromTopic(topic.getTopic().getPubSubTopic()) guard used in pairTargetsWithTopics.
There was a problem hiding this comment.
I had a gut feeling I've messed up something with the streamingType as I had few failing tests regarding it. I though I've fixed this in applyFilter method, but I now realize it is not correct.
I think now I must filter the topics returned by pairTargetsWithTopics to exclude topics with not the same streaming type, as you said LIVE_MESSAGES to be excluded. So I've added a filter in pairTargetsWithTopics and here I keep the criterion as it is.
| .flatMap(Optional::stream) | ||
| .toList(); | ||
| boolean topicWithNoFilterNoExtraFieldsExists = topics.stream().anyMatch(topic -> topic.getFilter().isEmpty() && topic.getExtraFields().isEmpty()); | ||
| if (allExtraFields.isEmpty() || topicWithNoFilterNoExtraFieldsExists) { |
There was a problem hiding this comment.
Behavior change worth pinning down: a target that mixes { TWIN_EVENTS, no filter, no extras } with { TWIN_EVENTS, filter=X, extras=definition } will now be delivered without extras (the catch-all wins via this early return). Previously the same signal also produced an enriched copy.
Deduplication is good, but is it intentional that the catch-all topic wins over the topic that explicitly requested enrichment? Either way, please document the precedence rule in the javadoc of enrichAndFilterSignal so the semantics are unambiguous.
There was a problem hiding this comment.
Previously the signal produced enriched copy only in case of matching filter, but no signal at all if not. I agree now the signal matching the filter would behave differently too.
Maybe it's worth to give priority to topics with extra as this would give more flexibility - it seems I have to just change the 'if'-statement to if (allExtraFields.isEmpty()) { in order to achieve this.
Or just documenting this change as there is no test covering the previous behaviour and therefore I would assume nobody use this in that way and relies on it.
@thjaeckle What is your opinion on this? Mine leans towards changing the 'if'-statement
There was a problem hiding this comment.
@abalarev yes, I would also do the if (allExtraFields.isEmpty()){.
The other one would be an unexpected behavior change - the fact that no unit test covers this does not mean that no adopters rely on this :)
| .map(signal -> setTrimmedExtra(signal, topic, expressionResolver, | ||
| extra, allExtraFieldsOptional.get())) | ||
| .stream()) | ||
| .findFirst() |
There was a problem hiding this comment.
topics is a Set<FilteredTopic>, which has no defined iteration order. If a signal matches more than one topic on the target, findFirst() picks an arbitrary one, so the trimmed extras delivered downstream become non-deterministic — the very symptom this PR sets out to fix.
Consider iterating in a deterministic order: derive from target.getTopics() via a structure that preserves insertion order, or sort topics by a stable key, so the chosen match is reproducible.
There was a problem hiding this comment.
Agree, I've added sorting by extraFields, such as no extra fields have less priority.
Signed-off-by: Andrey Balarev <andrey.balarev@bosch.com>
340ea56 to
c4216eb
Compare
Summary
Resolves incorrect handling of subscriptions for multiple topics, containing at least one extra field.
Issue
What's changed
OutboundMaooingProcessorActor flow changed as follows:
Added system tests: eclipse-ditto/ditto-testing#33