next
Sayan Nandan 4 years ago
parent 29e9452d40
commit 37d4b808e1
No known key found for this signature in database
GPG Key ID: C31EFD7DDA12AEE0

@ -23,11 +23,11 @@ use corelib::{
terrapipe::{self, ActionType, QueryBuilder, RespCodes, DEF_QMETALAYOUT_BUFSIZE},
TResult,
};
use std::{error::Error, fmt, process};
use std::{error::Error, fmt};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
use std::future::Future;
/// Errors that may occur while parsing responses from the server
#[derive(Debug)]
pub enum ClientError {
RespCode(RespCodes),
@ -48,10 +48,12 @@ impl fmt::Display for ClientError {
impl Error for ClientError {}
/// A client
pub struct Client {
con: TcpStream,
}
/// The Request metaline
pub struct RMetaline {
content_size: usize,
metalayout_size: usize,
@ -60,6 +62,7 @@ pub struct RMetaline {
}
impl RMetaline {
/// Decode a metaline from a `String` buffer
pub fn from_buf(buf: String) -> TResult<Self> {
let parts: Vec<&str> = buf.split('!').collect();
if let (Some(resptype), Some(respcode), Some(clength), Some(metalayout_size)) =
@ -95,10 +98,13 @@ impl RMetaline {
}
impl Client {
/// Create a new client instance
pub async fn new(addr: &str) -> TResult<Self> {
let con = TcpStream::connect(addr).await?;
Ok(Client { con })
}
/// Run a query read from stdin. This function will take care of everything
/// including printing errors
pub async fn run(&mut self, cmd: String) {
if cmd.len() == 0 {
return;
@ -117,6 +123,7 @@ impl Client {
};
}
}
/// Run a query, reading and writng to the stream
async fn run_query(&mut self, (_, query_bytes): (usize, Vec<u8>)) -> TResult<Vec<String>> {
self.con.write_all(&query_bytes).await?;
let mut metaline_buf = String::with_capacity(DEF_QMETALAYOUT_BUFSIZE);

@ -25,4 +25,5 @@
pub mod terrapipe;
use std::error::Error;
/// A generic result
pub type TResult<T> = Result<T, Box<dyn Error>>;

@ -341,6 +341,7 @@ impl SimpleQuery {
}
}
pub fn add(&mut self, cmd: &str) {
// FIXME(@ohsayan): This should take the UTF8 repr's length
let ref mut layout = self.metalayout;
let ref mut df = self.dataframe;
let len = cmd.len().to_string();

@ -27,18 +27,23 @@ use std::sync::{self, Arc, RwLock};
/// Results from actions on the Database
pub type ActionResult<T> = Result<T, RespCodes>;
/// This is a thread-safe database handle, which on cloning simply
/// gives another atomic reference to the `Coretable`
#[derive(Debug, Clone)]
pub struct CoreDB {
shared: Arc<Coretable>,
terminate: bool,
}
/// The `Coretable` holds all the key-value pairs in a `HashMap`
/// wrapped in a Read/Write lock
#[derive(Debug)]
pub struct Coretable {
coremap: RwLock<HashMap<String, String>>,
}
impl CoreDB {
/// GET a `key`
pub fn get(&self, key: &str) -> ActionResult<String> {
if let Some(value) = self.acquire_read().get(key) {
Ok(value.to_string())
@ -46,6 +51,7 @@ impl CoreDB {
Err(RespCodes::NotFound)
}
}
/// SET a `key` to `value`
pub fn set(&self, key: &str, value: &str) -> ActionResult<()> {
match self.acquire_write().entry(key.to_string()) {
Entry::Occupied(_) => return Err(RespCodes::OverwriteError),
@ -55,6 +61,7 @@ impl CoreDB {
}
}
}
/// UPDATE a `key` to `value`
pub fn update(&self, key: &str, value: &str) -> ActionResult<()> {
match self.acquire_write().entry(key.to_string()) {
Entry::Occupied(ref mut e) => {
@ -64,6 +71,7 @@ impl CoreDB {
Entry::Vacant(_) => Err(RespCodes::NotFound),
}
}
/// DEL a `key`
pub fn del(&self, key: &str) -> ActionResult<()> {
if let Some(_) = self.acquire_write().remove(&key.to_owned()) {
Ok(())
@ -72,10 +80,12 @@ impl CoreDB {
}
}
#[cfg(Debug)]
/// Flush the coretable entries when in debug mode
pub fn print_debug_table(&self) {
println!("{:#?}", *self.coremap.read().unwrap());
}
/// Execute a query that has already been validated by `Connection::read_query`
pub fn execute_query(&self, df: QueryDataframe) -> Vec<u8> {
match df.actiontype {
ActionType::Simple => self.execute_simple(df.data),
@ -83,12 +93,14 @@ impl CoreDB {
ActionType::Pipeline => unimplemented!(),
}
}
/// Execute a simple(*) query
pub fn execute_simple(&self, buf: Vec<String>) -> Vec<u8> {
let mut buf = buf.into_iter();
while let Some(token) = buf.next() {
match token.to_uppercase().as_str() {
tags::TAG_GET => {
// This is a GET request
// This is a GET query
if let Some(key) = buf.next() {
if buf.next().is_none() {
let res = match self.get(&key.to_string()) {
@ -102,7 +114,7 @@ impl CoreDB {
}
}
tags::TAG_SET => {
// This is a SET request
// This is a SET query
if let Some(key) = buf.next() {
if let Some(value) = buf.next() {
if buf.next().is_none() {
@ -139,7 +151,7 @@ impl CoreDB {
}
}
tags::TAG_DEL => {
// This is a GET request
// This is a DEL query
if let Some(key) = buf.next() {
if buf.next().is_none() {
match self.del(&key.to_string()) {
@ -170,6 +182,7 @@ impl CoreDB {
}
RespCodes::ArgumentError.into_response()
}
/// Create a new `CoreDB` instance
pub fn new() -> Self {
CoreDB {
shared: Arc::new(Coretable {
@ -178,9 +191,11 @@ impl CoreDB {
terminate: false,
}
}
/// Acquire a write lock
fn acquire_write(&self) -> sync::RwLockWriteGuard<'_, HashMap<String, String>> {
self.shared.coremap.write().unwrap()
}
/// Acquire a read lock
fn acquire_read(&self) -> sync::RwLockReadGuard<'_, HashMap<String, String>> {
self.shared.coremap.read().unwrap()
}

@ -37,6 +37,7 @@ pub struct Terminator {
}
impl Terminator {
/// Create a new `Terminator` instance
pub fn new(signal: broadcast::Receiver<()>) -> Self {
Terminator {
// Don't terminate on creation!
@ -44,9 +45,11 @@ impl Terminator {
signal,
}
}
/// Check if the signal is a termination signal
pub fn is_termination_signal(&self) -> bool {
self.terminate
}
/// Check if a shutdown signal was received
pub async fn receive_signal(&mut self) {
// The server may have already been terminated
// In that event, just return
@ -60,6 +63,7 @@ impl Terminator {
// We'll use the idea of gracefully shutting down from tokio
/// A listener
pub struct Listener {
/// An atomic reference to the coretable
db: CoreDB,
@ -75,6 +79,7 @@ pub struct Listener {
terminate_rx: mpsc::Receiver<()>,
}
/// A per-connection handler
struct CHandler {
db: CoreDB,
con: Connection,
@ -84,6 +89,7 @@ struct CHandler {
}
impl Listener {
/// Accept an incoming connection
async fn accept(&mut self) -> TResult<TcpStream> {
// We will steal the idea of Ethernet's backoff for connection errors
let mut backoff = 1;
@ -104,6 +110,7 @@ impl Listener {
backoff *= 2;
}
}
/// Run the server
pub async fn run(&mut self) -> TResult<()> {
loop {
// Take the permit first, but we won't use it right now
@ -125,6 +132,7 @@ impl Listener {
}
impl CHandler {
/// Process the incoming connection
async fn run(&mut self) {
while !self.terminator.is_termination_signal() {
let try_df = tokio::select! {
@ -143,10 +151,13 @@ impl CHandler {
impl Drop for CHandler {
fn drop(&mut self) {
// Make sure that the permit is returned to the semaphore
// in the case that there is a panic inside
self.climit.add_permits(1);
}
}
/// Start the server waiting for incoming connections or a CTRL+C signal
pub async fn run(listener: TcpListener, sig: impl Future) {
let (signal, _) = broadcast::channel(1);
let (terminate_tx, terminate_rx) = mpsc::channel(1);

@ -33,5 +33,7 @@ static ADDR: &'static str = "127.0.0.1:2003";
async fn main() {
let listener = TcpListener::bind(ADDR).await.unwrap();
println!("Server running on terrapipe://127.0.0.1:2003");
// Start the server which asynchronously waits for a CTRL+C signal
// which will safely shut down the server
run(listener, signal::ctrl_c()).await;
}

@ -35,12 +35,19 @@ pub struct QueryDataframe {
#[derive(Debug, PartialEq)]
pub struct PreQMF {
/// The type of action: Simple/Pipelined
action_type: ActionType,
/// The content size excluding the metaline length
content_size: usize,
/// The length of the metaline
metaline_size: usize,
}
impl PreQMF {
/// Create a new PreQueryMetaframe from a `String`
/// ## Errors
/// This returns `Respcodes` as an error and hence this error can be directly
/// written to the stream
pub fn from_buffer(buf: String) -> Result<Self, RespCodes> {
let buf: Vec<&str> = buf.split('!').collect();
if let (Some(atype), Some(csize), Some(metaline_size)) =
@ -90,25 +97,36 @@ fn test_preqmf() {
assert_eq!(preqmf, pqmf_should_be);
}
/// A TCP connection wrapper
pub struct Connection {
stream: TcpStream,
}
impl Connection {
/// Initiailize a new `Connection` instance
pub fn new(stream: TcpStream) -> Self {
Connection { stream }
}
/// Read a query
///
/// This will return a QueryDataframe if parsing is successful - otherwise
/// it returns a `RespCodes` variant which can be converted into a response
pub async fn read_query(&mut self) -> Result<QueryDataframe, RespCodes> {
let mut bufreader = BufReader::new(&mut self.stream);
let mut metaline_buf = String::with_capacity(DEF_QMETALINE_BUFSIZE);
// First read the metaline
// TODO: We will use a read buffer in the future and then do all the
// actions below to improve efficiency - it would be way more efficient
bufreader.read_line(&mut metaline_buf).await.unwrap();
let pqmf = PreQMF::from_buffer(metaline_buf)?;
let (mut metalayout_buf, mut dataframe_buf) = (
String::with_capacity(pqmf.metaline_size),
vec![0; pqmf.content_size],
);
// Read the metalayout
bufreader.read_line(&mut metalayout_buf).await.unwrap();
let ss = get_sizes(metalayout_buf)?;
// Read the dataframe
bufreader.read(&mut dataframe_buf).await.unwrap();
let qdf = QueryDataframe {
data: extract_idents(dataframe_buf, ss),
@ -116,6 +134,7 @@ impl Connection {
};
Ok(qdf)
}
/// Write a response to the stream
pub async fn write_response(&mut self, resp: Vec<u8>) {
if let Err(_) = self.stream.write_all(&resp).await {
eprintln!(
@ -124,6 +143,7 @@ impl Connection {
);
return;
}
// Flush the stream to make sure that the data was delivered
if let Err(_) = self.stream.flush().await {
eprintln!(
"Error while flushing data to stream: {:?}",
@ -132,6 +152,8 @@ impl Connection {
return;
}
}
/// Wraps around the `write_response` used to differentiate between a
/// success response and an error response
pub async fn close_conn_with_error(&mut self, bytes: impl RespBytes) {
self.write_response(bytes.into_response()).await
}

Loading…
Cancel
Save