From 4366a2274ea4a79ef74e5500b8ea95e8d09922dc Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Thu, 17 Nov 2022 22:27:30 +0800 Subject: [PATCH] import/export --- Cargo.lock | 16 +-- cozo-core/src/query/stored.rs | 12 +- cozo-core/src/runtime/db.rs | 227 ++++++++++++++++++++++++++---- cozo-core/src/runtime/relation.rs | 4 +- cozo-core/src/storage/mem.rs | 11 +- cozo-core/src/storage/mod.rs | 14 +- cozo-core/src/storage/sqlite.rs | 11 +- 7 files changed, 235 insertions(+), 60 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 68efded5..b617f238 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -775,9 +775,9 @@ dependencies = [ [[package]] name = "digest" -version = "0.10.5" +version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adfbc57365a37acbd2ebf2b64d7e69bb766e2fea813521ed536f5d0520dcf86c" +checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f" dependencies = [ "block-buffer", "crypto-common", @@ -1637,9 +1637,9 @@ dependencies = [ [[package]] name = "nalgebra" -version = "0.31.3" +version = "0.31.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0eef2ccf9aefdae3334ad5911b5414724b9047bfe145b3353d5878732cc7b86b" +checksum = "20bd243ab3dbb395b39ee730402d2e5405e448c75133ec49cc977762c4cba3d1" dependencies = [ "approx", "matrixmultiply", @@ -2388,9 +2388,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.11.12" +version = "0.11.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "431949c384f4e2ae07605ccaa56d1d9d2ecdb5cadd4f9577ccfab29f2e5149fc" +checksum = "68cc60575865c7831548863cc02356512e3f1dc2f3f82cb837d7fc4cc8f3c97c" dependencies = [ "base64", "bytes", @@ -3315,9 +3315,9 @@ dependencies = [ [[package]] name = "uuid" -version = "1.2.1" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "feb41e78f93363bb2df8b0e86a2ca30eed7806ea16ea0c790d757cf93f79be83" +checksum = "422ee0de9031b5b948b97a8fc04e3aa35230001a722ddd27943e0be31564ce4c" dependencies = [ "atomic", "getrandom 0.2.8", diff --git a/cozo-core/src/query/stored.rs b/cozo-core/src/query/stored.rs index b0b68666..576401df 100644 --- a/cozo-core/src/query/stored.rs +++ b/cozo-core/src/query/stored.rs @@ -118,7 +118,7 @@ impl<'a> SessionTx<'a> { .map(|ex| ex.extract_data(&tuple)) .try_collect()?, ); - let key = relation_store.adhoc_encode_key(&extracted, *span)?; + let key = relation_store.encode_key_for_store(&extracted, *span)?; if has_triggers { if let Some(existing) = self.tx.get(&key, false)? { let mut tup = extracted.clone(); @@ -205,8 +205,8 @@ impl<'a> SessionTx<'a> { .try_collect()?, ); - let key = relation_store.adhoc_encode_key(&extracted, *span)?; - let val = relation_store.adhoc_encode_val(&extracted, *span)?; + let key = relation_store.encode_key_for_store(&extracted, *span)?; + let val = relation_store.encode_val_for_store(&extracted, *span)?; let existing = self.tx.get(&key, true)?; match existing { @@ -254,7 +254,7 @@ impl<'a> SessionTx<'a> { .map(|ex| ex.extract_data(&tuple)) .try_collect()?, ); - let key = relation_store.adhoc_encode_key(&extracted, *span)?; + let key = relation_store.encode_key_for_store(&extracted, *span)?; if self.tx.exists(&key, true)? { bail!(TransactAssertionFailure { relation: relation_store.name.to_string(), @@ -302,8 +302,8 @@ impl<'a> SessionTx<'a> { .try_collect()?, ); - let key = relation_store.adhoc_encode_key(&extracted, *span)?; - let val = relation_store.adhoc_encode_val(&extracted, *span)?; + let key = relation_store.encode_key_for_store(&extracted, *span)?; + let val = relation_store.encode_val_for_store(&extracted, *span)?; if has_triggers { if let Some(existing) = self.tx.get(&key, false)? { diff --git a/cozo-core/src/runtime/db.rs b/cozo-core/src/runtime/db.rs index 62fe5beb..646b365e 100644 --- a/cozo-core/src/runtime/db.rs +++ b/cozo-core/src/runtime/db.rs @@ -7,6 +7,7 @@ */ use std::collections::BTreeMap; +use std::default::Default; use std::fmt::{Debug, Formatter}; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; @@ -17,7 +18,7 @@ use either::{Left, Right}; use itertools::Itertools; use lazy_static::lazy_static; use miette::{ - bail, ensure, Diagnostic, GraphicalReportHandler, GraphicalTheme, IntoDiagnostic, + bail, ensure, miette, Diagnostic, GraphicalReportHandler, GraphicalTheme, IntoDiagnostic, JSONReportHandler, Result, WrapErr, }; use serde_json::{json, Map}; @@ -26,8 +27,10 @@ use thiserror::Error; use crate::data::json::JsonValue; use crate::data::program::{InputProgram, QueryAssertion, RelationOp}; +use crate::data::relation::ColumnDef; use crate::data::tuple::Tuple; use crate::data::value::{DataValue, LARGEST_UTF_CHAR}; +use crate::decode_tuple_from_kv; use crate::parse::sys::SysOp; use crate::parse::{parse_script, CozoScript, SourceSpan}; use crate::query::compile::{CompiledProgram, CompiledRule, CompiledRuleSet}; @@ -108,36 +111,6 @@ impl<'s, S: Storage<'s>> Db { Ok(()) } - fn compact_relation(&'s self) -> Result<()> { - let l = Tuple::default().encode_as_key(RelationId(0)); - let u = Tuple(vec![DataValue::Bot]).encode_as_key(RelationId(u64::MAX)); - self.db.range_compact(&l, &u)?; - Ok(()) - } - - fn load_last_ids(&'s self) -> Result<()> { - let mut tx = self.transact()?; - self.relation_store_id - .store(tx.load_last_relation_store_id()?.0, Ordering::Release); - tx.commit_tx()?; - Ok(()) - } - fn transact(&'s self) -> Result> { - let ret = SessionTx { - tx: Box::new(self.db.transact(false)?), - mem_store_id: Default::default(), - relation_store_id: self.relation_store_id.clone(), - }; - Ok(ret) - } - fn transact_write(&'s self) -> Result> { - let ret = SessionTx { - tx: Box::new(self.db.transact(true)?), - mem_store_id: Default::default(), - relation_store_id: self.relation_store_id.clone(), - }; - Ok(ret) - } /// Run the CozoScript passed in. The `params` argument is a map of parameters. pub fn run_script( &'s self, @@ -210,6 +183,198 @@ impl<'s, S: Storage<'s>> Db { }; self.run_script_fold_err(payload, ¶ms_json).to_string() } + /// Export relations to JSON data. + pub fn export_relations<'a>( + &'s self, + relations: impl Iterator, + ) -> Result { + let tx = self.transact()?; + let mut ret = serde_json::Map::default(); + for rel in relations { + let mut coll = vec![]; + let handle = tx.get_relation(rel, false)?; + let mut cols = handle + .metadata + .keys + .iter() + .map(|col| col.name.clone()) + .collect_vec(); + cols.extend( + handle + .metadata + .non_keys + .iter() + .map(|col| col.name.clone()) + .collect_vec(), + ); + + let start = handle.encode_key_for_store(&Tuple(vec![]), Default::default())?; + let end = + handle.encode_key_for_store(&Tuple(vec![DataValue::Bot]), Default::default())?; + for data in tx.tx.range_scan(&start, &end) { + let (k, v) = data?; + let tuple = decode_tuple_from_kv(&k, &v); + let mut j_map = serde_json::Map::default(); + for (k, v) in cols.iter().zip(tuple.0) { + let j_v = JsonValue::from(v); + j_map.insert(k.to_string(), j_v); + } + coll.push(JsonValue::Object(j_map)); + } + ret.insert(rel.to_string(), JsonValue::Array(coll)); + } + Ok(JsonValue::Object(ret)) + } + /// Export relations to JSON-encoded string + pub fn export_relations_str(&'s self, relations_str: &str) -> String { + match self.export_relations_str_inner(relations_str) { + Ok(s) => { + let ret = json!({"ok": true, "data": s}); + format!("{}", ret) + } + Err(err) => { + let ret = json!({"ok": false, "message": err.to_string()}); + format!("{}", ret) + } + } + } + fn export_relations_str_inner(&'s self, relations_str: &str) -> Result { + let j_val: JsonValue = serde_json::from_str(relations_str).into_diagnostic()?; + let v = j_val + .as_array() + .ok_or_else(|| miette!("expects an array"))?; + let relations: Vec<_> = v + .iter() + .map(|name| { + name.as_str() + .ok_or_else(|| miette!("expects an array of string names")) + }) + .try_collect()?; + let results = self.export_relations(relations.into_iter())?; + Ok(results) + } + /// Import a relation + pub fn import_relation(&'s self, relation: &str, in_data: &[JsonValue]) -> Result<()> { + #[derive(Debug, Diagnostic, Error)] + #[error("cannot import data for relation '{0}': {1}")] + #[diagnostic(code(import::bad_data))] + struct BadDataForRelation(String, JsonValue); + + let mut tx = self.transact_write()?; + let handle = tx.get_relation(relation, true)?; + for val in in_data { + let proc_col = |col: &ColumnDef| { + let d_val = match val.get(&col.name as &str) { + None => match &col.default_gen { + Some(gen) => gen.clone().eval_to_const()?, + None => { + bail!(BadDataForRelation(relation.to_string(), val.clone())) + } + }, + Some(data) => DataValue::from(data), + }; + col.typing.coerce(d_val) + }; + + let keys: Vec<_> = handle.metadata.keys.iter().map(proc_col).try_collect()?; + let vals: Vec<_> = handle + .metadata + .non_keys + .iter() + .map(proc_col) + .try_collect()?; + let k_store = handle.encode_key_for_store(&Tuple(keys), Default::default())?; + let v_store = handle.encode_val_for_store(&Tuple(vals), Default::default())?; + tx.tx.put(&k_store, &v_store)?; + } + Ok(()) + } + /// Import a relation, the data is given as a JSON string, and the returned result is converted into a string + pub fn import_relation_str(&'s self, data: &str) -> String { + match self.import_relation_str_inner(data) { + Ok(()) => { + format!("{}", json!({"ok": true})) + } + Err(err) => { + format!("{}", json!({"ok": false, "message": err.to_string()})) + } + } + } + fn import_relation_str_inner(&'s self, data: &str) -> Result<()> { + let j_obj: JsonValue = serde_json::from_str(data).into_diagnostic()?; + let name_val = j_obj + .get("relation") + .ok_or_else(|| miette!("key 'relation' required"))?; + let name = name_val + .as_str() + .ok_or_else(|| miette!("field 'relation' must be a string"))?; + let data_val = j_obj + .get("data") + .ok_or_else(|| miette!("key 'data' required"))?; + let data = data_val + .as_array() + .ok_or_else(|| miette!("field 'data' must be an array"))?; + self.import_relation(name, data) + } + /// Backup the running database into an Sqlite file + pub fn backup_db(&'s self, out_file: String) -> Result<()> { + #[cfg(feature = "storage-sqlite")] + { + let sqlite_db = crate::new_cozo_sqlite(out_file)?; + let mut s_tx = sqlite_db.transact_write()?; + let tx = self.transact()?; + let iter = tx.tx.range_scan(&[], &[1]); + s_tx.tx.batch_put(iter)?; + Ok(()) + } + #[cfg(not(feature = "storage-sqlite"))] + bail!("backup requires the 'storeage-sqlite' feature to be enabled") + } + /// Restore from an Sqlite backup + pub fn restore_backup(&'s self, in_file: String) -> Result<()> { + #[cfg(feature = "storage-sqlite")] + { + let sqlite_db = crate::new_cozo_sqlite(in_file)?; + let s_tx = sqlite_db.transact_write()?; + let mut tx = self.transact()?; + let iter = s_tx.tx.range_scan(&[], &[1]); + tx.tx.batch_put(iter)?; + Ok(()) + } + #[cfg(not(feature = "storage-sqlite"))] + bail!("backup requires the 'storeage-sqlite' feature to be enabled") + } + + fn compact_relation(&'s self) -> Result<()> { + let l = Tuple::default().encode_as_key(RelationId(0)); + let u = Tuple(vec![DataValue::Bot]).encode_as_key(RelationId(u64::MAX)); + self.db.range_compact(&l, &u)?; + Ok(()) + } + + fn load_last_ids(&'s self) -> Result<()> { + let mut tx = self.transact()?; + self.relation_store_id + .store(tx.load_last_relation_store_id()?.0, Ordering::Release); + tx.commit_tx()?; + Ok(()) + } + fn transact(&'s self) -> Result> { + let ret = SessionTx { + tx: Box::new(self.db.transact(false)?), + mem_store_id: Default::default(), + relation_store_id: self.relation_store_id.clone(), + }; + Ok(ret) + } + fn transact_write(&'s self) -> Result> { + let ret = SessionTx { + tx: Box::new(self.db.transact(true)?), + mem_store_id: Default::default(), + relation_store_id: self.relation_store_id.clone(), + }; + Ok(ret) + } fn do_run_script( &'s self, payload: &str, diff --git a/cozo-core/src/runtime/relation.rs b/cozo-core/src/runtime/relation.rs index 7c28564b..ccfe5d71 100644 --- a/cozo-core/src/runtime/relation.rs +++ b/cozo-core/src/runtime/relation.rs @@ -123,7 +123,7 @@ impl RelationHandle { ret.extend(prefix_bytes); ret } - pub(crate) fn adhoc_encode_key(&self, tuple: &Tuple, span: SourceSpan) -> Result> { + pub(crate) fn encode_key_for_store(&self, tuple: &Tuple, span: SourceSpan) -> Result> { let len = self.metadata.keys.len(); ensure!( tuple.0.len() >= len, @@ -140,7 +140,7 @@ impl RelationHandle { } Ok(ret) } - pub(crate) fn adhoc_encode_val(&self, tuple: &Tuple, _span: SourceSpan) -> Result> { + pub(crate) fn encode_val_for_store(&self, tuple: &Tuple, _span: SourceSpan) -> Result> { let start = self.metadata.keys.len(); let len = self.metadata.non_keys.len(); let mut ret = self.encode_key_prefix(len); diff --git a/cozo-core/src/storage/mem.rs b/cozo-core/src/storage/mem.rs index 455439c4..21878c5f 100644 --- a/cozo-core/src/storage/mem.rs +++ b/cozo-core/src/storage/mem.rs @@ -197,10 +197,13 @@ impl<'s> StoreTx<'s> for MemTx<'s> { } } - fn batch_put( - &mut self, - data: Box, Vec)>>>, - ) -> Result<()> { + fn batch_put<'a>( + &'a mut self, + data: Box, Vec)>> + 'a>, + ) -> Result<()> + where + 's: 'a, + { match self { MemTx::Reader(_) => { bail!("write in read transaction") diff --git a/cozo-core/src/storage/mod.rs b/cozo-core/src/storage/mod.rs index c9ce6300..ce1c9ded 100644 --- a/cozo-core/src/storage/mod.rs +++ b/cozo-core/src/storage/mod.rs @@ -76,7 +76,8 @@ pub trait StoreTx<'s> { upper: &[u8], ) -> Box> + 'a> where - 's: 'a { + 's: 'a, + { let it = self.range_scan(lower, upper); Box::new(it.map_ok(|(k, v)| decode_tuple_from_kv(&k, &v))) } @@ -94,10 +95,13 @@ pub trait StoreTx<'s> { /// Put multiple key-value pairs into the database. /// The default implementation just calls `put` repeatedly. /// Implement if there is a more efficient way. - fn batch_put( - &mut self, - data: Box, Vec)>>>, - ) -> Result<()> { + fn batch_put<'a>( + &'a mut self, + data: Box, Vec)>> + 'a>, + ) -> Result<()> + where + 's: 'a, + { for pair in data { let (k, v) = pair?; self.put(&k, &v)?; diff --git a/cozo-core/src/storage/sqlite.rs b/cozo-core/src/storage/sqlite.rs index 478c139f..2c02e1e3 100644 --- a/cozo-core/src/storage/sqlite.rs +++ b/cozo-core/src/storage/sqlite.rs @@ -221,10 +221,13 @@ impl<'s> StoreTx<'s> for SqliteTx<'s> { Box::new(RawIter(statement)) } - fn batch_put( - &mut self, - data: Box, Vec)>>>, - ) -> Result<()> { + fn batch_put<'a>( + &'a mut self, + data: Box, Vec)>> + 'a>, + ) -> Result<()> + where + 's: 'a, + { let query = r#" insert into cozo(k, v) values (?, ?) on conflict(k) do update set v=excluded.v;