Enable BGSAVE to be configured via config file

next
Sayan Nandan 4 years ago
parent 3a84feb14e
commit 511e227f81
No known key found for this signature in database
GPG Key ID: C31EFD7DDA12AEE0

@ -46,7 +46,7 @@ pub struct Config {
/// The BGSAVE configuration
///
#[derive(Deserialize, Debug, PartialEq)]
#[derive(Deserialize, Debug, PartialEq, Clone)]
pub struct BGSave {
/// Whether BGSAVE is enabled or not
///
@ -70,6 +70,12 @@ impl BGSave {
pub const fn default() -> Self {
BGSave::new(true, 120)
}
pub const fn get_duration(&self) -> u64 {
self.every
}
pub const fn is_disabled(&self) -> bool {
!self.enabled
}
}
/// This struct represents the `server` key in the TOML file
@ -96,7 +102,7 @@ pub struct ParsedConfig {
/// If `noart` is set to true, no terminal artwork should be displayed
noart: bool,
/// The BGSAVE configuration
bgsave: BGSave,
pub bgsave: BGSave,
}
impl ParsedConfig {
@ -172,7 +178,7 @@ impl ParsedConfig {
}
}
/// Return a (host, port) tuple which can be bound to with `TcpListener`
pub fn get_host_port_tuple(self) -> impl ToSocketAddrs {
pub fn get_host_port_tuple(&self) -> impl ToSocketAddrs {
((self.host), self.port)
}
/// Returns `false` if `noart` is enabled. Otherwise it returns `true`
@ -271,7 +277,13 @@ pub fn get_config_file_or_return_cfg() -> Result<ConfigType<ParsedConfig>, Confi
let filename = matches.value_of("config");
if let Some(filename) = filename {
match ParsedConfig::new_from_file(filename.to_owned()) {
Ok(cfg) => return Ok(ConfigType::Custom(cfg)),
Ok(cfg) => {
if cfg.bgsave.is_disabled() {
log::warn!("BGSAVE is disabled: If this system crashes unexpectedly, it may lead to the loss of data");
log::warn!("Unused key 'bgsave.every' in configuration file");
}
return Ok(ConfigType::Custom(cfg));
}
Err(e) => return Err(e),
}
} else {

@ -21,6 +21,7 @@
//! # The core database engine
use crate::config::BGSave;
use crate::diskstore;
use crate::protocol::Connection;
use crate::protocol::Query;
@ -32,10 +33,8 @@ use parking_lot::RwLockReadGuard;
use parking_lot::RwLockWriteGuard;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio;
use tokio::sync::Notify;
use tokio::time::Instant;
/// This is a thread-safe database handle, which on cloning simply
/// gives another atomic reference to the `shared` which is a `Shared` object
@ -57,11 +56,12 @@ pub struct Shared {
}
impl Shared {
/// This task performs a `sync`hronous background save operation and returns
/// the duration for which the background thread for BGSAVE must sleep for, before
/// it calls this function again. If the server has received a termination signal
/// then we just return `None`
pub fn run_bgsave_and_get_next_point(&self) -> Option<Instant> {
/// This task performs a `sync`hronous background save operation
///
/// It runs BGSAVE and then returns control to the caller. The caller is responsible
/// for periodically calling BGSAVE. This returns `None`, **if** the database
/// is shutting down
pub fn run_bgsave(&self) -> Option<()> {
let state = self.table.read();
if state.terminate {
return None;
@ -71,7 +71,7 @@ impl Shared {
Ok(_) => log::info!("BGSAVE completed successfully"),
Err(e) => log::error!("BGSAVE failed with error: '{}'", e),
}
Some(Instant::now() + Duration::from_secs(120))
Some(())
}
/// Check if the server has received a termination signal
pub fn is_termsig(&self) -> bool {
@ -151,7 +151,7 @@ impl CoreDB {
///
/// This also checks if a local backup of previously saved data is available.
/// If it is - it restores the data. Otherwise it creates a new in-memory table
pub fn new() -> TResult<Self> {
pub fn new(bgsave: BGSave) -> TResult<Self> {
let coretable = diskstore::get_saved()?;
let db = if let Some(coretable) = coretable {
CoreDB {
@ -175,7 +175,7 @@ impl CoreDB {
}
};
// Spawn the background save task in a separate task
tokio::spawn(diskstore::bgsave_scheduler(db.clone()));
tokio::spawn(diskstore::bgsave_scheduler(db.clone(), bgsave));
Ok(db)
}
/// Acquire a write lock

@ -19,6 +19,7 @@
*
*/
use crate::config::BGSave;
use crate::protocol::{Connection, QueryResult::*};
use crate::CoreDB;
use libtdb::util::terminal;
@ -167,10 +168,10 @@ impl Drop for CHandler {
use std::io::{self, prelude::*};
/// Start the server waiting for incoming connections or a CTRL+C signal
pub async fn run(listener: TcpListener, sig: impl Future) {
pub async fn run(listener: TcpListener, bgsave_cfg: BGSave, sig: impl Future) {
let (signal, _) = broadcast::channel(1);
let (terminate_tx, terminate_rx) = mpsc::channel(1);
let db = match CoreDB::new() {
let db = match CoreDB::new(bgsave_cfg) {
Ok(d) => d,
Err(e) => {
eprintln!("ERROR: {}", e);

@ -21,6 +21,7 @@
//! This module provides tools for handling persistently stored data
use crate::config::BGSave;
use crate::coredb::{self, Data};
use bincode;
use bytes::Bytes;
@ -29,6 +30,7 @@ use std::collections::HashMap;
use std::fs;
use std::io::{ErrorKind, Write};
use std::iter::FromIterator;
use std::time::Duration;
use tokio::time;
type DiskStore = (Vec<String>, Vec<Vec<u8>>);
@ -74,12 +76,17 @@ pub fn flush_data(data: &HashMap<String, Data>) -> TResult<()> {
/// The bgsave_scheduler calls the bgsave task in `CoreDB` after every `dur` which
/// is returned by the `run_bgsave_and_get_next_point()` associated function
pub async fn bgsave_scheduler(handle: coredb::CoreDB) {
pub async fn bgsave_scheduler(handle: coredb::CoreDB, bgsave_cfg: BGSave) {
if bgsave_cfg.is_disabled() {
handle.shared.bgsave_task.notified().await;
return;
}
let duration = Duration::from_secs(bgsave_cfg.get_duration());
while !handle.shared.is_termsig() {
if let Some(dur) = handle.shared.run_bgsave_and_get_next_point() {
if let Some(_) = handle.shared.run_bgsave() {
tokio::select! {
// Sleep until `dur`
_ = time::delay_until(dur) => {}
_ = time::delay_until(time::Instant::now() + duration) => {}
// Otherwise wait for a notification
_ = handle.shared.bgsave_task.notified() => {}
}

@ -19,6 +19,7 @@
*
*/
use crate::config::BGSave;
use tokio::net::TcpListener;
mod config;
use std::env;
@ -63,15 +64,15 @@ async fn main() {
.init();
// Start the server which asynchronously waits for a CTRL+C signal
// which will safely shut down the server
run(check_args_or_connect().await, signal::ctrl_c()).await;
let (tcplistener, bgsave_config) = check_args_or_connect().await;
run(tcplistener, bgsave_config, signal::ctrl_c()).await;
}
/// This function checks the command line arguments and binds to an appropriate
/// port and host, as per the supplied configuration options
async fn check_args_or_connect() -> TcpListener {
use libtdb::util::terminal;
async fn check_args_or_connect() -> (TcpListener, BGSave) {
let cfg = config::get_config_file_or_return_cfg();
let binding = match cfg {
let binding_and_cfg = match cfg {
Ok(config::ConfigType::Custom(cfg)) => {
if cfg.is_artful() {
println!("{}\n{}", TEXT, MSG);
@ -79,21 +80,27 @@ async fn check_args_or_connect() -> TcpListener {
println!("{}", MSG);
}
log::info!("Using settings from config file");
TcpListener::bind(cfg.get_host_port_tuple()).await
(
TcpListener::bind(cfg.get_host_port_tuple()).await,
cfg.bgsave,
)
}
Ok(config::ConfigType::Def(cfg)) => {
println!("{}\n{}", TEXT, MSG);
log::warn!("No configuration file supplied. Using default settings");
TcpListener::bind(cfg.get_host_port_tuple()).await
(
TcpListener::bind(cfg.get_host_port_tuple()).await,
cfg.bgsave,
)
}
Err(e) => {
log::error!("{}", e);
std::process::exit(0x100);
}
};
match binding {
Ok(b) => b,
Err(e) => {
match binding_and_cfg {
(Ok(b), bgsave_cfg) => (b, bgsave_cfg),
(Err(e), _) => {
log::error!("Failed to bind to socket with error: '{}'", e);
std::process::exit(0x100);
}

@ -24,6 +24,7 @@
use crate::coredb::CoreDB;
use crate::dbnet;
use crate::protocol::responses::fresp;
use crate::BGSave;
use libtdb::terrapipe;
use std::future::Future;
use std::net::{Shutdown, SocketAddr};
@ -51,7 +52,7 @@ async fn start_server() -> (Option<SocketAddr>, CoreDB) {
// running, or use it if it is already running, we just return none if we failed
// to bind to the port, since this will _almost_ never happen on our CI
let listener = TcpListener::bind(ADDR).await.unwrap();
let db = CoreDB::new().unwrap();
let db = CoreDB::new(BGSave::default()).unwrap();
let asyncdb = db.clone();
let addr = if let Ok(addr) = listener.local_addr() {
Some(addr)
@ -91,4 +92,4 @@ impl<'a> QueryVec<'a> {
}
self.db.finish_db();
}
}
}

Loading…
Cancel
Save