create sessions

main
Ziyang Hu 2 years ago
parent 2afb722683
commit e0ab833345

@ -9,7 +9,8 @@ edition = "2021"
pest = "2.0"
pest_derive = "2.0"
ordered-float = "2.10.0"
uuid = "0.8"
uuid = { version = "0.8", features = ["v1"] }
rand = "0.8.5"
chrono = "0.4"
anyhow = "1.0"
lazy_static = "1.4.0"

@ -432,7 +432,7 @@ struct TDBBridge {
}
}
inline void
inline shared_ptr<ColumnFamilyHandle>
create_column_family_raw(const Options &options, const string &name, BridgeStatus &status) const {
{
ReadLock r_lock(handle_lock);
@ -440,14 +440,18 @@ struct TDBBridge {
write_status_impl(status, StatusCode::kMaxCode, StatusSubCode::kMaxSubCode,
StatusSeverity::kSoftError,
2);
return;
return shared_ptr<ColumnFamilyHandle>(nullptr);
}
}
WriteLock w_lock(handle_lock);
ColumnFamilyHandle *handle;
auto s = db->CreateColumnFamily(options, name, &handle);
write_status(std::move(s), status);
handles[name] = shared_ptr<ColumnFamilyHandle>(handle);
{
WriteLock w_lock(handle_lock);
ColumnFamilyHandle *handle;
auto s = db->CreateColumnFamily(options, name, &handle);
write_status(std::move(s), status);
auto ret = shared_ptr<ColumnFamilyHandle>(handle);
handles[name] = ret;
return ret;
}
}
inline void drop_column_family_raw(const string &name, BridgeStatus &status) const {

@ -158,7 +158,7 @@ mod ffi {
raw_r_ops: UniquePtr<ReadOptions>,
txn_options: UniquePtr<OptimisticTransactionOptions>) -> UniquePtr<TransactionBridge>;
fn get_cf_handle_raw(self: &TDBBridge, name: &CxxString) -> SharedPtr<ColumnFamilyHandle>;
fn create_column_family_raw(self: &TDBBridge, options: &Options, name: &CxxString, status: &mut BridgeStatus);
fn create_column_family_raw(self: &TDBBridge, options: &Options, name: &CxxString, status: &mut BridgeStatus) -> SharedPtr<ColumnFamilyHandle>;
fn drop_column_family_raw(self: &TDBBridge, name: &CxxString, status: &mut BridgeStatus);
fn get_column_family_names_raw(self: &TDBBridge) -> UniquePtr<CxxVector<CxxString>>;
fn open_tdb_raw(options: &Options,

@ -596,11 +596,11 @@ impl DBPtr {
}
}
#[inline]
pub fn create_cf(&self, options: &OptionsPtr, name: impl AsRef<str>) -> Result<()> {
pub fn create_cf(&self, options: &OptionsPtr, name: impl AsRef<str>) -> Result<SharedPtr<ColumnFamilyHandle>> {
let_cxx_string!(name = name.as_ref());
let mut status = BridgeStatus::default();
self.create_column_family_raw(options, &name, &mut status);
status.check_err(())
let ret = self.create_column_family_raw(options, &name, &mut status);
status.check_err(ret)
}
#[inline]
pub fn drop_cf(&self, name: impl AsRef<str>) -> Result<()> {
@ -615,7 +615,9 @@ impl DBPtr {
}
pub fn drop_non_default_cfs(&self) {
for name in self.cf_names() {
self.drop_cf(name);
if name != "default" {
self.drop_cf(name).unwrap();
}
}
}
}

@ -3,23 +3,29 @@
use cozorocks::*;
use std::sync::{Arc, RwLock};
use std::sync::{Arc, LockResult, Mutex, RwLock};
use std::sync::atomic::AtomicUsize;
use std::time::{SystemTime, UNIX_EPOCH};
use uuid::Uuid;
use uuid::v1::{Context, Timestamp};
use rand::Rng;
struct EngineOptions {
cmp: RustComparatorPtr,
options: OptionsPtr,
t_options: TDBOptions,
path: String,
uuid_ctx: Context,
}
pub struct Engine {
db: DBPtr,
options_store: Box<EngineOptions>,
session_handles: Vec<Arc<SessionHandle>>,
session_handles: Mutex<Vec<Arc<RwLock<SessionHandle>>>>,
}
unsafe impl Send for Engine {}
unsafe impl Sync for Engine {}
impl Engine {
@ -32,35 +38,86 @@ impl Engine {
let mut options = OptionsPtr::default();
let cmp = RustComparatorPtr::new("cozo_cmp_v1", crate::relation::key_order::compare);
options.set_comparator(&cmp).increase_parallelism().optimize_level_style_compaction().set_create_if_missing(true);
let e_options = Box::new(EngineOptions { cmp, options, t_options, path });
let mut rng = rand::thread_rng();
let uuid_ctx = Context::new(rng.gen());
let e_options = Box::new(EngineOptions {
cmp,
options,
t_options,
path,
uuid_ctx,
});
let db = DBPtr::open(&e_options.options, &e_options.t_options, &e_options.path)?;
// println!("Created {}", e_options.path);
// println!("{:?}", db.cf_names());
db.drop_non_default_cfs();
Ok(Self {
db,
options_store: e_options,
session_handles: vec![]
session_handles: Mutex::new(vec![]),
})
}
pub fn session(&self) -> Session {
// find a handle if there is one available
// otherwise create
todo!()
// otherwise create a new one
let mut handles = self.session_handles.lock().unwrap();
let old_handle = handles.iter().find(|v| {
match v.read() {
Ok(content) => content.status == SessionStatus::Completed,
Err(_) => false
}
});
let handle = match old_handle {
None => {
let now = SystemTime::now();
let since_epoch = now.duration_since(UNIX_EPOCH).unwrap();
let ts = Timestamp::from_unix(
&self.options_store.uuid_ctx,
since_epoch.as_secs(),
since_epoch.subsec_nanos(),
);
let mut rng = rand::thread_rng();
let id = Uuid::new_v1(ts, &[rng.gen(), rng.gen(), rng.gen(), rng.gen(), rng.gen(), rng.gen()]).unwrap();
let cf_ident = id.to_string();
self.db.create_cf(&self.options_store.options, &cf_ident).unwrap();
let ret = Arc::new(RwLock::new(SessionHandle {
cf_ident,
status: SessionStatus::Prepared,
table_count: 0,
}));
handles.push(ret.clone());
ret
}
Some(h) => h.clone()
};
return Session {
engine: self,
stack_depth: 0,
handle,
};
}
}
pub struct Session<'a> {
engine: &'a Engine,
stack_depth: i32, // zero or negative
stack_depth: i32,
// zero or negative
handle: Arc<RwLock<SessionHandle>>,
}
// every session has its own column family to play with
// metadata are stored in table 0
#[derive(Clone)]
pub struct SessionHandle {
cf_ident: String,
status: SessionStatus,
table_idx: AtomicUsize
table_count: usize,
}
#[derive(Eq, PartialEq, Debug, Clone)]
pub enum SessionStatus {
Prepared,
Running,
@ -74,7 +131,7 @@ mod tests {
#[test]
fn test_create() {
let p1= "_test_db_create1";
let p1 = "_test_db_create1";
let p2 = "_test_db_create2";
let p3 = "_test_db_create3";
{
@ -90,8 +147,15 @@ mod tests {
}
let engine2 = Engine::new(p2.to_string(), false);
assert!(engine2.is_ok());
let engine2 = Engine::new(p3.to_string(), false);
assert!(engine2.is_ok());
let engine2 = Engine::new(p3.to_string(), false).unwrap();
let _sess = engine2.session();
let handles = engine2.session_handles.lock().unwrap();
println!("got handles {}", handles.len());
let cf_ident = &handles.first().unwrap().read().unwrap().cf_ident;
println!("Opening ok {}", cf_ident);
let cf = engine2.db.get_cf(cf_ident).unwrap();
assert!(!cf.is_null());
println!("Getting CF ok");
}
let _ = fs::remove_dir_all(p1);
let _ = fs::remove_dir_all(p2);

Loading…
Cancel
Save