Skip to content

Commit ed501ad

Browse files
santibclaude
andauthored
Gate guard model with halt scopes (#28)
* Gate executes branches directly + class-level DSL Phase 4 of the Mars v2 refactor. - Gate: add class-level `condition`/`branch` DSL for reusable gates. Gate#run now executes the matched branch directly instead of returning a Runnable for Sequential to detect - Aggregator: context-aware — accepts ExecutionContext and passes its outputs to the operation - Sequential: remove is_a?(Runnable) check, now just chains step results Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Address review comments * Fix linters * Change Halt behavior --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 9a7a5f2 commit ed501ad

14 files changed

Lines changed: 370 additions & 91 deletions

File tree

README.md

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,19 +120,29 @@ parallel = MARS::Workflows::Parallel.new(
120120

121121
### Gates
122122

123-
Create conditional branching in your workflows:
123+
Gates act as guards that either let the workflow continue or divert to a fallback path:
124124

125125
```ruby
126126
gate = MARS::Gate.new(
127-
"Decision Gate",
128-
condition: ->(input) { input[:score] > 0.5 ? :success : :failure },
129-
branches: {
130-
success: success_workflow,
127+
"Validation Gate",
128+
check: ->(input) { :failure unless input[:score] > 0.5 },
129+
fallbacks: {
131130
failure: failure_workflow
132131
}
133132
)
134133
```
135134

135+
Control halt scope — `:local` (default) stops only the parent workflow, `:global` propagates to the root:
136+
137+
```ruby
138+
gate = MARS::Gate.new(
139+
"Critical Gate",
140+
check: ->(input) { :error unless input[:valid] },
141+
fallbacks: { error: error_workflow },
142+
halt_scope: :global
143+
)
144+
```
145+
136146
### Visualization
137147

138148
Generate Mermaid diagrams to visualize your workflows:

examples/complex_llm_workflow/generator.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,8 @@ def tools
9191
)
9292

9393
gate = MARS::Gate.new(
94-
condition: ->(input) { input.split.length < 10 ? :success : :failure },
95-
branches: {
94+
check: ->(input) { :failure unless input.split.length < 10 },
95+
fallbacks: {
9696
failure: error_workflow
9797
}
9898
)

examples/complex_workflow/generator.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ class Agent5 < MARS::Agent
4646

4747
# Create the gate that decides between exit or continue
4848
gate = MARS::Gate.new(
49-
condition: ->(input) { input[:result] },
50-
branches: {
49+
check: ->(input) { input[:result] },
50+
fallbacks: {
5151
warning: sequential_workflow,
5252
error: parallel_workflow
5353
}

examples/simple_workflow/generator.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ class Agent3 < MARS::Agent
2626

2727
# Create the gate that decides between exit or continue
2828
gate = MARS::Gate.new(
29-
condition: ->(input) { input[:result] },
30-
branches: {
29+
check: ->(input) { input[:result] },
30+
fallbacks: {
3131
success: success_workflow
3232
}
3333
)

lib/mars/gate.rb

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,51 @@
22

33
module MARS
44
class Gate < Runnable
5-
def initialize(name = "Gate", condition:, branches:, **kwargs)
5+
class << self
6+
def check(&block)
7+
@check_block = block
8+
end
9+
10+
attr_reader :check_block
11+
12+
def fallback(key, runnable)
13+
fallbacks_map[key] = runnable
14+
end
15+
16+
def fallbacks_map
17+
@fallbacks_map ||= {}
18+
end
19+
20+
def halt_scope(scope = nil)
21+
scope ? @halt_scope = scope : @halt_scope
22+
end
23+
end
24+
25+
def initialize(name = "Gate", check: nil, fallbacks: nil, halt_scope: nil, **kwargs)
626
super(name: name, **kwargs)
727

8-
@condition = condition
9-
@branches = branches
28+
@check = check || self.class.check_block
29+
@fallbacks = fallbacks || self.class.fallbacks_map
30+
@halt_scope = halt_scope || self.class.halt_scope || :local
1031
end
1132

1233
def run(input)
13-
result = condition.call(input)
34+
result = check.call(input)
35+
36+
return input unless result
37+
38+
branch = fallbacks[result]
39+
raise ArgumentError, "No fallback registered for #{result.inspect}" unless branch
1440

15-
branches[result] || input
41+
Halt.new(resolve_branch(branch).run(input), scope: @halt_scope)
1642
end
1743

1844
private
1945

20-
attr_reader :condition, :branches
46+
attr_reader :check, :fallbacks
47+
48+
def resolve_branch(branch)
49+
branch.is_a?(Class) ? branch.new : branch
50+
end
2151
end
2252
end

lib/mars/halt.rb

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# frozen_string_literal: true
2+
3+
module MARS
4+
class Halt
5+
attr_reader :result, :scope
6+
7+
def initialize(result, scope: :local)
8+
@result = result
9+
@scope = scope
10+
end
11+
12+
def local? = scope == :local
13+
def global? = scope == :global
14+
end
15+
end

lib/mars/rendering/graph/gate.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ def to_graph(builder, parent_id: nil, value: nil)
1010
builder.add_node(node_id, name, Node::GATE)
1111
builder.add_edge(parent_id, node_id, value)
1212

13-
sink_nodes = branches.map do |condition_result, branch|
14-
branch.to_graph(builder, parent_id: node_id, value: condition_result)
13+
sink_nodes = fallbacks.map do |fallback_key, branch|
14+
branch.to_graph(builder, parent_id: node_id, value: fallback_key)
1515
end
1616

1717
sink_nodes.flatten

lib/mars/workflows/parallel.rb

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,23 @@ def initialize(name, steps:, aggregator: nil, **kwargs)
1212

1313
def run(input)
1414
errors = []
15-
results = Async do |workflow|
16-
tasks = @steps.map do |step|
15+
results = execute_steps(input, errors)
16+
17+
raise AggregateError, errors if errors.any?
18+
19+
has_global_halt = results.any? { |r| r.is_a?(Halt) && r.global? }
20+
unwrapped = results.map { |r| r.is_a?(Halt) ? r.result : r }
21+
result = aggregator.run(unwrapped)
22+
has_global_halt ? Halt.new(result, scope: :global) : result
23+
end
24+
25+
private
26+
27+
attr_reader :steps, :aggregator
28+
29+
def execute_steps(input, errors)
30+
Async do |workflow|
31+
tasks = steps.map do |step|
1732
workflow.async do
1833
step.run(input)
1934
rescue StandardError => e
@@ -23,15 +38,7 @@ def run(input)
2338

2439
tasks.map(&:wait)
2540
end.result
26-
27-
raise AggregateError, errors if errors.any?
28-
29-
aggregator.run(results)
3041
end
31-
32-
private
33-
34-
attr_reader :steps, :aggregator
3542
end
3643
end
3744
end

lib/mars/workflows/sequential.rb

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@ def initialize(name, steps:, **kwargs)
1111

1212
def run(input)
1313
@steps.each do |step|
14-
result = step.run(input)
15-
16-
if result.is_a?(Runnable)
17-
input = result.run(input)
18-
break
19-
else
20-
input = result
21-
end
14+
input = step.run(input)
15+
16+
next unless input.is_a?(Halt)
17+
18+
return input if input.global?
19+
20+
input = input.result
21+
break
2222
end
2323

2424
input

spec/mars/aggregator_spec.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
RSpec.describe MARS::Aggregator do
44
describe "#run" do
5-
context "when called without a block" do
5+
context "when called without an operation" do
66
let(:aggregator) { described_class.new }
77

88
it "returns the input as is" do
@@ -11,10 +11,10 @@
1111
end
1212
end
1313

14-
context "when initialized with a block operation" do
14+
context "when initialized with an operation" do
1515
let(:aggregator) { described_class.new("Aggregator", operation: lambda(&:join)) }
1616

17-
it "executes the block and returns its value" do
17+
it "executes the operation and returns its value" do
1818
result = aggregator.run(%w[a b c])
1919
expect(result).to eq("abc")
2020
end

0 commit comments

Comments
 (0)