Add remaining DDL txns

next
Sayan Nandan 1 year ago
parent 138753c4ad
commit 891252a89d
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -92,7 +92,7 @@ impl Space {
pub fn get_uuid(&self) -> Uuid {
self.uuid
}
pub(super) fn models(&self) -> &RWLIdx<Box<str>, Model> {
pub fn models(&self) -> &RWLIdx<Box<str>, Model> {
&self.mns
}
pub fn metadata(&self) -> &SpaceMeta {

@ -412,3 +412,68 @@ impl PersistMapSpec for FieldMapSpec {
unimplemented!()
}
}
// TODO(@ohsayan): common trait for k/v associations, independent of underlying maptype
pub struct FieldMapSpecST;
impl PersistMapSpec for FieldMapSpecST {
type MapIter<'a> = std::collections::hash_map::Iter<'a, Box<str>, Field>;
type MapType = std::collections::HashMap<Box<str>, Field>;
type EntryMD = FieldMapEntryMD;
type Key = Box<str>;
type Value = Field;
const ENC_COUPLED: bool = false;
const DEC_COUPLED: bool = false;
fn _get_iter<'a>(m: &'a Self::MapType) -> Self::MapIter<'a> {
m.iter()
}
fn pretest_entry_metadata(scanner: &BufferedScanner) -> bool {
scanner.has_left(sizeof!(u64, 3) + 1)
}
fn pretest_entry_data(scanner: &BufferedScanner, md: &Self::EntryMD) -> bool {
scanner.has_left(md.field_id_l as usize) // TODO(@ohsayan): we can enforce way more here such as atleast one field etc
}
fn entry_md_enc(buf: &mut VecU8, key: &Self::Key, val: &Self::Value) {
buf.extend(key.len().u64_bytes_le());
buf.extend(0u64.to_le_bytes()); // TODO(@ohsayan): props
buf.extend(val.layers().len().u64_bytes_le());
buf.push(val.is_nullable() as u8);
}
unsafe fn entry_md_dec(scanner: &mut BufferedScanner) -> Option<Self::EntryMD> {
Some(FieldMapEntryMD::new(
u64::from_le_bytes(scanner.next_chunk()),
u64::from_le_bytes(scanner.next_chunk()),
u64::from_le_bytes(scanner.next_chunk()),
scanner.next_byte(),
))
}
fn enc_key(buf: &mut VecU8, key: &Self::Key) {
buf.extend(key.as_bytes());
}
fn enc_val(buf: &mut VecU8, val: &Self::Value) {
for layer in val.layers() {
super::obj::LayerRef::default_full_enc(buf, super::obj::LayerRef(layer))
}
}
unsafe fn dec_key(scanner: &mut BufferedScanner, md: &Self::EntryMD) -> Option<Self::Key> {
inf::dec::utils::decode_string(scanner, md.field_id_l as usize)
.map(|s| s.into_boxed_str())
.ok()
}
unsafe fn dec_val(scanner: &mut BufferedScanner, md: &Self::EntryMD) -> Option<Self::Value> {
super::obj::FieldRef::obj_dec(
scanner,
FieldMD::new(md.field_prop_c, md.field_layer_c, md.null),
)
.ok()
}
// unimplemented
fn enc_entry(_: &mut VecU8, _: &Self::Key, _: &Self::Value) {
unimplemented!()
}
unsafe fn dec_entry(
_: &mut BufferedScanner,
_: Self::EntryMD,
) -> Option<(Self::Key, Self::Value)> {
unimplemented!()
}
}

