make backup/restore smoother

main
Ziyang Hu 2 years ago
parent 1b95c2616d
commit f2b96c4c26

2
Cargo.lock generated

@ -636,7 +636,7 @@ dependencies = [
[[package]]
name = "cozorocks"
version = "0.1.2"
version = "0.1.3"
dependencies = [
"cc",
"cxx",

@ -116,7 +116,7 @@ document-features = "0.2.6"
rayon = { version = "1.5.3", optional = true }
minreq = { version = "2.6.0", features = ["https-rustls"], optional = true }
tikv-jemallocator-global = { version = "0.5.0", optional = true }
cozorocks = { path = "../cozorocks", version = "0.1.2", optional = true }
cozorocks = { path = "../cozorocks", version = "0.1.3", optional = true }
sled = { version = "0.34.7", optional = true }
tikv-client = { version = "0.1.0", optional = true }
tokio = { version = "1.21.2", optional = true }

@ -58,8 +58,12 @@ lazy_static! {
backup_path.push(format!("backup-{}.db", data_size));
if Path::exists(&backup_path) {
println!("restore from backup");
let import_time = Instant::now();
db.restore_backup(backup_path.to_str().unwrap()).unwrap();
dbg!(import_time.elapsed());
dbg!(((SIZES.0 + 2 * SIZES.1) as f64) / import_time.elapsed().as_secs_f64());
} else {
println!("parse data from text file");
let mut file_path = data_dir.clone();
file_path.push(format!("pokec_{}_import.cypher", data_size));
@ -524,6 +528,21 @@ fn pattern_short() {
.unwrap();
}
// Baseline bench: does nothing but force the lazy TEST_DB to initialize,
// presumably so other benches can be compared against pure setup cost —
// TODO confirm intent.
#[bench]
fn nothing(_: &mut Bencher) {
    initialize(&TEST_DB);
}
// Times a full database backup and reports the duration plus a rows/sec
// figure derived from the dataset sizes.
#[bench]
fn backup_db(_: &mut Bencher) {
    initialize(&TEST_DB);
    // Size tag selects the dataset; the restore path looks for a backup file
    // with the same name.
    // `unwrap_or_else` avoids allocating the default string when the env var
    // is set (clippy `or_fun_call`).
    let data_size = env::var("COZO_BENCH_POKEC_SIZE").unwrap_or_else(|_| "medium".to_string());
    let backup_taken = Instant::now();
    TEST_DB.backup_db(format!("backup-{}.db", data_size)).unwrap();
    // Capture the duration once so the printed duration and the throughput
    // figure refer to the same measurement (the original called `elapsed()`
    // twice, yielding two slightly different readings).
    let elapsed = backup_taken.elapsed();
    dbg!(elapsed);
    // Throughput: vertices + 2x edges per second; SIZES comes from the module.
    dbg!(((SIZES.0 + 2 * SIZES.1) as f64) / elapsed.as_secs_f64());
}
#[bench]
fn bench_aggregation(b: &mut Bencher) {
initialize(&TEST_DB);
@ -669,7 +688,6 @@ fn qps_single_vertex_write(_b: &mut Bencher) {
dbg!((count as f64) / qps_single_vertex_write_time.elapsed().as_secs_f64());
}
#[bench]
fn bench_single_vertex_write(b: &mut Bencher) {
initialize(&TEST_DB);

@ -327,12 +327,10 @@ impl<'s, S: Storage<'s>> Db<S> {
if sqlite_db.relation_store_id.load(Ordering::SeqCst) != 0 {
bail!("Cannot create backup: data exists in the target database.");
}
let mut s_tx = sqlite_db.transact_write()?;
let mut tx = self.transact()?;
let iter = tx.store_tx.range_scan(&[], &[0xFF]);
s_tx.store_tx.batch_put(iter)?;
sqlite_db.db.batch_put(iter)?;
tx.commit_tx()?;
s_tx.commit_tx()?;
Ok(())
}
#[cfg(not(feature = "storage-sqlite"))]
@ -345,7 +343,8 @@ impl<'s, S: Storage<'s>> Db<S> {
{
let sqlite_db = crate::new_cozo_sqlite(in_file.to_string())?;
let mut s_tx = sqlite_db.transact()?;
let mut tx = self.transact_write()?;
{
let mut tx = self.transact()?;
let store_id = tx.relation_store_id.load(Ordering::SeqCst);
if store_id != 0 {
bail!(
@ -354,10 +353,11 @@ impl<'s, S: Storage<'s>> Db<S> {
store_id
);
}
let iter = s_tx.store_tx.range_scan(&[], &[1]);
tx.store_tx.batch_put(iter)?;
s_tx.commit_tx()?;
tx.commit_tx()?;
}
let iter = s_tx.store_tx.total_scan();
self.db.batch_put(iter)?;
s_tx.commit_tx()?;
Ok(())
}
#[cfg(not(feature = "storage-sqlite"))]
@ -402,7 +402,10 @@ impl<'s, S: Storage<'s>> Db<S> {
Ok((src_k, src_v))
},
);
dst_tx.store_tx.batch_put(Box::new(data_it))?;
for result in data_it {
let (key, val) = result?;
dst_tx.store_tx.put(&key, &val)?;
}
}
src_tx.commit_tx()?;
@ -1297,13 +1300,19 @@ mod tests {
fn test_classical() {
let _ = env_logger::builder().is_test(true).try_init();
let db = new_cozo_mem().unwrap();
let res = db.run_script(r#"
let res = db
.run_script(
r#"
parent[] <- [['joseph', 'jakob'],
['jakob', 'issac'],
['issac', 'abraham']]
grandparent[gcld, gp] := parent[gcld, p], parent[p, gp]
?[who] := grandparent[who, 'abraham']
"#, Default::default()).unwrap().rows;
"#,
Default::default(),
)
.unwrap()
.rows;
println!("{:?}", res);
assert_eq!(res[0][0], json!("jakob"))
}

@ -41,6 +41,10 @@ pub struct MemStorage {
impl<'s> Storage<'s> for MemStorage {
type Tx = MemTx<'s>;
/// Static identifier for this engine, used to tell storage kinds apart.
fn storage_kind(&self) -> &'static str {
    "mem"
}
fn transact(&'s self, write: bool) -> Result<Self::Tx> {
Ok(if write {
let wtr = self.store.write().unwrap();
@ -77,6 +81,18 @@ impl<'s> Storage<'s> for MemStorage {
/// No-op: the in-memory store has no concept of compaction.
fn range_compact(&'s self, _lower: &[u8], _upper: &[u8]) -> Result<()> {
    Ok(())
}
/// Bulk-load pre-sorted pairs directly into the shared tree.
/// The write lock is held for the whole batch; the trait contract guarantees
/// no other access to the database while this runs.
fn batch_put<'a>(
    &'a self,
    data: Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>,
) -> Result<()> {
    let mut tree = self.store.write().unwrap();
    data.into_iter().try_for_each(|entry| {
        let (key, value) = entry?;
        tree.insert(key, value);
        Ok(())
    })
}
}
pub enum MemTx<'s> {
@ -197,36 +213,38 @@ impl<'s> StoreTx<'s> for MemTx<'s> {
}
}
fn batch_put<'a>(
&'a mut self,
data: Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>,
) -> Result<()>
fn total_scan<'a>(&'a self) -> Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>
where
's: 'a,
{
match self {
MemTx::Reader(_) => {
bail!("write in read transaction")
}
MemTx::Writer(_, cache) => {
for pair in data {
let (k, v) = pair?;
cache.insert(k, Some(v));
}
Ok(())
}
MemTx::Reader(rdr) => Box::new(rdr.iter().map(|(k, v)| Ok((k.clone(), v.clone())))),
MemTx::Writer(wtr, cache) => Box::new(CacheIterRaw {
change_iter: cache.iter().fuse(),
db_iter: wtr.iter().fuse(),
change_cache: None,
db_cache: None,
}),
}
}
}
/// Merges a write transaction's change cache with the underlying store,
/// yielding key-value pairs in ascending key order.
/// Generic over the two input iterators so it works with any pair of sorted
/// sources (the old version was hard-wired to `BTreeMap` ranges).
/// NOTE: the rendered diff interleaved the removed concrete struct with the
/// added generic one; this is the coherent post-image.
struct CacheIterRaw<'a, C, T>
where
    C: Iterator<Item = (&'a Vec<u8>, &'a Option<Vec<u8>>)> + 'a,
    T: Iterator<Item = (&'a Vec<u8>, &'a Vec<u8>)>,
{
    change_iter: C,
    db_iter: T,
    // One-item lookahead buffers used by the merge: `None` in a change-cache
    // value position marks a deletion.
    change_cache: Option<(&'a Vec<u8>, &'a Option<Vec<u8>>)>,
    db_cache: Option<(&'a Vec<u8>, &'a Vec<u8>)>,
}
impl CacheIterRaw<'_> {
impl<'a, C, T> CacheIterRaw<'a, C, T>
where
C: Iterator<Item = (&'a Vec<u8>, &'a Option<Vec<u8>>)> + 'a,
T: Iterator<Item = (&'a Vec<u8>, &'a Vec<u8>)>,
{
#[inline]
fn fill_cache(&mut self) -> Result<()> {
if self.change_cache.is_none() {
@ -283,7 +301,11 @@ impl CacheIterRaw<'_> {
}
}
impl Iterator for CacheIterRaw<'_> {
impl<'a, C, T> Iterator for CacheIterRaw<'a, C, T>
where
C: Iterator<Item = (&'a Vec<u8>, &'a Option<Vec<u8>>)> + 'a,
T: Iterator<Item = (&'a Vec<u8>, &'a Vec<u8>)>,
{
type Item = Result<(Vec<u8>, Vec<u8>)>;
#[inline]

@ -28,6 +28,9 @@ pub trait Storage<'s> {
/// The associated transaction type used by this engine
type Tx: StoreTx<'s>;
/// Returns a string that identifies the storage kind
/// (e.g. "mem", "rocksdb", "sqlite", "sled", "tikv").
fn storage_kind(&self) -> &'static str;
/// Create a transaction object. Write ops will only be called when `write == true`.
fn transact(&'s self, write: bool) -> Result<Self::Tx>;
@ -39,6 +42,14 @@ pub trait Storage<'s> {
/// Compact the key range. Can be a no-op if the storage engine does not
/// have the concept of compaction.
fn range_compact(&'s self, lower: &[u8], upper: &[u8]) -> Result<()>;
/// Put multiple key-value pairs into the database.
/// No duplicate data will be sent, and the order data come in is strictly ascending.
/// There will be no other access to the database while this function is running,
/// so implementations may bypass transactional machinery for speed.
fn batch_put<'a>(
    &'a self,
    data: Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>,
) -> Result<()>;
}
/// Trait for the associated transaction type of a storage engine.
@ -92,20 +103,8 @@ pub trait StoreTx<'s> {
where
's: 'a;
/// Put multiple key-value pairs into the database.
/// The default implementation just calls `put` repeatedly.
/// Implement if there is a more efficient way.
fn batch_put<'a>(
&'a mut self,
data: Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>,
) -> Result<()>
/// Scan for all rows. The rows are required to be in ascending order.
fn total_scan<'a>(&'a self) -> Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>
where
's: 'a,
{
for pair in data {
let (k, v) = pair?;
self.put(&k, &v)?;
}
Ok(())
}
's: 'a;
}

@ -120,6 +120,10 @@ impl RocksDbStorage {
impl Storage<'_> for RocksDbStorage {
type Tx = RocksDbTx;
/// Static identifier for this engine, used to tell storage kinds apart.
fn storage_kind(&self) -> &'static str {
    "rocksdb"
}
fn transact(&self, _write: bool) -> Result<Self::Tx> {
let db_tx = self.db.transact().set_snapshot(true).start();
Ok(RocksDbTx { db_tx })
@ -132,6 +136,17 @@ impl Storage<'_> for RocksDbStorage {
/// Delegates to RocksDB's native range compaction on the underlying handle.
fn range_compact(&self, lower: &[u8], upper: &[u8]) -> Result<()> {
    self.db.range_compact(lower, upper).into_diagnostic()
}
/// Bulk-load pairs by writing each one straight to the underlying RocksDB
/// handle via `raw_put`, skipping the transaction layer entirely — safe
/// because the trait contract guarantees exclusive access during the load.
fn batch_put<'a>(
    &'a self,
    data: Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>,
) -> Result<()> {
    for pair in data {
        let (k, v) = pair?;
        self.db.raw_put(&k, &v)?;
    }
    Ok(())
}
}
pub struct RocksDbTx {
@ -196,6 +211,13 @@ impl<'s> StoreTx<'s> for RocksDbTx {
upper_bound: upper.to_vec(),
})
}
/// Full scan implemented by delegating to `range_scan` over the widest range.
/// NOTE(review): the upper bound is the single byte 0xFF, so any key whose
/// first byte is 0xFF would be excluded — assumes the key encoding keeps all
/// keys below that; confirm.
fn total_scan<'a>(&'a self) -> Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>
where
    's: 'a,
{
    self.range_scan(&[], &[u8::MAX])
}
}
pub(crate) struct RocksDbIterator {

@ -43,6 +43,10 @@ const DEL_MARKER: u8 = 0;
impl Storage<'_> for SledStorage {
type Tx = SledTx;
/// Static identifier for this engine, used to tell storage kinds apart.
fn storage_kind(&self) -> &'static str {
    "sled"
}
fn transact(&self, _write: bool) -> Result<Self::Tx> {
Ok(SledTx {
db: self.db.clone(),
@ -66,6 +70,19 @@ impl Storage<'_> for SledStorage {
/// No-op: sled has no user-triggered compaction to delegate to.
fn range_compact(&self, _lower: &[u8], _upper: &[u8]) -> Result<()> {
    Ok(())
}
/// Bulk-load pairs through a single write transaction so the whole batch
/// lands, and commits, as one unit.
fn batch_put<'a>(
    &'a self,
    data: Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>,
) -> Result<()> {
    let mut wtx = self.transact(true)?;
    for entry in data {
        let (k, v) = entry?;
        wtx.put(&k, &v)?;
    }
    wtx.commit()?;
    Ok(())
}
}
pub struct SledTx {
@ -212,6 +229,13 @@ impl<'s> StoreTx<'s> for SledTx {
)
}
}
/// Full scan implemented by delegating to `range_scan` over the widest range.
/// NOTE(review): the upper bound is the single byte 0xFF, so keys starting
/// with 0xFF would be excluded — assumes the key encoding stays below that;
/// confirm.
fn total_scan<'a>(&'a self) -> Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>
where
    's: 'a,
{
    self.range_scan(&[], &[u8::MAX])
}
}
struct SledIterRaw {

@ -112,11 +112,28 @@ impl<'s> Storage<'s> for SqliteStorage {
Ok(())
}
/// Funnel the whole batch through one write transaction and commit at the
/// end, so a failed load leaves nothing behind.
fn batch_put<'a>(
    &'a self,
    data: Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>,
) -> Result<()> {
    let mut batch_tx = self.transact(true)?;
    data.into_iter().try_for_each(|pair| {
        let (k, v) = pair?;
        batch_tx.put(&k, &v)
    })?;
    batch_tx.commit()?;
    Ok(())
}
/// "Compaction" for SQLite: drain the connection pool so pooled connections
/// are dropped and re-opened lazily.
/// NOTE(review): presumably this releases file handles / cached statements;
/// confirm the intent — the range arguments are ignored.
fn range_compact(&'_ self, _lower: &[u8], _upper: &[u8]) -> Result<()> {
    let mut pool = self.pool.lock().unwrap();
    while pool.pop().is_some() {}
    Ok(())
}
/// Static identifier for this engine, used to tell storage kinds apart.
fn storage_kind(&self) -> &'static str {
    "sqlite"
}
}
pub struct SqliteTx<'a> {
@ -274,26 +291,17 @@ impl<'s> StoreTx<'s> for SqliteTx<'s> {
Box::new(RawIter(statement))
}
fn batch_put<'a>(
&'a mut self,
data: Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>,
) -> Result<()>
fn total_scan<'a>(&'a self) -> Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>
where
's: 'a,
{
self.ensure_stmt(PUT_QUERY);
let mut statement = self.stmts[PUT_QUERY].borrow_mut();
let statement = statement.as_mut().unwrap();
statement.reset().unwrap();
for pair in data {
let (key, val) = pair?;
statement.bind((1, key.as_slice())).unwrap();
statement.bind((2, val.as_slice())).unwrap();
while statement.next().into_diagnostic()? != State::Done {}
statement.reset().unwrap();
}
Ok(())
let statement = self
.conn
.as_ref()
.unwrap()
.prepare("select k, v from cozo order by k;")
.unwrap();
Box::new(RawIter(statement))
}
}

