-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathfile_output_manager.py
More file actions
258 lines (213 loc) · 9.37 KB
/
file_output_manager.py
File metadata and controls
258 lines (213 loc) · 9.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
"""File Output Manager — JIT Context for inter-agent artifact passing.
Instead of passing full text summaries between agents (which degrades like
a game of telephone), this module maintains a registry of **real files**
produced by each task. Downstream agents receive lightweight file-path
references and read the source of truth directly.
Inspired by Anthropic's multi-agent research system and the JIT Context
pattern from "Memory in the Age of AI Agents" (arXiv:2512.13564).
Integration points:
dag_executor._run_single_task — call ``registry.register(output)`` after
each successful task completion.
dag_executor._run_single_task — call ``registry.enhance_prompt(...)``
before building the agent prompt.
"""
from __future__ import annotations
import json
import logging
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from contracts import TaskInput, TaskOutput, TaskStatus
logger = logging.getLogger(__name__)
# ── File-type inference ──────────────────────────────────────────────────────
_EXT_MAP: dict[str, str] = {
".ts": "code",
".tsx": "code",
".js": "code",
".jsx": "code",
".py": "code",
".go": "code",
".rs": "code",
".java": "code",
".sql": "code",
".sh": "code",
".css": "code",
".scss": "code",
".html": "markup",
".xml": "markup",
".svg": "markup",
".json": "data",
".yaml": "data",
".yml": "data",
".toml": "data",
".csv": "data",
".env": "data",
".md": "doc",
".txt": "doc",
".rst": "doc",
".png": "asset",
".jpg": "asset",
".gif": "asset",
".ico": "asset",
".woff": "asset",
".woff2": "asset",
".ttf": "asset",
".lock": "lockfile",
}
def infer_file_type(path: str) -> str:
"""Return a human-friendly file type label based on extension."""
ext = Path(path).suffix.lower()
return _EXT_MAP.get(ext, "file")
# ── Artifact Reference ───────────────────────────────────────────────────────
@dataclass
class ArtifactRef:
"""A lightweight pointer to a file produced by a task."""
task_id: str
path: str # relative to project root
file_type: str # code | data | doc | asset | ...
description: str # from Artifact.title or auto-generated
# ── Artifact Registry ────────────────────────────────────────────────────────
class ArtifactRegistry:
"""Tracks all file artifacts produced during a DAG execution.
Lifecycle:
1. Created once per ``execute_graph`` call.
2. After each successful task, ``register(output)`` is called.
3. Before each task prompt is built, ``enhance_prompt(task, prompt)``
injects file references for upstream dependencies.
"""
def __init__(self, project_dir: str) -> None:
self._project_dir = project_dir
# task_id -> list of ArtifactRef
self._refs: dict[str, list[ArtifactRef]] = {}
# ── Registration ─────────────────────────────────────────────────────
def register(self, output: TaskOutput) -> int:
"""Extract file references from a completed task output.
Returns the number of artifacts registered.
"""
if output.status != TaskStatus.COMPLETED:
return 0
refs: list[ArtifactRef] = []
# 1. Structured artifacts (typed, with metadata)
for art in output.structured_artifacts:
path = art.file_path
if not path:
continue
resolved = self._resolve(path)
if resolved and os.path.exists(resolved):
refs.append(
ArtifactRef(
task_id=output.task_id,
path=path,
file_type=infer_file_type(path),
description=art.title,
)
)
# 2. Plain artifact paths (list[str] of file paths)
seen_paths = {r.path for r in refs}
for path in output.artifacts:
if path in seen_paths:
continue
resolved = self._resolve(path)
if resolved and os.path.exists(resolved):
refs.append(
ArtifactRef(
task_id=output.task_id,
path=path,
file_type=infer_file_type(path),
description=f"File produced by task {output.task_id}",
)
)
seen_paths.add(path)
self._refs[output.task_id] = refs
if refs:
logger.info(
"[FileOutputManager] Registered %d artifacts from task %s: %s",
len(refs),
output.task_id,
[r.path for r in refs],
)
return len(refs)
# ── Prompt Enhancement ───────────────────────────────────────────────
def get_refs_for_task(self, task: TaskInput) -> list[ArtifactRef]:
"""Collect artifact refs from all upstream tasks (context_from)."""
refs: list[ArtifactRef] = []
seen: set[str] = set()
for upstream_id in task.context_from:
for ref in self._refs.get(upstream_id, []):
if ref.path not in seen:
refs.append(ref)
seen.add(ref.path)
# Also include input_artifacts declared on the task
for path in task.input_artifacts:
if path not in seen:
resolved = self._resolve(path)
if resolved and os.path.exists(resolved):
refs.append(
ArtifactRef(
task_id="input",
path=path,
file_type=infer_file_type(path),
description=f"Input artifact: {Path(path).name}",
)
)
seen.add(path)
return refs
def enhance_prompt(self, task: TaskInput, prompt: str) -> str:
"""Inject file artifact references into the agent prompt.
Adds a clearly-delimited section telling the agent which files
from upstream tasks are available and should be read directly.
"""
refs = self.get_refs_for_task(task)
if not refs:
return prompt
lines = [
"",
"## Upstream Artifacts (read these files directly — they are the source of truth)",
"",
]
for ref in refs:
lines.append(f"- **{ref.path}** ({ref.file_type}) — {ref.description}")
lines.append("")
lines.append(
"IMPORTANT: Read the files listed above instead of relying on "
"summary text. They contain the actual, up-to-date content."
)
lines.append("")
return prompt + "\n".join(lines)
# ── Manifest ─────────────────────────────────────────────────────────
def save_manifest(self, path: str | None = None) -> str:
"""Write a JSON manifest of all registered artifacts.
Returns the path to the manifest file.
"""
manifest_path = path or os.path.join(
self._project_dir, ".hivemind", "artifact_manifest.json"
)
os.makedirs(os.path.dirname(manifest_path), exist_ok=True)
data: dict[str, Any] = {}
for task_id, refs in self._refs.items():
data[task_id] = [
{"path": r.path, "type": r.file_type, "description": r.description} for r in refs
]
with open(manifest_path, "w") as f:
json.dump(data, f, indent=2)
logger.info("[FileOutputManager] Manifest saved to %s", manifest_path)
return manifest_path
# ── Stats ────────────────────────────────────────────────────────────
def stats(self) -> dict[str, Any]:
"""Return summary statistics."""
all_refs = [r for refs in self._refs.values() for r in refs]
type_counts: dict[str, int] = {}
for r in all_refs:
type_counts[r.file_type] = type_counts.get(r.file_type, 0) + 1
return {
"total_tasks": len(self._refs),
"total_artifacts": len(all_refs),
"by_type": type_counts,
}
# ── Internal ─────────────────────────────────────────────────────────
def _resolve(self, path: str) -> str:
"""Resolve a path relative to project_dir if not absolute."""
if os.path.isabs(path):
return path
return os.path.join(self._project_dir, path)