diff --git a/Cargo.toml b/Cargo.toml index 10887bda..c8cd1997 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ authors = ["Ziyang Hu"] [dependencies] casey = "0.3.3" +tempfile = "3.3.0" either = "1.7.0" rand = "0.8.5" anyhow = "1.0" diff --git a/cozorocks/src/bridge/db.rs b/cozorocks/src/bridge/db.rs index 4ef9f8ac..8652f444 100644 --- a/cozorocks/src/bridge/db.rs +++ b/cozorocks/src/bridge/db.rs @@ -311,6 +311,15 @@ impl SstWriter { Err(status) } } + pub fn finish(&mut self) -> Result<(), RocksDbStatus> { + let mut status = RocksDbStatus::default(); + self.inner.pin_mut().finish(&mut status); + if status.is_ok() { + Ok(()) + } else { + Err(status) + } + } } unsafe impl Send for RocksDb {} diff --git a/cozorocks/src/bridge/mod.rs b/cozorocks/src/bridge/mod.rs index 71231789..74331b45 100644 --- a/cozorocks/src/bridge/mod.rs +++ b/cozorocks/src/bridge/mod.rs @@ -162,6 +162,7 @@ pub(crate) mod ffi { type SstFileWriterBridge; fn put(self: Pin<&mut SstFileWriterBridge>, key: &[u8], val: &[u8], status: &mut RocksDbStatus); + fn finish(self: Pin<&mut SstFileWriterBridge>, status: &mut RocksDbStatus); type TxBridge; // fn get_r_opts(self: Pin<&mut TxBridge>) -> Pin<&mut ReadOptions>; diff --git a/src/query/pull.rs b/src/query/pull.rs index 0fa103c0..7ff86866 100644 --- a/src/query/pull.rs +++ b/src/query/pull.rs @@ -6,6 +6,8 @@ use itertools::Itertools; use serde_json::{json, Map}; use smallvec::{smallvec, SmallVec, ToSmallVec}; +use tempfile::NamedTempFile; + use crate::data::attr::{Attribute, AttributeCardinality, AttributeTyping}; use crate::data::encode::{ decode_ea_key, decode_value_from_key, decode_value_from_val, encode_eav_key, StorageTag, @@ -110,21 +112,29 @@ impl SessionTx { ); found }; - let mut vtx = self.view_db.transact().start(); - - let is_delete = op == ViewOp::Retract; + if op == ViewOp::Retract { + let mut vtx = self.view_db.transact().start(); - for data in res_iter { - let data = data?; - let encoded = data.encode_as_key(view_store.metadata.id); - if is_delete { + for data in res_iter { + let data = data?; + let encoded = data.encode_as_key(view_store.metadata.id); vtx.del(&encoded)?; - } else { - vtx.put(&encoded, &[])?; } - } - vtx.commit()?; + vtx.commit()?; + } else { + let file = NamedTempFile::new()?; + let path = file.into_temp_path(); + let path = path.to_string_lossy(); + let mut writer = self.view_db.get_sst_writer(&path)?; + for data in res_iter { + let data = data?; + let encoded = data.encode_as_key(view_store.metadata.id); + writer.put(&encoded, &[])?; + } + writer.finish()?; + self.view_db.ingest_sst_file(&path)?; + } Ok(()) } pub(crate) fn run_pull_on_query_results<'a>(