restructure code

main
Ziyang Hu 2 years ago
parent 758f7392f0
commit 348f17b2cf

@ -64,8 +64,9 @@ pub use storage::tikv::{new_cozo_tikv, TiKvStorage};
pub use storage::{Storage, StoreTx};
use crate::data::json::JsonValue;
use crate::runtime::callback::CallbackOp;
#[cfg(not(target_arch = "wasm32"))]
use crate::runtime::db::CallbackOp;
pub(crate) mod data;
pub(crate) mod fixed_rule;

@ -23,13 +23,13 @@ use crate::data::value::{DataValue, ValidityTs};
use crate::fixed_rule::utilities::constant::Constant;
use crate::fixed_rule::FixedRuleHandle;
use crate::parse::parse_script;
use crate::runtime::db::{CallbackCollector, CallbackOp};
use crate::runtime::relation::{
extend_tuple_from_v, AccessLevel, InputRelationHandle, InsufficientAccessLevel,
};
use crate::runtime::transact::SessionTx;
use crate::storage::Storage;
use crate::{Db, NamedRows, StoreTx};
use crate::runtime::callback::{CallbackCollector, CallbackOp};
#[derive(Debug, Error, Diagnostic)]
#[error("attempting to write into relation {0} of arity {1} with data of arity {2}")]

@ -0,0 +1,64 @@
/*
* Copyright 2022, The Cozo Project Authors.
*
* This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.
* If a copy of the MPL was not distributed with this file,
* You can obtain one at https://mozilla.org/MPL/2.0/.
*/
use std::collections::{BTreeMap, BTreeSet};
use smartstring::{LazyCompact, SmartString};
use crate::{Db, NamedRows, Storage};
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum CallbackOp {
Put,
Rm,
}
#[cfg(not(target_arch = "wasm32"))]
pub struct CallbackDeclaration {
pub(crate) dependent: SmartString<LazyCompact>,
pub(crate) callback: Box<dyn Fn(CallbackOp, NamedRows, NamedRows) + Send + Sync>,
}
pub(crate) type CallbackCollector =
BTreeMap<SmartString<LazyCompact>, Vec<(CallbackOp, NamedRows, NamedRows)>>;
#[cfg(not(target_arch = "wasm32"))]
pub(crate) type EventCallbackRegistry = (
BTreeMap<u32, CallbackDeclaration>,
BTreeMap<SmartString<LazyCompact>, BTreeSet<u32>>,
);
impl<'s, S: Storage<'s>> Db<S> {
pub(crate) fn current_callback_targets(&self) -> BTreeSet<SmartString<LazyCompact>> {
#[cfg(not(target_arch = "wasm32"))]
{
self.event_callbacks
.read()
.unwrap()
.1
.keys()
.cloned()
.collect()
}
#[cfg(target_arch = "wasm32")]
{
Default::default()
}
}
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn send_callbacks(&'s self, collector: CallbackCollector) {
for (k, vals) in collector {
for (op, new, old) in vals {
self.callback_sender
.send((k.clone(), op, new, old))
.expect("sending to callback processor failed");
}
}
}
}

