Add a connection abstraction

Also enabled safe dropping of the `Coretable`
next
Sayan Nandan 4 years ago
parent 40d148624d
commit 755e8d80f4
No known key found for this signature in database
GPG Key ID: C31EFD7DDA12AEE0

@ -8,15 +8,18 @@
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
pub mod terrapipe;
//! The core library for the Terrabase database
//! This contains modules which are shared by both the `cli` and the `server` modules
pub mod terrapipe;

@ -19,28 +19,48 @@
*
*/
//! This implements the Terrapipe protocol
/// Default query metaline buffer size
pub const DEF_QMETALINE_BUFSIZE: usize = 44;
/// Default query metalayout buffer size
pub const DEF_QMETALAYOUT_BUFSIZE: usize = 1024;
/// Default query dataframe buffer size
pub const DEF_QDATAFRAME_BUSIZE: usize = 4096;
pub mod tags {
//! This module is a collection of tags/strings used for evaluating queries
//! and responses
/// `GET` command tag
pub const TAG_GET: &'static str = "GET";
/// `SET` command tag
pub const TAG_SET: &'static str = "SET";
/// `UPDATE` command tag
pub const TAG_UPDATE: &'static str = "UPDATE";
/// `DEL` command tag
pub const TAG_DEL: &'static str = "DEL";
/// `HEYA` command tag
pub const TAG_HEYA: &'static str = "HEYA";
}
pub mod responses {
//! Empty responses, mostly errors, which are statically compiled
use lazy_static::lazy_static;
lazy_static! {
/// Empty `0`(Okay) response - without any content
pub static ref RESP_OKAY_EMPTY: Vec<u8> = "0!0!0".as_bytes().to_owned();
/// `1` Not found response
pub static ref RESP_NOT_FOUND: Vec<u8> = "1!0!0".as_bytes().to_owned();
/// `2` Overwrite Error response
pub static ref RESP_OVERWRITE_ERROR: Vec<u8> = "2!0!0".as_bytes().to_owned();
/// `3` Invalid Metaframe response
pub static ref RESP_INVALID_MF: Vec<u8> = "3!0!0".as_bytes().to_owned();
/// `4` Incomplete frame response
pub static ref RESP_INCOMPLETE: Vec<u8> = "4!0!0".as_bytes().to_owned();
/// `5` Internal server error response
pub static ref RESP_SERVER_ERROR: Vec<u8> = "5!0!0".as_bytes().to_owned();
}
}
/// Response codes returned by the server
#[derive(Debug, PartialEq)]
pub enum RespCodes {
/// `0`: Okay (Empty Response) - use the `ResponseBuilder` for building
@ -75,6 +95,7 @@ impl From<RespCodes> for u8 {
}
}
/// Representation of the query action type - pipelined or simple
#[derive(Debug, PartialEq)]
pub enum ActionType {
Simple,
@ -103,22 +124,28 @@ impl RespBytes for RespCodes {
}
}
/// The query dataframe
#[derive(Debug)]
pub struct QueryDataframe {
/// The data part
pub data: Vec<String>,
/// The query action type
pub actiontype: ActionType,
}
/// This is enum represents types of responses which can be built from it
pub enum ResponseBuilder {
SimpleResponse, // TODO: Add pipelined response builder here
}
impl ResponseBuilder {
/// Create a new simple response
pub fn new_simple(respcode: RespCodes) -> SimpleResponse {
SimpleResponse::new(respcode.into())
}
}
/// Representation of a simple response
pub struct SimpleResponse {
respcode: u8,
metalayout_buf: String,
@ -127,6 +154,8 @@ pub struct SimpleResponse {
}
impl SimpleResponse {
/// Create a new response with just a response code
/// The data has to be added by using the `add_data()` member function
pub fn new(respcode: u8) -> Self {
SimpleResponse {
respcode,
@ -135,13 +164,17 @@ impl SimpleResponse {
size_tracker: 0,
}
}
/// Add data to the response
pub fn add_data(&mut self, data: String) {
self.metalayout_buf.push_str(&format!("{}#", data.len()));
self.size_tracker += data.len() + 1;
let datstr = data.len().to_string();
self.metalayout_buf.push_str(&format!("{}#", datstr.len()));
self.size_tracker += datstr.len() + 1;
self.dataframe_buf.push_str(&data);
self.dataframe_buf.push('\n');
}
pub fn prepare_response(&self) -> Vec<u8> {
/// Internal function used in the implementation of the `RespBytes` trait
/// for creating a `Vec<u8>` which can be written to a TCP stream
fn prepare_response(&self) -> Vec<u8> {
format!(
"{}!{}!{}\n{}\n{}",
self.respcode,

@ -20,14 +20,16 @@
*/
use corelib::terrapipe::QueryDataframe;
use corelib::terrapipe::{responses, tags, ActionType, RespBytes, RespCodes, ResponseBuilder};
use corelib::terrapipe::{tags, ActionType, RespBytes, RespCodes, ResponseBuilder};
use std::collections::{hash_map::Entry, HashMap};
use std::sync::{Arc, RwLock};
pub type DbResult<T> = Result<T, RespCodes>;
/// Results from actions on the Database
pub type ActionResult<T> = Result<T, RespCodes>;
pub struct CoreDB {
shared: Arc<Coretable>,
terminate: bool,
}
pub struct Coretable {
@ -35,14 +37,14 @@ pub struct Coretable {
}
impl Coretable {
pub fn get(&self, key: &str) -> DbResult<String> {
pub fn get(&self, key: &str) -> ActionResult<String> {
if let Some(value) = self.coremap.read().unwrap().get(key) {
Ok(value.to_string())
} else {
Err(RespCodes::NotFound)
}
}
pub fn set(&self, key: &str, value: &str) -> DbResult<()> {
pub fn set(&self, key: &str, value: &str) -> ActionResult<()> {
match self.coremap.write().unwrap().entry(key.to_string()) {
Entry::Occupied(_) => return Err(RespCodes::OverwriteError),
Entry::Vacant(e) => {
@ -51,7 +53,7 @@ impl Coretable {
}
}
}
pub fn update(&self, key: &str, value: &str) -> DbResult<()> {
pub fn update(&self, key: &str, value: &str) -> ActionResult<()> {
match self.coremap.write().unwrap().entry(key.to_string()) {
Entry::Occupied(ref mut e) => {
e.insert(value.to_string());
@ -60,7 +62,7 @@ impl Coretable {
Entry::Vacant(_) => Err(RespCodes::NotFound),
}
}
pub fn del(&self, key: &str) -> DbResult<()> {
pub fn del(&self, key: &str) -> ActionResult<()> {
if let Some(_) = self.coremap.write().unwrap().remove(&key.to_owned()) {
Ok(())
} else {
@ -169,9 +171,23 @@ impl CoreDB {
shared: Arc::new(Coretable {
coremap: RwLock::new(HashMap::new()),
}),
terminate: false,
}
}
pub fn get_handle(&self) -> Arc<Coretable> {
Arc::clone(&self.shared)
}
}
impl Drop for CoreDB {
// This prevents us from killing the database, in the event someone tries
// to access it
fn drop(&mut self) {
if Arc::strong_count(&self.shared) == 1 {
// Acquire a lock to prevent anyone from writing something
let coremap = self.shared.coremap.write().unwrap();
self.terminate = true;
drop(coremap);
}
}
}

@ -19,13 +19,12 @@
*
*/
use tokio::io::AsyncWriteExt;
use tokio::net::{TcpListener, TcpStream};
use tokio::net::TcpListener;
mod coredb;
mod dbnet;
mod protocol;
use coredb::CoreDB;
use corelib::terrapipe::RespBytes;
use protocol::read_query;
use protocol::Connection;
static ADDR: &'static str = "127.0.0.1:2003";
#[tokio::main]
@ -35,18 +34,15 @@ async fn main() {
let db = CoreDB::new();
loop {
let handle = db.get_handle();
let (mut socket, _) = listener.accept().await.unwrap();
let (socket, _) = listener.accept().await.unwrap();
tokio::spawn(async move {
let q = read_query(&mut socket).await;
let mut con = Connection::new(socket);
let q = con.read_query().await;
let df = match q {
Ok(q) => q,
Err(e) => return close_conn_with_error(socket, e).await,
Err(e) => return con.close_conn_with_error(e).await,
};
socket.write_all(&handle.execute_query(df)).await.unwrap();
con.write_response(handle.execute_query(df)).await;
});
}
}
async fn close_conn_with_error(mut stream: TcpStream, bytes: impl RespBytes) {
stream.write_all(&bytes.into_response()).await.unwrap()
}

@ -21,8 +21,7 @@
use corelib::terrapipe::{ActionType, QueryDataframe};
use corelib::terrapipe::{RespBytes, RespCodes, DEF_QMETALINE_BUFSIZE};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
#[derive(Debug, PartialEq)]
@ -115,7 +114,7 @@ fn extract_idents(buf: Vec<u8>, skip_sequence: Vec<usize>) -> Vec<String> {
.scan(buf.into_iter(), |databuf, size| {
let tok: Vec<u8> = databuf.take(size).collect();
let _ = databuf.next();
// FIXME(ohsayan): This is quite slow, we'll have to use SIMD in the future
// FIXME(@ohsayan): This is quite slow, we'll have to use SIMD in the future
Some(String::from_utf8_lossy(&tok).to_string())
})
.collect()
@ -137,27 +136,46 @@ fn test_extract_idents() {
assert_eq!(res[1], "<22><>");
}
pub async fn read_query(mut stream: &mut TcpStream) -> Result<QueryDataframe, impl RespBytes> {
let mut bufreader = BufReader::new(&mut stream);
let mut metaline_buf = String::with_capacity(DEF_QMETALINE_BUFSIZE);
bufreader.read_line(&mut metaline_buf).await.unwrap();
let pqmf = match PreQMF::from_buffer(metaline_buf) {
Ok(pq) => pq,
Err(e) => return Err(e),
};
let (mut metalayout_buf, mut dataframe_buf) = (
String::with_capacity(pqmf.metaline_size),
vec![0; pqmf.content_size],
);
bufreader.read_line(&mut metalayout_buf).await.unwrap();
let ss = match get_sizes(metalayout_buf) {
Ok(ss) => ss,
Err(e) => return Err(e),
};
bufreader.read(&mut dataframe_buf).await.unwrap();
let qdf = QueryDataframe {
data: extract_idents(dataframe_buf, ss),
actiontype: pqmf.action_type,
};
Ok(qdf)
pub struct Connection {
stream: TcpStream,
}
impl Connection {
pub fn new(stream: TcpStream) -> Self {
Connection { stream }
}
pub async fn read_query(&mut self) -> Result<QueryDataframe, impl RespBytes> {
let mut bufreader = BufReader::new(&mut self.stream);
let mut metaline_buf = String::with_capacity(DEF_QMETALINE_BUFSIZE);
bufreader.read_line(&mut metaline_buf).await.unwrap();
let pqmf = match PreQMF::from_buffer(metaline_buf) {
Ok(pq) => pq,
Err(e) => return Err(e),
};
let (mut metalayout_buf, mut dataframe_buf) = (
String::with_capacity(pqmf.metaline_size),
vec![0; pqmf.content_size],
);
bufreader.read_line(&mut metalayout_buf).await.unwrap();
let ss = match get_sizes(metalayout_buf) {
Ok(ss) => ss,
Err(e) => return Err(e),
};
bufreader.read(&mut dataframe_buf).await.unwrap();
let qdf = QueryDataframe {
data: extract_idents(dataframe_buf, ss),
actiontype: pqmf.action_type,
};
Ok(qdf)
}
pub async fn write_response(&mut self, resp: Vec<u8>) {
if let Ok(_) = self.stream.write(&resp).await {
return;
} else {
eprintln!("Error writing response to {:?}", self.stream.peer_addr())
}
}
pub async fn close_conn_with_error(&mut self, bytes: impl RespBytes) {
self.stream.write_all(&bytes.into_response()).await.unwrap()
}
}

Loading…
Cancel
Save