Skip to content

Commit 8aaa7c5

Browse files
committed
Restart HeartbeatProcess on transient pipe errors
When the monitor subprocess exits unexpectedly (e.g. due to a transient network/Redis error), @pipe.write raises IOError or Errno::EPIPE. With abort_on_exception=true on the heartbeat thread, this killed the entire worker mid-run, causing tests to be skipped. HeartbeatProcess#tick! now catches these errors, restarts the subprocess via a new restart! method, and retries the tick, tolerating up to MAX_RESTART_ATTEMPTS (3) consecutive failures before re-raising. The counter resets on each successful tick so transient errors don't exhaust the budget permanently. Also adds mocha as a dev dependency and a dedicated test file covering retry, restart, max-attempts, and counter-reset behaviour.
1 parent 0abb690 commit 8aaa7c5

5 files changed

Lines changed: 81 additions & 0 deletions

File tree

ruby/Gemfile.lock

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ GEM
4343
builder
4444
minitest (>= 5.0)
4545
ruby-progressbar
46+
mocha (3.1.0)
47+
ruby2_keywords (>= 0.0.5)
4648
msgpack (1.8.0)
4749
parallel (1.27.0)
4850
parser (3.3.10.2)
@@ -86,6 +88,7 @@ GEM
8688
parser (>= 3.3.7.2)
8789
prism (~> 1.7)
8890
ruby-progressbar (1.13.0)
91+
ruby2_keywords (0.0.5)
8992
securerandom (0.4.1)
9093
simplecov (0.22.0)
9194
docile (~> 1.1)
@@ -112,6 +115,7 @@ DEPENDENCIES
112115
ci-queue!
113116
minitest (~> 5.11)
114117
minitest-reporters (~> 1.1)
118+
mocha
115119
msgpack
116120
rake
117121
redis

ruby/ci-queue.gemspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ Gem::Specification.new do |spec|
4040
spec.add_development_dependency 'redis'
4141
spec.add_development_dependency 'simplecov', '~> 0.12'
4242
spec.add_development_dependency 'minitest-reporters', '~> 1.1'
43+
spec.add_development_dependency 'mocha'
4344

4445
spec.add_development_dependency 'rexml'
4546
spec.add_development_dependency 'snappy'

ruby/lib/ci/queue/redis/base.rb

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,8 @@ def resolve_lua_includes(script, root)
264264
end
265265

266266
class HeartbeatProcess
267+
MAX_RESTART_ATTEMPTS = 3
268+
267269
def initialize(redis_url, zset_key, owners_key, leases_key)
268270
@redis_url = redis_url
269271
@zset_key = zset_key
@@ -313,10 +315,28 @@ def shutdown!
313315

314316
# Sends a `tick!` message for the given id/lease through the subprocess pipe.
#
# If the send fails with IOError or Errno::EPIPE, the subprocess is restarted
# via #restart! and the send is retried. Consecutive failures are counted in
# @restart_attempts; the counter is reset to 0 after every successful send,
# and once it exceeds MAX_RESTART_ATTEMPTS the current error is re-raised.
#
# @param id [Object] value forwarded as the `id` field of the message
# @param lease [#to_s] lease value; sent as a String
# @raise [IOError, Errno::EPIPE] when more than MAX_RESTART_ATTEMPTS
#   consecutive sends have failed
def tick!(id, lease)
  send_message(:tick!, id: id, lease: lease.to_s)
  @restart_attempts = 0
rescue IOError, Errno::EPIPE
  # The counter may not have been initialised yet on the first failure.
  @restart_attempts = (@restart_attempts || 0) + 1
  # Bare `raise` re-raises the error currently being handled.
  raise if @restart_attempts > MAX_RESTART_ATTEMPTS

  restart!
  retry
end
317326

318327
private
319328

329+
# Tears down the current subprocess and boots a new one.
#
# Closes @pipe, sends TERM to @pid and waits for it, then calls #boot!.
# Teardown is best effort: failures closing the pipe, and a process that is
# already gone (ESRCH) or already reaped (ECHILD), are ignored so that boot!
# still runs.
def restart!
  begin
    @pipe&.close
  rescue IOError, SystemCallError
    # Closing can fail while flushing to a broken pipe; the pipe is being
    # discarded anyway, so only I/O-level errors are ignored here.
    nil
  end

  begin
    Process.kill(:TERM, @pid)
    Process.waitpid2(@pid)
  rescue Errno::ESRCH, Errno::ECHILD
    nil
  end

  boot!
end
339+
320340
def send_message(*message)
321341
payload = message.to_json
322342
@pipe.write([payload.bytesize].pack("L").b, payload)
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
# frozen_string_literal: true
2+
require 'test_helper'
3+
4+
# Unit tests for the retry/restart behaviour of HeartbeatProcess#tick!.
# send_message, boot! and restart! are stubbed so no pipe or subprocess is
# exercised by these tests.
class CI::Queue::Redis::Base::HeartbeatProcessTest < Minitest::Test
  MAX = CI::Queue::Redis::Base::HeartbeatProcess::MAX_RESTART_ATTEMPTS

  def setup
    @hp = CI::Queue::Redis::Base::HeartbeatProcess.new(
      'redis://localhost:6379/0',
      'zset', 'owners', 'leases'
    )
    # boot! and restart! must not spawn real processes
    @hp.stubs(:boot!)
    @hp.stubs(:restart!)
  end

  def test_tick_retries_after_pipe_ioerror
    @hp.expects(:send_message).twice.raises(IOError, "closed stream").then.returns(nil)

    @hp.tick!("test_id", "lease_id")
  end

  def test_tick_retries_after_epipe
    @hp.expects(:send_message).twice.raises(Errno::EPIPE).then.returns(nil)

    @hp.tick!("test_id", "lease_id")
  end

  def test_tick_calls_restart_on_pipe_error
    @hp.stubs(:send_message).raises(IOError, "closed stream").then.returns(nil)
    @hp.expects(:restart!).once

    @hp.tick!("test_id", "lease_id")
  end

  def test_tick_raises_after_max_restart_attempts
    @hp.stubs(:send_message).raises(IOError, "closed stream")
    # tick! retries internally, so a single call exhausts the budget:
    # MAX restarts happen, then the (MAX + 1)th consecutive failure is re-raised.
    @hp.expects(:restart!).times(MAX)

    assert_raises(IOError) { @hp.tick!("test_id", "lease_id") }
  end

  def test_restart_counter_resets_after_success
    # Build a sequence: [raise, return] * (MAX+1).
    # Without @restart_attempts = 0 on success, the (MAX+1)th failure would exceed MAX and raise.
    stub = @hp.stubs(:send_message)
    (MAX + 1).times do |i|
      stub = stub.raises(IOError, "closed stream").then.returns(nil)
      stub = stub.then unless i == MAX
    end

    (MAX + 1).times { @hp.tick!("test_id", "lease_id") }
  end
end

ruby/test/test_helper.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
require 'ci/queue/redis'
1010
require 'minitest/queue'
1111
require 'minitest/autorun'
12+
require 'mocha/minitest'
1213

1314
Minitest::Reporters.use!([Minitest::Reporters::SpecReporter.new])
1415

0 commit comments

Comments
 (0)