specialized iterator for no history

main
Ziyang Hu 2 years ago
parent 78e49e4a52
commit c624951f45

1
.gitignore vendored

@ -31,3 +31,4 @@ _test*
*.db *.db
cozorocks/deps/ cozorocks/deps/
.DS_Store .DS_Store
flamegraph.svg

@ -39,5 +39,8 @@ cozorocks = { path = "cozorocks" }
#[profile.release] #[profile.release]
#lto = true #lto = true
[profile.release]
debug = true
[workspace] [workspace]
members = ["cozorocks", "cozohttp", "cozopy", "cozoplay/src-tauri"] members = ["cozorocks", "cozohttp", "cozopy", "cozoplay/src-tauri"]

@ -3,11 +3,12 @@ use std::fmt::{Debug, Formatter};
use std::iter; use std::iter;
use anyhow::{anyhow, bail, Context, Result}; use anyhow::{anyhow, bail, Context, Result};
use either::{Left, Right};
use itertools::Itertools; use itertools::Itertools;
use crate::data::attr::Attribute; use crate::data::attr::Attribute;
use crate::data::expr::Expr; use crate::data::expr::Expr;
use crate::data::id::Validity; use crate::data::id::{AttrId, EntityId, Validity};
use crate::data::symb::Symbol; use crate::data::symb::Symbol;
use crate::data::tuple::{Tuple, TupleIter}; use crate::data::tuple::{Tuple, TupleIter};
use crate::data::value::DataValue; use crate::data::value::DataValue;
@ -623,10 +624,17 @@ impl TripleRelation {
} }
fn iter<'a>(&'a self, tx: &'a SessionTx) -> TupleIter<'a> { fn iter<'a>(&'a self, tx: &'a SessionTx) -> TupleIter<'a> {
if self.attr.with_history {
let it = tx let it = tx
.triple_a_before_scan(self.attr.id, self.vld) .triple_a_before_scan(self.attr.id, self.vld)
.map_ok(|(_, e_id, y)| Tuple(vec![e_id.to_value(), y])); .map_ok(|(_, e_id, y)| Tuple(vec![e_id.to_value(), y]));
self.return_filtered_iter(it, Default::default()) self.return_filtered_iter(it, Default::default())
} else {
let it = tx
.triple_a_scan(self.attr.id)
.map_ok(|(_, e_id, y)| Tuple(vec![e_id.to_value(), y]));
self.return_filtered_iter(it, Default::default())
}
} }
pub(crate) fn neg_join<'a>( pub(crate) fn neg_join<'a>(
@ -713,13 +721,26 @@ impl TripleRelation {
// [f, f] not really a join // [f, f] not really a join
let it = left_iter let it = left_iter
.map_ok(|tuple| { .map_ok(|tuple| {
tx.triple_a_before_scan(self.attr.id, self.vld) if self.attr.with_history {
Left(tx.triple_a_before_scan(self.attr.id, self.vld).map_ok(
move |(_, e_id, val)| {
let mut ret = tuple.0.clone();
ret.push(e_id.to_value());
ret.push(val);
Tuple(ret)
},
))
} else {
Right(
tx.triple_a_scan(self.attr.id)
.map_ok(move |(_, e_id, val)| { .map_ok(move |(_, e_id, val)| {
let mut ret = tuple.0.clone(); let mut ret = tuple.0.clone();
ret.push(e_id.to_value()); ret.push(e_id.to_value());
ret.push(val); ret.push(val);
Tuple(ret) Tuple(ret)
}) }),
)
}
}) })
.flatten_ok() .flatten_ok()
.map(flatten_err); .map(flatten_err);
@ -818,7 +839,12 @@ impl TripleRelation {
.unwrap() .unwrap()
.get_entity_id() .get_entity_id()
.with_context(|| format!("{:?}, {:?}", self, tuple))?; .with_context(|| format!("{:?}, {:?}", self, tuple))?;
match tx.triple_ea_before_scan(eid, self.attr.id, self.vld).next() { let nxt = if self.attr.with_history {
tx.triple_ea_before_scan(eid, self.attr.id, self.vld).next()
} else {
tx.triple_ea_scan(eid, self.attr.id).next()
};
match nxt {
None => Ok(if !eliminate_indices.is_empty() { None => Ok(if !eliminate_indices.is_empty() {
Some(Tuple( Some(Tuple(
tuple tuple
@ -862,13 +888,20 @@ impl TripleRelation {
.get_entity_id() .get_entity_id()
.with_context(|| format!("{:?}, {:?}, {}", self, tuple, left_e_idx)) .with_context(|| format!("{:?}, {:?}, {}", self, tuple, left_e_idx))
.map(move |eid| { .map(move |eid| {
tx.triple_ea_before_scan(eid, self.attr.id, self.vld) let clj = move |(eid, _, val): (EntityId, AttrId, DataValue)| {
.map_ok(move |(eid, _, val)| {
let mut ret = tuple.0.clone(); let mut ret = tuple.0.clone();
ret.push(eid.to_value()); ret.push(eid.to_value());
ret.push(val); ret.push(val);
Tuple(ret) Tuple(ret)
}) };
if self.attr.with_history {
Left(tx.triple_ea_scan(eid, self.attr.id).map_ok(clj))
} else {
Right(
tx.triple_ea_before_scan(eid, self.attr.id, self.vld)
.map_ok(clj),
)
}
}) })
}) })
.map(flatten_err) .map(flatten_err)
@ -892,10 +925,13 @@ impl TripleRelation {
.unwrap() .unwrap()
.get_entity_id() .get_entity_id()
.with_context(|| format!("{:?}", self))?; .with_context(|| format!("{:?}", self))?;
match tx let nxt = if self.attr.with_history {
.triple_vref_a_before_scan(v_eid, self.attr.id, self.vld) tx.triple_vref_a_before_scan(v_eid, self.attr.id, self.vld)
.next() .next()
{ } else {
tx.triple_vref_a_scan(v_eid, self.attr.id).next()
};
match nxt {
None => Ok(if !eliminate_indices.is_empty() { None => Ok(if !eliminate_indices.is_empty() {
Some(Tuple( Some(Tuple(
tuple tuple
@ -964,7 +1000,12 @@ impl TripleRelation {
left_iter left_iter
.map_ok(move |tuple| -> Result<Option<Tuple>> { .map_ok(move |tuple| -> Result<Option<Tuple>> {
let val = tuple.0.get(left_v_idx).unwrap(); let val = tuple.0.get(left_v_idx).unwrap();
match tx.triple_av_before_scan(self.attr.id, val, self.vld).next() { let nxt = if self.attr.with_history {
tx.triple_av_before_scan(self.attr.id, val, self.vld).next()
} else {
tx.triple_av_scan(self.attr.id, val).next()
};
match nxt {
None => Ok(if !eliminate_indices.is_empty() { None => Ok(if !eliminate_indices.is_empty() {
Some(Tuple( Some(Tuple(
tuple tuple
@ -1002,13 +1043,26 @@ impl TripleRelation {
let it = left_iter let it = left_iter
.map_ok(move |tuple| { .map_ok(move |tuple| {
let val = tuple.0.get(left_v_idx).unwrap(); let val = tuple.0.get(left_v_idx).unwrap();
if self.attr.with_history {
Left(
tx.triple_av_before_scan(self.attr.id, val, self.vld) tx.triple_av_before_scan(self.attr.id, val, self.vld)
.map_ok(move |(_, val, eid)| { .map_ok(move |(_, val, eid): (AttrId, DataValue, EntityId)| {
let mut ret = tuple.0.clone(); let mut ret = tuple.0.clone();
ret.push(eid.to_value()); ret.push(eid.to_value());
ret.push(val); ret.push(val);
Tuple(ret) Tuple(ret)
}) }),
)
} else {
Right(tx.triple_av_scan(self.attr.id, val).map_ok(
move |(_, val, eid): (AttrId, DataValue, EntityId)| {
let mut ret = tuple.0.clone();
ret.push(eid.to_value());
ret.push(val);
Tuple(ret)
},
))
}
}) })
.flatten_ok() .flatten_ok()
.map(flatten_err); .map(flatten_err);
@ -1025,7 +1079,12 @@ impl TripleRelation {
left_iter left_iter
.map_ok(move |tuple| -> Result<Option<Tuple>> { .map_ok(move |tuple| -> Result<Option<Tuple>> {
let val = tuple.0.get(left_v_idx).unwrap(); let val = tuple.0.get(left_v_idx).unwrap();
for item in tx.triple_a_before_scan(self.attr.id, self.vld) { let it = if self.attr.with_history {
Left(tx.triple_a_before_scan(self.attr.id, self.vld))
} else {
Right(tx.triple_a_scan(self.attr.id))
};
for item in it {
let (_, _, found_val) = item?; let (_, _, found_val) = item?;
if *val == found_val { if *val == found_val {
return Ok(None); return Ok(None);
@ -1063,7 +1122,12 @@ impl TripleRelation {
) -> TupleIter<'a> { ) -> TupleIter<'a> {
// [f, b] where b is not indexed // [f, b] where b is not indexed
let throwaway = tx.new_temp_store(); let throwaway = tx.new_temp_store();
for item in tx.triple_a_before_scan(self.attr.id, self.vld) { let it = if self.attr.with_history {
Left(tx.triple_a_before_scan(self.attr.id, self.vld))
} else {
Right(tx.triple_a_scan(self.attr.id))
};
for item in it {
match item { match item {
Err(e) => return Box::new([Err(e)].into_iter()), Err(e) => return Box::new([Err(e)].into_iter()),
Ok((_, eid, val)) => { Ok((_, eid, val)) => {

@ -145,11 +145,7 @@ impl SessionTx {
let (v_in_key, v_in_val) = if attr.cardinality.is_one() { let (v_in_key, v_in_val) = if attr.cardinality.is_one() {
( (
&DataValue::Guard, &DataValue::Guard,
if op.is_assert() { if op.is_assert() { v } else { &DataValue::Guard },
v
} else {
&DataValue::Guard
},
) )
} else { } else {
(v, &DataValue::Guard) (v, &DataValue::Guard)
@ -410,6 +406,15 @@ impl SessionTx {
}, },
) )
} }
pub(crate) fn triple_ea_scan(
&self,
eid: EntityId,
aid: AttrId,
) -> impl Iterator<Item = Result<(EntityId, AttrId, DataValue)>> {
let lower = encode_eav_key(eid, aid, &DataValue::Null, Validity::MAX);
let upper = encode_eav_key(eid, aid, &DataValue::Bottom, Validity::MIN);
TripleEntityAttrIter::new(self.tx.iterator(), lower, upper)
}
pub(crate) fn triple_ea_before_scan( pub(crate) fn triple_ea_before_scan(
&self, &self,
eid: EntityId, eid: EntityId,
@ -435,6 +440,15 @@ impl SessionTx {
} }
Ok(false) Ok(false)
} }
pub(crate) fn triple_av_scan(
&self,
aid: AttrId,
v: &DataValue,
) -> impl Iterator<Item = Result<(AttrId, DataValue, EntityId)>> {
let lower = encode_ave_key(aid, v, EntityId::ZERO, Validity::MAX);
let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN);
TripleAttrValueIter::new(self.tx.iterator(), lower, upper)
}
pub(crate) fn triple_av_before_scan( pub(crate) fn triple_av_before_scan(
&self, &self,
aid: AttrId, aid: AttrId,
@ -455,6 +469,15 @@ impl SessionTx {
let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN); let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN);
TripleAttrValueAfterIter::new(self.tx.iterator(), lower, upper, after) TripleAttrValueAfterIter::new(self.tx.iterator(), lower, upper, after)
} }
pub(crate) fn triple_vref_a_scan(
&self,
v_eid: EntityId,
aid: AttrId,
) -> impl Iterator<Item = Result<(EntityId, AttrId, EntityId)>> {
let lower = encode_vae_key(v_eid, aid, EntityId::ZERO, Validity::MAX);
let upper = encode_vae_key(v_eid, aid, EntityId::MAX_PERM, Validity::MIN);
TripleValueRefAttrIter::new(self.tx.iterator(), lower, upper)
}
pub(crate) fn triple_vref_a_before_scan( pub(crate) fn triple_vref_a_before_scan(
&self, &self,
v_eid: EntityId, v_eid: EntityId,
@ -465,6 +488,14 @@ impl SessionTx {
let upper = encode_vae_key(v_eid, aid, EntityId::MAX_PERM, Validity::MIN); let upper = encode_vae_key(v_eid, aid, EntityId::MAX_PERM, Validity::MIN);
TripleValueRefAttrBeforeIter::new(self.tx.iterator(), lower, upper, before) TripleValueRefAttrBeforeIter::new(self.tx.iterator(), lower, upper, before)
} }
pub(crate) fn triple_a_scan(
&self,
aid: AttrId,
) -> impl Iterator<Item = Result<(AttrId, EntityId, DataValue)>> {
let lower = encode_aev_key(aid, EntityId::ZERO, &DataValue::Null, Validity::MAX);
let upper = encode_aev_key(aid, EntityId::MAX_PERM, &DataValue::Bottom, Validity::MIN);
TripleAttrEntityIter::new(self.tx.iterator(), lower, upper)
}
pub(crate) fn triple_a_before_scan( pub(crate) fn triple_a_before_scan(
&self, &self,
aid: AttrId, aid: AttrId,
@ -480,28 +511,41 @@ impl SessionTx {
struct TripleEntityAttrIter { struct TripleEntityAttrIter {
it: DbIter, it: DbIter,
current: EncodedVec<LARGE_VEC_SIZE>, started: bool,
} }
impl TripleEntityAttrIter { impl TripleEntityAttrIter {
fn next_inner(&mut self) -> Result<Option<(EntityId, AttrId, DataValue, Validity, StoreOp)>> { fn new(
self.it.seek(&self.current); builder: IterBuilder,
lower_bound: EncodedVec<LARGE_VEC_SIZE>,
upper_bound: EncodedVec<LARGE_VEC_SIZE>,
) -> Self {
let mut it = builder.upper_bound(&upper_bound).start();
it.seek(&lower_bound);
Self { it, started: false }
}
fn next_inner(&mut self) -> Result<Option<(EntityId, AttrId, DataValue)>> {
if !self.started {
self.started = true;
} else {
self.it.next();
}
return match self.it.pair()? { return match self.it.pair()? {
None => Ok(None), None => Ok(None),
Some((k_slice, v_slice)) => { Some((k_slice, v_slice)) => {
let (eid, aid, tid) = decode_ea_key(k_slice)?; let (eid, aid, _tid) = decode_ea_key(k_slice)?;
let v = decode_value_from_key(k_slice)?; let mut v = decode_value_from_key(k_slice)?;
self.current.copy_from_slice(k_slice); if v == DataValue::Guard {
self.current.encoded_entity_amend_validity_to_inf_past(); v = decode_value_from_val(v_slice)?;
let op = StoreOp::try_from(v_slice[0])?; }
Ok(Some((eid, aid, v, tid, op))) Ok(Some((eid, aid, v)))
} }
}; };
} }
} }
impl Iterator for TripleEntityAttrIter { impl Iterator for TripleEntityAttrIter {
type Item = Result<(EntityId, AttrId, DataValue, Validity, StoreOp)>; type Item = Result<(EntityId, AttrId, DataValue)>;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
swap_option_result(self.next_inner()) swap_option_result(self.next_inner())
@ -569,32 +613,38 @@ impl Iterator for TripleEntityAttrBeforeIter {
struct TripleAttrEntityIter { struct TripleAttrEntityIter {
it: DbIter, it: DbIter,
current: EncodedVec<LARGE_VEC_SIZE>, started: bool,
} }
impl TripleAttrEntityIter { impl TripleAttrEntityIter {
fn next_inner(&mut self) -> Result<Option<(AttrId, EntityId, DataValue, Validity, StoreOp)>> { fn new(
loop { builder: IterBuilder,
self.it.seek(&self.current); lower_bound: EncodedVec<LARGE_VEC_SIZE>,
match self.it.pair()? { upper_bound: EncodedVec<LARGE_VEC_SIZE>,
None => return Ok(None), ) -> Self {
Some((k_slice, v_slice)) => { let mut it = builder.upper_bound(&upper_bound).start();
let (aid, eid, tid) = decode_ae_key(k_slice)?; it.seek(&lower_bound);
let v = decode_value_from_key(k_slice)?; Self { it, started: false }
self.current.copy_from_slice(k_slice);
self.current.encoded_entity_amend_validity_to_inf_past();
let op = StoreOp::try_from(v_slice[0])?;
if op.is_assert() {
return Ok(Some((aid, eid, v, tid, op)));
} }
fn next_inner(&mut self) -> Result<Option<(AttrId, EntityId, DataValue)>> {
if self.started {
self.it.next()
} else {
self.started = true;
} }
match self.it.key()? {
None => Ok(None),
Some(k_slice) => {
let (aid, eid, _tid) = decode_ae_key(k_slice)?;
let v = decode_value_from_key(k_slice)?;
Ok(Some((aid, eid, v)))
} }
} }
} }
} }
impl Iterator for TripleAttrEntityIter { impl Iterator for TripleAttrEntityIter {
type Item = Result<(AttrId, EntityId, DataValue, Validity, StoreOp)>; type Item = Result<(AttrId, EntityId, DataValue)>;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
swap_option_result(self.next_inner()) swap_option_result(self.next_inner())
@ -662,28 +712,38 @@ impl Iterator for TripleAttrEntityBeforeIter {
struct TripleAttrValueIter { struct TripleAttrValueIter {
it: DbIter, it: DbIter,
current: EncodedVec<LARGE_VEC_SIZE>, started: bool,
} }
impl TripleAttrValueIter { impl TripleAttrValueIter {
fn next_inner(&mut self) -> Result<Option<(AttrId, DataValue, EntityId, Validity, StoreOp)>> { fn new(
self.it.seek(&self.current); builder: IterBuilder,
return match self.it.pair()? { lower_bound: EncodedVec<LARGE_VEC_SIZE>,
upper_bound: EncodedVec<LARGE_VEC_SIZE>,
) -> Self {
let mut it = builder.upper_bound(&upper_bound).start();
it.seek(&lower_bound);
Self { it, started: false }
}
fn next_inner(&mut self) -> Result<Option<(AttrId, DataValue, EntityId)>> {
if self.started {
self.it.next();
} else {
self.started = true;
}
return match self.it.key()? {
None => Ok(None), None => Ok(None),
Some((k_slice, v_slice)) => { Some(k_slice) => {
let (aid, eid, tid) = decode_ae_key(k_slice)?; let (aid, eid, _tid) = decode_ae_key(k_slice)?;
let v = decode_value_from_key(k_slice)?; let v = decode_value_from_key(k_slice)?;
self.current.copy_from_slice(k_slice); Ok(Some((aid, v, eid)))
self.current.encoded_entity_amend_validity_to_inf_past();
let op = StoreOp::try_from(v_slice[0])?;
Ok(Some((aid, v, eid, tid, op)))
} }
}; };
} }
} }
impl Iterator for TripleAttrValueIter { impl Iterator for TripleAttrValueIter {
type Item = Result<(AttrId, DataValue, EntityId, Validity, StoreOp)>; type Item = Result<(AttrId, DataValue, EntityId)>;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
swap_option_result(self.next_inner()) swap_option_result(self.next_inner())
@ -810,27 +870,37 @@ impl Iterator for TripleAttrValueAfterIter {
struct TripleValueRefAttrIter { struct TripleValueRefAttrIter {
it: DbIter, it: DbIter,
current: EncodedVec<LARGE_VEC_SIZE>, started: bool,
} }
impl TripleValueRefAttrIter { impl TripleValueRefAttrIter {
fn next_inner(&mut self) -> Result<Option<(EntityId, AttrId, EntityId, Validity, StoreOp)>> { fn new(
self.it.seek(&self.current); builder: IterBuilder,
return match self.it.pair()? { lower_bound: EncodedVec<LARGE_VEC_SIZE>,
upper_bound: EncodedVec<LARGE_VEC_SIZE>,
) -> Self {
let mut it = builder.upper_bound(&upper_bound).start();
it.seek(&lower_bound);
Self { it, started: false }
}
fn next_inner(&mut self) -> Result<Option<(EntityId, AttrId, EntityId)>> {
if self.started {
self.it.next();
} else {
self.started = true;
}
return match self.it.key()? {
None => Ok(None), None => Ok(None),
Some((k_slice, v_slice)) => { Some(k_slice) => {
let (v_eid, aid, eid, tid) = decode_vae_key(k_slice)?; let (v_eid, aid, eid, _) = decode_vae_key(k_slice)?;
self.current.copy_from_slice(k_slice); Ok(Some((v_eid, aid, eid)))
self.current.encoded_entity_amend_validity_to_inf_past();
let op = StoreOp::try_from(v_slice[0])?;
Ok(Some((v_eid, aid, eid, tid, op)))
} }
}; };
} }
} }
impl Iterator for TripleValueRefAttrIter { impl Iterator for TripleValueRefAttrIter {
type Item = Result<(EntityId, AttrId, EntityId, Validity, StoreOp)>; type Item = Result<(EntityId, AttrId, EntityId)>;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
swap_option_result(self.next_inner()) swap_option_result(self.next_inner())

Loading…
Cancel
Save