Implement `Con<'_>` object for TLS/non-TLS streams

This object either holds a mutable reference to an unencrypted TCP
stream or an encrypted TLS stream (using OpenSSL). The action handlers
were modified as a consequence.

Signed-off-by: Sayan Nandan <nandansayan@outlook.com>
next
Sayan Nandan 4 years ago
parent 43bb178954
commit 25d211d0a2
No known key found for this signature in database
GPG Key ID: C31EFD7DDA12AEE0

@ -20,10 +20,11 @@
*/
use crate::coredb::CoreDB;
use crate::dbnet::Con;
use crate::diskstore;
use crate::diskstore::snapshot::SnapshotEngine;
use crate::diskstore::snapshot::DIR_SNAPSHOT;
use crate::protocol::{responses, ActionGroup, Connection};
use crate::protocol::{responses, ActionGroup};
use crate::resp::GroupBegin;
use libtdb::terrapipe::RespCodes;
use libtdb::TResult;
@ -32,7 +33,7 @@ use std::path::PathBuf;
/// Create a snapshot
///
pub async fn mksnap(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn mksnap(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany == 0 {
if !handle.is_snapshot_enabled() {

@ -21,6 +21,7 @@
//! # The core database engine
use crate::dbnet::Con;
use crate::config::BGSave;
use crate::config::SnapshotConfig;
use crate::config::SnapshotPref;
@ -245,7 +246,7 @@ impl CoreDB {
}
/// Execute a query that has already been validated by `Connection::read_query`
pub async fn execute_query(&self, query: Query, mut con: &mut Connection) -> TResult<()> {
pub async fn execute_query(&self, query: Query, mut con: &mut Con<'_>) -> TResult<()> {
match query {
Query::Simple(q) => {
queryengine::execute_simple(&self, &mut con, q).await?;

@ -37,7 +37,9 @@
use crate::config::BGSave;
use crate::config::SnapshotConfig;
use crate::diskstore::snapshot::DIR_REMOTE_SNAPSHOT;
use crate::protocol::tls::SslConnection;
use crate::protocol::{Connection, QueryResult::*};
use crate::resp::Writable;
use crate::CoreDB;
use libtdb::util::terminal;
use libtdb::TResult;
@ -156,6 +158,49 @@ impl Listener {
}
}
/// # Connection Wrapper
///
/// A `Con` object holds a mutable reference to a standard TCP stream or to
/// an encrypted connection (over the `SslListener` object). It provides a few
/// methods which are provided by the underlying interface.
pub enum Con<'a> {
/// A secure TLS connection
Secure(&'a mut SslConnection),
/// An insecure ('standard') TCP connection
Insecure(&'a mut Connection),
}
impl<'a> Con<'a> {
/// Create a new **unencrypted** connection instance
pub fn init<'b>(con: &'b mut Connection) -> Self
where
'b: 'a,
{
Con::Insecure(con)
}
/// Create a new **encrypted** connection instance
pub fn init_ssl<'b>(con: &'b mut SslConnection) -> Self
where
'b: 'a,
{
Con::Secure(con)
}
/// Flush the stream that is held by the underlying connection
pub async fn flush_stream(&mut self) -> TResult<()> {
match self {
Con::Secure(con) => con.flush_stream().await,
Con::Insecure(con) => con.flush_stream().await,
}
}
/// Write bytes to the underlying stream that implement the `Writable` trait
pub async fn write_response(&mut self, resp: impl Writable) -> TResult<()> {
match self {
Con::Insecure(con) => con.write_response(resp).await,
Con::Secure(con) => con.write_response(resp).await,
}
}
}
impl CHandler {
/// Process the incoming connection
pub async fn run(&mut self) -> TResult<()> {
@ -167,7 +212,11 @@ impl CHandler {
}
};
match try_df {
Ok(Q(s)) => self.db.execute_query(s, &mut self.con).await?,
Ok(Q(s)) => {
self.db
.execute_query(s, &mut Con::Insecure(&mut self.con))
.await?
}
Ok(E(r)) => self.con.close_conn_with_error(r).await?,
Ok(Empty) => return Ok(()),
Err(e) => return Err(e.into()),

@ -19,13 +19,13 @@
*
*/
use crate::coredb::CoreDB;
use crate::protocol::Connection;
use crate::dbnet::Con;
use crate::protocol::{responses, ActionGroup};
use crate::resp::GroupBegin;
use libtdb::TResult;
/// Get the number of keys in the database
pub async fn dbsize(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn dbsize(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
if act.howmany() != 0 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;
}

@ -23,7 +23,7 @@
//! This module provides functions to work with `DEL` queries
use crate::coredb::CoreDB;
use crate::protocol::Connection;
use crate::dbnet::Con;
use crate::protocol::{responses, ActionGroup};
use crate::resp::GroupBegin;
use libtdb::TResult;
@ -32,7 +32,7 @@ use libtdb::TResult;
///
/// Do note that this function is blocking since it acquires a write lock.
/// It will write an entire datagroup, for this `del` action
pub async fn del(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn del(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany == 0 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;

@ -23,13 +23,13 @@
//! This module provides functions to work with `EXISTS` queries
use crate::coredb::CoreDB;
use crate::protocol::Connection;
use crate::dbnet::Con;
use crate::protocol::{responses, ActionGroup};
use crate::resp::GroupBegin;
use libtdb::TResult;
/// Run an `EXISTS` query
pub async fn exists(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn exists(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany == 0 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;

@ -20,12 +20,12 @@
*/
use crate::coredb::CoreDB;
use crate::protocol::Connection;
use crate::dbnet::Con;
use crate::protocol::{responses, ActionGroup};
use libtdb::TResult;
/// Delete all the keys in the database
pub async fn flushdb(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn flushdb(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
if act.howmany() != 0 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;
}

@ -23,14 +23,14 @@
//! This module provides functions to work with `GET` queries
use crate::coredb::CoreDB;
use crate::protocol::Connection;
use crate::dbnet::Con;
use crate::protocol::{responses, ActionGroup};
use crate::resp::{BytesWrapper, GroupBegin};
use bytes::Bytes;
use libtdb::TResult;
/// Run a `GET` query
pub async fn get(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn get(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany != 1 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;

@ -24,7 +24,7 @@
//! Functions for handling `JGET` queries
use crate::coredb::CoreDB;
use crate::protocol::Connection;
use crate::dbnet::Con;
use crate::protocol::{responses, ActionGroup};
use libtdb::TResult;
@ -37,7 +37,7 @@ use libtdb::TResult;
/// {"key":"value"}\n
/// ```
///
pub async fn jget(_handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn jget(_handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany != 1 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;

@ -19,7 +19,7 @@
*
*/
use crate::coredb::CoreDB;
use crate::protocol::Connection;
use crate::dbnet::Con;
use crate::protocol::{responses, ActionGroup};
use crate::resp::GroupBegin;
use libtdb::TResult;
@ -27,7 +27,7 @@ use libtdb::TResult;
/// Run a `KEYLEN` query
///
/// At this moment, `keylen` only supports a single key
pub async fn keylen(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn keylen(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany != 1 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;

@ -20,7 +20,7 @@
*/
use crate::coredb::CoreDB;
use crate::protocol::Connection;
use crate::dbnet::Con;
use crate::protocol::{responses, ActionGroup};
use crate::resp::{BytesWrapper, GroupBegin};
use bytes::Bytes;
@ -29,7 +29,7 @@ use libtdb::TResult;
/// Run an `MGET` query
///
pub async fn mget(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn mget(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany == 0 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;

@ -39,14 +39,14 @@ pub mod update;
pub mod uset;
pub mod heya {
//! Respond to `HEYA` queries
use crate::dbnet::Con;
use crate::protocol;
use crate::protocol::ActionGroup;
use crate::protocol::Connection;
use crate::CoreDB;
use libtdb::TResult;
use protocol::responses;
/// Returns a `HEY!` `Response`
pub async fn heya(_db: &CoreDB, con: &mut Connection, _buf: ActionGroup) -> TResult<()> {
pub async fn heya(_db: &CoreDB, con: &mut Con<'_>, _buf: ActionGroup) -> TResult<()> {
con.write_response(&**responses::fresp::R_HEYA).await
}
}

@ -20,14 +20,14 @@
*/
use crate::coredb::{self, CoreDB};
use crate::protocol::Connection;
use crate::dbnet::Con;
use crate::protocol::{responses, ActionGroup};
use crate::resp::GroupBegin;
use libtdb::TResult;
use std::collections::hash_map::Entry;
/// Run an `MSET` query
pub async fn mset(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn mset(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany & 1 == 1 || howmany == 0 {
// An odd number of arguments means that the number of keys

@ -20,14 +20,14 @@
*/
use crate::coredb::{self, CoreDB};
use crate::protocol::Connection;
use crate::dbnet::Con;
use crate::protocol::{responses, ActionGroup};
use crate::resp::GroupBegin;
use libtdb::TResult;
use std::collections::hash_map::Entry;
/// Run an `MUPDATE` query
pub async fn mupdate(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn mupdate(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany & 1 == 1 || howmany == 0 {
// An odd number of arguments means that the number of keys

@ -23,7 +23,7 @@
//! This module provides functions to work with `SET` queries
use crate::coredb::{self, CoreDB};
use crate::protocol::Connection;
use crate::dbnet::Con;
use crate::protocol::{responses, ActionGroup};
use coredb::Data;
use libtdb::TResult;
@ -31,7 +31,7 @@ use std::collections::hash_map::Entry;
use std::hint::unreachable_unchecked;
/// Run a `SET` query
pub async fn set(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn set(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany != 2 {
// There should be exactly 2 arguments

@ -31,7 +31,7 @@
//! Do note that this isn't the same as the gurantees provided by ACID transactions
use crate::coredb::{CoreDB, Data};
use crate::protocol::Connection;
use crate::dbnet::Con;
use crate::protocol::{responses, ActionGroup};
use libtdb::TResult;
use std::hint::unreachable_unchecked;
@ -40,7 +40,7 @@ use std::hint::unreachable_unchecked;
///
/// This either returns `Okay` if all the keys were set, or it returns an
/// `Overwrite Error` or code `2`
pub async fn sset(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn sset(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany & 1 == 1 || howmany == 0 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;
@ -102,7 +102,7 @@ pub async fn sset(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TR
///
/// This either returns `Okay` if all the keys were `del`eted, or it returns a
/// `Nil`, which is code `1`
pub async fn sdel(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn sdel(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany == 0 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;
@ -158,7 +158,7 @@ pub async fn sdel(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TR
///
/// This either returns `Okay` if all the keys were updated, or it returns `Nil`
/// or code `1`
pub async fn supdate(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn supdate(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany & 1 == 1 || howmany == 0 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;

@ -23,7 +23,7 @@
//! This module provides functions to work with `UPDATE` queries
//!
use crate::coredb::{self, CoreDB};
use crate::protocol::Connection;
use crate::dbnet::Con;
use crate::protocol::{responses, ActionGroup};
use coredb::Data;
use libtdb::TResult;
@ -31,7 +31,7 @@ use std::collections::hash_map::Entry;
use std::hint::unreachable_unchecked;
/// Run an `UPDATE` query
pub async fn update(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn update(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany != 2 {
// There should be exactly 2 arguments

@ -20,7 +20,7 @@
*/
use crate::coredb::{self, CoreDB};
use crate::protocol::Connection;
use crate::dbnet::Con;
use crate::protocol::{responses, ActionGroup};
use crate::resp::GroupBegin;
use libtdb::TResult;
@ -28,7 +28,7 @@ use libtdb::TResult;
/// Run an `USET` query
///
/// This is like "INSERT or UPDATE"
pub async fn uset(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn uset(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany & 1 == 1 || howmany == 0 {
// An odd number of arguments means that the number of keys

@ -22,6 +22,7 @@
//! # The Query Engine
use crate::coredb::CoreDB;
use crate::dbnet::Con;
use crate::protocol::ActionGroup;
use crate::protocol::{responses, Connection};
use crate::{admin, kvengine};
@ -66,7 +67,7 @@ mod tags {
}
/// Execute a simple(*) query
pub async fn execute_simple(db: &CoreDB, con: &mut Connection, buf: ActionGroup) -> TResult<()> {
pub async fn execute_simple(db: &CoreDB, con: &mut Con<'_>, buf: ActionGroup) -> TResult<()> {
let first = match buf.get_first() {
None => {
return con

Loading…
Cancel
Save