Only attempt to restore data if new instance

next
Sayan Nandan 1 year ago
parent 6df09c194c
commit c35e35b9c8
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -32,10 +32,13 @@ use {
crate::engine::{
core::GlobalNS,
data::uuid::Uuid,
storage::v1::{
header_meta::HostRunMode,
memfs::{NullFS, VirtualFS},
RawFSInterface,
storage::{
self,
v1::{
header_meta::HostRunMode,
memfs::{NullFS, VirtualFS},
RawFSInterface,
},
},
txn::gns::GNSTransactionDriverAnyFS,
},
@ -75,15 +78,10 @@ impl<Fs: RawFSInterface> TestGlobal<Fs> {
impl<Fs: RawFSInterface> TestGlobal<Fs> {
pub fn new_with_driver_id(log_name: &str) -> Self {
let gns = GlobalNS::empty();
let driver = GNSTransactionDriverAnyFS::open_or_reinit_with_name(
&gns,
log_name,
0,
HostRunMode::Prod,
0,
)
.unwrap();
Self::new(gns, 0, driver)
let driver = storage::v1::loader::open_gns_driver(log_name, 0, HostRunMode::Dev, 0, &gns)
.unwrap()
.into_inner();
Self::new(gns, 0, GNSTransactionDriverAnyFS::new(driver))
}
}
@ -128,13 +126,13 @@ impl<Fs: RawFSInterface> GlobalInstanceLike for TestGlobal<Fs> {
space_uuid: Uuid,
model_name: &str,
model_uuid: Uuid,
) -> crate::engine::storage::v1::SDSSResult<()> {
) -> storage::v1::SDSSResult<()> {
// create model dir
Fs::fs_create_dir(&crate::engine::storage::v1::loader::SEInitState::model_dir(
Fs::fs_create_dir(&storage::v1::loader::SEInitState::model_dir(
space_name, space_uuid, model_name, model_uuid,
))?;
let driver = crate::engine::storage::v1::data_batch::create(
&crate::engine::storage::v1::loader::SEInitState::model_path(
let driver = storage::v1::data_batch::create(
&storage::v1::loader::SEInitState::model_path(
space_name, space_uuid, model_name, model_uuid,
),
self.sys_cfg().host_data().settings_version(),

@ -41,12 +41,10 @@
- FIXME(@ohsayan): we will probably (naively) need to dynamically reposition the cursor in case the metadata is corrupted as well
*/
use super::rw::RawFSInterface;
use {
super::{
header_impl::{FileSpecifierVersion, HostRunMode, SDSSHeaderRaw},
rw::{FileOpen, SDSSFileIO},
rw::{FileOpen, RawFSInterface, SDSSFileIO},
SDSSError, SDSSResult,
},
crate::{
@ -59,34 +57,6 @@ use {
const CRC: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
const RECOVERY_BLOCK_AUTO_THRESHOLD: usize = 5;
/// A journal to `/dev/null` (app. level impl)
#[cfg(test)]
pub fn null_journal<TA: JournalAdapter>(
log_file_name: &str,
log_kind: FileSpecifier,
log_kind_version: FileSpecifierVersion,
host_setting_version: u32,
host_run_mode: HostRunMode,
host_startup_counter: u64,
_: &TA::GlobalState,
) -> JournalWriter<super::memfs::NullFS, TA> {
let FileOpen::Created(journal) =
SDSSFileIO::<super::memfs::NullFS>::open_or_create_perm_rw::<false>(
log_file_name,
FileScope::Journal,
log_kind,
log_kind_version,
host_setting_version,
host_run_mode,
host_startup_counter,
)
.unwrap()
else {
panic!()
};
JournalWriter::new(journal, 0, true).unwrap()
}
pub fn open_journal<TA: JournalAdapter, Fs: RawFSInterface>(
log_file_name: &str,
log_kind: FileSpecifier,
@ -95,7 +65,7 @@ pub fn open_journal<TA: JournalAdapter, Fs: RawFSInterface>(
host_run_mode: HostRunMode,
host_startup_counter: u64,
gs: &TA::GlobalState,
) -> SDSSResult<JournalWriter<Fs, TA>> {
) -> SDSSResult<FileOpen<JournalWriter<Fs, TA>>> {
macro_rules! open_file {
($modify:literal) => {
SDSSFileIO::<Fs>::open_or_create_perm_rw::<$modify>(
@ -116,11 +86,13 @@ pub fn open_journal<TA: JournalAdapter, Fs: RawFSInterface>(
open_file!(true)
}?;
let file = match f {
FileOpen::Created(f) => return JournalWriter::new(f, 0, true),
FileOpen::Existing(file, _) => file,
FileOpen::Created(f) => return Ok(FileOpen::Created(JournalWriter::new(f, 0, true)?)),
FileOpen::Existing((file, _header)) => file,
};
let (file, last_txn) = JournalReader::<TA, Fs>::scroll(file, gs)?;
JournalWriter::new(file, last_txn, false)
Ok(FileOpen::Existing(JournalWriter::new(
file, last_txn, false,
)?))
}
/// The journal adapter

@ -28,13 +28,20 @@ use crate::engine::{
core::GlobalNS,
data::uuid::Uuid,
fractal::{FractalModelDriver, ModelDrivers, ModelUniqueID},
storage::v1::{batch_jrnl, header_meta::HostRunMode, LocalFS, SDSSErrorContext, SDSSResult},
txn::gns::GNSTransactionDriverAnyFS,
storage::v1::{
batch_jrnl, header_meta,
journal::{self, JournalWriter},
rw::{FileOpen, RawFSInterface},
LocalFS, SDSSErrorContext, SDSSResult,
},
txn::gns::{GNSAdapter, GNSTransactionDriverAnyFS},
};
const GNS_FILE_PATH: &str = "gns.db-tlog";
const GNS_LOG_VERSION_CODE: u32 = 0;
pub struct SEInitState {
pub new_instance: bool,
pub txn_driver: GNSTransactionDriverAnyFS<super::LocalFS>,
pub model_drivers: ModelDrivers<LocalFS>,
pub gns: GlobalNS,
@ -42,11 +49,13 @@ pub struct SEInitState {
impl SEInitState {
pub fn new(
new_instance: bool,
txn_driver: GNSTransactionDriverAnyFS<super::LocalFS>,
model_drivers: ModelDrivers<LocalFS>,
gns: GlobalNS,
) -> Self {
Self {
new_instance,
txn_driver,
model_drivers,
gns,
@ -54,37 +63,47 @@ impl SEInitState {
}
pub fn try_init(
host_setting_version: u32,
host_run_mode: HostRunMode,
host_run_mode: header_meta::HostRunMode,
host_startup_counter: u64,
) -> SDSSResult<Self> {
let gns = GlobalNS::empty();
let gns_txn_driver = GNSTransactionDriverAnyFS::<LocalFS>::open_or_reinit_with_name(
&gns,
let gns_txn_driver = open_gns_driver(
GNS_FILE_PATH,
host_setting_version,
host_run_mode,
host_startup_counter,
&gns,
)?;
let new_instance = gns_txn_driver.is_created();
let mut model_drivers = ModelDrivers::new();
for (space_name, space) in gns.spaces().read().iter() {
let space_uuid = space.get_uuid();
for (model_name, model) in space.models().read().iter() {
let path = Self::model_path(space_name, space_uuid, model_name, model.get_uuid());
let persist_driver = match batch_jrnl::reinit(&path, model) {
Ok(j) => j,
Err(e) => {
return Err(e.with_extra(format!(
"failed to restore model data from journal in `{path}`"
)))
}
};
let _ = model_drivers.insert(
ModelUniqueID::new(space_name, model_name, model.get_uuid()),
FractalModelDriver::init(persist_driver),
);
if !new_instance {
// this is an existing instance, so read in all data
for (space_name, space) in gns.spaces().read().iter() {
let space_uuid = space.get_uuid();
for (model_name, model) in space.models().read().iter() {
let path =
Self::model_path(space_name, space_uuid, model_name, model.get_uuid());
let persist_driver = match batch_jrnl::reinit(&path, model) {
Ok(j) => j,
Err(e) => {
return Err(e.with_extra(format!(
"failed to restore model data from journal in `{path}`"
)))
}
};
let _ = model_drivers.insert(
ModelUniqueID::new(space_name, model_name, model.get_uuid()),
FractalModelDriver::init(persist_driver),
);
}
}
}
Ok(SEInitState::new(gns_txn_driver, model_drivers, gns))
Ok(SEInitState::new(
new_instance,
GNSTransactionDriverAnyFS::new(gns_txn_driver.into_inner()),
model_drivers,
gns,
))
}
pub fn model_path(
space_name: &str,
@ -109,3 +128,21 @@ impl SEInitState {
format!("data/{space_name}-{space_uuid}")
}
}
pub fn open_gns_driver<Fs: RawFSInterface>(
path: &str,
host_setting_version: u32,
host_run_mode: header_meta::HostRunMode,
host_startup_counter: u64,
gns: &GlobalNS,
) -> SDSSResult<FileOpen<JournalWriter<Fs, GNSAdapter>>> {
journal::open_journal::<GNSAdapter, Fs>(
path,
header_meta::FileSpecifier::GNSTxnLog,
header_meta::FileSpecifierVersion::__new(GNS_LOG_VERSION_CODE),
host_setting_version,
host_run_mode,
host_startup_counter,
gns,
)
}

@ -28,8 +28,8 @@ use {
crate::engine::{
storage::v1::{
rw::{
RawFSInterface, RawFileInterface, RawFileInterfaceExt, RawFileInterfaceRead,
RawFileInterfaceWrite, RawFileInterfaceWriteExt, RawFileOpen,
FileOpen, RawFSInterface, RawFileInterface, RawFileInterfaceExt,
RawFileInterfaceRead, RawFileInterfaceWrite, RawFileInterfaceWriteExt,
},
SDSSResult,
},
@ -123,10 +123,10 @@ impl RawFSInterface for VirtualFS {
// create new file
let file = VirtualFS::fs_fopen_or_create_rw(to)?;
match file {
RawFileOpen::Created(mut c) => {
FileOpen::Created(mut c) => {
c.fw_write_all(&data)?;
}
RawFileOpen::Existing(mut e) => {
FileOpen::Existing(mut e) => {
e.fw_truncate_to(0)?;
e.fw_write_all(&data)?;
}
@ -203,7 +203,7 @@ impl RawFSInterface for VirtualFS {
fn fs_delete_dir_all(fpath: &str) -> super::SDSSResult<()> {
delete_dir(fpath, true)
}
fn fs_fopen_or_create_rw(fpath: &str) -> super::SDSSResult<super::rw::RawFileOpen<Self::File>> {
fn fs_fopen_or_create_rw(fpath: &str) -> super::SDSSResult<super::rw::FileOpen<Self::File>> {
let mut vfs = VFS.write();
// components
let (target_file, components) = split_target_and_components(fpath);
@ -213,13 +213,13 @@ impl RawFSInterface for VirtualFS {
VNode::File(f) => {
f.read = true;
f.write = true;
Ok(RawFileOpen::Existing(VFileDescriptor(fpath.into())))
Ok(FileOpen::Existing(VFileDescriptor(fpath.into())))
}
VNode::Dir(_) => return err_item_is_not_file(),
},
Entry::Vacant(v) => {
v.insert(VNode::File(VFile::new(true, true, vec![], 0)));
Ok(RawFileOpen::Created(VFileDescriptor(fpath.into())))
Ok(FileOpen::Created(VFileDescriptor(fpath.into())))
}
}
}
@ -535,8 +535,8 @@ impl RawFSInterface for NullFS {
fn fs_delete_dir_all(_: &str) -> SDSSResult<()> {
Ok(())
}
fn fs_fopen_or_create_rw(_: &str) -> SDSSResult<RawFileOpen<Self::File>> {
Ok(RawFileOpen::Created(NullFile))
fn fs_fopen_or_create_rw(_: &str) -> SDSSResult<FileOpen<Self::File>> {
Ok(FileOpen::Created(NullFile))
}
fn fs_fopen_rw(_: &str) -> SDSSResult<Self::File> {
Ok(NullFile)

@ -24,8 +24,6 @@
*
*/
use std::marker::PhantomData;
use {
super::{
header_impl::{
@ -40,35 +38,45 @@ use {
std::{
fs::{self, File},
io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write},
marker::PhantomData,
},
};
#[derive(Debug)]
/// Log whether
pub enum FileOpen<F> {
Created(F),
Existing(F, SDSSHeader),
pub enum FileOpen<CF, EF = CF> {
Created(CF),
Existing(EF),
}
impl<F> FileOpen<F> {
pub fn into_existing(self) -> Option<(F, SDSSHeader)> {
impl<CF, EF> FileOpen<CF, EF> {
pub const fn is_created(&self) -> bool {
matches!(self, Self::Created(_))
}
pub const fn is_existing(&self) -> bool {
!self.is_created()
}
pub fn into_existing(self) -> Option<EF> {
match self {
Self::Existing(f, h) => Some((f, h)),
Self::Existing(e) => Some(e),
Self::Created(_) => None,
}
}
pub fn into_created(self) -> Option<F> {
pub fn into_created(self) -> Option<CF> {
match self {
Self::Created(f) => Some(f),
Self::Existing(_, _) => None,
Self::Existing(_) => None,
}
}
}
#[derive(Debug)]
pub enum RawFileOpen<F> {
Created(F),
Existing(F),
impl<F> FileOpen<F> {
pub fn into_inner(self) -> F {
match self {
Self::Created(f) => f,
Self::Existing(f) => f,
}
}
}
/// The specification for a file system interface (our own abstraction over the fs)
@ -94,7 +102,7 @@ pub trait RawFSInterface {
/// This will:
/// - Create a file if it doesn't exist
/// - Open a file it it does exist
fn fs_fopen_or_create_rw(fpath: &str) -> SDSSResult<RawFileOpen<Self::File>>;
fn fs_fopen_or_create_rw(fpath: &str) -> SDSSResult<FileOpen<Self::File>>;
/// Open an existing file
fn fs_fopen_rw(fpath: &str) -> SDSSResult<Self::File>;
/// Create a new file
@ -179,7 +187,7 @@ impl RawFSInterface for LocalFS {
fn fs_delete_dir_all(fpath: &str) -> SDSSResult<()> {
cvt(fs::remove_dir_all(fpath))
}
fn fs_fopen_or_create_rw(fpath: &str) -> SDSSResult<RawFileOpen<Self::File>> {
fn fs_fopen_or_create_rw(fpath: &str) -> SDSSResult<FileOpen<Self::File>> {
let f = File::options()
.create(true)
.read(true)
@ -187,9 +195,9 @@ impl RawFSInterface for LocalFS {
.open(fpath)?;
let md = f.metadata()?;
if md.len() == 0 {
Ok(RawFileOpen::Created(f))
Ok(FileOpen::Created(f))
} else {
Ok(RawFileOpen::Existing(f))
Ok(FileOpen::Existing(f))
}
}
fn fs_fcreate_rw(fpath: &str) -> SDSSResult<Self::File> {
@ -485,10 +493,10 @@ impl<Fs: RawFSInterface> SDSSFileIO<Fs> {
host_setting_version: u32,
host_run_mode: HostRunMode,
host_startup_counter: u64,
) -> SDSSResult<FileOpen<Self>> {
) -> SDSSResult<FileOpen<Self, (Self, SDSSHeader)>> {
let f = Fs::fs_fopen_or_create_rw(file_path)?;
match f {
RawFileOpen::Created(f) => {
FileOpen::Created(f) => {
let f = Self::_sdss_fcreate(
file_scope,
file_specifier,
@ -500,14 +508,14 @@ impl<Fs: RawFSInterface> SDSSFileIO<Fs> {
)?;
Ok(FileOpen::Created(f))
}
RawFileOpen::Existing(f) => {
FileOpen::Existing(f) => {
let (f, header) = Self::_sdss_fopen::<REWRITE_MODIFY_COUNTER>(
f,
file_scope,
file_specifier,
file_specifier_version,
)?;
Ok(FileOpen::Existing(header, f))
Ok(FileOpen::Existing((header, f)))
}
}
}

@ -55,7 +55,10 @@ fn pkey(v: impl Into<Datacell>) -> PrimaryIndexKey {
PrimaryIndexKey::try_from_dc(v.into()).unwrap()
}
fn open_file(fpath: &str) -> FileOpen<SDSSFileIO<VirtualFS>> {
fn open_file(
fpath: &str,
) -> FileOpen<SDSSFileIO<VirtualFS>, (SDSSFileIO<VirtualFS>, super::super::header_impl::SDSSHeader)>
{
SDSSFileIO::open_or_create_perm_rw::<false>(
fpath,
FileScope::DataBatch,
@ -71,7 +74,7 @@ fn open_file(fpath: &str) -> FileOpen<SDSSFileIO<VirtualFS>> {
fn open_batch_data(fpath: &str, mdl: &Model) -> DataBatchPersistDriver<VirtualFS> {
match open_file(fpath) {
FileOpen::Created(f) => DataBatchPersistDriver::new(f, true),
FileOpen::Existing(f, _) => {
FileOpen::Existing((f, _header)) => {
let mut dbr = DataBatchRestoreDriver::new(f).unwrap();
dbr.read_data_batch_into_model(mdl).unwrap();
DataBatchPersistDriver::new(dbr.into_file(), false)

@ -43,7 +43,7 @@ fn create_delete() {
)
.unwrap();
match f {
FileOpen::Existing(_, _) => panic!(),
FileOpen::Existing(_) => panic!(),
FileOpen::Created(_) => {}
};
}
@ -58,7 +58,7 @@ fn create_delete() {
)
.unwrap();
let h = match open {
FileOpen::Existing(_, header) => header,
FileOpen::Existing((_, header)) => header,
_ => panic!(),
};
assert_eq!(h.gr_mdr().file_scope(), FileScope::Journal);

@ -143,6 +143,7 @@ fn open_log(
1,
&db,
)
.map(|v| v.into_inner())
}
#[test]

@ -34,7 +34,6 @@ use {
data::uuid::Uuid,
mem::BufferedScanner,
storage::v1::{
self, header_meta,
inf::{self, PersistObject},
JournalAdapter, JournalWriter, LocalFS, RawFSInterface, SDSSResult,
},
@ -64,35 +63,18 @@ pub type GNSTransactionDriverNullZero =
#[cfg(test)]
pub type GNSTransactionDriverVFS = GNSTransactionDriverAnyFS<VirtualFS>;
const CURRENT_LOG_VERSION: u32 = 0;
/// The GNS transaction driver is used to handle DDL transactions
pub struct GNSTransactionDriverAnyFS<F: RawFSInterface = LocalFS> {
journal: JournalWriter<F, GNSAdapter>,
pub struct GNSTransactionDriverAnyFS<Fs: RawFSInterface = LocalFS> {
journal: JournalWriter<Fs, GNSAdapter>,
}
impl<Fs: RawFSInterface> GNSTransactionDriverAnyFS<Fs> {
pub fn new(journal: JournalWriter<Fs, GNSAdapter>) -> Self {
Self { journal }
}
pub fn __journal_mut(&mut self) -> &mut JournalWriter<Fs, GNSAdapter> {
&mut self.journal
}
pub fn open_or_reinit_with_name(
gns: &GlobalNS,
log_file_name: &str,
host_setting_version: u32,
host_run_mode: header_meta::HostRunMode,
host_startup_counter: u64,
) -> TransactionResult<Self> {
let journal = v1::open_journal(
log_file_name,
header_meta::FileSpecifier::GNSTxnLog,
header_meta::FileSpecifierVersion::__new(CURRENT_LOG_VERSION),
host_setting_version,
host_run_mode,
host_startup_counter,
gns,
)?;
Ok(Self { journal })
}
/// Attempts to commit the given event into the journal, handling any possible recovery triggers and returning
/// errors (if any)
pub fn try_commit<GE: GNSEvent>(&mut self, gns_event: GE) -> TransactionResult<()> {

Loading…
Cancel
Save