use new storage engine

main
Ziyang Hu 2 years ago
parent 6dbbec65d3
commit d067a127cf

@ -185,11 +185,11 @@ struct IteratorBridge {
inner->SeekForPrev(k); inner->SeekForPrev(k);
} }
inline std::unique_ptr <SliceBridge> key() const { inline std::unique_ptr <SliceBridge> key_raw() const {
return std::make_unique<SliceBridge>(inner->key()); return std::make_unique<SliceBridge>(inner->key());
} }
inline std::unique_ptr <SliceBridge> value() const { inline std::unique_ptr <SliceBridge> value_raw() const {
return std::make_unique<SliceBridge>(inner->value()); return std::make_unique<SliceBridge>(inner->value());
} }

@ -110,15 +110,15 @@ mod ffi {
fn drop_column_family_raw(self: &DBBridge, name: &CxxString, status: &mut BridgeStatus); fn drop_column_family_raw(self: &DBBridge, name: &CxxString, status: &mut BridgeStatus);
fn get_column_family_names_raw(self: &DBBridge) -> UniquePtr<CxxVector<CxxString>>; fn get_column_family_names_raw(self: &DBBridge) -> UniquePtr<CxxVector<CxxString>>;
type IteratorBridge; pub type IteratorBridge;
fn seek_to_first(self: &IteratorBridge); fn seek_to_first(self: &IteratorBridge);
fn seek_to_last(self: &IteratorBridge); fn seek_to_last(self: &IteratorBridge);
fn next(self: &IteratorBridge); fn next(self: &IteratorBridge);
fn is_valid(self: &IteratorBridge) -> bool; fn is_valid(self: &IteratorBridge) -> bool;
fn do_seek(self: &IteratorBridge, key: &[u8]); fn do_seek(self: &IteratorBridge, key: &[u8]);
fn do_seek_for_prev(self: &IteratorBridge, key: &[u8]); fn do_seek_for_prev(self: &IteratorBridge, key: &[u8]);
fn key(self: &IteratorBridge) -> UniquePtr<SliceBridge>; fn key_raw(self: &IteratorBridge) -> UniquePtr<SliceBridge>;
fn value(self: &IteratorBridge) -> UniquePtr<SliceBridge>; fn value_raw(self: &IteratorBridge) -> UniquePtr<SliceBridge>;
fn status(self: &IteratorBridge) -> BridgeStatus; fn status(self: &IteratorBridge) -> BridgeStatus;
pub type WriteBatchBridge; pub type WriteBatchBridge;
@ -128,10 +128,26 @@ mod ffi {
} }
} }
use std::fmt::Formatter;
use std::fmt::Debug;
use std::path::Path; use std::path::Path;
use cxx::{UniquePtr, SharedPtr, let_cxx_string}; use cxx::{UniquePtr, SharedPtr, let_cxx_string};
pub use ffi::BridgeStatus;
pub use ffi::StatusBridgeCode;
pub use ffi::StatusCode;
pub use ffi::StatusSubCode;
pub use ffi::StatusSeverity;
pub use ffi::IteratorBridge;
use ffi::*; use ffi::*;
impl std::fmt::Display for BridgeStatus {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Debug::fmt(self, f)
}
}
impl std::error::Error for BridgeStatus {}
type Result<T> = std::result::Result<T, BridgeStatus>; type Result<T> = std::result::Result<T, BridgeStatus>;
pub type Options = UniquePtr<OptionsBridge>; pub type Options = UniquePtr<OptionsBridge>;
@ -249,15 +265,27 @@ pub type Iterator = UniquePtr<IteratorBridge>;
pub trait IteratorImpl { pub trait IteratorImpl {
fn seek(&self, key: impl AsRef<[u8]>); fn seek(&self, key: impl AsRef<[u8]>);
fn seek_for_prev(&self, key: impl AsRef<[u8]>); fn seek_for_prev(&self, key: impl AsRef<[u8]>);
fn key(&self) -> Slice;
fn value(&self) -> Slice;
} }
impl IteratorImpl for IteratorBridge { impl IteratorImpl for IteratorBridge {
#[inline]
fn seek(&self, key: impl AsRef<[u8]>) { fn seek(&self, key: impl AsRef<[u8]>) {
self.do_seek(key.as_ref()); self.do_seek(key.as_ref());
} }
#[inline]
fn seek_for_prev(&self, key: impl AsRef<[u8]>) { fn seek_for_prev(&self, key: impl AsRef<[u8]>) {
self.do_seek_for_prev(key.as_ref()) self.do_seek_for_prev(key.as_ref())
} }
#[inline]
fn key(&self) -> Slice {
Slice(self.key_raw())
}
#[inline]
fn value(&self) -> Slice {
Slice(self.value_raw())
}
} }
fn get_path_bytes(path: &std::path::Path) -> &[u8] { fn get_path_bytes(path: &std::path::Path) -> &[u8] {
@ -304,7 +332,6 @@ pub trait DBImpl {
fn delete(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>) fn delete(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>)
-> Result<BridgeStatus>; -> Result<BridgeStatus>;
fn write(&self, updates: WriteBatch, options: Option<&WriteOptions>) -> Result<BridgeStatus>; fn write(&self, updates: WriteBatch, options: Option<&WriteOptions>) -> Result<BridgeStatus>;
} }
impl DBImpl for DB { impl DBImpl for DB {
@ -416,8 +443,8 @@ impl DBImpl for DB {
fn write(&self, mut updates: WriteBatch, options: Option<&WriteOptions>) -> Result<BridgeStatus> { fn write(&self, mut updates: WriteBatch, options: Option<&WriteOptions>) -> Result<BridgeStatus> {
let mut status = BridgeStatus::default(); let mut status = BridgeStatus::default();
self.inner.write_raw(options.unwrap_or(&self.default_write_options), self.inner.write_raw(options.unwrap_or(&self.default_write_options),
updates.pin_mut(), updates.pin_mut(),
&mut status); &mut status);
if status.code == StatusCode::kOk { if status.code == StatusCode::kOk {
Ok(status) Ok(status)
} else { } else {

@ -5,12 +5,13 @@ use crate::env::{Env, LayeredEnv, StructuredEnvItem};
use crate::error::Result; use crate::error::Result;
use crate::error::CozoError::*; use crate::error::CozoError::*;
use crate::eval::Evaluator; use crate::eval::Evaluator;
use crate::storage::Storage; use crate::storage::{RocksStorage, Storage};
use crate::typing::{Col, Columns, Edge, Index, Node, Structured, TableId, Typing}; use crate::typing::{Col, Columns, Edge, Index, Node, StorageStatus, Structured, TableId, Typing};
use crate::typing::StorageStatus::{Planned, Stored}; use crate::typing::StorageStatus::{Planned, Stored};
use crate::value::{ByteArrayBuilder, Value}; use crate::value::{ByteArrayBuilder, ByteArrayParser, Value};
use crate::parser::{Parser, Rule}; use crate::parser::{Parser, Rule};
use pest::Parser as PestParser; use pest::Parser as PestParser;
use cozo_rocks::*;
// use rocksdb::IteratorMode; // use rocksdb::IteratorMode;
fn parse_ident(pair: Pair<Rule>) -> String { fn parse_ident(pair: Pair<Rule>) -> String {
@ -290,110 +291,115 @@ pub enum TableKind {
Index = 4, Index = 4,
} }
impl Storage { impl RocksStorage {
#[allow(unused_variables)] #[allow(unused_variables)]
fn all_metadata(&self, env: &StructuredEnvItem) -> Result<Vec<Structured>> { fn all_metadata(&self, env: &StructuredEnvItem) -> Result<Vec<Structured>> {
todo!() let default_cf = self.db.get_cf_handle("default")?;
// let it = self.db.as_ref().ok_or(DatabaseClosed)?.full_iterator(IteratorMode::Start); let it = self.db.iterator(&default_cf, None);
// let mut ret = vec![];
// let mut ret = vec![];
// for (k, v) in it { while it.is_valid() {
// let mut key_parser = ByteArrayParser::new(&k); let k = it.key();
// let table_name = key_parser.parse_value().unwrap().get_string().unwrap(); let v = it.value();
// let mut key_parser = ByteArrayParser::new(&k);
// let mut data_parser = ByteArrayParser::new(&v); let mut data_parser = ByteArrayParser::new(&v);
// let table_kind = data_parser.parse_value().unwrap();
// let table_id = TableId { name: table_name, global: true }; let table_name = key_parser.parse_value().unwrap().get_string().unwrap();
// match table_kind { let table_kind = data_parser.parse_value().unwrap();
// Value::UInt(i) if i == TableKind::Node as u64 => { let table_id = TableId { name: table_name, global: true };
// let keys: Vec<_> = data_parser.parse_value().unwrap().get_list().unwrap()
// .into_iter().map(|v| { match table_kind {
// let mut vs = v.get_list().unwrap().into_iter(); Value::UInt(i) if i == TableKind::Node as u64 => {
// let name = vs.next().unwrap().get_string().unwrap(); let keys: Vec<_> = data_parser.parse_value().unwrap().get_list().unwrap()
// let typ = vs.next().unwrap().get_string().unwrap(); .into_iter().map(|v| {
// let typ = env.build_type_from_str(&typ).unwrap(); let mut vs = v.get_list().unwrap().into_iter();
// let default = vs.next().unwrap().into_owned(); let name = vs.next().unwrap().get_string().unwrap();
// Col { let typ = vs.next().unwrap().get_string().unwrap();
// name, let typ = env.build_type_from_str(&typ).unwrap();
// typ, let default = vs.next().unwrap().into_owned();
// default, Col {
// } name,
// }).collect(); typ,
// let cols: Vec<_> = data_parser.parse_value().unwrap().get_list().unwrap() default,
// .into_iter().map(|v| { }
// let mut vs = v.get_list().unwrap().into_iter(); }).collect();
// let name = vs.next().unwrap().get_string().unwrap(); let cols: Vec<_> = data_parser.parse_value().unwrap().get_list().unwrap()
// let typ = vs.next().unwrap().get_string().unwrap(); .into_iter().map(|v| {
// let typ = env.build_type_from_str(&typ).unwrap(); let mut vs = v.get_list().unwrap().into_iter();
// let default = vs.next().unwrap().into_owned(); let name = vs.next().unwrap().get_string().unwrap();
// Col { let typ = vs.next().unwrap().get_string().unwrap();
// name, let typ = env.build_type_from_str(&typ).unwrap();
// typ, let default = vs.next().unwrap().into_owned();
// default, Col {
// } name,
// }).collect(); typ,
// let node = Node { default,
// status: StorageStatus::Stored, }
// id: table_id, }).collect();
// keys, let node = Node {
// cols, status: StorageStatus::Stored,
// out_e: vec![], // TODO fix these id: table_id,
// in_e: vec![], keys,
// attached: vec![], cols,
// }; out_e: vec![], // TODO fix these
// ret.push(Structured::Node(node)); in_e: vec![],
// } attached: vec![],
// Value::UInt(i) if i == TableKind::Edge as u64 => { };
// let src_name = data_parser.parse_value().unwrap().get_string().unwrap(); ret.push(Structured::Node(node));
// let dst_name = data_parser.parse_value().unwrap().get_string().unwrap(); }
// let src_id = TableId { name: src_name, global: true }; Value::UInt(i) if i == TableKind::Edge as u64 => {
// let dst_id = TableId { name: dst_name, global: true }; let src_name = data_parser.parse_value().unwrap().get_string().unwrap();
// let keys: Vec<_> = data_parser.parse_value().unwrap().get_list().unwrap() let dst_name = data_parser.parse_value().unwrap().get_string().unwrap();
// .into_iter().map(|v| { let src_id = TableId { name: src_name, global: true };
// let mut vs = v.get_list().unwrap().into_iter(); let dst_id = TableId { name: dst_name, global: true };
// let name = vs.next().unwrap().get_string().unwrap(); let keys: Vec<_> = data_parser.parse_value().unwrap().get_list().unwrap()
// let typ = vs.next().unwrap().get_string().unwrap(); .into_iter().map(|v| {
// let typ = env.build_type_from_str(&typ).unwrap(); let mut vs = v.get_list().unwrap().into_iter();
// let default = vs.next().unwrap().into_owned(); let name = vs.next().unwrap().get_string().unwrap();
// Col { let typ = vs.next().unwrap().get_string().unwrap();
// name, let typ = env.build_type_from_str(&typ).unwrap();
// typ, let default = vs.next().unwrap().into_owned();
// default, Col {
// } name,
// }).collect(); typ,
// let cols: Vec<_> = data_parser.parse_value().unwrap().get_list().unwrap() default,
// .into_iter().map(|v| { }
// let mut vs = v.get_list().unwrap().into_iter(); }).collect();
// let name = vs.next().unwrap().get_string().unwrap(); let cols: Vec<_> = data_parser.parse_value().unwrap().get_list().unwrap()
// let typ = vs.next().unwrap().get_string().unwrap(); .into_iter().map(|v| {
// let typ = env.build_type_from_str(&typ).unwrap(); let mut vs = v.get_list().unwrap().into_iter();
// let default = vs.next().unwrap().into_owned(); let name = vs.next().unwrap().get_string().unwrap();
// Col { let typ = vs.next().unwrap().get_string().unwrap();
// name, let typ = env.build_type_from_str(&typ).unwrap();
// typ, let default = vs.next().unwrap().into_owned();
// default, Col {
// } name,
// }).collect(); typ,
// let edge = Edge { default,
// status: StorageStatus::Stored, }
// src: src_id, }).collect();
// dst: dst_id, let edge = Edge {
// id: table_id, status: StorageStatus::Stored,
// keys, src: src_id,
// cols, dst: dst_id,
// }; id: table_id,
// ret.push(Structured::Edge(edge)); keys,
// } cols,
// Value::UInt(i) if i == TableKind::Columns as u64 => { };
// todo!() ret.push(Structured::Edge(edge));
// } }
// Value::UInt(i) if i == TableKind::Index as u64 => { Value::UInt(i) if i == TableKind::Columns as u64 => {
// todo!() todo!()
// } }
// _ => unreachable!() Value::UInt(i) if i == TableKind::Index as u64 => {
// } todo!()
// } }
// Ok(ret) _ => unreachable!()
}
it.next();
}
Ok(ret)
} }
fn persist_node(&mut self, node: &mut Node) -> Result<()> { fn persist_node(&mut self, node: &mut Node) -> Result<()> {
@ -450,7 +456,7 @@ impl Storage {
} }
} }
impl Evaluator { impl Evaluator<RocksStorage> {
pub fn restore_metadata(&mut self) -> Result<()> { pub fn restore_metadata(&mut self) -> Result<()> {
let mds = self.storage.all_metadata(self.s_envs.root())?; let mds = self.storage.all_metadata(self.s_envs.root())?;
for md in &mds { for md in &mds {
@ -535,6 +541,7 @@ impl Evaluator {
mod tests { mod tests {
use super::*; use super::*;
use pest::Parser as PestParser; use pest::Parser as PestParser;
use crate::eval::EvaluatorWithStorage;
use crate::parser::Parser; use crate::parser::Parser;
#[test] #[test]
@ -552,7 +559,7 @@ mod tests {
} }
"#; "#;
let parsed = Parser::parse(Rule::file, s).unwrap(); let parsed = Parser::parse(Rule::file, s).unwrap();
let mut eval = Evaluator::new("_path_for_rocksdb_storagex".to_string()).unwrap(); let mut eval = EvaluatorWithStorage::new("_path_for_rocksdb_storagex".to_string()).unwrap();
eval.build_table(parsed).unwrap(); eval.build_table(parsed).unwrap();
eval.restore_metadata().unwrap(); eval.restore_metadata().unwrap();
eval.storage.delete().unwrap(); eval.storage.delete().unwrap();

@ -43,8 +43,8 @@ pub enum CozoError {
#[error(transparent)] #[error(transparent)]
Parse(#[from] pest::error::Error<Rule>), Parse(#[from] pest::error::Error<Rule>),
// #[error(transparent)] #[error(transparent)]
// Storage(#[from] rocksdb::Error) Storage(#[from] cozo_rocks::BridgeStatus)
} }
pub type Result<T> = result::Result<T, CozoError>; pub type Result<T> = result::Result<T, CozoError>;

@ -6,24 +6,30 @@ use crate::error::CozoError::*;
use crate::value::Value::*; use crate::value::Value::*;
use crate::ast::*; use crate::ast::*;
use crate::env::StructuredEnv; use crate::env::StructuredEnv;
use crate::storage::Storage; use crate::storage::{DummyStorage, RocksStorage, Storage};
pub struct Evaluator { pub struct Evaluator<S: Storage> {
pub s_envs: StructuredEnv, pub s_envs: StructuredEnv,
pub storage: Storage, pub storage: S,
} }
impl Evaluator { pub type EvaluatorWithStorage = Evaluator<RocksStorage>;
pub fn no_storage() -> Self { pub type BareEvaluator = Evaluator<DummyStorage>;
Self { s_envs: StructuredEnv::new(), storage: Storage::no_storage() }
} impl EvaluatorWithStorage {
pub fn new(path: String) -> Result<Self> { pub fn new(path: String) -> Result<Self> {
Ok(Self { s_envs: StructuredEnv::new(), storage: Storage::new(path)? }) Ok(Self { s_envs: StructuredEnv::new(), storage: RocksStorage::new(path)? })
}
}
impl BareEvaluator {
pub fn new() -> Self {
Self { s_envs: StructuredEnv::new(), storage: DummyStorage }
} }
} }
impl<'a> ExprVisitor<'a, Result<Expr<'a>>> for Evaluator { impl<'a, S: Storage> ExprVisitor<'a, Result<Expr<'a>>> for Evaluator<S> {
fn visit_expr(&mut self, ex: &Expr<'a>) -> Result<Expr<'a>> { fn visit_expr(&mut self, ex: &Expr<'a>) -> Result<Expr<'a>> {
match ex { match ex {
Apply(op, args) => { Apply(op, args) => {
@ -57,7 +63,7 @@ impl<'a> ExprVisitor<'a, Result<Expr<'a>>> for Evaluator {
} }
} }
impl Evaluator { impl<S: Storage> Evaluator<S> {
fn add_exprs<'a>(&mut self, exprs: &[Expr<'a>]) -> Result<Expr<'a>> { fn add_exprs<'a>(&mut self, exprs: &[Expr<'a>]) -> Result<Expr<'a>> {
match exprs { match exprs {
[a, b] => { [a, b] => {
@ -499,7 +505,7 @@ mod tests {
#[test] #[test]
fn operators() { fn operators() {
let mut ev = Evaluator::no_storage(); let mut ev = BareEvaluator::new();
println!("{:#?}", ev.visit_expr(&parse_expr_from_str("1/10+(-2+3)*4^5").unwrap()).unwrap()); println!("{:#?}", ev.visit_expr(&parse_expr_from_str("1/10+(-2+3)*4^5").unwrap()).unwrap());
println!("{:#?}", ev.visit_expr(&parse_expr_from_str("true && false").unwrap()).unwrap()); println!("{:#?}", ev.visit_expr(&parse_expr_from_str("true && false").unwrap()).unwrap());
@ -507,7 +513,6 @@ mod tests {
println!("{:#?}", ev.visit_expr(&parse_expr_from_str("true || null").unwrap()).unwrap()); println!("{:#?}", ev.visit_expr(&parse_expr_from_str("true || null").unwrap()).unwrap());
println!("{:#?}", ev.visit_expr(&parse_expr_from_str("null || true").unwrap()).unwrap()); println!("{:#?}", ev.visit_expr(&parse_expr_from_str("null || true").unwrap()).unwrap());
println!("{:#?}", ev.visit_expr(&parse_expr_from_str("true && null").unwrap()).unwrap()); println!("{:#?}", ev.visit_expr(&parse_expr_from_str("true && null").unwrap()).unwrap());
ev.storage.delete().unwrap();
} }
@ -842,7 +847,7 @@ mod tests {
{"_src":205,"_dst":11,"_type":"InDepartment"}, {"_src":205,"_dst":11,"_type":"InDepartment"},
{"_src":206,"_dst":11,"_type":"InDepartment"}]"#; {"_src":206,"_dst":11,"_type":"InDepartment"}]"#;
let parsed = parse_expr_from_str(data)?; let parsed = parse_expr_from_str(data)?;
let mut ev = Evaluator::no_storage(); let mut ev = BareEvaluator::new();
let evaluated = ev.visit_expr(&parsed)?; let evaluated = ev.visit_expr(&parsed)?;
println!("{:#?}", evaluated); println!("{:#?}", evaluated);
Ok(()) Ok(())

@ -1,60 +1,66 @@
use crate::error::Result; use crate::error::{CozoError, Result};
use cozo_rocks::*;
use crate::value::{ByteArrayBuilder, cozo_comparator_v1, Value};
pub struct Storage { pub struct RocksStorage {
pub db: Option<()>, pub db: DB,
#[allow(dead_code)] #[allow(dead_code)]
path: String, path: String,
} }
impl Storage { impl RocksStorage {
pub fn no_storage() -> Self {
Self { db: None, path: "".to_string() }
}
#[allow(unused_variables)] #[allow(unused_variables)]
pub fn new(path: String) -> Result<Self> { pub fn new(path: String) -> Result<Self> {
unimplemented!() let options = Options::default()
// let options = make_options(); .increase_parallelism()
// let cfs = match DB::list_cf(&options, &path) { .optimize_level_style_compaction()
// Ok(cfs) => { cfs } .set_create_if_missing(true)
// Err(_) => { vec![] } .set_comparator("cozo_comparator_v1", cozo_comparator_v1);
// };
// let cfs = cfs.into_iter().map(|name| { let db = DB::open(options, path.as_ref())?;
// ColumnFamilyDescriptor::new(name, make_options()) Ok(RocksStorage {db, path})
// });
// let db = DB::open_cf_descriptors(&options, &path, cfs)?;
// Ok(Storage { db: Some(db), path })
} }
#[allow(unused_variables)] #[allow(unused_variables)]
pub fn delete(&mut self) -> Result<()> { pub fn delete(&mut self) -> Result<()> {
unimplemented!() // unimplemented!()
// drop(self.db.take()); // drop(self.db.take());
// DB::destroy(&make_options(), &self.path)?; // DB::destroy(&make_options(), &self.path)?;
// Ok(()) Ok(())
} }
#[allow(unused_variables)] #[allow(unused_variables)]
pub fn put_global(&self, k: &[u8], v: &[u8]) -> Result<()> { pub fn put_global(&self, k: &[u8], v: &[u8]) -> Result<()> {
// let db = self.db.as_ref().ok_or(DatabaseClosed)?; // let db = self.db.as_ref().ok_or(DatabaseClosed)?;
// db.put(k, v)?; let default_cf = self.db.get_cf_handle("default")?;
unimplemented!() self.db.put(k, v, &default_cf, None)?;
// Ok(())
Ok(())
} }
#[allow(unused_variables)] #[allow(unused_variables)]
pub fn create_table(&mut self, name: &str, _global: bool) -> Result<()> { pub fn create_table(&mut self, name: &str, _global: bool) -> Result<()> {
unimplemented!() match self.db.create_column_family(name) {
// let db = self.db.as_mut().ok_or(DatabaseClosed)?; Ok(_) => Ok(()),
// db.create_cf(name, &make_options())?; Err(s) if s.bridge_code == StatusBridgeCode::EXISTING_ERROR => Ok(()),
// Ok(()) Err(e) => Err(CozoError::Storage(e))
}
} }
#[allow(unused_variables)] #[allow(unused_variables)]
pub fn drop_table(&mut self, name: &str, _global: bool) -> Result<()> { pub fn drop_table(&mut self, name: &str, _global: bool) -> Result<()> {
unimplemented!() self.db.drop_column_family(name)?;
// let db = self.db.as_mut().ok_or(DatabaseClosed)?; Ok(())
// db.drop_cf(name)?;
// Ok(())
} }
} }
pub trait Storage {}
pub struct DummyStorage;
impl Storage for DummyStorage {}
impl Storage for RocksStorage {}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::str::from_utf8; use std::str::from_utf8;

Loading…
Cancel
Save