complete attr tx

main
Ziyang Hu 2 years ago
parent ef72c757d1
commit b2bd86f723

@ -6,7 +6,7 @@
#include "db.h"
#include "cozorocks/src/bridge/mod.rs.h"
shared_ptr<RocksDbBridge> open_db(const DbOpts &opts, RocksDbStatus &status) {
shared_ptr<RocksDbBridge> open_db(const DbOpts &opts, RocksDbStatus &status, bool use_cmp, RustComparatorFn cmp_impl) {
auto options = make_unique<Options>();
if (opts.prepare_for_bulk_load) {
options->PrepareForBulkLoad();
@ -38,11 +38,11 @@ shared_ptr<RocksDbBridge> open_db(const DbOpts &opts, RocksDbStatus &status) {
options->prefix_extractor.reset(NewFixedPrefixTransform(opts.fixed_prefix_extractor_len));
}
RustComparator *cmp = nullptr;
if (opts.comparator_impl != nullptr) {
if (use_cmp) {
cmp = new RustComparator(
string(opts.comparator_name),
opts.comparator_different_bytes_can_be_equal,
opts.comparator_impl);
cmp_impl);
options->comparator = cmp;
}

@ -49,15 +49,15 @@ struct PessimisticRocksDb : public RocksDbBridge {
virtual ~PessimisticRocksDb();
};
typedef int8_t (*CmpFn)(RustBytes a, RustBytes b);
//typedef int8_t (*CmpFn)(RustBytes a, RustBytes b);
typedef rust::Fn<std::int8_t(rust::Slice<const std::uint8_t>, rust::Slice<const std::uint8_t>)> RustComparatorFn;
class RustComparator : public Comparator {
public:
inline RustComparator(string name_, bool can_different_bytes_be_equal_, uint8_t const *const f) :
inline RustComparator(string name_, bool can_different_bytes_be_equal_, RustComparatorFn f) :
name(std::move(name_)),
ext_cmp(f),
can_different_bytes_be_equal(can_different_bytes_be_equal_) {
auto f_ = CmpFn(f);
ext_cmp = f_;
}
[[nodiscard]] inline int Compare(const Slice &a, const Slice &b) const override {
@ -77,10 +77,10 @@ public:
inline void FindShortSuccessor(string *) const override {}
string name;
CmpFn ext_cmp;
RustComparatorFn ext_cmp;
bool can_different_bytes_be_equal;
};
shared_ptr<RocksDbBridge> open_db(const DbOpts &opts, RocksDbStatus &status);
shared_ptr<RocksDbBridge> open_db(const DbOpts &opts, RocksDbStatus &status, bool use_cmp, RustComparatorFn cmp_impl);
#endif //COZOROCKS_DB_H

@ -12,11 +12,14 @@
struct IterBridge {
Transaction *tx;
unique_ptr<Iterator> iter;
string lower_storage;
string upper_storage;
Slice lower_bound;
Slice upper_bound;
unique_ptr<ReadOptions> r_opts;
explicit IterBridge(Transaction *tx_) : tx(tx_), iter(nullptr), lower_bound(), upper_bound(), r_opts(new ReadOptions) {
explicit IterBridge(Transaction *tx_) : tx(tx_), iter(nullptr), lower_bound(), upper_bound(),
r_opts(new ReadOptions) {
r_opts->ignore_range_deletions = true;
r_opts->auto_prefix_mode = true;
}
@ -61,12 +64,14 @@ struct IterBridge {
}
inline void set_lower_bound(RustBytes bound) {
lower_bound = convert_slice(bound);
lower_storage = convert_slice_to_string(bound);
lower_bound = lower_storage;
r_opts->iterate_lower_bound = &lower_bound;
}
inline void set_upper_bound(RustBytes bound) {
upper_bound = convert_slice(bound);
upper_storage = convert_slice_to_string(bound);
upper_bound = upper_storage;
r_opts->iterate_upper_bound = &upper_bound;
}

@ -11,6 +11,10 @@ inline Slice convert_slice(RustBytes d) {
return {reinterpret_cast<const char *>(d.data()), d.size()};
}
inline string convert_slice_to_string(RustBytes d) {
return {reinterpret_cast<const char *>(d.data()), d.size()};
}
inline RustBytes convert_slice_back(const Slice &s) {
return {reinterpret_cast<const std::uint8_t *>(s.data()), s.size()};
}

@ -2,10 +2,10 @@ use crate::bridge::ffi::*;
use crate::bridge::tx::TxBuilder;
use cxx::*;
use std::borrow::Cow;
use std::ptr::null;
#[derive(Default, Debug)]
#[derive(Default)]
pub struct DbBuilder<'a> {
cmp_fn: Option<fn(&[u8], &[u8]) -> i8>,
opts: DbOpts<'a>,
}
@ -30,7 +30,6 @@ impl<'a> Default for DbOpts<'a> {
capped_prefix_extractor_len: 0,
use_fixed_prefix_extractor: false,
fixed_prefix_extractor_len: 0,
comparator_impl: null(),
comparator_name: "",
comparator_different_bytes_can_be_equal: false,
destroy_on_exit: false,
@ -104,11 +103,11 @@ impl<'a> DbBuilder<'a> {
pub fn use_custom_comparator(
mut self,
name: &'a str,
cmp: extern "C" fn(&[u8], &[u8]) -> i8,
cmp: fn(&[u8], &[u8]) -> i8,
different_bytes_can_be_equal: bool,
) -> Self {
self.cmp_fn = Some(cmp);
self.opts.comparator_name = name;
self.opts.comparator_impl = cmp as *const u8;
self.opts.comparator_different_bytes_can_be_equal = different_bytes_can_be_equal;
self
}
@ -118,7 +117,17 @@ impl<'a> DbBuilder<'a> {
}
pub fn build(self) -> Result<RocksDb, RocksDbStatus> {
let mut status = RocksDbStatus::default();
let result = open_db(&self.opts, &mut status);
fn dummy(_a: &[u8], _b: &[u8]) -> i8 {
0
}
let result = open_db(
&self.opts,
&mut status,
self.cmp_fn.is_some(),
self.cmp_fn.unwrap_or(dummy),
);
if status.is_ok() {
Ok(RocksDb { inner: result })
} else {

@ -27,7 +27,6 @@ pub(crate) mod ffi {
pub capped_prefix_extractor_len: usize,
pub use_fixed_prefix_extractor: bool,
pub fixed_prefix_extractor_len: usize,
pub comparator_impl: *const u8,
pub comparator_name: &'a str,
pub comparator_different_bytes_can_be_equal: bool,
pub destroy_on_exit: bool,
@ -110,7 +109,12 @@ pub(crate) mod ffi {
type RocksDbBridge;
fn get_db_path(self: &RocksDbBridge) -> &CxxString;
fn open_db(builder: &DbOpts, status: &mut RocksDbStatus) -> SharedPtr<RocksDbBridge>;
fn open_db(
builder: &DbOpts,
status: &mut RocksDbStatus,
use_cmp: bool,
cmp_impl: fn(&[u8], &[u8]) -> i8,
) -> SharedPtr<RocksDbBridge>;
fn transact(self: &RocksDbBridge) -> UniquePtr<TxBridge>;
type TxBridge;

@ -4,9 +4,7 @@ use crate::data::encode::{
};
use std::cmp::Ordering;
#[allow(improper_ctypes_definitions)]
#[no_mangle]
pub(crate) extern "C" fn rusty_cmp(a: &[u8], b: &[u8]) -> i8 {
pub(crate) fn rusty_cmp(a: &[u8], b: &[u8]) -> i8 {
match compare_key(a, b) {
Ordering::Greater => 1,
Ordering::Equal => 0,
@ -31,7 +29,14 @@ fn compare_key(a: &[u8], b: &[u8]) -> Ordering {
return_if_resolved!(a[0].cmp(&b[0]));
match StorageTag::try_from(a[0]).unwrap() {
let tag = match StorageTag::try_from(a[0]) {
Ok(tag) => tag,
Err(e) => {
panic!("comparison failed with {:?} for {:x?}, {:x?}", e, a, b)
}
};
match tag {
TripleEntityAttrValue => compare_key_triple_eav(a, b),
TripleAttrEntityValue => compare_key_triple_aev(a, b),
TripleAttrValueEntity => compare_key_triple_ave(a, b),
@ -108,6 +113,8 @@ fn compare_key_triple_vae(a: &[u8], b: &[u8]) -> Ordering {
#[inline]
fn compare_key_attr_by_id(a: &[u8], b: &[u8]) -> Ordering {
debug_assert_eq!(a[0], StorageTag::AttrById as u8);
debug_assert_eq!(b[0], StorageTag::AttrById as u8);
let (a_a, a_t, a_o) = decode_attr_key_by_id(a).unwrap();
let (b_a, b_t, b_o) = decode_attr_key_by_id(b).unwrap();

@ -6,6 +6,7 @@ use anyhow::Result;
use rmp_serde::Serializer;
use serde::Serialize;
use smallvec::SmallVec;
use std::fmt::{Debug, Formatter};
use std::ops::{Deref, DerefMut};
#[repr(u8)]
@ -34,6 +35,74 @@ pub(crate) struct Encoded<const N: usize> {
pub(crate) inner: SmallVec<[u8; N]>,
}
impl Encoded<LARGE_VEC_SIZE> {
pub(crate) fn new(data: &[u8]) -> Self {
Self {
inner: SmallVec::from_slice(data),
}
}
}
impl<const N: usize> Debug for Encoded<N> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match StorageTag::try_from(self.inner[0]) {
Err(_) => {
write!(
f,
"?{:x?} {}",
self.inner,
String::from_utf8_lossy(&self.inner)
)
}
Ok(tag) => {
write!(f, "{:?} ", tag)?;
match tag {
StorageTag::TripleEntityAttrValue => {
let (e, a, t, s) = decode_ea_key(self).unwrap();
let v = decode_value_from_key(self).unwrap();
write!(f, "[{:?}, {:?}, {:?}, {:?}, {:?}]", e, a, v, t, s)
}
StorageTag::TripleAttrEntityValue | StorageTag::TripleAttrValueEntity => {
let (a, e, t, s) = decode_ae_key(self).unwrap();
let v = decode_value_from_key(self).unwrap();
write!(f, "[{:?}, {:?}, {:?}, {:?}, {:?}]", e, a, v, t, s)
}
StorageTag::TripleValueAttrEntity => {
let (v, a, e, t, s) = decode_vae_key(self).unwrap();
write!(f, "[{:?}, {:?}, {:?}, {:?}, {:?}]", e, a, v, t, s)
}
StorageTag::AttrById => {
debug_assert_eq!(self[0], StorageTag::AttrById as u8);
let (a, t, s) = decode_attr_key_by_id(self).unwrap();
write!(f, "<{:?}, {:?}, {:?}>", a, t, s)
}
StorageTag::AttrByKeyword => {
let (a, t, s) = decode_attr_key_by_kw(self).unwrap();
write!(f, "<{:?}, {:?}, {:?}>", a, t, s)
}
StorageTag::Tx => {
write!(f, "{:?}", TxId::from_bytes(self))
}
StorageTag::UniqueEntity => {
write!(f, "{:?}", EntityId::from_bytes(self))
}
StorageTag::UniqueAttrValue => {
let (a, v) = decode_unique_attr_val(self).unwrap();
write!(f, "<{:?}: {:?}>", a, v)
}
StorageTag::UniqueAttrById => {
write!(f, "{:?}", AttrId::from_bytes(self))
}
StorageTag::UniqueAttrByKeyword => {
let kw = decode_unique_attr_by_kw(self).unwrap();
write!(f, "{:?}", kw)
}
}
}
}
}
}
impl<const N: usize> Encoded<N> {
pub(crate) fn encoded_entity_amend_tx(&mut self, tx: TxId) {
let tx_bytes = tx.0.to_be_bytes();
@ -98,6 +167,8 @@ impl TryFrom<u8> for StorageTag {
7 => Tx,
8 => UniqueEntity,
9 => UniqueAttrValue,
10 => UniqueAttrById,
11 => UniqueAttrByKeyword,
n => return Err(StorageTagError::UnexpectedValue(n)),
})
}
@ -243,7 +314,7 @@ pub(crate) fn encode_vae_key(
let mut ret = SmallVec::<[u8; LARGE_VEC_SIZE]>::new();
ret.extend(val.0.to_be_bytes());
ret[0] = StorageTag::TripleAttrValueEntity as u8;
ret[0] = StorageTag::TripleValueAttrEntity as u8;
ret.extend(aid.0.to_be_bytes());
ret.extend(tx.0.to_be_bytes());
@ -281,6 +352,7 @@ pub(crate) fn encode_attr_by_id(aid: AttrId, tx: TxId, op: StoreOp) -> Encoded<V
#[inline]
pub(crate) fn decode_attr_key_by_id(src: &[u8]) -> Result<(AttrId, TxId, StoreOp)> {
debug_assert_eq!(src[0], StorageTag::AttrById as u8);
let aid = AttrId::from_bytes(&src[0..VEC_SIZE_8]);
let tx = TxId::from_bytes(&src[VEC_SIZE_8..VEC_SIZE_16]);
let op = src[VEC_SIZE_8].try_into()?;

@ -61,8 +61,8 @@ impl Debug for AttrId {
pub struct TxId(pub u64);
impl TxId {
pub(crate) const MAX_SYS: TxId = TxId(1000);
pub(crate) const MIN_USER: TxId = TxId(1001);
pub(crate) const MAX_SYS: TxId = TxId(10000);
pub(crate) const MIN_USER: TxId = TxId(10001);
pub(crate) const MAX_USER: TxId = TxId(0x00ff_ffff_ffff_ffff);
pub(crate) fn from_bytes(b: &[u8]) -> Self {

@ -1,9 +1,9 @@
pub(crate) mod attr;
pub(crate) mod compare;
pub(crate) mod encode;
pub(crate) mod id;
pub(crate) mod value;
pub(crate) mod json;
pub(crate) mod keyword;
pub(crate) mod triple;
pub(crate) mod tx;
pub(crate) mod encode;
pub(crate) mod keyword;
pub(crate) mod attr;
pub(crate) mod json;
pub(crate) mod value;

@ -14,13 +14,12 @@ pub enum StoreOp {
}
impl StoreOp {
pub(crate) fn is_assert(&self) -> bool{
pub(crate) fn is_assert(&self) -> bool {
*self == StoreOp::Assert
}
pub(crate) fn is_retract(&self) -> bool{
pub(crate) fn is_retract(&self) -> bool {
*self == StoreOp::Retract
}
}
impl TryFrom<u8> for StoreOp {

@ -11,5 +11,6 @@ mod tests;
pub(crate) mod data;
pub(crate) mod runtime;
pub(crate) mod transact;
pub(crate) mod utils;
pub use runtime::instance::Db;
pub use runtime::instance::Db;

@ -8,7 +8,7 @@ use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
pub struct Db {
db: RocksDb,
pub(crate) db: RocksDb,
last_attr_id: Arc<AtomicU64>,
last_ent_id: Arc<AtomicU64>,
last_tx_id: Arc<AtomicU64>,

@ -1,3 +1,2 @@
pub(crate) mod instance;
pub(crate) mod transact;

@ -38,7 +38,7 @@ impl SessionTx {
pub(crate) fn load_last_tx_id(&mut self) -> Result<TxId> {
let e_lower = encode_tx(TxId::MAX_USER);
let e_upper = encode_tx(TxId::MIN_USER);
let e_upper = encode_tx(TxId::MAX_SYS);
let it = self.bounded_scan_first(&e_lower, &e_upper);
Ok(match it.key()? {
None => TxId::MAX_SYS,

@ -1,4 +1,9 @@
use crate::data::attr::{Attribute, AttributeCardinality, AttributeIndex, AttributeTyping};
use crate::data::encode::Encoded;
use crate::data::id::{AttrId, TxId};
use crate::data::keyword::Keyword;
use crate::Db;
use anyhow::Result;
use cozorocks::DbBuilder;
fn create_db(name: &str) -> Db {
@ -16,5 +21,48 @@ fn creation() {
let db = create_db("_test_db");
test_send_sync(&db);
let session = db.new_session().unwrap();
let mut tx = session.transact(None).unwrap();
assert_eq!(
0,
tx.all_attrs()
.collect::<Result<Vec<Attribute>>>()
.unwrap()
.len()
);
let mut tx = session.transact_write().unwrap();
tx.new_attr(Attribute {
id: AttrId(0),
alias: Keyword::try_from("hello/world").unwrap(),
cardinality: AttributeCardinality::One,
val_type: AttributeTyping::Ref,
indexing: AttributeIndex::None,
with_history: true,
})
.unwrap();
tx.commit_tx("", false).unwrap();
let mut tx = session.transact(None).unwrap();
// tx.r_tx_id = TxId(10000);
let found = tx
.attr_by_kw(&Keyword::try_from("hello/world").unwrap())
.unwrap();
dbg!(found);
for attr in tx.all_attrs() {
dbg!(attr.unwrap());
}
dbg!(&session);
dbg!(tx.r_tx_id);
let mut it = session
.db
.transact()
.start()
.iterator()
.total_order_seek(true)
.start();
it.seek_to_start();
while let Some(k) = it.key().unwrap() {
dbg!(Encoded::new(k));
it.next();
// dbg!("loop");
}
}

@ -1,14 +1,16 @@
use crate::data::attr::Attribute;
use crate::data::encode::{
decode_attr_key_by_id, decode_attr_key_by_kw, encode_attr_by_id, encode_attr_by_kw,
encode_unique_attr_by_id, encode_unique_attr_by_kw,
encode_unique_attr_by_id, encode_unique_attr_by_kw, StorageTag,
};
use crate::data::id::{AttrId, TxId};
use crate::data::id::{AttrId, TxId};
use crate::data::keyword::Keyword;
use crate::data::triple::StoreOp;
use crate::runtime::transact::{SessionTx, TransactError};
use crate::utils::swap_option_result;
use anyhow::Result;
use std::sync::atomic::{ Ordering};
use cozorocks::{DbIter, IterBuilder};
use std::sync::atomic::Ordering;
impl SessionTx {
pub(crate) fn attr_by_id(&mut self, aid: AttrId) -> Result<Option<Attribute>> {
@ -18,6 +20,7 @@ impl SessionTx {
Ok(match it.pair()? {
None => None,
Some((k, v)) => {
debug_assert_eq!(k[0], StorageTag::AttrById as u8);
let (_, _, op) = decode_attr_key_by_id(k)?;
if op.is_retract() {
None
@ -29,8 +32,8 @@ impl SessionTx {
}
pub(crate) fn attr_by_kw(&mut self, kw: &Keyword) -> Result<Option<Attribute>> {
let anchor = encode_attr_by_kw(&kw, self.r_tx_id, StoreOp::Retract);
let upper = encode_attr_by_kw(&kw, TxId::MAX_SYS, StoreOp::Assert);
let anchor = encode_attr_by_kw(kw, self.r_tx_id, StoreOp::Retract);
let upper = encode_attr_by_kw(kw, TxId::MAX_SYS, StoreOp::Assert);
let it = self.bounded_scan_first(&anchor, &upper);
Ok(match it.pair()? {
None => None,
@ -45,8 +48,8 @@ impl SessionTx {
})
}
pub(crate) fn all_attrs(&mut self) -> Result<Vec<Attribute>> {
todo!()
pub(crate) fn all_attrs(&mut self) -> impl Iterator<Item = Result<Attribute>> {
AttrIter::new(self.tx.iterator(), self.r_tx_id)
}
/// conflict if new attribute has same name as existing one
@ -96,7 +99,7 @@ impl SessionTx {
self.tx.put(&id_encoded, &attr_data)?;
let kw_encoded = encode_attr_by_kw(&attr.alias, tx_id, StoreOp::Assert);
self.tx.put(&kw_encoded, &attr_data)?;
self.put_attr_guard(&attr)?;
self.put_attr_guard(attr)?;
Ok(())
}
@ -130,3 +133,56 @@ impl SessionTx {
Ok(())
}
}
struct AttrIter {
it: DbIter,
tx_bound: TxId,
last_found: Option<AttrId>,
}
impl AttrIter {
fn new(builder: IterBuilder, tx_bound: TxId) -> Self {
let upper_bound = encode_attr_by_id(AttrId::MAX_PERM, TxId::MAX_SYS, StoreOp::Assert);
let it = builder.upper_bound(&upper_bound).start();
Self {
it,
tx_bound,
last_found: None,
}
}
fn next_inner(&mut self) -> Result<Option<Attribute>> {
loop {
let id_to_seek = match self.last_found {
None => AttrId::MIN_PERM,
Some(id) => AttrId(id.0 + 1),
};
let encoded = encode_attr_by_id(id_to_seek, self.tx_bound, StoreOp::Retract);
self.it.seek(&encoded);
match self.it.pair()? {
None => return Ok(None),
Some((k, v)) => {
debug_assert_eq!(k[0], StorageTag::AttrById as u8);
let (found_aid, found_tid, found_op) = decode_attr_key_by_id(k)?;
if found_tid > self.tx_bound {
self.last_found = Some(AttrId(found_aid.0 - 1));
continue;
}
self.last_found = Some(AttrId(found_aid.0));
if found_op.is_retract() {
continue;
}
return Ok(Some(Attribute::decode(v)?));
}
}
}
}
}
impl Iterator for AttrIter {
type Item = Result<Attribute>;
fn next(&mut self) -> Option<Self::Item> {
swap_option_result(self.next_inner())
}
}

@ -9,4 +9,4 @@
// use anyhow::Result;
// use cozorocks::{DbIter, Tx};
// use std::sync::atomic::{AtomicU64, Ordering};
// use std::sync::Arc;
// use std::sync::Arc;

@ -0,0 +1,8 @@
#[inline(always)]
pub(crate) fn swap_option_result<T, E>(d: Result<Option<T>, E>) -> Option<Result<T, E>> {
match d {
Ok(Some(s)) => Some(Ok(s)),
Ok(None) => None,
Err(e) => Some(Err(e)),
}
}
Loading…
Cancel
Save