more aggr ops

main
Ziyang Hu 2 years ago
parent e58eaad9a6
commit 1b30a337ce

@ -28,6 +28,7 @@ base64 = "0.13.0"
chrono = "0.4.19"
num-traits = "0.2.15"
itertools = "0.10.3"
regex = "1.6.0"
pest = "2.2.1"
pest_derive = "2.2.1"
cozorocks = { path = "cozorocks" }

@ -1,4 +1,6 @@
use std::collections::BTreeSet;
use std::fmt::{Debug, Formatter};
use std::ops::Sub;
use anyhow::{anyhow, bail, ensure, Result};
@ -33,6 +35,102 @@ macro_rules! define_aggr {
};
}
define_aggr!(AGGR_UNIQUE, false);
/// Accumulates the distinct values seen so far into a `DataValue::Set`.
///
/// * `accum` — running result. A `DataValue::Guard` accumulator is replaced by a
///   fresh set on the first call; afterwards it is expected to be a `DataValue::Set`.
/// * `current` — the value to add. `DataValue::Guard` is never inserted.
/// * `_args` — unused.
///
/// Returns `Ok(true)` when the accumulator was initialised or gained a new
/// element, and `Ok(false)` when nothing changed.
///
/// # Panics
/// Panics (`unreachable!`) if `accum` is neither `Guard` nor `Set` while
/// `current` is not `Guard`.
fn aggr_unique(accum: &mut DataValue, current: &DataValue, _args: &[DataValue]) -> Result<bool> {
    // First call: swap the sentinel for a set, seeded with `current` unless
    // `current` is itself the sentinel.
    if matches!(*accum, DataValue::Guard) {
        let mut seen = BTreeSet::new();
        if !matches!(*current, DataValue::Guard) {
            seen.insert(current.clone());
        }
        *accum = DataValue::Set(seen);
        return Ok(true);
    }

    // A sentinel input never alters an already-initialised accumulator.
    if matches!(*current, DataValue::Guard) {
        return Ok(false);
    }

    match accum {
        // `insert` reports whether the value was not present before.
        DataValue::Set(seen) => Ok(seen.insert(current.clone())),
        _ => unreachable!(),
    }
}
define_aggr!(AGGR_UNION, true);
/// Folds `current` into `accum` by set union.
///
/// * `accum` — running result. A `DataValue::Guard` accumulator is replaced by a
///   `DataValue::Set` on the first call.
/// * `current` — a `Set` or `List` whose elements are merged in. A `Guard` input
///   initialises a `Guard` accumulator to the empty set and is otherwise ignored.
/// * `_args` — unused.
///
/// Returns `Ok(true)` when the accumulator was initialised or grew, and
/// `Ok(false)` when nothing changed.
///
/// # Errors
/// Fails with "cannot compute 'union' for value …" when `current` is not
/// `Guard`/`Set`/`List`, or when `accum` is neither `Guard` nor `Set` and
/// `current` is not `Guard`.
fn aggr_union(accum: &mut DataValue, current: &DataValue, _args: &[DataValue]) -> Result<bool> {
    match *accum {
        // Uninitialised accumulator: whatever collection arrives becomes the seed.
        DataValue::Guard => {
            let seed = match current {
                DataValue::Guard => BTreeSet::new(),
                DataValue::Set(items) => items.clone(),
                DataValue::List(items) => items.iter().cloned().collect(),
                other => bail!("cannot compute 'union' for value {:?}", other),
            };
            *accum = DataValue::Set(seed);
            Ok(true)
        }
        // Initialised accumulator: only report a change if something new is added.
        DataValue::Set(ref mut merged) => match current {
            DataValue::Guard => Ok(false),
            DataValue::Set(items) => {
                if items.is_subset(merged) {
                    Ok(false)
                } else {
                    merged.extend(items.iter().cloned());
                    Ok(true)
                }
            }
            DataValue::List(items) => {
                let items: BTreeSet<_> = items.iter().cloned().collect();
                if items.is_subset(merged) {
                    Ok(false)
                } else {
                    merged.extend(items);
                    Ok(true)
                }
            }
            other => bail!("cannot compute 'union' for value {:?}", other),
        },
        // Any other accumulator: a sentinel input is a no-op, everything else is an error.
        _ => match current {
            DataValue::Guard => Ok(false),
            other => bail!("cannot compute 'union' for value {:?}", other),
        },
    }
}
define_aggr!(AGGR_INTERSECTION, true);
/// Folds `current` into `accum` by set intersection.
///
/// * `accum` — running result. It stays `DataValue::Guard` until the first
///   `Set`/`List` input arrives, which becomes the initial `DataValue::Set`.
/// * `current` — a `Set` or `List` to intersect with. `Guard` inputs are ignored.
/// * `_args` — unused.
///
/// Returns `Ok(true)` when the accumulator was initialised or lost at least one
/// element, and `Ok(false)` when nothing changed.
///
/// # Errors
/// Fails with "cannot compute 'intersection' for value …" when `current` is not
/// `Guard`/`Set`/`List`, or when `accum` is neither `Guard` nor `Set` and
/// `current` is not `Guard`.
fn aggr_intersection(
    accum: &mut DataValue,
    current: &DataValue,
    _args: &[DataValue],
) -> Result<bool> {
    Ok(match (accum, current) {
        (DataValue::Guard, DataValue::Guard) => false,
        (accum @ DataValue::Guard, DataValue::Set(s)) => {
            *accum = DataValue::Set(s.clone());
            true
        }
        (accum @ DataValue::Guard, DataValue::List(s)) => {
            *accum = DataValue::Set(s.iter().cloned().collect());
            true
        }
        (_, DataValue::Guard) => false,
        (DataValue::Set(l), DataValue::Set(s)) => {
            // Keep only the elements also present in `s`. The previous code used
            // `Sub`, which is set *difference* and kept exactly the wrong elements.
            let before = l.len();
            l.retain(|v| s.contains(v));
            l.len() != before
        }
        (DataValue::Set(l), DataValue::List(s)) => {
            if l.is_empty() {
                // Nothing can be removed; skip building the lookup set.
                false
            } else {
                let s: BTreeSet<_> = s.iter().cloned().collect();
                let before = l.len();
                l.retain(|v| s.contains(v));
                l.len() != before
            }
        }
        (_, v) => bail!("cannot compute 'intersection' for value {:?}", v),
    })
}
define_aggr!(AGGR_COLLECT, false);
fn aggr_collect(accum: &mut DataValue, current: &DataValue, args: &[DataValue]) -> Result<bool> {
Ok(match (accum, current) {
@ -232,6 +330,9 @@ pub(crate) fn get_aggr(name: &str) -> Option<&'static Aggregation> {
"mean" => &AGGR_MEAN,
"choice" => &AGGR_CHOICE,
"collect" => &AGGR_COLLECT,
"unique" => &AGGR_UNIQUE,
"union" => &AGGR_UNION,
"intersection" => &AGGR_INTERSECTION,
_ => return None,
})
}

