more principled approaches to storing meta information

main
Ziyang Hu 2 years ago
parent 726107adec
commit 1c6c3777e8

@ -8,6 +8,9 @@ use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::result;
use std::sync::Arc;
use crate::parser::{CozoParser, Rule};
use pest::Parser;
use crate::data::parser::ExprParseError;
#[derive(thiserror::Error, Debug)]
pub(crate) enum ExprError {
@ -19,6 +22,12 @@ pub(crate) enum ExprError {
#[error("List extraction failed for {0}")]
ListExtractionFailed(StaticValue),
#[error("Failed to parse {0} into expr")]
Parse(String),
#[error(transparent)]
ParseInner(#[from] ExprParseError)
}
type Result<T> = result::Result<T, ExprError>;
@ -523,3 +532,16 @@ impl<'a> From<Expr<'a>> for Value<'a> {
fn build_tagged_value<'a>(tag: &'static str, val: Value<'a>) -> Value<'a> {
Value::Dict(BTreeMap::from([(tag.into(), val)]))
}
impl <'a> TryFrom<&'a str> for Expr<'a> {
type Error = ExprError;
fn try_from(value: &'a str) -> result::Result<Self, Self::Error> {
let pair = CozoParser::parse(Rule::expr_all, value)
.map_err(|_| ExprError::Parse(value.to_string()))?
.next()
.ok_or_else(|| ExprError::Parse(value.to_string()))?;
Ok(Expr::try_from(pair)?)
}
}

@ -1,6 +1,6 @@
use crate::data::value::Value;
use cozorocks::{PinnableSlicePtr, PinnableSlicePtrShared, SlicePtr, SlicePtrShared};
use std::borrow::Cow;
use std::borrow::{Borrow, Cow};
use std::cell::RefCell;
use std::cmp::{Ordering, Reverse};
use std::collections::BTreeMap;
@ -26,25 +26,38 @@ pub enum TupleError {
type Result<T> = result::Result<T, TupleError>;
const STORAGE_BOOL_FALSE: u8 = 1;
const STORAGE_NULL: u8 = 2;
const STORAGE_BOOL_TRUE: u8 = 3;
const STORAGE_INT: u8 = 4;
const STORAGE_FLOAT: u8 = 5;
const STORAGE_TEXT: u8 = 6;
const STORAGE_UUID: u8 = 7;
const STORAGE_BYTES: u8 = 64;
const STORAGE_LIST: u8 = 128;
const STORAGE_DICT: u8 = 129;
const STORAGE_DESC_VAL: u8 = 192;
const STORAGE_MAX: u8 = 255;
#[repr(u8)]
#[derive(Ord, PartialOrd, Eq, PartialEq)]
pub(crate) enum StorageTag {
BoolFalse = 1,
Null = 2,
BoolTrue = 3,
Int = 4,
Float = 5,
Text = 6,
Uuid = 7,
BoolFalse = STORAGE_BOOL_FALSE,
Null = STORAGE_NULL,
BoolTrue = STORAGE_BOOL_TRUE,
Int = STORAGE_INT,
Float = STORAGE_FLOAT,
Text = STORAGE_TEXT,
Uuid = STORAGE_UUID,
Bytes = 64,
Bytes = STORAGE_BYTES,
List = 128,
Dict = 129,
List = STORAGE_LIST,
Dict = STORAGE_DICT,
DescVal = 192,
DescVal = STORAGE_DESC_VAL,
Max = 255,
Max = STORAGE_MAX,
}
impl TryFrom<u8> for StorageTag {
@ -53,38 +66,49 @@ impl TryFrom<u8> for StorageTag {
fn try_from(u: u8) -> std::result::Result<StorageTag, u8> {
use self::StorageTag::*;
Ok(match u {
1 => BoolFalse,
2 => Null,
3 => BoolTrue,
4 => Int,
5 => Float,
6 => Text,
7 => Uuid,
STORAGE_BOOL_FALSE => BoolFalse,
STORAGE_NULL => Null,
STORAGE_BOOL_TRUE => BoolTrue,
STORAGE_INT => Int,
STORAGE_FLOAT => Float,
STORAGE_TEXT => Text,
STORAGE_UUID => Uuid,
64 => Bytes,
STORAGE_BYTES => Bytes,
128 => List,
129 => Dict,
STORAGE_LIST => List,
STORAGE_DICT => Dict,
192 => DescVal,
STORAGE_DESC_VAL => DescVal,
255 => Max,
STORAGE_MAX => Max,
v => return Err(v),
})
}
}
const DATAKIND_DATA: u32 = 0;
const DATAKIND_NODE: u32 = 1;
const DATAKIND_EDGE: u32 = 2;
const DATAKIND_ASSOC: u32 = 3;
const DATAKIND_INDEX: u32 = 4;
const DATAKIND_SEQUENCE: u32 = 5;
const DATAKIND_VAL: u32 = 11;
const DATAKIND_TYPE: u32 = 12;
const DATAKIND_EMPTY: u32 = u32::MAX;
#[repr(u32)]
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone)]
pub enum DataKind {
Data = 0,
Node = 1,
Edge = 2,
Assoc = 3,
Index = 4,
Val = 5,
Type = 6,
Empty = u32::MAX,
Data = DATAKIND_DATA,
Node = DATAKIND_NODE,
Edge = DATAKIND_EDGE,
Assoc = DATAKIND_ASSOC,
Index = DATAKIND_INDEX,
Sequence = DATAKIND_SEQUENCE,
Val = DATAKIND_VAL,
Type = DATAKIND_TYPE,
Empty = DATAKIND_EMPTY,
}
// In storage, key layout is `[0, name, stack_depth]` where stack_depth is a non-positive number as zigzag
// Also has inverted index `[0, stack_depth, name]` for easy popping of stacks
@ -95,14 +119,14 @@ impl<T: AsRef<[u8]>> Tuple<T> {
pub fn data_kind(&self) -> Result<DataKind> {
use DataKind::*;
Ok(match self.get_prefix() {
0 => Data,
1 => Node,
2 => Edge,
3 => Assoc,
4 => Index,
5 => Val,
6 => Type,
u32::MAX => Empty,
DATAKIND_DATA => Data,
DATAKIND_NODE => Node,
DATAKIND_EDGE => Edge,
DATAKIND_ASSOC => Assoc,
DATAKIND_INDEX => Index,
DATAKIND_VAL => Val,
DATAKIND_TYPE => Type,
DATAKIND_EMPTY => Empty,
v => return Err(TupleError::UndefinedDataKind(v)),
})
}
@ -116,8 +140,8 @@ impl From<DataKind> for u32 {
#[derive(Clone)]
pub struct Tuple<T>
where
T: AsRef<[u8]>,
where
T: AsRef<[u8]>,
{
pub(crate) data: T,
idx_cache: RefCell<Vec<usize>>,
@ -128,8 +152,8 @@ unsafe impl<T: AsRef<[u8]>> Send for Tuple<T> {}
unsafe impl<T: AsRef<[u8]>> Sync for Tuple<T> {}
impl<T> From<T> for Tuple<T>
where
T: AsRef<[u8]>,
where
T: AsRef<[u8]>,
{
fn from(data: T) -> Self {
Tuple::new(data)
@ -137,8 +161,8 @@ where
}
impl<T> Tuple<T>
where
T: AsRef<[u8]>,
where
T: AsRef<[u8]>,
{
pub(crate) fn clear_cache(&self) {
self.idx_cache.borrow_mut().clear()
@ -146,8 +170,8 @@ where
}
impl<T> AsRef<[u8]> for Tuple<T>
where
T: AsRef<[u8]>,
where
T: AsRef<[u8]>,
{
fn as_ref(&self) -> &[u8] {
self.data.as_ref()
@ -600,6 +624,43 @@ impl OwnTuple {
cache.push(self.data.len());
}
#[inline]
pub(crate) fn push_values_as_list<'a, It: IntoIterator<Item=I>, I: Borrow<Value<'a>> + 'a>
(&mut self, l: It) {
self.push_tag(StorageTag::List);
let start_pos = self.data.len();
let start_len = self.idx_cache.borrow().len();
self.data.extend(0u32.to_be_bytes());
for val in l {
self.push_value(val.borrow());
}
let length = (self.data.len() - start_pos) as u32;
let length_bytes = length.to_be_bytes();
self.data[start_pos..(4 + start_pos)].clone_from_slice(&length_bytes[..4]);
let mut cache = self.idx_cache.borrow_mut();
cache.truncate(start_len);
cache.push(self.data.len());
}
#[inline]
pub(crate) fn push_values_as_dict<'a, I: IntoIterator<Item=(T, &'a Value<'a>)>,
T: AsRef<str> + 'a>
(&mut self, d: I) {
self.push_tag(StorageTag::Dict);
let start_pos = self.data.len();
let start_len = self.idx_cache.borrow().len();
self.data.extend(0u32.to_be_bytes());
for (k, v) in d {
self.push_varint(k.as_ref().len() as u64);
self.data.extend_from_slice(k.as_ref().as_bytes());
self.push_value(v);
}
let length = (self.data.len() - start_pos) as u32;
let length_bytes = length.to_be_bytes();
self.data[start_pos..(4 + start_pos)].clone_from_slice(&length_bytes[..4]);
let mut cache = self.idx_cache.borrow_mut();
cache.truncate(start_len);
cache.push(self.data.len());
}
#[inline]
pub(crate) fn push_value(&mut self, v: &Value) {
match v {
Value::Null => self.push_null(),
@ -609,38 +670,8 @@ impl OwnTuple {
Value::Uuid(u) => self.push_uuid(*u),
Value::Text(t) => self.push_str(t),
Value::Bytes(b) => self.push_bytes(b),
Value::List(l) => {
self.push_tag(StorageTag::List);
let start_pos = self.data.len();
let start_len = self.idx_cache.borrow().len();
self.data.extend(0u32.to_be_bytes());
for val in l {
self.push_value(val);
}
let length = (self.data.len() - start_pos) as u32;
let length_bytes = length.to_be_bytes();
self.data[start_pos..(4 + start_pos)].clone_from_slice(&length_bytes[..4]);
let mut cache = self.idx_cache.borrow_mut();
cache.truncate(start_len);
cache.push(self.data.len());
}
Value::Dict(d) => {
self.push_tag(StorageTag::Dict);
let start_pos = self.data.len();
let start_len = self.idx_cache.borrow().len();
self.data.extend(0u32.to_be_bytes());
for (k, v) in d {
self.push_varint(k.len() as u64);
self.data.extend_from_slice(k.as_bytes());
self.push_value(v);
}
let length = (self.data.len() - start_pos) as u32;
let length_bytes = length.to_be_bytes();
self.data[start_pos..(4 + start_pos)].clone_from_slice(&length_bytes[..4]);
let mut cache = self.idx_cache.borrow_mut();
cache.truncate(start_len);
cache.push(self.data.len());
}
Value::List(l) => self.push_values_as_list(l),
Value::Dict(d) => self.push_values_as_dict(d),
Value::Bottom => self.seal_with_sentinel(),
Value::DescVal(Reverse(v)) => {
self.push_reverse_value(v);
@ -697,7 +728,7 @@ impl OwnTuple {
impl<'a> Extend<Value<'a>> for OwnTuple {
#[inline]
fn extend<T: IntoIterator<Item = Value<'a>>>(&mut self, iter: T) {
fn extend<T: IntoIterator<Item=Value<'a>>>(&mut self, iter: T) {
for v in iter {
self.push_value(&v)
}
@ -720,9 +751,9 @@ impl<T: AsRef<[u8]>> Hash for Tuple<T> {
impl<T: AsRef<[u8]>> Eq for Tuple<T> {}
impl<'a, P, T> From<(P, T)> for OwnTuple
where
T: IntoIterator<Item = &'a Value<'a>>,
P: Into<u32>,
where
T: IntoIterator<Item=&'a Value<'a>>,
P: Into<u32>,
{
fn from((prefix, it): (P, T)) -> Self {
let mut ret = OwnTuple::with_prefix(prefix.into());

@ -1,5 +1,5 @@
use crate::data::tuple::{OwnTuple, ReifiedTuple, TupleError};
use crate::data::value::Value;
use crate::data::value::{StaticValue, Value};
use std::cmp::Ordering;
use std::fmt::{Debug, Formatter};
use std::result;
@ -12,6 +12,8 @@ pub(crate) enum TupleSetError {
IndexOutOfBound(usize),
#[error(transparent)]
Tuple(#[from] TupleError),
#[error("Failed to deserialize {0}")]
Deser(StaticValue),
}
type Result<T> = result::Result<T, TupleSetError>;
@ -24,6 +26,32 @@ pub(crate) struct TableId {
pub(crate) id: u32,
}
impl From<TableId> for StaticValue {
fn from(tid: TableId) -> Self {
Value::from(vec![Value::from(tid.in_root), (tid.id as i64).into()])
}
}
impl<'a> TryFrom<&'a Value<'a>> for TableId {
type Error = TupleSetError;
fn try_from(value: &'a Value<'a>) -> result::Result<Self, Self::Error> {
let make_err = || TupleSetError::Deser(value.clone().to_static());
let fields = value.get_slice().ok_or_else(make_err)?;
let in_root = fields
.get(0)
.ok_or_else(make_err)?
.get_bool()
.ok_or_else(make_err)?;
let id = fields
.get(1)
.ok_or_else(make_err)?
.get_int()
.ok_or_else(make_err)? as u32;
Ok(TableId { in_root, id })
}
}
impl Debug for TableId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "#{}{}", if self.in_root { 'G' } else { 'L' }, self.id)
@ -104,16 +132,16 @@ impl TupleSet {
self.vals.extend(o.vals);
}
pub(crate) fn extend_keys<I, T>(&mut self, keys: I)
where
I: IntoIterator<Item = T>,
ReifiedTuple: From<T>,
where
I: IntoIterator<Item=T>,
ReifiedTuple: From<T>,
{
self.keys.extend(keys.into_iter().map(ReifiedTuple::from));
}
pub(crate) fn extend_vals<I, T>(&mut self, keys: I)
where
I: IntoIterator<Item = T>,
ReifiedTuple: From<T>,
where
I: IntoIterator<Item=T>,
ReifiedTuple: From<T>,
{
self.vals.extend(keys.into_iter().map(ReifiedTuple::from));
}
@ -157,11 +185,11 @@ impl TupleSet {
}
impl<I1, T1, I2, T2> From<(I1, I2)> for TupleSet
where
I1: IntoIterator<Item = T1>,
ReifiedTuple: From<T1>,
I2: IntoIterator<Item = T2>,
ReifiedTuple: From<T2>,
where
I1: IntoIterator<Item=T1>,
ReifiedTuple: From<T1>,
I2: IntoIterator<Item=T2>,
ReifiedTuple: From<T2>,
{
fn from((keys, vals): (I1, I2)) -> Self {
TupleSet {

@ -23,6 +23,48 @@ pub enum Value<'a> {
Bottom, // Acts as "any" in type inference, end value in sorting
}
impl<'a> Value<'a> {
pub(crate) fn is_null(&self) -> bool {
*self == Value::Null
}
pub(crate) fn get_bool(&self) -> Option<bool> {
match self {
Value::Bool(b) => Some(*b),
_ => None
}
}
pub(crate) fn get_int(&self) -> Option<i64> {
match self {
Value::Int(b) => Some(*b),
_ => None
}
}
pub(crate) fn get_float(&self) -> Option<f64> {
match self {
Value::Float(b) => Some(b.into_inner()),
_ => None
}
}
pub(crate) fn get_str(&self) -> Option<&str> {
match self {
Value::Text(b) => Some(b.as_ref()),
_ => None
}
}
pub(crate) fn get_slice(&self) -> Option<&[Value<'a>]> {
match self {
Value::List(l) => Some(l),
_ => None
}
}
pub(crate) fn get_map(&self) -> Option<&BTreeMap<Cow<str>, Value>> {
match self {
Value::Dict(m) => Some(m),
_ => None
}
}
}
pub(crate) type StaticValue = Value<'static>;
impl<'a> Debug for Value<'a> {

@ -1,8 +1,8 @@
use std::result;
use crate::data::expr::{Expr, StaticExpr};
use crate::data::expr::{Expr, ExprError, StaticExpr};
use crate::data::parser::ExprParseError;
use crate::data::typing::{Typing, TypingError};
use crate::data::value::Value;
use crate::data::value::{StaticValue, Value};
use crate::parser::{Pair, Rule};
use crate::parser::text_identifier::{build_name_in_def, TextParseError};
@ -19,6 +19,12 @@ pub(crate) enum DdlParseError {
#[error("definition error: {0}")]
Definition(&'static str),
#[error("failed to deserialize col schema")]
ColSchemaDeser(StaticValue),
#[error(transparent)]
Expr(#[from] ExprError),
}
type Result<T> = result::Result<T, DdlParseError>;
@ -30,6 +36,35 @@ pub(crate) struct ColSchema {
pub(crate) default: StaticExpr,
}
impl From<ColSchema> for StaticValue {
fn from(s: ColSchema) -> Self {
Value::from(vec![
Value::from(s.name),
Value::from(s.typing.to_string()),
Value::from(s.default),
])
}
}
impl<'a> TryFrom<Value<'a>> for ColSchema {
type Error = DdlParseError;
fn try_from(value: Value<'a>) -> Result<Self> {
let mk_err = || DdlParseError::ColSchemaDeser(value.clone().to_static());
let fields = value.get_slice().ok_or_else(mk_err)?;
let name = fields.get(0).ok_or_else(mk_err)?.get_str().ok_or_else(mk_err)?.to_string();
let typing = fields.get(1).ok_or_else(mk_err)?.get_str().ok_or_else(mk_err)?;
let typing = Typing::try_from(typing)?;
let default = fields.get(1).ok_or_else(mk_err)?.get_str().ok_or_else(mk_err)?;
let default = Expr::try_from(default)?.to_static();
Ok(Self {
name,
typing,
default
})
}
}
#[derive(Debug, Clone)]
pub(crate) struct NodeSchema {
pub(crate) name: String,
@ -170,7 +205,7 @@ impl<'a> TryFrom<Pair<'a>> for IndexSchema {
name: index_name,
src_name: main_name,
assoc_names: associate_names,
index: indices
index: indices,
})
}
}

@ -3,9 +3,12 @@ use std::result;
use chrono::format::Item;
use crate::data::eval::{EvalError, PartialEvalContext};
use crate::data::expr::{Expr, StaticExpr};
use crate::data::tuple::{DataKind, OwnTuple};
use crate::data::tuple_set::{ColId, TableId, TupleSetIdx};
use crate::data::value::Value;
use crate::data::value::{StaticValue, Value};
use crate::ddl::parser::{AssocSchema, ColSchema, DdlSchema, EdgeSchema, IndexSchema, NodeSchema, SequenceSchema};
use crate::runtime::instance::DbInstanceError;
use crate::runtime::session::Session;
#[derive(thiserror::Error, Debug)]
pub(crate) enum DdlReifyError {
@ -14,6 +17,9 @@ pub(crate) enum DdlReifyError {
#[error(transparent)]
Eval(#[from] EvalError),
#[error(transparent)]
Instance(#[from] DbInstanceError),
}
type Result<T> = result::Result<T, DdlReifyError>;
@ -31,13 +37,74 @@ pub(crate) enum TableKind {
pub(crate) enum TableInfo {
Node(NodeInfo),
Edge(EdgeInfo),
Assoc(AssocInfo),
Index(IndexInfo),
Sequence(SequenceInfo),
}
impl TableInfo {
pub(crate) fn table_id(&self) -> TableId {
match self {
TableInfo::Node(n) => n.tid,
TableInfo::Edge(e) => e.tid
TableInfo::Edge(e) => e.tid,
TableInfo::Assoc(a) => a.tid,
TableInfo::Index(i) => i.tid,
TableInfo::Sequence(s) => s.tid
}
}
}
impl From<&TableInfo> for OwnTuple {
fn from(ti: &TableInfo) -> Self {
match ti {
TableInfo::Node(NodeInfo{name, tid, keys, vals}) => {
let mut target = OwnTuple::with_data_prefix(DataKind::Node);
target.push_str(name);
target.push_value(&Value::from(*tid));
let keys = keys.iter().map(|k| Value::from(k.clone()));
target.push_values_as_list(keys);
let vals = vals.iter().map(|k| Value::from(k.clone()));
target.push_values_as_list(vals);
target
}
TableInfo::Edge(EdgeInfo{name, tid, src_id, dst_id, keys, vals,}) => {
let mut target = OwnTuple::with_data_prefix(DataKind::Edge);
target.push_str(name);
target.push_value(&Value::from(*tid));
let keys = keys.iter().map(|k| Value::from(k.clone()));
target.push_values_as_list(keys);
let vals = vals.iter().map(|k| Value::from(k.clone()));
target.push_values_as_list(vals);
target.push_value(&Value::from(*src_id));
target.push_value(&Value::from(*dst_id));
target
}
TableInfo::Assoc(AssocInfo{ name, tid, src_id, vals }) => {
let mut target = OwnTuple::with_data_prefix(DataKind::Assoc);
target.push_str(name);
target.push_value(&Value::from(*tid));
let vals = vals.iter().map(|k| Value::from(k.clone()));
target.push_values_as_list(vals);
target.push_value(&Value::from(*src_id));
target
},
TableInfo::Index(IndexInfo{ name, tid, src_id, assoc_ids, index }) => {
let mut target = OwnTuple::with_data_prefix(DataKind::Index);
target.push_str(name);
target.push_value(&Value::from(*tid));
let indices = index.iter().map(|i| Value::from(i.clone()));
target.push_values_as_list(indices);
target.push_value(&Value::from(*src_id));
let assoc_ids = assoc_ids.iter().map(|v| Value::from(*v));
target.push_values_as_list(assoc_ids);
target
},
TableInfo::Sequence(SequenceInfo{ name, tid }) => {
let mut target = OwnTuple::with_data_prefix(DataKind::Sequence);
target.push_str(name);
target.push_value(&Value::from(*tid));
target
}
}
}
}
@ -78,13 +145,14 @@ pub(crate) struct IndexInfo {
pub(crate) index: Vec<StaticExpr>,
}
#[derive(Debug, Clone)]
pub(crate) struct SequenceInfo {
pub(crate) name: String,
pub(crate) tid: TableId,
}
pub(crate) trait DdlContext {
fn gen_table_id(&mut self) -> TableId;
fn gen_table_id(&mut self) -> Result<TableId>;
fn resolve_table_id_for_derivation<I: IntoIterator<Item=TableKind>>(&self, name: &str, kind: I) -> Result<TableId>;
fn resolve_table<I: IntoIterator<Item=TableKind>>(&self, name: &str, kind: I, for_derivation: bool) -> Result<TableInfo>;
fn resolve_table_by_id(&self, tid: TableId) -> Result<TableInfo>;
@ -103,26 +171,24 @@ pub(crate) trait DdlContext {
check_name_clash([&schema.keys, &schema.vals])?;
let info = NodeInfo {
name: schema.name,
tid: self.gen_table_id(),
tid: self.gen_table_id()?,
keys: eval_defaults(schema.keys)?,
vals: eval_defaults(schema.vals)?,
};
self.store_node(info)
self.store_table(TableInfo::Node(info))
}
fn store_node(&mut self, info: NodeInfo) -> Result<()>;
fn build_edge(&mut self, schema: EdgeSchema) -> Result<()> {
check_name_clash([&schema.keys, &schema.vals])?;
let info = EdgeInfo {
name: schema.name,
tid: self.gen_table_id(),
tid: self.gen_table_id()?,
src_id: self.resolve_table_id_for_derivation(&schema.src_name, [TableKind::Node])?,
dst_id: self.resolve_table_id_for_derivation(&schema.dst_name, [TableKind::Node])?,
keys: eval_defaults(schema.keys)?,
vals: eval_defaults(schema.vals)?,
};
self.store_edge(info)
self.store_table(TableInfo::Edge(info))
}
fn store_edge(&mut self, info: EdgeInfo) -> Result<()>;
fn build_assoc(&mut self, schema: AssocSchema) -> Result<()> {
let src_info = self.resolve_table(&schema.src_name, [TableKind::Node, TableKind::Edge], true)?;
let src_id = src_info.table_id();
@ -132,13 +198,12 @@ pub(crate) trait DdlContext {
check_name_clash(names_to_check)?;
let info = AssocInfo {
name: schema.name,
tid: self.gen_table_id(),
tid: self.gen_table_id()?,
src_id,
vals: eval_defaults(schema.vals)?,
};
self.store_assoc(info)
self.store_table(TableInfo::Assoc(info))
}
fn store_assoc(&mut self, info: AssocInfo) -> Result<()>;
fn build_index(&mut self, schema: IndexSchema) -> Result<()> {
let src_schema = self.resolve_table(&schema.src_name, [TableKind::Node, TableKind::Edge], true)?;
let associates = self.resolve_associates_for(src_schema.table_id());
@ -176,28 +241,28 @@ pub(crate) trait DdlContext {
ex.partial_eval(&ctx).map(|ex| ex.to_static()))
.collect::<result::Result<Vec<_>, _>>()?
}
_ => unreachable!()
};
let info = IndexInfo {
name: schema.name,
tid: self.gen_table_id(),
tid: self.gen_table_id()?,
src_id: src_schema.table_id(),
assoc_ids: schema.assoc_names.iter().map(|n|
self.resolve_table_id_for_derivation(n, [TableKind::Assoc]))
.collect::<Result<Vec<_>>>()?,
index: index_exprs,
};
self.store_index(info)
self.store_table(TableInfo::Index(info))
}
fn store_index(&mut self, info: IndexInfo) -> Result<()>;
fn build_sequence(&mut self, schema: SequenceSchema) -> Result<()> {
let tid = self.gen_table_id();
self.store_sequence(SequenceInfo {
let tid = self.gen_table_id()?;
self.store_table(TableInfo::Sequence(SequenceInfo {
name: schema.name,
tid,
})
}))
}
fn store_sequence(&mut self, info: SequenceInfo) -> Result<()>;
fn store_table(&mut self, info: TableInfo) -> Result<()>;
}
fn check_name_clash<'a, I: IntoIterator<Item=II>, II: IntoIterator<Item=&'a ColSchema>>(kvs: I) -> Result<()> {
@ -346,3 +411,42 @@ impl<'a> PartialEvalContext for EdgeDefEvalCtx<'a> {
self.resolve_name(key).map(Expr::TupleSetIdx)
}
}
struct MainDbContext<'a> {
sess: &'a Session,
}
// impl<'a> DdlContext for MainDbContext<'a> {
// fn gen_table_id(&mut self) -> Result<TableId> {
// let id = self.sess.get_next_main_table_id()?;
// Ok(TableId { in_root: true, id })
// }
// }
//
// impl<'a> DdlContext for TempDbContext<'a> {
// fn gen_table_id(&mut self) -> Result<TableId> {
// let id = self.sess.get_next_temp_table_id();
// Ok(TableId { in_root: false, id })
// }
// fn resolve_table<I: IntoIterator<Item=TableKind>>(&self, name: &str, kind: I, for_derivation: bool) -> Result<TableInfo> {
// todo!()
// }
// }
struct TempDbContext<'a> {
sess: &'a Session,
}
impl Session {
fn main_ctx(&self) -> MainDbContext {
MainDbContext {
sess: self
}
}
fn temp_ctx(&self) -> TempDbContext {
TempDbContext {
sess: self
}
}
}

@ -1,3 +1,3 @@
pub(crate) mod instance;
pub(crate) mod interpreter;
pub(crate) mod session;
pub(crate) mod options;

@ -1,22 +1,11 @@
use crate::data::expr::StaticExpr;
use crate::data::tuple::{DataKind, OwnTuple, Tuple, TupleError};
use crate::data::tuple_set::MIN_TABLE_ID_BOUND;
use crate::data::typing::Typing;
use crate::data::value::{StaticValue, Value};
use crate::runtime::options::{
default_options, default_read_options, default_txn_db_options, default_txn_options,
default_write_options,
};
use cozorocks::{
destroy_db, BridgeError, DbPtr, OptionsPtrShared, ReadOptionsPtr, TDbOptions, TransactionPtr,
WriteOptionsPtr,
};
use lazy_static::lazy_static;
use crate::runtime::options::*;
use cozorocks::*;
use log::error;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::sync::{Arc, Mutex, RwLock, RwLockWriteGuard};
use std::{mem, result};
use crate::data::tuple::TupleError;
use crate::runtime::session::Session;
#[derive(thiserror::Error, Debug)]
pub enum DbInstanceError {
@ -48,14 +37,14 @@ pub enum SessionStatus {
Completed,
}
struct SessionHandle {
pub(crate) struct SessionHandle {
id: usize,
db: DbPtr,
next_table_id: u32,
status: SessionStatus,
pub(crate) next_table_id: u32,
pub(crate) status: SessionStatus,
}
type TableLock = Arc<RwLock<()>>;
pub(crate) type TableLock = Arc<RwLock<()>>;
pub struct DbInstance {
pub(crate) main: DbPtr,
@ -195,143 +184,6 @@ impl Drop for DbInstance {
}
}
enum SessionDefinable {
Value(StaticValue),
Expr(StaticExpr),
Typing(Typing),
// TODO
}
type SessionStackFrame = BTreeMap<String, SessionDefinable>;
pub struct Session {
pub(crate) main: DbPtr,
pub(crate) temp: DbPtr,
pub(crate) r_opts_main: ReadOptionsPtr,
pub(crate) r_opts_temp: ReadOptionsPtr,
pub(crate) w_opts_main: WriteOptionsPtr,
pub(crate) w_opts_temp: WriteOptionsPtr,
optimistic: bool,
cur_table_id: AtomicU32,
stack: Vec<SessionStackFrame>,
params: BTreeMap<String, StaticValue>,
session_handle: Arc<Mutex<SessionHandle>>,
table_locks: TableLock,
}
pub(crate) struct InterpretContext<'a> {
session: &'a Session,
}
impl<'a> InterpretContext<'a> {
pub(crate) fn resolve(&self, key: impl AsRef<str>) {}
pub(crate) fn resolve_value(&self, key: impl AsRef<str>) {}
pub(crate) fn resolve_typing(&self, key: impl AsRef<str>) {
todo!()
}
// also for expr, table, etc..
}
impl Session {
pub fn start(mut self) -> Result<Self> {
{
self.push_env();
let mut handle = self
.session_handle
.lock()
.map_err(|_| DbInstanceError::SessionLock)?;
handle.status = SessionStatus::Running;
self.cur_table_id = handle.next_table_id.into();
}
Ok(self)
}
pub(crate) fn push_env(&mut self) {
self.stack.push(BTreeMap::new());
}
pub(crate) fn pop_env(&mut self) {
if self.stack.len() > 1 {
self.stack.pop();
}
}
fn clear_data(&self) -> Result<()> {
self.temp.del_range(
&self.w_opts_temp,
Tuple::with_null_prefix(),
Tuple::max_tuple(),
)?;
Ok(())
}
pub fn stop(&mut self) -> Result<()> {
self.clear_data()?;
let mut handle = self.session_handle.lock().map_err(|_| {
error!("failed to stop interpreter");
DbInstanceError::SessionLock
})?;
handle.next_table_id = self.cur_table_id.load(Ordering::SeqCst);
handle.status = SessionStatus::Completed;
Ok(())
}
pub(crate) fn get_next_temp_table_id(&self) -> u32 {
let mut res = self.cur_table_id.fetch_add(1, Ordering::SeqCst);
while res.wrapping_add(1) < MIN_TABLE_ID_BOUND {
res = self
.cur_table_id
.fetch_add(MIN_TABLE_ID_BOUND, Ordering::SeqCst);
}
res + 1
}
pub(crate) fn txn(&self, w_opts: Option<WriteOptionsPtr>) -> TransactionPtr {
self.main.txn(
default_txn_options(self.optimistic),
w_opts.unwrap_or_else(default_write_options),
)
}
pub(crate) fn get_next_main_table_id(&self) -> Result<u32> {
let txn = self.txn(None);
let key = MAIN_DB_TABLE_ID_SEQ_KEY.as_ref();
let cur_id = match txn.get_owned(&self.r_opts_main, key)? {
None => {
let val = OwnTuple::from((DataKind::Data, &[(MIN_TABLE_ID_BOUND as i64).into()]));
txn.put(key, &val)?;
MIN_TABLE_ID_BOUND
}
Some(pt) => {
let pt = Tuple::from(pt);
let prev_id = pt.get_int(0)?;
let val = OwnTuple::from((DataKind::Data, &[(prev_id + 1).into()]));
txn.put(key, &val)?;
(prev_id + 1) as u32
}
};
txn.commit()?;
Ok(cur_id + 1)
}
pub(crate) fn table_access_guard(&self, ids: BTreeSet<u32>) -> Result<RwLockReadGuard<()>> {
self.table_locks
.try_read()
.map_err(|_| DbInstanceError::TableAccessLock)
}
pub(crate) fn table_mutation_guard(&self, ids: BTreeSet<u32>) -> Result<RwLockWriteGuard<()>> {
self.table_locks
.write()
.map_err(|_| DbInstanceError::TableAccessLock)
}
}
lazy_static! {
static ref MAIN_DB_TABLE_ID_SEQ_KEY: OwnTuple = OwnTuple::from((0u32, &[Value::Null]));
}
impl Drop for Session {
fn drop(&mut self) {
if let Err(e) = self.stop() {
error!("failed to drop session {:?}", e);
}
}
}
#[cfg(test)]
mod tests {

@ -0,0 +1,155 @@
use std::collections::{BTreeMap, BTreeSet};
use std::result;
use std::sync::{Arc, Mutex, RwLockReadGuard, RwLockWriteGuard};
use std::sync::atomic::{AtomicU32, Ordering};
use lazy_static::lazy_static;
use log::error;
use cozorocks::{DbPtr, ReadOptionsPtr, TransactionPtr, WriteOptionsPtr};
use crate::data::expr::StaticExpr;
use crate::data::tuple::{DataKind, OwnTuple, Tuple};
use crate::data::tuple_set::MIN_TABLE_ID_BOUND;
use crate::data::typing::Typing;
use crate::data::value::{Value, StaticValue};
use crate::runtime::instance::{DbInstanceError, SessionHandle, SessionStatus, TableLock};
use crate::runtime::options::{default_txn_options, default_write_options};
type Result<T> = result::Result<T, DbInstanceError>;
pub(crate) enum SessionDefinable {
Value(StaticValue),
Expr(StaticExpr),
Typing(Typing),
// TODO
}
pub(crate) type SessionStackFrame = BTreeMap<String, SessionDefinable>;
pub struct Session {
pub(crate) main: DbPtr,
pub(crate) temp: DbPtr,
pub(crate) r_opts_main: ReadOptionsPtr,
pub(crate) r_opts_temp: ReadOptionsPtr,
pub(crate) w_opts_main: WriteOptionsPtr,
pub(crate) w_opts_temp: WriteOptionsPtr,
pub(crate) optimistic: bool,
pub(crate) cur_table_id: AtomicU32,
pub(crate) stack: Vec<SessionStackFrame>,
pub(crate) params: BTreeMap<String, StaticValue>,
pub(crate) session_handle: Arc<Mutex<SessionHandle>>,
pub(crate) table_locks: TableLock,
}
pub(crate) struct InterpretContext<'a> {
session: &'a Session,
}
impl<'a> InterpretContext<'a> {
pub(crate) fn resolve(&self, key: impl AsRef<str>) {}
pub(crate) fn resolve_value(&self, key: impl AsRef<str>) {}
pub(crate) fn resolve_typing(&self, key: impl AsRef<str>) {
todo!()
}
// also for expr, table, etc..
}
impl Session {
pub fn start(mut self) -> Result<Self> {
{
self.push_env();
let mut handle = self
.session_handle
.lock()
.map_err(|_| DbInstanceError::SessionLock)?;
handle.status = SessionStatus::Running;
self.cur_table_id = handle.next_table_id.into();
}
Ok(self)
}
pub(crate) fn push_env(&mut self) {
self.stack.push(BTreeMap::new());
}
pub(crate) fn pop_env(&mut self) {
if self.stack.len() > 1 {
self.stack.pop();
}
}
fn clear_data(&self) -> Result<()> {
self.temp.del_range(
&self.w_opts_temp,
Tuple::with_null_prefix(),
Tuple::max_tuple(),
)?;
Ok(())
}
pub fn stop(&mut self) -> Result<()> {
self.clear_data()?;
let mut handle = self.session_handle.lock().map_err(|_| {
error!("failed to stop interpreter");
DbInstanceError::SessionLock
})?;
handle.next_table_id = self.cur_table_id.load(Ordering::SeqCst);
handle.status = SessionStatus::Completed;
Ok(())
}
pub(crate) fn get_next_temp_table_id(&self) -> u32 {
let mut res = self.cur_table_id.fetch_add(1, Ordering::SeqCst);
while res.wrapping_add(1) < MIN_TABLE_ID_BOUND {
res = self
.cur_table_id
.fetch_add(MIN_TABLE_ID_BOUND, Ordering::SeqCst);
}
res + 1
}
pub(crate) fn txn(&self, w_opts: Option<WriteOptionsPtr>) -> TransactionPtr {
self.main.txn(
default_txn_options(self.optimistic),
w_opts.unwrap_or_else(default_write_options),
)
}
pub(crate) fn get_next_main_table_id(&self) -> Result<u32> {
let txn = self.txn(None);
let key = MAIN_DB_TABLE_ID_SEQ_KEY.as_ref();
let cur_id = match txn.get_owned(&self.r_opts_main, key)? {
None => {
let val = OwnTuple::from((DataKind::Data, &[(MIN_TABLE_ID_BOUND as i64).into()]));
txn.put(key, &val)?;
MIN_TABLE_ID_BOUND
}
Some(pt) => {
let pt = Tuple::from(pt);
let prev_id = pt.get_int(0)?;
let val = OwnTuple::from((DataKind::Data, &[(prev_id + 1).into()]));
txn.put(key, &val)?;
(prev_id + 1) as u32
}
};
txn.commit()?;
Ok(cur_id + 1)
}
pub(crate) fn table_access_guard(&self, ids: BTreeSet<u32>) -> Result<RwLockReadGuard<()>> {
self.table_locks
.try_read()
.map_err(|_| DbInstanceError::TableAccessLock)
}
pub(crate) fn table_mutation_guard(&self, ids: BTreeSet<u32>) -> Result<RwLockWriteGuard<()>> {
self.table_locks
.write()
.map_err(|_| DbInstanceError::TableAccessLock)
}
}
lazy_static! {
static ref MAIN_DB_TABLE_ID_SEQ_KEY: OwnTuple = OwnTuple::from((0u32, &[Value::Null]));
}
impl Drop for Session {
fn drop(&mut self) {
if let Err(e) = self.stop() {
error!("failed to drop session {:?}", e);
}
}
}
Loading…
Cancel
Save