Enable TLS port to be set and impl `MultiListener`

Signed-off-by: Sayan Nandan <nandansayan@outlook.com>
next
Sayan Nandan 4 years ago
parent b2e3d90799
commit 68552b4d71
No known key found for this signature in database
GPG Key ID: C31EFD7DDA12AEE0

@ -16,7 +16,7 @@ TerrabaseDB (or TDB for short) is an effort to provide the best of key/value sto
## Getting started 🚀
1. Download a bundle for your platform from [here ⬇️ ](https://github.com/terrabasedb/terrabase/releases)
1. Download a bundle for your platform from [here ⬇️ ](https://github.com/terrabasedb/terrabasedb/releases)
2. Unzip the bundle
3. Make the files executable (run `chmod +x tdb tsh` on *nix systems)
4. First run `tdb` to start the database server and then run `tsh` to start the interactive shell

@ -6,6 +6,7 @@ noart = false
[ssl]
key = "/path/to/keyfile.pem"
chain = "/path/to/chain.pem"
port = 2004
[bgsave]
enabled = true

@ -34,6 +34,10 @@ use std::path::PathBuf;
use tokio::net::ToSocketAddrs;
use toml;
const DEFAULT_IPV4: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
const DEFAULT_PORT: u16 = 2003;
const DEFAULT_SSL_PORT: u16 = 2004;
/// This struct is an _object representation_ used for parsing the TOML file
#[derive(Deserialize, Debug, PartialEq)]
pub struct Config {
@ -126,38 +130,74 @@ pub struct ConfigKeySnapshot {
atmost: usize,
}
/// The SSL configuration
/// Port configuration
///
/// This enumeration determines whether the SSL listener is:
/// - `Enabled`: This means that the database server will be listening to both
/// This enumeration determines whether the ports are:
/// - `Multi`: This means that the database server will be listening to both
/// SSL **and** non-SSL requests
/// - `EnabledOnly` : This means that the database server will only accept SSL requests
/// - `SecureOnly` : This means that the database server will only accept SSL requests
/// and will not even activate the non-SSL socket
/// - `Disabled` : This indicates that the server would only accept non-SSL connections
/// - `InsecureOnly` : This indicates that the server would only accept non-SSL connections
/// and will not even activate the SSL socket
#[derive(Debug, PartialEq)]
pub enum SslConfig {
Enabled(SslOpts),
EnabledOnly(SslOpts),
Disabled,
pub enum PortConfig {
SecureOnly {
host: IpAddr,
ssl: SslOpts,
},
Multi {
host: IpAddr,
port: u16,
ssl: SslOpts,
},
InsecureOnly {
host: IpAddr,
port: u16,
},
}
impl PortConfig {
pub const fn default() -> PortConfig {
PortConfig::InsecureOnly {
host: DEFAULT_IPV4,
port: DEFAULT_PORT,
}
}
}
impl PortConfig {
pub const fn new_secure_only(host: IpAddr, ssl: SslOpts) -> Self {
PortConfig::SecureOnly { host, ssl }
}
pub const fn new_insecure_only(host: IpAddr, port: u16) -> Self {
PortConfig::InsecureOnly { host, port }
}
pub const fn new_multi(host: IpAddr, port: u16, ssl: SslOpts) -> Self {
PortConfig::Multi { host, port, ssl }
}
}
#[derive(Deserialize, Debug, PartialEq)]
pub struct KeySslOpts {
key: String,
chain: String,
port: u16,
only: Option<bool>,
}
#[derive(Deserialize, Debug, PartialEq)]
pub struct SslOpts {
key: String,
chain: String,
pub key: String,
pub chain: String,
pub port: u16,
}
impl SslOpts {
pub const fn new(key: String, chain: String) -> Self {
SslOpts { key, chain }
pub const fn new(key: String, chain: String, port: u16) -> Self {
SslOpts { key, chain, port }
}
pub fn get_host_port_tuple(&self, host: String) -> impl ToSocketAddrs {
((host), self.port)
}
}
@ -217,18 +257,14 @@ impl SnapshotConfig {
/// configuration
#[derive(Debug, PartialEq)]
pub struct ParsedConfig {
/// A valid IPv4/IPv6 address
host: IpAddr,
/// A valid port
port: u16,
/// If `noart` is set to true, no terminal artwork should be displayed
noart: bool,
/// The BGSAVE configuration
pub bgsave: BGSave,
/// The snapshot configuration
pub snapshot: SnapshotConfig,
/// The SSL configuration
pub ssl: SslConfig,
/// Port configuration
pub ports: PortConfig,
}
impl ParsedConfig {
@ -247,8 +283,6 @@ impl ParsedConfig {
/// TOML file (represented as an object)
fn from_config(cfg_info: Config) -> Self {
ParsedConfig {
host: cfg_info.server.host,
port: cfg_info.server.port,
noart: if let Some(noart) = cfg_info.server.noart {
noart
} else {
@ -270,20 +304,33 @@ impl ParsedConfig {
} else {
SnapshotConfig::default()
},
ssl: if let Some(sslopts) = cfg_info.ssl {
ports: if let Some(sslopts) = cfg_info.ssl {
if sslopts.only.is_some() {
SslConfig::EnabledOnly(SslOpts {
key: sslopts.key,
chain: sslopts.chain,
})
PortConfig::SecureOnly {
ssl: SslOpts {
key: sslopts.key,
chain: sslopts.chain,
port: sslopts.port,
},
host: cfg_info.server.host,
}
} else {
SslConfig::Enabled(SslOpts {
key: sslopts.key,
chain: sslopts.chain,
})
PortConfig::Multi {
ssl: SslOpts {
key: sslopts.key,
chain: sslopts.chain,
port: sslopts.port,
},
host: cfg_info.server.host,
port: cfg_info.server.port,
}
}
} else {
SslConfig::Disabled
PortConfig::InsecureOnly {
host: cfg_info.server.host,
port: cfg_info.server.port,
}
},
}
}
@ -296,42 +343,37 @@ impl ParsedConfig {
/// and a supplied `port`
pub const fn default_with_port(port: u16) -> Self {
ParsedConfig {
host: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
port,
noart: false,
bgsave: BGSave::default(),
snapshot: SnapshotConfig::default(),
ssl: SslConfig::Disabled,
ports: PortConfig::new_insecure_only(DEFAULT_IPV4, port),
}
}
pub const fn default_ports() -> PortConfig {
PortConfig::default()
}
/// Create a new `ParsedConfig` with the default `port` and `noart` settngs
/// and a supplied `host`
pub const fn default_with_host(host: IpAddr) -> Self {
ParsedConfig::new(
host,
2003,
false,
BGSave::default(),
SnapshotConfig::default(),
SslConfig::Disabled,
PortConfig::new_insecure_only(host, 2003),
)
}
/// Create a new `ParsedConfig` with all the fields
pub const fn new(
host: IpAddr,
port: u16,
noart: bool,
bgsave: BGSave,
snapshot: SnapshotConfig,
ssl: SslConfig,
ports: PortConfig,
) -> Self {
ParsedConfig {
host,
port,
noart,
bgsave,
snapshot,
ssl,
ports,
}
}
/// Create a default `ParsedConfig` with the following setup defaults:
@ -343,18 +385,12 @@ impl ParsedConfig {
/// - `ssl` : disabled
pub const fn default() -> Self {
ParsedConfig {
host: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
port: 2003,
noart: false,
bgsave: BGSave::default(),
snapshot: SnapshotConfig::default(),
ssl: SslConfig::Disabled,
ports: PortConfig::new_insecure_only(DEFAULT_IPV4, 2003),
}
}
/// Return a (host, port) tuple which can be bound to with `TcpListener`
pub fn get_host_port_tuple(&self) -> impl ToSocketAddrs {
((self.host), self.port)
}
/// Returns `false` if `noart` is enabled. Otherwise it returns `true`
pub const fn is_artful(&self) -> bool {
!self.noart
@ -574,7 +610,7 @@ pub fn get_config_file_or_return_cfg() -> Result<ConfigType<ParsedConfig, PathBu
}
(None, None) => SnapshotConfig::Disabled,
};
let sslcfg = match (
let portcfg = match (
sslkey.map(|val| val.to_owned()),
sslchain.map(|val| val.to_owned()),
) {
@ -584,14 +620,14 @@ pub fn get_config_file_or_return_cfg() -> Result<ConfigType<ParsedConfig, PathBu
"You mast pass values for both --sslkey and --sslchain to use the --sslonly flag"
));
} else {
SslConfig::Disabled
PortConfig::new_insecure_only(host, port)
}
}
(Some(key), Some(chain)) => {
if sslonly {
SslConfig::EnabledOnly(SslOpts::new(key, chain))
PortConfig::new_secure_only(host, SslOpts::new(key, chain, DEFAULT_SSL_PORT))
} else {
SslConfig::Enabled(SslOpts::new(key, chain))
PortConfig::new_multi(host, port, SslOpts::new(key, chain, DEFAULT_SSL_PORT))
}
}
_ => {
@ -600,7 +636,7 @@ pub fn get_config_file_or_return_cfg() -> Result<ConfigType<ParsedConfig, PathBu
));
}
};
let cfg = ParsedConfig::new(host, port, noart, bgsave, snapcfg, sslcfg);
let cfg = ParsedConfig::new(noart, bgsave, snapcfg, portcfg);
return Ok(ConfigType::Custom(cfg, restorefile));
}
if let Some(filename) = filename {
@ -652,12 +688,10 @@ fn test_config_file_noart() {
assert_eq!(
cfg,
ParsedConfig {
port: 2003,
host: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
noart: true,
bgsave: BGSave::default(),
snapshot: SnapshotConfig::default(),
ssl: SslConfig::Disabled
ports: PortConfig::default()
}
);
}
@ -670,12 +704,13 @@ fn test_config_file_ipv6() {
assert_eq!(
cfg,
ParsedConfig {
port: 2003,
host: IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0x1)),
noart: false,
bgsave: BGSave::default(),
snapshot: SnapshotConfig::default(),
ssl: SslConfig::Disabled
ports: PortConfig::new_insecure_only(
IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0x1)),
DEFAULT_PORT
)
}
);
}
@ -688,12 +723,10 @@ fn test_config_file_template() {
assert_eq!(
cfg,
ParsedConfig::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
2003,
false,
BGSave::default(),
SnapshotConfig::Enabled(SnapshotPref::new(3600, 4)),
SslConfig::Disabled // TODO: Update the template
PortConfig::default() // TODO: Update the template
)
);
}
@ -714,12 +747,10 @@ fn test_config_file_custom_bgsave() {
assert_eq!(
cfg,
ParsedConfig {
host: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
port: 2003,
noart: false,
bgsave: BGSave::new(true, 600),
snapshot: SnapshotConfig::default(),
ssl: SslConfig::Disabled
ports: PortConfig::default()
}
);
}
@ -735,12 +766,10 @@ fn test_config_file_bgsave_enabled_only() {
assert_eq!(
cfg,
ParsedConfig {
host: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
port: 2003,
noart: false,
bgsave: BGSave::default(),
snapshot: SnapshotConfig::default(),
ssl: SslConfig::Disabled
ports: PortConfig::default()
}
)
}
@ -756,12 +785,10 @@ fn test_config_file_bgsave_every_only() {
assert_eq!(
cfg,
ParsedConfig {
host: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
port: 2003,
noart: false,
bgsave: BGSave::new(true, 600),
snapshot: SnapshotConfig::default(),
ssl: SslConfig::Disabled
ports: PortConfig::default()
}
)
}
@ -775,10 +802,8 @@ fn test_config_file_snapshot() {
ParsedConfig {
snapshot: SnapshotConfig::Enabled(SnapshotPref::new(3600, 4)),
bgsave: BGSave::default(),
host: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
port: 2003,
noart: false,
ssl: SslConfig::Disabled
ports: PortConfig::default()
}
);
}

