Reimplement terrapipe metaframe

next
Sayan Nandan 4 years ago
parent 5cce287ff5
commit ab24e6588f
No known key found for this signature in database
GPG Key ID: C31EFD7DDA12AEE0

87
Cargo.lock generated

@ -25,10 +25,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]]
name = "devtimer"
version = "4.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6035b7b9244bf9637cd7ef80b5e1c54404bef92cccd34738c85c45f04ae8b244"
name = "corelib"
version = "0.1.0"
dependencies = [
"lazy_static",
]
[[package]]
name = "fnv"
@ -58,17 +59,6 @@ version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399"
[[package]]
name = "getrandom"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb"
dependencies = [
"cfg-if",
"libc",
"wasi",
]
[[package]]
name = "hermit-abi"
version = "0.1.15"
@ -109,15 +99,6 @@ version = "0.2.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9f8082297d534141b30c8d39e9b1773713ab50fdbe4ff30f750d063b3bfd701"
[[package]]
name = "libcore"
version = "0.1.0"
dependencies = [
"devtimer",
"lazy_static",
"rand",
]
[[package]]
name = "log"
version = "0.4.8"
@ -224,12 +205,6 @@ version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282adbf10f2698a7a77f8e983a74b2d18176c19a7fd32a45446139ae7b02b715"
[[package]]
name = "ppv-lite86"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "237a5ed80e274dbc66f86bd59c1e25edc039660be53194b5fe0a482e0f2612ea"
[[package]]
name = "proc-macro2"
version = "1.0.18"
@ -248,47 +223,6 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03"
dependencies = [
"getrandom",
"libc",
"rand_chacha",
"rand_core",
"rand_hc",
]
[[package]]
name = "rand_chacha"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19"
dependencies = [
"getrandom",
]
[[package]]
name = "rand_hc"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c"
dependencies = [
"rand_core",
]
[[package]]
name = "redox_syscall"
version = "0.1.57"
@ -339,7 +273,7 @@ name = "terrabase"
version = "0.1.0"
dependencies = [
"bytes",
"libcore",
"corelib",
"tokio",
]
@ -381,9 +315,6 @@ dependencies = [
[[package]]
name = "tsh"
version = "0.1.0"
dependencies = [
"libcore",
]
[[package]]
name = "unicode-xid"
@ -391,12 +322,6 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564"
[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
[[package]]
name = "winapi"
version = "0.2.8"

@ -2,5 +2,5 @@
members = [
"cli",
"server",
"libcore"
"corelib"
]

@ -7,4 +7,3 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
libcore = {path="../libcore"}

@ -19,160 +19,3 @@
*
*/
use libcore::terrapipe::DEF_R_META_BUFSIZE;
use libcore::terrapipe::{Dataframe, ResponseCodes::*, ResultMetaframe, Version, QUERY_PACKET};
use std::io::{BufRead, BufReader, Read, Write};
use std::net::TcpStream;
use std::process;
const ARG_GET: &'static str = "GET";
const ARG_SET: &'static str = "SET";
const ARG_UPDATE: &'static str = "UPDATE";
const ARG_DEL: &'static str = "DEL";
const ARG_EXIT: &'static str = "EXIT";
const ERR_MULTIPLE_GET: &'static str = "Multiple GETs aren't supported yet";
const ERR_MULTIPLE_SET: &'static str = "Multiple SETs aren't supported yet";
const ERR_MULTIPLE_UPDATE: &'static str = "Multiple UPDATEs aren't supported yet";
const ERR_MULTIPLE_DEL: &'static str = "Multiple DELs aren't supported yet";
const SELF_VERSION: Version = Version(0, 1, 0);
const ADDR: &'static str = "127.0.0.1:2003";
pub const EXIT_ERROR: fn(error: &str) -> ! = |error| {
eprintln!("error: {}", error);
process::exit(0x100);
};
const NORMAL_ERROR: fn(error: &str) = |error| {
eprintln!("error: {}", error);
};
pub fn run(args: String) {
let args: Vec<&str> = args.split_whitespace().collect();
match args[0].to_uppercase().as_str() {
ARG_GET => {
if let Some(key) = args.get(1) {
if args.get(2).is_none() {
send_query(QUERY_PACKET(
SELF_VERSION,
ARG_GET.to_owned(),
key.to_string(),
));
} else {
NORMAL_ERROR(ERR_MULTIPLE_GET);
}
} else {
NORMAL_ERROR("Expected one more argument");
}
}
ARG_SET => {
if let Some(key) = args.get(1) {
if let Some(value) = args.get(2) {
if args.get(3).is_none() {
send_query(QUERY_PACKET(
SELF_VERSION,
ARG_SET.to_owned(),
format!("{} {}", key, value),
))
} else {
NORMAL_ERROR(ERR_MULTIPLE_SET);
}
} else {
NORMAL_ERROR("Expected one more argument");
}
} else {
NORMAL_ERROR("Expected more arguments");
}
}
ARG_UPDATE => {
if let Some(key) = args.get(1) {
if let Some(value) = args.get(2) {
if args.get(3).is_none() {
send_query(QUERY_PACKET(
SELF_VERSION,
ARG_UPDATE.to_owned(),
format!("{} {}", key, value),
))
} else {
NORMAL_ERROR(ERR_MULTIPLE_UPDATE);
}
} else {
NORMAL_ERROR("Expected one more argument");
}
} else {
NORMAL_ERROR("Expected more arguments");
}
}
ARG_DEL => {
if let Some(key) = args.get(1) {
if args.get(2).is_none() {
send_query(QUERY_PACKET(
SELF_VERSION,
ARG_DEL.to_owned(),
key.to_string(),
));
} else {
NORMAL_ERROR(ERR_MULTIPLE_DEL);
}
} else {
NORMAL_ERROR("Expected one more argument");
}
}
ARG_EXIT => {
println!("Goodbye!");
process::exit(0x100)
}
_ => NORMAL_ERROR("Unknown command"),
}
}
fn send_query(query: Vec<u8>) {
let mut binding = match TcpStream::connect(ADDR) {
Ok(b) => b,
Err(_) => EXIT_ERROR("Couldn't connect to Terrabase"),
};
match binding.write(&query) {
Ok(_) => (),
Err(_) => EXIT_ERROR("Couldn't read data from socket"),
}
let mut bufreader = BufReader::new(binding);
let mut buf = String::with_capacity(DEF_R_META_BUFSIZE);
match bufreader.read_line(&mut buf) {
Ok(_) => (),
Err(_) => EXIT_ERROR("Failed to read line from socket"),
}
let rmf = match ResultMetaframe::from_buffer(buf) {
Ok(mf) => mf,
Err(e) => {
NORMAL_ERROR(&e.to_string());
return;
}
};
match &rmf.response {
Okay(_) => (),
x @ _ => {
NORMAL_ERROR(&x.to_string());
return;
}
}
let mut data_buffer = vec![0; rmf.get_content_size()];
match bufreader.read(&mut data_buffer) {
Ok(_) => (),
Err(_) => EXIT_ERROR("Failed to read line from socket"),
}
let df = match Dataframe::from_buffer(rmf.get_content_size(), data_buffer) {
Ok(d) => d,
Err(e) => {
NORMAL_ERROR(&e.to_string());
return;
}
};
let res = df.deflatten();
if res.len() == 0 {
return;
} else {
if res.len() == 1 {
println!("{}", res[0]);
} else {
println!("{:?}", res);
}
}
}

@ -28,20 +28,6 @@ const MSG_WELCOME: &'static str = "Terrabase | Version 0.1.0\nCopyright (c) 2020
fn main() {
println!("{}", MSG_WELCOME);
loop {
let mut buffer = String::new();
print!("tsh> ");
match io::stdout().flush() {
Ok(_) => (),
Err(_) => argparse::EXIT_ERROR("Failed to flush output stream"),
};
match io::stdin().read_line(&mut buffer) {
Ok(_) => (),
Err(_) => argparse::EXIT_ERROR("Failed to read line and append to buffer"),
};
if buffer.len() != 0 {
argparse::run(buffer);
} else {
continue;
}
}
}

@ -1,5 +1,5 @@
[package]
name = "libcore"
name = "corelib"
version = "0.1.0"
authors = ["Sayan Nandan <nandansayan@outlook.com>"]
edition = "2018"
@ -8,8 +8,3 @@ edition = "2018"
[dependencies]
lazy_static = "1.4.0"
[dev-dependencies]
devtimer = "4.0.0"
rand = "0.7.3"

@ -0,0 +1,55 @@
/*
* Created on Sat Jul 18 2020
*
* This file is a part of the source code for the Terrabase database
* Copyright (c) 2020, Sayan Nandan <ohsayan at outlook dot com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
pub const DEF_QMETA_BUFSIZE: usize = 44;
pub mod responses {
use lazy_static::lazy_static;
lazy_static! {
pub static ref RESP_OKAY_EMPTY: Vec<u8> = "0!0!0#".as_bytes().to_owned();
pub static ref RESP_NOT_FOUND: Vec<u8> = "1!0!0#".as_bytes().to_owned();
pub static ref RESP_OVERWRITE_ERROR: Vec<u8> = "2!0!0#".as_bytes().to_owned();
pub static ref RESP_INVALID_MF: Vec<u8> = "3!0!0#".as_bytes().to_owned();
pub static ref RESP_INCOMPLETE: Vec<u8> = "4!0!0#".as_bytes().to_owned();
pub static ref RESP_SERVER_ERROR: Vec<u8> = "5!0!0#".as_bytes().to_owned();
}
}
#[derive(Debug, PartialEq)]
pub enum RespCodes {
Okay(Option<String>),
NotFound,
OverwriteError,
InvalidMetaframe,
Incomplete,
ServerError,
OtherError(Option<String>),
}
#[derive(Debug, PartialEq)]
pub enum ActionType {
Simple,
Pipeline,
}
pub trait Response {
fn into_response(&self) -> Vec<u8>;
}

@ -1,23 +0,0 @@
/*
* Created on Thu Jul 02 2020
*
* This file is a part of the source code for the Terrabase database
* Copyright (c) 2020, Sayan Nandan <ohsayan at outlook dot com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
pub mod terrapipe;
pub use terrapipe::SELF_VERSION;

@ -1,438 +0,0 @@
/*
* Created on Thu Jul 02 2020
*
* This file is a part of the source code for the Terrabase database
* Copyright (c) 2020, Sayan Nandan <ohsayan at outlook dot com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
#![allow(unused)]
//! This implements the Terrapipe protocol
use lazy_static::lazy_static;
use std::fmt;
use std::mem;
/// Current version of the terrapipe protocol
pub const SELF_VERSION: Version = Version(0, 1, 0);
/// Metaframe protocol tag
pub const MF_PROTOCOL_TAG: &'static str = "TP";
/// Metaframe query tag
pub const MF_QUERY_TAG: &'static str = "Q";
/// Metaframe response tag
pub const MF_RESPONSE_TAG: &'static str = "R";
/// Metaframe separator ("/")
pub const MF_SEPARATOR: &'static str = "/";
/// Metaframe `GET` tag
pub const MF_METHOD_GET: &'static str = "GET";
/// Metaframe `SET` tag
pub const MF_METHOD_SET: &'static str = "SET";
/// Metaframe `UPDATE` tag
pub const MF_METHOD_UPDATE: &'static str = "UPDATE";
/// Metaframe `DEL` tag
pub const MF_METHOD_DEL: &'static str = "DEL";
/// ## The default buffer size for the query metaframe
/// This currently enables sizes upto 2^64 to be handled. Since
/// `floor(log10(2^64))+1` = 20 digits and the maximum size of the query
/// metaframe __currently__ is 26 - the buffer size, should be 46
pub const DEF_Q_META_BUFSIZE: usize = 46;
/// ## The default buffer size for the response metaframe
/// This currently enables sizes upto 2^64 to be handled. Since 2^64 has 20 digits
/// and the maximum size of the response metaframe is 20 - the buffer size is kept at 40
pub const DEF_R_META_BUFSIZE: usize = 40;
// HACK(@ohsayan) This is a temporary workaround since `const fn`s don't support `match` yet
/// Constant function to generate a response packet
pub const RESPONSE_PACKET: fn(version: Version, respcode: u8, data: &str) -> Vec<u8> =
|version, respcode, data| {
let res = format!(
"TP/{}.{}.{}/R/{}/{}\n{}",
version.0,
version.1,
version.2,
respcode,
data.len(),
data,
);
res.as_bytes().to_vec()
};
// HACK(@ohsayan) This is a temporary workaround since `const fn`s don't support `match` yet
/// Constant function to generate a query packet
pub const QUERY_PACKET: fn(version: Version, method: String, data: String) -> Vec<u8> =
|version, method, data| {
let res = format!(
"TP/{}.{}.{}/Q/{}/{}\n{}",
version.0,
version.1,
version.2,
method,
data.len(),
data
);
res.as_bytes().to_vec()
};
// Evaluate the common error packets at compile-time
lazy_static! {
/// Success: empty response
static ref RESP_OKAY_EMPTY: Vec<u8> = RESPONSE_PACKET(SELF_VERSION, 0, "");
/// Error response when the target is not found
static ref RESP_NOT_FOUND: Vec<u8> = RESPONSE_PACKET(SELF_VERSION, 1, "");
/// Error response when the target key already exists and cannot be overwritten
static ref RESP_OVERWRITE_ERROR: Vec<u8> = RESPONSE_PACKET(SELF_VERSION, 2, "");
/// Error response when the method in the query is not allowed/deprecated/not supported yet
static ref RESP_METHOD_NOT_ALLOWED: Vec<u8> = RESPONSE_PACKET(SELF_VERSION, 3, "");
/// Error response when the server has a problem processing the response
static ref RESP_INTERNAL_SERVER_ERROR: Vec<u8> = RESPONSE_PACKET(SELF_VERSION, 4, "");
/// Error response when the metaframe contains invalid tokens
static ref RESP_INVALID_MF: Vec<u8> = RESPONSE_PACKET(SELF_VERSION, 5, "");
/// Error response when the dataframe doesn't contain the expected bytes
static ref RESP_CORRUPT_DF: Vec<u8> = RESPONSE_PACKET(SELF_VERSION, 6, "");
/// Error response when the protocol used by the client is not supported by the server
static ref RESP_PROTOCOL_VERSION_MISMATCH: Vec<u8> = RESPONSE_PACKET(SELF_VERSION, 7, "");
/// Error response when the query packet is missing the basic information, usually a newline
static ref RESP_CORRUPT_PACKET: Vec<u8> = RESPONSE_PACKET(SELF_VERSION, 8, "");
}
/// A minimal _Semver_ implementation
pub struct Version(pub u8, pub u8, pub u8);
impl Version {
/// Create a new `Version` instance from an `&str`
/// ## Errors
/// Returns `None` when the string passed isn't in the correct format
pub fn from_str<'a>(vstr: &'a str) -> Option<Self> {
let vstr: Vec<&str> = vstr.split(".").collect();
if vstr.len() != 3 {
return None;
}
if let (Ok(major), Ok(minor), Ok(patch)) = (
vstr[0].parse::<u8>(),
vstr[1].parse::<u8>(),
vstr[2].parse::<u8>(),
) {
Some(Version(major, minor, patch))
} else {
None
}
}
/// Check if `other` is compatible with the current version as required
/// by _Semver_
pub fn incompatible_with(&self, other: &Version) -> bool {
if self.0 == other.0 {
false
} else {
true
}
}
}
#[derive(Debug)]
/// Response codes which are returned by the server
pub enum ResponseCodes {
/// `0` : Okay
Okay(Option<String>),
/// `1` : Not Found
NotFound,
/// `2` : Overwrite Error
OverwriteError,
/// `3` : Method Not Allowed
MethodNotAllowed,
/// `4` : Internal Server Error
InternalServerError,
/// `5` : Invalid Metaframe
InvalidMetaframe,
/// `6` : Corrupt Dataframe
CorruptDataframe,
/// `7` : Protocol Version Mismatch
ProtocolVersionMismatch,
/// `8` : Corrupt Packet
CorruptPacket,
}
impl fmt::Display for ResponseCodes {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use ResponseCodes::*;
match self {
Okay(v) => {
if let Some(v) = v {
write!(f, "{}", v)
} else {
write!(f, "")
}
}
NotFound => write!(f, "The target could not be found"),
OverwriteError => write!(f, "Existing values cannot be overwritten"),
MethodNotAllowed => write!(f, "The method is not supported"),
InternalServerError => write!(f, "An internal server error occurred"),
InvalidMetaframe => write!(f, "The query had an invalid metaframe"),
CorruptDataframe => write!(f, "The query did not contain the required data"),
ProtocolVersionMismatch => write!(
f,
"The server doesn't support the protocol being used"
),
CorruptPacket => write!(f, "The query did not have the required data"),
}
}
}
impl ResponseCodes {
/// Instantiate a new `ResponseCodes` variant from an `u8` value
/// ## Errors
/// Returns `None` when the `u8` doesn't correspond to any of the
/// response codes
pub fn from_u8(code: u8) -> Option<Self> {
use ResponseCodes::*;
let c = match code {
0 => Okay(None),
1 => NotFound,
2 => OverwriteError,
3 => MethodNotAllowed,
4 => InternalServerError,
5 => InvalidMetaframe,
6 => CorruptDataframe,
7 => ProtocolVersionMismatch,
8 => CorruptPacket,
_ => return None,
};
Some(c)
}
}
/// Any object implementing this trait can be converted into a response
pub trait ResponseBytes {
/// Return a `Vec<u8>` with the response
fn response_bytes(&self) -> Vec<u8>;
}
impl ResponseBytes for ResponseCodes {
fn response_bytes(&self) -> Vec<u8> {
use ResponseCodes::*;
match self {
Okay(val) => {
if let Some(dat) = val {
RESPONSE_PACKET(SELF_VERSION, 0, dat)
} else {
RESP_OKAY_EMPTY.to_vec()
}
}
NotFound => RESP_NOT_FOUND.to_vec(),
OverwriteError => RESP_OVERWRITE_ERROR.to_vec(),
MethodNotAllowed => RESP_METHOD_NOT_ALLOWED.to_vec(),
InternalServerError => RESP_INTERNAL_SERVER_ERROR.to_vec(),
InvalidMetaframe => RESP_INVALID_MF.to_vec(),
CorruptDataframe => RESP_CORRUPT_DF.to_vec(),
ProtocolVersionMismatch => RESP_PROTOCOL_VERSION_MISMATCH.to_vec(),
CorruptPacket => RESP_CORRUPT_PACKET.to_vec(),
}
}
}
/// Query methods that can be used by the client
#[derive(Debug, PartialEq)]
pub enum QueryMethod {
/// A `GET` query
GET,
/// A `SET` query
SET,
/// An `UPDATE` query
UPDATE,
/// A `DEL` query
DEL,
}
/// The query metaframe
#[derive(Debug, PartialEq)]
pub struct QueryMetaframe {
/// The content size that is to be read by the server, from the data packet
content_size: usize,
/// The query method that the client has issued to the server
method: QueryMethod,
}
impl QueryMetaframe {
/// Create a query metaframe instance from a `String` buffer
/// ## Errors
/// Returns `ResponseCodes` which dictate what error has occurred
pub fn from_buffer(buf: &String) -> Result<QueryMetaframe, ResponseCodes> {
let mf_parts: Vec<&str> = buf.split(MF_SEPARATOR).collect();
if mf_parts.len() != 5 {
return Err(ResponseCodes::InvalidMetaframe);
}
if mf_parts[0] != MF_PROTOCOL_TAG || mf_parts[2] != MF_QUERY_TAG {
return Err(ResponseCodes::InvalidMetaframe);
}
let version = match Version::from_str(&mf_parts[1]) {
None => return Err(ResponseCodes::InvalidMetaframe),
Some(v) => v,
};
if SELF_VERSION.incompatible_with(&version) {
return Err(ResponseCodes::ProtocolVersionMismatch);
}
// The size may have extra code point 0s - remove them
let cs = mf_parts[4].trim_matches(char::from(0)).trim();
let content_size = match cs.parse::<usize>() {
Ok(csize) => csize,
Err(e) => {
eprintln!("Errored: {}", e);
return Err(ResponseCodes::InvalidMetaframe);
}
};
let method = match mf_parts[3] {
MF_METHOD_GET => QueryMethod::GET,
MF_METHOD_SET => QueryMethod::SET,
MF_METHOD_UPDATE => QueryMethod::UPDATE,
MF_METHOD_DEL => QueryMethod::DEL,
_ => return Err(ResponseCodes::MethodNotAllowed),
};
Ok(QueryMetaframe {
content_size,
method,
})
}
/// Get the number of bytes that the server should expect from the dataframe
pub fn get_content_size(&self) -> usize {
self.content_size
}
/// Get the method to be used
pub fn get_method(&self) -> &QueryMethod {
&self.method
}
}
/// A `Dataframe` is simply treated as a blob which contains bytes in the form
/// of a `String`
#[derive(Debug)]
pub struct Dataframe(String);
impl Dataframe {
/// Create a new `Dataframe` instance from a `Vec<u8>` buffer
/// ## Errors
/// When the `target_size` is not the same as the size of the buffer, this
/// returns a `CorruptDataframe` response code
pub fn from_buffer(target_size: usize, buffer: Vec<u8>) -> Result<Dataframe, ResponseCodes> {
let buffer = String::from_utf8_lossy(&buffer);
let buffer = buffer.trim();
if buffer.len() != target_size {
return Err(ResponseCodes::CorruptDataframe);
}
Ok(Dataframe(buffer.to_string()))
}
/// Deflatten the dataframe into a `Vec` of actions/identifiers/bytes
pub fn deflatten(&self) -> Vec<&str> {
self.0.split_whitespace().collect()
}
}
#[cfg(test)]
#[test]
fn test_metaframe() {
let v = Version(0, 1, 0);
let mut goodframe = String::from("TP/0.1.0/Q/GET/5");
let res = QueryMetaframe::from_buffer(&goodframe);
let mf_should_be = QueryMetaframe {
content_size: 5,
method: QueryMethod::GET,
};
assert_eq!(res.ok().unwrap(), mf_should_be);
}
#[cfg(test)]
#[test]
fn benchmark_metaframe_parsing() {
let v = Version(0, 1, 0);
use devtimer::run_benchmark;
use rand::Rng;
let mut rng = rand::thread_rng();
let mut metaframes: Vec<String> = Vec::with_capacity(50000);
(0..50000).for_each(|_| {
let s = rng.gen_range(0, usize::MAX);
let mut buf = format!("TP/0.1.0/Q/GET/5/{}", s);
metaframes.push(buf);
});
let b = run_benchmark(50000, |n| {
let _ = QueryMetaframe::from_buffer(&metaframes[n]).ok().unwrap();
});
b.print_stats();
}
impl fmt::Display for ResultError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use ResultError::*;
match self {
StandardError(r) => write!(f, "{}", r),
UnknownError(u) => write!(f, "The server responded with '{}'", u),
}
}
}
#[derive(Debug)]
/// Errors that may occur when parsing a response packet from the server
pub enum ResultError {
/// A standard response code used by the Terrapipe protocol
StandardError(ResponseCodes),
/// Some nonsense response code that may be returned by a buggy or patched server
UnknownError(String),
}
#[derive(Debug)]
/// A result metaframe
pub struct ResultMetaframe {
content_size: usize,
pub response: ResponseCodes,
}
impl ResultMetaframe {
/// Instantiate a new metaframe from a `String` buffer
/// ## Errors
/// Returns a `ResultError` in case something happened while parsing the
/// response packet's metaframe
pub fn from_buffer(buf: String) -> Result<ResultMetaframe, ResultError> {
use ResultError::*;
let mf_parts = buf.trim();
let mf_parts: Vec<&str> = mf_parts.split("/").collect();
if mf_parts.len() != 5 {
return Err(StandardError(ResponseCodes::InvalidMetaframe));
}
if mf_parts[0] != MF_PROTOCOL_TAG && mf_parts[2] != MF_RESPONSE_TAG {
return Err(StandardError(ResponseCodes::InvalidMetaframe));
}
let response = match mf_parts[3].parse::<u8>() {
Ok(r) => match ResponseCodes::from_u8(r) {
Some(rcode) => rcode,
None => return Err(UnknownError(mf_parts[3].to_owned())),
},
Err(_) => return Err(UnknownError(mf_parts[3].to_owned())),
};
// The size may have extra code point 0s - remove them
let cs = mf_parts[4].trim_matches(char::from(0));
let content_size = match cs.parse::<usize>() {
Ok(csize) => csize,
Err(_) => return Err(StandardError(ResponseCodes::InvalidMetaframe)),
};
Ok(ResultMetaframe {
content_size,
response,
})
}
pub fn get_content_size(&self) -> usize {
self.content_size
}
}

@ -8,6 +8,5 @@ edition = "2018"
[dependencies]
tokio = { version = "0.2.21", features = ["full"] }
# Import `libcore` which contains code for terrapipe and other utils
libcore = {path="../libcore"}
bytes = "0.5.6"
corelib = {path ="../corelib"}

@ -19,66 +19,3 @@
*
*/
use libcore::terrapipe::ResponseCodes;
use std::collections::{hash_map::Entry, HashMap};
use std::sync::{Arc, RwLock};
pub struct CoreDB {
shared: Arc<Coretable>,
}
pub struct Coretable {
coremap: RwLock<HashMap<String, String>>,
}
impl Coretable {
pub fn get(&self, key: &str) -> Result<String, ResponseCodes> {
if let Some(value) = self.coremap.read().unwrap().get(key) {
Ok(value.to_string())
} else {
Err(ResponseCodes::NotFound)
}
}
pub fn set(&self, key: &str, value: &str) -> Result<(), ResponseCodes> {
match self.coremap.write().unwrap().entry(key.to_string()) {
Entry::Occupied(_) => return Err(ResponseCodes::OverwriteError),
Entry::Vacant(e) => {
let _ = e.insert(value.to_string());
Ok(())
}
}
}
pub fn update(&self, key: &str, value: &str) -> Result<(), ResponseCodes> {
match self.coremap.write().unwrap().entry(key.to_string()) {
Entry::Occupied(ref mut e) => {
e.insert(value.to_string());
Ok(())
}
Entry::Vacant(_) => Err(ResponseCodes::NotFound),
}
}
pub fn del(&self, key: &str) -> Result<(), ResponseCodes> {
if let Some(_) = self.coremap.write().unwrap().remove(&key.to_owned()) {
Ok(())
} else {
Err(ResponseCodes::NotFound)
}
}
#[cfg(debug)]
pub fn print_debug_table(&self) {
println!("{:#?}", *self.coremap.read().unwrap());
}
}
impl CoreDB {
pub fn new() -> Self {
CoreDB {
shared: Arc::new(Coretable {
coremap: RwLock::new(HashMap::new()),
}),
}
}
pub fn get_handle(&self) -> Arc<Coretable> {
Arc::clone(&self.shared)
}
}

@ -1,74 +0,0 @@
/*
* Created on Mon Jul 13 2020
*
* This file is a part of the source code for the Terrabase database
* Copyright (c) 2020, Sayan Nandan <ohsayan at outlook dot com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use libcore::terrapipe::{
Dataframe, QueryMetaframe, ResponseBytes, ResponseCodes, DEF_Q_META_BUFSIZE,
};
use tokio::io::{self, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
use tokio::time::{self, timeout};
pub struct Connection {
stream: TcpStream,
}
pub struct Query {
qmf: QueryMetaframe,
df: Dataframe,
}
impl Connection {
pub fn new(stream: TcpStream) -> Self {
Connection { stream }
}
pub async fn get_query_packet(&mut self) -> Result<Query, impl ResponseBytes> {
let mut meta_buffer = String::with_capacity(DEF_Q_META_BUFSIZE);
let mut bufreader = BufReader::new(&mut self.stream);
match bufreader.read_line(&mut meta_buffer).await {
Ok(_) => (),
Err(_) => {
return Err(ResponseCodes::InternalServerError);
}
}
let qmf = match QueryMetaframe::from_buffer(&meta_buffer) {
Ok(qmf) => qmf,
Err(e) => return Err(e),
};
let mut data_buffer = vec![0; qmf.get_content_size()];
match timeout(
time::Duration::from_millis(400),
bufreader.read(&mut data_buffer),
)
.await
{
Ok(_) => (),
Err(_) => return Err(ResponseCodes::InternalServerError),
}
let df = match Dataframe::from_buffer(qmf.get_content_size(), data_buffer) {
Ok(d) => d,
Err(e) => return Err(e),
};
Ok(Query { qmf, df })
}
pub async fn write_response_packet(&mut self, resp: Vec<u8>) -> io::Result<()> {
self.stream.write_all(&resp).await
}
}

@ -23,107 +23,22 @@ use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
mod protocol;
mod coredb;
mod dbnet;
use std::cmp::Ordering;
use std::sync::Arc;
// Internal modules
use libcore::terrapipe::{
Dataframe, QueryMetaframe, QueryMethod, ResponseBytes, ResponseCodes, Version,
DEF_Q_META_BUFSIZE,
};
static ADDR: &'static str = "127.0.0.1:2003";
#[tokio::main]
async fn main() {
let mut listener = TcpListener::bind(ADDR).await.unwrap();
println!("Server running on terrapipe://127.0.0.1:2003");
let db = coredb::CoreDB::new();
loop {
let handle = db.get_handle();
let (mut socket, _) = listener.accept().await.unwrap();
tokio::spawn(async move {
let mut meta_buffer = String::with_capacity(DEF_Q_META_BUFSIZE);
let mut reader = BufReader::with_capacity(DEF_Q_META_BUFSIZE, &mut socket);
reader.read_line(&mut meta_buffer).await.unwrap();
let mf = match QueryMetaframe::from_buffer(&meta_buffer) {
Ok(m) => m,
Err(e) => return close_conn_with_error(socket, e.response_bytes()).await,
};
let mut data_buffer = vec![0; mf.get_content_size()];
reader.read(&mut data_buffer).await.unwrap();
let df = match Dataframe::from_buffer(mf.get_content_size(), data_buffer) {
Ok(d) => d,
Err(e) => return close_conn_with_error(socket, e.response_bytes()).await,
};
return execute_query(socket, handle, mf, df).await;
});
tokio::spawn(async move {});
}
}
async fn close_conn_with_error(mut stream: TcpStream, bytes: Vec<u8>) {
stream.write_all(&bytes).await.unwrap()
}
async fn execute_query(
mut stream: TcpStream,
handle: Arc<coredb::Coretable>,
mf: QueryMetaframe,
df: Dataframe,
) {
let vars = df.deflatten();
use QueryMethod::*;
match mf.get_method() {
GET => {
let result = match vars.len().cmp(&1) {
Ordering::Equal => {
if let Ok(v) = handle.get(vars[0]) {
ResponseCodes::Okay(Some(v.to_string()))
} else {
ResponseCodes::NotFound
}
}
_ => ResponseCodes::CorruptDataframe,
};
stream.write(&result.response_bytes()).await.unwrap();
}
SET => {
let result = match vars.len().cmp(&2) {
Ordering::Equal => match handle.set(vars[0], vars[1]) {
Ok(_) => ResponseCodes::Okay(None),
Err(e) => e,
},
_ => ResponseCodes::CorruptDataframe,
};
#[cfg(debug)]
handle.print_debug_table();
stream.write(&result.response_bytes()).await.unwrap();
}
UPDATE => {
let result = match vars.len().cmp(&2) {
Ordering::Equal => match handle.update(vars[0], vars[1]) {
Ok(_) => ResponseCodes::Okay(None),
Err(e) => e,
},
_ => ResponseCodes::CorruptDataframe,
};
#[cfg(debug)]
handle.print_debug_table();
stream.write(&result.response_bytes()).await.unwrap();
}
DEL => {
let result = match vars.len().cmp(&1) {
Ordering::Equal => match handle.del(vars[0]) {
Ok(_) => ResponseCodes::Okay(None),
Err(e) => e,
},
_ => ResponseCodes::CorruptDataframe,
};
#[cfg(debug)]
handle.print_debug_table();
stream.write(&result.response_bytes()).await.unwrap();
}
}
}

@ -0,0 +1,108 @@
/*
* Created on Sat Jul 18 2020
*
* This file is a part of the source code for the Terrabase database
* Copyright (c) 2020, Sayan Nandan <ohsayan at outlook dot com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use bytes::{Buf, Bytes};
use corelib::responses;
use corelib::ActionType;
use std::io::{BufRead, Cursor};
#[derive(Debug, PartialEq)]
pub struct PreQMF {
action_type: ActionType,
content_size: usize,
metaline_size: usize,
}
impl PreQMF {
pub fn from_buffer(buf: String) -> Result<Self, Vec<u8>> {
let buf: Vec<&str> = buf.split('!').collect();
if let (Some(atype), Some(csize), Some(metaline_size)) =
(buf.get(0), buf.get(1), buf.get(2))
{
if let Some(atype) = atype.chars().next() {
let atype = match atype {
'+' => ActionType::Simple,
'$' => ActionType::Pipeline,
_ => return Err(responses::RESP_INVALID_MF.to_owned()),
};
let (csize, metaline_size) =
match (csize.parse::<usize>(), metaline_size.parse::<usize>()) {
(Ok(x), Ok(y)) => (x, y),
_ => return Err(responses::RESP_INVALID_MF.to_owned()),
};
return Ok(PreQMF {
action_type: atype,
content_size: csize,
metaline_size,
});
}
}
Err(responses::RESP_INVALID_MF.to_owned())
}
}
#[cfg(test)]
#[test]
fn test_preqmf() {
let read_what = "+!12!4".to_owned();
let preqmf = PreQMF::from_buffer(read_what).unwrap();
let pqmf_should_be = PreQMF {
action_type: ActionType::Simple,
content_size: 12,
metaline_size: 4,
};
assert_eq!(pqmf_should_be, preqmf);
let a_pipe = "$!12!4".to_owned();
let preqmf = PreQMF::from_buffer(a_pipe).unwrap();
let pqmf_should_be = PreQMF {
action_type: ActionType::Pipeline,
content_size: 12,
metaline_size: 4,
};
assert_eq!(preqmf, pqmf_should_be);
}
pub fn get_sizes(stream: String) -> Result<Vec<usize>, Vec<u8>> {
let sstr: Vec<&str> = stream.split('#').collect();
let mut sstr_iter = sstr.into_iter().peekable();
let mut sizes = Vec::with_capacity(sstr_iter.len());
while let Some(size) = sstr_iter.next() {
if sstr_iter.peek().is_some() {
// Skip the last element
if let Ok(val) = size.parse::<usize>() {
sizes.push(val);
} else {
return Err(responses::RESP_INVALID_MF.to_owned());
}
} else {
break;
}
}
Ok(sizes)
}
#[cfg(test)]
#[test]
fn test_get_sizes() {
let retbuf = "10#20#30#".to_owned();
let sizes = get_sizes(retbuf).unwrap();
assert_eq!(sizes, vec![10usize, 20usize, 30usize]);
}
Loading…
Cancel
Save