Make sure that only one snapshot is made at a time

`CoreDB` now has a `SnapshotStatus` object for this purpose.
This object holds the current state of the snapshot service behind an `RwLock`.
When `mksnap` is called, `SnapEngine` sets `in_progress` to true.
Once the snapshot has been created, `SnapEngine` sets `in_progress` back to false.
This prevents multiple entities from creating snapshots at the same time.
Sayan Nandan 4 years ago
parent 037064e8d9
commit 4ce95c6d42

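To make the mechanism concrete before the diff, here is a minimal, self-contained sketch of the busy-flag pattern this commit introduces. It is not the actual `CoreDB` wiring: it uses `std::sync::RwLock` (so the guards need `unwrap()`, unlike the non-poisoning lock the codebase appears to use), and the worker threads are purely illustrative.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Standalone stand-in for the `SnapshotStatus` added by this commit
#[derive(Debug)]
pub struct SnapshotStatus {
    /// The maximum number of recent snapshots to keep
    pub max: usize,
    /// Whether a snapshot is currently being created
    pub in_progress: RwLock<bool>,
}

impl SnapshotStatus {
    pub fn new(max: usize) -> Self {
        SnapshotStatus {
            max,
            in_progress: RwLock::new(false),
        }
    }
    /// Mark the snapshot service as busy
    pub fn lock_snap(&self) {
        *self.in_progress.write().unwrap() = true;
    }
    /// Mark the snapshot service as free
    pub fn unlock_snap(&self) {
        *self.in_progress.write().unwrap() = false;
    }
    /// Check whether a snapshot is currently being created
    pub fn is_busy(&self) -> bool {
        *self.in_progress.read().unwrap()
    }
}

fn main() {
    let status = Arc::new(SnapshotStatus::new(4));
    println!("keeping at most {} snapshots", status.max);
    let workers: Vec<_> = (0..2)
        .map(|id| {
            let status = Arc::clone(&status);
            thread::spawn(move || {
                // Mirrors the MKSNAP action below: refuse to start a second snapshot.
                // Note that the check-then-set is not atomic; the commit accepts the
                // same small window between `is_busy()` and `lock_snap()`.
                if status.is_busy() {
                    println!("worker {}: snapshot already in progress", id);
                    return;
                }
                status.lock_snap();
                // ... flush the data to a snapshot file here ...
                status.unlock_snap();
            })
        })
        .collect();
    for w in workers {
        w.join().unwrap();
    }
}
```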
@@ -60,7 +60,48 @@ pub struct CoreDB {
///
/// If this is set to Some(0), then all snapshots will be kept. If it is set to
/// Some(n), n ∈ Z<sup>+</sup>, then at most _n_ snapshots will be kept. If set to `None`, snapshotting is disabled.
pub snap_count: Option<usize>,
pub snapcfg: Arc<Option<SnapshotStatus>>,
}
/// The status and details of the snapshotting service
///
/// The `in_progress` field is kept behind an `RwLock` to ensure that only one snapshot
/// operation can run at a time. This isn't a problem on the server side itself, since
/// there is only a single background snapshot task, but it can be an issue when snapshots
/// are triggered externally, for example via `MKSNAP`
#[derive(Debug)]
pub struct SnapshotStatus {
/// The maximum number of recent snapshots to keep
pub max: usize,
/// The current state of the snapshot service
pub in_progress: RwLock<bool>,
}
impl SnapshotStatus {
/// Create a new `SnapshotStatus` instance with preset values
///
/// **Note:** The initial state of the snapshot service is set to `false`
pub fn new(max: usize) -> Self {
SnapshotStatus {
max,
in_progress: RwLock::new(false),
}
}
/// Set `in_progress` to true
pub fn lock_snap(&self) {
*self.in_progress.write() = true;
}
/// Set `in_progress` to false
pub fn unlock_snap(&self) {
*self.in_progress.write() = false;
}
/// Check if `in_progress` is set to true
pub fn is_busy(&self) -> bool {
*self.in_progress.read()
}
}
/// A shared _state_
@@ -158,7 +199,23 @@ impl CoreDB {
/// Check if snapshotting is enabled
pub fn is_snapshot_enabled(&self) -> bool {
self.snap_count.is_some()
self.snapcfg.is_some()
}
/// Mark the snapshotting service to be busy
///
/// ## Panics
/// If snapshotting is disabled, this will panic
pub fn lock_snap(&self) {
(*self.snapcfg).as_ref().unwrap().lock_snap();
}
/// Mark the snapshotting service to be free
///
/// ## Panics
/// If snapshotting is disabled, this will panic
pub fn unlock_snap(&self) {
(*self.snapcfg).as_ref().unwrap().unlock_snap();
}
/// Returns the expected `Arc::strong_count` for the `CoreDB` object
@@ -192,6 +249,11 @@ impl CoreDB {
background_tasks += 1;
snap_count = Some(atmost);
}
let snapcfg = if let Some(max) = snap_count {
Arc::new(Some(SnapshotStatus::new(max)))
} else {
Arc::new(None)
};
let db = if let Some(coretable) = coretable {
CoreDB {
shared: Arc::new(Shared {
@@ -203,10 +265,10 @@ impl CoreDB {
snapshot_service: Notify::new(),
}),
background_tasks,
snap_count,
snapcfg,
}
} else {
CoreDB::new_empty(background_tasks, snap_count)
CoreDB::new_empty(background_tasks, snapcfg)
};
// Spawn the background save task in a separate task
tokio::spawn(diskstore::bgsave_scheduler(db.clone(), bgsave));
@@ -218,7 +280,7 @@ impl CoreDB {
Ok(db)
}
/// Create an empty in-memory table
pub fn new_empty(background_tasks: usize, snap_count: Option<usize>) -> Self {
pub fn new_empty(background_tasks: usize, snapcfg: Arc<Option<SnapshotStatus>>) -> Self {
CoreDB {
shared: Arc::new(Shared {
bgsave_task: Notify::new(),
@@ -229,7 +291,7 @@ impl CoreDB {
snapshot_service: Notify::new(),
}),
background_tasks,
snap_count,
snapcfg,
}
}
/// Acquire a write lock

@@ -28,6 +28,7 @@ use chrono::prelude::*;
use libtdb::TResult;
use regex::Regex;
use std::fs;
use std::hint::unreachable_unchecked;
use std::io::ErrorKind;
use std::path::PathBuf;
lazy_static::lazy_static! {
@@ -144,10 +145,31 @@ impl<'a> SnapshotEngine<'a> {
/// Create a snapshot
///
/// This returns `Some(true)` if everything went well, otherwise it returns
/// `Some(false)`. If the database is about to terminate, it returns `None`
/// `Some(false)`. If the database is about to terminate, it returns `None`.
///
/// ## Nature
///
/// This function is **blocking**: it waits for the snapshotting service to be free.
/// Callers should first check whether the service is busy, for example by calling
/// `is_busy()` on the `SnapshotStatus` held in `coredb.snapcfg`
///
/// ## Panics
/// If snapshotting is disabled in `CoreDB`, this will panic badly! Because the disabled
/// case is marked `unreachable_unchecked`, it may not even panic but instead terminate
/// abruptly with an _invalid instruction_
pub fn mksnap(&mut self) -> Option<bool> {
log::trace!("Snapshotting was initiated");
while (*self.dbref.snapcfg)
.as_ref()
.unwrap_or_else(|| unsafe { unreachable_unchecked() })
.is_busy()
{
// Busy-wait until the snapshotting service is free
}
log::trace!("Acquired a lock on the snapshot service");
self.dbref.lock_snap(); // Set the snapshotting service to be busy
let rlock = self.dbref.acquire_read();
if rlock.terminate {
self.dbref.unlock_snap();
// The database is shutting down, don't create a snapshot
return None;
}
@@ -156,6 +178,8 @@ impl<'a> SnapshotEngine<'a> {
snapname.push(self.get_snapname());
if let Err(e) = diskstore::flush_data(&snapname, &rlock.get_ref()) {
log::error!("Snapshotting failed with error: '{}'", e);
self.dbref.unlock_snap();
log::trace!("Released lock on the snapshot service");
return Some(false);
} else {
log::info!("Successfully created snapshot");
@@ -169,11 +193,15 @@ impl<'a> SnapshotEngine<'a> {
old_snapshot.to_string_lossy(),
e
);
self.dbref.unlock_snap();
log::trace!("Released lock on the snapshot service");
return Some(false);
} else {
log::info!("Successfully removed old snapshot");
}
}
self.dbref.unlock_snap();
log::trace!("Released lock on the snapshot service");
Some(true)
}
/// Delete all snapshots
@@ -192,7 +220,7 @@ impl<'a> SnapshotEngine<'a> {
#[test]
fn test_snapshot() {
let ourdir = "TEST_SS";
let db = CoreDB::new_empty(3, None);
let db = CoreDB::new_empty(3, std::sync::Arc::new(None));
let mut write = db.acquire_write();
let _ = write.get_mut_ref().insert(
String::from("ohhey"),
@@ -214,7 +242,7 @@ fn test_snapshot() {
#[test]
fn test_pre_existing_snapshots() {
let ourdir = "TEST_PX_SS";
let db = CoreDB::new_empty(3, None);
let db = CoreDB::new_empty(3, std::sync::Arc::new(None));
let mut snapengine = SnapshotEngine::new(4, &db, Some(ourdir)).unwrap();
// Keep sleeping to ensure the time difference
assert!(snapengine.mksnap().unwrap().eq(&true));

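Putting the doc comment above to work, a caller would check the busy flag before invoking `mksnap()`. A hedged sketch of that pattern follows (a hypothetical `try_snapshot` helper; it assumes `handle` is a `CoreDB` with snapshotting enabled, which is exactly what the `MKSNAP` action in the next file ensures):

```rust
// Hypothetical helper; assumes snapshotting is enabled on `handle`, otherwise it panics.
fn try_snapshot(handle: &CoreDB) -> Option<bool> {
    let status = (*handle.snapcfg)
        .as_ref()
        .expect("snapshotting must be enabled before calling mksnap()");
    if status.is_busy() {
        // Bail out instead of letting mksnap() spin on the busy flag
        log::warn!("A snapshot is already in progress; skipping");
        return Some(false);
    }
    // Passing `None` for the snapshot directory mirrors what the MKSNAP action does
    let mut engine = SnapshotEngine::new(status.max, handle, None).ok()?;
    engine.mksnap()
}
```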
@@ -43,20 +43,24 @@ pub async fn mksnap(handle: &CoreDB, con: &mut Connection, act: ActionGroup) ->
// We will just follow the standard convention of creating snapshots
let mut was_engine_error = false;
let mut outerror = None;
let mut engine_was_busy = false;
{
let snapengine = SnapshotEngine::new(
handle
.snap_count
.unwrap_or_else(|| unsafe { unreachable_unchecked() }),
&handle,
None,
);
let snaphandle = handle.snapcfg.clone();
let snapstatus = (*snaphandle)
.as_ref()
.unwrap_or_else(|| unsafe { unreachable_unchecked() });
let snapengine = SnapshotEngine::new(snapstatus.max, &handle, None);
if snapengine.is_err() {
was_engine_error = true;
} else {
let mut snapengine =
snapengine.unwrap_or_else(|_| unsafe { unreachable_unchecked() });
outerror = snapengine.mksnap();
if snapstatus.is_busy() {
engine_was_busy = true;
} else {
let mut snapengine =
snapengine.unwrap_or_else(|_| unsafe { unreachable_unchecked() });
outerror = snapengine.mksnap();
}
}
}
if was_engine_error {
@@ -64,6 +68,11 @@ pub async fn mksnap(handle: &CoreDB, con: &mut Connection, act: ActionGroup) ->
.write_response(responses::fresp::R_SERVER_ERR.to_owned())
.await;
}
if engine_was_busy {
con.write_response(GroupBegin(1)).await?;
let error = RespCodes::OtherError(Some("Snapshotting already in progress".to_owned()));
return con.write_response(error).await;
}
if let Some(val) = outerror {
if val {
// Snapshotting succeeded, return Okay

@@ -42,10 +42,14 @@ macro_rules! __func__ {
}};
}
async fn start_test_server(port: u16) -> SocketAddr {
async fn start_test_server(port: u16, db: Option<CoreDB>) -> SocketAddr {
let mut socket = String::from("127.0.0.1:");
socket.push_str(&port.to_string());
let db = CoreDB::new(BGSave::Disabled, SnapshotConfig::default()).unwrap();
let db = if let Some(db) = db {
db
} else {
CoreDB::new(BGSave::Disabled, SnapshotConfig::default()).unwrap()
};
let listener = TcpListener::bind(socket)
.await
.expect(&format!("Failed to bind to port {}", port));

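With `start_test_server` now taking an `Option<CoreDB>`, a test can supply its own handle instead of the default one. A hedged sketch of such a test (the test name, the port number and the use of `#[tokio::test]` instead of the project's `#[dbtest]` macro are assumptions, not part of this commit):

```rust
#[tokio::test]
async fn test_mksnap_over_the_wire() {
    // Build the handle explicitly; a real test would pass a snapshot-enabled
    // SnapshotConfig here instead of the default one.
    let db = CoreDB::new(BGSave::Disabled, SnapshotConfig::default()).unwrap();
    let addr = crate::tests::start_test_server(2022, Some(db)).await;
    let mut stream = tokio::net::TcpStream::connect(&addr).await.unwrap();
    // ... send MKSNAP over `stream` and assert on the response ...
    stream.shutdown(::std::net::Shutdown::Write).unwrap();
}
```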
@@ -47,7 +47,7 @@ fn parse_dbtest(mut input: syn::ItemFn, rand: u16) -> Result<TokenStream, syn::E
}
sig.asyncness = None;
let body = quote! {
let addr = crate::tests::start_test_server(#rand).await;
let addr = crate::tests::start_test_server(#rand, None).await;
let mut stream = tokio::net::TcpStream::connect(&addr).await.unwrap();
#body
stream.shutdown(::std::net::Shutdown::Write).unwrap();
