stored relation

main
Ziyang Hu 2 years ago
parent e4c09a5d8e
commit fc69de69b8

@ -11,7 +11,9 @@ use crate::data::keyword::Keyword;
use crate::data::value::DataValue; use crate::data::value::DataValue;
use crate::preprocess::triple::TxError; use crate::preprocess::triple::TxError;
use crate::runtime::transact::SessionTx; use crate::runtime::transact::SessionTx;
use crate::transact::query::{InlineFixedRelation, InnerJoin, Joiner, Relation, TripleRelation}; use crate::transact::query::{
InlineFixedRelation, InnerJoin, Joiner, Relation, StoredDerivedRelation, TripleRelation,
};
use crate::transact::throwaway::ThrowawayArea; use crate::transact::throwaway::ThrowawayArea;
use crate::{EntityId, Validity}; use crate::{EntityId, Validity};
@ -42,6 +44,8 @@ pub enum QueryProcError {
UnexpectedForm(JsonValue, String), UnexpectedForm(JsonValue, String),
#[error("arity mismatch for rule {0}: all definitions must have the same arity")] #[error("arity mismatch for rule {0}: all definitions must have the same arity")]
ArityMismatch(Keyword), ArityMismatch(Keyword),
#[error("encountered undefined rule {0}")]
UndefinedRule(Keyword),
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -98,7 +102,6 @@ pub enum Atom {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct RuleSet { pub struct RuleSet {
pub(crate) storage: Option<ThrowawayArea>,
pub(crate) sets: Vec<Rule>, pub(crate) sets: Vec<Rule>,
pub(crate) arity: usize, pub(crate) arity: usize,
} }
@ -117,6 +120,8 @@ impl RuleSet {
} }
} }
pub(crate) type DatalogProgram = BTreeMap<Keyword, RuleSet>;
#[derive(Clone, Debug, Default)] #[derive(Clone, Debug, Default)]
pub enum Aggregation { pub enum Aggregation {
#[default] #[default]
@ -131,11 +136,53 @@ pub(crate) struct Rule {
} }
impl SessionTx { impl SessionTx {
fn semi_naive_evaluate(&mut self, prog: &DatalogProgram) -> Result<()> {
let stores = prog
.iter()
.map(|(k, s)| (k.clone(), (self.new_throwaway(), s.arity)))
.collect::<BTreeMap<_, _>>();
let compiled: BTreeMap<_, _> = prog
.iter()
.map(
|(k, body)| -> Result<(Keyword, Vec<(Vec<(Keyword, Aggregation)>, Relation)>)> {
let mut collected = Vec::with_capacity(body.sets.len());
for rule in &body.sets {
let relation = self.compile_rule_body(&rule.body, rule.vld, &stores)?;
collected.push((rule.head.clone(), relation));
}
Ok((k.clone(), collected))
},
)
.try_collect()?;
for epoch in 0u32.. {
let mut changed = false;
for (k, rules) in compiled.iter() {
let store = stores.get(k).unwrap();
let use_delta = BTreeSet::from([stores.get(k).unwrap().0.get_id()]);
for (head, relation) in rules {
for item_res in relation.iter(self, epoch, &use_delta) {
let item = item_res?;
// check if item exists, if no, update changed
// store item
// todo: variables elimination should be more aggressive
// todo: reorder items at the end
// improvement: the clauses can actually be evaluated in parallel
}
}
}
if !changed {
break;
}
}
// todo: return the throwaway for query!
Ok(())
}
pub fn parse_rule_sets( pub fn parse_rule_sets(
&mut self, &mut self,
payload: &JsonValue, payload: &JsonValue,
default_vld: Validity, default_vld: Validity,
) -> Result<BTreeMap<Keyword, RuleSet>> { ) -> Result<DatalogProgram> {
let rules = payload let rules = payload
.as_array() .as_array()
.ok_or_else(|| { .ok_or_else(|| {
@ -165,14 +212,7 @@ impl SessionTx {
return Err(QueryProcError::ArityMismatch(name).into()); return Err(QueryProcError::ArityMismatch(name).into());
} }
} }
Ok(( Ok((name, RuleSet { sets: rules, arity }))
name,
RuleSet {
storage: None,
sets: rules,
arity,
},
))
}) })
.try_collect() .try_collect()
} }
@ -299,7 +339,12 @@ impl SessionTx {
}, },
)) ))
} }
pub fn compile_rule_body(&mut self, clauses: Vec<Atom>, vld: Validity) -> Result<Relation> { fn compile_rule_body(
&mut self,
clauses: &[Atom],
vld: Validity,
stores: &BTreeMap<Keyword, (ThrowawayArea, usize)>,
) -> Result<Relation> {
let mut ret = Relation::unit(); let mut ret = Relation::unit();
let mut seen_variables = BTreeSet::new(); let mut seen_variables = BTreeSet::new();
let mut id_serial = 0; let mut id_serial = 0;
@ -311,13 +356,13 @@ impl SessionTx {
}; };
for clause in clauses { for clause in clauses {
match clause { match clause {
Atom::AttrTriple(a_triple) => match (a_triple.entity, a_triple.value) { Atom::AttrTriple(a_triple) => match (&a_triple.entity, &a_triple.value) {
(Term::Const(eid), Term::Var(v_kw)) => { (Term::Const(eid), Term::Var(v_kw)) => {
let temp_join_key_left = next_ignored_kw(); let temp_join_key_left = next_ignored_kw();
let temp_join_key_right = next_ignored_kw(); let temp_join_key_right = next_ignored_kw();
let const_rel = Relation::Fixed(InlineFixedRelation { let const_rel = Relation::Fixed(InlineFixedRelation {
bindings: vec![temp_join_key_left.clone()], bindings: vec![temp_join_key_left.clone()],
data: vec![vec![DataValue::EnId(eid)]], data: vec![vec![DataValue::EnId(*eid)]],
to_eliminate: Default::default(), to_eliminate: Default::default(),
}); });
if ret.is_unit() { if ret.is_unit() {
@ -338,19 +383,19 @@ impl SessionTx {
let mut join_right_keys = vec![temp_join_key_right.clone()]; let mut join_right_keys = vec![temp_join_key_right.clone()];
let v_kw = { let v_kw = {
if seen_variables.contains(&v_kw) { if seen_variables.contains(v_kw) {
let ret = next_ignored_kw(); let ret = next_ignored_kw();
// to_eliminate.insert(ret.clone()); // to_eliminate.insert(ret.clone());
join_left_keys.push(v_kw); join_left_keys.push(v_kw.clone());
join_right_keys.push(ret.clone()); join_right_keys.push(ret.clone());
ret ret
} else { } else {
seen_variables.insert(v_kw.clone()); seen_variables.insert(v_kw.clone());
v_kw v_kw.clone()
} }
}; };
let right = Relation::Triple(TripleRelation { let right = Relation::Triple(TripleRelation {
attr: a_triple.attr, attr: a_triple.attr.clone(),
vld, vld,
bindings: [temp_join_key_right, v_kw], bindings: [temp_join_key_right, v_kw],
}); });
@ -369,7 +414,7 @@ impl SessionTx {
let temp_join_key_right = next_ignored_kw(); let temp_join_key_right = next_ignored_kw();
let const_rel = Relation::Fixed(InlineFixedRelation { let const_rel = Relation::Fixed(InlineFixedRelation {
bindings: vec![temp_join_key_left.clone()], bindings: vec![temp_join_key_left.clone()],
data: vec![vec![val]], data: vec![vec![val.clone()]],
to_eliminate: Default::default(), to_eliminate: Default::default(),
}); });
if ret.is_unit() { if ret.is_unit() {
@ -392,16 +437,16 @@ impl SessionTx {
let e_kw = { let e_kw = {
if seen_variables.contains(&e_kw) { if seen_variables.contains(&e_kw) {
let ret = next_ignored_kw(); let ret = next_ignored_kw();
join_left_keys.push(e_kw); join_left_keys.push(e_kw.clone());
join_right_keys.push(ret.clone()); join_right_keys.push(ret.clone());
ret ret
} else { } else {
seen_variables.insert(e_kw.clone()); seen_variables.insert(e_kw.clone());
e_kw e_kw.clone()
} }
}; };
let right = Relation::Triple(TripleRelation { let right = Relation::Triple(TripleRelation {
attr: a_triple.attr, attr: a_triple.attr.clone(),
vld, vld,
bindings: [e_kw, temp_join_key_right], bindings: [e_kw, temp_join_key_right],
}); });
@ -424,27 +469,27 @@ impl SessionTx {
let e_kw = { let e_kw = {
if seen_variables.contains(&e_kw) { if seen_variables.contains(&e_kw) {
let ret = next_ignored_kw(); let ret = next_ignored_kw();
join_left_keys.push(e_kw); join_left_keys.push(e_kw.clone());
join_right_keys.push(ret.clone()); join_right_keys.push(ret.clone());
ret ret
} else { } else {
seen_variables.insert(e_kw.clone()); seen_variables.insert(e_kw.clone());
e_kw e_kw.clone()
} }
}; };
let v_kw = { let v_kw = {
if seen_variables.contains(&v_kw) { if seen_variables.contains(v_kw) {
let ret = next_ignored_kw(); let ret = next_ignored_kw();
join_left_keys.push(v_kw); join_left_keys.push(v_kw.clone());
join_right_keys.push(ret.clone()); join_right_keys.push(ret.clone());
ret ret
} else { } else {
seen_variables.insert(v_kw.clone()); seen_variables.insert(v_kw.clone());
v_kw v_kw.clone()
} }
}; };
let right = Relation::Triple(TripleRelation { let right = Relation::Triple(TripleRelation {
attr: a_triple.attr, attr: a_triple.attr.clone(),
vld, vld,
bindings: [e_kw, v_kw], bindings: [e_kw, v_kw],
}); });
@ -466,7 +511,7 @@ impl SessionTx {
let (left_var_1, left_var_2) = (next_ignored_kw(), next_ignored_kw()); let (left_var_1, left_var_2) = (next_ignored_kw(), next_ignored_kw());
let const_rel = Relation::Fixed(InlineFixedRelation { let const_rel = Relation::Fixed(InlineFixedRelation {
bindings: vec![left_var_1.clone(), left_var_2.clone()], bindings: vec![left_var_1.clone(), left_var_2.clone()],
data: vec![vec![DataValue::EnId(eid), val]], data: vec![vec![DataValue::EnId(*eid), val.clone()]],
to_eliminate: Default::default(), to_eliminate: Default::default(),
}); });
if ret.is_unit() { if ret.is_unit() {
@ -485,7 +530,7 @@ impl SessionTx {
let (right_var_1, right_var_2) = (next_ignored_kw(), next_ignored_kw()); let (right_var_1, right_var_2) = (next_ignored_kw(), next_ignored_kw());
let right = Relation::Triple(TripleRelation { let right = Relation::Triple(TripleRelation {
attr: a_triple.attr, attr: a_triple.attr.clone(),
vld, vld,
bindings: [right_var_1.clone(), right_var_2.clone()], bindings: [right_var_1.clone(), right_var_2.clone()],
}); });
@ -501,7 +546,63 @@ impl SessionTx {
} }
}, },
Atom::Rule(rule_app) => { Atom::Rule(rule_app) => {
todo!() let (store, arity) = stores
.get(&rule_app.name)
.ok_or_else(|| QueryProcError::UndefinedRule(rule_app.name.clone()))?
.clone();
if arity != rule_app.args.len() {
return Err(QueryProcError::ArityMismatch(rule_app.name.clone()).into());
}
let mut left_joiner_vals = vec![];
let mut left_joiner_vars = vec![];
let mut right_joiner_vars = vec![];
let mut right_vars = vec![];
for term in &rule_app.args {
match term {
Term::Var(var) => {
right_vars.push(var.clone());
}
Term::Const(constant) => {
left_joiner_vals.push(constant.clone());
left_joiner_vars.push(next_ignored_kw());
right_joiner_vars.push(next_ignored_kw());
right_vars.push(next_ignored_kw());
}
}
}
if !left_joiner_vars.is_empty() {
let const_joiner = Relation::Fixed(InlineFixedRelation {
bindings: left_joiner_vars.clone(),
data: vec![left_joiner_vals],
to_eliminate: Default::default(),
});
ret = Relation::Join(Box::new(InnerJoin {
left: ret,
right: const_joiner,
joiner: Joiner {
left_keys: vec![],
right_keys: vec![],
},
to_eliminate: Default::default(),
}))
}
let right = Relation::Derived(StoredDerivedRelation {
bindings: right_vars,
storage: store,
});
ret = Relation::Join(Box::new(InnerJoin {
left: ret,
right,
joiner: Joiner {
left_keys: left_joiner_vars,
right_keys: right_joiner_vars,
},
to_eliminate: Default::default(),
}))
} }
Atom::Predicate(_) => { Atom::Predicate(_) => {
todo!() todo!()

@ -17,7 +17,7 @@ use crate::data::encode::{
use crate::data::id::{AttrId, EntityId, TxId, Validity}; use crate::data::id::{AttrId, EntityId, TxId, Validity};
use crate::data::keyword::Keyword; use crate::data::keyword::Keyword;
use crate::data::value::DataValue; use crate::data::value::DataValue;
use crate::transact::throwaway::ThrowawayArea; use crate::transact::throwaway::{ThrowawayArea, ThrowawayId};
pub struct SessionTx { pub struct SessionTx {
pub(crate) tx: Tx, pub(crate) tx: Tx,
@ -70,7 +70,7 @@ impl SessionTx {
let old_count = self.throwaway_count.fetch_add(1, Ordering::AcqRel); let old_count = self.throwaway_count.fetch_add(1, Ordering::AcqRel);
ThrowawayArea { ThrowawayArea {
db: self.throwaway.clone(), db: self.throwaway.clone(),
prefix: old_count, id: ThrowawayId(old_count),
} }
} }

@ -8,7 +8,7 @@ use crate::data::keyword::Keyword;
use crate::data::tuple::{Tuple, TupleIter}; use crate::data::tuple::{Tuple, TupleIter};
use crate::data::value::DataValue; use crate::data::value::DataValue;
use crate::runtime::transact::SessionTx; use crate::runtime::transact::SessionTx;
use crate::transact::throwaway::ThrowawayArea; use crate::transact::throwaway::{ThrowawayArea, ThrowawayId};
use crate::Validity; use crate::Validity;
#[derive(Debug)] #[derive(Debug)]
@ -456,13 +456,28 @@ fn get_eliminate_indices(bindings: &[Keyword], eliminate: &BTreeSet<Keyword>) ->
#[derive(Debug)] #[derive(Debug)]
pub struct StoredDerivedRelation { pub struct StoredDerivedRelation {
bindings: Vec<Keyword>, pub(crate) bindings: Vec<Keyword>,
storage: ThrowawayArea, pub(crate) storage: ThrowawayArea,
} }
impl StoredDerivedRelation { impl StoredDerivedRelation {
fn iter(&self) -> TupleIter { fn iter(&self, epoch: u32, use_delta: &BTreeSet<ThrowawayId>) -> TupleIter {
Box::new(self.storage.scan_all().map_ok(|(t, _)| t)) if use_delta.contains(&self.storage.get_id()) {
Box::new(
self.storage
.scan_all()
.filter_map_ok(move |(t, stored_epoch)| {
if let Some(stored_epoch) = stored_epoch {
if epoch > stored_epoch + 1 {
return None;
}
}
Some(t)
}),
)
} else {
Box::new(self.storage.scan_all().map_ok(|(t, _)| t))
}
} }
fn join_is_prefix(&self, right_join_indices: &[usize]) -> bool { fn join_is_prefix(&self, right_join_indices: &[usize]) -> bool {
let mut indices = right_join_indices.to_vec(); let mut indices = right_join_indices.to_vec();
@ -475,6 +490,8 @@ impl StoredDerivedRelation {
left_iter: TupleIter<'a>, left_iter: TupleIter<'a>,
(left_join_indices, right_join_indices): (Vec<usize>, Vec<usize>), (left_join_indices, right_join_indices): (Vec<usize>, Vec<usize>),
eliminate_indices: BTreeSet<usize>, eliminate_indices: BTreeSet<usize>,
epoch: u32,
use_delta: &BTreeSet<ThrowawayId>,
) -> TupleIter<'a> { ) -> TupleIter<'a> {
let mut right_invert_indices = right_join_indices.iter().enumerate().collect_vec(); let mut right_invert_indices = right_join_indices.iter().enumerate().collect_vec();
right_invert_indices.sort_by_key(|(_, b)| **b); right_invert_indices.sort_by_key(|(_, b)| **b);
@ -482,39 +499,82 @@ impl StoredDerivedRelation {
.into_iter() .into_iter()
.map(|(a, _)| left_join_indices[a]) .map(|(a, _)| left_join_indices[a])
.collect_vec(); .collect_vec();
Box::new( if use_delta.contains(&self.storage.get_id()) {
left_iter Box::new(
.map_ok(move |tuple| { left_iter
let eliminate_indices = eliminate_indices.clone(); .map_ok(move |tuple| {
let prefix = Tuple( let eliminate_indices = eliminate_indices.clone();
left_to_prefix_indices let prefix = Tuple(
.iter() left_to_prefix_indices
.map(|i| tuple.0[*i].clone()) .iter()
.collect_vec(), .map(|i| tuple.0[*i].clone())
); .collect_vec(),
self.storage.scan_prefix(&prefix).map_ok(move |(found, _)| { );
let mut ret = tuple.0.clone(); self.storage
ret.extend(found.0); .scan_prefix(&prefix)
.filter_map_ok(move |(found, meta)| {
if !eliminate_indices.is_empty() { if let Some(stored_epoch) = meta {
ret = ret if epoch > stored_epoch + 1 {
.into_iter() return None;
.enumerate()
.filter_map(|(i, v)| {
if eliminate_indices.contains(&i) {
None
} else {
Some(v)
} }
}) }
.collect_vec(); let mut ret = tuple.0.clone();
} ret.extend(found.0);
Tuple(ret)
if !eliminate_indices.is_empty() {
ret = ret
.into_iter()
.enumerate()
.filter_map(|(i, v)| {
if eliminate_indices.contains(&i) {
None
} else {
Some(v)
}
})
.collect_vec();
}
Some(Tuple(ret))
})
}) })
}) .flatten_ok()
.flatten_ok() .map(flatten_err),
.map(flatten_err), )
) } else {
Box::new(
left_iter
.map_ok(move |tuple| {
let eliminate_indices = eliminate_indices.clone();
let prefix = Tuple(
left_to_prefix_indices
.iter()
.map(|i| tuple.0[*i].clone())
.collect_vec(),
);
self.storage.scan_prefix(&prefix).map_ok(move |(found, _)| {
let mut ret = tuple.0.clone();
ret.extend(found.0);
if !eliminate_indices.is_empty() {
ret = ret
.into_iter()
.enumerate()
.filter_map(|(i, v)| {
if eliminate_indices.contains(&i) {
None
} else {
Some(v)
}
})
.collect_vec();
}
Tuple(ret)
})
})
.flatten_ok()
.map(flatten_err),
)
}
} }
} }
@ -607,15 +667,20 @@ impl Relation {
Relation::Join(j) => j.bindings(), Relation::Join(j) => j.bindings(),
} }
} }
pub fn iter<'a>(&'a self, tx: &'a SessionTx) -> TupleIter<'a> { pub fn iter<'a>(
&'a self,
tx: &'a SessionTx,
epoch: u32,
use_delta: &BTreeSet<ThrowawayId>,
) -> TupleIter<'a> {
match self { match self {
Relation::Fixed(f) => Box::new(f.data.iter().map(|t| Ok(Tuple(t.clone())))), Relation::Fixed(f) => Box::new(f.data.iter().map(|t| Ok(Tuple(t.clone())))),
Relation::Triple(r) => Box::new( Relation::Triple(r) => Box::new(
tx.triple_a_before_scan(r.attr.id, r.vld) tx.triple_a_before_scan(r.attr.id, r.vld)
.map_ok(|(_, e_id, y)| Tuple(vec![DataValue::EnId(e_id), y])), .map_ok(|(_, e_id, y)| Tuple(vec![DataValue::EnId(e_id), y])),
), ),
Relation::Derived(r) => r.iter(), Relation::Derived(r) => r.iter(epoch, use_delta),
Relation::Join(j) => j.iter(tx), Relation::Join(j) => j.iter(tx, epoch, use_delta),
} }
} }
} }
@ -641,7 +706,12 @@ impl InnerJoin {
ret.extend(self.right.bindings()); ret.extend(self.right.bindings());
ret ret
} }
pub(crate) fn iter<'a>(&'a self, tx: &'a SessionTx) -> TupleIter<'a> { pub(crate) fn iter<'a>(
&'a self,
tx: &'a SessionTx,
epoch: u32,
use_delta: &BTreeSet<ThrowawayId>,
) -> TupleIter<'a> {
let bindings = self.bindings(); let bindings = self.bindings();
let eliminate_indices = get_eliminate_indices(&bindings, &self.to_eliminate); let eliminate_indices = get_eliminate_indices(&bindings, &self.to_eliminate);
match &self.right { match &self.right {
@ -649,31 +719,48 @@ impl InnerJoin {
let join_indices = self let join_indices = self
.joiner .joiner
.join_indices(&self.left.bindings(), &self.right.bindings()); .join_indices(&self.left.bindings(), &self.right.bindings());
f.join(self.left.iter(tx), join_indices, eliminate_indices) f.join(
self.left.iter(tx, epoch, use_delta),
join_indices,
eliminate_indices,
)
} }
Relation::Triple(r) => { Relation::Triple(r) => {
let join_indices = self let join_indices = self
.joiner .joiner
.join_indices(&self.left.bindings(), &self.right.bindings()); .join_indices(&self.left.bindings(), &self.right.bindings());
r.join(self.left.iter(tx), join_indices, tx, eliminate_indices) r.join(
self.left.iter(tx, epoch, use_delta),
join_indices,
tx,
eliminate_indices,
)
} }
Relation::Derived(r) => { Relation::Derived(r) => {
let join_indices = self let join_indices = self
.joiner .joiner
.join_indices(&self.left.bindings(), &self.right.bindings()); .join_indices(&self.left.bindings(), &self.right.bindings());
if r.join_is_prefix(&join_indices.1) { if r.join_is_prefix(&join_indices.1) {
r.prefix_join(self.left.iter(tx), join_indices, eliminate_indices) r.prefix_join(
self.left.iter(tx, epoch, use_delta),
join_indices,
eliminate_indices,
epoch,
use_delta,
)
} else { } else {
self.materialized_join(tx, eliminate_indices) self.materialized_join(tx, eliminate_indices, epoch, use_delta)
} }
} }
Relation::Join(_) => self.materialized_join(tx, eliminate_indices), Relation::Join(_) => self.materialized_join(tx, eliminate_indices, epoch, use_delta),
} }
} }
fn materialized_join<'a>( fn materialized_join<'a>(
&'a self, &'a self,
tx: &'a SessionTx, tx: &'a SessionTx,
eliminate_indices: BTreeSet<usize>, eliminate_indices: BTreeSet<usize>,
epoch: u32,
use_delta: &BTreeSet<ThrowawayId>,
) -> TupleIter<'a> { ) -> TupleIter<'a> {
let right_bindings = self.right.bindings(); let right_bindings = self.right.bindings();
let (left_join_indices, right_join_indices) = self let (left_join_indices, right_join_indices) = self
@ -693,7 +780,7 @@ impl InnerJoin {
.map(|(a, _)| a) .map(|(a, _)| a)
.collect_vec(); .collect_vec();
let mut throwaway = tx.new_throwaway(); let mut throwaway = tx.new_throwaway();
for item in self.right.iter(tx) { for item in self.right.iter(tx, epoch, use_delta) {
match item { match item {
Ok(tuple) => { Ok(tuple) => {
let stored_tuple = Tuple( let stored_tuple = Tuple(
@ -711,7 +798,7 @@ impl InnerJoin {
} }
Box::new( Box::new(
self.left self.left
.iter(tx) .iter(tx, epoch, use_delta)
.map_ok(move |tuple| { .map_ok(move |tuple| {
let eliminate_indices = eliminate_indices.clone(); let eliminate_indices = eliminate_indices.clone();
let prefix = Tuple( let prefix = Tuple(

@ -1,36 +1,43 @@
use std::fmt::{Debug, Formatter}; use std::fmt::{Debug, Formatter};
use cozorocks::{DbIter, PinSlice, RawRocksDb, RocksDbStatus}; use cozorocks::{DbIter, PinSlice, RawRocksDb, RocksDbStatus};
use crate::data::tuple::{EncodedTuple, Tuple}; use crate::data::tuple::{EncodedTuple, Tuple};
use crate::data::value::DataValue; use crate::data::value::DataValue;
#[derive(Copy, Clone, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub struct ThrowawayId(pub(crate) u32);
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct ThrowawayArea { pub(crate) struct ThrowawayArea {
pub(crate) db: RawRocksDb, pub(crate) db: RawRocksDb,
pub(crate) prefix: u32, pub(crate) id: ThrowawayId,
} }
impl Debug for ThrowawayArea { impl Debug for ThrowawayArea {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Throwaway<{}>", self.prefix) write!(f, "Throwaway<{}>", self.id.0)
} }
} }
impl ThrowawayArea { impl ThrowawayArea {
pub(crate) fn get_id(&self) -> ThrowawayId {
todo!()
}
pub(crate) fn put(&mut self, tuple: &Tuple, value: &[u8]) -> Result<(), RocksDbStatus> { pub(crate) fn put(&mut self, tuple: &Tuple, value: &[u8]) -> Result<(), RocksDbStatus> {
let key_encoded = tuple.encode_as_key(self.prefix); let key_encoded = tuple.encode_as_key(self.id.0);
self.db.put(&key_encoded, value) self.db.put(&key_encoded, value)
} }
pub(crate) fn get(&self, tuple: &Tuple) -> Result<Option<PinSlice>, RocksDbStatus> { pub(crate) fn get(&self, tuple: &Tuple) -> Result<Option<PinSlice>, RocksDbStatus> {
let key_encoded = tuple.encode_as_key(self.prefix); let key_encoded = tuple.encode_as_key(self.id.0);
self.db.get(&key_encoded) self.db.get(&key_encoded)
} }
pub(crate) fn del(&mut self, tuple: &Tuple) -> Result<(), RocksDbStatus> { pub(crate) fn del(&mut self, tuple: &Tuple) -> Result<(), RocksDbStatus> {
let key_encoded = tuple.encode_as_key(self.prefix); let key_encoded = tuple.encode_as_key(self.id.0);
self.db.del(&key_encoded) self.db.del(&key_encoded)
} }
pub(crate) fn scan_all(&self) -> impl Iterator<Item = anyhow::Result<(Tuple, Vec<u8>)>> { pub(crate) fn scan_all(&self) -> impl Iterator<Item = anyhow::Result<(Tuple, Option<u32>)>> {
let (lower, upper) = EncodedTuple::bounds_for_prefix(self.prefix); let (lower, upper) = EncodedTuple::bounds_for_prefix(self.id.0);
let mut it = self let mut it = self
.db .db
.iterator() .iterator()
@ -43,12 +50,12 @@ impl ThrowawayArea {
pub(crate) fn scan_prefix( pub(crate) fn scan_prefix(
&self, &self,
prefix: &Tuple, prefix: &Tuple,
) -> impl Iterator<Item = anyhow::Result<(Tuple, Vec<u8>)>> { ) -> impl Iterator<Item = anyhow::Result<(Tuple, Option<u32>)>> {
let mut upper = prefix.0.clone(); let mut upper = prefix.0.clone();
upper.push(DataValue::Null); upper.push(DataValue::Null);
let upper = Tuple(upper); let upper = Tuple(upper);
let upper = upper.encode_as_key(self.prefix); let upper = upper.encode_as_key(self.id.0);
let lower = prefix.encode_as_key(self.prefix); let lower = prefix.encode_as_key(self.id.0);
let mut it = self let mut it = self
.db .db
.iterator() .iterator()
@ -66,7 +73,7 @@ struct ThrowawayIter {
} }
impl Iterator for ThrowawayIter { impl Iterator for ThrowawayIter {
type Item = anyhow::Result<(Tuple, Vec<u8>)>; type Item = anyhow::Result<(Tuple, Option<u32>)>;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
if !self.started { if !self.started {
@ -79,7 +86,16 @@ impl Iterator for ThrowawayIter {
Ok(None) => None, Ok(None) => None,
Ok(Some((k_slice, v_slice))) => match EncodedTuple(k_slice).decode() { Ok(Some((k_slice, v_slice))) => match EncodedTuple(k_slice).decode() {
Err(e) => Some(Err(e)), Err(e) => Some(Err(e)),
Ok(t) => Some(Ok((t, v_slice.to_vec()))), Ok(t) => {
let epoch = if v_slice.is_empty() {
None
} else {
Some(u32::from_be_bytes([
v_slice[0], v_slice[1], v_slice[2], v_slice[3],
]))
};
Some(Ok((t, epoch)))
}
}, },
} }
} }
@ -87,7 +103,7 @@ impl Iterator for ThrowawayIter {
impl Drop for ThrowawayArea { impl Drop for ThrowawayArea {
fn drop(&mut self) { fn drop(&mut self) {
let (lower, upper) = EncodedTuple::bounds_for_prefix(self.prefix); let (lower, upper) = EncodedTuple::bounds_for_prefix(self.id.0);
if let Err(e) = self.db.range_del(&lower, &upper) { if let Err(e) = self.db.range_del(&lower, &upper) {
eprintln!("{}", e); eprintln!("{}", e);
} }

Loading…
Cancel
Save