feat: implemented the frontier-based scheduling for the graph executor with simplified state management (gotta love .clone)

This commit is contained in:
2026-05-20 13:48:55 -06:00
parent 82a060b277
commit de2a8dcf89
6 changed files with 341 additions and 136 deletions
+118 -8
View File
@@ -159,13 +159,44 @@ impl StateManager {
}
}
/// Creates an independent, writable copy of this manager for one parallel
/// branch.
///
/// The fork is seeded with a clone of the current state and carries no temp
/// file of its own (`temp_file` is `None`), so the branch can mutate it
/// without touching the parent. Once the branch completes, callers extract
/// its writes with `diff_against` and merge them via `apply_branch_writes`.
///
/// Not to be confused with `snapshot()` (a `HashMap` clone of the data only,
/// used by `script_executor` to ship state to child processes) or with
/// `read_snapshot` (a shared `Arc<GraphState>`): this method returns a
/// fully-owned `StateManager`.
pub fn fork_for_branch_state(&self) -> Self {
    // Clone first so the fork owns its data outright.
    let forked_state = self.state.clone();
    Self {
        state: forked_state,
        temp_file: None,
    }
}
/// Collects the entries of this manager's state that are new or changed
/// relative to `snapshot`.
///
/// An entry is included when its key is missing from `snapshot` or maps to a
/// different value there. Call this after a branch finishes to obtain its
/// writes, which are the input to `apply_branch_writes`.
///
/// Keys that exist only in `snapshot` are never reported, so deletions are
/// not represented (no current node executor deletes state).
pub fn diff_against(&self, snapshot: &GraphState) -> HashMap<String, Value> {
    self.state
        .data()
        .iter()
        // Keep only entries the baseline lacks or holds with another value.
        .filter(|&(key, value)| snapshot.get(key) != Some(value))
        .map(|(key, value)| (key.clone(), value.clone()))
        .collect()
}
/// Returns an `Arc`-wrapped snapshot of the current graph state. Each
/// branch in a parallel super-step uses this snapshot as the baseline for
/// its `diff_against` call at branch end. The executor extracts each
/// branch's writes (the diff) and merges them via `apply_branch_writes` at
/// the super-step boundary.
///
/// Distinct from the older `snapshot()` method (returns a `HashMap` clone
/// of the data only — used by `script_executor` to ship state to child
/// processes).
#[allow(dead_code)]
pub fn read_snapshot(&self) -> Arc<GraphState> {
Arc::new(self.state.clone())
@@ -936,12 +967,91 @@ mod tests {
#[test]
fn interpolate_raw_inner_spaces_treated_as_mixed() {
    // Spaces inside the braces fall outside the character set allowed for a
    // pure reference, so `{{ k }}` is handled as string interpolation. The
    // interpolation regex does not match it either, which means the literal
    // text is expected to come back untouched.
    let template = "{{ k }}";
    let manager = manager_with(&[("k", json!("v"))]);
    let interpolated = manager.interpolate_raw(template).unwrap();
    assert_eq!(interpolated, json!("{{ k }}"));
}
#[test]
fn fork_for_branch_state_copies_data() {
    // Every key seeded into the parent must be readable from the fork.
    let source = manager_with(&[("a", json!(1)), ("b", json!("x"))]);
    let branch = source.fork_for_branch_state();
    let branch_state = branch.state();
    assert_eq!(branch_state.get("a"), Some(&json!(1)));
    assert_eq!(branch_state.get("b"), Some(&json!("x")));
}
#[test]
fn fork_for_branch_state_isolates_writes_from_parent() {
    let key = "count";
    let source = manager_with(&[(key, json!(10))]);
    let mut branch = source.fork_for_branch_state();

    // Overwrite the value on the fork only.
    branch.state_mut().set(key.into(), json!(999));

    // The fork sees its own write; the parent keeps the seeded value.
    assert_eq!(branch.state().get(key), Some(&json!(999)));
    assert_eq!(source.state().get(key), Some(&json!(10)));
}
#[test]
fn fork_for_branch_state_does_not_share_temp_file_lifecycle() {
    let source = manager_with(&[("k", json!("v"))]);
    let branch = source.fork_for_branch_state();

    // A fork never carries a temp file of its own.
    assert!(branch.temp_file.is_none());

    // Dropping the fork must leave the parent's data intact.
    drop(branch);
    let surviving = source.state().get("k");
    assert_eq!(surviving, Some(&json!("v")));
}
#[test]
fn diff_against_returns_empty_when_unchanged() {
    // A fork that is never written to has nothing to report.
    let baseline = manager_with(&[("a", json!(1)), ("b", json!(2))]);
    let untouched = baseline.fork_for_branch_state();
    assert!(untouched.diff_against(baseline.state()).is_empty());
}
#[test]
fn diff_against_reports_newly_written_keys() {
    // Start from an empty baseline so the only entry is the branch's write.
    let baseline = manager_with(&[]);
    let mut branch = baseline.fork_for_branch_state();
    branch.state_mut().set("new".into(), json!(42));

    let writes = branch.diff_against(baseline.state());
    assert_eq!(writes.len(), 1);
    assert_eq!(writes.get("new"), Some(&json!(42)));
}
#[test]
fn diff_against_reports_changed_values_only() {
    let baseline = manager_with(&[("a", json!(1)), ("b", json!(2)), ("c", json!(3))]);
    let mut branch = baseline.fork_for_branch_state();
    branch.state_mut().set("b".into(), json!(99));

    let writes = branch.diff_against(baseline.state());

    // Exactly one entry: the overwritten key with its new value.
    assert_eq!(writes.len(), 1);
    assert_eq!(writes.get("b"), Some(&json!(99)));

    // Keys the branch never touched must not appear.
    for untouched in ["a", "c"].iter() {
        assert!(!writes.contains_key(*untouched));
    }
}
#[test]
fn diff_against_does_not_report_reverted_writes() {
    let baseline = manager_with(&[("x", json!("initial"))]);
    let mut branch = baseline.fork_for_branch_state();

    // Change the value, then restore the original: the net change is zero.
    for value in ["modified", "initial"].iter() {
        branch.state_mut().set("x".into(), json!(*value));
    }

    let writes = branch.diff_against(baseline.state());
    assert!(writes.is_empty(), "reverted write should not appear in diff");
}
}