change encoding scheme

main
Ziyang Hu 2 years ago
parent aee54ecdd0
commit 6e145a9a93

@ -18,6 +18,7 @@ struct IterBridge {
IterBridge(Transaction *tx_) : tx(tx_), iter(nullptr), lower_bound(), upper_bound(), r_opts(new ReadOptions) {
r_opts->ignore_range_deletions = true;
r_opts->auto_prefix_mode = true;
}
// inline ReadOptions &get_r_opts() {

@ -14,7 +14,7 @@ extern "C" fn rusty_cmp(a: &[u8], b: &[u8]) -> i8 {
}
}
pub(crate) const DB_KEY_PREFIX_LEN: usize = 4;
pub(crate) const DB_KEY_PREFIX_LEN: usize = 8;
macro_rules! return_if_resolved {
($o:expr) => {

@ -39,7 +39,7 @@ impl<const N: usize> Encoded<N> {
let tx_bytes = tx.0.to_be_bytes();
#[allow(clippy::needless_range_loop)]
for i in 1..8 {
self.inner[VEC_SIZE_12 + i] = tx_bytes[i];
self.inner[VEC_SIZE_16 + i] = tx_bytes[i];
}
}
pub(crate) fn encoded_entity_amend_tx_to_last(&mut self) {
@ -53,7 +53,7 @@ impl<const N: usize> Encoded<N> {
let tx_bytes = tx.0.to_be_bytes();
#[allow(clippy::needless_range_loop)]
for i in 1..8 {
self.inner[VEC_SIZE_4 + i] = tx_bytes[i];
self.inner[VEC_SIZE_8 + i] = tx_bytes[i];
}
}
pub(crate) fn encoded_attr_amend_tx_to_last(&mut self) {
@ -104,11 +104,10 @@ impl TryFrom<u8> for StorageTag {
}
const LARGE_VEC_SIZE: usize = 60;
const VEC_SIZE_28: usize = 28;
const VEC_SIZE_20: usize = 20;
const VEC_SIZE_12: usize = 12;
const VEC_SIZE_32: usize = 32;
const VEC_SIZE_24: usize = 24;
const VEC_SIZE_16: usize = 16;
const VEC_SIZE_8: usize = 8;
const VEC_SIZE_4: usize = 4;
#[inline]
pub(crate) fn encode_value(val: Value) -> Encoded<LARGE_VEC_SIZE> {
@ -124,11 +123,11 @@ pub(crate) fn decode_value(src: &[u8]) -> Result<Value> {
#[inline]
pub(crate) fn decode_value_from_key(src: &[u8]) -> Result<Value> {
Ok(rmp_serde::from_slice(&src[VEC_SIZE_20..])?)
Ok(rmp_serde::from_slice(&src[VEC_SIZE_24..])?)
}
/// eid: 8 bytes (incl. tag)
/// aid: 4 bytes
/// aid: 8 bytes
/// val: variable
/// tx: 8 bytes (incl. op)
#[inline]
@ -147,8 +146,8 @@ pub(crate) fn encode_eav_key(
ret.extend(aid.0.to_be_bytes());
ret.extend(tx.0.to_be_bytes());
ret[VEC_SIZE_12] = op as u8;
debug_assert_eq!(ret.len(), VEC_SIZE_20);
ret[VEC_SIZE_16] = op as u8;
debug_assert_eq!(ret.len(), VEC_SIZE_24);
val.serialize(&mut Serializer::new(&mut ret)).unwrap();
@ -158,15 +157,15 @@ pub(crate) fn encode_eav_key(
#[inline]
pub(crate) fn decode_ea_key(src: &[u8]) -> Result<(EntityId, AttrId, TxId, StoreOp)> {
let eid = EntityId::from_bytes(&src[0..VEC_SIZE_8]);
let aid = AttrId::from_bytes(&src[VEC_SIZE_8..VEC_SIZE_12]);
let tx = TxId::from_bytes(&src[VEC_SIZE_12..VEC_SIZE_20]);
let op = src[VEC_SIZE_12].try_into()?;
let aid = AttrId::from_bytes(&src[VEC_SIZE_8..VEC_SIZE_16]);
let tx = TxId::from_bytes(&src[VEC_SIZE_16..VEC_SIZE_24]);
let op = src[VEC_SIZE_16].try_into()?;
Ok((eid, aid, tx, op))
}
/// eid: 8 bytes (incl. tag)
/// aid: 4 bytes
/// aid: 8 bytes
/// val: variable
/// tx: 8 bytes (incl. op)
#[inline]
@ -184,8 +183,8 @@ pub(crate) fn encode_aev_key(
ret.extend(eid.0.to_be_bytes());
ret.extend(tx.0.to_be_bytes());
ret[VEC_SIZE_12] = op as u8;
debug_assert_eq!(ret.len(), VEC_SIZE_20);
ret[VEC_SIZE_16] = op as u8;
debug_assert_eq!(ret.len(), VEC_SIZE_24);
val.serialize(&mut Serializer::new(&mut ret)).unwrap();
@ -194,15 +193,15 @@ pub(crate) fn encode_aev_key(
#[inline]
pub(crate) fn decode_ae_key(src: &[u8]) -> Result<(AttrId, EntityId, TxId, StoreOp)> {
let aid = AttrId::from_bytes(&src[0..VEC_SIZE_4]);
let eid = EntityId::from_bytes(&src[VEC_SIZE_4..VEC_SIZE_12]);
let tx = TxId::from_bytes(&src[VEC_SIZE_12..VEC_SIZE_20]);
let op = src[VEC_SIZE_12].try_into()?;
let aid = AttrId::from_bytes(&src[0..VEC_SIZE_8]);
let eid = EntityId::from_bytes(&src[VEC_SIZE_8..VEC_SIZE_16]);
let tx = TxId::from_bytes(&src[VEC_SIZE_16..VEC_SIZE_24]);
let op = src[VEC_SIZE_16].try_into()?;
Ok((aid, eid, tx, op))
}
/// aid: 4 bytes (incl. tag)
/// aid: 8 bytes (incl. tag)
/// val: variable
/// eid: 8 bytes
/// tx: 8 bytes (incl. op)
@ -221,8 +220,8 @@ pub(crate) fn encode_ave_key(
ret.extend(eid.0.to_be_bytes());
ret.extend(tx.0.to_be_bytes());
ret[VEC_SIZE_12] = op as u8;
debug_assert_eq!(ret.len(), VEC_SIZE_20);
ret[VEC_SIZE_16] = op as u8;
debug_assert_eq!(ret.len(), VEC_SIZE_24);
val.serialize(&mut Serializer::new(&mut ret)).unwrap();
@ -231,7 +230,7 @@ pub(crate) fn encode_ave_key(
/// val: 8 bytes (incl. tag)
/// eid: 8 bytes
/// aid: 4 bytes
/// aid: 8 bytes
/// tx: 8 bytes (incl. op)
#[inline]
pub(crate) fn encode_vae_key(
@ -248,10 +247,10 @@ pub(crate) fn encode_vae_key(
ret.extend(aid.0.to_be_bytes());
ret.extend(tx.0.to_be_bytes());
ret[VEC_SIZE_12] = op as u8;
debug_assert_eq!(ret.len(), VEC_SIZE_20);
ret[VEC_SIZE_16] = op as u8;
debug_assert_eq!(ret.len(), VEC_SIZE_24);
ret.extend(eid.0.to_be_bytes());
debug_assert_eq!(ret.len(), VEC_SIZE_28);
debug_assert_eq!(ret.len(), VEC_SIZE_32);
ret.into()
}
@ -259,48 +258,52 @@ pub(crate) fn encode_vae_key(
#[inline]
pub(crate) fn decode_vae_key(src: &[u8]) -> Result<(EntityId, AttrId, EntityId, TxId, StoreOp)> {
let vid = EntityId::from_bytes(&src[0..VEC_SIZE_8]);
let aid = AttrId::from_bytes(&src[VEC_SIZE_8..VEC_SIZE_12]);
let tx = TxId::from_bytes(&src[VEC_SIZE_12..VEC_SIZE_20]);
let eid = EntityId::from_bytes(&src[VEC_SIZE_20..VEC_SIZE_28]);
let op = src[VEC_SIZE_12].try_into()?;
let aid = AttrId::from_bytes(&src[VEC_SIZE_8..VEC_SIZE_16]);
let tx = TxId::from_bytes(&src[VEC_SIZE_16..VEC_SIZE_24]);
let eid = EntityId::from_bytes(&src[VEC_SIZE_24..VEC_SIZE_32]);
let op = src[VEC_SIZE_16].try_into()?;
Ok((vid, aid, eid, tx, op))
}
/// aid: 4 bytes (incl. tag)
/// aid: 8 bytes (incl. tag)
/// tx: 8 bytes (incl. op)
#[inline]
pub(crate) fn encode_attr_by_id(aid: AttrId, tx: TxId, op: StoreOp) -> Encoded<VEC_SIZE_12> {
let mut ret = SmallVec::<[u8; VEC_SIZE_12]>::new();
pub(crate) fn encode_attr_by_id(aid: AttrId, tx: TxId, op: StoreOp) -> Encoded<VEC_SIZE_16> {
let mut ret = SmallVec::<[u8; VEC_SIZE_16]>::new();
ret.extend(aid.0.to_be_bytes());
ret[0] = StorageTag::AttrById as u8;
ret.extend(tx.0.to_be_bytes());
ret[VEC_SIZE_4] = op as u8;
debug_assert_eq!(ret.len(), VEC_SIZE_12);
ret[VEC_SIZE_8] = op as u8;
debug_assert_eq!(ret.len(), VEC_SIZE_16);
ret.into()
}
#[inline]
pub(crate) fn decode_attr_key_by_id(src: &[u8]) -> Result<(AttrId, TxId, StoreOp)> {
let aid = AttrId::from_bytes(&src[0..VEC_SIZE_4]);
let tx = TxId::from_bytes(&src[VEC_SIZE_4..VEC_SIZE_12]);
let op = src[VEC_SIZE_4].try_into()?;
let aid = AttrId::from_bytes(&src[0..VEC_SIZE_8]);
let tx = TxId::from_bytes(&src[VEC_SIZE_8..VEC_SIZE_16]);
let op = src[VEC_SIZE_8].try_into()?;
Ok((aid, tx, op))
}
/// tag: 4 bytes (with prefix)
/// tag: 8 bytes (with prefix)
/// tx: 8 bytes (incl. op)
/// attr as kw: variable (segmented by \0)
#[inline]
pub(crate) fn encode_attr_by_kw(attr_name: Keyword, tx: TxId, op: StoreOp) -> Encoded<VEC_SIZE_12> {
let mut ret = SmallVec::<[u8; VEC_SIZE_12]>::new();
pub(crate) fn encode_attr_by_kw(attr_name: Keyword, tx: TxId, op: StoreOp) -> Encoded<LARGE_VEC_SIZE> {
let mut ret = SmallVec::<[u8; LARGE_VEC_SIZE]>::new();
ret.push(StorageTag::AttrByKeyword as u8);
let ns_bytes = attr_name.ns.as_bytes();
ret.push(ns_bytes.get(0).cloned().unwrap_or(0));
ret.push(ns_bytes.get(1).cloned().unwrap_or(0));
ret.push(ns_bytes.get(2).cloned().unwrap_or(0));
ret.push(ns_bytes.get(3).cloned().unwrap_or(0));
ret.push(ns_bytes.get(4).cloned().unwrap_or(0));
ret.push(ns_bytes.get(5).cloned().unwrap_or(0));
ret.push(ns_bytes.get(6).cloned().unwrap_or(0));
ret.extend(tx.0.to_be_bytes());
ret[VEC_SIZE_4] = op as u8;
ret[VEC_SIZE_8] = op as u8;
ret.extend_from_slice(ns_bytes);
ret.push(b'/');
ret.extend_from_slice(attr_name.ident.as_bytes());
@ -309,9 +312,9 @@ pub(crate) fn encode_attr_by_kw(attr_name: Keyword, tx: TxId, op: StoreOp) -> En
#[inline]
pub(crate) fn decode_attr_key_by_kw(src: &[u8]) -> Result<(Keyword, TxId, StoreOp)> {
let tx = TxId::from_bytes(&src[VEC_SIZE_4..VEC_SIZE_12]);
let op = src[VEC_SIZE_4].try_into()?;
let kw = Keyword::try_from(&src[VEC_SIZE_12..])?;
let tx = TxId::from_bytes(&src[VEC_SIZE_8..VEC_SIZE_16]);
let op = src[VEC_SIZE_8].try_into()?;
let kw = Keyword::try_from(&src[VEC_SIZE_16..])?;
Ok((kw, tx, op))
}
@ -343,17 +346,17 @@ pub(crate) fn encode_unique_attr_val(aid: AttrId, val: Value) -> Encoded<LARGE_V
#[inline]
pub(crate) fn decode_unique_attr_val(src: &[u8]) -> Result<(AttrId, Value)> {
let a_id = AttrId::from_bytes(&src[..VEC_SIZE_4]);
let val = rmp_serde::from_slice(&src[VEC_SIZE_4..])?;
let a_id = AttrId::from_bytes(&src[..VEC_SIZE_8]);
let val = rmp_serde::from_slice(&src[VEC_SIZE_8..])?;
Ok((a_id, val))
}
#[inline]
pub(crate) fn encode_unique_attr_by_id(aid: AttrId) -> Encoded<VEC_SIZE_4> {
let mut ret = SmallVec::<[u8; VEC_SIZE_4]>::new();
pub(crate) fn encode_unique_attr_by_id(aid: AttrId) -> Encoded<VEC_SIZE_8> {
let mut ret = SmallVec::<[u8; VEC_SIZE_8]>::new();
ret.extend(aid.0.to_be_bytes());
ret[0] = StorageTag::UniqueAttrById as u8;
debug_assert_eq!(ret.len(), VEC_SIZE_4);
debug_assert_eq!(ret.len(), VEC_SIZE_8);
ret.into()
}

@ -23,11 +23,13 @@ pub(crate) const MAX_TEMP_ENTITY_ID: EntityId = EntityId(10_000_000);
pub(crate) const MAX_PERM_ENTITY_ID: EntityId = EntityId(0x00ff_ffff_ff00_0000);
#[derive(Clone, Copy, PartialEq, Ord, PartialOrd, Eq, Deserialize, Serialize, Hash)]
pub struct AttrId(pub u32);
pub struct AttrId(pub u64);
impl AttrId {
pub(crate) fn from_bytes(b: &[u8]) -> Self {
AttrId(u32::from_be_bytes([0, b[1], b[2], b[3]]))
AttrId(u64::from_be_bytes([
0, b[1], b[2], b[3], b[4], b[5], b[6], b[7],
]))
}
}
@ -38,8 +40,8 @@ impl Debug for AttrId {
}
pub(crate) const MAX_SYS_ATTR_ID: AttrId = AttrId(1000);
pub(crate) const MAX_TEMP_ATTR_ID: AttrId = AttrId(1000000);
pub(crate) const MAX_PERM_ATTR_ID: AttrId = AttrId(0x00ff_ffff);
pub(crate) const MAX_TEMP_ATTR_ID: AttrId = AttrId(10_000_000);
pub(crate) const MAX_PERM_ATTR_ID: AttrId = AttrId(0x00ff_ffff_ff00_0000);
#[derive(Clone, Copy, PartialEq, Ord, PartialOrd, Eq, Deserialize, Serialize, Hash)]
pub struct TxId(pub u64);

@ -1,35 +1,37 @@
use cozorocks::RocksDb;
use std::sync::atomic::{AtomicU32, AtomicU64};
use std::sync::{Arc, Mutex};
// use crate::data::compare::DB_KEY_PREFIX_LEN;
// use anyhow::Result;
// use cozorocks::*;
// use std::sync::atomic::{AtomicU32, AtomicU64};
// use std::sync::{Arc, Mutex};
//
// pub struct DbInstance {
//x pub destroy_on_close: bool,
// db: SharedPtr<DbBridge>,
//x db_opts: UniquePtr<Options>,
//x tdb_opts: Option<UniquePtr<TransactionDBOptions>>,
//x odb_opts: Option<UniquePtr<OptimisticTransactionDBOptions>>,
//x path: String,
// last_attr_id: Arc<AtomicU32>,
// last_ent_id: Arc<AtomicU64>,
// last_tx_id: Arc<AtomicU64>,
// sessions: Mutex<Vec<Arc<Mutex<SessionHandle>>>>,
// }
//
// struct SessionHandle {
// id: usize,
// temp: SharedPtr<DbBridge>,
// status: SessionStatus,
// }
//
// #[derive(Eq, PartialEq, Debug, Clone, Copy)]
// pub enum SessionStatus {
// Prepared,
// Running,
// Completed,
// }
//
pub struct DbInstance {
db: RocksDb,
last_attr_id: Arc<AtomicU32>,
last_ent_id: Arc<AtomicU64>,
last_tx_id: Arc<AtomicU64>,
sessions: Mutex<Vec<Arc<Mutex<SessionHandle>>>>,
}
struct SessionHandle {
id: usize,
db: RocksDb,
last_attr_id: Arc<AtomicU32>,
last_ent_id: Arc<AtomicU64>,
last_tx_id: Arc<AtomicU64>,
status: SessionStatus,
}
#[derive(Eq, PartialEq, Debug, Clone, Copy)]
pub enum SessionStatus {
Prepared,
Running,
Completed,
}
// impl DbInstance {
// pub fn new(path: &str, optimistic: bool, destroy_on_close: bool) -> Result<Self> {
// let mut db_opts = Options::new().within_unique_ptr();

Loading…
Cancel
Save