Fix model data driver impl and add tests

next
Sayan Nandan 7 months ago
parent b119a7053f
commit ff1067d648
No known key found for this signature in database
GPG Key ID: 0EBD769024B24F0A

@ -38,7 +38,7 @@ use crate::engine::{
},
};
fn exec_create(
pub fn exec_create(
gns: &impl GlobalInstanceLike,
create: &str,
verify: impl Fn(&Space),
@ -54,7 +54,7 @@ fn exec_create(
})
}
fn exec_alter(
pub fn exec_alter(
gns: &impl GlobalInstanceLike,
alter: &str,
verify: impl Fn(&Space),
@ -70,7 +70,7 @@ fn exec_alter(
})
}
fn exec_create_alter(
pub fn exec_create_alter(
gns: &impl GlobalInstanceLike,
crt: &str,
alt: &str,

@ -113,19 +113,28 @@ fn _exec_only_update(global: &impl GlobalInstanceLike, update: &str) -> QueryRes
dml::update(global, update)
}
pub(self) fn exec_insert<T: Default>(
pub fn exec_insert_core<T: Default>(
global: &impl GlobalInstanceLike,
model: &str,
insert: &str,
key_name: &str,
f: impl Fn(Row) -> T,
) -> QueryResult<T> {
_exec_only_create_space_model(global, model)?;
_exec_only_insert(global, insert, |entity| {
_exec_only_read_key_and_then(global, entity, key_name, |row| f(row))
})?
}
pub fn exec_insert<T: Default>(
global: &impl GlobalInstanceLike,
model: &str,
insert: &str,
key_name: &str,
f: impl Fn(Row) -> T,
) -> QueryResult<T> {
_exec_only_create_space_model(global, model)?;
self::exec_insert_core(global, insert, key_name, f)
}
pub(self) fn exec_insert_only(global: &impl GlobalInstanceLike, insert: &str) -> QueryResult<()> {
_exec_only_insert(global, insert, |_| {})
}
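The split above lets a test create the space and model once and then issue further inserts without repeating the DDL. A minimal, self-contained sketch of the same wrapper/core pattern, with stand-in types in place of the real GlobalInstanceLike, Row and QueryResult (all names below are illustrative, not the engine's API):
// stand-ins for the real engine types
type QueryResult<T> = Result<T, &'static str>;
struct Global; // stands in for &impl GlobalInstanceLike
struct Row;    // stands in for the index row type

fn create_space_and_model(_g: &Global, _model_ddl: &str) -> QueryResult<()> {
    Ok(()) // assume the DDL succeeded
}
fn insert_and_read<T>(_g: &Global, _insert: &str, _key: &str, f: impl Fn(Row) -> T) -> QueryResult<T> {
    Ok(f(Row)) // assume the row was written and read back
}

/// core helper: assumes the space and model already exist
fn exec_insert_core<T>(g: &Global, insert: &str, key: &str, f: impl Fn(Row) -> T) -> QueryResult<T> {
    insert_and_read(g, insert, key, f)
}
/// wrapper: runs the DDL once, then delegates to the core helper
fn exec_insert<T>(
    g: &Global,
    model_ddl: &str,
    insert: &str,
    key: &str,
    f: impl Fn(Row) -> T,
) -> QueryResult<T> {
    create_space_and_model(g, model_ddl)?;
    exec_insert_core(g, insert, key, f)
}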

@ -51,6 +51,7 @@ use {
pub const GENERAL_EXECUTOR_WINDOW: u64 = 5 * 60;
/// A task for the [`FractalMgr`] to perform
#[derive(Debug)]
pub struct Task<T> {
threshold: usize,
task: T,
@ -66,6 +67,10 @@ impl<T> Task<T> {
fn with_threshold(task: T, threshold: usize) -> Self {
Self { threshold, task }
}
#[cfg(test)]
pub fn into_task(self) -> T {
self.task
}
}
/// A general task
@ -94,6 +99,7 @@ impl GenericTask {
}
/// A critical task
#[derive(Debug)]
pub enum CriticalTask {
/// Write a new data batch
WriteBatch(ModelUniqueID, usize),

@ -30,13 +30,15 @@ use {
Task,
},
crate::engine::{
core::GlobalNS,
core::{EntityIDRef, GlobalNS},
data::uuid::Uuid,
error::ErrorKind,
fractal::drivers::FractalModelDriver,
storage::{
safe_interfaces::{paths_v1, FSInterface, NullFS, VirtualFS},
safe_interfaces::{paths_v1, FSInterface, NullFS, StdModelBatch, VirtualFS},
GNSDriver, ModelDriver,
},
RuntimeResult,
},
parking_lot::{Mutex, RwLock},
std::collections::HashMap,
@ -45,25 +47,52 @@ use {
/// A `test` mode global implementation
pub struct TestGlobal<Fs: FSInterface = VirtualFS> {
gns: GlobalNS,
hp_queue: RwLock<Vec<Task<CriticalTask>>>,
lp_queue: RwLock<Vec<Task<GenericTask>>>,
#[allow(unused)]
max_delta_size: usize,
txn_driver: Mutex<FractalGNSDriver<Fs>>,
model_drivers: RwLock<HashMap<ModelUniqueID, super::drivers::FractalModelDriver<Fs>>>,
max_data_pressure: usize,
}
impl<Fs: FSInterface> TestGlobal<Fs> {
fn new(gns: GlobalNS, max_delta_size: usize, txn_driver: GNSDriver<Fs>) -> Self {
Self {
gns,
hp_queue: RwLock::default(),
lp_queue: RwLock::default(),
max_delta_size,
txn_driver: Mutex::new(FractalGNSDriver::new(txn_driver)),
model_drivers: RwLock::default(),
max_data_pressure: usize::MAX,
}
}
pub fn set_max_data_pressure(&mut self, max_data_pressure: usize) {
self.max_data_pressure = max_data_pressure;
}
/// Normally, model drivers are not loaded on startup because of shared global state. Calling this will attempt to load
/// all model drivers
pub fn load_model_drivers(&self) -> RuntimeResult<()> {
let mut mdl_drivers = self.model_drivers.write();
let space_idx = self.gns.idx().read();
for (model_name, model) in self.gns.idx_models().read().iter() {
let space_uuid = space_idx.get(model_name.space()).unwrap().get_uuid();
assert!(mdl_drivers
.insert(
ModelUniqueID::new(model_name.space(), model_name.entity(), model.get_uuid()),
FractalModelDriver::init(ModelDriver::open_model_driver(
model,
&paths_v1::model_path(
model_name.space(),
space_uuid,
model_name.entity(),
model.get_uuid(),
),
)?)
)
.is_none());
}
Ok(())
}
}
impl<Fs: FSInterface> TestGlobal<Fs> {
@ -110,13 +139,28 @@ impl<Fs: FSInterface> GlobalInstanceLike for TestGlobal<Fs> {
&self.txn_driver
}
fn taskmgr_post_high_priority(&self, task: Task<CriticalTask>) {
self.hp_queue.write().push(task)
match task.into_task() {
CriticalTask::WriteBatch(mdl_id, count) => {
let models = self.gns.idx_models().read();
let mdl = models
.get(&EntityIDRef::new(mdl_id.space(), mdl_id.model()))
.unwrap();
self.model_drivers
.read()
.get(&mdl_id)
.unwrap()
.batch_driver()
.lock()
.commit_event(StdModelBatch::new(mdl, count))
.unwrap()
}
}
}
fn taskmgr_post_standard_priority(&self, task: Task<GenericTask>) {
self.lp_queue.write().push(task)
}
fn get_max_delta_size(&self) -> usize {
100
self.max_data_pressure
}
fn purge_model_driver(
&self,
@ -162,6 +206,9 @@ impl<Fs: FSInterface> GlobalInstanceLike for TestGlobal<Fs> {
impl<Fs: FSInterface> Drop for TestGlobal<Fs> {
fn drop(&mut self) {
let mut txn_driver = self.txn_driver.lock();
GNSDriver::close_driver(&mut txn_driver.txn_driver).unwrap()
GNSDriver::close_driver(&mut txn_driver.txn_driver).unwrap();
for (_, model_driver) in self.model_drivers.write().drain() {
model_driver.close().unwrap();
}
}
}
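With these changes the test global no longer queues WriteBatch tasks: posting one commits the batch to that model's driver on the spot, and get_max_delta_size is now tunable via max_data_pressure, so a test can pick the flush threshold. A self-contained sketch of that flow, with hypothetical stand-ins for ModelUniqueID, FractalModelDriver and CriticalTask:
use std::collections::HashMap;

// hypothetical stand-ins for the fractal types
type ModelId = &'static str;
enum CriticalTask {
    WriteBatch(ModelId, usize),
}

#[derive(Default)]
struct BatchDriver {
    committed: usize,
}
impl BatchDriver {
    fn commit_batch(&mut self, count: usize) {
        self.committed += count;
    }
}

struct TestGlobal {
    max_data_pressure: usize,
    pending_deltas: HashMap<ModelId, usize>,
    drivers: HashMap<ModelId, BatchDriver>,
}

impl TestGlobal {
    // test mode: apply a critical task immediately instead of queueing it
    fn post_high_priority(&mut self, task: CriticalTask) {
        match task {
            CriticalTask::WriteBatch(model, count) => {
                self.drivers.get_mut(model).unwrap().commit_batch(count)
            }
        }
    }
    // record one data delta; once pressure is reached, flush it as a batch
    fn record_delta(&mut self, model: ModelId) {
        let pending = self.pending_deltas.entry(model).or_insert(0);
        *pending += 1;
        if *pending < self.max_data_pressure {
            return;
        }
        let count = std::mem::take(pending);
        self.post_high_priority(CriticalTask::WriteBatch(model, count));
    }
}

fn main() {
    let mut global = TestGlobal {
        max_data_pressure: 2,
        pending_deltas: HashMap::new(),
        drivers: HashMap::from([("apps.social", BatchDriver::default())]),
    };
    global.record_delta("apps.social");
    global.record_delta("apps.social"); // second delta hits the limit and commits the batch
    assert_eq!(global.drivers["apps.social"].committed, 2);
}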

@ -110,8 +110,11 @@ struct RowWriter<'b, Fs: FSInterface> {
}
impl<'b, Fs: FSInterface> RowWriter<'b, Fs> {
fn write_static_metadata(&mut self, model: &Model) -> RuntimeResult<()> {
// write batch start information: [pk tag:1B][schema version][column count]
/// write global row information:
/// - pk tag
/// - schema version
/// - column count
fn write_row_global_metadata(&mut self, model: &Model) -> RuntimeResult<()> {
self.f
.dtrack_write(&[model.p_tag().tag_unique().value_u8()])?;
self.f.dtrack_write(
@ -119,11 +122,14 @@ impl<'b, Fs: FSInterface> RowWriter<'b, Fs> {
.delta_state()
.schema_current_version()
.value_u64()
.to_le_bytes(),
.u64_bytes_le(),
)?;
self.f
.dtrack_write(&(model.fields().st_len() as u64).to_le_bytes())
.dtrack_write(&(model.fields().st_len() - 1).u64_bytes_le())
}
/// write row metadata:
/// - change type
/// - txn id
fn write_row_metadata(
&mut self,
change: DataDeltaKind,
@ -139,10 +145,9 @@ impl<'b, Fs: FSInterface> RowWriter<'b, Fs> {
_ => panic!(),
}
}
// write [change type][txn id]
let change_type = [change.value_u8()];
self.f.dtrack_write(&change_type)?;
let txn_id = txn_id.value_u64().to_le_bytes();
let txn_id = txn_id.value_u64().u64_bytes_le();
self.f.dtrack_write(&txn_id)?;
Ok(())
}
@ -154,7 +159,7 @@ impl<'b, Fs: FSInterface> RowWriter<'b, Fs> {
// UNSAFE(@ohsayan): +tagck
pk.read_uint()
}
.to_le_bytes();
.u64_bytes_le();
self.f.dtrack_write(&data)?;
}
TagUnique::Str | TagUnique::Bin => {
@ -206,7 +211,7 @@ impl<'a, 'b, Fs: FSInterface> BatchWriter<'a, 'b, Fs> {
fn write_batch(
model: &'a Model,
g: &'a Guard,
count: usize,
expected: usize,
f: &'b mut TrackedWriter<
Fs::File,
<BatchAdapter<ModelDataAdapter> as RawJournalAdapter>::Spec,
@ -224,10 +229,12 @@ impl<'a, 'b, Fs: FSInterface> BatchWriter<'a, 'b, Fs> {
let mut me = Self::new(model, g, f)?;
let mut applied_deltas = vec![];
let mut i = 0;
while i < count {
while i < expected {
let delta = me.model.delta_state().__data_delta_dequeue(me.g).unwrap();
match me.step(&delta) {
Ok(()) => {
// flush buffer after every delta write
me.row_writer.f.flush_buf()?;
applied_deltas.push(delta);
i += 1;
}
@ -254,7 +261,7 @@ impl<'a, 'b, Fs: FSInterface> BatchWriter<'a, 'b, Fs> {
>,
) -> RuntimeResult<Self> {
let mut row_writer = RowWriter { f };
row_writer.write_static_metadata(model)?;
row_writer.write_row_global_metadata(model)?;
Ok(Self {
model,
row_writer,
@ -313,14 +320,14 @@ impl<'a> JournalAdapterEvent<BatchAdapter<ModelDataAdapter>> for StdModelBatch<'
>,
) -> RuntimeResult<()> {
// [expected commit]
writer.dtrack_write(&(self.1 as u64).to_le_bytes())?;
writer.dtrack_write(&self.1.u64_bytes_le())?;
let g = pin();
let actual_commit = BatchWriter::<Fs>::write_batch(self.0, &g, self.1, writer)?;
if actual_commit != self.1 {
// early exit
writer.dtrack_write(&[EventType::EarlyExit.dscr()])?;
}
writer.dtrack_write(&(actual_commit as u64).to_le_bytes())
writer.dtrack_write(&actual_commit.u64_bytes_le())
}
}
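For orientation, the layout these writers produce, as far as this diff shows, is [expected commit: u64 LE], then the batch-global row metadata [pk tag: 1B][schema version: u64 LE][column count: u64 LE], then per row [change type: 1B][txn id: u64 LE][pk ...], an optional early-exit marker, and finally [actual commit: u64 LE]. A std-only sketch of that encoding; the tag and discriminant values are placeholders rather than the real on-disk constants, and column values are omitted:
fn u64_bytes_le(v: u64) -> [u8; 8] {
    v.to_le_bytes()
}

fn encode_batch(
    pk_tag: u8,
    schema_version: u64,
    column_count: u64,
    rows: &[(u8 /* change type */, u64 /* txn id */, u64 /* uint pk */)],
    expected: usize,
) -> Vec<u8> {
    let mut out = Vec::new();
    // [expected commit]
    out.extend_from_slice(&u64_bytes_le(expected as u64));
    // batch-global row metadata: [pk tag][schema version][column count]
    out.push(pk_tag);
    out.extend_from_slice(&u64_bytes_le(schema_version));
    out.extend_from_slice(&u64_bytes_le(column_count));
    // per-row metadata and primary key: [change type][txn id][pk]
    // (column values omitted from this sketch)
    for &(change, txn_id, pk) in rows {
        out.push(change);
        out.extend_from_slice(&u64_bytes_le(txn_id));
        out.extend_from_slice(&u64_bytes_le(pk));
    }
    // if fewer rows were applied than promised, an early-exit marker precedes the count
    const EARLY_EXIT: u8 = 0xFF; // hypothetical discriminant
    if rows.len() != expected {
        out.push(EARLY_EXIT);
    }
    // [actual commit]
    out.extend_from_slice(&u64_bytes_le(rows.len() as u64));
    out
}

fn main() {
    let batch = encode_batch(1, 0, 2, &[(0, 1, 42)], 1);
    assert_eq!(&batch[..8], &1u64.to_le_bytes()); // expected commit header
}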
@ -352,7 +359,7 @@ impl<'a> JournalAdapterEvent<BatchAdapter<ModelDataAdapter>> for FullModel<'a> {
.f
.dtrack_write(&current_row_count.u64_bytes_le())?;
// [pk tag][schema version][column cnt]
row_writer.write_static_metadata(self.0)?;
row_writer.write_row_global_metadata(self.0)?;
for (key, row_data) in index.mt_iter_kv(&g) {
let row_data = row_data.read();
row_writer.write_row_metadata(DataDeltaKind::Insert, row_data.get_txn_revised())?;

@ -26,3 +26,5 @@
pub mod gns_log;
pub mod mdl_journal;
#[cfg(test)]
mod tests;

@ -0,0 +1,27 @@
/*
* Created on Thu Feb 22 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
mod model_driver;

@ -0,0 +1,134 @@
/*
* Created on Thu Feb 22 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
crate::{
engine::{
core::{dml, model::Model, space::Space, EntityIDRef},
data::lit::Lit,
error::QueryResult,
fractal::{test_utils::TestGlobal, GlobalInstanceLike},
ql::{
ast,
ddl::crt::{CreateModel, CreateSpace},
dml::ins::InsertStatement,
tests::lex_insecure,
},
storage::common::interface::fs_test::VirtualFS,
},
util::test_utils,
},
crossbeam_epoch::pin,
};
fn create_model_and_space(global: &TestGlobal<VirtualFS>, create_model: &str) -> QueryResult<()> {
let tokens = lex_insecure(create_model.as_bytes()).unwrap();
let create_model: CreateModel = ast::parse_ast_node_full(&tokens[2..]).unwrap();
// first create space
let create_space_str = format!("create space {}", create_model.model_name.space());
let create_space_tokens = lex_insecure(create_space_str.as_bytes()).unwrap();
let create_space: CreateSpace = ast::parse_ast_node_full(&create_space_tokens[2..]).unwrap();
Space::transactional_exec_create(global, create_space)?;
Model::transactional_exec_create(global, create_model).map(|_| ())
}
fn run_insert(global: &TestGlobal<VirtualFS>, insert: &str) -> QueryResult<()> {
let tokens = lex_insecure(insert.as_bytes()).unwrap();
let insert: InsertStatement = ast::parse_ast_node_full(&tokens[1..]).unwrap();
dml::insert(global, insert)
}
#[test]
fn empty_model_data() {
test_utils::with_variable("empty_model_data", |log_name| {
// create and close
{
let global = TestGlobal::new_with_vfs_driver(log_name);
let _ = create_model_and_space(
&global,
"create model milky_way.solar_system(planet_name: string, population: uint64)",
)
.unwrap();
}
// open
{
let global = TestGlobal::new_with_vfs_driver(log_name);
drop(global);
}
})
}
#[test]
fn model_data_deltas() {
test_utils::with_variable(("model_data_deltas", 1000), |(log_name, change_count)| {
// create, insert and close
{
let mut global = TestGlobal::new_with_vfs_driver(log_name);
global.set_max_data_pressure(change_count);
let _ = create_model_and_space(
&global,
"create model apps.social(user_name: string, password: string)",
)
.unwrap();
for i in 1..=change_count {
run_insert(
&global,
&format!("insert into apps.social('user-{i:0>1000}', 'password-{i:0>1000}')"),
)
.unwrap();
}
}
// reopen and verify 100 times
test_utils::multi_run(100, || {
let global = TestGlobal::new_with_vfs_driver(log_name);
global.load_model_drivers().unwrap();
global
.state()
.with_model(EntityIDRef::new("apps", "social"), |model| {
let g = pin();
for i in 1..=change_count {
assert_eq!(
model
.primary_index()
.select(Lit::new_string(format!("user-{i:0>1000}")), &g)
.unwrap()
.d_data()
.read()
.fields()
.get("password")
.cloned()
.unwrap()
.into_str()
.unwrap(),
format!("password-{i:0>1000}")
)
}
Ok(())
})
.unwrap()
})
})
}
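The {i:0>1000} specifier zero-fills each key and value to a width of 1000 characters, so every insert carries roughly a kilobyte per column; a quick illustration of that format spec:
fn main() {
    // `0>1000` means: fill with '0', align right, minimum width 1000
    let key = format!("user-{:0>1000}", 42);
    assert_eq!(key.len(), "user-".len() + 1000);
    assert!(key.starts_with("user-0"));
    assert!(key.ends_with("42"));
}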

@ -286,8 +286,8 @@ impl<BA: BatchAdapterSpec> RawJournalAdapter for BatchAdapter<BA> {
if real_commit_size == _stored_expected_commit_size {
break;
}
let event_type = f.read_block::<1>().and_then(|b| {
<<BA as BatchAdapterSpec>::EventType as TaggedEnum>::try_from_raw(b[0])
let event_type = f.read_block().and_then(|[b]| {
<<BA as BatchAdapterSpec>::EventType as TaggedEnum>::try_from_raw(b)
.ok_or(StorageError::RawJournalCorrupted.into())
})?;
// is this an early exit marker? if so, exit
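The turbofish can go because the array length is inferred from the closure's |[b]| pattern; a self-contained illustration of the same trick with a hypothetical reader type:
// hypothetical reader: `N` is fixed to 1 by the `|[b]|` pattern at the call site
struct Reader<'a> {
    buf: &'a [u8],
}

impl<'a> Reader<'a> {
    fn read_block<const N: usize>(&mut self) -> Result<[u8; N], ()> {
        if self.buf.len() < N {
            return Err(());
        }
        let (head, tail) = self.buf.split_at(N);
        self.buf = tail;
        Ok(head.try_into().unwrap())
    }
}

fn main() {
    let mut r = Reader { buf: &[0x01u8, 0x02] };
    // N = 1 is inferred from the destructuring pattern, so no turbofish is needed
    let event_type = r.read_block().map(|[b]| b).unwrap();
    assert_eq!(event_type, 0x01);
}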

@ -42,10 +42,20 @@ pub fn rng() -> ThreadRng {
rand::thread_rng()
}
pub fn multi_run(count: usize, f: impl Fn()) {
for _ in 0..count {
f()
}
}
pub fn shuffle_slice<T>(slice: &mut [T], rng: &mut impl Rng) {
slice.shuffle(rng)
}
pub fn with_variable<T, U>(variable: T, f: impl Fn(T) -> U) -> U {
f(variable)
}
pub fn with_files<const N: usize, T>(files: [&str; N], f: impl Fn([&str; N]) -> T) -> T {
use std::fs;
for file in files {
