From 755e8d80f4ad122aa4ac641003b22b148d2056d4 Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Tue, 21 Jul 2020 11:20:43 +0530 Subject: [PATCH] Add a connection abstraction Also enabled safe dropping of the `Coretable` --- corelib/src/lib.rs | 9 ++++-- corelib/src/terrapipe.rs | 39 ++++++++++++++++++++-- server/src/coredb.rs | 28 ++++++++++++---- server/src/main.rs | 20 +++++------- server/src/protocol.rs | 70 +++++++++++++++++++++++++--------------- 5 files changed, 116 insertions(+), 50 deletions(-) diff --git a/corelib/src/lib.rs b/corelib/src/lib.rs index 2c591be7..52e0e5d3 100644 --- a/corelib/src/lib.rs +++ b/corelib/src/lib.rs @@ -8,15 +8,18 @@ * it under the terms of the GNU Affero General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. - * + * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. - * + * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * */ -pub mod terrapipe; \ No newline at end of file +//! The core library for the Terrabase database +//! This contains modules which are shared by both the `cli` and the `server` modules + +pub mod terrapipe; diff --git a/corelib/src/terrapipe.rs b/corelib/src/terrapipe.rs index 294310c6..c4a23d38 100644 --- a/corelib/src/terrapipe.rs +++ b/corelib/src/terrapipe.rs @@ -19,28 +19,48 @@ * */ +//! This implements the Terrapipe protocol + +/// Default query metaline buffer size pub const DEF_QMETALINE_BUFSIZE: usize = 44; +/// Default query metalayout buffer size pub const DEF_QMETALAYOUT_BUFSIZE: usize = 1024; +/// Default query dataframe buffer size pub const DEF_QDATAFRAME_BUSIZE: usize = 4096; pub mod tags { + //! This module is a collection of tags/strings used for evaluating queries + //! and responses + /// `GET` command tag pub const TAG_GET: &'static str = "GET"; + /// `SET` command tag pub const TAG_SET: &'static str = "SET"; + /// `UPDATE` command tag pub const TAG_UPDATE: &'static str = "UPDATE"; + /// `DEL` command tag pub const TAG_DEL: &'static str = "DEL"; + /// `HEYA` command tag pub const TAG_HEYA: &'static str = "HEYA"; } pub mod responses { + //! Empty responses, mostly errors, which are statically compiled use lazy_static::lazy_static; lazy_static! { + /// Empty `0`(Okay) response - without any content pub static ref RESP_OKAY_EMPTY: Vec = "0!0!0".as_bytes().to_owned(); + /// `1` Not found response pub static ref RESP_NOT_FOUND: Vec = "1!0!0".as_bytes().to_owned(); + /// `2` Overwrite Error response pub static ref RESP_OVERWRITE_ERROR: Vec = "2!0!0".as_bytes().to_owned(); + /// `3` Invalid Metaframe response pub static ref RESP_INVALID_MF: Vec = "3!0!0".as_bytes().to_owned(); + /// `4` Incomplete frame response pub static ref RESP_INCOMPLETE: Vec = "4!0!0".as_bytes().to_owned(); + /// `5` Internal server error response pub static ref RESP_SERVER_ERROR: Vec = "5!0!0".as_bytes().to_owned(); } } +/// Response codes returned by the server #[derive(Debug, PartialEq)] pub enum RespCodes { /// `0`: Okay (Empty Response) - use the `ResponseBuilder` for building @@ -75,6 +95,7 @@ impl From for u8 { } } +/// Representation of the query action type - pipelined or simple #[derive(Debug, PartialEq)] pub enum ActionType { Simple, @@ -103,22 +124,28 @@ impl RespBytes for RespCodes { } } +/// The query dataframe #[derive(Debug)] pub struct QueryDataframe { + /// The data part pub data: Vec, + /// The query action type pub actiontype: ActionType, } +/// This is enum represents types of responses which can be built from it pub enum ResponseBuilder { SimpleResponse, // TODO: Add pipelined response builder here } impl ResponseBuilder { + /// Create a new simple response pub fn new_simple(respcode: RespCodes) -> SimpleResponse { SimpleResponse::new(respcode.into()) } } +/// Representation of a simple response pub struct SimpleResponse { respcode: u8, metalayout_buf: String, @@ -127,6 +154,8 @@ pub struct SimpleResponse { } impl SimpleResponse { + /// Create a new response with just a response code + /// The data has to be added by using the `add_data()` member function pub fn new(respcode: u8) -> Self { SimpleResponse { respcode, @@ -135,13 +164,17 @@ impl SimpleResponse { size_tracker: 0, } } + /// Add data to the response pub fn add_data(&mut self, data: String) { - self.metalayout_buf.push_str(&format!("{}#", data.len())); - self.size_tracker += data.len() + 1; + let datstr = data.len().to_string(); + self.metalayout_buf.push_str(&format!("{}#", datstr.len())); + self.size_tracker += datstr.len() + 1; self.dataframe_buf.push_str(&data); self.dataframe_buf.push('\n'); } - pub fn prepare_response(&self) -> Vec { + /// Internal function used in the implementation of the `RespBytes` trait + /// for creating a `Vec` which can be written to a TCP stream + fn prepare_response(&self) -> Vec { format!( "{}!{}!{}\n{}\n{}", self.respcode, diff --git a/server/src/coredb.rs b/server/src/coredb.rs index 2018e22e..ef92ec4c 100644 --- a/server/src/coredb.rs +++ b/server/src/coredb.rs @@ -20,14 +20,16 @@ */ use corelib::terrapipe::QueryDataframe; -use corelib::terrapipe::{responses, tags, ActionType, RespBytes, RespCodes, ResponseBuilder}; +use corelib::terrapipe::{tags, ActionType, RespBytes, RespCodes, ResponseBuilder}; use std::collections::{hash_map::Entry, HashMap}; use std::sync::{Arc, RwLock}; -pub type DbResult = Result; +/// Results from actions on the Database +pub type ActionResult = Result; pub struct CoreDB { shared: Arc, + terminate: bool, } pub struct Coretable { @@ -35,14 +37,14 @@ pub struct Coretable { } impl Coretable { - pub fn get(&self, key: &str) -> DbResult { + pub fn get(&self, key: &str) -> ActionResult { if let Some(value) = self.coremap.read().unwrap().get(key) { Ok(value.to_string()) } else { Err(RespCodes::NotFound) } } - pub fn set(&self, key: &str, value: &str) -> DbResult<()> { + pub fn set(&self, key: &str, value: &str) -> ActionResult<()> { match self.coremap.write().unwrap().entry(key.to_string()) { Entry::Occupied(_) => return Err(RespCodes::OverwriteError), Entry::Vacant(e) => { @@ -51,7 +53,7 @@ impl Coretable { } } } - pub fn update(&self, key: &str, value: &str) -> DbResult<()> { + pub fn update(&self, key: &str, value: &str) -> ActionResult<()> { match self.coremap.write().unwrap().entry(key.to_string()) { Entry::Occupied(ref mut e) => { e.insert(value.to_string()); @@ -60,7 +62,7 @@ impl Coretable { Entry::Vacant(_) => Err(RespCodes::NotFound), } } - pub fn del(&self, key: &str) -> DbResult<()> { + pub fn del(&self, key: &str) -> ActionResult<()> { if let Some(_) = self.coremap.write().unwrap().remove(&key.to_owned()) { Ok(()) } else { @@ -169,9 +171,23 @@ impl CoreDB { shared: Arc::new(Coretable { coremap: RwLock::new(HashMap::new()), }), + terminate: false, } } pub fn get_handle(&self) -> Arc { Arc::clone(&self.shared) } } + +impl Drop for CoreDB { + // This prevents us from killing the database, in the event someone tries + // to access it + fn drop(&mut self) { + if Arc::strong_count(&self.shared) == 1 { + // Acquire a lock to prevent anyone from writing something + let coremap = self.shared.coremap.write().unwrap(); + self.terminate = true; + drop(coremap); + } + } +} diff --git a/server/src/main.rs b/server/src/main.rs index b600d43e..c454eff9 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -19,13 +19,12 @@ * */ -use tokio::io::AsyncWriteExt; -use tokio::net::{TcpListener, TcpStream}; +use tokio::net::TcpListener; mod coredb; +mod dbnet; mod protocol; use coredb::CoreDB; -use corelib::terrapipe::RespBytes; -use protocol::read_query; +use protocol::Connection; static ADDR: &'static str = "127.0.0.1:2003"; #[tokio::main] @@ -35,18 +34,15 @@ async fn main() { let db = CoreDB::new(); loop { let handle = db.get_handle(); - let (mut socket, _) = listener.accept().await.unwrap(); + let (socket, _) = listener.accept().await.unwrap(); tokio::spawn(async move { - let q = read_query(&mut socket).await; + let mut con = Connection::new(socket); + let q = con.read_query().await; let df = match q { Ok(q) => q, - Err(e) => return close_conn_with_error(socket, e).await, + Err(e) => return con.close_conn_with_error(e).await, }; - socket.write_all(&handle.execute_query(df)).await.unwrap(); + con.write_response(handle.execute_query(df)).await; }); } } - -async fn close_conn_with_error(mut stream: TcpStream, bytes: impl RespBytes) { - stream.write_all(&bytes.into_response()).await.unwrap() -} diff --git a/server/src/protocol.rs b/server/src/protocol.rs index 8664ea3f..54e2132a 100644 --- a/server/src/protocol.rs +++ b/server/src/protocol.rs @@ -21,8 +21,7 @@ use corelib::terrapipe::{ActionType, QueryDataframe}; use corelib::terrapipe::{RespBytes, RespCodes, DEF_QMETALINE_BUFSIZE}; -use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader}; - +use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::net::TcpStream; #[derive(Debug, PartialEq)] @@ -115,7 +114,7 @@ fn extract_idents(buf: Vec, skip_sequence: Vec) -> Vec { .scan(buf.into_iter(), |databuf, size| { let tok: Vec = databuf.take(size).collect(); let _ = databuf.next(); - // FIXME(ohsayan): This is quite slow, we'll have to use SIMD in the future + // FIXME(@ohsayan): This is quite slow, we'll have to use SIMD in the future Some(String::from_utf8_lossy(&tok).to_string()) }) .collect() @@ -137,27 +136,46 @@ fn test_extract_idents() { assert_eq!(res[1], "��"); } -pub async fn read_query(mut stream: &mut TcpStream) -> Result { - let mut bufreader = BufReader::new(&mut stream); - let mut metaline_buf = String::with_capacity(DEF_QMETALINE_BUFSIZE); - bufreader.read_line(&mut metaline_buf).await.unwrap(); - let pqmf = match PreQMF::from_buffer(metaline_buf) { - Ok(pq) => pq, - Err(e) => return Err(e), - }; - let (mut metalayout_buf, mut dataframe_buf) = ( - String::with_capacity(pqmf.metaline_size), - vec![0; pqmf.content_size], - ); - bufreader.read_line(&mut metalayout_buf).await.unwrap(); - let ss = match get_sizes(metalayout_buf) { - Ok(ss) => ss, - Err(e) => return Err(e), - }; - bufreader.read(&mut dataframe_buf).await.unwrap(); - let qdf = QueryDataframe { - data: extract_idents(dataframe_buf, ss), - actiontype: pqmf.action_type, - }; - Ok(qdf) +pub struct Connection { + stream: TcpStream, +} + +impl Connection { + pub fn new(stream: TcpStream) -> Self { + Connection { stream } + } + pub async fn read_query(&mut self) -> Result { + let mut bufreader = BufReader::new(&mut self.stream); + let mut metaline_buf = String::with_capacity(DEF_QMETALINE_BUFSIZE); + bufreader.read_line(&mut metaline_buf).await.unwrap(); + let pqmf = match PreQMF::from_buffer(metaline_buf) { + Ok(pq) => pq, + Err(e) => return Err(e), + }; + let (mut metalayout_buf, mut dataframe_buf) = ( + String::with_capacity(pqmf.metaline_size), + vec![0; pqmf.content_size], + ); + bufreader.read_line(&mut metalayout_buf).await.unwrap(); + let ss = match get_sizes(metalayout_buf) { + Ok(ss) => ss, + Err(e) => return Err(e), + }; + bufreader.read(&mut dataframe_buf).await.unwrap(); + let qdf = QueryDataframe { + data: extract_idents(dataframe_buf, ss), + actiontype: pqmf.action_type, + }; + Ok(qdf) + } + pub async fn write_response(&mut self, resp: Vec) { + if let Ok(_) = self.stream.write(&resp).await { + return; + } else { + eprintln!("Error writing response to {:?}", self.stream.peer_addr()) + } + } + pub async fn close_conn_with_error(&mut self, bytes: impl RespBytes) { + self.stream.write_all(&bytes.into_response()).await.unwrap() + } }