refactor storage API

main
Ziyang Hu 2 years ago
parent 1743922dfd
commit aa174a6f6a

73
Cargo.lock generated

@ -386,7 +386,7 @@ checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc"
[[package]] [[package]]
name = "cozo" name = "cozo"
version = "0.1.6" version = "0.1.7"
dependencies = [ dependencies = [
"approx", "approx",
"base64", "base64",
@ -421,6 +421,7 @@ dependencies = [
"serde_bytes", "serde_bytes",
"serde_derive", "serde_derive",
"serde_json", "serde_json",
"sled",
"smallvec", "smallvec",
"smartstring", "smartstring",
"thiserror", "thiserror",
@ -719,12 +720,31 @@ dependencies = [
"percent-encoding", "percent-encoding",
] ]
[[package]]
name = "fs2"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213"
dependencies = [
"libc",
"winapi",
]
[[package]] [[package]]
name = "fs_extra" name = "fs_extra"
version = "1.2.0" version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394" checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394"
[[package]]
name = "fxhash"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c"
dependencies = [
"byteorder",
]
[[package]] [[package]]
name = "generic-array" name = "generic-array"
version = "0.14.6" version = "0.14.6"
@ -959,6 +979,16 @@ dependencies = [
"cc", "cc",
] ]
[[package]]
name = "lock_api"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df"
dependencies = [
"autocfg",
"scopeguard",
]
[[package]] [[package]]
name = "log" name = "log"
version = "0.4.17" version = "0.4.17"
@ -1210,6 +1240,31 @@ version = "3.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f"
[[package]]
name = "parking_lot"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
dependencies = [
"instant",
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216"
dependencies = [
"cfg-if 1.0.0",
"instant",
"libc",
"redox_syscall",
"smallvec",
"winapi",
]
[[package]] [[package]]
name = "parse-zoneinfo" name = "parse-zoneinfo"
version = "0.3.0" version = "0.3.0"
@ -1765,6 +1820,22 @@ version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
[[package]]
name = "sled"
version = "0.34.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f96b4737c2ce5987354855aed3797279def4ebf734436c6aa4552cf8e169935"
dependencies = [
"crc32fast",
"crossbeam-epoch",
"crossbeam-utils",
"fs2",
"fxhash",
"libc",
"log",
"parking_lot",
]
[[package]] [[package]]
name = "smallvec" name = "smallvec"
version = "1.10.0" version = "1.10.0"

@ -57,6 +57,7 @@ uuid = { version = "1.1.2", features = ["v1", "v4", "serde"] }
csv = "1.1.6" csv = "1.1.6"
tikv-jemallocator-global = { version = "0.5.0", optional = true } tikv-jemallocator-global = { version = "0.5.0", optional = true }
cozorocks = { path = "cozorocks", version = "0.1.0" } cozorocks = { path = "cozorocks", version = "0.1.0" }
sled = "0.34.7"
clap = { version = "3.2.8", features = ["derive"] } clap = { version = "3.2.8", features = ["derive"] }
rouille = "3.5.0" rouille = "3.5.0"

@ -25,6 +25,12 @@ impl Deref for PinSlice {
} }
} }
impl AsRef<[u8]> for PinSlice {
fn as_ref(&self) -> &[u8] {
self as &[u8]
}
}
impl Debug for PinSlice { impl Debug for PinSlice {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let to_d: &[u8] = self; let to_d: &[u8] = self;

@ -21,6 +21,7 @@ use crate::parse::parse_script;
use crate::runtime::relation::{AccessLevel, InputRelationHandle, InsufficientAccessLevel}; use crate::runtime::relation::{AccessLevel, InputRelationHandle, InsufficientAccessLevel};
use crate::runtime::transact::SessionTx; use crate::runtime::transact::SessionTx;
use crate::Db; use crate::Db;
use crate::storage::StoreTx;
#[derive(Debug, Error, Diagnostic)] #[derive(Debug, Error, Diagnostic)]
#[error("attempting to write into relation {0} of arity {1} with data of arity {2}")] #[error("attempting to write into relation {0} of arity {1} with data of arity {2}")]

@ -21,7 +21,7 @@ use serde_json::{json, Map};
use smartstring::SmartString; use smartstring::SmartString;
use thiserror::Error; use thiserror::Error;
use cozorocks::{DbBuilder, RocksDb}; use cozorocks::DbBuilder;
use crate::data::json::JsonValue; use crate::data::json::JsonValue;
use crate::data::program::{InputProgram, QueryAssertion, RelationOp}; use crate::data::program::{InputProgram, QueryAssertion, RelationOp};
@ -36,6 +36,8 @@ use crate::query::relation::{
}; };
use crate::runtime::relation::{RelationHandle, RelationId}; use crate::runtime::relation::{RelationHandle, RelationId};
use crate::runtime::transact::SessionTx; use crate::runtime::transact::SessionTx;
use crate::storage::rocks::RocksDbStorage;
use crate::storage::{Storage, StoreTx};
struct RunningQueryHandle { struct RunningQueryHandle {
started_at: f64, started_at: f64,
@ -66,7 +68,7 @@ const CURRENT_STORAGE_VERSION: u64 = 1;
/// The database object of Cozo. /// The database object of Cozo.
#[derive(Clone)] #[derive(Clone)]
pub struct Db { pub struct Db {
db: RocksDb, db: RocksDbStorage,
relation_store_id: Arc<AtomicU64>, relation_store_id: Arc<AtomicU64>,
queries_count: Arc<AtomicU64>, queries_count: Arc<AtomicU64>,
running_queries: Arc<Mutex<BTreeMap<u64, RunningQueryHandle>>>, running_queries: Arc<Mutex<BTreeMap<u64, RunningQueryHandle>>>,
@ -146,7 +148,7 @@ impl Db {
let db = db_builder.build()?; let db = db_builder.build()?;
let ret = Self { let ret = Self {
db, db: RocksDbStorage::new(db),
relation_store_id: Arc::new(Default::default()), relation_store_id: Arc::new(Default::default()),
queries_count: Arc::new(Default::default()), queries_count: Arc::new(Default::default()),
running_queries: Arc::new(Mutex::new(Default::default())), running_queries: Arc::new(Mutex::new(Default::default())),
@ -170,7 +172,7 @@ impl Db {
} }
fn transact(&self) -> Result<SessionTx> { fn transact(&self) -> Result<SessionTx> {
let ret = SessionTx { let ret = SessionTx {
tx: self.db.transact().set_snapshot(true).start(), tx: self.db.transact()?,
mem_store_id: Default::default(), mem_store_id: Default::default(),
relation_store_id: self.relation_store_id.clone(), relation_store_id: self.relation_store_id.clone(),
}; };
@ -178,7 +180,7 @@ impl Db {
} }
fn transact_write(&self) -> Result<SessionTx> { fn transact_write(&self) -> Result<SessionTx> {
let ret = SessionTx { let ret = SessionTx {
tx: self.db.transact().set_snapshot(true).start(), tx: self.db.transact()?,
mem_store_id: Default::default(), mem_store_id: Default::default(),
relation_store_id: self.relation_store_id.clone(), relation_store_id: self.relation_store_id.clone(),
}; };
@ -273,7 +275,7 @@ impl Db {
assert!(cleanups.is_empty(), "non-empty cleanups on read-only tx"); assert!(cleanups.is_empty(), "non-empty cleanups on read-only tx");
} }
for (lower, upper) in cleanups { for (lower, upper) in cleanups {
self.db.range_del(&lower, &upper)?; self.db.del_range(&lower, &upper)?;
} }
Ok(res) Ok(res)
} }
@ -716,7 +718,7 @@ impl Db {
} }
pub(crate) fn remove_relation(&self, name: &Symbol, tx: &mut SessionTx) -> Result<()> { pub(crate) fn remove_relation(&self, name: &Symbol, tx: &mut SessionTx) -> Result<()> {
let (lower, upper) = tx.destroy_relation(name)?; let (lower, upper) = tx.destroy_relation(name)?;
self.db.range_del(&lower, &upper)?; self.db.del_range(&lower, &upper)?;
Ok(()) Ok(())
} }
pub(crate) fn list_running(&self) -> Result<JsonValue> { pub(crate) fn list_running(&self) -> Result<JsonValue> {
@ -763,23 +765,14 @@ impl Db {
LARGEST_UTF_CHAR, LARGEST_UTF_CHAR,
)))]) )))])
.encode_as_key(RelationId::SYSTEM); .encode_as_key(RelationId::SYSTEM);
let mut it = self let tx = self.db.transact()?;
.db
.transact()
.start()
.iterator()
.upper_bound(&upper)
.start();
it.seek(&lower);
let mut collected = vec![]; let mut collected = vec![];
while let Some((k_slice, v_slice)) = it.pair()? { for kv_res in tx.range_scan_raw(&lower, &upper) {
if upper.as_slice() <= k_slice { let (k_slice, v_slice) = kv_res?;
if upper <= k_slice {
break; break;
} }
// if compare_tuple_keys(&upper, k_slice) != Greater { let meta = RelationHandle::decode(&v_slice)?;
// break;
// }
let meta = RelationHandle::decode(v_slice)?;
let n_keys = meta.metadata.keys.len(); let n_keys = meta.metadata.keys.len();
let n_dependents = meta.metadata.non_keys.len(); let n_dependents = meta.metadata.non_keys.len();
let arity = n_keys + n_dependents; let arity = n_keys + n_dependents;
@ -795,7 +788,6 @@ impl Db {
meta.rm_triggers.len(), meta.rm_triggers.len(),
meta.replace_triggers.len(), meta.replace_triggers.len(),
])); ]));
it.next();
} }
Ok(json!({"rows": collected, "headers": Ok(json!({"rows": collected, "headers":
["name", "arity", "access_level", "n_keys", "n_non_keys", "n_put_triggers", "n_rm_triggers", "n_replace_triggers"]})) ["name", "arity", "access_level", "n_keys", "n_non_keys", "n_put_triggers", "n_rm_triggers", "n_replace_triggers"]}))

