|
|
|
@ -10,7 +10,8 @@ use std::collections::{BTreeMap, BTreeSet};
|
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
|
|
|
|
|
use itertools::Itertools;
|
|
|
|
|
use miette::{bail, Diagnostic, Result, WrapErr};
|
|
|
|
|
use miette::{bail, Diagnostic, IntoDiagnostic, Result, WrapErr};
|
|
|
|
|
use pest::Parser;
|
|
|
|
|
use smartstring::{LazyCompact, SmartString};
|
|
|
|
|
use thiserror::Error;
|
|
|
|
|
|
|
|
|
@ -22,7 +23,8 @@ use crate::data::tuple::Tuple;
|
|
|
|
|
use crate::data::value::{DataValue, ValidityTs};
|
|
|
|
|
use crate::fixed_rule::utilities::constant::Constant;
|
|
|
|
|
use crate::fixed_rule::FixedRuleHandle;
|
|
|
|
|
use crate::parse::parse_script;
|
|
|
|
|
use crate::parse::expr::build_expr;
|
|
|
|
|
use crate::parse::{parse_script, CozoScriptParser, Rule};
|
|
|
|
|
use crate::runtime::callback::{CallbackCollector, CallbackOp};
|
|
|
|
|
use crate::runtime::relation::{
|
|
|
|
|
extend_tuple_from_v, AccessLevel, InputRelationHandle, InsufficientAccessLevel,
|
|
|
|
@ -149,6 +151,7 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
&& (is_callback_target
|
|
|
|
|
|| (propagate_triggers && !relation_store.rm_triggers.is_empty()));
|
|
|
|
|
let has_indices = !relation_store.indices.is_empty();
|
|
|
|
|
let has_hnsw_indices = !relation_store.hnsw_indices.is_empty();
|
|
|
|
|
let mut new_tuples: Vec<DataValue> = vec![];
|
|
|
|
|
let mut old_tuples: Vec<DataValue> = vec![];
|
|
|
|
|
|
|
|
|
@ -158,7 +161,7 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
.map(|ex| ex.extract_data(&tuple, cur_vld))
|
|
|
|
|
.try_collect()?;
|
|
|
|
|
let key = relation_store.encode_key_for_store(&extracted, *span)?;
|
|
|
|
|
if need_to_collect || has_indices {
|
|
|
|
|
if need_to_collect || has_indices || has_hnsw_indices {
|
|
|
|
|
if let Some(existing) = self.store_tx.get(&key, false)? {
|
|
|
|
|
let mut tup = extracted.clone();
|
|
|
|
|
extend_tuple_from_v(&mut tup, &existing);
|
|
|
|
@ -171,6 +174,13 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
self.store_tx.del(&encoded)?;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if has_hnsw_indices {
|
|
|
|
|
for (idx_handle, _) in
|
|
|
|
|
relation_store.hnsw_indices.values()
|
|
|
|
|
{
|
|
|
|
|
self.hnsw_remove(&relation_store, idx_handle, &extracted)?;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if need_to_collect {
|
|
|
|
|
old_tuples.push(DataValue::List(tup));
|
|
|
|
|
}
|
|
|
|
@ -399,6 +409,7 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
&& (is_callback_target
|
|
|
|
|
|| (propagate_triggers && !relation_store.put_triggers.is_empty()));
|
|
|
|
|
let has_indices = !relation_store.indices.is_empty();
|
|
|
|
|
let has_hnsw_indices = !relation_store.hnsw_indices.is_empty();
|
|
|
|
|
let mut new_tuples: Vec<DataValue> = vec![];
|
|
|
|
|
let mut old_tuples: Vec<DataValue> = vec![];
|
|
|
|
|
|
|
|
|
@ -409,6 +420,22 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
headers,
|
|
|
|
|
)?;
|
|
|
|
|
key_extractors.extend(val_extractors);
|
|
|
|
|
let mut stack = vec![];
|
|
|
|
|
let mut hnsw_filters = BTreeMap::new();
|
|
|
|
|
if has_hnsw_indices {
|
|
|
|
|
for (name, (_, manifest)) in relation_store.hnsw_indices.iter() {
|
|
|
|
|
if let Some(f_code) = &manifest.index_filter {
|
|
|
|
|
let parsed = CozoScriptParser::parse(Rule::expr, f_code)
|
|
|
|
|
.into_diagnostic()?
|
|
|
|
|
.next()
|
|
|
|
|
.unwrap();
|
|
|
|
|
let mut code_expr = build_expr(parsed, &Default::default())?;
|
|
|
|
|
let binding_map = relation_store.raw_binding_map();
|
|
|
|
|
code_expr.fill_binding_indices(&binding_map)?;
|
|
|
|
|
hnsw_filters.insert(name.clone(), code_expr.compile());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for tuple in res_iter {
|
|
|
|
|
let extracted: Vec<DataValue> = key_extractors
|
|
|
|
@ -419,7 +446,7 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
let key = relation_store.encode_key_for_store(&extracted, *span)?;
|
|
|
|
|
let val = relation_store.encode_val_for_store(&extracted, *span)?;
|
|
|
|
|
|
|
|
|
|
if need_to_collect || has_indices {
|
|
|
|
|
if need_to_collect || has_indices || has_hnsw_indices {
|
|
|
|
|
if let Some(existing) = self.store_tx.get(&key, false)? {
|
|
|
|
|
let mut tup = extracted[0..relation_store.metadata.keys.len()].to_vec();
|
|
|
|
|
extend_tuple_from_v(&mut tup, &existing);
|
|
|
|
@ -456,6 +483,22 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if has_hnsw_indices {
|
|
|
|
|
for (name, (idx_handle, idx_manifest)) in
|
|
|
|
|
relation_store.hnsw_indices.iter()
|
|
|
|
|
{
|
|
|
|
|
let filter = hnsw_filters.get(name);
|
|
|
|
|
self.hnsw_put(
|
|
|
|
|
idx_manifest,
|
|
|
|
|
&relation_store,
|
|
|
|
|
idx_handle,
|
|
|
|
|
filter,
|
|
|
|
|
&mut stack,
|
|
|
|
|
&extracted,
|
|
|
|
|
)?;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if need_to_collect {
|
|
|
|
|
new_tuples.push(DataValue::List(extracted));
|
|
|
|
|
}
|
|
|
|
|