@ -26,14 +26,18 @@
use {
super::{TransactionError, TransactionResult},
crate::engine::{
core::GlobalNS,
storage::v1::{
inf::{self, PersistObject},
BufferedScanner, JournalAdapter, JournalWriter,
crate::{
engine::{
core::GlobalNS,
data::uuid::Uuid,
storage::v1::{
inf::{self, PersistObject},
BufferedScanner, JournalAdapter, JournalWriter, SDSSResult,
},
},
util::EndianQW,
},
std::fs::File,
std::{fs::File, marker::PhantomData},
};
mod model;
@ -121,3 +125,48 @@ where
/// Update the global state from the restored event
fn update_global_state(restore: Self::RestoreType, gns: &GlobalNS) -> TransactionResult<()>;
}
#[derive(Debug, Clone, Copy)]
pub struct SpaceIDRef<'a> {
uuid: Uuid,
name: &'a str,
}
pub struct SpaceIDRes {
uuid: Uuid,
name: Box<str>,
}
struct SpaceID<'a>(PhantomData<SpaceIDRef<'a>>);
pub struct SpaceIDMD {
uuid: Uuid,
space_name_l: u64,
}
impl<'a> PersistObject for SpaceID<'a> {
const METADATA_SIZE: usize = sizeof!(u128) + sizeof!(u64);
type InputType = SpaceIDRef<'a>;
type OutputType = SpaceIDRes;
type Metadata = SpaceIDMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner.has_left(md.space_name_l as usize)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
buf.extend(data.uuid.to_le_bytes());
buf.extend(data.name.len().u64_bytes_le());
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> SDSSResult<Self::Metadata> {
Ok(SpaceIDMD {
uuid: Uuid::from_bytes(scanner.next_chunk()),
space_name_l: u64::from_le_bytes(scanner.next_chunk()),
})
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
buf.extend(data.name.as_bytes());
}
unsafe fn obj_dec(s: &mut BufferedScanner, md: Self::Metadata) -> SDSSResult<Self::OutputType> {
let str = inf::dec::utils::decode_string(s, md.space_name_l as usize)?;
Ok(SpaceIDRes {
uuid: md.uuid,
name: str.into_boxed_str(),
})
}
}

