Impl SDSS Writer

next
Sayan Nandan 1 year ago
parent 17b07897e5
commit 9091db6bd3
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -264,7 +264,7 @@ impl HostPointerWidth {
}
}
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct StaticRecordUV {
header_version: HeaderVersion,
ptr_width: HostPointerWidth,
@ -315,7 +315,7 @@ impl StaticRecordUV {
}
}
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
/// The static record
pub struct StaticRecordUVRaw {
data: ByteStack<16>,

@ -48,7 +48,7 @@ use crate::{
- 1B: OS
*/
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct DRHostSignature {
server_version: ServerVersion,
driver_version: DriverVersion,
@ -146,9 +146,9 @@ impl DRHostSignature {
}
}
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct DRHostSignatureRaw {
data: ByteStack<24>,
pub(super) data: ByteStack<24>,
}
impl DRHostSignatureRaw {
@ -258,7 +258,7 @@ impl DRHostSignatureRaw {
= 296B
*/
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct DRRuntimeSignature {
modify_count: u64,
epoch_time: u128,
@ -336,11 +336,17 @@ impl DRRuntimeSignature {
self.host_name_raw(),
)
}
pub fn set_modify_count(&mut self, new: u64) {
self.modify_count = new;
}
pub fn bump_modify_count(&mut self) {
self.modify_count += 1;
}
}
#[derive(Debug, PartialEq, Clone)]
pub struct DRRuntimeSignatureRaw {
data: ByteStack<296>,
pub(super) data: ByteStack<296>,
}
impl DRRuntimeSignatureRaw {

@ -46,7 +46,7 @@ use crate::{
0, 63
*/
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct GRMetadataRecord {
server_version: ServerVersion,
driver_version: DriverVersion,
@ -97,6 +97,7 @@ impl GRMetadataRecord {
}
}
#[derive(Clone)]
pub struct GRMetadataRecordRaw {
pub(super) data: ByteStack<32>,
}
@ -229,7 +230,7 @@ impl GRMetadataRecordRaw {
= 304B
*/
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct GRHostRecord {
epoch_time: u128,
uptime: u128,
@ -319,7 +320,7 @@ impl GRHostRecord {
}
}
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct GRHostRecordRaw {
pub(super) data: ByteStack<304>,
}

@ -56,6 +56,8 @@
* Note: The entire part of the header is little endian encoded
*/
use crate::util::copy_slice_to_array as cp;
// (1) sr
mod sr;
// (2) gr
@ -139,7 +141,7 @@ impl HostRunMode {
}
}
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct SDSSHeader {
// static record
sr: sr::StaticRecord,
@ -182,6 +184,9 @@ impl SDSSHeader {
pub fn dr_rs(&self) -> &dr::DRRuntimeSignature {
&self.dr_rs
}
pub fn dr_rs_mut(&mut self) -> &mut dr::DRRuntimeSignature {
&mut self.dr_rs
}
pub fn encoded(&self) -> SDSSHeaderRaw {
SDSSHeaderRaw::new_full(
self.sr.encoded(),
@ -193,6 +198,7 @@ impl SDSSHeader {
}
}
#[derive(Clone)]
pub struct SDSSHeaderRaw {
sr: sr::StaticRecordRaw,
gr_0_mdr: gr::GRMetadataRecordRaw,
@ -202,6 +208,11 @@ pub struct SDSSHeaderRaw {
}
impl SDSSHeaderRaw {
const OFFSET_SR0: usize = 0;
const OFFSET_SR1: usize = sizeof!(sr::StaticRecordRaw);
const OFFSET_SR2: usize = Self::OFFSET_SR1 + sizeof!(gr::GRMetadataRecordRaw);
const OFFSET_SR3: usize = Self::OFFSET_SR2 + sizeof!(gr::GRHostRecordRaw);
const OFFSET_SR4: usize = Self::OFFSET_SR3 + sizeof!(dr::DRHostSignatureRaw);
pub fn new_auto(
gr_mdr_scope: FileScope,
gr_mdr_specifier: FileSpecifier,
@ -269,4 +280,23 @@ impl SDSSHeaderRaw {
+ sizeof!(dr::DRHostSignatureRaw)
+ sizeof!(dr::DRRuntimeSignatureRaw)
}
pub fn array(&self) -> [u8; Self::header_size()] {
let mut data = [0u8; Self::header_size()];
data[Self::OFFSET_SR0..Self::OFFSET_SR1].copy_from_slice(self.sr.base.get_ref());
data[Self::OFFSET_SR1..Self::OFFSET_SR2].copy_from_slice(self.gr_0_mdr.data.slice());
data[Self::OFFSET_SR2..Self::OFFSET_SR3].copy_from_slice(self.gr_1_hr.data.slice());
data[Self::OFFSET_SR3..Self::OFFSET_SR4].copy_from_slice(self.dr_0_hs.data.slice());
data[Self::OFFSET_SR4..].copy_from_slice(self.dr_1_rs.data.slice());
data
}
/// **☢ WARNING ☢: This only decodes; it doesn't validate expected values!**
pub fn decode(slice: [u8; Self::header_size()]) -> Option<SDSSHeader> {
let sr = sr::StaticRecordRaw::decode(cp(&slice[Self::OFFSET_SR0..Self::OFFSET_SR1]))?;
let gr_mdr =
gr::GRMetadataRecordRaw::decode(cp(&slice[Self::OFFSET_SR1..Self::OFFSET_SR2]))?;
let gr_hr = gr::GRHostRecord::decode(cp(&slice[Self::OFFSET_SR2..Self::OFFSET_SR3]))?;
let dr_sig = dr::DRHostSignature::decode(cp(&slice[Self::OFFSET_SR3..Self::OFFSET_SR4]))?;
let dr_rt = dr::DRRuntimeSignature::decode(cp(&slice[Self::OFFSET_SR4..]))?;
Some(SDSSHeader::new(sr, gr_mdr, gr_hr, dr_sig, dr_rt))
}
}

@ -29,7 +29,7 @@ use crate::engine::storage::{
versions,
};
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct StaticRecord {
sr: StaticRecordUV,
}
@ -49,6 +49,7 @@ impl StaticRecord {
}
/// Static record
#[derive(Clone)]
pub struct StaticRecordRaw {
pub(super) base: StaticRecordUVRaw,
}

@ -24,9 +24,15 @@
*
*/
// raw
mod header_impl;
// impls
mod rw;
mod start_stop;
mod txn;
// test
#[cfg(test)]
mod tests;
use std::io::Error as IoError;
@ -50,6 +56,7 @@ pub enum SDSSError {
IoErrorExtra(IoError, &'static str),
CorruptedFile(&'static str),
StartupError(&'static str),
CorruptedHeader,
}
impl SDSSError {

@ -1,5 +1,5 @@
/*
* Created on Fri May 19 2023
* Created on Tue Jul 23 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
@ -23,3 +23,127 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
super::{
header_impl::{
FileScope, FileSpecifier, FileSpecifierVersion, HostRunMode, SDSSHeader, SDSSHeaderRaw,
},
SDSSResult,
},
crate::engine::storage::v1::SDSSError,
std::{
fs::File,
io::{Read, Write},
},
};
#[derive(Debug)]
/// Log whether
pub enum FileOpen<F> {
Created(F),
Existing(F, SDSSHeader),
}
#[derive(Debug)]
pub enum RawFileOpen<F> {
Created(F),
Existing(F),
}
pub trait RawFileIOInterface: Sized {
fn fopen_or_create_rw(file_path: &str) -> SDSSResult<RawFileOpen<Self>>;
fn fread_exact(&mut self, buf: &mut [u8]) -> SDSSResult<()>;
fn fwrite_all(&mut self, bytes: &[u8]) -> SDSSResult<()>;
fn fsync_all(&mut self) -> SDSSResult<()>;
}
impl RawFileIOInterface for File {
fn fopen_or_create_rw(file_path: &str) -> SDSSResult<RawFileOpen<Self>> {
let f = File::options()
.create(true)
.read(true)
.write(true)
.open(file_path)?;
let md = f.metadata()?;
if md.created()? == md.modified()? {
return Ok(RawFileOpen::Created(f));
} else {
return Ok(RawFileOpen::Existing(f));
}
}
fn fread_exact(&mut self, buf: &mut [u8]) -> SDSSResult<()> {
self.read_exact(buf)?;
Ok(())
}
fn fwrite_all(&mut self, bytes: &[u8]) -> SDSSResult<()> {
self.write_all(bytes)?;
Ok(())
}
fn fsync_all(&mut self) -> SDSSResult<()> {
self.sync_all()?;
Ok(())
}
}
#[derive(Debug)]
pub struct SDSSFileIO<F> {
f: F,
}
impl<F: RawFileIOInterface> SDSSFileIO<F> {
pub fn open_or_create_perm_rw(
file_path: &str,
file_scope: FileScope,
file_specifier: FileSpecifier,
file_specifier_version: FileSpecifierVersion,
host_setting_version: u32,
host_run_mode: HostRunMode,
host_startup_counter: u64,
) -> SDSSResult<FileOpen<Self>> {
let f = F::fopen_or_create_rw(file_path)?;
match f {
RawFileOpen::Created(f) => {
// since this file was just created, we need to append the header
let data = SDSSHeaderRaw::new_auto(
file_scope,
file_specifier,
file_specifier_version,
host_setting_version,
host_run_mode,
host_startup_counter,
0,
)
.array();
let mut f = Self::_new(f);
f.fsynced_write(&data)?;
Ok(FileOpen::Created(f))
}
RawFileOpen::Existing(mut f) => {
// this is an existing file. decoded the header
let mut header_raw = [0u8; SDSSHeaderRaw::header_size()];
f.fread_exact(&mut header_raw)?;
let header = SDSSHeaderRaw::decode(header_raw).ok_or(SDSSError::CorruptedHeader)?;
// since we updated this file, let us update the header
let mut new_header = header.clone();
new_header.dr_rs_mut().bump_modify_count();
let mut f = Self::_new(f);
f.fsynced_write(new_header.encoded().array().as_ref())?;
Ok(FileOpen::Existing(f, header))
}
}
}
}
impl<F: RawFileIOInterface> SDSSFileIO<F> {
fn _new(f: F) -> Self {
Self { f }
}
pub fn fsynced_write(&mut self, data: &[u8]) -> SDSSResult<()> {
self.f.fwrite_all(data)?;
self.f.fsync_all()
}
pub fn read_to_buffer(&mut self, buffer: &mut [u8]) -> SDSSResult<()> {
self.f.fread_exact(buffer)
}
}

@ -0,0 +1,151 @@
/*
* Created on Thu Jul 23 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
super::{
rw::{RawFileIOInterface, RawFileOpen},
SDSSError, SDSSResult,
},
crate::engine::sync::cell::Lazy,
parking_lot::RwLock,
std::{
collections::{hash_map::Entry, HashMap},
io::{ErrorKind, Read, Write},
},
};
static VFS: Lazy<
RwLock<HashMap<String, VirtualFile>>,
fn() -> RwLock<HashMap<String, VirtualFile>>,
> = Lazy::new(|| RwLock::new(HashMap::new()));
fn vfs<T>(fname: &str, mut func: impl FnMut(&mut VirtualFile) -> SDSSResult<T>) -> SDSSResult<T> {
let mut vfs = VFS.write();
let f = vfs
.get_mut(fname)
.ok_or(SDSSError::from(std::io::Error::from(ErrorKind::NotFound)))?;
func(f)
}
struct VirtualFile {
read: bool,
write: bool,
data: Vec<u8>,
}
impl VirtualFile {
fn new(read: bool, write: bool, data: Vec<u8>) -> Self {
Self { read, write, data }
}
fn rw(data: Vec<u8>) -> Self {
Self::new(true, true, data)
}
fn w(data: Vec<u8>) -> Self {
Self::new(false, true, data)
}
fn r(data: Vec<u8>) -> Self {
Self::new(true, false, data)
}
}
struct VirtualFileInterface(Box<str>);
impl RawFileIOInterface for VirtualFileInterface {
fn fopen_or_create_rw(file_path: &str) -> SDSSResult<RawFileOpen<Self>> {
match VFS.write().entry(file_path.to_owned()) {
Entry::Occupied(_) => Ok(RawFileOpen::Existing(Self(file_path.into()))),
Entry::Vacant(ve) => {
ve.insert(VirtualFile::rw(vec![]));
Ok(RawFileOpen::Created(Self(file_path.into())))
}
}
}
fn fread_exact(&mut self, buf: &mut [u8]) -> super::SDSSResult<()> {
vfs(&self.0, |f| {
assert!(f.read);
f.data.as_slice().read_exact(buf)?;
Ok(())
})
}
fn fwrite_all(&mut self, bytes: &[u8]) -> super::SDSSResult<()> {
vfs(&self.0, |f| {
assert!(f.write);
f.data.write_all(bytes)?;
Ok(())
})
}
fn fsync_all(&mut self) -> super::SDSSResult<()> {
Ok(())
}
}
mod rw {
use {
super::VirtualFileInterface,
crate::engine::storage::v1::{
header_impl::{FileScope, FileSpecifier, FileSpecifierVersion, HostRunMode},
rw::{FileOpen, SDSSFileIO},
},
};
#[test]
fn create_delete() {
let f = SDSSFileIO::<VirtualFileInterface>::open_or_create_perm_rw(
"hello_world.db-tlog",
FileScope::TransactionLogCompacted,
FileSpecifier::GNSTxnLog,
FileSpecifierVersion::__new(0),
0,
HostRunMode::Prod,
0,
)
.unwrap();
match f {
FileOpen::Existing(_, _) => panic!(),
FileOpen::Created(_) => {}
};
let open = SDSSFileIO::<VirtualFileInterface>::open_or_create_perm_rw(
"hello_world.db-tlog",
FileScope::TransactionLogCompacted,
FileSpecifier::GNSTxnLog,
FileSpecifierVersion::__new(0),
0,
HostRunMode::Prod,
0,
)
.unwrap();
let h = match open {
FileOpen::Existing(_, header) => header,
_ => panic!(),
};
assert_eq!(h.gr_mdr().file_scope(), FileScope::TransactionLogCompacted);
assert_eq!(h.gr_mdr().file_spec(), FileSpecifier::GNSTxnLog);
assert_eq!(h.gr_mdr().file_spec_id(), FileSpecifierVersion::__new(0));
assert_eq!(h.gr_hr().run_mode(), HostRunMode::Prod);
assert_eq!(h.gr_hr().setting_version(), 0);
assert_eq!(h.gr_hr().startup_counter(), 0);
}
}

@ -0,0 +1,38 @@
/*
* Created on Thu Jul 23 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
/*
+----------------+------------------------------+-----------------+------------------+--------------------+
| EVENT ID (16B) | EVENT SOURCE + METADATA (8B) | EVENT MD5 (16B) | PAYLOAD LEN (8B) | EVENT PAYLOAD (?B) |
+----------------+------------------------------+-----------------+------------------+--------------------+
Event ID:
- The atomically incrementing event ID (for future scale we have 16B; it's like the ZFS situation haha)
- Event source (1B) + 7B padding (for future metadata)
- Event MD5; yeah, it's "not as strong as" SHA256 but I've chosen to have it here (since it's sometimes faster and further on,
we already sum the entire log)
- Payload len: the size of the pyload
- Payload: the payload
*/

@ -35,6 +35,7 @@ pub const ORD_RLX: Ordering = Ordering::Relaxed;
pub const ORD_ACQ: Ordering = Ordering::Acquire;
pub const ORD_REL: Ordering = Ordering::Release;
pub const ORD_ACR: Ordering = Ordering::AcqRel;
pub const ORD_SEQ: Ordering = Ordering::SeqCst;
type CxResult<'g, T, P> = Result<Shared<'g, T>, CompareExchangeError<'g, T, P>>;

@ -25,11 +25,113 @@
*/
use {
super::atm::{upin, Atomic, Guard, Owned, Shared, ORD_REL},
core::{marker::PhantomData, ops::Deref},
super::{
atm::{upin, Atomic, Guard, Owned, Shared, ORD_ACQ, ORD_REL, ORD_SEQ},
Backoff,
},
core::{
marker::PhantomData,
mem,
ops::Deref,
ptr,
sync::atomic::{AtomicBool, AtomicPtr},
},
parking_lot::{Mutex, MutexGuard},
};
/// A lazily intialized, or _call by need_ value
#[derive(Debug)]
pub struct Lazy<T, F> {
/// the value (null at first)
value: AtomicPtr<T>,
/// the function that will init the value
init_func: F,
/// is some thread trying to initialize the value
init_state: AtomicBool,
}
impl<T, F> Lazy<T, F> {
pub const fn new(init_func: F) -> Self {
Self {
value: AtomicPtr::new(ptr::null_mut()),
init_func,
init_state: AtomicBool::new(false),
}
}
}
impl<T, F> Deref for Lazy<T, F>
where
F: Fn() -> T,
{
type Target = T;
fn deref(&self) -> &Self::Target {
let value_ptr = self.value.load(ORD_ACQ);
if !value_ptr.is_null() {
// the value has already been initialized, return
unsafe {
// UNSAFE(@ohsayan): We've just asserted that the value is not null
return &*value_ptr;
}
}
// it's null, so it's useless
// hold on until someone is trying to init
let backoff = Backoff::new();
while self
.init_state
.compare_exchange(false, true, ORD_SEQ, ORD_SEQ)
.is_err()
{
// wait until the other thread finishes
backoff.snooze();
}
/*
see the value before the last store. while we were one the loop,
some other thread could have initialized it already
*/
let value_ptr = self.value.load(ORD_ACQ);
if !value_ptr.is_null() {
// no more init, someone initialized it already
assert!(self.init_state.swap(false, ORD_SEQ));
unsafe {
// UNSAFE(@ohsayan): We've already loaded the value checked
// that it isn't null
&*value_ptr
}
} else {
// so no one cared to initialize the value in between
// fine, we'll init it
let value = (self.init_func)();
let value_ptr = Box::into_raw(Box::new(value));
// now swap out the older value and check it for sanity
assert!(self.value.swap(value_ptr, ORD_SEQ).is_null());
// set trying to init flag to false
assert!(self.init_state.swap(false, ORD_SEQ));
unsafe {
// UNSAFE(@ohsayan): We just initialized the value ourselves
// so it is not null!
&*value_ptr
}
}
}
}
impl<T, F> Drop for Lazy<T, F> {
fn drop(&mut self) {
if mem::needs_drop::<T>() {
// this needs drop
let value_ptr = self.value.load(ORD_ACQ);
if !value_ptr.is_null() {
unsafe {
// UNSAFE(@ohsayan): We've just checked if the value is null or not
mem::drop(Box::from_raw(value_ptr))
}
}
}
}
}
/// A [`TMCell`] provides atomic reads and serialized writes; the `static` is a CB hack
#[derive(Debug)]
pub struct TMCell<T: 'static> {

@ -27,3 +27,36 @@
pub(super) mod atm;
pub(super) mod cell;
pub(super) mod smart;
use std::{cell::Cell, hint::spin_loop, thread};
/// Type to perform exponential backoff
pub struct Backoff {
cur: Cell<u8>,
}
impl Backoff {
const MAX_SPIN: u8 = 6;
const MAX_YIELD: u8 = 8;
pub fn new() -> Self {
Self { cur: Cell::new(0) }
}
/// Spin a few times, giving way to the CPU but if we have spun too many times,
/// then block by yielding to the OS scheduler. This will **eventually block**
/// if we spin more than the set `MAX_SPIN`
pub fn snooze(&self) {
if self.cur.get() <= Self::MAX_SPIN {
// we can still spin (exp)
for _ in 0..1 << self.cur.get() {
spin_loop();
}
} else {
// nope, yield to scheduler
thread::yield_now();
}
if self.cur.get() <= Self::MAX_YIELD {
// bump current step
self.cur.set(self.cur.get() + 1)
}
}
}

Loading…
Cancel
Save