use disk cached collection for two-phase ops

main
Ziyang Hu 1 year ago
parent 29b2c4d900
commit c06b0f0ccb

36
Cargo.lock generated

@ -304,7 +304,7 @@ dependencies = [
"cc", "cc",
"cfg-if 1.0.0", "cfg-if 1.0.0",
"libc", "libc",
"miniz_oxide", "miniz_oxide 0.6.2",
"object", "object",
"rustc-demangle", "rustc-demangle",
] ]
@ -743,6 +743,7 @@ dependencies = [
"smartstring", "smartstring",
"sqlite", "sqlite",
"sqlite3-src", "sqlite3-src",
"swapvec",
"thiserror", "thiserror",
"tikv-client", "tikv-client",
"tikv-jemallocator-global", "tikv-jemallocator-global",
@ -1264,7 +1265,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8a2db397cb1c8772f31494cb8917e48cd1e64f0fa7efac59fbd741a0a8ce841" checksum = "a8a2db397cb1c8772f31494cb8917e48cd1e64f0fa7efac59fbd741a0a8ce841"
dependencies = [ dependencies = [
"crc32fast", "crc32fast",
"miniz_oxide", "miniz_oxide 0.6.2",
] ]
[[package]] [[package]]
@ -2027,6 +2028,15 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "lz4_flex"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b8c72594ac26bfd34f2d99dfced2edfaddfe8a476e3ff2ca0eb293d925c4f83"
dependencies = [
"twox-hash",
]
[[package]] [[package]]
name = "matchit" name = "matchit"
version = "0.7.0" version = "0.7.0"
@ -2129,6 +2139,15 @@ dependencies = [
"adler", "adler",
] ]
[[package]]
name = "miniz_oxide"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7"
dependencies = [
"adler",
]
[[package]] [[package]]
name = "minreq" name = "minreq"
version = "2.7.0" version = "2.7.0"
@ -3640,6 +3659,19 @@ dependencies = [
"is-terminal", "is-terminal",
] ]
[[package]]
name = "swapvec"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f6d2e759025e1ddace58d35fd578843d20b517a5692237a6edb6c27da4fbc49"
dependencies = [
"bincode",
"lz4_flex",
"miniz_oxide 0.7.1",
"serde",
"tempfile",
]
[[package]] [[package]]
name = "swift-bridge" name = "swift-bridge"
version = "0.1.51" version = "0.1.51"

@ -136,4 +136,5 @@ quadrature = "0.1.2"
jieba-rs = "0.6.7" jieba-rs = "0.6.7"
aho-corasick = "1.0.1" aho-corasick = "1.0.1"
rust-stemmers = "1.2.0" rust-stemmers = "1.2.0"
fast2s = "0.3.1" fast2s = "0.3.1"
swapvec = "0.2.0"

