Make the FS definition more generic

next
Sayan Nandan 1 year ago
parent cbac4e6739
commit e309e35b63
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -45,7 +45,7 @@ use {
idx::STIndexSeq,
storage::v1::{
inf::PersistTypeDscr,
rw::{RawFileIOInterface, SDSSFileIO, SDSSFileTrackedWriter},
rw::{RawFSInterface, SDSSFileIO, SDSSFileTrackedWriter},
SDSSError, SDSSResult,
},
},
@ -54,12 +54,12 @@ use {
crossbeam_epoch::pin,
};
pub struct DataBatchPersistDriver<F> {
f: SDSSFileTrackedWriter<F>,
pub struct DataBatchPersistDriver<Fs: RawFSInterface> {
f: SDSSFileTrackedWriter<Fs>,
}
impl<F: RawFileIOInterface> DataBatchPersistDriver<F> {
pub fn new(mut file: SDSSFileIO<F>, is_new: bool) -> SDSSResult<Self> {
impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
pub fn new(mut file: SDSSFileIO<Fs>, is_new: bool) -> SDSSResult<Self> {
if !is_new {
file.fsynced_write(&[MARKER_BATCH_REOPEN])?;
}
@ -193,7 +193,7 @@ impl<F: RawFileIOInterface> DataBatchPersistDriver<F> {
}
}
impl<F: RawFileIOInterface> DataBatchPersistDriver<F> {
impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
/// encode the primary key only. this means NO TAG is encoded.
fn encode_pk_only(&mut self, pk: &PrimaryIndexKey) -> SDSSResult<()> {
let buf = &mut self.f;

@ -41,7 +41,7 @@ use {
idx::{MTIndex, STIndex, STIndexSeq},
storage::v1::{
inf::PersistTypeDscr,
rw::{RawFileIOInterface, SDSSFileIO, SDSSFileTrackedReader},
rw::{RawFSInterface, SDSSFileIO, SDSSFileTrackedReader},
SDSSError, SDSSResult,
},
},
@ -105,11 +105,11 @@ enum Batch {
BatchClosed,
}
pub struct DataBatchRestoreDriver<F> {
pub struct DataBatchRestoreDriver<F: RawFSInterface> {
f: SDSSFileTrackedReader<F>,
}
impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
impl<F: RawFSInterface> DataBatchRestoreDriver<F> {
pub fn new(f: SDSSFileIO<F>) -> SDSSResult<Self> {
Ok(Self {
f: SDSSFileTrackedReader::new(f)?,
@ -139,7 +139,7 @@ impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
}
}
impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
impl<F: RawFSInterface> DataBatchRestoreDriver<F> {
fn read_all_batches_and_for_each(
&mut self,
mut f: impl FnMut(NormalBatch) -> SDSSResult<()>,
@ -206,7 +206,7 @@ impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
}
}
impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
impl<F: RawFSInterface> DataBatchRestoreDriver<F> {
fn apply_batch(
m: &Model,
NormalBatch {
@ -297,7 +297,7 @@ impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
}
}
impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
impl<F: RawFSInterface> DataBatchRestoreDriver<F> {
fn read_batch_summary(&mut self, finished_early: bool) -> SDSSResult<u64> {
if !finished_early {
// we must read the batch termination signature
@ -468,7 +468,7 @@ impl BatchStartBlock {
}
}
impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
impl<F: RawFSInterface> DataBatchRestoreDriver<F> {
fn decode_primary_key(&mut self, pk_type: u8) -> SDSSResult<PrimaryIndexKey> {
let Some(pk_type) = TagUnique::try_from_raw(pk_type) else {
return Err(SDSSError::DataBatchRestoreCorruptedEntry);

@ -41,10 +41,12 @@
- FIXME(@ohsayan): we will probably (naively) need to dynamically reposition the cursor in case the metadata is corrupted as well
*/
use super::rw::RawFSInterface;
use {
super::{
header_impl::{FileSpecifierVersion, HostRunMode, SDSSHeaderRaw},
rw::{FileOpen, RawFileIOInterface, SDSSFileIO},
rw::{FileOpen, SDSSFileIO},
SDSSError, SDSSResult,
},
crate::{
@ -67,9 +69,9 @@ pub fn null_journal<TA: JournalAdapter>(
host_run_mode: HostRunMode,
host_startup_counter: u64,
_: &TA::GlobalState,
) -> JournalWriter<super::rw::NullZero, TA> {
) -> JournalWriter<super::memfs::NullFS, TA> {
let FileOpen::Created(journal) =
SDSSFileIO::<super::rw::NullZero>::open_or_create_perm_rw::<false>(
SDSSFileIO::<super::memfs::NullFS>::open_or_create_perm_rw::<false>(
log_file_name,
FileScope::Journal,
log_kind,
@ -85,7 +87,7 @@ pub fn null_journal<TA: JournalAdapter>(
JournalWriter::new(journal, 0, true).unwrap()
}
pub fn open_journal<TA: JournalAdapter, LF: RawFileIOInterface>(
pub fn open_journal<TA: JournalAdapter, Fs: RawFSInterface>(
log_file_name: &str,
log_kind: FileSpecifier,
log_kind_version: FileSpecifierVersion,
@ -93,10 +95,10 @@ pub fn open_journal<TA: JournalAdapter, LF: RawFileIOInterface>(
host_run_mode: HostRunMode,
host_startup_counter: u64,
gs: &TA::GlobalState,
) -> SDSSResult<JournalWriter<LF, TA>> {
) -> SDSSResult<JournalWriter<Fs, TA>> {
macro_rules! open_file {
($modify:literal) => {
SDSSFileIO::<LF>::open_or_create_perm_rw::<$modify>(
SDSSFileIO::<Fs>::open_or_create_perm_rw::<$modify>(
log_file_name,
FileScope::Journal,
log_kind,
@ -117,7 +119,7 @@ pub fn open_journal<TA: JournalAdapter, LF: RawFileIOInterface>(
FileOpen::Created(f) => return JournalWriter::new(f, 0, true),
FileOpen::Existing(file, _) => file,
};
let (file, last_txn) = JournalReader::<TA, LF>::scroll(file, gs)?;
let (file, last_txn) = JournalReader::<TA, Fs>::scroll(file, gs)?;
JournalWriter::new(file, last_txn, false)
}
@ -233,9 +235,8 @@ impl JournalEntryMetadata {
}
}
#[derive(Debug)]
pub struct JournalReader<TA, LF> {
log_file: SDSSFileIO<LF>,
pub struct JournalReader<TA, Fs: RawFSInterface> {
log_file: SDSSFileIO<Fs>,
log_size: u64,
evid: u64,
closed: bool,
@ -243,8 +244,8 @@ pub struct JournalReader<TA, LF> {
_m: PhantomData<TA>,
}
impl<TA: JournalAdapter, LF: RawFileIOInterface> JournalReader<TA, LF> {
pub fn new(log_file: SDSSFileIO<LF>) -> SDSSResult<Self> {
impl<TA: JournalAdapter, Fs: RawFSInterface> JournalReader<TA, Fs> {
pub fn new(log_file: SDSSFileIO<Fs>) -> SDSSResult<Self> {
let log_size = log_file.file_length()? - SDSSHeaderRaw::header_size() as u64;
Ok(Self {
log_file,
@ -361,7 +362,7 @@ impl<TA: JournalAdapter, LF: RawFileIOInterface> JournalReader<TA, LF> {
Err(SDSSError::JournalCorrupted)
}
/// Read and apply all events in the given log file to the global state, returning the (open file, last event ID)
pub fn scroll(file: SDSSFileIO<LF>, gs: &TA::GlobalState) -> SDSSResult<(SDSSFileIO<LF>, u64)> {
pub fn scroll(file: SDSSFileIO<Fs>, gs: &TA::GlobalState) -> SDSSResult<(SDSSFileIO<Fs>, u64)> {
let mut slf = Self::new(file)?;
while !slf.end_of_file() {
slf.rapply_next_event(gs)?;
@ -374,7 +375,7 @@ impl<TA: JournalAdapter, LF: RawFileIOInterface> JournalReader<TA, LF> {
}
}
impl<TA, LF> JournalReader<TA, LF> {
impl<TA, Fs: RawFSInterface> JournalReader<TA, Fs> {
fn _incr_evid(&mut self) {
self.evid += 1;
}
@ -389,7 +390,7 @@ impl<TA, LF> JournalReader<TA, LF> {
}
}
impl<TA, LF: RawFileIOInterface> JournalReader<TA, LF> {
impl<TA, Fs: RawFSInterface> JournalReader<TA, Fs> {
fn logfile_read_into_buffer(&mut self, buf: &mut [u8]) -> SDSSResult<()> {
if !self.has_remaining_bytes(buf.len() as _) {
// do this right here to avoid another syscall
@ -401,18 +402,17 @@ impl<TA, LF: RawFileIOInterface> JournalReader<TA, LF> {
}
}
#[derive(Debug)]
pub struct JournalWriter<LF, TA> {
pub struct JournalWriter<Fs: RawFSInterface, TA> {
/// the txn log file
log_file: SDSSFileIO<LF>,
log_file: SDSSFileIO<Fs>,
/// the id of the **next** journal
id: u64,
_m: PhantomData<TA>,
closed: bool,
}
impl<LF: RawFileIOInterface, TA: JournalAdapter> JournalWriter<LF, TA> {
pub fn new(mut log_file: SDSSFileIO<LF>, last_txn_id: u64, new: bool) -> SDSSResult<Self> {
impl<Fs: RawFSInterface, TA: JournalAdapter> JournalWriter<Fs, TA> {
pub fn new(mut log_file: SDSSFileIO<Fs>, last_txn_id: u64, new: bool) -> SDSSResult<Self> {
let log_size = log_file.file_length()?;
log_file.seek_from_start(log_size)?; // avoid jumbling with headers
let mut slf = Self {
@ -450,7 +450,7 @@ impl<LF: RawFileIOInterface, TA: JournalAdapter> JournalWriter<LF, TA> {
}
}
impl<LF: RawFileIOInterface, TA> JournalWriter<LF, TA> {
impl<Fs: RawFSInterface, TA> JournalWriter<Fs, TA> {
pub fn appendrec_journal_reverse_entry(&mut self) -> SDSSResult<()> {
let mut threshold = Threshold::<RECOVERY_BLOCK_AUTO_THRESHOLD>::new();
let mut entry =
@ -480,7 +480,7 @@ impl<LF: RawFileIOInterface, TA> JournalWriter<LF, TA> {
}
}
impl<LF, TA> JournalWriter<LF, TA> {
impl<Fs: RawFSInterface, TA> JournalWriter<Fs, TA> {
fn _incr_id(&mut self) -> u64 {
let current = self.id;
self.id += 1;
@ -488,7 +488,7 @@ impl<LF, TA> JournalWriter<LF, TA> {
}
}
impl<LF, TA> Drop for JournalWriter<LF, TA> {
impl<Fs: RawFSInterface, TA> Drop for JournalWriter<Fs, TA> {
fn drop(&mut self) {
assert!(self.closed, "log not closed");
}

@ -0,0 +1,502 @@
/*
* Created on Fri Sep 08 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
crate::engine::{
storage::v1::{
rw::{
RawFSInterface, RawFileInterface, RawFileInterfaceExt, RawFileInterfaceRead,
RawFileInterfaceWrite, RawFileInterfaceWriteExt, RawFileOpen,
},
SDSSResult,
},
sync::cell::Lazy,
},
parking_lot::RwLock,
std::{
collections::{hash_map::Entry, HashMap},
io::{Error, ErrorKind},
},
};
static VFS: Lazy<RwLock<HashMap<Box<str>, VNode>>, fn() -> RwLock<HashMap<Box<str>, VNode>>> =
Lazy::new(|| Default::default());
/*
vnode
---
either a vfile or a vdir
*/
#[derive(Debug)]
enum VNode {
Dir(HashMap<Box<str>, Self>),
File(VFile),
}
impl VNode {
const fn is_file(&self) -> bool {
matches!(self, Self::File(_))
}
const fn is_dir(&self) -> bool {
matches!(self, Self::Dir(_))
}
fn as_dir_mut(&mut self) -> Option<&mut HashMap<Box<str>, Self>> {
match self {
Self::Dir(d) => Some(d),
Self::File(_) => None,
}
}
}
/*
vfs impl:
- nested directory structure
- make parents
- make child
*/
#[derive(Debug)]
pub struct VirtualFS;
impl RawFSInterface for VirtualFS {
type File = VFileDescriptor;
fn fs_create_dir(fpath: &str) -> super::SDSSResult<()> {
// get vfs
let mut vfs = VFS.write();
// get root dir
let path = fpath.split("/").collect::<Vec<&str>>();
// get target
let target = *path.last().unwrap();
let mut current = &mut *vfs;
// process components
let component_len = path.len() - 1;
let mut path = path.into_iter().take(component_len);
while let Some(component) = path.next() {
match current.get_mut(component) {
Some(VNode::Dir(d)) => {
current = d;
}
Some(VNode::File(_)) => {
return Err(Error::new(ErrorKind::InvalidInput, "found file in path").into())
}
None => {
return Err(
Error::new(ErrorKind::NotFound, "could not find directory in path").into(),
)
}
}
}
match current.entry(target.into()) {
Entry::Occupied(_) => return Err(Error::from(ErrorKind::AlreadyExists).into()),
Entry::Vacant(ve) => {
ve.insert(VNode::Dir(into_dict!()));
Ok(())
}
}
}
fn fs_create_dir_all(fpath: &str) -> super::SDSSResult<()> {
let mut vfs = VFS.write();
fn create_ahead(
mut ahead: &[&str],
current: &mut HashMap<Box<str>, VNode>,
) -> SDSSResult<()> {
if ahead.is_empty() {
return Ok(());
}
let this = ahead[0];
ahead = &ahead[1..];
match current.get_mut(this) {
Some(VNode::Dir(d)) => {
if ahead.is_empty() {
// hmm, this was the list dir that was to be created, but it already exists
return Err(Error::from(ErrorKind::AlreadyExists).into());
}
return create_ahead(ahead, d);
}
Some(VNode::File(_)) => {
return Err(Error::new(ErrorKind::InvalidInput, "found file in path").into())
}
None => {
let _ = current.insert(this.into(), VNode::Dir(into_dict!()));
let dir = current.get_mut(this).unwrap().as_dir_mut().unwrap();
return create_ahead(ahead, dir);
}
}
}
let pieces: Vec<&str> = fpath.split("/").collect();
create_ahead(&pieces, &mut *vfs)
}
fn fs_delete_dir(fpath: &str) -> super::SDSSResult<()> {
delete_dir(fpath, false)
}
fn fs_delete_dir_all(fpath: &str) -> super::SDSSResult<()> {
delete_dir(fpath, true)
}
fn fs_fopen_or_create_rw(fpath: &str) -> super::SDSSResult<super::rw::RawFileOpen<Self::File>> {
let mut vfs = VFS.write();
// components
let components = fpath.split("/").collect::<Vec<&str>>();
let file = components.last().unwrap().to_owned().into();
let target_dir = find_target_dir_mut(components, &mut vfs)?;
match target_dir.entry(file) {
Entry::Occupied(mut oe) => match oe.get_mut() {
VNode::File(f) => {
f.read = true;
f.write = true;
Ok(RawFileOpen::Existing(VFileDescriptor(fpath.into())))
}
VNode::Dir(_) => {
return Err(
Error::new(ErrorKind::InvalidInput, "found directory, not a file").into(),
)
}
},
Entry::Vacant(v) => {
v.insert(VNode::File(VFile::new(true, true, vec![], 0)));
Ok(RawFileOpen::Created(VFileDescriptor(fpath.into())))
}
}
}
}
fn find_target_dir_mut<'a>(
components: Vec<&str>,
mut current: &'a mut HashMap<Box<str>, VNode>,
) -> Result<&'a mut HashMap<Box<str>, VNode>, super::SDSSError> {
let path_len = components.len() - 1;
for component in components.into_iter().take(path_len) {
match current.get_mut(component) {
Some(VNode::Dir(d)) => current = d,
Some(VNode::File(_)) => {
return Err(Error::new(ErrorKind::InvalidInput, "found file in path").into())
}
None => {
return Err(
Error::new(ErrorKind::NotFound, "could not find directory in path").into(),
)
}
}
}
Ok(current)
}
fn find_target_dir<'a>(
components: Vec<&str>,
mut current: &'a HashMap<Box<str>, VNode>,
) -> Result<&'a HashMap<Box<str>, VNode>, super::SDSSError> {
let path_len = components.len() - 1;
for component in components.into_iter().take(path_len) {
match current.get(component) {
Some(VNode::Dir(d)) => current = d,
Some(VNode::File(_)) => {
return Err(Error::new(ErrorKind::InvalidInput, "found file in path").into())
}
None => {
return Err(
Error::new(ErrorKind::NotFound, "could not find directory in path").into(),
)
}
}
}
Ok(current)
}
fn delete_dir(fpath: &str, allow_if_non_empty: bool) -> Result<(), super::SDSSError> {
let mut vfs = VFS.write();
let mut current = &mut *vfs;
// process components
let components = fpath.split("/").collect::<Vec<&str>>();
let components_len = components.len() - 1;
let target = *components.last().unwrap();
for component in components.into_iter().take(components_len) {
match current.get_mut(component) {
Some(VNode::Dir(dir)) => {
current = dir;
}
Some(VNode::File(_)) => {
return Err(Error::new(ErrorKind::InvalidInput, "found file in path").into())
}
None => {
return Err(
Error::new(ErrorKind::NotFound, "could not find directory in path").into(),
)
}
}
}
match current.entry(target.into()) {
Entry::Occupied(dir) => match dir.get() {
VNode::Dir(d) => {
if allow_if_non_empty || d.is_empty() {
dir.remove();
return Ok(());
}
return Err(Error::new(ErrorKind::InvalidInput, "directory is not empty").into());
}
VNode::File(_) => {
return Err(Error::new(ErrorKind::InvalidInput, "found file in path").into())
}
},
Entry::Vacant(_) => {
return Err(Error::new(ErrorKind::NotFound, "could not find directory in path").into())
}
}
}
/*
vfile impl
---
- all r/w operations
- all seek operations
- dummy sync operations
*/
#[derive(Debug)]
pub struct VFile {
read: bool,
write: bool,
data: Vec<u8>,
pos: usize,
}
impl VFile {
fn new(read: bool, write: bool, data: Vec<u8>, pos: usize) -> Self {
Self {
read,
write,
data,
pos,
}
}
fn current(&self) -> &[u8] {
&self.data[self.pos..]
}
}
pub struct VFileDescriptor(Box<str>);
impl Drop for VFileDescriptor {
fn drop(&mut self) {
let _ = with_file_mut(&self.0, |f| {
f.read = false;
f.write = false;
f.pos = 0;
Ok(())
});
}
}
fn with_file_mut<T>(fpath: &str, mut f: impl FnMut(&mut VFile) -> SDSSResult<T>) -> SDSSResult<T> {
let mut vfs = VFS.write();
let components = fpath.split("/").collect::<Vec<&str>>();
let file = *components.last().unwrap();
let target_dir = find_target_dir_mut(components, &mut vfs)?;
match target_dir.get_mut(file) {
Some(VNode::File(file)) => f(file),
Some(VNode::Dir(_)) => {
return Err(Error::new(ErrorKind::InvalidInput, "found directory, not a file").into())
}
None => return Err(Error::from(ErrorKind::NotFound).into()),
}
}
fn with_file<T>(fpath: &str, mut f: impl FnMut(&VFile) -> SDSSResult<T>) -> SDSSResult<T> {
let vfs = VFS.read();
let components = fpath.split("/").collect::<Vec<&str>>();
let file = *components.last().unwrap();
let target_dir = find_target_dir(components, &vfs)?;
match target_dir.get(file) {
Some(VNode::File(file)) => f(file),
Some(VNode::Dir(_)) => {
return Err(Error::new(ErrorKind::InvalidInput, "found directory, not a file").into())
}
None => return Err(Error::from(ErrorKind::NotFound).into()),
}
}
impl RawFileInterface for VFileDescriptor {
type Reader = Self;
type Writer = Self;
fn into_buffered_reader(self) -> super::SDSSResult<Self::Reader> {
Ok(self)
}
fn into_buffered_writer(self) -> super::SDSSResult<Self::Writer> {
Ok(self)
}
}
impl RawFileInterfaceRead for VFileDescriptor {
fn fr_read_exact(&mut self, buf: &mut [u8]) -> super::SDSSResult<()> {
with_file_mut(&self.0, |file| {
if !file.read {
return Err(
Error::new(ErrorKind::PermissionDenied, "Read permission denied").into(),
);
}
let available_bytes = file.current().len();
if available_bytes < buf.len() {
return Err(Error::from(ErrorKind::UnexpectedEof).into());
}
buf.copy_from_slice(&file.data[file.pos..file.pos + buf.len()]);
file.pos += buf.len();
Ok(())
})
}
}
impl RawFileInterfaceWrite for VFileDescriptor {
fn fw_write_all(&mut self, bytes: &[u8]) -> super::SDSSResult<()> {
with_file_mut(&self.0, |file| {
if !file.write {
return Err(
Error::new(ErrorKind::PermissionDenied, "Write permission denied").into(),
);
}
if file.pos + bytes.len() > file.data.len() {
file.data.resize(file.pos + bytes.len(), 0);
}
file.data[file.pos..file.pos + bytes.len()].copy_from_slice(bytes);
file.pos += bytes.len();
Ok(())
})
}
}
impl RawFileInterfaceWriteExt for VFileDescriptor {
fn fw_fsync_all(&mut self) -> super::SDSSResult<()> {
with_file(&self.0, |_| Ok(()))
}
fn fw_truncate_to(&mut self, to: u64) -> super::SDSSResult<()> {
with_file_mut(&self.0, |file| {
if !file.write {
return Err(
Error::new(ErrorKind::PermissionDenied, "Write permission denied").into(),
);
}
if to as usize > file.data.len() {
file.data.resize(to as usize, 0);
} else {
file.data.truncate(to as usize);
}
if file.pos > file.data.len() {
file.pos = file.data.len();
}
Ok(())
})
}
}
impl RawFileInterfaceExt for VFileDescriptor {
fn fext_file_length(&self) -> super::SDSSResult<u64> {
with_file(&self.0, |f| Ok(f.data.len() as u64))
}
fn fext_cursor(&mut self) -> super::SDSSResult<u64> {
with_file(&self.0, |f| Ok(f.pos as u64))
}
fn fext_seek_ahead_from_start_by(&mut self, by: u64) -> super::SDSSResult<()> {
with_file_mut(&self.0, |file| {
if by > file.data.len() as u64 {
return Err(
Error::new(ErrorKind::InvalidInput, "Can't seek beyond file's end").into(),
);
}
file.pos = by as usize;
Ok(())
})
}
}
/*
nullfs
---
- equivalent of `/dev/null`
- all calls are no-ops
- infallible
*/
/// An infallible `/dev/null` implementation. Whatever you run on this, it will always be a no-op since nothing
/// is actually happening
#[derive(Debug)]
pub struct NullFS;
pub struct NullFile;
impl RawFSInterface for NullFS {
type File = NullFile;
fn fs_create_dir(_: &str) -> SDSSResult<()> {
Ok(())
}
fn fs_create_dir_all(_: &str) -> SDSSResult<()> {
Ok(())
}
fn fs_delete_dir(_: &str) -> SDSSResult<()> {
Ok(())
}
fn fs_delete_dir_all(_: &str) -> SDSSResult<()> {
Ok(())
}
fn fs_fopen_or_create_rw(_: &str) -> SDSSResult<RawFileOpen<Self::File>> {
Ok(RawFileOpen::Created(NullFile))
}
}
impl RawFileInterfaceRead for NullFile {
fn fr_read_exact(&mut self, _: &mut [u8]) -> SDSSResult<()> {
Ok(())
}
}
impl RawFileInterfaceWrite for NullFile {
fn fw_write_all(&mut self, _: &[u8]) -> SDSSResult<()> {
Ok(())
}
}
impl RawFileInterfaceWriteExt for NullFile {
fn fw_fsync_all(&mut self) -> SDSSResult<()> {
Ok(())
}
fn fw_truncate_to(&mut self, _: u64) -> SDSSResult<()> {
Ok(())
}
}
impl RawFileInterfaceExt for NullFile {
fn fext_file_length(&self) -> SDSSResult<u64> {
Ok(0)
}
fn fext_cursor(&mut self) -> SDSSResult<u64> {
Ok(0)
}
fn fext_seek_ahead_from_start_by(&mut self, _: u64) -> SDSSResult<()> {
Ok(())
}
}
impl RawFileInterface for NullFile {
type Reader = Self;
type Writer = Self;
fn into_buffered_reader(self) -> SDSSResult<Self::Reader> {
Ok(self)
}
fn into_buffered_writer(self) -> SDSSResult<Self::Writer> {
Ok(self)
}
}

@ -34,15 +34,15 @@ mod rw;
pub mod inf;
mod start_stop;
// test
#[cfg(test)]
pub mod test_util;
pub mod memfs;
#[cfg(test)]
mod tests;
// re-exports
pub use {
journal::{open_journal, JournalAdapter, JournalWriter},
rw::{BufferedScanner, NullZero, RawFileIOInterface, SDSSFileIO},
memfs::NullFS,
rw::{BufferedScanner, LocalFS, RawFSInterface, SDSSFileIO},
};
pub mod header_meta {
pub use super::header_impl::{FileScope, FileSpecifier, FileSpecifierVersion, HostRunMode};

@ -24,6 +24,8 @@
*
*/
use std::marker::PhantomData;
use {
super::{
header_impl::{
@ -36,8 +38,8 @@ use {
util::os::SysIOError,
},
std::{
fs::File,
io::{Read, Seek, SeekFrom, Write},
fs::{self, File},
io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write},
ptr, slice,
},
};
@ -70,57 +72,94 @@ pub enum RawFileOpen<F> {
Existing(F),
}
pub trait RawFileIOInterface: Sized {
/// Indicates that the interface is not a `/dev/null` (or related) implementation
const NOTNULL: bool = true;
fn fopen_or_create_rw(file_path: &str) -> SDSSResult<RawFileOpen<Self>>;
fn fread_exact(&mut self, buf: &mut [u8]) -> SDSSResult<()>;
fn fwrite_all(&mut self, bytes: &[u8]) -> SDSSResult<()>;
fn fsync_all(&mut self) -> SDSSResult<()>;
fn fseek_ahead(&mut self, by: u64) -> SDSSResult<()>;
fn flen(&self) -> SDSSResult<u64>;
fn flen_set(&mut self, to: u64) -> SDSSResult<()>;
fn fcursor(&mut self) -> SDSSResult<u64>;
pub trait RawFSInterface {
const NOT_NULL: bool = true;
type File: RawFileInterface;
fn fs_create_dir(fpath: &str) -> SDSSResult<()>;
fn fs_create_dir_all(fpath: &str) -> SDSSResult<()>;
fn fs_delete_dir(fpath: &str) -> SDSSResult<()>;
fn fs_delete_dir_all(fpath: &str) -> SDSSResult<()>;
fn fs_fopen_or_create_rw(fpath: &str) -> SDSSResult<RawFileOpen<Self::File>>;
}
/// This is a kind of file like `/dev/null`. It exists in ... nothing!
pub struct NullZero;
/// A file (well, probably) that can be used for RW operations along with advanced write and extended operations (such as seeking)
pub trait RawFileInterface
where
Self: RawFileInterfaceRead
+ RawFileInterfaceWrite
+ RawFileInterfaceWriteExt
+ RawFileInterfaceExt,
{
type Reader: RawFileInterfaceRead + RawFileInterfaceExt;
type Writer: RawFileInterfaceWrite + RawFileInterfaceExt;
fn into_buffered_reader(self) -> SDSSResult<Self::Reader>;
fn into_buffered_writer(self) -> SDSSResult<Self::Writer>;
}
impl RawFileIOInterface for NullZero {
const NOTNULL: bool = false;
fn fopen_or_create_rw(_: &str) -> SDSSResult<RawFileOpen<Self>> {
Ok(RawFileOpen::Created(Self))
}
fn fread_exact(&mut self, _: &mut [u8]) -> SDSSResult<()> {
Ok(())
}
fn fwrite_all(&mut self, _: &[u8]) -> SDSSResult<()> {
Ok(())
/// A file interface that supports read operations
pub trait RawFileInterfaceRead {
fn fr_read_exact(&mut self, buf: &mut [u8]) -> SDSSResult<()>;
}
impl<R: Read> RawFileInterfaceRead for R {
fn fr_read_exact(&mut self, buf: &mut [u8]) -> SDSSResult<()> {
self.read_exact(buf).map_err(From::from)
}
fn fsync_all(&mut self) -> SDSSResult<()> {
Ok(())
}
/// A file interface that supports write operations
pub trait RawFileInterfaceWrite {
fn fw_write_all(&mut self, buf: &[u8]) -> SDSSResult<()>;
}
impl<W: Write> RawFileInterfaceWrite for W {
fn fw_write_all(&mut self, buf: &[u8]) -> SDSSResult<()> {
self.write_all(buf).map_err(From::from)
}
fn fseek_ahead(&mut self, _: u64) -> SDSSResult<()> {
Ok(())
}
/// A file interface that supports advanced write operations
pub trait RawFileInterfaceWriteExt {
fn fw_fsync_all(&mut self) -> SDSSResult<()>;
fn fw_truncate_to(&mut self, to: u64) -> SDSSResult<()>;
}
/// A file interface that supports advanced file operations
pub trait RawFileInterfaceExt {
fn fext_file_length(&self) -> SDSSResult<u64>;
fn fext_cursor(&mut self) -> SDSSResult<u64>;
fn fext_seek_ahead_from_start_by(&mut self, ahead_by: u64) -> SDSSResult<()>;
}
fn cvt<T>(v: std::io::Result<T>) -> SDSSResult<T> {
let r = v?;
Ok(r)
}
/// The actual local host file system (as an abstraction [`fs`])
#[derive(Debug)]
pub struct LocalFS;
impl RawFSInterface for LocalFS {
type File = File;
fn fs_create_dir(fpath: &str) -> SDSSResult<()> {
cvt(fs::create_dir(fpath))
}
fn flen(&self) -> SDSSResult<u64> {
Ok(0)
fn fs_create_dir_all(fpath: &str) -> SDSSResult<()> {
cvt(fs::create_dir_all(fpath))
}
fn flen_set(&mut self, _: u64) -> SDSSResult<()> {
Ok(())
fn fs_delete_dir(fpath: &str) -> SDSSResult<()> {
cvt(fs::remove_dir(fpath))
}
fn fcursor(&mut self) -> SDSSResult<u64> {
Ok(0)
fn fs_delete_dir_all(fpath: &str) -> SDSSResult<()> {
cvt(fs::remove_dir_all(fpath))
}
}
impl RawFileIOInterface for File {
fn fopen_or_create_rw(file_path: &str) -> SDSSResult<RawFileOpen<Self>> {
fn fs_fopen_or_create_rw(fpath: &str) -> SDSSResult<RawFileOpen<Self::File>> {
let f = File::options()
.create(true)
.read(true)
.write(true)
.open(file_path)?;
.open(fpath)?;
let md = f.metadata()?;
if md.len() == 0 {
Ok(RawFileOpen::Created(f))
@ -128,41 +167,79 @@ impl RawFileIOInterface for File {
Ok(RawFileOpen::Existing(f))
}
}
fn fread_exact(&mut self, buf: &mut [u8]) -> SDSSResult<()> {
self.read_exact(buf)?;
Ok(())
}
impl RawFileInterface for File {
type Reader = BufReader<Self>;
type Writer = BufWriter<Self>;
fn into_buffered_reader(self) -> SDSSResult<Self::Reader> {
Ok(BufReader::new(self))
}
fn fwrite_all(&mut self, bytes: &[u8]) -> SDSSResult<()> {
self.write_all(bytes)?;
Ok(())
fn into_buffered_writer(self) -> SDSSResult<Self::Writer> {
Ok(BufWriter::new(self))
}
fn fsync_all(&mut self) -> SDSSResult<()> {
self.sync_all()?;
Ok(())
}
impl RawFileInterfaceWriteExt for File {
fn fw_fsync_all(&mut self) -> SDSSResult<()> {
cvt(self.sync_all())
}
fn flen(&self) -> SDSSResult<u64> {
Ok(self.metadata()?.len())
fn fw_truncate_to(&mut self, to: u64) -> SDSSResult<()> {
cvt(self.set_len(to))
}
fn fseek_ahead(&mut self, by: u64) -> SDSSResult<()> {
self.seek(SeekFrom::Start(by))?;
Ok(())
}
trait LocalFSFile {
fn file_mut(&mut self) -> &mut File;
fn file(&self) -> &File;
}
impl LocalFSFile for File {
fn file_mut(&mut self) -> &mut File {
self
}
fn flen_set(&mut self, to: u64) -> SDSSResult<()> {
self.set_len(to)?;
Ok(())
fn file(&self) -> &File {
self
}
fn fcursor(&mut self) -> SDSSResult<u64> {
self.stream_position().map_err(From::from)
}
impl LocalFSFile for BufReader<File> {
fn file_mut(&mut self) -> &mut File {
self.get_mut()
}
fn file(&self) -> &File {
self.get_ref()
}
}
pub struct SDSSFileTrackedWriter<F> {
f: SDSSFileIO<F>,
impl LocalFSFile for BufWriter<File> {
fn file_mut(&mut self) -> &mut File {
self.get_mut()
}
fn file(&self) -> &File {
self.get_ref()
}
}
impl<F: LocalFSFile> RawFileInterfaceExt for F {
fn fext_file_length(&self) -> SDSSResult<u64> {
Ok(self.file().metadata()?.len())
}
fn fext_cursor(&mut self) -> SDSSResult<u64> {
cvt(self.file_mut().stream_position())
}
fn fext_seek_ahead_from_start_by(&mut self, by: u64) -> SDSSResult<()> {
cvt(self.file_mut().seek(SeekFrom::Start(by)).map(|_| ()))
}
}
pub struct SDSSFileTrackedWriter<Fs: RawFSInterface> {
f: SDSSFileIO<Fs>,
cs: SCrc,
}
impl<F: RawFileIOInterface> SDSSFileTrackedWriter<F> {
pub fn new(f: SDSSFileIO<F>) -> Self {
impl<Fs: RawFSInterface> SDSSFileTrackedWriter<Fs> {
pub fn new(f: SDSSFileIO<Fs>) -> Self {
Self { f, cs: SCrc::new() }
}
pub fn unfsynced_write(&mut self, block: &[u8]) -> SDSSResult<()> {
@ -182,23 +259,23 @@ impl<F: RawFileIOInterface> SDSSFileTrackedWriter<F> {
core::mem::swap(&mut self.cs, &mut scrc);
scrc.finish()
}
pub fn inner_file(&mut self) -> &mut SDSSFileIO<F> {
pub fn inner_file(&mut self) -> &mut SDSSFileIO<Fs> {
&mut self.f
}
}
/// [`SDSSFileLenTracked`] simply maintains application level length and checksum tracking to avoid frequent syscalls because we
/// do not expect (even though it's very possible) users to randomly modify file lengths while we're reading them
pub struct SDSSFileTrackedReader<F> {
f: SDSSFileIO<F>,
pub struct SDSSFileTrackedReader<Fs: RawFSInterface> {
f: SDSSFileIO<Fs>,
len: u64,
pos: u64,
cs: SCrc,
}
impl<F: RawFileIOInterface> SDSSFileTrackedReader<F> {
impl<Fs: RawFSInterface> SDSSFileTrackedReader<Fs> {
/// Important: this will only look at the data post the current cursor!
pub fn new(mut f: SDSSFileIO<F>) -> SDSSResult<Self> {
pub fn new(mut f: SDSSFileIO<Fs>) -> SDSSResult<Self> {
let len = f.file_length()?;
let pos = f.retrieve_cursor()?;
Ok(Self {
@ -242,10 +319,10 @@ impl<F: RawFileIOInterface> SDSSFileTrackedReader<F> {
core::mem::swap(&mut crc, &mut self.cs);
crc.finish()
}
pub fn inner_file(&mut self) -> &mut SDSSFileIO<F> {
pub fn inner_file(&mut self) -> &mut SDSSFileIO<Fs> {
&mut self.f
}
pub fn into_inner_file(self) -> SDSSFileIO<F> {
pub fn into_inner_file(self) -> SDSSFileIO<Fs> {
self.f
}
pub fn __cursor_ahead_by(&mut self, sizeof: usize) {
@ -267,11 +344,12 @@ impl<F: RawFileIOInterface> SDSSFileTrackedReader<F> {
}
#[derive(Debug)]
pub struct SDSSFileIO<F> {
f: F,
pub struct SDSSFileIO<Fs: RawFSInterface> {
f: Fs::File,
_fs: PhantomData<Fs>,
}
impl<F: RawFileIOInterface> SDSSFileIO<F> {
impl<Fs: RawFSInterface> SDSSFileIO<Fs> {
/// **IMPORTANT: File position: end-of-header-section**
pub fn open_or_create_perm_rw<const REWRITE_MODIFY_COUNTER: bool>(
file_path: &str,
@ -282,7 +360,7 @@ impl<F: RawFileIOInterface> SDSSFileIO<F> {
host_run_mode: HostRunMode,
host_startup_counter: u64,
) -> SDSSResult<FileOpen<Self>> {
let f = F::fopen_or_create_rw(file_path)?;
let f = Fs::fs_fopen_or_create_rw(file_path)?;
match f {
RawFileOpen::Created(f) => {
// since this file was just created, we need to append the header
@ -303,7 +381,7 @@ impl<F: RawFileIOInterface> SDSSFileIO<F> {
RawFileOpen::Existing(mut f) => {
// this is an existing file. decoded the header
let mut header_raw = [0u8; SDSSHeaderRaw::header_size()];
f.fread_exact(&mut header_raw)?;
f.fr_read_exact(&mut header_raw)?;
let header = SDSSHeaderRaw::decode_noverify(header_raw)
.ok_or(SDSSError::HeaderDecodeCorruptedHeader)?;
// now validate the header
@ -323,35 +401,38 @@ impl<F: RawFileIOInterface> SDSSFileIO<F> {
}
}
impl<F: RawFileIOInterface> SDSSFileIO<F> {
fn _new(f: F) -> Self {
Self { f }
impl<Fs: RawFSInterface> SDSSFileIO<Fs> {
fn _new(f: Fs::File) -> Self {
Self {
f,
_fs: PhantomData,
}
}
pub fn unfsynced_write(&mut self, data: &[u8]) -> SDSSResult<()> {
self.f.fwrite_all(data)
self.f.fw_write_all(data)
}
pub fn fsync_all(&mut self) -> SDSSResult<()> {
self.f.fsync_all()?;
self.f.fw_fsync_all()?;
Ok(())
}
pub fn fsynced_write(&mut self, data: &[u8]) -> SDSSResult<()> {
self.f.fwrite_all(data)?;
self.f.fsync_all()
self.f.fw_write_all(data)?;
self.f.fw_fsync_all()
}
pub fn read_to_buffer(&mut self, buffer: &mut [u8]) -> SDSSResult<()> {
self.f.fread_exact(buffer)
self.f.fr_read_exact(buffer)
}
pub fn file_length(&self) -> SDSSResult<u64> {
self.f.flen()
self.f.fext_file_length()
}
pub fn seek_from_start(&mut self, by: u64) -> SDSSResult<()> {
self.f.fseek_ahead(by)
self.f.fext_seek_ahead_from_start_by(by)
}
pub fn trim_file_to(&mut self, to: u64) -> SDSSResult<()> {
self.f.flen_set(to)
self.f.fw_truncate_to(to)
}
pub fn retrieve_cursor(&mut self) -> SDSSResult<u64> {
self.f.fcursor()
self.f.fext_cursor()
}
pub fn read_byte(&mut self) -> SDSSResult<u8> {
let mut r = [0; 1];

@ -1,229 +0,0 @@
/*
* Created on Thu Aug 24 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
#[cfg(test)]
use super::{
header_impl::{FileScope, FileSpecifier, FileSpecifierVersion, HostRunMode},
rw::{FileOpen, SDSSFileIO},
};
use {
super::{
rw::{RawFileIOInterface, RawFileOpen},
SDSSResult,
},
crate::engine::sync::cell::Lazy,
parking_lot::RwLock,
std::{
collections::hash_map::{Entry, HashMap},
io::{Error, ErrorKind},
},
};
static VFS: Lazy<RwLock<HashMap<Box<str>, VFile>>, fn() -> RwLock<HashMap<Box<str>, VFile>>> =
Lazy::new(|| RwLock::new(HashMap::new()));
#[derive(Debug)]
struct VFile {
read: bool,
write: bool,
data: Vec<u8>,
pos: usize,
}
impl VFile {
fn new(read: bool, write: bool, data: Vec<u8>, pos: usize) -> Self {
Self {
read,
write,
data,
pos,
}
}
fn current(&self) -> &[u8] {
&self.data[self.pos..]
}
}
#[derive(Debug)]
pub struct VirtualFS(Box<str>);
impl VirtualFS {
pub fn get_file_data(f: &str) -> Option<Vec<u8>> {
VFS.read().get(f).map(|f| f.data.clone())
}
}
impl RawFileIOInterface for VirtualFS {
fn fopen_or_create_rw(file_path: &str) -> super::SDSSResult<RawFileOpen<Self>> {
match VFS.write().entry(file_path.into()) {
Entry::Occupied(mut oe) => {
oe.get_mut().read = true;
oe.get_mut().write = true;
oe.get_mut().pos = 0;
Ok(RawFileOpen::Existing(Self(file_path.into())))
}
Entry::Vacant(v) => {
v.insert(VFile::new(true, true, vec![], 0));
Ok(RawFileOpen::Created(Self(file_path.into())))
}
}
}
fn fread_exact(&mut self, buf: &mut [u8]) -> SDSSResult<()> {
let mut vfs = VFS.write();
let file = vfs
.get_mut(&self.0)
.ok_or(Error::new(ErrorKind::NotFound, "File not found"))?;
if !file.read {
return Err(Error::new(ErrorKind::PermissionDenied, "Read permission denied").into());
}
let available_bytes = file.current().len();
if available_bytes < buf.len() {
return Err(Error::from(ErrorKind::UnexpectedEof).into());
}
buf.copy_from_slice(&file.data[file.pos..file.pos + buf.len()]);
file.pos += buf.len();
Ok(())
}
fn fwrite_all(&mut self, bytes: &[u8]) -> SDSSResult<()> {
let mut vfs = VFS.write();
let file = vfs
.get_mut(&self.0)
.ok_or(Error::new(ErrorKind::NotFound, "File not found"))?;
if !file.write {
return Err(Error::new(ErrorKind::PermissionDenied, "Write permission denied").into());
}
if file.pos + bytes.len() > file.data.len() {
file.data.resize(file.pos + bytes.len(), 0);
}
file.data[file.pos..file.pos + bytes.len()].copy_from_slice(bytes);
file.pos += bytes.len();
Ok(())
}
fn fsync_all(&mut self) -> super::SDSSResult<()> {
// pretty redundant for us
Ok(())
}
fn fseek_ahead(&mut self, by: u64) -> SDSSResult<()> {
let mut vfs = VFS.write();
let file = vfs
.get_mut(&self.0)
.ok_or(Error::new(ErrorKind::NotFound, "File not found"))?;
if by > file.data.len() as u64 {
return Err(Error::new(ErrorKind::InvalidInput, "Can't seek beyond file's end").into());
}
file.pos = by as usize;
Ok(())
}
fn flen(&self) -> SDSSResult<u64> {
let vfs = VFS.read();
let file = vfs
.get(&self.0)
.ok_or(Error::new(ErrorKind::NotFound, "File not found"))?;
Ok(file.data.len() as u64)
}
fn flen_set(&mut self, to: u64) -> SDSSResult<()> {
let mut vfs = VFS.write();
let file = vfs
.get_mut(&self.0)
.ok_or(Error::new(ErrorKind::NotFound, "File not found"))?;
if !file.write {
return Err(Error::new(ErrorKind::PermissionDenied, "Write permission denied").into());
}
if to as usize > file.data.len() {
file.data.resize(to as usize, 0);
} else {
file.data.truncate(to as usize);
}
if file.pos > file.data.len() {
file.pos = file.data.len();
}
Ok(())
}
fn fcursor(&mut self) -> SDSSResult<u64> {
let vfs = VFS.read();
let file = vfs
.get(&self.0)
.ok_or(Error::new(ErrorKind::NotFound, "File not found"))?;
Ok(file.pos as u64)
}
}
#[test]
fn sdss_file() {
let f = SDSSFileIO::<VirtualFS>::open_or_create_perm_rw::<false>(
"this_is_a_test_file.db",
FileScope::Journal,
FileSpecifier::TestTransactionLog,
FileSpecifierVersion::__new(0),
0,
HostRunMode::Prod,
128,
)
.unwrap();
let FileOpen::Created(mut f) = f else {
panic!()
};
f.fsynced_write(b"hello, world\n").unwrap();
f.fsynced_write(b"hello, again\n").unwrap();
let f = SDSSFileIO::<VirtualFS>::open_or_create_perm_rw::<false>(
"this_is_a_test_file.db",
FileScope::Journal,
FileSpecifier::TestTransactionLog,
FileSpecifierVersion::__new(0),
0,
HostRunMode::Prod,
128,
)
.unwrap();
let FileOpen::Existing(mut f, _) = f else {
panic!()
};
let mut buf1 = [0u8; 13];
f.read_to_buffer(&mut buf1).unwrap();
let mut buf2 = [0u8; 13];
f.read_to_buffer(&mut buf2).unwrap();
assert_eq!(&buf1, b"hello, world\n");
assert_eq!(&buf2, b"hello, again\n");
}

@ -24,7 +24,7 @@
*
*/
type VirtualFS = super::test_util::VirtualFS;
type VirtualFS = super::memfs::VirtualFS;
mod batch;
mod rw;

@ -42,8 +42,8 @@ use {
DecodedBatchEventKind, NormalBatch,
},
header_meta::{FileScope, FileSpecifier, FileSpecifierVersion, HostRunMode},
memfs::VirtualFS,
rw::{FileOpen, SDSSFileIO},
test_util::VirtualFS,
},
},
util::test_utils,

@ -31,20 +31,22 @@ use crate::engine::storage::v1::{
#[test]
fn create_delete() {
let f = SDSSFileIO::<super::VirtualFS>::open_or_create_perm_rw::<false>(
"hello_world.db-tlog",
FileScope::Journal,
FileSpecifier::GNSTxnLog,
FileSpecifierVersion::__new(0),
0,
HostRunMode::Prod,
0,
)
.unwrap();
match f {
FileOpen::Existing(_, _) => panic!(),
FileOpen::Created(_) => {}
};
{
let f = SDSSFileIO::<super::VirtualFS>::open_or_create_perm_rw::<false>(
"hello_world.db-tlog",
FileScope::Journal,
FileSpecifier::GNSTxnLog,
FileSpecifierVersion::__new(0),
0,
HostRunMode::Prod,
0,
)
.unwrap();
match f {
FileOpen::Existing(_, _) => panic!(),
FileOpen::Created(_) => {}
};
}
let open = SDSSFileIO::<super::VirtualFS>::open_or_create_perm_rw::<false>(
"hello_world.db-tlog",
FileScope::Journal,

@ -25,7 +25,7 @@
*/
#[cfg(test)]
use crate::engine::storage::v1::test_util::VirtualFS;
use crate::engine::storage::v1::memfs::VirtualFS;
use {
super::{TransactionError, TransactionResult},
crate::{
@ -35,7 +35,8 @@ use {
storage::v1::{
self, header_meta,
inf::{self, PersistObject},
BufferedScanner, JournalAdapter, JournalWriter, RawFileIOInterface, SDSSResult,
BufferedScanner, JournalAdapter, JournalWriter, LocalFS, RawFSInterface,
SDSSResult,
},
},
util::EndianQW,
@ -59,26 +60,25 @@ pub use {
};
pub type GNSTransactionDriverNullZero =
GNSTransactionDriverAnyFS<crate::engine::storage::v1::NullZero>;
GNSTransactionDriverAnyFS<crate::engine::storage::v1::NullFS>;
pub type GNSTransactionDriver = GNSTransactionDriverAnyFS<File>;
#[cfg(test)]
pub type GNSTransactionDriverVFS = GNSTransactionDriverAnyFS<VirtualFS>;
const CURRENT_LOG_VERSION: u32 = 0;
pub trait GNSTransactionDriverLLInterface: RawFileIOInterface {
pub trait GNSTransactionDriverLLInterface: RawFSInterface {
/// If true, this is an actual txn driver with a non-null (not `/dev/null` like) journal
const NONNULL: bool = <Self as RawFileIOInterface>::NOTNULL;
const NONNULL: bool = <Self as RawFSInterface>::NOT_NULL;
}
impl<T: RawFileIOInterface> GNSTransactionDriverLLInterface for T {}
impl<T: RawFSInterface> GNSTransactionDriverLLInterface for T {}
#[derive(Debug)]
/// The GNS transaction driver is used to handle DDL transactions
pub struct GNSTransactionDriverAnyFS<F = File> {
pub struct GNSTransactionDriverAnyFS<F: RawFSInterface = LocalFS> {
journal: JournalWriter<F, GNSAdapter>,
}
impl GNSTransactionDriverAnyFS<crate::engine::storage::v1::NullZero> {
impl GNSTransactionDriverAnyFS<crate::engine::storage::v1::NullFS> {
pub fn nullzero(gns: &GlobalNS) -> Self {
let journal = v1::open_journal(
"gns.db-tlog",

Loading…
Cancel
Save