Flush data delta write buffer to avoid memory blowup

next
Sayan Nandan 7 months ago
parent f4a1823462
commit e28d94efba
No known key found for this signature in database
GPG Key ID: 0EBD769024B24F0A

@ -218,32 +218,25 @@ impl<'a, 'b, Fs: FSInterface> BatchWriter<'a, 'b, Fs> {
>, >,
) -> RuntimeResult<usize> { ) -> RuntimeResult<usize> {
/* /*
go over each delta, check if inconsistent and apply if not. we currently keep a track go over each delta, check if inconsistent and apply if not. if any delta sync fails, we enqueue the delta again.
of applied deltas in a vec which is a TERRIBLY INEFFICIENT WAY to do so. Instead we should Since the diffing algorithm will ensure that a stale delta is never written, we don't have to worry about checking
be able to "iterate" on the concurrent queue. Since that demands a proof of correctness, if the delta is stale or not manually. It will be picked up by another collection sequence.
once I do finish implementing it I'll swap it in here. This is the primary source of huge
memory blowup during a batch sync. There is scope for several improvements, but one that is evident here is to try and use a sequential memory block
rather than remote allocations for the deltas which should theoretically improve performance. But as always, we're bound by
disk I/O so it might not be a major factor.
-- @ohsayan -- @ohsayan
*/ */
let mut me = Self::new(model, g, f)?; let mut me = Self::new(model, g, f)?;
let mut applied_deltas = vec![];
let mut i = 0; let mut i = 0;
while i < expected { while i < expected {
let delta = me.model.delta_state().__data_delta_dequeue(me.g).unwrap(); let delta = me.model.delta_state().__data_delta_dequeue(me.g).unwrap();
match me.step(&delta) { match me.step(&delta) {
Ok(()) => { Ok(()) => i += 1,
applied_deltas.push(delta);
i += 1;
}
Err(e) => { Err(e) => {
// errored, so push everything back in // errored, so push this back in; we have written and flushed all prior deltas
me.model.delta_state().append_new_data_delta(delta, me.g); me.model.delta_state().append_new_data_delta(delta, me.g);
for applied_delta in applied_deltas {
me.model
.delta_state()
.append_new_data_delta(applied_delta, g);
}
return Err(e); return Err(e);
} }
} }
@ -292,6 +285,7 @@ impl<'a, 'b, Fs: FSInterface> BatchWriter<'a, 'b, Fs> {
self.row_writer.write_row_data(self.model, &row_data)?; self.row_writer.write_row_data(self.model, &row_data)?;
} }
} }
self.row_writer.f.flush_buf()?;
self.sync_count += 1; self.sync_count += 1;
Ok(()) Ok(())
} }

Loading…
Cancel
Save