diff --git a/cozorocks/bridge/cozorocks.h b/cozorocks/bridge/cozorocks.h index 984a0446..cf7073cf 100644 --- a/cozorocks/bridge/cozorocks.h +++ b/cozorocks/bridge/cozorocks.h @@ -87,28 +87,36 @@ public: return name.c_str(); } - virtual bool CanKeysWithDifferentByteContentsBeEqual() const { return true; } + virtual bool CanKeysWithDifferentByteContentsBeEqual() const { + return can_different_bytes_be_equal; + } void FindShortestSeparator(std::string *, const rocksdb::Slice &) const {} void FindShortSuccessor(std::string *) const {} - void set_fn(RustComparatorFn f) const { + void set_fn(RustComparatorFn f) { rust_compare = f; } - void set_name(rust::Str name_) const { + void set_name(rust::Str name_) { name = std::string(name_); } - mutable std::string name; - mutable RustComparatorFn rust_compare; + void set_can_different_bytes_be_equal(bool v) { + can_different_bytes_be_equal = v; + } + + std::string name; + RustComparatorFn rust_compare; + bool can_different_bytes_be_equal; }; -inline unique_ptr new_rust_comparator(rust::Str name, RustComparatorFn f) { +inline unique_ptr new_rust_comparator(rust::Str name, RustComparatorFn f, bool diff_bytes_can_equal) { auto ret = make_unique(); ret->set_name(name); ret->set_fn(f); + ret->set_can_different_bytes_be_equal(diff_bytes_can_equal); return ret; } diff --git a/cozorocks/src/bridge.rs b/cozorocks/src/bridge.rs index 5c9360b2..cb32ddd6 100644 --- a/cozorocks/src/bridge.rs +++ b/cozorocks/src/bridge.rs @@ -106,7 +106,7 @@ mod ffi { fn new_odb_options() -> UniquePtr; type RustComparator; - fn new_rust_comparator(name: &str, cmp: fn(&[u8], &[u8]) -> i8) -> UniquePtr; + fn new_rust_comparator(name: &str, cmp: fn(&[u8], &[u8]) -> i8, diff_bytes_can_equal: bool) -> UniquePtr; pub type IteratorBridge; fn seek_to_first(self: &IteratorBridge); @@ -180,11 +180,11 @@ use std::fmt::Formatter; impl std::fmt::Display for StatusBridgeCode { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", match self { - &StatusBridgeCode::OK => "Ok", - &StatusBridgeCode::LOCK_ERROR => "LockError", - &StatusBridgeCode::EXISTING_ERROR => "ExistingError", - &StatusBridgeCode::NOT_FOUND_ERROR => "NotFoundError", + write!(f, "{}", match *self { + StatusBridgeCode::OK => "Ok", + StatusBridgeCode::LOCK_ERROR => "LockError", + StatusBridgeCode::EXISTING_ERROR => "ExistingError", + StatusBridgeCode::NOT_FOUND_ERROR => "NotFoundError", _ => "Unknown" }) } @@ -192,24 +192,24 @@ impl std::fmt::Display for StatusBridgeCode { impl std::fmt::Display for StatusCode { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", match self { - &StatusCode::kOk => "Ok", - &StatusCode::kNotFound => "NotFound", - &StatusCode::kCorruption => "Corruption", - &StatusCode::kNotSupported => "NotSupported", - &StatusCode::kInvalidArgument => "InvalidArgument", - &StatusCode::kIOError => "IoError", - &StatusCode::kMergeInProgress => "MergeInProgress", - &StatusCode::kIncomplete => "Incomplete", - &StatusCode::kShutdownInProgress => "ShutdownInProgress", - &StatusCode::kTimedOut => "TimedOut", - &StatusCode::kAborted => "Aborted", - &StatusCode::kBusy => "Busy", - &StatusCode::kExpired => "Expired", - &StatusCode::kTryAgain => "TryAgain", - &StatusCode::kCompactionTooLarge => "CompactionTooLarge", - &StatusCode::kColumnFamilyDropped => "ColumnFamilyDropped", - &StatusCode::kMaxCode => "MaxCode", + write!(f, "{}", match *self { + StatusCode::kOk => "Ok", + StatusCode::kNotFound => "NotFound", + StatusCode::kCorruption => "Corruption", + StatusCode::kNotSupported => "NotSupported", + StatusCode::kInvalidArgument => "InvalidArgument", + StatusCode::kIOError => "IoError", + StatusCode::kMergeInProgress => "MergeInProgress", + StatusCode::kIncomplete => "Incomplete", + StatusCode::kShutdownInProgress => "ShutdownInProgress", + StatusCode::kTimedOut => "TimedOut", + StatusCode::kAborted => "Aborted", + StatusCode::kBusy => "Busy", + StatusCode::kExpired => "Expired", + StatusCode::kTryAgain => "TryAgain", + StatusCode::kCompactionTooLarge => "CompactionTooLarge", + StatusCode::kColumnFamilyDropped => "ColumnFamilyDropped", + StatusCode::kMaxCode => "MaxCode", _ => "Unknown" }) } @@ -217,23 +217,23 @@ impl std::fmt::Display for StatusCode { impl std::fmt::Display for StatusSubCode { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", match self { - &StatusSubCode::kNone => "None", - &StatusSubCode::kMutexTimeout => "MutexTimeout", - &StatusSubCode::kLockTimeout => "LockTimeout", - &StatusSubCode::kLockLimit => "LockLimit", - &StatusSubCode::kNoSpace => "NoSpace", - &StatusSubCode::kDeadlock => "DeadLock", - &StatusSubCode::kStaleFile => "StaleFile", - &StatusSubCode::kMemoryLimit => "MemoryLimit", - &StatusSubCode::kSpaceLimit => "SpaceLimit", - &StatusSubCode::kPathNotFound => "PathNotFound", - &StatusSubCode::KMergeOperandsInsufficientCapacity => "MergeOperandsInsufficientCapacity", - &StatusSubCode::kManualCompactionPaused => "ManualCompactionPaused", - &StatusSubCode::kOverwritten => "Overwritten", - &StatusSubCode::kTxnNotPrepared => "TxnNotPrepared", - &StatusSubCode::kIOFenced => "IoFenced", - &StatusSubCode::kMaxSubCode => "MaxSubCode", + write!(f, "{}", match *self { + StatusSubCode::kNone => "None", + StatusSubCode::kMutexTimeout => "MutexTimeout", + StatusSubCode::kLockTimeout => "LockTimeout", + StatusSubCode::kLockLimit => "LockLimit", + StatusSubCode::kNoSpace => "NoSpace", + StatusSubCode::kDeadlock => "DeadLock", + StatusSubCode::kStaleFile => "StaleFile", + StatusSubCode::kMemoryLimit => "MemoryLimit", + StatusSubCode::kSpaceLimit => "SpaceLimit", + StatusSubCode::kPathNotFound => "PathNotFound", + StatusSubCode::KMergeOperandsInsufficientCapacity => "MergeOperandsInsufficientCapacity", + StatusSubCode::kManualCompactionPaused => "ManualCompactionPaused", + StatusSubCode::kOverwritten => "Overwritten", + StatusSubCode::kTxnNotPrepared => "TxnNotPrepared", + StatusSubCode::kIOFenced => "IoFenced", + StatusSubCode::kMaxSubCode => "MaxSubCode", _ => "Unknown" }) } @@ -241,13 +241,13 @@ impl std::fmt::Display for StatusSubCode { impl std::fmt::Display for StatusSeverity { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", match self { - &StatusSeverity::kNoError => "NoError", - &StatusSeverity::kSoftError => "SoftError", - &StatusSeverity::kHardError => "HardError", - &StatusSeverity::kFatalError => "FatalError", - &StatusSeverity::kUnrecoverableError => "UnrecoverableError", - &StatusSeverity::kMaxSeverity => "MaxSeverity", + write!(f, "{}", match *self { + StatusSeverity::kNoError => "NoError", + StatusSeverity::kSoftError => "SoftError", + StatusSeverity::kHardError => "HardError", + StatusSeverity::kFatalError => "FatalError", + StatusSeverity::kUnrecoverableError => "UnrecoverableError", + StatusSeverity::kMaxSeverity => "MaxSeverity", _ => "Unknown" }) } diff --git a/cozorocks/src/lib.rs b/cozorocks/src/lib.rs index a7204c0a..f8606001 100644 --- a/cozorocks/src/lib.rs +++ b/cozorocks/src/lib.rs @@ -72,7 +72,7 @@ impl From for Option { } } -pub type Result = std::result::Result; +type Result = std::result::Result; pub enum SlicePtr { Plain(UniquePtr), @@ -97,8 +97,8 @@ pub struct RustComparatorPtr(UniquePtr); impl RustComparatorPtr { #[inline] - pub fn new(name: &str, cmp: fn(&[u8], &[u8]) -> i8) -> Self { - Self(new_rust_comparator(name, cmp)) + pub fn new(name: &str, cmp: fn(&[u8], &[u8]) -> i8, diff_bytes_can_equal: bool) -> Self { + Self(new_rust_comparator(name, cmp, diff_bytes_can_equal)) } } diff --git a/src/db/engine.rs b/src/db/engine.rs index b4c82444..1c62b509 100644 --- a/src/db/engine.rs +++ b/src/db/engine.rs @@ -3,11 +3,13 @@ use cozorocks::*; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, Mutex, RwLock}; use std::time::{SystemTime, UNIX_EPOCH}; use uuid::Uuid; use uuid::v1::{Context, Timestamp}; use rand::Rng; +use crate::error::{CozoError, Result}; +use crate::error::CozoError::{Poisoned, SessionErr}; pub struct EngineOptions { cmp: RustComparatorPtr, @@ -20,7 +22,7 @@ pub struct EngineOptions { pub struct Engine { pub db: DBPtr, pub options_store: Box, - session_handles: RwLock>>>, + session_handles: Mutex>>>, } unsafe impl Send for Engine {} @@ -34,7 +36,10 @@ impl Engine { } else { TDBOptions::Pessimistic(PTxnDBOptionsPtr::default()) }; - let cmp = RustComparatorPtr::new("cozo_cmp_v1", crate::relation::key_order::compare); + let cmp = RustComparatorPtr::new( + "cozo_cmp_v1", + crate::relation::key_order::compare, + false); let mut options = OptionsPtr::default(); options .set_comparator(&cmp) @@ -57,13 +62,14 @@ impl Engine { Ok(Self { db, options_store: e_options, - session_handles: RwLock::new(vec![]), + session_handles: Mutex::new(vec![]), }) } - pub fn session(&self) -> Session { + pub fn session(&self) -> Result { // find a handle if there is one available // otherwise create a new one - let old_handle = self.session_handles.read().unwrap().iter().find(|v| { + let mut guard = self.session_handles.lock().map_err(|_| CozoError::Poisoned)?; + let old_handle = guard.iter().find(|v| { match v.read() { Ok(content) => content.status == SessionStatus::Completed, Err(_) => false @@ -72,36 +78,38 @@ impl Engine { let handle = match old_handle { None => { let now = SystemTime::now(); - let since_epoch = now.duration_since(UNIX_EPOCH).unwrap(); + let since_epoch = now.duration_since(UNIX_EPOCH)?; let ts = Timestamp::from_unix( &self.options_store.uuid_ctx, since_epoch.as_secs(), since_epoch.subsec_nanos(), ); let mut rng = rand::thread_rng(); - let id = Uuid::new_v1(ts, &[rng.gen(), rng.gen(), rng.gen(), rng.gen(), rng.gen(), rng.gen()]).unwrap(); + let id = Uuid::new_v1(ts, &[rng.gen(), rng.gen(), rng.gen(), rng.gen(), rng.gen(), rng.gen()])?; let cf_ident = id.to_string(); - self.db.create_cf(&self.options_store.options, &cf_ident).unwrap(); + self.db.create_cf(&self.options_store.options, &cf_ident)?; let ret = Arc::new(RwLock::new(SessionHandle { cf_ident, status: SessionStatus::Prepared, table_count: 0, })); - self.session_handles.write().unwrap().push(ret.clone()); + guard.push(ret.clone()); ret } - Some(h) => h.clone() + Some(h) => h }; - Session { + let mut sess = Session { engine: self, stack_depth: 0, txn: TransactionPtr::null(), perm_cf: SharedPtr::null(), temp_cf: SharedPtr::null(), handle, - } + }; + sess.start()?; + Ok(sess) } } @@ -117,10 +125,10 @@ pub struct Session<'a> { // metadata are stored in table 0 impl<'a> Session<'a> { - pub fn start(&mut self) { + pub fn start(&mut self) -> Result<()> { self.perm_cf = self.engine.db.default_cf(); assert!(!self.perm_cf.is_null()); - self.temp_cf = self.engine.db.get_cf(&self.handle.read().unwrap().cf_ident).unwrap(); + self.temp_cf = self.engine.db.get_cf(&self.handle.read().map_err(|_| Poisoned)?.cf_ident).ok_or(SessionErr)?; assert!(!self.temp_cf.is_null()); let t_options = match self.engine.options_store.t_options { TDBOptions::Pessimistic(_) => { @@ -141,7 +149,20 @@ impl<'a> Session<'a> { if self.txn.is_null() { panic!("Starting session failed as opening transaction failed"); } - self.handle.write().unwrap().status = SessionStatus::Running; + self.handle.write().map_err(|_| Poisoned)?.status = SessionStatus::Running; + Ok(()) + } + pub fn finish_work(&mut self) -> Result<()> { + self.handle.write().map_err(|_| Poisoned)?.status = SessionStatus::Completed; + Ok(()) + } +} + +impl<'a> Drop for Session<'a> { + fn drop(&mut self) { + if let Err(e) = self.finish_work() { + eprintln!("{:?}", e); + } } } @@ -171,17 +192,16 @@ mod tests { fn push_get() { { let engine = Engine::new("_push_get".to_string(), false).unwrap(); - let mut sess = engine.session(); - sess.start(); + let sess = engine.session().unwrap(); for i in (-80..-40).step_by(10) { - let mut ikey = Tuple::with_prefix(0); + let mut ikey = Tuple::with_null_prefix(); ikey.push_int(i); ikey.push_str("pqr"); println!("in {:?} {:?}", ikey, ikey.data); - sess.txn.put(false, &sess.temp_cf, &ikey, &ikey).unwrap(); - println!("out {:?}", sess.txn.get(false, &sess.temp_cf, &ikey).unwrap().as_ref()); + sess.txn.put(true, &sess.perm_cf, &ikey, &ikey).unwrap(); + println!("out {:?}", sess.txn.get(true, &sess.perm_cf, &ikey).unwrap().as_ref()); } - let it = sess.txn.iterator(false, &sess.temp_cf); + let it = sess.txn.iterator(true, &sess.perm_cf); it.to_first(); for (key, val) in it.iter() { println!("a: {:?} {:?}", key.as_ref(), val.as_ref()); @@ -206,16 +226,18 @@ mod tests { assert!(engine.is_ok()); let engine2 = Engine::new(p1.to_string(), false); assert!(engine2.is_err()); + println!("create OK"); } let engine2 = Engine::new(p2.to_string(), false); assert!(engine2.is_ok()); + println!("start ok"); let engine2 = Arc::new(Engine::new(p3.to_string(), true).unwrap()); { for _i in 0..10 { - let mut _sess = engine2.session(); - _sess.start(); + let _sess = engine2.session().unwrap(); } - let handles = engine2.session_handles.read().unwrap(); + println!("sess OK"); + let handles = engine2.session_handles.lock().unwrap(); println!("got handles {}", handles.len()); let cf_ident = &handles.first().unwrap().read().unwrap().cf_ident; println!("Opening ok {}", cf_ident); @@ -226,12 +248,11 @@ mod tests { let mut thread_handles = vec![]; println!("concurrent"); - for i in 0..1 { + for i in 0..10 { let engine = engine2.clone(); thread_handles.push(thread::spawn(move || { - println!("In thread {}", i); - let mut sess = engine.session(); - sess.start(); + let mut sess = engine.session().unwrap(); + println!("In thread {} {}", i, sess.handle.read().unwrap().cf_ident); for _ in 0..10000 { sess.push_env(); sess.define_variable("abc", &"xyz".into(), true); @@ -260,7 +281,7 @@ mod tests { } println!("All OK"); { - let handles = engine2.session_handles.read().unwrap(); + let handles = engine2.session_handles.lock().unwrap(); println!("got handles {:#?}", handles.iter().map(|h| h.read().unwrap().cf_ident.to_string()).collect::>()); } } diff --git a/src/db/eval.rs b/src/db/eval.rs index 093d2884..3be2eb3a 100644 --- a/src/db/eval.rs +++ b/src/db/eval.rs @@ -4,22 +4,23 @@ use crate::relation::table::{DataKind, Table}; use crate::relation::tuple::{Tuple}; use crate::relation::typing::Typing; use crate::relation::value::Value; +use crate::error::Result; pub trait Environment> { fn push_env(&mut self); - fn pop_env(&mut self); - fn define_variable(&mut self, name: &str, val: &Value, in_root: bool); - fn define_type_alias(&mut self, name: &str, typ: &Typing, in_root: bool); - fn define_table(&mut self, table: &Table, in_root: bool); - fn resolve(&mut self, name: &str) -> Option>; - fn delete_defined(&mut self, name: &str, in_root: bool); + fn pop_env(&mut self) -> Result<()>; + fn define_variable(&mut self, name: &str, val: &Value, in_root: bool) -> Result<()>; + fn define_type_alias(&mut self, name: &str, typ: &Typing, in_root: bool) -> Result<()>; + fn define_table(&mut self, table: &Table, in_root: bool) -> Result<()>; + fn resolve(&mut self, name: &str) -> Result>>; + fn delete_defined(&mut self, name: &str, in_root: bool) -> Result<()>; } impl<'a> Session<'a> { fn encode_definable_key(&self, name: &str, in_root: bool) -> Tuple> { let depth_code = if in_root { 0 } else { self.stack_depth as i64 }; - let mut tuple = Tuple::with_prefix(0); + let mut tuple = Tuple::with_null_prefix(); tuple.push_str(name); tuple.push_int(depth_code); tuple @@ -32,62 +33,65 @@ impl<'a> Environment for Session<'a> { self.stack_depth -= 1; } - fn pop_env(&mut self) { + fn pop_env(&mut self) -> Result<()> { if self.stack_depth == 0 { - return; + return Ok(()); } // Remove all stuff starting with the stack depth from the temp session - let mut prefix = Tuple::with_prefix(0); + let mut prefix = Tuple::with_null_prefix(); prefix.push_int(self.stack_depth as i64); let it = self.txn.iterator(false, &self.temp_cf); it.seek(&prefix); for val in it.keys() { let cur = Tuple::new(val); if cur.starts_with(&prefix) { - let name = cur.get(1).unwrap(); - let mut ikey = Tuple::with_prefix(0); - ikey.push_value(&name); - ikey.push_int(self.stack_depth as i64); + if let Some(name) = cur.get(1) { + let mut ikey = Tuple::with_null_prefix(); + ikey.push_value(&name); + ikey.push_int(self.stack_depth as i64); - self.txn.del(false, &self.temp_cf, cur).unwrap(); - self.txn.del(false, &self.temp_cf, ikey).unwrap(); + self.txn.del(false, &self.temp_cf, cur)?; + self.txn.del(false, &self.temp_cf, ikey)?; + } } else { break; } } self.stack_depth += 1; + Ok(()) } - fn define_variable(&mut self, name: &str, val: &Value, in_root: bool) { + fn define_variable(&mut self, name: &str, val: &Value, in_root: bool) -> Result<()> { let key = self.encode_definable_key(name, in_root); - let mut data = Tuple::with_prefix(DataKind::Value as u32); + let mut data = Tuple::with_data_prefix(DataKind::Value); data.push_value(val); if in_root { - self.txn.put(true, &self.perm_cf, key, data).unwrap(); + self.txn.put(true, &self.perm_cf, key, data)?; } else { - let mut ikey = Tuple::with_prefix(0); + let mut ikey = Tuple::with_null_prefix(); ikey.push_int(self.stack_depth as i64); ikey.push_str(name); - self.txn.put(false, &self.temp_cf, key, data).unwrap(); - self.txn.put(false, &self.temp_cf, ikey, "").unwrap(); + self.txn.put(false, &self.temp_cf, key, data)?; + self.txn.put(false, &self.temp_cf, ikey, "")?; } + Ok(()) } - fn define_type_alias(&mut self, name: &str, typ: &Typing, in_root: bool) { + fn define_type_alias(&mut self, name: &str, typ: &Typing, in_root: bool) -> Result<()> { todo!() } - fn define_table(&mut self, table: &Table, in_root: bool) { + fn define_table(&mut self, table: &Table, in_root: bool) -> Result<()> { todo!() } - fn resolve(&mut self, name: &str) -> Option> { - let mut tuple = Tuple::with_prefix(0); + fn resolve(&mut self, name: &str) -> Result>> { + let mut tuple = Tuple::with_null_prefix(); tuple.push_str(name); let it = self.txn.iterator(false, &self.temp_cf); it.seek(&tuple); - match it.pair() { + Ok(match it.pair() { None => { None } @@ -102,10 +106,10 @@ impl<'a> Environment for Session<'a> { None } } - } + }) } - fn delete_defined(&mut self, name: &str, in_root: bool) { + fn delete_defined(&mut self, name: &str, in_root: bool) -> Result<()> { todo!() } } \ No newline at end of file diff --git a/src/error.rs b/src/error.rs index 8f24f0d0..8b94598f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,10 +1,14 @@ use std::result; +use std::sync::PoisonError; +use std::time::SystemTimeError; use thiserror::Error; +use cozorocks::BridgeError; + // use crate::parser::Rule; // #[derive(Error, Debug)] pub enum CozoError { -// #[error("Invalid UTF code")] + // #[error("Invalid UTF code")] // InvalidUtfCode, // // #[error("Invalid escape sequence")] @@ -60,6 +64,20 @@ pub enum CozoError { // // #[error(transparent)] // Io(#[from] std::io::Error), + #[error("Session error")] + SessionErr, + + #[error("Poisoned locks")] + Poisoned, + + #[error(transparent)] + SysTime(#[from] SystemTimeError), + + #[error(transparent)] + Uuid(#[from] uuid::Error), + + #[error(transparent)] + Bridge(#[from] BridgeError), } pub type Result = result::Result; \ No newline at end of file diff --git a/src/relation/tuple.rs b/src/relation/tuple.rs index 39d83519..bf79f572 100644 --- a/src/relation/tuple.rs +++ b/src/relation/tuple.rs @@ -4,6 +4,7 @@ use std::collections::BTreeMap; use std::fmt::{Debug, Formatter}; use std::hash::{Hash, Hasher}; use uuid::Uuid; +use crate::relation::table::DataKind; use crate::relation::value::{Tag, Value}; #[derive(Clone)] @@ -222,6 +223,14 @@ impl<'a, T: AsRef<[u8]>> Iterator for TupleIter<'a, T> { } impl Tuple> { + #[inline] + pub fn with_null_prefix() -> Self { + Tuple::with_prefix(0) + } + #[inline] + pub fn with_data_prefix(prefix: DataKind) -> Self { + Tuple::with_prefix(prefix as u32) + } #[inline] pub fn with_prefix(prefix: u32) -> Self { let data = Vec::from(prefix.to_be_bytes());