implemented index

main
Ziyang Hu 2 years ago
parent a375eff792
commit a6a9419a35

@@ -183,6 +183,10 @@ impl TempSymbGen {
         self.last_id += 1;
         Symbol::new(&format!("*{}", self.last_id) as &str, span)
     }
+    pub(crate) fn next_ignored(&mut self, span: SourceSpan) -> Symbol {
+        self.last_id += 1;
+        Symbol::new(&format!("~{}", self.last_id) as &str, span)
+    }
 }

 #[derive(Debug, Clone)]

@@ -87,6 +87,9 @@ impl Symbol {
     pub(crate) fn is_ignored_symbol(&self) -> bool {
         self.name == "_"
     }
+    pub(crate) fn is_generated_ignored_symbol(&self) -> bool {
+        self.name.starts_with('~')
+    }
     pub(crate) fn ensure_valid_field(&self) -> Result<()> {
         if self.name.contains('(') || self.name.contains(')') {
             #[derive(Debug, Error, Diagnostic)]

@@ -95,6 +95,13 @@ struct RuleNotFound(String, #[label] SourceSpan);
 #[diagnostic(help("Required arity: {1}, number of arguments given: {2}"))]
 struct ArityMismatch(String, usize, usize, #[label] SourceSpan);

+#[derive(Debug, Copy, Clone, Eq, PartialEq)]
+pub(crate) enum IndexPositionUse {
+    Join,
+    BindForLater,
+    Ignored,
+}
+
 impl<'a> SessionTx<'a> {
     pub(crate) fn stratified_magic_compile(
         &mut self,
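A sketch for orientation (not part of the diff): in the compile hunk below, each argument position of a stored-relation application is tagged with one of these three values. A variable already bound by atoms to the left becomes Join (usable as an index prefix), a fresh named variable becomes BindForLater, and a compiler-generated `~` placeholder from next_ignored becomes Ignored. A minimal, self-contained sketch of that classification, with symbols simplified to plain strings:

    use std::collections::BTreeSet;

    #[derive(Debug, Copy, Clone, Eq, PartialEq)]
    enum IndexPositionUse {
        Join,         // already bound on the left: can constrain an index prefix
        BindForLater, // fresh named variable: must be produced by this scan
        Ignored,      // generated `~` placeholder: its value is never used
    }

    fn classify(args: &[&str], seen: &BTreeSet<String>) -> Vec<IndexPositionUse> {
        args.iter()
            .map(|v| {
                if seen.contains(*v) {
                    IndexPositionUse::Join
                } else if v.starts_with('~') {
                    IndexPositionUse::Ignored
                } else {
                    IndexPositionUse::BindForLater
                }
            })
            .collect()
    }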
@@ -221,26 +228,126 @@ impl<'a> SessionTx<'a> {
                 rel_app.span
             )
         );
+        // already existing vars
         let mut prev_joiner_vars = vec![];
+        // vars introduced by right and joined
         let mut right_joiner_vars = vec![];
+        // used to split in case we need to join again
+        let mut right_joiner_vars_pos = vec![];
+        // vars introduced by right, regardless of joining
         let mut right_vars = vec![];
+        // used for choosing indices
+        let mut join_indices = vec![];
-        for var in &rel_app.args {
+        for (i, var) in rel_app.args.iter().enumerate() {
             if seen_variables.contains(var) {
                 prev_joiner_vars.push(var.clone());
                 let rk = gen_symb(var.span);
                 right_vars.push(rk.clone());
                 right_joiner_vars.push(rk);
+                right_joiner_vars_pos.push(i);
+                join_indices.push(IndexPositionUse::Join)
             } else {
                 seen_variables.insert(var.clone());
                 right_vars.push(var.clone());
+                if var.is_generated_ignored_symbol() {
+                    join_indices.push(IndexPositionUse::Ignored)
+                } else {
+                    join_indices.push(IndexPositionUse::BindForLater)
+                }
             }
         }
-        let right =
-            RelAlgebra::relation(right_vars, store, rel_app.span, rel_app.valid_at)?;
-        debug_assert_eq!(prev_joiner_vars.len(), right_joiner_vars.len());
-        ret = ret.join(right, prev_joiner_vars, right_joiner_vars, rel_app.span);
+        let chosen_index =
+            store.choose_index(&join_indices, rel_app.valid_at.is_some());
+
+        match chosen_index {
+            None => {
+                // scan original relation
+                let right = RelAlgebra::relation(
+                    right_vars,
+                    store,
+                    rel_app.span,
+                    rel_app.valid_at,
+                )?;
+                debug_assert_eq!(prev_joiner_vars.len(), right_joiner_vars.len());
+                ret =
+                    ret.join(right, prev_joiner_vars, right_joiner_vars, rel_app.span);
+            }
+            Some((chosen_index, mapper, false)) => {
+                // index-only
+                let new_right_vars = mapper
+                    .into_iter()
+                    .map(|i| right_vars[i].clone())
+                    .collect_vec();
+                let right = RelAlgebra::relation(
+                    new_right_vars,
+                    chosen_index,
+                    rel_app.span,
+                    rel_app.valid_at,
+                )?;
+                debug_assert_eq!(prev_joiner_vars.len(), right_joiner_vars.len());
+                ret =
+                    ret.join(right, prev_joiner_vars, right_joiner_vars, rel_app.span);
+            }
+            Some((chosen_index, mapper, true)) => {
+                // index-with-join
+                let mut prev_joiner_first_vars = vec![];
+                let mut middle_joiner_left_vars = vec![];
+                let mut middle_vars = vec![];
+                for i in mapper.iter() {
+                    let tv = gen_symb(right_vars[*i].span);
+                    if let Some(j) = right_joiner_vars_pos.iter().position(|el| el == i) {
+                        prev_joiner_first_vars.push(prev_joiner_vars[j].clone());
+                        middle_joiner_left_vars.push(tv.clone());
+                    }
+                    middle_vars.push(tv);
+                }
+                let middle_joiner_right_vars = mapper
+                    .iter()
+                    .enumerate()
+                    .filter_map(|(idx, orig_idx)| {
+                        if *orig_idx < store.metadata.keys.len() {
+                            Some(middle_vars[idx].clone())
+                        } else {
+                            None
+                        }
+                    })
+                    .collect_vec();
+                let final_joiner_vars = right_vars
+                    .iter()
+                    .take(store.metadata.keys.len())
+                    .cloned()
+                    .collect_vec();
+                let middle = RelAlgebra::relation(
+                    middle_vars,
+                    chosen_index,
+                    rel_app.span,
+                    rel_app.valid_at,
+                )?;
+                ret = ret.join(
+                    middle,
+                    prev_joiner_first_vars,
+                    middle_joiner_left_vars,
+                    rel_app.span,
+                );
+                let final_alg = RelAlgebra::relation(
+                    right_vars,
+                    store,
+                    rel_app.span,
+                    rel_app.valid_at,
+                )?;
+                ret = ret.join(
+                    final_alg,
+                    middle_joiner_right_vars,
+                    final_joiner_vars,
+                    rel_app.span,
+                );
+            }
+        }
     }
     MagicAtom::NegatedRule(rule_app) => {
         let store_arity = store_arities.get(&rule_app.name).ok_or_else(|| {
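To summarize the rewritten arm above (not part of the commit text): when choose_index, added further down in the RelationHandle hunk, returns None, the base relation is scanned exactly as before; when it returns an index with need_join == false, the index alone is scanned and mapper reorders its columns back onto the original bindings; when need_join == true, the index is scanned first on the bound prefix and the result is joined back to the base relation on its key columns to recover the positions the index does not cover. The negated-relation arm in the next hunk only uses an index when it covers everything needed; when a back-join would be required it falls back to scanning the base relation.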
@@ -279,46 +386,88 @@ impl<'a> SessionTx<'a> {
         debug_assert_eq!(prev_joiner_vars.len(), right_joiner_vars.len());
         ret = ret.neg_join(right, prev_joiner_vars, right_joiner_vars, rule_app.span);
     }
-    MagicAtom::NegatedRelation(relation_app) => {
-        let store = self.get_relation(&relation_app.name, false)?;
+    MagicAtom::NegatedRelation(rel_app) => {
+        let store = self.get_relation(&rel_app.name, false)?;
         ensure!(
-            store.arity() == relation_app.args.len(),
+            store.arity() == rel_app.args.len(),
             ArityMismatch(
-                relation_app.name.to_string(),
+                rel_app.name.to_string(),
                 store.arity(),
-                relation_app.args.len(),
-                relation_app.span
+                rel_app.args.len(),
+                rel_app.span
             )
         );
+        // already existing vars
         let mut prev_joiner_vars = vec![];
+        // vars introduced by right and joined
        let mut right_joiner_vars = vec![];
+        // used to split in case we need to join again
+        let mut right_joiner_vars_pos = vec![];
+        // vars introduced by right, regardless of joining
         let mut right_vars = vec![];
+        // used for choosing indices
+        let mut join_indices = vec![];
-        for var in &relation_app.args {
+        for (i, var) in rel_app.args.iter().enumerate() {
             if seen_variables.contains(var) {
                 prev_joiner_vars.push(var.clone());
                 let rk = gen_symb(var.span);
                 right_vars.push(rk.clone());
                 right_joiner_vars.push(rk);
+                right_joiner_vars_pos.push(i);
+                join_indices.push(IndexPositionUse::Join)
             } else {
-                seen_variables.insert(var.clone());
                 right_vars.push(var.clone());
+                if var.is_generated_ignored_symbol() {
+                    join_indices.push(IndexPositionUse::Ignored)
+                } else {
+                    join_indices.push(IndexPositionUse::BindForLater)
+                }
             }
         }
-        let right = RelAlgebra::relation(
-            right_vars,
-            store,
-            relation_app.span,
-            relation_app.valid_at,
-        )?;
-        debug_assert_eq!(prev_joiner_vars.len(), right_joiner_vars.len());
-        ret = ret.neg_join(
-            right,
-            prev_joiner_vars,
-            right_joiner_vars,
-            relation_app.span,
-        );
+        let chosen_index =
+            store.choose_index(&join_indices, rel_app.valid_at.is_some());
+
+        match chosen_index {
+            None | Some((_, _, true)) => {
+                let right = RelAlgebra::relation(
+                    right_vars,
+                    store,
+                    rel_app.span,
+                    rel_app.valid_at,
+                )?;
+                debug_assert_eq!(prev_joiner_vars.len(), right_joiner_vars.len());
+                ret = ret.neg_join(
+                    right,
+                    prev_joiner_vars,
+                    right_joiner_vars,
+                    rel_app.span,
+                );
+            }
+            Some((chosen_index, mapper, false)) => {
+                // index-only
+                let new_right_vars = mapper
+                    .into_iter()
+                    .map(|i| right_vars[i].clone())
+                    .collect_vec();
+                let right = RelAlgebra::relation(
+                    new_right_vars,
+                    chosen_index,
+                    rel_app.span,
+                    rel_app.valid_at,
+                )?;
+                debug_assert_eq!(prev_joiner_vars.len(), right_joiner_vars.len());
+                ret = ret.neg_join(
+                    right,
+                    prev_joiner_vars,
+                    right_joiner_vars,
+                    rel_app.span,
+                );
+            }
+        }
     }
     MagicAtom::Predicate(p) => {
         ret = ret.filter(p.clone());

@@ -163,7 +163,7 @@ impl InputAtom {
                 .chain(stored.metadata.non_keys.iter())
             {
                 let arg = args.remove(&col_def.name).unwrap_or_else(|| Expr::Binding {
-                    var: gen.next(span),
+                    var: gen.next_ignored(span),
                     tuple_pos: None,
                 });
                 new_args.push(arg)
@@ -238,7 +238,7 @@ impl InputRuleApplyAtom {
             match arg {
                 Expr::Binding { var, .. } => {
                     if var.is_ignored_symbol() {
-                        let dup = gen.next(var.span);
+                        let dup = gen.next_ignored(var.span);
                         args.push(dup);
                     } else if seen_variables.insert(var.clone()) {
                         args.push(var);
@@ -298,7 +298,7 @@ impl InputRelationApplyAtom {
             match arg {
                 Expr::Binding { var, .. } => {
                     if var.is_ignored_symbol() {
-                        args.push(gen.next(var.span));
+                        args.push(gen.next_ignored(var.span));
                     } else if seen_variables.insert(var.clone()) {
                         args.push(var);
                     } else {

@@ -18,16 +18,18 @@ use crate::data::expr::Expr;
 use crate::data::program::{FixedRuleApply, InputInlineRulesOrFixed, InputProgram, RelationOp};
 use crate::data::relation::{ColumnDef, NullableColType};
 use crate::data::symb::Symbol;
-use crate::data::tuple::{Tuple, ENCODED_KEY_MIN_LEN};
+use crate::data::tuple::Tuple;
 use crate::data::value::{DataValue, ValidityTs};
 use crate::fixed_rule::utilities::constant::Constant;
 use crate::fixed_rule::FixedRuleHandle;
 use crate::parse::parse_script;
 use crate::runtime::db::{CallbackCollector, CallbackOp};
-use crate::runtime::relation::{AccessLevel, extend_tuple_from_v, InputRelationHandle, InsufficientAccessLevel};
+use crate::runtime::relation::{
+    extend_tuple_from_v, AccessLevel, InputRelationHandle, InsufficientAccessLevel,
+};
 use crate::runtime::transact::SessionTx;
 use crate::storage::Storage;
-use crate::{Db, decode_tuple_from_kv, NamedRows, StoreTx};
+use crate::{Db, NamedRows, StoreTx};

 #[derive(Debug, Error, Diagnostic)]
 #[error("attempting to write into relation {0} of arity {1} with data of arity {2}")]
@@ -56,7 +58,6 @@ impl<'a> SessionTx<'a> {
             #[diagnostic(code(eval::replace_in_trigger))]
             struct ReplaceInTrigger(String);
             bail!(ReplaceInTrigger(meta.name.to_string()))
         }
-
         if let Ok(old_handle) = self.get_relation(&meta.name, true) {
             if !old_handle.indices.is_empty() {
@@ -82,7 +83,14 @@ impl<'a> SessionTx<'a> {
                 .get_single_program()?;
             let (_, cleanups) = db
-                .run_query(self, program, cur_vld, callback_targets, callback_collector, false)
+                .run_query(
+                    self,
+                    program,
+                    cur_vld,
+                    callback_targets,
+                    callback_collector,
+                    false,
+                )
                 .map_err(|err| {
                     if err.source_code().is_some() {
                         err
@@ -194,9 +202,13 @@ impl<'a> SessionTx<'a> {
         if propagate_triggers {
             for trigger in &relation_store.rm_triggers {
-                let mut program =
-                    parse_script(trigger, &Default::default(), &db.algorithms, cur_vld)?
-                        .get_single_program()?;
+                let mut program = parse_script(
+                    trigger,
+                    &Default::default(),
+                    &db.algorithms,
+                    cur_vld,
+                )?
+                .get_single_program()?;

                 make_const_rule(
                     &mut program,
@@ -213,7 +225,14 @@ impl<'a> SessionTx<'a> {
                 );
                 let (_, cleanups) = db
-                    .run_query(self, program, cur_vld, callback_targets, callback_collector, false)
+                    .run_query(
+                        self,
+                        program,
+                        cur_vld,
+                        callback_targets,
+                        callback_collector,
+                        false,
+                    )
                     .map_err(|err| {
                         if err.source_code().is_some() {
                             err
@@ -402,29 +421,24 @@ impl<'a> SessionTx<'a> {
                 if !existing.is_empty() {
                     extend_tuple_from_v(&mut tup, &existing);
                 }
-                if has_indices {
-                    if extracted != tup {
-                        for (idx_rel, extractor) in relation_store.indices.values() {
-                            let idx_tup_old =
-                                extractor.iter().map(|i| tup[*i].clone()).collect_vec();
-                            let encoded_old = idx_rel.encode_key_for_store(
-                                &idx_tup_old,
-                                Default::default(),
-                            )?;
-                            self.store_tx.del(&encoded_old)?;
-
-                            let idx_tup_new = extractor
-                                .iter()
-                                .map(|i| extracted[*i].clone())
-                                .collect_vec();
-                            let encoded_new = idx_rel.encode_key_for_store(
-                                &idx_tup_new,
-                                Default::default(),
-                            )?;
-                            self.store_tx.put(&encoded_new, &[])?;
-                        }
-                    }
-                }
+                if has_indices && extracted != tup {
+                    for (idx_rel, extractor) in relation_store.indices.values() {
+                        let idx_tup_old =
+                            extractor.iter().map(|i| tup[*i].clone()).collect_vec();
+                        let encoded_old = idx_rel
+                            .encode_key_for_store(&idx_tup_old, Default::default())?;
+                        self.store_tx.del(&encoded_old)?;
+
+                        let idx_tup_new = extractor
+                            .iter()
+                            .map(|i| extracted[*i].clone())
+                            .collect_vec();
+                        let encoded_new = idx_rel
+                            .encode_key_for_store(&idx_tup_new, Default::default())?;
+                        self.store_tx.put(&encoded_new, &[])?;
+                    }
+                }
                 if need_to_collect {
                     old_tuples.push(DataValue::List(tup));
                 }
@@ -469,9 +483,13 @@ impl<'a> SessionTx<'a> {
         let kv_bindings = bindings;
         if propagate_triggers {
             for trigger in &relation_store.put_triggers {
-                let mut program =
-                    parse_script(trigger, &Default::default(), &db.algorithms, cur_vld)?
-                        .get_single_program()?;
+                let mut program = parse_script(
+                    trigger,
+                    &Default::default(),
+                    &db.algorithms,
+                    cur_vld,
+                )?
+                .get_single_program()?;

                 make_const_rule(
                     &mut program,
@@ -487,7 +505,14 @@ impl<'a> SessionTx<'a> {
                 );
                 let (_, cleanups) = db
-                    .run_query(self, program, cur_vld, callback_targets, callback_collector, false)
+                    .run_query(
+                        self,
+                        program,
+                        cur_vld,
+                        callback_targets,
+                        callback_collector,
+                        false,
+                    )
                     .map_err(|err| {
                         if err.source_code().is_some() {
                             err

@@ -89,10 +89,8 @@ pub struct CallbackDeclaration {
     callback: Box<dyn Fn(CallbackOp, NamedRows, NamedRows) + Send + Sync>,
 }

-pub(crate) type CallbackCollector = BTreeMap<
-    SmartString<LazyCompact>,
-    Vec<(CallbackOp, NamedRows, NamedRows)>,
->;
+pub(crate) type CallbackCollector =
+    BTreeMap<SmartString<LazyCompact>, Vec<(CallbackOp, NamedRows, NamedRows)>>;

 /// The database object of Cozo.
 #[derive(Clone)]
@@ -140,11 +138,7 @@ impl NamedRows {
         let rows = self
             .rows
             .into_iter()
-            .map(|row| {
-                row.into_iter()
-                    .map(|val| JsonValue::from(val))
-                    .collect::<JsonValue>()
-            })
+            .map(|row| row.into_iter().map(JsonValue::from).collect::<JsonValue>())
             .collect::<JsonValue>();
         json!({
             "headers": self.headers,
@@ -276,7 +270,7 @@ impl<'s, S: Storage<'s>> Db<S> {
         #[diagnostic(code(import::bad_data))]
         struct BadDataForRelation(String, JsonValue);

-        let rel_names = data.keys().map(|k| SmartString::from(k)).collect_vec();
+        let rel_names = data.keys().map(SmartString::from).collect_vec();
         let locks = self.obtain_relation_locks(rel_names.iter());
         let _guards = locks.iter().map(|l| l.read().unwrap()).collect_vec();
@@ -441,7 +435,7 @@ impl<'s, S: Storage<'s>> Db<S> {
         #[cfg(feature = "storage-sqlite")]
         {
-            let rel_names = relations.iter().map(|n| SmartString::from(n)).collect_vec();
+            let rel_names = relations.iter().map(SmartString::from).collect_vec();
             let locks = self.obtain_relation_locks(rel_names.iter());
             let _guards = locks.iter().map(|l| l.read().unwrap()).collect_vec();
@@ -569,7 +563,7 @@ impl<'s, S: Storage<'s>> Db<S> {
                 collected.push(lock);
             }
         }
-        return collected;
+        collected
     }

     fn compact_relation(&'s self) -> Result<()> {
@@ -670,10 +664,7 @@ impl<'s, S: Storage<'s>> Db<S> {
             .cloned()
             .collect()
     }
-    fn send_callbacks(
-        &'s self,
-        collector: CallbackCollector,
-    ) {
+    fn send_callbacks(&'s self, collector: CallbackCollector) {
         for (k, vals) in collector {
             for (op, new, old) in vals {
                 self.callback_sender
@@ -957,7 +948,7 @@ impl<'s, S: Storage<'s>> Db<S> {
                 }
             }
         }
-        return Ok(Left(ret));
+        Ok(Left(ret))
     }

     fn explain_compiled(&self, strata: &[CompiledProgram]) -> Result<NamedRows> {
         let mut ret: Vec<JsonValue> = vec![];
@@ -1192,7 +1183,10 @@ impl<'s, S: Storage<'s>> Db<S> {
                 })
             }
             SysOp::CreateIndex(rel_name, idx_name, cols) => {
-                let lock = self.obtain_relation_locks(iter::once(&rel_name.name)).pop().unwrap();
+                let lock = self
+                    .obtain_relation_locks(iter::once(&rel_name.name))
+                    .pop()
+                    .unwrap();
                 let _guard = lock.write().unwrap();
                 let mut tx = self.transact_write()?;
                 tx.create_index(&rel_name, &idx_name, cols)?;
@@ -1203,7 +1197,10 @@ impl<'s, S: Storage<'s>> Db<S> {
                 })
             }
             SysOp::RemoveIndex(rel_name, idx_name) => {
-                let lock = self.obtain_relation_locks(iter::once(&rel_name.name)).pop().unwrap();
+                let lock = self
+                    .obtain_relation_locks(iter::once(&rel_name.name))
+                    .pop()
+                    .unwrap();
                 let _guard = lock.read().unwrap();
                 let mut tx = self.transact_write()?;
                 tx.remove_index(&rel_name, &idx_name)?;
@@ -1260,11 +1257,7 @@ impl<'s, S: Storage<'s>> Db<S> {
                 }
                 let rows = rows
                     .into_iter()
-                    .map(|row| {
-                        row.into_iter()
-                            .map(|val| DataValue::from(val))
-                            .collect_vec()
-                    })
+                    .map(|row| row.into_iter().map(DataValue::from).collect_vec())
                     .collect_vec();
                 tx.commit_tx()?;
                 Ok(NamedRows {
@@ -1302,7 +1295,7 @@ impl<'s, S: Storage<'s>> Db<S> {
         cur_vld: ValidityTs,
         callback_targets: &BTreeSet<SmartString<LazyCompact>>,
         callback_collector: &mut CallbackCollector,
-        top_level: bool
+        top_level: bool,
     ) -> Result<(NamedRows, Vec<(Vec<u8>, Vec<u8>)>)> {
         // cleanups contain stored relations that should be deleted at the end of query
         let mut clean_ups = vec![];
@@ -1447,7 +1440,7 @@ impl<'s, S: Storage<'s>> Db<S> {
                         cur_vld,
                         callback_targets,
                         callback_collector,
-                        top_level
+                        top_level,
                     )
                     .wrap_err_with(|| format!("when executing against relation '{}'", meta.name))?;
                 clean_ups.extend(to_clear);
@@ -1502,7 +1495,7 @@ impl<'s, S: Storage<'s>> Db<S> {
                         cur_vld,
                         callback_targets,
                         callback_collector,
-                        top_level
+                        top_level,
                     )
                     .wrap_err_with(|| format!("when executing against relation '{}'", meta.name))?;
                 clean_ups.extend(to_clear);
@@ -1575,11 +1568,7 @@ impl<'s, S: Storage<'s>> Db<S> {
         tx.commit_tx()?;
         let rows = rows
             .into_iter()
-            .map(|row| {
-                row.into_iter()
-                    .map(|val| DataValue::from(val))
-                    .collect_vec()
-            })
+            .map(|row| row.into_iter().map(DataValue::from).collect_vec())
             .collect_vec();
         Ok(NamedRows {
             headers: vec![
@@ -1622,11 +1611,7 @@ impl<'s, S: Storage<'s>> Db<S> {
         }
         let rows = rows
             .into_iter()
-            .map(|row| {
-                row.into_iter()
-                    .map(|val| DataValue::from(val))
-                    .collect_vec()
-            })
+            .map(|row| row.into_iter().map(DataValue::from).collect_vec())
             .collect_vec();
         Ok(NamedRows {
             headers: vec![

@@ -24,6 +24,7 @@ use crate::data::symb::Symbol;
 use crate::data::tuple::{decode_tuple_from_key, Tuple, TupleT, ENCODED_KEY_MIN_LEN};
 use crate::data::value::{DataValue, ValidityTs};
 use crate::parse::SourceSpan;
+use crate::query::compile::IndexPositionUse;
 use crate::runtime::transact::SessionTx;
 use crate::{NamedRows, StoreTx};
@@ -151,6 +152,65 @@ impl RelationHandle {
         let prefix_bytes = self.id.0.to_be_bytes();
         data[0..8].copy_from_slice(&prefix_bytes);
     }
+    pub(crate) fn choose_index(
+        &self,
+        arg_uses: &[IndexPositionUse],
+        validity_query: bool,
+    ) -> Option<(RelationHandle, Vec<usize>, bool)> {
+        if self.indices.is_empty() {
+            return None;
+        }
+        let mut max_prefix_len = 0;
+        let key_len = self.metadata.keys.len();
+        for arg_use in arg_uses {
+            if *arg_use == IndexPositionUse::Join {
+                max_prefix_len += 1;
+                if max_prefix_len == key_len {
+                    break;
+                }
+            } else {
+                break;
+            }
+        }
+        let required_positions = arg_uses
+            .iter()
+            .enumerate()
+            .filter_map(|(i, pos_use)| {
+                if *pos_use != IndexPositionUse::Ignored {
+                    Some(i)
+                } else {
+                    None
+                }
+            })
+            .collect_vec();
+        let mut chosen = None;
+        for (manifest, mapper) in self.indices.values() {
+            if validity_query && *mapper.last().unwrap() != self.metadata.keys.len() - 1 {
+                continue;
+            }
+            let mut cur_prefix_len = 0;
+            for i in mapper {
+                if arg_uses[*i] == IndexPositionUse::Join {
+                    cur_prefix_len += 1;
+                } else {
+                    break;
+                }
+            }
+            if cur_prefix_len > max_prefix_len {
+                max_prefix_len = cur_prefix_len;
+                let mut need_join = false;
+                for need_pos in required_positions.iter() {
+                    if !mapper.contains(need_pos) {
+                        need_join = true;
+                        break;
+                    }
+                }
+                chosen = Some((manifest.clone(), mapper.clone(), need_join))
+            }
+        }
+        chosen
+    }
+
     pub(crate) fn encode_key_for_store(&self, tuple: &Tuple, span: SourceSpan) -> Result<Vec<u8>> {
         let len = self.metadata.keys.len();
         ensure!(
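A worked example of the selection logic above, on a hypothetical schema not taken from the commit: suppose a relation r {a, b => c} has a single index on b, stored with mapper [1, 0] (index column order b, then a). For an atom *r[x, y, z] where only y is already bound, arg_uses is [BindForLater, Join, BindForLater]. The base relation offers no bound key prefix (position a is unbound), so max_prefix_len starts at 0; the index begins with a Join position, giving a prefix length of 1, so it is picked. Position 2 (column c) is required but not covered by the mapper, so need_join is true and the compiler emits an index scan followed by a join back to r on its key columns, as in the index-with-join arm of the compile hunk. If z were instead a generated ~ placeholder, every required position would be covered by the index and the result would be Some((index, [1, 0], false)), i.e. an index-only scan.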
@@ -465,10 +525,8 @@ impl<'a> SessionTx<'a> {
             if self.store_tx.exists(&encoded, true)? {
                 bail!(RelNameConflictError(input_meta.name.to_string()))
             };
-        } else {
-            if self.temp_store_tx.exists(&encoded, true)? {
-                bail!(RelNameConflictError(input_meta.name.to_string()))
-            };
+        } else if self.temp_store_tx.exists(&encoded, true)? {
+            bail!(RelNameConflictError(input_meta.name.to_string()))
         }

         let metadata = input_meta.metadata.clone();
@@ -543,7 +601,7 @@ impl<'a> SessionTx<'a> {
         }

         for k in store.indices.keys() {
-            self.destroy_relation(&format!("{}:{}", name, k))?;
+            self.destroy_relation(&format!("{name}:{k}"))?;
         }

         let key = DataValue::from(name);
@@ -587,7 +645,7 @@ impl<'a> SessionTx<'a> {
         }

         let mut col_defs = vec![];
-        for col in cols.iter() {
+        'outer: for col in cols.iter() {
             for orig_col in rel_handle
                 .metadata
                 .keys
@@ -596,7 +654,7 @@ impl<'a> SessionTx<'a> {
             {
                 if orig_col.name == col.name {
                     col_defs.push(orig_col.clone());
-                    break;
+                    continue 'outer;
                 }
             }

@@ -45,7 +45,7 @@ pub extern "system" fn Java_org_cozodb_CozoJavaBridge_openDb(
     let engine: String = env.get_string(engine).unwrap().into();
     let path: String = env.get_string(path).unwrap().into();
     let options: String = env.get_string(options).unwrap().into();
-    let id = match DbInstance::new(&engine, &path, &options) {
+    let id = match DbInstance::new(&engine, path, &options) {
         Ok(db) => {
             let id = HANDLES.current.fetch_add(1, Ordering::AcqRel);
             let mut dbs = HANDLES.dbs.lock().unwrap();
@@ -139,7 +139,7 @@ pub extern "system" fn Java_org_cozodb_CozoJavaBridge_backup(
     match get_db(id) {
         None => env.new_string(DB_NOT_FOUND).unwrap().into_raw(),
         Some(db) => {
-            let res = db.backup_db_str(&file);
+            let res = db.backup_db_str(file);
             env.new_string(res).unwrap().into_raw()
         }
     }
@@ -156,7 +156,7 @@ pub extern "system" fn Java_org_cozodb_CozoJavaBridge_restore(
     match get_db(id) {
         None => env.new_string(DB_NOT_FOUND).unwrap().into_raw(),
         Some(db) => {
-            let res = db.restore_backup_str(&file);
+            let res = db.restore_backup_str(file);
             env.new_string(res).unwrap().into_raw()
         }
     }

@@ -28,7 +28,7 @@ fn open_db(mut cx: FunctionContext) -> JsResult<JsNumber> {
     let engine = cx.argument::<JsString>(0)?.value(&mut cx);
     let path = cx.argument::<JsString>(1)?.value(&mut cx);
     let options = cx.argument::<JsString>(2)?.value(&mut cx);
-    match DbInstance::new(&engine, &path, &options) {
+    match DbInstance::new(&engine, path, &options) {
         Ok(db) => {
             let id = HANDLES.current.fetch_add(1, Ordering::AcqRel);
             let mut dbs = HANDLES.dbs.lock().unwrap();

@@ -40,7 +40,7 @@ fn py_to_value(ob: &PyAny) -> PyResult<DataValue> {
         }
         DataValue::List(coll)
     } else {
-        return Err(PyException::new_err(format!("Cannot convert {} into Cozo value", ob)).into());
+        return Err(PyException::new_err(format!("Cannot convert {ob} into Cozo value")));
     })
 }

@@ -95,13 +95,10 @@ fn main() {
             .run_script("::running", Default::default())
             .expect("Cannot determine running queries");
         for row in running.rows {
-            let id = row.into_iter().next().unwrap().get_int().unwrap();
+            let id = row.into_iter().next().unwrap();
             eprintln!("Killing running query {id}");
             db_copy
-                .run_script(
-                    "::kill $id",
-                    BTreeMap::from([("id".to_string(), json!(id))]),
-                )
+                .run_script("::kill $id", BTreeMap::from([("id".to_string(), id)]))
                 .expect("Cannot kill process");
         }
     })
@@ -170,7 +167,9 @@ fn server_main(args: Args, db: DbInstance) {
             }
             let payload: QueryPayload = try_or_400!(rouille::input::json_input(request));

-            let result = db.run_script_fold_err(&payload.script, payload.params);
+            let params = payload.params.into_iter().map(|(k, v)|
+                (k, DataValue::from(v))).collect();
+            let result = db.run_script_fold_err(&payload.script, params);
             let response = Response::json(&result);
             if let Some(serde_json::Value::Bool(true)) = result.get("ok") {
                 response

@@ -16,7 +16,7 @@ use std::io::{Read, Write};
 use miette::{bail, miette, IntoDiagnostic};
 use serde_json::{json, Value};

-use cozo::DbInstance;
+use cozo::{DataValue, DbInstance};

 struct Indented;
@@ -104,7 +104,7 @@ pub(crate) fn repl_main(db: DbInstance) -> Result<(), Box<dyn Error>> {
 fn process_line(
     line: &str,
     db: &DbInstance,
-    params: &mut BTreeMap<String, Value>,
+    params: &mut BTreeMap<String, DataValue>,
     save_next: &mut Option<String>,
 ) -> miette::Result<()> {
     let line = line.trim();
@@ -143,7 +143,7 @@ fn process_line(
             if path.is_empty() {
                 bail!("Backup requires a path");
             };
-            db.backup_db(path.to_string())?;
+            db.backup_db(path)?;
             println!("Backup written successfully to {path}")
         }
         "restore" => {
