triple transacts

main
Ziyang Hu 2 years ago
parent bf412a9e52
commit 6634f3d816

@ -43,7 +43,7 @@ fn compare_key(a: &[u8], b: &[u8]) -> Ordering {
TripleValueAttrEntity => compare_key_triple_vae(a, b),
AttrById => compare_key_attr_by_id(a, b),
Tx => compare_key_tx(a, b),
UniqueEntity => compare_key_unique_entity(a, b),
UniqueEntityAttr => compare_key_unique_entity_attr(a, b),
UniqueAttrValue => compare_key_unique_attr_val(a, b),
UniqueAttrById => compare_key_unique_attr_by_id(a, b),
UniqueAttrByKeyword => compare_key_unique_attr_by_kw(a, b),
@ -123,7 +123,7 @@ fn compare_key_tx(a: &[u8], b: &[u8]) -> Ordering {
}
#[inline]
fn compare_key_unique_entity(a: &[u8], b: &[u8]) -> Ordering {
fn compare_key_unique_entity_attr(a: &[u8], b: &[u8]) -> Ordering {
a.cmp(b)
}

@ -20,7 +20,7 @@ pub(crate) enum StorageTag {
TripleValueAttrEntity = 4,
AttrById = 5,
Tx = 6,
UniqueEntity = 7,
UniqueEntityAttr = 7,
UniqueAttrValue = 8,
UniqueAttrById = 9,
UniqueAttrByKeyword = 10,
@ -70,11 +70,15 @@ impl EncodedVec<LARGE_VEC_SIZE> {
if data.len() <= 1 {
op.to_string()
} else {
format!("{}{:?}", op, Attribute::decode(&data[VEC_SIZE_8..]).unwrap())
format!(
"{}{:?}",
op,
Attribute::decode(&data[VEC_SIZE_8..]).unwrap()
)
}
}
StorageTag::Tx => format!("{:?}", TxLog::decode(data).unwrap()),
StorageTag::UniqueEntity | StorageTag::UniqueAttrValue => {
StorageTag::UniqueEntityAttr | StorageTag::UniqueAttrValue => {
format!("{:?}", TxId::from_bytes(data))
}
}
@ -121,7 +125,7 @@ impl<const N: usize> Debug for EncodedVec<N> {
StorageTag::Tx => {
write!(f, " {:?}", TxId::from_bytes(self))
}
StorageTag::UniqueEntity => {
StorageTag::UniqueEntityAttr => {
write!(f, " {:?}", EntityId::from_bytes(self))
}
StorageTag::UniqueAttrValue => {
@ -188,7 +192,7 @@ impl TryFrom<u8> for StorageTag {
4 => TripleValueAttrEntity,
5 => AttrById,
6 => Tx,
7 => UniqueEntity,
7 => UniqueEntityAttr,
8 => UniqueAttrValue,
9 => UniqueAttrById,
10 => UniqueAttrByKeyword,
@ -213,6 +217,11 @@ pub(crate) fn decode_value_from_key(src: &[u8]) -> Result<Value> {
Ok(rmp_serde::from_slice(&src[VEC_SIZE_24..])?)
}
#[inline]
pub(crate) fn decode_value_from_val(src: &[u8]) -> Result<Value> {
Ok(rmp_serde::from_slice(&src[VEC_SIZE_8..])?)
}
/// eid: 8 bytes (incl. tag)
/// aid: 8 bytes
/// val: variable
@ -381,13 +390,21 @@ pub(crate) fn encode_tx(tx: TxId) -> EncodedVec<VEC_SIZE_8> {
}
#[inline]
pub(crate) fn encode_unique_entity(eid: EntityId) -> EncodedVec<VEC_SIZE_8> {
let mut ret = SmallVec::<[u8; VEC_SIZE_8]>::new();
pub(crate) fn encode_unique_entity_attr(eid: EntityId, aid: AttrId) -> EncodedVec<VEC_SIZE_16> {
let mut ret = SmallVec::<[u8; VEC_SIZE_16]>::new();
ret.extend(eid.bytes());
ret[0] = StorageTag::UniqueEntity as u8;
ret[0] = StorageTag::UniqueEntityAttr as u8;
ret.extend(aid.bytes());
ret.into()
}
#[inline]
pub(crate) fn decode_unique_entity_attr(src: &[u8]) -> Result<(EntityId, AttrId)> {
let eid = EntityId::from_bytes(&src[..VEC_SIZE_8]);
let aid = AttrId::from_bytes(&src[VEC_SIZE_8..VEC_SIZE_16]);
Ok((eid, aid))
}
#[inline]
pub(crate) fn encode_unique_attr_val(aid: AttrId, val: &Value) -> EncodedVec<LARGE_VEC_SIZE> {
let mut ret = SmallVec::<[u8; LARGE_VEC_SIZE]>::new();

@ -9,6 +9,7 @@ pub struct Validity(pub i64);
impl Validity {
pub(crate) const MAX: Validity = Validity(i64::MAX);
pub(crate) const NO_HISTORY: Validity = Validity(i64::MIN + 1);
pub(crate) const MIN: Validity = Validity(i64::MIN);
pub(crate) fn current() -> Self {
let timestamp = SystemTime::now()

@ -10,25 +10,24 @@ use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};
pub(crate) struct Triple<'a> {
id: EntityId,
attr: AttrId,
value: Value<'a>,
pub(crate) id: EntityId,
pub(crate) attr: AttrId,
pub(crate) value: Value<'a>,
}
pub struct Quintuple<'a> {
triple: Triple<'a>,
action: TxAction,
validity: Validity,
pub(crate) triple: Triple<'a>,
pub(crate) action: TxAction,
pub(crate) validity: Validity,
}
#[repr(u8)]
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub enum TxAction {
Put,
RetractEAV,
RetractEA,
RetractE,
Erase,
Retract,
RetractAllEA,
RetractAllE,
Ensure,
}
@ -57,7 +56,7 @@ impl SessionTx {
/// ```json
/// {"tx": [...], "comment": "a comment", "since": timestamp}
/// ```
/// each line in `tx` is `{"put: ...}`, `{"retract": ...}`, `{"erase": ...}` or `{"ensure": ...}`
/// each line in `tx` is `{"put: ...}`, `{"retract": ...}` or `{"ensure": ...}`
/// these can also have a `since` field, overriding the timestamp
/// the dots can be triples
/// ```json
@ -130,9 +129,7 @@ impl SessionTx {
if let Some(inner) = item.get("put") {
(inner, TxAction::Put)
} else if let Some(inner) = item.get("retract") {
(inner, TxAction::RetractEAV)
} else if let Some(inner) = item.get("erase") {
(inner, TxAction::Erase)
(inner, TxAction::Retract)
} else if let Some(inner) = item.get("ensure") {
(inner, TxAction::Ensure)
} else {
@ -210,6 +207,65 @@ impl SessionTx {
collected: &mut Vec<Quintuple<'a>>,
) -> Result<()> {
match item {
[eid] => {
if action != TxAction::Retract {
return Err(TxError::InvalidAction(
action,
"singlet only allowed for 'retract'".to_string(),
)
.into());
}
let eid = eid.as_u64().ok_or_else(|| {
TxError::Decoding(eid.clone(), "cannot parse as entity id".to_string())
})?;
let eid = EntityId(eid);
if !eid.is_perm() {
return Err(
TxError::EntityId(eid.0, "expected perm entity id".to_string()).into(),
);
}
collected.push(Quintuple {
triple: Triple {
id: eid,
attr: AttrId(0),
value: Value::Null,
},
action: TxAction::RetractAllE,
validity: since,
});
Ok(())
}
[eid, attr] => {
if action != TxAction::Retract {
return Err(TxError::InvalidAction(
action,
"doublet only allowed for 'retract'".to_string(),
)
.into());
}
let kw: Keyword = attr.try_into()?;
let attr = self.attr_by_kw(&kw)?.ok_or(TxError::AttrNotFound(kw))?;
let eid = eid.as_u64().ok_or_else(|| {
TxError::Decoding(eid.clone(), "cannot parse as entity id".to_string())
})?;
let eid = EntityId(eid);
if !eid.is_perm() {
return Err(
TxError::EntityId(eid.0, "expected perm entity id".to_string()).into(),
);
}
collected.push(Quintuple {
triple: Triple {
id: eid,
attr: attr.id,
value: Value::Null,
},
action: TxAction::RetractAllEA,
validity: since,
});
Ok(())
}
[eid, attr_kw, val] => {
let kw: Keyword = attr_kw.try_into()?;
let attr = self.attr_by_kw(&kw)?.ok_or(TxError::AttrNotFound(kw))?;

@ -1,5 +1,5 @@
use crate::data::attr::Attribute;
use crate::data::encode::{encode_tx, encode_unique_attr_by_id, encode_unique_entity, EncodedVec};
use crate::data::encode::{encode_tx, encode_unique_attr_by_id, encode_unique_entity_attr, EncodedVec};
use crate::data::id::{AttrId, EntityId, TxId, Validity};
use crate::data::keyword::Keyword;
use crate::data::value::StaticValue;
@ -66,8 +66,8 @@ impl SessionTx {
}
pub(crate) fn load_last_entity_id(&mut self) -> Result<EntityId> {
let e_lower = encode_unique_entity(EntityId::MIN_PERM);
let e_upper = encode_unique_entity(EntityId::MAX_PERM);
let e_lower = encode_unique_entity_attr(EntityId::MIN_PERM, AttrId::MIN_PERM);
let e_upper = encode_unique_entity_attr(EntityId::MAX_PERM, AttrId::MIN_PERM);
let it = self.bounded_scan_last(&e_lower, &e_upper);
Ok(match it.key()? {
@ -149,4 +149,8 @@ pub enum TransactError {
WriteInReadOnly,
#[error("attempt to change immutable property for attr {0:?}")]
ChangingImmutableProperty(AttrId),
#[error("required triple not found for {0:?}, {1:?}")]
RequiredTripleNotFound(EntityId, AttrId),
#[error("precondition failed for {0:?} {1:?}, expect {2}, got {3}")]
PreconditionFailed(EntityId, AttrId, String, String)
}

@ -1,12 +1,14 @@
use crate::data::attr::{Attribute, AttributeTyping};
use crate::data::encode::{
decode_ae_key, decode_ea_key, decode_vae_key, decode_value, decode_value_from_key,
encode_aev_key, encode_ave_key, encode_ave_key_for_unique_v, encode_eav_key,
encode_unique_attr_val, encode_unique_entity, encode_vae_key, EncodedVec, LARGE_VEC_SIZE,
decode_value_from_val, encode_aev_key, encode_ave_key, encode_ave_key_for_unique_v,
encode_eav_key, encode_unique_attr_val, encode_unique_entity_attr, encode_vae_key, EncodedVec,
LARGE_VEC_SIZE,
};
use crate::data::id::{AttrId, EntityId, Validity};
use crate::data::keyword::Keyword;
use crate::data::triple::StoreOp;
use crate::data::tx_triple::{Quintuple, TxAction};
use crate::data::value::{StaticValue, Value, INLINE_VAL_SIZE_LIMIT};
use crate::runtime::transact::{SessionTx, TransactError};
use crate::utils::swap_option_result;
@ -27,6 +29,95 @@ enum TripleError {
}
impl SessionTx {
pub fn tx_triples(&mut self, payloads: Vec<Quintuple>) -> Result<()> {
for payload in payloads {
match payload.action {
TxAction::Put => {
let attr = self.attr_by_id(payload.triple.attr)?.unwrap();
if payload.triple.id.is_perm() {
self.amend_triple(
payload.triple.id,
&attr,
&payload.triple.value,
payload.validity,
)?;
} else {
self.new_triple(
payload.triple.id,
&attr,
&payload.triple.value,
payload.validity,
)?;
}
}
TxAction::Retract => {
let attr = self.attr_by_id(payload.triple.attr)?.unwrap();
self.retract_triple(
payload.triple.id,
&attr,
&payload.triple.value,
payload.validity,
)?;
}
TxAction::RetractAllEA => {
let attr = self.attr_by_id(payload.triple.attr)?.unwrap();
self.retract_triples_for_attr(payload.triple.id, &attr, payload.validity)?;
}
TxAction::RetractAllE => {
self.retract_entity(payload.triple.id, payload.validity)?;
}
TxAction::Ensure => {
let attr = self.attr_by_id(payload.triple.attr)?.unwrap();
self.ensure_triple(
payload.triple.id,
&attr,
&payload.triple.value,
payload.validity,
)?;
}
}
}
Ok(())
}
pub(crate) fn ensure_triple(
&mut self,
eid: EntityId,
attr: &Attribute,
v: &Value,
vld: Validity,
) -> Result<()> {
let aid = attr.id;
let signal = encode_unique_entity_attr(eid, aid);
let gen_err = || TransactError::RequiredTripleNotFound(eid, aid);
self.tx.get(&signal, true)?.ok_or_else(gen_err)?;
let v_in_key = if attr.cardinality.is_one() {
&Value::Bottom
} else {
v
};
let eav_encoded = encode_eav_key(eid, attr.id, v_in_key, vld);
let eav_encoded_upper = encode_eav_key(eid, attr.id, v_in_key, Validity::MIN);
let it = self.bounded_scan_first(&eav_encoded, &eav_encoded_upper);
let (k_slice, v_slice) = it.pair()?.ok_or_else(gen_err)?;
if StoreOp::try_from(v_slice[0])?.is_retract() {
return Err(gen_err().into());
}
let stored_v = if attr.cardinality.is_one() {
decode_value_from_val(v_slice)?
} else {
decode_value_from_key(k_slice)?
};
if stored_v != *v {
return Err(TransactError::PreconditionFailed(
eid,
aid,
format!("{:?}", v),
format!("{:?}", stored_v),
)
.into());
}
Ok(())
}
pub(crate) fn put_triple(
&mut self,
eid: EntityId,
@ -39,7 +130,7 @@ impl SessionTx {
let vld_in_key = if attr.with_history {
vld
} else {
Validity::MIN
Validity::NO_HISTORY
};
// elide value in key for eav and aev if cardinality is one
let (v_in_key, v_in_val) = if attr.cardinality.is_one() {
@ -56,7 +147,7 @@ impl SessionTx {
// elide value in data for aev if it is big
let val_encoded = if val_encoded.len() > INLINE_VAL_SIZE_LIMIT {
Value::Bottom.encode()
Value::Bottom.encode_with_op_and_tx(op, tx_id)
} else {
val_encoded
};
@ -67,7 +158,10 @@ impl SessionTx {
// vae for ref types
if attr.val_type.is_ref_type() {
let vae_encoded = encode_vae_key(v.get_entity_id()?, attr.id, eid, vld_in_key);
self.tx.put(&vae_encoded, &[op as u8])?;
self.tx.put(
&vae_encoded,
&Value::Bottom.encode_with_op_and_tx(op, tx_id),
)?;
}
// ave for indexing
@ -122,7 +216,7 @@ impl SessionTx {
}
}
}
let e_in_val_encoded = eid.bytes();
let e_in_val_encoded = Value::EnId(eid).encode_with_op_and_tx(op, tx_id);
self.tx.put(&ave_encoded, &e_in_val_encoded)?;
self.tx.put(
@ -131,8 +225,10 @@ impl SessionTx {
)?;
}
self.tx
.put(&encode_unique_entity(eid), &tx_id.bytes_with_op(op))?;
self.tx.put(
&encode_unique_entity_attr(eid, attr.id),
&tx_id.bytes_with_op(op),
)?;
Ok(())
}
@ -200,11 +296,6 @@ impl SessionTx {
self.batch_retract_triple(lower_bound, upper_bound, vld)
}
pub(crate) fn retract_entity(&mut self, eid: EntityId, vld: Validity) -> Result<()> {
match self.latest_entity_existence(eid, true)? {
LatestTripleExistence::Asserted => {}
LatestTripleExistence::Retracted => return Ok(()),
LatestTripleExistence::NotFound => return Err(TripleError::EidNotFound(eid).into()),
}
let lower_bound = encode_eav_key(eid, AttrId::MIN_PERM, &Value::Null, Validity::MAX);
let upper_bound = encode_eav_key(eid, AttrId::MAX_PERM, &Value::Bottom, Validity::MAX);
self.batch_retract_triple(lower_bound, upper_bound, vld)
@ -241,22 +332,6 @@ impl SessionTx {
}
}
}
fn latest_entity_existence(
&mut self,
eid: EntityId,
for_update: bool,
) -> Result<LatestTripleExistence> {
let encoded = encode_unique_entity(eid);
Ok(if let Some(v_slice) = self.tx.get(&encoded, for_update)? {
let op = StoreOp::try_from(v_slice[0])?;
match op {
StoreOp::Retract => LatestTripleExistence::Retracted,
StoreOp::Assert => LatestTripleExistence::Asserted,
}
} else {
LatestTripleExistence::NotFound
})
}
pub(crate) fn eid_by_unique_av(
&mut self,
attr: &Attribute,

Loading…
Cancel
Save