diff --git a/cozo-core/Cargo.toml b/cozo-core/Cargo.toml index cb936e49..cd7e335b 100644 --- a/cozo-core/Cargo.toml +++ b/cozo-core/Cargo.toml @@ -16,6 +16,9 @@ exclude = [ # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] +storage-rocksdb = [] +storage-sled = [] +storage-tikv = [] jemalloc = ["tikv-jemallocator-global", "cozorocks/jemalloc"] io-uring = ["cozorocks/io-uring"] diff --git a/cozo-core/src/algo/all_pairs_shortest_path.rs b/cozo-core/src/algo/all_pairs_shortest_path.rs index 8ad1aed0..6872ecdd 100644 --- a/cozo-core/src/algo/all_pairs_shortest_path.rs +++ b/cozo-core/src/algo/all_pairs_shortest_path.rs @@ -27,12 +27,12 @@ use crate::runtime::transact::SessionTx; pub(crate) struct BetweennessCentrality; impl AlgoImpl for BetweennessCentrality { - fn run( + fn run<'a>( &mut self, - tx: &SessionTx, - algo: &MagicAlgoApply, - stores: &BTreeMap, - out: &InMemRelation, + tx: &'a SessionTx, + algo: &'a MagicAlgoApply, + stores: &'a BTreeMap, + out: &'a InMemRelation, poison: Poison, ) -> Result<()> { let edges = algo.relation(0)?; @@ -97,12 +97,12 @@ impl AlgoImpl for BetweennessCentrality { pub(crate) struct ClosenessCentrality; impl AlgoImpl for ClosenessCentrality { - fn run( + fn run<'a>( &mut self, - tx: &SessionTx, - algo: &MagicAlgoApply, - stores: &BTreeMap, - out: &InMemRelation, + tx: &'a SessionTx, + algo: &'a MagicAlgoApply, + stores: &'a BTreeMap, + out: &'a InMemRelation, poison: Poison, ) -> Result<()> { let edges = algo.relation(0)?; diff --git a/cozo-core/src/algo/astar.rs b/cozo-core/src/algo/astar.rs index 272fdf8a..f3fa9a59 100644 --- a/cozo-core/src/algo/astar.rs +++ b/cozo-core/src/algo/astar.rs @@ -24,12 +24,12 @@ use crate::runtime::transact::SessionTx; pub(crate) struct ShortestPathAStar; impl AlgoImpl for ShortestPathAStar { - fn run( + fn run<'a>( &mut self, - tx: &SessionTx, - algo: &MagicAlgoApply, - stores: &BTreeMap, - out: &InMemRelation, + tx: &'a SessionTx, + algo: &'a MagicAlgoApply, + stores: &'a BTreeMap, + out: &'a InMemRelation, poison: Poison, ) -> Result<()> { let edges = algo.relation_with_min_len(0, 3, tx, stores)?; @@ -81,14 +81,14 @@ impl AlgoImpl for ShortestPathAStar { } } -fn astar( +fn astar<'a>( starting: &Tuple, goal: &Tuple, - edges: &MagicAlgoRuleArg, - nodes: &MagicAlgoRuleArg, + edges: &'a MagicAlgoRuleArg, + nodes: &'a MagicAlgoRuleArg, heuristic: &Expr, - tx: &SessionTx, - stores: &BTreeMap, + tx: &'a SessionTx, + stores: &'a BTreeMap, poison: Poison, ) -> Result<(f64, Vec)> { let start_node = &starting.0[0]; diff --git a/cozo-core/src/algo/bfs.rs b/cozo-core/src/algo/bfs.rs index e13ca36d..5fa5ef06 100644 --- a/cozo-core/src/algo/bfs.rs +++ b/cozo-core/src/algo/bfs.rs @@ -21,12 +21,12 @@ use crate::runtime::transact::SessionTx; pub(crate) struct Bfs; impl AlgoImpl for Bfs { - fn run( + fn run<'a>( &mut self, - tx: &SessionTx, - algo: &MagicAlgoApply, - stores: &BTreeMap, - out: &InMemRelation, + tx: &'a SessionTx, + algo: &'a MagicAlgoApply, + stores: &'a BTreeMap, + out: &'a InMemRelation, poison: Poison, ) -> Result<()> { let edges = algo.relation_with_min_len(0, 2, tx, stores)?; diff --git a/cozo-core/src/algo/degree_centrality.rs b/cozo-core/src/algo/degree_centrality.rs index 775c26cb..f13ac886 100644 --- a/cozo-core/src/algo/degree_centrality.rs +++ b/cozo-core/src/algo/degree_centrality.rs @@ -21,12 +21,12 @@ use crate::runtime::transact::SessionTx; pub(crate) struct DegreeCentrality; impl AlgoImpl for DegreeCentrality { - fn run( + fn run<'a>( &mut self, - tx: &SessionTx, - algo: &MagicAlgoApply, - stores: &BTreeMap, - out: &InMemRelation, + tx: &'a SessionTx, + algo: &'a MagicAlgoApply, + stores: &'a BTreeMap, + out: &'a InMemRelation, poison: Poison, ) -> Result<()> { let it = algo diff --git a/cozo-core/src/algo/dfs.rs b/cozo-core/src/algo/dfs.rs index 27205685..e44466dd 100644 --- a/cozo-core/src/algo/dfs.rs +++ b/cozo-core/src/algo/dfs.rs @@ -21,12 +21,12 @@ use crate::runtime::transact::SessionTx; pub(crate) struct Dfs; impl AlgoImpl for Dfs { - fn run( + fn run<'a>( &mut self, - tx: &SessionTx, - algo: &MagicAlgoApply, - stores: &BTreeMap, - out: &InMemRelation, + tx: &'a SessionTx, + algo: &'a MagicAlgoApply, + stores: &'a BTreeMap, + out: &'a InMemRelation, poison: Poison, ) -> Result<()> { let edges = algo.relation_with_min_len(0, 2, tx, stores)?; diff --git a/cozo-core/src/algo/kruskal.rs b/cozo-core/src/algo/kruskal.rs index f7788704..6a822cbb 100644 --- a/cozo-core/src/algo/kruskal.rs +++ b/cozo-core/src/algo/kruskal.rs @@ -25,12 +25,12 @@ use crate::runtime::transact::SessionTx; pub(crate) struct MinimumSpanningForestKruskal; impl AlgoImpl for MinimumSpanningForestKruskal { - fn run( + fn run<'a>( &mut self, - tx: &SessionTx, - algo: &MagicAlgoApply, - stores: &BTreeMap, - out: &InMemRelation, + tx: &'a SessionTx, + algo: &'a MagicAlgoApply, + stores: &'a BTreeMap, + out: &'a InMemRelation, poison: Poison, ) -> Result<()> { let edges = algo.relation(0)?; diff --git a/cozo-core/src/algo/label_propagation.rs b/cozo-core/src/algo/label_propagation.rs index bf5083aa..61c09d60 100644 --- a/cozo-core/src/algo/label_propagation.rs +++ b/cozo-core/src/algo/label_propagation.rs @@ -23,12 +23,12 @@ use crate::runtime::transact::SessionTx; pub(crate) struct LabelPropagation; impl AlgoImpl for LabelPropagation { - fn run( + fn run<'a>( &mut self, - tx: &SessionTx, - algo: &MagicAlgoApply, - stores: &BTreeMap, - out: &InMemRelation, + tx: &'a SessionTx, + algo: &'a MagicAlgoApply, + stores: &'a BTreeMap, + out: &'a InMemRelation, poison: Poison, ) -> Result<()> { let edges = algo.relation(0)?; diff --git a/cozo-core/src/algo/louvain.rs b/cozo-core/src/algo/louvain.rs index cd686153..2883fad6 100644 --- a/cozo-core/src/algo/louvain.rs +++ b/cozo-core/src/algo/louvain.rs @@ -23,12 +23,12 @@ use crate::runtime::transact::SessionTx; pub(crate) struct CommunityDetectionLouvain; impl AlgoImpl for CommunityDetectionLouvain { - fn run( + fn run<'a>( &mut self, - tx: &SessionTx, - algo: &MagicAlgoApply, - stores: &BTreeMap, - out: &InMemRelation, + tx: &'a SessionTx, + algo: &'a MagicAlgoApply, + stores: &'a BTreeMap, + out: &'a InMemRelation, poison: Poison, ) -> Result<()> { let edges = algo.relation(0)?; diff --git a/cozo-core/src/algo/mod.rs b/cozo-core/src/algo/mod.rs index c5d22b8b..55db7d74 100644 --- a/cozo-core/src/algo/mod.rs +++ b/cozo-core/src/algo/mod.rs @@ -60,12 +60,12 @@ pub(crate) mod triangles; pub(crate) mod yen; pub(crate) trait AlgoImpl { - fn run( + fn run<'a>( &mut self, - tx: &SessionTx, - algo: &MagicAlgoApply, - stores: &BTreeMap, - out: &InMemRelation, + tx: &'a SessionTx, + algo: &'a MagicAlgoApply, + stores: &'a BTreeMap, + out: &'a InMemRelation, poison: Poison, ) -> Result<()>; fn arity( @@ -191,12 +191,12 @@ pub(crate) struct BadExprValueError( pub(crate) struct AlgoNotFoundError(pub(crate) String, #[label] pub(crate) SourceSpan); impl MagicAlgoRuleArg { - pub(crate) fn convert_edge_to_weighted_graph( - &self, + pub(crate) fn convert_edge_to_weighted_graph<'a>( + &'a self, undirected: bool, allow_negative_edges: bool, - tx: &SessionTx, - stores: &BTreeMap, + tx: &'a SessionTx, + stores: &'a BTreeMap, ) -> Result<( Vec>, Vec, @@ -276,11 +276,11 @@ impl MagicAlgoRuleArg { } Ok((graph, indices, inv_indices, has_neg_edge)) } - pub(crate) fn convert_edge_to_graph( - &self, + pub(crate) fn convert_edge_to_graph<'a>( + &'a self, undirected: bool, - tx: &SessionTx, - stores: &BTreeMap, + tx: &'a SessionTx, + stores: &'a BTreeMap, ) -> Result<(Vec>, Vec, BTreeMap)> { let mut graph: Vec> = vec![]; let mut indices: Vec = vec![]; diff --git a/cozo-core/src/algo/pagerank.rs b/cozo-core/src/algo/pagerank.rs index 597b35ff..854e6430 100644 --- a/cozo-core/src/algo/pagerank.rs +++ b/cozo-core/src/algo/pagerank.rs @@ -24,12 +24,12 @@ use crate::runtime::transact::SessionTx; pub(crate) struct PageRank; impl AlgoImpl for PageRank { - fn run( + fn run<'a>( &mut self, - tx: &SessionTx, - algo: &MagicAlgoApply, - stores: &BTreeMap, - out: &InMemRelation, + tx: &'a SessionTx, + algo: &'a MagicAlgoApply, + stores: &'a BTreeMap, + out: &'a InMemRelation, poison: Poison, ) -> Result<()> { let edges = algo.relation(0)?; diff --git a/cozo-core/src/algo/prim.rs b/cozo-core/src/algo/prim.rs index 8127e601..7e5db062 100644 --- a/cozo-core/src/algo/prim.rs +++ b/cozo-core/src/algo/prim.rs @@ -26,12 +26,12 @@ use crate::runtime::transact::SessionTx; pub(crate) struct MinimumSpanningTreePrim; impl AlgoImpl for MinimumSpanningTreePrim { - fn run( + fn run<'a>( &mut self, - tx: &SessionTx, - algo: &MagicAlgoApply, - stores: &BTreeMap, - out: &InMemRelation, + tx: &'a SessionTx, + algo: &'a MagicAlgoApply, + stores: &'a BTreeMap, + out: &'a InMemRelation, poison: Poison, ) -> Result<()> { let edges = algo.relation(0)?; diff --git a/cozo-core/src/algo/random_walk.rs b/cozo-core/src/algo/random_walk.rs index b61a6632..dcdc73df 100644 --- a/cozo-core/src/algo/random_walk.rs +++ b/cozo-core/src/algo/random_walk.rs @@ -24,12 +24,12 @@ use crate::runtime::transact::SessionTx; pub(crate) struct RandomWalk; impl AlgoImpl for RandomWalk { - fn run( + fn run<'a>( &mut self, - tx: &SessionTx, - algo: &MagicAlgoApply, - stores: &BTreeMap, - out: &InMemRelation, + tx: &'a SessionTx, + algo: &'a MagicAlgoApply, + stores: &'a BTreeMap, + out: &'a InMemRelation, poison: Poison, ) -> Result<()> { let edges = algo.relation_with_min_len(0, 2, tx, stores)?; diff --git a/cozo-core/src/algo/reorder_sort.rs b/cozo-core/src/algo/reorder_sort.rs index 14a489a7..443d474f 100644 --- a/cozo-core/src/algo/reorder_sort.rs +++ b/cozo-core/src/algo/reorder_sort.rs @@ -23,12 +23,12 @@ use crate::runtime::transact::SessionTx; pub(crate) struct ReorderSort; impl AlgoImpl for ReorderSort { - fn run( + fn run<'a>( &mut self, - tx: &SessionTx, - algo: &MagicAlgoApply, - stores: &BTreeMap, - out: &InMemRelation, + tx: &'a SessionTx, + algo: &'a MagicAlgoApply, + stores: &'a BTreeMap, + out: &'a InMemRelation, poison: Poison, ) -> Result<()> { let in_rel = algo.relation(0)?; diff --git a/cozo-core/src/algo/shortest_path_dijkstra.rs b/cozo-core/src/algo/shortest_path_dijkstra.rs index b4abd82b..01fa7583 100644 --- a/cozo-core/src/algo/shortest_path_dijkstra.rs +++ b/cozo-core/src/algo/shortest_path_dijkstra.rs @@ -28,12 +28,12 @@ use crate::runtime::transact::SessionTx; pub(crate) struct ShortestPathDijkstra; impl AlgoImpl for ShortestPathDijkstra { - fn run( + fn run<'a>( &mut self, - tx: &SessionTx, - algo: &MagicAlgoApply, - stores: &BTreeMap, - out: &InMemRelation, + tx: &'a SessionTx, + algo: &'a MagicAlgoApply, + stores: &'a BTreeMap, + out: &'a InMemRelation, poison: Poison, ) -> Result<()> { let edges = algo.relation(0)?; diff --git a/cozo-core/src/algo/strongly_connected_components.rs b/cozo-core/src/algo/strongly_connected_components.rs index 6f3b3162..612b3db1 100644 --- a/cozo-core/src/algo/strongly_connected_components.rs +++ b/cozo-core/src/algo/strongly_connected_components.rs @@ -31,12 +31,12 @@ impl StronglyConnectedComponent { } impl AlgoImpl for StronglyConnectedComponent { - fn run( + fn run<'a>( &mut self, - tx: &SessionTx, - algo: &MagicAlgoApply, - stores: &BTreeMap, - out: &InMemRelation, + tx: &'a SessionTx, + algo: &'a MagicAlgoApply, + stores: &'a BTreeMap, + out: &'a InMemRelation, poison: Poison, ) -> Result<()> { let edges = algo.relation(0)?; diff --git a/cozo-core/src/algo/top_sort.rs b/cozo-core/src/algo/top_sort.rs index 48d5a547..c1672f48 100644 --- a/cozo-core/src/algo/top_sort.rs +++ b/cozo-core/src/algo/top_sort.rs @@ -21,12 +21,12 @@ use crate::runtime::transact::SessionTx; pub(crate) struct TopSort; impl AlgoImpl for TopSort { - fn run( + fn run<'a>( &mut self, - tx: &SessionTx, - algo: &MagicAlgoApply, - stores: &BTreeMap, - out: &InMemRelation, + tx: &'a SessionTx, + algo: &'a MagicAlgoApply, + stores: &'a BTreeMap, + out: &'a InMemRelation, poison: Poison, ) -> Result<()> { let edges = algo.relation(0)?; diff --git a/cozo-core/src/algo/triangles.rs b/cozo-core/src/algo/triangles.rs index 66bbcfe5..aecb6a36 100644 --- a/cozo-core/src/algo/triangles.rs +++ b/cozo-core/src/algo/triangles.rs @@ -22,12 +22,12 @@ use crate::runtime::transact::SessionTx; pub(crate) struct ClusteringCoefficients; impl AlgoImpl for ClusteringCoefficients { - fn run( + fn run<'a>( &mut self, - tx: &SessionTx, - algo: &MagicAlgoApply, - stores: &BTreeMap, - out: &InMemRelation, + tx: &'a SessionTx, + algo: &'a MagicAlgoApply, + stores: &'a BTreeMap, + out: &'a InMemRelation, poison: Poison, ) -> Result<()> { let edges = algo.relation(0)?; diff --git a/cozo-core/src/algo/yen.rs b/cozo-core/src/algo/yen.rs index eb2c8379..fccc4df1 100644 --- a/cozo-core/src/algo/yen.rs +++ b/cozo-core/src/algo/yen.rs @@ -24,12 +24,12 @@ use crate::runtime::transact::SessionTx; pub(crate) struct KShortestPathYen; impl AlgoImpl for KShortestPathYen { - fn run( + fn run<'a>( &mut self, - tx: &SessionTx, - algo: &MagicAlgoApply, - stores: &BTreeMap, - out: &InMemRelation, + tx: &'a SessionTx, + algo: &'a MagicAlgoApply, + stores: &'a BTreeMap, + out: &'a InMemRelation, poison: Poison, ) -> Result<()> { let edges = algo.relation(0)?; diff --git a/cozo-core/src/bin/cozoserver.rs b/cozo-core/src/bin/cozoserver.rs index 4b952a66..0e08f42e 100644 --- a/cozo-core/src/bin/cozoserver.rs +++ b/cozo-core/src/bin/cozoserver.rs @@ -14,7 +14,7 @@ use log::{error, info}; use rand::Rng; use rouille::{router, try_or_400, Request, Response}; -use cozo::Db; +use cozo::new_cozo_rocksdb; #[derive(Parser, Debug)] #[clap(version, about, long_about = None)] @@ -39,7 +39,7 @@ fn main() { eprintln!("{}", SECURITY_WARNING); } - let db = Db::new(args.path.as_str()).unwrap(); + let db = new_cozo_rocksdb(args.path.as_str()).unwrap(); let mut path_buf = PathBuf::from(&args.path); path_buf.push("auth.txt"); diff --git a/cozo-core/src/data/tuple.rs b/cozo-core/src/data/tuple.rs index 6dc267a8..a538c22c 100644 --- a/cozo-core/src/data/tuple.rs +++ b/cozo-core/src/data/tuple.rs @@ -13,7 +13,7 @@ use crate::runtime::relation::RelationId; pub(crate) const KEY_PREFIX_LEN: usize = 9; #[derive(Clone, Ord, PartialOrd, Eq, PartialEq, Default)] -pub(crate) struct Tuple(pub(crate) Vec); +pub struct Tuple(pub(crate) Vec); impl Debug for Tuple { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { diff --git a/cozo-core/src/lib.rs b/cozo-core/src/lib.rs index 72e5bcda..ab82d505 100644 --- a/cozo-core/src/lib.rs +++ b/cozo-core/src/lib.rs @@ -19,6 +19,8 @@ pub use miette::Error; pub use runtime::db::Db; +pub use runtime::db::new_cozo_rocksdb; +pub use storage::rocks::RocksDbStorage; pub(crate) mod algo; pub(crate) mod data; diff --git a/cozo-core/src/query/stored.rs b/cozo-core/src/query/stored.rs index 14e9d6d3..3e52e2b4 100644 --- a/cozo-core/src/query/stored.rs +++ b/cozo-core/src/query/stored.rs @@ -20,8 +20,8 @@ use crate::data::value::DataValue; use crate::parse::parse_script; use crate::runtime::relation::{AccessLevel, InputRelationHandle, InsufficientAccessLevel}; use crate::runtime::transact::SessionTx; +use crate::storage::Storage; use crate::Db; -use crate::storage::StoreTx; #[derive(Debug, Error, Diagnostic)] #[error("attempting to write into relation {0} of arity {1} with data of arity {2}")] @@ -29,14 +29,17 @@ use crate::storage::StoreTx; struct RelationArityMismatch(String, usize, usize); impl SessionTx { - pub(crate) fn execute_relation<'a>( - &'a mut self, - db: &Db, - res_iter: impl Iterator> + 'a, + pub(crate) fn execute_relation( + &mut self, + db: &Db, + res_iter: impl Iterator>, op: RelationOp, meta: &InputRelationHandle, headers: &[Symbol], - ) -> Result, Vec)>> { + ) -> Result, Vec)>> + where + ::Tx: 'static, + { let mut to_clear = vec![]; let mut replaced_old_triggers = None; if op == RelationOp::Replace { diff --git a/cozo-core/src/runtime/db.rs b/cozo-core/src/runtime/db.rs index 55cfe39c..81feaf33 100644 --- a/cozo-core/src/runtime/db.rs +++ b/cozo-core/src/runtime/db.rs @@ -67,14 +67,17 @@ const CURRENT_STORAGE_VERSION: u64 = 1; /// The database object of Cozo. #[derive(Clone)] -pub struct Db { - db: RocksDbStorage, +pub struct Db { + db: S, relation_store_id: Arc, queries_count: Arc, running_queries: Arc>>, } -impl Debug for Db { +impl Debug for Db +where + S: Storage, +{ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "Db") } @@ -91,64 +94,73 @@ lazy_static! { static ref JSON_ERR_HANDLER: JSONReportHandler = miette::JSONReportHandler::new(); } -impl Db { - /// Creates a database object. - pub fn new(path: impl AsRef) -> Result { - let builder = DbBuilder::default().path(path.as_ref()); - let path = builder.opts.db_path; - fs::create_dir_all(path) - .map_err(|err| BadDbInit(format!("cannot create directory {}: {}", path, err)))?; - let path_buf = PathBuf::from(path); - - let is_new = { - let mut manifest_path = path_buf.clone(); - manifest_path.push("manifest"); - - if manifest_path.exists() { - let existing: DbManifest = rmp_serde::from_slice( - &fs::read(manifest_path) - .into_diagnostic() - .wrap_err_with(|| "when reading manifest")?, - ) - .into_diagnostic() - .wrap_err_with(|| "when reading manifest")?; - assert_eq!( - existing.storage_version, CURRENT_STORAGE_VERSION, - "Unknown storage version {}", - existing.storage_version - ); - false - } else { - fs::write( - manifest_path, - rmp_serde::to_vec_named(&DbManifest { - storage_version: CURRENT_STORAGE_VERSION, - }) +/// Creates a database object. +pub fn new_cozo_rocksdb(path: impl AsRef) -> Result> { + let builder = DbBuilder::default().path(path.as_ref()); + let path = builder.opts.db_path; + fs::create_dir_all(path) + .map_err(|err| BadDbInit(format!("cannot create directory {}: {}", path, err)))?; + let path_buf = PathBuf::from(path); + + let is_new = { + let mut manifest_path = path_buf.clone(); + manifest_path.push("manifest"); + + if manifest_path.exists() { + let existing: DbManifest = rmp_serde::from_slice( + &fs::read(manifest_path) .into_diagnostic() - .wrap_err_with(|| "when serializing manifest")?, - ) - .into_diagnostic() - .wrap_err_with(|| "when serializing manifest")?; - true - } - }; - - let mut store_path = path_buf; - store_path.push("data"); - let db_builder = builder - .create_if_missing(is_new) - .use_capped_prefix_extractor(true, KEY_PREFIX_LEN) - .use_bloom_filter(true, 9.9, true) - .path( - store_path - .to_str() - .ok_or_else(|| miette!("bad path name"))?, + .wrap_err_with(|| "when reading manifest")?, + ) + .into_diagnostic() + .wrap_err_with(|| "when reading manifest")?; + assert_eq!( + existing.storage_version, CURRENT_STORAGE_VERSION, + "Unknown storage version {}", + existing.storage_version ); + false + } else { + fs::write( + manifest_path, + rmp_serde::to_vec_named(&DbManifest { + storage_version: CURRENT_STORAGE_VERSION, + }) + .into_diagnostic() + .wrap_err_with(|| "when serializing manifest")?, + ) + .into_diagnostic() + .wrap_err_with(|| "when serializing manifest")?; + true + } + }; + + let mut store_path = path_buf; + store_path.push("data"); + let db_builder = builder + .create_if_missing(is_new) + .use_capped_prefix_extractor(true, KEY_PREFIX_LEN) + .use_bloom_filter(true, 9.9, true) + .path( + store_path + .to_str() + .ok_or_else(|| miette!("bad path name"))?, + ); + + let db = db_builder.build()?; + + Db::new(RocksDbStorage::new(db)) +} - let db = db_builder.build()?; - +impl Db +where + S: Storage, + ::Tx: 'static, +{ + /// create a new database with the specified storage + pub fn new(storage: S) -> Result { let ret = Self { - db: RocksDbStorage::new(db), + db: storage, relation_store_id: Arc::new(Default::default()), queries_count: Arc::new(Default::default()), running_queries: Arc::new(Mutex::new(Default::default())), @@ -156,7 +168,6 @@ impl Db { ret.load_last_ids()?; Ok(ret) } - fn compact_relation(&self) -> Result<()> { let l = Tuple::default().encode_as_key(RelationId(0)); let u = Tuple(vec![DataValue::Bot]).encode_as_key(RelationId(u64::MAX)); @@ -172,7 +183,7 @@ impl Db { } fn transact(&self) -> Result { let ret = SessionTx { - tx: self.db.transact()?, + tx: Box::new(self.db.transact()?), mem_store_id: Default::default(), relation_store_id: self.relation_store_id.clone(), }; @@ -180,7 +191,7 @@ impl Db { } fn transact_write(&self) -> Result { let ret = SessionTx { - tx: self.db.transact()?, + tx: Box::new(self.db.transact()?), mem_store_id: Default::default(), relation_store_id: self.relation_store_id.clone(), }; diff --git a/cozo-core/src/runtime/relation.rs b/cozo-core/src/runtime/relation.rs index 9d25c83b..8aa863ca 100644 --- a/cozo-core/src/runtime/relation.rs +++ b/cozo-core/src/runtime/relation.rs @@ -19,7 +19,6 @@ use crate::data::tuple::{Tuple, ENCODED_KEY_MIN_LEN}; use crate::data::value::DataValue; use crate::parse::SourceSpan; use crate::runtime::transact::SessionTx; -use crate::storage::StoreTx; #[derive( Copy, @@ -233,8 +232,7 @@ impl RelationHandle { let prefix_encoded = Tuple(lower).encode_as_key(self.id); let upper_encoded = Tuple(upper).encode_as_key(self.id); // RelationIterator::new(tx, &prefix_encoded, &upper_encoded) - tx.tx - .range_scan(&prefix_encoded, &upper_encoded) + tx.tx.range_scan(&prefix_encoded, &upper_encoded) } pub(crate) fn scan_bounded_prefix( &self, @@ -250,8 +248,7 @@ impl RelationHandle { upper_t.0.push(DataValue::Bot); let lower_encoded = lower_t.encode_as_key(self.id); let upper_encoded = upper_t.encode_as_key(self.id); - tx.tx - .range_scan(&lower_encoded, &upper_encoded) + tx.tx.range_scan(&lower_encoded, &upper_encoded) } } diff --git a/cozo-core/src/runtime/transact.rs b/cozo-core/src/runtime/transact.rs index 84e3e21a..458a90ce 100644 --- a/cozo-core/src/runtime/transact.rs +++ b/cozo-core/src/runtime/transact.rs @@ -2,8 +2,8 @@ * Copyright 2022, The Cozo Project Authors. Licensed under MPL-2.0. */ -use std::sync::Arc; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; +use std::sync::Arc; use miette::Result; @@ -14,11 +14,10 @@ use crate::data::value::DataValue; use crate::parse::SourceSpan; use crate::runtime::in_mem::{InMemRelation, StoredRelationId}; use crate::runtime::relation::RelationId; -use crate::storage::rocks::RocksDbTx; use crate::storage::StoreTx; pub struct SessionTx { - pub(crate) tx: RocksDbTx, + pub(crate) tx: Box, pub(crate) relation_store_id: Arc, pub(crate) mem_store_id: Arc, } diff --git a/cozo-core/src/storage/mod.rs b/cozo-core/src/storage/mod.rs index 2faced6f..85f3088c 100644 --- a/cozo-core/src/storage/mod.rs +++ b/cozo-core/src/storage/mod.rs @@ -10,25 +10,28 @@ pub(crate) mod rocks; pub(crate) mod sled; pub(crate) mod tikv; -pub(crate) trait Storage<'a> { - type Tx: StoreTx<'a>; +pub trait Storage { + type Tx: StoreTx; - fn transact(&'a self) -> Result; + fn transact(&self) -> Result; fn del_range(&self, lower: &[u8], upper: &[u8]) -> Result<()>; fn range_compact(&self, lower: &[u8], upper: &[u8]) -> Result<()>; } -pub(crate) trait StoreTx<'a> { - type ReadSlice: AsRef<[u8]>; - - type KVIter: Iterator>; - type KVIterRaw: Iterator, Vec)>>; - - fn get(&self, key: &[u8], for_update: bool) -> Result>; +pub trait StoreTx { + fn get(&self, key: &[u8], for_update: bool) -> Result>>; fn put(&mut self, key: &[u8], val: &[u8]) -> Result<()>; fn del(&mut self, key: &[u8]) -> Result<()>; fn exists(&self, key: &[u8], for_update: bool) -> Result; fn commit(&mut self) -> Result<()>; - fn range_scan(&'a self, lower: &[u8], upper: &[u8]) -> Self::KVIter; - fn range_scan_raw(&'a self, lower: &[u8], upper: &[u8]) -> Self::KVIterRaw; + fn range_scan( + &self, + lower: &[u8], + upper: &[u8], + ) -> Box>>; + fn range_scan_raw( + &self, + lower: &[u8], + upper: &[u8], + ) -> Box, Vec)>>>; } diff --git a/cozo-core/src/storage/rocks.rs b/cozo-core/src/storage/rocks.rs index 6709f6b4..988e7952 100644 --- a/cozo-core/src/storage/rocks.rs +++ b/cozo-core/src/storage/rocks.rs @@ -4,15 +4,16 @@ use miette::{IntoDiagnostic, Result}; -use cozorocks::{DbIter, PinSlice, RocksDb, Tx}; +use cozorocks::{DbIter, RocksDb, Tx}; use crate::data::tuple::Tuple; use crate::runtime::relation::decode_tuple_from_kv; use crate::storage::{Storage, StoreTx}; use crate::utils::swap_option_result; +/// RocksDB storage engine #[derive(Clone)] -pub(crate) struct RocksDbStorage { +pub struct RocksDbStorage { db: RocksDb, } @@ -22,7 +23,7 @@ impl RocksDbStorage { } } -impl Storage<'_> for RocksDbStorage { +impl Storage for RocksDbStorage { type Tx = RocksDbTx; fn transact(&self) -> Result { @@ -39,18 +40,14 @@ impl Storage<'_> for RocksDbStorage { } } -pub(crate) struct RocksDbTx { +pub struct RocksDbTx { db_tx: Tx, } -impl StoreTx<'_> for RocksDbTx { - type ReadSlice = PinSlice; - type KVIter = RocksDbIterator; - type KVIterRaw = RocksDbIteratorRaw; - +impl StoreTx for RocksDbTx { #[inline] - fn get(&self, key: &[u8], for_update: bool) -> Result> { - Ok(self.db_tx.get(key, for_update)?) + fn get(&self, key: &[u8], for_update: bool) -> Result>> { + Ok(self.db_tx.get(key, for_update)?.map(|v| v.to_vec())) } #[inline] @@ -72,24 +69,28 @@ impl StoreTx<'_> for RocksDbTx { Ok(self.db_tx.commit()?) } - fn range_scan(&self, lower: &[u8], upper: &[u8]) -> Self::KVIter { + fn range_scan(&self, lower: &[u8], upper: &[u8]) -> Box>> { let mut inner = self.db_tx.iterator().upper_bound(upper).start(); inner.seek(lower); - RocksDbIterator { + Box::new(RocksDbIterator { inner, started: false, upper_bound: upper.to_vec(), - } + }) } - fn range_scan_raw(&self, lower: &[u8], upper: &[u8]) -> Self::KVIterRaw { + fn range_scan_raw( + &self, + lower: &[u8], + upper: &[u8], + ) -> Box, Vec)>>> { let mut inner = self.db_tx.iterator().upper_bound(upper).start(); inner.seek(lower); - RocksDbIteratorRaw { + Box::new(RocksDbIteratorRaw { inner, started: false, upper_bound: upper.to_vec(), - } + }) } } diff --git a/cozo-core/src/storage/sled.rs b/cozo-core/src/storage/sled.rs index 392d16bf..3deaa9e1 100644 --- a/cozo-core/src/storage/sled.rs +++ b/cozo-core/src/storage/sled.rs @@ -1,291 +1,291 @@ -/* - * Copyright 2022, The Cozo Project Authors. Licensed under MPL-2.0. - */ - -use std::cmp::Ordering; -use std::collections::btree_map::Range; -use std::collections::BTreeMap; -use std::iter::Fuse; -use std::marker::PhantomData; -use std::thread; - -use miette::{IntoDiagnostic, Result}; -use sled::transaction::{ConflictableTransactionError, TransactionalTree}; -use sled::{Db, IVec, Iter}; - -use crate::data::tuple::Tuple; -use crate::runtime::relation::decode_tuple_from_kv; -use crate::storage::{Storage, StoreTx}; -use crate::utils::swap_option_result; - -#[derive(Clone)] -struct SledStorage<'a> { - db: Db, - _phantom: PhantomData<&'a [u8]>, -} - -impl<'a> Storage<'a> for SledStorage<'a> { - type Tx = SledTx<'a>; - - fn transact(&'a self) -> Result { - Ok(SledTx { - db: self.db.clone(), - changes: Default::default(), - _phantom: Default::default(), - }) - } - - fn del_range(&self, lower: &[u8], upper: &[u8]) -> Result<()> { - let db = self.db.clone(); - let lower_v = lower.to_vec(); - let upper_v = upper.to_vec(); - thread::spawn(move || -> Result<()> { - for k_res in db.range(lower_v..upper_v).keys() { - db.remove(k_res.into_diagnostic()?).into_diagnostic()?; - } - Ok(()) - }); - Ok(()) - } - - fn range_compact(&self, _lower: &[u8], _upper: &[u8]) -> Result<()> { - Ok(()) - } -} - -struct SledTx<'a> { - db: Db, - changes: BTreeMap, Option>>, - _phantom: PhantomData<&'a [u8]>, -} - -impl<'a> StoreTx<'a> for SledTx<'a> { - type ReadSlice = IVec; - type KVIter = SledIter<'a>; - type KVIterRaw = SledIterRaw<'a>; - - #[inline] - fn get(&self, key: &[u8], _for_update: bool) -> Result> { - Ok(match self.changes.get(key) { - Some(Some(val)) => Some(IVec::from(val as &[u8])), - Some(None) => None, - None => self.db.get(key).into_diagnostic()?, - }) - } - - #[inline] - fn put(&mut self, key: &[u8], val: &[u8]) -> Result<()> { - self.changes.insert(key.into(), Some(val.into())); - Ok(()) - } - - #[inline] - fn del(&mut self, key: &[u8]) -> Result<()> { - self.changes.insert(key.into(), None); - Ok(()) - } - - #[inline] - fn exists(&self, key: &[u8], _for_update: bool) -> Result { - Ok(match self.changes.get(key) { - Some(Some(_)) => true, - Some(None) => false, - None => self.db.get(key).into_diagnostic()?.is_some(), - }) - } - - fn commit(&mut self) -> Result<()> { - self.db - .transaction( - |db: &TransactionalTree| -> Result<(), ConflictableTransactionError> { - for (k, v) in &self.changes { - match v { - None => { - db.remove(k as &[u8])?; - } - Some(v) => { - db.insert(k as &[u8], v as &[u8])?; - } - } - } - Ok(()) - }, - ) - .into_diagnostic()?; - Ok(()) - } - - fn range_scan(&'a self, lower: &[u8], upper: &[u8]) -> Self::KVIter { - let change_iter = self.changes.range(lower.to_vec()..upper.to_vec()).fuse(); - let db_iter = self.db.range(lower..upper).fuse(); - SledIter { - change_iter, - db_iter, - change_cache: None, - db_cache: None, - } - } - - fn range_scan_raw(&'a self, lower: &[u8], upper: &[u8]) -> Self::KVIterRaw { - let change_iter = self.changes.range(lower.to_vec()..upper.to_vec()).fuse(); - let db_iter = self.db.range(lower..upper).fuse(); - SledIterRaw { - change_iter, - db_iter, - change_cache: None, - db_cache: None, - } - } -} - -struct SledIter<'a> { - change_iter: Fuse, Option>>>, - db_iter: Fuse, - change_cache: Option<(Vec, Option>)>, - db_cache: Option<(IVec, IVec)>, -} - -impl<'a> SledIter<'a> { - #[inline] - fn fill_cache(&mut self) -> Result<()> { - if self.change_cache.is_none() { - if let Some((k, v)) = self.change_iter.next() { - self.change_cache = Some((k.to_vec(), v.clone())) - } - } - - if self.db_cache.is_none() { - if let Some(res) = self.db_iter.next() { - self.db_cache = Some(res.into_diagnostic()?); - } - } - - Ok(()) - } - - #[inline] - fn next_inner(&mut self) -> Result> { - loop { - self.fill_cache()?; - match (&self.change_cache, &self.db_cache) { - (None, None) => return Ok(None), - (Some((_, None)), None) => { - self.change_cache.take(); - continue; - } - (Some((_, Some(_))), None) => { - let (k, sv) = self.change_cache.take().unwrap(); - let v = sv.unwrap(); - return Ok(Some(decode_tuple_from_kv(&k, &v))); - } - (None, Some(_)) => { - let (k, v) = self.db_cache.take().unwrap(); - return Ok(Some(decode_tuple_from_kv(&k, &v))); - } - (Some((ck, _)), Some((dk, _))) => match ck.as_slice().cmp(dk) { - Ordering::Less => { - let (k, sv) = self.change_cache.take().unwrap(); - match sv { - None => continue, - Some(v) => { - return Ok(Some(decode_tuple_from_kv(&k, &v))); - } - } - } - Ordering::Greater => { - let (k, v) = self.db_cache.take().unwrap(); - return Ok(Some(decode_tuple_from_kv(&k, &v))); - } - Ordering::Equal => { - self.db_cache.take(); - continue; - } - }, - } - } - } -} - -impl<'a> Iterator for SledIter<'a> { - type Item = Result; - - #[inline] - fn next(&mut self) -> Option { - swap_option_result(self.next_inner()) - } -} - -struct SledIterRaw<'a> { - change_iter: Fuse, Option>>>, - db_iter: Fuse, - change_cache: Option<(Vec, Option>)>, - db_cache: Option<(IVec, IVec)>, -} - -impl<'a> SledIterRaw<'a> { - #[inline] - fn fill_cache(&mut self) -> Result<()> { - if self.change_cache.is_none() { - if let Some((k, v)) = self.change_iter.next() { - self.change_cache = Some((k.to_vec(), v.clone())) - } - } - - if self.db_cache.is_none() { - if let Some(res) = self.db_iter.next() { - self.db_cache = Some(res.into_diagnostic()?); - } - } - - Ok(()) - } - - #[inline] - fn next_inner(&mut self) -> Result, Vec)>> { - loop { - self.fill_cache()?; - match (&self.change_cache, &self.db_cache) { - (None, None) => return Ok(None), - (Some((_, None)), None) => { - self.change_cache.take(); - continue; - } - (Some((_, Some(_))), None) => { - let (k, sv) = self.change_cache.take().unwrap(); - let v = sv.unwrap(); - return Ok(Some((k, v))); - } - (None, Some(_)) => { - let (k, v) = self.db_cache.take().unwrap(); - return Ok(Some((k.to_vec(), v.to_vec()))); - } - (Some((ck, _)), Some((dk, _))) => match ck.as_slice().cmp(dk) { - Ordering::Less => { - let (k, sv) = self.change_cache.take().unwrap(); - match sv { - None => continue, - Some(v) => return Ok(Some((k, v))), - } - } - Ordering::Greater => { - let (k, v) = self.db_cache.take().unwrap(); - return Ok(Some((k.to_vec(), v.to_vec()))); - } - Ordering::Equal => { - self.db_cache.take(); - continue; - } - }, - } - } - } -} - -impl<'a> Iterator for SledIterRaw<'a> { - type Item = Result<(Vec, Vec)>; - - #[inline] - fn next(&mut self) -> Option { - swap_option_result(self.next_inner()) - } -} +// /* +// * Copyright 2022, The Cozo Project Authors. Licensed under MPL-2.0. +// */ +// +// use std::cmp::Ordering; +// use std::collections::btree_map::Range; +// use std::collections::BTreeMap; +// use std::iter::Fuse; +// use std::sync::{Arc, RwLock}; +// use std::thread; +// +// use miette::{IntoDiagnostic, Result}; +// use sled::transaction::{ConflictableTransactionError, TransactionalTree}; +// use sled::{Db, IVec, Iter}; +// +// use crate::data::tuple::Tuple; +// use crate::runtime::relation::decode_tuple_from_kv; +// use crate::storage::{Storage, StoreTx}; +// use crate::utils::swap_option_result; +// +// #[derive(Clone)] +// struct SledStorage { +// db: Db, +// } +// +// impl Storage for SledStorage { +// type Tx = SledTx; +// +// fn transact(&self) -> Result { +// Ok(SledTx { +// db: self.db.clone(), +// changes: Default::default(), +// }) +// } +// +// fn del_range(&self, lower: &[u8], upper: &[u8]) -> Result<()> { +// let db = self.db.clone(); +// let lower_v = lower.to_vec(); +// let upper_v = upper.to_vec(); +// thread::spawn(move || -> Result<()> { +// for k_res in db.range(lower_v..upper_v).keys() { +// db.remove(k_res.into_diagnostic()?).into_diagnostic()?; +// } +// Ok(()) +// }); +// Ok(()) +// } +// +// fn range_compact(&self, _lower: &[u8], _upper: &[u8]) -> Result<()> { +// Ok(()) +// } +// } +// +// struct SledTx { +// db: Db, +// changes: Arc, Option>>>>, +// } +// +// impl StoreTx for SledTx { +// #[inline] +// fn get(&self, key: &[u8], _for_update: bool) -> Result>> { +// Ok(match self.changes.read().unwrap().get(key) { +// Some(Some(val)) => Some(val.clone()), +// Some(None) => None, +// None => { +// let ret = self.db.get(key).into_diagnostic()?; +// ret.map(|v| v.to_vec()) +// } +// }) +// } +// +// #[inline] +// fn put(&mut self, key: &[u8], val: &[u8]) -> Result<()> { +// self.changes.write().unwrap().insert(key.into(), Some(val.into())); +// Ok(()) +// } +// +// #[inline] +// fn del(&mut self, key: &[u8]) -> Result<()> { +// self.changes.write().unwrap().insert(key.into(), None); +// Ok(()) +// } +// +// #[inline] +// fn exists(&self, key: &[u8], _for_update: bool) -> Result { +// Ok(match self.changes.read().unwrap().get(key) { +// Some(Some(_)) => true, +// Some(None) => false, +// None => self.db.get(key).into_diagnostic()?.is_some(), +// }) +// } +// +// fn commit(&mut self) -> Result<()> { +// self.db +// .transaction( +// |db: &TransactionalTree| -> Result<(), ConflictableTransactionError> { +// for (k, v) in self.changes.read().unwrap().iter() { +// match v { +// None => { +// db.remove(k as &[u8])?; +// } +// Some(v) => { +// db.insert(k as &[u8], v as &[u8])?; +// } +// } +// } +// Ok(()) +// }, +// ) +// .into_diagnostic()?; +// Ok(()) +// } +// +// fn range_scan(&self, lower: &[u8], upper: &[u8]) -> Box>> { +// let change_iter = self.changes.read().unwrap().range(lower.to_vec()..upper.to_vec()).fuse(); +// let db_iter = self.db.range(lower..upper).fuse(); +// Box::new(SledIter { +// change_iter, +// db_iter, +// change_cache: None, +// db_cache: None, +// }) +// } +// +// fn range_scan_raw( +// &self, +// lower: &[u8], +// upper: &[u8], +// ) -> Box, Vec)>>> { +// let change_iter = self.changes.read().unwrap().range(lower.to_vec()..upper.to_vec()).fuse(); +// let db_iter = self.db.range(lower..upper).fuse(); +// Box::new(SledIterRaw { +// change_iter, +// db_iter, +// change_cache: None, +// db_cache: None, +// }) +// } +// } +// +// struct SledIter<'a> { +// change_iter: Fuse, Option>>>, +// db_iter: Fuse, +// change_cache: Option<(Vec, Option>)>, +// db_cache: Option<(IVec, IVec)>, +// } +// +// impl<'a> SledIter<'a> { +// #[inline] +// fn fill_cache(&mut self) -> Result<()> { +// if self.change_cache.is_none() { +// if let Some((k, v)) = self.change_iter.next() { +// self.change_cache = Some((k.to_vec(), v.clone())) +// } +// } +// +// if self.db_cache.is_none() { +// if let Some(res) = self.db_iter.next() { +// self.db_cache = Some(res.into_diagnostic()?); +// } +// } +// +// Ok(()) +// } +// +// #[inline] +// fn next_inner(&mut self) -> Result> { +// loop { +// self.fill_cache()?; +// match (&self.change_cache, &self.db_cache) { +// (None, None) => return Ok(None), +// (Some((_, None)), None) => { +// self.change_cache.take(); +// continue; +// } +// (Some((_, Some(_))), None) => { +// let (k, sv) = self.change_cache.take().unwrap(); +// let v = sv.unwrap(); +// return Ok(Some(decode_tuple_from_kv(&k, &v))); +// } +// (None, Some(_)) => { +// let (k, v) = self.db_cache.take().unwrap(); +// return Ok(Some(decode_tuple_from_kv(&k, &v))); +// } +// (Some((ck, _)), Some((dk, _))) => match ck.as_slice().cmp(dk) { +// Ordering::Less => { +// let (k, sv) = self.change_cache.take().unwrap(); +// match sv { +// None => continue, +// Some(v) => { +// return Ok(Some(decode_tuple_from_kv(&k, &v))); +// } +// } +// } +// Ordering::Greater => { +// let (k, v) = self.db_cache.take().unwrap(); +// return Ok(Some(decode_tuple_from_kv(&k, &v))); +// } +// Ordering::Equal => { +// self.db_cache.take(); +// continue; +// } +// }, +// } +// } +// } +// } +// +// impl<'a> Iterator for SledIter<'a> { +// type Item = Result; +// +// #[inline] +// fn next(&mut self) -> Option { +// swap_option_result(self.next_inner()) +// } +// } +// +// struct SledIterRaw<'a> { +// change_iter: Fuse, Option>>>, +// db_iter: Fuse, +// change_cache: Option<(Vec, Option>)>, +// db_cache: Option<(IVec, IVec)>, +// } +// +// impl<'a> SledIterRaw<'a> { +// #[inline] +// fn fill_cache(&mut self) -> Result<()> { +// if self.change_cache.is_none() { +// if let Some((k, v)) = self.change_iter.next() { +// self.change_cache = Some((k.to_vec(), v.clone())) +// } +// } +// +// if self.db_cache.is_none() { +// if let Some(res) = self.db_iter.next() { +// self.db_cache = Some(res.into_diagnostic()?); +// } +// } +// +// Ok(()) +// } +// +// #[inline] +// fn next_inner(&mut self) -> Result, Vec)>> { +// loop { +// self.fill_cache()?; +// match (&self.change_cache, &self.db_cache) { +// (None, None) => return Ok(None), +// (Some((_, None)), None) => { +// self.change_cache.take(); +// continue; +// } +// (Some((_, Some(_))), None) => { +// let (k, sv) = self.change_cache.take().unwrap(); +// let v = sv.unwrap(); +// return Ok(Some((k, v))); +// } +// (None, Some(_)) => { +// let (k, v) = self.db_cache.take().unwrap(); +// return Ok(Some((k.to_vec(), v.to_vec()))); +// } +// (Some((ck, _)), Some((dk, _))) => match ck.as_slice().cmp(dk) { +// Ordering::Less => { +// let (k, sv) = self.change_cache.take().unwrap(); +// match sv { +// None => continue, +// Some(v) => return Ok(Some((k, v))), +// } +// } +// Ordering::Greater => { +// let (k, v) = self.db_cache.take().unwrap(); +// return Ok(Some((k.to_vec(), v.to_vec()))); +// } +// Ordering::Equal => { +// self.db_cache.take(); +// continue; +// } +// }, +// } +// } +// } +// } +// +// impl<'a> Iterator for SledIterRaw<'a> { +// type Item = Result<(Vec, Vec)>; +// +// #[inline] +// fn next(&mut self) -> Option { +// swap_option_result(self.next_inner()) +// } +// } diff --git a/cozo-core/tests/air_routes.rs b/cozo-core/tests/air_routes.rs index 36f7354a..2d36f8e9 100644 --- a/cozo-core/tests/air_routes.rs +++ b/cozo-core/tests/air_routes.rs @@ -9,15 +9,16 @@ use approx::AbsDiffEq; use env_logger::Env; use lazy_static::lazy_static; use serde_json::json; +use cozo::RocksDbStorage; -use cozo::Db; +use cozo::{new_cozo_rocksdb, Db}; lazy_static! { - static ref TEST_DB: Db = { + static ref TEST_DB: Db = { let creation = Instant::now(); let path = "_test_air_routes"; _ = std::fs::remove_dir_all(path); - let db = Db::new(path).unwrap(); + let db = new_cozo_rocksdb(path).unwrap(); dbg!(creation.elapsed()); let init = Instant::now(); diff --git a/cozo-lib-c/src/lib.rs b/cozo-lib-c/src/lib.rs index 1bb27399..3d13622f 100644 --- a/cozo-lib-c/src/lib.rs +++ b/cozo-lib-c/src/lib.rs @@ -11,16 +11,19 @@ use std::sync::Mutex; use lazy_static::lazy_static; -use cozo::Db; +use cozo::RocksDbStorage; +use cozo::{new_cozo_rocksdb, Db}; -#[derive(Default)] -struct Handles { +struct Handles { current: AtomicI32, - dbs: Mutex>, + dbs: Mutex>>, } lazy_static! { - static ref HANDLES: Handles = Handles::default(); + static ref HANDLES: Handles = Handles { + current: Default::default(), + dbs: Mutex::new(Default::default()) + }; } /// Open a database. @@ -38,7 +41,7 @@ pub unsafe extern "C" fn cozo_open_db(path: *const c_char, db_id: &mut i32) -> * Err(err) => return CString::new(format!("{}", err)).unwrap().into_raw(), }; - match Db::new(path) { + match new_cozo_rocksdb(path) { Ok(db) => { let id = HANDLES.current.fetch_add(1, Ordering::AcqRel); let mut dbs = HANDLES.dbs.lock().unwrap(); diff --git a/cozo-lib-java/src/lib.rs b/cozo-lib-java/src/lib.rs index 1fd85142..5fdb147d 100644 --- a/cozo-lib-java/src/lib.rs +++ b/cozo-lib-java/src/lib.rs @@ -9,15 +9,19 @@ use lazy_static::lazy_static; use robusta_jni::bridge; use cozo::Db; +use cozo::RocksDbStorage; #[derive(Default)] -struct Handles { +struct Handles { current: AtomicI32, - dbs: Mutex>, + dbs: Mutex>>, } lazy_static! { - static ref HANDLES: Handles = Handles::default(); + static ref HANDLES: Handles = Handles { + current: Default::default(), + dbs: Mutex::new(Default::default()) + }; } #[bridge] @@ -29,7 +33,7 @@ mod jni { use robusta_jni::jni::errors::Result as JniResult; use robusta_jni::jni::objects::AutoLocal; - use cozo::Db; + use cozo::{new_cozo_rocksdb}; use crate::HANDLES; @@ -42,7 +46,7 @@ mod jni { impl<'env: 'borrow, 'borrow> CozoDb<'env, 'borrow> { pub extern "jni" fn openDb(path: String) -> JniResult { - match Db::new(path) { + match new_cozo_rocksdb(path) { Ok(db) => { let id = HANDLES.current.fetch_add(1, Ordering::AcqRel); let mut dbs = HANDLES.dbs.lock().unwrap();