tuples are complicated

main
Ziyang Hu 2 years ago
parent f5de468548
commit 007c36daa2

@ -1,13 +1,41 @@
use std::fmt::{Debug, Formatter};
use crate::data::expr::Expr;
use crate::data::typing::Typing;
pub(crate) trait Op: Send + Sync {
fn is_resolved(&self) -> bool;
fn name(&self) -> &str;
fn non_null_args(&self) -> bool {
true
}
fn typing_eval(&self, args: ()) -> Typing {
Typing::Any
}
fn row_eval(&self, ctx: (), args: ()) -> () {
unimplemented!()
}
fn row_eval_non_null(&self) -> () {
panic!()
}
fn expr_eval(&self, ctx: (), args: ()) -> () {
self.row_eval(ctx, args)
}
}
pub(crate) trait AggOp: Send + Sync {
fn is_resolved(&self) -> bool;
fn name(&self) -> &str;
fn row_eval(&self, ctx: (), args: ()) -> () {
unimplemented!()
}
fn expr_eval(&self, ctx: (), args: ()) -> () {
self.row_eval(ctx, args)
}
}
impl<'a> Expr<'a> {
pub(crate) fn expr_eval() {}
pub(crate) fn row_eval() {}
}
pub(crate) struct UnresolvedOp(pub String);

@ -9,6 +9,7 @@ use std::hash::{Hash, Hasher};
use std::result;
use chrono::format::Item;
use uuid::Uuid;
use cozorocks::{PinnableSlicePtr, PinnableSlicePtrShared, SlicePtr, SlicePtrShared};
#[derive(thiserror::Error, Debug)]
pub enum TupleError {
@ -426,7 +427,7 @@ impl<T: AsRef<[u8]>> Tuple<T> {
let (val, offset) = self.parse_value_at(pos + 1)?;
(offset, Value::DescVal(Reverse(val.into())))
}
StorageTag::Max => (start, Value::EndSentinel),
StorageTag::Max => (start, Value::Sentinel),
};
Ok((val, nxt))
}
@ -534,6 +535,7 @@ impl OwnTuple {
#[inline]
pub(crate) fn seal_with_sentinel(&mut self) {
self.push_tag(StorageTag::Max);
self.idx_cache.borrow_mut().push(self.data.len());
}
#[inline]
fn push_tag(&mut self, tag: StorageTag) {
@ -638,7 +640,7 @@ impl OwnTuple {
cache.truncate(start_len);
cache.push(self.data.len());
}
Value::EndSentinel => panic!("Cannot push sentinel value"),
Value::Sentinel => self.seal_with_sentinel(),
Value::DescVal(Reverse(v)) => {
self.push_reverse_value(v);
}
@ -727,4 +729,115 @@ impl<'a, P, T> From<(P, T)> for OwnTuple
}
ret
}
}
}
pub(crate) enum ReifiedTupleData {
Own(Vec<u8>),
Slice(SlicePtr),
SharedSlice(SlicePtrShared),
PinnableSlice(PinnableSlicePtr),
PinnableSliceShared(PinnableSlicePtrShared),
}
impl Clone for ReifiedTupleData {
fn clone(&self) -> Self {
match self {
ReifiedTupleData::Own(o) => ReifiedTupleData::Own(o.clone()),
ReifiedTupleData::Slice(s) => ReifiedTupleData::Own(s.as_ref().to_vec()),
ReifiedTupleData::SharedSlice(s) => ReifiedTupleData::SharedSlice(s.clone()),
ReifiedTupleData::PinnableSlice(s) => ReifiedTupleData::Own(s.as_ref().to_vec()),
ReifiedTupleData::PinnableSliceShared(s) => ReifiedTupleData::PinnableSliceShared(s.clone())
}
}
}
impl From<Vec<u8>> for ReifiedTupleData {
fn from(d: Vec<u8>) -> Self {
ReifiedTupleData::Own(d)
}
}
impl From<SlicePtr> for ReifiedTupleData {
fn from(d: SlicePtr) -> Self {
ReifiedTupleData::Slice(d)
}
}
impl From<SlicePtrShared> for ReifiedTupleData {
fn from(d: SlicePtrShared) -> Self {
ReifiedTupleData::SharedSlice(d)
}
}
impl From<PinnableSlicePtr> for ReifiedTupleData {
fn from(d: PinnableSlicePtr) -> Self {
ReifiedTupleData::PinnableSlice(d)
}
}
impl From<PinnableSlicePtrShared> for ReifiedTupleData {
fn from(d: PinnableSlicePtrShared) -> Self {
ReifiedTupleData::PinnableSliceShared(d)
}
}
impl AsRef<[u8]> for ReifiedTupleData {
fn as_ref(&self) -> &[u8] {
match self {
ReifiedTupleData::Own(o) => o.as_ref(),
ReifiedTupleData::Slice(s) => s.as_ref(),
ReifiedTupleData::SharedSlice(s) => s.as_ref(),
ReifiedTupleData::PinnableSlice(s) => s.as_ref(),
ReifiedTupleData::PinnableSliceShared(s) => s.as_ref()
}
}
}
pub(crate) type ReifiedTuple = Tuple<ReifiedTupleData>;
impl From<OwnTuple> for ReifiedTuple {
fn from(t: OwnTuple) -> Self {
ReifiedTuple {
data: t.data.into(),
idx_cache: t.idx_cache,
}
}
}
impl From<Tuple<SlicePtr>> for ReifiedTuple {
fn from(t: Tuple<SlicePtr>) -> Self {
ReifiedTuple {
data: t.data.into(),
idx_cache: t.idx_cache,
}
}
}
impl From<Tuple<SlicePtrShared>> for ReifiedTuple {
fn from(t: Tuple<SlicePtrShared>) -> Self {
ReifiedTuple {
data: t.data.into(),
idx_cache: t.idx_cache,
}
}
}
impl From<Tuple<PinnableSlicePtr>> for ReifiedTuple {
fn from(t: Tuple<PinnableSlicePtr>) -> Self {
ReifiedTuple {
data: t.data.into(),
idx_cache: t.idx_cache,
}
}
}
impl From<Tuple<PinnableSlicePtrShared>> for ReifiedTuple {
fn from(t: Tuple<PinnableSlicePtrShared>) -> Self {
ReifiedTuple {
data: t.data.into(),
idx_cache: t.idx_cache,
}
}
}

