Add flush routines

next
Sayan Nandan 3 years ago
parent e89417cbc6
commit 1d403c0d1a

@ -107,6 +107,9 @@ impl KVEngine {
encoded_v: AtomicBool::new(encoded_v),
}
}
pub fn __get_inner_ref(&self) -> &Coremap<Data, Data> {
&self.table
}
/// Alter the table and set the key encoding switch
///
/// Note: this will need an empty table

@ -0,0 +1,113 @@
/*
* Created on Sat Jul 17 2021
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
//! # Flush routines
//!
//! This module contains multiple flush routines: at the memstore level, the keyspace level and
//! the table level
use crate::coredb::memstore::Keyspace;
use crate::coredb::memstore::Memstore;
use crate::coredb::memstore::ObjectID;
use crate::coredb::memstore::Table;
use crate::storage::interface::DIR_KSROOT;
use std::fs::{self, File};
use std::io::Result as IoResult;
const PRELOAD_FILE_PATH_TEMP: &str = "data/ks/PRELOAD_";
const PRELOAD_FILE_PATH_TEMP_LEN_CHOP: usize = PRELOAD_FILE_PATH_TEMP.len() - 1;
macro_rules! tbl_path {
($ksid:expr, $tableid:expr) => {
unsafe { concat_str!(DIR_KSROOT, "/", $ksid.as_str(), "/", $tableid.as_str()) }
};
}
/// No `partmap` handling. Just flushes the table to the expected location
pub fn flush_table(tableid: &ObjectID, ksid: &ObjectID, table: &Table) -> IoResult<()> {
let path = tbl_path!(tableid, ksid);
let mut file = File::create(&path)?;
match table {
Table::KV(kve) => {
super::interface::serialize_map_into_slow_buffer(&mut file, kve.__get_inner_ref())?
}
}
file.sync_all()?;
fs::rename(&path, &path[..path.len() - 1])
}
/// Flushes an entire keyspace to the expected location. No `partmap` or `preload` handling
pub fn flush_keyspace(ksid: &ObjectID, keyspace: &Keyspace) -> IoResult<()> {
for table in keyspace.tables.iter() {
self::flush_table(table.key(), &ksid, table.value())?;
}
Ok(())
}
/// Flushes a single partmap
pub fn flush_partmap(ksid: &ObjectID, keyspace: &Keyspace) -> IoResult<()> {
let path = unsafe { concat_str!(DIR_KSROOT, "/", ksid.as_str(), "/", "PARTMAP_") };
let mut file = File::create(&path)?;
super::interface::serialize_set_into_slow_buffer(&mut file, &keyspace.tables)?;
file.sync_all()?;
fs::rename(&path, &path[..path.len() - 1])?;
Ok(())
}
/// Flushes the entire **keyspace + partmap**
pub fn flush_keyspace_full(ksid: &ObjectID, keyspace: &Keyspace) -> IoResult<()> {
self::flush_partmap(ksid, keyspace)?;
self::flush_keyspace(ksid, keyspace)
}
/// Flushes everything in memory. No `partmap` or `preload` handling
pub fn flush(store: &Memstore) -> IoResult<()> {
for keyspace in store.keyspaces.iter() {
self::flush_keyspace(keyspace.key(), keyspace.value())?;
}
Ok(())
}
// Flush the `PRELOAD`
pub fn flush_preload(store: &Memstore) -> IoResult<()> {
let mut file = File::create(PRELOAD_FILE_PATH_TEMP)?;
super::interface::serialize_preload_into_slow_buffer(&mut file, store)?;
file.sync_all()?;
fs::rename(
&PRELOAD_FILE_PATH_TEMP,
&PRELOAD_FILE_PATH_TEMP[..PRELOAD_FILE_PATH_TEMP_LEN_CHOP],
)?;
Ok(())
}
/// Flush the entire **preload + keyspaces + their partmaps**
pub fn flush_full(store: &Memstore) -> IoResult<()> {
self::flush_preload(store)?;
for keyspace in store.keyspaces.iter() {
self::flush_keyspace_full(keyspace.key(), keyspace.value())?;
}
Ok(())
}

@ -26,11 +26,10 @@
//! Interfaces with the file system
use super::PartitionID;
use crate::coredb::buffers::Integer32Buffer;
use crate::coredb::htable::Coremap;
use crate::coredb::htable::Data;
use crate::coredb::memstore::Memstore;
use core::hash::Hash;
use std::io::Result as IoResult;
use std::io::{BufWriter, Write};
@ -83,19 +82,32 @@ pub fn create_tree(memroot: Memstore) -> IoResult<()> {
pub fn serialize_map_into_slow_buffer<T: Write>(
buffer: &mut T,
map: &Coremap<Data, Data>,
) -> std::io::Result<()> {
) -> IoResult<()> {
let mut buffer = BufWriter::new(buffer);
super::raw_serialize_map(map, &mut buffer)?;
buffer.flush()?;
Ok(())
}
/// Get the file for COW. If the parition ID is 0000
pub(super) fn cow_file(id: PartitionID) -> Integer32Buffer {
let mut buffer = Integer32Buffer::init(id);
unsafe {
// UNSAFE(@ohsayan): We know we're just pushing in one thing
buffer.push(b'_');
}
buffer
pub fn serialize_set_into_slow_buffer<T: Write, K, V>(
buffer: &mut T,
set: &Coremap<K, V>,
) -> IoResult<()>
where
K: Eq + Hash + AsRef<[u8]>,
{
let mut buffer = BufWriter::new(buffer);
super::raw_serialize_set(set, &mut buffer)?;
buffer.flush()?;
Ok(())
}
pub fn serialize_preload_into_slow_buffer<T: Write>(
buffer: &mut T,
store: &Memstore,
) -> IoResult<()> {
let mut buffer = BufWriter::new(buffer);
super::preload::raw_generate_preload(&mut buffer, store)?;
buffer.flush()?;
Ok(())
}

@ -91,9 +91,18 @@ macro_rules! try_dir_ignore_existing {
#[macro_export]
macro_rules! concat_path {
($($s:expr),*) => {{ {
($($s:expr),+) => {{ {
let mut path = std::path::PathBuf::with_capacity($(($s).len()+)*0);
$(path.push($s);)*
path
}}};
}
#[macro_export]
macro_rules! concat_str {
($($s:expr),+) => {{ {
let mut st = std::string::String::with_capacity($(($s).len()+)*0);
$(st.push_str($s);)*
st
}}};
}

@ -63,6 +63,7 @@ use std::io::Write;
#[macro_use]
mod macros;
// endof do not mess
pub mod flush;
pub mod interface;
pub mod preload;
#[cfg(test)]
@ -229,7 +230,7 @@ where
}
pub trait DeserializeFrom {
fn is_expected_len(current_len: usize) -> bool;
fn is_expected_len(clen: usize) -> bool;
fn from_slice(slice: &[u8]) -> Self;
}
@ -280,7 +281,10 @@ where
// move the ptr ahead; done with the key
ptr = ptr.add(lenkey);
// push it in
set.insert(key);
if !set.insert(key) {
// repeat?; that's not what we wanted
return None;
}
}
if ptr == end_ptr {
Some(set)

@ -24,6 +24,15 @@
*
*/
//! # Preload binary files
//!
//! Preloads are very critical binary files which contain metadata for this instance of
//! the database. Preloads are of two kinds:
//! 1. the `PRELOAD` that is placed at the root directory
//! 2. the `PARTMAP` preload that is placed in the ks directory
//!
use crate::coredb::memstore::Keyspace;
use crate::coredb::memstore::Memstore;
use crate::coredb::memstore::ObjectID;
use core::ptr;
@ -50,7 +59,7 @@ const VERSION: u8 = 1;
/// ([8B: Partion ID len][8B: Parition ID (not padded)])* => Data segment
/// ```
///
pub fn raw_generate_preload<W: Write>(w: &mut W, store: &Memstore) -> IoResult<()> {
pub(super) fn raw_generate_preload<W: Write>(w: &mut W, store: &Memstore) -> IoResult<()> {
// generate the meta segment
#[allow(clippy::identity_op)]
w.write_all(&[META_SEGMENT])?;
@ -58,7 +67,16 @@ pub fn raw_generate_preload<W: Write>(w: &mut W, store: &Memstore) -> IoResult<(
Ok(())
}
pub fn read_preload(preload: Vec<u8>) -> IoResult<HashSet<ObjectID>> {
/// Generate the `PART` disk file for this keyspace
/// ```text
/// ([8B: Len][?B: Label])*
/// ```
pub(super) fn raw_generate_partfile<W: Write>(w: &mut W, store: &Keyspace) -> IoResult<()> {
super::raw_serialize_set(&store.tables, w)
}
/// Reads the preload file and returns a set
pub(super) fn read_preload_raw(preload: Vec<u8>) -> IoResult<HashSet<ObjectID>> {
if preload.len() < 16 {
// nah, this is a bad disk file
return Err(IoError::from(ErrorKind::UnexpectedEof));
@ -74,6 +92,14 @@ pub fn read_preload(preload: Vec<u8>) -> IoResult<HashSet<ObjectID>> {
let ret = super::deserialize_set_ctype(&preload[1..]);
match ret {
Some(ret) => Ok(ret),
_ => Err(IoError::from(ErrorKind::UnexpectedEof)),
_ => Err(IoError::from(ErrorKind::InvalidData)),
}
}
/// Reads the partfile and returns a set
pub fn read_partfile_raw(partfile: Vec<u8>) -> IoResult<HashSet<ObjectID>> {
match super::deserialize_set_ctype(&partfile) {
Some(s) => Ok(s),
None => Err(IoError::from(ErrorKind::InvalidData)),
}
}

@ -129,7 +129,7 @@ fn test_runtime_panic_32bit_or_lower() {
}
mod interface_tests {
use super::interface::{cow_file, create_tree, DIR_KSROOT, DIR_ROOT, DIR_SNAPROOT};
use super::interface::{create_tree, DIR_KSROOT, DIR_ROOT, DIR_SNAPROOT};
use crate::concat_path;
use crate::coredb::memstore::Memstore;
use std::fs;
@ -167,13 +167,6 @@ mod interface_tests {
// clean up
fs::remove_dir_all(DIR_ROOT).unwrap();
}
#[test]
fn test_cowfile() {
let cow_file = cow_file(10);
assert_eq!(cow_file, "10_".to_owned());
assert_eq!(&cow_file[..cow_file.len() - 1], "10".to_owned());
}
}
mod preload_tests {
@ -184,7 +177,7 @@ mod preload_tests {
let memstore = Memstore::new_default();
let mut v = Vec::new();
preload::raw_generate_preload(&mut v, &memstore).unwrap();
let de: Vec<String> = preload::read_preload(v)
let de: Vec<String> = preload::read_preload_raw(v)
.unwrap()
.into_iter()
.map(|each| unsafe { each.as_str().to_owned() })

Loading…
Cancel
Save