@ -25,15 +25,20 @@
*/
use {
super::GNSEvent,
super::{GNSEvent, TransactionResult},
crate::{
engine::{
core::model::{delta::IRModel, Model},
core::{
model::{delta::IRModel, Field, Model},
space::Space,
GlobalNS,
},
data::uuid::Uuid,
idx::STIndex,
idx::{IndexST, IndexSTSeqCns, STIndex, STIndexSeq},
ql::lex::Ident,
storage::v1::{
inf::{self, obj, PersistObject},
BufferedScanner, SDSSResult,
inf::{self, map, obj, PersistObject},
BufferedScanner, SDSSError, SDSSResult,
},
txn::TransactionError,
},
@ -42,6 +47,91 @@ use {
std::marker::PhantomData,
};
pub struct ModelID<'a>(PhantomData<&'a ()>);
#[derive(Debug, Clone, Copy)]
pub struct ModelIDRef<'a> {
model_name: &'a str,
model_uuid: Uuid,
model_version: u64,
}
pub struct ModelIDRes {
model_name: Box<str>,
model_uuid: Uuid,
model_version: u64,
}
pub struct ModelIDMD {
model_name_l: u64,
model_version: u64,
model_uuid: Uuid,
}
impl<'a> PersistObject for ModelID<'a> {
const METADATA_SIZE: usize = sizeof!(u64, 2) + sizeof!(u128);
type InputType = ModelIDRef<'a>;
type OutputType = ModelIDRes;
type Metadata = ModelIDMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner.has_left(md.model_name_l as usize)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
buf.extend(data.model_name.len().u64_bytes_le());
buf.extend(data.model_version.to_le_bytes());
buf.extend(data.model_uuid.to_le_bytes());
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> SDSSResult<Self::Metadata> {
Ok(ModelIDMD {
model_name_l: u64::from_le_bytes(scanner.next_chunk()),
model_version: u64::from_le_bytes(scanner.next_chunk()),
model_uuid: Uuid::from_bytes(scanner.next_chunk()),
})
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
buf.extend(data.model_name.as_bytes());
}
unsafe fn obj_dec(s: &mut BufferedScanner, md: Self::Metadata) -> SDSSResult<Self::OutputType> {
Ok(ModelIDRes {
model_name: inf::dec::utils::decode_string(s, md.model_name_l as usize)?
.into_boxed_str(),
model_uuid: md.model_uuid,
model_version: md.model_version,
})
}
}
fn with_space<T>(
gns: &GlobalNS,
space_id: &super::SpaceIDRes,
mut f: impl FnMut(&Space) -> TransactionResult<T>,
) -> TransactionResult<T> {
let spaces = gns.spaces().read();
let Some(space) = spaces.st_get(&space_id.name) else {
return Err(TransactionError::OnRestoreDataMissing);
};
if space.get_uuid() != space_id.uuid {
return Err(TransactionError::OnRestoreDataConflictMismatch);
}
f(space)
}
fn with_model<T>(
gns: &GlobalNS,
space_id: &super::SpaceIDRes,
model_id: &ModelIDRes,
mut f: impl FnMut(&Model) -> TransactionResult<T>,
) -> TransactionResult<T> {
with_space(gns, space_id, |space| {
let models = space.models().read();
let Some(model) = models.st_get(&model_id.model_name) else {
return Err(TransactionError::OnRestoreDataMissing);
};
if model.get_uuid() != model_id.model_uuid {
// this should have been handled by an earlier transaction
return Err(TransactionError::OnRestoreDataConflictMismatch);
}
f(model)
})
}
/*
create model
*/
@ -58,8 +148,10 @@ impl<'a> CreateModelTxn<'a> {
model_read: &'a IRModel<'a>,
) -> CreateModelTxnCommitPL<'a> {
CreateModelTxnCommitPL {
space_name,
space_uuid,
space_id: super::SpaceIDRef {
uuid: space_uuid,
name: space_name,
},
model_name,
model,
model_read,
@ -69,30 +161,28 @@ impl<'a> CreateModelTxn<'a> {
#[derive(Clone, Copy)]
pub struct CreateModelTxnCommitPL<'a> {
space_name: &'a str,
space_uuid: Uuid,
space_id: super::SpaceIDRef<'a>,
model_name: &'a str,
model: &'a Model,
model_read: &'a IRModel<'a>,
}
pub struct CreateModelTxnRestorePL {
space_name: Box<str>,
space_uuid: Uuid,
space_id: super::SpaceIDRes,
model_name: Box<str>,
model: Model,
}
pub struct CreateModelTxnMD {
space_name_l: u64,
space_uuid: Uuid,
space_id_meta: super::SpaceIDMD,
model_name_l: u64,
model_meta: <obj::ModelLayoutRef<'static> as PersistObject>::Metadata,
}
impl<'a> PersistObject for CreateModelTxn<'a> {
const METADATA_SIZE: usize =
sizeof!(u64, 2) + sizeof!(u128) + <obj::ModelLayoutRef<'a> as PersistObject>::METADATA_SIZE;
const METADATA_SIZE: usize = <super::SpaceID as PersistObject>::METADATA_SIZE
+ sizeof!(u64)
+ <obj::ModelLayoutRef<'a> as PersistObject>::METADATA_SIZE;
type InputType = CreateModelTxnCommitPL<'a>;
type OutputType = CreateModelTxnRestorePL;
type Metadata = CreateModelTxnMD;
@ -100,42 +190,44 @@ impl<'a> PersistObject for CreateModelTxn<'a> {
scanner.has_left((md.model_meta.p_key_len() + md.model_name_l) as usize)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
buf.extend(data.space_name.len().u64_bytes_le());
buf.extend(data.space_uuid.to_le_bytes());
// space ID
<super::SpaceID as PersistObject>::meta_enc(buf, data.space_id);
// model name
buf.extend(data.model_name.len().u64_bytes_le());
// model meta dump
<obj::ModelLayoutRef as PersistObject>::meta_enc(
buf,
obj::ModelLayoutRef::from((data.model, data.model_read)),
)
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> SDSSResult<Self::Metadata> {
let space_name_l = u64::from_le_bytes(scanner.next_chunk());
let space_uuid = Uuid::from_bytes(scanner.next_chunk());
let space_id = <super::SpaceID as PersistObject>::meta_dec(scanner)?;
let model_name_l = u64::from_le_bytes(scanner.next_chunk());
let model_meta = <obj::ModelLayoutRef as PersistObject>::meta_dec(scanner)?;
Ok(CreateModelTxnMD {
space_name_l,
space_uuid,
space_id_meta: space_id,
model_name_l,
model_meta,
})
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
// space id dump
<super::SpaceID as PersistObject>::obj_enc(buf, data.space_id);
// model name
buf.extend(data.model_name.as_bytes());
// model dump
<obj::ModelLayoutRef as PersistObject>::obj_enc(
buf,
obj::ModelLayoutRef::from((data.model, data.model_read)),
)
}
unsafe fn obj_dec(s: &mut BufferedScanner, md: Self::Metadata) -> SDSSResult<Self::OutputType> {
let space_name =
inf::dec::utils::decode_string(s, md.space_name_l as usize)?.into_boxed_str();
let space_id = <super::SpaceID as PersistObject>::obj_dec(s, md.space_id_meta)?;
let model_name =
inf::dec::utils::decode_string(s, md.model_name_l as usize)?.into_boxed_str();
let model = <obj::ModelLayoutRef as PersistObject>::obj_dec(s, md.model_meta)?;
Ok(CreateModelTxnRestorePL {
space_name,
space_uuid: md.space_uuid,
space_id,
model_name,
model,
})
@ -148,12 +240,11 @@ impl<'a> GNSEvent for CreateModelTxn<'a> {
type RestoreType = CreateModelTxnRestorePL;
fn update_global_state(
CreateModelTxnRestorePL {
space_name,
space_uuid,
space_id,
model_name,
model,
}: Self::RestoreType,
gns: &crate::engine::core::GlobalNS,
gns: &GlobalNS,
) -> crate::engine::txn::TransactionResult<()> {
let rgns = gns.spaces().read();
/*
@ -164,8 +255,8 @@ impl<'a> GNSEvent for CreateModelTxn<'a> {
There is no evident way about how this is going to be handled, but the ideal way would be to keep
versioned index of schemas.
*/
match rgns.st_get(&space_name) {
Some(space) if space.get_uuid() == space_uuid => {
match rgns.st_get(&space_id.name) {
Some(space) if space.get_uuid() == space_id.uuid => {
if space._create_model(&model_name, model).is_ok() {
Ok(())
} else {
@ -177,3 +268,387 @@ impl<'a> GNSEvent for CreateModelTxn<'a> {
}
}
}
/*
alter model add
*/
pub struct AlterModelAddTxn<'a>(PhantomData<&'a ()>);
#[derive(Debug, Clone, Copy)]
pub struct AlterModelAddTxnCommitPL<'a> {
space_id: super::SpaceIDRef<'a>,
model_id: ModelIDRef<'a>,
new_fields: &'a IndexSTSeqCns<Box<str>, Field>,
}
pub struct AlterModelAddTxnMD {
space_id_meta: super::SpaceIDMD,
model_id_meta: ModelIDMD,
new_field_c: u64,
}
pub struct AlterModelAddTxnRestorePL {
space_id: super::SpaceIDRes,
model_id: ModelIDRes,
new_fields: IndexSTSeqCns<Box<str>, Field>,
}
impl<'a> PersistObject for AlterModelAddTxn<'a> {
const METADATA_SIZE: usize = <super::SpaceID as PersistObject>::METADATA_SIZE
+ <ModelID as PersistObject>::METADATA_SIZE
+ sizeof!(u64);
type InputType = AlterModelAddTxnCommitPL<'a>;
type OutputType = AlterModelAddTxnRestorePL;
type Metadata = AlterModelAddTxnMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner.has_left((md.space_id_meta.space_name_l + md.model_id_meta.model_name_l) as usize)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<super::SpaceID as PersistObject>::meta_enc(buf, data.space_id);
<ModelID as PersistObject>::meta_enc(buf, data.model_id);
buf.extend(data.new_fields.st_len().u64_bytes_le());
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> SDSSResult<Self::Metadata> {
let space_id_meta = <super::SpaceID as PersistObject>::meta_dec(scanner)?;
let model_id_meta = <ModelID as PersistObject>::meta_dec(scanner)?;
let new_field_c = u64::from_le_bytes(scanner.next_chunk());
Ok(AlterModelAddTxnMD {
space_id_meta,
model_id_meta,
new_field_c,
})
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<super::SpaceID as PersistObject>::obj_enc(buf, data.space_id);
<ModelID as PersistObject>::obj_enc(buf, data.model_id);
<map::PersistMapImpl<map::FieldMapSpec> as PersistObject>::obj_enc(buf, data.new_fields);
}
unsafe fn obj_dec(s: &mut BufferedScanner, md: Self::Metadata) -> SDSSResult<Self::OutputType> {
let space_id = <super::SpaceID as PersistObject>::obj_dec(s, md.space_id_meta)?;
let model_id = <ModelID as PersistObject>::obj_dec(s, md.model_id_meta)?;
let new_fields = <map::PersistMapImpl<map::FieldMapSpec> as PersistObject>::obj_dec(
s,
map::MapIndexSizeMD(md.new_field_c as usize),
)?;
Ok(AlterModelAddTxnRestorePL {
space_id,
model_id,
new_fields,
})
}
}
impl<'a> GNSEvent for AlterModelAddTxn<'a> {
const OPC: u16 = 4;
type CommitType = AlterModelAddTxnCommitPL<'a>;
type RestoreType = AlterModelAddTxnRestorePL;
fn update_global_state(
AlterModelAddTxnRestorePL {
space_id,
model_id,
new_fields,
}: Self::RestoreType,
gns: &GlobalNS,
) -> crate::engine::txn::TransactionResult<()> {
with_model(gns, &space_id, &model_id, |model| {
let mut wmodel = model.intent_write_model();
for (i, (field_name, field)) in new_fields.stseq_ord_kv().enumerate() {
if !wmodel
.fields_mut()
.st_insert(field_name.to_owned(), field.clone())
{
// rollback; corrupted
new_fields.stseq_ord_key().take(i).for_each(|field_id| {
let _ = wmodel.fields_mut().st_delete(field_id);
});
return Err(TransactionError::OnRestoreDataConflictMismatch);
}
}
Ok(())
})
}
}
/*
alter model remove
*/
pub struct AlterModelRemoveTxn<'a>(PhantomData<&'a ()>);
#[derive(Debug, Clone, Copy)]
pub struct AlterModelRemoveTxnCommitPL<'a> {
space_id: super::SpaceIDRef<'a>,
model_id: ModelIDRef<'a>,
removed_fields: &'a [Ident<'a>],
}
pub struct AlterModelRemoveTxnMD {
space_id_meta: super::SpaceIDMD,
model_id_meta: ModelIDMD,
remove_field_c: u64,
}
pub struct AlterModelRemoveTxnRestorePL {
space_id: super::SpaceIDRes,
model_id: ModelIDRes,
removed_fields: Box<[Box<str>]>,
}
impl<'a> PersistObject for AlterModelRemoveTxn<'a> {
const METADATA_SIZE: usize = <super::SpaceID as PersistObject>::METADATA_SIZE
+ <ModelID as PersistObject>::METADATA_SIZE
+ sizeof!(u64);
type InputType = AlterModelRemoveTxnCommitPL<'a>;
type OutputType = AlterModelRemoveTxnRestorePL;
type Metadata = AlterModelRemoveTxnMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner.has_left((md.space_id_meta.space_name_l + md.model_id_meta.model_name_l) as usize)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<super::SpaceID as PersistObject>::meta_enc(buf, data.space_id);
<ModelID as PersistObject>::meta_enc(buf, data.model_id);
buf.extend(data.removed_fields.len().u64_bytes_le());
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> SDSSResult<Self::Metadata> {
let space_id_meta = <super::SpaceID as PersistObject>::meta_dec(scanner)?;
let model_id_meta = <ModelID as PersistObject>::meta_dec(scanner)?;
Ok(AlterModelRemoveTxnMD {
space_id_meta,
model_id_meta,
remove_field_c: u64::from_le_bytes(scanner.next_chunk()),
})
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<super::SpaceID as PersistObject>::obj_enc(buf, data.space_id);
<ModelID as PersistObject>::obj_enc(buf, data.model_id);
for field in data.removed_fields {
buf.extend(field.len().u64_bytes_le());
buf.extend(field.as_bytes());
}
}
unsafe fn obj_dec(s: &mut BufferedScanner, md: Self::Metadata) -> SDSSResult<Self::OutputType> {
let space_id = <super::SpaceID as PersistObject>::obj_dec(s, md.space_id_meta)?;
let model_id = <ModelID as PersistObject>::obj_dec(s, md.model_id_meta)?;
let mut removed_fields = Vec::with_capacity(md.remove_field_c as usize);
while !s.eof()
& (removed_fields.len() as u64 != md.remove_field_c)
& s.has_left(sizeof!(u64))
{
let len = u64::from_le_bytes(s.next_chunk()) as usize;
if !s.has_left(len) {
break;
}
removed_fields.push(inf::dec::utils::decode_string(s, len)?.into_boxed_str());
}
if removed_fields.len() as u64 != md.remove_field_c {
return Err(SDSSError::InternalDecodeStructureCorruptedPayload);
}
Ok(AlterModelRemoveTxnRestorePL {
space_id,
model_id,
removed_fields: removed_fields.into_boxed_slice(),
})
}
}
impl<'a> GNSEvent for AlterModelRemoveTxn<'a> {
const OPC: u16 = 5;
type CommitType = AlterModelRemoveTxnCommitPL<'a>;
type RestoreType = AlterModelRemoveTxnRestorePL;
fn update_global_state(
AlterModelRemoveTxnRestorePL {
space_id,
model_id,
removed_fields,
}: Self::RestoreType,
gns: &GlobalNS,
) -> crate::engine::txn::TransactionResult<()> {
with_model(gns, &space_id, &model_id, |model| {
let mut iwm = model.intent_write_model();
let mut removed_fields_rb = vec![];
for removed_field in removed_fields.iter() {
match iwm.fields_mut().st_delete_return(removed_field) {
Some(field) => {
removed_fields_rb.push((removed_field as &str, field));
}
None => {
// rollback
removed_fields_rb.into_iter().for_each(|(field_id, field)| {
let _ = iwm.fields_mut().st_insert(field_id.into(), field);
});
return Err(TransactionError::OnRestoreDataConflictMismatch);
}
}
}
Ok(())
})
}
}
/*
alter model update
*/
pub struct AlterModelUpdateTxn<'a>(PhantomData<&'a ()>);
#[derive(Debug, Clone, Copy)]
pub struct AlterModelUpdateTxnCommitPL<'a> {
space_id: super::SpaceIDRef<'a>,
model_id: ModelIDRef<'a>,
updated_fields: &'a IndexST<Box<str>, Field>,
}
pub struct AlterModelUpdateTxnMD {
space_id_md: super::SpaceIDMD,
model_id_md: ModelIDMD,
updated_field_c: u64,
}
pub struct AlterModelUpdateTxnRestorePL {
space_id: super::SpaceIDRes,
model_id: ModelIDRes,
updated_fields: IndexST<Box<str>, Field>,
}
impl<'a> PersistObject for AlterModelUpdateTxn<'a> {
const METADATA_SIZE: usize = <super::SpaceID as PersistObject>::METADATA_SIZE
+ <ModelID as PersistObject>::METADATA_SIZE
+ sizeof!(u64);
type InputType = AlterModelUpdateTxnCommitPL<'a>;
type OutputType = AlterModelUpdateTxnRestorePL;
type Metadata = AlterModelUpdateTxnMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner
.has_left(md.space_id_md.space_name_l as usize + md.model_id_md.model_name_l as usize)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<super::SpaceID as PersistObject>::meta_enc(buf, data.space_id);
<ModelID as PersistObject>::meta_enc(buf, data.model_id);
buf.extend(data.updated_fields.st_len().u64_bytes_le());
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> SDSSResult<Self::Metadata> {
let space_id_md = <super::SpaceID as PersistObject>::meta_dec(scanner)?;
let model_id_md = <ModelID as PersistObject>::meta_dec(scanner)?;
Ok(AlterModelUpdateTxnMD {
space_id_md,
model_id_md,
updated_field_c: u64::from_le_bytes(scanner.next_chunk()),
})
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<super::SpaceID as PersistObject>::obj_enc(buf, data.space_id);
<ModelID as PersistObject>::obj_enc(buf, data.model_id);
<map::PersistMapImpl<map::FieldMapSpecST> as PersistObject>::obj_enc(
buf,
data.updated_fields,
);
}
unsafe fn obj_dec(s: &mut BufferedScanner, md: Self::Metadata) -> SDSSResult<Self::OutputType> {
let space_id = <super::SpaceID as PersistObject>::obj_dec(s, md.space_id_md)?;
let model_id = <ModelID as PersistObject>::obj_dec(s, md.model_id_md)?;
let updated_fields = <map::PersistMapImpl<map::FieldMapSpecST> as PersistObject>::obj_dec(
s,
map::MapIndexSizeMD(md.updated_field_c as usize),
)?;
Ok(AlterModelUpdateTxnRestorePL {
space_id,
model_id,
updated_fields,
})
}
}
impl<'a> GNSEvent for AlterModelUpdateTxn<'a> {
const OPC: u16 = 6;
type CommitType = AlterModelUpdateTxnCommitPL<'a>;
type RestoreType = AlterModelUpdateTxnRestorePL;
fn update_global_state(
AlterModelUpdateTxnRestorePL {
space_id,
model_id,
updated_fields,
}: Self::RestoreType,
gns: &GlobalNS,
) -> TransactionResult<()> {
with_model(gns, &space_id, &model_id, |model| {
let mut iwm = model.intent_write_model();
let mut fields_rb = vec![];
for (field_id, field) in updated_fields.iter() {
match iwm.fields_mut().st_update_return(field_id, field.clone()) {
Some(f) => fields_rb.push((field_id as &str, f)),
None => {
// rollback
fields_rb.into_iter().for_each(|(field_id, field)| {
let _ = iwm.fields_mut().st_update(field_id, field);
});
return Err(TransactionError::OnRestoreDataConflictMismatch);
}
}
}
Ok(())
})
}
}
/*
drop model
*/
pub struct DropModelTxn<'a>(PhantomData<&'a ()>);
#[derive(Debug, Clone, Copy)]
pub struct DropModelTxnCommitPL<'a> {
space_id: super::SpaceIDRef<'a>,
model_id: ModelIDRef<'a>,
}
pub struct DropModelTxnMD {
space_id_md: super::SpaceIDMD,
model_id_md: ModelIDMD,
}
pub struct DropModelTxnRestorePL {
space_id: super::SpaceIDRes,
model_id: ModelIDRes,
}
impl<'a> PersistObject for DropModelTxn<'a> {
const METADATA_SIZE: usize = <super::SpaceID as PersistObject>::METADATA_SIZE
+ <ModelID as PersistObject>::METADATA_SIZE;
type InputType = DropModelTxnCommitPL<'a>;
type OutputType = DropModelTxnRestorePL;
type Metadata = DropModelTxnMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner
.has_left(md.space_id_md.space_name_l as usize + md.model_id_md.model_name_l as usize)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<super::SpaceID as PersistObject>::meta_enc(buf, data.space_id);
<ModelID as PersistObject>::meta_enc(buf, data.model_id);
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> SDSSResult<Self::Metadata> {
let space_id_md = <super::SpaceID as PersistObject>::meta_dec(scanner)?;
let model_id_md = <ModelID as PersistObject>::meta_dec(scanner)?;
Ok(DropModelTxnMD {
space_id_md,
model_id_md,
})
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<super::SpaceID as PersistObject>::obj_enc(buf, data.space_id);
<ModelID as PersistObject>::obj_enc(buf, data.model_id);
}
unsafe fn obj_dec(s: &mut BufferedScanner, md: Self::Metadata) -> SDSSResult<Self::OutputType> {
let space_id = <super::SpaceID as PersistObject>::obj_dec(s, md.space_id_md)?;
let model_id = <ModelID as PersistObject>::obj_dec(s, md.model_id_md)?;
Ok(DropModelTxnRestorePL { space_id, model_id })
}
}
impl<'a> GNSEvent for DropModelTxn<'a> {
const OPC: u16 = 7;
type CommitType = DropModelTxnCommitPL<'a>;
type RestoreType = DropModelTxnRestorePL;
fn update_global_state(
DropModelTxnRestorePL { space_id, model_id }: Self::RestoreType,
gns: &GlobalNS,
) -> TransactionResult<()> {
with_space(gns, &space_id, |space| {
let mut wgns = space.models().write();
match wgns.st_delete_if(&model_id.model_name, |mdl| {
mdl.get_uuid() == model_id.model_uuid
}) {
Some(true) => Ok(()),
Some(false) => return Err(TransactionError::OnRestoreDataConflictMismatch),
None => Err(TransactionError::OnRestoreDataMissing),
}
})
}
}

