move stuff around

main
Ziyang Hu 2 years ago
parent 1c8ff81f84
commit e3d9a18124

@ -1,10 +1,12 @@
use std::fmt::{Debug, Display, Formatter};
use std::path::Path;
use actix_cors::Cors; use actix_cors::Cors;
use actix_web::{post, web, App, HttpResponse, HttpServer, Responder}; use actix_web::{App, HttpResponse, HttpServer, post, Responder, web};
use clap::Parser; use clap::Parser;
use cozo::Db; use cozo::Db;
use cozorocks::DbBuilder; use cozorocks::DbBuilder;
use std::fmt::{Debug, Display, Formatter};
use std::path::Path;
type Result<T> = std::result::Result<T, RespError>; type Result<T> = std::result::Result<T, RespError>;
@ -33,7 +35,7 @@ impl From<anyhow::Error> for RespError {
} }
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
#[clap(version, about, long_about=None)] #[clap(version, about, long_about = None)]
struct Args { struct Args {
/// Path to the directory to store the database /// Path to the directory to store the database
#[clap(value_parser)] #[clap(value_parser)]
@ -114,7 +116,7 @@ async fn main() -> std::io::Result<()> {
.service(transact) .service(transact)
.service(transact_attr) .service(transact_attr)
}) })
.bind(addr)? .bind(addr)?
.run() .run()
.await .await
} }

