From e316c44fcafc139eabc6dbcda085174c2167cbdf Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Tue, 19 Jul 2022 22:15:27 +0800 Subject: [PATCH] scratch DB --- Cargo.toml | 2 +- cozorocks/bridge/db.h | 11 ++-- cozorocks/bridge/iter.h | 4 +- cozorocks/src/bridge/db.rs | 104 ++++++++++++++++++++++++++++++++++- cozorocks/src/bridge/iter.rs | 3 +- cozorocks/src/bridge/mod.rs | 12 ++-- cozorocks/src/bridge/tx.rs | 4 +- cozorocks/src/lib.rs | 11 ++-- src/data/tuple.rs | 6 +- src/runtime/db.rs | 29 ++++++++-- 10 files changed, 155 insertions(+), 31 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3745c77d..3e424c01 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ authors = ["Ziyang Hu"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -uuid = { version = "0.8", features = ["v1", "serde"] } +uuid = { version = "0.8", features = ["v1", "v4", "serde"] } rand = "0.8.5" anyhow = "1.0" lazy_static = "1.4.0" diff --git a/cozorocks/bridge/db.h b/cozorocks/bridge/db.h index 599b93b0..0973e1f4 100644 --- a/cozorocks/bridge/db.h +++ b/cozorocks/bridge/db.h @@ -23,7 +23,6 @@ struct RawRocksDbBridge { inline ~RawRocksDbBridge() { if (destroy_on_exit) { - cerr << "destroying database on exit: " << db_path << endl; auto status = db->Close(); if (!status.ok()) { cerr << status.ToString() << endl; @@ -36,6 +35,10 @@ struct RawRocksDbBridge { } } + inline void set_ignore_range_deletions(bool v) const { + r_opts->ignore_range_deletions = v; + } + [[nodiscard]] inline const string &get_db_path() const { return db_path; } @@ -59,15 +62,15 @@ struct RawRocksDbBridge { write_status(s, status); } - inline void put(RustBytes key, RustBytes val, RocksDbStatus &status) { + inline void put(RustBytes key, RustBytes val, RocksDbStatus &status) const { write_status(db->Put(*w_opts, convert_slice(key), convert_slice(val)), status); } - inline void del(RustBytes key, RocksDbStatus &status) { + inline void del(RustBytes key, RocksDbStatus &status) const { write_status(db->Delete(*w_opts, convert_slice(key)), status); } - inline void del_range(RustBytes start, RustBytes end, RocksDbStatus &status) { + inline void del_range(RustBytes start, RustBytes end, RocksDbStatus &status) const { write_status(db->DeleteRange(*w_opts, db->DefaultColumnFamily(), convert_slice(start), convert_slice(end)), status); } diff --git a/cozorocks/bridge/iter.h b/cozorocks/bridge/iter.h index c2bc4641..7ffc55fb 100644 --- a/cozorocks/bridge/iter.h +++ b/cozorocks/bridge/iter.h @@ -19,13 +19,13 @@ struct IterBridge { Slice upper_bound; unique_ptr r_opts; - explicit IterBridge(Transaction *tx_) : tx(tx_), iter(nullptr), lower_bound(), upper_bound(), + explicit IterBridge(Transaction *tx_) : db(nullptr), tx(tx_), iter(nullptr), lower_bound(), upper_bound(), r_opts(new ReadOptions) { r_opts->ignore_range_deletions = true; r_opts->auto_prefix_mode = true; } - explicit IterBridge(DB *db_) : db(db_), iter(nullptr), lower_bound(), upper_bound(), + explicit IterBridge(DB *db_) : db(db_), tx(nullptr), iter(nullptr), lower_bound(), upper_bound(), r_opts(new ReadOptions) { r_opts->ignore_range_deletions = true; r_opts->auto_prefix_mode = true; diff --git a/cozorocks/src/bridge/db.rs b/cozorocks/src/bridge/db.rs index 933e0c80..3cae60db 100644 --- a/cozorocks/src/bridge/db.rs +++ b/cozorocks/src/bridge/db.rs @@ -1,7 +1,10 @@ +use std::borrow::Cow; + +use cxx::*; + use crate::bridge::ffi::*; use crate::bridge::tx::TxBuilder; -use cxx::*; -use std::borrow::Cow; +use crate::{IterBuilder, PinSlice}; #[derive(Default)] pub struct DbBuilder<'a> { @@ -134,6 +137,98 @@ impl<'a> DbBuilder<'a> { Err(status) } } + pub fn build_raw(self, no_wal: bool) -> Result { + let mut status = RocksDbStatus::default(); + + fn dummy(_a: &[u8], _b: &[u8]) -> i8 { + 0 + } + + let result = open_raw_db( + &self.opts, + &mut status, + self.cmp_fn.is_some(), + self.cmp_fn.unwrap_or(dummy), + no_wal, + ); + if status.is_ok() { + Ok(RawRocksDb { inner: result }) + } else { + Err(status) + } + } +} + +#[derive(Clone)] +pub struct RawRocksDb { + inner: SharedPtr, +} + +impl RawRocksDb { + pub fn db_path(&self) -> Cow { + self.inner.get_db_path().to_string_lossy() + } + pub fn ignore_range_deletions(self, val: bool) -> Self { + self.inner.set_ignore_range_deletions(val); + self + } + #[inline] + pub fn iterator(&self) -> IterBuilder { + IterBuilder { + inner: self.inner.iterator(), + } + .auto_prefix_mode(true) + } + #[inline] + pub fn put(&mut self, key: &[u8], val: &[u8]) -> Result<(), RocksDbStatus> { + let mut status = RocksDbStatus::default(); + self.inner.put(key, val, &mut status); + if status.is_ok() { + Ok(()) + } else { + Err(status) + } + } + #[inline] + pub fn del(&mut self, key: &[u8]) -> Result<(), RocksDbStatus> { + let mut status = RocksDbStatus::default(); + self.inner.del(key, &mut status); + if status.is_ok() { + Ok(()) + } else { + Err(status) + } + } + #[inline] + pub fn range_del(&mut self, lower: &[u8], upper: &[u8]) -> Result<(), RocksDbStatus> { + let mut status = RocksDbStatus::default(); + self.inner.del_range(lower, upper, &mut status); + if status.is_ok() { + Ok(()) + } else { + Err(status) + } + } + #[inline] + pub fn get(&self, key: &[u8]) -> Result, RocksDbStatus> { + let mut status = RocksDbStatus::default(); + let ret = self.inner.get(key, &mut status); + match status.code { + StatusCode::kOk => Ok(Some(PinSlice { inner: ret })), + StatusCode::kNotFound => Ok(None), + _ => Err(status), + } + } + #[inline] + pub fn exists(&self, key: &[u8]) -> Result { + let mut status = RocksDbStatus::default(); + self.inner.exists(key, &mut status); + match status.code { + StatusCode::kOk => Ok(true), + StatusCode::kNotFound => Ok(false), + _ => Err(status), + } + } } #[derive(Clone)] @@ -153,4 +248,9 @@ impl RocksDb { } unsafe impl Send for RocksDb {} + unsafe impl Sync for RocksDb {} + +unsafe impl Send for RawRocksDb {} + +unsafe impl Sync for RawRocksDb {} diff --git a/cozorocks/src/bridge/iter.rs b/cozorocks/src/bridge/iter.rs index c8aca352..565b9c9d 100644 --- a/cozorocks/src/bridge/iter.rs +++ b/cozorocks/src/bridge/iter.rs @@ -1,6 +1,7 @@ -use crate::bridge::ffi::*; use cxx::UniquePtr; +use crate::bridge::ffi::*; + pub struct IterBuilder { pub(crate) inner: UniquePtr, } diff --git a/cozorocks/src/bridge/mod.rs b/cozorocks/src/bridge/mod.rs index 71e8625b..cdf46114 100644 --- a/cozorocks/src/bridge/mod.rs +++ b/cozorocks/src/bridge/mod.rs @@ -116,6 +116,7 @@ pub(crate) mod ffi { cmp_impl: fn(&[u8], &[u8]) -> i8, no_wal: bool, ) -> SharedPtr; + fn set_ignore_range_deletions(self: &RawRocksDbBridge, val: bool); fn iterator(self: &RawRocksDbBridge) -> UniquePtr; fn get( self: &RawRocksDbBridge, @@ -123,15 +124,10 @@ pub(crate) mod ffi { status: &mut RocksDbStatus, ) -> UniquePtr; fn exists(self: &RawRocksDbBridge, key: &[u8], status: &mut RocksDbStatus); - fn put( - self: Pin<&mut RawRocksDbBridge>, - key: &[u8], - val: &[u8], - status: &mut RocksDbStatus, - ); - fn del(self: Pin<&mut RawRocksDbBridge>, key: &[u8], status: &mut RocksDbStatus); + fn put(self: &RawRocksDbBridge, key: &[u8], val: &[u8], status: &mut RocksDbStatus); + fn del(self: &RawRocksDbBridge, key: &[u8], status: &mut RocksDbStatus); fn del_range( - self: Pin<&mut RawRocksDbBridge>, + self: &RawRocksDbBridge, lower: &[u8], upper: &[u8], status: &mut RocksDbStatus, diff --git a/cozorocks/src/bridge/tx.rs b/cozorocks/src/bridge/tx.rs index 2f2ba329..19606a41 100644 --- a/cozorocks/src/bridge/tx.rs +++ b/cozorocks/src/bridge/tx.rs @@ -11,7 +11,7 @@ pub struct TxBuilder { } pub struct PinSlice { - inner: UniquePtr, + pub(crate) inner: UniquePtr, } impl Deref for PinSlice { @@ -172,6 +172,6 @@ impl Tx { IterBuilder { inner: self.inner.iterator(), } - .auto_prefix_mode(true) + .auto_prefix_mode(true) } } diff --git a/cozorocks/src/lib.rs b/cozorocks/src/lib.rs index 4d2041b4..2fa00c5a 100644 --- a/cozorocks/src/lib.rs +++ b/cozorocks/src/lib.rs @@ -1,9 +1,5 @@ -pub(crate) mod bridge; - -#[cfg(test)] -mod tests; - pub use bridge::db::DbBuilder; +pub use bridge::db::RawRocksDb; pub use bridge::db::RocksDb; pub use bridge::ffi::RocksDbStatus; pub use bridge::ffi::StatusCode; @@ -14,3 +10,8 @@ pub use bridge::iter::IterBuilder; pub use bridge::tx::PinSlice; pub use bridge::tx::Tx; pub use bridge::tx::TxBuilder; + +pub(crate) mod bridge; + +#[cfg(test)] +mod tests; diff --git a/src/data/tuple.rs b/src/data/tuple.rs index 353325ed..903f71c6 100644 --- a/src/data/tuple.rs +++ b/src/data/tuple.rs @@ -1,11 +1,13 @@ use std::cmp::{min, Ordering}; +use anyhow::Result; use rmp_serde::Serializer; use serde::Serialize; -use anyhow::Result; use crate::data::value::DataValue; +pub(crate) const SCRATCH_DB_KEY_PREFIX_LEN: usize = 4; + #[derive(Debug, thiserror::Error)] pub enum TupleError { #[error("bad data: {0} for {1:x?}")] @@ -145,7 +147,7 @@ impl<'a> Iterator for EncodedTupleIter<'a> { } } -pub(crate) fn rusty_temp_cmp(a: &[u8], b: &[u8]) -> i8 { +pub(crate) fn rusty_scratch_cmp(a: &[u8], b: &[u8]) -> i8 { let a = EncodedTuple(a); let b = EncodedTuple(b); match a.prefix().unwrap().cmp(&b.prefix().unwrap()) { diff --git a/src/runtime/db.rs b/src/runtime/db.rs index 8fec2dcb..09506c16 100644 --- a/src/runtime/db.rs +++ b/src/runtime/db.rs @@ -1,31 +1,36 @@ use std::collections::BTreeMap; +use std::env::temp_dir; use std::fmt::{Debug, Formatter}; +use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering}; use std::sync::Arc; -use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use anyhow::Result; use itertools::Itertools; use serde_json::json; +use uuid::Uuid; -use cozorocks::{DbBuilder, DbIter, RocksDb}; +use cozorocks::{DbBuilder, DbIter, RawRocksDb, RocksDb}; -use crate::AttrTxItem; -use crate::data::compare::{DB_KEY_PREFIX_LEN, rusty_cmp}; +use crate::data::compare::{rusty_cmp, DB_KEY_PREFIX_LEN}; use crate::data::encode::{ decode_ea_key, decode_value_from_key, decode_value_from_val, encode_eav_key, StorageTag, }; use crate::data::id::{AttrId, EntityId, TxId, Validity}; use crate::data::json::JsonValue; use crate::data::triple::StoreOp; +use crate::data::tuple::{rusty_scratch_cmp, SCRATCH_DB_KEY_PREFIX_LEN}; use crate::data::value::DataValue; use crate::runtime::transact::SessionTx; use crate::transact::pull::CurrentPath; +use crate::AttrTxItem; pub struct Db { db: RocksDb, + scratch: RawRocksDb, last_attr_id: Arc, last_ent_id: Arc, last_tx_id: Arc, + last_scratch_id: Arc, n_sessions: Arc, session_id: usize, } @@ -47,11 +52,25 @@ impl Db { .use_capped_prefix_extractor(true, DB_KEY_PREFIX_LEN) .use_custom_comparator("cozo_rusty_cmp", rusty_cmp, false) .build()?; + let mut temp_db_location = temp_dir(); + temp_db_location.push(format!("{}.cozo", Uuid::new_v4())); + + let scratch = DbBuilder::default() + .path(temp_db_location.to_str().unwrap()) + .create_if_missing(true) + .destroy_on_exit(true) + .use_bloom_filter(true, 10., true) + .use_capped_prefix_extractor(true, SCRATCH_DB_KEY_PREFIX_LEN) + .use_custom_comparator("cozo_rusty_scratch_cmp", rusty_scratch_cmp, false) + .build_raw(true)? + .ignore_range_deletions(true); let ret = Self { db, + scratch, last_attr_id: Arc::new(Default::default()), last_ent_id: Arc::new(Default::default()), last_tx_id: Arc::new(Default::default()), + last_scratch_id: Arc::new(Default::default()), n_sessions: Arc::new(Default::default()), session_id: Default::default(), }; @@ -64,9 +83,11 @@ impl Db { Ok(Self { db: self.db.clone(), + scratch: self.scratch.clone(), last_attr_id: self.last_attr_id.clone(), last_ent_id: self.last_ent_id.clone(), last_tx_id: self.last_tx_id.clone(), + last_scratch_id: self.last_scratch_id.clone(), n_sessions: self.n_sessions.clone(), session_id: old_count + 1, })