refactor store ops

main
Ziyang Hu 1 year ago
parent a880431726
commit 4c3fb1fdde

@ -13,11 +13,13 @@ query_script_inner_no_bracket = { (option | rule | const_rule | fixed_rule)+ }
imperative_script = {SOI ~ imperative_stmt+ ~ EOI}
sys_script = {SOI ~ "::" ~ (list_relations_op | list_relation_op | remove_relations_op | trigger_relation_op |
trigger_relation_show_op | rename_relations_op | running_op | kill_op | explain_op |
access_level_op | index_op | vec_idx_op | compact_op | list_fixed_rules) ~ EOI}
access_level_op | index_op | vec_idx_op | fts_idx_op | lsh_idx_op | compact_op | list_fixed_rules) ~ EOI}
index_op = {"index" ~ (index_create | index_drop)}
vec_idx_op = {"hnsw" ~ (index_create_hnsw | index_drop)}
vec_idx_op = {"hnsw" ~ (index_create_adv | index_drop)}
// System op introduced by the keyword "fts": either an option-style index creation
// (`index_create_adv`) or an `index_drop`.
// NOTE(review): "fts" presumably stands for full-text search — confirm.
fts_idx_op = {"fts" ~ (index_create_adv | index_drop)}
// System op introduced by the keyword "lsh": same two alternatives as the "fts" rule above.
// NOTE(review): "lsh" presumably stands for locality-sensitive hashing — confirm.
lsh_idx_op = {"lsh" ~ (index_create_adv | index_drop)}
// Plain index creation: `create <compound_ident>:<ident> { ident, ident, ... }`.
// The braces hold zero or more identifiers separated by commas; a trailing comma is accepted
// and the list may be empty.
index_create = {"create" ~ compound_ident ~ ":" ~ ident ~ "{" ~ (ident ~ ",")* ~ ident? ~ "}"}
index_create_hnsw = {"create" ~ compound_ident ~ ":" ~ ident ~ "{" ~ (index_opt_field ~ ",")* ~ index_opt_field? ~ "}"}
// Option-style index creation: `create <compound_ident>:<ident> { opt, opt, ... }`.
// Same outer shape as `index_create`, but each entry inside the braces is an
// `index_opt_field` instead of a bare identifier; a trailing comma is accepted.
index_create_adv = {"create" ~ compound_ident ~ ":" ~ ident ~ "{" ~ (index_opt_field ~ ",")* ~ index_opt_field? ~ "}"}
// Index removal: `drop <compound_ident>:<ident>`. Shared by every index-related system op
// in this grammar that offers a drop alternative.
index_drop = {"drop" ~ compound_ident ~ ":" ~ ident }
compact_op = {"compact"}
list_fixed_rules = {"fixed_rules"}

@ -825,6 +825,7 @@ pub(crate) fn get_op(name: &str) -> Option<&'static Op> {
"regex_replace_all" => &OP_REGEX_REPLACE_ALL,
"regex_extract" => &OP_REGEX_EXTRACT,
"regex_extract_first" => &OP_REGEX_EXTRACT_FIRST,
"t2s" => &OP_T2S,
"encode_base64" => &OP_ENCODE_BASE64,
"decode_base64" => &OP_DECODE_BASE64,
"first" => &OP_FIRST,

@ -137,7 +137,7 @@ fn get_json_path<'a>(
.get_int()
.ok_or_else(|| miette!("json path must be a string or a number"))?
as usize;
if arr.len() >= key + 1 {
if arr.len() <= key + 1 {
arr.resize_with(key + 1, || JsonValue::Null);
}
@ -1445,6 +1445,14 @@ pub(crate) fn op_regex_extract_first(args: &[DataValue]) -> Result<DataValue> {
}
}
define_op!(OP_T2S, 1, false);
/// Implementation of the `t2s` operator.
///
/// A string argument is passed through `fast2s::convert` and returned as a new
/// string value; any other kind of value is returned unchanged (cloned).
/// NOTE(review): `fast2s::convert` presumably maps traditional Chinese text to
/// simplified Chinese — confirm against the crate documentation.
///
/// Reads only `args[0]`; the operator is registered with an arity of one.
fn op_t2s(args: &[DataValue]) -> Result<DataValue> {
    let arg = &args[0];
    let converted = if let DataValue::Str(text) = arg {
        DataValue::Str(fast2s::convert(text).into())
    } else {
        // Non-string inputs are not an error: hand them back as they came in.
        arg.clone()
    };
    Ok(converted)
}
define_op!(OP_IS_NULL, 1, false);
pub(crate) fn op_is_null(args: &[DataValue]) -> Result<DataValue> {
Ok(DataValue::from(matches!(args[0], DataValue::Null)))
@ -1753,7 +1761,7 @@ fn get_impl(args: &[DataValue]) -> Result<DataValue> {
.get_int()
.ok_or_else(|| miette!("second argument to 'get' mut be an integer"))?;
let idx = get_index(n, l.len())?;
return Ok(l[idx].clone());
Ok(l[idx].clone())
}
DataValue::Json(json) => {
let res = match &args[1] {
@ -1770,8 +1778,7 @@ fn get_impl(args: &[DataValue]) -> Result<DataValue> {
.clone()
}
DataValue::List(l) => {
let mut v = json.clone();
get_json_path_immutable(&mut v, l)?.clone()
get_json_path_immutable(json, l)?.clone()
}
_ => bail!("second argument to 'get' mut be a string or integer"),
};

@ -263,7 +263,7 @@ impl TokenizerCache {
hashed_cache.insert(hash.as_ref().to_vec(), analyzer.clone());
let mut idx_cache = self.named_cache.write().unwrap();
idx_cache.insert(tokenizer_name.into(), analyzer.clone());
return Ok(analyzer);
Ok(analyzer)
}
}
}

