Refactor dbnet and protocol into logical modules

next
Sayan Nandan 3 years ago
parent eb71335a62
commit ef36351428

@ -24,10 +24,10 @@
*
*/
use crate::dbnet::con::prelude::*;
use crate::diskstore;
use crate::diskstore::snapshot::SnapshotEngine;
use crate::diskstore::snapshot::DIR_SNAPSHOT;
use crate::protocol::con::prelude::*;
use crate::protocol::responses;
use std::hint::unreachable_unchecked;
use std::path::{Component, PathBuf};

@ -29,8 +29,8 @@
use crate::config::BGSave;
use crate::config::SnapshotConfig;
use crate::config::SnapshotPref;
use crate::dbnet::con::prelude::*;
use crate::diskstore;
use crate::protocol::con::prelude::*;
use crate::protocol::Query;
use crate::queryengine;
use bytes::Bytes;

@ -24,18 +24,18 @@
*
*/
use super::deserializer;
use super::responses;
use super::tcp::Connection;
use crate::dbnet::tls::SslConnection;
use crate::dbnet::Terminator;
use crate::protocol::tls::SslConnection;
use crate::protocol::Connection;
use crate::protocol::ParseResult;
use crate::protocol::QueryResult;
use crate::protocol;
use crate::protocol::responses;
use crate::resp::Writable;
use crate::CoreDB;
use bytes::Buf;
use bytes::BytesMut;
use libsky::TResult;
use protocol::ParseResult;
use protocol::QueryResult;
use std::future::Future;
use std::io::Error as IoError;
use std::io::ErrorKind;
@ -90,7 +90,7 @@ where
if self.get_buffer().is_empty() {
return Err(());
}
Ok(deserializer::parse(&self.get_buffer()))
Ok(protocol::parse(&self.get_buffer()))
}
/// Read a query from the remote end
///

@ -43,9 +43,9 @@ use crate::config::BGSave;
use crate::config::PortConfig;
use crate::config::SnapshotConfig;
use crate::config::SslOpts;
use crate::dbnet::tcp::Listener;
use crate::diskstore::snapshot::DIR_REMOTE_SNAPSHOT;
use crate::protocol::tls::SslListener;
use crate::protocol::Listener;
mod tcp;
use crate::CoreDB;
use libsky::util::terminal;
use libsky::TResult;
@ -56,9 +56,12 @@ use std::net::IpAddr;
use std::path::PathBuf;
use std::process;
use std::sync::Arc;
use tls::SslListener;
use tokio::net::TcpListener;
use tokio::sync::Semaphore;
use tokio::sync::{broadcast, mpsc};
pub mod con;
mod tls;
/// Responsible for gracefully shutting down the server instead of dying randomly
// Sounds very sci-fi ;)