@ -1,16 +1,18 @@
use crate::data::encode::EncodedVec; use std::fmt::{Display, Formatter};
use crate::data::id::{AttrId, EntityId, TxId};
use crate::data::keyword::Keyword;
use crate::data::triple::StoreOp;
use crate::data::tx_triple::TempIdCtx;
use crate::data::value::Value;
use anyhow::Result; use anyhow::Result;
use rmp_serde::Serializer; use rmp_serde::Serializer;
use serde::Serialize; use serde::Serialize;
use serde_derive::{Deserialize, Serialize}; use serde_derive::{Deserialize, Serialize};
use serde_json::json; use serde_json::json;
use smallvec::SmallVec; use smallvec::SmallVec;
use std::fmt::{Display, Formatter};
use crate::data::encode::EncodedVec;
use crate::data::id::{AttrId, EntityId, TxId};
use crate::data::keyword::Keyword;
use crate::data::triple::StoreOp;
use crate::data::value::Value;
use crate::preprocess::triple::TempIdCtx;
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum AttributeError { pub enum AttributeError {
@ -117,7 +119,7 @@ impl TryFrom<&'_ str> for AttributeTyping {
return Err(AttributeError::Conversion( return Err(AttributeError::Conversion(
s.to_string(), s.to_string(),
"AttributeTyping".to_string(), "AttributeTyping".to_string(),
)) ));
} }
}) })
} }
@ -246,7 +248,7 @@ impl TryFrom<&'_ str> for AttributeIndex {
return Err(AttributeError::Conversion( return Err(AttributeError::Conversion(
s.to_string(), s.to_string(),
"AttributeIndex".to_string(), "AttributeIndex".to_string(),
)) ));
} }
}) })
} }
@ -307,7 +309,7 @@ impl Attribute {
) -> Result<Value<'a>> { ) -> Result<Value<'a>> {
if self.val_type.is_ref_type() { if self.val_type.is_ref_type() {
if let Value::String(s) = value { if let Value::String(s) = value {
return Ok(Value::EnId(ctx.str2tempid(&s, false))) return Ok(Value::EnId(ctx.str2tempid(&s, false)));
} }
} }
self.val_type.coerce_value(value) self.val_type.coerce_value(value)

@ -1,8 +1,9 @@
use std::cmp::Ordering;
use crate::data::encode::{ use crate::data::encode::{
decode_ae_key, decode_attr_key_by_id, decode_ea_key, decode_sentinel_attr_val, decode_vae_key, decode_ae_key, decode_attr_key_by_id, decode_ea_key, decode_sentinel_attr_val, decode_vae_key,
decode_value_from_key, StorageTag, decode_value_from_key, StorageTag,
}; };
use std::cmp::Ordering;
pub(crate) fn rusty_cmp(a: &[u8], b: &[u8]) -> i8 { pub(crate) fn rusty_cmp(a: &[u8], b: &[u8]) -> i8 {
match compare_key(a, b) { match compare_key(a, b) {

@ -1,15 +1,17 @@
use std::fmt::{Debug, Formatter};
use std::ops::{Deref, DerefMut};
use anyhow::Result;
use rmp_serde::Serializer;
use serde::Serialize;
use smallvec::SmallVec;
use crate::data::attr::Attribute; use crate::data::attr::Attribute;
use crate::data::id::{AttrId, EntityId, TxId, Validity}; use crate::data::id::{AttrId, EntityId, TxId, Validity};
use crate::data::keyword::Keyword; use crate::data::keyword::Keyword;
use crate::data::triple::StoreOp; use crate::data::triple::StoreOp;
use crate::data::value::Value; use crate::data::value::Value;
use crate::runtime::transact::TxLog; use crate::runtime::transact::TxLog;
use anyhow::Result;
use rmp_serde::Serializer;
use serde::Serialize;
use smallvec::SmallVec;
use std::fmt::{Debug, Formatter};
use std::ops::{Deref, DerefMut};
#[repr(u8)] #[repr(u8)]
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug)] #[derive(Ord, PartialOrd, Eq, PartialEq, Debug)]

@ -1,9 +1,11 @@
use crate::data::triple::StoreOp;
use chrono::{DateTime, TimeZone, Utc};
use serde_derive::{Deserialize, Serialize};
use std::fmt::{Debug, Formatter}; use std::fmt::{Debug, Formatter};
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use chrono::{DateTime, TimeZone, Utc};
use serde_derive::{Deserialize, Serialize};
use crate::data::triple::StoreOp;
#[derive(Clone, Copy, PartialEq, Eq, Ord, PartialOrd, Deserialize, Serialize, Hash)] #[derive(Clone, Copy, PartialEq, Eq, Ord, PartialOrd, Deserialize, Serialize, Hash)]
pub struct Validity(pub i64); pub struct Validity(pub i64);
@ -70,7 +72,7 @@ impl Debug for Validity {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
if *self == Validity::MIN { if *self == Validity::MIN {
write!(f, "MIN") write!(f, "MIN")
} else if *self == Validity::NO_HISTORY{ } else if *self == Validity::NO_HISTORY {
write!(f, "NO_HISTORY") write!(f, "NO_HISTORY")
} else if *self == Validity::MAX { } else if *self == Validity::MAX {
write!(f, "MAX") write!(f, "MAX")

@ -1,9 +1,10 @@
use serde_json::json;
pub(crate) use serde_json::Value as JsonValue;
use crate::data::attr::{Attribute, AttributeCardinality, AttributeIndex, AttributeTyping}; use crate::data::attr::{Attribute, AttributeCardinality, AttributeIndex, AttributeTyping};
use crate::data::id::{AttrId, EntityId, TxId}; use crate::data::id::{AttrId, EntityId, TxId};
use crate::data::keyword::{Keyword, KeywordError}; use crate::data::keyword::{Keyword, KeywordError};
use crate::data::value::Value; use crate::data::value::Value;
use serde_json::json;
pub(crate) use serde_json::Value as JsonValue;
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum JsonError { pub enum JsonError {
@ -35,6 +36,7 @@ impl<'a> From<&'a JsonValue> for Value<'a> {
} }
} }
} }
impl From<Value<'_>> for JsonValue { impl From<Value<'_>> for JsonValue {
fn from(v: Value<'_>) -> Self { fn from(v: Value<'_>) -> Self {
match v { match v {

@ -1,8 +1,9 @@
use serde_derive::{Deserialize, Serialize};
use smartstring::{LazyCompact, SmartString};
use std::fmt::{Debug, Display, Formatter}; use std::fmt::{Debug, Display, Formatter};
use std::str::Utf8Error; use std::str::Utf8Error;
use serde_derive::{Deserialize, Serialize};
use smartstring::{LazyCompact, SmartString};
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum KeywordError { pub enum KeywordError {
#[error("cannot convert to keyword: {0}")] #[error("cannot convert to keyword: {0}")]

@ -6,6 +6,4 @@ pub(crate) mod json;
pub(crate) mod keyword; pub(crate) mod keyword;
pub(crate) mod triple; pub(crate) mod triple;
pub(crate) mod value; pub(crate) mod value;
pub(crate) mod tx_attr;
pub(crate) mod tx_triple;

@ -1,6 +1,7 @@
use serde_derive::{Deserialize, Serialize};
use std::fmt::{Display, Formatter}; use std::fmt::{Display, Formatter};
use serde_derive::{Deserialize, Serialize};
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum StoreOpError { pub enum StoreOpError {
#[error("unexpected value for StoreOp: {0}")] #[error("unexpected value for StoreOp: {0}")]

@ -1,19 +1,22 @@
use crate::data::encode::{decode_value, EncodedVec}; use std::borrow::Cow;
use crate::data::id::{EntityId, TxId}; use std::cmp::Reverse;
use crate::data::keyword::Keyword; use std::fmt::Debug;
use crate::data::triple::StoreOp;
use anyhow::Result; use anyhow::Result;
use cozorocks::PinSlice;
use ordered_float::OrderedFloat; use ordered_float::OrderedFloat;
use rmp_serde::Serializer; use rmp_serde::Serializer;
use serde::Serialize; use serde::Serialize;
use serde_derive::{Deserialize, Serialize}; use serde_derive::{Deserialize, Serialize};
use smallvec::SmallVec; use smallvec::SmallVec;
use std::borrow::Cow;
use std::cmp::Reverse;
use std::fmt::Debug;
use uuid::Uuid; use uuid::Uuid;
use cozorocks::PinSlice;
use crate::data::encode::{decode_value, EncodedVec};
use crate::data::id::{EntityId, TxId};
use crate::data::keyword::Keyword;
use crate::data::triple::StoreOp;
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum ValueError { pub enum ValueError {
#[error("type mismatch: expected {0}, got {1}")] #[error("type mismatch: expected {0}, got {1}")]
@ -119,11 +122,12 @@ impl PinSliceValue {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::data::keyword::Keyword;
use crate::data::value::Value;
use std::collections::{BTreeMap, HashMap}; use std::collections::{BTreeMap, HashMap};
use std::mem::size_of; use std::mem::size_of;
use crate::data::keyword::Keyword;
use crate::data::value::Value;
#[test] #[test]
fn show_size() { fn show_size() {
dbg!(size_of::<Value>()); dbg!(size_of::<Value>());

@ -1,6 +1,10 @@
#[cfg(not(target_env = "msvc"))] #[cfg(not(target_env = "msvc"))]
use tikv_jemallocator::Jemalloc; use tikv_jemallocator::Jemalloc;
pub use data::encode::EncodedVec;
pub use preprocess::attr::AttrTxItem;
pub use runtime::db::Db;
#[cfg(not(target_env = "msvc"))] #[cfg(not(target_env = "msvc"))]
#[global_allocator] #[global_allocator]
static GLOBAL: Jemalloc = Jemalloc; static GLOBAL: Jemalloc = Jemalloc;
@ -9,7 +13,5 @@ pub(crate) mod data;
pub(crate) mod runtime; pub(crate) mod runtime;
pub(crate) mod transact; pub(crate) mod transact;
pub(crate) mod utils; pub(crate) mod utils;
pub(crate) mod preprocess;
pub use data::encode::EncodedVec;
pub use data::tx_attr::AttrTxItem;
pub use runtime::db::Db;

@ -1,8 +1,9 @@
use crate::data::attr::Attribute;
use crate::data::triple::StoreOp;
use anyhow::Result; use anyhow::Result;
use itertools::Itertools; use itertools::Itertools;
use crate::data::attr::Attribute;
use crate::data::triple::StoreOp;
#[derive(Debug)] #[derive(Debug)]
pub struct AttrTxItem { pub struct AttrTxItem {
pub(crate) op: StoreOp, pub(crate) op: StoreOp,
@ -29,7 +30,7 @@ impl AttrTxItem {
req.clone(), req.clone(),
"'attrs' cannot be empty".to_string(), "'attrs' cannot be empty".to_string(),
) )
.into()); .into());
} }
let res = items.iter().map(AttrTxItem::try_from).try_collect()?; let res = items.iter().map(AttrTxItem::try_from).try_collect()?;
Ok((res, comment)) Ok((res, comment))
@ -54,7 +55,7 @@ impl TryFrom<&'_ serde_json::Value> for AttrTxItem {
value.clone(), value.clone(),
"object must have exactly one field".to_string(), "object must have exactly one field".to_string(),
) )
.into()); .into());
} }
let (k, v) = map.into_iter().next().unwrap(); let (k, v) = map.into_iter().next().unwrap();
let op = match k as &str { let op = match k as &str {
@ -63,7 +64,7 @@ impl TryFrom<&'_ serde_json::Value> for AttrTxItem {
_ => { _ => {
return Err( return Err(
AttrTxItemError::Decoding(value.clone(), format!("unknown op {}", k)).into(), AttrTxItemError::Decoding(value.clone(), format!("unknown op {}", k)).into(),
) );
} }
}; };

@ -0,0 +1,2 @@
pub(crate) mod attr;
pub(crate) mod triple;

@ -1,13 +1,15 @@
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};
use anyhow::Result;
use serde_json::Map;
use crate::data::attr::{Attribute, AttributeIndex, AttributeTyping}; use crate::data::attr::{Attribute, AttributeIndex, AttributeTyping};
use crate::data::id::{AttrId, EntityId, Validity}; use crate::data::id::{AttrId, EntityId, Validity};
use crate::data::keyword::Keyword; use crate::data::keyword::Keyword;
use crate::data::value::Value; use crate::data::value::Value;
use crate::runtime::transact::SessionTx; use crate::runtime::transact::SessionTx;
use anyhow::Result;
use serde_json::Map;
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct Triple<'a> { pub(crate) struct Triple<'a> {
@ -176,7 +178,7 @@ impl SessionTx {
serde_json::Value::Object(item.clone()), serde_json::Value::Object(item.clone()),
"expect any of the keys 'put', 'retract', 'erase', 'ensure'".to_string(), "expect any of the keys 'put', 'retract', 'erase', 'ensure'".to_string(),
) )
.into()); .into());
} }
}; };
let since = match item.get("since") { let since = match item.get("since") {
@ -226,7 +228,7 @@ impl SessionTx {
action, action,
"using temp id instead of perm id".to_string(), "using temp id instead of perm id".to_string(),
) )
.into()); .into());
} }
let v = if let serde_json::Value::Object(inner) = value { let v = if let serde_json::Value::Object(inner) = value {
@ -261,13 +263,13 @@ impl SessionTx {
action, action,
"component shorthand cannot be used".to_string(), "component shorthand cannot be used".to_string(),
) )
.into()); .into());
} }
let (eid, has_unique_attr) = let (eid, has_unique_attr) =
self.parse_tx_request_obj(comp, true, action, since, temp_id_ctx, collected)?; self.parse_tx_request_obj(comp, true, action, since, temp_id_ctx, collected)?;
if !has_unique_attr && parent_attr.val_type != AttributeTyping::Component { if !has_unique_attr && parent_attr.val_type != AttributeTyping::Component {
return Err(TxError::InvalidAction(action, return Err(TxError::InvalidAction(action,
"component shorthand must contain at least one unique/identity field for non-component refs".to_string()).into()); "component shorthand must contain at least one unique/identity field for non-component refs".to_string()).into());
} }
Ok(Value::EnId(eid)) Ok(Value::EnId(eid))
} }
@ -286,7 +288,7 @@ impl SessionTx {
action, action,
"singlet only allowed for 'retract'".to_string(), "singlet only allowed for 'retract'".to_string(),
) )
.into()); .into());
} }
let eid = eid.as_u64().ok_or_else(|| { let eid = eid.as_u64().ok_or_else(|| {
TxError::Decoding(eid.clone(), "cannot parse as entity id".to_string()) TxError::Decoding(eid.clone(), "cannot parse as entity id".to_string())
@ -314,7 +316,7 @@ impl SessionTx {
action, action,
"doublet only allowed for 'retract'".to_string(), "doublet only allowed for 'retract'".to_string(),
) )
.into()); .into());
} }
let kw: Keyword = attr.try_into()?; let kw: Keyword = attr.try_into()?;
let attr = self.attr_by_kw(&kw)?.ok_or(TxError::AttrNotFound(kw))?; let attr = self.attr_by_kw(&kw)?.ok_or(TxError::AttrNotFound(kw))?;
@ -396,7 +398,7 @@ impl SessionTx {
id.0, id.0,
"conflicting id for identity value".into(), "conflicting id for identity value".into(),
) )
.into()); .into());
} }
id id
} else if eid.is_string() { } else if eid.is_string() {
@ -404,7 +406,7 @@ impl SessionTx {
existing_id.0, existing_id.0,
"specifying temp_id string together with unique constraint".into(), "specifying temp_id string together with unique constraint".into(),
) )
.into()); .into());
} else { } else {
existing_id existing_id
} }
@ -476,7 +478,7 @@ impl SessionTx {
existing_eid.0, existing_eid.0,
"conflicting entity id given".to_string(), "conflicting entity id given".to_string(),
) )
.into()); .into());
} }
} }
eid = Some(existing_eid) eid = Some(existing_eid)
@ -499,7 +501,7 @@ impl SessionTx {
given_id.0, given_id.0,
"temp id given where perm id is required".to_string(), "temp id given where perm id is required".to_string(),
) )
.into()); .into());
} }
if let Some(prev_id) = eid { if let Some(prev_id) = eid {
if prev_id != given_id { if prev_id != given_id {
@ -507,7 +509,7 @@ impl SessionTx {
given_id.0, given_id.0,
"conflicting entity id given".to_string(), "conflicting entity id given".to_string(),
) )
.into()); .into());
} }
} }
eid = Some(given_id); eid = Some(given_id);
@ -518,7 +520,7 @@ impl SessionTx {
eid_inner.0, eid_inner.0,
"conflicting entity id given".to_string(), "conflicting entity id given".to_string(),
) )
.into()); .into());
} }
let temp_id_str = temp_id.as_str().ok_or_else(|| { let temp_id_str = temp_id.as_str().ok_or_else(|| {
TxError::Decoding( TxError::Decoding(
@ -541,7 +543,7 @@ impl SessionTx {
action, action,
"upsert requires identity attribute present".to_string(), "upsert requires identity attribute present".to_string(),
) )
.into()); .into());
} }
for (attr, v) in pairs { for (attr, v) in pairs {
self.parse_tx_request_inner(eid, &attr, v, action, since, temp_id_ctx, collected)?; self.parse_tx_request_inner(eid, &attr, v, action, since, temp_id_ctx, collected)?;
@ -557,7 +559,7 @@ impl SessionTx {
action, action,
"cannot use non-unique fields to specify entity".to_string(), "cannot use non-unique fields to specify entity".to_string(),
) )
.into()); .into());
} }
} }
} }
@ -572,7 +574,7 @@ fn assert_absence_of_keys(m: &Map<String, serde_json::Value>, keys: &[&str]) ->
serde_json::Value::Object(m.clone()), serde_json::Value::Object(m.clone()),
format!("object must not contain key {}", k), format!("object must not contain key {}", k),
) )
.into()); .into());
} }
} }
Ok(()) Ok(())

