make DB Clone again

main
Ziyang Hu 2 years ago
parent 9ccb891233
commit 91ebe758c1

@ -81,18 +81,19 @@ pub struct DbManifest {
} }
/// The database object of Cozo. /// The database object of Cozo.
#[derive(Clone)]
pub struct Db<S> { pub struct Db<S> {
pub(crate) db: S, pub(crate) db: S,
temp_db: TempStorage, temp_db: TempStorage,
relation_store_id: Arc<AtomicU64>, relation_store_id: Arc<AtomicU64>,
queries_count: AtomicU64, queries_count: Arc<AtomicU64>,
running_queries: Arc<Mutex<BTreeMap<u64, RunningQueryHandle>>>, running_queries: Arc<Mutex<BTreeMap<u64, RunningQueryHandle>>>,
pub(crate) fixed_rules: ShardedLock<BTreeMap<String, Arc<Box<dyn FixedRule>>>>, pub(crate) fixed_rules: Arc<ShardedLock<BTreeMap<String, Arc<Box<dyn FixedRule>>>>>,
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
callback_count: AtomicU32, callback_count: Arc<AtomicU32>,
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
pub(crate) event_callbacks: ShardedLock<EventCallbackRegistry>, pub(crate) event_callbacks: Arc<ShardedLock<EventCallbackRegistry>>,
relation_locks: ShardedLock<BTreeMap<SmartString<LazyCompact>, Arc<ShardedLock<()>>>>, relation_locks: Arc<ShardedLock<BTreeMap<SmartString<LazyCompact>, Arc<ShardedLock<()>>>>>,
} }
impl<S> Debug for Db<S> { impl<S> Debug for Db<S> {
@ -186,7 +187,7 @@ impl<'s, S: Storage<'s>> Db<S> {
relation_store_id: Default::default(), relation_store_id: Default::default(),
queries_count: Default::default(), queries_count: Default::default(),
running_queries: Default::default(), running_queries: Default::default(),
fixed_rules: ShardedLock::new(DEFAULT_FIXED_RULES.clone()), fixed_rules: Arc::new(ShardedLock::new(DEFAULT_FIXED_RULES.clone())),
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
callback_count: Default::default(), callback_count: Default::default(),
// callback_receiver: Arc::new(receiver), // callback_receiver: Arc::new(receiver),

@ -36,7 +36,7 @@ pub fn new_cozo_mem() -> Result<crate::Db<MemStorage>> {
} }
/// The non-persistent storage /// The non-persistent storage
#[derive(Default)] #[derive(Default, Clone)]
pub struct MemStorage { pub struct MemStorage {
store: Arc<ShardedLock<BTreeMap<Vec<u8>, Vec<u8>>>>, store: Arc<ShardedLock<BTreeMap<Vec<u8>, Vec<u8>>>>,
} }

@ -26,7 +26,7 @@ pub(crate) mod tikv;
// pub(crate) mod re; // pub(crate) mod re;
/// Swappable storage trait for Cozo's storage engine /// Swappable storage trait for Cozo's storage engine
pub trait Storage<'s>: Sync { pub trait Storage<'s>: Sync + Clone {
/// The associated transaction type used by this engine /// The associated transaction type used by this engine
type Tx: StoreTx<'s>; type Tx: StoreTx<'s>;

@ -111,6 +111,7 @@ pub fn new_cozo_rocksdb(path: impl AsRef<Path>) -> Result<Db<RocksDbStorage>> {
} }
/// RocksDB storage engine /// RocksDB storage engine
#[derive(Clone)]
pub struct RocksDbStorage { pub struct RocksDbStorage {
db: RocksDb, db: RocksDb,
} }

@ -33,6 +33,7 @@ pub fn new_cozo_sled(path: impl AsRef<Path>) -> Result<crate::Db<SledStorage>> {
} }
/// Storage engine using Sled /// Storage engine using Sled
#[derive(Clone)]
pub struct SledStorage { pub struct SledStorage {
db: Db, db: Db,
} }

@ -22,10 +22,11 @@ use crate::storage::{Storage, StoreTx};
use crate::utils::swap_option_result; use crate::utils::swap_option_result;
/// The Sqlite storage engine /// The Sqlite storage engine
#[derive(Clone)]
pub struct SqliteStorage { pub struct SqliteStorage {
lock: Arc<ShardedLock<()>>, lock: Arc<ShardedLock<()>>,
name: PathBuf, name: PathBuf,
pool: Mutex<Vec<ConnectionWithFullMutex>>, pool: Arc<Mutex<Vec<ConnectionWithFullMutex>>>,
} }
/// Create a sqlite backed database. /// Create a sqlite backed database.
@ -51,7 +52,7 @@ pub fn new_cozo_sqlite(path: impl AsRef<Path>) -> Result<crate::Db<SqliteStorage
let ret = crate::Db::new(SqliteStorage { let ret = crate::Db::new(SqliteStorage {
lock: Default::default(), lock: Default::default(),
name: PathBuf::from(path.as_ref()), name: PathBuf::from(path.as_ref()),
pool: Mutex::new(vec![]), pool: Default::default(),
})?; })?;
ret.initialize()?; ret.initialize()?;

@ -17,7 +17,7 @@ use crate::runtime::relation::decode_tuple_from_kv;
use crate::storage::mem::SkipIterator; use crate::storage::mem::SkipIterator;
use crate::storage::{Storage, StoreTx}; use crate::storage::{Storage, StoreTx};
#[derive(Default)] #[derive(Default, Clone)]
pub(crate) struct TempStorage; pub(crate) struct TempStorage;
impl<'s> Storage<'s> for TempStorage { impl<'s> Storage<'s> for TempStorage {

@ -52,6 +52,7 @@ lazy_static! {
} }
/// Storage engine based on TiKV /// Storage engine based on TiKV
#[derive(Clone)]
pub struct TiKvStorage { pub struct TiKvStorage {
client: Arc<TransactionClient>, client: Arc<TransactionClient>,
raw_client: Arc<RawClient>, raw_client: Arc<RawClient>,

@ -12,7 +12,7 @@ use std::collections::BTreeMap;
use std::ffi::{c_char, CStr, CString}; use std::ffi::{c_char, CStr, CString};
use std::ptr::null_mut; use std::ptr::null_mut;
use std::sync::atomic::{AtomicI32, Ordering}; use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::{Arc, Mutex}; use std::sync::Mutex;
use lazy_static::lazy_static; use lazy_static::lazy_static;
@ -20,7 +20,7 @@ use cozo::*;
struct Handles { struct Handles {
current: AtomicI32, current: AtomicI32,
dbs: Mutex<BTreeMap<i32, Arc<DbInstance>>>, dbs: Mutex<BTreeMap<i32, DbInstance>>,
} }
lazy_static! { lazy_static! {
@ -69,7 +69,7 @@ pub unsafe extern "C" fn cozo_open_db(
let id = HANDLES.current.fetch_add(1, Ordering::AcqRel); let id = HANDLES.current.fetch_add(1, Ordering::AcqRel);
let mut dbs = HANDLES.dbs.lock().unwrap(); let mut dbs = HANDLES.dbs.lock().unwrap();
dbs.insert(id, Arc::new(db)); dbs.insert(id, db);
*db_id = id; *db_id = id;
null_mut() null_mut()
} }

@ -7,7 +7,7 @@
*/ */
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::sync::atomic::{AtomicI32, Ordering}; use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::{Arc, Mutex}; use std::sync::Mutex;
use jni::objects::{JClass, JString}; use jni::objects::{JClass, JString};
use jni::sys::{jboolean, jint, jstring}; use jni::sys::{jboolean, jint, jstring};
@ -19,7 +19,7 @@ use cozo::*;
#[derive(Default)] #[derive(Default)]
struct Handles { struct Handles {
current: AtomicI32, current: AtomicI32,
dbs: Mutex<BTreeMap<i32, Arc<DbInstance>>>, dbs: Mutex<BTreeMap<i32, DbInstance>>,
} }
lazy_static! { lazy_static! {
@ -29,7 +29,7 @@ lazy_static! {
}; };
} }
fn get_db(id: i32) -> Option<Arc<DbInstance>> { fn get_db(id: i32) -> Option<DbInstance> {
let dbs = HANDLES.dbs.lock().unwrap(); let dbs = HANDLES.dbs.lock().unwrap();
dbs.get(&id).cloned() dbs.get(&id).cloned()
} }
@ -49,7 +49,7 @@ pub extern "system" fn Java_org_cozodb_CozoJavaBridge_openDb(
Ok(db) => { Ok(db) => {
let id = HANDLES.current.fetch_add(1, Ordering::AcqRel); let id = HANDLES.current.fetch_add(1, Ordering::AcqRel);
let mut dbs = HANDLES.dbs.lock().unwrap(); let mut dbs = HANDLES.dbs.lock().unwrap();
dbs.insert(id, Arc::new(db)); dbs.insert(id, db);
id id
} }
Err(err) => { Err(err) => {

@ -204,7 +204,7 @@ fn params2js<'a>(
#[derive(Default)] #[derive(Default)]
struct Handles { struct Handles {
current: AtomicU32, current: AtomicU32,
dbs: Mutex<BTreeMap<u32, Arc<DbInstance>>>, dbs: Mutex<BTreeMap<u32, DbInstance>>,
cb_idx: AtomicU32, cb_idx: AtomicU32,
current_cbs: Mutex<BTreeMap<u32, Sender<Result<NamedRows>>>>, current_cbs: Mutex<BTreeMap<u32, Sender<Result<NamedRows>>>>,
} }
@ -221,7 +221,7 @@ fn open_db(mut cx: FunctionContext) -> JsResult<JsNumber> {
Ok(db) => { Ok(db) => {
let id = HANDLES.current.fetch_add(1, Ordering::AcqRel); let id = HANDLES.current.fetch_add(1, Ordering::AcqRel);
let mut dbs = HANDLES.dbs.lock().unwrap(); let mut dbs = HANDLES.dbs.lock().unwrap();
dbs.insert(id, Arc::new(db)); dbs.insert(id, db);
Ok(cx.number(id)) Ok(cx.number(id))
} }
Err(err) => { Err(err) => {

@ -53,7 +53,7 @@ ColumnFamilyOptions default_cf_options() {
return options; return options;
} }
unique_ptr <RocksDbBridge> open_db(const DbOpts &opts, RocksDbStatus &status) { shared_ptr <RocksDbBridge> open_db(const DbOpts &opts, RocksDbStatus &status) {
auto options = default_db_options(); auto options = default_db_options();
shared_ptr<Cache> cache = nullptr; shared_ptr<Cache> cache = nullptr;
@ -120,7 +120,7 @@ unique_ptr <RocksDbBridge> open_db(const DbOpts &opts, RocksDbStatus &status) {
} }
options.create_missing_column_families = true; options.create_missing_column_families = true;
unique_ptr <RocksDbBridge> db = make_unique<RocksDbBridge>(); shared_ptr <RocksDbBridge> db = make_shared<RocksDbBridge>();
db->db_path = convert_vec_to_string(opts.db_path); db->db_path = convert_vec_to_string(opts.db_path);

@ -117,7 +117,7 @@ struct RocksDbBridge {
~RocksDbBridge(); ~RocksDbBridge();
}; };
unique_ptr<RocksDbBridge> shared_ptr<RocksDbBridge>
open_db(const DbOpts &opts, RocksDbStatus &status); open_db(const DbOpts &opts, RocksDbStatus &status);
#endif //COZOROCKS_DB_H #endif //COZOROCKS_DB_H

@ -133,8 +133,9 @@ impl DbBuilder {
} }
} }
#[derive(Clone)]
pub struct RocksDb { pub struct RocksDb {
inner: UniquePtr<RocksDbBridge>, inner: SharedPtr<RocksDbBridge>,
} }
impl RocksDb { impl RocksDb {

@ -122,7 +122,7 @@ pub(crate) mod ffi {
type RocksDbBridge; type RocksDbBridge;
fn get_db_path(self: &RocksDbBridge) -> &CxxString; fn get_db_path(self: &RocksDbBridge) -> &CxxString;
fn open_db(builder: &DbOpts, status: &mut RocksDbStatus) -> UniquePtr<RocksDbBridge>; fn open_db(builder: &DbOpts, status: &mut RocksDbStatus) -> SharedPtr<RocksDbBridge>;
fn transact(self: &RocksDbBridge) -> UniquePtr<TxBridge>; fn transact(self: &RocksDbBridge) -> UniquePtr<TxBridge>;
fn del_range(self: &RocksDbBridge, lower: &[u8], upper: &[u8], status: &mut RocksDbStatus); fn del_range(self: &RocksDbBridge, lower: &[u8], upper: &[u8], status: &mut RocksDbStatus);
fn put(self: &RocksDbBridge, key: &[u8], val: &[u8], status: &mut RocksDbStatus); fn put(self: &RocksDbBridge, key: &[u8], val: &[u8], status: &mut RocksDbStatus);

@ -12,7 +12,6 @@ use std::fs;
use std::net::Ipv6Addr; use std::net::Ipv6Addr;
use std::process::exit; use std::process::exit;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc;
use clap::Parser; use clap::Parser;
use env_logger::Env; use env_logger::Env;
@ -78,14 +77,12 @@ fn main() {
eprintln!("{SECURITY_WARNING}"); eprintln!("{SECURITY_WARNING}");
} }
let db = Arc::new( let db = DbInstance::new(
DbInstance::new( args.engine.as_str(),
args.engine.as_str(), args.path.as_str(),
args.path.as_str(), &args.config.clone(),
&args.config.clone(), )
) .unwrap();
.unwrap(),
);
if let Some(restore_path) = &args.restore { if let Some(restore_path) = &args.restore {
db.restore_backup(restore_path).unwrap(); db.restore_backup(restore_path).unwrap();
@ -117,7 +114,7 @@ fn main() {
} }
} }
fn server_main(args: Args, db: Arc<DbInstance>) { fn server_main(args: Args, db: DbInstance) {
let conf_path = format!("{}.{}.cozo_auth", args.path, args.engine); let conf_path = format!("{}.{}.cozo_auth", args.path, args.engine);
let auth_guard = match fs::read_to_string(&conf_path) { let auth_guard = match fs::read_to_string(&conf_path) {
Ok(s) => s.trim().to_string(), Ok(s) => s.trim().to_string(),

@ -12,7 +12,6 @@ use std::collections::BTreeMap;
use std::error::Error; use std::error::Error;
use std::fs::File; use std::fs::File;
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::sync::Arc;
use miette::{bail, miette, IntoDiagnostic}; use miette::{bail, miette, IntoDiagnostic};
use serde_json::{json, Value}; use serde_json::{json, Value};
@ -58,7 +57,7 @@ impl rustyline::validate::Validator for Indented {
} }
} }
pub(crate) fn repl_main(db: Arc<DbInstance>) -> Result<(), Box<dyn Error>> { pub(crate) fn repl_main(db: DbInstance) -> Result<(), Box<dyn Error>> {
println!("Welcome to the Cozo REPL."); println!("Welcome to the Cozo REPL.");
println!("Type a space followed by newline to enter multiline mode."); println!("Type a space followed by newline to enter multiline mode.");

Loading…
Cancel
Save