From f2b96c4c2694dabfed64622ccf651d8904472d65 Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Mon, 12 Dec 2022 19:23:14 +0800 Subject: [PATCH] make backup/restore smoother --- Cargo.lock | 2 +- cozo-core/Cargo.toml | 2 +- cozo-core/benches/pokec.rs | 20 ++++++++++- cozo-core/src/runtime/db.rs | 41 +++++++++++++--------- cozo-core/src/storage/mem.rs | 60 ++++++++++++++++++++++----------- cozo-core/src/storage/mod.rs | 29 ++++++++-------- cozo-core/src/storage/rocks.rs | 22 ++++++++++++ cozo-core/src/storage/sled.rs | 24 +++++++++++++ cozo-core/src/storage/sqlite.rs | 42 +++++++++++++---------- cozo-core/src/storage/tikv.rs | 24 +++++++++++++ cozorocks/Cargo.toml | 2 +- cozorocks/bridge/db.h | 8 +++++ cozorocks/src/bridge/db.rs | 10 ++++++ cozorocks/src/bridge/mod.rs | 6 ++++ 14 files changed, 221 insertions(+), 71 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7fec0168..3b0c98eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -636,7 +636,7 @@ dependencies = [ [[package]] name = "cozorocks" -version = "0.1.2" +version = "0.1.3" dependencies = [ "cc", "cxx", diff --git a/cozo-core/Cargo.toml b/cozo-core/Cargo.toml index fd0ec877..84ca6171 100644 --- a/cozo-core/Cargo.toml +++ b/cozo-core/Cargo.toml @@ -116,7 +116,7 @@ document-features = "0.2.6" rayon = { version = "1.5.3", optional = true } minreq = { version = "2.6.0", features = ["https-rustls"], optional = true } tikv-jemallocator-global = { version = "0.5.0", optional = true } -cozorocks = { path = "../cozorocks", version = "0.1.2", optional = true } +cozorocks = { path = "../cozorocks", version = "0.1.3", optional = true } sled = { version = "0.34.7", optional = true } tikv-client = { version = "0.1.0", optional = true } tokio = { version = "1.21.2", optional = true } diff --git a/cozo-core/benches/pokec.rs b/cozo-core/benches/pokec.rs index 3e5143e8..8dc48c65 100644 --- a/cozo-core/benches/pokec.rs +++ b/cozo-core/benches/pokec.rs @@ -58,8 +58,12 @@ lazy_static! { backup_path.push(format!("backup-{}.db", data_size)); if Path::exists(&backup_path) { println!("restore from backup"); + let import_time = Instant::now(); db.restore_backup(backup_path.to_str().unwrap()).unwrap(); + dbg!(import_time.elapsed()); + dbg!(((SIZES.0 + 2 * SIZES.1) as f64) / import_time.elapsed().as_secs_f64()); } else { + println!("parse data from text file"); let mut file_path = data_dir.clone(); file_path.push(format!("pokec_{}_import.cypher", data_size)); @@ -524,6 +528,21 @@ fn pattern_short() { .unwrap(); } +#[bench] +fn nothing(_: &mut Bencher) { + initialize(&TEST_DB); +} + +#[bench] +fn backup_db(_: &mut Bencher) { + initialize(&TEST_DB); + let data_size = env::var("COZO_BENCH_POKEC_SIZE").unwrap_or("medium".to_string()); + let backup_taken = Instant::now(); + TEST_DB.backup_db(format!("backup-{}.db", data_size)).unwrap(); + dbg!(backup_taken.elapsed()); + dbg!(((SIZES.0 + 2 * SIZES.1) as f64) / backup_taken.elapsed().as_secs_f64()); +} + #[bench] fn bench_aggregation(b: &mut Bencher) { initialize(&TEST_DB); @@ -669,7 +688,6 @@ fn qps_single_vertex_write(_b: &mut Bencher) { dbg!((count as f64) / qps_single_vertex_write_time.elapsed().as_secs_f64()); } - #[bench] fn bench_single_vertex_write(b: &mut Bencher) { initialize(&TEST_DB); diff --git a/cozo-core/src/runtime/db.rs b/cozo-core/src/runtime/db.rs index 834b6891..57eff8b2 100644 --- a/cozo-core/src/runtime/db.rs +++ b/cozo-core/src/runtime/db.rs @@ -327,12 +327,10 @@ impl<'s, S: Storage<'s>> Db { if sqlite_db.relation_store_id.load(Ordering::SeqCst) != 0 { bail!("Cannot create backup: data exists in the target database."); } - let mut s_tx = sqlite_db.transact_write()?; let mut tx = self.transact()?; let iter = tx.store_tx.range_scan(&[], &[0xFF]); - s_tx.store_tx.batch_put(iter)?; + sqlite_db.db.batch_put(iter)?; tx.commit_tx()?; - s_tx.commit_tx()?; Ok(()) } #[cfg(not(feature = "storage-sqlite"))] @@ -345,19 +343,21 @@ impl<'s, S: Storage<'s>> Db { { let sqlite_db = crate::new_cozo_sqlite(in_file.to_string())?; let mut s_tx = sqlite_db.transact()?; - let mut tx = self.transact_write()?; - let store_id = tx.relation_store_id.load(Ordering::SeqCst); - if store_id != 0 { - bail!( - "Cannot restore backup: data exists in the current database. \ + { + let mut tx = self.transact()?; + let store_id = tx.relation_store_id.load(Ordering::SeqCst); + if store_id != 0 { + bail!( + "Cannot restore backup: data exists in the current database. \ You can only restore into a new database (store id: {}).", - store_id - ); + store_id + ); + } + tx.commit_tx()?; } - let iter = s_tx.store_tx.range_scan(&[], &[1]); - tx.store_tx.batch_put(iter)?; + let iter = s_tx.store_tx.total_scan(); + self.db.batch_put(iter)?; s_tx.commit_tx()?; - tx.commit_tx()?; Ok(()) } #[cfg(not(feature = "storage-sqlite"))] @@ -402,7 +402,10 @@ impl<'s, S: Storage<'s>> Db { Ok((src_k, src_v)) }, ); - dst_tx.store_tx.batch_put(Box::new(data_it))?; + for result in data_it { + let (key, val) = result?; + dst_tx.store_tx.put(&key, &val)?; + } } src_tx.commit_tx()?; @@ -1297,13 +1300,19 @@ mod tests { fn test_classical() { let _ = env_logger::builder().is_test(true).try_init(); let db = new_cozo_mem().unwrap(); - let res = db.run_script(r#" + let res = db + .run_script( + r#" parent[] <- [['joseph', 'jakob'], ['jakob', 'issac'], ['issac', 'abraham']] grandparent[gcld, gp] := parent[gcld, p], parent[p, gp] ?[who] := grandparent[who, 'abraham'] - "#, Default::default()).unwrap().rows; + "#, + Default::default(), + ) + .unwrap() + .rows; println!("{:?}", res); assert_eq!(res[0][0], json!("jakob")) } diff --git a/cozo-core/src/storage/mem.rs b/cozo-core/src/storage/mem.rs index cdd2ba56..8b158ce5 100644 --- a/cozo-core/src/storage/mem.rs +++ b/cozo-core/src/storage/mem.rs @@ -41,6 +41,10 @@ pub struct MemStorage { impl<'s> Storage<'s> for MemStorage { type Tx = MemTx<'s>; + fn storage_kind(&self) -> &'static str { + "mem" + } + fn transact(&'s self, write: bool) -> Result { Ok(if write { let wtr = self.store.write().unwrap(); @@ -77,6 +81,18 @@ impl<'s> Storage<'s> for MemStorage { fn range_compact(&'s self, _lower: &[u8], _upper: &[u8]) -> Result<()> { Ok(()) } + + fn batch_put<'a>( + &'a self, + data: Box, Vec)>> + 'a>, + ) -> Result<()> { + let mut store = self.store.write().unwrap(); + for pair in data { + let (k, v) = pair?; + store.insert(k, v); + } + Ok(()) + } } pub enum MemTx<'s> { @@ -197,36 +213,38 @@ impl<'s> StoreTx<'s> for MemTx<'s> { } } - fn batch_put<'a>( - &'a mut self, - data: Box, Vec)>> + 'a>, - ) -> Result<()> + fn total_scan<'a>(&'a self) -> Box, Vec)>> + 'a> where 's: 'a, { match self { - MemTx::Reader(_) => { - bail!("write in read transaction") - } - MemTx::Writer(_, cache) => { - for pair in data { - let (k, v) = pair?; - cache.insert(k, Some(v)); - } - Ok(()) - } + MemTx::Reader(rdr) => Box::new(rdr.iter().map(|(k, v)| Ok((k.clone(), v.clone())))), + MemTx::Writer(wtr, cache) => Box::new(CacheIterRaw { + change_iter: cache.iter().fuse(), + db_iter: wtr.iter().fuse(), + change_cache: None, + db_cache: None, + }), } } } -struct CacheIterRaw<'a> { - change_iter: Fuse, Option>>>, - db_iter: Fuse, Vec>>, +struct CacheIterRaw<'a, C, T> +where + C: Iterator, &'a Option>)> + 'a, + T: Iterator, &'a Vec)>, +{ + change_iter: C, + db_iter: T, change_cache: Option<(&'a Vec, &'a Option>)>, db_cache: Option<(&'a Vec, &'a Vec)>, } -impl CacheIterRaw<'_> { +impl<'a, C, T> CacheIterRaw<'a, C, T> +where + C: Iterator, &'a Option>)> + 'a, + T: Iterator, &'a Vec)>, +{ #[inline] fn fill_cache(&mut self) -> Result<()> { if self.change_cache.is_none() { @@ -283,7 +301,11 @@ impl CacheIterRaw<'_> { } } -impl Iterator for CacheIterRaw<'_> { +impl<'a, C, T> Iterator for CacheIterRaw<'a, C, T> +where + C: Iterator, &'a Option>)> + 'a, + T: Iterator, &'a Vec)>, +{ type Item = Result<(Vec, Vec)>; #[inline] diff --git a/cozo-core/src/storage/mod.rs b/cozo-core/src/storage/mod.rs index ce1c9ded..ebc2ec07 100644 --- a/cozo-core/src/storage/mod.rs +++ b/cozo-core/src/storage/mod.rs @@ -28,6 +28,9 @@ pub trait Storage<'s> { /// The associated transaction type used by this engine type Tx: StoreTx<'s>; + /// Returns a string that identifies the storage kind + fn storage_kind(&self) -> &'static str; + /// Create a transaction object. Write ops will only be called when `write == true`. fn transact(&'s self, write: bool) -> Result; @@ -39,6 +42,14 @@ pub trait Storage<'s> { /// Compact the key range. Can be a no-op if the storage engine does not /// have the concept of compaction. fn range_compact(&'s self, lower: &[u8], upper: &[u8]) -> Result<()>; + + /// Put multiple key-value pairs into the database. + /// No duplicate data will be sent, and the order data come in is strictly ascending. + /// There will be no other access to the database while this function is running. + fn batch_put<'a>( + &'a self, + data: Box, Vec)>> + 'a>, + ) -> Result<()>; } /// Trait for the associated transaction type of a storage engine. @@ -92,20 +103,8 @@ pub trait StoreTx<'s> { where 's: 'a; - /// Put multiple key-value pairs into the database. - /// The default implementation just calls `put` repeatedly. - /// Implement if there is a more efficient way. - fn batch_put<'a>( - &'a mut self, - data: Box, Vec)>> + 'a>, - ) -> Result<()> + /// Scan for all rows. The rows are required to be in ascending order. + fn total_scan<'a>(&'a self) -> Box, Vec)>> + 'a> where - 's: 'a, - { - for pair in data { - let (k, v) = pair?; - self.put(&k, &v)?; - } - Ok(()) - } + 's: 'a; } diff --git a/cozo-core/src/storage/rocks.rs b/cozo-core/src/storage/rocks.rs index f7bdc42f..aff437a4 100644 --- a/cozo-core/src/storage/rocks.rs +++ b/cozo-core/src/storage/rocks.rs @@ -120,6 +120,10 @@ impl RocksDbStorage { impl Storage<'_> for RocksDbStorage { type Tx = RocksDbTx; + fn storage_kind(&self) -> &'static str { + "rocksdb" + } + fn transact(&self, _write: bool) -> Result { let db_tx = self.db.transact().set_snapshot(true).start(); Ok(RocksDbTx { db_tx }) @@ -132,6 +136,17 @@ impl Storage<'_> for RocksDbStorage { fn range_compact(&self, lower: &[u8], upper: &[u8]) -> Result<()> { self.db.range_compact(lower, upper).into_diagnostic() } + + fn batch_put<'a>( + &'a self, + data: Box, Vec)>> + 'a>, + ) -> Result<()> { + for result in data { + let (key, val) = result?; + self.db.raw_put(&key, &val)?; + } + Ok(()) + } } pub struct RocksDbTx { @@ -196,6 +211,13 @@ impl<'s> StoreTx<'s> for RocksDbTx { upper_bound: upper.to_vec(), }) } + + fn total_scan<'a>(&'a self) -> Box, Vec)>> + 'a> + where + 's: 'a, + { + self.range_scan(&[], &[u8::MAX]) + } } pub(crate) struct RocksDbIterator { diff --git a/cozo-core/src/storage/sled.rs b/cozo-core/src/storage/sled.rs index 821ada5f..d32b4b04 100644 --- a/cozo-core/src/storage/sled.rs +++ b/cozo-core/src/storage/sled.rs @@ -43,6 +43,10 @@ const DEL_MARKER: u8 = 0; impl Storage<'_> for SledStorage { type Tx = SledTx; + fn storage_kind(&self) -> &'static str { + "sled" + } + fn transact(&self, _write: bool) -> Result { Ok(SledTx { db: self.db.clone(), @@ -66,6 +70,19 @@ impl Storage<'_> for SledStorage { fn range_compact(&self, _lower: &[u8], _upper: &[u8]) -> Result<()> { Ok(()) } + + fn batch_put<'a>( + &'a self, + data: Box, Vec)>> + 'a>, + ) -> Result<()> { + let mut tx = self.transact(true)?; + for result in data { + let (key, val) = result?; + tx.put(&key, &val)?; + } + tx.commit()?; + Ok(()) + } } pub struct SledTx { @@ -212,6 +229,13 @@ impl<'s> StoreTx<'s> for SledTx { ) } } + + fn total_scan<'a>(&'a self) -> Box, Vec)>> + 'a> + where + 's: 'a, + { + self.range_scan(&[], &[u8::MAX]) + } } struct SledIterRaw { diff --git a/cozo-core/src/storage/sqlite.rs b/cozo-core/src/storage/sqlite.rs index 44343fb7..882fcfde 100644 --- a/cozo-core/src/storage/sqlite.rs +++ b/cozo-core/src/storage/sqlite.rs @@ -112,11 +112,28 @@ impl<'s> Storage<'s> for SqliteStorage { Ok(()) } + fn batch_put<'a>( + &'a self, + data: Box, Vec)>> + 'a>, + ) -> Result<()> { + let mut tx = self.transact(true)?; + for result in data { + let (key, val) = result?; + tx.put(&key, &val)?; + } + tx.commit()?; + Ok(()) + } + fn range_compact(&'_ self, _lower: &[u8], _upper: &[u8]) -> Result<()> { let mut pool = self.pool.lock().unwrap(); while pool.pop().is_some() {} Ok(()) } + + fn storage_kind(&self) -> &'static str { + "sqlite" + } } pub struct SqliteTx<'a> { @@ -274,26 +291,17 @@ impl<'s> StoreTx<'s> for SqliteTx<'s> { Box::new(RawIter(statement)) } - fn batch_put<'a>( - &'a mut self, - data: Box, Vec)>> + 'a>, - ) -> Result<()> + fn total_scan<'a>(&'a self) -> Box, Vec)>> + 'a> where 's: 'a, { - self.ensure_stmt(PUT_QUERY); - let mut statement = self.stmts[PUT_QUERY].borrow_mut(); - let statement = statement.as_mut().unwrap(); - statement.reset().unwrap(); - - for pair in data { - let (key, val) = pair?; - statement.bind((1, key.as_slice())).unwrap(); - statement.bind((2, val.as_slice())).unwrap(); - while statement.next().into_diagnostic()? != State::Done {} - statement.reset().unwrap(); - } - Ok(()) + let statement = self + .conn + .as_ref() + .unwrap() + .prepare("select k, v from cozo order by k;") + .unwrap(); + Box::new(RawIter(statement)) } } diff --git a/cozo-core/src/storage/tikv.rs b/cozo-core/src/storage/tikv.rs index bf6f2b98..af854807 100644 --- a/cozo-core/src/storage/tikv.rs +++ b/cozo-core/src/storage/tikv.rs @@ -61,6 +61,10 @@ pub struct TiKvStorage { impl Storage<'_> for TiKvStorage { type Tx = TiKvTx; + fn storage_kind(&self) -> &'static str { + "tikv" + } + fn transact(&self, _write: bool) -> Result { let tx = if self.optimistic { RT.block_on(self.client.begin_optimistic()) @@ -88,6 +92,19 @@ impl Storage<'_> for TiKvStorage { fn range_compact(&self, _lower: &[u8], _upper: &[u8]) -> Result<()> { Ok(()) } + + fn batch_put<'a>( + &'a self, + data: Box, Vec)>> + 'a>, + ) -> Result<()> { + let mut tx = self.transact(true)?; + for result in data { + let (key, val) = result?; + tx.put(&key, &val)?; + } + tx.commit()?; + Ok(()) + } } pub struct TiKvTx { @@ -153,6 +170,13 @@ impl<'s> StoreTx<'s> for TiKvTx { { Box::new(BatchScannerRaw::new(self.tx.clone(), lower, upper)) } + + fn total_scan<'a>(&'a self) -> Box, Vec)>> + 'a> + where + 's: 'a, + { + self.range_scan(&[], &[u8::MAX]) + } } struct BatchScannerRaw { diff --git a/cozorocks/Cargo.toml b/cozorocks/Cargo.toml index 8124b335..bec0feca 100644 --- a/cozorocks/Cargo.toml +++ b/cozorocks/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cozorocks" -version = "0.1.2" +version = "0.1.3" edition = "2021" license = "MPL-2.0" authors = ["Ziyang Hu"] diff --git a/cozorocks/bridge/db.h b/cozorocks/bridge/db.h index dd60b2d5..fc7e462a 100644 --- a/cozorocks/bridge/db.h +++ b/cozorocks/bridge/db.h @@ -42,6 +42,8 @@ struct SstFileWriterBridge { }; +static WriteOptions DEFAULT_WRITE_OPTIONS = WriteOptions(); + struct RocksDbBridge { unique_ptr db; @@ -93,6 +95,12 @@ struct RocksDbBridge { write_status(s2, status); } + inline void put(RustBytes key, RustBytes val, RocksDbStatus &status) const { + auto raw_db = this->get_base_db(); + auto s = raw_db->Put(DEFAULT_WRITE_OPTIONS, convert_slice(key), convert_slice(val)); + write_status(s, status); + } + void compact_range(RustBytes start, RustBytes end, RocksDbStatus &status) const { CompactRangeOptions options; auto cf = db->DefaultColumnFamily(); diff --git a/cozorocks/src/bridge/db.rs b/cozorocks/src/bridge/db.rs index f16015b2..b9bfa0c1 100644 --- a/cozorocks/src/bridge/db.rs +++ b/cozorocks/src/bridge/db.rs @@ -146,6 +146,16 @@ impl RocksDb { } } #[inline] + pub fn raw_put(&self, key: &[u8], val: &[u8]) -> Result<(), RocksDbStatus> { + let mut status = RocksDbStatus::default(); + self.inner.put(key, val, &mut status); + if status.is_ok() { + Ok(()) + } else { + Err(status) + } + } + #[inline] pub fn range_compact(&self, lower: &[u8], upper: &[u8]) -> Result<(), RocksDbStatus> { let mut status = RocksDbStatus::default(); self.inner.compact_range(lower, upper, &mut status); diff --git a/cozorocks/src/bridge/mod.rs b/cozorocks/src/bridge/mod.rs index 37a163b5..065fa051 100644 --- a/cozorocks/src/bridge/mod.rs +++ b/cozorocks/src/bridge/mod.rs @@ -133,6 +133,12 @@ pub(crate) mod ffi { upper: &[u8], status: &mut RocksDbStatus, ); + fn put( + self: &RocksDbBridge, + key: &[u8], + val: &[u8], + status: &mut RocksDbStatus, + ); fn compact_range( self: &RocksDbBridge, lower: &[u8],