@ -74,6 +74,12 @@ impl From<DataValue> for JsonValue {
DataValue::Bottom => JsonValue::Null,
DataValue::Timestamp(i) => JsonValue::Number(i.into()),
DataValue::Guard => JsonValue::Null,
DataValue::Set(l) => {
JsonValue::Array(l.iter().map(|v| JsonValue::from(v.clone())).collect())
}
DataValue::Regex(r) => {
json!(r.0.as_str())
}
}
}
}

@ -1,9 +1,11 @@
use std::cmp::{Ordering, Reverse};
use std::collections::BTreeSet;
use std::fmt::{Debug, Display, Formatter};
use anyhow::{bail, Result};
use regex::Regex;
use rmp_serde::Serializer;
use serde::Serialize;
use serde::{Deserialize, Deserializer, Serialize};
use serde_derive::{Deserialize, Serialize};
use smallvec::SmallVec;
use smartstring::{LazyCompact, SmartString};
@ -13,6 +15,41 @@ use crate::data::encode::EncodedVec;
use crate::data::id::{EntityId, TxId};
use crate::data::triple::StoreOp;
/// Newtype around a compiled `Regex`, used as the payload of `DataValue::Regex`.
///
/// The trait impls below compare and order wrappers by the pattern's source
/// text (`Regex::as_str`), and make any serde (de)serialization attempt panic.
#[derive(Clone)]
pub(crate) struct RegexWrapper(pub(crate) Regex);
/// Serialization of regex values is unsupported: any attempt panics.
impl Serialize for RegexWrapper {
    /// Never returns; always panics with "serializing regex".
    fn serialize<S>(&self, _serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        panic!("serializing regex")
    }
}
/// Deserialization of regex values is unsupported: any attempt panics.
impl<'de> Deserialize<'de> for RegexWrapper {
    /// Never returns; always panics with "deserializing regex".
    fn deserialize<D>(_deserializer: D) -> std::result::Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        panic!("deserializing regex")
    }
}
/// Two wrappers are equal when their pattern source strings are identical.
impl PartialEq for RegexWrapper {
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (self.0.as_str(), other.0.as_str());
        lhs == rhs
    }
}
// Equality is plain string equality on the pattern source, so it is a full equivalence relation.
impl Eq for RegexWrapper {}
/// Wrappers are ordered by comparing their pattern source strings.
impl Ord for RegexWrapper {
    fn cmp(&self, other: &Self) -> Ordering {
        let lhs: &str = self.0.as_str();
        let rhs: &str = other.0.as_str();
        Ord::cmp(lhs, rhs)
    }
}
/// Partial ordering always agrees with the total order defined by `Ord`.
impl PartialOrd for RegexWrapper {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // String comparison is total, so the result is always `Some`.
        Some(Ord::cmp(self, other))
    }
}
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
pub(crate) enum DataValue {
#[serde(rename = "n")]
@ -29,9 +66,12 @@ pub(crate) enum DataValue {
Timestamp(i64),
#[serde(rename = "v")]
Bytes(Box<[u8]>),
#[serde(rename = "x")]
Regex(RegexWrapper),
#[serde(rename = "z")]
List(Vec<DataValue>),
#[serde(rename = "y")]
Set(BTreeSet<DataValue>),
#[serde(rename = "g")]
Guard,
#[serde(rename = "o")]
@ -147,6 +187,9 @@ impl Debug for DataValue {
DataValue::String(s) => {
write!(f, "{:?}", s)
}
DataValue::Regex(r) => {
write!(f, "{:?}", r.0.as_str())
}
DataValue::Uuid(u) => {
write!(f, "{}", u)
}
@ -157,6 +200,7 @@ impl Debug for DataValue {
write!(f, "bytes(len={})", b.len())
}
DataValue::List(t) => f.debug_list().entries(t.iter()).finish(),
DataValue::Set(t) => f.debug_list().entries(t.iter()).finish(),
DataValue::DescVal(v) => {
write!(f, "desc<{:?}>", v)
}

Loading…
Cancel
Save