Add tracked writer impls

next
Sayan Nandan 8 months ago
parent c5044c592f
commit c581e47558
No known key found for this signature in database
GPG Key ID: 0EBD769024B24F0A

@ -187,8 +187,6 @@ enumerate_err! {
HeaderDecodeVersionMismatch = "header-version-mismatch",
/// The entire header is corrupted
HeaderDecodeCorruptedHeader = "header-corrupted",
/// Expected header values were not matched with the current header
HeaderDecodeDataMismatch = "header-data-mismatch",
// journal
/// While attempting to handle a basic failure (such as adding a journal entry), the recovery engine ran into an exceptional
/// situation where it failed to make a necessary repair the log

@ -432,3 +432,11 @@ macro_rules! okay {
$(($expr) &)*true
}
}
#[cfg(test)]
macro_rules! closure {
($($f:tt)*) => {{
let f = || { $($f)* };
f()
}}
}

@ -0,0 +1,227 @@
/*
* Created on Sat Jan 20 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
#![allow(dead_code)]
use {
super::unsafe_apis,
std::{
fmt, ops,
ptr::{self, NonNull},
slice,
},
};
/// A fixed capacity vector
///
/// - This is useful for situations where the stack is heavily used (such as during a recursive call, or too
/// many stack variables) so much that even though we have a fixed capacity, pushing an array on the stack
/// would cause an overflow.
/// - Also useful when it doesn't make sense to use the stack at all
pub struct FixedVec<T, const CAP: usize> {
p: NonNull<T>,
l: usize,
}
impl<T, const CAP: usize> Default for FixedVec<T, CAP> {
fn default() -> Self {
Self::allocate()
}
}
impl<T, const CAP: usize> FixedVec<T, CAP> {
const IS_ZERO: () = assert!(CAP != 0);
pub fn allocate() -> Self {
let _ = Self::IS_ZERO;
Self {
p: unsafe {
// UNSAFE(@ohsayan): simple malloc
NonNull::new_unchecked(unsafe_apis::alloc_array(CAP))
},
l: 0,
}
}
pub fn len(&self) -> usize {
self.l
}
pub fn remaining_capacity(&self) -> usize {
CAP - self.len()
}
pub fn at_capacity(&self) -> bool {
self.remaining_capacity() == 0
}
pub unsafe fn set_len(&mut self, l: usize) {
self.l = l;
}
pub unsafe fn decr_len_by(&mut self, by: usize) {
self.set_len(self.len() - by)
}
}
impl<T, const CAP: usize> FixedVec<T, CAP> {
pub fn try_push(&mut self, v: T)
where
T: fmt::Debug,
{
self.try_push_result(v).unwrap()
}
pub fn try_push_result(&mut self, v: T) -> Result<(), T> {
if self.remaining_capacity() == 0 {
Err(v)
} else {
unsafe {
// UNSAFE(@ohsayan): verified capacity
self.push(v);
Ok(())
}
}
}
pub unsafe fn extend_from_slice(&mut self, block: &[T])
where
T: Copy,
{
debug_assert!(block.len() <= self.remaining_capacity(), "reached capacity");
ptr::copy_nonoverlapping(block.as_ptr(), self.cptr(), block.len());
self.l += block.len();
}
pub unsafe fn push(&mut self, v: T) {
debug_assert_ne!(self.remaining_capacity(), 0, "reached capacity");
self.cptr().write(v);
self.l += 1;
}
pub fn clear(&mut self) {
unsafe {
// UNSAFE(@ohsayan): completely fine as we have the correct length
unsafe_apis::drop_slice_in_place_ref(self.slice_mut());
self.l = 0;
}
}
pub unsafe fn clear_start(&mut self, cnt: usize) {
debug_assert!(cnt < self.len(), "`cnt` is greater than vector length");
// drop
unsafe_apis::drop_slice_in_place(self.p.as_ptr(), cnt);
// move block
ptr::copy(self.p.as_ptr().add(cnt), self.p.as_ptr(), self.l - cnt);
self.l -= cnt;
}
}
impl<T, const CAP: usize> Drop for FixedVec<T, CAP> {
fn drop(&mut self) {
// dtor
self.clear();
unsafe {
// UNSAFE(@ohsayan): dealloc
unsafe_apis::dealloc_array(self.p.as_ptr(), self.len());
}
}
}
impl<T, const CAP: usize> FixedVec<T, CAP> {
unsafe fn cptr(&self) -> *mut T {
self.p.as_ptr().add(self.l)
}
fn slice(&self) -> &[T] {
unsafe {
// UNSAFE(@ohsayan): correct ptrs and len based on push impl and clear impls
slice::from_raw_parts(self.p.as_ptr(), self.l)
}
}
fn slice_mut(&mut self) -> &mut [T] {
unsafe {
// UNSAFE(@ohsayan): correct ptrs and len based on push impl and clear impls
slice::from_raw_parts_mut(self.p.as_ptr(), self.l)
}
}
}
impl<T: PartialEq, A, const CAP: usize> PartialEq<A> for FixedVec<T, CAP>
where
A: ops::Deref<Target = [T]>,
{
fn eq(&self, other: &A) -> bool {
self.slice() == ops::Deref::deref(other)
}
}
impl<T, const CAP: usize> ops::Deref for FixedVec<T, CAP> {
type Target = [T];
fn deref(&self) -> &Self::Target {
self.slice()
}
}
impl<T, const CAP: usize> ops::DerefMut for FixedVec<T, CAP> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.slice_mut()
}
}
impl<T: fmt::Debug, const CAP: usize> fmt::Debug for FixedVec<T, CAP> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_list().entries(self.slice()).finish()
}
}
#[test]
fn empty() {
let x = FixedVec::<String, 100>::allocate();
drop(x);
}
#[test]
fn push_clear() {
let mut x: FixedVec<_, 100> = FixedVec::allocate();
for v in 0..50 {
x.try_push(format!("{v}"));
}
assert_eq!(
x,
(0..50)
.into_iter()
.map(|v| format!("{v}"))
.collect::<Vec<String>>()
);
assert_eq!(x.len(), 50);
}
#[test]
fn clear_range() {
let mut x: FixedVec<_, 100> = FixedVec::allocate();
for v in 0..100 {
x.try_push(format!("{v}"));
}
assert_eq!(x.len(), 100);
unsafe { x.clear_start(50) }
assert_eq!(
x,
(50..100)
.into_iter()
.map(|i| ToString::to_string(&i))
.collect::<Vec<String>>()
);
assert_eq!(x.len(), 50);
}

@ -25,6 +25,7 @@
*/
mod astr;
pub mod fixed_vec;
mod ll;
mod numbuf;
mod rawslice;

