|
|
|
/*
|
|
|
|
* Created on Sat Jun 26 2021
|
|
|
|
*
|
|
|
|
* This file is a part of Skytable
|
|
|
|
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
|
|
|
|
* NoSQL database written by Sayan Nandan ("the Author") with the
|
|
|
|
* vision to provide flexibility in data modelling without compromising
|
|
|
|
* on performance, queryability or scalability.
|
|
|
|
*
|
|
|
|
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
|
|
|
|
*
|
|
|
|
* This program is free software: you can redistribute it and/or modify
|
|
|
|
* it under the terms of the GNU Affero General Public License as published by
|
|
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
|
|
* (at your option) any later version.
|
|
|
|
*
|
|
|
|
* This program is distributed in the hope that it will be useful,
|
|
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
* GNU Affero General Public License for more details.
|
|
|
|
*
|
|
|
|
* You should have received a copy of the GNU Affero General Public License
|
|
|
|
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
*
|
|
|
|
*/
|
|
|
|
|
|
|
|
use crate::config::ConfigurationSet;
|
|
|
|
use crate::config::SnapshotConfig;
|
|
|
|
use crate::config::SnapshotPref;
|
|
|
|
use crate::corestore::Corestore;
|
|
|
|
use crate::dbnet::{self, Terminator};
|
|
|
|
use crate::diskstore::flock::FileLock;
|
|
|
|
use crate::services;
|
|
|
|
use crate::storage::v1::sengine::SnapshotEngine;
|
|
|
|
use crate::util::os::TerminationSignal;
|
|
|
|
use libsky::util::terminal;
|
|
|
|
use std::sync::Arc;
|
|
|
|
use std::thread::sleep;
|
|
|
|
use tokio::{
|
|
|
|
sync::{
|
|
|
|
broadcast,
|
|
|
|
mpsc::{self, Sender},
|
|
|
|
},
|
|
|
|
task::{self, JoinHandle},
|
|
|
|
time::Duration,
|
|
|
|
};
|
|
|
|
|
|
|
|
/// Number of termination signals tolerated while the final pre-shutdown
/// save is still failing, before the server gives up and exits with an error
const TERMSIG_THRESHOLD: usize = 3;
|
|
|
|
|
|
|
|
/// Start the server waiting for incoming connections or a termsig
|
|
|
|
pub async fn run(
|
|
|
|
ConfigurationSet {
|
|
|
|
ports,
|
|
|
|
bgsave,
|
|
|
|
snapshot,
|
|
|
|
maxcon,
|
|
|
|
..
|
|
|
|
}: ConfigurationSet,
|
|
|
|
restore_filepath: Option<String>,
|
|
|
|
) -> Result<Corestore, String> {
|
|
|
|
// Intialize the broadcast channel
|
|
|
|
let (signal, _) = broadcast::channel(1);
|
|
|
|
let engine;
|
|
|
|
match &snapshot {
|
|
|
|
SnapshotConfig::Enabled(SnapshotPref { atmost, .. }) => {
|
|
|
|
engine = SnapshotEngine::new(*atmost);
|
|
|
|
engine
|
|
|
|
.parse_dir()
|
|
|
|
.map_err(|e| format!("Failed to init snapshot engine: {}", e))?;
|
|
|
|
}
|
|
|
|
SnapshotConfig::Disabled => {
|
|
|
|
engine = SnapshotEngine::new_disabled();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
let engine = Arc::new(engine);
|
|
|
|
// restore data
|
|
|
|
services::restore_data(restore_filepath)
|
|
|
|
.map_err(|e| format!("Failed to restore data from backup with error: {}", e))?;
|
|
|
|
let db = Corestore::init_with_snapcfg(engine.clone())
|
|
|
|
.map_err(|e| format!("Error while initializing database: {}", e))?;
|
|
|
|
|
|
|
|
// initialize the background services
|
|
|
|
let bgsave_handle = tokio::spawn(services::bgsave::bgsave_scheduler(
|
|
|
|
db.clone(),
|
|
|
|
bgsave,
|
|
|
|
Terminator::new(signal.subscribe()),
|
|
|
|
));
|
|
|
|
let snapshot_handle = tokio::spawn(services::snapshot::snapshot_service(
|
|
|
|
engine,
|
|
|
|
db.clone(),
|
|
|
|
snapshot,
|
|
|
|
Terminator::new(signal.subscribe()),
|
|
|
|
));
|
|
|
|
|
|
|
|
// start the server (single or multiple listeners)
|
|
|
|
let mut server = dbnet::connect(ports, maxcon, db.clone(), signal.clone()).await?;
|
|
|
|
|
|
|
|
let termsig = TerminationSignal::init().map_err(|e| e.to_string())?;
|
|
|
|
tokio::select! {
|
|
|
|
_ = server.run_server() => {},
|
|
|
|
_ = termsig => {}
|
|
|
|
}
|
|
|
|
|
|
|
|
log::info!("Signalling all workers to shut down");
|
|
|
|
// drop the signal and let others exit
|
|
|
|
drop(signal);
|
|
|
|
server.finish_with_termsig().await;
|
|
|
|
|
|
|
|
// wait for the background services to terminate
|
|
|
|
let _ = snapshot_handle.await;
|
|
|
|
let _ = bgsave_handle.await;
|
|
|
|
Ok(db)
|
|
|
|
}
|
|
|
|
|
|
|
|
fn spawn_task(tx: Sender<bool>, db: Corestore, do_sleep: bool) -> JoinHandle<()> {
|
|
|
|
task::spawn_blocking(move || {
|
|
|
|
if do_sleep {
|
|
|
|
log::info!("Waiting for 10 seconds before retrying ...");
|
|
|
|
sleep(Duration::from_secs(10));
|
|
|
|
}
|
|
|
|
let ret = match crate::services::bgsave::run_bgsave(&db) {
|
|
|
|
Ok(()) => {
|
|
|
|
log::info!("Save before termination successful");
|
|
|
|
true
|
|
|
|
}
|
|
|
|
Err(e) => {
|
|
|
|
log::error!("Failed to run save on termination: {e}");
|
|
|
|
false
|
|
|
|
}
|
|
|
|
};
|
|
|
|
tx.blocking_send(ret).expect("Receiver dropped");
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Flush all data and run pre-shutdown cleanup, then exit (with an error
/// status if anything failed).
///
/// Must be called only after the server runtime has been dropped: the
/// `strong_count` assertion enforces that this is the last live handle to
/// the corestore. A fresh, short-lived runtime is built here to drive the
/// final save because the main runtime no longer exists at this point.
pub fn finalize_shutdown(corestore: Corestore, pid_file: FileLock) {
    assert_eq!(
        corestore.strong_count(),
        1,
        "Correctness error. finalize_shutdown called before dropping server runtime"
    );
    // dedicated runtime just for the final save/cleanup sequence
    let rt = tokio::runtime::Builder::new_multi_thread()
        .thread_name("server-final")
        .enable_all()
        .build()
        .unwrap();
    let dbc = corestore.clone();
    let mut okay: bool = rt.block_on(async move {
        let db = dbc;
        // channel over which the blocking save task reports success/failure
        let (tx, mut rx) = mpsc::channel::<bool>(1);
        // first save attempt: run immediately, no retry sleep
        spawn_task(tx.clone(), db.clone(), false);
        let spawn_again = || {
            // we failed, so we better sleep
            // now spawn it again to see the state
            spawn_task(tx.clone(), db.clone(), true)
        };
        // termination signals we will tolerate before giving up on the flush
        let mut threshold = TERMSIG_THRESHOLD;
        loop {
            // re-arm the signal listener on every iteration
            let termsig = match TerminationSignal::init().map_err(|e| e.to_string()) {
                Ok(sig) => sig,
                Err(e) => {
                    log::error!("Failed to bind to signal with error: {e}");
                    crate::exit_error();
                }
            };
            tokio::select! {
                // the save task finished; retry (with a sleep) on failure
                ret = rx.recv() => {
                    if ret.unwrap() {
                        // that's good to go
                        break true;
                    } else {
                        spawn_again();
                    }
                }
                // another termination signal arrived while we were still
                // trying to flush: count down and bail out at zero
                _ = termsig => {
                    threshold -= 1;
                    if threshold == 0 {
                        log::error!("Termination signal received but failed to flush data. Quitting because threshold exceeded");
                        break false;
                    } else {
                        log::error!("Termination signal received but failed to flush data. Threshold is at {threshold}");
                        continue;
                    }
                },
            }
        }
    });
    // cleanup (pid file lock, store) must also succeed for a clean exit
    okay &= services::pre_shutdown_cleanup(pid_file, Some(corestore.get_store()));
    if okay {
        terminal::write_success("Goodbye :)").unwrap()
    } else {
        log::error!("Didn't terminate successfully");
        crate::exit_error();
    }
}
|