From c624951f4518057866f63f6c65d39d4a6383084d Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Sat, 13 Aug 2022 14:44:20 +0800 Subject: [PATCH] specialized iterator for no history --- .gitignore | 3 +- Cargo.toml | 3 + src/query/relation.rs | 132 +++++++++++++++++++++++-------- src/transact/exec.rs | 178 +++++++++++++++++++++++++++++------------- 4 files changed, 227 insertions(+), 89 deletions(-) diff --git a/.gitignore b/.gitignore index 2cddba5d..2d74f8d7 100644 --- a/.gitignore +++ b/.gitignore @@ -30,4 +30,5 @@ _test* *.db cozorocks/deps/ -.DS_Store \ No newline at end of file +.DS_Store +flamegraph.svg \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 69d3f408..0a5635ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,5 +39,8 @@ cozorocks = { path = "cozorocks" } #[profile.release] #lto = true +[profile.release] +debug = true + [workspace] members = ["cozorocks", "cozohttp", "cozopy", "cozoplay/src-tauri"] \ No newline at end of file diff --git a/src/query/relation.rs b/src/query/relation.rs index 991c81f9..48acab5f 100644 --- a/src/query/relation.rs +++ b/src/query/relation.rs @@ -3,11 +3,12 @@ use std::fmt::{Debug, Formatter}; use std::iter; use anyhow::{anyhow, bail, Context, Result}; +use either::{Left, Right}; use itertools::Itertools; use crate::data::attr::Attribute; use crate::data::expr::Expr; -use crate::data::id::Validity; +use crate::data::id::{AttrId, EntityId, Validity}; use crate::data::symb::Symbol; use crate::data::tuple::{Tuple, TupleIter}; use crate::data::value::DataValue; @@ -623,10 +624,17 @@ impl TripleRelation { } fn iter<'a>(&'a self, tx: &'a SessionTx) -> TupleIter<'a> { - let it = tx - .triple_a_before_scan(self.attr.id, self.vld) - .map_ok(|(_, e_id, y)| Tuple(vec![e_id.to_value(), y])); - self.return_filtered_iter(it, Default::default()) + if self.attr.with_history { + let it = tx + .triple_a_before_scan(self.attr.id, self.vld) + .map_ok(|(_, e_id, y)| Tuple(vec![e_id.to_value(), y])); + 
self.return_filtered_iter(it, Default::default()) + } else { + let it = tx + .triple_a_scan(self.attr.id) + .map_ok(|(_, e_id, y)| Tuple(vec![e_id.to_value(), y])); + self.return_filtered_iter(it, Default::default()) + } } pub(crate) fn neg_join<'a>( @@ -713,13 +721,26 @@ impl TripleRelation { // [f, f] not really a join let it = left_iter .map_ok(|tuple| { - tx.triple_a_before_scan(self.attr.id, self.vld) - .map_ok(move |(_, e_id, val)| { - let mut ret = tuple.0.clone(); - ret.push(e_id.to_value()); - ret.push(val); - Tuple(ret) - }) + if self.attr.with_history { + Left(tx.triple_a_before_scan(self.attr.id, self.vld).map_ok( + move |(_, e_id, val)| { + let mut ret = tuple.0.clone(); + ret.push(e_id.to_value()); + ret.push(val); + Tuple(ret) + }, + )) + } else { + Right( + tx.triple_a_scan(self.attr.id) + .map_ok(move |(_, e_id, val)| { + let mut ret = tuple.0.clone(); + ret.push(e_id.to_value()); + ret.push(val); + Tuple(ret) + }), + ) + } }) .flatten_ok() .map(flatten_err); @@ -818,7 +839,12 @@ impl TripleRelation { .unwrap() .get_entity_id() .with_context(|| format!("{:?}, {:?}", self, tuple))?; - match tx.triple_ea_before_scan(eid, self.attr.id, self.vld).next() { + let nxt = if self.attr.with_history { + tx.triple_ea_before_scan(eid, self.attr.id, self.vld).next() + } else { + tx.triple_ea_scan(eid, self.attr.id).next() + }; + match nxt { None => Ok(if !eliminate_indices.is_empty() { Some(Tuple( tuple @@ -862,13 +888,20 @@ impl TripleRelation { .get_entity_id() .with_context(|| format!("{:?}, {:?}, {}", self, tuple, left_e_idx)) .map(move |eid| { - tx.triple_ea_before_scan(eid, self.attr.id, self.vld) - .map_ok(move |(eid, _, val)| { - let mut ret = tuple.0.clone(); - ret.push(eid.to_value()); - ret.push(val); - Tuple(ret) - }) + let clj = move |(eid, _, val): (EntityId, AttrId, DataValue)| { + let mut ret = tuple.0.clone(); + ret.push(eid.to_value()); + ret.push(val); + Tuple(ret) + }; + if self.attr.with_history { + Left( + tx.triple_ea_before_scan(eid,
self.attr.id, self.vld) + .map_ok(clj), + ) + } else { + Right(tx.triple_ea_scan(eid, self.attr.id).map_ok(clj)) + } }) }) .map(flatten_err) @@ -892,10 +925,13 @@ impl TripleRelation { .unwrap() .get_entity_id() .with_context(|| format!("{:?}", self))?; - match tx - .triple_vref_a_before_scan(v_eid, self.attr.id, self.vld) - .next() - { + let nxt = if self.attr.with_history { + tx.triple_vref_a_before_scan(v_eid, self.attr.id, self.vld) + .next() + } else { + tx.triple_vref_a_scan(v_eid, self.attr.id).next() + }; + match nxt { None => Ok(if !eliminate_indices.is_empty() { Some(Tuple( tuple @@ -964,7 +1000,12 @@ impl TripleRelation { left_iter .map_ok(move |tuple| -> Result> { let val = tuple.0.get(left_v_idx).unwrap(); - match tx.triple_av_before_scan(self.attr.id, val, self.vld).next() { + let nxt = if self.attr.with_history { + tx.triple_av_before_scan(self.attr.id, val, self.vld).next() + } else { + tx.triple_av_scan(self.attr.id, val).next() + }; + match nxt { None => Ok(if !eliminate_indices.is_empty() { Some(Tuple( tuple @@ -1002,13 +1043,26 @@ impl TripleRelation { let it = left_iter .map_ok(move |tuple| { let val = tuple.0.get(left_v_idx).unwrap(); - tx.triple_av_before_scan(self.attr.id, val, self.vld) - .map_ok(move |(_, val, eid)| { - let mut ret = tuple.0.clone(); - ret.push(eid.to_value()); - ret.push(val); - Tuple(ret) - }) + if self.attr.with_history { + Left( + tx.triple_av_before_scan(self.attr.id, val, self.vld) + .map_ok(move |(_, val, eid): (AttrId, DataValue, EntityId)| { + let mut ret = tuple.0.clone(); + ret.push(eid.to_value()); + ret.push(val); + Tuple(ret) + }), + ) + } else { + Right(tx.triple_av_scan(self.attr.id, val).map_ok( + move |(_, val, eid): (AttrId, DataValue, EntityId)| { + let mut ret = tuple.0.clone(); + ret.push(eid.to_value()); + ret.push(val); + Tuple(ret) + }, + )) + } }) .flatten_ok() .map(flatten_err); @@ -1025,7 +1079,12 @@ impl TripleRelation { left_iter .map_ok(move |tuple| -> Result> { let val = 
tuple.0.get(left_v_idx).unwrap(); - for item in tx.triple_a_before_scan(self.attr.id, self.vld) { + let it = if self.attr.with_history { + Left(tx.triple_a_before_scan(self.attr.id, self.vld)) + } else { + Right(tx.triple_a_scan(self.attr.id)) + }; + for item in it { let (_, _, found_val) = item?; if *val == found_val { return Ok(None); @@ -1063,7 +1122,12 @@ impl TripleRelation { ) -> TupleIter<'a> { // [f, b] where b is not indexed let throwaway = tx.new_temp_store(); - for item in tx.triple_a_before_scan(self.attr.id, self.vld) { + let it = if self.attr.with_history { + Left(tx.triple_a_before_scan(self.attr.id, self.vld)) + } else { + Right(tx.triple_a_scan(self.attr.id)) + }; + for item in it { match item { Err(e) => return Box::new([Err(e)].into_iter()), Ok((_, eid, val)) => { diff --git a/src/transact/exec.rs b/src/transact/exec.rs index fc38c8b0..1679989a 100644 --- a/src/transact/exec.rs +++ b/src/transact/exec.rs @@ -145,11 +145,7 @@ impl SessionTx { let (v_in_key, v_in_val) = if attr.cardinality.is_one() { ( &DataValue::Guard, - if op.is_assert() { - v - } else { - &DataValue::Guard - }, + if op.is_assert() { v } else { &DataValue::Guard }, ) } else { (v, &DataValue::Guard) @@ -410,6 +406,15 @@ impl SessionTx { }, ) } + pub(crate) fn triple_ea_scan( + &self, + eid: EntityId, + aid: AttrId, + ) -> impl Iterator> { + let lower = encode_eav_key(eid, aid, &DataValue::Null, Validity::MAX); + let upper = encode_eav_key(eid, aid, &DataValue::Bottom, Validity::MIN); + TripleEntityAttrIter::new(self.tx.iterator(), lower, upper) + } pub(crate) fn triple_ea_before_scan( &self, eid: EntityId, @@ -435,6 +440,15 @@ impl SessionTx { } Ok(false) } + pub(crate) fn triple_av_scan( + &self, + aid: AttrId, + v: &DataValue, + ) -> impl Iterator> { + let lower = encode_ave_key(aid, v, EntityId::ZERO, Validity::MAX); + let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN); + TripleAttrValueIter::new(self.tx.iterator(), lower, upper) + } pub(crate) fn 
triple_av_before_scan( &self, aid: AttrId, @@ -455,6 +469,15 @@ impl SessionTx { let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN); TripleAttrValueAfterIter::new(self.tx.iterator(), lower, upper, after) } + pub(crate) fn triple_vref_a_scan( + &self, + v_eid: EntityId, + aid: AttrId, + ) -> impl Iterator> { + let lower = encode_vae_key(v_eid, aid, EntityId::ZERO, Validity::MAX); + let upper = encode_vae_key(v_eid, aid, EntityId::MAX_PERM, Validity::MIN); + TripleValueRefAttrIter::new(self.tx.iterator(), lower, upper) + } pub(crate) fn triple_vref_a_before_scan( &self, v_eid: EntityId, @@ -465,6 +488,14 @@ impl SessionTx { let upper = encode_vae_key(v_eid, aid, EntityId::MAX_PERM, Validity::MIN); TripleValueRefAttrBeforeIter::new(self.tx.iterator(), lower, upper, before) } + pub(crate) fn triple_a_scan( + &self, + aid: AttrId, + ) -> impl Iterator> { + let lower = encode_aev_key(aid, EntityId::ZERO, &DataValue::Null, Validity::MAX); + let upper = encode_aev_key(aid, EntityId::MAX_PERM, &DataValue::Bottom, Validity::MIN); + TripleAttrEntityIter::new(self.tx.iterator(), lower, upper) + } pub(crate) fn triple_a_before_scan( &self, aid: AttrId, @@ -480,28 +511,41 @@ impl SessionTx { struct TripleEntityAttrIter { it: DbIter, - current: EncodedVec, + started: bool, } impl TripleEntityAttrIter { - fn next_inner(&mut self) -> Result> { - self.it.seek(&self.current); + fn new( + builder: IterBuilder, + lower_bound: EncodedVec, + upper_bound: EncodedVec, + ) -> Self { + let mut it = builder.upper_bound(&upper_bound).start(); + it.seek(&lower_bound); + Self { it, started: false } + } + fn next_inner(&mut self) -> Result> { + if !self.started { + self.started = true; + } else { + self.it.next(); + } return match self.it.pair()? 
{ None => Ok(None), Some((k_slice, v_slice)) => { - let (eid, aid, tid) = decode_ea_key(k_slice)?; - let v = decode_value_from_key(k_slice)?; - self.current.copy_from_slice(k_slice); - self.current.encoded_entity_amend_validity_to_inf_past(); - let op = StoreOp::try_from(v_slice[0])?; - Ok(Some((eid, aid, v, tid, op))) + let (eid, aid, _tid) = decode_ea_key(k_slice)?; + let mut v = decode_value_from_key(k_slice)?; + if v == DataValue::Guard { + v = decode_value_from_val(v_slice)?; + } + Ok(Some((eid, aid, v))) } }; } } impl Iterator for TripleEntityAttrIter { - type Item = Result<(EntityId, AttrId, DataValue, Validity, StoreOp)>; + type Item = Result<(EntityId, AttrId, DataValue)>; fn next(&mut self) -> Option { swap_option_result(self.next_inner()) @@ -569,32 +613,38 @@ impl Iterator for TripleEntityAttrBeforeIter { struct TripleAttrEntityIter { it: DbIter, - current: EncodedVec, + started: bool, } impl TripleAttrEntityIter { - fn next_inner(&mut self) -> Result> { - loop { - self.it.seek(&self.current); - match self.it.pair()? { - None => return Ok(None), - Some((k_slice, v_slice)) => { - let (aid, eid, tid) = decode_ae_key(k_slice)?; - let v = decode_value_from_key(k_slice)?; - self.current.copy_from_slice(k_slice); - self.current.encoded_entity_amend_validity_to_inf_past(); - let op = StoreOp::try_from(v_slice[0])?; - if op.is_assert() { - return Ok(Some((aid, eid, v, tid, op))); - } - } + fn new( + builder: IterBuilder, + lower_bound: EncodedVec, + upper_bound: EncodedVec, + ) -> Self { + let mut it = builder.upper_bound(&upper_bound).start(); + it.seek(&lower_bound); + Self { it, started: false } + } + fn next_inner(&mut self) -> Result> { + if self.started { + self.it.next() + } else { + self.started = true; + } + match self.it.key()? 
{ + None => Ok(None), + Some(k_slice) => { + let (aid, eid, _tid) = decode_ae_key(k_slice)?; + let v = decode_value_from_key(k_slice)?; + Ok(Some((aid, eid, v))) } } } } impl Iterator for TripleAttrEntityIter { - type Item = Result<(AttrId, EntityId, DataValue, Validity, StoreOp)>; + type Item = Result<(AttrId, EntityId, DataValue)>; fn next(&mut self) -> Option { swap_option_result(self.next_inner()) @@ -662,28 +712,38 @@ impl Iterator for TripleAttrEntityBeforeIter { struct TripleAttrValueIter { it: DbIter, - current: EncodedVec, + started: bool, } impl TripleAttrValueIter { - fn next_inner(&mut self) -> Result> { - self.it.seek(&self.current); - return match self.it.pair()? { + fn new( + builder: IterBuilder, + lower_bound: EncodedVec, + upper_bound: EncodedVec, + ) -> Self { + let mut it = builder.upper_bound(&upper_bound).start(); + it.seek(&lower_bound); + Self { it, started: false } + } + fn next_inner(&mut self) -> Result> { + if self.started { + self.it.next(); + } else { + self.started = true; + } + return match self.it.key()? { None => Ok(None), - Some((k_slice, v_slice)) => { - let (aid, eid, tid) = decode_ae_key(k_slice)?; + Some(k_slice) => { + let (aid, eid, _tid) = decode_ae_key(k_slice)?; let v = decode_value_from_key(k_slice)?; - self.current.copy_from_slice(k_slice); - self.current.encoded_entity_amend_validity_to_inf_past(); - let op = StoreOp::try_from(v_slice[0])?; - Ok(Some((aid, v, eid, tid, op))) + Ok(Some((aid, v, eid))) } }; } } impl Iterator for TripleAttrValueIter { - type Item = Result<(AttrId, DataValue, EntityId, Validity, StoreOp)>; + type Item = Result<(AttrId, DataValue, EntityId)>; fn next(&mut self) -> Option { swap_option_result(self.next_inner()) @@ -810,27 +870,37 @@ impl Iterator for TripleAttrValueAfterIter { struct TripleValueRefAttrIter { it: DbIter, - current: EncodedVec, + started: bool, } impl TripleValueRefAttrIter { - fn next_inner(&mut self) -> Result> { - self.it.seek(&self.current); - return match self.it.pair()? 
{ + fn new( + builder: IterBuilder, + lower_bound: EncodedVec, + upper_bound: EncodedVec, + ) -> Self { + let mut it = builder.upper_bound(&upper_bound).start(); + it.seek(&lower_bound); + Self { it, started: false } + } + fn next_inner(&mut self) -> Result> { + if self.started { + self.it.next(); + } else { + self.started = true; + } + return match self.it.key()? { None => Ok(None), - Some((k_slice, v_slice)) => { - let (v_eid, aid, eid, tid) = decode_vae_key(k_slice)?; - self.current.copy_from_slice(k_slice); - self.current.encoded_entity_amend_validity_to_inf_past(); - let op = StoreOp::try_from(v_slice[0])?; - Ok(Some((v_eid, aid, eid, tid, op))) + Some(k_slice) => { + let (v_eid, aid, eid, _) = decode_vae_key(k_slice)?; + Ok(Some((v_eid, aid, eid))) } }; } } impl Iterator for TripleValueRefAttrIter { - type Item = Result<(EntityId, AttrId, EntityId, Validity, StoreOp)>; + type Item = Result<(EntityId, AttrId, EntityId)>; fn next(&mut self) -> Option { swap_option_result(self.next_inner())