From 7819436522387dcaa972059577ec8b5eef624a11 Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Sat, 19 Nov 2022 16:50:07 +0800 Subject: [PATCH] DB dispatcher --- cozo-core/src/lib.rs | 263 +++++++++++++++++++++++++++++++++++- cozo-core/src/runtime/db.rs | 124 +---------------- cozo-lib-c/src/lib.rs | 111 ++------------- 3 files changed, 279 insertions(+), 219 deletions(-) diff --git a/cozo-core/src/lib.rs b/cozo-core/src/lib.rs index b2206eeb..921f155a 100644 --- a/cozo-core/src/lib.rs +++ b/cozo-core/src/lib.rs @@ -19,7 +19,7 @@ //! ``` //! use cozo::*; //! -//! let db = new_cozo_mem().unwrap(); +//! let db = DbInstance::new("mem", "", Default::default()).unwrap(); //! let script = "?[a] := a in [1, 2, 3]"; //! let result = db.run_script(script, &Default::default()).unwrap(); //! println!("{:?}", result); @@ -33,7 +33,10 @@ #![allow(clippy::type_complexity)] #![allow(clippy::too_many_arguments)] +use itertools::Itertools; pub use miette::Error; +use miette::{bail, miette, IntoDiagnostic, Result}; +use serde_json::{json, Map}; pub use runtime::db::Db; pub use runtime::relation::decode_tuple_from_kv; @@ -48,6 +51,9 @@ pub use storage::sqlite::{new_cozo_sqlite, SqliteStorage}; pub use storage::tikv::{new_cozo_tikv, TiKvStorage}; pub use storage::{Storage, StoreTx}; +use crate::data::json::JsonValue; +use crate::runtime::db::{JSON_ERR_HANDLER, TEXT_ERR_HANDLER}; + // pub use storage::re::{new_cozo_redb, ReStorage}; pub(crate) mod algo; @@ -57,3 +63,258 @@ pub(crate) mod query; pub(crate) mod runtime; pub(crate) mod storage; pub(crate) mod utils; + +#[derive(Clone)] +/// A dispatcher for concrete storage implementations, wrapping [crate::Db]. +/// Many methods are dispatching methods -- see the corresponding methods on [crate::Db] for more details. +pub enum DbInstance { + /// In memory storage (not persistent) + Mem(Db), + #[cfg(feature = "storage-sqlite")] + /// Sqlite storage + Sqlite(Db), + #[cfg(feature = "storage-rocksdb")] + /// RocksDB storage + RocksDb(Db), + #[cfg(feature = "storage-sled")] + /// Sled storage (experimental) + Sled(Db), + #[cfg(feature = "storage-tikv")] + /// TiKV storage (experimental) + TiKv(Db), +} + +impl DbInstance { + /// Create a DbInstance, which is a dispatcher for various concrete implementations. + /// The valid kinds are `mem`, `sqlite`, `rocksdb`, `sled` and `tikv`, + /// assuming all features are enabled during compilation. Otherwise only + /// some of the kinds are available. The `mem` kind is always available. + /// + /// `path` is ignored for `mem` and `tikv` kinds. + /// `options` is ignored for every kind except `tikv`. + #[allow(unused_variables)] + pub fn new(kind: &str, path: &str, options: JsonValue) -> Result { + Ok(match kind { + "mem" => Self::Mem(new_cozo_mem()?), + #[cfg(feature = "storage-sqlite")] + "sqlite" => Self::Sqlite(new_cozo_sqlite(path.to_string())?), + #[cfg(feature = "storage-rocksdb")] + "rocksdb" => Self::RocksDb(new_cozo_rocksdb(path)?), + #[cfg(feature = "storage-sled")] + "sled" => Self::Sled(new_cozo_sled(path)?), + #[cfg(feature = "storage-tikv")] + "tikv" => { + let end_points = options + .get("pd_endpoints") + .ok_or_else(|| miette!("required option 'pd_endpoints' not found"))?; + let end_points = end_points + .as_array() + .ok_or_else(|| miette!("option 'pd_endpoints' must be an array"))?; + let end_points: Vec<_> = end_points + .iter() + .map(|v| { + v.as_str() + .map(|s| s.to_string()) + .ok_or_else(|| "option 'pd_endpoints' must contain strings") + }) + .try_collect()?; + let optimistic = options.get("optimistic").unwrap_or(&JsonValue::Bool(true)); + let optimistic = optimistic + .as_bool() + .ok_or_else(|| miette!("option 'optimistic' must be a bool"))?; + Self::TiKv(new_cozo_tikv(end_points, optimistic)?) + } + kind => bail!( + "database kind '{}' not supported (maybe not compiled in)", + kind + ), + }) + } + /// Dispatcher method. See [crate::Db::run_script]. + pub fn run_script(&self, payload: &str, params: &Map) -> Result { + match self { + DbInstance::Mem(db) => db.run_script(payload, params), + #[cfg(feature = "storage-sqlite")] + DbInstance::Sqlite(db) => db.run_script(payload, params), + #[cfg(feature = "storage-rocksdb")] + DbInstance::RocksDb(db) => db.run_script(payload, params), + #[cfg(feature = "storage-sled")] + DbInstance::Sled(db) => db.run_script(payload, params), + #[cfg(feature = "storage-tikv")] + DbInstance::TiKv(db) => db.run_script(payload, params), + } + } + /// Run the CozoScript passed in. The `params` argument is a map of parameters. + /// Fold any error into the return JSON itself. + pub fn run_script_fold_err(&self, payload: &str, params: &Map) -> JsonValue { + match self.run_script(payload, params) { + Ok(json) => json, + Err(mut err) => { + if err.source_code().is_none() { + err = err.with_source_code(payload.to_string()); + } + let mut text_err = String::new(); + let mut json_err = String::new(); + TEXT_ERR_HANDLER + .render_report(&mut text_err, err.as_ref()) + .expect("render text error failed"); + JSON_ERR_HANDLER + .render_report(&mut json_err, err.as_ref()) + .expect("render json error failed"); + let mut json: serde_json::Value = + serde_json::from_str(&json_err).expect("parse rendered json error failed"); + let map = json.as_object_mut().unwrap(); + map.insert("ok".to_string(), json!(false)); + map.insert("display".to_string(), json!(text_err)); + json + } + } + } + /// Run the CozoScript passed in. The `params` argument is a map of parameters formatted as JSON. + pub fn run_script_str(&self, payload: &str, params: &str) -> String { + let params_json = if params.is_empty() { + Map::default() + } else { + match serde_json::from_str::(params) { + Ok(serde_json::Value::Object(map)) => map, + Ok(_) => { + return json!({"ok": false, "message": "params argument is not valid JSON"}) + .to_string() + } + Err(_) => { + return json!({"ok": false, "message": "params argument is not a JSON map"}) + .to_string() + } + } + }; + self.run_script_fold_err(payload, ¶ms_json).to_string() + } + /// Dispatcher method. See [crate::Db::export_relations]. + pub fn export_relations<'a>( + &self, + relations: impl Iterator, + ) -> Result { + match self { + DbInstance::Mem(db) => db.export_relations(relations), + #[cfg(feature = "storage-sqlite")] + DbInstance::Sqlite(db) => db.export_relations(relations), + #[cfg(feature = "storage-rocksdb")] + DbInstance::RocksDb(db) => db.export_relations(relations), + #[cfg(feature = "storage-sled")] + DbInstance::Sled(db) => db.export_relations(relations), + #[cfg(feature = "storage-tikv")] + DbInstance::TiKv(db) => db.export_relations(relations), + } + } + /// Export relations to JSON-encoded string + pub fn export_relations_str(&self, relations_str: &str) -> String { + match self.export_relations_str_inner(relations_str) { + Ok(s) => { + let ret = json!({"ok": true, "data": s}); + format!("{}", ret) + } + Err(err) => { + let ret = json!({"ok": false, "message": err.to_string()}); + format!("{}", ret) + } + } + } + fn export_relations_str_inner(&self, relations_str: &str) -> Result { + let j_val: JsonValue = serde_json::from_str(relations_str).into_diagnostic()?; + let v = j_val + .as_array() + .ok_or_else(|| miette!("expects an array"))?; + let relations: Vec<_> = v + .iter() + .map(|name| { + name.as_str() + .ok_or_else(|| miette!("expects an array of string names")) + }) + .try_collect()?; + let results = self.export_relations(relations.into_iter())?; + Ok(results) + } + /// Dispatcher method. See [crate::Db::import_relations]. + pub fn import_relation(&self, relation: &str, in_data: &[JsonValue]) -> Result<()> { + match self { + DbInstance::Mem(db) => db.import_relation(relation, in_data), + #[cfg(feature = "storage-sqlite")] + DbInstance::Sqlite(db) => db.import_relation(relation, in_data), + #[cfg(feature = "storage-rocksdb")] + DbInstance::RocksDb(db) => db.import_relation(relation, in_data), + #[cfg(feature = "storage-sled")] + DbInstance::Sled(db) => db.import_relation(relation, in_data), + #[cfg(feature = "storage-tikv")] + DbInstance::TiKv(db) => db.import_relation(relation, in_data), + } + } + /// Import a relation, the data is given as a JSON string, and the returned result is converted into a string + pub fn import_relation_str(&self, data: &str) -> String { + match self.import_relation_str_inner(data) { + Ok(()) => { + format!("{}", json!({"ok": true})) + } + Err(err) => { + format!("{}", json!({"ok": false, "message": err.to_string()})) + } + } + } + fn import_relation_str_inner(&self, data: &str) -> Result<()> { + let j_obj: JsonValue = serde_json::from_str(data).into_diagnostic()?; + let name_val = j_obj + .get("relation") + .ok_or_else(|| miette!("key 'relation' required"))?; + let name = name_val + .as_str() + .ok_or_else(|| miette!("field 'relation' must be a string"))?; + let data_val = j_obj + .get("data") + .ok_or_else(|| miette!("key 'data' required"))?; + let data = data_val + .as_array() + .ok_or_else(|| miette!("field 'data' must be an array"))?; + self.import_relation(name, data) + } + /// Dispatcher method. See [crate::Db::backup_db]. + pub fn backup_db(&self, out_file: String) -> Result<()> { + match self { + DbInstance::Mem(db) => db.backup_db(out_file), + #[cfg(feature = "storage-sqlite")] + DbInstance::Sqlite(db) => db.backup_db(out_file), + #[cfg(feature = "storage-rocksdb")] + DbInstance::RocksDb(db) => db.backup_db(out_file), + #[cfg(feature = "storage-sled")] + DbInstance::Sled(db) => db.backup_db(out_file), + #[cfg(feature = "storage-tikv")] + DbInstance::TiKv(db) => db.backup_db(out_file), + } + } + /// Backup the running database into an Sqlite file, with JSON string return value + pub fn backup_db_str(&self, out_file: &str) -> String { + match self.backup_db(out_file.to_string()) { + Ok(_) => json!({"ok": true}).to_string(), + Err(err) => json!({"ok": false, "message": err.to_string()}).to_string(), + } + } + /// Restore from an Sqlite backup + pub fn restore_backup(&self, in_file: String) -> Result<()> { + match self { + DbInstance::Mem(db) => db.restore_backup(in_file), + #[cfg(feature = "storage-sqlite")] + DbInstance::Sqlite(db) => db.restore_backup(in_file), + #[cfg(feature = "storage-rocksdb")] + DbInstance::RocksDb(db) => db.restore_backup(in_file), + #[cfg(feature = "storage-sled")] + DbInstance::Sled(db) => db.restore_backup(in_file), + #[cfg(feature = "storage-tikv")] + DbInstance::TiKv(db) => db.restore_backup(in_file), + } + } + /// Restore from an Sqlite backup, with JSON string return value + pub fn restore_backup_str(&self, in_file: &str) -> String { + match self.restore_backup(in_file.to_string()) { + Ok(_) => json!({"ok": true}).to_string(), + Err(err) => json!({"ok": false, "message": err.to_string()}).to_string(), + } + } +} diff --git a/cozo-core/src/runtime/db.rs b/cozo-core/src/runtime/db.rs index a788c42a..39d2eede 100644 --- a/cozo-core/src/runtime/db.rs +++ b/cozo-core/src/runtime/db.rs @@ -18,7 +18,7 @@ use either::{Left, Right}; use itertools::Itertools; use lazy_static::lazy_static; use miette::{ - bail, ensure, miette, Diagnostic, GraphicalReportHandler, GraphicalTheme, IntoDiagnostic, + bail, ensure, Diagnostic, GraphicalReportHandler, GraphicalTheme, IntoDiagnostic, JSONReportHandler, Result, WrapErr, }; use serde_json::{json, Map}; @@ -86,9 +86,9 @@ impl Debug for Db { pub(crate) struct BadDbInit(#[help] pub(crate) String); lazy_static! { - static ref TEXT_ERR_HANDLER: GraphicalReportHandler = + pub static ref TEXT_ERR_HANDLER: GraphicalReportHandler = miette::GraphicalReportHandler::new().with_theme(GraphicalTheme::unicode()); - static ref JSON_ERR_HANDLER: JSONReportHandler = miette::JSONReportHandler::new(); + pub static ref JSON_ERR_HANDLER: JSONReportHandler = miette::JSONReportHandler::new(); } impl<'s, S: Storage<'s>> Db { @@ -134,55 +134,6 @@ impl<'s, S: Storage<'s>> Db { err => err, } } - /// Run the CozoScript passed in. The `params` argument is a map of parameters. - /// Fold any error into the return JSON itself. - pub fn run_script_fold_err( - &'s self, - payload: &str, - params: &Map, - ) -> JsonValue { - match self.run_script(payload, params) { - Ok(json) => json, - Err(mut err) => { - if err.source_code().is_none() { - err = err.with_source_code(payload.to_string()); - } - let mut text_err = String::new(); - let mut json_err = String::new(); - TEXT_ERR_HANDLER - .render_report(&mut text_err, err.as_ref()) - .expect("render text error failed"); - JSON_ERR_HANDLER - .render_report(&mut json_err, err.as_ref()) - .expect("render json error failed"); - let mut json: serde_json::Value = - serde_json::from_str(&json_err).expect("parse rendered json error failed"); - let map = json.as_object_mut().unwrap(); - map.insert("ok".to_string(), json!(false)); - map.insert("display".to_string(), json!(text_err)); - json - } - } - } - /// Run the CozoScript passed in. The `params` argument is a map of parameters formatted as JSON. - pub fn run_script_str(&'s self, payload: &str, params: &str) -> String { - let params_json = if params.is_empty() { - Map::default() - } else { - match serde_json::from_str::(params) { - Ok(serde_json::Value::Object(map)) => map, - Ok(_) => { - return json!({"ok": false, "message": "params argument is not valid JSON"}) - .to_string() - } - Err(_) => { - return json!({"ok": false, "message": "params argument is not a JSON map"}) - .to_string() - } - } - }; - self.run_script_fold_err(payload, ¶ms_json).to_string() - } /// Export relations to JSON data. pub fn export_relations<'a>( &'s self, @@ -225,34 +176,6 @@ impl<'s, S: Storage<'s>> Db { } Ok(JsonValue::Object(ret)) } - /// Export relations to JSON-encoded string - pub fn export_relations_str(&'s self, relations_str: &str) -> String { - match self.export_relations_str_inner(relations_str) { - Ok(s) => { - let ret = json!({"ok": true, "data": s}); - format!("{}", ret) - } - Err(err) => { - let ret = json!({"ok": false, "message": err.to_string()}); - format!("{}", ret) - } - } - } - fn export_relations_str_inner(&'s self, relations_str: &str) -> Result { - let j_val: JsonValue = serde_json::from_str(relations_str).into_diagnostic()?; - let v = j_val - .as_array() - .ok_or_else(|| miette!("expects an array"))?; - let relations: Vec<_> = v - .iter() - .map(|name| { - name.as_str() - .ok_or_else(|| miette!("expects an array of string names")) - }) - .try_collect()?; - let results = self.export_relations(relations.into_iter())?; - Ok(results) - } /// Import a relation pub fn import_relation(&'s self, relation: &str, in_data: &[JsonValue]) -> Result<()> { #[derive(Debug, Diagnostic, Error)] @@ -289,33 +212,6 @@ impl<'s, S: Storage<'s>> Db { } Ok(()) } - /// Import a relation, the data is given as a JSON string, and the returned result is converted into a string - pub fn import_relation_str(&'s self, data: &str) -> String { - match self.import_relation_str_inner(data) { - Ok(()) => { - format!("{}", json!({"ok": true})) - } - Err(err) => { - format!("{}", json!({"ok": false, "message": err.to_string()})) - } - } - } - fn import_relation_str_inner(&'s self, data: &str) -> Result<()> { - let j_obj: JsonValue = serde_json::from_str(data).into_diagnostic()?; - let name_val = j_obj - .get("relation") - .ok_or_else(|| miette!("key 'relation' required"))?; - let name = name_val - .as_str() - .ok_or_else(|| miette!("field 'relation' must be a string"))?; - let data_val = j_obj - .get("data") - .ok_or_else(|| miette!("key 'data' required"))?; - let data = data_val - .as_array() - .ok_or_else(|| miette!("field 'data' must be an array"))?; - self.import_relation(name, data) - } /// Backup the running database into an Sqlite file pub fn backup_db(&'s self, out_file: String) -> Result<()> { #[cfg(feature = "storage-sqlite")] @@ -330,13 +226,6 @@ impl<'s, S: Storage<'s>> Db { #[cfg(not(feature = "storage-sqlite"))] bail!("backup requires the 'storage-sqlite' feature to be enabled") } - /// Backup the running database into an Sqlite file, with JSON string return value - pub fn backup_db_str(&'s self, out_file: &str) -> String { - match self.backup_db(out_file.to_string()) { - Ok(_) => json!({"ok": true}).to_string(), - Err(err) => json!({"ok": false, "message": err.to_string()}).to_string(), - } - } /// Restore from an Sqlite backup pub fn restore_backup(&'s self, in_file: String) -> Result<()> { #[cfg(feature = "storage-sqlite")] @@ -351,13 +240,6 @@ impl<'s, S: Storage<'s>> Db { #[cfg(not(feature = "storage-sqlite"))] bail!("backup requires the 'storage-sqlite' feature to be enabled") } - /// Restore from an Sqlite backup, with JSON string return value - pub fn restore_backup_str(&'s self, in_file: &str) -> String { - match self.restore_backup(in_file.to_string()) { - Ok(_) => json!({"ok": true}).to_string(), - Err(err) => json!({"ok": false, "message": err.to_string()}).to_string(), - } - } fn compact_relation(&'s self) -> Result<()> { let l = Tuple::default().encode_as_key(RelationId(0)); diff --git a/cozo-lib-c/src/lib.rs b/cozo-lib-c/src/lib.rs index 899e4ab1..742fdf50 100644 --- a/cozo-lib-c/src/lib.rs +++ b/cozo-lib-c/src/lib.rs @@ -17,95 +17,6 @@ use lazy_static::lazy_static; use cozo::*; -#[derive(Clone)] -enum DbInstance { - Mem(Db), - #[cfg(feature = "storage-sqlite")] - Sqlite(Db), - #[cfg(feature = "storage-rocksdb")] - RocksDb(Db), -} - -impl DbInstance { - fn new(engine: &str, path: &str) -> Result { - match engine { - "mem" => Ok(Self::Mem(new_cozo_mem().map_err(|err| err.to_string())?)), - "sqlite" => { - #[cfg(feature = "storage-sqlite")] - { - return Ok(Self::Sqlite( - new_cozo_sqlite(path.to_string()).map_err(|err| err.to_string())?, - )); - } - - #[cfg(not(feature = "storage-sqlite"))] - { - return Err("support for sqlite not compiled".to_string()); - } - } - "rocksdb" => { - #[cfg(feature = "storage-rocksdb")] - { - return Ok(Self::RocksDb( - new_cozo_rocksdb(path.to_string()).map_err(|err| err.to_string())?, - )); - } - - #[cfg(not(feature = "storage-rocksdb"))] - { - return Err("support for rocksdb not compiled".to_string()); - } - } - _ => Err(format!("unsupported engine: {}", engine)), - } - } - fn run_script_str(&self, payload: &str, params: &str) -> String { - match self { - DbInstance::Mem(db) => db.run_script_str(payload, params), - #[cfg(feature = "storage-sqlite")] - DbInstance::Sqlite(db) => db.run_script_str(payload, params), - #[cfg(feature = "storage-rocksdb")] - DbInstance::RocksDb(db) => db.run_script_str(payload, params), - } - } - fn import_relations(&self, data: &str) -> String { - match self { - DbInstance::Mem(db) => db.import_relation_str(data), - #[cfg(feature = "storage-sqlite")] - DbInstance::Sqlite(db) => db.import_relation_str(data), - #[cfg(feature = "storage-rocksdb")] - DbInstance::RocksDb(db) => db.import_relation_str(data), - } - } - fn export_relations(&self, data: &str) -> String { - match self { - DbInstance::Mem(db) => db.export_relations_str(data), - #[cfg(feature = "storage-sqlite")] - DbInstance::Sqlite(db) => db.export_relations_str(data), - #[cfg(feature = "storage-rocksdb")] - DbInstance::RocksDb(db) => db.export_relations_str(data), - } - } - fn backup(&self, path: &str) -> String { - match self { - DbInstance::Mem(db) => db.backup_db_str(path), - #[cfg(feature = "storage-sqlite")] - DbInstance::Sqlite(db) => db.backup_db_str(path), - #[cfg(feature = "storage-rocksdb")] - DbInstance::RocksDb(db) => db.backup_db_str(path), - } - } - fn restore(&self, path: &str) -> String { - match self { - DbInstance::Mem(db) => db.restore_backup_str(path), - #[cfg(feature = "storage-sqlite")] - DbInstance::Sqlite(db) => db.restore_backup_str(path), - #[cfg(feature = "storage-rocksdb")] - DbInstance::RocksDb(db) => db.restore_backup_str(path), - } - } -} - struct Handles { current: AtomicI32, dbs: Mutex>, @@ -133,19 +44,19 @@ pub unsafe extern "C" fn cozo_open_db( path: *const c_char, db_id: &mut i32, ) -> *mut c_char { - let path = match CStr::from_ptr(path).to_str() { + let engine = match CStr::from_ptr(engine).to_str() { Ok(p) => p, Err(err) => return CString::new(format!("{}", err)).unwrap().into_raw(), }; - let engine = match CStr::from_ptr(engine).to_str() { + let path = match CStr::from_ptr(path).to_str() { Ok(p) => p, Err(err) => return CString::new(format!("{}", err)).unwrap().into_raw(), }; - let db = match DbInstance::new(engine, path) { + let db = match DbInstance::new(engine, path, Default::default()) { Ok(db) => db, - Err(err) => return CString::new(err).unwrap().into_raw(), + Err(err) => return CString::new(err.to_string()).unwrap().into_raw(), }; let id = HANDLES.current.fetch_add(1, Ordering::AcqRel); @@ -254,7 +165,9 @@ pub unsafe extern "C" fn cozo_import_relation( Ok(p) => p, Err(err) => return CString::new(format!("{}", err)).unwrap().into_raw(), }; - CString::new(db.import_relations(data)).unwrap().into_raw() + CString::new(db.import_relation_str(data)) + .unwrap() + .into_raw() } #[no_mangle] @@ -286,7 +199,9 @@ pub unsafe extern "C" fn cozo_export_relations( Ok(p) => p, Err(err) => return CString::new(format!("{}", err)).unwrap().into_raw(), }; - CString::new(db.export_relations(data)).unwrap().into_raw() + CString::new(db.export_relations_str(data)) + .unwrap() + .into_raw() } #[no_mangle] @@ -315,7 +230,7 @@ pub unsafe extern "C" fn cozo_backup(db_id: i32, out_path: *const c_char) -> *mu Ok(p) => p, Err(err) => return CString::new(format!("{}", err)).unwrap().into_raw(), }; - CString::new(db.backup(data)).unwrap().into_raw() + CString::new(db.backup_db_str(data)).unwrap().into_raw() } #[no_mangle] @@ -344,7 +259,9 @@ pub unsafe extern "C" fn cozo_restore(db_id: i32, in_path: *const c_char) -> *mu Ok(p) => p, Err(err) => return CString::new(format!("{}", err)).unwrap().into_raw(), }; - CString::new(db.restore(data)).unwrap().into_raw() + CString::new(db.restore_backup_str(data)) + .unwrap() + .into_raw() } /// Free any C-string returned from the Cozo C API.