new aggr implementations

main
Ziyang Hu 2 years ago
parent bfa15ab043
commit 0978415b19

@ -1,25 +1,34 @@
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Debug, Formatter};
use std::ops::Sub;
use anyhow::{anyhow, bail, ensure, Result};
use itertools::Itertools;
use crate::data::value::{DataValue, Number};
use crate::data::value::DataValue;
#[derive(Clone)]
pub(crate) struct Aggregation {
pub(crate) name: &'static str,
pub(crate) meet_combine: fn(&mut DataValue, &DataValue, &[DataValue]) -> Result<bool>,
pub(crate) is_meet: bool,
pub(crate) meet_op: Option<Box<dyn MeetAggrObj>>,
pub(crate) normal_op: Option<Box<dyn NormalAggrObj>>,
}
trait NormalAggrObj {
impl Clone for Aggregation {
fn clone(&self) -> Self {
Self {
name: self.name,
is_meet: self.is_meet,
meet_op: None,
normal_op: None,
}
}
}
pub(crate) trait NormalAggrObj {
fn set(&mut self, value: &DataValue) -> Result<()>;
fn get(&self) -> Result<DataValue>;
}
trait MeetAggrObj {
pub(crate) trait MeetAggrObj {
fn update(&self, left: &mut DataValue, right: &DataValue) -> Result<bool>;
}
@ -39,29 +48,16 @@ macro_rules! define_aggr {
($name:ident, $is_meet:expr) => {
const $name: Aggregation = Aggregation {
name: stringify!($name),
meet_combine: ::casey::lower!($name),
is_meet: $is_meet,
meet_op: None,
normal_op: None,
};
};
}
define_aggr!(AGGR_UNIQUE, false);
fn aggr_unique(accum: &mut DataValue, current: &DataValue, _args: &[DataValue]) -> Result<bool> {
Ok(match (accum, current) {
(accum @ DataValue::Guard, DataValue::Guard) => {
*accum = DataValue::Set(Default::default());
true
}
(accum @ DataValue::Guard, val) => {
*accum = DataValue::Set(BTreeSet::from([val.clone()]));
true
}
(_, DataValue::Guard) => false,
(DataValue::Set(l), val) => l.insert(val.clone()),
_ => unreachable!(),
})
}
#[derive(Default)]
struct AggrUnique {
accum: BTreeSet<DataValue>,
}
@ -78,41 +74,8 @@ impl NormalAggrObj for AggrUnique {
}
define_aggr!(AGGR_GROUP_COUNT, false);
fn aggr_group_count(
accum: &mut DataValue,
current: &DataValue,
_args: &[DataValue],
) -> Result<bool> {
dbg!(&current);
Ok(match (accum, current) {
(accum @ DataValue::Guard, DataValue::Guard) => {
*accum = DataValue::List(vec![]);
true
}
(accum @ DataValue::Guard, val) => {
*accum = DataValue::Map(BTreeMap::from([(val.clone(), DataValue::from(1))]));
true
}
(accum, DataValue::Guard) => {
*accum = DataValue::List(
accum
.get_map()
.unwrap()
.iter()
.map(|(k, v)| DataValue::List(vec![k.clone(), v.clone()]))
.collect_vec(),
);
true
}
(DataValue::Map(l), val) => {
let entry = l.entry(val.clone()).or_insert_with(|| DataValue::from(0));
*entry = DataValue::from(entry.get_int().unwrap() + 1);
true
}
_ => unreachable!(),
})
}
#[derive(Default)]
struct AggrGroupCount {
accum: BTreeMap<DataValue, i64>,
}
@ -135,29 +98,8 @@ impl NormalAggrObj for AggrGroupCount {
}
define_aggr!(AGGR_COUNT_UNIQUE, false);
fn aggr_count_unique(
accum: &mut DataValue,
current: &DataValue,
_args: &[DataValue],
) -> Result<bool> {
Ok(match (accum, current) {
(accum @ DataValue::Guard, DataValue::Guard) => {
*accum = DataValue::from(0);
true
}
(accum @ DataValue::Guard, val) => {
*accum = DataValue::Set(BTreeSet::from([val.clone()]));
true
}
(accum, DataValue::Guard) => {
*accum = DataValue::from(accum.get_set().unwrap().len() as i64);
true
}
(DataValue::Set(l), val) => l.insert(val.clone()),
_ => unreachable!(),
})
}
#[derive(Default)]
struct AggrCountUnique {
count: i64,
accum: BTreeSet<DataValue>,
@ -178,42 +120,8 @@ impl NormalAggrObj for AggrCountUnique {
}
define_aggr!(AGGR_UNION, true);
fn aggr_union(accum: &mut DataValue, current: &DataValue, _args: &[DataValue]) -> Result<bool> {
Ok(match (accum, current) {
(accum @ DataValue::Guard, DataValue::Guard) => {
*accum = DataValue::Set(Default::default());
true
}
(accum @ DataValue::Guard, DataValue::Set(s)) => {
*accum = DataValue::Set(s.clone());
true
}
(accum @ DataValue::Guard, DataValue::List(s)) => {
*accum = DataValue::Set(s.iter().cloned().collect());
true
}
(_, DataValue::Guard) => false,
(DataValue::Set(l), DataValue::Set(s)) => {
if s.is_subset(l) {
false
} else {
l.extend(s.iter().cloned());
true
}
}
(DataValue::Set(l), DataValue::List(s)) => {
let s: BTreeSet<_> = s.iter().cloned().collect();
if s.is_subset(l) {
false
} else {
l.extend(s);
true
}
}
(_, v) => bail!("cannot compute 'union' for value {:?}", v),
})
}
#[derive(Default)]
struct AggrUnion {
accum: BTreeSet<DataValue>,
}
@ -264,47 +172,8 @@ impl MeetAggrObj for MeetAggrUnion {
}
define_aggr!(AGGR_INTERSECTION, true);
fn aggr_intersection(
accum: &mut DataValue,
current: &DataValue,
_args: &[DataValue],
) -> Result<bool> {
Ok(match (accum, current) {
(DataValue::Guard, DataValue::Guard) => false,
(accum @ DataValue::Guard, DataValue::Set(s)) => {
*accum = DataValue::Set(s.clone());
true
}
(accum @ DataValue::Guard, DataValue::List(s)) => {
*accum = DataValue::Set(s.iter().cloned().collect());
true
}
(_, DataValue::Guard) => false,
(DataValue::Set(l), DataValue::Set(s)) => {
if l.is_empty() || l.is_subset(s) {
false
} else {
*l = l.sub(s);
true
}
}
(DataValue::Set(l), DataValue::List(s)) => {
if l.is_empty() {
false
} else {
let s: BTreeSet<_> = s.iter().cloned().collect();
if l.is_subset(&s) {
false
} else {
*l = l.sub(&s);
true
}
}
}
(_, v) => bail!("cannot compute 'intersection' for value {:?}", v),
})
}
#[derive(Default)]
struct AggrIntersection {
accum: BTreeSet<DataValue>,
}
@ -328,6 +197,7 @@ impl NormalAggrObj for AggrIntersection {
}
struct MeetAggrIntersection;
impl MeetAggrObj for MeetAggrIntersection {
fn update(&self, left: &mut DataValue, right: &DataValue) -> Result<bool> {
loop {
@ -358,48 +228,29 @@ impl MeetAggrObj for MeetAggrIntersection {
}
define_aggr!(AGGR_COLLECT, false);
fn aggr_collect(accum: &mut DataValue, current: &DataValue, args: &[DataValue]) -> Result<bool> {
Ok(match (accum, current) {
(accum @ DataValue::Guard, DataValue::Guard) => {
if let Some(limit) = args.get(0) {
let limit = limit
.get_int()
.ok_or_else(|| anyhow!("collect limit must be an integer"))?;
ensure!(limit > 0, "collect limit must be positive, got {}", limit);
}
*accum = DataValue::List(vec![]);
true
}
(accum @ DataValue::Guard, val) => {
if let Some(limit) = args.get(0) {
let limit = limit
.get_int()
.ok_or_else(|| anyhow!("collect limit must be an integer"))?;
ensure!(limit > 0, "collect limit must be positive, got {}", limit);
}
*accum = DataValue::List(vec![val.clone()]);
true
}
(_, DataValue::Guard) => false,
(DataValue::List(l), val) => {
if let Some(limit) = args.get(0).and_then(|v| v.get_int()) {
if l.len() >= (limit as usize) {
return Ok(false);
}
}
l.push(val.clone());
true
}
_ => unreachable!(),
})
}
#[derive(Default)]
struct AggrCollect {
limit: Option<usize>,
accum: Vec<DataValue>,
}
impl AggrCollect {
fn new(limit: usize) -> Self {
Self {
limit: Some(limit),
accum: vec![],
}
}
}
impl NormalAggrObj for AggrCollect {
fn set(&mut self, value: &DataValue) -> Result<()> {
if let Some(limit) = self.limit {
if self.accum.len() >= limit {
return Ok(());
}
}
self.accum.push(value.clone());
Ok(())
}
@ -410,25 +261,8 @@ impl NormalAggrObj for AggrCollect {
}
define_aggr!(AGGR_COUNT, false);
fn aggr_count(accum: &mut DataValue, current: &DataValue, _args: &[DataValue]) -> Result<bool> {
Ok(match (accum, current) {
(accum @ DataValue::Guard, DataValue::Guard) => {
*accum = DataValue::Number(Number::Int(0));
true
}
(accum @ DataValue::Guard, _) => {
*accum = DataValue::Number(Number::Int(1));
true
}
(DataValue::Number(Number::Int(_)), DataValue::Guard) => false,
(DataValue::Number(Number::Int(i)), _) => {
*i += 1;
true
}
_ => unreachable!(),
})
}
#[derive(Default)]
struct AggrCount {
count: i64,
}
@ -445,38 +279,8 @@ impl NormalAggrObj for AggrCount {
}
define_aggr!(AGGR_MEAN, false);
fn aggr_mean(accum: &mut DataValue, current: &DataValue, _args: &[DataValue]) -> Result<bool> {
Ok(match (accum, current) {
(accum @ DataValue::Guard, DataValue::Guard) => {
*accum = DataValue::from(0.);
true
}
(accum @ DataValue::Guard, DataValue::Number(n)) => {
*accum = DataValue::List(vec![DataValue::from(n.get_float()), DataValue::from(1)]);
true
}
(accum @ DataValue::List(_), DataValue::Guard) => {
let args = accum.get_list().unwrap();
let total = args[0].get_float().unwrap();
let count = args[1].get_float().unwrap();
*accum = DataValue::from(total / count);
true
}
(DataValue::List(l), DataValue::Number(j)) => {
let new_total = l[0].get_float().unwrap() + j.get_float();
l[0] = DataValue::from(new_total);
let new_count = l[1].get_int().unwrap() + 1;
l[1] = DataValue::from(new_count);
true
}
(i, j) => bail!(
"cannot compute mean: encountered value {:?} for aggregate {:?}",
j,
i
),
})
}
#[derive(Default)]
struct AggrMean {
count: i64,
sum: f64,
@ -500,46 +304,8 @@ impl NormalAggrObj for AggrMean {
}
define_aggr!(AGGR_SUM, false);
fn aggr_sum(accum: &mut DataValue, current: &DataValue, _args: &[DataValue]) -> Result<bool> {
Ok(match (accum, current) {
(accum @ DataValue::Guard, DataValue::Guard) => {
*accum = DataValue::Number(Number::Int(0));
true
}
(accum @ DataValue::Guard, DataValue::Number(Number::Int(i))) => {
*accum = DataValue::Number(Number::Int(*i));
true
}
(accum @ DataValue::Guard, DataValue::Number(Number::Float(f))) => {
*accum = DataValue::Number(Number::Float(*f));
true
}
(DataValue::Number(_), DataValue::Guard) => false,
(DataValue::Number(i), DataValue::Number(j)) => {
match (*i, *j) {
(Number::Int(a), Number::Int(b)) => {
*i = Number::Int(a + b);
}
(Number::Float(a), Number::Float(b)) => {
*i = Number::Float(a + b);
}
(Number::Int(a), Number::Float(b)) => {
*i = Number::Float((a as f64) + b);
}
(Number::Float(a), Number::Int(b)) => {
*i = Number::Float(a + (b as f64));
}
}
true
}
(i, j) => bail!(
"cannot compute min: encountered value {:?} for aggregate {:?}",
j,
i
),
})
}
#[derive(Default)]
struct AggrSum {
sum: f64,
}
@ -561,33 +327,19 @@ impl NormalAggrObj for AggrSum {
}
define_aggr!(AGGR_MIN, true);
fn aggr_min(accum: &mut DataValue, current: &DataValue, _args: &[DataValue]) -> Result<bool> {
Ok(match (accum, current) {
(accum @ DataValue::Guard, DataValue::Number(n)) => {
*accum = DataValue::Number(*n);
true
}
(_, DataValue::Guard) => false,
(DataValue::Number(i), DataValue::Number(j)) => {
if *i <= *j {
false
} else {
*i = *j;
true
}
}
(i, j) => bail!(
"cannot compute min: encountered value {:?} for aggregate {:?}",
j,
i
),
})
}
struct AggrMin {
found: DataValue,
}
impl Default for AggrMin {
fn default() -> Self {
Self {
found: DataValue::Bottom,
}
}
}
impl NormalAggrObj for AggrMin {
fn set(&mut self, value: &DataValue) -> Result<()> {
if *value < self.found {
@ -615,33 +367,19 @@ impl MeetAggrObj for MeetAggrMin {
}
define_aggr!(AGGR_MAX, true);
fn aggr_max(accum: &mut DataValue, current: &DataValue, _args: &[DataValue]) -> Result<bool> {
Ok(match (accum, current) {
(accum @ DataValue::Guard, DataValue::Number(n)) => {
*accum = DataValue::Number(*n);
true
}
(_, DataValue::Guard) => false,
(DataValue::Number(i), DataValue::Number(j)) => {
if *i >= *j {
false
} else {
*i = *j;
true
}
}
(i, j) => bail!(
"cannot compute max: encountered value {:?} for aggregate {:?}",
j,
i
),
})
}
struct AggrMax {
found: DataValue,
}
impl Default for AggrMax {
fn default() -> Self {
Self {
found: DataValue::Null,
}
}
}
impl NormalAggrObj for AggrMax {
fn set(&mut self, value: &DataValue) -> Result<()> {
if *value > self.found {
@ -669,16 +407,8 @@ impl MeetAggrObj for MeetAggrMax {
}
define_aggr!(AGGR_CHOICE, true);
fn aggr_choice(accum: &mut DataValue, current: &DataValue, _args: &[DataValue]) -> Result<bool> {
Ok(match (accum, current) {
(accum @ DataValue::Guard, v) => {
*accum = v.clone();
true
}
_ => false,
})
}
#[derive(Default)]
struct AggrChoice {
found: Option<DataValue>,
}
@ -705,50 +435,21 @@ impl MeetAggrObj for MeetAggrChoice {
}
define_aggr!(AGGR_MIN_COST, true);
fn aggr_min_cost(accum: &mut DataValue, current: &DataValue, _args: &[DataValue]) -> Result<bool> {
Ok(match (accum, current) {
(accum @ DataValue::Guard, DataValue::Guard) => {
*accum = DataValue::Null;
true
}
(accum @ DataValue::Guard, l @ DataValue::List(_)) => {
if l.get_list().unwrap().len() != 2 {
bail!(
"'min_cost' requires a list of length 2 as argument, got {:?}",
l
);
}
*accum = l.clone();
true
}
(_, DataValue::Guard) => false,
(accum, DataValue::List(l)) => {
if l.len() != 2 {
bail!(
"'min_cost' requires a list of length 2 as argument, got {:?}",
l
);
}
let cur_cost = l.get(1).unwrap();
let prev = accum.get_list().unwrap();
let prev_cost = prev.get(1).unwrap();
if prev_cost <= cur_cost {
false
} else {
*accum = DataValue::List(l.clone());
true
}
}
(_, v) => bail!("cannot compute 'min_cost' on {:?}", v),
})
}
struct AggrMinCost {
found: DataValue,
cost: DataValue,
}
impl Default for AggrMinCost {
fn default() -> Self {
Self {
found: DataValue::Null,
cost: DataValue::Bottom,
}
}
}
impl NormalAggrObj for AggrMinCost {
fn set(&mut self, value: &DataValue) -> Result<()> {
match value {
@ -801,40 +502,23 @@ impl MeetAggrObj for MeetAggrMinCost {
}
define_aggr!(AGGR_SHORTEST, true);
fn aggr_shortest(accum: &mut DataValue, current: &DataValue, _args: &[DataValue]) -> Result<bool> {
Ok(match (accum, current) {
(accum @ DataValue::Guard, DataValue::Guard) => {
*accum = DataValue::Null;
true
}
(accum @ DataValue::Guard, l @ DataValue::List(_)) => {
*accum = l.clone();
true
}
(_, DataValue::Guard) => false,
(accum, DataValue::List(l)) => {
let current = accum.get_list().unwrap();
if current.len() <= l.len() {
false
} else {
*accum = DataValue::List(l.clone());
true
}
}
(_, v) => bail!("cannot compute 'shortest' on {:?}", v),
})
}
#[derive(Default)]
struct AggrShortest {
found: Vec<DataValue>,
found: Option<Vec<DataValue>>,
}
impl NormalAggrObj for AggrShortest {
fn set(&mut self, value: &DataValue) -> Result<()> {
match value {
DataValue::List(l) => {
if l.len() < self.found.len() {
self.found = l.clone();
match self.found {
None => self.found = Some(l.clone()),
Some(ref mut found) => {
if l.len() < found.len() {
*found = l.clone();
}
}
}
Ok(())
}
@ -843,7 +527,10 @@ impl NormalAggrObj for AggrShortest {
}
fn get(&self) -> Result<DataValue> {
Ok(DataValue::List(self.found.clone()))
Ok(match self.found {
None => DataValue::Null,
Some(ref l) => DataValue::List(l.clone()),
})
}
}
@ -864,34 +551,19 @@ impl MeetAggrObj for MeetAggrShortest {
}
define_aggr!(AGGR_COALESCE, true);
fn aggr_coalesce(accum: &mut DataValue, current: &DataValue, _args: &[DataValue]) -> Result<bool> {
Ok(match (accum, current) {
(accum @ DataValue::Guard, DataValue::Guard) => {
*accum = DataValue::Null;
true
}
(accum @ DataValue::Guard, v) => {
*accum = v.clone();
true
}
(_, DataValue::Guard) => false,
(accum, v) => {
if *accum == *v {
false
} else if *accum == DataValue::Null {
*accum = v.clone();
true
} else {
false
}
}
})
}
struct AggrCoalesce {
found: DataValue,
}
impl Default for AggrCoalesce {
fn default() -> Self {
Self {
found: DataValue::Null,
}
}
}
impl NormalAggrObj for AggrCoalesce {
fn set(&mut self, value: &DataValue) -> Result<()> {
if self.found == DataValue::Null {
@ -938,3 +610,58 @@ pub(crate) fn get_aggr(name: &str) -> Option<&'static Aggregation> {
_ => return None,
})
}
impl Aggregation {
pub(crate) fn meet_init(&mut self, _args: &[DataValue]) -> Result<()> {
self.meet_op.replace(match self.name {
name if name == AGGR_MIN.name => Box::new(MeetAggrMin),
name if name == AGGR_MAX.name => Box::new(MeetAggrMax),
name if name == AGGR_CHOICE.name => Box::new(MeetAggrChoice),
name if name == AGGR_UNION.name => Box::new(MeetAggrUnion),
name if name == AGGR_INTERSECTION.name => Box::new(MeetAggrIntersection),
name if name == AGGR_SHORTEST.name => Box::new(MeetAggrShortest),
name if name == AGGR_MIN_COST.name => Box::new(MeetAggrMinCost),
name if name == AGGR_COALESCE.name => Box::new(MeetAggrCoalesce),
_ => unreachable!(),
});
Ok(())
}
pub(crate) fn normal_init(&mut self, args: &[DataValue]) -> Result<()> {
self.normal_op.replace(match self.name {
name if name == AGGR_COUNT.name => Box::new(AggrCount::default()),
name if name == AGGR_GROUP_COUNT.name => Box::new(AggrGroupCount::default()),
name if name == AGGR_COUNT_UNIQUE.name => Box::new(AggrCountUnique::default()),
name if name == AGGR_SUM.name => Box::new(AggrSum::default()),
name if name == AGGR_MIN.name => Box::new(AggrMin::default()),
name if name == AGGR_MAX.name => Box::new(AggrMax::default()),
name if name == AGGR_MEAN.name => Box::new(AggrMean::default()),
name if name == AGGR_CHOICE.name => Box::new(AggrChoice::default()),
name if name == AGGR_UNIQUE.name => Box::new(AggrUnique::default()),
name if name == AGGR_UNION.name => Box::new(AggrUnion::default()),
name if name == AGGR_INTERSECTION.name => Box::new(AggrIntersection::default()),
name if name == AGGR_SHORTEST.name => Box::new(AggrShortest::default()),
name if name == AGGR_MIN_COST.name => Box::new(AggrMinCost::default()),
name if name == AGGR_COALESCE.name => Box::new(AggrCoalesce::default()),
name if name == AGGR_COLLECT.name => Box::new({
if args.len() == 0 {
AggrCollect::default()
} else {
let arg = args[0].get_int().ok_or_else(|| {
anyhow!(
"the argument to 'collect' must be an integer, got {:?}",
args[0]
)
})?;
ensure!(
arg > 0,
"argument to 'collect' must be positive, got {}",
arg
);
AggrCollect::new(arg as usize)
}
}),
_ => unreachable!(),
});
Ok(())
}
}

@ -251,18 +251,18 @@ impl DataValue {
_ => None,
}
}
pub(crate) fn get_set(&self) -> Option<&BTreeSet<DataValue>> {
match self {
DataValue::Set(s) => Some(s),
_ => None,
}
}
pub(crate) fn get_map(&self) -> Option<&BTreeMap<DataValue, DataValue>> {
match self {
DataValue::Map(m) => Some(m),
_ => None,
}
}
// pub(crate) fn get_set(&self) -> Option<&BTreeSet<DataValue>> {
// match self {
// DataValue::Set(s) => Some(s),
// _ => None,
// }
// }
// pub(crate) fn get_map(&self) -> Option<&BTreeMap<DataValue, DataValue>> {
// match self {
// DataValue::Map(m) => Some(m),
// _ => None,
// }
// }
pub(crate) fn get_string(&self) -> Option<&str> {
match self {
DataValue::String(s) => Some(s),

@ -166,11 +166,17 @@ impl SessionTx {
let is_meet = aggr_kind == AggrKind::Meet;
for (rule_n, rule) in ruleset.iter().enumerate() {
debug!("initial calculation for rule {:?}.{}", rule_symb, rule_n);
let mut aggr = rule.aggr.clone();
for el in aggr.iter_mut() {
if let Some((aggr, args)) = el {
aggr.meet_init(&args)?;
}
}
for item_res in rule.relation.iter(self, Some(0), &use_delta)? {
let item = item_res?;
trace!("item for {:?}.{}: {:?} at {}", rule_symb, rule_n, item, 0);
if is_meet {
store.aggr_meet_put(&item, &rule.aggr, 0)?;
store.aggr_meet_put(&item, &mut aggr, 0)?;
} else if should_check_limit {
if !store.exists(&item, 0) {
store.put(item, 0);
@ -243,6 +249,14 @@ impl SessionTx {
if !should_do_calculation {
continue;
}
let mut aggr = rule.aggr.clone();
for el in aggr.iter_mut() {
if let Some((aggr, args)) = el {
aggr.meet_init(&args)?;
}
}
for (delta_key, delta_store) in stores.iter() {
if !rule.contained_rules.contains(delta_key) {
continue;
@ -256,7 +270,7 @@ impl SessionTx {
let item = item_res?;
// improvement: the clauses can actually be evaluated in parallel
if is_meet_aggr {
let aggr_changed = store.aggr_meet_put(&item, &rule.aggr, epoch)?;
let aggr_changed = store.aggr_meet_put(&item, &mut aggr, epoch)?;
if aggr_changed {
*changed.get_mut(rule_symb).unwrap() = true;
}

@ -72,7 +72,7 @@ impl DerivedRelStore {
pub(crate) fn aggr_meet_put(
&self,
tuple: &Tuple,
aggrs: &[Option<(Aggregation, Vec<DataValue>)>],
aggrs: &mut [Option<(Aggregation, Vec<DataValue>)>],
epoch: u32,
) -> Result<bool> {
self.ensure_mem_db_for_epoch(epoch);
@ -95,10 +95,10 @@ impl DerivedRelStore {
if let Some(prev_aggr) = prev_aggr {
let mut changed = false;
for (i, aggr) in aggrs.iter().enumerate() {
if let Some((aggr_op, aggr_args)) = aggr {
let op = aggr_op.meet_combine;
changed |= op(&mut prev_aggr.0[i], &tuple.0[i], aggr_args)?;
for (i, aggr) in aggrs.iter_mut().enumerate() {
if let Some((aggr_op, _aggr_args)) = aggr {
let op = aggr_op.meet_op.as_mut().unwrap();
changed |= op.update(&mut prev_aggr.0[i], &tuple.0[i])?;
}
}
if changed && epoch != 0 {
@ -112,11 +112,8 @@ impl DerivedRelStore {
.iter()
.enumerate()
.map(|(i, aggr)| -> Result<DataValue> {
if let Some((aggr_op, aggr_args)) = aggr {
let op = aggr_op.meet_combine;
let mut init = DataValue::Guard;
op(&mut init, &tuple.0[i], aggr_args)?;
Ok(init)
if aggr.is_some() {
Ok(tuple.0[i].clone())
} else {
Ok(DataValue::Guard)
}
@ -200,7 +197,7 @@ impl DerivedRelStore {
Tuple(combined)
}
});
let aggrs = aggrs.to_vec();
let mut aggrs = aggrs.to_vec();
let n_keys = aggrs.iter().filter(|aggr| aggr.is_none()).count();
let grouped = it.group_by(move |tuple| tuple.0[..n_keys].to_vec());
let mut invert_indices = vec![];
@ -221,28 +218,36 @@ impl DerivedRelStore {
.map(|(a, _b)| a)
.collect_vec();
for (_key, mut group_iter) in grouped.into_iter() {
// TODO improve normal aggr here
for aggr_pair in &mut aggrs {
if let Some((aggr, args)) = aggr_pair {
aggr.normal_init(&args)?;
}
}
let mut aggr_res = vec![DataValue::Guard; aggrs.len()];
let first_tuple = group_iter.next().unwrap();
for (idx, aggr) in aggrs.iter().enumerate() {
for (idx, aggr) in aggrs.iter_mut().enumerate() {
let val = &first_tuple.0[invert_indices[idx]];
if let Some((aggr_op, aggr_args)) = aggr {
(aggr_op.meet_combine)(&mut aggr_res[idx], val, aggr_args)?;
if let Some((aggr_op, _aggr_args)) = aggr {
let op = aggr_op.normal_op.as_mut().unwrap();
op.set(val)?;
} else {
aggr_res[idx] = first_tuple.0[invert_indices[idx]].clone();
}
}
for tuple in group_iter {
for (idx, aggr) in aggrs.iter().enumerate() {
for (idx, aggr) in aggrs.iter_mut().enumerate() {
let val = &tuple.0[invert_indices[idx]];
if let Some((aggr_op, aggr_args)) = aggr {
(aggr_op.meet_combine)(&mut aggr_res[idx], val, aggr_args)?;
if let Some((aggr_op, _aggr_args)) = aggr {
let op = aggr_op.normal_op.as_mut().unwrap();
// (aggr_op.meet_combine)(&mut aggr_res[idx], val, aggr_args)?;
op.set(val)?;
}
}
}
for (i, aggr) in aggrs.iter().enumerate() {
if let Some((aggr_op, aggr_args)) = aggr {
(aggr_op.meet_combine)(&mut aggr_res[i], &DataValue::Guard, aggr_args)?;
if let Some((aggr_op, _aggr_args)) = aggr {
let op = aggr_op.normal_op.as_ref().unwrap();
aggr_res[i] = op.get()?;
}
}
let res_tpl = Tuple(aggr_res);

@ -652,7 +652,7 @@ fn air_routes() -> Result<()> {
"#,
)?;
dbg!(len_of_names_count_time.elapsed());
assert_eq!(*res.get("rows").unwrap(), json!([[866]]));
assert_eq!(*res.get("rows").unwrap(), json!([[866.0]]));
let group_count_by_out_time = Instant::now();
let res = db.run_script(
@ -1036,7 +1036,7 @@ fn air_routes() -> Result<()> {
"#,
)?;
dbg!(total_distance_from_three_cities_time.elapsed());
assert_eq!(*res.get("rows").unwrap(), json!([[2733379]]));
assert_eq!(*res.get("rows").unwrap(), json!([[2733379.0]]));
let total_distance_within_three_cities_time = Instant::now();
let res = db.run_script(
@ -1047,7 +1047,7 @@ fn air_routes() -> Result<()> {
"#,
)?;
dbg!(total_distance_within_three_cities_time.elapsed());
assert_eq!(*res.get("rows").unwrap(), json!([[10282]]));
assert_eq!(*res.get("rows").unwrap(), json!([[10282.0]]));
let specific_distance_time = Instant::now();
let res = db.run_script(

Loading…
Cancel
Save