Pass references for model data

Also fixed a bug where incorrect checksum tracking caused false-positive
reports of data corruption.
next
Sayan Nandan 10 months ago
parent 2b4d9efb1b
commit 23f4296982
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -1202,11 +1202,6 @@ fn check_config_file(
Some(ep) => match ep.secure.as_mut() {
Some(secure_ep) => {
super::fractal::context::set_dmsg("loading TLS configuration from disk");
dbg!(
&secure_ep.cert,
&secure_ep.private_key,
&secure_ep.pkey_passphrase
);
let cert = fs::read_to_string(&secure_ep.cert)?;
let private_key = fs::read_to_string(&secure_ep.private_key)?;
let private_key_passphrase = fs::read_to_string(&secure_ep.pkey_passphrase)?;

@ -87,23 +87,35 @@ fn prepare_insert(
}
let (field_id, field) = field;
okay &= field.vt_data_fpath(&mut data);
okay &= prepared_data.st_insert(field_id.clone(), data);
okay &= prepared_data.st_insert(
unsafe {
// UNSAFE(@ohsayan): the model is right here, so we're good
field_id.clone()
},
data,
);
}
}
InsertData::Map(map) => {
let mut map = map.into_iter();
while (map.len() != 0) & okay {
let (field_id, mut field_data) = unsafe {
let mut inserted = 0;
let mut iter = fields.st_iter_kv().zip(map.into_iter());
while (iter.len() != 0) & (okay) {
let ((model_field_key, model_field_spec), (this_field_key, mut this_field_data)) = unsafe {
// UNSAFE(@ohsayan): safe because of loop invariant
map.next().unwrap_unchecked()
iter.next().unwrap_unchecked()
};
let Some(field) = fields.st_get_cloned(field_id.as_str()) else {
okay = false;
break;
};
okay &= field.vt_data_fpath(&mut field_data);
prepared_data.st_insert(field_id.boxed_str(), field_data);
okay &= model_field_spec.vt_data_fpath(&mut this_field_data);
okay &= model_field_key.as_str() == this_field_key.as_str();
prepared_data.st_insert(
unsafe {
// UNSAFE(@ohsayan): the model is right here. it saves us the work!
model_field_key.clone()
},
this_field_data,
);
inserted += 1;
}
okay &= inserted == fields.len();
}
}
let primary_key = prepared_data.remove(model.p_key());

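The rewritten map branch above drops the per-key st_get_cloned lookup: the model's field iterator is zipped with the incoming entries, keys are compared in lockstep, and insertions are counted so a short or misordered payload fails the okay check. A reduced sketch of the pattern, using plain standard-library types as hypothetical stand-ins for the engine's index types:

use std::collections::HashMap;

/// Validate `input` against an ordered `schema` in one pass: keys must
/// match pairwise and every declared field must be covered.
fn prepare_insert_sketch(
    schema: &[(&str, fn(&str) -> bool)],
    input: Vec<(String, String)>,
) -> Option<HashMap<String, String>> {
    let mut prepared = HashMap::new();
    let mut okay = true;
    let mut inserted = 0usize;
    for ((model_key, validate), (this_key, this_val)) in schema.iter().zip(input) {
        okay &= validate(&this_val); // analogue of vt_data_fpath
        okay &= *model_key == this_key; // field names must line up
        prepared.insert(this_key, this_val);
        inserted += 1;
    }
    okay &= inserted == schema.len(); // reject short payloads
    okay.then_some(prepared)
}
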
@ -24,18 +24,16 @@
*
*/
use {
crate::engine::{
core::{dml, model::Model, space::Space},
error::{QueryError, QueryResult},
fractal::Global,
net::protocol::{ClientLocalState, Response, SQuery},
ql::{
ast::{traits::ASTNode, InplaceData, State},
lex::{Keyword, KeywordStmt, Token},
},
use crate::engine::{
core::{dml, model::Model, space::Space},
error::{QueryError, QueryResult},
fractal::Global,
mem::RawSlice,
net::protocol::{ClientLocalState, Response, SQuery},
ql::{
ast::{traits::ASTNode, InplaceData, State},
lex::{Keyword, KeywordStmt, Token},
},
core::ops::Deref,
};
pub async fn dispatch_to_executor<'a>(
@ -65,32 +63,6 @@ pub async fn dispatch_to_executor<'a>(
trigger warning: disgusting hacks below (why can't async play nice with lifetimes :|)
*/
struct RawSlice<T> {
t: *const T,
l: usize,
}
unsafe impl<T: Send> Send for RawSlice<T> {}
unsafe impl<T: Sync> Sync for RawSlice<T> {}
impl<T> RawSlice<T> {
#[inline(always)]
unsafe fn new(t: *const T, l: usize) -> Self {
Self { t, l }
}
}
impl<T> Deref for RawSlice<T> {
type Target = [T];
#[inline(always)]
fn deref(&self) -> &Self::Target {
unsafe {
// UNSAFE(@ohsayan): the caller MUST guarantee that this remains valid throughout the usage of the slice
core::slice::from_raw_parts(self.t, self.l)
}
}
}
#[inline(always)]
fn call<A: ASTNode<'static> + core::fmt::Debug, T>(
g: Global,

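The module-local RawSlice is deleted here and re-homed as crate::engine::mem::RawSlice (see the new rawslice.rs further down), since model code now needs the same trick. Per the "disgusting hacks" note above, the trick is: a borrowed slice tied to the connection buffer cannot be captured by a future that must be Send + 'static, but a raw (pointer, length) pair can, which shifts the lifetime obligation onto the caller. A toy illustration of that constraint (hypothetical, not the engine's code path):

struct SliceHandle {
    p: *const u8,
    l: usize,
}
// raw pointers are !Send by default; thread-safety is asserted manually
unsafe impl Send for SliceHandle {}

fn make_static_future(buf: &[u8]) -> impl std::future::Future<Output = usize> + Send + 'static {
    // capturing `buf` directly would tie the future to `buf`'s lifetime;
    // the handle erases that, so soundness now rests on the caller
    // keeping the buffer alive until the future completes
    let h = SliceHandle { p: buf.as_ptr(), l: buf.len() };
    async move {
        // UNSAFE contract: the buffer behind `h` is still alive
        unsafe { core::slice::from_raw_parts(h.p, h.l).len() }
    }
}
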
@ -31,6 +31,7 @@ use {
core::model::{DeltaState, DeltaVersion, SchemaDeltaKind},
data::cell::Datacell,
idx::{meta::hash::HasherNativeFx, mtchm::meta::TreeElement, IndexST, STIndex},
mem::RawStr,
sync::smart::RawRC,
},
util::compiler,
@ -39,7 +40,7 @@ use {
std::mem::ManuallyDrop,
};
pub type DcFieldIndex = IndexST<Box<str>, Datacell, HasherNativeFx>;
pub type DcFieldIndex = IndexST<RawStr, Datacell, HasherNativeFx>;
#[derive(Debug)]
pub struct Row {
@ -145,7 +146,7 @@ impl Row {
.read()
.fields()
.st_iter_kv()
.map(|(id, data)| (id.clone(), data.clone()))
.map(|(id, data)| (id.as_str().to_owned().into_boxed_str(), data.clone()))
.collect()
}
}
@ -171,7 +172,15 @@ impl Row {
for (delta_id, delta) in delta_state.resolve_iter_since(wl.txn_revised_schema_version) {
match delta.kind() {
SchemaDeltaKind::FieldAdd(f) => {
wl.fields.st_insert(f.clone(), Datacell::null());
wl.fields.st_insert(
unsafe {
// UNSAFE(@ohsayan): a row is inside a model and is valid as long as it is in there!
// even if the model was chucked and the row was lying around it won't cause any harm because it
// neither frees anything nor allocates
f.clone()
},
Datacell::null(),
);
}
SchemaDeltaKind::FieldRem(f) => {
wl.fields.st_delete(f);

@ -127,7 +127,7 @@ impl<'a> AlterPlan<'a> {
okay &= no_field(mdl, &field_name) & mdl.not_pk(&field_name);
let is_nullable = check_nullable(&mut props)?;
let layers = Field::parse_layers(layers, is_nullable)?;
okay &= add.st_insert(field_name.to_string().into_boxed_str(), layers);
okay &= add.st_insert(field_name.as_str().into(), layers);
}
can_ignore!(AlterAction::Add(add))
}
@ -155,8 +155,7 @@ impl<'a> AlterPlan<'a> {
let (anydelta, new_field) =
Self::ldeltas(current_field, layers, is_nullable, &mut no_lock, &mut okay)?;
any_delta += anydelta as usize;
okay &=
new_fields.st_insert(field_name.to_string().into_boxed_str(), new_field);
okay &= new_fields.st_insert(field_name.as_str().into(), new_field);
}
if any_delta == 0 {
AlterAction::Ignore
@ -281,14 +280,12 @@ impl Model {
// commit txn
global.namespace_txn_driver().lock().try_commit(txn)?;
}
let mut mutator = model.model_mutator();
new_fields
.stseq_ord_kv()
.map(|(x, y)| (x.clone(), y.clone()))
.for_each(|(field_id, field)| {
model
.delta_state_mut()
.schema_append_unresolved_wl_field_add(&field_id);
model.fields_mut().st_insert(field_id, field);
mutator.add_field(field_id, field);
});
}
AlterAction::Remove(removed) => {
@ -301,11 +298,9 @@ impl Model {
// commit txn
global.namespace_txn_driver().lock().try_commit(txn)?;
}
let mut mutator = model.model_mutator();
removed.iter().for_each(|field_id| {
model
.delta_state_mut()
.schema_append_unresolved_wl_field_rem(field_id.as_str());
model.fields_mut().st_delete(field_id.as_str());
mutator.remove_field(field_id.as_str());
});
}
AlterAction::Update(updated) => {
@ -318,8 +313,9 @@ impl Model {
// commit txn
global.namespace_txn_driver().lock().try_commit(txn)?;
}
let mut mutator = model.model_mutator();
updated.into_iter().for_each(|(field_id, field)| {
model.fields_mut().st_update(&field_id, field);
mutator.update_field(field_id.as_ref(), field);
});
}
}

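All three alter paths now funnel through model.model_mutator() rather than calling fields_mut() and delta_state_mut() separately, so a field add or remove can no longer update the index without also publishing the matching schema delta. A minimal sketch of the facade, with hypothetical simplified types:

/// Toy model: a field index plus a delta log that must stay in sync.
struct ToyModel {
    fields: std::collections::HashMap<String, u32>,
    deltas: Vec<String>,
}

impl ToyModel {
    fn mutator(&mut self) -> ToyMutator<'_> {
        ToyMutator { model: self }
    }
}

struct ToyMutator<'a> {
    model: &'a mut ToyModel,
}

impl<'a> ToyMutator<'a> {
    /// One call site performs both halves of the mutation.
    fn add_field(&mut self, name: String, field: u32) -> bool {
        let fresh = self.model.fields.insert(name.clone(), field).is_none();
        self.model.deltas.push(name); // publish the delta in the same call
        fresh
    }
}
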
@ -29,6 +29,7 @@ use {
crate::engine::{
core::{dml::QueryExecMeta, index::Row},
fractal::{FractalToken, GlobalInstanceLike},
mem::RawStr,
sync::atm::Guard,
sync::queue::Queue,
},
@ -116,10 +117,10 @@ impl DeltaState {
pub fn schema_current_version(&self) -> DeltaVersion {
DeltaVersion(self.schema_current_version)
}
pub fn schema_append_unresolved_wl_field_add(&mut self, field_name: &str) {
pub fn unresolved_append_field_add(&mut self, field_name: RawStr) {
self.__schema_append_unresolved_delta(SchemaDeltaPart::field_add(field_name));
}
pub fn schema_append_unresolved_wl_field_rem(&mut self, field_name: &str) {
pub fn unresolved_append_field_rem(&mut self, field_name: RawStr) {
self.__schema_append_unresolved_delta(SchemaDeltaPart::field_rem(field_name));
}
}
@ -178,23 +179,19 @@ impl SchemaDeltaPart {
#[derive(Debug)]
pub enum SchemaDeltaKind {
FieldAdd(Box<str>),
FieldRem(Box<str>),
FieldAdd(RawStr),
FieldRem(RawStr),
}
impl SchemaDeltaPart {
fn new(kind: SchemaDeltaKind) -> Self {
Self { kind }
}
fn field_add(field_name: &str) -> Self {
Self::new(SchemaDeltaKind::FieldAdd(
field_name.to_owned().into_boxed_str(),
))
fn field_add(field_name: RawStr) -> Self {
Self::new(SchemaDeltaKind::FieldAdd(field_name))
}
fn field_rem(field_name: &str) -> Self {
Self::new(SchemaDeltaKind::FieldRem(
field_name.to_owned().into_boxed_str(),
))
fn field_rem(field_name: RawStr) -> Self {
Self::new(SchemaDeltaKind::FieldRem(field_name))
}
}

@ -40,8 +40,8 @@ use {
},
error::{QueryError, QueryResult},
fractal::{GenericTask, GlobalInstanceLike, Task},
idx::{IndexBaseSpec, IndexSTSeqCns, STIndex, STIndexSeq},
mem::VInline,
idx::{self, IndexBaseSpec, IndexSTSeqCns, STIndex, STIndexSeq},
mem::{RawStr, VInline},
ql::ddl::{
crt::CreateModel,
drop::DropModel,
@ -49,21 +49,23 @@ use {
},
txn::gns::{self as gnstxn, SpaceIDRef},
},
std::collections::hash_map::{Entry, HashMap},
};
pub(in crate::engine::core) use self::delta::{DeltaState, DeltaVersion, SchemaDeltaKind};
use super::util::{EntityID, EntityIDRef};
type Fields = IndexSTSeqCns<Box<str>, Field>;
type Fields = IndexSTSeqCns<RawStr, Field>;
#[derive(Debug)]
pub struct Model {
uuid: Uuid,
p_key: Box<str>,
p_key: RawStr,
p_tag: FullTag,
fields: Fields,
data: PrimaryIndex,
delta: DeltaState,
private: ModelPrivate,
}
#[cfg(test)]
@ -77,23 +79,6 @@ impl PartialEq for Model {
}
impl Model {
pub fn new(
uuid: Uuid,
p_key: Box<str>,
p_tag: FullTag,
fields: Fields,
data: PrimaryIndex,
delta: DeltaState,
) -> Self {
Self {
uuid,
p_key,
p_tag,
fields,
data,
delta,
}
}
pub fn get_uuid(&self) -> Uuid {
self.uuid
}
@ -122,27 +107,59 @@ impl Model {
pub fn delta_state(&self) -> &DeltaState {
&self.delta
}
pub fn delta_state_mut(&mut self) -> &mut DeltaState {
&mut self.delta
}
pub fn fields_mut(&mut self) -> &mut Fields {
&mut self.fields
}
pub fn fields(&self) -> &Fields {
&self.fields
}
pub fn model_mutator<'a>(&'a mut self) -> ModelMutator<'a> {
ModelMutator { model: self }
}
}
impl Model {
pub fn new_restore(uuid: Uuid, p_key: Box<str>, p_tag: FullTag, fields: Fields) -> Self {
Self::new(
fn new_with_private(
uuid: Uuid,
p_key: RawStr,
p_tag: FullTag,
fields: Fields,
private: ModelPrivate,
) -> Self {
Self {
uuid,
p_key,
p_tag,
fields,
PrimaryIndex::new_empty(),
DeltaState::new_resolved(),
)
data: PrimaryIndex::new_empty(),
delta: DeltaState::new_resolved(),
private,
}
}
pub fn new_restore(
uuid: Uuid,
p_key: Box<str>,
p_tag: FullTag,
decl_fields: IndexSTSeqCns<Box<str>, Field>,
) -> Self {
let mut private = ModelPrivate::empty();
let p_key = unsafe {
// UNSAFE(@ohsayan): once again, all cool since we maintain the allocation
private.push_allocated(p_key)
};
let mut fields = IndexSTSeqCns::idx_init();
decl_fields
.stseq_owned_kv()
.map(|(field_key, field)| {
(
unsafe {
// UNSAFE(@ohsayan): we ensure that priv is dropped iff model is dropped
private.push_allocated(field_key)
},
field,
)
})
.for_each(|(field_key, field)| {
fields.st_insert(field_key, field);
});
Self::new_with_private(uuid, p_key, p_tag, fields, private)
}
pub fn process_create(
CreateModel {
@ -151,6 +168,7 @@ impl Model {
props,
}: CreateModel,
) -> QueryResult<Self> {
let mut private = ModelPrivate::empty();
let mut okay = props.is_empty() & !fields.is_empty();
// validate fields
let mut field_spec = fields.into_iter();
@ -164,20 +182,36 @@ impl Model {
null,
primary,
} = field_spec.next().unwrap();
let this_field_ptr = unsafe {
// UNSAFE(@ohsayan): this is going to go with our alloc, so we're good! if we fail too, the dtor for private will run
private.allocate_or_recycle(field_name.as_str())
};
if primary {
pk_cnt += 1usize;
last_pk = Some(field_name.as_str());
last_pk = Some(unsafe {
// UNSAFE(@ohsayan): totally cool, it's all allocated
this_field_ptr.clone()
});
okay &= !null;
}
let layer = Field::parse_layers(layers, null)?;
okay &= fields.st_insert(field_name.as_str().to_string().into_boxed_str(), layer);
okay &= fields.st_insert(this_field_ptr, layer);
}
okay &= pk_cnt <= 1;
if okay {
let last_pk = last_pk.unwrap_or(fields.stseq_ord_key().next().unwrap());
let tag = fields.st_get(last_pk).unwrap().layers()[0].tag;
let last_pk = last_pk.unwrap_or(unsafe {
// UNSAFE(@ohsayan): once again, all of this is allocated
fields.stseq_ord_key().next().unwrap().clone()
});
let tag = fields.st_get(&last_pk).unwrap().layers()[0].tag;
if tag.tag_unique().is_unique() {
return Ok(Self::new_restore(Uuid::new(), last_pk.into(), tag, fields));
return Ok(Self::new_with_private(
Uuid::new(),
last_pk,
tag,
fields,
private,
));
}
}
Err(QueryError::QExecDdlModelBadDefinition)
@ -286,6 +320,92 @@ impl Model {
}
}
#[derive(Debug, PartialEq)]
struct ModelPrivate {
alloc: HashMap<Box<str>, bool, idx::meta::hash::HasherNativeFx>,
}
impl ModelPrivate {
fn empty() -> Self {
Self {
alloc: HashMap::with_hasher(Default::default()),
}
}
pub(self) unsafe fn allocate_or_recycle(&mut self, new: &str) -> RawStr {
match self.alloc.get_key_value(new) {
Some((prev_alloc, _)) => {
// already allocated this
let ret = RawStr::new(prev_alloc.as_ptr(), prev_alloc.len());
// set live!
*self.alloc.get_mut(ret.as_str()).unwrap() = false;
return ret;
}
None => {
// need to allocate
let alloc = new.to_owned().into_boxed_str();
let ret = RawStr::new(alloc.as_ptr(), alloc.len());
let _ = self.alloc.insert(alloc, false);
return ret;
}
}
}
pub(self) unsafe fn mark_pending_remove(&mut self, v: &str) -> RawStr {
let ret = self.alloc.get_key_value(v).unwrap().0;
let ret = RawStr::new(ret.as_ptr(), ret.len());
*self.alloc.get_mut(v).unwrap() = true;
ret
}
pub(self) unsafe fn vacuum_marked(&mut self) {
self.alloc.retain(|_, dead| !*dead)
}
pub(self) unsafe fn push_allocated(&mut self, alloc: Box<str>) -> RawStr {
match self.alloc.entry(alloc) {
Entry::Occupied(mut oe) => {
oe.insert(false);
RawStr::new(oe.key().as_ptr(), oe.key().len())
}
Entry::Vacant(ve) => {
let ret = RawStr::new(ve.key().as_ptr(), ve.key().len());
ve.insert(false);
return ret;
}
}
}
}
pub struct ModelMutator<'a> {
model: &'a mut Model,
}
impl<'a> ModelMutator<'a> {
pub unsafe fn vacuum_stashed(&mut self) {
self.model.private.vacuum_marked()
}
pub fn remove_field(&mut self, name: &str) -> bool {
// remove
let r = self.model.fields.st_delete(name);
// recycle
let ptr = unsafe { self.model.private.mark_pending_remove(name) };
// publish delta
self.model.delta.unresolved_append_field_rem(ptr);
r
}
pub fn add_field(&mut self, name: Box<str>, field: Field) -> bool {
unsafe {
// allocate
let fkeyptr = self.model.private.push_allocated(name);
// add
let r = self.model.fields.st_insert(fkeyptr.clone(), field);
// delta
self.model.delta.unresolved_append_field_add(fkeyptr);
r
}
}
pub fn update_field(&mut self, name: &str, field: Field) -> bool {
self.model.fields.st_update(name, field)
}
}
/*
Layer
*/

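ModelPrivate above is the core of the commit: the model becomes the single owner of every field-name allocation, and the field index, rows, and deltas all hold RawStr pointers into that table instead of their own Box<str> copies. The bool per entry marks an allocation as pending removal so it survives until in-flight deltas are resolved; vacuum_marked then drops only the dead entries. A compact sketch of the intern-and-recycle scheme (standalone hypothetical types; the real code hands out RawStr rather than bare pointers):

use std::collections::HashMap;

/// value = "marked dead": false while live, true once pending removal
struct Interner {
    alloc: HashMap<Box<str>, bool>,
}

impl Interner {
    /// Return a stable pointer to an owned copy of `s`, reviving a
    /// previous allocation if one exists.
    fn intern(&mut self, s: &str) -> *const u8 {
        if self.alloc.contains_key(s) {
            *self.alloc.get_mut(s).unwrap() = false; // revive
            return self.alloc.get_key_value(s).unwrap().0.as_ptr();
        }
        let owned: Box<str> = s.into();
        let ptr = owned.as_ptr(); // moving the box does not move the heap data
        self.alloc.insert(owned, false);
        ptr
    }
    fn mark_dead(&mut self, s: &str) {
        *self.alloc.get_mut(s).unwrap() = true;
    }
    /// Drop only the allocations nothing can point at anymore.
    fn vacuum(&mut self) {
        self.alloc.retain(|_, dead| !*dead)
    }
}

This is also why every clone of a RawStr in this diff is an unsafe fn: the clone is a bitwise pointer copy, sound only while the owning model keeps the backing allocation alive.
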
@ -371,7 +371,7 @@ mod exec {
.stseq_ord_kv()
.rev()
.take(2)
.map(|(id, f)| (id.clone(), f.clone()))
.map(|(id, f)| (id.as_str().to_owned(), f.clone()))
.collect::<Vec<_>>(),
[
("col3".into(), Field::new([Layer::uint16()].into(), true)),
@ -400,7 +400,7 @@ mod exec {
.fields()
.stseq_ord_kv()
.rev()
.map(|(a, b)| (a.clone(), b.clone()))
.map(|(a, b)| (a.as_str().to_owned(), b.clone()))
.collect::<Vec<_>>(),
[("username".into(), Field::new([Layer::str()].into(), false))]
);

@ -39,7 +39,9 @@ use {
core::{borrow::Borrow, hash::Hash},
};
pub use stord::iter::IndexSTSeqDllIterOrdKV;
pub mod stdord_iter {
pub use super::stord::iter::IndexSTSeqDllIterOrdKV;
}
// re-exports
pub type IndexSTSeqCns<K, V> = stord::IndexSTSeqDll<K, V, stord::config::ConservativeConfig<K, V>>;
@ -308,7 +310,7 @@ pub trait STIndex<K: ?Sized, V>: IndexBaseSpec {
fn st_iter_value<'a>(&'a self) -> Self::IterValue<'a>;
}
pub trait STIndexSeq<K: ?Sized, V>: STIndex<K, V> {
pub trait STIndexSeq<K, V>: STIndex<K, V> {
/// An ordered iterator over the keys and values
type IterOrdKV<'a>: Iterator<Item = (&'a K, &'a V)> + DoubleEndedIterator<Item = (&'a K, &'a V)>
where
@ -325,10 +327,17 @@ pub trait STIndexSeq<K: ?Sized, V>: STIndex<K, V> {
where
Self: 'a,
V: 'a;
type OwnedIterKV: Iterator<Item = (K, V)> + DoubleEndedIterator<Item = (K, V)>;
type OwnedIterKeys: Iterator<Item = K> + DoubleEndedIterator<Item = K>;
type OwnedIterValues: Iterator<Item = V> + DoubleEndedIterator<Item = V>;
/// Returns an ordered iterator over the KV pairs
fn stseq_ord_kv<'a>(&'a self) -> Self::IterOrdKV<'a>;
/// Returns an ordered iterator over the keys
fn stseq_ord_key<'a>(&'a self) -> Self::IterOrdKey<'a>;
/// Returns an ordered iterator over the values
fn stseq_ord_value<'a>(&'a self) -> Self::IterOrdValue<'a>;
// owned
fn stseq_owned_kv(self) -> Self::OwnedIterKV;
fn stseq_owned_keys(self) -> Self::OwnedIterKeys;
fn stseq_owned_values(self) -> Self::OwnedIterValues;
}

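Two things change in the trait besides the re-export shim: STIndexSeq gains owned-iterator associated types, and its K: ?Sized bound is dropped. The second is forced by the first, since an owned iterator yields keys by value and by-value items must be Sized. Shape only, as a hypothetical reduction:

// K must be Sized here: Item = (K, V) moves keys by value, which is
// impossible for an unsized K such as `str`.
trait OwnedSeq<K, V> {
    type OwnedIterKV: Iterator<Item = (K, V)>;
    fn into_owned_kv(self) -> Self::OwnedIterKV;
}
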
@ -115,7 +115,7 @@ pub trait Config<K, V> {
type AllocStrategy: AllocStrategy<K, V>;
}
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct ConservativeConfig<K, V>(PhantomData<super::IndexSTSeqDll<K, V, Self>>);
impl<K, V> Config<K, V> for ConservativeConfig<K, V> {
@ -123,7 +123,7 @@ impl<K, V> Config<K, V> for ConservativeConfig<K, V> {
type AllocStrategy = ConservativeStrategy<K, V>;
}
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct LiberalConfig<K, V>(PhantomData<super::IndexSTSeqDll<K, V, Self>>);
impl<K, V> Config<K, V> for LiberalConfig<K, V> {

@ -28,14 +28,16 @@ use {
super::{
config::Config, IndexSTSeqDll, IndexSTSeqDllKeyptr, IndexSTSeqDllNode, IndexSTSeqDllNodePtr,
},
crate::engine::idx::{AsKey, AsValue},
std::{
collections::{
hash_map::{Iter, Keys as StdMapIterKey, Values as StdMapIterVal},
hash_map::{Iter as StdMapIter, Keys as StdMapIterKey, Values as StdMapIterVal},
HashMap as StdMap,
},
fmt::{self, Debug},
iter::FusedIterator,
marker::PhantomData,
mem::ManuallyDrop,
ptr::{self, NonNull},
},
};
@ -48,7 +50,7 @@ macro_rules! unsafe_marker_impl {
}
pub struct IndexSTSeqDllIterUnordKV<'a, K: 'a, V: 'a> {
i: Iter<'a, IndexSTSeqDllKeyptr<K>, IndexSTSeqDllNodePtr<K, V>>,
i: StdMapIter<'a, IndexSTSeqDllKeyptr<K>, IndexSTSeqDllNodePtr<K, V>>,
}
// UNSAFE(@ohsayan): aliasing guarantees correctness
@ -263,6 +265,118 @@ impl<K, V> IndexSTSeqDllIterOrdConfig<K, V> for IndexSTSeqDllIterOrdConfigValue
}
}
pub(super) struct OrderedOwnedIteratorRaw<K, V> {
h: *mut IndexSTSeqDllNode<K, V>,
t: *mut IndexSTSeqDllNode<K, V>,
r: usize,
}
impl<K: AsKey, V: AsValue> OrderedOwnedIteratorRaw<K, V> {
pub(super) fn new<Mc: Config<K, V>>(mut idx: IndexSTSeqDll<K, V, Mc>) -> Self {
// clean up if needed
idx.vacuum_full();
let mut idx = ManuallyDrop::new(idx);
// chuck the map
drop(unsafe { ptr::read((&mut idx.m) as *mut _) });
// we own everything now
unsafe {
Self {
h: if idx.h.is_null() {
ptr::null_mut()
} else {
(*idx.h).p
},
t: idx.h,
r: idx.len(),
}
}
}
}
impl<K, V> OrderedOwnedIteratorRaw<K, V> {
#[inline(always)]
fn _next(&mut self) -> Option<(K, V)> {
if self.h == self.t {
None
} else {
self.r -= 1;
unsafe {
// UNSAFE(@ohsayan): +nullck
let this = ptr::read(self.h);
// take the next pointer from the copy *before* freeing the node;
// reading through self.h after deallocation would be a use-after-free
let next = this.p;
// destroy this node
IndexSTSeqDllNode::dealloc_headless(self.h);
self.h = next;
Some((this.k, this.v))
}
}
}
#[inline(always)]
fn _next_back(&mut self) -> Option<(K, V)> {
if self.h == self.t {
None
} else {
self.r -= 1;
unsafe {
// UNSAFE(@ohsayan): +nullck
self.t = (*self.t).n;
let this = ptr::read(self.t);
IndexSTSeqDllNode::dealloc_headless(self.t);
Some((this.k, this.v))
}
}
}
}
impl<K, V> Drop for OrderedOwnedIteratorRaw<K, V> {
fn drop(&mut self) {
// clean up what's left
while let Some(_) = self._next() {}
}
}
pub struct OrderedOwnedIteratorKV<K, V>(pub(super) OrderedOwnedIteratorRaw<K, V>);
impl<K: AsKey, V: AsValue> Iterator for OrderedOwnedIteratorKV<K, V> {
type Item = (K, V);
fn next(&mut self) -> Option<Self::Item> {
self.0._next()
}
}
impl<K: AsKey, V: AsValue> DoubleEndedIterator for OrderedOwnedIteratorKV<K, V> {
fn next_back(&mut self) -> Option<Self::Item> {
self.0._next_back()
}
}
pub struct OrderedOwnedIteratorKey<K, V>(pub(super) OrderedOwnedIteratorRaw<K, V>);
impl<K: AsKey, V: AsValue> Iterator for OrderedOwnedIteratorKey<K, V> {
type Item = K;
fn next(&mut self) -> Option<Self::Item> {
self.0._next().map(|(k, _)| k)
}
}
impl<K: AsKey, V: AsValue> DoubleEndedIterator for OrderedOwnedIteratorKey<K, V> {
fn next_back(&mut self) -> Option<Self::Item> {
self.0._next_back().map(|(k, _)| k)
}
}
pub struct OrderedOwnedIteratorValue<K, V>(pub(super) OrderedOwnedIteratorRaw<K, V>);
impl<K: AsKey, V: AsValue> Iterator for OrderedOwnedIteratorValue<K, V> {
type Item = V;
fn next(&mut self) -> Option<Self::Item> {
self.0._next().map(|(_, v)| v)
}
}
impl<K: AsKey, V: AsValue> DoubleEndedIterator for OrderedOwnedIteratorValue<K, V> {
fn next_back(&mut self) -> Option<Self::Item> {
self.0._next_back().map(|(_, v)| v)
}
}
struct IndexSTSeqDllIterOrdBase<'a, K: 'a, V: 'a, C: IndexSTSeqDllIterOrdConfig<K, V>> {
h: *const IndexSTSeqDllNode<K, V>,
t: *const IndexSTSeqDllNode<K, V>,

@ -275,6 +275,12 @@ impl<K, V, C: Config<K, V>> IndexSTSeqDll<K, V, C> {
}
}
impl<K, V, C: Config<K, V> + Default> Default for IndexSTSeqDll<K, V, C> {
fn default() -> Self {
Self::with_hasher(C::Hasher::default())
}
}
impl<K, V, C: Config<K, V>> IndexSTSeqDll<K, V, C> {
#[inline(always)]
fn metrics_update_f_decr(&mut self) {
@ -707,28 +713,35 @@ where
Self: 'a,
K: 'a,
V: 'a;
type IterOrdKey<'a> = IndexSTSeqDllIterOrdKey<'a, K, V>
where
Self: 'a,
K: 'a;
type IterOrdValue<'a> = IndexSTSeqDllIterOrdValue<'a, K, V>
where
Self: 'a,
V: 'a;
type OwnedIterKV = iter::OrderedOwnedIteratorKV<K, V>;
type OwnedIterKeys = iter::OrderedOwnedIteratorKey<K, V>;
type OwnedIterValues = iter::OrderedOwnedIteratorValue<K, V>;
fn stseq_ord_kv<'a>(&'a self) -> Self::IterOrdKV<'a> {
self._iter_ord_kv()
}
fn stseq_ord_key<'a>(&'a self) -> Self::IterOrdKey<'a> {
self._iter_ord_k()
}
fn stseq_ord_value<'a>(&'a self) -> Self::IterOrdValue<'a> {
self._iter_ord_v()
}
fn stseq_owned_keys(self) -> Self::OwnedIterKeys {
iter::OrderedOwnedIteratorKey(iter::OrderedOwnedIteratorRaw::new(self))
}
fn stseq_owned_values(self) -> Self::OwnedIterValues {
iter::OrderedOwnedIteratorValue(iter::OrderedOwnedIteratorRaw::new(self))
}
fn stseq_owned_kv(self) -> Self::OwnedIterKV {
iter::OrderedOwnedIteratorKV(iter::OrderedOwnedIteratorRaw::new(self))
}
}
impl<K: AsKeyClone, V: AsValueClone, C: Config<K, V>> Clone for IndexSTSeqDll<K, V, C> {

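With that plumbing in place, an index can be consumed in insertion order without cloning keys or values, which is exactly what Model::new_restore uses to move declared Box<str> field names into the model's private allocation table. A hypothetical usage sketch against the types in this diff:

// assumes: use crate::engine::idx::{IndexSTSeqCns, STIndexSeq};
fn drain_in_order(idx: IndexSTSeqCns<Box<str>, u64>) -> Vec<(Box<str>, u64)> {
    // takes the index by value and yields owned pairs in insertion
    // order; nothing is cloned
    idx.stseq_owned_kv().collect()
}
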
@ -27,6 +27,7 @@
mod astr;
mod ll;
mod numbuf;
mod rawslice;
pub mod scanner;
mod stackop;
mod uarray;
@ -39,11 +40,12 @@ mod tests;
pub use {
astr::AStr,
ll::CachePadded,
numbuf::IntegerRepr,
rawslice::{RawSlice, RawStr},
scanner::BufferedScanner,
uarray::UArray,
vinline::VInline,
word::{DwordNN, DwordQN, WordIO, ZERO_BLOCK},
numbuf::IntegerRepr,
};
// imports
use std::alloc::{self, Layout};

@ -0,0 +1,158 @@
/*
* Created on Thu Nov 23 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use core::{
borrow::Borrow,
fmt,
hash::{Hash, Hasher},
ops::Deref,
slice, str,
};
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub struct RawStr {
base: RawSlice<u8>,
}
impl RawStr {
pub unsafe fn new(p: *const u8, l: usize) -> Self {
Self {
base: RawSlice::new(p, l),
}
}
pub unsafe fn clone(&self) -> Self {
Self {
base: self.base.clone(),
}
}
pub fn as_str(&self) -> &str {
unsafe {
// UNSAFE(@ohsayan): up to caller to ensure proper pointers
str::from_utf8_unchecked(self.base.as_slice())
}
}
}
impl From<&'static str> for RawStr {
fn from(s: &'static str) -> Self {
unsafe { Self::new(s.as_ptr(), s.len()) }
}
}
impl Borrow<str> for RawStr {
fn borrow(&self) -> &str {
// as_str already borrows with self's lifetime; no layout-dependent
// transmute of the (ptr, len) pair is needed here
self.as_str()
}
}
impl Deref for RawStr {
type Target = str;
fn deref(&self) -> &Self::Target {
self.as_str()
}
}
impl fmt::Debug for RawStr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
<str as fmt::Debug>::fmt(self.as_str(), f)
}
}
impl fmt::Display for RawStr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
<str as fmt::Display>::fmt(self.as_str(), f)
}
}
impl Hash for RawStr {
fn hash<H: Hasher>(&self, state: &mut H) {
self.as_str().hash(state)
}
}
pub struct RawSlice<T> {
t: *const T,
l: usize,
}
unsafe impl<T: Send> Send for RawSlice<T> {}
unsafe impl<T: Sync> Sync for RawSlice<T> {}
impl<T> RawSlice<T> {
#[inline(always)]
pub unsafe fn new(t: *const T, l: usize) -> Self {
Self { t, l }
}
pub fn as_slice(&self) -> &[T] {
unsafe {
// UNSAFE(@ohsayan): the caller MUST guarantee that this remains valid throughout the usage of the slice
slice::from_raw_parts(self.t, self.l)
}
}
pub unsafe fn clone(&self) -> Self {
Self { ..*self }
}
}
impl<T: Hash> Hash for RawSlice<T> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.as_slice().hash(state)
}
}
impl<T: PartialEq> PartialEq for RawSlice<T> {
fn eq(&self, other: &Self) -> bool {
self.as_slice() == other.as_slice()
}
}
impl<T: Eq> Eq for RawSlice<T> {}
impl<T: PartialOrd> PartialOrd for RawSlice<T> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
self.as_slice().partial_cmp(other.as_slice())
}
}
impl<T: Ord> Ord for RawSlice<T> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.as_slice().cmp(other.as_slice())
}
}
impl<T: fmt::Debug> fmt::Debug for RawSlice<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_list().entries(self.as_slice()).finish()
}
}
impl<T> Deref for RawSlice<T> {
type Target = [T];
#[inline(always)]
fn deref(&self) -> &Self::Target {
self.as_slice()
}
}

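The contract for the new types, stated as a usage sketch: RawStr and RawSlice are plain (pointer, length) pairs, construction and cloning are unsafe, and soundness is entirely the caller's job, namely keeping the backing allocation alive and unmoved for as long as any copy exists:

// assumes: use crate::engine::mem::RawStr;
fn rawstr_contract() {
    let backing: Box<str> = "username".into();
    let raw = unsafe {
        // UNSAFE: `backing` must outlive `raw` and all clones of it
        RawStr::new(backing.as_ptr(), backing.len())
    };
    assert_eq!(raw.as_str(), "username");
    drop(raw);
    drop(backing); // only now may the allocation go away
}
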
@ -42,7 +42,7 @@ fn parse<'a, Qd: QueryData<'a>>(state: &mut State<'a, Qd>) -> QueryResult<UserMe
^cursor
7 tokens
*/
if state.remaining() < 7 {
if state.remaining() < 7 {
return Err(QueryError::QLInvalidSyntax);
}
let token_buffer = state.current();

@ -151,12 +151,12 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
col_cnt: usize,
) -> RuntimeResult<()> {
self.f
.write_unfsynced(&[MARKER_ACTUAL_BATCH_EVENT, pk_tag.value_u8()])?;
.tracked_write_unfsynced(&[MARKER_ACTUAL_BATCH_EVENT, pk_tag.value_u8()])?;
let observed_len_bytes = observed_len.u64_bytes_le();
self.f.write_unfsynced(&observed_len_bytes)?;
self.f.tracked_write_unfsynced(&observed_len_bytes)?;
self.f
.write_unfsynced(&schema_version.value_u64().to_le_bytes())?;
self.f.write_unfsynced(&col_cnt.u64_bytes_le())?;
.tracked_write_unfsynced(&schema_version.value_u64().to_le_bytes())?;
self.f.tracked_write_unfsynced(&col_cnt.u64_bytes_le())?;
Ok(())
}
/// Append a summary of this batch and most importantly, **sync everything to disk**
@ -166,9 +166,9 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
inconsistent_reads: usize,
) -> RuntimeResult<()> {
// [0xFD][actual_commit][checksum]
self.f.write_unfsynced(&[MARKER_END_OF_BATCH])?;
self.f.tracked_write_unfsynced(&[MARKER_END_OF_BATCH])?;
let actual_commit = (observed_len - inconsistent_reads).u64_bytes_le();
self.f.write_unfsynced(&actual_commit)?;
self.f.tracked_write_unfsynced(&actual_commit)?;
let cs = self.f.reset_and_finish_checksum().to_le_bytes();
self.f.untracked_write(&cs)?;
// IMPORTANT: now that all data has been written, we need to actually ensure that the writes pass through the cache
@ -200,7 +200,7 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
pk.read_uint()
}
.to_le_bytes();
buf.write_unfsynced(&data)?;
buf.tracked_write_unfsynced(&data)?;
}
TagUnique::Str | TagUnique::Bin => {
let slice = unsafe {
@ -208,8 +208,8 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
pk.read_bin()
};
let slice_l = slice.len().u64_bytes_le();
buf.write_unfsynced(&slice_l)?;
buf.write_unfsynced(slice)?;
buf.tracked_write_unfsynced(&slice_l)?;
buf.tracked_write_unfsynced(slice)?;
}
TagUnique::Illegal => unsafe {
// UNSAFE(@ohsayan): a pk can't be constructed with illegal
@ -222,7 +222,7 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
fn encode_cell(&mut self, value: &Datacell) -> RuntimeResult<()> {
let mut buf = vec![];
cell::encode(&mut buf, value);
self.f.write_unfsynced(&buf)?;
self.f.tracked_write_unfsynced(&buf)?;
Ok(())
}
/// Encode row data
@ -232,8 +232,8 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
Some(cell) => {
self.encode_cell(cell)?;
}
None if field_name.as_ref() == model.p_key() => {}
None => self.f.write_unfsynced(&[0])?,
None if field_name.as_str() == model.p_key() => {}
None => self.f.tracked_write_unfsynced(&[0])?,
}
}
Ok(())
@ -241,9 +241,9 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
/// Write the change type and txnid
fn write_batch_item_common_row_data(&mut self, delta: &DataDelta) -> RuntimeResult<()> {
let change_type = [delta.change().value_u8()];
self.f.write_unfsynced(&change_type)?;
self.f.tracked_write_unfsynced(&change_type)?;
let txn_id = delta.data_version().value_u64().to_le_bytes();
self.f.write_unfsynced(&txn_id)?;
self.f.tracked_write_unfsynced(&txn_id)?;
Ok(())
}
}

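Every batch-payload write switches from write_unfsynced to tracked_write_unfsynced, making it explicit (and greppable) which bytes feed the running checksum; only the checksum trailer itself goes through untracked_write. A toy version of the discipline, with a hypothetical writer and a toy checksum in place of the engine's SCrc:

use std::io::{self, Write};

struct TrackedWriter<W: Write> {
    w: W,
    cs: u64, // stand-in for the real checksum state
}

impl<W: Write> TrackedWriter<W> {
    fn tracked_write(&mut self, block: &[u8]) -> io::Result<()> {
        self.untracked_write(block)?;
        // fold the block into the running checksum (toy function)
        self.cs = block
            .iter()
            .fold(self.cs, |c, &b| c.wrapping_mul(31).wrapping_add(b as u64));
        Ok(())
    }
    fn untracked_write(&mut self, block: &[u8]) -> io::Result<()> {
        self.w.write_all(block) // e.g. the checksum trailer itself
    }
    /// Finish the per-batch checksum and start a fresh one.
    fn reset_and_finish(&mut self) -> u64 {
        std::mem::replace(&mut self.cs, 0)
    }
}
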
@ -146,6 +146,7 @@ impl<F: RawFSInterface> DataBatchRestoreDriver<F> {
// begin
let mut closed = false;
while !self.f.is_eof() && !closed {
self.f.__reset_checksum();
// try to decode this batch
let Ok(batch) = self.read_batch() else {
self.attempt_recover_data_batch()?;
@ -237,10 +238,16 @@ impl<F: RawFSInterface> DataBatchRestoreDriver<F> {
for (field_name, new_data) in m
.fields()
.stseq_ord_key()
.filter(|key| key.as_ref() != m.p_key())
.filter(|key| key.as_str() != m.p_key())
.zip(new_row)
{
data.st_insert(field_name.clone(), new_data);
data.st_insert(
unsafe {
// UNSAFE(@ohsayan): model in scope, we're good
field_name.clone()
},
new_data,
);
}
let row = Row::new_restored(
pk,

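The one-line __reset_checksum() above is the false-positive fix from the commit message, as read from this diff: the persist side finishes and resets its checksum at every batch boundary (reset_and_finish_checksum), so the restore side must likewise start each batch's verification from a fresh state; a checksum that keeps accumulating across batches will mismatch the per-batch trailer and make healthy files look corrupted. In terms of the toy writer sketched earlier, the matching reader is:

// hypothetical reader-side mirror of the per-batch checksum discipline
fn verify_batch(payload: &[u8], trailer_checksum: u64) -> bool {
    // fresh state for *this* batch only
    let cs = payload
        .iter()
        .fold(0u64, |c, &b| c.wrapping_mul(31).wrapping_add(b as u64));
    cs == trailer_checksum
}
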
@ -30,56 +30,53 @@ use {
cell::{self, CanYieldDict, StorageCellTypeID},
FieldMD,
},
PersistMapSpec, PersistObject, VecU8,
AbstractMap, MapStorageSpec, PersistObject, VecU8,
},
crate::{
engine::{
core::model::Field,
data::{dict::DictEntryGeneric, DictGeneric},
data::dict::DictEntryGeneric,
error::{RuntimeResult, StorageError},
idx::{IndexBaseSpec, IndexSTSeqCns, STIndex, STIndexSeq},
mem::BufferedScanner,
idx::{IndexSTSeqCns, STIndexSeq},
mem::{BufferedScanner, StatelessLen},
storage::v1::inf,
},
util::{copy_slice_to_array as memcpy, EndianQW},
},
core::marker::PhantomData,
std::{collections::HashMap, marker::PhantomData},
};
#[derive(Debug, PartialEq, Eq, Clone, Copy, PartialOrd, Ord)]
pub struct MapIndexSizeMD(pub usize);
/// This is more of a lazy hack than anything sensible. Just implement a spec and then use this wrapper for any enc/dec operations
pub struct PersistMapImpl<'a, M: PersistMapSpec>(PhantomData<&'a M::MapType>);
pub struct PersistMapImpl<'a, M: MapStorageSpec>(PhantomData<&'a M::InMemoryMap>);
impl<'a, M: PersistMapSpec> PersistObject for PersistMapImpl<'a, M>
where
M::MapType: 'a + STIndex<M::Key, M::Value>,
{
impl<'a, M: MapStorageSpec> PersistObject for PersistMapImpl<'a, M> {
const METADATA_SIZE: usize = sizeof!(u64);
type InputType = &'a M::MapType;
type OutputType = M::MapType;
type InputType = &'a M::InMemoryMap;
type OutputType = M::RestoredMap;
type Metadata = MapIndexSizeMD;
fn pretest_can_dec_object(
s: &BufferedScanner,
MapIndexSizeMD(dict_size): &Self::Metadata,
) -> bool {
M::pretest_collection_using_size(s, *dict_size)
M::decode_pretest_for_map(s, *dict_size)
}
fn meta_enc(buf: &mut VecU8, data: Self::InputType) {
buf.extend(data.st_len().u64_bytes_le());
buf.extend(data.stateless_len().u64_bytes_le());
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult<Self::Metadata> {
Ok(MapIndexSizeMD(scanner.next_u64_le() as usize))
}
fn obj_enc(buf: &mut VecU8, map: Self::InputType) {
for (key, val) in M::_get_iter(map) {
M::entry_md_enc(buf, key, val);
if M::ENC_COUPLED {
M::enc_entry(buf, key, val);
for (key, val) in M::get_iter_from_memory(map) {
M::encode_entry_meta(buf, key, val);
if M::ENC_AS_ENTRY {
M::encode_entry_data(buf, key, val);
} else {
M::enc_key(buf, key);
M::enc_val(buf, val);
M::encode_entry_key(buf, key);
M::encode_entry_val(buf, val);
}
}
}
@ -87,22 +84,23 @@ where
scanner: &mut BufferedScanner,
MapIndexSizeMD(dict_size): Self::Metadata,
) -> RuntimeResult<Self::OutputType> {
let mut dict = M::MapType::idx_init();
while M::pretest_entry_metadata(scanner) & (dict.st_len() != dict_size) {
let mut dict = M::RestoredMap::map_new();
while M::decode_pretest_for_entry_meta(scanner) & (dict.map_length() != dict_size) {
let md = unsafe {
// UNSAFE(@ohsayan): +pretest
M::entry_md_dec(scanner).ok_or::<StorageError>(
M::decode_entry_meta(scanner).ok_or::<StorageError>(
StorageError::InternalDecodeStructureCorruptedPayload.into(),
)?
};
if !M::pretest_entry_data(scanner, &md) {
if !M::decode_pretest_for_entry_data(scanner, &md) {
return Err(StorageError::InternalDecodeStructureCorruptedPayload.into());
}
let key;
let val;
unsafe {
if M::DEC_COUPLED {
match M::dec_entry(scanner, md) {
if M::DEC_AS_ENTRY {
match M::decode_entry_data(scanner, md) {
Some((_k, _v)) => {
key = _k;
val = _v;
@ -112,8 +110,8 @@ where
}
}
} else {
let _k = M::dec_key(scanner, &md);
let _v = M::dec_val(scanner, &md);
let _k = M::decode_entry_key(scanner, &md);
let _v = M::decode_entry_val(scanner, &md);
match (_k, _v) {
(Some(_k), Some(_v)) => {
key = _k;
@ -125,11 +123,11 @@ where
}
}
}
if !dict.st_insert(key, val) {
if !dict.map_insert(key, val) {
return Err(StorageError::InternalDecodeStructureIllegalData.into());
}
}
if dict.st_len() == dict_size {
if dict.map_length() == dict_size {
Ok(dict)
} else {
Err(StorageError::InternalDecodeStructureIllegalData.into())
@ -141,12 +139,12 @@ where
pub struct GenericDictSpec;
/// generic dict entry metadata
pub struct GenericDictEntryMD {
pub struct GenericDictEntryMetadata {
pub(crate) klen: usize,
pub(crate) dscr: u8,
}
impl GenericDictEntryMD {
impl GenericDictEntryMetadata {
/// decode md (no need for any validation since that has to be handled later and can only produce incorrect results
/// if unsafe code is used to translate an incorrect dscr)
pub(crate) fn decode(data: [u8; 9]) -> Self {
@ -157,32 +155,25 @@ impl GenericDictEntryMD {
}
}
impl PersistMapSpec for GenericDictSpec {
type MapIter<'a> = std::collections::hash_map::Iter<'a, Box<str>, DictEntryGeneric>;
type MapType = DictGeneric;
type Key = Box<str>;
type Value = DictEntryGeneric;
type EntryMD = GenericDictEntryMD;
const DEC_COUPLED: bool = false;
const ENC_COUPLED: bool = true;
fn _get_iter<'a>(map: &'a Self::MapType) -> Self::MapIter<'a> {
impl MapStorageSpec for GenericDictSpec {
type InMemoryMap = HashMap<Self::InMemoryKey, Self::InMemoryVal>;
type InMemoryKey = Box<str>;
type InMemoryVal = DictEntryGeneric;
type InMemoryMapIter<'a> =
std::collections::hash_map::Iter<'a, Self::InMemoryKey, Self::InMemoryVal>;
type RestoredKey = Self::InMemoryKey;
type RestoredMap = Self::InMemoryMap;
type RestoredVal = Self::InMemoryVal;
type EntryMetadata = GenericDictEntryMetadata;
const DEC_AS_ENTRY: bool = false;
const ENC_AS_ENTRY: bool = true;
fn get_iter_from_memory<'a>(map: &'a Self::InMemoryMap) -> Self::InMemoryMapIter<'a> {
map.iter()
}
fn pretest_entry_metadata(scanner: &BufferedScanner) -> bool {
// we just need to see if we can decode the entry metadata
scanner.has_left(9)
}
fn pretest_entry_data(scanner: &BufferedScanner, md: &Self::EntryMD) -> bool {
StorageCellTypeID::is_valid(md.dscr)
& scanner.has_left(StorageCellTypeID::expect_atleast(md.dscr))
}
fn entry_md_enc(buf: &mut VecU8, key: &Self::Key, _: &Self::Value) {
fn encode_entry_meta(buf: &mut VecU8, key: &Self::InMemoryKey, _: &Self::InMemoryVal) {
buf.extend(key.len().u64_bytes_le());
}
unsafe fn entry_md_dec(scanner: &mut BufferedScanner) -> Option<Self::EntryMD> {
Some(Self::EntryMD::decode(scanner.next_chunk()))
}
fn enc_entry(buf: &mut VecU8, key: &Self::Key, val: &Self::Value) {
fn encode_entry_data(buf: &mut VecU8, key: &Self::InMemoryKey, val: &Self::InMemoryVal) {
match val {
DictEntryGeneric::Map(map) => {
buf.push(StorageCellTypeID::Dict.value_u8());
@ -196,12 +187,41 @@ impl PersistMapSpec for GenericDictSpec {
}
}
}
unsafe fn dec_key(scanner: &mut BufferedScanner, md: &Self::EntryMD) -> Option<Self::Key> {
inf::dec::utils::decode_string(scanner, md.klen as usize)
fn encode_entry_key(_: &mut VecU8, _: &Self::InMemoryKey) {
unimplemented!()
}
fn encode_entry_val(_: &mut VecU8, _: &Self::InMemoryVal) {
unimplemented!()
}
fn decode_pretest_for_entry_meta(scanner: &mut BufferedScanner) -> bool {
// we just need to see if we can decode the entry metadata
scanner.has_left(9)
}
fn decode_pretest_for_entry_data(s: &mut BufferedScanner, md: &Self::EntryMetadata) -> bool {
StorageCellTypeID::is_valid(md.dscr)
& s.has_left(StorageCellTypeID::expect_atleast(md.dscr))
}
unsafe fn decode_entry_meta(s: &mut BufferedScanner) -> Option<Self::EntryMetadata> {
Some(Self::EntryMetadata::decode(s.next_chunk()))
}
unsafe fn decode_entry_data(
_: &mut BufferedScanner,
_: Self::EntryMetadata,
) -> Option<(Self::RestoredKey, Self::RestoredVal)> {
unimplemented!()
}
unsafe fn decode_entry_key(
s: &mut BufferedScanner,
md: &Self::EntryMetadata,
) -> Option<Self::RestoredKey> {
inf::dec::utils::decode_string(s, md.klen as usize)
.map(|s| s.into_boxed_str())
.ok()
}
unsafe fn dec_val(scanner: &mut BufferedScanner, md: &Self::EntryMD) -> Option<Self::Value> {
unsafe fn decode_entry_val(
scanner: &mut BufferedScanner,
md: &Self::EntryMetadata,
) -> Option<Self::RestoredVal> {
Some(
match cell::decode_element::<CanYieldDict, BufferedScanner>(
scanner,
@ -217,30 +237,16 @@ impl PersistMapSpec for GenericDictSpec {
},
)
}
// not implemented
fn enc_key(_: &mut VecU8, _: &Self::Key) {
unimplemented!()
}
fn enc_val(_: &mut VecU8, _: &Self::Value) {
unimplemented!()
}
unsafe fn dec_entry(
_: &mut BufferedScanner,
_: Self::EntryMD,
) -> Option<(Self::Key, Self::Value)> {
unimplemented!()
}
}
pub struct FieldMapSpec;
pub struct FieldMapEntryMD {
pub struct FieldMapEntryMetadata {
field_id_l: u64,
field_prop_c: u64,
field_layer_c: u64,
null: u8,
}
impl FieldMapEntryMD {
impl FieldMapEntryMetadata {
const fn new(field_id_l: u64, field_prop_c: u64, field_layer_c: u64, null: u8) -> Self {
Self {
field_id_l,
@ -251,130 +257,127 @@ impl FieldMapEntryMD {
}
}
impl PersistMapSpec for FieldMapSpec {
type MapIter<'a> = crate::engine::idx::IndexSTSeqDllIterOrdKV<'a, Box<str>, Field>;
type MapType = IndexSTSeqCns<Self::Key, Self::Value>;
type EntryMD = FieldMapEntryMD;
type Key = Box<str>;
type Value = Field;
const ENC_COUPLED: bool = false;
const DEC_COUPLED: bool = false;
fn _get_iter<'a>(m: &'a Self::MapType) -> Self::MapIter<'a> {
m.stseq_ord_kv()
}
fn pretest_entry_metadata(scanner: &BufferedScanner) -> bool {
scanner.has_left(sizeof!(u64, 3) + 1)
pub trait FieldMapAny: StatelessLen {
type Iterator<'a>: Iterator<Item = (&'a str, &'a Field)>
where
Self: 'a;
fn get_iter<'a>(&'a self) -> Self::Iterator<'a>
where
Self: 'a;
}
impl FieldMapAny for HashMap<Box<str>, Field> {
type Iterator<'a> = std::iter::Map<
std::collections::hash_map::Iter<'a, Box<str>, Field>,
fn((&Box<str>, &Field)) -> (&'a str, &'a Field),
>;
fn get_iter<'a>(&'a self) -> Self::Iterator<'a>
where
Self: 'a,
{
self.iter()
.map(|(a, b)| unsafe { core::mem::transmute((a.as_ref(), b)) })
}
fn pretest_entry_data(scanner: &BufferedScanner, md: &Self::EntryMD) -> bool {
scanner.has_left(md.field_id_l as usize) // TODO(@ohsayan): we can enforce way more here such as atleast one field etc
}
impl FieldMapAny for IndexSTSeqCns<crate::engine::mem::RawStr, Field> {
type Iterator<'a> = std::iter::Map<
crate::engine::idx::stdord_iter::IndexSTSeqDllIterOrdKV<'a, crate::engine::mem::RawStr, Field>,
fn((&crate::engine::mem::RawStr, &Field)) -> (&'a str, &'a Field)>
where
Self: 'a;
fn get_iter<'a>(&'a self) -> Self::Iterator<'a>
where
Self: 'a,
{
self.stseq_ord_kv()
.map(|(k, v)| unsafe { core::mem::transmute((k.as_str(), v)) })
}
}
impl FieldMapAny for IndexSTSeqCns<Box<str>, Field> {
type Iterator<'a> = std::iter::Map<
crate::engine::idx::stdord_iter::IndexSTSeqDllIterOrdKV<'a, Box<str>, Field>,
fn((&Box<str>, &Field)) -> (&'a str, &'a Field)>
where
Self: 'a;
fn get_iter<'a>(&'a self) -> Self::Iterator<'a>
where
Self: 'a,
{
self.stseq_ord_kv()
.map(|(k, v)| unsafe { core::mem::transmute((k.as_ref(), v)) })
}
fn entry_md_enc(buf: &mut VecU8, key: &Self::Key, val: &Self::Value) {
}
pub struct FieldMapSpec<FM>(PhantomData<FM>);
impl<FM: FieldMapAny> MapStorageSpec for FieldMapSpec<FM> {
type InMemoryMap = FM;
type InMemoryKey = str;
type InMemoryVal = Field;
type InMemoryMapIter<'a> = FM::Iterator<'a> where FM: 'a;
type RestoredKey = Box<str>;
type RestoredVal = Field;
type RestoredMap = IndexSTSeqCns<Box<str>, Field>;
type EntryMetadata = FieldMapEntryMetadata;
const ENC_AS_ENTRY: bool = false;
const DEC_AS_ENTRY: bool = false;
fn get_iter_from_memory<'a>(map: &'a Self::InMemoryMap) -> Self::InMemoryMapIter<'a> {
map.get_iter()
}
fn encode_entry_meta(buf: &mut VecU8, key: &Self::InMemoryKey, val: &Self::InMemoryVal) {
buf.extend(key.len().u64_bytes_le());
buf.extend(0u64.to_le_bytes()); // TODO(@ohsayan): props
buf.extend(val.layers().len().u64_bytes_le());
buf.push(val.is_nullable() as u8);
}
unsafe fn entry_md_dec(scanner: &mut BufferedScanner) -> Option<Self::EntryMD> {
Some(FieldMapEntryMD::new(
scanner.next_u64_le(),
scanner.next_u64_le(),
scanner.next_u64_le(),
scanner.next_byte(),
))
fn encode_entry_data(_: &mut VecU8, _: &Self::InMemoryKey, _: &Self::InMemoryVal) {
unimplemented!()
}
fn enc_key(buf: &mut VecU8, key: &Self::Key) {
fn encode_entry_key(buf: &mut VecU8, key: &Self::InMemoryKey) {
buf.extend(key.as_bytes());
}
fn enc_val(buf: &mut VecU8, val: &Self::Value) {
fn encode_entry_val(buf: &mut VecU8, val: &Self::InMemoryVal) {
for layer in val.layers() {
super::obj::LayerRef::default_full_enc(buf, super::obj::LayerRef(layer))
}
}
unsafe fn dec_key(scanner: &mut BufferedScanner, md: &Self::EntryMD) -> Option<Self::Key> {
inf::dec::utils::decode_string(scanner, md.field_id_l as usize)
.map(|s| s.into_boxed_str())
.ok()
}
unsafe fn dec_val(scanner: &mut BufferedScanner, md: &Self::EntryMD) -> Option<Self::Value> {
super::obj::FieldRef::obj_dec(
scanner,
FieldMD::new(md.field_prop_c, md.field_layer_c, md.null),
)
.ok()
}
// unimplemented
fn enc_entry(_: &mut VecU8, _: &Self::Key, _: &Self::Value) {
unimplemented!()
}
unsafe fn dec_entry(
_: &mut BufferedScanner,
_: Self::EntryMD,
) -> Option<(Self::Key, Self::Value)> {
unimplemented!()
}
}
// TODO(@ohsayan): common trait for k/v associations, independent of underlying maptype
pub struct FieldMapSpecST;
impl PersistMapSpec for FieldMapSpecST {
type MapIter<'a> = std::collections::hash_map::Iter<'a, Box<str>, Field>;
type MapType = std::collections::HashMap<Box<str>, Field>;
type EntryMD = FieldMapEntryMD;
type Key = Box<str>;
type Value = Field;
const ENC_COUPLED: bool = false;
const DEC_COUPLED: bool = false;
fn _get_iter<'a>(m: &'a Self::MapType) -> Self::MapIter<'a> {
m.iter()
}
fn pretest_entry_metadata(scanner: &BufferedScanner) -> bool {
fn decode_pretest_for_entry_meta(scanner: &mut BufferedScanner) -> bool {
scanner.has_left(sizeof!(u64, 3) + 1)
}
fn pretest_entry_data(scanner: &BufferedScanner, md: &Self::EntryMD) -> bool {
scanner.has_left(md.field_id_l as usize) // TODO(@ohsayan): we can enforce way more here such as atleast one field etc
fn decode_pretest_for_entry_data(s: &mut BufferedScanner, md: &Self::EntryMetadata) -> bool {
s.has_left(md.field_id_l as usize) // TODO(@ohsayan): we can enforce way more here such as atleast one field etc
}
fn entry_md_enc(buf: &mut VecU8, key: &Self::Key, val: &Self::Value) {
buf.extend(key.len().u64_bytes_le());
buf.extend(0u64.to_le_bytes()); // TODO(@ohsayan): props
buf.extend(val.layers().len().u64_bytes_le());
buf.push(val.is_nullable() as u8);
}
unsafe fn entry_md_dec(scanner: &mut BufferedScanner) -> Option<Self::EntryMD> {
Some(FieldMapEntryMD::new(
unsafe fn decode_entry_meta(scanner: &mut BufferedScanner) -> Option<Self::EntryMetadata> {
Some(FieldMapEntryMetadata::new(
scanner.next_u64_le(),
scanner.next_u64_le(),
scanner.next_u64_le(),
scanner.next_byte(),
))
}
fn enc_key(buf: &mut VecU8, key: &Self::Key) {
buf.extend(key.as_bytes());
}
fn enc_val(buf: &mut VecU8, val: &Self::Value) {
for layer in val.layers() {
super::obj::LayerRef::default_full_enc(buf, super::obj::LayerRef(layer))
}
unsafe fn decode_entry_data(
_: &mut BufferedScanner,
_: Self::EntryMetadata,
) -> Option<(Self::RestoredKey, Self::RestoredVal)> {
unimplemented!()
}
unsafe fn dec_key(scanner: &mut BufferedScanner, md: &Self::EntryMD) -> Option<Self::Key> {
unsafe fn decode_entry_key(
scanner: &mut BufferedScanner,
md: &Self::EntryMetadata,
) -> Option<Self::RestoredKey> {
inf::dec::utils::decode_string(scanner, md.field_id_l as usize)
.map(|s| s.into_boxed_str())
.ok()
}
unsafe fn dec_val(scanner: &mut BufferedScanner, md: &Self::EntryMD) -> Option<Self::Value> {
unsafe fn decode_entry_val(
scanner: &mut BufferedScanner,
md: &Self::EntryMetadata,
) -> Option<Self::RestoredVal> {
super::obj::FieldRef::obj_dec(
scanner,
FieldMD::new(md.field_prop_c, md.field_layer_c, md.null),
)
.ok()
}
// unimplemented
fn enc_entry(_: &mut VecU8, _: &Self::Key, _: &Self::Value) {
unimplemented!()
}
unsafe fn dec_entry(
_: &mut BufferedScanner,
_: Self::EntryMD,
) -> Option<(Self::Key, Self::Value)> {
unimplemented!()
}
}

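FieldMapSpec is now generic over a FieldMapAny map, which erases the concrete key type (Box<str> in transactions and tests, RawStr inside a live model) down to &str so a single storage spec serves both. The fn-pointer-plus-transmute iterators above exist only to avoid boxing; the same idea in safe, boxed form with toy types:

use std::collections::HashMap;

trait FieldMapAnyToy {
    /// Iterate any field map as (&str, &value), whatever its key type.
    fn iter_str<'a>(&'a self) -> Box<dyn Iterator<Item = (&'a str, &'a u32)> + 'a>;
}

impl FieldMapAnyToy for HashMap<Box<str>, u32> {
    fn iter_str<'a>(&'a self) -> Box<dyn Iterator<Item = (&'a str, &'a u32)> + 'a> {
        Box::new(self.iter().map(|(k, v)| (k.as_ref(), v)))
    }
}

impl FieldMapAnyToy for HashMap<String, u32> {
    fn iter_str<'a>(&'a self) -> Box<dyn Iterator<Item = (&'a str, &'a u32)> + 'a> {
        Box::new(self.iter().map(|(k, v)| (k.as_str(), v)))
    }
}
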
@ -35,7 +35,7 @@ mod tests;
use crate::engine::{
error::{RuntimeResult, StorageError},
idx::{AsKey, AsValue, STIndex},
mem::BufferedScanner,
mem::{BufferedScanner, StatelessLen},
};
type VecU8 = Vec<u8>;
@ -142,64 +142,74 @@ pub trait PersistObject {
map spec
*/
/// specification for a persist map
pub trait PersistMapSpec {
/// map type
type MapType: STIndex<Self::Key, Self::Value>;
/// map iter
type MapIter<'a>: Iterator<Item = (&'a Self::Key, &'a Self::Value)>
pub trait AbstractMap<K, V> {
fn map_new() -> Self;
fn map_insert(&mut self, k: K, v: V) -> bool;
fn map_length(&self) -> usize;
}
impl<K: AsKey, V: AsValue, M: STIndex<K, V>> AbstractMap<K, V> for M {
fn map_new() -> Self {
Self::idx_init()
}
fn map_insert(&mut self, k: K, v: V) -> bool {
self.st_insert(k, v)
}
fn map_length(&self) -> usize {
self.st_len()
}
}
pub trait MapStorageSpec {
// in memory
type InMemoryMap: StatelessLen;
type InMemoryKey: ?Sized;
type InMemoryVal;
type InMemoryMapIter<'a>: Iterator<Item = (&'a Self::InMemoryKey, &'a Self::InMemoryVal)>
where
Self: 'a;
/// metadata type
type EntryMD;
/// key type (NOTE: set this to the true key type; handle any differences using the spec unless you have an entirely different
/// wrapper type)
type Key: AsKey;
/// value type (NOTE: see [`PersistMapSpec::Key`])
type Value: AsValue;
/// coupled enc
const ENC_COUPLED: bool;
/// coupled dec
const DEC_COUPLED: bool;
// collection misc
fn _get_iter<'a>(map: &'a Self::MapType) -> Self::MapIter<'a>;
// collection meta
/// pretest before jmp to routine for entire collection
fn pretest_collection_using_size(_: &BufferedScanner, _: usize) -> bool {
Self: 'a,
Self::InMemoryKey: 'a,
Self::InMemoryVal: 'a;
// from disk
type RestoredKey: AsKey;
type RestoredVal: AsValue;
type RestoredMap: AbstractMap<Self::RestoredKey, Self::RestoredVal>;
// metadata
type EntryMetadata;
// settings
const ENC_AS_ENTRY: bool;
const DEC_AS_ENTRY: bool;
// iterator
fn get_iter_from_memory<'a>(map: &'a Self::InMemoryMap) -> Self::InMemoryMapIter<'a>;
// encode
fn encode_entry_meta(buf: &mut VecU8, key: &Self::InMemoryKey, val: &Self::InMemoryVal);
fn encode_entry_data(buf: &mut VecU8, key: &Self::InMemoryKey, val: &Self::InMemoryVal);
fn encode_entry_key(buf: &mut VecU8, key: &Self::InMemoryKey);
fn encode_entry_val(buf: &mut VecU8, val: &Self::InMemoryVal);
// decode
fn decode_pretest_for_map(_: &BufferedScanner, _: usize) -> bool {
true
}
/// pretest before jmp to entry dec routine
fn pretest_entry_metadata(scanner: &BufferedScanner) -> bool;
/// pretest the src before jmp to entry data dec routine
fn pretest_entry_data(scanner: &BufferedScanner, md: &Self::EntryMD) -> bool;
// entry meta
/// enc the entry meta
fn entry_md_enc(buf: &mut VecU8, key: &Self::Key, val: &Self::Value);
/// dec the entry meta
/// SAFETY: ensure that all pretests have passed (we expect the caller to not be stupid)
unsafe fn entry_md_dec(scanner: &mut BufferedScanner) -> Option<Self::EntryMD>;
// independent packing
/// enc key (non-packed)
fn enc_key(buf: &mut VecU8, key: &Self::Key);
/// enc val (non-packed)
fn enc_val(buf: &mut VecU8, key: &Self::Value);
/// dec key (non-packed)
unsafe fn dec_key(scanner: &mut BufferedScanner, md: &Self::EntryMD) -> Option<Self::Key>;
/// dec val (non-packed)
unsafe fn dec_val(scanner: &mut BufferedScanner, md: &Self::EntryMD) -> Option<Self::Value>;
// coupled packing
/// entry packed enc
fn enc_entry(buf: &mut VecU8, key: &Self::Key, val: &Self::Value);
/// entry packed dec
unsafe fn dec_entry(
scanner: &mut BufferedScanner,
md: Self::EntryMD,
) -> Option<(Self::Key, Self::Value)>;
fn decode_pretest_for_entry_meta(scanner: &mut BufferedScanner) -> bool;
fn decode_pretest_for_entry_data(s: &mut BufferedScanner, md: &Self::EntryMetadata) -> bool;
unsafe fn decode_entry_meta(s: &mut BufferedScanner) -> Option<Self::EntryMetadata>;
unsafe fn decode_entry_data(
s: &mut BufferedScanner,
md: Self::EntryMetadata,
) -> Option<(Self::RestoredKey, Self::RestoredVal)>;
unsafe fn decode_entry_key(
s: &mut BufferedScanner,
md: &Self::EntryMetadata,
) -> Option<Self::RestoredKey>;
unsafe fn decode_entry_val(
s: &mut BufferedScanner,
md: &Self::EntryMetadata,
) -> Option<Self::RestoredVal>;
}
// enc
pub mod enc {
use super::{map, PersistMapSpec, PersistObject, VecU8};
use super::{map, MapStorageSpec, PersistObject, VecU8};
// obj
#[cfg(test)]
pub fn enc_full<Obj: PersistObject>(obj: Obj::InputType) -> Vec<u8> {
@ -215,12 +225,12 @@ pub mod enc {
enc_full::<Obj>(obj)
}
// dict
pub fn enc_dict_full<PM: PersistMapSpec>(dict: &PM::MapType) -> Vec<u8> {
pub fn enc_dict_full<PM: MapStorageSpec>(dict: &PM::InMemoryMap) -> Vec<u8> {
let mut v = vec![];
enc_dict_full_into_buffer::<PM>(&mut v, dict);
v
}
pub fn enc_dict_full_into_buffer<PM: PersistMapSpec>(buf: &mut VecU8, dict: &PM::MapType) {
pub fn enc_dict_full_into_buffer<PM: MapStorageSpec>(buf: &mut VecU8, dict: &PM::InMemoryMap) {
<map::PersistMapImpl<PM> as PersistObject>::default_full_enc(buf, dict)
}
}
@ -228,7 +238,7 @@ pub mod enc {
// dec
pub mod dec {
use {
super::{map, PersistMapSpec, PersistObject},
super::{map, MapStorageSpec, PersistObject},
crate::engine::{error::RuntimeResult, mem::BufferedScanner},
};
// obj
@ -243,13 +253,13 @@ pub mod dec {
Obj::default_full_dec(scanner)
}
// dec
pub fn dec_dict_full<PM: PersistMapSpec>(data: &[u8]) -> RuntimeResult<PM::MapType> {
pub fn dec_dict_full<PM: MapStorageSpec>(data: &[u8]) -> RuntimeResult<PM::RestoredMap> {
let mut scanner = BufferedScanner::new(data);
dec_dict_full_from_scanner::<PM>(&mut scanner)
}
fn dec_dict_full_from_scanner<PM: PersistMapSpec>(
fn dec_dict_full_from_scanner<PM: MapStorageSpec>(
scanner: &mut BufferedScanner,
) -> RuntimeResult<PM::MapType> {
) -> RuntimeResult<PM::RestoredMap> {
<map::PersistMapImpl<PM> as PersistObject>::default_full_dec(scanner)
}
pub mod utils {

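The renamed trait makes the encode and decode halves structurally distinct: encoding borrows from the in-memory map, whose keys may be unsized views like str, while decoding builds a possibly different RestoredMap with owned keys through the blanket AbstractMap impl. The shape of that split, reduced to a hypothetical toy spec:

trait ToyMapSpec {
    type InMemoryKey: ?Sized; // borrowed view while encoding
    type RestoredKey;         // owned value produced while decoding
    fn encode_key(buf: &mut Vec<u8>, key: &Self::InMemoryKey);
    fn decode_key(data: &[u8]) -> Option<Self::RestoredKey>;
}

struct ToyFieldSpec;

impl ToyMapSpec for ToyFieldSpec {
    type InMemoryKey = str;      // e.g. the &str view of a RawStr key
    type RestoredKey = Box<str>; // decoding always allocates owned keys
    fn encode_key(buf: &mut Vec<u8>, key: &str) {
        buf.extend_from_slice(key.as_bytes());
    }
    fn decode_key(data: &[u8]) -> Option<Box<str>> {
        std::str::from_utf8(data).ok().map(Into::into)
    }
}
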
@ -38,6 +38,7 @@ use {
DictGeneric,
},
error::{RuntimeResult, StorageError},
idx::IndexSTSeqCns,
mem::{BufferedScanner, VInline},
storage::v1::inf,
},
@ -378,6 +379,7 @@ impl<'a> PersistObject for FieldRef<'a> {
}
}
#[derive(Debug)]
pub struct ModelLayoutMD {
model_uuid: Uuid,
p_key_len: u64,
@ -435,7 +437,7 @@ impl<'a> PersistObject for ModelLayoutRef<'a> {
}
fn obj_enc(buf: &mut VecU8, ModelLayoutRef(model_definition): Self::InputType) {
buf.extend(model_definition.p_key().as_bytes());
<super::map::PersistMapImpl<super::map::FieldMapSpec> as PersistObject>::obj_enc(
<super::map::PersistMapImpl<super::map::FieldMapSpec<_>> as PersistObject>::obj_enc(
buf,
model_definition.fields(),
)
@ -445,11 +447,11 @@ impl<'a> PersistObject for ModelLayoutRef<'a> {
md: Self::Metadata,
) -> RuntimeResult<Self::OutputType> {
let key = inf::dec::utils::decode_string(scanner, md.p_key_len as usize)?;
let fieldmap =
<super::map::PersistMapImpl<super::map::FieldMapSpec> as PersistObject>::obj_dec(
scanner,
super::map::MapIndexSizeMD(md.field_c as usize),
)?;
let fieldmap = <super::map::PersistMapImpl<
super::map::FieldMapSpec<IndexSTSeqCns<Box<str>, _>>,
> as PersistObject>::obj_dec(
scanner, super::map::MapIndexSizeMD(md.field_c as usize)
)?;
let ptag = if md.p_key_tag > TagSelector::MAX as u64 {
return Err(StorageError::InternalDecodeStructureCorruptedPayload.into());
} else {

@ -80,8 +80,11 @@ fn fieldmap() {
"profile_pic".into(),
Field::new([Layer::bin()].into(), true),
);
let enc = super::enc::enc_dict_full::<super::map::FieldMapSpec>(&fields);
let dec = super::dec::dec_dict_full::<super::map::FieldMapSpec>(&enc).unwrap();
let enc = super::enc::enc_dict_full::<super::map::FieldMapSpec<_>>(&fields);
let dec = super::dec::dec_dict_full::<
super::map::FieldMapSpec<crate::engine::idx::IndexSTSeqCns<Box<str>, _>>,
>(&enc)
.unwrap();
for ((orig_field_id, orig_field), (restored_field_id, restored_field)) in
fields.stseq_ord_kv().zip(dec.stseq_ord_kv())
{

@ -76,20 +76,24 @@ impl SEInitState {
std::fs::create_dir(DATA_DIR).inherit_set_dmsg("creating data directory")?;
}
if !is_new {
let models = gns.idx_models().read();
let mut models = gns.idx_models().write();
// this is an existing instance, so read in all data
for (space_name, space) in gns.idx().read().iter() {
let space = space.read();
let space_uuid = space.get_uuid();
for model_name in space.models().iter() {
let model = models
.get(&EntityIDRef::new(&space_name, &model_name))
.get_mut(&EntityIDRef::new(&space_name, &model_name))
.unwrap();
let path =
Self::model_path(space_name, space_uuid, model_name, model.get_uuid());
let persist_driver = batch_jrnl::reinit(&path, model).inherit_set_dmsg(
format!("failed to restore model data from journal in `{path}`"),
)?;
unsafe {
// UNSAFE(@ohsayan): all pieces of data are upgraded by now, so vacuum
model.model_mutator().vacuum_stashed();
}
let _ = model_drivers.insert(
ModelUniqueID::new(space_name, model_name, model.get_uuid()),
FractalModelDriver::init(persist_driver),

@ -308,7 +308,7 @@ impl<Fs: RawFSInterface> SDSSFileTrackedWriter<Fs> {
cs: SCrc::new(),
})
}
pub fn write_unfsynced(&mut self, block: &[u8]) -> RuntimeResult<()> {
pub fn tracked_write_unfsynced(&mut self, block: &[u8]) -> RuntimeResult<()> {
self.untracked_write(block)
.map(|_| self.cs.recompute_with_new_var_block(block))
}
@ -322,8 +322,7 @@ impl<Fs: RawFSInterface> SDSSFileTrackedWriter<Fs> {
self.f.f.sync_write_cache()
}
pub fn reset_and_finish_checksum(&mut self) -> u64 {
let mut scrc = SCrc::new();
core::mem::swap(&mut self.cs, &mut scrc);
let scrc = core::mem::replace(&mut self.cs, SCrc::new());
scrc.finish()
}
pub fn into_inner_file(self) -> RuntimeResult<SDSSFileIO<Fs>> {

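The checksum reset is also tidied: the construct-swap-finish dance becomes a single core::mem::replace, which takes the finished state out and leaves a fresh one behind in one expression. The idiom in general form, with a stand-in accumulator type:

fn take_and_reset(acc: &mut Vec<u8>) -> Vec<u8> {
    // equivalent to: let mut t = Vec::new(); mem::swap(acc, &mut t); t
    std::mem::replace(acc, Vec::new())
}
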
@ -158,7 +158,7 @@ impl<'a> PersistObject for ModelID<'a> {
fn with_space<T>(
gns: &GlobalNS,
space_id: &super::SpaceIDRes,
mut f: impl FnMut(&Space) -> RuntimeResult<T>,
f: impl FnOnce(&Space) -> RuntimeResult<T>,
) -> RuntimeResult<T> {
let spaces = gns.idx().read();
let Some(space) = spaces.st_get(&space_id.name) else {
@ -191,7 +191,7 @@ fn with_model_mut<T>(
gns: &GlobalNS,
space_id: &super::SpaceIDRes,
model_id: &ModelIDRes,
mut f: impl FnMut(&mut Model) -> RuntimeResult<T>,
f: impl FnOnce(&mut Model) -> RuntimeResult<T>,
) -> RuntimeResult<T> {
with_space(gns, space_id, |_| {
let mut models = gns.idx_models().write();
@ -394,14 +394,14 @@ impl<'a> PersistObject for AlterModelAddTxn<'a> {
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<ModelID as PersistObject>::obj_enc(buf, data.model_id);
<map::PersistMapImpl<map::FieldMapSpec> as PersistObject>::obj_enc(buf, data.new_fields);
<map::PersistMapImpl<map::FieldMapSpec<_>> as PersistObject>::obj_enc(buf, data.new_fields);
}
unsafe fn obj_dec(
s: &mut BufferedScanner,
md: Self::Metadata,
) -> RuntimeResult<Self::OutputType> {
let model_id = <ModelID as PersistObject>::obj_dec(s, md.model_id_meta)?;
let new_fields = <map::PersistMapImpl<map::FieldMapSpec> as PersistObject>::obj_dec(
let new_fields = <map::PersistMapImpl<map::FieldMapSpec<IndexSTSeqCns<Box<str>, _>>> as PersistObject>::obj_dec(
s,
map::MapIndexSizeMD(md.new_field_c as usize),
)?;
@ -424,25 +424,12 @@ impl<'a> GNSEvent for AlterModelAddTxn<'a> {
gns: &GlobalNS,
) -> RuntimeResult<()> {
with_model_mut(gns, &model_id.space_id, &model_id, |model| {
for (i, (field_name, field)) in new_fields.stseq_ord_kv().enumerate() {
if !model
.fields_mut()
.st_insert(field_name.to_owned(), field.clone())
{
// rollback; corrupted
new_fields.stseq_ord_key().take(i).for_each(|field_id| {
let _ = model.fields_mut().st_delete(field_id);
});
let mut mutator = model.model_mutator();
for (field_name, field) in new_fields.stseq_owned_kv() {
if !mutator.add_field(field_name, field) {
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
}
// TODO(@ohsayan): avoid double iteration
// publish deltas
for field_name in new_fields.stseq_ord_key() {
model
.delta_state_mut()
.schema_append_unresolved_wl_field_add(field_name);
}
Ok(())
})
}
@ -542,28 +529,12 @@ impl<'a> GNSEvent for AlterModelRemoveTxn<'a> {
gns: &GlobalNS,
) -> RuntimeResult<()> {
with_model_mut(gns, &model_id.space_id, &model_id, |model| {
let mut removed_fields_rb = vec![];
let mut mutator = model.model_mutator();
for removed_field in removed_fields.iter() {
match model.fields_mut().st_delete_return(removed_field) {
Some(field) => {
removed_fields_rb.push((removed_field as &str, field));
}
None => {
// rollback
removed_fields_rb.into_iter().for_each(|(field_id, field)| {
let _ = model.fields_mut().st_insert(field_id.into(), field);
});
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
if !mutator.remove_field(&removed_field) {
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
}
// TODO(@ohsayan): avoid double iteration
// publish deltas
for field_name in removed_fields.iter() {
model
.delta_state_mut()
.schema_append_unresolved_wl_field_rem(field_name);
}
Ok(())
})
}
@ -598,7 +569,7 @@ pub struct AlterModelUpdateTxnMD {
#[derive(Debug, PartialEq)]
pub struct AlterModelUpdateTxnRestorePL {
pub(super) model_id: ModelIDRes,
pub(super) updated_fields: IndexST<Box<str>, Field>,
pub(super) updated_fields: IndexSTSeqCns<Box<str>, Field>,
}
impl<'a> PersistObject for AlterModelUpdateTxn<'a> {
@ -624,7 +595,7 @@ impl<'a> PersistObject for AlterModelUpdateTxn<'a> {
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<ModelID as PersistObject>::obj_enc(buf, data.model_id);
<map::PersistMapImpl<map::FieldMapSpecST> as PersistObject>::obj_enc(
<map::PersistMapImpl<map::FieldMapSpec<_>> as PersistObject>::obj_enc(
buf,
data.updated_fields,
);
@ -634,10 +605,11 @@ impl<'a> PersistObject for AlterModelUpdateTxn<'a> {
md: Self::Metadata,
) -> RuntimeResult<Self::OutputType> {
let model_id = <ModelID as PersistObject>::obj_dec(s, md.model_id_md)?;
let updated_fields = <map::PersistMapImpl<map::FieldMapSpecST> as PersistObject>::obj_dec(
s,
map::MapIndexSizeMD(md.updated_field_c as usize),
)?;
let updated_fields =
<map::PersistMapImpl<map::FieldMapSpec<IndexSTSeqCns<Box<str>, _>>> as PersistObject>::obj_dec(
s,
map::MapIndexSizeMD(md.updated_field_c as usize),
)?;
Ok(AlterModelUpdateTxnRestorePL {
model_id,
updated_fields,
@ -657,17 +629,10 @@ impl<'a> GNSEvent for AlterModelUpdateTxn<'a> {
gns: &GlobalNS,
) -> RuntimeResult<()> {
with_model_mut(gns, &model_id.space_id, &model_id, |model| {
let mut fields_rb = vec![];
for (field_id, field) in updated_fields.iter() {
match model.fields_mut().st_update_return(field_id, field.clone()) {
Some(f) => fields_rb.push((field_id as &str, f)),
None => {
// rollback
fields_rb.into_iter().for_each(|(field_id, field)| {
let _ = model.fields_mut().st_update(field_id, field);
});
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
let mut mutator = model.model_mutator();
for (field_id, field) in updated_fields.stseq_owned_kv() {
if !mutator.update_field(&field_id, field) {
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
}
Ok(())

@ -155,7 +155,9 @@ mod model_tests {
model.get_uuid(),
model.delta_state().schema_current_version().value_u64()
),
new_fields
new_fields: into_dict! {
"auth_2fa" => Field::new([Layer::bool()].into(), true),
}
},
decoded
);
@ -191,6 +193,10 @@ mod model_tests {
#[test]
fn alter_update() {
let (space, model) = default_space_model();
let updated_fields_copy = into_dict! {
// people of your social app will hate this, but hehe
"profile_pic" => Field::new([Layer::bin()].into(), false)
};
let updated_fields = into_dict! {
// people of your social app will hate this, but hehe
"profile_pic" => Field::new([Layer::bin()].into(), false)
@ -214,7 +220,7 @@ mod model_tests {
model.get_uuid(),
model.delta_state().schema_current_version().value_u64()
),
updated_fields
updated_fields: updated_fields_copy
},
decoded
);