@ -35,9 +35,12 @@
//!
use crate::config::BGSave;
use crate::config::PortConfig;
use crate::config::SnapshotConfig;
use crate::config::SslOpts;
use crate::diskstore::snapshot::DIR_REMOTE_SNAPSHOT;
use crate::protocol::tls::SslConnection;
use crate::protocol::tls::SslListener;
use crate::protocol::{Connection, QueryResult::*};
use crate::resp::Writable;
use crate::CoreDB;
@ -46,6 +49,7 @@ use libtdb::TResult;
use std::fs;
use std::future::Future;
use std::io::ErrorKind;
use std::net::IpAddr;
use std::path::PathBuf;
use std::process;
use std::sync::Arc;
@ -235,9 +239,171 @@ impl Drop for CHandler {
}
use std::io::{self, prelude::*};
enum MultiListener {
SecureOnly(SslListener),
InsecureOnly(Listener),
Multi(Listener, SslListener),
}
impl MultiListener {
pub async fn new_insecure_only(
host: IpAddr,
port: u16,
climit: Arc<Semaphore>,
db: CoreDB,
signal: broadcast::Sender<()>,
terminate_tx: mpsc::Sender<()>,
terminate_rx: mpsc::Receiver<()>,
) -> Self {
let listener = TcpListener::bind((host, port))
.await
.expect("Failed to bind to port");
MultiListener::InsecureOnly(Listener {
listener,
db,
climit,
signal,
terminate_tx,
terminate_rx,
})
}
pub async fn new_secure_only(
host: IpAddr,
climit: Arc<Semaphore>,
db: CoreDB,
signal: broadcast::Sender<()>,
terminate_tx: mpsc::Sender<()>,
terminate_rx: mpsc::Receiver<()>,
ssl: SslOpts,
) -> Self {
let listener = TcpListener::bind((host, ssl.port))
.await
.expect("Failed to bind to port");
MultiListener::SecureOnly(
SslListener::new_pem_based_ssl_connection(
ssl.key,
ssl.chain,
db,
listener,
climit,
signal,
terminate_tx,
terminate_rx,
)
.expect("Couldn't bind to secure port"),
)
}
pub async fn run_server(&mut self) -> TResult<()> {
match self {
MultiListener::SecureOnly(secure_listener) => secure_listener.run().await,
MultiListener::InsecureOnly(insecure_listener) => insecure_listener.run().await,
MultiListener::Multi(insecure_listener, secure_listener) => {
secure_listener.run().await?;
insecure_listener.run().await
}
}
}
pub fn print_binding(&self) {
match self {
MultiListener::SecureOnly(secure_listener) => {
log::info!(
"Server started on tps://{}",
secure_listener.listener.local_addr().expect("Failed to g")
)
}
MultiListener::InsecureOnly(insecure_listener) => {
log::info!(
"Server started on tp://{}",
insecure_listener
.listener
.local_addr()
.expect("Failed to g")
)
}
MultiListener::Multi(insecure_listener, secure_listener) => {
log::info!(
"Listening to tp://{} and tps://{}",
insecure_listener
.listener
.local_addr()
.expect("Failed to g"),
secure_listener.listener.local_addr().expect("Failed to g")
)
}
}
}
pub async fn finish_with_termsig(self) {
match self {
MultiListener::InsecureOnly(server) => {
let Listener {
mut terminate_rx,
terminate_tx,
signal,
db,
..
} = server;
if let Ok(_) = db.flush_db() {
log::info!("Successfully saved data to disk");
()
} else {
log::error!("Failed to flush data to disk");
loop {
// Keep looping until we successfully write the in-memory table to disk
log::warn!("Press enter to try again...");
io::stdout().flush().unwrap();
io::stdin().read(&mut [0]).unwrap();
if let Ok(_) = db.flush_db() {
log::info!("Successfully saved data to disk");
break;
} else {
continue;
}
}
}
drop(signal);
drop(terminate_tx);
let _ = terminate_rx.recv().await;
}
MultiListener::SecureOnly(server) => {
let SslListener {
mut terminate_rx,
terminate_tx,
signal,
db,
..
} = server;
if let Ok(_) = db.flush_db() {
log::info!("Successfully saved data to disk");
()
} else {
log::error!("Failed to flush data to disk");
loop {
// Keep looping until we successfully write the in-memory table to disk
log::warn!("Press enter to try again...");
io::stdout().flush().unwrap();
io::stdin().read(&mut [0]).unwrap();
if let Ok(_) = db.flush_db() {
log::info!("Successfully saved data to disk");
break;
} else {
continue;
}
}
}
drop(signal);
drop(terminate_tx);
let _ = terminate_rx.recv().await;
}
_ => {
todo!("Multiple listeners haven't been implemented yet!");
}
}
}
}
/// Start the server waiting for incoming connections or a CTRL+C signal
pub async fn run(
listener: TcpListener,
ports: PortConfig,
bgsave_cfg: BGSave,
snapshot_cfg: SnapshotConfig,
sig: impl Future,
@ -262,54 +428,44 @@ pub async fn run(
}
},
}
log::info!(
"Started server on terrapipe://{}",
listener
.local_addr()
.expect("The local address couldn't be fetched. Please file a bug report")
);
let mut server = Listener {
listener,
db,
climit: Arc::new(Semaphore::new(50000)),
signal,
terminate_tx,
terminate_rx,
let climit = Arc::new(Semaphore::new(50000));
let mut server = match ports {
PortConfig::InsecureOnly { host, port } => {
MultiListener::new_insecure_only(
host,
port,
climit.clone(),
db,
signal,
terminate_tx,
terminate_rx,
)
.await
}
PortConfig::SecureOnly { host, ssl } => {
MultiListener::new_secure_only(
host,
climit.clone(),
db,
signal,
terminate_tx,
terminate_rx,
ssl,
)
.await
}
_ => {
todo!("Multiple listeners haven't been implemented yet!")
}
};
server.print_binding();
tokio::select! {
_ = server.run() => {}
_ = server.run_server() => {}
_ = sig => {
log::info!("Signalling all workers to shut down");
}
}
let Listener {
mut terminate_rx,
terminate_tx,
signal,
db,
..
} = server;
if let Ok(_) = db.flush_db() {
log::info!("Successfully saved data to disk");
()
} else {
log::error!("Failed to flush data to disk");
loop {
// Keep looping until we successfully write the in-memory table to disk
log::warn!("Press enter to try again...");
io::stdout().flush().unwrap();
io::stdin().read(&mut [0]).unwrap();
if let Ok(_) = db.flush_db() {
log::info!("Successfully saved data to disk");
break;
} else {
continue;
}
}
}
drop(signal);
drop(terminate_tx);
let _ = terminate_rx.recv().await;
server.finish_with_termsig().await;
terminal::write_info("Goodbye :)\n").unwrap();
}

