|
|
|
@ -309,6 +309,43 @@ impl MeetAggrObj for MeetAggrIntersection {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
define_aggr!(AGGR_STR_JOIN, false);
|
|
|
|
|
|
|
|
|
|
#[derive(Default)]
|
|
|
|
|
struct AggrStrJoin {
|
|
|
|
|
separator: Option<String>,
|
|
|
|
|
accum: String,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl AggrStrJoin {
|
|
|
|
|
fn new(separator: String) -> Self {
|
|
|
|
|
Self {
|
|
|
|
|
separator: Some(separator),
|
|
|
|
|
accum: "".to_string(),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl NormalAggrObj for AggrStrJoin {
|
|
|
|
|
fn set(&mut self, value: &DataValue) -> Result<()> {
|
|
|
|
|
if let Some(sep) = &self.separator {
|
|
|
|
|
if !self.accum.is_empty() {
|
|
|
|
|
self.accum.push_str(sep)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if let DataValue::String(s) = value {
|
|
|
|
|
self.accum.push_str(s);
|
|
|
|
|
Ok(())
|
|
|
|
|
} else {
|
|
|
|
|
bail!("cannot apply 'str_join' to {:?}", value)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn get(&self) -> Result<DataValue> {
|
|
|
|
|
todo!()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
define_aggr!(AGGR_COLLECT, false);
|
|
|
|
|
|
|
|
|
|
#[derive(Default)]
|
|
|
|
@ -398,7 +435,7 @@ impl NormalAggrObj for AggrSum {
|
|
|
|
|
DataValue::Number(n) => {
|
|
|
|
|
self.sum += n.get_float();
|
|
|
|
|
}
|
|
|
|
|
v => bail!("cannot compute 'mean': encountered value {:?}", v),
|
|
|
|
|
v => bail!("cannot compute 'sum': encountered value {:?}", v),
|
|
|
|
|
}
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
@ -408,6 +445,29 @@ impl NormalAggrObj for AggrSum {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
define_aggr!(AGGR_PRODUCT, false);
|
|
|
|
|
|
|
|
|
|
#[derive(Default)]
|
|
|
|
|
struct AggrProduct {
|
|
|
|
|
product: f64,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl NormalAggrObj for AggrProduct {
|
|
|
|
|
fn set(&mut self, value: &DataValue) -> Result<()> {
|
|
|
|
|
match value {
|
|
|
|
|
DataValue::Number(n) => {
|
|
|
|
|
self.product *= n.get_float();
|
|
|
|
|
}
|
|
|
|
|
v => bail!("cannot compute 'product': encountered value {:?}", v),
|
|
|
|
|
}
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn get(&self) -> Result<DataValue> {
|
|
|
|
|
Ok(DataValue::from(self.product))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
define_aggr!(AGGR_MIN, true);
|
|
|
|
|
|
|
|
|
|
struct AggrMin {
|
|
|
|
@ -516,6 +576,44 @@ impl MeetAggrObj for MeetAggrChoice {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
define_aggr!(AGGR_CHOICE_LAST, true);
|
|
|
|
|
|
|
|
|
|
struct AggrChoiceLast {
|
|
|
|
|
found: DataValue,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl Default for AggrChoiceLast {
|
|
|
|
|
fn default() -> Self {
|
|
|
|
|
Self {
|
|
|
|
|
found: DataValue::Null,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl NormalAggrObj for AggrChoiceLast {
|
|
|
|
|
fn set(&mut self, value: &DataValue) -> Result<()> {
|
|
|
|
|
self.found = value.clone();
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn get(&self) -> Result<DataValue> {
|
|
|
|
|
Ok(self.found.clone())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
struct MeetAggrChoiceLast;
|
|
|
|
|
|
|
|
|
|
impl MeetAggrObj for MeetAggrChoiceLast {
|
|
|
|
|
fn update(&self, left: &mut DataValue, right: &DataValue) -> Result<bool> {
|
|
|
|
|
Ok(if *left == *right {
|
|
|
|
|
false
|
|
|
|
|
} else {
|
|
|
|
|
*left = right.clone();
|
|
|
|
|
true
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
define_aggr!(AGGR_MIN_COST, true);
|
|
|
|
|
|
|
|
|
|
struct AggrMinCost {
|
|
|
|
@ -739,6 +837,163 @@ impl MeetAggrObj for MeetAggrCoalesce {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
define_aggr!(AGGR_BIT_AND, true);
|
|
|
|
|
|
|
|
|
|
#[derive(Default)]
|
|
|
|
|
struct AggrBitAnd {
|
|
|
|
|
res: Vec<u8>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl NormalAggrObj for AggrBitAnd {
|
|
|
|
|
fn set(&mut self, value: &DataValue) -> Result<()> {
|
|
|
|
|
match value {
|
|
|
|
|
DataValue::Bytes(bs) => {
|
|
|
|
|
if self.res.len() == 0 {
|
|
|
|
|
self.res = bs.to_vec();
|
|
|
|
|
} else {
|
|
|
|
|
ensure!(
|
|
|
|
|
self.res.len() == bs.len(),
|
|
|
|
|
"operands of 'bit_and' must have the same lengths, got {:x?} and {:x?}",
|
|
|
|
|
self.res,
|
|
|
|
|
bs
|
|
|
|
|
);
|
|
|
|
|
for (l, r) in self.res.iter_mut().zip(bs.iter()) {
|
|
|
|
|
*l &= *r;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
v => bail!("cannot apply 'bit_and' to {:?}", v),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn get(&self) -> Result<DataValue> {
|
|
|
|
|
Ok(DataValue::Bytes(self.res.clone().into()))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
struct MeetAggrBitAnd;
|
|
|
|
|
|
|
|
|
|
impl MeetAggrObj for MeetAggrBitAnd {
|
|
|
|
|
fn update(&self, left: &mut DataValue, right: &DataValue) -> Result<bool> {
|
|
|
|
|
match (left, right) {
|
|
|
|
|
(DataValue::Bytes(left), DataValue::Bytes(right)) => {
|
|
|
|
|
if left == right {
|
|
|
|
|
return Ok(false);
|
|
|
|
|
}
|
|
|
|
|
ensure!(
|
|
|
|
|
left.len() == right.len(),
|
|
|
|
|
"operands of 'bit_and' must have the same lengths, got {:x?} and {:x?}",
|
|
|
|
|
left,
|
|
|
|
|
right
|
|
|
|
|
);
|
|
|
|
|
for (l, r) in left.iter_mut().zip(right.iter()) {
|
|
|
|
|
*l &= *r;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(true)
|
|
|
|
|
}
|
|
|
|
|
v => bail!("cannot apply 'bit_and' to {:?}", v),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
define_aggr!(AGGR_BIT_OR, true);
|
|
|
|
|
|
|
|
|
|
#[derive(Default)]
|
|
|
|
|
struct AggrBitOr {
|
|
|
|
|
res: Vec<u8>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl NormalAggrObj for AggrBitOr {
|
|
|
|
|
fn set(&mut self, value: &DataValue) -> Result<()> {
|
|
|
|
|
match value {
|
|
|
|
|
DataValue::Bytes(bs) => {
|
|
|
|
|
if self.res.len() == 0 {
|
|
|
|
|
self.res = bs.to_vec();
|
|
|
|
|
} else {
|
|
|
|
|
ensure!(
|
|
|
|
|
self.res.len() == bs.len(),
|
|
|
|
|
"operands of 'bit_or' must have the same lengths, got {:x?} and {:x?}",
|
|
|
|
|
self.res,
|
|
|
|
|
bs
|
|
|
|
|
);
|
|
|
|
|
for (l, r) in self.res.iter_mut().zip(bs.iter()) {
|
|
|
|
|
*l |= *r;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
v => bail!("cannot apply 'bit_or' to {:?}", v),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn get(&self) -> Result<DataValue> {
|
|
|
|
|
Ok(DataValue::Bytes(self.res.clone().into()))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
struct MeetAggrBitOr;
|
|
|
|
|
|
|
|
|
|
impl MeetAggrObj for MeetAggrBitOr {
|
|
|
|
|
fn update(&self, left: &mut DataValue, right: &DataValue) -> Result<bool> {
|
|
|
|
|
match (left, right) {
|
|
|
|
|
(DataValue::Bytes(left), DataValue::Bytes(right)) => {
|
|
|
|
|
if left == right {
|
|
|
|
|
return Ok(false);
|
|
|
|
|
}
|
|
|
|
|
ensure!(
|
|
|
|
|
left.len() == right.len(),
|
|
|
|
|
"operands of 'bit_or' must have the same lengths, got {:x?} and {:x?}",
|
|
|
|
|
left,
|
|
|
|
|
right
|
|
|
|
|
);
|
|
|
|
|
for (l, r) in left.iter_mut().zip(right.iter()) {
|
|
|
|
|
*l |= *r;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(true)
|
|
|
|
|
}
|
|
|
|
|
v => bail!("cannot apply 'bit_or' to {:?}", v),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
define_aggr!(AGGR_BIT_XOR, false);
|
|
|
|
|
|
|
|
|
|
#[derive(Default)]
|
|
|
|
|
struct AggrBitXor {
|
|
|
|
|
res: Vec<u8>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl NormalAggrObj for AggrBitXor {
|
|
|
|
|
fn set(&mut self, value: &DataValue) -> Result<()> {
|
|
|
|
|
match value {
|
|
|
|
|
DataValue::Bytes(bs) => {
|
|
|
|
|
if self.res.len() == 0 {
|
|
|
|
|
self.res = bs.to_vec();
|
|
|
|
|
} else {
|
|
|
|
|
ensure!(
|
|
|
|
|
self.res.len() == bs.len(),
|
|
|
|
|
"operands of 'bit_xor' must have the same lengths, got {:x?} and {:x?}",
|
|
|
|
|
self.res,
|
|
|
|
|
bs
|
|
|
|
|
);
|
|
|
|
|
for (l, r) in self.res.iter_mut().zip(bs.iter()) {
|
|
|
|
|
*l ^= *r;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
v => bail!("cannot apply 'bit_xor' to {:?}", v),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn get(&self) -> Result<DataValue> {
|
|
|
|
|
Ok(DataValue::Bytes(self.res.clone().into()))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub(crate) fn get_aggr(name: &str) -> Option<&'static Aggregation> {
|
|
|
|
|
Some(match name {
|
|
|
|
|
"count" => &AGGR_COUNT,
|
|
|
|
@ -749,6 +1004,7 @@ pub(crate) fn get_aggr(name: &str) -> Option<&'static Aggregation> {
|
|
|
|
|
"max" => &AGGR_MAX,
|
|
|
|
|
"mean" => &AGGR_MEAN,
|
|
|
|
|
"choice" => &AGGR_CHOICE,
|
|
|
|
|
"choice_last" => &AGGR_CHOICE_LAST,
|
|
|
|
|
"collect" => &AGGR_COLLECT,
|
|
|
|
|
"unique" => &AGGR_UNIQUE,
|
|
|
|
|
"union" => &AGGR_UNION,
|
|
|
|
@ -768,6 +1024,9 @@ impl Aggregation {
|
|
|
|
|
name if name == AGGR_MIN.name => Box::new(MeetAggrMin),
|
|
|
|
|
name if name == AGGR_MAX.name => Box::new(MeetAggrMax),
|
|
|
|
|
name if name == AGGR_CHOICE.name => Box::new(MeetAggrChoice),
|
|
|
|
|
name if name == AGGR_CHOICE_LAST.name => Box::new(MeetAggrChoiceLast),
|
|
|
|
|
name if name == AGGR_BIT_AND.name => Box::new(MeetAggrBitAnd),
|
|
|
|
|
name if name == AGGR_BIT_OR.name => Box::new(MeetAggrBitOr),
|
|
|
|
|
name if name == AGGR_UNION.name => Box::new(MeetAggrUnion),
|
|
|
|
|
name if name == AGGR_INTERSECTION.name => Box::new(MeetAggrIntersection),
|
|
|
|
|
name if name == AGGR_SHORTEST.name => Box::new(MeetAggrShortest),
|
|
|
|
@ -786,10 +1045,15 @@ impl Aggregation {
|
|
|
|
|
name if name == AGGR_GROUP_COUNT.name => Box::new(AggrGroupCount::default()),
|
|
|
|
|
name if name == AGGR_COUNT_UNIQUE.name => Box::new(AggrCountUnique::default()),
|
|
|
|
|
name if name == AGGR_SUM.name => Box::new(AggrSum::default()),
|
|
|
|
|
name if name == AGGR_PRODUCT.name => Box::new(AggrProduct::default()),
|
|
|
|
|
name if name == AGGR_MIN.name => Box::new(AggrMin::default()),
|
|
|
|
|
name if name == AGGR_MAX.name => Box::new(AggrMax::default()),
|
|
|
|
|
name if name == AGGR_MEAN.name => Box::new(AggrMean::default()),
|
|
|
|
|
name if name == AGGR_CHOICE.name => Box::new(AggrChoice::default()),
|
|
|
|
|
name if name == AGGR_CHOICE_LAST.name => Box::new(AggrChoiceLast::default()),
|
|
|
|
|
name if name == AGGR_BIT_AND.name => Box::new(AggrBitAnd::default()),
|
|
|
|
|
name if name == AGGR_BIT_OR.name => Box::new(AggrBitOr::default()),
|
|
|
|
|
name if name == AGGR_BIT_XOR.name => Box::new(AggrBitXor::default()),
|
|
|
|
|
name if name == AGGR_UNIQUE.name => Box::new(AggrUnique::default()),
|
|
|
|
|
name if name == AGGR_UNION.name => Box::new(AggrUnion::default()),
|
|
|
|
|
name if name == AGGR_INTERSECTION.name => Box::new(AggrIntersection::default()),
|
|
|
|
@ -797,6 +1061,19 @@ impl Aggregation {
|
|
|
|
|
name if name == AGGR_MIN_COST.name => Box::new(AggrMinCost::default()),
|
|
|
|
|
name if name == AGGR_MAX_COST.name => Box::new(AggrMaxCost::default()),
|
|
|
|
|
name if name == AGGR_COALESCE.name => Box::new(AggrCoalesce::default()),
|
|
|
|
|
name if name == AGGR_STR_JOIN.name => Box::new({
|
|
|
|
|
if args.len() == 0 {
|
|
|
|
|
AggrStrJoin::default()
|
|
|
|
|
} else {
|
|
|
|
|
let arg = args[0].get_string().ok_or_else(|| {
|
|
|
|
|
anyhow!(
|
|
|
|
|
"the argument to 'str_join' must be a string, got {:?}",
|
|
|
|
|
args[0]
|
|
|
|
|
)
|
|
|
|
|
})?;
|
|
|
|
|
AggrStrJoin::new(arg.to_string())
|
|
|
|
|
}
|
|
|
|
|
}),
|
|
|
|
|
name if name == AGGR_COLLECT.name => Box::new({
|
|
|
|
|
if args.len() == 0 {
|
|
|
|
|
AggrCollect::default()
|
|
|
|
|