diff --git a/server/src/coredb/mod.rs b/server/src/coredb/mod.rs index b8d5572b..947d3957 100644 --- a/server/src/coredb/mod.rs +++ b/server/src/coredb/mod.rs @@ -38,12 +38,12 @@ use core::sync::atomic::Ordering; pub use htable::Data; use libsky::TResult; use std::sync::Arc; +pub mod array; pub mod htable; pub mod iarray; pub mod lazy; pub mod lock; -pub mod array; -mod memstore; +pub mod memstore; /// This is a thread-safe database handle, which on cloning simply /// gives another atomic reference to the `shared` which is a `Shared` object diff --git a/server/src/storage/interface.rs b/server/src/storage/interface.rs new file mode 100644 index 00000000..57289072 --- /dev/null +++ b/server/src/storage/interface.rs @@ -0,0 +1,68 @@ +/* + * Created on Sat Jul 10 2021 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2021, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + * +*/ + +//! 
Interfaces with the file system + +use super::PartitionID; +use crate::coredb::htable::Coremap; +use crate::coredb::htable::Data; +use std::fs; +use std::io::Result as IoResult; +use std::io::{BufWriter, Write}; +use std::thread::{self, JoinHandle}; + +/// Uses a buffered writer under the hood to improve write performance as the provided +/// writable interface might be very slow. The buffer does flush once done, however, it +/// is important that you fsync yourself! +pub fn serialize_map_into_slow_buffer( + buffer: &mut T, + map: &Coremap, +) -> std::io::Result<()> { + let mut buffer = BufWriter::new(buffer); + super::raw_serialize_map(map, &mut buffer)?; + buffer.flush()?; + Ok(()) +} + +/// Get the file for COW. If the partition ID is 0000, the file name is the raw bytes of that ID followed by `_` +fn cow_file(id: PartitionID) -> String { + let mut id = unsafe { super::raw_byte_repr(&id) }.to_owned(); + id.push(b'_'); + unsafe { String::from_utf8_unchecked(id) } +} + +/// Returns a handle to a thread that was spawned to handle this specific flush routine +pub fn threaded_se( + tblref: Coremap, + partition_id: PartitionID, +) -> JoinHandle> { + thread::spawn(move || { + let mut f = fs::File::create(cow_file(partition_id))?; + self::serialize_map_into_slow_buffer(&mut f, &tblref)?; + f.sync_all()?; + Ok(()) + }) +} diff --git a/server/src/storage/mod.rs b/server/src/storage/mod.rs index 15c435da..184cd924 100644 --- a/server/src/storage/mod.rs +++ b/server/src/storage/mod.rs @@ -56,6 +56,13 @@ use core::mem; use core::ptr; use core::slice; use std::io::Write; +pub mod interface; + +/// The ID of the partition in a keyspace. Using too many keyspaces is an absolute anti-pattern +/// on Skytable, something that it has inherited from prior experience in large scale systems. As +/// such, the maximum number of tables in a keyspace is limited to roughly 4.29 billion tables (the range of a `u32`) and ideally, +/// you should never hit that limit. +pub type PartitionID = u32; macro_rules! little_endian { ($block:block) => { @@ -206,11 +213,13 @@ macro_rules! 
to_64bit_little_endian { Do not down cast before swapping bytes */ -/// Get the raw bytes of an unsigned 64-bit integer -unsafe fn raw_len<'a>(len: &'a u64) -> &'a [u8] { +/// Get the raw bytes of anything. +/// +/// DISCLAIMER: THIS FUNCTION CAN DO TERRIBLE THINGS +unsafe fn raw_byte_repr<'a, T: 'a>(len: &'a T) -> &'a [u8] { { let ptr: *const u8 = mem::transmute(len); - slice::from_raw_parts::<'a>(ptr, mem::size_of::()) + slice::from_raw_parts::<'a>(ptr, mem::size_of::()) } } @@ -221,18 +230,24 @@ pub fn serialize_map(map: &Coremap) -> Result, std::io::Erro */ // write the len header first let mut w = Vec::with_capacity(128); + self::raw_serialize_map(map, &mut w)?; + Ok(w) +} + +/// Serialize a map and write it to a provided buffer +pub fn raw_serialize_map(map: &Coremap, w: &mut W) -> std::io::Result<()> { unsafe { - w.write_all(raw_len(&to_64bit_little_endian!(map.len())))?; + w.write_all(raw_byte_repr(&to_64bit_little_endian!(map.len())))?; // now the keys and values for kv in map.iter() { let (k, v) = (kv.key(), kv.value()); - w.write_all(raw_len(&to_64bit_little_endian!(k.len())))?; - w.write_all(raw_len(&to_64bit_little_endian!(v.len())))?; + w.write_all(raw_byte_repr(&to_64bit_little_endian!(k.len())))?; + w.write_all(raw_byte_repr(&to_64bit_little_endian!(v.len())))?; w.write_all(k)?; w.write_all(v)?; } } - Ok(w) + Ok(()) } /// Deserialize a file that contains a serialized map @@ -269,7 +284,7 @@ pub fn deserialize(data: Vec) -> Option> { let lenval = transmute_len(ptr); ptr = ptr.add(8); if (ptr.add(lenkey + lenval)) > end_ptr { - // not enough space + // not enough data left return None; } // get the key as a raw slice, we've already checked if end_ptr is less @@ -435,7 +450,7 @@ cfg_test!( #[should_panic] fn test_runtime_panic_32bit_or_lower() { let max = u64::MAX; - let byte_stream = unsafe { raw_len(&max).to_owned() }; + let byte_stream = unsafe { raw_byte_repr(&max).to_owned() }; let ptr = byte_stream.as_ptr(); unsafe { transmute_len(ptr) }; }