use file ingestion for relation storage

main
Ziyang Hu 2 years ago
parent 94a5972647
commit 9de46e4f20

@ -9,6 +9,7 @@ authors = ["Ziyang Hu"]
[dependencies]
casey = "0.3.3"
tempfile = "3.3.0"
either = "1.7.0"
rand = "0.8.5"
anyhow = "1.0"

@ -311,6 +311,15 @@ impl SstWriter {
Err(status)
}
}
pub fn finish(&mut self) -> Result<(), RocksDbStatus> {
let mut status = RocksDbStatus::default();
self.inner.pin_mut().finish(&mut status);
if status.is_ok() {
Ok(())
} else {
Err(status)
}
}
}
unsafe impl Send for RocksDb {}

@ -162,6 +162,7 @@ pub(crate) mod ffi {
type SstFileWriterBridge;
fn put(self: Pin<&mut SstFileWriterBridge>, key: &[u8], val: &[u8], status: &mut RocksDbStatus);
fn finish(self: Pin<&mut SstFileWriterBridge>, status: &mut RocksDbStatus);
type TxBridge;
// fn get_r_opts(self: Pin<&mut TxBridge>) -> Pin<&mut ReadOptions>;

@ -6,6 +6,8 @@ use itertools::Itertools;
use serde_json::{json, Map};
use smallvec::{smallvec, SmallVec, ToSmallVec};
use tempfile::NamedTempFile;
use crate::data::attr::{Attribute, AttributeCardinality, AttributeTyping};
use crate::data::encode::{
decode_ea_key, decode_value_from_key, decode_value_from_val, encode_eav_key, StorageTag,
@ -110,21 +112,29 @@ impl SessionTx {
);
found
};
let mut vtx = self.view_db.transact().start();
let is_delete = op == ViewOp::Retract;
if op == ViewOp::Retract {
let mut vtx = self.view_db.transact().start();
for data in res_iter {
let data = data?;
let encoded = data.encode_as_key(view_store.metadata.id);
if is_delete {
for data in res_iter {
let data = data?;
let encoded = data.encode_as_key(view_store.metadata.id);
vtx.del(&encoded)?;
} else {
vtx.put(&encoded, &[])?;
}
}
vtx.commit()?;
vtx.commit()?;
} else {
let file = NamedTempFile::new()?;
let path = file.into_temp_path();
let path = path.to_string_lossy();
let mut writer = self.view_db.get_sst_writer(&path)?;
for data in res_iter {
let data = data?;
let encoded = data.encode_as_key(view_store.metadata.id);
writer.put(&encoded, &[])?;
}
writer.finish()?;
self.view_db.ingest_sst_file(&path)?;
}
Ok(())
}
pub(crate) fn run_pull_on_query_results<'a>(

Loading…
Cancel
Save