Skip to content

Commit 0b4073c

Browse files
Merge pull request #377 from Shopify/dahrnsbrak/fix-retry-queue-preresolved-lazy-loading
Fix retry queue in preresolved lazy loading mode
2 parents fd9e0f6 + 57a69d2 commit 0b4073c

5 files changed

Lines changed: 344 additions & 13 deletions

File tree

ruby/lib/ci/queue/redis/retry.rb

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,22 @@ def build
1212
@build ||= CI::Queue::Redis::BuildRecord.new(self, redis, config)
1313
end
1414

15+
# Retry queue is pre-populated with failed test entries from the previous run.
16+
# Don't replace them with the full preresolved/lazy test list.
17+
# QueuePopulationStrategy#configure_lazy_queue will still set entry_resolver,
18+
# so poll uses LazyEntryResolver to lazily load test files on demand.
19+
# The random/batch_size params are intentionally ignored since we keep
20+
# the existing queue contents as-is.
21+
#
22+
# Note: populate (non-stream) is intentionally NOT overridden here.
23+
# RSpec and non-lazy Minitest retries call populate to build the
24+
# @index mapping test IDs to runnable objects, which poll needs to
25+
# yield proper test/example instances. In those paths, @queue contains
26+
# bare test IDs that match @index keys, so populate works correctly.
27+
def stream_populate(tests, random: nil, batch_size: nil)
# No-op: +tests+, +random+ and +batch_size+ are accepted but never read,
# and no instance state is modified.
28+
# Returns the receiver itself.
self
29+
end
30+
1531
private
1632

1733
attr_reader :redis

ruby/lib/ci/queue/redis/worker.rb

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,10 @@ def retrying?
148148
# Builds a Retry queue from this worker's queue log: only entries whose
# test ID is in build.failed_tests are kept, one entry per test ID,
# and the resulting list is reversed before being handed to Retry.
def retry_queue
149149
# Set of failed test IDs, used for the membership checks below.
failures = build.failed_tests.to_set
150150
# Read the whole Redis list (indices 0..-1) stored under this worker's 'queue' key.
log = redis.lrange(key('worker', worker_id, 'queue'), 0, -1)
151-
log = log.map { |entry| CI::Queue::QueueEntry.test_id(entry) }
152-
log.select! { |test_id| failures.include?(test_id) }
153-
log.uniq!
151+
# Keep full entries (test_id + file_path) so lazy loading can resolve them.
152+
# Filter by test_id against failures without stripping file paths.
153+
log.select! { |entry| failures.include?(CI::Queue::QueueEntry.test_id(entry)) }
154+
# Deduplicate by test_id; uniq! keeps the first entry seen for each ID.
log.uniq! { |entry| CI::Queue::QueueEntry.test_id(entry) }
154155
# Reverse the remaining entries in place.
log.reverse!
155156
Retry.new(log, config, redis: redis)
156157
end

ruby/lib/ci/queue/static.rb

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ def from_uri(uri, config)
1616
TEN_MINUTES = 60 * 10
1717

1818
attr_reader :progress, :total
19+
attr_accessor :entry_resolver
1920

2021
def initialize(tests, config)
2122
@queue = tests
@@ -50,6 +51,16 @@ def populate(tests, random: nil)
5051
self
5152
end
5253

54+
# Support lazy loading mode: accept an enumerator of entries and
55+
# store them in queue order (no shuffling). This preserves the
56+
# exact order from the input file for local reproduction.
57+
def stream_populate(tests, random: nil, batch_size: nil)
# +random+ and +batch_size+ are accepted but not used by this implementation.
58+
# Start from a fresh, empty queue; any previous contents are discarded.
@queue = []
59+
# Append each entry in the order +tests+ yields it (+tests+ only needs #each).
tests.each { |entry| @queue << entry }
60+
# Record how many entries were loaded.
@total = @queue.size
61+
# Return the receiver.
self
62+
end
63+
5364
def with_heartbeat(id, lease: nil)
5465
# Simply runs the given block; +id+ and +lease+ are not used here.
yield
5566
end
@@ -79,11 +90,15 @@ def expired?
7990
end
8091

8192
def populated?
82-
!!defined?(@index)
93+
# True once @index has been assigned (defined? is truthy even when the
# assigned value is nil), or when @queue holds at least one truthy entry.
!!defined?(@index) || @queue.any?
8394
end
8495

8596
def to_a
86-
@queue.map { |i| index.fetch(i) }
97+
# With a non-nil @index: translate every queued entry through the index.
# NOTE(review): index is presumably a Hash, so fetch raises KeyError on an
# unknown key - confirm against the index definition.
if defined?(@index) && @index
98+
@queue.map { |i| index.fetch(i) }
99+
else
100+
# Without an index: return a shallow copy of the raw queue entries.
@queue.dup
101+
end
87102
end
88103

