diff --git a/src/algebra/op.rs b/src/algebra/op.rs
index a3ee7651..4ee09bd7 100644
--- a/src/algebra/op.rs
+++ b/src/algebra/op.rs
@@ -13,6 +13,7 @@ use std::rc::Rc;
 mod assoc;
 mod cartesian;
 mod concat;
+mod diff;
 mod filter;
 mod from;
 mod group;
@@ -24,6 +25,7 @@ mod nested_loop;
 mod scan;
 mod select;
 mod sort;
+mod sym_diff;
 mod tagged;
 mod union;
 mod values;
@@ -35,6 +37,7 @@ use crate::runtime::options::default_read_options;
 pub(crate) use assoc::*;
 pub(crate) use cartesian::*;
 pub(crate) use concat::*;
+pub(crate) use diff::*;
 pub(crate) use filter::*;
 pub(crate) use from::*;
 pub(crate) use group::*;
@@ -46,6 +49,7 @@ pub(crate) use nested_loop::*;
 pub(crate) use scan::*;
 pub(crate) use select::*;
 pub(crate) use sort::*;
+pub(crate) use sym_diff::*;
 pub(crate) use tagged::*;
 pub(crate) use union::*;
 pub(crate) use values::*;
diff --git a/src/algebra/op/concat.rs b/src/algebra/op/concat.rs
index ba6f9b9d..cdcd6ea5 100644
--- a/src/algebra/op/concat.rs
+++ b/src/algebra/op/concat.rs
@@ -12,8 +12,6 @@ use std::collections::btree_map::Entry;
 use std::collections::{BTreeMap, BTreeSet};
 
 pub(crate) const NAME_CONCAT: &str = "Concat";
-pub(crate) const NAME_DIFF: &str = "Diff";
-pub(crate) const NAME_SYM_DIFF: &str = "SymDiff";
 
 pub(crate) struct ConcatOp<'a> {
     pub(crate) sources: Vec<RaBox<'a>>,
