AVE forward and backward scanning

main
Ziyang Hu 2 years ago
parent f2e529bcd2
commit 28f5eeb13d

@ -24,7 +24,7 @@ macro_rules! return_if_resolved {
}
#[inline]
fn compare_key(a: &[u8], b: &[u8]) -> Ordering {
pub(crate) fn compare_key(a: &[u8], b: &[u8]) -> Ordering {
use StorageTag::*;
return_if_resolved!(a[0].cmp(&b[0]));

@ -1,5 +1,5 @@
use crate::data::attr::{Attribute, AttributeTyping};
use crate::data::compare::rusty_cmp;
use crate::data::compare::{compare_key, rusty_cmp};
use crate::data::encode::{
decode_ae_key, decode_ea_key, decode_vae_key, decode_value, decode_value_from_key,
decode_value_from_val, encode_aev_key, encode_ave_key, encode_ave_key_for_unique_v,
@ -193,44 +193,34 @@ impl SessionTx {
let ave_encoded = encode_ave_key(attr.id, v, e_in_key, vld_in_key);
// checking of unique constraints
if attr.indexing.is_unique_index() {
let current_ave_encoded = if attr.with_history {
ave_encoded.clone()
let (current_ave_encoded, vld_in_key) = if attr.with_history {
(ave_encoded.clone(), vld)
} else {
encode_ave_key(attr.id, v, e_in_key, Validity::NO_HISTORY)
(
encode_ave_key(attr.id, v, e_in_key, Validity::NO_HISTORY),
Validity::NO_HISTORY,
)
};
// back scan
if attr.with_history {
let ave_encoded_upper_bound =
encode_ave_key(attr.id, v, e_in_key, Validity::MIN);
if let Some((k_slice, v_slice)) = self
.bounded_scan_first(&current_ave_encoded, &ave_encoded_upper_bound)
.pair()?
{
let (_found_aid, found_eid, _found_vld) = decode_ae_key(k_slice)?;
let found_op = StoreOp::try_from(v_slice[0])?;
let found_v = decode_value_from_key(k_slice)?;
if found_eid != eid && found_op.is_assert() {
for item in self.triple_av_before_scan(attr.id, v, vld_in_key) {
let (_, _, found_eid) = item?;
if found_eid != eid {
return Err(TripleError::UniqueConstraintViolated(
attr.keyword.clone(),
format!("back scan found: {:?} vs {:?}", v, found_v),
format!("{:?}", v),
)
.into());
}
}
}
let ave_encoded_lower_bound = encode_ave_key(attr.id, v, e_in_key, Validity::MAX);
if let Some((k_slice, v_slice)) = self
.bounded_scan_last(&ave_encoded_lower_bound, &current_ave_encoded)
.pair()?
{
let (_found_aid, found_eid, _found_vld) = decode_ae_key(k_slice)?;
let found_op = StoreOp::try_from(v_slice[0])?;
let found_v = decode_value_from_key(k_slice)?;
if found_eid != eid && found_op.is_assert() {
for item in self.triple_av_after_scan(attr.id, v, vld_in_key) {
let (_, _, found_eid) = item?;
if found_eid != eid {
return Err(TripleError::UniqueConstraintViolated(
attr.keyword.clone(),
format!("forward scan found: {:?} vs {:?}", v, found_v),
format!("{:?}", v),
)
.into());
}
@ -475,6 +465,16 @@ impl SessionTx {
let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN);
TripleAttrValueBeforeIter::new(self.tx.iterator(), lower, upper, before)
}
pub(crate) fn triple_av_after_scan(
&mut self,
aid: AttrId,
v: &Value,
after: Validity,
) -> impl Iterator<Item = Result<(AttrId, StaticValue, EntityId)>> {
let lower = encode_ave_key(aid, v, EntityId::MIN_PERM, Validity::MAX);
let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN);
TripleAttrValueAfterIter::new(self.tx.iterator(), lower, upper, after)
}
pub(crate) fn triple_vref_a_scan(
&mut self,
v_eid: EntityId,
@ -883,6 +883,65 @@ impl Iterator for TripleAttrValueBeforeIter {
}
}
// after version
struct TripleAttrValueAfterIter {
it: DbIter,
lower_bound: EncodedVec<LARGE_VEC_SIZE>,
current: EncodedVec<LARGE_VEC_SIZE>,
after: Validity,
}
impl TripleAttrValueAfterIter {
fn new(
builder: IterBuilder,
lower_bound: EncodedVec<LARGE_VEC_SIZE>,
upper_bound: EncodedVec<LARGE_VEC_SIZE>,
after: Validity,
) -> Self {
let it = builder.lower_bound(&lower_bound).start();
Self {
it,
lower_bound,
current: upper_bound,
after,
}
}
fn next_inner(&mut self) -> Result<Option<(AttrId, StaticValue, EntityId)>> {
loop {
self.it.seek_back(&self.current);
match self.it.pair()? {
None => return Ok(None),
Some((k_slice, v_slice)) => {
if compare_key(k_slice, &self.lower_bound) == std::cmp::Ordering::Less {
return Ok(None);
}
let (aid, eid, tid) = decode_ae_key(k_slice)?;
if tid < self.after {
self.current.encoded_entity_amend_validity(self.after);
continue;
}
let v = decode_value_from_key(k_slice)?;
self.current.copy_from_slice(k_slice);
self.current.encoded_entity_amend_validity_to_last();
let op = StoreOp::try_from(v_slice[0])?;
if op.is_assert() {
return Ok(Some((aid, v.to_static(), eid)));
}
}
}
}
}
}
impl Iterator for TripleAttrValueAfterIter {
type Item = Result<(AttrId, StaticValue, EntityId)>;
fn next(&mut self) -> Option<Self::Item> {
swap_option_result(self.next_inner())
}
}
// normal version
struct TripleValueRefAttrIter {

Loading…
Cancel
Save