@ -1,4 +1,16 @@
use crate::data::compare::{rusty_cmp, DB_KEY_PREFIX_LEN}; use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use anyhow::Result;
use itertools::Itertools;
use serde_json::json;
use cozorocks::{DbBuilder, DbIter, RocksDb};
use crate::AttrTxItem;
use crate::data::compare::{DB_KEY_PREFIX_LEN, rusty_cmp};
use crate::data::encode::{ use crate::data::encode::{
decode_ea_key, decode_value_from_key, decode_value_from_val, encode_eav_key, StorageTag, decode_ea_key, decode_value_from_key, decode_value_from_val, encode_eav_key, StorageTag,
}; };
@ -6,15 +18,6 @@ use crate::data::id::{AttrId, EntityId, TxId, Validity};
use crate::data::triple::StoreOp; use crate::data::triple::StoreOp;
use crate::data::value::Value; use crate::data::value::Value;
use crate::runtime::transact::SessionTx; use crate::runtime::transact::SessionTx;
use crate::AttrTxItem;
use anyhow::Result;
use cozorocks::{DbBuilder, DbIter, RocksDb};
use itertools::Itertools;
use serde_json::json;
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
pub struct Db { pub struct Db {
db: RocksDb, db: RocksDb,

@ -1,19 +1,22 @@
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use anyhow::Result;
use rmp_serde::Serializer;
use serde::Serialize;
use serde_derive::{Deserialize, Serialize};
use smallvec::SmallVec;
use cozorocks::{DbIter, Tx};
use crate::data::attr::Attribute; use crate::data::attr::Attribute;
use crate::data::encode::{ use crate::data::encode::{
encode_tx, encode_sentinel_attr_by_id, encode_sentinel_entity_attr, EncodedVec, encode_sentinel_attr_by_id, encode_sentinel_entity_attr, encode_tx, EncodedVec,
}; };
use crate::data::id::{AttrId, EntityId, TxId, Validity}; use crate::data::id::{AttrId, EntityId, TxId, Validity};
use crate::data::keyword::Keyword; use crate::data::keyword::Keyword;
use crate::data::value::StaticValue; use crate::data::value::StaticValue;
use anyhow::Result;
use cozorocks::{DbIter, Tx};
use rmp_serde::Serializer;
use serde::Serialize;
use serde_derive::{Deserialize, Serialize};
use smallvec::SmallVec;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
pub struct SessionTx { pub struct SessionTx {
pub(crate) tx: Tx, pub(crate) tx: Tx,
@ -25,7 +28,7 @@ pub struct SessionTx {
pub(crate) attr_by_kw_cache: BTreeMap<Keyword, Option<Attribute>>, pub(crate) attr_by_kw_cache: BTreeMap<Keyword, Option<Attribute>>,
pub(crate) temp_entity_to_perm: BTreeMap<EntityId, EntityId>, pub(crate) temp_entity_to_perm: BTreeMap<EntityId, EntityId>,
pub(crate) eid_by_attr_val_cache: pub(crate) eid_by_attr_val_cache:
BTreeMap<StaticValue, BTreeMap<(AttrId, Validity), Option<EntityId>>>, BTreeMap<StaticValue, BTreeMap<(AttrId, Validity), Option<EntityId>>>,
// "touched" requires the id to exist prior to the transaction, and something related to it has changed // "touched" requires the id to exist prior to the transaction, and something related to it has changed
pub(crate) touched_eids: BTreeSet<EntityId>, pub(crate) touched_eids: BTreeSet<EntityId>,
} }

@ -1,3 +1,10 @@
use std::sync::atomic::Ordering;
use anyhow::Result;
use cozorocks::{DbIter, IterBuilder};
use crate::AttrTxItem;
use crate::data::attr::Attribute; use crate::data::attr::Attribute;
use crate::data::encode::{ use crate::data::encode::{
encode_attr_by_id, encode_sentinel_attr_by_id, encode_sentinel_attr_by_kw, VEC_SIZE_8, encode_attr_by_id, encode_sentinel_attr_by_id, encode_sentinel_attr_by_kw, VEC_SIZE_8,
@ -7,10 +14,6 @@ use crate::data::keyword::Keyword;
use crate::data::triple::StoreOp; use crate::data::triple::StoreOp;
use crate::runtime::transact::{SessionTx, TransactError}; use crate::runtime::transact::{SessionTx, TransactError};
use crate::utils::swap_option_result; use crate::utils::swap_option_result;
use crate::AttrTxItem;
use anyhow::Result;
use cozorocks::{DbIter, IterBuilder};
use std::sync::atomic::Ordering;
impl SessionTx { impl SessionTx {
pub fn tx_attrs(&mut self, payloads: Vec<AttrTxItem>) -> Result<Vec<(StoreOp, AttrId)>> { pub fn tx_attrs(&mut self, payloads: Vec<AttrTxItem>) -> Result<Vec<(StoreOp, AttrId)>> {
@ -94,7 +97,7 @@ impl SessionTx {
}) })
} }
pub(crate) fn all_attrs(&mut self) -> impl Iterator<Item = Result<Attribute>> { pub(crate) fn all_attrs(&mut self) -> impl Iterator<Item=Result<Attribute>> {
AttrIter::new(self.tx.iterator()) AttrIter::new(self.tx.iterator())
} }
@ -105,7 +108,7 @@ impl SessionTx {
attr.id, attr.id,
"cardinality cannot be 'many' for unique or identity attributes".to_string(), "cardinality cannot be 'many' for unique or identity attributes".to_string(),
) )
.into()); .into());
} }
if self.attr_by_kw(&attr.keyword)?.is_some() { if self.attr_by_kw(&attr.keyword)?.is_some() {
@ -116,7 +119,7 @@ impl SessionTx {
attr.keyword attr.keyword
), ),
) )
.into()); .into());
} }
attr.id = AttrId(self.last_attr_id.fetch_add(1, Ordering::AcqRel) + 1); attr.id = AttrId(self.last_attr_id.fetch_add(1, Ordering::AcqRel) + 1);
self.put_attr(&attr, StoreOp::Assert) self.put_attr(&attr, StoreOp::Assert)
@ -136,7 +139,7 @@ impl SessionTx {
attr.id, attr.id,
format!("alias conflict: {}", attr.keyword), format!("alias conflict: {}", attr.keyword),
) )
.into()); .into());
} }
if existing.val_type != attr.val_type if existing.val_type != attr.val_type
|| existing.cardinality != attr.cardinality || existing.cardinality != attr.cardinality
@ -171,7 +174,7 @@ impl SessionTx {
aid, aid,
"attempting to retract non-existing attribute".to_string(), "attempting to retract non-existing attribute".to_string(),
) )
.into()), .into()),
Some(attr) => { Some(attr) => {
self.put_attr(&attr, StoreOp::Retract)?; self.put_attr(&attr, StoreOp::Retract)?;
Ok(attr.id) Ok(attr.id)

@ -1,3 +1,9 @@
use std::sync::atomic::Ordering;
use anyhow::Result;
use cozorocks::{DbIter, IterBuilder};
use crate::data::attr::{Attribute, AttributeTyping}; use crate::data::attr::{Attribute, AttributeTyping};
use crate::data::compare::compare_key; use crate::data::compare::compare_key;
use crate::data::encode::{ use crate::data::encode::{
@ -9,13 +15,10 @@ use crate::data::encode::{
use crate::data::id::{AttrId, EntityId, Validity}; use crate::data::id::{AttrId, EntityId, Validity};
use crate::data::keyword::Keyword; use crate::data::keyword::Keyword;
use crate::data::triple::StoreOp; use crate::data::triple::StoreOp;
use crate::data::tx_triple::{Quintuple, TxAction}; use crate::data::value::{INLINE_VAL_SIZE_LIMIT, StaticValue, Value};
use crate::data::value::{StaticValue, Value, INLINE_VAL_SIZE_LIMIT}; use crate::preprocess::triple::{Quintuple, TxAction};
use crate::runtime::transact::{SessionTx, TransactError}; use crate::runtime::transact::{SessionTx, TransactError};
use crate::utils::swap_option_result; use crate::utils::swap_option_result;
use anyhow::Result;
use cozorocks::{DbIter, IterBuilder};
use std::sync::atomic::Ordering;
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
enum TripleError { enum TripleError {
@ -132,7 +135,7 @@ impl SessionTx {
format!("{:?}", v), format!("{:?}", v),
format!("{:?}", stored_v), format!("{:?}", stored_v),
) )
.into()); .into());
} }
Ok(()) Ok(())
} }
@ -211,8 +214,8 @@ impl SessionTx {
} else { } else {
Validity::NO_HISTORY Validity::NO_HISTORY
}; };
// back scan
if attr.with_history { if attr.with_history {
// back scan in time
for item in self.triple_av_before_scan(attr.id, v, vld_in_key) { for item in self.triple_av_before_scan(attr.id, v, vld_in_key) {
let (_, _, found_eid) = item?; let (_, _, found_eid) = item?;
if found_eid != eid { if found_eid != eid {
@ -220,19 +223,28 @@ impl SessionTx {
attr.keyword.clone(), attr.keyword.clone(),
format!("{:?}", v), format!("{:?}", v),
) )
.into()); .into());
} }
} }
} // fwd scan in time
for item in self.triple_av_after_scan(attr.id, v, vld_in_key) {
for item in self.triple_av_after_scan(attr.id, v, vld_in_key) { let (_, _, found_eid) = item?;
let (_, _, found_eid) = item?; if found_eid != eid {
return Err(TripleError::UniqueConstraintViolated(
attr.keyword.clone(),
format!("{:?}", v),
)
.into());
}
}
} else if let Some(v_slice) = self.tx.get(&ave_encoded, false)? {
let (_, found_eid, _) = decode_ae_key(&v_slice)?;
if found_eid != eid { if found_eid != eid {
return Err(TripleError::UniqueConstraintViolated( return Err(TripleError::UniqueConstraintViolated(
attr.keyword.clone(), attr.keyword.clone(),
format!("{:?}", v), format!("{:?}", v),
) )
.into()); .into());
} }
} }
} }
@ -426,7 +438,7 @@ impl SessionTx {
&mut self, &mut self,
eid: EntityId, eid: EntityId,
aid: AttrId, aid: AttrId,
) -> impl Iterator<Item = Result<(EntityId, AttrId, StaticValue, Validity, StoreOp)>> { ) -> impl Iterator<Item=Result<(EntityId, AttrId, StaticValue, Validity, StoreOp)>> {
let lower = encode_eav_key(eid, aid, &Value::Null, Validity::MAX); let lower = encode_eav_key(eid, aid, &Value::Null, Validity::MAX);
let upper = encode_eav_key(eid, aid, &Value::Bottom, Validity::MIN); let upper = encode_eav_key(eid, aid, &Value::Bottom, Validity::MIN);
TripleEntityAttrIter::new(self.tx.iterator(), lower, upper) TripleEntityAttrIter::new(self.tx.iterator(), lower, upper)
@ -436,7 +448,7 @@ impl SessionTx {
eid: EntityId, eid: EntityId,
aid: AttrId, aid: AttrId,
before: Validity, before: Validity,
) -> impl Iterator<Item = Result<(EntityId, AttrId, StaticValue)>> { ) -> impl Iterator<Item=Result<(EntityId, AttrId, StaticValue)>> {
let lower = encode_eav_key(eid, aid, &Value::Null, Validity::MAX); let lower = encode_eav_key(eid, aid, &Value::Null, Validity::MAX);
let upper = encode_eav_key(eid, aid, &Value::Bottom, Validity::MIN); let upper = encode_eav_key(eid, aid, &Value::Bottom, Validity::MIN);
TripleEntityAttrBeforeIter::new(self.tx.iterator(), lower, upper, before) TripleEntityAttrBeforeIter::new(self.tx.iterator(), lower, upper, before)
@ -445,7 +457,7 @@ impl SessionTx {
&mut self, &mut self,
aid: AttrId, aid: AttrId,
eid: EntityId, eid: EntityId,
) -> impl Iterator<Item = Result<(AttrId, EntityId, StaticValue, Validity, StoreOp)>> { ) -> impl Iterator<Item=Result<(AttrId, EntityId, StaticValue, Validity, StoreOp)>> {
let lower = encode_aev_key(aid, eid, &Value::Null, Validity::MAX); let lower = encode_aev_key(aid, eid, &Value::Null, Validity::MAX);
let upper = encode_aev_key(aid, eid, &Value::Bottom, Validity::MIN); let upper = encode_aev_key(aid, eid, &Value::Bottom, Validity::MIN);
TripleAttrEntityIter::new(self.tx.iterator(), lower, upper) TripleAttrEntityIter::new(self.tx.iterator(), lower, upper)
@ -455,7 +467,7 @@ impl SessionTx {
aid: AttrId, aid: AttrId,
eid: EntityId, eid: EntityId,
before: Validity, before: Validity,
) -> impl Iterator<Item = Result<(AttrId, EntityId, StaticValue)>> { ) -> impl Iterator<Item=Result<(AttrId, EntityId, StaticValue)>> {
let lower = encode_aev_key(aid, eid, &Value::Null, Validity::MAX); let lower = encode_aev_key(aid, eid, &Value::Null, Validity::MAX);
let upper = encode_aev_key(aid, eid, &Value::Bottom, Validity::MIN); let upper = encode_aev_key(aid, eid, &Value::Bottom, Validity::MIN);
TripleAttrEntityBeforeIter::new(self.tx.iterator(), lower, upper, before) TripleAttrEntityBeforeIter::new(self.tx.iterator(), lower, upper, before)
@ -464,7 +476,7 @@ impl SessionTx {
&mut self, &mut self,
aid: AttrId, aid: AttrId,
v: &Value, v: &Value,
) -> impl Iterator<Item = Result<(AttrId, StaticValue, EntityId, Validity, StoreOp)>> { ) -> impl Iterator<Item=Result<(AttrId, StaticValue, EntityId, Validity, StoreOp)>> {
let lower = encode_ave_key(aid, v, EntityId::MIN_PERM, Validity::MAX); let lower = encode_ave_key(aid, v, EntityId::MIN_PERM, Validity::MAX);
let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN); let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN);
TripleAttrValueIter::new(self.tx.iterator(), lower, upper) TripleAttrValueIter::new(self.tx.iterator(), lower, upper)
@ -474,7 +486,7 @@ impl SessionTx {
aid: AttrId, aid: AttrId,
v: &Value, v: &Value,
before: Validity, before: Validity,
) -> impl Iterator<Item = Result<(AttrId, StaticValue, EntityId)>> { ) -> impl Iterator<Item=Result<(AttrId, StaticValue, EntityId)>> {
let lower = encode_ave_key(aid, v, EntityId::MIN_PERM, Validity::MAX); let lower = encode_ave_key(aid, v, EntityId::MIN_PERM, Validity::MAX);
let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN); let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN);
TripleAttrValueBeforeIter::new(self.tx.iterator(), lower, upper, before) TripleAttrValueBeforeIter::new(self.tx.iterator(), lower, upper, before)
@ -484,7 +496,7 @@ impl SessionTx {
aid: AttrId, aid: AttrId,
v: &Value, v: &Value,
after: Validity, after: Validity,
) -> impl Iterator<Item = Result<(AttrId, StaticValue, EntityId)>> { ) -> impl Iterator<Item=Result<(AttrId, StaticValue, EntityId)>> {
let lower = encode_ave_key(aid, v, EntityId::MIN_PERM, Validity::MAX); let lower = encode_ave_key(aid, v, EntityId::MIN_PERM, Validity::MAX);
let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN); let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN);
TripleAttrValueAfterIter::new(self.tx.iterator(), lower, upper, after) TripleAttrValueAfterIter::new(self.tx.iterator(), lower, upper, after)
@ -493,7 +505,7 @@ impl SessionTx {
&mut self, &mut self,
v_eid: EntityId, v_eid: EntityId,
aid: AttrId, aid: AttrId,
) -> impl Iterator<Item = Result<(EntityId, AttrId, EntityId, Validity, StoreOp)>> { ) -> impl Iterator<Item=Result<(EntityId, AttrId, EntityId, Validity, StoreOp)>> {
let lower = encode_vae_key(v_eid, aid, EntityId::MIN_PERM, Validity::MAX); let lower = encode_vae_key(v_eid, aid, EntityId::MIN_PERM, Validity::MAX);
let upper = encode_vae_key(v_eid, aid, EntityId::MAX_PERM, Validity::MIN); let upper = encode_vae_key(v_eid, aid, EntityId::MAX_PERM, Validity::MIN);
TripleValueRefAttrIter::new(self.tx.iterator(), lower, upper) TripleValueRefAttrIter::new(self.tx.iterator(), lower, upper)
@ -503,7 +515,7 @@ impl SessionTx {
v_eid: EntityId, v_eid: EntityId,
aid: AttrId, aid: AttrId,
before: Validity, before: Validity,
) -> impl Iterator<Item = Result<(EntityId, AttrId, EntityId)>> { ) -> impl Iterator<Item=Result<(EntityId, AttrId, EntityId)>> {
let lower = encode_vae_key(v_eid, aid, EntityId::MIN_PERM, Validity::MAX); let lower = encode_vae_key(v_eid, aid, EntityId::MIN_PERM, Validity::MAX);
let upper = encode_vae_key(v_eid, aid, EntityId::MAX_PERM, Validity::MIN); let upper = encode_vae_key(v_eid, aid, EntityId::MAX_PERM, Validity::MIN);
TripleValueRefAttrBeforeIter::new(self.tx.iterator(), lower, upper, before) TripleValueRefAttrBeforeIter::new(self.tx.iterator(), lower, upper, before)
@ -511,7 +523,7 @@ impl SessionTx {
pub(crate) fn triple_e_scan( pub(crate) fn triple_e_scan(
&mut self, &mut self,
eid: EntityId, eid: EntityId,
) -> impl Iterator<Item = Result<(EntityId, AttrId, StaticValue, Validity, StoreOp)>> { ) -> impl Iterator<Item=Result<(EntityId, AttrId, StaticValue, Validity, StoreOp)>> {
let lower = encode_eav_key(eid, AttrId::MIN_PERM, &Value::Null, Validity::MAX); let lower = encode_eav_key(eid, AttrId::MIN_PERM, &Value::Null, Validity::MAX);
let upper = encode_eav_key(eid, AttrId::MAX_PERM, &Value::Bottom, Validity::MIN); let upper = encode_eav_key(eid, AttrId::MAX_PERM, &Value::Bottom, Validity::MIN);
TripleEntityAttrIter::new(self.tx.iterator(), lower, upper) TripleEntityAttrIter::new(self.tx.iterator(), lower, upper)
@ -520,7 +532,7 @@ impl SessionTx {
&mut self, &mut self,
eid: EntityId, eid: EntityId,
before: Validity, before: Validity,
) -> impl Iterator<Item = Result<(EntityId, AttrId, StaticValue)>> { ) -> impl Iterator<Item=Result<(EntityId, AttrId, StaticValue)>> {
let lower = encode_eav_key(eid, AttrId::MIN_PERM, &Value::Null, Validity::MAX); let lower = encode_eav_key(eid, AttrId::MIN_PERM, &Value::Null, Validity::MAX);
let upper = encode_eav_key(eid, AttrId::MAX_PERM, &Value::Bottom, Validity::MIN); let upper = encode_eav_key(eid, AttrId::MAX_PERM, &Value::Bottom, Validity::MIN);
TripleEntityAttrBeforeIter::new(self.tx.iterator(), lower, upper, before) TripleEntityAttrBeforeIter::new(self.tx.iterator(), lower, upper, before)
@ -528,7 +540,7 @@ impl SessionTx {
pub(crate) fn triple_a_scan( pub(crate) fn triple_a_scan(
&mut self, &mut self,
aid: AttrId, aid: AttrId,
) -> impl Iterator<Item = Result<(AttrId, EntityId, StaticValue, Validity, StoreOp)>> { ) -> impl Iterator<Item=Result<(AttrId, EntityId, StaticValue, Validity, StoreOp)>> {
let lower = encode_aev_key(aid, EntityId::MIN_PERM, &Value::Null, Validity::MAX); let lower = encode_aev_key(aid, EntityId::MIN_PERM, &Value::Null, Validity::MAX);
let upper = encode_aev_key(aid, EntityId::MAX_PERM, &Value::Bottom, Validity::MIN); let upper = encode_aev_key(aid, EntityId::MAX_PERM, &Value::Bottom, Validity::MIN);
TripleAttrEntityIter::new(self.tx.iterator(), lower, upper) TripleAttrEntityIter::new(self.tx.iterator(), lower, upper)
@ -537,14 +549,14 @@ impl SessionTx {
&mut self, &mut self,
aid: AttrId, aid: AttrId,
before: Validity, before: Validity,
) -> impl Iterator<Item = Result<(AttrId, EntityId, StaticValue)>> { ) -> impl Iterator<Item=Result<(AttrId, EntityId, StaticValue)>> {
let lower = encode_aev_key(aid, EntityId::MIN_PERM, &Value::Null, Validity::MAX); let lower = encode_aev_key(aid, EntityId::MIN_PERM, &Value::Null, Validity::MAX);
let upper = encode_aev_key(aid, EntityId::MAX_PERM, &Value::Bottom, Validity::MIN); let upper = encode_aev_key(aid, EntityId::MAX_PERM, &Value::Bottom, Validity::MIN);
TripleAttrEntityBeforeIter::new(self.tx.iterator(), lower, upper, before) TripleAttrEntityBeforeIter::new(self.tx.iterator(), lower, upper, before)
} }
pub(crate) fn triple_a_scan_all( pub(crate) fn triple_a_scan_all(
&mut self, &mut self,
) -> impl Iterator<Item = Result<(AttrId, EntityId, StaticValue, Validity, StoreOp)>> { ) -> impl Iterator<Item=Result<(AttrId, EntityId, StaticValue, Validity, StoreOp)>> {
let lower = encode_aev_key( let lower = encode_aev_key(
AttrId::MIN_PERM, AttrId::MIN_PERM,
EntityId::MIN_PERM, EntityId::MIN_PERM,
@ -562,7 +574,7 @@ impl SessionTx {
pub(crate) fn triple_a_before_scan_all( pub(crate) fn triple_a_before_scan_all(
&mut self, &mut self,
before: Validity, before: Validity,
) -> impl Iterator<Item = Result<(AttrId, EntityId, StaticValue)>> { ) -> impl Iterator<Item=Result<(AttrId, EntityId, StaticValue)>> {
let lower = encode_aev_key( let lower = encode_aev_key(
AttrId::MIN_PERM, AttrId::MIN_PERM,
EntityId::MIN_PERM, EntityId::MIN_PERM,
@ -580,7 +592,7 @@ impl SessionTx {
pub(crate) fn triple_vref_scan( pub(crate) fn triple_vref_scan(
&mut self, &mut self,
v_eid: EntityId, v_eid: EntityId,
) -> impl Iterator<Item = Result<(EntityId, AttrId, EntityId, Validity, StoreOp)>> { ) -> impl Iterator<Item=Result<(EntityId, AttrId, EntityId, Validity, StoreOp)>> {
let lower = encode_vae_key(v_eid, AttrId::MIN_PERM, EntityId::MIN_PERM, Validity::MAX); let lower = encode_vae_key(v_eid, AttrId::MIN_PERM, EntityId::MIN_PERM, Validity::MAX);
let upper = encode_vae_key(v_eid, AttrId::MAX_PERM, EntityId::MAX_PERM, Validity::MIN); let upper = encode_vae_key(v_eid, AttrId::MAX_PERM, EntityId::MAX_PERM, Validity::MIN);
TripleValueRefAttrIter::new(self.tx.iterator(), lower, upper) TripleValueRefAttrIter::new(self.tx.iterator(), lower, upper)
@ -589,7 +601,7 @@ impl SessionTx {
&mut self, &mut self,
v_eid: EntityId, v_eid: EntityId,
before: Validity, before: Validity,
) -> impl Iterator<Item = Result<(EntityId, AttrId, EntityId)>> { ) -> impl Iterator<Item=Result<(EntityId, AttrId, EntityId)>> {
let lower = encode_vae_key(v_eid, AttrId::MIN_PERM, EntityId::MIN_PERM, Validity::MAX); let lower = encode_vae_key(v_eid, AttrId::MIN_PERM, EntityId::MIN_PERM, Validity::MAX);
let upper = encode_vae_key(v_eid, AttrId::MAX_PERM, EntityId::MAX_PERM, Validity::MIN); let upper = encode_vae_key(v_eid, AttrId::MAX_PERM, EntityId::MAX_PERM, Validity::MIN);
TripleValueRefAttrBeforeIter::new(self.tx.iterator(), lower, upper, before) TripleValueRefAttrBeforeIter::new(self.tx.iterator(), lower, upper, before)

Loading…
Cancel
Save