impl view

main
Ziyang Hu 2 years ago
parent 740317cc1b
commit 507a7e4fea

@ -164,4 +164,9 @@ OptimisticRocksDb::~OptimisticRocksDb() {
cerr << status.ToString() << endl;
}
}
}
void OptimisticRocksDb::del_range(RustBytes, RustBytes, RocksDbStatus &status) const {
status.code = StatusCode::kInvalidArgument;
status.message = rust::String("cannot call 'del_range' on optimistic db");
}

@ -107,6 +107,8 @@ struct RocksDbBridge {
[[nodiscard]] virtual unique_ptr<TxBridge> transact() const = 0;
virtual void del_range(RustBytes start, RustBytes end, RocksDbStatus &status) const = 0;
[[nodiscard]] inline const string &get_db_path() const {
return db_path;
}
@ -121,6 +123,8 @@ struct OptimisticRocksDb : public RocksDbBridge {
return ret;
}
void del_range(RustBytes, RustBytes, RocksDbStatus &status) const override;
virtual ~OptimisticRocksDb();
};
@ -133,6 +137,21 @@ struct PessimisticRocksDb : public RocksDbBridge {
return ret;
}
inline void del_range(RustBytes start, RustBytes end, RocksDbStatus &status) const override {
WriteBatch batch;
auto s = batch.DeleteRange(db->DefaultColumnFamily(), convert_slice(start), convert_slice(end));
if (!s.ok()) {
write_status(s, status);
return;
}
WriteOptions w_opts;
TransactionDBWriteOptimizations optimizations;
optimizations.skip_concurrency_control = true;
optimizations.skip_duplicate_key_check = true;
auto s2 = db->Write(w_opts, optimizations, &batch);
write_status(s2, status);
}
virtual ~PessimisticRocksDb();
};

@ -256,6 +256,16 @@ impl RocksDb {
inner: self.inner.transact(),
}
}
#[inline]
pub fn range_del(&self, lower: &[u8], upper: &[u8]) -> Result<(), RocksDbStatus> {
let mut status = RocksDbStatus::default();
self.inner.del_range(lower, upper, &mut status);
if status.is_ok() {
Ok(())
} else {
Err(status)
}
}
}
unsafe impl Send for RocksDb {}

@ -145,6 +145,12 @@ pub(crate) mod ffi {
cmp_impl: fn(&[u8], &[u8]) -> i8,
) -> SharedPtr<RocksDbBridge>;
fn transact(self: &RocksDbBridge) -> UniquePtr<TxBridge>;
fn del_range(
self: &RocksDbBridge,
lower: &[u8],
upper: &[u8],
status: &mut RocksDbStatus,
);
type TxBridge;
// fn get_r_opts(self: Pin<&mut TxBridge>) -> Pin<&mut ReadOptions>;

