Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 59 additions & 34 deletions harness/drive.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
// Stop layers, all of them, because each alone fails (the unanimous field
// lesson): the judge, never the worker, decides from transcript evidence;
// -max-iters bounds every request; a no-progress detector stops iterations
// that mutate nothing; Ctrl-C always pauses to the prompt. Mutation approval
// that mutate nothing; Escape always pauses to the prompt. Mutation approval
// is untouched: gates in interactive, -yes in print mode.
package harness

Expand Down Expand Up @@ -137,9 +137,13 @@ type driveConfig struct {
// Interactive routes it into the transcript.
say func(format string, a ...any)
// turnCtx supplies each iteration's context; nil means Background. The
// interactive REPL passes its interrupt watcher's, so Ctrl-C pauses the
// interactive REPL passes its interrupt watcher's, so Escape pauses the
// drive instead of being ignored.
turnCtx func() (context.Context, func())
// drainQueued returns and clears any messages the user typed while the turn
// ran (live input). When present at an iteration boundary they are injected
// as the user's steer for the next step, in place of the judge's verdict.
drainQueued func() []string
}

// drive continues an already-run first turn until the judge rules done or
Expand All @@ -165,43 +169,61 @@ func drive(r *repl, cfg driveConfig, firstTurns []agent.Turn) int {
iterTurns := firstTurns
stuck := 0
for iter := 1; ; iter++ {
// The judge runs under the same cancellable context as the worker, so
// Ctrl-C pauses the drive during the judge phase too (not just during a
// streamed worker iteration).
jctx, jdone := turnCtx()
v, jUsed, jerr := judgeGoal(jctx, r.p, cfg.request, renderTranscript(iterTurns, 300))
jdone()
r.accountAux(jUsed) // the judge is real spend; count it, leave the gauge
switch {
case jerr != nil:
if isCanceled(jerr) {
say("== paused; your next message steers")
return driveInterrupted
// A message the user typed while the turn ran (live input) takes priority
// over the judge: act on their steer at this boundary instead of
// auto-continuing or stopping.
var steered string
if cfg.drainQueued != nil {
if q := cfg.drainQueued(); len(q) > 0 {
steered = strings.Join(q, "\n")
}
// No verdict means no mandate to keep spending: stop quietly.
say("== judge unavailable (%s); returning to you", compact(jerr.Error()))
return driveBlocked
case v.Done:
if iter > 1 {
say("== done after %d iterations: %s", iter, compact(v.Reason))
}
return driveDone
case v.Blocked:
say("== needs you: %s", compact(v.Reason))
return driveBlocked
}

if iter >= cfg.maxIters {
say("== max iterations (%d) reached; latest verdict: %s", cfg.maxIters, compact(v.Reason))
return driveMaxIters
var v verdict
if steered == "" {
// The judge runs under the same cancellable context as the worker, so
// Escape pauses the drive during the judge phase too (not just during a
// streamed worker iteration).
jctx, jdone := turnCtx()
var jUsed agent.Usage
var jerr error
v, jUsed, jerr = judgeGoal(jctx, r.p, cfg.request, renderTranscript(iterTurns, 300))
jdone()
r.accountAux(jUsed) // the judge is real spend; count it, leave the gauge
switch {
case jerr != nil:
if isCanceled(jerr) {
say("== paused; your next message steers")
return driveInterrupted
}
// No verdict means no mandate to keep spending: stop quietly.
say("== judge unavailable (%s); returning to you", compact(jerr.Error()))
return driveBlocked
case v.Done:
if iter > 1 {
say("== done after %d iterations: %s", iter, compact(v.Reason))
}
return driveDone
case v.Blocked:
say("== needs you: %s", compact(v.Reason))
return driveBlocked
}
if iter >= cfg.maxIters {
say("== max iterations (%d) reached; latest verdict: %s", cfg.maxIters, compact(v.Reason))
return driveMaxIters
}
} else {
say("== steering: %s", compact(steered))
}

start := time.Now()
mutBefore := cfg.mutations()
mark := len(r.history)
opening := render(steerPrompt("continue", continueTemplate), map[string]string{
"iteration": fmt.Sprint(iter + 1), "verdict": v.Reason, "request": cfg.request,
})
opening := steered
if opening == "" {
opening = render(steerPrompt("continue", continueTemplate), map[string]string{
"iteration": fmt.Sprint(iter + 1), "verdict": v.Reason, "request": cfg.request,
})
}
if r.preflight(opening) {
return driveBlocked
}
Expand Down Expand Up @@ -241,9 +263,12 @@ func drive(r *repl, cfg driveConfig, firstTurns []agent.Turn) int {
r.managePressure() // the window boundary stays the chain's

iterTurns = out[mark:]
if cfg.mutations() == mutBefore {
switch {
case steered != "": // the user steered: intent, not a stall
stuck = 0
case cfg.mutations() == mutBefore:
stuck++
} else {
default:
stuck = 0
}
say("== iteration %d · %s · %d in / %d out tokens",
Expand Down
50 changes: 50 additions & 0 deletions harness/drive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,56 @@ func TestDriveInterrupted(t *testing.T) {
}
}

// TestDriveInjectsQueuedSteer: a message typed during the turn (live input) is
// injected as a user turn and acted on at the next boundary, in place of the
// judge. The worker reply "ok" is not valid JSON, so if the judge still ran
// first the drive would block; reaching done proves the steer was injected.
// Breaker: drop the drainQueued branch and the judge sees "ok" -> driveBlocked.
func TestDriveInjectsQueuedSteer(t *testing.T) {
drained := false
drain := func() []string {
if drained {
return nil
}
drained = true
return []string{"do X instead"}
}
p := &seqChat{fns: []func(context.Context) (agent.Reply, error){
reply("ok"), // the steered iteration's worker reply (no tool calls)
reply(`{"done": true, "blocked": false, "reason": "done"}`), // judge on the next loop
}}
r := driveRepl(t, p, workTurns())
_, count := counting()
cfg := driveConfig{request: "fix", maxIters: 5, mutations: count, drainQueued: drain}
if code := drive(r, cfg, workTurns()); code != driveDone {
t.Fatalf("code %d, want driveDone", code)
}
found := false
for _, tn := range r.history {
if tn.Role == "user" && strings.Contains(tn.Text, "do X instead") {
found = true
}
}
if !found {
t.Fatal("the queued steer was not injected into history")
}
}

// TestInterruptsQueue: drain returns the enqueued messages and clears them, so a
// steer is consumed once. Breaker: drop the clear in drain and it re-injects
// forever.
func TestInterruptsQueue(t *testing.T) {
in := &interrupts{}
in.enqueue("a")
in.enqueue("b")
if got := in.drain(); len(got) != 2 || got[0] != "a" || got[1] != "b" {
t.Fatalf("drain = %v, want [a b]", got)
}
if got := in.drain(); len(got) != 0 {
t.Fatalf("second drain must be empty, got %v", got)
}
}

// TestDriveJudgeUnavailable: no verdict means no mandate to keep spending.
func TestDriveJudgeUnavailable(t *testing.T) {
p := &seqChat{} // judge call errors immediately
Expand Down
130 changes: 96 additions & 34 deletions harness/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,19 +404,35 @@ func Main() {
}()
}

// Ctrl-C cancels the running turn, not sesh; pressed twice within
// two seconds it quits (after restoring the terminal).
// Escape cancels the running turn; Ctrl-C quits (twice within two seconds,
// after restoring the terminal).
intr := newInterrupts(func() {
con.Close()
r.goodbye()
})

say := func(f string, a ...any) {
emit("%s "+f+"%s\n", append(append([]any{dim}, a...), reset)...)
}
// Live input (type/queue/Escape-cancel while the agent works) needs the
// footer TUI and a turn that never stops to ask: with -ask the gate reads
// approvals mid-turn from the same keyboard, so that posture stays
// synchronous.
tc, isTUI := con.(*tuiConsole)
live := isTUI && !(*ask && !*autoYes)

var pending string // a queued message that becomes the next turn's input
for {
line, err := con.ReadLine("-> ")
if err != nil {
emit("\n")
r.goodbye()
return
line := pending
pending = ""
if line == "" {
l, err := con.ReadLine("-> ")
if err != nil {
emit("\n")
r.goodbye()
return
}
line = l
}
if line == "" {
continue
Expand All @@ -435,40 +451,66 @@ func Main() {
if r.preflight(line) {
continue // the message can never fit; nothing was sent
}
ctx, done := intr.turnContext()
stopSpin := r.spin()
turns, ok := r.runTurn(ctx, line, tools, hooks)
done()
cfg := driveConfig{
request: line, maxIters: *maxIters, tools: tools, hooks: hooks,
mutations: mutCount, turnCtx: intr.turnContext, drainQueued: intr.drain,
say: say,
}
// Goal-driven persistence: the request is the goal; a judged not-done
// keeps the session working until done, blocked, stuck, or the cap.
// Conversation (no tool use) never drives. Ctrl-C pauses to the prompt.
if ok {
drive(r, driveConfig{
request: line, maxIters: *maxIters, tools: tools, hooks: hooks,
mutations: mutCount, turnCtx: intr.turnContext,
say: func(f string, a ...any) {
emit("%s "+f+"%s\n", append(append([]any{dim}, a...), reset)...)
},
}, turns)
// Conversation (no tool use) never drives.
if live {
// The turn runs in the background while the editor stays live: the
// user can type a steering message (queued, injected at the next
// boundary) or press Escape to cancel.
ctx, done := intr.turnContext()
stopSpin := r.spin()
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
turns, ok := r.runTurn(ctx, line, tools, hooks)
done()
if ok {
drive(r, cfg, turns)
}
}()
tc.attendTurn(turnAttend{done: doneCh, cancel: intr.cancelCurrent, queue: intr.enqueue})
<-doneCh // the worker is finished (attendTurn can also return on EOF); never overlap turns
stopSpin()
if q := intr.drain(); len(q) > 0 { // typed after the last boundary: run next
pending = strings.Join(q, "\n")
}
} else {
ctx, done := intr.turnContext()
stopSpin := r.spin()
turns, ok := r.runTurn(ctx, line, tools, hooks)
done()
if ok {
drive(r, cfg, turns)
}
stopSpin()
}
stopSpin()
}
}

// interrupts watches Ctrl-C for the whole session. During a turn the first
// press cancels that turn; a second press within the window (whether or not a
// turn is running) restores the terminal and quits. The watcher is persistent
// so the quit window spans the gap after a cancelled turn ends, which is
// exactly when an impatient second press arrives.
// interrupts owns turn control for the session: the cancel function for the
// turn in flight (the live editor's Escape calls it) and the queue of messages
// the user types while a turn runs (drained and injected as a steer at the next
// boundary). Ctrl-C is handled here too, but only to quit (a stray press warns,
// a second within the window quits), since cancelling is Escape's job now.
type interrupts struct {
mu sync.Mutex
cancel context.CancelFunc // non-nil while a turn is running
last time.Time
cleanup func()
queued []string // messages typed during a turn, drained at the next boundary
}

const doublePressWindow = 2 * time.Second

// newInterrupts wires Ctrl-C to quit (a stray press warns first, a second within
// the window quits and restores the terminal). Cancelling a turn is Escape's
// job, handled by the live editor, so Ctrl-C is left purely for quitting.
func newInterrupts(cleanup func()) *interrupts {
in := &interrupts{cleanup: cleanup}
sigc := make(chan os.Signal, 1)
Expand All @@ -478,23 +520,43 @@ func newInterrupts(cleanup func()) *interrupts {
in.mu.Lock()
double := time.Since(in.last) < doublePressWindow
in.last = time.Now()
cancel := in.cancel
in.mu.Unlock()
switch {
case double:
if double {
in.cleanup()
os.Exit(130)
case cancel != nil:
emit("\n%s cancelling turn... (ctrl-c again to quit)%s\n", yellow, reset)
cancel()
default:
emit("\n%s (ctrl-c again to quit)%s\n", yellow, reset)
}
emit("\n%s (ctrl-c again to quit; esc cancels the turn)%s\n", yellow, reset)
}
}()
return in
}

// cancelCurrent aborts the turn in flight, if any (the live editor's Escape).
func (in *interrupts) cancelCurrent() {
in.mu.Lock()
c := in.cancel
in.mu.Unlock()
if c != nil {
c()
}
}

// enqueue stashes a message typed while a turn runs; drain returns and clears
// the queue at a safe boundary, where it can be injected as a user turn.
func (in *interrupts) enqueue(s string) {
in.mu.Lock()
in.queued = append(in.queued, s)
in.mu.Unlock()
}

func (in *interrupts) drain() []string {
in.mu.Lock()
q := in.queued
in.queued = nil
in.mu.Unlock()
return q
}

// turnContext hands out a cancellable context for one turn and the cleanup
// that detaches it from the watcher.
func (in *interrupts) turnContext() (context.Context, func()) {
Expand Down
3 changes: 2 additions & 1 deletion harness/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ GOAL-DRIVEN PERSISTENCE (default in every mode)
fresh-context judge rules from transcript evidence: done (stop), blocked
(return to the user), or continue (the reason feeds the next iteration and
work resumes). Plain conversation is never judged and never loops.
Stop layers: -max-iters, a no-progress detector, -max-tools, Ctrl-C.
Stop layers: -max-iters, a no-progress detector, -max-tools, Esc (cancels a
turn; type while it works to steer at the next step). Ctrl-C quits.

CONTINUITY (infinite sessions)
Context pressure is managed by handoff, never lossy in-place compaction: at
Expand Down
Loading
Loading