@ -33,10 +33,11 @@
use std::{
alloc::{self, Layout},
ptr,
ptr::{self, NonNull},
};
/// Allocate the given layout. This will panic if the allocator returns an error
#[inline(always)]
pub unsafe fn alloc_layout<T>(layout: Layout) -> *mut T {
let ptr = alloc::alloc(layout);
assert!(!ptr.is_null(), "malloc failed");
@ -44,17 +45,24 @@ pub unsafe fn alloc_layout<T>(layout: Layout) -> *mut T {
}
/// Allocate an block with an array layout of type `T` with space for `l` elements
#[inline(always)]
pub unsafe fn alloc_array<T>(l: usize) -> *mut T {
self::alloc_layout(Layout::array::<T>(l).unwrap_unchecked())
if l != 0 {
self::alloc_layout(Layout::array::<T>(l).unwrap_unchecked())
} else {
NonNull::dangling().as_ptr()
}
}
/// Deallocate the given layout
#[inline(always)]
pub unsafe fn dealloc_layout(ptr: *mut u8, layout: Layout) {
alloc::dealloc(ptr, layout)
}
/// Deallocate an array of type `T` with size `l`. This function will ensure that nonzero calls to the
/// allocator are made
#[inline(always)]
pub unsafe fn dealloc_array<T>(ptr: *mut T, l: usize) {
if l != 0 {
self::dealloc_layout(ptr as *mut u8, Layout::array::<T>(l).unwrap_unchecked())
@ -62,16 +70,19 @@ pub unsafe fn dealloc_array<T>(ptr: *mut T, l: usize) {
}
/// Run the dtor for the given slice (range)
#[inline(always)]
pub unsafe fn drop_slice_in_place_ref<T>(ptr: &mut [T]) {
ptr::drop_in_place(ptr as *mut [T])
}
/// Run the dtor for the given slice (defined using ptr and len)
#[inline(always)]
pub unsafe fn drop_slice_in_place<T>(ptr: *mut T, l: usize) {
ptr::drop_in_place(ptr::slice_from_raw_parts_mut(ptr, l))
}
/// Copy exactly `N` bytes from `src` to a new array of size `N`
#[inline(always)]
pub unsafe fn memcpy<const N: usize>(src: &[u8]) -> [u8; N] {
let mut dst = [0u8; N];
src.as_ptr().copy_to_nonoverlapping(dst.as_mut_ptr(), N);

@ -38,6 +38,7 @@ use crc::{Crc, Digest, CRC_64_XZ};
const CRC64: Crc<u64> = Crc::<u64>::new(&CRC_64_XZ);
#[derive(Clone)]
pub struct SCrc64 {
digest: Digest<'static, u64>,
}

@ -188,6 +188,9 @@ mod util {
*/
impl VirtualFS {
pub fn fetch_raw_data(file_name: &str) -> RuntimeResult<Vec<u8>> {
Self::with_file(file_name, |f| Ok(f.data.clone()))
}
/// Get a handle to the virtual filesystem
fn handle() -> &'static RwLock<VDir> {
static VFS: Lazy<RwLock<VDir>, fn() -> RwLock<VDir>> = Lazy::new(|| Default::default());

@ -30,6 +30,8 @@
//! and functions are defined here to deal with SDSSv1 files.
//!
pub(super) mod rw;
use {
super::super::super::{
interface::fs_traits::{FileInterfaceRead, FileInterfaceWrite},
@ -46,6 +48,8 @@ use {
},
};
pub const TEST_TIME: u128 = (u64::MAX / sizeof!(u64) as u64) as _;
/*
header utils
*/
@ -238,7 +242,11 @@ impl<H: HeaderV1Spec> HeaderV1<H> {
file_specifier: H::FileSpecifier,
file_specifier_version: FileSpecifierVersion,
) -> (Self, [u8; 64]) {
let epoch_time = os::get_epoch_time();
let epoch_time = if cfg!(test) {
TEST_TIME
} else {
os::get_epoch_time()
};
let encoded = Self::_encode_auto_raw(
file_class,
file_specifier,
@ -414,6 +422,11 @@ pub trait FileSpecV1 {
f: &mut impl FileInterfaceWrite,
args: Self::EncodeArgs,
) -> RuntimeResult<Self::Metadata>;
fn metadata_to_block(args: Self::EncodeArgs) -> RuntimeResult<Vec<u8>> {
let mut v = Vec::new();
Self::write_metadata(&mut v, args)?;
Ok(v)
}
}
/// # Simple SDSS file specification (v1)

@ -0,0 +1,555 @@
/*
* Created on Sat Jan 20 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
#![allow(dead_code)]
use crate::{
engine::{
mem::fixed_vec::FixedVec,
storage::common::{
checksum::SCrc64,
interface::fs_traits::{
FSInterface, FileInterface, FileInterfaceBufWrite, FileInterfaceExt,
FileInterfaceRead, FileInterfaceWrite, FileInterfaceWriteExt, FileOpen,
},
sdss::v1::FileSpecV1,
},
RuntimeResult,
},
util::os::SysIOError,
};
/*
file impl
*/
#[derive(Debug, PartialEq)]
/// A file with it's layout defined by a SDSS file specification
pub struct SdssFile<F, S: FileSpecV1> {
file: F,
meta: S::Metadata,
}
impl<F, S: FileSpecV1> SdssFile<F, S> {
fn new(file: F, meta: S::Metadata) -> Self {
Self { file, meta }
}
/// Returns the SDSS metadata associated with this file
pub fn sdss_metadata(&self) -> &S::Metadata {
&self.meta
}
}
impl<F: FileInterface, S: FileSpecV1> SdssFile<F, S> {
/// Open an existing SDSS based file (with no validation arguments)
pub fn open<Fs: FSInterface<File = F>>(path: &str) -> RuntimeResult<Self>
where
S: FileSpecV1<DecodeArgs = ()>,
{
let mut f = Fs::fs_fopen_rw(path)?;
let md = S::read_metadata(&mut f, ())?;
Ok(Self::new(f, md))
}
/// Create a new SDSS based file (with no initialization arguments)
pub fn create<Fs: FSInterface<File = F>>(path: &str) -> RuntimeResult<Self>
where
S: FileSpecV1<EncodeArgs = ()>,
{
let mut f = Fs::fs_fcreate_rw(path)?;
let md = S::write_metadata(&mut f, ())?;
Ok(Self::new(f, md))
}
/// Create or open an SDSS based file (with no initialization or validation arguments)
pub fn open_or_create_perm_rw<Fs: FSInterface<File = F>>(
path: &str,
) -> RuntimeResult<FileOpen<Self>>
where
S: FileSpecV1<DecodeArgs = (), EncodeArgs = ()>,
{
match Fs::fs_fopen_or_create_rw(path)? {
FileOpen::Created(mut new) => {
let md = S::write_metadata(&mut new, ())?;
Ok(FileOpen::Created(Self::new(new, md)))
}
FileOpen::Existing(mut existing) => {
let md = S::read_metadata(&mut existing, ())?;
Ok(FileOpen::Existing(Self::new(existing, md)))
}
}
}
}
impl<F: FileInterface, S: FileSpecV1> SdssFile<F, S> {
/// Get a buffered reader. Use [`SdssFile::downgrade_reader`] to get back the original file
pub fn into_buffered_reader(self) -> RuntimeResult<SdssFile<F::BufReader, S>> {
let Self { file, meta } = self;
let bufreader = F::upgrade_to_buffered_reader(file)?;
Ok(SdssFile::new(bufreader, meta))
}
/// Get back the original file from the buffered reader
pub fn downgrade_reader(
SdssFile { file, meta }: SdssFile<F::BufReader, S>,
) -> RuntimeResult<Self> {
let me = F::downgrade_reader(file)?;
Ok(Self::new(me, meta))
}
/// Get a buffered writer. Use [`SdssFile::downgrade_writer`] to get back the original file
pub fn into_buffered_writer(self) -> RuntimeResult<SdssFile<F::BufWriter, S>> {
let Self { file, meta } = self;
let bufwriter = F::upgrade_to_buffered_writer(file)?;
Ok(SdssFile::new(bufwriter, meta))
}
/// Get back the original file from the buffered writer
///
/// NB: THis will usually not explicitly sync any pending data, unless the downgrade implementation
/// of the interface does so
pub fn downgrade_writer(
SdssFile { file, meta }: SdssFile<F::BufWriter, S>,
) -> RuntimeResult<Self> {
let me = F::downgrade_writer(file)?;
Ok(Self::new(me, meta))
}
}
impl<F: FileInterfaceBufWrite, S: FileSpecV1> SdssFile<F, S> {
/// Sync writes
pub fn sync_writes(&mut self) -> RuntimeResult<()> {
self.file.sync_write_cache()
}
}
impl<F: FileInterfaceRead, S: FileSpecV1> SdssFile<F, S> {
/// Attempt to fill the entire buffer from the file
pub fn read_buffer(&mut self, buffer: &mut [u8]) -> RuntimeResult<()> {
self.file.fread_exact(buffer)
}
}
impl<F: FileInterfaceRead + FileInterfaceExt, S: FileSpecV1> SdssFile<F, S> {
/// Read the entire part of the remaining file into memory
pub fn read_full(&mut self) -> RuntimeResult<Vec<u8>> {
let len = self.file_length()? - self.file_cursor()?;
let mut buf = vec![0; len as usize];
self.read_buffer(&mut buf)?;
Ok(buf)
}
}
impl<F: FileInterfaceExt, S: FileSpecV1> SdssFile<F, S> {
/// Get the current position of the file
pub fn file_cursor(&mut self) -> RuntimeResult<u64> {
self.file.fext_cursor()
}
/// Get the length of the file
pub fn file_length(&self) -> RuntimeResult<u64> {
self.file.fext_length()
}
/// Move the cursor `n` bytes from the start
pub fn seek_from_start(&mut self, n: u64) -> RuntimeResult<()> {
self.file.fext_seek_ahead_from_start_by(n)
}
}
impl<F: FileInterfaceWrite, S: FileSpecV1> SdssFile<F, S> {
/// Attempt to write the entire buffer into the file
pub fn write_buffer(&mut self, data: &[u8]) -> RuntimeResult<()> {
self.file.fw_write_all(data)
}
}
impl<F: FileInterfaceWrite + FileInterfaceWriteExt, S: FileSpecV1> SdssFile<F, S> {
/// Sync all data and metadata permanently
pub fn fsync_all(&mut self) -> RuntimeResult<()> {
self.file.fwext_sync_all()?;
Ok(())
}
/// Write a block followed by an explicit fsync call
pub fn fsynced_write(&mut self, data: &[u8]) -> RuntimeResult<()> {
self.file.fw_write_all(data)?;
self.file.fwext_sync_all()
}
}
/*
tracked reader impl
*/
/// A [`TrackedReader`] will track various parameters of the file during read operations. By default
/// all reads are buffered
pub struct TrackedReader<F, S: FileSpecV1> {
f: SdssFile<F, S>,
len: u64,
cursor: u64,
cs: SCrc64,
}
impl<F: FileInterface, S: FileSpecV1> TrackedReader<F, S> {
/// Create a new [`TrackedReader`]. This needs to retrieve file position and length
pub fn new(mut f: SdssFile<F, S>) -> RuntimeResult<TrackedReader<F::BufReader, S>> {
let len = f.file_length()?;
let cursor = f.file_cursor()?;
let f = f.into_buffered_reader()?;
Ok(TrackedReader {
f,
len,
cursor,
cs: SCrc64::new(),
})
}
}
impl<F: FileInterfaceRead, S: FileSpecV1> TrackedReader<F, S> {
/// Attempt to fill the buffer. This read is tracked.
pub fn tracked_read(&mut self, buf: &mut [u8]) -> RuntimeResult<()> {
self.untracked_read(buf)
.map(|_| self.cs.recompute_with_new_var_block(buf))
}
/// Attempt to read a byte. This read is also tracked.
pub fn read_byte(&mut self) -> RuntimeResult<u8> {
let mut buf = [0u8; 1];
self.tracked_read(&mut buf).map(|_| buf[0])
}
/// Reset the tracked checksum
pub fn __reset_checksum(&mut self) -> u64 {
let mut crc = SCrc64::new();
core::mem::swap(&mut crc, &mut self.cs);
crc.finish()
}
/// Do an untracked read of the file.
///
/// NB: The change in cursor however will still be tracked.
pub fn untracked_read(&mut self, buf: &mut [u8]) -> RuntimeResult<()> {
if self.remaining() >= buf.len() as u64 {
match self.f.read_buffer(buf) {
Ok(()) => {
self.cursor += buf.len() as u64;
Ok(())
}
Err(e) => return Err(e),
}
} else {
Err(SysIOError::from(std::io::ErrorKind::InvalidInput).into())
}
}
/// Tracked read of a given block size. Shorthand for [`Self::tracked_read`]
pub fn read_block<const N: usize>(&mut self) -> RuntimeResult<[u8; N]> {
if !self.has_left(N as _) {
return Err(SysIOError::from(std::io::ErrorKind::InvalidInput).into());
}
let mut buf = [0; N];
self.tracked_read(&mut buf)?;
Ok(buf)
}
/// Tracked read of a [`u64`] value
pub fn read_u64_le(&mut self) -> RuntimeResult<u64> {
Ok(u64::from_le_bytes(self.read_block()?))
}
}
impl<F, S: FileSpecV1> TrackedReader<F, S> {
/// Returns the base [`SdssFile`]
pub fn into_inner<F_: FileInterface<BufReader = F>>(self) -> RuntimeResult<SdssFile<F_, S>> {
SdssFile::downgrade_reader(self.f)
}
/// Returns the number of remaining bytes
pub fn remaining(&self) -> u64 {
self.len - self.cursor
}
/// Checks if EOF
pub fn is_eof(&self) -> bool {
self.len == self.cursor
}
/// Check if atleast `v` bytes are left
pub fn has_left(&self, v: u64) -> bool {
self.remaining() >= v
}
}
/*
tracked writer
*/
/// A [`TrackedWriter`] is an advanced writer primitive that provides a robust abstraction over a writable
/// interface. It tracks the cursor, automatically buffers writes and in case of buffer flush failure,
/// provides methods to robustly handle errors, down to byte-level cursor tracking in case of failure.
pub struct TrackedWriter<
F,
S: FileSpecV1,
const SIZE: usize = 8192,
const PANIC_IF_UNFLUSHED: bool = true,
const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool = true,
> {
f_d: F,
f_md: S::Metadata,
t_cursor: u64,
t_checksum: SCrc64,
buf: FixedVec<u8, SIZE>,
}
impl<
F,
S: FileSpecV1,
const SIZE: usize,
const PANIC_IF_UNFLUSHED: bool,
const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool,
> TrackedWriter<F, S, SIZE, PANIC_IF_UNFLUSHED, CHECKSUM_WRITTEN_IF_BLOCK_ERROR>
{
fn _new(f_d: F, f_md: S::Metadata, t_cursor: u64, t_checksum: SCrc64) -> Self {
Self {
f_d,
f_md,
t_cursor,
t_checksum,
buf: FixedVec::allocate(),
}
}
/// Get the writer tracked cursor
///
/// IMPORTANT: this might not be the real file cursor if the file is externally modified
pub fn cursor(&self) -> u64 {
self.t_cursor
}
/// Get the cursor (casted to an [`usize`])
pub fn cursor_usize(&self) -> usize {
self.cursor() as _
}
}
impl<
F: FileInterfaceExt,
S: FileSpecV1,
const SIZE: usize,
const PANIC_IF_UNFLUSHED: bool,
const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool,
> TrackedWriter<F, S, SIZE, PANIC_IF_UNFLUSHED, CHECKSUM_WRITTEN_IF_BLOCK_ERROR>
{
/// Create a new tracked writer
///
/// NB: The cursor is fetched. If the cursor is already available, use [`Self::with_cursor`]
pub fn new(
mut f: SdssFile<F, S>,
) -> RuntimeResult<TrackedWriter<F, S, SIZE, PANIC_IF_UNFLUSHED, CHECKSUM_WRITTEN_IF_BLOCK_ERROR>>
{
f.file_cursor().map(|v| TrackedWriter::with_cursor(f, v))
}
/// Create a new tracked writer with the provided cursor
pub fn with_cursor(f: SdssFile<F, S>, c: u64) -> Self {
Self::with_cursor_and_checksum(f, c, SCrc64::new())
}
/// Create a new tracked writer with the provided checksum and cursor
pub fn with_cursor_and_checksum(
SdssFile { file, meta }: SdssFile<F, S>,
c: u64,
ck: SCrc64,
) -> Self {
Self::_new(file, meta, c, ck)
}
pub fn current_checksum(&self) -> u64 {
self.t_checksum.clone().finish()
}
}
impl<
F: FileInterfaceWrite,
S: FileSpecV1,
const SIZE: usize,
const PANIC_IF_UNFLUSHED: bool,
const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool,
> TrackedWriter<F, S, SIZE, PANIC_IF_UNFLUSHED, CHECKSUM_WRITTEN_IF_BLOCK_ERROR>
{
/// Do a tracked write
///
/// On error, if block error checksumming is set then whatever part of the block was written
/// will be updated in the checksum. If disabled, then the checksum is unchanged.
pub fn tracked_write(&mut self, buf: &[u8]) -> RuntimeResult<()> {
let cursor_start = self.cursor_usize();
match self.untracked_write(buf) {
Ok(()) => {
self.t_checksum.recompute_with_new_var_block(buf);
Ok(())
}
Err(e) => {
if CHECKSUM_WRITTEN_IF_BLOCK_ERROR {
let cursor_now = self.cursor_usize();
self.t_checksum
.recompute_with_new_var_block(&buf[..cursor_now - cursor_start]);
}
Err(e)
}
}
}
/// Do an untracked write
pub fn untracked_write(&mut self, buf: &[u8]) -> RuntimeResult<()> {
if self.buf.at_capacity() {
self.flush_buf()?;
}
let write_size = buf.len().saturating_sub(SIZE);
match self.f_d.fwrite_all_count(&buf[..write_size]) {
(written, r) => {
self.t_cursor += written;
// the buffer was flushed, but we errored here. the caller should be able to track
// the number of bytes that we wrote using the cursor and utilize it for any
// recovery attempts
r?;
}
}
unsafe {
// UNSAFE(@ohsayan): the slice is at most SIZE bytes in length
debug_assert!(buf[write_size..].len() <= SIZE);
self.buf.extend_from_slice(&buf[write_size..]);
}
Ok(())
}
/// Flush the buffer and then sync data and metadata
pub fn flush_sync(&mut self) -> RuntimeResult<()>
where
F: FileInterfaceWriteExt,
{
self.flush_buf().and_then(|_| self.fsync())
}
/// Flush the buffer
pub fn flush_buf(&mut self) -> RuntimeResult<()> {
match self.f_d.fwrite_all_count(&self.buf) {
(written, r) => {
if written as usize == self.buf.len() {
// if we wrote the full buffer, simply decrement
unsafe {
// UNSAFE(@ohsayan): completely safe as no dtor needed (and is a decrement anyways)
self.buf.decr_len_by(written as usize)
}
} else {
// if we failed to write the whole buffer, only remove what was written and keep
// the remaining in the buffer
unsafe {
// UNSAFE(@ohsayan): written is obviously not larger, so this is fine
self.buf.clear_start(written as _)
}
}
// update the cursor to what was written (atleast what the syscall told us)
self.t_cursor += written;
// return
r
}
}
}
pub fn fsync(&mut self) -> RuntimeResult<()>
where
F: FileInterfaceWriteExt,
{
self.f_d.fwext_sync_all()
}
}
impl<
F,
S: FileSpecV1,
const SIZE: usize,
const PANIC_IF_UNFLUSHED: bool,
const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool,
> Drop for TrackedWriter<F, S, SIZE, PANIC_IF_UNFLUSHED, CHECKSUM_WRITTEN_IF_BLOCK_ERROR>
{
fn drop(&mut self) {
if PANIC_IF_UNFLUSHED && !self.buf.is_empty() {
panic!("buffer not completely flushed");
}
}
}
#[test]
fn check_vfs_buffering() {
use crate::engine::storage::{
common::interface::fs_test::{VFileDescriptor, VirtualFS},
v2::spec::{Header, SystemDatabaseV1},
};
fn rawfile() -> Vec<u8> {
VirtualFS::fetch_raw_data("myfile").unwrap()
}
let compiled_header = SystemDatabaseV1::metadata_to_block(()).unwrap();
let expected_checksum = {
let mut crc = SCrc64::new();
crc.recompute_with_new_var_block(&vec![0; 8192]);
crc.recompute_with_new_var_block(&[0]);
crc.recompute_with_new_var_block(&vec![0xFF; 8192]);
crc.finish()
};
closure! {
// init writer
let mut twriter: TrackedWriter<VFileDescriptor, SystemDatabaseV1> =
TrackedWriter::new(SdssFile::create::<VirtualFS>("myfile")?)?;
assert_eq!(twriter.cursor_usize(), Header::SIZE);
{
// W8192: write exact bufsize block; nothing is written (except SDSS header)
twriter.tracked_write(&[0; 8192])?;
assert_eq!(rawfile(), compiled_header);
assert_eq!(twriter.cursor_usize(), Header::SIZE);
}
{
// W1: write one more byte; buf should be flushed
twriter.tracked_write(&[0; 1])?;
assert_eq!(twriter.cursor_usize(), Header::SIZE + 8192);
let _raw_file = rawfile();
assert_eq!(&_raw_file[..Header::SIZE], compiled_header);
assert_eq!(&_raw_file[Header::SIZE..], vec![0u8; 8192]);
}
{
// FLUSH: flush buffer; 8193 bytes should be on disk (+header)
twriter.flush_buf()?;
let _raw_file = rawfile();
assert_eq!(twriter.cursor_usize(), Header::SIZE + 8192 + 1);
assert_eq!(&_raw_file[..Header::SIZE], compiled_header);
assert_eq!(&_raw_file[Header::SIZE..], vec![0u8; 8193]);
}
{
// W1: now write one byte, nothing should happen
twriter.tracked_write(&[0xFF; 1])?;
let _raw_file = rawfile();
assert_eq!(twriter.cursor_usize(), Header::SIZE + 8192 + 1);
assert_eq!(&_raw_file[..Header::SIZE], compiled_header);
assert_eq!(&_raw_file[Header::SIZE..], vec![0u8; 8193]);
}
{
// W8191: now write 8191 bytes, nothing should happen
twriter.tracked_write(&[0xFF; 8191])?;
let _raw_file = rawfile();
assert_eq!(twriter.cursor_usize(), Header::SIZE + 8192 + 1);
assert_eq!(&_raw_file[..Header::SIZE], compiled_header);
assert_eq!(&_raw_file[Header::SIZE..], vec![0u8; 8193]);
}
assert_eq!(expected_checksum, twriter.current_checksum());
{
// FLUSH: now flush and we should have header + 8193 bytes with 0x00 + 8192 bytes with 0xFF
twriter.flush_buf()?;
let _raw_file = rawfile();
assert_eq!(twriter.cursor_usize(), Header::SIZE + 8192 + 1 + 8191 + 1);
assert_eq!(&_raw_file[..Header::SIZE], compiled_header);
assert_eq!(&_raw_file[Header::SIZE..Header::SIZE + 8192 + 1], vec![0u8; 8193]);
assert_eq!(&_raw_file[Header::SIZE + 8193..], vec![0xFF; 8192]);
}
assert_eq!(expected_checksum, twriter.current_checksum());
RuntimeResult::Ok(())
}
.unwrap()
}

@ -24,4 +24,4 @@
*
*/
mod spec;
pub mod spec;

@ -26,12 +26,15 @@
use {
crate::engine::storage::common::{
sdss,
sdss::{self, v1::HeaderV1},
versions::{self, DriverVersion, FileSpecifierVersion, ServerVersion},
},
std::mem::transmute,
};
#[allow(unused)]
pub type Header = HeaderV1<HeaderImplV2>;
/// The file scope
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, sky_macros::EnumMethods)]

Loading…
Cancel
Save