Upgrade `del` to use `ExceptFor`

next
Sayan Nandan 4 years ago
parent fc3e760c9a
commit fb3fc5d2f1
No known key found for this signature in database
GPG Key ID: C31EFD7DDA12AEE0

@ -195,12 +195,11 @@ impl IntoRespGroup for RespGroup {
})
.flatten()
.collect();
let metalayout = [
vec![b'#'],
(sizeline.len() - 1).to_string().into_bytes().to_vec(),
sizes,
]
.concat();
let sizeline_bytes = (sizeline.len() - 1).to_string().into_bytes();
let mut metalayout = Vec::with_capacity(1 + sizeline_bytes.len() + sizes.len());
metalayout.push(b'#');
metalayout.extend(sizeline_bytes);
metalayout.extend(sizes);
let dataframe = [sizeline, self.df_bytes].concat();
(metalayout, dataframe)
}

@ -26,7 +26,6 @@
use crate::terrapipe::RespCodes;
use bytes::BytesMut;
use std::fmt;
use std::iter::Skip;
use std::ops::Deref;
use std::vec::IntoIter;

@ -103,14 +103,6 @@ impl CoreDB {
Entry::Vacant(_) => Err(RespCodes::NotFound),
}
}
/// DEL a `key`
pub fn del(&self, key: &str) -> ActionResult<()> {
if let Some(_) = self.acquire_write().remove(&key.to_owned()) {
Ok(())
} else {
Err(RespCodes::NotFound)
}
}
/// Check if a `key` exists
pub fn exists(&self, key: &str) -> bool {
@ -155,11 +147,11 @@ impl CoreDB {
}
}
/// Acquire a write lock
fn acquire_write(&self) -> RwLockWriteGuard<'_, HashMap<String, Data>> {
pub fn acquire_write(&self) -> RwLockWriteGuard<'_, HashMap<String, Data>> {
self.shared.coremap.write()
}
/// Acquire a read lock
fn acquire_read(&self) -> RwLockReadGuard<'_, HashMap<String, Data>> {
pub fn acquire_read(&self) -> RwLockReadGuard<'_, HashMap<String, Data>> {
self.shared.coremap.read()
}
/// Flush the contents of the in-memory table onto disk

@ -22,24 +22,103 @@
//! # `DEL` queries
//! This module provides functions to work with `DEL` queries
use crate::coredb::CoreDB;
use crate::coredb::{self, CoreDB};
use crate::resputil::*;
use corelib::builders::response::*;
use corelib::de::DataGroup;
use corelib::terrapipe::RespCodes;
use corelib::terrapipe::responses;
/// Run a `DEL` query
pub fn del(handle: &CoreDB, act: DataGroup) -> Response {
if act.len() < 2 {
return RespCodes::ActionError.into_response();
let howmany = act.len() - 1;
if howmany == 0 {
// What's the use of just a `del`? Tell us more!
return responses::ARG_ERR.to_owned();
}
// Get a write lock
let mut db_handle = handle.acquire_write();
// Assume that half of the actions will fail
let mut except_for = ExceptFor::with_space_for(howmany / 2);
act.into_iter().skip(1).enumerate().for_each(|(idx, key)| {
if db_handle.remove(&key).is_none() {
// In the event this is none -> the key didn't exist
// so we add this to `except_for`
except_for.add(idx);
}
let mut resp = SResp::new();
let mut respgroup = RespGroup::new();
act.into_iter()
.skip(1)
.for_each(|key| match handle.del(&key) {
Ok(_) => respgroup.add_item(RespCodes::Okay),
Err(e) => respgroup.add_item(e),
});
resp.add_group(respgroup);
resp.into_response()
if except_for.no_failures() {
return responses::OKAY.to_owned();
} else if except_for.did_all_fail(howmany) {
return responses::NOT_FOUND.to_owned();
} else {
return except_for.into_response();
}
}
#[cfg(test)]
#[test]
fn test_kvengine_del_allfailed() {
let db = CoreDB::new().unwrap();
let action = DataGroup::new(vec!["DEL".to_owned(), "x".to_owned(), "y".to_owned()]);
let r = del(&db, action);
db.finish_db(true, true, true);
let resp_should_be = responses::NOT_FOUND.to_owned();
assert_eq!(resp_should_be, r);
}
#[cfg(test)]
#[test]
fn test_kvenegine_del_allokay() {
let db = CoreDB::new().unwrap();
let mut write_handle = db.acquire_write();
assert!(write_handle
.insert(
"foo".to_owned(),
coredb::Data::from_string(&"bar".to_owned()),
)
.is_none());
assert!(write_handle
.insert(
"foo2".to_owned(),
coredb::Data::from_string(&"bar2".to_owned()),
)
.is_none());
drop(write_handle); // Drop the write lock
let action = DataGroup::new(vec!["DEL".to_owned(), "foo".to_owned(), "foo2".to_owned()]);
let r = del(&db, action);
db.finish_db(true, true, true);
assert_eq!(r, responses::OKAY.to_owned());
}
#[cfg(test)]
#[test]
fn test_kvenegine_del_exceptfor() {
let db = CoreDB::new().unwrap();
let mut write_handle = db.acquire_write();
assert!(write_handle
.insert(
"foo".to_owned(),
coredb::Data::from_string(&"bar2".to_owned())
)
.is_none());
assert!(write_handle
.insert(
"foo3".to_owned(),
coredb::Data::from_string(&"bar3".to_owned())
)
.is_none());
// For us `foo2` is the missing key, which should fail to delete
drop(write_handle); // Drop the write lock
let action = DataGroup::new(vec![
"DEL".to_owned(),
"foo".to_owned(),
"foo2".to_owned(),
"foo3".to_owned(),
]);
let r = del(&db, action);
db.finish_db(true, true, true);
let mut except_for = ExceptFor::new();
except_for.add(1);
let resp_should_be = except_for.into_response();
assert_eq!(resp_should_be, r);
}

@ -55,6 +55,7 @@ use tokio::net::TcpStream;
/// needed at times.
pub struct ExceptFor {
df_ext: Vec<u8>,
howmany: usize,
}
const EXFOR_CAP: usize = 2 * 10;
@ -63,17 +64,28 @@ impl ExceptFor {
pub fn new() -> Self {
let mut df_ext = Vec::with_capacity(EXFOR_CAP + 2);
df_ext.push(b'^');
ExceptFor { df_ext }
ExceptFor { df_ext, howmany: 0 }
}
pub fn with_space_for(howmany: usize) -> Self {
let mut df_ext = vec![b'^'];
df_ext.reserve(howmany);
ExceptFor { df_ext }
ExceptFor { df_ext, howmany: 0 }
}
/// This will essentially add 'idx,' to the `df_ext` field as bytes
fn bump_up_count(&mut self) {
self.howmany += 1;
}
pub fn did_all_fail(&self, howmany: usize) -> bool {
howmany - self.howmany == 0
}
/// This will essentially add `idx`, to the `df_ext` field as bytes
pub fn add(&mut self, idx: usize) {
self.df_ext.extend(idx.to_string().into_bytes());
self.df_ext.push(b',');
self.bump_up_count();
}
/// Check if no failures have occurred
pub fn no_failures(&self) -> bool {
self.df_ext.len() - 1 == 0
}
}

Loading…
Cancel
Save