diff --git a/cozorocks/Cargo.toml b/cozorocks/Cargo.toml index 0f6afcd3..9b7bc023 100644 --- a/cozorocks/Cargo.toml +++ b/cozorocks/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] cxx = "1.0.69" -#tikv-jemalloc-sys = { "version" = "0.5", "features" = ["unprefixed_malloc_on_supported_platforms"] } +miette = "5.3.0" [build-dependencies] cxx-build = "1.0.69" \ No newline at end of file diff --git a/cozorocks/src/bridge/mod.rs b/cozorocks/src/bridge/mod.rs index 74331b45..ae46f76a 100644 --- a/cozorocks/src/bridge/mod.rs +++ b/cozorocks/src/bridge/mod.rs @@ -1,6 +1,10 @@ use std::error::Error; use std::fmt::{Display, Formatter}; +use miette::{Diagnostic, Severity}; + +use crate::StatusSeverity; + pub(crate) mod db; pub(crate) mod iter; pub(crate) mod tx; @@ -120,7 +124,10 @@ pub(crate) mod ffi { fn set_ignore_range_deletions(self: &RawRocksDbBridge, val: bool); fn make_snapshot(self: &RawRocksDbBridge) -> SharedPtr; fn iterator(self: &RawRocksDbBridge) -> UniquePtr; - fn iterator_with_snapshot(self: &RawRocksDbBridge, snapshot: &SnapshotBridge) -> UniquePtr; + fn iterator_with_snapshot( + self: &RawRocksDbBridge, + snapshot: &SnapshotBridge, + ) -> UniquePtr; fn get( self: &RawRocksDbBridge, key: &[u8], @@ -145,23 +152,27 @@ pub(crate) mod ffi { cmp_impl: fn(&[u8], &[u8]) -> i8, ) -> SharedPtr; fn transact(self: &RocksDbBridge) -> UniquePtr; - fn del_range( + fn del_range(self: &RocksDbBridge, lower: &[u8], upper: &[u8], status: &mut RocksDbStatus); + fn compact_range( self: &RocksDbBridge, lower: &[u8], upper: &[u8], status: &mut RocksDbStatus, ); - fn compact_range( + fn get_sst_writer( self: &RocksDbBridge, - lower: &[u8], - upper: &[u8], + path: &str, status: &mut RocksDbStatus, - ); - fn get_sst_writer(self: &RocksDbBridge, path: &str, status: &mut RocksDbStatus) -> UniquePtr; + ) -> UniquePtr; fn ingest_sst(self: &RocksDbBridge, path: &str, status: &mut RocksDbStatus); type SstFileWriterBridge; - fn put(self: Pin<&mut SstFileWriterBridge>, key: &[u8], val: &[u8], status: &mut RocksDbStatus); + fn put( + self: Pin<&mut SstFileWriterBridge>, + key: &[u8], + val: &[u8], + status: &mut RocksDbStatus, + ); fn finish(self: Pin<&mut SstFileWriterBridge>, status: &mut RocksDbStatus); type TxBridge; @@ -240,6 +251,25 @@ impl Display for ffi::RocksDbStatus { } } +impl Diagnostic for ffi::RocksDbStatus { + fn code<'a>(&'a self) -> Option> { + if self.is_ok() { + None + } else { + Some(Box::new(format!( + "cozorocks::{:?}::{:?}", + self.code, self.subcode + ))) + } + } + fn severity(&self) -> Option { + match self.severity { + StatusSeverity::kNoError => None, + _ => Some(Severity::Error), + } + } +} + impl ffi::RocksDbStatus { #[inline(always)] pub fn is_ok(&self) -> bool { diff --git a/src/query/pull.rs b/src/query/pull.rs index 32f553e3..f6549e9a 100644 --- a/src/query/pull.rs +++ b/src/query/pull.rs @@ -72,22 +72,22 @@ impl SessionTx { for data in res_iter { let data = data?; let encoded = data.encode_as_key(view_store.metadata.id); - vtx.del(&encoded).into_diagnostic()?; + vtx.del(&encoded)?; } - vtx.commit().into_diagnostic()?; + vtx.commit()?; } else { let file = NamedTempFile::new().into_diagnostic()?; let path = file.into_temp_path(); let path = path.to_string_lossy(); - let mut writer = self.view_db.get_sst_writer(&path).into_diagnostic()?; + let mut writer = self.view_db.get_sst_writer(&path)?; for data in res_iter { let data = data?; let encoded = data.encode_as_key(view_store.metadata.id); - writer.put(&encoded, &[]).into_diagnostic()?; + writer.put(&encoded, &[])?; } - writer.finish().into_diagnostic()?; - self.view_db.ingest_sst_file(&path).into_diagnostic()?; + writer.finish()?; + self.view_db.ingest_sst_file(&path)?; } Ok(()) } diff --git a/src/runtime/db.rs b/src/runtime/db.rs index d9239d6c..12b80e81 100644 --- a/src/runtime/db.rs +++ b/src/runtime/db.rs @@ -94,8 +94,8 @@ impl Db { .use_capped_prefix_extractor(true, SCRATCH_DB_KEY_PREFIX_LEN) .use_custom_comparator("cozo_rusty_scratch_cmp", rusty_scratch_cmp, false); - let db = db_builder.build().into_diagnostic()?; - let view_db = view_db_builder.build().into_diagnostic()?; + let db = db_builder.build()?; + let view_db = view_db_builder.build()?; let ret = Self { db, @@ -116,14 +116,14 @@ impl Db { pub fn compact_main(&self) -> Result<()> { let l = smallest_key(); let u = largest_key(); - self.db.range_compact(&l, &u).into_diagnostic()?; + self.db.range_compact(&l, &u)?; Ok(()) } pub fn compact_view(&self) -> Result<()> { let l = Tuple::default().encode_as_key(ViewRelId(0)); let u = Tuple(vec![DataValue::Bot]).encode_as_key(ViewRelId(u64::MAX)); - self.db.range_compact(&l, &u).into_diagnostic()?; + self.db.range_compact(&l, &u)?; Ok(()) } @@ -459,8 +459,8 @@ impl Db { } let key = Tuple(ks).encode_as_key(ViewRelId::SYSTEM); let mut vtx = self.view_db.transact().start(); - vtx.put(&key, v).into_diagnostic()?; - vtx.commit().into_diagnostic()?; + vtx.put(&key, v)?; + vtx.commit()?; Ok(()) } pub fn remove_meta_kv(&self, k: &[&str]) -> Result<()> { @@ -470,8 +470,8 @@ impl Db { } let key = Tuple(ks).encode_as_key(ViewRelId::SYSTEM); let mut vtx = self.view_db.transact().start(); - vtx.del(&key).into_diagnostic()?; - vtx.commit().into_diagnostic()?; + vtx.del(&key)?; + vtx.commit()?; Ok(()) } pub fn get_meta_kv(&self, k: &[&str]) -> Result>> { @@ -481,7 +481,7 @@ impl Db { } let key = Tuple(ks).encode_as_key(ViewRelId::SYSTEM); let vtx = self.view_db.transact().start(); - Ok(match vtx.get(&key, false).into_diagnostic()? { + Ok(match vtx.get(&key, false)? { None => None, Some(slice) => Some(slice.to_vec()), }) @@ -516,7 +516,7 @@ impl Db { } else { self.started = true; } - match self.it.pair().into_diagnostic()? { + match self.it.pair()? { None => Ok(None), Some((k_slice, v_slice)) => { let encoded = EncodedTuple(k_slice).decode()?; @@ -562,7 +562,7 @@ impl Db { .start(); it.seek(&lower); let mut collected = vec![]; - while let Some(v_slice) = it.val().into_diagnostic()? { + while let Some(v_slice) = it.val()? { let meta: ViewRelMetadata = rmp_serde::from_slice(v_slice).into_diagnostic()?; let name = meta.name.0; let arity = meta.arity; diff --git a/src/runtime/transact.rs b/src/runtime/transact.rs index 1d38986f..5dfb899d 100644 --- a/src/runtime/transact.rs +++ b/src/runtime/transact.rs @@ -96,7 +96,7 @@ impl SessionTx { let e_upper = encode_sentinel_entity_attr(EntityId::MAX_PERM, AttrId::MIN_PERM); let it = self.bounded_scan_last(&e_lower, &e_upper); - Ok(match it.key().into_diagnostic()? { + Ok(match it.key()? { None => EntityId::MAX_TEMP, Some(data) => EntityId::from_bytes(data), }) @@ -106,7 +106,7 @@ impl SessionTx { let e_lower = encode_sentinel_attr_by_id(AttrId::MIN_PERM); let e_upper = encode_sentinel_attr_by_id(AttrId::MAX_PERM); let it = self.bounded_scan_last(&e_lower, &e_upper); - Ok(match it.key().into_diagnostic()? { + Ok(match it.key()? { None => AttrId::MAX_TEMP, Some(data) => AttrId::from_bytes(data), }) @@ -116,7 +116,7 @@ impl SessionTx { let tuple = Tuple(vec![DataValue::Null]); let t_encoded = tuple.encode_as_key(ViewRelId::SYSTEM); let vtx = self.view_db.transact().start(); - let found = vtx.get(&t_encoded, false).into_diagnostic()?; + let found = vtx.get(&t_encoded, false)?; match found { None => Ok(ViewRelId::SYSTEM), Some(slice) => ViewRelId::raw_decode(&slice), @@ -127,7 +127,7 @@ impl SessionTx { let e_lower = encode_tx(TxId::MAX_USER); let e_upper = encode_tx(TxId::MAX_SYS); let it = self.bounded_scan_first(&e_lower, &e_upper); - Ok(match it.key().into_diagnostic()? { + Ok(match it.key()? { None => TxId::MAX_SYS, Some(data) => TxId::from_bytes(data), }) @@ -138,8 +138,8 @@ impl SessionTx { let encoded = encode_tx(tx_id); let log = TxLog::new(tx_id, comment); - self.tx.put(&encoded, &log.encode()).into_diagnostic()?; - self.tx.commit().into_diagnostic()?; + self.tx.put(&encoded, &log.encode())?; + self.tx.commit()?; if refresh { let new_tx_id = TxId(self.last_tx_id.fetch_add(1, Ordering::AcqRel) + 1); self.tx.set_snapshot(); diff --git a/src/runtime/view.rs b/src/runtime/view.rs index a855093f..21f82750 100644 --- a/src/runtime/view.rs +++ b/src/runtime/view.rs @@ -1,7 +1,7 @@ use std::fmt::{Debug, Formatter}; use std::sync::atomic::Ordering; -use miette::{miette, bail, Result, IntoDiagnostic}; +use miette::{bail, miette, IntoDiagnostic, Result}; use rmp_serde::Serializer; use serde::Serialize; @@ -130,7 +130,7 @@ impl ViewRelIterator { } else { self.started = true; } - Ok(match self.inner.key().into_diagnostic()? { + Ok(match self.inner.key()? { None => None, Some(k_slice) => Some(EncodedTuple(k_slice).decode()?), }) @@ -149,14 +149,14 @@ impl SessionTx { let key = DataValue::Str(name.0.clone()); let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM); let vtx = self.view_db.transact().start(); - Ok(vtx.exists(&encoded, false).into_diagnostic()?) + Ok(vtx.exists(&encoded, false)?) } pub(crate) fn create_view_rel(&self, mut meta: ViewRelMetadata) -> Result { let key = DataValue::Str(meta.name.0.clone()); let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM); let mut vtx = self.view_db.transact().set_snapshot(true).start(); - if vtx.exists(&encoded, true).into_diagnostic()? { + if vtx.exists(&encoded, true)? { bail!( "cannot create view {}: one with the same name already exists", meta.name @@ -164,18 +164,18 @@ impl SessionTx { }; let last_id = self.view_store_id.fetch_add(1, Ordering::SeqCst); meta.id = ViewRelId::new(last_id + 1)?; - vtx.put(&encoded, &meta.id.raw_encode()).into_diagnostic()?; + vtx.put(&encoded, &meta.id.raw_encode())?; let name_key = Tuple(vec![DataValue::Str(meta.name.0.clone())]).encode_as_key(ViewRelId::SYSTEM); let mut meta_val = vec![]; meta.serialize(&mut Serializer::new(&mut meta_val)).unwrap(); - vtx.put(&name_key, &meta_val).into_diagnostic()?; + vtx.put(&name_key, &meta_val)?; let tuple = Tuple(vec![DataValue::Null]); let t_encoded = tuple.encode_as_key(ViewRelId::SYSTEM); - vtx.put(&t_encoded, &meta.id.raw_encode()).into_diagnostic()?; - vtx.commit().into_diagnostic()?; + vtx.put(&t_encoded, &meta.id.raw_encode())?; + vtx.commit()?; Ok(ViewRelStore { view_db: self.view_db.clone(), metadata: meta, @@ -190,7 +190,7 @@ impl SessionTx { let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM); let found = vtx - .get(&encoded, true).into_diagnostic()? + .get(&encoded, true)? .ok_or_else(|| miette!("cannot find stored view {}", name))?; let metadata: ViewRelMetadata = rmp_serde::from_slice(&found).into_diagnostic()?; Ok(ViewRelStore { @@ -203,11 +203,11 @@ impl SessionTx { let store = self.do_get_view_rel(name, &vtx)?; let key = DataValue::Str(name.0.clone()); let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM); - vtx.del(&encoded).into_diagnostic()?; + vtx.del(&encoded)?; let lower_bound = Tuple::default().encode_as_key(store.metadata.id); let upper_bound = Tuple::default().encode_as_key(store.metadata.id.next()?); - self.view_db.range_del(&lower_bound, &upper_bound).into_diagnostic()?; - vtx.commit().into_diagnostic()?; + self.view_db.range_del(&lower_bound, &upper_bound)?; + vtx.commit()?; Ok(()) } } diff --git a/src/transact/meta.rs b/src/transact/meta.rs index 47651a01..0da42308 100644 --- a/src/transact/meta.rs +++ b/src/transact/meta.rs @@ -1,6 +1,6 @@ use std::sync::atomic::Ordering; -use miette::{bail, ensure, miette, IntoDiagnostic, Result}; +use miette::{bail, ensure, miette, Result}; use cozorocks::{DbIter, IterBuilder}; @@ -44,7 +44,7 @@ impl SessionTx { } let anchor = encode_sentinel_attr_by_id(aid); - Ok(match self.tx.get(&anchor, false).into_diagnostic()? { + Ok(match self.tx.get(&anchor, false)? { None => { self.attr_by_id_cache.borrow_mut().insert(aid, None); None @@ -76,7 +76,7 @@ impl SessionTx { } let anchor = encode_sentinel_attr_by_name(name); - Ok(match self.tx.get(&anchor, false).into_diagnostic()? { + Ok(match self.tx.get(&anchor, false)? { None => { self.attr_by_kw_cache .borrow_mut() @@ -153,7 +153,7 @@ impl SessionTx { ); let kw_sentinel = encode_sentinel_attr_by_name(&existing.name); let attr_data = existing.encode_with_op_and_tx(StoreOp::Retract, tx_id); - self.tx.put(&kw_sentinel, &attr_data).into_diagnostic()?; + self.tx.put(&kw_sentinel, &attr_data)?; } self.put_attr(&attr, StoreOp::Assert) } @@ -162,11 +162,11 @@ impl SessionTx { let tx_id = self.get_write_tx_id()?; let attr_data = attr.encode_with_op_and_tx(op, tx_id); let id_encoded = encode_attr_by_id(attr.id, tx_id); - self.tx.put(&id_encoded, &attr_data).into_diagnostic()?; + self.tx.put(&id_encoded, &attr_data)?; let id_sentinel = encode_sentinel_attr_by_id(attr.id); - self.tx.put(&id_sentinel, &attr_data).into_diagnostic()?; + self.tx.put(&id_sentinel, &attr_data)?; let kw_sentinel = encode_sentinel_attr_by_name(&attr.name); - self.tx.put(&kw_sentinel, &attr_data).into_diagnostic()?; + self.tx.put(&kw_sentinel, &attr_data)?; Ok(attr.id) } @@ -210,7 +210,7 @@ impl AttrIter { self.it.next(); } loop { - match self.it.val().into_diagnostic()? { + match self.it.val()? { None => return Ok(None), Some(v) => { let found_op = StoreOp::try_from(v[0])?; diff --git a/src/transact/triple.rs b/src/transact/triple.rs index d10e25b1..a546a89e 100644 --- a/src/transact/triple.rs +++ b/src/transact/triple.rs @@ -1,7 +1,7 @@ use std::collections::BTreeMap; use std::sync::atomic::Ordering; -use miette::{bail, ensure, miette, IntoDiagnostic, Result}; +use miette::{bail, ensure, miette, Result}; use smartstring::{LazyCompact, SmartString}; use cozorocks::{DbIter, IterBuilder}; @@ -145,23 +145,23 @@ impl SessionTx { let aev_encoded = encode_aev_key(attr.id, eid, v_in_key, vld_in_key); if real_delete { - self.tx.del(&aev_encoded).into_diagnostic()?; + self.tx.del(&aev_encoded)?; } else { - self.tx.put(&aev_encoded, &val_encoded).into_diagnostic()?; + self.tx.put(&aev_encoded, &val_encoded)?; } // vae for ref types if attr.val_type.is_ref_type() { let vae_encoded = encode_vae_key(v.get_entity_id()?, attr.id, eid, vld_in_key); if real_delete { - self.tx.del(&vae_encoded).into_diagnostic()?; + self.tx.del(&vae_encoded)?; } else { self.tx .put( &vae_encoded, &DataValue::Guard.encode_with_op_and_tx(op, tx_id), ) - .into_diagnostic()?; + ?; } } @@ -202,7 +202,7 @@ impl SessionTx { v ); } - } else if let Some(v_slice) = self.tx.get(&ave_encoded, false).into_diagnostic()? { + } else if let Some(v_slice) = self.tx.get(&ave_encoded, false)? { let found_eid = decode_value_from_val(&v_slice)?.get_entity_id()?; ensure!( found_eid == eid, @@ -214,11 +214,11 @@ impl SessionTx { } let e_in_val_encoded = eid.as_datavalue().encode_with_op_and_tx(op, tx_id); if real_delete { - self.tx.del(&ave_encoded).into_diagnostic()?; + self.tx.del(&ave_encoded)?; } else { self.tx .put(&ave_encoded, &e_in_val_encoded) - .into_diagnostic()?; + ?; } self.tx @@ -226,7 +226,7 @@ impl SessionTx { &encode_sentinel_attr_val(attr.id, v), &tx_id.bytes_with_op(op), ) - .into_diagnostic()?; + ?; } self.tx @@ -234,7 +234,7 @@ impl SessionTx { &encode_sentinel_entity_attr(eid, attr.id), &tx_id.bytes_with_op(op), ) - .into_diagnostic()?; + ?; Ok(eid) } @@ -289,7 +289,7 @@ impl SessionTx { if let Some(v_slice) = self .bounded_scan_first(&lower, &upper) .val() - .into_diagnostic()? + ? { if StoreOp::try_from(v_slice[0])?.is_assert() { let eid = decode_value(&v_slice[8..])?.get_entity_id()?; @@ -487,7 +487,7 @@ impl TripleAttrEntityIter { } else { self.started = true; } - match self.it.pair().into_diagnostic()? { + match self.it.pair()? { None => Ok(None), Some((k_slice, v_slice)) => { let (aid, eid, _tid) = decode_ae_key(k_slice)?; @@ -534,7 +534,7 @@ impl TripleAttrEntityBeforeIter { fn next_inner(&mut self) -> Result> { loop { self.it.seek(&self.current); - match self.it.pair().into_diagnostic()? { + match self.it.pair()? { None => return Ok(None), Some((k_slice, v_slice)) => { let (aid, eid, tid) = decode_ae_key(k_slice)?; @@ -595,7 +595,7 @@ impl TripleAttrEntityRangeIter { } else { self.started = true; } - match self.it.pair().into_diagnostic()? { + match self.it.pair()? { None => Ok(None), Some((k_slice, v_slice)) => { let (aid, eid, _tid) = decode_ae_key(k_slice)?; @@ -648,7 +648,7 @@ impl TripleAttrEntityRangeBeforeIter { fn next_inner(&mut self) -> Result> { loop { self.it.seek(&self.current); - match self.it.pair().into_diagnostic()? { + match self.it.pair()? { None => return Ok(None), Some((k_slice, v_slice)) => { let (aid, eid, tid) = decode_ae_key(k_slice)?; @@ -712,7 +712,7 @@ impl TripleAttrValueRangeIter { } else { self.started = true; } - match self.it.pair().into_diagnostic()? { + match self.it.pair()? { None => Ok(None), Some((k_slice, v_slice)) => { let (aid, mut eid, _tid) = decode_ae_key(k_slice)?; @@ -766,7 +766,7 @@ impl TripleAttrValueRangeBeforeIter { fn next_inner(&mut self) -> Result> { loop { self.it.seek(&self.current); - match self.it.pair().into_diagnostic()? { + match self.it.pair()? { None => return Ok(None), Some((k_slice, v_slice)) => { let (aid, mut eid, tid) = decode_ae_key(k_slice)?; @@ -824,7 +824,7 @@ impl TripleAttrValueIter { } else { self.started = true; } - match self.it.pair().into_diagnostic()? { + match self.it.pair()? { None => Ok(None), Some((k_slice, v_slice)) => { let (aid, mut eid, _tid) = decode_ae_key(k_slice)?; @@ -871,7 +871,7 @@ impl TripleAttrValueBeforeIter { fn next_inner(&mut self) -> Result> { loop { self.it.seek(&self.current); - match self.it.pair().into_diagnostic()? { + match self.it.pair()? { None => return Ok(None), Some((k_slice, v_slice)) => { let (aid, mut eid, tid) = decode_ae_key(k_slice)?; @@ -930,7 +930,7 @@ impl TripleAttrValueAfterIter { fn next_inner(&mut self) -> Result> { loop { self.it.seek_back(&self.current); - match self.it.pair().into_diagnostic()? { + match self.it.pair()? { None => return Ok(None), Some((k_slice, v_slice)) => { if compare_key(k_slice, &self.lower_bound) == std::cmp::Ordering::Less { @@ -985,7 +985,7 @@ impl TripleValueRefAttrIter { } else { self.started = true; } - match self.it.key().into_diagnostic()? { + match self.it.key()? { None => Ok(None), Some(k_slice) => { let (v_eid, aid, eid, _) = decode_vae_key(k_slice)?; @@ -1028,7 +1028,7 @@ impl TripleValueRefAttrBeforeIter { fn next_inner(&mut self) -> Result> { loop { self.it.seek(&self.current); - match self.it.pair().into_diagnostic()? { + match self.it.pair()? { None => return Ok(None), Some((k_slice, v_slice)) => { let (v_eid, aid, eid, tid) = decode_vae_key(k_slice)?;