storage meta persistence

main
Ziyang Hu 2 years ago
parent bf8c6db090
commit e009bbc81b

@ -0,0 +1,10 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="DuplicatedCode" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<Languages>
<language minSize="89" name="Rust" />
</Languages>
</inspection_tool>
</profile>
</component>

@ -1,15 +1,15 @@
use pest::iterators::{Pair, Pairs};
use crate::ast::parse_string;
use crate::env::{Env, StructuredEnvItem};
use crate::env::{Env, LayeredEnv, StructuredEnvItem};
use crate::error::Result;
use crate::error::CozoError::*;
use crate::eval::Evaluator;
use crate::parser::{Rule};
use crate::storage::Storage;
use crate::typing::{Col, Columns, Edge, Index, Node, Structured, TableId, Typing};
use crate::typing::Persistence::{Global, Local};
use crate::typing::{Col, Columns, Edge, Index, Node, StorageStatus, Structured, TableId, Typing};
use crate::typing::StorageStatus::{Planned, Stored};
use crate::value::{ByteArrayBuilder, Value};
use crate::value::{ByteArrayBuilder, ByteArrayParser, Value};
use crate::parser::{Parser, Rule};
use pest::Parser as PestParser;
fn parse_ident(pair: Pair<Rule>) -> String {
pair.as_str().to_string()
@ -60,7 +60,7 @@ impl StructuredEnvItem {
} else {
return Err(WrongType);
};
if table_id.0 == Global && (src_id.0 == Local || dst_id.0 == Local) {
if table_id.is_global() && (src_id.is_local() || dst_id.is_local()) {
return Err(IncompatibleEdge);
}
let (keys, cols) = if let Some(p) = inner.next() {
@ -73,6 +73,7 @@ impl StructuredEnvItem {
src: src_id,
dst: dst_id,
id: table_id,
name: name.clone(),
keys,
cols,
};
@ -100,6 +101,7 @@ impl StructuredEnvItem {
let node = Node {
status: Planned,
id: table_id,
name: name.to_string(),
keys,
cols,
out_e: vec![],
@ -136,13 +138,14 @@ impl StructuredEnvItem {
if !keys.is_empty() {
return Err(UnexpectedIndexColumns);
}
if table_id.0 == Global && node_id.0 == Local {
if table_id.is_global() && node_id.is_local() {
return Err(IncompatibleEdge);
}
if self.define_new(name.clone(), Structured::Columns(Columns {
status: Planned,
id: table_id,
name: name.to_string(),
attached: node_id,
cols,
})) {
@ -182,7 +185,7 @@ impl StructuredEnvItem {
} else {
return Err(WrongType);
};
if table_id.0 == Global && node_id.0 == Local {
if table_id.is_global() && node_id.is_local() {
return Err(IncompatibleEdge);
}
@ -191,6 +194,7 @@ impl StructuredEnvItem {
if self.define_new(name.clone(), Structured::Index(Index {
status: Planned,
id: table_id,
name: name.clone(),
attached: node_id,
cols: col_list,
})) {
@ -200,6 +204,10 @@ impl StructuredEnvItem {
}
}
pub fn build_type_from_str(&self, src: &str) -> Result<Typing> {
let ast = Parser::parse(Rule::typing, src)?.next().unwrap();
self.build_type(ast)
}
fn build_type(&self, pair: Pair<Rule>) -> Result<Typing> {
let mut pairs = pair.into_inner();
@ -271,25 +279,198 @@ impl StructuredEnvItem {
}
}
#[repr(u8)]
pub enum TableKind {
Node = 1,
Edge = 2,
Columns = 3,
Index = 4,
}
impl Storage {
fn all_metadata(&self, env: &StructuredEnvItem) -> Result<Vec<Structured>> {
let mut builder = ByteArrayBuilder::with_capacity(1);
builder.build_zigzag(0);
let prefix = builder.get();
let it = self.db.as_ref().ok_or(DatabaseClosed)?.prefix_iterator(&prefix);
let mut ret = vec![];
for (k, v) in it {
let mut key_parser = ByteArrayParser::new(&k);
key_parser.parse_zigzag();
let table_id = key_parser.parse_value().unwrap().get_table_id().unwrap();
let mut data_parser = ByteArrayParser::new(&v);
let table_kind = data_parser.parse_value().unwrap();
let table_name = data_parser.parse_value().unwrap().get_string().unwrap();
match table_kind {
Value::UInt(i) if i == TableKind::Node as u64 => {
let keys: Vec<_> = data_parser.parse_value().unwrap().get_list().unwrap()
.into_iter().map(|v| {
let mut vs = v.get_list().unwrap().into_iter();
let name = vs.next().unwrap().get_string().unwrap();
let typ = vs.next().unwrap().get_string().unwrap();
let typ = env.build_type_from_str(&typ).unwrap();
let default = vs.next().unwrap().into_owned();
Col {
name,
typ,
default,
}
}).collect();
let cols: Vec<_> = data_parser.parse_value().unwrap().get_list().unwrap()
.into_iter().map(|v| {
let mut vs = v.get_list().unwrap().into_iter();
let name = vs.next().unwrap().get_string().unwrap();
let typ = vs.next().unwrap().get_string().unwrap();
let typ = env.build_type_from_str(&typ).unwrap();
let default = vs.next().unwrap().into_owned();
Col {
name,
typ,
default,
}
}).collect();
let node = Node {
status: StorageStatus::Stored,
id: table_id,
name: table_name,
keys,
cols,
out_e: vec![], // TODO fix these
in_e: vec![],
attached: vec![],
};
ret.push(Structured::Node(node));
}
Value::UInt(i) if i == TableKind::Edge as u64 => {
let src_id = data_parser.parse_value().unwrap().get_table_id().unwrap();
let dst_id = data_parser.parse_value().unwrap().get_table_id().unwrap();
let keys: Vec<_> = data_parser.parse_value().unwrap().get_list().unwrap()
.into_iter().map(|v| {
let mut vs = v.get_list().unwrap().into_iter();
let name = vs.next().unwrap().get_string().unwrap();
let typ = vs.next().unwrap().get_string().unwrap();
let typ = env.build_type_from_str(&typ).unwrap();
let default = vs.next().unwrap().into_owned();
Col {
name,
typ,
default,
}
}).collect();
let cols: Vec<_> = data_parser.parse_value().unwrap().get_list().unwrap()
.into_iter().map(|v| {
let mut vs = v.get_list().unwrap().into_iter();
let name = vs.next().unwrap().get_string().unwrap();
let typ = vs.next().unwrap().get_string().unwrap();
let typ = env.build_type_from_str(&typ).unwrap();
let default = vs.next().unwrap().into_owned();
Col {
name,
typ,
default,
}
}).collect();
let edge = Edge {
status: StorageStatus::Stored,
src: src_id,
dst: dst_id,
id: table_id,
name: table_name,
keys,
cols,
};
ret.push(Structured::Edge(edge));
}
Value::UInt(i) if i == TableKind::Columns as u64 => {
todo!()
}
Value::UInt(i) if i == TableKind::Index as u64 => {
todo!()
}
_ => unreachable!()
}
}
Ok(ret)
}
fn persist_node(&mut self, node: &mut Node) -> Result<()> {
let mut key_writer = ByteArrayBuilder::with_capacity(8);
key_writer.build_varint(0);
key_writer.build_value(&Value::UInt(node.id.1 as u64));
// println!("{:#?}", node);
// println!("{:?}", key_writer.get());
key_writer.build_zigzag(0);
key_writer.build_value(&Value::Int(node.id.0));
let mut val_writer = ByteArrayBuilder::with_capacity(128);
val_writer.build_value(&Value::UInt(TableKind::Node as u64));
val_writer.build_value(&Value::RefString(&node.name));
val_writer.build_value(&Value::List(Box::new(node.keys.iter().map(|k| {
Value::List(Box::new(vec![
Value::RefString(&k.name),
Value::OwnString(Box::new(format!("{}", k.typ))),
k.default.clone(),
]))
}).collect())));
val_writer.build_value(&Value::List(Box::new(node.cols.iter().map(|k| {
Value::List(Box::new(vec![
Value::RefString(&k.name),
Value::OwnString(Box::new(format!("{}", k.typ))),
k.default.clone(),
]))
}).collect())));
self.put(&key_writer.get(), &val_writer.get(), TableId(0))?;
node.status = Stored;
Ok(())
}
fn persist_edge(&mut self, edge: &mut Edge) -> Result<()> {
let mut key_writer = ByteArrayBuilder::with_capacity(8);
key_writer.build_zigzag(0);
key_writer.build_value(&Value::Int(edge.id.0));
let mut val_writer = ByteArrayBuilder::with_capacity(128);
val_writer.build_value(&Value::UInt(TableKind::Edge as u64));
val_writer.build_value(&Value::RefString(&edge.name));
val_writer.build_value(&Value::Int(edge.src.0));
val_writer.build_value(&Value::Int(edge.dst.0));
val_writer.build_value(&Value::List(Box::new(edge.keys.iter().map(|k| {
Value::List(Box::new(vec![
Value::RefString(&k.name),
Value::OwnString(Box::new(format!("{}", k.typ))),
k.default.clone(),
]))
}).collect())));
val_writer.build_value(&Value::List(Box::new(edge.cols.iter().map(|k| {
Value::List(Box::new(vec![
Value::RefString(&k.name),
Value::OwnString(Box::new(format!("{}", k.typ))),
k.default.clone(),
]))
}).collect())));
self.put(&key_writer.get(), &val_writer.get(), TableId(0))?;
edge.status = Stored;
Ok(())
}
}
impl Evaluator {
pub fn restore_metadata(&mut self) {}
pub fn restore_metadata(&mut self) -> Result<()> {
let mds = self.storage.all_metadata(self.s_envs.root())?;
for md in &mds {
match md {
v @ Structured::Node(n) => {
// TODO: check if they are the same if one already exists
self.s_envs.root_define(n.name.clone(), v.clone());
}
v @ Structured::Edge(e) => {
self.s_envs.root_define(e.name.clone(), v.clone());
}
Structured::Columns(_) => {}
Structured::Index(_) => {}
Structured::Typing(_) => unreachable!()
}
}
Ok(())
}
fn persist_change(&mut self, tname: &str) -> Result<()> {
let tbl = self.s_envs.resolve_mut(tname).unwrap();
@ -368,7 +549,9 @@ mod tests {
let parsed = Parser::parse(Rule::file, s).unwrap();
let mut eval = Evaluator::new("_path_for_rocksdb_storagex".to_string()).unwrap();
eval.build_table(parsed).unwrap();
// println!("{:#?}", eval.s_envs.resolve("Person"));
// println!("{:#?}", eval.s_envs.resolve("Friend"));
eval.restore_metadata().unwrap();
// eval.storage.delete().unwrap();
println!("{:#?}", eval.s_envs.resolve("Person"));
println!("{:#?}", eval.s_envs.resolve("Friend"));
}
}

@ -1,5 +1,5 @@
use std::collections::BTreeMap;
use crate::typing::{define_base_types, Persistence, Structured, TableId};
use crate::typing::{define_base_types, Structured, TableId};
pub trait Env<V> {
fn define(&mut self, name: String, value: V) -> Option<V>;
@ -9,7 +9,7 @@ pub trait Env<V> {
fn undef(&mut self, name: &str) -> Option<V>;
}
pub trait LayeredEnv<V> : Env<V> {
pub trait LayeredEnv<V>: Env<V> {
fn root_define(&mut self, name: String, value: V) -> Option<V>;
fn root_define_new(&mut self, name: String, value: V) -> bool;
fn root_resolve(&self, name: &str) -> Option<&V>;
@ -18,7 +18,6 @@ pub trait LayeredEnv<V> : Env<V> {
}
pub struct StructuredEnvItem {
map: BTreeMap<String, Structured>,
}
@ -66,17 +65,16 @@ impl StructuredEnv {
pub fn get_next_table_id(&self, local: bool) -> TableId {
let mut id = 0;
let persistence = if local { Persistence::Local } else { Persistence::Global };
for env in &self.stack {
for item in env.map.values() {
if let Some(TableId(p, eid)) = item.storage_id() {
if p == persistence {
id = id.max(eid);
if let Some(c_id) = item.storage_id() {
if c_id.is_local() == local {
id = id.max(c_id.0.abs());
}
}
}
}
TableId(persistence, id + 1)
TableId((1 + id) * if local { -1 } else { 1 })
}
}

@ -31,6 +31,9 @@ pub enum CozoError {
#[error("Unexpected index columns found")]
UnexpectedIndexColumns,
#[error("Database already closed")]
DatabaseClosed,
#[error(transparent)]
ParseInt(#[from] std::num::ParseIntError),

@ -1,10 +1,12 @@
use rocksdb::{DB, Options, ColumnFamilyDescriptor};
use crate::error::CozoError::DatabaseClosed;
use crate::error::Result;
use crate::typing::TableId;
use crate::value::cozo_comparator_v1;
pub struct Storage {
db: Option<DB>,
pub db: Option<DB>,
options: Options,
path: String,
}
@ -17,9 +19,8 @@ impl Storage {
options.create_if_missing(true);
options.set_comparator("cozo_comparator_v1", cozo_comparator_v1);
let main_cf = ColumnFamilyDescriptor::new("main", options.clone());
let temp_cf = ColumnFamilyDescriptor::new("temp", options.clone());
let db = DB::open_cf_descriptors(&options, &path, vec![main_cf, temp_cf])?;
let db = DB::open_cf_descriptors(&options, &path, vec![temp_cf])?;
Ok(Storage { db: Some(db), options, path })
}
@ -28,6 +29,16 @@ impl Storage {
DB::destroy(&self.options, &self.path)?;
Ok(())
}
pub fn put(&self, k: &[u8], v: &[u8], table_id: TableId) -> Result<()> {
let db = self.db.as_ref().ok_or(DatabaseClosed)?;
if table_id.is_global() {
db.put(k, v)?;
} else {
let cf = db.cf_handle("temp").ok_or(DatabaseClosed)?;
db.put_cf(cf, k, v)?;
}
Ok(())
}
}
#[cfg(test)]

@ -46,11 +46,6 @@ pub struct Col {
pub default: Value<'static>,
}
#[derive(Debug, PartialEq, Eq, Ord, PartialOrd, Clone, Copy)]
pub enum Persistence {
Global,
Local,
}
#[derive(Debug, PartialEq, Eq, Ord, PartialOrd, Clone)]
pub enum StorageStatus {
@ -59,16 +54,42 @@ pub enum StorageStatus {
Stored,
}
#[derive(Debug, PartialEq, Eq, Ord, PartialOrd, Clone, Copy)]
pub struct TableId(pub Persistence, pub usize);
#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy)]
pub struct TableId(pub i64);
impl TableId {
pub fn is_global(&self) -> bool {
self.0 >= 0
}
pub fn is_local(&self) -> bool {
self.0 < 0
}
}
impl Debug for TableId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str( if self.is_global() { "G+" } else {"L"})?;
f.write_str(&format!("{}", self.0))?;
Ok(())
}
}
#[derive(Debug, Ord, PartialOrd, Eq, PartialEq)]
pub struct ColumnId(TableId, usize);
#[derive(Ord, PartialOrd, Eq, PartialEq)]
pub struct ColumnId(TableId, i64);
impl Debug for ColumnId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str(&format!("{:?}", self.0))?;
f.write_str(&format!("~{}", self.1))?;
Ok(())
}
}
#[derive(Debug, PartialEq, Clone)]
pub struct Node {
pub status: StorageStatus,
pub id: TableId,
pub name: String,
pub keys: Vec<Col>,
pub cols: Vec<Col>,
pub out_e: Vec<TableId>,
@ -82,6 +103,7 @@ pub struct Edge {
pub src: TableId,
pub dst: TableId,
pub id: TableId,
pub name: String,
pub keys: Vec<Col>,
pub cols: Vec<Col>,
}
@ -91,6 +113,7 @@ pub struct Columns {
pub status: StorageStatus,
pub attached: TableId,
pub id: TableId,
pub name: String,
pub cols: Vec<Col>,
}
@ -98,6 +121,7 @@ pub struct Columns {
pub struct Index {
pub status: StorageStatus,
pub id: TableId,
pub name: String,
pub attached: TableId,
pub cols: Vec<String>,
}

@ -4,7 +4,7 @@ use std::collections::{BTreeMap};
use std::io::{Write};
use ordered_float::OrderedFloat;
use uuid::Uuid;
use crate::typing::Typing;
use crate::typing::{TableId, Typing};
use Ordering::{Greater, Less, Equal};
// TODO: array types, alignment of values
@ -71,6 +71,30 @@ pub enum Value<'a> {
Dict(Box<BTreeMap<Cow<'a, str>, Value<'a>>>),
}
impl <'a> Value<'a> {
pub fn get_list(self) -> Option<Vec<Self>> {
match self {
Value::List(v) => Some(*v),
_ => None
}
}
pub fn get_string(self) -> Option<String> {
match self {
Value::OwnString(v) => Some(*v),
Value::RefString(v) => Some(v.to_string()),
_ => None
}
}
pub fn get_table_id(self) -> Option<TableId> {
if let Value::Int(id) = self {
Some(TableId(id))
} else {
None
}
}
}
impl<'a> PartialEq for Value<'a> {
fn eq(&self, other: &Self) -> bool {
use Value::*;
@ -454,7 +478,7 @@ impl<T: Write> ByteArrayBuilder<T> {
}
pub fn cmp_keys<'a>(pa: &mut ByteArrayParser<'a>, pb: &mut ByteArrayParser<'a>) -> Ordering {
if let x @ (Greater | Less) = pa.compare_varint(pb) { return x; }
if let x @ (Greater | Less) = pa.compare_zigzag(pb) { return x; }
cmp_data(pa, pb)
}
@ -524,7 +548,7 @@ impl Typing {
}
pub struct CozoKey<'a> {
pub table_id: u64,
pub table_id: i64,
pub values: Vec<Value<'a>>,
}

Loading…
Cancel
Save