Fix backup restoration and simplify `arbiter::run`

next
Sayan Nandan 3 years ago
parent 5c5fe3573f
commit 36b02e9099
No known key found for this signature in database
GPG Key ID: 8BC07A0A4D41DD52

@ -24,14 +24,13 @@
* *
*/ */
use crate::config::BGSave; use crate::config::ConfigurationSet;
use crate::config::SnapshotConfig; use crate::config::SnapshotConfig;
use crate::config::SnapshotPref; use crate::config::SnapshotPref;
use crate::corestore::Corestore; use crate::corestore::Corestore;
use crate::dbnet::{self, Terminator}; use crate::dbnet::{self, Terminator};
use crate::services; use crate::services;
use crate::storage::sengine::SnapshotEngine; use crate::storage::sengine::SnapshotEngine;
use crate::PortConfig;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::broadcast; use tokio::sync::broadcast;
@ -65,16 +64,19 @@ impl Future for UnixTerminationSignal {
/// Start the server waiting for incoming connections or a termsig /// Start the server waiting for incoming connections or a termsig
pub async fn run( pub async fn run(
ports: PortConfig, ConfigurationSet {
bgsave_cfg: BGSave, ports,
snapshot_cfg: SnapshotConfig, bgsave,
_restore_filepath: Option<String>, snapshot,
maxcon: usize, maxcon,
..
}: ConfigurationSet,
restore_filepath: Option<String>,
) -> Result<Corestore, String> { ) -> Result<Corestore, String> {
// Intialize the broadcast channel // Intialize the broadcast channel
let (signal, _) = broadcast::channel(1); let (signal, _) = broadcast::channel(1);
let engine; let engine;
match &snapshot_cfg { match &snapshot {
SnapshotConfig::Enabled(SnapshotPref { atmost, .. }) => { SnapshotConfig::Enabled(SnapshotPref { atmost, .. }) => {
engine = SnapshotEngine::new(*atmost); engine = SnapshotEngine::new(*atmost);
engine engine
@ -86,19 +88,22 @@ pub async fn run(
} }
} }
let engine = Arc::new(engine); let engine = Arc::new(engine);
// restore data
services::restore_data(restore_filepath)
.map_err(|e| format!("Failed to restore data from backup with error: {}", e))?;
let db = Corestore::init_with_snapcfg(engine.clone()) let db = Corestore::init_with_snapcfg(engine.clone())
.map_err(|e| format!("Error while initializing database: {}", e))?; .map_err(|e| format!("Error while initializing database: {}", e))?;
// initialize the background services // initialize the background services
let bgsave_handle = tokio::spawn(services::bgsave::bgsave_scheduler( let bgsave_handle = tokio::spawn(services::bgsave::bgsave_scheduler(
db.clone(), db.clone(),
bgsave_cfg, bgsave,
Terminator::new(signal.subscribe()), Terminator::new(signal.subscribe()),
)); ));
let snapshot_handle = tokio::spawn(services::snapshot::snapshot_service( let snapshot_handle = tokio::spawn(services::snapshot::snapshot_service(
engine, engine,
db.clone(), db.clone(),
snapshot_cfg, snapshot,
Terminator::new(signal.subscribe()), Terminator::new(signal.subscribe()),
)); ));

@ -252,7 +252,7 @@ pub async fn connect(
db: Corestore, db: Corestore,
signal: broadcast::Sender<()>, signal: broadcast::Sender<()>,
) -> Result<MultiListener, String> { ) -> Result<MultiListener, String> {
let climit = Arc::new(Semaphore::const_new(maxcon)); let climit = Arc::new(Semaphore::new(maxcon));
let server = match ports { let server = match ports {
PortConfig::InsecureOnly { host, port } => MultiListener::new_insecure_only( PortConfig::InsecureOnly { host, port } => MultiListener::new_insecure_only(
BaseListener::init(&db, host, port, climit.clone(), signal.clone()) BaseListener::init(&db, host, port, climit.clone(), signal.clone())

@ -95,22 +95,13 @@ fn main() {
.enable_all() .enable_all()
.build() .build()
.unwrap(); .unwrap();
let (ports, bgsave_config, snapshot_config, restore_filepath, maxcon) = let (cfg, restore_file) = check_args_and_get_cfg();
check_args_and_get_cfg();
// check if any other process is using the data directory and lock it if not (else error) // check if any other process is using the data directory and lock it if not (else error)
// important: create the pid_file just here and nowhere else because check_args can also // important: create the pid_file just here and nowhere else because check_args can also
// involve passing --help or wrong arguments which can falsely create a PID file // involve passing --help or wrong arguments which can falsely create a PID file
let pid_file = run_pre_startup_tasks(); let pid_file = run_pre_startup_tasks();
let db: Result<corestore::Corestore, String> = runtime.block_on(async move { let db: Result<corestore::Corestore, String> =
arbiter::run( runtime.block_on(async move { arbiter::run(cfg, restore_file).await });
ports,
bgsave_config,
snapshot_config,
restore_filepath,
maxcon,
)
.await
});
// Make sure all background workers terminate // Make sure all background workers terminate
drop(runtime); drop(runtime);
let db = match db { let db = match db {
@ -162,11 +153,11 @@ pub fn pre_shutdown_cleanup(mut pid_file: FileLock, mr: Option<&Memstore>) {
} }
} }
use self::config::{BGSave, PortConfig, SnapshotConfig}; use self::config::ConfigurationSet;
/// This function checks the command line arguments and either returns a config object /// This function checks the command line arguments and either returns a config object
/// or prints an error to `stderr` and terminates the server /// or prints an error to `stderr` and terminates the server
fn check_args_and_get_cfg() -> (PortConfig, BGSave, SnapshotConfig, Option<String>, usize) { fn check_args_and_get_cfg() -> (ConfigurationSet, Option<String>) {
match config::get_config() { match config::get_config() {
Ok(cfg) => { Ok(cfg) => {
if cfg.is_artful() { if cfg.is_artful() {
@ -181,8 +172,7 @@ fn check_args_and_get_cfg() -> (PortConfig, BGSave, SnapshotConfig, Option<Strin
} }
// print warnings if any // print warnings if any
cfg.print_warnings(); cfg.print_warnings();
let (cfg, restore) = cfg.finish(); cfg.finish()
(cfg.ports, cfg.bgsave, cfg.snapshot, restore, cfg.maxcon)
} }
Err(e) => { Err(e) => {
log::error!("{}", e); log::error!("{}", e);

@ -26,3 +26,13 @@
pub mod bgsave; pub mod bgsave;
pub mod snapshot; pub mod snapshot;
use crate::util::os;
use crate::IoResult;
pub fn restore_data(src: Option<String>) -> IoResult<()> {
if let Some(src) = src {
// hmm, so restore it
os::recursive_copy(src, "data")?;
}
Ok(())
}

@ -78,3 +78,67 @@ mod unix {
let _ = ResourceLimit::get().unwrap(); let _ = ResourceLimit::get().unwrap();
} }
} }
use crate::IoResult;
use std::fs;
use std::path::Path;
/// Recursively copy files from the given `src` to the provided `dest`
pub fn recursive_copy(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> IoResult<()> {
fs::create_dir_all(&dst)?;
for entry in fs::read_dir(src)? {
let entry = entry?;
match entry.file_type()? {
ft if ft.is_dir() => {
// this is a directory, so we'll recursively create it and its contents
recursive_copy(entry.path(), dst.as_ref().join(entry.file_name()))?;
}
_ => {
// this directory has files (or symlinks?)
fs::copy(entry.path(), dst.as_ref().join(entry.file_name()))?;
}
}
}
Ok(())
}
#[test]
fn rcopy_okay() {
let dir_paths = [
"testdata/backups",
"testdata/ks/default",
"testdata/ks/system",
"testdata/rsnaps",
"testdata/snaps",
];
let file_paths = [
"testdata/ks/default/default",
"testdata/ks/default/PARTMAP",
"testdata/ks/PRELOAD",
"testdata/ks/system/PARTMAP",
];
let new_file_paths = [
"my-backups/ks/default/default",
"my-backups/ks/default/PARTMAP",
"my-backups/ks/PRELOAD",
"my-backups/ks/system/PARTMAP",
];
let x = move || -> IoResult<()> {
for dir in dir_paths {
fs::create_dir_all(dir)?;
}
for file in file_paths {
fs::File::create(file)?;
}
Ok(())
};
x().unwrap();
// now copy all files inside testdata/* to my-backups/*
recursive_copy("testdata", "my-backups").unwrap();
new_file_paths
.iter()
.for_each(|path| assert!(Path::new(path).exists()));
// now remove the directories
fs::remove_dir_all("testdata").unwrap();
fs::remove_dir_all("my-backups").unwrap();
}

Loading…
Cancel
Save