Improve reliability, simplicity and recoverability of BGSAVE (#153)

* Create a new file instead of writing to the flock-ed file in place

This fix is important in two ways. Say we have a user A. They launch skyd,
and skyd creates a data.bin file. Now A deletes the data.bin file just for
fun. Funnily enough, this never causes flock to error!
Why? Because the descriptor/handle is still valid; the file was merely
unlinked from the current directory. That might look harmless, but it means
the user exits with a 'successfully saved' notice only to find that the
file never existed on disk and all of their data was lost. That's bad.
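
To make the failure mode concrete, here is a minimal standalone sketch
(not skyd's actual code: the locking is elided and the path and message are
made up) of why nothing errors on a Unix system after the file is deleted;
the open handle simply keeps working:

    use std::fs::{self, OpenOptions};
    use std::io::Write;

    fn main() -> std::io::Result<()> {
        // open (or create) the data file and hold on to the handle, as a server would
        let mut file = OpenOptions::new()
            .create(true)
            .write(true)
            .open("data.bin")?;
        // simulate the user deleting the file while the process is still running
        fs::remove_file("data.bin")?;
        // the handle is still valid: on Unix the write and sync succeed even though
        // the directory entry is gone, so a "successfully saved" notice would lie
        file.write_all(b"this data vanishes when the process exits")?;
        file.sync_all()?;
        Ok(())
    }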
There is a hidden problem in our current approach too, apart from this one.
Our writing process begins by truncating the old file and then writing to it
with the cursor at 0. Fine, but what if that operation crashes midway? Then
we have lost the current data AND the old data. Not good.

This commit does better: it creates a new temporary file, locks it before
writing, and flushes the current data to that temporary file. Only once that
succeeds does it replace the old data.bin file with the newly created one
(a sketch of the idea follows the list below).

This solves both of the problems mentioned above for us:
1. No more of the silly 'successfully saved' lie
2. If BGSAVE crashes in between, we can be sure that the last data.bin file
is still in proper shape and not half truncated.
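
Roughly, the new flow looks like the sketch below (std-only and simplified:
the flock-ing of the temporary file and the real serialization are elided,
and save_atomically is an illustrative name, though __skydata_file.bin is
the temporary file name used in this commit):

    use std::fs::{self, File};
    use std::io::Write;

    /// Illustrative only: write the snapshot to a fresh temporary file, then
    /// atomically swap it in, so a crash at any point leaves the previous
    /// data.bin untouched.
    fn save_atomically(data: &[u8]) -> std::io::Result<()> {
        const TEMP_FILE: &str = "__skydata_file.bin";
        const DATA_FILE: &str = "data.bin";
        // flush everything to the temporary file first
        let mut tmp = File::create(TEMP_FILE)?;
        tmp.write_all(data)?;
        tmp.sync_all()?;
        // only after the write succeeds, replace the old file in one rename
        fs::rename(TEMP_FILE, DATA_FILE)?;
        Ok(())
    }

    fn main() -> std::io::Result<()> {
        save_atomically(b"serialized table goes here")
    }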

This commit also moves the background services into their own module(s)
for easier management.

* Fix CI scripts

Fixes:
1. Our custom runner (drone/.ci.yml) was modified to kill the skyd process
once the tests are done, since this pipeline is not ephemeral.
2. GHA, for some reason, ignores any error in the test step and proceeds to
kill the skyd process without failing the job. Since GHA runners are
ephemeral, we don't need to do this manually, so those kill steps were
removed.
Authored by Sayan, committed via GitHub
parent b4c44e4684
commit 790558d2c7

@@ -13,19 +13,22 @@ server:
     from_secret: password
 steps:
   - name: Update rust
     commands:
       - /usr/local/bin/rustup-update
   - name: Run tests
     environment:
       SRCENV:
         from_secret: srcenv
       CARGO_HOME:
         from_secret: cargohome
       CARGO_TARGET_DIR:
         from_secret: cargotarget
     commands:
       - source $SRCENV/.cargo/env
       - export RUSTUP_HOME=$SRCENV/.rustup
       - cargo run -p skyd -- --nosave &
       - cargo test -- --test-threads=1
+  - name: Stop skyd
+    commands:
+      - pkill skyd

@@ -70,7 +70,6 @@ jobs:
           cargo build -p skyd
           cargo run -p skyd -- --nosave --noart &
           cargo test --verbose -- --test-threads=1
-          sudo pkill skyd
         env:
           RUST_BACKTRACE: 1
         if: env.IS_MD_FILE == 'false' && runner.os != 'Windows'
@@ -80,7 +79,6 @@ jobs:
           cargo build -p skyd
           START /B cargo run -p skyd -- --nosave --noart
           cargo test --verbose -- --test-threads=1
-          taskkill /IM skyd.exe /F
         env:
           RUST_BACKTRACE: 1
         shell: cmd

@@ -99,7 +99,6 @@ jobs:
           cargo build --target ${{ matrix.rust }} -p skyd
           cargo run -p skyd --target ${{ matrix.rust }} -- --nosave --noart &
           cargo test --verbose --target ${{ matrix.rust }} -- --test-threads=1
-          sudo pkill skyd
         env:
           RUST_BACKTRACE: 1
         if: env.IS_MD_FILE == 'false' && runner.os != 'Windows'
@@ -109,7 +108,6 @@ jobs:
           cargo build -p skyd --target ${{ matrix.rust }}
           START /B cargo run -p skyd --target ${{ matrix.rust }} -- --nosave --noart
           cargo test --target ${{ matrix.rust }} --verbose -- --test-threads=1
-          taskkill /IM skyd.exe /F
         env:
           RUST_BACKTRACE: 1
         shell: cmd
@@ -204,7 +202,6 @@ jobs:
           cargo build --target ${{ matrix.rust }} -p skyd
           cargo run -p skyd --target ${{ matrix.rust }} -- --nosave --noart &
           cargo test --verbose --target ${{ matrix.rust }} -- --test-threads=1
-          sudo pkill skyd
         env:
           RUST_BACKTRACE: 1
         if: env.IS_MD_FILE == 'false' && runner.os == 'Linux'
@@ -214,7 +211,6 @@ jobs:
           cargo build -p skyd --target ${{ matrix.rust }}
           START /B cargo run -p skyd --target ${{ matrix.rust }} -- --nosave --noart
           cargo test --target ${{ matrix.rust }} --verbose -- --test-threads=1
-          taskkill /IM skyd.exe /F
         env:
           RUST_BACKTRACE: 1
         shell: cmd
@@ -306,7 +302,6 @@ jobs:
           cargo build --target ${{ matrix.rust }} -p skyd
           cargo run -p skyd --target ${{ matrix.rust }} -- --nosave --noart &
           cargo test --verbose --target ${{ matrix.rust }} -- --test-threads=1
-          sudo pkill skyd
         env:
           RUST_BACKTRACE: 1
         if: env.IS_MD_FILE == 'false' && runner.os == 'Linux'

@@ -34,8 +34,6 @@ use crate::diskstore;
 use crate::protocol::Query;
 use crate::queryengine;
 use bytes::Bytes;
-use diskstore::flock;
-use diskstore::PERSIST_FILE;
 use libsky::TResult;
 use parking_lot::RwLock;
 use parking_lot::RwLockReadGuard;
@@ -43,16 +41,6 @@ use parking_lot::RwLockWriteGuard;
 use std::sync::Arc;
 pub mod htable;
-#[macro_export]
-macro_rules! flush_db {
-    ($db:expr) => {
-        crate::coredb::CoreDB::flush_db(&$db, None)
-    };
-    ($db:expr, $file:expr) => {
-        crate::coredb::CoreDB::flush_db(&$db, Some(&mut $file))
-    };
-}
 /// This is a thread-safe database handle, which on cloning simply
 /// gives another atomic reference to the `shared` which is a `Shared` object
 #[derive(Debug, Clone)]
@@ -115,38 +103,6 @@ pub struct Shared {
     pub table: RwLock<Coretable>,
 }
-impl Shared {
-    /// This task performs a `sync`hronous background save operation
-    ///
-    /// It runs BGSAVE and then returns control to the caller. The caller is responsible
-    /// for periodically calling BGSAVE
-    pub fn run_bgsave(&self, file: &mut flock::FileLock) {
-        log::trace!("BGSAVE started");
-        let rlock = self.table.read();
-        // Kick in BGSAVE
-        match diskstore::flush_data(file, rlock.get_ref()) {
-            Ok(_) => {
-                drop(rlock);
-                {
-                    // just scope it to ensure dropping of the lock
-                    // since this bgsave succeeded, mark the service as !poisoned, enabling it to recover
-                    self.table.write().poisoned = false;
-                }
-                log::info!("BGSAVE completed successfully");
-            }
-            Err(e) => {
-                drop(rlock);
-                // IMPORTANT! POISON THE DATABASE, NO MORE WRITES FOR YOU!
-                {
-                    // scope to ensure dropping of the lock
-                    self.table.write().poisoned = true;
-                }
-                log::error!("BGSAVE failed with error: '{}'", e);
-            }
-        }
-    }
-}
 /// The `Coretable` holds all the key-value pairs in a `HTable`
 #[derive(Debug)]
 pub struct Coretable {
@@ -213,6 +169,10 @@ impl CoreDB {
         (*self.shared).table.write().poisoned = true;
     }
+    pub fn unpoison(&self) {
+        (*self.shared).table.write().poisoned = false;
+    }
     /// Check if snapshotting is enabled
     pub fn is_snapshot_enabled(&self) -> bool {
         self.snapcfg.is_some()
@@ -309,19 +269,6 @@ impl CoreDB {
     pub fn acquire_read(&self) -> RwLockReadGuard<'_, Coretable> {
         self.shared.table.read()
     }
-    /// Flush the contents of the in-memory table onto disk
-    pub fn flush_db(&self, file: Option<&mut flock::FileLock>) -> TResult<()> {
-        let data = match self.acquire_write() {
-            Some(wlock) => wlock,
-            None => return Err("Can no longer flush data; coretable is poisoned".into()),
-        };
-        if let Some(mut file) = file {
-            diskstore::flush_data(&mut file, &data.coremap)?;
-        } else {
-            diskstore::write_to_disk(&PERSIST_FILE, &data.coremap)?;
-        }
-        Ok(())
-    }
     #[cfg(test)]
     /// Get a deep copy of the `HTable`

@@ -45,8 +45,8 @@ use crate::config::SnapshotConfig;
 use crate::config::SslOpts;
 use crate::dbnet::tcp::Listener;
 use crate::diskstore;
+use crate::services;
 use diskstore::snapshot::DIR_REMOTE_SNAPSHOT;
-use diskstore::{flock, PERSIST_FILE};
 mod tcp;
 use crate::CoreDB;
 use libsky::TResult;
@@ -316,7 +316,7 @@ pub async fn run(
     snapshot_cfg: SnapshotConfig,
     sig: impl Future,
     restore_filepath: Option<String>,
-) -> (CoreDB, flock::FileLock) {
+) -> CoreDB {
     let (signal, _) = broadcast::channel(1);
     let (terminate_tx, terminate_rx) = mpsc::channel(1);
     match fs::create_dir_all(&*DIR_REMOTE_SNAPSHOT) {
@@ -336,20 +336,12 @@ pub async fn run(
             process::exit(0x100);
         }
     };
-    let file = match flock::FileLock::lock(&*PERSIST_FILE) {
-        Ok(lck) => lck,
-        Err(e) => {
-            log::error!("Failed to acquire lock on data file with error: {}", e);
-            process::exit(1);
-        }
-    };
-    let bgsave_handle = tokio::spawn(diskstore::bgsave_scheduler(
+    let bgsave_handle = tokio::spawn(services::bgsave::bgsave_scheduler(
         db.clone(),
         bgsave_cfg,
-        file,
         Terminator::new(signal.subscribe()),
     ));
-    let snapshot_handle = tokio::spawn(diskstore::snapshot::snapshot_service(
+    let snapshot_handle = tokio::spawn(services::snapshot::snapshot_service(
         db.clone(),
         snapshot_cfg,
         Terminator::new(signal.subscribe()),
@@ -407,6 +399,6 @@ pub async fn run(
     }
     server.finish_with_termsig().await;
     let _ = snapshot_handle.await;
-    let lock = bgsave_handle.await.unwrap();
-    (db, lock)
+    let _ = bgsave_handle.await;
+    db
 }

@@ -41,7 +41,7 @@ use std::path::PathBuf;
 #[derive(Debug)]
 /// # File Lock
 /// A file lock object holds a `std::fs::File` that is used to `lock()` and `unlock()` a file with a given
-/// `filename` passed into the `lock()` method. The file lock is configured to drop the file lock when the
+/// `filename` passed into the `lock()` method. The file lock is **not configured** to drop the file lock when the
 /// object is dropped. The `file` field is essentially used to get the raw file descriptor for passing to
 /// the platform-specific lock/unlock methods.
 ///
@@ -49,9 +49,8 @@ use std::path::PathBuf;
 ///
 /// ## Suggestions
 ///
-/// It is always a good idea to attempt a lock release (unlock) explicitly than letting the `Drop` implementation
-/// run it for you as that may cause some Wild West panic if the lock release fails (haha!).
-/// If you manually run unlock, then `Drop`'s implementation won't call another unlock to avoid an extra
+/// It is always a good idea to attempt a lock release (unlock) explicitly than leaving it to the operating
+/// system. If you manually run unlock, another unlock won't be called to avoid an extra costly (is it?)
 /// syscall; this is achieved with the `unlocked` flag (field) which is set to true when the `unlock()` function
 /// is called.
 ///
@@ -84,19 +83,17 @@ impl FileLock {
     fn _lock(file: &File) -> Result<()> {
         __sys::try_lock_ex(file)
     }
-    #[cfg(test)]
-    pub fn relock(&mut self) -> Result<()> {
-        __sys::try_lock_ex(&self.file)?;
-        self.unlocked = false;
-        Ok(())
-    }
     /// Unlock the file
     ///
     /// This sets the `unlocked` flag to true
     pub fn unlock(&mut self) -> Result<()> {
-        __sys::unlock_file(&self.file)?;
-        self.unlocked = true;
-        Ok(())
+        if !self.unlocked {
+            __sys::unlock_file(&self.file)?;
+            self.unlocked = true;
+            Ok(())
+        } else {
+            Ok(())
+        }
     }
     /// Write something to this file
     pub fn write(&mut self, bytes: &[u8]) -> Result<()> {
@@ -107,6 +104,7 @@ impl FileLock {
         // Now write to the file
         self.file.write_all(bytes)
     }
+    #[cfg(test)]
     pub fn try_clone(&self) -> Result<Self> {
         Ok(FileLock {
             file: __sys::duplicate(&self.file)?,
@@ -133,15 +131,6 @@ mod tests {
     }
     #[cfg(windows)]
     #[test]
-    #[should_panic]
-    fn test_windows_with_two_unlock_attempts() {
-        // This is a windows specific test to ensure that our logic with the `unlocked` field is correct
-        let mut file = FileLock::lock("data3.bin").unwrap();
-        file.unlock().unwrap();
-        file.unlock().unwrap();
-    }
-    #[cfg(windows)]
-    #[test]
     fn test_windows_lock_and_then_unlock() {
         let mut file = FileLock::lock("data4.bin").unwrap();
         file.unlock().unwrap();

@@ -26,10 +26,8 @@
 //! This module provides tools for handling persistently stored data
-use crate::config::BGSave;
 use crate::coredb::htable::HTable;
-use crate::coredb::{self, Data};
+use crate::coredb::Data;
-use crate::dbnet::Terminator;
 use crate::diskstore::snapshot::{DIR_OLD_SNAPSHOT, DIR_SNAPSHOT};
 use bincode;
 use bytes::Bytes;
@@ -38,8 +36,6 @@ use std::fs;
 use std::io::{ErrorKind, Write};
 use std::iter::FromIterator;
 use std::path::PathBuf;
-use std::time::Duration;
-use tokio::time;
 pub mod flock;
 pub mod snapshot;
 mod snapstore;
@@ -182,55 +178,3 @@ fn serialize(data: &HTable<String, Data>) -> TResult<Vec<u8>> {
     let encoded = bincode::serialize(&ds)?;
     Ok(encoded)
 }
-/// The bgsave_scheduler calls the bgsave task in `CoreDB` after `every` seconds
-///
-/// The time after which the scheduler will wake up the BGSAVE task is determined by
-/// `bgsave_cfg` which is to be passed as an argument. If BGSAVE is disabled, this function
-/// immediately returns
-pub async fn bgsave_scheduler(
-    handle: coredb::CoreDB,
-    bgsave_cfg: BGSave,
-    file: flock::FileLock,
-    mut terminator: Terminator,
-) -> flock::FileLock {
-    match bgsave_cfg {
-        BGSave::Enabled(duration) => {
-            // If we're here - the user doesn't trust his power supply or just values
-            // his data - which is good! So we'll turn this into a `Duration`
-            let duration = Duration::from_secs(duration);
-            loop {
-                tokio::select! {
-                    // Sleep until `duration` from the current time instant
-                    _ = time::sleep_until(time::Instant::now() + duration) => {
-                        let clone_file = match file.try_clone() {
-                            Ok(cloned_descriptor) => cloned_descriptor,
-                            Err(e) => {
-                                // failed to get a clone of the descriptor ugh
-                                handle.poison();
-                                log::error!("BGSAVE service failed to clone descriptor: '{}'", e);
-                                continue;
-                            }
-                        };
-                        let cloned_handle = handle.clone();
-                        tokio::task::spawn_blocking(move || {
-                            let mut owned_file = clone_file;
-                            let owned_handle = cloned_handle;
-                            owned_handle.shared.run_bgsave(&mut owned_file)
-                        }).await.expect("Something caused the background service to panic");
-                    }
-                    // Otherwise wait for a notification
-                    _ = terminator.receive_signal() => {
-                        // we got a notification to quit; so break out
-                        break;
-                    }
-                }
-            }
-        }
-        BGSave::Disabled => {
-            // the user doesn't bother about his data; cool, let's not bother about it either
-        }
-    }
-    log::info!("BGSAVE service has exited");
-    file
-}

@@ -26,11 +26,9 @@
 //! Tools for creating snapshots
-use crate::config::SnapshotConfig;
 use crate::coredb::CoreDB;
 #[cfg(test)]
 use crate::coredb::SnapshotStatus;
-use crate::dbnet::Terminator;
 use crate::diskstore;
 use chrono::prelude::*;
 #[cfg(test)]
@@ -355,6 +353,7 @@ fn test_snapshot() {
 #[test]
 fn test_pre_existing_snapshots() {
+    use std::time::Duration;
     let ourdir = "TEST_PX_SS";
     let db = CoreDB::new_empty(std::sync::Arc::new(Some(SnapshotStatus::new(4))));
     let mut snapengine = SnapshotEngine::new(4, &db, Some(ourdir)).unwrap();
@@ -381,50 +380,6 @@
     fs::remove_dir_all(ourdir).unwrap();
 }
-use std::time::Duration;
-use tokio::time;
-/// The snapshot service
-///
-/// This service calls `SnapEngine::mksnap()` periodically to create snapshots. Whenever
-/// the interval for snapshotting expires or elapses, we create a snapshot. The snapshot service
-/// keeps creating snapshots, as long as the database keeps running. Once [`dbnet::run`] broadcasts
-/// a termination signal, we're ready to quit
-pub async fn snapshot_service(
-    handle: CoreDB,
-    ss_config: SnapshotConfig,
-    mut termination_signal: Terminator,
-) {
-    match ss_config {
-        SnapshotConfig::Disabled => {
-            // since snapshotting is disabled, we'll imediately return
-            return;
-        }
-        SnapshotConfig::Enabled(configuration) => {
-            let (duration, atmost) = configuration.decompose();
-            let duration = Duration::from_secs(duration);
-            let mut sengine = match SnapshotEngine::new(atmost, &handle, None) {
-                Ok(ss) => ss,
-                Err(e) => {
-                    log::error!("Failed to initialize snapshot service with error: '{}'", e);
-                    return;
-                }
-            };
-            loop {
-                tokio::select! {
-                    _ = time::sleep_until(time::Instant::now() + duration) => {
-                        let _ = sengine.mksnap().await;
-                    },
-                    _ = termination_signal.receive_signal() => {
-                        // time to terminate; goodbye!
-                        break;
-                    }
-                }
-            }
-        }
-    }
-    log::info!("Snapshot service has exited");
-}
 mod queue {
     //! An extremely simple queue implementation which adds more items to the queue
     //! freely and once the threshold limit is reached, it pops off the oldest element and returns it

@@ -49,6 +49,7 @@ use dbnet::run;
 use env_logger::*;
 use libsky::util::terminal;
 use std::sync::Arc;
+mod services;
 use tokio::signal;
 #[cfg(test)]
 mod tests;
@@ -77,10 +78,10 @@ fn main() {
         .enable_all()
         .build()
         .unwrap();
-    let (db, mut lock) = runtime.block_on(async {
+    let db = runtime.block_on(async {
         let (tcplistener, bgsave_config, snapshot_config, restore_filepath) =
             check_args_and_get_cfg().await;
-        let (db, lock) = run(
+        let db = run(
             tcplistener,
             bgsave_config,
             snapshot_config,
@@ -88,7 +89,7 @@ fn main() {
             restore_filepath,
         )
         .await;
-        (db, lock)
+        db
     });
     // Make sure all background workers terminate
     drop(runtime);
@@ -97,32 +98,28 @@ fn main() {
         1,
         "Maybe the compiler reordered the drop causing more than one instance of CoreDB to live at this point"
     );
-    if let Err(e) = flush_db!(db, lock) {
+    if let Err(e) = services::bgsave::_bgsave_blocking_section(&db) {
         log::error!("Failed to flush data to disk with '{}'", e);
         loop {
            // Keep looping until we successfully write the in-memory table to disk
            log::warn!("Press enter to try again...");
            io::stdout().flush().unwrap();
            io::stdin().read(&mut [0]).unwrap();
-            if let Ok(_) = flush_db!(db, lock) {
-                log::info!("Successfully saved data to disk");
-                break;
-            } else {
-                continue;
+            match services::bgsave::_bgsave_blocking_section(&db) {
+                Ok(_) => {
+                    log::info!("Successfully saved data to disk");
+                    break;
+                }
+                Err(e) => {
+                    log::error!("Failed to flush data to disk with '{}'", e);
+                    continue;
+                }
             }
         }
     } else {
         log::info!("Successfully saved data to disk");
     }
-    if let Err(e) = lock.unlock() {
-        log::error!(
-            "Failed to unlock data file even after successfully saving data: {}",
-            e
-        );
-        std::process::exit(0x100);
-    } else {
-        terminal::write_info("Goodbye :)\n").unwrap();
-    }
+    terminal::write_info("Goodbye :)\n").unwrap();
 }
 /// This function checks the command line arguments and either returns a config object

@@ -0,0 +1,123 @@
/*
* Created on Sun May 16 2021
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use crate::config::BGSave;
use crate::coredb::CoreDB;
use crate::dbnet::Terminator;
use crate::diskstore::{self, flock};
#[cfg(not(test))]
use diskstore::PERSIST_FILE;
use libsky::TResult;
use std::fs;
use tokio::time::{self, Duration};
const SKY_TEMP_FILE: &str = "__skydata_file.bin";
#[cfg(test)]
/// The **test location** for the PERSIST_FILE
pub const BGSAVE_DIRECTORY_TESTING_LOC: &str = "skydata_bgsavetest.bin";
/// The bgsave_scheduler calls the bgsave task in `CoreDB` after `every` seconds
///
/// The time after which the scheduler will wake up the BGSAVE task is determined by
/// `bgsave_cfg` which is to be passed as an argument. If BGSAVE is disabled, this function
/// immediately returns
pub async fn bgsave_scheduler(handle: CoreDB, bgsave_cfg: BGSave, mut terminator: Terminator) {
    match bgsave_cfg {
        BGSave::Enabled(duration) => {
            // If we're here - the user doesn't trust his power supply or just values
            // his data - which is good! So we'll turn this into a `Duration`
            let duration = Duration::from_secs(duration);
            loop {
                tokio::select! {
                    // Sleep until `duration` from the current time instant
                    _ = time::sleep_until(time::Instant::now() + duration) => {
                        let cloned_handle = handle.clone();
                        // we spawn this process just to ensure that it doesn't block the runtime's workers
                        // dedicated to async tasks (non-blocking)
                        tokio::task::spawn_blocking(move || {
                            let owned_handle = cloned_handle;
                            let _ = bgsave_blocking_section(owned_handle);
                        }).await.expect("Something caused the background service to panic");
                    }
                    // Otherwise wait for a notification
                    _ = terminator.receive_signal() => {
                        // we got a notification to quit; so break out
                        break;
                    }
                }
            }
        }
        BGSave::Disabled => {
            // the user doesn't bother about his data; cool, let's not bother about it either
        }
    }
    log::info!("BGSAVE service has exited");
}
/// This is a _raw_ version of what Sky's persistence does and is **blocking in nature** since it does
/// a good amount of disk I/O (which totally depends on the size of the dataset though)
/// There's nothing dangerous about this really and hence it isn't as _raw_ as it sounds. This method accepts
/// a handle to a [`coredb::CoreDB`] and uses that to acquire a read lock. This method will create a temporary
/// file and lock it. It then passes an immutable HTable reference to [`diskstore::flush_data`] which flushes the data to our
/// temporary locked file. Once the data is successfully flushed, the new temporary file replaces the old data file
/// by using [`fs::rename`]. This provides us with two gurantees:
/// 1. No silly logic is seen if the user deletes the data.bin file and yet BGSAVE doesn't complain
/// 2. If this method crashes midway, we can still be sure that the old file is intact
pub fn _bgsave_blocking_section(handle: &CoreDB) -> TResult<()> {
    // first lock our temporary file
    let mut file = flock::FileLock::lock(SKY_TEMP_FILE)?;
    // get a read lock on the coretable
    let lock = handle.acquire_read();
    diskstore::flush_data(&mut file, lock.get_ref())?;
    // now rename the file
    #[cfg(not(test))]
    fs::rename(SKY_TEMP_FILE, &*PERSIST_FILE)?;
    #[cfg(test)]
    fs::rename(SKY_TEMP_FILE, BGSAVE_DIRECTORY_TESTING_LOC)?;
    // now unlock the file
    file.unlock()?;
    // close the file
    drop(file);
    // drop the lock since we're done writing the file
    drop(lock);
    Ok(())
}
/// This just wraps around [`_bgsave_blocking_section`] and prints nice log messages depending on the outcome
fn bgsave_blocking_section(handle: CoreDB) -> bool {
    match _bgsave_blocking_section(&handle) {
        Ok(_) => {
            log::info!("BGSAVE completed successfully");
            drop(handle.unpoison());
            true
        }
        Err(e) => {
            log::info!("BGSAVE failed with error: {}", e);
            drop(handle.poison());
            false
        }
    }
}

@@ -0,0 +1,28 @@
/*
* Created on Sun May 16 2021
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
pub mod bgsave;
pub mod snapshot;

@@ -0,0 +1,73 @@
/*
* Created on Sun May 16 2021
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use crate::config::SnapshotConfig;
use crate::dbnet::Terminator;
use crate::diskstore::snapshot::SnapshotEngine;
use crate::CoreDB;
use tokio::time::{self, Duration};
/// The snapshot service
///
/// This service calls `SnapEngine::mksnap()` periodically to create snapshots. Whenever
/// the interval for snapshotting expires or elapses, we create a snapshot. The snapshot service
/// keeps creating snapshots, as long as the database keeps running. Once [`dbnet::run`] broadcasts
/// a termination signal, we're ready to quit
pub async fn snapshot_service(
    handle: CoreDB,
    ss_config: SnapshotConfig,
    mut termination_signal: Terminator,
) {
    match ss_config {
        SnapshotConfig::Disabled => {
            // since snapshotting is disabled, we'll imediately return
            return;
        }
        SnapshotConfig::Enabled(configuration) => {
            let (duration, atmost) = configuration.decompose();
            let duration = Duration::from_secs(duration);
            let mut sengine = match SnapshotEngine::new(atmost, &handle, None) {
                Ok(ss) => ss,
                Err(e) => {
                    log::error!("Failed to initialize snapshot service with error: '{}'", e);
                    return;
                }
            };
            loop {
                tokio::select! {
                    _ = time::sleep_until(time::Instant::now() + duration) => {
                        let _ = sengine.mksnap().await;
                    },
                    _ = termination_signal.receive_signal() => {
                        // time to terminate; goodbye!
                        break;
                    }
                }
            }
        }
    }
    log::info!("Snapshot service has exited");
}

@@ -33,6 +33,8 @@ mod bgsave {
     use crate::coredb::{htable::HTable, CoreDB, Data};
     use crate::dbnet::Terminator;
     use crate::diskstore;
+    use crate::services;
+    use services::bgsave::BGSAVE_DIRECTORY_TESTING_LOC;
     use std::fs;
     use std::sync::Arc;
     use tokio::sync::broadcast;
@@ -49,23 +51,17 @@ mod bgsave {
         let DUR_WITH_EPSILON: Duration = Duration::from_millis(1500) + Duration::from_secs(10);
         let (signal, _) = broadcast::channel(1);
         let datahandle = CoreDB::new_empty(Arc::new(None));
-        let mut flock = diskstore::flock::FileLock::lock("bgsave_test_1.bin").unwrap();
         let bgsave_configuration = BGSave::Enabled(10);
-        let handle = tokio::spawn(diskstore::bgsave_scheduler(
+        let handle = tokio::spawn(services::bgsave::bgsave_scheduler(
             datahandle.clone(),
             bgsave_configuration,
-            flock.try_clone().unwrap(),
             Terminator::new(signal.subscribe()),
         ));
         // sleep for 10 seconds with epsilon 1.5s
         time::sleep(DUR_WITH_EPSILON).await;
-        // temporarily unlock the the file
-        flock.unlock().unwrap();
         // we should get an empty map
-        let saved = diskstore::test_deserialize(fs::read("bgsave_test_1.bin").unwrap()).unwrap();
+        let saved = diskstore::test_deserialize(fs::read(BGSAVE_DIRECTORY_TESTING_LOC).unwrap()).unwrap();
         assert!(saved.len() == 0);
-        // now relock the file
-        flock.relock().unwrap();
         // now let's quickly write some data
         {
             datahandle.acquire_write().unwrap().get_mut_ref().insert(
@@ -76,27 +72,22 @@ mod bgsave {
         // sleep for 10 seconds with epsilon 1.5s
         time::sleep(DUR_WITH_EPSILON).await;
         // we should get a map with the one key
-        flock.unlock().unwrap();
-        let saved = diskstore::test_deserialize(fs::read("bgsave_test_1.bin").unwrap()).unwrap();
+        let saved = diskstore::test_deserialize(fs::read(BGSAVE_DIRECTORY_TESTING_LOC).unwrap()).unwrap();
         assert_eq!(saved, map_should_be_with_one);
-        flock.relock().unwrap();
         // now let's remove all the data
         {
             datahandle.acquire_write().unwrap().get_mut_ref().clear();
         }
         // sleep for 10 seconds with epsilon 1.5s
         time::sleep(DUR_WITH_EPSILON).await;
-        flock.unlock().unwrap();
-        let saved = diskstore::test_deserialize(fs::read("bgsave_test_1.bin").unwrap()).unwrap();
+        let saved = diskstore::test_deserialize(fs::read(BGSAVE_DIRECTORY_TESTING_LOC).unwrap()).unwrap();
         assert!(saved.len() == 0);
-        flock.relock().unwrap();
         // drop the signal; all waiting tasks can now terminate
         drop(signal);
-        handle.await.unwrap().unlock().unwrap();
-        drop(flock);
+        handle.await.unwrap();
         // check the file again after unlocking
-        let saved = diskstore::test_deserialize(fs::read("bgsave_test_1.bin").unwrap()).unwrap();
+        let saved = diskstore::test_deserialize(fs::read(BGSAVE_DIRECTORY_TESTING_LOC).unwrap()).unwrap();
         assert!(saved.len() == 0);
-        fs::remove_file("bgsave_test_1.bin").unwrap();
+        fs::remove_file(BGSAVE_DIRECTORY_TESTING_LOC).unwrap();
     }
 }
