Fix bug causing bgsave/termsig flush failure if a snapshot flushed it

I came across an interesting scenario that led to the discovery of this bug:
if a keyspace is created before a snapshot cycle and before termsig/bgsave,
and then the snapshot cycle runs, then the subsequent runs of bgsave/termsig
would fail. The reason behind this interesting bug turns out to be rather
trivial.

Since the `flush_full` routine, regardless of the storage target, would untrip
the PRELOAD switch, the snapshot target untrips the switch as expected. Now,
this means that a tree re-init is not run by BGSAVE or save on termsig.
As a consequence, while attempting to run bgsave/save on termsig, the new
keyspace directories are not found, resulting in flush failure. This commit
fixes it by simply checking whether the target should untrip the switch or not.

Tests for the same were added
next
Sayan Nandan 3 years ago
parent c4eaf3d182
commit 08cffa1d3f
No known key found for this signature in database
GPG Key ID: 8BC07A0A4D41DD52

@ -31,6 +31,8 @@ All changes in this project will be noted in this file.
- (skyd) Fixed new instance detection (now checks if data directory is empty or not)
- (skyd) Fixed panic resulting from corrupted metadata in `PARTMAP`
- (skyd) Fixed invalid pipeline response when action error is propagated from a single stage
- (skyd) Fixed bug where the preload wasn't flushed if a snapshot already flushed it before the
save on termination routine
## Version 0.7.4

@ -0,0 +1,14 @@
# Harness configuration for the snapshot-enabled test server.
# NOTE(review): port 2007 matches `("server3", [2007, 2008])` in the harness
# `SERVERS` table and the `#[dbtest(port = 2007)]` snapshot tests, so this
# appears to be the server3 config — confirm against the harness layout.
[server]
host = "127.0.0.1"
port = 2007
# Suppress the startup banner/art (presumably to keep test logs clean — confirm)
noart = true
# Enabling this section turns on local snapshots, which the port-2007 tests
# expect `mksnap` to succeed against.
[snapshot]
# Snapshot interval — presumably seconds between cycles; verify in server docs
every = 3600
# Maximum number of snapshots retained — TODO confirm semantics
atmost = 4
failsafe = true
[ssl]
# Paths are relative to the server's working directory (the harness creates one
# directory per server id, so `../` points at the shared test root) — confirm
key = "../key.pem"
chain = "../cert.pem"
port = 2008

