From 891252a89d55138d9c27e7edaa86705e3127136e Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Thu, 24 Aug 2023 06:32:46 +0000 Subject: [PATCH] Add remaining DDL txns --- server/src/engine/core/space.rs | 2 +- server/src/engine/storage/v1/inf/map.rs | 65 +++ server/src/engine/txn/gns/mod.rs | 61 ++- server/src/engine/txn/gns/model.rs | 535 ++++++++++++++++++++++-- server/src/engine/txn/gns/space.rs | 79 ++-- 5 files changed, 654 insertions(+), 88 deletions(-) diff --git a/server/src/engine/core/space.rs b/server/src/engine/core/space.rs index 86169d7c..d282d837 100644 --- a/server/src/engine/core/space.rs +++ b/server/src/engine/core/space.rs @@ -92,7 +92,7 @@ impl Space { pub fn get_uuid(&self) -> Uuid { self.uuid } - pub(super) fn models(&self) -> &RWLIdx, Model> { + pub fn models(&self) -> &RWLIdx, Model> { &self.mns } pub fn metadata(&self) -> &SpaceMeta { diff --git a/server/src/engine/storage/v1/inf/map.rs b/server/src/engine/storage/v1/inf/map.rs index 7fdc3b0a..40949bcb 100644 --- a/server/src/engine/storage/v1/inf/map.rs +++ b/server/src/engine/storage/v1/inf/map.rs @@ -412,3 +412,68 @@ impl PersistMapSpec for FieldMapSpec { unimplemented!() } } + +// TODO(@ohsayan): common trait for k/v associations, independent of underlying maptype +pub struct FieldMapSpecST; +impl PersistMapSpec for FieldMapSpecST { + type MapIter<'a> = std::collections::hash_map::Iter<'a, Box, Field>; + type MapType = std::collections::HashMap, Field>; + type EntryMD = FieldMapEntryMD; + type Key = Box; + type Value = Field; + const ENC_COUPLED: bool = false; + const DEC_COUPLED: bool = false; + fn _get_iter<'a>(m: &'a Self::MapType) -> Self::MapIter<'a> { + m.iter() + } + fn pretest_entry_metadata(scanner: &BufferedScanner) -> bool { + scanner.has_left(sizeof!(u64, 3) + 1) + } + fn pretest_entry_data(scanner: &BufferedScanner, md: &Self::EntryMD) -> bool { + scanner.has_left(md.field_id_l as usize) // TODO(@ohsayan): we can enforce way more here such as atleast one field etc + } + fn entry_md_enc(buf: &mut VecU8, key: &Self::Key, val: &Self::Value) { + buf.extend(key.len().u64_bytes_le()); + buf.extend(0u64.to_le_bytes()); // TODO(@ohsayan): props + buf.extend(val.layers().len().u64_bytes_le()); + buf.push(val.is_nullable() as u8); + } + unsafe fn entry_md_dec(scanner: &mut BufferedScanner) -> Option { + Some(FieldMapEntryMD::new( + u64::from_le_bytes(scanner.next_chunk()), + u64::from_le_bytes(scanner.next_chunk()), + u64::from_le_bytes(scanner.next_chunk()), + scanner.next_byte(), + )) + } + fn enc_key(buf: &mut VecU8, key: &Self::Key) { + buf.extend(key.as_bytes()); + } + fn enc_val(buf: &mut VecU8, val: &Self::Value) { + for layer in val.layers() { + super::obj::LayerRef::default_full_enc(buf, super::obj::LayerRef(layer)) + } + } + unsafe fn dec_key(scanner: &mut BufferedScanner, md: &Self::EntryMD) -> Option { + inf::dec::utils::decode_string(scanner, md.field_id_l as usize) + .map(|s| s.into_boxed_str()) + .ok() + } + unsafe fn dec_val(scanner: &mut BufferedScanner, md: &Self::EntryMD) -> Option { + super::obj::FieldRef::obj_dec( + scanner, + FieldMD::new(md.field_prop_c, md.field_layer_c, md.null), + ) + .ok() + } + // unimplemented + fn enc_entry(_: &mut VecU8, _: &Self::Key, _: &Self::Value) { + unimplemented!() + } + unsafe fn dec_entry( + _: &mut BufferedScanner, + _: Self::EntryMD, + ) -> Option<(Self::Key, Self::Value)> { + unimplemented!() + } +} diff --git a/server/src/engine/txn/gns/mod.rs b/server/src/engine/txn/gns/mod.rs index f0b705a3..b800295e 100644 --- a/server/src/engine/txn/gns/mod.rs +++ b/server/src/engine/txn/gns/mod.rs @@ -26,14 +26,18 @@ use { super::{TransactionError, TransactionResult}, - crate::engine::{ - core::GlobalNS, - storage::v1::{ - inf::{self, PersistObject}, - BufferedScanner, JournalAdapter, JournalWriter, + crate::{ + engine::{ + core::GlobalNS, + data::uuid::Uuid, + storage::v1::{ + inf::{self, PersistObject}, + BufferedScanner, JournalAdapter, JournalWriter, SDSSResult, + }, }, + util::EndianQW, }, - std::fs::File, + std::{fs::File, marker::PhantomData}, }; mod model; @@ -121,3 +125,48 @@ where /// Update the global state from the restored event fn update_global_state(restore: Self::RestoreType, gns: &GlobalNS) -> TransactionResult<()>; } + +#[derive(Debug, Clone, Copy)] +pub struct SpaceIDRef<'a> { + uuid: Uuid, + name: &'a str, +} +pub struct SpaceIDRes { + uuid: Uuid, + name: Box, +} +struct SpaceID<'a>(PhantomData>); +pub struct SpaceIDMD { + uuid: Uuid, + space_name_l: u64, +} + +impl<'a> PersistObject for SpaceID<'a> { + const METADATA_SIZE: usize = sizeof!(u128) + sizeof!(u64); + type InputType = SpaceIDRef<'a>; + type OutputType = SpaceIDRes; + type Metadata = SpaceIDMD; + fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { + scanner.has_left(md.space_name_l as usize) + } + fn meta_enc(buf: &mut Vec, data: Self::InputType) { + buf.extend(data.uuid.to_le_bytes()); + buf.extend(data.name.len().u64_bytes_le()); + } + unsafe fn meta_dec(scanner: &mut BufferedScanner) -> SDSSResult { + Ok(SpaceIDMD { + uuid: Uuid::from_bytes(scanner.next_chunk()), + space_name_l: u64::from_le_bytes(scanner.next_chunk()), + }) + } + fn obj_enc(buf: &mut Vec, data: Self::InputType) { + buf.extend(data.name.as_bytes()); + } + unsafe fn obj_dec(s: &mut BufferedScanner, md: Self::Metadata) -> SDSSResult { + let str = inf::dec::utils::decode_string(s, md.space_name_l as usize)?; + Ok(SpaceIDRes { + uuid: md.uuid, + name: str.into_boxed_str(), + }) + } +} diff --git a/server/src/engine/txn/gns/model.rs b/server/src/engine/txn/gns/model.rs index 77c9d6a2..85792239 100644 --- a/server/src/engine/txn/gns/model.rs +++ b/server/src/engine/txn/gns/model.rs @@ -25,15 +25,20 @@ */ use { - super::GNSEvent, + super::{GNSEvent, TransactionResult}, crate::{ engine::{ - core::model::{delta::IRModel, Model}, + core::{ + model::{delta::IRModel, Field, Model}, + space::Space, + GlobalNS, + }, data::uuid::Uuid, - idx::STIndex, + idx::{IndexST, IndexSTSeqCns, STIndex, STIndexSeq}, + ql::lex::Ident, storage::v1::{ - inf::{self, obj, PersistObject}, - BufferedScanner, SDSSResult, + inf::{self, map, obj, PersistObject}, + BufferedScanner, SDSSError, SDSSResult, }, txn::TransactionError, }, @@ -42,6 +47,91 @@ use { std::marker::PhantomData, }; +pub struct ModelID<'a>(PhantomData<&'a ()>); +#[derive(Debug, Clone, Copy)] +pub struct ModelIDRef<'a> { + model_name: &'a str, + model_uuid: Uuid, + model_version: u64, +} +pub struct ModelIDRes { + model_name: Box, + model_uuid: Uuid, + model_version: u64, +} +pub struct ModelIDMD { + model_name_l: u64, + model_version: u64, + model_uuid: Uuid, +} + +impl<'a> PersistObject for ModelID<'a> { + const METADATA_SIZE: usize = sizeof!(u64, 2) + sizeof!(u128); + type InputType = ModelIDRef<'a>; + type OutputType = ModelIDRes; + type Metadata = ModelIDMD; + fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { + scanner.has_left(md.model_name_l as usize) + } + fn meta_enc(buf: &mut Vec, data: Self::InputType) { + buf.extend(data.model_name.len().u64_bytes_le()); + buf.extend(data.model_version.to_le_bytes()); + buf.extend(data.model_uuid.to_le_bytes()); + } + unsafe fn meta_dec(scanner: &mut BufferedScanner) -> SDSSResult { + Ok(ModelIDMD { + model_name_l: u64::from_le_bytes(scanner.next_chunk()), + model_version: u64::from_le_bytes(scanner.next_chunk()), + model_uuid: Uuid::from_bytes(scanner.next_chunk()), + }) + } + fn obj_enc(buf: &mut Vec, data: Self::InputType) { + buf.extend(data.model_name.as_bytes()); + } + unsafe fn obj_dec(s: &mut BufferedScanner, md: Self::Metadata) -> SDSSResult { + Ok(ModelIDRes { + model_name: inf::dec::utils::decode_string(s, md.model_name_l as usize)? + .into_boxed_str(), + model_uuid: md.model_uuid, + model_version: md.model_version, + }) + } +} + +fn with_space( + gns: &GlobalNS, + space_id: &super::SpaceIDRes, + mut f: impl FnMut(&Space) -> TransactionResult, +) -> TransactionResult { + let spaces = gns.spaces().read(); + let Some(space) = spaces.st_get(&space_id.name) else { + return Err(TransactionError::OnRestoreDataMissing); + }; + if space.get_uuid() != space_id.uuid { + return Err(TransactionError::OnRestoreDataConflictMismatch); + } + f(space) +} + +fn with_model( + gns: &GlobalNS, + space_id: &super::SpaceIDRes, + model_id: &ModelIDRes, + mut f: impl FnMut(&Model) -> TransactionResult, +) -> TransactionResult { + with_space(gns, space_id, |space| { + let models = space.models().read(); + let Some(model) = models.st_get(&model_id.model_name) else { + return Err(TransactionError::OnRestoreDataMissing); + }; + if model.get_uuid() != model_id.model_uuid { + // this should have been handled by an earlier transaction + return Err(TransactionError::OnRestoreDataConflictMismatch); + } + f(model) + }) +} + /* create model */ @@ -58,8 +148,10 @@ impl<'a> CreateModelTxn<'a> { model_read: &'a IRModel<'a>, ) -> CreateModelTxnCommitPL<'a> { CreateModelTxnCommitPL { - space_name, - space_uuid, + space_id: super::SpaceIDRef { + uuid: space_uuid, + name: space_name, + }, model_name, model, model_read, @@ -69,30 +161,28 @@ impl<'a> CreateModelTxn<'a> { #[derive(Clone, Copy)] pub struct CreateModelTxnCommitPL<'a> { - space_name: &'a str, - space_uuid: Uuid, + space_id: super::SpaceIDRef<'a>, model_name: &'a str, model: &'a Model, model_read: &'a IRModel<'a>, } pub struct CreateModelTxnRestorePL { - space_name: Box, - space_uuid: Uuid, + space_id: super::SpaceIDRes, model_name: Box, model: Model, } pub struct CreateModelTxnMD { - space_name_l: u64, - space_uuid: Uuid, + space_id_meta: super::SpaceIDMD, model_name_l: u64, model_meta: as PersistObject>::Metadata, } impl<'a> PersistObject for CreateModelTxn<'a> { - const METADATA_SIZE: usize = - sizeof!(u64, 2) + sizeof!(u128) + as PersistObject>::METADATA_SIZE; + const METADATA_SIZE: usize = ::METADATA_SIZE + + sizeof!(u64) + + as PersistObject>::METADATA_SIZE; type InputType = CreateModelTxnCommitPL<'a>; type OutputType = CreateModelTxnRestorePL; type Metadata = CreateModelTxnMD; @@ -100,42 +190,44 @@ impl<'a> PersistObject for CreateModelTxn<'a> { scanner.has_left((md.model_meta.p_key_len() + md.model_name_l) as usize) } fn meta_enc(buf: &mut Vec, data: Self::InputType) { - buf.extend(data.space_name.len().u64_bytes_le()); - buf.extend(data.space_uuid.to_le_bytes()); + // space ID + ::meta_enc(buf, data.space_id); + // model name buf.extend(data.model_name.len().u64_bytes_le()); + // model meta dump ::meta_enc( buf, obj::ModelLayoutRef::from((data.model, data.model_read)), ) } unsafe fn meta_dec(scanner: &mut BufferedScanner) -> SDSSResult { - let space_name_l = u64::from_le_bytes(scanner.next_chunk()); - let space_uuid = Uuid::from_bytes(scanner.next_chunk()); + let space_id = ::meta_dec(scanner)?; let model_name_l = u64::from_le_bytes(scanner.next_chunk()); let model_meta = ::meta_dec(scanner)?; Ok(CreateModelTxnMD { - space_name_l, - space_uuid, + space_id_meta: space_id, model_name_l, model_meta, }) } fn obj_enc(buf: &mut Vec, data: Self::InputType) { + // space id dump + ::obj_enc(buf, data.space_id); + // model name buf.extend(data.model_name.as_bytes()); + // model dump ::obj_enc( buf, obj::ModelLayoutRef::from((data.model, data.model_read)), ) } unsafe fn obj_dec(s: &mut BufferedScanner, md: Self::Metadata) -> SDSSResult { - let space_name = - inf::dec::utils::decode_string(s, md.space_name_l as usize)?.into_boxed_str(); + let space_id = ::obj_dec(s, md.space_id_meta)?; let model_name = inf::dec::utils::decode_string(s, md.model_name_l as usize)?.into_boxed_str(); let model = ::obj_dec(s, md.model_meta)?; Ok(CreateModelTxnRestorePL { - space_name, - space_uuid: md.space_uuid, + space_id, model_name, model, }) @@ -148,12 +240,11 @@ impl<'a> GNSEvent for CreateModelTxn<'a> { type RestoreType = CreateModelTxnRestorePL; fn update_global_state( CreateModelTxnRestorePL { - space_name, - space_uuid, + space_id, model_name, model, }: Self::RestoreType, - gns: &crate::engine::core::GlobalNS, + gns: &GlobalNS, ) -> crate::engine::txn::TransactionResult<()> { let rgns = gns.spaces().read(); /* @@ -164,8 +255,8 @@ impl<'a> GNSEvent for CreateModelTxn<'a> { There is no evident way about how this is going to be handled, but the ideal way would be to keep versioned index of schemas. */ - match rgns.st_get(&space_name) { - Some(space) if space.get_uuid() == space_uuid => { + match rgns.st_get(&space_id.name) { + Some(space) if space.get_uuid() == space_id.uuid => { if space._create_model(&model_name, model).is_ok() { Ok(()) } else { @@ -177,3 +268,387 @@ impl<'a> GNSEvent for CreateModelTxn<'a> { } } } + +/* + alter model add +*/ + +pub struct AlterModelAddTxn<'a>(PhantomData<&'a ()>); +#[derive(Debug, Clone, Copy)] +pub struct AlterModelAddTxnCommitPL<'a> { + space_id: super::SpaceIDRef<'a>, + model_id: ModelIDRef<'a>, + new_fields: &'a IndexSTSeqCns, Field>, +} +pub struct AlterModelAddTxnMD { + space_id_meta: super::SpaceIDMD, + model_id_meta: ModelIDMD, + new_field_c: u64, +} +pub struct AlterModelAddTxnRestorePL { + space_id: super::SpaceIDRes, + model_id: ModelIDRes, + new_fields: IndexSTSeqCns, Field>, +} +impl<'a> PersistObject for AlterModelAddTxn<'a> { + const METADATA_SIZE: usize = ::METADATA_SIZE + + ::METADATA_SIZE + + sizeof!(u64); + type InputType = AlterModelAddTxnCommitPL<'a>; + type OutputType = AlterModelAddTxnRestorePL; + type Metadata = AlterModelAddTxnMD; + fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { + scanner.has_left((md.space_id_meta.space_name_l + md.model_id_meta.model_name_l) as usize) + } + fn meta_enc(buf: &mut Vec, data: Self::InputType) { + ::meta_enc(buf, data.space_id); + ::meta_enc(buf, data.model_id); + buf.extend(data.new_fields.st_len().u64_bytes_le()); + } + unsafe fn meta_dec(scanner: &mut BufferedScanner) -> SDSSResult { + let space_id_meta = ::meta_dec(scanner)?; + let model_id_meta = ::meta_dec(scanner)?; + let new_field_c = u64::from_le_bytes(scanner.next_chunk()); + Ok(AlterModelAddTxnMD { + space_id_meta, + model_id_meta, + new_field_c, + }) + } + fn obj_enc(buf: &mut Vec, data: Self::InputType) { + ::obj_enc(buf, data.space_id); + ::obj_enc(buf, data.model_id); + as PersistObject>::obj_enc(buf, data.new_fields); + } + unsafe fn obj_dec(s: &mut BufferedScanner, md: Self::Metadata) -> SDSSResult { + let space_id = ::obj_dec(s, md.space_id_meta)?; + let model_id = ::obj_dec(s, md.model_id_meta)?; + let new_fields = as PersistObject>::obj_dec( + s, + map::MapIndexSizeMD(md.new_field_c as usize), + )?; + Ok(AlterModelAddTxnRestorePL { + space_id, + model_id, + new_fields, + }) + } +} + +impl<'a> GNSEvent for AlterModelAddTxn<'a> { + const OPC: u16 = 4; + type CommitType = AlterModelAddTxnCommitPL<'a>; + type RestoreType = AlterModelAddTxnRestorePL; + fn update_global_state( + AlterModelAddTxnRestorePL { + space_id, + model_id, + new_fields, + }: Self::RestoreType, + gns: &GlobalNS, + ) -> crate::engine::txn::TransactionResult<()> { + with_model(gns, &space_id, &model_id, |model| { + let mut wmodel = model.intent_write_model(); + for (i, (field_name, field)) in new_fields.stseq_ord_kv().enumerate() { + if !wmodel + .fields_mut() + .st_insert(field_name.to_owned(), field.clone()) + { + // rollback; corrupted + new_fields.stseq_ord_key().take(i).for_each(|field_id| { + let _ = wmodel.fields_mut().st_delete(field_id); + }); + return Err(TransactionError::OnRestoreDataConflictMismatch); + } + } + Ok(()) + }) + } +} + +/* + alter model remove +*/ + +pub struct AlterModelRemoveTxn<'a>(PhantomData<&'a ()>); +#[derive(Debug, Clone, Copy)] +pub struct AlterModelRemoveTxnCommitPL<'a> { + space_id: super::SpaceIDRef<'a>, + model_id: ModelIDRef<'a>, + removed_fields: &'a [Ident<'a>], +} +pub struct AlterModelRemoveTxnMD { + space_id_meta: super::SpaceIDMD, + model_id_meta: ModelIDMD, + remove_field_c: u64, +} +pub struct AlterModelRemoveTxnRestorePL { + space_id: super::SpaceIDRes, + model_id: ModelIDRes, + removed_fields: Box<[Box]>, +} + +impl<'a> PersistObject for AlterModelRemoveTxn<'a> { + const METADATA_SIZE: usize = ::METADATA_SIZE + + ::METADATA_SIZE + + sizeof!(u64); + type InputType = AlterModelRemoveTxnCommitPL<'a>; + type OutputType = AlterModelRemoveTxnRestorePL; + type Metadata = AlterModelRemoveTxnMD; + fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { + scanner.has_left((md.space_id_meta.space_name_l + md.model_id_meta.model_name_l) as usize) + } + fn meta_enc(buf: &mut Vec, data: Self::InputType) { + ::meta_enc(buf, data.space_id); + ::meta_enc(buf, data.model_id); + buf.extend(data.removed_fields.len().u64_bytes_le()); + } + unsafe fn meta_dec(scanner: &mut BufferedScanner) -> SDSSResult { + let space_id_meta = ::meta_dec(scanner)?; + let model_id_meta = ::meta_dec(scanner)?; + Ok(AlterModelRemoveTxnMD { + space_id_meta, + model_id_meta, + remove_field_c: u64::from_le_bytes(scanner.next_chunk()), + }) + } + fn obj_enc(buf: &mut Vec, data: Self::InputType) { + ::obj_enc(buf, data.space_id); + ::obj_enc(buf, data.model_id); + for field in data.removed_fields { + buf.extend(field.len().u64_bytes_le()); + buf.extend(field.as_bytes()); + } + } + unsafe fn obj_dec(s: &mut BufferedScanner, md: Self::Metadata) -> SDSSResult { + let space_id = ::obj_dec(s, md.space_id_meta)?; + let model_id = ::obj_dec(s, md.model_id_meta)?; + let mut removed_fields = Vec::with_capacity(md.remove_field_c as usize); + while !s.eof() + & (removed_fields.len() as u64 != md.remove_field_c) + & s.has_left(sizeof!(u64)) + { + let len = u64::from_le_bytes(s.next_chunk()) as usize; + if !s.has_left(len) { + break; + } + removed_fields.push(inf::dec::utils::decode_string(s, len)?.into_boxed_str()); + } + if removed_fields.len() as u64 != md.remove_field_c { + return Err(SDSSError::InternalDecodeStructureCorruptedPayload); + } + Ok(AlterModelRemoveTxnRestorePL { + space_id, + model_id, + removed_fields: removed_fields.into_boxed_slice(), + }) + } +} + +impl<'a> GNSEvent for AlterModelRemoveTxn<'a> { + const OPC: u16 = 5; + type CommitType = AlterModelRemoveTxnCommitPL<'a>; + type RestoreType = AlterModelRemoveTxnRestorePL; + fn update_global_state( + AlterModelRemoveTxnRestorePL { + space_id, + model_id, + removed_fields, + }: Self::RestoreType, + gns: &GlobalNS, + ) -> crate::engine::txn::TransactionResult<()> { + with_model(gns, &space_id, &model_id, |model| { + let mut iwm = model.intent_write_model(); + let mut removed_fields_rb = vec![]; + for removed_field in removed_fields.iter() { + match iwm.fields_mut().st_delete_return(removed_field) { + Some(field) => { + removed_fields_rb.push((removed_field as &str, field)); + } + None => { + // rollback + removed_fields_rb.into_iter().for_each(|(field_id, field)| { + let _ = iwm.fields_mut().st_insert(field_id.into(), field); + }); + return Err(TransactionError::OnRestoreDataConflictMismatch); + } + } + } + Ok(()) + }) + } +} + +/* + alter model update +*/ + +pub struct AlterModelUpdateTxn<'a>(PhantomData<&'a ()>); +#[derive(Debug, Clone, Copy)] +pub struct AlterModelUpdateTxnCommitPL<'a> { + space_id: super::SpaceIDRef<'a>, + model_id: ModelIDRef<'a>, + updated_fields: &'a IndexST, Field>, +} +pub struct AlterModelUpdateTxnMD { + space_id_md: super::SpaceIDMD, + model_id_md: ModelIDMD, + updated_field_c: u64, +} +pub struct AlterModelUpdateTxnRestorePL { + space_id: super::SpaceIDRes, + model_id: ModelIDRes, + updated_fields: IndexST, Field>, +} + +impl<'a> PersistObject for AlterModelUpdateTxn<'a> { + const METADATA_SIZE: usize = ::METADATA_SIZE + + ::METADATA_SIZE + + sizeof!(u64); + type InputType = AlterModelUpdateTxnCommitPL<'a>; + type OutputType = AlterModelUpdateTxnRestorePL; + type Metadata = AlterModelUpdateTxnMD; + fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { + scanner + .has_left(md.space_id_md.space_name_l as usize + md.model_id_md.model_name_l as usize) + } + fn meta_enc(buf: &mut Vec, data: Self::InputType) { + ::meta_enc(buf, data.space_id); + ::meta_enc(buf, data.model_id); + buf.extend(data.updated_fields.st_len().u64_bytes_le()); + } + unsafe fn meta_dec(scanner: &mut BufferedScanner) -> SDSSResult { + let space_id_md = ::meta_dec(scanner)?; + let model_id_md = ::meta_dec(scanner)?; + Ok(AlterModelUpdateTxnMD { + space_id_md, + model_id_md, + updated_field_c: u64::from_le_bytes(scanner.next_chunk()), + }) + } + fn obj_enc(buf: &mut Vec, data: Self::InputType) { + ::obj_enc(buf, data.space_id); + ::obj_enc(buf, data.model_id); + as PersistObject>::obj_enc( + buf, + data.updated_fields, + ); + } + unsafe fn obj_dec(s: &mut BufferedScanner, md: Self::Metadata) -> SDSSResult { + let space_id = ::obj_dec(s, md.space_id_md)?; + let model_id = ::obj_dec(s, md.model_id_md)?; + let updated_fields = as PersistObject>::obj_dec( + s, + map::MapIndexSizeMD(md.updated_field_c as usize), + )?; + Ok(AlterModelUpdateTxnRestorePL { + space_id, + model_id, + updated_fields, + }) + } +} + +impl<'a> GNSEvent for AlterModelUpdateTxn<'a> { + const OPC: u16 = 6; + type CommitType = AlterModelUpdateTxnCommitPL<'a>; + type RestoreType = AlterModelUpdateTxnRestorePL; + fn update_global_state( + AlterModelUpdateTxnRestorePL { + space_id, + model_id, + updated_fields, + }: Self::RestoreType, + gns: &GlobalNS, + ) -> TransactionResult<()> { + with_model(gns, &space_id, &model_id, |model| { + let mut iwm = model.intent_write_model(); + let mut fields_rb = vec![]; + for (field_id, field) in updated_fields.iter() { + match iwm.fields_mut().st_update_return(field_id, field.clone()) { + Some(f) => fields_rb.push((field_id as &str, f)), + None => { + // rollback + fields_rb.into_iter().for_each(|(field_id, field)| { + let _ = iwm.fields_mut().st_update(field_id, field); + }); + return Err(TransactionError::OnRestoreDataConflictMismatch); + } + } + } + Ok(()) + }) + } +} + +/* + drop model +*/ + +pub struct DropModelTxn<'a>(PhantomData<&'a ()>); +#[derive(Debug, Clone, Copy)] +pub struct DropModelTxnCommitPL<'a> { + space_id: super::SpaceIDRef<'a>, + model_id: ModelIDRef<'a>, +} +pub struct DropModelTxnMD { + space_id_md: super::SpaceIDMD, + model_id_md: ModelIDMD, +} +pub struct DropModelTxnRestorePL { + space_id: super::SpaceIDRes, + model_id: ModelIDRes, +} +impl<'a> PersistObject for DropModelTxn<'a> { + const METADATA_SIZE: usize = ::METADATA_SIZE + + ::METADATA_SIZE; + type InputType = DropModelTxnCommitPL<'a>; + type OutputType = DropModelTxnRestorePL; + type Metadata = DropModelTxnMD; + fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { + scanner + .has_left(md.space_id_md.space_name_l as usize + md.model_id_md.model_name_l as usize) + } + fn meta_enc(buf: &mut Vec, data: Self::InputType) { + ::meta_enc(buf, data.space_id); + ::meta_enc(buf, data.model_id); + } + unsafe fn meta_dec(scanner: &mut BufferedScanner) -> SDSSResult { + let space_id_md = ::meta_dec(scanner)?; + let model_id_md = ::meta_dec(scanner)?; + Ok(DropModelTxnMD { + space_id_md, + model_id_md, + }) + } + fn obj_enc(buf: &mut Vec, data: Self::InputType) { + ::obj_enc(buf, data.space_id); + ::obj_enc(buf, data.model_id); + } + unsafe fn obj_dec(s: &mut BufferedScanner, md: Self::Metadata) -> SDSSResult { + let space_id = ::obj_dec(s, md.space_id_md)?; + let model_id = ::obj_dec(s, md.model_id_md)?; + Ok(DropModelTxnRestorePL { space_id, model_id }) + } +} + +impl<'a> GNSEvent for DropModelTxn<'a> { + const OPC: u16 = 7; + type CommitType = DropModelTxnCommitPL<'a>; + type RestoreType = DropModelTxnRestorePL; + fn update_global_state( + DropModelTxnRestorePL { space_id, model_id }: Self::RestoreType, + gns: &GlobalNS, + ) -> TransactionResult<()> { + with_space(gns, &space_id, |space| { + let mut wgns = space.models().write(); + match wgns.st_delete_if(&model_id.model_name, |mdl| { + mdl.get_uuid() == model_id.model_uuid + }) { + Some(true) => Ok(()), + Some(false) => return Err(TransactionError::OnRestoreDataConflictMismatch), + None => Err(TransactionError::OnRestoreDataMissing), + } + }) + } +} diff --git a/server/src/engine/txn/gns/space.rs b/server/src/engine/txn/gns/space.rs index 3febe374..865dcf9b 100644 --- a/server/src/engine/txn/gns/space.rs +++ b/server/src/engine/txn/gns/space.rs @@ -144,33 +144,30 @@ pub struct AlterSpaceTxn<'a>(PhantomData<&'a ()>); impl<'a> AlterSpaceTxn<'a> { pub const fn new_commit( - space_uuid: Uuid, - space_name: &'a str, + uuid: Uuid, + name: &'a str, space_meta: &'a DictGeneric, ) -> AlterSpaceTxnCommitPL<'a> { AlterSpaceTxnCommitPL { - space_uuid, - space_name, + space_id: super::SpaceIDRef { uuid, name }, space_meta, } } } pub struct AlterSpaceTxnMD { - uuid: Uuid, - space_name_l: u64, + space_id_meta: super::SpaceIDMD, dict_len: u64, } #[derive(Clone, Copy)] pub struct AlterSpaceTxnCommitPL<'a> { - space_uuid: Uuid, - space_name: &'a str, + space_id: super::SpaceIDRef<'a>, space_meta: &'a DictGeneric, } pub struct AlterSpaceTxnRestorePL { - space_name: Box, + space_id: super::SpaceIDRes, space_meta: DictGeneric, } @@ -180,33 +177,30 @@ impl<'a> PersistObject for AlterSpaceTxn<'a> { type OutputType = AlterSpaceTxnRestorePL; type Metadata = AlterSpaceTxnMD; fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { - scanner.has_left(md.space_name_l as usize) + scanner.has_left(md.space_id_meta.space_name_l as usize) } fn meta_enc(buf: &mut Vec, data: Self::InputType) { - buf.extend(data.space_uuid.to_le_bytes()); - buf.extend(data.space_name.len().u64_bytes_le()); + ::meta_enc(buf, data.space_id); buf.extend(data.space_meta.len().u64_bytes_le()); } unsafe fn meta_dec(scanner: &mut BufferedScanner) -> SDSSResult { Ok(AlterSpaceTxnMD { - uuid: Uuid::from_bytes(scanner.next_chunk()), - space_name_l: u64::from_le_bytes(scanner.next_chunk()), + space_id_meta: ::meta_dec(scanner)?, dict_len: u64::from_le_bytes(scanner.next_chunk()), }) } fn obj_enc(buf: &mut Vec, data: Self::InputType) { - buf.extend(data.space_name.as_bytes()); + ::obj_enc(buf, data.space_id); as PersistObject>::obj_enc(buf, data.space_meta); } unsafe fn obj_dec(s: &mut BufferedScanner, md: Self::Metadata) -> SDSSResult { - let space_name = - inf::dec::utils::decode_string(s, md.space_name_l as usize)?.into_boxed_str(); + let space_id = ::obj_dec(s, md.space_id_meta)?; let space_meta = as PersistObject>::obj_dec( s, map::MapIndexSizeMD(md.dict_len as usize), )?; Ok(AlterSpaceTxnRestorePL { - space_name, + space_id, space_meta, }) } @@ -221,13 +215,13 @@ impl<'a> GNSEvent for AlterSpaceTxn<'a> { fn update_global_state( AlterSpaceTxnRestorePL { - space_name, + space_id, space_meta, }: Self::RestoreType, gns: &crate::engine::core::GlobalNS, ) -> TransactionResult<()> { let gns = gns.spaces().read(); - match gns.st_get(&space_name) { + match gns.st_get(&space_id.name) { Some(space) => { let mut wmeta = space.metadata().env().write(); space_meta @@ -248,67 +242,50 @@ impl<'a> GNSEvent for AlterSpaceTxn<'a> { pub struct DropSpaceTxn<'a>(PhantomData<&'a ()>); impl<'a> DropSpaceTxn<'a> { - pub const fn new_commit(space_name: &'a str, uuid: Uuid) -> DropSpaceTxnCommitPL<'a> { - DropSpaceTxnCommitPL { space_name, uuid } + pub const fn new_commit(name: &'a str, uuid: Uuid) -> DropSpaceTxnCommitPL<'a> { + DropSpaceTxnCommitPL { + space_id: super::SpaceIDRef { uuid, name }, + } } } -pub struct DropSpaceTxnMD { - space_name_l: u64, - uuid: Uuid, -} #[derive(Clone, Copy)] pub struct DropSpaceTxnCommitPL<'a> { - space_name: &'a str, - uuid: Uuid, -} - -pub struct DropSpaceTxnRestorePL { - uuid: Uuid, - space_name: Box, + space_id: super::SpaceIDRef<'a>, } impl<'a> PersistObject for DropSpaceTxn<'a> { const METADATA_SIZE: usize = sizeof!(u128) + sizeof!(u64); type InputType = DropSpaceTxnCommitPL<'a>; - type OutputType = DropSpaceTxnRestorePL; - type Metadata = DropSpaceTxnMD; + type OutputType = super::SpaceIDRes; + type Metadata = super::SpaceIDMD; fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { scanner.has_left(md.space_name_l as usize) } fn meta_enc(buf: &mut Vec, data: Self::InputType) { - buf.extend(data.space_name.len().u64_bytes_le()); - buf.extend(data.uuid.to_le_bytes()); + ::meta_enc(buf, data.space_id); } unsafe fn meta_dec(scanner: &mut BufferedScanner) -> SDSSResult { - Ok(DropSpaceTxnMD { - space_name_l: u64::from_le_bytes(scanner.next_chunk()), - uuid: Uuid::from_bytes(scanner.next_chunk()), - }) + ::meta_dec(scanner) } fn obj_enc(buf: &mut Vec, data: Self::InputType) { - buf.extend(data.space_name.as_bytes()); + ::obj_enc(buf, data.space_id) } unsafe fn obj_dec(s: &mut BufferedScanner, md: Self::Metadata) -> SDSSResult { - let space_name = - inf::dec::utils::decode_string(s, md.space_name_l as usize)?.into_boxed_str(); - Ok(DropSpaceTxnRestorePL { - uuid: md.uuid, - space_name, - }) + ::obj_dec(s, md) } } impl<'a> GNSEvent for DropSpaceTxn<'a> { const OPC: u16 = 2; type CommitType = DropSpaceTxnCommitPL<'a>; - type RestoreType = DropSpaceTxnRestorePL; + type RestoreType = super::SpaceIDRes; fn update_global_state( - DropSpaceTxnRestorePL { uuid, space_name }: Self::RestoreType, + super::SpaceIDRes { uuid, name }: Self::RestoreType, gns: &GlobalNS, ) -> TransactionResult<()> { let mut wgns = gns.spaces().write(); - match wgns.entry(space_name) { + match wgns.entry(name) { std::collections::hash_map::Entry::Occupied(oe) => { if oe.get().get_uuid() == uuid { oe.remove_entry();