@ -61,6 +61,10 @@ pub struct TiKvStorage {
impl Storage<'_> for TiKvStorage {
type Tx = TiKvTx;
/// Static identifier for this engine, used to tell storage kinds apart.
fn storage_kind(&self) -> &'static str {
    "tikv"
}
fn transact(&self, _write: bool) -> Result<Self::Tx> {
let tx = if self.optimistic {
RT.block_on(self.client.begin_optimistic())
@ -88,6 +92,19 @@ impl Storage<'_> for TiKvStorage {
/// No-op: TiKV manages compaction server-side; nothing to do from the client.
fn range_compact(&self, _lower: &[u8], _upper: &[u8]) -> Result<()> {
    Ok(())
}
/// Bulk-load pairs inside one transaction, committed only after every pair
/// has been written.
fn batch_put<'a>(
    &'a self,
    data: Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>,
) -> Result<()> {
    let mut wtx = self.transact(true)?;
    for kv in data {
        let (key, value) = kv?;
        wtx.put(&key, &value)?;
    }
    wtx.commit()?;
    Ok(())
}
}
pub struct TiKvTx {
@ -153,6 +170,13 @@ impl<'s> StoreTx<'s> for TiKvTx {
{
Box::new(BatchScannerRaw::new(self.tx.clone(), lower, upper))
}
/// Full scan implemented by delegating to `range_scan` over the widest range.
/// NOTE(review): the upper bound is the single byte 0xFF, so keys starting
/// with 0xFF would be excluded — assumes the key encoding stays below that;
/// confirm.
fn total_scan<'a>(&'a self) -> Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>
where
    's: 'a,
{
    self.range_scan(&[], &[u8::MAX])
}
}
struct BatchScannerRaw {

@ -1,6 +1,6 @@
[package]
name = "cozorocks"
version = "0.1.2"
version = "0.1.3"
edition = "2021"
license = "MPL-2.0"
authors = ["Ziyang Hu"]

@ -42,6 +42,8 @@ struct SstFileWriterBridge {
};
static WriteOptions DEFAULT_WRITE_OPTIONS = WriteOptions();
struct RocksDbBridge {
unique_ptr<TransactionDB> db;
@ -93,6 +95,12 @@ struct RocksDbBridge {
write_status(s2, status);
}
// Write a single key-value pair directly to the base DB, outside any
// transaction, using the shared default WriteOptions; the outcome is
// reported to the Rust side through `status`.
inline void put(RustBytes key, RustBytes val, RocksDbStatus &status) const {
    auto raw_db = this->get_base_db();
    auto s = raw_db->Put(DEFAULT_WRITE_OPTIONS, convert_slice(key), convert_slice(val));
    write_status(s, status);
}
void compact_range(RustBytes start, RustBytes end, RocksDbStatus &status) const {
CompactRangeOptions options;
auto cf = db->DefaultColumnFamily();

@ -146,6 +146,16 @@ impl RocksDb {
}
}
/// Non-transactional write of one key-value pair through the FFI bridge.
/// The C++ side reports the outcome via `status`, which is translated into
/// a `Result` here.
#[inline]
pub fn raw_put(&self, key: &[u8], val: &[u8]) -> Result<(), RocksDbStatus> {
    let mut status = RocksDbStatus::default();
    self.inner.put(key, val, &mut status);
    if !status.is_ok() {
        return Err(status);
    }
    Ok(())
}
#[inline]
pub fn range_compact(&self, lower: &[u8], upper: &[u8]) -> Result<(), RocksDbStatus> {
let mut status = RocksDbStatus::default();
self.inner.compact_range(lower, upper, &mut status);

@ -133,6 +133,12 @@ pub(crate) mod ffi {
upper: &[u8],
status: &mut RocksDbStatus,
);
// Non-transactional single-pair write on the base DB; mirrors
// `RocksDbBridge::put` on the C++ side, with errors returned via `status`.
fn put(
    self: &RocksDbBridge,
    key: &[u8],
    val: &[u8],
    status: &mut RocksDbStatus,
);
fn compact_range(
self: &RocksDbBridge,
lower: &[u8],

Loading…
Cancel
Save