feat: created new graph-based deep-research agent
This commit is contained in:
@@ -0,0 +1,18 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Fan-out source for context loading.
|
||||
|
||||
Has no logic of its own. Exists so the static `next: [plan, knowledge_lookup]`
|
||||
list on this node fans out into two parallel branches (the LLM planner and
|
||||
the RAG knowledge lookup) as a single super-step. The validator requires
|
||||
declared parallel-branch script outputs, so we emit an empty JSON object
|
||||
explicitly here.
|
||||
"""
|
||||
import json
|
||||
|
||||
|
||||
def main():
|
||||
print(json.dumps({}))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,39 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Join the per-question map outputs into a single `findings` string.
|
||||
|
||||
The `research_each_question` map writes `question_findings` (an array,
|
||||
one entry per sub-question, in input order). Downstream nodes
|
||||
(`vet_sources`, `critique`, `synthesize`) read `{{findings}}` as a
|
||||
single block, so this script renders the array as a Markdown document
|
||||
with one section per question.
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
|
||||
|
||||
def load_state():
|
||||
path = os.environ.get("GRAPH_STATE_FILE")
|
||||
if path:
|
||||
with open(path) as f:
|
||||
return json.load(f)
|
||||
return json.loads(os.environ.get("GRAPH_STATE", "{}"))
|
||||
|
||||
|
||||
def main():
|
||||
state = load_state()
|
||||
questions = state.get("questions") or []
|
||||
per_question = state.get("question_findings") or []
|
||||
|
||||
sections = []
|
||||
for idx, q in enumerate(questions):
|
||||
body = per_question[idx] if idx < len(per_question) else ""
|
||||
if isinstance(body, dict) or isinstance(body, list):
|
||||
body = json.dumps(body, indent=2)
|
||||
sections.append(f"## {q}\n\n{body}")
|
||||
|
||||
findings = "\n\n".join(sections) if sections else "No findings gathered."
|
||||
print(json.dumps({"findings": findings}))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,41 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Fold a reviewer's free-form feedback back into the research loop.
|
||||
|
||||
Runs when the user answers the approval step with their own text
|
||||
instead of "accept" or "reject". That text (saved by the approval node
|
||||
as `decision`) becomes `research_feedback`, and the graph loops back to
|
||||
`research_each_question` for another informed pass (each sub-question is
|
||||
re-researched in parallel with the new feedback in context). The
|
||||
reflexion counter is reset so the user-driven pass gets a fresh revision
|
||||
budget.
|
||||
|
||||
Routing (`_next`): always research_each_question.
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
|
||||
|
||||
def load_state():
|
||||
path = os.environ.get("GRAPH_STATE_FILE")
|
||||
if path:
|
||||
with open(path) as f:
|
||||
return json.load(f)
|
||||
return json.loads(os.environ.get("GRAPH_STATE", "{}"))
|
||||
|
||||
|
||||
def main():
|
||||
state = load_state()
|
||||
feedback = (state.get("decision") or "").strip()
|
||||
output = {
|
||||
"_next": "research_each_question",
|
||||
"research_attempts": 0,
|
||||
"research_feedback": (
|
||||
"The user reviewed the report and asked for changes. Treat "
|
||||
"this as the top priority for the next pass:\n\n" + feedback
|
||||
),
|
||||
}
|
||||
print(json.dumps(output))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,35 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Entry router for deep-research.
|
||||
|
||||
Reads the caller's prompt from state. If it contains a usable research
|
||||
topic, stores it as `topic` and falls through to the static `next`
|
||||
(plan). If the prompt is empty, routes to `ask_topic` so the user can
|
||||
supply one interactively.
|
||||
|
||||
Routing (`_next`):
|
||||
- prompt present -> (no _next; static next: plan)
|
||||
- prompt empty -> ask_topic
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
|
||||
|
||||
def load_state():
|
||||
path = os.environ.get("GRAPH_STATE_FILE")
|
||||
if path:
|
||||
with open(path) as f:
|
||||
return json.load(f)
|
||||
return json.loads(os.environ.get("GRAPH_STATE", "{}"))
|
||||
|
||||
|
||||
def main():
|
||||
state = load_state()
|
||||
prompt = (state.get("initial_prompt") or "").strip()
|
||||
if prompt:
|
||||
print(json.dumps({"topic": prompt}))
|
||||
else:
|
||||
print(json.dumps({"_next": "ask_topic"}))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,76 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Reflexion gate for deep-research.
|
||||
|
||||
Runs after `critique` has reviewed the current research findings. If the
|
||||
critique's verdict is REVISE and the reflexion budget is not spent,
|
||||
loops back to `research` with the critique attached as
|
||||
`research_feedback`, so the retry is informed rather than a blind
|
||||
re-run. Otherwise it proceeds to `synthesize`.
|
||||
|
||||
Routing (`_next`):
|
||||
- verdict PASS -> synthesize
|
||||
- verdict REVISE, budget remaining -> research_each_question (+ research_feedback)
|
||||
- verdict REVISE, budget spent -> synthesize
|
||||
|
||||
Reflexion is a best-effort quality booster, not a hard gate: once the
|
||||
budget is spent the workflow proceeds anyway, and the human approval
|
||||
step is the final backstop.
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
|
||||
# Automated revision passes allowed. `research` runs at most
|
||||
# MAX_REFLEXION_REVISIONS + 1 times per user pass. Bump to allow more.
|
||||
MAX_REFLEXION_REVISIONS = 2
|
||||
|
||||
|
||||
def load_state():
|
||||
path = os.environ.get("GRAPH_STATE_FILE")
|
||||
if path:
|
||||
with open(path) as f:
|
||||
return json.load(f)
|
||||
return json.loads(os.environ.get("GRAPH_STATE", "{}"))
|
||||
|
||||
|
||||
def as_int(value, default=0):
|
||||
try:
|
||||
return int(value)
|
||||
except (TypeError, ValueError):
|
||||
return default
|
||||
|
||||
|
||||
def parse_verdict(critique):
|
||||
"""Pull PASS/REVISE from the critique's `VERDICT:` line. Defaults to
|
||||
PASS when no verdict line is found, so a malformed critique lets the
|
||||
workflow proceed instead of burning the whole revision budget."""
|
||||
match = re.search(r"VERDICT:\s*([A-Za-z]+)", critique, re.IGNORECASE)
|
||||
if not match:
|
||||
return "PASS"
|
||||
return match.group(1).upper()
|
||||
|
||||
|
||||
def main():
|
||||
state = load_state()
|
||||
critique = state.get("critique") or ""
|
||||
verdict = parse_verdict(critique)
|
||||
attempts = as_int(state.get("research_attempts"))
|
||||
|
||||
if verdict == "REVISE" and attempts < MAX_REFLEXION_REVISIONS:
|
||||
feedback = (
|
||||
"A reviewer judged the previous research pass incomplete. "
|
||||
"Address every point in the critique below:\n\n" + critique
|
||||
)
|
||||
output = {
|
||||
"_next": "research_each_question",
|
||||
"research_attempts": attempts + 1,
|
||||
"research_feedback": feedback,
|
||||
}
|
||||
else:
|
||||
output = {"_next": "synthesize"}
|
||||
|
||||
print(json.dumps(output))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,69 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Check that the sources cited in the research report are reachable.
|
||||
|
||||
Scans the final report for URLs and DOIs, probes each with a HEAD
|
||||
request, and writes a `source_check` summary into state so the human
|
||||
reviewer sees broken citations at the approval step.
|
||||
|
||||
Times out per request so a slow source cannot stall the graph.
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
|
||||
DOI_RE = re.compile(r"\b(10\.\d{4,9}/[-._;()/:A-Z0-9]+)", re.IGNORECASE)
|
||||
URL_RE = re.compile(r"https?://[^\s)\]\}\"'>]+")
|
||||
|
||||
|
||||
def load_state():
|
||||
path = os.environ.get("GRAPH_STATE_FILE")
|
||||
if path:
|
||||
with open(path) as f:
|
||||
return json.load(f)
|
||||
return json.loads(os.environ.get("GRAPH_STATE", "{}"))
|
||||
|
||||
|
||||
def reachable(url, timeout=5.0):
|
||||
req = urllib.request.Request(url, method="HEAD")
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
||||
return 200 <= resp.status < 400
|
||||
except urllib.error.HTTPError as e:
|
||||
return 200 <= e.code < 400
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def main():
|
||||
state = load_state()
|
||||
report = state.get("report") or ""
|
||||
|
||||
urls = sorted({u.rstrip(".,;)") for u in URL_RE.findall(report)})
|
||||
dois = sorted(set(DOI_RE.findall(report)))
|
||||
|
||||
results = []
|
||||
for url in urls:
|
||||
ok = reachable(url)
|
||||
results.append(f" {'OK' if ok else 'UNREACHABLE'} {url}")
|
||||
for doi in dois:
|
||||
url = f"https://doi.org/{doi}"
|
||||
if url in urls:
|
||||
continue
|
||||
ok = reachable(url)
|
||||
results.append(f" {'OK' if ok else 'UNREACHABLE'} DOI {doi} ({url})")
|
||||
|
||||
if not results:
|
||||
summary = "No web sources were cited in the report."
|
||||
else:
|
||||
summary = (
|
||||
f"Source reachability ({len(results)} checked):\n"
|
||||
+ "\n".join(results)
|
||||
)
|
||||
|
||||
print(json.dumps({"source_check": summary}))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user