@ -0,0 +1,125 @@
/*
* Created on Mon Apr 26 2021
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use crate::dbnet::con::ConnectionHandler;
use crate::dbnet::Terminator;
use crate::protocol;
use crate::CoreDB;
use bytes::BytesMut;
use libsky::TResult;
use libsky::BUF_CAP;
pub use protocol::ParseResult;
pub use protocol::Query;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::BufWriter;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::sync::Semaphore;
use tokio::sync::{broadcast, mpsc};
use tokio::time;
/// A TCP connection wrapper
///
/// Pairs the raw [`TcpStream`] with a buffered writer and an in-memory
/// read buffer so callers can batch writes and accumulate partial reads.
pub struct Connection {
    /// The connection to the remote socket, wrapped in a buffer to speed
    /// up writing
    pub stream: BufWriter<TcpStream>,
    /// The in-memory read buffer. The size is given by `BUF_CAP`
    pub buffer: BytesMut,
}
impl Connection {
    /// Initialize a new `Connection` instance
    ///
    /// The stream is wrapped in a `BufWriter` and the read buffer is
    /// preallocated with `BUF_CAP` bytes to avoid early reallocations.
    pub fn new(stream: TcpStream) -> Self {
        Connection {
            stream: BufWriter::new(stream),
            buffer: BytesMut::with_capacity(BUF_CAP),
        }
    }
}
// We'll use the idea of gracefully shutting down from tokio
/// A listener
///
/// Owns the bound [`TcpListener`] together with the shared shutdown and
/// connection-limit machinery, and spawns one handler task per accepted
/// client connection.
pub struct Listener {
    /// An atomic reference to the coretable
    pub db: CoreDB,
    /// The incoming connection listener (binding)
    pub listener: TcpListener,
    /// The maximum number of connections
    pub climit: Arc<Semaphore>,
    /// The shutdown broadcaster
    pub signal: broadcast::Sender<()>,
    // When all `Sender`s are dropped - the `Receiver` gets a `None` value
    // We send a clone of `terminate_tx` to each `CHandler`
    pub terminate_tx: mpsc::Sender<()>,
    pub terminate_rx: mpsc::Receiver<()>,
}
impl Listener {
    /// Accept an incoming connection
    ///
    /// Transient accept errors are retried with Ethernet-style exponential
    /// backoff (1s, 2s, 4s, ...); once the delay exceeds 64s the error is
    /// propagated to the caller instead.
    async fn accept(&mut self) -> TResult<TcpStream> {
        // Current backoff window, in seconds
        let mut delay_secs = 1;
        loop {
            match self.listener.accept().await {
                // The peer address isn't needed, so it is discarded
                Ok((stream, _addr)) => break Ok(stream),
                // Retried long enough — give up and surface the error
                Err(e) if delay_secs > 64 => break Err(e.into()),
                // Transient failure: fall through to sleep and retry
                Err(_) => {}
            }
            // Wait out the current backoff window ...
            time::sleep(Duration::from_secs(delay_secs)).await;
            // ... and double it for the next attempt
            delay_secs *= 2;
        }
    }
    /// Run the server
    ///
    /// Loops forever: reserves a connection permit, accepts a client and
    /// hands it off to a freshly spawned `ConnectionHandler` task.
    pub async fn run(&mut self) -> TResult<()> {
        loop {
            // Reserve a slot up front; `forget` detaches the permit so it is
            // released by the connection handler rather than by this guard
            self.climit.acquire().await.unwrap().forget();
            let stream = self.accept().await?;
            let mut handler = ConnectionHandler::new(
                self.db.clone(),
                Connection::new(stream),
                self.climit.clone(),
                Terminator::new(self.signal.subscribe()),
                self.terminate_tx.clone(),
            );
            tokio::spawn(async move {
                if let Err(e) = handler.run().await {
                    log::error!("Error: {}", e);
                }
            });
        }
    }
}

@ -24,8 +24,8 @@
*
*/
use super::con::ConnectionHandler;
use crate::dbnet::Terminator;
use crate::protocol::ConnectionHandler;
use crate::CoreDB;
use bytes::BytesMut;
use libsky::TResult;

@ -24,7 +24,7 @@
*
*/
use crate::protocol::con::prelude::*;
use crate::dbnet::con::prelude::*;
use crate::protocol::responses;
use crate::resp::GroupBegin;

@ -28,7 +28,7 @@
//! This module provides functions to work with `DEL` queries
use crate::protocol::con::prelude::*;
use crate::dbnet::con::prelude::*;
use crate::protocol::responses;
use crate::resp::GroupBegin;

@ -28,7 +28,7 @@
//! This module provides functions to work with `EXISTS` queries
use crate::protocol::con::prelude::*;
use crate::dbnet::con::prelude::*;
use crate::protocol::responses;
use crate::resp::GroupBegin;

@ -25,7 +25,7 @@
*/
use crate::protocol::con::prelude::*;
use crate::dbnet::con::prelude::*;
use crate::protocol::responses;

@ -28,7 +28,7 @@
//! This module provides functions to work with `GET` queries
use crate::protocol::con::prelude::*;
use crate::dbnet::con::prelude::*;
use crate::protocol::responses;
use crate::resp::{BytesWrapper, GroupBegin};
use bytes::Bytes;

@ -29,7 +29,7 @@
//! Functions for handling `JGET` queries
use crate::protocol::con::prelude::*;
use crate::dbnet::con::prelude::*;
use crate::protocol::responses;

@ -24,7 +24,7 @@
*
*/
use crate::protocol::con::prelude::*;
use crate::dbnet::con::prelude::*;
use crate::protocol::responses;
use crate::resp::GroupBegin;

@ -25,7 +25,7 @@
*/
use crate::protocol::con::prelude::*;
use crate::dbnet::con::prelude::*;
use crate::protocol::responses;
use crate::resp::{BytesWrapper, GroupBegin};
use bytes::Bytes;