@ -46,6 +46,15 @@ mod svc;
/// Run the test suite
pub fn run_test() -> HarnessResult<()> {
info!("Creating test directories");
for (server_id, _ports) in svc::SERVERS {
fs::create_dir_all(server_id).map_err(|e| {
HarnessError::Other(format!(
"Failed to create `{server_id}` dir with error: {e}"
))
})?;
}
let ret = run_test_inner();
let kill_check = svc::kill_servers();
util::sleep_sec(SLEEP_FOR_TERMINATION);
@ -55,12 +64,13 @@ pub fn run_test() -> HarnessResult<()> {
// clean up
info!("Cleaning up test directories ...");
fs::remove_dir_all("server1").map_err(|e| {
HarnessError::Other(format!("Failed to remove dir `server1` with error: {e}"))
})?;
fs::remove_dir_all("server2").map_err(|e| {
HarnessError::Other(format!("Failed to remove dir `server1` with error: {e}"))
})?;
for (server_id, _ports) in svc::SERVERS {
fs::remove_dir_all(server_id).map_err(|e| {
HarnessError::Other(format!(
"Failed to remove dir `{server_id}` with error: {e}"
))
})?;
}
ret
}
@ -84,12 +94,6 @@ fn run_test_inner() -> HarnessResult<()> {
pkeyfile
.write_all(&pkey.private_key_to_pem_pkcs8().unwrap())
.unwrap();
fs::create_dir_all("server1").map_err(|e| {
HarnessError::Other(format!("Failed to create `server1` dir with error: {e}"))
})?;
fs::create_dir_all("server2").map_err(|e| {
HarnessError::Other(format!("Failed to create `server2` dir with error: {e}"))
})?;
// assemble commands
let target_folder = util::get_target_folder(BuildMode::Debug);

@ -42,12 +42,13 @@ const POWERSHELL_SCRIPT: &str = include_str!("../../../ci/windows/stop.ps1");
#[cfg(windows)]
/// Flag for new console Window
const CREATE_NEW_CONSOLE: u32 = 0x00000010;
pub(super) const SERVERS: [(&str, [u16; 2]); 3] = [
("server1", [2003, 2004]),
("server2", [2005, 2006]),
("server3", [2007, 2008]),
];
/// The test suite server host
const TESTSUITE_SERVER_HOST: &str = "127.0.0.1";
/// The test suite server ports
const TESTSUITE_SERVER_PORTS: [u16; 4] = [2003, 2004, 2005, 2006];
/// The server IDs matching with the configuration files
const SERVER_IDS: [&str; 2] = ["server1", "server2"];
/// The workspace root
const WORKSPACE_ROOT: &str = env!("ROOT_DIR");
@ -89,26 +90,28 @@ fn connection_refused<T>(input: Result<T, IoError>) -> HarnessResult<bool> {
/// Waits for the servers to start up or errors if something unexpected happened
fn wait_for_startup() -> HarnessResult<()> {
info!("Waiting for servers to start up");
for port in TESTSUITE_SERVER_PORTS {
let connection_string = format!("{TESTSUITE_SERVER_HOST}:{port}");
let mut backoff = 1;
let mut con = Connection::new(TESTSUITE_SERVER_HOST, port);
while connection_refused(con)? {
if backoff > 64 {
// enough sleeping, return an error
error!("Server didn't respond in {backoff} seconds. Something is wrong");
return Err(HarnessError::Other(format!(
"Startup backoff elapsed. Server at {connection_string} did not respond."
)));
}
info!(
for (_, ports) in SERVERS {
for port in ports {
let connection_string = format!("{TESTSUITE_SERVER_HOST}:{port}");
let mut backoff = 1;
let mut con = Connection::new(TESTSUITE_SERVER_HOST, port);
while connection_refused(con)? {
if backoff > 64 {
// enough sleeping, return an error
error!("Server didn't respond in {backoff} seconds. Something is wrong");
return Err(HarnessError::Other(format!(
"Startup backoff elapsed. Server at {connection_string} did not respond."
)));
}
info!(
"Server at {connection_string} not started. Sleeping for {backoff} second(s) ..."
);
util::sleep_sec(backoff);
con = Connection::new(TESTSUITE_SERVER_HOST, port);
backoff *= 2;
util::sleep_sec(backoff);
con = Connection::new(TESTSUITE_SERVER_HOST, port);
backoff *= 2;
}
info!("Server at {connection_string} has started");
}
info!("Server at {connection_string} has started");
}
info!("All servers started up");
Ok(())
@ -117,26 +120,28 @@ fn wait_for_startup() -> HarnessResult<()> {
/// Wait for the servers to shutdown, returning an error if something unexpected happens
fn wait_for_shutdown() -> HarnessResult<()> {
info!("Waiting for servers to shut down");
for port in TESTSUITE_SERVER_PORTS {
let connection_string = format!("{TESTSUITE_SERVER_HOST}:{port}");
let mut backoff = 1;
let mut con = Connection::new(TESTSUITE_SERVER_HOST, port);
while !connection_refused(con)? {
if backoff > 64 {
// enough sleeping, return an error
error!("Server didn't shut down within {backoff} seconds. Something is wrong");
return Err(HarnessError::Other(format!(
for (_, ports) in SERVERS {
for port in ports {
let connection_string = format!("{TESTSUITE_SERVER_HOST}:{port}");
let mut backoff = 1;
let mut con = Connection::new(TESTSUITE_SERVER_HOST, port);
while !connection_refused(con)? {
if backoff > 64 {
// enough sleeping, return an error
error!("Server didn't shut down within {backoff} seconds. Something is wrong");
return Err(HarnessError::Other(format!(
"Shutdown backoff elapsed. Server at {connection_string} did not shut down."
)));
}
info!(
}
info!(
"Server at {connection_string} still active. Sleeping for {backoff} second(s) ..."
);
util::sleep_sec(backoff);
con = Connection::new(TESTSUITE_SERVER_HOST, port);
backoff *= 2;
util::sleep_sec(backoff);
con = Connection::new(TESTSUITE_SERVER_HOST, port);
backoff *= 2;
}
info!("Server at {connection_string} has stopped accepting connections");
}
info!("Server at {connection_string} has stopped accepting connections");
}
info!("All servers have stopped accepting connections. Allowing {SLEEP_FOR_TERMINATION} seconds for them to exit");
util::sleep_sec(SLEEP_FOR_TERMINATION);
@ -146,8 +151,8 @@ fn wait_for_shutdown() -> HarnessResult<()> {
/// Start the servers returning handles to the child processes
fn start_servers(target_folder: impl AsRef<Path>) -> HarnessResult<Vec<Child>> {
let mut ret = Vec::with_capacity(SERVER_IDS.len());
for server_id in SERVER_IDS {
let mut ret = Vec::with_capacity(SERVERS.len());
for (server_id, _ports) in SERVERS {
let cmd = get_run_server_cmd(server_id, target_folder.as_ref());
info!("Starting {server_id} ...");
ret.push(util::get_child(format!("start {server_id}"), cmd)?);
@ -169,7 +174,7 @@ pub(super) fn run_with_servers(
wait_for_shutdown()?;
}
// just use this to avoid ignoring the children vector
assert_eq!(children.len(), SERVER_IDS.len());
assert_eq!(children.len(), SERVERS.len());
Ok(())
}

@ -96,12 +96,13 @@ pub async fn run(
Terminator::new(signal.subscribe()),
));
// bind to signals
let termsig =
TerminationSignal::init().map_err(|e| Error::ioerror_extra(e, "binding to signals"))?;
// start the server (single or multiple listeners)
let mut server =
dbnet::connect(ports, maxcon, db.clone(), auth_provider, signal.clone()).await?;
let termsig =
TerminationSignal::init().map_err(|e| Error::ioerror_extra(e, "binding to signals"))?;
tokio::select! {
_ = server.run_server() => {},
_ = termsig => {}

@ -47,6 +47,10 @@ pub trait StorageTarget {
/// This storage target needs a reinit of the tree despite no preload trip.
/// Exempli gratia: rsnap, snap
const NEEDS_TREE_INIT: bool;
/// This storage target should untrip the trip switch
///
/// Example cases where this doesn't apply: snapshots
const SHOULD_UNTRIP_PRELOAD_TRIPSWITCH: bool;
/// The root for this storage target. **Must not be separator terminated!**
fn root(&self) -> String;
/// Returns the path to the `PRELOAD_` **temporary file** ($ROOT/PRELOAD)
@ -86,6 +90,7 @@ pub struct Autoflush;
impl StorageTarget for Autoflush {
const NEEDS_TREE_INIT: bool = false;
const SHOULD_UNTRIP_PRELOAD_TRIPSWITCH: bool = true;
fn root(&self) -> String {
String::from(interface::DIR_KSROOT)
}
@ -104,6 +109,7 @@ impl<'a> RemoteSnapshot<'a> {
impl<'a> StorageTarget for RemoteSnapshot<'a> {
const NEEDS_TREE_INIT: bool = true;
const SHOULD_UNTRIP_PRELOAD_TRIPSWITCH: bool = false;
fn root(&self) -> String {
let mut p = String::from(interface::DIR_RSNAPROOT);
p.push('/');
@ -125,6 +131,7 @@ impl LocalSnapshot {
impl StorageTarget for LocalSnapshot {
const NEEDS_TREE_INIT: bool = true;
const SHOULD_UNTRIP_PRELOAD_TRIPSWITCH: bool = false;
fn root(&self) -> String {
let mut p = String::from(interface::DIR_SNAPROOT);
p.push('/');
@ -215,8 +222,12 @@ pub fn flush_full<T: StorageTarget>(target: T, store: &Memstore) -> IoResult<()>
// IMPORTANT: Just untrip and get the status at this exact point in time
// don't spread it over two atomic accesses because another thread may have updated
// it in-between. Even if it was untripped, we'll get the expected outcome here: false
let has_tripped = registry::get_preload_tripswitch().check_and_untrip();
if has_tripped || T::NEEDS_TREE_INIT {
let mut should_create_tree = T::NEEDS_TREE_INIT;
if T::SHOULD_UNTRIP_PRELOAD_TRIPSWITCH {
// this target shouldn't untrip the tripswitch
should_create_tree |= registry::get_preload_tripswitch().check_and_untrip();
}
if should_create_tree {
// re-init the tree as new tables/keyspaces may have been added
super::interface::create_tree(&target, store)?;
self::oneshot::flush_preload(&target, store)?;

@ -37,6 +37,7 @@ mod kvengine_encoding;
mod kvengine_list;
mod persist;
mod pipeline;
mod snapshot;
mod tls {
use skytable::{query, Element};

@ -0,0 +1,54 @@
/*
* Created on Tue Mar 29 2022
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use sky_macros::dbtest_func as dbtest;
use skytable::{query, Element, RespCode};
const SNAPSHOT_DISABLED: &str = "err-snapshot-disabled";
/// On the default test server (no `[snapshot]` section configured), a plain
/// `mksnap` must be rejected with the `err-snapshot-disabled` error string.
#[dbtest]
async fn test_snapshot_local_disabled() {
    runeq!(
        con,
        query!("mksnap"),
        Element::RespCode(RespCode::ErrorString(SNAPSHOT_DISABLED.to_owned()))
    )
}
/// A remote snapshot (`mksnap <name>`) must succeed even when local snapshots
/// are disabled. Skipped in the persist suite (presumably because the extra
/// snapshot directory would interfere with persistence checks — confirm).
#[dbtest(skip_if_cfg = "persist-suite")]
async fn test_snapshot_remote_okay() {
    assert_okay!(con, query!("mksnap", "myremote"))
}
/// Against the snapshot-enabled server (port 2007, see the new harness config
/// with a `[snapshot]` section), a plain `mksnap` must succeed.
#[dbtest(port = 2007)]
async fn test_snapshot_local_okay() {
    assert_okay!(con, query!("mksnap"))
}
/// Remote snapshots must also work on the server that has local snapshots
/// enabled (port 2007). Skipped in the persist suite, matching
/// `test_snapshot_remote_okay`.
#[dbtest(port = 2007, skip_if_cfg = "persist-suite")]
async fn test_snapshot_remote_okay_with_local_enabled() {
    assert_okay!(con, query!("mksnap", "myremote"))
}
Loading…
Cancel
Save