DB dispatcher

main
Ziyang Hu 2 years ago
parent 2a22b28301
commit 7819436522

@ -19,7 +19,7 @@
//! ```
//! use cozo::*;
//!
//! let db = new_cozo_mem().unwrap();
//! let db = DbInstance::new("mem", "", Default::default()).unwrap();
//! let script = "?[a] := a in [1, 2, 3]";
//! let result = db.run_script(script, &Default::default()).unwrap();
//! println!("{:?}", result);
@ -33,7 +33,10 @@
#![allow(clippy::type_complexity)]
#![allow(clippy::too_many_arguments)]
use itertools::Itertools;
pub use miette::Error;
use miette::{bail, miette, IntoDiagnostic, Result};
use serde_json::{json, Map};
pub use runtime::db::Db;
pub use runtime::relation::decode_tuple_from_kv;
@ -48,6 +51,9 @@ pub use storage::sqlite::{new_cozo_sqlite, SqliteStorage};
pub use storage::tikv::{new_cozo_tikv, TiKvStorage};
pub use storage::{Storage, StoreTx};
use crate::data::json::JsonValue;
use crate::runtime::db::{JSON_ERR_HANDLER, TEXT_ERR_HANDLER};
// pub use storage::re::{new_cozo_redb, ReStorage};
pub(crate) mod algo;
@ -57,3 +63,258 @@ pub(crate) mod query;
pub(crate) mod runtime;
pub(crate) mod storage;
pub(crate) mod utils;
#[derive(Clone)]
/// A dispatcher for concrete storage implementations, wrapping [crate::Db].
/// Many methods are dispatching methods -- see the corresponding methods on [crate::Db] for more details.
pub enum DbInstance {
/// In memory storage (not persistent)
Mem(Db<MemStorage>),
#[cfg(feature = "storage-sqlite")]
/// Sqlite storage
Sqlite(Db<SqliteStorage>),
#[cfg(feature = "storage-rocksdb")]
/// RocksDB storage
RocksDb(Db<RocksDbStorage>),
#[cfg(feature = "storage-sled")]
/// Sled storage (experimental)
Sled(Db<SledStorage>),
#[cfg(feature = "storage-tikv")]
/// TiKV storage (experimental)
TiKv(Db<TiKvStorage>),
}
impl DbInstance {
/// Create a DbInstance, which is a dispatcher for various concrete implementations.
/// The valid kinds are `mem`, `sqlite`, `rocksdb`, `sled` and `tikv`,
/// assuming all features are enabled during compilation. Otherwise only
/// some of the kinds are available. The `mem` kind is always available.
///
/// `path` is ignored for `mem` and `tikv` kinds.
/// `options` is ignored for every kind except `tikv`.
#[allow(unused_variables)]
pub fn new(kind: &str, path: &str, options: JsonValue) -> Result<Self> {
Ok(match kind {
"mem" => Self::Mem(new_cozo_mem()?),
#[cfg(feature = "storage-sqlite")]
"sqlite" => Self::Sqlite(new_cozo_sqlite(path.to_string())?),
#[cfg(feature = "storage-rocksdb")]
"rocksdb" => Self::RocksDb(new_cozo_rocksdb(path)?),
#[cfg(feature = "storage-sled")]
"sled" => Self::Sled(new_cozo_sled(path)?),
#[cfg(feature = "storage-tikv")]
"tikv" => {
let end_points = options
.get("pd_endpoints")
.ok_or_else(|| miette!("required option 'pd_endpoints' not found"))?;
let end_points = end_points
.as_array()
.ok_or_else(|| miette!("option 'pd_endpoints' must be an array"))?;
let end_points: Vec<_> = end_points
.iter()
.map(|v| {
v.as_str()
.map(|s| s.to_string())
.ok_or_else(|| "option 'pd_endpoints' must contain strings")
})
.try_collect()?;
let optimistic = options.get("optimistic").unwrap_or(&JsonValue::Bool(true));
let optimistic = optimistic
.as_bool()
.ok_or_else(|| miette!("option 'optimistic' must be a bool"))?;
Self::TiKv(new_cozo_tikv(end_points, optimistic)?)
}
kind => bail!(
"database kind '{}' not supported (maybe not compiled in)",
kind
),
})
}
/// Dispatcher method. See [crate::Db::run_script].
pub fn run_script(&self, payload: &str, params: &Map<String, JsonValue>) -> Result<JsonValue> {
match self {
DbInstance::Mem(db) => db.run_script(payload, params),
#[cfg(feature = "storage-sqlite")]
DbInstance::Sqlite(db) => db.run_script(payload, params),
#[cfg(feature = "storage-rocksdb")]
DbInstance::RocksDb(db) => db.run_script(payload, params),
#[cfg(feature = "storage-sled")]
DbInstance::Sled(db) => db.run_script(payload, params),
#[cfg(feature = "storage-tikv")]
DbInstance::TiKv(db) => db.run_script(payload, params),
}
}
/// Run the CozoScript passed in. The `params` argument is a map of parameters.
/// Fold any error into the return JSON itself.
pub fn run_script_fold_err(&self, payload: &str, params: &Map<String, JsonValue>) -> JsonValue {
match self.run_script(payload, params) {
Ok(json) => json,
Err(mut err) => {
if err.source_code().is_none() {
err = err.with_source_code(payload.to_string());
}
let mut text_err = String::new();
let mut json_err = String::new();
TEXT_ERR_HANDLER
.render_report(&mut text_err, err.as_ref())
.expect("render text error failed");
JSON_ERR_HANDLER
.render_report(&mut json_err, err.as_ref())
.expect("render json error failed");
let mut json: serde_json::Value =
serde_json::from_str(&json_err).expect("parse rendered json error failed");
let map = json.as_object_mut().unwrap();
map.insert("ok".to_string(), json!(false));
map.insert("display".to_string(), json!(text_err));
json
}
}
}
/// Run the CozoScript passed in. The `params` argument is a map of parameters formatted as JSON.
pub fn run_script_str(&self, payload: &str, params: &str) -> String {
let params_json = if params.is_empty() {
Map::default()
} else {
match serde_json::from_str::<serde_json::Value>(params) {
Ok(serde_json::Value::Object(map)) => map,
Ok(_) => {
return json!({"ok": false, "message": "params argument is not valid JSON"})
.to_string()
}
Err(_) => {
return json!({"ok": false, "message": "params argument is not a JSON map"})
.to_string()
}
}
};
self.run_script_fold_err(payload, &params_json).to_string()
}
/// Dispatcher method. See [crate::Db::export_relations].
pub fn export_relations<'a>(
&self,
relations: impl Iterator<Item = &'a str>,
) -> Result<JsonValue> {
match self {
DbInstance::Mem(db) => db.export_relations(relations),
#[cfg(feature = "storage-sqlite")]
DbInstance::Sqlite(db) => db.export_relations(relations),
#[cfg(feature = "storage-rocksdb")]
DbInstance::RocksDb(db) => db.export_relations(relations),
#[cfg(feature = "storage-sled")]
DbInstance::Sled(db) => db.export_relations(relations),
#[cfg(feature = "storage-tikv")]
DbInstance::TiKv(db) => db.export_relations(relations),
}
}
/// Export relations to JSON-encoded string
pub fn export_relations_str(&self, relations_str: &str) -> String {
match self.export_relations_str_inner(relations_str) {
Ok(s) => {
let ret = json!({"ok": true, "data": s});
format!("{}", ret)
}
Err(err) => {
let ret = json!({"ok": false, "message": err.to_string()});
format!("{}", ret)
}
}
}
fn export_relations_str_inner(&self, relations_str: &str) -> Result<JsonValue> {
let j_val: JsonValue = serde_json::from_str(relations_str).into_diagnostic()?;
let v = j_val
.as_array()
.ok_or_else(|| miette!("expects an array"))?;
let relations: Vec<_> = v
.iter()
.map(|name| {
name.as_str()
.ok_or_else(|| miette!("expects an array of string names"))
})
.try_collect()?;
let results = self.export_relations(relations.into_iter())?;
Ok(results)
}
/// Dispatcher method. See [crate::Db::import_relations].
pub fn import_relation(&self, relation: &str, in_data: &[JsonValue]) -> Result<()> {
match self {
DbInstance::Mem(db) => db.import_relation(relation, in_data),
#[cfg(feature = "storage-sqlite")]
DbInstance::Sqlite(db) => db.import_relation(relation, in_data),
#[cfg(feature = "storage-rocksdb")]
DbInstance::RocksDb(db) => db.import_relation(relation, in_data),
#[cfg(feature = "storage-sled")]
DbInstance::Sled(db) => db.import_relation(relation, in_data),
#[cfg(feature = "storage-tikv")]
DbInstance::TiKv(db) => db.import_relation(relation, in_data),
}
}
/// Import a relation, the data is given as a JSON string, and the returned result is converted into a string
pub fn import_relation_str(&self, data: &str) -> String {
match self.import_relation_str_inner(data) {
Ok(()) => {
format!("{}", json!({"ok": true}))
}
Err(err) => {
format!("{}", json!({"ok": false, "message": err.to_string()}))
}
}
}
fn import_relation_str_inner(&self, data: &str) -> Result<()> {
let j_obj: JsonValue = serde_json::from_str(data).into_diagnostic()?;
let name_val = j_obj
.get("relation")
.ok_or_else(|| miette!("key 'relation' required"))?;
let name = name_val
.as_str()
.ok_or_else(|| miette!("field 'relation' must be a string"))?;
let data_val = j_obj
.get("data")
.ok_or_else(|| miette!("key 'data' required"))?;
let data = data_val
.as_array()
.ok_or_else(|| miette!("field 'data' must be an array"))?;
self.import_relation(name, data)
}
/// Dispatcher method. See [crate::Db::backup_db].
pub fn backup_db(&self, out_file: String) -> Result<()> {
match self {
DbInstance::Mem(db) => db.backup_db(out_file),
#[cfg(feature = "storage-sqlite")]
DbInstance::Sqlite(db) => db.backup_db(out_file),
#[cfg(feature = "storage-rocksdb")]
DbInstance::RocksDb(db) => db.backup_db(out_file),
#[cfg(feature = "storage-sled")]
DbInstance::Sled(db) => db.backup_db(out_file),
#[cfg(feature = "storage-tikv")]
DbInstance::TiKv(db) => db.backup_db(out_file),
}
}
/// Backup the running database into an Sqlite file, with JSON string return value
pub fn backup_db_str(&self, out_file: &str) -> String {
match self.backup_db(out_file.to_string()) {
Ok(_) => json!({"ok": true}).to_string(),
Err(err) => json!({"ok": false, "message": err.to_string()}).to_string(),
}
}
/// Restore from an Sqlite backup
pub fn restore_backup(&self, in_file: String) -> Result<()> {
match self {
DbInstance::Mem(db) => db.restore_backup(in_file),
#[cfg(feature = "storage-sqlite")]
DbInstance::Sqlite(db) => db.restore_backup(in_file),
#[cfg(feature = "storage-rocksdb")]
DbInstance::RocksDb(db) => db.restore_backup(in_file),
#[cfg(feature = "storage-sled")]
DbInstance::Sled(db) => db.restore_backup(in_file),
#[cfg(feature = "storage-tikv")]
DbInstance::TiKv(db) => db.restore_backup(in_file),
}
}
/// Restore from an Sqlite backup, with JSON string return value
pub fn restore_backup_str(&self, in_file: &str) -> String {
match self.restore_backup(in_file.to_string()) {
Ok(_) => json!({"ok": true}).to_string(),
Err(err) => json!({"ok": false, "message": err.to_string()}).to_string(),
}
}
}