@ -45,7 +45,7 @@ pub mod uset;
pub mod heya {
//! Respond to `HEYA` queries
use crate::protocol;
use crate::protocol::con::prelude::*;
use crate::dbnet::con::prelude::*;
use protocol::responses;
/// Returns a `HEY!` `Response`
pub async fn heya<T, Strm>(

@ -25,7 +25,7 @@
*/
use crate::coredb::{self};
use crate::protocol::con::prelude::*;
use crate::dbnet::con::prelude::*;
use crate::protocol::responses;
use crate::resp::GroupBegin;

@ -25,7 +25,7 @@
*/
use crate::coredb::{self};
use crate::protocol::con::prelude::*;
use crate::dbnet::con::prelude::*;
use crate::protocol::responses;
use crate::resp::GroupBegin;

@ -28,7 +28,7 @@
//! This module provides functions to work with `SET` queries
use crate::coredb::{self};
use crate::protocol::con::prelude::*;
use crate::dbnet::con::prelude::*;
use crate::protocol::responses;
use coredb::Data;

@ -36,7 +36,7 @@
//! Do note that this isn't the same as the gurantees provided by ACID transactions
use crate::coredb::Data;
use crate::protocol::con::prelude::*;
use crate::dbnet::con::prelude::*;
use crate::protocol::responses;
use std::hint::unreachable_unchecked;

@ -28,7 +28,7 @@
//! This module provides functions to work with `UPDATE` queries
//!
use crate::coredb::{self};
use crate::protocol::con::prelude::*;
use crate::dbnet::con::prelude::*;
use crate::protocol::responses;
use coredb::Data;

@ -25,7 +25,7 @@
*/
use crate::coredb::{self};
use crate::protocol::con::prelude::*;
use crate::dbnet::con::prelude::*;
use crate::protocol::responses;
use crate::resp::GroupBegin;

@ -1,388 +0,0 @@
/*
* Created on Thu Jul 30 2020
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2020, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
//! This module provides deserialization primitives for query packets
/*
NOTE: I haven't used any recursion, because:
1. I like things to be explicit
2. I don't like huge stacks
And that's why I've done, what I've done, here.
*/
use std::vec::IntoIter;
/// # `ActionGroup`
///
/// The `ActionGroup` is an "Action Group" in the dataframe as described by the
/// Terrapipe protocol. An `ActionGroup` contains all the elements required to
/// execute an `Action`. The `ActionGroup` contains the "action" itself.
/// It may look like:
/// ```text
/// ["GET", "x", "y"]
/// ```
#[derive(Debug, PartialEq)]
pub struct ActionGroup(Vec<String>);

impl ActionGroup {
    /// Returns how many arguments are there excluding the name of the action
    ///
    /// Uses `saturating_sub` so that an (invalid) empty group reports 0
    /// arguments instead of underflowing and panicking in debug builds.
    pub fn howmany(&self) -> usize {
        self.0.len().saturating_sub(1)
    }
    /// Returns the name of the action (the first element), if any
    pub fn get_first(&self) -> Option<&String> {
        self.0.first()
    }
    /// Returns a reference to the underlying vector of elements
    pub fn get_ref(&self) -> &Vec<String> {
        &self.0
    }
}

impl IntoIterator for ActionGroup {
    type Item = String;
    type IntoIter = std::iter::Skip<IntoIter<String>>;
    /// Iterates over the *arguments* only, skipping the action name
    fn into_iter(self) -> <Self as IntoIterator>::IntoIter {
        // `skip(1)` already yields an iterator; the original trailing
        // `.into_iter()` was a redundant no-op and has been removed
        self.0.into_iter().skip(1)
    }
}
#[derive(Debug, PartialEq)]
/// A parsed query, ready for execution
pub enum Query {
    /// A single action group, e.g `["GET", "x"]`
    Simple(ActionGroup),
    /// Several action groups sent in one packet, to be executed in order
    Pipelined(Vec<ActionGroup>),
}
#[derive(Debug, PartialEq)]
/// Outcome of parsing a query
pub enum ParseResult {
    /// The packet is incomplete, i.e more data needs to be read
    Incomplete,
    /// The packet is corrupted, in the sense that it contains invalid data
    BadPacket,
    /// A successfully parsed query
    ///
    /// The second field is the number of bytes that should be discarded from the buffer as it has already
    /// been read
    Query(Query, usize),
}
/// # The Query parser
///
/// The query parser, well, parses query packets! Query packets look like this:
/// ```text
/// #<size_of_next_line>\n
/// *<no_of_actions>\n
/// #<size_of_next_line>\n
/// &<no_of_elements_in_actiongroup>\n
/// #<size_of_next_line>\n
/// element[0]\n
/// #<size_of_next_line>\n
/// element[1]\n
/// ...
/// element[n]\n
/// #<size_of_next_line>\n
/// &<no_of_elements_in_this_actiongroup>\n
/// ...
/// ```
///
/// Returns [`ParseResult::Incomplete`] when more bytes are needed,
/// [`ParseResult::BadPacket`] on malformed input, and on success a
/// [`ParseResult::Query`] together with the number of bytes consumed.
pub fn parse(buf: &[u8]) -> ParseResult {
    if buf.len() < 6 {
        // A packet that has less than 6 characters? Nonsense!
        return ParseResult::Incomplete;
    }
    /*
    We first get the metaframe, which looks something like:
    ```
    #<numchars_in_next_line>\n
    !<num_of_datagroups>\n
    ```
    */
    let mut pos = 0;
    // Every packet must open with a '#' size line
    if buf[pos] != b'#' {
        return ParseResult::BadPacket;
    } else {
        pos += 1;
    }
    let next_line = match read_line_and_return_next_line(&mut pos, &buf) {
        Some(line) => line,
        None => {
            // This is incomplete
            return ParseResult::Incomplete;
        }
    };
    pos += 1; // Skip LF
    // Find out the number of actions that we have to do
    let mut action_size = 0usize;
    if next_line[0] == b'*' {
        // Accumulate the decimal digits after '*' into `action_size`
        let mut line_iter = next_line.into_iter().skip(1);
        while let Some(dig) = line_iter.next() {
            // 48 is the ASCII/UTF-8 code for '0'
            let curdig: usize = match dig.checked_sub(48) {
                Some(dig) => {
                    if dig > 9 {
                        return ParseResult::BadPacket;
                    } else {
                        dig.into()
                    }
                }
                None => return ParseResult::BadPacket,
            };
            action_size = (action_size * 10) + curdig;
        }
        // This line gives us the number of actions
    } else {
        return ParseResult::BadPacket;
    }
    let mut items: Vec<ActionGroup> = Vec::with_capacity(action_size);
    // NOTE(review): the `<=` bound permits one group more than `action_size`
    // to be parsed; such packets are rejected by the trailing count check
    // below, though as `Incomplete` rather than `BadPacket` — confirm intent
    while pos < buf.len() && items.len() <= action_size {
        match buf[pos] {
            b'#' => {
                pos += 1; // Skip '#'
                let next_line = match read_line_and_return_next_line(&mut pos, &buf) {
                    Some(line) => line,
                    None => {
                        // This is incomplete
                        return ParseResult::Incomplete;
                    }
                }; // Now we have the current line
                pos += 1; // Skip the newline
                // Move the cursor ahead by the number of bytes that we just read
                // Let us check the current char
                match next_line[0] {
                    b'&' => {
                        // This is an array
                        // Now let us parse the array size
                        let mut current_array_size = 0usize;
                        let mut linepos = 1; // Skip the '&' character
                        while linepos < next_line.len() {
                            let curdg: usize = match next_line[linepos].checked_sub(48) {
                                Some(dig) => {
                                    if dig > 9 {
                                        // If `dig` is greater than 9, then the current
                                        // UTF-8 char isn't a number
                                        return ParseResult::BadPacket;
                                    } else {
                                        dig.into()
                                    }
                                }
                                None => {
                                    return ParseResult::BadPacket;
                                }
                            };
                            current_array_size = (current_array_size * 10) + curdg; // Increment the size
                            linepos += 1; // Move the position ahead, since we just read another char
                        }
                        // Now we know the array size, good!
                        let mut actiongroup = Vec::with_capacity(current_array_size);
                        // Let's loop over to get the elements till the size of this array
                        while pos < buf.len() && actiongroup.len() < current_array_size {
                            let mut element_size = 0usize;
                            if buf[pos] == b'#' {
                                pos += 1; // skip the '#' character
                            } else {
                                return ParseResult::BadPacket;
                            }
                            // Parse this element's decimal size digits up to the newline
                            while pos < buf.len() && buf[pos] != b'\n' {
                                let curdig: usize = match buf[pos].checked_sub(48) {
                                    Some(dig) => {
                                        if dig > 9 {
                                            // If `dig` is greater than 9, then the current
                                            // UTF-8 char isn't a number
                                            return ParseResult::BadPacket;
                                        } else {
                                            dig.into()
                                        }
                                    }
                                    None => {
                                        return ParseResult::BadPacket;
                                    }
                                };
                                element_size = (element_size * 10) + curdig; // Increment the size
                                pos += 1; // Move the position ahead, since we just read another char
                            }
                            pos += 1; // Skip the newline
                            // We now know the item size
                            let mut value = String::with_capacity(element_size);
                            let extracted = match buf.get(pos..pos + element_size) {
                                Some(s) => s,
                                None => return ParseResult::Incomplete,
                            };
                            pos += element_size; // Move the position ahead
                            value.push_str(&String::from_utf8_lossy(extracted));
                            // NOTE(review): the trailing newline is skipped without
                            // verifying that the byte actually is b'\n'
                            pos += 1; // Skip the newline
                            actiongroup.push(value);
                        }
                        items.push(ActionGroup(actiongroup));
                    }
                    _ => {
                        return ParseResult::BadPacket;
                    }
                }
                continue;
            }
            _ => {
                // Since the variant '#' does all the array
                // parsing business, we should never reach here unless
                // the packet is invalid
                return ParseResult::BadPacket;
            }
        }
    }
    if buf.get(pos).is_none() {
        // Either more data was sent or some data was missing
        if items.len() == action_size {
            if items.len() == 1 {
                ParseResult::Query(Query::Simple(items.remove(0)), buf.len())
            } else {
                ParseResult::Query(Query::Pipelined(items), buf.len())
            }
        } else {
            ParseResult::Incomplete
        }
    } else {
        ParseResult::BadPacket
    }
}
/// Read a size line and return the following line
///
/// This reads a line that begins with the number, i.e make sure that
/// the **`#` character is skipped**
///
/// On success, `pos` is advanced past the size line's newline and past the
/// returned line itself (but *not* past that line's trailing newline).
/// Returns `None` if the size line contains a non-digit byte or if the
/// buffer does not hold `next_line_size` further bytes (i.e the packet is
/// incomplete).
fn read_line_and_return_next_line<'a>(pos: &mut usize, buf: &'a [u8]) -> Option<&'a [u8]> {
    let mut next_line_size = 0usize;
    // The original compared `pos < &mut buf.len()` — a comparison of
    // `&mut usize` references; dereferencing is the intended (and
    // equivalent) value comparison
    while *pos < buf.len() && buf[*pos] != b'\n' {
        // 48 is the UTF-8 code for '0'
        let curdig: usize = match buf[*pos].checked_sub(48) {
            Some(dig) => {
                if dig > 9 {
                    // If `dig` is greater than 9, then the current
                    // UTF-8 char isn't a number
                    return None;
                } else {
                    dig.into()
                }
            }
            None => return None,
        };
        next_line_size = (next_line_size * 10) + curdig; // Increment the size
        *pos += 1; // Move the position ahead, since we just read another char
    }
    *pos += 1; // Skip the newline
    // We now know the size of the next line; `checked_add` guards the slice
    // end against a (pathological) integer overflow
    let end = pos.checked_add(next_line_size)?;
    let next_line = match buf.get(*pos..end) {
        Some(line) => line,
        None => {
            // This is incomplete
            return None;
        }
    }; // Now we have the current line
    // Move the cursor ahead by the number of bytes that we just read
    *pos += next_line_size;
    Some(next_line)
}
#[test]
fn test_parser() {
let input = "#2\n*1\n#2\n&3\n#3\nGET\n#1\nx\n#2\nex\n"
.to_owned()
.into_bytes();
let res = parse(&input);
let res_should_be = ParseResult::Query(
Query::Simple(ActionGroup(vec![
"GET".to_owned(),
"x".to_owned(),
"ex".to_owned(),
])),
input.len(),
);
assert_eq!(res, res_should_be);
let input = "#2\n*2\n#2\n&3\n#3\nGET\n#1\nx\n#2\nex\n"
.to_owned()
.into_bytes();
let res = parse(&input);
let res_should_be = ParseResult::Incomplete;
assert_eq!(res, res_should_be);
let input = "#2\n*A\n#2\n&3\n#3\nGET\n#1\nx\n#2\nex\n"
.to_owned()
.into_bytes();
let res = parse(&input);
let res_should_be = ParseResult::BadPacket;
assert_eq!(res, res_should_be);
let input = "#2\n*1\n#2\n&3\n#3\nSET\n#19\nbeinghumanisawesome\n#4\ntrue\n"
.as_bytes()
.to_owned();
let res = parse(&input);
let res_should_be = ParseResult::Query(
Query::Simple(ActionGroup(vec![
"SET".to_owned(),
"beinghumanisawesome".to_owned(),
"true".to_owned(),
])),
input.len(),
);
assert_eq!(res, res_should_be);
let input ="#2\n*1\n#3\n&17\n#3\nSET\n#3\none\n#1\n1\n#3\ntwo\n#1\n2\n#5\nthree\n#1\n3\n#4\nfour\n#1\n4\n#4\nfive\n#1\n5\n#3\nsix\n#1\n6\n#5\nseven\n#1\n7\n#5\neight\n#1\n8\n";
let res = parse(&input.to_owned().into_bytes());
let res_should_be = ParseResult::Query(
Query::Simple(ActionGroup(vec![
"SET".to_string(),
"one".to_string(),
"1".to_string(),
"two".to_string(),
"2".to_string(),
"three".to_string(),
"3".to_string(),
"four".to_string(),
"4".to_string(),
"five".to_string(),
"5".to_string(),
"six".to_string(),
"6".to_string(),
"seven".to_string(),
"7".to_string(),
"eight".to_string(),
"8".to_string(),
])),
input.len(),
);
assert_eq!(res, res_should_be);
let input = "#2\n*2\n#2\n&3\n#3\nGET\n#1\nx\n#2\nex\n#2\n&3\n#3\nSET\n#1\nx\n#4\ntrue"
.as_bytes()
.to_owned();
let res = parse(&input);
let res_should_be = ParseResult::Query(
Query::Pipelined(vec![
ActionGroup(vec!["GET".to_owned(), "x".to_owned(), "ex".to_owned()]),
ActionGroup(vec!["SET".to_owned(), "x".to_owned(), "true".to_owned()]),
]),
input.len(),
);
assert_eq!(res, res_should_be);
}

