Upgrade `cli` to use the new protocol

next
Sayan Nandan 4 years ago
parent 819a6030b0
commit c5c9709e9b
No known key found for this signature in database
GPG Key ID: C31EFD7DDA12AEE0

38
Cargo.lock generated

@ -1,5 +1,14 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
[[package]]
name = "aho-corasick"
version = "0.7.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "043164d8ba5c4c3035fec9bbee8647c0261d788f3474306f93bb65901cae0e86"
dependencies = [
"memchr",
]
[[package]]
name = "arc-swap"
version = "0.4.7"
@ -361,6 +370,24 @@ version = "0.1.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce"
[[package]]
name = "regex"
version = "1.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c3780fcf44b193bc4d09f36d2a3c87b251da4a046c87795a0d35f4f927ad8e6"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
"thread_local",
]
[[package]]
name = "regex-syntax"
version = "0.6.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26412eb97c6b088a6997e05f69403a802a92d520de2f8e63c2b65f9e0f47c4e8"
[[package]]
name = "scopeguard"
version = "1.1.0"
@ -439,6 +466,15 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "thread_local"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14"
dependencies = [
"lazy_static",
]
[[package]]
name = "tokio"
version = "0.2.22"
@ -480,8 +516,10 @@ version = "0.3.2"
dependencies = [
"bytes",
"devtimer",
"lazy_static",
"libtdb",
"rand",
"regex",
"tokio",
]

@ -12,6 +12,8 @@ tokio = {version = "0.2.22", features = ["full"]}
rand = "0.7.3"
devtimer = "4.0.0"
bytes = "0.5.6"
regex = "1.3.9"
lazy_static = "1.4.0"
[[bin]]
path="src/bin/benchmark.rs"

