complete Sled storage

main
Ziyang Hu 2 years ago
parent aa174a6f6a
commit 7a51120a90

@ -10,19 +10,15 @@ pub(crate) mod rocks;
pub(crate) mod sled;
pub(crate) mod tikv;
pub(crate) trait Storage {
type Tx: StoreTx;
pub(crate) trait Storage<'a> {
type Tx: StoreTx<'a>;
fn transact(&self) -> Result<Self::Tx>;
fn transact(&'a self) -> Result<Self::Tx>;
fn del_range(&self, lower: &[u8], upper: &[u8]) -> Result<()>;
fn range_compact(
&self,
lower: &[u8],
upper: &[u8],
) -> Result<()>;
fn range_compact(&self, lower: &[u8], upper: &[u8]) -> Result<()>;
}
pub(crate) trait StoreTx {
pub(crate) trait StoreTx<'a> {
type ReadSlice: AsRef<[u8]>;
type KVIter: Iterator<Item = Result<Tuple>>;
@ -33,14 +29,6 @@ pub(crate) trait StoreTx {
fn del(&mut self, key: &[u8]) -> Result<()>;
fn exists(&self, key: &[u8], for_update: bool) -> Result<bool>;
fn commit(&mut self) -> Result<()>;
fn range_scan(
&self,
lower: &[u8],
upper: &[u8],
) -> Self::KVIter;
fn range_scan_raw(
&self,
lower: &[u8],
upper: &[u8],
) -> Self::KVIterRaw;
fn range_scan(&'a self, lower: &[u8], upper: &[u8]) -> Self::KVIter;
fn range_scan_raw(&'a self, lower: &[u8], upper: &[u8]) -> Self::KVIterRaw;
}

@ -22,7 +22,7 @@ impl RocksDbStorage {
}
}
impl Storage for RocksDbStorage {
impl Storage<'_> for RocksDbStorage {
type Tx = RocksDbTx;
fn transact(&self) -> Result<Self::Tx> {
@ -43,7 +43,7 @@ pub(crate) struct RocksDbTx {
db_tx: Tx,
}
impl StoreTx for RocksDbTx {
impl StoreTx<'_> for RocksDbTx {
type ReadSlice = PinSlice;
type KVIter = RocksDbIterator;
type KVIterRaw = RocksDbIteratorRaw;
@ -100,6 +100,7 @@ pub(crate) struct RocksDbIterator {
}
impl RocksDbIterator {
#[inline]
fn next_inner(&mut self) -> Result<Option<Tuple>> {
if self.started {
self.inner.next()
@ -112,6 +113,7 @@ impl RocksDbIterator {
if self.upper_bound.as_slice() <= k_slice {
None
} else {
// upper bound is exclusive
Some(decode_tuple_from_kv(k_slice, v_slice))
}
}
@ -121,6 +123,7 @@ impl RocksDbIterator {
impl Iterator for RocksDbIterator {
type Item = Result<Tuple>;
#[inline]
fn next(&mut self) -> Option<Self::Item> {
swap_option_result(self.next_inner())
}
@ -133,6 +136,7 @@ pub(crate) struct RocksDbIteratorRaw {
}
impl RocksDbIteratorRaw {
#[inline]
fn next_inner(&mut self) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
if self.started {
self.inner.next()
@ -143,6 +147,7 @@ impl RocksDbIteratorRaw {
None => None,
Some((k_slice, v_slice)) => {
if self.upper_bound.as_slice() <= k_slice {
// upper bound is exclusive
None
} else {
Some((k_slice.to_vec(), v_slice.to_vec()))
@ -154,6 +159,7 @@ impl RocksDbIteratorRaw {
impl Iterator for RocksDbIteratorRaw {
type Item = Result<(Vec<u8>, Vec<u8>)>;
#[inline]
fn next(&mut self) -> Option<Self::Item> {
swap_option_result(self.next_inner())
}

@ -2,32 +2,50 @@
* Copyright 2022, The Cozo Project Authors. Licensed under MPL-2.0.
*/
use std::cmp::Ordering;
use std::collections::btree_map::Range;
use std::collections::BTreeMap;
use std::iter::Fuse;
use std::marker::PhantomData;
use std::thread;
use miette::{IntoDiagnostic, Result};
use sled::transaction::{ConflictableTransactionError, TransactionalTree};
use sled::{Db, IVec};
use sled::{Db, IVec, Iter};
use crate::data::tuple::Tuple;
use crate::runtime::relation::decode_tuple_from_kv;
use crate::storage::{Storage, StoreTx};
use crate::utils::swap_option_result;
#[derive(Clone)]
struct SledStorage {
struct SledStorage<'a> {
db: Db,
_phantom: PhantomData<&'a [u8]>,
}
impl Storage for SledStorage {
type Tx = SledTx;
impl<'a> Storage<'a> for SledStorage<'a> {
type Tx = SledTx<'a>;
fn transact(&self) -> Result<Self::Tx> {
fn transact(&'a self) -> Result<Self::Tx> {
Ok(SledTx {
db: self.db.clone(),
changes: Default::default(),
_phantom: Default::default(),
})
}
fn del_range(&self, lower: &[u8], upper: &[u8]) -> Result<()> {
todo!()
let db = self.db.clone();
let lower_v = lower.to_vec();
let upper_v = upper.to_vec();
thread::spawn(move || -> Result<()> {
for k_res in db.range(lower_v..upper_v).keys() {
db.remove(k_res.into_diagnostic()?).into_diagnostic()?;
}
Ok(())
});
Ok(())
}
fn range_compact(&self, _lower: &[u8], _upper: &[u8]) -> Result<()> {
@ -35,15 +53,16 @@ impl Storage for SledStorage {
}
}
struct SledTx {
struct SledTx<'a> {
db: Db,
changes: BTreeMap<Box<[u8]>, Option<Box<[u8]>>>,
changes: BTreeMap<Vec<u8>, Option<Vec<u8>>>,
_phantom: PhantomData<&'a [u8]>,
}
impl StoreTx for SledTx {
impl<'a> StoreTx<'a> for SledTx<'a> {
type ReadSlice = IVec;
type KVIter = SledIter;
type KVIterRaw = SledIterRaw;
type KVIter = SledIter<'a>;
type KVIterRaw = SledIterRaw<'a>;
fn get(&self, key: &[u8], _for_update: bool) -> Result<Option<Self::ReadSlice>> {
Ok(match self.changes.get(key) {
@ -92,35 +111,171 @@ impl StoreTx for SledTx {
Ok(())
}
fn range_scan(
&self,
lower: &[u8],
upper: &[u8],
) -> Self::KVIter {
todo!()
fn range_scan(&'a self, lower: &[u8], upper: &[u8]) -> Self::KVIter {
let change_iter = self.changes.range(lower.to_vec()..upper.to_vec()).fuse();
let db_iter = self.db.range(lower..upper).fuse();
SledIter {
change_iter,
db_iter,
change_cache: None,
db_cache: None,
}
}
fn range_scan_raw(&self, lower: &[u8], upper: &[u8]) -> Self::KVIterRaw {
todo!()
fn range_scan_raw(&'a self, lower: &[u8], upper: &[u8]) -> Self::KVIterRaw {
let change_iter = self.changes.range(lower.to_vec()..upper.to_vec()).fuse();
let db_iter = self.db.range(lower..upper).fuse();
SledIterRaw {
change_iter,
db_iter,
change_cache: None,
db_cache: None,
}
}
}
struct SledIter {}
struct SledIter<'a> {
change_iter: Fuse<Range<'a, Vec<u8>, Option<Vec<u8>>>>,
db_iter: Fuse<Iter>,
change_cache: Option<(Vec<u8>, Option<Vec<u8>>)>,
db_cache: Option<(IVec, IVec)>,
}
impl<'a> SledIter<'a> {
fn fill_cache(&mut self) -> Result<()> {
if self.change_cache.is_none() {
if let Some((k, v)) = self.change_iter.next() {
self.change_cache = Some((k.to_vec(), v.clone().into()))
}
}
if self.db_cache.is_none() {
if let Some(res) = self.db_iter.next() {
self.db_cache = Some(res.into_diagnostic()?);
}
}
Ok(())
}
fn next_inner(&mut self) -> Result<Option<Tuple>> {
loop {
self.fill_cache()?;
match (&self.change_cache, &self.db_cache) {
(None, None) => return Ok(None),
(Some((_, None)), None) => {
self.change_cache.take();
continue;
}
(Some((_, Some(_))), None) => {
let (k, sv) = self.change_cache.take().unwrap();
let v = sv.unwrap();
return Ok(Some(decode_tuple_from_kv(&k, &v)));
}
(None, Some(_)) => {
let (k, v) = self.db_cache.take().unwrap();
return Ok(Some(decode_tuple_from_kv(&k, &v)));
}
(Some((ck, _)), Some((dk, _))) => match ck.as_slice().cmp(dk) {
Ordering::Less => {
let (k, sv) = self.change_cache.take().unwrap();
if sv.is_none() {
continue;
} else {
return Ok(Some(decode_tuple_from_kv(&k, &sv.unwrap())));
}
}
Ordering::Greater => {
let (k, v) = self.db_cache.take().unwrap();
return Ok(Some(decode_tuple_from_kv(&k, &v)));
}
Ordering::Equal => {
self.db_cache.take();
continue;
}
},
}
}
}
}
impl Iterator for SledIter {
impl<'a> Iterator for SledIter<'a> {
type Item = Result<Tuple>;
fn next(&mut self) -> Option<Self::Item> {
todo!()
swap_option_result(self.next_inner())
}
}
struct SledIterRaw {}
struct SledIterRaw<'a> {
change_iter: Fuse<Range<'a, Vec<u8>, Option<Vec<u8>>>>,
db_iter: Fuse<Iter>,
change_cache: Option<(Vec<u8>, Option<Vec<u8>>)>,
db_cache: Option<(IVec, IVec)>,
}
impl<'a> SledIterRaw<'a> {
fn fill_cache(&mut self) -> Result<()> {
if self.change_cache.is_none() {
if let Some((k, v)) = self.change_iter.next() {
self.change_cache = Some((k.to_vec(), v.clone().into()))
}
}
if self.db_cache.is_none() {
if let Some(res) = self.db_iter.next() {
self.db_cache = Some(res.into_diagnostic()?);
}
}
Ok(())
}
impl Iterator for SledIterRaw {
fn next_inner(&mut self) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
loop {
self.fill_cache()?;
match (&self.change_cache, &self.db_cache) {
(None, None) => return Ok(None),
(Some((_, None)), None) => {
self.change_cache.take();
continue;
}
(Some((_, Some(_))), None) => {
let (k, sv) = self.change_cache.take().unwrap();
let v = sv.unwrap();
return Ok(Some((k, v)));
}
(None, Some(_)) => {
let (k, v) = self.db_cache.take().unwrap();
return Ok(Some((k.to_vec(), v.to_vec())));
}
(Some((ck, _)), Some((dk, _))) => match ck.as_slice().cmp(dk) {
Ordering::Less => {
let (k, sv) = self.change_cache.take().unwrap();
if sv.is_none() {
continue;
} else {
return Ok(Some((k, sv.unwrap())));
}
}
Ordering::Greater => {
let (k, v) = self.db_cache.take().unwrap();
return Ok(Some((k.to_vec(), v.to_vec())));
}
Ordering::Equal => {
self.db_cache.take();
continue;
}
},
}
}
}
}
impl<'a> Iterator for SledIterRaw<'a> {
type Item = Result<(Vec<u8>, Vec<u8>)>;
fn next(&mut self) -> Option<Self::Item> {
todo!()
swap_option_result(self.next_inner())
}
}
}

Loading…
Cancel
Save