cozorocks error diagnostics

main
Ziyang Hu 2 years ago
parent 5662e1c93e
commit ec27f75ab1

@ -7,7 +7,7 @@ edition = "2021"
[dependencies]
cxx = "1.0.69"
#tikv-jemalloc-sys = { "version" = "0.5", "features" = ["unprefixed_malloc_on_supported_platforms"] }
miette = "5.3.0"
[build-dependencies]
cxx-build = "1.0.69"

@ -1,6 +1,10 @@
use std::error::Error;
use std::fmt::{Display, Formatter};
use miette::{Diagnostic, Severity};
use crate::StatusSeverity;
pub(crate) mod db;
pub(crate) mod iter;
pub(crate) mod tx;
@ -120,7 +124,10 @@ pub(crate) mod ffi {
fn set_ignore_range_deletions(self: &RawRocksDbBridge, val: bool);
fn make_snapshot(self: &RawRocksDbBridge) -> SharedPtr<SnapshotBridge>;
fn iterator(self: &RawRocksDbBridge) -> UniquePtr<IterBridge>;
fn iterator_with_snapshot(self: &RawRocksDbBridge, snapshot: &SnapshotBridge) -> UniquePtr<IterBridge>;
fn iterator_with_snapshot(
self: &RawRocksDbBridge,
snapshot: &SnapshotBridge,
) -> UniquePtr<IterBridge>;
fn get(
self: &RawRocksDbBridge,
key: &[u8],
@ -145,23 +152,27 @@ pub(crate) mod ffi {
cmp_impl: fn(&[u8], &[u8]) -> i8,
) -> SharedPtr<RocksDbBridge>;
fn transact(self: &RocksDbBridge) -> UniquePtr<TxBridge>;
fn del_range(
fn del_range(self: &RocksDbBridge, lower: &[u8], upper: &[u8], status: &mut RocksDbStatus);
fn compact_range(
self: &RocksDbBridge,
lower: &[u8],
upper: &[u8],
status: &mut RocksDbStatus,
);
fn compact_range(
fn get_sst_writer(
self: &RocksDbBridge,
lower: &[u8],
upper: &[u8],
path: &str,
status: &mut RocksDbStatus,
);
fn get_sst_writer(self: &RocksDbBridge, path: &str, status: &mut RocksDbStatus) -> UniquePtr<SstFileWriterBridge>;
) -> UniquePtr<SstFileWriterBridge>;
fn ingest_sst(self: &RocksDbBridge, path: &str, status: &mut RocksDbStatus);
type SstFileWriterBridge;
fn put(self: Pin<&mut SstFileWriterBridge>, key: &[u8], val: &[u8], status: &mut RocksDbStatus);
fn put(
self: Pin<&mut SstFileWriterBridge>,
key: &[u8],
val: &[u8],
status: &mut RocksDbStatus,
);
fn finish(self: Pin<&mut SstFileWriterBridge>, status: &mut RocksDbStatus);
type TxBridge;
@ -240,6 +251,25 @@ impl Display for ffi::RocksDbStatus {
}
}
impl Diagnostic for ffi::RocksDbStatus {
fn code<'a>(&'a self) -> Option<Box<dyn Display + 'a>> {
if self.is_ok() {
None
} else {
Some(Box::new(format!(
"cozorocks::{:?}::{:?}",
self.code, self.subcode
)))
}
}
fn severity(&self) -> Option<Severity> {
match self.severity {
StatusSeverity::kNoError => None,
_ => Some(Severity::Error),
}
}
}
impl ffi::RocksDbStatus {
#[inline(always)]
pub fn is_ok(&self) -> bool {

@ -72,22 +72,22 @@ impl SessionTx {
for data in res_iter {
let data = data?;
let encoded = data.encode_as_key(view_store.metadata.id);
vtx.del(&encoded).into_diagnostic()?;
vtx.del(&encoded)?;
}
vtx.commit().into_diagnostic()?;
vtx.commit()?;
} else {
let file = NamedTempFile::new().into_diagnostic()?;
let path = file.into_temp_path();
let path = path.to_string_lossy();
let mut writer = self.view_db.get_sst_writer(&path).into_diagnostic()?;
let mut writer = self.view_db.get_sst_writer(&path)?;
for data in res_iter {
let data = data?;
let encoded = data.encode_as_key(view_store.metadata.id);
writer.put(&encoded, &[]).into_diagnostic()?;
writer.put(&encoded, &[])?;
}
writer.finish().into_diagnostic()?;
self.view_db.ingest_sst_file(&path).into_diagnostic()?;
writer.finish()?;
self.view_db.ingest_sst_file(&path)?;
}
Ok(())
}

@ -94,8 +94,8 @@ impl Db {
.use_capped_prefix_extractor(true, SCRATCH_DB_KEY_PREFIX_LEN)
.use_custom_comparator("cozo_rusty_scratch_cmp", rusty_scratch_cmp, false);
let db = db_builder.build().into_diagnostic()?;
let view_db = view_db_builder.build().into_diagnostic()?;
let db = db_builder.build()?;
let view_db = view_db_builder.build()?;
let ret = Self {
db,
@ -116,14 +116,14 @@ impl Db {
pub fn compact_main(&self) -> Result<()> {
let l = smallest_key();
let u = largest_key();
self.db.range_compact(&l, &u).into_diagnostic()?;
self.db.range_compact(&l, &u)?;
Ok(())
}
pub fn compact_view(&self) -> Result<()> {
let l = Tuple::default().encode_as_key(ViewRelId(0));
let u = Tuple(vec![DataValue::Bot]).encode_as_key(ViewRelId(u64::MAX));
self.db.range_compact(&l, &u).into_diagnostic()?;
self.db.range_compact(&l, &u)?;
Ok(())
}
@ -459,8 +459,8 @@ impl Db {
}
let key = Tuple(ks).encode_as_key(ViewRelId::SYSTEM);
let mut vtx = self.view_db.transact().start();
vtx.put(&key, v).into_diagnostic()?;
vtx.commit().into_diagnostic()?;
vtx.put(&key, v)?;
vtx.commit()?;
Ok(())
}
pub fn remove_meta_kv(&self, k: &[&str]) -> Result<()> {
@ -470,8 +470,8 @@ impl Db {
}
let key = Tuple(ks).encode_as_key(ViewRelId::SYSTEM);
let mut vtx = self.view_db.transact().start();
vtx.del(&key).into_diagnostic()?;
vtx.commit().into_diagnostic()?;
vtx.del(&key)?;
vtx.commit()?;
Ok(())
}
pub fn get_meta_kv(&self, k: &[&str]) -> Result<Option<Vec<u8>>> {
@ -481,7 +481,7 @@ impl Db {
}
let key = Tuple(ks).encode_as_key(ViewRelId::SYSTEM);
let vtx = self.view_db.transact().start();
Ok(match vtx.get(&key, false).into_diagnostic()? {
Ok(match vtx.get(&key, false)? {
None => None,
Some(slice) => Some(slice.to_vec()),
})
@ -516,7 +516,7 @@ impl Db {
} else {
self.started = true;
}
match self.it.pair().into_diagnostic()? {
match self.it.pair()? {
None => Ok(None),
Some((k_slice, v_slice)) => {
let encoded = EncodedTuple(k_slice).decode()?;
@ -562,7 +562,7 @@ impl Db {
.start();
it.seek(&lower);
let mut collected = vec![];
while let Some(v_slice) = it.val().into_diagnostic()? {
while let Some(v_slice) = it.val()? {
let meta: ViewRelMetadata = rmp_serde::from_slice(v_slice).into_diagnostic()?;
let name = meta.name.0;
let arity = meta.arity;

@ -96,7 +96,7 @@ impl SessionTx {
let e_upper = encode_sentinel_entity_attr(EntityId::MAX_PERM, AttrId::MIN_PERM);
let it = self.bounded_scan_last(&e_lower, &e_upper);
Ok(match it.key().into_diagnostic()? {
Ok(match it.key()? {
None => EntityId::MAX_TEMP,
Some(data) => EntityId::from_bytes(data),
})
@ -106,7 +106,7 @@ impl SessionTx {
let e_lower = encode_sentinel_attr_by_id(AttrId::MIN_PERM);
let e_upper = encode_sentinel_attr_by_id(AttrId::MAX_PERM);
let it = self.bounded_scan_last(&e_lower, &e_upper);
Ok(match it.key().into_diagnostic()? {
Ok(match it.key()? {
None => AttrId::MAX_TEMP,
Some(data) => AttrId::from_bytes(data),
})
@ -116,7 +116,7 @@ impl SessionTx {
let tuple = Tuple(vec![DataValue::Null]);
let t_encoded = tuple.encode_as_key(ViewRelId::SYSTEM);
let vtx = self.view_db.transact().start();
let found = vtx.get(&t_encoded, false).into_diagnostic()?;
let found = vtx.get(&t_encoded, false)?;
match found {
None => Ok(ViewRelId::SYSTEM),
Some(slice) => ViewRelId::raw_decode(&slice),
@ -127,7 +127,7 @@ impl SessionTx {
let e_lower = encode_tx(TxId::MAX_USER);
let e_upper = encode_tx(TxId::MAX_SYS);
let it = self.bounded_scan_first(&e_lower, &e_upper);
Ok(match it.key().into_diagnostic()? {
Ok(match it.key()? {
None => TxId::MAX_SYS,
Some(data) => TxId::from_bytes(data),
})
@ -138,8 +138,8 @@ impl SessionTx {
let encoded = encode_tx(tx_id);
let log = TxLog::new(tx_id, comment);
self.tx.put(&encoded, &log.encode()).into_diagnostic()?;
self.tx.commit().into_diagnostic()?;
self.tx.put(&encoded, &log.encode())?;
self.tx.commit()?;
if refresh {
let new_tx_id = TxId(self.last_tx_id.fetch_add(1, Ordering::AcqRel) + 1);
self.tx.set_snapshot();

@ -1,7 +1,7 @@
use std::fmt::{Debug, Formatter};
use std::sync::atomic::Ordering;
use miette::{miette, bail, Result, IntoDiagnostic};
use miette::{bail, miette, IntoDiagnostic, Result};
use rmp_serde::Serializer;
use serde::Serialize;
@ -130,7 +130,7 @@ impl ViewRelIterator {
} else {
self.started = true;
}
Ok(match self.inner.key().into_diagnostic()? {
Ok(match self.inner.key()? {
None => None,
Some(k_slice) => Some(EncodedTuple(k_slice).decode()?),
})
@ -149,14 +149,14 @@ impl SessionTx {
let key = DataValue::Str(name.0.clone());
let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM);
let vtx = self.view_db.transact().start();
Ok(vtx.exists(&encoded, false).into_diagnostic()?)
Ok(vtx.exists(&encoded, false)?)
}
pub(crate) fn create_view_rel(&self, mut meta: ViewRelMetadata) -> Result<ViewRelStore> {
let key = DataValue::Str(meta.name.0.clone());
let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM);
let mut vtx = self.view_db.transact().set_snapshot(true).start();
if vtx.exists(&encoded, true).into_diagnostic()? {
if vtx.exists(&encoded, true)? {
bail!(
"cannot create view {}: one with the same name already exists",
meta.name
@ -164,18 +164,18 @@ impl SessionTx {
};
let last_id = self.view_store_id.fetch_add(1, Ordering::SeqCst);
meta.id = ViewRelId::new(last_id + 1)?;
vtx.put(&encoded, &meta.id.raw_encode()).into_diagnostic()?;
vtx.put(&encoded, &meta.id.raw_encode())?;
let name_key =
Tuple(vec![DataValue::Str(meta.name.0.clone())]).encode_as_key(ViewRelId::SYSTEM);
let mut meta_val = vec![];
meta.serialize(&mut Serializer::new(&mut meta_val)).unwrap();
vtx.put(&name_key, &meta_val).into_diagnostic()?;
vtx.put(&name_key, &meta_val)?;
let tuple = Tuple(vec![DataValue::Null]);
let t_encoded = tuple.encode_as_key(ViewRelId::SYSTEM);
vtx.put(&t_encoded, &meta.id.raw_encode()).into_diagnostic()?;
vtx.commit().into_diagnostic()?;
vtx.put(&t_encoded, &meta.id.raw_encode())?;
vtx.commit()?;
Ok(ViewRelStore {
view_db: self.view_db.clone(),
metadata: meta,
@ -190,7 +190,7 @@ impl SessionTx {
let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM);
let found = vtx
.get(&encoded, true).into_diagnostic()?
.get(&encoded, true)?
.ok_or_else(|| miette!("cannot find stored view {}", name))?;
let metadata: ViewRelMetadata = rmp_serde::from_slice(&found).into_diagnostic()?;
Ok(ViewRelStore {
@ -203,11 +203,11 @@ impl SessionTx {
let store = self.do_get_view_rel(name, &vtx)?;
let key = DataValue::Str(name.0.clone());
let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM);
vtx.del(&encoded).into_diagnostic()?;
vtx.del(&encoded)?;
let lower_bound = Tuple::default().encode_as_key(store.metadata.id);
let upper_bound = Tuple::default().encode_as_key(store.metadata.id.next()?);
self.view_db.range_del(&lower_bound, &upper_bound).into_diagnostic()?;
vtx.commit().into_diagnostic()?;
self.view_db.range_del(&lower_bound, &upper_bound)?;
vtx.commit()?;
Ok(())
}
}

@ -1,6 +1,6 @@
use std::sync::atomic::Ordering;
use miette::{bail, ensure, miette, IntoDiagnostic, Result};
use miette::{bail, ensure, miette, Result};
use cozorocks::{DbIter, IterBuilder};
@ -44,7 +44,7 @@ impl SessionTx {
}
let anchor = encode_sentinel_attr_by_id(aid);
Ok(match self.tx.get(&anchor, false).into_diagnostic()? {
Ok(match self.tx.get(&anchor, false)? {
None => {
self.attr_by_id_cache.borrow_mut().insert(aid, None);
None
@ -76,7 +76,7 @@ impl SessionTx {
}
let anchor = encode_sentinel_attr_by_name(name);
Ok(match self.tx.get(&anchor, false).into_diagnostic()? {
Ok(match self.tx.get(&anchor, false)? {
None => {
self.attr_by_kw_cache
.borrow_mut()
@ -153,7 +153,7 @@ impl SessionTx {
);
let kw_sentinel = encode_sentinel_attr_by_name(&existing.name);
let attr_data = existing.encode_with_op_and_tx(StoreOp::Retract, tx_id);
self.tx.put(&kw_sentinel, &attr_data).into_diagnostic()?;
self.tx.put(&kw_sentinel, &attr_data)?;
}
self.put_attr(&attr, StoreOp::Assert)
}
@ -162,11 +162,11 @@ impl SessionTx {
let tx_id = self.get_write_tx_id()?;
let attr_data = attr.encode_with_op_and_tx(op, tx_id);
let id_encoded = encode_attr_by_id(attr.id, tx_id);
self.tx.put(&id_encoded, &attr_data).into_diagnostic()?;
self.tx.put(&id_encoded, &attr_data)?;
let id_sentinel = encode_sentinel_attr_by_id(attr.id);
self.tx.put(&id_sentinel, &attr_data).into_diagnostic()?;
self.tx.put(&id_sentinel, &attr_data)?;
let kw_sentinel = encode_sentinel_attr_by_name(&attr.name);
self.tx.put(&kw_sentinel, &attr_data).into_diagnostic()?;
self.tx.put(&kw_sentinel, &attr_data)?;
Ok(attr.id)
}
@ -210,7 +210,7 @@ impl AttrIter {
self.it.next();
}
loop {
match self.it.val().into_diagnostic()? {
match self.it.val()? {
None => return Ok(None),
Some(v) => {
let found_op = StoreOp::try_from(v[0])?;

@ -1,7 +1,7 @@
use std::collections::BTreeMap;
use std::sync::atomic::Ordering;
use miette::{bail, ensure, miette, IntoDiagnostic, Result};
use miette::{bail, ensure, miette, Result};
use smartstring::{LazyCompact, SmartString};
use cozorocks::{DbIter, IterBuilder};
@ -145,23 +145,23 @@ impl SessionTx {
let aev_encoded = encode_aev_key(attr.id, eid, v_in_key, vld_in_key);
if real_delete {
self.tx.del(&aev_encoded).into_diagnostic()?;
self.tx.del(&aev_encoded)?;
} else {
self.tx.put(&aev_encoded, &val_encoded).into_diagnostic()?;
self.tx.put(&aev_encoded, &val_encoded)?;
}
// vae for ref types
if attr.val_type.is_ref_type() {
let vae_encoded = encode_vae_key(v.get_entity_id()?, attr.id, eid, vld_in_key);
if real_delete {
self.tx.del(&vae_encoded).into_diagnostic()?;
self.tx.del(&vae_encoded)?;
} else {
self.tx
.put(
&vae_encoded,
&DataValue::Guard.encode_with_op_and_tx(op, tx_id),
)
.into_diagnostic()?;
?;
}
}
@ -202,7 +202,7 @@ impl SessionTx {
v
);
}
} else if let Some(v_slice) = self.tx.get(&ave_encoded, false).into_diagnostic()? {
} else if let Some(v_slice) = self.tx.get(&ave_encoded, false)? {
let found_eid = decode_value_from_val(&v_slice)?.get_entity_id()?;
ensure!(
found_eid == eid,
@ -214,11 +214,11 @@ impl SessionTx {
}
let e_in_val_encoded = eid.as_datavalue().encode_with_op_and_tx(op, tx_id);
if real_delete {
self.tx.del(&ave_encoded).into_diagnostic()?;
self.tx.del(&ave_encoded)?;
} else {
self.tx
.put(&ave_encoded, &e_in_val_encoded)
.into_diagnostic()?;
?;
}
self.tx
@ -226,7 +226,7 @@ impl SessionTx {
&encode_sentinel_attr_val(attr.id, v),
&tx_id.bytes_with_op(op),
)
.into_diagnostic()?;
?;
}
self.tx
@ -234,7 +234,7 @@ impl SessionTx {
&encode_sentinel_entity_attr(eid, attr.id),
&tx_id.bytes_with_op(op),
)
.into_diagnostic()?;
?;
Ok(eid)
}
@ -289,7 +289,7 @@ impl SessionTx {
if let Some(v_slice) = self
.bounded_scan_first(&lower, &upper)
.val()
.into_diagnostic()?
?
{
if StoreOp::try_from(v_slice[0])?.is_assert() {
let eid = decode_value(&v_slice[8..])?.get_entity_id()?;
@ -487,7 +487,7 @@ impl TripleAttrEntityIter {
} else {
self.started = true;
}
match self.it.pair().into_diagnostic()? {
match self.it.pair()? {
None => Ok(None),
Some((k_slice, v_slice)) => {
let (aid, eid, _tid) = decode_ae_key(k_slice)?;
@ -534,7 +534,7 @@ impl TripleAttrEntityBeforeIter {
fn next_inner(&mut self) -> Result<Option<(AttrId, EntityId, DataValue)>> {
loop {
self.it.seek(&self.current);
match self.it.pair().into_diagnostic()? {
match self.it.pair()? {
None => return Ok(None),
Some((k_slice, v_slice)) => {
let (aid, eid, tid) = decode_ae_key(k_slice)?;
@ -595,7 +595,7 @@ impl TripleAttrEntityRangeIter {
} else {
self.started = true;
}
match self.it.pair().into_diagnostic()? {
match self.it.pair()? {
None => Ok(None),
Some((k_slice, v_slice)) => {
let (aid, eid, _tid) = decode_ae_key(k_slice)?;
@ -648,7 +648,7 @@ impl TripleAttrEntityRangeBeforeIter {
fn next_inner(&mut self) -> Result<Option<(AttrId, EntityId, DataValue)>> {
loop {
self.it.seek(&self.current);
match self.it.pair().into_diagnostic()? {
match self.it.pair()? {
None => return Ok(None),
Some((k_slice, v_slice)) => {
let (aid, eid, tid) = decode_ae_key(k_slice)?;
@ -712,7 +712,7 @@ impl TripleAttrValueRangeIter {
} else {
self.started = true;
}
match self.it.pair().into_diagnostic()? {
match self.it.pair()? {
None => Ok(None),
Some((k_slice, v_slice)) => {
let (aid, mut eid, _tid) = decode_ae_key(k_slice)?;
@ -766,7 +766,7 @@ impl TripleAttrValueRangeBeforeIter {
fn next_inner(&mut self) -> Result<Option<(AttrId, DataValue, EntityId)>> {
loop {
self.it.seek(&self.current);
match self.it.pair().into_diagnostic()? {
match self.it.pair()? {
None => return Ok(None),
Some((k_slice, v_slice)) => {
let (aid, mut eid, tid) = decode_ae_key(k_slice)?;
@ -824,7 +824,7 @@ impl TripleAttrValueIter {
} else {
self.started = true;
}
match self.it.pair().into_diagnostic()? {
match self.it.pair()? {
None => Ok(None),
Some((k_slice, v_slice)) => {
let (aid, mut eid, _tid) = decode_ae_key(k_slice)?;
@ -871,7 +871,7 @@ impl TripleAttrValueBeforeIter {
fn next_inner(&mut self) -> Result<Option<(AttrId, DataValue, EntityId)>> {
loop {
self.it.seek(&self.current);
match self.it.pair().into_diagnostic()? {
match self.it.pair()? {
None => return Ok(None),
Some((k_slice, v_slice)) => {
let (aid, mut eid, tid) = decode_ae_key(k_slice)?;
@ -930,7 +930,7 @@ impl TripleAttrValueAfterIter {
fn next_inner(&mut self) -> Result<Option<(AttrId, DataValue, EntityId)>> {
loop {
self.it.seek_back(&self.current);
match self.it.pair().into_diagnostic()? {
match self.it.pair()? {
None => return Ok(None),
Some((k_slice, v_slice)) => {
if compare_key(k_slice, &self.lower_bound) == std::cmp::Ordering::Less {
@ -985,7 +985,7 @@ impl TripleValueRefAttrIter {
} else {
self.started = true;
}
match self.it.key().into_diagnostic()? {
match self.it.key()? {
None => Ok(None),
Some(k_slice) => {
let (v_eid, aid, eid, _) = decode_vae_key(k_slice)?;
@ -1028,7 +1028,7 @@ impl TripleValueRefAttrBeforeIter {
fn next_inner(&mut self) -> Result<Option<(EntityId, AttrId, EntityId)>> {
loop {
self.it.seek(&self.current);
match self.it.pair().into_diagnostic()? {
match self.it.pair()? {
None => return Ok(None),
Some((k_slice, v_slice)) => {
let (v_eid, aid, eid, tid) = decode_vae_key(k_slice)?;

Loading…
Cancel
Save