store table for temp

main
Ziyang Hu 2 years ago
parent 1c6c3777e8
commit d51a3c482d

@ -90,9 +90,10 @@ impl TryFrom<u8> for StorageTag {
const DATAKIND_DATA: u32 = 0;
const DATAKIND_NODE: u32 = 1;
const DATAKIND_EDGE: u32 = 2;
const DATAKIND_ASSOC: u32 = 3;
const DATAKIND_INDEX: u32 = 4;
const DATAKIND_SEQUENCE: u32 = 5;
const DATAKIND_EDGE_BWD: u32 = 3;
const DATAKIND_ASSOC: u32 = 4;
const DATAKIND_INDEX: u32 = 5;
const DATAKIND_SEQUENCE: u32 = 6;
const DATAKIND_VAL: u32 = 11;
const DATAKIND_TYPE: u32 = 12;
const DATAKIND_EMPTY: u32 = u32::MAX;
@ -103,6 +104,7 @@ pub enum DataKind {
Data = DATAKIND_DATA,
Node = DATAKIND_NODE,
Edge = DATAKIND_EDGE,
EdgeBwd = DATAKIND_EDGE_BWD,
Assoc = DATAKIND_ASSOC,
Index = DATAKIND_INDEX,
Sequence = DATAKIND_SEQUENCE,
@ -122,6 +124,7 @@ impl<T: AsRef<[u8]>> Tuple<T> {
DATAKIND_DATA => Data,
DATAKIND_NODE => Node,
DATAKIND_EDGE => Edge,
DATAKIND_EDGE_BWD => EdgeBwd,
DATAKIND_ASSOC => Assoc,
DATAKIND_INDEX => Index,
DATAKIND_VAL => Val,

@ -1,6 +1,8 @@
use std::collections::BTreeSet;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::result;
use chrono::format::Item;
use cozorocks::TransactionPtr;
use crate::data::eval::{EvalError, PartialEvalContext};
use crate::data::expr::{Expr, StaticExpr};
use crate::data::tuple::{DataKind, OwnTuple};
@ -8,7 +10,8 @@ use crate::data::tuple_set::{ColId, TableId, TupleSetIdx};
use crate::data::value::{StaticValue, Value};
use crate::ddl::parser::{AssocSchema, ColSchema, DdlSchema, EdgeSchema, IndexSchema, NodeSchema, SequenceSchema};
use crate::runtime::instance::DbInstanceError;
use crate::runtime::session::Session;
use crate::runtime::instance::DbInstanceError::NameConflict;
use crate::runtime::session::{Session, SessionDefinable};
#[derive(thiserror::Error, Debug)]
pub(crate) enum DdlReifyError {
@ -20,6 +23,9 @@ pub(crate) enum DdlReifyError {
#[error(transparent)]
Instance(#[from] DbInstanceError),
#[error(transparent)]
Bridge(#[from] cozorocks::BridgeError),
}
type Result<T> = result::Result<T, DdlReifyError>;
@ -52,12 +58,21 @@ impl TableInfo {
TableInfo::Sequence(s) => s.tid
}
}
pub(crate) fn table_name(&self) -> &str {
match self {
TableInfo::Node(t) => &t.name,
TableInfo::Edge(t) => &t.name,
TableInfo::Assoc(t) => &t.name,
TableInfo::Index(t) => &t.name,
TableInfo::Sequence(t) => &t.name,
}
}
}
impl From<&TableInfo> for OwnTuple {
fn from(ti: &TableInfo) -> Self {
match ti {
TableInfo::Node(NodeInfo{name, tid, keys, vals}) => {
TableInfo::Node(NodeInfo { name, tid, keys, vals }) => {
let mut target = OwnTuple::with_data_prefix(DataKind::Node);
target.push_str(name);
target.push_value(&Value::from(*tid));
@ -67,7 +82,7 @@ impl From<&TableInfo> for OwnTuple {
target.push_values_as_list(vals);
target
}
TableInfo::Edge(EdgeInfo{name, tid, src_id, dst_id, keys, vals,}) => {
TableInfo::Edge(EdgeInfo { name, tid, src_id, dst_id, keys, vals, }) => {
let mut target = OwnTuple::with_data_prefix(DataKind::Edge);
target.push_str(name);
target.push_value(&Value::from(*tid));
@ -79,7 +94,7 @@ impl From<&TableInfo> for OwnTuple {
target.push_value(&Value::from(*dst_id));
target
}
TableInfo::Assoc(AssocInfo{ name, tid, src_id, vals }) => {
TableInfo::Assoc(AssocInfo { name, tid, src_id, vals }) => {
let mut target = OwnTuple::with_data_prefix(DataKind::Assoc);
target.push_str(name);
target.push_value(&Value::from(*tid));
@ -87,8 +102,8 @@ impl From<&TableInfo> for OwnTuple {
target.push_values_as_list(vals);
target.push_value(&Value::from(*src_id));
target
},
TableInfo::Index(IndexInfo{ name, tid, src_id, assoc_ids, index }) => {
}
TableInfo::Index(IndexInfo { name, tid, src_id, assoc_ids, index }) => {
let mut target = OwnTuple::with_data_prefix(DataKind::Index);
target.push_str(name);
target.push_value(&Value::from(*tid));
@ -98,8 +113,8 @@ impl From<&TableInfo> for OwnTuple {
let assoc_ids = assoc_ids.iter().map(|v| Value::from(*v));
target.push_values_as_list(assoc_ids);
target
},
TableInfo::Sequence(SequenceInfo{ name, tid }) => {
}
TableInfo::Sequence(SequenceInfo { name, tid }) => {
let mut target = OwnTuple::with_data_prefix(DataKind::Sequence);
target.push_str(name);
target.push_value(&Value::from(*tid));
@ -153,10 +168,24 @@ pub(crate) struct SequenceInfo {
pub(crate) trait DdlContext {
fn gen_table_id(&mut self) -> Result<TableId>;
fn resolve_table_id_for_derivation<I: IntoIterator<Item=TableKind>>(&self, name: &str, kind: I) -> Result<TableId>;
fn resolve_table<I: IntoIterator<Item=TableKind>>(&self, name: &str, kind: I, for_derivation: bool) -> Result<TableInfo>;
fn resolve_table_by_id(&self, tid: TableId) -> Result<TableInfo>;
fn resolve_associates_for(&self, id: TableId) -> Vec<AssocInfo>;
fn table_id_by_name<I: IntoIterator<Item=TableKind>>(&self, name: &str, kind: I, for_derivation: bool) -> Result<TableId> {
todo!()
}
fn table_by_name<I: IntoIterator<Item=TableKind>>(&self, name: &str, kind: I, for_derivation: bool) -> Result<TableInfo> {
todo!()
}
fn table_by_id(&self, tid: TableId) -> Result<TableInfo> {
todo!()
}
fn assocs_by_main_id(&self, id: TableId) -> Vec<AssocInfo> {
todo!()
}
fn edges_by_main_id(&self, id: TableId) -> Vec<EdgeInfo> {
todo!()
}
fn indices_by_main_id(&self, id: TableId) -> Vec<IndexInfo> {
todo!()
}
fn build_table(&mut self, schema: DdlSchema) -> Result<()> {
match schema {
DdlSchema::Node(n) => self.build_node(n)?,
@ -182,17 +211,17 @@ pub(crate) trait DdlContext {
let info = EdgeInfo {
name: schema.name,
tid: self.gen_table_id()?,
src_id: self.resolve_table_id_for_derivation(&schema.src_name, [TableKind::Node])?,
dst_id: self.resolve_table_id_for_derivation(&schema.dst_name, [TableKind::Node])?,
src_id: self.table_id_by_name(&schema.src_name, [TableKind::Node], true)?,
dst_id: self.table_id_by_name(&schema.dst_name, [TableKind::Node], true)?,
keys: eval_defaults(schema.keys)?,
vals: eval_defaults(schema.vals)?,
};
self.store_table(TableInfo::Edge(info))
}
fn build_assoc(&mut self, schema: AssocSchema) -> Result<()> {
let src_info = self.resolve_table(&schema.src_name, [TableKind::Node, TableKind::Edge], true)?;
let src_info = self.table_by_name(&schema.src_name, [TableKind::Node, TableKind::Edge], true)?;
let src_id = src_info.table_id();
let associates = self.resolve_associates_for(src_id);
let associates = self.assocs_by_main_id(src_id);
let mut names_to_check: Vec<_> = associates.iter().map(|ai| &ai.vals).collect();
names_to_check.push(&schema.vals);
check_name_clash(names_to_check)?;
@ -205,8 +234,8 @@ pub(crate) trait DdlContext {
self.store_table(TableInfo::Assoc(info))
}
fn build_index(&mut self, schema: IndexSchema) -> Result<()> {
let src_schema = self.resolve_table(&schema.src_name, [TableKind::Node, TableKind::Edge], true)?;
let associates = self.resolve_associates_for(src_schema.table_id());
let src_schema = self.table_by_name(&schema.src_name, [TableKind::Node, TableKind::Edge], true)?;
let associates = self.assocs_by_main_id(src_schema.table_id());
let assoc_vals = associates.iter().map(|v| v.vals.as_slice()).collect::<Vec<_>>();
let index_exprs = match &src_schema {
TableInfo::Node(node_info) => {
@ -220,12 +249,12 @@ pub(crate) trait DdlContext {
.collect::<result::Result<Vec<_>, _>>()?
}
TableInfo::Edge(edge_info) => {
let src_info = self.resolve_table_by_id(edge_info.src_id)?;
let src_info = self.table_by_id(edge_info.src_id)?;
let src_keys = match &src_info {
TableInfo::Node(n) => &n.keys,
_ => unreachable!()
};
let dst_info = self.resolve_table_by_id(edge_info.dst_id)?;
let dst_info = self.table_by_id(edge_info.dst_id)?;
let dst_keys = match &dst_info {
TableInfo::Node(n) => &n.keys,
_ => unreachable!()
@ -249,7 +278,7 @@ pub(crate) trait DdlContext {
tid: self.gen_table_id()?,
src_id: src_schema.table_id(),
assoc_ids: schema.assoc_names.iter().map(|n|
self.resolve_table_id_for_derivation(n, [TableKind::Assoc]))
self.table_id_by_name(n, [TableKind::Assoc], true))
.collect::<Result<Vec<_>>>()?,
index: index_exprs,
};
@ -263,6 +292,7 @@ pub(crate) trait DdlContext {
}))
}
fn store_table(&mut self, info: TableInfo) -> Result<()>;
fn commit(&mut self) -> Result<()>;
}
fn check_name_clash<'a, I: IntoIterator<Item=II>, II: IntoIterator<Item=&'a ColSchema>>(kvs: I) -> Result<()> {
@ -415,36 +445,95 @@ impl<'a> PartialEvalContext for EdgeDefEvalCtx<'a> {
struct MainDbContext<'a> {
sess: &'a Session,
txn: TransactionPtr,
}
// impl<'a> DdlContext for MainDbContext<'a> {
// fn gen_table_id(&mut self) -> Result<TableId> {
// let id = self.sess.get_next_main_table_id()?;
// Ok(TableId { in_root: true, id })
// }
// }
impl<'a> DdlContext for MainDbContext<'a> {
fn gen_table_id(&mut self) -> Result<TableId> {
let id = self.sess.get_next_main_table_id()?;
Ok(TableId { in_root: true, id })
}
fn store_table(&mut self, info: TableInfo) -> Result<()> {
let tid = info.table_id().id;
let tname = info.table_name();
match &info {
TableInfo::Index(_) => {}
TableInfo::Edge(_) => {}
TableInfo::Assoc(_) => {}
TableInfo::Node(_) => {}
TableInfo::Sequence(_) => {}
}
todo!()
}
fn commit(&mut self) -> Result<()> {
Ok(self.txn.commit()?)
}
}
//
// impl<'a> DdlContext for TempDbContext<'a> {
// fn gen_table_id(&mut self) -> Result<TableId> {
// let id = self.sess.get_next_temp_table_id();
// Ok(TableId { in_root: false, id })
// }
// fn resolve_table<I: IntoIterator<Item=TableKind>>(&self, name: &str, kind: I, for_derivation: bool) -> Result<TableInfo> {
// todo!()
// }
// }
impl<'a> DdlContext for TempDbContext<'a> {
fn gen_table_id(&mut self) -> Result<TableId> {
let id = self.sess.get_next_temp_table_id();
Ok(TableId { in_root: false, id })
}
fn store_table(&mut self, info: TableInfo) -> Result<()> {
let table_id = info.table_id();
let tid = table_id.id;
let tname = info.table_name();
let stack_frame = self.sess.stack.last_mut().unwrap();
if stack_frame.contains_key(tname) {
return Err(NameConflict(tname.to_string()).into())
} else {
match &info {
TableInfo::Edge(info) => {
let edge_assocs = self.sess.table_assocs.entry(DataKind::Edge).or_insert(Default::default());
let src_assocs = edge_assocs.entry(info.src_id).or_insert(Default::default());
src_assocs.insert(tid);
let back_edge_assocs = self.sess.table_assocs.entry(DataKind::EdgeBwd).or_insert(Default::default());
let dst_assocs = back_edge_assocs.entry(info.dst_id).or_insert(Default::default());
dst_assocs.insert(tid);
}
TableInfo::Assoc(info) => {
let assocs = self.sess.table_assocs.entry(DataKind::Assoc).or_insert(Default::default());
let src_assocs = assocs.entry(info.src_id).or_insert(Default::default());
src_assocs.insert(tid);
}
TableInfo::Index(info) => {
let idx_assocs = self.sess.table_assocs.entry(DataKind::Index).or_insert(Default::default());
let src_assocs = idx_assocs.entry(info.src_id).or_insert(Default::default());
src_assocs.insert(tid);
}
TableInfo::Node(_) => {}
TableInfo::Sequence(_) => {}
}
stack_frame.insert(tname.to_string(), SessionDefinable::Table(tid));
self.sess.tables.insert(tid, info);
}
Ok(())
}
fn commit(&mut self) -> Result<()> {
Ok(())
}
}
struct TempDbContext<'a> {
sess: &'a Session,
sess: &'a mut Session,
}
impl Session {
fn main_ctx(&self) -> MainDbContext {
MainDbContext {
sess: self
sess: self,
txn: self.txn(None)
}
}
fn temp_ctx(&self) -> TempDbContext {
fn temp_ctx(&mut self) -> TempDbContext {
TempDbContext {
sess: self
}

@ -26,6 +26,9 @@ pub enum DbInstanceError {
#[error("Table does not exist: {0}")]
TableDoesNotExist(u32),
#[error("Name conflict {0}")]
NameConflict(String)
}
type Result<T> = result::Result<T, DbInstanceError>;
@ -129,6 +132,7 @@ impl DbInstance {
cur_table_id: 0.into(),
params: Default::default(),
table_locks: self.table_locks.clone(),
tables: Default::default()
})
}

@ -7,9 +7,10 @@ use log::error;
use cozorocks::{DbPtr, ReadOptionsPtr, TransactionPtr, WriteOptionsPtr};
use crate::data::expr::StaticExpr;
use crate::data::tuple::{DataKind, OwnTuple, Tuple};
use crate::data::tuple_set::MIN_TABLE_ID_BOUND;
use crate::data::tuple_set::{MIN_TABLE_ID_BOUND, TableId};
use crate::data::typing::Typing;
use crate::data::value::{Value, StaticValue};
use crate::ddl::reify::TableInfo;
use crate::runtime::instance::{DbInstanceError, SessionHandle, SessionStatus, TableLock};
use crate::runtime::options::{default_txn_options, default_write_options};
@ -19,6 +20,7 @@ pub(crate) enum SessionDefinable {
Value(StaticValue),
Expr(StaticExpr),
Typing(Typing),
Table(u32)
// TODO
}
@ -37,6 +39,8 @@ pub struct Session {
pub(crate) params: BTreeMap<String, StaticValue>,
pub(crate) session_handle: Arc<Mutex<SessionHandle>>,
pub(crate) table_locks: TableLock,
pub(crate) tables: BTreeMap<u32, TableInfo>,
pub(crate) table_assocs: BTreeMap<DataKind, BTreeMap<TableId, BTreeSet<u32>>>
}
pub(crate) struct InterpretContext<'a> {

Loading…
Cancel
Save