diff --git a/src/algebra/op/diff.rs b/src/algebra/op/diff.rs
new file mode 100644
index 00000000..064a297e
--- /dev/null
+++ b/src/algebra/op/diff.rs
@@ -0,0 +1 @@
+pub(crate) const NAME_DIFF: &str = "Diff";
diff --git a/src/algebra/op/sym_diff.rs b/src/algebra/op/sym_diff.rs
new file mode 100644
index 00000000..81bc1a88
--- /dev/null
+++ b/src/algebra/op/sym_diff.rs
@@ -0,0 +1,169 @@
+//! Symmetric difference operator of the relational algebra.
+//!
+//! `SymDiff(a, b)` yields the tuple sets that occur exactly once in the
+//! concatenation of its two sources. Occurrences are counted in a temporary
+//! table that is filled on the first call to `iter`.
+
+use crate::algebra::op::{
+    concat_binding_map, drop_temp_table, make_concat_iter, RelationalAlgebra,
+};
+use crate::algebra::parser::{build_relational_expr, AlgebraParseError, RaBox};
+use crate::context::TempDbContext;
+use crate::data::tuple::{DataKind, OwnTuple, Tuple};
+use crate::data::tuple_set::{BindingMap, TupleSet};
+use crate::ddl::reify::{DdlContext, TableInfo};
+use crate::parser::Pairs;
+use crate::runtime::options::default_read_options;
+use anyhow::Result;
+use cozorocks::PinnableSlicePtr;
+use std::collections::BTreeSet;
+use std::sync::atomic::Ordering::SeqCst;
+use std::sync::atomic::{AtomicU32, Ordering};
+
+/// Name under which the operator is dispatched in `build_relational_expr`.
+pub(crate) const NAME_SYM_DIFF: &str = "SymDiff";
+
+/// Symmetric difference of exactly two relational sources.
+pub(crate) struct SymDiffOp<'a> {
+    /// Left and right operand, in that order.
+    pub(crate) sources: [RaBox<'a>; 2],
+    ctx: &'a TempDbContext<'a>,
+    /// Id of the temp table holding the occurrence counts; `0` means the
+    /// counts have not been computed yet.
+    temp_table_id: AtomicU32,
+}
+
+impl<'a> Drop for SymDiffOp<'a> {
+    /// Passes the temp table id to `drop_temp_table` for cleanup.
+    fn drop(&mut self) {
+        drop_temp_table(self.ctx, self.temp_table_id.load(SeqCst));
+    }
+}
+
+impl<'a> SymDiffOp<'a> {
+    /// Builds the operator from its parsed arguments.
+    ///
+    /// When `prev` is `Some`, it becomes the left operand and only the right
+    /// operand is read from `args`; otherwise both operands are read from
+    /// `args`.
+    ///
+    /// # Errors
+    ///
+    /// Returns `AlgebraParseError::NotEnoughArguments` when an operand is
+    /// missing, and propagates any error from building an operand.
+    pub(crate) fn build(
+        ctx: &'a TempDbContext<'a>,
+        prev: Option<RaBox<'a>>,
+        mut args: Pairs,
+    ) -> Result<Self> {
+        let not_enough_args = || AlgebraParseError::NotEnoughArguments(NAME_SYM_DIFF.to_string());
+        let left = match prev {
+            Some(v) => v,
+            None => build_relational_expr(ctx, args.next().ok_or_else(not_enough_args)?)?,
+        };
+        let right = build_relational_expr(ctx, args.next().ok_or_else(not_enough_args)?)?;
+        Ok(Self {
+            sources: [left, right],
+            ctx,
+            temp_table_id: Default::default(),
+        })
+    }
+
+    /// Writes one temp-table row per distinct tuple set of the concatenated
+    /// sources; the row's value is the number of times the tuple set was seen.
+    ///
+    /// NOTE(review): counting over the concatenation cannot tell a tuple set
+    /// that one source yields twice from one that each source yields once.
+    /// Confirm that a source never yields duplicates.
+    fn dedup_data(&self) -> Result<()> {
+        let iter = make_concat_iter(&self.sources, self.binding_map()?)?;
+
+        let mut cache_tuple = OwnTuple::with_prefix(self.temp_table_id.load(SeqCst));
+        let mut counter = OwnTuple::with_data_prefix(DataKind::Data);
+        let mut slice_cache = PinnableSlicePtr::default();
+        let r_opts = default_read_options();
+        let db = &self.ctx.sess.temp;
+        let w_opts = &self.ctx.sess.w_opts_temp;
+        for tset in iter {
+            let tset = tset?;
+            tset.encode_as_tuple(&mut cache_tuple);
+            let seen = if db.get(&r_opts, &cache_tuple, &mut slice_cache)? {
+                Tuple::new(slice_cache.as_ref()).get_int(0)? + 1
+            } else {
+                1
+            };
+            counter.truncate_all();
+            counter.push_int(seen);
+            db.put(w_opts, &cache_tuple, &counter)?;
+        }
+        Ok(())
+    }
+}
+
+impl<'b> RelationalAlgebra for SymDiffOp<'b> {
+    fn name(&self) -> &str {
+        NAME_SYM_DIFF
+    }
+
+    /// Union of the bindings of both sources.
+    // NOTE(review): the set's element type is assumed to be `String`; confirm
+    // against the `RelationalAlgebra` trait.
+    fn bindings(&self) -> Result<BTreeSet<String>> {
+        let mut ret = BTreeSet::new();
+        for el in &self.sources {
+            ret.extend(el.bindings()?)
+        }
+        Ok(ret)
+    }
+
+    /// Binding maps of both sources, merged by `concat_binding_map`.
+    fn binding_map(&self) -> Result<BindingMap> {
+        let maps = self
+            .sources
+            .iter()
+            .map(|el| el.binding_map())
+            .collect::<Result<Vec<_>>>()?;
+
+        Ok(concat_binding_map(maps.into_iter()))
+    }
+
+    /// Yields every tuple set whose occurrence count is exactly one.
+    ///
+    /// The counts are computed on the first call and reused afterwards. If
+    /// computing them fails, the partially filled temp table is dropped and
+    /// the cached id is reset, so a later call starts over instead of reading
+    /// incomplete counts.
+    fn iter<'a>(&'a self) -> Result<Box<dyn Iterator<Item = Result<TupleSet>> + 'a>> {
+        if self.temp_table_id.load(SeqCst) == 0 {
+            let temp_id = self.ctx.gen_table_id()?.id;
+            self.temp_table_id.store(temp_id, SeqCst);
+            if let Err(e) = self.dedup_data() {
+                drop_temp_table(self.ctx, temp_id);
+                self.temp_table_id.store(0, SeqCst);
+                return Err(e);
+            }
+        }
+        let r_opts = default_read_options();
+        let iter = self.ctx.sess.temp.iterator(&r_opts);
+        let key = OwnTuple::with_prefix(self.temp_table_id.load(Ordering::SeqCst));
+        Ok(Box::new(iter.iter_rows(key).filter_map(
+            move |(k, counter)| -> Option<Result<TupleSet>> {
+                let v = Tuple::new(k);
+                let tset = match TupleSet::decode_from_tuple(&v) {
+                    Ok(tset) => tset,
+                    Err(e) => return Some(Err(e)),
+                };
+                match Tuple::new(counter).get_int(0) {
+                    Ok(1) => Some(Ok(tset)),
+                    Ok(_) => None,
+                    Err(e) => Some(Err(e)),
+                }
+            },
+        )))
+    }
+
+    /// Always `None` for this operator.
+    fn identity(&self) -> Option<TableInfo> {
+        None
+    }
+}
diff --git a/src/algebra/parser.rs b/src/algebra/parser.rs
index 201464d6..535bb656 100644
--- a/src/algebra/parser.rs
+++ b/src/algebra/parser.rs
@@ -82,6 +82,7 @@ pub(crate) enum RaBox<'a> {
     ConcatOp(Box<ConcatOp<'a>>),
     UnionOp(Box<UnionOp<'a>>),
     IntersectOp(Box<IntersectOp<'a>>),
+    SymDiffOp(Box<SymDiffOp<'a>>),
     GroupOp(Box<GroupOp<'a>>),
 }
 
@@ -103,6 +104,7 @@ impl<'a> RaBox<'a> {
             RaBox::ConcatOp(inner) => inner.sources.iter().collect(),
             RaBox::UnionOp(inner) => inner.sources.iter().collect(),
             RaBox::IntersectOp(inner) => inner.sources.iter().collect(),
+            RaBox::SymDiffOp(inner) => vec![&inner.sources[0], &inner.sources[1]],
             RaBox::GroupOp(inner) => vec![&inner.source],
         }
     }
@@ -136,6 +138,7 @@ impl<'b> RelationalAlgebra for RaBox<'b> {
             RaBox::ConcatOp(inner) => inner.name(),
             RaBox::UnionOp(inner) => inner.name(),
             RaBox::IntersectOp(inner) => inner.name(),
+            RaBox::SymDiffOp(inner) => inner.name(),
             RaBox::GroupOp(inner) => inner.name(),
         }
     }
@@ -157,6 +160,7 @@ impl<'b> RelationalAlgebra for RaBox<'b> {
             RaBox::ConcatOp(inner) => inner.bindings(),
             RaBox::UnionOp(inner) => inner.bindings(),
             RaBox::IntersectOp(inner) => inner.bindings(),
+            RaBox::SymDiffOp(inner) => inner.bindings(),
             RaBox::GroupOp(inner) => inner.bindings(),
         }
     }
@@ -178,6 +182,7 @@ impl<'b> RelationalAlgebra for RaBox<'b> {
             RaBox::ConcatOp(inner) => inner.binding_map(),
             RaBox::UnionOp(inner) => inner.binding_map(),
             RaBox::IntersectOp(inner) => inner.binding_map(),
+            RaBox::SymDiffOp(inner) => inner.binding_map(),
             RaBox::GroupOp(inner) => inner.binding_map(),
         }
     }