@ -24,42 +24,9 @@
*
*/
//! # The `protocol` module
//!
//! This module provides low-level interfaces to read data from a socket, when control
//! is handed over to it by `dbnet`, and high-level interfaces for parsing an incoming
//! query into an _executable query_ via the `deserializer` module.
//! This module provides deserialization primitives for query packets
mod deserializer;
pub mod responses;
use crate::dbnet::Terminator;
use crate::protocol::con::ConnectionHandler;
use crate::CoreDB;
use bytes::BytesMut;
pub use deserializer::ActionGroup;
pub use deserializer::ParseResult;
pub use deserializer::Query;
use libsky::TResult;
use libsky::BUF_CAP;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::BufWriter;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::sync::Semaphore;
use tokio::sync::{broadcast, mpsc};
use tokio::time;
pub mod con;
pub mod tls;
/// A TCP connection wrapper
pub struct Connection {
/// The connection to the remote socket, wrapped in a buffer to speed
/// up writing
stream: BufWriter<TcpStream>,
/// The in-memory read buffer. The size is given by `BUF_CAP`
buffer: BytesMut,
}
/// The outcome of running `Connection`'s `try_query` function
pub enum QueryResult {
@ -71,75 +38,363 @@ pub enum QueryResult {
Empty,
}
impl Connection {
/// Initiailize a new `Connection` instance
pub fn new(stream: TcpStream) -> Self {
Connection {
stream: BufWriter::new(stream),
buffer: BytesMut::with_capacity(BUF_CAP),
}
/*
NOTE: I haven't used any recursion, because:
1. I like things to be explicit
2. I don't like huge stacks
And that's why I've done, what I've done, here.
*/
use std::vec::IntoIter;
/// # `ActionGroup`
///
/// The `ActionGroup` is an "Action Group" in the dataframe as described by the
/// Terrapipe protocol. An `ActionGroup` contains all the elements required to
/// execute an `Action`. The `ActionGroup` contains the "action" itself.
/// It may look like:
/// ```text
/// ["GET", "x", "y"]
/// ```
#[derive(Debug, PartialEq)]
pub struct ActionGroup(Vec<String>);
impl ActionGroup {
/// Returns how many arguments are there excluding the name of the action
pub fn howmany(&self) -> usize {
self.0.len() - 1
}
pub fn get_first(&self) -> Option<&String> {
self.0.get(0)
}
pub fn get_ref(&self) -> &Vec<String> {
&self.0
}
}
impl IntoIterator for ActionGroup {
type Item = String;
type IntoIter = std::iter::Skip<IntoIter<String>>;
fn into_iter(self) -> <Self as IntoIterator>::IntoIter {
self.0.into_iter().skip(1).into_iter()
}
}
// We'll use the idea of gracefully shutting down from tokio
/// A listener
pub struct Listener {
/// An atomic reference to the coretable
pub db: CoreDB,
/// The incoming connection listener (binding)
pub listener: TcpListener,
/// The maximum number of connections
pub climit: Arc<Semaphore>,
/// The shutdown broadcaster
pub signal: broadcast::Sender<()>,
// When all `Sender`s are dropped - the `Receiver` gets a `None` value
// We send a clone of `terminate_tx` to each `CHandler`
pub terminate_tx: mpsc::Sender<()>,
pub terminate_rx: mpsc::Receiver<()>,
#[derive(Debug, PartialEq)]
pub enum Query {
Simple(ActionGroup),
Pipelined(Vec<ActionGroup>),
}
impl Listener {
/// Accept an incoming connection
async fn accept(&mut self) -> TResult<TcpStream> {
// We will steal the idea of Ethernet's backoff for connection errors
let mut backoff = 1;
loop {
match self.listener.accept().await {
// We don't need the bindaddr
Ok((stream, _)) => return Ok(stream),
Err(e) => {
if backoff > 64 {
// Too many retries, goodbye user
return Err(e.into());
#[derive(Debug, PartialEq)]
/// Outcome of parsing a query
pub enum ParseResult {
/// The packet is incomplete, i.e more data needs to be read
Incomplete,
/// The packet is corrupted, in the sense that it contains invalid data
BadPacket,
/// A successfully parsed query
///
/// The second field is the number of bytes that should be discarded from the buffer as it has already
/// been read
Query(Query, usize),
}
/// # The Query parser
///
/// The query parser, well, parses query packets! Query packets look like this:
/// ```text
/// #<size_of_next_line>\n
/// *<no_of_actions>\n
/// #<size_of_next_line>\n
/// &<no_of_elements_in_actiongroup>\n
/// #<size_of_next_line>\n
/// element[0]\n
/// #<size_of_next_line>\n
/// element[1]\n
/// ...
/// element[n]\n
/// #<size_of_next_line>\n
/// &<no_of_elements_in_this_actiongroup>\n
/// ...
/// ```
///
pub fn parse(buf: &[u8]) -> ParseResult {
if buf.len() < 6 {
// A packet that has less than 6 characters? Nonsense!
return ParseResult::Incomplete;
}
/*
We first get the metaframe, which looks something like:
```
#<numchars_in_next_line>\n
!<num_of_datagroups>\n
```
*/
let mut pos = 0;
if buf[pos] != b'#' {
return ParseResult::BadPacket;
} else {
pos += 1;
}
let next_line = match read_line_and_return_next_line(&mut pos, &buf) {
Some(line) => line,
None => {
// This is incomplete
return ParseResult::Incomplete;
}
};
pos += 1; // Skip LF
// Find out the number of actions that we have to do
let mut action_size = 0usize;
if next_line[0] == b'*' {
let mut line_iter = next_line.into_iter().skip(1);
while let Some(dig) = line_iter.next() {
let curdig: usize = match dig.checked_sub(48) {
Some(dig) => {
if dig > 9 {
return ParseResult::BadPacket;
} else {
dig.into()
}
}
}
// Wait for the `backoff` duration
time::sleep(Duration::from_secs(backoff)).await;
// We're using exponential backoff
backoff *= 2;
None => return ParseResult::BadPacket,
};
action_size = (action_size * 10) + curdig;
}
// This line gives us the number of actions
} else {
return ParseResult::BadPacket;
}
/// Run the server
pub async fn run(&mut self) -> TResult<()> {
loop {
// Take the permit first, but we won't use it right now
// that's why we will forget it
self.climit.acquire().await.unwrap().forget();
let stream = self.accept().await?;
let mut chandle = ConnectionHandler::new(
self.db.clone(),
Connection::new(stream),
self.climit.clone(),
Terminator::new(self.signal.subscribe()),
self.terminate_tx.clone(),
);
tokio::spawn(async move {
if let Err(e) = chandle.run().await {
log::error!("Error: {}", e);
let mut items: Vec<ActionGroup> = Vec::with_capacity(action_size);
while pos < buf.len() && items.len() <= action_size {
match buf[pos] {
b'#' => {
pos += 1; // Skip '#'
let next_line = match read_line_and_return_next_line(&mut pos, &buf) {
Some(line) => line,
None => {
// This is incomplete
return ParseResult::Incomplete;
}
}; // Now we have the current line
pos += 1; // Skip the newline
// Move the cursor ahead by the number of bytes that we just read
// Let us check the current char
match next_line[0] {
b'&' => {
// This is an array
// Now let us parse the array size
let mut current_array_size = 0usize;
let mut linepos = 1; // Skip the '&' character
while linepos < next_line.len() {
let curdg: usize = match next_line[linepos].checked_sub(48) {
Some(dig) => {
if dig > 9 {
// If `dig` is greater than 9, then the current
// UTF-8 char isn't a number
return ParseResult::BadPacket;
} else {
dig.into()
}
}
None => {
return ParseResult::BadPacket;
}
};
current_array_size = (current_array_size * 10) + curdg; // Increment the size
linepos += 1; // Move the position ahead, since we just read another char
}
// Now we know the array size, good!
let mut actiongroup = Vec::with_capacity(current_array_size);
// Let's loop over to get the elements till the size of this array
while pos < buf.len() && actiongroup.len() < current_array_size {
let mut element_size = 0usize;
if buf[pos] == b'#' {
pos += 1; // skip the '#' character
} else {
return ParseResult::BadPacket;
}
while pos < buf.len() && buf[pos] != b'\n' {
let curdig: usize = match buf[pos].checked_sub(48) {
Some(dig) => {
if dig > 9 {
// If `dig` is greater than 9, then the current
// UTF-8 char isn't a number
return ParseResult::BadPacket;
} else {
dig.into()
}
}
None => {
return ParseResult::BadPacket;
}
};
element_size = (element_size * 10) + curdig; // Increment the size
pos += 1; // Move the position ahead, since we just read another char
}
pos += 1; // Skip the newline
// We now know the item size
let mut value = String::with_capacity(element_size);
let extracted = match buf.get(pos..pos + element_size) {
Some(s) => s,
None => return ParseResult::Incomplete,
};
pos += element_size; // Move the position ahead
value.push_str(&String::from_utf8_lossy(extracted));
pos += 1; // Skip the newline
actiongroup.push(value);
}
items.push(ActionGroup(actiongroup));
}
_ => {
return ParseResult::BadPacket;
}
}
});
continue;
}
_ => {
// Since the variant '#' would does all the array
// parsing business, we should never reach here unless
// the packet is invalid
return ParseResult::BadPacket;
}
}
}
if buf.get(pos).is_none() {
// Either more data was sent or some data was missing
if items.len() == action_size {
if items.len() == 1 {
ParseResult::Query(Query::Simple(items.remove(0)), buf.len())
} else {
ParseResult::Query(Query::Pipelined(items), buf.len())
}
} else {
ParseResult::Incomplete
}
} else {
ParseResult::BadPacket
}
}
/// Read a size line and return the following line
///
/// This reads a line that begins with the number, i.e make sure that
/// the **`#` character is skipped**
///
fn read_line_and_return_next_line<'a>(pos: &mut usize, buf: &'a [u8]) -> Option<&'a [u8]> {
let mut next_line_size = 0usize;
while pos < &mut buf.len() && buf[*pos] != b'\n' {
// 48 is the UTF-8 code for '0'
let curdig: usize = match buf[*pos].checked_sub(48) {
Some(dig) => {
if dig > 9 {
// If `dig` is greater than 9, then the current
// UTF-8 char isn't a number
return None;
} else {
dig.into()
}
}
None => return None,
};
next_line_size = (next_line_size * 10) + curdig; // Increment the size
*pos += 1; // Move the position ahead, since we just read another char
}
*pos += 1; // Skip the newline
// We now know the size of the next line
let next_line = match buf.get(*pos..*pos + next_line_size) {
Some(line) => line,
None => {
// This is incomplete
return None;
}
}; // Now we have the current line
// Move the cursor ahead by the number of bytes that we just read
*pos += next_line_size;
Some(next_line)
}
#[test]
fn test_parser() {
let input = "#2\n*1\n#2\n&3\n#3\nGET\n#1\nx\n#2\nex\n"
.to_owned()
.into_bytes();
let res = parse(&input);
let res_should_be = ParseResult::Query(
Query::Simple(ActionGroup(vec![
"GET".to_owned(),
"x".to_owned(),
"ex".to_owned(),
])),
input.len(),
);
assert_eq!(res, res_should_be);
let input = "#2\n*2\n#2\n&3\n#3\nGET\n#1\nx\n#2\nex\n"
.to_owned()
.into_bytes();
let res = parse(&input);
let res_should_be = ParseResult::Incomplete;
assert_eq!(res, res_should_be);
let input = "#2\n*A\n#2\n&3\n#3\nGET\n#1\nx\n#2\nex\n"
.to_owned()
.into_bytes();
let res = parse(&input);
let res_should_be = ParseResult::BadPacket;
assert_eq!(res, res_should_be);
let input = "#2\n*1\n#2\n&3\n#3\nSET\n#19\nbeinghumanisawesome\n#4\ntrue\n"
.as_bytes()
.to_owned();
let res = parse(&input);
let res_should_be = ParseResult::Query(
Query::Simple(ActionGroup(vec![
"SET".to_owned(),
"beinghumanisawesome".to_owned(),
"true".to_owned(),
])),
input.len(),
);
assert_eq!(res, res_should_be);
let input ="#2\n*1\n#3\n&17\n#3\nSET\n#3\none\n#1\n1\n#3\ntwo\n#1\n2\n#5\nthree\n#1\n3\n#4\nfour\n#1\n4\n#4\nfive\n#1\n5\n#3\nsix\n#1\n6\n#5\nseven\n#1\n7\n#5\neight\n#1\n8\n";
let res = parse(&input.to_owned().into_bytes());
let res_should_be = ParseResult::Query(
Query::Simple(ActionGroup(vec![
"SET".to_string(),
"one".to_string(),
"1".to_string(),
"two".to_string(),
"2".to_string(),
"three".to_string(),
"3".to_string(),
"four".to_string(),
"4".to_string(),
"five".to_string(),
"5".to_string(),
"six".to_string(),
"6".to_string(),
"seven".to_string(),
"7".to_string(),
"eight".to_string(),
"8".to_string(),
])),
input.len(),
);
assert_eq!(res, res_should_be);
let input = "#2\n*2\n#2\n&3\n#3\nGET\n#1\nx\n#2\nex\n#2\n&3\n#3\nSET\n#1\nx\n#4\ntrue"
.as_bytes()
.to_owned();
let res = parse(&input);
let res_should_be = ParseResult::Query(
Query::Pipelined(vec![
ActionGroup(vec!["GET".to_owned(), "x".to_owned(), "ex".to_owned()]),
ActionGroup(vec!["SET".to_owned(), "x".to_owned(), "true".to_owned()]),
]),
input.len(),
);
assert_eq!(res, res_should_be);
}

@ -27,8 +27,8 @@
//! # The Query Engine
use crate::coredb::CoreDB;
use crate::dbnet::con::prelude::*;
use crate::gen_match;
use crate::protocol::con::prelude::*;
use crate::protocol::responses;
use crate::protocol::ActionGroup;
use crate::{admin, kvengine};

Loading…
Cancel
Save