From 59046db8f8d1bf94b7c52d279089c1614a5c4793 Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Thu, 23 Jul 2020 13:34:26 +0530 Subject: [PATCH] Implement client --- CONTRIBUTING.md | 2 +- Cargo.lock | 5 +- cli/Cargo.toml | 3 +- cli/src/argparse.rs | 98 +++-------------------- cli/src/client.rs | 160 ++++++++++++++++++++++++++++++++++++++ cli/src/main.rs | 8 +- corelib/src/terrapipe.rs | 162 +++++++++++++++++++++++++++++++-------- server/Cargo.toml | 2 +- server/src/coredb.rs | 27 ++++--- server/src/dbnet.rs | 1 - server/src/protocol.rs | 68 +++------------- 11 files changed, 337 insertions(+), 199 deletions(-) create mode 100644 cli/src/client.rs diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 78cfa21c..768a98c5 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -10,7 +10,7 @@ You can see a list of contributors **[here](./CONTRIBUTORS.md)** * `FIXME(@)` : Use this when you have made an implementation that can be improved in the future, such as improved efficiency * `HACK(@)` : Use this when the code you are using a temporary workaround -* `TODO(@)` : Use this when you have kept something incomplete +* `TODO(@)` : Use this when you have kept something ArgumentError ### Formatting diff --git a/Cargo.lock b/Cargo.lock index 37e52c7f..165426c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -279,9 +279,9 @@ dependencies = [ [[package]] name = "tokio" -version = "0.2.21" +version = "0.2.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d099fa27b9702bed751524694adbe393e18b36b204da91eb1cbbbbb4a5ee2d58" +checksum = "5d34ca54d84bf2b5b4d7d31e901a8464f7b60ac145a284fba25ceb801f2ddccd" dependencies = [ "bytes", "fnv", @@ -317,6 +317,7 @@ name = "tsh" version = "0.1.0" dependencies = [ "corelib", + "tokio", ] [[package]] diff --git a/cli/Cargo.toml b/cli/Cargo.toml index d4aa6341..eb3edb91 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -7,4 +7,5 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -corelib = {path = "../corelib"} \ No newline at end of file +corelib = {path = "../corelib"} +tokio = {version = "0.2.22", features = ["full"]} \ No newline at end of file diff --git a/cli/src/argparse.rs b/cli/src/argparse.rs index 5d425b47..3f117248 100644 --- a/cli/src/argparse.rs +++ b/cli/src/argparse.rs @@ -19,17 +19,16 @@ * */ -use corelib::terrapipe::{self, DEF_QMETALINE_BUFSIZE}; -use std::io::{self, prelude::*, BufReader}; -use std::net::TcpStream; -use std::process; +use crate::client::Client; +use std::io::{self, prelude::*}; +use tokio::signal; const ADDR: &'static str = "127.0.0.1:2003"; -pub fn execute_query() { - let mut connection = match TcpStream::connect(ADDR) { +pub async fn execute_query() { + let mut client = match Client::new(ADDR).await { Ok(c) => c, - Err(_) => { - eprintln!("ERROR: Couldn't connect to the TDB server"); - process::exit(0x100); + Err(e) => { + eprintln!("Error: {}", e); + return; } }; loop { @@ -41,85 +40,6 @@ pub fn execute_query() { io::stdin() .read_line(&mut rl) .expect("Couldn't read line, this is a serious error!"); - let mut cmd = terrapipe::QueryBuilder::new_simple(); - cmd.from_cmd(rl); - let (size, resp) = cmd.prepare_response(); - match connection.write(&resp) { - Ok(n) => { - if n < size { - eprintln!("ERROR: Couldn't write all bytes to server"); - process::exit(0x100); - } - }, - Err(_) => { - eprintln!("ERROR: Couldn't send data to the TDB server"); - process::exit(0x100); - } - } - println!("{}", parse_response(&connection)); - } -} - -pub fn parse_response(stream: &TcpStream) -> String { - let mut metaline = String::with_capacity(DEF_QMETALINE_BUFSIZE); - let mut bufreader = BufReader::new(stream); - match bufreader.read_line(&mut metaline) { - Ok(_) => (), - Err(_) => { - eprintln!("Couldn't read metaline from tdb server"); - process::exit(0x100); - } + client.run(rl, signal::ctrl_c()).await; } - let metaline = metaline.trim_matches(char::from(0)); - let fields: Vec<&str> = metaline.split('!').collect(); - if let (Some(resptype), Some(respcode), Some(clength), Some(ml_length)) = - (fields.get(0), fields.get(1), fields.get(2), fields.get(3)) - { - if *resptype == "$" { - todo!("Pipelined response deconding is yet to be implemented") - } - let mut is_err_response = false; - match respcode.to_owned() { - "0" => (), - "1" => return format!("ERROR: Couldn't find the requested key"), - "2" => return format!("ERROR: Can't overwrite existing value"), - "3" => return format!("ERROR: tsh sent an invalid metaframe"), - "4" => return format!("ERROR: tsh sent an incomplete query packet"), - "5" => return format!("ERROR: tdb had an internal server error"), - "6" => is_err_response = true, - _ => (), - } - if let (Ok(clength), Ok(ml_length)) = (clength.parse::(), ml_length.parse::()) - { - let mut metalinebuf = String::with_capacity(ml_length); - let mut databuf = vec![0; clength]; - bufreader.read_line(&mut metalinebuf).unwrap(); - let sizes: Vec = metalinebuf - .split("#") - .map(|size| size.parse::().unwrap()) - .collect(); - bufreader.read(&mut databuf).unwrap(); - eprintln!("{:?}", String::from_utf8_lossy(&databuf)); - let res = extract_idents(databuf, sizes); - let resp: String = res.iter().flat_map(|s| s.chars()).collect(); - if !is_err_response { - return resp; - } else { - return format!("ERROR: {}", resp); - } - } - } - format!("ERROR: The server sent an invalid response") -} - -fn extract_idents(buf: Vec, skip_sequence: Vec) -> Vec { - skip_sequence - .into_iter() - .scan(buf.into_iter(), |databuf, size| { - let tok: Vec = databuf.take(size).collect(); - let _ = databuf.next(); - // FIXME(@ohsayan): This is quite slow, we'll have to use SIMD in the future - Some(String::from_utf8_lossy(&tok).to_string()) - }) - .collect() } diff --git a/cli/src/client.rs b/cli/src/client.rs new file mode 100644 index 00000000..6df7b0d3 --- /dev/null +++ b/cli/src/client.rs @@ -0,0 +1,160 @@ +/* + * Created on Thu Jul 23 2020 + * + * This file is a part of the source code for the Terrabase database + * Copyright (c) 2020, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +use corelib::{ + terrapipe::{self, ActionType, QueryBuilder, RespCodes, DEF_QMETALAYOUT_BUFSIZE}, + TResult, +}; +use std::{error::Error, fmt, process}; +use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; +use tokio::net::TcpStream; +use std::future::Future; + +#[derive(Debug)] +pub enum ClientError { + RespCode(RespCodes), + InvalidResponse, + OtherError(String), +} + +impl fmt::Display for ClientError { + fn fmt(&self, mut f: &mut fmt::Formatter<'_>) -> fmt::Result { + use ClientError::*; + match self { + RespCode(r) => r.fmt(&mut f), + InvalidResponse => write!(f, "ERROR: The server sent an invalid response"), + OtherError(e) => write!(f, "ERROR: {}", e), + } + } +} + +impl Error for ClientError {} + +pub struct Client { + con: TcpStream, +} + +pub struct RMetaline { + content_size: usize, + metalayout_size: usize, + respcode: RespCodes, + resptype: ActionType, +} + +impl RMetaline { + pub fn from_buf(buf: String) -> TResult { + let parts: Vec<&str> = buf.split('!').collect(); + if let (Some(resptype), Some(respcode), Some(clength), Some(metalayout_size)) = + (parts.get(0), parts.get(1), parts.get(2), parts.get(3)) + { + if resptype == &"$" { + todo!("Pipelined responses are yet to be implemented"); + } + if resptype != &"*" { + return Err(ClientError::InvalidResponse.into()); + } + if let (Some(respcode), Ok(clength), Ok(metalayout_size)) = ( + RespCodes::from_str(respcode, None), + clength.trim_matches(char::from(0)).trim().parse::(), + metalayout_size + .trim_matches(char::from(0)) + .trim() + .parse::(), + ) { + return Ok(RMetaline { + content_size: clength, + metalayout_size, + respcode, + resptype: ActionType::Simple, + }); + } else { + Err(ClientError::InvalidResponse.into()) + } + } else { + Err(ClientError::InvalidResponse.into()) + } + } +} + +impl Client { + pub async fn new(addr: &str) -> TResult { + let con = TcpStream::connect(addr).await?; + Ok(Client { con }) + } + pub async fn run(&mut self, cmd: String, sig: impl Future) { + if cmd.len() == 0 { + return; + } else { + let mut qbuilder = QueryBuilder::new_simple(); + qbuilder.from_cmd(cmd); + let q = tokio::select! { + query = self.run_query(qbuilder.prepare_response()) => query, + _ = sig => { + println!("Goodbye!"); + // Terminate the connection + process::exit(0x100); + } + }; + match q { + Ok(res) => { + res.into_iter().for_each(|val| println!("{}", val)); + return; + } + Err(e) => { + eprintln!("{}", e); + return; + } + }; + } + } + async fn run_query(&mut self, (_, query_bytes): (usize, Vec)) -> TResult> { + self.con.write_all(&query_bytes).await?; + let mut metaline_buf = String::with_capacity(DEF_QMETALAYOUT_BUFSIZE); + let mut bufreader = BufReader::new(&mut self.con); + bufreader.read_line(&mut metaline_buf).await?; + let metaline = RMetaline::from_buf(metaline_buf)?; + // Skip reading the rest of the data if the metaline says that there is an + // error. WARNING: This would mean that any other data sent - would simply be + // ignored + let mut is_other_error = false; + match metaline.respcode { + // Only these two variants have some data in the dataframe, so we continue + RespCodes::Okay => (), + RespCodes::OtherError(_) => is_other_error = true, + code @ _ => return Err(code.into()), + } + if metaline.content_size == 0 { + return Ok(vec![]); + } + let (mut metalayout, mut dataframe) = ( + String::with_capacity(metaline.metalayout_size), + vec![0u8; metaline.content_size], + ); + bufreader.read_line(&mut metalayout).await?; + let metalayout = terrapipe::get_sizes(metalayout)?; + bufreader.read_exact(&mut dataframe).await?; + if is_other_error { + Err(ClientError::OtherError(String::from_utf8_lossy(&dataframe).to_string()).into()) + } else { + Ok(terrapipe::extract_idents(dataframe, metalayout)) + } + } +} diff --git a/cli/src/main.rs b/cli/src/main.rs index ba642810..c1911ffe 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -20,10 +20,12 @@ */ mod argparse; - +mod client; +use tokio; const MSG_WELCOME: &'static str = "TerrabaseDB v0.1.0"; -fn main() { +#[tokio::main] +async fn main() { println!("{}", MSG_WELCOME); - argparse::execute_query(); + argparse::execute_query().await; } diff --git a/corelib/src/terrapipe.rs b/corelib/src/terrapipe.rs index ae50c87d..83c518a8 100644 --- a/corelib/src/terrapipe.rs +++ b/corelib/src/terrapipe.rs @@ -19,7 +19,10 @@ * */ -//! This implements the Terrapipe protocol +//! This implements primitives for the Terrapipe protocol + +use std::error::Error; +use std::fmt; /// Default query metaline buffer size pub const DEF_QMETALINE_BUFSIZE: usize = 44; @@ -46,18 +49,73 @@ pub mod responses { use lazy_static::lazy_static; lazy_static! { /// Empty `0`(Okay) response - without any content - pub static ref RESP_OKAY_EMPTY: Vec = "0!0!0".as_bytes().to_owned(); + pub static ref RESP_OKAY_EMPTY: Vec = "*!0!0!0\n".as_bytes().to_owned(); /// `1` Not found response - pub static ref RESP_NOT_FOUND: Vec = "1!0!0".as_bytes().to_owned(); + pub static ref RESP_NOT_FOUND: Vec = "*!1!0!0\n".as_bytes().to_owned(); /// `2` Overwrite Error response - pub static ref RESP_OVERWRITE_ERROR: Vec = "2!0!0".as_bytes().to_owned(); + pub static ref RESP_OVERWRITE_ERROR: Vec = "*!2!0!0\n".as_bytes().to_owned(); /// `3` Invalid Metaframe response - pub static ref RESP_INVALID_MF: Vec = "3!0!0".as_bytes().to_owned(); - /// `4` Incomplete frame response - pub static ref RESP_INCOMPLETE: Vec = "4!0!0".as_bytes().to_owned(); + pub static ref RESP_INVALID_MF: Vec = "*!3!0!0\n".as_bytes().to_owned(); + /// `4` ArgumentError frame response + pub static ref RESP_ArgumentError: Vec = "*!4!0!0\n".as_bytes().to_owned(); /// `5` Internal server error response - pub static ref RESP_SERVER_ERROR: Vec = "5!0!0".as_bytes().to_owned(); + pub static ref RESP_SERVER_ERROR: Vec = "*!5!0!0\n".as_bytes().to_owned(); + } +} + +pub fn get_sizes(stream: String) -> Result, RespCodes> { + let sstr: Vec<&str> = stream.split('#').collect(); + let mut sstr_iter = sstr.into_iter().peekable(); + let mut sizes = Vec::with_capacity(sstr_iter.len()); + while let Some(size) = sstr_iter.next() { + if sstr_iter.peek().is_some() { + // Skip the last element + if let Ok(val) = size.parse::() { + sizes.push(val); + } else { + return Err(RespCodes::InvalidMetaframe); + } + } else { + break; + } } + Ok(sizes) +} + +#[cfg(test)] +#[test] +fn test_get_sizes() { + let retbuf = "10#20#30#".to_owned(); + let sizes = get_sizes(retbuf).unwrap(); + assert_eq!(sizes, vec![10usize, 20usize, 30usize]); +} + +pub fn extract_idents(buf: Vec, skip_sequence: Vec) -> Vec { + skip_sequence + .into_iter() + .scan(buf.into_iter(), |databuf, size| { + let tok: Vec = databuf.take(size).collect(); + let _ = databuf.next(); + // FIXME(@ohsayan): This is quite slow, we'll have to use SIMD in the future + Some(String::from_utf8_lossy(&tok).to_string()) + }) + .collect() +} + +#[cfg(test)] +#[test] +fn test_extract_idents() { + let testbuf = "set\nsayan\n17\n".as_bytes().to_vec(); + let skip_sequence: Vec = vec![3, 5, 2]; + let res = extract_idents(testbuf, skip_sequence); + assert_eq!( + vec!["set".to_owned(), "sayan".to_owned(), "17".to_owned()], + res + ); + let badbuf = vec![0, 0, 159, 146, 150]; + let skip_sequence: Vec = vec![1, 2]; + let res = extract_idents(badbuf, skip_sequence); + assert_eq!(res[1], "��"); } /// Response codes returned by the server @@ -65,36 +123,77 @@ pub mod responses { pub enum RespCodes { /// `0`: Okay (Empty Response) - use the `ResponseBuilder` for building /// responses that contain data - EmptyResponseOkay, + Okay, /// `1`: Not Found NotFound, /// `2`: Overwrite Error OverwriteError, /// `3`: Invalid Metaframe InvalidMetaframe, - /// `4`: Incomplete - Incomplete, + /// `4`: ArgumentError + ArgumentError, /// `5`: Server Error ServerError, - /// `6`: Some other error - the wrapped `String` will be returned in the response body - OtherError(String), + /// `6`: Some other error - the wrapped `String` will be returned in the response body. + /// Just a note, this gets quite messy, especially when we're using it for deconding responses + OtherError(Option), } +impl fmt::Display for RespCodes { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + use RespCodes::*; + match self { + Okay => unimplemented!(), + NotFound => write!(f, "ERROR: Couldn't find the key"), + OverwriteError => write!(f, "ERROR: Existing values cannot be overwritten"), + InvalidMetaframe => write!(f, "ERROR: Invalid metaframe"), + ArgumentError => write!(f, "ERROR: The command is not in the correct format"), + ServerError => write!(f, "ERROR: The server had an internal error"), + OtherError(e) => match e { + None => write!(f, "ERROR: Some unknown error occurred"), + Some(e) => write!(f, "ERROR: {}", e), + }, + } + } +} + +impl Error for RespCodes {} + impl From for u8 { fn from(rcode: RespCodes) -> u8 { use RespCodes::*; match rcode { - EmptyResponseOkay => 0, + Okay => 0, NotFound => 1, OverwriteError => 2, InvalidMetaframe => 3, - Incomplete => 4, + ArgumentError => 4, ServerError => 5, OtherError(_) => 6, } } } +impl RespCodes { + pub fn from_str(val: &str, extra: Option) -> Option { + use RespCodes::*; + let res = match val.parse::() { + Ok(val) => match val { + 0 => Okay, + 1 => NotFound, + 2 => OverwriteError, + 3 => InvalidMetaframe, + 4 => ArgumentError, + 5 => ServerError, + 6 => OtherError(extra), + _ => return None, + }, + Err(_) => return None, + }; + Some(res) + } +} + /// Representation of the query action type - pipelined or simple #[derive(Debug, PartialEq)] pub enum ActionType { @@ -113,26 +212,26 @@ impl RespBytes for RespCodes { use responses::*; use RespCodes::*; match self { - EmptyResponseOkay => RESP_OKAY_EMPTY.to_owned(), + Okay => RESP_OKAY_EMPTY.to_owned(), NotFound => RESP_NOT_FOUND.to_owned(), OverwriteError => RESP_OVERWRITE_ERROR.to_owned(), InvalidMetaframe => RESP_INVALID_MF.to_owned(), - Incomplete => RESP_INCOMPLETE.to_owned(), + ArgumentError => RESP_ArgumentError.to_owned(), ServerError => RESP_SERVER_ERROR.to_owned(), - OtherError(e) => format!("6!{}!#{}", e.len(), e.len()).as_bytes().to_owned(), + OtherError(e) => match e { + Some(e) => { + let dl = e.len().to_string(); + format!("*!6!{}!{}\n#{}\n{}", e.len(), dl.len(), dl, e) + .as_bytes() + .to_owned() + } + None => format!("*!6!0!0\n").as_bytes().to_owned(), + }, } } } -/// The query dataframe #[derive(Debug)] -pub struct QueryDataframe { - /// The data part - pub data: Vec, - /// The query action type - pub actiontype: ActionType, -} - /// This is enum represents types of responses which can be built from it pub enum ResponseBuilder { SimpleResponse, // TODO: Add pipelined response builder here @@ -145,12 +244,12 @@ impl ResponseBuilder { } } +#[derive(Debug)] /// Representation of a simple response pub struct SimpleResponse { respcode: u8, metalayout_buf: String, dataframe_buf: String, - size_tracker: usize, } impl SimpleResponse { @@ -161,14 +260,11 @@ impl SimpleResponse { respcode, metalayout_buf: String::with_capacity(2), dataframe_buf: String::with_capacity(40), - size_tracker: 0, } } /// Add data to the response pub fn add_data(&mut self, data: String) { - let datstr = data.len().to_string(); - self.metalayout_buf.push_str(&format!("{}#", datstr.len())); - self.size_tracker += datstr.len() + 1; + self.metalayout_buf.push_str(&format!("{}#", data.len())); self.dataframe_buf.push_str(&data); self.dataframe_buf.push('\n'); } @@ -178,7 +274,7 @@ impl SimpleResponse { format!( "*!{}!{}!{}\n{}\n{}", self.respcode, - self.size_tracker, + self.dataframe_buf.len(), self.metalayout_buf.len(), self.metalayout_buf, self.dataframe_buf @@ -197,7 +293,7 @@ impl RespBytes for SimpleResponse { #[cfg(test)] #[test] fn test_simple_response() { - let mut s = ResponseBuilder::new_simple(RespCodes::EmptyResponseOkay); + let mut s = ResponseBuilder::new_simple(RespCodes::Okay); s.add_data("Sayan".to_owned()); s.add_data("loves".to_owned()); s.add_data("you".to_owned()); diff --git a/server/Cargo.toml b/server/Cargo.toml index 02a2f707..916bf100 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -7,6 +7,6 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -tokio = { version = "0.2.21", features = ["full"] } +tokio = { version = "0.2.22", features = ["full"] } bytes = "0.5.6" corelib = {path ="../corelib"} \ No newline at end of file diff --git a/server/src/coredb.rs b/server/src/coredb.rs index 5e415ce3..4e6cf6cc 100644 --- a/server/src/coredb.rs +++ b/server/src/coredb.rs @@ -19,7 +19,7 @@ * */ -use corelib::terrapipe::QueryDataframe; +use crate::protocol::QueryDataframe; use corelib::terrapipe::{tags, ActionType, RespBytes, RespCodes, ResponseBuilder}; use std::collections::{hash_map::Entry, HashMap}; use std::sync::{self, Arc, RwLock}; @@ -95,8 +95,7 @@ impl CoreDB { Ok(v) => v, Err(e) => return e.into_response(), }; - let mut resp = - ResponseBuilder::new_simple(RespCodes::EmptyResponseOkay); + let mut resp = ResponseBuilder::new_simple(RespCodes::Okay); resp.add_data(res.to_owned()); return resp.into_response(); } @@ -111,7 +110,7 @@ impl CoreDB { Ok(_) => { #[cfg(Debug)] self.print_debug_table(); - return RespCodes::EmptyResponseOkay.into_response(); + return RespCodes::Okay.into_response(); } Err(e) => return e.into_response(), } @@ -130,7 +129,7 @@ impl CoreDB { #[cfg(Debug)] self.print_debug_table(); - RespCodes::EmptyResponseOkay.into_response() + RespCodes::Okay.into_response() } } Err(e) => return e.into_response(), @@ -148,22 +147,28 @@ impl CoreDB { #[cfg(Debug)] self.print_debug_table(); - return RespCodes::EmptyResponseOkay.into_response(); + return RespCodes::Okay.into_response(); } Err(e) => return e.into_response(), } + } else { } } } tags::TAG_HEYA => { - let mut resp = ResponseBuilder::new_simple(RespCodes::EmptyResponseOkay); - resp.add_data("HEY!".to_owned()); - return resp.into_response(); + if buf.next().is_none() { + let mut resp = ResponseBuilder::new_simple(RespCodes::Okay); + resp.add_data("HEY!".to_owned()); + return resp.into_response(); + } + } + _ => { + return RespCodes::OtherError(Some("Unknown command".to_owned())) + .into_response() } - _ => return RespCodes::OtherError("Unknown command".to_owned()).into_response(), } } - RespCodes::InvalidMetaframe.into_response() + RespCodes::ArgumentError.into_response() } pub fn new() -> Self { CoreDB { diff --git a/server/src/dbnet.rs b/server/src/dbnet.rs index 17e55822..31418d14 100644 --- a/server/src/dbnet.rs +++ b/server/src/dbnet.rs @@ -133,7 +133,6 @@ impl CHandler { return; } }; - eprintln!("{:?}", try_df); match try_df { Ok(df) => self.con.write_response(self.db.execute_query(df)).await, Err(e) => self.con.close_conn_with_error(e).await, diff --git a/server/src/protocol.rs b/server/src/protocol.rs index d9c99d29..c77f796f 100644 --- a/server/src/protocol.rs +++ b/server/src/protocol.rs @@ -19,11 +19,20 @@ * */ -use corelib::terrapipe::{ActionType, QueryDataframe}; +use corelib::terrapipe::{extract_idents, get_sizes, ActionType}; use corelib::terrapipe::{RespBytes, RespCodes, DEF_QMETALINE_BUFSIZE}; use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::net::TcpStream; +/// The query dataframe +#[derive(Debug)] +pub struct QueryDataframe { + /// The data part + pub data: Vec, + /// The query action type + pub actiontype: ActionType, +} + #[derive(Debug, PartialEq)] pub struct PreQMF { action_type: ActionType, @@ -81,61 +90,6 @@ fn test_preqmf() { assert_eq!(preqmf, pqmf_should_be); } -pub fn get_sizes(stream: String) -> Result, RespCodes> { - let sstr: Vec<&str> = stream.split('#').collect(); - let mut sstr_iter = sstr.into_iter().peekable(); - let mut sizes = Vec::with_capacity(sstr_iter.len()); - while let Some(size) = sstr_iter.next() { - if sstr_iter.peek().is_some() { - // Skip the last element - if let Ok(val) = size.parse::() { - sizes.push(val); - } else { - return Err(RespCodes::InvalidMetaframe); - } - } else { - break; - } - } - Ok(sizes) -} - -#[cfg(test)] -#[test] -fn test_get_sizes() { - let retbuf = "10#20#30#".to_owned(); - let sizes = get_sizes(retbuf).unwrap(); - assert_eq!(sizes, vec![10usize, 20usize, 30usize]); -} - -fn extract_idents(buf: Vec, skip_sequence: Vec) -> Vec { - skip_sequence - .into_iter() - .scan(buf.into_iter(), |databuf, size| { - let tok: Vec = databuf.take(size).collect(); - let _ = databuf.next(); - // FIXME(@ohsayan): This is quite slow, we'll have to use SIMD in the future - Some(String::from_utf8_lossy(&tok).to_string()) - }) - .collect() -} - -#[cfg(test)] -#[test] -fn test_extract_idents() { - let testbuf = "set\nsayan\n17\n".as_bytes().to_vec(); - let skip_sequence: Vec = vec![3, 5, 2]; - let res = extract_idents(testbuf, skip_sequence); - assert_eq!( - vec!["set".to_owned(), "sayan".to_owned(), "17".to_owned()], - res - ); - let badbuf = vec![0, 0, 159, 146, 150]; - let skip_sequence: Vec = vec![1, 2]; - let res = extract_idents(badbuf, skip_sequence); - assert_eq!(res[1], "��"); -} - pub struct Connection { stream: TcpStream, } @@ -176,6 +130,6 @@ impl Connection { } } pub async fn close_conn_with_error(&mut self, bytes: impl RespBytes) { - self.stream.write_all(&bytes.into_response()).await.unwrap() + self.stream.write_all(&bytes.into_response()).await.unwrap(); } }