everything uses anyhow

main
Ziyang Hu 2 years ago
parent f1692bc072
commit 3565f85e44

@ -13,7 +13,6 @@ uuid = { version = "1.1.2", features = ["v1", "v4", "serde"] }
rand = "0.8.5"
anyhow = "1.0"
lazy_static = "1.4.0"
thiserror = "1.0.30"
log = "0.4.16"
env_logger = "0.9.0"
smallvec = { version = "1.8.1", features = ["serde", "write", "union", "const_generics", "const_new"] }

@ -1,6 +1,6 @@
use std::fmt::{Display, Formatter};
use anyhow::{bail, Result};
use anyhow::{anyhow, bail, Result};
use rmp_serde::Serializer;
use serde::Serialize;
use serde_derive::{Deserialize, Serialize};
@ -112,15 +112,9 @@ impl TryFrom<&'_ str> for AttributeTyping {
}
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum TypeError {
#[error("provided value {1} is not of type {0:?}")]
Typing(AttributeTyping, String),
}
impl AttributeTyping {
fn type_err(&self, val: DataValue) -> TypeError {
TypeError::Typing(*self, format!("{:?}", val))
fn type_err(&self, val: DataValue) -> anyhow::Error {
anyhow!("cannot coerce {:?} to {:?}", val, self)
}
pub(crate) fn coerce_value(&self, val: DataValue) -> Result<DataValue> {
match self {

@ -11,8 +11,7 @@ use crate::data::json::JsonValue;
use crate::data::keyword::{Keyword, PROG_ENTRY};
use crate::data::value::DataValue;
use crate::query::compile::{
Atom, AttrTripleAtom, BindingHeadTerm, DatalogProgram, QueryCompilationError, Rule,
RuleApplyAtom, RuleSet, Term,
Atom, AttrTripleAtom, BindingHeadTerm, DatalogProgram, Rule, RuleApplyAtom, RuleSet, Term,
};
use crate::query::magic::magic_sets_rewrite;
use crate::runtime::transact::SessionTx;

@ -2,7 +2,7 @@ use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Debug, Formatter};
use std::ops::Sub;
use anyhow::Result;
use anyhow::{anyhow, ensure, Result};
use itertools::Itertools;
use crate::data::attr::Attribute;
@ -35,39 +35,6 @@ use crate::{EntityId, Validity};
/// ]
/// ```
/// we also have `F.is_married(["anne", "brutus"], ["constantine", "delphi"])` for ad-hoc fact rules
#[derive(Debug, thiserror::Error)]
pub enum QueryCompilationError {
#[error("error parsing query clause {0}: {1}")]
UnexpectedForm(JsonValue, String),
#[error("arity mismatch for rule {0}: all definitions must have the same arity")]
ArityMismatch(Keyword),
#[error("encountered undefined rule {0}")]
UndefinedRule(Keyword),
#[error("safety: unbound variables for rule {0}({1}): needs to return {2:?}, bound {3:?}")]
UnsafeUnboundVars(Keyword, usize, BTreeSet<Keyword>, BTreeSet<Keyword>),
#[error("program logic error: {0}")]
LogicError(String),
#[error("entry not found: expect a rule named '?'")]
EntryNotFound,
#[error("duplicate variables: {0:?}")]
DuplicateVariables(Vec<Keyword>),
#[error("no entry to program: you must define a rule named '?'")]
NoEntryToProgram,
#[error("heads for entry must be identical")]
EntryHeadsNotIdentical,
#[error("required binding not found: {0}")]
BindingNotFound(Keyword),
#[error("unknown operator '{0}'")]
UnknownOperator(String),
#[error("op {0} arity mismatch: expected {1} arguments, found {2}")]
PredicateArityMismatch(&'static str, usize, usize),
#[error("op {0} is not a predicate")]
NotAPredicate(&'static str),
#[error("unsafe bindings in expression {0:?}: {1:?}")]
UnsafeBindingInPredicate(Expr, BTreeSet<Keyword>),
#[error("negation safety: all variables {0:?} are unbound")]
NegationSafety(Vec<Keyword>),
}
#[derive(Clone, Debug)]
pub enum Term<T> {
@ -364,13 +331,15 @@ impl SessionTx {
Atom::Rule(rule_app) => {
let (store, arity) = stores
.get(&rule_app.name)
.ok_or_else(|| QueryCompilationError::UndefinedRule(rule_app.name.clone()))?
.ok_or_else(|| anyhow!("undefined rule {} encountered", rule_app.name))?
.clone();
if arity != rule_app.args.len() {
return Err(
QueryCompilationError::ArityMismatch(rule_app.name.clone()).into()
);
}
ensure!(
arity == rule_app.args.len(),
"arity mismatch in rule application {}, expect {}, found {}",
rule_app.name,
arity,
rule_app.args.len()
);
let mut prev_joiner_vars = vec![];
let mut temp_left_bindings = vec![];
@ -518,13 +487,12 @@ impl SessionTx {
v_kw.clone()
}
};
if join_right_keys.is_empty() {
return Err(QueryCompilationError::NegationSafety(vec![
e_kw.clone(),
v_kw.clone(),
])
.into());
}
ensure!(
!join_right_keys.is_empty(),
"unsafe negation: {} and {} are unbound",
e_kw,
v_kw
);
let right =
Relation::triple(a_triple.attr.clone(), vld, e_kw, v_kw);
if ret.is_unit() {
@ -565,15 +533,16 @@ impl SessionTx {
let (store, arity) = stores
.get(&rule_app.name)
.ok_or_else(|| {
QueryCompilationError::UndefinedRule(rule_app.name.clone())
anyhow!("undefined rule encountered: {}", rule_app.name)
})?
.clone();
if arity != rule_app.args.len() {
return Err(QueryCompilationError::ArityMismatch(
rule_app.name.clone(),
)
.into());
}
ensure!(
arity == rule_app.args.len(),
"arity mismatch for {}, expect {}, got {}",
rule_app.name,
arity,
rule_app.args.len()
);
let mut prev_joiner_vars = vec![];
let mut temp_left_bindings = vec![];
@ -633,15 +602,14 @@ impl SessionTx {
}
let cur_ret_set: BTreeSet<_> = ret.bindings_after_eliminate().into_iter().collect();
if cur_ret_set != ret_vars_set {
return Err(QueryCompilationError::UnsafeUnboundVars(
rule_name.clone(),
rule_idx,
ret_vars_set,
cur_ret_set,
)
.into());
}
ensure!(
cur_ret_set == ret_vars_set,
"unbound variables in rule head for {}.{}: variables required {:?}, of which only {:?} are bound",
rule_name,
rule_idx,
ret_vars_set,
cur_ret_set
);
let cur_ret_bindings = ret.bindings_after_eliminate();
if ret_vars != cur_ret_bindings {
ret = ret.reorder(ret_vars.to_vec());

@ -1,13 +1,13 @@
use std::collections::{BTreeMap, BTreeSet};
use std::mem;
use anyhow::Result;
use anyhow::{anyhow, Result};
use itertools::Itertools;
use log::{debug, log_enabled, trace, Level};
use crate::data::keyword::{Keyword, PROG_ENTRY};
use crate::query::compile::{
BindingHeadFormatter, BindingHeadTerm, DatalogProgram, QueryCompilationError,
BindingHeadFormatter, BindingHeadTerm, DatalogProgram,
};
use crate::query::magic::magic_sets_rewrite;
use crate::query::relation::Relation;
@ -28,7 +28,7 @@ impl SessionTx {
.collect::<BTreeMap<_, _>>();
let ret_area = stores
.get(&PROG_ENTRY)
.ok_or(QueryCompilationError::EntryNotFound)?
.ok_or_else(|| anyhow!("program entry not found in rules"))?
.0
.clone();

@ -1,13 +1,13 @@
use std::collections::BTreeMap;
use anyhow::Result;
use anyhow::{anyhow, bail, ensure, Result};
use itertools::Itertools;
use serde_json::json;
use crate::data::attr::AttributeTyping;
use crate::data::json::JsonValue;
use crate::data::keyword::{Keyword, PROG_ENTRY};
use crate::query::compile::{DatalogProgram, QueryCompilationError};
use crate::query::compile::DatalogProgram;
use crate::query::pull::{CurrentPath, PullSpecs};
use crate::query::relation::flatten_err;
use crate::runtime::transact::SessionTx;
@ -15,12 +15,12 @@ use crate::Validity;
pub(crate) mod compile;
pub(crate) mod eval;
pub(crate) mod graph;
pub(crate) mod logical;
pub(crate) mod magic;
pub(crate) mod pull;
pub(crate) mod relation;
pub(crate) mod logical;
pub(crate) mod graph;
pub(crate) mod stratify;
pub(crate) mod magic;
impl SessionTx {
pub fn run_query(&mut self, payload: &JsonValue) -> Result<QueryResult<'_>> {
@ -28,19 +28,13 @@ impl SessionTx {
None => Validity::current(),
Some(v) => Validity::try_from(v)?,
};
let q = payload.get("q").ok_or_else(|| {
QueryCompilationError::UnexpectedForm(payload.clone(), "expect key 'q'".to_string())
})?;
let rules_payload = q.as_array().ok_or_else(|| {
QueryCompilationError::UnexpectedForm(q.clone(), "expect array".to_string())
})?;
if rules_payload.is_empty() {
return Err(QueryCompilationError::UnexpectedForm(
payload.clone(),
"empty rules".to_string(),
)
.into());
}
let q = payload
.get("q")
.ok_or_else(|| anyhow!("expect field 'q' in query {}", payload))?;
let rules_payload = q
.as_array()
.ok_or_else(|| anyhow!("expect field 'q' to be an array in query {}", payload))?;
ensure!(!rules_payload.is_empty(), "no rules in {}", payload);
let prog = if rules_payload.first().unwrap().is_array() {
let q = json!([{"rule": "?", "args": rules_payload}]);
self.parse_rule_sets(&q, vld)?
@ -143,11 +137,7 @@ impl SessionTx {
.map(flatten_err),
))
}
Some(v) => Err(QueryCompilationError::UnexpectedForm(
v.clone(),
"out specification should be an array".to_string(),
)
.into()),
Some(v) => bail!("out spec should be an array, found {}", v),
}
}
fn parse_pull_specs_for_query(
@ -174,47 +164,30 @@ impl SessionTx {
let kw = Keyword::from(s as &str);
let idx = *entry_bindings
.get(&kw)
.ok_or_else(|| QueryCompilationError::BindingNotFound(kw.clone()))?;
.ok_or_else(|| anyhow!("binding {} not found", kw))?;
Ok((idx, None))
}
JsonValue::Object(m) => {
let kw = m
.get("pull")
.ok_or_else(|| {
QueryCompilationError::UnexpectedForm(
JsonValue::Object(m.clone()),
"expect key 'pull'".to_string(),
)
})?
.ok_or_else(|| anyhow!("expect field 'pull' in {:?}", m))?
.as_str()
.ok_or_else(|| {
QueryCompilationError::UnexpectedForm(
JsonValue::Object(m.clone()),
"expect key 'pull' to have a binding as value".to_string(),
)
})?;
.ok_or_else(|| anyhow!("expect 'pull' to be a binding in {:?}", m))?;
let kw = Keyword::from(kw);
let idx = *entry_bindings
.get(&kw)
.ok_or_else(|| QueryCompilationError::BindingNotFound(kw.clone()))?;
let spec = m.get("spec").ok_or_else(|| {
QueryCompilationError::UnexpectedForm(
JsonValue::Object(m.clone()),
"expect key 'spec'".to_string(),
)
})?;
.ok_or_else(|| anyhow!("binding {} not found", kw))?;
let spec = m
.get("spec")
.ok_or_else(|| anyhow!("expect field 'spec' in {:?}", m))?;
let specs = self.parse_pull(spec, 0)?;
Ok((idx, Some(specs)))
}
v => Err(QueryCompilationError::UnexpectedForm(
v.clone(),
"expect binding or map".to_string(),
)
.into()),
v => bail!("expect binding or map, got {:?}", v),
}
})
.try_collect()
}
}
pub type QueryResult<'a> = Box<dyn Iterator<Item=Result<JsonValue>> + 'a>;
pub type QueryResult<'a> = Box<dyn Iterator<Item = Result<JsonValue>> + 'a>;

@ -2,7 +2,7 @@ use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Debug, Formatter};
use std::iter;
use anyhow::Result;
use anyhow::{bail, Result};
use itertools::Itertools;
use crate::data::attr::Attribute;
@ -10,7 +10,6 @@ use crate::data::expr::Expr;
use crate::data::keyword::Keyword;
use crate::data::tuple::{Tuple, TupleIter};
use crate::data::value::DataValue;
use crate::query::compile::QueryCompilationError;
use crate::runtime::temp_store::{TempStore, TempStoreId};
use crate::runtime::transact::SessionTx;
use crate::Validity;
@ -1035,7 +1034,7 @@ impl StoredDerivedRelation {
'outer: for found in self.storage.scan_prefix(&prefix) {
let found = found?;
for (left_idx, right_idx) in
left_join_indices.iter().zip(right_join_indices.iter())
left_join_indices.iter().zip(right_join_indices.iter())
{
if tuple.0[*left_idx] != found.0[*right_idx] {
continue 'outer;
@ -1142,7 +1141,7 @@ impl Debug for Joiner {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let left_bindings = BindingFormatter(self.left_keys.clone());
let right_bindings = BindingFormatter(self.right_keys.clone());
write!(f, "{:?}<->{:?}", left_bindings, right_bindings, )
write!(f, "{:?}<->{:?}", left_bindings, right_bindings,)
}
}
@ -1167,19 +1166,15 @@ impl Joiner {
for (l, r) in self.left_keys.iter().zip(self.right_keys.iter()) {
let l_pos = match left_binding_map.get(l) {
None => {
return Err(QueryCompilationError::LogicError(format!(
"join key is wrong: left binding for {} not found: left {:?} vs right {:?}, {:?}",
l, left_bindings, right_bindings, self
)).into());
bail!("join key is wrong: left binding for {} not found: left {:?} vs right {:?}, {:?}",
l, left_bindings, right_bindings, self);
}
Some(p) => p,
};
let r_pos = match right_binding_map.get(r) {
None => {
return Err(QueryCompilationError::LogicError(format!(
"join key is wrong: right binding for {} not found: left {:?} vs right {:?}, {:?}",
r, left_bindings, right_bindings, self
)).into());
bail!("join key is wrong: right binding for {} not found: left {:?} vs right {:?}, {:?}",
r, left_bindings, right_bindings, self);
}
Some(p) => p,
};

@ -1,23 +1,15 @@
use std::collections::{BTreeMap, BTreeSet};
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use anyhow::Result;
use anyhow::{ensure, Result};
use itertools::Itertools;
use crate::data::keyword::{Keyword, PROG_ENTRY};
use crate::query::compile::{Atom, DatalogProgram};
use crate::query::graph::{
generalized_kahn, Graph, reachable_components, StratifiedGraph, strongly_connected_components,
generalized_kahn, reachable_components, strongly_connected_components, Graph, StratifiedGraph,
};
#[derive(thiserror::Error, Debug)]
pub enum GraphError {
#[error("every program requires an entry named '?'")]
EntryNotFound,
#[error("the rules #{0:?} form a cycle with negation/aggregation inside, which is unsafe")]
GraphNotStratified(BTreeSet<Keyword>),
}
impl Atom {
fn contained_rules(&self) -> BTreeMap<&Keyword, bool> {
match self {
@ -86,12 +78,11 @@ fn verify_no_cycle(g: &StratifiedGraph<&'_ Keyword>, sccs: &[BTreeSet<&Keyword>]
for scc in sccs {
if scc.contains(k) {
for (v, negated) in vs {
if *negated && scc.contains(v) {
return Err(GraphError::GraphNotStratified(
scc.iter().cloned().cloned().collect(),
)
.into());
}
ensure!(
!negated || !scc.contains(v),
"unstratifiable graph: cycle {:?} contains negation or aggregation",
scc
);
}
}
}
@ -137,9 +128,10 @@ pub(crate) fn stratify_program(prog: &DatalogProgram) -> Result<Vec<DatalogProgr
let prog_entry: &Keyword = &PROG_ENTRY;
let stratified_graph = convert_program_to_graph(&prog);
let graph = reduce_to_graph(&stratified_graph);
if !graph.contains_key(prog_entry) {
return Err(GraphError::EntryNotFound.into());
}
ensure!(
graph.contains_key(prog_entry),
"program graph does not have an entry"
);
// 1. find reachable clauses starting from the query
let reachable: BTreeSet<_> = reachable_components(&graph, &prog_entry)
@ -167,9 +159,11 @@ pub(crate) fn stratify_program(prog: &DatalogProgram) -> Result<Vec<DatalogProgr
// 6. topological sort the reduced graph to get a stratification
let sort_result = generalized_kahn(&reduced_graph, stratified_graph.len());
let n_strata = sort_result.len();
let invert_sort_result = sort_result.into_iter().enumerate().flat_map(|(stratum, indices)| {
indices.into_iter().map(move |idx| (idx, stratum))
}).collect::<BTreeMap<_, _>>();
let invert_sort_result = sort_result
.into_iter()
.enumerate()
.flat_map(|(stratum, indices)| indices.into_iter().map(move |idx| (idx, stratum)))
.collect::<BTreeMap<_, _>>();
// 7. translate the stratification into datalog program
let mut ret: Vec<DatalogProgram> = vec![Default::default(); n_strata];
for (name, ruleset) in prog {

@ -1,8 +1,8 @@
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
use anyhow::Result;
use anyhow::{anyhow, Result};
use rmp_serde::Serializer;
use serde::Serialize;
use serde_derive::{Deserialize, Serialize};
@ -31,7 +31,7 @@ pub struct SessionTx {
pub(crate) attr_by_kw_cache: BTreeMap<Keyword, Option<Attribute>>,
pub(crate) temp_entity_to_perm: BTreeMap<EntityId, EntityId>,
pub(crate) eid_by_attr_val_cache:
BTreeMap<DataValue, BTreeMap<(AttrId, Validity), Option<EntityId>>>,
BTreeMap<DataValue, BTreeMap<(AttrId, Validity), Option<EntityId>>>,
// "touched" requires the id to exist prior to the transaction, and something related to it has changed
pub(crate) touched_eids: BTreeSet<EntityId>,
}
@ -130,8 +130,9 @@ impl SessionTx {
Ok(())
}
pub(crate) fn get_write_tx_id(&self) -> std::result::Result<TxId, TransactError> {
self.w_tx_id.ok_or(TransactError::WriteInReadOnly)
pub(crate) fn get_write_tx_id(&self) -> Result<TxId> {
self.w_tx_id
.ok_or_else(|| anyhow!("attempting to write in read-only transaction"))
}
pub(crate) fn bounded_scan(&mut self, lower: &[u8], upper: &[u8]) -> DbIter {
self.tx
@ -159,23 +160,3 @@ impl SessionTx {
it
}
}
#[derive(Debug, thiserror::Error)]
pub enum TransactError {
#[error("attribute conflict for {0:?}: {1}")]
AttrConflict(AttrId, String),
#[error("attribute consistency for {0:?}: {1}")]
AttrConsistency(AttrId, String),
#[error("attribute not found {0:?}")]
AttrNotFound(AttrId),
#[error("attribute not found {0}")]
AttrNotFoundKw(Keyword),
#[error("attempt to write in read-only transaction")]
WriteInReadOnly,
#[error("attempt to change immutable property for attr {0:?}")]
ChangingImmutableProperty(AttrId),
#[error("required triple not found for {0:?}, {1:?}")]
RequiredTripleNotFound(EntityId, AttrId),
#[error("precondition failed for {0:?} {1:?}, expect {2}, got {3}")]
PreconditionFailed(EntityId, AttrId, String, String),
}

@ -1,6 +1,6 @@
use std::sync::atomic::Ordering;
use anyhow::Result;
use anyhow::{anyhow, ensure, Result};
use cozorocks::{DbIter, IterBuilder};
@ -17,19 +17,9 @@ use crate::data::keyword::Keyword;
use crate::data::triple::StoreOp;
use crate::data::value::{DataValue, INLINE_VAL_SIZE_LIMIT};
use crate::parse::triple::{Quintuple, TxAction};
use crate::runtime::transact::{SessionTx, TransactError};
use crate::runtime::transact::SessionTx;
use crate::utils::swap_option_result;
#[derive(Debug, thiserror::Error)]
enum ExecError {
#[error("use of temp entity id: {0:?}")]
TempEid(EntityId),
#[error("unique constraint violated: {0} {1}")]
UniqueConstraintViolated(Keyword, String),
#[error("triple not found for {0:?} {1:?} {2:?}")]
TripleEANotFound(EntityId, AttrId, Validity),
}
impl SessionTx {
pub fn tx_triples(&mut self, payloads: Vec<Quintuple>) -> Result<Vec<(EntityId, isize)>> {
let mut ret = Vec::with_capacity(payloads.len());
@ -107,7 +97,7 @@ impl SessionTx {
) -> Result<()> {
let aid = attr.id;
let sentinel = encode_sentinel_entity_attr(eid, aid);
let gen_err = || TransactError::RequiredTripleNotFound(eid, aid);
let gen_err = || anyhow!("required triple not found for {:?}, {:?}", eid, aid);
self.tx.get(&sentinel, true)?.ok_or_else(gen_err)?;
let v_in_key = if attr.cardinality.is_one() {
&DataValue::Bottom
@ -126,15 +116,12 @@ impl SessionTx {
} else {
decode_value_from_key(k_slice)?
};
if stored_v != *v {
return Err(TransactError::PreconditionFailed(
eid,
aid,
format!("{:?}", v),
format!("{:?}", stored_v),
)
.into());
}
ensure!(
stored_v == *v,
"precondition check failed: wanted {:?}, got {:?}",
v,
stored_v
);
Ok(())
}
pub(crate) fn write_triple(
@ -220,34 +207,31 @@ impl SessionTx {
// back scan in time
for item in self.triple_av_before_scan(attr.id, v, vld_in_key) {
let (_, _, found_eid) = item?;
if found_eid != eid {
return Err(ExecError::UniqueConstraintViolated(
attr.keyword.clone(),
format!("{:?}", v),
)
.into());
}
ensure!(
found_eid == eid,
"unique constraint violated for attr {} with value {:?}",
attr.keyword,
v
);
}
// fwd scan in time
for item in self.triple_av_after_scan(attr.id, v, vld_in_key) {
let (_, _, found_eid) = item?;
if found_eid != eid {
return Err(ExecError::UniqueConstraintViolated(
attr.keyword.clone(),
format!("{:?}", v),
)
.into());
}
ensure!(
found_eid == eid,
"unique constraint violated for attr {} with value {:?}",
attr.keyword,
v
);
}
} else if let Some(v_slice) = self.tx.get(&ave_encoded, false)? {
let (_, found_eid, _) = decode_ae_key(&v_slice)?;
if found_eid != eid {
return Err(ExecError::UniqueConstraintViolated(
attr.keyword.clone(),
format!("{:?}", v),
)
.into());
}
ensure!(
found_eid == eid,
"unique constraint violated for attr {} with value {:?}",
attr.keyword,
v
);
}
}
let e_in_val_encoded = DataValue::EnId(eid).encode_with_op_and_tx(op, tx_id);
@ -317,9 +301,7 @@ impl SessionTx {
v: &DataValue,
vld: Validity,
) -> Result<EntityId> {
if !eid.is_perm() {
return Err(ExecError::TempEid(eid).into());
}
ensure!(eid.is_perm(), "temp id not allowed here: {:?}", eid);
// checking that the eid actually exists should be done in the preprocessing step
self.write_triple(eid, attr, v, vld, StoreOp::Retract)
}
@ -378,7 +360,7 @@ impl SessionTx {
if op.is_assert() {
let cur_attr = self
.attr_by_id(cur_aid)?
.ok_or(TransactError::AttrNotFound(cur_aid))?;
.ok_or_else(|| anyhow!("attribute not found for {:?}", cur_aid))?;
self.retract_triple(cur_eid, &cur_attr, &cur_v, vld)?;
counter -= 1;
}
@ -436,14 +418,14 @@ impl SessionTx {
let res = self
.tx
.get(&encoded, false)?
.ok_or(ExecError::TripleEANotFound(eid, aid, vld))?;
.ok_or_else(|| anyhow!("triple not found with {:?} and {:?}", eid, aid))?;
decode_value(&res.as_ref()[1..])
}
pub(crate) fn triple_ea_scan(
&self,
eid: EntityId,
aid: AttrId,
) -> impl Iterator<Item=Result<(EntityId, AttrId, DataValue, Validity, StoreOp)>> {
) -> impl Iterator<Item = Result<(EntityId, AttrId, DataValue, Validity, StoreOp)>> {
let lower = encode_eav_key(eid, aid, &DataValue::Null, Validity::MAX);
let upper = encode_eav_key(eid, aid, &DataValue::Bottom, Validity::MIN);
TripleEntityAttrIter::new(self.tx.iterator(), lower, upper)
@ -453,7 +435,7 @@ impl SessionTx {
eid: EntityId,
aid: AttrId,
before: Validity,
) -> impl Iterator<Item=Result<(EntityId, AttrId, DataValue)>> {
) -> impl Iterator<Item = Result<(EntityId, AttrId, DataValue)>> {
let lower = encode_eav_key(eid, aid, &DataValue::Null, Validity::MAX);
let upper = encode_eav_key(eid, aid, &DataValue::Bottom, Validity::MIN);
TripleEntityAttrBeforeIter::new(self.tx.iterator(), lower, upper, before)
@ -477,7 +459,7 @@ impl SessionTx {
&self,
aid: AttrId,
eid: EntityId,
) -> impl Iterator<Item=Result<(AttrId, EntityId, DataValue, Validity, StoreOp)>> {
) -> impl Iterator<Item = Result<(AttrId, EntityId, DataValue, Validity, StoreOp)>> {
let lower = encode_aev_key(aid, eid, &DataValue::Null, Validity::MAX);
let upper = encode_aev_key(aid, eid, &DataValue::Bottom, Validity::MIN);
TripleAttrEntityIter::new(self.tx.iterator(), lower, upper)
@ -487,7 +469,7 @@ impl SessionTx {
aid: AttrId,
eid: EntityId,
before: Validity,
) -> impl Iterator<Item=Result<(AttrId, EntityId, DataValue)>> {
) -> impl Iterator<Item = Result<(AttrId, EntityId, DataValue)>> {
let lower = encode_aev_key(aid, eid, &DataValue::Null, Validity::MAX);
let upper = encode_aev_key(aid, eid, &DataValue::Bottom, Validity::MIN);
TripleAttrEntityBeforeIter::new(self.tx.iterator(), lower, upper, before)
@ -496,7 +478,7 @@ impl SessionTx {
&self,
aid: AttrId,
v: &DataValue,
) -> impl Iterator<Item=Result<(AttrId, DataValue, EntityId, Validity, StoreOp)>> {
) -> impl Iterator<Item = Result<(AttrId, DataValue, EntityId, Validity, StoreOp)>> {
let lower = encode_ave_key(aid, v, EntityId::MIN_PERM, Validity::MAX);
let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN);
TripleAttrValueIter::new(self.tx.iterator(), lower, upper)
@ -506,7 +488,7 @@ impl SessionTx {
aid: AttrId,
v: &DataValue,
before: Validity,
) -> impl Iterator<Item=Result<(AttrId, DataValue, EntityId)>> {
) -> impl Iterator<Item = Result<(AttrId, DataValue, EntityId)>> {
let lower = encode_ave_key(aid, v, EntityId::MIN_PERM, Validity::MAX);
let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN);
TripleAttrValueBeforeIter::new(self.tx.iterator(), lower, upper, before)
@ -516,7 +498,7 @@ impl SessionTx {
aid: AttrId,
v: &DataValue,
after: Validity,
) -> impl Iterator<Item=Result<(AttrId, DataValue, EntityId)>> {
) -> impl Iterator<Item = Result<(AttrId, DataValue, EntityId)>> {
let lower = encode_ave_key(aid, v, EntityId::MIN_PERM, Validity::MAX);
let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN);
TripleAttrValueAfterIter::new(self.tx.iterator(), lower, upper, after)
@ -525,7 +507,7 @@ impl SessionTx {
&self,
v_eid: EntityId,
aid: AttrId,
) -> impl Iterator<Item=Result<(EntityId, AttrId, EntityId, Validity, StoreOp)>> {
) -> impl Iterator<Item = Result<(EntityId, AttrId, EntityId, Validity, StoreOp)>> {
let lower = encode_vae_key(v_eid, aid, EntityId::MIN_PERM, Validity::MAX);
let upper = encode_vae_key(v_eid, aid, EntityId::MAX_PERM, Validity::MIN);
TripleValueRefAttrIter::new(self.tx.iterator(), lower, upper)
@ -535,7 +517,7 @@ impl SessionTx {
v_eid: EntityId,
aid: AttrId,
before: Validity,
) -> impl Iterator<Item=Result<(EntityId, AttrId, EntityId)>> {
) -> impl Iterator<Item = Result<(EntityId, AttrId, EntityId)>> {
let lower = encode_vae_key(v_eid, aid, EntityId::MIN_PERM, Validity::MAX);
let upper = encode_vae_key(v_eid, aid, EntityId::MAX_PERM, Validity::MIN);
TripleValueRefAttrBeforeIter::new(self.tx.iterator(), lower, upper, before)
@ -543,7 +525,7 @@ impl SessionTx {
pub(crate) fn triple_e_scan(
&self,
eid: EntityId,
) -> impl Iterator<Item=Result<(EntityId, AttrId, DataValue, Validity, StoreOp)>> {
) -> impl Iterator<Item = Result<(EntityId, AttrId, DataValue, Validity, StoreOp)>> {
let lower = encode_eav_key(eid, AttrId::MIN_PERM, &DataValue::Null, Validity::MAX);
let upper = encode_eav_key(eid, AttrId::MAX_PERM, &DataValue::Bottom, Validity::MIN);
TripleEntityAttrIter::new(self.tx.iterator(), lower, upper)
@ -552,7 +534,7 @@ impl SessionTx {
&self,
eid: EntityId,
before: Validity,
) -> impl Iterator<Item=Result<(EntityId, AttrId, DataValue)>> {
) -> impl Iterator<Item = Result<(EntityId, AttrId, DataValue)>> {
let lower = encode_eav_key(eid, AttrId::MIN_PERM, &DataValue::Null, Validity::MAX);
let upper = encode_eav_key(eid, AttrId::MAX_PERM, &DataValue::Bottom, Validity::MIN);
TripleEntityAttrBeforeIter::new(self.tx.iterator(), lower, upper, before)
@ -560,7 +542,7 @@ impl SessionTx {
pub(crate) fn triple_a_scan(
&self,
aid: AttrId,
) -> impl Iterator<Item=Result<(AttrId, EntityId, DataValue, Validity, StoreOp)>> {
) -> impl Iterator<Item = Result<(AttrId, EntityId, DataValue, Validity, StoreOp)>> {
let lower = encode_aev_key(aid, EntityId::MIN_PERM, &DataValue::Null, Validity::MAX);
let upper = encode_aev_key(aid, EntityId::MAX_PERM, &DataValue::Bottom, Validity::MIN);
TripleAttrEntityIter::new(self.tx.iterator(), lower, upper)
@ -569,14 +551,14 @@ impl SessionTx {
&self,
aid: AttrId,
before: Validity,
) -> impl Iterator<Item=Result<(AttrId, EntityId, DataValue)>> {
) -> impl Iterator<Item = Result<(AttrId, EntityId, DataValue)>> {
let lower = encode_aev_key(aid, EntityId::MIN_PERM, &DataValue::Null, Validity::MAX);
let upper = encode_aev_key(aid, EntityId::MAX_PERM, &DataValue::Bottom, Validity::MIN);
TripleAttrEntityBeforeIter::new(self.tx.iterator(), lower, upper, before)
}
pub(crate) fn triple_a_scan_all(
&self,
) -> impl Iterator<Item=Result<(AttrId, EntityId, DataValue, Validity, StoreOp)>> {
) -> impl Iterator<Item = Result<(AttrId, EntityId, DataValue, Validity, StoreOp)>> {
let lower = encode_aev_key(
AttrId::MIN_PERM,
EntityId::MIN_PERM,
@ -594,7 +576,7 @@ impl SessionTx {
pub(crate) fn triple_a_before_scan_all(
&self,
before: Validity,
) -> impl Iterator<Item=Result<(AttrId, EntityId, DataValue)>> {
) -> impl Iterator<Item = Result<(AttrId, EntityId, DataValue)>> {
let lower = encode_aev_key(
AttrId::MIN_PERM,
EntityId::MIN_PERM,
@ -612,7 +594,7 @@ impl SessionTx {
pub(crate) fn triple_vref_scan(
&self,
v_eid: EntityId,
) -> impl Iterator<Item=Result<(EntityId, AttrId, EntityId, Validity, StoreOp)>> {
) -> impl Iterator<Item = Result<(EntityId, AttrId, EntityId, Validity, StoreOp)>> {
let lower = encode_vae_key(v_eid, AttrId::MIN_PERM, EntityId::MIN_PERM, Validity::MAX);
let upper = encode_vae_key(v_eid, AttrId::MAX_PERM, EntityId::MAX_PERM, Validity::MIN);
TripleValueRefAttrIter::new(self.tx.iterator(), lower, upper)
@ -621,7 +603,7 @@ impl SessionTx {
&self,
v_eid: EntityId,
before: Validity,
) -> impl Iterator<Item=Result<(EntityId, AttrId, EntityId)>> {
) -> impl Iterator<Item = Result<(EntityId, AttrId, EntityId)>> {
let lower = encode_vae_key(v_eid, AttrId::MIN_PERM, EntityId::MIN_PERM, Validity::MAX);
let upper = encode_vae_key(v_eid, AttrId::MAX_PERM, EntityId::MAX_PERM, Validity::MIN);
TripleValueRefAttrBeforeIter::new(self.tx.iterator(), lower, upper, before)

@ -1,10 +1,9 @@
use std::sync::atomic::Ordering;
use anyhow::Result;
use anyhow::{anyhow, bail, ensure, Result};
use cozorocks::{DbIter, IterBuilder};
use crate::AttrTxItem;
use crate::data::attr::Attribute;
use crate::data::encode::{
encode_attr_by_id, encode_sentinel_attr_by_id, encode_sentinel_attr_by_kw, VEC_SIZE_8,
@ -12,8 +11,9 @@ use crate::data::encode::{
use crate::data::id::AttrId;
use crate::data::keyword::Keyword;
use crate::data::triple::StoreOp;
use crate::runtime::transact::{SessionTx, TransactError};
use crate::runtime::transact::SessionTx;
use crate::utils::swap_option_result;
use crate::AttrTxItem;
impl SessionTx {
pub fn tx_attrs(&mut self, payloads: Vec<AttrTxItem>) -> Result<Vec<(StoreOp, AttrId)>> {
@ -97,30 +97,24 @@ impl SessionTx {
})
}
pub(crate) fn all_attrs(&mut self) -> impl Iterator<Item=Result<Attribute>> {
pub(crate) fn all_attrs(&mut self) -> impl Iterator<Item = Result<Attribute>> {
AttrIter::new(self.tx.iterator())
}
/// conflict if new attribute has same name as existing one
pub(crate) fn new_attr(&mut self, mut attr: Attribute) -> Result<AttrId> {
if attr.cardinality.is_many() && attr.indexing.is_unique_index() {
return Err(TransactError::AttrConsistency(
attr.id,
"cardinality cannot be 'many' for unique or identity attributes".to_string(),
)
.into());
}
ensure!(
!attr.cardinality.is_many() || !attr.indexing.is_unique_index(),
"cardinality cannot be 'many' for unique or identity attributes: {:?}",
attr
);
ensure!(
self.attr_by_kw(&attr.keyword)?.is_none(),
"new attribute conflicts with existing one for alias {}",
attr.keyword
);
if self.attr_by_kw(&attr.keyword)?.is_some() {
return Err(TransactError::AttrConflict(
attr.id,
format!(
"new attribute conflicts with existing one for alias {}",
attr.keyword
),
)
.into());
}
attr.id = AttrId(self.last_attr_id.fetch_add(1, Ordering::AcqRel) + 1);
self.put_attr(&attr, StoreOp::Assert)
}
@ -129,25 +123,24 @@ impl SessionTx {
/// or if the attr_id doesn't already exist (or retracted),
/// or if changing immutable properties (cardinality, val_type, indexing)
pub(crate) fn amend_attr(&mut self, attr: Attribute) -> Result<AttrId> {
let existing = self.attr_by_id(attr.id)?.ok_or_else(|| {
TransactError::AttrConflict(attr.id, "expected attributed not found".to_string())
})?;
let existing = self
.attr_by_id(attr.id)?
.ok_or_else(|| anyhow!("expected attribute id {:?} not found", attr.id))?;
let tx_id = self.get_write_tx_id()?;
if existing.keyword != attr.keyword {
if self.attr_by_kw(&attr.keyword)?.is_some() {
return Err(TransactError::AttrConflict(
attr.id,
format!("alias conflict: {}", attr.keyword),
)
.into());
}
if existing.val_type != attr.val_type
|| existing.cardinality != attr.cardinality
|| existing.indexing != attr.indexing
|| existing.with_history != attr.with_history
{
return Err(TransactError::ChangingImmutableProperty(attr.id).into());
}
ensure!(
self.attr_by_kw(&attr.keyword)?.is_none(),
"attribute alias {} conflict with existing one",
attr.keyword
);
ensure!(
existing.val_type == attr.val_type
&& existing.cardinality == attr.cardinality
&& existing.indexing == attr.indexing
&& existing.with_history == attr.with_history,
"changing immutable property for {:?}",
attr
);
let kw_sentinel = encode_sentinel_attr_by_kw(&existing.keyword);
let attr_data = existing.encode_with_op_and_tx(StoreOp::Retract, tx_id);
self.tx.put(&kw_sentinel, &attr_data)?;
@ -170,11 +163,7 @@ impl SessionTx {
/// conflict if retracted attr_id doesn't already exist, or is already retracted
pub(crate) fn retract_attr(&mut self, aid: AttrId) -> Result<AttrId> {
match self.attr_by_id(aid)? {
None => Err(TransactError::AttrConflict(
aid,
"attempting to retract non-existing attribute".to_string(),
)
.into()),
None => bail!("attempting to retract non-existing attribute {:?}", aid),
Some(attr) => {
self.put_attr(&attr, StoreOp::Retract)?;
Ok(attr.id)
@ -185,7 +174,7 @@ impl SessionTx {
pub(crate) fn retract_attr_by_kw(&mut self, kw: &Keyword) -> Result<AttrId> {
let attr = self
.attr_by_kw(kw)?
.ok_or_else(|| TransactError::AttrNotFoundKw(kw.clone()))?;
.ok_or_else(|| anyhow!("attribute not found: {}", kw))?;
self.retract_attr(attr.id)
}
}

Loading…
Cancel
Save