89104
def size
@@ -101,9 +116,28 @@ def running
101116
# Shifts entries off the front of @queue one at a time and yields each to
# the caller's block. The loop stops when @shutdown is set, any circuit
# breaker is open, max_test_failed? is true, or @queue.shift returns a
# falsy value (nil once the queue is empty).
def poll
102117
while !@shutdown && config.circuit_breakers.none?(&:open?) && !max_test_failed? && reserved_test = @queue.shift
103118
# Remember the entry as reserved for the duration of the poll.
reserved_tests << reserved_test
104-
yield index.fetch(reserved_test)
119+
# A configured entry_resolver takes precedence over the index.
if entry_resolver
120+
resolved = entry_resolver.call(reserved_test)
121+
# Track the original queue entry so requeue can push it back
122+
# with its full payload (file path, load-error data, etc.).
123+
reserved_entries[resolved.id] = reserved_test if resolved.respond_to?(:id)
124+
yield resolved
125+
elsif defined?(@index) && @index
126+
# Queue entries may be JSON-formatted (with test_id + file_path) while
127+
# the index is keyed by bare test_id from populate. Extract the test_id
128+
# first; if that raises JSON::ParserError, fall back to the raw entry.
129+
test_id = begin
130+
CI::Queue::QueueEntry.test_id(reserved_test)
131+
rescue JSON::ParserError
132+
reserved_test
133+
end
134+
yield index.fetch(test_id)
135+
else
136+
# Neither a resolver nor an index: yield the raw queue entry.
yield reserved_test
137+
end
105138
end
106139
# Loop finished: drop the reservation bookkeeping.
reserved_tests.clear
140+
reserved_entries.clear
107141
end
108142

109143
def exhausted?
@@ -134,7 +168,10 @@ def requeue(entry)
134168
return false unless should_requeue?(test_id)
135169

136170
requeues[test_id] += 1
137-
@queue.unshift(test_id)
171+
# Push back the original queue entry (with file path / load-error payload)
172+
# so entry_resolver can fully resolve it on the next poll iteration.
173+
original_entry = reserved_entries.delete(test_id) || test_id
174+
@queue.unshift(original_entry)
138175
true
139176
end
140177

@@ -150,6 +187,10 @@ def requeues
150187
@requeues ||= Hash.new(0)
151188
end
152189

190+
def reserved_entries
191+
# Lazily-created Hash. poll stores resolved.id => original queue entry
# in it, and requeue deletes the entry for the test being requeued.
@reserved_entries ||= {}
192+
end
193+
153194
def reserved_tests
154195
# Lazily-created Concurrent::Set. poll adds each entry it shifts off the
# queue and clears the set once its loop ends.
@reserved_tests ||= Concurrent::Set.new
155196
end

ruby/test/ci/queue/redis_test.rb

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,149 @@ def test_retry_queue_with_all_tests_passing_2
6161
assert_equal retry_test_order, retry_test_order
6262
end
6363

