diff --git a/cozo-core/src/parse/sys.rs b/cozo-core/src/parse/sys.rs index 345489d2..25f3a342 100644 --- a/cozo-core/src/parse/sys.rs +++ b/cozo-core/src/parse/sys.rs @@ -34,6 +34,8 @@ pub(crate) enum SysOp { ShowTrigger(Symbol), SetTriggers(Symbol, Vec, Vec, Vec), SetAccessLevel(Vec, AccessLevel), + CreateIndex(Symbol, Symbol, Vec), + RemoveIndex(Symbol, Symbol), } #[derive(Debug, Diagnostic, Error)] @@ -145,6 +147,34 @@ pub(crate) fn parse_sys( } SysOp::SetTriggers(rel, puts, rms, replaces) } + Rule::index_op => { + let inner = inner.into_inner().next().unwrap(); + match inner.as_rule() { + Rule::index_drop => { + let mut inner = inner.into_inner(); + let rel = inner.next().unwrap(); + let name = inner.next().unwrap(); + let cols = inner + .map(|p| Symbol::new(p.as_str(), p.extract_span())) + .collect_vec(); + SysOp::CreateIndex( + Symbol::new(rel.as_str(), rel.extract_span()), + Symbol::new(name.as_str(), name.extract_span()), + cols, + ) + } + Rule::index_create => { + let mut inner = inner.into_inner(); + let rel = inner.next().unwrap(); + let name = inner.next().unwrap(); + SysOp::RemoveIndex( + Symbol::new(rel.as_str(), rel.extract_span()), + Symbol::new(name.as_str(), name.extract_span()), + ) + } + _ => unreachable!() + } + } rule => unreachable!("{:?}", rule), }) } diff --git a/cozo-core/src/query/stored.rs b/cozo-core/src/query/stored.rs index bc6af859..1ffe373d 100644 --- a/cozo-core/src/query/stored.rs +++ b/cozo-core/src/query/stored.rs @@ -23,11 +23,11 @@ use crate::data::value::{DataValue, ValidityTs}; use crate::fixed_rule::utilities::constant::Constant; use crate::fixed_rule::FixedRuleHandle; use crate::parse::parse_script; -use crate::runtime::db::CallbackOp; -use crate::runtime::relation::{AccessLevel, InputRelationHandle, InsufficientAccessLevel}; +use crate::runtime::db::{CallbackCollector, CallbackOp}; +use crate::runtime::relation::{AccessLevel, extend_tuple_from_v, InputRelationHandle, InsufficientAccessLevel}; use crate::runtime::transact::SessionTx; use crate::storage::Storage; -use crate::{Db, NamedRows, StoreTx}; +use crate::{Db, decode_tuple_from_kv, NamedRows, StoreTx}; #[derive(Debug, Error, Diagnostic)] #[error("attempting to write into relation {0} of arity {1} with data of arity {2}")] @@ -44,15 +44,28 @@ impl<'a> SessionTx<'a> { headers: &[Symbol], cur_vld: ValidityTs, callback_targets: &BTreeSet>, - callback_collector: &mut BTreeMap< - SmartString, - Vec<(CallbackOp, NamedRows, NamedRows)>, - >, + callback_collector: &mut CallbackCollector, + propagate_triggers: bool, ) -> Result, Vec)>> { let mut to_clear = vec![]; let mut replaced_old_triggers = None; if op == RelationOp::Replace { + if !propagate_triggers { + #[derive(Debug, Error, Diagnostic)] + #[error("replace op in trigger is not allowed: {0}")] + #[diagnostic(code(eval::replace_in_trigger))] + struct ReplaceInTrigger(String); + bail!(ReplaceInTrigger(meta.name.to_string())) + + } if let Ok(old_handle) = self.get_relation(&meta.name, true) { + if !old_handle.indices.is_empty() { + #[derive(Debug, Error, Diagnostic)] + #[error("cannot replace relation {0} since it has indices")] + #[diagnostic(code(eval::replace_rel_with_indices))] + struct ReplaceRelationWithIndices(String); + bail!(ReplaceRelationWithIndices(old_handle.name.to_string())) + } if old_handle.access_level < AccessLevel::Normal { bail!(InsufficientAccessLevel( old_handle.name.to_string(), @@ -69,7 +82,7 @@ impl<'a> SessionTx<'a> { .get_single_program()?; let (_, cleanups) = db - .run_query(self, program, cur_vld, callback_targets, callback_collector) + .run_query(self, program, cur_vld, callback_targets, callback_collector, false) .map_err(|err| { if err.source_code().is_some() { err @@ -119,7 +132,9 @@ impl<'a> SessionTx<'a> { )?; let need_to_collect = !relation_store.is_temp - && (is_callback_target || !relation_store.rm_triggers.is_empty()); + && (is_callback_target + || (propagate_triggers && !relation_store.rm_triggers.is_empty())); + let has_indices = !relation_store.indices.is_empty(); let mut new_tuples: Vec = vec![]; let mut old_tuples: Vec = vec![]; @@ -129,20 +144,28 @@ impl<'a> SessionTx<'a> { .map(|ex| ex.extract_data(&tuple, cur_vld)) .try_collect()?; let key = relation_store.encode_key_for_store(&extracted, *span)?; - if need_to_collect { + if need_to_collect || has_indices { if let Some(existing) = self.store_tx.get(&key, false)? { let mut tup = extracted.clone(); if !existing.is_empty() { - let mut remaining = &existing[ENCODED_KEY_MIN_LEN..]; - while !remaining.is_empty() { - let (val, nxt) = DataValue::decode_from_key(remaining); - tup.push(val); - remaining = nxt; + extend_tuple_from_v(&mut tup, &existing); + } + if has_indices { + for (idx_rel, extractor) in relation_store.indices.values() { + let idx_tup = + extractor.iter().map(|i| tup[*i].clone()).collect_vec(); + let encoded = idx_rel + .encode_key_for_store(&idx_tup, Default::default())?; + self.store_tx.del(&encoded)?; } } - old_tuples.push(DataValue::List(tup)); + if need_to_collect { + old_tuples.push(DataValue::List(tup)); + } + } + if need_to_collect { + new_tuples.push(DataValue::List(extracted.clone())); } - new_tuples.push(DataValue::List(extracted.clone())); } if relation_store.is_temp { self.temp_store_tx.del(&key)?; @@ -151,6 +174,7 @@ impl<'a> SessionTx<'a> { } } + // triggers and callbacks if need_to_collect && !new_tuples.is_empty() { let k_bindings = relation_store .metadata @@ -168,35 +192,37 @@ impl<'a> SessionTx<'a> { kv_bindings.extend(v_bindings); let kv_bindings = kv_bindings; - for trigger in &relation_store.rm_triggers { - let mut program = - parse_script(trigger, &Default::default(), &db.algorithms, cur_vld)? - .get_single_program()?; - - make_const_rule( - &mut program, - "_new", - k_bindings.clone(), - new_tuples.clone(), - ); - - make_const_rule( - &mut program, - "_old", - kv_bindings.clone(), - old_tuples.clone(), - ); - - let (_, cleanups) = db - .run_query(self, program, cur_vld, callback_targets, callback_collector) - .map_err(|err| { - if err.source_code().is_some() { - err - } else { - err.with_source_code(trigger.to_string()) - } - })?; - to_clear.extend(cleanups); + if propagate_triggers { + for trigger in &relation_store.rm_triggers { + let mut program = + parse_script(trigger, &Default::default(), &db.algorithms, cur_vld)? + .get_single_program()?; + + make_const_rule( + &mut program, + "_new", + k_bindings.clone(), + new_tuples.clone(), + ); + + make_const_rule( + &mut program, + "_old", + kv_bindings.clone(), + old_tuples.clone(), + ); + + let (_, cleanups) = db + .run_query(self, program, cur_vld, callback_targets, callback_collector, false) + .map_err(|err| { + if err.source_code().is_some() { + err + } else { + err.with_source_code(trigger.to_string()) + } + })?; + to_clear.extend(cleanups); + } } if is_callback_target { @@ -347,7 +373,9 @@ impl<'a> SessionTx<'a> { )?; let need_to_collect = !relation_store.is_temp - && (is_callback_target || !relation_store.put_triggers.is_empty()); + && (is_callback_target + || (propagate_triggers && !relation_store.put_triggers.is_empty())); + let has_indices = !relation_store.indices.is_empty(); let mut new_tuples: Vec = vec![]; let mut old_tuples: Vec = vec![]; @@ -368,19 +396,53 @@ impl<'a> SessionTx<'a> { let key = relation_store.encode_key_for_store(&extracted, *span)?; let val = relation_store.encode_val_for_store(&extracted, *span)?; - if need_to_collect { + if need_to_collect || has_indices { if let Some(existing) = self.store_tx.get(&key, false)? { let mut tup = extracted.clone(); - let mut remaining = &existing[ENCODED_KEY_MIN_LEN..]; - while !remaining.is_empty() { - let (val, nxt) = DataValue::decode_from_key(remaining); - tup.push(val); - remaining = nxt; + if !existing.is_empty() { + extend_tuple_from_v(&mut tup, &existing); + } + if has_indices { + if extracted != tup { + for (idx_rel, extractor) in relation_store.indices.values() { + let idx_tup_old = + extractor.iter().map(|i| tup[*i].clone()).collect_vec(); + let encoded_old = idx_rel.encode_key_for_store( + &idx_tup_old, + Default::default(), + )?; + self.store_tx.del(&encoded_old)?; + + let idx_tup_new = extractor + .iter() + .map(|i| extracted[*i].clone()) + .collect_vec(); + let encoded_new = idx_rel.encode_key_for_store( + &idx_tup_new, + Default::default(), + )?; + self.store_tx.put(&encoded_new, &[])?; + } + } + } + if need_to_collect { + old_tuples.push(DataValue::List(tup)); + } + } else if has_indices { + for (idx_rel, extractor) in relation_store.indices.values() { + let idx_tup_new = extractor + .iter() + .map(|i| extracted[*i].clone()) + .collect_vec(); + let encoded_new = idx_rel + .encode_key_for_store(&idx_tup_new, Default::default())?; + self.store_tx.put(&encoded_new, &[])?; } - old_tuples.push(DataValue::List(tup)); } - new_tuples.push(DataValue::List(extracted)); + if need_to_collect { + new_tuples.push(DataValue::List(extracted)); + } } if relation_store.is_temp { @@ -405,34 +467,36 @@ impl<'a> SessionTx<'a> { bindings.extend(v_bindings); let kv_bindings = bindings; - for trigger in &relation_store.put_triggers { - let mut program = - parse_script(trigger, &Default::default(), &db.algorithms, cur_vld)? - .get_single_program()?; - - make_const_rule( - &mut program, - "_new", - kv_bindings.clone(), - new_tuples.clone(), - ); - make_const_rule( - &mut program, - "_old", - kv_bindings.clone(), - old_tuples.clone(), - ); - - let (_, cleanups) = db - .run_query(self, program, cur_vld, callback_targets, callback_collector) - .map_err(|err| { - if err.source_code().is_some() { - err - } else { - err.with_source_code(trigger.to_string()) - } - })?; - to_clear.extend(cleanups); + if propagate_triggers { + for trigger in &relation_store.put_triggers { + let mut program = + parse_script(trigger, &Default::default(), &db.algorithms, cur_vld)? + .get_single_program()?; + + make_const_rule( + &mut program, + "_new", + kv_bindings.clone(), + new_tuples.clone(), + ); + make_const_rule( + &mut program, + "_old", + kv_bindings.clone(), + old_tuples.clone(), + ); + + let (_, cleanups) = db + .run_query(self, program, cur_vld, callback_targets, callback_collector, false) + .map_err(|err| { + if err.source_code().is_some() { + err + } else { + err.with_source_code(trigger.to_string()) + } + })?; + to_clear.extend(cleanups); + } } if is_callback_target { diff --git a/cozo-core/src/runtime/db.rs b/cozo-core/src/runtime/db.rs index 8d37c368..1bc3779a 100644 --- a/cozo-core/src/runtime/db.rs +++ b/cozo-core/src/runtime/db.rs @@ -6,11 +6,11 @@ * You can obtain one at https://mozilla.org/MPL/2.0/. */ -use crossbeam::sync::ShardedLock; use std::collections::btree_map::Entry; use std::collections::{BTreeMap, BTreeSet}; use std::default::Default; use std::fmt::{Debug, Formatter}; +use std::iter; use std::path::Path; use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; @@ -20,6 +20,7 @@ use std::thread; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use crossbeam::channel::{unbounded, Sender}; +use crossbeam::sync::ShardedLock; use either::{Either, Left, Right}; use itertools::Itertools; #[allow(unused_imports)] @@ -88,6 +89,11 @@ pub struct CallbackDeclaration { callback: Box, } +pub(crate) type CallbackCollector = BTreeMap< + SmartString, + Vec<(CallbackOp, NamedRows, NamedRows)>, +>; + /// The database object of Cozo. #[derive(Clone)] pub struct Db { @@ -270,6 +276,10 @@ impl<'s, S: Storage<'s>> Db { #[diagnostic(code(import::bad_data))] struct BadDataForRelation(String, JsonValue); + let rel_names = data.keys().map(|k| SmartString::from(k)).collect_vec(); + let locks = self.obtain_relation_locks(rel_names.iter()); + let _guards = locks.iter().map(|l| l.read().unwrap()).collect_vec(); + let cur_vld = current_validity(); let mut tx = self.transact_write()?; @@ -431,6 +441,10 @@ impl<'s, S: Storage<'s>> Db { #[cfg(feature = "storage-sqlite")] { + let rel_names = relations.iter().map(|n| SmartString::from(n)).collect_vec(); + let locks = self.obtain_relation_locks(rel_names.iter()); + let _guards = locks.iter().map(|l| l.read().unwrap()).collect_vec(); + let source_db = crate::new_cozo_sqlite(in_file)?; let mut src_tx = source_db.transact()?; let mut dst_tx = self.transact_write()?; @@ -598,10 +612,7 @@ impl<'s, S: Storage<'s>> Db { cur_vld: ValidityTs, span: SourceSpan, callback_targets: &BTreeSet>, - callback_collector: &mut BTreeMap< - SmartString, - Vec<(CallbackOp, NamedRows, NamedRows)>, - >, + callback_collector: &mut CallbackCollector, ) -> Result { let res = match p { Left(rel) => { @@ -637,15 +648,12 @@ impl<'s, S: Storage<'s>> Db { cleanups: &mut Vec<(Vec, Vec)>, cur_vld: ValidityTs, callback_targets: &BTreeSet>, - callback_collector: &mut BTreeMap< - SmartString, - Vec<(CallbackOp, NamedRows, NamedRows)>, - >, + callback_collector: &mut CallbackCollector, ) -> Result { #[allow(unused_variables)] let sleep_opt = p.out_opts.sleep; let (q_res, q_cleanups) = - self.run_query(tx, p, cur_vld, callback_targets, callback_collector)?; + self.run_query(tx, p, cur_vld, callback_targets, callback_collector, true)?; cleanups.extend(q_cleanups); #[cfg(not(target_arch = "wasm32"))] if let Some(secs) = sleep_opt { @@ -664,7 +672,7 @@ impl<'s, S: Storage<'s>> Db { } fn send_callbacks( &'s self, - collector: BTreeMap, Vec<(CallbackOp, NamedRows, NamedRows)>>, + collector: CallbackCollector, ) { for (k, vals) in collector { for (op, new, old) in vals { @@ -738,7 +746,7 @@ impl<'s, S: Storage<'s>> Db { } let is_write = !write_lock_names.is_empty(); let write_lock = self.obtain_relation_locks(write_lock_names.iter()); - let _write_lock_guards = write_lock.iter().map(|l| l.read()).collect_vec(); + let _write_lock_guards = write_lock.iter().map(|l| l.read().unwrap()).collect_vec(); let callback_targets = if is_write { self.current_callback_targets() @@ -803,10 +811,7 @@ impl<'s, S: Storage<'s>> Db { cleanups: &mut Vec<(Vec, Vec)>, cur_vld: ValidityTs, callback_targets: &BTreeSet>, - callback_collector: &mut BTreeMap< - SmartString, - Vec<(CallbackOp, NamedRows, NamedRows)>, - >, + callback_collector: &mut CallbackCollector, ) -> Result> { let mut ret = NamedRows::default(); for p in ps { @@ -1166,6 +1171,9 @@ impl<'s, S: Storage<'s>> Db { } SysOp::ListRelations => self.list_relations(), SysOp::RemoveRelation(rel_names) => { + let rel_name_strs = rel_names.iter().map(|n| &n.name); + let locks = self.obtain_relation_locks(rel_name_strs); + let _guards = locks.iter().map(|l| l.read().unwrap()).collect_vec(); let mut bounds = vec![]; { let mut tx = self.transact_write()?; @@ -1183,8 +1191,33 @@ impl<'s, S: Storage<'s>> Db { rows: vec![vec![DataValue::from(OK_STR)]], }) } + SysOp::CreateIndex(rel_name, idx_name, cols) => { + let lock = self.obtain_relation_locks(iter::once(&rel_name.name)).pop().unwrap(); + let _guard = lock.write().unwrap(); + let mut tx = self.transact_write()?; + tx.create_index(&rel_name, &idx_name, cols)?; + tx.commit_tx()?; + Ok(NamedRows { + headers: vec![STATUS_STR.to_string()], + rows: vec![vec![DataValue::from(OK_STR)]], + }) + } + SysOp::RemoveIndex(rel_name, idx_name) => { + let lock = self.obtain_relation_locks(iter::once(&rel_name.name)).pop().unwrap(); + let _guard = lock.read().unwrap(); + let mut tx = self.transact_write()?; + tx.remove_index(&rel_name, &idx_name)?; + tx.commit_tx()?; + Ok(NamedRows { + headers: vec![STATUS_STR.to_string()], + rows: vec![vec![DataValue::from(OK_STR)]], + }) + } SysOp::ListRelation(rs) => self.list_relation(&rs), SysOp::RenameRelation(rename_pairs) => { + let rel_names = rename_pairs.iter().flat_map(|(f, t)| [&f.name, &t.name]); + let locks = self.obtain_relation_locks(rel_names); + let _guards = locks.iter().map(|l| l.read().unwrap()).collect_vec(); let mut tx = self.transact_write()?; for (old, new) in rename_pairs { tx.rename_relation(old, new)?; @@ -1268,10 +1301,8 @@ impl<'s, S: Storage<'s>> Db { input_program: InputProgram, cur_vld: ValidityTs, callback_targets: &BTreeSet>, - callback_collector: &mut BTreeMap< - SmartString, - Vec<(CallbackOp, NamedRows, NamedRows)>, - >, + callback_collector: &mut CallbackCollector, + top_level: bool ) -> Result<(NamedRows, Vec<(Vec, Vec)>)> { // cleanups contain stored relations that should be deleted at the end of query let mut clean_ups = vec![]; @@ -1416,6 +1447,7 @@ impl<'s, S: Storage<'s>> Db { cur_vld, callback_targets, callback_collector, + top_level ) .wrap_err_with(|| format!("when executing against relation '{}'", meta.name))?; clean_ups.extend(to_clear); @@ -1470,6 +1502,7 @@ impl<'s, S: Storage<'s>> Db { cur_vld, callback_targets, callback_collector, + top_level ) .wrap_err_with(|| format!("when executing against relation '{}'", meta.name))?; clean_ups.extend(to_clear); diff --git a/cozo-core/src/runtime/relation.rs b/cozo-core/src/runtime/relation.rs index 10c509f4..2c71bd2c 100644 --- a/cozo-core/src/runtime/relation.rs +++ b/cozo-core/src/runtime/relation.rs @@ -6,6 +6,7 @@ * You can obtain one at https://mozilla.org/MPL/2.0/. */ +use std::collections::BTreeMap; use std::fmt::{Debug, Display, Formatter}; use std::sync::atomic::Ordering; @@ -73,6 +74,8 @@ pub(crate) struct RelationHandle { pub(crate) access_level: AccessLevel, #[serde(default)] pub(crate) is_temp: bool, + #[serde(default)] + pub(crate) indices: BTreeMap, (RelationHandle, Vec)>, } #[derive( @@ -483,6 +486,7 @@ impl<'a> SessionTx<'a> { replace_triggers: vec![], access_level: AccessLevel::Normal, is_temp, + indices: Default::default(), }; let name_key = vec![DataValue::Str(meta.name.clone())].encode_as_key(RelationId::SYSTEM); @@ -537,6 +541,11 @@ impl<'a> SessionTx<'a> { store.access_level )) } + + for k in store.indices.keys() { + self.destroy_relation(&format!("{}:{}", name, k))?; + } + let key = DataValue::from(name); let encoded = vec![key].encode_as_key(RelationId::SYSTEM); self.store_tx.del(&encoded)?; @@ -557,6 +566,162 @@ impl<'a> SessionTx<'a> { Ok(()) } + + pub(crate) fn create_index( + &mut self, + rel_name: &Symbol, + idx_name: &Symbol, + cols: Vec, + ) -> Result<()> { + let mut rel_handle = self.get_relation(rel_name, true)?; + if rel_handle.indices.contains_key(&idx_name.name) { + #[derive(Debug, Error, Diagnostic)] + #[error("index {0} for relation {1} already exists")] + #[diagnostic(code(tx::index_already_exists))] + pub(crate) struct IndexAlreadyExists(String, String); + + bail!(IndexAlreadyExists( + idx_name.name.to_string(), + rel_name.name.to_string() + )); + } + + let mut col_defs = vec![]; + for col in cols.iter() { + for orig_col in rel_handle + .metadata + .keys + .iter() + .chain(rel_handle.metadata.non_keys.iter()) + { + if orig_col.name == col.name { + col_defs.push(orig_col.clone()); + break; + } + } + + #[derive(Debug, Error, Diagnostic)] + #[error("column {0} in index {1} for relation {2} not found")] + #[diagnostic(code(tx::col_in_idx_not_found))] + pub(crate) struct ColInIndexNotFound(String, String, String); + + bail!(ColInIndexNotFound( + col.name.to_string(), + idx_name.name.to_string(), + rel_name.name.to_string() + )); + } + + for key in rel_handle.metadata.keys.iter() { + for col in cols.iter() { + if col.name == key.name { + break; + } + } + col_defs.push(key.clone()); + } + + let key_bindings = col_defs + .iter() + .map(|col| Symbol::new(col.name.clone(), Default::default())) + .collect_vec(); + let idx_meta = StoredRelationMetadata { + keys: col_defs, + non_keys: vec![], + }; + + let idx_handle = InputRelationHandle { + name: Symbol::new( + format!("{}:{}", rel_name.name, idx_name.name), + Default::default(), + ), + metadata: idx_meta, + key_bindings, + dep_bindings: vec![], + span: Default::default(), + }; + + let idx_handle = self.create_relation(idx_handle)?; + + // populate index + let extraction_indices = idx_handle + .metadata + .keys + .iter() + .map(|col| { + for (i, kc) in rel_handle.metadata.keys.iter().enumerate() { + if kc.name == col.name { + return i; + } + } + for (i, kc) in rel_handle.metadata.non_keys.iter().enumerate() { + if kc.name == col.name { + return i + rel_handle.metadata.keys.len(); + } + } + unreachable!() + }) + .collect_vec(); + + if self.store_tx.supports_par_put() { + for tuple in rel_handle.scan_all(self) { + let tuple = tuple?; + let extracted = extraction_indices + .iter() + .map(|idx| tuple[*idx].clone()) + .collect_vec(); + let key = idx_handle.encode_key_for_store(&extracted, Default::default())?; + self.store_tx.par_put(&key, &[])?; + } + } else { + for tuple in rel_handle.scan_all(self).collect_vec() { + let tuple = tuple?; + let extracted = extraction_indices + .iter() + .map(|idx| tuple[*idx].clone()) + .collect_vec(); + let key = idx_handle.encode_key_for_store(&extracted, Default::default())?; + self.store_tx.put(&key, &[])?; + } + } + + rel_handle + .indices + .insert(idx_name.name.clone(), (idx_handle, extraction_indices)); + + let new_encoded = + vec![DataValue::from(&rel_name.name as &str)].encode_as_key(RelationId::SYSTEM); + let mut meta_val = vec![]; + rel_handle + .serialize(&mut Serializer::new(&mut meta_val)) + .unwrap(); + self.store_tx.put(&new_encoded, &meta_val)?; + + Ok(()) + } + + pub(crate) fn remove_index(&mut self, rel_name: &Symbol, idx_name: &Symbol) -> Result<()> { + let mut rel = self.get_relation(rel_name, true)?; + if rel.indices.remove(&idx_name.name).is_none() { + #[derive(Debug, Error, Diagnostic)] + #[error("index {0} for relation {1} not found")] + #[diagnostic(code(tx::idx_not_found))] + pub(crate) struct IndexNotFound(String, String); + + bail!(IndexNotFound(idx_name.to_string(), rel_name.to_string())); + } + + self.destroy_relation(&format!("{}:{}", rel_name.name, idx_name.name))?; + + let new_encoded = + vec![DataValue::from(&rel_name.name as &str)].encode_as_key(RelationId::SYSTEM); + let mut meta_val = vec![]; + rel.serialize(&mut Serializer::new(&mut meta_val)).unwrap(); + self.store_tx.put(&new_encoded, &meta_val)?; + + Ok(()) + } + pub(crate) fn rename_relation(&mut self, old: Symbol, new: Symbol) -> Result<()> { if old.name.starts_with('_') || new.name.starts_with('_') { bail!("Bad name given"); @@ -613,6 +778,7 @@ impl<'a> SessionTx<'a> { #[derive(Debug, Error, Diagnostic)] #[error("Insufficient access level {2} for {1} on stored relation '{0}'")] +#[diagnostic(code(tx::insufficient_access_level))] pub(crate) struct InsufficientAccessLevel( pub(crate) String, pub(crate) String, diff --git a/cozo-core/src/storage/mem.rs b/cozo-core/src/storage/mem.rs index e5fbc887..44892087 100644 --- a/cozo-core/src/storage/mem.rs +++ b/cozo-core/src/storage/mem.rs @@ -129,6 +129,14 @@ impl<'s> StoreTx<'s> for MemTx<'s> { } } + fn supports_par_put(&self) -> bool { + false + } + + fn par_put(&self, _key: &[u8], _val: &[u8]) -> Result<()> { + panic!() + } + fn del(&mut self, key: &[u8]) -> Result<()> { match self { MemTx::Reader(_) => { diff --git a/cozo-core/src/storage/mod.rs b/cozo-core/src/storage/mod.rs index 9b6809a3..83ca6615 100644 --- a/cozo-core/src/storage/mod.rs +++ b/cozo-core/src/storage/mod.rs @@ -66,6 +66,15 @@ pub trait StoreTx<'s>: Sync { /// the storage engine needs to overwrite the old value. fn put(&mut self, key: &[u8], val: &[u8]) -> Result<()>; + /// Should return true if the engine supports parallel put, false otherwise. + fn supports_par_put(&self) -> bool; + + /// Put a key-value pair into the storage. In case of existing key, + /// the storage engine needs to overwrite the old value. + /// The difference between this one and `put` is the mutability of self. + /// It is OK to always panic if `supports_par_put` returns `true`. + fn par_put(&self, key: &[u8], val: &[u8]) -> Result<()>; + /// Delete a key-value pair from the storage. fn del(&mut self, key: &[u8]) -> Result<()>; diff --git a/cozo-core/src/storage/rocks.rs b/cozo-core/src/storage/rocks.rs index bcb0eb86..198669f2 100644 --- a/cozo-core/src/storage/rocks.rs +++ b/cozo-core/src/storage/rocks.rs @@ -171,6 +171,14 @@ impl<'s> StoreTx<'s> for RocksDbTx { Ok(self.db_tx.put(key, val)?) } + fn supports_par_put(&self) -> bool { + true + } + + fn par_put(&self, key: &[u8], val: &[u8]) -> Result<()> { + Ok(self.db_tx.put(key, val)?) + } + #[inline] fn del(&mut self, key: &[u8]) -> Result<()> { Ok(self.db_tx.del(key)?) diff --git a/cozo-core/src/storage/sled.rs b/cozo-core/src/storage/sled.rs index fa445237..c48a837f 100644 --- a/cozo-core/src/storage/sled.rs +++ b/cozo-core/src/storage/sled.rs @@ -138,6 +138,14 @@ impl<'s> StoreTx<'s> for SledTx { Ok(()) } + fn supports_par_put(&self) -> bool { + false + } + + fn par_put(&self, _key: &[u8], _val: &[u8]) -> Result<()> { + panic!() + } + #[inline] fn del(&mut self, key: &[u8]) -> Result<()> { self.ensure_changes_db()?; diff --git a/cozo-core/src/storage/sqlite.rs b/cozo-core/src/storage/sqlite.rs index 13ac6e06..6322d8fc 100644 --- a/cozo-core/src/storage/sqlite.rs +++ b/cozo-core/src/storage/sqlite.rs @@ -8,9 +8,9 @@ use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; -use crossbeam::sync::{ShardedLock, ShardedLockReadGuard, ShardedLockWriteGuard}; use ::sqlite::Connection; +use crossbeam::sync::{ShardedLock, ShardedLockReadGuard, ShardedLockWriteGuard}; use either::{Either, Left, Right}; use miette::{bail, miette, IntoDiagnostic, Result}; use sqlite::{ConnectionWithFullMutex, State, Statement}; @@ -216,6 +216,14 @@ impl<'s> StoreTx<'s> for SqliteTx<'s> { } fn put(&mut self, key: &[u8], val: &[u8]) -> Result<()> { + self.par_put(key, val) + } + + fn supports_par_put(&self) -> bool { + true + } + + fn par_put(&self, key: &[u8], val: &[u8]) -> Result<()> { self.ensure_stmt(PUT_QUERY); let mut statement = self.stmts[PUT_QUERY].lock().unwrap(); let statement = statement.as_mut().unwrap(); diff --git a/cozo-core/src/storage/temp.rs b/cozo-core/src/storage/temp.rs index e66c5604..201426e0 100644 --- a/cozo-core/src/storage/temp.rs +++ b/cozo-core/src/storage/temp.rs @@ -63,6 +63,14 @@ impl<'s> StoreTx<'s> for TempTx { Ok(()) } + fn supports_par_put(&self) -> bool { + false + } + + fn par_put(&self, _key: &[u8], _val: &[u8]) -> Result<()> { + panic!() + } + fn del(&mut self, key: &[u8]) -> Result<()> { self.store.remove(key); Ok(()) diff --git a/cozo-core/src/storage/tikv.rs b/cozo-core/src/storage/tikv.rs index 8e0ff719..3b5370ca 100644 --- a/cozo-core/src/storage/tikv.rs +++ b/cozo-core/src/storage/tikv.rs @@ -124,6 +124,14 @@ impl<'s> StoreTx<'s> for TiKvTx { } fn put(&mut self, key: &[u8], val: &[u8]) -> Result<()> { + self.par_put(key, val) + } + + fn supports_par_put(&self) -> bool { + true + } + + fn par_put(&self, key: &[u8], val: &[u8]) -> Result<()> { RT.block_on(self.tx.lock().unwrap().put(key.to_owned(), val.to_owned())) .into_diagnostic() } diff --git a/cozorocks/bridge/tx.h b/cozorocks/bridge/tx.h index af9082a9..a67ea7f8 100644 --- a/cozorocks/bridge/tx.h +++ b/cozorocks/bridge/tx.h @@ -105,11 +105,11 @@ struct TxBridge { } } - inline void put(RustBytes key, RustBytes val, RocksDbStatus &status) { + inline void put(RustBytes key, RustBytes val, RocksDbStatus &status) const { write_status(tx->Put(convert_slice(key), convert_slice(val)), status); } - inline void del(RustBytes key, RocksDbStatus &status) { + inline void del(RustBytes key, RocksDbStatus &status) const { write_status(tx->Delete(convert_slice(key)), status); } diff --git a/cozorocks/src/bridge/mod.rs b/cozorocks/src/bridge/mod.rs index 4bf198c6..dabccb4f 100644 --- a/cozorocks/src/bridge/mod.rs +++ b/cozorocks/src/bridge/mod.rs @@ -182,12 +182,12 @@ pub(crate) mod ffi { status: &mut RocksDbStatus, ); fn put( - self: Pin<&mut TxBridge>, + self: &TxBridge, key: &[u8], val: &[u8], status: &mut RocksDbStatus, ); - fn del(self: Pin<&mut TxBridge>, key: &[u8], status: &mut RocksDbStatus); + fn del(self: &TxBridge, key: &[u8], status: &mut RocksDbStatus); fn commit(self: Pin<&mut TxBridge>, status: &mut RocksDbStatus); fn rollback(self: Pin<&mut TxBridge>, status: &mut RocksDbStatus); fn rollback_to_savepoint(self: Pin<&mut TxBridge>, status: &mut RocksDbStatus); diff --git a/cozorocks/src/bridge/tx.rs b/cozorocks/src/bridge/tx.rs index 6186e051..7245ec5d 100644 --- a/cozorocks/src/bridge/tx.rs +++ b/cozorocks/src/bridge/tx.rs @@ -98,9 +98,9 @@ impl Tx { self.inner.pin_mut().clear_snapshot() } #[inline] - pub fn put(&mut self, key: &[u8], val: &[u8]) -> Result<(), RocksDbStatus> { + pub fn put(&self, key: &[u8], val: &[u8]) -> Result<(), RocksDbStatus> { let mut status = RocksDbStatus::default(); - self.inner.pin_mut().put(key, val, &mut status); + self.inner.put(key, val, &mut status); if status.is_ok() { Ok(()) } else { @@ -108,9 +108,9 @@ impl Tx { } } #[inline] - pub fn del(&mut self, key: &[u8]) -> Result<(), RocksDbStatus> { + pub fn del(&self, key: &[u8]) -> Result<(), RocksDbStatus> { let mut status = RocksDbStatus::default(); - self.inner.pin_mut().del(key, &mut status); + self.inner.del(key, &mut status); if status.is_ok() { Ok(()) } else {