fts index backing relations

main
Ziyang Hu 1 year ago
parent 378dd33fb6
commit ac8ccbc094

@ -182,6 +182,16 @@ pub enum Expr {
#[serde(skip)]
span: SourceSpan,
},
/// Unbound function application
UnboundApply {
/// Op representing the function to apply
op: SmartString<LazyCompact>,
/// Arguments to the application
args: Box<[Expr]>,
/// Source span
#[serde(skip)]
span: SourceSpan,
},
/// Conditional expressions
Cond {
/// Conditional clauses, the first expression in each tuple should evaluate to a boolean
@ -215,6 +225,13 @@ impl Display for Expr {
}
writer.finish()
}
Expr::UnboundApply { op, args, .. } => {
let mut writer = f.debug_tuple(op);
for arg in args.iter() {
writer.field(arg);
}
writer.finish()
}
Expr::Cond { clauses, .. } => {
let mut writer = f.debug_tuple("cond");
for (cond, expr) in clauses {
@ -227,6 +244,11 @@ impl Display for Expr {
}
}
/// Error reported when an expression applies an op for which no implementation is available.
///
/// Field 0 is the source span that the diagnostic labels; field 1 is the op name that is
/// interpolated into the error message.
#[derive(Debug, Error, Diagnostic)]
#[error("No implementation found for op `{1}`")]
#[diagnostic(code(eval::no_implementation))]
pub(crate) struct NoImplementationError(#[label] pub(crate) SourceSpan, pub(crate) String);
#[derive(Debug, Error, Diagnostic)]
#[error("Found value {1:?} where a boolean value is expected")]
#[diagnostic(code(eval::predicate_not_bool))]
@ -244,15 +266,16 @@ struct BadEntityId(DataValue, #[label] SourceSpan);
struct EvalRaisedError(#[label] SourceSpan, #[help] String);
impl Expr {
pub(crate) fn compile(&self) -> Vec<Bytecode> {
pub(crate) fn compile(&self) -> Result<Vec<Bytecode>> {
let mut collector = vec![];
expr2bytecode(self, &mut collector);
collector
expr2bytecode(self, &mut collector)?;
Ok(collector)
}
/// Returns the source span covered by this expression.
///
/// A binding reports the span of its variable symbol; every other variant
/// reports the span stored directly on the variant.
pub(crate) fn span(&self) -> SourceSpan {
    match self {
        Expr::Binding { var, .. } => var.span,
        Expr::Const { span, .. }
        | Expr::Apply { span, .. }
        | Expr::Cond { span, .. }
        | Expr::UnboundApply { span, .. } => *span,
    }
}
pub(crate) fn get_binding(&self) -> Option<&Symbol> {
@ -332,17 +355,20 @@ impl Expr {
val.fill_binding_indices(binding_map)?;
}
}
Expr::UnboundApply { op, span, .. } => {
bail!(NoImplementationError(*span, op.to_string()));
}
}
Ok(())
}
#[allow(dead_code)]
pub(crate) fn binding_indices(&self) -> BTreeSet<usize> {
pub(crate) fn binding_indices(&self) -> Result<BTreeSet<usize>> {
let mut ret = BTreeSet::default();
self.do_binding_indices(&mut ret);
ret
self.do_binding_indices(&mut ret)?;
Ok(ret)
}
#[allow(dead_code)]
fn do_binding_indices(&self, coll: &mut BTreeSet<usize>) {
fn do_binding_indices(&self, coll: &mut BTreeSet<usize>) -> Result<()> {
match self {
Expr::Binding { tuple_pos, .. } => {
if let Some(idx) = tuple_pos {
@ -352,20 +378,24 @@ impl Expr {
Expr::Const { .. } => {}
Expr::Apply { args, .. } => {
for arg in args.iter() {
arg.do_binding_indices(coll);
arg.do_binding_indices(coll)?;
}
}
Expr::Cond { clauses, .. } => {
for (cond, val) in clauses {
cond.do_binding_indices(coll);
val.do_binding_indices(coll)
cond.do_binding_indices(coll)?;
val.do_binding_indices(coll)?;
}
} // Expr::Try { clauses, .. } => {
// for clause in clauses {
// clause.do_binding_indices(coll)
// }
// }
// for clause in clauses {
// clause.do_binding_indices(coll)
// }
// }
Expr::UnboundApply { op, span, .. } => {
bail!(NoImplementationError(*span, op.to_string()));
}
}
Ok(())
}
pub(crate) fn eval_to_const(mut self) -> Result<DataValue> {
#[derive(Error, Diagnostic, Debug)]
@ -415,12 +445,12 @@ impl Expr {
}
Ok(())
}
pub(crate) fn bindings(&self) -> BTreeSet<Symbol> {
pub(crate) fn bindings(&self) -> Result<BTreeSet<Symbol>> {
let mut ret = BTreeSet::new();
self.collect_bindings(&mut ret);
ret
self.collect_bindings(&mut ret)?;
Ok(ret)
}
pub(crate) fn collect_bindings(&self, coll: &mut BTreeSet<Symbol>) {
pub(crate) fn collect_bindings(&self, coll: &mut BTreeSet<Symbol>) -> Result<()> {
match self {
Expr::Binding { var, .. } => {
coll.insert(var.clone());
@ -428,16 +458,20 @@ impl Expr {
Expr::Const { .. } => {}
Expr::Apply { args, .. } => {
for arg in args.iter() {
arg.collect_bindings(coll)
arg.collect_bindings(coll)?;
}
}
Expr::Cond { clauses, .. } => {
for (cond, val) in clauses {
cond.collect_bindings(coll);
val.collect_bindings(coll)
cond.collect_bindings(coll)?;
val.collect_bindings(coll)?;
}
}
Expr::UnboundApply { op, span, .. } => {
bail!(NoImplementationError(*span, op.to_string()));
}
}
Ok(())
}
pub(crate) fn eval(&self, bindings: impl AsRef<[DataValue]>) -> Result<DataValue> {
match self {
@ -480,6 +514,9 @@ impl Expr {
}
Ok(DataValue::Null)
}
Expr::UnboundApply { op, span, .. } => {
bail!(NoImplementationError(*span, op.to_string()));
}
}
}
pub(crate) fn extract_bound(&self, target: &Symbol) -> Result<ValueRange> {
@ -565,6 +602,9 @@ impl Expr {
}
_ => ValueRange::default(),
},
Expr::UnboundApply { op, span, .. } => {
bail!(NoImplementationError(*span, op.to_string()));
}
})
}
pub(crate) fn to_var_list(&self) -> Result<Vec<SmartString<LazyCompact>>> {

@ -1418,7 +1418,7 @@ impl Unification {
/// Returns `true` when the expression of this unification is an `Expr::Const`.
pub(crate) fn is_const(&self) -> bool {
matches!(self.expr, Expr::Const { .. })
}
pub(crate) fn bindings_in_expr(&self) -> BTreeSet<Symbol> {
pub(crate) fn bindings_in_expr(&self) -> Result<BTreeSet<Symbol>> {
self.expr.bindings()
}
}

@ -81,7 +81,7 @@ fn astar(
) -> Result<(f64, Vec<DataValue>)> {
let start_node = &starting[0];
let goal_node = &goal[0];
let heuristic_bytecode = heuristic.compile();
let heuristic_bytecode = heuristic.compile()?;
let mut stack = vec![];
let mut eval_heuristic = |node: &Tuple| -> Result<f64> {
let mut v = node.clone();

@ -35,9 +35,9 @@ impl FixedRule for Bfs {
let mut condition = payload.expr_option("condition", None)?;
let binding_map = nodes.get_binding_map(0);
condition.fill_binding_indices(&binding_map)?;
let condition_bytecode = condition.compile();
let condition_bytecode = condition.compile()?;
let condition_span = condition.span();
let binding_indices = condition.binding_indices();
let binding_indices = condition.binding_indices()?;
let skip_query_nodes = binding_indices.is_subset(&BTreeSet::from([0]));
let mut visited: BTreeSet<DataValue> = Default::default();

@ -35,9 +35,9 @@ impl FixedRule for Dfs {
let mut condition = payload.expr_option("condition", None)?;
let binding_map = nodes.get_binding_map(0);
condition.fill_binding_indices(&binding_map)?;
let condition_bytecode = condition.compile();
let condition_bytecode = condition.compile()?;
let condition_span = condition.span();
let binding_indices = condition.binding_indices();
let binding_indices = condition.binding_indices()?;
let skip_query_nodes = binding_indices.is_subset(&BTreeSet::from([0]));
let mut visited: BTreeSet<DataValue> = Default::default();

@ -45,7 +45,7 @@ impl FixedRule for RandomWalk {
let edges_binding = edges.get_binding_map(nodes_arity);
nodes_binding.extend(edges_binding);
weight.fill_binding_indices(&nodes_binding)?;
maybe_weight_bytecode = Some((weight.compile(), weight.span()));
maybe_weight_bytecode = Some((weight.compile()?, weight.span()));
}
let maybe_weight_bytecode = maybe_weight_bytecode;
let mut stack = vec![];

@ -72,8 +72,8 @@ impl FixedRule for ReorderSort {
for out in out_list.iter_mut() {
out.fill_binding_indices(&binding_map)?;
}
let out_bytecods = out_list.iter().map(|e| e.compile()).collect_vec();
let sort_by_bytecodes = sort_by.compile();
let out_bytecods: Vec<_> = out_list.iter().map(|e| e.compile()).try_collect()?;
let sort_by_bytecodes = sort_by.compile()?;
let mut stack = vec![];
let mut buffer = vec![];

@ -25,6 +25,15 @@ use std::sync::{Arc, RwLock};
pub(crate) mod cangjie;
pub(crate) mod tokenizer;
/// Description of a full-text-search index, stored alongside the index relation handle
/// in the base relation's `fts_indices` map.
#[derive(Debug, Clone, PartialEq, serde_derive::Serialize, serde_derive::Deserialize)]
pub(crate) struct FtsIndexManifest {
/// Name of the stored relation the index is built on.
pub(crate) base_relation: SmartString<LazyCompact>,
/// Name of the index; also used as the key in the base relation's `fts_indices` map.
pub(crate) index_name: SmartString<LazyCompact>,
/// Extractor expression kept in its textual form.
pub(crate) extractor: String,
/// Tokenizer configuration for the index.
pub(crate) tokenizer: TokenizerConfig,
/// Filter configurations for the index, in the order they were given.
pub(crate) filters: Vec<TokenizerConfig>,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde_derive::Serialize, serde_derive::Deserialize)]
pub(crate) struct TokenizerConfig {
pub(crate) name: SmartString<LazyCompact>,

@ -15,7 +15,7 @@ use pest::pratt_parser::{Op, PrattParser};
use smartstring::{LazyCompact, SmartString};
use thiserror::Error;
use crate::data::expr::{get_op, Bytecode, Expr};
use crate::data::expr::{get_op, Bytecode, Expr, NoImplementationError};
use crate::data::functions::{
OP_ADD, OP_AND, OP_COALESCE, OP_CONCAT, OP_DIV, OP_EQ, OP_GE, OP_GT, OP_JSON_OBJECT, OP_LE,
OP_LIST, OP_LT, OP_MINUS, OP_MOD, OP_MUL, OP_NEGATE, OP_NEQ, OP_OR, OP_POW, OP_SUB,
@ -53,7 +53,7 @@ lazy_static! {
#[diagnostic(code(parser::invalid_expression))]
pub(crate) struct InvalidExpression(#[label] pub(crate) SourceSpan);
pub(crate) fn expr2bytecode(expr: &Expr, collector: &mut Vec<Bytecode>) {
pub(crate) fn expr2bytecode(expr: &Expr, collector: &mut Vec<Bytecode>) -> Result<()> {
match expr {
Expr::Binding { var, tuple_pos } => collector.push(Bytecode::Binding {
var: var.clone(),
@ -66,7 +66,7 @@ pub(crate) fn expr2bytecode(expr: &Expr, collector: &mut Vec<Bytecode>) {
Expr::Apply { op, args, span } => {
let arity = args.len();
for arg in args.iter() {
expr2bytecode(arg, collector);
expr2bytecode(arg, collector)?;
}
collector.push(Bytecode::Apply {
op,
@ -78,7 +78,7 @@ pub(crate) fn expr2bytecode(expr: &Expr, collector: &mut Vec<Bytecode>) {
let mut return_jump_pos = vec![];
for (cond, val) in clauses {
// +1
expr2bytecode(cond, collector);
expr2bytecode(cond, collector)?;
// -1
collector.push(Bytecode::JumpIfFalse {
jump_to: 0,
@ -86,7 +86,7 @@ pub(crate) fn expr2bytecode(expr: &Expr, collector: &mut Vec<Bytecode>) {
});
let false_jump_amend_pos = collector.len() - 1;
// +1 in this branch
expr2bytecode(val, collector);
expr2bytecode(val, collector)?;
collector.push(Bytecode::Goto {
jump_to: 0,
span: *span,
@ -105,7 +105,11 @@ pub(crate) fn expr2bytecode(expr: &Expr, collector: &mut Vec<Bytecode>) {
}
}
}
Expr::UnboundApply { op, span, .. } => {
bail!(NoImplementationError(*span, op.to_string()));
}
}
Ok(())
}
pub(crate) fn build_expr(pair: Pair<'_>, param_pool: &BTreeMap<String, DataValue>) -> Result<Expr> {
@ -371,42 +375,45 @@ fn build_term(pair: Pair<'_>, param_pool: &BTreeMap<String, DataValue>) -> Resul
));
Expr::Cond { clauses, span }
}
_ => {
let op = get_op(ident).ok_or_else(|| {
FuncNotFoundError(ident.to_string(), ident_p.extract_span())
})?;
op.post_process_args(&mut args);
#[derive(Error, Diagnostic, Debug)]
#[error("Wrong number of arguments for function '{0}'")]
#[diagnostic(code(parser::func_wrong_num_args))]
struct WrongNumArgsError(String, #[label] SourceSpan, #[help] String);
if op.vararg {
ensure!(
op.min_arity <= args.len(),
WrongNumArgsError(
ident.to_string(),
span,
format!("Need at least {} argument(s)", op.min_arity)
)
);
} else {
ensure!(
op.min_arity == args.len(),
WrongNumArgsError(
ident.to_string(),
span,
format!("Need exactly {} argument(s)", op.min_arity)
)
);
}
Expr::Apply {
op,
_ => match get_op(ident) {
None => Expr::UnboundApply {
op: ident.into(),
args: args.into(),
span,
},
Some(op) => {
op.post_process_args(&mut args);
#[derive(Error, Diagnostic, Debug)]
#[error("Wrong number of arguments for function '{0}'")]
#[diagnostic(code(parser::func_wrong_num_args))]
struct WrongNumArgsError(String, #[label] SourceSpan, #[help] String);
if op.vararg {
ensure!(
op.min_arity <= args.len(),
WrongNumArgsError(
ident.to_string(),
span,
format!("Need at least {} argument(s)", op.min_arity)
)
);
} else {
ensure!(
op.min_arity == args.len(),
WrongNumArgsError(
ident.to_string(),
span,
format!("Need exactly {} argument(s)", op.min_arity)
)
);
}
Expr::Apply {
op,
args: args.into(),
span,
}
}
}
},
}
}
Rule::grouping => build_expr(pair.into_inner().next().unwrap(), param_pool)?,

@ -23,7 +23,7 @@ use crate::parse::expr::build_expr;
use crate::parse::query::parse_query;
use crate::parse::{ExtractSpan, Pairs, Rule, SourceSpan};
use crate::runtime::relation::AccessLevel;
use crate::FixedRule;
use crate::{Expr, FixedRule};
pub(crate) enum SysOp {
Compact,
@ -185,12 +185,110 @@ pub(crate) fn parse_sys(
}
SysOp::SetTriggers(rel, puts, rms, replaces)
}
Rule::fts_idx_op => {
todo!()
}
Rule::lsh_idx_op => {
todo!()
}
Rule::fts_idx_op => {
let inner = inner.into_inner().next().unwrap();
match inner.as_rule() {
Rule::index_create_adv => {
let mut inner = inner.into_inner();
let rel = inner.next().unwrap();
let name = inner.next().unwrap();
let mut filters = vec![];
let mut tokenizer = TokenizerConfig {
name: Default::default(),
args: Default::default(),
};
let mut extractor = "".to_string();
for opt_pair in inner {
let mut opt_inner = opt_pair.into_inner();
let opt_name = opt_inner.next().unwrap();
let opt_val = opt_inner.next().unwrap();
match opt_name.as_str() {
"extractor" => {
let mut ex = build_expr(opt_val, param_pool)?;
ex.partial_eval()?;
extractor = ex.to_string();
}
"tokenizer" => {
let mut expr = build_expr(opt_val, param_pool)?;
expr.partial_eval()?;
match expr {
Expr::UnboundApply { op, args, .. } => {
let mut targs = vec![];
for arg in args.iter() {
let v = arg.clone().eval_to_const()?;
targs.push(v);
}
tokenizer.name = op;
tokenizer.args = targs;
}
Expr::Binding { var, .. } => {
tokenizer.name = var.name;
tokenizer.args = vec![];
}
_ => bail!("Tokenizer must be a symbol or a call for an existing tokenizer"),
}
}
"filters" => {
let mut expr = build_expr(opt_val, param_pool)?;
expr.partial_eval()?;
match expr {
Expr::Apply { op, args, .. } => {
if op.name != "LIST" {
bail!("Filters must be a list of filters");
}
for arg in args.iter() {
match arg {
Expr::UnboundApply { op, args, .. } => {
let mut targs = vec![];
for arg in args.iter() {
let v = arg.clone().eval_to_const()?;
targs.push(v);
}
filters.push(TokenizerConfig {
name: op.clone(),
args: targs,
})
}
Expr::Binding { var, .. } => {
filters.push(TokenizerConfig {
name: var.name.clone(),
args: vec![],
})
}
_ => bail!("Tokenizer must be a symbol or a call for an existing tokenizer"),
}
}
}
_ => bail!("Filters must be a list of filters"),
}
}
_ => bail!("Unknown option {} for FTS index", opt_name.as_str()),
}
}
let config = FtsIndexConfig {
base_relation: SmartString::from(rel.as_str()),
index_name: SmartString::from(name.as_str()),
extractor,
tokenizer,
filters,
};
SysOp::CreateFtsIndex(config)
}
Rule::index_drop => {
let mut inner = inner.into_inner();
let rel = inner.next().unwrap();
let name = inner.next().unwrap();
SysOp::RemoveIndex(
Symbol::new(rel.as_str(), rel.extract_span()),
Symbol::new(name.as_str(), name.extract_span()),
)
}
r => unreachable!("{:?}", r),
}
}
Rule::vec_idx_op => {
let inner = inner.into_inner().next().unwrap();
match inner.as_rule() {

@ -469,7 +469,7 @@ impl<'a> SessionTx<'a> {
}
}
MagicAtom::Predicate(p) => {
ret = ret.filter(p.clone());
ret = ret.filter(p.clone())?;
}
MagicAtom::HnswSearch(s) => {
debug_assert!(
@ -502,7 +502,7 @@ impl<'a> SessionTx<'a> {
}
ret = ret.hnsw_search(s.clone(), own_bindings)?;
if !post_filters.is_empty() {
ret = ret.filter(Expr::build_and(post_filters, s.span));
ret = ret.filter(Expr::build_and(post_filters, s.span))?;
}
}
MagicAtom::Unification(u) => {
@ -530,7 +530,7 @@ impl<'a> SessionTx<'a> {
u.span,
)
};
ret = ret.filter(expr);
ret = ret.filter(expr)?;
} else {
seen_variables.insert(u.binding.clone());
ret = ret.unify(u.binding.clone(), u.expr.clone(), u.one_many_unif, u.span);

@ -100,7 +100,7 @@ impl UnificationRA {
.map(|(a, b)| (b, a))
.collect();
self.expr.fill_binding_indices(&parent_bindings)?;
self.expr_bytecode = self.expr.compile();
self.expr_bytecode = self.expr.compile()?;
Ok(())
}
pub(crate) fn do_eliminate_temp_vars(&mut self, used: &BTreeSet<Symbol>) -> Result<()> {
@ -110,7 +110,7 @@ impl UnificationRA {
}
}
let mut nxt = used.clone();
nxt.extend(self.expr.bindings());
nxt.extend(self.expr.bindings()?);
self.parent.eliminate_temp_vars(&nxt)?;
Ok(())
}
@ -188,7 +188,7 @@ impl FilteredRA {
}
let mut nxt = used.clone();
for e in self.filters.iter() {
nxt.extend(e.bindings());
nxt.extend(e.bindings()?);
}
self.parent.eliminate_temp_vars(&nxt)?;
Ok(())
@ -204,7 +204,7 @@ impl FilteredRA {
.collect();
for e in self.filters.iter_mut() {
e.fill_binding_indices(&parent_bindings)?;
self.filters_bytecodes.push((e.compile(), e.span()));
self.filters_bytecodes.push((e.compile()?, e.span()));
}
Ok(())
}
@ -442,8 +442,8 @@ impl RelAlgebra {
new_order,
})
}
pub(crate) fn filter(self, filter: Expr) -> Self {
match self {
pub(crate) fn filter(self, filter: Expr) -> Result<Self> {
Ok(match self {
s @ (RelAlgebra::Fixed(_)
| RelAlgebra::Reorder(_)
| RelAlgebra::NegJoin(_)
@ -543,11 +543,11 @@ impl RelAlgebra {
..
} = *inner;
for filter in filters {
let f_bindings = filter.bindings();
let f_bindings = filter.bindings()?;
if f_bindings.is_subset(&left_bindings) {
left = left.filter(filter);
left = left.filter(filter)?;
} else if f_bindings.is_subset(&right_bindings) {
right = right.filter(filter);
right = right.filter(filter)?;
} else {
remaining.push(filter);
}
@ -570,7 +570,7 @@ impl RelAlgebra {
}
joined
}
}
})
}
pub(crate) fn unify(
self,
@ -865,7 +865,7 @@ impl HnswSearchRA {
.collect();
let filter = self.hnsw_search.filter.as_mut().unwrap();
filter.fill_binding_indices(&bindings)?;
self.filter_bytecode = Some((filter.compile(), filter.span()));
self.filter_bytecode = Some((filter.compile()?, filter.span()));
}
Ok(())
}
@ -892,7 +892,7 @@ impl HnswSearchRA {
.map_ok(move |tuple| -> Result<_> {
let v = match tuple[bind_idx].clone() {
DataValue::Vec(v) => v,
d => bail!("Expected vector, got {:?}", d)
d => bail!("Expected vector, got {:?}", d),
};
let res = tx.hnsw_knn(v, &config, &filter_code, &mut stack)?;
@ -929,7 +929,7 @@ impl StoredWithValidityRA {
.collect();
for e in self.filters.iter_mut() {
e.fill_binding_indices(&bindings)?;
self.filters_bytecodes.push((e.compile(), e.span()));
self.filters_bytecodes.push((e.compile()?, e.span()));
}
Ok(())
}
@ -1038,7 +1038,7 @@ impl StoredRA {
.collect();
for e in self.filters.iter_mut() {
e.fill_binding_indices(&bindings)?;
self.filters_bytecodes.push((e.compile(), e.span()));
self.filters_bytecodes.push((e.compile()?, e.span()));
}
Ok(())
}
@ -1349,7 +1349,7 @@ impl TempStoreRA {
.collect();
for e in self.filters.iter_mut() {
e.fill_binding_indices(&bindings)?;
self.filters_bytecodes.push((e.compile(), e.span()))
self.filters_bytecodes.push((e.compile()?, e.span()))
}
Ok(())
}
@ -1844,7 +1844,7 @@ impl InnerJoin {
_ => None,
} {
for filter in filters {
left.extend(filter.bindings());
left.extend(filter.bindings()?);
}
}
self.left.eliminate_temp_vars(&left)?;

@ -44,7 +44,7 @@ impl NormalFormInlineRule {
seen_variables.insert(u.binding.clone());
round_1_collected.push(NormalFormAtom::Unification(u));
} else {
let unif_vars = u.bindings_in_expr();
let unif_vars = u.bindings_in_expr()?;
if unif_vars.is_subset(&seen_variables) {
seen_variables.insert(u.binding.clone());
round_1_collected.push(NormalFormAtom::Unification(u));
@ -139,14 +139,14 @@ impl NormalFormInlineRule {
}
}
NormalFormAtom::Predicate(p) => {
if p.bindings().is_subset(&seen_variables) {
if p.bindings()?.is_subset(&seen_variables) {
collected.push(NormalFormAtom::Predicate(p.clone()));
} else {
pending.push(NormalFormAtom::Predicate(p.clone()));
}
}
NormalFormAtom::Unification(u) => {
if u.bindings_in_expr().is_subset(&seen_variables) {
if u.bindings_in_expr()?.is_subset(&seen_variables) {
collected.push(NormalFormAtom::Unification(u.clone()));
} else {
pending.push(NormalFormAtom::Unification(u.clone()));

@ -346,7 +346,7 @@ impl<'a> SessionTx<'a> {
let mut code_expr = build_expr(parsed, &Default::default())?;
let binding_map = relation_store.raw_binding_map();
code_expr.fill_binding_indices(&binding_map)?;
hnsw_filters.insert(name.clone(), code_expr.compile());
hnsw_filters.insert(name.clone(), code_expr.compile()?);
}
}
Ok(hnsw_filters)

@ -1192,8 +1192,19 @@ impl<'s, S: Storage<'s>> Db<S> {
vec![vec![DataValue::from(OK_STR)]],
))
}
SysOp::CreateFtsIndex(_) => {
todo!("FTS index creation is not yet implemented")
SysOp::CreateFtsIndex(config) => {
let lock = self
.obtain_relation_locks(iter::once(&config.base_relation))
.pop()
.unwrap();
let _guard = lock.write().unwrap();
let mut tx = self.transact_write()?;
tx.create_fts_index(config)?;
tx.commit_tx()?;
Ok(NamedRows::new(
vec![STATUS_STR.to_string()],
vec![vec![DataValue::from(OK_STR)]],
))
}
SysOp::RemoveIndex(rel_name, idx_name) => {
let lock = self

@ -24,8 +24,9 @@ use crate::data::relation::{ColType, ColumnDef, NullableColType, StoredRelationM
use crate::data::symb::Symbol;
use crate::data::tuple::{decode_tuple_from_key, Tuple, TupleT, ENCODED_KEY_MIN_LEN};
use crate::data::value::{DataValue, ValidityTs};
use crate::fts::FtsIndexManifest;
use crate::parse::expr::build_expr;
use crate::parse::sys::HnswIndexConfig;
use crate::parse::sys::{FtsIndexConfig, HnswIndexConfig};
use crate::parse::{CozoScriptParser, Rule, SourceSpan};
use crate::query::compile::IndexPositionUse;
use crate::runtime::hnsw::HnswIndexManifest;
@ -81,6 +82,18 @@ pub(crate) struct RelationHandle {
pub(crate) indices: BTreeMap<SmartString<LazyCompact>, (RelationHandle, Vec<usize>)>,
pub(crate) hnsw_indices:
BTreeMap<SmartString<LazyCompact>, (RelationHandle, HnswIndexManifest)>,
pub(crate) fts_indices: BTreeMap<SmartString<LazyCompact>, (RelationHandle, FtsIndexManifest)>,
}
impl RelationHandle {
    /// Reports whether an index named `index_name` is attached to this relation,
    /// looking through the plain, HNSW and FTS index maps in that order.
    pub(crate) fn has_index(&self, index_name: &str) -> bool {
        if self.indices.contains_key(index_name) {
            return true;
        }
        if self.hnsw_indices.contains_key(index_name) {
            return true;
        }
        self.fts_indices.contains_key(index_name)
    }

    /// Reports whether this relation has no index of any kind attached.
    pub(crate) fn has_no_index(&self) -> bool {
        let any_attached = !self.indices.is_empty()
            || !self.hnsw_indices.is_empty()
            || !self.fts_indices.is_empty();
        !any_attached
    }
}
#[derive(
@ -483,6 +496,11 @@ pub fn extend_tuple_from_v(key: &mut Tuple, val: &[u8]) {
}
}
/// Error reported when an index is created under a name the relation already uses.
///
/// Field 0 is the index name and field 1 is the relation name, in the order the
/// error message interpolates them.
#[derive(Debug, Error, Diagnostic)]
#[error("index {0} for relation {1} already exists")]
#[diagnostic(code(tx::index_already_exists))]
pub(crate) struct IndexAlreadyExists(String, String);
#[derive(Debug, Diagnostic, Error)]
#[error("Cannot create relation {0} as one with the same name already exists")]
#[diagnostic(code(eval::rel_name_conflict))]
@ -565,6 +583,7 @@ impl<'a> SessionTx<'a> {
is_temp,
indices: Default::default(),
hnsw_indices: Default::default(),
fts_indices: Default::default(),
};
let name_key = vec![DataValue::Str(meta.name.clone())].encode_as_key(RelationId::SYSTEM);
@ -615,7 +634,7 @@ impl<'a> SessionTx<'a> {
// bail!("Cannot destroy temp relation");
// }
let store = self.get_relation(name, true)?;
if !store.indices.is_empty() || !store.hnsw_indices.is_empty() {
if !store.has_no_index() {
bail!(
"Cannot remove stored relation `{}` with indices attached.",
name
@ -665,19 +684,95 @@ impl<'a> SessionTx<'a> {
Ok(())
}
pub(crate) fn create_hnsw_index(&mut self, config: HnswIndexConfig) -> Result<()> {
pub(crate) fn create_fts_index(&mut self, config: FtsIndexConfig) -> Result<()> {
// Get relation handle
let mut rel_handle = self.get_relation(&config.base_relation, true)?;
// Check if index already exists
if rel_handle.indices.contains_key(&config.index_name)
|| rel_handle.hnsw_indices.contains_key(&config.index_name)
{
#[derive(Debug, Error, Diagnostic)]
#[error("index {0} for relation {1} already exists")]
#[diagnostic(code(tx::index_already_exists))]
pub(crate) struct IndexAlreadyExists(String, String);
if rel_handle.has_index(&config.index_name) {
bail!(IndexAlreadyExists(
config.index_name.to_string(),
config.index_name.to_string()
));
}
// Build key columns definitions
let mut idx_keys: Vec<ColumnDef> = vec![ColumnDef {
name: SmartString::from("word"),
typing: NullableColType {
coltype: ColType::String,
nullable: false,
},
default_gen: None,
}];
for k in rel_handle.metadata.keys.iter() {
idx_keys.push(ColumnDef {
name: format!("src_{}", k.name).into(),
typing: k.typing.clone(),
default_gen: None,
});
}
let non_idx_keys: Vec<ColumnDef> = vec![
ColumnDef {
name: SmartString::from("offset_from"),
typing: NullableColType {
coltype: ColType::Int,
nullable: false,
},
default_gen: None,
},
ColumnDef {
name: SmartString::from("offset_to"),
typing: NullableColType {
coltype: ColType::Int,
nullable: false,
},
default_gen: None,
},
];
let idx_handle = self.write_idx_relation(
&config.base_relation,
&config.index_name,
idx_keys,
non_idx_keys,
)?;
// add index to relation
let manifest = FtsIndexManifest {
base_relation: config.base_relation,
index_name: config.index_name,
extractor: config.extractor,
tokenizer: config.tokenizer,
filters: config.filters,
};
// populate index TODO
rel_handle
.fts_indices
.insert(manifest.index_name.clone(), (idx_handle, manifest));
// update relation metadata
let new_encoded =
vec![DataValue::from(&rel_handle.name as &str)].encode_as_key(RelationId::SYSTEM);
let mut meta_val = vec![];
rel_handle
.serialize(&mut Serializer::new(&mut meta_val))
.unwrap();
self.store_tx.put(&new_encoded, &meta_val)?;
Ok(())
}
pub(crate) fn create_hnsw_index(&mut self, config: HnswIndexConfig) -> Result<()> {
// Get relation handle
let mut rel_handle = self.get_relation(&config.base_relation, true)?;
// Check if index already exists
if rel_handle.has_index(&config.index_name) {
bail!(IndexAlreadyExists(
config.index_name.to_string(),
config.index_name.to_string()
@ -790,28 +885,12 @@ impl<'a> SessionTx<'a> {
},
];
// create index relation
let key_bindings = idx_keys
.iter()
.map(|col| Symbol::new(col.name.clone(), Default::default()))
.collect();
let dep_bindings = non_idx_keys
.iter()
.map(|col| Symbol::new(col.name.clone(), Default::default()))
.collect();
let idx_handle = InputRelationHandle {
name: Symbol::new(
format!("{}:{}", config.base_relation, config.index_name),
Default::default(),
),
metadata: StoredRelationMetadata {
keys: idx_keys,
non_keys: non_idx_keys,
},
key_bindings,
dep_bindings,
span: Default::default(),
};
let idx_handle = self.create_relation(idx_handle)?;
let idx_handle = self.write_idx_relation(
&config.base_relation,
&config.index_name,
idx_keys,
non_idx_keys,
)?;
// add index to relation
let manifest = HnswIndexManifest {
@ -841,7 +920,7 @@ impl<'a> SessionTx<'a> {
let mut code_expr = build_expr(parsed, &Default::default())?;
let binding_map = rel_handle.raw_binding_map();
code_expr.fill_binding_indices(&binding_map)?;
code_expr.compile()
code_expr.compile()?
} else {
vec![]
};
@ -878,6 +957,35 @@ impl<'a> SessionTx<'a> {
Ok(())
}
/// Creates the stored relation that backs an index and returns its handle.
///
/// The relation is named `<base_name>:<idx_name>`. `idx_keys` become its key columns and
/// `non_idx_keys` its non-key columns; each column also gets a binding symbol carrying
/// the column's name and a default span.
///
/// # Errors
///
/// Propagates any error returned by `create_relation`.
fn write_idx_relation(
    &mut self,
    base_name: &str,
    idx_name: &str,
    idx_keys: Vec<ColumnDef>,
    non_idx_keys: Vec<ColumnDef>,
) -> Result<RelationHandle> {
    // One binding symbol per column, named after the column.
    let as_symbol = |col: &ColumnDef| Symbol::new(col.name.clone(), Default::default());

    // The bindings borrow the column lists, so build them before the lists are moved
    // into the metadata below.
    let key_bindings = idx_keys.iter().map(as_symbol).collect();
    let dep_bindings = non_idx_keys.iter().map(as_symbol).collect();

    let relation_name = Symbol::new(format!("{}:{}", base_name, idx_name), Default::default());
    let metadata = StoredRelationMetadata {
        keys: idx_keys,
        non_keys: non_idx_keys,
    };
    let input = InputRelationHandle {
        name: relation_name,
        metadata,
        key_bindings,
        dep_bindings,
        span: Default::default(),
    };

    let created = self.create_relation(input)?;
    Ok(created)
}
pub(crate) fn create_index(
&mut self,
rel_name: &Symbol,
@ -888,14 +996,7 @@ impl<'a> SessionTx<'a> {
let mut rel_handle = self.get_relation(rel_name, true)?;
// Check if index already exists
if rel_handle.indices.contains_key(&idx_name.name)
|| rel_handle.hnsw_indices.contains_key(&idx_name.name)
{
#[derive(Debug, Error, Diagnostic)]
#[error("index {0} for relation {1} already exists")]
#[diagnostic(code(tx::index_already_exists))]
pub(crate) struct IndexAlreadyExists(String, String);
if rel_handle.has_index(&idx_name.name) {
bail!(IndexAlreadyExists(
idx_name.name.to_string(),
rel_name.name.to_string()

Loading…
Cancel
Save