64+
def test_retry_queue_preserves_full_entries_with_file_paths
65+
# Use stream_populate with file-path entries (as in preresolved mode),
66+
# then verify retry_queue preserves the full entry including the file path.
67+
@redis.flushdb
68+
build_id = 'retry-file-paths'
69+
# Two workers sharing one build_id; neither is populated up front.
leader = worker(1, populate: false, lazy_load_streaming_timeout: 2, queue_init_timeout: 2, build_id: build_id)
70+
consumer = worker(2, populate: false, lazy_load_streaming_timeout: 2, queue_init_timeout: 2, build_id: build_id)
71+
# Identity resolver: poll yields each queue entry unchanged.
consumer.entry_resolver = ->(entry) { entry }
72+
73+
tests = [
74+
EntryTest.new('ATest#test_foo', CI::Queue::QueueEntry.format('ATest#test_foo', '/tmp/a_test.rb')),
75+
EntryTest.new('ATest#test_bar', CI::Queue::QueueEntry.format('ATest#test_bar', '/tmp/a_test.rb')),
76+
]
77+
78+
# Populate from a background thread so this thread can act as the consumer.
leader_thread = Thread.new do
79+
leader.stream_populate(tests, random: Random.new(0), batch_size: 10)
80+
end
81+
82+
# Wait at most 2 seconds (checking every 10ms) for the 'master-status'
# key to read 'ready'; fail loudly if it never does.
timeout_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 2
83+
loop do
84+
status = @redis.get(leader.send(:key, 'master-status'))
85+
break if status == 'ready'
86+
raise "streaming status not set" if Process.clock_gettime(Process::CLOCK_MONOTONIC) > timeout_at
87+
sleep 0.01
88+
end
89+
90+
# Consumer polls all tests, failing the first one
91+
failed_entry = nil
92+
consumer.poll do |entry|
93+
if failed_entry.nil?
94+
failed_entry = entry
95+
consumer.report_failure!
96+
# record_error calls acknowledge internally
97+
consumer.build.record_error(entry, 'Failed')
98+
else
99+
consumer.report_success!
100+
consumer.acknowledge(entry)
101+
end
102+
end
103+
104+
# Give the leader thread up to 2 seconds to finish.
leader_thread.join(2)
105+
106+
retry_queue = consumer.retry_queue
107+
refute_predicate retry_queue, :exhausted?
108+
109+
# Inspect the retry queue's internal @queue array directly.
retry_entries = retry_queue.instance_variable_get(:@queue).dup
110+
assert_equal 1, retry_entries.size
111+
# The critical assertion: retry entry must be a JSON entry with file_path,
112+
# not just the bare test ID. A regression in retry_queue would strip this.
113+
parsed = CI::Queue::QueueEntry.parse(retry_entries.first)
114+
assert parsed[:file_path], "Retry entry should preserve the full entry with file path"
115+
failed_test_id = CI::Queue::QueueEntry.test_id(failed_entry)
116+
assert_equal failed_test_id, CI::Queue::QueueEntry.test_id(retry_entries.first)
117+
ensure
118+
# Safe navigation: leader_thread is nil if the test failed before the
# thread was created.
leader_thread&.kill
119+
end
120+
121+
def test_retry_queue_stream_populate_is_noop
122+
# Fail only the first test of the shuffled list; report every other
# test as a success.
target = shuffled_test_list.first
123+
@queue.poll do |test|
124+
if test == target
125+
@queue.report_failure!
126+
# record_error calls acknowledge internally
127+
@queue.build.record_error(test.queue_entry, 'Failed')
128+
else
129+
@queue.report_success!
130+
@queue.acknowledge(test.queue_entry)
131+
end
132+
end
133+
134+
retry_queue = @queue.retry_queue
135+
# Snapshot the internal @queue array before calling stream_populate.
original_queue_contents = retry_queue.instance_variable_get(:@queue).dup
136+
# Sanity check: the failed test produced at least one retry entry.
refute_empty original_queue_contents
137+
138+
# stream_populate should NOT replace the retry queue's contents
139+
dummy_entries = Enumerator.new do |yielder|
140+
yielder << CI::Queue::QueueEntry.format("ZTest#test_zzz", "/tmp/z_test.rb")
141+
end
142+
retry_queue.stream_populate(dummy_entries, random: Random.new(0))
143+
144+
# The internal queue must be equal to the snapshot taken above.
assert_equal original_queue_contents, retry_queue.instance_variable_get(:@queue),
145+
"stream_populate should not replace retry queue contents"
146+
end
147+
148+
def test_retry_queue_works_with_entry_resolver
149+
# Fail a test, then verify retry queue works with entry_resolver (lazy loading)
150+
target = shuffled_test_list.first
151+
@queue.poll do |test|
152+
if test == target
153+
@queue.report_failure!
154+
# record_error calls acknowledge internally
155+
@queue.build.record_error(test.queue_entry, 'Failed')
156+
else
157+
@queue.report_success!
158+
@queue.acknowledge(test.queue_entry)
159+
end
160+
end
161+
162+
retry_queue = @queue.retry_queue
163+
164+
# Set up entry_resolver (as configure_lazy_queue would do)
165+
# The resolver records every entry it is called with and returns it unchanged.
resolved_entries = []
166+
retry_queue.entry_resolver = ->(entry) {
167+
resolved_entries << entry
168+
entry
169+
}
170+
171+
# stream_populate is a no-op, preserving the retry entries
172+
retry_queue.stream_populate(Enumerator.new { |y| }, random: Random.new(0))
173+
174+
# Poll should use entry_resolver, not index.fetch — no KeyError crash
175+
polled = []
176+
retry_queue.poll do |test|
177+
polled << test
178+
retry_queue.acknowledge(test)
179+
end
180+
181+
# The number of yielded tests must equal the retry queue's reported total.
assert_equal retry_queue.total, polled.size
182+
assert_equal polled.size, resolved_entries.size,
183+
"All polled entries should have gone through entry_resolver"
184+
end
185+
186+
def test_retry_queue_with_multiple_failures_deduplicates
187+
# Fail multiple tests, verify retry queue deduplicates by test_id
188+
failed_ids = []
189+
# Report every polled test as failed and remember its id.
@queue.poll do |test|
190+
@queue.report_failure!
191+
@queue.build.record_error(test.queue_entry, 'Failed')
192+
failed_ids << test.id
193+
end
194+
195+
assert_operator failed_ids.size, :>=, 2, "Need multiple failures for this test"
196+
197+
retry_queue = @queue.retry_queue
198+
retry_entries = retry_queue.instance_variable_get(:@queue).dup
199+
200+
# Each failed test should appear exactly once (no duplicates from requeues)
201+
retry_test_ids = retry_entries.map { |e| CI::Queue::QueueEntry.test_id(e) }
202+
assert_equal retry_test_ids.uniq, retry_test_ids,
203+
"Retry queue should not contain duplicate test IDs"
204+
# The retry queue must cover exactly the failed test IDs, regardless of order.
assert_equal failed_ids.uniq.sort, retry_test_ids.sort
205+
end
206+
64207
def test_shutdown
65208
poll(@queue) do
66209
@queue.shutdown!

0 commit comments

Comments
 (0)