From c06b0f0ccbb9ce06453d3f155c9418353c22f6b5 Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Tue, 2 May 2023 13:14:49 +0800 Subject: [PATCH] use disk cached collection for two-phase ops --- Cargo.lock | 36 +++++++++++++++++++++++++++++-- cozo-core/Cargo.toml | 3 ++- cozo-core/src/runtime/relation.rs | 31 +++++++++++++++++++------- cozo-core/src/storage/mod.rs | 13 +++++++++-- cozo-core/src/storage/rocks.rs | 6 ++++++ cozo-core/src/storage/sled.rs | 19 ++++++++-------- cozo-core/src/storage/sqlite.rs | 4 ++++ cozo-core/src/storage/temp.rs | 4 ---- cozo-core/src/storage/tikv.rs | 17 +++++++++------ cozo-core/src/utils.rs | 15 +++++++++++++ 10 files changed, 115 insertions(+), 33 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 601807a6..a0250354 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -304,7 +304,7 @@ dependencies = [ "cc", "cfg-if 1.0.0", "libc", - "miniz_oxide", + "miniz_oxide 0.6.2", "object", "rustc-demangle", ] @@ -743,6 +743,7 @@ dependencies = [ "smartstring", "sqlite", "sqlite3-src", + "swapvec", "thiserror", "tikv-client", "tikv-jemallocator-global", @@ -1264,7 +1265,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8a2db397cb1c8772f31494cb8917e48cd1e64f0fa7efac59fbd741a0a8ce841" dependencies = [ "crc32fast", - "miniz_oxide", + "miniz_oxide 0.6.2", ] [[package]] @@ -2027,6 +2028,15 @@ dependencies = [ "libc", ] +[[package]] +name = "lz4_flex" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b8c72594ac26bfd34f2d99dfced2edfaddfe8a476e3ff2ca0eb293d925c4f83" +dependencies = [ + "twox-hash", +] + [[package]] name = "matchit" version = "0.7.0" @@ -2129,6 +2139,15 @@ dependencies = [ "adler", ] +[[package]] +name = "miniz_oxide" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" +dependencies = [ + "adler", +] + [[package]] name = "minreq" version = "2.7.0" @@ -3640,6 +3659,19 @@ dependencies = [ "is-terminal", ] +[[package]] +name = "swapvec" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f6d2e759025e1ddace58d35fd578843d20b517a5692237a6edb6c27da4fbc49" +dependencies = [ + "bincode", + "lz4_flex", + "miniz_oxide 0.7.1", + "serde", + "tempfile", +] + [[package]] name = "swift-bridge" version = "0.1.51" diff --git a/cozo-core/Cargo.toml b/cozo-core/Cargo.toml index 8dedafd3..c1bfafd4 100644 --- a/cozo-core/Cargo.toml +++ b/cozo-core/Cargo.toml @@ -136,4 +136,5 @@ quadrature = "0.1.2" jieba-rs = "0.6.7" aho-corasick = "1.0.1" rust-stemmers = "1.2.0" -fast2s = "0.3.1" \ No newline at end of file +fast2s = "0.3.1" +swapvec = "0.2.0" \ No newline at end of file diff --git a/cozo-core/src/runtime/relation.rs b/cozo-core/src/runtime/relation.rs index a44f7801..00380614 100644 --- a/cozo-core/src/runtime/relation.rs +++ b/cozo-core/src/runtime/relation.rs @@ -33,6 +33,7 @@ use crate::runtime::hnsw::HnswIndexManifest; use crate::runtime::minhash_lsh::{HashPermutations, LshParams, MinHashLshIndexManifest, Weights}; use crate::runtime::transact::SessionTx; use crate::{NamedRows, StoreTx}; +use crate::utils::TempCollector; #[derive( Copy, @@ -835,9 +836,14 @@ impl<'a> SessionTx<'a> { let mut stack = vec![]; - let existing: Vec<_> = rel_handle.scan_all(self).try_collect()?; + let hash_perms = manifest.get_hash_perms(); - for tuple in existing { + let mut existing = TempCollector::default(); + for tuple in rel_handle.scan_all(self) { + existing.push(tuple?); + } + + for tuple in existing.into_iter() { self.put_lsh_index_item( &tuple, &extractor, @@ -967,8 +973,11 @@ impl<'a> SessionTx<'a> { let mut stack = vec![]; - let existing: Vec<_> = rel_handle.scan_all(self).try_collect()?; - for tuple in existing { + let mut existing = TempCollector::default(); + for tuple in rel_handle.scan_all(self) { + existing.push(tuple?); + } + for tuple in existing.into_iter() { let key_part = &tuple[..rel_handle.metadata.keys.len()]; if rel_handle.exists(self, key_part)? { self.del_fts_index_item( @@ -1150,7 +1159,10 @@ impl<'a> SessionTx<'a> { }; // populate index - let all_tuples = rel_handle.scan_all(self).collect::>>()?; + let mut all_tuples = TempCollector::default(); + for tuple in rel_handle.scan_all(self) { + all_tuples.push(tuple?); + } let filter = if let Some(f_code) = &manifest.index_filter { let parsed = CozoScriptParser::parse(Rule::expr, f_code) .into_diagnostic()? @@ -1169,7 +1181,7 @@ impl<'a> SessionTx<'a> { Some(&filter) }; let mut stack = vec![]; - for tuple in all_tuples { + for tuple in all_tuples.into_iter() { self.hnsw_put( &manifest, &rel_handle, @@ -1332,8 +1344,11 @@ impl<'a> SessionTx<'a> { self.store_tx.par_put(&key, &[])?; } } else { - for tuple in rel_handle.scan_all(self).collect_vec() { - let tuple = tuple?; + let mut existing = TempCollector::default(); + for tuple in rel_handle.scan_all(self) { + existing.push(tuple?); + } + for tuple in existing.into_iter() { let extracted = extraction_indices .iter() .map(|idx| tuple[*idx].clone()) diff --git a/cozo-core/src/storage/mod.rs b/cozo-core/src/storage/mod.rs index 2cc421d7..22d9b115 100644 --- a/cozo-core/src/storage/mod.rs +++ b/cozo-core/src/storage/mod.rs @@ -74,12 +74,21 @@ pub trait StoreTx<'s>: Sync { /// Put a key-value pair into the storage. In case of existing key, /// the storage engine needs to overwrite the old value. /// The difference between this one and `put` is the mutability of self. - /// It is OK to always panic if `supports_par_put` returns `true`. - fn par_put(&self, key: &[u8], val: &[u8]) -> Result<()>; + /// It is OK to always panic if `supports_par_put` returns `false`. + fn par_put(&self, _key: &[u8], _val: &[u8]) -> Result<()> { + panic!("par_put is not supported") + } /// Delete a key-value pair from the storage. fn del(&mut self, key: &[u8]) -> Result<()>; + /// Delete a key-value pair from the storage. + /// The difference between this one and `del` is the mutability of self. + /// It is OK to always panic if `supports_par_put` returns `false`. + fn par_del(&self, _key: &[u8]) -> Result<()> { + panic!("par_del is not supported") + } + /// Delete a range from persisted data only. fn del_range_from_persisted(&mut self, lower: &[u8], upper: &[u8]) -> Result<()>; diff --git a/cozo-core/src/storage/rocks.rs b/cozo-core/src/storage/rocks.rs index 2b973c0b..db81dc2a 100644 --- a/cozo-core/src/storage/rocks.rs +++ b/cozo-core/src/storage/rocks.rs @@ -171,6 +171,7 @@ impl<'s> StoreTx<'s> for RocksDbTx { true } + #[inline] fn par_put(&self, key: &[u8], val: &[u8]) -> Result<()> { Ok(self.db_tx.put(key, val)?) } @@ -180,6 +181,11 @@ impl<'s> StoreTx<'s> for RocksDbTx { Ok(self.db_tx.del(key)?) } + #[inline] + fn par_del(&self, key: &[u8]) -> Result<()> { + Ok(self.db_tx.del(key)?) + } + fn del_range_from_persisted(&mut self, lower: &[u8], upper: &[u8]) -> Result<()> { let mut inner = self.db_tx.iterator().upper_bound(upper).start(); inner.seek(lower); diff --git a/cozo-core/src/storage/sled.rs b/cozo-core/src/storage/sled.rs index 3db3233f..756bfcff 100644 --- a/cozo-core/src/storage/sled.rs +++ b/cozo-core/src/storage/sled.rs @@ -19,7 +19,7 @@ use crate::data::tuple::Tuple; use crate::data::value::ValidityTs; use crate::runtime::relation::decode_tuple_from_kv; use crate::storage::{Storage, StoreTx}; -use crate::utils::swap_option_result; +use crate::utils::{swap_option_result, TempCollector}; /// Creates a Sled database object. Experimental. /// You should use [`new_cozo_rocksdb`](crate::new_cozo_rocksdb) or @@ -129,10 +129,6 @@ impl<'s> StoreTx<'s> for SledTx { false } - fn par_put(&self, _key: &[u8], _val: &[u8]) -> Result<()> { - panic!() - } - #[inline] fn del(&mut self, key: &[u8]) -> Result<()> { self.ensure_changes_db()?; @@ -146,11 +142,14 @@ impl<'s> StoreTx<'s> for SledTx { } fn del_range_from_persisted(&mut self, lower: &[u8], upper: &[u8]) -> Result<()> { - let to_del: Vec<_> = self - .range_scan(lower, upper) - .map_ok(|(k, v)| k) - .try_collect()?; - for k_res in to_del { + let mut to_del = TempCollector::default(); + + for pair in self.range_scan(lower, upper) { + let (k, _) = pair?; + to_del.push(k); + } + + for k_res in to_del.into_iter() { self.db.remove(&k_res)?; } Ok(()) diff --git a/cozo-core/src/storage/sqlite.rs b/cozo-core/src/storage/sqlite.rs index 2af9a7d6..f2093957 100644 --- a/cozo-core/src/storage/sqlite.rs +++ b/cozo-core/src/storage/sqlite.rs @@ -215,6 +215,10 @@ impl<'s> StoreTx<'s> for SqliteTx<'s> { } fn del(&mut self, key: &[u8]) -> Result<()> { + self.par_del(key) + } + + fn par_del(&self, key: &[u8]) -> Result<()> { self.ensure_stmt(DEL_QUERY); let mut statement = self.stmts[DEL_QUERY].lock().unwrap(); let statement = statement.as_mut().unwrap(); diff --git a/cozo-core/src/storage/temp.rs b/cozo-core/src/storage/temp.rs index 7038e8fa..7d60c00c 100644 --- a/cozo-core/src/storage/temp.rs +++ b/cozo-core/src/storage/temp.rs @@ -63,10 +63,6 @@ impl<'s> StoreTx<'s> for TempTx { false } - fn par_put(&self, _key: &[u8], _val: &[u8]) -> Result<()> { - panic!() - } - fn del(&mut self, key: &[u8]) -> Result<()> { self.store.remove(key); Ok(()) diff --git a/cozo-core/src/storage/tikv.rs b/cozo-core/src/storage/tikv.rs index 27cc50f4..a47e24b3 100644 --- a/cozo-core/src/storage/tikv.rs +++ b/cozo-core/src/storage/tikv.rs @@ -20,7 +20,7 @@ use crate::data::tuple::Tuple; use crate::data::value::ValidityTs; use crate::runtime::relation::decode_tuple_from_kv; use crate::storage::{Storage, StoreTx}; -use crate::utils::swap_option_result; +use crate::utils::{swap_option_result, TempCollector}; use crate::Db; /// Connect to a Storage engine backed by TiKV. @@ -126,16 +126,21 @@ impl<'s> StoreTx<'s> for TiKvTx { } fn del(&mut self, key: &[u8]) -> Result<()> { + self.par_del(key) + } + + fn par_del(&self, key: &[u8]) -> Result<()> { RT.block_on(self.tx.lock().unwrap().delete(key.to_owned())) .into_diagnostic() } fn del_range_from_persisted(&mut self, lower: &[u8], upper: &[u8]) -> Result<()> { - let to_remove: Vec<_> = self - .range_scan(lower, upper) - .map_ok(|(k, v)| k) - .try_collect()?; - for key in to_remove { + let mut to_del = TempCollector::default(); + for pair in self.range_scan(lower, upper) { + to_del.push(pair?.0); + } + + for key in to_del.into_iter() { self.del(&key)?; } Ok(()) diff --git a/cozo-core/src/utils.rs b/cozo-core/src/utils.rs index b2d3d8db..ca45db9a 100644 --- a/cozo-core/src/utils.rs +++ b/cozo-core/src/utils.rs @@ -14,3 +14,18 @@ pub(crate) fn swap_option_result(d: Result, E>) -> Option Some(Err(e)), } } + +#[derive(Default)] +pub(crate) struct TempCollector serde::Deserialize<'a>> { + // pub(crate) inner: Vec, + pub(crate) inner: swapvec::SwapVec, +} + +impl serde::Deserialize<'a>> TempCollector { + pub(crate) fn push(&mut self, val: T) { + self.inner.push(val).unwrap(); + } + pub(crate) fn into_iter(self) -> impl Iterator { + self.inner.into_iter().map(|v| v.unwrap()) + } +}