index creation and deletion; fix trigger decoding

main
Ziyang Hu 2 years ago
parent 92928dfef2
commit a375eff792

@@ -34,6 +34,8 @@ pub(crate) enum SysOp {
ShowTrigger(Symbol),
SetTriggers(Symbol, Vec<String>, Vec<String>, Vec<String>),
SetAccessLevel(Vec<Symbol>, AccessLevel),
CreateIndex(Symbol, Symbol, Vec<Symbol>),
RemoveIndex(Symbol, Symbol),
}

#[derive(Debug, Diagnostic, Error)]
@@ -145,6 +147,34 @@ pub(crate) fn parse_sys(
}
SysOp::SetTriggers(rel, puts, rms, replaces)
}
Rule::index_op => {
let inner = inner.into_inner().next().unwrap();
match inner.as_rule() {
Rule::index_create => {
let mut inner = inner.into_inner();
let rel = inner.next().unwrap();
let name = inner.next().unwrap();
let cols = inner
.map(|p| Symbol::new(p.as_str(), p.extract_span()))
.collect_vec();
SysOp::CreateIndex(
Symbol::new(rel.as_str(), rel.extract_span()),
Symbol::new(name.as_str(), name.extract_span()),
cols,
)
}
Rule::index_drop => {
let mut inner = inner.into_inner();
let rel = inner.next().unwrap();
let name = inner.next().unwrap();
SysOp::RemoveIndex(
Symbol::new(rel.as_str(), rel.extract_span()),
Symbol::new(name.as_str(), name.extract_span()),
)
}
_ => unreachable!(),
}
}
rule => unreachable!("{:?}", rule),
})
}

@@ -23,11 +23,11 @@ use crate::data::value::{DataValue, ValidityTs};
use crate::fixed_rule::utilities::constant::Constant;
use crate::fixed_rule::FixedRuleHandle;
use crate::parse::parse_script;
-use crate::runtime::db::CallbackOp;
-use crate::runtime::relation::{AccessLevel, InputRelationHandle, InsufficientAccessLevel};
use crate::runtime::db::{CallbackCollector, CallbackOp};
use crate::runtime::relation::{AccessLevel, extend_tuple_from_v, InputRelationHandle, InsufficientAccessLevel};
use crate::runtime::transact::SessionTx;
use crate::storage::Storage;
-use crate::{Db, NamedRows, StoreTx};
use crate::{Db, decode_tuple_from_kv, NamedRows, StoreTx};

#[derive(Debug, Error, Diagnostic)]
#[error("attempting to write into relation {0} of arity {1} with data of arity {2}")]
@@ -44,15 +44,28 @@ impl<'a> SessionTx<'a> {
headers: &[Symbol],
cur_vld: ValidityTs,
callback_targets: &BTreeSet<SmartString<LazyCompact>>,
-callback_collector: &mut BTreeMap<
-SmartString<LazyCompact>,
-Vec<(CallbackOp, NamedRows, NamedRows)>,
->,
callback_collector: &mut CallbackCollector,
propagate_triggers: bool,
) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
let mut to_clear = vec![];
let mut replaced_old_triggers = None;
if op == RelationOp::Replace {
if !propagate_triggers {
#[derive(Debug, Error, Diagnostic)]
#[error("replace op in trigger is not allowed: {0}")]
#[diagnostic(code(eval::replace_in_trigger))]
struct ReplaceInTrigger(String);
bail!(ReplaceInTrigger(meta.name.to_string()))
}
if let Ok(old_handle) = self.get_relation(&meta.name, true) {
if !old_handle.indices.is_empty() {
#[derive(Debug, Error, Diagnostic)]
#[error("cannot replace relation {0} since it has indices")]
#[diagnostic(code(eval::replace_rel_with_indices))]
struct ReplaceRelationWithIndices(String);
bail!(ReplaceRelationWithIndices(old_handle.name.to_string()))
}
if old_handle.access_level < AccessLevel::Normal {
bail!(InsufficientAccessLevel(
old_handle.name.to_string(),
@@ -69,7 +82,7 @@ impl<'a> SessionTx<'a> {
.get_single_program()?;
let (_, cleanups) = db
-.run_query(self, program, cur_vld, callback_targets, callback_collector)
.run_query(self, program, cur_vld, callback_targets, callback_collector, false)
.map_err(|err| {
if err.source_code().is_some() {
err
@@ -119,7 +132,9 @@ impl<'a> SessionTx<'a> {
)?;
let need_to_collect = !relation_store.is_temp
-&& (is_callback_target || !relation_store.rm_triggers.is_empty());
&& (is_callback_target
|| (propagate_triggers && !relation_store.rm_triggers.is_empty()));
let has_indices = !relation_store.indices.is_empty();
let mut new_tuples: Vec<DataValue> = vec![];
let mut old_tuples: Vec<DataValue> = vec![];
@@ -129,21 +144,29 @@ impl<'a> SessionTx<'a> {
.map(|ex| ex.extract_data(&tuple, cur_vld))
.try_collect()?;
let key = relation_store.encode_key_for_store(&extracted, *span)?;
-if need_to_collect {
if need_to_collect || has_indices {
if let Some(existing) = self.store_tx.get(&key, false)? {
let mut tup = extracted.clone();
if !existing.is_empty() {
-let mut remaining = &existing[ENCODED_KEY_MIN_LEN..];
-while !remaining.is_empty() {
-let (val, nxt) = DataValue::decode_from_key(remaining);
-tup.push(val);
-remaining = nxt;
-}
extend_tuple_from_v(&mut tup, &existing);
}
if has_indices {
for (idx_rel, extractor) in relation_store.indices.values() {
let idx_tup =
extractor.iter().map(|i| tup[*i].clone()).collect_vec();
let encoded = idx_rel
.encode_key_for_store(&idx_tup, Default::default())?;
self.store_tx.del(&encoded)?;
}
}
if need_to_collect {
old_tuples.push(DataValue::List(tup));
}
}
if need_to_collect {
new_tuples.push(DataValue::List(extracted.clone()));
}
}
if relation_store.is_temp {
self.temp_store_tx.del(&key)?;
} else {
@@ -151,6 +174,7 @@ impl<'a> SessionTx<'a> {
}
}

// triggers and callbacks
if need_to_collect && !new_tuples.is_empty() {
let k_bindings = relation_store
.metadata
@@ -168,6 +192,7 @@ impl<'a> SessionTx<'a> {
kv_bindings.extend(v_bindings);
let kv_bindings = kv_bindings;
if propagate_triggers {
for trigger in &relation_store.rm_triggers {
let mut program =
parse_script(trigger, &Default::default(), &db.algorithms, cur_vld)?
@@ -188,7 +213,7 @@ impl<'a> SessionTx<'a> {
);
let (_, cleanups) = db
-.run_query(self, program, cur_vld, callback_targets, callback_collector)
.run_query(self, program, cur_vld, callback_targets, callback_collector, false)
.map_err(|err| {
if err.source_code().is_some() {
err
@@ -198,6 +223,7 @@ impl<'a> SessionTx<'a> {
})?;
to_clear.extend(cleanups);
}
}

if is_callback_target {
let target_collector = callback_collector
@@ -347,7 +373,9 @@ impl<'a> SessionTx<'a> {
)?;
let need_to_collect = !relation_store.is_temp
-&& (is_callback_target || !relation_store.put_triggers.is_empty());
&& (is_callback_target
|| (propagate_triggers && !relation_store.put_triggers.is_empty()));
let has_indices = !relation_store.indices.is_empty();
let mut new_tuples: Vec<DataValue> = vec![];
let mut old_tuples: Vec<DataValue> = vec![];
@@ -368,20 +396,54 @@ impl<'a> SessionTx<'a> {
let key = relation_store.encode_key_for_store(&extracted, *span)?;
let val = relation_store.encode_val_for_store(&extracted, *span)?;
-if need_to_collect {
if need_to_collect || has_indices {
if let Some(existing) = self.store_tx.get(&key, false)? {
let mut tup = extracted.clone();
-let mut remaining = &existing[ENCODED_KEY_MIN_LEN..];
-while !remaining.is_empty() {
-let (val, nxt) = DataValue::decode_from_key(remaining);
-tup.push(val);
-remaining = nxt;
-}
if !existing.is_empty() {
extend_tuple_from_v(&mut tup, &existing);
}
if has_indices {
if extracted != tup {
for (idx_rel, extractor) in relation_store.indices.values() {
let idx_tup_old =
extractor.iter().map(|i| tup[*i].clone()).collect_vec();
let encoded_old = idx_rel.encode_key_for_store(
&idx_tup_old,
Default::default(),
)?;
self.store_tx.del(&encoded_old)?;
let idx_tup_new = extractor
.iter()
.map(|i| extracted[*i].clone())
.collect_vec();
let encoded_new = idx_rel.encode_key_for_store(
&idx_tup_new,
Default::default(),
)?;
self.store_tx.put(&encoded_new, &[])?;
}
}
}
if need_to_collect {
old_tuples.push(DataValue::List(tup));
}
} else if has_indices {
for (idx_rel, extractor) in relation_store.indices.values() {
let idx_tup_new = extractor
.iter()
.map(|i| extracted[*i].clone())
.collect_vec();
let encoded_new = idx_rel
.encode_key_for_store(&idx_tup_new, Default::default())?;
self.store_tx.put(&encoded_new, &[])?;
}
}
if need_to_collect {
new_tuples.push(DataValue::List(extracted));
}
}

if relation_store.is_temp {
self.temp_store_tx.put(&key, &val)?;
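As an aside, the upsert-side index maintenance above boils down to one invariant: if the row already existed and its stored image differs from the incoming tuple, delete the stale index key before writing the fresh one; a brand-new row only gets an insert. A minimal sketch of that invariant, with a plain `BTreeMap` standing in for the storage engine and `Vec<String>` for tuples (the names here are illustrative, not the crate's API):

```rust
use std::collections::BTreeMap;

// Toy model of one index: the key is the projection of a tuple onto the
// indexed column positions; the value is unit, as in the real index relations.
fn update_index(
    index: &mut BTreeMap<Vec<String>, ()>,
    extractor: &[usize],
    old_tuple: Option<&[String]>,
    new_tuple: &[String],
) {
    let project = |t: &[String]| -> Vec<String> {
        extractor.iter().map(|&i| t[i].clone()).collect()
    };
    if let Some(old) = old_tuple {
        // Only touch the index if the stored row actually changed.
        if old != new_tuple {
            index.remove(&project(old));
            index.insert(project(new_tuple), ());
        }
    } else {
        index.insert(project(new_tuple), ());
    }
}

fn main() {
    let mut idx = BTreeMap::new();
    let v1: Vec<String> = vec!["1".into(), "alice".into()];
    let v2: Vec<String> = vec!["1".into(), "alicia".into()];
    update_index(&mut idx, &[1], None, &v1); // fresh insert
    update_index(&mut idx, &[1], Some(v1.as_slice()), &v2); // update: stale key removed
    assert!(idx.contains_key(&vec!["alicia".to_string()]));
    assert!(!idx.contains_key(&vec!["alice".to_string()]));
}
```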
@@ -405,6 +467,7 @@ impl<'a> SessionTx<'a> {
bindings.extend(v_bindings);
let kv_bindings = bindings;
if propagate_triggers {
for trigger in &relation_store.put_triggers {
let mut program =
parse_script(trigger, &Default::default(), &db.algorithms, cur_vld)?
@@ -424,7 +487,7 @@ impl<'a> SessionTx<'a> {
);
let (_, cleanups) = db
-.run_query(self, program, cur_vld, callback_targets, callback_collector)
.run_query(self, program, cur_vld, callback_targets, callback_collector, false)
.map_err(|err| {
if err.source_code().is_some() {
err
@@ -434,6 +497,7 @@ impl<'a> SessionTx<'a> {
})?;
to_clear.extend(cleanups);
}
}

if is_callback_target {
let target_collector = callback_collector

@@ -6,11 +6,11 @@
* You can obtain one at https://mozilla.org/MPL/2.0/.
*/

-use crossbeam::sync::ShardedLock;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::default::Default;
use std::fmt::{Debug, Formatter};
use std::iter;
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
@@ -20,6 +20,7 @@ use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use crossbeam::channel::{unbounded, Sender};
use crossbeam::sync::ShardedLock;
use either::{Either, Left, Right};
use itertools::Itertools;
#[allow(unused_imports)]
@@ -88,6 +89,11 @@ pub struct CallbackDeclaration {
callback: Box<dyn Fn(CallbackOp, NamedRows, NamedRows) + Send + Sync>,
}

pub(crate) type CallbackCollector = BTreeMap<
SmartString<LazyCompact>,
Vec<(CallbackOp, NamedRows, NamedRows)>,
>;
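For orientation, `CallbackCollector` simply buckets mutation events per relation name until the transaction finishes, at which point the batches are handed to registered callbacks. A self-contained sketch of that shape, with stand-in types (only the alias's structure is taken from the code above; everything else is illustrative):

```rust
use std::collections::BTreeMap;

// Stand-ins for SmartString, NamedRows and CallbackOp, for illustration only.
type NamedRows = Vec<Vec<String>>;
#[derive(Debug, Clone, Copy)]
enum CallbackOp {
    Put,
    Rm,
}
type CallbackCollector = BTreeMap<String, Vec<(CallbackOp, NamedRows, NamedRows)>>;

fn main() {
    let mut collector: CallbackCollector = BTreeMap::new();
    // A put against relation "friends" records (op, new rows, old rows).
    collector.entry("friends".to_string()).or_default().push((
        CallbackOp::Put,
        vec![vec!["alice".to_string(), "bob".to_string()]],
        vec![],
    ));
    // When the transaction is done, the collector is drained and each batch is
    // dispatched to the callbacks registered for that relation.
    for (rel, events) in collector {
        println!("{rel}: {} event(s)", events.len());
    }
}
```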
/// The database object of Cozo.
#[derive(Clone)]
pub struct Db<S> {
@@ -270,6 +276,10 @@ impl<'s, S: Storage<'s>> Db<S> {
#[diagnostic(code(import::bad_data))]
struct BadDataForRelation(String, JsonValue);

let rel_names = data.keys().map(|k| SmartString::from(k)).collect_vec();
let locks = self.obtain_relation_locks(rel_names.iter());
let _guards = locks.iter().map(|l| l.read().unwrap()).collect_vec();

let cur_vld = current_validity();
let mut tx = self.transact_write()?;
@@ -431,6 +441,10 @@ impl<'s, S: Storage<'s>> Db<S> {
#[cfg(feature = "storage-sqlite")]
{
let rel_names = relations.iter().map(|n| SmartString::from(n)).collect_vec();
let locks = self.obtain_relation_locks(rel_names.iter());
let _guards = locks.iter().map(|l| l.read().unwrap()).collect_vec();

let source_db = crate::new_cozo_sqlite(in_file)?;
let mut src_tx = source_db.transact()?;
let mut dst_tx = self.transact_write()?;
@@ -598,10 +612,7 @@ impl<'s, S: Storage<'s>> Db<S> {
cur_vld: ValidityTs,
span: SourceSpan,
callback_targets: &BTreeSet<SmartString<LazyCompact>>,
-callback_collector: &mut BTreeMap<
-SmartString<LazyCompact>,
-Vec<(CallbackOp, NamedRows, NamedRows)>,
->,
callback_collector: &mut CallbackCollector,
) -> Result<bool> {
let res = match p {
Left(rel) => {
@@ -637,15 +648,12 @@ impl<'s, S: Storage<'s>> Db<S> {
cleanups: &mut Vec<(Vec<u8>, Vec<u8>)>,
cur_vld: ValidityTs,
callback_targets: &BTreeSet<SmartString<LazyCompact>>,
-callback_collector: &mut BTreeMap<
-SmartString<LazyCompact>,
-Vec<(CallbackOp, NamedRows, NamedRows)>,
->,
callback_collector: &mut CallbackCollector,
) -> Result<NamedRows> {
#[allow(unused_variables)]
let sleep_opt = p.out_opts.sleep;
let (q_res, q_cleanups) =
-self.run_query(tx, p, cur_vld, callback_targets, callback_collector)?;
self.run_query(tx, p, cur_vld, callback_targets, callback_collector, true)?;
cleanups.extend(q_cleanups);
#[cfg(not(target_arch = "wasm32"))]
if let Some(secs) = sleep_opt {
@@ -664,7 +672,7 @@ impl<'s, S: Storage<'s>> Db<S> {
}

fn send_callbacks(
&'s self,
-collector: BTreeMap<SmartString<LazyCompact>, Vec<(CallbackOp, NamedRows, NamedRows)>>,
collector: CallbackCollector,
) {
for (k, vals) in collector {
for (op, new, old) in vals {
@@ -738,7 +746,7 @@ impl<'s, S: Storage<'s>> Db<S> {
}
let is_write = !write_lock_names.is_empty();
let write_lock = self.obtain_relation_locks(write_lock_names.iter());
-let _write_lock_guards = write_lock.iter().map(|l| l.read()).collect_vec();
let _write_lock_guards = write_lock.iter().map(|l| l.read().unwrap()).collect_vec();

let callback_targets = if is_write {
self.current_callback_targets()
@@ -803,10 +811,7 @@ impl<'s, S: Storage<'s>> Db<S> {
cleanups: &mut Vec<(Vec<u8>, Vec<u8>)>,
cur_vld: ValidityTs,
callback_targets: &BTreeSet<SmartString<LazyCompact>>,
-callback_collector: &mut BTreeMap<
-SmartString<LazyCompact>,
-Vec<(CallbackOp, NamedRows, NamedRows)>,
->,
callback_collector: &mut CallbackCollector,
) -> Result<Either<NamedRows, ControlCode>> {
let mut ret = NamedRows::default();
for p in ps {
@@ -1166,6 +1171,9 @@ impl<'s, S: Storage<'s>> Db<S> {
}
SysOp::ListRelations => self.list_relations(),
SysOp::RemoveRelation(rel_names) => {
let rel_name_strs = rel_names.iter().map(|n| &n.name);
let locks = self.obtain_relation_locks(rel_name_strs);
let _guards = locks.iter().map(|l| l.read().unwrap()).collect_vec();

let mut bounds = vec![];
{
let mut tx = self.transact_write()?;
@@ -1183,8 +1191,33 @@ impl<'s, S: Storage<'s>> Db<S> {
rows: vec![vec![DataValue::from(OK_STR)]],
})
}
SysOp::CreateIndex(rel_name, idx_name, cols) => {
let lock = self.obtain_relation_locks(iter::once(&rel_name.name)).pop().unwrap();
let _guard = lock.write().unwrap();
let mut tx = self.transact_write()?;
tx.create_index(&rel_name, &idx_name, cols)?;
tx.commit_tx()?;
Ok(NamedRows {
headers: vec![STATUS_STR.to_string()],
rows: vec![vec![DataValue::from(OK_STR)]],
})
}
SysOp::RemoveIndex(rel_name, idx_name) => {
let lock = self.obtain_relation_locks(iter::once(&rel_name.name)).pop().unwrap();
let _guard = lock.read().unwrap();
let mut tx = self.transact_write()?;
tx.remove_index(&rel_name, &idx_name)?;
tx.commit_tx()?;
Ok(NamedRows {
headers: vec![STATUS_STR.to_string()],
rows: vec![vec![DataValue::from(OK_STR)]],
})
}
SysOp::ListRelation(rs) => self.list_relation(&rs),
SysOp::RenameRelation(rename_pairs) => {
let rel_names = rename_pairs.iter().flat_map(|(f, t)| [&f.name, &t.name]);
let locks = self.obtain_relation_locks(rel_names);
let _guards = locks.iter().map(|l| l.read().unwrap()).collect_vec();

let mut tx = self.transact_write()?;
for (old, new) in rename_pairs {
tx.rename_relation(old, new)?;
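The system ops above (RemoveRelation, CreateIndex, RemoveIndex, RenameRelation) all start by grabbing per-relation locks through `obtain_relation_locks` before opening a write transaction. A rough sketch of that pattern, using `std::sync::RwLock` as a stand-in for crossbeam's `ShardedLock` and an invented registry type purely for illustration:

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};

// One lock per relation name, handed out on demand; guards are held for the
// duration of the system op so that operations on the same relation serialize.
struct LockRegistry {
    locks: RwLock<BTreeMap<String, Arc<RwLock<()>>>>,
}

impl LockRegistry {
    fn obtain<'a>(&self, names: impl Iterator<Item = &'a str>) -> Vec<Arc<RwLock<()>>> {
        let mut table = self.locks.write().unwrap();
        names
            .map(|n| table.entry(n.to_string()).or_default().clone())
            .collect()
    }
}

fn main() {
    let reg = LockRegistry {
        locks: RwLock::new(BTreeMap::new()),
    };
    let locks = reg.obtain(["friends", "friends:by_name"].into_iter());
    // Index creation above takes the write side; most other ops take read guards.
    let _guards: Vec<_> = locks.iter().map(|l| l.read().unwrap()).collect();
}
```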
@@ -1268,10 +1301,8 @@ impl<'s, S: Storage<'s>> Db<S> {
input_program: InputProgram,
cur_vld: ValidityTs,
callback_targets: &BTreeSet<SmartString<LazyCompact>>,
-callback_collector: &mut BTreeMap<
-SmartString<LazyCompact>,
-Vec<(CallbackOp, NamedRows, NamedRows)>,
->,
callback_collector: &mut CallbackCollector,
top_level: bool,
) -> Result<(NamedRows, Vec<(Vec<u8>, Vec<u8>)>)> {
// cleanups contain stored relations that should be deleted at the end of query
let mut clean_ups = vec![];
@@ -1416,6 +1447,7 @@ impl<'s, S: Storage<'s>> Db<S> {
cur_vld,
callback_targets,
callback_collector,
top_level,
)
.wrap_err_with(|| format!("when executing against relation '{}'", meta.name))?;
clean_ups.extend(to_clear);
@@ -1470,6 +1502,7 @@ impl<'s, S: Storage<'s>> Db<S> {
cur_vld,
callback_targets,
callback_collector,
top_level,
)
.wrap_err_with(|| format!("when executing against relation '{}'", meta.name))?;
clean_ups.extend(to_clear);

@@ -6,6 +6,7 @@
* You can obtain one at https://mozilla.org/MPL/2.0/.
*/

use std::collections::BTreeMap;
use std::fmt::{Debug, Display, Formatter};
use std::sync::atomic::Ordering;
@@ -73,6 +74,8 @@ pub(crate) struct RelationHandle {
pub(crate) access_level: AccessLevel,
#[serde(default)]
pub(crate) is_temp: bool,
#[serde(default)]
pub(crate) indices: BTreeMap<SmartString<LazyCompact>, (RelationHandle, Vec<usize>)>,
}

#[derive(
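Schematically, the new `indices` field maps an index name to the handle of the relation that backs it plus the positions of the indexed columns inside the parent tuple. A toy rendering of that shape, with strings in place of `RelationHandle` (illustrative only):

```rust
use std::collections::BTreeMap;

// index name -> (name of the backing relation, positions of the indexed
// columns within the parent relation's tuples)
type Indices = BTreeMap<String, (String, Vec<usize>)>;

fn main() {
    let mut indices: Indices = BTreeMap::new();
    // For a relation friends(id => name, age), an index on (name, id) records
    // the backing relation "friends:by_name" and column positions [1, 0].
    indices.insert(
        "by_name".to_string(),
        ("friends:by_name".to_string(), vec![1, 0]),
    );
    for (idx, (backing, cols)) in &indices {
        println!("{idx} -> {backing}, columns {cols:?}");
    }
}
```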
@@ -483,6 +486,7 @@ impl<'a> SessionTx<'a> {
replace_triggers: vec![],
access_level: AccessLevel::Normal,
is_temp,
indices: Default::default(),
};

let name_key = vec![DataValue::Str(meta.name.clone())].encode_as_key(RelationId::SYSTEM);
@@ -537,6 +541,11 @@ impl<'a> SessionTx<'a> {
store.access_level
))
}

for k in store.indices.keys() {
self.destroy_relation(&format!("{}:{}", name, k))?;
}

let key = DataValue::from(name);
let encoded = vec![key].encode_as_key(RelationId::SYSTEM);
self.store_tx.del(&encoded)?;
@@ -557,6 +566,162 @@ impl<'a> SessionTx<'a> {
Ok(())
}
pub(crate) fn create_index(
&mut self,
rel_name: &Symbol,
idx_name: &Symbol,
cols: Vec<Symbol>,
) -> Result<()> {
let mut rel_handle = self.get_relation(rel_name, true)?;
if rel_handle.indices.contains_key(&idx_name.name) {
#[derive(Debug, Error, Diagnostic)]
#[error("index {0} for relation {1} already exists")]
#[diagnostic(code(tx::index_already_exists))]
pub(crate) struct IndexAlreadyExists(String, String);
bail!(IndexAlreadyExists(
idx_name.name.to_string(),
rel_name.name.to_string()
));
}
let mut col_defs = vec![];
'outer: for col in cols.iter() {
for orig_col in rel_handle
.metadata
.keys
.iter()
.chain(rel_handle.metadata.non_keys.iter())
{
if orig_col.name == col.name {
col_defs.push(orig_col.clone());
// the requested column exists, move on to the next one
continue 'outer;
}
}

#[derive(Debug, Error, Diagnostic)]
#[error("column {0} in index {1} for relation {2} not found")]
#[diagnostic(code(tx::col_in_idx_not_found))]
pub(crate) struct ColInIndexNotFound(String, String, String);
bail!(ColInIndexNotFound(
col.name.to_string(),
idx_name.name.to_string(),
rel_name.name.to_string()
));
}
// append the primary key columns that are not already covered by the index
'next_key: for key in rel_handle.metadata.keys.iter() {
for col in cols.iter() {
if col.name == key.name {
continue 'next_key;
}
}
col_defs.push(key.clone());
}
let key_bindings = col_defs
.iter()
.map(|col| Symbol::new(col.name.clone(), Default::default()))
.collect_vec();
let idx_meta = StoredRelationMetadata {
keys: col_defs,
non_keys: vec![],
};
let idx_handle = InputRelationHandle {
name: Symbol::new(
format!("{}:{}", rel_name.name, idx_name.name),
Default::default(),
),
metadata: idx_meta,
key_bindings,
dep_bindings: vec![],
span: Default::default(),
};
let idx_handle = self.create_relation(idx_handle)?;
// populate index
let extraction_indices = idx_handle
.metadata
.keys
.iter()
.map(|col| {
for (i, kc) in rel_handle.metadata.keys.iter().enumerate() {
if kc.name == col.name {
return i;
}
}
for (i, kc) in rel_handle.metadata.non_keys.iter().enumerate() {
if kc.name == col.name {
return i + rel_handle.metadata.keys.len();
}
}
unreachable!()
})
.collect_vec();
if self.store_tx.supports_par_put() {
for tuple in rel_handle.scan_all(self) {
let tuple = tuple?;
let extracted = extraction_indices
.iter()
.map(|idx| tuple[*idx].clone())
.collect_vec();
let key = idx_handle.encode_key_for_store(&extracted, Default::default())?;
self.store_tx.par_put(&key, &[])?;
}
} else {
for tuple in rel_handle.scan_all(self).collect_vec() {
let tuple = tuple?;
let extracted = extraction_indices
.iter()
.map(|idx| tuple[*idx].clone())
.collect_vec();
let key = idx_handle.encode_key_for_store(&extracted, Default::default())?;
self.store_tx.put(&key, &[])?;
}
}
rel_handle
.indices
.insert(idx_name.name.clone(), (idx_handle, extraction_indices));
let new_encoded =
vec![DataValue::from(&rel_name.name as &str)].encode_as_key(RelationId::SYSTEM);
let mut meta_val = vec![];
rel_handle
.serialize(&mut Serializer::new(&mut meta_val))
.unwrap();
self.store_tx.put(&new_encoded, &meta_val)?;
Ok(())
}
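The back-fill loop above reduces to a projection: each stored tuple is mapped through `extraction_indices` to produce the index key. A tiny standalone version of that step, with plain slices standing in for the real tuple type:

```rust
// Project a full tuple onto the index columns, in index order.
fn project<T: Clone>(tuple: &[T], extraction_indices: &[usize]) -> Vec<T> {
    extraction_indices.iter().map(|&i| tuple[i].clone()).collect()
}

fn main() {
    // Relation columns: [id, name, age]; an index keyed on (name, id) uses
    // extraction indices [1, 0].
    let row = vec!["42", "alice", "30"];
    assert_eq!(project(&row, &[1, 0]), vec!["alice", "42"]);
}
```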
pub(crate) fn remove_index(&mut self, rel_name: &Symbol, idx_name: &Symbol) -> Result<()> {
let mut rel = self.get_relation(rel_name, true)?;
if rel.indices.remove(&idx_name.name).is_none() {
#[derive(Debug, Error, Diagnostic)]
#[error("index {0} for relation {1} not found")]
#[diagnostic(code(tx::idx_not_found))]
pub(crate) struct IndexNotFound(String, String);
bail!(IndexNotFound(idx_name.to_string(), rel_name.to_string()));
}
self.destroy_relation(&format!("{}:{}", rel_name.name, idx_name.name))?;
let new_encoded =
vec![DataValue::from(&rel_name.name as &str)].encode_as_key(RelationId::SYSTEM);
let mut meta_val = vec![];
rel.serialize(&mut Serializer::new(&mut meta_val)).unwrap();
self.store_tx.put(&new_encoded, &meta_val)?;
Ok(())
}
pub(crate) fn rename_relation(&mut self, old: Symbol, new: Symbol) -> Result<()> {
if old.name.starts_with('_') || new.name.starts_with('_') {
bail!("Bad name given");
@@ -613,6 +778,7 @@ impl<'a> SessionTx<'a> {
#[derive(Debug, Error, Diagnostic)]
#[error("Insufficient access level {2} for {1} on stored relation '{0}'")]
#[diagnostic(code(tx::insufficient_access_level))]
pub(crate) struct InsufficientAccessLevel(
pub(crate) String,
pub(crate) String,

@@ -129,6 +129,14 @@ impl<'s> StoreTx<'s> for MemTx<'s> {
}
}
fn supports_par_put(&self) -> bool {
false
}
fn par_put(&self, _key: &[u8], _val: &[u8]) -> Result<()> {
panic!()
}
fn del(&mut self, key: &[u8]) -> Result<()> {
match self {
MemTx::Reader(_) => {

@@ -66,6 +66,15 @@ pub trait StoreTx<'s>: Sync {
/// the storage engine needs to overwrite the old value.
fn put(&mut self, key: &[u8], val: &[u8]) -> Result<()>;
/// Should return true if the engine supports parallel put, false otherwise.
fn supports_par_put(&self) -> bool;
/// Put a key-value pair into the storage. In case of existing key,
/// the storage engine needs to overwrite the old value.
/// The difference between this one and `put` is the mutability of self.
/// It is OK to always panic if `supports_par_put` returns `false`.
fn par_put(&self, key: &[u8], val: &[u8]) -> Result<()>;
/// Delete a key-value pair from the storage.
fn del(&mut self, key: &[u8]) -> Result<()>;
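To make the contract concrete: an engine that can write through a shared reference advertises `supports_par_put` and implements `par_put`; one that cannot returns `false` and may leave `par_put` panicking, since it is then never called. A toy backend under those assumptions, not tied to the real trait bounds of the crate:

```rust
use std::collections::BTreeMap;
use std::sync::Mutex;

type Result<T> = std::result::Result<T, String>;

// Interior mutability is what makes a `&self` put possible here; the RocksDB,
// SQLite and TiKV transactions in the diff above are in the same situation.
struct ToyTx {
    store: Mutex<BTreeMap<Vec<u8>, Vec<u8>>>,
}

impl ToyTx {
    fn put(&mut self, key: &[u8], val: &[u8]) -> Result<()> {
        self.par_put(key, val)
    }
    fn supports_par_put(&self) -> bool {
        true
    }
    fn par_put(&self, key: &[u8], val: &[u8]) -> Result<()> {
        self.store
            .lock()
            .map_err(|e| e.to_string())?
            .insert(key.to_vec(), val.to_vec());
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut tx = ToyTx {
        store: Mutex::new(BTreeMap::new()),
    };
    tx.put(b"k", b"v")?;
    let guard = tx.store.lock().map_err(|e| e.to_string())?;
    assert_eq!(guard.get(b"k".as_slice()), Some(&b"v".to_vec()));
    Ok(())
}
```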

@@ -171,6 +171,14 @@ impl<'s> StoreTx<'s> for RocksDbTx {
Ok(self.db_tx.put(key, val)?)
}
fn supports_par_put(&self) -> bool {
true
}
fn par_put(&self, key: &[u8], val: &[u8]) -> Result<()> {
Ok(self.db_tx.put(key, val)?)
}
#[inline]
fn del(&mut self, key: &[u8]) -> Result<()> {
Ok(self.db_tx.del(key)?)

@@ -138,6 +138,14 @@ impl<'s> StoreTx<'s> for SledTx {
Ok(())
}
fn supports_par_put(&self) -> bool {
false
}
fn par_put(&self, _key: &[u8], _val: &[u8]) -> Result<()> {
panic!()
}
#[inline]
fn del(&mut self, key: &[u8]) -> Result<()> {
self.ensure_changes_db()?;

@@ -8,9 +8,9 @@
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};

-use crossbeam::sync::{ShardedLock, ShardedLockReadGuard, ShardedLockWriteGuard};
use ::sqlite::Connection;
use crossbeam::sync::{ShardedLock, ShardedLockReadGuard, ShardedLockWriteGuard};
use either::{Either, Left, Right};
use miette::{bail, miette, IntoDiagnostic, Result};
use sqlite::{ConnectionWithFullMutex, State, Statement};
@@ -216,6 +216,14 @@ impl<'s> StoreTx<'s> for SqliteTx<'s> {
}

fn put(&mut self, key: &[u8], val: &[u8]) -> Result<()> {
self.par_put(key, val)
}
fn supports_par_put(&self) -> bool {
true
}
fn par_put(&self, key: &[u8], val: &[u8]) -> Result<()> {
self.ensure_stmt(PUT_QUERY);
let mut statement = self.stmts[PUT_QUERY].lock().unwrap();
let statement = statement.as_mut().unwrap();

@@ -63,6 +63,14 @@ impl<'s> StoreTx<'s> for TempTx {
Ok(())
}
fn supports_par_put(&self) -> bool {
false
}
fn par_put(&self, _key: &[u8], _val: &[u8]) -> Result<()> {
panic!()
}
fn del(&mut self, key: &[u8]) -> Result<()> {
self.store.remove(key);
Ok(())

@@ -124,6 +124,14 @@ impl<'s> StoreTx<'s> for TiKvTx {
}

fn put(&mut self, key: &[u8], val: &[u8]) -> Result<()> {
self.par_put(key, val)
}
fn supports_par_put(&self) -> bool {
true
}
fn par_put(&self, key: &[u8], val: &[u8]) -> Result<()> {
RT.block_on(self.tx.lock().unwrap().put(key.to_owned(), val.to_owned()))
.into_diagnostic()
}

@@ -105,11 +105,11 @@ struct TxBridge {
}
}

-inline void put(RustBytes key, RustBytes val, RocksDbStatus &status) {
inline void put(RustBytes key, RustBytes val, RocksDbStatus &status) const {
write_status(tx->Put(convert_slice(key), convert_slice(val)), status);
}

-inline void del(RustBytes key, RocksDbStatus &status) {
inline void del(RustBytes key, RocksDbStatus &status) const {
write_status(tx->Delete(convert_slice(key)), status);
}

@@ -182,12 +182,12 @@ pub(crate) mod ffi {
status: &mut RocksDbStatus,
);
fn put(
-self: Pin<&mut TxBridge>,
self: &TxBridge,
key: &[u8],
val: &[u8],
status: &mut RocksDbStatus,
);
-fn del(self: Pin<&mut TxBridge>, key: &[u8], status: &mut RocksDbStatus);
fn del(self: &TxBridge, key: &[u8], status: &mut RocksDbStatus);
fn commit(self: Pin<&mut TxBridge>, status: &mut RocksDbStatus);
fn rollback(self: Pin<&mut TxBridge>, status: &mut RocksDbStatus);
fn rollback_to_savepoint(self: Pin<&mut TxBridge>, status: &mut RocksDbStatus);

@@ -98,9 +98,9 @@ impl Tx {
self.inner.pin_mut().clear_snapshot()
}
#[inline]
-pub fn put(&mut self, key: &[u8], val: &[u8]) -> Result<(), RocksDbStatus> {
pub fn put(&self, key: &[u8], val: &[u8]) -> Result<(), RocksDbStatus> {
let mut status = RocksDbStatus::default();
-self.inner.pin_mut().put(key, val, &mut status);
self.inner.put(key, val, &mut status);
if status.is_ok() {
Ok(())
} else {
@@ -108,9 +108,9 @@ impl Tx {
}
}
#[inline]
-pub fn del(&mut self, key: &[u8]) -> Result<(), RocksDbStatus> {
pub fn del(&self, key: &[u8]) -> Result<(), RocksDbStatus> {
let mut status = RocksDbStatus::default();
-self.inner.pin_mut().del(key, &mut status);
self.inner.del(key, &mut status);
if status.is_ok() {
Ok(())
} else {
