diff --git a/src/data/op.rs b/src/data/op.rs index 9a0eb1cc..32f2650e 100644 --- a/src/data/op.rs +++ b/src/data/op.rs @@ -1,13 +1,41 @@ use std::fmt::{Debug, Formatter}; +use crate::data::expr::Expr; +use crate::data::typing::Typing; pub(crate) trait Op: Send + Sync { fn is_resolved(&self) -> bool; fn name(&self) -> &str; + fn non_null_args(&self) -> bool { + true + } + fn typing_eval(&self, args: ()) -> Typing { + Typing::Any + } + fn row_eval(&self, ctx: (), args: ()) -> () { + unimplemented!() + } + fn row_eval_non_null(&self) -> () { + panic!() + } + fn expr_eval(&self, ctx: (), args: ()) -> () { + self.row_eval(ctx, args) + } } pub(crate) trait AggOp: Send + Sync { fn is_resolved(&self) -> bool; fn name(&self) -> &str; + fn row_eval(&self, ctx: (), args: ()) -> () { + unimplemented!() + } + fn expr_eval(&self, ctx: (), args: ()) -> () { + self.row_eval(ctx, args) + } +} + +impl<'a> Expr<'a> { + pub(crate) fn expr_eval() {} + pub(crate) fn row_eval() {} } pub(crate) struct UnresolvedOp(pub String); diff --git a/src/data/tuple.rs b/src/data/tuple.rs index f6ee6b50..b8ee3426 100644 --- a/src/data/tuple.rs +++ b/src/data/tuple.rs @@ -9,6 +9,7 @@ use std::hash::{Hash, Hasher}; use std::result; use chrono::format::Item; use uuid::Uuid; +use cozorocks::{PinnableSlicePtr, PinnableSlicePtrShared, SlicePtr, SlicePtrShared}; #[derive(thiserror::Error, Debug)] pub enum TupleError { @@ -426,7 +427,7 @@ impl> Tuple { let (val, offset) = self.parse_value_at(pos + 1)?; (offset, Value::DescVal(Reverse(val.into()))) } - StorageTag::Max => (start, Value::EndSentinel), + StorageTag::Max => (start, Value::Sentinel), }; Ok((val, nxt)) } @@ -534,6 +535,7 @@ impl OwnTuple { #[inline] pub(crate) fn seal_with_sentinel(&mut self) { self.push_tag(StorageTag::Max); + self.idx_cache.borrow_mut().push(self.data.len()); } #[inline] fn push_tag(&mut self, tag: StorageTag) { @@ -638,7 +640,7 @@ impl OwnTuple { cache.truncate(start_len); cache.push(self.data.len()); } - Value::EndSentinel => panic!("Cannot push sentinel value"), + Value::Sentinel => self.seal_with_sentinel(), Value::DescVal(Reverse(v)) => { self.push_reverse_value(v); } @@ -727,4 +729,115 @@ impl<'a, P, T> From<(P, T)> for OwnTuple } ret } -} \ No newline at end of file +} + + +pub(crate) enum ReifiedTupleData { + Own(Vec), + Slice(SlicePtr), + SharedSlice(SlicePtrShared), + PinnableSlice(PinnableSlicePtr), + PinnableSliceShared(PinnableSlicePtrShared), +} + +impl Clone for ReifiedTupleData { + fn clone(&self) -> Self { + match self { + ReifiedTupleData::Own(o) => ReifiedTupleData::Own(o.clone()), + ReifiedTupleData::Slice(s) => ReifiedTupleData::Own(s.as_ref().to_vec()), + ReifiedTupleData::SharedSlice(s) => ReifiedTupleData::SharedSlice(s.clone()), + ReifiedTupleData::PinnableSlice(s) => ReifiedTupleData::Own(s.as_ref().to_vec()), + ReifiedTupleData::PinnableSliceShared(s) => ReifiedTupleData::PinnableSliceShared(s.clone()) + } + } +} + +impl From> for ReifiedTupleData { + fn from(d: Vec) -> Self { + ReifiedTupleData::Own(d) + } +} + +impl From for ReifiedTupleData { + fn from(d: SlicePtr) -> Self { + ReifiedTupleData::Slice(d) + } +} + +impl From for ReifiedTupleData { + fn from(d: SlicePtrShared) -> Self { + ReifiedTupleData::SharedSlice(d) + } +} + +impl From for ReifiedTupleData { + fn from(d: PinnableSlicePtr) -> Self { + ReifiedTupleData::PinnableSlice(d) + } +} + +impl From for ReifiedTupleData { + fn from(d: PinnableSlicePtrShared) -> Self { + ReifiedTupleData::PinnableSliceShared(d) + } +} + + +impl AsRef<[u8]> for ReifiedTupleData { + fn as_ref(&self) -> &[u8] { + match self { + ReifiedTupleData::Own(o) => o.as_ref(), + ReifiedTupleData::Slice(s) => s.as_ref(), + ReifiedTupleData::SharedSlice(s) => s.as_ref(), + ReifiedTupleData::PinnableSlice(s) => s.as_ref(), + ReifiedTupleData::PinnableSliceShared(s) => s.as_ref() + } + } +} + +pub(crate) type ReifiedTuple = Tuple; + +impl From for ReifiedTuple { + fn from(t: OwnTuple) -> Self { + ReifiedTuple { + data: t.data.into(), + idx_cache: t.idx_cache, + } + } +} + +impl From> for ReifiedTuple { + fn from(t: Tuple) -> Self { + ReifiedTuple { + data: t.data.into(), + idx_cache: t.idx_cache, + } + } +} + +impl From> for ReifiedTuple { + fn from(t: Tuple) -> Self { + ReifiedTuple { + data: t.data.into(), + idx_cache: t.idx_cache, + } + } +} + +impl From> for ReifiedTuple { + fn from(t: Tuple) -> Self { + ReifiedTuple { + data: t.data.into(), + idx_cache: t.idx_cache, + } + } +} + +impl From> for ReifiedTuple { + fn from(t: Tuple) -> Self { + ReifiedTuple { + data: t.data.into(), + idx_cache: t.idx_cache, + } + } +} diff --git a/src/data/tuple_set.rs b/src/data/tuple_set.rs index e0671431..d595505e 100644 --- a/src/data/tuple_set.rs +++ b/src/data/tuple_set.rs @@ -1,13 +1,21 @@ +use std::cmp::Ordering; use std::fmt::{Debug, Formatter}; use std::result; +use cozorocks::{PinnableSlicePtr, PinnableSlicePtrShared, SlicePtr, SlicePtrShared}; +use crate::data::tuple::{OwnTuple, ReifiedTuple, Tuple, TupleError}; +use crate::data::value::Value; #[derive(thiserror::Error, Debug)] -pub(crate) enum TypingError { +pub(crate) enum TupleSetError { #[error("table id not allowed: {0}")] InvalidTableId(u32), + #[error("Index out of bound: {0}")] + IndexOutOfBound(usize), + #[error(transparent)] + Tuple(#[from] TupleError), } -type Result = result::Result; +type Result = result::Result; pub(crate) const MIN_TABLE_ID_BOUND: u32 = 10000; @@ -26,7 +34,7 @@ impl Debug for TableId { impl TableId { pub(crate) fn new(in_root: bool, id: u32) -> Result { if id <= MIN_TABLE_ID_BOUND { - Err(TypingError::InvalidTableId(id)) + Err(TupleSetError::InvalidTableId(id)) } else { Ok(TableId { in_root, id }) } @@ -78,3 +86,89 @@ impl Debug for TupleSetIdx { ) } } + +#[derive(Debug, Clone, Eq, PartialEq)] +pub(crate) struct TupleSet { + keys: Vec, + vals: Vec, +} + +impl TupleSet { + pub(crate) fn push_key(&mut self, t: ReifiedTuple) { + self.keys.push(t); + } + pub(crate) fn push_val(&mut self, v: ReifiedTuple) { + self.vals.push(v); + } + pub(crate) fn merge(&mut self, o: TupleSet) { + self.keys.extend(o.keys); + self.vals.extend(o.vals); + } + pub(crate) fn extend_keys(&mut self, keys: I) + where I: IntoIterator, + ReifiedTuple: From { + self.keys.extend(keys.into_iter().map(ReifiedTuple::from)); + } + pub(crate) fn extend_vals(&mut self, keys: I) + where I: IntoIterator, + ReifiedTuple: From { + self.vals.extend(keys.into_iter().map(ReifiedTuple::from)); + } + + pub(crate) fn all_keys_eq(&self, other: &Self) -> bool { + if self.keys.len() != other.keys.len() { + return false; + } + for (l, r) in self.keys.iter().zip(&other.keys) { + if !l.key_part_eq(r) { + return false; + } + } + true + } + pub(crate) fn all_keys_cmp(&self, other: &Self) -> Ordering { + for (l, r) in self.keys.iter().zip(&other.keys) { + match l.key_part_cmp(r) { + Ordering::Equal => {} + v => return v, + } + } + Ordering::Equal + } + + pub(crate) fn get_value(&self, TupleSetIdx { is_key, t_set, col_idx }: TupleSetIdx) -> Result { + let tuples = if is_key { &self.keys } else { &self.vals }; + let tuple = tuples.get(t_set).ok_or(TupleSetError::IndexOutOfBound(t_set))?; + let res = tuple.get(col_idx)?; + Ok(res) + } +} + +impl From<(I1, I2)> for TupleSet + where I1: IntoIterator, + ReifiedTuple: From, + I2: IntoIterator, + ReifiedTuple: From { + fn from((keys, vals): (I1, I2)) -> Self { + TupleSet { + keys: keys.into_iter().map(ReifiedTuple::from).collect(), + vals: vals.into_iter().map(ReifiedTuple::from).collect(), + } + } +} + +#[cfg(test)] +mod tests { + use std::mem; + use super::*; + + #[test] + fn sizes() { + let t = OwnTuple::with_prefix(0); + let t2 = OwnTuple::with_prefix(0); + let ts = TupleSet::from(([t], [t2])); + dbg!(ts); + dbg!(mem::size_of::()); + dbg!(mem::size_of::()); + } +} \ No newline at end of file diff --git a/src/data/typing.rs b/src/data/typing.rs index 567ae6a1..a937520f 100644 --- a/src/data/typing.rs +++ b/src/data/typing.rs @@ -77,6 +77,9 @@ impl Debug for Typing { } impl Typing { + pub(crate) fn representative_value(&self) -> StaticValue { + todo!() + } pub(crate) fn coerce<'a>(&self, v: Value<'a>) -> Result> { if *self == Typing::Any { return Ok(v); diff --git a/src/data/value.rs b/src/data/value.rs index b14e73de..8f91cc1b 100644 --- a/src/data/value.rs +++ b/src/data/value.rs @@ -20,7 +20,7 @@ pub enum Value<'a> { DescVal(Reverse>>), - EndSentinel, + Sentinel, // Acts as "any" in type inference, end value in sorting } pub(crate) type StaticValue = Value<'static>; @@ -204,7 +204,7 @@ impl<'a> Display for Value<'a> { } f.write_char('}')?; } - Value::EndSentinel => write!(f, "Sentinel")?, + Value::Sentinel => write!(f, "Sentinel")?, Value::DescVal(Reverse(v)) => { write!(f, "~{}", v)?; } @@ -233,7 +233,7 @@ impl<'a> Value<'a> { .map(|(k, v)| (Cow::Owned(k.into_owned()), v.to_static())) .collect::, StaticValue>>() .into(), - Value::EndSentinel => panic!("Cannot process sentinel value"), + Value::Sentinel => panic!("Cannot process sentinel value"), Value::Bytes(t) => Value::from(t.into_owned()), Value::DescVal(Reverse(val)) => Value::DescVal(Reverse(val.to_static().into())), } diff --git a/src/runtime/instance.rs b/src/runtime/instance.rs index 6e4414ee..8fc6c325 100644 --- a/src/runtime/instance.rs +++ b/src/runtime/instance.rs @@ -1,7 +1,7 @@ use std::{mem, result}; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use cozorocks::{BridgeError, DbPtr, destroy_db, OptionsPtrShared, PinnableSlicePtr, ReadOptionsPtr, TDbOptions, TransactionPtr, TransactOptions, WriteOptionsPtr}; -use std::sync::{Arc, LockResult, Mutex, PoisonError}; +use std::sync::{Arc, LockResult, Mutex, PoisonError, RwLock, RwLockReadGuard, RwLockWriteGuard}; use std::sync::atomic::{AtomicU32, Ordering}; use lazy_static::lazy_static; use log::error; @@ -10,6 +10,7 @@ use crate::data::tuple::{DataKind, OwnTuple, Tuple, TupleError}; use crate::data::tuple_set::MIN_TABLE_ID_BOUND; use crate::data::typing::Typing; use crate::data::value::{StaticValue, Value}; +use crate::runtime::instance::DbInstanceError::TableDoesNotExist; use crate::runtime::options::{default_options, default_read_options, default_txn_db_options, default_txn_options, default_write_options}; #[derive(thiserror::Error, Debug)] @@ -22,6 +23,15 @@ pub enum DbInstanceError { #[error(transparent)] Tuple(#[from] TupleError), + + #[error("Cannot obtain table access lock")] + TableAccessLock, + + #[error("Cannot obtain table mutation lock")] + TableMutationLock, + + #[error("Table does not exist: {0}")] + TableDoesNotExist(u32), } type Result = result::Result; @@ -41,6 +51,8 @@ struct SessionHandle { status: SessionStatus, } +type TableLock = Arc>; + pub struct DbInstance { pub(crate) main: DbPtr, options: OptionsPtrShared, @@ -49,6 +61,7 @@ pub struct DbInstance { session_handles: Mutex>>>, optimistic: bool, destroy_on_close: bool, + table_locks: TableLock, } impl DbInstance { @@ -64,6 +77,7 @@ impl DbInstance { path: path.to_string(), session_handles: vec![].into(), destroy_on_close: false, + table_locks: Default::default(), }) } } @@ -123,6 +137,7 @@ impl DbInstance { stack: vec![], cur_table_id: 0.into(), params: Default::default(), + table_locks: self.table_locks.clone(), }) } @@ -181,7 +196,7 @@ impl Drop for DbInstance { enum SessionDefinable { Value(StaticValue), Expr(StaticExpr), - Typing(Typing) + Typing(Typing), // TODO } @@ -199,19 +214,16 @@ pub struct Session { stack: Vec, params: BTreeMap, session_handle: Arc>, + table_locks: TableLock, } pub(crate) struct InterpretContext<'a> { session: &'a Session, } -impl <'a> InterpretContext<'a> { - pub(crate) fn resolve(&self, key: impl AsRef) { - - } - pub(crate) fn resolve_value(&self, key: impl AsRef) { - - } +impl<'a> InterpretContext<'a> { + pub(crate) fn resolve(&self, key: impl AsRef) {} + pub(crate) fn resolve_value(&self, key: impl AsRef) {} pub(crate) fn resolve_typing(&self, key: impl AsRef) { todo!() } @@ -291,6 +303,12 @@ impl Session { txn.commit()?; Ok(cur_id + 1) } + pub(crate) fn table_access_guard(&self, ids: BTreeSet) -> Result> { + self.table_locks.try_read().map_err(|_| DbInstanceError::TableAccessLock) + } + pub(crate) fn table_mutation_guard(&self, ids: BTreeSet) -> Result> { + self.table_locks.write().map_err(|_| DbInstanceError::TableAccessLock) + } } lazy_static! {