From 1b30a337ce9ffd3af95b58eb7b53c38f8d904c17 Mon Sep 17 00:00:00 2001
From: Ziyang Hu
Date: Tue, 9 Aug 2022 17:23:12 +0800
Subject: [PATCH] more aggr ops

---
 Cargo.toml        |   1 +
 src/data/aggr.rs  | 120 ++++++++++++++++++++++++++++++++++++++++++++++
 src/data/json.rs  |   6 +++
 src/data/value.rs |  60 ++++++++++++++++++++++++++-
 4 files changed, 185 insertions(+), 2 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index ea62e116..69d3f408 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -28,6 +28,7 @@ base64 = "0.13.0"
 chrono = "0.4.19"
 num-traits = "0.2.15"
 itertools = "0.10.3"
+regex = "1.6.0"
 pest = "2.2.1"
 pest_derive = "2.2.1"
 cozorocks = { path = "cozorocks" }
diff --git a/src/data/aggr.rs b/src/data/aggr.rs
index 291ef372..ad8bcf28 100644
--- a/src/data/aggr.rs
+++ b/src/data/aggr.rs
@@ -1,3 +1,4 @@
+use std::collections::BTreeSet;
 use std::fmt::{Debug, Formatter};
 
 use anyhow::{anyhow, bail, ensure, Result};
@@ -33,6 +34,122 @@ macro_rules! define_aggr {
     };
 }
 
+define_aggr!(AGGR_UNIQUE, false);
+/// Accumulates every distinct input value into a `DataValue::Set`.
+///
+/// A `Guard` accumulator is replaced by a set: empty when `current` is also
+/// `Guard`, otherwise holding a clone of `current`. Returns `Ok(true)` when
+/// `accum` was initialised or gained a new element, `Ok(false)` otherwise.
+/// Panics (via `unreachable!`) if `accum` is neither `Guard` nor a `Set`.
+fn aggr_unique(accum: &mut DataValue, current: &DataValue, _args: &[DataValue]) -> Result<bool> {
+    Ok(match (accum, current) {
+        (accum @ DataValue::Guard, DataValue::Guard) => {
+            *accum = DataValue::Set(Default::default());
+            true
+        }
+        (accum @ DataValue::Guard, val) => {
+            *accum = DataValue::Set(BTreeSet::from([val.clone()]));
+            true
+        }
+        (_, DataValue::Guard) => false,
+        (DataValue::Set(l), val) => l.insert(val.clone()),
+        _ => unreachable!(),
+    })
+}
+
+define_aggr!(AGGR_UNION, true);
+/// Folds `Set`/`List` inputs into the union of all their elements.
+///
+/// A `Guard` accumulator is replaced by a set built from `current` (empty when
+/// `current` is also `Guard`). Returns `Ok(true)` when `accum` was initialised
+/// or grew, `Ok(false)` when nothing changed, and an error for any other
+/// combination of accumulator and input.
+fn aggr_union(accum: &mut DataValue, current: &DataValue, _args: &[DataValue]) -> Result<bool> {
+    Ok(match (accum, current) {
+        (accum @ DataValue::Guard, DataValue::Guard) => {
+            *accum = DataValue::Set(Default::default());
+            true
+        }
+        (accum @ DataValue::Guard, DataValue::Set(s)) => {
+            *accum = DataValue::Set(s.clone());
+            true
+        }
+        (accum @ DataValue::Guard, DataValue::List(s)) => {
+            *accum = DataValue::Set(s.iter().cloned().collect());
+            true
+        }
+        (_, DataValue::Guard) => false,
+        (DataValue::Set(l), DataValue::Set(s)) => {
+            if s.is_subset(l) {
+                false
+            } else {
+                l.extend(s.iter().cloned());
+                true
+            }
+        }
+        (DataValue::Set(l), DataValue::List(s)) => {
+            let s: BTreeSet<_> = s.iter().cloned().collect();
+            if s.is_subset(l) {
+                false
+            } else {
+                l.extend(s);
+                true
+            }
+        }
+        (_, v) => bail!("cannot compute 'union' for value {:?}", v),
+    })
+}
+
+define_aggr!(AGGR_INTERSECTION, true);
+/// Folds `Set`/`List` inputs into the intersection of all their elements.
+///
+/// The first `Set`/`List` input seeds a `Guard` accumulator; each later input
+/// removes the accumulated elements it does not contain. A `Guard` input never
+/// modifies `accum`. Returns `Ok(true)` when `accum` was seeded or shrank,
+/// `Ok(false)` when nothing changed, and an error for any other combination.
+fn aggr_intersection(
+    accum: &mut DataValue,
+    current: &DataValue,
+    _args: &[DataValue],
+) -> Result<bool> {
+    Ok(match (accum, current) {
+        (DataValue::Guard, DataValue::Guard) => false,
+        (accum @ DataValue::Guard, DataValue::Set(s)) => {
+            *accum = DataValue::Set(s.clone());
+            true
+        }
+        (accum @ DataValue::Guard, DataValue::List(s)) => {
+            *accum = DataValue::Set(s.iter().cloned().collect());
+            true
+        }
+        (_, DataValue::Guard) => false,
+        (DataValue::Set(l), DataValue::Set(s)) => {
+            if l.is_empty() || l.is_subset(s) {
+                false
+            } else {
+                // Keep only the shared elements; `Sub` would compute the difference.
+                l.retain(|v| s.contains(v));
+                true
+            }
+        }
+        (DataValue::Set(l), DataValue::List(s)) => {
+            if l.is_empty() {
+                false
+            } else {
+                let s: BTreeSet<_> = s.iter().cloned().collect();
+                if l.is_subset(&s) {
+                    false
+                } else {
+                    // Keep only the shared elements; `Sub` would compute the difference.
+                    l.retain(|v| s.contains(v));
+                    true
+                }
+            }
+        }
+        (_, v) => bail!("cannot compute 'intersection' for value {:?}", v),
+    })
+}
+
 define_aggr!(AGGR_COLLECT, false);
 fn aggr_collect(accum: &mut DataValue, current: &DataValue, args: &[DataValue]) -> Result<bool> {
     Ok(match (accum, current) {
@@ -232,6 +349,9 @@ pub(crate) fn get_aggr(name: &str) -> Option<&'static Aggregation> {
         "mean" => &AGGR_MEAN,
         "choice" => &AGGR_CHOICE,
         "collect" => &AGGR_COLLECT,
+        "unique" => &AGGR_UNIQUE,
+        "union" => &AGGR_UNION,
+        "intersection" => &AGGR_INTERSECTION,
         _ => return None,
     })
 }
