make database code generic

main
Ziyang Hu 2 years ago
parent ecc97542bd
commit e270ad8f3c

@ -16,6 +16,9 @@ exclude = [
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
storage-rocksdb = []
storage-sled = []
storage-tikv = []
jemalloc = ["tikv-jemallocator-global", "cozorocks/jemalloc"]
io-uring = ["cozorocks/io-uring"]

@ -27,12 +27,12 @@ use crate::runtime::transact::SessionTx;
pub(crate) struct BetweennessCentrality;
impl AlgoImpl for BetweennessCentrality {
fn run(
fn run<'a>(
&mut self,
tx: &SessionTx,
algo: &MagicAlgoApply,
stores: &BTreeMap<MagicSymbol, InMemRelation>,
out: &InMemRelation,
tx: &'a SessionTx,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;
@ -97,12 +97,12 @@ impl AlgoImpl for BetweennessCentrality {
pub(crate) struct ClosenessCentrality;
impl AlgoImpl for ClosenessCentrality {
fn run(
fn run<'a>(
&mut self,
tx: &SessionTx,
algo: &MagicAlgoApply,
stores: &BTreeMap<MagicSymbol, InMemRelation>,
out: &InMemRelation,
tx: &'a SessionTx,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;

@ -24,12 +24,12 @@ use crate::runtime::transact::SessionTx;
pub(crate) struct ShortestPathAStar;
impl AlgoImpl for ShortestPathAStar {
fn run(
fn run<'a>(
&mut self,
tx: &SessionTx,
algo: &MagicAlgoApply,
stores: &BTreeMap<MagicSymbol, InMemRelation>,
out: &InMemRelation,
tx: &'a SessionTx,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
poison: Poison,
) -> Result<()> {
let edges = algo.relation_with_min_len(0, 3, tx, stores)?;
@ -81,14 +81,14 @@ impl AlgoImpl for ShortestPathAStar {
}
}
fn astar(
fn astar<'a>(
starting: &Tuple,
goal: &Tuple,
edges: &MagicAlgoRuleArg,
nodes: &MagicAlgoRuleArg,
edges: &'a MagicAlgoRuleArg,
nodes: &'a MagicAlgoRuleArg,
heuristic: &Expr,
tx: &SessionTx,
stores: &BTreeMap<MagicSymbol, InMemRelation>,
tx: &'a SessionTx,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
poison: Poison,
) -> Result<(f64, Vec<DataValue>)> {
let start_node = &starting.0[0];

@ -21,12 +21,12 @@ use crate::runtime::transact::SessionTx;
pub(crate) struct Bfs;
impl AlgoImpl for Bfs {
fn run(
fn run<'a>(
&mut self,
tx: &SessionTx,
algo: &MagicAlgoApply,
stores: &BTreeMap<MagicSymbol, InMemRelation>,
out: &InMemRelation,
tx: &'a SessionTx,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
poison: Poison,
) -> Result<()> {
let edges = algo.relation_with_min_len(0, 2, tx, stores)?;

@ -21,12 +21,12 @@ use crate::runtime::transact::SessionTx;
pub(crate) struct DegreeCentrality;
impl AlgoImpl for DegreeCentrality {
fn run(
fn run<'a>(
&mut self,
tx: &SessionTx,
algo: &MagicAlgoApply,
stores: &BTreeMap<MagicSymbol, InMemRelation>,
out: &InMemRelation,
tx: &'a SessionTx,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
poison: Poison,
) -> Result<()> {
let it = algo

@ -21,12 +21,12 @@ use crate::runtime::transact::SessionTx;
pub(crate) struct Dfs;
impl AlgoImpl for Dfs {
fn run(
fn run<'a>(
&mut self,
tx: &SessionTx,
algo: &MagicAlgoApply,
stores: &BTreeMap<MagicSymbol, InMemRelation>,
out: &InMemRelation,
tx: &'a SessionTx,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
poison: Poison,
) -> Result<()> {
let edges = algo.relation_with_min_len(0, 2, tx, stores)?;

@ -25,12 +25,12 @@ use crate::runtime::transact::SessionTx;
pub(crate) struct MinimumSpanningForestKruskal;
impl AlgoImpl for MinimumSpanningForestKruskal {
fn run(
fn run<'a>(
&mut self,
tx: &SessionTx,
algo: &MagicAlgoApply,
stores: &BTreeMap<MagicSymbol, InMemRelation>,
out: &InMemRelation,
tx: &'a SessionTx,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;

@ -23,12 +23,12 @@ use crate::runtime::transact::SessionTx;
pub(crate) struct LabelPropagation;
impl AlgoImpl for LabelPropagation {
fn run(
fn run<'a>(
&mut self,
tx: &SessionTx,
algo: &MagicAlgoApply,
stores: &BTreeMap<MagicSymbol, InMemRelation>,
out: &InMemRelation,
tx: &'a SessionTx,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;

@ -23,12 +23,12 @@ use crate::runtime::transact::SessionTx;
pub(crate) struct CommunityDetectionLouvain;
impl AlgoImpl for CommunityDetectionLouvain {
fn run(
fn run<'a>(
&mut self,
tx: &SessionTx,
algo: &MagicAlgoApply,
stores: &BTreeMap<MagicSymbol, InMemRelation>,
out: &InMemRelation,
tx: &'a SessionTx,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;

@ -60,12 +60,12 @@ pub(crate) mod triangles;
pub(crate) mod yen;
pub(crate) trait AlgoImpl {
fn run(
fn run<'a>(
&mut self,
tx: &SessionTx,
algo: &MagicAlgoApply,
stores: &BTreeMap<MagicSymbol, InMemRelation>,
out: &InMemRelation,
tx: &'a SessionTx,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
poison: Poison,
) -> Result<()>;
fn arity(
@ -191,12 +191,12 @@ pub(crate) struct BadExprValueError(
pub(crate) struct AlgoNotFoundError(pub(crate) String, #[label] pub(crate) SourceSpan);
impl MagicAlgoRuleArg {
pub(crate) fn convert_edge_to_weighted_graph(
&self,
pub(crate) fn convert_edge_to_weighted_graph<'a>(
&'a self,
undirected: bool,
allow_negative_edges: bool,
tx: &SessionTx,
stores: &BTreeMap<MagicSymbol, InMemRelation>,
tx: &'a SessionTx,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
) -> Result<(
Vec<Vec<(usize, f64)>>,
Vec<DataValue>,
@ -276,11 +276,11 @@ impl MagicAlgoRuleArg {
}
Ok((graph, indices, inv_indices, has_neg_edge))
}
pub(crate) fn convert_edge_to_graph(
&self,
pub(crate) fn convert_edge_to_graph<'a>(
&'a self,
undirected: bool,
tx: &SessionTx,
stores: &BTreeMap<MagicSymbol, InMemRelation>,
tx: &'a SessionTx,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
) -> Result<(Vec<Vec<usize>>, Vec<DataValue>, BTreeMap<DataValue, usize>)> {
let mut graph: Vec<Vec<usize>> = vec![];
let mut indices: Vec<DataValue> = vec![];

@ -24,12 +24,12 @@ use crate::runtime::transact::SessionTx;
pub(crate) struct PageRank;
impl AlgoImpl for PageRank {
fn run(
fn run<'a>(
&mut self,
tx: &SessionTx,
algo: &MagicAlgoApply,
stores: &BTreeMap<MagicSymbol, InMemRelation>,
out: &InMemRelation,
tx: &'a SessionTx,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;

@ -26,12 +26,12 @@ use crate::runtime::transact::SessionTx;
pub(crate) struct MinimumSpanningTreePrim;
impl AlgoImpl for MinimumSpanningTreePrim {
fn run(
fn run<'a>(
&mut self,
tx: &SessionTx,
algo: &MagicAlgoApply,
stores: &BTreeMap<MagicSymbol, InMemRelation>,
out: &InMemRelation,
tx: &'a SessionTx,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;

@ -24,12 +24,12 @@ use crate::runtime::transact::SessionTx;
pub(crate) struct RandomWalk;
impl AlgoImpl for RandomWalk {
fn run(
fn run<'a>(
&mut self,
tx: &SessionTx,
algo: &MagicAlgoApply,
stores: &BTreeMap<MagicSymbol, InMemRelation>,
out: &InMemRelation,
tx: &'a SessionTx,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
poison: Poison,
) -> Result<()> {
let edges = algo.relation_with_min_len(0, 2, tx, stores)?;

@ -23,12 +23,12 @@ use crate::runtime::transact::SessionTx;
pub(crate) struct ReorderSort;
impl AlgoImpl for ReorderSort {
fn run(
fn run<'a>(
&mut self,
tx: &SessionTx,
algo: &MagicAlgoApply,
stores: &BTreeMap<MagicSymbol, InMemRelation>,
out: &InMemRelation,
tx: &'a SessionTx,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
poison: Poison,
) -> Result<()> {
let in_rel = algo.relation(0)?;

@ -28,12 +28,12 @@ use crate::runtime::transact::SessionTx;
pub(crate) struct ShortestPathDijkstra;
impl AlgoImpl for ShortestPathDijkstra {
fn run(
fn run<'a>(
&mut self,
tx: &SessionTx,
algo: &MagicAlgoApply,
stores: &BTreeMap<MagicSymbol, InMemRelation>,
out: &InMemRelation,
tx: &'a SessionTx,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;

@ -31,12 +31,12 @@ impl StronglyConnectedComponent {
}
impl AlgoImpl for StronglyConnectedComponent {
fn run(
fn run<'a>(
&mut self,
tx: &SessionTx,
algo: &MagicAlgoApply,
stores: &BTreeMap<MagicSymbol, InMemRelation>,
out: &InMemRelation,
tx: &'a SessionTx,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;

@ -21,12 +21,12 @@ use crate::runtime::transact::SessionTx;
pub(crate) struct TopSort;
impl AlgoImpl for TopSort {
fn run(
fn run<'a>(
&mut self,
tx: &SessionTx,
algo: &MagicAlgoApply,
stores: &BTreeMap<MagicSymbol, InMemRelation>,
out: &InMemRelation,
tx: &'a SessionTx,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;

@ -22,12 +22,12 @@ use crate::runtime::transact::SessionTx;
pub(crate) struct ClusteringCoefficients;
impl AlgoImpl for ClusteringCoefficients {
fn run(
fn run<'a>(
&mut self,
tx: &SessionTx,
algo: &MagicAlgoApply,
stores: &BTreeMap<MagicSymbol, InMemRelation>,
out: &InMemRelation,
tx: &'a SessionTx,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;

@ -24,12 +24,12 @@ use crate::runtime::transact::SessionTx;
pub(crate) struct KShortestPathYen;
impl AlgoImpl for KShortestPathYen {
fn run(
fn run<'a>(
&mut self,
tx: &SessionTx,
algo: &MagicAlgoApply,
stores: &BTreeMap<MagicSymbol, InMemRelation>,
out: &InMemRelation,
tx: &'a SessionTx,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;

@ -14,7 +14,7 @@ use log::{error, info};
use rand::Rng;
use rouille::{router, try_or_400, Request, Response};
use cozo::Db;
use cozo::new_cozo_rocksdb;
#[derive(Parser, Debug)]
#[clap(version, about, long_about = None)]
@ -39,7 +39,7 @@ fn main() {
eprintln!("{}", SECURITY_WARNING);
}
let db = Db::new(args.path.as_str()).unwrap();
let db = new_cozo_rocksdb(args.path.as_str()).unwrap();
let mut path_buf = PathBuf::from(&args.path);
path_buf.push("auth.txt");

@ -13,7 +13,7 @@ use crate::runtime::relation::RelationId;
pub(crate) const KEY_PREFIX_LEN: usize = 9;
#[derive(Clone, Ord, PartialOrd, Eq, PartialEq, Default)]
pub(crate) struct Tuple(pub(crate) Vec<DataValue>);
pub struct Tuple(pub(crate) Vec<DataValue>);
impl Debug for Tuple {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {

@ -19,6 +19,8 @@
pub use miette::Error;
pub use runtime::db::Db;
pub use runtime::db::new_cozo_rocksdb;
pub use storage::rocks::RocksDbStorage;
pub(crate) mod algo;
pub(crate) mod data;

@ -20,8 +20,8 @@ use crate::data::value::DataValue;
use crate::parse::parse_script;
use crate::runtime::relation::{AccessLevel, InputRelationHandle, InsufficientAccessLevel};
use crate::runtime::transact::SessionTx;
use crate::storage::Storage;
use crate::Db;
use crate::storage::StoreTx;
#[derive(Debug, Error, Diagnostic)]
#[error("attempting to write into relation {0} of arity {1} with data of arity {2}")]
@ -29,14 +29,17 @@ use crate::storage::StoreTx;
struct RelationArityMismatch(String, usize, usize);
impl SessionTx {
pub(crate) fn execute_relation<'a>(
&'a mut self,
db: &Db,
res_iter: impl Iterator<Item = Result<Tuple>> + 'a,
pub(crate) fn execute_relation<S: Storage>(
&mut self,
db: &Db<S>,
res_iter: impl Iterator<Item = Result<Tuple>>,
op: RelationOp,
meta: &InputRelationHandle,
headers: &[Symbol],
) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
) -> Result<Vec<(Vec<u8>, Vec<u8>)>>
where
<S as Storage>::Tx: 'static,
{
let mut to_clear = vec![];
let mut replaced_old_triggers = None;
if op == RelationOp::Replace {

@ -67,14 +67,17 @@ const CURRENT_STORAGE_VERSION: u64 = 1;
/// The database object of Cozo.
#[derive(Clone)]
pub struct Db {
db: RocksDbStorage,
pub struct Db<S> {
db: S,
relation_store_id: Arc<AtomicU64>,
queries_count: Arc<AtomicU64>,
running_queries: Arc<Mutex<BTreeMap<u64, RunningQueryHandle>>>,
}
impl Debug for Db {
impl<S> Debug for Db<S>
where
S: Storage,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Db")
}
@ -91,9 +94,8 @@ lazy_static! {
static ref JSON_ERR_HANDLER: JSONReportHandler = miette::JSONReportHandler::new();
}
impl Db {
/// Creates a database object.
pub fn new(path: impl AsRef<str>) -> Result<Self> {
pub fn new_cozo_rocksdb(path: impl AsRef<str>) -> Result<Db<RocksDbStorage>> {
let builder = DbBuilder::default().path(path.as_ref());
let path = builder.opts.db_path;
fs::create_dir_all(path)
@ -147,8 +149,18 @@ impl Db {
let db = db_builder.build()?;
Db::new(RocksDbStorage::new(db))
}
impl<S> Db<S>
where
S: Storage,
<S as Storage>::Tx: 'static,
{
/// create a new database with the specified storage
pub fn new(storage: S) -> Result<Self> {
let ret = Self {
db: RocksDbStorage::new(db),
db: storage,
relation_store_id: Arc::new(Default::default()),
queries_count: Arc::new(Default::default()),
running_queries: Arc::new(Mutex::new(Default::default())),
@ -156,7 +168,6 @@ impl Db {
ret.load_last_ids()?;
Ok(ret)
}
fn compact_relation(&self) -> Result<()> {
let l = Tuple::default().encode_as_key(RelationId(0));
let u = Tuple(vec![DataValue::Bot]).encode_as_key(RelationId(u64::MAX));
@ -172,7 +183,7 @@ impl Db {
}
fn transact(&self) -> Result<SessionTx> {
let ret = SessionTx {
tx: self.db.transact()?,
tx: Box::new(self.db.transact()?),
mem_store_id: Default::default(),
relation_store_id: self.relation_store_id.clone(),
};
@ -180,7 +191,7 @@ impl Db {
}
fn transact_write(&self) -> Result<SessionTx> {
let ret = SessionTx {
tx: self.db.transact()?,
tx: Box::new(self.db.transact()?),
mem_store_id: Default::default(),
relation_store_id: self.relation_store_id.clone(),
};

@ -19,7 +19,6 @@ use crate::data::tuple::{Tuple, ENCODED_KEY_MIN_LEN};
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::transact::SessionTx;
use crate::storage::StoreTx;
#[derive(
Copy,
@ -233,8 +232,7 @@ impl RelationHandle {
let prefix_encoded = Tuple(lower).encode_as_key(self.id);
let upper_encoded = Tuple(upper).encode_as_key(self.id);
// RelationIterator::new(tx, &prefix_encoded, &upper_encoded)
tx.tx
.range_scan(&prefix_encoded, &upper_encoded)
tx.tx.range_scan(&prefix_encoded, &upper_encoded)
}
pub(crate) fn scan_bounded_prefix(
&self,
@ -250,8 +248,7 @@ impl RelationHandle {
upper_t.0.push(DataValue::Bot);
let lower_encoded = lower_t.encode_as_key(self.id);
let upper_encoded = upper_t.encode_as_key(self.id);
tx.tx
.range_scan(&lower_encoded, &upper_encoded)
tx.tx.range_scan(&lower_encoded, &upper_encoded)
}
}

@ -2,8 +2,8 @@
* Copyright 2022, The Cozo Project Authors. Licensed under MPL-2.0.
*/
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
use miette::Result;
@ -14,11 +14,10 @@ use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::in_mem::{InMemRelation, StoredRelationId};
use crate::runtime::relation::RelationId;
use crate::storage::rocks::RocksDbTx;
use crate::storage::StoreTx;
pub struct SessionTx {
pub(crate) tx: RocksDbTx,
pub(crate) tx: Box<dyn StoreTx>,
pub(crate) relation_store_id: Arc<AtomicU64>,
pub(crate) mem_store_id: Arc<AtomicU32>,
}

@ -10,25 +10,28 @@ pub(crate) mod rocks;
pub(crate) mod sled;
pub(crate) mod tikv;
pub(crate) trait Storage<'a> {
type Tx: StoreTx<'a>;
pub trait Storage {
type Tx: StoreTx;
fn transact(&'a self) -> Result<Self::Tx>;
fn transact(&self) -> Result<Self::Tx>;
fn del_range(&self, lower: &[u8], upper: &[u8]) -> Result<()>;
fn range_compact(&self, lower: &[u8], upper: &[u8]) -> Result<()>;
}
pub(crate) trait StoreTx<'a> {
type ReadSlice: AsRef<[u8]>;
type KVIter: Iterator<Item = Result<Tuple>>;
type KVIterRaw: Iterator<Item = Result<(Vec<u8>, Vec<u8>)>>;
fn get(&self, key: &[u8], for_update: bool) -> Result<Option<Self::ReadSlice>>;
pub trait StoreTx {
fn get(&self, key: &[u8], for_update: bool) -> Result<Option<Vec<u8>>>;
fn put(&mut self, key: &[u8], val: &[u8]) -> Result<()>;
fn del(&mut self, key: &[u8]) -> Result<()>;
fn exists(&self, key: &[u8], for_update: bool) -> Result<bool>;
fn commit(&mut self) -> Result<()>;
fn range_scan(&'a self, lower: &[u8], upper: &[u8]) -> Self::KVIter;
fn range_scan_raw(&'a self, lower: &[u8], upper: &[u8]) -> Self::KVIterRaw;
fn range_scan(
&self,
lower: &[u8],
upper: &[u8],
) -> Box<dyn Iterator<Item = Result<Tuple>>>;
fn range_scan_raw(
&self,
lower: &[u8],
upper: &[u8],
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>>>;
}

@ -4,15 +4,16 @@
use miette::{IntoDiagnostic, Result};
use cozorocks::{DbIter, PinSlice, RocksDb, Tx};
use cozorocks::{DbIter, RocksDb, Tx};
use crate::data::tuple::Tuple;
use crate::runtime::relation::decode_tuple_from_kv;
use crate::storage::{Storage, StoreTx};
use crate::utils::swap_option_result;
/// RocksDB storage engine
#[derive(Clone)]
pub(crate) struct RocksDbStorage {
pub struct RocksDbStorage {
db: RocksDb,
}
@ -22,7 +23,7 @@ impl RocksDbStorage {
}
}
impl Storage<'_> for RocksDbStorage {
impl Storage for RocksDbStorage {
type Tx = RocksDbTx;
fn transact(&self) -> Result<Self::Tx> {
@ -39,18 +40,14 @@ impl Storage<'_> for RocksDbStorage {
}
}
pub(crate) struct RocksDbTx {
pub struct RocksDbTx {
db_tx: Tx,
}
impl StoreTx<'_> for RocksDbTx {
type ReadSlice = PinSlice;
type KVIter = RocksDbIterator;
type KVIterRaw = RocksDbIteratorRaw;
impl StoreTx for RocksDbTx {
#[inline]
fn get(&self, key: &[u8], for_update: bool) -> Result<Option<Self::ReadSlice>> {
Ok(self.db_tx.get(key, for_update)?)
fn get(&self, key: &[u8], for_update: bool) -> Result<Option<Vec<u8>>> {
Ok(self.db_tx.get(key, for_update)?.map(|v| v.to_vec()))
}
#[inline]
@ -72,24 +69,28 @@ impl StoreTx<'_> for RocksDbTx {
Ok(self.db_tx.commit()?)
}
fn range_scan(&self, lower: &[u8], upper: &[u8]) -> Self::KVIter {
fn range_scan(&self, lower: &[u8], upper: &[u8]) -> Box<dyn Iterator<Item = Result<Tuple>>> {
let mut inner = self.db_tx.iterator().upper_bound(upper).start();
inner.seek(lower);
RocksDbIterator {
Box::new(RocksDbIterator {
inner,
started: false,
upper_bound: upper.to_vec(),
}
})
}
fn range_scan_raw(&self, lower: &[u8], upper: &[u8]) -> Self::KVIterRaw {
fn range_scan_raw(
&self,
lower: &[u8],
upper: &[u8],
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>>> {
let mut inner = self.db_tx.iterator().upper_bound(upper).start();
inner.seek(lower);
RocksDbIteratorRaw {
Box::new(RocksDbIteratorRaw {
inner,
started: false,
upper_bound: upper.to_vec(),
}
})
}
}

@ -1,291 +1,291 @@
/*
* Copyright 2022, The Cozo Project Authors. Licensed under MPL-2.0.
*/
use std::cmp::Ordering;
use std::collections::btree_map::Range;
use std::collections::BTreeMap;
use std::iter::Fuse;
use std::marker::PhantomData;
use std::thread;
use miette::{IntoDiagnostic, Result};
use sled::transaction::{ConflictableTransactionError, TransactionalTree};
use sled::{Db, IVec, Iter};
use crate::data::tuple::Tuple;
use crate::runtime::relation::decode_tuple_from_kv;
use crate::storage::{Storage, StoreTx};
use crate::utils::swap_option_result;
#[derive(Clone)]
struct SledStorage<'a> {
db: Db,
_phantom: PhantomData<&'a [u8]>,
}
impl<'a> Storage<'a> for SledStorage<'a> {
type Tx = SledTx<'a>;
fn transact(&'a self) -> Result<Self::Tx> {
Ok(SledTx {
db: self.db.clone(),
changes: Default::default(),
_phantom: Default::default(),
})
}
fn del_range(&self, lower: &[u8], upper: &[u8]) -> Result<()> {
let db = self.db.clone();
let lower_v = lower.to_vec();
let upper_v = upper.to_vec();
thread::spawn(move || -> Result<()> {
for k_res in db.range(lower_v..upper_v).keys() {
db.remove(k_res.into_diagnostic()?).into_diagnostic()?;
}
Ok(())
});
Ok(())
}
fn range_compact(&self, _lower: &[u8], _upper: &[u8]) -> Result<()> {
Ok(())
}
}
struct SledTx<'a> {
db: Db,
changes: BTreeMap<Vec<u8>, Option<Vec<u8>>>,
_phantom: PhantomData<&'a [u8]>,
}
impl<'a> StoreTx<'a> for SledTx<'a> {
type ReadSlice = IVec;
type KVIter = SledIter<'a>;
type KVIterRaw = SledIterRaw<'a>;
#[inline]
fn get(&self, key: &[u8], _for_update: bool) -> Result<Option<Self::ReadSlice>> {
Ok(match self.changes.get(key) {
Some(Some(val)) => Some(IVec::from(val as &[u8])),
Some(None) => None,
None => self.db.get(key).into_diagnostic()?,
})
}
#[inline]
fn put(&mut self, key: &[u8], val: &[u8]) -> Result<()> {
self.changes.insert(key.into(), Some(val.into()));
Ok(())
}
#[inline]
fn del(&mut self, key: &[u8]) -> Result<()> {
self.changes.insert(key.into(), None);
Ok(())
}
#[inline]
fn exists(&self, key: &[u8], _for_update: bool) -> Result<bool> {
Ok(match self.changes.get(key) {
Some(Some(_)) => true,
Some(None) => false,
None => self.db.get(key).into_diagnostic()?.is_some(),
})
}
fn commit(&mut self) -> Result<()> {
self.db
.transaction(
|db: &TransactionalTree| -> Result<(), ConflictableTransactionError> {
for (k, v) in &self.changes {
match v {
None => {
db.remove(k as &[u8])?;
}
Some(v) => {
db.insert(k as &[u8], v as &[u8])?;
}
}
}
Ok(())
},
)
.into_diagnostic()?;
Ok(())
}
fn range_scan(&'a self, lower: &[u8], upper: &[u8]) -> Self::KVIter {
let change_iter = self.changes.range(lower.to_vec()..upper.to_vec()).fuse();
let db_iter = self.db.range(lower..upper).fuse();
SledIter {
change_iter,
db_iter,
change_cache: None,
db_cache: None,
}
}
fn range_scan_raw(&'a self, lower: &[u8], upper: &[u8]) -> Self::KVIterRaw {
let change_iter = self.changes.range(lower.to_vec()..upper.to_vec()).fuse();
let db_iter = self.db.range(lower..upper).fuse();
SledIterRaw {
change_iter,
db_iter,
change_cache: None,
db_cache: None,
}
}
}
struct SledIter<'a> {
change_iter: Fuse<Range<'a, Vec<u8>, Option<Vec<u8>>>>,
db_iter: Fuse<Iter>,
change_cache: Option<(Vec<u8>, Option<Vec<u8>>)>,
db_cache: Option<(IVec, IVec)>,
}
impl<'a> SledIter<'a> {
#[inline]
fn fill_cache(&mut self) -> Result<()> {
if self.change_cache.is_none() {
if let Some((k, v)) = self.change_iter.next() {
self.change_cache = Some((k.to_vec(), v.clone()))
}
}
if self.db_cache.is_none() {
if let Some(res) = self.db_iter.next() {
self.db_cache = Some(res.into_diagnostic()?);
}
}
Ok(())
}
#[inline]
fn next_inner(&mut self) -> Result<Option<Tuple>> {
loop {
self.fill_cache()?;
match (&self.change_cache, &self.db_cache) {
(None, None) => return Ok(None),
(Some((_, None)), None) => {
self.change_cache.take();
continue;
}
(Some((_, Some(_))), None) => {
let (k, sv) = self.change_cache.take().unwrap();
let v = sv.unwrap();
return Ok(Some(decode_tuple_from_kv(&k, &v)));
}
(None, Some(_)) => {
let (k, v) = self.db_cache.take().unwrap();
return Ok(Some(decode_tuple_from_kv(&k, &v)));
}
(Some((ck, _)), Some((dk, _))) => match ck.as_slice().cmp(dk) {
Ordering::Less => {
let (k, sv) = self.change_cache.take().unwrap();
match sv {
None => continue,
Some(v) => {
return Ok(Some(decode_tuple_from_kv(&k, &v)));
}
}
}
Ordering::Greater => {
let (k, v) = self.db_cache.take().unwrap();
return Ok(Some(decode_tuple_from_kv(&k, &v)));
}
Ordering::Equal => {
self.db_cache.take();
continue;
}
},
}
}
}
}
impl<'a> Iterator for SledIter<'a> {
type Item = Result<Tuple>;
#[inline]
fn next(&mut self) -> Option<Self::Item> {
swap_option_result(self.next_inner())
}
}
struct SledIterRaw<'a> {
change_iter: Fuse<Range<'a, Vec<u8>, Option<Vec<u8>>>>,
db_iter: Fuse<Iter>,
change_cache: Option<(Vec<u8>, Option<Vec<u8>>)>,
db_cache: Option<(IVec, IVec)>,
}
impl<'a> SledIterRaw<'a> {
#[inline]
fn fill_cache(&mut self) -> Result<()> {
if self.change_cache.is_none() {
if let Some((k, v)) = self.change_iter.next() {
self.change_cache = Some((k.to_vec(), v.clone()))
}
}
if self.db_cache.is_none() {
if let Some(res) = self.db_iter.next() {
self.db_cache = Some(res.into_diagnostic()?);
}
}
Ok(())
}
#[inline]
fn next_inner(&mut self) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
loop {
self.fill_cache()?;
match (&self.change_cache, &self.db_cache) {
(None, None) => return Ok(None),
(Some((_, None)), None) => {
self.change_cache.take();
continue;
}
(Some((_, Some(_))), None) => {
let (k, sv) = self.change_cache.take().unwrap();
let v = sv.unwrap();
return Ok(Some((k, v)));
}
(None, Some(_)) => {
let (k, v) = self.db_cache.take().unwrap();
return Ok(Some((k.to_vec(), v.to_vec())));
}
(Some((ck, _)), Some((dk, _))) => match ck.as_slice().cmp(dk) {
Ordering::Less => {
let (k, sv) = self.change_cache.take().unwrap();
match sv {
None => continue,
Some(v) => return Ok(Some((k, v))),
}
}
Ordering::Greater => {
let (k, v) = self.db_cache.take().unwrap();
return Ok(Some((k.to_vec(), v.to_vec())));
}
Ordering::Equal => {
self.db_cache.take();
continue;
}
},
}
}
}
}
impl<'a> Iterator for SledIterRaw<'a> {
type Item = Result<(Vec<u8>, Vec<u8>)>;
#[inline]
fn next(&mut self) -> Option<Self::Item> {
swap_option_result(self.next_inner())
}
}
// /*
// * Copyright 2022, The Cozo Project Authors. Licensed under MPL-2.0.
// */
//
// use std::cmp::Ordering;
// use std::collections::btree_map::Range;
// use std::collections::BTreeMap;
// use std::iter::Fuse;
// use std::sync::{Arc, RwLock};
// use std::thread;
//
// use miette::{IntoDiagnostic, Result};
// use sled::transaction::{ConflictableTransactionError, TransactionalTree};
// use sled::{Db, IVec, Iter};
//
// use crate::data::tuple::Tuple;
// use crate::runtime::relation::decode_tuple_from_kv;
// use crate::storage::{Storage, StoreTx};
// use crate::utils::swap_option_result;
//
// #[derive(Clone)]
// struct SledStorage {
// db: Db,
// }
//
// impl Storage for SledStorage {
// type Tx = SledTx;
//
// fn transact(&self) -> Result<Self::Tx> {
// Ok(SledTx {
// db: self.db.clone(),
// changes: Default::default(),
// })
// }
//
// fn del_range(&self, lower: &[u8], upper: &[u8]) -> Result<()> {
// let db = self.db.clone();
// let lower_v = lower.to_vec();
// let upper_v = upper.to_vec();
// thread::spawn(move || -> Result<()> {
// for k_res in db.range(lower_v..upper_v).keys() {
// db.remove(k_res.into_diagnostic()?).into_diagnostic()?;
// }
// Ok(())
// });
// Ok(())
// }
//
// fn range_compact(&self, _lower: &[u8], _upper: &[u8]) -> Result<()> {
// Ok(())
// }
// }
//
// struct SledTx {
// db: Db,
// changes: Arc<RwLock<BTreeMap<Vec<u8>, Option<Vec<u8>>>>>,
// }
//
// impl StoreTx for SledTx {
// #[inline]
// fn get(&self, key: &[u8], _for_update: bool) -> Result<Option<Vec<u8>>> {
// Ok(match self.changes.read().unwrap().get(key) {
// Some(Some(val)) => Some(val.clone()),
// Some(None) => None,
// None => {
// let ret = self.db.get(key).into_diagnostic()?;
// ret.map(|v| v.to_vec())
// }
// })
// }
//
// #[inline]
// fn put(&mut self, key: &[u8], val: &[u8]) -> Result<()> {
// self.changes.write().unwrap().insert(key.into(), Some(val.into()));
// Ok(())
// }
//
// #[inline]
// fn del(&mut self, key: &[u8]) -> Result<()> {
// self.changes.write().unwrap().insert(key.into(), None);
// Ok(())
// }
//
// #[inline]
// fn exists(&self, key: &[u8], _for_update: bool) -> Result<bool> {
// Ok(match self.changes.read().unwrap().get(key) {
// Some(Some(_)) => true,
// Some(None) => false,
// None => self.db.get(key).into_diagnostic()?.is_some(),
// })
// }
//
// fn commit(&mut self) -> Result<()> {
// self.db
// .transaction(
// |db: &TransactionalTree| -> Result<(), ConflictableTransactionError> {
// for (k, v) in self.changes.read().unwrap().iter() {
// match v {
// None => {
// db.remove(k as &[u8])?;
// }
// Some(v) => {
// db.insert(k as &[u8], v as &[u8])?;
// }
// }
// }
// Ok(())
// },
// )
// .into_diagnostic()?;
// Ok(())
// }
//
// fn range_scan(&self, lower: &[u8], upper: &[u8]) -> Box<dyn Iterator<Item = Result<Tuple>>> {
// let change_iter = self.changes.read().unwrap().range(lower.to_vec()..upper.to_vec()).fuse();
// let db_iter = self.db.range(lower..upper).fuse();
// Box::new(SledIter {
// change_iter,
// db_iter,
// change_cache: None,
// db_cache: None,
// })
// }
//
// fn range_scan_raw(
// &self,
// lower: &[u8],
// upper: &[u8],
// ) -> Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>>> {
// let change_iter = self.changes.read().unwrap().range(lower.to_vec()..upper.to_vec()).fuse();
// let db_iter = self.db.range(lower..upper).fuse();
// Box::new(SledIterRaw {
// change_iter,
// db_iter,
// change_cache: None,
// db_cache: None,
// })
// }
// }
//
// struct SledIter<'a> {
// change_iter: Fuse<Range<'a, Vec<u8>, Option<Vec<u8>>>>,
// db_iter: Fuse<Iter>,
// change_cache: Option<(Vec<u8>, Option<Vec<u8>>)>,
// db_cache: Option<(IVec, IVec)>,
// }
//
// impl<'a> SledIter<'a> {
// #[inline]
// fn fill_cache(&mut self) -> Result<()> {
// if self.change_cache.is_none() {
// if let Some((k, v)) = self.change_iter.next() {
// self.change_cache = Some((k.to_vec(), v.clone()))
// }
// }
//
// if self.db_cache.is_none() {
// if let Some(res) = self.db_iter.next() {
// self.db_cache = Some(res.into_diagnostic()?);
// }
// }
//
// Ok(())
// }
//
// #[inline]
// fn next_inner(&mut self) -> Result<Option<Tuple>> {
// loop {
// self.fill_cache()?;
// match (&self.change_cache, &self.db_cache) {
// (None, None) => return Ok(None),
// (Some((_, None)), None) => {
// self.change_cache.take();
// continue;
// }
// (Some((_, Some(_))), None) => {
// let (k, sv) = self.change_cache.take().unwrap();
// let v = sv.unwrap();
// return Ok(Some(decode_tuple_from_kv(&k, &v)));
// }
// (None, Some(_)) => {
// let (k, v) = self.db_cache.take().unwrap();
// return Ok(Some(decode_tuple_from_kv(&k, &v)));
// }
// (Some((ck, _)), Some((dk, _))) => match ck.as_slice().cmp(dk) {
// Ordering::Less => {
// let (k, sv) = self.change_cache.take().unwrap();
// match sv {
// None => continue,
// Some(v) => {
// return Ok(Some(decode_tuple_from_kv(&k, &v)));
// }
// }
// }
// Ordering::Greater => {
// let (k, v) = self.db_cache.take().unwrap();
// return Ok(Some(decode_tuple_from_kv(&k, &v)));
// }
// Ordering::Equal => {
// self.db_cache.take();
// continue;
// }
// },
// }
// }
// }
// }
//
// impl<'a> Iterator for SledIter<'a> {
// type Item = Result<Tuple>;
//
// #[inline]
// fn next(&mut self) -> Option<Self::Item> {
// swap_option_result(self.next_inner())
// }
// }
//
// struct SledIterRaw<'a> {
// change_iter: Fuse<Range<'a, Vec<u8>, Option<Vec<u8>>>>,
// db_iter: Fuse<Iter>,
// change_cache: Option<(Vec<u8>, Option<Vec<u8>>)>,
// db_cache: Option<(IVec, IVec)>,
// }
//
// impl<'a> SledIterRaw<'a> {
// #[inline]
// fn fill_cache(&mut self) -> Result<()> {
// if self.change_cache.is_none() {
// if let Some((k, v)) = self.change_iter.next() {
// self.change_cache = Some((k.to_vec(), v.clone()))
// }
// }
//
// if self.db_cache.is_none() {
// if let Some(res) = self.db_iter.next() {
// self.db_cache = Some(res.into_diagnostic()?);
// }
// }
//
// Ok(())
// }
//
// #[inline]
// fn next_inner(&mut self) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
// loop {
// self.fill_cache()?;
// match (&self.change_cache, &self.db_cache) {
// (None, None) => return Ok(None),
// (Some((_, None)), None) => {
// self.change_cache.take();
// continue;
// }
// (Some((_, Some(_))), None) => {
// let (k, sv) = self.change_cache.take().unwrap();
// let v = sv.unwrap();
// return Ok(Some((k, v)));
// }
// (None, Some(_)) => {
// let (k, v) = self.db_cache.take().unwrap();
// return Ok(Some((k.to_vec(), v.to_vec())));
// }
// (Some((ck, _)), Some((dk, _))) => match ck.as_slice().cmp(dk) {
// Ordering::Less => {
// let (k, sv) = self.change_cache.take().unwrap();
// match sv {
// None => continue,
// Some(v) => return Ok(Some((k, v))),
// }
// }
// Ordering::Greater => {
// let (k, v) = self.db_cache.take().unwrap();
// return Ok(Some((k.to_vec(), v.to_vec())));
// }
// Ordering::Equal => {
// self.db_cache.take();
// continue;
// }
// },
// }
// }
// }
// }
//
// impl<'a> Iterator for SledIterRaw<'a> {
// type Item = Result<(Vec<u8>, Vec<u8>)>;
//
// #[inline]
// fn next(&mut self) -> Option<Self::Item> {
// swap_option_result(self.next_inner())
// }
// }

@ -9,15 +9,16 @@ use approx::AbsDiffEq;
use env_logger::Env;
use lazy_static::lazy_static;
use serde_json::json;
use cozo::RocksDbStorage;
use cozo::Db;
use cozo::{new_cozo_rocksdb, Db};
lazy_static! {
static ref TEST_DB: Db = {
static ref TEST_DB: Db<RocksDbStorage> = {
let creation = Instant::now();
let path = "_test_air_routes";
_ = std::fs::remove_dir_all(path);
let db = Db::new(path).unwrap();
let db = new_cozo_rocksdb(path).unwrap();
dbg!(creation.elapsed());
let init = Instant::now();

@ -11,16 +11,19 @@ use std::sync::Mutex;
use lazy_static::lazy_static;
use cozo::Db;
use cozo::RocksDbStorage;
use cozo::{new_cozo_rocksdb, Db};
#[derive(Default)]
struct Handles {
struct Handles<S> {
current: AtomicI32,
dbs: Mutex<BTreeMap<i32, Db>>,
dbs: Mutex<BTreeMap<i32, Db<S>>>,
}
lazy_static! {
static ref HANDLES: Handles = Handles::default();
static ref HANDLES: Handles<RocksDbStorage> = Handles {
current: Default::default(),
dbs: Mutex::new(Default::default())
};
}
/// Open a database.
@ -38,7 +41,7 @@ pub unsafe extern "C" fn cozo_open_db(path: *const c_char, db_id: &mut i32) -> *
Err(err) => return CString::new(format!("{}", err)).unwrap().into_raw(),
};
match Db::new(path) {
match new_cozo_rocksdb(path) {
Ok(db) => {
let id = HANDLES.current.fetch_add(1, Ordering::AcqRel);
let mut dbs = HANDLES.dbs.lock().unwrap();

@ -9,15 +9,19 @@ use lazy_static::lazy_static;
use robusta_jni::bridge;
use cozo::Db;
use cozo::RocksDbStorage;
#[derive(Default)]
struct Handles {
struct Handles<S> {
current: AtomicI32,
dbs: Mutex<BTreeMap<i32, Db>>,
dbs: Mutex<BTreeMap<i32, Db<S>>>,
}
lazy_static! {
static ref HANDLES: Handles = Handles::default();
static ref HANDLES: Handles<RocksDbStorage> = Handles {
current: Default::default(),
dbs: Mutex::new(Default::default())
};
}
#[bridge]
@ -29,7 +33,7 @@ mod jni {
use robusta_jni::jni::errors::Result as JniResult;
use robusta_jni::jni::objects::AutoLocal;
use cozo::Db;
use cozo::{new_cozo_rocksdb};
use crate::HANDLES;
@ -42,7 +46,7 @@ mod jni {
impl<'env: 'borrow, 'borrow> CozoDb<'env, 'borrow> {
pub extern "jni" fn openDb(path: String) -> JniResult<i32> {
match Db::new(path) {
match new_cozo_rocksdb(path) {
Ok(db) => {
let id = HANDLES.current.fetch_add(1, Ordering::AcqRel);
let mut dbs = HANDLES.dbs.lock().unwrap();

Loading…
Cancel
Save