From 730ff3fa41dbdd023461063c445f17367903dc7a Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Thu, 30 Jul 2020 13:35:44 +0530 Subject: [PATCH] Move and document deserializer module --- CHANGELOG.md | 6 + Cargo.lock | 6 +- README.md | 2 +- cli/Cargo.toml | 2 +- corelib/Cargo.toml | 2 +- corelib/src/terrapipe.rs | 20 +- server/Cargo.toml | 2 +- server/src/coredb.rs | 7 +- server/src/dbnet.rs | 2 +- server/src/protocol/deserializer.rs | 334 ++++++++++++++++++++++++++++ server/src/protocol/mod.rs | 267 +++------------------- 11 files changed, 391 insertions(+), 259 deletions(-) create mode 100644 server/src/protocol/deserializer.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e9136f3..2284e616 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ All changes in this project will be noted in this file. +## Version 0.3.1 [2020-07-30] + +> No breaking changes + +This release fixes #7 + ## Version 0.3.0 [2020-07-28] > No breaking changes diff --git a/Cargo.lock b/Cargo.lock index 1da37dbb..959b82b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -51,7 +51,7 @@ dependencies = [ [[package]] name = "corelib" -version = "0.2.0" +version = "0.3.1" dependencies = [ "lazy_static", ] @@ -354,7 +354,7 @@ dependencies = [ [[package]] name = "tdb" -version = "0.3.0" +version = "0.3.1" dependencies = [ "bincode", "bytes", @@ -400,7 +400,7 @@ dependencies = [ [[package]] name = "tsh" -version = "0.2.0" +version = "0.3.1" dependencies = [ "corelib", "tokio", diff --git a/README.md b/README.md index d1938660..4ffa802f 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ The releases are uploaded in bundles, for example, `tdb-bundle-v0.2.0-x86_64-unk * Download a bundle for your platform from [releases](https://github.com/terrabasedb/terrabase/releases) * Unzip the downloaded bundle -* Run `chmod +x tdb tsh` (on Unix systems) +* Make the files executable (run `chmod +x tdb tsh` on Unix systems) * Start the database server by running `./tdb` * Start the client by running `./tsh` * You can run commands like `SET sayan 17` , `GET cat` , `UPDATE cat 100` or `DEL cat` ! diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 15ccdf98..518e732a 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tsh" -version = "0.2.0" +version = "0.3.1" authors = ["Sayan Nandan "] edition = "2018" diff --git a/corelib/Cargo.toml b/corelib/Cargo.toml index c04d7cf2..c545e851 100644 --- a/corelib/Cargo.toml +++ b/corelib/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "corelib" -version = "0.2.0" +version = "0.3.1" authors = ["Sayan Nandan "] edition = "2018" diff --git a/corelib/src/terrapipe.rs b/corelib/src/terrapipe.rs index 6cc68542..d8d85963 100644 --- a/corelib/src/terrapipe.rs +++ b/corelib/src/terrapipe.rs @@ -60,8 +60,6 @@ pub mod responses { pub static ref RESP_ARG_ERROR: Vec = "*!4!0!0\n".as_bytes().to_owned(); /// `5` Internal server error response pub static ref RESP_SERVER_ERROR: Vec = "*!5!0!0\n".as_bytes().to_owned(); - /// `6` Internal server error response - pub static ref RESP_INCOMPLETE: Vec = "*!6!0!0\n".as_bytes().to_owned(); } } @@ -136,9 +134,7 @@ pub enum RespCodes { ArgumentError, /// `5`: Server Error ServerError, - /// `6`: Incomplete - Incomplete, - /// `7`: Some other error - the wrapped `String` will be returned in the response body. + /// `6`: Some other error - the wrapped `String` will be returned in the response body. /// Just a note, this gets quite messy, especially when we're using it for deconding responses OtherError(Option), } @@ -153,7 +149,6 @@ impl fmt::Display for RespCodes { InvalidMetaframe => write!(f, "ERROR: Invalid metaframe"), ArgumentError => write!(f, "ERROR: The command is not in the correct format"), ServerError => write!(f, "ERROR: The server had an internal error"), - Incomplete => write!(f, "ERROR: The packet is incomplete"), OtherError(e) => match e { None => write!(f, "ERROR: Some unknown error occurred"), Some(e) => write!(f, "ERROR: {}", e), @@ -174,8 +169,7 @@ impl From for u8 { InvalidMetaframe => 3, ArgumentError => 4, ServerError => 5, - Incomplete => 6, - OtherError(_) => 7, + OtherError(_) => 6, } } } @@ -191,8 +185,7 @@ impl RespCodes { 3 => InvalidMetaframe, 4 => ArgumentError, 5 => ServerError, - 6 => Incomplete, - 7 => OtherError(extra), + 6 => OtherError(extra), _ => return None, }, Err(_) => return None, @@ -225,15 +218,14 @@ impl RespBytes for RespCodes { InvalidMetaframe => RESP_INVALID_MF.to_owned(), ArgumentError => RESP_ARG_ERROR.to_owned(), ServerError => RESP_SERVER_ERROR.to_owned(), - Incomplete => RESP_INCOMPLETE.to_owned(), OtherError(e) => match e { Some(e) => { let dl = e.len().to_string(); - format!("*!7!{}!{}\n#{}\n{}", e.len(), dl.len(), dl, e) + format!("*!6!{}!{}\n#{}\n{}", e.len(), dl.len(), dl, e) .as_bytes() .to_owned() } - None => format!("*!7!0!0\n").as_bytes().to_owned(), + None => format!("*!6!0!0\n").as_bytes().to_owned(), }, } } @@ -370,7 +362,7 @@ impl SimpleQuery { "{}{}!{}\n{}\n{}", self.metaline, self.size_tracker, - self.metalayout.len() + 1 , // include the new line character + self.metalayout.len() + 1, // include the new line character self.metalayout, self.dataframe ) diff --git a/server/Cargo.toml b/server/Cargo.toml index 9ef1490d..f4b29874 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tdb" -version = "0.3.0" +version = "0.3.1" authors = ["Sayan Nandan "] edition = "2018" diff --git a/server/src/coredb.rs b/server/src/coredb.rs index b27142c9..061d73db 100644 --- a/server/src/coredb.rs +++ b/server/src/coredb.rs @@ -19,7 +19,7 @@ * */ -use crate::protocol::deserializer::Query; +use crate::protocol::Query; use bincode; use corelib::terrapipe::{tags, ActionType, RespBytes, RespCodes, ResponseBuilder}; use corelib::TResult; @@ -188,6 +188,9 @@ impl CoreDB { RespCodes::ArgumentError.into_response() } /// Create a new `CoreDB` instance + /// + /// This also checks if a local backup of previously saved data is available. + /// If it is - it restores the data. Otherwise it creates a new in-memory table pub fn new() -> TResult { let coretable = CoreDB::get_saved()?; if let Some(coretable) = coretable { @@ -214,12 +217,14 @@ impl CoreDB { fn acquire_read(&self) -> RwLockReadGuard<'_, HashMap> { self.shared.coremap.read() } + /// Flush the contents of the in-memory table onto disk pub fn flush_db(&self) -> TResult<()> { let encoded = bincode::serialize(&*self.acquire_read())?; let mut file = fs::File::create("./data.bin")?; file.write_all(&encoded)?; Ok(()) } + /// Try to get the saved data from disk pub fn get_saved() -> TResult>> { let file = match fs::read("./data.bin") { Ok(f) => f, diff --git a/server/src/dbnet.rs b/server/src/dbnet.rs index 09a4107e..2aa7b7f4 100644 --- a/server/src/dbnet.rs +++ b/server/src/dbnet.rs @@ -145,7 +145,7 @@ impl CHandler { }; match try_df { Ok(Q(s)) => self.con.write_response(self.db.execute_query(s)).await, - Ok(E(r)) => return self.con.close_conn_with_error(r).await, + Ok(E(r)) => self.con.close_conn_with_error(r).await, Err(e) => { eprintln!("Error: {}", e); return; diff --git a/server/src/protocol/deserializer.rs b/server/src/protocol/deserializer.rs new file mode 100644 index 00000000..98071c33 --- /dev/null +++ b/server/src/protocol/deserializer.rs @@ -0,0 +1,334 @@ +/* + * Created on Thu Jul 30 2020 + * + * This file is a part of the source code for the Terrabase database + * Copyright (c) 2020, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +//! This module provides deserialization primitives for query packets + +use bytes::BytesMut; +use corelib::terrapipe::DEF_QMETALINE_BUFSIZE; +use corelib::terrapipe::{ActionType, RespCodes}; +use std::io::Cursor; + +/// Result of parsing a query +/// This is **not** the same as `std`'s `Result` but instead is an `enum` +/// which represents the outcome when a query is parsed +#[derive(Debug)] +pub enum QueryParseResult { + /// A successfully parsed `Query` along with an `usize` which specifies + /// the amount by which the `buffer` must advance + Parsed((Query, usize)), + /// The query parsing failed and returned a Response code as an error + RespCode(RespCodes), + /// The query packet is incomplete + Incomplete, +} + +/// A navigator is a wrapper around a `Cursor` which efficiently navigates over +/// a mutable `BytesMut` object +pub struct Navigator<'a> { + /// The cursor + cursor: Cursor<&'a [u8]>, +} +impl<'a> Navigator<'a> { + /// Create a new `Navigator` instance + pub fn new(buffer: &'a mut BytesMut) -> Self { + Navigator { + cursor: Cursor::new(&buffer[..]), + } + } + /// Get a line from a buffer + /// + /// The `beforehint` argument provides a clue to the `Navigator` about the + /// point till which the line must end. This prevents checking the entire buffer. + /// Note that this `beforehint` is optional and in case no hint as available, + /// just pass `None` + pub fn get_line(&mut self, beforehint: Option) -> Option<&'a [u8]> { + let ref mut cursor = self.cursor; + let start = cursor.position() as usize; + let end = match beforehint { + // The end will be the current position + the moved position + Some(hint) => (start + hint), + None => cursor.get_ref().len() - 1, + }; + for i in start..end { + // If the current character is a `\n` byte, then return this slice + if cursor.get_ref()[i] == b'\n' { + if let Some(slice) = cursor.get_ref().get(start..i) { + // Only move the cursor ahead if the bytes could be fetched + // otherwise the next time we try to get anything, the + // cursor would crash. If we don't change the cursor position + // we will keep moving over stale data + cursor.set_position((i + 1) as u64); + return Some(slice); + } + } + } + // If we are here, then the slice couldn't be extracted, + None + } + /// Get an exact number of bytes from a buffer + pub fn get_exact(&mut self, exact: usize) -> Option<&'a [u8]> { + let ref mut cursor = self.cursor; + // The start position should be set to the current position of the + // cursor, otherwise we'll move from start, which is erroneous + let start = cursor.position() as usize; + // The end position will be the current position + number of bytes to be read + let end = start + exact; + if let Some(chunk) = cursor.get_ref().get(start..end) { + // Move the cursor ahead - only if we could get the slice + self.cursor.set_position(end as u64); + Some(chunk) + } else { + // If we're here, then the slice couldn't be extracted, probably + // because it doesn't exist. Return `None` + None + } + } + /// Get the cursor's position as an `usize` + fn get_pos_usize(&self) -> usize { + self.cursor.position() as usize + } +} + +/// A metaline object which represents a metaline in the Terrapipe protocol's +/// query packet +struct Metaline { + /// The content size, inclusive of the newlines. This is sent by the client + /// driver + content_size: usize, + /// The metaline size, inclusive of the newline character. This is also sent + /// by the client driver + metalayout_size: usize, + /// The action type - whether it is a pipelined operation or a simple query + actiontype: ActionType, +} + +impl Metaline { + /// Create a new metaline from a `Navigator` instance + /// + /// This will use the navigator to extract the metaline + pub fn from_navigator(nav: &mut Navigator) -> Option { + if let Some(mline) = nav.get_line(Some(DEF_QMETALINE_BUFSIZE)) { + // The minimum metaline length is five characters + // if not - clearly something is wrong + if mline.len() < 5 { + println!("Did we?"); + return None; + } + // The first byte is always a `*` or `$` depending on the + // type of query + let actiontype = match mline[0] { + b'$' => ActionType::Pipeline, + b'*' => ActionType::Simple, + _ => return None, + }; + // Get the frame sizes: the first index is the content size + // and the second index is the metalayout size + if let Some(sizes) = get_frame_sizes(&mline[1..]) { + return Some(Metaline { + content_size: sizes[0], + metalayout_size: sizes[1], + actiontype, + }); + } + } + None + } +} + +/// A metalayout object which represents the Terrapipe protocol's metalayout line +/// +/// This is nothing more than a wrapper around `Vec` which provides a more +/// convenient API +#[derive(Debug)] +struct Metalayout(Vec); + +impl Metalayout { + /// Create a new metalayout from a `Navigator` instance + /// + /// This uses the navigator to navigate over the buffer + pub fn from_navigator(nav: &mut Navigator, mlayoutsize: usize) -> Option { + // We pass `mlayoutsize` to `get_line` since we already know where the + // metalayout ends + if let Some(layout) = nav.get_line(Some(mlayoutsize)) { + if let Some(skip_sequence) = get_skip_sequence(&layout) { + return Some(Metalayout(skip_sequence)); + } + } + None + } +} + +/// # A `Query` object +#[derive(Debug, PartialEq)] +pub struct Query { + /// A stream of tokens parsed from the dataframe + pub data: Vec, + /// The type of query - `Simple` or `Pipeline` + pub actiontype: ActionType, +} + +impl Query { + /// Create a new `Query` instance from a `Navigator` + /// + /// This function will use the private `Metalayout` and `Metaline` objects + /// to extract information on the format of the dataframe and then it will + /// parse the dataframe itself + pub fn from_navigator(mut nav: Navigator) -> QueryParseResult { + if let Some(metaline) = Metaline::from_navigator(&mut nav) { + if let Some(metalayout) = Metalayout::from_navigator(&mut nav, metaline.metalayout_size) + { + if let Some(content) = nav.get_exact(metaline.content_size) { + let data = extract_idents(content, metalayout.0); + // Return the parsed query and the amount by which the buffer + // must `advance` + return QueryParseResult::Parsed(( + Query { + data, + actiontype: metaline.actiontype, + }, + nav.get_pos_usize(), + )); + } else { + // Since we couldn't get the slice, this means that the + // query packet was incomplete, return that error + return QueryParseResult::Incomplete; + } + } + } + // If we're here - it clearly means that the metaline/metalayout failed + // to parse - we return a standard invalid metaframe `RespCodes` + QueryParseResult::RespCode(RespCodes::InvalidMetaframe) + } +} + +/// Get the frame sizes from a metaline +fn get_frame_sizes(metaline: &[u8]) -> Option> { + if let Some(s) = extract_sizes_splitoff(metaline, b'!', 2) { + if s.len() == 2 { + Some(s) + } else { + None + } + } else { + None + } +} + +/// Get the skip sequence from the metalayout line +fn get_skip_sequence(metalayout: &[u8]) -> Option> { + let l = metalayout.len() / 2; + extract_sizes_splitoff(metalayout, b'#', l) +} + +/// Extract `usize`s from any buffer which when converted into UTF-8 +/// looks like: '123456567\n', where `` is the separator +/// which in the case of the metaline is a `0x21` byte or a `0x23` byte in the +/// case of the metalayout line +fn extract_sizes_splitoff(buf: &[u8], splitoff: u8, sizehint: usize) -> Option> { + let mut sizes = Vec::with_capacity(sizehint); + let len = buf.len(); + let mut i = 0; + while i < len { + if buf[i] == splitoff { + // This is a hash + let mut res: usize = 0; + // Move to the next element + i = i + 1; + while i < len { + // Only proceed if the current byte is not the separator + if buf[i] != splitoff { + // Make sure we don't go wrong here + // 48 is the unicode byte for 0 so 48-48 should give 0 + // Also the subtraction shouldn't give something greater + // than 9, otherwise it is a different character + let num: usize = match buf[i].checked_sub(48) { + Some(s) => s.into(), + None => return None, + }; + if num > 9 { + return None; + } + res = res * 10 + num; + i = i + 1; + continue; + } else { + break; + } + } + sizes.push(res.into()); + continue; + } else { + // Technically, we should never reach here, but if we do + // clearly, it's an error by the client-side driver + return None; + } + } + Some(sizes) +} +/// Extract the tokens from the slice using the `skip_sequence` +fn extract_idents(buf: &[u8], skip_sequence: Vec) -> Vec { + skip_sequence + .into_iter() + .scan(buf.into_iter(), |databuf, size| { + let tok: Vec = databuf.take(size).map(|val| *val).collect(); + let _ = databuf.next(); + // FIXME(@ohsayan): This is quite slow, we'll have to use SIMD in the future + Some(String::from_utf8_lossy(&tok).to_string()) + }) + .collect() +} + +#[cfg(test)] +#[test] +fn test_navigator() { + use bytes::BytesMut; + let mut mybytes = BytesMut::from("*!5!2\n1#\nHEYA\n".as_bytes()); + let mut nav = Navigator::new(&mut mybytes); + assert_eq!(Some("*!5!2".as_bytes()), nav.get_line(Some(46))); + assert_eq!(Some("1#".as_bytes()), nav.get_line(Some(3))); + assert_eq!(Some("HEYA".as_bytes()), nav.get_line(Some(5))); +} + +#[cfg(test)] +#[test] +fn test_query() { + use bytes::{Buf, BytesMut}; + let mut mybuf = BytesMut::from("*!14!7\n#3#5#3\nSET\nsayan\n123\n".as_bytes()); + let resulting_data_should_be: Vec = "SET sayan 123" + .split_whitespace() + .map(|val| val.to_string()) + .collect(); + let nav = Navigator::new(&mut mybuf); + let query = Query::from_navigator(nav); + if let QueryParseResult::Parsed((query, forward)) = query { + assert_eq!( + query, + Query { + data: resulting_data_should_be, + actiontype: ActionType::Simple, + } + ); + mybuf.advance(forward); + assert_eq!(mybuf.len(), 0); + } else { + panic!("Query parsing failed"); + } +} diff --git a/server/src/protocol/mod.rs b/server/src/protocol/mod.rs index 433e8f2c..18c1f95e 100644 --- a/server/src/protocol/mod.rs +++ b/server/src/protocol/mod.rs @@ -19,251 +19,36 @@ * */ -pub mod deserializer { - //! This module provides deserialization primitives for query packets - use bytes::BytesMut; - use corelib::terrapipe::{ActionType, RespCodes}; - use std::io::Cursor; - #[derive(Debug)] - pub enum QueryParseResult { - Parsed((Query, usize)), - RespCode(RespCodes), - Incomplete, - } - - pub struct Navigator<'a> { - buffer: &'a BytesMut, - cursor: Cursor<&'a [u8]>, - } - impl<'a> Navigator<'a> { - pub fn new(buffer: &'a mut BytesMut) -> Self { - Navigator { - cursor: Cursor::new(&buffer[..]), - buffer, - } - } - pub fn get_line(&mut self, beforehint: Option) -> Option<&'a [u8]> { - let ref mut cursor = self.cursor; - let start = cursor.position() as usize; - let end = match beforehint { - Some(hint) => (start + hint), - None => cursor.get_ref().len() - 1, - }; - for i in start..end { - if cursor.get_ref()[i] == b'\n' { - cursor.set_position((i + 1) as u64); - return cursor.get_ref().get(start..i); - } - } - None - } - pub fn get_exact(&mut self, exact: usize) -> Option<&'a [u8]> { - let ref mut cursor = self.cursor; - let start = cursor.position() as usize; - let end = start + exact; - if let Some(chunk) = cursor.get_ref().get(start..end) { - self.cursor.set_position(end as u64); - Some(chunk) - } else { - None - } - } - fn get_pos_usize(&self) -> usize { - self.cursor.position() as usize - } - } - - struct Metaline { - content_size: usize, - metalayout_size: usize, - actiontype: ActionType, - } - - impl Metaline { - pub fn from_navigator(nav: &mut Navigator) -> Option { - if let Some(mline) = nav.get_line(Some(46)) { - let actiontype = match mline.get(0) { - Some(b'$') => ActionType::Pipeline, - Some(b'*') => ActionType::Simple, - _ => return None, - }; - if let Some(sizes) = get_frame_sizes(&mline[1..]) { - return Some(Metaline { - content_size: sizes[0], - metalayout_size: sizes[1], - actiontype, - }); - } - } - None - } - } - - #[derive(Debug)] - struct Metalayout(Vec); - - impl Metalayout { - pub fn from_navigator(nav: &mut Navigator, mlayoutsize: usize) -> Option { - if let Some(layout) = nav.get_line(Some(mlayoutsize)) { - if let Some(skip_sequence) = get_skip_sequence(&layout) { - return Some(Metalayout(skip_sequence)); - } - } - None - } - } - - #[derive(Debug, PartialEq)] - pub struct Query { - pub data: Vec, - pub actiontype: ActionType, - } - - impl Query { - pub fn from_navigator(mut nav: Navigator) -> QueryParseResult { - if let Some(metaline) = Metaline::from_navigator(&mut nav) { - if let Some(metalayout) = - Metalayout::from_navigator(&mut nav, metaline.metalayout_size) - { - // We reduce the `get_exact`'s by one to avoid including the newline - if let Some(content) = nav.get_exact(metaline.content_size) { - let data = extract_idents(content, metalayout.0); - return QueryParseResult::Parsed(( - Query { - data, - actiontype: metaline.actiontype, - }, - nav.get_pos_usize(), - )); - } else { - return QueryParseResult::Incomplete; - } - } - } - QueryParseResult::RespCode(RespCodes::InvalidMetaframe) - } - } - - fn get_frame_sizes(metaline: &[u8]) -> Option> { - if let Some(s) = extract_sizes_splitoff(metaline, b'!', 2) { - if s.len() == 2 { - Some(s) - } else { - None - } - } else { - None - } - } - fn get_skip_sequence(metalayout: &[u8]) -> Option> { - let l = metalayout.len() / 2; - extract_sizes_splitoff(metalayout, b'#', l) - } - - fn extract_sizes_splitoff(buf: &[u8], splitoff: u8, sizehint: usize) -> Option> { - let mut sizes = Vec::with_capacity(sizehint); - let len = buf.len(); - let mut i = 0; - while i < len { - if buf[i] == splitoff { - // This is a hash - let mut res: usize = 0; - // Move to the next element - i = i + 1; - while i < len { - if buf[i] != splitoff { - let num: usize = match buf[i].checked_sub(48) { - Some(s) => s.into(), - None => return None, - }; - res = res * 10 + num; - i = i + 1; - continue; - } else { - break; - } - } - sizes.push(res.into()); - continue; - } else { - // Technically, we should never reach here, but if we do - // clearly, it's an error by the client-side driver - return None; - } - } - Some(sizes) - } - fn extract_idents(buf: &[u8], skip_sequence: Vec) -> Vec { - skip_sequence - .into_iter() - .scan(buf.into_iter(), |databuf, size| { - let tok: Vec = databuf.take(size).map(|val| *val).collect(); - let _ = databuf.next(); - // FIXME(@ohsayan): This is quite slow, we'll have to use SIMD in the future - Some(String::from_utf8_lossy(&tok).to_string()) - }) - .collect() - } - - #[cfg(test)] - #[test] - fn test_navigator() { - use bytes::BytesMut; - let mut mybytes = BytesMut::from("*!5!2\n1#\nHEYA\n".as_bytes()); - let mut nav = Navigator::new(&mut mybytes); - assert_eq!(Some("*!5!2".as_bytes()), nav.get_line(Some(46))); - assert_eq!(Some("1#".as_bytes()), nav.get_line(Some(3))); - assert_eq!(Some("HEYA".as_bytes()), nav.get_line(Some(5))); - } - - #[cfg(test)] - #[test] - fn test_query() { - use bytes::{Buf, BytesMut}; - let mut mybuf = BytesMut::from("*!14!7\n#3#5#3\nSET\nsayan\n123\n".as_bytes()); - let resulting_data_should_be: Vec = "SET sayan 123" - .split_whitespace() - .map(|val| val.to_string()) - .collect(); - let nav = Navigator::new(&mut mybuf); - let query = Query::from_navigator(nav); - if let QueryParseResult::Parsed((query, forward)) = query { - assert_eq!( - query, - Query { - data: resulting_data_should_be, - actiontype: ActionType::Simple, - } - ); - mybuf.advance(forward); - assert_eq!(mybuf.len(), 0); - } else { - panic!("Query parsing failed"); - } - } -} - +mod deserializer; use bytes::{Buf, BytesMut}; -use corelib::terrapipe::{extract_idents, ActionType}; -use corelib::terrapipe::{RespBytes, RespCodes}; -use corelib::TResult; -use deserializer::{ - Navigator, Query, +use corelib::terrapipe::RespBytes; +use deserializer::Navigator; +pub use deserializer::{ + Query, QueryParseResult::{self, *}, }; -use std::io::{Cursor, Result as IoResult}; +use std::io::Result as IoResult; use std::net::SocketAddr; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter}; use tokio::net::TcpStream; +/// The size of the read buffer in bytes +const BUF_CAP: usize = 8 * 1024; // 8 KB per-connection + /// A TCP connection wrapper pub struct Connection { - stream: TcpStream, + /// The connection to the remote socket, wrapped in a buffer to speed + /// up writing + stream: BufWriter, + /// The in-memory read buffer. The size is given by `BUF_CAP` buffer: BytesMut, } +/// The outcome of running `Connection`'s `try_query` function pub enum QueryResult { + /// A parsed `Query` object Q(Query), + /// An error response E(Vec), } @@ -271,10 +56,14 @@ impl Connection { /// Initiailize a new `Connection` instance pub fn new(stream: TcpStream) -> Self { Connection { - stream, - buffer: BytesMut::with_capacity(4096), + stream: BufWriter::new(stream), + buffer: BytesMut::with_capacity(BUF_CAP), } } + /// Read a query from the remote end + /// + /// This function asynchronously waits until all the data required + /// for parsing the query is available pub async fn read_query(&mut self) -> Result { self.read_again().await?; loop { @@ -289,13 +78,17 @@ impl Connection { self.read_again().await?; } } + /// Try to parse a query from the buffered data fn try_query(&mut self) -> QueryParseResult { - let mut nav = Navigator::new(&mut self.buffer); + let nav = Navigator::new(&mut self.buffer); Query::from_navigator(nav) } + /// Try to fill the buffer again async fn read_again(&mut self) -> Result<(), String> { match self.stream.read_buf(&mut self.buffer).await { Ok(0) => { + // If 0 bytes were received, then the remote end closed + // the connection if self.buffer.is_empty() { return Err(format!("{:?} didn't send any data", self.get_peer()).into()); } else { @@ -310,9 +103,11 @@ impl Connection { Err(e) => return Err(format!("{}", e)), } } + /// Get the peer address fn get_peer(&self) -> IoResult { - self.stream.peer_addr() + self.stream.get_ref().peer_addr() } + /// Write a response to the stream pub async fn write_response(&mut self, resp: Vec) { if let Err(_) = self.stream.write_all(&resp).await { eprintln!("Error while writing to stream: {:?}", self.get_peer());