@ -18,7 +18,7 @@ use either::{Left, Right};
use itertools::Itertools;
use lazy_static::lazy_static;
use miette::{
bail, ensure, miette, Diagnostic, GraphicalReportHandler, GraphicalTheme, IntoDiagnostic,
bail, ensure, Diagnostic, GraphicalReportHandler, GraphicalTheme, IntoDiagnostic,
JSONReportHandler, Result, WrapErr,
};
use serde_json::{json, Map};
@ -86,9 +86,9 @@ impl<S> Debug for Db<S> {
pub(crate) struct BadDbInit(#[help] pub(crate) String);
lazy_static! {
static ref TEXT_ERR_HANDLER: GraphicalReportHandler =
pub static ref TEXT_ERR_HANDLER: GraphicalReportHandler =
miette::GraphicalReportHandler::new().with_theme(GraphicalTheme::unicode());
static ref JSON_ERR_HANDLER: JSONReportHandler = miette::JSONReportHandler::new();
pub static ref JSON_ERR_HANDLER: JSONReportHandler = miette::JSONReportHandler::new();
}
impl<'s, S: Storage<'s>> Db<S> {
@ -134,55 +134,6 @@ impl<'s, S: Storage<'s>> Db<S> {
err => err,
}
}
/// Run the CozoScript passed in. The `params` argument is a map of parameters.
/// Fold any error into the return JSON itself.
pub fn run_script_fold_err(
&'s self,
payload: &str,
params: &Map<String, JsonValue>,
) -> JsonValue {
match self.run_script(payload, params) {
Ok(json) => json,
Err(mut err) => {
if err.source_code().is_none() {
err = err.with_source_code(payload.to_string());
}
let mut text_err = String::new();
let mut json_err = String::new();
TEXT_ERR_HANDLER
.render_report(&mut text_err, err.as_ref())
.expect("render text error failed");
JSON_ERR_HANDLER
.render_report(&mut json_err, err.as_ref())
.expect("render json error failed");
let mut json: serde_json::Value =
serde_json::from_str(&json_err).expect("parse rendered json error failed");
let map = json.as_object_mut().unwrap();
map.insert("ok".to_string(), json!(false));
map.insert("display".to_string(), json!(text_err));
json
}
}
}
/// Run the CozoScript passed in. The `params` argument is a map of parameters formatted as JSON.
pub fn run_script_str(&'s self, payload: &str, params: &str) -> String {
let params_json = if params.is_empty() {
Map::default()
} else {
match serde_json::from_str::<serde_json::Value>(params) {
Ok(serde_json::Value::Object(map)) => map,
Ok(_) => {
return json!({"ok": false, "message": "params argument is not valid JSON"})
.to_string()
}
Err(_) => {
return json!({"ok": false, "message": "params argument is not a JSON map"})
.to_string()
}
}
};
self.run_script_fold_err(payload, &params_json).to_string()
}
/// Export relations to JSON data.
pub fn export_relations<'a>(
&'s self,
@ -225,34 +176,6 @@ impl<'s, S: Storage<'s>> Db<S> {
}
Ok(JsonValue::Object(ret))
}
/// Export relations to JSON-encoded string
pub fn export_relations_str(&'s self, relations_str: &str) -> String {
match self.export_relations_str_inner(relations_str) {
Ok(s) => {
let ret = json!({"ok": true, "data": s});
format!("{}", ret)
}
Err(err) => {
let ret = json!({"ok": false, "message": err.to_string()});
format!("{}", ret)
}
}
}
fn export_relations_str_inner(&'s self, relations_str: &str) -> Result<JsonValue> {
let j_val: JsonValue = serde_json::from_str(relations_str).into_diagnostic()?;
let v = j_val
.as_array()
.ok_or_else(|| miette!("expects an array"))?;
let relations: Vec<_> = v
.iter()
.map(|name| {
name.as_str()
.ok_or_else(|| miette!("expects an array of string names"))
})
.try_collect()?;
let results = self.export_relations(relations.into_iter())?;
Ok(results)
}
/// Import a relation
pub fn import_relation(&'s self, relation: &str, in_data: &[JsonValue]) -> Result<()> {
#[derive(Debug, Diagnostic, Error)]
@ -289,33 +212,6 @@ impl<'s, S: Storage<'s>> Db<S> {
}
Ok(())
}
/// Import a relation, the data is given as a JSON string, and the returned result is converted into a string
pub fn import_relation_str(&'s self, data: &str) -> String {
match self.import_relation_str_inner(data) {
Ok(()) => {
format!("{}", json!({"ok": true}))
}
Err(err) => {
format!("{}", json!({"ok": false, "message": err.to_string()}))
}
}
}
fn import_relation_str_inner(&'s self, data: &str) -> Result<()> {
let j_obj: JsonValue = serde_json::from_str(data).into_diagnostic()?;
let name_val = j_obj
.get("relation")
.ok_or_else(|| miette!("key 'relation' required"))?;
let name = name_val
.as_str()
.ok_or_else(|| miette!("field 'relation' must be a string"))?;
let data_val = j_obj
.get("data")
.ok_or_else(|| miette!("key 'data' required"))?;
let data = data_val
.as_array()
.ok_or_else(|| miette!("field 'data' must be an array"))?;
self.import_relation(name, data)
}
/// Backup the running database into an Sqlite file
pub fn backup_db(&'s self, out_file: String) -> Result<()> {
#[cfg(feature = "storage-sqlite")]
@ -330,13 +226,6 @@ impl<'s, S: Storage<'s>> Db<S> {
#[cfg(not(feature = "storage-sqlite"))]
bail!("backup requires the 'storage-sqlite' feature to be enabled")
}
/// Backup the running database into an Sqlite file, with JSON string return value
pub fn backup_db_str(&'s self, out_file: &str) -> String {
match self.backup_db(out_file.to_string()) {
Ok(_) => json!({"ok": true}).to_string(),
Err(err) => json!({"ok": false, "message": err.to_string()}).to_string(),
}
}
/// Restore from an Sqlite backup
pub fn restore_backup(&'s self, in_file: String) -> Result<()> {
#[cfg(feature = "storage-sqlite")]
@ -351,13 +240,6 @@ impl<'s, S: Storage<'s>> Db<S> {
#[cfg(not(feature = "storage-sqlite"))]
bail!("backup requires the 'storage-sqlite' feature to be enabled")
}
/// Restore from an Sqlite backup, with JSON string return value
pub fn restore_backup_str(&'s self, in_file: &str) -> String {
match self.restore_backup(in_file.to_string()) {
Ok(_) => json!({"ok": true}).to_string(),
Err(err) => json!({"ok": false, "message": err.to_string()}).to_string(),
}
}
fn compact_relation(&'s self) -> Result<()> {
let l = Tuple::default().encode_as_key(RelationId(0));

@ -17,95 +17,6 @@ use lazy_static::lazy_static;
use cozo::*;
#[derive(Clone)]
enum DbInstance {
Mem(Db<MemStorage>),
#[cfg(feature = "storage-sqlite")]
Sqlite(Db<SqliteStorage>),
#[cfg(feature = "storage-rocksdb")]
RocksDb(Db<RocksDbStorage>),
}
impl DbInstance {
fn new(engine: &str, path: &str) -> Result<Self, String> {
match engine {
"mem" => Ok(Self::Mem(new_cozo_mem().map_err(|err| err.to_string())?)),
"sqlite" => {
#[cfg(feature = "storage-sqlite")]
{
return Ok(Self::Sqlite(
new_cozo_sqlite(path.to_string()).map_err(|err| err.to_string())?,
));
}
#[cfg(not(feature = "storage-sqlite"))]
{
return Err("support for sqlite not compiled".to_string());
}
}
"rocksdb" => {
#[cfg(feature = "storage-rocksdb")]
{
return Ok(Self::RocksDb(
new_cozo_rocksdb(path.to_string()).map_err(|err| err.to_string())?,
));
}
#[cfg(not(feature = "storage-rocksdb"))]
{
return Err("support for rocksdb not compiled".to_string());
}
}
_ => Err(format!("unsupported engine: {}", engine)),
}
}
fn run_script_str(&self, payload: &str, params: &str) -> String {
match self {
DbInstance::Mem(db) => db.run_script_str(payload, params),
#[cfg(feature = "storage-sqlite")]
DbInstance::Sqlite(db) => db.run_script_str(payload, params),
#[cfg(feature = "storage-rocksdb")]
DbInstance::RocksDb(db) => db.run_script_str(payload, params),
}
}
fn import_relations(&self, data: &str) -> String {
match self {
DbInstance::Mem(db) => db.import_relation_str(data),
#[cfg(feature = "storage-sqlite")]
DbInstance::Sqlite(db) => db.import_relation_str(data),
#[cfg(feature = "storage-rocksdb")]
DbInstance::RocksDb(db) => db.import_relation_str(data),
}
}
fn export_relations(&self, data: &str) -> String {
match self {
DbInstance::Mem(db) => db.export_relations_str(data),
#[cfg(feature = "storage-sqlite")]
DbInstance::Sqlite(db) => db.export_relations_str(data),
#[cfg(feature = "storage-rocksdb")]
DbInstance::RocksDb(db) => db.export_relations_str(data),
}
}
fn backup(&self, path: &str) -> String {
match self {
DbInstance::Mem(db) => db.backup_db_str(path),
#[cfg(feature = "storage-sqlite")]
DbInstance::Sqlite(db) => db.backup_db_str(path),
#[cfg(feature = "storage-rocksdb")]
DbInstance::RocksDb(db) => db.backup_db_str(path),
}
}
fn restore(&self, path: &str) -> String {
match self {
DbInstance::Mem(db) => db.restore_backup_str(path),
#[cfg(feature = "storage-sqlite")]
DbInstance::Sqlite(db) => db.restore_backup_str(path),
#[cfg(feature = "storage-rocksdb")]
DbInstance::RocksDb(db) => db.restore_backup_str(path),
}
}
}
struct Handles {
current: AtomicI32,
dbs: Mutex<BTreeMap<i32, DbInstance>>,
@ -133,19 +44,19 @@ pub unsafe extern "C" fn cozo_open_db(
path: *const c_char,
db_id: &mut i32,
) -> *mut c_char {
let path = match CStr::from_ptr(path).to_str() {
let engine = match CStr::from_ptr(engine).to_str() {
Ok(p) => p,
Err(err) => return CString::new(format!("{}", err)).unwrap().into_raw(),
};
let engine = match CStr::from_ptr(engine).to_str() {
let path = match CStr::from_ptr(path).to_str() {
Ok(p) => p,
Err(err) => return CString::new(format!("{}", err)).unwrap().into_raw(),
};
let db = match DbInstance::new(engine, path) {
let db = match DbInstance::new(engine, path, Default::default()) {
Ok(db) => db,
Err(err) => return CString::new(err).unwrap().into_raw(),
Err(err) => return CString::new(err.to_string()).unwrap().into_raw(),
};
let id = HANDLES.current.fetch_add(1, Ordering::AcqRel);
@ -254,7 +165,9 @@ pub unsafe extern "C" fn cozo_import_relation(
Ok(p) => p,
Err(err) => return CString::new(format!("{}", err)).unwrap().into_raw(),
};
CString::new(db.import_relations(data)).unwrap().into_raw()
CString::new(db.import_relation_str(data))
.unwrap()
.into_raw()
}
#[no_mangle]
@ -286,7 +199,9 @@ pub unsafe extern "C" fn cozo_export_relations(
Ok(p) => p,
Err(err) => return CString::new(format!("{}", err)).unwrap().into_raw(),
};
CString::new(db.export_relations(data)).unwrap().into_raw()
CString::new(db.export_relations_str(data))
.unwrap()
.into_raw()
}
#[no_mangle]
@ -315,7 +230,7 @@ pub unsafe extern "C" fn cozo_backup(db_id: i32, out_path: *const c_char) -> *mu
Ok(p) => p,
Err(err) => return CString::new(format!("{}", err)).unwrap().into_raw(),
};
CString::new(db.backup(data)).unwrap().into_raw()
CString::new(db.backup_db_str(data)).unwrap().into_raw()
}
#[no_mangle]
@ -344,7 +259,9 @@ pub unsafe extern "C" fn cozo_restore(db_id: i32, in_path: *const c_char) -> *mu
Ok(p) => p,
Err(err) => return CString::new(format!("{}", err)).unwrap().into_raw(),
};
CString::new(db.restore(data)).unwrap().into_raw()
CString::new(db.restore_backup_str(data))
.unwrap()
.into_raw()
}
/// Free any C-string returned from the Cozo C API.

Loading…
Cancel
Save