recombine main and view relations

main
Ziyang Hu 2 years ago
parent 80df573cee
commit 49d213f66e

@ -46,7 +46,9 @@ ColumnFamilyOptions default_cf_options() {
return options; return options;
} }
shared_ptr<RocksDbBridge> open_db(const DbOpts &opts, RocksDbStatus &status, bool use_cmp, RustComparatorFn cmp_impl) { shared_ptr<RocksDbBridge> open_db(const DbOpts &opts, RocksDbStatus &status, bool use_cmp,
RustComparatorFn pri_cmp_impl,
RustComparatorFn snd_cmp_impl) {
auto options = default_db_options(); auto options = default_db_options();
auto cf_pri_opts = default_cf_options(); auto cf_pri_opts = default_cf_options();
auto cf_snd_opts = default_cf_options(); auto cf_snd_opts = default_cf_options();
@ -89,32 +91,40 @@ shared_ptr<RocksDbBridge> open_db(const DbOpts &opts, RocksDbStatus &status, boo
cf_pri_opts.table_factory.reset(NewBlockBasedTableFactory(table_options)); cf_pri_opts.table_factory.reset(NewBlockBasedTableFactory(table_options));
cf_snd_opts.table_factory.reset(NewBlockBasedTableFactory(table_options)); cf_snd_opts.table_factory.reset(NewBlockBasedTableFactory(table_options));
} }
if (opts.use_capped_prefix_extractor) { if (opts.pri_use_capped_prefix_extractor) {
options.prefix_extractor.reset(NewCappedPrefixTransform(opts.capped_prefix_extractor_len)); cf_pri_opts.prefix_extractor.reset(NewCappedPrefixTransform(opts.pri_capped_prefix_extractor_len));
cf_pri_opts.prefix_extractor.reset(NewCappedPrefixTransform(opts.capped_prefix_extractor_len));
cf_snd_opts.prefix_extractor.reset(NewCappedPrefixTransform(opts.capped_prefix_extractor_len));
} }
if (opts.use_fixed_prefix_extractor) { if (opts.snd_use_capped_prefix_extractor) {
options.prefix_extractor.reset(NewFixedPrefixTransform(opts.fixed_prefix_extractor_len)); cf_snd_opts.prefix_extractor.reset(NewCappedPrefixTransform(opts.snd_capped_prefix_extractor_len));
cf_pri_opts.prefix_extractor.reset(NewFixedPrefixTransform(opts.fixed_prefix_extractor_len));
cf_snd_opts.prefix_extractor.reset(NewFixedPrefixTransform(opts.fixed_prefix_extractor_len));
} }
RustComparator *cmp = nullptr; if (opts.pri_use_fixed_prefix_extractor) {
cf_pri_opts.prefix_extractor.reset(NewFixedPrefixTransform(opts.pri_fixed_prefix_extractor_len));
}
if (opts.snd_use_fixed_prefix_extractor) {
cf_pri_opts.prefix_extractor.reset(NewFixedPrefixTransform(opts.snd_fixed_prefix_extractor_len));
}
RustComparator *pri_cmp = nullptr;
RustComparator *snd_cmp = nullptr;
if (use_cmp) { if (use_cmp) {
cmp = new RustComparator( pri_cmp = new RustComparator(
string(opts.comparator_name), string(opts.pri_comparator_name),
opts.comparator_different_bytes_can_be_equal, opts.pri_comparator_different_bytes_can_be_equal,
cmp_impl); pri_cmp_impl);
options.comparator = cmp; cf_pri_opts.comparator = pri_cmp;
cf_pri_opts.comparator = cmp;
cf_snd_opts.comparator = cmp; snd_cmp = new RustComparator(
string(opts.snd_comparator_name),
opts.snd_comparator_different_bytes_can_be_equal,
snd_cmp_impl);
cf_snd_opts.comparator = snd_cmp;
} }
options.create_missing_column_families = true; options.create_missing_column_families = true;
shared_ptr<RocksDbBridge> db = make_shared<RocksDbBridge>(); shared_ptr<RocksDbBridge> db = make_shared<RocksDbBridge>();
db->db_path = string(opts.db_path); db->db_path = string(opts.db_path);
db->comparator.reset(cmp); db->pri_comparator.reset(pri_cmp);
db->snd_comparator.reset(snd_cmp);
std::vector<ColumnFamilyDescriptor> column_families; std::vector<ColumnFamilyDescriptor> column_families;
column_families.emplace_back(ColumnFamilyDescriptor( column_families.emplace_back(ColumnFamilyDescriptor(

@ -41,7 +41,8 @@ struct SstFileWriterBridge {
}; };
struct RocksDbBridge { struct RocksDbBridge {
unique_ptr<Comparator> comparator; unique_ptr<Comparator> pri_comparator;
unique_ptr<Comparator> snd_comparator;
unique_ptr<TransactionDB> db; unique_ptr<TransactionDB> db;
vector<ColumnFamilyHandle *> cf_handles; vector<ColumnFamilyHandle *> cf_handles;
@ -141,6 +142,8 @@ public:
bool can_different_bytes_be_equal; bool can_different_bytes_be_equal;
}; };
shared_ptr<RocksDbBridge> open_db(const DbOpts &opts, RocksDbStatus &status, bool use_cmp, RustComparatorFn cmp_impl); shared_ptr<RocksDbBridge>
open_db(const DbOpts &opts, RocksDbStatus &status, bool use_cmp, RustComparatorFn pri_cmp_impl,
RustComparatorFn snd_cmp_impl);
#endif //COZOROCKS_DB_H #endif //COZOROCKS_DB_H

@ -8,7 +8,8 @@ use crate::CfHandle;
#[derive(Default, Clone)] #[derive(Default, Clone)]
pub struct DbBuilder<'a> { pub struct DbBuilder<'a> {
pub cmp_fn: Option<fn(&[u8], &[u8]) -> i8>, pub pri_cmp_fn: Option<fn(&[u8], &[u8]) -> i8>,
pub snd_cmp_fn: Option<fn(&[u8], &[u8]) -> i8>,
pub opts: DbOpts<'a>, pub opts: DbOpts<'a>,
} }
@ -16,7 +17,6 @@ impl<'a> Default for DbOpts<'a> {
fn default() -> Self { fn default() -> Self {
Self { Self {
db_path: "", db_path: "",
optimistic: false,
prepare_for_bulk_load: false, prepare_for_bulk_load: false,
increase_parallelism: 0, increase_parallelism: 0,
optimize_level_style_compaction: false, optimize_level_style_compaction: false,
@ -29,12 +29,18 @@ impl<'a> Default for DbOpts<'a> {
use_bloom_filter: false, use_bloom_filter: false,
bloom_filter_bits_per_key: 0.0, bloom_filter_bits_per_key: 0.0,
bloom_filter_whole_key_filtering: false, bloom_filter_whole_key_filtering: false,
use_capped_prefix_extractor: false, pri_use_capped_prefix_extractor: false,
capped_prefix_extractor_len: 0, pri_capped_prefix_extractor_len: 0,
use_fixed_prefix_extractor: false, pri_use_fixed_prefix_extractor: false,
fixed_prefix_extractor_len: 0, pri_fixed_prefix_extractor_len: 0,
comparator_name: "", pri_comparator_name: "",
comparator_different_bytes_can_be_equal: false, pri_comparator_different_bytes_can_be_equal: false,
snd_use_capped_prefix_extractor: false,
snd_capped_prefix_extractor_len: 0,
snd_use_fixed_prefix_extractor: false,
snd_fixed_prefix_extractor_len: 0,
snd_comparator_name: "",
snd_comparator_different_bytes_can_be_equal: false,
destroy_on_exit: false, destroy_on_exit: false,
} }
} }
@ -45,10 +51,6 @@ impl<'a> DbBuilder<'a> {
self.opts.db_path = path; self.opts.db_path = path;
self self
} }
pub fn optimistic(mut self, val: bool) -> Self {
self.opts.optimistic = val;
self
}
pub fn prepare_for_bulk_load(mut self, val: bool) -> Self { pub fn prepare_for_bulk_load(mut self, val: bool) -> Self {
self.opts.prepare_for_bulk_load = val; self.opts.prepare_for_bulk_load = val;
self self
@ -93,25 +95,46 @@ impl<'a> DbBuilder<'a> {
self.opts.bloom_filter_whole_key_filtering = whole_key_filtering; self.opts.bloom_filter_whole_key_filtering = whole_key_filtering;
self self
} }
pub fn use_capped_prefix_extractor(mut self, enable: bool, len: usize) -> Self { pub fn pri_use_capped_prefix_extractor(mut self, enable: bool, len: usize) -> Self {
self.opts.use_capped_prefix_extractor = enable; self.opts.pri_use_capped_prefix_extractor = enable;
self.opts.capped_prefix_extractor_len = len; self.opts.pri_capped_prefix_extractor_len = len;
self
}
pub fn snd_use_capped_prefix_extractor(mut self, enable: bool, len: usize) -> Self {
self.opts.snd_use_capped_prefix_extractor = enable;
self.opts.snd_capped_prefix_extractor_len = len;
self self
} }
pub fn use_fixed_prefix_extractor(mut self, enable: bool, len: usize) -> Self { pub fn pri_use_fixed_prefix_extractor(mut self, enable: bool, len: usize) -> Self {
self.opts.use_fixed_prefix_extractor = enable; self.opts.pri_use_fixed_prefix_extractor = enable;
self.opts.fixed_prefix_extractor_len = len; self.opts.pri_fixed_prefix_extractor_len = len;
self
}
pub fn snd_use_fixed_prefix_extractor(mut self, enable: bool, len: usize) -> Self {
self.opts.snd_use_fixed_prefix_extractor = enable;
self.opts.snd_fixed_prefix_extractor_len = len;
self
}
pub fn pri_use_custom_comparator(
mut self,
name: &'a str,
cmp: fn(&[u8], &[u8]) -> i8,
different_bytes_can_be_equal: bool,
) -> Self {
self.pri_cmp_fn = Some(cmp);
self.opts.pri_comparator_name = name;
self.opts.pri_comparator_different_bytes_can_be_equal = different_bytes_can_be_equal;
self self
} }
pub fn use_custom_comparator( pub fn snd_use_custom_comparator(
mut self, mut self,
name: &'a str, name: &'a str,
cmp: fn(&[u8], &[u8]) -> i8, cmp: fn(&[u8], &[u8]) -> i8,
different_bytes_can_be_equal: bool, different_bytes_can_be_equal: bool,
) -> Self { ) -> Self {
self.cmp_fn = Some(cmp); self.snd_cmp_fn = Some(cmp);
self.opts.comparator_name = name; self.opts.snd_comparator_name = name;
self.opts.comparator_different_bytes_can_be_equal = different_bytes_can_be_equal; self.opts.snd_comparator_different_bytes_can_be_equal = different_bytes_can_be_equal;
self self
} }
pub fn destroy_on_exit(mut self, destroy: bool) -> Self { pub fn destroy_on_exit(mut self, destroy: bool) -> Self {
@ -128,8 +151,9 @@ impl<'a> DbBuilder<'a> {
let result = open_db( let result = open_db(
&self.opts, &self.opts,
&mut status, &mut status,
self.cmp_fn.is_some(), self.pri_cmp_fn.is_some() || self.snd_cmp_fn.is_some(),
self.cmp_fn.unwrap_or(dummy), self.pri_cmp_fn.unwrap_or(dummy),
self.snd_cmp_fn.unwrap_or(dummy)
); );
if status.is_ok() { if status.is_ok() {
Ok(RocksDb { inner: result }) Ok(RocksDb { inner: result })

@ -14,7 +14,6 @@ pub(crate) mod ffi {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct DbOpts<'a> { struct DbOpts<'a> {
pub db_path: &'a str, pub db_path: &'a str,
pub optimistic: bool,
pub prepare_for_bulk_load: bool, pub prepare_for_bulk_load: bool,
pub increase_parallelism: usize, pub increase_parallelism: usize,
pub optimize_level_style_compaction: bool, pub optimize_level_style_compaction: bool,
@ -27,12 +26,18 @@ pub(crate) mod ffi {
pub use_bloom_filter: bool, pub use_bloom_filter: bool,
pub bloom_filter_bits_per_key: f64, pub bloom_filter_bits_per_key: f64,
pub bloom_filter_whole_key_filtering: bool, pub bloom_filter_whole_key_filtering: bool,
pub use_capped_prefix_extractor: bool, pub pri_use_capped_prefix_extractor: bool,
pub capped_prefix_extractor_len: usize, pub pri_capped_prefix_extractor_len: usize,
pub use_fixed_prefix_extractor: bool, pub pri_use_fixed_prefix_extractor: bool,
pub fixed_prefix_extractor_len: usize, pub pri_fixed_prefix_extractor_len: usize,
pub comparator_name: &'a str, pub snd_use_capped_prefix_extractor: bool,
pub comparator_different_bytes_can_be_equal: bool, pub snd_capped_prefix_extractor_len: usize,
pub snd_use_fixed_prefix_extractor: bool,
pub snd_fixed_prefix_extractor_len: usize,
pub pri_comparator_name: &'a str,
pub pri_comparator_different_bytes_can_be_equal: bool,
pub snd_comparator_name: &'a str,
pub snd_comparator_different_bytes_can_be_equal: bool,
pub destroy_on_exit: bool, pub destroy_on_exit: bool,
} }
@ -119,7 +124,8 @@ pub(crate) mod ffi {
builder: &DbOpts, builder: &DbOpts,
status: &mut RocksDbStatus, status: &mut RocksDbStatus,
use_cmp: bool, use_cmp: bool,
cmp_impl: fn(&[u8], &[u8]) -> i8, pri_cmp_impl: fn(&[u8], &[u8]) -> i8,
snd_cmp_impl: fn(&[u8], &[u8]) -> i8,
) -> SharedPtr<RocksDbBridge>; ) -> SharedPtr<RocksDbBridge>;
fn transact(self: &RocksDbBridge) -> UniquePtr<TxBridge>; fn transact(self: &RocksDbBridge) -> UniquePtr<TxBridge>;
fn del_range( fn del_range(

@ -271,7 +271,7 @@ impl MagicAlgoRuleArg {
MagicAlgoRuleArg::Stored(s, _) => { MagicAlgoRuleArg::Stored(s, _) => {
let view_rel = tx.get_view_rel(s)?; let view_rel = tx.get_view_rel(s)?;
let t = Tuple(vec![prefix.clone()]); let t = Tuple(vec![prefix.clone()]);
Box::new(view_rel.scan_prefix(&t)) Box::new(view_rel.scan_prefix(tx, &t))
} }
MagicAlgoRuleArg::Triple(attr, _, dir, vld) => { MagicAlgoRuleArg::Triple(attr, _, dir, vld) => {
if *dir == TripleDir::Bwd && !attr.val_type.is_ref_type() { if *dir == TripleDir::Bwd && !attr.val_type.is_ref_type() {
@ -362,7 +362,7 @@ impl MagicAlgoRuleArg {
} }
MagicAlgoRuleArg::Stored(s, _) => { MagicAlgoRuleArg::Stored(s, _) => {
let view_rel = tx.get_view_rel(s)?; let view_rel = tx.get_view_rel(s)?;
Box::new(view_rel.scan_all()?) Box::new(view_rel.scan_all(tx)?)
} }
MagicAlgoRuleArg::Triple(attr, _, dir, vld) => match dir { MagicAlgoRuleArg::Triple(attr, _, dir, vld) => match dir {
TripleDir::Fwd => { TripleDir::Fwd => {

@ -1,9 +1,9 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use itertools::Itertools; use itertools::Itertools;
use miette::{ensure, miette, IntoDiagnostic, Result}; use miette::{ensure, miette, Result};
use serde_json::{json, Map}; use serde_json::{json, Map};
use tempfile::NamedTempFile;
use cozorocks::CfHandle::Snd; use cozorocks::CfHandle::Snd;
use crate::data::attr::Attribute; use crate::data::attr::Attribute;
@ -47,9 +47,12 @@ impl SessionTx {
res_iter: impl Iterator<Item = Result<Tuple>> + 'a, res_iter: impl Iterator<Item = Result<Tuple>> + 'a,
op: ViewOp, op: ViewOp,
meta: &ViewRelMetadata, meta: &ViewRelMetadata,
) -> Result<()> { ) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
let mut to_clear = None;
if op == ViewOp::Rederive { if op == ViewOp::Rederive {
let _ = self.destroy_view_rel(&meta.name); if let Ok(c) = self.destroy_view_rel(&meta.name) {
to_clear = Some(c);
}
} }
let view_store = if op == ViewOp::Rederive || op == ViewOp::Create { let view_store = if op == ViewOp::Rederive || op == ViewOp::Create {
self.create_view_rel(meta.clone())? self.create_view_rel(meta.clone())?
@ -68,33 +71,19 @@ impl SessionTx {
found found
}; };
if op == ViewOp::Retract { if op == ViewOp::Retract {
let mut vtx = self.view_db.transact().start();
for data in res_iter { for data in res_iter {
let data = data?; let data = data?;
let encoded = data.encode_as_key(view_store.metadata.id); let encoded = data.encode_as_key(view_store.metadata.id);
vtx.del(&encoded, Snd)?; self.tx.del(&encoded, Snd)?;
} }
vtx.commit()?;
} else { } else {
let file = NamedTempFile::new().into_diagnostic()?;
let path = file.into_temp_path();
let path = path.to_string_lossy();
let mut writer = self.view_db.get_sst_writer(&path, Snd)?;
let mut written = false;
for data in res_iter { for data in res_iter {
let data = data?; let data = data?;
let encoded = data.encode_as_key(view_store.metadata.id); let encoded = data.encode_as_key(view_store.metadata.id);
writer.put(&encoded, &[])?; self.tx.put(&encoded, &[], Snd)?;
written = true;
}
if written {
writer.finish()?;
self.view_db.ingest_sst_file(&path, Snd)?;
} }
} }
Ok(()) Ok(to_clear)
} }
fn run_pull_on_item(&self, id: EntityId, specs: &[OutPullSpecWithAttr]) -> Result<JsonValue> { fn run_pull_on_item(&self, id: EntityId, specs: &[OutPullSpecWithAttr]) -> Result<JsonValue> {
let mut ret_map = Map::default(); let mut ret_map = Map::default();

@ -1350,6 +1350,7 @@ impl ViewRelation {
fn prefix_join<'a>( fn prefix_join<'a>(
&'a self, &'a self,
tx: &'a SessionTx,
left_iter: TupleIter<'a>, left_iter: TupleIter<'a>,
(left_join_indices, right_join_indices): (Vec<usize>, Vec<usize>), (left_join_indices, right_join_indices): (Vec<usize>, Vec<usize>),
eliminate_indices: BTreeSet<usize>, eliminate_indices: BTreeSet<usize>,
@ -1382,7 +1383,7 @@ impl ViewRelation {
{ {
return Left( return Left(
self.storage self.storage
.scan_bounded_prefix(&prefix, &l_bound, &u_bound) .scan_bounded_prefix(tx, &prefix, &l_bound, &u_bound)
.filter_map_ok(move |found| { .filter_map_ok(move |found| {
// dbg!("filter", &tuple, &prefix, &found); // dbg!("filter", &tuple, &prefix, &found);
let mut ret = tuple.0.clone(); let mut ret = tuple.0.clone();
@ -1395,7 +1396,7 @@ impl ViewRelation {
skip_range_check = true; skip_range_check = true;
Right( Right(
self.storage self.storage
.scan_prefix(&prefix) .scan_prefix(tx, &prefix)
.filter_map_ok(move |found| { .filter_map_ok(move |found| {
// dbg!("filter", &tuple, &prefix, &found); // dbg!("filter", &tuple, &prefix, &found);
let mut ret = tuple.0.clone(); let mut ret = tuple.0.clone();
@ -1423,6 +1424,7 @@ impl ViewRelation {
fn neg_join<'a>( fn neg_join<'a>(
&'a self, &'a self,
tx: &'a SessionTx,
left_iter: TupleIter<'a>, left_iter: TupleIter<'a>,
(left_join_indices, right_join_indices): (Vec<usize>, Vec<usize>), (left_join_indices, right_join_indices): (Vec<usize>, Vec<usize>),
eliminate_indices: BTreeSet<usize>, eliminate_indices: BTreeSet<usize>,
@ -1448,7 +1450,7 @@ impl ViewRelation {
.collect_vec(), .collect_vec(),
); );
'outer: for found in self.storage.scan_prefix(&prefix) { 'outer: for found in self.storage.scan_prefix(tx, &prefix) {
let found = found?; let found = found?;
for (left_idx, right_idx) in for (left_idx, right_idx) in
left_join_indices.iter().zip(right_join_indices.iter()) left_join_indices.iter().zip(right_join_indices.iter())
@ -1483,8 +1485,8 @@ impl ViewRelation {
)) ))
} }
fn iter(&self) -> Result<TupleIter<'_>> { fn iter(&self, tx: &SessionTx) -> Result<TupleIter<'_>> {
let it = self.storage.scan_all()?; let it = self.storage.scan_all(tx)?;
Ok(if self.filters.is_empty() { Ok(if self.filters.is_empty() {
Box::new(it) Box::new(it)
} else { } else {
@ -1838,7 +1840,7 @@ impl Relation {
Relation::Fixed(f) => Ok(Box::new(f.data.iter().map(|t| Ok(Tuple(t.clone()))))), Relation::Fixed(f) => Ok(Box::new(f.data.iter().map(|t| Ok(Tuple(t.clone()))))),
Relation::Triple(r) => r.iter(tx), Relation::Triple(r) => r.iter(tx),
Relation::Derived(r) => r.iter(epoch, use_delta), Relation::Derived(r) => r.iter(epoch, use_delta),
Relation::View(v) => v.iter(), Relation::View(v) => v.iter(tx),
Relation::Join(j) => j.iter(tx, epoch, use_delta), Relation::Join(j) => j.iter(tx, epoch, use_delta),
Relation::Reorder(r) => r.iter(tx, epoch, use_delta), Relation::Reorder(r) => r.iter(tx, epoch, use_delta),
Relation::Filter(r) => r.iter(tx, epoch, use_delta), Relation::Filter(r) => r.iter(tx, epoch, use_delta),
@ -1917,6 +1919,7 @@ impl NegJoin {
) )
.unwrap(); .unwrap();
v.neg_join( v.neg_join(
tx,
self.left.iter(tx, epoch, use_delta)?, self.left.iter(tx, epoch, use_delta)?,
join_indices, join_indices,
eliminate_indices, eliminate_indices,
@ -2036,6 +2039,7 @@ impl InnerJoin {
.unwrap(); .unwrap();
if r.join_is_prefix(&join_indices.1) { if r.join_is_prefix(&join_indices.1) {
r.prefix_join( r.prefix_join(
tx,
self.left.iter(tx, epoch, use_delta)?, self.left.iter(tx, epoch, use_delta)?,
join_indices, join_indices,
eliminate_indices, eliminate_indices,

@ -13,8 +13,8 @@ use miette::{bail, ensure, miette, IntoDiagnostic, Result};
use serde_json::json; use serde_json::json;
use smartstring::SmartString; use smartstring::SmartString;
use cozorocks::{DbBuilder, DbIter, RocksDb};
use cozorocks::CfHandle::{Pri, Snd}; use cozorocks::CfHandle::{Pri, Snd};
use cozorocks::{DbBuilder, DbIter, RocksDb};
use crate::data::compare::{rusty_cmp, DB_KEY_PREFIX_LEN}; use crate::data::compare::{rusty_cmp, DB_KEY_PREFIX_LEN};
use crate::data::encode::{largest_key, smallest_key}; use crate::data::encode::{largest_key, smallest_key};
@ -53,7 +53,6 @@ impl Drop for RunningQueryCleanup {
pub struct Db { pub struct Db {
db: RocksDb, db: RocksDb,
view_db: RocksDb,
last_attr_id: Arc<AtomicU64>, last_attr_id: Arc<AtomicU64>,
last_ent_id: Arc<AtomicU64>, last_ent_id: Arc<AtomicU64>,
last_tx_id: Arc<AtomicU64>, last_tx_id: Arc<AtomicU64>,
@ -79,28 +78,19 @@ impl Db {
let path = builder.opts.db_path; let path = builder.opts.db_path;
fs::create_dir_all(path).into_diagnostic()?; fs::create_dir_all(path).into_diagnostic()?;
let path_buf = PathBuf::from(path); let path_buf = PathBuf::from(path);
let mut triple_path = path_buf.clone(); let mut store_path = path_buf.clone();
triple_path.push("triple"); store_path.push("data");
let db_builder = builder let db_builder = builder
.use_capped_prefix_extractor(true, DB_KEY_PREFIX_LEN) .pri_use_capped_prefix_extractor(true, DB_KEY_PREFIX_LEN)
.optimistic(false) .pri_use_custom_comparator("cozo_rusty_cmp", rusty_cmp, false)
.use_custom_comparator("cozo_rusty_cmp", rusty_cmp, false) .snd_use_capped_prefix_extractor(true, SCRATCH_DB_KEY_PREFIX_LEN)
.path(triple_path.to_str().unwrap()); .snd_use_custom_comparator("cozo_rusty_scratch_cmp", rusty_scratch_cmp, false)
let mut rel_path = path_buf; .path(store_path.to_str().unwrap());
rel_path.push("rel");
let view_db_builder = db_builder
.clone()
.optimistic(false)
.path(rel_path.to_str().unwrap())
.use_capped_prefix_extractor(true, SCRATCH_DB_KEY_PREFIX_LEN)
.use_custom_comparator("cozo_rusty_scratch_cmp", rusty_scratch_cmp, false);
let db = db_builder.build()?; let db = db_builder.build()?;
let view_db = view_db_builder.build()?;
let ret = Self { let ret = Self {
db, db,
view_db,
last_attr_id: Arc::new(Default::default()), last_attr_id: Arc::new(Default::default()),
last_ent_id: Arc::new(Default::default()), last_ent_id: Arc::new(Default::default()),
last_tx_id: Arc::new(Default::default()), last_tx_id: Arc::new(Default::default()),
@ -133,7 +123,6 @@ impl Db {
Ok(Self { Ok(Self {
db: self.db.clone(), db: self.db.clone(),
view_db: self.view_db.clone(),
last_attr_id: self.last_attr_id.clone(), last_attr_id: self.last_attr_id.clone(),
last_ent_id: self.last_ent_id.clone(), last_ent_id: self.last_ent_id.clone(),
last_tx_id: self.last_tx_id.clone(), last_tx_id: self.last_tx_id.clone(),
@ -160,7 +149,6 @@ impl Db {
pub fn transact(&self) -> Result<SessionTx> { pub fn transact(&self) -> Result<SessionTx> {
let ret = SessionTx { let ret = SessionTx {
tx: self.db.transact().set_snapshot(true).start(), tx: self.db.transact().set_snapshot(true).start(),
view_db: self.view_db.clone(),
mem_store_id: Default::default(), mem_store_id: Default::default(),
view_store_id: self.view_store_id.clone(), view_store_id: self.view_store_id.clone(),
w_tx_id: None, w_tx_id: None,
@ -179,7 +167,6 @@ impl Db {
let ret = SessionTx { let ret = SessionTx {
tx: self.db.transact().set_snapshot(true).start(), tx: self.db.transact().set_snapshot(true).start(),
view_db: self.view_db.clone(),
mem_store_id: Default::default(), mem_store_id: Default::default(),
view_store_id: self.view_store_id.clone(), view_store_id: self.view_store_id.clone(),
w_tx_id: Some(cur_tx_id), w_tx_id: Some(cur_tx_id),
@ -197,34 +184,6 @@ impl Db {
it.seek_to_start(); it.seek_to_start();
it it
} }
// pub fn pull(&self, eid: &JsonValue, payload: &JsonValue, vld: &JsonValue) -> Result<JsonValue> {
// let eid = EntityId::try_from(eid)?;
// let vld = match vld {
// JsonValue::Null => Validity::current(),
// v => Validity::try_from(v)?,
// };
// let mut tx = self.transact()?;
// let specs = tx.parse_pull(payload, 0)?;
// let mut collected = Default::default();
// let mut recursive_seen = Default::default();
// for (idx, spec) in specs.iter().enumerate() {
// tx.pull(
// eid,
// vld,
// spec,
// 0,
// &specs,
// CurrentPath::new(idx)?,
// &mut collected,
// &mut recursive_seen,
// )?;
// }
// Ok(JsonValue::Object(collected))
// }
// pub fn run_tx_triples(&self, payload: &str) -> Result<JsonValue> {
// let payload = parse_tx_to_json(payload)?;
// self.transact_triples(&payload)
// }
fn transact_triples(&self, payloads: Vec<Quintuple>) -> Result<JsonValue> { fn transact_triples(&self, payloads: Vec<Quintuple>) -> Result<JsonValue> {
let mut tx = self.transact_write()?; let mut tx = self.transact_write()?;
let res: JsonValue = tx let res: JsonValue = tx
@ -332,7 +291,11 @@ impl Db {
} }
} }
fn run_query(&self, input_program: InputProgram) -> Result<JsonValue> { fn run_query(&self, input_program: InputProgram) -> Result<JsonValue> {
let mut tx = self.transact()?; let mut tx = if input_program.out_opts.as_view.is_some() {
self.transact_write()?
} else {
self.transact()?
};
if let Some((meta, op)) = &input_program.out_opts.as_view { if let Some((meta, op)) = &input_program.out_opts.as_view {
if *op == ViewOp::Create { if *op == ViewOp::Create {
ensure!( ensure!(
@ -404,7 +367,11 @@ impl Db {
Right(sorted_iter) Right(sorted_iter)
}; };
if let Some((meta, view_op)) = input_program.out_opts.as_view { if let Some((meta, view_op)) = input_program.out_opts.as_view {
tx.execute_view(sorted_iter, view_op, &meta)?; let to_clear = tx.execute_view(sorted_iter, view_op, &meta)?;
tx.commit_tx("", false)?;
if let Some((lower, upper)) = to_clear {
self.db.range_del(&lower, &upper, Snd)?;
}
Ok(json!({"view": "OK"})) Ok(json!({"view": "OK"}))
} else { } else {
let ret: Vec<_> = tx.run_pull_on_query_results( let ret: Vec<_> = tx.run_pull_on_query_results(
@ -427,7 +394,11 @@ impl Db {
}; };
if let Some((meta, view_op)) = input_program.out_opts.as_view { if let Some((meta, view_op)) = input_program.out_opts.as_view {
tx.execute_view(scan, view_op, &meta)?; let to_clear = tx.execute_view(scan, view_op, &meta)?;
tx.commit_tx("", false)?;
if let Some((lower, upper)) = to_clear {
self.db.range_del(&lower, &upper, Snd)?;
}
Ok(json!({"view": "OK"})) Ok(json!({"view": "OK"}))
} else { } else {
let ret: Vec<_> = tx.run_pull_on_query_results( let ret: Vec<_> = tx.run_pull_on_query_results(
@ -442,8 +413,10 @@ impl Db {
} }
pub fn remove_view(&self, name: &str) -> Result<()> { pub fn remove_view(&self, name: &str) -> Result<()> {
let name = Symbol::from(name); let name = Symbol::from(name);
let tx = self.transact()?; let mut tx = self.transact_write()?;
tx.destroy_view_rel(&name) let (lower, upper) = tx.destroy_view_rel(&name)?;
self.db.range_del(&lower, &upper, Snd)?;
Ok(())
} }
pub fn list_running(&self) -> Result<JsonValue> { pub fn list_running(&self) -> Result<JsonValue> {
let res = self let res = self
@ -461,7 +434,7 @@ impl Db {
ks.push(DataValue::Str(SmartString::from(*el))); ks.push(DataValue::Str(SmartString::from(*el)));
} }
let key = Tuple(ks).encode_as_key(ViewRelId::SYSTEM); let key = Tuple(ks).encode_as_key(ViewRelId::SYSTEM);
let mut vtx = self.view_db.transact().start(); let mut vtx = self.db.transact().start();
vtx.put(&key, v, Snd)?; vtx.put(&key, v, Snd)?;
vtx.commit()?; vtx.commit()?;
Ok(()) Ok(())
@ -472,7 +445,7 @@ impl Db {
ks.push(DataValue::Str(SmartString::from(*el))); ks.push(DataValue::Str(SmartString::from(*el)));
} }
let key = Tuple(ks).encode_as_key(ViewRelId::SYSTEM); let key = Tuple(ks).encode_as_key(ViewRelId::SYSTEM);
let mut vtx = self.view_db.transact().start(); let mut vtx = self.db.transact().start();
vtx.del(&key, Snd)?; vtx.del(&key, Snd)?;
vtx.commit()?; vtx.commit()?;
Ok(()) Ok(())
@ -483,7 +456,7 @@ impl Db {
ks.push(DataValue::Str(SmartString::from(*el))); ks.push(DataValue::Str(SmartString::from(*el)));
} }
let key = Tuple(ks).encode_as_key(ViewRelId::SYSTEM); let key = Tuple(ks).encode_as_key(ViewRelId::SYSTEM);
let vtx = self.view_db.transact().start(); let vtx = self.db.transact().start();
Ok(match vtx.get(&key, false, Snd)? { Ok(match vtx.get(&key, false, Snd)? {
None => None, None => None,
Some(slice) => Some(slice.to_vec()), Some(slice) => Some(slice.to_vec()),
@ -499,7 +472,7 @@ impl Db {
} }
let upper_bound = Tuple(vec![DataValue::Bot]); let upper_bound = Tuple(vec![DataValue::Bot]);
let mut it = self let mut it = self
.view_db .db
.transact() .transact()
.start() .start()
.iterator(Snd) .iterator(Snd)
@ -557,7 +530,7 @@ impl Db {
)))]) )))])
.encode_as_key(ViewRelId::SYSTEM); .encode_as_key(ViewRelId::SYSTEM);
let mut it = self let mut it = self
.view_db .db
.transact() .transact()
.start() .start()
.iterator(Snd) .iterator(Snd)

@ -8,7 +8,7 @@ use rmp_serde::Serializer;
use serde::Serialize; use serde::Serialize;
use smallvec::SmallVec; use smallvec::SmallVec;
use cozorocks::{DbIter, RocksDb, Tx}; use cozorocks::{DbIter, Tx};
use cozorocks::CfHandle::{Pri, Snd}; use cozorocks::CfHandle::{Pri, Snd};
use crate::data::attr::Attribute; use crate::data::attr::Attribute;
@ -25,7 +25,6 @@ use crate::runtime::view::ViewRelId;
pub struct SessionTx { pub struct SessionTx {
pub(crate) tx: Tx, pub(crate) tx: Tx,
pub(crate) view_db: RocksDb,
pub(crate) view_store_id: Arc<AtomicU64>, pub(crate) view_store_id: Arc<AtomicU64>,
pub(crate) mem_store_id: Arc<AtomicU32>, pub(crate) mem_store_id: Arc<AtomicU32>,
pub(crate) w_tx_id: Option<TxId>, pub(crate) w_tx_id: Option<TxId>,
@ -116,8 +115,7 @@ impl SessionTx {
pub(crate) fn load_last_view_store_id(&self) -> Result<ViewRelId> { pub(crate) fn load_last_view_store_id(&self) -> Result<ViewRelId> {
let tuple = Tuple(vec![DataValue::Null]); let tuple = Tuple(vec![DataValue::Null]);
let t_encoded = tuple.encode_as_key(ViewRelId::SYSTEM); let t_encoded = tuple.encode_as_key(ViewRelId::SYSTEM);
let vtx = self.view_db.transact().start(); let found = self.tx.get(&t_encoded, false, Snd)?;
let found = vtx.get(&t_encoded, false, Snd)?;
match found { match found {
None => Ok(ViewRelId::SYSTEM), None => Ok(ViewRelId::SYSTEM),
Some(slice) => ViewRelId::raw_decode(&slice), Some(slice) => ViewRelId::raw_decode(&slice),

@ -6,7 +6,7 @@ use rmp_serde::Serializer;
use serde::Serialize; use serde::Serialize;
use cozorocks::CfHandle::Snd; use cozorocks::CfHandle::Snd;
use cozorocks::{DbIter, RocksDb, Tx}; use cozorocks::DbIter;
use crate::data::symb::Symbol; use crate::data::symb::Symbol;
use crate::data::tuple::{EncodedTuple, Tuple}; use crate::data::tuple::{EncodedTuple, Tuple};
@ -70,7 +70,6 @@ pub(crate) struct ViewRelMetadata {
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct ViewRelStore { pub(crate) struct ViewRelStore {
view_db: RocksDb,
pub(crate) metadata: ViewRelMetadata, pub(crate) metadata: ViewRelMetadata,
} }
@ -81,21 +80,26 @@ impl Debug for ViewRelStore {
} }
impl ViewRelStore { impl ViewRelStore {
pub(crate) fn scan_all(&self) -> Result<impl Iterator<Item = Result<Tuple>>> { pub(crate) fn scan_all(&self, tx: &SessionTx) -> Result<impl Iterator<Item = Result<Tuple>>> {
let lower = Tuple::default().encode_as_key(self.metadata.id); let lower = Tuple::default().encode_as_key(self.metadata.id);
let upper = Tuple::default().encode_as_key(self.metadata.id.next()?); let upper = Tuple::default().encode_as_key(self.metadata.id.next()?);
Ok(ViewRelIterator::new(&self.view_db, &lower, &upper)) Ok(ViewRelIterator::new(tx, &lower, &upper))
} }
pub(crate) fn scan_prefix(&self, prefix: &Tuple) -> impl Iterator<Item = Result<Tuple>> { pub(crate) fn scan_prefix(
&self,
tx: &SessionTx,
prefix: &Tuple,
) -> impl Iterator<Item = Result<Tuple>> {
let mut upper = prefix.0.clone(); let mut upper = prefix.0.clone();
upper.push(DataValue::Bot); upper.push(DataValue::Bot);
let prefix_encoded = prefix.encode_as_key(self.metadata.id); let prefix_encoded = prefix.encode_as_key(self.metadata.id);
let upper_encoded = Tuple(upper).encode_as_key(self.metadata.id); let upper_encoded = Tuple(upper).encode_as_key(self.metadata.id);
ViewRelIterator::new(&self.view_db, &prefix_encoded, &upper_encoded) ViewRelIterator::new(tx, &prefix_encoded, &upper_encoded)
} }
pub(crate) fn scan_bounded_prefix( pub(crate) fn scan_bounded_prefix(
&self, &self,
tx: &SessionTx,
prefix: &Tuple, prefix: &Tuple,
lower: &[DataValue], lower: &[DataValue],
upper: &[DataValue], upper: &[DataValue],
@ -107,7 +111,7 @@ impl ViewRelStore {
upper_t.0.push(DataValue::Bot); upper_t.0.push(DataValue::Bot);
let lower_encoded = lower_t.encode_as_key(self.metadata.id); let lower_encoded = lower_t.encode_as_key(self.metadata.id);
let upper_encoded = upper_t.encode_as_key(self.metadata.id); let upper_encoded = upper_t.encode_as_key(self.metadata.id);
ViewRelIterator::new(&self.view_db, &lower_encoded, &upper_encoded) ViewRelIterator::new(tx, &lower_encoded, &upper_encoded)
} }
} }
@ -117,13 +121,8 @@ struct ViewRelIterator {
} }
impl ViewRelIterator { impl ViewRelIterator {
fn new(db: &RocksDb, lower: &[u8], upper: &[u8]) -> Self { fn new(sess: &SessionTx, lower: &[u8], upper: &[u8]) -> Self {
let mut inner = db let mut inner = sess.tx.iterator(Snd).upper_bound(upper).start();
.transact()
.start()
.iterator(Snd)
.upper_bound(upper)
.start();
inner.seek(lower); inner.seek(lower);
Self { Self {
inner, inner,
@ -154,15 +153,13 @@ impl SessionTx {
pub(crate) fn view_exists(&self, name: &Symbol) -> Result<bool> { pub(crate) fn view_exists(&self, name: &Symbol) -> Result<bool> {
let key = DataValue::Str(name.0.clone()); let key = DataValue::Str(name.0.clone());
let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM); let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM);
let vtx = self.view_db.transact().start(); Ok(self.tx.exists(&encoded, false, Snd)?)
Ok(vtx.exists(&encoded, false, Snd)?)
} }
pub(crate) fn create_view_rel(&self, mut meta: ViewRelMetadata) -> Result<ViewRelStore> { pub(crate) fn create_view_rel(&mut self, mut meta: ViewRelMetadata) -> Result<ViewRelStore> {
let key = DataValue::Str(meta.name.0.clone()); let key = DataValue::Str(meta.name.0.clone());
let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM); let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM);
let mut vtx = self.view_db.transact().set_snapshot(true).start(); if self.tx.exists(&encoded, true, Snd)? {
if vtx.exists(&encoded, true, Snd)? {
bail!( bail!(
"cannot create view {}: one with the same name already exists", "cannot create view {}: one with the same name already exists",
meta.name meta.name
@ -170,50 +167,37 @@ impl SessionTx {
}; };
let last_id = self.view_store_id.fetch_add(1, Ordering::SeqCst); let last_id = self.view_store_id.fetch_add(1, Ordering::SeqCst);
meta.id = ViewRelId::new(last_id + 1)?; meta.id = ViewRelId::new(last_id + 1)?;
vtx.put(&encoded, &meta.id.raw_encode(), Snd)?; self.tx.put(&encoded, &meta.id.raw_encode(), Snd)?;
let name_key = let name_key =
Tuple(vec![DataValue::Str(meta.name.0.clone())]).encode_as_key(ViewRelId::SYSTEM); Tuple(vec![DataValue::Str(meta.name.0.clone())]).encode_as_key(ViewRelId::SYSTEM);
let mut meta_val = vec![]; let mut meta_val = vec![];
meta.serialize(&mut Serializer::new(&mut meta_val)).unwrap(); meta.serialize(&mut Serializer::new(&mut meta_val)).unwrap();
vtx.put(&name_key, &meta_val, Snd)?; self.tx.put(&name_key, &meta_val, Snd)?;
let tuple = Tuple(vec![DataValue::Null]); let tuple = Tuple(vec![DataValue::Null]);
let t_encoded = tuple.encode_as_key(ViewRelId::SYSTEM); let t_encoded = tuple.encode_as_key(ViewRelId::SYSTEM);
vtx.put(&t_encoded, &meta.id.raw_encode(), Snd)?; self.tx.put(&t_encoded, &meta.id.raw_encode(), Snd)?;
vtx.commit()?; Ok(ViewRelStore { metadata: meta })
Ok(ViewRelStore {
view_db: self.view_db.clone(),
metadata: meta,
})
} }
pub(crate) fn get_view_rel(&self, name: &Symbol) -> Result<ViewRelStore> { pub(crate) fn get_view_rel(&self, name: &Symbol) -> Result<ViewRelStore> {
let vtx = self.view_db.transact().start();
self.do_get_view_rel(name, &vtx)
}
fn do_get_view_rel(&self, name: &Symbol, vtx: &Tx) -> Result<ViewRelStore> {
let key = DataValue::Str(name.0.clone()); let key = DataValue::Str(name.0.clone());
let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM); let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM);
let found = vtx let found = self
.tx
.get(&encoded, true, Snd)? .get(&encoded, true, Snd)?
.ok_or_else(|| miette!("cannot find stored view {}", name))?; .ok_or_else(|| miette!("cannot find stored view {}", name))?;
let metadata: ViewRelMetadata = rmp_serde::from_slice(&found).into_diagnostic()?; let metadata: ViewRelMetadata = rmp_serde::from_slice(&found).into_diagnostic()?;
Ok(ViewRelStore { Ok(ViewRelStore { metadata })
view_db: self.view_db.clone(),
metadata,
})
} }
pub(crate) fn destroy_view_rel(&self, name: &Symbol) -> Result<()> { pub(crate) fn destroy_view_rel(&mut self, name: &Symbol) -> Result<(Vec<u8>, Vec<u8>)> {
let mut vtx = self.view_db.transact().start(); let store = self.get_view_rel(name)?;
let store = self.do_get_view_rel(name, &vtx)?;
let key = DataValue::Str(name.0.clone()); let key = DataValue::Str(name.0.clone());
let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM); let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM);
vtx.del(&encoded, Snd)?; self.tx.del(&encoded, Snd)?;
let lower_bound = Tuple::default().encode_as_key(store.metadata.id); let lower_bound = Tuple::default().encode_as_key(store.metadata.id);
let upper_bound = Tuple::default().encode_as_key(store.metadata.id.next()?); let upper_bound = Tuple::default().encode_as_key(store.metadata.id.next()?);
self.view_db.range_del(&lower_bound, &upper_bound, Snd)?; Ok((lower_bound, upper_bound))
vtx.commit()?;
Ok(())
} }
} }

Loading…
Cancel
Save