LSH indexing

main
Ziyang Hu 1 year ago
parent eabef3de14
commit a07d4bf960

@ -18,7 +18,6 @@ use thiserror::Error;
use crate::data::aggr::Aggregation;
use crate::data::expr::Expr;
use crate::data::functions::OP_LIST;
use crate::data::relation::StoredRelationMetadata;
use crate::data::symb::{Symbol, PROG_ENTRY};
use crate::data::value::{DataValue, ValidityTs};
@ -995,7 +994,6 @@ impl SearchInput {
mut self,
base_handle: RelationHandle,
idx_handle: RelationHandle,
inv_idx_handle: RelationHandle,
manifest: MinHashLshIndexManifest,
gen: &mut TempSymbGen,
) -> Result<Disjunction> {
@ -1073,55 +1071,23 @@ impl SearchInput {
#[diagnostic(code(parser::wrong_arity_for_lsh_keys))]
struct WrongArityForKeys(#[label] SourceSpan, usize, usize);
let query = match self.parameters.remove("keys") {
None => match self.parameters.remove("key") {
None => {
bail!(LshRequiredMissing("keys".to_string(), self.span))
}
Some(expr) => {
ensure!(
base_handle.indices.keys().len() == 1,
LshRequiredMissing("keys".to_string(), self.span)
);
let span = expr.span();
let kw = gen.next(span);
let unif = NormalFormAtom::Unification(Unification {
binding: kw.clone(),
expr: Expr::Apply {
op: &OP_LIST,
args: [expr].into(),
span,
},
one_many_unif: false,
span,
});
conj.push(unif);
kw
}
},
Some(mut expr) => {
expr.partial_eval()?;
match expr {
Expr::Apply { op, args, span } => {
ensure!(op.name == OP_LIST.name, ExpectedListForLshKeys(span));
ensure!(
args.len() == base_handle.indices.keys().len(),
WrongArityForKeys(span, base_handle.indices.keys().len(), args.len())
);
let kw = gen.next(span);
let unif = NormalFormAtom::Unification(Unification {
binding: kw.clone(),
expr: Expr::Apply { op, args, span },
one_many_unif: false,
span,
});
conj.push(unif);
kw
}
_ => {
bail!(ExpectedListForLshKeys(self.span))
}
}
let query = match self
.parameters
.remove("query")
.ok_or_else(|| miette!(LshRequiredMissing("query".to_string(), self.span)))?
{
Expr::Binding { var, .. } => var,
expr => {
let span = expr.span();
let kw = gen.next(span);
let unif = NormalFormAtom::Unification(Unification {
binding: kw.clone(),
expr,
one_many_unif: false,
span,
});
conj.push(unif);
kw
}
};
@ -1184,7 +1150,6 @@ impl SearchInput {
conj.push(NormalFormAtom::LshSearch(LshSearch {
base_handle,
idx_handle,
inv_idx_handle,
manifest,
bindings,
k,
@ -1603,10 +1568,10 @@ impl SearchInput {
{
return self.normalize_fts(base_handle, idx_handle, manifest, gen);
}
if let Some((idx_handle, inv_idx_handle, manifest)) =
if let Some((idx_handle, _, manifest)) =
base_handle.lsh_indices.get(&self.index.name).cloned()
{
return self.normalize_lsh(base_handle, idx_handle, inv_idx_handle, manifest, gen);
return self.normalize_lsh(base_handle, idx_handle, manifest, gen);
}
#[derive(Debug, Error, Diagnostic)]
#[error("Index {name} not found on relation {relation}")]

@ -215,6 +215,7 @@ pub(crate) fn parse_sys(
args: Default::default(),
};
let mut extractor = "".to_string();
let mut extract_filter = "".to_string();
let mut n_gram = 1;
let mut n_perm = 200;
let mut target_threshold = 0.9;
@ -272,6 +273,11 @@ pub(crate) fn parse_sys(
ex.partial_eval()?;
extractor = ex.to_string();
}
"extract_filter" => {
let mut ex = build_expr(opt_val, param_pool)?;
ex.partial_eval()?;
extract_filter = ex.to_string();
}
"tokenizer" => {
let mut expr = build_expr(opt_val, param_pool)?;
expr.partial_eval()?;
@ -347,6 +353,10 @@ pub(crate) fn parse_sys(
false_positive_weight /= total_weights;
false_negative_weight /= total_weights;
if !extract_filter.is_empty() {
extractor = format!("if({}, {})", extract_filter, extractor);
}
let config = MinHashLshConfig {
base_relation: SmartString::from(rel.as_str()),
index_name: SmartString::from(name.as_str()),
@ -386,6 +396,7 @@ pub(crate) fn parse_sys(
args: Default::default(),
};
let mut extractor = "".to_string();
let mut extract_filter = "".to_string();
for opt_pair in inner {
let mut opt_inner = opt_pair.into_inner();
let opt_name = opt_inner.next().unwrap();
@ -396,6 +407,11 @@ pub(crate) fn parse_sys(
ex.partial_eval()?;
extractor = ex.to_string();
}
"extract_filter" => {
let mut ex = build_expr(opt_val, param_pool)?;
ex.partial_eval()?;
extract_filter = ex.to_string();
}
"tokenizer" => {
let mut expr = build_expr(opt_val, param_pool)?;
expr.partial_eval()?;
@ -453,6 +469,9 @@ pub(crate) fn parse_sys(
_ => bail!("Unknown option {} for FTS index", opt_name.as_str()),
}
}
if !extract_filter.is_empty() {
extractor = format!("if({}, {})", extract_filter, extractor);
}
let config = FtsIndexConfig {
base_relation: SmartString::from(rel.as_str()),
index_name: SmartString::from(name.as_str()),

@ -907,7 +907,6 @@ pub(crate) struct LshSearchRA {
pub(crate) own_bindings: Vec<Symbol>,
}
impl LshSearchRA {
fn fill_binding_indices_and_compile(&mut self) -> Result<()> {
self.parent.fill_binding_indices_and_compile()?;
@ -942,21 +941,24 @@ impl LshSearchRA {
let config = self.lsh_search.clone();
let filter_code = self.filter_bytecode.clone();
let mut stack = vec![];
let perms = config.manifest.get_hash_perms();
let tokenizer = tx.tokenizers.get(
&config.idx_handle.name,
&config.manifest.tokenizer,
&config.manifest.filters,
)?;
let it = self
.parent
.iter(tx, delta_rule, stores)?
.map_ok(move |tuple| -> Result<_> {
let q = match tuple[bind_idx].clone() {
DataValue::List(l) => l,
d => bail!("Expected list for LSH search, got {:?}", d),
};
let res = tx.lsh_search(
&q,
&tuple[bind_idx],
&config,
&mut stack,
&filter_code,
&perms,
&tokenizer,
)?;
Ok(res.into_iter().map(move |t| {
let mut r = tuple.clone();
@ -970,7 +972,6 @@ impl LshSearchRA {
}
}
#[derive(Debug)]
pub(crate) struct FtsSearchRA {
pub(crate) parent: Box<RelAlgebra>,

@ -27,6 +27,7 @@ use crate::fts::tokenizer::TextAnalyzer;
use crate::parse::expr::build_expr;
use crate::parse::{parse_script, CozoScriptParser, Rule};
use crate::runtime::callback::{CallbackCollector, CallbackOp};
use crate::runtime::minhash_lsh::HashPermutations;
use crate::runtime::relation::{
extend_tuple_from_v, AccessLevel, InputRelationHandle, InsufficientAccessLevel, RelationHandle,
};
@ -238,6 +239,7 @@ impl<'a> SessionTx<'a> {
let has_indices = !relation_store.indices.is_empty();
let has_hnsw_indices = !relation_store.hnsw_indices.is_empty();
let has_fts_indices = !relation_store.fts_indices.is_empty();
let has_lsh_indices = !relation_store.lsh_indices.is_empty();
let mut new_tuples: Vec<DataValue> = vec![];
let mut old_tuples: Vec<DataValue> = vec![];
@ -250,7 +252,8 @@ impl<'a> SessionTx<'a> {
key_extractors.extend(val_extractors);
let mut stack = vec![];
let hnsw_filters = Self::make_hnsw_filters(relation_store)?;
let fts_processors = self.make_fts_processors(relation_store)?;
let fts_lsh_processors = self.make_fts_lsh_processors(relation_store)?;
let lsh_perms = self.make_lsh_hash_perms(relation_store);
for tuple in res_iter {
let extracted: Vec<DataValue> = key_extractors
@ -261,13 +264,19 @@ impl<'a> SessionTx<'a> {
let key = relation_store.encode_key_for_store(&extracted, span)?;
let val = relation_store.encode_val_for_store(&extracted, span)?;
if need_to_collect || has_indices || has_hnsw_indices || has_fts_indices {
if need_to_collect
|| has_indices
|| has_hnsw_indices
|| has_fts_indices
|| has_lsh_indices
{
if let Some(existing) = self.store_tx.get(&key, false)? {
let mut tup = extracted[0..relation_store.metadata.keys.len()].to_vec();
extend_tuple_from_v(&mut tup, &existing);
if has_indices && extracted != tup {
self.update_in_index(relation_store, &extracted, &tup)?;
self.del_in_fts(relation_store, &mut stack, &fts_processors, &tup)?;
self.del_in_fts(relation_store, &mut stack, &fts_lsh_processors, &tup)?;
self.del_in_lsh(relation_store, &tup)?;
}
if need_to_collect {
@ -286,7 +295,14 @@ impl<'a> SessionTx<'a> {
}
self.update_in_hnsw(relation_store, &mut stack, &hnsw_filters, &extracted)?;
self.put_in_fts(relation_store, &mut stack, &fts_processors, &extracted)?;
self.put_in_fts(relation_store, &mut stack, &fts_lsh_processors, &extracted)?;
self.put_in_lsh(
relation_store,
&mut stack,
&fts_lsh_processors,
&extracted,
&lsh_perms,
)?;
if need_to_collect {
new_tuples.push(DataValue::List(extracted));
@ -345,6 +361,38 @@ impl<'a> SessionTx<'a> {
Ok(())
}
/// Inserts the tuple `new_kv` into every MinHash-LSH index defined on `rel_handle`.
///
/// For each LSH index (keyed by index name), looks up the pre-built
/// `(tokenizer, extractor)` pair in `processors` and the precomputed MinHash
/// permutations in `hash_perms_map`, then delegates the actual index write to
/// `put_lsh_index_item`.
///
/// NOTE(review): the two `.unwrap()` calls assume `processors` and
/// `hash_perms_map` were built from the same `lsh_indices` map as
/// `rel_handle` (see `make_fts_lsh_processors` / `make_lsh_hash_perms`);
/// a missing key here would be an internal invariant violation, not user error.
fn put_in_lsh(
&mut self,
rel_handle: &RelationHandle,
// Scratch stack reused across tuples for bytecode evaluation.
stack: &mut Vec<DataValue>,
// Per-index-name tokenizer + compiled extractor bytecode.
processors: &BTreeMap<SmartString<LazyCompact>, (Arc<TextAnalyzer>, Vec<Bytecode>)>,
// The full key+value tuple being written to the base relation.
new_kv: &[DataValue],
// Per-index-name precomputed hash permutations (avoids regenerating per tuple).
hash_perms_map: &BTreeMap<SmartString<LazyCompact>, HashPermutations>,
) -> Result<()> {
for (k, (idx_handle, inv_idx_handle, manifest)) in rel_handle.lsh_indices.iter() {
let (tokenizer, extractor) = processors.get(k).unwrap();
self.put_lsh_index_item(
new_kv,
extractor,
stack,
tokenizer,
rel_handle,
idx_handle,
inv_idx_handle,
manifest,
hash_perms_map.get(k).unwrap(),
)?;
}
Ok(())
}
/// Removes the tuple `old_kv` from every MinHash-LSH index on `rel_handle`.
///
/// Passes `None` for the cached hash bytes, which makes `del_lsh_index_item`
/// recover the stored hash chunks from the inverse index before deleting the
/// bucket entries (see the `None` branch of `del_lsh_index_item`).
fn del_in_lsh(&mut self, rel_handle: &RelationHandle, old_kv: &[DataValue]) -> Result<()> {
for (idx_handle, inv_idx_handle, _) in rel_handle.lsh_indices.values() {
self.del_lsh_index_item(old_kv, None, idx_handle, inv_idx_handle)?;
}
Ok(())
}
fn update_in_hnsw(
&mut self,
relation_store: &RelationHandle,
@ -366,11 +414,22 @@ impl<'a> SessionTx<'a> {
Ok(())
}
fn make_fts_processors(
/// Precomputes the MinHash hash permutations for every LSH index on
/// `relation_store`, keyed by index name.
///
/// Built once per write batch so that per-tuple insertion (`put_in_lsh`)
/// does not call `manifest.get_hash_perms()` repeatedly.
fn make_lsh_hash_perms(
&self,
relation_store: &RelationHandle,
) -> BTreeMap<SmartString<LazyCompact>, HashPermutations> {
let mut perms = BTreeMap::new();
for (name, (_, _, manifest)) in relation_store.lsh_indices.iter() {
perms.insert(name.clone(), manifest.get_hash_perms());
}
perms
}
fn make_fts_lsh_processors(
&self,
relation_store: &RelationHandle,
) -> Result<BTreeMap<SmartString<LazyCompact>, (Arc<TextAnalyzer>, Vec<Bytecode>)>> {
let mut fts_processors = BTreeMap::new();
let mut processors = BTreeMap::new();
for (name, (_, manifest)) in relation_store.fts_indices.iter() {
let tokenizer = self.tokenizers.get(
&relation_store.name,
@ -386,9 +445,26 @@ impl<'a> SessionTx<'a> {
let binding_map = relation_store.raw_binding_map();
code_expr.fill_binding_indices(&binding_map)?;
let extractor = code_expr.compile()?;
fts_processors.insert(name.clone(), (tokenizer, extractor));
processors.insert(name.clone(), (tokenizer, extractor));
}
for (name, (_, _, manifest)) in relation_store.lsh_indices.iter() {
let tokenizer = self.tokenizers.get(
&relation_store.name,
&manifest.tokenizer,
&manifest.filters,
)?;
let parsed = CozoScriptParser::parse(Rule::expr, &manifest.extractor)
.into_diagnostic()?
.next()
.unwrap();
let mut code_expr = build_expr(parsed, &Default::default())?;
let binding_map = relation_store.raw_binding_map();
code_expr.fill_binding_indices(&binding_map)?;
let extractor = code_expr.compile()?;
processors.insert(name.clone(), (tokenizer, extractor));
}
Ok(fts_processors)
Ok(processors)
}
fn make_hnsw_filters(
@ -449,6 +525,7 @@ impl<'a> SessionTx<'a> {
let has_indices = !relation_store.indices.is_empty();
let has_hnsw_indices = !relation_store.hnsw_indices.is_empty();
let has_fts_indices = !relation_store.fts_indices.is_empty();
let has_lsh_indices = !relation_store.lsh_indices.is_empty();
let mut new_tuples: Vec<DataValue> = vec![];
let mut old_tuples: Vec<DataValue> = vec![];
@ -461,7 +538,8 @@ impl<'a> SessionTx<'a> {
let mut stack = vec![];
let hnsw_filters = Self::make_hnsw_filters(relation_store)?;
let fts_processors = self.make_fts_processors(relation_store)?;
let fts_lsh_processors = self.make_fts_lsh_processors(relation_store)?;
let lsh_perms = self.make_lsh_hash_perms(relation_store);
for tuple in res_iter {
let mut new_kv: Vec<DataValue> = key_extractors
@ -502,8 +580,14 @@ impl<'a> SessionTx<'a> {
}
let new_val = relation_store.encode_val_for_store(&new_kv, span)?;
if need_to_collect || has_indices || has_hnsw_indices || has_fts_indices {
self.del_in_fts(relation_store, &mut stack, &fts_processors, &old_kv)?;
if need_to_collect
|| has_indices
|| has_hnsw_indices
|| has_fts_indices
|| has_lsh_indices
{
self.del_in_fts(relation_store, &mut stack, &fts_lsh_processors, &old_kv)?;
self.del_in_lsh(relation_store, &old_kv)?;
self.update_in_index(relation_store, &new_kv, &old_kv)?;
if need_to_collect {
@ -511,7 +595,14 @@ impl<'a> SessionTx<'a> {
}
self.update_in_hnsw(relation_store, &mut stack, &hnsw_filters, &new_kv)?;
self.put_in_fts(relation_store, &mut stack, &fts_processors, &new_kv)?;
self.put_in_fts(relation_store, &mut stack, &fts_lsh_processors, &new_kv)?;
self.put_in_lsh(
relation_store,
&mut stack,
&fts_lsh_processors,
&new_kv,
&lsh_perms,
)?;
if need_to_collect {
new_tuples.push(DataValue::List(new_kv));
@ -825,7 +916,7 @@ impl<'a> SessionTx<'a> {
let has_indices = !relation_store.indices.is_empty();
let has_hnsw_indices = !relation_store.hnsw_indices.is_empty();
let has_fts_indices = !relation_store.fts_indices.is_empty();
let fts_processors = self.make_fts_processors(relation_store)?;
let fts_processors = self.make_fts_lsh_processors(relation_store)?;
let mut new_tuples: Vec<DataValue> = vec![];
let mut old_tuples: Vec<DataValue> = vec![];
let mut stack = vec![];
@ -841,6 +932,7 @@ impl<'a> SessionTx<'a> {
let mut tup = extracted.clone();
extend_tuple_from_v(&mut tup, &existing);
self.del_in_fts(relation_store, &mut stack, &fts_processors, &tup)?;
self.del_in_lsh(relation_store, &tup)?;
if has_indices {
for (idx_rel, extractor) in relation_store.indices.values() {
let idx_tup = extractor.iter().map(|i| tup[*i].clone()).collect_vec();

@ -8,13 +8,14 @@
// Some ideas are from https://github.com/schelterlabs/rust-minhash
use crate::data::expr::{eval_bytecode, Bytecode, eval_bytecode_pred};
use crate::data::expr::{eval_bytecode, eval_bytecode_pred, Bytecode};
use crate::data::tuple::Tuple;
use crate::fts::tokenizer::TextAnalyzer;
use crate::fts::TokenizerConfig;
use crate::runtime::relation::RelationHandle;
use crate::runtime::transact::SessionTx;
use crate::{DataValue, Expr, SourceSpan, Symbol};
use itertools::Itertools;
use miette::{bail, miette, Result};
use quadrature::integrate;
use rand::{thread_rng, RngCore};
@ -28,10 +29,9 @@ impl<'a> SessionTx<'a> {
pub(crate) fn del_lsh_index_item(
&mut self,
tuple: &[DataValue],
bytes: Option<Vec<u8>>,
bytes: Option<Vec<Vec<u8>>>,
idx_handle: &RelationHandle,
inv_idx_handle: &RelationHandle,
manifest: &MinHashLshIndexManifest,
) -> Result<()> {
let bytes = match bytes {
None => {
@ -39,7 +39,13 @@ impl<'a> SessionTx<'a> {
let inv_key = inv_idx_handle.encode_key_for_store(tuple, Default::default())?;
self.store_tx.del(&inv_key)?;
match found.pop() {
Some(DataValue::Bytes(b)) => b,
Some(DataValue::List(l)) => l
.into_iter()
.map(|chunk| match chunk {
DataValue::Bytes(b) => b,
_ => unreachable!(),
})
.collect_vec(),
_ => unreachable!(),
}
} else {
@ -49,16 +55,11 @@ impl<'a> SessionTx<'a> {
Some(b) => b,
};
let mut key = Vec::with_capacity(bytes.len() + 2);
key.push(DataValue::Bot);
let mut key = Vec::with_capacity(idx_handle.metadata.keys.len());
key.push(DataValue::Bot);
key.extend_from_slice(tuple);
for (i, chunk) in bytes
.chunks_exact(manifest.r * std::mem::size_of::<u32>())
.enumerate()
{
key[0] = DataValue::from(i as i64);
key[1] = DataValue::Bytes(chunk.to_vec());
for chunk in bytes {
key[0] = DataValue::Bytes(chunk);
let key_bytes = idx_handle.encode_key_for_store(&key, Default::default())?;
self.store_tx.del(&key_bytes)?;
}
@ -80,10 +81,16 @@ impl<'a> SessionTx<'a> {
inv_idx_handle.get_val_only(self, &tuple[..rel_handle.metadata.keys.len()])?
{
let bytes = match found.pop() {
Some(DataValue::Bytes(b)) => b,
Some(DataValue::List(l)) => l
.into_iter()
.map(|chunk| match chunk {
DataValue::Bytes(b) => b,
_ => unreachable!(),
})
.collect_vec(),
_ => unreachable!(),
};
self.del_lsh_index_item(tuple, Some(bytes), idx_handle, inv_idx_handle, manifest)?;
self.del_lsh_index_item(tuple, Some(bytes), idx_handle, inv_idx_handle)?;
}
let to_index = eval_bytecode(extractor, tuple, stack)?;
let min_hash = match to_index {
@ -96,45 +103,54 @@ impl<'a> SessionTx<'a> {
_ => bail!("Cannot put value {:?} into a LSH index", to_index),
};
let bytes = min_hash.get_bytes();
let chunk_size = manifest.r * std::mem::size_of::<u32>();
let chunks = (0..manifest.b).map(|i| {
let mut byte_range = bytes[i * chunk_size..(i + 1) * chunk_size].to_vec();
byte_range.extend_from_slice(&(i as u16).to_le_bytes());
byte_range
}).collect_vec();
let inv_key_part = &tuple[..rel_handle.metadata.keys.len()];
let inv_val_part = vec![DataValue::Bytes(bytes.to_vec())];
let inv_key = inv_idx_handle.encode_key_for_store(inv_key_part, Default::default())?;
let inv_val =
inv_idx_handle.encode_val_only_for_store(&inv_val_part, Default::default())?;
self.store_tx.put(&inv_key, &inv_val)?;
let mut key = Vec::with_capacity(bytes.len() + 2);
key.push(DataValue::Bot);
let mut key = Vec::with_capacity(bytes.len() + 1);
key.push(DataValue::Bot);
key.extend_from_slice(inv_key_part);
let chunk_size = manifest.r * std::mem::size_of::<u32>();
for i in 0..manifest.b {
let byte_range = &bytes[i * chunk_size..(i + 1) * chunk_size];
key[0] = DataValue::from(i as i64);
key[1] = DataValue::Bytes(byte_range.to_vec());
for chunk in chunks.iter() {
key[0] = DataValue::Bytes(chunk.clone());
let key_bytes = idx_handle.encode_key_for_store(&key, Default::default())?;
self.store_tx.put(&key_bytes, &[])?;
}
let inv_val_part = vec![DataValue::List(chunks.into_iter().map(DataValue::Bytes).collect_vec())];
let inv_key = inv_idx_handle.encode_key_for_store(inv_key_part, Default::default())?;
let inv_val =
inv_idx_handle.encode_val_only_for_store(&inv_val_part, Default::default())?;
self.store_tx.put(&inv_key, &inv_val)?;
Ok(())
}
pub(crate) fn lsh_search(
&self,
tuple: &[DataValue],
q: &DataValue,
config: &LshSearch,
stack: &mut Vec<DataValue>,
filter_code: &Option<(Vec<Bytecode>, SourceSpan)>,
perms: &HashPermutations,
tokenizer: &TextAnalyzer,
) -> Result<Vec<Tuple>> {
let bytes = if let Some(mut found) = config
.inv_idx_handle
.get_val_only(self, &tuple[..config.base_handle.metadata.keys.len()])?
{
match found.pop() {
Some(DataValue::Bytes(b)) => b,
_ => unreachable!(),
let bytes = match q {
DataValue::Null => {
return Ok(vec![]);
}
DataValue::List(l) => HashValues::new(l.iter(), perms).get_bytes().to_vec(),
DataValue::Str(s) => {
let n_grams = tokenizer.unique_ngrams(s, config.manifest.n_gram);
HashValues::new(n_grams.iter(), perms).get_bytes().to_vec()
}
} else {
return Ok(vec![]);
_ => bail!("Cannot search for value {:?} in a LSH index", q),
};
let chunk_size = config.manifest.r * std::mem::size_of::<u32>();
let mut key_prefix = Vec::with_capacity(2);
@ -146,10 +162,8 @@ impl<'a> SessionTx<'a> {
for ks in config.idx_handle.scan_prefix(self, &key_prefix) {
let ks = ks?;
let key_part = &ks[2..];
if key_part != tuple {
let found = found_tuples.entry(key_part.to_vec()).or_default();
*found += 1;
}
let found = found_tuples.entry(key_part.to_vec()).or_default();
*found += 1;
}
}
let mut ret = vec![];
@ -185,7 +199,6 @@ impl<'a> SessionTx<'a> {
pub(crate) struct LshSearch {
pub(crate) base_handle: RelationHandle,
pub(crate) idx_handle: RelationHandle,
pub(crate) inv_idx_handle: RelationHandle,
pub(crate) manifest: MinHashLshIndexManifest,
pub(crate) bindings: Vec<Symbol>,
pub(crate) k: Option<usize>,
@ -261,8 +274,7 @@ impl LshParams {
}
fn false_positive_probability(threshold: f64, b: usize, r: usize) -> f64 {
let _probability =
|s| -> f64 { 1. - f64::powf(1. - f64::powi(s, r as i32), b as f64) };
let _probability = |s| -> f64 { 1. - f64::powf(1. - f64::powi(s, r as i32), b as f64) };
integrate(_probability, 0.0, threshold, _ALLOWED_INTEGRATE_ERR).integral
}
@ -326,8 +338,6 @@ impl HashValues {
}
#[cfg(test)]
pub(crate) fn jaccard(&self, other_minhash: &Self) -> f32 {
use itertools::Itertools;
let matches = self
.0
.iter()

@ -743,14 +743,6 @@ impl<'a> SessionTx<'a> {
}];
let mut idx_keys = vec![
ColumnDef {
name: SmartString::from("perm"),
typing: NullableColType {
coltype: ColType::Int,
nullable: false,
},
default_gen: None,
},
ColumnDef {
name: SmartString::from("hash"),
typing: NullableColType {

Loading…
Cancel
Save