tempstores have names

main
Ziyang Hu 2 years ago
parent 37b4851622
commit d6dd698ea4

@ -108,8 +108,8 @@ if __name__ == '__main__':
db = insert_data(False) db = insert_data(False)
start_time = time.time() start_time = time.time()
res = db.run([Q(['?c', '?code', '?desc'], res = db.run([Q(['?c', '?code', '?desc'],
T.country.code('?c', 'VN'), Disj(T.country.code('?c', 'CU'),
Unify('?c', 10000239), Unify('?c', 10000239)),
T.country.code('?c', '?code'), T.country.code('?c', '?code'),
T.country.desc('?c', '?desc'))]) T.country.desc('?c', '?desc'))])
end_time = time.time() end_time = time.time()

@ -15,7 +15,7 @@ impl SessionTx {
rule: &MagicRule, rule: &MagicRule,
rule_name: &MagicKeyword, rule_name: &MagicKeyword,
rule_idx: usize, rule_idx: usize,
stores: &BTreeMap<MagicKeyword, (TempStore, usize)>, stores: &BTreeMap<MagicKeyword, TempStore>,
ret_vars: &[Keyword], ret_vars: &[Keyword],
) -> Result<Relation> { ) -> Result<Relation> {
let mut ret = Relation::unit(); let mut ret = Relation::unit();
@ -58,15 +58,15 @@ impl SessionTx {
} }
} }
MagicAtom::Rule(rule_app) => { MagicAtom::Rule(rule_app) => {
let (store, arity) = stores let store = stores
.get(&rule_app.name) .get(&rule_app.name)
.ok_or_else(|| anyhow!("undefined rule {:?} encountered", rule_app.name))? .ok_or_else(|| anyhow!("undefined rule {:?} encountered", rule_app.name))?
.clone(); .clone();
ensure!( ensure!(
arity == rule_app.args.len(), store.key_size == rule_app.args.len(),
"arity mismatch in rule application {:?}, expect {}, found {}", "arity mismatch in rule application {:?}, expect {}, found {}",
rule_app.name, rule_app.name,
arity, store.key_size,
rule_app.args.len() rule_app.args.len()
); );
let mut prev_joiner_vars = vec![]; let mut prev_joiner_vars = vec![];
@ -129,15 +129,15 @@ impl SessionTx {
} }
} }
MagicAtom::NegatedRule(rule_app) => { MagicAtom::NegatedRule(rule_app) => {
let (store, arity) = stores let store = stores
.get(&rule_app.name) .get(&rule_app.name)
.ok_or_else(|| anyhow!("undefined rule encountered: {:?}", rule_app.name))? .ok_or_else(|| anyhow!("undefined rule encountered: {:?}", rule_app.name))?
.clone(); .clone();
ensure!( ensure!(
arity == rule_app.args.len(), store.key_size == rule_app.args.len(),
"arity mismatch for {:?}, expect {}, got {}", "arity mismatch for {:?}, expect {}, got {}",
rule_app.name, rule_app.name,
arity, store.key_size,
rule_app.args.len() rule_app.args.len()
); );

@ -20,14 +20,18 @@ impl SessionTx {
.0 .0
.iter() .iter()
.flat_map(|p| p.prog.iter()) .flat_map(|p| p.prog.iter())
.map(|(k, s)| (k.clone(), (self.new_throwaway(), s[0].head.len()))) .map(|(k, s)| {
(
k.clone(),
(self.new_throwaway(s[0].head.len(), 0, k.clone())),
)
})
.collect::<BTreeMap<_, _>>(); .collect::<BTreeMap<_, _>>();
let ret_area = stores let ret_area = stores
.get(&MagicKeyword::Muggle { .get(&MagicKeyword::Muggle {
inner: PROG_ENTRY.clone(), inner: PROG_ENTRY.clone(),
}) })
.ok_or_else(|| anyhow!("program entry not found in rules"))? .ok_or_else(|| anyhow!("program entry not found in rules"))?
.0
.clone(); .clone();
debug!("evaluate program with {} strata", prog.0.len()); debug!("evaluate program with {} strata", prog.0.len());
@ -40,7 +44,7 @@ impl SessionTx {
fn semi_naive_magic_evaluate( fn semi_naive_magic_evaluate(
&mut self, &mut self,
prog: &MagicProgram, prog: &MagicProgram,
stores: &BTreeMap<MagicKeyword, (TempStore, usize)>, stores: &BTreeMap<MagicKeyword, TempStore>,
) -> Result<()> { ) -> Result<()> {
let compiled: BTreeMap<_, _> = prog let compiled: BTreeMap<_, _> = prog
.prog .prog
@ -78,7 +82,7 @@ impl SessionTx {
debug!("epoch {}", epoch); debug!("epoch {}", epoch);
if epoch == 0 { if epoch == 0 {
for (k, rules) in compiled.iter() { for (k, rules) in compiled.iter() {
let (store, _arity) = stores.get(k).unwrap(); let store = stores.get(k).unwrap();
let use_delta = BTreeSet::default(); let use_delta = BTreeSet::default();
for (rule_n, (_head, _deriving_rules, relation)) in rules.iter().enumerate() { for (rule_n, (_head, _deriving_rules, relation)) in rules.iter().enumerate() {
debug!("initial calculation for rule {:?}.{}", k, rule_n); debug!("initial calculation for rule {:?}.{}", k, rule_n);
@ -97,7 +101,7 @@ impl SessionTx {
} }
for (k, rules) in compiled.iter() { for (k, rules) in compiled.iter() {
let (store, _arity) = stores.get(k).unwrap(); let store = stores.get(k).unwrap();
for (rule_n, (_head, deriving_rules, relation)) in rules.iter().enumerate() { for (rule_n, (_head, deriving_rules, relation)) in rules.iter().enumerate() {
let mut should_do_calculation = false; let mut should_do_calculation = false;
for d_rule in deriving_rules { for d_rule in deriving_rules {
@ -112,7 +116,7 @@ impl SessionTx {
// debug!("skip {}.{}", k, rule_n); // debug!("skip {}.{}", k, rule_n);
continue; continue;
} }
for (delta_key, (delta_store, _)) in stores.iter() { for (delta_key, delta_store) in stores.iter() {
if !deriving_rules.contains(delta_key) { if !deriving_rules.contains(delta_key) {
continue; continue;
} }

@ -196,7 +196,7 @@ impl Debug for Relation {
Relation::Derived(r) => f Relation::Derived(r) => f
.debug_tuple("Derived") .debug_tuple("Derived")
.field(&bindings) .field(&bindings)
.field(&r.storage.id) .field(&r.storage.rule_name)
.finish(), .finish(),
Relation::Join(r) => { Relation::Join(r) => {
if r.left.is_unit() { if r.left.is_unit() {
@ -231,9 +231,9 @@ impl Debug for Relation {
Relation::Unification(r) => f Relation::Unification(r) => f
.debug_tuple("Unify") .debug_tuple("Unify")
.field(&bindings) .field(&bindings)
.field(&r.parent)
.field(&r.binding) .field(&r.binding)
.field(&r.expr) .field(&r.expr)
.field(&r.parent)
.finish(), .finish(),
} }
} }
@ -989,7 +989,7 @@ impl TripleRelation {
eliminate_indices: BTreeSet<usize>, eliminate_indices: BTreeSet<usize>,
) -> TupleIter<'a> { ) -> TupleIter<'a> {
// [f, b] where b is not indexed // [f, b] where b is not indexed
let throwaway = tx.new_throwaway(); let throwaway = tx.temp_area();
for item in tx.triple_a_before_scan(self.attr.id, self.vld) { for item in tx.triple_a_before_scan(self.attr.id, self.vld) {
match item { match item {
Err(e) => return Box::new([Err(e)].into_iter()), Err(e) => return Box::new([Err(e)].into_iter()),
@ -1528,7 +1528,7 @@ impl InnerJoin {
.sorted_by_key(|(_, b)| **b) .sorted_by_key(|(_, b)| **b)
.map(|(a, _)| a) .map(|(a, _)| a)
.collect_vec(); .collect_vec();
let throwaway = tx.new_throwaway(); let throwaway = tx.temp_area();
for item in self.right.iter(tx, epoch, use_delta) { for item in self.right.iter(tx, epoch, use_delta) {
match item { match item {
Ok(tuple) => { Ok(tuple) => {

@ -1,9 +1,10 @@
use std::fmt::{Debug, Formatter}; use std::fmt::{Debug, Formatter};
use log::{error}; use log::error;
use cozorocks::{DbIter, RawRocksDb, RocksDbStatus}; use cozorocks::{DbIter, RawRocksDb, RocksDbStatus};
use crate::data::program::MagicKeyword;
use crate::data::tuple::{EncodedTuple, Tuple}; use crate::data::tuple::{EncodedTuple, Tuple};
use crate::data::value::DataValue; use crate::data::value::DataValue;
@ -20,6 +21,9 @@ impl Debug for TempStoreId {
pub(crate) struct TempStore { pub(crate) struct TempStore {
pub(crate) db: RawRocksDb, pub(crate) db: RawRocksDb,
pub(crate) id: TempStoreId, pub(crate) id: TempStoreId,
pub(crate) key_size: usize,
pub(crate) val_size: usize,
pub(crate) rule_name: MagicKeyword,
} }
impl Debug for TempStore { impl Debug for TempStore {
@ -37,13 +41,13 @@ impl TempStore {
let key_encoded = tuple.encode_as_key_for_epoch(self.id, epoch); let key_encoded = tuple.encode_as_key_for_epoch(self.id, epoch);
self.db.exists(&key_encoded) self.db.exists(&key_encoded)
} }
pub(crate) fn scan_all(&self) -> impl Iterator<Item=anyhow::Result<Tuple>> { pub(crate) fn scan_all(&self) -> impl Iterator<Item = anyhow::Result<Tuple>> {
self.scan_all_for_epoch(0) self.scan_all_for_epoch(0)
} }
pub(crate) fn scan_all_for_epoch( pub(crate) fn scan_all_for_epoch(
&self, &self,
epoch: u32, epoch: u32,
) -> impl Iterator<Item=anyhow::Result<Tuple>> { ) -> impl Iterator<Item = anyhow::Result<Tuple>> {
let (lower, upper) = EncodedTuple::bounds_for_prefix_and_epoch(self.id, epoch); let (lower, upper) = EncodedTuple::bounds_for_prefix_and_epoch(self.id, epoch);
let mut it = self let mut it = self
.db .db
@ -57,14 +61,14 @@ impl TempStore {
pub(crate) fn scan_prefix( pub(crate) fn scan_prefix(
&self, &self,
prefix: &Tuple, prefix: &Tuple,
) -> impl Iterator<Item=anyhow::Result<Tuple>> { ) -> impl Iterator<Item = anyhow::Result<Tuple>> {
self.scan_prefix_for_epoch(prefix, 0) self.scan_prefix_for_epoch(prefix, 0)
} }
pub(crate) fn scan_prefix_for_epoch( pub(crate) fn scan_prefix_for_epoch(
&self, &self,
prefix: &Tuple, prefix: &Tuple,
epoch: u32, epoch: u32,
) -> impl Iterator<Item=anyhow::Result<Tuple>> { ) -> impl Iterator<Item = anyhow::Result<Tuple>> {
let mut upper = prefix.0.clone(); let mut upper = prefix.0.clone();
upper.push(DataValue::Bottom); upper.push(DataValue::Bottom);
let upper = Tuple(upper); let upper = Tuple(upper);

@ -16,6 +16,7 @@ use crate::data::encode::{
}; };
use crate::data::id::{AttrId, EntityId, TxId, Validity}; use crate::data::id::{AttrId, EntityId, TxId, Validity};
use crate::data::keyword::Keyword; use crate::data::keyword::Keyword;
use crate::data::program::MagicKeyword;
use crate::data::value::DataValue; use crate::data::value::DataValue;
use crate::runtime::temp_store::{TempStore, TempStoreId}; use crate::runtime::temp_store::{TempStore, TempStoreId};
@ -66,12 +67,34 @@ impl TxLog {
} }
impl SessionTx { impl SessionTx {
pub(crate) fn new_throwaway(&self) -> TempStore { pub(crate) fn new_throwaway(
&self,
key_size: usize,
val_size: usize,
rule_name: MagicKeyword,
) -> TempStore {
let old_count = self.temp_store_id.fetch_add(1, Ordering::AcqRel); let old_count = self.temp_store_id.fetch_add(1, Ordering::AcqRel);
let old_count = old_count & 0x00ff_ffffu32; let old_count = old_count & 0x00ff_ffffu32;
TempStore { TempStore {
db: self.temp_store.clone(), db: self.temp_store.clone(),
id: TempStoreId(old_count), id: TempStoreId(old_count),
key_size,
val_size,
rule_name,
}
}
pub(crate) fn temp_area(&self) -> TempStore {
let old_count = self.temp_store_id.fetch_add(1, Ordering::AcqRel);
let old_count = old_count & 0x00ff_ffffu32;
TempStore {
db: self.temp_store.clone(),
id: TempStoreId(old_count),
key_size: 0,
val_size: 0,
rule_name: MagicKeyword::Muggle {
inner: Keyword::from(""),
},
} }
} }

Loading…
Cancel
Save