diff --git a/cozo-rocks/include/cozorocks.h b/cozo-rocks/include/cozorocks.h index a61bd735..b8a37860 100644 --- a/cozo-rocks/include/cozorocks.h +++ b/cozo-rocks/include/cozorocks.h @@ -185,11 +185,11 @@ struct IteratorBridge { inner->SeekForPrev(k); } - inline std::unique_ptr key() const { + inline std::unique_ptr key_raw() const { return std::make_unique(inner->key()); } - inline std::unique_ptr value() const { + inline std::unique_ptr value_raw() const { return std::make_unique(inner->value()); } diff --git a/cozo-rocks/src/lib.rs b/cozo-rocks/src/lib.rs index 5b4538d5..a1facdbd 100644 --- a/cozo-rocks/src/lib.rs +++ b/cozo-rocks/src/lib.rs @@ -110,15 +110,15 @@ mod ffi { fn drop_column_family_raw(self: &DBBridge, name: &CxxString, status: &mut BridgeStatus); fn get_column_family_names_raw(self: &DBBridge) -> UniquePtr>; - type IteratorBridge; + pub type IteratorBridge; fn seek_to_first(self: &IteratorBridge); fn seek_to_last(self: &IteratorBridge); fn next(self: &IteratorBridge); fn is_valid(self: &IteratorBridge) -> bool; fn do_seek(self: &IteratorBridge, key: &[u8]); fn do_seek_for_prev(self: &IteratorBridge, key: &[u8]); - fn key(self: &IteratorBridge) -> UniquePtr; - fn value(self: &IteratorBridge) -> UniquePtr; + fn key_raw(self: &IteratorBridge) -> UniquePtr; + fn value_raw(self: &IteratorBridge) -> UniquePtr; fn status(self: &IteratorBridge) -> BridgeStatus; pub type WriteBatchBridge; @@ -128,10 +128,26 @@ mod ffi { } } +use std::fmt::Formatter; +use std::fmt::Debug; use std::path::Path; use cxx::{UniquePtr, SharedPtr, let_cxx_string}; +pub use ffi::BridgeStatus; +pub use ffi::StatusBridgeCode; +pub use ffi::StatusCode; +pub use ffi::StatusSubCode; +pub use ffi::StatusSeverity; +pub use ffi::IteratorBridge; use ffi::*; +impl std::fmt::Display for BridgeStatus { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + Debug::fmt(self, f) + } +} + +impl std::error::Error for BridgeStatus {} + type Result = std::result::Result; pub type Options = UniquePtr; @@ -249,15 +265,27 @@ pub type Iterator = UniquePtr; pub trait IteratorImpl { fn seek(&self, key: impl AsRef<[u8]>); fn seek_for_prev(&self, key: impl AsRef<[u8]>); + fn key(&self) -> Slice; + fn value(&self) -> Slice; } impl IteratorImpl for IteratorBridge { + #[inline] fn seek(&self, key: impl AsRef<[u8]>) { self.do_seek(key.as_ref()); } + #[inline] fn seek_for_prev(&self, key: impl AsRef<[u8]>) { self.do_seek_for_prev(key.as_ref()) } + #[inline] + fn key(&self) -> Slice { + Slice(self.key_raw()) + } + #[inline] + fn value(&self) -> Slice { + Slice(self.value_raw()) + } } fn get_path_bytes(path: &std::path::Path) -> &[u8] { @@ -304,7 +332,6 @@ pub trait DBImpl { fn delete(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>) -> Result; fn write(&self, updates: WriteBatch, options: Option<&WriteOptions>) -> Result; - } impl DBImpl for DB { @@ -416,8 +443,8 @@ impl DBImpl for DB { fn write(&self, mut updates: WriteBatch, options: Option<&WriteOptions>) -> Result { let mut status = BridgeStatus::default(); self.inner.write_raw(options.unwrap_or(&self.default_write_options), - updates.pin_mut(), - &mut status); + updates.pin_mut(), + &mut status); if status.code == StatusCode::kOk { Ok(status) } else { diff --git a/src/definition.rs b/src/definition.rs index 9761a892..91748dfa 100644 --- a/src/definition.rs +++ b/src/definition.rs @@ -5,12 +5,13 @@ use crate::env::{Env, LayeredEnv, StructuredEnvItem}; use crate::error::Result; use crate::error::CozoError::*; use crate::eval::Evaluator; -use crate::storage::Storage; -use crate::typing::{Col, Columns, Edge, Index, Node, Structured, TableId, Typing}; +use crate::storage::{RocksStorage, Storage}; +use crate::typing::{Col, Columns, Edge, Index, Node, StorageStatus, Structured, TableId, Typing}; use crate::typing::StorageStatus::{Planned, Stored}; -use crate::value::{ByteArrayBuilder, Value}; +use crate::value::{ByteArrayBuilder, ByteArrayParser, Value}; use crate::parser::{Parser, Rule}; use pest::Parser as PestParser; +use cozo_rocks::*; // use rocksdb::IteratorMode; fn parse_ident(pair: Pair) -> String { @@ -290,110 +291,115 @@ pub enum TableKind { Index = 4, } -impl Storage { +impl RocksStorage { #[allow(unused_variables)] fn all_metadata(&self, env: &StructuredEnvItem) -> Result> { - todo!() - // let it = self.db.as_ref().ok_or(DatabaseClosed)?.full_iterator(IteratorMode::Start); - // - // let mut ret = vec![]; - // for (k, v) in it { - // let mut key_parser = ByteArrayParser::new(&k); - // let table_name = key_parser.parse_value().unwrap().get_string().unwrap(); - // - // let mut data_parser = ByteArrayParser::new(&v); - // let table_kind = data_parser.parse_value().unwrap(); - // let table_id = TableId { name: table_name, global: true }; - // match table_kind { - // Value::UInt(i) if i == TableKind::Node as u64 => { - // let keys: Vec<_> = data_parser.parse_value().unwrap().get_list().unwrap() - // .into_iter().map(|v| { - // let mut vs = v.get_list().unwrap().into_iter(); - // let name = vs.next().unwrap().get_string().unwrap(); - // let typ = vs.next().unwrap().get_string().unwrap(); - // let typ = env.build_type_from_str(&typ).unwrap(); - // let default = vs.next().unwrap().into_owned(); - // Col { - // name, - // typ, - // default, - // } - // }).collect(); - // let cols: Vec<_> = data_parser.parse_value().unwrap().get_list().unwrap() - // .into_iter().map(|v| { - // let mut vs = v.get_list().unwrap().into_iter(); - // let name = vs.next().unwrap().get_string().unwrap(); - // let typ = vs.next().unwrap().get_string().unwrap(); - // let typ = env.build_type_from_str(&typ).unwrap(); - // let default = vs.next().unwrap().into_owned(); - // Col { - // name, - // typ, - // default, - // } - // }).collect(); - // let node = Node { - // status: StorageStatus::Stored, - // id: table_id, - // keys, - // cols, - // out_e: vec![], // TODO fix these - // in_e: vec![], - // attached: vec![], - // }; - // ret.push(Structured::Node(node)); - // } - // Value::UInt(i) if i == TableKind::Edge as u64 => { - // let src_name = data_parser.parse_value().unwrap().get_string().unwrap(); - // let dst_name = data_parser.parse_value().unwrap().get_string().unwrap(); - // let src_id = TableId { name: src_name, global: true }; - // let dst_id = TableId { name: dst_name, global: true }; - // let keys: Vec<_> = data_parser.parse_value().unwrap().get_list().unwrap() - // .into_iter().map(|v| { - // let mut vs = v.get_list().unwrap().into_iter(); - // let name = vs.next().unwrap().get_string().unwrap(); - // let typ = vs.next().unwrap().get_string().unwrap(); - // let typ = env.build_type_from_str(&typ).unwrap(); - // let default = vs.next().unwrap().into_owned(); - // Col { - // name, - // typ, - // default, - // } - // }).collect(); - // let cols: Vec<_> = data_parser.parse_value().unwrap().get_list().unwrap() - // .into_iter().map(|v| { - // let mut vs = v.get_list().unwrap().into_iter(); - // let name = vs.next().unwrap().get_string().unwrap(); - // let typ = vs.next().unwrap().get_string().unwrap(); - // let typ = env.build_type_from_str(&typ).unwrap(); - // let default = vs.next().unwrap().into_owned(); - // Col { - // name, - // typ, - // default, - // } - // }).collect(); - // let edge = Edge { - // status: StorageStatus::Stored, - // src: src_id, - // dst: dst_id, - // id: table_id, - // keys, - // cols, - // }; - // ret.push(Structured::Edge(edge)); - // } - // Value::UInt(i) if i == TableKind::Columns as u64 => { - // todo!() - // } - // Value::UInt(i) if i == TableKind::Index as u64 => { - // todo!() - // } - // _ => unreachable!() - // } - // } - // Ok(ret) + let default_cf = self.db.get_cf_handle("default")?; + let it = self.db.iterator(&default_cf, None); + let mut ret = vec![]; + + while it.is_valid() { + let k = it.key(); + let v = it.value(); + let mut key_parser = ByteArrayParser::new(&k); + let mut data_parser = ByteArrayParser::new(&v); + + let table_name = key_parser.parse_value().unwrap().get_string().unwrap(); + let table_kind = data_parser.parse_value().unwrap(); + let table_id = TableId { name: table_name, global: true }; + + match table_kind { + Value::UInt(i) if i == TableKind::Node as u64 => { + let keys: Vec<_> = data_parser.parse_value().unwrap().get_list().unwrap() + .into_iter().map(|v| { + let mut vs = v.get_list().unwrap().into_iter(); + let name = vs.next().unwrap().get_string().unwrap(); + let typ = vs.next().unwrap().get_string().unwrap(); + let typ = env.build_type_from_str(&typ).unwrap(); + let default = vs.next().unwrap().into_owned(); + Col { + name, + typ, + default, + } + }).collect(); + let cols: Vec<_> = data_parser.parse_value().unwrap().get_list().unwrap() + .into_iter().map(|v| { + let mut vs = v.get_list().unwrap().into_iter(); + let name = vs.next().unwrap().get_string().unwrap(); + let typ = vs.next().unwrap().get_string().unwrap(); + let typ = env.build_type_from_str(&typ).unwrap(); + let default = vs.next().unwrap().into_owned(); + Col { + name, + typ, + default, + } + }).collect(); + let node = Node { + status: StorageStatus::Stored, + id: table_id, + keys, + cols, + out_e: vec![], // TODO fix these + in_e: vec![], + attached: vec![], + }; + ret.push(Structured::Node(node)); + } + Value::UInt(i) if i == TableKind::Edge as u64 => { + let src_name = data_parser.parse_value().unwrap().get_string().unwrap(); + let dst_name = data_parser.parse_value().unwrap().get_string().unwrap(); + let src_id = TableId { name: src_name, global: true }; + let dst_id = TableId { name: dst_name, global: true }; + let keys: Vec<_> = data_parser.parse_value().unwrap().get_list().unwrap() + .into_iter().map(|v| { + let mut vs = v.get_list().unwrap().into_iter(); + let name = vs.next().unwrap().get_string().unwrap(); + let typ = vs.next().unwrap().get_string().unwrap(); + let typ = env.build_type_from_str(&typ).unwrap(); + let default = vs.next().unwrap().into_owned(); + Col { + name, + typ, + default, + } + }).collect(); + let cols: Vec<_> = data_parser.parse_value().unwrap().get_list().unwrap() + .into_iter().map(|v| { + let mut vs = v.get_list().unwrap().into_iter(); + let name = vs.next().unwrap().get_string().unwrap(); + let typ = vs.next().unwrap().get_string().unwrap(); + let typ = env.build_type_from_str(&typ).unwrap(); + let default = vs.next().unwrap().into_owned(); + Col { + name, + typ, + default, + } + }).collect(); + let edge = Edge { + status: StorageStatus::Stored, + src: src_id, + dst: dst_id, + id: table_id, + keys, + cols, + }; + ret.push(Structured::Edge(edge)); + } + Value::UInt(i) if i == TableKind::Columns as u64 => { + todo!() + } + Value::UInt(i) if i == TableKind::Index as u64 => { + todo!() + } + _ => unreachable!() + } + + it.next(); + } + Ok(ret) } fn persist_node(&mut self, node: &mut Node) -> Result<()> { @@ -450,7 +456,7 @@ impl Storage { } } -impl Evaluator { +impl Evaluator { pub fn restore_metadata(&mut self) -> Result<()> { let mds = self.storage.all_metadata(self.s_envs.root())?; for md in &mds { @@ -535,6 +541,7 @@ impl Evaluator { mod tests { use super::*; use pest::Parser as PestParser; + use crate::eval::EvaluatorWithStorage; use crate::parser::Parser; #[test] @@ -552,7 +559,7 @@ mod tests { } "#; let parsed = Parser::parse(Rule::file, s).unwrap(); - let mut eval = Evaluator::new("_path_for_rocksdb_storagex".to_string()).unwrap(); + let mut eval = EvaluatorWithStorage::new("_path_for_rocksdb_storagex".to_string()).unwrap(); eval.build_table(parsed).unwrap(); eval.restore_metadata().unwrap(); eval.storage.delete().unwrap(); diff --git a/src/error.rs b/src/error.rs index 1727de4d..b4918e08 100644 --- a/src/error.rs +++ b/src/error.rs @@ -43,8 +43,8 @@ pub enum CozoError { #[error(transparent)] Parse(#[from] pest::error::Error), - // #[error(transparent)] - // Storage(#[from] rocksdb::Error) + #[error(transparent)] + Storage(#[from] cozo_rocks::BridgeStatus) } pub type Result = result::Result; \ No newline at end of file diff --git a/src/eval.rs b/src/eval.rs index fddba504..228a6eb5 100644 --- a/src/eval.rs +++ b/src/eval.rs @@ -6,24 +6,30 @@ use crate::error::CozoError::*; use crate::value::Value::*; use crate::ast::*; use crate::env::StructuredEnv; -use crate::storage::Storage; +use crate::storage::{DummyStorage, RocksStorage, Storage}; -pub struct Evaluator { +pub struct Evaluator { pub s_envs: StructuredEnv, - pub storage: Storage, + pub storage: S, } -impl Evaluator { - pub fn no_storage() -> Self { - Self { s_envs: StructuredEnv::new(), storage: Storage::no_storage() } - } +pub type EvaluatorWithStorage = Evaluator; +pub type BareEvaluator = Evaluator; + +impl EvaluatorWithStorage { pub fn new(path: String) -> Result { - Ok(Self { s_envs: StructuredEnv::new(), storage: Storage::new(path)? }) + Ok(Self { s_envs: StructuredEnv::new(), storage: RocksStorage::new(path)? }) + } +} + +impl BareEvaluator { + pub fn new() -> Self { + Self { s_envs: StructuredEnv::new(), storage: DummyStorage } } } -impl<'a> ExprVisitor<'a, Result>> for Evaluator { +impl<'a, S: Storage> ExprVisitor<'a, Result>> for Evaluator { fn visit_expr(&mut self, ex: &Expr<'a>) -> Result> { match ex { Apply(op, args) => { @@ -57,7 +63,7 @@ impl<'a> ExprVisitor<'a, Result>> for Evaluator { } } -impl Evaluator { +impl Evaluator { fn add_exprs<'a>(&mut self, exprs: &[Expr<'a>]) -> Result> { match exprs { [a, b] => { @@ -499,7 +505,7 @@ mod tests { #[test] fn operators() { - let mut ev = Evaluator::no_storage(); + let mut ev = BareEvaluator::new(); println!("{:#?}", ev.visit_expr(&parse_expr_from_str("1/10+(-2+3)*4^5").unwrap()).unwrap()); println!("{:#?}", ev.visit_expr(&parse_expr_from_str("true && false").unwrap()).unwrap()); @@ -507,7 +513,6 @@ mod tests { println!("{:#?}", ev.visit_expr(&parse_expr_from_str("true || null").unwrap()).unwrap()); println!("{:#?}", ev.visit_expr(&parse_expr_from_str("null || true").unwrap()).unwrap()); println!("{:#?}", ev.visit_expr(&parse_expr_from_str("true && null").unwrap()).unwrap()); - ev.storage.delete().unwrap(); } @@ -842,7 +847,7 @@ mod tests { {"_src":205,"_dst":11,"_type":"InDepartment"}, {"_src":206,"_dst":11,"_type":"InDepartment"}]"#; let parsed = parse_expr_from_str(data)?; - let mut ev = Evaluator::no_storage(); + let mut ev = BareEvaluator::new(); let evaluated = ev.visit_expr(&parsed)?; println!("{:#?}", evaluated); Ok(()) diff --git a/src/storage.rs b/src/storage.rs index 3f3cc568..ca1a11a0 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1,60 +1,66 @@ -use crate::error::Result; +use crate::error::{CozoError, Result}; +use cozo_rocks::*; +use crate::value::{ByteArrayBuilder, cozo_comparator_v1, Value}; -pub struct Storage { - pub db: Option<()>, +pub struct RocksStorage { + pub db: DB, #[allow(dead_code)] path: String, } -impl Storage { - pub fn no_storage() -> Self { - Self { db: None, path: "".to_string() } - } +impl RocksStorage { #[allow(unused_variables)] pub fn new(path: String) -> Result { - unimplemented!() - // let options = make_options(); - // let cfs = match DB::list_cf(&options, &path) { - // Ok(cfs) => { cfs } - // Err(_) => { vec![] } - // }; - // let cfs = cfs.into_iter().map(|name| { - // ColumnFamilyDescriptor::new(name, make_options()) - // }); - // let db = DB::open_cf_descriptors(&options, &path, cfs)?; - // Ok(Storage { db: Some(db), path }) + let options = Options::default() + .increase_parallelism() + .optimize_level_style_compaction() + .set_create_if_missing(true) + .set_comparator("cozo_comparator_v1", cozo_comparator_v1); + + let db = DB::open(options, path.as_ref())?; + Ok(RocksStorage {db, path}) } + #[allow(unused_variables)] pub fn delete(&mut self) -> Result<()> { - unimplemented!() + // unimplemented!() // drop(self.db.take()); // DB::destroy(&make_options(), &self.path)?; - // Ok(()) + Ok(()) } + #[allow(unused_variables)] pub fn put_global(&self, k: &[u8], v: &[u8]) -> Result<()> { // let db = self.db.as_ref().ok_or(DatabaseClosed)?; - // db.put(k, v)?; - unimplemented!() - // Ok(()) + let default_cf = self.db.get_cf_handle("default")?; + self.db.put(k, v, &default_cf, None)?; + + Ok(()) } #[allow(unused_variables)] pub fn create_table(&mut self, name: &str, _global: bool) -> Result<()> { - unimplemented!() - // let db = self.db.as_mut().ok_or(DatabaseClosed)?; - // db.create_cf(name, &make_options())?; - // Ok(()) + match self.db.create_column_family(name) { + Ok(_) => Ok(()), + Err(s) if s.bridge_code == StatusBridgeCode::EXISTING_ERROR => Ok(()), + Err(e) => Err(CozoError::Storage(e)) + } } #[allow(unused_variables)] pub fn drop_table(&mut self, name: &str, _global: bool) -> Result<()> { - unimplemented!() - // let db = self.db.as_mut().ok_or(DatabaseClosed)?; - // db.drop_cf(name)?; - // Ok(()) + self.db.drop_column_family(name)?; + Ok(()) } } +pub trait Storage {} + +pub struct DummyStorage; + +impl Storage for DummyStorage {} + +impl Storage for RocksStorage {} + #[cfg(test)] mod tests { use std::str::from_utf8;