Implement BGSAVE

next
Sayan Nandan 4 years ago
parent 09d7e33f9d
commit fb38acd392
No known key found for this signature in database
GPG Key ID: C31EFD7DDA12AEE0

@ -26,24 +26,65 @@ use crate::protocol::Connection;
use crate::protocol::Query;
use crate::queryengine;
use bytes::Bytes;
use libtdb::util::terminal;
use libtdb::TResult;
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use parking_lot::RwLock;
use parking_lot::RwLockReadGuard;
use parking_lot::RwLockWriteGuard;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio;
use tokio::sync::Notify;
use tokio::time::Instant;
/// This is a thread-safe database handle, which on cloning simply
/// gives another atomic reference to the `Coretable`
#[derive(Debug, Clone)]
pub struct CoreDB {
shared: Arc<Coretable>,
terminate: bool,
pub shared: Arc<Shared>,
}
#[derive(Debug)]
pub struct Shared {
pub bgsave_task: Notify,
pub table: RwLock<Coretable>,
}
impl Shared {
pub fn get_next_bgsave_point(&self) -> Option<Instant> {
let state = self.table.read();
if state.terminate {
return None;
}
// Kick in BGSAVE
match diskstore::flush_data(&self.table.read().get_ref()) {
Ok(_) => terminal::write_info("info: BGSAVE completed successfully\n").unwrap(),
Err(e) => terminal::write_error(format!("error: BGSAVE failed with error: '{}'\n", e))
.unwrap(),
}
Some(Instant::now() + Duration::from_secs(120))
}
pub fn is_termsig(&self) -> bool {
self.table.read().terminate
}
}
/// The `Coretable` holds all the key-value pairs in a `HashMap`
/// wrapped in a Read/Write lock
#[derive(Debug)]
pub struct Coretable {
coremap: RwLock<HashMap<String, Data>>,
coremap: HashMap<String, Data>,
pub terminate: bool,
}
impl Coretable {
pub fn get_ref<'a>(&'a self) -> &'a HashMap<String, Data> {
&self.coremap
}
pub fn get_mut_ref<'a>(&'a mut self) -> &'a mut HashMap<String, Data> {
&mut self.coremap
}
}
/// A wrapper for `Bytes`
@ -74,10 +115,10 @@ impl CoreDB {
#[cfg(debug_assertions)]
/// Flush the coretable entries when in debug mode
pub fn print_debug_table(&self) {
if self.acquire_read().len() == 0 {
if self.acquire_read().coremap.len() == 0 {
println!("In-memory table is empty");
} else {
println!("{:#?}", *self.acquire_read());
println!("{:#?}", self.acquire_read());
}
}
@ -98,41 +139,49 @@ impl CoreDB {
/// If it is - it restores the data. Otherwise it creates a new in-memory table
pub fn new() -> TResult<Self> {
let coretable = diskstore::get_saved()?;
if let Some(coretable) = coretable {
Ok(CoreDB {
shared: Arc::new(Coretable {
coremap: RwLock::new(coretable),
let db = if let Some(coretable) = coretable {
CoreDB {
shared: Arc::new(Shared {
bgsave_task: Notify::new(),
table: RwLock::new(Coretable {
coremap: coretable,
terminate: false,
}),
}),
terminate: false,
})
}
} else {
Ok(CoreDB {
shared: Arc::new(Coretable {
coremap: RwLock::new(HashMap::new()),
CoreDB {
shared: Arc::new(Shared {
bgsave_task: Notify::new(),
table: RwLock::new(Coretable {
coremap: HashMap::new(),
terminate: false,
}),
}),
terminate: false,
})
}
}
};
tokio::spawn(diskstore::bgsave(db.clone()));
Ok(db)
}
/// Acquire a write lock
pub fn acquire_write(&self) -> RwLockWriteGuard<'_, HashMap<String, Data>> {
self.shared.coremap.write()
pub fn acquire_write(&self) -> RwLockWriteGuard<'_, Coretable> {
self.shared.table.write()
}
/// Acquire a read lock
pub fn acquire_read(&self) -> RwLockReadGuard<'_, HashMap<String, Data>> {
self.shared.coremap.read()
pub fn acquire_read(&self) -> RwLockReadGuard<'_, Coretable> {
self.shared.table.read()
}
/// Flush the contents of the in-memory table onto disk
pub fn flush_db(&self) -> TResult<()> {
let data = &*self.acquire_write();
diskstore::flush_data(data)?;
let data = &self.acquire_write();
diskstore::flush_data(&data.coremap)?;
Ok(())
}
/// **⚠⚠⚠ This deletes everything stored in the in-memory table**
pub fn finish_db(self, areyousure: bool, areyouverysure: bool, areyousupersure: bool) {
if areyousure && areyouverysure && areyousupersure {
self.acquire_write().clear()
self.acquire_write().coremap.clear()
}
}
}
@ -140,12 +189,17 @@ impl CoreDB {
impl Drop for CoreDB {
// This prevents us from killing the database, in the event someone tries
// to access it
// If this is indeed the last DB instance, we should tell BGSAVE to terminate
fn drop(&mut self) {
if Arc::strong_count(&self.shared) == 1 {
// The strong count should be
if Arc::strong_count(&self.shared) == 2 {
// Acquire a lock to prevent anyone from writing something
let coremap = self.shared.coremap.write();
self.terminate = true;
drop(coremap);
let mut coretable = self.shared.table.write();
coretable.terminate = true;
drop(coretable);
// Drop the write lock first to avoid BGSAVE ending up in failing
// to get a read lock
self.shared.bgsave_task.notify();
}
}
}

@ -21,7 +21,7 @@
//! This module provides tools for handling persistently stored data
use crate::coredb::Data;
use crate::coredb::{self, Data};
use bincode;
use bytes::Bytes;
use libtdb::TResult;
@ -29,6 +29,7 @@ use std::collections::HashMap;
use std::fs;
use std::io::{ErrorKind, Write};
use std::iter::FromIterator;
use tokio::time;
type DiskStore = (Vec<String>, Vec<Vec<u8>>);
@ -69,3 +70,16 @@ pub fn flush_data(data: &HashMap<String, Data>) -> TResult<()> {
file.write_all(&encoded)?;
Ok(())
}
pub async fn bgsave(handle: coredb::CoreDB) {
while !handle.shared.is_termsig() {
if let Some(dur) = handle.shared.get_next_bgsave_point() {
tokio::select! {
_ = time::delay_until(dur) => {}
_ = handle.shared.bgsave_task.notified() => {}
}
} else {
handle.shared.bgsave_task.notified().await
}
}
}

@ -43,11 +43,13 @@ pub async fn del(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TRe
let mut done_howmany = 0usize;
{
let mut whandle = handle.acquire_write();
let cmap = (*whandle).get_mut_ref();
act.into_iter().for_each(|key| {
if whandle.remove(&key).is_some() {
if cmap.remove(&key).is_some() {
done_howmany += 1
}
});
drop(cmap);
drop(whandle);
}
con.write_response(done_howmany).await?;

@ -40,11 +40,13 @@ pub async fn exists(handle: &CoreDB, con: &mut Connection, act: ActionGroup) ->
let mut how_many_of_them_exist = 0usize;
{
let rhandle = handle.acquire_read();
let cmap = rhandle.get_ref();
act.into_iter().for_each(|key| {
if rhandle.contains_key(&key) {
if cmap.contains_key(&key) {
how_many_of_them_exist += 1;
}
});
drop(cmap);
drop(rhandle);
}
con.write_response(how_many_of_them_exist).await?;

@ -39,7 +39,8 @@ pub async fn get(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TRe
// Write #<m>\n#<n>\n&1\n to the stream
con.write_response(GroupBegin(1)).await?;
let res: Option<Bytes> = {
let reader = handle.acquire_read();
let rhandle = handle.acquire_read();
let reader = rhandle.get_ref();
unsafe {
reader
.get(act.get_ref().get_unchecked(1))

@ -40,7 +40,8 @@ pub async fn mget(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TR
let mut keys = act.into_iter();
while let Some(key) = keys.next() {
let res: Option<Bytes> = {
let reader = handle.acquire_read();
let rhandle = handle.acquire_read();
let reader = rhandle.get_ref();
reader.get(&key).map(|b| b.get_blob().clone())
};
if let Some(value) = res {

@ -43,12 +43,14 @@ pub async fn mset(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TR
let mut done_howmany = 0usize;
{
let mut whandle = handle.acquire_write();
let writer = whandle.get_mut_ref();
while let (Some(key), Some(val)) = (kviter.next(), kviter.next()) {
if let Entry::Vacant(v) = whandle.entry(key) {
if let Entry::Vacant(v) = writer.entry(key) {
let _ = v.insert(coredb::Data::from_string(val));
done_howmany += 1;
}
}
drop(writer);
drop(whandle);
}
con.write_response(done_howmany).await?;

@ -43,12 +43,14 @@ pub async fn mupdate(handle: &CoreDB, con: &mut Connection, act: ActionGroup) ->
let mut done_howmany = 0usize;
{
let mut whandle = handle.acquire_write();
let writer = whandle.get_mut_ref();
while let (Some(key), Some(val)) = (kviter.next(), kviter.next()) {
if let Entry::Occupied(mut v) = whandle.entry(key) {
if let Entry::Occupied(mut v) = writer.entry(key) {
let _ = v.insert(coredb::Data::from_string(val));
done_howmany += 1;
}
}
drop(writer);
drop(whandle);
}
con.write_response(done_howmany).await?;

@ -41,7 +41,8 @@ pub async fn set(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TRe
let mut it = act.into_iter();
let did_we = {
let mut whandle = handle.acquire_write();
if let Entry::Vacant(e) = whandle.entry(
let writer = whandle.get_mut_ref();
if let Entry::Vacant(e) = writer.entry(
it.next()
.unwrap_or_else(|| unsafe { unreachable_unchecked() }),
) {

@ -41,7 +41,8 @@ pub async fn update(handle: &CoreDB, con: &mut Connection, act: ActionGroup) ->
let mut it = act.into_iter();
let did_we = {
let mut whandle = handle.acquire_write();
if let Entry::Occupied(mut e) = whandle.entry(
let writer = whandle.get_mut_ref();
if let Entry::Occupied(mut e) = writer.entry(
it.next()
.unwrap_or_else(|| unsafe { unreachable_unchecked() }),
) {

Loading…
Cancel
Save