From f5de468548f76634795b36b3af50721770831f27 Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Mon, 16 May 2022 23:21:16 +0800 Subject: [PATCH] various stuff --- cozorocks/src/lib.rs | 24 +++++- cozorocks/src/options.rs | 7 ++ src/data/expr.rs | 4 +- src/data/expr_parser.rs | 4 +- src/data/op.rs | 4 +- src/data/tuple.rs | 36 ++++++++- src/data/tuple_set.rs | 6 +- src/runtime/instance.rs | 156 +++++++++++++++++++++++++++++++++------ src/runtime/options.rs | 15 +++- 9 files changed, 220 insertions(+), 36 deletions(-) diff --git a/cozorocks/src/lib.rs b/cozorocks/src/lib.rs index 3845bf06..427f2f35 100644 --- a/cozorocks/src/lib.rs +++ b/cozorocks/src/lib.rs @@ -276,6 +276,17 @@ impl TransactionPtr { } } #[inline] + pub fn get_owned(&self, options: &ReadOptions, + key: impl AsRef<[u8]>) -> Result> { + let mut slice = PinnableSlicePtr::default(); + if self.get(options, key, &mut slice)? { + Ok(Some(slice)) + } else { + Ok(None) + + } + } + #[inline] pub fn get_for_update( &self, options: &ReadOptions, @@ -377,6 +388,17 @@ impl DbPtr { } } #[inline] + pub fn get_owned(&self, options: &ReadOptions, + key: impl AsRef<[u8]>) -> Result> { + let mut slice = PinnableSlicePtr::default(); + if self.get(options, key, &mut slice)? { + Ok(Some(slice)) + } else { + Ok(None) + + } + } + #[inline] pub fn del(&self, options: &WriteOptions, key: impl AsRef<[u8]>) -> Result<()> { let mut status = BridgeStatus::default(); let ret = self.del_raw(options, key.as_ref(), &mut status); @@ -399,7 +421,7 @@ impl DbPtr { } #[inline] - pub fn make_transaction( + pub fn txn( &self, options: TransactOptions, write_ops: WriteOptionsPtr, diff --git a/cozorocks/src/options.rs b/cozorocks/src/options.rs index ad63cff9..65a40c5d 100644 --- a/cozorocks/src/options.rs +++ b/cozorocks/src/options.rs @@ -128,6 +128,10 @@ impl OptionsPtr { pub struct ReadOptionsPtr(UniquePtr); +unsafe impl Send for ReadOptionsPtr {} +// unsafe impl Sync for ReadOptionsPtr {} + + impl Deref for ReadOptionsPtr { type Target = UniquePtr; @@ -174,6 +178,9 @@ impl ReadOptionsPtr { pub struct WriteOptionsPtr(pub(crate) UniquePtr); +unsafe impl Send for WriteOptionsPtr {} +// unsafe impl Sync for WriteOptionsPtr {} + impl Deref for WriteOptionsPtr { type Target = UniquePtr; diff --git a/src/data/expr.rs b/src/data/expr.rs index 0ecb9267..69c44b63 100644 --- a/src/data/expr.rs +++ b/src/data/expr.rs @@ -28,8 +28,8 @@ pub(crate) enum Expr<'a> { Variable(String), TableCol(TableId, ColId), TupleSetIdx(TupleSetIdx), - Apply(Arc, Vec>), - ApplyAgg(Arc, Vec>, Vec>), + Apply(Arc, Vec>), + ApplyAgg(Arc, Vec>, Vec>), FieldAcc(String, Box>), IdxAcc(usize, Box>), } diff --git a/src/data/expr_parser.rs b/src/data/expr_parser.rs index a61e50d6..7830670a 100644 --- a/src/data/expr_parser.rs +++ b/src/data/expr_parser.rs @@ -99,7 +99,7 @@ fn build_expr_primary(pair: Pair) -> Result { let mut inner = pair.into_inner(); let p = inner.next().unwrap(); let op = p.as_rule(); - let op: Arc = match op { + let op: Arc = match op { Rule::term => return build_expr_primary(p), Rule::negate => Arc::new(OpNegate), Rule::minus => Arc::new(OpMinus), @@ -224,7 +224,7 @@ fn build_expr_infix<'a>( ) -> Result> { let lhs = lhs?; let rhs = rhs?; - let op: Arc = match op.as_rule() { + let op: Arc = match op.as_rule() { Rule::op_add => Arc::new(OpAdd), Rule::op_str_cat => Arc::new(OpStrCat), Rule::op_sub => Arc::new(OpSub), diff --git a/src/data/op.rs b/src/data/op.rs index c2636e1a..9a0eb1cc 100644 --- a/src/data/op.rs +++ b/src/data/op.rs @@ -1,11 +1,11 @@ use std::fmt::{Debug, Formatter}; -pub(crate) trait Op { +pub(crate) trait Op: Send + Sync { fn is_resolved(&self) -> bool; fn name(&self) -> &str; } -pub(crate) trait AggOp { +pub(crate) trait AggOp: Send + Sync { fn is_resolved(&self) -> bool; fn name(&self) -> &str; } diff --git a/src/data/tuple.rs b/src/data/tuple.rs index b757ceb1..f6ee6b50 100644 --- a/src/data/tuple.rs +++ b/src/data/tuple.rs @@ -7,10 +7,11 @@ use std::collections::BTreeMap; use std::fmt::{Debug, Formatter}; use std::hash::{Hash, Hasher}; use std::result; +use chrono::format::Item; use uuid::Uuid; #[derive(thiserror::Error, Debug)] -pub(crate) enum TupleError { +pub enum TupleError { #[error("Undefined data kind {0}")] UndefinedDataKind(u32), @@ -108,8 +109,14 @@ impl> Tuple { } } +impl From for u32 { + fn from(dk: DataKind) -> Self { + dk as u32 + } +} + #[derive(Clone)] -pub(crate) struct Tuple +pub struct Tuple where T: AsRef<[u8]>, { @@ -117,6 +124,16 @@ pub(crate) struct Tuple idx_cache: RefCell>, } +unsafe impl> Send for Tuple {} + +unsafe impl> Sync for Tuple {} + +impl From for Tuple where T: AsRef<[u8]> { + fn from(data: T) -> Self { + Tuple::new(data) + } +} + impl Tuple where T: AsRef<[u8]>, @@ -135,7 +152,7 @@ impl AsRef<[u8]> for Tuple } } -pub(crate) type OwnTuple = Tuple>; +pub type OwnTuple = Tuple>; pub(crate) const PREFIX_LEN: usize = 4; @@ -698,3 +715,16 @@ impl> Hash for Tuple { } impl> Eq for Tuple {} + + +impl<'a, P, T> From<(P, T)> for OwnTuple + where T: IntoIterator>, + P: Into { + fn from((prefix, it): (P, T)) -> Self { + let mut ret = OwnTuple::with_prefix(prefix.into()); + for item in it.into_iter() { + ret.push_value(item); + } + ret + } +} \ No newline at end of file diff --git a/src/data/tuple_set.rs b/src/data/tuple_set.rs index e076f47b..e0671431 100644 --- a/src/data/tuple_set.rs +++ b/src/data/tuple_set.rs @@ -9,7 +9,7 @@ pub(crate) enum TypingError { type Result = result::Result; -const MIN_TABLE_ID: u32 = 10001; +pub(crate) const MIN_TABLE_ID_BOUND: u32 = 10000; #[derive(Eq, PartialEq, Clone, Copy, Ord, PartialOrd, Hash)] pub(crate) struct TableId { @@ -25,14 +25,14 @@ impl Debug for TableId { impl TableId { pub(crate) fn new(in_root: bool, id: u32) -> Result { - if id < MIN_TABLE_ID { + if id <= MIN_TABLE_ID_BOUND { Err(TypingError::InvalidTableId(id)) } else { Ok(TableId { in_root, id }) } } pub(crate) fn is_valid(&self) -> bool { - self.id >= MIN_TABLE_ID + self.id > MIN_TABLE_ID_BOUND } } diff --git a/src/runtime/instance.rs b/src/runtime/instance.rs index 9f91b292..6e4414ee 100644 --- a/src/runtime/instance.rs +++ b/src/runtime/instance.rs @@ -1,17 +1,27 @@ use std::{mem, result}; -use cozorocks::{BridgeError, DbPtr, destroy_db, OptionsPtrShared, TDbOptions}; +use std::collections::BTreeMap; +use cozorocks::{BridgeError, DbPtr, destroy_db, OptionsPtrShared, PinnableSlicePtr, ReadOptionsPtr, TDbOptions, TransactionPtr, TransactOptions, WriteOptionsPtr}; use std::sync::{Arc, LockResult, Mutex, PoisonError}; +use std::sync::atomic::{AtomicU32, Ordering}; +use lazy_static::lazy_static; use log::error; -use crate::data::tuple::Tuple; -use crate::runtime::options::{default_options, default_txn_options, default_write_options}; +use crate::data::expr::StaticExpr; +use crate::data::tuple::{DataKind, OwnTuple, Tuple, TupleError}; +use crate::data::tuple_set::MIN_TABLE_ID_BOUND; +use crate::data::typing::Typing; +use crate::data::value::{StaticValue, Value}; +use crate::runtime::options::{default_options, default_read_options, default_txn_db_options, default_txn_options, default_write_options}; #[derive(thiserror::Error, Debug)] pub enum DbInstanceError { #[error(transparent)] - DbBridgeError(#[from] BridgeError), + DbBridge(#[from] BridgeError), #[error("Cannot obtain session lock")] - SessionLockError, + SessionLock, + + #[error(transparent)] + Tuple(#[from] TupleError), } type Result = result::Result; @@ -27,6 +37,7 @@ pub enum SessionStatus { struct SessionHandle { id: usize, db: DbPtr, + next_table_id: u32, status: SessionStatus, } @@ -36,18 +47,20 @@ pub struct DbInstance { tdb_options: TDbOptions, path: String, session_handles: Mutex>>>, + optimistic: bool, destroy_on_close: bool, } impl DbInstance { pub fn new(path: &str, optimistic: bool) -> Result { let options = default_options().make_shared(); - let tdb_options = default_txn_options(optimistic); + let tdb_options = default_txn_db_options(optimistic); let main = DbPtr::open(&options, &tdb_options, path)?; Ok(Self { options, tdb_options, main, + optimistic, path: path.to_string(), session_handles: vec![].into(), destroy_on_close: false, @@ -58,7 +71,7 @@ impl DbInstance { impl DbInstance { pub fn session(&self) -> Result { let mut handles = self.session_handles.lock() - .map_err(|_| DbInstanceError::SessionLockError)?; + .map_err(|_| DbInstanceError::SessionLock)?; let handle = handles.iter().find_map(|handle| { match handle.try_lock() { Ok(inner) => { @@ -84,6 +97,7 @@ impl DbInstance { status: SessionStatus::Prepared, id: idx, db: temp.clone(), + next_table_id: MIN_TABLE_ID_BOUND, })); handles.push(handle.clone()); @@ -94,10 +108,21 @@ impl DbInstance { drop(handles); + let mut w_opts_temp = default_write_options(); + w_opts_temp.set_disable_wal(true); + Ok(Session { main: self.main.clone(), temp, session_handle: handle, + optimistic: self.optimistic, + w_opts_main: default_write_options(), + w_opts_temp, + r_opts_main: default_read_options(), + r_opts_temp: default_read_options(), + stack: vec![], + cur_table_id: 0.into(), + params: Default::default(), }) } @@ -153,24 +178,71 @@ impl Drop for DbInstance { } } +enum SessionDefinable { + Value(StaticValue), + Expr(StaticExpr), + Typing(Typing) + // TODO +} + +type SessionStackFrame = BTreeMap; + pub struct Session { pub(crate) main: DbPtr, pub(crate) temp: DbPtr, + pub(crate) r_opts_main: ReadOptionsPtr, + pub(crate) r_opts_temp: ReadOptionsPtr, + pub(crate) w_opts_main: WriteOptionsPtr, + pub(crate) w_opts_temp: WriteOptionsPtr, + optimistic: bool, + cur_table_id: AtomicU32, + stack: Vec, + params: BTreeMap, session_handle: Arc>, } +pub(crate) struct InterpretContext<'a> { + session: &'a Session, +} + +impl <'a> InterpretContext<'a> { + pub(crate) fn resolve(&self, key: impl AsRef) { + + } + pub(crate) fn resolve_value(&self, key: impl AsRef) { + + } + pub(crate) fn resolve_typing(&self, key: impl AsRef) { + todo!() + } + // also for expr, table, etc.. +} + impl Session { - pub fn start(&mut self) -> Result<()> { - let mut handle = self.session_handle.lock() - .map_err(|_| DbInstanceError::SessionLockError)?; - handle.status = SessionStatus::Running; - Ok(()) + pub fn start(mut self) -> Result { + { + self.push_env(); + let mut handle = self.session_handle.lock() + .map_err(|_| DbInstanceError::SessionLock)?; + handle.status = SessionStatus::Running; + self.cur_table_id = handle.next_table_id.into(); + } + Ok(self) + } + pub(crate) fn push_env(&mut self) { + self.stack.push(BTreeMap::new()); + } + pub(crate) fn pop_env(&mut self) { + if self.stack.len() > 1 { + self.stack.pop(); + } } fn clear_data(&self) -> Result<()> { - let w_opts = default_write_options(); - self.temp - .del_range(&w_opts, Tuple::with_null_prefix(), Tuple::max_tuple())?; - // self.temp.compact_all()?; + self.temp.del_range( + &self.w_opts_temp, + Tuple::with_null_prefix(), + Tuple::max_tuple(), + )?; Ok(()) } pub fn stop(&mut self) -> Result<()> { @@ -178,11 +250,51 @@ impl Session { let mut handle = self.session_handle.lock() .map_err(|_| { error!("failed to stop interpreter"); - DbInstanceError::SessionLockError + DbInstanceError::SessionLock })?; + handle.next_table_id = self.cur_table_id.load(Ordering::SeqCst); handle.status = SessionStatus::Completed; Ok(()) } + + pub(crate) fn get_next_temp_table_id(&self) -> u32 { + let mut res = self.cur_table_id.fetch_add(1, Ordering::SeqCst); + while res.wrapping_add(1) < MIN_TABLE_ID_BOUND { + res = self.cur_table_id.fetch_add(MIN_TABLE_ID_BOUND, Ordering::SeqCst); + } + res + 1 + } + + pub(crate) fn txn(&self, w_opts: Option) -> TransactionPtr { + self.main.txn(default_txn_options(self.optimistic), + w_opts.unwrap_or_else(default_write_options)) + } + + pub(crate) fn get_next_main_table_id(&self) -> Result { + let txn = self.txn(None); + let key = MAIN_DB_TABLE_ID_SEQ_KEY.as_ref(); + let cur_id = match txn.get_owned(&self.r_opts_main, key)? { + None => { + let val = OwnTuple::from( + (DataKind::Data, &[(MIN_TABLE_ID_BOUND as i64).into()])); + txn.put(key, &val)?; + MIN_TABLE_ID_BOUND + } + Some(pt) => { + let pt = Tuple::from(pt); + let prev_id = pt.get_int(0)?; + let val = OwnTuple::from((DataKind::Data, &[(prev_id + 1).into()])); + txn.put(key, &val)?; + (prev_id + 1) as u32 + } + }; + txn.commit()?; + Ok(cur_id + 1) + } +} + +lazy_static! { + static ref MAIN_DB_TABLE_ID_SEQ_KEY: OwnTuple = OwnTuple::from((0u32, &[Value::Null])); } impl Drop for Session { @@ -201,7 +313,7 @@ mod tests { use super::*; use crate::runtime::instance::DbInstance; - fn test_send_sync(_x: T) {} + fn test_send(_x: T) {} #[test] fn creation() -> Result<()> { @@ -214,9 +326,11 @@ mod tests { let start = Instant::now(); let mut db2 = DbInstance::new("_test2", true)?; db2.set_destroy_on_close(true); - for _ in 0..1000 { - let i1 = db2.session()?; - test_send_sync(i1); + for _ in 0..100 { + let i1 = db2.session()?.start()?; + dbg!(i1.get_next_temp_table_id()); + dbg!(i1.get_next_main_table_id()?); + test_send(i1); } dbg!(start.elapsed()); Ok(()) diff --git a/src/runtime/options.rs b/src/runtime/options.rs index f80bec8b..0fdbf4f1 100644 --- a/src/runtime/options.rs +++ b/src/runtime/options.rs @@ -1,5 +1,5 @@ use lazy_static::lazy_static; -use cozorocks::{FlushOptionsPtr, OptionsPtr, OTxnDbOptionsPtr, OTxnOptionsPtr, PTxnDbOptionsPtr, PTxnOptionsPtr, ReadOptionsPtr, RustComparatorPtr, TDbOptions, WriteOptionsPtr}; +use cozorocks::{FlushOptionsPtr, OptionsPtr, OTxnDbOptionsPtr, OTxnOptionsPtr, PTxnDbOptionsPtr, PTxnOptionsPtr, ReadOptionsPtr, RustComparatorPtr, TDbOptions, TransactOptions, WriteOptionsPtr}; use crate::data::tuple::PREFIX_LEN; const COMPARATOR_NAME: &str = "cozo_cmp_v1"; @@ -34,10 +34,21 @@ pub fn default_flush_options() -> FlushOptionsPtr { FlushOptionsPtr::default() } -pub fn default_txn_options(optimistic: bool) -> TDbOptions { +pub fn default_txn_db_options(optimistic: bool) -> TDbOptions { if optimistic { TDbOptions::Optimistic(OTxnDbOptionsPtr::default()) } else { TDbOptions::Pessimistic(PTxnDbOptionsPtr::default()) } } + +pub fn default_txn_options(optimistic: bool) -> TransactOptions { + if optimistic { + let o = OTxnOptionsPtr::new(&DEFAULT_COMPARATOR); + TransactOptions::Optimistic(o) + } else { + let mut o = PTxnOptionsPtr::default(); + o.set_deadlock_detect(true); + TransactOptions::Pessimistic(o) + } +} \ No newline at end of file