optimize semi-naive logic

main
Ziyang Hu 2 years ago
parent c595e26843
commit c4ee752bbc

@ -10,7 +10,7 @@ use crate::data::json::JsonValue;
use crate::data::value::DataValue; use crate::data::value::DataValue;
use crate::transact::throwaway::ThrowawayId; use crate::transact::throwaway::ThrowawayId;
pub(crate) const SCRATCH_DB_KEY_PREFIX_LEN: usize = 4; pub(crate) const SCRATCH_DB_KEY_PREFIX_LEN: usize = 6;
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum TupleError { pub enum TupleError {
@ -41,10 +41,22 @@ impl Tuple {
self.0.len() self.0.len()
} }
pub(crate) fn encode_as_key(&self, prefix: ThrowawayId) -> Vec<u8> { pub(crate) fn encode_as_key(&self, prefix: ThrowawayId) -> Vec<u8> {
self.encode_as_key_for_epoch(prefix, 0)
}
pub(crate) fn encode_as_key_for_epoch(&self, prefix: ThrowawayId, epoch: u32) -> Vec<u8> {
let len = self.arity(); let len = self.arity();
let mut ret = Vec::with_capacity(4 + 4 * len + 10 * len); let mut ret = Vec::with_capacity(4 + 4 * len + 10 * len);
ret.extend(prefix.0.to_be_bytes()); let prefix_bytes = prefix.0.to_be_bytes();
ret.extend((len as u32).to_be_bytes()); let epoch_bytes = epoch.to_be_bytes();
ret.extend([
prefix_bytes[1],
prefix_bytes[2],
prefix_bytes[3],
epoch_bytes[1],
epoch_bytes[2],
epoch_bytes[3],
]);
ret.extend((len as u16).to_be_bytes());
ret.resize(4 * (len + 1), 0); ret.resize(4 * (len + 1), 0);
for (idx, val) in self.0.iter().enumerate() { for (idx, val) in self.0.iter().enumerate() {
if idx > 0 { if idx > 0 {
@ -69,23 +81,65 @@ impl<'a> From<&'a [u8]> for EncodedTuple<'a> {
} }
impl<'a> EncodedTuple<'a> { impl<'a> EncodedTuple<'a> {
pub(crate) fn bounds_for_prefix(bound: u32) -> ([u8; 4], [u8; 4]) { pub(crate) fn bounds_for_prefix(prefix: ThrowawayId) -> ([u8; 6], [u8; 6]) {
(bound.to_be_bytes(), (bound + 1).to_be_bytes()) let prefix_bytes = prefix.0.to_be_bytes();
let next_prefix_bytes = (prefix.0+1).to_be_bytes();
(
[
prefix_bytes[1],
prefix_bytes[2],
prefix_bytes[3],
0,
0,
0,
],
[
next_prefix_bytes[1],
next_prefix_bytes[2],
next_prefix_bytes[3],
0,
0,
0,
],
)
}
pub(crate) fn bounds_for_prefix_and_epoch(prefix: ThrowawayId, epoch: u32) -> ([u8; 6], [u8; 6]) {
let prefix_bytes = prefix.0.to_be_bytes();
let epoch_bytes = epoch.to_be_bytes();
let epoch_bytes_upper = (epoch + 1).to_be_bytes();
(
[
prefix_bytes[1],
prefix_bytes[2],
prefix_bytes[3],
epoch_bytes[1],
epoch_bytes[2],
epoch_bytes[3],
],
[
prefix_bytes[1],
prefix_bytes[2],
prefix_bytes[3],
epoch_bytes_upper[1],
epoch_bytes_upper[2],
epoch_bytes_upper[3],
],
)
} }
pub(crate) fn prefix(&self) -> Result<u32, TupleError> { pub(crate) fn prefix(&self) -> Result<(ThrowawayId, u32), TupleError> {
if self.0.len() < 4 { if self.0.len() < 6 {
Err(TupleError::BadData( Err(TupleError::BadData(
"bad data length".to_string(), "bad data length".to_string(),
self.0.to_vec(), self.0.to_vec(),
)) ))
} else { } else {
Ok(u32::from_be_bytes([ let id = u32::from_be_bytes([0, self.0[0], self.0[1], self.0[2]]);
self.0[0], self.0[1], self.0[2], self.0[3], let epoch = u32::from_be_bytes([0, self.0[3], self.0[4], self.0[5]]);
])) Ok((ThrowawayId(id), epoch))
} }
} }
pub(crate) fn arity(&self) -> Result<usize, TupleError> { pub(crate) fn arity(&self) -> Result<usize, TupleError> {
if self.0.len() == 4 { if self.0.len() == 6 {
return Ok(0); return Ok(0);
} }
if self.0.len() < 8 { if self.0.len() < 8 {
@ -94,12 +148,12 @@ impl<'a> EncodedTuple<'a> {
self.0.to_vec(), self.0.to_vec(),
)) ))
} else { } else {
Ok(u32::from_be_bytes([self.0[4], self.0[5], self.0[6], self.0[7]]) as usize) Ok(u16::from_be_bytes([self.0[6], self.0[7]]) as usize)
} }
} }
fn force_get(&self, idx: usize) -> DataValue { fn force_get(&self, idx: usize) -> DataValue {
let pos = if idx == 0 { let pos = if idx == 0 {
let arity = u32::from_be_bytes([self.0[4], self.0[5], self.0[6], self.0[7]]) as usize; let arity = u16::from_be_bytes([self.0[6], self.0[7]]) as usize;
4 * (arity + 1) 4 * (arity + 1)
} else { } else {
let len_pos = (idx + 1) * 4; let len_pos = (idx + 1) * 4;

@ -1,5 +1,6 @@
use std::collections::btree_map::Entry; use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet}; use std::collections::{BTreeMap, BTreeSet};
use std::mem;
use std::ops::Sub; use std::ops::Sub;
use anyhow::Result; use anyhow::Result;
@ -9,7 +10,6 @@ use serde_json::Map;
use crate::data::attr::Attribute; use crate::data::attr::Attribute;
use crate::data::json::JsonValue; use crate::data::json::JsonValue;
use crate::data::keyword::Keyword; use crate::data::keyword::Keyword;
use crate::data::tuple::TupleIter;
use crate::data::value::DataValue; use crate::data::value::DataValue;
use crate::preprocess::triple::TxError; use crate::preprocess::triple::TxError;
use crate::runtime::transact::SessionTx; use crate::runtime::transact::SessionTx;
@ -174,55 +174,71 @@ impl SessionTx {
// dbg!(&compiled); // dbg!(&compiled);
for epoch in 1u32.. { let mut changed: BTreeMap<_, _> = compiled.keys().map(|k| (k, false)).collect();
let mut prev_changed = changed.clone();
for epoch in 0u32.. {
eprintln!("epoch {}", epoch); eprintln!("epoch {}", epoch);
let mut new_derived = false; if epoch == 0 {
let snapshot = self.throwaway.make_snapshot();
if epoch == 1 {
let epoch_encoded = epoch.to_be_bytes();
for (k, rules) in compiled.iter() { for (k, rules) in compiled.iter() {
let (store, _arity) = stores.get(k).unwrap(); let (store, _arity) = stores.get(k).unwrap();
let use_delta = BTreeSet::default(); let use_delta = BTreeSet::default();
for (rule_n, (_head, _deriving_rules, relation)) in rules.iter().enumerate() { for (rule_n, (_head, _deriving_rules, relation)) in rules.iter().enumerate() {
eprintln!("initial calculation for rule {}.{}", k, rule_n); eprintln!("initial calculation for rule {}.{}", k, rule_n);
for item_res in relation.iter(self, epoch, &use_delta, &snapshot) { for item_res in relation.iter(self, Some(0), &use_delta) {
let item = item_res?; let item = item_res?;
eprintln!("item for {}.{}: {:?} at {}", k, rule_n, item, epoch); eprintln!("item for {}.{}: {:?} at {}", k, rule_n, item, epoch);
store.put(&item, &epoch_encoded)?; store.put(&item, 0)?;
new_derived = true; *changed.get_mut(k).unwrap() = true;
} }
} }
} }
} else { } else {
let epoch_encoded = epoch.to_be_bytes(); mem::swap(&mut changed, &mut prev_changed);
for (_k, v) in changed.iter_mut() {
*v = false;
}
for (k, rules) in compiled.iter() { for (k, rules) in compiled.iter() {
let (store, _arity) = stores.get(k).unwrap(); let (store, _arity) = stores.get(k).unwrap();
for (rule_n, (_head, deriving_rules, relation)) in rules.iter().enumerate() { for (rule_n, (_head, deriving_rules, relation)) in rules.iter().enumerate() {
let mut should_do_calculation = false;
for drule in deriving_rules {
if *prev_changed.get(drule).unwrap() {
should_do_calculation = true;
break;
}
}
if !should_do_calculation {
eprintln!("skipping rule {}.{} as none of its dependencies changed in the last iteration", k, rule_n);
continue;
}
for (delta_key, (delta_store, _)) in stores.iter() { for (delta_key, (delta_store, _)) in stores.iter() {
if !deriving_rules.contains(delta_key) { if !deriving_rules.contains(delta_key) {
continue; continue;
} }
eprintln!("with delta {} for rule {}.{}", delta_key, k, rule_n); eprintln!("with delta {} for rule {}.{}", delta_key, k, rule_n);
let use_delta = BTreeSet::from([delta_store.id]); let use_delta = BTreeSet::from([delta_store.id]);
for item_res in relation.iter(self, epoch, &use_delta, &snapshot) { for item_res in relation.iter(self, Some(epoch), &use_delta) {
// todo: if the relation does not depend on the delta, skip
let item = item_res?; let item = item_res?;
// improvement: the clauses can actually be evaluated in parallel // improvement: the clauses can actually be evaluated in parallel
if store.put_if_absent(&item, &epoch_encoded)? { if store.exists(&item, 0)? {
eprintln!("item for {}.{}: {:?} at {}", k, rule_n, item, epoch);
new_derived = true;
} else {
eprintln!( eprintln!(
"item for {}.{}: {:?} at {}, rederived", "item for {}.{}: {:?} at {}, rederived",
k, rule_n, item, epoch k, rule_n, item, epoch
); );
} else {
eprintln!("item for {}.{}: {:?} at {}", k, rule_n, item, epoch);
*changed.get_mut(k).unwrap() = true;
store.put(&item, epoch)?;
store.put(&item, 0)?;
} }
} }
} }
} }
} }
} }
if !new_derived { if changed.values().all(|rule_changed| !*rule_changed) {
break; break;
} }
} }

@ -1,8 +1,8 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::env::temp_dir; use std::env::temp_dir;
use std::fmt::{Debug, Formatter}; use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use anyhow::Result; use anyhow::Result;
use itertools::Itertools; use itertools::Itertools;
@ -11,8 +11,7 @@ use uuid::Uuid;
use cozorocks::{DbBuilder, DbIter, RawRocksDb, RocksDb}; use cozorocks::{DbBuilder, DbIter, RawRocksDb, RocksDb};
use crate::AttrTxItem; use crate::data::compare::{rusty_cmp, DB_KEY_PREFIX_LEN};
use crate::data::compare::{DB_KEY_PREFIX_LEN, rusty_cmp};
use crate::data::encode::{ use crate::data::encode::{
decode_ea_key, decode_value_from_key, decode_value_from_val, encode_eav_key, StorageTag, decode_ea_key, decode_value_from_key, decode_value_from_val, encode_eav_key, StorageTag,
}; };
@ -23,6 +22,7 @@ use crate::data::tuple::{rusty_scratch_cmp, SCRATCH_DB_KEY_PREFIX_LEN};
use crate::data::value::DataValue; use crate::data::value::DataValue;
use crate::runtime::transact::SessionTx; use crate::runtime::transact::SessionTx;
use crate::transact::pull::CurrentPath; use crate::transact::pull::CurrentPath;
use crate::AttrTxItem;
pub struct Db { pub struct Db {
db: RocksDb, db: RocksDb,
@ -30,7 +30,7 @@ pub struct Db {
last_attr_id: Arc<AtomicU64>, last_attr_id: Arc<AtomicU64>,
last_ent_id: Arc<AtomicU64>, last_ent_id: Arc<AtomicU64>,
last_tx_id: Arc<AtomicU64>, last_tx_id: Arc<AtomicU64>,
throwaway_count: Arc<AtomicU32>, throwaway_idx: Arc<AtomicU32>,
n_sessions: Arc<AtomicUsize>, n_sessions: Arc<AtomicUsize>,
session_id: usize, session_id: usize,
} }
@ -70,7 +70,7 @@ impl Db {
last_attr_id: Arc::new(Default::default()), last_attr_id: Arc::new(Default::default()),
last_ent_id: Arc::new(Default::default()), last_ent_id: Arc::new(Default::default()),
last_tx_id: Arc::new(Default::default()), last_tx_id: Arc::new(Default::default()),
throwaway_count: Arc::new(Default::default()), throwaway_idx: Arc::new(Default::default()),
n_sessions: Arc::new(Default::default()), n_sessions: Arc::new(Default::default()),
session_id: Default::default(), session_id: Default::default(),
}; };
@ -87,7 +87,7 @@ impl Db {
last_attr_id: self.last_attr_id.clone(), last_attr_id: self.last_attr_id.clone(),
last_ent_id: self.last_ent_id.clone(), last_ent_id: self.last_ent_id.clone(),
last_tx_id: self.last_tx_id.clone(), last_tx_id: self.last_tx_id.clone(),
throwaway_count: self.throwaway_count.clone(), throwaway_idx: self.throwaway_idx.clone(),
n_sessions: self.n_sessions.clone(), n_sessions: self.n_sessions.clone(),
session_id: old_count + 1, session_id: old_count + 1,
}) })
@ -107,7 +107,7 @@ impl Db {
let ret = SessionTx { let ret = SessionTx {
tx: self.db.transact().set_snapshot(true).start(), tx: self.db.transact().set_snapshot(true).start(),
throwaway: self.throwaway_db.clone(), throwaway: self.throwaway_db.clone(),
throwaway_count: self.throwaway_count.clone(), throwaway_idx: self.throwaway_idx.clone(),
w_tx_id: None, w_tx_id: None,
last_attr_id: self.last_attr_id.clone(), last_attr_id: self.last_attr_id.clone(),
last_ent_id: self.last_ent_id.clone(), last_ent_id: self.last_ent_id.clone(),
@ -127,7 +127,7 @@ impl Db {
let ret = SessionTx { let ret = SessionTx {
tx: self.db.transact().set_snapshot(true).start(), tx: self.db.transact().set_snapshot(true).start(),
throwaway: self.throwaway_db.clone(), throwaway: self.throwaway_db.clone(),
throwaway_count: self.throwaway_count.clone(), throwaway_idx: self.throwaway_idx.clone(),
w_tx_id: Some(cur_tx_id), w_tx_id: Some(cur_tx_id),
last_attr_id: self.last_attr_id.clone(), last_attr_id: self.last_attr_id.clone(),
last_ent_id: self.last_ent_id.clone(), last_ent_id: self.last_ent_id.clone(),

@ -22,7 +22,7 @@ use crate::transact::throwaway::{ThrowawayArea, ThrowawayId};
pub struct SessionTx { pub struct SessionTx {
pub(crate) tx: Tx, pub(crate) tx: Tx,
pub(crate) throwaway: RawRocksDb, pub(crate) throwaway: RawRocksDb,
pub(crate) throwaway_count: Arc<AtomicU32>, pub(crate) throwaway_idx: Arc<AtomicU32>,
pub(crate) w_tx_id: Option<TxId>, pub(crate) w_tx_id: Option<TxId>,
pub(crate) last_attr_id: Arc<AtomicU64>, pub(crate) last_attr_id: Arc<AtomicU64>,
pub(crate) last_ent_id: Arc<AtomicU64>, pub(crate) last_ent_id: Arc<AtomicU64>,
@ -67,7 +67,8 @@ impl TxLog {
impl SessionTx { impl SessionTx {
pub(crate) fn new_throwaway(&self) -> ThrowawayArea { pub(crate) fn new_throwaway(&self) -> ThrowawayArea {
let old_count = self.throwaway_count.fetch_add(1, Ordering::AcqRel); let old_count = self.throwaway_idx.fetch_add(1, Ordering::AcqRel);
let old_count = old_count & 0x00ff_ffffu32;
ThrowawayArea { ThrowawayArea {
db: self.throwaway.clone(), db: self.throwaway.clone(),
id: ThrowawayId(old_count), id: ThrowawayId(old_count),

@ -1,10 +1,9 @@
use std::collections::{BTreeMap, BTreeSet}; use std::collections::{BTreeMap, BTreeSet};
use std::iter;
use anyhow::Result; use anyhow::Result;
use itertools::Itertools; use itertools::Itertools;
use cozorocks::SnapshotBridge;
use crate::data::attr::Attribute; use crate::data::attr::Attribute;
use crate::data::keyword::Keyword; use crate::data::keyword::Keyword;
use crate::data::tuple::{Tuple, TupleIter}; use crate::data::tuple::{Tuple, TupleIter};
@ -49,9 +48,8 @@ impl ReorderRelation {
fn iter<'a>( fn iter<'a>(
&'a self, &'a self,
tx: &'a SessionTx, tx: &'a SessionTx,
epoch: u32, epoch: Option<u32>,
use_delta: &BTreeSet<ThrowawayId>, use_delta: &BTreeSet<ThrowawayId>,
snapshot: &'a SnapshotBridge
) -> TupleIter<'a> { ) -> TupleIter<'a> {
let old_order = self.relation.bindings(); let old_order = self.relation.bindings();
let old_order_indices: BTreeMap<_, _> = old_order let old_order_indices: BTreeMap<_, _> = old_order
@ -70,7 +68,7 @@ impl ReorderRelation {
.collect_vec(); .collect_vec();
Box::new( Box::new(
self.relation self.relation
.iter(tx, epoch, use_delta, snapshot) .iter(tx, epoch, use_delta)
.map_ok(move |tuple| { .map_ok(move |tuple| {
let old = tuple.0; let old = tuple.0;
let new = reorder_indices let new = reorder_indices
@ -451,7 +449,7 @@ impl TripleRelation {
Err(e) => return Box::new([Err(e)].into_iter()), Err(e) => return Box::new([Err(e)].into_iter()),
Ok((_, eid, val)) => { Ok((_, eid, val)) => {
let t = Tuple(vec![val, DataValue::EnId(eid)]); let t = Tuple(vec![val, DataValue::EnId(eid)]);
if let Err(e) = throwaway.put(&t, &[]) { if let Err(e) = throwaway.put(&t, 0) {
return Box::new([Err(e.into())].into_iter()); return Box::new([Err(e.into())].into_iter());
} }
} }
@ -512,23 +510,27 @@ pub struct StoredDerivedRelation {
} }
impl StoredDerivedRelation { impl StoredDerivedRelation {
fn iter(&self, epoch: u32, use_delta: &BTreeSet<ThrowawayId>, snapshot: &SnapshotBridge) -> TupleIter { fn iter(&self, epoch: Option<u32>, use_delta: &BTreeSet<ThrowawayId>) -> TupleIter {
if use_delta.contains(&self.storage.id) { if epoch == Some(0) {
Box::new( return Box::new(iter::empty());
self.storage
.scan_all_with_snapshot(snapshot)
.filter_map_ok(move |(t, stored_epoch)| {
if let Some(stored_epoch) = stored_epoch {
if epoch > stored_epoch + 1 {
return None;
}
}
Some(t)
}),
)
} else {
Box::new(self.storage.scan_all_with_snapshot(snapshot).map_ok(|(t, _)| t))
} }
let scan_epoch = match epoch {
None => 0,
Some(ep) => {
if use_delta.contains(&self.storage.id) {
ep - 1
} else {
0
}
}
};
Box::new(
self.storage
.scan_all_for_epoch(scan_epoch)
.map_ok(|(t, _)| t),
)
} }
fn join_is_prefix(&self, right_join_indices: &[usize]) -> bool { fn join_is_prefix(&self, right_join_indices: &[usize]) -> bool {
let mut indices = right_join_indices.to_vec(); let mut indices = right_join_indices.to_vec();
@ -541,103 +543,64 @@ impl StoredDerivedRelation {
left_iter: TupleIter<'a>, left_iter: TupleIter<'a>,
(left_join_indices, right_join_indices): (Vec<usize>, Vec<usize>), (left_join_indices, right_join_indices): (Vec<usize>, Vec<usize>),
eliminate_indices: BTreeSet<usize>, eliminate_indices: BTreeSet<usize>,
epoch: u32, epoch: Option<u32>,
use_delta: &BTreeSet<ThrowawayId>, use_delta: &BTreeSet<ThrowawayId>,
snapshot: &'a SnapshotBridge,
) -> TupleIter<'a> { ) -> TupleIter<'a> {
if epoch == Some(0) {
return Box::new(iter::empty());
}
let mut right_invert_indices = right_join_indices.iter().enumerate().collect_vec(); let mut right_invert_indices = right_join_indices.iter().enumerate().collect_vec();
right_invert_indices.sort_by_key(|(_, b)| **b); right_invert_indices.sort_by_key(|(_, b)| **b);
let left_to_prefix_indices = right_invert_indices let left_to_prefix_indices = right_invert_indices
.into_iter() .into_iter()
.map(|(a, _)| left_join_indices[a]) .map(|(a, _)| left_join_indices[a])
.collect_vec(); .collect_vec();
if use_delta.contains(&self.storage.id) { let scan_epoch = match epoch {
Box::new( None => 0,
left_iter Some(ep) => {
.map_ok(move |tuple| { if use_delta.contains(&self.storage.id) {
let eliminate_indices = eliminate_indices.clone(); ep - 1
let prefix = Tuple( } else {
left_to_prefix_indices 0
.iter() }
.map(|i| tuple.0[*i].clone()) }
.collect_vec(), };
); Box::new(
self.storage left_iter
.scan_prefix_with_snapshot(&prefix, snapshot) .map_ok(move |tuple| {
.filter_map_ok(move |(found, meta)| { let eliminate_indices = eliminate_indices.clone();
if let Some(stored_epoch) = meta { let prefix = Tuple(
if epoch != stored_epoch + 1 { left_to_prefix_indices
return None; .iter()
} .map(|i| tuple.0[*i].clone())
} .collect_vec(),
// dbg!("filter", &tuple, &prefix, &found); );
let mut ret = tuple.0.clone(); self.storage
ret.extend(found.0); .scan_prefix_for_epoch(&prefix, scan_epoch)
.filter_map_ok(move |(found, meta)| {
if !eliminate_indices.is_empty() { // dbg!("filter", &tuple, &prefix, &found);
ret = ret let mut ret = tuple.0.clone();
.into_iter() ret.extend(found.0);
.enumerate()
.filter_map(|(i, v)| {
if eliminate_indices.contains(&i) {
None
} else {
Some(v)
}
})
.collect_vec();
}
Some(Tuple(ret))
})
})
.flatten_ok()
.map(flatten_err),
)
} else {
Box::new(
left_iter
.map_ok(move |tuple| {
let eliminate_indices = eliminate_indices.clone();
let prefix = Tuple(
left_to_prefix_indices
.iter()
.map(|i| tuple.0[*i].clone())
.collect_vec(),
);
self.storage
.scan_prefix_with_snapshot(&prefix, snapshot)
.filter_map_ok(move |(found, meta)| {
if let Some(stored_epoch) = meta {
if epoch == stored_epoch {
eprintln!("warning: read fresh data");
return None;
}
}
// dbg!("no-filter", &tuple, &prefix, &found); if !eliminate_indices.is_empty() {
let mut ret = tuple.0.clone(); ret = ret
ret.extend(found.0); .into_iter()
.enumerate()
if !eliminate_indices.is_empty() { .filter_map(|(i, v)| {
ret = ret if eliminate_indices.contains(&i) {
.into_iter() None
.enumerate() } else {
.filter_map(|(i, v)| { Some(v)
if eliminate_indices.contains(&i) { }
None })
} else { .collect_vec();
Some(v) }
} Some(Tuple(ret))
}) })
.collect_vec(); })
} .flatten_ok()
Some(Tuple(ret)) .map(flatten_err),
}) )
})
.flatten_ok()
.map(flatten_err),
)
}
} }
} }
@ -744,9 +707,8 @@ impl Relation {
pub fn iter<'a>( pub fn iter<'a>(
&'a self, &'a self,
tx: &'a SessionTx, tx: &'a SessionTx,
epoch: u32, epoch: Option<u32>,
use_delta: &BTreeSet<ThrowawayId>, use_delta: &BTreeSet<ThrowawayId>,
snapshot: &'a SnapshotBridge,
) -> TupleIter<'a> { ) -> TupleIter<'a> {
match self { match self {
Relation::Fixed(f) => Box::new(f.data.iter().map(|t| Ok(Tuple(t.clone())))), Relation::Fixed(f) => Box::new(f.data.iter().map(|t| Ok(Tuple(t.clone())))),
@ -754,9 +716,9 @@ impl Relation {
tx.triple_a_before_scan(r.attr.id, r.vld) tx.triple_a_before_scan(r.attr.id, r.vld)
.map_ok(|(_, e_id, y)| Tuple(vec![DataValue::EnId(e_id), y])), .map_ok(|(_, e_id, y)| Tuple(vec![DataValue::EnId(e_id), y])),
), ),
Relation::Derived(r) => r.iter(epoch, use_delta, snapshot), Relation::Derived(r) => r.iter(epoch, use_delta),
Relation::Join(j) => j.iter(tx, epoch, use_delta, snapshot), Relation::Join(j) => j.iter(tx, epoch, use_delta),
Relation::Reorder(r) => r.iter(tx, epoch, use_delta, snapshot), Relation::Reorder(r) => r.iter(tx, epoch, use_delta),
} }
} }
} }
@ -786,9 +748,8 @@ impl InnerJoin {
pub(crate) fn iter<'a>( pub(crate) fn iter<'a>(
&'a self, &'a self,
tx: &'a SessionTx, tx: &'a SessionTx,
epoch: u32, epoch: Option<u32>,
use_delta: &BTreeSet<ThrowawayId>, use_delta: &BTreeSet<ThrowawayId>,
snapshot: &'a SnapshotBridge,
) -> TupleIter<'a> { ) -> TupleIter<'a> {
let bindings = self.bindings(); let bindings = self.bindings();
let eliminate_indices = get_eliminate_indices(&bindings, &self.to_eliminate); let eliminate_indices = get_eliminate_indices(&bindings, &self.to_eliminate);
@ -799,7 +760,7 @@ impl InnerJoin {
.join_indices(&self.left.bindings(), &self.right.bindings()) .join_indices(&self.left.bindings(), &self.right.bindings())
.unwrap(); .unwrap();
f.join( f.join(
self.left.iter(tx, epoch, use_delta, snapshot), self.left.iter(tx, epoch, use_delta),
join_indices, join_indices,
eliminate_indices, eliminate_indices,
) )
@ -810,7 +771,7 @@ impl InnerJoin {
.join_indices(&self.left.bindings(), &self.right.bindings()) .join_indices(&self.left.bindings(), &self.right.bindings())
.unwrap(); .unwrap();
r.join( r.join(
self.left.iter(tx, epoch, use_delta, snapshot), self.left.iter(tx, epoch, use_delta),
join_indices, join_indices,
tx, tx,
eliminate_indices, eliminate_indices,
@ -823,20 +784,17 @@ impl InnerJoin {
.unwrap(); .unwrap();
if r.join_is_prefix(&join_indices.1) { if r.join_is_prefix(&join_indices.1) {
r.prefix_join( r.prefix_join(
self.left.iter(tx, epoch, use_delta, snapshot), self.left.iter(tx, epoch, use_delta),
join_indices, join_indices,
eliminate_indices, eliminate_indices,
epoch, epoch,
use_delta, use_delta,
snapshot
) )
} else { } else {
self.materialized_join(tx, eliminate_indices, epoch, use_delta, snapshot) self.materialized_join(tx, eliminate_indices, epoch, use_delta)
} }
} }
Relation::Join(_) => { Relation::Join(_) => self.materialized_join(tx, eliminate_indices, epoch, use_delta),
self.materialized_join(tx, eliminate_indices, epoch, use_delta, snapshot)
}
Relation::Reorder(_) => { Relation::Reorder(_) => {
panic!("joining on reordered") panic!("joining on reordered")
} }
@ -846,9 +804,8 @@ impl InnerJoin {
&'a self, &'a self,
tx: &'a SessionTx, tx: &'a SessionTx,
eliminate_indices: BTreeSet<usize>, eliminate_indices: BTreeSet<usize>,
epoch: u32, epoch: Option<u32>,
use_delta: &BTreeSet<ThrowawayId>, use_delta: &BTreeSet<ThrowawayId>,
snapshot: &'a SnapshotBridge,
) -> TupleIter<'a> { ) -> TupleIter<'a> {
let right_bindings = self.right.bindings(); let right_bindings = self.right.bindings();
let (left_join_indices, right_join_indices) = self let (left_join_indices, right_join_indices) = self
@ -869,7 +826,7 @@ impl InnerJoin {
.map(|(a, _)| a) .map(|(a, _)| a)
.collect_vec(); .collect_vec();
let throwaway = tx.new_throwaway(); let throwaway = tx.new_throwaway();
for item in self.right.iter(tx, epoch, use_delta, snapshot) { for item in self.right.iter(tx, epoch, use_delta) {
match item { match item {
Ok(tuple) => { Ok(tuple) => {
let stored_tuple = Tuple( let stored_tuple = Tuple(
@ -878,7 +835,7 @@ impl InnerJoin {
.map(|i| tuple.0[*i].clone()) .map(|i| tuple.0[*i].clone())
.collect_vec(), .collect_vec(),
); );
if let Err(e) = throwaway.put(&stored_tuple, &[]) { if let Err(e) = throwaway.put(&stored_tuple, 0) {
return Box::new([Err(e.into())].into_iter()); return Box::new([Err(e.into())].into_iter());
} }
} }
@ -887,7 +844,7 @@ impl InnerJoin {
} }
Box::new( Box::new(
self.left self.left
.iter(tx, epoch, use_delta, snapshot) .iter(tx, epoch, use_delta)
.map_ok(move |tuple| { .map_ok(move |tuple| {
let eliminate_indices = eliminate_indices.clone(); let eliminate_indices = eliminate_indices.clone();
let prefix = Tuple( let prefix = Tuple(

@ -1,6 +1,6 @@
use std::fmt::{Debug, Formatter}; use std::fmt::{Debug, Formatter};
use cozorocks::{DbIter, PinSlice, RawRocksDb, RocksDbStatus, SnapshotBridge}; use cozorocks::{DbIter, RawRocksDb, RocksDbStatus};
use crate::data::tuple::{EncodedTuple, Tuple}; use crate::data::tuple::{EncodedTuple, Tuple};
use crate::data::value::DataValue; use crate::data::value::DataValue;
@ -21,47 +21,46 @@ impl Debug for ThrowawayArea {
} }
impl ThrowawayArea { impl ThrowawayArea {
pub(crate) fn put(&self, tuple: &Tuple, value: &[u8]) -> Result<(), RocksDbStatus> { pub(crate) fn put(&self, tuple: &Tuple, epoch: u32) -> Result<(), RocksDbStatus> {
let key_encoded = tuple.encode_as_key(self.id); let key_encoded = tuple.encode_as_key_for_epoch(self.id, epoch);
self.db.put(&key_encoded, value) self.db.put(&key_encoded, &[])
} }
pub(crate) fn put_if_absent(&self, tuple: &Tuple, value: &[u8]) -> Result<bool, RocksDbStatus> { pub(crate) fn put_if_absent(
let key_encoded = tuple.encode_as_key(self.id); &self,
tuple: &Tuple,
epoch: u32,
) -> Result<bool, RocksDbStatus> {
let key_encoded = tuple.encode_as_key_for_epoch(self.id, epoch);
Ok(if !self.db.exists(&key_encoded)? { Ok(if !self.db.exists(&key_encoded)? {
self.db.put(&key_encoded, value)?; self.db.put(&key_encoded, &[])?;
true true
} else { } else {
false false
}) })
} }
pub(crate) fn get(&self, tuple: &Tuple) -> Result<Option<PinSlice>, RocksDbStatus> { // pub(crate) fn get(&self, tuple: &Tuple, epoch: u32) -> Result<Option<PinSlice>, RocksDbStatus> {
let key_encoded = tuple.encode_as_key(self.id); // let key_encoded = tuple.encode_as_key_for_epoch(self.id, epoch);
self.db.get(&key_encoded) // self.db.get(&key_encoded)
} // }
pub(crate) fn exists(&self, tuple: &Tuple) -> Result<bool, RocksDbStatus> { pub(crate) fn exists(&self, tuple: &Tuple, epoch: u32) -> Result<bool, RocksDbStatus> {
let key_encoded = tuple.encode_as_key(self.id); let key_encoded = tuple.encode_as_key_for_epoch(self.id, epoch);
self.db.exists(&key_encoded) self.db.exists(&key_encoded)
} }
pub(crate) fn del(&self, tuple: &Tuple) -> Result<(), RocksDbStatus> { pub(crate) fn del(&self, tuple: &Tuple, epoch: u32) -> Result<(), RocksDbStatus> {
let key_encoded = tuple.encode_as_key(self.id); let key_encoded = tuple.encode_as_key_for_epoch(self.id, epoch);
self.db.del(&key_encoded) self.db.del(&key_encoded)
} }
pub fn scan_all(&self) -> impl Iterator<Item = anyhow::Result<(Tuple, Option<u32>)>> { pub fn scan_all(&self) -> impl Iterator<Item = anyhow::Result<(Tuple, Option<u32>)>> {
let (lower, upper) = EncodedTuple::bounds_for_prefix(self.id.0); self.scan_all_for_epoch(0)
let mut it = self
.db
.iterator()
.upper_bound(&upper)
.prefix_same_as_start(true)
.start();
it.seek(&lower);
ThrowawayIter { it, started: false }
} }
pub fn scan_all_with_snapshot(&self, snapshot: &SnapshotBridge) -> impl Iterator<Item = anyhow::Result<(Tuple, Option<u32>)>> { pub fn scan_all_for_epoch(
let (lower, upper) = EncodedTuple::bounds_for_prefix(self.id.0); &self,
epoch: u32,
) -> impl Iterator<Item = anyhow::Result<(Tuple, Option<u32>)>> {
let (lower, upper) = EncodedTuple::bounds_for_prefix_and_epoch(self.id, epoch);
let mut it = self let mut it = self
.db .db
.iterator_with_snapshot(snapshot) .iterator()
.upper_bound(&upper) .upper_bound(&upper)
.prefix_same_as_start(true) .prefix_same_as_start(true)
.start(); .start();
@ -72,33 +71,21 @@ impl ThrowawayArea {
&self, &self,
prefix: &Tuple, prefix: &Tuple,
) -> impl Iterator<Item = anyhow::Result<(Tuple, Option<u32>)>> { ) -> impl Iterator<Item = anyhow::Result<(Tuple, Option<u32>)>> {
let mut upper = prefix.0.clone(); self.scan_prefix_for_epoch(prefix, 0)
upper.push(DataValue::Bottom);
let upper = Tuple(upper);
let upper = upper.encode_as_key(self.id);
let lower = prefix.encode_as_key(self.id);
let mut it = self
.db
.iterator()
.upper_bound(&upper)
.prefix_same_as_start(true)
.start();
it.seek(&lower);
ThrowawayIter { it, started: false }
} }
pub(crate) fn scan_prefix_with_snapshot( pub(crate) fn scan_prefix_for_epoch(
&self, &self,
prefix: &Tuple, prefix: &Tuple,
snapshot: &SnapshotBridge epoch: u32,
) -> impl Iterator<Item = anyhow::Result<(Tuple, Option<u32>)>> { ) -> impl Iterator<Item = anyhow::Result<(Tuple, Option<u32>)>> {
let mut upper = prefix.0.clone(); let mut upper = prefix.0.clone();
upper.push(DataValue::Bottom); upper.push(DataValue::Bottom);
let upper = Tuple(upper); let upper = Tuple(upper);
let upper = upper.encode_as_key(self.id); let upper = upper.encode_as_key_for_epoch(self.id, epoch);
let lower = prefix.encode_as_key(self.id); let lower = prefix.encode_as_key_for_epoch(self.id, epoch);
let mut it = self let mut it = self
.db .db
.iterator_with_snapshot(snapshot) .iterator()
.upper_bound(&upper) .upper_bound(&upper)
.prefix_same_as_start(true) .prefix_same_as_start(true)
.start(); .start();
@ -143,7 +130,7 @@ impl Iterator for ThrowawayIter {
impl Drop for ThrowawayArea { impl Drop for ThrowawayArea {
fn drop(&mut self) { fn drop(&mut self) {
let (lower, upper) = EncodedTuple::bounds_for_prefix(self.id.0); let (lower, upper) = EncodedTuple::bounds_for_prefix(self.id);
if let Err(e) = self.db.range_del(&lower, &upper) { if let Err(e) = self.db.range_del(&lower, &upper) {
eprintln!("{}", e); eprintln!("{}", e);
} }

Loading…
Cancel
Save