first run txid -> validity, make compiler happy

main
Ziyang Hu 2 years ago
parent 2bc4adc858
commit a193c9aa82

@ -1,5 +1,5 @@
use crate::data::encode::{decode_value, EncodedVec}; use crate::data::encode::{decode_value, EncodedVec};
use crate::data::id::EntityId; use crate::data::id::{EntityId, TxId};
use crate::data::keyword::Keyword; use crate::data::keyword::Keyword;
use crate::data::triple::StoreOp; use crate::data::triple::StoreOp;
use anyhow::Result; use anyhow::Result;
@ -78,9 +78,14 @@ pub(crate) type StaticValue = Value<'static>;
pub(crate) const INLINE_VAL_SIZE_LIMIT: usize = 60; pub(crate) const INLINE_VAL_SIZE_LIMIT: usize = 60;
impl<'a> Value<'a> { impl<'a> Value<'a> {
pub(crate) fn encode_with_op(&self, op: StoreOp) -> EncodedVec<INLINE_VAL_SIZE_LIMIT> { pub(crate) fn encode_with_op_and_tx(
&self,
op: StoreOp,
txid: TxId,
) -> EncodedVec<INLINE_VAL_SIZE_LIMIT> {
let mut ret = SmallVec::<[u8; INLINE_VAL_SIZE_LIMIT]>::new(); let mut ret = SmallVec::<[u8; INLINE_VAL_SIZE_LIMIT]>::new();
ret.push(op as u8); ret.extend(txid.bytes());
ret[0] = op as u8;
self.serialize(&mut Serializer::new(&mut ret)).unwrap(); self.serialize(&mut Serializer::new(&mut ret)).unwrap();
ret.into() ret.into()
} }

@ -1,7 +1,6 @@
use crate::data::compare::{rusty_cmp, DB_KEY_PREFIX_LEN}; use crate::data::compare::{rusty_cmp, DB_KEY_PREFIX_LEN};
use crate::data::encode::encode_tx;
use crate::data::id::TxId; use crate::data::id::TxId;
use crate::runtime::transact::{SessionTx, TxLog}; use crate::runtime::transact::SessionTx;
use anyhow::Result; use anyhow::Result;
use cozorocks::{DbBuilder, DbIter, RocksDb}; use cozorocks::{DbBuilder, DbIter, RocksDb};
use std::fmt::{Debug, Formatter}; use std::fmt::{Debug, Formatter};
@ -62,19 +61,18 @@ impl Db {
} }
fn load_last_ids(&self) -> Result<()> { fn load_last_ids(&self) -> Result<()> {
let mut tx = self.transact(None)?; let mut tx = self.transact()?;
self.last_tx_id.store(tx.r_tx_id.0, Ordering::Release); self.last_tx_id
.store(tx.load_last_tx_id()?.0, Ordering::Release);
self.last_attr_id self.last_attr_id
.store(tx.load_last_attr_id()?.0, Ordering::Release); .store(tx.load_last_attr_id()?.0, Ordering::Release);
self.last_ent_id self.last_ent_id
.store(tx.load_last_entity_id()?.0, Ordering::Release); .store(tx.load_last_entity_id()?.0, Ordering::Release);
Ok(()) Ok(())
} }
pub(crate) fn transact(&self, at: Option<TxId>) -> Result<SessionTx> { pub(crate) fn transact(&self) -> Result<SessionTx> {
let tx_id = at.unwrap_or(TxId::ZERO); let ret = SessionTx {
let mut ret = SessionTx {
tx: self.db.transact().set_snapshot(true).start(), tx: self.db.transact().set_snapshot(true).start(),
r_tx_id: tx_id,
w_tx_id: None, w_tx_id: None,
last_attr_id: self.last_attr_id.clone(), last_attr_id: self.last_attr_id.clone(),
last_ent_id: self.last_ent_id.clone(), last_ent_id: self.last_ent_id.clone(),
@ -85,10 +83,6 @@ impl Db {
eid_by_attr_val_cache: Default::default(), eid_by_attr_val_cache: Default::default(),
touched_eids: Default::default(), touched_eids: Default::default(),
}; };
if at.is_none() {
let tid = ret.load_last_tx_id()?;
ret.r_tx_id = tid;
}
Ok(ret) Ok(ret)
} }
pub(crate) fn transact_write(&self) -> Result<SessionTx> { pub(crate) fn transact_write(&self) -> Result<SessionTx> {
@ -97,7 +91,6 @@ impl Db {
let ret = SessionTx { let ret = SessionTx {
tx: self.db.transact().set_snapshot(true).start(), tx: self.db.transact().set_snapshot(true).start(),
r_tx_id: cur_tx_id,
w_tx_id: Some(cur_tx_id), w_tx_id: Some(cur_tx_id),
last_attr_id: self.last_attr_id.clone(), last_attr_id: self.last_attr_id.clone(),
last_ent_id: self.last_ent_id.clone(), last_ent_id: self.last_ent_id.clone(),
@ -115,44 +108,4 @@ impl Db {
it.seek_to_start(); it.seek_to_start();
it it
} }
pub(crate) fn find_tx_before_timestamp_millis(&self, ts: i64) -> Result<Option<TxLog>> {
// binary search
let lower_bound = encode_tx(TxId::MAX_SYS);
let upper_bound = encode_tx(TxId::MAX_USER);
// both are inclusive bounds
let mut lower_found = TxId::MIN_USER;
let mut upper_found = TxId::MAX_USER;
let mut it = self
.transact_write()?
.tx
.iterator()
.lower_bound(&lower_bound)
.upper_bound(&upper_bound)
.start();
loop {
let needle = TxId((lower_found.0 + upper_found.0) / 2);
let current = encode_tx(needle);
it.seek(&current);
match it.val()? {
Some(v_slice) => {
let log = TxLog::decode(v_slice)?;
let found_ts = log.timestamp;
if found_ts == ts || needle == upper_found || needle == lower_found {
return Ok(Some(log));
}
if found_ts < ts {
lower_found = log.id;
continue;
}
if found_ts > ts {
upper_found = TxId(log.id.0 - 1);
continue;
}
}
None => return Ok(None),
}
}
}
} }

@ -1,6 +1,6 @@
use crate::data::attr::Attribute; use crate::data::attr::Attribute;
use crate::data::encode::{encode_tx, encode_unique_attr_by_id, encode_unique_entity, EncodedVec}; use crate::data::encode::{encode_tx, encode_unique_attr_by_id, encode_unique_entity, EncodedVec};
use crate::data::id::{AttrId, EntityId, TxId}; use crate::data::id::{AttrId, EntityId, TxId, Validity};
use crate::data::keyword::Keyword; use crate::data::keyword::Keyword;
use crate::data::value::StaticValue; use crate::data::value::StaticValue;
use anyhow::Result; use anyhow::Result;
@ -12,11 +12,9 @@ use smallvec::SmallVec;
use std::collections::{BTreeMap, BTreeSet}; use std::collections::{BTreeMap, BTreeSet};
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
pub(crate) struct SessionTx { pub(crate) struct SessionTx {
pub(crate) tx: Tx, pub(crate) tx: Tx,
pub(crate) r_tx_id: TxId,
pub(crate) w_tx_id: Option<TxId>, pub(crate) w_tx_id: Option<TxId>,
pub(crate) last_attr_id: Arc<AtomicU64>, pub(crate) last_attr_id: Arc<AtomicU64>,
pub(crate) last_ent_id: Arc<AtomicU64>, pub(crate) last_ent_id: Arc<AtomicU64>,
@ -36,15 +34,12 @@ pub(crate) struct TxLog {
#[serde(rename = "c")] #[serde(rename = "c")]
pub(crate) comment: String, pub(crate) comment: String,
#[serde(rename = "t")] #[serde(rename = "t")]
pub(crate) timestamp: i64, pub(crate) timestamp: Validity,
} }
impl TxLog { impl TxLog {
pub(crate) fn new(id: TxId, comment: &str) -> Self { pub(crate) fn new(id: TxId, comment: &str) -> Self {
let timestamp = SystemTime::now() let timestamp = Validity::current();
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
Self { Self {
id, id,
comment: comment.to_string(), comment: comment.to_string(),
@ -111,7 +106,6 @@ impl SessionTx {
if refresh { if refresh {
let new_tx_id = TxId(self.last_tx_id.fetch_add(1, Ordering::AcqRel) + 1); let new_tx_id = TxId(self.last_tx_id.fetch_add(1, Ordering::AcqRel) + 1);
self.tx.set_snapshot(); self.tx.set_snapshot();
self.r_tx_id = new_tx_id;
self.w_tx_id = Some(new_tx_id); self.w_tx_id = Some(new_tx_id);
self.clear_cache(); self.clear_cache();
} }

@ -18,7 +18,7 @@ impl SessionTx {
return Ok(res.clone()); return Ok(res.clone());
} }
let anchor = encode_attr_by_id(aid, self.r_tx_id); let anchor = encode_attr_by_id(aid, TxId::MAX_USER);
let upper = encode_attr_by_id(aid, TxId::ZERO); let upper = encode_attr_by_id(aid, TxId::ZERO);
let it = self.bounded_scan_first(&anchor, &upper); let it = self.bounded_scan_first(&anchor, &upper);
Ok(match it.pair()? { Ok(match it.pair()? {
@ -49,7 +49,7 @@ impl SessionTx {
return Ok(res.clone()); return Ok(res.clone());
} }
let anchor = encode_attr_by_kw(kw, self.r_tx_id); let anchor = encode_attr_by_kw(kw, TxId::MAX_USER);
let upper = encode_attr_by_kw(kw, TxId::ZERO); let upper = encode_attr_by_kw(kw, TxId::ZERO);
let it = self.bounded_scan_first(&anchor, &upper); let it = self.bounded_scan_first(&anchor, &upper);
Ok(match it.pair()? { Ok(match it.pair()? {
@ -75,7 +75,7 @@ impl SessionTx {
} }
pub(crate) fn all_attrs(&mut self) -> impl Iterator<Item = Result<Attribute>> { pub(crate) fn all_attrs(&mut self) -> impl Iterator<Item = Result<Attribute>> {
AttrIter::new(self.tx.iterator(), self.r_tx_id) AttrIter::new(self.tx.iterator(), TxId::MAX_USER)
} }
/// conflict if new attribute has same name as existing one /// conflict if new attribute has same name as existing one

@ -4,7 +4,7 @@ use crate::data::encode::{
encode_aev_key, encode_ave_key, encode_ave_key_for_unique_v, encode_eav_key, encode_aev_key, encode_ave_key, encode_ave_key_for_unique_v, encode_eav_key,
encode_unique_attr_val, encode_unique_entity, encode_vae_key, EncodedVec, LARGE_VEC_SIZE, encode_unique_attr_val, encode_unique_entity, encode_vae_key, EncodedVec, LARGE_VEC_SIZE,
}; };
use crate::data::id::{AttrId, EntityId, TxId}; use crate::data::id::{AttrId, EntityId, Validity};
use crate::data::keyword::Keyword; use crate::data::keyword::Keyword;
use crate::data::triple::StoreOp; use crate::data::triple::StoreOp;
use crate::data::value::{StaticValue, Value, INLINE_VAL_SIZE_LIMIT}; use crate::data::value::{StaticValue, Value, INLINE_VAL_SIZE_LIMIT};
@ -23,7 +23,7 @@ enum TripleError {
#[error("unique constraint violated: {0} {1}")] #[error("unique constraint violated: {0} {1}")]
UniqueConstraintViolated(Keyword, String), UniqueConstraintViolated(Keyword, String),
#[error("triple not found for {0:?} {1:?} {2:?}")] #[error("triple not found for {0:?} {1:?} {2:?}")]
TripleEANotFound(EntityId, AttrId, TxId), TripleEANotFound(EntityId, AttrId, Validity),
} }
impl SessionTx { impl SessionTx {
@ -32,13 +32,14 @@ impl SessionTx {
eid: EntityId, eid: EntityId,
attr: &Attribute, attr: &Attribute,
v: &Value, v: &Value,
vld: Validity,
op: StoreOp, op: StoreOp,
) -> Result<()> { ) -> Result<()> {
let tx_id = self.get_write_tx_id()?; let tx_id = self.get_write_tx_id()?;
let tx_id_in_key = if attr.with_history { let vld_in_key = if attr.with_history {
tx_id vld
} else { } else {
TxId::NO_HISTORY Validity::MIN
}; };
// elide value in key for eav and aev if cardinality is one // elide value in key for eav and aev if cardinality is one
let (v_in_key, v_in_val) = if attr.cardinality.is_one() { let (v_in_key, v_in_val) = if attr.cardinality.is_one() {
@ -49,8 +50,8 @@ impl SessionTx {
} else { } else {
(v, &Value::Bottom) (v, &Value::Bottom)
}; };
let eav_encoded = encode_eav_key(eid, attr.id, v_in_key, tx_id_in_key); let eav_encoded = encode_eav_key(eid, attr.id, v_in_key, vld_in_key);
let val_encoded = v_in_val.encode_with_op(op); let val_encoded = v_in_val.encode_with_op_and_tx(op, tx_id);
self.tx.put(&eav_encoded, &val_encoded)?; self.tx.put(&eav_encoded, &val_encoded)?;
// elide value in data for aev if it is big // elide value in data for aev if it is big
@ -60,12 +61,12 @@ impl SessionTx {
val_encoded val_encoded
}; };
let aev_encoded = encode_aev_key(attr.id, eid, v_in_key, tx_id_in_key); let aev_encoded = encode_aev_key(attr.id, eid, v_in_key, vld_in_key);
self.tx.put(&aev_encoded, &val_encoded)?; self.tx.put(&aev_encoded, &val_encoded)?;
// vae for ref types // vae for ref types
if attr.val_type.is_ref_type() { if attr.val_type.is_ref_type() {
let vae_encoded = encode_vae_key(v.get_entity_id()?, attr.id, eid, tx_id_in_key); let vae_encoded = encode_vae_key(v.get_entity_id()?, attr.id, eid, vld_in_key);
self.tx.put(&vae_encoded, &[op as u8])?; self.tx.put(&vae_encoded, &[op as u8])?;
} }
@ -77,16 +78,16 @@ impl SessionTx {
} else { } else {
eid eid
}; };
let ave_encoded = encode_ave_key(attr.id, v, e_in_key, tx_id_in_key); let ave_encoded = encode_ave_key(attr.id, v, e_in_key, vld_in_key);
// checking of unique constraints // checking of unique constraints
// TODO include future-pointing checking // TODO include future-pointing checking
if attr.indexing.is_unique_index() { if attr.indexing.is_unique_index() {
let starting = if attr.with_history { let starting = if attr.with_history {
ave_encoded.clone() ave_encoded.clone()
} else { } else {
encode_ave_key(attr.id, v, e_in_key, tx_id) encode_ave_key(attr.id, v, e_in_key, vld)
}; };
let ave_encoded_bound = encode_ave_key(attr.id, v, e_in_key, TxId::ZERO); let ave_encoded_bound = encode_ave_key(attr.id, v, e_in_key, Validity::MIN);
if let Some((k_slice, v_slice)) = self if let Some((k_slice, v_slice)) = self
.bounded_scan_first(&starting, &ave_encoded_bound) .bounded_scan_first(&starting, &ave_encoded_bound)
.pair()? .pair()?
@ -120,7 +121,13 @@ impl SessionTx {
Ok(()) Ok(())
} }
pub(crate) fn new_triple(&mut self, eid: EntityId, attr: &Attribute, v: &Value) -> Result<()> { pub(crate) fn new_triple(
&mut self,
eid: EntityId,
attr: &Attribute,
v: &Value,
vld: Validity,
) -> Result<()> {
// invariant: in the preparation step, any identity attr should already be resolved to // invariant: in the preparation step, any identity attr should already be resolved to
// an existing eid, if there is one // an existing eid, if there is one
let eid = if eid.is_perm() { let eid = if eid.is_perm() {
@ -135,7 +142,7 @@ impl SessionTx {
} }
} }
}; };
self.put_triple(eid, attr, v, StoreOp::Assert) self.put_triple(eid, attr, v, vld, StoreOp::Assert)
} }
pub(crate) fn amend_triple( pub(crate) fn amend_triple(
@ -143,12 +150,13 @@ impl SessionTx {
eid: EntityId, eid: EntityId,
attr: &Attribute, attr: &Attribute,
v: &Value, v: &Value,
vld: Validity,
) -> Result<()> { ) -> Result<()> {
if !eid.is_perm() { if !eid.is_perm() {
return Err(TripleError::TempEid(eid).into()); return Err(TripleError::TempEid(eid).into());
} }
// checking that the eid actually exists should be done in the preprocessing step // checking that the eid actually exists should be done in the preprocessing step
self.put_triple(eid, attr, v, StoreOp::Retract) self.put_triple(eid, attr, v, vld, StoreOp::Retract)
} }
pub(crate) fn retract_triple( pub(crate) fn retract_triple(
@ -156,11 +164,12 @@ impl SessionTx {
eid: EntityId, eid: EntityId,
attr: &Attribute, attr: &Attribute,
v: &Value, v: &Value,
vld: Validity,
) -> Result<()> { ) -> Result<()> {
self.put_triple(eid, attr, v, StoreOp::Retract)?; self.put_triple(eid, attr, v, vld, StoreOp::Retract)?;
if attr.val_type == AttributeTyping::Component { if attr.val_type == AttributeTyping::Component {
let eid_v = v.get_entity_id()?; let eid_v = v.get_entity_id()?;
self.retract_entity(eid_v)?; self.retract_entity(eid_v, vld)?;
} }
Ok(()) Ok(())
} }
@ -168,26 +177,30 @@ impl SessionTx {
&mut self, &mut self,
eid: EntityId, eid: EntityId,
attr: &Attribute, attr: &Attribute,
vld: Validity,
) -> Result<()> { ) -> Result<()> {
let lower_bound = encode_eav_key(eid, attr.id, &Value::Null, TxId::MAX_USER); // TODO properly add retractions at the correct time
let upper_bound = encode_eav_key(eid, attr.id, &Value::Bottom, TxId::ZERO); let lower_bound = encode_eav_key(eid, attr.id, &Value::Null, Validity::MAX);
self.batch_retract_triple(lower_bound, upper_bound) let upper_bound = encode_eav_key(eid, attr.id, &Value::Bottom, Validity::MIN);
self.batch_retract_triple(lower_bound, upper_bound, vld)
} }
pub(crate) fn retract_entity(&mut self, eid: EntityId) -> Result<()> { pub(crate) fn retract_entity(&mut self, eid: EntityId, vld: Validity) -> Result<()> {
match self.latest_entity_existence(eid, true)? { match self.latest_entity_existence(eid, true)? {
LatestTripleExistence::Asserted => {} LatestTripleExistence::Asserted => {}
LatestTripleExistence::Retracted => return Ok(()), LatestTripleExistence::Retracted => return Ok(()),
LatestTripleExistence::NotFound => return Err(TripleError::EidNotFound(eid).into()), LatestTripleExistence::NotFound => return Err(TripleError::EidNotFound(eid).into()),
} }
let lower_bound = encode_eav_key(eid, AttrId::MIN_PERM, &Value::Null, TxId::MAX_USER); let lower_bound = encode_eav_key(eid, AttrId::MIN_PERM, &Value::Null, Validity::MAX);
let upper_bound = encode_eav_key(eid, AttrId::MAX_PERM, &Value::Bottom, TxId::ZERO); let upper_bound = encode_eav_key(eid, AttrId::MAX_PERM, &Value::Bottom, Validity::MAX);
self.batch_retract_triple(lower_bound, upper_bound) self.batch_retract_triple(lower_bound, upper_bound, vld)
} }
fn batch_retract_triple( fn batch_retract_triple(
&mut self, &mut self,
lower_bound: EncodedVec<LARGE_VEC_SIZE>, lower_bound: EncodedVec<LARGE_VEC_SIZE>,
upper_bound: EncodedVec<LARGE_VEC_SIZE>, upper_bound: EncodedVec<LARGE_VEC_SIZE>,
vld: Validity,
) -> Result<()> { ) -> Result<()> {
// TODO properly retract attributes at the correct time only
let mut it = self.bounded_scan(&lower_bound, &upper_bound); let mut it = self.bounded_scan(&lower_bound, &upper_bound);
let mut current = lower_bound.clone(); let mut current = lower_bound.clone();
loop { loop {
@ -202,9 +215,9 @@ impl SessionTx {
let cur_attr = self let cur_attr = self
.attr_by_id(cur_aid)? .attr_by_id(cur_aid)?
.ok_or(TransactError::AttrNotFound(cur_aid))?; .ok_or(TransactError::AttrNotFound(cur_aid))?;
self.retract_triple(cur_eid, &cur_attr, &cur_v)?; self.retract_triple(cur_eid, &cur_attr, &cur_v, vld)?;
} }
current = encode_eav_key(cur_eid, cur_aid, &cur_v, TxId::ZERO); current = encode_eav_key(cur_eid, cur_aid, &cur_v, Validity::MIN);
} }
} }
} }
@ -229,6 +242,7 @@ impl SessionTx {
&mut self, &mut self,
attr: &Attribute, attr: &Attribute,
v: &Value, v: &Value,
vld: Validity,
) -> Result<Option<EntityId>> { ) -> Result<Option<EntityId>> {
if let Some(inner) = self.eid_by_attr_val_cache.get(v) { if let Some(inner) = self.eid_by_attr_val_cache.get(v) {
if let Some(found) = inner.get(&attr.id) { if let Some(found) = inner.get(&attr.id) {
@ -236,8 +250,8 @@ impl SessionTx {
} }
} }
let lower = encode_ave_key_for_unique_v(attr.id, v, self.r_tx_id); let lower = encode_ave_key_for_unique_v(attr.id, v, vld);
let upper = encode_ave_key_for_unique_v(attr.id, v, TxId::ZERO); let upper = encode_ave_key_for_unique_v(attr.id, v, Validity::MIN);
Ok( Ok(
if let Some((k_slice, v_slice)) = self.bounded_scan_first(&lower, &upper).pair()? { if let Some((k_slice, v_slice)) = self.bounded_scan_first(&lower, &upper).pair()? {
if StoreOp::try_from(v_slice[0])?.is_assert() { if StoreOp::try_from(v_slice[0])?.is_assert() {
@ -264,83 +278,96 @@ impl SessionTx {
&mut self, &mut self,
eid: EntityId, eid: EntityId,
aid: AttrId, aid: AttrId,
txid: TxId, vld: Validity,
) -> Result<StaticValue> { ) -> Result<StaticValue> {
let encoded = encode_eav_key(eid, aid, &Value::Bottom, txid); let encoded = encode_eav_key(eid, aid, &Value::Bottom, vld);
let res = self let res = self
.tx .tx
.get(&encoded, false)? .get(&encoded, false)?
.ok_or(TripleError::TripleEANotFound(eid, aid, txid))?; .ok_or(TripleError::TripleEANotFound(eid, aid, vld))?;
Ok(decode_value(&res.as_ref()[1..])?.to_static()) Ok(decode_value(&res.as_ref()[1..])?.to_static())
} }
pub(crate) fn triple_ea_scan( pub(crate) fn triple_ea_scan(
&mut self, &mut self,
eid: EntityId, eid: EntityId,
aid: AttrId, aid: AttrId,
) -> impl Iterator<Item = Result<(EntityId, AttrId, StaticValue, TxId)>> { ) -> impl Iterator<Item = Result<(EntityId, AttrId, StaticValue, Validity)>> {
let lower = encode_eav_key(eid, aid, &Value::Null, TxId::MAX_USER); let lower = encode_eav_key(eid, aid, &Value::Null, Validity::MAX);
let upper = encode_eav_key(eid, aid, &Value::Bottom, TxId::ZERO); let upper = encode_eav_key(eid, aid, &Value::Bottom, Validity::MIN);
TripleEntityAttrIter::new(self.tx.iterator(), lower, upper) TripleEntityAttrIter::new(self.tx.iterator(), lower, upper)
} }
pub(crate) fn triple_ae_scan( pub(crate) fn triple_ae_scan(
&mut self, &mut self,
aid: AttrId, aid: AttrId,
eid: EntityId, eid: EntityId,
) -> impl Iterator<Item = Result<(AttrId, EntityId, StaticValue, TxId)>> { ) -> impl Iterator<Item = Result<(AttrId, EntityId, StaticValue, Validity)>> {
let lower = encode_aev_key(aid, eid, &Value::Null, TxId::MAX_USER); let lower = encode_aev_key(aid, eid, &Value::Null, Validity::MAX);
let upper = encode_aev_key(aid, eid, &Value::Bottom, TxId::ZERO); let upper = encode_aev_key(aid, eid, &Value::Bottom, Validity::MIN);
TripleAttrEntityIter::new(self.tx.iterator(), lower, upper) TripleAttrEntityIter::new(self.tx.iterator(), lower, upper)
} }
pub(crate) fn triple_av_scan( pub(crate) fn triple_av_scan(
&mut self, &mut self,
aid: AttrId, aid: AttrId,
v: &Value, v: &Value,
) -> impl Iterator<Item = Result<(AttrId, StaticValue, EntityId, TxId)>> { ) -> impl Iterator<Item = Result<(AttrId, StaticValue, EntityId, Validity)>> {
let lower = encode_ave_key(aid, v, EntityId::MIN_PERM, TxId::MAX_USER); let lower = encode_ave_key(aid, v, EntityId::MIN_PERM, Validity::MAX);
let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, TxId::ZERO); let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN);
TripleAttrValueIter::new(self.tx.iterator(), lower, upper) TripleAttrValueIter::new(self.tx.iterator(), lower, upper)
} }
pub(crate) fn triple_vref_a_scan( pub(crate) fn triple_vref_a_scan(
&mut self, &mut self,
v_eid: EntityId, v_eid: EntityId,
aid: AttrId, aid: AttrId,
) -> impl Iterator<Item = Result<(EntityId, AttrId, EntityId, TxId)>> { ) -> impl Iterator<Item = Result<(EntityId, AttrId, EntityId, Validity)>> {
let lower = encode_vae_key(v_eid, aid, EntityId::MIN_PERM, TxId::MAX_USER); let lower = encode_vae_key(v_eid, aid, EntityId::MIN_PERM, Validity::MAX);
let upper = encode_vae_key(v_eid, aid, EntityId::MAX_PERM, TxId::ZERO); let upper = encode_vae_key(v_eid, aid, EntityId::MAX_PERM, Validity::MIN);
TripleValueRefAttrIter::new(self.tx.iterator(), lower, upper) TripleValueRefAttrIter::new(self.tx.iterator(), lower, upper)
} }
pub(crate) fn triple_e_scan( pub(crate) fn triple_e_scan(
&mut self, &mut self,
eid: EntityId, eid: EntityId,
) -> impl Iterator<Item = Result<(EntityId, AttrId, StaticValue, TxId)>> { ) -> impl Iterator<Item = Result<(EntityId, AttrId, StaticValue, Validity)>> {
let lower = encode_eav_key(eid, AttrId::MIN_PERM, &Value::Null, TxId::MAX_USER); let lower = encode_eav_key(eid, AttrId::MIN_PERM, &Value::Null, Validity::MAX);
let upper = encode_eav_key(eid, AttrId::MAX_PERM, &Value::Bottom, TxId::ZERO); let upper = encode_eav_key(eid, AttrId::MAX_PERM, &Value::Bottom, Validity::MIN);
TripleEntityAttrIter::new(self.tx.iterator(), lower, upper) TripleEntityAttrIter::new(self.tx.iterator(), lower, upper)
} }
pub(crate) fn triple_a_scan( pub(crate) fn triple_a_scan(
&mut self, &mut self,
aid: AttrId, aid: AttrId,
) -> impl Iterator<Item = Result<(AttrId, EntityId, StaticValue, TxId)>> { ) -> impl Iterator<Item = Result<(AttrId, EntityId, StaticValue, Validity)>> {
let lower = encode_aev_key(aid, EntityId::MIN_PERM, &Value::Null, TxId::MAX_USER); let lower = encode_aev_key(aid, EntityId::MIN_PERM, &Value::Null, Validity::MAX);
let upper = encode_aev_key(aid, EntityId::MAX_PERM, &Value::Bottom, TxId::ZERO); let upper = encode_aev_key(aid, EntityId::MAX_PERM, &Value::Bottom, Validity::MIN);
TripleAttrEntityIter::new(self.tx.iterator(), lower, upper) TripleAttrEntityIter::new(self.tx.iterator(), lower, upper)
}pub(crate) fn triple_a_scan_all( }
pub(crate) fn triple_a_scan_all(
&mut self, &mut self,
) -> impl Iterator<Item = Result<(AttrId, EntityId, StaticValue, TxId)>> { ) -> impl Iterator<Item = Result<(AttrId, EntityId, StaticValue, Validity)>> {
let lower = encode_aev_key(AttrId::MIN_PERM, EntityId::MIN_PERM, &Value::Null, TxId::MAX_USER); let lower = encode_aev_key(
let upper = encode_aev_key(AttrId::MAX_PERM, EntityId::MAX_PERM, &Value::Bottom, TxId::ZERO); AttrId::MIN_PERM,
EntityId::MIN_PERM,
&Value::Null,
Validity::MAX,
);
let upper = encode_aev_key(
AttrId::MAX_PERM,
EntityId::MAX_PERM,
&Value::Bottom,
Validity::MIN,
);
TripleAttrEntityIter::new(self.tx.iterator(), lower, upper) TripleAttrEntityIter::new(self.tx.iterator(), lower, upper)
} }
pub(crate) fn triple_vref_scan( pub(crate) fn triple_vref_scan(
&mut self, &mut self,
v_eid: EntityId, v_eid: EntityId,
) -> impl Iterator<Item = Result<(EntityId, AttrId, EntityId, TxId)>> { ) -> impl Iterator<Item = Result<(EntityId, AttrId, EntityId, Validity)>> {
let lower = encode_vae_key(v_eid, AttrId::MIN_PERM, EntityId::MIN_PERM, TxId::MAX_USER); let lower = encode_vae_key(v_eid, AttrId::MIN_PERM, EntityId::MIN_PERM, Validity::MAX);
let upper = encode_vae_key(v_eid, AttrId::MAX_PERM, EntityId::MAX_PERM, TxId::ZERO); let upper = encode_vae_key(v_eid, AttrId::MAX_PERM, EntityId::MAX_PERM, Validity::MIN);
TripleValueRefAttrIter::new(self.tx.iterator(), lower, upper) TripleValueRefAttrIter::new(self.tx.iterator(), lower, upper)
} }
} }
// FIXME iterators should iterate on current validity
enum LatestTripleExistence { enum LatestTripleExistence {
Asserted, Asserted,
Retracted, Retracted,
@ -364,7 +391,7 @@ impl TripleEntityAttrIter {
current: lower_bound, current: lower_bound,
} }
} }
fn next_inner(&mut self) -> Result<Option<(EntityId, AttrId, StaticValue, TxId)>> { fn next_inner(&mut self) -> Result<Option<(EntityId, AttrId, StaticValue, Validity)>> {
loop { loop {
self.it.seek(&self.current); self.it.seek(&self.current);
match self.it.pair()? { match self.it.pair()? {
@ -385,7 +412,7 @@ impl TripleEntityAttrIter {
} }
impl Iterator for TripleEntityAttrIter { impl Iterator for TripleEntityAttrIter {
type Item = Result<(EntityId, AttrId, StaticValue, TxId)>; type Item = Result<(EntityId, AttrId, StaticValue, Validity)>;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
swap_option_result(self.next_inner()) swap_option_result(self.next_inner())
@ -409,7 +436,7 @@ impl TripleAttrEntityIter {
current: lower_bound, current: lower_bound,
} }
} }
fn next_inner(&mut self) -> Result<Option<(AttrId, EntityId, StaticValue, TxId)>> { fn next_inner(&mut self) -> Result<Option<(AttrId, EntityId, StaticValue, Validity)>> {
loop { loop {
self.it.seek(&self.current); self.it.seek(&self.current);
match self.it.pair()? { match self.it.pair()? {
@ -430,7 +457,7 @@ impl TripleAttrEntityIter {
} }
impl Iterator for TripleAttrEntityIter { impl Iterator for TripleAttrEntityIter {
type Item = Result<(AttrId, EntityId, StaticValue, TxId)>; type Item = Result<(AttrId, EntityId, StaticValue, Validity)>;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
swap_option_result(self.next_inner()) swap_option_result(self.next_inner())
@ -454,7 +481,7 @@ impl TripleAttrValueIter {
current: lower_bound, current: lower_bound,
} }
} }
fn next_inner(&mut self) -> Result<Option<(AttrId, StaticValue, EntityId, TxId)>> { fn next_inner(&mut self) -> Result<Option<(AttrId, StaticValue, EntityId, Validity)>> {
loop { loop {
self.it.seek(&self.current); self.it.seek(&self.current);
match self.it.pair()? { match self.it.pair()? {
@ -475,7 +502,7 @@ impl TripleAttrValueIter {
} }
impl Iterator for TripleAttrValueIter { impl Iterator for TripleAttrValueIter {
type Item = Result<(AttrId, StaticValue, EntityId, TxId)>; type Item = Result<(AttrId, StaticValue, EntityId, Validity)>;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
swap_option_result(self.next_inner()) swap_option_result(self.next_inner())
@ -499,7 +526,7 @@ impl TripleValueRefAttrIter {
current: lower_bound, current: lower_bound,
} }
} }
fn next_inner(&mut self) -> Result<Option<(EntityId, AttrId, EntityId, TxId)>> { fn next_inner(&mut self) -> Result<Option<(EntityId, AttrId, EntityId, Validity)>> {
loop { loop {
self.it.seek(&self.current); self.it.seek(&self.current);
match self.it.pair()? { match self.it.pair()? {
@ -519,7 +546,7 @@ impl TripleValueRefAttrIter {
} }
impl Iterator for TripleValueRefAttrIter { impl Iterator for TripleValueRefAttrIter {
type Item = Result<(EntityId, AttrId, EntityId, TxId)>; type Item = Result<(EntityId, AttrId, EntityId, Validity)>;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
swap_option_result(self.next_inner()) swap_option_result(self.next_inner())

Loading…
Cancel
Save