@@ -199,6 +204,7 @@ impl<'b> RelationalAlgebra for RaBox<'b> {
             RaBox::ConcatOp(inner) => inner.iter(),
             RaBox::UnionOp(inner) => inner.iter(),
             RaBox::IntersectOp(inner) => inner.iter(),
+            RaBox::SymDiffOp(inner) => inner.iter(),
             RaBox::GroupOp(inner) => inner.iter(),
         }
     }
@@ -220,6 +226,7 @@ impl<'b> RelationalAlgebra for RaBox<'b> {
             RaBox::ConcatOp(inner) => inner.identity(),
             RaBox::UnionOp(inner) => inner.identity(),
             RaBox::IntersectOp(inner) => inner.identity(),
+            RaBox::SymDiffOp(inner) => inner.identity(),
             RaBox::GroupOp(inner) => inner.identity(),
         }
     }
@@ -300,6 +307,11 @@ pub(crate) fn build_relational_expr<'a>(
                             ctx, built, pairs,
                         )?)))
                     }
+                    NAME_SYM_DIFF => {
+                        built = Some(RaBox::SymDiffOp(Box::new(SymDiffOp::build(
+                            ctx, built, pairs,
+                        )?)))
+                    }
                     NAME_GROUP => {
                         built = Some(RaBox::GroupOp(Box::new(GroupOp::build(ctx, built, pairs)?)))
                     }
@@ -521,7 +533,7 @@ pub(crate) mod tests {
         {
             let ctx = sess.temp_ctx(true);
             let s = r#"
-            Intersect(From(d:Job).Where(d.id < 15), From(d:Job).Where(d.id > 10))
+            SymDiff(From(d:Job).Where(d.id <= 15), From(d:Job).Where(d.id >= 10))
             "#;
             let ra = build_relational_expr(
                 &ctx,