Improve the use of serde

main
Ziyang Hu 2 years ago
parent efb5267724
commit 3b80b02906

@ -22,7 +22,6 @@ use thiserror::Error;
use crate::algo::{AlgoImpl, CannotDetermineArity};
use crate::data::expr::Expr;
use crate::data::json::JsonValue;
use crate::data::program::{MagicAlgoApply, MagicSymbol};
use crate::data::symb::Symbol;
use crate::data::tuple::Tuple;
@ -66,7 +65,7 @@ impl AlgoImpl for JsonReader {
_ => bail!(BadFields(fields_span)),
};
let mut counter = -1i64;
let mut process_row = |row: &JsonValue| -> Result<()> {
let mut process_row = |row: &BTreeMap<String, DataValue>| -> Result<()> {
let mut ret = if prepend_index {
counter += 1;
vec![DataValue::from(counter)]
@ -82,7 +81,7 @@ impl AlgoImpl for JsonReader {
bail!("field {} is absent from JSON line", field);
}
}
Some(v) => DataValue::from(v),
Some(v) => v.clone(),
};
ret.push(val);
}
@ -103,11 +102,8 @@ impl AlgoImpl for JsonReader {
}
} else {
let content = fs::read_to_string(file_path).into_diagnostic()?;
let data: JsonValue = serde_json::from_str(&content).into_diagnostic()?;
let rows = data
.as_array()
.ok_or_else(|| miette!("JSON file is not an array"))?;
for row in rows {
let rows: Vec<BTreeMap<String, DataValue>> = serde_json::from_str(&content).into_diagnostic()?;
for row in &rows {
process_row(row)?;
}
}
@ -126,11 +122,8 @@ impl AlgoImpl for JsonReader {
}
}
} else {
let data: JsonValue = serde_json::from_str(content).into_diagnostic()?;
let rows = data
.as_array()
.ok_or_else(|| miette!("JSON file is not an array"))?;
for row in rows {
let rows: Vec<BTreeMap<String, DataValue>> = serde_json::from_str(content).into_diagnostic()?;
for row in &rows {
process_row(row)?;
}
}

@ -18,7 +18,7 @@ use smartstring::{LazyCompact, SmartString};
use uuid::Uuid;
#[derive(Clone, Hash, Eq, PartialEq, serde_derive::Deserialize, serde_derive::Serialize)]
pub(crate) struct UuidWrapper(pub(crate) Uuid);
pub struct UuidWrapper(pub(crate) Uuid);
impl PartialOrd<Self> for UuidWrapper {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
@ -38,7 +38,7 @@ impl Ord for UuidWrapper {
}
#[derive(Clone)]
pub(crate) struct RegexWrapper(pub(crate) Regex);
pub struct RegexWrapper(pub(crate) Regex);
impl Hash for RegexWrapper {
fn hash<H: Hasher>(&self, state: &mut H) {
@ -87,7 +87,7 @@ impl PartialOrd for RegexWrapper {
#[derive(
Clone, PartialEq, Eq, PartialOrd, Ord, serde_derive::Deserialize, serde_derive::Serialize, Hash,
)]
pub(crate) enum DataValue {
pub enum DataValue {
Null,
Bool(bool),
Num(Num),
@ -115,7 +115,7 @@ impl From<f64> for DataValue {
}
#[derive(Copy, Clone, serde_derive::Deserialize, serde_derive::Serialize)]
pub(crate) enum Num {
pub enum Num {
Int(i64),
Float(f64),
}

@ -33,9 +33,10 @@
#![allow(clippy::type_complexity)]
#![allow(clippy::too_many_arguments)]
use itertools::Itertools;
use std::collections::BTreeMap;
use lazy_static::lazy_static;
pub use miette::Error;
#[allow(unused_imports)]
use miette::{
bail, miette, GraphicalReportHandler, GraphicalTheme, IntoDiagnostic, JSONReportHandler,
Result, ThemeCharacters, ThemeStyles,
@ -56,6 +57,7 @@ pub use storage::tikv::{new_cozo_tikv, TiKvStorage};
pub use storage::{Storage, StoreTx};
use crate::data::json::JsonValue;
use crate::data::value::DataValue;
// pub use storage::re::{new_cozo_redb, ReStorage};
@ -110,7 +112,7 @@ impl DbInstance {
/// `path` is ignored for `mem` and `tikv` kinds.
/// `options` is ignored for every kind except `tikv`.
#[allow(unused_variables)]
pub fn new(kind: &str, path: &str, options: JsonValue) -> Result<Self> {
pub fn new(kind: &str, path: &str, options: &str) -> Result<Self> {
Ok(match kind {
"mem" => Self::Mem(new_cozo_mem()?),
#[cfg(feature = "storage-sqlite")]
@ -121,25 +123,14 @@ impl DbInstance {
"sled" => Self::Sled(new_cozo_sled(path)?),
#[cfg(feature = "storage-tikv")]
"tikv" => {
let end_points = options
.get("pd_endpoints")
.ok_or_else(|| miette!("required option 'pd_endpoints' not found"))?;
let end_points = end_points
.as_array()
.ok_or_else(|| miette!("option 'pd_endpoints' must be an array"))?;
let end_points: Vec<_> = end_points
.iter()
.map(|v| {
v.as_str()
.map(|s| s.to_string())
.ok_or_else(|| "option 'pd_endpoints' must contain strings")
})
.try_collect()?;
let optimistic = options.get("optimistic").unwrap_or(&JsonValue::Bool(true));
let optimistic = optimistic
.as_bool()
.ok_or_else(|| miette!("option 'optimistic' must be a bool"))?;
Self::TiKv(new_cozo_tikv(end_points, optimistic)?)
/// Default for `TiKvOpts::optimistic`: the option has always defaulted to
/// `true` when it is omitted, so keep that behaviour.
fn default_optimistic() -> bool {
    true
}
/// Options understood by the `tikv` storage kind, parsed from the JSON
/// text passed in `options`.
#[derive(serde_derive::Deserialize)]
struct TiKvOpts {
    /// Placement-driver endpoints. The historical option name
    /// `pd_endpoints` is still accepted so existing callers keep working.
    #[serde(alias = "pd_endpoints")]
    end_points: Vec<String>,
    /// Whether optimistic transactions are used; `true` when omitted.
    // serde's `default = "..."` takes a function *path*, not a call expression.
    #[serde(default = "default_optimistic")]
    optimistic: bool,
}
// serde_json's error is not a miette diagnostic, so convert it explicitly,
// as the other JSON entry points in this file do.
let opts: TiKvOpts = serde_json::from_str(options).into_diagnostic()?;
// `opts` is not used afterwards, so the endpoint list can be moved out
// instead of cloned.
Self::TiKv(new_cozo_tikv(opts.end_points, opts.optimistic)?)
}
kind => bail!(
"database kind '{}' not supported (maybe not compiled in)",
@ -153,11 +144,10 @@ impl DbInstance {
path: &str,
options: &str,
) -> std::result::Result<Self, String> {
let options: JsonValue = serde_json::from_str(options).map_err(|e| e.to_string())?;
Self::new(kind, path, options).map_err(|err| err.to_string())
}
/// Dispatcher method. See [crate::Db::run_script].
pub fn run_script(&self, payload: &str, params: &Map<String, JsonValue>) -> Result<JsonValue> {
pub fn run_script(&self, payload: &str, params: &BTreeMap<String, DataValue>) -> Result<JsonValue> {
match self {
DbInstance::Mem(db) => db.run_script(payload, params),
#[cfg(feature = "storage-sqlite")]
@ -172,7 +162,7 @@ impl DbInstance {
}
/// Run the CozoScript passed in. The `params` argument is a map of parameters.
/// Fold any error into the return JSON itself.
pub fn run_script_fold_err(&self, payload: &str, params: &Map<String, JsonValue>) -> JsonValue {
pub fn run_script_fold_err(&self, payload: &str, params: &BTreeMap<String, DataValue>) -> JsonValue {
match self.run_script(payload, params) {
Ok(json) => json,
Err(mut err) => {
@ -199,14 +189,10 @@ impl DbInstance {
/// Run the CozoScript passed in. The `params` argument is a map of parameters formatted as JSON.
pub fn run_script_str(&self, payload: &str, params: &str) -> String {
let params_json = if params.is_empty() {
Map::default()
BTreeMap::default()
} else {
match serde_json::from_str::<serde_json::Value>(params) {
Ok(serde_json::Value::Object(map)) => map,
Ok(_) => {
return json!({"ok": false, "message": "params argument is not valid JSON"})
.to_string()
}
match serde_json::from_str::<BTreeMap<String, DataValue>>(params) {
Ok(map) => map,
Err(_) => {
return json!({"ok": false, "message": "params argument is not a JSON map"})
.to_string()
@ -247,26 +233,15 @@ impl DbInstance {
}
}
fn export_relations_str_inner(&self, data: &str) -> Result<JsonValue> {
let j_val: JsonValue = serde_json::from_str(data).into_diagnostic()?;
let relations = j_val
.get("relations")
.ok_or_else(|| miette!("field 'relations' expected"))?;
let v = relations
.as_array()
.ok_or_else(|| miette!("expects field 'relations' to be an array"))?;
let relations: Vec<_> = v
.iter()
.map(|name| {
name.as_str().ok_or_else(|| {
miette!("expects field 'relations' to be an array of string names")
})
})
.try_collect()?;
let as_objects = j_val.get("as_objects").unwrap_or(&JsonValue::Bool(false));
let as_objects = as_objects
.as_bool()
.ok_or_else(|| miette!("expects field 'as_objects' to be a boolean"))?;
let results = self.export_relations(relations.into_iter(), as_objects)?;
#[derive(serde_derive::Deserialize)]
struct Payload {
relations: Vec<String>,
#[serde(default = "Default::default")]
as_objects: bool,
}
let j_val: Payload = serde_json::from_str(data).into_diagnostic()?;
let results =
self.export_relations(j_val.relations.iter().map(|s| s as &str), j_val.as_objects)?;
Ok(results)
}
/// Dispatcher method. See [crate::Db::import_relations].
@ -295,11 +270,8 @@ impl DbInstance {
}
}
fn import_relations_str_inner(&self, data: &str) -> Result<()> {
let j_obj: JsonValue = serde_json::from_str(data).into_diagnostic()?;
let j_obj = j_obj
.as_object()
.ok_or_else(|| miette!("expect an object"))?;
self.import_relations(j_obj)
let j_obj: Map<String, JsonValue> = serde_json::from_str(data).into_diagnostic()?;
self.import_relations(&j_obj)
}
/// Dispatcher method. See [crate::Db::backup_db].
pub fn backup_db(&self, out_file: String) -> Result<()> {
@ -323,7 +295,7 @@ impl DbInstance {
}
}
/// Restore from an Sqlite backup
pub fn restore_backup(&self, in_file: String) -> Result<()> {
pub fn restore_backup(&self, in_file: &str) -> Result<()> {
match self {
DbInstance::Mem(db) => db.restore_backup(in_file),
#[cfg(feature = "storage-sqlite")]
@ -338,11 +310,42 @@ impl DbInstance {
}
/// Restore from an Sqlite backup, with JSON string return value
pub fn restore_backup_str(&self, in_file: &str) -> String {
match self.restore_backup(in_file.to_string()) {
match self.restore_backup(in_file) {
Ok(_) => json!({"ok": true}).to_string(),
Err(err) => json!({"ok": false, "message": err.to_string()}).to_string(),
}
}
/// Dispatcher method. See [crate::Db::import_from_backup].
///
/// Forwards `in_file` and `relations` unchanged to the `import_from_backup`
/// method of whichever storage backend this instance wraps, and returns
/// that call's result.
pub fn import_from_backup(&self, in_file: &str, relations: &[String]) -> Result<()> {
match self {
DbInstance::Mem(db) => db.import_from_backup(in_file, relations),
// Every arm below exists only when the matching storage feature is compiled in.
#[cfg(feature = "storage-sqlite")]
DbInstance::Sqlite(db) => db.import_from_backup(in_file, relations),
#[cfg(feature = "storage-rocksdb")]
DbInstance::RocksDb(db) => db.import_from_backup(in_file, relations),
#[cfg(feature = "storage-sled")]
DbInstance::Sled(db) => db.import_from_backup(in_file, relations),
#[cfg(feature = "storage-tikv")]
DbInstance::TiKv(db) => db.import_from_backup(in_file, relations),
}
}
/// String-in, string-out variant of the backup import.
///
/// `payload` is handed to `import_from_backup_str_inner`. The return value
/// is serialized JSON: `{"ok": true}` on success, or
/// `{"ok": false, "message": <error text>}` when the import fails.
pub fn import_from_backup_str(&self, payload: &str) -> String {
    let response = match self.import_from_backup_str_inner(payload) {
        Err(err) => json!({"ok": false, "message": err.to_string()}),
        Ok(()) => json!({"ok": true}),
    };
    response.to_string()
}
/// Parses `payload` as a JSON object with a string field `path` and a
/// string-array field `relations`, then runs `import_from_backup` with them.
///
/// Returns an error if the payload does not deserialize or the import fails.
fn import_from_backup_str_inner(&self, payload: &str) -> Result<()> {
    /// Wire format of the request.
    #[derive(serde_derive::Deserialize)]
    struct BackupImportRequest {
        path: String,
        relations: Vec<String>,
    }

    let BackupImportRequest { path, relations } =
        serde_json::from_str::<BackupImportRequest>(payload).into_diagnostic()?;
    self.import_from_backup(&path, &relations)
}
}
lazy_static! {

@ -105,7 +105,7 @@ impl<'s, S: Storage<'s>> Db<S> {
pub fn run_script(
&'s self,
payload: &str,
params: &Map<String, JsonValue>,
params: &BTreeMap<String, DataValue>,
) -> Result<JsonValue> {
#[cfg(not(feature = "wasm"))]
let start = Instant::now();
@ -367,10 +367,10 @@ impl<'s, S: Storage<'s>> Db<S> {
bail!("backup requires the 'storage-sqlite' feature to be enabled")
}
/// Restore from an Sqlite backup
pub fn restore_backup(&'s self, in_file: String) -> Result<()> {
pub fn restore_backup(&'s self, in_file: &str) -> Result<()> {
#[cfg(feature = "storage-sqlite")]
{
let sqlite_db = crate::new_cozo_sqlite(in_file)?;
let sqlite_db = crate::new_cozo_sqlite(in_file.to_string())?;
let s_tx = sqlite_db.transact_write()?;
let store_id = s_tx.relation_store_id.load(Ordering::SeqCst);
if store_id != 0 {
@ -387,6 +387,39 @@ impl<'s, S: Storage<'s>> Db<S> {
#[cfg(not(feature = "storage-sqlite"))]
bail!("backup requires the 'storage-sqlite' feature to be enabled")
}
/// Copies the stored rows of the named relations from an Sqlite backup
/// file into this database.
///
/// `in_file` is opened as an Sqlite-backed database. For every name in
/// `relations`, the relation's full key range is scanned in the backup and
/// each pair is written to the relation of the same name here, after its
/// leading bytes are rewritten to this database's relation id. Both
/// transactions are committed once all relations have been processed.
///
/// Without the `storage-sqlite` feature this always returns an error.
pub fn import_from_backup(&'s self, in_file: &str, relations: &[String]) -> Result<()> {
    #[cfg(not(feature = "storage-sqlite"))]
    bail!("backup requires the 'storage-sqlite' feature to be enabled");

    #[cfg(feature = "storage-sqlite")]
    {
        let backup_db = crate::new_cozo_sqlite(in_file.to_string())?;
        let mut backup_tx = backup_db.transact()?;
        let mut target_tx = self.transact_write()?;

        for rel_name in relations {
            let backup_rel = backup_tx.get_relation(rel_name, false)?;
            let target_rel = target_tx.get_relation(rel_name, true)?;

            // Scan bounds: everything stored under the backup relation's id.
            let scan_start = Tuple::default().encode_as_key(backup_rel.id);
            let scan_end = Tuple::default().encode_as_key(backup_rel.id.next());

            let rewritten = backup_tx.tx.range_scan(&scan_start, &scan_end).map(
                |entry| -> Result<(Vec<u8>, Vec<u8>)> {
                    let (mut key, mut val) = entry?;
                    // Key and value both get the target relation's id prefix.
                    // NOTE(review): presumably values carry the same id prefix
                    // as keys in the storage format -- confirm.
                    target_rel.amend_key_prefix(&mut key);
                    target_rel.amend_key_prefix(&mut val);
                    Ok((key, val))
                },
            );
            target_tx.tx.batch_put(Box::new(rewritten))?;
        }

        backup_tx.commit_tx()?;
        target_tx.commit_tx()
    }
}
fn compact_relation(&'s self) -> Result<()> {
let l = Tuple::default().encode_as_key(RelationId(0));
@ -421,13 +454,9 @@ impl<'s, S: Storage<'s>> Db<S> {
fn do_run_script(
&'s self,
payload: &str,
params: &Map<String, JsonValue>,
param_pool: &BTreeMap<String, DataValue>,
) -> Result<JsonValue> {
let param_pool = params
.iter()
.map(|(k, v)| (k.clone(), DataValue::from(v)))
.collect();
match parse_script(payload, &param_pool)? {
match parse_script(payload, param_pool)? {
CozoScript::Multi(ps) => {
let is_write = ps.iter().any(|p| p.out_opts.store_relation.is_some());
let mut cleanups = vec![];

@ -123,6 +123,10 @@ impl RelationHandle {
ret.extend(prefix_bytes);
ret
}
/// Overwrites the first eight bytes of `data` with the big-endian bytes of
/// this relation's id.
///
/// Panics if `data` is shorter than eight bytes.
pub(crate) fn amend_key_prefix(&self, data: &mut [u8]) {
    let id_bytes = self.id.0.to_be_bytes();
    let prefix = &mut data[0..8];
    prefix.copy_from_slice(&id_bytes);
}
pub(crate) fn encode_key_for_store(&self, tuple: &Tuple, span: SourceSpan) -> Result<Vec<u8>> {
let len = self.metadata.keys.len();
ensure!(

Loading…
Cancel
Save