Decompose strong actions' snapshotting core

next
Sayan Nandan 3 years ago
parent b47fcc2c88
commit 62b58f1a9f

@ -25,6 +25,8 @@
*/
use crate::dbnet::connection::prelude::*;
use crate::kvengine::KVEngine;
use crate::kvengine::SingleEncoder;
use crate::util::compiler;
action! {
@ -36,39 +38,12 @@ action! {
err_if_len_is!(act, con, eq 0);
let kve = kve!(con, handle);
let key_encoder = kve.get_key_encoder();
let mut snapshots = Vec::with_capacity(act.len());
let mut err_enc = false;
let iter_stat_ok;
{
iter_stat_ok = act.as_ref().iter().all(|key| {
if compiler::likely(key_encoder.is_ok(key)) {
if let Some(snap) = kve.take_snapshot(key) {
snapshots.push(snap);
true
} else {
false
}
} else {
err_enc = true;
false
}
});
}
if iter_stat_ok {
{
let kve = kve;
let lowtable = kve.__get_inner_ref();
// nice, all keys exist; let's plonk 'em
act.zip(snapshots).for_each(|(key, snapshot)| {
// the check is very important: some thread may have updated the
// value after we snapshotted it. In that case, let this key
// be whatever the "newer" value is. Since our snapshot is a "happens-before"
// thing, this is absolutely fine
let _ = lowtable.remove_if(&key, |_, val| val.eq(&snapshot));
});
}
let (all_okay, enc_err) = {
self::snapshot_and_del(kve, key_encoder, act)
};
if all_okay {
conwrite!(con, groups::OKAY)?;
} else if compiler::unlikely(err_enc) {
} else if compiler::unlikely(enc_err) {
// the errors we love to hate: encoding error
conwrite!(con, groups::ENCODING_ERROR)?;
} else {
@ -78,3 +53,41 @@ action! {
Ok(())
}
}
/// Snapshot the current status and then delete maintaining concurrency
/// guarantees. `(all_okay, enc_err)`
fn snapshot_and_del(kve: &KVEngine, key_encoder: SingleEncoder, act: ActionIter) -> (bool, bool) {
let mut snapshots = Vec::with_capacity(act.len());
let mut err_enc = false;
let iter_stat_ok;
{
iter_stat_ok = act.as_ref().iter().all(|key| {
if compiler::likely(key_encoder.is_ok(key)) {
if let Some(snap) = kve.take_snapshot(key) {
snapshots.push(snap);
true
} else {
false
}
} else {
err_enc = true;
false
}
});
}
if iter_stat_ok {
// nice, all keys exist; let's plonk 'em
let kve = kve;
let lowtable = kve.__get_inner_ref();
act.zip(snapshots).for_each(|(key, snapshot)| {
// the check is very important: some thread may have updated the
// value after we snapshotted it. In that case, let this key
// be whatever the "newer" value is. Since our snapshot is a "happens-before"
// thing, this is absolutely fine
let _ = lowtable.remove_if(&key, |_, val| val.eq(&snapshot));
});
(true, false)
} else {
(iter_stat_ok, err_enc)
}
}

@ -26,6 +26,8 @@
use crate::corestore::Data;
use crate::dbnet::connection::prelude::*;
use crate::kvengine::DoubleEncoder;
use crate::kvengine::KVEngine;
use crate::util::compiler;
action! {
@ -40,36 +42,10 @@ action! {
}
let kve = kve!(con, handle);
let encoder = kve.get_encoder();
let mut enc_err = false;
let lowtable = kve.__get_inner_ref();
let key_iter_stat_ok;
{
key_iter_stat_ok = act.as_ref().chunks_exact(2).all(|kv| {
unsafe {
let key = kv.get_unchecked(0);
let value = kv.get_unchecked(1);
if compiler::likely(encoder.is_ok(key, value)) {
lowtable.get(key).is_none()
} else {
enc_err = true;
false
}
}
});
}
if key_iter_stat_ok {
{
let _kve = kve;
let lowtable = lowtable;
// fine, the keys were non-existent when we looked at them
while let (Some(key), Some(value)) = (act.next(), act.next()) {
if let Some(fresh) = lowtable.fresh_entry(Data::from(key)) {
fresh.insert(Data::from(value));
}
// we don't care if some other thread initialized the value we checked
// it. We expected a fresh entry, so that's what we'll check and use
}
}
let (all_okay, enc_err) = {
self::snapshot_and_insert(kve, encoder, act)
};
if all_okay {
conwrite!(con, groups::OKAY)?;
} else if compiler::unlikely(enc_err) {
conwrite!(con, groups::ENCODING_ERROR)?;
@ -79,3 +55,43 @@ action! {
Ok(())
}
}
/// Take a consistent snapshot of the database at this current point in time
/// and then mutate the entries, respecting concurrency guarantees
/// `(all_okay, enc_err)`
fn snapshot_and_insert(
kve: &KVEngine,
encoder: DoubleEncoder,
mut act: ActionIter,
) -> (bool, bool) {
let mut enc_err = false;
let lowtable = kve.__get_inner_ref();
let key_iter_stat_ok;
{
key_iter_stat_ok = act.as_ref().chunks_exact(2).all(|kv| unsafe {
let key = kv.get_unchecked(0);
let value = kv.get_unchecked(1);
if compiler::likely(encoder.is_ok(key, value)) {
lowtable.get(key).is_none()
} else {
enc_err = true;
false
}
});
}
if key_iter_stat_ok {
let _kve = kve;
let lowtable = lowtable;
// fine, the keys were non-existent when we looked at them
while let (Some(key), Some(value)) = (act.next(), act.next()) {
if let Some(fresh) = lowtable.fresh_entry(Data::from(key)) {
fresh.insert(Data::from(value));
}
// we don't care if some other thread initialized the value we checked
// it. We expected a fresh entry, so that's what we'll check and use
}
(true, false)
} else {
(key_iter_stat_ok, enc_err)
}
}

@ -26,6 +26,8 @@
use crate::corestore::Data;
use crate::dbnet::connection::prelude::*;
use crate::kvengine::DoubleEncoder;
use crate::kvengine::KVEngine;
use crate::util::compiler;
action! {
@ -40,49 +42,12 @@ action! {
}
let kve = kve!(con, handle);
let encoder = kve.get_encoder();
let mut err_enc = false;
let mut snapshots = Vec::with_capacity(act.len());
let iter_stat_ok;
{
// snapshot the values at this point in time
iter_stat_ok = act.as_ref().chunks_exact(2).all(|kv| {
unsafe {
let key = kv.get_unchecked(0);
let value = kv.get_unchecked(1);
if compiler::likely(encoder.is_ok(key, value)) {
if let Some(snapshot) = kve.take_snapshot(key) {
snapshots.push(snapshot);
true
} else {
false
}
} else {
err_enc = true;
false
}
}
});
}
if iter_stat_ok {
{
let kve = kve;
// good, so all the values existed when we snapshotted them; let's update 'em
let mut snap_cc = snapshots.into_iter();
let lowtable = kve.__get_inner_ref();
while let (Some(key), Some(value), Some(snapshot)) = (act.next(), act.next(), snap_cc.next()) {
// When we snapshotted, we looked at `snapshot`. If the value is still the
// same, then we'll update it. Otherwise, let it be
if let Some(mut mutable) = lowtable.mut_entry(Data::from(key)) {
if mutable.get().eq(&snapshot) {
mutable.insert(Data::from(value));
} else{
drop(mutable);
}
}
}
}
let (all_okay, enc_err) = {
self::snapshot_and_update(kve, encoder, act)
};
if all_okay {
conwrite!(con, groups::OKAY)?;
} else if compiler::unlikely(err_enc) {
} else if compiler::unlikely(enc_err) {
conwrite!(con, groups::ENCODING_ERROR)?;
} else {
conwrite!(con, groups::NIL)?;
@ -90,3 +55,56 @@ action! {
Ok(())
}
}
/// Take a consistent snapshot of the database at this point in time. Once snapshotting
/// completes, mutate the entries in place while keeping up with isolation guarantees
/// `(all_okay, enc_err)`
fn snapshot_and_update(
kve: &KVEngine,
encoder: DoubleEncoder,
mut act: ActionIter,
) -> (bool, bool) {
let mut err_enc = false;
let mut snapshots = Vec::with_capacity(act.len());
let iter_stat_ok;
{
// snapshot the values at this point in time
iter_stat_ok = act.as_ref().chunks_exact(2).all(|kv| unsafe {
let key = kv.get_unchecked(0);
let value = kv.get_unchecked(1);
if compiler::likely(encoder.is_ok(key, value)) {
if let Some(snapshot) = kve.take_snapshot(key) {
snapshots.push(snapshot);
true
} else {
false
}
} else {
err_enc = true;
false
}
});
}
if iter_stat_ok {
let kve = kve;
// good, so all the values existed when we snapshotted them; let's update 'em
let mut snap_cc = snapshots.into_iter();
let lowtable = kve.__get_inner_ref();
while let (Some(key), Some(value), Some(snapshot)) =
(act.next(), act.next(), snap_cc.next())
{
// When we snapshotted, we looked at `snapshot`. If the value is still the
// same, then we'll update it. Otherwise, let it be
if let Some(mut mutable) = lowtable.mut_entry(Data::from(key)) {
if mutable.get().eq(&snapshot) {
mutable.insert(Data::from(value));
} else {
drop(mutable);
}
}
}
(true, false)
} else {
(iter_stat_ok, err_enc)
}
}

Loading…
Cancel
Save