forked from use-less-vars/ThoughtMachine
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathagent_controller.py
More file actions
280 lines (246 loc) · 10.8 KB
/
Copy pathagent_controller.py
File metadata and controls
280 lines (246 loc) · 10.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
# agent_controller.py
import threading
import queue
import traceback
from agent_core import AgentConfig
from agent import Agent
from typing import Optional, List, Dict, Any
from PyQt6.QtCore import QObject, pyqtSignal
class AgentController(QObject):
    """
    Runs the agent in a background thread and exposes control from the
    calling thread via start/stop/pause/resume, plus a queue and a Qt signal
    for receiving events.

    Every event is a dict with at least a ``"type"`` key. Events yielded by
    ``Agent.process_query`` are forwarded unchanged; the controller itself
    additionally emits ``{"type": "paused"}``, ``{"type": "error", ...}`` and
    ``{"type": "thread_finished"}``.
    """

    # Signals
    # Emitted once per event, with the same dict that is put on event_queue.
    event_occurred = pyqtSignal(dict)

    def __init__(self):
        super().__init__()
        # Queue for passing events from the agent thread to the consumer
        self.event_queue = queue.Queue()
        # Events for controlling the agent thread
        self.stop_event = threading.Event()   # when set, agent should stop
        self.pause_event = threading.Event()  # when set, agent can run (otherwise paused)
        self.pause_event.set()  # start unpaused
        # Thread handle and running flag
        self.thread = None
        self._running = False
        # Arguments of the most recent start() call. Defined here so that
        # get_config() is safe to call before the first start().
        self._query = None
        self._config = None
        self._initial_conversation = None
        self.agent = None
        # Query queue for keep-alive mode
        self.query_queue = queue.Queue()
        self._keep_alive = True
        self._pause_requested = False
        self._processing_query = False

    @staticmethod
    def _drain_queue(q):
        """Discard every item currently waiting in *q* without blocking."""
        while True:
            try:
                q.get_nowait()
            except queue.Empty:
                return

    def _cleanup_if_thread_dead(self):
        """Check if background thread is dead and reset state if needed."""
        if self.thread is not None and not self.thread.is_alive():
            # Thread has finished but state wasn't cleaned up
            self._running = False
            self.thread = None
            self.agent = None  # Clear old agent reference
            self._keep_alive = True
            self._pause_requested = False
            self._processing_query = False
            print(f"[Controller] Cleaned up dead thread, _running={self._running}")

    def reset(self):
        """
        Reset controller to initial state, clearing all queues and events.

        NOTE: this neither signals nor joins a worker thread that is still
        alive; it only drops the reference to it. Call stop() and wait for
        the "thread_finished" event before resetting a running controller.
        """
        self._drain_queue(self.event_queue)
        self._drain_queue(self.query_queue)
        # Reset events
        self.stop_event.clear()
        self.pause_event.set()  # start unpaused
        # Reset state
        self.thread = None
        self._running = False
        self._initial_conversation = None
        self.agent = None
        self._keep_alive = True
        self._pause_requested = False
        self._processing_query = False
        print("[Controller] Reset to initial state")

    @property
    def is_running(self) -> bool:
        """Return True if the agent thread is alive."""
        if self.thread is not None and self.thread.is_alive():
            return True
        # Thread is dead or doesn't exist, ensure state is cleaned up
        if self._running:
            # Thread died without clearing the flag, clean up
            self._cleanup_if_thread_dead()
        return self._running

    def get_config(self) -> Optional[AgentConfig]:
        """
        Return the AgentConfig passed to the most recent start() call.

        Returns:
            The stored config, or None if start() has never been called.
        """
        return self._config

    def start(self, query: str, config: AgentConfig, initial_conversation: Optional[List[Dict[str, Any]]] = None):
        """
        Start the agent with the given query and configuration.

        Args:
            query: The user query string.
            config: An AgentConfig instance (api_key, model, etc.).
            initial_conversation: Optional previous conversation history to continue from.

        Raises:
            RuntimeError: If the controller is already marked as running.
        """
        print(f"[Controller] start called with query: {query[:50]}...")
        # Clean up any dead thread state
        self._cleanup_if_thread_dead()
        if self._running:
            raise RuntimeError("Agent is already running. Stop it first.")
        # Reset control events
        self.stop_event.clear()
        self.pause_event.set()  # ensure we start unpaused
        # Reset internal state flags
        self._keep_alive = True
        self._pause_requested = False
        self._processing_query = False
        # Store query and config for the background thread
        self._query = query
        self._config = config
        self._initial_conversation = initial_conversation
        # Drop queries/sentinels left over from a previous session so the new
        # worker does not replay them ahead of this query.
        self._drain_queue(self.query_queue)
        # Enqueue the initial query
        self.query_queue.put(query)
        # Create and start the daemon thread
        self.thread = threading.Thread(target=self._run, daemon=True)
        self._running = True
        self.thread.start()

    def stop(self):
        """Request the agent to stop after the current turn/tool."""
        self.stop_event.set()
        self.pause_event.set()  # if paused, resume so stop can be noticed

    def continue_session(self, query: str):
        """
        Submit a new query to the already running agent.

        Raises:
            RuntimeError: If the agent is not running.
        """
        if not self.is_running:
            raise RuntimeError("Agent not running. Use start() first.")
        # Lift any pause first so the worker is not blocked in should_stop()
        self.resume()
        self.query_queue.put(query)

    def request_pause(self):
        """
        Request agent to pause after current turn.

        Raises:
            RuntimeError: If the agent is not running.
        """
        if not self.is_running:
            raise RuntimeError("Agent not running. Use start() first.")
        if self._processing_query:
            # Agent is currently processing a query, set pause flag
            self.pause()
        else:
            # Agent is idle, send paused event directly
            self._emit_event({"type": "paused"})

    def restart_session(self):
        """
        Restart agent with cleared history.

        Raises:
            RuntimeError: If the agent is not running.
        """
        if not self.is_running:
            raise RuntimeError("Agent not running. Use start() first.")
        if self.agent:
            self.agent.request_reset()
        # Also submit a sentinel so the worker loop calls agent.reset()
        self.query_queue.put("[RESET]")

    def pause(self):
        """Pause the agent before the next turn (finishes current turn first)."""
        self.pause_event.clear()
        self._pause_requested = True

    def resume(self):
        """Resume a paused agent."""
        self.pause_event.set()
        self._pause_requested = False

    def get_event(self, block: bool = False, timeout: Optional[float] = None) -> Optional[Dict[str, Any]]:
        """
        Retrieve an event from the queue.

        Args:
            block: If True, wait until an event is available.
            timeout: Maximum time to wait when block=True (None = wait forever).

        Returns:
            The next event dict, or None if no event became available.
        """
        try:
            return self.event_queue.get(block=block, timeout=timeout)
        except queue.Empty:
            return None

    def _emit_event(self, event):
        """Emit event both to queue and signal."""
        # Put into queue for compatibility
        self.event_queue.put(event)
        # Emit signal for presenter
        self.event_occurred.emit(event)

    def _run_query(self, agent, query):
        """
        Run one query through *agent*, forwarding each yielded event.

        Sets ``_keep_alive`` to False when a fatal event ("stopped", "error",
        "max_turns") is seen, and emits a "paused" event when the query ends
        normally or a requested pause takes effect after a turn.
        """
        print(f"[Controller] Processing query: {query[:50]}...")
        self._processing_query = True
        try:
            for event in agent.process_query(query):
                print(f"[Controller] Event: {event['type']}")
                # Forward each event to the queue/signal for the GUI
                self._emit_event(event)
                if event["type"] in ("stopped", "error", "max_turns"):
                    # These are fatal, stop the whole agent thread
                    self._keep_alive = False
                    break
                elif event["type"] in ("final", "user_interaction_requested"):
                    # Agent has completed this query; tell the GUI we are
                    # idle and go back to waiting for the next query
                    print("[Controller] Sending paused event")
                    self._emit_event({"type": "paused"})
                    break
                # For other events (turn), continue processing
                # Check if pause requested after a turn
                if event["type"] == "turn" and self._pause_requested:
                    print("[Controller] Pause requested, breaking after turn")
                    self._pause_requested = False
                    self._emit_event({"type": "paused"})
                    break
        finally:
            # Clear the flag even if process_query() raised, so that
            # request_pause() does not treat a finished worker as busy.
            self._processing_query = False

    def _run(self):
        """Internal method that runs in the background thread."""
        print("[Controller] _run started")
        try:
            # Define the stop_check function that is handed to the agent
            # through the config
            def should_stop():
                # If paused, wait until pause_event is set again
                print(f"[Controller] should_stop called, pause_event.is_set={self.pause_event.is_set()}, stop_event.is_set={self.stop_event.is_set()}")
                self.pause_event.wait()  # blocks while paused
                return self.stop_event.is_set()

            # Inject the stop_check into a copy of the config to avoid
            # mutating the original. NOTE: a config without model_copy() is
            # used as-is, so in that case the caller's object IS mutated.
            run_config = self._config.model_copy() if hasattr(self._config, 'model_copy') else self._config
            run_config.stop_check = should_stop
            if self._initial_conversation is not None:
                run_config.initial_conversation = self._initial_conversation
            # Create Agent instance
            agent = Agent(run_config, initial_conversation=self._initial_conversation)
            self.agent = agent  # store so restart_session() can reach it

            # Main loop: process queries from queue
            while self._keep_alive:
                # Wait for next query; the timeout lets an idle worker
                # notice a stop request
                try:
                    query = self.query_queue.get(timeout=1.0)
                except queue.Empty:
                    if self.stop_event.is_set():
                        break
                    continue
                if query == "[RESET]":
                    agent.reset()
                    continue
                if query == "[PAUSE]":
                    print("[Controller] Pause requested")
                    self._emit_event({"type": "paused"})
                    continue
                self._run_query(agent, query)
        except Exception as e:
            # Catch any unexpected exception and send an error event
            print(f"[Controller] Exception in _run: {e}")
            traceback.print_exc()
            self._emit_event({
                "type": "error",
                "message": str(e),
                "traceback": traceback.format_exc()  # helpful for debugging
            })
        finally:
            # Clear the flag BEFORE announcing completion: a consumer that
            # reacts to "thread_finished" by calling start() must not see a
            # stale _running=True, and a failing emit must not skip the reset.
            self._running = False
            # Signal that the thread is finishing
            self._emit_event({"type": "thread_finished"})