diff --git a/cozo-core/src/runtime/db.rs b/cozo-core/src/runtime/db.rs index 8e3dc9f1..2815a797 100644 --- a/cozo-core/src/runtime/db.rs +++ b/cozo-core/src/runtime/db.rs @@ -293,18 +293,18 @@ impl<'s, S: Storage<'s>> Db { for payload in payloads { match payload { TransactionPayload::Commit => { + for (lower, upper) in cleanups { + if let Err(err) = tx.store_tx.del_range_from_persisted(&lower, &upper) { + eprintln!("{err:?}") + } + } + let _ = results.send(tx.commit_tx().map(|_| NamedRows::default())); #[cfg(not(target_arch = "wasm32"))] if !callback_collector.is_empty() { self.send_callbacks(callback_collector) } - for (lower, upper) in cleanups { - if let Err(err) = self.db.del_range(&lower, &upper) { - eprintln!("{err:?}") - } - } - break; } TransactionPayload::Abort => { @@ -893,11 +893,14 @@ impl<'s, S: Storage<'s>> Db { &mut callback_collector, )?; + for (lower, upper) in cleanups { + tx.store_tx.del_range_from_persisted(&lower, &upper)?; + } + if is_write { tx.commit_tx()?; } else { tx.commit_tx()?; - assert!(cleanups.is_empty(), "non-empty cleanups on read-only tx"); } } #[cfg(not(target_arch = "wasm32"))] @@ -905,9 +908,6 @@ impl<'s, S: Storage<'s>> Db { self.send_callbacks(callback_collector) } - for (lower, upper) in cleanups { - self.db.del_range(&lower, &upper)?; - } Ok(res) } fn explain_compiled(&self, strata: &[CompiledProgram]) -> Result { @@ -1168,19 +1168,17 @@ impl<'s, S: Storage<'s>> Db { let locks = self.obtain_relation_locks(rel_name_strs); let _guards = locks.iter().map(|l| l.read().unwrap()).collect_vec(); let mut bounds = vec![]; - { - let mut tx = self.transact_write()?; - for rs in rel_names { - let bound = tx.destroy_relation(&rs)?; - if !rs.is_temp_store_name() { - bounds.extend(bound); - } + let mut tx = self.transact_write()?; + for rs in rel_names { + let bound = tx.destroy_relation(&rs)?; + if !rs.is_temp_store_name() { + bounds.extend(bound); } - tx.commit_tx()?; } for (lower, upper) in bounds { - self.db.del_range(&lower, &upper)?; + tx.store_tx.del_range_from_persisted(&lower, &upper)?; } + tx.commit_tx()?; Ok(NamedRows::new( vec![STATUS_STR.to_string()], vec![vec![DataValue::from(OK_STR)]], @@ -1259,7 +1257,7 @@ impl<'s, S: Storage<'s>> Db { let mut tx = self.transact_write()?; let bounds = tx.remove_index(&rel_name, &idx_name)?; for (lower, upper) in bounds { - self.db.del_range(&lower, &upper)?; + tx.store_tx.del_range_from_persisted(&lower, &upper)?; } tx.commit_tx()?; Ok(NamedRows::new( diff --git a/cozo-core/src/runtime/imperative.rs b/cozo-core/src/runtime/imperative.rs index a6673c03..8a6ffd74 100644 --- a/cozo-core/src/runtime/imperative.rs +++ b/cozo-core/src/runtime/imperative.rs @@ -304,11 +304,14 @@ impl<'s, S: Storage<'s>> Db { }, } + for (lower, upper) in cleanups { + tx.store_tx.del_range_from_persisted(&lower, &upper)?; + } + if is_write { tx.commit_tx()?; } else { tx.commit_tx()?; - assert!(cleanups.is_empty(), "non-empty cleanups on read-only tx"); } } #[cfg(not(target_arch = "wasm32"))] @@ -316,9 +319,6 @@ impl<'s, S: Storage<'s>> Db { self.send_callbacks(callback_collector) } - for (lower, upper) in cleanups { - self.db.del_range(&lower, &upper)?; - } Ok(ret) } } diff --git a/cozo-core/src/runtime/relation.rs b/cozo-core/src/runtime/relation.rs index 58debecc..a44f7801 100644 --- a/cozo-core/src/runtime/relation.rs +++ b/cozo-core/src/runtime/relation.rs @@ -664,7 +664,11 @@ impl<'a> SessionTx<'a> { let metadata = RelationHandle::decode(&found)?; Ok(metadata) } - pub(crate) fn describe_relation(&mut self, name: &str, description: SmartString) -> Result<()> { + pub(crate) fn describe_relation( + &mut self, + name: &str, + description: SmartString, + ) -> Result<()> { let mut meta = self.get_relation(name, true)?; meta.description = description; diff --git a/cozo-core/src/runtime/tests.rs b/cozo-core/src/runtime/tests.rs index 52082807..9e283141 100644 --- a/cozo-core/src/runtime/tests.rs +++ b/cozo-core/src/runtime/tests.rs @@ -609,6 +609,8 @@ fn test_index() { .map(|row| row.as_array().unwrap()[5].clone()) .collect_vec(); assert!(joins.contains(&json!(":friends:rev"))); + db.run_script("::index drop friends:rev", Default::default()) + .unwrap(); } #[test] @@ -948,7 +950,6 @@ fn test_fts_indexing() { } } - #[test] fn test_lsh_indexing() { let db = DbInstance::new("mem", "", "").unwrap(); @@ -958,12 +959,12 @@ fn test_lsh_indexing() { r"?[k, v] <- [['a', 'hello world!'], ['b', 'the world is round']] :put a {k => v}", Default::default(), ) - .unwrap(); + .unwrap(); db.run_script( r"::lsh create a:lsh {extractor: v, tokenizer: Simple, n_gram: 3, target_threshold: 0.3 }", Default::default(), ) - .unwrap(); + .unwrap(); db.run_script( r"?[k, v] <- [ ['b', 'the world is square!'], @@ -973,8 +974,10 @@ fn test_lsh_indexing() { ] :put a {k => v}", Default::default(), ) + .unwrap(); + let res = db + .run_script("::columns a:lsh", Default::default()) .unwrap(); - let res = db.run_script("::columns a:lsh", Default::default()).unwrap(); for row in res.into_json()["rows"].as_array().unwrap() { println!("{}", row); } @@ -1019,6 +1022,8 @@ fn test_lsh_indexing() { for row in res.into_json()["rows"].as_array().unwrap() { println!("{}", row); } + db.run_script(r"::lsh drop a:lsh", Default::default()) + .unwrap(); } #[test] @@ -1142,7 +1147,9 @@ fn multi_index_vec() { "#, Default::default(), ).unwrap(); - let res = db.run_script("::indices product", Default::default()).unwrap(); + let res = db + .run_script("::indices product", Default::default()) + .unwrap(); for row in res.into_json()["rows"].as_array().unwrap() { println!("{}", row); } diff --git a/cozo-core/src/storage/mem.rs b/cozo-core/src/storage/mem.rs index a9dad332..d27cc9ba 100644 --- a/cozo-core/src/storage/mem.rs +++ b/cozo-core/src/storage/mem.rs @@ -58,29 +58,6 @@ impl<'s> Storage<'s> for MemStorage { }) } - fn del_range(&'s self, lower: &[u8], upper: &[u8]) -> Result<()> { - let store = self.store.clone(); - let lower_b = lower.to_vec(); - let upper_b = upper.to_vec(); - let closure = move || { - let keys = { - let rdr = store.read().unwrap(); - rdr.range(lower_b..upper_b) - .map(|kv| kv.0.clone()) - .collect_vec() - }; - let mut wtr = store.write().unwrap(); - for k in keys.iter() { - wtr.remove(k); - } - }; - #[cfg(target_arch = "wasm32")] - closure(); - #[cfg(not(target_arch = "wasm32"))] - std::thread::spawn(closure); - Ok(()) - } - fn range_compact(&'s self, _lower: &[u8], _upper: &[u8]) -> Result<()> { Ok(()) } @@ -149,6 +126,25 @@ impl<'s> StoreTx<'s> for MemTx<'s> { } } + fn del_range_from_persisted(&mut self, lower: &[u8], upper: &[u8]) -> Result<()> { + match self { + MemTx::Reader(_) => { + bail!("write in read transaction") + } + MemTx::Writer(ref mut wtr, _) => { + let keys = wtr + .range(lower.to_vec()..upper.to_vec()) + .map(|kv| kv.0.clone()) + .collect_vec(); + for k in keys.iter() { + wtr.remove(k); + } + } + } + + Ok(()) + } + fn exists(&self, key: &[u8], _for_update: bool) -> Result { Ok(match self { MemTx::Reader(rdr) => rdr.contains_key(key), diff --git a/cozo-core/src/storage/mod.rs b/cozo-core/src/storage/mod.rs index 4c53e3df..2cc421d7 100644 --- a/cozo-core/src/storage/mod.rs +++ b/cozo-core/src/storage/mod.rs @@ -36,11 +36,6 @@ pub trait Storage<'s>: Send + Sync + Clone { /// Create a transaction object. Write ops will only be called when `write == true`. fn transact(&'s self, write: bool) -> Result; - /// Delete a range. It is ok to return immediately and do the deletion in - /// the background. It is guaranteed that no keys within the deleted range - /// will be accessed in any way by any transaction again. - fn del_range(&'s self, lower: &[u8], upper: &[u8]) -> Result<()>; - /// Compact the key range. Can be a no-op if the storage engine does not /// have the concept of compaction. fn range_compact(&'s self, lower: &[u8], upper: &[u8]) -> Result<()>; @@ -85,6 +80,9 @@ pub trait StoreTx<'s>: Sync { /// Delete a key-value pair from the storage. fn del(&mut self, key: &[u8]) -> Result<()>; + /// Delete a range from persisted data only. + fn del_range_from_persisted(&mut self, lower: &[u8], upper: &[u8]) -> Result<()>; + /// Check if a key exists. If `for_update` is `true` (only possible in a write transaction), /// then the database needs to guarantee that `commit()` can only succeed if /// the key has not been modified outside the transaction. diff --git a/cozo-core/src/storage/rocks.rs b/cozo-core/src/storage/rocks.rs index 4c0ca9ec..2b973c0b 100644 --- a/cozo-core/src/storage/rocks.rs +++ b/cozo-core/src/storage/rocks.rs @@ -134,10 +134,6 @@ impl Storage<'_> for RocksDbStorage { Ok(RocksDbTx { db_tx }) } - fn del_range(&self, lower: &[u8], upper: &[u8]) -> Result<()> { - Ok(self.db.range_del(lower, upper)?) - } - fn range_compact(&self, lower: &[u8], upper: &[u8]) -> Result<()> { self.db.range_compact(lower, upper).into_diagnostic() } @@ -184,6 +180,19 @@ impl<'s> StoreTx<'s> for RocksDbTx { Ok(self.db_tx.del(key)?) } + fn del_range_from_persisted(&mut self, lower: &[u8], upper: &[u8]) -> Result<()> { + let mut inner = self.db_tx.iterator().upper_bound(upper).start(); + inner.seek(lower); + while let Some(key) = inner.key()? { + if key >= upper { + break; + } + self.db_tx.del(key)?; + inner.next(); + } + Ok(()) + } + #[inline] fn exists(&self, key: &[u8], for_update: bool) -> Result { Ok(self.db_tx.exists(key, for_update)?) @@ -242,7 +251,10 @@ impl<'s> StoreTx<'s> for RocksDbTx { }) } - fn range_count<'a>(&'a self, lower: &[u8], upper: &[u8]) -> Result where 's: 'a { + fn range_count<'a>(&'a self, lower: &[u8], upper: &[u8]) -> Result + where + 's: 'a, + { let mut inner = self.db_tx.iterator().upper_bound(upper).start(); inner.seek(lower); let mut count = 0; diff --git a/cozo-core/src/storage/sled.rs b/cozo-core/src/storage/sled.rs index 37b882db..3db3233f 100644 --- a/cozo-core/src/storage/sled.rs +++ b/cozo-core/src/storage/sled.rs @@ -55,19 +55,6 @@ impl Storage<'_> for SledStorage { }) } - fn del_range(&self, lower: &[u8], upper: &[u8]) -> Result<()> { - let db = self.db.clone(); - let lower_v = lower.to_vec(); - let upper_v = upper.to_vec(); - thread::spawn(move || -> Result<()> { - for k_res in db.range(lower_v..upper_v).keys() { - db.remove(k_res.into_diagnostic()?).into_diagnostic()?; - } - Ok(()) - }); - Ok(()) - } - fn range_compact(&self, _lower: &[u8], _upper: &[u8]) -> Result<()> { Ok(()) } @@ -158,6 +145,17 @@ impl<'s> StoreTx<'s> for SledTx { Ok(()) } + fn del_range_from_persisted(&mut self, lower: &[u8], upper: &[u8]) -> Result<()> { + let to_del: Vec<_> = self + .range_scan(lower, upper) + .map_ok(|(k, v)| k) + .try_collect()?; + for k_res in to_del { + self.db.remove(&k_res)?; + } + Ok(()) + } + #[inline] fn exists(&self, key: &[u8], _for_update: bool) -> Result { if let Some(changes) = &self.changes { diff --git a/cozo-core/src/storage/sqlite.rs b/cozo-core/src/storage/sqlite.rs index 9daa7348..2af9a7d6 100644 --- a/cozo-core/src/storage/sqlite.rs +++ b/cozo-core/src/storage/sqlite.rs @@ -92,26 +92,6 @@ impl<'s> Storage<'s> for SqliteStorage { }) } - fn del_range(&'_ self, lower: &[u8], upper: &[u8]) -> Result<()> { - let lower_b = lower.to_vec(); - let upper_b = upper.to_vec(); - let query = r#" - delete from cozo where k >= ? and k < ?; - "#; - let lock = self.lock.clone(); - let name = self.name.clone(); - let closure = move || { - let _locked = lock.write().unwrap(); - let conn = sqlite::open(&name).unwrap(); - let mut statement = conn.prepare(query).unwrap(); - statement.bind((1, &lower_b as &[u8])).unwrap(); - statement.bind((2, &upper_b as &[u8])).unwrap(); - while statement.next().unwrap() != State::Done {} - }; - std::thread::spawn(closure); - Ok(()) - } - fn batch_put<'a>( &'a self, data: Box, Vec)>> + 'a>, @@ -155,7 +135,7 @@ const QUERIES: [&str; N_QUERIES] = [ "select 1 from cozo where k = ?;", "select k, v from cozo where k >= ? and k < ? order by k;", "select k, v from cozo where k >= ? and k < ? order by k limit 1;", - "select count(*) from cozo where k >= ? and k < ?;" + "select count(*) from cozo where k >= ? and k < ?;", ]; const GET_QUERY: usize = 0; @@ -246,6 +226,18 @@ impl<'s> StoreTx<'s> for SqliteTx<'s> { Ok(()) } + fn del_range_from_persisted(&mut self, lower: &[u8], upper: &[u8]) -> Result<()> { + let query = r#" + delete from cozo where k >= ? and k < ?; + "#; + let mut statement = self.conn.as_ref().unwrap().prepare(query).unwrap(); + + statement.bind((1, lower)).unwrap(); + statement.bind((2, upper)).unwrap(); + while statement.next().unwrap() != State::Done {} + Ok(()) + } + fn exists(&self, key: &[u8], _for_update: bool) -> Result { self.ensure_stmt(EXISTS_QUERY); let mut statement = self.stmts[EXISTS_QUERY].lock().unwrap(); @@ -321,7 +313,10 @@ impl<'s> StoreTx<'s> for SqliteTx<'s> { Box::new(RawIter(statement)) } - fn range_count<'a>(&'a self, lower: &[u8], upper: &[u8]) -> Result where 's: 'a { + fn range_count<'a>(&'a self, lower: &[u8], upper: &[u8]) -> Result + where + 's: 'a, + { let query = QUERIES[COUNT_RANGE_QUERY]; let mut statement = self.conn.as_ref().unwrap().prepare(query).unwrap(); statement.bind((1, lower)).unwrap(); diff --git a/cozo-core/src/storage/temp.rs b/cozo-core/src/storage/temp.rs index 15a5a001..7038e8fa 100644 --- a/cozo-core/src/storage/temp.rs +++ b/cozo-core/src/storage/temp.rs @@ -33,10 +33,6 @@ impl<'s> Storage<'s> for TempStorage { }) } - fn del_range(&'s self, _lower: &[u8], _upper: &[u8]) -> Result<()> { - panic!("del_range called on temp store") - } - fn range_compact(&'s self, _lower: &[u8], _upper: &[u8]) -> Result<()> { panic!("range compact called on temp store") } @@ -76,6 +72,10 @@ impl<'s> StoreTx<'s> for TempTx { Ok(()) } + fn del_range_from_persisted(&mut self, _lower: &[u8], _upper: &[u8]) -> Result<()> { + Ok(()) + } + fn exists(&self, key: &[u8], _for_update: bool) -> Result { Ok(self.store.contains_key(key)) } diff --git a/cozo-core/src/storage/tikv.rs b/cozo-core/src/storage/tikv.rs index d65e8ed8..27cc50f4 100644 --- a/cozo-core/src/storage/tikv.rs +++ b/cozo-core/src/storage/tikv.rs @@ -79,17 +79,6 @@ impl Storage<'_> for TiKvStorage { }) } - fn del_range(&self, lower: &[u8], upper: &[u8]) -> Result<()> { - let raw_client = self.raw_client.clone(); - let lower_b = lower.to_owned(); - let upper_b = upper.to_owned(); - thread::spawn(move || { - RT.block_on(raw_client.delete_range(lower_b..upper_b)) - .unwrap(); - }); - Ok(()) - } - fn range_compact(&self, _lower: &[u8], _upper: &[u8]) -> Result<()> { Ok(()) } @@ -141,6 +130,17 @@ impl<'s> StoreTx<'s> for TiKvTx { .into_diagnostic() } + fn del_range_from_persisted(&mut self, lower: &[u8], upper: &[u8]) -> Result<()> { + let to_remove: Vec<_> = self + .range_scan(lower, upper) + .map_ok(|(k, v)| k) + .try_collect()?; + for key in to_remove { + self.del(&key)?; + } + Ok(()) + } + fn exists(&self, key: &[u8], for_update: bool) -> Result { if for_update { RT.block_on(self.tx.lock().unwrap().get_for_update(key.to_owned()))