Merge pull request #9 from terrabasedb/fix-buffer

Implement read buffering
next
Sayan 4 years ago committed by GitHub
commit 4e26fe2050
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -2,6 +2,12 @@
All changes in this project will be noted in this file.
## Version 0.3.1 [2020-07-30]
> No breaking changes
This release fixes #7
## Version 0.3.0 [2020-07-28]
> No breaking changes

78
Cargo.lock generated

@ -51,17 +51,11 @@ dependencies = [
[[package]]
name = "corelib"
version = "0.2.0"
version = "0.3.1"
dependencies = [
"lazy_static",
]
[[package]]
name = "devtimer"
version = "4.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6035b7b9244bf9637cd7ef80b5e1c54404bef92cccd34738c85c45f04ae8b244"
[[package]]
name = "fnv"
version = "1.0.7"
@ -90,17 +84,6 @@ version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399"
[[package]]
name = "getrandom"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb"
dependencies = [
"cfg-if",
"libc",
"wasi",
]
[[package]]
name = "hermit-abi"
version = "0.1.15"
@ -288,12 +271,6 @@ version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282adbf10f2698a7a77f8e983a74b2d18176c19a7fd32a45446139ae7b02b715"
[[package]]
name = "ppv-lite86"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "237a5ed80e274dbc66f86bd59c1e25edc039660be53194b5fe0a482e0f2612ea"
[[package]]
name = "proc-macro2"
version = "1.0.18"
@ -312,47 +289,6 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03"
dependencies = [
"getrandom",
"libc",
"rand_chacha",
"rand_core",
"rand_hc",
]
[[package]]
name = "rand_chacha"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19"
dependencies = [
"getrandom",
]
[[package]]
name = "rand_hc"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c"
dependencies = [
"rand_core",
]
[[package]]
name = "redox_syscall"
version = "0.1.57"
@ -418,7 +354,7 @@ dependencies = [
[[package]]
name = "tdb"
version = "0.3.0"
version = "0.3.1"
dependencies = [
"bincode",
"bytes",
@ -464,11 +400,9 @@ dependencies = [
[[package]]
name = "tsh"
version = "0.2.0"
version = "0.3.1"
dependencies = [
"corelib",
"devtimer",
"rand",
"tokio",
]
@ -478,12 +412,6 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564"
[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
[[package]]
name = "winapi"
version = "0.2.8"

@ -22,7 +22,7 @@ The releases are uploaded in bundles, for example, `tdb-bundle-v0.2.0-x86_64-unk
* Download a bundle for your platform from [releases](https://github.com/terrabasedb/terrabase/releases)
* Unzip the downloaded bundle
* Run `chmod +x tdb tsh` (on Unix systems)
* Make the files executable (run `chmod +x tdb tsh` on Unix systems)
* Start the database server by running `./tdb`
* Start the client by running `./tsh`
* You can run commands like `SET sayan 17` , `GET cat` , `UPDATE cat 100` or `DEL cat` !

@ -1,6 +1,6 @@
[package]
name = "tsh"
version = "0.2.0"
version = "0.3.1"
authors = ["Sayan Nandan <ohsayan@outlook.com>"]
edition = "2018"

@ -1,6 +1,6 @@
[package]
name = "corelib"
version = "0.2.0"
version = "0.3.1"
authors = ["Sayan Nandan <ohsayan@outlook.com>"]
edition = "2018"

@ -304,7 +304,9 @@ fn test_simple_response() {
s.add_data("bytes".to_owned());
assert_eq!(
String::from_utf8_lossy(&s.into_response()),
String::from("*!0!39!16\n5#5#3#2#3#4#4#5#\nSayan\nloves\nyou\nif\nyou\nsend\nUTF8\nbytes\n")
String::from(
"*!0!39!16\n5#5#3#2#3#4#4#5#\nSayan\nloves\nyou\nif\nyou\nsend\nUTF8\nbytes\n"
)
);
}
@ -345,9 +347,10 @@ impl SimpleQuery {
let ref mut layout = self.metalayout;
let ref mut df = self.dataframe;
let len = cmd.len().to_string();
// Include the newline character in total size
self.size_tracker += cmd.len() + 1;
layout.push_str(&len);
layout.push('#');
layout.push_str(&len);
df.push_str(cmd);
df.push('\n');
}
@ -359,7 +362,7 @@ impl SimpleQuery {
"{}{}!{}\n{}\n{}",
self.metaline,
self.size_tracker,
self.metalayout.len(),
self.metalayout.len() + 1, // include the new line character
self.metalayout,
self.dataframe
)

@ -1,6 +1,6 @@
[package]
name = "tdb"
version = "0.3.0"
version = "0.3.1"
authors = ["Sayan Nandan <ohsayan@outlook.com>"]
edition = "2018"

@ -19,7 +19,7 @@
*
*/
use crate::protocol::QueryDataframe;
use crate::protocol::Query;
use bincode;
use corelib::terrapipe::{tags, ActionType, RespBytes, RespCodes, ResponseBuilder};
use corelib::TResult;
@ -91,7 +91,7 @@ impl CoreDB {
}
/// Execute a query that has already been validated by `Connection::read_query`
pub fn execute_query(&self, df: QueryDataframe) -> Vec<u8> {
pub fn execute_query(&self, df: Query) -> Vec<u8> {
match df.actiontype {
ActionType::Simple => self.execute_simple(df.data),
// TODO(@ohsayan): Pipeline commands haven't been implemented yet
@ -188,6 +188,9 @@ impl CoreDB {
RespCodes::ArgumentError.into_response()
}
/// Create a new `CoreDB` instance
///
/// This also checks if a local backup of previously saved data is available.
/// If it is - it restores the data. Otherwise it creates a new in-memory table
pub fn new() -> TResult<Self> {
let coretable = CoreDB::get_saved()?;
if let Some(coretable) = coretable {
@ -214,12 +217,14 @@ impl CoreDB {
fn acquire_read(&self) -> RwLockReadGuard<'_, HashMap<String, String>> {
self.shared.coremap.read()
}
/// Flush the contents of the in-memory table onto disk
pub fn flush_db(&self) -> TResult<()> {
let encoded = bincode::serialize(&*self.acquire_read())?;
let mut file = fs::File::create("./data.bin")?;
file.write_all(&encoded)?;
Ok(())
}
/// Try to get the saved data from disk
pub fn get_saved() -> TResult<Option<HashMap<String, String>>> {
let file = match fs::read("./data.bin") {
Ok(f) => f,

@ -19,7 +19,8 @@
*
*/
use crate::{Connection, CoreDB};
use crate::protocol::{Connection, QueryResult::*};
use crate::CoreDB;
use corelib::TResult;
use std::future::Future;
use std::process;
@ -143,8 +144,12 @@ impl CHandler {
}
};
match try_df {
Ok(df) => self.con.write_response(self.db.execute_query(df)).await,
Err(e) => return self.con.close_conn_with_error(e).await,
Ok(Q(s)) => self.con.write_response(self.db.execute_query(s)).await,
Ok(E(r)) => self.con.close_conn_with_error(r).await,
Err(e) => {
eprintln!("Error: {}", e);
return;
}
}
}
}

@ -25,7 +25,6 @@ mod dbnet;
mod protocol;
use coredb::CoreDB;
use dbnet::run;
use protocol::Connection;
use tokio::signal;
static ADDR: &'static str = "127.0.0.1:2003";

@ -1,160 +0,0 @@
/*
* Created on Sat Jul 18 2020
*
* This file is a part of the source code for the Terrabase database
* Copyright (c) 2020, Sayan Nandan <ohsayan at outlook dot com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use corelib::terrapipe::{extract_idents, get_sizes, ActionType};
use corelib::terrapipe::{RespBytes, RespCodes, DEF_QMETALINE_BUFSIZE};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
/// The query dataframe
#[derive(Debug)]
pub struct QueryDataframe {
/// The data part
pub data: Vec<String>,
/// The query action type
pub actiontype: ActionType,
}
#[derive(Debug, PartialEq)]
pub struct PreQMF {
/// The type of action: Simple/Pipelined
action_type: ActionType,
/// The content size excluding the metaline length
content_size: usize,
/// The length of the metaline
metaline_size: usize,
}
impl PreQMF {
/// Create a new PreQueryMetaframe from a `String`
/// ## Errors
/// This returns `Respcodes` as an error and hence this error can be directly
/// written to the stream
pub fn from_buffer(buf: String) -> Result<Self, RespCodes> {
let buf: Vec<&str> = buf.split('!').collect();
if let (Some(atype), Some(csize), Some(metaline_size)) =
(buf.get(0), buf.get(1), buf.get(2))
{
if let Some(atype) = atype.chars().next() {
let atype = match atype {
'*' => ActionType::Simple,
'$' => ActionType::Pipeline,
_ => return Err(RespCodes::InvalidMetaframe),
};
let csize = csize.trim().trim_matches(char::from(0));
let metaline_size = metaline_size.trim().trim_matches(char::from(0));
if let (Ok(csize), Ok(metaline_size)) =
(csize.parse::<usize>(), metaline_size.parse::<usize>())
{
return Ok(PreQMF {
action_type: atype,
content_size: csize,
metaline_size,
});
}
}
}
Err(RespCodes::InvalidMetaframe)
}
}
#[cfg(test)]
#[test]
fn test_preqmf() {
let read_what = "*!12!4".to_owned();
let preqmf = PreQMF::from_buffer(read_what).unwrap();
let pqmf_should_be = PreQMF {
action_type: ActionType::Simple,
content_size: 12,
metaline_size: 4,
};
assert_eq!(pqmf_should_be, preqmf);
let a_pipe = "$!12!4".to_owned();
let preqmf = PreQMF::from_buffer(a_pipe).unwrap();
let pqmf_should_be = PreQMF {
action_type: ActionType::Pipeline,
content_size: 12,
metaline_size: 4,
};
assert_eq!(preqmf, pqmf_should_be);
}
/// A TCP connection wrapper
pub struct Connection {
stream: TcpStream,
}
impl Connection {
/// Initiailize a new `Connection` instance
pub fn new(stream: TcpStream) -> Self {
Connection { stream }
}
/// Read a query
///
/// This will return a QueryDataframe if parsing is successful - otherwise
/// it returns a `RespCodes` variant which can be converted into a response
pub async fn read_query(&mut self) -> Result<QueryDataframe, RespCodes> {
let mut bufreader = BufReader::new(&mut self.stream);
let mut metaline_buf = String::with_capacity(DEF_QMETALINE_BUFSIZE);
// First read the metaline
// TODO(@ohsayan): We will use a read buffer in the future and then do all the
// actions below to improve efficiency - it would be way more efficient
bufreader.read_line(&mut metaline_buf).await.unwrap();
let pqmf = PreQMF::from_buffer(metaline_buf)?;
let (mut metalayout_buf, mut dataframe_buf) = (
String::with_capacity(pqmf.metaline_size),
vec![0; pqmf.content_size],
);
// Read the metalayout
bufreader.read_line(&mut metalayout_buf).await.unwrap();
let ss = get_sizes(metalayout_buf)?;
// Read the dataframe
bufreader.read(&mut dataframe_buf).await.unwrap();
let qdf = QueryDataframe {
data: extract_idents(dataframe_buf, ss),
actiontype: pqmf.action_type,
};
Ok(qdf)
}
/// Write a response to the stream
pub async fn write_response(&mut self, resp: Vec<u8>) {
if let Err(_) = self.stream.write_all(&resp).await {
eprintln!(
"Error while writing to stream: {:?}",
self.stream.peer_addr()
);
return;
}
// Flush the stream to make sure that the data was delivered
if let Err(_) = self.stream.flush().await {
eprintln!(
"Error while flushing data to stream: {:?}",
self.stream.peer_addr()
);
return;
}
}
/// Wraps around the `write_response` used to differentiate between a
/// success response and an error response
pub async fn close_conn_with_error(&mut self, bytes: impl RespBytes) {
self.write_response(bytes.into_response()).await
}
}

@ -0,0 +1,334 @@
/*
* Created on Thu Jul 30 2020
*
* This file is a part of the source code for the Terrabase database
* Copyright (c) 2020, Sayan Nandan <ohsayan at outlook dot com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
//! This module provides deserialization primitives for query packets
use bytes::BytesMut;
use corelib::terrapipe::DEF_QMETALINE_BUFSIZE;
use corelib::terrapipe::{ActionType, RespCodes};
use std::io::Cursor;
/// Result of parsing a query
/// This is **not** the same as `std`'s `Result<T, E>` but instead is an `enum`
/// which represents the outcome when a query is parsed
#[derive(Debug)]
pub enum QueryParseResult {
/// A successfully parsed `Query` along with an `usize` which specifies
/// the amount by which the `buffer` must advance
Parsed((Query, usize)),
/// The query parsing failed and returned a Response code as an error
RespCode(RespCodes),
/// The query packet is incomplete
Incomplete,
}
/// A navigator is a wrapper around a `Cursor` which efficiently navigates over
/// a mutable `BytesMut` object
pub struct Navigator<'a> {
/// The cursor
cursor: Cursor<&'a [u8]>,
}
impl<'a> Navigator<'a> {
/// Create a new `Navigator` instance
pub fn new(buffer: &'a mut BytesMut) -> Self {
Navigator {
cursor: Cursor::new(&buffer[..]),
}
}
/// Get a line from a buffer
///
/// The `beforehint` argument provides a clue to the `Navigator` about the
/// point till which the line must end. This prevents checking the entire buffer.
/// Note that this `beforehint` is optional and in case no hint as available,
/// just pass `None`
pub fn get_line(&mut self, beforehint: Option<usize>) -> Option<&'a [u8]> {
let ref mut cursor = self.cursor;
let start = cursor.position() as usize;
let end = match beforehint {
// The end will be the current position + the moved position
Some(hint) => (start + hint),
None => cursor.get_ref().len() - 1,
};
for i in start..end {
// If the current character is a `\n` byte, then return this slice
if cursor.get_ref()[i] == b'\n' {
if let Some(slice) = cursor.get_ref().get(start..i) {
// Only move the cursor ahead if the bytes could be fetched
// otherwise the next time we try to get anything, the
// cursor would crash. If we don't change the cursor position
// we will keep moving over stale data
cursor.set_position((i + 1) as u64);
return Some(slice);
}
}
}
// If we are here, then the slice couldn't be extracted,
None
}
/// Get an exact number of bytes from a buffer
pub fn get_exact(&mut self, exact: usize) -> Option<&'a [u8]> {
let ref mut cursor = self.cursor;
// The start position should be set to the current position of the
// cursor, otherwise we'll move from start, which is erroneous
let start = cursor.position() as usize;
// The end position will be the current position + number of bytes to be read
let end = start + exact;
if let Some(chunk) = cursor.get_ref().get(start..end) {
// Move the cursor ahead - only if we could get the slice
self.cursor.set_position(end as u64);
Some(chunk)
} else {
// If we're here, then the slice couldn't be extracted, probably
// because it doesn't exist. Return `None`
None
}
}
/// Get the cursor's position as an `usize`
fn get_pos_usize(&self) -> usize {
self.cursor.position() as usize
}
}
/// A metaline object which represents a metaline in the Terrapipe protocol's
/// query packet
struct Metaline {
/// The content size, inclusive of the newlines. This is sent by the client
/// driver
content_size: usize,
/// The metaline size, inclusive of the newline character. This is also sent
/// by the client driver
metalayout_size: usize,
/// The action type - whether it is a pipelined operation or a simple query
actiontype: ActionType,
}
impl Metaline {
/// Create a new metaline from a `Navigator` instance
///
/// This will use the navigator to extract the metaline
pub fn from_navigator(nav: &mut Navigator) -> Option<Self> {
if let Some(mline) = nav.get_line(Some(DEF_QMETALINE_BUFSIZE)) {
// The minimum metaline length is five characters
// if not - clearly something is wrong
if mline.len() < 5 {
println!("Did we?");
return None;
}
// The first byte is always a `*` or `$` depending on the
// type of query
let actiontype = match mline[0] {
b'$' => ActionType::Pipeline,
b'*' => ActionType::Simple,
_ => return None,
};
// Get the frame sizes: the first index is the content size
// and the second index is the metalayout size
if let Some(sizes) = get_frame_sizes(&mline[1..]) {
return Some(Metaline {
content_size: sizes[0],
metalayout_size: sizes[1],
actiontype,
});
}
}
None
}
}
/// A metalayout object which represents the Terrapipe protocol's metalayout line
///
/// This is nothing more than a wrapper around `Vec<usize>` which provides a more
/// convenient API
#[derive(Debug)]
struct Metalayout(Vec<usize>);
impl Metalayout {
/// Create a new metalayout from a `Navigator` instance
///
/// This uses the navigator to navigate over the buffer
pub fn from_navigator(nav: &mut Navigator, mlayoutsize: usize) -> Option<Self> {
// We pass `mlayoutsize` to `get_line` since we already know where the
// metalayout ends
if let Some(layout) = nav.get_line(Some(mlayoutsize)) {
if let Some(skip_sequence) = get_skip_sequence(&layout) {
return Some(Metalayout(skip_sequence));
}
}
None
}
}
/// # A `Query` object
#[derive(Debug, PartialEq)]
pub struct Query {
/// A stream of tokens parsed from the dataframe
pub data: Vec<String>,
/// The type of query - `Simple` or `Pipeline`
pub actiontype: ActionType,
}
impl Query {
/// Create a new `Query` instance from a `Navigator`
///
/// This function will use the private `Metalayout` and `Metaline` objects
/// to extract information on the format of the dataframe and then it will
/// parse the dataframe itself
pub fn from_navigator(mut nav: Navigator) -> QueryParseResult {
if let Some(metaline) = Metaline::from_navigator(&mut nav) {
if let Some(metalayout) = Metalayout::from_navigator(&mut nav, metaline.metalayout_size)
{
if let Some(content) = nav.get_exact(metaline.content_size) {
let data = extract_idents(content, metalayout.0);
// Return the parsed query and the amount by which the buffer
// must `advance`
return QueryParseResult::Parsed((
Query {
data,
actiontype: metaline.actiontype,
},
nav.get_pos_usize(),
));
} else {
// Since we couldn't get the slice, this means that the
// query packet was incomplete, return that error
return QueryParseResult::Incomplete;
}
}
}
// If we're here - it clearly means that the metaline/metalayout failed
// to parse - we return a standard invalid metaframe `RespCodes`
QueryParseResult::RespCode(RespCodes::InvalidMetaframe)
}
}
/// Get the frame sizes from a metaline
fn get_frame_sizes(metaline: &[u8]) -> Option<Vec<usize>> {
if let Some(s) = extract_sizes_splitoff(metaline, b'!', 2) {
if s.len() == 2 {
Some(s)
} else {
None
}
} else {
None
}
}
/// Get the skip sequence from the metalayout line
fn get_skip_sequence(metalayout: &[u8]) -> Option<Vec<usize>> {
let l = metalayout.len() / 2;
extract_sizes_splitoff(metalayout, b'#', l)
}
/// Extract `usize`s from any buffer which when converted into UTF-8
/// looks like: '<SEP>123<SEP>456<SEP>567\n', where `<SEP>` is the separator
/// which in the case of the metaline is a `0x21` byte or a `0x23` byte in the
/// case of the metalayout line
fn extract_sizes_splitoff(buf: &[u8], splitoff: u8, sizehint: usize) -> Option<Vec<usize>> {
let mut sizes = Vec::with_capacity(sizehint);
let len = buf.len();
let mut i = 0;
while i < len {
if buf[i] == splitoff {
// This is a hash
let mut res: usize = 0;
// Move to the next element
i = i + 1;
while i < len {
// Only proceed if the current byte is not the separator
if buf[i] != splitoff {
// Make sure we don't go wrong here
// 48 is the unicode byte for 0 so 48-48 should give 0
// Also the subtraction shouldn't give something greater
// than 9, otherwise it is a different character
let num: usize = match buf[i].checked_sub(48) {
Some(s) => s.into(),
None => return None,
};
if num > 9 {
return None;
}
res = res * 10 + num;
i = i + 1;
continue;
} else {
break;
}
}
sizes.push(res.into());
continue;
} else {
// Technically, we should never reach here, but if we do
// clearly, it's an error by the client-side driver
return None;
}
}
Some(sizes)
}
/// Extract the tokens from the slice using the `skip_sequence`
fn extract_idents(buf: &[u8], skip_sequence: Vec<usize>) -> Vec<String> {
skip_sequence
.into_iter()
.scan(buf.into_iter(), |databuf, size| {
let tok: Vec<u8> = databuf.take(size).map(|val| *val).collect();
let _ = databuf.next();
// FIXME(@ohsayan): This is quite slow, we'll have to use SIMD in the future
Some(String::from_utf8_lossy(&tok).to_string())
})
.collect()
}
#[cfg(test)]
#[test]
fn test_navigator() {
use bytes::BytesMut;
let mut mybytes = BytesMut::from("*!5!2\n1#\nHEYA\n".as_bytes());
let mut nav = Navigator::new(&mut mybytes);
assert_eq!(Some("*!5!2".as_bytes()), nav.get_line(Some(46)));
assert_eq!(Some("1#".as_bytes()), nav.get_line(Some(3)));
assert_eq!(Some("HEYA".as_bytes()), nav.get_line(Some(5)));
}
#[cfg(test)]
#[test]
fn test_query() {
use bytes::{Buf, BytesMut};
let mut mybuf = BytesMut::from("*!14!7\n#3#5#3\nSET\nsayan\n123\n".as_bytes());
let resulting_data_should_be: Vec<String> = "SET sayan 123"
.split_whitespace()
.map(|val| val.to_string())
.collect();
let nav = Navigator::new(&mut mybuf);
let query = Query::from_navigator(nav);
if let QueryParseResult::Parsed((query, forward)) = query {
assert_eq!(
query,
Query {
data: resulting_data_should_be,
actiontype: ActionType::Simple,
}
);
mybuf.advance(forward);
assert_eq!(mybuf.len(), 0);
} else {
panic!("Query parsing failed");
}
}

@ -0,0 +1,127 @@
/*
* Created on Thu Jul 30 2020
*
* This file is a part of the source code for the Terrabase database
* Copyright (c) 2020, Sayan Nandan <ohsayan at outlook dot com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
mod deserializer;
use bytes::{Buf, BytesMut};
use corelib::terrapipe::RespBytes;
use deserializer::Navigator;
pub use deserializer::{
Query,
QueryParseResult::{self, *},
};
use std::io::Result as IoResult;
use std::net::SocketAddr;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
use tokio::net::TcpStream;
/// The size of the read buffer in bytes
const BUF_CAP: usize = 8 * 1024; // 8 KB per-connection
/// A TCP connection wrapper
pub struct Connection {
/// The connection to the remote socket, wrapped in a buffer to speed
/// up writing
stream: BufWriter<TcpStream>,
/// The in-memory read buffer. The size is given by `BUF_CAP`
buffer: BytesMut,
}
/// The outcome of running `Connection`'s `try_query` function
pub enum QueryResult {
/// A parsed `Query` object
Q(Query),
/// An error response
E(Vec<u8>),
}
impl Connection {
/// Initiailize a new `Connection` instance
pub fn new(stream: TcpStream) -> Self {
Connection {
stream: BufWriter::new(stream),
buffer: BytesMut::with_capacity(BUF_CAP),
}
}
/// Read a query from the remote end
///
/// This function asynchronously waits until all the data required
/// for parsing the query is available
pub async fn read_query(&mut self) -> Result<QueryResult, String> {
self.read_again().await?;
loop {
match self.try_query() {
Parsed((query, forward)) => {
self.buffer.advance(forward);
return Ok(QueryResult::Q(query));
}
RespCode(r) => return Ok(QueryResult::E(r.into_response())),
_ => (),
}
self.read_again().await?;
}
}
/// Try to parse a query from the buffered data
fn try_query(&mut self) -> QueryParseResult {
let nav = Navigator::new(&mut self.buffer);
Query::from_navigator(nav)
}
/// Try to fill the buffer again
async fn read_again(&mut self) -> Result<(), String> {
match self.stream.read_buf(&mut self.buffer).await {
Ok(0) => {
// If 0 bytes were received, then the remote end closed
// the connection
if self.buffer.is_empty() {
return Err(format!("{:?} didn't send any data", self.get_peer()).into());
} else {
return Err(format!(
"Connection reset while reading from: {:?}",
self.get_peer()
)
.into());
}
}
Ok(_) => Ok(()),
Err(e) => return Err(format!("{}", e)),
}
}
/// Get the peer address
fn get_peer(&self) -> IoResult<SocketAddr> {
self.stream.get_ref().peer_addr()
}
/// Write a response to the stream
pub async fn write_response(&mut self, resp: Vec<u8>) {
if let Err(_) = self.stream.write_all(&resp).await {
eprintln!("Error while writing to stream: {:?}", self.get_peer());
return;
}
// Flush the stream to make sure that the data was delivered
if let Err(_) = self.stream.flush().await {
eprintln!("Error while flushing data to stream: {:?}", self.get_peer());
return;
}
}
/// Wraps around the `write_response` used to differentiate between a
/// success response and an error response
pub async fn close_conn_with_error(&mut self, bytes: Vec<u8>) {
self.write_response(bytes).await
}
}
Loading…
Cancel
Save