Upgrade sky-bench to use client driver

next
Sayan Nandan 3 years ago
parent f55fa85174
commit 5847cf1da9

6
Cargo.lock generated

@ -374,6 +374,7 @@ dependencies = [
"bytes",
"lazy_static",
"regex",
"skytable",
"termcolor",
]
@ -802,10 +803,13 @@ version = "0.5.2"
dependencies = [
"clap",
"devtimer",
"lazy_static",
"libsky",
"rand",
"regex",
"serde",
"serde_json",
"skytable",
]
[[package]]
@ -866,7 +870,7 @@ dependencies = [
[[package]]
name = "skytable"
version = "0.2.3"
source = "git+https://github.com/skytable/client-rust?branch=next#9268e8d9e40e9c9599142734108ec7f2bcc07101"
source = "git+https://github.com/skytable/client-rust?branch=next#48eeaad07dc21d070fa3f3fca3ff2979841bbb65"
dependencies = [
"bytes",
"tokio",

@ -24,12 +24,7 @@
*
*/
use crossterm::style::{Color, Print, ResetColor, SetForegroundColor};
use skytable::{AsyncConnection, Element, Query, RespCode, Response};
use std::str::FromStr;
lazy_static::lazy_static! {
static ref RE: regex::Regex = regex::Regex::from_str(r#"("[^"]*"|'[^']*'|[\S]+)+"#).unwrap();
}
use skytable::{AsyncConnection, Element, RespCode, Response};
pub struct Runner {
con: AsyncConnection,
@ -110,14 +105,7 @@ impl Runner {
Runner { con }
}
pub async fn run_query(&mut self, unescaped_items: &str) {
let args: Vec<String> = RE
.find_iter(unescaped_items)
.map(|val| val.as_str().replace("'", "").replace("\"", "").to_owned())
.collect();
let mut query = Query::new();
args.into_iter().for_each(|arg| {
query.arg(arg);
});
let query = libsky::turn_into_query(unescaped_items);
match self.con.run_simple_query(query).await {
Ok(resp) => match resp {
Response::InvalidResponse => {

@ -10,4 +10,5 @@ edition = "2018"
lazy_static = "1.4.0"
bytes = "1.0.1"
termcolor = "1.1.2"
regex = "1.5.4"
regex = "1.5.4"
skytable = { git = "https://github.com/skytable/client-rust", branch = "next", features = ["dbg"], default-features = false }

@ -28,10 +28,36 @@
//!
//! This contains modules which are shared by both the `cli` and the `server` modules
pub mod terrapipe;
pub mod util;
use skytable::Query;
use std::error::Error;
/// A generic result
pub type TResult<T> = Result<T, Box<dyn Error>>;
/// The size of the read buffer in bytes
pub const BUF_CAP: usize = 8 * 1024; // 8 KB per-connection
use std::str::FromStr;
lazy_static::lazy_static! {
static ref RE: regex::Regex = regex::Regex::from_str(r#"("[^"]*"|'[^']*'|[\S]+)+"#).unwrap();
}
pub fn split_into_args(q: &str) -> Vec<String> {
let args: Vec<String> = RE
.find_iter(q)
.map(|val| val.as_str().replace("'", "").replace("\"", "").to_owned())
.collect();
args
}
pub fn turn_into_query(q: &str) -> Query {
let mut query = Query::new();
split_into_args(q).into_iter().for_each(|arg| {
query.arg(arg);
});
query
}
pub fn into_raw_query(q: &str) -> Vec<u8> {
turn_into_query(q).into_raw_query()
}

@ -1,177 +0,0 @@
/*
* Created on Sat Jul 18 2020
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2020, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
//! # The Terrapipe protocol
//! This module implements primitives for the Terrapipe protocol
//!
pub const ADDR: &'static str = "127.0.0.1";
use std::str::FromStr;
lazy_static::lazy_static! {
static ref RE: regex::Regex = regex::Regex::from_str(r#"("[^"]*"|'[^']*'|[\S]+)+"#).unwrap();
}
/// Response codes returned by the server
#[derive(Debug, PartialEq)]
pub enum RespCodes {
/// `0`: Okay (Empty Response) - use the `ResponseBuilder` for building
/// responses that contain data
Okay,
/// `1`: Not Found
NotFound,
/// `2`: Overwrite Error
OverwriteError,
/// `3`: Action Error
ActionError,
/// `4`: Packet Error
PacketError,
/// `5`: Server Error
ServerError,
/// `6`: Some other error - the wrapped `String` will be returned in the response body.
/// Just a note, this gets quite messy, especially when we're using it for deconding responses
OtherError(Option<String>),
}
impl From<RespCodes> for u8 {
fn from(rcode: RespCodes) -> u8 {
use RespCodes::*;
match rcode {
Okay => 0,
NotFound => 1,
OverwriteError => 2,
ActionError => 3,
PacketError => 4,
ServerError => 5,
OtherError(_) => 6,
}
}
}
impl From<RespCodes> for char {
fn from(rcode: RespCodes) -> char {
use RespCodes::*;
match rcode {
Okay => '0',
NotFound => '1',
OverwriteError => '2',
ActionError => '3',
PacketError => '4',
ServerError => '5',
OtherError(_) => '6',
}
}
}
impl RespCodes {
pub fn from_str(val: &str, extra: Option<String>) -> Option<Self> {
use RespCodes::*;
let res = match val.parse::<u8>() {
Ok(val) => match val {
0 => Okay,
1 => NotFound,
2 => OverwriteError,
3 => ActionError,
4 => PacketError,
5 => ServerError,
6 => OtherError(extra),
_ => return None,
},
Err(_) => return None,
};
Some(res)
}
pub fn from_u8(val: u8, extra: Option<String>) -> Option<Self> {
use RespCodes::*;
let res = match val {
0 => Okay,
1 => NotFound,
2 => OverwriteError,
3 => ActionError,
4 => PacketError,
5 => ServerError,
6 => OtherError(extra),
_ => return None,
};
Some(res)
}
pub fn from_utf8(val: u8) -> Option<Self> {
let result = match val.checked_sub(48) {
Some(r) => r,
None => return None,
};
if result > 6 {
return None;
}
return RespCodes::from_u8(result, None);
}
}
/// Prepare a query packet from a string of whitespace separated values
pub fn proc_query<T>(querystr: T) -> Vec<u8>
where
T: AsRef<str>,
{
let mut bytes = Vec::with_capacity(querystr.as_ref().len());
let args: Vec<String> = RE
.find_iter(&querystr.as_ref())
.map(|val| val.as_str().replace("'", "").replace("\"", "").to_owned())
.collect();
bytes.extend(b"#2\n*1\n#");
let arg_len_bytes = args.len().to_string().into_bytes();
let arg_len_bytes_len = (arg_len_bytes.len() + 1).to_string().into_bytes();
bytes.extend(arg_len_bytes_len);
bytes.extend(b"\n&");
bytes.extend(arg_len_bytes);
bytes.push(b'\n');
args.into_iter().for_each(|arg| {
bytes.push(b'#');
let len_bytes = arg.len().to_string().into_bytes();
bytes.extend(len_bytes);
bytes.push(b'\n');
bytes.extend(arg.as_bytes());
bytes.push(b'\n');
});
bytes
}
#[test]
fn test_queryproc() {
let query = "GET x y".to_owned();
assert_eq!(
"#2\n*1\n#2\n&3\n#3\nGET\n#1\nx\n#1\ny\n"
.as_bytes()
.to_owned(),
proc_query(query)
);
let q_escaped = proc_query(r#"SET X 'sayan with spaces'"#);
assert_eq!(
"#2\n*1\n#2\n&3\n#3\nSET\n#1\nX\n#17\nsayan with spaces\n"
.as_bytes()
.to_owned(),
q_escaped
);
}

@ -9,7 +9,10 @@ edition = "2018"
[dependencies]
rand = "0.8.3"
devtimer = "4.0.1"
libsky = {path="../libsky"}
clap = {version = "2.33.3", features=["yaml"]}
serde = {version="1.0.125", features=["derive"]}
skytable = { git = "https://github.com/skytable/client-rust", branch = "next" }
clap = { version = "2.33.3", features = ["yaml"] }
serde = { version = "1.0.125", features = ["derive"] }
serde_json = "1.0.64"
regex = "1.5.4"
lazy_static = "1.4.0"
libsky = {path = "../libsky"}

@ -31,7 +31,6 @@
mod benchtool {
use clap::{load_yaml, App};
use devtimer::DevTime;
use libsky::terrapipe;
use rand::distributions::Alphanumeric;
use rand::thread_rng;
use serde::Serialize;
@ -162,24 +161,26 @@ mod benchtool {
Some(h) => h.to_owned(),
None => "127.0.0.1".to_owned(),
};
host.push(':');
match matches.value_of("port") {
let port = match matches.value_of("port") {
Some(p) => match p.parse::<u16>() {
Ok(p) => host.push_str(&p.to_string()),
Ok(p) => p,
Err(_) => {
eprintln!("ERROR: Invalid port");
std::process::exit(0x100);
}
},
None => host.push_str("2003"),
}
None => 2003,
};
println!("Running a sanity test...");
// Run a ping test
if let Err(e) = sanity_test(&host) {
// Run a sanity test
if let Err(e) = sanity_test(&host, port) {
eprintln!("ERROR: Sanity test failed: {}\nBenchmark terminated", e);
return;
}
println!("Sanity test succeeded");
// now push in the port to the host string
host.push_str(":");
host.push_str(&port.to_string());
let mut rand = thread_rng();
if let Some(matches) = matches.subcommand_matches("testkey") {
let numkeys = matches.value_of("count").unwrap();
@ -195,7 +196,7 @@ mod benchtool {
.map(|_| ran_string(8, &mut rand))
.collect();
let set_packs: Vec<Vec<u8>> = (0..num)
.map(|idx| terrapipe::proc_query(format!("SET {} {}", keys[idx], values[idx])))
.map(|idx| libsky::into_raw_query(format!("SET {} {}", keys[idx], values[idx])))
.collect();
set_packs.into_iter().for_each(|packet| {
np.execute(packet);
@ -254,13 +255,13 @@ mod benchtool {
one of `set_packs`
*/
let set_packs: Vec<Vec<u8>> = (0..max_queries)
.map(|idx| terrapipe::proc_query(format!("SET {} {}", keys[idx], values[idx])))
.map(|idx| libsky::into_raw_query(format!("SET {} {}", keys[idx], values[idx])))
.collect();
let get_packs: Vec<Vec<u8>> = (0..max_queries)
.map(|idx| terrapipe::proc_query(format!("GET {}", keys[idx])))
.map(|idx| libsky::into_raw_query(format!("GET {}", keys[idx])))
.collect();
let del_packs: Vec<Vec<u8>> = (0..max_queries)
.map(|idx| terrapipe::proc_query(format!("DEL {}", keys[idx])))
.map(|idx| libsky::into_raw_query(format!("DEL {}", keys[idx])))
.collect();
eprintln!("Per-packet size (GET): {} bytes", get_packs[0].len());
eprintln!("Per-packet size (SET): {} bytes", set_packs[0].len());
@ -314,50 +315,51 @@ mod benchtool {
/// ## Limitations
/// A 65535 character long key/value pair is created and fetched. This random string has extremely low
/// chances of colliding with any existing key
fn sanity_test(host: &String) -> Result<(), Box<dyn Error>> {
let mut sock = TcpStream::connect(host)
.map_err(|e| format!("connection to host failed with error '{}'", e))?;
let query = terrapipe::proc_query("HEYA");
sock.write_all(&query)
.map_err(|e| format!("couldn't write data to socket with error '{}'", e))?;
let res_should_be = "#2\n*1\n#2\n&1\n+4\nHEY!\n".as_bytes().to_owned();
let mut response = vec![0; res_should_be.len()];
sock.read_exact(&mut response)
.map_err(|e| format!("couldn't read data from socket with error '{}'", e))?;
if response != res_should_be {
fn sanity_test(host: &str, port: u16) -> Result<(), Box<dyn Error>> {
use skytable::{Connection, Element, Query, RespCode, Response};
let mut rng = thread_rng();
let mut connection = Connection::new(host, port)?;
// test heya
let mut query = Query::new();
query.arg("heya");
if !connection
.run_simple_query(query)
.unwrap()
.eq(&Response::Item(Element::String("HEY!".to_owned())))
{
return Err("HEYA test failed".into());
}
let mut ran = thread_rng();
let key = ran_string(65535, &mut ran);
let value = ran_string(65535, &mut ran);
let query = terrapipe::proc_query(format!("SET {} {}", key, value));
sock.write_all(&query)
.map_err(|e| format!("couldn't write data to socket with error '{}'", e))?;
let res_should_be = "#2\n*1\n#2\n&1\n!1\n0\n".as_bytes().to_owned();
let mut response = vec![0; res_should_be.len()];
sock.read_exact(&mut response)
.map_err(|e| format!("couldn't read data from socket with error '{}'", e))?;
if response != res_should_be {
let key = ran_string(65536, &mut rng);
let value = ran_string(65536, &mut rng);
let mut query = Query::new();
query.arg("set");
query.arg(&key);
query.arg(&value);
if !connection
.run_simple_query(query)
.unwrap()
.eq(&Response::Item(Element::RespCode(RespCode::Okay)))
{
return Err("SET test failed".into());
}
let query = terrapipe::proc_query(format!("GET {}", key));
sock.write_all(&query)
.map_err(|e| format!("couldn't write data to socket with error '{}'", e))?;
let res_should_be = format!("#2\n*1\n#2\n&1\n+65535\n{}\n", value).into_bytes();
let mut response = vec![0; res_should_be.len()];
sock.read_exact(&mut response)
.map_err(|e| format!("couldn't read data from socket with error '{}'", e))?;
if response != res_should_be {
let mut query = Query::new();
query.arg("get");
query.arg(&key);
if !connection
.run_simple_query(query)
.unwrap()
.eq(&Response::Item(Element::String(value.to_owned())))
{
return Err("GET test failed".into());
}
let query = terrapipe::proc_query(format!("DEL {}", key));
sock.write_all(&query)
.map_err(|e| format!("couldn't write data to socket with error '{}'", e))?;
let res_should_be = "#2\n*1\n#2\n&1\n:1\n1\n".as_bytes().to_owned();
let mut response = vec![0; res_should_be.len()];
sock.read_exact(&mut response)
.map_err(|e| format!("couldn't read data from socket with error '{}'", e))?;
if response != res_should_be {
let mut query = Query::new();
query.arg("del");
query.arg(&key);
if !connection
.run_simple_query(query)
.unwrap()
.eq(&Response::Item(Element::UnsignedInt(1)))
{
return Err("DEL test failed".into());
}
Ok(())

Loading…
Cancel
Save