From e009bbc81b9913c1e67efad4e3f57fd18c765395 Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Mon, 11 Apr 2022 21:01:41 +0800 Subject: [PATCH] storage meta persistence --- .idea/inspectionProfiles/Project_Default.xml | 10 + src/definition.rs | 213 +++++++++++++++++-- src/env.rs | 14 +- src/error.rs | 3 + src/storage.rs | 17 +- src/typing.rs | 42 +++- src/value.rs | 30 ++- 7 files changed, 291 insertions(+), 38 deletions(-) create mode 100644 .idea/inspectionProfiles/Project_Default.xml diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 00000000..36741ea4 --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,10 @@ + + + + \ No newline at end of file diff --git a/src/definition.rs b/src/definition.rs index 720d5d50..6e8d53bd 100644 --- a/src/definition.rs +++ b/src/definition.rs @@ -1,15 +1,15 @@ use pest::iterators::{Pair, Pairs}; use crate::ast::parse_string; -use crate::env::{Env, StructuredEnvItem}; +use crate::env::{Env, LayeredEnv, StructuredEnvItem}; use crate::error::Result; use crate::error::CozoError::*; use crate::eval::Evaluator; -use crate::parser::{Rule}; use crate::storage::Storage; -use crate::typing::{Col, Columns, Edge, Index, Node, Structured, TableId, Typing}; -use crate::typing::Persistence::{Global, Local}; +use crate::typing::{Col, Columns, Edge, Index, Node, StorageStatus, Structured, TableId, Typing}; use crate::typing::StorageStatus::{Planned, Stored}; -use crate::value::{ByteArrayBuilder, Value}; +use crate::value::{ByteArrayBuilder, ByteArrayParser, Value}; +use crate::parser::{Parser, Rule}; +use pest::Parser as PestParser; fn parse_ident(pair: Pair) -> String { pair.as_str().to_string() @@ -60,7 +60,7 @@ impl StructuredEnvItem { } else { return Err(WrongType); }; - if table_id.0 == Global && (src_id.0 == Local || dst_id.0 == Local) { + if table_id.is_global() && (src_id.is_local() || dst_id.is_local()) { return Err(IncompatibleEdge); } let (keys, cols) = if let Some(p) = inner.next() { @@ -73,6 +73,7 @@ impl StructuredEnvItem { src: src_id, dst: dst_id, id: table_id, + name: name.clone(), keys, cols, }; @@ -100,6 +101,7 @@ impl StructuredEnvItem { let node = Node { status: Planned, id: table_id, + name: name.to_string(), keys, cols, out_e: vec![], @@ -136,13 +138,14 @@ impl StructuredEnvItem { if !keys.is_empty() { return Err(UnexpectedIndexColumns); } - if table_id.0 == Global && node_id.0 == Local { + if table_id.is_global() && node_id.is_local() { return Err(IncompatibleEdge); } if self.define_new(name.clone(), Structured::Columns(Columns { status: Planned, id: table_id, + name: name.to_string(), attached: node_id, cols, })) { @@ -182,7 +185,7 @@ impl StructuredEnvItem { } else { return Err(WrongType); }; - if table_id.0 == Global && node_id.0 == Local { + if table_id.is_global() && node_id.is_local() { return Err(IncompatibleEdge); } @@ -191,6 +194,7 @@ impl StructuredEnvItem { if self.define_new(name.clone(), Structured::Index(Index { status: Planned, id: table_id, + name: name.clone(), attached: node_id, cols: col_list, })) { @@ -200,6 +204,10 @@ impl StructuredEnvItem { } } + pub fn build_type_from_str(&self, src: &str) -> Result { + let ast = Parser::parse(Rule::typing, src)?.next().unwrap(); + self.build_type(ast) + } fn build_type(&self, pair: Pair) -> Result { let mut pairs = pair.into_inner(); @@ -271,25 +279,198 @@ impl StructuredEnvItem { } } +#[repr(u8)] +pub enum TableKind { + Node = 1, + Edge = 2, + Columns = 3, + Index = 4, +} + impl Storage { + fn all_metadata(&self, env: &StructuredEnvItem) -> Result> { + let mut builder = ByteArrayBuilder::with_capacity(1); + builder.build_zigzag(0); + let prefix = builder.get(); + let it = self.db.as_ref().ok_or(DatabaseClosed)?.prefix_iterator(&prefix); + + let mut ret = vec![]; + for (k, v) in it { + let mut key_parser = ByteArrayParser::new(&k); + key_parser.parse_zigzag(); + let table_id = key_parser.parse_value().unwrap().get_table_id().unwrap(); + + let mut data_parser = ByteArrayParser::new(&v); + let table_kind = data_parser.parse_value().unwrap(); + let table_name = data_parser.parse_value().unwrap().get_string().unwrap(); + match table_kind { + Value::UInt(i) if i == TableKind::Node as u64 => { + let keys: Vec<_> = data_parser.parse_value().unwrap().get_list().unwrap() + .into_iter().map(|v| { + let mut vs = v.get_list().unwrap().into_iter(); + let name = vs.next().unwrap().get_string().unwrap(); + let typ = vs.next().unwrap().get_string().unwrap(); + let typ = env.build_type_from_str(&typ).unwrap(); + let default = vs.next().unwrap().into_owned(); + Col { + name, + typ, + default, + } + }).collect(); + let cols: Vec<_> = data_parser.parse_value().unwrap().get_list().unwrap() + .into_iter().map(|v| { + let mut vs = v.get_list().unwrap().into_iter(); + let name = vs.next().unwrap().get_string().unwrap(); + let typ = vs.next().unwrap().get_string().unwrap(); + let typ = env.build_type_from_str(&typ).unwrap(); + let default = vs.next().unwrap().into_owned(); + Col { + name, + typ, + default, + } + }).collect(); + let node = Node { + status: StorageStatus::Stored, + id: table_id, + name: table_name, + keys, + cols, + out_e: vec![], // TODO fix these + in_e: vec![], + attached: vec![], + }; + ret.push(Structured::Node(node)); + } + Value::UInt(i) if i == TableKind::Edge as u64 => { + let src_id = data_parser.parse_value().unwrap().get_table_id().unwrap(); + let dst_id = data_parser.parse_value().unwrap().get_table_id().unwrap(); + let keys: Vec<_> = data_parser.parse_value().unwrap().get_list().unwrap() + .into_iter().map(|v| { + let mut vs = v.get_list().unwrap().into_iter(); + let name = vs.next().unwrap().get_string().unwrap(); + let typ = vs.next().unwrap().get_string().unwrap(); + let typ = env.build_type_from_str(&typ).unwrap(); + let default = vs.next().unwrap().into_owned(); + Col { + name, + typ, + default, + } + }).collect(); + let cols: Vec<_> = data_parser.parse_value().unwrap().get_list().unwrap() + .into_iter().map(|v| { + let mut vs = v.get_list().unwrap().into_iter(); + let name = vs.next().unwrap().get_string().unwrap(); + let typ = vs.next().unwrap().get_string().unwrap(); + let typ = env.build_type_from_str(&typ).unwrap(); + let default = vs.next().unwrap().into_owned(); + Col { + name, + typ, + default, + } + }).collect(); + let edge = Edge { + status: StorageStatus::Stored, + src: src_id, + dst: dst_id, + id: table_id, + name: table_name, + keys, + cols, + }; + ret.push(Structured::Edge(edge)); + } + Value::UInt(i) if i == TableKind::Columns as u64 => { + todo!() + } + Value::UInt(i) if i == TableKind::Index as u64 => { + todo!() + } + _ => unreachable!() + } + } + Ok(ret) + } + fn persist_node(&mut self, node: &mut Node) -> Result<()> { let mut key_writer = ByteArrayBuilder::with_capacity(8); - key_writer.build_varint(0); - key_writer.build_value(&Value::UInt(node.id.1 as u64)); - // println!("{:#?}", node); - // println!("{:?}", key_writer.get()); + key_writer.build_zigzag(0); + key_writer.build_value(&Value::Int(node.id.0)); + let mut val_writer = ByteArrayBuilder::with_capacity(128); + val_writer.build_value(&Value::UInt(TableKind::Node as u64)); + val_writer.build_value(&Value::RefString(&node.name)); + val_writer.build_value(&Value::List(Box::new(node.keys.iter().map(|k| { + Value::List(Box::new(vec![ + Value::RefString(&k.name), + Value::OwnString(Box::new(format!("{}", k.typ))), + k.default.clone(), + ])) + }).collect()))); + val_writer.build_value(&Value::List(Box::new(node.cols.iter().map(|k| { + Value::List(Box::new(vec![ + Value::RefString(&k.name), + Value::OwnString(Box::new(format!("{}", k.typ))), + k.default.clone(), + ])) + }).collect()))); + + self.put(&key_writer.get(), &val_writer.get(), TableId(0))?; node.status = Stored; Ok(()) } fn persist_edge(&mut self, edge: &mut Edge) -> Result<()> { + let mut key_writer = ByteArrayBuilder::with_capacity(8); + key_writer.build_zigzag(0); + key_writer.build_value(&Value::Int(edge.id.0)); + let mut val_writer = ByteArrayBuilder::with_capacity(128); + val_writer.build_value(&Value::UInt(TableKind::Edge as u64)); + val_writer.build_value(&Value::RefString(&edge.name)); + val_writer.build_value(&Value::Int(edge.src.0)); + val_writer.build_value(&Value::Int(edge.dst.0)); + val_writer.build_value(&Value::List(Box::new(edge.keys.iter().map(|k| { + Value::List(Box::new(vec![ + Value::RefString(&k.name), + Value::OwnString(Box::new(format!("{}", k.typ))), + k.default.clone(), + ])) + }).collect()))); + val_writer.build_value(&Value::List(Box::new(edge.cols.iter().map(|k| { + Value::List(Box::new(vec![ + Value::RefString(&k.name), + Value::OwnString(Box::new(format!("{}", k.typ))), + k.default.clone(), + ])) + }).collect()))); + + self.put(&key_writer.get(), &val_writer.get(), TableId(0))?; edge.status = Stored; Ok(()) } } impl Evaluator { - pub fn restore_metadata(&mut self) {} + pub fn restore_metadata(&mut self) -> Result<()> { + let mds = self.storage.all_metadata(self.s_envs.root())?; + for md in &mds { + match md { + v @ Structured::Node(n) => { + // TODO: check if they are the same if one already exists + self.s_envs.root_define(n.name.clone(), v.clone()); + } + v @ Structured::Edge(e) => { + self.s_envs.root_define(e.name.clone(), v.clone()); + } + Structured::Columns(_) => {} + Structured::Index(_) => {} + Structured::Typing(_) => unreachable!() + } + } + Ok(()) + } fn persist_change(&mut self, tname: &str) -> Result<()> { let tbl = self.s_envs.resolve_mut(tname).unwrap(); @@ -368,7 +549,9 @@ mod tests { let parsed = Parser::parse(Rule::file, s).unwrap(); let mut eval = Evaluator::new("_path_for_rocksdb_storagex".to_string()).unwrap(); eval.build_table(parsed).unwrap(); - // println!("{:#?}", eval.s_envs.resolve("Person")); - // println!("{:#?}", eval.s_envs.resolve("Friend")); + eval.restore_metadata().unwrap(); + // eval.storage.delete().unwrap(); + println!("{:#?}", eval.s_envs.resolve("Person")); + println!("{:#?}", eval.s_envs.resolve("Friend")); } } \ No newline at end of file diff --git a/src/env.rs b/src/env.rs index ba9d9dec..26aed0a3 100644 --- a/src/env.rs +++ b/src/env.rs @@ -1,5 +1,5 @@ use std::collections::BTreeMap; -use crate::typing::{define_base_types, Persistence, Structured, TableId}; +use crate::typing::{define_base_types, Structured, TableId}; pub trait Env { fn define(&mut self, name: String, value: V) -> Option; @@ -9,7 +9,7 @@ pub trait Env { fn undef(&mut self, name: &str) -> Option; } -pub trait LayeredEnv : Env { +pub trait LayeredEnv: Env { fn root_define(&mut self, name: String, value: V) -> Option; fn root_define_new(&mut self, name: String, value: V) -> bool; fn root_resolve(&self, name: &str) -> Option<&V>; @@ -18,7 +18,6 @@ pub trait LayeredEnv : Env { } - pub struct StructuredEnvItem { map: BTreeMap, } @@ -66,17 +65,16 @@ impl StructuredEnv { pub fn get_next_table_id(&self, local: bool) -> TableId { let mut id = 0; - let persistence = if local { Persistence::Local } else { Persistence::Global }; for env in &self.stack { for item in env.map.values() { - if let Some(TableId(p, eid)) = item.storage_id() { - if p == persistence { - id = id.max(eid); + if let Some(c_id) = item.storage_id() { + if c_id.is_local() == local { + id = id.max(c_id.0.abs()); } } } } - TableId(persistence, id + 1) + TableId((1 + id) * if local { -1 } else { 1 }) } } diff --git a/src/error.rs b/src/error.rs index 33a3d371..393f3798 100644 --- a/src/error.rs +++ b/src/error.rs @@ -31,6 +31,9 @@ pub enum CozoError { #[error("Unexpected index columns found")] UnexpectedIndexColumns, + #[error("Database already closed")] + DatabaseClosed, + #[error(transparent)] ParseInt(#[from] std::num::ParseIntError), diff --git a/src/storage.rs b/src/storage.rs index a4fcc85c..ceb54299 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1,10 +1,12 @@ use rocksdb::{DB, Options, ColumnFamilyDescriptor}; +use crate::error::CozoError::DatabaseClosed; use crate::error::Result; +use crate::typing::TableId; use crate::value::cozo_comparator_v1; pub struct Storage { - db: Option, + pub db: Option, options: Options, path: String, } @@ -17,9 +19,8 @@ impl Storage { options.create_if_missing(true); options.set_comparator("cozo_comparator_v1", cozo_comparator_v1); - let main_cf = ColumnFamilyDescriptor::new("main", options.clone()); let temp_cf = ColumnFamilyDescriptor::new("temp", options.clone()); - let db = DB::open_cf_descriptors(&options, &path, vec![main_cf, temp_cf])?; + let db = DB::open_cf_descriptors(&options, &path, vec![temp_cf])?; Ok(Storage { db: Some(db), options, path }) } @@ -28,6 +29,16 @@ impl Storage { DB::destroy(&self.options, &self.path)?; Ok(()) } + pub fn put(&self, k: &[u8], v: &[u8], table_id: TableId) -> Result<()> { + let db = self.db.as_ref().ok_or(DatabaseClosed)?; + if table_id.is_global() { + db.put(k, v)?; + } else { + let cf = db.cf_handle("temp").ok_or(DatabaseClosed)?; + db.put_cf(cf, k, v)?; + } + Ok(()) + } } #[cfg(test)] diff --git a/src/typing.rs b/src/typing.rs index c70d0746..8305f0db 100644 --- a/src/typing.rs +++ b/src/typing.rs @@ -46,11 +46,6 @@ pub struct Col { pub default: Value<'static>, } -#[derive(Debug, PartialEq, Eq, Ord, PartialOrd, Clone, Copy)] -pub enum Persistence { - Global, - Local, -} #[derive(Debug, PartialEq, Eq, Ord, PartialOrd, Clone)] pub enum StorageStatus { @@ -59,16 +54,42 @@ pub enum StorageStatus { Stored, } -#[derive(Debug, PartialEq, Eq, Ord, PartialOrd, Clone, Copy)] -pub struct TableId(pub Persistence, pub usize); +#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy)] +pub struct TableId(pub i64); + +impl TableId { + pub fn is_global(&self) -> bool { + self.0 >= 0 + } + pub fn is_local(&self) -> bool { + self.0 < 0 + } +} + +impl Debug for TableId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str( if self.is_global() { "G+" } else {"L"})?; + f.write_str(&format!("{}", self.0))?; + Ok(()) + } +} -#[derive(Debug, Ord, PartialOrd, Eq, PartialEq)] -pub struct ColumnId(TableId, usize); +#[derive(Ord, PartialOrd, Eq, PartialEq)] +pub struct ColumnId(TableId, i64); + +impl Debug for ColumnId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str(&format!("{:?}", self.0))?; + f.write_str(&format!("~{}", self.1))?; + Ok(()) + } +} #[derive(Debug, PartialEq, Clone)] pub struct Node { pub status: StorageStatus, pub id: TableId, + pub name: String, pub keys: Vec, pub cols: Vec, pub out_e: Vec, @@ -82,6 +103,7 @@ pub struct Edge { pub src: TableId, pub dst: TableId, pub id: TableId, + pub name: String, pub keys: Vec, pub cols: Vec, } @@ -91,6 +113,7 @@ pub struct Columns { pub status: StorageStatus, pub attached: TableId, pub id: TableId, + pub name: String, pub cols: Vec, } @@ -98,6 +121,7 @@ pub struct Columns { pub struct Index { pub status: StorageStatus, pub id: TableId, + pub name: String, pub attached: TableId, pub cols: Vec, } diff --git a/src/value.rs b/src/value.rs index f0fa9cbe..1075d96a 100644 --- a/src/value.rs +++ b/src/value.rs @@ -4,7 +4,7 @@ use std::collections::{BTreeMap}; use std::io::{Write}; use ordered_float::OrderedFloat; use uuid::Uuid; -use crate::typing::Typing; +use crate::typing::{TableId, Typing}; use Ordering::{Greater, Less, Equal}; // TODO: array types, alignment of values @@ -71,6 +71,30 @@ pub enum Value<'a> { Dict(Box, Value<'a>>>), } +impl <'a> Value<'a> { + pub fn get_list(self) -> Option> { + match self { + Value::List(v) => Some(*v), + _ => None + } + } + pub fn get_string(self) -> Option { + match self { + Value::OwnString(v) => Some(*v), + Value::RefString(v) => Some(v.to_string()), + _ => None + } + } + + pub fn get_table_id(self) -> Option { + if let Value::Int(id) = self { + Some(TableId(id)) + } else { + None + } + } +} + impl<'a> PartialEq for Value<'a> { fn eq(&self, other: &Self) -> bool { use Value::*; @@ -454,7 +478,7 @@ impl ByteArrayBuilder { } pub fn cmp_keys<'a>(pa: &mut ByteArrayParser<'a>, pb: &mut ByteArrayParser<'a>) -> Ordering { - if let x @ (Greater | Less) = pa.compare_varint(pb) { return x; } + if let x @ (Greater | Less) = pa.compare_zigzag(pb) { return x; } cmp_data(pa, pb) } @@ -524,7 +548,7 @@ impl Typing { } pub struct CozoKey<'a> { - pub table_id: u64, + pub table_id: i64, pub values: Vec>, }