main
Ziyang Hu 2 years ago
parent 3665e7750a
commit 160fd7e034

@ -9,7 +9,6 @@ authors = ["Ziyang Hu"]
[dependencies]
uuid = { version = "0.8", features = ["v1", "v4", "serde"] }
nanoid = { version = "0.4.0", features = [] }
rand = "0.8.5"
anyhow = "1.0"
lazy_static = "1.4.0"

@ -4,6 +4,7 @@ use std::path::Path;
use actix_cors::Cors;
use actix_web::{App, HttpResponse, HttpServer, post, Responder, web};
use clap::Parser;
use log::info;
use cozo::Db;
use cozorocks::DbBuilder;
@ -104,7 +105,7 @@ async fn main() -> std::io::Result<()> {
let app_state = web::Data::new(AppStateWithDb { db });
let addr = (&args.bind as &str, args.port);
eprintln!("Serving database {} at {}:{}", args.path, addr.0, addr.1);
info!("Serving database {} at {}:{}", args.path, addr.0, addr.1);
HttpServer::new(move || {
let cors = Cors::permissive();

@ -307,7 +307,7 @@ impl Attribute {
}
self.val_type.coerce_value(value)
}
pub(crate) fn encode(&self) -> EncodedVec<INLINE_VAL_SIZE_LIMIT> {
pub fn encode(&self) -> EncodedVec<INLINE_VAL_SIZE_LIMIT> {
let mut ret = SmallVec::<[u8; INLINE_VAL_SIZE_LIMIT]>::new();
self.serialize(&mut Serializer::new(&mut ret)).unwrap();
ret.into()

@ -1,7 +1,6 @@
use std::fmt::{Debug, Display, Formatter};
use std::str::Utf8Error;
use nanoid::nanoid;
use serde_derive::{Deserialize, Serialize};
use smartstring::{LazyCompact, SmartString};
@ -52,25 +51,9 @@ impl TryFrom<&[u8]> for Keyword {
}
impl Keyword {
/// Builds a keyword whose name is an underscore followed by a freshly generated nanoid.
///
/// Because the name starts with `_`, the result is classified as an ignored
/// binding by `is_ignored_binding`.
pub(crate) fn rand_ignored() -> Self {
    let name = format!("_{}", nanoid!());
    Keyword::from(name.as_str())
}
/// Reports whether this keyword's name is reserved.
///
/// A name is reserved when it is empty or when its first character is one of
/// `_`, `:`, `<`, `.`, `*`, `?` or `!`.
pub(crate) fn is_reserved(&self) -> bool {
    match self.0.chars().next() {
        // No first character at all: the name is empty.
        None => true,
        Some(first) => matches!(first, '_' | ':' | '<' | '.' | '*' | '?' | '!'),
    }
}
/// Reports whether the keyword's name begins with `?`.
pub(crate) fn is_query_binding(&self) -> bool {
    self.0.chars().next() == Some('?')
}
/// Reports whether the keyword's name begins with `_` or `*`.
pub(crate) fn is_ignored_binding(&self) -> bool {
    matches!(self.0.chars().next(), Some('_' | '*'))
}
/// Reports whether the keyword's name is exactly the single character `_`.
pub(crate) fn is_ignored_wildcard(&self) -> bool {
    // A one-byte name that starts with `_` can only be "_" itself.
    self.0.len() == 1 && self.0.starts_with('_')
}
/// Reports whether the keyword is any kind of binding: either a query
/// binding (`is_query_binding`) or an ignored binding (`is_ignored_binding`).
pub(crate) fn is_binding(&self) -> bool {
    if self.is_query_binding() {
        return true;
    }
    self.is_ignored_binding()
}
/// Returns the keyword's inner name as an owned `String`, exactly as stored.
///
/// Nothing is stripped here: the inner value is simply rendered through its
/// `Display` implementation.
/// NOTE(review): the "no prefix" in the name presumably contrasts with
/// `Keyword`'s own `Display` output — confirm against that impl.
pub(crate) fn to_string_no_prefix(&self) -> String {
    // `to_string()` goes through the same `Display` impl as `format!("{}", ..)`
    // without the redundant formatting macro.
    self.0.to_string()
}

@ -1,5 +1,5 @@
use std::cmp::Reverse;
use std::fmt::{Binary, Debug, Display, Formatter, Pointer};
use std::fmt::{ Debug, Formatter};
use anyhow::Result;
use ordered_float::OrderedFloat;

@ -6,6 +6,7 @@ use std::ops::Sub;
use anyhow::Result;
use itertools::Itertools;
use log::{debug, Level, log_enabled, trace};
use serde_json::Map;
use crate::data::attr::Attribute;
@ -14,12 +15,9 @@ use crate::data::keyword::Keyword;
use crate::data::value::DataValue;
use crate::preprocess::triple::TxError;
use crate::runtime::transact::SessionTx;
use crate::transact::query::{
InlineFixedRelation, InnerJoin, Joiner, Relation, ReorderRelation, StoredDerivedRelation,
TripleRelation,
};
use crate::transact::throwaway::ThrowawayArea;
use crate::{EntityId, Validity};
use crate::transact::query::Relation;
/// example ruleset in python and javascript
/// ```python
@ -111,7 +109,7 @@ pub enum Atom {
#[derive(Clone, Debug)]
pub struct RuleSet {
pub(crate) sets: Vec<Rule>,
pub(crate) rules: Vec<Rule>,
pub(crate) arity: usize,
}
@ -122,7 +120,7 @@ impl Rule {
if let Atom::Rule(rule) = clause {
collected.insert(rule.name.clone());
}
// todo: negation, etc
// todo: negation, disjunction, etc
}
collected
}
@ -176,8 +174,8 @@ impl SessionTx {
Keyword,
Vec<(Vec<BindingHeadTerm>, BTreeSet<Keyword>, Relation)>,
)> {
let mut collected = Vec::with_capacity(body.sets.len());
for rule in &body.sets {
let mut collected = Vec::with_capacity(body.rules.len());
for rule in &body.rules {
let header = rule.head.iter().map(|t| &t.name).cloned().collect_vec();
let relation =
self.compile_rule_body(&rule.body, rule.vld, &stores, &header)?;
@ -188,9 +186,11 @@ impl SessionTx {
)
.try_collect()?;
for (k, vs) in compiled.iter() {
for (i, (binding, _, rel)) in vs.iter().enumerate() {
eprintln!("{}.{} {:?}: {:#?}", k, i, BindingHeadFormatter(binding), rel)
if log_enabled!(Level::Debug) {
for (k, vs) in compiled.iter() {
for (i, (binding, _, rel)) in vs.iter().enumerate() {
debug!("{}.{} {:?}: {:#?}", k, i, BindingHeadFormatter(binding), rel)
}
}
}
@ -198,16 +198,16 @@ impl SessionTx {
let mut prev_changed = changed.clone();
for epoch in 0u32.. {
eprintln!("epoch {}", epoch);
debug!("epoch {}", epoch);
if epoch == 0 {
for (k, rules) in compiled.iter() {
let (store, _arity) = stores.get(k).unwrap();
let use_delta = BTreeSet::default();
for (rule_n, (_head, _deriving_rules, relation)) in rules.iter().enumerate() {
eprintln!("initial calculation for rule {}.{}", k, rule_n);
debug!("initial calculation for rule {}.{}", k, rule_n);
for item_res in relation.iter(self, Some(0), &use_delta) {
let item = item_res?;
eprintln!("item for {}.{}: {:?} at {}", k, rule_n, item, epoch);
trace!("item for {}.{}: {:?} at {}", k, rule_n, item, epoch);
store.put(&item, 0)?;
*changed.get_mut(k).unwrap() = true;
}
@ -223,32 +223,32 @@ impl SessionTx {
let (store, _arity) = stores.get(k).unwrap();
for (rule_n, (_head, deriving_rules, relation)) in rules.iter().enumerate() {
let mut should_do_calculation = false;
for drule in deriving_rules {
if *prev_changed.get(drule).unwrap() {
for d_rule in deriving_rules {
if *prev_changed.get(d_rule).unwrap() {
should_do_calculation = true;
break;
}
}
if !should_do_calculation {
eprintln!("skipping rule {}.{} as none of its dependencies changed in the last iteration", k, rule_n);
debug!("skipping rule {}.{} as none of its dependencies changed in the last iteration", k, rule_n);
continue;
}
for (delta_key, (delta_store, _)) in stores.iter() {
if !deriving_rules.contains(delta_key) {
continue;
}
eprintln!("with delta {} for rule {}.{}", delta_key, k, rule_n);
debug!("with delta {} for rule {}.{}", delta_key, k, rule_n);
let use_delta = BTreeSet::from([delta_store.id]);
for item_res in relation.iter(self, Some(epoch), &use_delta) {
let item = item_res?;
// improvement: the clauses can actually be evaluated in parallel
if store.exists(&item, 0)? {
eprintln!(
trace!(
"item for {}.{}: {:?} at {}, rederived",
k, rule_n, item, epoch
);
} else {
eprintln!("item for {}.{}: {:?} at {}", k, rule_n, item, epoch);
trace!("item for {}.{}: {:?} at {}", k, rule_n, item, epoch);
*changed.get_mut(k).unwrap() = true;
store.put(&item, epoch)?;
store.put(&item, 0)?;
@ -298,7 +298,7 @@ impl SessionTx {
return Err(QueryProcError::ArityMismatch(name).into());
}
}
Ok((name, RuleSet { sets: rules, arity }))
Ok((name, RuleSet { rules: rules, arity }))
})
.try_collect()
}
@ -438,7 +438,7 @@ impl SessionTx {
let mut ret = Relation::unit();
let mut seen_variables = BTreeSet::new();
let mut id_serial = 0;
let mut next_ignored_kw = || -> Keyword {
let mut gen_temp_kw = || -> Keyword {
let s = format!("*{}", id_serial);
let kw = Keyword::from(&s as &str);
id_serial += 1;
@ -448,8 +448,8 @@ impl SessionTx {
match clause {
Atom::AttrTriple(a_triple) => match (&a_triple.entity, &a_triple.value) {
(Term::Const(eid), Term::Var(v_kw)) => {
let temp_join_key_left = next_ignored_kw();
let temp_join_key_right = next_ignored_kw();
let temp_join_key_left = gen_temp_kw();
let temp_join_key_right = gen_temp_kw();
let const_rel = Relation::singlet(
vec![temp_join_key_left.clone()],
vec![DataValue::EnId(*eid)],
@ -465,7 +465,7 @@ impl SessionTx {
let v_kw = {
if seen_variables.contains(v_kw) {
let ret = next_ignored_kw();
let ret = gen_temp_kw();
// to_eliminate.insert(ret.clone());
join_left_keys.push(v_kw.clone());
join_right_keys.push(ret.clone());
@ -481,8 +481,8 @@ impl SessionTx {
ret = ret.join(right, join_left_keys, join_right_keys);
}
(Term::Var(e_kw), Term::Const(val)) => {
let temp_join_key_left = next_ignored_kw();
let temp_join_key_right = next_ignored_kw();
let temp_join_key_left = gen_temp_kw();
let temp_join_key_right = gen_temp_kw();
let const_rel =
Relation::singlet(vec![temp_join_key_left.clone()], vec![val.clone()]);
if ret.is_unit() {
@ -496,7 +496,7 @@ impl SessionTx {
let e_kw = {
if seen_variables.contains(&e_kw) {
let ret = next_ignored_kw();
let ret = gen_temp_kw();
join_left_keys.push(e_kw.clone());
join_right_keys.push(ret.clone());
ret
@ -518,7 +518,7 @@ impl SessionTx {
}
let e_kw = {
if seen_variables.contains(&e_kw) {
let ret = next_ignored_kw();
let ret = gen_temp_kw();
join_left_keys.push(e_kw.clone());
join_right_keys.push(ret.clone());
ret
@ -529,7 +529,7 @@ impl SessionTx {
};
let v_kw = {
if seen_variables.contains(v_kw) {
let ret = next_ignored_kw();
let ret = gen_temp_kw();
join_left_keys.push(v_kw.clone());
join_right_keys.push(ret.clone());
ret
@ -547,7 +547,7 @@ impl SessionTx {
}
}
(Term::Const(eid), Term::Const(val)) => {
let (left_var_1, left_var_2) = (next_ignored_kw(), next_ignored_kw());
let (left_var_1, left_var_2) = (gen_temp_kw(), gen_temp_kw());
let const_rel = Relation::singlet(
vec![left_var_1.clone(), left_var_2.clone()],
vec![DataValue::EnId(*eid), val.clone()],
@ -557,7 +557,7 @@ impl SessionTx {
} else {
ret = ret.cartesian_join(const_rel);
}
let (right_var_1, right_var_2) = (next_ignored_kw(), next_ignored_kw());
let (right_var_1, right_var_2) = (gen_temp_kw(), gen_temp_kw());
let right = Relation::triple(
a_triple.attr.clone(),
@ -592,7 +592,7 @@ impl SessionTx {
Term::Var(var) => {
if seen_variables.contains(var) {
prev_joiner_vars.push(var.clone());
let rk = next_ignored_kw();
let rk = gen_temp_kw();
right_vars.push(rk.clone());
right_joiner_vars.push(rk);
} else {
@ -602,10 +602,10 @@ impl SessionTx {
}
Term::Const(constant) => {
temp_left_joiner_vals.push(constant.clone());
let left_kw = next_ignored_kw();
let left_kw = gen_temp_kw();
prev_joiner_vars.push(left_kw.clone());
temp_left_bindings.push(left_kw);
let right_kw = next_ignored_kw();
let right_kw = gen_temp_kw();
right_joiner_vars.push(right_kw.clone());
right_vars.push(right_kw);
}

@ -223,7 +223,7 @@ impl InlineFixedRelation {
eliminate_indices: BTreeSet<usize>,
) -> TupleIter<'a> {
if self.data.is_empty() {
Box::new([].into_iter())
Box::new(iter::empty())
} else if self.data.len() == 1 {
let data = self.data[0].clone();
let right_join_values = right_join_indices
@ -552,7 +552,7 @@ impl TripleRelation {
eliminate_indices: BTreeSet<usize>,
) -> TupleIter<'a> {
// [f, b] where b is not indexed
let mut throwaway = tx.new_throwaway();
let throwaway = tx.new_throwaway();
for item in tx.triple_a_before_scan(self.attr.id, self.vld) {
match item {
Err(e) => return Box::new([Err(e)].into_iter()),
@ -782,8 +782,8 @@ impl Relation {
pub(crate) fn eliminate_temp_vars(&mut self, used: &BTreeSet<Keyword>) -> Result<()> {
match self {
Relation::Fixed(r) => r.do_eliminate_temp_vars(used),
Relation::Triple(r) => Ok(()),
Relation::Derived(r) => Ok(()),
Relation::Triple(_r) => Ok(()),
Relation::Derived(_r) => Ok(()),
Relation::Join(r) => r.do_eliminate_temp_vars(used),
Relation::Reorder(r) => r.relation.eliminate_temp_vars(used),
}

@ -1,4 +1,5 @@
use std::fmt::{Debug, Formatter};
use log::error;
use cozorocks::{DbIter, RawRocksDb, RocksDbStatus};
@ -31,27 +32,10 @@ impl ThrowawayArea {
let key_encoded = tuple.encode_as_key_for_epoch(self.id, epoch);
self.db.put(&key_encoded, &[])
}
/// Writes `tuple` for the given `epoch` only if its encoded key is not already present.
///
/// Returns `Ok(true)` when this call wrote the key (with an empty value) and
/// `Ok(false)` when the key already existed. Any `RocksDbStatus` error from
/// the existence check or the write is propagated.
///
/// NOTE(review): the check and the write are two separate db calls, so nothing
/// in this function makes the pair atomic — confirm whether callers need that.
pub(crate) fn put_if_absent(&self, tuple: &Tuple, epoch: u32) -> Result<bool, RocksDbStatus> {
    let key = tuple.encode_as_key_for_epoch(self.id, epoch);
    if self.db.exists(&key)? {
        return Ok(false);
    }
    self.db.put(&key, &[])?;
    Ok(true)
}
// pub(crate) fn get(&self, tuple: &Tuple, epoch: u32) -> Result<Option<PinSlice>, RocksDbStatus> {
// let key_encoded = tuple.encode_as_key_for_epoch(self.id, epoch);
// self.db.get(&key_encoded)
// }
/// Reports whether the key encoding `tuple` for this area's id and the given
/// `epoch` is present in the backing db; lookup errors are returned as-is.
pub(crate) fn exists(&self, tuple: &Tuple, epoch: u32) -> Result<bool, RocksDbStatus> {
    self.db
        .exists(&tuple.encode_as_key_for_epoch(self.id, epoch))
}
/// Deletes the key encoding `tuple` for this area's id and the given `epoch`
/// from the backing db; deletion errors are returned as-is.
pub(crate) fn del(&self, tuple: &Tuple, epoch: u32) -> Result<(), RocksDbStatus> {
    self.db
        .del(&tuple.encode_as_key_for_epoch(self.id, epoch))
}
/// Convenience wrapper that delegates to `scan_all_for_epoch` with epoch `0`.
pub fn scan_all(&self) -> impl Iterator<Item = anyhow::Result<Tuple>> {
    let epoch = 0;
    self.scan_all_for_epoch(epoch)
}
@ -113,7 +97,7 @@ impl Iterator for ThrowawayIter {
match self.it.pair() {
Err(e) => Some(Err(e.into())),
Ok(None) => None,
Ok(Some((k_slice, v_slice))) => match EncodedTuple(k_slice).decode() {
Ok(Some((k_slice, _v_slice))) => match EncodedTuple(k_slice).decode() {
Err(e) => Some(Err(e)),
Ok(t) => Some(Ok(t)),
},
@ -125,7 +109,7 @@ impl Drop for ThrowawayArea {
/// Cleans up this area's data when the value is dropped: computes the key
/// bounds for this area's id prefix and issues a range delete over them.
fn drop(&mut self) {
// Lower/upper key bounds covering every key that carries this area's id prefix.
let (lower, upper) = EncodedTuple::bounds_for_prefix(self.id);
// `drop` cannot return an error, so a failed range delete is only reported.
if let Err(e) = self.db.range_del(&lower, &upper) {
eprintln!("{}", e);
error!("{}", e);
}
}
}

@ -12,10 +12,16 @@ fn create_db(name: &str) -> Db {
Db::build(builder).unwrap()
}
/// Installs `env_logger` in test mode for the current test binary.
///
/// The result of `try_init` is deliberately discarded so that calling this
/// from several tests is harmless.
fn init_logger() {
    let mut builder = env_logger::builder();
    builder.is_test(true);
    let _ = builder.try_init();
}
/// Compile-time assertion helper: a call only type-checks when `T` is both
/// `Send` and `Sync`. It does nothing at runtime.
fn test_send_sync<T>(_: &T)
where
    T: Send + Sync,
{
}
#[test]
fn creation() {
init_logger();
let db = create_db("_test_db");
test_send_sync(&db);
assert!(db.current_schema().unwrap().as_array().unwrap().is_empty());

Loading…
Cancel
Save