Implement new terrapipe spec and implement coredb

next
Sayan Nandan 4 years ago
parent 8613ddb694
commit 6781468519
No known key found for this signature in database
GPG Key ID: C31EFD7DDA12AEE0

@ -1,14 +1,16 @@
# Contribution guidelines
Firstly, thank you for your interest in contributing to this project. This project is powered by the community
and relies on hackers across the globe to contribute code to move this code forward. You can see a list of contributors [here](./CONTRIBUTORS.md)
and relies on hackers across the globe to contribute code to move this code forward.
You can see a list of contributors **[here](./CONTRIBUTORS.md)**
## Coding guidelines
### Conventions
* `FIXME` : Use this when you have made an implementation that can be improved in the future, such as improved efficiency
* `HACK` : Use this when the code you are using a temporary workaround
* `TODO` : Use this when you have kept something incomplete
* `FIXME(@<username>)` : Use this when you have made an implementation that can be improved in the future, such as improved efficiency
* `HACK(@<username>)` : Use this when the code you are using a temporary workaround
* `TODO(@<username>)` : Use this when you have kept something incomplete
### Formatting
@ -21,4 +23,4 @@ and relies on hackers across the globe to contribute code to move this code forw
3. Sign the CLA (if you haven't signed it already)
4. One of the maintainers will review your patch and suggest changes if required
5. Once your patch is approved, it will be merged into the respective branch
6. Done!
6. Done, you're now one of the [contributors](./CONTRIBUTORS.md)!

@ -19,7 +19,9 @@
*
*/
pub const DEF_QMETA_BUFSIZE: usize = 44;
pub const DEF_QMETALINE_BUFSIZE: usize = 44;
pub const DEF_QMETALAYOUT_BUFSIZE: usize = 1024;
pub const DEF_QDATAFRAME_BUSIZE: usize = 4096;
pub mod responses {
use lazy_static::lazy_static;

@ -19,3 +19,66 @@
*
*/
use corelib::responses::*;
use std::collections::{hash_map::Entry, HashMap};
use std::sync::{Arc, RwLock};
pub struct CoreDB {
shared: Arc<Coretable>,
}
pub struct Coretable {
coremap: RwLock<HashMap<String, String>>,
}
impl Coretable {
pub fn get(&self, key: &str) -> Result<String, Vec<u8>> {
if let Some(value) = self.coremap.read().unwrap().get(key) {
Ok(value.to_string())
} else {
Err(RESP_NOT_FOUND.to_owned())
}
}
pub fn set(&self, key: &str, value: &str) -> Result<(), Vec<u8>> {
match self.coremap.write().unwrap().entry(key.to_string()) {
Entry::Occupied(_) => return Err(RESP_OVERWRITE_ERROR.to_owned()),
Entry::Vacant(e) => {
let _ = e.insert(value.to_string());
Ok(())
}
}
}
pub fn update(&self, key: &str, value: &str) -> Result<(), Vec<u8>> {
match self.coremap.write().unwrap().entry(key.to_string()) {
Entry::Occupied(ref mut e) => {
e.insert(value.to_string());
Ok(())
}
Entry::Vacant(_) => Err(RESP_NOT_FOUND.to_owned()),
}
}
pub fn del(&self, key: &str) -> Result<(), Vec<u8>> {
if let Some(_) = self.coremap.write().unwrap().remove(&key.to_owned()) {
Ok(())
} else {
Err(RESP_NOT_FOUND.to_owned())
}
}
#[cfg(Debug)]
pub fn print_debug_table(&self) {
println!("{:#?}", *self.coremap.read().unwrap());
}
}
impl CoreDB {
pub fn new() -> Self {
CoreDB {
shared: Arc::new(Coretable {
coremap: RwLock::new(HashMap::new()),
}),
}
}
pub fn get_handle(&self) -> Arc<Coretable> {
Arc::clone(&self.shared)
}
}

@ -22,11 +22,9 @@
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
mod protocol;
mod coredb;
use std::cmp::Ordering;
use std::sync::Arc;
mod protocol;
use protocol::read_query;
static ADDR: &'static str = "127.0.0.1:2003";
#[tokio::main]
@ -35,7 +33,14 @@ async fn main() {
println!("Server running on terrapipe://127.0.0.1:2003");
loop {
let (mut socket, _) = listener.accept().await.unwrap();
tokio::spawn(async move {});
tokio::spawn(async move {
let q = read_query(&mut socket).await;
let df = match q {
Ok(q) => q,
Err(e) => return close_conn_with_error(socket, e).await,
};
println!("{:#?}", df);
});
}
}

@ -21,7 +21,10 @@
use corelib::responses;
use corelib::ActionType;
use std::panic;
use corelib::{DEF_QDATAFRAME_BUSIZE, DEF_QMETALAYOUT_BUFSIZE, DEF_QMETALINE_BUFSIZE};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
#[derive(Debug, PartialEq)]
pub struct PreQMF {
@ -38,23 +41,29 @@ impl PreQMF {
{
if let Some(atype) = atype.chars().next() {
let atype = match atype {
'+' => ActionType::Simple,
'*' => ActionType::Simple,
'$' => ActionType::Pipeline,
_ => return Err(responses::RESP_INVALID_MF.to_owned()),
};
let (csize, metaline_size) =
match (csize.parse::<usize>(), metaline_size.parse::<usize>()) {
(Ok(x), Ok(y)) => (x, y),
_ => return Err(responses::RESP_INVALID_MF.to_owned()),
};
return Ok(PreQMF {
action_type: atype,
content_size: csize,
metaline_size,
});
let csize = csize.trim().trim_matches(char::from(0));
let metaline_size = metaline_size.trim().trim_matches(char::from(0));
if let (Ok(csize), Ok(metaline_size)) =
(csize.parse::<usize>(), metaline_size.parse::<usize>())
{
return Ok(PreQMF {
action_type: atype,
content_size: csize,
metaline_size,
});
} else {
return Err(responses::RESP_INVALID_MF.to_owned());
}
} else {
Err(responses::RESP_INVALID_MF.to_owned())
}
} else {
Err(responses::RESP_INVALID_MF.to_owned())
}
Err(responses::RESP_INVALID_MF.to_owned())
}
}
@ -106,12 +115,13 @@ fn test_get_sizes() {
assert_eq!(sizes, vec![10usize, 20usize, 30usize]);
}
pub fn extract_idents(buf: Vec<u8>, skip_sequence: Vec<usize>) -> Vec<String> {
fn extract_idents(buf: Vec<u8>, skip_sequence: Vec<usize>) -> Vec<String> {
skip_sequence
.into_iter()
.scan(buf.into_iter(), |databuf, size| {
let tok: Vec<u8> = databuf.take(size).collect();
let _ = databuf.next();
// FIXME(ohsayan): This is quite slow, we'll have to use SIMD in the future
Some(String::from_utf8_lossy(&tok).to_string())
})
.collect()
@ -132,3 +142,34 @@ fn test_extract_idents() {
let res = extract_idents(badbuf, skip_sequence);
assert_eq!(res[1], "<22><>");
}
#[derive(Debug)]
pub struct Dataframe {
data: Vec<String>,
actiontype: ActionType,
}
pub async fn read_query(mut stream: &mut TcpStream) -> Result<Dataframe, Vec<u8>> {
let mut bufreader = BufReader::new(&mut stream);
let mut metaline_buf = String::with_capacity(DEF_QMETALINE_BUFSIZE);
bufreader.read_line(&mut metaline_buf).await.unwrap();
let pqmf = match PreQMF::from_buffer(metaline_buf) {
Ok(pq) => pq,
Err(e) => return Err(e),
};
let (mut metalayout_buf, mut dataframe_buf) = (
String::with_capacity(pqmf.metaline_size),
vec![0; pqmf.content_size],
);
bufreader.read_line(&mut metalayout_buf).await.unwrap();
bufreader.read(&mut dataframe_buf).await.unwrap();
let ss = match get_sizes(metalayout_buf) {
Ok(ss) => ss,
Err(e) => return Err(e),
};
let dataframe = Dataframe {
data: extract_idents(dataframe_buf, ss),
actiontype: pqmf.action_type,
};
Ok(dataframe)
}

Loading…
Cancel
Save