From 85fd646d44dd37eb0af4c8e557582096627748bf Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Fri, 23 Feb 2024 09:42:33 +0530 Subject: [PATCH] Fix bufwriter impl --- .../storage/common/sdss/impls/sdss_r1/rw.rs | 40 +++++++++++++------ .../engine/storage/v2/impls/mdl_journal.rs | 2 - .../storage/v2/impls/tests/model_driver.rs | 2 +- 3 files changed, 29 insertions(+), 15 deletions(-) diff --git a/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs b/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs index 8fd5b6a2..348bb384 100644 --- a/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs +++ b/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs @@ -359,6 +359,19 @@ pub struct TrackedWriter< buf: FixedVec, } +impl< + F, + S: FileSpecV1, + const SIZE: usize, + const PANIC_IF_UNFLUSHED: bool, + const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool, + > TrackedWriter +{ + fn available_capacity(&self) -> usize { + self.buf.remaining_capacity() + } +} + impl< F, S: FileSpecV1, @@ -492,23 +505,26 @@ impl< } /// Do an untracked write pub fn untracked_write(&mut self, buf: &[u8]) -> RuntimeResult<()> { - if self.buf.at_capacity() { - self.flush_buf()?; + if self.available_capacity() >= buf.len() { + unsafe { + // UNSAFE(@ohsayan): above branch guarantees that we have sufficient space + self.buf.extend_from_slice(buf) + } + return Ok(()); } - let write_size = buf.len().saturating_sub(SIZE); - match self.f_d.fwrite_all_count(&buf[..write_size]) { - (written, r) => { - self.t_cursor += written; - // the buffer was flushed, but we errored here. the caller should be able to track - // the number of bytes that we wrote using the cursor and utilize it for any - // recovery attempts + self.flush_buf()?; + // write whatever capacity exceeds the buffer size + let to_write_cnt = buf.len().saturating_sub(SIZE); + match self.f_d.fwrite_all_count(&buf[..to_write_cnt]) { + (cnt, r) => { + self.t_cursor += cnt; r?; } } + // store remainder in buffer unsafe { - // UNSAFE(@ohsayan): the slice is at most SIZE bytes in length - debug_assert!(buf[write_size..].len() <= SIZE); - self.buf.extend_from_slice(&buf[write_size..]); + // UNSAFE(@ohsayan): above branch guarantees that we have sufficient space + self.buf.extend_from_slice(&buf[to_write_cnt..]) } Ok(()) } diff --git a/server/src/engine/storage/v2/impls/mdl_journal.rs b/server/src/engine/storage/v2/impls/mdl_journal.rs index 2aae8aa9..a4f92542 100644 --- a/server/src/engine/storage/v2/impls/mdl_journal.rs +++ b/server/src/engine/storage/v2/impls/mdl_journal.rs @@ -233,8 +233,6 @@ impl<'a, 'b, Fs: FSInterface> BatchWriter<'a, 'b, Fs> { let delta = me.model.delta_state().__data_delta_dequeue(me.g).unwrap(); match me.step(&delta) { Ok(()) => { - // flush buffer after every delta write - me.row_writer.f.flush_buf()?; applied_deltas.push(delta); i += 1; } diff --git a/server/src/engine/storage/v2/impls/tests/model_driver.rs b/server/src/engine/storage/v2/impls/tests/model_driver.rs index 2188244a..78c7ceeb 100644 --- a/server/src/engine/storage/v2/impls/tests/model_driver.rs +++ b/server/src/engine/storage/v2/impls/tests/model_driver.rs @@ -151,7 +151,7 @@ fn model_data_inserts() { #[test] fn model_data_updates() { - test_utils::with_variable(("model_data_updates", 50), |(log_name, n)| { + test_utils::with_variable(("model_data_updates", 8200), |(log_name, n)| { let key_values = create_test_kv(n); /* - we first open the log and then insert n values