Upgrade `get` to the new protocol

next
Sayan Nandan 4 years ago
parent c6f2504b30
commit e65ee21f82
No known key found for this signature in database
GPG Key ID: C31EFD7DDA12AEE0

@ -24,7 +24,7 @@
use crate::coredb::CoreDB;
use crate::protocol::{responses, ActionGroup, Connection};
use crate::resputil::GroupBegin;
use crate::resp::GroupBegin;
use libtdb::terrapipe::RespCodes;
use libtdb::TResult;
@ -40,7 +40,7 @@ pub async fn del(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TRe
// Write #<m>\n#<n>\n&<howmany>\n to the stream
con.write_response(GroupBegin(howmany)).await?;
let mut keys = act.into_iter();
let mut handle = handle.acquire_write(); // Get a write handle
let mut handle = handle.acquire_write(); // Get a write lock
while let Some(key) = keys.next() {
if handle.remove(&key).is_some() {
con.write_response(RespCodes::Okay).await?;

@ -24,7 +24,7 @@
use crate::coredb::CoreDB;
use crate::protocol::{responses, ActionGroup, Connection};
use crate::resputil::GroupBegin;
use crate::resp::GroupBegin;
use libtdb::terrapipe::RespCodes;
use libtdb::TResult;
@ -37,7 +37,7 @@ pub async fn del(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TRe
// Write #<m>\n#<n>\n&<howmany>\n to the stream
con.write_response(GroupBegin(howmany)).await?;
let mut keys = act.into_iter();
let handle = handle.acquire_read(); // Get a write handle
let handle = handle.acquire_read(); // Get a read lock
while let Some(key) = keys.next() {
if handle.contains_key(&key) {
con.write_response(RespCodes::Okay).await?;

@ -23,10 +23,30 @@
//! This module provides functions to work with `GET` queries
use crate::coredb::CoreDB;
use crate::protocol::{ActionGroup, Connection};
use crate::protocol::{responses, ActionGroup, Connection};
use crate::resp::{BytesWrapper, GroupBegin};
use libtdb::terrapipe::RespCodes;
use libtdb::TResult;
/// Run a `GET` query
pub async fn get(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
todo!()
let howmany = act.howmany();
if howmany == 0 {
return con.write_response(responses::ACTION_ERR.to_owned()).await;
}
// Write #<m>\n#<n>\n&<howmany>\n to the stream
con.write_response(GroupBegin(howmany)).await?;
let mut keys = act.into_iter();
let handle = handle.acquire_read(); // Get a read lock
while let Some(key) = keys.next() {
if let Some(value) = handle.get(&key) {
// Good, we got the value, write it off to the stream
con.write_response(BytesWrapper(value.get_blob().clone()))
.await?;
} else {
// Ah, couldn't find that key
con.write_response(RespCodes::NotFound).await?;
}
}
Ok(())
}

@ -26,7 +26,7 @@ mod diskstore;
mod kvengine;
mod protocol;
mod queryengine;
mod resputil;
mod resp;
use coredb::CoreDB;
use dbnet::run;
use tokio::signal;

@ -21,7 +21,7 @@
mod deserializer;
pub mod responses;
use crate::resputil::Writable;
use crate::resp::Writable;
use bytes::{Buf, BytesMut};
pub use deserializer::ActionGroup;
pub use deserializer::ParseResult;

@ -52,7 +52,7 @@ pub trait Writable {
/// This wrapper exists to prevent trait implementation conflicts when
/// an impl for `fmt::Display` may be implemented upstream
#[derive(Debug, PartialEq)]
pub struct BytesWrapper(Bytes);
pub struct BytesWrapper(pub Bytes);
/// This indicates the beginning of a response group in a response.
///
Loading…
Cancel
Save