Skip to content

Commit 901bb96

Browse files
committed
Fix Fetcher._fetch_offsets_by_times retry handling (#2833)
1 parent aa5ce58 commit 901bb96

2 files changed

Lines changed: 192 additions & 4 deletions

File tree

kafka/consumer/fetcher.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -250,16 +250,17 @@ def _fetch_offsets_by_times(self, timestamps, timeout_ms=None):
250250
break
251251

252252
if future.succeeded():
253-
fetched_offsets.update(future.value[0])
254-
if not future.value[1]:
253+
offsets, retry = future.value
254+
fetched_offsets.update(offsets)
255+
if not retry:
255256
return fetched_offsets
256257

257-
timestamps = {tp: timestamps[tp] for tp in future.value[1]}
258+
timestamps = {tp: timestamps[tp] for tp in retry}
258259

259260
elif not future.retriable():
260261
raise future.exception # pylint: disable-msg=raising-bad-type
261262

262-
if future.exception.invalid_metadata or self._client.cluster.need_update:
263+
elif future.exception.invalid_metadata or self._client.cluster.need_update:
263264
refresh_future = self._client.cluster.request_update()
264265
self._client.poll(future=refresh_future, timeout_ms=timer.timeout_ms)
265266

test/test_fetcher.py

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -770,3 +770,190 @@ def test_seek_before_exception(client, mocker):
770770
# Should not throw OffsetOutOfRangeError after the seek
771771
records, partial = fetcher.fetched_records()
772772
assert len(records) == 0
773+
774+
775+
class TestFetchOffsetsByTimes:
776+
def _make_fetcher(self, client, mocker):
777+
subscription_state = SubscriptionState()
778+
subscription_state.subscribe(topics=['test'])
779+
tp = TopicPartition('test', 0)
780+
subscription_state.assign_from_subscribed([tp])
781+
subscription_state.seek(tp, 0)
782+
return Fetcher(client, subscription_state)
783+
784+
def test_empty_timestamps(self, client, metrics, mocker):
785+
fetcher = self._make_fetcher(client, mocker)
786+
assert fetcher._fetch_offsets_by_times({}) == {}
787+
788+
def test_success_no_retry(self, client, mocker):
789+
fetcher = self._make_fetcher(client, mocker)
790+
tp = TopicPartition('test', 0)
791+
timestamps = {tp: 1000}
792+
expected_offset = OffsetAndTimestamp(10, 1000, -1)
793+
794+
future = Future()
795+
mocker.patch.object(fetcher, '_send_list_offsets_requests', return_value=future)
796+
mocker.patch.object(fetcher._client, 'poll', side_effect=lambda **kw: future.success(({tp: expected_offset}, set())))
797+
798+
result = fetcher._fetch_offsets_by_times(timestamps, timeout_ms=10000)
799+
assert result == {tp: expected_offset}
800+
801+
def test_success_with_retry(self, client, mocker):
802+
fetcher = self._make_fetcher(client, mocker)
803+
tp0 = TopicPartition('test', 0)
804+
tp1 = TopicPartition('test', 1)
805+
timestamps = {tp0: 1000, tp1: 2000}
806+
offset0 = OffsetAndTimestamp(10, 1000, -1)
807+
offset1 = OffsetAndTimestamp(20, 2000, -1)
808+
809+
# First call succeeds for tp0 but needs retry for tp1
810+
future1 = Future()
811+
future2 = Future()
812+
futures = iter([future1, future2])
813+
mocker.patch.object(fetcher, '_send_list_offsets_requests', side_effect=lambda ts: next(futures))
814+
815+
def poll_side_effect(**kw):
816+
f = kw.get('future')
817+
if f is future1:
818+
f.success(({tp0: offset0}, {tp1}))
819+
elif f is future2:
820+
f.success(({tp1: offset1}, set()))
821+
822+
mocker.patch.object(fetcher._client, 'poll', side_effect=poll_side_effect)
823+
824+
result = fetcher._fetch_offsets_by_times(timestamps, timeout_ms=10000)
825+
assert result == {tp0: offset0, tp1: offset1}
826+
827+
def test_timeout_raises(self, client, mocker):
828+
fetcher = self._make_fetcher(client, mocker)
829+
tp = TopicPartition('test', 0)
830+
timestamps = {tp: 1000}
831+
832+
future = Future()
833+
mocker.patch.object(fetcher, '_send_list_offsets_requests', return_value=future)
834+
# poll does not complete the future
835+
mocker.patch.object(fetcher._client, 'poll')
836+
837+
with pytest.raises(Errors.KafkaTimeoutError):
838+
fetcher._fetch_offsets_by_times(timestamps, timeout_ms=10000)
839+
840+
def test_non_retriable_error_raises(self, client, mocker):
841+
fetcher = self._make_fetcher(client, mocker)
842+
tp = TopicPartition('test', 0)
843+
timestamps = {tp: 1000}
844+
845+
future = Future()
846+
mocker.patch.object(fetcher, '_send_list_offsets_requests', return_value=future)
847+
# AuthorizationError is not retriable
848+
error = Errors.TopicAuthorizationFailedError()
849+
mocker.patch.object(fetcher._client, 'poll', side_effect=lambda **kw: future.failure(error))
850+
851+
with pytest.raises(Errors.TopicAuthorizationFailedError):
852+
fetcher._fetch_offsets_by_times(timestamps, timeout_ms=10000)
853+
854+
def test_retriable_invalid_metadata_triggers_refresh(self, client, mocker):
855+
fetcher = self._make_fetcher(client, mocker)
856+
tp = TopicPartition('test', 0)
857+
timestamps = {tp: 1000}
858+
expected_offset = OffsetAndTimestamp(10, 1000, -1)
859+
860+
# First call fails with invalid_metadata error, second succeeds
861+
future1 = Future()
862+
future2 = Future()
863+
futures = iter([future1, future2])
864+
mocker.patch.object(fetcher, '_send_list_offsets_requests', side_effect=lambda ts: next(futures))
865+
866+
refresh_future = Future()
867+
mocker.patch.object(fetcher._client.cluster, 'request_update', return_value=refresh_future)
868+
869+
call_count = [0]
870+
def poll_side_effect(**kw):
871+
f = kw.get('future')
872+
if f is future1:
873+
f.failure(NotLeaderForPartitionError())
874+
elif f is refresh_future:
875+
refresh_future.success(None)
876+
elif f is future2:
877+
f.success(({tp: expected_offset}, set()))
878+
call_count[0] += 1
879+
880+
mocker.patch.object(fetcher._client, 'poll', side_effect=poll_side_effect)
881+
882+
result = fetcher._fetch_offsets_by_times(timestamps, timeout_ms=10000)
883+
assert result == {tp: expected_offset}
884+
fetcher._client.cluster.request_update.assert_called_once()
885+
886+
def test_retriable_non_metadata_error_sleeps(self, client, mocker):
887+
fetcher = self._make_fetcher(client, mocker)
888+
tp = TopicPartition('test', 0)
889+
timestamps = {tp: 1000}
890+
expected_offset = OffsetAndTimestamp(10, 1000, -1)
891+
892+
# RequestTimedOutError is retriable but not invalid_metadata
893+
future1 = Future()
894+
future2 = Future()
895+
futures = iter([future1, future2])
896+
mocker.patch.object(fetcher, '_send_list_offsets_requests', side_effect=lambda ts: next(futures))
897+
898+
# Ensure cluster does not need update
899+
mocker.patch.object(type(fetcher._client.cluster), 'need_update', new_callable=mocker.PropertyMock, return_value=False)
900+
901+
def poll_side_effect(**kw):
902+
f = kw.get('future')
903+
if f is future1:
904+
f.failure(Errors.RequestTimedOutError())
905+
elif f is future2:
906+
f.success(({tp: expected_offset}, set()))
907+
908+
mocker.patch.object(fetcher._client, 'poll', side_effect=poll_side_effect)
909+
mock_sleep = mocker.patch('time.sleep')
910+
911+
result = fetcher._fetch_offsets_by_times(timestamps, timeout_ms=10000)
912+
assert result == {tp: expected_offset}
913+
mock_sleep.assert_called_once()
914+
915+
def test_success_does_not_check_exception(self, client, mocker):
916+
"""Regression: successful future should not fall through to check future.exception."""
917+
fetcher = self._make_fetcher(client, mocker)
918+
tp0 = TopicPartition('test', 0)
919+
tp1 = TopicPartition('test', 1)
920+
timestamps = {tp0: 1000, tp1: 2000}
921+
offset0 = OffsetAndTimestamp(10, 1000, -1)
922+
offset1 = OffsetAndTimestamp(20, 2000, -1)
923+
924+
future1 = Future()
925+
future2 = Future()
926+
futures = iter([future1, future2])
927+
mocker.patch.object(fetcher, '_send_list_offsets_requests', side_effect=lambda ts: next(futures))
928+
929+
def poll_side_effect(**kw):
930+
f = kw.get('future')
931+
if f is future1:
932+
# Succeeds but has retry partitions — the bug was that code
933+
# would fall through to check future.exception (which is None),
934+
# causing an AttributeError
935+
f.success(({tp0: offset0}, {tp1}))
936+
elif f is future2:
937+
f.success(({tp1: offset1}, set()))
938+
939+
mocker.patch.object(fetcher._client, 'poll', side_effect=poll_side_effect)
940+
941+
# Should not raise AttributeError
942+
result = fetcher._fetch_offsets_by_times(timestamps, timeout_ms=10000)
943+
assert result == {tp0: offset0, tp1: offset1}
944+
945+
def test_no_timeout_passes_none(self, client, mocker):
946+
fetcher = self._make_fetcher(client, mocker)
947+
tp = TopicPartition('test', 0)
948+
timestamps = {tp: 1000}
949+
expected_offset = OffsetAndTimestamp(10, 1000, -1)
950+
951+
future = Future()
952+
mocker.patch.object(fetcher, '_send_list_offsets_requests', return_value=future)
953+
mocker.patch.object(fetcher._client, 'poll', side_effect=lambda **kw: future.success(({tp: expected_offset}, set())))
954+
955+
result = fetcher._fetch_offsets_by_times(timestamps, timeout_ms=None)
956+
assert result == {tp: expected_offset}
957+
# With timeout_ms=None, poll should receive None timeout
958+
fetcher._client.poll.assert_called_once()
959+
assert fetcher._client.poll.call_args[1]['timeout_ms'] is None

0 commit comments

Comments
 (0)