@ -12,16 +12,14 @@ use serde::Serialize;
use smartstring::{LazyCompact, SmartString}; use smartstring::{LazyCompact, SmartString};
use thiserror::Error; use thiserror::Error;
use cozorocks::DbIter;
use crate::data::memcmp::MemCmpEncoder; use crate::data::memcmp::MemCmpEncoder;
use crate::data::relation::StoredRelationMetadata; use crate::data::relation::StoredRelationMetadata;
use crate::data::symb::Symbol; use crate::data::symb::Symbol;
use crate::data::tuple::{ENCODED_KEY_MIN_LEN, Tuple}; use crate::data::tuple::{Tuple, ENCODED_KEY_MIN_LEN};
use crate::data::value::DataValue; use crate::data::value::DataValue;
use crate::parse::SourceSpan; use crate::parse::SourceSpan;
use crate::runtime::transact::SessionTx; use crate::runtime::transact::SessionTx;
use crate::utils::swap_option_result; use crate::storage::StoreTx;
#[derive( #[derive(
Copy, Copy,
@ -220,7 +218,7 @@ impl RelationHandle {
pub(crate) fn scan_all(&self, tx: &SessionTx) -> impl Iterator<Item = Result<Tuple>> { pub(crate) fn scan_all(&self, tx: &SessionTx) -> impl Iterator<Item = Result<Tuple>> {
let lower = Tuple::default().encode_as_key(self.id); let lower = Tuple::default().encode_as_key(self.id);
let upper = Tuple::default().encode_as_key(self.id.next()); let upper = Tuple::default().encode_as_key(self.id.next());
RelationIterator::new(tx, &lower, &upper) tx.tx.range_scan(&lower, &upper)
} }
pub(crate) fn scan_prefix( pub(crate) fn scan_prefix(
@ -234,7 +232,9 @@ impl RelationHandle {
upper.push(DataValue::Bot); upper.push(DataValue::Bot);
let prefix_encoded = Tuple(lower).encode_as_key(self.id); let prefix_encoded = Tuple(lower).encode_as_key(self.id);
let upper_encoded = Tuple(upper).encode_as_key(self.id); let upper_encoded = Tuple(upper).encode_as_key(self.id);
RelationIterator::new(tx, &prefix_encoded, &upper_encoded) // RelationIterator::new(tx, &prefix_encoded, &upper_encoded)
tx.tx
.range_scan(&prefix_encoded, &upper_encoded)
} }
pub(crate) fn scan_bounded_prefix( pub(crate) fn scan_bounded_prefix(
&self, &self,
@ -250,56 +250,67 @@ impl RelationHandle {
upper_t.0.push(DataValue::Bot); upper_t.0.push(DataValue::Bot);
let lower_encoded = lower_t.encode_as_key(self.id); let lower_encoded = lower_t.encode_as_key(self.id);
let upper_encoded = upper_t.encode_as_key(self.id); let upper_encoded = upper_t.encode_as_key(self.id);
RelationIterator::new(tx, &lower_encoded, &upper_encoded) tx.tx
.range_scan(&lower_encoded, &upper_encoded)
} }
} }
struct RelationIterator { // struct RelationIterator {
inner: DbIter, // inner: DbIter,
started: bool, // started: bool,
upper_bound: Vec<u8>, // upper_bound: Vec<u8>,
} // }
impl RelationIterator { #[inline]
fn new(sess: &SessionTx, lower: &[u8], upper: &[u8]) -> Self { pub(crate) fn decode_tuple_from_kv(key: &[u8], val: &[u8]) -> Tuple {
let mut inner = sess.tx.iterator().upper_bound(upper).start(); let mut tup = Tuple::decode_from_key(key);
inner.seek(lower); if !val.is_empty() {
Self { let vals: Vec<DataValue> = rmp_serde::from_slice(&val[ENCODED_KEY_MIN_LEN..]).unwrap();
inner,
started: false,
upper_bound: upper.to_vec(),
}
}
fn next_inner(&mut self) -> Result<Option<Tuple>> {
if self.started {
self.inner.next()
} else {
self.started = true;
}
Ok(match self.inner.pair()? {
None => None,
Some((k_slice, v_slice)) => {
if self.upper_bound.as_slice() <= k_slice {
None
} else {
let mut tup = Tuple::decode_from_key(k_slice);
if !v_slice.is_empty() {
let vals: Vec<DataValue> = rmp_serde::from_slice(&v_slice[ENCODED_KEY_MIN_LEN..]).unwrap();
tup.0.extend(vals); tup.0.extend(vals);
} }
Some(tup) tup
}
}
})
}
} }
impl Iterator for RelationIterator { // impl RelationIterator {
type Item = Result<Tuple>; // fn new(sess: &SessionTx, lower: &[u8], upper: &[u8]) -> Self {
fn next(&mut self) -> Option<Self::Item> { // let mut inner = sess.tx.iterator().upper_bound(upper).start();
swap_option_result(self.next_inner()) // inner.seek(lower);
} // Self {
} // inner,
// started: false,
// upper_bound: upper.to_vec(),
// }
// }
// fn next_inner(&mut self) -> Result<Option<Tuple>> {
// if self.started {
// self.inner.next()
// } else {
// self.started = true;
// }
// Ok(match self.inner.pair()? {
// None => None,
// Some((k_slice, v_slice)) => {
// if self.upper_bound.as_slice() <= k_slice {
// None
// } else {
// let mut tup = Tuple::decode_from_key(k_slice);
// if !v_slice.is_empty() {
// let vals: Vec<DataValue> = rmp_serde::from_slice(&v_slice[ENCODED_KEY_MIN_LEN..]).unwrap();
// tup.0.extend(vals);
// }
// Some(tup)
// }
// }
// })
// }
// }
//
// impl Iterator for RelationIterator {
// type Item = Result<Tuple>;
// fn next(&mut self) -> Option<Self::Item> {
// swap_option_result(self.next_inner())
// }
// }
#[derive(Debug, Diagnostic, Error)] #[derive(Debug, Diagnostic, Error)]
#[error("Cannot create relation {0} as one with the same name already exists")] #[error("Cannot create relation {0} as one with the same name already exists")]

@ -7,8 +7,6 @@ use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use miette::Result; use miette::Result;
use cozorocks::Tx;
use crate::data::program::MagicSymbol; use crate::data::program::MagicSymbol;
use crate::data::symb::Symbol; use crate::data::symb::Symbol;
use crate::data::tuple::Tuple; use crate::data::tuple::Tuple;
@ -16,9 +14,11 @@ use crate::data::value::DataValue;
use crate::parse::SourceSpan; use crate::parse::SourceSpan;
use crate::runtime::in_mem::{InMemRelation, StoredRelationId}; use crate::runtime::in_mem::{InMemRelation, StoredRelationId};
use crate::runtime::relation::RelationId; use crate::runtime::relation::RelationId;
use crate::storage::rocks::RocksDbTx;
use crate::storage::StoreTx;
pub struct SessionTx { pub struct SessionTx {
pub(crate) tx: Tx, pub(crate) tx: RocksDbTx,
pub(crate) relation_store_id: Arc<AtomicU64>, pub(crate) relation_store_id: Arc<AtomicU64>,
pub(crate) mem_store_id: Arc<AtomicU32>, pub(crate) mem_store_id: Arc<AtomicU32>,
} }

@ -4,6 +4,8 @@
use miette::Result; use miette::Result;
use crate::data::tuple::Tuple;
pub(crate) mod rocks; pub(crate) mod rocks;
pub(crate) mod sled; pub(crate) mod sled;
pub(crate) mod tikv; pub(crate) mod tikv;
@ -11,22 +13,34 @@ pub(crate) mod tikv;
pub(crate) trait Storage { pub(crate) trait Storage {
type Tx: StoreTx; type Tx: StoreTx;
fn tx(&self) -> Result<Self::Tx>; fn transact(&self) -> Result<Self::Tx>;
fn del_range(&self, lower: &[u8], upper: &[u8]) -> Result<()>; fn del_range(&self, lower: &[u8], upper: &[u8]) -> Result<()>;
fn range_compact(
&self,
lower: &[u8],
upper: &[u8],
) -> Result<()>;
} }
pub(crate) trait StoreTx { pub(crate) trait StoreTx {
type ReadSlice: AsRef<[u8]>; type ReadSlice: AsRef<[u8]>;
type IterSlice: AsRef<[u8]>;
type KeyIter: Iterator<Item = Result<Self::IterSlice>>; type KVIter: Iterator<Item = Result<Tuple>>;
type KeyValueIter: Iterator<Item = Result<(Self::IterSlice, Self::IterSlice)>>; type KVIterRaw: Iterator<Item = Result<(Vec<u8>, Vec<u8>)>>;
fn get(&self, key: &[u8], for_update: bool) -> Result<Option<Self::ReadSlice>>; fn get(&self, key: &[u8], for_update: bool) -> Result<Option<Self::ReadSlice>>;
fn put(&mut self, key: &[u8], val: &[u8]) -> Result<()>; fn put(&mut self, key: &[u8], val: &[u8]) -> Result<()>;
fn del(&mut self, key: &[u8]) -> Result<()>; fn del(&mut self, key: &[u8]) -> Result<()>;
fn exists(&self, key: &[u8], for_update: bool) -> Result<bool>; fn exists(&self, key: &[u8], for_update: bool) -> Result<bool>;
fn commit(&mut self) -> Result<()>; fn commit(&mut self) -> Result<()>;
fn range_scan(&self, lower: &[u8], upper: &[u8]) -> Self::KeyValueIter; fn range_scan(
fn range_key_scan(&self, lower: &[u8], upper: &[u8]) -> Self::KeyIter; &self,
lower: &[u8],
upper: &[u8],
) -> Self::KVIter;
fn range_scan_raw(
&self,
lower: &[u8],
upper: &[u8],
) -> Self::KVIterRaw;
} }

@ -2,77 +2,159 @@
* Copyright 2022, The Cozo Project Authors. Licensed under MPL-2.0. * Copyright 2022, The Cozo Project Authors. Licensed under MPL-2.0.
*/ */
use miette::Result; use miette::{IntoDiagnostic, Result};
use cozorocks::{DbIter, PinSlice, RocksDb, Tx};
use crate::data::tuple::Tuple;
use crate::runtime::relation::decode_tuple_from_kv;
use crate::storage::{Storage, StoreTx}; use crate::storage::{Storage, StoreTx};
use crate::utils::swap_option_result;
#[derive(Clone)]
pub(crate) struct RocksDbStorage {
db: RocksDb,
}
struct RocksDbStorage; impl RocksDbStorage {
pub(crate) fn new(db: RocksDb) -> Self {
Self { db }
}
}
impl Storage for RocksDbStorage { impl Storage for RocksDbStorage {
type Tx = RocksDbTx; type Tx = RocksDbTx;
fn tx(&self) -> miette::Result<Self::Tx> { fn transact(&self) -> Result<Self::Tx> {
todo!() let db_tx = self.db.transact().set_snapshot(true).start();
Ok(RocksDbTx { db_tx })
}
fn del_range(&self, lower: &[u8], upper: &[u8]) -> Result<()> {
Ok(self.db.range_del(lower, upper)?)
} }
fn del_range(&self, lower: &[u8], upper: &[u8]) -> miette::Result<()> { fn range_compact(&self, lower: &[u8], upper: &[u8]) -> Result<()> {
todo!() self.db.range_compact(lower, upper).into_diagnostic()
} }
} }
struct RocksDbTx; pub(crate) struct RocksDbTx {
db_tx: Tx,
}
impl StoreTx for RocksDbTx { impl StoreTx for RocksDbTx {
type ReadSlice = Vec<u8>; type ReadSlice = PinSlice;
type IterSlice = Vec<u8>; type KVIter = RocksDbIterator;
type KeyIter = RocksDbKeyIter; type KVIterRaw = RocksDbIteratorRaw;
type KeyValueIter = RocksDbIter;
fn get(&self, key: &[u8], for_update: bool) -> miette::Result<Option<Self::ReadSlice>> { #[inline]
todo!() fn get(&self, key: &[u8], for_update: bool) -> Result<Option<Self::ReadSlice>> {
Ok(self.db_tx.get(key, for_update)?)
} }
fn put(&mut self, key: &[u8], val: &[u8]) -> miette::Result<()> { #[inline]
todo!() fn put(&mut self, key: &[u8], val: &[u8]) -> Result<()> {
Ok(self.db_tx.put(key, val)?)
} }
fn del(&mut self, key: &[u8]) -> miette::Result<()> { #[inline]
todo!() fn del(&mut self, key: &[u8]) -> Result<()> {
Ok(self.db_tx.del(key)?)
} }
fn exists(&self, key: &[u8], for_update: bool) -> miette::Result<bool> { #[inline]
todo!() fn exists(&self, key: &[u8], for_update: bool) -> Result<bool> {
Ok(self.db_tx.exists(key, for_update)?)
} }
fn commit(&mut self) -> miette::Result<()> { fn commit(&mut self) -> Result<()> {
todo!() Ok(self.db_tx.commit()?)
} }
fn range_scan(&self, lower: &[u8], upper: &[u8]) -> Self::KeyValueIter { fn range_scan(&self, lower: &[u8], upper: &[u8]) -> Self::KVIter {
todo!() let mut inner = self.db_tx.iterator().upper_bound(upper).start();
inner.seek(lower);
RocksDbIterator {
inner,
started: false,
upper_bound: upper.to_vec(),
}
} }
fn range_key_scan(&self, lower: &[u8], upper: &[u8]) -> Self::KeyIter { fn range_scan_raw(&self, lower: &[u8], upper: &[u8]) -> Self::KVIterRaw {
todo!() let mut inner = self.db_tx.iterator().upper_bound(upper).start();
inner.seek(lower);
RocksDbIteratorRaw {
inner,
started: false,
upper_bound: upper.to_vec(),
}
} }
} }
struct RocksDbKeyIter; pub(crate) struct RocksDbIterator {
inner: DbIter,
started: bool,
upper_bound: Vec<u8>,
}
impl Iterator for RocksDbKeyIter { impl RocksDbIterator {
type Item = Result<Vec<u8>>; fn next_inner(&mut self) -> Result<Option<Tuple>> {
if self.started {
self.inner.next()
} else {
self.started = true;
}
Ok(match self.inner.pair()? {
None => None,
Some((k_slice, v_slice)) => {
if self.upper_bound.as_slice() <= k_slice {
None
} else {
Some(decode_tuple_from_kv(k_slice, v_slice))
}
}
})
}
}
impl Iterator for RocksDbIterator {
type Item = Result<Tuple>;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
todo!() swap_option_result(self.next_inner())
} }
} }
struct RocksDbIter; pub(crate) struct RocksDbIteratorRaw {
inner: DbIter,
started: bool,
upper_bound: Vec<u8>,
}
impl Iterator for RocksDbIter { impl RocksDbIteratorRaw {
type Item = Result<(Vec<u8>, Vec<u8>)>; fn next_inner(&mut self) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
if self.started {
self.inner.next()
} else {
self.started = true;
}
Ok(match self.inner.pair()? {
None => None,
Some((k_slice, v_slice)) => {
if self.upper_bound.as_slice() <= k_slice {
None
} else {
Some((k_slice.to_vec(), v_slice.to_vec()))
}
}
})
}
}
impl Iterator for RocksDbIteratorRaw {
type Item = Result<(Vec<u8>, Vec<u8>)>;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
todo!() swap_option_result(self.next_inner())
} }
} }