diff --git a/src/data/json.rs b/src/data/json.rs
index 9ffb037e..a5a5c449 100644
--- a/src/data/json.rs
+++ b/src/data/json.rs
@@ -74,6 +74,12 @@ impl From<DataValue> for JsonValue {
             DataValue::Bottom => JsonValue::Null,
             DataValue::Timestamp(i) => JsonValue::Number(i.into()),
             DataValue::Guard => JsonValue::Null,
+            DataValue::Set(l) => {
+                JsonValue::Array(l.iter().map(|v| JsonValue::from(v.clone())).collect())
+            }
+            DataValue::Regex(r) => {
+                json!(r.0.as_str())
+            }
         }
     }
 }
diff --git a/src/data/value.rs b/src/data/value.rs
index 3f5b17fd..47a3a803 100644
--- a/src/data/value.rs
+++ b/src/data/value.rs
@@ -1,9 +1,11 @@
 use std::cmp::{Ordering, Reverse};
+use std::collections::BTreeSet;
 use std::fmt::{Debug, Display, Formatter};
 
 use anyhow::{bail, Result};
+use regex::Regex;
 use rmp_serde::Serializer;
-use serde::Serialize;
+use serde::{Deserialize, Deserializer, Serialize};
 use serde_derive::{Deserialize, Serialize};
 use smallvec::SmallVec;
 use smartstring::{LazyCompact, SmartString};
@@ -13,6 +15,53 @@ use crate::data::encode::EncodedVec;
 use crate::data::id::{EntityId, TxId};
 use crate::data::triple::StoreOp;
 
+/// Newtype around [`Regex`] so that a compiled pattern can live inside
+/// [`DataValue`], whose derives require equality, ordering and serde impls.
+/// Comparison and ordering use the pattern's source text.
+#[derive(Clone)]
+pub(crate) struct RegexWrapper(pub(crate) Regex);
+
+impl Serialize for RegexWrapper {
+    /// Always panics: serialization of a regex value is not supported.
+    fn serialize<S>(&self, _serializer: S) -> std::result::Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        panic!("serializing regex");
+    }
+}
+
+impl<'de> Deserialize<'de> for RegexWrapper {
+    /// Always panics: deserialization of a regex value is not supported.
+    fn deserialize<D>(_deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        panic!("deserializing regex");
+    }
+}
+
+impl PartialEq for RegexWrapper {
+    fn eq(&self, other: &Self) -> bool {
+        self.0.as_str() == other.0.as_str()
+    }
+}
+
+impl Eq for RegexWrapper {}
+
+impl Ord for RegexWrapper {
+    fn cmp(&self, other: &Self) -> Ordering {
+        self.0.as_str().cmp(other.0.as_str())
+    }
+}
+
+impl PartialOrd for RegexWrapper {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        // Delegate to `Ord` so the two orderings can never disagree.
+        Some(self.cmp(other))
+    }
+}
+
 #[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
 pub(crate) enum DataValue {
     #[serde(rename = "n")]
@@ -29,9 +78,12 @@ pub(crate) enum DataValue {
     Timestamp(i64),
     #[serde(rename = "v")]
     Bytes(Box<[u8]>),
-
+    #[serde(rename = "x")]
+    Regex(RegexWrapper),
     #[serde(rename = "z")]
     List(Vec<DataValue>),
+    #[serde(rename = "y")]
+    Set(BTreeSet<DataValue>),
     #[serde(rename = "g")]
     Guard,
     #[serde(rename = "o")]
@@ -147,6 +199,9 @@ impl Debug for DataValue {
             DataValue::String(s) => {
                 write!(f, "{:?}", s)
             }
+            DataValue::Regex(r) => {
+                write!(f, "{:?}", r.0.as_str())
+            }
             DataValue::Uuid(u) => {
                 write!(f, "{}", u)
             }
@@ -157,6 +212,7 @@ impl Debug for DataValue {
                 write!(f, "bytes(len={})", b.len())
             }
             DataValue::List(t) => f.debug_list().entries(t.iter()).finish(),
+            DataValue::Set(t) => f.debug_list().entries(t.iter()).finish(),
             DataValue::DescVal(v) => {
                 write!(f, "desc<{:?}>", v)
             }