error handling; correct guards

main
Ziyang Hu 2 years ago
parent dd45ebb291
commit e70ed6d330

@ -87,28 +87,36 @@ public:
return name.c_str();
}
virtual bool CanKeysWithDifferentByteContentsBeEqual() const { return true; }
virtual bool CanKeysWithDifferentByteContentsBeEqual() const {
return can_different_bytes_be_equal;
}
void FindShortestSeparator(std::string *, const rocksdb::Slice &) const {}
void FindShortSuccessor(std::string *) const {}
void set_fn(RustComparatorFn f) const {
void set_fn(RustComparatorFn f) {
rust_compare = f;
}
void set_name(rust::Str name_) const {
void set_name(rust::Str name_) {
name = std::string(name_);
}
mutable std::string name;
mutable RustComparatorFn rust_compare;
void set_can_different_bytes_be_equal(bool v) {
can_different_bytes_be_equal = v;
}
std::string name;
RustComparatorFn rust_compare;
bool can_different_bytes_be_equal;
};
inline unique_ptr<RustComparator> new_rust_comparator(rust::Str name, RustComparatorFn f) {
inline unique_ptr<RustComparator> new_rust_comparator(rust::Str name, RustComparatorFn f, bool diff_bytes_can_equal) {
auto ret = make_unique<RustComparator>();
ret->set_name(name);
ret->set_fn(f);
ret->set_can_different_bytes_be_equal(diff_bytes_can_equal);
return ret;
}

@ -106,7 +106,7 @@ mod ffi {
fn new_odb_options() -> UniquePtr<OptimisticTransactionDBOptions>;
type RustComparator;
fn new_rust_comparator(name: &str, cmp: fn(&[u8], &[u8]) -> i8) -> UniquePtr<RustComparator>;
fn new_rust_comparator(name: &str, cmp: fn(&[u8], &[u8]) -> i8, diff_bytes_can_equal: bool) -> UniquePtr<RustComparator>;
pub type IteratorBridge;
fn seek_to_first(self: &IteratorBridge);
@ -180,11 +180,11 @@ use std::fmt::Formatter;
impl std::fmt::Display for StatusBridgeCode {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", match self {
&StatusBridgeCode::OK => "Ok",
&StatusBridgeCode::LOCK_ERROR => "LockError",
&StatusBridgeCode::EXISTING_ERROR => "ExistingError",
&StatusBridgeCode::NOT_FOUND_ERROR => "NotFoundError",
write!(f, "{}", match *self {
StatusBridgeCode::OK => "Ok",
StatusBridgeCode::LOCK_ERROR => "LockError",
StatusBridgeCode::EXISTING_ERROR => "ExistingError",
StatusBridgeCode::NOT_FOUND_ERROR => "NotFoundError",
_ => "Unknown"
})
}
@ -192,24 +192,24 @@ impl std::fmt::Display for StatusBridgeCode {
impl std::fmt::Display for StatusCode {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", match self {
&StatusCode::kOk => "Ok",
&StatusCode::kNotFound => "NotFound",
&StatusCode::kCorruption => "Corruption",
&StatusCode::kNotSupported => "NotSupported",
&StatusCode::kInvalidArgument => "InvalidArgument",
&StatusCode::kIOError => "IoError",
&StatusCode::kMergeInProgress => "MergeInProgress",
&StatusCode::kIncomplete => "Incomplete",
&StatusCode::kShutdownInProgress => "ShutdownInProgress",
&StatusCode::kTimedOut => "TimedOut",
&StatusCode::kAborted => "Aborted",
&StatusCode::kBusy => "Busy",
&StatusCode::kExpired => "Expired",
&StatusCode::kTryAgain => "TryAgain",
&StatusCode::kCompactionTooLarge => "CompactionTooLarge",
&StatusCode::kColumnFamilyDropped => "ColumnFamilyDropped",
&StatusCode::kMaxCode => "MaxCode",
write!(f, "{}", match *self {
StatusCode::kOk => "Ok",
StatusCode::kNotFound => "NotFound",
StatusCode::kCorruption => "Corruption",
StatusCode::kNotSupported => "NotSupported",
StatusCode::kInvalidArgument => "InvalidArgument",
StatusCode::kIOError => "IoError",
StatusCode::kMergeInProgress => "MergeInProgress",
StatusCode::kIncomplete => "Incomplete",
StatusCode::kShutdownInProgress => "ShutdownInProgress",
StatusCode::kTimedOut => "TimedOut",
StatusCode::kAborted => "Aborted",
StatusCode::kBusy => "Busy",
StatusCode::kExpired => "Expired",
StatusCode::kTryAgain => "TryAgain",
StatusCode::kCompactionTooLarge => "CompactionTooLarge",
StatusCode::kColumnFamilyDropped => "ColumnFamilyDropped",
StatusCode::kMaxCode => "MaxCode",
_ => "Unknown"
})
}
@ -217,23 +217,23 @@ impl std::fmt::Display for StatusCode {
impl std::fmt::Display for StatusSubCode {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", match self {
&StatusSubCode::kNone => "None",
&StatusSubCode::kMutexTimeout => "MutexTimeout",
&StatusSubCode::kLockTimeout => "LockTimeout",
&StatusSubCode::kLockLimit => "LockLimit",
&StatusSubCode::kNoSpace => "NoSpace",
&StatusSubCode::kDeadlock => "DeadLock",
&StatusSubCode::kStaleFile => "StaleFile",
&StatusSubCode::kMemoryLimit => "MemoryLimit",
&StatusSubCode::kSpaceLimit => "SpaceLimit",
&StatusSubCode::kPathNotFound => "PathNotFound",
&StatusSubCode::KMergeOperandsInsufficientCapacity => "MergeOperandsInsufficientCapacity",
&StatusSubCode::kManualCompactionPaused => "ManualCompactionPaused",
&StatusSubCode::kOverwritten => "Overwritten",
&StatusSubCode::kTxnNotPrepared => "TxnNotPrepared",
&StatusSubCode::kIOFenced => "IoFenced",
&StatusSubCode::kMaxSubCode => "MaxSubCode",
write!(f, "{}", match *self {
StatusSubCode::kNone => "None",
StatusSubCode::kMutexTimeout => "MutexTimeout",
StatusSubCode::kLockTimeout => "LockTimeout",
StatusSubCode::kLockLimit => "LockLimit",
StatusSubCode::kNoSpace => "NoSpace",
StatusSubCode::kDeadlock => "DeadLock",
StatusSubCode::kStaleFile => "StaleFile",
StatusSubCode::kMemoryLimit => "MemoryLimit",
StatusSubCode::kSpaceLimit => "SpaceLimit",
StatusSubCode::kPathNotFound => "PathNotFound",
StatusSubCode::KMergeOperandsInsufficientCapacity => "MergeOperandsInsufficientCapacity",
StatusSubCode::kManualCompactionPaused => "ManualCompactionPaused",
StatusSubCode::kOverwritten => "Overwritten",
StatusSubCode::kTxnNotPrepared => "TxnNotPrepared",
StatusSubCode::kIOFenced => "IoFenced",
StatusSubCode::kMaxSubCode => "MaxSubCode",
_ => "Unknown"
})
}
@ -241,13 +241,13 @@ impl std::fmt::Display for StatusSubCode {
impl std::fmt::Display for StatusSeverity {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", match self {
&StatusSeverity::kNoError => "NoError",
&StatusSeverity::kSoftError => "SoftError",
&StatusSeverity::kHardError => "HardError",
&StatusSeverity::kFatalError => "FatalError",
&StatusSeverity::kUnrecoverableError => "UnrecoverableError",
&StatusSeverity::kMaxSeverity => "MaxSeverity",
write!(f, "{}", match *self {
StatusSeverity::kNoError => "NoError",
StatusSeverity::kSoftError => "SoftError",
StatusSeverity::kHardError => "HardError",
StatusSeverity::kFatalError => "FatalError",
StatusSeverity::kUnrecoverableError => "UnrecoverableError",
StatusSeverity::kMaxSeverity => "MaxSeverity",
_ => "Unknown"
})
}

@ -72,7 +72,7 @@ impl From<BridgeStatus> for Option<BridgeError> {
}
}
pub type Result<T> = std::result::Result<T, BridgeError>;
type Result<T> = std::result::Result<T, BridgeError>;
pub enum SlicePtr {
Plain(UniquePtr<Slice>),
@ -97,8 +97,8 @@ pub struct RustComparatorPtr(UniquePtr<RustComparator>);
impl RustComparatorPtr {
#[inline]
pub fn new(name: &str, cmp: fn(&[u8], &[u8]) -> i8) -> Self {
Self(new_rust_comparator(name, cmp))
pub fn new(name: &str, cmp: fn(&[u8], &[u8]) -> i8, diff_bytes_can_equal: bool) -> Self {
Self(new_rust_comparator(name, cmp, diff_bytes_can_equal))
}
}

@ -3,11 +3,13 @@
use cozorocks::*;
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{SystemTime, UNIX_EPOCH};
use uuid::Uuid;
use uuid::v1::{Context, Timestamp};
use rand::Rng;
use crate::error::{CozoError, Result};
use crate::error::CozoError::{Poisoned, SessionErr};
pub struct EngineOptions {
cmp: RustComparatorPtr,
@ -20,7 +22,7 @@ pub struct EngineOptions {
pub struct Engine {
pub db: DBPtr,
pub options_store: Box<EngineOptions>,
session_handles: RwLock<Vec<Arc<RwLock<SessionHandle>>>>,
session_handles: Mutex<Vec<Arc<RwLock<SessionHandle>>>>,
}
unsafe impl Send for Engine {}
@ -34,7 +36,10 @@ impl Engine {
} else {
TDBOptions::Pessimistic(PTxnDBOptionsPtr::default())
};
let cmp = RustComparatorPtr::new("cozo_cmp_v1", crate::relation::key_order::compare);
let cmp = RustComparatorPtr::new(
"cozo_cmp_v1",
crate::relation::key_order::compare,
false);
let mut options = OptionsPtr::default();
options
.set_comparator(&cmp)
@ -57,13 +62,14 @@ impl Engine {
Ok(Self {
db,
options_store: e_options,
session_handles: RwLock::new(vec![]),
session_handles: Mutex::new(vec![]),
})
}
pub fn session(&self) -> Session {
pub fn session(&self) -> Result<Session> {
// find a handle if there is one available
// otherwise create a new one
let old_handle = self.session_handles.read().unwrap().iter().find(|v| {
let mut guard = self.session_handles.lock().map_err(|_| CozoError::Poisoned)?;
let old_handle = guard.iter().find(|v| {
match v.read() {
Ok(content) => content.status == SessionStatus::Completed,
Err(_) => false
@ -72,36 +78,38 @@ impl Engine {
let handle = match old_handle {
None => {
let now = SystemTime::now();
let since_epoch = now.duration_since(UNIX_EPOCH).unwrap();
let since_epoch = now.duration_since(UNIX_EPOCH)?;
let ts = Timestamp::from_unix(
&self.options_store.uuid_ctx,
since_epoch.as_secs(),
since_epoch.subsec_nanos(),
);
let mut rng = rand::thread_rng();
let id = Uuid::new_v1(ts, &[rng.gen(), rng.gen(), rng.gen(), rng.gen(), rng.gen(), rng.gen()]).unwrap();
let id = Uuid::new_v1(ts, &[rng.gen(), rng.gen(), rng.gen(), rng.gen(), rng.gen(), rng.gen()])?;
let cf_ident = id.to_string();
self.db.create_cf(&self.options_store.options, &cf_ident).unwrap();
self.db.create_cf(&self.options_store.options, &cf_ident)?;
let ret = Arc::new(RwLock::new(SessionHandle {
cf_ident,
status: SessionStatus::Prepared,
table_count: 0,
}));
self.session_handles.write().unwrap().push(ret.clone());
guard.push(ret.clone());
ret
}
Some(h) => h.clone()
Some(h) => h
};
Session {
let mut sess = Session {
engine: self,
stack_depth: 0,
txn: TransactionPtr::null(),
perm_cf: SharedPtr::null(),
temp_cf: SharedPtr::null(),
handle,
}
};
sess.start()?;
Ok(sess)
}
}
@ -117,10 +125,10 @@ pub struct Session<'a> {
// metadata are stored in table 0
impl<'a> Session<'a> {
pub fn start(&mut self) {
pub fn start(&mut self) -> Result<()> {
self.perm_cf = self.engine.db.default_cf();
assert!(!self.perm_cf.is_null());
self.temp_cf = self.engine.db.get_cf(&self.handle.read().unwrap().cf_ident).unwrap();
self.temp_cf = self.engine.db.get_cf(&self.handle.read().map_err(|_| Poisoned)?.cf_ident).ok_or(SessionErr)?;
assert!(!self.temp_cf.is_null());
let t_options = match self.engine.options_store.t_options {
TDBOptions::Pessimistic(_) => {
@ -141,7 +149,20 @@ impl<'a> Session<'a> {
if self.txn.is_null() {
panic!("Starting session failed as opening transaction failed");
}
self.handle.write().unwrap().status = SessionStatus::Running;
self.handle.write().map_err(|_| Poisoned)?.status = SessionStatus::Running;
Ok(())
}
pub fn finish_work(&mut self) -> Result<()> {
self.handle.write().map_err(|_| Poisoned)?.status = SessionStatus::Completed;
Ok(())
}
}
impl<'a> Drop for Session<'a> {
fn drop(&mut self) {
if let Err(e) = self.finish_work() {
eprintln!("{:?}", e);
}
}
}
@ -171,17 +192,16 @@ mod tests {
fn push_get() {
{
let engine = Engine::new("_push_get".to_string(), false).unwrap();
let mut sess = engine.session();
sess.start();
let sess = engine.session().unwrap();
for i in (-80..-40).step_by(10) {
let mut ikey = Tuple::with_prefix(0);
let mut ikey = Tuple::with_null_prefix();
ikey.push_int(i);
ikey.push_str("pqr");
println!("in {:?} {:?}", ikey, ikey.data);
sess.txn.put(false, &sess.temp_cf, &ikey, &ikey).unwrap();
println!("out {:?}", sess.txn.get(false, &sess.temp_cf, &ikey).unwrap().as_ref());
sess.txn.put(true, &sess.perm_cf, &ikey, &ikey).unwrap();
println!("out {:?}", sess.txn.get(true, &sess.perm_cf, &ikey).unwrap().as_ref());
}
let it = sess.txn.iterator(false, &sess.temp_cf);
let it = sess.txn.iterator(true, &sess.perm_cf);
it.to_first();
for (key, val) in it.iter() {
println!("a: {:?} {:?}", key.as_ref(), val.as_ref());
@ -206,16 +226,18 @@ mod tests {
assert!(engine.is_ok());
let engine2 = Engine::new(p1.to_string(), false);
assert!(engine2.is_err());
println!("create OK");
}
let engine2 = Engine::new(p2.to_string(), false);
assert!(engine2.is_ok());
println!("start ok");
let engine2 = Arc::new(Engine::new(p3.to_string(), true).unwrap());
{
for _i in 0..10 {
let mut _sess = engine2.session();
_sess.start();
let _sess = engine2.session().unwrap();
}
let handles = engine2.session_handles.read().unwrap();
println!("sess OK");
let handles = engine2.session_handles.lock().unwrap();
println!("got handles {}", handles.len());
let cf_ident = &handles.first().unwrap().read().unwrap().cf_ident;
println!("Opening ok {}", cf_ident);
@ -226,12 +248,11 @@ mod tests {
let mut thread_handles = vec![];
println!("concurrent");
for i in 0..1 {
for i in 0..10 {
let engine = engine2.clone();
thread_handles.push(thread::spawn(move || {
println!("In thread {}", i);
let mut sess = engine.session();
sess.start();
let mut sess = engine.session().unwrap();
println!("In thread {} {}", i, sess.handle.read().unwrap().cf_ident);
for _ in 0..10000 {
sess.push_env();
sess.define_variable("abc", &"xyz".into(), true);
@ -260,7 +281,7 @@ mod tests {
}
println!("All OK");
{
let handles = engine2.session_handles.read().unwrap();
let handles = engine2.session_handles.lock().unwrap();
println!("got handles {:#?}", handles.iter().map(|h| h.read().unwrap().cf_ident.to_string()).collect::<Vec<_>>());
}
}

@ -4,22 +4,23 @@ use crate::relation::table::{DataKind, Table};
use crate::relation::tuple::{Tuple};
use crate::relation::typing::Typing;
use crate::relation::value::Value;
use crate::error::Result;
pub trait Environment<T: AsRef<[u8]>> {
fn push_env(&mut self);
fn pop_env(&mut self);
fn define_variable(&mut self, name: &str, val: &Value, in_root: bool);
fn define_type_alias(&mut self, name: &str, typ: &Typing, in_root: bool);
fn define_table(&mut self, table: &Table, in_root: bool);
fn resolve(&mut self, name: &str) -> Option<Tuple<T>>;
fn delete_defined(&mut self, name: &str, in_root: bool);
fn pop_env(&mut self) -> Result<()>;
fn define_variable(&mut self, name: &str, val: &Value, in_root: bool) -> Result<()>;
fn define_type_alias(&mut self, name: &str, typ: &Typing, in_root: bool) -> Result<()>;
fn define_table(&mut self, table: &Table, in_root: bool) -> Result<()>;
fn resolve(&mut self, name: &str) -> Result<Option<Tuple<T>>>;
fn delete_defined(&mut self, name: &str, in_root: bool) -> Result<()>;
}
impl<'a> Session<'a> {
fn encode_definable_key(&self, name: &str, in_root: bool) -> Tuple<Vec<u8>> {
let depth_code = if in_root { 0 } else { self.stack_depth as i64 };
let mut tuple = Tuple::with_prefix(0);
let mut tuple = Tuple::with_null_prefix();
tuple.push_str(name);
tuple.push_int(depth_code);
tuple
@ -32,62 +33,65 @@ impl<'a> Environment<SlicePtr> for Session<'a> {
self.stack_depth -= 1;
}
fn pop_env(&mut self) {
fn pop_env(&mut self) -> Result<()> {
if self.stack_depth == 0 {
return;
return Ok(());
}
// Remove all stuff starting with the stack depth from the temp session
let mut prefix = Tuple::with_prefix(0);
let mut prefix = Tuple::with_null_prefix();
prefix.push_int(self.stack_depth as i64);
let it = self.txn.iterator(false, &self.temp_cf);
it.seek(&prefix);
for val in it.keys() {
let cur = Tuple::new(val);
if cur.starts_with(&prefix) {
let name = cur.get(1).unwrap();
let mut ikey = Tuple::with_prefix(0);
ikey.push_value(&name);
ikey.push_int(self.stack_depth as i64);
if let Some(name) = cur.get(1) {
let mut ikey = Tuple::with_null_prefix();
ikey.push_value(&name);
ikey.push_int(self.stack_depth as i64);
self.txn.del(false, &self.temp_cf, cur).unwrap();
self.txn.del(false, &self.temp_cf, ikey).unwrap();
self.txn.del(false, &self.temp_cf, cur)?;
self.txn.del(false, &self.temp_cf, ikey)?;
}
} else {
break;
}
}
self.stack_depth += 1;
Ok(())
}
fn define_variable(&mut self, name: &str, val: &Value, in_root: bool) {
fn define_variable(&mut self, name: &str, val: &Value, in_root: bool) -> Result<()> {
let key = self.encode_definable_key(name, in_root);
let mut data = Tuple::with_prefix(DataKind::Value as u32);
let mut data = Tuple::with_data_prefix(DataKind::Value);
data.push_value(val);
if in_root {
self.txn.put(true, &self.perm_cf, key, data).unwrap();
self.txn.put(true, &self.perm_cf, key, data)?;
} else {
let mut ikey = Tuple::with_prefix(0);
let mut ikey = Tuple::with_null_prefix();
ikey.push_int(self.stack_depth as i64);
ikey.push_str(name);
self.txn.put(false, &self.temp_cf, key, data).unwrap();
self.txn.put(false, &self.temp_cf, ikey, "").unwrap();
self.txn.put(false, &self.temp_cf, key, data)?;
self.txn.put(false, &self.temp_cf, ikey, "")?;
}
Ok(())
}
fn define_type_alias(&mut self, name: &str, typ: &Typing, in_root: bool) {
fn define_type_alias(&mut self, name: &str, typ: &Typing, in_root: bool) -> Result<()> {
todo!()
}
fn define_table(&mut self, table: &Table, in_root: bool) {
fn define_table(&mut self, table: &Table, in_root: bool) -> Result<()> {
todo!()
}
fn resolve(&mut self, name: &str) -> Option<Tuple<SlicePtr>> {
let mut tuple = Tuple::with_prefix(0);
fn resolve(&mut self, name: &str) -> Result<Option<Tuple<SlicePtr>>> {
let mut tuple = Tuple::with_null_prefix();
tuple.push_str(name);
let it = self.txn.iterator(false, &self.temp_cf);
it.seek(&tuple);
match it.pair() {
Ok(match it.pair() {
None => {
None
}
@ -102,10 +106,10 @@ impl<'a> Environment<SlicePtr> for Session<'a> {
None
}
}
}
})
}
fn delete_defined(&mut self, name: &str, in_root: bool) {
fn delete_defined(&mut self, name: &str, in_root: bool) -> Result<()> {
todo!()
}
}

@ -1,10 +1,14 @@
use std::result;
use std::sync::PoisonError;
use std::time::SystemTimeError;
use thiserror::Error;
use cozorocks::BridgeError;
// use crate::parser::Rule;
//
#[derive(Error, Debug)]
pub enum CozoError {
// #[error("Invalid UTF code")]
// #[error("Invalid UTF code")]
// InvalidUtfCode,
//
// #[error("Invalid escape sequence")]
@ -60,6 +64,20 @@ pub enum CozoError {
//
// #[error(transparent)]
// Io(#[from] std::io::Error),
#[error("Session error")]
SessionErr,
#[error("Poisoned locks")]
Poisoned,
#[error(transparent)]
SysTime(#[from] SystemTimeError),
#[error(transparent)]
Uuid(#[from] uuid::Error),
#[error(transparent)]
Bridge(#[from] BridgeError),
}
pub type Result<T> = result::Result<T, CozoError>;

@ -4,6 +4,7 @@ use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::hash::{Hash, Hasher};
use uuid::Uuid;
use crate::relation::table::DataKind;
use crate::relation::value::{Tag, Value};
#[derive(Clone)]
@ -222,6 +223,14 @@ impl<'a, T: AsRef<[u8]>> Iterator for TupleIter<'a, T> {
}
impl Tuple<Vec<u8>> {
#[inline]
pub fn with_null_prefix() -> Self {
Tuple::with_prefix(0)
}
#[inline]
pub fn with_data_prefix(prefix: DataKind) -> Self {
Tuple::with_prefix(prefix as u32)
}
#[inline]
pub fn with_prefix(prefix: u32) -> Self {
let data = Vec::from(prefix.to_be_bytes());

Loading…
Cancel
Save