Implement client

next
Sayan Nandan 4 years ago
parent 0f1dde109b
commit 59046db8f8
No known key found for this signature in database
GPG Key ID: C31EFD7DDA12AEE0

@ -10,7 +10,7 @@ You can see a list of contributors **[here](./CONTRIBUTORS.md)**
* `FIXME(@<username>)` : Use this when you have made an implementation that can be improved in the future, such as improved efficiency * `FIXME(@<username>)` : Use this when you have made an implementation that can be improved in the future, such as improved efficiency
* `HACK(@<username>)` : Use this when the code you are using a temporary workaround * `HACK(@<username>)` : Use this when the code you are using a temporary workaround
* `TODO(@<username>)` : Use this when you have kept something incomplete * `TODO(@<username>)` : Use this when you have kept something ArgumentError
### Formatting ### Formatting

5
Cargo.lock generated

@ -279,9 +279,9 @@ dependencies = [
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "0.2.21" version = "0.2.22"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d099fa27b9702bed751524694adbe393e18b36b204da91eb1cbbbbb4a5ee2d58" checksum = "5d34ca54d84bf2b5b4d7d31e901a8464f7b60ac145a284fba25ceb801f2ddccd"
dependencies = [ dependencies = [
"bytes", "bytes",
"fnv", "fnv",
@ -317,6 +317,7 @@ name = "tsh"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"corelib", "corelib",
"tokio",
] ]
[[package]] [[package]]

@ -7,4 +7,5 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
corelib = {path = "../corelib"} corelib = {path = "../corelib"}
tokio = {version = "0.2.22", features = ["full"]}

@ -19,17 +19,16 @@
* *
*/ */
use corelib::terrapipe::{self, DEF_QMETALINE_BUFSIZE}; use crate::client::Client;
use std::io::{self, prelude::*, BufReader}; use std::io::{self, prelude::*};
use std::net::TcpStream; use tokio::signal;
use std::process;
const ADDR: &'static str = "127.0.0.1:2003"; const ADDR: &'static str = "127.0.0.1:2003";
pub fn execute_query() { pub async fn execute_query() {
let mut connection = match TcpStream::connect(ADDR) { let mut client = match Client::new(ADDR).await {
Ok(c) => c, Ok(c) => c,
Err(_) => { Err(e) => {
eprintln!("ERROR: Couldn't connect to the TDB server"); eprintln!("Error: {}", e);
process::exit(0x100); return;
} }
}; };
loop { loop {
@ -41,85 +40,6 @@ pub fn execute_query() {
io::stdin() io::stdin()
.read_line(&mut rl) .read_line(&mut rl)
.expect("Couldn't read line, this is a serious error!"); .expect("Couldn't read line, this is a serious error!");
let mut cmd = terrapipe::QueryBuilder::new_simple(); client.run(rl, signal::ctrl_c()).await;
cmd.from_cmd(rl);
let (size, resp) = cmd.prepare_response();
match connection.write(&resp) {
Ok(n) => {
if n < size {
eprintln!("ERROR: Couldn't write all bytes to server");
process::exit(0x100);
}
},
Err(_) => {
eprintln!("ERROR: Couldn't send data to the TDB server");
process::exit(0x100);
}
}
println!("{}", parse_response(&connection));
}
}
pub fn parse_response(stream: &TcpStream) -> String {
let mut metaline = String::with_capacity(DEF_QMETALINE_BUFSIZE);
let mut bufreader = BufReader::new(stream);
match bufreader.read_line(&mut metaline) {
Ok(_) => (),
Err(_) => {
eprintln!("Couldn't read metaline from tdb server");
process::exit(0x100);
}
} }
let metaline = metaline.trim_matches(char::from(0));
let fields: Vec<&str> = metaline.split('!').collect();
if let (Some(resptype), Some(respcode), Some(clength), Some(ml_length)) =
(fields.get(0), fields.get(1), fields.get(2), fields.get(3))
{
if *resptype == "$" {
todo!("Pipelined response deconding is yet to be implemented")
}
let mut is_err_response = false;
match respcode.to_owned() {
"0" => (),
"1" => return format!("ERROR: Couldn't find the requested key"),
"2" => return format!("ERROR: Can't overwrite existing value"),
"3" => return format!("ERROR: tsh sent an invalid metaframe"),
"4" => return format!("ERROR: tsh sent an incomplete query packet"),
"5" => return format!("ERROR: tdb had an internal server error"),
"6" => is_err_response = true,
_ => (),
}
if let (Ok(clength), Ok(ml_length)) = (clength.parse::<usize>(), ml_length.parse::<usize>())
{
let mut metalinebuf = String::with_capacity(ml_length);
let mut databuf = vec![0; clength];
bufreader.read_line(&mut metalinebuf).unwrap();
let sizes: Vec<usize> = metalinebuf
.split("#")
.map(|size| size.parse::<usize>().unwrap())
.collect();
bufreader.read(&mut databuf).unwrap();
eprintln!("{:?}", String::from_utf8_lossy(&databuf));
let res = extract_idents(databuf, sizes);
let resp: String = res.iter().flat_map(|s| s.chars()).collect();
if !is_err_response {
return resp;
} else {
return format!("ERROR: {}", resp);
}
}
}
format!("ERROR: The server sent an invalid response")
}
fn extract_idents(buf: Vec<u8>, skip_sequence: Vec<usize>) -> Vec<String> {
skip_sequence
.into_iter()
.scan(buf.into_iter(), |databuf, size| {
let tok: Vec<u8> = databuf.take(size).collect();
let _ = databuf.next();
// FIXME(@ohsayan): This is quite slow, we'll have to use SIMD in the future
Some(String::from_utf8_lossy(&tok).to_string())
})
.collect()
} }

@ -0,0 +1,160 @@
/*
* Created on Thu Jul 23 2020
*
* This file is a part of the source code for the Terrabase database
* Copyright (c) 2020, Sayan Nandan <ohsayan at outlook dot com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use corelib::{
terrapipe::{self, ActionType, QueryBuilder, RespCodes, DEF_QMETALAYOUT_BUFSIZE},
TResult,
};
use std::{error::Error, fmt, process};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
use std::future::Future;
#[derive(Debug)]
pub enum ClientError {
RespCode(RespCodes),
InvalidResponse,
OtherError(String),
}
impl fmt::Display for ClientError {
fn fmt(&self, mut f: &mut fmt::Formatter<'_>) -> fmt::Result {
use ClientError::*;
match self {
RespCode(r) => r.fmt(&mut f),
InvalidResponse => write!(f, "ERROR: The server sent an invalid response"),
OtherError(e) => write!(f, "ERROR: {}", e),
}
}
}
impl Error for ClientError {}
pub struct Client {
con: TcpStream,
}
pub struct RMetaline {
content_size: usize,
metalayout_size: usize,
respcode: RespCodes,
resptype: ActionType,
}
impl RMetaline {
pub fn from_buf(buf: String) -> TResult<Self> {
let parts: Vec<&str> = buf.split('!').collect();
if let (Some(resptype), Some(respcode), Some(clength), Some(metalayout_size)) =
(parts.get(0), parts.get(1), parts.get(2), parts.get(3))
{
if resptype == &"$" {
todo!("Pipelined responses are yet to be implemented");
}
if resptype != &"*" {
return Err(ClientError::InvalidResponse.into());
}
if let (Some(respcode), Ok(clength), Ok(metalayout_size)) = (
RespCodes::from_str(respcode, None),
clength.trim_matches(char::from(0)).trim().parse::<usize>(),
metalayout_size
.trim_matches(char::from(0))
.trim()
.parse::<usize>(),
) {
return Ok(RMetaline {
content_size: clength,
metalayout_size,
respcode,
resptype: ActionType::Simple,
});
} else {
Err(ClientError::InvalidResponse.into())
}
} else {
Err(ClientError::InvalidResponse.into())
}
}
}
impl Client {
pub async fn new(addr: &str) -> TResult<Self> {
let con = TcpStream::connect(addr).await?;
Ok(Client { con })
}
pub async fn run(&mut self, cmd: String, sig: impl Future) {
if cmd.len() == 0 {
return;
} else {
let mut qbuilder = QueryBuilder::new_simple();
qbuilder.from_cmd(cmd);
let q = tokio::select! {
query = self.run_query(qbuilder.prepare_response()) => query,
_ = sig => {
println!("Goodbye!");
// Terminate the connection
process::exit(0x100);
}
};
match q {
Ok(res) => {
res.into_iter().for_each(|val| println!("{}", val));
return;
}
Err(e) => {
eprintln!("{}", e);
return;
}
};
}
}
async fn run_query(&mut self, (_, query_bytes): (usize, Vec<u8>)) -> TResult<Vec<String>> {
self.con.write_all(&query_bytes).await?;
let mut metaline_buf = String::with_capacity(DEF_QMETALAYOUT_BUFSIZE);
let mut bufreader = BufReader::new(&mut self.con);
bufreader.read_line(&mut metaline_buf).await?;
let metaline = RMetaline::from_buf(metaline_buf)?;
// Skip reading the rest of the data if the metaline says that there is an
// error. WARNING: This would mean that any other data sent - would simply be
// ignored
let mut is_other_error = false;
match metaline.respcode {
// Only these two variants have some data in the dataframe, so we continue
RespCodes::Okay => (),
RespCodes::OtherError(_) => is_other_error = true,
code @ _ => return Err(code.into()),
}
if metaline.content_size == 0 {
return Ok(vec![]);
}
let (mut metalayout, mut dataframe) = (
String::with_capacity(metaline.metalayout_size),
vec![0u8; metaline.content_size],
);
bufreader.read_line(&mut metalayout).await?;
let metalayout = terrapipe::get_sizes(metalayout)?;
bufreader.read_exact(&mut dataframe).await?;
if is_other_error {
Err(ClientError::OtherError(String::from_utf8_lossy(&dataframe).to_string()).into())
} else {
Ok(terrapipe::extract_idents(dataframe, metalayout))
}
}
}

@ -20,10 +20,12 @@
*/ */
mod argparse; mod argparse;
mod client;
use tokio;
const MSG_WELCOME: &'static str = "TerrabaseDB v0.1.0"; const MSG_WELCOME: &'static str = "TerrabaseDB v0.1.0";
fn main() { #[tokio::main]
async fn main() {
println!("{}", MSG_WELCOME); println!("{}", MSG_WELCOME);
argparse::execute_query(); argparse::execute_query().await;
} }

@ -19,7 +19,10 @@
* *
*/ */
//! This implements the Terrapipe protocol //! This implements primitives for the Terrapipe protocol
use std::error::Error;
use std::fmt;
/// Default query metaline buffer size /// Default query metaline buffer size
pub const DEF_QMETALINE_BUFSIZE: usize = 44; pub const DEF_QMETALINE_BUFSIZE: usize = 44;
@ -46,18 +49,73 @@ pub mod responses {
use lazy_static::lazy_static; use lazy_static::lazy_static;
lazy_static! { lazy_static! {
/// Empty `0`(Okay) response - without any content /// Empty `0`(Okay) response - without any content
pub static ref RESP_OKAY_EMPTY: Vec<u8> = "0!0!0".as_bytes().to_owned(); pub static ref RESP_OKAY_EMPTY: Vec<u8> = "*!0!0!0\n".as_bytes().to_owned();
/// `1` Not found response /// `1` Not found response
pub static ref RESP_NOT_FOUND: Vec<u8> = "1!0!0".as_bytes().to_owned(); pub static ref RESP_NOT_FOUND: Vec<u8> = "*!1!0!0\n".as_bytes().to_owned();
/// `2` Overwrite Error response /// `2` Overwrite Error response
pub static ref RESP_OVERWRITE_ERROR: Vec<u8> = "2!0!0".as_bytes().to_owned(); pub static ref RESP_OVERWRITE_ERROR: Vec<u8> = "*!2!0!0\n".as_bytes().to_owned();
/// `3` Invalid Metaframe response /// `3` Invalid Metaframe response
pub static ref RESP_INVALID_MF: Vec<u8> = "3!0!0".as_bytes().to_owned(); pub static ref RESP_INVALID_MF: Vec<u8> = "*!3!0!0\n".as_bytes().to_owned();
/// `4` Incomplete frame response /// `4` ArgumentError frame response
pub static ref RESP_INCOMPLETE: Vec<u8> = "4!0!0".as_bytes().to_owned(); pub static ref RESP_ArgumentError: Vec<u8> = "*!4!0!0\n".as_bytes().to_owned();
/// `5` Internal server error response /// `5` Internal server error response
pub static ref RESP_SERVER_ERROR: Vec<u8> = "5!0!0".as_bytes().to_owned(); pub static ref RESP_SERVER_ERROR: Vec<u8> = "*!5!0!0\n".as_bytes().to_owned();
}
}
pub fn get_sizes(stream: String) -> Result<Vec<usize>, RespCodes> {
let sstr: Vec<&str> = stream.split('#').collect();
let mut sstr_iter = sstr.into_iter().peekable();
let mut sizes = Vec::with_capacity(sstr_iter.len());
while let Some(size) = sstr_iter.next() {
if sstr_iter.peek().is_some() {
// Skip the last element
if let Ok(val) = size.parse::<usize>() {
sizes.push(val);
} else {
return Err(RespCodes::InvalidMetaframe);
}
} else {
break;
}
} }
Ok(sizes)
}
#[cfg(test)]
#[test]
fn test_get_sizes() {
let retbuf = "10#20#30#".to_owned();
let sizes = get_sizes(retbuf).unwrap();
assert_eq!(sizes, vec![10usize, 20usize, 30usize]);
}
pub fn extract_idents(buf: Vec<u8>, skip_sequence: Vec<usize>) -> Vec<String> {
skip_sequence
.into_iter()
.scan(buf.into_iter(), |databuf, size| {
let tok: Vec<u8> = databuf.take(size).collect();
let _ = databuf.next();
// FIXME(@ohsayan): This is quite slow, we'll have to use SIMD in the future
Some(String::from_utf8_lossy(&tok).to_string())
})
.collect()
}
#[cfg(test)]
#[test]
fn test_extract_idents() {
let testbuf = "set\nsayan\n17\n".as_bytes().to_vec();
let skip_sequence: Vec<usize> = vec![3, 5, 2];
let res = extract_idents(testbuf, skip_sequence);
assert_eq!(
vec!["set".to_owned(), "sayan".to_owned(), "17".to_owned()],
res
);
let badbuf = vec![0, 0, 159, 146, 150];
let skip_sequence: Vec<usize> = vec![1, 2];
let res = extract_idents(badbuf, skip_sequence);
assert_eq!(res[1], "<22><>");
} }
/// Response codes returned by the server /// Response codes returned by the server
@ -65,36 +123,77 @@ pub mod responses {
pub enum RespCodes { pub enum RespCodes {
/// `0`: Okay (Empty Response) - use the `ResponseBuilder` for building /// `0`: Okay (Empty Response) - use the `ResponseBuilder` for building
/// responses that contain data /// responses that contain data
EmptyResponseOkay, Okay,
/// `1`: Not Found /// `1`: Not Found
NotFound, NotFound,
/// `2`: Overwrite Error /// `2`: Overwrite Error
OverwriteError, OverwriteError,
/// `3`: Invalid Metaframe /// `3`: Invalid Metaframe
InvalidMetaframe, InvalidMetaframe,
/// `4`: Incomplete /// `4`: ArgumentError
Incomplete, ArgumentError,
/// `5`: Server Error /// `5`: Server Error
ServerError, ServerError,
/// `6`: Some other error - the wrapped `String` will be returned in the response body /// `6`: Some other error - the wrapped `String` will be returned in the response body.
OtherError(String), /// Just a note, this gets quite messy, especially when we're using it for deconding responses
OtherError(Option<String>),
} }
impl fmt::Display for RespCodes {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use RespCodes::*;
match self {
Okay => unimplemented!(),
NotFound => write!(f, "ERROR: Couldn't find the key"),
OverwriteError => write!(f, "ERROR: Existing values cannot be overwritten"),
InvalidMetaframe => write!(f, "ERROR: Invalid metaframe"),
ArgumentError => write!(f, "ERROR: The command is not in the correct format"),
ServerError => write!(f, "ERROR: The server had an internal error"),
OtherError(e) => match e {
None => write!(f, "ERROR: Some unknown error occurred"),
Some(e) => write!(f, "ERROR: {}", e),
},
}
}
}
impl Error for RespCodes {}
impl From<RespCodes> for u8 { impl From<RespCodes> for u8 {
fn from(rcode: RespCodes) -> u8 { fn from(rcode: RespCodes) -> u8 {
use RespCodes::*; use RespCodes::*;
match rcode { match rcode {
EmptyResponseOkay => 0, Okay => 0,
NotFound => 1, NotFound => 1,
OverwriteError => 2, OverwriteError => 2,
InvalidMetaframe => 3, InvalidMetaframe => 3,
Incomplete => 4, ArgumentError => 4,
ServerError => 5, ServerError => 5,
OtherError(_) => 6, OtherError(_) => 6,
} }
} }
} }
impl RespCodes {
pub fn from_str(val: &str, extra: Option<String>) -> Option<Self> {
use RespCodes::*;
let res = match val.parse::<u8>() {
Ok(val) => match val {
0 => Okay,
1 => NotFound,
2 => OverwriteError,
3 => InvalidMetaframe,
4 => ArgumentError,
5 => ServerError,
6 => OtherError(extra),
_ => return None,
},
Err(_) => return None,
};
Some(res)
}
}
/// Representation of the query action type - pipelined or simple /// Representation of the query action type - pipelined or simple
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum ActionType { pub enum ActionType {
@ -113,26 +212,26 @@ impl RespBytes for RespCodes {
use responses::*; use responses::*;
use RespCodes::*; use RespCodes::*;
match self { match self {
EmptyResponseOkay => RESP_OKAY_EMPTY.to_owned(), Okay => RESP_OKAY_EMPTY.to_owned(),
NotFound => RESP_NOT_FOUND.to_owned(), NotFound => RESP_NOT_FOUND.to_owned(),
OverwriteError => RESP_OVERWRITE_ERROR.to_owned(), OverwriteError => RESP_OVERWRITE_ERROR.to_owned(),
InvalidMetaframe => RESP_INVALID_MF.to_owned(), InvalidMetaframe => RESP_INVALID_MF.to_owned(),
Incomplete => RESP_INCOMPLETE.to_owned(), ArgumentError => RESP_ArgumentError.to_owned(),
ServerError => RESP_SERVER_ERROR.to_owned(), ServerError => RESP_SERVER_ERROR.to_owned(),
OtherError(e) => format!("6!{}!#{}", e.len(), e.len()).as_bytes().to_owned(), OtherError(e) => match e {
Some(e) => {
let dl = e.len().to_string();
format!("*!6!{}!{}\n#{}\n{}", e.len(), dl.len(), dl, e)
.as_bytes()
.to_owned()
}
None => format!("*!6!0!0\n").as_bytes().to_owned(),
},
} }
} }
} }
/// The query dataframe
#[derive(Debug)] #[derive(Debug)]
pub struct QueryDataframe {
/// The data part
pub data: Vec<String>,
/// The query action type
pub actiontype: ActionType,
}
/// This is enum represents types of responses which can be built from it /// This is enum represents types of responses which can be built from it
pub enum ResponseBuilder { pub enum ResponseBuilder {
SimpleResponse, // TODO: Add pipelined response builder here SimpleResponse, // TODO: Add pipelined response builder here
@ -145,12 +244,12 @@ impl ResponseBuilder {
} }
} }
#[derive(Debug)]
/// Representation of a simple response /// Representation of a simple response
pub struct SimpleResponse { pub struct SimpleResponse {
respcode: u8, respcode: u8,
metalayout_buf: String, metalayout_buf: String,
dataframe_buf: String, dataframe_buf: String,
size_tracker: usize,
} }
impl SimpleResponse { impl SimpleResponse {
@ -161,14 +260,11 @@ impl SimpleResponse {
respcode, respcode,
metalayout_buf: String::with_capacity(2), metalayout_buf: String::with_capacity(2),
dataframe_buf: String::with_capacity(40), dataframe_buf: String::with_capacity(40),
size_tracker: 0,
} }
} }
/// Add data to the response /// Add data to the response
pub fn add_data(&mut self, data: String) { pub fn add_data(&mut self, data: String) {
let datstr = data.len().to_string(); self.metalayout_buf.push_str(&format!("{}#", data.len()));
self.metalayout_buf.push_str(&format!("{}#", datstr.len()));
self.size_tracker += datstr.len() + 1;
self.dataframe_buf.push_str(&data); self.dataframe_buf.push_str(&data);
self.dataframe_buf.push('\n'); self.dataframe_buf.push('\n');
} }
@ -178,7 +274,7 @@ impl SimpleResponse {
format!( format!(
"*!{}!{}!{}\n{}\n{}", "*!{}!{}!{}\n{}\n{}",
self.respcode, self.respcode,
self.size_tracker, self.dataframe_buf.len(),
self.metalayout_buf.len(), self.metalayout_buf.len(),
self.metalayout_buf, self.metalayout_buf,
self.dataframe_buf self.dataframe_buf
@ -197,7 +293,7 @@ impl RespBytes for SimpleResponse {
#[cfg(test)] #[cfg(test)]
#[test] #[test]
fn test_simple_response() { fn test_simple_response() {
let mut s = ResponseBuilder::new_simple(RespCodes::EmptyResponseOkay); let mut s = ResponseBuilder::new_simple(RespCodes::Okay);
s.add_data("Sayan".to_owned()); s.add_data("Sayan".to_owned());
s.add_data("loves".to_owned()); s.add_data("loves".to_owned());
s.add_data("you".to_owned()); s.add_data("you".to_owned());

@ -7,6 +7,6 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
tokio = { version = "0.2.21", features = ["full"] } tokio = { version = "0.2.22", features = ["full"] }
bytes = "0.5.6" bytes = "0.5.6"
corelib = {path ="../corelib"} corelib = {path ="../corelib"}

@ -19,7 +19,7 @@
* *
*/ */
use corelib::terrapipe::QueryDataframe; use crate::protocol::QueryDataframe;
use corelib::terrapipe::{tags, ActionType, RespBytes, RespCodes, ResponseBuilder}; use corelib::terrapipe::{tags, ActionType, RespBytes, RespCodes, ResponseBuilder};
use std::collections::{hash_map::Entry, HashMap}; use std::collections::{hash_map::Entry, HashMap};
use std::sync::{self, Arc, RwLock}; use std::sync::{self, Arc, RwLock};
@ -95,8 +95,7 @@ impl CoreDB {
Ok(v) => v, Ok(v) => v,
Err(e) => return e.into_response(), Err(e) => return e.into_response(),
}; };
let mut resp = let mut resp = ResponseBuilder::new_simple(RespCodes::Okay);
ResponseBuilder::new_simple(RespCodes::EmptyResponseOkay);
resp.add_data(res.to_owned()); resp.add_data(res.to_owned());
return resp.into_response(); return resp.into_response();
} }
@ -111,7 +110,7 @@ impl CoreDB {
Ok(_) => { Ok(_) => {
#[cfg(Debug)] #[cfg(Debug)]
self.print_debug_table(); self.print_debug_table();
return RespCodes::EmptyResponseOkay.into_response(); return RespCodes::Okay.into_response();
} }
Err(e) => return e.into_response(), Err(e) => return e.into_response(),
} }
@ -130,7 +129,7 @@ impl CoreDB {
#[cfg(Debug)] #[cfg(Debug)]
self.print_debug_table(); self.print_debug_table();
RespCodes::EmptyResponseOkay.into_response() RespCodes::Okay.into_response()
} }
} }
Err(e) => return e.into_response(), Err(e) => return e.into_response(),
@ -148,22 +147,28 @@ impl CoreDB {
#[cfg(Debug)] #[cfg(Debug)]
self.print_debug_table(); self.print_debug_table();
return RespCodes::EmptyResponseOkay.into_response(); return RespCodes::Okay.into_response();
} }
Err(e) => return e.into_response(), Err(e) => return e.into_response(),
} }
} else {
} }
} }
} }
tags::TAG_HEYA => { tags::TAG_HEYA => {
let mut resp = ResponseBuilder::new_simple(RespCodes::EmptyResponseOkay); if buf.next().is_none() {
resp.add_data("HEY!".to_owned()); let mut resp = ResponseBuilder::new_simple(RespCodes::Okay);
return resp.into_response(); resp.add_data("HEY!".to_owned());
return resp.into_response();
}
}
_ => {
return RespCodes::OtherError(Some("Unknown command".to_owned()))
.into_response()
} }
_ => return RespCodes::OtherError("Unknown command".to_owned()).into_response(),
} }
} }
RespCodes::InvalidMetaframe.into_response() RespCodes::ArgumentError.into_response()
} }
pub fn new() -> Self { pub fn new() -> Self {
CoreDB { CoreDB {

@ -133,7 +133,6 @@ impl CHandler {
return; return;
} }
}; };
eprintln!("{:?}", try_df);
match try_df { match try_df {
Ok(df) => self.con.write_response(self.db.execute_query(df)).await, Ok(df) => self.con.write_response(self.db.execute_query(df)).await,
Err(e) => self.con.close_conn_with_error(e).await, Err(e) => self.con.close_conn_with_error(e).await,

@ -19,11 +19,20 @@
* *
*/ */
use corelib::terrapipe::{ActionType, QueryDataframe}; use corelib::terrapipe::{extract_idents, get_sizes, ActionType};
use corelib::terrapipe::{RespBytes, RespCodes, DEF_QMETALINE_BUFSIZE}; use corelib::terrapipe::{RespBytes, RespCodes, DEF_QMETALINE_BUFSIZE};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream; use tokio::net::TcpStream;
/// The query dataframe
#[derive(Debug)]
pub struct QueryDataframe {
/// The data part
pub data: Vec<String>,
/// The query action type
pub actiontype: ActionType,
}
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub struct PreQMF { pub struct PreQMF {
action_type: ActionType, action_type: ActionType,
@ -81,61 +90,6 @@ fn test_preqmf() {
assert_eq!(preqmf, pqmf_should_be); assert_eq!(preqmf, pqmf_should_be);
} }
pub fn get_sizes(stream: String) -> Result<Vec<usize>, RespCodes> {
let sstr: Vec<&str> = stream.split('#').collect();
let mut sstr_iter = sstr.into_iter().peekable();
let mut sizes = Vec::with_capacity(sstr_iter.len());
while let Some(size) = sstr_iter.next() {
if sstr_iter.peek().is_some() {
// Skip the last element
if let Ok(val) = size.parse::<usize>() {
sizes.push(val);
} else {
return Err(RespCodes::InvalidMetaframe);
}
} else {
break;
}
}
Ok(sizes)
}
#[cfg(test)]
#[test]
fn test_get_sizes() {
let retbuf = "10#20#30#".to_owned();
let sizes = get_sizes(retbuf).unwrap();
assert_eq!(sizes, vec![10usize, 20usize, 30usize]);
}
fn extract_idents(buf: Vec<u8>, skip_sequence: Vec<usize>) -> Vec<String> {
skip_sequence
.into_iter()
.scan(buf.into_iter(), |databuf, size| {
let tok: Vec<u8> = databuf.take(size).collect();
let _ = databuf.next();
// FIXME(@ohsayan): This is quite slow, we'll have to use SIMD in the future
Some(String::from_utf8_lossy(&tok).to_string())
})
.collect()
}
#[cfg(test)]
#[test]
fn test_extract_idents() {
let testbuf = "set\nsayan\n17\n".as_bytes().to_vec();
let skip_sequence: Vec<usize> = vec![3, 5, 2];
let res = extract_idents(testbuf, skip_sequence);
assert_eq!(
vec!["set".to_owned(), "sayan".to_owned(), "17".to_owned()],
res
);
let badbuf = vec![0, 0, 159, 146, 150];
let skip_sequence: Vec<usize> = vec![1, 2];
let res = extract_idents(badbuf, skip_sequence);
assert_eq!(res[1], "<22><>");
}
pub struct Connection { pub struct Connection {
stream: TcpStream, stream: TcpStream,
} }
@ -176,6 +130,6 @@ impl Connection {
} }
} }
pub async fn close_conn_with_error(&mut self, bytes: impl RespBytes) { pub async fn close_conn_with_error(&mut self, bytes: impl RespBytes) {
self.stream.write_all(&bytes.into_response()).await.unwrap() self.stream.write_all(&bytes.into_response()).await.unwrap();
} }
} }

Loading…
Cancel
Save