@ -33,6 +33,7 @@ use crate::runtime::hnsw::HnswIndexManifest;
use crate::runtime::minhash_lsh::{HashPermutations, LshParams, MinHashLshIndexManifest, Weights}; use crate::runtime::minhash_lsh::{HashPermutations, LshParams, MinHashLshIndexManifest, Weights};
use crate::runtime::transact::SessionTx; use crate::runtime::transact::SessionTx;
use crate::{NamedRows, StoreTx}; use crate::{NamedRows, StoreTx};
use crate::utils::TempCollector;
#[derive( #[derive(
Copy, Copy,
@ -835,9 +836,14 @@ impl<'a> SessionTx<'a> {
let mut stack = vec![]; let mut stack = vec![];
let existing: Vec<_> = rel_handle.scan_all(self).try_collect()?;
let hash_perms = manifest.get_hash_perms(); let hash_perms = manifest.get_hash_perms();
for tuple in existing { let mut existing = TempCollector::default();
for tuple in rel_handle.scan_all(self) {
existing.push(tuple?);
}
for tuple in existing.into_iter() {
self.put_lsh_index_item( self.put_lsh_index_item(
&tuple, &tuple,
&extractor, &extractor,
@ -967,8 +973,11 @@ impl<'a> SessionTx<'a> {
let mut stack = vec![]; let mut stack = vec![];
let existing: Vec<_> = rel_handle.scan_all(self).try_collect()?; let mut existing = TempCollector::default();
for tuple in existing { for tuple in rel_handle.scan_all(self) {
existing.push(tuple?);
}
for tuple in existing.into_iter() {
let key_part = &tuple[..rel_handle.metadata.keys.len()]; let key_part = &tuple[..rel_handle.metadata.keys.len()];
if rel_handle.exists(self, key_part)? { if rel_handle.exists(self, key_part)? {
self.del_fts_index_item( self.del_fts_index_item(
@ -1150,7 +1159,10 @@ impl<'a> SessionTx<'a> {
}; };
// populate index // populate index
let all_tuples = rel_handle.scan_all(self).collect::<Result<Vec<_>>>()?; let mut all_tuples = TempCollector::default();
for tuple in rel_handle.scan_all(self) {
all_tuples.push(tuple?);
}
let filter = if let Some(f_code) = &manifest.index_filter { let filter = if let Some(f_code) = &manifest.index_filter {
let parsed = CozoScriptParser::parse(Rule::expr, f_code) let parsed = CozoScriptParser::parse(Rule::expr, f_code)
.into_diagnostic()? .into_diagnostic()?
@ -1169,7 +1181,7 @@ impl<'a> SessionTx<'a> {
Some(&filter) Some(&filter)
}; };
let mut stack = vec![]; let mut stack = vec![];
for tuple in all_tuples { for tuple in all_tuples.into_iter() {
self.hnsw_put( self.hnsw_put(
&manifest, &manifest,
&rel_handle, &rel_handle,
@ -1332,8 +1344,11 @@ impl<'a> SessionTx<'a> {
self.store_tx.par_put(&key, &[])?; self.store_tx.par_put(&key, &[])?;
} }
} else { } else {
for tuple in rel_handle.scan_all(self).collect_vec() { let mut existing = TempCollector::default();
let tuple = tuple?; for tuple in rel_handle.scan_all(self) {
existing.push(tuple?);
}
for tuple in existing.into_iter() {
let extracted = extraction_indices let extracted = extraction_indices
.iter() .iter()
.map(|idx| tuple[*idx].clone()) .map(|idx| tuple[*idx].clone())

@ -74,12 +74,21 @@ pub trait StoreTx<'s>: Sync {
/// Put a key-value pair into the storage. In case of existing key, /// Put a key-value pair into the storage. In case of existing key,
/// the storage engine needs to overwrite the old value. /// the storage engine needs to overwrite the old value.
/// The difference between this one and `put` is the mutability of self. /// The difference between this one and `put` is the mutability of self.
/// It is OK to always panic if `supports_par_put` returns `true`. /// It is OK to always panic if `supports_par_put` returns `false`.
fn par_put(&self, key: &[u8], val: &[u8]) -> Result<()>; fn par_put(&self, _key: &[u8], _val: &[u8]) -> Result<()> {
panic!("par_put is not supported")
}
/// Delete a key-value pair from the storage. /// Delete a key-value pair from the storage.
fn del(&mut self, key: &[u8]) -> Result<()>; fn del(&mut self, key: &[u8]) -> Result<()>;
/// Delete a key-value pair from the storage.
/// The difference between this one and `del` is the mutability of self:
/// this method takes `&self`, whereas `del` takes `&mut self`.
/// It is OK to always panic if `supports_par_put` returns `false`.
///
/// # Panics
///
/// This default implementation always panics with the message
/// `"par_del is not supported"`. Storage engines that support deletion
/// through a shared reference override this method.
fn par_del(&self, _key: &[u8]) -> Result<()> {
panic!("par_del is not supported")
}
/// Delete a range from persisted data only. /// Delete a range from persisted data only.
fn del_range_from_persisted(&mut self, lower: &[u8], upper: &[u8]) -> Result<()>; fn del_range_from_persisted(&mut self, lower: &[u8], upper: &[u8]) -> Result<()>;

@ -171,6 +171,7 @@ impl<'s> StoreTx<'s> for RocksDbTx {
true true
} }
#[inline]
fn par_put(&self, key: &[u8], val: &[u8]) -> Result<()> { fn par_put(&self, key: &[u8], val: &[u8]) -> Result<()> {
Ok(self.db_tx.put(key, val)?) Ok(self.db_tx.put(key, val)?)
} }
@ -180,6 +181,11 @@ impl<'s> StoreTx<'s> for RocksDbTx {
Ok(self.db_tx.del(key)?) Ok(self.db_tx.del(key)?)
} }
/// Deletes `key` by forwarding to the underlying `db_tx`, requiring only a
/// shared reference to `self`. Any error from `db_tx` is converted and
/// returned to the caller.
#[inline]
fn par_del(&self, key: &[u8]) -> Result<()> {
    self.db_tx.del(key)?;
    Ok(())
}
fn del_range_from_persisted(&mut self, lower: &[u8], upper: &[u8]) -> Result<()> { fn del_range_from_persisted(&mut self, lower: &[u8], upper: &[u8]) -> Result<()> {
let mut inner = self.db_tx.iterator().upper_bound(upper).start(); let mut inner = self.db_tx.iterator().upper_bound(upper).start();
inner.seek(lower); inner.seek(lower);

@ -19,7 +19,7 @@ use crate::data::tuple::Tuple;
use crate::data::value::ValidityTs; use crate::data::value::ValidityTs;
use crate::runtime::relation::decode_tuple_from_kv; use crate::runtime::relation::decode_tuple_from_kv;
use crate::storage::{Storage, StoreTx}; use crate::storage::{Storage, StoreTx};
use crate::utils::swap_option_result; use crate::utils::{swap_option_result, TempCollector};
/// Creates a Sled database object. Experimental. /// Creates a Sled database object. Experimental.
/// You should use [`new_cozo_rocksdb`](crate::new_cozo_rocksdb) or /// You should use [`new_cozo_rocksdb`](crate::new_cozo_rocksdb) or
@ -129,10 +129,6 @@ impl<'s> StoreTx<'s> for SledTx {
false false
} }
fn par_put(&self, _key: &[u8], _val: &[u8]) -> Result<()> {
panic!()
}
#[inline] #[inline]
fn del(&mut self, key: &[u8]) -> Result<()> { fn del(&mut self, key: &[u8]) -> Result<()> {
self.ensure_changes_db()?; self.ensure_changes_db()?;
@ -146,11 +142,14 @@ impl<'s> StoreTx<'s> for SledTx {
} }
fn del_range_from_persisted(&mut self, lower: &[u8], upper: &[u8]) -> Result<()> { fn del_range_from_persisted(&mut self, lower: &[u8], upper: &[u8]) -> Result<()> {
let to_del: Vec<_> = self let mut to_del = TempCollector::default();
.range_scan(lower, upper)
.map_ok(|(k, v)| k) for pair in self.range_scan(lower, upper) {
.try_collect()?; let (k, _) = pair?;
for k_res in to_del { to_del.push(k);
}
for k_res in to_del.into_iter() {
self.db.remove(&k_res)?; self.db.remove(&k_res)?;
} }
Ok(()) Ok(())

@ -215,6 +215,10 @@ impl<'s> StoreTx<'s> for SqliteTx<'s> {
} }
fn del(&mut self, key: &[u8]) -> Result<()> { fn del(&mut self, key: &[u8]) -> Result<()> {
self.par_del(key)
}
fn par_del(&self, key: &[u8]) -> Result<()> {
self.ensure_stmt(DEL_QUERY); self.ensure_stmt(DEL_QUERY);
let mut statement = self.stmts[DEL_QUERY].lock().unwrap(); let mut statement = self.stmts[DEL_QUERY].lock().unwrap();
let statement = statement.as_mut().unwrap(); let statement = statement.as_mut().unwrap();

@ -63,10 +63,6 @@ impl<'s> StoreTx<'s> for TempTx {
false false
} }
fn par_put(&self, _key: &[u8], _val: &[u8]) -> Result<()> {
panic!()
}
fn del(&mut self, key: &[u8]) -> Result<()> { fn del(&mut self, key: &[u8]) -> Result<()> {
self.store.remove(key); self.store.remove(key);
Ok(()) Ok(())

@ -20,7 +20,7 @@ use crate::data::tuple::Tuple;
use crate::data::value::ValidityTs; use crate::data::value::ValidityTs;
use crate::runtime::relation::decode_tuple_from_kv; use crate::runtime::relation::decode_tuple_from_kv;
use crate::storage::{Storage, StoreTx}; use crate::storage::{Storage, StoreTx};
use crate::utils::swap_option_result; use crate::utils::{swap_option_result, TempCollector};
use crate::Db; use crate::Db;
/// Connect to a Storage engine backed by TiKV. /// Connect to a Storage engine backed by TiKV.
@ -126,16 +126,21 @@ impl<'s> StoreTx<'s> for TiKvTx {
} }
fn del(&mut self, key: &[u8]) -> Result<()> { fn del(&mut self, key: &[u8]) -> Result<()> {
self.par_del(key)
}
fn par_del(&self, key: &[u8]) -> Result<()> {
RT.block_on(self.tx.lock().unwrap().delete(key.to_owned())) RT.block_on(self.tx.lock().unwrap().delete(key.to_owned()))
.into_diagnostic() .into_diagnostic()
} }
fn del_range_from_persisted(&mut self, lower: &[u8], upper: &[u8]) -> Result<()> { fn del_range_from_persisted(&mut self, lower: &[u8], upper: &[u8]) -> Result<()> {
let to_remove: Vec<_> = self let mut to_del = TempCollector::default();
.range_scan(lower, upper) for pair in self.range_scan(lower, upper) {
.map_ok(|(k, v)| k) to_del.push(pair?.0);
.try_collect()?; }
for key in to_remove {
for key in to_del.into_iter() {
self.del(&key)?; self.del(&key)?;
} }
Ok(()) Ok(())

@ -14,3 +14,18 @@ pub(crate) fn swap_option_result<T, E>(d: Result<Option<T>, E>) -> Option<Result
Err(e) => Some(Err(e)), Err(e) => Some(Err(e)),
} }
} }
/// A push-then-drain collector backed by [`swapvec::SwapVec`].
///
/// It is used for two-phase operations: first every item of a scan is
/// pushed into the collector, then the collected items are consumed via
/// [`TempCollector::into_iter`] while the underlying store is being mutated.
/// Items must be serde-serializable because `SwapVec` requires it
/// (presumably so that it can move data out of memory — see the `swapvec`
/// documentation for the exact swapping policy).
#[derive(Default)]
pub(crate) struct TempCollector<T: serde::Serialize + for<'a> serde::Deserialize<'a>> {
    /// The backing storage.
    pub(crate) inner: swapvec::SwapVec<T>,
}

impl<T: serde::Serialize + for<'a> serde::Deserialize<'a>> TempCollector<T> {
    /// Appends `val` to the collector.
    ///
    /// # Panics
    ///
    /// Panics if the backing `SwapVec` reports an error while storing the
    /// value. The signature is infallible, so the error cannot be propagated;
    /// the panic message names the failing operation.
    pub(crate) fn push(&mut self, val: T) {
        self.inner
            .push(val)
            .expect("TempCollector: failed to push item into the backing SwapVec");
    }

    /// Consumes the collector and yields the collected items.
    ///
    /// # Panics
    ///
    /// The returned iterator panics if the backing `SwapVec` reports an error
    /// while reading an item back.
    pub(crate) fn into_iter(self) -> impl Iterator<Item = T> {
        self.inner
            .into_iter()
            .map(|item| item.expect("TempCollector: failed to read item back from the backing SwapVec"))
    }
}

Loading…
Cancel
Save