main
Ziyang Hu 2 years ago
parent a84cd0227e
commit dc237a13c2

@ -10,7 +10,6 @@ use crate::data::tuple_set::{
use crate::ddl::reify::TableInfo;
use crate::runtime::options::{default_read_options, default_write_options};
use anyhow::Result;
use chrono::format::Item;
use cozorocks::{DbPtr, PrefixIterator, ReadOptionsPtr, TransactionPtr, WriteOptionsPtr};
use std::collections::{BTreeMap, BTreeSet};

@ -1,6 +1,6 @@
use crate::algebra::op::{
build_binding_map_from_info, parse_chain, unique_prefix_nested_loop, ChainPart,
ChainPartEdgeDir, FilterError, InterpretContext, JoinType, NestLoopLeftPrefixIter, QueryError,
ChainPartEdgeDir, FilterError, InterpretContext, JoinType, NestLoopLeftPrefixIter,
RelationalAlgebra, SelectOpError, SortDirection, NAME_SKIP, NAME_SORT, NAME_TAKE, NAME_WHERE,
};
use crate::algebra::parser::{AlgebraParseError, RaBox};
@ -9,7 +9,7 @@ use crate::data::expr::Expr;
use crate::data::parser::parse_scoped_dict;
use crate::data::tuple::{DataKind, OwnTuple, ReifiedTuple, Tuple};
use crate::data::tuple_set::{
merge_binding_maps, BindingMap, BindingMapEvalContext, TableId, TupleSet, TupleSetEvalContext,
merge_binding_maps, BindingMap, BindingMapEvalContext, TupleSet, TupleSetEvalContext,
TupleSetIdx,
};
use crate::data::value::Value;
@ -17,10 +17,7 @@ use crate::ddl::reify::{AssocInfo, EdgeInfo, NodeInfo, TableInfo};
use crate::parser::{Pair, Pairs, Rule};
use crate::runtime::options::{default_read_options, default_write_options};
use anyhow::Result;
use cozorocks::{
DbPtr, IteratorPtr, PrefixIterator, ReadOptionsPtr, RowIterator, TransactionPtr,
WriteOptionsPtr,
};
use cozorocks::RowIterator;
use std::collections::{BTreeMap, BTreeSet};
pub(crate) const NAME_WALK: &str = "Walk";

@ -656,6 +656,9 @@ pub(crate) mod tests {
did_avg: avg[d.id],
did: d.id,
dvar: var[d.id],
dmin: min[d.id],
dmax: max[d.id],
dcoll: collect[d.id],
})
"#;
let ra = build_relational_expr(

@ -2,7 +2,6 @@ use crate::data::eval::EvalError;
use crate::data::op_agg::{OpAgg, OpAggT};
use crate::data::value::{StaticValue, Value};
use anyhow::Result;
use std::sync::atomic::{Ordering};
use std::sync::{Arc, Mutex};
use crate::data::expr::Expr;

@ -0,0 +1,116 @@
use crate::data::eval::EvalError;
use crate::data::expr::Expr;
use crate::data::op_agg::{OpAgg, OpAggT};
use crate::data::value::{StaticValue, Value};
use anyhow::Result;
use std::sync::{Arc, Mutex};
use ordered_float::Float;
pub(crate) const NAME_OP_MIN: &str = "min";
pub(crate) const NAME_OP_MAX: &str = "max";
pub(crate) fn build_op_min(a_args: Vec<Expr>, args: Vec<Expr>) -> Expr {
Expr::ApplyAgg(OpAgg(Arc::new(OpMin::default())), a_args, args)
}
pub(crate) fn build_op_max(a_args: Vec<Expr>, args: Vec<Expr>) -> Expr {
Expr::ApplyAgg(OpAgg(Arc::new(OpMax::default())), a_args, args)
}
#[derive(Default)]
pub struct OpMin {
total: Mutex<f64>,
}
impl OpAggT for OpMin {
fn name(&self) -> &str {
NAME_OP_MIN
}
fn arity(&self) -> Option<usize> {
Some(1)
}
fn reset(&self) {
let mut total = self.total.lock().unwrap();
*total = f64::max_value();
}
fn initialize(&self, _a_args: Vec<StaticValue>) -> Result<()> {
Ok(())
}
fn put(&self, args: &[Value]) -> Result<()> {
let arg = args.iter().next().unwrap();
let to_add = match arg {
Value::Int(i) => (*i) as f64,
Value::Float(f) => f.into_inner(),
Value::Null => return Ok(()),
v => {
return Err(EvalError::OpTypeMismatch(
self.name().to_string(),
vec![v.clone().into_static()],
)
.into())
}
};
let current = *self.total.lock().unwrap();
*self.total.lock().unwrap() = current.min(to_add);
Ok(())
}
fn get(&self) -> Result<StaticValue> {
let f = *self.total.lock().unwrap();
Ok(f.into())
}
}
#[derive(Default)]
pub struct OpMax {
total: Mutex<f64>,
}
impl OpAggT for OpMax {
fn name(&self) -> &str {
NAME_OP_MAX
}
fn arity(&self) -> Option<usize> {
Some(1)
}
fn reset(&self) {
let mut total = self.total.lock().unwrap();
*total = f64::min_value();
}
fn initialize(&self, _a_args: Vec<StaticValue>) -> Result<()> {
Ok(())
}
fn put(&self, args: &[Value]) -> Result<()> {
let arg = args.iter().next().unwrap();
let to_add = match arg {
Value::Int(i) => (*i) as f64,
Value::Float(f) => f.into_inner(),
Value::Null => return Ok(()),
v => {
return Err(EvalError::OpTypeMismatch(
self.name().to_string(),
vec![v.clone().into_static()],
)
.into())
}
};
let current = *self.total.lock().unwrap();
*self.total.lock().unwrap() = current.max(to_add);
Ok(())
}
fn get(&self) -> Result<StaticValue> {
let f = *self.total.lock().unwrap();
Ok(f.into())
}
}

@ -1,7 +1,6 @@
use crate::data::eval::EvalError;
use crate::data::expr::Expr;
use crate::data::op::{OP_ADD, OP_DIV, OP_IS_NULL, OP_MUL, OP_POW, OP_SUB};
use crate::data::op_agg::{build_op_count_non_null, OpAgg, OpAggT, OpCountWith};
use crate::data::op_agg::{OpAgg, OpAggT};
use crate::data::value::{StaticValue, Value};
use anyhow::Result;
use std::sync::atomic::{AtomicUsize, Ordering};

@ -10,7 +10,6 @@ use lazy_static::lazy_static;
use pest::prec_climber::{Assoc, Operator, PrecClimber};
use std::borrow::Cow;
use std::collections::BTreeMap;
use std::sync::Arc;
#[derive(thiserror::Error, Debug)]
pub enum ExprParseError {
@ -343,6 +342,8 @@ fn build_aggr_call(name: &str, a_args: Vec<Expr>, args: Vec<Expr>) -> Result<Exp
NAME_OP_LAG => build_op_lag(a_args, args),
NAME_OP_COLLECT_IF => build_op_collect_if(a_args, args),
NAME_OP_COLLECT => build_op_collect(a_args, args),
NAME_OP_MIN => build_op_min(a_args, args),
NAME_OP_MAX => build_op_max(a_args, args),
method_name => unimplemented!("{}", method_name),
})
}

Loading…
Cancel
Save