Fix bufwriter impl

next
Sayan Nandan 7 months ago
parent 092d4c8eaa
commit 85fd646d44
No known key found for this signature in database
GPG Key ID: 0EBD769024B24F0A

@ -359,6 +359,19 @@ pub struct TrackedWriter<
buf: FixedVec<u8, SIZE>, buf: FixedVec<u8, SIZE>,
} }
impl<
F,
S: FileSpecV1,
const SIZE: usize,
const PANIC_IF_UNFLUSHED: bool,
const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool,
> TrackedWriter<F, S, SIZE, PANIC_IF_UNFLUSHED, CHECKSUM_WRITTEN_IF_BLOCK_ERROR>
{
fn available_capacity(&self) -> usize {
self.buf.remaining_capacity()
}
}
impl< impl<
F, F,
S: FileSpecV1, S: FileSpecV1,
@ -492,23 +505,26 @@ impl<
} }
/// Do an untracked write /// Do an untracked write
pub fn untracked_write(&mut self, buf: &[u8]) -> RuntimeResult<()> { pub fn untracked_write(&mut self, buf: &[u8]) -> RuntimeResult<()> {
if self.buf.at_capacity() { if self.available_capacity() >= buf.len() {
self.flush_buf()?; unsafe {
// UNSAFE(@ohsayan): above branch guarantees that we have sufficient space
self.buf.extend_from_slice(buf)
}
return Ok(());
} }
let write_size = buf.len().saturating_sub(SIZE); self.flush_buf()?;
match self.f_d.fwrite_all_count(&buf[..write_size]) { // write whatever capacity exceeds the buffer size
(written, r) => { let to_write_cnt = buf.len().saturating_sub(SIZE);
self.t_cursor += written; match self.f_d.fwrite_all_count(&buf[..to_write_cnt]) {
// the buffer was flushed, but we errored here. the caller should be able to track (cnt, r) => {
// the number of bytes that we wrote using the cursor and utilize it for any self.t_cursor += cnt;
// recovery attempts
r?; r?;
} }
} }
// store remainder in buffer
unsafe { unsafe {
// UNSAFE(@ohsayan): the slice is at most SIZE bytes in length // UNSAFE(@ohsayan): above branch guarantees that we have sufficient space
debug_assert!(buf[write_size..].len() <= SIZE); self.buf.extend_from_slice(&buf[to_write_cnt..])
self.buf.extend_from_slice(&buf[write_size..]);
} }
Ok(()) Ok(())
} }

@ -233,8 +233,6 @@ impl<'a, 'b, Fs: FSInterface> BatchWriter<'a, 'b, Fs> {
let delta = me.model.delta_state().__data_delta_dequeue(me.g).unwrap(); let delta = me.model.delta_state().__data_delta_dequeue(me.g).unwrap();
match me.step(&delta) { match me.step(&delta) {
Ok(()) => { Ok(()) => {
// flush buffer after every delta write
me.row_writer.f.flush_buf()?;
applied_deltas.push(delta); applied_deltas.push(delta);
i += 1; i += 1;
} }

@ -151,7 +151,7 @@ fn model_data_inserts() {
#[test] #[test]
fn model_data_updates() { fn model_data_updates() {
test_utils::with_variable(("model_data_updates", 50), |(log_name, n)| { test_utils::with_variable(("model_data_updates", 8200), |(log_name, n)| {
let key_values = create_test_kv(n); let key_values = create_test_kv(n);
/* /*
- we first open the log and then insert n values - we first open the log and then insert n values

Loading…
Cancel
Save