@ -10,7 +10,7 @@ use std::collections::BTreeMap;
use std::sync::Arc;
use itertools::Itertools;
use miette::{ensure, miette, Diagnostic, Result, bail};
use miette::{bail, ensure, miette, Diagnostic, Result};
use smartstring::{LazyCompact, SmartString};
use thiserror::Error;
@ -54,7 +54,7 @@ pub(crate) struct HnswIndexConfig {
pub(crate) m_neighbours: usize,
pub(crate) index_filter: Option<String>,
pub(crate) extend_candidates: bool,
pub(crate) keep_pruned_connections: bool
pub(crate) keep_pruned_connections: bool,
}
#[derive(
@ -175,10 +175,16 @@ pub(crate) fn parse_sys(
}
SysOp::SetTriggers(rel, puts, rms, replaces)
}
Rule::fts_idx_op => {
todo!()
}
Rule::lsh_idx_op => {
todo!()
}
Rule::vec_idx_op => {
let inner = inner.into_inner().next().unwrap();
match inner.as_rule() {
Rule::index_create_hnsw => {
Rule::index_create_adv => {
let mut inner = inner.into_inner();
let rel = inner.next().unwrap();
let name = inner.next().unwrap();

@ -15,9 +15,9 @@ use pest::Parser;
use smartstring::{LazyCompact, SmartString};
use thiserror::Error;
use crate::data::expr::Expr;
use crate::data::expr::{Bytecode, Expr};
use crate::data::program::{FixedRuleApply, InputInlineRulesOrFixed, InputProgram, RelationOp};
use crate::data::relation::{ColumnDef, NullableColType};
use crate::data::relation::{ColumnDef, NullableColType, StoredRelationMetadata};
use crate::data::symb::Symbol;
use crate::data::tuple::{Tuple, ENCODED_KEY_MIN_LEN};
use crate::data::value::{DataValue, ValidityTs};
@ -27,11 +27,11 @@ use crate::parse::expr::build_expr;
use crate::parse::{parse_script, CozoScriptParser, Rule};
use crate::runtime::callback::{CallbackCollector, CallbackOp};
use crate::runtime::relation::{
extend_tuple_from_v, AccessLevel, InputRelationHandle, InsufficientAccessLevel,
extend_tuple_from_v, AccessLevel, InputRelationHandle, InsufficientAccessLevel, RelationHandle,
};
use crate::runtime::transact::SessionTx;
use crate::storage::Storage;
use crate::{Db, NamedRows, StoreTx};
use crate::{Db, NamedRows, SourceSpan, StoreTx};
#[derive(Debug, Error, Diagnostic)]
#[error("attempting to write into relation {0} of arity {1} with data of arity {2}")]
@ -129,18 +129,102 @@ impl<'a> SessionTx<'a> {
..
} = meta;
match op {
RelationOp::Rm => self.remove_from_relation(
db,
res_iter,
headers,
cur_vld,
callback_targets,
callback_collector,
propagate_triggers,
&mut to_clear,
&mut relation_store,
metadata,
key_bindings,
*span,
)?,
RelationOp::Ensure => self.ensure_in_relation(
res_iter,
headers,
cur_vld,
&mut relation_store,
metadata,
key_bindings,
dep_bindings,
*span,
)?,
RelationOp::EnsureNot => self.ensure_not_in_relation(
res_iter,
headers,
cur_vld,
&mut relation_store,
metadata,
key_bindings,
*span,
)?,
RelationOp::Update => self.update_in_relation(
db,
res_iter,
headers,
cur_vld,
callback_targets,
callback_collector,
propagate_triggers,
&mut to_clear,
&mut relation_store,
metadata,
key_bindings,
dep_bindings,
*span,
)?,
RelationOp::Create | RelationOp::Replace | RelationOp::Put => self.put_into_relation(
db,
res_iter,
headers,
cur_vld,
callback_targets,
callback_collector,
propagate_triggers,
&mut to_clear,
&mut relation_store,
metadata,
key_bindings,
dep_bindings,
*span,
)?,
};
Ok(to_clear)
}
fn put_into_relation<'s, S: Storage<'s>>(
&mut self,
db: &Db<S>,
res_iter: impl Iterator<Item = Tuple>,
headers: &[Symbol],
cur_vld: ValidityTs,
callback_targets: &BTreeSet<SmartString<LazyCompact>>,
callback_collector: &mut CallbackCollector,
propagate_triggers: bool,
to_clear: &mut Vec<(Vec<u8>, Vec<u8>)>,
relation_store: &mut RelationHandle,
metadata: &StoredRelationMetadata,
key_bindings: &[Symbol],
dep_bindings: &[Symbol],
span: SourceSpan,
) -> Result<()> {
let is_callback_target = callback_targets.contains(&relation_store.name);
match op {
RelationOp::Rm => {
if relation_store.access_level < AccessLevel::Protected {
bail!(InsufficientAccessLevel(
relation_store.name.to_string(),
"row removal".to_string(),
"row insertion".to_string(),
relation_store.access_level
));
}
let key_extractors = make_extractors(
let mut key_extractors = make_extractors(
&relation_store.metadata.keys,
&metadata.keys,
key_bindings,
@ -149,245 +233,143 @@ impl<'a> SessionTx<'a> {
let need_to_collect = !relation_store.is_temp
&& (is_callback_target
|| (propagate_triggers && !relation_store.rm_triggers.is_empty()));
|| (propagate_triggers && !relation_store.put_triggers.is_empty()));
let has_indices = !relation_store.indices.is_empty();
let has_hnsw_indices = !relation_store.hnsw_indices.is_empty();
let mut new_tuples: Vec<DataValue> = vec![];
let mut old_tuples: Vec<DataValue> = vec![];
let val_extractors = make_extractors(
&relation_store.metadata.non_keys,
&metadata.non_keys,
dep_bindings,
headers,
)?;
key_extractors.extend(val_extractors);
let mut stack = vec![];
let hnsw_filters = Self::make_hnsw_filters(relation_store)?;
for tuple in res_iter {
let extracted: Vec<DataValue> = key_extractors
.iter()
.map(|ex| ex.extract_data(&tuple, cur_vld))
.try_collect()?;
let key = relation_store.encode_key_for_store(&extracted, *span)?;
let key = relation_store.encode_key_for_store(&extracted, span)?;
let val = relation_store.encode_val_for_store(&extracted, span)?;
if need_to_collect || has_indices || has_hnsw_indices {
if let Some(existing) = self.store_tx.get(&key, false)? {
let mut tup = extracted.clone();
let mut tup = extracted[0..relation_store.metadata.keys.len()].to_vec();
extend_tuple_from_v(&mut tup, &existing);
if has_indices {
for (idx_rel, extractor) in relation_store.indices.values() {
let idx_tup =
extractor.iter().map(|i| tup[*i].clone()).collect_vec();
let encoded = idx_rel
.encode_key_for_store(&idx_tup, Default::default())?;
self.store_tx.del(&encoded)?;
}
}
if has_hnsw_indices {
for (idx_handle, _) in relation_store.hnsw_indices.values() {
self.hnsw_remove(&relation_store, idx_handle, &extracted)?;
}
if has_indices && extracted != tup {
self.update_in_index(relation_store, &extracted, &tup)?;
}
if need_to_collect {
old_tuples.push(DataValue::List(tup));
}
} else if has_indices {
for (idx_rel, extractor) in relation_store.indices.values() {
let idx_tup_new = extractor
.iter()
.map(|i| extracted[*i].clone())
.collect_vec();
let encoded_new =
idx_rel.encode_key_for_store(&idx_tup_new, Default::default())?;
self.store_tx.put(&encoded_new, &[])?;
}
}
self.update_in_hnsw(relation_store, &mut stack, &hnsw_filters, &extracted)?;
if need_to_collect {
new_tuples.push(DataValue::List(extracted.clone()));
new_tuples.push(DataValue::List(extracted));
}
}
if relation_store.is_temp {
self.temp_store_tx.del(&key)?;
self.temp_store_tx.put(&key, &val)?;
} else {
self.store_tx.del(&key)?;
self.store_tx.put(&key, &val)?;
}
}
// triggers and callbacks
if need_to_collect && !new_tuples.is_empty() {
let k_bindings = relation_store
.metadata
.keys
.iter()
.map(|k| Symbol::new(k.name.clone(), Default::default()))
.collect_vec();
let v_bindings = relation_store
.metadata
.non_keys
.iter()
.map(|k| Symbol::new(k.name.clone(), Default::default()));
let mut kv_bindings = k_bindings.clone();
kv_bindings.extend(v_bindings);
let kv_bindings = kv_bindings;
if propagate_triggers {
for trigger in &relation_store.rm_triggers {
let mut program = parse_script(
trigger,
&Default::default(),
&db.fixed_rules.read().unwrap(),
cur_vld,
)?
.get_single_program()?;
make_const_rule(
&mut program,
"_new",
k_bindings.clone(),
new_tuples.clone(),
);
make_const_rule(
&mut program,
"_old",
kv_bindings.clone(),
old_tuples.clone(),
);
let (_, cleanups) = db
.run_query(
self,
program,
self.collect_mutations(
db,
cur_vld,
callback_targets,
callback_collector,
false,
)
.map_err(|err| {
if err.source_code().is_some() {
err
} else {
err.with_source_code(trigger.to_string())
}
})?;
to_clear.extend(cleanups);
}
}
if is_callback_target {
let target_collector = callback_collector
.entry(relation_store.name.clone())
.or_default();
target_collector.push((
CallbackOp::Rm,
NamedRows::new(
k_bindings
.into_iter()
.map(|k| k.name.to_string())
.collect_vec(),
new_tuples
.into_iter()
.map(|v| match v {
DataValue::List(l) => l,
_ => unreachable!(),
})
.collect_vec(),
),
NamedRows::new(
kv_bindings
.into_iter()
.map(|k| k.name.to_string())
.collect_vec(),
old_tuples
.into_iter()
.map(|v| match v {
DataValue::List(l) => l,
_ => unreachable!(),
})
.collect_vec(),
),
))
}
propagate_triggers,
to_clear,
relation_store,
is_callback_target,
new_tuples,
old_tuples,
)?;
}
Ok(())
}
RelationOp::Ensure => {
if relation_store.access_level < AccessLevel::ReadOnly {
bail!(InsufficientAccessLevel(
relation_store.name.to_string(),
"row check".to_string(),
relation_store.access_level
));
}
let mut key_extractors = make_extractors(
&relation_store.metadata.keys,
&metadata.keys,
key_bindings,
headers,
)?;
let val_extractors = make_extractors(
&relation_store.metadata.non_keys,
&metadata.non_keys,
dep_bindings,
headers,
fn update_in_hnsw(
&mut self,
relation_store: &RelationHandle,
stack: &mut Vec<DataValue>,
hnsw_filters: &BTreeMap<SmartString<LazyCompact>, Vec<Bytecode>>,
new_kv: &[DataValue],
) -> Result<()> {
for (name, (idx_handle, idx_manifest)) in relation_store.hnsw_indices.iter() {
let filter = hnsw_filters.get(name);
self.hnsw_put(
idx_manifest,
relation_store,
idx_handle,
filter,
stack,
new_kv,
)?;
key_extractors.extend(val_extractors);
for tuple in res_iter {
let extracted: Vec<DataValue> = key_extractors
.iter()
.map(|ex| ex.extract_data(&tuple, cur_vld))
.try_collect()?;
let key = relation_store.encode_key_for_store(&extracted, *span)?;
let val = relation_store.encode_val_for_store(&extracted, *span)?;
let existing = if relation_store.is_temp {
self.temp_store_tx.get(&key, true)?
} else {
self.store_tx.get(&key, true)?
};
match existing {
None => {
bail!(TransactAssertionFailure {
relation: relation_store.name.to_string(),
key: extracted,
notice: "key does not exist in database".to_string()
})
}
Some(v) => {
if &v as &[u8] != &val as &[u8] {
bail!(TransactAssertionFailure {
relation: relation_store.name.to_string(),
key: extracted,
notice: "key exists in database, but value does not match"
.to_string()
})
}
}
Ok(())
}
fn make_hnsw_filters(
relation_store: &RelationHandle,
) -> Result<BTreeMap<SmartString<LazyCompact>, Vec<Bytecode>>> {
let mut hnsw_filters = BTreeMap::new();
for (name, (_, manifest)) in relation_store.hnsw_indices.iter() {
if let Some(f_code) = &manifest.index_filter {
let parsed = CozoScriptParser::parse(Rule::expr, f_code)
.into_diagnostic()?
.next()
.unwrap();
let mut code_expr = build_expr(parsed, &Default::default())?;
let binding_map = relation_store.raw_binding_map();
code_expr.fill_binding_indices(&binding_map)?;
hnsw_filters.insert(name.clone(), code_expr.compile());
}
}
RelationOp::EnsureNot => {
if relation_store.access_level < AccessLevel::ReadOnly {
bail!(InsufficientAccessLevel(
relation_store.name.to_string(),
"row check".to_string(),
relation_store.access_level
));
Ok(hnsw_filters)
}
let key_extractors = make_extractors(
&relation_store.metadata.keys,
&metadata.keys,
key_bindings,
headers,
)?;
fn update_in_relation<'s, S: Storage<'s>>(
&mut self,
db: &Db<S>,
res_iter: impl Iterator<Item = Tuple>,
headers: &[Symbol],
cur_vld: ValidityTs,
callback_targets: &BTreeSet<SmartString<LazyCompact>>,
callback_collector: &mut CallbackCollector,
propagate_triggers: bool,
to_clear: &mut Vec<(Vec<u8>, Vec<u8>)>,
relation_store: &mut RelationHandle,
metadata: &StoredRelationMetadata,
key_bindings: &[Symbol],
dep_bindings: &[Symbol],
span: SourceSpan,
) -> Result<()> {
let is_callback_target = callback_targets.contains(&relation_store.name);
for tuple in res_iter {
let extracted: Vec<DataValue> = key_extractors
.iter()
.map(|ex| ex.extract_data(&tuple, cur_vld))
.try_collect()?;
let key = relation_store.encode_key_for_store(&extracted, *span)?;
let already_exists = if relation_store.is_temp {
self.temp_store_tx.exists(&key, true)?
} else {
self.store_tx.exists(&key, true)?
};
if already_exists {
bail!(TransactAssertionFailure {
relation: relation_store.name.to_string(),
key: extracted,
notice: "key exists in database".to_string()
})
}
}
}
RelationOp::Update => {
if relation_store.access_level < AccessLevel::Protected {
bail!(InsufficientAccessLevel(
relation_store.name.to_string(),
@ -419,21 +401,7 @@ impl<'a> SessionTx<'a> {
);
let mut stack = vec![];
let mut hnsw_filters = BTreeMap::new();
if has_hnsw_indices {
for (name, (_, manifest)) in relation_store.hnsw_indices.iter() {
if let Some(f_code) = &manifest.index_filter {
let parsed = CozoScriptParser::parse(Rule::expr, f_code)
.into_diagnostic()?
.next()
.unwrap();
let mut code_expr = build_expr(parsed, &Default::default())?;
let binding_map = relation_store.raw_binding_map();
code_expr.fill_binding_indices(&binding_map)?;
hnsw_filters.insert(name.clone(), code_expr.compile());
}
}
}
let hnsw_filters = Self::make_hnsw_filters(relation_store)?;
for tuple in res_iter {
let mut new_kv: Vec<DataValue> = key_extractors
@ -441,7 +409,7 @@ impl<'a> SessionTx<'a> {
.map(|ex| ex.extract_data(&tuple, cur_vld))
.try_collect()?;
let key = relation_store.encode_key_for_store(&new_kv, *span)?;
let key = relation_store.encode_key_for_store(&new_kv, span)?;
let original_val_bytes = if relation_store.is_temp {
self.temp_store_tx.get(&key, true)?
} else {
@ -466,54 +434,22 @@ impl<'a> SessionTx<'a> {
None => {
new_kv.push(original_val[i].clone());
}
Some(ex) => {
let val = ex.extract_data(&tuple, cur_vld)?;
new_kv.push(val);
}
}
}
let new_val = relation_store.encode_val_for_store(&new_kv, *span)?;
if need_to_collect || has_indices || has_hnsw_indices {
if has_indices {
for (idx_rel, idx_extractor) in relation_store.indices.values() {
let idx_tup_old = idx_extractor
.iter()
.map(|i| old_kv[*i].clone())
.collect_vec();
let encoded_old = idx_rel
.encode_key_for_store(&idx_tup_old, Default::default())?;
self.store_tx.del(&encoded_old)?;
let idx_tup_new = idx_extractor
.iter()
.map(|i| new_kv[*i].clone())
.collect_vec();
let encoded_new = idx_rel
.encode_key_for_store(&idx_tup_new, Default::default())?;
self.store_tx.put(&encoded_new, &[])?;
Some(ex) => {
let val = ex.extract_data(&tuple, cur_vld)?;
new_kv.push(val);
}
}
}
let new_val = relation_store.encode_val_for_store(&new_kv, span)?;
if need_to_collect || has_indices || has_hnsw_indices {
self.update_in_index(relation_store, &new_kv, &old_kv)?;
if need_to_collect {
old_tuples.push(DataValue::List(old_kv));
}
if has_hnsw_indices {
for (name, (idx_handle, idx_manifest)) in
relation_store.hnsw_indices.iter()
{
let filter = hnsw_filters.get(name);
self.hnsw_put(
idx_manifest,
&relation_store,
idx_handle,
filter,
&mut stack,
&new_kv,
)?;
}
}
self.update_in_hnsw(relation_store, &mut stack, &hnsw_filters, &new_kv)?;
if need_to_collect {
new_tuples.push(DataValue::List(new_kv));
@ -528,6 +464,35 @@ impl<'a> SessionTx<'a> {
}
if need_to_collect && !new_tuples.is_empty() {
self.collect_mutations(
db,
cur_vld,
callback_targets,
callback_collector,
propagate_triggers,
to_clear,
relation_store,
is_callback_target,
new_tuples,
old_tuples,
)?;
}
Ok(())
}
fn collect_mutations<'s, S: Storage<'s>>(
&mut self,
db: &Db<S>,
cur_vld: ValidityTs,
callback_targets: &BTreeSet<SmartString<LazyCompact>>,
callback_collector: &mut CallbackCollector,
propagate_triggers: bool,
to_clear: &mut Vec<(Vec<u8>, Vec<u8>)>,
relation_store: &RelationHandle,
is_callback_target: bool,
new_tuples: Vec<DataValue>,
old_tuples: Vec<DataValue>,
) -> Result<()> {
let mut bindings = relation_store
.metadata
.keys
@ -556,13 +521,13 @@ impl<'a> SessionTx<'a> {
&mut program,
"_new",
kv_bindings.clone(),
new_tuples.clone(),
new_tuples.to_vec(),
);
make_const_rule(
&mut program,
"_old",
kv_bindings.clone(),
old_tuples.clone(),
old_tuples.to_vec(),
);
let (_, cleanups) = db
@ -617,31 +582,105 @@ impl<'a> SessionTx<'a> {
),
))
}
Ok(())
}
/// Brings every plain index of `relation_store` in line with a changed row.
///
/// For each index, the positions listed in its extractor are used to project
/// `old_kv` and `new_kv` onto the index's columns. The key encoded from the old
/// projection is deleted from the store, then the key encoded from the new
/// projection is written with an empty value.
///
/// # Errors
/// Propagates any failure from key encoding or from the store's `del` / `put`.
///
/// # Panics
/// Indexing panics if an extractor position lies outside `old_kv` or `new_kv`.
fn update_in_index(
    &mut self,
    relation_store: &RelationHandle,
    new_kv: &[DataValue],
    old_kv: &[DataValue],
) -> Result<()> {
    for (index_handle, columns) in relation_store.indices.values() {
        // Remove the entry that was derived from the previous version of the row.
        let stale_row: Vec<DataValue> = columns.iter().map(|col| old_kv[*col].clone()).collect();
        let stale_key = index_handle.encode_key_for_store(&stale_row, Default::default())?;
        self.store_tx.del(&stale_key)?;

        // Insert the entry for the current version; index rows carry no value payload.
        let fresh_row: Vec<DataValue> = columns.iter().map(|col| new_kv[*col].clone()).collect();
        let fresh_key = index_handle.encode_key_for_store(&fresh_row, Default::default())?;
        self.store_tx.put(&fresh_key, &[])?;
    }
    Ok(())
}
RelationOp::Create | RelationOp::Replace | RelationOp::Put => {
if relation_store.access_level < AccessLevel::Protected {
fn ensure_not_in_relation(
&mut self,
res_iter: impl Iterator<Item = Tuple>,
headers: &[Symbol],
cur_vld: ValidityTs,
relation_store: &mut RelationHandle,
metadata: &StoredRelationMetadata,
key_bindings: &[Symbol],
span: SourceSpan,
) -> Result<()> {
if relation_store.access_level < AccessLevel::ReadOnly {
bail!(InsufficientAccessLevel(
relation_store.name.to_string(),
"row insertion".to_string(),
"row check".to_string(),
relation_store.access_level
));
}
let mut key_extractors = make_extractors(
let key_extractors = make_extractors(
&relation_store.metadata.keys,
&metadata.keys,
key_bindings,
headers,
)?;
let need_to_collect = !relation_store.is_temp
&& (is_callback_target
|| (propagate_triggers && !relation_store.put_triggers.is_empty()));
let has_indices = !relation_store.indices.is_empty();
let has_hnsw_indices = !relation_store.hnsw_indices.is_empty();
let mut new_tuples: Vec<DataValue> = vec![];
let mut old_tuples: Vec<DataValue> = vec![];
for tuple in res_iter {
let extracted: Vec<DataValue> = key_extractors
.iter()
.map(|ex| ex.extract_data(&tuple, cur_vld))
.try_collect()?;
let key = relation_store.encode_key_for_store(&extracted, span)?;
let already_exists = if relation_store.is_temp {
self.temp_store_tx.exists(&key, true)?
} else {
self.store_tx.exists(&key, true)?
};
if already_exists {
bail!(TransactAssertionFailure {
relation: relation_store.name.to_string(),
key: extracted,
notice: "key exists in database".to_string()
})
}
}
Ok(())
}
fn ensure_in_relation(
&mut self,
res_iter: impl Iterator<Item = Tuple>,
headers: &[Symbol],
cur_vld: ValidityTs,
relation_store: &mut RelationHandle,
metadata: &StoredRelationMetadata,
key_bindings: &[Symbol],
dep_bindings: &[Symbol],
span: SourceSpan,
) -> Result<()> {
if relation_store.access_level < AccessLevel::ReadOnly {
bail!(InsufficientAccessLevel(
relation_store.name.to_string(),
"row check".to_string(),
relation_store.access_level
));
}
let mut key_extractors = make_extractors(
&relation_store.metadata.keys,
&metadata.keys,
key_bindings,
headers,
)?;
let val_extractors = make_extractors(
&relation_store.metadata.non_keys,
@ -650,114 +689,140 @@ impl<'a> SessionTx<'a> {
headers,
)?;
key_extractors.extend(val_extractors);
let mut stack = vec![];
let mut hnsw_filters = BTreeMap::new();
if has_hnsw_indices {
for (name, (_, manifest)) in relation_store.hnsw_indices.iter() {
if let Some(f_code) = &manifest.index_filter {
let parsed = CozoScriptParser::parse(Rule::expr, f_code)
.into_diagnostic()?
.next()
.unwrap();
let mut code_expr = build_expr(parsed, &Default::default())?;
let binding_map = relation_store.raw_binding_map();
code_expr.fill_binding_indices(&binding_map)?;
hnsw_filters.insert(name.clone(), code_expr.compile());
for tuple in res_iter {
let extracted: Vec<DataValue> = key_extractors
.iter()
.map(|ex| ex.extract_data(&tuple, cur_vld))
.try_collect()?;
let key = relation_store.encode_key_for_store(&extracted, span)?;
let val = relation_store.encode_val_for_store(&extracted, span)?;
let existing = if relation_store.is_temp {
self.temp_store_tx.get(&key, true)?
} else {
self.store_tx.get(&key, true)?
};
match existing {
None => {
bail!(TransactAssertionFailure {
relation: relation_store.name.to_string(),
key: extracted,
notice: "key does not exist in database".to_string()
})
}
Some(v) => {
if &v as &[u8] != &val as &[u8] {
bail!(TransactAssertionFailure {
relation: relation_store.name.to_string(),
key: extracted,
notice: "key exists in database, but value does not match".to_string()
})
}
}
}
}
Ok(())
}
fn remove_from_relation<'s, S: Storage<'s>>(
&mut self,
db: &Db<S>,
res_iter: impl Iterator<Item = Tuple>,
headers: &[Symbol],
cur_vld: ValidityTs,
callback_targets: &BTreeSet<SmartString<LazyCompact>>,
callback_collector: &mut CallbackCollector,
propagate_triggers: bool,
to_clear: &mut Vec<(Vec<u8>, Vec<u8>)>,
relation_store: &mut RelationHandle,
metadata: &StoredRelationMetadata,
key_bindings: &[Symbol],
span: SourceSpan,
) -> Result<()> {
let is_callback_target = callback_targets.contains(&relation_store.name);
if relation_store.access_level < AccessLevel::Protected {
bail!(InsufficientAccessLevel(
relation_store.name.to_string(),
"row removal".to_string(),
relation_store.access_level
));
}
let key_extractors = make_extractors(
&relation_store.metadata.keys,
&metadata.keys,
key_bindings,
headers,
)?;
let need_to_collect = !relation_store.is_temp
&& (is_callback_target
|| (propagate_triggers && !relation_store.rm_triggers.is_empty()));
let has_indices = !relation_store.indices.is_empty();
let has_hnsw_indices = !relation_store.hnsw_indices.is_empty();
let mut new_tuples: Vec<DataValue> = vec![];
let mut old_tuples: Vec<DataValue> = vec![];
for tuple in res_iter {
let extracted: Vec<DataValue> = key_extractors
.iter()
.map(|ex| ex.extract_data(&tuple, cur_vld))
.try_collect()?;
let key = relation_store.encode_key_for_store(&extracted, *span)?;
let val = relation_store.encode_val_for_store(&extracted, *span)?;
let key = relation_store.encode_key_for_store(&extracted, span)?;
if need_to_collect || has_indices || has_hnsw_indices {
if let Some(existing) = self.store_tx.get(&key, false)? {
let mut tup = extracted[0..relation_store.metadata.keys.len()].to_vec();
let mut tup = extracted.clone();
extend_tuple_from_v(&mut tup, &existing);
if has_indices && extracted != tup {
if has_indices {
for (idx_rel, extractor) in relation_store.indices.values() {
let idx_tup_old =
extractor.iter().map(|i| tup[*i].clone()).collect_vec();
let encoded_old = idx_rel
.encode_key_for_store(&idx_tup_old, Default::default())?;
self.store_tx.del(&encoded_old)?;
let idx_tup_new = extractor
.iter()
.map(|i| extracted[*i].clone())
.collect_vec();
let encoded_new = idx_rel
.encode_key_for_store(&idx_tup_new, Default::default())?;
self.store_tx.put(&encoded_new, &[])?;
}
let idx_tup = extractor.iter().map(|i| tup[*i].clone()).collect_vec();
let encoded =
idx_rel.encode_key_for_store(&idx_tup, Default::default())?;
self.store_tx.del(&encoded)?;
}
if need_to_collect {
old_tuples.push(DataValue::List(tup));
}
} else if has_indices {
for (idx_rel, extractor) in relation_store.indices.values() {
let idx_tup_new = extractor
.iter()
.map(|i| extracted[*i].clone())
.collect_vec();
let encoded_new = idx_rel
.encode_key_for_store(&idx_tup_new, Default::default())?;
self.store_tx.put(&encoded_new, &[])?;
if has_hnsw_indices {
for (idx_handle, _) in relation_store.hnsw_indices.values() {
self.hnsw_remove(relation_store, idx_handle, &extracted)?;
}
}
if has_hnsw_indices {
for (name, (idx_handle, idx_manifest)) in
relation_store.hnsw_indices.iter()
{
let filter = hnsw_filters.get(name);
self.hnsw_put(
idx_manifest,
&relation_store,
idx_handle,
filter,
&mut stack,
&extracted,
)?;
if need_to_collect {
old_tuples.push(DataValue::List(tup));
}
}
if need_to_collect {
new_tuples.push(DataValue::List(extracted));
new_tuples.push(DataValue::List(extracted.clone()));
}
}
if relation_store.is_temp {
self.temp_store_tx.put(&key, &val)?;
self.temp_store_tx.del(&key)?;
} else {
self.store_tx.put(&key, &val)?;
self.store_tx.del(&key)?;
}
}
// triggers and callbacks
if need_to_collect && !new_tuples.is_empty() {
let mut bindings = relation_store
let k_bindings = relation_store
.metadata
.keys
.iter()
.map(|k| Symbol::new(k.name.clone(), Default::default()))
.collect_vec();
let v_bindings = relation_store
.metadata
.non_keys
.iter()
.map(|k| Symbol::new(k.name.clone(), Default::default()));
bindings.extend(v_bindings);
let mut kv_bindings = k_bindings.clone();
kv_bindings.extend(v_bindings);
let kv_bindings = kv_bindings;
let kv_bindings = bindings;
if propagate_triggers {
for trigger in &relation_store.put_triggers {
for trigger in &relation_store.rm_triggers {
let mut program = parse_script(
trigger,
&Default::default(),
@ -766,12 +831,8 @@ impl<'a> SessionTx<'a> {
)?
.get_single_program()?;
make_const_rule(
&mut program,
"_new",
kv_bindings.clone(),
new_tuples.clone(),
);
make_const_rule(&mut program, "_new", k_bindings.clone(), new_tuples.clone());
make_const_rule(
&mut program,
"_old",
@ -803,14 +864,13 @@ impl<'a> SessionTx<'a> {
let target_collector = callback_collector
.entry(relation_store.name.clone())
.or_default();
let headers = kv_bindings
.into_iter()
.map(|k| k.name.to_string())
.collect_vec();
target_collector.push((
CallbackOp::Put,
CallbackOp::Rm,
NamedRows::new(
headers.clone(),
k_bindings
.into_iter()
.map(|k| k.name.to_string())
.collect_vec(),
new_tuples
.into_iter()
.map(|v| match v {
@ -820,7 +880,10 @@ impl<'a> SessionTx<'a> {
.collect_vec(),
),
NamedRows::new(
headers,
kv_bindings
.into_iter()
.map(|k| k.name.to_string())
.collect_vec(),
old_tuples
.into_iter()
.map(|v| match v {
@ -832,10 +895,7 @@ impl<'a> SessionTx<'a> {
))
}
}
}
};
Ok(to_clear)
Ok(())
}
}

@ -151,7 +151,7 @@ impl VectorCache {
impl<'a> SessionTx<'a> {
fn hnsw_put_vector(
&mut self,
tuple: &Tuple,
tuple: &[DataValue],
q: &Vector,
idx: usize,
subidx: i32,
@ -685,7 +685,7 @@ impl<'a> SessionTx<'a> {
idx_table: &RelationHandle,
filter: Option<&Vec<Bytecode>>,
stack: &mut Vec<DataValue>,
tuple: &Tuple,
tuple: &[DataValue],
) -> Result<bool> {
if let Some(code) = filter {
if !eval_bytecode_pred(code, tuple, stack, Default::default())? {

Loading…
Cancel
Save