change mutability for scan operations

main
Ziyang Hu 2 years ago
parent 79808a4513
commit a857cb11cc

@ -57,7 +57,7 @@ struct TxBridge {
r_opts->fill_cache = val; r_opts->fill_cache = val;
} }
inline unique_ptr<IterBridge> iterator() { inline unique_ptr<IterBridge> iterator() const {
return make_unique<IterBridge>(&*tx); return make_unique<IterBridge>(&*tx);
}; };
@ -87,7 +87,7 @@ struct TxBridge {
void start(); void start();
inline unique_ptr<PinnableSlice> get(RustBytes key, bool for_update, RocksDbStatus &status) { inline unique_ptr<PinnableSlice> get(RustBytes key, bool for_update, RocksDbStatus &status) const {
Slice key_ = convert_slice(key); Slice key_ = convert_slice(key);
auto ret = make_unique<PinnableSlice>(); auto ret = make_unique<PinnableSlice>();
if (for_update) { if (for_update) {
@ -100,7 +100,7 @@ struct TxBridge {
return ret; return ret;
} }
inline void exists(RustBytes key, bool for_update, RocksDbStatus &status) { inline void exists(RustBytes key, bool for_update, RocksDbStatus &status) const {
Slice key_ = convert_slice(key); Slice key_ = convert_slice(key);
auto ret = PinnableSlice(); auto ret = PinnableSlice();
if (for_update) { if (for_update) {

@ -126,17 +126,12 @@ pub(crate) mod ffi {
fn set_snapshot(self: Pin<&mut TxBridge>, val: bool); fn set_snapshot(self: Pin<&mut TxBridge>, val: bool);
fn clear_snapshot(self: Pin<&mut TxBridge>); fn clear_snapshot(self: Pin<&mut TxBridge>);
fn get( fn get(
self: Pin<&mut TxBridge>, self: &TxBridge,
key: &[u8], key: &[u8],
for_update: bool, for_update: bool,
status: &mut RocksDbStatus, status: &mut RocksDbStatus,
) -> UniquePtr<PinnableSlice>; ) -> UniquePtr<PinnableSlice>;
fn exists( fn exists(self: &TxBridge, key: &[u8], for_update: bool, status: &mut RocksDbStatus);
self: Pin<&mut TxBridge>,
key: &[u8],
for_update: bool,
status: &mut RocksDbStatus,
);
fn put(self: Pin<&mut TxBridge>, key: &[u8], val: &[u8], status: &mut RocksDbStatus); fn put(self: Pin<&mut TxBridge>, key: &[u8], val: &[u8], status: &mut RocksDbStatus);
fn del(self: Pin<&mut TxBridge>, key: &[u8], status: &mut RocksDbStatus); fn del(self: Pin<&mut TxBridge>, key: &[u8], status: &mut RocksDbStatus);
fn commit(self: Pin<&mut TxBridge>, status: &mut RocksDbStatus); fn commit(self: Pin<&mut TxBridge>, status: &mut RocksDbStatus);
@ -144,7 +139,7 @@ pub(crate) mod ffi {
fn rollback_to_savepoint(self: Pin<&mut TxBridge>, status: &mut RocksDbStatus); fn rollback_to_savepoint(self: Pin<&mut TxBridge>, status: &mut RocksDbStatus);
fn pop_savepoint(self: Pin<&mut TxBridge>, status: &mut RocksDbStatus); fn pop_savepoint(self: Pin<&mut TxBridge>, status: &mut RocksDbStatus);
fn set_savepoint(self: Pin<&mut TxBridge>); fn set_savepoint(self: Pin<&mut TxBridge>);
fn iterator(self: Pin<&mut TxBridge>) -> UniquePtr<IterBridge>; fn iterator(self: &TxBridge) -> UniquePtr<IterBridge>;
type IterBridge; type IterBridge;
fn start(self: Pin<&mut IterBridge>); fn start(self: Pin<&mut IterBridge>);

@ -1,9 +1,11 @@
use crate::bridge::ffi::*;
use crate::bridge::iter::IterBuilder;
use cxx::*;
use std::fmt::{Debug, Formatter}; use std::fmt::{Debug, Formatter};
use std::ops::Deref; use std::ops::Deref;
use cxx::*;
use crate::bridge::ffi::*;
use crate::bridge::iter::IterBuilder;
pub struct TxBuilder { pub struct TxBuilder {
pub(crate) inner: UniquePtr<TxBridge>, pub(crate) inner: UniquePtr<TxBridge>,
} }
@ -102,9 +104,9 @@ impl Tx {
} }
} }
#[inline] #[inline]
pub fn get(&mut self, key: &[u8], for_update: bool) -> Result<Option<PinSlice>, RocksDbStatus> { pub fn get(&self, key: &[u8], for_update: bool) -> Result<Option<PinSlice>, RocksDbStatus> {
let mut status = RocksDbStatus::default(); let mut status = RocksDbStatus::default();
let ret = self.inner.pin_mut().get(key, for_update, &mut status); let ret = self.inner.get(key, for_update, &mut status);
match status.code { match status.code {
StatusCode::kOk => Ok(Some(PinSlice { inner: ret })), StatusCode::kOk => Ok(Some(PinSlice { inner: ret })),
StatusCode::kNotFound => Ok(None), StatusCode::kNotFound => Ok(None),
@ -112,9 +114,9 @@ impl Tx {
} }
} }
#[inline] #[inline]
pub fn exists(&mut self, key: &[u8], for_update: bool) -> Result<bool, RocksDbStatus> { pub fn exists(&self, key: &[u8], for_update: bool) -> Result<bool, RocksDbStatus> {
let mut status = RocksDbStatus::default(); let mut status = RocksDbStatus::default();
self.inner.pin_mut().exists(key, for_update, &mut status); self.inner.exists(key, for_update, &mut status);
match status.code { match status.code {
StatusCode::kOk => Ok(true), StatusCode::kOk => Ok(true),
StatusCode::kNotFound => Ok(false), StatusCode::kNotFound => Ok(false),
@ -166,9 +168,9 @@ impl Tx {
} }
} }
#[inline] #[inline]
pub fn iterator(&mut self) -> IterBuilder { pub fn iterator(&self) -> IterBuilder {
IterBuilder { IterBuilder {
inner: self.inner.pin_mut().iterator(), inner: self.inner.iterator(),
} }
.auto_prefix_mode(true) .auto_prefix_mode(true)
} }

@ -1,6 +1,5 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use anyhow::Result;
use itertools::Itertools; use itertools::Itertools;
use crate::data::attr::Attribute; use crate::data::attr::Attribute;
@ -124,6 +123,7 @@ impl TripleRelation {
} }
fn cartesian_join<'a>(&'a self, left_iter: TupleIter<'a>) -> TupleIter<'a> { fn cartesian_join<'a>(&'a self, left_iter: TupleIter<'a>) -> TupleIter<'a> {
// [f, f] not really a join // [f, f] not really a join
left_iter.map_ok(|tuple| {});
todo!() todo!()
} }
fn ev_join<'a>( fn ev_join<'a>(

@ -133,7 +133,7 @@ impl SessionTx {
format!("{:?}", v), format!("{:?}", v),
format!("{:?}", stored_v), format!("{:?}", stored_v),
) )
.into()); .into());
} }
Ok(()) Ok(())
} }
@ -225,7 +225,7 @@ impl SessionTx {
attr.keyword.clone(), attr.keyword.clone(),
format!("{:?}", v), format!("{:?}", v),
) )
.into()); .into());
} }
} }
// fwd scan in time // fwd scan in time
@ -236,7 +236,7 @@ impl SessionTx {
attr.keyword.clone(), attr.keyword.clone(),
format!("{:?}", v), format!("{:?}", v),
) )
.into()); .into());
} }
} }
} else if let Some(v_slice) = self.tx.get(&ave_encoded, false)? { } else if let Some(v_slice) = self.tx.get(&ave_encoded, false)? {
@ -246,7 +246,7 @@ impl SessionTx {
attr.keyword.clone(), attr.keyword.clone(),
format!("{:?}", v), format!("{:?}", v),
) )
.into()); .into());
} }
} }
} }
@ -424,7 +424,7 @@ impl SessionTx {
) )
} }
pub(crate) fn restore_bottom_value( pub(crate) fn restore_bottom_value(
&mut self, &self,
eid: EntityId, eid: EntityId,
aid: AttrId, aid: AttrId,
vld: Validity, vld: Validity,
@ -437,128 +437,128 @@ impl SessionTx {
decode_value(&res.as_ref()[1..]) decode_value(&res.as_ref()[1..])
} }
pub(crate) fn triple_ea_scan( pub(crate) fn triple_ea_scan(
&mut self, &self,
eid: EntityId, eid: EntityId,
aid: AttrId, aid: AttrId,
) -> impl Iterator<Item=Result<(EntityId, AttrId, DataValue, Validity, StoreOp)>> { ) -> impl Iterator<Item = Result<(EntityId, AttrId, DataValue, Validity, StoreOp)>> {
let lower = encode_eav_key(eid, aid, &DataValue::Null, Validity::MAX); let lower = encode_eav_key(eid, aid, &DataValue::Null, Validity::MAX);
let upper = encode_eav_key(eid, aid, &DataValue::Bottom, Validity::MIN); let upper = encode_eav_key(eid, aid, &DataValue::Bottom, Validity::MIN);
TripleEntityAttrIter::new(self.tx.iterator(), lower, upper) TripleEntityAttrIter::new(self.tx.iterator(), lower, upper)
} }
pub(crate) fn triple_ea_before_scan( pub(crate) fn triple_ea_before_scan(
&mut self, &self,
eid: EntityId, eid: EntityId,
aid: AttrId, aid: AttrId,
before: Validity, before: Validity,
) -> impl Iterator<Item=Result<(EntityId, AttrId, DataValue)>> { ) -> impl Iterator<Item = Result<(EntityId, AttrId, DataValue)>> {
let lower = encode_eav_key(eid, aid, &DataValue::Null, Validity::MAX); let lower = encode_eav_key(eid, aid, &DataValue::Null, Validity::MAX);
let upper = encode_eav_key(eid, aid, &DataValue::Bottom, Validity::MIN); let upper = encode_eav_key(eid, aid, &DataValue::Bottom, Validity::MIN);
TripleEntityAttrBeforeIter::new(self.tx.iterator(), lower, upper, before) TripleEntityAttrBeforeIter::new(self.tx.iterator(), lower, upper, before)
} }
pub(crate) fn triple_ae_scan( pub(crate) fn triple_ae_scan(
&mut self, &self,
aid: AttrId, aid: AttrId,
eid: EntityId, eid: EntityId,
) -> impl Iterator<Item=Result<(AttrId, EntityId, DataValue, Validity, StoreOp)>> { ) -> impl Iterator<Item = Result<(AttrId, EntityId, DataValue, Validity, StoreOp)>> {
let lower = encode_aev_key(aid, eid, &DataValue::Null, Validity::MAX); let lower = encode_aev_key(aid, eid, &DataValue::Null, Validity::MAX);
let upper = encode_aev_key(aid, eid, &DataValue::Bottom, Validity::MIN); let upper = encode_aev_key(aid, eid, &DataValue::Bottom, Validity::MIN);
TripleAttrEntityIter::new(self.tx.iterator(), lower, upper) TripleAttrEntityIter::new(self.tx.iterator(), lower, upper)
} }
pub(crate) fn triple_ae_before_scan( pub(crate) fn triple_ae_before_scan(
&mut self, &self,
aid: AttrId, aid: AttrId,
eid: EntityId, eid: EntityId,
before: Validity, before: Validity,
) -> impl Iterator<Item=Result<(AttrId, EntityId, DataValue)>> { ) -> impl Iterator<Item = Result<(AttrId, EntityId, DataValue)>> {
let lower = encode_aev_key(aid, eid, &DataValue::Null, Validity::MAX); let lower = encode_aev_key(aid, eid, &DataValue::Null, Validity::MAX);
let upper = encode_aev_key(aid, eid, &DataValue::Bottom, Validity::MIN); let upper = encode_aev_key(aid, eid, &DataValue::Bottom, Validity::MIN);
TripleAttrEntityBeforeIter::new(self.tx.iterator(), lower, upper, before) TripleAttrEntityBeforeIter::new(self.tx.iterator(), lower, upper, before)
} }
pub(crate) fn triple_av_scan( pub(crate) fn triple_av_scan(
&mut self, &self,
aid: AttrId, aid: AttrId,
v: &DataValue, v: &DataValue,
) -> impl Iterator<Item=Result<(AttrId, DataValue, EntityId, Validity, StoreOp)>> { ) -> impl Iterator<Item = Result<(AttrId, DataValue, EntityId, Validity, StoreOp)>> {
let lower = encode_ave_key(aid, v, EntityId::MIN_PERM, Validity::MAX); let lower = encode_ave_key(aid, v, EntityId::MIN_PERM, Validity::MAX);
let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN); let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN);
TripleAttrValueIter::new(self.tx.iterator(), lower, upper) TripleAttrValueIter::new(self.tx.iterator(), lower, upper)
} }
pub(crate) fn triple_av_before_scan( pub(crate) fn triple_av_before_scan(
&mut self, &self,
aid: AttrId, aid: AttrId,
v: &DataValue, v: &DataValue,
before: Validity, before: Validity,
) -> impl Iterator<Item=Result<(AttrId, DataValue, EntityId)>> { ) -> impl Iterator<Item = Result<(AttrId, DataValue, EntityId)>> {
let lower = encode_ave_key(aid, v, EntityId::MIN_PERM, Validity::MAX); let lower = encode_ave_key(aid, v, EntityId::MIN_PERM, Validity::MAX);
let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN); let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN);
TripleAttrValueBeforeIter::new(self.tx.iterator(), lower, upper, before) TripleAttrValueBeforeIter::new(self.tx.iterator(), lower, upper, before)
} }
pub(crate) fn triple_av_after_scan( pub(crate) fn triple_av_after_scan(
&mut self, &self,
aid: AttrId, aid: AttrId,
v: &DataValue, v: &DataValue,
after: Validity, after: Validity,
) -> impl Iterator<Item=Result<(AttrId, DataValue, EntityId)>> { ) -> impl Iterator<Item = Result<(AttrId, DataValue, EntityId)>> {
let lower = encode_ave_key(aid, v, EntityId::MIN_PERM, Validity::MAX); let lower = encode_ave_key(aid, v, EntityId::MIN_PERM, Validity::MAX);
let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN); let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN);
TripleAttrValueAfterIter::new(self.tx.iterator(), lower, upper, after) TripleAttrValueAfterIter::new(self.tx.iterator(), lower, upper, after)
} }
pub(crate) fn triple_vref_a_scan( pub(crate) fn triple_vref_a_scan(
&mut self, &self,
v_eid: EntityId, v_eid: EntityId,
aid: AttrId, aid: AttrId,
) -> impl Iterator<Item=Result<(EntityId, AttrId, EntityId, Validity, StoreOp)>> { ) -> impl Iterator<Item = Result<(EntityId, AttrId, EntityId, Validity, StoreOp)>> {
let lower = encode_vae_key(v_eid, aid, EntityId::MIN_PERM, Validity::MAX); let lower = encode_vae_key(v_eid, aid, EntityId::MIN_PERM, Validity::MAX);
let upper = encode_vae_key(v_eid, aid, EntityId::MAX_PERM, Validity::MIN); let upper = encode_vae_key(v_eid, aid, EntityId::MAX_PERM, Validity::MIN);
TripleValueRefAttrIter::new(self.tx.iterator(), lower, upper) TripleValueRefAttrIter::new(self.tx.iterator(), lower, upper)
} }
pub(crate) fn triple_vref_a_before_scan( pub(crate) fn triple_vref_a_before_scan(
&mut self, &self,
v_eid: EntityId, v_eid: EntityId,
aid: AttrId, aid: AttrId,
before: Validity, before: Validity,
) -> impl Iterator<Item=Result<(EntityId, AttrId, EntityId)>> { ) -> impl Iterator<Item = Result<(EntityId, AttrId, EntityId)>> {
let lower = encode_vae_key(v_eid, aid, EntityId::MIN_PERM, Validity::MAX); let lower = encode_vae_key(v_eid, aid, EntityId::MIN_PERM, Validity::MAX);
let upper = encode_vae_key(v_eid, aid, EntityId::MAX_PERM, Validity::MIN); let upper = encode_vae_key(v_eid, aid, EntityId::MAX_PERM, Validity::MIN);
TripleValueRefAttrBeforeIter::new(self.tx.iterator(), lower, upper, before) TripleValueRefAttrBeforeIter::new(self.tx.iterator(), lower, upper, before)
} }
pub(crate) fn triple_e_scan( pub(crate) fn triple_e_scan(
&mut self, &self,
eid: EntityId, eid: EntityId,
) -> impl Iterator<Item=Result<(EntityId, AttrId, DataValue, Validity, StoreOp)>> { ) -> impl Iterator<Item = Result<(EntityId, AttrId, DataValue, Validity, StoreOp)>> {
let lower = encode_eav_key(eid, AttrId::MIN_PERM, &DataValue::Null, Validity::MAX); let lower = encode_eav_key(eid, AttrId::MIN_PERM, &DataValue::Null, Validity::MAX);
let upper = encode_eav_key(eid, AttrId::MAX_PERM, &DataValue::Bottom, Validity::MIN); let upper = encode_eav_key(eid, AttrId::MAX_PERM, &DataValue::Bottom, Validity::MIN);
TripleEntityAttrIter::new(self.tx.iterator(), lower, upper) TripleEntityAttrIter::new(self.tx.iterator(), lower, upper)
} }
pub(crate) fn triple_e_before_scan( pub(crate) fn triple_e_before_scan(
&mut self, &self,
eid: EntityId, eid: EntityId,
before: Validity, before: Validity,
) -> impl Iterator<Item=Result<(EntityId, AttrId, DataValue)>> { ) -> impl Iterator<Item = Result<(EntityId, AttrId, DataValue)>> {
let lower = encode_eav_key(eid, AttrId::MIN_PERM, &DataValue::Null, Validity::MAX); let lower = encode_eav_key(eid, AttrId::MIN_PERM, &DataValue::Null, Validity::MAX);
let upper = encode_eav_key(eid, AttrId::MAX_PERM, &DataValue::Bottom, Validity::MIN); let upper = encode_eav_key(eid, AttrId::MAX_PERM, &DataValue::Bottom, Validity::MIN);
TripleEntityAttrBeforeIter::new(self.tx.iterator(), lower, upper, before) TripleEntityAttrBeforeIter::new(self.tx.iterator(), lower, upper, before)
} }
pub(crate) fn triple_a_scan( pub(crate) fn triple_a_scan(
&mut self, &self,
aid: AttrId, aid: AttrId,
) -> impl Iterator<Item=Result<(AttrId, EntityId, DataValue, Validity, StoreOp)>> { ) -> impl Iterator<Item = Result<(AttrId, EntityId, DataValue, Validity, StoreOp)>> {
let lower = encode_aev_key(aid, EntityId::MIN_PERM, &DataValue::Null, Validity::MAX); let lower = encode_aev_key(aid, EntityId::MIN_PERM, &DataValue::Null, Validity::MAX);
let upper = encode_aev_key(aid, EntityId::MAX_PERM, &DataValue::Bottom, Validity::MIN); let upper = encode_aev_key(aid, EntityId::MAX_PERM, &DataValue::Bottom, Validity::MIN);
TripleAttrEntityIter::new(self.tx.iterator(), lower, upper) TripleAttrEntityIter::new(self.tx.iterator(), lower, upper)
} }
pub(crate) fn triple_a_before_scan( pub(crate) fn triple_a_before_scan(
&mut self, &self,
aid: AttrId, aid: AttrId,
before: Validity, before: Validity,
) -> impl Iterator<Item=Result<(AttrId, EntityId, DataValue)>> { ) -> impl Iterator<Item = Result<(AttrId, EntityId, DataValue)>> {
let lower = encode_aev_key(aid, EntityId::MIN_PERM, &DataValue::Null, Validity::MAX); let lower = encode_aev_key(aid, EntityId::MIN_PERM, &DataValue::Null, Validity::MAX);
let upper = encode_aev_key(aid, EntityId::MAX_PERM, &DataValue::Bottom, Validity::MIN); let upper = encode_aev_key(aid, EntityId::MAX_PERM, &DataValue::Bottom, Validity::MIN);
TripleAttrEntityBeforeIter::new(self.tx.iterator(), lower, upper, before) TripleAttrEntityBeforeIter::new(self.tx.iterator(), lower, upper, before)
} }
pub(crate) fn triple_a_scan_all( pub(crate) fn triple_a_scan_all(
&mut self, &self,
) -> impl Iterator<Item=Result<(AttrId, EntityId, DataValue, Validity, StoreOp)>> { ) -> impl Iterator<Item = Result<(AttrId, EntityId, DataValue, Validity, StoreOp)>> {
let lower = encode_aev_key( let lower = encode_aev_key(
AttrId::MIN_PERM, AttrId::MIN_PERM,
EntityId::MIN_PERM, EntityId::MIN_PERM,
@ -574,9 +574,9 @@ impl SessionTx {
TripleAttrEntityIter::new(self.tx.iterator(), lower, upper) TripleAttrEntityIter::new(self.tx.iterator(), lower, upper)
} }
pub(crate) fn triple_a_before_scan_all( pub(crate) fn triple_a_before_scan_all(
&mut self, &self,
before: Validity, before: Validity,
) -> impl Iterator<Item=Result<(AttrId, EntityId, DataValue)>> { ) -> impl Iterator<Item = Result<(AttrId, EntityId, DataValue)>> {
let lower = encode_aev_key( let lower = encode_aev_key(
AttrId::MIN_PERM, AttrId::MIN_PERM,
EntityId::MIN_PERM, EntityId::MIN_PERM,
@ -592,18 +592,18 @@ impl SessionTx {
TripleAttrEntityBeforeIter::new(self.tx.iterator(), lower, upper, before) TripleAttrEntityBeforeIter::new(self.tx.iterator(), lower, upper, before)
} }
pub(crate) fn triple_vref_scan( pub(crate) fn triple_vref_scan(
&mut self, &self,
v_eid: EntityId, v_eid: EntityId,
) -> impl Iterator<Item=Result<(EntityId, AttrId, EntityId, Validity, StoreOp)>> { ) -> impl Iterator<Item = Result<(EntityId, AttrId, EntityId, Validity, StoreOp)>> {
let lower = encode_vae_key(v_eid, AttrId::MIN_PERM, EntityId::MIN_PERM, Validity::MAX); let lower = encode_vae_key(v_eid, AttrId::MIN_PERM, EntityId::MIN_PERM, Validity::MAX);
let upper = encode_vae_key(v_eid, AttrId::MAX_PERM, EntityId::MAX_PERM, Validity::MIN); let upper = encode_vae_key(v_eid, AttrId::MAX_PERM, EntityId::MAX_PERM, Validity::MIN);
TripleValueRefAttrIter::new(self.tx.iterator(), lower, upper) TripleValueRefAttrIter::new(self.tx.iterator(), lower, upper)
} }
pub(crate) fn triple_vref_before_scan( pub(crate) fn triple_vref_before_scan(
&mut self, &self,
v_eid: EntityId, v_eid: EntityId,
before: Validity, before: Validity,
) -> impl Iterator<Item=Result<(EntityId, AttrId, EntityId)>> { ) -> impl Iterator<Item = Result<(EntityId, AttrId, EntityId)>> {
let lower = encode_vae_key(v_eid, AttrId::MIN_PERM, EntityId::MIN_PERM, Validity::MAX); let lower = encode_vae_key(v_eid, AttrId::MIN_PERM, EntityId::MIN_PERM, Validity::MAX);
let upper = encode_vae_key(v_eid, AttrId::MAX_PERM, EntityId::MAX_PERM, Validity::MIN); let upper = encode_vae_key(v_eid, AttrId::MAX_PERM, EntityId::MAX_PERM, Validity::MIN);
TripleValueRefAttrBeforeIter::new(self.tx.iterator(), lower, upper, before) TripleValueRefAttrBeforeIter::new(self.tx.iterator(), lower, upper, before)

Loading…
Cancel
Save