Implement `mksnap` action

next
Sayan Nandan 4 years ago
parent c31788754c
commit 3b3d2c6994
No known key found for this signature in database
GPG Key ID: C31EFD7DDA12AEE0

@ -130,9 +130,9 @@ pub struct ConfigKeySnapshot {
///
pub struct SnapshotPref {
/// Capture a snapshot `every` seconds
every: u64,
pub every: u64,
/// The maximum numeber of snapshots to be kept
atmost: usize,
pub atmost: usize,
}
impl SnapshotPref {

@ -23,6 +23,7 @@
use crate::config::BGSave;
use crate::config::SnapshotConfig;
use crate::config::SnapshotPref;
use crate::diskstore;
use crate::protocol::Connection;
use crate::protocol::Query;
@ -55,6 +56,11 @@ pub struct CoreDB {
/// This should **not be changed** during runtime, and should only be initialized when `CoreDB`
/// is first initialized
background_tasks: usize,
/// The number of snapshots that are to be kept at the most
///
/// If this is set to Some(0), then all the snapshots will be kept. Otherwise, if it is set to
/// Some(n), n ∈ Z<sup>+</sup> — then _n_ snapshots will be kept at the maximum. If set to `None`, snapshotting is disabled.
pub snap_count: Option<usize>,
}
/// A shared _state_
@ -150,6 +156,11 @@ impl CoreDB {
}
}
/// Check if snapshotting is enabled
pub fn is_snapshot_enabled(&self) -> bool {
self.snap_count.is_some()
}
/// Returns the expected `Arc::strong_count` for the `CoreDB` object
pub const fn expected_strong_count(&self) -> usize {
self.background_tasks + 1
@ -172,8 +183,15 @@ impl CoreDB {
/// If it is - it restores the data. Otherwise it creates a new in-memory table
pub fn new(bgsave: BGSave, snapshot_cfg: SnapshotConfig) -> TResult<Self> {
let coretable = diskstore::get_saved(Some(PERSIST_FILE.to_path_buf()))?;
let background_tasks: usize =
snapshot_cfg.is_enabled() as usize + !bgsave.is_disabled() as usize;
let mut background_tasks: usize = 0;
if !bgsave.is_disabled() {
background_tasks += 1;
}
let mut snap_count = None;
if let SnapshotConfig::Enabled(SnapshotPref { every: _, atmost }) = snapshot_cfg {
background_tasks += 1;
snap_count = Some(atmost);
}
let db = if let Some(coretable) = coretable {
CoreDB {
shared: Arc::new(Shared {
@ -185,9 +203,10 @@ impl CoreDB {
snapshot_service: Notify::new(),
}),
background_tasks,
snap_count,
}
} else {
CoreDB::new_empty(background_tasks)
CoreDB::new_empty(background_tasks, snap_count)
};
// Spawn the background save task in a separate task
tokio::spawn(diskstore::bgsave_scheduler(db.clone(), bgsave));
@ -199,7 +218,7 @@ impl CoreDB {
Ok(db)
}
/// Create an empty in-memory table
pub fn new_empty(background_tasks: usize) -> Self {
pub fn new_empty(background_tasks: usize, snap_count: Option<usize>) -> Self {
CoreDB {
shared: Arc::new(Shared {
bgsave_task: Notify::new(),
@ -210,6 +229,7 @@ impl CoreDB {
snapshot_service: Notify::new(),
}),
background_tasks,
snap_count,
}
}
/// Acquire a write lock

@ -142,18 +142,21 @@ impl<'a> SnapshotEngine<'a> {
Utc::now().format("%Y%m%d-%H%M%S.snapshot").to_string()
}
/// Create a snapshot
pub fn mksnap(&mut self) -> bool {
///
/// This returns `Some(true)` if everything went well, otherwise it returns
/// `Some(false)`. If the database is about to terminate, it returns `None`
pub fn mksnap(&mut self) -> Option<bool> {
let rlock = self.dbref.acquire_read();
if rlock.terminate {
// The database is shutting down, don't create a snapshot
return false;
return None;
}
let mut snapname = PathBuf::new();
snapname.push(&self.snap_dir);
snapname.push(self.get_snapname());
if let Err(e) = diskstore::flush_data(&snapname, &rlock.get_ref()) {
log::error!("Snapshotting failed with error: '{}'", e);
return true;
return Some(false);
} else {
log::info!("Successfully created snapshot");
}
@ -166,11 +169,12 @@ impl<'a> SnapshotEngine<'a> {
old_snapshot.to_string_lossy(),
e
);
return Some(false);
} else {
log::info!("Successfully removed old snapshot");
}
}
true
Some(true)
}
/// Delete all snapshots
pub fn clearall(&mut self) -> TResult<()> {
@ -188,7 +192,7 @@ impl<'a> SnapshotEngine<'a> {
#[test]
fn test_snapshot() {
let ourdir = "TEST_SS";
let db = CoreDB::new_empty(3);
let db = CoreDB::new_empty(3, None);
let mut write = db.acquire_write();
let _ = write.get_mut_ref().insert(
String::from("ohhey"),
@ -210,16 +214,16 @@ fn test_snapshot() {
#[test]
fn test_pre_existing_snapshots() {
let ourdir = "TEST_PX_SS";
let db = CoreDB::new_empty(3);
let db = CoreDB::new_empty(3, None);
let mut snapengine = SnapshotEngine::new(4, &db, Some(ourdir)).unwrap();
// Keep sleeping to ensure the time difference
assert!(snapengine.mksnap());
assert!(snapengine.mksnap().unwrap().eq(&true));
std::thread::sleep(Duration::from_secs(2));
assert!(snapengine.mksnap());
assert!(snapengine.mksnap().unwrap().eq(&true));
std::thread::sleep(Duration::from_secs(2));
assert!(snapengine.mksnap());
assert!(snapengine.mksnap().unwrap().eq(&true));
std::thread::sleep(Duration::from_secs(2));
assert!(snapengine.mksnap());
assert!(snapengine.mksnap().unwrap().eq(&true));
// Now close everything down
drop(snapengine);
let mut snapengine = SnapshotEngine::new(4, &db, Some(ourdir)).unwrap();
@ -261,7 +265,7 @@ pub async fn snapshot_service(handle: CoreDB, ss_config: SnapshotConfig) {
}
};
while !handle.shared.is_termsig() {
if sengine.mksnap() {
if sengine.mksnap().is_some() {
tokio::select! {
_ = time::delay_until(time::Instant::now() + duration) => {},
_ = handle.shared.bgsave_task.notified() => {}

@ -20,14 +20,71 @@
*/
use crate::coredb::CoreDB;
use crate::diskstore::snapshot::SnapshotEngine;
use crate::protocol::{responses, ActionGroup, Connection};
use crate::resp::GroupBegin;
use libtdb::terrapipe::RespCodes;
use libtdb::TResult;
use std::hint::unreachable_unchecked;
/// Create a snapshot
///
/// If there is a second argument: a separate snapshot is created. Otherwise
/// if the action is just `MKSNAP`, then the `maxtop` parameter is adhered to
pub async fn mksnap(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
todo!()
if !handle.is_snapshot_enabled() {
// Since snapshotting is disabled, we can't create a snapshot!
// We'll just return an error returning the same
let error = "Snapshotting is disabled";
con.write_response(GroupBegin(1)).await?;
let error = RespCodes::OtherError(Some(error.to_string()));
return con.write_response(error).await;
}
let howmany = act.howmany();
if howmany == 0 {
// We will just follow the standard convention of creating snapshots
let mut was_engine_error = false;
let mut outerror = None;
{
let snapengine = SnapshotEngine::new(
handle
.snap_count
.unwrap_or_else(|| unsafe { unreachable_unchecked() }),
&handle,
None,
);
if snapengine.is_err() {
was_engine_error = true;
}
let mut snapengine = snapengine.unwrap_or_else(|_| unsafe { unreachable_unchecked() });
outerror = snapengine.mksnap();
}
if was_engine_error {
return con
.write_response(responses::fresp::R_SERVER_ERR.to_owned())
.await;
}
if let Some(val) = outerror {
if val {
// Snapshotting succeeded, return Okay
return con
.write_response(responses::fresp::R_OKAY.to_owned())
.await;
} else {
// Nope, something happened while creating a snapshot
// return a server error
return con
.write_response(responses::fresp::R_SERVER_ERR.to_owned())
.await;
}
} else {
// We shouldn't ever reach here if all our logic is correct
// but if we do, something is wrong with the runtime
con.write_response(GroupBegin(1)).await?;
let error = RespCodes::OtherError(Some("access-after-termsig".to_owned()));
return con.write_response(error).await;
}
} else {
return con
.write_response(responses::fresp::R_ACTION_ERR.to_owned())
.await;
}
}

@ -31,7 +31,7 @@ pub mod get;
pub mod jget;
pub mod keylen;
pub mod mget;
mod mksnap;
pub mod mksnap;
pub mod mset;
pub mod mupdate;
pub mod set;

@ -61,6 +61,8 @@ mod tags {
pub const TAG_USET: &'static str = "USET";
/// `KEYLEN` action tag
pub const TAG_KEYLEN: &'static str = "KEYLEN";
/// `MKSNAP` action tag
pub const TAG_MKSNAP: &'static str = "MKSNAP";
}
/// Execute a simple(*) query
@ -90,6 +92,7 @@ pub async fn execute_simple(db: &CoreDB, con: &mut Connection, buf: ActionGroup)
tags::TAG_FLUSHDB => kvengine::flushdb::flushdb(db, con, buf).await?,
tags::TAG_USET => kvengine::uset::uset(db, con, buf).await?,
tags::TAG_KEYLEN => kvengine::keylen::keylen(db, con, buf).await?,
tags::TAG_MKSNAP => kvengine::mksnap::mksnap(db, con, buf).await?,
_ => {
con.write_response(responses::fresp::R_UNKNOWN_ACTION.to_owned())
.await?

Loading…
Cancel
Save