Sled engine is ready.

main
Ziyang Hu 2 years ago
parent e270ad8f3c
commit eb608fe548

@ -19,13 +19,15 @@
pub use miette::Error; pub use miette::Error;
pub use runtime::db::Db; pub use runtime::db::Db;
pub use runtime::db::new_cozo_rocksdb; pub use storage::rocks::new_cozo_rocksdb;
pub use storage::rocks::RocksDbStorage; pub use storage::rocks::RocksDbStorage;
pub use storage::sled::new_cozo_sled;
pub use storage::sled::SledStorage;
pub(crate) mod algo; pub(crate) mod algo;
pub(crate) mod data; pub(crate) mod data;
pub(crate) mod parse; pub(crate) mod parse;
pub(crate) mod query; pub(crate) mod query;
pub(crate) mod runtime; pub(crate) mod runtime;
pub(crate) mod utils;
pub(crate) mod storage; pub(crate) mod storage;
pub(crate) mod utils;

@ -4,29 +4,26 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter}; use std::fmt::{Debug, Formatter};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use std::{fs, thread};
use either::{Left, Right}; use either::{Left, Right};
use itertools::Itertools; use itertools::Itertools;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use miette::{ use miette::{
bail, ensure, miette, Diagnostic, GraphicalReportHandler, GraphicalTheme, IntoDiagnostic, bail, ensure, Diagnostic, GraphicalReportHandler, GraphicalTheme, IntoDiagnostic,
JSONReportHandler, Result, WrapErr, JSONReportHandler, Result, WrapErr,
}; };
use serde_json::{json, Map}; use serde_json::{json, Map};
use smartstring::SmartString; use smartstring::SmartString;
use thiserror::Error; use thiserror::Error;
use cozorocks::DbBuilder;
use crate::data::json::JsonValue; use crate::data::json::JsonValue;
use crate::data::program::{InputProgram, QueryAssertion, RelationOp}; use crate::data::program::{InputProgram, QueryAssertion, RelationOp};
use crate::data::symb::Symbol; use crate::data::symb::Symbol;
use crate::data::tuple::{Tuple, KEY_PREFIX_LEN}; use crate::data::tuple::Tuple;
use crate::data::value::{DataValue, LARGEST_UTF_CHAR}; use crate::data::value::{DataValue, LARGEST_UTF_CHAR};
use crate::parse::sys::SysOp; use crate::parse::sys::SysOp;
use crate::parse::{parse_script, CozoScript, SourceSpan}; use crate::parse::{parse_script, CozoScript, SourceSpan};
@ -36,7 +33,6 @@ use crate::query::relation::{
}; };
use crate::runtime::relation::{RelationHandle, RelationId}; use crate::runtime::relation::{RelationHandle, RelationId};
use crate::runtime::transact::SessionTx; use crate::runtime::transact::SessionTx;
use crate::storage::rocks::RocksDbStorage;
use crate::storage::{Storage, StoreTx}; use crate::storage::{Storage, StoreTx};
struct RunningQueryHandle { struct RunningQueryHandle {
@ -59,11 +55,12 @@ impl Drop for RunningQueryCleanup {
} }
#[derive(serde_derive::Serialize, serde_derive::Deserialize)] #[derive(serde_derive::Serialize, serde_derive::Deserialize)]
pub(crate) struct DbManifest { pub struct DbManifest {
storage_version: u64, pub storage_version: u64,
} }
const CURRENT_STORAGE_VERSION: u64 = 1; // FIXME this should be storage-specific
pub(crate) const CURRENT_STORAGE_VERSION: u64 = 1;
/// The database object of Cozo. /// The database object of Cozo.
#[derive(Clone)] #[derive(Clone)]
@ -86,7 +83,7 @@ where
#[derive(Debug, Diagnostic, Error)] #[derive(Debug, Diagnostic, Error)]
#[error("Initialization of database failed")] #[error("Initialization of database failed")]
#[diagnostic(code(db::init))] #[diagnostic(code(db::init))]
struct BadDbInit(#[help] String); pub(crate) struct BadDbInit(#[help] pub(crate) String);
lazy_static! { lazy_static! {
static ref TEXT_ERR_HANDLER: GraphicalReportHandler = static ref TEXT_ERR_HANDLER: GraphicalReportHandler =
@ -94,64 +91,6 @@ lazy_static! {
static ref JSON_ERR_HANDLER: JSONReportHandler = miette::JSONReportHandler::new(); static ref JSON_ERR_HANDLER: JSONReportHandler = miette::JSONReportHandler::new();
} }
/// Creates a database object.
pub fn new_cozo_rocksdb(path: impl AsRef<str>) -> Result<Db<RocksDbStorage>> {
let builder = DbBuilder::default().path(path.as_ref());
let path = builder.opts.db_path;
fs::create_dir_all(path)
.map_err(|err| BadDbInit(format!("cannot create directory {}: {}", path, err)))?;
let path_buf = PathBuf::from(path);
let is_new = {
let mut manifest_path = path_buf.clone();
manifest_path.push("manifest");
if manifest_path.exists() {
let existing: DbManifest = rmp_serde::from_slice(
&fs::read(manifest_path)
.into_diagnostic()
.wrap_err_with(|| "when reading manifest")?,
)
.into_diagnostic()
.wrap_err_with(|| "when reading manifest")?;
assert_eq!(
existing.storage_version, CURRENT_STORAGE_VERSION,
"Unknown storage version {}",
existing.storage_version
);
false
} else {
fs::write(
manifest_path,
rmp_serde::to_vec_named(&DbManifest {
storage_version: CURRENT_STORAGE_VERSION,
})
.into_diagnostic()
.wrap_err_with(|| "when serializing manifest")?,
)
.into_diagnostic()
.wrap_err_with(|| "when serializing manifest")?;
true
}
};
let mut store_path = path_buf;
store_path.push("data");
let db_builder = builder
.create_if_missing(is_new)
.use_capped_prefix_extractor(true, KEY_PREFIX_LEN)
.use_bloom_filter(true, 9.9, true)
.path(
store_path
.to_str()
.ok_or_else(|| miette!("bad path name"))?,
);
let db = db_builder.build()?;
Db::new(RocksDbStorage::new(db))
}
impl<S> Db<S> impl<S> Db<S>
where where
S: Storage, S: Storage,
@ -271,6 +210,7 @@ where
}; };
let mut res = json!(null); let mut res = json!(null);
let mut cleanups = vec![]; let mut cleanups = vec![];
for p in ps { for p in ps {
let sleep_opt = p.out_opts.sleep; let sleep_opt = p.out_opts.sleep;
let (q_res, q_cleanups) = self.run_query(&mut tx, p)?; let (q_res, q_cleanups) = self.run_query(&mut tx, p)?;

@ -2,14 +2,78 @@
* Copyright 2022, The Cozo Project Authors. Licensed under MPL-2.0. * Copyright 2022, The Cozo Project Authors. Licensed under MPL-2.0.
*/ */
use miette::{IntoDiagnostic, Result}; use std::fs;
use std::path::PathBuf;
use cozorocks::{DbIter, RocksDb, Tx}; use miette::{miette, IntoDiagnostic, Result, WrapErr};
use crate::data::tuple::Tuple; use cozorocks::{DbBuilder, DbIter, RocksDb, Tx};
use crate::data::tuple::{Tuple, KEY_PREFIX_LEN};
use crate::runtime::db::{BadDbInit, DbManifest, CURRENT_STORAGE_VERSION};
use crate::runtime::relation::decode_tuple_from_kv; use crate::runtime::relation::decode_tuple_from_kv;
use crate::storage::{Storage, StoreTx}; use crate::storage::{Storage, StoreTx};
use crate::utils::swap_option_result; use crate::utils::swap_option_result;
use crate::Db;
/// Creates a RocksDB database object.
pub fn new_cozo_rocksdb(path: impl AsRef<str>) -> Result<Db<RocksDbStorage>> {
let builder = DbBuilder::default().path(path.as_ref());
let path = builder.opts.db_path;
fs::create_dir_all(path)
.map_err(|err| BadDbInit(format!("cannot create directory {}: {}", path, err)))?;
let path_buf = PathBuf::from(path);
let is_new = {
let mut manifest_path = path_buf.clone();
manifest_path.push("manifest");
if manifest_path.exists() {
let existing: DbManifest = rmp_serde::from_slice(
&fs::read(manifest_path)
.into_diagnostic()
.wrap_err_with(|| "when reading manifest")?,
)
.into_diagnostic()
.wrap_err_with(|| "when reading manifest")?;
assert_eq!(
existing.storage_version, CURRENT_STORAGE_VERSION,
"Unknown storage version {}",
existing.storage_version
);
false
} else {
fs::write(
manifest_path,
rmp_serde::to_vec_named(&DbManifest {
storage_version: CURRENT_STORAGE_VERSION,
})
.into_diagnostic()
.wrap_err_with(|| "when serializing manifest")?,
)
.into_diagnostic()
.wrap_err_with(|| "when serializing manifest")?;
true
}
};
let mut store_path = path_buf;
store_path.push("data");
let db_builder = builder
.create_if_missing(is_new)
.use_capped_prefix_extractor(true, KEY_PREFIX_LEN)
.use_bloom_filter(true, 9.9, true)
.path(
store_path
.to_str()
.ok_or_else(|| miette!("bad path name"))?,
);
let db = db_builder.build()?;
Db::new(RocksDbStorage::new(db))
}
/// RocksDB storage engine /// RocksDB storage engine
#[derive(Clone)] #[derive(Clone)]

@ -1,291 +1,345 @@
// /* /*
// * Copyright 2022, The Cozo Project Authors. Licensed under MPL-2.0. * Copyright 2022, The Cozo Project Authors. Licensed under MPL-2.0.
// */ */
//
// use std::cmp::Ordering; use std::cmp::Ordering;
// use std::collections::btree_map::Range; use std::iter::Fuse;
// use std::collections::BTreeMap; use std::path::Path;
// use std::iter::Fuse; use std::thread;
// use std::sync::{Arc, RwLock};
// use std::thread; use itertools::Itertools;
// use miette::{IntoDiagnostic, Result};
// use miette::{IntoDiagnostic, Result}; use sled::{Batch, Config, Db, IVec, Iter, Mode};
// use sled::transaction::{ConflictableTransactionError, TransactionalTree};
// use sled::{Db, IVec, Iter}; use crate::data::tuple::Tuple;
// use crate::runtime::relation::decode_tuple_from_kv;
// use crate::data::tuple::Tuple; use crate::storage::{Storage, StoreTx};
// use crate::runtime::relation::decode_tuple_from_kv; use crate::utils::swap_option_result;
// use crate::storage::{Storage, StoreTx};
// use crate::utils::swap_option_result; /// Creates a Sled database object.
// pub fn new_cozo_sled(path: impl AsRef<Path>) -> Result<crate::Db<SledStorage>> {
// #[derive(Clone)] let db = sled::open(path).into_diagnostic()?;
// struct SledStorage { crate::Db::new(SledStorage { db })
// db: Db, }
// }
// /// Storage engine using Sled
// impl Storage for SledStorage { #[derive(Clone)]
// type Tx = SledTx; pub struct SledStorage {
// db: Db,
// fn transact(&self) -> Result<Self::Tx> { }
// Ok(SledTx {
// db: self.db.clone(), const PUT_MARKER: u8 = 1;
// changes: Default::default(), const DEL_MARKER: u8 = 0;
// })
// } impl Storage for SledStorage {
// type Tx = SledTx;
// fn del_range(&self, lower: &[u8], upper: &[u8]) -> Result<()> {
// let db = self.db.clone(); fn transact(&self) -> Result<Self::Tx> {
// let lower_v = lower.to_vec(); Ok(SledTx {
// let upper_v = upper.to_vec(); db: self.db.clone(),
// thread::spawn(move || -> Result<()> { changes: Default::default(),
// for k_res in db.range(lower_v..upper_v).keys() { })
// db.remove(k_res.into_diagnostic()?).into_diagnostic()?; }
// }
// Ok(()) fn del_range(&self, lower: &[u8], upper: &[u8]) -> Result<()> {
// }); let db = self.db.clone();
// Ok(()) let lower_v = lower.to_vec();
// } let upper_v = upper.to_vec();
// thread::spawn(move || -> Result<()> {
// fn range_compact(&self, _lower: &[u8], _upper: &[u8]) -> Result<()> { for k_res in db.range(lower_v..upper_v).keys() {
// Ok(()) db.remove(k_res.into_diagnostic()?).into_diagnostic()?;
// } }
// } Ok(())
// });
// struct SledTx { Ok(())
// db: Db, }
// changes: Arc<RwLock<BTreeMap<Vec<u8>, Option<Vec<u8>>>>>,
// } fn range_compact(&self, _lower: &[u8], _upper: &[u8]) -> Result<()> {
// Ok(())
// impl StoreTx for SledTx { }
// #[inline] }
// fn get(&self, key: &[u8], _for_update: bool) -> Result<Option<Vec<u8>>> {
// Ok(match self.changes.read().unwrap().get(key) { pub struct SledTx {
// Some(Some(val)) => Some(val.clone()), db: Db,
// Some(None) => None, changes: Option<Db>,
// None => { }
// let ret = self.db.get(key).into_diagnostic()?;
// ret.map(|v| v.to_vec()) impl SledTx {
// } fn ensure_changes_db(&mut self) -> Result<()> {
// }) if self.changes.is_none() {
// } let db = Config::new()
// .temporary(true)
// #[inline] .mode(Mode::HighThroughput)
// fn put(&mut self, key: &[u8], val: &[u8]) -> Result<()> { .use_compression(false)
// self.changes.write().unwrap().insert(key.into(), Some(val.into())); .open()
// Ok(()) .into_diagnostic()?;
// } self.changes = Some(db)
// }
// #[inline] Ok(())
// fn del(&mut self, key: &[u8]) -> Result<()> { }
// self.changes.write().unwrap().insert(key.into(), None); }
// Ok(())
// } impl StoreTx for SledTx {
// #[inline]
// #[inline] fn get(&self, key: &[u8], _for_update: bool) -> Result<Option<Vec<u8>>> {
// fn exists(&self, key: &[u8], _for_update: bool) -> Result<bool> { if let Some(changes) = &self.changes {
// Ok(match self.changes.read().unwrap().get(key) { if let Some(val) = changes.get(key).into_diagnostic()? {
// Some(Some(_)) => true, return if val[0] == DEL_MARKER {
// Some(None) => false, Ok(None)
// None => self.db.get(key).into_diagnostic()?.is_some(), } else {
// }) let data = val[1..].to_vec();
// } Ok(Some(data))
// };
// fn commit(&mut self) -> Result<()> { }
// self.db }
// .transaction( let ret = self.db.get(key).into_diagnostic()?;
// |db: &TransactionalTree| -> Result<(), ConflictableTransactionError> { Ok(ret.map(|v| v.to_vec()))
// for (k, v) in self.changes.read().unwrap().iter() { }
// match v {
// None => { #[inline]
// db.remove(k as &[u8])?; fn put(&mut self, key: &[u8], val: &[u8]) -> Result<()> {
// } self.ensure_changes_db()?;
// Some(v) => { let mut val_to_write = Vec::with_capacity(val.len() + 1);
// db.insert(k as &[u8], v as &[u8])?; val_to_write.push(PUT_MARKER);
// } val_to_write.extend_from_slice(val);
// } self.changes
// } .as_mut()
// Ok(()) .unwrap()
// }, .insert(key, val_to_write)
// ) .into_diagnostic()?;
// .into_diagnostic()?; Ok(())
// Ok(()) }
// }
// #[inline]
// fn range_scan(&self, lower: &[u8], upper: &[u8]) -> Box<dyn Iterator<Item = Result<Tuple>>> { fn del(&mut self, key: &[u8]) -> Result<()> {
// let change_iter = self.changes.read().unwrap().range(lower.to_vec()..upper.to_vec()).fuse(); self.ensure_changes_db()?;
// let db_iter = self.db.range(lower..upper).fuse(); let val_to_write = [PUT_MARKER];
// Box::new(SledIter { self.changes
// change_iter, .as_mut()
// db_iter, .unwrap()
// change_cache: None, .insert(key, &val_to_write)
// db_cache: None, .into_diagnostic()?;
// }) Ok(())
// } }
//
// fn range_scan_raw( #[inline]
// &self, fn exists(&self, key: &[u8], _for_update: bool) -> Result<bool> {
// lower: &[u8], if let Some(changes) = &self.changes {
// upper: &[u8], if let Some(val) = changes.get(key).into_diagnostic()? {
// ) -> Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>>> { return Ok(val[0] != DEL_MARKER);
// let change_iter = self.changes.read().unwrap().range(lower.to_vec()..upper.to_vec()).fuse(); }
// let db_iter = self.db.range(lower..upper).fuse(); }
// Box::new(SledIterRaw { let ret = self.db.get(key).into_diagnostic()?;
// change_iter, Ok(ret.is_some())
// db_iter, }
// change_cache: None,
// db_cache: None, fn commit(&mut self) -> Result<()> {
// }) if let Some(changes) = &self.changes {
// } let mut batch = Batch::default();
// } for pair in changes.iter() {
// let (k, v) = pair.into_diagnostic()?;
// struct SledIter<'a> { if v[0] == DEL_MARKER {
// change_iter: Fuse<Range<'a, Vec<u8>, Option<Vec<u8>>>>, batch.remove(&k);
// db_iter: Fuse<Iter>, } else {
// change_cache: Option<(Vec<u8>, Option<Vec<u8>>)>, batch.insert(&k, &v[1..]);
// db_cache: Option<(IVec, IVec)>, }
// } }
// self.db.apply_batch(batch).into_diagnostic()?;
// impl<'a> SledIter<'a> { }
// #[inline] Ok(())
// fn fill_cache(&mut self) -> Result<()> { }
// if self.change_cache.is_none() {
// if let Some((k, v)) = self.change_iter.next() { fn range_scan(&self, lower: &[u8], upper: &[u8]) -> Box<dyn Iterator<Item = Result<Tuple>>> {
// self.change_cache = Some((k.to_vec(), v.clone())) if let Some(changes) = &self.changes {
// } let change_iter = changes.range(lower.to_vec()..upper.to_vec()).fuse();
// } let db_iter = self.db.range(lower.to_vec()..upper.to_vec()).fuse();
// Box::new(SledIter {
// if self.db_cache.is_none() { change_iter,
// if let Some(res) = self.db_iter.next() { db_iter,
// self.db_cache = Some(res.into_diagnostic()?); change_cache: None,
// } db_cache: None,
// } })
// } else {
// Ok(()) Box::new(
// } self.db
// .range(lower.to_vec()..upper.to_vec())
// #[inline] .map(|d| d.into_diagnostic())
// fn next_inner(&mut self) -> Result<Option<Tuple>> { .map_ok(|(k, v)| decode_tuple_from_kv(&k, &v)),
// loop { )
// self.fill_cache()?; }
// match (&self.change_cache, &self.db_cache) { }
// (None, None) => return Ok(None),
// (Some((_, None)), None) => { fn range_scan_raw(
// self.change_cache.take(); &self,
// continue; lower: &[u8],
// } upper: &[u8],
// (Some((_, Some(_))), None) => { ) -> Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>>> {
// let (k, sv) = self.change_cache.take().unwrap(); if let Some(changes) = &self.changes {
// let v = sv.unwrap(); let change_iter = changes.range(lower.to_vec()..upper.to_vec()).fuse();
// return Ok(Some(decode_tuple_from_kv(&k, &v))); let db_iter = self.db.range(lower.to_vec()..upper.to_vec()).fuse();
// } Box::new(SledIterRaw {
// (None, Some(_)) => { change_iter,
// let (k, v) = self.db_cache.take().unwrap(); db_iter,
// return Ok(Some(decode_tuple_from_kv(&k, &v))); change_cache: None,
// } db_cache: None,
// (Some((ck, _)), Some((dk, _))) => match ck.as_slice().cmp(dk) { })
// Ordering::Less => { } else {
// let (k, sv) = self.change_cache.take().unwrap(); Box::new(
// match sv { self.db
// None => continue, .range(lower.to_vec()..upper.to_vec())
// Some(v) => { .map(|d| d.into_diagnostic())
// return Ok(Some(decode_tuple_from_kv(&k, &v))); .map_ok(|(k, v)| (k.to_vec(), v.to_vec())),
// } )
// } }
// } }
// Ordering::Greater => { }
// let (k, v) = self.db_cache.take().unwrap();
// return Ok(Some(decode_tuple_from_kv(&k, &v))); struct SledIterRaw {
// } change_iter: Fuse<Iter>,
// Ordering::Equal => { db_iter: Fuse<Iter>,
// self.db_cache.take(); change_cache: Option<(IVec, IVec)>,
// continue; db_cache: Option<(IVec, IVec)>,
// } }
// },
// } impl SledIterRaw {
// } #[inline]
// } fn fill_cache(&mut self) -> Result<()> {
// } if self.change_cache.is_none() {
// if let Some(res) = self.change_iter.next() {
// impl<'a> Iterator for SledIter<'a> { self.change_cache = Some(res.into_diagnostic()?)
// type Item = Result<Tuple>; }
// }
// #[inline]
// fn next(&mut self) -> Option<Self::Item> { if self.db_cache.is_none() {
// swap_option_result(self.next_inner()) if let Some(res) = self.db_iter.next() {
// } self.db_cache = Some(res.into_diagnostic()?);
// } }
// }
// struct SledIterRaw<'a> {
// change_iter: Fuse<Range<'a, Vec<u8>, Option<Vec<u8>>>>, Ok(())
// db_iter: Fuse<Iter>, }
// change_cache: Option<(Vec<u8>, Option<Vec<u8>>)>,
// db_cache: Option<(IVec, IVec)>, #[inline]
// } fn next_inner(&mut self) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
// loop {
// impl<'a> SledIterRaw<'a> { self.fill_cache()?;
// #[inline] match (&self.change_cache, &self.db_cache) {
// fn fill_cache(&mut self) -> Result<()> { (None, None) => return Ok(None),
// if self.change_cache.is_none() { (Some(_), None) => {
// if let Some((k, v)) = self.change_iter.next() { let (k, cv) = self.change_cache.take().unwrap();
// self.change_cache = Some((k.to_vec(), v.clone())) if cv[0] == DEL_MARKER {
// } continue;
// } } else {
// return Ok(Some((k.to_vec(), cv[1..].to_vec())));
// if self.db_cache.is_none() { }
// if let Some(res) = self.db_iter.next() { }
// self.db_cache = Some(res.into_diagnostic()?); (None, Some(_)) => {
// } let (k, v) = self.db_cache.take().unwrap();
// } return Ok(Some((k.to_vec(), v.to_vec())));
// }
// Ok(()) (Some((ck, _)), Some((dk, _))) => match ck.cmp(dk) {
// } Ordering::Less => {
// let (k, sv) = self.change_cache.take().unwrap();
// #[inline] if sv[0] == DEL_MARKER {
// fn next_inner(&mut self) -> Result<Option<(Vec<u8>, Vec<u8>)>> { continue;
// loop { } else {
// self.fill_cache()?; return Ok(Some((k.to_vec(), sv[1..].to_vec())));
// match (&self.change_cache, &self.db_cache) { }
// (None, None) => return Ok(None), }
// (Some((_, None)), None) => { Ordering::Greater => {
// self.change_cache.take(); let (k, v) = self.db_cache.take().unwrap();
// continue; return Ok(Some((k.to_vec(), v.to_vec())));
// } }
// (Some((_, Some(_))), None) => { Ordering::Equal => {
// let (k, sv) = self.change_cache.take().unwrap(); self.db_cache.take();
// let v = sv.unwrap(); continue;
// return Ok(Some((k, v))); }
// } },
// (None, Some(_)) => { }
// let (k, v) = self.db_cache.take().unwrap(); }
// return Ok(Some((k.to_vec(), v.to_vec()))); }
// } }
// (Some((ck, _)), Some((dk, _))) => match ck.as_slice().cmp(dk) {
// Ordering::Less => { impl Iterator for SledIterRaw {
// let (k, sv) = self.change_cache.take().unwrap(); type Item = Result<(Vec<u8>, Vec<u8>)>;
// match sv {
// None => continue, #[inline]
// Some(v) => return Ok(Some((k, v))), fn next(&mut self) -> Option<Self::Item> {
// } swap_option_result(self.next_inner())
// } }
// Ordering::Greater => { }
// let (k, v) = self.db_cache.take().unwrap();
// return Ok(Some((k.to_vec(), v.to_vec()))); struct SledIter {
// } change_iter: Fuse<Iter>,
// Ordering::Equal => { db_iter: Fuse<Iter>,
// self.db_cache.take(); change_cache: Option<(IVec, IVec)>,
// continue; db_cache: Option<(IVec, IVec)>,
// } }
// },
// } impl SledIter {
// } #[inline]
// } fn fill_cache(&mut self) -> Result<()> {
// } if self.change_cache.is_none() {
// if let Some(res) = self.change_iter.next() {
// impl<'a> Iterator for SledIterRaw<'a> { self.change_cache = Some(res.into_diagnostic()?)
// type Item = Result<(Vec<u8>, Vec<u8>)>; }
// }
// #[inline]
// fn next(&mut self) -> Option<Self::Item> { if self.db_cache.is_none() {
// swap_option_result(self.next_inner()) if let Some(res) = self.db_iter.next() {
// } self.db_cache = Some(res.into_diagnostic()?);
// } }
}
Ok(())
}
#[inline]
fn next_inner(&mut self) -> Result<Option<Tuple>> {
loop {
self.fill_cache()?;
match (&self.change_cache, &self.db_cache) {
(None, None) => return Ok(None),
(Some(_), None) => {
let (k, cv) = self.change_cache.take().unwrap();
if cv[0] == DEL_MARKER {
continue;
} else {
return Ok(Some(decode_tuple_from_kv(&k, &cv[1..])));
}
}
(None, Some(_)) => {
let (k, v) = self.db_cache.take().unwrap();
return Ok(Some(decode_tuple_from_kv(&k, &v)));
}
(Some((ck, _)), Some((dk, _))) => match ck.cmp(dk) {
Ordering::Less => {
let (k, sv) = self.change_cache.take().unwrap();
if sv[0] == DEL_MARKER {
continue;
} else {
return Ok(Some(decode_tuple_from_kv(&k, &sv[1..])));
}
}
Ordering::Greater => {
let (k, v) = self.db_cache.take().unwrap();
return Ok(Some(decode_tuple_from_kv(&k, &v)));
}
Ordering::Equal => {
self.db_cache.take();
continue;
}
},
}
}
}
}
impl Iterator for SledIter {
type Item = Result<Tuple>;
#[inline]
fn next(&mut self) -> Option<Self::Item> {
swap_option_result(self.next_inner())
}
}

@ -15,6 +15,7 @@ use cozo::{new_cozo_rocksdb, Db};
lazy_static! { lazy_static! {
static ref TEST_DB: Db<RocksDbStorage> = { static ref TEST_DB: Db<RocksDbStorage> = {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
let creation = Instant::now(); let creation = Instant::now();
let path = "_test_air_routes"; let path = "_test_air_routes";
_ = std::fs::remove_dir_all(path); _ = std::fs::remove_dir_all(path);
@ -1474,7 +1475,6 @@ fn longest_routes() {
#[test] #[test]
fn longest_routes_from_each_airports() { fn longest_routes_from_each_airports() {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
check_db(); check_db();
let longest_routes_from_each_airports = Instant::now(); let longest_routes_from_each_airports = Instant::now();

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save