From ff1067d64822b708bb46c29b77efa2030972b7ab Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Thu, 22 Feb 2024 22:32:33 +0530 Subject: [PATCH] Fix model data driver impl and add tests --- server/src/engine/core/tests/ddl_space/mod.rs | 6 +- server/src/engine/core/tests/dml/mod.rs | 15 +- server/src/engine/fractal/mgr.rs | 6 + server/src/engine/fractal/test_utils.rs | 61 +++++++- .../engine/storage/v2/impls/mdl_journal.rs | 33 +++-- server/src/engine/storage/v2/impls/mod.rs | 2 + .../src/engine/storage/v2/impls/tests/mod.rs | 27 ++++ .../storage/v2/impls/tests/model_driver.rs | 134 ++++++++++++++++++ .../src/engine/storage/v2/raw/journal/mod.rs | 4 +- server/src/util/test_utils.rs | 10 ++ 10 files changed, 270 insertions(+), 28 deletions(-) create mode 100644 server/src/engine/storage/v2/impls/tests/mod.rs create mode 100644 server/src/engine/storage/v2/impls/tests/model_driver.rs diff --git a/server/src/engine/core/tests/ddl_space/mod.rs b/server/src/engine/core/tests/ddl_space/mod.rs index 47eb544e..792caa0a 100644 --- a/server/src/engine/core/tests/ddl_space/mod.rs +++ b/server/src/engine/core/tests/ddl_space/mod.rs @@ -38,7 +38,7 @@ use crate::engine::{ }, }; -fn exec_create( +pub fn exec_create( gns: &impl GlobalInstanceLike, create: &str, verify: impl Fn(&Space), @@ -54,7 +54,7 @@ fn exec_create( }) } -fn exec_alter( +pub fn exec_alter( gns: &impl GlobalInstanceLike, alter: &str, verify: impl Fn(&Space), @@ -70,7 +70,7 @@ fn exec_alter( }) } -fn exec_create_alter( +pub fn exec_create_alter( gns: &impl GlobalInstanceLike, crt: &str, alt: &str, diff --git a/server/src/engine/core/tests/dml/mod.rs b/server/src/engine/core/tests/dml/mod.rs index 4c52d162..23b04586 100644 --- a/server/src/engine/core/tests/dml/mod.rs +++ b/server/src/engine/core/tests/dml/mod.rs @@ -113,19 +113,28 @@ fn _exec_only_update(global: &impl GlobalInstanceLike, update: &str) -> QueryRes dml::update(global, update) } -pub(self) fn exec_insert( +pub fn exec_insert_core( global: &impl GlobalInstanceLike, - model: &str, insert: &str, key_name: &str, f: impl Fn(Row) -> T, ) -> QueryResult { - _exec_only_create_space_model(global, model)?; _exec_only_insert(global, insert, |entity| { _exec_only_read_key_and_then(global, entity, key_name, |row| f(row)) })? } +pub fn exec_insert( + global: &impl GlobalInstanceLike, + model: &str, + insert: &str, + key_name: &str, + f: impl Fn(Row) -> T, +) -> QueryResult { + _exec_only_create_space_model(global, model)?; + self::exec_insert_core(global, insert, key_name, f) +} + pub(self) fn exec_insert_only(global: &impl GlobalInstanceLike, insert: &str) -> QueryResult<()> { _exec_only_insert(global, insert, |_| {}) } diff --git a/server/src/engine/fractal/mgr.rs b/server/src/engine/fractal/mgr.rs index bcf1ce7d..01a16f95 100644 --- a/server/src/engine/fractal/mgr.rs +++ b/server/src/engine/fractal/mgr.rs @@ -51,6 +51,7 @@ use { pub const GENERAL_EXECUTOR_WINDOW: u64 = 5 * 60; /// A task for the [`FractalMgr`] to perform +#[derive(Debug)] pub struct Task { threshold: usize, task: T, @@ -66,6 +67,10 @@ impl Task { fn with_threshold(task: T, threshold: usize) -> Self { Self { threshold, task } } + #[cfg(test)] + pub fn into_task(self) -> T { + self.task + } } /// A general task @@ -94,6 +99,7 @@ impl GenericTask { } /// A critical task +#[derive(Debug)] pub enum CriticalTask { /// Write a new data batch WriteBatch(ModelUniqueID, usize), diff --git a/server/src/engine/fractal/test_utils.rs b/server/src/engine/fractal/test_utils.rs index 41ecccae..5b1bb646 100644 --- a/server/src/engine/fractal/test_utils.rs +++ b/server/src/engine/fractal/test_utils.rs @@ -30,13 +30,15 @@ use { Task, }, crate::engine::{ - core::GlobalNS, + core::{EntityIDRef, GlobalNS}, data::uuid::Uuid, error::ErrorKind, + fractal::drivers::FractalModelDriver, storage::{ - safe_interfaces::{paths_v1, FSInterface, NullFS, VirtualFS}, + safe_interfaces::{paths_v1, FSInterface, NullFS, StdModelBatch, VirtualFS}, GNSDriver, ModelDriver, }, + RuntimeResult, }, parking_lot::{Mutex, RwLock}, std::collections::HashMap, @@ -45,25 +47,52 @@ use { /// A `test` mode global implementation pub struct TestGlobal { gns: GlobalNS, - hp_queue: RwLock>>, lp_queue: RwLock>>, #[allow(unused)] max_delta_size: usize, txn_driver: Mutex>, model_drivers: RwLock>>, + max_data_pressure: usize, } impl TestGlobal { fn new(gns: GlobalNS, max_delta_size: usize, txn_driver: GNSDriver) -> Self { Self { gns, - hp_queue: RwLock::default(), lp_queue: RwLock::default(), max_delta_size, txn_driver: Mutex::new(FractalGNSDriver::new(txn_driver)), model_drivers: RwLock::default(), + max_data_pressure: usize::MAX, } } + pub fn set_max_data_pressure(&mut self, max_data_pressure: usize) { + self.max_data_pressure = max_data_pressure; + } + /// Normally, model drivers are not loaded on startup because of shared global state. Calling this will attempt to load + /// all model drivers + pub fn load_model_drivers(&self) -> RuntimeResult<()> { + let mut mdl_drivers = self.model_drivers.write(); + let space_idx = self.gns.idx().read(); + for (model_name, model) in self.gns.idx_models().read().iter() { + let space_uuid = space_idx.get(model_name.space()).unwrap().get_uuid(); + assert!(mdl_drivers + .insert( + ModelUniqueID::new(model_name.space(), model_name.entity(), model.get_uuid()), + FractalModelDriver::init(ModelDriver::open_model_driver( + model, + &paths_v1::model_path( + model_name.space(), + space_uuid, + model_name.entity(), + model.get_uuid(), + ), + )?) + ) + .is_none()); + } + Ok(()) + } } impl TestGlobal { @@ -110,13 +139,28 @@ impl GlobalInstanceLike for TestGlobal { &self.txn_driver } fn taskmgr_post_high_priority(&self, task: Task) { - self.hp_queue.write().push(task) + match task.into_task() { + CriticalTask::WriteBatch(mdl_id, count) => { + let models = self.gns.idx_models().read(); + let mdl = models + .get(&EntityIDRef::new(mdl_id.space(), mdl_id.model())) + .unwrap(); + self.model_drivers + .read() + .get(&mdl_id) + .unwrap() + .batch_driver() + .lock() + .commit_event(StdModelBatch::new(mdl, count)) + .unwrap() + } + } } fn taskmgr_post_standard_priority(&self, task: Task) { self.lp_queue.write().push(task) } fn get_max_delta_size(&self) -> usize { - 100 + self.max_data_pressure } fn purge_model_driver( &self, @@ -162,6 +206,9 @@ impl GlobalInstanceLike for TestGlobal { impl Drop for TestGlobal { fn drop(&mut self) { let mut txn_driver = self.txn_driver.lock(); - GNSDriver::close_driver(&mut txn_driver.txn_driver).unwrap() + GNSDriver::close_driver(&mut txn_driver.txn_driver).unwrap(); + for (_, model_driver) in self.model_drivers.write().drain() { + model_driver.close().unwrap(); + } } } diff --git a/server/src/engine/storage/v2/impls/mdl_journal.rs b/server/src/engine/storage/v2/impls/mdl_journal.rs index 4419cff9..398fef9c 100644 --- a/server/src/engine/storage/v2/impls/mdl_journal.rs +++ b/server/src/engine/storage/v2/impls/mdl_journal.rs @@ -110,8 +110,11 @@ struct RowWriter<'b, Fs: FSInterface> { } impl<'b, Fs: FSInterface> RowWriter<'b, Fs> { - fn write_static_metadata(&mut self, model: &Model) -> RuntimeResult<()> { - // write batch start information: [pk tag:1B][schema version][column count] + /// write global row information: + /// - pk tag + /// - schema version + /// - column count + fn write_row_global_metadata(&mut self, model: &Model) -> RuntimeResult<()> { self.f .dtrack_write(&[model.p_tag().tag_unique().value_u8()])?; self.f.dtrack_write( @@ -119,11 +122,14 @@ impl<'b, Fs: FSInterface> RowWriter<'b, Fs> { .delta_state() .schema_current_version() .value_u64() - .to_le_bytes(), + .u64_bytes_le(), )?; self.f - .dtrack_write(&(model.fields().st_len() as u64).to_le_bytes()) + .dtrack_write(&(model.fields().st_len() - 1).u64_bytes_le()) } + /// write row metadata: + /// - change type + /// - txn id fn write_row_metadata( &mut self, change: DataDeltaKind, @@ -139,10 +145,9 @@ impl<'b, Fs: FSInterface> RowWriter<'b, Fs> { _ => panic!(), } } - // write [change type][txn id] let change_type = [change.value_u8()]; self.f.dtrack_write(&change_type)?; - let txn_id = txn_id.value_u64().to_le_bytes(); + let txn_id = txn_id.value_u64().u64_bytes_le(); self.f.dtrack_write(&txn_id)?; Ok(()) } @@ -154,7 +159,7 @@ impl<'b, Fs: FSInterface> RowWriter<'b, Fs> { // UNSAFE(@ohsayan): +tagck pk.read_uint() } - .to_le_bytes(); + .u64_bytes_le(); self.f.dtrack_write(&data)?; } TagUnique::Str | TagUnique::Bin => { @@ -206,7 +211,7 @@ impl<'a, 'b, Fs: FSInterface> BatchWriter<'a, 'b, Fs> { fn write_batch( model: &'a Model, g: &'a Guard, - count: usize, + expected: usize, f: &'b mut TrackedWriter< Fs::File, as RawJournalAdapter>::Spec, @@ -224,10 +229,12 @@ impl<'a, 'b, Fs: FSInterface> BatchWriter<'a, 'b, Fs> { let mut me = Self::new(model, g, f)?; let mut applied_deltas = vec![]; let mut i = 0; - while i < count { + while i < expected { let delta = me.model.delta_state().__data_delta_dequeue(me.g).unwrap(); match me.step(&delta) { Ok(()) => { + // flush buffer after every delta write + me.row_writer.f.flush_buf()?; applied_deltas.push(delta); i += 1; } @@ -254,7 +261,7 @@ impl<'a, 'b, Fs: FSInterface> BatchWriter<'a, 'b, Fs> { >, ) -> RuntimeResult { let mut row_writer = RowWriter { f }; - row_writer.write_static_metadata(model)?; + row_writer.write_row_global_metadata(model)?; Ok(Self { model, row_writer, @@ -313,14 +320,14 @@ impl<'a> JournalAdapterEvent> for StdModelBatch<' >, ) -> RuntimeResult<()> { // [expected commit] - writer.dtrack_write(&(self.1 as u64).to_le_bytes())?; + writer.dtrack_write(&self.1.u64_bytes_le())?; let g = pin(); let actual_commit = BatchWriter::::write_batch(self.0, &g, self.1, writer)?; if actual_commit != self.1 { // early exit writer.dtrack_write(&[EventType::EarlyExit.dscr()])?; } - writer.dtrack_write(&(actual_commit as u64).to_le_bytes()) + writer.dtrack_write(&actual_commit.u64_bytes_le()) } } @@ -352,7 +359,7 @@ impl<'a> JournalAdapterEvent> for FullModel<'a> { .f .dtrack_write(¤t_row_count.u64_bytes_le())?; // [pk tag][schema version][column cnt] - row_writer.write_static_metadata(self.0)?; + row_writer.write_row_global_metadata(self.0)?; for (key, row_data) in index.mt_iter_kv(&g) { let row_data = row_data.read(); row_writer.write_row_metadata(DataDeltaKind::Insert, row_data.get_txn_revised())?; diff --git a/server/src/engine/storage/v2/impls/mod.rs b/server/src/engine/storage/v2/impls/mod.rs index d155c026..fb734ede 100644 --- a/server/src/engine/storage/v2/impls/mod.rs +++ b/server/src/engine/storage/v2/impls/mod.rs @@ -26,3 +26,5 @@ pub mod gns_log; pub mod mdl_journal; +#[cfg(test)] +mod tests; diff --git a/server/src/engine/storage/v2/impls/tests/mod.rs b/server/src/engine/storage/v2/impls/tests/mod.rs new file mode 100644 index 00000000..13ab3812 --- /dev/null +++ b/server/src/engine/storage/v2/impls/tests/mod.rs @@ -0,0 +1,27 @@ +/* + * Created on Thu Feb 22 2024 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2024, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +mod model_driver; diff --git a/server/src/engine/storage/v2/impls/tests/model_driver.rs b/server/src/engine/storage/v2/impls/tests/model_driver.rs new file mode 100644 index 00000000..fcb927b3 --- /dev/null +++ b/server/src/engine/storage/v2/impls/tests/model_driver.rs @@ -0,0 +1,134 @@ +/* + * Created on Thu Feb 22 2024 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2024, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +use { + crate::{ + engine::{ + core::{dml, model::Model, space::Space, EntityIDRef}, + data::lit::Lit, + error::QueryResult, + fractal::{test_utils::TestGlobal, GlobalInstanceLike}, + ql::{ + ast, + ddl::crt::{CreateModel, CreateSpace}, + dml::ins::InsertStatement, + tests::lex_insecure, + }, + storage::common::interface::fs_test::VirtualFS, + }, + util::test_utils, + }, + crossbeam_epoch::pin, +}; + +fn create_model_and_space(global: &TestGlobal, create_model: &str) -> QueryResult<()> { + let tokens = lex_insecure(create_model.as_bytes()).unwrap(); + let create_model: CreateModel = ast::parse_ast_node_full(&tokens[2..]).unwrap(); + // first create space + let create_space_str = format!("create space {}", create_model.model_name.space()); + let create_space_tokens = lex_insecure(create_space_str.as_bytes()).unwrap(); + let create_space: CreateSpace = ast::parse_ast_node_full(&create_space_tokens[2..]).unwrap(); + Space::transactional_exec_create(global, create_space)?; + Model::transactional_exec_create(global, create_model).map(|_| ()) +} + +fn run_insert(global: &TestGlobal, insert: &str) -> QueryResult<()> { + let tokens = lex_insecure(insert.as_bytes()).unwrap(); + let insert: InsertStatement = ast::parse_ast_node_full(&tokens[1..]).unwrap(); + dml::insert(global, insert) +} + +#[test] +fn empty_model_data() { + test_utils::with_variable("empty_model_data", |log_name| { + // create and close + { + let global = TestGlobal::new_with_vfs_driver(log_name); + let _ = create_model_and_space( + &global, + "create model milky_way.solar_system(planet_name: string, population: uint64)", + ) + .unwrap(); + } + // open + { + let global = TestGlobal::new_with_vfs_driver(log_name); + drop(global); + } + }) +} + +#[test] +fn model_data_deltas() { + test_utils::with_variable(("model_data_deltas", 1000), |(log_name, change_count)| { + // create, insert and close + { + let mut global = TestGlobal::new_with_vfs_driver(log_name); + global.set_max_data_pressure(change_count); + let _ = create_model_and_space( + &global, + "create model apps.social(user_name: string, password: string)", + ) + .unwrap(); + for i in 1..=change_count { + run_insert( + &global, + &format!("insert into apps.social('user-{i:0>1000}', 'password-{i:0>1000}')"), + ) + .unwrap(); + } + } + // reopen and verify a 100 times + test_utils::multi_run(100, || { + let global = TestGlobal::new_with_vfs_driver(log_name); + global.load_model_drivers().unwrap(); + global + .state() + .with_model(EntityIDRef::new("apps", "social"), |model| { + let g = pin(); + for i in 1..=change_count { + assert_eq!( + model + .primary_index() + .select(Lit::new_string(format!("user-{i:0>1000}")), &g) + .unwrap() + .d_data() + .read() + .fields() + .get("password") + .cloned() + .unwrap() + .into_str() + .unwrap(), + format!("password-{i:0>1000}") + ) + } + Ok(()) + }) + .unwrap() + }) + }) +} diff --git a/server/src/engine/storage/v2/raw/journal/mod.rs b/server/src/engine/storage/v2/raw/journal/mod.rs index dceb63a5..8f11a222 100644 --- a/server/src/engine/storage/v2/raw/journal/mod.rs +++ b/server/src/engine/storage/v2/raw/journal/mod.rs @@ -286,8 +286,8 @@ impl RawJournalAdapter for BatchAdapter { if real_commit_size == _stored_expected_commit_size { break; } - let event_type = f.read_block::<1>().and_then(|b| { - <::EventType as TaggedEnum>::try_from_raw(b[0]) + let event_type = f.read_block().and_then(|[b]| { + <::EventType as TaggedEnum>::try_from_raw(b) .ok_or(StorageError::RawJournalCorrupted.into()) })?; // is this an early exit marker? if so, exit diff --git a/server/src/util/test_utils.rs b/server/src/util/test_utils.rs index a12110a1..319de084 100644 --- a/server/src/util/test_utils.rs +++ b/server/src/util/test_utils.rs @@ -42,10 +42,20 @@ pub fn rng() -> ThreadRng { rand::thread_rng() } +pub fn multi_run(count: usize, f: impl Fn()) { + for _ in 0..count { + f() + } +} + pub fn shuffle_slice(slice: &mut [T], rng: &mut impl Rng) { slice.shuffle(rng) } +pub fn with_variable(variable: T, f: impl Fn(T) -> U) -> U { + f(variable) +} + pub fn with_files(files: [&str; N], f: impl Fn([&str; N]) -> T) -> T { use std::fs; for file in files {