main
Ziyang Hu 2 years ago
parent 337ee3ff80
commit f15e9ad5d0

@ -131,6 +131,10 @@ inline void set_comparator(Options &inner, const RustComparator &cmp_obj) {
inner.comparator = &cmp_obj;
}
inline void set_paranoid_checks(Options &inner, bool v) {
inner.paranoid_checks = v;
}
inline std::unique_ptr<ReadOptions> new_read_options() {
return std::make_unique<ReadOptions>();
}

@ -86,6 +86,7 @@ mod ffi {
fn optimize_level_style_compaction(o: Pin<&mut Options>);
fn set_create_if_missing(o: Pin<&mut Options>, v: bool);
fn set_comparator(o: Pin<&mut Options>, cmp: &RustComparator);
fn set_paranoid_checks(o: Pin<&mut Options>, v: bool);
type ReadOptions;
fn new_read_options() -> UniquePtr<ReadOptions>;

@ -158,6 +158,11 @@ impl OptionsPtr {
set_comparator(self.pin_mut(), cmp);
self
}
#[inline]
pub fn set_paranoid_checks(&mut self, v: bool) -> &mut Self {
set_paranoid_checks(self.pin_mut(), v);
self
}
}

@ -21,7 +21,7 @@ struct EngineOptions {
pub struct Engine {
db: DBPtr,
options_store: Box<EngineOptions>,
session_handles: Mutex<Vec<Arc<RwLock<SessionHandle>>>>,
session_handles: RwLock<Vec<Arc<RwLock<SessionHandle>>>>,
}
unsafe impl Send for Engine {}
@ -35,9 +35,14 @@ impl Engine {
} else {
TDBOptions::Pessimistic(PTxnDBOptionsPtr::default())
};
let mut options = OptionsPtr::default();
let cmp = RustComparatorPtr::new("cozo_cmp_v1", crate::relation::key_order::compare);
options.set_comparator(&cmp).increase_parallelism().optimize_level_style_compaction().set_create_if_missing(true);
let mut options = OptionsPtr::default();
options
.set_comparator(&cmp)
.increase_parallelism()
.optimize_level_style_compaction()
.set_create_if_missing(true)
.set_paranoid_checks(false);
let mut rng = rand::thread_rng();
let uuid_ctx = Context::new(rng.gen());
@ -49,25 +54,22 @@ impl Engine {
uuid_ctx,
});
let db = DBPtr::open(&e_options.options, &e_options.t_options, &e_options.path)?;
// println!("Created {}", e_options.path);
// println!("{:?}", db.cf_names());
db.drop_non_default_cfs();
Ok(Self {
db,
options_store: e_options,
session_handles: Mutex::new(vec![]),
session_handles: RwLock::new(vec![]),
})
}
pub fn session(&self) -> Session {
// find a handle if there is one available
// otherwise create a new one
let mut handles = self.session_handles.lock().unwrap();
let old_handle = handles.iter().find(|v| {
let old_handle = self.session_handles.read().unwrap().iter().find(|v| {
match v.read() {
Ok(content) => content.status == SessionStatus::Completed,
Err(_) => false
}
});
}).cloned();
let handle = match old_handle {
None => {
let now = SystemTime::now();
@ -87,7 +89,7 @@ impl Engine {
status: SessionStatus::Prepared,
table_count: 0,
}));
handles.push(ret.clone());
self.session_handles.write().unwrap().push(ret.clone());
ret
}
Some(h) => h.clone()
@ -152,7 +154,7 @@ mod tests {
for i in 0..10 {
let _sess = engine2.session();
}
let handles = engine2.session_handles.lock().unwrap();
let handles = engine2.session_handles.read().unwrap();
println!("got handles {}", handles.len());
let cf_ident = &handles.first().unwrap().read().unwrap().cf_ident;
println!("Opening ok {}", cf_ident);
@ -166,9 +168,9 @@ mod tests {
for i in 0..10 {
let engine = engine2.clone();
thread_handles.push(thread::spawn(move || {
// println!("In thread {}", i);
println!("In thread {}", i);
let _sess = engine.session();
// println!("In thread {} end", i);
println!("In thread {} end", i);
}))
}
@ -178,7 +180,7 @@ mod tests {
}
println!("All OK");
{
let handles = engine2.session_handles.lock().unwrap();
let handles = engine2.session_handles.read().unwrap();
println!("got handles {:#?}", handles.iter().map(|h| h.read().unwrap().cf_ident.to_string()).collect::<Vec<_>>());
}
}

Loading…
Cancel
Save