feat: created new graph-based deep-research agent

This commit is contained in:
2026-05-21 11:27:55 -06:00
parent 597f823bdf
commit d81d233527
13 changed files with 1037 additions and 0 deletions
@@ -0,0 +1,18 @@
#!/usr/bin/env python3
"""Fan-out source for context loading.
Has no logic of its own. Exists so the static `next: [plan, knowledge_lookup]`
list on this node fans out into two parallel branches (the LLM planner and
the RAG knowledge lookup) as a single super-step. The validator requires
declared parallel-branch script outputs, so we emit an empty JSON object
explicitly here.
"""
import json
def main():
print(json.dumps({}))
if __name__ == "__main__":
main()
@@ -0,0 +1,39 @@
#!/usr/bin/env python3
"""Join the per-question map outputs into a single `findings` string.
The `research_each_question` map writes `question_findings` (an array,
one entry per sub-question, in input order). Downstream nodes
(`vet_sources`, `critique`, `synthesize`) read `{{findings}}` as a
single block, so this script renders the array as a Markdown document
with one section per question.
"""
import json
import os
def load_state():
path = os.environ.get("GRAPH_STATE_FILE")
if path:
with open(path) as f:
return json.load(f)
return json.loads(os.environ.get("GRAPH_STATE", "{}"))
def main():
state = load_state()
questions = state.get("questions") or []
per_question = state.get("question_findings") or []
sections = []
for idx, q in enumerate(questions):
body = per_question[idx] if idx < len(per_question) else ""
if isinstance(body, dict) or isinstance(body, list):
body = json.dumps(body, indent=2)
sections.append(f"## {q}\n\n{body}")
findings = "\n\n".join(sections) if sections else "No findings gathered."
print(json.dumps({"findings": findings}))
if __name__ == "__main__":
main()
@@ -0,0 +1,41 @@
#!/usr/bin/env python3
"""Fold a reviewer's free-form feedback back into the research loop.
Runs when the user answers the approval step with their own text
instead of "accept" or "reject". That text (saved by the approval node
as `decision`) becomes `research_feedback`, and the graph loops back to
`research_each_question` for another informed pass (each sub-question is
re-researched in parallel with the new feedback in context). The
reflexion counter is reset so the user-driven pass gets a fresh revision
budget.
Routing (`_next`): always research_each_question.
"""
import json
import os
def load_state():
path = os.environ.get("GRAPH_STATE_FILE")
if path:
with open(path) as f:
return json.load(f)
return json.loads(os.environ.get("GRAPH_STATE", "{}"))
def main():
state = load_state()
feedback = (state.get("decision") or "").strip()
output = {
"_next": "research_each_question",
"research_attempts": 0,
"research_feedback": (
"The user reviewed the report and asked for changes. Treat "
"this as the top priority for the next pass:\n\n" + feedback
),
}
print(json.dumps(output))
if __name__ == "__main__":
main()
@@ -0,0 +1,35 @@
#!/usr/bin/env python3
"""Entry router for deep-research.
Reads the caller's prompt from state. If it contains a usable research
topic, stores it as `topic` and falls through to the static `next`
(plan). If the prompt is empty, routes to `ask_topic` so the user can
supply one interactively.
Routing (`_next`):
- prompt present -> (no _next; static next: plan)
- prompt empty -> ask_topic
"""
import json
import os
def load_state():
path = os.environ.get("GRAPH_STATE_FILE")
if path:
with open(path) as f:
return json.load(f)
return json.loads(os.environ.get("GRAPH_STATE", "{}"))
def main():
state = load_state()
prompt = (state.get("initial_prompt") or "").strip()
if prompt:
print(json.dumps({"topic": prompt}))
else:
print(json.dumps({"_next": "ask_topic"}))
if __name__ == "__main__":
main()
@@ -0,0 +1,76 @@
#!/usr/bin/env python3
"""Reflexion gate for deep-research.
Runs after `critique` has reviewed the current research findings. If the
critique's verdict is REVISE and the reflexion budget is not spent,
loops back to `research` with the critique attached as
`research_feedback`, so the retry is informed rather than a blind
re-run. Otherwise it proceeds to `synthesize`.
Routing (`_next`):
- verdict PASS -> synthesize
- verdict REVISE, budget remaining -> research_each_question (+ research_feedback)
- verdict REVISE, budget spent -> synthesize
Reflexion is a best-effort quality booster, not a hard gate: once the
budget is spent the workflow proceeds anyway, and the human approval
step is the final backstop.
"""
import json
import os
import re
# Automated revision passes allowed. `research` runs at most
# MAX_REFLEXION_REVISIONS + 1 times per user pass. Bump to allow more.
MAX_REFLEXION_REVISIONS = 2
def load_state():
path = os.environ.get("GRAPH_STATE_FILE")
if path:
with open(path) as f:
return json.load(f)
return json.loads(os.environ.get("GRAPH_STATE", "{}"))
def as_int(value, default=0):
try:
return int(value)
except (TypeError, ValueError):
return default
def parse_verdict(critique):
"""Pull PASS/REVISE from the critique's `VERDICT:` line. Defaults to
PASS when no verdict line is found, so a malformed critique lets the
workflow proceed instead of burning the whole revision budget."""
match = re.search(r"VERDICT:\s*([A-Za-z]+)", critique, re.IGNORECASE)
if not match:
return "PASS"
return match.group(1).upper()
def main():
state = load_state()
critique = state.get("critique") or ""
verdict = parse_verdict(critique)
attempts = as_int(state.get("research_attempts"))
if verdict == "REVISE" and attempts < MAX_REFLEXION_REVISIONS:
feedback = (
"A reviewer judged the previous research pass incomplete. "
"Address every point in the critique below:\n\n" + critique
)
output = {
"_next": "research_each_question",
"research_attempts": attempts + 1,
"research_feedback": feedback,
}
else:
output = {"_next": "synthesize"}
print(json.dumps(output))
if __name__ == "__main__":
main()
@@ -0,0 +1,69 @@
#!/usr/bin/env python3
"""Check that the sources cited in the research report are reachable.
Scans the final report for URLs and DOIs, probes each with a HEAD
request, and writes a `source_check` summary into state so the human
reviewer sees broken citations at the approval step.
Times out per request so a slow source cannot stall the graph.
"""
import json
import os
import re
import urllib.error
import urllib.request
DOI_RE = re.compile(r"\b(10\.\d{4,9}/[-._;()/:A-Z0-9]+)", re.IGNORECASE)
URL_RE = re.compile(r"https?://[^\s)\]\}\"'>]+")
def load_state():
path = os.environ.get("GRAPH_STATE_FILE")
if path:
with open(path) as f:
return json.load(f)
return json.loads(os.environ.get("GRAPH_STATE", "{}"))
def reachable(url, timeout=5.0):
req = urllib.request.Request(url, method="HEAD")
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
return 200 <= resp.status < 400
except urllib.error.HTTPError as e:
return 200 <= e.code < 400
except Exception:
return False
def main():
state = load_state()
report = state.get("report") or ""
urls = sorted({u.rstrip(".,;)") for u in URL_RE.findall(report)})
dois = sorted(set(DOI_RE.findall(report)))
results = []
for url in urls:
ok = reachable(url)
results.append(f" {'OK' if ok else 'UNREACHABLE'} {url}")
for doi in dois:
url = f"https://doi.org/{doi}"
if url in urls:
continue
ok = reachable(url)
results.append(f" {'OK' if ok else 'UNREACHABLE'} DOI {doi} ({url})")
if not results:
summary = "No web sources were cited in the report."
else:
summary = (
f"Source reachability ({len(results)} checked):\n"
+ "\n".join(results)
)
print(json.dumps({"source_check": summary}))
if __name__ == "__main__":
main()