From 37d4b808e14dfc4eaa34f76222b883bc64ab8fe6 Mon Sep 17 00:00:00 2001
From: Sayan Nandan
Date: Fri, 24 Jul 2020 12:23:22 +0530
Subject: [PATCH] Add docs

---
 cli/src/client.rs        | 11 +++++++++--
 corelib/src/lib.rs       |  1 +
 corelib/src/terrapipe.rs |  1 +
 server/src/coredb.rs     | 21 ++++++++++++++++++---
 server/src/dbnet.rs      | 11 +++++++++++
 server/src/main.rs       |  2 ++
 server/src/protocol.rs   | 22 ++++++++++++++++++++++
 7 files changed, 64 insertions(+), 5 deletions(-)

diff --git a/cli/src/client.rs b/cli/src/client.rs
index dd6ed4bb..e76b0e6b 100644
--- a/cli/src/client.rs
+++ b/cli/src/client.rs
@@ -23,11 +23,11 @@ use corelib::{
     terrapipe::{self, ActionType, QueryBuilder, RespCodes, DEF_QMETALAYOUT_BUFSIZE},
     TResult,
 };
-use std::{error::Error, fmt, process};
+use std::{error::Error, fmt};
 use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
 use tokio::net::TcpStream;
 
-use std::future::Future;
+/// Errors that may occur while parsing responses from the server
 #[derive(Debug)]
 pub enum ClientError {
     RespCode(RespCodes),
@@ -48,10 +48,12 @@ impl fmt::Display for ClientError {
 
 impl Error for ClientError {}
 
+/// A client
 pub struct Client {
     con: TcpStream,
 }
 
+/// The Request metaline
 pub struct RMetaline {
     content_size: usize,
     metalayout_size: usize,
@@ -60,6 +62,7 @@ pub struct RMetaline {
 }
 
 impl RMetaline {
+    /// Decode a metaline from a `String` buffer
     pub fn from_buf(buf: String) -> TResult<Self> {
         let parts: Vec<&str> = buf.split('!').collect();
         if let (Some(resptype), Some(respcode), Some(clength), Some(metalayout_size)) =
@@ -95,10 +98,13 @@ impl RMetaline {
 }
 
 impl Client {
+    /// Create a new client instance
    pub async fn new(addr: &str) -> TResult<Self> {
        let con = TcpStream::connect(addr).await?;
        Ok(Client { con })
    }
+    /// Run a query read from stdin. This function will take care of everything
+    /// including printing errors
     pub async fn run(&mut self, cmd: String) {
         if cmd.len() == 0 {
             return;
@@ -117,6 +123,7 @@ impl Client {
             };
         }
     }
+    /// Run a query, reading and writing to the stream
     async fn run_query(&mut self, (_, query_bytes): (usize, Vec<u8>)) -> TResult<Option<String>> {
         self.con.write_all(&query_bytes).await?;
         let mut metaline_buf = String::with_capacity(DEF_QMETALAYOUT_BUFSIZE);
diff --git a/corelib/src/lib.rs b/corelib/src/lib.rs
index c9dfd445..4b9e1f01 100644
--- a/corelib/src/lib.rs
+++ b/corelib/src/lib.rs
@@ -25,4 +25,5 @@ pub mod terrapipe;
 
 use std::error::Error;
 
+/// A generic result
 pub type TResult<T> = Result<T, Box<dyn Error>>;
diff --git a/corelib/src/terrapipe.rs b/corelib/src/terrapipe.rs
index e7b363ee..ca8c40cb 100644
--- a/corelib/src/terrapipe.rs
+++ b/corelib/src/terrapipe.rs
@@ -341,6 +341,7 @@ impl SimpleQuery {
         }
     }
     pub fn add(&mut self, cmd: &str) {
+        // FIXME(@ohsayan): This should take the UTF-8 repr's length
         let ref mut layout = self.metalayout;
         let ref mut df = self.dataframe;
         let len = cmd.len().to_string();
diff --git a/server/src/coredb.rs b/server/src/coredb.rs
index 4e6cf6cc..1936d29f 100644
--- a/server/src/coredb.rs
+++ b/server/src/coredb.rs
@@ -27,18 +27,23 @@ use std::sync::{self, Arc, RwLock};
 /// Results from actions on the Database
 pub type ActionResult<T> = Result<T, RespCodes>;
 
+/// This is a thread-safe database handle, which on cloning simply
+/// gives another atomic reference to the `Coretable`
 #[derive(Debug, Clone)]
 pub struct CoreDB {
     shared: Arc<Coretable>,
     terminate: bool,
 }
 
+/// The `Coretable` holds all the key-value pairs in a `HashMap`
+/// wrapped in a Read/Write lock
 #[derive(Debug)]
 pub struct Coretable {
     coremap: RwLock<HashMap<String, String>>,
 }
 
 impl CoreDB {
+    /// GET a `key`
     pub fn get(&self, key: &str) -> ActionResult<String> {
         if let Some(value) = self.acquire_read().get(key) {
             Ok(value.to_string())
@@ -46,6 +51,7 @@ impl CoreDB {
             Err(RespCodes::NotFound)
         }
     }
+    /// SET a `key` to `value`
     pub fn set(&self, key: &str, value: &str) -> ActionResult<()> {
         match self.acquire_write().entry(key.to_string()) {
             Entry::Occupied(_) => return Err(RespCodes::OverwriteError),
@@ -55,6 +61,7 @@ impl CoreDB {
             }
         }
     }
+    /// UPDATE a `key` to `value`
     pub fn update(&self, key: &str, value: &str) -> ActionResult<()> {
         match self.acquire_write().entry(key.to_string()) {
             Entry::Occupied(ref mut e) => {
@@ -64,6 +71,7 @@ impl CoreDB {
             Entry::Vacant(_) => Err(RespCodes::NotFound),
         }
     }
+    /// DEL a `key`
     pub fn del(&self, key: &str) -> ActionResult<()> {
         if let Some(_) = self.acquire_write().remove(&key.to_owned()) {
             Ok(())
@@ -72,10 +80,12 @@ impl CoreDB {
         }
     }
     #[cfg(Debug)]
+    /// Print the coretable entries when in debug mode
     pub fn print_debug_table(&self) {
         println!("{:#?}", *self.coremap.read().unwrap());
     }
 
+    /// Execute a query that has already been validated by `Connection::read_query`
     pub fn execute_query(&self, df: QueryDataframe) -> Vec<u8> {
         match df.actiontype {
             ActionType::Simple => self.execute_simple(df.data),
@@ -83,12 +93,14 @@ impl CoreDB {
             ActionType::Pipeline => unimplemented!(),
         }
     }
+
+    /// Execute a simple(*) query
     pub fn execute_simple(&self, buf: Vec<String>) -> Vec<u8> {
         let mut buf = buf.into_iter();
         while let Some(token) = buf.next() {
             match token.to_uppercase().as_str() {
                 tags::TAG_GET => {
-                    // This is a GET request
+                    // This is a GET query
                     if let Some(key) = buf.next() {
                         if buf.next().is_none() {
                             let res = match self.get(&key.to_string()) {
@@ -102,7 +114,7 @@ impl CoreDB {
                     }
                 }
                 tags::TAG_SET => {
-                    // This is a SET request
+                    // This is a SET query
                     if let Some(key) = buf.next() {
                         if let Some(value) = buf.next() {
                             if buf.next().is_none() {
@@ -139,7 +151,7 @@ impl CoreDB {
                     }
                 }
                 tags::TAG_DEL => {
-                    // This is a GET request
+                    // This is a DEL query
                     if let Some(key) = buf.next() {
                         if buf.next().is_none() {
                             match self.del(&key.to_string()) {
@@ -170,6 +182,7 @@ impl CoreDB {
         }
         RespCodes::ArgumentError.into_response()
     }
+    /// Create a new `CoreDB` instance
     pub fn new() -> Self {
         CoreDB {
             shared: Arc::new(Coretable {
@@ -178,9 +191,11 @@ impl CoreDB {
             terminate: false,
         }
     }
+    /// Acquire a write lock
     fn acquire_write(&self) -> sync::RwLockWriteGuard<'_, HashMap<String, String>> {
         self.shared.coremap.write().unwrap()
     }
+    /// Acquire a read lock
     fn acquire_read(&self) -> sync::RwLockReadGuard<'_, HashMap<String, String>> {
         self.shared.coremap.read().unwrap()
     }
diff --git a/server/src/dbnet.rs b/server/src/dbnet.rs
index e5fe34a3..977be9e1 100644
--- a/server/src/dbnet.rs
+++ b/server/src/dbnet.rs
@@ -37,6 +37,7 @@ pub struct Terminator {
 }
 
 impl Terminator {
+    /// Create a new `Terminator` instance
     pub fn new(signal: broadcast::Receiver<()>) -> Self {
         Terminator {
             // Don't terminate on creation!
@@ -44,9 +45,11 @@ impl Terminator {
             signal,
         }
     }
+    /// Check whether a termination signal has already been received
     pub fn is_termination_signal(&self) -> bool {
         self.terminate
     }
+    /// Wait for a shutdown signal to be received
     pub async fn receive_signal(&mut self) {
         // The server may have already been terminated
         // In that event, just return
@@ -60,6 +63,7 @@ impl Terminator {
 
 // We'll use the idea of gracefully shutting down from tokio
 
+/// A listener
 pub struct Listener {
     /// An atomic reference to the coretable
     db: CoreDB,
@@ -75,6 +79,7 @@ pub struct Listener {
     terminate_rx: mpsc::Receiver<()>,
 }
 
+/// A per-connection handler
 struct CHandler {
     db: CoreDB,
     con: Connection,
@@ -84,6 +89,7 @@ struct CHandler {
 }
 
 impl Listener {
+    /// Accept an incoming connection
     async fn accept(&mut self) -> TResult<TcpStream> {
         // We will steal the idea of Ethernet's backoff for connection errors
         let mut backoff = 1;
@@ -104,6 +110,7 @@ impl Listener {
             backoff *= 2;
         }
     }
+    /// Run the server
     pub async fn run(&mut self) -> TResult<()> {
         loop {
             // Take the permit first, but we won't use it right now
@@ -125,6 +132,7 @@ impl Listener {
 }
 
 impl CHandler {
+    /// Process the incoming connection
     async fn run(&mut self) {
         while !self.terminator.is_termination_signal() {
             let try_df = tokio::select! {
@@ -143,10 +151,13 @@ impl CHandler {
 
 impl Drop for CHandler {
     fn drop(&mut self) {
+        // Make sure that the permit is returned to the semaphore
+        // in case there is a panic inside
         self.climit.add_permits(1);
     }
 }
 
+/// Start the server, waiting for incoming connections or a CTRL+C signal
 pub async fn run(listener: TcpListener, sig: impl Future) {
     let (signal, _) = broadcast::channel(1);
     let (terminate_tx, terminate_rx) = mpsc::channel(1);
diff --git a/server/src/main.rs b/server/src/main.rs
index 2619076d..1c55b820 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -33,5 +33,7 @@ static ADDR: &'static str = "127.0.0.1:2003";
 async fn main() {
     let listener = TcpListener::bind(ADDR).await.unwrap();
     println!("Server running on terrapipe://127.0.0.1:2003");
+    // Start the server, which asynchronously waits for a CTRL+C signal
+    // that will safely shut down the server
     run(listener, signal::ctrl_c()).await;
 }
diff --git a/server/src/protocol.rs b/server/src/protocol.rs
index f6bd7733..bbb68ab7 100644
--- a/server/src/protocol.rs
+++ b/server/src/protocol.rs
@@ -35,12 +35,19 @@ pub struct QueryDataframe {
 
 #[derive(Debug, PartialEq)]
 pub struct PreQMF {
+    /// The type of action: Simple/Pipelined
     action_type: ActionType,
+    /// The content size excluding the metaline length
     content_size: usize,
+    /// The length of the metaline
     metaline_size: usize,
 }
 
 impl PreQMF {
+    /// Create a new PreQueryMetaframe from a `String`
+    /// ## Errors
+    /// This returns `RespCodes` as an error, and hence the error can be directly
+    /// written to the stream
     pub fn from_buffer(buf: String) -> Result<Self, RespCodes> {
         let buf: Vec<&str> = buf.split('!').collect();
         if let (Some(atype), Some(csize), Some(metaline_size)) =
@@ -90,25 +97,36 @@ fn test_preqmf() {
     assert_eq!(preqmf, pqmf_should_be);
 }
 
+/// A TCP connection wrapper
 pub struct Connection {
     stream: TcpStream,
 }
 
 impl Connection {
+    /// Initialize a new `Connection` instance
     pub fn new(stream: TcpStream) -> Self {
         Connection { stream }
     }
+    /// Read a query
+    ///
+    /// This will return a QueryDataframe if parsing is successful - otherwise
+    /// it returns a `RespCodes` variant which can be converted into a response
     pub async fn read_query(&mut self) -> Result<QueryDataframe, RespCodes> {
         let mut bufreader = BufReader::new(&mut self.stream);
         let mut metaline_buf = String::with_capacity(DEF_QMETALINE_BUFSIZE);
+        // First read the metaline
+        // TODO: We will use a read buffer in the future and then do all the
+        // actions below on it - that would be far more efficient
         bufreader.read_line(&mut metaline_buf).await.unwrap();
         let pqmf = PreQMF::from_buffer(metaline_buf)?;
         let (mut metalayout_buf, mut dataframe_buf) = (
             String::with_capacity(pqmf.metaline_size),
             vec![0; pqmf.content_size],
         );
+        // Read the metalayout
         bufreader.read_line(&mut metalayout_buf).await.unwrap();
         let ss = get_sizes(metalayout_buf)?;
+        // Read the dataframe
         bufreader.read(&mut dataframe_buf).await.unwrap();
         let qdf = QueryDataframe {
             data: extract_idents(dataframe_buf, ss),
@@ -116,6 +134,7 @@ impl Connection {
         };
         Ok(qdf)
     }
+    /// Write a response to the stream
     pub async fn write_response(&mut self, resp: Vec<u8>) {
         if let Err(_) = self.stream.write_all(&resp).await {
             eprintln!(
@@ -124,6 +143,7 @@ impl Connection {
             );
             return;
         }
+        // Flush the stream to make sure that the data was delivered
         if let Err(_) = self.stream.flush().await {
             eprintln!(
                 "Error while flushing data to stream: {:?}",
@@ -132,6 +152,8 @@ impl Connection {
             return;
         }
     }
+    /// A wrapper around `write_response`, used to differentiate between a
+    /// success response and an error response
     pub async fn close_conn_with_error(&mut self, bytes: impl RespBytes) {
         self.write_response(bytes.into_response()).await
     }
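
The doc comments added to `PreQMF` describe the query metaline as a '!'-separated header carrying the action type, the content size and the metaline/metalayout size. The following is a minimal, self-contained sketch of that parsing step; the `S`/`P` action markers, the struct name and the error strings are invented placeholders for illustration, not the actual Terrapipe tokens or the types in server/src/protocol.rs.

// Sketch only: layout taken from the doc comments above; markers are placeholders.
#[derive(Debug, PartialEq)]
enum ActionKind {
    Simple,
    Pipeline,
}

#[derive(Debug, PartialEq)]
struct MetalineSketch {
    action: ActionKind,
    content_size: usize,
    metaline_size: usize,
}

fn parse_metaline(buf: &str) -> Result<MetalineSketch, String> {
    // Split the '!'-separated metaline into its three fields
    let parts: Vec<&str> = buf.trim().split('!').collect();
    if let (Some(atype), Some(csize), Some(msize)) = (parts.get(0), parts.get(1), parts.get(2)) {
        let action = match *atype {
            "S" => ActionKind::Simple,
            "P" => ActionKind::Pipeline,
            other => return Err(format!("unknown action type: {}", other)),
        };
        let content_size: usize = csize
            .parse()
            .map_err(|_| format!("bad content size: {}", csize))?;
        let metaline_size: usize = msize
            .parse()
            .map_err(|_| format!("bad metaline size: {}", msize))?;
        Ok(MetalineSketch {
            action,
            content_size,
            metaline_size,
        })
    } else {
        // Mirrors the documented behaviour: a malformed metaline maps to an
        // error that the caller can turn into a response
        Err("incomplete metaline".to_string())
    }
}

fn main() {
    // A hypothetical metaline: simple action, 20 bytes of data, 6-byte metalayout
    let parsed = parse_metaline("S!20!6").unwrap();
    assert_eq!(
        parsed,
        MetalineSketch {
            action: ActionKind::Simple,
            content_size: 20,
            metaline_size: 6,
        }
    );
    println!("{:?}", parsed);
}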
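
The new doc comments on `CoreDB` and `Coretable` describe a handle that is cheap to clone because a clone only adds another atomic reference, with all clones sharing one `HashMap` behind a read/write lock, and with SET refusing to overwrite, while UPDATE and DEL fail on missing keys. Below is a compilable sketch of that shape using std types only; `Handle` and `KvError` are stand-in names, not the server's `CoreDB` or `RespCodes`.

// Sketch only: a simplified stand-in for the CoreDB/Coretable pair described above.
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

#[derive(Debug, PartialEq)]
enum KvError {
    OverwriteError,
    NotFound,
}

#[derive(Clone)]
struct Handle {
    // Cloning the handle only bumps the Arc refcount; the map is shared
    shared: Arc<RwLock<HashMap<String, String>>>,
}

impl Handle {
    fn new() -> Self {
        Handle {
            shared: Arc::new(RwLock::new(HashMap::new())),
        }
    }
    /// GET: needs only a read lock
    fn get(&self, key: &str) -> Result<String, KvError> {
        self.shared
            .read()
            .unwrap()
            .get(key)
            .cloned()
            .ok_or(KvError::NotFound)
    }
    /// SET: fails if the key already exists
    fn set(&self, key: &str, value: &str) -> Result<(), KvError> {
        let mut map = self.shared.write().unwrap();
        if map.contains_key(key) {
            return Err(KvError::OverwriteError);
        }
        map.insert(key.to_string(), value.to_string());
        Ok(())
    }
    /// UPDATE: fails if the key does not exist
    fn update(&self, key: &str, value: &str) -> Result<(), KvError> {
        match self.shared.write().unwrap().get_mut(key) {
            Some(slot) => {
                *slot = value.to_string();
                Ok(())
            }
            None => Err(KvError::NotFound),
        }
    }
    /// DEL: fails if the key does not exist
    fn del(&self, key: &str) -> Result<(), KvError> {
        self.shared
            .write()
            .unwrap()
            .remove(key)
            .map(|_| ())
            .ok_or(KvError::NotFound)
    }
}

fn main() {
    let db = Handle::new();
    let db2 = db.clone();
    db.set("sayan", "17").unwrap();
    assert_eq!(db2.get("sayan").unwrap(), "17");
    assert_eq!(db2.set("sayan", "18"), Err(KvError::OverwriteError));
    db2.update("sayan", "18").unwrap();
    db.del("sayan").unwrap();
    assert_eq!(db.get("sayan"), Err(KvError::NotFound));
}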
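
The comments in server/src/dbnet.rs and server/src/main.rs outline the shutdown path: a broadcast channel fans the CTRL+C signal out to per-connection handlers, each handler polls it inside `tokio::select!`, and `Drop` returns the connection permit to the semaphore. A rough sketch of the select/broadcast part follows, assuming a current tokio with the "full" feature; the channel wiring, timings and names are illustrative only, not the server's actual implementation.

// Sketch only: graceful shutdown in the spirit of Terminator and CHandler::run.
use tokio::sync::broadcast;
use tokio::time::{sleep, Duration};

struct Terminator {
    terminate: bool,
    signal: broadcast::Receiver<()>,
}

impl Terminator {
    fn new(signal: broadcast::Receiver<()>) -> Self {
        Terminator {
            // Don't terminate on creation
            terminate: false,
            signal,
        }
    }
    fn is_termination_signal(&self) -> bool {
        self.terminate
    }
    async fn receive_signal(&mut self) {
        // Already terminated? Just return
        if self.terminate {
            return;
        }
        // A broadcast of () - or the sender being dropped - means "shut down"
        let _ = self.signal.recv().await;
        self.terminate = true;
    }
}

#[tokio::main]
async fn main() {
    let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
    let mut terminator = Terminator::new(shutdown_rx);

    let worker = tokio::spawn(async move {
        while !terminator.is_termination_signal() {
            tokio::select! {
                // Stand-in for con.read_query(): pretend to serve a request
                _ = sleep(Duration::from_millis(50)) => {
                    println!("handled one request");
                }
                // Stop looping as soon as the shutdown broadcast arrives
                _ = terminator.receive_signal() => break,
            }
        }
        println!("handler exited cleanly");
    });

    // Elsewhere (e.g. after CTRL+C), broadcast the shutdown signal
    sleep(Duration::from_millis(120)).await;
    shutdown_tx.send(()).unwrap();
    worker.await.unwrap();
}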