@ -23,32 +23,32 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
#[cfg(not(target_arch = "wasm32"))]
use crossbeam::channel::{unbounded, Sender};
use crossbeam::sync::ShardedLock;
use either::{Either, Left, Right};
use either::{Left, Right};
use itertools::Itertools;
use miette::Report;
#[allow(unused_imports)]
use miette::{bail, ensure, miette, Diagnostic, IntoDiagnostic, Result, WrapErr};
use serde_json::json;
use smartstring::{LazyCompact, SmartString};
use thiserror::Error;
use crate::data::expr::PredicateTypeError;
use crate::data::functions::{current_validity, op_to_bool};
use crate::data::functions::current_validity;
use crate::data::json::JsonValue;
use crate::data::program::{InputProgram, QueryAssertion, RelationOp};
use crate::data::relation::ColumnDef;
use crate::data::symb::Symbol;
use crate::data::tuple::{Tuple, TupleT};
use crate::data::value::{DataValue, ValidityTs, LARGEST_UTF_CHAR};
use crate::fixed_rule::DEFAULT_FIXED_RULES;
use crate::parse::sys::SysOp;
use crate::parse::{
parse_script, CozoScript, ImperativeCondition, ImperativeProgram, ImperativeStmt, SourceSpan,
};
use crate::parse::{parse_script, CozoScript, SourceSpan};
use crate::query::compile::{CompiledProgram, CompiledRule, CompiledRuleSet};
use crate::query::ra::{
FilteredRA, InnerJoin, NegJoin, RelAlgebra, ReorderRA, StoredRA, StoredWithValidityRA,
TempStoreRA, UnificationRA,
};
use crate::runtime::callback::{
CallbackCollector, CallbackDeclaration, CallbackOp, EventCallbackRegistry,
};
use crate::runtime::relation::{AccessLevel, InsufficientAccessLevel, RelationHandle, RelationId};
use crate::runtime::transact::SessionTx;
use crate::storage::temp::TempStorage;
@ -79,26 +79,10 @@ pub struct DbManifest {
pub storage_version: u64,
}
#[allow(dead_code)]
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum CallbackOp {
Put,
Rm,
}
#[cfg(not(target_arch = "wasm32"))]
pub struct CallbackDeclaration {
dependent: SmartString<LazyCompact>,
callback: Box<dyn Fn(CallbackOp, NamedRows, NamedRows) + Send + Sync>,
}
pub(crate) type CallbackCollector =
BTreeMap<SmartString<LazyCompact>, Vec<(CallbackOp, NamedRows, NamedRows)>>;
/// The database object of Cozo.
#[derive(Clone)]
pub struct Db<S> {
db: S,
pub(crate) db: S,
temp_db: TempStorage,
relation_store_id: Arc<AtomicU64>,
queries_count: Arc<AtomicU64>,
@ -107,18 +91,13 @@ pub struct Db<S> {
#[cfg(not(target_arch = "wasm32"))]
callback_count: Arc<AtomicU32>,
#[cfg(not(target_arch = "wasm32"))]
callback_sender: Sender<(SmartString<LazyCompact>, CallbackOp, NamedRows, NamedRows)>,
pub(crate) callback_sender:
Sender<(SmartString<LazyCompact>, CallbackOp, NamedRows, NamedRows)>,
#[cfg(not(target_arch = "wasm32"))]
event_callbacks: Arc<ShardedLock<EventCallbackRegistry>>,
pub(crate) event_callbacks: Arc<ShardedLock<EventCallbackRegistry>>,
relation_locks: Arc<ShardedLock<BTreeMap<SmartString<LazyCompact>, Arc<ShardedLock<()>>>>>,
}
#[cfg(not(target_arch = "wasm32"))]
type EventCallbackRegistry = (
BTreeMap<u32, CallbackDeclaration>,
BTreeMap<SmartString<LazyCompact>, BTreeSet<u32>>,
);
impl<S> Debug for Db<S> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Db")
@ -157,12 +136,6 @@ impl NamedRows {
const STATUS_STR: &str = "status";
const OK_STR: &str = "OK";
enum ControlCode {
Termination(NamedRows),
Break(Option<SmartString<LazyCompact>>, SourceSpan),
Continue(Option<SmartString<LazyCompact>>, SourceSpan),
}
impl<'s, S: Storage<'s>> Db<S> {
/// Create a new database object with the given storage.
/// You must call [`initialize`](Self::initialize) immediately after creation.
@ -553,7 +526,7 @@ impl<'s, S: Storage<'s>> Db<S> {
ret.is_some()
}
fn obtain_relation_locks<'a, T: Iterator<Item = &'a SmartString<LazyCompact>>>(
pub(crate) fn obtain_relation_locks<'a, T: Iterator<Item = &'a SmartString<LazyCompact>>>(
&'s self,
rels: T,
) -> Vec<Arc<ShardedLock<()>>> {
@ -612,44 +585,8 @@ impl<'s, S: Storage<'s>> Db<S> {
};
Ok(ret)
}
fn execute_imperative_condition(
&'s self,
p: &ImperativeCondition,
tx: &mut SessionTx<'_>,
cleanups: &mut Vec<(Vec<u8>, Vec<u8>)>,
cur_vld: ValidityTs,
span: SourceSpan,
callback_targets: &BTreeSet<SmartString<LazyCompact>>,
callback_collector: &mut CallbackCollector,
) -> Result<bool> {
let res = match p {
Left(rel) => {
let relation = tx.get_relation(rel, false)?;
relation.as_named_rows(tx)?
}
Right(p) => self.execute_single_program(
p.clone(),
tx,
cleanups,
cur_vld,
callback_targets,
callback_collector,
)?,
};
Ok(match res.rows.first() {
None => false,
Some(row) => {
if row.is_empty() {
false
} else {
op_to_bool(&row[row.len() - 1..])?
.get_bool()
.ok_or_else(|| PredicateTypeError(span, row.last().cloned().unwrap()))?
}
}
})
}
fn execute_single_program(
pub(crate) fn execute_single_program(
&'s self,
p: InputProgram,
tx: &mut SessionTx<'_>,
@ -670,33 +607,6 @@ impl<'s, S: Storage<'s>> Db<S> {
Ok(q_res)
}
fn current_callback_targets(&self) -> BTreeSet<SmartString<LazyCompact>> {
#[cfg(not(target_arch = "wasm32"))]
{
self.event_callbacks
.read()
.unwrap()
.1
.keys()
.cloned()
.collect()
}
#[cfg(target_arch = "wasm32")]
{
Default::default()
}
}
#[cfg(not(target_arch = "wasm32"))]
fn send_callbacks(&'s self, collector: CallbackCollector) {
for (k, vals) in collector {
for (op, new, old) in vals {
self.callback_sender
.send((k.clone(), op, new, old))
.expect("sending to callback processor failed");
}
}
}
fn do_run_script(
&'s self,
payload: &str,
@ -704,277 +614,61 @@ impl<'s, S: Storage<'s>> Db<S> {
cur_vld: ValidityTs,
) -> Result<NamedRows> {
match parse_script(payload, param_pool, &self.algorithms, cur_vld)? {
CozoScript::Single(p) => {
let mut callback_collector = BTreeMap::new();
let write_lock_names = p.needs_write_lock();
let is_write = write_lock_names.is_some();
let write_lock = self.obtain_relation_locks(write_lock_names.iter());
let _write_lock_guards = if is_write {
Some(write_lock[0].read().unwrap())
} else {
None
};
let callback_targets = if is_write {
self.current_callback_targets()
} else {
Default::default()
};
let mut cleanups = vec![];
let res;
{
let mut tx = if is_write {
self.transact_write()?
} else {
self.transact()?
};
res = self.execute_single_program(
p,
&mut tx,
&mut cleanups,
cur_vld,
&callback_targets,
&mut callback_collector,
)?;
if is_write {
tx.commit_tx()?;
} else {
tx.commit_tx()?;
assert!(cleanups.is_empty(), "non-empty cleanups on read-only tx");
}
}
#[cfg(not(target_arch = "wasm32"))]
if !callback_collector.is_empty() {
self.send_callbacks(callback_collector)
}
for (lower, upper) in cleanups {
self.db.del_range(&lower, &upper)?;
}
Ok(res)
}
CozoScript::Imperative(ps) => {
let mut callback_collector = BTreeMap::new();
let mut write_lock_names = BTreeSet::new();
for p in &ps {
p.needs_write_locks(&mut write_lock_names);
}
let is_write = !write_lock_names.is_empty();
let write_lock = self.obtain_relation_locks(write_lock_names.iter());
let _write_lock_guards = write_lock.iter().map(|l| l.read().unwrap()).collect_vec();
let callback_targets = if is_write {
self.current_callback_targets()
} else {
Default::default()
};
let mut cleanups: Vec<(Vec<u8>, Vec<u8>)> = vec![];
let ret;
{
let mut tx = if is_write {
self.transact_write()?
} else {
self.transact()?
};
match self.execute_imperative_stmts(
&ps,
&mut tx,
&mut cleanups,
cur_vld,
&callback_targets,
&mut callback_collector,
)? {
Left(res) => ret = res,
Right(ctrl) => match ctrl {
ControlCode::Termination(res) => {
ret = res;
}
ControlCode::Break(_, span) | ControlCode::Continue(_, span) => {
#[derive(Debug, Error, Diagnostic)]
#[error("control flow has nowhere to go")]
#[diagnostic(code(eval::dangling_ctrl_flow))]
struct DanglingControlFlow(#[label] SourceSpan);
CozoScript::Single(p) => self.execute_single(cur_vld, p),
CozoScript::Imperative(ps) => self.execute_imperative(cur_vld, &ps),
CozoScript::Sys(op) => self.run_sys_op(op),
}
}
bail!(DanglingControlFlow(span))
}
},
}
fn execute_single(&'s self, cur_vld: ValidityTs, p: InputProgram) -> Result<NamedRows, Report> {
let mut callback_collector = BTreeMap::new();
let write_lock_names = p.needs_write_lock();
let is_write = write_lock_names.is_some();
let write_lock = self.obtain_relation_locks(write_lock_names.iter());
let _write_lock_guards = if is_write {
Some(write_lock[0].read().unwrap())
} else {
None
};
let callback_targets = if is_write {
self.current_callback_targets()
} else {
Default::default()
};
let mut cleanups = vec![];
let res;
{
let mut tx = if is_write {
self.transact_write()?
} else {
self.transact()?
};
if is_write {
tx.commit_tx()?;
} else {
tx.commit_tx()?;
assert!(cleanups.is_empty(), "non-empty cleanups on read-only tx");
}
}
#[cfg(not(target_arch = "wasm32"))]
if !callback_collector.is_empty() {
self.send_callbacks(callback_collector)
}
res = self.execute_single_program(
p,
&mut tx,
&mut cleanups,
cur_vld,
&callback_targets,
&mut callback_collector,
)?;
for (lower, upper) in cleanups {
self.db.del_range(&lower, &upper)?;
}
Ok(ret)
if is_write {
tx.commit_tx()?;
} else {
tx.commit_tx()?;
assert!(cleanups.is_empty(), "non-empty cleanups on read-only tx");
}
CozoScript::Sys(op) => self.run_sys_op(op),
}
}
fn execute_imperative_stmts(
&'s self,
ps: &ImperativeProgram,
tx: &mut SessionTx<'_>,
cleanups: &mut Vec<(Vec<u8>, Vec<u8>)>,
cur_vld: ValidityTs,
callback_targets: &BTreeSet<SmartString<LazyCompact>>,
callback_collector: &mut CallbackCollector,
) -> Result<Either<NamedRows, ControlCode>> {
let mut ret = NamedRows::default();
for p in ps {
match p {
ImperativeStmt::Break { target, span, .. } => {
return Ok(Right(ControlCode::Break(target.clone(), *span)));
}
ImperativeStmt::Continue { target, span, .. } => {
return Ok(Right(ControlCode::Continue(target.clone(), *span)));
}
ImperativeStmt::ReturnNil { .. } => {
return Ok(Right(ControlCode::Termination(NamedRows::default())))
}
ImperativeStmt::ReturnProgram { prog, .. } => {
ret = self.execute_single_program(
prog.clone(),
tx,
cleanups,
cur_vld,
callback_targets,
callback_collector,
)?;
return Ok(Right(ControlCode::Termination(ret)));
}
ImperativeStmt::ReturnTemp { rel, .. } => {
let relation = tx.get_relation(rel, false)?;
return Ok(Right(ControlCode::Termination(relation.as_named_rows(tx)?)));
}
ImperativeStmt::TempDebug { temp, .. } => {
let relation = tx.get_relation(temp, false)?;
println!("{}: {:?}", temp, relation.as_named_rows(tx)?);
ret = NamedRows::default();
}
ImperativeStmt::Program { prog, .. } => {
ret = self.execute_single_program(
prog.clone(),
tx,
cleanups,
cur_vld,
callback_targets,
callback_collector,
)?;
}
ImperativeStmt::IgnoreErrorProgram { prog, .. } => {
match self.execute_single_program(
prog.clone(),
tx,
cleanups,
cur_vld,
callback_targets,
callback_collector,
) {
Ok(res) => ret = res,
Err(_) => {
ret = NamedRows {
headers: vec!["status".to_string()],
rows: vec![vec![DataValue::from("FAILED")]],
}
}
}
}
ImperativeStmt::If {
condition,
then_branch,
else_branch,
span,
negated,
} => {
let cond_val = self.execute_imperative_condition(
condition,
tx,
cleanups,
cur_vld,
*span,
callback_targets,
callback_collector,
)?;
let cond_val = if *negated { !cond_val } else { cond_val };
let to_execute = if cond_val { then_branch } else { else_branch };
match self.execute_imperative_stmts(
to_execute,
tx,
cleanups,
cur_vld,
callback_targets,
callback_collector,
)? {
Left(rows) => {
ret = rows;
}
Right(ctrl) => return Ok(Right(ctrl)),
}
}
ImperativeStmt::Loop { label, body, .. } => {
ret = Default::default();
loop {
match self.execute_imperative_stmts(
body,
tx,
cleanups,
cur_vld,
callback_targets,
callback_collector,
)? {
Left(_) => {}
Right(ctrl) => match ctrl {
ControlCode::Termination(ret) => {
return Ok(Right(ControlCode::Termination(ret)))
}
ControlCode::Break(break_label, span) => {
if break_label.is_none() || break_label == *label {
break;
} else {
return Ok(Right(ControlCode::Break(break_label, span)));
}
}
ControlCode::Continue(cont_label, span) => {
if cont_label.is_none() || cont_label == *label {
continue;
} else {
return Ok(Right(ControlCode::Continue(cont_label, span)));
}
}
},
}
}
}
ImperativeStmt::TempSwap { left, right, .. } => {
tx.rename_temp_relation(
Symbol::new(left.clone(), Default::default()),
Symbol::new(SmartString::from("_*temp*"), Default::default()),
)?;
tx.rename_temp_relation(
Symbol::new(right.clone(), Default::default()),
Symbol::new(left.clone(), Default::default()),
)?;
tx.rename_temp_relation(
Symbol::new(SmartString::from("_*temp*"), Default::default()),
Symbol::new(right.clone(), Default::default()),
)?;
ret = NamedRows::default();
break;
}
}
#[cfg(not(target_arch = "wasm32"))]
if !callback_collector.is_empty() {
self.send_callbacks(callback_collector)
}
for (lower, upper) in cleanups {
self.db.del_range(&lower, &upper)?;
}
Ok(Left(ret))
Ok(res)
}
fn explain_compiled(&self, strata: &[CompiledProgram]) -> Result<NamedRows> {
let mut ret: Vec<JsonValue> = vec![];

@ -0,0 +1,288 @@
/*
* Copyright 2022, The Cozo Project Authors.
*
* This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.
* If a copy of the MPL was not distributed with this file,
* You can obtain one at https://mozilla.org/MPL/2.0/.
*/
use std::collections::{BTreeMap, BTreeSet};
use either::{Either, Left, Right};
use itertools::Itertools;
use smartstring::{LazyCompact, SmartString};
use crate::parse::{ImperativeCondition, ImperativeProgram, ImperativeStmt, SourceSpan};
use crate::{DataValue, Db, NamedRows, Storage, ValidityTs};
use crate::data::expr::PredicateTypeError;
use crate::data::functions::op_to_bool;
use crate::runtime::callback::CallbackCollector;
use crate::runtime::transact::SessionTx;
use miette::{bail, Report, Result, Diagnostic};
use crate::data::symb::Symbol;
use thiserror::Error;
enum ControlCode {
Termination(NamedRows),
Break(Option<SmartString<LazyCompact>>, SourceSpan),
Continue(Option<SmartString<LazyCompact>>, SourceSpan),
}
impl<'s, S: Storage<'s>> Db<S> {
fn execute_imperative_condition(
&'s self,
p: &ImperativeCondition,
tx: &mut SessionTx<'_>,
cleanups: &mut Vec<(Vec<u8>, Vec<u8>)>,
cur_vld: ValidityTs,
span: SourceSpan,
callback_targets: &BTreeSet<SmartString<LazyCompact>>,
callback_collector: &mut CallbackCollector,
) -> Result<bool> {
let res = match p {
Left(rel) => {
let relation = tx.get_relation(rel, false)?;
relation.as_named_rows(tx)?
}
Right(p) => self.execute_single_program(
p.clone(),
tx,
cleanups,
cur_vld,
callback_targets,
callback_collector,
)?,
};
Ok(match res.rows.first() {
None => false,
Some(row) => {
if row.is_empty() {
false
} else {
op_to_bool(&row[row.len() - 1..])?
.get_bool()
.ok_or_else(|| PredicateTypeError(span, row.last().cloned().unwrap()))?
}
}
})
}
fn execute_imperative_stmts(
&'s self,
ps: &ImperativeProgram,
tx: &mut SessionTx<'_>,
cleanups: &mut Vec<(Vec<u8>, Vec<u8>)>,
cur_vld: ValidityTs,
callback_targets: &BTreeSet<SmartString<LazyCompact>>,
callback_collector: &mut CallbackCollector,
) -> Result<Either<NamedRows, ControlCode>> {
let mut ret = NamedRows::default();
for p in ps {
match p {
ImperativeStmt::Break { target, span, .. } => {
return Ok(Right(ControlCode::Break(target.clone(), *span)));
}
ImperativeStmt::Continue { target, span, .. } => {
return Ok(Right(ControlCode::Continue(target.clone(), *span)));
}
ImperativeStmt::ReturnNil { .. } => {
return Ok(Right(ControlCode::Termination(NamedRows::default())))
}
ImperativeStmt::ReturnProgram { prog, .. } => {
ret = self.execute_single_program(
prog.clone(),
tx,
cleanups,
cur_vld,
callback_targets,
callback_collector,
)?;
return Ok(Right(ControlCode::Termination(ret)));
}
ImperativeStmt::ReturnTemp { rel, .. } => {
let relation = tx.get_relation(rel, false)?;
return Ok(Right(ControlCode::Termination(relation.as_named_rows(tx)?)));
}
ImperativeStmt::TempDebug { temp, .. } => {
let relation = tx.get_relation(temp, false)?;
println!("{}: {:?}", temp, relation.as_named_rows(tx)?);
ret = NamedRows::default();
}
ImperativeStmt::Program { prog, .. } => {
ret = self.execute_single_program(
prog.clone(),
tx,
cleanups,
cur_vld,
callback_targets,
callback_collector,
)?;
}
ImperativeStmt::IgnoreErrorProgram { prog, .. } => {
match self.execute_single_program(
prog.clone(),
tx,
cleanups,
cur_vld,
callback_targets,
callback_collector,
) {
Ok(res) => ret = res,
Err(_) => {
ret = NamedRows {
headers: vec!["status".to_string()],
rows: vec![vec![DataValue::from("FAILED")]],
}
}
}
}
ImperativeStmt::If {
condition,
then_branch,
else_branch,
span,
negated,
} => {
let cond_val = self.execute_imperative_condition(
condition,
tx,
cleanups,
cur_vld,
*span,
callback_targets,
callback_collector,
)?;
let cond_val = if *negated { !cond_val } else { cond_val };
let to_execute = if cond_val { then_branch } else { else_branch };
match self.execute_imperative_stmts(
to_execute,
tx,
cleanups,
cur_vld,
callback_targets,
callback_collector,
)? {
Left(rows) => {
ret = rows;
}
Right(ctrl) => return Ok(Right(ctrl)),
}
}
ImperativeStmt::Loop { label, body, .. } => {
ret = Default::default();
loop {
match self.execute_imperative_stmts(
body,
tx,
cleanups,
cur_vld,
callback_targets,
callback_collector,
)? {
Left(_) => {}
Right(ctrl) => match ctrl {
ControlCode::Termination(ret) => {
return Ok(Right(ControlCode::Termination(ret)))
}
ControlCode::Break(break_label, span) => {
if break_label.is_none() || break_label == *label {
break;
} else {
return Ok(Right(ControlCode::Break(break_label, span)));
}
}
ControlCode::Continue(cont_label, span) => {
if cont_label.is_none() || cont_label == *label {
continue;
} else {
return Ok(Right(ControlCode::Continue(cont_label, span)));
}
}
},
}
}
}
ImperativeStmt::TempSwap { left, right, .. } => {
tx.rename_temp_relation(
Symbol::new(left.clone(), Default::default()),
Symbol::new(SmartString::from("_*temp*"), Default::default()),
)?;
tx.rename_temp_relation(
Symbol::new(right.clone(), Default::default()),
Symbol::new(left.clone(), Default::default()),
)?;
tx.rename_temp_relation(
Symbol::new(SmartString::from("_*temp*"), Default::default()),
Symbol::new(right.clone(), Default::default()),
)?;
ret = NamedRows::default();
break;
}
}
}
Ok(Left(ret))
}
pub(crate) fn execute_imperative(&'s self, cur_vld: ValidityTs, ps: &ImperativeProgram) -> Result<NamedRows, Report> {
let mut callback_collector = BTreeMap::new();
let mut write_lock_names = BTreeSet::new();
for p in ps {
p.needs_write_locks(&mut write_lock_names);
}
let is_write = !write_lock_names.is_empty();
let write_lock = self.obtain_relation_locks(write_lock_names.iter());
let _write_lock_guards = write_lock.iter().map(|l| l.read().unwrap()).collect_vec();
let callback_targets = if is_write {
self.current_callback_targets()
} else {
Default::default()
};
let mut cleanups: Vec<(Vec<u8>, Vec<u8>)> = vec![];
let ret;
{
let mut tx = if is_write {
self.transact_write()?
} else {
self.transact()?
};
match self.execute_imperative_stmts(
&ps,
&mut tx,
&mut cleanups,
cur_vld,
&callback_targets,
&mut callback_collector,
)? {
Left(res) => ret = res,
Right(ctrl) => match ctrl {
ControlCode::Termination(res) => {
ret = res;
}
ControlCode::Break(_, span) | ControlCode::Continue(_, span) => {
#[derive(Debug, Error, Diagnostic)]
#[error("control flow has nowhere to go")]
#[diagnostic(code(eval::dangling_ctrl_flow))]
struct DanglingControlFlow(#[label] SourceSpan);
bail!(DanglingControlFlow(span))
}
},
}
if is_write {
tx.commit_tx()?;
} else {
tx.commit_tx()?;
assert!(cleanups.is_empty(), "non-empty cleanups on read-only tx");
}
}
#[cfg(not(target_arch = "wasm32"))]
if !callback_collector.is_empty() {
self.send_callbacks(callback_collector)
}
for (lower, upper) in cleanups {
self.db.del_range(&lower, &upper)?;
}
Ok(ret)
}
}

@ -10,5 +10,7 @@ pub(crate) mod db;
pub(crate) mod transact;
pub(crate) mod relation;
pub(crate) mod temp_store;
pub(crate) mod callback;
pub(crate) mod imperative;
#[cfg(test)]
mod tests;

@ -21,8 +21,9 @@ use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::fixed_rule::FixedRulePayload;
use crate::parse::SourceSpan;
use crate::runtime::db::{CallbackOp, Poison};
use crate::{new_cozo_mem, FixedRule, RegularTempStore};
use crate::runtime::callback::CallbackOp;
use crate::runtime::db::Poison;
#[test]
fn test_limit_offset() {

Loading…
Cancel
Save