diff --git a/Cargo.toml b/Cargo.toml index 0ed02281..cd4daaf3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,20 +6,26 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -pest = "2.0" -pest_derive = "2.0" -ordered-float = "2.10.0" -uuid = { version = "0.8", features = ["v1"] } +uuid = { version = "0.8", features = ["v1", "serde"] } rand = "0.8.5" -chrono = "0.4" anyhow = "1.0" lazy_static = "1.4.0" thiserror = "1.0.30" log = "0.4.16" env_logger = "0.9.0" -actix-web = "4.0.1" -extsort = "0.4.2" +smallvec = { version = "1.8.1", features = ["serde", "write", "union", "const_generics", "const_new"] } +smartstring = { version = "1.0.1", features = ["serde"] } +serde_json = "1.0.81" +serde = { version = "1.0.137" } +serde_derive = "1.0.137" +rmp = "0.8.11" +rmp-serde = "1.1.0" +rmpv = "1.0.0" +ordered-float = { version = "3.0", features = ["serde"] } cozorocks = { path = "cozorocks" } +[target.'cfg(not(target_env = "msvc"))'.dependencies] +tikv-jemallocator = "0.5" + [profile.release] lto = true \ No newline at end of file diff --git a/cozorocks/src/bridge.h b/cozorocks/src/bridge.h index 2b938915..6f233387 100644 --- a/cozorocks/src/bridge.h +++ b/cozorocks/src/bridge.h @@ -88,7 +88,7 @@ namespace rocksdb_additions { // for options - inline void set_opts_create_if_mission(Options &opts, bool v) { + inline void set_opts_create_if_missing(Options &opts, bool v) { opts.create_if_missing = v; } @@ -150,6 +150,14 @@ namespace rocksdb_additions { inner.enable_blob_garbage_collection = v; } + inline unique_ptr new_odb_opts() { + return make_unique(); + } + + inline unique_ptr new_tdb_opts() { + return make_unique(); + } + // otopts inline void set_otopts_comparator(OptimisticTransactionOptions &opts, Comparator &cmp) { @@ -200,7 +208,7 @@ namespace rocksdb_additions { }; inline shared_ptr - open_db_raw(const Options &options, const string &path, Status &status) { + open_db_raw(const Options &options, const string path, Status &status) { DB *db = nullptr; status = DB::Open(options, path, &db); @@ -210,7 +218,7 @@ namespace rocksdb_additions { inline shared_ptr open_tdb_raw(const Options &options, const TransactionDBOptions &txn_db_options, - const string &path, + const string path, Status &status) { TransactionDB *txn_db = nullptr; @@ -221,7 +229,7 @@ namespace rocksdb_additions { inline shared_ptr - open_odb_raw(const Options &options, const string &path, Status &status) { + open_odb_raw(const Options &options, const string path, Status &status) { OptimisticTransactionDB *txn_db = nullptr; status = OptimisticTransactionDB::Open(options, diff --git a/cozorocks/src/lib.rs b/cozorocks/src/lib.rs index 0a5705be..75f49815 100644 --- a/cozorocks/src/lib.rs +++ b/cozorocks/src/lib.rs @@ -1,4 +1,5 @@ use autocxx::prelude::*; +use std::fmt::{Display, Formatter}; use std::os::raw::c_char; use std::pin::Pin; @@ -6,12 +7,13 @@ include_cpp! { #include "bridge.h" safety!(unsafe) + generate_ns!("rocksdb_additions") + generate_pod!("rocksdb::Slice") generate!("rocksdb::ReadOptions") generate!("rocksdb::WriteOptions") generate!("rocksdb::Options") generate!("rocksdb::DBOptions") generate!("rocksdb::Status") - generate_pod!("rocksdb::Slice") generate!("rocksdb::PinnableSlice") generate!("rocksdb::TransactionOptions") generate!("rocksdb::OptimisticTransactionOptions") @@ -25,10 +27,9 @@ include_cpp! { generate!("rocksdb::StackableDB") generate!("rocksdb::DB") generate!("rocksdb::Snapshot") - generate_ns!("rocksdb_additions") } -pub use autocxx::{c_int, c_void}; +pub use autocxx::{c_int, c_void, WithinUniquePtr}; pub use cxx::{CxxString, CxxVector, SharedPtr, UniquePtr}; pub use ffi::rocksdb::Status_Code as StatusCode; pub use ffi::rocksdb::Status_Severity as StatusSeverity; @@ -36,12 +37,21 @@ pub use ffi::rocksdb::Status_SubCode as StatusSubCode; pub use ffi::rocksdb::*; pub use ffi::rocksdb_additions::*; +#[derive(Debug, Copy, Clone)] pub struct DbStatus { pub code: StatusCode, pub subcode: StatusSubCode, pub severity: StatusSeverity, } +impl Display for DbStatus { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + +impl std::error::Error for DbStatus {} + #[inline(always)] fn convert_status(status: &ffi::rocksdb::Status) -> DbStatus { let code = status.code(); @@ -70,6 +80,44 @@ pub fn convert_slice_back(src: &Slice) -> &[u8] { pub type SnapshotPtr = *const Snapshot; impl DbBridge { + pub fn new_raw_db(path: &str, opts: &Options) -> Result, DbStatus> { + let mut status = Status::new().within_unique_ptr(); + let bridge = open_db_raw(opts, path, status.pin_mut()); + if !status.ok() { + Err(convert_status(&status)) + } else { + Ok(bridge) + } + } + + pub fn new_tdb( + path: &str, + opts: &Options, + tdb_opts: &TransactionDBOptions, + ) -> Result, DbStatus> { + let mut status = Status::new().within_unique_ptr(); + let bridge = open_tdb_raw(opts, tdb_opts, path, status.pin_mut()); + if !status.ok() { + Err(convert_status(&status)) + } else { + Ok(bridge) + } + } + + pub fn new_odb( + path: &str, + opts: &Options, + _odb_opts: &OptimisticTransactionDBOptions, + ) -> Result, DbStatus> { + let mut status = Status::new().within_unique_ptr(); + let bridge = open_odb_raw(opts, path, status.pin_mut()); + if !status.ok() { + Err(convert_status(&status)) + } else { + Ok(bridge) + } + } + #[inline] fn get_raw_db(&self) -> Pin<&mut DB> { unsafe { Pin::new_unchecked(&mut *self.inner_db()) } diff --git a/src/data/compare.rs b/src/data/compare.rs new file mode 100644 index 00000000..8960db94 --- /dev/null +++ b/src/data/compare.rs @@ -0,0 +1,159 @@ +use crate::data::encode::{ + decode_ae_key, decode_attr_key_by_id, decode_attr_key_by_kw, decode_ea_key, + decode_unique_attr_val, decode_vae_key, decode_value_from_key, StorageTag, +}; +use crate::data::id::{EntityId, TxId}; +use lazy_static::lazy_static; +use std::cmp::Ordering; + +#[no_mangle] +extern "C" fn rusty_cmp(a: &cozorocks::Slice, b: &cozorocks::Slice) -> cozorocks::c_int { + let a = cozorocks::convert_slice_back(a); + let b = cozorocks::convert_slice_back(b); + cozorocks::c_int(match compare_key(a, b) { + Ordering::Greater => 1, + Ordering::Equal => 0, + Ordering::Less => -1, + }) +} + +pub(crate) const DB_KEY_PREFIX_LEN: usize = 4; + +lazy_static! { + pub(crate) static ref RUSTY_COMPARATOR: cozorocks::UniquePtr = { + unsafe { + let f_ptr = rusty_cmp as *const cozorocks::c_void; + cozorocks::new_rust_comparator("cozo_rusty_cmp_v1", false, f_ptr) + } + }; +} + +macro_rules! return_if_resolved { + ($o:expr) => { + match $o { + std::cmp::Ordering::Equal => {} + o => return o, + } + }; +} + +#[inline] +fn compare_key(a: &[u8], b: &[u8]) -> Ordering { + use StorageTag::*; + + return_if_resolved!(a[0].cmp(&b[0])); + + match StorageTag::try_from(a[0]).unwrap() { + TripleEntityAttrValue => compare_key_triple_eav(a, b), + TripleAttrEntityValue => compare_key_triple_aev(a, b), + TripleAttrValueEntity => compare_key_triple_ave(a, b), + TripleValueAttrEntity => compare_key_triple_vae(a, b), + AttrById => compare_key_attr_by_id(a, b), + AttrByKeyword => compare_key_attr_by_kw(a, b), + Tx => compare_key_tx(a, b), + UniqueEntity => compare_key_unique_entity(a, b), + UniqueAttrValue => compare_key_unique_attr_val(a, b), + } +} + +#[inline] +fn compare_key_triple_eav(a: &[u8], b: &[u8]) -> Ordering { + let (a_e, a_a, a_t, a_o) = decode_ea_key(a).unwrap(); + let (b_e, b_a, b_t, b_o) = decode_ea_key(b).unwrap(); + + return_if_resolved!(a_e.cmp(&b_e)); + return_if_resolved!(a_a.cmp(&b_a)); + + let a_v = decode_value_from_key(a).unwrap(); + let b_v = decode_value_from_key(b).unwrap(); + + return_if_resolved!(a_v.cmp(&b_v)); + return_if_resolved!(a_t.cmp(&b_t).reverse()); + a_o.cmp(&b_o) +} + +#[inline] +fn compare_key_triple_aev(a: &[u8], b: &[u8]) -> Ordering { + let (a_a, a_e, a_t, a_o) = decode_ae_key(a).unwrap(); + let (b_a, b_e, b_t, b_o) = decode_ae_key(b).unwrap(); + + return_if_resolved!(a_a.cmp(&b_a)); + return_if_resolved!(a_e.cmp(&b_e)); + + let a_v = decode_value_from_key(a).unwrap(); + let b_v = decode_value_from_key(b).unwrap(); + + return_if_resolved!(a_v.cmp(&b_v)); + return_if_resolved!(a_t.cmp(&b_t).reverse()); + a_o.cmp(&b_o) +} + +#[inline] +fn compare_key_triple_ave(a: &[u8], b: &[u8]) -> Ordering { + let (a_a, a_e, a_t, a_o) = decode_ae_key(a).unwrap(); + let (b_a, b_e, b_t, b_o) = decode_ae_key(b).unwrap(); + + return_if_resolved!(a_a.cmp(&b_a)); + + let a_v = decode_value_from_key(a).unwrap(); + let b_v = decode_value_from_key(b).unwrap(); + + return_if_resolved!(a_v.cmp(&b_v)); + return_if_resolved!(a_e.cmp(&b_e)); + return_if_resolved!(a_t.cmp(&b_t).reverse()); + a_o.cmp(&b_o) +} + +#[inline] +fn compare_key_triple_vae(a: &[u8], b: &[u8]) -> Ordering { + let (a_v, a_a, a_e, a_t, a_o) = decode_vae_key(a).unwrap(); + let (b_v, b_a, b_e, b_t, b_o) = decode_vae_key(b).unwrap(); + + return_if_resolved!(a_v.cmp(&b_v)); + return_if_resolved!(a_a.cmp(&b_a)); + return_if_resolved!(a_e.cmp(&b_e)); + return_if_resolved!(a_t.cmp(&b_t).reverse()); + a_o.cmp(&b_o) +} + +#[inline] +fn compare_key_attr_by_id(a: &[u8], b: &[u8]) -> Ordering { + let (a_a, a_t, a_o) = decode_attr_key_by_id(a).unwrap(); + let (b_a, b_t, b_o) = decode_attr_key_by_id(b).unwrap(); + + return_if_resolved!(a_a.cmp(&b_a)); + return_if_resolved!(a_t.cmp(&b_t).reverse()); + a_o.cmp(&b_o) +} + +#[inline] +fn compare_key_attr_by_kw(a: &[u8], b: &[u8]) -> Ordering { + let (a_kw, a_t, a_o) = decode_attr_key_by_kw(a).unwrap(); + let (b_kw, b_t, b_o) = decode_attr_key_by_kw(b).unwrap(); + + return_if_resolved!(a_kw.cmp(&b_kw)); + return_if_resolved!(a_t.cmp(&b_t).reverse()); + a_o.cmp(&b_o) +} + +#[inline] +fn compare_key_tx(a: &[u8], b: &[u8]) -> Ordering { + let a_t = TxId::from_bytes(a); + let b_t = TxId::from_bytes(b); + a_t.cmp(&b_t).reverse() +} + +#[inline] +fn compare_key_unique_entity(a: &[u8], b: &[u8]) -> Ordering { + let a_e = EntityId::from_bytes(a); + let b_e = EntityId::from_bytes(b); + a_e.cmp(&b_e) +} + +#[inline] +fn compare_key_unique_attr_val(a: &[u8], b: &[u8]) -> Ordering { + let (a_a, a_v) = decode_unique_attr_val(a).unwrap(); + let (b_a, b_v) = decode_unique_attr_val(b).unwrap(); + return_if_resolved!(a_a.cmp(&b_a)); + a_v.cmp(&b_v) +} diff --git a/src/data/encode.rs b/src/data/encode.rs new file mode 100644 index 00000000..dc203e66 --- /dev/null +++ b/src/data/encode.rs @@ -0,0 +1,291 @@ +use crate::data::encode::StorageTag::Tx; +use crate::data::id::{AttrId, EntityId, TxId}; +use crate::data::keyword::Keyword; +use crate::data::triple::StoreOp; +use crate::data::value::Value; +use anyhow::Result; +use rmp_serde::Serializer; +use serde::Serialize; +use smallvec::SmallVec; +use std::ops::Deref; + +#[repr(u8)] +#[derive(Ord, PartialOrd, Eq, PartialEq, Debug)] +pub(crate) enum StorageTag { + TripleEntityAttrValue = 1, + TripleAttrEntityValue = 2, + TripleAttrValueEntity = 3, + TripleValueAttrEntity = 4, + AttrById = 5, + AttrByKeyword = 6, + Tx = 7, + UniqueEntity = 8, + UniqueAttrValue = 9, +} + +#[derive(Debug, thiserror::Error)] +pub enum StorageTagError { + #[error("unexpected value for StoreOp: {0}")] + UnexpectedValue(u8), +} + +impl TryFrom for StorageTag { + type Error = StorageTagError; + fn try_from(value: u8) -> std::result::Result { + use StorageTag::*; + Ok(match value { + 1 => TripleEntityAttrValue, + 2 => TripleAttrEntityValue, + 3 => TripleAttrValueEntity, + 4 => TripleValueAttrEntity, + 5 => AttrById, + 6 => AttrByKeyword, + 7 => Tx, + 8 => UniqueEntity, + 9 => UniqueAttrValue, + n => return Err(StorageTagError::UnexpectedValue(n)), + }) + } +} + +#[inline] +pub(crate) fn encode_value(val: Value) -> impl Deref { + let mut ret = SmallVec::<[u8; 60]>::new(); + val.serialize(&mut Serializer::new(&mut ret)).unwrap(); + ret +} + +#[inline] +pub(crate) fn decode_value(src: &[u8]) -> Result { + Ok(rmp_serde::from_slice(src)?) +} + +#[inline] +pub(crate) fn decode_value_from_key(src: &[u8]) -> Result { + Ok(rmp_serde::from_slice(&src[20..])?) +} + +/// eid: 8 bytes (incl. tag) +/// aid: 4 bytes +/// val: variable +/// tx: 8 bytes (incl. op) +#[inline] +pub(crate) fn encode_eav_key( + eid: EntityId, + aid: AttrId, + val: Value, + tx: TxId, + op: StoreOp, +) -> impl Deref { + let mut ret = SmallVec::<[u8; 60]>::new(); + + ret.extend(eid.0.to_be_bytes()); + ret[0] = StorageTag::TripleEntityAttrValue as u8; + + ret.extend(aid.0.to_be_bytes()); + + ret.extend(tx.0.to_be_bytes()); + ret[12] = op as u8; + debug_assert_eq!(ret.len(), 20); + + val.serialize(&mut Serializer::new(&mut ret)).unwrap(); + + ret +} + +#[inline] +pub(crate) fn decode_ea_key(src: &[u8]) -> Result<(EntityId, AttrId, TxId, StoreOp)> { + let eid = EntityId::from_bytes(&src[0..8]); + let aid = AttrId::from_bytes(&src[8..12]); + let tx = TxId::from_bytes(&src[12..20]); + let op = src[12].try_into()?; + + Ok((eid, aid, tx, op)) +} + +/// eid: 8 bytes (incl. tag) +/// aid: 4 bytes +/// val: variable +/// tx: 8 bytes (incl. op) +#[inline] +pub(crate) fn encode_aev_key( + aid: AttrId, + eid: EntityId, + val: Value, + tx: TxId, + op: StoreOp, +) -> impl Deref { + let mut ret = SmallVec::<[u8; 60]>::new(); + + ret.extend(aid.0.to_be_bytes()); + ret[0] = StorageTag::TripleAttrEntityValue as u8; + + ret.extend(eid.0.to_be_bytes()); + ret.extend(tx.0.to_be_bytes()); + ret[12] = op as u8; + debug_assert_eq!(ret.len(), 20); + + val.serialize(&mut Serializer::new(&mut ret)).unwrap(); + + ret +} + +#[inline] +pub(crate) fn decode_ae_key(src: &[u8]) -> Result<(AttrId, EntityId, TxId, StoreOp)> { + let aid = AttrId::from_bytes(&src[0..4]); + let eid = EntityId::from_bytes(&src[4..12]); + let tx = TxId::from_bytes(&src[12..20]); + let op = src[12].try_into()?; + + Ok((aid, eid, tx, op)) +} + +/// aid: 4 bytes (incl. tag) +/// val: variable +/// eid: 8 bytes +/// tx: 8 bytes (incl. op) +#[inline] +pub(crate) fn encode_ave_key( + aid: AttrId, + val: Value, + eid: EntityId, + tx: TxId, + op: StoreOp, +) -> impl Deref { + let mut ret = SmallVec::<[u8; 60]>::new(); + + ret.extend(aid.0.to_be_bytes()); + ret[0] = StorageTag::TripleAttrValueEntity as u8; + + ret.extend(eid.0.to_be_bytes()); + ret.extend(tx.0.to_be_bytes()); + ret[12] = op as u8; + debug_assert_eq!(ret.len(), 20); + + val.serialize(&mut Serializer::new(&mut ret)).unwrap(); + + ret +} + +/// val: 8 bytes (incl. tag) +/// eid: 8 bytes +/// aid: 4 bytes +/// tx: 8 bytes (incl. op) +#[inline] +pub(crate) fn encode_vae_key( + val: EntityId, + aid: AttrId, + eid: EntityId, + tx: TxId, + op: StoreOp, +) -> impl Deref { + let mut ret = SmallVec::<[u8; 60]>::new(); + + ret.extend(val.0.to_be_bytes()); + ret[0] = StorageTag::TripleAttrValueEntity as u8; + + ret.extend(aid.0.to_be_bytes()); + ret.extend(tx.0.to_be_bytes()); + ret[12] = op as u8; + debug_assert_eq!(ret.len(), 20); + ret.extend(eid.0.to_be_bytes()); + debug_assert_eq!(ret.len(), 28); + + ret +} + +#[inline] +pub(crate) fn decode_vae_key(src: &[u8]) -> Result<(EntityId, AttrId, EntityId, TxId, StoreOp)> { + let vid = EntityId::from_bytes(&src[0..8]); + let aid = AttrId::from_bytes(&src[8..12]); + let tx = TxId::from_bytes(&src[12..20]); + let eid = EntityId::from_bytes(&src[20..28]); + let op = src[12].try_into()?; + + Ok((vid, aid, eid, tx, op)) +} + +/// aid: 4 bytes (incl. tag) +/// tx: 8 bytes (incl. op) +#[inline] +pub(crate) fn encode_attr_by_id(aid: AttrId, tx: TxId, op: StoreOp) -> impl Deref { + let mut ret = SmallVec::<[u8; 12]>::new(); + ret.extend(aid.0.to_be_bytes()); + ret[0] = StorageTag::AttrById as u8; + ret.extend(tx.0.to_be_bytes()); + ret[4] = op as u8; + debug_assert_eq!(ret.len(), 12); + ret +} + +#[inline] +pub(crate) fn decode_attr_key_by_id(src: &[u8]) -> Result<(AttrId, TxId, StoreOp)> { + let aid = AttrId::from_bytes(&src[0..4]); + let tx = TxId::from_bytes(&src[4..12]); + let op = src[4].try_into()?; + Ok((aid, tx, op)) +} + +/// tag: 4 bytes (with prefix) +/// tx: 8 bytes (incl. op) +/// attr as kw: variable (segmented by \0) +#[inline] +pub(crate) fn encode_attr_by_kw( + attr_name: Keyword, + tx: TxId, + op: StoreOp, +) -> impl Deref { + let mut ret = SmallVec::<[u8; 12]>::new(); + ret.push(StorageTag::AttrByKeyword as u8); + let ns_bytes = attr_name.ns.as_bytes(); + ret.push(ns_bytes.get(0).cloned().unwrap_or(0)); + ret.push(ns_bytes.get(1).cloned().unwrap_or(0)); + ret.push(ns_bytes.get(2).cloned().unwrap_or(0)); + ret.extend(tx.0.to_be_bytes()); + ret[4] = op as u8; + ret.extend_from_slice(ns_bytes); + ret.push(b'/'); + ret.extend_from_slice(attr_name.ident.as_bytes()); + ret +} + +#[inline] +pub(crate) fn decode_attr_key_by_kw(src: &[u8]) -> Result<(Keyword, TxId, StoreOp)> { + let tx = TxId::from_bytes(&src[4..12]); + let op = src[4].try_into()?; + let kw = Keyword::try_from(&src[12..])?; + Ok((kw, tx, op)) +} + +/// tx: 8 bytes (incl. tag) +#[inline] +pub(crate) fn encode_tx(tx: TxId) -> impl Deref { + let mut ret = SmallVec::<[u8; 8]>::new(); + ret.extend(tx.0.to_be_bytes()); + ret[0] = StorageTag::Tx as u8; + ret +} + +#[inline] +pub(crate) fn encode_unique_entity_placeholder(eid: EntityId) -> impl Deref { + let mut ret = SmallVec::<[u8; 8]>::new(); + ret.extend(eid.0.to_be_bytes()); + ret[0] = StorageTag::UniqueEntity as u8; + ret +} + +#[inline] +pub(crate) fn encode_unique_attr_val(aid: AttrId, val: Value) -> impl Deref { + let mut ret = SmallVec::<[u8; 60]>::new(); + ret.extend(aid.0.to_be_bytes()); + ret[0] = StorageTag::UniqueAttrValue as u8; + val.serialize(&mut Serializer::new(&mut ret)).unwrap(); + ret +} + +#[inline] +pub(crate) fn decode_unique_attr_val(src: &[u8]) -> Result<(AttrId, Value)> { + let a_id = AttrId::from_bytes(&src[..4]); + let val = rmp_serde::from_slice(&src[4..])?; + Ok((a_id, val)) +} diff --git a/src/data/id.rs b/src/data/id.rs new file mode 100644 index 00000000..686c38ca --- /dev/null +++ b/src/data/id.rs @@ -0,0 +1,62 @@ +use serde_derive::{Deserialize, Serialize}; +use std::fmt::{Debug, Formatter}; + +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)] +pub struct EntityId(pub u64); + +impl EntityId { + pub(crate) fn from_bytes(b: &[u8]) -> Self { + EntityId(u64::from_be_bytes([ + 0, b[1], b[2], b[3], b[4], b[5], b[6], b[7], + ])) + } +} + +impl Debug for EntityId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "e{}", self.0) + } +} + +pub(crate) const MAX_SYS_ENTITY_ID: EntityId = EntityId(1000); +pub(crate) const MAX_TEMP_ENTITY_ID: EntityId = EntityId(10_000_000); +pub(crate) const MAX_PERM_ENTITY_ID: EntityId = EntityId(0x00ff_ffff_ff00_0000); + +#[derive(Clone, Copy, PartialEq, Ord, PartialOrd, Eq, Deserialize, Serialize, Hash)] +pub struct AttrId(pub u32); + +impl AttrId { + pub(crate) fn from_bytes(b: &[u8]) -> Self { + AttrId(u32::from_be_bytes([0, b[1], b[2], b[3]])) + } +} + +impl Debug for AttrId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "a{}", self.0) + } +} + +pub(crate) const MAX_SYS_ATTR_ID: AttrId = AttrId(1000); +pub(crate) const MAX_TEMP_ATTR_ID: AttrId = AttrId(1000000); +pub(crate) const MAX_PERM_ATTR_ID: AttrId = AttrId(0x00ff_ffff); + +#[derive(Clone, Copy, PartialEq, Ord, PartialOrd, Eq, Deserialize, Serialize, Hash)] +pub struct TxId(pub u64); + +impl TxId { + pub(crate) fn from_bytes(b: &[u8]) -> Self { + TxId(u64::from_be_bytes([ + 0, b[1], b[2], b[3], b[4], b[5], b[6], b[7], + ])) + } +} + +impl Debug for TxId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "t{}", self.0) + } +} + +pub(crate) const MAX_SYS_TX_ID: TxId = TxId(1000); +pub(crate) const MAX_USER_TX_ID: TxId = TxId(0x00ff_ffff_ffff_ffff); diff --git a/src/data/keyword.rs b/src/data/keyword.rs new file mode 100644 index 00000000..e11a3d8f --- /dev/null +++ b/src/data/keyword.rs @@ -0,0 +1,43 @@ +use serde_derive::{Deserialize, Serialize}; +use smartstring::{LazyCompact, SmartString}; +use std::str::Utf8Error; + +#[derive(Debug, thiserror::Error)] +pub enum KeywordError { + #[error("invalid keyword: {0}")] + InvalidKeyword(String), + + #[error(transparent)] + Utf8(#[from] Utf8Error), +} + +#[derive(Clone, PartialEq, PartialOrd, Eq, Ord, Debug, Deserialize, Serialize)] +pub struct Keyword { + pub(crate) ns: SmartString, + pub(crate) ident: SmartString, +} + +impl TryFrom<&str> for Keyword { + type Error = KeywordError; + fn try_from(value: &str) -> Result { + let make_err = || KeywordError::InvalidKeyword(value.to_string()); + let mut kw_iter = value.split('/'); + let ns = kw_iter.next().ok_or_else(make_err)?; + let ident = kw_iter.next().ok_or_else(make_err)?; + if kw_iter.next().is_none() { + Ok(Keyword { + ns: ns.into(), + ident: ident.into(), + }) + } else { + Err(make_err()) + } + } +} + +impl TryFrom<&[u8]> for Keyword { + type Error = KeywordError; + fn try_from(value: &[u8]) -> Result { + std::str::from_utf8(value)?.try_into() + } +} diff --git a/src/data/mod.rs b/src/data/mod.rs new file mode 100644 index 00000000..bfc2647a --- /dev/null +++ b/src/data/mod.rs @@ -0,0 +1,7 @@ +pub(crate) mod compare; +pub(crate) mod id; +pub(crate) mod value; +pub(crate) mod triple; +pub(crate) mod tx; +pub(crate) mod encode; +pub(crate) mod keyword; diff --git a/src/data/triple.rs b/src/data/triple.rs new file mode 100644 index 00000000..a8a99356 --- /dev/null +++ b/src/data/triple.rs @@ -0,0 +1,26 @@ +use serde_derive::{Deserialize, Serialize}; + +#[derive(Debug, thiserror::Error)] +pub enum StoreOpError { + #[error("unexpected value for StoreOp: {0}")] + UnexpectedValue(u8), +} + +#[repr(u8)] +#[derive(Clone, Copy, PartialEq, Ord, PartialOrd, Eq, Debug, Deserialize, Serialize)] +pub enum StoreOp { + Retract = 0, + Assert = 1, +} + +impl TryFrom for StoreOp { + type Error = StoreOpError; + + fn try_from(u: u8) -> Result { + match u { + 0 => Ok(StoreOp::Retract), + 1 => Ok(StoreOp::Assert), + n => Err(StoreOpError::UnexpectedValue(n)), + } + } +} diff --git a/src/data/tx.rs b/src/data/tx.rs new file mode 100644 index 00000000..e69de29b diff --git a/src/data/value.rs b/src/data/value.rs new file mode 100644 index 00000000..a06159cc --- /dev/null +++ b/src/data/value.rs @@ -0,0 +1,45 @@ +use crate::data::id::{AttrId, EntityId, TxId}; +use crate::data::keyword::Keyword; +use ordered_float::OrderedFloat; +use serde_derive::{Deserialize, Serialize}; +use std::borrow::Cow; +use std::cmp::Reverse; +use std::fmt::Debug; +use uuid::Uuid; + +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Deserialize, Serialize)] +pub enum Value<'a> { + #[serde(rename = "n")] + Null, + #[serde(rename = "b")] + Bool(bool), + #[serde(rename = "e")] + EnId(EntityId), + #[serde(rename = "a")] + AtId(AttrId), + #[serde(rename = "t")] + TxId(TxId), + #[serde(rename = "i")] + Int(i64), + #[serde(rename = "f")] + Float(OrderedFloat), + #[serde(rename = "k")] + Keyword(Keyword), + #[serde(borrow)] + #[serde(rename = "s")] + String(Cow<'a, str>), + #[serde(rename = "u")] + Uuid(Uuid), + #[serde(rename = "m")] + Timestamp(i64), + #[serde(borrow)] + #[serde(rename = "v")] + Bytes(Cow<'a, [u8]>), + + #[serde(rename = "z")] + Tuple(Box<[Value<'a>]>), + #[serde(rename = "o")] + DescVal(Reverse>>), + #[serde(rename = "r")] + Bottom, +} diff --git a/src/lib.rs b/src/lib.rs index 862452de..a83ac8de 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,30 +1,9 @@ -use cozorocks::DbStatus; -use lazy_static::lazy_static; +#[cfg(not(target_env = "msvc"))] +use tikv_jemallocator::Jemalloc; -#[no_mangle] -extern "C" fn rusty_cmp(a: &cozorocks::Slice, b: &cozorocks::Slice) -> cozorocks::c_int { - dbg!(cozorocks::convert_slice_back(a)); - dbg!(cozorocks::convert_slice_back(b)); - cozorocks::c_int(0) -} +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; -lazy_static! { - static ref RUSTY_COMPARATOR: cozorocks::UniquePtr = { - unsafe { - let f_ptr = rusty_cmp as *const cozorocks::c_void; - cozorocks::new_rust_comparator("hello", false, f_ptr) - } - }; -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_it() { - let a = cozorocks::convert_slice(&[1, 2, 3, 4]); - let b = cozorocks::convert_slice(&[4, 5, 6, 7]); - assert_eq!(RUSTY_COMPARATOR.Compare(&a, &b), cozorocks::c_int(0)); - } -} +pub(crate) mod data; +pub(crate) mod runtime; diff --git a/src/runtime/instance.rs b/src/runtime/instance.rs new file mode 100644 index 00000000..ead3a1ff --- /dev/null +++ b/src/runtime/instance.rs @@ -0,0 +1,63 @@ +use crate::data::compare::DB_KEY_PREFIX_LEN; +use anyhow::Result; +use cozorocks::*; +use std::sync::atomic::{AtomicU32, AtomicU64}; +use std::sync::{Arc, Mutex}; + +pub struct DbInstance { + pub destroy_on_close: bool, + db: SharedPtr, + db_opts: UniquePtr, + tdb_opts: Option>, + odb_opts: Option>, + path: String, + last_attr_id: Arc, + last_ent_id: Arc, + last_tx_id: Arc, + sessions: Mutex>>>, +} + +struct SessionHandle { + id: usize, + temp: SharedPtr, + status: SessionStatus, +} + +#[derive(Eq, PartialEq, Debug, Clone, Copy)] +pub enum SessionStatus { + Prepared, + Running, + Completed, +} + +impl DbInstance { + pub fn new(path: &str, optimistic: bool, destroy_on_close: bool) -> Result { + let mut db_opts = Options::new().within_unique_ptr(); + set_opts_create_if_missing(db_opts.pin_mut(), true); + set_opts_bloom_filter(db_opts.pin_mut(), 10., true); + set_opts_capped_prefix_extractor(db_opts.pin_mut(), DB_KEY_PREFIX_LEN); + + let (db, tdb_opts, odb_opts) = if optimistic { + let o = new_odb_opts(); + let db = DbBridge::new_odb(path, &db_opts, &o)?; + (db, None, Some(o)) + } else { + let o = new_odb_opts(); + let db = DbBridge::new_odb(path, &db_opts, &o)?; + (db, Some(new_tdb_opts()), None) + }; + + Ok(Self { + db, + db_opts, + tdb_opts, + odb_opts, + path: path.to_string(), + destroy_on_close, + last_attr_id: Arc::new(Default::default()), + last_ent_id: Arc::new(Default::default()), + last_tx_id: Arc::new(Default::default()), + sessions: Mutex::new(vec![]), + }) + } +} diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs new file mode 100644 index 00000000..67020667 --- /dev/null +++ b/src/runtime/mod.rs @@ -0,0 +1 @@ +mod instance;