390 lines
11 KiB
Rust
390 lines
11 KiB
Rust
use super::types::Reducer;
|
|
use crate::graph::type_name;
|
|
use anyhow::{Result, bail};
|
|
use serde_json::{Number, Value};
|
|
|
|
pub fn apply(reducer: Reducer, current: Option<&Value>, incoming: Value) -> Result<Value> {
|
|
match reducer {
|
|
Reducer::Append => apply_append(current, incoming),
|
|
Reducer::Extend => apply_extend(current, incoming),
|
|
Reducer::Concat => apply_concat(current, incoming),
|
|
Reducer::Sum => apply_sum(current, incoming),
|
|
Reducer::Max => apply_max(current, incoming),
|
|
Reducer::Min => apply_min(current, incoming),
|
|
Reducer::Merge => apply_merge(current, incoming),
|
|
Reducer::Overwrite => Ok(incoming),
|
|
}
|
|
}
|
|
|
|
fn apply_append(current: Option<&Value>, incoming: Value) -> Result<Value> {
|
|
let mut arr = match current {
|
|
None => Vec::new(),
|
|
Some(Value::Array(a)) => a.clone(),
|
|
Some(other) => bail!(
|
|
"reducer 'append' requires an array (or absent) for the current value, got {}",
|
|
type_name(other)
|
|
),
|
|
};
|
|
arr.push(incoming);
|
|
|
|
Ok(Value::Array(arr))
|
|
}
|
|
|
|
fn apply_extend(current: Option<&Value>, incoming: Value) -> Result<Value> {
|
|
let mut arr = match current {
|
|
None => Vec::new(),
|
|
Some(Value::Array(a)) => a.clone(),
|
|
Some(other) => bail!(
|
|
"reducer 'extend' requires an array (or absent) for the current value, got {}",
|
|
type_name(other)
|
|
),
|
|
};
|
|
match incoming {
|
|
Value::Array(items) => arr.extend(items),
|
|
other => bail!(
|
|
"reducer 'extend' requires an array for the incoming value, got {}",
|
|
type_name(&other)
|
|
),
|
|
}
|
|
|
|
Ok(Value::Array(arr))
|
|
}
|
|
|
|
fn apply_concat(current: Option<&Value>, incoming: Value) -> Result<Value> {
|
|
let incoming_str = match incoming {
|
|
Value::String(s) => s,
|
|
other => bail!(
|
|
"reducer 'concat' requires a string for the incoming value, got {}",
|
|
type_name(&other)
|
|
),
|
|
};
|
|
let result = match current {
|
|
None => incoming_str,
|
|
Some(Value::String(c)) => {
|
|
if c.is_empty() {
|
|
incoming_str
|
|
} else {
|
|
format!("{c}\n{incoming_str}")
|
|
}
|
|
}
|
|
Some(other) => bail!(
|
|
"reducer 'concat' requires a string (or absent) for the current value, got {}",
|
|
type_name(other)
|
|
),
|
|
};
|
|
|
|
Ok(Value::String(result))
|
|
}
|
|
|
|
fn apply_sum(current: Option<&Value>, incoming: Value) -> Result<Value> {
|
|
let i = number_or_error(&incoming, "sum", "incoming")?;
|
|
let c = match current {
|
|
None => 0.0,
|
|
Some(value) => number_or_error(value, "sum", "current")?,
|
|
};
|
|
|
|
Ok(json_number(c + i))
|
|
}
|
|
|
|
fn apply_max(current: Option<&Value>, incoming: Value) -> Result<Value> {
|
|
let i = number_or_error(&incoming, "max", "incoming")?;
|
|
match current {
|
|
None => Ok(json_number(i)),
|
|
Some(value) => {
|
|
let c = number_or_error(value, "max", "current")?;
|
|
Ok(json_number(c.max(i)))
|
|
}
|
|
}
|
|
}
|
|
|
|
fn apply_min(current: Option<&Value>, incoming: Value) -> Result<Value> {
|
|
let i = number_or_error(&incoming, "min", "incoming")?;
|
|
match current {
|
|
None => Ok(json_number(i)),
|
|
Some(value) => {
|
|
let c = number_or_error(value, "min", "current")?;
|
|
Ok(json_number(c.min(i)))
|
|
}
|
|
}
|
|
}
|
|
|
|
fn apply_merge(current: Option<&Value>, incoming: Value) -> Result<Value> {
|
|
let mut map = match current {
|
|
None => serde_json::Map::new(),
|
|
Some(Value::Object(m)) => m.clone(),
|
|
Some(other) => bail!(
|
|
"reducer 'merge' requires an object (or absent) for the current value, got {}",
|
|
type_name(other)
|
|
),
|
|
};
|
|
match incoming {
|
|
Value::Object(items) => {
|
|
for (k, v) in items {
|
|
map.insert(k, v);
|
|
}
|
|
}
|
|
other => bail!(
|
|
"reducer 'merge' requires an object for the incoming value, got {}",
|
|
type_name(&other)
|
|
),
|
|
}
|
|
|
|
Ok(Value::Object(map))
|
|
}
|
|
|
|
fn number_or_error(value: &Value, reducer_name: &str, position: &str) -> Result<f64> {
|
|
match value.as_f64() {
|
|
Some(n) => Ok(n),
|
|
None => bail!(
|
|
"reducer '{reducer_name}' requires a number for the {position} value, got {}",
|
|
type_name(value)
|
|
),
|
|
}
|
|
}
|
|
|
|
// Numeric reducers compute in f64 for simplicity. Integer typing is preserved when the result is losslessly
|
|
// representable as i64.
|
|
fn json_number(n: f64) -> Value {
|
|
if n.fract() == 0.0 && n.is_finite() && n.abs() <= (i64::MAX as f64) {
|
|
Value::Number(Number::from(n as i64))
|
|
} else {
|
|
match Number::from_f64(n) {
|
|
Some(num) => Value::Number(num),
|
|
None => Value::Null,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use serde_json::json;
|
|
|
|
#[test]
|
|
fn append_to_absent_creates_single_element_array() {
|
|
let result = apply(Reducer::Append, None, json!("a")).unwrap();
|
|
|
|
assert_eq!(result, json!(["a"]));
|
|
}
|
|
|
|
#[test]
|
|
fn append_pushes_onto_existing_array() {
|
|
let current = json!(["a", "b"]);
|
|
let result = apply(Reducer::Append, Some(¤t), json!("c")).unwrap();
|
|
|
|
assert_eq!(result, json!(["a", "b", "c"]));
|
|
}
|
|
|
|
#[test]
|
|
fn append_errors_when_current_is_not_array() {
|
|
let current = json!("not an array");
|
|
|
|
let err = apply(Reducer::Append, Some(¤t), json!("x"))
|
|
.unwrap_err()
|
|
.to_string();
|
|
|
|
assert!(err.contains("'append'"), "got: {err}");
|
|
assert!(err.contains("string"), "got: {err}");
|
|
}
|
|
|
|
#[test]
|
|
fn extend_concatenates_arrays() {
|
|
let current = json!([1, 2]);
|
|
|
|
let result = apply(Reducer::Extend, Some(¤t), json!([3, 4])).unwrap();
|
|
|
|
assert_eq!(result, json!([1, 2, 3, 4]));
|
|
}
|
|
|
|
#[test]
|
|
fn extend_from_absent_with_array() {
|
|
let result = apply(Reducer::Extend, None, json!([1, 2])).unwrap();
|
|
|
|
assert_eq!(result, json!([1, 2]));
|
|
}
|
|
|
|
#[test]
|
|
fn extend_errors_when_incoming_is_not_array() {
|
|
let err = apply(Reducer::Extend, None, json!(42))
|
|
.unwrap_err()
|
|
.to_string();
|
|
|
|
assert!(err.contains("'extend'"), "got: {err}");
|
|
assert!(err.contains("number"), "got: {err}");
|
|
assert!(err.contains("incoming"), "got: {err}");
|
|
}
|
|
|
|
#[test]
|
|
fn concat_joins_strings_with_newline() {
|
|
let current = json!("first");
|
|
|
|
let result = apply(Reducer::Concat, Some(¤t), json!("second")).unwrap();
|
|
|
|
assert_eq!(result, json!("first\nsecond"));
|
|
}
|
|
|
|
#[test]
|
|
fn concat_from_absent_yields_incoming() {
|
|
let result = apply(Reducer::Concat, None, json!("hello")).unwrap();
|
|
|
|
assert_eq!(result, json!("hello"));
|
|
}
|
|
|
|
#[test]
|
|
fn concat_skips_separator_when_current_is_empty_string() {
|
|
let current = json!("");
|
|
|
|
let result = apply(Reducer::Concat, Some(¤t), json!("first")).unwrap();
|
|
|
|
assert_eq!(result, json!("first"));
|
|
}
|
|
|
|
#[test]
|
|
fn concat_errors_when_incoming_is_not_string() {
|
|
let err = apply(Reducer::Concat, None, json!(42))
|
|
.unwrap_err()
|
|
.to_string();
|
|
|
|
assert!(err.contains("'concat'"), "got: {err}");
|
|
assert!(err.contains("number"), "got: {err}");
|
|
}
|
|
|
|
#[test]
|
|
fn sum_adds_numbers() {
|
|
let current = json!(5);
|
|
|
|
let result = apply(Reducer::Sum, Some(¤t), json!(7)).unwrap();
|
|
|
|
assert_eq!(result, json!(12));
|
|
}
|
|
|
|
#[test]
|
|
fn sum_starts_from_zero_when_current_absent() {
|
|
let result = apply(Reducer::Sum, None, json!(3.5)).unwrap();
|
|
|
|
assert_eq!(result, json!(3.5));
|
|
}
|
|
|
|
#[test]
|
|
fn sum_preserves_integer_type_for_whole_results() {
|
|
let current = json!(2);
|
|
|
|
let result = apply(Reducer::Sum, Some(¤t), json!(3)).unwrap();
|
|
|
|
assert!(result.is_i64(), "expected integer, got {result:?}");
|
|
assert_eq!(result, json!(5));
|
|
}
|
|
|
|
#[test]
|
|
fn sum_uses_float_when_result_has_fractional() {
|
|
let current = json!(1.5);
|
|
let result = apply(Reducer::Sum, Some(¤t), json!(2.25)).unwrap();
|
|
|
|
assert_eq!(result, json!(3.75));
|
|
}
|
|
|
|
#[test]
|
|
fn sum_errors_on_string_incoming() {
|
|
let err = apply(Reducer::Sum, None, json!("not a number"))
|
|
.unwrap_err()
|
|
.to_string();
|
|
|
|
assert!(err.contains("'sum'"), "got: {err}");
|
|
assert!(err.contains("string"), "got: {err}");
|
|
}
|
|
|
|
#[test]
|
|
fn max_returns_larger_of_two() {
|
|
let current = json!(5);
|
|
let result = apply(Reducer::Max, Some(¤t), json!(3)).unwrap();
|
|
assert_eq!(result, json!(5));
|
|
|
|
let result = apply(Reducer::Max, Some(¤t), json!(10)).unwrap();
|
|
assert_eq!(result, json!(10));
|
|
}
|
|
|
|
#[test]
|
|
fn max_yields_incoming_when_current_absent() {
|
|
let result = apply(Reducer::Max, None, json!(42)).unwrap();
|
|
|
|
assert_eq!(result, json!(42));
|
|
}
|
|
|
|
#[test]
|
|
fn min_returns_smaller_of_two() {
|
|
let current = json!(5);
|
|
let result = apply(Reducer::Min, Some(¤t), json!(3)).unwrap();
|
|
assert_eq!(result, json!(3));
|
|
|
|
let result = apply(Reducer::Min, Some(¤t), json!(10)).unwrap();
|
|
assert_eq!(result, json!(5));
|
|
}
|
|
|
|
#[test]
|
|
fn min_errors_on_non_numeric_current() {
|
|
let current = json!("oops");
|
|
|
|
let err = apply(Reducer::Min, Some(¤t), json!(1))
|
|
.unwrap_err()
|
|
.to_string();
|
|
|
|
assert!(err.contains("'min'"), "got: {err}");
|
|
assert!(err.contains("current"), "got: {err}");
|
|
}
|
|
|
|
#[test]
|
|
fn merge_unions_objects_with_incoming_winning_collisions() {
|
|
let current = json!({ "a": 1, "b": 2 });
|
|
let incoming = json!({ "b": 99, "c": 3 });
|
|
|
|
let result = apply(Reducer::Merge, Some(¤t), incoming).unwrap();
|
|
|
|
assert_eq!(result, json!({ "a": 1, "b": 99, "c": 3 }));
|
|
}
|
|
|
|
#[test]
|
|
fn merge_from_absent_yields_incoming_object() {
|
|
let result = apply(Reducer::Merge, None, json!({ "k": "v" })).unwrap();
|
|
|
|
assert_eq!(result, json!({ "k": "v" }));
|
|
}
|
|
|
|
#[test]
|
|
fn merge_errors_when_incoming_is_not_object() {
|
|
let err = apply(Reducer::Merge, None, json!([1, 2]))
|
|
.unwrap_err()
|
|
.to_string();
|
|
|
|
assert!(err.contains("'merge'"), "got: {err}");
|
|
assert!(err.contains("array"), "got: {err}");
|
|
}
|
|
|
|
#[test]
|
|
fn merge_errors_when_current_is_not_object() {
|
|
let current = json!("not object");
|
|
|
|
let err = apply(Reducer::Merge, Some(¤t), json!({ "k": "v" }))
|
|
.unwrap_err()
|
|
.to_string();
|
|
|
|
assert!(err.contains("'merge'"), "got: {err}");
|
|
assert!(err.contains("current"), "got: {err}");
|
|
}
|
|
|
|
#[test]
|
|
fn overwrite_ignores_current_and_returns_incoming() {
|
|
let current = json!("old");
|
|
|
|
let result = apply(Reducer::Overwrite, Some(¤t), json!("new")).unwrap();
|
|
|
|
assert_eq!(result, json!("new"));
|
|
}
|
|
|
|
#[test]
|
|
fn overwrite_works_with_absent_current() {
|
|
let result = apply(Reducer::Overwrite, None, json!(42)).unwrap();
|
|
|
|
assert_eq!(result, json!(42));
|
|
}
|
|
}
|