Implement basic coredb

next
Sayan Nandan 4 years ago
parent 520bcbc95c
commit 0edaf27bd3
No known key found for this signature in database
GPG Key ID: C31EFD7DDA12AEE0

1
Cargo.lock generated

@ -356,6 +356,7 @@ dependencies = [
name = "terrabase"
version = "0.1.0"
dependencies = [
"bytes 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)",
"libcore 0.1.0",
"tokio 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)",
]

@ -35,11 +35,11 @@ pub const MF_METHOD_UPDATE: &'static str = "UPDATE";
pub const MF_METHOD_DEL: &'static str = "DEL";
pub const DEF_Q_META_BUFSIZE: usize = 46;
pub const DEF_R_META_BUFSIZE: usize = 40;
#[macro_export]
macro_rules! response_packet {
($version:expr, $respcode:expr, $data:expr) => {{
let res = format!(
"TP/{}.{}.{}/R/{}/{}\n\n{}",
"TP/{}.{}.{}/R/{}/{}\n{}",
$version.0,
$version.1,
$version.2,
@ -79,7 +79,7 @@ impl Version {
}
pub enum ResponseCodes {
Okay, // Code: 0
Okay(Option<String>), // Code: 0
NotFound, // Code: 1
OverwriteError, // Code: 2
MethodNotAllowed, // Code: 3
@ -94,7 +94,7 @@ impl ResponseCodes {
pub fn from_u8(code: u8) -> Option<Self> {
use ResponseCodes::*;
let c = match code {
0 => Okay,
0 => Okay(None),
1 => NotFound,
2 => OverwriteError,
3 => MethodNotAllowed,
@ -117,9 +117,12 @@ impl ResponseBytes for ResponseCodes {
fn response_bytes(&self, v: &Version) -> Vec<u8> {
use ResponseCodes::*;
match self {
Okay => {
// We will never need an implementation for Okay
unimplemented!()
Okay(val) => {
if let Some(dat) = val {
response_packet!(v, 0, dat)
} else {
response_packet!(v, 0, "")
}
}
NotFound => response_packet!(v, 1, ""),
OverwriteError => response_packet!(v, 2, ""),
@ -150,8 +153,8 @@ pub struct QueryMetaframe {
impl QueryMetaframe {
pub fn from_buffer(
self_version: &Version,
buf: &String
) -> Result<QueryMetaframe, impl ResponseBytes> {
buf: &String,
) -> Result<QueryMetaframe, ResponseCodes> {
let mf_parts: Vec<&str> = buf.split(MF_SEPARATOR).collect();
if mf_parts.len() != 5 {
return Err(ResponseCodes::InvalidMetaframe);
@ -172,10 +175,9 @@ impl QueryMetaframe {
Ok(csize) => csize,
Err(e) => {
eprintln!("Errored: {}", e);
return Err(ResponseCodes::InvalidMetaframe)
},
return Err(ResponseCodes::InvalidMetaframe);
}
};
let method = match mf_parts[3] {
MF_METHOD_GET => QueryMethod::GET,
MF_METHOD_SET => QueryMethod::SET,
@ -201,10 +203,7 @@ impl QueryMetaframe {
pub struct Dataframe(String);
impl Dataframe {
pub fn from_buffer(
target_size: usize,
buffer: Vec<u8>,
) -> Result<Dataframe, impl ResponseBytes> {
pub fn from_buffer(target_size: usize, buffer: Vec<u8>) -> Result<Dataframe, ResponseCodes> {
let buffer = String::from_utf8_lossy(&buffer);
let buffer = buffer.trim();
if buffer.len() != target_size {
@ -250,7 +249,9 @@ fn benchmark_metaframe_parsing() {
metaframes.push(buf);
});
let b = run_benchmark(50000, |n| {
let _ = QueryMetaframe::from_buffer(&v, &metaframes[n]).ok().unwrap();
let _ = QueryMetaframe::from_buffer(&v, &metaframes[n])
.ok()
.unwrap();
});
b.print_stats();
}

@ -9,4 +9,5 @@ edition = "2018"
[dependencies]
tokio = { version = "0.2.21", features = ["full"] }
# Import `libcore` which contains code for terrapipe and other utils
libcore = {path="../libcore"}
libcore = {path="../libcore"}
bytes = "0.5.5"

@ -0,0 +1,83 @@
/*
* Created on Mon Jul 13 2020
*
* This file is a part of the source code for the Terrabase database
* Copyright (c) 2020 Sayan Nandan
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use libcore::terrapipe::ResponseCodes;
use std::collections::{hash_map::Entry, HashMap};
use std::sync::{Arc, RwLock};
pub struct CoreDB {
shared: Arc<Coretable>,
}
pub struct Coretable {
coremap: RwLock<HashMap<String, String>>,
}
impl Coretable {
pub fn get(&self, key: &str) -> Result<String, ResponseCodes> {
if let Some(value) = self.coremap.read().unwrap().get(key) {
Ok(value.to_string())
} else {
Err(ResponseCodes::NotFound)
}
}
pub fn set(&self, key: &str, value: &str) -> Result<(), ResponseCodes> {
match self.coremap.write().unwrap().entry(key.to_string()) {
Entry::Occupied(_) => return Err(ResponseCodes::OverwriteError),
Entry::Vacant(e) => {
let _ = e.insert(value.to_string());
Ok(())
}
}
}
pub fn update(&self, key: &str, value: &str) -> Result<(), ResponseCodes> {
match self.coremap.write().unwrap().entry(key.to_string()) {
Entry::Occupied(ref mut e) => {
e.insert(value.to_string());
Ok(())
}
Entry::Vacant(_) => Err(ResponseCodes::NotFound),
}
}
pub fn del(&self, key: &str) -> Result<(), ResponseCodes> {
if let Some(_) = self.coremap.write().unwrap().remove(&key.to_owned()) {
Ok(())
} else {
Err(ResponseCodes::NotFound)
}
}
pub fn print_debug_table(&self) {
println!("{:#?}", *self.coremap.read().unwrap());
}
}
impl CoreDB {
pub fn new() -> Self {
CoreDB {
shared: Arc::new(Coretable {
coremap: RwLock::new(HashMap::new()),
}),
}
}
pub fn get_handle(&self) -> Arc<Coretable> {
Arc::clone(&self.shared)
}
}

@ -0,0 +1,75 @@
/*
* Created on Mon Jul 13 2020
*
* This file is a part of the source code for the Terrabase database
* Copyright (c) 2020 Sayan Nandan
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use super::SELF_VERSION;
use libcore::terrapipe::{
Dataframe, QueryMetaframe, ResponseBytes, ResponseCodes, DEF_Q_META_BUFSIZE,
};
use tokio::io::{self, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
use tokio::time::{self, timeout};
pub struct Connection {
stream: TcpStream,
}
pub struct Query {
qmf: QueryMetaframe,
df: Dataframe,
}
impl Connection {
pub fn new(stream: TcpStream) -> Self {
Connection { stream }
}
pub async fn get_query_packet(&mut self) -> Result<Query, impl ResponseBytes> {
let mut meta_buffer = String::with_capacity(DEF_Q_META_BUFSIZE);
let mut bufreader = BufReader::new(&mut self.stream);
match bufreader.read_line(&mut meta_buffer).await {
Ok(_) => (),
Err(_) => {
return Err(ResponseCodes::InternalServerError);
}
}
let qmf = match QueryMetaframe::from_buffer(&SELF_VERSION, &meta_buffer) {
Ok(qmf) => qmf,
Err(e) => return Err(e),
};
let mut data_buffer = vec![0; qmf.get_content_size()];
match timeout(
time::Duration::from_millis(400),
bufreader.read(&mut data_buffer),
)
.await
{
Ok(_) => (),
Err(_) => return Err(ResponseCodes::InternalServerError),
}
let df = match Dataframe::from_buffer(qmf.get_content_size(), data_buffer) {
Ok(d) => d,
Err(e) => return Err(e),
};
Ok(Query { qmf, df })
}
pub async fn write_response_packet(&mut self, resp: Vec<u8>) -> io::Result<()> {
self.stream.write_all(&resp).await
}
}

@ -22,13 +22,18 @@
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::task;
mod coredb;
mod dbnet;
use std::cmp::Ordering;
use std::sync::Arc;
// Internal modules
use libcore::response_packet;
use libcore::terrapipe::{
Dataframe, QueryMetaframe, QueryMethod, ResponseBytes, ResponseCodes, Version,
DEF_Q_META_BUFSIZE,
};
const SELF_VERSION: Version = Version(0, 1, 0);
static ADDR: &'static str = "127.0.0.1:2003";
@ -37,8 +42,9 @@ static ADDR: &'static str = "127.0.0.1:2003";
async fn main() {
let mut listener = TcpListener::bind(ADDR).await.unwrap();
println!("Server running on terrapipe://127.0.0.1:2003");
let db = coredb::CoreDB::new();
loop {
let handle = db.get_handle();
let (mut socket, _) = listener.accept().await.unwrap();
tokio::spawn(async move {
let mut meta_buffer = String::with_capacity(DEF_Q_META_BUFSIZE);
@ -58,7 +64,7 @@ async fn main() {
return close_conn_with_error(socket, e.response_bytes(&SELF_VERSION)).await
}
};
return execute_query(socket, mf, df).await;
return execute_query(socket, handle, mf, df).await;
});
}
}
@ -67,57 +73,72 @@ async fn close_conn_with_error(mut stream: TcpStream, bytes: Vec<u8>) {
stream.write_all(&bytes).await.unwrap()
}
async fn execute_query(mut stream: TcpStream, mf: QueryMetaframe, df: Dataframe) {
async fn execute_query(
mut stream: TcpStream,
handle: Arc<coredb::Coretable>,
mf: QueryMetaframe,
df: Dataframe,
) {
let vars = df.deflatten();
use QueryMethod::*;
match mf.get_method() {
GET => {
if vars.len() == 1 {
println!("GET {}", vars[0]);
} else if vars.len() > 1 {
eprintln!("ERROR: Cannot do multiple GETs just yet");
} else {
stream
.write(&ResponseCodes::CorruptPacket.response_bytes(&SELF_VERSION))
.await
.unwrap();
}
let result = match vars.len().cmp(&1) {
Ordering::Equal => {
if let Ok(v) = handle.get(vars[0]) {
ResponseCodes::Okay(Some(v.to_string()))
} else {
ResponseCodes::NotFound
}
}
_ => ResponseCodes::CorruptDataframe,
};
stream
.write(&result.response_bytes(&SELF_VERSION))
.await
.unwrap();
}
SET => {
if vars.len() == 2 {
println!("SET {} {}", vars[0], vars[1]);
} else if vars.len() < 2 {
stream
.write(&ResponseCodes::CorruptPacket.response_bytes(&SELF_VERSION))
.await
.unwrap();
} else {
eprintln!("ERROR: Cannot do multiple SETs just yet");
}
let result = match vars.len().cmp(&2) {
Ordering::Equal => match handle.set(vars[0], vars[1]) {
Ok(_) => ResponseCodes::Okay(None),
Err(e) => e,
},
_ => ResponseCodes::CorruptDataframe,
};
handle.print_debug_table();
stream
.write(&result.response_bytes(&SELF_VERSION))
.await
.unwrap();
}
UPDATE => {
if vars.len() == 2 {
println!("UPDATE {} {}", vars[0], vars[1]);
} else if vars.len() < 2 {
stream
.write(&ResponseCodes::CorruptPacket.response_bytes(&SELF_VERSION))
.await
.unwrap();
} else {
eprintln!("ERROR: Cannot do multiple UPDATEs just yet");
}
let result = match vars.len().cmp(&2) {
Ordering::Equal => match handle.update(vars[0], vars[1]) {
Ok(_) => ResponseCodes::Okay(None),
Err(e) => e,
},
_ => ResponseCodes::CorruptDataframe,
};
handle.print_debug_table();
stream
.write(&result.response_bytes(&SELF_VERSION))
.await
.unwrap();
}
DEL => {
if vars.len() == 1 {
println!("DEL {}", vars[0]);
} else if vars.len() > 1 {
eprintln!("ERROR: Cannot do multiple DELs just yet")
} else {
stream
.write(&ResponseCodes::CorruptPacket.response_bytes(&SELF_VERSION))
.await
.unwrap();
}
let result = match vars.len().cmp(&1) {
Ordering::Equal => match handle.del(vars[0]) {
Ok(_) => ResponseCodes::Okay(None),
Err(e) => e,
},
_ => ResponseCodes::CorruptDataframe,
};
handle.print_debug_table();
stream
.write(&result.response_bytes(&SELF_VERSION))
.await
.unwrap();
}
}
}

Loading…
Cancel
Save