fix a serious problem where data from dropped relations not cleared on disk

main
Ziyang Hu 1 year ago
parent 45441ec398
commit 29b2c4d900

@ -293,18 +293,18 @@ impl<'s, S: Storage<'s>> Db<S> {
for payload in payloads { for payload in payloads {
match payload { match payload {
TransactionPayload::Commit => { TransactionPayload::Commit => {
for (lower, upper) in cleanups {
if let Err(err) = tx.store_tx.del_range_from_persisted(&lower, &upper) {
eprintln!("{err:?}")
}
}
let _ = results.send(tx.commit_tx().map(|_| NamedRows::default())); let _ = results.send(tx.commit_tx().map(|_| NamedRows::default()));
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
if !callback_collector.is_empty() { if !callback_collector.is_empty() {
self.send_callbacks(callback_collector) self.send_callbacks(callback_collector)
} }
for (lower, upper) in cleanups {
if let Err(err) = self.db.del_range(&lower, &upper) {
eprintln!("{err:?}")
}
}
break; break;
} }
TransactionPayload::Abort => { TransactionPayload::Abort => {
@ -893,11 +893,14 @@ impl<'s, S: Storage<'s>> Db<S> {
&mut callback_collector, &mut callback_collector,
)?; )?;
for (lower, upper) in cleanups {
tx.store_tx.del_range_from_persisted(&lower, &upper)?;
}
if is_write { if is_write {
tx.commit_tx()?; tx.commit_tx()?;
} else { } else {
tx.commit_tx()?; tx.commit_tx()?;
assert!(cleanups.is_empty(), "non-empty cleanups on read-only tx");
} }
} }
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
@ -905,9 +908,6 @@ impl<'s, S: Storage<'s>> Db<S> {
self.send_callbacks(callback_collector) self.send_callbacks(callback_collector)
} }
for (lower, upper) in cleanups {
self.db.del_range(&lower, &upper)?;
}
Ok(res) Ok(res)
} }
fn explain_compiled(&self, strata: &[CompiledProgram]) -> Result<NamedRows> { fn explain_compiled(&self, strata: &[CompiledProgram]) -> Result<NamedRows> {
@ -1168,19 +1168,17 @@ impl<'s, S: Storage<'s>> Db<S> {
let locks = self.obtain_relation_locks(rel_name_strs); let locks = self.obtain_relation_locks(rel_name_strs);
let _guards = locks.iter().map(|l| l.read().unwrap()).collect_vec(); let _guards = locks.iter().map(|l| l.read().unwrap()).collect_vec();
let mut bounds = vec![]; let mut bounds = vec![];
{ let mut tx = self.transact_write()?;
let mut tx = self.transact_write()?; for rs in rel_names {
for rs in rel_names { let bound = tx.destroy_relation(&rs)?;
let bound = tx.destroy_relation(&rs)?; if !rs.is_temp_store_name() {
if !rs.is_temp_store_name() { bounds.extend(bound);
bounds.extend(bound);
}
} }
tx.commit_tx()?;
} }
for (lower, upper) in bounds { for (lower, upper) in bounds {
self.db.del_range(&lower, &upper)?; tx.store_tx.del_range_from_persisted(&lower, &upper)?;
} }
tx.commit_tx()?;
Ok(NamedRows::new( Ok(NamedRows::new(
vec![STATUS_STR.to_string()], vec![STATUS_STR.to_string()],
vec![vec![DataValue::from(OK_STR)]], vec![vec![DataValue::from(OK_STR)]],
@ -1259,7 +1257,7 @@ impl<'s, S: Storage<'s>> Db<S> {
let mut tx = self.transact_write()?; let mut tx = self.transact_write()?;
let bounds = tx.remove_index(&rel_name, &idx_name)?; let bounds = tx.remove_index(&rel_name, &idx_name)?;
for (lower, upper) in bounds { for (lower, upper) in bounds {
self.db.del_range(&lower, &upper)?; tx.store_tx.del_range_from_persisted(&lower, &upper)?;
} }
tx.commit_tx()?; tx.commit_tx()?;
Ok(NamedRows::new( Ok(NamedRows::new(

@ -304,11 +304,14 @@ impl<'s, S: Storage<'s>> Db<S> {
}, },
} }
for (lower, upper) in cleanups {
tx.store_tx.del_range_from_persisted(&lower, &upper)?;
}
if is_write { if is_write {
tx.commit_tx()?; tx.commit_tx()?;
} else { } else {
tx.commit_tx()?; tx.commit_tx()?;
assert!(cleanups.is_empty(), "non-empty cleanups on read-only tx");
} }
} }
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
@ -316,9 +319,6 @@ impl<'s, S: Storage<'s>> Db<S> {
self.send_callbacks(callback_collector) self.send_callbacks(callback_collector)
} }
for (lower, upper) in cleanups {
self.db.del_range(&lower, &upper)?;
}
Ok(ret) Ok(ret)
} }
} }

@ -664,7 +664,11 @@ impl<'a> SessionTx<'a> {
let metadata = RelationHandle::decode(&found)?; let metadata = RelationHandle::decode(&found)?;
Ok(metadata) Ok(metadata)
} }
pub(crate) fn describe_relation(&mut self, name: &str, description: SmartString<LazyCompact>) -> Result<()> { pub(crate) fn describe_relation(
&mut self,
name: &str,
description: SmartString<LazyCompact>,
) -> Result<()> {
let mut meta = self.get_relation(name, true)?; let mut meta = self.get_relation(name, true)?;
meta.description = description; meta.description = description;

@ -609,6 +609,8 @@ fn test_index() {
.map(|row| row.as_array().unwrap()[5].clone()) .map(|row| row.as_array().unwrap()[5].clone())
.collect_vec(); .collect_vec();
assert!(joins.contains(&json!(":friends:rev"))); assert!(joins.contains(&json!(":friends:rev")));
db.run_script("::index drop friends:rev", Default::default())
.unwrap();
} }
#[test] #[test]
@ -948,7 +950,6 @@ fn test_fts_indexing() {
} }
} }
#[test] #[test]
fn test_lsh_indexing() { fn test_lsh_indexing() {
let db = DbInstance::new("mem", "", "").unwrap(); let db = DbInstance::new("mem", "", "").unwrap();
@ -958,12 +959,12 @@ fn test_lsh_indexing() {
r"?[k, v] <- [['a', 'hello world!'], ['b', 'the world is round']] :put a {k => v}", r"?[k, v] <- [['a', 'hello world!'], ['b', 'the world is round']] :put a {k => v}",
Default::default(), Default::default(),
) )
.unwrap(); .unwrap();
db.run_script( db.run_script(
r"::lsh create a:lsh {extractor: v, tokenizer: Simple, n_gram: 3, target_threshold: 0.3 }", r"::lsh create a:lsh {extractor: v, tokenizer: Simple, n_gram: 3, target_threshold: 0.3 }",
Default::default(), Default::default(),
) )
.unwrap(); .unwrap();
db.run_script( db.run_script(
r"?[k, v] <- [ r"?[k, v] <- [
['b', 'the world is square!'], ['b', 'the world is square!'],
@ -973,8 +974,10 @@ fn test_lsh_indexing() {
] :put a {k => v}", ] :put a {k => v}",
Default::default(), Default::default(),
) )
.unwrap();
let res = db
.run_script("::columns a:lsh", Default::default())
.unwrap(); .unwrap();
let res = db.run_script("::columns a:lsh", Default::default()).unwrap();
for row in res.into_json()["rows"].as_array().unwrap() { for row in res.into_json()["rows"].as_array().unwrap() {
println!("{}", row); println!("{}", row);
} }
@ -1019,6 +1022,8 @@ fn test_lsh_indexing() {
for row in res.into_json()["rows"].as_array().unwrap() { for row in res.into_json()["rows"].as_array().unwrap() {
println!("{}", row); println!("{}", row);
} }
db.run_script(r"::lsh drop a:lsh", Default::default())
.unwrap();
} }
#[test] #[test]
@ -1142,7 +1147,9 @@ fn multi_index_vec() {
"#, "#,
Default::default(), Default::default(),
).unwrap(); ).unwrap();
let res = db.run_script("::indices product", Default::default()).unwrap(); let res = db
.run_script("::indices product", Default::default())
.unwrap();
for row in res.into_json()["rows"].as_array().unwrap() { for row in res.into_json()["rows"].as_array().unwrap() {
println!("{}", row); println!("{}", row);
} }

