Implement basic query parsing

next
Sayan Nandan 4 years ago
parent 2b1e92fb17
commit 40d148624d
No known key found for this signature in database
GPG Key ID: C31EFD7DDA12AEE0

@ -1,6 +1,6 @@
# Contribution guidelines
Firstly, thank you for your interest in contributing to this project. This project is powered by the community
and relies on hackers across the globe to contribute code to move this code forward.
and relies on hackers across the globe to contribute code to move this project forward.
You can see a list of contributors **[here](./CONTRIBUTORS.md)**

@ -7,7 +7,7 @@ As noted earlier, Terrabase is pre-alpha software and the entire API is subject
## Getting started
We have an experimnetal client and server implementations for the database already. You can download a pre-built binary for `x86_64-linux` in the releases section and try it out!
We have an experimental client and server implementation for the database already. You can download a pre-built binary for `x86_64-linux` in the releases section and try it out!
* First unzip the file
* Start the database server by running `./terrabase`

@ -22,16 +22,22 @@
pub const DEF_QMETALINE_BUFSIZE: usize = 44;
pub const DEF_QMETALAYOUT_BUFSIZE: usize = 1024;
pub const DEF_QDATAFRAME_BUSIZE: usize = 4096;
pub mod tags {
pub const TAG_GET: &'static str = "GET";
pub const TAG_SET: &'static str = "SET";
pub const TAG_UPDATE: &'static str = "UPDATE";
pub const TAG_DEL: &'static str = "DEL";
pub const TAG_HEYA: &'static str = "HEYA";
}
pub mod responses {
use lazy_static::lazy_static;
lazy_static! {
pub static ref RESP_OKAY_EMPTY: Vec<u8> = "0!0!0#".as_bytes().to_owned();
pub static ref RESP_NOT_FOUND: Vec<u8> = "1!0!0#".as_bytes().to_owned();
pub static ref RESP_OVERWRITE_ERROR: Vec<u8> = "2!0!0#".as_bytes().to_owned();
pub static ref RESP_INVALID_MF: Vec<u8> = "3!0!0#".as_bytes().to_owned();
pub static ref RESP_INCOMPLETE: Vec<u8> = "4!0!0#".as_bytes().to_owned();
pub static ref RESP_SERVER_ERROR: Vec<u8> = "5!0!0#".as_bytes().to_owned();
pub static ref RESP_OKAY_EMPTY: Vec<u8> = "0!0!0".as_bytes().to_owned();
pub static ref RESP_NOT_FOUND: Vec<u8> = "1!0!0".as_bytes().to_owned();
pub static ref RESP_OVERWRITE_ERROR: Vec<u8> = "2!0!0".as_bytes().to_owned();
pub static ref RESP_INVALID_MF: Vec<u8> = "3!0!0".as_bytes().to_owned();
pub static ref RESP_INCOMPLETE: Vec<u8> = "4!0!0".as_bytes().to_owned();
pub static ref RESP_SERVER_ERROR: Vec<u8> = "5!0!0".as_bytes().to_owned();
}
}
@ -86,7 +92,7 @@ impl RespBytes for RespCodes {
use responses::*;
use RespCodes::*;
match self {
EmptyResponseOkay => RESP_NOT_FOUND.to_owned(),
EmptyResponseOkay => RESP_OKAY_EMPTY.to_owned(),
NotFound => RESP_NOT_FOUND.to_owned(),
OverwriteError => RESP_OVERWRITE_ERROR.to_owned(),
InvalidMetaframe => RESP_INVALID_MF.to_owned(),
@ -129,10 +135,10 @@ impl SimpleResponse {
size_tracker: 0,
}
}
pub fn add_data(&mut self, data: &str) {
pub fn add_data(&mut self, data: String) {
self.metalayout_buf.push_str(&format!("{}#", data.len()));
self.size_tracker += data.len() + 1;
self.dataframe_buf.push_str(data);
self.dataframe_buf.push_str(&data);
self.dataframe_buf.push('\n');
}
pub fn prepare_response(&self) -> Vec<u8> {
@ -159,14 +165,14 @@ impl RespBytes for SimpleResponse {
#[test]
fn test_simple_response() {
let mut s = ResponseBuilder::new_simple(RespCodes::EmptyResponseOkay);
s.add_data("Sayan");
s.add_data("loves");
s.add_data("you");
s.add_data("if");
s.add_data("you");
s.add_data("send");
s.add_data("UTF8");
s.add_data("bytes");
s.add_data("Sayan".to_owned());
s.add_data("loves".to_owned());
s.add_data("you".to_owned());
s.add_data("if".to_owned());
s.add_data("you".to_owned());
s.add_data("send".to_owned());
s.add_data("UTF8".to_owned());
s.add_data("bytes".to_owned());
assert_eq!(
String::from_utf8_lossy(&s.into_response()),
String::from("0!39!16\n5#5#3#2#3#4#4#5#\nSayan\nloves\nyou\nif\nyou\nsend\nUTF8\nbytes\n")

@ -19,10 +19,13 @@
*
*/
use corelib::terrapipe::responses::*;
use corelib::terrapipe::QueryDataframe;
use corelib::terrapipe::{responses, tags, ActionType, RespBytes, RespCodes, ResponseBuilder};
use std::collections::{hash_map::Entry, HashMap};
use std::sync::{Arc, RwLock};
pub type DbResult<T> = Result<T, RespCodes>;
pub struct CoreDB {
shared: Arc<Coretable>,
}
@ -32,42 +35,132 @@ pub struct Coretable {
}
impl Coretable {
pub fn get(&self, key: &str) -> Result<String, Vec<u8>> {
pub fn get(&self, key: &str) -> DbResult<String> {
if let Some(value) = self.coremap.read().unwrap().get(key) {
Ok(value.to_string())
} else {
Err(RESP_NOT_FOUND.to_owned())
Err(RespCodes::NotFound)
}
}
pub fn set(&self, key: &str, value: &str) -> Result<(), Vec<u8>> {
pub fn set(&self, key: &str, value: &str) -> DbResult<()> {
match self.coremap.write().unwrap().entry(key.to_string()) {
Entry::Occupied(_) => return Err(RESP_OVERWRITE_ERROR.to_owned()),
Entry::Occupied(_) => return Err(RespCodes::OverwriteError),
Entry::Vacant(e) => {
let _ = e.insert(value.to_string());
Ok(())
}
}
}
pub fn update(&self, key: &str, value: &str) -> Result<(), Vec<u8>> {
pub fn update(&self, key: &str, value: &str) -> DbResult<()> {
match self.coremap.write().unwrap().entry(key.to_string()) {
Entry::Occupied(ref mut e) => {
e.insert(value.to_string());
Ok(())
}
Entry::Vacant(_) => Err(RESP_NOT_FOUND.to_owned()),
Entry::Vacant(_) => Err(RespCodes::NotFound),
}
}
pub fn del(&self, key: &str) -> Result<(), Vec<u8>> {
pub fn del(&self, key: &str) -> DbResult<()> {
if let Some(_) = self.coremap.write().unwrap().remove(&key.to_owned()) {
Ok(())
} else {
Err(RESP_NOT_FOUND.to_owned())
Err(RespCodes::NotFound)
}
}
#[cfg(Debug)]
pub fn print_debug_table(&self) {
println!("{:#?}", *self.coremap.read().unwrap());
}
pub fn execute_query(&self, df: QueryDataframe) -> Vec<u8> {
match df.actiontype {
ActionType::Simple => self.execute_simple(df.data),
// TODO(@ohsayan): Pipeline commands haven't been implemented yet
ActionType::Pipeline => unimplemented!(),
}
}
pub fn execute_simple(&self, buf: Vec<String>) -> Vec<u8> {
let mut buf = buf.into_iter();
while let Some(token) = buf.next() {
match token.to_uppercase().as_str() {
tags::TAG_GET => {
// This is a GET request
if let Some(key) = buf.next() {
if buf.next().is_none() {
let res = match self.get(&key.to_string()) {
Ok(v) => v,
Err(e) => return e.into_response(),
};
let mut resp =
ResponseBuilder::new_simple(RespCodes::EmptyResponseOkay);
resp.add_data(res.to_owned());
return resp.into_response();
}
}
}
tags::TAG_SET => {
// This is a SET request
if let Some(key) = buf.next() {
if let Some(value) = buf.next() {
if buf.next().is_none() {
match self.set(&key.to_string(), &value.to_string()) {
Ok(_) => {
#[cfg(Debug)]
self.print_debug_table();
return RespCodes::EmptyResponseOkay.into_response();
}
Err(e) => return e.into_response(),
}
}
}
}
}
tags::TAG_UPDATE => {
// This is an UPDATE query
if let Some(key) = buf.next() {
if let Some(value) = buf.next() {
if buf.next().is_none() {
match self.update(&key.to_string(), &value.to_string()) {
Ok(_) => {
return {
#[cfg(Debug)]
self.print_debug_table();
RespCodes::EmptyResponseOkay.into_response()
}
}
Err(e) => return e.into_response(),
}
}
}
}
}
tags::TAG_DEL => {
// This is a GET request
if let Some(key) = buf.next() {
if buf.next().is_none() {
match self.del(&key.to_string()) {
Ok(_) => {
#[cfg(Debug)]
self.print_debug_table();
return RespCodes::EmptyResponseOkay.into_response();
}
Err(e) => return e.into_response(),
}
}
}
}
tags::TAG_HEYA => {
let mut resp = ResponseBuilder::new_simple(RespCodes::EmptyResponseOkay);
resp.add_data("HEY!".to_owned());
return resp.into_response();
}
_ => return RespCodes::OtherError("Unknown command".to_owned()).into_response(),
}
}
RespCodes::InvalidMetaframe.into_response()
}
}
impl CoreDB {

@ -23,6 +23,7 @@ use tokio::io::AsyncWriteExt;
use tokio::net::{TcpListener, TcpStream};
mod coredb;
mod protocol;
use coredb::CoreDB;
use corelib::terrapipe::RespBytes;
use protocol::read_query;
static ADDR: &'static str = "127.0.0.1:2003";
@ -31,7 +32,9 @@ static ADDR: &'static str = "127.0.0.1:2003";
async fn main() {
let mut listener = TcpListener::bind(ADDR).await.unwrap();
println!("Server running on terrapipe://127.0.0.1:2003");
let db = CoreDB::new();
loop {
let handle = db.get_handle();
let (mut socket, _) = listener.accept().await.unwrap();
tokio::spawn(async move {
let q = read_query(&mut socket).await;
@ -39,7 +42,7 @@ async fn main() {
Ok(q) => q,
Err(e) => return close_conn_with_error(socket, e).await,
};
println!("{:#?}", df);
socket.write_all(&handle.execute_query(df)).await.unwrap();
});
}
}

@ -150,11 +150,11 @@ pub async fn read_query(mut stream: &mut TcpStream) -> Result<QueryDataframe, im
vec![0; pqmf.content_size],
);
bufreader.read_line(&mut metalayout_buf).await.unwrap();
bufreader.read(&mut dataframe_buf).await.unwrap();
let ss = match get_sizes(metalayout_buf) {
Ok(ss) => ss,
Err(e) => return Err(e),
};
bufreader.read(&mut dataframe_buf).await.unwrap();
let qdf = QueryDataframe {
data: extract_idents(dataframe_buf, ss),
actiontype: pqmf.action_type,

Loading…
Cancel
Save