From 7dff706115f33874d4710abad4452a00e6f18cfa Mon Sep 17 00:00:00 2001
From: Sayan Nandan
Date: Thu, 7 Sep 2023 04:21:13 +0000
Subject: [PATCH] Reduce batch metadata size

---
 server/src/engine/core/model/delta.rs         |  10 +-
 .../engine/storage/v1/batch_jrnl/persist.rs   |  26 ++--
 .../engine/storage/v1/batch_jrnl/restore.rs   | 147 ++++++++++--------
 server/src/engine/storage/v1/rw.rs            |  13 ++
 4 files changed, 121 insertions(+), 75 deletions(-)

diff --git a/server/src/engine/core/model/delta.rs b/server/src/engine/core/model/delta.rs
index d3c6720b..245d9542 100644
--- a/server/src/engine/core/model/delta.rs
+++ b/server/src/engine/core/model/delta.rs
@@ -197,8 +197,14 @@ impl DeltaState {
     fn __data_delta_step(&self) -> u64 {
         self.data_current_version.fetch_add(1, Ordering::AcqRel)
     }
-    pub fn __data_delta_queue(&self) -> &Queue<DataDelta> {
-        &self.data_deltas
+    pub fn __data_delta_dequeue(&self, g: &Guard) -> Option<DataDelta> {
+        match self.data_deltas.blocking_try_dequeue(g) {
+            Some(d) => {
+                self.data_deltas_size.fetch_sub(1, Ordering::Release);
+                Some(d)
+            }
+            None => None,
+        }
     }
 }
 
diff --git a/server/src/engine/storage/v1/batch_jrnl/persist.rs b/server/src/engine/storage/v1/batch_jrnl/persist.rs
index 1dcf824e..b9e99ec0 100644
--- a/server/src/engine/storage/v1/batch_jrnl/persist.rs
+++ b/server/src/engine/storage/v1/batch_jrnl/persist.rs
@@ -83,7 +83,6 @@ impl<F: RawFileIOInterface> DataBatchPersistDriver<F> {
         // pin model
         let irm = model.intent_read_model();
         let schema_version = model.delta_state().schema_current_version();
-        let data_q = model.delta_state().__data_delta_queue();
         let g = pin();
         // init restore list
         let mut restore_list = Vec::new();
@@ -92,9 +91,14 @@ impl<F: RawFileIOInterface> DataBatchPersistDriver<F> {
         let mut inconsistent_reads = 0;
         let mut exec = || -> SDSSResult<()> {
             // write batch start
-            self.write_batch_start(observed_len, schema_version)?;
+            self.write_batch_start(
+                observed_len,
+                schema_version,
+                model.p_tag().tag_unique(),
+                irm.fields().len() - 1,
+            )?;
             while i < observed_len {
-                let delta = data_q.blocking_try_dequeue(&g).unwrap();
+                let delta = model.delta_state().__data_delta_dequeue(&g).unwrap();
                 restore_list.push(delta.clone()); // TODO(@ohsayan): avoid this
                 match delta.change() {
                     DataDeltaKind::Delete => {
@@ -140,18 +144,24 @@ impl<F: RawFileIOInterface> DataBatchPersistDriver<F> {
     }
     /// Write the batch start block:
     /// - Batch start magic
+    /// - Primary key type
     /// - Expected commit
     /// - Schema version
+    /// - Column count
     fn write_batch_start(
         &mut self,
         observed_len: usize,
         schema_version: DeltaVersion,
+        pk_tag: TagUnique,
+        col_cnt: usize,
     ) -> Result<(), SDSSError> {
-        self.f.unfsynced_write(&[MARKER_ACTUAL_BATCH_EVENT])?;
+        self.f
+            .unfsynced_write(&[MARKER_ACTUAL_BATCH_EVENT, pk_tag.d()])?;
         let observed_len_bytes = observed_len.u64_bytes_le();
         self.f.unfsynced_write(&observed_len_bytes)?;
         self.f
             .unfsynced_write(&schema_version.value_u64().to_le_bytes())?;
+        self.f.unfsynced_write(&col_cnt.u64_bytes_le())?;
         Ok(())
     }
     /// Append a summary of this batch
@@ -266,9 +276,6 @@ impl<F: RawFileIOInterface> DataBatchPersistDriver<F> {
         irm: &IRModel,
         row_data: &RowData,
     ) -> SDSSResult<()> {
-        // nasty hack; we need to avoid the pk
-        self.f
-            .unfsynced_write(&(row_data.fields().len()).to_le_bytes())?;
         for field_name in irm.fields().stseq_ord_key() {
             match row_data.fields().get(field_name) {
                 Some(cell) => {
@@ -280,9 +287,10 @@ impl<F: RawFileIOInterface> DataBatchPersistDriver<F> {
         }
         Ok(())
     }
+    /// Write the change type and txnid
     fn write_batch_item_common_row_data(&mut self, delta: &DataDelta) -> Result<(), SDSSError> {
-        let p1_dc_pk_ty = [delta.change().value_u8(), delta.row().d_key().tag().d()];
-        self.f.unfsynced_write(&p1_dc_pk_ty)?;
+        let change_type = [delta.change().value_u8()];
+        self.f.unfsynced_write(&change_type)?;
         let txn_id = delta.data_version().value_u64().to_le_bytes();
         self.f.unfsynced_write(&txn_id)?;
         Ok(())
diff --git a/server/src/engine/storage/v1/batch_jrnl/restore.rs b/server/src/engine/storage/v1/batch_jrnl/restore.rs
index 7e54fe6d..213c7a1c 100644
--- a/server/src/engine/storage/v1/batch_jrnl/restore.rs
+++ b/server/src/engine/storage/v1/batch_jrnl/restore.rs
@@ -30,20 +30,17 @@ use {
     super::{
         MARKER_ACTUAL_BATCH_EVENT, MARKER_END_OF_BATCH, MARKER_RECOVERY_EVENT, RECOVERY_THRESHOLD,
     },
-    crate::{
-        engine::{
-            core::{index::PrimaryIndexKey, model::Model},
-            data::{
-                cell::Datacell,
-                tag::{CUTag, TagClass, TagUnique},
-            },
-            storage::v1::{
-                inf::PersistTypeDscr,
-                rw::{RawFileIOInterface, SDSSFileIO, SDSSFileTrackedReader},
-                SDSSError, SDSSResult,
-            },
+    crate::engine::{
+        core::{index::PrimaryIndexKey, model::Model},
+        data::{
+            cell::Datacell,
+            tag::{CUTag, TagClass, TagUnique},
+        },
+        storage::v1::{
+            inf::PersistTypeDscr,
+            rw::{RawFileIOInterface, SDSSFileIO, SDSSFileTrackedReader},
+            SDSSError, SDSSResult,
         },
-        util::copy_slice_to_array,
     },
     std::mem::ManuallyDrop,
 };
@@ -119,7 +116,9 @@ impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
             Self::apply_batch(model, batch)
         })
     }
-    pub fn read_all_batches(&mut self) -> SDSSResult<Vec<NormalBatch>> {
+    pub(in crate::engine::storage::v1) fn read_all_batches(
+        &mut self,
+    ) -> SDSSResult<Vec<NormalBatch>> {
         let mut all_batches = vec![];
         self.read_all_batches_and_for_each(|batch| {
             all_batches.push(batch);
@@ -212,8 +211,7 @@ impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
             }
         }
         // read actual commit
-        let mut actual_commit = [0; sizeof!(u64)];
-        self.f.read_into_buffer(&mut actual_commit)?;
+        let actual_commit = self.f.read_u64_le()?;
         // find actual checksum
         let actual_checksum = self.f.__reset_checksum();
         // find hardcoded checksum
@@ -224,7 +222,7 @@ impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
         // move file cursor ahead
         self.f.__cursor_ahead_by(sizeof!(u64));
         if actual_checksum == u64::from_le_bytes(hardcoded_checksum) {
-            Ok(u64::from_le_bytes(actual_commit))
+            Ok(actual_commit)
         } else {
             Err(SDSSError::DataBatchRestoreCorruptedBatch)
         }
@@ -249,15 +247,11 @@ impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
                 return Err(SDSSError::DataBatchRestoreCorruptedBatch);
            }
         }
-        // we're expecting a "good batch"
-        let mut batch_size_schema_version = [0; sizeof!(u64, 2)];
-        self.f.read_into_buffer(&mut batch_size_schema_version)?;
-        // we have the batch length
-        let batch_size = u64::from_le_bytes(copy_slice_to_array(&batch_size_schema_version[..8]));
-        let schema_version =
-            u64::from_le_bytes(copy_slice_to_array(&batch_size_schema_version[8..]));
+        // decode batch start block
+        let batch_start_block = self.read_start_batch_block()?;
+
         let mut processed_in_this_batch = 0;
-        while (processed_in_this_batch != batch_size) & !self.f.is_eof() {
+        while (processed_in_this_batch != batch_start_block.expected_commit()) & !self.f.is_eof() {
             // decode common row data
             let change_type = self.f.read_byte()?;
             // now decode event
@@ -264,39 +258,36 @@ impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
             match change_type {
                 MARKER_END_OF_BATCH => {
                     // the file tells us that we've reached the end of this batch; hmmm
                     return Ok(Batch::FinishedEarly(NormalBatch::new(
                         this_batch,
-                        schema_version,
+                        batch_start_block.schema_version(),
                     )));
                 }
                 normal_event => {
-                    let (pk_type, txnid) = self.read_normal_event_metadata()?;
+                    let txnid = self.f.read_u64_le()?;
                     match normal_event {
                         0 => {
                             // delete
-                            let pk = self.decode_primary_key(pk_type)?;
+                            let pk = self.decode_primary_key(batch_start_block.pk_tag())?;
                             this_batch.push(DecodedBatchEvent::new(
                                 txnid,
                                 pk,
                                 DecodedBatchEventKind::Delete,
                             ));
                             processed_in_this_batch += 1;
                         }
                         1 | 2 => {
                             // insert or update
                             // get pk
-                            let pk = self.decode_primary_key(pk_type)?;
-                            // get column count
-                            let mut column_count = [0; sizeof!(u64)];
-                            self.f.read_into_buffer(&mut column_count)?;
-                            let mut column_count = u64::from_le_bytes(column_count);
+                            let pk = self.decode_primary_key(batch_start_block.pk_tag())?;
                             // prepare row
                             let mut row = vec![];
-                            while column_count != 0 && !self.f.is_eof() {
+                            let mut this_col_cnt = batch_start_block.column_cnt();
+                            while this_col_cnt != 0 && !self.f.is_eof() {
                                 row.push(self.decode_cell()?);
-                                column_count -= 1;
+                                this_col_cnt -= 1;
                             }
-                            if column_count != 0 {
+                            if this_col_cnt != 0 {
                                 return Err(SDSSError::DataBatchRestoreCorruptedEntry);
                             }
                             if change_type == 1 {
@@ -321,14 +312,10 @@ impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
                 }
             }
         }
-        Ok(Batch::Normal(NormalBatch::new(this_batch, schema_version)))
-    }
-    fn read_normal_event_metadata(&mut self) -> Result<(u8, u64), SDSSError> {
-        let pk_type = self.f.read_byte()?;
-        let mut txnid = [0; sizeof!(u64)];
-        self.f.read_into_buffer(&mut txnid)?;
-        let txnid = u64::from_le_bytes(txnid);
-        Ok((pk_type, txnid))
+        Ok(Batch::Normal(NormalBatch::new(
+            this_batch,
+            batch_start_block.schema_version(),
+        )))
     }
     fn attempt_recover_data_batch(&mut self) -> SDSSResult<()> {
         let mut max_threshold = RECOVERY_THRESHOLD;
@@ -340,6 +327,49 @@ impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
         }
         Err(SDSSError::DataBatchRestoreCorruptedBatch)
     }
+    fn read_start_batch_block(&mut self) -> SDSSResult<BatchStartBlock> {
+        let pk_tag = self.f.read_byte()?;
+        let expected_commit = self.f.read_u64_le()?;
+        let schema_version = self.f.read_u64_le()?;
+        let column_cnt = self.f.read_u64_le()?;
+        Ok(BatchStartBlock::new(
+            pk_tag,
+            expected_commit,
+            schema_version,
+            column_cnt,
+        ))
+    }
+}
+
+#[derive(Debug, PartialEq)]
+struct BatchStartBlock {
+    pk_tag: u8,
+    expected_commit: u64,
+    schema_version: u64,
+    column_cnt: u64,
+}
+
+impl BatchStartBlock {
+    const fn new(pk_tag: u8, expected_commit: u64, schema_version: u64, column_cnt: u64) -> Self {
+        Self {
+            pk_tag,
+            expected_commit,
+            schema_version,
+            column_cnt,
+        }
+    }
+    fn pk_tag(&self) -> u8 {
+        self.pk_tag
+    }
+    fn expected_commit(&self) -> u64 {
+        self.expected_commit
+    }
+    fn schema_version(&self) -> u64 {
+        self.schema_version
+    }
+    fn column_cnt(&self) -> u64 {
+        self.column_cnt
+    }
 }
 
 impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
@@ -349,17 +379,15 @@ impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
         };
         Ok(match pk_type {
             TagUnique::SignedInt | TagUnique::UnsignedInt => {
-                let mut chunk = [0; sizeof!(u64)];
-                self.f.read_into_buffer(&mut chunk)?;
+                let qw = self.f.read_u64_le()?;
                 unsafe {
                     // UNSAFE(@ohsayan): +tagck
-                    PrimaryIndexKey::new_from_qw(pk_type, u64::from_le_bytes(chunk))
+                    PrimaryIndexKey::new_from_qw(pk_type, qw)
                 }
             }
             TagUnique::Str | TagUnique::Bin => {
-                let mut len = [0; sizeof!(u64)];
-                self.f.read_into_buffer(&mut len)?;
-                let mut data = vec![0; u64::from_le_bytes(len) as usize];
+                let len = self.f.read_u64_le()?;
+                let mut data = vec![0; len as usize];
                 self.f.read_into_buffer(&mut data)?;
                 if pk_type == TagUnique::Str {
                     if core::str::from_utf8(&data).is_err() {
@@ -369,11 +397,7 @@ impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
                 unsafe {
                     // UNSAFE(@ohsayan): +tagck +verityck
                     let mut md = ManuallyDrop::new(data);
-                    PrimaryIndexKey::new_from_dual(
-                        pk_type,
-                        u64::from_le_bytes(len),
-                        md.as_mut_ptr() as usize,
-                    )
+                    PrimaryIndexKey::new_from_dual(pk_type, len, md.as_mut_ptr() as usize)
                 }
             }
             _ => unsafe {
@@ -397,18 +421,15 @@ impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
                 Datacell::new_bool(bool == 1)
             }
             PersistTypeDscr::UnsignedInt | PersistTypeDscr::SignedInt | PersistTypeDscr::Float => {
-                let mut block = [0; sizeof!(u64)];
-                self.f.read_into_buffer(&mut block)?;
+                let qw = self.f.read_u64_le()?;
                 unsafe {
                     // UNSAFE(@ohsayan): choosing the correct type and tag
                     let tc = TagClass::from_raw(cell_type.value_u8() - 1);
-                    Datacell::new_qw(u64::from_le_bytes(block), CUTag::new(tc, tc.tag_unique()))
+                    Datacell::new_qw(qw, CUTag::new(tc, tc.tag_unique()))
                 }
             }
             PersistTypeDscr::Str | PersistTypeDscr::Bin => {
-                let mut len_block = [0; sizeof!(u64)];
-                self.f.read_into_buffer(&mut len_block)?;
-                let len = u64::from_le_bytes(len_block) as usize;
+                let len = self.f.read_u64_le()? as usize;
                 let mut data = vec![0; len];
                 self.f.read_into_buffer(&mut data)?;
                 unsafe {
@@ -424,9 +445,7 @@ impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
                 }
             }
             PersistTypeDscr::List => {
-                let mut len_block = [0; sizeof!(u64)];
-                self.f.read_into_buffer(&mut len_block)?;
-                let len = u64::from_le_bytes(len_block);
+                let len = self.f.read_u64_le()?;
                 let mut list = Vec::new();
                 while !self.f.is_eof() && list.len() as u64 != len {
                     list.push(self.decode_cell()?);
diff --git a/server/src/engine/storage/v1/rw.rs b/server/src/engine/storage/v1/rw.rs
index a21b2b5e..8c6f87dd 100644
--- a/server/src/engine/storage/v1/rw.rs
+++ b/server/src/engine/storage/v1/rw.rs
@@ -251,6 +251,19 @@ impl<F: RawFileIOInterface> SDSSFileTrackedReader<F> {
     pub fn __cursor_ahead_by(&mut self, sizeof: usize) {
         self.pos += sizeof as u64;
     }
+    pub fn read_block<const N: usize>(&mut self) -> SDSSResult<[u8; N]> {
+        if !self.has_left(N as _) {
+            return Err(SDSSError::IoError(SysIOError::from(
+                std::io::ErrorKind::InvalidInput,
+            )));
+        }
+        let mut buf = [0; N];
+        self.read_into_buffer(&mut buf)?;
+        Ok(buf)
+    }
+    pub fn read_u64_le(&mut self) -> SDSSResult<u64> {
+        Ok(u64::from_le_bytes(self.read_block()?))
+    }
 }
 
 #[derive(Debug)]
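
Note (editor's sketch, not part of the patch): the change hoists per-event metadata
into a single per-batch start block. Previously every event repeated the primary
key tag, and every insert/update repeated the column count; after this patch the
start block records the pk tag, expected commit, schema version, and column count
once per batch, and each event is framed as just [change_type: u8][txn_id: u64 LE]
followed by the pk and the cells. A minimal standalone illustration of the
resulting framing, using std::io::Read in place of SDSSFileTrackedReader (the
names below are hypothetical, not Skytable APIs):

    use std::io::{self, Read};

    /// Mirrors the block emitted by write_batch_start() above:
    /// [marker: u8][pk_tag: u8][expected_commit: u64 LE]
    /// [schema_version: u64 LE][column_cnt: u64 LE]
    #[derive(Debug)]
    struct BatchStart {
        pk_tag: u8,
        expected_commit: u64,
        schema_version: u64,
        column_cnt: u64,
    }

    fn read_u64_le<R: Read>(r: &mut R) -> io::Result<u64> {
        // little-endian u64, same framing as SDSSFileTrackedReader::read_u64_le()
        let mut buf = [0u8; 8];
        r.read_exact(&mut buf)?;
        Ok(u64::from_le_bytes(buf))
    }

    fn read_batch_start<R: Read>(r: &mut R) -> io::Result<BatchStart> {
        // head[0] is the batch marker (MARKER_ACTUAL_BATCH_EVENT), head[1] the pk tag
        let mut head = [0u8; 2];
        r.read_exact(&mut head)?;
        let expected_commit = read_u64_le(r)?;
        let schema_version = read_u64_le(r)?;
        let column_cnt = read_u64_le(r)?;
        Ok(BatchStart { pk_tag: head[1], expected_commit, schema_version, column_cnt })
    }

A decoder loop can then reuse the pk tag and column count for every event in the
batch, which is what the restore driver does with BatchStartBlock after this
patch; that one-time header is where the batch metadata savings come from.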