main
Ziyang Hu 2 years ago
parent 6fcc0b788d
commit 740317cc1b

@ -1,6 +1,10 @@
## TODO
* [ ] stored relations
* [ ] `store-put` and `store-retract`
* [ ] more complete functions and aggregations
* [ ] auto-updating relations?
* [ ] counting-based `auto-view`
* [ ] manual or time-based `materialized-view`
* [ ] more complete tx tests
* [ ] graph algorithms

@ -6,10 +6,10 @@ use crate::bridge::ffi::*;
use crate::bridge::tx::TxBuilder;
use crate::{IterBuilder, PinSlice};
#[derive(Default)]
#[derive(Default, Clone)]
pub struct DbBuilder<'a> {
cmp_fn: Option<fn(&[u8], &[u8]) -> i8>,
opts: DbOpts<'a>,
pub cmp_fn: Option<fn(&[u8], &[u8]) -> i8>,
pub opts: DbOpts<'a>,
}
impl<'a> Default for DbOpts<'a> {

@ -7,7 +7,7 @@ pub(crate) mod tx;
#[cxx::bridge]
pub(crate) mod ffi {
#[derive(Debug)]
#[derive(Debug, Clone)]
struct DbOpts<'a> {
pub db_path: &'a str,
pub optimistic: bool,

@ -70,7 +70,7 @@ impl SessionTx {
for (name, data) in const_rules {
let store = self.new_rule_store(name.clone(), data[0].0.len());
for tuple in data {
store.put(tuple.clone(), 0)?;
store.put(tuple.clone(), 0);
}
stores.insert(name.clone(), store);
}

@ -86,15 +86,15 @@ impl SessionTx {
if is_meet {
store.aggr_meet_put(&item, &rule.aggr, 0)?;
} else if should_check_limit {
if !store.exists(&item, 0)? {
store.put(item, 0)?;
if !store.exists(&item, 0) {
store.put(item, 0);
if limiter.incr() {
trace!("early stopping due to result count limit exceeded");
return Ok(());
}
}
} else {
store.put(item, 0)?;
store.put(item, 0);
}
*changed.get_mut(k).unwrap() = true;
}
@ -109,7 +109,7 @@ impl SessionTx {
{
let item = item_res?;
trace!("item for {:?}.{}: {:?} at {}", k, rule_n, item, epoch);
store_to_use.normal_aggr_put(&item, &rule.aggr, serial)?;
store_to_use.normal_aggr_put(&item, &rule.aggr, serial);
*changed.get_mut(k).unwrap() = true;
}
}
@ -170,7 +170,7 @@ impl SessionTx {
if aggr_changed {
*changed.get_mut(k).unwrap() = true;
}
} else if store.exists(&item, 0)? {
} else if store.exists(&item, 0) {
trace!(
"item for {:?}.{}: {:?} at {}, rederived",
k,
@ -181,8 +181,8 @@ impl SessionTx {
} else {
trace!("item for {:?}.{}: {:?} at {}", k, rule_n, item, epoch);
*changed.get_mut(k).unwrap() = true;
store.put(item.clone(), epoch)?;
store.put(item, 0)?;
store.put(item.clone(), epoch);
store.put(item, 0);
if should_check_limit && limiter.incr() {
trace!("early stopping due to result count limit exceeded");
return Ok(());

@ -19,7 +19,7 @@ use crate::runtime::transact::SessionTx;
pub(crate) enum Relation {
Fixed(InlineFixedRelation),
Triple(TripleRelation),
Derived(StoredDerivedRelation),
Derived(DerivedRelation),
Join(Box<InnerJoin>),
NegJoin(Box<NegJoin>),
Reorder(ReorderRelation),
@ -346,7 +346,7 @@ impl Relation {
self.join(right, vec![], vec![])
}
pub(crate) fn derived(bindings: Vec<Symbol>, storage: TempStore) -> Self {
Self::Derived(StoredDerivedRelation {
Self::Derived(DerivedRelation {
bindings,
storage,
filters: vec![],
@ -1255,9 +1255,7 @@ impl TripleRelation {
Err(e) => return Ok(Box::new([Err(e)].into_iter())),
Ok((_, eid, val)) => {
let t = Tuple(vec![val, eid.to_value()]);
if let Err(e) = throwaway.put(t, 0) {
return Ok(Box::new([Err(e.into())].into_iter()));
}
throwaway.put(t, 0);
}
}
}
@ -1314,13 +1312,13 @@ fn get_eliminate_indices(bindings: &[Symbol], eliminate: &BTreeSet<Symbol>) -> B
}
#[derive(Debug)]
pub(crate) struct StoredDerivedRelation {
pub(crate) struct DerivedRelation {
pub(crate) bindings: Vec<Symbol>,
pub(crate) storage: TempStore,
pub(crate) filters: Vec<Expr>,
}
impl StoredDerivedRelation {
impl DerivedRelation {
fn fill_binding_indices(&mut self) -> Result<()> {
let bindings: BTreeMap<_, _> = self
.bindings
@ -1864,9 +1862,7 @@ impl InnerJoin {
.map(|i| tuple.0[*i].clone())
.collect_vec(),
);
if let Err(e) = throwaway.put(stored_tuple, 0) {
return Ok(Box::new([Err(e.into())].into_iter()));
}
throwaway.put(stored_tuple, 0);
}
Err(e) => return Ok(Box::new([Err(e)].into_iter())),
}

@ -38,7 +38,7 @@ impl SessionTx {
.collect_vec();
key.push(DataValue::from(idx as i64));
let key = Tuple(key);
ret.put_kv(key, tuple, 0)?;
ret.put_kv(key, tuple, 0);
}
Ok(ret)
}

@ -1,5 +1,4 @@
use std::collections::BTreeMap;
use std::env::temp_dir;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
@ -9,7 +8,6 @@ use either::{Left, Right};
use itertools::Itertools;
use log::debug;
use serde_json::json;
use uuid::Uuid;
use cozorocks::{DbBuilder, DbIter, RawRocksDb, RocksDb};
@ -32,7 +30,7 @@ use crate::runtime::transact::SessionTx;
pub struct Db {
db: RocksDb,
temp_db: RawRocksDb,
view_db: RawRocksDb,
last_attr_id: Arc<AtomicU64>,
last_ent_id: Arc<AtomicU64>,
last_tx_id: Arc<AtomicU64>,
@ -53,26 +51,24 @@ impl Debug for Db {
impl Db {
pub fn build(builder: DbBuilder<'_>) -> Result<Self> {
let db = builder
let db_builder = builder
.use_bloom_filter(true, 10., true)
.use_capped_prefix_extractor(true, DB_KEY_PREFIX_LEN)
.use_custom_comparator("cozo_rusty_cmp", rusty_cmp, false)
.build()?;
let mut temp_db_location = temp_dir();
temp_db_location.push(format!("{}.cozo", Uuid::new_v4()));
let scratch = DbBuilder::default()
.path(temp_db_location.to_str().unwrap())
.create_if_missing(true)
.destroy_on_exit(true)
.use_bloom_filter(true, 10., true)
.use_custom_comparator("cozo_rusty_cmp", rusty_cmp, false);
let mut temp_path: String = db_builder.opts.db_path.to_string();
temp_path.push_str(".cozo_stored");
let view_db_builder = db_builder
.clone()
.path(&temp_path)
.use_capped_prefix_extractor(true, SCRATCH_DB_KEY_PREFIX_LEN)
.use_custom_comparator("cozo_rusty_scratch_cmp", rusty_scratch_cmp, false)
.build_raw(true)?
.ignore_range_deletions(true);
.use_custom_comparator("cozo_rusty_scratch_cmp", rusty_scratch_cmp, false);
let db = db_builder.build()?;
let view_db = view_db_builder.build_raw(true)?;
let ret = Self {
db,
temp_db: scratch,
view_db,
last_attr_id: Arc::new(Default::default()),
last_ent_id: Arc::new(Default::default()),
last_tx_id: Arc::new(Default::default()),
@ -89,7 +85,7 @@ impl Db {
Ok(Self {
db: self.db.clone(),
temp_db: self.temp_db.clone(),
view_db: self.view_db.clone(),
last_attr_id: self.last_attr_id.clone(),
last_ent_id: self.last_ent_id.clone(),
last_tx_id: self.last_tx_id.clone(),
@ -112,7 +108,7 @@ impl Db {
pub fn transact(&self) -> Result<SessionTx> {
let ret = SessionTx {
tx: self.db.transact().set_snapshot(true).start(),
temp_store: self.temp_db.clone(),
view_db: self.view_db.clone(),
temp_store_id: self.temp_store_id.clone(),
w_tx_id: None,
last_attr_id: self.last_attr_id.clone(),
@ -132,7 +128,7 @@ impl Db {
let ret = SessionTx {
tx: self.db.transact().set_snapshot(true).start(),
temp_store: self.temp_db.clone(),
view_db: self.view_db.clone(),
temp_store_id: self.temp_store_id.clone(),
w_tx_id: Some(cur_tx_id),
last_attr_id: self.last_attr_id.clone(),

@ -8,7 +8,7 @@ use std::sync::{Arc, RwLock};
use anyhow::Result;
use itertools::Itertools;
use cozorocks::{DbIter, RawRocksDb, RocksDbStatus};
use cozorocks::DbIter;
use crate::data::aggr::Aggregation;
use crate::data::program::MagicSymbol;
@ -41,12 +41,7 @@ impl Debug for TempStore {
}
impl TempStore {
pub(crate) fn new(
_db: RawRocksDb,
id: TempStoreId,
rule_name: MagicSymbol,
arity: usize,
) -> TempStore {
pub(crate) fn new(id: TempStoreId, rule_name: MagicSymbol, arity: usize) -> TempStore {
Self {
epoch_size: Default::default(),
mem_db: Default::default(),
@ -132,26 +127,24 @@ impl TempStore {
Ok(true)
}
}
pub(crate) fn put(&self, tuple: Tuple, epoch: u32) -> Result<(), RocksDbStatus> {
pub(crate) fn put(&self, tuple: Tuple, epoch: u32) {
self.ensure_mem_db_for_epoch(epoch);
let db = self.mem_db.try_read().unwrap();
let mut target = db.get(epoch as usize).unwrap().try_write().unwrap();
target.insert(tuple, Tuple::default());
Ok(())
}
pub(crate) fn put_kv(&self, tuple: Tuple, val: Tuple, epoch: u32) -> Result<(), RocksDbStatus> {
pub(crate) fn put_kv(&self, tuple: Tuple, val: Tuple, epoch: u32) {
self.ensure_mem_db_for_epoch(epoch);
let db = self.mem_db.try_read().unwrap();
let mut target = db.get(epoch as usize).unwrap().try_write().unwrap();
target.insert(tuple, val);
Ok(())
}
pub(crate) fn normal_aggr_put(
&self,
tuple: &Tuple,
aggrs: &[Option<(Aggregation, Vec<DataValue>)>],
serial: usize,
) -> Result<(), RocksDbStatus> {
) {
self.ensure_mem_db_for_epoch(0);
let mut vals = vec![];
for (idx, agg) in aggrs.iter().enumerate() {
@ -169,13 +162,12 @@ impl TempStore {
let target = self.mem_db.try_read().unwrap();
let mut target = target.get(0).unwrap().try_write().unwrap();
target.insert(Tuple(vals), Tuple::default());
Ok(())
}
pub(crate) fn exists(&self, tuple: &Tuple, epoch: u32) -> Result<bool, RocksDbStatus> {
pub(crate) fn exists(&self, tuple: &Tuple, epoch: u32) -> bool {
self.ensure_mem_db_for_epoch(epoch);
let target = self.mem_db.try_read().unwrap();
let target = target.get(epoch as usize).unwrap().try_read().unwrap();
Ok(target.contains_key(tuple))
target.contains_key(tuple)
}
pub(crate) fn normal_aggr_scan_and_put(
@ -251,14 +243,14 @@ impl TempStore {
}
let res_tpl = Tuple(aggr_res);
if let Some(lmt) = limiter.borrow_mut() {
if !store.exists(&res_tpl, 0)? {
store.put(res_tpl, 0)?;
if !store.exists(&res_tpl, 0) {
store.put(res_tpl, 0);
if lmt.incr() {
return Ok(true);
}
}
} else {
store.put(res_tpl, 0)?;
store.put(res_tpl, 0);
}
}
Ok(false)

@ -21,7 +21,7 @@ use crate::runtime::temp_store::{TempStore, TempStoreId};
pub struct SessionTx {
pub(crate) tx: Tx,
pub(crate) temp_store: RawRocksDb,
pub(crate) view_db: RawRocksDb,
pub(crate) temp_store_id: Arc<AtomicU32>,
pub(crate) w_tx_id: Option<TxId>,
pub(crate) last_attr_id: Arc<AtomicU64>,
@ -71,19 +71,13 @@ impl SessionTx {
pub(crate) fn new_rule_store(&self, rule_name: MagicSymbol, arity: usize) -> TempStore {
let old_count = self.temp_store_id.fetch_add(1, Ordering::AcqRel);
let old_count = old_count & 0x00ff_ffffu32;
TempStore::new(
self.temp_store.clone(),
TempStoreId(old_count),
rule_name,
arity,
)
TempStore::new(TempStoreId(old_count), rule_name, arity)
}
pub(crate) fn new_temp_store(&self) -> TempStore {
let old_count = self.temp_store_id.fetch_add(1, Ordering::AcqRel);
let old_count = old_count & 0x00ff_ffffu32;
TempStore::new(
self.temp_store.clone(),
TempStoreId(old_count),
MagicSymbol::Muggle {
inner: Symbol::from(""),

Loading…
Cancel
Save