Document terrapipe

next
Sayan Nandan 4 years ago
parent a78e54b215
commit e4663a6206
No known key found for this signature in database
GPG Key ID: C31EFD7DDA12AEE0

@ -20,23 +20,40 @@
*/ */
#![allow(unused)] #![allow(unused)]
//! This is an implementation of [Terrabase/RFC1](https://github.com/terrabasedb/rfcs/pull/1) //! This implements the Terrapipe protocol
use lazy_static::lazy_static; use lazy_static::lazy_static;
use std::fmt; use std::fmt;
use std::mem; use std::mem;
/// Current version of the terrapipe protocol
pub const SELF_VERSION: Version = Version(0, 1, 0); pub const SELF_VERSION: Version = Version(0, 1, 0);
/// Metaframe protocol tag
pub const MF_PROTOCOL_TAG: &'static str = "TP"; pub const MF_PROTOCOL_TAG: &'static str = "TP";
/// Metaframe query tag
pub const MF_QUERY_TAG: &'static str = "Q"; pub const MF_QUERY_TAG: &'static str = "Q";
/// Metaframe response tag
pub const MF_RESPONSE_TAG: &'static str = "R"; pub const MF_RESPONSE_TAG: &'static str = "R";
/// Metaframe separator ("/")
pub const MF_SEPARATOR: &'static str = "/"; pub const MF_SEPARATOR: &'static str = "/";
/// Metaframe `GET` tag
pub const MF_METHOD_GET: &'static str = "GET"; pub const MF_METHOD_GET: &'static str = "GET";
/// Metaframe `SET` tag
pub const MF_METHOD_SET: &'static str = "SET"; pub const MF_METHOD_SET: &'static str = "SET";
/// Metaframe `UPDATE` tag
pub const MF_METHOD_UPDATE: &'static str = "UPDATE"; pub const MF_METHOD_UPDATE: &'static str = "UPDATE";
/// Metaframe `DEL` tag
pub const MF_METHOD_DEL: &'static str = "DEL"; pub const MF_METHOD_DEL: &'static str = "DEL";
/// ## The default buffer size for the query metaframe
/// This currently enables sizes upto 2^64 to be handled. Since
/// `floor(log10(2^64))+1` = 20 digits and the maximum size of the query
/// metaframe __currently__ is 26 - the buffer size, should be 46
pub const DEF_Q_META_BUFSIZE: usize = 46; pub const DEF_Q_META_BUFSIZE: usize = 46;
/// ## The default buffer size for the response metaframe
/// This currently enables sizes upto 2^64 to be handled. Since 2^64 has 20 digits
/// and the maximum size of the response metaframe is 20 - the buffer size is kept at 40
pub const DEF_R_META_BUFSIZE: usize = 40; pub const DEF_R_META_BUFSIZE: usize = 40;
/// Constant function to generate a response packet
const RESPONSE_PACKET: fn(version: Version, respcode: u8, data: &str) -> Vec<u8> = const RESPONSE_PACKET: fn(version: Version, respcode: u8, data: &str) -> Vec<u8> =
|version, respcode, data| { |version, respcode, data| {
let res = format!( let res = format!(
@ -51,21 +68,50 @@ const RESPONSE_PACKET: fn(version: Version, respcode: u8, data: &str) -> Vec<u8>
res.as_bytes().to_vec() res.as_bytes().to_vec()
}; };
/// Constant function to generate a query packet
const QUERY_PACKET: fn(version: Version, method: String, data: String) -> Vec<u8> =
|version, method, data| {
let res = format!(
"TP/{}.{}.{}/Q/{}/{}\n{}",
version.0,
version.1,
version.2,
method,
data.len(),
data
);
res.as_bytes().to_vec()
};
// Evaluate the common error packets at compile-time
lazy_static! { lazy_static! {
/// Success: empty response
static ref RESP_OKAY_EMPTY: Vec<u8> = RESPONSE_PACKET(SELF_VERSION, 0, ""); static ref RESP_OKAY_EMPTY: Vec<u8> = RESPONSE_PACKET(SELF_VERSION, 0, "");
/// Error response when the target is not found
static ref RESP_NOT_FOUND: Vec<u8> = RESPONSE_PACKET(SELF_VERSION, 1, ""); static ref RESP_NOT_FOUND: Vec<u8> = RESPONSE_PACKET(SELF_VERSION, 1, "");
/// Error response when the target key already exists and cannot be overwritten
static ref RESP_OVERWRITE_ERROR: Vec<u8> = RESPONSE_PACKET(SELF_VERSION, 2, ""); static ref RESP_OVERWRITE_ERROR: Vec<u8> = RESPONSE_PACKET(SELF_VERSION, 2, "");
/// Error response when the method in the query is not allowed/deprecated/not supported yet
static ref RESP_METHOD_NOT_ALLOWED: Vec<u8> = RESPONSE_PACKET(SELF_VERSION, 3, ""); static ref RESP_METHOD_NOT_ALLOWED: Vec<u8> = RESPONSE_PACKET(SELF_VERSION, 3, "");
/// Error response when the server has a problem processing the response
static ref RESP_INTERNAL_SERVER_ERROR: Vec<u8> = RESPONSE_PACKET(SELF_VERSION, 4, ""); static ref RESP_INTERNAL_SERVER_ERROR: Vec<u8> = RESPONSE_PACKET(SELF_VERSION, 4, "");
/// Error response when the metaframe contains invalid tokens
static ref RESP_INVALID_MF: Vec<u8> = RESPONSE_PACKET(SELF_VERSION, 5, ""); static ref RESP_INVALID_MF: Vec<u8> = RESPONSE_PACKET(SELF_VERSION, 5, "");
/// Error response when the dataframe doesn't contain the expected bytes
static ref RESP_CORRUPT_DF: Vec<u8> = RESPONSE_PACKET(SELF_VERSION, 6, ""); static ref RESP_CORRUPT_DF: Vec<u8> = RESPONSE_PACKET(SELF_VERSION, 6, "");
/// Error response when the protocol used by the client is not supported by the server
static ref RESP_PROTOCOL_VERSION_MISMATCH: Vec<u8> = RESPONSE_PACKET(SELF_VERSION, 7, ""); static ref RESP_PROTOCOL_VERSION_MISMATCH: Vec<u8> = RESPONSE_PACKET(SELF_VERSION, 7, "");
/// Error response when the query packet is missing the basic information, usually a newline
static ref RESP_CORRUPT_PACKET: Vec<u8> = RESPONSE_PACKET(SELF_VERSION, 8, ""); static ref RESP_CORRUPT_PACKET: Vec<u8> = RESPONSE_PACKET(SELF_VERSION, 8, "");
} }
/// A minimal _Semver_ implementation
pub struct Version(pub u8, pub u8, pub u8); pub struct Version(pub u8, pub u8, pub u8);
impl Version { impl Version {
/// Create a new `Version` instance from an `&str`
/// ## Errors
/// Returns `None` when the string passed isn't in the correct format
pub fn from_str<'a>(vstr: &'a str) -> Option<Self> { pub fn from_str<'a>(vstr: &'a str) -> Option<Self> {
let vstr: Vec<&str> = vstr.split(".").collect(); let vstr: Vec<&str> = vstr.split(".").collect();
if vstr.len() != 3 { if vstr.len() != 3 {
@ -81,6 +127,8 @@ impl Version {
None None
} }
} }
/// Check if `other` is compatible with the current version as required
/// by _Semver_
pub fn incompatible_with(&self, other: &Version) -> bool { pub fn incompatible_with(&self, other: &Version) -> bool {
if self.0 == other.0 { if self.0 == other.0 {
false false
@ -90,19 +138,33 @@ impl Version {
} }
} }
/// Response codes which are returned by the server
pub enum ResponseCodes { pub enum ResponseCodes {
Okay(Option<String>), // Code: 0 /// `0` : Okay
NotFound, // Code: 1 Okay(Option<String>),
OverwriteError, // Code: 2 /// `1` : Not Found
MethodNotAllowed, // Code: 3 NotFound,
InternalServerError, // Code: 4 /// `2` : Overwrite Error
InvalidMetaframe, // Code: 5 OverwriteError,
CorruptDataframe, // Code: 6 /// `3` : Method Not Allowed
ProtocolVersionMismatch, // Code: 7 MethodNotAllowed,
CorruptPacket, // Code: 8 /// `4` : Internal Server Error
InternalServerError,
/// `5` : Invalid Metaframe
InvalidMetaframe,
/// `6` : Corrupt Dataframe
CorruptDataframe,
/// `7` : Protocol Version Mismatch
ProtocolVersionMismatch,
/// `8` : Corrupt Packet
CorruptPacket,
} }
impl ResponseCodes { impl ResponseCodes {
/// Instantiate a new `ResponseCodes` variant from an `u8` value
/// ## Errors
/// Returns `None` when the `u8` doesn't correspond to any of the
/// response codes
pub fn from_u8(code: u8) -> Option<Self> { pub fn from_u8(code: u8) -> Option<Self> {
use ResponseCodes::*; use ResponseCodes::*;
let c = match code { let c = match code {
@ -121,7 +183,9 @@ impl ResponseCodes {
} }
} }
/// Any object implementing this trait can be converted into a response
pub trait ResponseBytes { pub trait ResponseBytes {
/// Return a `Vec<u8>` with the response
fn response_bytes(&self) -> Vec<u8>; fn response_bytes(&self) -> Vec<u8>;
} }
@ -148,21 +212,32 @@ impl ResponseBytes for ResponseCodes {
} }
} }
/// Query methods that can be used by the client
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum QueryMethod { pub enum QueryMethod {
/// A `GET` query
GET, GET,
/// A `SET` query
SET, SET,
/// An `UPDATE` query
UPDATE, UPDATE,
/// A `DEL` query
DEL, DEL,
} }
/// The query metaframe
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub struct QueryMetaframe { pub struct QueryMetaframe {
/// The content size that is to be read by the server, from the data packet
content_size: usize, content_size: usize,
/// The query method that the client has issued to the server
method: QueryMethod, method: QueryMethod,
} }
impl QueryMetaframe { impl QueryMetaframe {
/// Create a query metaframe instance from a `String` buffer
/// ## Errors
/// Returns `ResponseCodes` which dictate what error has occurred
pub fn from_buffer(buf: &String) -> Result<QueryMetaframe, ResponseCodes> { pub fn from_buffer(buf: &String) -> Result<QueryMetaframe, ResponseCodes> {
let mf_parts: Vec<&str> = buf.split(MF_SEPARATOR).collect(); let mf_parts: Vec<&str> = buf.split(MF_SEPARATOR).collect();
if mf_parts.len() != 5 { if mf_parts.len() != 5 {
@ -200,18 +275,26 @@ impl QueryMetaframe {
method, method,
}) })
} }
/// Get the number of bytes that the server should expect from the dataframe
pub fn get_content_size(&self) -> usize { pub fn get_content_size(&self) -> usize {
self.content_size self.content_size
} }
/// Get the method to be used
pub fn get_method(&self) -> &QueryMethod { pub fn get_method(&self) -> &QueryMethod {
&self.method &self.method
} }
} }
/// A `Dataframe` is simply treated as a blob which contains bytes in the form
/// of a `String`
#[derive(Debug)] #[derive(Debug)]
pub struct Dataframe(String); pub struct Dataframe(String);
impl Dataframe { impl Dataframe {
/// Create a new `Dataframe` instance from a `Vec<u8>` buffer
/// ## Errors
/// When the `target_size` is not the same as the size of the buffer, this
/// returns a `CorruptDataframe` response code
pub fn from_buffer(target_size: usize, buffer: Vec<u8>) -> Result<Dataframe, ResponseCodes> { pub fn from_buffer(target_size: usize, buffer: Vec<u8>) -> Result<Dataframe, ResponseCodes> {
let buffer = String::from_utf8_lossy(&buffer); let buffer = String::from_utf8_lossy(&buffer);
let buffer = buffer.trim(); let buffer = buffer.trim();
@ -220,6 +303,7 @@ impl Dataframe {
} }
Ok(Dataframe(buffer.to_string())) Ok(Dataframe(buffer.to_string()))
} }
/// Deflatten the dataframe into a `Vec` of actions/identifiers/bytes
pub fn deflatten(&self) -> Vec<&str> { pub fn deflatten(&self) -> Vec<&str> {
self.0.split_whitespace().collect() self.0.split_whitespace().collect()
} }
@ -228,11 +312,8 @@ impl Dataframe {
#[cfg(test)] #[cfg(test)]
#[test] #[test]
fn test_metaframe() { fn test_metaframe() {
use std::io::Write;
let v = Version(0, 1, 0); let v = Version(0, 1, 0);
let mut goodframe = String::from("TP/0.1.0/Q/GET/5"); let mut goodframe = String::from("TP/0.1.0/Q/GET/5");
// let mut goodframe = [0u8; DEF_Q_META_BUFSIZE];
// write!(&mut goodframe[..], "TP/0.1.1/Q/GET/5").unwrap();
let res = QueryMetaframe::from_buffer(&goodframe); let res = QueryMetaframe::from_buffer(&goodframe);
let mf_should_be = QueryMetaframe { let mf_should_be = QueryMetaframe {
content_size: 5, content_size: 5,
@ -244,7 +325,6 @@ fn test_metaframe() {
#[cfg(test)] #[cfg(test)]
#[test] #[test]
fn benchmark_metaframe_parsing() { fn benchmark_metaframe_parsing() {
use std::io::Write;
let v = Version(0, 1, 0); let v = Version(0, 1, 0);
use devtimer::run_benchmark; use devtimer::run_benchmark;
use rand::Rng; use rand::Rng;
@ -253,8 +333,6 @@ fn benchmark_metaframe_parsing() {
(0..50000).for_each(|_| { (0..50000).for_each(|_| {
let s = rng.gen_range(0, usize::MAX); let s = rng.gen_range(0, usize::MAX);
let mut buf = format!("TP/0.1.0/Q/GET/5/{}", s); let mut buf = format!("TP/0.1.0/Q/GET/5/{}", s);
// let mut buf = [0u8; DEF_Q_META_BUFSIZE];
// write!(&mut buf[..], "TP/0.1.1/Q/GET/{}", s).unwrap();
metaframes.push(buf); metaframes.push(buf);
}); });
let b = run_benchmark(50000, |n| { let b = run_benchmark(50000, |n| {
@ -263,17 +341,25 @@ fn benchmark_metaframe_parsing() {
b.print_stats(); b.print_stats();
} }
/// Errors that may occur when parsing a response packet from the server
pub enum ResultError { pub enum ResultError {
/// A standard response code used by the Terrapipe protocol
StandardError(ResponseCodes), StandardError(ResponseCodes),
/// Some nonsense response code that may be returned by a buggy or patched server
UnknownError(String), UnknownError(String),
} }
/// A result metaframe
pub struct ResultMetaframe { pub struct ResultMetaframe {
content_size: usize, content_size: usize,
response: ResponseCodes, response: ResponseCodes,
} }
impl ResultMetaframe { impl ResultMetaframe {
/// Instantiate a new metaframe from a `String` buffer
/// ## Errors
/// Returns a `ResultError` in case something happened while parsing the
/// response packet's metaframe
pub fn from_buffer(buf: String) -> Result<ResultMetaframe, ResultError> { pub fn from_buffer(buf: String) -> Result<ResultMetaframe, ResultError> {
use ResultError::*; use ResultError::*;
let mf_parts = buf.trim(); let mf_parts = buf.trim();

Loading…
Cancel
Save