working semi-naive with snapshot

main
Ziyang Hu 2 years ago
parent 067708f2fb
commit bc23afa0f4

@ -12,6 +12,17 @@
#include "tx.h" #include "tx.h"
#include "slice.h" #include "slice.h"
struct SnapshotBridge {
const Snapshot *snapshot;
DB *db;
explicit SnapshotBridge(const Snapshot *snapshot_, DB *db_) : snapshot(snapshot_), db(db_) {}
~SnapshotBridge() {
db->ReleaseSnapshot(snapshot);
}
};
struct RawRocksDbBridge { struct RawRocksDbBridge {
unique_ptr<DB> db; unique_ptr<DB> db;
unique_ptr<Comparator> comparator; unique_ptr<Comparator> comparator;
@ -35,6 +46,11 @@ struct RawRocksDbBridge {
} }
} }
shared_ptr<SnapshotBridge> make_snapshot() const {
const Snapshot *snapshot = db->GetSnapshot();
return make_shared<SnapshotBridge>(snapshot, &*db);
}
inline void set_ignore_range_deletions(bool v) const { inline void set_ignore_range_deletions(bool v) const {
r_opts->ignore_range_deletions = v; r_opts->ignore_range_deletions = v;
} }
@ -47,6 +63,12 @@ struct RawRocksDbBridge {
return make_unique<IterBridge>(&*db); return make_unique<IterBridge>(&*db);
}; };
inline unique_ptr<IterBridge> iterator_with_snapshot(const SnapshotBridge &sb) const {
auto ret = make_unique<IterBridge>(&*db);
ret->set_snapshot(sb.snapshot);
return ret;
};
inline unique_ptr<PinnableSlice> get(RustBytes key, RocksDbStatus &status) const { inline unique_ptr<PinnableSlice> get(RustBytes key, RocksDbStatus &status) const {
Slice key_ = convert_slice(key); Slice key_ = convert_slice(key);
auto ret = make_unique<PinnableSlice>(); auto ret = make_unique<PinnableSlice>();

@ -31,6 +31,10 @@ struct IterBridge {
r_opts->auto_prefix_mode = true; r_opts->auto_prefix_mode = true;
} }
inline void set_snapshot(const Snapshot *snapshot) {
r_opts->snapshot = snapshot;
}
// inline ReadOptions &get_r_opts() { // inline ReadOptions &get_r_opts() {
// return *r_opts; // return *r_opts;
// } // }

@ -2,9 +2,9 @@ use std::borrow::Cow;
use cxx::*; use cxx::*;
use crate::{IterBuilder, PinSlice};
use crate::bridge::ffi::*; use crate::bridge::ffi::*;
use crate::bridge::tx::TxBuilder; use crate::bridge::tx::TxBuilder;
use crate::{IterBuilder, PinSlice};
#[derive(Default)] #[derive(Default)]
pub struct DbBuilder<'a> { pub struct DbBuilder<'a> {
@ -173,6 +173,10 @@ impl RawRocksDb {
self self
} }
#[inline] #[inline]
pub fn make_snapshot(&self) -> SharedPtr<SnapshotBridge> {
self.inner.make_snapshot()
}
#[inline]
pub fn iterator(&self) -> IterBuilder { pub fn iterator(&self) -> IterBuilder {
IterBuilder { IterBuilder {
inner: self.inner.iterator(), inner: self.inner.iterator(),
@ -180,6 +184,13 @@ impl RawRocksDb {
.auto_prefix_mode(true) .auto_prefix_mode(true)
} }
#[inline] #[inline]
pub fn iterator_with_snapshot(&self, snapshot: &SnapshotBridge) -> IterBuilder {
IterBuilder {
inner: self.inner.iterator_with_snapshot(snapshot),
}
.auto_prefix_mode(true)
}
#[inline]
pub fn put(&self, key: &[u8], val: &[u8]) -> Result<(), RocksDbStatus> { pub fn put(&self, key: &[u8], val: &[u8]) -> Result<(), RocksDbStatus> {
let mut status = RocksDbStatus::default(); let mut status = RocksDbStatus::default();
self.inner.put(key, val, &mut status); self.inner.put(key, val, &mut status);

@ -108,6 +108,7 @@ pub(crate) mod ffi {
// type ReadOptions; // type ReadOptions;
type RawRocksDbBridge; type RawRocksDbBridge;
pub type SnapshotBridge;
fn get_db_path(self: &RawRocksDbBridge) -> &CxxString; fn get_db_path(self: &RawRocksDbBridge) -> &CxxString;
fn open_raw_db( fn open_raw_db(
builder: &DbOpts, builder: &DbOpts,
@ -117,7 +118,9 @@ pub(crate) mod ffi {
no_wal: bool, no_wal: bool,
) -> SharedPtr<RawRocksDbBridge>; ) -> SharedPtr<RawRocksDbBridge>;
fn set_ignore_range_deletions(self: &RawRocksDbBridge, val: bool); fn set_ignore_range_deletions(self: &RawRocksDbBridge, val: bool);
fn make_snapshot(self: &RawRocksDbBridge) -> SharedPtr<SnapshotBridge>;
fn iterator(self: &RawRocksDbBridge) -> UniquePtr<IterBridge>; fn iterator(self: &RawRocksDbBridge) -> UniquePtr<IterBridge>;
fn iterator_with_snapshot(self: &RawRocksDbBridge, snapshot: &SnapshotBridge) -> UniquePtr<IterBridge>;
fn get( fn get(
self: &RawRocksDbBridge, self: &RawRocksDbBridge,
key: &[u8], key: &[u8],

@ -2,6 +2,7 @@ pub use bridge::db::DbBuilder;
pub use bridge::db::RawRocksDb; pub use bridge::db::RawRocksDb;
pub use bridge::db::RocksDb; pub use bridge::db::RocksDb;
pub use bridge::ffi::RocksDbStatus; pub use bridge::ffi::RocksDbStatus;
pub use bridge::ffi::SnapshotBridge;
pub use bridge::ffi::StatusCode; pub use bridge::ffi::StatusCode;
pub use bridge::ffi::StatusSeverity; pub use bridge::ffi::StatusSeverity;
pub use bridge::ffi::StatusSubCode; pub use bridge::ffi::StatusSubCode;

@ -173,17 +173,18 @@ impl SessionTx {
// dbg!(&compiled); // dbg!(&compiled);
for epoch in 1u32.. { for epoch in 1u32.. {
// eprintln!("epoch {}", epoch); eprintln!("epoch {}", epoch);
let mut new_derived = false; let mut new_derived = false;
let snapshot = self.throwaway.make_snapshot();
if epoch == 1 { if epoch == 1 {
let epoch_encoded = epoch.to_be_bytes(); let epoch_encoded = epoch.to_be_bytes();
for (k, rules) in compiled.iter() { for (k, rules) in compiled.iter() {
let (store, _arity) = stores.get(k).unwrap(); let (store, _arity) = stores.get(k).unwrap();
let use_delta = BTreeSet::default(); let use_delta = BTreeSet::default();
for (rule_n, (_head, relation)) in rules.iter().enumerate() { for (rule_n, (_head, relation)) in rules.iter().enumerate() {
for item_res in relation.iter(self, epoch, &use_delta) { for item_res in relation.iter(self, epoch, &use_delta, &snapshot) {
let item = item_res?; let item = item_res?;
// eprintln!("item for {}.{}: {:?} at {}", k, rule_n, item, epoch); eprintln!("item for {}.{}: {:?} at {}", k, rule_n, item, epoch);
store.put(&item, &epoch_encoded)?; store.put(&item, &epoch_encoded)?;
new_derived = true; new_derived = true;
} }
@ -196,13 +197,15 @@ impl SessionTx {
for (rule_n, (_head, relation)) in rules.iter().enumerate() { for (rule_n, (_head, relation)) in rules.iter().enumerate() {
for (delta_store, _) in stores.values() { for (delta_store, _) in stores.values() {
let use_delta = BTreeSet::from([delta_store.id]); let use_delta = BTreeSet::from([delta_store.id]);
for item_res in relation.iter(self, epoch, &use_delta) { for item_res in relation.iter(self, epoch, &use_delta, &snapshot) {
// todo: if the relation does not depend on the delta, skip // todo: if the relation does not depend on the delta, skip
let item = item_res?; let item = item_res?;
// improvement: the clauses can actually be evaluated in parallel // improvement: the clauses can actually be evaluated in parallel
if store.put_if_absent(&item, &epoch_encoded)? { if store.put_if_absent(&item, &epoch_encoded)? {
// eprintln!("item for {}.{}: {:?} at {}", k, rule_n, item, epoch); eprintln!("item for {}.{}: {:?} at {}", k, rule_n, item, epoch);
new_derived = true; new_derived = true;
} else {
eprintln!("item for {}.{}: {:?} at {}, rederived", k, rule_n, item, epoch);
} }
} }
} }

@ -3,6 +3,8 @@ use std::collections::{BTreeMap, BTreeSet};
use anyhow::Result; use anyhow::Result;
use itertools::Itertools; use itertools::Itertools;
use cozorocks::SnapshotBridge;
use crate::data::attr::Attribute; use crate::data::attr::Attribute;
use crate::data::keyword::Keyword; use crate::data::keyword::Keyword;
use crate::data::tuple::{Tuple, TupleIter}; use crate::data::tuple::{Tuple, TupleIter};
@ -49,6 +51,7 @@ impl ReorderRelation {
tx: &'a SessionTx, tx: &'a SessionTx,
epoch: u32, epoch: u32,
use_delta: &BTreeSet<ThrowawayId>, use_delta: &BTreeSet<ThrowawayId>,
snapshot: &'a SnapshotBridge
) -> TupleIter<'a> { ) -> TupleIter<'a> {
let old_order = self.relation.bindings(); let old_order = self.relation.bindings();
let old_order_indices: BTreeMap<_, _> = old_order let old_order_indices: BTreeMap<_, _> = old_order
@ -67,7 +70,7 @@ impl ReorderRelation {
.collect_vec(); .collect_vec();
Box::new( Box::new(
self.relation self.relation
.iter(tx, epoch, use_delta) .iter(tx, epoch, use_delta, snapshot)
.map_ok(move |tuple| { .map_ok(move |tuple| {
let old = tuple.0; let old = tuple.0;
let new = reorder_indices let new = reorder_indices
@ -509,11 +512,11 @@ pub struct StoredDerivedRelation {
} }
impl StoredDerivedRelation { impl StoredDerivedRelation {
fn iter(&self, epoch: u32, use_delta: &BTreeSet<ThrowawayId>) -> TupleIter { fn iter(&self, epoch: u32, use_delta: &BTreeSet<ThrowawayId>, snapshot: &SnapshotBridge) -> TupleIter {
if use_delta.contains(&self.storage.id) { if use_delta.contains(&self.storage.id) {
Box::new( Box::new(
self.storage self.storage
.scan_all() .scan_all_with_snapshot(snapshot)
.filter_map_ok(move |(t, stored_epoch)| { .filter_map_ok(move |(t, stored_epoch)| {
if let Some(stored_epoch) = stored_epoch { if let Some(stored_epoch) = stored_epoch {
if epoch > stored_epoch + 1 { if epoch > stored_epoch + 1 {
@ -524,7 +527,7 @@ impl StoredDerivedRelation {
}), }),
) )
} else { } else {
Box::new(self.storage.scan_all().map_ok(|(t, _)| t)) Box::new(self.storage.scan_all_with_snapshot(snapshot).map_ok(|(t, _)| t))
} }
} }
fn join_is_prefix(&self, right_join_indices: &[usize]) -> bool { fn join_is_prefix(&self, right_join_indices: &[usize]) -> bool {
@ -540,6 +543,7 @@ impl StoredDerivedRelation {
eliminate_indices: BTreeSet<usize>, eliminate_indices: BTreeSet<usize>,
epoch: u32, epoch: u32,
use_delta: &BTreeSet<ThrowawayId>, use_delta: &BTreeSet<ThrowawayId>,
snapshot: &'a SnapshotBridge,
) -> TupleIter<'a> { ) -> TupleIter<'a> {
let mut right_invert_indices = right_join_indices.iter().enumerate().collect_vec(); let mut right_invert_indices = right_join_indices.iter().enumerate().collect_vec();
right_invert_indices.sort_by_key(|(_, b)| **b); right_invert_indices.sort_by_key(|(_, b)| **b);
@ -559,7 +563,7 @@ impl StoredDerivedRelation {
.collect_vec(), .collect_vec(),
); );
self.storage self.storage
.scan_prefix(&prefix) .scan_prefix_with_snapshot(&prefix, snapshot)
.filter_map_ok(move |(found, meta)| { .filter_map_ok(move |(found, meta)| {
if let Some(stored_epoch) = meta { if let Some(stored_epoch) = meta {
if epoch != stored_epoch + 1 { if epoch != stored_epoch + 1 {
@ -601,10 +605,11 @@ impl StoredDerivedRelation {
.collect_vec(), .collect_vec(),
); );
self.storage self.storage
.scan_prefix(&prefix) .scan_prefix_with_snapshot(&prefix, snapshot)
.filter_map_ok(move |(found, meta)| { .filter_map_ok(move |(found, meta)| {
if let Some(stored_epoch) = meta { if let Some(stored_epoch) = meta {
if epoch == stored_epoch { if epoch == stored_epoch {
eprintln!("warning: read fresh data");
return None; return None;
} }
} }
@ -741,6 +746,7 @@ impl Relation {
tx: &'a SessionTx, tx: &'a SessionTx,
epoch: u32, epoch: u32,
use_delta: &BTreeSet<ThrowawayId>, use_delta: &BTreeSet<ThrowawayId>,
snapshot: &'a SnapshotBridge,
) -> TupleIter<'a> { ) -> TupleIter<'a> {
match self { match self {
Relation::Fixed(f) => Box::new(f.data.iter().map(|t| Ok(Tuple(t.clone())))), Relation::Fixed(f) => Box::new(f.data.iter().map(|t| Ok(Tuple(t.clone())))),
@ -748,9 +754,9 @@ impl Relation {
tx.triple_a_before_scan(r.attr.id, r.vld) tx.triple_a_before_scan(r.attr.id, r.vld)
.map_ok(|(_, e_id, y)| Tuple(vec![DataValue::EnId(e_id), y])), .map_ok(|(_, e_id, y)| Tuple(vec![DataValue::EnId(e_id), y])),
), ),
Relation::Derived(r) => r.iter(epoch, use_delta), Relation::Derived(r) => r.iter(epoch, use_delta, snapshot),
Relation::Join(j) => j.iter(tx, epoch, use_delta), Relation::Join(j) => j.iter(tx, epoch, use_delta, snapshot),
Relation::Reorder(r) => r.iter(tx, epoch, use_delta), Relation::Reorder(r) => r.iter(tx, epoch, use_delta, snapshot),
} }
} }
} }
@ -782,6 +788,7 @@ impl InnerJoin {
tx: &'a SessionTx, tx: &'a SessionTx,
epoch: u32, epoch: u32,
use_delta: &BTreeSet<ThrowawayId>, use_delta: &BTreeSet<ThrowawayId>,
snapshot: &'a SnapshotBridge,
) -> TupleIter<'a> { ) -> TupleIter<'a> {
let bindings = self.bindings(); let bindings = self.bindings();
let eliminate_indices = get_eliminate_indices(&bindings, &self.to_eliminate); let eliminate_indices = get_eliminate_indices(&bindings, &self.to_eliminate);
@ -792,7 +799,7 @@ impl InnerJoin {
.join_indices(&self.left.bindings(), &self.right.bindings()) .join_indices(&self.left.bindings(), &self.right.bindings())
.unwrap(); .unwrap();
f.join( f.join(
self.left.iter(tx, epoch, use_delta), self.left.iter(tx, epoch, use_delta, snapshot),
join_indices, join_indices,
eliminate_indices, eliminate_indices,
) )
@ -803,7 +810,7 @@ impl InnerJoin {
.join_indices(&self.left.bindings(), &self.right.bindings()) .join_indices(&self.left.bindings(), &self.right.bindings())
.unwrap(); .unwrap();
r.join( r.join(
self.left.iter(tx, epoch, use_delta), self.left.iter(tx, epoch, use_delta, snapshot),
join_indices, join_indices,
tx, tx,
eliminate_indices, eliminate_indices,
@ -816,17 +823,20 @@ impl InnerJoin {
.unwrap(); .unwrap();
if r.join_is_prefix(&join_indices.1) { if r.join_is_prefix(&join_indices.1) {
r.prefix_join( r.prefix_join(
self.left.iter(tx, epoch, use_delta), self.left.iter(tx, epoch, use_delta, snapshot),
join_indices, join_indices,
eliminate_indices, eliminate_indices,
epoch, epoch,
use_delta, use_delta,
snapshot
) )
} else { } else {
self.materialized_join(tx, eliminate_indices, epoch, use_delta) self.materialized_join(tx, eliminate_indices, epoch, use_delta, snapshot)
}
} }
Relation::Join(_) => {
self.materialized_join(tx, eliminate_indices, epoch, use_delta, snapshot)
} }
Relation::Join(_) => self.materialized_join(tx, eliminate_indices, epoch, use_delta),
Relation::Reorder(_) => { Relation::Reorder(_) => {
panic!("joining on reordered") panic!("joining on reordered")
} }
@ -838,6 +848,7 @@ impl InnerJoin {
eliminate_indices: BTreeSet<usize>, eliminate_indices: BTreeSet<usize>,
epoch: u32, epoch: u32,
use_delta: &BTreeSet<ThrowawayId>, use_delta: &BTreeSet<ThrowawayId>,
snapshot: &'a SnapshotBridge,
) -> TupleIter<'a> { ) -> TupleIter<'a> {
let right_bindings = self.right.bindings(); let right_bindings = self.right.bindings();
let (left_join_indices, right_join_indices) = self let (left_join_indices, right_join_indices) = self
@ -858,7 +869,7 @@ impl InnerJoin {
.map(|(a, _)| a) .map(|(a, _)| a)
.collect_vec(); .collect_vec();
let throwaway = tx.new_throwaway(); let throwaway = tx.new_throwaway();
for item in self.right.iter(tx, epoch, use_delta) { for item in self.right.iter(tx, epoch, use_delta, snapshot) {
match item { match item {
Ok(tuple) => { Ok(tuple) => {
let stored_tuple = Tuple( let stored_tuple = Tuple(
@ -876,7 +887,7 @@ impl InnerJoin {
} }
Box::new( Box::new(
self.left self.left
.iter(tx, epoch, use_delta) .iter(tx, epoch, use_delta, snapshot)
.map_ok(move |tuple| { .map_ok(move |tuple| {
let eliminate_indices = eliminate_indices.clone(); let eliminate_indices = eliminate_indices.clone();
let prefix = Tuple( let prefix = Tuple(

@ -1,6 +1,6 @@
use std::fmt::{Debug, Formatter}; use std::fmt::{Debug, Formatter};
use cozorocks::{DbIter, PinSlice, RawRocksDb, RocksDbStatus}; use cozorocks::{DbIter, PinSlice, RawRocksDb, RocksDbStatus, SnapshotBridge};
use crate::data::tuple::{EncodedTuple, Tuple}; use crate::data::tuple::{EncodedTuple, Tuple};
use crate::data::value::DataValue; use crate::data::value::DataValue;
@ -57,6 +57,17 @@ impl ThrowawayArea {
it.seek(&lower); it.seek(&lower);
ThrowawayIter { it, started: false } ThrowawayIter { it, started: false }
} }
pub fn scan_all_with_snapshot(&self, snapshot: &SnapshotBridge) -> impl Iterator<Item = anyhow::Result<(Tuple, Option<u32>)>> {
let (lower, upper) = EncodedTuple::bounds_for_prefix(self.id.0);
let mut it = self
.db
.iterator_with_snapshot(snapshot)
.upper_bound(&upper)
.prefix_same_as_start(true)
.start();
it.seek(&lower);
ThrowawayIter { it, started: false }
}
pub(crate) fn scan_prefix( pub(crate) fn scan_prefix(
&self, &self,
prefix: &Tuple, prefix: &Tuple,
@ -75,6 +86,25 @@ impl ThrowawayArea {
it.seek(&lower); it.seek(&lower);
ThrowawayIter { it, started: false } ThrowawayIter { it, started: false }
} }
pub(crate) fn scan_prefix_with_snapshot(
&self,
prefix: &Tuple,
snapshot: &SnapshotBridge
) -> impl Iterator<Item = anyhow::Result<(Tuple, Option<u32>)>> {
let mut upper = prefix.0.clone();
upper.push(DataValue::Bottom);
let upper = Tuple(upper);
let upper = upper.encode_as_key(self.id);
let lower = prefix.encode_as_key(self.id);
let mut it = self
.db
.iterator_with_snapshot(snapshot)
.upper_bound(&upper)
.prefix_same_as_start(true)
.start();
it.seek(&lower);
ThrowawayIter { it, started: false }
}
} }
struct ThrowawayIter { struct ThrowawayIter {

Loading…
Cancel
Save