replace yields with temp stores

main
Ziyang Hu 2 years ago
parent 0c7f2a3d27
commit 5963b35aac

@ -43,7 +43,7 @@ var = @{(XID_START | "_") ~ (XID_CONTINUE | "_")*}
param = @{"$" ~ (XID_CONTINUE | "_")*}
ident = @{XID_START ~ ("_" | XID_CONTINUE)*}
underscore_ident = @{("_" | XID_START) ~ ("_" | XID_CONTINUE)*}
relation_ident = @{"*" ~ compound_ident}
relation_ident = @{"*" ~ (compound_ident | underscore_ident)}
compound_ident = @{ident ~ ("." ~ ident)?}
rule = {rule_head ~ ":=" ~ rule_body ~ ";"?}
@ -108,13 +108,12 @@ list = { "[" ~ (expr ~ ",")* ~ expr? ~ "]" }
grouping = { "(" ~ expr ~ ")" }
option = _{(limit_option|offset_option|sort_option|relation_option|timeout_option|sleep_option|
assert_none_option|assert_some_option|yield_option) ~ ";"?}
assert_none_option|assert_some_option) ~ ";"?}
out_arg = @{var ~ ("(" ~ var ~ ")")?}
limit_option = {":limit" ~ expr}
offset_option = {":offset" ~ expr}
yield_option = {":yield" ~ ident}
sort_option = {(":sort" | ":order") ~ (sort_arg ~ ",")* ~ sort_arg }
relation_option = {relation_op ~ compound_ident ~ table_schema?}
relation_option = {relation_op ~ (compound_ident | underscore_ident) ~ table_schema?}
relation_op = _{relation_create | relation_replace | relation_put | relation_rm | relation_ensure | relation_ensure_not}
relation_create = {":create"}
relation_replace = {":replace"}

