diff --git a/Cargo.lock b/Cargo.lock index 33870b2f..2b3322e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -25,10 +25,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" [[package]] -name = "devtimer" -version = "4.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6035b7b9244bf9637cd7ef80b5e1c54404bef92cccd34738c85c45f04ae8b244" +name = "corelib" +version = "0.1.0" +dependencies = [ + "lazy_static", +] [[package]] name = "fnv" @@ -58,17 +59,6 @@ version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399" -[[package]] -name = "getrandom" -version = "0.1.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb" -dependencies = [ - "cfg-if", - "libc", - "wasi", -] - [[package]] name = "hermit-abi" version = "0.1.15" @@ -109,15 +99,6 @@ version = "0.2.72" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9f8082297d534141b30c8d39e9b1773713ab50fdbe4ff30f750d063b3bfd701" -[[package]] -name = "libcore" -version = "0.1.0" -dependencies = [ - "devtimer", - "lazy_static", - "rand", -] - [[package]] name = "log" version = "0.4.8" @@ -224,12 +205,6 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282adbf10f2698a7a77f8e983a74b2d18176c19a7fd32a45446139ae7b02b715" -[[package]] -name = "ppv-lite86" -version = "0.2.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "237a5ed80e274dbc66f86bd59c1e25edc039660be53194b5fe0a482e0f2612ea" - [[package]] name = "proc-macro2" version = "1.0.18" @@ -248,47 +223,6 @@ dependencies = [ "proc-macro2", ] -[[package]] -name = "rand" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" -dependencies = [ - "getrandom", - "libc", - "rand_chacha", - "rand_core", - "rand_hc", -] - -[[package]] -name = "rand_chacha" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" -dependencies = [ - "ppv-lite86", - "rand_core", -] - -[[package]] -name = "rand_core" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" -dependencies = [ - "getrandom", -] - -[[package]] -name = "rand_hc" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" -dependencies = [ - "rand_core", -] - [[package]] name = "redox_syscall" version = "0.1.57" @@ -339,7 +273,7 @@ name = "terrabase" version = "0.1.0" dependencies = [ "bytes", - "libcore", + "corelib", "tokio", ] @@ -381,9 +315,6 @@ dependencies = [ [[package]] name = "tsh" version = "0.1.0" -dependencies = [ - "libcore", -] [[package]] name = "unicode-xid" @@ -391,12 +322,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564" -[[package]] -name = "wasi" -version = "0.9.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" - [[package]] name = "winapi" version = "0.2.8" diff --git a/Cargo.toml b/Cargo.toml index 2e6a3bb3..c3572043 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,5 +2,5 @@ members = [ "cli", "server", - "libcore" + "corelib" ] \ No newline at end of file diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 1fc3f6f1..e1dcec08 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -6,5 +6,4 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html -[dependencies] -libcore = {path="../libcore"} \ No newline at end of file +[dependencies] \ No newline at end of file diff --git a/cli/src/argparse.rs b/cli/src/argparse.rs index 0dabb4fa..26a5985c 100644 --- a/cli/src/argparse.rs +++ b/cli/src/argparse.rs @@ -19,160 +19,3 @@ * */ -use libcore::terrapipe::DEF_R_META_BUFSIZE; -use libcore::terrapipe::{Dataframe, ResponseCodes::*, ResultMetaframe, Version, QUERY_PACKET}; -use std::io::{BufRead, BufReader, Read, Write}; -use std::net::TcpStream; -use std::process; - -const ARG_GET: &'static str = "GET"; -const ARG_SET: &'static str = "SET"; -const ARG_UPDATE: &'static str = "UPDATE"; -const ARG_DEL: &'static str = "DEL"; -const ARG_EXIT: &'static str = "EXIT"; -const ERR_MULTIPLE_GET: &'static str = "Multiple GETs aren't supported yet"; -const ERR_MULTIPLE_SET: &'static str = "Multiple SETs aren't supported yet"; -const ERR_MULTIPLE_UPDATE: &'static str = "Multiple UPDATEs aren't supported yet"; -const ERR_MULTIPLE_DEL: &'static str = "Multiple DELs aren't supported yet"; -const SELF_VERSION: Version = Version(0, 1, 0); -const ADDR: &'static str = "127.0.0.1:2003"; -pub const EXIT_ERROR: fn(error: &str) -> ! = |error| { - eprintln!("error: {}", error); - process::exit(0x100); -}; - -const NORMAL_ERROR: fn(error: &str) = |error| { - eprintln!("error: {}", error); -}; - -pub fn run(args: String) { - let args: Vec<&str> = args.split_whitespace().collect(); - match args[0].to_uppercase().as_str() { - ARG_GET => { - if let Some(key) = args.get(1) { - if args.get(2).is_none() { - send_query(QUERY_PACKET( - SELF_VERSION, - ARG_GET.to_owned(), - key.to_string(), - )); - } else { - NORMAL_ERROR(ERR_MULTIPLE_GET); - } - } else { - NORMAL_ERROR("Expected one more argument"); - } - } - ARG_SET => { - if let Some(key) = args.get(1) { - if let Some(value) = args.get(2) { - if args.get(3).is_none() { - send_query(QUERY_PACKET( - SELF_VERSION, - ARG_SET.to_owned(), - format!("{} {}", key, value), - )) - } else { - NORMAL_ERROR(ERR_MULTIPLE_SET); - } - } else { - NORMAL_ERROR("Expected one more argument"); - } - } else { - NORMAL_ERROR("Expected more arguments"); - } - } - ARG_UPDATE => { - if let Some(key) = args.get(1) { - if let Some(value) = args.get(2) { - if args.get(3).is_none() { - send_query(QUERY_PACKET( - SELF_VERSION, - ARG_UPDATE.to_owned(), - format!("{} {}", key, value), - )) - } else { - NORMAL_ERROR(ERR_MULTIPLE_UPDATE); - } - } else { - NORMAL_ERROR("Expected one more argument"); - } - } else { - NORMAL_ERROR("Expected more arguments"); - } - } - ARG_DEL => { - if let Some(key) = args.get(1) { - if args.get(2).is_none() { - send_query(QUERY_PACKET( - SELF_VERSION, - ARG_DEL.to_owned(), - key.to_string(), - )); - } else { - NORMAL_ERROR(ERR_MULTIPLE_DEL); - } - } else { - NORMAL_ERROR("Expected one more argument"); - } - } - ARG_EXIT => { - println!("Goodbye!"); - process::exit(0x100) - } - _ => NORMAL_ERROR("Unknown command"), - } -} - -fn send_query(query: Vec) { - let mut binding = match TcpStream::connect(ADDR) { - Ok(b) => b, - Err(_) => EXIT_ERROR("Couldn't connect to Terrabase"), - }; - match binding.write(&query) { - Ok(_) => (), - Err(_) => EXIT_ERROR("Couldn't read data from socket"), - } - let mut bufreader = BufReader::new(binding); - let mut buf = String::with_capacity(DEF_R_META_BUFSIZE); - match bufreader.read_line(&mut buf) { - Ok(_) => (), - Err(_) => EXIT_ERROR("Failed to read line from socket"), - } - let rmf = match ResultMetaframe::from_buffer(buf) { - Ok(mf) => mf, - Err(e) => { - NORMAL_ERROR(&e.to_string()); - return; - } - }; - match &rmf.response { - Okay(_) => (), - x @ _ => { - NORMAL_ERROR(&x.to_string()); - return; - } - } - let mut data_buffer = vec![0; rmf.get_content_size()]; - match bufreader.read(&mut data_buffer) { - Ok(_) => (), - Err(_) => EXIT_ERROR("Failed to read line from socket"), - } - let df = match Dataframe::from_buffer(rmf.get_content_size(), data_buffer) { - Ok(d) => d, - Err(e) => { - NORMAL_ERROR(&e.to_string()); - return; - } - }; - let res = df.deflatten(); - if res.len() == 0 { - return; - } else { - if res.len() == 1 { - println!("{}", res[0]); - } else { - println!("{:?}", res); - } - } -} diff --git a/cli/src/main.rs b/cli/src/main.rs index 7d83bdc7..c675bb6d 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -28,20 +28,6 @@ const MSG_WELCOME: &'static str = "Terrabase | Version 0.1.0\nCopyright (c) 2020 fn main() { println!("{}", MSG_WELCOME); loop { - let mut buffer = String::new(); - print!("tsh> "); - match io::stdout().flush() { - Ok(_) => (), - Err(_) => argparse::EXIT_ERROR("Failed to flush output stream"), - }; - match io::stdin().read_line(&mut buffer) { - Ok(_) => (), - Err(_) => argparse::EXIT_ERROR("Failed to read line and append to buffer"), - }; - if buffer.len() != 0 { - argparse::run(buffer); - } else { - continue; - } + } } diff --git a/libcore/Cargo.toml b/corelib/Cargo.toml similarity index 69% rename from libcore/Cargo.toml rename to corelib/Cargo.toml index 69b26c75..f688b3ff 100644 --- a/libcore/Cargo.toml +++ b/corelib/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "libcore" +name = "corelib" version = "0.1.0" authors = ["Sayan Nandan "] edition = "2018" @@ -7,9 +7,4 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -lazy_static = "1.4.0" - - -[dev-dependencies] -devtimer = "4.0.0" -rand = "0.7.3" \ No newline at end of file +lazy_static = "1.4.0" \ No newline at end of file diff --git a/corelib/src/lib.rs b/corelib/src/lib.rs new file mode 100644 index 00000000..ed3112b5 --- /dev/null +++ b/corelib/src/lib.rs @@ -0,0 +1,55 @@ +/* + * Created on Sat Jul 18 2020 + * + * This file is a part of the source code for the Terrabase database + * Copyright (c) 2020, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +pub const DEF_QMETA_BUFSIZE: usize = 44; + +pub mod responses { + use lazy_static::lazy_static; + lazy_static! { + pub static ref RESP_OKAY_EMPTY: Vec = "0!0!0#".as_bytes().to_owned(); + pub static ref RESP_NOT_FOUND: Vec = "1!0!0#".as_bytes().to_owned(); + pub static ref RESP_OVERWRITE_ERROR: Vec = "2!0!0#".as_bytes().to_owned(); + pub static ref RESP_INVALID_MF: Vec = "3!0!0#".as_bytes().to_owned(); + pub static ref RESP_INCOMPLETE: Vec = "4!0!0#".as_bytes().to_owned(); + pub static ref RESP_SERVER_ERROR: Vec = "5!0!0#".as_bytes().to_owned(); + } +} + +#[derive(Debug, PartialEq)] +pub enum RespCodes { + Okay(Option), + NotFound, + OverwriteError, + InvalidMetaframe, + Incomplete, + ServerError, + OtherError(Option), +} + +#[derive(Debug, PartialEq)] +pub enum ActionType { + Simple, + Pipeline, +} + +pub trait Response { + fn into_response(&self) -> Vec; +} diff --git a/libcore/src/lib.rs b/libcore/src/lib.rs deleted file mode 100644 index 0d0c9e9d..00000000 --- a/libcore/src/lib.rs +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Created on Thu Jul 02 2020 - * - * This file is a part of the source code for the Terrabase database - * Copyright (c) 2020, Sayan Nandan - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * -*/ - -pub mod terrapipe; -pub use terrapipe::SELF_VERSION; \ No newline at end of file diff --git a/libcore/src/terrapipe.rs b/libcore/src/terrapipe.rs deleted file mode 100644 index 606a6df7..00000000 --- a/libcore/src/terrapipe.rs +++ /dev/null @@ -1,438 +0,0 @@ -/* - * Created on Thu Jul 02 2020 - * - * This file is a part of the source code for the Terrabase database - * Copyright (c) 2020, Sayan Nandan - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * -*/ -#![allow(unused)] - -//! This implements the Terrapipe protocol -use lazy_static::lazy_static; -use std::fmt; -use std::mem; - -/// Current version of the terrapipe protocol -pub const SELF_VERSION: Version = Version(0, 1, 0); -/// Metaframe protocol tag -pub const MF_PROTOCOL_TAG: &'static str = "TP"; -/// Metaframe query tag -pub const MF_QUERY_TAG: &'static str = "Q"; -/// Metaframe response tag -pub const MF_RESPONSE_TAG: &'static str = "R"; -/// Metaframe separator ("/") -pub const MF_SEPARATOR: &'static str = "/"; -/// Metaframe `GET` tag -pub const MF_METHOD_GET: &'static str = "GET"; -/// Metaframe `SET` tag -pub const MF_METHOD_SET: &'static str = "SET"; -/// Metaframe `UPDATE` tag -pub const MF_METHOD_UPDATE: &'static str = "UPDATE"; -/// Metaframe `DEL` tag -pub const MF_METHOD_DEL: &'static str = "DEL"; -/// ## The default buffer size for the query metaframe -/// This currently enables sizes upto 2^64 to be handled. Since -/// `floor(log10(2^64))+1` = 20 digits and the maximum size of the query -/// metaframe __currently__ is 26 - the buffer size, should be 46 -pub const DEF_Q_META_BUFSIZE: usize = 46; -/// ## The default buffer size for the response metaframe -/// This currently enables sizes upto 2^64 to be handled. Since 2^64 has 20 digits -/// and the maximum size of the response metaframe is 20 - the buffer size is kept at 40 -pub const DEF_R_META_BUFSIZE: usize = 40; - -// HACK(@ohsayan) This is a temporary workaround since `const fn`s don't support `match` yet -/// Constant function to generate a response packet -pub const RESPONSE_PACKET: fn(version: Version, respcode: u8, data: &str) -> Vec = - |version, respcode, data| { - let res = format!( - "TP/{}.{}.{}/R/{}/{}\n{}", - version.0, - version.1, - version.2, - respcode, - data.len(), - data, - ); - res.as_bytes().to_vec() - }; - -// HACK(@ohsayan) This is a temporary workaround since `const fn`s don't support `match` yet -/// Constant function to generate a query packet -pub const QUERY_PACKET: fn(version: Version, method: String, data: String) -> Vec = - |version, method, data| { - let res = format!( - "TP/{}.{}.{}/Q/{}/{}\n{}", - version.0, - version.1, - version.2, - method, - data.len(), - data - ); - res.as_bytes().to_vec() - }; - -// Evaluate the common error packets at compile-time -lazy_static! { - /// Success: empty response - static ref RESP_OKAY_EMPTY: Vec = RESPONSE_PACKET(SELF_VERSION, 0, ""); - /// Error response when the target is not found - static ref RESP_NOT_FOUND: Vec = RESPONSE_PACKET(SELF_VERSION, 1, ""); - /// Error response when the target key already exists and cannot be overwritten - static ref RESP_OVERWRITE_ERROR: Vec = RESPONSE_PACKET(SELF_VERSION, 2, ""); - /// Error response when the method in the query is not allowed/deprecated/not supported yet - static ref RESP_METHOD_NOT_ALLOWED: Vec = RESPONSE_PACKET(SELF_VERSION, 3, ""); - /// Error response when the server has a problem processing the response - static ref RESP_INTERNAL_SERVER_ERROR: Vec = RESPONSE_PACKET(SELF_VERSION, 4, ""); - /// Error response when the metaframe contains invalid tokens - static ref RESP_INVALID_MF: Vec = RESPONSE_PACKET(SELF_VERSION, 5, ""); - /// Error response when the dataframe doesn't contain the expected bytes - static ref RESP_CORRUPT_DF: Vec = RESPONSE_PACKET(SELF_VERSION, 6, ""); - /// Error response when the protocol used by the client is not supported by the server - static ref RESP_PROTOCOL_VERSION_MISMATCH: Vec = RESPONSE_PACKET(SELF_VERSION, 7, ""); - /// Error response when the query packet is missing the basic information, usually a newline - static ref RESP_CORRUPT_PACKET: Vec = RESPONSE_PACKET(SELF_VERSION, 8, ""); -} - -/// A minimal _Semver_ implementation -pub struct Version(pub u8, pub u8, pub u8); - -impl Version { - /// Create a new `Version` instance from an `&str` - /// ## Errors - /// Returns `None` when the string passed isn't in the correct format - pub fn from_str<'a>(vstr: &'a str) -> Option { - let vstr: Vec<&str> = vstr.split(".").collect(); - if vstr.len() != 3 { - return None; - } - if let (Ok(major), Ok(minor), Ok(patch)) = ( - vstr[0].parse::(), - vstr[1].parse::(), - vstr[2].parse::(), - ) { - Some(Version(major, minor, patch)) - } else { - None - } - } - /// Check if `other` is compatible with the current version as required - /// by _Semver_ - pub fn incompatible_with(&self, other: &Version) -> bool { - if self.0 == other.0 { - false - } else { - true - } - } -} - -#[derive(Debug)] -/// Response codes which are returned by the server -pub enum ResponseCodes { - /// `0` : Okay - Okay(Option), - /// `1` : Not Found - NotFound, - /// `2` : Overwrite Error - OverwriteError, - /// `3` : Method Not Allowed - MethodNotAllowed, - /// `4` : Internal Server Error - InternalServerError, - /// `5` : Invalid Metaframe - InvalidMetaframe, - /// `6` : Corrupt Dataframe - CorruptDataframe, - /// `7` : Protocol Version Mismatch - ProtocolVersionMismatch, - /// `8` : Corrupt Packet - CorruptPacket, -} - -impl fmt::Display for ResponseCodes { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - use ResponseCodes::*; - match self { - Okay(v) => { - if let Some(v) = v { - write!(f, "{}", v) - } else { - write!(f, "") - } - } - NotFound => write!(f, "The target could not be found"), - OverwriteError => write!(f, "Existing values cannot be overwritten"), - MethodNotAllowed => write!(f, "The method is not supported"), - InternalServerError => write!(f, "An internal server error occurred"), - InvalidMetaframe => write!(f, "The query had an invalid metaframe"), - CorruptDataframe => write!(f, "The query did not contain the required data"), - ProtocolVersionMismatch => write!( - f, - "The server doesn't support the protocol being used" - ), - CorruptPacket => write!(f, "The query did not have the required data"), - } - } -} - -impl ResponseCodes { - /// Instantiate a new `ResponseCodes` variant from an `u8` value - /// ## Errors - /// Returns `None` when the `u8` doesn't correspond to any of the - /// response codes - pub fn from_u8(code: u8) -> Option { - use ResponseCodes::*; - let c = match code { - 0 => Okay(None), - 1 => NotFound, - 2 => OverwriteError, - 3 => MethodNotAllowed, - 4 => InternalServerError, - 5 => InvalidMetaframe, - 6 => CorruptDataframe, - 7 => ProtocolVersionMismatch, - 8 => CorruptPacket, - _ => return None, - }; - Some(c) - } -} - -/// Any object implementing this trait can be converted into a response -pub trait ResponseBytes { - /// Return a `Vec` with the response - fn response_bytes(&self) -> Vec; -} - -impl ResponseBytes for ResponseCodes { - fn response_bytes(&self) -> Vec { - use ResponseCodes::*; - match self { - Okay(val) => { - if let Some(dat) = val { - RESPONSE_PACKET(SELF_VERSION, 0, dat) - } else { - RESP_OKAY_EMPTY.to_vec() - } - } - NotFound => RESP_NOT_FOUND.to_vec(), - OverwriteError => RESP_OVERWRITE_ERROR.to_vec(), - MethodNotAllowed => RESP_METHOD_NOT_ALLOWED.to_vec(), - InternalServerError => RESP_INTERNAL_SERVER_ERROR.to_vec(), - InvalidMetaframe => RESP_INVALID_MF.to_vec(), - CorruptDataframe => RESP_CORRUPT_DF.to_vec(), - ProtocolVersionMismatch => RESP_PROTOCOL_VERSION_MISMATCH.to_vec(), - CorruptPacket => RESP_CORRUPT_PACKET.to_vec(), - } - } -} - -/// Query methods that can be used by the client -#[derive(Debug, PartialEq)] -pub enum QueryMethod { - /// A `GET` query - GET, - /// A `SET` query - SET, - /// An `UPDATE` query - UPDATE, - /// A `DEL` query - DEL, -} - -/// The query metaframe -#[derive(Debug, PartialEq)] -pub struct QueryMetaframe { - /// The content size that is to be read by the server, from the data packet - content_size: usize, - /// The query method that the client has issued to the server - method: QueryMethod, -} - -impl QueryMetaframe { - /// Create a query metaframe instance from a `String` buffer - /// ## Errors - /// Returns `ResponseCodes` which dictate what error has occurred - pub fn from_buffer(buf: &String) -> Result { - let mf_parts: Vec<&str> = buf.split(MF_SEPARATOR).collect(); - if mf_parts.len() != 5 { - return Err(ResponseCodes::InvalidMetaframe); - } - if mf_parts[0] != MF_PROTOCOL_TAG || mf_parts[2] != MF_QUERY_TAG { - return Err(ResponseCodes::InvalidMetaframe); - } - let version = match Version::from_str(&mf_parts[1]) { - None => return Err(ResponseCodes::InvalidMetaframe), - Some(v) => v, - }; - if SELF_VERSION.incompatible_with(&version) { - return Err(ResponseCodes::ProtocolVersionMismatch); - } - // The size may have extra code point 0s - remove them - let cs = mf_parts[4].trim_matches(char::from(0)).trim(); - let content_size = match cs.parse::() { - Ok(csize) => csize, - Err(e) => { - eprintln!("Errored: {}", e); - return Err(ResponseCodes::InvalidMetaframe); - } - }; - let method = match mf_parts[3] { - MF_METHOD_GET => QueryMethod::GET, - MF_METHOD_SET => QueryMethod::SET, - MF_METHOD_UPDATE => QueryMethod::UPDATE, - MF_METHOD_DEL => QueryMethod::DEL, - _ => return Err(ResponseCodes::MethodNotAllowed), - }; - - Ok(QueryMetaframe { - content_size, - method, - }) - } - /// Get the number of bytes that the server should expect from the dataframe - pub fn get_content_size(&self) -> usize { - self.content_size - } - /// Get the method to be used - pub fn get_method(&self) -> &QueryMethod { - &self.method - } -} - -/// A `Dataframe` is simply treated as a blob which contains bytes in the form -/// of a `String` -#[derive(Debug)] -pub struct Dataframe(String); - -impl Dataframe { - /// Create a new `Dataframe` instance from a `Vec` buffer - /// ## Errors - /// When the `target_size` is not the same as the size of the buffer, this - /// returns a `CorruptDataframe` response code - pub fn from_buffer(target_size: usize, buffer: Vec) -> Result { - let buffer = String::from_utf8_lossy(&buffer); - let buffer = buffer.trim(); - if buffer.len() != target_size { - return Err(ResponseCodes::CorruptDataframe); - } - Ok(Dataframe(buffer.to_string())) - } - /// Deflatten the dataframe into a `Vec` of actions/identifiers/bytes - pub fn deflatten(&self) -> Vec<&str> { - self.0.split_whitespace().collect() - } -} - -#[cfg(test)] -#[test] -fn test_metaframe() { - let v = Version(0, 1, 0); - let mut goodframe = String::from("TP/0.1.0/Q/GET/5"); - let res = QueryMetaframe::from_buffer(&goodframe); - let mf_should_be = QueryMetaframe { - content_size: 5, - method: QueryMethod::GET, - }; - assert_eq!(res.ok().unwrap(), mf_should_be); -} - -#[cfg(test)] -#[test] -fn benchmark_metaframe_parsing() { - let v = Version(0, 1, 0); - use devtimer::run_benchmark; - use rand::Rng; - let mut rng = rand::thread_rng(); - let mut metaframes: Vec = Vec::with_capacity(50000); - (0..50000).for_each(|_| { - let s = rng.gen_range(0, usize::MAX); - let mut buf = format!("TP/0.1.0/Q/GET/5/{}", s); - metaframes.push(buf); - }); - let b = run_benchmark(50000, |n| { - let _ = QueryMetaframe::from_buffer(&metaframes[n]).ok().unwrap(); - }); - b.print_stats(); -} - -impl fmt::Display for ResultError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - use ResultError::*; - match self { - StandardError(r) => write!(f, "{}", r), - UnknownError(u) => write!(f, "The server responded with '{}'", u), - } - } -} - -#[derive(Debug)] -/// Errors that may occur when parsing a response packet from the server -pub enum ResultError { - /// A standard response code used by the Terrapipe protocol - StandardError(ResponseCodes), - /// Some nonsense response code that may be returned by a buggy or patched server - UnknownError(String), -} - -#[derive(Debug)] -/// A result metaframe -pub struct ResultMetaframe { - content_size: usize, - pub response: ResponseCodes, -} - -impl ResultMetaframe { - /// Instantiate a new metaframe from a `String` buffer - /// ## Errors - /// Returns a `ResultError` in case something happened while parsing the - /// response packet's metaframe - pub fn from_buffer(buf: String) -> Result { - use ResultError::*; - let mf_parts = buf.trim(); - let mf_parts: Vec<&str> = mf_parts.split("/").collect(); - if mf_parts.len() != 5 { - return Err(StandardError(ResponseCodes::InvalidMetaframe)); - } - - if mf_parts[0] != MF_PROTOCOL_TAG && mf_parts[2] != MF_RESPONSE_TAG { - return Err(StandardError(ResponseCodes::InvalidMetaframe)); - } - - let response = match mf_parts[3].parse::() { - Ok(r) => match ResponseCodes::from_u8(r) { - Some(rcode) => rcode, - None => return Err(UnknownError(mf_parts[3].to_owned())), - }, - Err(_) => return Err(UnknownError(mf_parts[3].to_owned())), - }; - // The size may have extra code point 0s - remove them - let cs = mf_parts[4].trim_matches(char::from(0)); - let content_size = match cs.parse::() { - Ok(csize) => csize, - Err(_) => return Err(StandardError(ResponseCodes::InvalidMetaframe)), - }; - - Ok(ResultMetaframe { - content_size, - response, - }) - } - pub fn get_content_size(&self) -> usize { - self.content_size - } -} diff --git a/server/Cargo.toml b/server/Cargo.toml index 3bb279e9..a47af9e0 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -8,6 +8,5 @@ edition = "2018" [dependencies] tokio = { version = "0.2.21", features = ["full"] } -# Import `libcore` which contains code for terrapipe and other utils -libcore = {path="../libcore"} -bytes = "0.5.6" \ No newline at end of file +bytes = "0.5.6" +corelib = {path ="../corelib"} \ No newline at end of file diff --git a/server/src/coredb.rs b/server/src/coredb.rs index 2e9f7755..d8e78794 100644 --- a/server/src/coredb.rs +++ b/server/src/coredb.rs @@ -19,66 +19,3 @@ * */ -use libcore::terrapipe::ResponseCodes; -use std::collections::{hash_map::Entry, HashMap}; -use std::sync::{Arc, RwLock}; - -pub struct CoreDB { - shared: Arc, -} - -pub struct Coretable { - coremap: RwLock>, -} - -impl Coretable { - pub fn get(&self, key: &str) -> Result { - if let Some(value) = self.coremap.read().unwrap().get(key) { - Ok(value.to_string()) - } else { - Err(ResponseCodes::NotFound) - } - } - pub fn set(&self, key: &str, value: &str) -> Result<(), ResponseCodes> { - match self.coremap.write().unwrap().entry(key.to_string()) { - Entry::Occupied(_) => return Err(ResponseCodes::OverwriteError), - Entry::Vacant(e) => { - let _ = e.insert(value.to_string()); - Ok(()) - } - } - } - pub fn update(&self, key: &str, value: &str) -> Result<(), ResponseCodes> { - match self.coremap.write().unwrap().entry(key.to_string()) { - Entry::Occupied(ref mut e) => { - e.insert(value.to_string()); - Ok(()) - } - Entry::Vacant(_) => Err(ResponseCodes::NotFound), - } - } - pub fn del(&self, key: &str) -> Result<(), ResponseCodes> { - if let Some(_) = self.coremap.write().unwrap().remove(&key.to_owned()) { - Ok(()) - } else { - Err(ResponseCodes::NotFound) - } - } - #[cfg(debug)] - pub fn print_debug_table(&self) { - println!("{:#?}", *self.coremap.read().unwrap()); - } -} - -impl CoreDB { - pub fn new() -> Self { - CoreDB { - shared: Arc::new(Coretable { - coremap: RwLock::new(HashMap::new()), - }), - } - } - pub fn get_handle(&self) -> Arc { - Arc::clone(&self.shared) - } -} diff --git a/server/src/dbnet.rs b/server/src/dbnet.rs deleted file mode 100644 index 76aca6e2..00000000 --- a/server/src/dbnet.rs +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Created on Mon Jul 13 2020 - * - * This file is a part of the source code for the Terrabase database - * Copyright (c) 2020, Sayan Nandan - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * -*/ - -use libcore::terrapipe::{ - Dataframe, QueryMetaframe, ResponseBytes, ResponseCodes, DEF_Q_META_BUFSIZE, -}; -use tokio::io::{self, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; -use tokio::net::TcpStream; -use tokio::time::{self, timeout}; - -pub struct Connection { - stream: TcpStream, -} - -pub struct Query { - qmf: QueryMetaframe, - df: Dataframe, -} - -impl Connection { - pub fn new(stream: TcpStream) -> Self { - Connection { stream } - } - pub async fn get_query_packet(&mut self) -> Result { - let mut meta_buffer = String::with_capacity(DEF_Q_META_BUFSIZE); - let mut bufreader = BufReader::new(&mut self.stream); - match bufreader.read_line(&mut meta_buffer).await { - Ok(_) => (), - Err(_) => { - return Err(ResponseCodes::InternalServerError); - } - } - let qmf = match QueryMetaframe::from_buffer(&meta_buffer) { - Ok(qmf) => qmf, - Err(e) => return Err(e), - }; - let mut data_buffer = vec![0; qmf.get_content_size()]; - match timeout( - time::Duration::from_millis(400), - bufreader.read(&mut data_buffer), - ) - .await - { - Ok(_) => (), - Err(_) => return Err(ResponseCodes::InternalServerError), - } - let df = match Dataframe::from_buffer(qmf.get_content_size(), data_buffer) { - Ok(d) => d, - Err(e) => return Err(e), - }; - Ok(Query { qmf, df }) - } - pub async fn write_response_packet(&mut self, resp: Vec) -> io::Result<()> { - self.stream.write_all(&resp).await - } -} diff --git a/server/src/main.rs b/server/src/main.rs index e778c664..5eb21d4f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -23,107 +23,22 @@ use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; +mod protocol; mod coredb; -mod dbnet; use std::cmp::Ordering; use std::sync::Arc; -// Internal modules -use libcore::terrapipe::{ - Dataframe, QueryMetaframe, QueryMethod, ResponseBytes, ResponseCodes, Version, - DEF_Q_META_BUFSIZE, -}; - static ADDR: &'static str = "127.0.0.1:2003"; #[tokio::main] async fn main() { let mut listener = TcpListener::bind(ADDR).await.unwrap(); println!("Server running on terrapipe://127.0.0.1:2003"); - let db = coredb::CoreDB::new(); loop { - let handle = db.get_handle(); let (mut socket, _) = listener.accept().await.unwrap(); - tokio::spawn(async move { - let mut meta_buffer = String::with_capacity(DEF_Q_META_BUFSIZE); - let mut reader = BufReader::with_capacity(DEF_Q_META_BUFSIZE, &mut socket); - reader.read_line(&mut meta_buffer).await.unwrap(); - let mf = match QueryMetaframe::from_buffer(&meta_buffer) { - Ok(m) => m, - Err(e) => return close_conn_with_error(socket, e.response_bytes()).await, - }; - let mut data_buffer = vec![0; mf.get_content_size()]; - reader.read(&mut data_buffer).await.unwrap(); - let df = match Dataframe::from_buffer(mf.get_content_size(), data_buffer) { - Ok(d) => d, - Err(e) => return close_conn_with_error(socket, e.response_bytes()).await, - }; - return execute_query(socket, handle, mf, df).await; - }); + tokio::spawn(async move {}); } } async fn close_conn_with_error(mut stream: TcpStream, bytes: Vec) { stream.write_all(&bytes).await.unwrap() } - -async fn execute_query( - mut stream: TcpStream, - handle: Arc, - mf: QueryMetaframe, - df: Dataframe, -) { - let vars = df.deflatten(); - use QueryMethod::*; - match mf.get_method() { - GET => { - let result = match vars.len().cmp(&1) { - Ordering::Equal => { - if let Ok(v) = handle.get(vars[0]) { - ResponseCodes::Okay(Some(v.to_string())) - } else { - ResponseCodes::NotFound - } - } - _ => ResponseCodes::CorruptDataframe, - }; - stream.write(&result.response_bytes()).await.unwrap(); - } - SET => { - let result = match vars.len().cmp(&2) { - Ordering::Equal => match handle.set(vars[0], vars[1]) { - Ok(_) => ResponseCodes::Okay(None), - Err(e) => e, - }, - _ => ResponseCodes::CorruptDataframe, - }; - #[cfg(debug)] - handle.print_debug_table(); - stream.write(&result.response_bytes()).await.unwrap(); - } - UPDATE => { - let result = match vars.len().cmp(&2) { - Ordering::Equal => match handle.update(vars[0], vars[1]) { - Ok(_) => ResponseCodes::Okay(None), - Err(e) => e, - }, - _ => ResponseCodes::CorruptDataframe, - }; - #[cfg(debug)] - handle.print_debug_table(); - stream.write(&result.response_bytes()).await.unwrap(); - } - DEL => { - let result = match vars.len().cmp(&1) { - Ordering::Equal => match handle.del(vars[0]) { - Ok(_) => ResponseCodes::Okay(None), - Err(e) => e, - }, - _ => ResponseCodes::CorruptDataframe, - }; - #[cfg(debug)] - handle.print_debug_table(); - - stream.write(&result.response_bytes()).await.unwrap(); - } - } -} diff --git a/server/src/protocol.rs b/server/src/protocol.rs new file mode 100644 index 00000000..1732eebd --- /dev/null +++ b/server/src/protocol.rs @@ -0,0 +1,108 @@ +/* + * Created on Sat Jul 18 2020 + * + * This file is a part of the source code for the Terrabase database + * Copyright (c) 2020, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +use bytes::{Buf, Bytes}; +use corelib::responses; +use corelib::ActionType; +use std::io::{BufRead, Cursor}; + +#[derive(Debug, PartialEq)] +pub struct PreQMF { + action_type: ActionType, + content_size: usize, + metaline_size: usize, +} + +impl PreQMF { + pub fn from_buffer(buf: String) -> Result> { + let buf: Vec<&str> = buf.split('!').collect(); + if let (Some(atype), Some(csize), Some(metaline_size)) = + (buf.get(0), buf.get(1), buf.get(2)) + { + if let Some(atype) = atype.chars().next() { + let atype = match atype { + '+' => ActionType::Simple, + '$' => ActionType::Pipeline, + _ => return Err(responses::RESP_INVALID_MF.to_owned()), + }; + let (csize, metaline_size) = + match (csize.parse::(), metaline_size.parse::()) { + (Ok(x), Ok(y)) => (x, y), + _ => return Err(responses::RESP_INVALID_MF.to_owned()), + }; + return Ok(PreQMF { + action_type: atype, + content_size: csize, + metaline_size, + }); + } + } + Err(responses::RESP_INVALID_MF.to_owned()) + } +} + +#[cfg(test)] +#[test] +fn test_preqmf() { + let read_what = "+!12!4".to_owned(); + let preqmf = PreQMF::from_buffer(read_what).unwrap(); + let pqmf_should_be = PreQMF { + action_type: ActionType::Simple, + content_size: 12, + metaline_size: 4, + }; + assert_eq!(pqmf_should_be, preqmf); + let a_pipe = "$!12!4".to_owned(); + let preqmf = PreQMF::from_buffer(a_pipe).unwrap(); + let pqmf_should_be = PreQMF { + action_type: ActionType::Pipeline, + content_size: 12, + metaline_size: 4, + }; + assert_eq!(preqmf, pqmf_should_be); +} + +pub fn get_sizes(stream: String) -> Result, Vec> { + let sstr: Vec<&str> = stream.split('#').collect(); + let mut sstr_iter = sstr.into_iter().peekable(); + let mut sizes = Vec::with_capacity(sstr_iter.len()); + while let Some(size) = sstr_iter.next() { + if sstr_iter.peek().is_some() { + // Skip the last element + if let Ok(val) = size.parse::() { + sizes.push(val); + } else { + return Err(responses::RESP_INVALID_MF.to_owned()); + } + } else { + break; + } + } + Ok(sizes) +} + +#[cfg(test)] +#[test] +fn test_get_sizes() { + let retbuf = "10#20#30#".to_owned(); + let sizes = get_sizes(retbuf).unwrap(); + assert_eq!(sizes, vec![10usize, 20usize, 30usize]); +}