From c364e9ae544c9aba8980c7fa8e208bac6ce764de Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Mon, 4 Jul 2022 16:04:23 +0800 Subject: [PATCH] starting DB --- cozorocks/src/bridge/db.rs | 2 +- cozorocks/src/tests.rs | 17 ++++- src/data/compare.rs | 2 +- src/data/encode.rs | 8 +- src/data/id.rs | 25 ++++--- src/lib.rs | 4 + src/runtime/instance.rs | 145 +++++++++++++++---------------------- src/runtime/mod.rs | 4 +- src/runtime/transact.rs | 64 ++++++++++++++++ src/tests.rs | 19 +++++ 10 files changed, 183 insertions(+), 107 deletions(-) create mode 100644 src/runtime/transact.rs create mode 100644 src/tests.rs diff --git a/cozorocks/src/bridge/db.rs b/cozorocks/src/bridge/db.rs index 2f720f7a..fbaa587a 100644 --- a/cozorocks/src/bridge/db.rs +++ b/cozorocks/src/bridge/db.rs @@ -104,7 +104,7 @@ impl<'a> DbBuilder<'a> { pub fn use_custom_comparator( mut self, name: &'a str, - cmp: fn(&[u8], &[u8]) -> i8, + cmp: extern "C" fn(&[u8], &[u8]) -> i8, different_bytes_can_be_equal: bool, ) -> Self { self.opts.comparator_name = name; diff --git a/cozorocks/src/tests.rs b/cozorocks/src/tests.rs index fab49a3f..04295969 100644 --- a/cozorocks/src/tests.rs +++ b/cozorocks/src/tests.rs @@ -1,9 +1,19 @@ use crate::*; -fn test_comparator(a: &[u8], b: &[u8]) -> i8 { +#[allow(improper_ctypes_definitions)] +#[no_mangle] +extern "C" fn test_comparator(a: &[u8], b: &[u8]) -> i8 { use std::cmp::Ordering::*; + let res = a.cmp(b); - match a.cmp(b) { + // println!( + // "comparator called: {} vs {} => {:?}", + // String::from_utf8_lossy(a), + // String::from_utf8_lossy(b), + // res + // ); + + match res { Equal => 0, Greater => 1, Less => -1, @@ -36,8 +46,7 @@ fn creation() { ); assert!(tx.get("bye".as_bytes(), false).unwrap().is_none()); - let mut it = tx.iterator() - .total_order_seek(true).start(); + let mut it = tx.iterator().total_order_seek(true).start(); it.seek_to_start(); while let Some((k, v)) = it.pair().unwrap() { let mut res = String::from_utf8_lossy(k); diff --git a/src/data/compare.rs b/src/data/compare.rs index c7c1276c..c2ac3de7 100644 --- a/src/data/compare.rs +++ b/src/data/compare.rs @@ -6,7 +6,7 @@ use std::cmp::Ordering; #[allow(improper_ctypes_definitions)] #[no_mangle] -extern "C" fn rusty_cmp(a: &[u8], b: &[u8]) -> i8 { +pub(crate) extern "C" fn rusty_cmp(a: &[u8], b: &[u8]) -> i8 { match compare_key(a, b) { Ordering::Greater => 1, Ordering::Equal => 0, diff --git a/src/data/encode.rs b/src/data/encode.rs index eb560c81..6869650f 100644 --- a/src/data/encode.rs +++ b/src/data/encode.rs @@ -291,7 +291,11 @@ pub(crate) fn decode_attr_key_by_id(src: &[u8]) -> Result<(AttrId, TxId, StoreOp /// tx: 8 bytes (incl. op) /// attr as kw: variable (segmented by \0) #[inline] -pub(crate) fn encode_attr_by_kw(attr_name: Keyword, tx: TxId, op: StoreOp) -> Encoded { +pub(crate) fn encode_attr_by_kw( + attr_name: Keyword, + tx: TxId, + op: StoreOp, +) -> Encoded { let mut ret = SmallVec::<[u8; LARGE_VEC_SIZE]>::new(); ret.push(StorageTag::AttrByKeyword as u8); let ns_bytes = attr_name.ns.as_bytes(); @@ -328,7 +332,7 @@ pub(crate) fn encode_tx(tx: TxId) -> Encoded { } #[inline] -pub(crate) fn encode_unique_entity_placeholder(eid: EntityId) -> Encoded { +pub(crate) fn encode_unique_entity(eid: EntityId) -> Encoded { let mut ret = SmallVec::<[u8; VEC_SIZE_8]>::new(); ret.extend(eid.0.to_be_bytes()); ret[0] = StorageTag::UniqueEntity as u8; diff --git a/src/data/id.rs b/src/data/id.rs index e70081ad..ecd2c792 100644 --- a/src/data/id.rs +++ b/src/data/id.rs @@ -5,6 +5,11 @@ use std::fmt::{Debug, Formatter}; pub struct EntityId(pub u64); impl EntityId { + pub(crate) const MAX_SYS: EntityId = EntityId(1000); + pub(crate) const MAX_TEMP: EntityId = EntityId(10_000_000); + pub(crate) const MIN_PERM: EntityId = EntityId(10_000_001); + pub(crate) const MAX_PERM: EntityId = EntityId(0x00ff_ffff_ff00_0000); + pub(crate) fn from_bytes(b: &[u8]) -> Self { EntityId(u64::from_be_bytes([ 0, b[1], b[2], b[3], b[4], b[5], b[6], b[7], @@ -18,14 +23,15 @@ impl Debug for EntityId { } } -pub(crate) const MAX_SYS_ENTITY_ID: EntityId = EntityId(1000); -pub(crate) const MAX_TEMP_ENTITY_ID: EntityId = EntityId(10_000_000); -pub(crate) const MAX_PERM_ENTITY_ID: EntityId = EntityId(0x00ff_ffff_ff00_0000); - #[derive(Clone, Copy, PartialEq, Ord, PartialOrd, Eq, Deserialize, Serialize, Hash)] pub struct AttrId(pub u64); impl AttrId { + pub(crate) const MAX_SYS: AttrId = AttrId(1000); + pub(crate) const MAX_TEMP: AttrId = AttrId(10_000_000); + pub(crate) const MIN_PERM: AttrId = AttrId(10_000_001); + pub(crate) const MAX_PERM: AttrId = AttrId(0x00ff_ffff_ff00_0000); + pub(crate) fn from_bytes(b: &[u8]) -> Self { AttrId(u64::from_be_bytes([ 0, b[1], b[2], b[3], b[4], b[5], b[6], b[7], @@ -39,14 +45,14 @@ impl Debug for AttrId { } } -pub(crate) const MAX_SYS_ATTR_ID: AttrId = AttrId(1000); -pub(crate) const MAX_TEMP_ATTR_ID: AttrId = AttrId(10_000_000); -pub(crate) const MAX_PERM_ATTR_ID: AttrId = AttrId(0x00ff_ffff_ff00_0000); - #[derive(Clone, Copy, PartialEq, Ord, PartialOrd, Eq, Deserialize, Serialize, Hash)] pub struct TxId(pub u64); impl TxId { + pub(crate) const MAX_SYS: TxId = TxId(1000); + pub(crate) const MIN_USER: TxId = TxId(1001); + pub(crate) const MAX_USER: TxId = TxId(0x00ff_ffff_ffff_ffff); + pub(crate) fn from_bytes(b: &[u8]) -> Self { TxId(u64::from_be_bytes([ 0, b[1], b[2], b[3], b[4], b[5], b[6], b[7], @@ -59,6 +65,3 @@ impl Debug for TxId { write!(f, "t{}", self.0) } } - -pub(crate) const MAX_SYS_TX_ID: TxId = TxId(1000); -pub(crate) const MAX_USER_TX_ID: TxId = TxId(0x00ff_ffff_ffff_ffff); diff --git a/src/lib.rs b/src/lib.rs index a83ac8de..b4877587 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,3 +7,7 @@ static GLOBAL: Jemalloc = Jemalloc; pub(crate) mod data; pub(crate) mod runtime; +#[cfg(test)] +mod tests; + +pub use runtime::instance::Db; \ No newline at end of file diff --git a/src/runtime/instance.rs b/src/runtime/instance.rs index 31be58f0..95160896 100644 --- a/src/runtime/instance.rs +++ b/src/runtime/instance.rs @@ -1,95 +1,66 @@ -use cozorocks::RocksDb; -use std::sync::atomic::{AtomicU32, AtomicU64}; -use std::sync::{Arc, Mutex}; +use crate::data::compare::{rusty_cmp, DB_KEY_PREFIX_LEN}; +use crate::runtime::transact::SessionTx; +use anyhow::Result; +use cozorocks::{DbBuilder, RocksDb}; +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +use std::sync::Arc; -// use crate::data::compare::DB_KEY_PREFIX_LEN; -// use anyhow::Result; -// use cozorocks::*; -// use std::sync::atomic::{AtomicU32, AtomicU64}; -// use std::sync::{Arc, Mutex}; -// -pub struct DbInstance { +pub struct Db { db: RocksDb, - last_attr_id: Arc, + last_attr_id: Arc, last_ent_id: Arc, last_tx_id: Arc, - sessions: Mutex>>>, + n_sessions: Arc, + session_id: usize, } -struct SessionHandle { - id: usize, - db: RocksDb, - last_attr_id: Arc, - last_ent_id: Arc, - last_tx_id: Arc, - status: SessionStatus, -} +impl Db { + pub fn build(builder: DbBuilder) -> Result { + let db = builder + .use_bloom_filter(true, 10., true) + .use_capped_prefix_extractor(true, DB_KEY_PREFIX_LEN) + .use_custom_comparator("cozo_rusty_cmp", rusty_cmp, false) + .build()?; + Ok(Self { + db, + last_attr_id: Arc::new(Default::default()), + last_ent_id: Arc::new(Default::default()), + last_tx_id: Arc::new(Default::default()), + n_sessions: Arc::new(Default::default()), + session_id: Default::default(), + }) + } -#[derive(Eq, PartialEq, Debug, Clone, Copy)] -pub enum SessionStatus { - Prepared, - Running, - Completed, -} + pub fn new_session(&self) -> Result { + if self.session_id == 0 { + self.load_last_ids()?; + } + + let old_count = self.n_sessions.fetch_add(1, Ordering::AcqRel); -// impl DbInstance { -// pub fn new(path: &str, optimistic: bool, destroy_on_close: bool) -> Result { -// let mut db_opts = Options::new().within_unique_ptr(); -// set_opts_create_if_missing(db_opts.pin_mut(), true); -// set_opts_bloom_filter(db_opts.pin_mut(), 10., true); -// set_opts_capped_prefix_extractor(db_opts.pin_mut(), DB_KEY_PREFIX_LEN); -// -// let (db, tdb_opts, odb_opts) = if optimistic { -// let o = new_odb_opts(); -// // let db = DbBridge::new_odb(path, &db_opts, &o)?; -// let db = todo!(); -// (db, None, Some(o)) -// } else { -// let o = new_tdb_opts(); -// let db = DbBridge::new_tdb(path, &db_opts, &o)?; -// // let db = todo!(); -// (db, Some(new_tdb_opts()), None) -// }; -// // -// // Ok(Self { -// // db, -// // db_opts, -// // tdb_opts, -// // odb_opts, -// // path: path.to_string(), -// // destroy_on_close, -// // last_attr_id: Arc::new(Default::default()), -// // last_ent_id: Arc::new(Default::default()), -// // last_tx_id: Arc::new(Default::default()), -// // sessions: Mutex::new(vec![]), -// // }) -// todo!() -// } -// } -// -// #[cfg(test)] -// mod tests { -// use crate::data::compare::RUSTY_COMPARATOR; -// use super::*; -// -// #[test] -// fn test_create() { -// let mut opts = Options::new().within_unique_ptr(); -// // set_opts_comparator(opts.pin_mut(), &RUSTY_COMPARATOR); -// set_opts_create_if_missing(opts.pin_mut(), true); -// set_opts_bloom_filter(opts.pin_mut(), 10., true); -// set_opts_capped_prefix_extractor(opts.pin_mut(), DB_KEY_PREFIX_LEN); -// let db_ = DbBridge::new_raw_db("_test", &opts).unwrap(); -// // -// // let o = new_odb_opts(); -// // let db = DbBridge::new_odb("_test2", &opts, &o).unwrap(); -// // -// // let o = new_tdb_opts(); -// // let db = DbBridge::new_tdb("_test21", &opts, &o).unwrap(); -// // -// // -// // dbg!(12345); -// -// // let db = DbInstance::new("_test3", false, true).unwrap(); -// } -// } + Ok(Self { + db: self.db.clone(), + last_attr_id: self.last_attr_id.clone(), + last_ent_id: self.last_ent_id.clone(), + last_tx_id: self.last_tx_id.clone(), + n_sessions: self.n_sessions.clone(), + session_id: old_count + 1, + }) + } + + fn load_last_ids(&self) -> Result<()> { + let mut tx = self.transact(); + self.last_attr_id + .store(tx.load_last_attr_id()?.0, Ordering::Release); + self.last_ent_id + .store(tx.load_last_entity_id()?.0, Ordering::Release); + self.last_tx_id + .store(tx.load_last_tx_id()?.0, Ordering::Release); + Ok(()) + } + pub(crate) fn transact(&self) -> SessionTx { + SessionTx { + tx: self.db.transact().start(), + } + } +} diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 67020667..21fbf854 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -1 +1,3 @@ -mod instance; +pub(crate) mod instance; +pub(crate) mod transact; + diff --git a/src/runtime/transact.rs b/src/runtime/transact.rs new file mode 100644 index 00000000..7e9b8992 --- /dev/null +++ b/src/runtime/transact.rs @@ -0,0 +1,64 @@ +use crate::data::encode::{encode_tx, encode_unique_attr_by_id, encode_unique_entity}; +use crate::data::id::{AttrId, EntityId, TxId}; +use anyhow::Result; +use cozorocks::Tx; + +pub(crate) struct SessionTx { + pub(crate) tx: Tx, +} + +impl SessionTx { + pub(crate) fn load_last_entity_id(&mut self) -> Result { + let e_lower = encode_unique_entity(EntityId::MIN_PERM); + let e_upper = encode_unique_entity(EntityId::MAX_PERM); + + let mut it = self + .tx + .iterator() + .lower_bound(&e_lower) + .lower_bound(&e_upper) + .start(); + + it.seek_to_end(); + Ok(match it.key()? { + None => EntityId::MAX_TEMP, + Some(data) => EntityId::from_bytes(data), + }) + } + + pub(crate) fn load_last_attr_id(&mut self) -> Result { + let e_lower = encode_unique_attr_by_id(AttrId::MIN_PERM); + let e_upper = encode_unique_attr_by_id(AttrId::MAX_PERM); + + let mut it = self + .tx + .iterator() + .lower_bound(&e_lower) + .lower_bound(&e_upper) + .start(); + + it.seek_to_end(); + Ok(match it.key()? { + None => AttrId::MAX_TEMP, + Some(data) => AttrId::from_bytes(data), + }) + } + + pub(crate) fn load_last_tx_id(&mut self) -> Result { + let e_lower = encode_tx(TxId::MAX_USER); + let e_upper = encode_tx(TxId::MIN_USER); + + let mut it = self + .tx + .iterator() + .lower_bound(&e_lower) + .lower_bound(&e_upper) + .start(); + + it.seek_to_start(); + Ok(match it.key()? { + None => TxId::MAX_SYS, + Some(data) => TxId::from_bytes(data), + }) + } +} diff --git a/src/tests.rs b/src/tests.rs new file mode 100644 index 00000000..c9f37482 --- /dev/null +++ b/src/tests.rs @@ -0,0 +1,19 @@ +use crate::Db; +use cozorocks::DbBuilder; + +fn create_db(name: &str) -> Db { + let builder = DbBuilder::default() + .path(name) + .create_if_missing(true) + .destroy_on_exit(true); + Db::build(builder).unwrap() +} + +fn test_send_sync(_: &T) {} + +#[test] +fn creation() { + let db = create_db("_test_db"); + test_send_sync(&db); + let session = db.new_session().unwrap(); +}