more guardrails for preventing messing up indices

main
Ziyang Hu 2 years ago
parent 6a5f55db64
commit caa41419ae

@ -23,13 +23,13 @@ use crate::data::value::{DataValue, ValidityTs};
use crate::fixed_rule::utilities::constant::Constant;
use crate::fixed_rule::FixedRuleHandle;
use crate::parse::parse_script;
use crate::runtime::callback::{CallbackCollector, CallbackOp};
use crate::runtime::relation::{
extend_tuple_from_v, AccessLevel, InputRelationHandle, InsufficientAccessLevel,
};
use crate::runtime::transact::SessionTx;
use crate::storage::Storage;
use crate::{Db, NamedRows, StoreTx};
use crate::runtime::callback::{CallbackCollector, CallbackOp};
#[derive(Debug, Error, Diagnostic)]
#[error("attempting to write into relation {0} of arity {1} with data of arity {2}")]
@ -155,9 +155,7 @@ impl<'a> SessionTx<'a> {
if need_to_collect || has_indices {
if let Some(existing) = self.store_tx.get(&key, false)? {
let mut tup = extracted.clone();
if !existing.is_empty() {
extend_tuple_from_v(&mut tup, &existing);
}
extend_tuple_from_v(&mut tup, &existing);
if has_indices {
for (idx_rel, extractor) in relation_store.indices.values() {
let idx_tup =
@ -417,11 +415,8 @@ impl<'a> SessionTx<'a> {
if need_to_collect || has_indices {
if let Some(existing) = self.store_tx.get(&key, false)? {
let mut tup = extracted.clone();
if !existing.is_empty() {
tup.truncate(relation_store.metadata.keys.len());
extend_tuple_from_v(&mut tup, &existing);
}
let mut tup = extracted[0..relation_store.metadata.keys.len()].to_vec();
extend_tuple_from_v(&mut tup, &existing);
if has_indices && extracted != tup {
for (idx_rel, extractor) in relation_store.indices.values() {
let idx_tup_old =

@ -49,7 +49,9 @@ use crate::query::ra::{
use crate::runtime::callback::{
CallbackCollector, CallbackDeclaration, CallbackOp, EventCallbackRegistry,
};
use crate::runtime::relation::{AccessLevel, InsufficientAccessLevel, RelationHandle, RelationId};
use crate::runtime::relation::{
extend_tuple_from_v, AccessLevel, InsufficientAccessLevel, RelationHandle, RelationId,
};
use crate::runtime::transact::SessionTx;
use crate::storage::temp::TempStorage;
use crate::storage::{Storage, StoreTx};
@ -109,6 +111,11 @@ impl<S> Debug for Db<S> {
#[diagnostic(code(db::init))]
pub(crate) struct BadDbInit(#[help] pub(crate) String);
#[derive(Debug, Error, Diagnostic)]
#[error("Cannot import data into relation {0} as it is an index")]
#[diagnostic(code(tx::import_into_index))]
pub(crate) struct ImportIntoIndex(pub(crate) String);
#[derive(serde_derive::Serialize, serde_derive::Deserialize, Debug, Clone, Default)]
/// Rows in a relation, together with headers for the fields.
pub struct NamedRows {
@ -249,8 +256,8 @@ impl<'s, S: Storage<'s>> Db<S> {
/// what was returned by [Self::export_relations].
/// The target stored relations must already exist in the database.
///
/// Note that triggers are _not_ run for the relations, if any exists.
/// If you need to activate triggers, use queries with parameters.
/// Note that triggers and callbacks are _not_ run for the relations, if any exists.
/// If you need to activate triggers or callbacks, use queries with parameters.
pub fn import_relations(&'s self, data: BTreeMap<String, NamedRows>) -> Result<()> {
#[derive(Debug, Diagnostic, Error)]
#[error("cannot import data for relation '{0}': {1}")]
@ -277,7 +284,11 @@ impl<'s, S: Storage<'s>> Db<S> {
s
}
};
if relation.contains(':') {
bail!(ImportIntoIndex(relation.to_string()))
}
let handle = tx.get_relation(relation, false)?;
let has_indices = !handle.indices.is_empty();
if handle.access_level < AccessLevel::Protected {
bail!(InsufficientAccessLevel(
@ -341,6 +352,21 @@ impl<'s, S: Storage<'s>> Db<S> {
})
.try_collect()?;
let k_store = handle.encode_key_for_store(&keys, Default::default())?;
if has_indices {
if let Some(existing) = tx.store_tx.get(&k_store, false)? {
let mut old = keys.clone();
extend_tuple_from_v(&mut old, &existing);
if is_delete || old != row {
for (idx_rel, extractor) in handle.indices.values() {
let idx_tup =
extractor.iter().map(|i| old[*i].clone()).collect_vec();
let encoded =
idx_rel.encode_key_for_store(&idx_tup, Default::default())?;
tx.store_tx.del(&encoded)?;
}
}
}
}
if is_delete {
tx.store_tx.del(&k_store)?;
} else {
@ -355,6 +381,16 @@ impl<'s, S: Storage<'s>> Db<S> {
.try_collect()?;
let v_store = handle.encode_val_only_for_store(&vals, Default::default())?;
tx.store_tx.put(&k_store, &v_store)?;
if has_indices {
let mut kv = keys;
kv.extend(vals);
for (idx_rel, extractor) in handle.indices.values() {
let idx_tup = extractor.iter().map(|i| kv[*i].clone()).collect_vec();
let encoded =
idx_rel.encode_key_for_store(&idx_tup, Default::default())?;
tx.store_tx.put(&encoded, &[])?;
}
}
}
}
}
@ -409,8 +445,8 @@ impl<'s, S: Storage<'s>> Db<S> {
/// Import data from relations in a backup file.
/// The target stored relations must already exist in the database.
///
/// Note that triggers are _not_ run for the relations, if any exists.
/// If you need to activate triggers, use queries with parameters.
/// Note that triggers and callbacks are _not_ run for the relations, if any exists.
/// If you need to activate triggers or callbacks, use queries with parameters.
#[allow(unused_variables)]
pub fn import_from_backup(
&'s self,
@ -431,9 +467,22 @@ impl<'s, S: Storage<'s>> Db<S> {
let mut dst_tx = self.transact_write()?;
for relation in relations {
if relation.contains(':') {
bail!(ImportIntoIndex(relation.to_string()))
}
let src_handle = src_tx.get_relation(relation, false)?;
let dst_handle = dst_tx.get_relation(relation, false)?;
if !dst_handle.indices.is_empty() {
#[derive(Debug, Error, Diagnostic)]
#[error("Cannot import data into relation {0} from backup as the relation has indices")]
#[diagnostic(code(tx::bare_import_with_indices))]
#[diagnostic(help("Use `import_relations()` instead"))]
pub(crate) struct RestoreIntoRelWithIndices(pub(crate) String);
bail!(RestoreIntoRelWithIndices(dst_handle.name.to_string()))
}
if dst_handle.access_level < AccessLevel::Protected {
bail!(InsufficientAccessLevel(
dst_handle.name.to_string(),

Loading…
Cancel
Save