Implement client

However, the client has some propagation errors
next
Sayan Nandan 4 years ago
parent 35a86c96ac
commit 0f1dde109b
No known key found for this signature in database
GPG Key ID: C31EFD7DDA12AEE0

5
Cargo.lock generated

@ -269,7 +269,7 @@ dependencies = [
]
[[package]]
name = "terrabase"
name = "tdb"
version = "0.1.0"
dependencies = [
"bytes",
@ -315,6 +315,9 @@ dependencies = [
[[package]]
name = "tsh"
version = "0.1.0"
dependencies = [
"corelib",
]
[[package]]
name = "unicode-xid"

@ -10,7 +10,7 @@ As noted earlier, Terrabase is pre-alpha software and the entire API is subject
We have an experimental client and server implementation for the database already. You can download a pre-built binary for `x86_64-linux` in the releases section and try it out!
* First unzip the file
* Start the database server by running `./terrabase`
* Start the database server by running `./tdb`
* Start the client by running `./tsh`
* You can run commands like `SET sayan 17` , `GET cat` , `UPDATE cat 100` or `DEL cat` !

@ -1,9 +1,10 @@
[package]
name = "tsh"
version = "0.1.0"
authors = ["Sayan Nandan <nandansayan@outlook.com>"]
authors = ["Sayan Nandan <ohsayan@outlook.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
[dependencies]
corelib = {path = "../corelib"}

@ -19,3 +19,107 @@
*
*/
use corelib::terrapipe::{self, DEF_QMETALINE_BUFSIZE};
use std::io::{self, prelude::*, BufReader};
use std::net::TcpStream;
use std::process;
const ADDR: &'static str = "127.0.0.1:2003";
pub fn execute_query() {
let mut connection = match TcpStream::connect(ADDR) {
Ok(c) => c,
Err(_) => {
eprintln!("ERROR: Couldn't connect to the TDB server");
process::exit(0x100);
}
};
loop {
print!("tsh>");
io::stdout()
.flush()
.expect("Couldn't flush buffer, this is a serious error!");
let mut rl = String::new();
io::stdin()
.read_line(&mut rl)
.expect("Couldn't read line, this is a serious error!");
let mut cmd = terrapipe::QueryBuilder::new_simple();
cmd.from_cmd(rl);
let (size, resp) = cmd.prepare_response();
match connection.write(&resp) {
Ok(n) => {
if n < size {
eprintln!("ERROR: Couldn't write all bytes to server");
process::exit(0x100);
}
},
Err(_) => {
eprintln!("ERROR: Couldn't send data to the TDB server");
process::exit(0x100);
}
}
println!("{}", parse_response(&connection));
}
}
pub fn parse_response(stream: &TcpStream) -> String {
let mut metaline = String::with_capacity(DEF_QMETALINE_BUFSIZE);
let mut bufreader = BufReader::new(stream);
match bufreader.read_line(&mut metaline) {
Ok(_) => (),
Err(_) => {
eprintln!("Couldn't read metaline from tdb server");
process::exit(0x100);
}
}
let metaline = metaline.trim_matches(char::from(0));
let fields: Vec<&str> = metaline.split('!').collect();
if let (Some(resptype), Some(respcode), Some(clength), Some(ml_length)) =
(fields.get(0), fields.get(1), fields.get(2), fields.get(3))
{
if *resptype == "$" {
todo!("Pipelined response deconding is yet to be implemented")
}
let mut is_err_response = false;
match respcode.to_owned() {
"0" => (),
"1" => return format!("ERROR: Couldn't find the requested key"),
"2" => return format!("ERROR: Can't overwrite existing value"),
"3" => return format!("ERROR: tsh sent an invalid metaframe"),
"4" => return format!("ERROR: tsh sent an incomplete query packet"),
"5" => return format!("ERROR: tdb had an internal server error"),
"6" => is_err_response = true,
_ => (),
}
if let (Ok(clength), Ok(ml_length)) = (clength.parse::<usize>(), ml_length.parse::<usize>())
{
let mut metalinebuf = String::with_capacity(ml_length);
let mut databuf = vec![0; clength];
bufreader.read_line(&mut metalinebuf).unwrap();
let sizes: Vec<usize> = metalinebuf
.split("#")
.map(|size| size.parse::<usize>().unwrap())
.collect();
bufreader.read(&mut databuf).unwrap();
eprintln!("{:?}", String::from_utf8_lossy(&databuf));
let res = extract_idents(databuf, sizes);
let resp: String = res.iter().flat_map(|s| s.chars()).collect();
if !is_err_response {
return resp;
} else {
return format!("ERROR: {}", resp);
}
}
}
format!("ERROR: The server sent an invalid response")
}
fn extract_idents(buf: Vec<u8>, skip_sequence: Vec<usize>) -> Vec<String> {
skip_sequence
.into_iter()
.scan(buf.into_iter(), |databuf, size| {
let tok: Vec<u8> = databuf.take(size).collect();
let _ = databuf.next();
// FIXME(@ohsayan): This is quite slow, we'll have to use SIMD in the future
Some(String::from_utf8_lossy(&tok).to_string())
})
.collect()
}

@ -19,15 +19,11 @@
*
*/
use std::io;
use std::io::prelude::*;
mod argparse;
const MSG_WELCOME: &'static str = "Terrabase | Version 0.1.0\nCopyright (c) 2020 Sayan Nandan";
const MSG_WELCOME: &'static str = "TerrabaseDB v0.1.0";
fn main() {
println!("{}", MSG_WELCOME);
loop {
}
argparse::execute_query();
}

@ -1,7 +1,7 @@
[package]
name = "corelib"
version = "0.1.0"
authors = ["Sayan Nandan <nandansayan@outlook.com>"]
authors = ["Sayan Nandan <ohsayan@outlook.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

@ -24,7 +24,7 @@
/// Default query metaline buffer size
pub const DEF_QMETALINE_BUFSIZE: usize = 44;
/// Default query metalayout buffer size
pub const DEF_QMETALAYOUT_BUFSIZE: usize = 1024;
pub const DEF_QMETALAYOUT_BUFSIZE: usize = 576;
/// Default query dataframe buffer size
pub const DEF_QDATAFRAME_BUSIZE: usize = 4096;
pub mod tags {
@ -176,7 +176,7 @@ impl SimpleResponse {
/// for creating a `Vec<u8>` which can be written to a TCP stream
fn prepare_response(&self) -> Vec<u8> {
format!(
"{}!{}!{}\n{}\n{}",
"*!{}!{}!{}\n{}\n{}",
self.respcode,
self.size_tracker,
self.metalayout_buf.len(),
@ -211,3 +211,63 @@ fn test_simple_response() {
String::from("0!39!16\n5#5#3#2#3#4#4#5#\nSayan\nloves\nyou\nif\nyou\nsend\nUTF8\nbytes\n")
);
}
pub enum QueryBuilder {
SimpleQuery,
// TODO(@ohsayan): Add pipelined queries here
}
// TODO(@ohsayan): I think we should move the client stuff into a separate repo
// altogether to let users customize the client as they like and avoid licensing
// issues
impl QueryBuilder {
pub fn new_simple() -> SimpleQuery {
SimpleQuery::new()
}
}
pub struct SimpleQuery {
metaline: String,
metalayout: String,
dataframe: String,
size_tracker: usize,
}
impl SimpleQuery {
pub fn new() -> Self {
let mut metaline = String::with_capacity(DEF_QMETALINE_BUFSIZE);
metaline.push_str("*!");
SimpleQuery {
metaline,
size_tracker: 0,
metalayout: String::with_capacity(DEF_QMETALAYOUT_BUFSIZE),
dataframe: String::with_capacity(DEF_QDATAFRAME_BUSIZE),
}
}
pub fn add(&mut self, cmd: &str) {
let ref mut layout = self.metalayout;
let ref mut df = self.dataframe;
let len = cmd.len().to_string();
self.size_tracker += cmd.len() + 1;
layout.push_str(&len);
layout.push('#');
df.push_str(cmd);
df.push('\n');
}
pub fn from_cmd(&mut self, cmd: String) {
cmd.split_whitespace().for_each(|val| self.add(val));
}
pub fn prepare_response(&self) -> (usize, Vec<u8>) {
let resp = format!(
"{}{}!{}\n{}\n{}",
self.metaline,
self.size_tracker,
self.metalayout.len(),
self.metalayout,
self.dataframe
)
.as_bytes()
.to_owned();
(resp.len(), resp)
}
}

@ -1,7 +1,7 @@
[package]
name = "terrabase"
name = "tdb"
version = "0.1.0"
authors = ["Sayan Nandan <nandansayan@outlook.com>"]
authors = ["Sayan Nandan <ohsayan@outlook.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

@ -58,6 +58,8 @@ impl Terminator {
}
}
// We'll use the idea of gracefully shutting down from tokio
pub struct Listener {
/// An atomic reference to the coretable
db: CoreDB,
@ -131,9 +133,10 @@ impl CHandler {
return;
}
};
eprintln!("{:?}", try_df);
match try_df {
Ok(df) => return self.con.write_response(self.db.execute_query(df)).await,
Err(e) => return self.con.close_conn_with_error(e).await,
Ok(df) => self.con.write_response(self.db.execute_query(df)).await,
Err(e) => self.con.close_conn_with_error(e).await,
}
}
}

@ -144,7 +144,7 @@ impl Connection {
pub fn new(stream: TcpStream) -> Self {
Connection { stream }
}
pub async fn read_query(&mut self) -> Result<QueryDataframe, impl RespBytes> {
pub async fn read_query(&mut self) -> Result<QueryDataframe, RespCodes> {
let mut bufreader = BufReader::new(&mut self.stream);
let mut metaline_buf = String::with_capacity(DEF_QMETALINE_BUFSIZE);
bufreader.read_line(&mut metaline_buf).await.unwrap();

Loading…
Cancel
Save