@ -42,7 +42,6 @@ pub(crate) struct QueryOutOptions {
pub(crate) sorters: Vec<(Symbol, SortDir)>,
pub(crate) store_relation: Option<(InputRelationHandle, RelationOp)>,
pub(crate) assertion: Option<QueryAssertion>,
pub(crate) yield_const: Option<Symbol>,
}
impl Debug for QueryOutOptions {
@ -199,21 +198,21 @@ impl InputInlineRulesOrFixed {
InputInlineRulesOrFixed::Fixed { fixed, .. } => fixed.span,
}
}
pub(crate) fn used_rule(&self, rule_name: &Symbol) -> bool {
match self {
InputInlineRulesOrFixed::Rules { rules, .. } => rules
.iter()
.any(|rule| rule.body.iter().any(|atom| atom.used_rule(rule_name))),
InputInlineRulesOrFixed::Fixed { fixed, .. } => fixed.rule_args.iter().any(|arg| {
if let FixedRuleArg::InMem { name, .. } = arg {
if name == rule_name {
return true;
}
}
false
}),
}
}
// pub(crate) fn used_rule(&self, rule_name: &Symbol) -> bool {
// match self {
// InputInlineRulesOrFixed::Rules { rules, .. } => rules
// .iter()
// .any(|rule| rule.body.iter().any(|atom| atom.used_rule(rule_name))),
// InputInlineRulesOrFixed::Fixed { fixed, .. } => fixed.rule_args.iter().any(|arg| {
// if let FixedRuleArg::InMem { name, .. } = arg {
// if name == rule_name {
// return true;
// }
// }
// false
// }),
// }
// }
}
pub(crate) struct FixedRuleApply {
@ -517,9 +516,9 @@ struct EntryHeadNotExplicitlyDefinedError(#[label] SourceSpan);
pub(crate) struct NoEntryError;
impl InputProgram {
pub(crate) fn used_rule(&self, rule_name: &Symbol) -> bool {
self.prog.values().any(|rule| rule.used_rule(rule_name))
}
// pub(crate) fn used_rule(&self, rule_name: &Symbol) -> bool {
// self.prog.values().any(|rule| rule.used_rule(rule_name))
// }
pub(crate) fn get_entry_arity(&self) -> Result<usize> {
if let Some(entry) = self.prog.get(&Symbol::new(PROG_ENTRY, SourceSpan(0, 0))) {
@ -968,16 +967,16 @@ impl Display for InputAtom {
}
impl InputAtom {
pub(crate) fn used_rule(&self, rule_name: &Symbol) -> bool {
match self {
InputAtom::Rule { inner } => inner.name == *rule_name,
InputAtom::Negation { inner, .. } => inner.used_rule(rule_name),
InputAtom::Conjunction { inner, .. } | InputAtom::Disjunction { inner, .. } => {
inner.iter().any(|a| a.used_rule(rule_name))
}
_ => false,
}
}
// pub(crate) fn used_rule(&self, rule_name: &Symbol) -> bool {
// match self {
// InputAtom::Rule { inner } => inner.name == *rule_name,
// InputAtom::Negation { inner, .. } => inner.used_rule(rule_name),
// InputAtom::Conjunction { inner, .. } | InputAtom::Disjunction { inner, .. } => {
// inner.iter().any(|a| a.used_rule(rule_name))
// }
// _ => false,
// }
// }
pub(crate) fn span(&self) -> SourceSpan {
match self {
InputAtom::Negation { span, .. }

@ -78,6 +78,9 @@ impl Symbol {
span,
}
}
pub(crate) fn is_temp_store_name(&self) -> bool {
self.name.starts_with('_')
}
pub(crate) fn is_prog_entry(&self) -> bool {
self.name == "?"
}

@ -346,14 +346,6 @@ pub(crate) fn parse_query(
);
out_opts.assertion = Some(QueryAssertion::AssertSome(pair.extract_span()))
}
Rule::yield_option => {
ensure!(
out_opts.yield_const.is_none(),
DuplicateYield(pair.extract_span())
);
let p = pair.into_inner().next().unwrap();
out_opts.yield_const = Some(Symbol::new(p.as_str(), p.extract_span()));
}
Rule::EOI => break,
r => unreachable!("{:?}", r),
}

@ -26,7 +26,7 @@ use crate::parse::parse_script;
use crate::runtime::relation::{AccessLevel, InputRelationHandle, InsufficientAccessLevel};
use crate::runtime::transact::SessionTx;
use crate::storage::Storage;
use crate::Db;
use crate::{Db, StoreTx};
#[derive(Debug, Error, Diagnostic)]
#[error("attempting to write into relation {0} of arity {1} with data of arity {2}")]
@ -108,7 +108,8 @@ impl<'a> SessionTx<'a> {
headers,
)?;
let has_triggers = !relation_store.rm_triggers.is_empty();
let has_triggers =
!relation_store.is_temp && !relation_store.rm_triggers.is_empty();
let mut new_tuples: Vec<DataValue> = vec![];
let mut old_tuples: Vec<DataValue> = vec![];
@ -133,7 +134,11 @@ impl<'a> SessionTx<'a> {
}
new_tuples.push(DataValue::List(extracted.clone()));
}
self.store_tx.del(&key)?;
if relation_store.is_temp {
self.temp_store_tx.del(&key)?;
} else {
self.store_tx.del(&key)?;
}
}
if has_triggers && !new_tuples.is_empty() {
@ -205,7 +210,11 @@ impl<'a> SessionTx<'a> {
let key = relation_store.encode_key_for_store(&extracted, *span)?;
let val = relation_store.encode_val_for_store(&extracted, *span)?;
let existing = self.store_tx.get(&key, true)?;
let existing = if relation_store.is_temp {
self.temp_store_tx.get(&key, true)?
} else {
self.store_tx.get(&key, true)?
};
match existing {
None => {
bail!(TransactAssertionFailure {
@ -249,7 +258,12 @@ impl<'a> SessionTx<'a> {
.map(|ex| ex.extract_data(&tuple, cur_vld))
.try_collect()?;
let key = relation_store.encode_key_for_store(&extracted, *span)?;
if self.store_tx.exists(&key, true)? {
let already_exists = if relation_store.is_temp {
self.temp_store_tx.exists(&key, true)?
} else {
self.store_tx.exists(&key, true)?
};
if already_exists {
bail!(TransactAssertionFailure {
relation: relation_store.name.to_string(),
key: extracted,
@ -274,7 +288,8 @@ impl<'a> SessionTx<'a> {
headers,
)?;
let has_triggers = !relation_store.put_triggers.is_empty();
let has_triggers =
!relation_store.is_temp && !relation_store.put_triggers.is_empty();
let mut new_tuples: Vec<DataValue> = vec![];
let mut old_tuples: Vec<DataValue> = vec![];
@ -310,7 +325,11 @@ impl<'a> SessionTx<'a> {
new_tuples.push(DataValue::List(extracted));
}
self.store_tx.put(&key, &val)?;
if relation_store.is_temp {
self.temp_store_tx.put(&key, &val)?;
} else {
self.store_tx.put(&key, &val)?;
}
}
if has_triggers && !new_tuples.is_empty() {

@ -22,21 +22,15 @@ use itertools::Itertools;
#[allow(unused_imports)]
use miette::{bail, ensure, miette, Diagnostic, IntoDiagnostic, Result, WrapErr};
use serde_json::json;
use smartstring::SmartString;
use thiserror::Error;
use crate::data::expr::Expr;
use crate::data::functions::current_validity;
use crate::data::json::JsonValue;
use crate::data::program::{
FixedRuleApply, InputInlineRulesOrFixed, InputProgram, QueryAssertion, RelationOp,
};
use crate::data::program::{InputProgram, QueryAssertion, RelationOp};
use crate::data::relation::ColumnDef;
use crate::data::symb::Symbol;
use crate::data::tuple::{Tuple, TupleT};
use crate::data::value::{DataValue, ValidityTs, LARGEST_UTF_CHAR};
use crate::fixed_rule::utilities::Constant;
use crate::fixed_rule::{FixedRuleHandle, DEFAULT_FIXED_RULES};
use crate::fixed_rule::DEFAULT_FIXED_RULES;
use crate::parse::sys::SysOp;
use crate::parse::{parse_script, CozoScript, SourceSpan};
use crate::query::compile::{CompiledProgram, CompiledRule, CompiledRuleSet};
@ -46,6 +40,7 @@ use crate::query::ra::{
};
use crate::runtime::relation::{AccessLevel, InsufficientAccessLevel, RelationHandle, RelationId};
use crate::runtime::transact::SessionTx;
use crate::storage::temp::TempStorage;
use crate::storage::{Storage, StoreTx};
use crate::{decode_tuple_from_kv, FixedRule};
@ -86,6 +81,7 @@ pub type TxCallback = Box<dyn FnMut(CallbackOp, Tuple) + Send + Sync>;
#[derive(Clone)]
pub struct Db<S> {
db: S,
temp_db: TempStorage,
relation_store_id: Arc<AtomicU64>,
queries_count: Arc<AtomicU64>,
running_queries: Arc<Mutex<BTreeMap<u64, RunningQueryHandle>>>,
@ -143,6 +139,7 @@ impl<'s, S: Storage<'s>> Db<S> {
pub fn new(storage: S) -> Result<Self> {
let ret = Self {
db: storage,
temp_db: Default::default(),
relation_store_id: Arc::new(Default::default()),
queries_count: Arc::new(Default::default()),
running_queries: Arc::new(Mutex::new(Default::default())),
@ -487,14 +484,18 @@ impl<'s, S: Storage<'s>> Db<S> {
fn transact(&'s self) -> Result<SessionTx<'_>> {
let ret = SessionTx {
store_tx: Box::new(self.db.transact(false)?),
temp_store_tx: self.temp_db.transact(true)?,
relation_store_id: self.relation_store_id.clone(),
temp_store_id: Default::default(),
};
Ok(ret)
}
fn transact_write(&'s self) -> Result<SessionTx<'_>> {
let ret = SessionTx {
store_tx: Box::new(self.db.transact(true)?),
temp_store_tx: self.temp_db.transact(true)?,
relation_store_id: self.relation_store_id.clone(),
temp_store_id: Default::default(),
};
Ok(ret)
}
@ -506,7 +507,13 @@ impl<'s, S: Storage<'s>> Db<S> {
) -> Result<NamedRows> {
match parse_script(payload, param_pool, &self.algorithms, cur_vld)? {
CozoScript::Multi(ps) => {
let is_write = ps.iter().any(|p| p.out_opts.store_relation.is_some());
let is_write = ps.iter().any(|p| {
if let Some((h, _)) = &p.out_opts.store_relation {
!h.name.name.starts_with('_')
} else {
false
}
});
let mut cleanups = vec![];
let mut res = NamedRows {
headers: vec![],
@ -519,21 +526,10 @@ impl<'s, S: Storage<'s>> Db<S> {
self.transact()?
};
let mut propagate_results = BTreeMap::new();
let prog_n = ps.len();
for (i, mut p) in ps.into_iter().enumerate() {
for p in ps.into_iter() {
#[allow(unused_variables)]
let sleep_opt = p.out_opts.sleep;
let prop = p.out_opts.yield_const.clone();
propagate_previous_results(&mut p, &propagate_results)?;
let (q_res, q_cleanups) = self.run_query(&mut tx, p, cur_vld)?;
if let Some(to_yield) = prop {
if i != prog_n - 1 {
propagate_results.insert(to_yield, q_res.clone());
}
}
res = q_res;
cleanups.extend(q_cleanups);
#[cfg(not(target_arch = "wasm32"))]
@ -1232,53 +1228,3 @@ impl Poison {
Ok(())
}
}
fn propagate_previous_results(
p: &mut InputProgram,
prev_results: &BTreeMap<Symbol, NamedRows>,
) -> Result<()> {
for (k, v) in prev_results {
if !p.used_rule(k) {
continue;
}
let replaced = p.prog.insert(
k.clone(),
InputInlineRulesOrFixed::Fixed {
fixed: FixedRuleApply {
fixed_handle: FixedRuleHandle {
name: Symbol::new("Constant", Default::default()),
},
rule_args: vec![],
options: Arc::new(BTreeMap::from([(
SmartString::from("data"),
Expr::Const {
val: DataValue::List(
v.rows
.iter()
.map(|row| DataValue::List(row.clone()))
.collect_vec(),
),
span: Default::default(),
},
)])),
head: vec![],
arity: v.headers.len(),
span: Default::default(),
fixed_impl: Arc::new(Box::new(Constant)),
},
},
);
if let Some(replaced_rel) = replaced {
#[derive(Debug, Diagnostic, Error)]
#[error("Name conflict with previous yield: '{0}'")]
#[diagnostic(code(db::name_confilict_with_yield))]
pub(crate) struct ConflictWithPrevYield(String, #[label] SourceSpan);
bail!(ConflictWithPrevYield(
k.to_string(),
replaced_rel.first_span()
))
}
}
Ok(())
}

@ -23,6 +23,7 @@ use crate::data::tuple::{decode_tuple_from_key, Tuple, TupleT, ENCODED_KEY_MIN_L
use crate::data::value::{DataValue, ValidityTs};
use crate::parse::SourceSpan;
use crate::runtime::transact::SessionTx;
use crate::StoreTx;
#[derive(
Copy,
@ -69,6 +70,8 @@ pub(crate) struct RelationHandle {
pub(crate) rm_triggers: Vec<String>,
pub(crate) replace_triggers: Vec<String>,
pub(crate) access_level: AccessLevel,
#[serde(default)]
pub(crate) is_temp: bool,
}
#[derive(
@ -232,7 +235,11 @@ impl RelationHandle {
) -> impl Iterator<Item = Result<Tuple>> + 'a {
let lower = Tuple::default().encode_as_key(self.id);
let upper = Tuple::default().encode_as_key(self.id.next());
tx.store_tx.range_scan_tuple(&lower, &upper)
if self.is_temp {
tx.temp_store_tx.range_scan_tuple(&lower, &upper)
} else {
tx.store_tx.range_scan_tuple(&lower, &upper)
}
}
pub(crate) fn skip_scan_all<'a>(
@ -242,20 +249,36 @@ impl RelationHandle {
) -> impl Iterator<Item = Result<Tuple>> + 'a {
let lower = Tuple::default().encode_as_key(self.id);
let upper = Tuple::default().encode_as_key(self.id.next());
tx.store_tx.range_skip_scan_tuple(&lower, &upper, valid_at)
if self.is_temp {
tx.temp_store_tx
.range_skip_scan_tuple(&lower, &upper, valid_at)
} else {
tx.store_tx.range_skip_scan_tuple(&lower, &upper, valid_at)
}
}
pub(crate) fn get(&self, tx: &SessionTx<'_>, key: &[DataValue]) -> Result<Option<Tuple>> {
let key_data = key.encode_as_key(self.id);
Ok(tx
.store_tx
.get(&key_data, false)?
.map(|val_data| decode_tuple_from_kv(&key_data, &val_data)))
if self.is_temp {
Ok(tx
.temp_store_tx
.get(&key_data, false)?
.map(|val_data| decode_tuple_from_kv(&key_data, &val_data)))
} else {
Ok(tx
.store_tx
.get(&key_data, false)?
.map(|val_data| decode_tuple_from_kv(&key_data, &val_data)))
}
}
pub(crate) fn exists(&self, tx: &SessionTx<'_>, key: &[DataValue]) -> Result<bool> {
let key_data = key.encode_as_key(self.id);
tx.store_tx.exists(&key_data, false)
if self.is_temp {
tx.temp_store_tx.exists(&key_data, false)
} else {
tx.store_tx.exists(&key_data, false)
}
}
pub(crate) fn scan_prefix<'a>(
@ -269,8 +292,13 @@ impl RelationHandle {
upper.push(DataValue::Bot);
let prefix_encoded = lower.encode_as_key(self.id);
let upper_encoded = upper.encode_as_key(self.id);
tx.store_tx
.range_scan_tuple(&prefix_encoded, &upper_encoded)
if self.is_temp {
tx.temp_store_tx
.range_scan_tuple(&prefix_encoded, &upper_encoded)
} else {
tx.store_tx
.range_scan_tuple(&prefix_encoded, &upper_encoded)
}
}
pub(crate) fn skip_scan_prefix<'a>(
@ -285,8 +313,13 @@ impl RelationHandle {
upper.push(DataValue::Bot);
let prefix_encoded = lower.encode_as_key(self.id);
let upper_encoded = upper.encode_as_key(self.id);
tx.store_tx
.range_skip_scan_tuple(&prefix_encoded, &upper_encoded, valid_at)
if self.is_temp {
tx.temp_store_tx
.range_skip_scan_tuple(&prefix_encoded, &upper_encoded, valid_at)
} else {
tx.store_tx
.range_skip_scan_tuple(&prefix_encoded, &upper_encoded, valid_at)
}
}
pub(crate) fn scan_bounded_prefix<'a>(
@ -303,7 +336,12 @@ impl RelationHandle {
upper_t.push(DataValue::Bot);
let lower_encoded = lower_t.encode_as_key(self.id);
let upper_encoded = upper_t.encode_as_key(self.id);
tx.store_tx.range_scan_tuple(&lower_encoded, &upper_encoded)
if self.is_temp {
tx.temp_store_tx
.range_scan_tuple(&lower_encoded, &upper_encoded)
} else {
tx.store_tx.range_scan_tuple(&lower_encoded, &upper_encoded)
}
}
pub(crate) fn skip_scan_bounded_prefix<'a>(
&self,
@ -320,8 +358,13 @@ impl RelationHandle {
upper_t.push(DataValue::Bot);
let lower_encoded = lower_t.encode_as_key(self.id);
let upper_encoded = upper_t.encode_as_key(self.id);
tx.store_tx
.range_skip_scan_tuple(&lower_encoded, &upper_encoded, valid_at)
if self.is_temp {
tx.temp_store_tx
.range_skip_scan_tuple(&lower_encoded, &upper_encoded, valid_at)
} else {
tx.store_tx
.range_skip_scan_tuple(&lower_encoded, &upper_encoded, valid_at)
}
}
}
@ -350,7 +393,11 @@ impl<'a> SessionTx<'a> {
pub(crate) fn relation_exists(&self, name: &str) -> Result<bool> {
let key = DataValue::from(name);
let encoded = vec![key].encode_as_key(RelationId::SYSTEM);
self.store_tx.exists(&encoded, false)
if name.starts_with('_') {
self.temp_store_tx.exists(&encoded, false)
} else {
self.store_tx.exists(&encoded, false)
}
}
pub(crate) fn set_relation_triggers(
&mut self,
@ -359,6 +406,9 @@ impl<'a> SessionTx<'a> {
rms: Vec<String>,
replaces: Vec<String>,
) -> Result<()> {
if name.name.starts_with('_') {
bail!("Cannot set triggers for temp store")
}
let mut original = self.get_relation(&name, true)?;
if original.access_level < AccessLevel::Protected {
bail!(InsufficientAccessLevel(
@ -393,8 +443,14 @@ impl<'a> SessionTx<'a> {
bail!(RelNameConflictError(input_meta.name.to_string()))
};
let is_temp = input_meta.name.is_temp_store_name();
let metadata = input_meta.metadata.clone();
let last_id = self.relation_store_id.fetch_add(1, Ordering::SeqCst);
let last_id = if is_temp {
self.temp_store_id.fetch_add(1, Ordering::Relaxed) as u64
} else {
self.relation_store_id.fetch_add(1, Ordering::SeqCst)
};
let meta = RelationHandle {
name: input_meta.name.name,
id: RelationId::new(last_id + 1),
@ -403,19 +459,26 @@ impl<'a> SessionTx<'a> {
rm_triggers: vec![],
replace_triggers: vec![],
access_level: AccessLevel::Normal,
is_temp,
};
self.store_tx.put(&encoded, &meta.id.raw_encode())?;
let name_key = vec![DataValue::Str(meta.name.clone())].encode_as_key(RelationId::SYSTEM);
let mut meta_val = vec![];
meta.serialize(&mut Serializer::new(&mut meta_val).with_struct_map())
.unwrap();
self.store_tx.put(&name_key, &meta_val)?;
let tuple = vec![DataValue::Null];
let t_encoded = tuple.encode_as_key(RelationId::SYSTEM);
self.store_tx.put(&t_encoded, &meta.id.raw_encode())?;
if is_temp {
self.temp_store_tx.put(&encoded, &meta.id.raw_encode())?;
self.temp_store_tx.put(&name_key, &meta_val)?;
self.temp_store_tx.put(&t_encoded, &meta.id.raw_encode())?;
} else {
self.store_tx.put(&encoded, &meta.id.raw_encode())?;
self.store_tx.put(&name_key, &meta_val)?;
self.store_tx.put(&t_encoded, &meta.id.raw_encode())?;
}
Ok(meta)
}
pub(crate) fn get_relation(&self, name: &str, lock: bool) -> Result<RelationHandle> {
@ -427,14 +490,22 @@ impl<'a> SessionTx<'a> {
let key = DataValue::from(name);
let encoded = vec![key].encode_as_key(RelationId::SYSTEM);
let found = self
.store_tx
.get(&encoded, lock)?
.ok_or_else(|| StoredRelationNotFoundError(name.to_string()))?;
let found = if name.starts_with('_') {
self.temp_store_tx
.get(&encoded, lock)?
.ok_or_else(|| StoredRelationNotFoundError(name.to_string()))?
} else {
self.store_tx
.get(&encoded, lock)?
.ok_or_else(|| StoredRelationNotFoundError(name.to_string()))?
};
let metadata = RelationHandle::decode(&found)?;
Ok(metadata)
}
pub(crate) fn destroy_relation(&mut self, name: &str) -> Result<(Vec<u8>, Vec<u8>)> {
if name.starts_with('_') {
bail!("Cannot destroy temp relation");
}
let store = self.get_relation(name, true)?;
if store.access_level < AccessLevel::Normal {
bail!(InsufficientAccessLevel(
@ -464,6 +535,9 @@ impl<'a> SessionTx<'a> {
Ok(())
}
pub(crate) fn rename_relation(&mut self, old: Symbol, new: Symbol) -> Result<()> {
if old.name.starts_with('_') || new.name.starts_with('_') {
bail!("Bad name given");
}
let new_key = DataValue::Str(new.name.clone());
let new_encoded = vec![new_key].encode_as_key(RelationId::SYSTEM);

@ -7,10 +7,10 @@
*
*/
use crate::data::value::DataValue;
use log::debug;
use serde_json::json;
use crate::data::value::DataValue;
use crate::new_cozo_mem;
#[test]
@ -268,55 +268,22 @@ fn returning_relations() {
let res = db
.run_script(
r#"
{
?[] <- [[1,2,3]]
:yield nxt
}
{
?[a,b,c] := nxt[a, b, c]
}
{:create _xxz {a}}
{?[a] := a in [5,4,1,2,3] :put _xxz {a}}
{?[a] := *_xxz[a], (a % 2) == 0 :rm _xxz {a}}
{?[a] := *_xxz[b], a = b * 2}
"#,
Default::default(),
)
.unwrap()
.into_json();
assert_eq!(res["rows"], json!([[1, 2, 3]]));
.unwrap();
assert_eq!(res.into_json()["rows"], json!([[2], [6], [10]]));
let res = db
.run_script(
r#"
{
?[a] <- [[1]]
:yield first_yield
}
{
?[a] := first_yield[b], a = b + 1
:yield second_yield
}
{
?[a] := first_yield[a]
?[a] := second_yield[a]
}
{?[a] := *_xxz[b], a = b * 2}
"#,
Default::default(),
)
.unwrap()
.into_json();
assert_eq!(res["rows"], json!([[1], [2]]));
let res = db.run_script(
r#"
{
?[] <- [[1,2,3]]
:yield nxt
}
{
nxt[] <- [[2, 3, 5]]
?[a,b,c] := nxt[a, b, c]
}
"#,
Default::default(),
);
);
assert!(res.is_err());
}

@ -6,7 +6,7 @@
* You can obtain one at https://mozilla.org/MPL/2.0/.
*/
use std::sync::atomic::AtomicU64;
use std::sync::atomic::{AtomicU32, AtomicU64};
use std::sync::Arc;
use miette::Result;
@ -14,11 +14,14 @@ use miette::Result;
use crate::data::tuple::TupleT;
use crate::data::value::DataValue;
use crate::runtime::relation::RelationId;
use crate::storage::temp::TempTx;
use crate::storage::StoreTx;
pub struct SessionTx<'a> {
pub(crate) store_tx: Box<dyn StoreTx<'a> + 'a>,
pub(crate) temp_store_tx: TempTx,
pub(crate) relation_store_id: Arc<AtomicU64>,
pub(crate) temp_store_id: AtomicU32,
}
impl<'a> SessionTx<'a> {

@ -419,11 +419,11 @@ impl Iterator for CacheIter<'_> {
}
/// Keep an eye on https://github.com/rust-lang/rust/issues/49638
struct SkipIterator<'a> {
inner: &'a BTreeMap<Vec<u8>, Vec<u8>>,
upper: Vec<u8>,
valid_at: ValidityTs,
next_bound: Vec<u8>,
pub(crate) struct SkipIterator<'a> {
pub(crate) inner: &'a BTreeMap<Vec<u8>, Vec<u8>>,
pub(crate) upper: Vec<u8>,
pub(crate) valid_at: ValidityTs,
pub(crate) next_bound: Vec<u8>,
}
impl<'a> Iterator for SkipIterator<'a> {

@ -22,6 +22,7 @@ pub(crate) mod sled;
pub(crate) mod sqlite;
#[cfg(feature = "storage-tikv")]
pub(crate) mod tikv;
pub(crate) mod temp;
// pub(crate) mod re;
/// Swappable storage trait for Cozo's storage engine

@ -0,0 +1,132 @@
/*
* Copyright 2022, The Cozo Project Authors.
*
* This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.
* If a copy of the MPL was not distributed with this file,
* You can obtain one at https://mozilla.org/MPL/2.0/.
*/
use std::collections::BTreeMap;
use std::default::Default;
use miette::Result;
use crate::data::tuple::Tuple;
use crate::data::value::ValidityTs;
use crate::runtime::relation::decode_tuple_from_kv;
use crate::storage::mem::SkipIterator;
use crate::storage::{Storage, StoreTx};
#[derive(Clone, Default)]
pub(crate) struct TempStorage;
impl<'s> Storage<'s> for TempStorage {
type Tx = TempTx;
fn storage_kind(&self) -> &'static str {
"temp"
}
fn transact(&'s self, _write: bool) -> Result<Self::Tx> {
Ok(TempTx {
store: Default::default(),
})
}
fn del_range(&'s self, _lower: &[u8], _upper: &[u8]) -> Result<()> {
panic!("del_range called on temp store")
}
fn range_compact(&'s self, _lower: &[u8], _upper: &[u8]) -> Result<()> {
panic!("range compact called on temp store")
}
fn batch_put<'a>(
&'a self,
_data: Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>,
) -> Result<()> {
panic!("batch put compact called on temp store")
}
}
pub(crate) struct TempTx {
store: BTreeMap<Vec<u8>, Vec<u8>>,
}
impl<'s> StoreTx<'s> for TempTx {
fn get(&self, key: &[u8], _for_update: bool) -> Result<Option<Vec<u8>>> {
Ok(self.store.get(key).cloned())
}
fn put(&mut self, key: &[u8], val: &[u8]) -> Result<()> {
self.store.insert(key.to_vec(), val.to_vec());
Ok(())
}
fn del(&mut self, key: &[u8]) -> Result<()> {
self.store.remove(key);
Ok(())
}
fn exists(&self, key: &[u8], _for_update: bool) -> Result<bool> {
Ok(self.store.contains_key(key))
}
fn commit(&mut self) -> Result<()> {
Ok(())
}
fn range_scan_tuple<'a>(
&'a self,
lower: &[u8],
upper: &[u8],
) -> Box<dyn Iterator<Item = Result<Tuple>> + 'a>
where
's: 'a,
{
Box::new(
self.store
.range(lower.to_vec()..upper.to_vec())
.map(|(k, v)| Ok(decode_tuple_from_kv(k, v))),
)
}
fn range_skip_scan_tuple<'a>(
&'a self,
lower: &[u8],
upper: &[u8],
valid_at: ValidityTs,
) -> Box<dyn Iterator<Item = Result<Tuple>> + 'a> {
Box::new(
SkipIterator {
inner: &self.store,
upper: upper.to_vec(),
valid_at,
next_bound: lower.to_vec(),
}
.map(Ok),
)
}
fn range_scan<'a>(
&'a self,
lower: &[u8],
upper: &[u8],
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>
where
's: 'a,
{
Box::new(
self.store
.range(lower.to_vec()..upper.to_vec())
.map(|(k, v)| Ok((k.clone(), v.clone()))),
)
}
fn total_scan<'a>(&'a self) -> Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>
where
's: 'a,
{
Box::new(self.store.iter().map(|(k, v)| Ok((k.clone(), v.clone()))))
}
}
Loading…
Cancel
Save