starting DB

main
Ziyang Hu 2 years ago
parent 6e145a9a93
commit c364e9ae54

@ -104,7 +104,7 @@ impl<'a> DbBuilder<'a> {
pub fn use_custom_comparator(
mut self,
name: &'a str,
cmp: fn(&[u8], &[u8]) -> i8,
cmp: extern "C" fn(&[u8], &[u8]) -> i8,
different_bytes_can_be_equal: bool,
) -> Self {
self.opts.comparator_name = name;

@ -1,9 +1,19 @@
use crate::*;
fn test_comparator(a: &[u8], b: &[u8]) -> i8 {
#[allow(improper_ctypes_definitions)]
#[no_mangle]
extern "C" fn test_comparator(a: &[u8], b: &[u8]) -> i8 {
use std::cmp::Ordering::*;
let res = a.cmp(b);
match a.cmp(b) {
// println!(
// "comparator called: {} vs {} => {:?}",
// String::from_utf8_lossy(a),
// String::from_utf8_lossy(b),
// res
// );
match res {
Equal => 0,
Greater => 1,
Less => -1,
@ -36,8 +46,7 @@ fn creation() {
);
assert!(tx.get("bye".as_bytes(), false).unwrap().is_none());
let mut it = tx.iterator()
.total_order_seek(true).start();
let mut it = tx.iterator().total_order_seek(true).start();
it.seek_to_start();
while let Some((k, v)) = it.pair().unwrap() {
let mut res = String::from_utf8_lossy(k);

@ -6,7 +6,7 @@ use std::cmp::Ordering;
#[allow(improper_ctypes_definitions)]
#[no_mangle]
extern "C" fn rusty_cmp(a: &[u8], b: &[u8]) -> i8 {
pub(crate) extern "C" fn rusty_cmp(a: &[u8], b: &[u8]) -> i8 {
match compare_key(a, b) {
Ordering::Greater => 1,
Ordering::Equal => 0,

@ -291,7 +291,11 @@ pub(crate) fn decode_attr_key_by_id(src: &[u8]) -> Result<(AttrId, TxId, StoreOp
/// tx: 8 bytes (incl. op)
/// attr as kw: variable (segmented by \0)
#[inline]
pub(crate) fn encode_attr_by_kw(attr_name: Keyword, tx: TxId, op: StoreOp) -> Encoded<LARGE_VEC_SIZE> {
pub(crate) fn encode_attr_by_kw(
attr_name: Keyword,
tx: TxId,
op: StoreOp,
) -> Encoded<LARGE_VEC_SIZE> {
let mut ret = SmallVec::<[u8; LARGE_VEC_SIZE]>::new();
ret.push(StorageTag::AttrByKeyword as u8);
let ns_bytes = attr_name.ns.as_bytes();
@ -328,7 +332,7 @@ pub(crate) fn encode_tx(tx: TxId) -> Encoded<VEC_SIZE_8> {
}
#[inline]
pub(crate) fn encode_unique_entity_placeholder(eid: EntityId) -> Encoded<VEC_SIZE_8> {
pub(crate) fn encode_unique_entity(eid: EntityId) -> Encoded<VEC_SIZE_8> {
let mut ret = SmallVec::<[u8; VEC_SIZE_8]>::new();
ret.extend(eid.0.to_be_bytes());
ret[0] = StorageTag::UniqueEntity as u8;

@ -5,6 +5,11 @@ use std::fmt::{Debug, Formatter};
pub struct EntityId(pub u64);
impl EntityId {
pub(crate) const MAX_SYS: EntityId = EntityId(1000);
pub(crate) const MAX_TEMP: EntityId = EntityId(10_000_000);
pub(crate) const MIN_PERM: EntityId = EntityId(10_000_001);
pub(crate) const MAX_PERM: EntityId = EntityId(0x00ff_ffff_ff00_0000);
pub(crate) fn from_bytes(b: &[u8]) -> Self {
EntityId(u64::from_be_bytes([
0, b[1], b[2], b[3], b[4], b[5], b[6], b[7],
@ -18,14 +23,15 @@ impl Debug for EntityId {
}
}
pub(crate) const MAX_SYS_ENTITY_ID: EntityId = EntityId(1000);
pub(crate) const MAX_TEMP_ENTITY_ID: EntityId = EntityId(10_000_000);
pub(crate) const MAX_PERM_ENTITY_ID: EntityId = EntityId(0x00ff_ffff_ff00_0000);
/// Identifier newtype wrapping a raw, publicly accessible `u64`.
///
/// NOTE(review): presumably identifies an attribute, going by the name — confirm.
#[derive(Clone, Copy, PartialEq, Ord, PartialOrd, Eq, Deserialize, Serialize, Hash)]
pub struct AttrId(pub u64);
impl AttrId {
pub(crate) const MAX_SYS: AttrId = AttrId(1000);
pub(crate) const MAX_TEMP: AttrId = AttrId(10_000_000);
pub(crate) const MIN_PERM: AttrId = AttrId(10_000_001);
pub(crate) const MAX_PERM: AttrId = AttrId(0x00ff_ffff_ff00_0000);
pub(crate) fn from_bytes(b: &[u8]) -> Self {
AttrId(u64::from_be_bytes([
0, b[1], b[2], b[3], b[4], b[5], b[6], b[7],
@ -39,14 +45,14 @@ impl Debug for AttrId {
}
}
pub(crate) const MAX_SYS_ATTR_ID: AttrId = AttrId(1000);
pub(crate) const MAX_TEMP_ATTR_ID: AttrId = AttrId(10_000_000);
pub(crate) const MAX_PERM_ATTR_ID: AttrId = AttrId(0x00ff_ffff_ff00_0000);
/// Identifier newtype wrapping a raw, publicly accessible `u64`.
///
/// NOTE(review): presumably identifies a transaction, going by the name — confirm.
#[derive(Clone, Copy, PartialEq, Ord, PartialOrd, Eq, Deserialize, Serialize, Hash)]
pub struct TxId(pub u64);
impl TxId {
pub(crate) const MAX_SYS: TxId = TxId(1000);
pub(crate) const MIN_USER: TxId = TxId(1001);
pub(crate) const MAX_USER: TxId = TxId(0x00ff_ffff_ffff_ffff);
pub(crate) fn from_bytes(b: &[u8]) -> Self {
TxId(u64::from_be_bytes([
0, b[1], b[2], b[3], b[4], b[5], b[6], b[7],
@ -59,6 +65,3 @@ impl Debug for TxId {
write!(f, "t{}", self.0)
}
}
pub(crate) const MAX_SYS_TX_ID: TxId = TxId(1000);
pub(crate) const MAX_USER_TX_ID: TxId = TxId(0x00ff_ffff_ffff_ffff);

@ -7,3 +7,7 @@ static GLOBAL: Jemalloc = Jemalloc;
pub(crate) mod data;
pub(crate) mod runtime;
#[cfg(test)]
mod tests;
pub use runtime::instance::Db;

@ -1,95 +1,66 @@
use cozorocks::RocksDb;
use std::sync::atomic::{AtomicU32, AtomicU64};
use std::sync::{Arc, Mutex};
use crate::data::compare::{rusty_cmp, DB_KEY_PREFIX_LEN};
use crate::runtime::transact::SessionTx;
use anyhow::Result;
use cozorocks::{DbBuilder, RocksDb};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
// use crate::data::compare::DB_KEY_PREFIX_LEN;
// use anyhow::Result;
// use cozorocks::*;
// use std::sync::atomic::{AtomicU32, AtomicU64};
// use std::sync::{Arc, Mutex};
//
pub struct DbInstance {
pub struct Db {
db: RocksDb,
last_attr_id: Arc<AtomicU32>,
last_attr_id: Arc<AtomicU64>,
last_ent_id: Arc<AtomicU64>,
last_tx_id: Arc<AtomicU64>,
sessions: Mutex<Vec<Arc<Mutex<SessionHandle>>>>,
n_sessions: Arc<AtomicUsize>,
session_id: usize,
}
struct SessionHandle {
id: usize,
db: RocksDb,
last_attr_id: Arc<AtomicU32>,
last_ent_id: Arc<AtomicU64>,
last_tx_id: Arc<AtomicU64>,
status: SessionStatus,
}
impl Db {
/// Builds the database from `builder` and wraps it in a `Db` handle.
///
/// Before building, the builder is extended with a bloom filter, a capped
/// prefix extractor of `DB_KEY_PREFIX_LEN`, and the `rusty_cmp` comparator
/// registered under the name `"cozo_rusty_cmp"`.
///
/// The returned handle has all id counters and the session counter at their
/// default values and a `session_id` of 0.
///
/// # Errors
/// Propagates any error returned by the builder's `build()` call.
pub fn build(builder: DbBuilder) -> Result<Self> {
    let configured = builder
        .use_bloom_filter(true, 10., true)
        .use_capped_prefix_extractor(true, DB_KEY_PREFIX_LEN)
        .use_custom_comparator("cozo_rusty_cmp", rusty_cmp, false);
    let db = configured.build()?;

    Ok(Self {
        db,
        last_attr_id: Arc::default(),
        last_ent_id: Arc::default(),
        last_tx_id: Arc::default(),
        n_sessions: Arc::default(),
        session_id: 0,
    })
}
#[derive(Eq, PartialEq, Debug, Clone, Copy)]
pub enum SessionStatus {
Prepared,
Running,
Completed,
}
pub fn new_session(&self) -> Result<Self> {
if self.session_id == 0 {
self.load_last_ids()?;
}
let old_count = self.n_sessions.fetch_add(1, Ordering::AcqRel);
// impl DbInstance {
// pub fn new(path: &str, optimistic: bool, destroy_on_close: bool) -> Result<Self> {
// let mut db_opts = Options::new().within_unique_ptr();
// set_opts_create_if_missing(db_opts.pin_mut(), true);
// set_opts_bloom_filter(db_opts.pin_mut(), 10., true);
// set_opts_capped_prefix_extractor(db_opts.pin_mut(), DB_KEY_PREFIX_LEN);
//
// let (db, tdb_opts, odb_opts) = if optimistic {
// let o = new_odb_opts();
// // let db = DbBridge::new_odb(path, &db_opts, &o)?;
// let db = todo!();
// (db, None, Some(o))
// } else {
// let o = new_tdb_opts();
// let db = DbBridge::new_tdb(path, &db_opts, &o)?;
// // let db = todo!();
// (db, Some(new_tdb_opts()), None)
// };
// //
// // Ok(Self {
// // db,
// // db_opts,
// // tdb_opts,
// // odb_opts,
// // path: path.to_string(),
// // destroy_on_close,
// // last_attr_id: Arc::new(Default::default()),
// // last_ent_id: Arc::new(Default::default()),
// // last_tx_id: Arc::new(Default::default()),
// // sessions: Mutex::new(vec![]),
// // })
// todo!()
// }
// }
//
// #[cfg(test)]
// mod tests {
// use crate::data::compare::RUSTY_COMPARATOR;
// use super::*;
//
// #[test]
// fn test_create() {
// let mut opts = Options::new().within_unique_ptr();
// // set_opts_comparator(opts.pin_mut(), &RUSTY_COMPARATOR);
// set_opts_create_if_missing(opts.pin_mut(), true);
// set_opts_bloom_filter(opts.pin_mut(), 10., true);
// set_opts_capped_prefix_extractor(opts.pin_mut(), DB_KEY_PREFIX_LEN);
// let db_ = DbBridge::new_raw_db("_test", &opts).unwrap();
// //
// // let o = new_odb_opts();
// // let db = DbBridge::new_odb("_test2", &opts, &o).unwrap();
// //
// // let o = new_tdb_opts();
// // let db = DbBridge::new_tdb("_test21", &opts, &o).unwrap();
// //
// //
// // dbg!(12345);
//
// // let db = DbInstance::new("_test3", false, true).unwrap();
// }
// }
Ok(Self {
db: self.db.clone(),
last_attr_id: self.last_attr_id.clone(),
last_ent_id: self.last_ent_id.clone(),
last_tx_id: self.last_tx_id.clone(),
n_sessions: self.n_sessions.clone(),
session_id: old_count + 1,
})
}
/// Reads the last attribute, entity and transaction ids through a fresh
/// transaction and stores them into the shared atomic counters.
///
/// Each counter is written immediately after its value has been loaded, so
/// if a later load fails the earlier counters have already been updated.
///
/// # Errors
/// Propagates the first error returned by any of the `load_last_*` calls.
fn load_last_ids(&self) -> Result<()> {
    let mut tx = self.transact();

    let attr = tx.load_last_attr_id()?.0;
    self.last_attr_id.store(attr, Ordering::Release);

    let entity = tx.load_last_entity_id()?.0;
    self.last_ent_id.store(entity, Ordering::Release);

    let tx_id = tx.load_last_tx_id()?.0;
    self.last_tx_id.store(tx_id, Ordering::Release);

    Ok(())
}
/// Starts a transaction on the underlying database and wraps it in a
/// `SessionTx`.
pub(crate) fn transact(&self) -> SessionTx {
    let tx = self.db.transact().start();
    SessionTx { tx }
}
}

@ -1 +1,3 @@
mod instance;
pub(crate) mod instance;
pub(crate) mod transact;

@ -0,0 +1,64 @@
use crate::data::encode::{encode_tx, encode_unique_attr_by_id, encode_unique_entity};
use crate::data::id::{AttrId, EntityId, TxId};
use anyhow::Result;
use cozorocks::Tx;
/// Thin wrapper around a `cozorocks::Tx`; the methods in its `impl` block
/// look up the last stored entity, attribute and transaction ids through it.
pub(crate) struct SessionTx {
    /// The wrapped storage transaction.
    pub(crate) tx: Tx,
}
impl SessionTx {
    /// Returns the entity id decoded from the last key found between the
    /// encodings of `EntityId::MIN_PERM` and `EntityId::MAX_PERM`, or
    /// `EntityId::MAX_TEMP` when the iterator yields no key.
    ///
    /// # Errors
    /// Propagates any error returned by the iterator's `key()` call.
    pub(crate) fn load_last_entity_id(&mut self) -> Result<EntityId> {
        let e_lower = encode_unique_entity(EntityId::MIN_PERM);
        let e_upper = encode_unique_entity(EntityId::MAX_PERM);
        let mut it = self
            .tx
            .iterator()
            .lower_bound(&e_lower)
            // This used to be a second `lower_bound` call, which never limited
            // the top of the range; `e_upper` belongs on the upper bound.
            // NOTE(review): RocksDB upper bounds are exclusive — confirm
            // cozorocks passes that through unchanged.
            .upper_bound(&e_upper)
            .start();
        it.seek_to_end();
        Ok(match it.key()? {
            None => EntityId::MAX_TEMP,
            Some(data) => EntityId::from_bytes(data),
        })
    }

    /// Returns the attribute id decoded from the last key found between the
    /// encodings of `AttrId::MIN_PERM` and `AttrId::MAX_PERM`, or
    /// `AttrId::MAX_TEMP` when the iterator yields no key.
    ///
    /// # Errors
    /// Propagates any error returned by the iterator's `key()` call.
    pub(crate) fn load_last_attr_id(&mut self) -> Result<AttrId> {
        let e_lower = encode_unique_attr_by_id(AttrId::MIN_PERM);
        let e_upper = encode_unique_attr_by_id(AttrId::MAX_PERM);
        let mut it = self
            .tx
            .iterator()
            .lower_bound(&e_lower)
            // Same fix as in `load_last_entity_id`: the second bound is the
            // upper one.
            .upper_bound(&e_upper)
            .start();
        it.seek_to_end();
        Ok(match it.key()? {
            None => AttrId::MAX_TEMP,
            Some(data) => AttrId::from_bytes(data),
        })
    }

    /// Returns the transaction id decoded from the first key found between
    /// the encodings of `TxId::MAX_USER` and `TxId::MIN_USER`, or
    /// `TxId::MAX_SYS` when the iterator yields no key.
    ///
    /// # Errors
    /// Propagates any error returned by the iterator's `key()` call.
    pub(crate) fn load_last_tx_id(&mut self) -> Result<TxId> {
        // NOTE(review): MAX_USER is used as the lower bound and the scan reads
        // the *first* key, so transaction keys presumably sort newest-first
        // under the database comparator — confirm against the key encoding.
        let e_lower = encode_tx(TxId::MAX_USER);
        let e_upper = encode_tx(TxId::MIN_USER);
        let mut it = self
            .tx
            .iterator()
            .lower_bound(&e_lower)
            // Same fix as above: this was a duplicated `lower_bound` call.
            .upper_bound(&e_upper)
            .start();
        it.seek_to_start();
        Ok(match it.key()? {
            None => TxId::MAX_SYS,
            Some(data) => TxId::from_bytes(data),
        })
    }
}

@ -0,0 +1,19 @@
use crate::Db;
use cozorocks::DbBuilder;
/// Builds a database at path `name` for use in tests.
///
/// The builder is configured with `create_if_missing(true)` and
/// `destroy_on_exit(true)`.
///
/// # Panics
/// Panics if `Db::build` returns an error.
fn create_db(name: &str) -> Db {
    Db::build(
        DbBuilder::default()
            .path(name)
            .create_if_missing(true)
            .destroy_on_exit(true),
    )
    .unwrap()
}
/// Compile-time check: only accepts references to types that are both `Send`
/// and `Sync`. Does nothing at runtime.
fn test_send_sync<T>(_: &T)
where
    T: Send + Sync,
{
}
/// Smoke test: a database can be built at `_test_db`, the handle passes the
/// `Send + Sync` check, and a session can be opened from it.
#[test]
fn creation() {
    let db = create_db("_test_db");
    test_send_sync(&db);
    // Only successful creation matters; the session handle is not used further.
    let _session = db.new_session().unwrap();
}
Loading…
Cancel
Save