semi-naive evaluation

main
Ziyang Hu 2 years ago
parent 434a80dc5f
commit 00fb6b9204

@ -180,7 +180,7 @@ impl RawRocksDb {
.auto_prefix_mode(true)
}
#[inline]
pub fn put(&mut self, key: &[u8], val: &[u8]) -> Result<(), RocksDbStatus> {
pub fn put(&self, key: &[u8], val: &[u8]) -> Result<(), RocksDbStatus> {
let mut status = RocksDbStatus::default();
self.inner.put(key, val, &mut status);
if status.is_ok() {
@ -190,7 +190,7 @@ impl RawRocksDb {
}
}
#[inline]
pub fn del(&mut self, key: &[u8]) -> Result<(), RocksDbStatus> {
pub fn del(&self, key: &[u8]) -> Result<(), RocksDbStatus> {
let mut status = RocksDbStatus::default();
self.inner.del(key, &mut status);
if status.is_ok() {

@ -8,6 +8,7 @@ use serde::Serialize;
use crate::data::json::JsonValue;
use crate::data::value::DataValue;
use crate::transact::throwaway::ThrowawayId;
pub(crate) const SCRATCH_DB_KEY_PREFIX_LEN: usize = 4;
@ -39,10 +40,10 @@ impl Tuple {
pub(crate) fn arity(&self) -> usize {
self.0.len()
}
pub(crate) fn encode_as_key(&self, prefix: u32) -> Vec<u8> {
pub(crate) fn encode_as_key(&self, prefix: ThrowawayId) -> Vec<u8> {
let len = self.arity();
let mut ret = Vec::with_capacity(4 + 4 * len + 10 * len);
ret.extend(prefix.to_be_bytes());
ret.extend(prefix.0.to_be_bytes());
ret.extend((len as u32).to_be_bytes());
ret.resize(4 * (len + 1), 0);
for (idx, val) in self.0.iter().enumerate() {
@ -203,6 +204,7 @@ mod tests {
use crate::data::tuple::{EncodedTuple, Tuple};
use crate::data::value::DataValue;
use crate::transact::throwaway::ThrowawayId;
#[test]
fn test_serde() {
@ -212,7 +214,7 @@ mod tests {
json!("my_name_is").into(),
];
let val = Tuple(val);
let encoded = val.encode_as_key(123);
let encoded = val.encode_as_key(ThrowawayId(123));
println!("{:x?}", encoded);
let encoded_tuple: EncodedTuple = (&encoded as &[u8]).into();
println!("{:?}", encoded_tuple.prefix());

@ -9,10 +9,14 @@ use serde_json::Map;
use crate::data::attr::Attribute;
use crate::data::json::JsonValue;
use crate::data::keyword::Keyword;
use crate::data::tuple::TupleIter;
use crate::data::value::DataValue;
use crate::preprocess::triple::TxError;
use crate::runtime::transact::SessionTx;
use crate::transact::query::{InlineFixedRelation, InnerJoin, Joiner, Relation, ReorderRelation, StoredDerivedRelation, TripleRelation};
use crate::transact::query::{
InlineFixedRelation, InnerJoin, Joiner, Relation, ReorderRelation, StoredDerivedRelation,
TripleRelation,
};
use crate::transact::throwaway::ThrowawayArea;
use crate::{EntityId, Validity};
@ -49,6 +53,8 @@ pub enum QueryProcError {
UnsafeUnboundVars(BTreeSet<Keyword>),
#[error("program logic error: {0}")]
LogicError(String),
#[error("entry not found: expect a rule named '?'")]
EntryNotFound,
}
#[derive(Clone, Debug)]
@ -139,11 +145,16 @@ pub(crate) struct Rule {
}
impl SessionTx {
fn semi_naive_evaluate(&mut self, prog: &DatalogProgram) -> Result<()> {
fn semi_naive_evaluate(&mut self, prog: &DatalogProgram) -> Result<ThrowawayArea> {
let stores = prog
.iter()
.map(|(k, s)| (k.clone(), (self.new_throwaway(), s.arity)))
.collect::<BTreeMap<_, _>>();
let ret_area = stores
.get(&Keyword::from("?"))
.ok_or(QueryProcError::EntryNotFound)?
.0
.clone();
let compiled: BTreeMap<_, _> = prog
.iter()
.map(
@ -162,16 +173,17 @@ impl SessionTx {
for epoch in 0u32.. {
let mut changed = false;
let epoch_encoded = epoch.to_be_bytes();
for (k, rules) in compiled.iter() {
let store = stores.get(k).unwrap();
let use_delta = BTreeSet::from([stores.get(k).unwrap().0.get_id()]);
for (head, relation) in rules {
let (store, _arity) = stores.get(k).unwrap();
let use_delta = BTreeSet::from([stores.get(k).unwrap().0.id]);
for (_head, relation) in rules {
for item_res in relation.iter(self, epoch, &use_delta) {
let item = item_res?;
// check if item exists, if no, update changed
// store item
// todo: reorder items at the end
// improvement: the clauses can actually be evaluated in parallel
if store.put_if_absent(&item, &epoch_encoded)? {
changed = true;
}
}
}
}
@ -179,8 +191,7 @@ impl SessionTx {
break;
}
}
// todo: return the throwaway for query!
Ok(())
Ok(ret_area)
}
pub fn parse_rule_sets(
&mut self,
@ -641,7 +652,7 @@ impl SessionTx {
if ret_vars != cur_ret_bindings {
ret = Relation::Reorder(ReorderRelation {
relation: Box::new(ret),
new_order: ret_vars.to_vec()
new_order: ret_vars.to_vec(),
})
}

@ -510,7 +510,7 @@ pub struct StoredDerivedRelation {
impl StoredDerivedRelation {
fn iter(&self, epoch: u32, use_delta: &BTreeSet<ThrowawayId>) -> TupleIter {
if use_delta.contains(&self.storage.get_id()) {
if use_delta.contains(&self.storage.id) {
Box::new(
self.storage
.scan_all()
@ -547,7 +547,7 @@ impl StoredDerivedRelation {
.into_iter()
.map(|(a, _)| left_join_indices[a])
.collect_vec();
if use_delta.contains(&self.storage.get_id()) {
if use_delta.contains(&self.storage.id) {
Box::new(
left_iter
.map_ok(move |tuple| {

@ -21,19 +21,29 @@ impl Debug for ThrowawayArea {
}
impl ThrowawayArea {
pub(crate) fn get_id(&self) -> ThrowawayId {
todo!()
}
pub(crate) fn put(&mut self, tuple: &Tuple, value: &[u8]) -> Result<(), RocksDbStatus> {
let key_encoded = tuple.encode_as_key(self.id.0);
pub(crate) fn put(&self, tuple: &Tuple, value: &[u8]) -> Result<(), RocksDbStatus> {
let key_encoded = tuple.encode_as_key(self.id);
self.db.put(&key_encoded, value)
}
pub(crate) fn put_if_absent(&self, tuple: &Tuple, value: &[u8]) -> Result<bool, RocksDbStatus> {
let key_encoded = tuple.encode_as_key(self.id);
Ok(if !self.db.exists(&key_encoded)? {
self.db.put(&key_encoded, value)?;
true
} else {
false
})
}
pub(crate) fn get(&self, tuple: &Tuple) -> Result<Option<PinSlice>, RocksDbStatus> {
let key_encoded = tuple.encode_as_key(self.id.0);
let key_encoded = tuple.encode_as_key(self.id);
self.db.get(&key_encoded)
}
pub(crate) fn del(&mut self, tuple: &Tuple) -> Result<(), RocksDbStatus> {
let key_encoded = tuple.encode_as_key(self.id.0);
pub(crate) fn exists(&self, tuple: &Tuple) -> Result<bool, RocksDbStatus> {
let key_encoded = tuple.encode_as_key(self.id);
self.db.exists(&key_encoded)
}
pub(crate) fn del(&self, tuple: &Tuple) -> Result<(), RocksDbStatus> {
let key_encoded = tuple.encode_as_key(self.id);
self.db.del(&key_encoded)
}
pub(crate) fn scan_all(&self) -> impl Iterator<Item = anyhow::Result<(Tuple, Option<u32>)>> {
@ -54,8 +64,8 @@ impl ThrowawayArea {
let mut upper = prefix.0.clone();
upper.push(DataValue::Null);
let upper = Tuple(upper);
let upper = upper.encode_as_key(self.id.0);
let lower = prefix.encode_as_key(self.id.0);
let upper = upper.encode_as_key(self.id);
let lower = prefix.encode_as_key(self.id);
let mut it = self
.db
.iterator()

Loading…
Cancel
Save