diff --git a/src/preprocess/query.rs b/src/preprocess/query.rs index a7d2a1ab..2d278804 100644 --- a/src/preprocess/query.rs +++ b/src/preprocess/query.rs @@ -11,7 +11,9 @@ use crate::data::keyword::Keyword; use crate::data::value::DataValue; use crate::preprocess::triple::TxError; use crate::runtime::transact::SessionTx; -use crate::transact::query::{InlineFixedRelation, InnerJoin, Joiner, Relation, TripleRelation}; +use crate::transact::query::{ + InlineFixedRelation, InnerJoin, Joiner, Relation, StoredDerivedRelation, TripleRelation, +}; use crate::transact::throwaway::ThrowawayArea; use crate::{EntityId, Validity}; @@ -42,6 +44,8 @@ pub enum QueryProcError { UnexpectedForm(JsonValue, String), #[error("arity mismatch for rule {0}: all definitions must have the same arity")] ArityMismatch(Keyword), + #[error("encountered undefined rule {0}")] + UndefinedRule(Keyword), } #[derive(Clone, Debug)] @@ -98,7 +102,6 @@ pub enum Atom { #[derive(Clone, Debug)] pub struct RuleSet { - pub(crate) storage: Option, pub(crate) sets: Vec, pub(crate) arity: usize, } @@ -117,6 +120,8 @@ impl RuleSet { } } +pub(crate) type DatalogProgram = BTreeMap; + #[derive(Clone, Debug, Default)] pub enum Aggregation { #[default] @@ -131,11 +136,53 @@ pub(crate) struct Rule { } impl SessionTx { + fn semi_naive_evaluate(&mut self, prog: &DatalogProgram) -> Result<()> { + let stores = prog + .iter() + .map(|(k, s)| (k.clone(), (self.new_throwaway(), s.arity))) + .collect::>(); + let compiled: BTreeMap<_, _> = prog + .iter() + .map( + |(k, body)| -> Result<(Keyword, Vec<(Vec<(Keyword, Aggregation)>, Relation)>)> { + let mut collected = Vec::with_capacity(body.sets.len()); + for rule in &body.sets { + let relation = self.compile_rule_body(&rule.body, rule.vld, &stores)?; + collected.push((rule.head.clone(), relation)); + } + Ok((k.clone(), collected)) + }, + ) + .try_collect()?; + + for epoch in 0u32.. { + let mut changed = false; + for (k, rules) in compiled.iter() { + let store = stores.get(k).unwrap(); + let use_delta = BTreeSet::from([stores.get(k).unwrap().0.get_id()]); + for (head, relation) in rules { + for item_res in relation.iter(self, epoch, &use_delta) { + let item = item_res?; + // check if item exists, if no, update changed + // store item + // todo: variables elimination should be more aggressive + // todo: reorder items at the end + // improvement: the clauses can actually be evaluated in parallel + } + } + } + if !changed { + break; + } + } + // todo: return the throwaway for query! + Ok(()) + } pub fn parse_rule_sets( &mut self, payload: &JsonValue, default_vld: Validity, - ) -> Result> { + ) -> Result { let rules = payload .as_array() .ok_or_else(|| { @@ -165,14 +212,7 @@ impl SessionTx { return Err(QueryProcError::ArityMismatch(name).into()); } } - Ok(( - name, - RuleSet { - storage: None, - sets: rules, - arity, - }, - )) + Ok((name, RuleSet { sets: rules, arity })) }) .try_collect() } @@ -299,7 +339,12 @@ impl SessionTx { }, )) } - pub fn compile_rule_body(&mut self, clauses: Vec, vld: Validity) -> Result { + fn compile_rule_body( + &mut self, + clauses: &[Atom], + vld: Validity, + stores: &BTreeMap, + ) -> Result { let mut ret = Relation::unit(); let mut seen_variables = BTreeSet::new(); let mut id_serial = 0; @@ -311,13 +356,13 @@ impl SessionTx { }; for clause in clauses { match clause { - Atom::AttrTriple(a_triple) => match (a_triple.entity, a_triple.value) { + Atom::AttrTriple(a_triple) => match (&a_triple.entity, &a_triple.value) { (Term::Const(eid), Term::Var(v_kw)) => { let temp_join_key_left = next_ignored_kw(); let temp_join_key_right = next_ignored_kw(); let const_rel = Relation::Fixed(InlineFixedRelation { bindings: vec![temp_join_key_left.clone()], - data: vec![vec![DataValue::EnId(eid)]], + data: vec![vec![DataValue::EnId(*eid)]], to_eliminate: Default::default(), }); if ret.is_unit() { @@ -338,19 +383,19 @@ impl SessionTx { let mut join_right_keys = vec![temp_join_key_right.clone()]; let v_kw = { - if seen_variables.contains(&v_kw) { + if seen_variables.contains(v_kw) { let ret = next_ignored_kw(); // to_eliminate.insert(ret.clone()); - join_left_keys.push(v_kw); + join_left_keys.push(v_kw.clone()); join_right_keys.push(ret.clone()); ret } else { seen_variables.insert(v_kw.clone()); - v_kw + v_kw.clone() } }; let right = Relation::Triple(TripleRelation { - attr: a_triple.attr, + attr: a_triple.attr.clone(), vld, bindings: [temp_join_key_right, v_kw], }); @@ -369,7 +414,7 @@ impl SessionTx { let temp_join_key_right = next_ignored_kw(); let const_rel = Relation::Fixed(InlineFixedRelation { bindings: vec![temp_join_key_left.clone()], - data: vec![vec![val]], + data: vec![vec![val.clone()]], to_eliminate: Default::default(), }); if ret.is_unit() { @@ -392,16 +437,16 @@ impl SessionTx { let e_kw = { if seen_variables.contains(&e_kw) { let ret = next_ignored_kw(); - join_left_keys.push(e_kw); + join_left_keys.push(e_kw.clone()); join_right_keys.push(ret.clone()); ret } else { seen_variables.insert(e_kw.clone()); - e_kw + e_kw.clone() } }; let right = Relation::Triple(TripleRelation { - attr: a_triple.attr, + attr: a_triple.attr.clone(), vld, bindings: [e_kw, temp_join_key_right], }); @@ -424,27 +469,27 @@ impl SessionTx { let e_kw = { if seen_variables.contains(&e_kw) { let ret = next_ignored_kw(); - join_left_keys.push(e_kw); + join_left_keys.push(e_kw.clone()); join_right_keys.push(ret.clone()); ret } else { seen_variables.insert(e_kw.clone()); - e_kw + e_kw.clone() } }; let v_kw = { - if seen_variables.contains(&v_kw) { + if seen_variables.contains(v_kw) { let ret = next_ignored_kw(); - join_left_keys.push(v_kw); + join_left_keys.push(v_kw.clone()); join_right_keys.push(ret.clone()); ret } else { seen_variables.insert(v_kw.clone()); - v_kw + v_kw.clone() } }; let right = Relation::Triple(TripleRelation { - attr: a_triple.attr, + attr: a_triple.attr.clone(), vld, bindings: [e_kw, v_kw], }); @@ -466,7 +511,7 @@ impl SessionTx { let (left_var_1, left_var_2) = (next_ignored_kw(), next_ignored_kw()); let const_rel = Relation::Fixed(InlineFixedRelation { bindings: vec![left_var_1.clone(), left_var_2.clone()], - data: vec![vec![DataValue::EnId(eid), val]], + data: vec![vec![DataValue::EnId(*eid), val.clone()]], to_eliminate: Default::default(), }); if ret.is_unit() { @@ -485,7 +530,7 @@ impl SessionTx { let (right_var_1, right_var_2) = (next_ignored_kw(), next_ignored_kw()); let right = Relation::Triple(TripleRelation { - attr: a_triple.attr, + attr: a_triple.attr.clone(), vld, bindings: [right_var_1.clone(), right_var_2.clone()], }); @@ -501,7 +546,63 @@ impl SessionTx { } }, Atom::Rule(rule_app) => { - todo!() + let (store, arity) = stores + .get(&rule_app.name) + .ok_or_else(|| QueryProcError::UndefinedRule(rule_app.name.clone()))? + .clone(); + if arity != rule_app.args.len() { + return Err(QueryProcError::ArityMismatch(rule_app.name.clone()).into()); + } + + let mut left_joiner_vals = vec![]; + let mut left_joiner_vars = vec![]; + let mut right_joiner_vars = vec![]; + let mut right_vars = vec![]; + + for term in &rule_app.args { + match term { + Term::Var(var) => { + right_vars.push(var.clone()); + } + Term::Const(constant) => { + left_joiner_vals.push(constant.clone()); + left_joiner_vars.push(next_ignored_kw()); + right_joiner_vars.push(next_ignored_kw()); + right_vars.push(next_ignored_kw()); + } + } + } + + if !left_joiner_vars.is_empty() { + let const_joiner = Relation::Fixed(InlineFixedRelation { + bindings: left_joiner_vars.clone(), + data: vec![left_joiner_vals], + to_eliminate: Default::default(), + }); + ret = Relation::Join(Box::new(InnerJoin { + left: ret, + right: const_joiner, + joiner: Joiner { + left_keys: vec![], + right_keys: vec![], + }, + to_eliminate: Default::default(), + })) + } + + let right = Relation::Derived(StoredDerivedRelation { + bindings: right_vars, + storage: store, + }); + ret = Relation::Join(Box::new(InnerJoin { + left: ret, + right, + joiner: Joiner { + left_keys: left_joiner_vars, + right_keys: right_joiner_vars, + }, + to_eliminate: Default::default(), + })) } Atom::Predicate(_) => { todo!() diff --git a/src/runtime/transact.rs b/src/runtime/transact.rs index 4ec477d0..b0107041 100644 --- a/src/runtime/transact.rs +++ b/src/runtime/transact.rs @@ -17,7 +17,7 @@ use crate::data::encode::{ use crate::data::id::{AttrId, EntityId, TxId, Validity}; use crate::data::keyword::Keyword; use crate::data::value::DataValue; -use crate::transact::throwaway::ThrowawayArea; +use crate::transact::throwaway::{ThrowawayArea, ThrowawayId}; pub struct SessionTx { pub(crate) tx: Tx, @@ -70,7 +70,7 @@ impl SessionTx { let old_count = self.throwaway_count.fetch_add(1, Ordering::AcqRel); ThrowawayArea { db: self.throwaway.clone(), - prefix: old_count, + id: ThrowawayId(old_count), } } diff --git a/src/transact/query.rs b/src/transact/query.rs index 17bbca41..1553429f 100644 --- a/src/transact/query.rs +++ b/src/transact/query.rs @@ -8,7 +8,7 @@ use crate::data::keyword::Keyword; use crate::data::tuple::{Tuple, TupleIter}; use crate::data::value::DataValue; use crate::runtime::transact::SessionTx; -use crate::transact::throwaway::ThrowawayArea; +use crate::transact::throwaway::{ThrowawayArea, ThrowawayId}; use crate::Validity; #[derive(Debug)] @@ -456,13 +456,28 @@ fn get_eliminate_indices(bindings: &[Keyword], eliminate: &BTreeSet) -> #[derive(Debug)] pub struct StoredDerivedRelation { - bindings: Vec, - storage: ThrowawayArea, + pub(crate) bindings: Vec, + pub(crate) storage: ThrowawayArea, } impl StoredDerivedRelation { - fn iter(&self) -> TupleIter { - Box::new(self.storage.scan_all().map_ok(|(t, _)| t)) + fn iter(&self, epoch: u32, use_delta: &BTreeSet) -> TupleIter { + if use_delta.contains(&self.storage.get_id()) { + Box::new( + self.storage + .scan_all() + .filter_map_ok(move |(t, stored_epoch)| { + if let Some(stored_epoch) = stored_epoch { + if epoch > stored_epoch + 1 { + return None; + } + } + Some(t) + }), + ) + } else { + Box::new(self.storage.scan_all().map_ok(|(t, _)| t)) + } } fn join_is_prefix(&self, right_join_indices: &[usize]) -> bool { let mut indices = right_join_indices.to_vec(); @@ -475,6 +490,8 @@ impl StoredDerivedRelation { left_iter: TupleIter<'a>, (left_join_indices, right_join_indices): (Vec, Vec), eliminate_indices: BTreeSet, + epoch: u32, + use_delta: &BTreeSet, ) -> TupleIter<'a> { let mut right_invert_indices = right_join_indices.iter().enumerate().collect_vec(); right_invert_indices.sort_by_key(|(_, b)| **b); @@ -482,39 +499,82 @@ impl StoredDerivedRelation { .into_iter() .map(|(a, _)| left_join_indices[a]) .collect_vec(); - Box::new( - left_iter - .map_ok(move |tuple| { - let eliminate_indices = eliminate_indices.clone(); - let prefix = Tuple( - left_to_prefix_indices - .iter() - .map(|i| tuple.0[*i].clone()) - .collect_vec(), - ); - self.storage.scan_prefix(&prefix).map_ok(move |(found, _)| { - let mut ret = tuple.0.clone(); - ret.extend(found.0); - - if !eliminate_indices.is_empty() { - ret = ret - .into_iter() - .enumerate() - .filter_map(|(i, v)| { - if eliminate_indices.contains(&i) { - None - } else { - Some(v) + if use_delta.contains(&self.storage.get_id()) { + Box::new( + left_iter + .map_ok(move |tuple| { + let eliminate_indices = eliminate_indices.clone(); + let prefix = Tuple( + left_to_prefix_indices + .iter() + .map(|i| tuple.0[*i].clone()) + .collect_vec(), + ); + self.storage + .scan_prefix(&prefix) + .filter_map_ok(move |(found, meta)| { + if let Some(stored_epoch) = meta { + if epoch > stored_epoch + 1 { + return None; } - }) - .collect_vec(); - } - Tuple(ret) + } + let mut ret = tuple.0.clone(); + ret.extend(found.0); + + if !eliminate_indices.is_empty() { + ret = ret + .into_iter() + .enumerate() + .filter_map(|(i, v)| { + if eliminate_indices.contains(&i) { + None + } else { + Some(v) + } + }) + .collect_vec(); + } + Some(Tuple(ret)) + }) }) - }) - .flatten_ok() - .map(flatten_err), - ) + .flatten_ok() + .map(flatten_err), + ) + } else { + Box::new( + left_iter + .map_ok(move |tuple| { + let eliminate_indices = eliminate_indices.clone(); + let prefix = Tuple( + left_to_prefix_indices + .iter() + .map(|i| tuple.0[*i].clone()) + .collect_vec(), + ); + self.storage.scan_prefix(&prefix).map_ok(move |(found, _)| { + let mut ret = tuple.0.clone(); + ret.extend(found.0); + + if !eliminate_indices.is_empty() { + ret = ret + .into_iter() + .enumerate() + .filter_map(|(i, v)| { + if eliminate_indices.contains(&i) { + None + } else { + Some(v) + } + }) + .collect_vec(); + } + Tuple(ret) + }) + }) + .flatten_ok() + .map(flatten_err), + ) + } } } @@ -607,15 +667,20 @@ impl Relation { Relation::Join(j) => j.bindings(), } } - pub fn iter<'a>(&'a self, tx: &'a SessionTx) -> TupleIter<'a> { + pub fn iter<'a>( + &'a self, + tx: &'a SessionTx, + epoch: u32, + use_delta: &BTreeSet, + ) -> TupleIter<'a> { match self { Relation::Fixed(f) => Box::new(f.data.iter().map(|t| Ok(Tuple(t.clone())))), Relation::Triple(r) => Box::new( tx.triple_a_before_scan(r.attr.id, r.vld) .map_ok(|(_, e_id, y)| Tuple(vec![DataValue::EnId(e_id), y])), ), - Relation::Derived(r) => r.iter(), - Relation::Join(j) => j.iter(tx), + Relation::Derived(r) => r.iter(epoch, use_delta), + Relation::Join(j) => j.iter(tx, epoch, use_delta), } } } @@ -641,7 +706,12 @@ impl InnerJoin { ret.extend(self.right.bindings()); ret } - pub(crate) fn iter<'a>(&'a self, tx: &'a SessionTx) -> TupleIter<'a> { + pub(crate) fn iter<'a>( + &'a self, + tx: &'a SessionTx, + epoch: u32, + use_delta: &BTreeSet, + ) -> TupleIter<'a> { let bindings = self.bindings(); let eliminate_indices = get_eliminate_indices(&bindings, &self.to_eliminate); match &self.right { @@ -649,31 +719,48 @@ impl InnerJoin { let join_indices = self .joiner .join_indices(&self.left.bindings(), &self.right.bindings()); - f.join(self.left.iter(tx), join_indices, eliminate_indices) + f.join( + self.left.iter(tx, epoch, use_delta), + join_indices, + eliminate_indices, + ) } Relation::Triple(r) => { let join_indices = self .joiner .join_indices(&self.left.bindings(), &self.right.bindings()); - r.join(self.left.iter(tx), join_indices, tx, eliminate_indices) + r.join( + self.left.iter(tx, epoch, use_delta), + join_indices, + tx, + eliminate_indices, + ) } Relation::Derived(r) => { let join_indices = self .joiner .join_indices(&self.left.bindings(), &self.right.bindings()); if r.join_is_prefix(&join_indices.1) { - r.prefix_join(self.left.iter(tx), join_indices, eliminate_indices) + r.prefix_join( + self.left.iter(tx, epoch, use_delta), + join_indices, + eliminate_indices, + epoch, + use_delta, + ) } else { - self.materialized_join(tx, eliminate_indices) + self.materialized_join(tx, eliminate_indices, epoch, use_delta) } } - Relation::Join(_) => self.materialized_join(tx, eliminate_indices), + Relation::Join(_) => self.materialized_join(tx, eliminate_indices, epoch, use_delta), } } fn materialized_join<'a>( &'a self, tx: &'a SessionTx, eliminate_indices: BTreeSet, + epoch: u32, + use_delta: &BTreeSet, ) -> TupleIter<'a> { let right_bindings = self.right.bindings(); let (left_join_indices, right_join_indices) = self @@ -693,7 +780,7 @@ impl InnerJoin { .map(|(a, _)| a) .collect_vec(); let mut throwaway = tx.new_throwaway(); - for item in self.right.iter(tx) { + for item in self.right.iter(tx, epoch, use_delta) { match item { Ok(tuple) => { let stored_tuple = Tuple( @@ -711,7 +798,7 @@ impl InnerJoin { } Box::new( self.left - .iter(tx) + .iter(tx, epoch, use_delta) .map_ok(move |tuple| { let eliminate_indices = eliminate_indices.clone(); let prefix = Tuple( diff --git a/src/transact/throwaway.rs b/src/transact/throwaway.rs index e6e69aae..8e5f1183 100644 --- a/src/transact/throwaway.rs +++ b/src/transact/throwaway.rs @@ -1,36 +1,43 @@ use std::fmt::{Debug, Formatter}; + use cozorocks::{DbIter, PinSlice, RawRocksDb, RocksDbStatus}; use crate::data::tuple::{EncodedTuple, Tuple}; use crate::data::value::DataValue; +#[derive(Copy, Clone, Debug, Ord, PartialOrd, Eq, PartialEq)] +pub struct ThrowawayId(pub(crate) u32); + #[derive(Clone)] pub(crate) struct ThrowawayArea { pub(crate) db: RawRocksDb, - pub(crate) prefix: u32, + pub(crate) id: ThrowawayId, } impl Debug for ThrowawayArea { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "Throwaway<{}>", self.prefix) + write!(f, "Throwaway<{}>", self.id.0) } } impl ThrowawayArea { + pub(crate) fn get_id(&self) -> ThrowawayId { + todo!() + } pub(crate) fn put(&mut self, tuple: &Tuple, value: &[u8]) -> Result<(), RocksDbStatus> { - let key_encoded = tuple.encode_as_key(self.prefix); + let key_encoded = tuple.encode_as_key(self.id.0); self.db.put(&key_encoded, value) } pub(crate) fn get(&self, tuple: &Tuple) -> Result, RocksDbStatus> { - let key_encoded = tuple.encode_as_key(self.prefix); + let key_encoded = tuple.encode_as_key(self.id.0); self.db.get(&key_encoded) } pub(crate) fn del(&mut self, tuple: &Tuple) -> Result<(), RocksDbStatus> { - let key_encoded = tuple.encode_as_key(self.prefix); + let key_encoded = tuple.encode_as_key(self.id.0); self.db.del(&key_encoded) } - pub(crate) fn scan_all(&self) -> impl Iterator)>> { - let (lower, upper) = EncodedTuple::bounds_for_prefix(self.prefix); + pub(crate) fn scan_all(&self) -> impl Iterator)>> { + let (lower, upper) = EncodedTuple::bounds_for_prefix(self.id.0); let mut it = self .db .iterator() @@ -43,12 +50,12 @@ impl ThrowawayArea { pub(crate) fn scan_prefix( &self, prefix: &Tuple, - ) -> impl Iterator)>> { + ) -> impl Iterator)>> { let mut upper = prefix.0.clone(); upper.push(DataValue::Null); let upper = Tuple(upper); - let upper = upper.encode_as_key(self.prefix); - let lower = prefix.encode_as_key(self.prefix); + let upper = upper.encode_as_key(self.id.0); + let lower = prefix.encode_as_key(self.id.0); let mut it = self .db .iterator() @@ -66,7 +73,7 @@ struct ThrowawayIter { } impl Iterator for ThrowawayIter { - type Item = anyhow::Result<(Tuple, Vec)>; + type Item = anyhow::Result<(Tuple, Option)>; fn next(&mut self) -> Option { if !self.started { @@ -79,7 +86,16 @@ impl Iterator for ThrowawayIter { Ok(None) => None, Ok(Some((k_slice, v_slice))) => match EncodedTuple(k_slice).decode() { Err(e) => Some(Err(e)), - Ok(t) => Some(Ok((t, v_slice.to_vec()))), + Ok(t) => { + let epoch = if v_slice.is_empty() { + None + } else { + Some(u32::from_be_bytes([ + v_slice[0], v_slice[1], v_slice[2], v_slice[3], + ])) + }; + Some(Ok((t, epoch))) + } }, } } @@ -87,7 +103,7 @@ impl Iterator for ThrowawayIter { impl Drop for ThrowawayArea { fn drop(&mut self) { - let (lower, upper) = EncodedTuple::bounds_for_prefix(self.prefix); + let (lower, upper) = EncodedTuple::bounds_for_prefix(self.id.0); if let Err(e) = self.db.range_del(&lower, &upper) { eprintln!("{}", e); }