@ -58,29 +58,6 @@ impl<'s> Storage<'s> for MemStorage {
}) })
} }
fn del_range(&'s self, lower: &[u8], upper: &[u8]) -> Result<()> {
let store = self.store.clone();
let lower_b = lower.to_vec();
let upper_b = upper.to_vec();
let closure = move || {
let keys = {
let rdr = store.read().unwrap();
rdr.range(lower_b..upper_b)
.map(|kv| kv.0.clone())
.collect_vec()
};
let mut wtr = store.write().unwrap();
for k in keys.iter() {
wtr.remove(k);
}
};
#[cfg(target_arch = "wasm32")]
closure();
#[cfg(not(target_arch = "wasm32"))]
std::thread::spawn(closure);
Ok(())
}
fn range_compact(&'s self, _lower: &[u8], _upper: &[u8]) -> Result<()> { fn range_compact(&'s self, _lower: &[u8], _upper: &[u8]) -> Result<()> {
Ok(()) Ok(())
} }
@ -149,6 +126,25 @@ impl<'s> StoreTx<'s> for MemTx<'s> {
} }
} }
fn del_range_from_persisted(&mut self, lower: &[u8], upper: &[u8]) -> Result<()> {
match self {
MemTx::Reader(_) => {
bail!("write in read transaction")
}
MemTx::Writer(ref mut wtr, _) => {
let keys = wtr
.range(lower.to_vec()..upper.to_vec())
.map(|kv| kv.0.clone())
.collect_vec();
for k in keys.iter() {
wtr.remove(k);
}
}
}
Ok(())
}
fn exists(&self, key: &[u8], _for_update: bool) -> Result<bool> { fn exists(&self, key: &[u8], _for_update: bool) -> Result<bool> {
Ok(match self { Ok(match self {
MemTx::Reader(rdr) => rdr.contains_key(key), MemTx::Reader(rdr) => rdr.contains_key(key),

@ -36,11 +36,6 @@ pub trait Storage<'s>: Send + Sync + Clone {
/// Create a transaction object. Write ops will only be called when `write == true`. /// Create a transaction object. Write ops will only be called when `write == true`.
fn transact(&'s self, write: bool) -> Result<Self::Tx>; fn transact(&'s self, write: bool) -> Result<Self::Tx>;
/// Delete a range. It is ok to return immediately and do the deletion in
/// the background. It is guaranteed that no keys within the deleted range
/// will be accessed in any way by any transaction again.
fn del_range(&'s self, lower: &[u8], upper: &[u8]) -> Result<()>;
/// Compact the key range. Can be a no-op if the storage engine does not /// Compact the key range. Can be a no-op if the storage engine does not
/// have the concept of compaction. /// have the concept of compaction.
fn range_compact(&'s self, lower: &[u8], upper: &[u8]) -> Result<()>; fn range_compact(&'s self, lower: &[u8], upper: &[u8]) -> Result<()>;
@ -85,6 +80,9 @@ pub trait StoreTx<'s>: Sync {
/// Delete a key-value pair from the storage. /// Delete a key-value pair from the storage.
fn del(&mut self, key: &[u8]) -> Result<()>; fn del(&mut self, key: &[u8]) -> Result<()>;
/// Delete a range from persisted data only.
fn del_range_from_persisted(&mut self, lower: &[u8], upper: &[u8]) -> Result<()>;
/// Check if a key exists. If `for_update` is `true` (only possible in a write transaction), /// Check if a key exists. If `for_update` is `true` (only possible in a write transaction),
/// then the database needs to guarantee that `commit()` can only succeed if /// then the database needs to guarantee that `commit()` can only succeed if
/// the key has not been modified outside the transaction. /// the key has not been modified outside the transaction.

@ -134,10 +134,6 @@ impl Storage<'_> for RocksDbStorage {
Ok(RocksDbTx { db_tx }) Ok(RocksDbTx { db_tx })
} }
fn del_range(&self, lower: &[u8], upper: &[u8]) -> Result<()> {
Ok(self.db.range_del(lower, upper)?)
}
fn range_compact(&self, lower: &[u8], upper: &[u8]) -> Result<()> { fn range_compact(&self, lower: &[u8], upper: &[u8]) -> Result<()> {
self.db.range_compact(lower, upper).into_diagnostic() self.db.range_compact(lower, upper).into_diagnostic()
} }
@ -184,6 +180,19 @@ impl<'s> StoreTx<'s> for RocksDbTx {
Ok(self.db_tx.del(key)?) Ok(self.db_tx.del(key)?)
} }
fn del_range_from_persisted(&mut self, lower: &[u8], upper: &[u8]) -> Result<()> {
let mut inner = self.db_tx.iterator().upper_bound(upper).start();
inner.seek(lower);
while let Some(key) = inner.key()? {
if key >= upper {
break;
}
self.db_tx.del(key)?;
inner.next();
}
Ok(())
}
#[inline] #[inline]
fn exists(&self, key: &[u8], for_update: bool) -> Result<bool> { fn exists(&self, key: &[u8], for_update: bool) -> Result<bool> {
Ok(self.db_tx.exists(key, for_update)?) Ok(self.db_tx.exists(key, for_update)?)
@ -242,7 +251,10 @@ impl<'s> StoreTx<'s> for RocksDbTx {
}) })
} }
fn range_count<'a>(&'a self, lower: &[u8], upper: &[u8]) -> Result<usize> where 's: 'a { fn range_count<'a>(&'a self, lower: &[u8], upper: &[u8]) -> Result<usize>
where
's: 'a,
{
let mut inner = self.db_tx.iterator().upper_bound(upper).start(); let mut inner = self.db_tx.iterator().upper_bound(upper).start();
inner.seek(lower); inner.seek(lower);
let mut count = 0; let mut count = 0;

@ -55,19 +55,6 @@ impl Storage<'_> for SledStorage {
}) })
} }
fn del_range(&self, lower: &[u8], upper: &[u8]) -> Result<()> {
let db = self.db.clone();
let lower_v = lower.to_vec();
let upper_v = upper.to_vec();
thread::spawn(move || -> Result<()> {
for k_res in db.range(lower_v..upper_v).keys() {
db.remove(k_res.into_diagnostic()?).into_diagnostic()?;
}
Ok(())
});
Ok(())
}
fn range_compact(&self, _lower: &[u8], _upper: &[u8]) -> Result<()> { fn range_compact(&self, _lower: &[u8], _upper: &[u8]) -> Result<()> {
Ok(()) Ok(())
} }
@ -158,6 +145,17 @@ impl<'s> StoreTx<'s> for SledTx {
Ok(()) Ok(())
} }
fn del_range_from_persisted(&mut self, lower: &[u8], upper: &[u8]) -> Result<()> {
let to_del: Vec<_> = self
.range_scan(lower, upper)
.map_ok(|(k, v)| k)
.try_collect()?;
for k_res in to_del {
self.db.remove(&k_res)?;
}
Ok(())
}
#[inline] #[inline]
fn exists(&self, key: &[u8], _for_update: bool) -> Result<bool> { fn exists(&self, key: &[u8], _for_update: bool) -> Result<bool> {
if let Some(changes) = &self.changes { if let Some(changes) = &self.changes {

@ -92,26 +92,6 @@ impl<'s> Storage<'s> for SqliteStorage {
}) })
} }
fn del_range(&'_ self, lower: &[u8], upper: &[u8]) -> Result<()> {
let lower_b = lower.to_vec();
let upper_b = upper.to_vec();
let query = r#"
delete from cozo where k >= ? and k < ?;
"#;
let lock = self.lock.clone();
let name = self.name.clone();
let closure = move || {
let _locked = lock.write().unwrap();
let conn = sqlite::open(&name).unwrap();
let mut statement = conn.prepare(query).unwrap();
statement.bind((1, &lower_b as &[u8])).unwrap();
statement.bind((2, &upper_b as &[u8])).unwrap();
while statement.next().unwrap() != State::Done {}
};
std::thread::spawn(closure);
Ok(())
}
fn batch_put<'a>( fn batch_put<'a>(
&'a self, &'a self,
data: Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>, data: Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>,
@ -155,7 +135,7 @@ const QUERIES: [&str; N_QUERIES] = [
"select 1 from cozo where k = ?;", "select 1 from cozo where k = ?;",
"select k, v from cozo where k >= ? and k < ? order by k;", "select k, v from cozo where k >= ? and k < ? order by k;",
"select k, v from cozo where k >= ? and k < ? order by k limit 1;", "select k, v from cozo where k >= ? and k < ? order by k limit 1;",
"select count(*) from cozo where k >= ? and k < ?;" "select count(*) from cozo where k >= ? and k < ?;",
]; ];
const GET_QUERY: usize = 0; const GET_QUERY: usize = 0;
@ -246,6 +226,18 @@ impl<'s> StoreTx<'s> for SqliteTx<'s> {
Ok(()) Ok(())
} }
fn del_range_from_persisted(&mut self, lower: &[u8], upper: &[u8]) -> Result<()> {
let query = r#"
delete from cozo where k >= ? and k < ?;
"#;
let mut statement = self.conn.as_ref().unwrap().prepare(query).unwrap();
statement.bind((1, lower)).unwrap();
statement.bind((2, upper)).unwrap();
while statement.next().unwrap() != State::Done {}
Ok(())
}
fn exists(&self, key: &[u8], _for_update: bool) -> Result<bool> { fn exists(&self, key: &[u8], _for_update: bool) -> Result<bool> {
self.ensure_stmt(EXISTS_QUERY); self.ensure_stmt(EXISTS_QUERY);
let mut statement = self.stmts[EXISTS_QUERY].lock().unwrap(); let mut statement = self.stmts[EXISTS_QUERY].lock().unwrap();
@ -321,7 +313,10 @@ impl<'s> StoreTx<'s> for SqliteTx<'s> {
Box::new(RawIter(statement)) Box::new(RawIter(statement))
} }
fn range_count<'a>(&'a self, lower: &[u8], upper: &[u8]) -> Result<usize> where 's: 'a { fn range_count<'a>(&'a self, lower: &[u8], upper: &[u8]) -> Result<usize>
where
's: 'a,
{
let query = QUERIES[COUNT_RANGE_QUERY]; let query = QUERIES[COUNT_RANGE_QUERY];
let mut statement = self.conn.as_ref().unwrap().prepare(query).unwrap(); let mut statement = self.conn.as_ref().unwrap().prepare(query).unwrap();
statement.bind((1, lower)).unwrap(); statement.bind((1, lower)).unwrap();

@ -33,10 +33,6 @@ impl<'s> Storage<'s> for TempStorage {
}) })
} }
fn del_range(&'s self, _lower: &[u8], _upper: &[u8]) -> Result<()> {
panic!("del_range called on temp store")
}
fn range_compact(&'s self, _lower: &[u8], _upper: &[u8]) -> Result<()> { fn range_compact(&'s self, _lower: &[u8], _upper: &[u8]) -> Result<()> {
panic!("range compact called on temp store") panic!("range compact called on temp store")
} }
@ -76,6 +72,10 @@ impl<'s> StoreTx<'s> for TempTx {
Ok(()) Ok(())
} }
fn del_range_from_persisted(&mut self, _lower: &[u8], _upper: &[u8]) -> Result<()> {
Ok(())
}
fn exists(&self, key: &[u8], _for_update: bool) -> Result<bool> { fn exists(&self, key: &[u8], _for_update: bool) -> Result<bool> {
Ok(self.store.contains_key(key)) Ok(self.store.contains_key(key))
} }