@ -26,8 +26,8 @@
//! the modules for their respective documentation.
use crate::config::BGSave;
use crate::config::PortConfig;
use crate::config::SnapshotConfig;
use tokio::net::TcpListener;
mod config;
use std::env;
mod admin;
@ -54,7 +54,7 @@ use jemallocator::Jemalloc;
static GLOBAL: Jemalloc = Jemalloc;
/// The version text
static MSG: &'static str = "TerrabaseDB v0.5.0 | https://github.com/terrabasedb/terrabase";
static MSG: &'static str = "TerrabaseDB v0.5.0 | https://github.com/terrabasedb/terrabasedb";
/// The terminal art for `!noart` configurations
static TEXT: &'static str = "
_______ _ _____ ____
@ -76,7 +76,7 @@ async fn main() {
// Start the server which asynchronously waits for a CTRL+C signal
// which will safely shut down the server
let (tcplistener, bgsave_config, snapshot_config, restore_filepath) =
check_args_or_connect().await;
check_args_and_get_cfg().await;
run(
tcplistener,
bgsave_config,
@ -87,10 +87,10 @@ async fn main() {
.await;
}
/// This function checks the command line arguments and binds to an appropriate
/// port and host, as per the supplied configuration options
async fn check_args_or_connect() -> (
TcpListener,
/// This function checks the command line arguments and either returns a config object
/// or prints an error to `stderr` and terminates the server
async fn check_args_and_get_cfg() -> (
PortConfig,
BGSave,
SnapshotConfig,
Option<std::path::PathBuf>,
@ -104,35 +104,17 @@ async fn check_args_or_connect() -> (
println!("{}", MSG);
}
log::info!("Using settings from supplied configuration");
(
TcpListener::bind(cfg.get_host_port_tuple()).await,
cfg.bgsave,
cfg.snapshot,
file,
)
(cfg.ports, cfg.bgsave, cfg.snapshot, file)
}
Ok(config::ConfigType::Def(cfg, file)) => {
println!("{}\n{}", TEXT, MSG);
log::warn!("No configuration file supplied. Using default settings");
(
TcpListener::bind(cfg.get_host_port_tuple()).await,
cfg.bgsave,
cfg.snapshot,
file,
)
(cfg.ports, cfg.bgsave, cfg.snapshot, file)
}
Err(e) => {
log::error!("{}", e);
std::process::exit(0x100);
}
};
match binding_and_cfg {
(Ok(b), bgsave_cfg, snapshot_cfg, restore_file) => {
(b, bgsave_cfg, snapshot_cfg, restore_file)
}
(Err(e), _, _, _) => {
log::error!("Failed to bind to socket with error: '{}'", e);
std::process::exit(0x100);
}
}
binding_and_cfg
}

@ -45,17 +45,17 @@ use tokio_openssl::SslStream;
pub struct SslListener {
/// An atomic reference to the coretable
db: CoreDB,
pub db: CoreDB,
/// The incoming connection listener (binding)
listener: TcpListener,
pub listener: TcpListener,
/// The maximum number of connections
climit: Arc<Semaphore>,
/// The shutdown broadcaster
signal: broadcast::Sender<()>,
pub signal: broadcast::Sender<()>,
// When all `Sender`s are dropped - the `Receiver` gets a `None` value
// We send a clone of `terminate_tx` to each `CHandler`
terminate_tx: mpsc::Sender<()>,
terminate_rx: mpsc::Receiver<()>,
pub terminate_tx: mpsc::Sender<()>,
pub terminate_rx: mpsc::Receiver<()>,
acceptor: Arc<SslAcceptor>,
}

Loading…
Cancel
Save