scratch DB

main
Ziyang Hu 2 years ago
parent bd077062e7
commit e316c44fca

@ -8,7 +8,7 @@ authors = ["Ziyang Hu"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
uuid = { version = "0.8", features = ["v1", "serde"] }
uuid = { version = "0.8", features = ["v1", "v4", "serde"] }
rand = "0.8.5"
anyhow = "1.0"
lazy_static = "1.4.0"

@ -23,7 +23,6 @@ struct RawRocksDbBridge {
inline ~RawRocksDbBridge() {
if (destroy_on_exit) {
cerr << "destroying database on exit: " << db_path << endl;
auto status = db->Close();
if (!status.ok()) {
cerr << status.ToString() << endl;
@ -36,6 +35,10 @@ struct RawRocksDbBridge {
}
}
inline void set_ignore_range_deletions(bool v) const {
r_opts->ignore_range_deletions = v;
}
[[nodiscard]] inline const string &get_db_path() const {
return db_path;
}
@ -59,15 +62,15 @@ struct RawRocksDbBridge {
write_status(s, status);
}
inline void put(RustBytes key, RustBytes val, RocksDbStatus &status) {
inline void put(RustBytes key, RustBytes val, RocksDbStatus &status) const {
write_status(db->Put(*w_opts, convert_slice(key), convert_slice(val)), status);
}
inline void del(RustBytes key, RocksDbStatus &status) {
inline void del(RustBytes key, RocksDbStatus &status) const {
write_status(db->Delete(*w_opts, convert_slice(key)), status);
}
inline void del_range(RustBytes start, RustBytes end, RocksDbStatus &status) {
inline void del_range(RustBytes start, RustBytes end, RocksDbStatus &status) const {
write_status(db->DeleteRange(*w_opts, db->DefaultColumnFamily(), convert_slice(start), convert_slice(end)),
status);
}

@ -19,13 +19,13 @@ struct IterBridge {
Slice upper_bound;
unique_ptr<ReadOptions> r_opts;
explicit IterBridge(Transaction *tx_) : tx(tx_), iter(nullptr), lower_bound(), upper_bound(),
explicit IterBridge(Transaction *tx_) : db(nullptr), tx(tx_), iter(nullptr), lower_bound(), upper_bound(),
r_opts(new ReadOptions) {
r_opts->ignore_range_deletions = true;
r_opts->auto_prefix_mode = true;
}
explicit IterBridge(DB *db_) : db(db_), iter(nullptr), lower_bound(), upper_bound(),
explicit IterBridge(DB *db_) : db(db_), tx(nullptr), iter(nullptr), lower_bound(), upper_bound(),
r_opts(new ReadOptions) {
r_opts->ignore_range_deletions = true;
r_opts->auto_prefix_mode = true;

@ -1,7 +1,10 @@
use std::borrow::Cow;
use cxx::*;
use crate::bridge::ffi::*;
use crate::bridge::tx::TxBuilder;
use cxx::*;
use std::borrow::Cow;
use crate::{IterBuilder, PinSlice};
#[derive(Default)]
pub struct DbBuilder<'a> {
@ -134,6 +137,98 @@ impl<'a> DbBuilder<'a> {
Err(status)
}
}
pub fn build_raw(self, no_wal: bool) -> Result<RawRocksDb, RocksDbStatus> {
let mut status = RocksDbStatus::default();
fn dummy(_a: &[u8], _b: &[u8]) -> i8 {
0
}
let result = open_raw_db(
&self.opts,
&mut status,
self.cmp_fn.is_some(),
self.cmp_fn.unwrap_or(dummy),
no_wal,
);
if status.is_ok() {
Ok(RawRocksDb { inner: result })
} else {
Err(status)
}
}
}
#[derive(Clone)]
pub struct RawRocksDb {
inner: SharedPtr<RawRocksDbBridge>,
}
impl RawRocksDb {
pub fn db_path(&self) -> Cow<str> {
self.inner.get_db_path().to_string_lossy()
}
pub fn ignore_range_deletions(self, val: bool) -> Self {
self.inner.set_ignore_range_deletions(val);
self
}
#[inline]
pub fn iterator(&self) -> IterBuilder {
IterBuilder {
inner: self.inner.iterator(),
}
.auto_prefix_mode(true)
}
#[inline]
pub fn put(&mut self, key: &[u8], val: &[u8]) -> Result<(), RocksDbStatus> {
let mut status = RocksDbStatus::default();
self.inner.put(key, val, &mut status);
if status.is_ok() {
Ok(())
} else {
Err(status)
}
}
#[inline]
pub fn del(&mut self, key: &[u8]) -> Result<(), RocksDbStatus> {
let mut status = RocksDbStatus::default();
self.inner.del(key, &mut status);
if status.is_ok() {
Ok(())
} else {
Err(status)
}
}
#[inline]
pub fn range_del(&mut self, lower: &[u8], upper: &[u8]) -> Result<(), RocksDbStatus> {
let mut status = RocksDbStatus::default();
self.inner.del_range(lower, upper, &mut status);
if status.is_ok() {
Ok(())
} else {
Err(status)
}
}
#[inline]
pub fn get(&self, key: &[u8]) -> Result<Option<PinSlice>, RocksDbStatus> {
let mut status = RocksDbStatus::default();
let ret = self.inner.get(key, &mut status);
match status.code {
StatusCode::kOk => Ok(Some(PinSlice { inner: ret })),
StatusCode::kNotFound => Ok(None),
_ => Err(status),
}
}
#[inline]
pub fn exists(&self, key: &[u8]) -> Result<bool, RocksDbStatus> {
let mut status = RocksDbStatus::default();
self.inner.exists(key, &mut status);
match status.code {
StatusCode::kOk => Ok(true),
StatusCode::kNotFound => Ok(false),
_ => Err(status),
}
}
}
#[derive(Clone)]
@ -153,4 +248,9 @@ impl RocksDb {
}
unsafe impl Send for RocksDb {}
unsafe impl Sync for RocksDb {}
unsafe impl Send for RawRocksDb {}
unsafe impl Sync for RawRocksDb {}

@ -1,6 +1,7 @@
use crate::bridge::ffi::*;
use cxx::UniquePtr;
use crate::bridge::ffi::*;
pub struct IterBuilder {
pub(crate) inner: UniquePtr<IterBridge>,
}

@ -116,6 +116,7 @@ pub(crate) mod ffi {
cmp_impl: fn(&[u8], &[u8]) -> i8,
no_wal: bool,
) -> SharedPtr<RawRocksDbBridge>;
fn set_ignore_range_deletions(self: &RawRocksDbBridge, val: bool);
fn iterator(self: &RawRocksDbBridge) -> UniquePtr<IterBridge>;
fn get(
self: &RawRocksDbBridge,
@ -123,15 +124,10 @@ pub(crate) mod ffi {
status: &mut RocksDbStatus,
) -> UniquePtr<PinnableSlice>;
fn exists(self: &RawRocksDbBridge, key: &[u8], status: &mut RocksDbStatus);
fn put(
self: Pin<&mut RawRocksDbBridge>,
key: &[u8],
val: &[u8],
status: &mut RocksDbStatus,
);
fn del(self: Pin<&mut RawRocksDbBridge>, key: &[u8], status: &mut RocksDbStatus);
fn put(self: &RawRocksDbBridge, key: &[u8], val: &[u8], status: &mut RocksDbStatus);
fn del(self: &RawRocksDbBridge, key: &[u8], status: &mut RocksDbStatus);
fn del_range(
self: Pin<&mut RawRocksDbBridge>,
self: &RawRocksDbBridge,
lower: &[u8],
upper: &[u8],
status: &mut RocksDbStatus,

@ -11,7 +11,7 @@ pub struct TxBuilder {
}
pub struct PinSlice {
inner: UniquePtr<PinnableSlice>,
pub(crate) inner: UniquePtr<PinnableSlice>,
}
impl Deref for PinSlice {
@ -172,6 +172,6 @@ impl Tx {
IterBuilder {
inner: self.inner.iterator(),
}
.auto_prefix_mode(true)
.auto_prefix_mode(true)
}
}

@ -1,9 +1,5 @@
pub(crate) mod bridge;
#[cfg(test)]
mod tests;
pub use bridge::db::DbBuilder;
pub use bridge::db::RawRocksDb;
pub use bridge::db::RocksDb;
pub use bridge::ffi::RocksDbStatus;
pub use bridge::ffi::StatusCode;
@ -14,3 +10,8 @@ pub use bridge::iter::IterBuilder;
pub use bridge::tx::PinSlice;
pub use bridge::tx::Tx;
pub use bridge::tx::TxBuilder;
pub(crate) mod bridge;
#[cfg(test)]
mod tests;

@ -1,11 +1,13 @@
use std::cmp::{min, Ordering};
use anyhow::Result;
use rmp_serde::Serializer;
use serde::Serialize;
use anyhow::Result;
use crate::data::value::DataValue;
pub(crate) const SCRATCH_DB_KEY_PREFIX_LEN: usize = 4;
#[derive(Debug, thiserror::Error)]
pub enum TupleError {
#[error("bad data: {0} for {1:x?}")]
@ -145,7 +147,7 @@ impl<'a> Iterator for EncodedTupleIter<'a> {
}
}
pub(crate) fn rusty_temp_cmp(a: &[u8], b: &[u8]) -> i8 {
pub(crate) fn rusty_scratch_cmp(a: &[u8], b: &[u8]) -> i8 {
let a = EncodedTuple(a);
let b = EncodedTuple(b);
match a.prefix().unwrap().cmp(&b.prefix().unwrap()) {

@ -1,31 +1,36 @@
use std::collections::BTreeMap;
use std::env::temp_dir;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use anyhow::Result;
use itertools::Itertools;
use serde_json::json;
use uuid::Uuid;
use cozorocks::{DbBuilder, DbIter, RocksDb};
use cozorocks::{DbBuilder, DbIter, RawRocksDb, RocksDb};
use crate::AttrTxItem;
use crate::data::compare::{DB_KEY_PREFIX_LEN, rusty_cmp};
use crate::data::compare::{rusty_cmp, DB_KEY_PREFIX_LEN};
use crate::data::encode::{
decode_ea_key, decode_value_from_key, decode_value_from_val, encode_eav_key, StorageTag,
};
use crate::data::id::{AttrId, EntityId, TxId, Validity};
use crate::data::json::JsonValue;
use crate::data::triple::StoreOp;
use crate::data::tuple::{rusty_scratch_cmp, SCRATCH_DB_KEY_PREFIX_LEN};
use crate::data::value::DataValue;
use crate::runtime::transact::SessionTx;
use crate::transact::pull::CurrentPath;
use crate::AttrTxItem;
pub struct Db {
db: RocksDb,
scratch: RawRocksDb,
last_attr_id: Arc<AtomicU64>,
last_ent_id: Arc<AtomicU64>,
last_tx_id: Arc<AtomicU64>,
last_scratch_id: Arc<AtomicU32>,
n_sessions: Arc<AtomicUsize>,
session_id: usize,
}
@ -47,11 +52,25 @@ impl Db {
.use_capped_prefix_extractor(true, DB_KEY_PREFIX_LEN)
.use_custom_comparator("cozo_rusty_cmp", rusty_cmp, false)
.build()?;
let mut temp_db_location = temp_dir();
temp_db_location.push(format!("{}.cozo", Uuid::new_v4()));
let scratch = DbBuilder::default()
.path(temp_db_location.to_str().unwrap())
.create_if_missing(true)
.destroy_on_exit(true)
.use_bloom_filter(true, 10., true)
.use_capped_prefix_extractor(true, SCRATCH_DB_KEY_PREFIX_LEN)
.use_custom_comparator("cozo_rusty_scratch_cmp", rusty_scratch_cmp, false)
.build_raw(true)?
.ignore_range_deletions(true);
let ret = Self {
db,
scratch,
last_attr_id: Arc::new(Default::default()),
last_ent_id: Arc::new(Default::default()),
last_tx_id: Arc::new(Default::default()),
last_scratch_id: Arc::new(Default::default()),
n_sessions: Arc::new(Default::default()),
session_id: Default::default(),
};
@ -64,9 +83,11 @@ impl Db {
Ok(Self {
db: self.db.clone(),
scratch: self.scratch.clone(),
last_attr_id: self.last_attr_id.clone(),
last_ent_id: self.last_ent_id.clone(),
last_tx_id: self.last_tx_id.clone(),
last_scratch_id: self.last_scratch_id.clone(),
n_sessions: self.n_sessions.clone(),
session_id: old_count + 1,
})

Loading…
Cancel
Save