encode, decode, compare

main
Ziyang Hu 2 years ago
parent 6c5ac6faa9
commit dd7a59d258

@ -6,20 +6,26 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
pest = "2.0"
pest_derive = "2.0"
ordered-float = "2.10.0"
uuid = { version = "0.8", features = ["v1"] }
uuid = { version = "0.8", features = ["v1", "serde"] }
rand = "0.8.5"
chrono = "0.4"
anyhow = "1.0"
lazy_static = "1.4.0"
thiserror = "1.0.30"
log = "0.4.16"
env_logger = "0.9.0"
actix-web = "4.0.1"
extsort = "0.4.2"
smallvec = { version = "1.8.1", features = ["serde", "write", "union", "const_generics", "const_new"] }
smartstring = { version = "1.0.1", features = ["serde"] }
serde_json = "1.0.81"
serde = { version = "1.0.137" }
serde_derive = "1.0.137"
rmp = "0.8.11"
rmp-serde = "1.1.0"
rmpv = "1.0.0"
ordered-float = { version = "3.0", features = ["serde"] }
cozorocks = { path = "cozorocks" }
[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = "0.5"
[profile.release]
lto = true

@ -88,7 +88,7 @@ namespace rocksdb_additions {
// for options
inline void set_opts_create_if_mission(Options &opts, bool v) {
inline void set_opts_create_if_missing(Options &opts, bool v) {
opts.create_if_missing = v;
}
@ -150,6 +150,14 @@ namespace rocksdb_additions {
inner.enable_blob_garbage_collection = v;
}
inline unique_ptr<OptimisticTransactionDBOptions> new_odb_opts() {
return make_unique<OptimisticTransactionDBOptions>();
}
inline unique_ptr<TransactionDBOptions> new_tdb_opts() {
return make_unique<TransactionDBOptions>();
}
// otopts
inline void set_otopts_comparator(OptimisticTransactionOptions &opts, Comparator &cmp) {
@ -200,7 +208,7 @@ namespace rocksdb_additions {
};
inline shared_ptr<DbBridge>
open_db_raw(const Options &options, const string &path, Status &status) {
open_db_raw(const Options &options, const string path, Status &status) {
DB *db = nullptr;
status = DB::Open(options, path, &db);
@ -210,7 +218,7 @@ namespace rocksdb_additions {
inline shared_ptr<DbBridge>
open_tdb_raw(const Options &options,
const TransactionDBOptions &txn_db_options,
const string &path,
const string path,
Status &status) {
TransactionDB *txn_db = nullptr;
@ -221,7 +229,7 @@ namespace rocksdb_additions {
inline shared_ptr<DbBridge>
open_odb_raw(const Options &options, const string &path, Status &status) {
open_odb_raw(const Options &options, const string path, Status &status) {
OptimisticTransactionDB *txn_db = nullptr;
status = OptimisticTransactionDB::Open(options,

@ -1,4 +1,5 @@
use autocxx::prelude::*;
use std::fmt::{Display, Formatter};
use std::os::raw::c_char;
use std::pin::Pin;
@ -6,12 +7,13 @@ include_cpp! {
#include "bridge.h"
safety!(unsafe)
generate_ns!("rocksdb_additions")
generate_pod!("rocksdb::Slice")
generate!("rocksdb::ReadOptions")
generate!("rocksdb::WriteOptions")
generate!("rocksdb::Options")
generate!("rocksdb::DBOptions")
generate!("rocksdb::Status")
generate_pod!("rocksdb::Slice")
generate!("rocksdb::PinnableSlice")
generate!("rocksdb::TransactionOptions")
generate!("rocksdb::OptimisticTransactionOptions")
@ -25,10 +27,9 @@ include_cpp! {
generate!("rocksdb::StackableDB")
generate!("rocksdb::DB")
generate!("rocksdb::Snapshot")
generate_ns!("rocksdb_additions")
}
pub use autocxx::{c_int, c_void};
pub use autocxx::{c_int, c_void, WithinUniquePtr};
pub use cxx::{CxxString, CxxVector, SharedPtr, UniquePtr};
pub use ffi::rocksdb::Status_Code as StatusCode;
pub use ffi::rocksdb::Status_Severity as StatusSeverity;
@ -36,12 +37,21 @@ pub use ffi::rocksdb::Status_SubCode as StatusSubCode;
pub use ffi::rocksdb::*;
pub use ffi::rocksdb_additions::*;
#[derive(Debug, Copy, Clone)]
pub struct DbStatus {
pub code: StatusCode,
pub subcode: StatusSubCode,
pub severity: StatusSeverity,
}
impl Display for DbStatus {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
impl std::error::Error for DbStatus {}
#[inline(always)]
fn convert_status(status: &ffi::rocksdb::Status) -> DbStatus {
let code = status.code();
@ -70,6 +80,44 @@ pub fn convert_slice_back(src: &Slice) -> &[u8] {
pub type SnapshotPtr = *const Snapshot;
impl DbBridge {
pub fn new_raw_db(path: &str, opts: &Options) -> Result<SharedPtr<Self>, DbStatus> {
let mut status = Status::new().within_unique_ptr();
let bridge = open_db_raw(opts, path, status.pin_mut());
if !status.ok() {
Err(convert_status(&status))
} else {
Ok(bridge)
}
}
pub fn new_tdb(
path: &str,
opts: &Options,
tdb_opts: &TransactionDBOptions,
) -> Result<SharedPtr<Self>, DbStatus> {
let mut status = Status::new().within_unique_ptr();
let bridge = open_tdb_raw(opts, tdb_opts, path, status.pin_mut());
if !status.ok() {
Err(convert_status(&status))
} else {
Ok(bridge)
}
}
pub fn new_odb(
path: &str,
opts: &Options,
_odb_opts: &OptimisticTransactionDBOptions,
) -> Result<SharedPtr<Self>, DbStatus> {
let mut status = Status::new().within_unique_ptr();
let bridge = open_odb_raw(opts, path, status.pin_mut());
if !status.ok() {
Err(convert_status(&status))
} else {
Ok(bridge)
}
}
#[inline]
fn get_raw_db(&self) -> Pin<&mut DB> {
unsafe { Pin::new_unchecked(&mut *self.inner_db()) }

@ -0,0 +1,159 @@
use crate::data::encode::{
decode_ae_key, decode_attr_key_by_id, decode_attr_key_by_kw, decode_ea_key,
decode_unique_attr_val, decode_vae_key, decode_value_from_key, StorageTag,
};
use crate::data::id::{EntityId, TxId};
use lazy_static::lazy_static;
use std::cmp::Ordering;
#[no_mangle]
extern "C" fn rusty_cmp(a: &cozorocks::Slice, b: &cozorocks::Slice) -> cozorocks::c_int {
let a = cozorocks::convert_slice_back(a);
let b = cozorocks::convert_slice_back(b);
cozorocks::c_int(match compare_key(a, b) {
Ordering::Greater => 1,
Ordering::Equal => 0,
Ordering::Less => -1,
})
}
pub(crate) const DB_KEY_PREFIX_LEN: usize = 4;
lazy_static! {
pub(crate) static ref RUSTY_COMPARATOR: cozorocks::UniquePtr<cozorocks::RustComparator> = {
unsafe {
let f_ptr = rusty_cmp as *const cozorocks::c_void;
cozorocks::new_rust_comparator("cozo_rusty_cmp_v1", false, f_ptr)
}
};
}
macro_rules! return_if_resolved {
($o:expr) => {
match $o {
std::cmp::Ordering::Equal => {}
o => return o,
}
};
}
#[inline]
fn compare_key(a: &[u8], b: &[u8]) -> Ordering {
use StorageTag::*;
return_if_resolved!(a[0].cmp(&b[0]));
match StorageTag::try_from(a[0]).unwrap() {
TripleEntityAttrValue => compare_key_triple_eav(a, b),
TripleAttrEntityValue => compare_key_triple_aev(a, b),
TripleAttrValueEntity => compare_key_triple_ave(a, b),
TripleValueAttrEntity => compare_key_triple_vae(a, b),
AttrById => compare_key_attr_by_id(a, b),
AttrByKeyword => compare_key_attr_by_kw(a, b),
Tx => compare_key_tx(a, b),
UniqueEntity => compare_key_unique_entity(a, b),
UniqueAttrValue => compare_key_unique_attr_val(a, b),
}
}
#[inline]
fn compare_key_triple_eav(a: &[u8], b: &[u8]) -> Ordering {
let (a_e, a_a, a_t, a_o) = decode_ea_key(a).unwrap();
let (b_e, b_a, b_t, b_o) = decode_ea_key(b).unwrap();
return_if_resolved!(a_e.cmp(&b_e));
return_if_resolved!(a_a.cmp(&b_a));
let a_v = decode_value_from_key(a).unwrap();
let b_v = decode_value_from_key(b).unwrap();
return_if_resolved!(a_v.cmp(&b_v));
return_if_resolved!(a_t.cmp(&b_t).reverse());
a_o.cmp(&b_o)
}
#[inline]
fn compare_key_triple_aev(a: &[u8], b: &[u8]) -> Ordering {
let (a_a, a_e, a_t, a_o) = decode_ae_key(a).unwrap();
let (b_a, b_e, b_t, b_o) = decode_ae_key(b).unwrap();
return_if_resolved!(a_a.cmp(&b_a));
return_if_resolved!(a_e.cmp(&b_e));
let a_v = decode_value_from_key(a).unwrap();
let b_v = decode_value_from_key(b).unwrap();
return_if_resolved!(a_v.cmp(&b_v));
return_if_resolved!(a_t.cmp(&b_t).reverse());
a_o.cmp(&b_o)
}
#[inline]
fn compare_key_triple_ave(a: &[u8], b: &[u8]) -> Ordering {
let (a_a, a_e, a_t, a_o) = decode_ae_key(a).unwrap();
let (b_a, b_e, b_t, b_o) = decode_ae_key(b).unwrap();
return_if_resolved!(a_a.cmp(&b_a));
let a_v = decode_value_from_key(a).unwrap();
let b_v = decode_value_from_key(b).unwrap();
return_if_resolved!(a_v.cmp(&b_v));
return_if_resolved!(a_e.cmp(&b_e));
return_if_resolved!(a_t.cmp(&b_t).reverse());
a_o.cmp(&b_o)
}
#[inline]
fn compare_key_triple_vae(a: &[u8], b: &[u8]) -> Ordering {
let (a_v, a_a, a_e, a_t, a_o) = decode_vae_key(a).unwrap();
let (b_v, b_a, b_e, b_t, b_o) = decode_vae_key(b).unwrap();
return_if_resolved!(a_v.cmp(&b_v));
return_if_resolved!(a_a.cmp(&b_a));
return_if_resolved!(a_e.cmp(&b_e));
return_if_resolved!(a_t.cmp(&b_t).reverse());
a_o.cmp(&b_o)
}
#[inline]
fn compare_key_attr_by_id(a: &[u8], b: &[u8]) -> Ordering {
let (a_a, a_t, a_o) = decode_attr_key_by_id(a).unwrap();
let (b_a, b_t, b_o) = decode_attr_key_by_id(b).unwrap();
return_if_resolved!(a_a.cmp(&b_a));
return_if_resolved!(a_t.cmp(&b_t).reverse());
a_o.cmp(&b_o)
}
#[inline]
fn compare_key_attr_by_kw(a: &[u8], b: &[u8]) -> Ordering {
let (a_kw, a_t, a_o) = decode_attr_key_by_kw(a).unwrap();
let (b_kw, b_t, b_o) = decode_attr_key_by_kw(b).unwrap();
return_if_resolved!(a_kw.cmp(&b_kw));
return_if_resolved!(a_t.cmp(&b_t).reverse());
a_o.cmp(&b_o)
}
#[inline]
fn compare_key_tx(a: &[u8], b: &[u8]) -> Ordering {
let a_t = TxId::from_bytes(a);
let b_t = TxId::from_bytes(b);
a_t.cmp(&b_t).reverse()
}
#[inline]
fn compare_key_unique_entity(a: &[u8], b: &[u8]) -> Ordering {
let a_e = EntityId::from_bytes(a);
let b_e = EntityId::from_bytes(b);
a_e.cmp(&b_e)
}
#[inline]
fn compare_key_unique_attr_val(a: &[u8], b: &[u8]) -> Ordering {
let (a_a, a_v) = decode_unique_attr_val(a).unwrap();
let (b_a, b_v) = decode_unique_attr_val(b).unwrap();
return_if_resolved!(a_a.cmp(&b_a));
a_v.cmp(&b_v)
}

@ -0,0 +1,291 @@
use crate::data::encode::StorageTag::Tx;
use crate::data::id::{AttrId, EntityId, TxId};
use crate::data::keyword::Keyword;
use crate::data::triple::StoreOp;
use crate::data::value::Value;
use anyhow::Result;
use rmp_serde::Serializer;
use serde::Serialize;
use smallvec::SmallVec;
use std::ops::Deref;
#[repr(u8)]
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug)]
pub(crate) enum StorageTag {
TripleEntityAttrValue = 1,
TripleAttrEntityValue = 2,
TripleAttrValueEntity = 3,
TripleValueAttrEntity = 4,
AttrById = 5,
AttrByKeyword = 6,
Tx = 7,
UniqueEntity = 8,
UniqueAttrValue = 9,
}
#[derive(Debug, thiserror::Error)]
pub enum StorageTagError {
#[error("unexpected value for StoreOp: {0}")]
UnexpectedValue(u8),
}
impl TryFrom<u8> for StorageTag {
type Error = StorageTagError;
fn try_from(value: u8) -> std::result::Result<Self, Self::Error> {
use StorageTag::*;
Ok(match value {
1 => TripleEntityAttrValue,
2 => TripleAttrEntityValue,
3 => TripleAttrValueEntity,
4 => TripleValueAttrEntity,
5 => AttrById,
6 => AttrByKeyword,
7 => Tx,
8 => UniqueEntity,
9 => UniqueAttrValue,
n => return Err(StorageTagError::UnexpectedValue(n)),
})
}
}
#[inline]
pub(crate) fn encode_value(val: Value) -> impl Deref<Target = [u8]> {
let mut ret = SmallVec::<[u8; 60]>::new();
val.serialize(&mut Serializer::new(&mut ret)).unwrap();
ret
}
#[inline]
pub(crate) fn decode_value(src: &[u8]) -> Result<Value> {
Ok(rmp_serde::from_slice(src)?)
}
#[inline]
pub(crate) fn decode_value_from_key(src: &[u8]) -> Result<Value> {
Ok(rmp_serde::from_slice(&src[20..])?)
}
/// eid: 8 bytes (incl. tag)
/// aid: 4 bytes
/// val: variable
/// tx: 8 bytes (incl. op)
#[inline]
pub(crate) fn encode_eav_key(
eid: EntityId,
aid: AttrId,
val: Value,
tx: TxId,
op: StoreOp,
) -> impl Deref<Target = [u8]> {
let mut ret = SmallVec::<[u8; 60]>::new();
ret.extend(eid.0.to_be_bytes());
ret[0] = StorageTag::TripleEntityAttrValue as u8;
ret.extend(aid.0.to_be_bytes());
ret.extend(tx.0.to_be_bytes());
ret[12] = op as u8;
debug_assert_eq!(ret.len(), 20);
val.serialize(&mut Serializer::new(&mut ret)).unwrap();
ret
}
#[inline]
pub(crate) fn decode_ea_key(src: &[u8]) -> Result<(EntityId, AttrId, TxId, StoreOp)> {
let eid = EntityId::from_bytes(&src[0..8]);
let aid = AttrId::from_bytes(&src[8..12]);
let tx = TxId::from_bytes(&src[12..20]);
let op = src[12].try_into()?;
Ok((eid, aid, tx, op))
}
/// eid: 8 bytes (incl. tag)
/// aid: 4 bytes
/// val: variable
/// tx: 8 bytes (incl. op)
#[inline]
pub(crate) fn encode_aev_key(
aid: AttrId,
eid: EntityId,
val: Value,
tx: TxId,
op: StoreOp,
) -> impl Deref<Target = [u8]> {
let mut ret = SmallVec::<[u8; 60]>::new();
ret.extend(aid.0.to_be_bytes());
ret[0] = StorageTag::TripleAttrEntityValue as u8;
ret.extend(eid.0.to_be_bytes());
ret.extend(tx.0.to_be_bytes());
ret[12] = op as u8;
debug_assert_eq!(ret.len(), 20);
val.serialize(&mut Serializer::new(&mut ret)).unwrap();
ret
}
#[inline]
pub(crate) fn decode_ae_key(src: &[u8]) -> Result<(AttrId, EntityId, TxId, StoreOp)> {
let aid = AttrId::from_bytes(&src[0..4]);
let eid = EntityId::from_bytes(&src[4..12]);
let tx = TxId::from_bytes(&src[12..20]);
let op = src[12].try_into()?;
Ok((aid, eid, tx, op))
}
/// aid: 4 bytes (incl. tag)
/// val: variable
/// eid: 8 bytes
/// tx: 8 bytes (incl. op)
#[inline]
pub(crate) fn encode_ave_key(
aid: AttrId,
val: Value,
eid: EntityId,
tx: TxId,
op: StoreOp,
) -> impl Deref<Target = [u8]> {
let mut ret = SmallVec::<[u8; 60]>::new();
ret.extend(aid.0.to_be_bytes());
ret[0] = StorageTag::TripleAttrValueEntity as u8;
ret.extend(eid.0.to_be_bytes());
ret.extend(tx.0.to_be_bytes());
ret[12] = op as u8;
debug_assert_eq!(ret.len(), 20);
val.serialize(&mut Serializer::new(&mut ret)).unwrap();
ret
}
/// val: 8 bytes (incl. tag)
/// eid: 8 bytes
/// aid: 4 bytes
/// tx: 8 bytes (incl. op)
#[inline]
pub(crate) fn encode_vae_key(
val: EntityId,
aid: AttrId,
eid: EntityId,
tx: TxId,
op: StoreOp,
) -> impl Deref<Target = [u8]> {
let mut ret = SmallVec::<[u8; 60]>::new();
ret.extend(val.0.to_be_bytes());
ret[0] = StorageTag::TripleAttrValueEntity as u8;
ret.extend(aid.0.to_be_bytes());
ret.extend(tx.0.to_be_bytes());
ret[12] = op as u8;
debug_assert_eq!(ret.len(), 20);
ret.extend(eid.0.to_be_bytes());
debug_assert_eq!(ret.len(), 28);
ret
}
#[inline]
pub(crate) fn decode_vae_key(src: &[u8]) -> Result<(EntityId, AttrId, EntityId, TxId, StoreOp)> {
let vid = EntityId::from_bytes(&src[0..8]);
let aid = AttrId::from_bytes(&src[8..12]);
let tx = TxId::from_bytes(&src[12..20]);
let eid = EntityId::from_bytes(&src[20..28]);
let op = src[12].try_into()?;
Ok((vid, aid, eid, tx, op))
}
/// aid: 4 bytes (incl. tag)
/// tx: 8 bytes (incl. op)
#[inline]
pub(crate) fn encode_attr_by_id(aid: AttrId, tx: TxId, op: StoreOp) -> impl Deref<Target = [u8]> {
let mut ret = SmallVec::<[u8; 12]>::new();
ret.extend(aid.0.to_be_bytes());
ret[0] = StorageTag::AttrById as u8;
ret.extend(tx.0.to_be_bytes());
ret[4] = op as u8;
debug_assert_eq!(ret.len(), 12);
ret
}
#[inline]
pub(crate) fn decode_attr_key_by_id(src: &[u8]) -> Result<(AttrId, TxId, StoreOp)> {
let aid = AttrId::from_bytes(&src[0..4]);
let tx = TxId::from_bytes(&src[4..12]);
let op = src[4].try_into()?;
Ok((aid, tx, op))
}
/// tag: 4 bytes (with prefix)
/// tx: 8 bytes (incl. op)
/// attr as kw: variable (segmented by \0)
#[inline]
pub(crate) fn encode_attr_by_kw(
attr_name: Keyword,
tx: TxId,
op: StoreOp,
) -> impl Deref<Target = [u8]> {
let mut ret = SmallVec::<[u8; 12]>::new();
ret.push(StorageTag::AttrByKeyword as u8);
let ns_bytes = attr_name.ns.as_bytes();
ret.push(ns_bytes.get(0).cloned().unwrap_or(0));
ret.push(ns_bytes.get(1).cloned().unwrap_or(0));
ret.push(ns_bytes.get(2).cloned().unwrap_or(0));
ret.extend(tx.0.to_be_bytes());
ret[4] = op as u8;
ret.extend_from_slice(ns_bytes);
ret.push(b'/');
ret.extend_from_slice(attr_name.ident.as_bytes());
ret
}
#[inline]
pub(crate) fn decode_attr_key_by_kw(src: &[u8]) -> Result<(Keyword, TxId, StoreOp)> {
let tx = TxId::from_bytes(&src[4..12]);
let op = src[4].try_into()?;
let kw = Keyword::try_from(&src[12..])?;
Ok((kw, tx, op))
}
/// tx: 8 bytes (incl. tag)
#[inline]
pub(crate) fn encode_tx(tx: TxId) -> impl Deref<Target = [u8]> {
let mut ret = SmallVec::<[u8; 8]>::new();
ret.extend(tx.0.to_be_bytes());
ret[0] = StorageTag::Tx as u8;
ret
}
#[inline]
pub(crate) fn encode_unique_entity_placeholder(eid: EntityId) -> impl Deref<Target = [u8]> {
let mut ret = SmallVec::<[u8; 8]>::new();
ret.extend(eid.0.to_be_bytes());
ret[0] = StorageTag::UniqueEntity as u8;
ret
}
#[inline]
pub(crate) fn encode_unique_attr_val(aid: AttrId, val: Value) -> impl Deref<Target = [u8]> {
let mut ret = SmallVec::<[u8; 60]>::new();
ret.extend(aid.0.to_be_bytes());
ret[0] = StorageTag::UniqueAttrValue as u8;
val.serialize(&mut Serializer::new(&mut ret)).unwrap();
ret
}
#[inline]
pub(crate) fn decode_unique_attr_val(src: &[u8]) -> Result<(AttrId, Value)> {
let a_id = AttrId::from_bytes(&src[..4]);
let val = rmp_serde::from_slice(&src[4..])?;
Ok((a_id, val))
}

@ -0,0 +1,62 @@
use serde_derive::{Deserialize, Serialize};
use std::fmt::{Debug, Formatter};
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
pub struct EntityId(pub u64);
impl EntityId {
pub(crate) fn from_bytes(b: &[u8]) -> Self {
EntityId(u64::from_be_bytes([
0, b[1], b[2], b[3], b[4], b[5], b[6], b[7],
]))
}
}
impl Debug for EntityId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "e{}", self.0)
}
}
pub(crate) const MAX_SYS_ENTITY_ID: EntityId = EntityId(1000);
pub(crate) const MAX_TEMP_ENTITY_ID: EntityId = EntityId(10_000_000);
pub(crate) const MAX_PERM_ENTITY_ID: EntityId = EntityId(0x00ff_ffff_ff00_0000);
#[derive(Clone, Copy, PartialEq, Ord, PartialOrd, Eq, Deserialize, Serialize, Hash)]
pub struct AttrId(pub u32);
impl AttrId {
pub(crate) fn from_bytes(b: &[u8]) -> Self {
AttrId(u32::from_be_bytes([0, b[1], b[2], b[3]]))
}
}
impl Debug for AttrId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "a{}", self.0)
}
}
pub(crate) const MAX_SYS_ATTR_ID: AttrId = AttrId(1000);
pub(crate) const MAX_TEMP_ATTR_ID: AttrId = AttrId(1000000);
pub(crate) const MAX_PERM_ATTR_ID: AttrId = AttrId(0x00ff_ffff);
#[derive(Clone, Copy, PartialEq, Ord, PartialOrd, Eq, Deserialize, Serialize, Hash)]
pub struct TxId(pub u64);
impl TxId {
pub(crate) fn from_bytes(b: &[u8]) -> Self {
TxId(u64::from_be_bytes([
0, b[1], b[2], b[3], b[4], b[5], b[6], b[7],
]))
}
}
impl Debug for TxId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "t{}", self.0)
}
}
pub(crate) const MAX_SYS_TX_ID: TxId = TxId(1000);
pub(crate) const MAX_USER_TX_ID: TxId = TxId(0x00ff_ffff_ffff_ffff);

@ -0,0 +1,43 @@
use serde_derive::{Deserialize, Serialize};
use smartstring::{LazyCompact, SmartString};
use std::str::Utf8Error;
#[derive(Debug, thiserror::Error)]
pub enum KeywordError {
#[error("invalid keyword: {0}")]
InvalidKeyword(String),
#[error(transparent)]
Utf8(#[from] Utf8Error),
}
#[derive(Clone, PartialEq, PartialOrd, Eq, Ord, Debug, Deserialize, Serialize)]
pub struct Keyword {
pub(crate) ns: SmartString<LazyCompact>,
pub(crate) ident: SmartString<LazyCompact>,
}
impl TryFrom<&str> for Keyword {
type Error = KeywordError;
fn try_from(value: &str) -> Result<Self, Self::Error> {
let make_err = || KeywordError::InvalidKeyword(value.to_string());
let mut kw_iter = value.split('/');
let ns = kw_iter.next().ok_or_else(make_err)?;
let ident = kw_iter.next().ok_or_else(make_err)?;
if kw_iter.next().is_none() {
Ok(Keyword {
ns: ns.into(),
ident: ident.into(),
})
} else {
Err(make_err())
}
}
}
impl TryFrom<&[u8]> for Keyword {
type Error = KeywordError;
fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
std::str::from_utf8(value)?.try_into()
}
}

@ -0,0 +1,7 @@
pub(crate) mod compare;
pub(crate) mod id;
pub(crate) mod value;
pub(crate) mod triple;
pub(crate) mod tx;
pub(crate) mod encode;
pub(crate) mod keyword;

@ -0,0 +1,26 @@
use serde_derive::{Deserialize, Serialize};
#[derive(Debug, thiserror::Error)]
pub enum StoreOpError {
#[error("unexpected value for StoreOp: {0}")]
UnexpectedValue(u8),
}
#[repr(u8)]
#[derive(Clone, Copy, PartialEq, Ord, PartialOrd, Eq, Debug, Deserialize, Serialize)]
pub enum StoreOp {
Retract = 0,
Assert = 1,
}
impl TryFrom<u8> for StoreOp {
type Error = StoreOpError;
fn try_from(u: u8) -> Result<Self, Self::Error> {
match u {
0 => Ok(StoreOp::Retract),
1 => Ok(StoreOp::Assert),
n => Err(StoreOpError::UnexpectedValue(n)),
}
}
}

@ -0,0 +1,45 @@
use crate::data::id::{AttrId, EntityId, TxId};
use crate::data::keyword::Keyword;
use ordered_float::OrderedFloat;
use serde_derive::{Deserialize, Serialize};
use std::borrow::Cow;
use std::cmp::Reverse;
use std::fmt::Debug;
use uuid::Uuid;
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Deserialize, Serialize)]
pub enum Value<'a> {
#[serde(rename = "n")]
Null,
#[serde(rename = "b")]
Bool(bool),
#[serde(rename = "e")]
EnId(EntityId),
#[serde(rename = "a")]
AtId(AttrId),
#[serde(rename = "t")]
TxId(TxId),
#[serde(rename = "i")]
Int(i64),
#[serde(rename = "f")]
Float(OrderedFloat<f64>),
#[serde(rename = "k")]
Keyword(Keyword),
#[serde(borrow)]
#[serde(rename = "s")]
String(Cow<'a, str>),
#[serde(rename = "u")]
Uuid(Uuid),
#[serde(rename = "m")]
Timestamp(i64),
#[serde(borrow)]
#[serde(rename = "v")]
Bytes(Cow<'a, [u8]>),
#[serde(rename = "z")]
Tuple(Box<[Value<'a>]>),
#[serde(rename = "o")]
DescVal(Reverse<Box<Value<'a>>>),
#[serde(rename = "r")]
Bottom,
}

@ -1,30 +1,9 @@
use cozorocks::DbStatus;
use lazy_static::lazy_static;
#[cfg(not(target_env = "msvc"))]
use tikv_jemallocator::Jemalloc;
#[no_mangle]
extern "C" fn rusty_cmp(a: &cozorocks::Slice, b: &cozorocks::Slice) -> cozorocks::c_int {
dbg!(cozorocks::convert_slice_back(a));
dbg!(cozorocks::convert_slice_back(b));
cozorocks::c_int(0)
}
#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;
lazy_static! {
static ref RUSTY_COMPARATOR: cozorocks::UniquePtr<cozorocks::RustComparator> = {
unsafe {
let f_ptr = rusty_cmp as *const cozorocks::c_void;
cozorocks::new_rust_comparator("hello", false, f_ptr)
}
};
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_it() {
let a = cozorocks::convert_slice(&[1, 2, 3, 4]);
let b = cozorocks::convert_slice(&[4, 5, 6, 7]);
assert_eq!(RUSTY_COMPARATOR.Compare(&a, &b), cozorocks::c_int(0));
}
}
pub(crate) mod data;
pub(crate) mod runtime;

@ -0,0 +1,63 @@
use crate::data::compare::DB_KEY_PREFIX_LEN;
use anyhow::Result;
use cozorocks::*;
use std::sync::atomic::{AtomicU32, AtomicU64};
use std::sync::{Arc, Mutex};
pub struct DbInstance {
pub destroy_on_close: bool,
db: SharedPtr<DbBridge>,
db_opts: UniquePtr<Options>,
tdb_opts: Option<UniquePtr<TransactionDBOptions>>,
odb_opts: Option<UniquePtr<OptimisticTransactionDBOptions>>,
path: String,
last_attr_id: Arc<AtomicU32>,
last_ent_id: Arc<AtomicU64>,
last_tx_id: Arc<AtomicU64>,
sessions: Mutex<Vec<Arc<Mutex<SessionHandle>>>>,
}
struct SessionHandle {
id: usize,
temp: SharedPtr<DbBridge>,
status: SessionStatus,
}
#[derive(Eq, PartialEq, Debug, Clone, Copy)]
pub enum SessionStatus {
Prepared,
Running,
Completed,
}
impl DbInstance {
pub fn new(path: &str, optimistic: bool, destroy_on_close: bool) -> Result<Self> {
let mut db_opts = Options::new().within_unique_ptr();
set_opts_create_if_missing(db_opts.pin_mut(), true);
set_opts_bloom_filter(db_opts.pin_mut(), 10., true);
set_opts_capped_prefix_extractor(db_opts.pin_mut(), DB_KEY_PREFIX_LEN);
let (db, tdb_opts, odb_opts) = if optimistic {
let o = new_odb_opts();
let db = DbBridge::new_odb(path, &db_opts, &o)?;
(db, None, Some(o))
} else {
let o = new_odb_opts();
let db = DbBridge::new_odb(path, &db_opts, &o)?;
(db, Some(new_tdb_opts()), None)
};
Ok(Self {
db,
db_opts,
tdb_opts,
odb_opts,
path: path.to_string(),
destroy_on_close,
last_attr_id: Arc::new(Default::default()),
last_ent_id: Arc::new(Default::default()),
last_tx_id: Arc::new(Default::default()),
sessions: Mutex::new(vec![]),
})
}
}

@ -0,0 +1 @@
mod instance;
Loading…
Cancel
Save