@ -79,17 +79,6 @@ impl Storage<'_> for TiKvStorage {
}) })
} }
fn del_range(&self, lower: &[u8], upper: &[u8]) -> Result<()> {
let raw_client = self.raw_client.clone();
let lower_b = lower.to_owned();
let upper_b = upper.to_owned();
thread::spawn(move || {
RT.block_on(raw_client.delete_range(lower_b..upper_b))
.unwrap();
});
Ok(())
}
fn range_compact(&self, _lower: &[u8], _upper: &[u8]) -> Result<()> { fn range_compact(&self, _lower: &[u8], _upper: &[u8]) -> Result<()> {
Ok(()) Ok(())
} }
@ -141,6 +130,17 @@ impl<'s> StoreTx<'s> for TiKvTx {
.into_diagnostic() .into_diagnostic()
} }
fn del_range_from_persisted(&mut self, lower: &[u8], upper: &[u8]) -> Result<()> {
let to_remove: Vec<_> = self
.range_scan(lower, upper)
.map_ok(|(k, v)| k)
.try_collect()?;
for key in to_remove {
self.del(&key)?;
}
Ok(())
}
fn exists(&self, key: &[u8], for_update: bool) -> Result<bool> { fn exists(&self, key: &[u8], for_update: bool) -> Result<bool> {
if for_update { if for_update {
RT.block_on(self.tx.lock().unwrap().get_for_update(key.to_owned())) RT.block_on(self.tx.lock().unwrap().get_for_update(key.to_owned()))

Loading…
Cancel
Save