@ -1,13 +1,21 @@
use std::cmp::Ordering;
use std::fmt::{Debug, Formatter};
use std::result;
use cozorocks::{PinnableSlicePtr, PinnableSlicePtrShared, SlicePtr, SlicePtrShared};
use crate::data::tuple::{OwnTuple, ReifiedTuple, Tuple, TupleError};
use crate::data::value::Value;
#[derive(thiserror::Error, Debug)]
pub(crate) enum TypingError {
pub(crate) enum TupleSetError {
#[error("table id not allowed: {0}")]
InvalidTableId(u32),
#[error("Index out of bound: {0}")]
IndexOutOfBound(usize),
#[error(transparent)]
Tuple(#[from] TupleError),
}
type Result<T> = result::Result<T, TypingError>;
type Result<T> = result::Result<T, TupleSetError>;
pub(crate) const MIN_TABLE_ID_BOUND: u32 = 10000;
@ -26,7 +34,7 @@ impl Debug for TableId {
impl TableId {
pub(crate) fn new(in_root: bool, id: u32) -> Result<Self> {
if id <= MIN_TABLE_ID_BOUND {
Err(TypingError::InvalidTableId(id))
Err(TupleSetError::InvalidTableId(id))
} else {
Ok(TableId { in_root, id })
}
@ -78,3 +86,89 @@ impl Debug for TupleSetIdx {
)
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub(crate) struct TupleSet {
keys: Vec<ReifiedTuple>,
vals: Vec<ReifiedTuple>,
}
impl TupleSet {
pub(crate) fn push_key(&mut self, t: ReifiedTuple) {
self.keys.push(t);
}
pub(crate) fn push_val(&mut self, v: ReifiedTuple) {
self.vals.push(v);
}
pub(crate) fn merge(&mut self, o: TupleSet) {
self.keys.extend(o.keys);
self.vals.extend(o.vals);
}
pub(crate) fn extend_keys<I, T>(&mut self, keys: I)
where I: IntoIterator<Item=T>,
ReifiedTuple: From<T> {
self.keys.extend(keys.into_iter().map(ReifiedTuple::from));
}
pub(crate) fn extend_vals<I, T>(&mut self, keys: I)
where I: IntoIterator<Item=T>,
ReifiedTuple: From<T> {
self.vals.extend(keys.into_iter().map(ReifiedTuple::from));
}
pub(crate) fn all_keys_eq(&self, other: &Self) -> bool {
if self.keys.len() != other.keys.len() {
return false;
}
for (l, r) in self.keys.iter().zip(&other.keys) {
if !l.key_part_eq(r) {
return false;
}
}
true
}
pub(crate) fn all_keys_cmp(&self, other: &Self) -> Ordering {
for (l, r) in self.keys.iter().zip(&other.keys) {
match l.key_part_cmp(r) {
Ordering::Equal => {}
v => return v,
}
}
Ordering::Equal
}
pub(crate) fn get_value(&self, TupleSetIdx { is_key, t_set, col_idx }: TupleSetIdx) -> Result<Value> {
let tuples = if is_key { &self.keys } else { &self.vals };
let tuple = tuples.get(t_set).ok_or(TupleSetError::IndexOutOfBound(t_set))?;
let res = tuple.get(col_idx)?;
Ok(res)
}
}
impl<I1, T1, I2, T2> From<(I1, I2)> for TupleSet
where I1: IntoIterator<Item=T1>,
ReifiedTuple: From<T1>,
I2: IntoIterator<Item=T2>,
ReifiedTuple: From<T2> {
fn from((keys, vals): (I1, I2)) -> Self {
TupleSet {
keys: keys.into_iter().map(ReifiedTuple::from).collect(),
vals: vals.into_iter().map(ReifiedTuple::from).collect(),
}
}
}
#[cfg(test)]
mod tests {
use std::mem;
use super::*;
#[test]
fn sizes() {
let t = OwnTuple::with_prefix(0);
let t2 = OwnTuple::with_prefix(0);
let ts = TupleSet::from(([t], [t2]));
dbg!(ts);
dbg!(mem::size_of::<ReifiedTuple>());
dbg!(mem::size_of::<TupleSet>());
}
}

@ -77,6 +77,9 @@ impl Debug for Typing {
}
impl Typing {
pub(crate) fn representative_value(&self) -> StaticValue {
todo!()
}
pub(crate) fn coerce<'a>(&self, v: Value<'a>) -> Result<Value<'a>> {
if *self == Typing::Any {
return Ok(v);

@ -20,7 +20,7 @@ pub enum Value<'a> {
DescVal(Reverse<Box<Value<'a>>>),
EndSentinel,
Sentinel, // Acts as "any" in type inference, end value in sorting
}
pub(crate) type StaticValue = Value<'static>;
@ -204,7 +204,7 @@ impl<'a> Display for Value<'a> {
}
f.write_char('}')?;
}
Value::EndSentinel => write!(f, "Sentinel")?,
Value::Sentinel => write!(f, "Sentinel")?,
Value::DescVal(Reverse(v)) => {
write!(f, "~{}", v)?;
}
@ -233,7 +233,7 @@ impl<'a> Value<'a> {
.map(|(k, v)| (Cow::Owned(k.into_owned()), v.to_static()))
.collect::<BTreeMap<Cow<'static, str>, StaticValue>>()
.into(),
Value::EndSentinel => panic!("Cannot process sentinel value"),
Value::Sentinel => panic!("Cannot process sentinel value"),
Value::Bytes(t) => Value::from(t.into_owned()),
Value::DescVal(Reverse(val)) => Value::DescVal(Reverse(val.to_static().into())),
}

@ -1,7 +1,7 @@
use std::{mem, result};
use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet};
use cozorocks::{BridgeError, DbPtr, destroy_db, OptionsPtrShared, PinnableSlicePtr, ReadOptionsPtr, TDbOptions, TransactionPtr, TransactOptions, WriteOptionsPtr};
use std::sync::{Arc, LockResult, Mutex, PoisonError};
use std::sync::{Arc, LockResult, Mutex, PoisonError, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::sync::atomic::{AtomicU32, Ordering};
use lazy_static::lazy_static;
use log::error;
@ -10,6 +10,7 @@ use crate::data::tuple::{DataKind, OwnTuple, Tuple, TupleError};
use crate::data::tuple_set::MIN_TABLE_ID_BOUND;
use crate::data::typing::Typing;
use crate::data::value::{StaticValue, Value};
use crate::runtime::instance::DbInstanceError::TableDoesNotExist;
use crate::runtime::options::{default_options, default_read_options, default_txn_db_options, default_txn_options, default_write_options};
#[derive(thiserror::Error, Debug)]
@ -22,6 +23,15 @@ pub enum DbInstanceError {
#[error(transparent)]
Tuple(#[from] TupleError),
#[error("Cannot obtain table access lock")]
TableAccessLock,
#[error("Cannot obtain table mutation lock")]
TableMutationLock,
#[error("Table does not exist: {0}")]
TableDoesNotExist(u32),
}
type Result<T> = result::Result<T, DbInstanceError>;
@ -41,6 +51,8 @@ struct SessionHandle {
status: SessionStatus,
}
type TableLock = Arc<RwLock<()>>;
pub struct DbInstance {
pub(crate) main: DbPtr,
options: OptionsPtrShared,
@ -49,6 +61,7 @@ pub struct DbInstance {
session_handles: Mutex<Vec<Arc<Mutex<SessionHandle>>>>,
optimistic: bool,
destroy_on_close: bool,
table_locks: TableLock,
}
impl DbInstance {
@ -64,6 +77,7 @@ impl DbInstance {
path: path.to_string(),
session_handles: vec![].into(),
destroy_on_close: false,
table_locks: Default::default(),
})
}
}
@ -123,6 +137,7 @@ impl DbInstance {
stack: vec![],
cur_table_id: 0.into(),
params: Default::default(),
table_locks: self.table_locks.clone(),
})
}
@ -181,7 +196,7 @@ impl Drop for DbInstance {
enum SessionDefinable {
Value(StaticValue),
Expr(StaticExpr),
Typing(Typing)
Typing(Typing),
// TODO
}
@ -199,19 +214,16 @@ pub struct Session {
stack: Vec<SessionStackFrame>,
params: BTreeMap<String, StaticValue>,
session_handle: Arc<Mutex<SessionHandle>>,
table_locks: TableLock,
}
pub(crate) struct InterpretContext<'a> {
session: &'a Session,
}
impl <'a> InterpretContext<'a> {
pub(crate) fn resolve(&self, key: impl AsRef<str>) {
}
pub(crate) fn resolve_value(&self, key: impl AsRef<str>) {
}
impl<'a> InterpretContext<'a> {
pub(crate) fn resolve(&self, key: impl AsRef<str>) {}
pub(crate) fn resolve_value(&self, key: impl AsRef<str>) {}
pub(crate) fn resolve_typing(&self, key: impl AsRef<str>) {
todo!()
}
@ -291,6 +303,12 @@ impl Session {
txn.commit()?;
Ok(cur_id + 1)
}
pub(crate) fn table_access_guard(&self, ids: BTreeSet<u32>) -> Result<RwLockReadGuard<()>> {
self.table_locks.try_read().map_err(|_| DbInstanceError::TableAccessLock)
}
pub(crate) fn table_mutation_guard(&self, ids: BTreeSet<u32>) -> Result<RwLockWriteGuard<()>> {
self.table_locks.write().map_err(|_| DbInstanceError::TableAccessLock)
}
}
lazy_static! {

Loading…
Cancel
Save