@ -21,18 +21,25 @@
//! This module provides methods to deserialize an incoming response packet
use libtdb::builders::MLINE_BUF;
use libtdb::de::*;
use libtdb::terrapipe::*;
use std::fmt;
#[derive(Debug, PartialEq)]
pub struct DataGroup(Vec<DataType>);
#[derive(Debug, PartialEq)]
#[non_exhaustive]
pub enum DataType {
Str(Option<String>),
RespCode(Option<String>),
}
/// Errors that may occur while parsing responses from the server
#[derive(Debug, PartialEq)]
pub enum ClientResult {
InvalidResponse(usize),
Response(Vec<DataGroup>, usize),
Empty(usize),
Incomplete(usize),
Incomplete,
}
impl fmt::Display for ClientResult {
@ -42,76 +49,222 @@ impl fmt::Display for ClientResult {
InvalidResponse(_) => write!(f, "ERROR: The server sent an invalid response"),
Response(_, _) => unimplemented!(),
Empty(_) => write!(f, ""),
Incomplete(_) => write!(f, "ERROR: The server sent an incomplete response"),
Incomplete => write!(f, "ERROR: The server sent an incomplete response"),
}
}
}
struct Metaline {
content_size: usize,
metalayout_size: usize,
resp_type: ActionType,
}
impl Metaline {
pub fn from_navigator(nav: &mut Navigator) -> Option<Self> {
if let Some(mline) = nav.get_line(Some(MLINE_BUF)) {
if mline.len() < 5 {
return None;
}
let resp_type = match unsafe { mline.get_unchecked(0) } {
b'$' => ActionType::Pipeline,
b'*' => ActionType::Simple,
_ => return None,
pub fn parse(buf: &[u8]) -> ClientResult {
if buf.len() < 6 {
// A packet that has less than 6 characters? Nonsense!
return ClientResult::Incomplete;
}
/*
We first get the metaframe, which looks something like:
```
#<numchars_in_next_line>\n
!<num_of_datagroups>\n
```
*/
let mut pos = 0;
if buf[pos] != b'#' {
return ClientResult::InvalidResponse(pos);
} else {
pos += 1;
}
let next_line = match read_line_and_return_next_line(&mut pos, &buf) {
Some(line) => line,
None => {
// This is incomplete
return ClientResult::Incomplete;
}
};
pos += 1; // Skip LF
// Find out the number of actions that we have to do
let mut action_size = 0usize;
if next_line[0] == b'*' {
let mut line_iter = next_line.into_iter().skip(1).peekable();
while let Some(dig) = line_iter.next() {
let curdig: usize = match dig.checked_sub(48) {
Some(dig) => {
if dig > 9 {
return ClientResult::InvalidResponse(pos);
} else {
dig.into()
}
}
None => return ClientResult::InvalidResponse(pos),
};
if resp_type == ActionType::Pipeline {
// TODO(@ohsayan): Enable pipelined responses to be parsed
unimplemented!("Pipelined responses cannot be parsed yet");
action_size = (action_size * 10) + curdig;
}
// This line gives us the number of actions
} else {
return ClientResult::InvalidResponse(pos);
}
let mut items: Vec<DataGroup> = Vec::with_capacity(action_size);
while pos < buf.len() && items.len() <= action_size {
match buf[pos] {
b'#' => {
pos += 1; // Skip '#'
let next_line = match read_line_and_return_next_line(&mut pos, &buf) {
Some(line) => line,
None => {
// This is incomplete
return ClientResult::Incomplete;
}
}; // Now we have the current line
pos += 1; // Skip the newline
// Move the cursor ahead by the number of bytes that we just read
// Let us check the current char
match next_line[0] {
b'&' => {
// This is an array
// Now let us parse the array size
let mut current_array_size = 0usize;
let mut linepos = 1; // Skip the '&' character
while linepos < next_line.len() {
let curdg: usize = match next_line[linepos].checked_sub(48) {
Some(dig) => {
if dig > 9 {
// If `dig` is greater than 9, then the current
// UTF-8 char isn't a number
return ClientResult::InvalidResponse(pos);
} else {
dig.into()
}
}
None => return ClientResult::InvalidResponse(pos),
};
current_array_size = (current_array_size * 10) + curdg; // Increment the size
linepos += 1; // Move the position ahead, since we just read another char
}
// Now we know the array size, good!
let mut actiongroup: Vec<DataType> = Vec::with_capacity(current_array_size);
// Let's loop over to get the elements till the size of this array
while pos < buf.len() && actiongroup.len() < current_array_size {
let mut element_size = 0usize;
let datatype = match buf[pos] {
b'+' => DataType::Str(None),
b'!' => DataType::RespCode(None),
_ => unimplemented!(),
};
pos += 1; // We've got the tsymbol above, so skip it
while pos < buf.len() && buf[pos] != b'\n' {
let curdig: usize = match buf[pos].checked_sub(48) {
Some(dig) => {
if dig > 9 {
// If `dig` is greater than 9, then the current
// UTF-8 char isn't a number
return ClientResult::InvalidResponse(pos);
} else {
dig.into()
}
}
None => return ClientResult::InvalidResponse(pos),
};
element_size = (element_size * 10) + curdig; // Increment the size
pos += 1; // Move the position ahead, since we just read another char
}
pos += 1;
// We now know the item size
let mut value = String::with_capacity(element_size);
let extracted = match buf.get(pos..pos + element_size) {
Some(s) => s,
None => return ClientResult::Incomplete,
};
pos += element_size; // Move the position ahead
value.push_str(&String::from_utf8_lossy(extracted));
pos += 1; // Skip the newline
actiongroup.push(match datatype {
DataType::Str(_) => DataType::Str(Some(value)),
DataType::RespCode(_) => DataType::RespCode(Some(value)),
});
}
items.push(DataGroup(actiongroup));
}
_ => return ClientResult::InvalidResponse(pos),
}
continue;
}
if let Some(sizes) = get_frame_sizes(unsafe { mline.get_unchecked(1..) }) {
return Some(Metaline {
content_size: unsafe { *sizes.get_unchecked(0) },
metalayout_size: unsafe { *sizes.get_unchecked(1) },
resp_type,
});
_ => {
// Since the variant '#' would does all the array
// parsing business, we should never reach here unless
// the packet is invalid
return ClientResult::InvalidResponse(pos);
}
}
None
}
}
#[derive(Debug)]
struct Metalayout(Vec<usize>);
impl Metalayout {
pub fn from_navigator(nav: &mut Navigator, mlayoutsize: usize) -> Option<Self> {
if let Some(layout) = nav.get_line(Some(mlayoutsize)) {
if let Some(skip_sequence) = get_skip_sequence(&layout) {
return Some(Metalayout(skip_sequence));
if buf.get(pos).is_none() {
// Either more data was sent or some data was missing
if items.len() == action_size {
if items.len() == 1 {
ClientResult::Response(items, pos)
} else {
// The CLI does not support batch queries
unimplemented!();
}
} else {
ClientResult::Incomplete
}
None
} else {
ClientResult::InvalidResponse(pos)
}
}
#[derive(Debug)]
pub struct Response {
pub data: Vec<String>,
pub resptype: ActionType,
}
impl Response {
pub fn from_navigator(mut nav: Navigator) -> ClientResult {
if let Some(metaline) = Metaline::from_navigator(&mut nav) {
if let Some(layout) = Metalayout::from_navigator(&mut nav, metaline.metalayout_size) {
if let Some(content) = nav.get_exact(metaline.content_size) {
let data = parse_df(content, layout.0, 1);
if let Some(data) = data {
return ClientResult::Response(data, nav.get_pos_usize());
}
/// Read a size line and return the following line
///
/// This reads a line that begins with the number, i.e make sure that
/// the **`#` character is skipped**
///
fn read_line_and_return_next_line<'a>(pos: &mut usize, buf: &'a [u8]) -> Option<&'a [u8]> {
let mut next_line_size = 0usize;
while pos < &mut buf.len() && buf[*pos] != b'\n' {
// 48 is the UTF-8 code for '0'
let curdig: usize = match buf[*pos].checked_sub(48) {
Some(dig) => {
if dig > 9 {
// If `dig` is greater than 9, then the current
// UTF-8 char isn't a number
return None;
} else {
dig.into()
}
}
}
ClientResult::InvalidResponse(nav.get_pos_usize())
None => return None,
};
next_line_size = (next_line_size * 10) + curdig; // Increment the size
*pos += 1; // Move the position ahead, since we just read another char
}
*pos += 1; // Skip the newline
// We now know the size of the next line
let next_line = match buf.get(*pos..*pos + next_line_size) {
Some(line) => line,
None => {
// This is incomplete
return None;
}
}; // Now we have the current line
// Move the cursor ahead by the number of bytes that we just read
*pos += next_line_size;
Some(next_line)
}
#[cfg(test)]
#[test]
fn test_parser() {
let res = "#2\n*1\n#2\n&1\n+4\nHEY!\n".as_bytes().to_owned();
assert_eq!(
parse(&res),
ClientResult::Response(
vec![DataGroup(vec![DataType::Str(Some("HEY!".to_owned()))])],
res.len()
)
);
let res = "#2\n*1\n#2\n&1\n!1\n0\n".as_bytes().to_owned();
assert_eq!(
parse(&res),
ClientResult::Response(
vec![DataGroup(vec![DataType::RespCode(Some("0".to_owned()))])],
res.len()
)
);
}

@ -21,13 +21,18 @@
mod deserializer;
use bytes::{Buf, BytesMut};
use deserializer::{ClientResult, Response};
use libtdb::builders::query::*;
use libtdb::de::*;
use deserializer::ClientResult;
use lazy_static::lazy_static;
use libtdb::TResult;
use libtdb::BUF_CAP;
use regex::Regex;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
lazy_static! {
static ref RE: Regex = Regex::new("[^\\s\"']+|\"[^\"]*\"|'[^']*'").unwrap();
}
pub struct Connection {
stream: TcpStream,
buffer: BytesMut,
@ -42,9 +47,8 @@ impl Connection {
})
}
pub async fn run_query(&mut self, query: String) {
let mut qbuilder = QueryBuilder::new_simple();
qbuilder.from_cmd(query);
match self.stream.write_all(&qbuilder.into_query()).await {
let query = proc_query(query);
match self.stream.write_all(&query).await {
Ok(_) => (),
Err(_) => {
eprintln!("ERROR: Couldn't write data to socket");
@ -65,8 +69,8 @@ impl Connection {
eprintln!("ERROR: The remote end reset the connection");
return;
}
ClientResult::Incomplete(f) => {
self.buffer.advance(f);
ClientResult::Incomplete => {
continue;
}
ClientResult::Response(r, f) => {
self.buffer.advance(f);
@ -74,7 +78,7 @@ impl Connection {
return;
}
for group in r {
println!("{}", group);
println!("{:?}", group);
}
return;
}
@ -91,7 +95,36 @@ impl Connection {
// The connection was possibly reset
return ClientResult::Empty(0);
}
let nav = Navigator::new(&self.buffer);
Response::from_navigator(nav)
deserializer::parse(&self.buffer)
}
}
fn proc_query(querystr: String) -> Vec<u8> {
// TODO(@ohsayan): Enable "" to be escaped
// let args: Vec<&str> = RE.find_iter(&querystr).map(|val| val.as_str()).collect();
let args: Vec<&str> = querystr.split_whitespace().collect();
let mut bytes = Vec::with_capacity(querystr.len());
bytes.extend(b"#2\n*1\n#2\n&");
bytes.extend(args.len().to_string().into_bytes());
bytes.push(b'\n');
args.into_iter().for_each(|arg| {
bytes.push(b'#');
let len_bytes = arg.len().to_string().into_bytes();
bytes.extend(len_bytes);
bytes.push(b'\n');
bytes.extend(arg.as_bytes());
bytes.push(b'\n');
});
bytes
}
#[test]
fn test_queryproc() {
let query = "GET x y".to_owned();
assert_eq!(
"#2\n*1\n#2\n&3\n#3\nGET\n#1\nx\n#1\ny\n"
.as_bytes()
.to_owned(),
proc_query(query)
)
}

@ -31,6 +31,7 @@ lazy_static! {
pub static ref SERVER_ERR: Vec<u8> = "#2\n*1\n#2\n&1\n!1\n5\n".as_bytes().to_owned();
pub static ref OTHER_ERR_EMPTY: Vec<u8> = "#2\n*1\n#2\n&1\n!1\n6\n".as_bytes().to_owned();
pub static ref HEYA: Vec<u8> = "#2\n*1\n#2\n&1\n+4\nHEY!\n".as_bytes().to_owned();
pub static ref UNKNOWN_ACTION: Vec<u8> =
"#2\n*1\n&1\n!15\nUnknown command\n".as_bytes().to_owned();
pub static ref UNKNOWN_ACTION: Vec<u8> = "#2\n*1\n#2\n&1\n!15\nUnknown command\n"
.as_bytes()
.to_owned();
}

Loading…
Cancel
Save