Enable maximum connections to be configured manually (#182)

* Enable maximum connections to be configured

* Add arbiter for handling server startup

* Add handling of maxcon for command-line args

* Add changelog entry
next
Sayan 3 years ago committed by GitHub
parent ea1871747d
commit 864c6d461f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -2,6 +2,16 @@
All changes in this project will be noted in this file.
## Unreleased
### Additions
- The maximum number of clients can now be manually configured [see [#182](https://github.com/skytable/skytable/pull/182)]
### Fixes
- Ability to connect to the server over TLS has been restored in `skysh` [see [#181](https://github.com/skytable/skytable/pull/181)]
## Version 0.6.2 [2021-06-24]
### Fixes

@ -11,6 +11,7 @@ host = "127.0.0.1" # The IP address to which you want sdb to bind to
port = 2003 # The port to which you want sdb to bind to
# Set `noart` to true if you want to disable terminal artwork
noart = false
maxcon = 50000 # set the maximum number of clients that the server can accept
# This key is *OPTIONAL*
[bgsave]

@ -0,0 +1,131 @@
/*
* Created on Sat Jun 26 2021
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use crate::config::BGSave;
use crate::config::SnapshotConfig;
use crate::coredb::CoreDB;
use crate::dbnet::{self, Terminator};
use crate::diskstore::snapshot::DIR_REMOTE_SNAPSHOT;
use crate::services;
use crate::PortConfig;
use std::fs;
use tokio::sync::broadcast;
#[cfg(unix)]
use core::{future::Future, pin::Pin, task::Context, task::Poll};
#[cfg(unix)]
use tokio::signal::unix::{signal as fnsignal, Signal, SignalKind};
#[cfg(unix)]
/// Object to bind to unix-specific signals
pub struct UnixTerminationSignal {
sigterm: Signal,
}
#[cfg(unix)]
impl UnixTerminationSignal {
pub fn init() -> Result<Self, String> {
let sigterm = fnsignal(SignalKind::terminate())
.map_err(|e| format!("Failed to bind to signal with: {}", e))?;
Ok(Self { sigterm })
}
}
#[cfg(unix)]
impl Future for UnixTerminationSignal {
type Output = Option<()>;
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
self.sigterm.poll_recv(ctx)
}
}
/// Start the server waiting for incoming connections or a termsig
pub async fn run(
ports: PortConfig,
bgsave_cfg: BGSave,
snapshot_cfg: SnapshotConfig,
restore_filepath: Option<String>,
maxcon: usize,
) -> Result<CoreDB, String> {
// Intialize the broadcast channel
let (signal, _) = broadcast::channel(1);
// create the data directories and intialize the coredb
fs::create_dir_all(&*DIR_REMOTE_SNAPSHOT)
.map_err(|e| format!("Failed to create data directories: '{}'", e))?;
let db = CoreDB::new(&snapshot_cfg, restore_filepath)
.map_err(|e| format!("Error while initializing database: {}", e))?;
// initialize the background services
let bgsave_handle = tokio::spawn(services::bgsave::bgsave_scheduler(
db.clone(),
bgsave_cfg,
Terminator::new(signal.subscribe()),
));
let snapshot_handle = tokio::spawn(services::snapshot::snapshot_service(
db.clone(),
snapshot_cfg,
Terminator::new(signal.subscribe()),
));
// bind the ctrlc handler
let sig = tokio::signal::ctrl_c();
// start the server (single or multiple listeners)
let mut server = dbnet::connect(ports, maxcon, db.clone(), signal.clone()).await?;
#[cfg(not(unix))]
{
// Non-unix, usually Windows specific signal handling.
// FIXME(@ohsayan): For now, let's just
// bother with ctrl+c, we'll move ahead as users require them
tokio::select! {
_ = server.run_server() => {}
_ = sig => {}
}
}
#[cfg(unix)]
{
let sigterm = UnixTerminationSignal::init()?;
// apart from CTRLC, the only other thing we care about is SIGTERM
// FIXME(@ohsayan): Maybe we should respond to SIGHUP too?
tokio::select! {
_ = server.run_server() => {},
_ = sig => {},
_ = sigterm => {}
}
}
log::info!("Signalling all workers to shut down");
// drop the signal and let others exit
drop(signal);
server.finish_with_termsig().await;
// wait for the background services to terminate
let _ = snapshot_handle.await;
let _ = bgsave_handle.await;
Ok(db)
}

@ -84,6 +84,12 @@ args:
long: stop-write-on-fail
takes_value: true
help: Stop accepting writes if any persistence method except BGSAVE fails (defaults to true)
- maxcon:
required: false
long: maxcon
takes_value: true
help: Set the maximum number of connections
value_name: maxcon
subcommands:
- upgrade:
about: Upgrades old datsets to the latest format supported by this server edition

@ -27,6 +27,7 @@
//! This module provides tools to handle configuration files and settings
use crate::compat;
use crate::dbnet::MAXIMUM_CONNECTION_LIMIT;
use core::hint::unreachable_unchecked;
#[cfg(test)]
use libsky::TResult;
@ -114,6 +115,8 @@ pub struct ConfigKeyServer {
/// The noart key is an `Option`al boolean value which is set to true
/// for secure environments to disable terminal artwork
noart: Option<bool>,
/// The maximum number of clients
maxclient: Option<usize>,
}
/// The snapshot section in the TOML file
@ -260,6 +263,8 @@ pub struct ParsedConfig {
pub snapshot: SnapshotConfig,
/// Port configuration
pub ports: PortConfig,
/// The maximum number of connections
pub maxcon: usize,
}
impl ParsedConfig {
@ -327,6 +332,7 @@ impl ParsedConfig {
port: cfg_info.server.port,
}
},
maxcon: libsky::option_unwrap_or!(cfg_info.server.maxclient, MAXIMUM_CONNECTION_LIMIT),
}
}
#[cfg(test)]
@ -340,12 +346,14 @@ impl ParsedConfig {
bgsave: BGSave,
snapshot: SnapshotConfig,
ports: PortConfig,
maxcon: usize,
) -> Self {
ParsedConfig {
noart,
bgsave,
snapshot,
ports,
maxcon,
}
}
/// Create a default `ParsedConfig` with the following setup defaults:
@ -361,6 +369,7 @@ impl ParsedConfig {
bgsave: BGSave::default(),
snapshot: SnapshotConfig::default(),
ports: PortConfig::new_insecure_only(DEFAULT_IPV4, 2003),
maxcon: MAXIMUM_CONNECTION_LIMIT,
}
}
/// Returns `false` if `noart` is enabled. Otherwise it returns `true`
@ -442,6 +451,7 @@ pub fn get_config_file_or_return_cfg() -> Result<ConfigType<ParsedConfig, String
let saveduration = matches.value_of("saveduration");
let sslkey = matches.value_of("sslkey");
let sslchain = matches.value_of("sslchain");
let maxcon = matches.value_of("maxcon");
let cli_has_overrideable_args = host.is_some()
|| port.is_some()
|| noart
@ -451,6 +461,7 @@ pub fn get_config_file_or_return_cfg() -> Result<ConfigType<ParsedConfig, String
|| saveduration.is_some()
|| sslchain.is_some()
|| sslkey.is_some()
|| maxcon.is_some()
|| sslonly;
if filename.is_some() && cli_has_overrideable_args {
return Err(ConfigError::CfgError(
@ -483,6 +494,17 @@ pub fn get_config_file_or_return_cfg() -> Result<ConfigType<ParsedConfig, String
},
None => "127.0.0.1".parse().unwrap(),
};
let maxcon: usize = match maxcon {
Some(limit) => match limit.parse() {
Ok(l) => l,
Err(_) => {
return Err(ConfigError::CliArgErr(
"Invalid value for `--maxcon`. Expected a valid positive integer",
));
}
},
None => MAXIMUM_CONNECTION_LIMIT,
};
let bgsave = if nosave {
if saveduration.is_some() {
// If there is both `nosave` and `saveduration` - the arguments aren't logically correct!
@ -582,7 +604,7 @@ pub fn get_config_file_or_return_cfg() -> Result<ConfigType<ParsedConfig, String
));
}
};
let cfg = ParsedConfig::new(noart, bgsave, snapcfg, portcfg);
let cfg = ParsedConfig::new(noart, bgsave, snapcfg, portcfg, maxcon);
return Ok(ConfigType::Custom(cfg, restorefile));
}
if let Some(filename) = filename {
@ -686,7 +708,8 @@ mod tests {
noart: true,
bgsave: BGSave::default(),
snapshot: SnapshotConfig::default(),
ports: PortConfig::default()
ports: PortConfig::default(),
maxcon: MAXIMUM_CONNECTION_LIMIT
}
);
}
@ -704,7 +727,8 @@ mod tests {
ports: PortConfig::new_insecure_only(
IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0x1)),
DEFAULT_PORT
)
),
maxcon: MAXIMUM_CONNECTION_LIMIT
}
);
}
@ -726,7 +750,8 @@ mod tests {
"/path/to/chain.pem".into(),
2004
)
)
),
MAXIMUM_CONNECTION_LIMIT
)
);
}
@ -748,7 +773,8 @@ mod tests {
noart: false,
bgsave: BGSave::new(true, 600),
snapshot: SnapshotConfig::default(),
ports: PortConfig::default()
ports: PortConfig::default(),
maxcon: MAXIMUM_CONNECTION_LIMIT
}
);
}
@ -767,7 +793,8 @@ mod tests {
noart: false,
bgsave: BGSave::default(),
snapshot: SnapshotConfig::default(),
ports: PortConfig::default()
ports: PortConfig::default(),
maxcon: MAXIMUM_CONNECTION_LIMIT
}
)
}
@ -786,7 +813,8 @@ mod tests {
noart: false,
bgsave: BGSave::new(true, 600),
snapshot: SnapshotConfig::default(),
ports: PortConfig::default()
ports: PortConfig::default(),
maxcon: MAXIMUM_CONNECTION_LIMIT
}
)
}
@ -801,7 +829,8 @@ mod tests {
snapshot: SnapshotConfig::Enabled(SnapshotPref::new(3600, 4, true)),
bgsave: BGSave::default(),
noart: false,
ports: PortConfig::default()
ports: PortConfig::default(),
maxcon: MAXIMUM_CONNECTION_LIMIT
}
);
}

@ -39,19 +39,11 @@
//! 5. Now errors are handled if they occur. Otherwise, the query is executed by `CoreDB::execute_query()`
//!
use crate::config::BGSave;
use self::tcp::Listener;
use crate::config::PortConfig;
use crate::config::SnapshotConfig;
use crate::config::SslOpts;
use crate::dbnet::tcp::Listener;
use crate::diskstore;
use crate::services;
use diskstore::snapshot::DIR_REMOTE_SNAPSHOT;
mod tcp;
use crate::coredb::CoreDB;
use libsky::TResult;
use std::fs;
use std::future::Future;
use std::io::Error as IoError;
use std::net::IpAddr;
use std::sync::Arc;
@ -60,6 +52,7 @@ use tokio::net::TcpListener;
use tokio::sync::Semaphore;
use tokio::sync::{broadcast, mpsc};
pub mod connection;
mod tcp;
mod tls;
pub const MAXIMUM_CONNECTION_LIMIT: usize = 50000;
@ -165,7 +158,7 @@ macro_rules! bindaddr {
/// - The `Multi` variant holds both an `SslListener` and a `Listener`
/// This variant enables listening to both secure and insecure sockets at the same time
/// asynchronously
enum MultiListener {
pub enum MultiListener {
SecureOnly(SslListener),
InsecureOnly(Listener),
Multi(Listener, SslListener),
@ -246,59 +239,15 @@ impl MultiListener {
}
}
#[cfg(unix)]
use core::{pin::Pin, task::Context, task::Poll};
#[cfg(unix)]
use tokio::signal::unix::{signal as fnsignal, Signal, SignalKind};
#[cfg(unix)]
/// Object to bind to unix-specific signals
pub struct UnixTerminationSignal {
sigterm: Signal,
}
#[cfg(unix)]
impl UnixTerminationSignal {
pub fn init() -> Result<Self, String> {
let sigterm = fnsignal(SignalKind::terminate())
.map_err(|e| format!("Failed to bind to signal with: {}", e))?;
Ok(Self { sigterm })
}
}
#[cfg(unix)]
impl Future for UnixTerminationSignal {
type Output = Option<()>;
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
self.sigterm.poll_recv(ctx)
}
}
/// Start the server waiting for incoming connections or a CTRL+C signal
pub async fn run(
/// Initialize the database networking
pub async fn connect(
ports: PortConfig,
bgsave_cfg: BGSave,
snapshot_cfg: SnapshotConfig,
sig: impl Future,
restore_filepath: Option<String>,
) -> Result<CoreDB, String> {
let (signal, _) = broadcast::channel(1);
fs::create_dir_all(&*DIR_REMOTE_SNAPSHOT)
.map_err(|e| format!("Failed to create data directories: '{}'", e))?;
let db = CoreDB::new(&snapshot_cfg, restore_filepath)
.map_err(|e| format!("Error while initializing database: {}", e))?;
let bgsave_handle = tokio::spawn(services::bgsave::bgsave_scheduler(
db.clone(),
bgsave_cfg,
Terminator::new(signal.subscribe()),
));
let snapshot_handle = tokio::spawn(services::snapshot::snapshot_service(
db.clone(),
snapshot_cfg,
Terminator::new(signal.subscribe()),
));
let climit = Arc::new(Semaphore::const_new(MAXIMUM_CONNECTION_LIMIT));
let mut server = match ports {
maxcon: usize,
db: CoreDB,
signal: broadcast::Sender<()>,
) -> Result<MultiListener, String> {
let climit = Arc::new(Semaphore::const_new(maxcon));
let server = match ports {
PortConfig::InsecureOnly { host, port } => MultiListener::new_insecure_only(
BaseListener::init(&db, host, port, climit.clone(), signal.clone())
.await
@ -322,32 +271,5 @@ pub async fn run(
MultiListener::new_multi(secure_listener, insecure_listener, ssl).await?
}
};
#[cfg(not(unix))]
{
// Non-unix, usually Windows specific signal handling.
// FIXME(@ohsayan): For now, let's just
// bother with ctrl+c, we'll move ahead as users require them
tokio::select! {
_ = server.run_server() => {}
_ = sig => {}
}
}
#[cfg(unix)]
{
let sigterm = UnixTerminationSignal::init()?;
// apart from CTRLC, the only other thing we care about is SIGTERM
// FIXME(@ohsayan): Maybe we should respond to SIGHUP too?
tokio::select! {
_ = server.run_server() => {},
_ = sig => {},
_ = sigterm => {}
}
}
log::info!("Signalling all workers to shut down");
// drop the signal and let others exit
drop(signal);
server.finish_with_termsig().await;
let _ = snapshot_handle.await;
let _ = bgsave_handle.await;
Ok(db)
Ok(server)
}

@ -45,9 +45,9 @@ use std::process;
use std::sync::Arc;
use std::thread;
use std::time;
use tokio::signal;
mod actions;
mod admin;
mod arbiter;
mod compat;
mod config;
mod coredb;
@ -85,21 +85,21 @@ fn main() {
.enable_all()
.build()
.unwrap();
let (ports, bgsave_config, snapshot_config, restore_filepath) = check_args_and_get_cfg();
let (ports, bgsave_config, snapshot_config, restore_filepath, maxcon) =
check_args_and_get_cfg();
// check if any other process is using the data directory and lock it if not (else error)
// important: create the pid_file just here and nowhere else because check_args can also
// involve passing --help or wrong arguments which can falsely create a PID file
let pid_file = run_pre_startup_tasks();
let db: Result<coredb::CoreDB, String> = runtime.block_on(async move {
let db = dbnet::run(
arbiter::run(
ports,
bgsave_config,
snapshot_config,
signal::ctrl_c(),
restore_filepath,
maxcon,
)
.await;
db
.await
});
// Make sure all background workers terminate
drop(runtime);
@ -150,7 +150,7 @@ use self::config::{BGSave, PortConfig, SnapshotConfig};
/// This function checks the command line arguments and either returns a config object
/// or prints an error to `stderr` and terminates the server
fn check_args_and_get_cfg() -> (PortConfig, BGSave, SnapshotConfig, Option<String>) {
fn check_args_and_get_cfg() -> (PortConfig, BGSave, SnapshotConfig, Option<String>, usize) {
let cfg = config::get_config_file_or_return_cfg();
let binding_and_cfg = match cfg {
Ok(config::ConfigType::Custom(cfg, file)) => {
@ -160,12 +160,12 @@ fn check_args_and_get_cfg() -> (PortConfig, BGSave, SnapshotConfig, Option<Strin
println!("Skytable v{} | {}", VERSION, URL);
}
log::info!("Using settings from supplied configuration");
(cfg.ports, cfg.bgsave, cfg.snapshot, file)
(cfg.ports, cfg.bgsave, cfg.snapshot, file, cfg.maxcon)
}
Ok(config::ConfigType::Def(cfg, file)) => {
println!("Skytable v{} | {}\n{}", VERSION, URL, TEXT);
log::warn!("No configuration file supplied. Using default settings");
(cfg.ports, cfg.bgsave, cfg.snapshot, file)
(cfg.ports, cfg.bgsave, cfg.snapshot, file, cfg.maxcon)
}
Err(e) => {
log::error!("{}", e);

Loading…
Cancel
Save