@ -2,3 +2,125 @@
* Copyright 2022, The Cozo Project Authors. Licensed under MPL-2.0. * Copyright 2022, The Cozo Project Authors. Licensed under MPL-2.0.
*/ */
use std::collections::BTreeMap;
use miette::{IntoDiagnostic, Result};
use sled::transaction::{ConflictableTransactionError, TransactionalTree};
use sled::{Db, IVec};
use crate::data::tuple::Tuple;
use crate::storage::{Storage, StoreTx};
#[derive(Clone)]
struct SledStorage {
db: Db,
}
impl Storage for SledStorage {
type Tx = SledTx;
fn transact(&self) -> Result<Self::Tx> {
Ok(SledTx {
db: self.db.clone(),
changes: Default::default(),
})
}
fn del_range(&self, lower: &[u8], upper: &[u8]) -> Result<()> {
todo!()
}
fn range_compact(&self, _lower: &[u8], _upper: &[u8]) -> Result<()> {
Ok(())
}
}
struct SledTx {
db: Db,
changes: BTreeMap<Box<[u8]>, Option<Box<[u8]>>>,
}
impl StoreTx for SledTx {
type ReadSlice = IVec;
type KVIter = SledIter;
type KVIterRaw = SledIterRaw;
fn get(&self, key: &[u8], _for_update: bool) -> Result<Option<Self::ReadSlice>> {
Ok(match self.changes.get(key) {
Some(Some(val)) => Some(IVec::from(val as &[u8])),
Some(None) => None,
None => self.db.get(key).into_diagnostic()?,
})
}
fn put(&mut self, key: &[u8], val: &[u8]) -> Result<()> {
self.changes.insert(key.into(), Some(val.into()));
Ok(())
}
fn del(&mut self, key: &[u8]) -> Result<()> {
self.changes.insert(key.into(), None);
Ok(())
}
fn exists(&self, key: &[u8], _for_update: bool) -> Result<bool> {
Ok(match self.changes.get(key) {
Some(Some(_)) => true,
Some(None) => false,
None => self.db.get(key).into_diagnostic()?.is_some(),
})
}
fn commit(&mut self) -> Result<()> {
self.db
.transaction(
|db: &TransactionalTree| -> Result<(), ConflictableTransactionError> {
for (k, v) in &self.changes {
match v {
None => {
db.remove(k as &[u8])?;
}
Some(v) => {
db.insert(k as &[u8], v as &[u8])?;
}
}
}
Ok(())
},
)
.into_diagnostic()?;
Ok(())
}
fn range_scan(
&self,
lower: &[u8],
upper: &[u8],
) -> Self::KVIter {
todo!()
}
fn range_scan_raw(&self, lower: &[u8], upper: &[u8]) -> Self::KVIterRaw {
todo!()
}
}
struct SledIter {}
impl Iterator for SledIter {
type Item = Result<Tuple>;
fn next(&mut self) -> Option<Self::Item> {
todo!()
}
}
struct SledIterRaw {}
impl Iterator for SledIterRaw {
type Item = Result<(Vec<u8>, Vec<u8>)>;
fn next(&mut self) -> Option<Self::Item> {
todo!()
}
}
Loading…
Cancel
Save