import/export

main
Ziyang Hu 2 years ago
parent fb94fcd1c5
commit 4366a2274e

16
Cargo.lock (generated)

@@ -775,9 +775,9 @@ dependencies = [
[[package]]
name = "digest"
version = "0.10.5"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adfbc57365a37acbd2ebf2b64d7e69bb766e2fea813521ed536f5d0520dcf86c"
checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f"
dependencies = [
"block-buffer",
"crypto-common",
@@ -1637,9 +1637,9 @@ dependencies = [
[[package]]
name = "nalgebra"
version = "0.31.3"
version = "0.31.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0eef2ccf9aefdae3334ad5911b5414724b9047bfe145b3353d5878732cc7b86b"
checksum = "20bd243ab3dbb395b39ee730402d2e5405e448c75133ec49cc977762c4cba3d1"
dependencies = [
"approx",
"matrixmultiply",
@@ -2388,9 +2388,9 @@ dependencies = [
[[package]]
name = "reqwest"
version = "0.11.12"
version = "0.11.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "431949c384f4e2ae07605ccaa56d1d9d2ecdb5cadd4f9577ccfab29f2e5149fc"
checksum = "68cc60575865c7831548863cc02356512e3f1dc2f3f82cb837d7fc4cc8f3c97c"
dependencies = [
"base64",
"bytes",
@@ -3315,9 +3315,9 @@ dependencies = [
[[package]]
name = "uuid"
version = "1.2.1"
version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "feb41e78f93363bb2df8b0e86a2ca30eed7806ea16ea0c790d757cf93f79be83"
checksum = "422ee0de9031b5b948b97a8fc04e3aa35230001a722ddd27943e0be31564ce4c"
dependencies = [
"atomic",
"getrandom 0.2.8",

@@ -118,7 +118,7 @@ impl<'a> SessionTx<'a> {
.map(|ex| ex.extract_data(&tuple))
.try_collect()?,
);
let key = relation_store.adhoc_encode_key(&extracted, *span)?;
let key = relation_store.encode_key_for_store(&extracted, *span)?;
if has_triggers {
if let Some(existing) = self.tx.get(&key, false)? {
let mut tup = extracted.clone();
@@ -205,8 +205,8 @@ impl<'a> SessionTx<'a> {
.try_collect()?,
);
let key = relation_store.adhoc_encode_key(&extracted, *span)?;
let val = relation_store.adhoc_encode_val(&extracted, *span)?;
let key = relation_store.encode_key_for_store(&extracted, *span)?;
let val = relation_store.encode_val_for_store(&extracted, *span)?;
let existing = self.tx.get(&key, true)?;
match existing {
@@ -254,7 +254,7 @@ impl<'a> SessionTx<'a> {
.map(|ex| ex.extract_data(&tuple))
.try_collect()?,
);
let key = relation_store.adhoc_encode_key(&extracted, *span)?;
let key = relation_store.encode_key_for_store(&extracted, *span)?;
if self.tx.exists(&key, true)? {
bail!(TransactAssertionFailure {
relation: relation_store.name.to_string(),
@@ -302,8 +302,8 @@ impl<'a> SessionTx<'a> {
.try_collect()?,
);
let key = relation_store.adhoc_encode_key(&extracted, *span)?;
let val = relation_store.adhoc_encode_val(&extracted, *span)?;
let key = relation_store.encode_key_for_store(&extracted, *span)?;
let val = relation_store.encode_val_for_store(&extracted, *span)?;
if has_triggers {
if let Some(existing) = self.tx.get(&key, false)? {

@@ -7,6 +7,7 @@
*/
use std::collections::BTreeMap;
use std::default::Default;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
@@ -17,7 +18,7 @@ use either::{Left, Right};
use itertools::Itertools;
use lazy_static::lazy_static;
use miette::{
bail, ensure, Diagnostic, GraphicalReportHandler, GraphicalTheme, IntoDiagnostic,
bail, ensure, miette, Diagnostic, GraphicalReportHandler, GraphicalTheme, IntoDiagnostic,
JSONReportHandler, Result, WrapErr,
};
use serde_json::{json, Map};
@@ -26,8 +27,10 @@ use thiserror::Error;
use crate::data::json::JsonValue;
use crate::data::program::{InputProgram, QueryAssertion, RelationOp};
use crate::data::relation::ColumnDef;
use crate::data::tuple::Tuple;
use crate::data::value::{DataValue, LARGEST_UTF_CHAR};
use crate::decode_tuple_from_kv;
use crate::parse::sys::SysOp;
use crate::parse::{parse_script, CozoScript, SourceSpan};
use crate::query::compile::{CompiledProgram, CompiledRule, CompiledRuleSet};
@@ -108,36 +111,6 @@ impl<'s, S: Storage<'s>> Db<S> {
Ok(())
}
fn compact_relation(&'s self) -> Result<()> {
let l = Tuple::default().encode_as_key(RelationId(0));
let u = Tuple(vec![DataValue::Bot]).encode_as_key(RelationId(u64::MAX));
self.db.range_compact(&l, &u)?;
Ok(())
}
fn load_last_ids(&'s self) -> Result<()> {
let mut tx = self.transact()?;
self.relation_store_id
.store(tx.load_last_relation_store_id()?.0, Ordering::Release);
tx.commit_tx()?;
Ok(())
}
fn transact(&'s self) -> Result<SessionTx<'_>> {
let ret = SessionTx {
tx: Box::new(self.db.transact(false)?),
mem_store_id: Default::default(),
relation_store_id: self.relation_store_id.clone(),
};
Ok(ret)
}
fn transact_write(&'s self) -> Result<SessionTx<'_>> {
let ret = SessionTx {
tx: Box::new(self.db.transact(true)?),
mem_store_id: Default::default(),
relation_store_id: self.relation_store_id.clone(),
};
Ok(ret)
}
/// Run the CozoScript passed in. The `params` argument is a map of parameters.
pub fn run_script(
&'s self,
@@ -210,6 +183,198 @@ impl<'s, S: Storage<'s>> Db<S> {
};
self.run_script_fold_err(payload, &params_json).to_string()
}
/// Export relations to JSON data.
pub fn export_relations<'a>(
&'s self,
relations: impl Iterator<Item = &'a str>,
) -> Result<JsonValue> {
let tx = self.transact()?;
let mut ret = serde_json::Map::default();
for rel in relations {
let mut coll = vec![];
let handle = tx.get_relation(rel, false)?;
let mut cols = handle
.metadata
.keys
.iter()
.map(|col| col.name.clone())
.collect_vec();
cols.extend(
handle
.metadata
.non_keys
.iter()
.map(|col| col.name.clone())
.collect_vec(),
);
let start = handle.encode_key_for_store(&Tuple(vec![]), Default::default())?;
let end =
handle.encode_key_for_store(&Tuple(vec![DataValue::Bot]), Default::default())?;
for data in tx.tx.range_scan(&start, &end) {
let (k, v) = data?;
let tuple = decode_tuple_from_kv(&k, &v);
let mut j_map = serde_json::Map::default();
for (k, v) in cols.iter().zip(tuple.0) {
let j_v = JsonValue::from(v);
j_map.insert(k.to_string(), j_v);
}
coll.push(JsonValue::Object(j_map));
}
ret.insert(rel.to_string(), JsonValue::Array(coll));
}
Ok(JsonValue::Object(ret))
}
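
A minimal usage sketch for the new export API, not part of the commit itself. It assumes the crate's in-memory constructor new_cozo_mem(), an illustrative relation named friends, and that run_script accepts its parameter map by reference; the exact run_script signature is not shown in this hunk, so the empty Default::default() argument is a guess.

use miette::Result;

fn main() -> Result<()> {
    // In-memory database; `new_cozo_mem` is assumed to exist at the crate root.
    let db = cozo::new_cozo_mem()?;
    // Create and populate a small stored relation (the CozoScript is illustrative).
    db.run_script(":create friends {fr: String, to: String}", &Default::default())?;
    db.run_script(
        "?[fr, to] <- [['alice', 'bob']] :put friends {fr, to}",
        &Default::default(),
    )?;
    // `export_relations` returns a JSON object keyed by relation name; each value
    // is an array of row objects, key columns first, then non-key columns.
    let exported = db.export_relations(["friends"].into_iter())?;
    println!("{}", exported["friends"]);
    Ok(())
}
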
/// Export relations to a JSON-encoded string
pub fn export_relations_str(&'s self, relations_str: &str) -> String {
match self.export_relations_str_inner(relations_str) {
Ok(s) => {
let ret = json!({"ok": true, "data": s});
format!("{}", ret)
}
Err(err) => {
let ret = json!({"ok": false, "message": err.to_string()});
format!("{}", ret)
}
}
}
fn export_relations_str_inner(&'s self, relations_str: &str) -> Result<JsonValue> {
let j_val: JsonValue = serde_json::from_str(relations_str).into_diagnostic()?;
let v = j_val
.as_array()
.ok_or_else(|| miette!("expects an array"))?;
let relations: Vec<_> = v
.iter()
.map(|name| {
name.as_str()
.ok_or_else(|| miette!("expects an array of string names"))
})
.try_collect()?;
let results = self.export_relations(relations.into_iter())?;
Ok(results)
}
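
Continuing the sketch above: the string-based wrapper takes a JSON-encoded array of relation names and returns the same data wrapped in an ok/data envelope, mirroring run_script_fold_err. The relation name is illustrative.

// Input is a JSON array of relation names; output is a plain String.
let answer = db.export_relations_str(r#"["friends"]"#);
// On success: {"ok":true,"data":{"friends":[{"fr":"alice","to":"bob"}]}}
// On failure: {"ok":false,"message":"..."}
println!("{answer}");
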
/// Import a relation
pub fn import_relation(&'s self, relation: &str, in_data: &[JsonValue]) -> Result<()> {
#[derive(Debug, Diagnostic, Error)]
#[error("cannot import data for relation '{0}': {1}")]
#[diagnostic(code(import::bad_data))]
struct BadDataForRelation(String, JsonValue);
let mut tx = self.transact_write()?;
let handle = tx.get_relation(relation, true)?;
for val in in_data {
let proc_col = |col: &ColumnDef| {
let d_val = match val.get(&col.name as &str) {
None => match &col.default_gen {
Some(gen) => gen.clone().eval_to_const()?,
None => {
bail!(BadDataForRelation(relation.to_string(), val.clone()))
}
},
Some(data) => DataValue::from(data),
};
col.typing.coerce(d_val)
};
let keys: Vec<_> = handle.metadata.keys.iter().map(proc_col).try_collect()?;
let vals: Vec<_> = handle
.metadata
.non_keys
.iter()
.map(proc_col)
.try_collect()?;
let k_store = handle.encode_key_for_store(&Tuple(keys), Default::default())?;
let v_store = handle.encode_val_for_store(&Tuple(vals), Default::default())?;
tx.tx.put(&k_store, &v_store)?;
}
Ok(())
}
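
A short continuation of the sketch for the non-string import entry point. Rows are plain serde_json objects keyed by column name; the target relation must already exist, and a column missing from a row falls back to its default generator or fails with BadDataForRelation. Names below are illustrative.

use serde_json::json;

let rows = vec![
    json!({"fr": "bob", "to": "carol"}),
    json!({"fr": "carol", "to": "dan"}),
];
db.import_relation("friends", &rows)?;
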
/// Import a relation. The data is given as a JSON string, and the result is returned as a JSON-encoded string.
pub fn import_relation_str(&'s self, data: &str) -> String {
match self.import_relation_str_inner(data) {
Ok(()) => {
format!("{}", json!({"ok": true}))
}
Err(err) => {
format!("{}", json!({"ok": false, "message": err.to_string()}))
}
}
}
fn import_relation_str_inner(&'s self, data: &str) -> Result<()> {
let j_obj: JsonValue = serde_json::from_str(data).into_diagnostic()?;
let name_val = j_obj
.get("relation")
.ok_or_else(|| miette!("key 'relation' required"))?;
let name = name_val
.as_str()
.ok_or_else(|| miette!("field 'relation' must be a string"))?;
let data_val = j_obj
.get("data")
.ok_or_else(|| miette!("key 'data' required"))?;
let data = data_val
.as_array()
.ok_or_else(|| miette!("field 'data' must be an array"))?;
self.import_relation(name, data)
}
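
Continuing the sketch: the string-based import takes a single JSON object with a "relation" name and a "data" array, and answers with the same ok/message envelope as the export wrapper.

let payload = r#"{"relation": "friends", "data": [{"fr": "dan", "to": "erin"}]}"#;
let answer = db.import_relation_str(payload);
// `answer` is {"ok":true} on success, {"ok":false,"message":"..."} otherwise.
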
/// Back up the running database into an SQLite file
pub fn backup_db(&'s self, out_file: String) -> Result<()> {
#[cfg(feature = "storage-sqlite")]
{
let sqlite_db = crate::new_cozo_sqlite(out_file)?;
let mut s_tx = sqlite_db.transact_write()?;
let tx = self.transact()?;
let iter = tx.tx.range_scan(&[], &[1]);
s_tx.tx.batch_put(iter)?;
Ok(())
}
#[cfg(not(feature = "storage-sqlite"))]
bail!("backup requires the 'storeage-sqlite' feature to be enabled")
}
/// Restore from an SQLite backup
pub fn restore_backup(&'s self, in_file: String) -> Result<()> {
#[cfg(feature = "storage-sqlite")]
{
let sqlite_db = crate::new_cozo_sqlite(in_file)?;
let s_tx = sqlite_db.transact_write()?;
let mut tx = self.transact_write()?; // restoring writes into this database, so a write transaction is needed
let iter = s_tx.tx.range_scan(&[], &[1]);
tx.tx.batch_put(iter)?;
Ok(())
}
#[cfg(not(feature = "storage-sqlite"))]
bail!("backup requires the 'storeage-sqlite' feature to be enabled")
}
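
A sketch of the backup round trip, again continuing the example above. Both calls require the storage-sqlite feature; the file path is illustrative, and restore_backup copies the backed-up key-value pairs back into the running database.

// Dump everything into a standalone SQLite file...
db.backup_db("cozo-backup.db".to_string())?;
// ...and later pull it back in (typically into a fresh database).
db.restore_backup("cozo-backup.db".to_string())?;
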
fn compact_relation(&'s self) -> Result<()> {
let l = Tuple::default().encode_as_key(RelationId(0));
let u = Tuple(vec![DataValue::Bot]).encode_as_key(RelationId(u64::MAX));
self.db.range_compact(&l, &u)?;
Ok(())
}
fn load_last_ids(&'s self) -> Result<()> {
let mut tx = self.transact()?;
self.relation_store_id
.store(tx.load_last_relation_store_id()?.0, Ordering::Release);
tx.commit_tx()?;
Ok(())
}
fn transact(&'s self) -> Result<SessionTx<'_>> {
let ret = SessionTx {
tx: Box::new(self.db.transact(false)?),
mem_store_id: Default::default(),
relation_store_id: self.relation_store_id.clone(),
};
Ok(ret)
}
fn transact_write(&'s self) -> Result<SessionTx<'_>> {
let ret = SessionTx {
tx: Box::new(self.db.transact(true)?),
mem_store_id: Default::default(),
relation_store_id: self.relation_store_id.clone(),
};
Ok(ret)
}
fn do_run_script(
&'s self,
payload: &str,

@@ -123,7 +123,7 @@ impl RelationHandle {
ret.extend(prefix_bytes);
ret
}
pub(crate) fn adhoc_encode_key(&self, tuple: &Tuple, span: SourceSpan) -> Result<Vec<u8>> {
pub(crate) fn encode_key_for_store(&self, tuple: &Tuple, span: SourceSpan) -> Result<Vec<u8>> {
let len = self.metadata.keys.len();
ensure!(
tuple.0.len() >= len,
@@ -140,7 +140,7 @@ impl RelationHandle {
}
Ok(ret)
}
pub(crate) fn adhoc_encode_val(&self, tuple: &Tuple, _span: SourceSpan) -> Result<Vec<u8>> {
pub(crate) fn encode_val_for_store(&self, tuple: &Tuple, _span: SourceSpan) -> Result<Vec<u8>> {
let start = self.metadata.keys.len();
let len = self.metadata.non_keys.len();
let mut ret = self.encode_key_prefix(len);

@@ -197,10 +197,13 @@ impl<'s> StoreTx<'s> for MemTx<'s> {
}
}
fn batch_put(
&mut self,
data: Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>>>,
) -> Result<()> {
fn batch_put<'a>(
&'a mut self,
data: Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>,
) -> Result<()>
where
's: 'a,
{
match self {
MemTx::Reader(_) => {
bail!("write in read transaction")

@@ -76,7 +76,8 @@ pub trait StoreTx<'s> {
upper: &[u8],
) -> Box<dyn Iterator<Item = Result<Tuple>> + 'a>
where
's: 'a {
's: 'a,
{
let it = self.range_scan(lower, upper);
Box::new(it.map_ok(|(k, v)| decode_tuple_from_kv(&k, &v)))
}
@@ -94,10 +95,13 @@ pub trait StoreTx<'s> {
/// Put multiple key-value pairs into the database.
/// The default implementation just calls `put` repeatedly.
/// Implement if there is a more efficient way.
fn batch_put(
&mut self,
data: Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>>>,
) -> Result<()> {
fn batch_put<'a>(
&'a mut self,
data: Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>,
) -> Result<()>
where
's: 'a,
{
for pair in data {
let (k, v) = pair?;
self.put(&k, &v)?;

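The added 'a lifetime on batch_put is what lets backup_db and restore_backup above stream data between two live transactions: range_scan hands back an iterator that borrows its transaction, and the new 's: 'a bound allows that borrowed iterator to be fed straight into another transaction's batch_put without collecting it first. A sketch of the pattern as it is used inside the crate (variable names illustrative):

// Inside the crate, e.g. in backup_db:
let src_tx = self.transact()?;                 // read transaction on the source
let mut dst_tx = sqlite_db.transact_write()?;  // write transaction on the target
// The iterator borrows `src_tx`, which the `'s: 'a` bound now permits.
let kv_pairs = src_tx.tx.range_scan(&[], &[1]);
dst_tx.tx.batch_put(kv_pairs)?;                // streams pairs without buffering
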
@@ -221,10 +221,13 @@ impl<'s> StoreTx<'s> for SqliteTx<'s> {
Box::new(RawIter(statement))
}
fn batch_put(
&mut self,
data: Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>>>,
) -> Result<()> {
fn batch_put<'a>(
&'a mut self,
data: Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>,
) -> Result<()>
where
's: 'a,
{
let query = r#"
insert into cozo(k, v) values (?, ?)
on conflict(k) do update set v=excluded.v;
