Reduce batch metadata size

next
Sayan Nandan 1 year ago
parent 20c937451f
commit 7dff706115
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@@ -197,8 +197,14 @@ impl DeltaState {
     fn __data_delta_step(&self) -> u64 {
         self.data_current_version.fetch_add(1, Ordering::AcqRel)
     }
-    pub fn __data_delta_queue(&self) -> &Queue<DataDelta> {
-        &self.data_deltas
+    pub fn __data_delta_dequeue(&self, g: &Guard) -> Option<DataDelta> {
+        match self.data_deltas.blocking_try_dequeue(g) {
+            Some(d) => {
+                self.data_deltas_size.fetch_sub(1, Ordering::Release);
+                Some(d)
+            }
+            None => None,
+        }
     }
 }

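The first change replaces direct access to the delta queue with a dequeue method that also keeps the `data_deltas_size` counter in sync: the counter is only decremented when an item was actually removed. Below is a minimal sketch of that pattern, using `std` types in place of the engine's epoch-based `Queue` and `Guard` (hypothetical stand-ins, not the engine's actual types):

```rust
use std::{
    collections::VecDeque,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Mutex,
    },
};

struct SizeTrackedQueue<T> {
    inner: Mutex<VecDeque<T>>,
    // approximate length, readable without taking the lock
    size: AtomicUsize,
}

impl<T> SizeTrackedQueue<T> {
    fn new() -> Self {
        Self {
            inner: Mutex::new(VecDeque::new()),
            size: AtomicUsize::new(0),
        }
    }
    fn enqueue(&self, item: T) {
        self.inner.lock().unwrap().push_back(item);
        self.size.fetch_add(1, Ordering::Release);
    }
    // mirrors __data_delta_dequeue: decrement only on a successful dequeue
    fn dequeue(&self) -> Option<T> {
        let item = self.inner.lock().unwrap().pop_front();
        if item.is_some() {
            self.size.fetch_sub(1, Ordering::Release);
        }
        item
    }
}

fn main() {
    let q = SizeTrackedQueue::new();
    q.enqueue("delta-1");
    q.enqueue("delta-2");
    assert_eq!(q.dequeue(), Some("delta-1"));
    assert_eq!(q.size.load(Ordering::Acquire), 1);
}
```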
@@ -83,7 +83,6 @@ impl<F: RawFileIOInterface> DataBatchPersistDriver<F> {
         // pin model
         let irm = model.intent_read_model();
         let schema_version = model.delta_state().schema_current_version();
-        let data_q = model.delta_state().__data_delta_queue();
         let g = pin();
         // init restore list
         let mut restore_list = Vec::new();
@@ -92,9 +91,14 @@ impl<F: RawFileIOInterface> DataBatchPersistDriver<F> {
         let mut inconsistent_reads = 0;
         let mut exec = || -> SDSSResult<()> {
             // write batch start
-            self.write_batch_start(observed_len, schema_version)?;
+            self.write_batch_start(
+                observed_len,
+                schema_version,
+                model.p_tag().tag_unique(),
+                irm.fields().len() - 1,
+            )?;
             while i < observed_len {
-                let delta = data_q.blocking_try_dequeue(&g).unwrap();
+                let delta = model.delta_state().__data_delta_dequeue(&g).unwrap();
                 restore_list.push(delta.clone()); // TODO(@ohsayan): avoid this
                 match delta.change() {
                     DataDeltaKind::Delete => {
@@ -140,18 +144,24 @@ impl<F: RawFileIOInterface> DataBatchPersistDriver<F> {
     }
     /// Write the batch start block:
     /// - Batch start magic
+    /// - Primary key type
     /// - Expected commit
     /// - Schema version
+    /// - Column count
     fn write_batch_start(
         &mut self,
         observed_len: usize,
         schema_version: DeltaVersion,
+        pk_tag: TagUnique,
+        col_cnt: usize,
     ) -> Result<(), SDSSError> {
-        self.f.unfsynced_write(&[MARKER_ACTUAL_BATCH_EVENT])?;
+        self.f
+            .unfsynced_write(&[MARKER_ACTUAL_BATCH_EVENT, pk_tag.d()])?;
         let observed_len_bytes = observed_len.u64_bytes_le();
         self.f.unfsynced_write(&observed_len_bytes)?;
         self.f
             .unfsynced_write(&schema_version.value_u64().to_le_bytes())?;
+        self.f.unfsynced_write(&col_cnt.u64_bytes_le())?;
         Ok(())
     }
     /// Append a summary of this batch
@@ -266,9 +276,6 @@ impl<F: RawFileIOInterface> DataBatchPersistDriver<F> {
         irm: &IRModel,
         row_data: &RowData,
     ) -> SDSSResult<()> {
-        // nasty hack; we need to avoid the pk
-        self.f
-            .unfsynced_write(&(row_data.fields().len()).to_le_bytes())?;
         for field_name in irm.fields().stseq_ord_key() {
             match row_data.fields().get(field_name) {
                 Some(cell) => {
@@ -280,9 +287,10 @@ impl<F: RawFileIOInterface> DataBatchPersistDriver<F> {
         }
         Ok(())
     }
+    /// Write the change type and txnid
     fn write_batch_item_common_row_data(&mut self, delta: &DataDelta) -> Result<(), SDSSError> {
-        let p1_dc_pk_ty = [delta.change().value_u8(), delta.row().d_key().tag().d()];
-        self.f.unfsynced_write(&p1_dc_pk_ty)?;
+        let change_type = [delta.change().value_u8()];
+        self.f.unfsynced_write(&change_type)?;
         let txn_id = delta.data_version().value_u64().to_le_bytes();
         self.f.unfsynced_write(&txn_id)?;
         Ok(())

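Taken together, the persist-side changes move the primary key tag and the column count out of every row and into a single batch start block, whose layout is now: marker byte, pk tag byte, expected commit (u64 LE), schema version (u64 LE), column count (u64 LE). A standalone sketch of that layout, writing into a `Vec<u8>` in place of the driver's `unfsynced_write` calls (the marker value here is assumed, purely for illustration):

```rust
// Sketch: the on-disk batch start block after this commit.
const MARKER_ACTUAL_BATCH_EVENT: u8 = 0xFD; // assumed value, illustrative only

fn encode_batch_start(
    buf: &mut Vec<u8>,
    pk_tag: u8,        // TagUnique discriminant of the primary key
    observed_len: u64, // expected commit: rows we intend to write
    schema_version: u64,
    col_cnt: u64,      // field count minus the primary key
) {
    buf.extend_from_slice(&[MARKER_ACTUAL_BATCH_EVENT, pk_tag]);
    buf.extend_from_slice(&observed_len.to_le_bytes());
    buf.extend_from_slice(&schema_version.to_le_bytes());
    buf.extend_from_slice(&col_cnt.to_le_bytes());
}

fn main() {
    let mut buf = Vec::new();
    encode_batch_start(&mut buf, 1, 100, 0, 3);
    // 2 marker/tag bytes + 3 u64 fields = 26 bytes, paid once per batch;
    // previously the pk tag (1 B) and column count (8 B) were repeated per row.
    assert_eq!(buf.len(), 26);
}
```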
@@ -30,20 +30,17 @@ use {
     super::{
         MARKER_ACTUAL_BATCH_EVENT, MARKER_END_OF_BATCH, MARKER_RECOVERY_EVENT, RECOVERY_THRESHOLD,
     },
-    crate::{
-        engine::{
-            core::{index::PrimaryIndexKey, model::Model},
-            data::{
-                cell::Datacell,
-                tag::{CUTag, TagClass, TagUnique},
-            },
-            storage::v1::{
-                inf::PersistTypeDscr,
-                rw::{RawFileIOInterface, SDSSFileIO, SDSSFileTrackedReader},
-                SDSSError, SDSSResult,
-            },
-        },
-        util::copy_slice_to_array,
-    },
+    crate::engine::{
+        core::{index::PrimaryIndexKey, model::Model},
+        data::{
+            cell::Datacell,
+            tag::{CUTag, TagClass, TagUnique},
+        },
+        storage::v1::{
+            inf::PersistTypeDscr,
+            rw::{RawFileIOInterface, SDSSFileIO, SDSSFileTrackedReader},
+            SDSSError, SDSSResult,
+        },
+    },
     std::mem::ManuallyDrop,
 };
@@ -119,7 +116,9 @@ impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
             Self::apply_batch(model, batch)
         })
     }
-    pub fn read_all_batches(&mut self) -> SDSSResult<Vec<NormalBatch>> {
+    pub(in crate::engine::storage::v1) fn read_all_batches(
+        &mut self,
+    ) -> SDSSResult<Vec<NormalBatch>> {
         let mut all_batches = vec![];
         self.read_all_batches_and_for_each(|batch| {
             all_batches.push(batch);
@@ -212,8 +211,7 @@ impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
             }
         }
         // read actual commit
-        let mut actual_commit = [0; sizeof!(u64)];
-        self.f.read_into_buffer(&mut actual_commit)?;
+        let actual_commit = self.f.read_u64_le()?;
         // find actual checksum
         let actual_checksum = self.f.__reset_checksum();
         // find hardcoded checksum
@@ -224,7 +222,7 @@ impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
         // move file cursor ahead
         self.f.__cursor_ahead_by(sizeof!(u64));
         if actual_checksum == u64::from_le_bytes(hardcoded_checksum) {
-            Ok(u64::from_le_bytes(actual_commit))
+            Ok(actual_commit)
         } else {
             Err(SDSSError::DataBatchRestoreCorruptedBatch)
         }
@@ -249,15 +247,11 @@ impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
                 return Err(SDSSError::DataBatchRestoreCorruptedBatch);
             }
         }
-        // we're expecting a "good batch"
-        let mut batch_size_schema_version = [0; sizeof!(u64, 2)];
-        self.f.read_into_buffer(&mut batch_size_schema_version)?;
-        // we have the batch length
-        let batch_size = u64::from_le_bytes(copy_slice_to_array(&batch_size_schema_version[..8]));
-        let schema_version =
-            u64::from_le_bytes(copy_slice_to_array(&batch_size_schema_version[8..]));
+        // decode batch start block
+        let batch_start_block = self.read_start_batch_block()?;
         let mut processed_in_this_batch = 0;
-        while (processed_in_this_batch != batch_size) & !self.f.is_eof() {
+        while (processed_in_this_batch != batch_start_block.expected_commit()) & !self.f.is_eof() {
             // decode common row data
             let change_type = self.f.read_byte()?;
             // now decode event
@@ -266,15 +260,15 @@ impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
                     // the file tells us that we've reached the end of this batch; hmmm
                     return Ok(Batch::FinishedEarly(NormalBatch::new(
                         this_batch,
-                        schema_version,
+                        batch_start_block.schema_version(),
                     )));
                 }
                 normal_event => {
-                    let (pk_type, txnid) = self.read_normal_event_metadata()?;
+                    let txnid = self.f.read_u64_le()?;
                     match normal_event {
                         0 => {
                             // delete
-                            let pk = self.decode_primary_key(pk_type)?;
+                            let pk = self.decode_primary_key(batch_start_block.pk_tag())?;
                             this_batch.push(DecodedBatchEvent::new(
                                 txnid,
                                 pk,
@@ -285,18 +279,15 @@ impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
                         1 | 2 => {
                             // insert or update
                             // get pk
-                            let pk = self.decode_primary_key(pk_type)?;
-                            // get column count
-                            let mut column_count = [0; sizeof!(u64)];
-                            self.f.read_into_buffer(&mut column_count)?;
-                            let mut column_count = u64::from_le_bytes(column_count);
+                            let pk = self.decode_primary_key(batch_start_block.pk_tag())?;
                             // prepare row
                             let mut row = vec![];
-                            while column_count != 0 && !self.f.is_eof() {
+                            let mut this_col_cnt = batch_start_block.column_cnt();
+                            while this_col_cnt != 0 && !self.f.is_eof() {
                                 row.push(self.decode_cell()?);
-                                column_count -= 1;
+                                this_col_cnt -= 1;
                             }
-                            if column_count != 0 {
+                            if this_col_cnt != 0 {
                                 return Err(SDSSError::DataBatchRestoreCorruptedEntry);
                             }
                             if change_type == 1 {
@@ -321,14 +312,10 @@ impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
                 }
             }
         }
-        Ok(Batch::Normal(NormalBatch::new(this_batch, schema_version)))
-    }
-    fn read_normal_event_metadata(&mut self) -> Result<(u8, u64), SDSSError> {
-        let pk_type = self.f.read_byte()?;
-        let mut txnid = [0; sizeof!(u64)];
-        self.f.read_into_buffer(&mut txnid)?;
-        let txnid = u64::from_le_bytes(txnid);
-        Ok((pk_type, txnid))
+        Ok(Batch::Normal(NormalBatch::new(
+            this_batch,
+            batch_start_block.schema_version(),
+        )))
     }
     fn attempt_recover_data_batch(&mut self) -> SDSSResult<()> {
         let mut max_threshold = RECOVERY_THRESHOLD;
@@ -340,6 +327,49 @@ impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
         }
         Err(SDSSError::DataBatchRestoreCorruptedBatch)
     }
+    fn read_start_batch_block(&mut self) -> SDSSResult<BatchStartBlock> {
+        let pk_tag = self.f.read_byte()?;
+        let expected_commit = self.f.read_u64_le()?;
+        let schema_version = self.f.read_u64_le()?;
+        let column_cnt = self.f.read_u64_le()?;
+        Ok(BatchStartBlock::new(
+            pk_tag,
+            expected_commit,
+            schema_version,
+            column_cnt,
+        ))
+    }
+}
+#[derive(Debug, PartialEq)]
+struct BatchStartBlock {
+    pk_tag: u8,
+    expected_commit: u64,
+    schema_version: u64,
+    column_cnt: u64,
+}
+impl BatchStartBlock {
+    const fn new(pk_tag: u8, expected_commit: u64, schema_version: u64, column_cnt: u64) -> Self {
+        Self {
+            pk_tag,
+            expected_commit,
+            schema_version,
+            column_cnt,
+        }
+    }
+    fn pk_tag(&self) -> u8 {
+        self.pk_tag
+    }
+    fn expected_commit(&self) -> u64 {
+        self.expected_commit
+    }
+    fn schema_version(&self) -> u64 {
+        self.schema_version
+    }
+    fn column_cnt(&self) -> u64 {
+        self.column_cnt
+    }
+}
+impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
@@ -349,17 +379,15 @@ impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
         };
         Ok(match pk_type {
             TagUnique::SignedInt | TagUnique::UnsignedInt => {
-                let mut chunk = [0; sizeof!(u64)];
-                self.f.read_into_buffer(&mut chunk)?;
+                let qw = self.f.read_u64_le()?;
                 unsafe {
                     // UNSAFE(@ohsayan): +tagck
-                    PrimaryIndexKey::new_from_qw(pk_type, u64::from_le_bytes(chunk))
+                    PrimaryIndexKey::new_from_qw(pk_type, qw)
                 }
             }
             TagUnique::Str | TagUnique::Bin => {
-                let mut len = [0; sizeof!(u64)];
-                self.f.read_into_buffer(&mut len)?;
-                let mut data = vec![0; u64::from_le_bytes(len) as usize];
+                let len = self.f.read_u64_le()?;
+                let mut data = vec![0; len as usize];
                 self.f.read_into_buffer(&mut data)?;
                 if pk_type == TagUnique::Str {
                     if core::str::from_utf8(&data).is_err() {
@@ -369,11 +397,7 @@ impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
                 unsafe {
                     // UNSAFE(@ohsayan): +tagck +verityck
                     let mut md = ManuallyDrop::new(data);
-                    PrimaryIndexKey::new_from_dual(
-                        pk_type,
-                        u64::from_le_bytes(len),
-                        md.as_mut_ptr() as usize,
-                    )
+                    PrimaryIndexKey::new_from_dual(pk_type, len, md.as_mut_ptr() as usize)
                 }
             }
             _ => unsafe {
@@ -397,18 +421,15 @@ impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
                 Datacell::new_bool(bool == 1)
             }
             PersistTypeDscr::UnsignedInt | PersistTypeDscr::SignedInt | PersistTypeDscr::Float => {
-                let mut block = [0; sizeof!(u64)];
-                self.f.read_into_buffer(&mut block)?;
+                let qw = self.f.read_u64_le()?;
                 unsafe {
                     // UNSAFE(@ohsayan): choosing the correct type and tag
                     let tc = TagClass::from_raw(cell_type.value_u8() - 1);
-                    Datacell::new_qw(u64::from_le_bytes(block), CUTag::new(tc, tc.tag_unique()))
+                    Datacell::new_qw(qw, CUTag::new(tc, tc.tag_unique()))
                 }
             }
             PersistTypeDscr::Str | PersistTypeDscr::Bin => {
-                let mut len_block = [0; sizeof!(u64)];
-                self.f.read_into_buffer(&mut len_block)?;
-                let len = u64::from_le_bytes(len_block) as usize;
+                let len = self.f.read_u64_le()? as usize;
                 let mut data = vec![0; len];
                 self.f.read_into_buffer(&mut data)?;
                 unsafe {
@@ -424,9 +445,7 @@ impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
                 }
             }
             PersistTypeDscr::List => {
-                let mut len_block = [0; sizeof!(u64)];
-                self.f.read_into_buffer(&mut len_block)?;
-                let len = u64::from_le_bytes(len_block);
+                let len = self.f.read_u64_le()?;
                 let mut list = Vec::new();
                 while !self.f.is_eof() && list.len() as u64 != len {
                     list.push(self.decode_cell()?);

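On the restore side, the per-row metadata reads are gone: the pk tag, schema version, and column count now come from one decoded batch start block per batch. A sketch of that decode step, mirroring `read_start_batch_block()` above but over `std::io::Read` instead of the engine's tracked reader (the byte values are illustrative):

```rust
use std::io::{self, Read};

#[derive(Debug, PartialEq)]
struct BatchStartBlock {
    pk_tag: u8,
    expected_commit: u64,
    schema_version: u64,
    column_cnt: u64,
}

fn read_u64_le<R: Read>(r: &mut R) -> io::Result<u64> {
    let mut buf = [0u8; 8];
    r.read_exact(&mut buf)?;
    Ok(u64::from_le_bytes(buf))
}

// the batch marker byte has already been consumed by the caller,
// so the block starts with the pk tag
fn read_start_batch_block<R: Read>(r: &mut R) -> io::Result<BatchStartBlock> {
    let mut pk_tag = [0u8; 1];
    r.read_exact(&mut pk_tag)?;
    Ok(BatchStartBlock {
        pk_tag: pk_tag[0],
        expected_commit: read_u64_le(r)?,
        schema_version: read_u64_le(r)?,
        column_cnt: read_u64_le(r)?,
    })
}

fn main() -> io::Result<()> {
    // 1-byte pk tag followed by three little-endian u64 fields
    let mut bytes: Vec<u8> = vec![2];
    bytes.extend_from_slice(&100u64.to_le_bytes());
    bytes.extend_from_slice(&7u64.to_le_bytes());
    bytes.extend_from_slice(&3u64.to_le_bytes());
    let mut cur = io::Cursor::new(bytes);
    let blk = read_start_batch_block(&mut cur)?;
    assert_eq!(blk.column_cnt, 3);
    Ok(())
}
```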
@@ -251,6 +251,19 @@ impl<F: RawFileIOInterface> SDSSFileTrackedReader<F> {
     pub fn __cursor_ahead_by(&mut self, sizeof: usize) {
         self.pos += sizeof as u64;
     }
+    pub fn read_block<const N: usize>(&mut self) -> SDSSResult<[u8; N]> {
+        if !self.has_left(N as _) {
+            return Err(SDSSError::IoError(SysIOError::from(
+                std::io::ErrorKind::InvalidInput,
+            )));
+        }
+        let mut buf = [0; N];
+        self.read_into_buffer(&mut buf)?;
+        Ok(buf)
+    }
+    pub fn read_u64_le(&mut self) -> SDSSResult<u64> {
+        Ok(u64::from_le_bytes(self.read_block()?))
+    }
 }
 #[derive(Debug)]

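The new `read_block::<N>` helper bounds-checks against the remaining file length before reading a fixed-size array, which is what lets callers collapse the earlier `[0; sizeof!(u64)]` + `read_into_buffer` + `from_le_bytes` triples into a single `read_u64_le()` call. A standalone sketch of the same const-generic pattern over a byte slice (a simplified stand-in for `SDSSFileTrackedReader`, returning `Option` where the driver returns `SDSSResult`):

```rust
struct TrackedReader<'a> {
    data: &'a [u8],
    pos: usize,
}

impl<'a> TrackedReader<'a> {
    fn has_left(&self, n: usize) -> bool {
        self.data.len() - self.pos >= n
    }
    // N is a const generic, so the array size is checked and built at compile time
    fn read_block<const N: usize>(&mut self) -> Option<[u8; N]> {
        if !self.has_left(N) {
            return None; // the driver maps this case to an SDSSError instead
        }
        let mut buf = [0u8; N];
        buf.copy_from_slice(&self.data[self.pos..self.pos + N]);
        self.pos += N;
        Some(buf)
    }
    fn read_u64_le(&mut self) -> Option<u64> {
        // N = 8 is inferred from u64::from_le_bytes
        Some(u64::from_le_bytes(self.read_block()?))
    }
}

fn main() {
    let bytes = 42u64.to_le_bytes();
    let mut r = TrackedReader { data: &bytes, pos: 0 };
    assert_eq!(r.read_u64_le(), Some(42));
    assert!(r.read_u64_le().is_none()); // nothing left to read
}
```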