@ -144,33 +144,30 @@ pub struct AlterSpaceTxn<'a>(PhantomData<&'a ()>);
impl<'a> AlterSpaceTxn<'a> {
pub const fn new_commit(
space_uuid: Uuid,
space_name: &'a str,
uuid: Uuid,
name: &'a str,
space_meta: &'a DictGeneric,
) -> AlterSpaceTxnCommitPL<'a> {
AlterSpaceTxnCommitPL {
space_uuid,
space_name,
space_id: super::SpaceIDRef { uuid, name },
space_meta,
}
}
}
pub struct AlterSpaceTxnMD {
uuid: Uuid,
space_name_l: u64,
space_id_meta: super::SpaceIDMD,
dict_len: u64,
}
#[derive(Clone, Copy)]
pub struct AlterSpaceTxnCommitPL<'a> {
space_uuid: Uuid,
space_name: &'a str,
space_id: super::SpaceIDRef<'a>,
space_meta: &'a DictGeneric,
}
pub struct AlterSpaceTxnRestorePL {
space_name: Box<str>,
space_id: super::SpaceIDRes,
space_meta: DictGeneric,
}
@ -180,33 +177,30 @@ impl<'a> PersistObject for AlterSpaceTxn<'a> {
type OutputType = AlterSpaceTxnRestorePL;
type Metadata = AlterSpaceTxnMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner.has_left(md.space_name_l as usize)
scanner.has_left(md.space_id_meta.space_name_l as usize)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
buf.extend(data.space_uuid.to_le_bytes());
buf.extend(data.space_name.len().u64_bytes_le());
<super::SpaceID as PersistObject>::meta_enc(buf, data.space_id);
buf.extend(data.space_meta.len().u64_bytes_le());
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> SDSSResult<Self::Metadata> {
Ok(AlterSpaceTxnMD {
uuid: Uuid::from_bytes(scanner.next_chunk()),
space_name_l: u64::from_le_bytes(scanner.next_chunk()),
space_id_meta: <super::SpaceID as PersistObject>::meta_dec(scanner)?,
dict_len: u64::from_le_bytes(scanner.next_chunk()),
})
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
buf.extend(data.space_name.as_bytes());
<super::SpaceID as PersistObject>::obj_enc(buf, data.space_id);
<map::PersistMapImpl<map::GenericDictSpec> as PersistObject>::obj_enc(buf, data.space_meta);
}
unsafe fn obj_dec(s: &mut BufferedScanner, md: Self::Metadata) -> SDSSResult<Self::OutputType> {
let space_name =
inf::dec::utils::decode_string(s, md.space_name_l as usize)?.into_boxed_str();
let space_id = <super::SpaceID as PersistObject>::obj_dec(s, md.space_id_meta)?;
let space_meta = <map::PersistMapImpl<map::GenericDictSpec> as PersistObject>::obj_dec(
s,
map::MapIndexSizeMD(md.dict_len as usize),
)?;
Ok(AlterSpaceTxnRestorePL {
space_name,
space_id,
space_meta,
})
}
@ -221,13 +215,13 @@ impl<'a> GNSEvent for AlterSpaceTxn<'a> {
fn update_global_state(
AlterSpaceTxnRestorePL {
space_name,
space_id,
space_meta,
}: Self::RestoreType,
gns: &crate::engine::core::GlobalNS,
) -> TransactionResult<()> {
let gns = gns.spaces().read();
match gns.st_get(&space_name) {
match gns.st_get(&space_id.name) {
Some(space) => {
let mut wmeta = space.metadata().env().write();
space_meta
@ -248,67 +242,50 @@ impl<'a> GNSEvent for AlterSpaceTxn<'a> {
pub struct DropSpaceTxn<'a>(PhantomData<&'a ()>);
impl<'a> DropSpaceTxn<'a> {
pub const fn new_commit(space_name: &'a str, uuid: Uuid) -> DropSpaceTxnCommitPL<'a> {
DropSpaceTxnCommitPL { space_name, uuid }
pub const fn new_commit(name: &'a str, uuid: Uuid) -> DropSpaceTxnCommitPL<'a> {
DropSpaceTxnCommitPL {
space_id: super::SpaceIDRef { uuid, name },
}
}
}
pub struct DropSpaceTxnMD {
space_name_l: u64,
uuid: Uuid,
}
#[derive(Clone, Copy)]
pub struct DropSpaceTxnCommitPL<'a> {
space_name: &'a str,
uuid: Uuid,
}
pub struct DropSpaceTxnRestorePL {
uuid: Uuid,
space_name: Box<str>,
space_id: super::SpaceIDRef<'a>,
}
impl<'a> PersistObject for DropSpaceTxn<'a> {
const METADATA_SIZE: usize = sizeof!(u128) + sizeof!(u64);
type InputType = DropSpaceTxnCommitPL<'a>;
type OutputType = DropSpaceTxnRestorePL;
type Metadata = DropSpaceTxnMD;
type OutputType = super::SpaceIDRes;
type Metadata = super::SpaceIDMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner.has_left(md.space_name_l as usize)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
buf.extend(data.space_name.len().u64_bytes_le());
buf.extend(data.uuid.to_le_bytes());
<super::SpaceID as PersistObject>::meta_enc(buf, data.space_id);
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> SDSSResult<Self::Metadata> {
Ok(DropSpaceTxnMD {
space_name_l: u64::from_le_bytes(scanner.next_chunk()),
uuid: Uuid::from_bytes(scanner.next_chunk()),
})
<super::SpaceID as PersistObject>::meta_dec(scanner)
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
buf.extend(data.space_name.as_bytes());
<super::SpaceID as PersistObject>::obj_enc(buf, data.space_id)
}
unsafe fn obj_dec(s: &mut BufferedScanner, md: Self::Metadata) -> SDSSResult<Self::OutputType> {
let space_name =
inf::dec::utils::decode_string(s, md.space_name_l as usize)?.into_boxed_str();
Ok(DropSpaceTxnRestorePL {
uuid: md.uuid,
space_name,
})
<super::SpaceID as PersistObject>::obj_dec(s, md)
}
}
impl<'a> GNSEvent for DropSpaceTxn<'a> {
const OPC: u16 = 2;
type CommitType = DropSpaceTxnCommitPL<'a>;
type RestoreType = DropSpaceTxnRestorePL;
type RestoreType = super::SpaceIDRes;
fn update_global_state(
DropSpaceTxnRestorePL { uuid, space_name }: Self::RestoreType,
super::SpaceIDRes { uuid, name }: Self::RestoreType,
gns: &GlobalNS,
) -> TransactionResult<()> {
let mut wgns = gns.spaces().write();
match wgns.entry(space_name) {
match wgns.entry(name) {
std::collections::hash_map::Entry::Occupied(oe) => {
if oe.get().get_uuid() == uuid {
oe.remove_entry();

Loading…
Cancel
Save