@ -243,6 +243,7 @@ impl MagicRule {
pub(crate) enum InputAtom {
AttrTriple(InputAttrTripleAtom),
Rule(InputRuleApplyAtom),
View(InputViewApplyAtom),
Predicate(Expr),
Negation(Box<InputAtom>),
Conjunction(Vec<InputAtom>),
@ -254,8 +255,10 @@ pub(crate) enum InputAtom {
pub(crate) enum NormalFormAtom {
AttrTriple(NormalFormAttrTripleAtom),
Rule(NormalFormRuleApplyAtom),
View(NormalFormViewApplyAtom),
NegatedAttrTriple(NormalFormAttrTripleAtom),
NegatedRule(NormalFormRuleApplyAtom),
NegatedView(NormalFormViewApplyAtom),
Predicate(Expr),
Unification(Unification),
}
@ -264,9 +267,11 @@ pub(crate) enum NormalFormAtom {
pub(crate) enum MagicAtom {
AttrTriple(MagicAttrTripleAtom),
Rule(MagicRuleApplyAtom),
View(MagicViewApplyAtom),
Predicate(Expr),
NegatedAttrTriple(MagicAttrTripleAtom),
NegatedRule(MagicRuleApplyAtom),
NegatedView(MagicViewApplyAtom),
Unification(Unification),
}
@ -297,18 +302,36 @@ pub(crate) struct InputRuleApplyAtom {
pub(crate) args: Vec<InputTerm<DataValue>>,
}
#[derive(Clone, Debug)]
pub(crate) struct InputViewApplyAtom {
pub(crate) name: Symbol,
pub(crate) args: Vec<InputTerm<DataValue>>,
}
#[derive(Clone, Debug)]
pub(crate) struct NormalFormRuleApplyAtom {
pub(crate) name: Symbol,
pub(crate) args: Vec<Symbol>,
}
#[derive(Clone, Debug)]
pub(crate) struct NormalFormViewApplyAtom {
pub(crate) name: Symbol,
pub(crate) args: Vec<Symbol>,
}
#[derive(Clone, Debug)]
pub(crate) struct MagicRuleApplyAtom {
pub(crate) name: MagicSymbol,
pub(crate) args: Vec<Symbol>,
}
#[derive(Clone, Debug)]
pub(crate) struct MagicViewApplyAtom {
pub(crate) name: Symbol,
pub(crate) args: Vec<Symbol>,
}
#[derive(Clone, Debug)]
pub(crate) enum InputTerm<T> {
Var(Symbol),

@ -1,12 +1,14 @@
use std::cmp::{min, Ordering};
use std::cmp::{max, min, Ordering};
use std::fmt::{Debug, Formatter};
use anyhow::{ensure, Result};
use itertools::Itertools;
use rmp_serde::Serializer;
use serde::Serialize;
use crate::data::json::JsonValue;
use crate::data::value::DataValue;
use crate::runtime::temp_store::TempStoreId;
use crate::runtime::view::ViewRelId;
pub(crate) const SCRATCH_DB_KEY_PREFIX_LEN: usize = 6;
@ -30,35 +32,31 @@ impl Debug for Tuple {
pub(crate) type TupleIter<'a> = Box<dyn Iterator<Item = Result<Tuple>> + 'a>;
impl Tuple {
// pub(crate) fn arity(&self) -> usize {
// self.0.len()
// }
// pub(crate) fn encode_as_key_for_epoch(&self, prefix: TempStoreId, epoch: u32) -> Vec<u8> {
// let len = self.arity();
// let mut ret = Vec::with_capacity(4 + 4 * len + 10 * len);
// let prefix_bytes = prefix.0.to_be_bytes();
// let epoch_bytes = epoch.to_be_bytes();
// ret.extend([
// prefix_bytes[1],
// prefix_bytes[2],
// prefix_bytes[3],
// epoch_bytes[1],
// epoch_bytes[2],
// epoch_bytes[3],
// ]);
// ret.extend((len as u16).to_be_bytes());
// ret.resize(max(6, 4 * (len + 1)), 0);
// for (idx, val) in self.0.iter().enumerate() {
// if idx > 0 {
// let pos = (ret.len() as u32).to_be_bytes();
// for (i, u) in pos.iter().enumerate() {
// ret[4 * (1 + idx) + i] = *u;
// }
// }
// val.serialize(&mut Serializer::new(&mut ret)).unwrap();
// }
// ret
// }
pub(crate) fn encode_as_key(&self, prefix: ViewRelId) -> Vec<u8> {
let len = self.0.len();
let mut ret = Vec::with_capacity(4 + 4 * len + 10 * len);
let prefix_bytes = prefix.0.to_be_bytes();
ret.extend([
prefix_bytes[2],
prefix_bytes[3],
prefix_bytes[4],
prefix_bytes[5],
prefix_bytes[6],
prefix_bytes[7],
]);
ret.extend((len as u16).to_be_bytes());
ret.resize(max(6, 4 * (len + 1)), 0);
for (idx, val) in self.0.iter().enumerate() {
if idx > 0 {
let pos = (ret.len() as u32).to_be_bytes();
for (i, u) in pos.iter().enumerate() {
ret[4 * (1 + idx) + i] = *u;
}
}
val.serialize(&mut Serializer::new(&mut ret)).unwrap();
}
ret
}
}
#[derive(Copy, Clone, Debug)]
@ -112,11 +110,12 @@ impl<'a> EncodedTuple<'a> {
// ],
// )
// }
pub(crate) fn prefix(&self) -> Result<(TempStoreId, u32)> {
pub(crate) fn prefix(&self) -> Result<ViewRelId> {
ensure!(self.0.len() >= 6, "bad data: {:x?}", self.0);
let id = u32::from_be_bytes([0, self.0[0], self.0[1], self.0[2]]);
let epoch = u32::from_be_bytes([0, self.0[3], self.0[4], self.0[5]]);
Ok((TempStoreId(id), epoch))
let id = u64::from_be_bytes([
0, 0, self.0[0], self.0[1], self.0[2], self.0[3], self.0[4], self.0[5],
]);
Ok(ViewRelId(id))
}
pub(crate) fn arity(&self) -> Result<usize> {
if self.0.len() == 6 {

@ -10,10 +10,7 @@ use crate::data::attr::Attribute;
use crate::data::expr::{get_op, Expr, OP_LIST};
use crate::data::id::{EntityId, Validity};
use crate::data::json::JsonValue;
use crate::data::program::{
InputAtom, InputAttrTripleAtom, InputProgram, InputRule, InputRuleApplyAtom, InputTerm,
MagicSymbol, Unification,
};
use crate::data::program::{InputAtom, InputAttrTripleAtom, InputProgram, InputRule, InputRuleApplyAtom, InputTerm, MagicSymbol, Unification, InputViewApplyAtom};
use crate::data::symb::{Symbol, PROG_ENTRY};
use crate::data::tuple::Tuple;
use crate::data::value::DataValue;
@ -517,6 +514,52 @@ impl SessionTx {
args,
}))
}
fn parse_input_view_atom(
&mut self,
payload: &Map<String, JsonValue>,
vld: Validity,
) -> Result<InputAtom> {
let rule_name = payload
.get("view")
.ok_or_else(|| anyhow!("expect key 'view' in rule atom"))?
.as_str()
.ok_or_else(|| anyhow!("expect value for key 'view' to be a string"))?
.into();
let args = payload
.get("args")
.ok_or_else(|| anyhow!("expect key 'args' in rule atom"))?
.as_array()
.ok_or_else(|| anyhow!("expect value for key 'args' to be an array"))?
.iter()
.map(|value_rep| -> Result<InputTerm<DataValue>> {
if let Some(s) = value_rep.as_str() {
let var = Symbol::from(s);
if s.starts_with(['?', '_']) {
return Ok(InputTerm::Var(var));
} else {
ensure!(
!var.is_reserved(),
"{} is a reserved string value and must be quoted",
s
)
}
}
if let Some(o) = value_rep.as_object() {
return if let Some(c) = o.get("const") {
Ok(InputTerm::Const(c.into()))
} else {
let eid = self.parse_eid_from_map(o, vld)?;
Ok(InputTerm::Const(eid.to_value()))
};
}
Ok(InputTerm::Const(value_rep.into()))
})
.try_collect()?;
Ok(InputAtom::View(InputViewApplyAtom {
name: rule_name,
args,
}))
}
fn parse_input_rule_definition(
&mut self,
payload: &JsonValue,
@ -627,6 +670,8 @@ impl SessionTx {
JsonValue::Object(map) => {
if map.contains_key("rule") {
self.parse_input_rule_atom(map, vld)
} else if map.contains_key("view") {
self.parse_input_view_atom(map, vld)
} else if map.contains_key("op") {
Self::parse_input_predicate_atom(map, params_pool)
} else if map.contains_key("unify") {

@ -10,7 +10,7 @@ use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::query::ConstRules;
use crate::query::relation::Relation;
use crate::runtime::temp_store::TempStore;
use crate::runtime::derived::DerivedRelStore;
use crate::runtime::transact::SessionTx;
pub(crate) type CompiledProgram = BTreeMap<MagicSymbol, CompiledRuleSet>;
@ -64,8 +64,8 @@ impl SessionTx {
&mut self,
prog: &StratifiedMagicProgram,
const_rules: &ConstRules,
) -> Result<(Vec<CompiledProgram>, BTreeMap<MagicSymbol, TempStore>)> {
let mut stores: BTreeMap<MagicSymbol, TempStore> = Default::default();
) -> Result<(Vec<CompiledProgram>, BTreeMap<MagicSymbol, DerivedRelStore>)> {
let mut stores: BTreeMap<MagicSymbol, DerivedRelStore> = Default::default();
for (name, data) in const_rules {
let store = self.new_rule_store(name.clone(), data[0].0.len());
@ -132,7 +132,7 @@ impl SessionTx {
rule: &MagicRule,
rule_name: &MagicSymbol,
rule_idx: usize,
stores: &BTreeMap<MagicSymbol, TempStore>,
stores: &BTreeMap<MagicSymbol, DerivedRelStore>,
ret_vars: &[Symbol],
) -> Result<Relation> {
let mut ret = Relation::unit();
@ -206,6 +206,39 @@ impl SessionTx {
debug_assert_eq!(prev_joiner_vars.len(), right_joiner_vars.len());
ret = ret.join(right, prev_joiner_vars, right_joiner_vars);
}
MagicAtom::View(view_app) => {
todo!()
// let store = stores
// .get(&rule_app.name)
// .ok_or_else(|| anyhow!("undefined rule '{:?}' encountered", rule_app.name))?
// .clone();
// ensure!(
// store.arity == rule_app.args.len(),
// "arity mismatch in rule application {:?}, expect {}, found {}",
// rule_app.name,
// store.arity,
// rule_app.args.len()
// );
// let mut prev_joiner_vars = vec![];
// let mut right_joiner_vars = vec![];
// let mut right_vars = vec![];
//
// for var in &rule_app.args {
// if seen_variables.contains(var) {
// prev_joiner_vars.push(var.clone());
// let rk = gen_symb();
// right_vars.push(rk.clone());
// right_joiner_vars.push(rk);
// } else {
// seen_variables.insert(var.clone());
// right_vars.push(var.clone());
// }
// }
//
// let right = Relation::derived(right_vars, store);
// debug_assert_eq!(prev_joiner_vars.len(), right_joiner_vars.len());
// ret = ret.join(right, prev_joiner_vars, right_joiner_vars);
}
MagicAtom::NegatedAttrTriple(a_triple) => {
let mut join_left_keys = vec![];
let mut join_right_keys = vec![];
@ -277,6 +310,41 @@ impl SessionTx {
debug_assert_eq!(prev_joiner_vars.len(), right_joiner_vars.len());
ret = ret.neg_join(right, prev_joiner_vars, right_joiner_vars);
}
MagicAtom::NegatedView(view_app) => {
todo!()
// let store = stores
// .get(&rule_app.name)
// .ok_or_else(|| {
// anyhow!("undefined rule encountered: '{:?}'", rule_app.name)
// })?
// .clone();
// ensure!(
// store.arity == rule_app.args.len(),
// "arity mismatch for {:?}, expect {}, got {}",
// rule_app.name,
// store.arity,
// rule_app.args.len()
// );
//
// let mut prev_joiner_vars = vec![];
// let mut right_joiner_vars = vec![];
// let mut right_vars = vec![];
//
// for var in &rule_app.args {
// if seen_variables.contains(var) {
// prev_joiner_vars.push(var.clone());
// let rk = gen_symb();
// right_vars.push(rk.clone());
// right_joiner_vars.push(rk);
// } else {
// right_vars.push(var.clone());
// }
// }
//
// let right = Relation::derived(right_vars, store);
// debug_assert_eq!(prev_joiner_vars.len(), right_joiner_vars.len());
// ret = ret.neg_join(right, prev_joiner_vars, right_joiner_vars);
}
MagicAtom::Predicate(p) => {
if let Some(fs) = ret.get_filters() {
fs.extend(p.to_conjunction());

@ -7,7 +7,7 @@ use log::{debug, log_enabled, trace, Level};
use crate::data::program::MagicSymbol;
use crate::data::symb::PROG_ENTRY;
use crate::query::compile::{AggrKind, CompiledProgram};
use crate::runtime::temp_store::TempStore;
use crate::runtime::derived::DerivedRelStore;
use crate::runtime::transact::SessionTx;
pub(crate) struct QueryLimiter {
@ -30,9 +30,9 @@ impl SessionTx {
pub(crate) fn stratified_magic_evaluate(
&mut self,
strata: &[CompiledProgram],
stores: &BTreeMap<MagicSymbol, TempStore>,
stores: &BTreeMap<MagicSymbol, DerivedRelStore>,
num_to_take: Option<usize>,
) -> Result<TempStore> {
) -> Result<DerivedRelStore> {
let ret_area = stores
.get(&MagicSymbol::Muggle {
inner: PROG_ENTRY.clone(),
@ -49,7 +49,7 @@ impl SessionTx {
fn semi_naive_magic_evaluate(
&mut self,
prog: &CompiledProgram,
stores: &BTreeMap<MagicSymbol, TempStore>,
stores: &BTreeMap<MagicSymbol, DerivedRelStore>,
num_to_take: Option<usize>,
) -> Result<()> {
if log_enabled!(Level::Debug) {

@ -4,10 +4,7 @@ use anyhow::{bail, Result};
use itertools::Itertools;
use crate::data::expr::Expr;
use crate::data::program::{
InputAtom, InputAttrTripleAtom, InputRuleApplyAtom, InputTerm, NormalFormAtom,
NormalFormAttrTripleAtom, NormalFormRuleApplyAtom, TempSymbGen, Unification,
};
use crate::data::program::{InputAtom, InputAttrTripleAtom, InputRuleApplyAtom, InputTerm, NormalFormAtom, NormalFormAttrTripleAtom, NormalFormRuleApplyAtom, TempSymbGen, Unification, InputViewApplyAtom, NormalFormViewApplyAtom};
#[derive(Debug)]
pub(crate) struct Disjunction(pub(crate) Vec<Conjunction>);
@ -42,7 +39,7 @@ pub(crate) struct Conjunction(pub(crate) Vec<NormalFormAtom>);
impl InputAtom {
pub(crate) fn negation_normal_form(self) -> Result<Self> {
Ok(match self {
a @ (InputAtom::AttrTriple(_) | InputAtom::Rule(_) | InputAtom::Predicate(_)) => a,
a @ (InputAtom::AttrTriple(_) | InputAtom::Rule(_) | InputAtom::Predicate(_) | InputAtom::View(_)) => a,
InputAtom::Conjunction(args) => InputAtom::Conjunction(
args.into_iter()
.map(|a| a.negation_normal_form())
@ -55,7 +52,7 @@ impl InputAtom {
),
InputAtom::Unification(unif) => InputAtom::Unification(unif),
InputAtom::Negation(arg) => match *arg {
a @ (InputAtom::AttrTriple(_) | InputAtom::Rule(_)) => {
a @ (InputAtom::AttrTriple(_) | InputAtom::Rule(_) | InputAtom::View(_)) => {
InputAtom::Negation(Box::new(a))
}
InputAtom::Predicate(p) => InputAtom::Predicate(p.negate()),
@ -106,6 +103,7 @@ impl InputAtom {
}
InputAtom::AttrTriple(a) => a.normalize(false, gen),
InputAtom::Rule(r) => r.normalize(false, gen),
InputAtom::View(v) => v.normalize(false, gen),
InputAtom::Predicate(mut p) => {
p.partial_eval(&Default::default())?;
Disjunction::singlet(NormalFormAtom::Predicate(p))
@ -113,6 +111,7 @@ impl InputAtom {
InputAtom::Negation(n) => match *n {
InputAtom::Rule(r) => r.normalize(true, gen),
InputAtom::AttrTriple(r) => r.normalize(true, gen),
InputAtom::View(v) => v.normalize(true, gen),
_ => unreachable!(),
},
InputAtom::Unification(u) => Disjunction::singlet(NormalFormAtom::Unification(u)),
@ -258,3 +257,52 @@ impl InputAttrTripleAtom {
})
}
}
impl InputViewApplyAtom {
fn normalize(self, is_negated: bool, gen: &mut TempSymbGen) -> Disjunction {
let mut ret = Vec::with_capacity(self.args.len() + 1);
let mut args = Vec::with_capacity(self.args.len());
let mut seen_variables = BTreeSet::new();
for arg in self.args {
match arg {
InputTerm::Var(kw) => {
if seen_variables.insert(kw.clone()) {
args.push(kw);
} else {
let dup = gen.next();
let unif = NormalFormAtom::Unification(Unification {
binding: dup.clone(),
expr: Expr::Binding(kw, None),
one_many_unif: false,
});
ret.push(unif);
args.push(dup);
}
}
InputTerm::Const(val) => {
let kw = gen.next();
args.push(kw.clone());
let unif = NormalFormAtom::Unification(Unification {
binding: kw,
expr: Expr::Const(val),
one_many_unif: false,
});
ret.push(unif)
}
}
}
ret.push(if is_negated {
NormalFormAtom::NegatedView(NormalFormViewApplyAtom {
name: self.name,
args,
})
} else {
NormalFormAtom::View(NormalFormViewApplyAtom {
name: self.name,
args,
})
});
Disjunction::conj(ret)
}
}

@ -6,8 +6,8 @@ use smallvec::SmallVec;
use crate::data::program::{
MagicAtom, MagicAttrTripleAtom, MagicProgram, MagicRule, MagicRuleApplyAtom, MagicRuleSet,
MagicSymbol, NormalFormAtom, NormalFormProgram, NormalFormRule, StratifiedMagicProgram,
StratifiedNormalFormProgram,
MagicSymbol, MagicViewApplyAtom, NormalFormAtom, NormalFormProgram, NormalFormRule,
StratifiedMagicProgram, StratifiedNormalFormProgram,
};
use crate::data::symb::{Symbol, PROG_ENTRY};
@ -112,7 +112,8 @@ impl MagicProgram {
match atom {
a @ (MagicAtom::Predicate(_)
| MagicAtom::NegatedAttrTriple(_)
| MagicAtom::NegatedRule(_)) => {
| MagicAtom::NegatedRule(_)
| MagicAtom::NegatedView(_)) => {
collected_atoms.push(a);
}
MagicAtom::AttrTriple(t) => {
@ -120,6 +121,10 @@ impl MagicProgram {
seen_bindings.insert(t.value.clone());
collected_atoms.push(MagicAtom::AttrTriple(t));
}
MagicAtom::View(v) => {
seen_bindings.extend(v.args.iter().cloned());
collected_atoms.push(MagicAtom::View(v));
}
MagicAtom::Unification(u) => {
seen_bindings.insert(u.binding.clone());
collected_atoms.push(MagicAtom::Unification(u));
@ -304,6 +309,18 @@ impl NormalFormAtom {
}
MagicAtom::AttrTriple(t)
}
NormalFormAtom::View(v) => {
let v = MagicViewApplyAtom {
name: v.name.clone(),
args: v.args.clone(),
};
for arg in v.args.iter() {
if !seen_bindings.contains(arg) {
seen_bindings.insert(arg.clone());
}
}
MagicAtom::View(v)
}
NormalFormAtom::Predicate(p) => {
// predicate cannot introduce new bindings
MagicAtom::Predicate(p.clone())
@ -349,6 +366,10 @@ impl NormalFormAtom {
},
args: nr.args.clone(),
}),
NormalFormAtom::NegatedView(nv) => MagicAtom::NegatedView(MagicViewApplyAtom {
name: nv.name.clone(),
args: nv.args.clone(),
}),
NormalFormAtom::Unification(u) => {
seen_bindings.insert(u.binding.clone());
MagicAtom::Unification(u.clone())

@ -13,13 +13,15 @@ use crate::data::id::{AttrId, EntityId, Validity};
use crate::data::symb::Symbol;
use crate::data::tuple::{Tuple, TupleIter};
use crate::data::value::DataValue;
use crate::runtime::temp_store::{TempStore, TempStoreId};
use crate::runtime::derived::{DerivedRelStore, DerivedRelStoreId};
use crate::runtime::transact::SessionTx;
use crate::runtime::view::ViewRelStore;
pub(crate) enum Relation {
Fixed(InlineFixedRelation),
Triple(TripleRelation),
Derived(DerivedRelation),
View(ViewRelation),
Join(Box<InnerJoin>),
NegJoin(Box<NegJoin>),
Reorder(ReorderRelation),
@ -81,7 +83,7 @@ impl UnificationRelation {
&'a self,
tx: &'a SessionTx,
epoch: Option<u32>,
use_delta: &BTreeSet<TempStoreId>,
use_delta: &BTreeSet<DerivedRelStoreId>,
) -> Result<TupleIter<'a>> {
let mut bindings = self.parent.bindings_after_eliminate();
bindings.push(self.binding.clone());
@ -164,7 +166,7 @@ impl FilteredRelation {
&'a self,
tx: &'a SessionTx,
epoch: Option<u32>,
use_delta: &BTreeSet<TempStoreId>,
use_delta: &BTreeSet<DerivedRelStoreId>,
) -> Result<TupleIter<'a>> {
let bindings = self.parent.bindings_after_eliminate();
let eliminate_indices = get_eliminate_indices(&bindings, &self.to_eliminate);
@ -229,6 +231,12 @@ impl Debug for Relation {
.field(&r.storage.rule_name)
.field(&r.filters)
.finish(),
Relation::View(r) => f
.debug_tuple("Derived")
.field(&bindings)
.field(&r.storage.metadata.name)
.field(&r.filters)
.finish(),
Relation::Join(r) => {
if r.left.is_unit() {
r.right.fmt(f)
@ -289,6 +297,9 @@ impl Relation {
Relation::Derived(d) => {
d.fill_binding_indices()?;
}
Relation::View(v) => {
v.fill_binding_indices()?;
}
Relation::Reorder(r) => {
r.relation.fill_normal_binding_indices()?;
}
@ -345,13 +356,20 @@ impl Relation {
pub(crate) fn cartesian_join(self, right: Relation) -> Self {
self.join(right, vec![], vec![])
}
pub(crate) fn derived(bindings: Vec<Symbol>, storage: TempStore) -> Self {
pub(crate) fn derived(bindings: Vec<Symbol>, storage: DerivedRelStore) -> Self {
Self::Derived(DerivedRelation {
bindings,
storage,
filters: vec![],
})
}
pub(crate) fn view(bindings: Vec<Symbol>, storage: ViewRelStore) -> Self {
Self::View(ViewRelation {
bindings,
storage,
filters: vec![]
})
}
pub(crate) fn triple(
attr: Attribute,
vld: Validity,
@ -435,7 +453,7 @@ impl ReorderRelation {
&'a self,
tx: &'a SessionTx,
epoch: Option<u32>,
use_delta: &BTreeSet<TempStoreId>,
use_delta: &BTreeSet<DerivedRelStoreId>,
) -> Result<TupleIter<'a>> {
let old_order = self.relation.bindings_after_eliminate();
let old_order_indices: BTreeMap<_, _> = old_order
@ -1311,10 +1329,186 @@ fn get_eliminate_indices(bindings: &[Symbol], eliminate: &BTreeSet<Symbol>) -> B
.collect::<BTreeSet<_>>()
}
#[derive(Debug)]
pub(crate) struct ViewRelation {
pub(crate) bindings: Vec<Symbol>,
pub(crate) storage: ViewRelStore,
pub(crate) filters: Vec<Expr>,
}
impl ViewRelation {
fn fill_binding_indices(&mut self) -> Result<()> {
let bindings: BTreeMap<_, _> = self
.bindings
.iter()
.cloned()
.enumerate()
.map(|(a, b)| (b, a))
.collect();
for e in self.filters.iter_mut() {
e.fill_binding_indices(&bindings)?;
}
Ok(())
}
fn prefix_join<'a>(
&'a self,
left_iter: TupleIter<'a>,
(left_join_indices, right_join_indices): (Vec<usize>, Vec<usize>),
eliminate_indices: BTreeSet<usize>,
) -> Result<TupleIter<'a>> {
let mut right_invert_indices = right_join_indices.iter().enumerate().collect_vec();
right_invert_indices.sort_by_key(|(_, b)| **b);
let left_to_prefix_indices = right_invert_indices
.into_iter()
.map(|(a, _)| left_join_indices[a])
.collect_vec();
let mut skip_range_check = false;
let it = left_iter
.map_ok(move |tuple| {
let prefix = Tuple(
left_to_prefix_indices
.iter()
.map(|i| tuple.0[*i].clone())
.collect_vec(),
);
if !skip_range_check && !self.filters.is_empty() {
let other_bindings = &self.bindings[right_join_indices.len()..];
let (l_bound, u_bound) = match compute_bounds(&self.filters, other_bindings) {
Ok(b) => b,
_ => (vec![], vec![]),
};
if !l_bound.iter().all(|v| *v == DataValue::Null)
|| !u_bound.iter().all(|v| *v == DataValue::Bottom)
{
return Left(
self.storage
.scan_bounded_prefix(
&prefix, &l_bound, &u_bound,
)
.filter_map_ok(move |found| {
// dbg!("filter", &tuple, &prefix, &found);
let mut ret = tuple.0.clone();
ret.extend(found.0);
Some(Tuple(ret))
}),
);
}
}
skip_range_check = true;
Right(
self.storage
.scan_prefix(&prefix)
.filter_map_ok(move |found| {
// dbg!("filter", &tuple, &prefix, &found);
let mut ret = tuple.0.clone();
ret.extend(found.0);
Some(Tuple(ret))
}),
)
})
.flatten_ok()
.map(flatten_err);
Ok(
match (self.filters.is_empty(), eliminate_indices.is_empty()) {
(true, true) => Box::new(it),
(true, false) => {
Box::new(it.map_ok(move |t| eliminate_from_tuple(t, &eliminate_indices)))
}
(false, true) => Box::new(filter_iter(self.filters.clone(), it)),
(false, false) => Box::new(
filter_iter(self.filters.clone(), it)
.map_ok(move |t| eliminate_from_tuple(t, &eliminate_indices)),
),
},
)
}
fn neg_join<'a>(
&'a self,
left_iter: TupleIter<'a>,
(left_join_indices, right_join_indices): (Vec<usize>, Vec<usize>),
eliminate_indices: BTreeSet<usize>,
) -> Result<TupleIter<'a>> {
debug_assert!(!right_join_indices.is_empty());
let mut right_invert_indices = right_join_indices.iter().enumerate().collect_vec();
right_invert_indices.sort_by_key(|(_, b)| **b);
let mut left_to_prefix_indices = vec![];
for (ord, (idx, ord_sorted)) in right_invert_indices.iter().enumerate() {
if ord != **ord_sorted {
break;
}
left_to_prefix_indices.push(left_join_indices[*idx]);
}
Ok(Box::new(
left_iter
.map_ok(move |tuple| -> Result<Option<Tuple>> {
let prefix = Tuple(
left_to_prefix_indices
.iter()
.map(|i| tuple.0[*i].clone())
.collect_vec(),
);
'outer: for found in self.storage.scan_prefix(&prefix) {
let found = found?;
for (left_idx, right_idx) in
left_join_indices.iter().zip(right_join_indices.iter())
{
if tuple.0[*left_idx] != found.0[*right_idx] {
continue 'outer;
}
}
return Ok(None);
}
Ok(Some(if !eliminate_indices.is_empty() {
Tuple(
tuple
.0
.into_iter()
.enumerate()
.filter_map(|(i, v)| {
if eliminate_indices.contains(&i) {
None
} else {
Some(v)
}
})
.collect_vec(),
)
} else {
tuple
}))
})
.map(flatten_err)
.filter_map(invert_option_err),
))
}
fn iter(&self) -> Result<TupleIter<'_>> {
let it = self.storage.scan_all()?;
Ok(if self.filters.is_empty() {
Box::new(it)
} else {
Box::new(filter_iter(self.filters.clone(), it))
})
}
fn join_is_prefix(&self, right_join_indices: &[usize]) -> bool {
let mut indices = right_join_indices.to_vec();
indices.sort();
let l = indices.len();
indices.into_iter().eq(0..l)
}
}
#[derive(Debug)]
pub(crate) struct DerivedRelation {
pub(crate) bindings: Vec<Symbol>,
pub(crate) storage: TempStore,
pub(crate) storage: DerivedRelStore,
pub(crate) filters: Vec<Expr>,
}
@ -1346,7 +1540,7 @@ impl DerivedRelation {
Ok(())
}
fn iter(&self, epoch: Option<u32>, use_delta: &BTreeSet<TempStoreId>) -> Result<TupleIter<'_>> {
fn iter(&self, epoch: Option<u32>, use_delta: &BTreeSet<DerivedRelStoreId>) -> Result<TupleIter<'_>> {
if epoch == Some(0) && use_delta.contains(&self.storage.id) {
return Ok(Box::new(iter::empty()));
}
@ -1441,7 +1635,7 @@ impl DerivedRelation {
(left_join_indices, right_join_indices): (Vec<usize>, Vec<usize>),
eliminate_indices: BTreeSet<usize>,
epoch: Option<u32>,
use_delta: &BTreeSet<TempStoreId>,
use_delta: &BTreeSet<DerivedRelStoreId>,
) -> Result<TupleIter<'a>> {
if epoch == Some(0) && use_delta.contains(&self.storage.id) {
return Ok(Box::new(iter::empty()));
@ -1585,6 +1779,7 @@ impl Relation {
Relation::Fixed(r) => r.do_eliminate_temp_vars(used),
Relation::Triple(_r) => Ok(()),
Relation::Derived(_r) => Ok(()),
Relation::View(_v) => Ok(()),
Relation::Join(r) => r.do_eliminate_temp_vars(used),
Relation::Reorder(r) => r.relation.eliminate_temp_vars(used),
Relation::Filter(r) => r.do_eliminate_temp_vars(used),
@ -1598,6 +1793,7 @@ impl Relation {
Relation::Fixed(r) => Some(&r.to_eliminate),
Relation::Triple(_) => None,
Relation::Derived(_) => None,
Relation::View(_) => None,
Relation::Join(r) => Some(&r.to_eliminate),
Relation::Reorder(_) => None,
Relation::Filter(r) => Some(&r.to_eliminate),
@ -1622,6 +1818,7 @@ impl Relation {
Relation::Fixed(f) => f.bindings.clone(),
Relation::Triple(t) => t.bindings.to_vec(),
Relation::Derived(d) => d.bindings.clone(),
Relation::View(v) => v.bindings.clone(),
Relation::Join(j) => j.bindings(),
Relation::Reorder(r) => r.bindings(),
Relation::Filter(r) => r.parent.bindings_after_eliminate(),
@ -1637,12 +1834,13 @@ impl Relation {
&'a self,
tx: &'a SessionTx,
epoch: Option<u32>,
use_delta: &BTreeSet<TempStoreId>,
use_delta: &BTreeSet<DerivedRelStoreId>,
) -> Result<TupleIter<'a>> {
match self {
Relation::Fixed(f) => Ok(Box::new(f.data.iter().map(|t| Ok(Tuple(t.clone()))))),
Relation::Triple(r) => r.iter(tx),
Relation::Derived(r) => r.iter(epoch, use_delta),
Relation::View(v) => v.iter(),
Relation::Join(j) => j.iter(tx, epoch, use_delta),
Relation::Reorder(r) => r.iter(tx, epoch, use_delta),
Relation::Filter(r) => r.iter(tx, epoch, use_delta),
@ -1678,7 +1876,7 @@ impl NegJoin {
&'a self,
tx: &'a SessionTx,
epoch: Option<u32>,
use_delta: &BTreeSet<TempStoreId>,
use_delta: &BTreeSet<DerivedRelStoreId>,
) -> Result<TupleIter<'a>> {
let bindings = self.left.bindings_after_eliminate();
let eliminate_indices = get_eliminate_indices(&bindings, &self.to_eliminate);
@ -1712,6 +1910,20 @@ impl NegJoin {
eliminate_indices,
)
}
Relation::View(v) => {
let join_indices = self
.joiner
.join_indices(
&self.left.bindings_after_eliminate(),
&self.right.bindings_after_eliminate(),
)
.unwrap();
v.neg_join(
self.left.iter(tx, epoch, use_delta)?,
join_indices,
eliminate_indices,
)
}
_ => {
unreachable!()
}
@ -1762,7 +1974,7 @@ impl InnerJoin {
&'a self,
tx: &'a SessionTx,
epoch: Option<u32>,
use_delta: &BTreeSet<TempStoreId>,
use_delta: &BTreeSet<DerivedRelStoreId>,
) -> Result<TupleIter<'a>> {
let bindings = self.bindings();
let eliminate_indices = get_eliminate_indices(&bindings, &self.to_eliminate);
@ -1816,6 +2028,24 @@ impl InnerJoin {
self.materialized_join(tx, eliminate_indices, epoch, use_delta)
}
}
Relation::View(r) => {
let join_indices = self
.joiner
.join_indices(
&self.left.bindings_after_eliminate(),
&self.right.bindings_after_eliminate(),
)
.unwrap();
if r.join_is_prefix(&join_indices.1) {
r.prefix_join(
self.left.iter(tx, epoch, use_delta)?,
join_indices,
eliminate_indices,
)
} else {
self.materialized_join(tx, eliminate_indices, epoch, use_delta)
}
}
Relation::Join(_) | Relation::Filter(_) | Relation::Unification(_) => {
self.materialized_join(tx, eliminate_indices, epoch, use_delta)
}
@ -1832,7 +2062,7 @@ impl InnerJoin {
tx: &'a SessionTx,
eliminate_indices: BTreeSet<usize>,
epoch: Option<u32>,
use_delta: &BTreeSet<TempStoreId>,
use_delta: &BTreeSet<DerivedRelStoreId>,
) -> Result<TupleIter<'a>> {
let right_bindings = self.right.bindings_after_eliminate();
let (left_join_indices, right_join_indices) = self

@ -51,6 +51,13 @@ impl NormalFormRule {
}
round_1_collected.push(NormalFormAtom::Rule(r))
}
NormalFormAtom::View(mut v) => {
for arg in &mut v.args {
process_ignored_symbol(arg);
seen_variables.insert(arg.clone());
}
round_1_collected.push(NormalFormAtom::View(v))
}
NormalFormAtom::NegatedAttrTriple(mut t) => {
process_ignored_symbol(&mut t.value);
process_ignored_symbol(&mut t.entity);
@ -62,6 +69,12 @@ impl NormalFormRule {
}
pending.push(NormalFormAtom::NegatedRule(r))
}
NormalFormAtom::NegatedView(mut v) => {
for arg in &mut v.args {
process_ignored_symbol(arg);
}
pending.push(NormalFormAtom::NegatedView(v))
}
NormalFormAtom::Predicate(p) => {
pending.push(NormalFormAtom::Predicate(p));
}
@ -84,8 +97,13 @@ impl NormalFormRule {
seen_variables.extend(r.args.iter().cloned());
collected.push(NormalFormAtom::Rule(r))
}
NormalFormAtom::View(v) => {
seen_variables.extend(v.args.iter().cloned());
collected.push(NormalFormAtom::View(v))
}
NormalFormAtom::NegatedAttrTriple(_)
| NormalFormAtom::NegatedRule(_)
| NormalFormAtom::NegatedView(_)
| NormalFormAtom::Predicate(_) => {
unreachable!()
}
@ -96,7 +114,9 @@ impl NormalFormRule {
}
for atom in last_pending.iter() {
match atom {
NormalFormAtom::AttrTriple(_) | NormalFormAtom::Rule(_) => unreachable!(),
NormalFormAtom::AttrTriple(_)
| NormalFormAtom::Rule(_)
| NormalFormAtom::View(_) => unreachable!(),
NormalFormAtom::NegatedAttrTriple(t) => {
if seen_variables.contains(&t.value) && seen_variables.contains(&t.entity) {
collected.push(NormalFormAtom::NegatedAttrTriple(t.clone()));
@ -111,6 +131,13 @@ impl NormalFormRule {
pending.push(NormalFormAtom::NegatedRule(r.clone()));
}
}
NormalFormAtom::NegatedView(v) => {
if v.args.iter().all(|a| seen_variables.contains(a)) {
collected.push(NormalFormAtom::NegatedView(v.clone()));
} else {
collected.push(NormalFormAtom::NegatedView(v.clone()));
}
}
NormalFormAtom::Predicate(p) => {
if p.bindings().is_subset(&seen_variables) {
collected.push(NormalFormAtom::Predicate(p.clone()));
@ -132,7 +159,9 @@ impl NormalFormRule {
if !pending.is_empty() {
for atom in pending {
match atom {
NormalFormAtom::AttrTriple(_) | NormalFormAtom::Rule(_) => unreachable!(),
NormalFormAtom::AttrTriple(_)
| NormalFormAtom::Rule(_)
| NormalFormAtom::View(_) => unreachable!(),
NormalFormAtom::NegatedAttrTriple(t) => {
if seen_variables.contains(&t.value) || seen_variables.contains(&t.entity) {
collected.push(NormalFormAtom::NegatedAttrTriple(t.clone()));
@ -147,6 +176,13 @@ impl NormalFormRule {
bail!("found unsafe rule application in rule: {:?}", r);
}
}
NormalFormAtom::NegatedView(v) => {
if v.args.iter().any(|a| seen_variables.contains(a)) {
collected.push(NormalFormAtom::NegatedView(v.clone()));
} else {
bail!("found unsafe rule application in view: {:?}", v);
}
}
NormalFormAtom::Predicate(p) => {
bail!("found unsafe predicate in rule: {:?}", p)
}

@ -8,16 +8,16 @@ use crate::data::symb::Symbol;
use crate::data::tuple::Tuple;
use crate::data::value::DataValue;
use crate::parse::query::SortDir;
use crate::runtime::temp_store::TempStore;
use crate::runtime::derived::DerivedRelStore;
use crate::runtime::transact::SessionTx;
impl SessionTx {
pub(crate) fn sort_and_collect(
&mut self,
original: TempStore,
original: DerivedRelStore,
sorters: &[(Symbol, SortDir)],
head: &[Symbol],
) -> Result<TempStore> {
) -> Result<DerivedRelStore> {
let head_indices: BTreeMap<_, _> = head.iter().enumerate().map(|(i, k)| (k, i)).collect();
let idx_sorters = sorters
.iter()

@ -14,6 +14,8 @@ impl NormalFormAtom {
fn contained_rules(&self) -> BTreeMap<&Symbol, bool> {
match self {
NormalFormAtom::AttrTriple(_)
| NormalFormAtom::View(_)
| NormalFormAtom::NegatedView(_)
| NormalFormAtom::Predicate(_)
| NormalFormAtom::Unification(_)
| NormalFormAtom::NegatedAttrTriple(_) => Default::default(),

@ -1,6 +1,6 @@
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use anyhow::Result;
@ -9,7 +9,7 @@ use itertools::Itertools;
use log::debug;
use serde_json::json;
use cozorocks::{DbBuilder, DbIter, RawRocksDb, RocksDb};
use cozorocks::{DbBuilder, DbIter, RocksDb};
use crate::data::compare::{rusty_cmp, DB_KEY_PREFIX_LEN};
use crate::data::encode::{
@ -30,11 +30,11 @@ use crate::runtime::transact::SessionTx;
pub struct Db {
db: RocksDb,
view_db: RawRocksDb,
view_db: RocksDb,
last_attr_id: Arc<AtomicU64>,
last_ent_id: Arc<AtomicU64>,
last_tx_id: Arc<AtomicU64>,
temp_store_id: Arc<AtomicU32>,
view_store_id: Arc<AtomicU64>,
n_sessions: Arc<AtomicUsize>,
session_id: usize,
}
@ -60,11 +60,12 @@ impl Db {
let view_db_builder = db_builder
.clone()
.path(&temp_path)
.optimistic(false)
.use_capped_prefix_extractor(true, SCRATCH_DB_KEY_PREFIX_LEN)
.use_custom_comparator("cozo_rusty_scratch_cmp", rusty_scratch_cmp, false);
let db = db_builder.build()?;
let view_db = view_db_builder.build_raw(true)?;
let view_db = view_db_builder.build()?;
let ret = Self {
db,
@ -72,7 +73,7 @@ impl Db {
last_attr_id: Arc::new(Default::default()),
last_ent_id: Arc::new(Default::default()),
last_tx_id: Arc::new(Default::default()),
temp_store_id: Arc::new(Default::default()),
view_store_id: Arc::new(Default::default()),
n_sessions: Arc::new(Default::default()),
session_id: Default::default(),
};
@ -89,7 +90,7 @@ impl Db {
last_attr_id: self.last_attr_id.clone(),
last_ent_id: self.last_ent_id.clone(),
last_tx_id: self.last_tx_id.clone(),
temp_store_id: self.temp_store_id.clone(),
view_store_id: self.view_store_id.clone(),
n_sessions: self.n_sessions.clone(),
session_id: old_count + 1,
})
@ -103,13 +104,16 @@ impl Db {
.store(tx.load_last_attr_id()?.0, Ordering::Release);
self.last_ent_id
.store(tx.load_last_entity_id()?.0, Ordering::Release);
self.view_store_id
.store(tx.load_last_view_store_id()?.0, Ordering::Release);
Ok(())
}
pub fn transact(&self) -> Result<SessionTx> {
let ret = SessionTx {
tx: self.db.transact().set_snapshot(true).start(),
view_db: self.view_db.clone(),
temp_store_id: self.temp_store_id.clone(),
mem_store_id: Default::default(),
view_store_id: self.view_store_id.clone(),
w_tx_id: None,
last_attr_id: self.last_attr_id.clone(),
last_ent_id: self.last_ent_id.clone(),
@ -129,7 +133,8 @@ impl Db {
let ret = SessionTx {
tx: self.db.transact().set_snapshot(true).start(),
view_db: self.view_db.clone(),
temp_store_id: self.temp_store_id.clone(),
mem_store_id: Default::default(),
view_store_id: self.view_store_id.clone(),
w_tx_id: Some(cur_tx_id),
last_attr_id: self.last_attr_id.clone(),
last_ent_id: self.last_ent_id.clone(),

@ -17,31 +17,31 @@ use crate::data::value::DataValue;
use crate::query::eval::QueryLimiter;
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) struct TempStoreId(pub(crate) u32);
pub(crate) struct DerivedRelStoreId(pub(crate) u32);
impl Debug for TempStoreId {
impl Debug for DerivedRelStoreId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "t{}", self.0)
}
}
#[derive(Clone)]
pub(crate) struct TempStore {
pub(crate) struct DerivedRelStore {
mem_db: Arc<RwLock<Vec<Arc<RwLock<BTreeMap<Tuple, Tuple>>>>>>,
epoch_size: Arc<AtomicU32>,
pub(crate) id: TempStoreId,
pub(crate) id: DerivedRelStoreId,
pub(crate) rule_name: MagicSymbol,
pub(crate) arity: usize,
}
impl Debug for TempStore {
impl Debug for DerivedRelStore {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "TempStore<{}>", self.id.0)
}
}
impl TempStore {
pub(crate) fn new(id: TempStoreId, rule_name: MagicSymbol, arity: usize) -> TempStore {
impl DerivedRelStore {
pub(crate) fn new(id: DerivedRelStoreId, rule_name: MagicSymbol, arity: usize) -> DerivedRelStore {
Self {
epoch_size: Default::default(),
mem_db: Default::default(),
@ -173,7 +173,7 @@ impl TempStore {
pub(crate) fn normal_aggr_scan_and_put(
&self,
aggrs: &[Option<(Aggregation, Vec<DataValue>)>],
store: &TempStore,
store: &DerivedRelStore,
mut limiter: Option<&mut QueryLimiter>,
) -> Result<bool> {
let db_target = self.mem_db.try_read().unwrap();

@ -1,3 +1,4 @@
pub(crate) mod db;
pub(crate) mod transact;
pub(crate) mod temp_store;
pub(crate) mod derived;
pub(crate) mod view;

@ -7,7 +7,7 @@ use rmp_serde::Serializer;
use serde::Serialize;
use smallvec::SmallVec;
use cozorocks::{DbIter, RawRocksDb, Tx};
use cozorocks::{DbIter, RocksDb, Tx};
use crate::data::attr::Attribute;
use crate::data::encode::{
@ -16,13 +16,16 @@ use crate::data::encode::{
use crate::data::id::{AttrId, EntityId, TxId, Validity};
use crate::data::program::MagicSymbol;
use crate::data::symb::Symbol;
use crate::data::tuple::Tuple;
use crate::data::value::DataValue;
use crate::runtime::temp_store::{TempStore, TempStoreId};
use crate::runtime::derived::{DerivedRelStore, DerivedRelStoreId};
use crate::runtime::view::ViewRelId;
pub struct SessionTx {
pub(crate) tx: Tx,
pub(crate) view_db: RawRocksDb,
pub(crate) temp_store_id: Arc<AtomicU32>,
pub(crate) view_db: RocksDb,
pub(crate) view_store_id: Arc<AtomicU64>,
pub(crate) mem_store_id: Arc<AtomicU32>,
pub(crate) w_tx_id: Option<TxId>,
pub(crate) last_attr_id: Arc<AtomicU64>,
pub(crate) last_ent_id: Arc<AtomicU64>,
@ -68,17 +71,17 @@ impl TxLog {
}
impl SessionTx {
pub(crate) fn new_rule_store(&self, rule_name: MagicSymbol, arity: usize) -> TempStore {
let old_count = self.temp_store_id.fetch_add(1, Ordering::AcqRel);
pub(crate) fn new_rule_store(&self, rule_name: MagicSymbol, arity: usize) -> DerivedRelStore {
let old_count = self.mem_store_id.fetch_add(1, Ordering::AcqRel);
let old_count = old_count & 0x00ff_ffffu32;
TempStore::new(TempStoreId(old_count), rule_name, arity)
DerivedRelStore::new(DerivedRelStoreId(old_count), rule_name, arity)
}
pub(crate) fn new_temp_store(&self) -> TempStore {
let old_count = self.temp_store_id.fetch_add(1, Ordering::AcqRel);
pub(crate) fn new_temp_store(&self) -> DerivedRelStore {
let old_count = self.mem_store_id.fetch_add(1, Ordering::AcqRel);
let old_count = old_count & 0x00ff_ffffu32;
TempStore::new(
TempStoreId(old_count),
DerivedRelStore::new(
DerivedRelStoreId(old_count),
MagicSymbol::Muggle {
inner: Symbol::from(""),
},
@ -115,6 +118,17 @@ impl SessionTx {
})
}
pub(crate) fn load_last_view_store_id(&mut self) -> Result<ViewRelId> {
let tuple = Tuple(vec![DataValue::Null]);
let t_encoded = tuple.encode_as_key(ViewRelId::SYSTEM);
let vtx = self.view_db.transact().start();
let found = vtx.get(&t_encoded, false)?;
match found {
None => Ok(ViewRelId::SYSTEM),
Some(slice) => ViewRelId::raw_decode(&slice),
}
}
pub(crate) fn load_last_tx_id(&mut self) -> Result<TxId> {
let e_lower = encode_tx(TxId::MAX_USER);
let e_upper = encode_tx(TxId::MAX_SYS);

@ -0,0 +1,204 @@
use std::fmt::{Debug, Formatter};
use std::sync::atomic::Ordering;
use anyhow::{anyhow, bail, Result};
use rmp_serde::Serializer;
use serde::Serialize;
use cozorocks::{DbIter, RocksDb, Tx};
use crate::data::symb::Symbol;
use crate::data::tuple::{EncodedTuple, Tuple};
use crate::data::value::DataValue;
use crate::runtime::transact::SessionTx;
use crate::utils::swap_option_result;
#[derive(
Copy,
Clone,
Eq,
PartialEq,
Debug,
serde_derive::Serialize,
serde_derive::Deserialize,
PartialOrd,
Ord,
)]
pub(crate) struct ViewRelId(pub(crate) u64);
impl ViewRelId {
pub(crate) fn new(u: u64) -> Result<Self> {
if u > 2u64.pow(6 * 8) {
bail!("StoredRelId overflow: {}", u)
} else {
Ok(Self(u))
}
}
pub(crate) fn next(&self) -> Result<Self> {
Self::new(self.0 + 1)
}
pub(crate) const SYSTEM: Self = Self(0);
pub(crate) fn raw_encode(&self) -> [u8; 8] {
self.0.to_be_bytes()
}
pub(crate) fn raw_decode(src: &[u8]) -> Result<Self> {
if src.len() < 8 {
bail!("cannot decode bytes as StoredRelId: {:x?}", src)
} else {
let u = u64::from_be_bytes([
src[0], src[1], src[2], src[3], src[4], src[5], src[6], src[7],
]);
Self::new(u)
}
}
}
#[derive(Clone, Eq, PartialEq, Debug, serde_derive::Serialize, serde_derive::Deserialize)]
pub(crate) enum ViewRelKind {
Manual,
AutoByCount,
}
#[derive(Clone, Eq, PartialEq, Debug, serde_derive::Serialize, serde_derive::Deserialize)]
pub(crate) struct ViewRelMetadata {
pub(crate) name: Symbol,
pub(crate) id: ViewRelId,
pub(crate) arity: usize,
pub(crate) kind: ViewRelKind,
}
#[derive(Clone)]
pub(crate) struct ViewRelStore {
view_db: RocksDb,
pub(crate) metadata: ViewRelMetadata,
}
impl Debug for ViewRelStore {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "ViewStore<{}>", self.metadata.name)
}
}
impl ViewRelStore {
pub(crate) fn scan_all(&self) -> Result<impl Iterator<Item = Result<Tuple>>> {
let lower = Tuple::default().encode_as_key(self.metadata.id);
let upper = Tuple::default().encode_as_key(self.metadata.id.next()?);
Ok(ViewRelIterator::new(&self.view_db, &lower, &upper))
}
pub(crate) fn scan_prefix(
&self,
prefix: &Tuple,
) -> impl Iterator<Item = Result<Tuple>> {
let mut upper = prefix.0.clone();
upper.push(DataValue::Bottom);
let prefix_encoded = prefix.encode_as_key(self.metadata.id);
let upper_encoded = Tuple(upper).encode_as_key(self.metadata.id);
ViewRelIterator::new(&self.view_db, &prefix_encoded, &upper_encoded)
}
pub(crate) fn scan_bounded_prefix(
&self,
prefix: &Tuple,
lower: &[DataValue],
upper: &[DataValue],
) -> impl Iterator<Item = Result<Tuple>> {
let mut lower_t = prefix.clone();
lower_t.0.extend_from_slice(lower);
let mut upper_t = prefix.clone();
upper_t.0.extend_from_slice(upper);
upper_t.0.push(DataValue::Bottom);
let lower_encoded = lower_t.encode_as_key(self.metadata.id);
let upper_encoded = upper_t.encode_as_key(self.metadata.id);
ViewRelIterator::new(&self.view_db, &lower_encoded, &upper_encoded)
}
}
struct ViewRelIterator {
inner: DbIter,
started: bool,
}
impl ViewRelIterator {
fn new(db: &RocksDb, lower: &[u8], upper: &[u8]) -> Self {
let mut inner = db.transact().start().iterator().upper_bound(&upper).start();
inner.seek(&lower);
Self {
inner,
started: false,
}
}
fn next_inner(&mut self) -> Result<Option<Tuple>> {
if self.started {
self.inner.next()
} else {
self.started = true;
}
Ok(match self.inner.key()? {
None => None,
Some(k_slice) => Some(EncodedTuple(k_slice).decode()?),
})
}
}
impl Iterator for ViewRelIterator {
type Item = Result<Tuple>;
fn next(&mut self) -> Option<Self::Item> {
swap_option_result(self.next_inner())
}
}
impl SessionTx {
pub(crate) fn create_view_rel(&self, mut meta: ViewRelMetadata) -> Result<ViewRelStore> {
let key = DataValue::String(meta.name.0.clone());
let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM);
let mut vtx = self.view_db.transact().set_snapshot(true).start();
if vtx.exists(&encoded, true)? {
bail!(
"cannot create view {}: one with the same name already exists",
meta.name
)
};
let last_id = self.view_store_id.fetch_add(1, Ordering::SeqCst);
meta.id = ViewRelId::new(last_id + 1)?;
vtx.put(&encoded, &meta.id.raw_encode())?;
let name_key =
Tuple(vec![DataValue::String(meta.name.0.clone())]).encode_as_key(ViewRelId::SYSTEM);
let mut meta_val = vec![];
meta.serialize(&mut Serializer::new(&mut meta_val)).unwrap();
vtx.put(&name_key, &meta_val)?;
Ok(ViewRelStore {
view_db: self.view_db.clone(),
metadata: meta,
})
}
pub(crate) fn get_view_rel(&self, name: &Symbol) -> Result<ViewRelStore> {
let vtx = self.view_db.transact().start();
self.do_get_view_rel(name, &vtx)
}
fn do_get_view_rel(&self, name: &Symbol, vtx: &Tx) -> Result<ViewRelStore> {
let key = DataValue::String(name.0.clone());
let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM);
let found = vtx
.get(&encoded, true)?
.ok_or_else(|| anyhow!("cannot find stored view {}", name))?;
let metadata: ViewRelMetadata = rmp_serde::from_slice(&found)?;
Ok(ViewRelStore {
view_db: self.view_db.clone(),
metadata,
})
}
pub(crate) fn destroy_view_rel(&self, name: &Symbol) -> Result<()> {
let mut vtx = self.view_db.transact().start();
let store = self.do_get_view_rel(name, &vtx)?;
let key = DataValue::String(name.0.clone());
let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM);
vtx.del(&encoded)?;
let lower_bound = Tuple::default().encode_as_key(store.metadata.id);
let upper_bound = Tuple::default().encode_as_key(store.metadata.id.next()?);
self.view_db.range_del(&lower_bound, &upper_bound)?;
Ok(())
}
}

@ -1,2 +1,2 @@
pub(crate) mod meta;
pub(crate) mod exec;
pub(crate) mod triple;

Loading…
Cancel
Save