Ensure PID locks prevent corruption

next
Sayan Nandan 10 months ago
parent f923a4fc39
commit 3cdd814067
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

1
Cargo.lock generated

@ -1244,7 +1244,6 @@ name = "skyd"
version = "0.8.0"
dependencies = [
"bytes",
"cc",
"crc",
"crossbeam-epoch",
"env_logger",

@ -1,227 +0,0 @@
#!/usr/bin/perl -w
=pod
All credits for the random unicode string generation logic go to Paul Sarena who released
the original version here: https://github.com/bits/UTF-8-Unicode-Test-Documents and released
it under the BSD 3-Clause "New" or "Revised" License
=cut
use strict;
use warnings qw( FATAL utf8 );
use utf8; # tell Perl parser there are non-ASCII characters in this lexical scope
use open qw( :encoding(UTF-8) :std ); # Declare that anything that opens a filehandles within this lexical scope is to assume that that stream is encoded in UTF-8 unless you tell it otherwise
use Encode;
use HTML::Entities;
my $html_pre = q|<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en">
<head>
<title>UTF-8 Codepoint Sequence</title>
</head>
<body>|;
my $html_post = q|</body>
</html>|;
my $output_directory = './utf8/';
my $utf8_seq;
# 0000FFFF Plane 0: Basic Multilingual Plane
# 100001FFFF Plane 1: Supplementary Multilingual Plane
# 200002FFFF Plane 2: Supplementary Ideographic Plane
# 30000DFFFF Planes 313: Unassigned
# E0000EFFFF Plane 14: Supplement­ary Special-purpose Plane
# F000010FFFF Planes 1516: Supplement­ary Private Use Area
foreach my $separator ('', ' ') {
foreach my $end (0xFF, 0xFFF, 0xFFFF, 0x1FFFF, 0x2FFFF, 0x10FFFF) {
# UTF-8 codepoint sequence of assigned, printable codepoints
$utf8_seq = gen_seq({
start => 0x00,
end => $end,
separator => $separator,
skip_unprintable => 1,
replace_unprintable => 1,
skip_unassigned => 1,
writefiles => ($separator ? 'txt,html' : 'txt')
});
# UTF-8 codepoint sequence of assigned, printable and unprintable codepoints as-is
$utf8_seq = gen_seq({
start => 0x00,
end => $end,
separator => $separator,
skip_unprintable => 0,
replace_unprintable => 0,
skip_unassigned => 1,
writefiles => ($separator ? 'txt,html' : 'txt')
});
# UTF-8 codepoint sequence of assigned, printable and unprintable codepoints replaced
$utf8_seq = gen_seq({
start => 0x00,
end => $end,
separator => $separator,
skip_unprintable => 0,
replace_unprintable => 1,
skip_unassigned => 1,
writefiles => ($separator ? 'txt,html' : 'txt')
});
# UTF-8 codepoint sequence of assinged and unassigned, printable and unprintable codepoints as-is
$utf8_seq = gen_seq({
start => 0x00,
end => $end,
separator => $separator,
skip_unprintable => 0,
replace_unprintable => 0,
skip_unassigned => 0,
writefiles => ($separator ? 'txt,html' : 'txt')
});
# UTF-8 codepoint sequence of assinged and unassigned, printable and unprintable codepoints replaced
$utf8_seq = gen_seq({
start => 0x00,
end => $end,
separator => $separator,
skip_unprintable => 0,
replace_unprintable => 1,
skip_unassigned => 0,
writefiles => ($separator ? 'txt,html' : 'txt')
});
}
}
# print Encode::encode('UTF-8', $utf8_seq), "\n";
sub gen_seq{
my $config = shift;
$config->{start} = 0x00 unless defined $config->{start};
$config->{end} = 0x10FFFF unless defined $config->{end};
$config->{skip_unassigned} = 1 unless defined $config->{skip_unassigned};
$config->{skip_unprintable} = 1 unless defined $config->{skip_unprintable};
$config->{replace_unprintable} = 1 unless defined $config->{replace_unprintable};
$config->{separator} = ' ' unless defined $config->{separator};
$config->{newlines_every} = 50 unless defined $config->{newlines_every};
$config->{writefiles} = 'text,html' unless defined $config->{writefiles};
my $utf8_seq;
my $codepoints_this_line = 0;
my $codepoints_printed = 0;
for my $i ($config->{start} .. $config->{end}) {
next if ($i >= 0xD800 && $i <= 0xDFFF); # high and low surrogate halves used by UTF-16 (U+D800 through U+DFFF) are not legal Unicode values, and the UTF-8 encoding of them is an invalid byte sequence
next if ($i >= 0xFDD0 && $i <= 0xFDEF); # Non-characters
next if ( # Non-characters
$i == 0xFFFE || $i == 0xFFFF ||
$i == 0x1FFFE || $i == 0x1FFFF ||
$i == 0x2FFFE || $i == 0x2FFFF ||
$i == 0x3FFFE || $i == 0x3FFFF ||
$i == 0x4FFFE || $i == 0x4FFFF ||
$i == 0x5FFFE || $i == 0x5FFFF ||
$i == 0x6FFFE || $i == 0x6FFFF ||
$i == 0x7FFFE || $i == 0x7FFFF ||
$i == 0x8FFFE || $i == 0x8FFFF ||
$i == 0x9FFFE || $i == 0x9FFFF ||
$i == 0xaFFFE || $i == 0xAFFFF ||
$i == 0xbFFFE || $i == 0xBFFFF ||
$i == 0xcFFFE || $i == 0xCFFFF ||
$i == 0xdFFFE || $i == 0xDFFFF ||
$i == 0xeFFFE || $i == 0xEFFFF ||
$i == 0xfFFFE || $i == 0xFFFFF ||
$i == 0x10FFFE || $i == 0x10FFFF
);
my $codepoint = chr($i);
# skip unassiggned codepoints
next if $config->{skip_unassigned} && $codepoint !~ /^\p{Assigned}/o;
if ( $codepoint =~ /^\p{IsPrint}/o ) {
$utf8_seq .= $codepoint;
} else { # not printable
next if $config->{skip_unprintable};
# include unprintable or replace it
$utf8_seq .= $config->{replace_unprintable} ? '<27>' : $codepoint;
}
$codepoints_printed++;
if ($config->{separator}) {
if ($config->{newlines_every} && $codepoints_this_line++ == $config->{newlines_every}) {
$utf8_seq .= "\n";
$codepoints_this_line = 0;
} else {
$utf8_seq .= $config->{separator};
}
}
}
utf8::upgrade($utf8_seq);
if ($config->{writefiles}) {
my $filebasename = 'utf8_sequence_' .
(sprintf '%#x', $config->{start}) .
'-' .
(sprintf '%#x', $config->{end}) .
($config->{skip_unassigned} ? '_assigned' : '_including-unassigned') .
($config->{skip_unprintable} ? '_printable' : '_including-unprintable') .
(!$config->{skip_unprintable} ?
($config->{replace_unprintable} ? '-replaced' : '-asis') :
''
) .
($config->{separator} ?
($config->{newlines_every} ? '' : '_without-newlines') :
'_unseparated'
);
my $title = 'UTF-8 codepoint sequence' .
($config->{skip_unassigned} ? ' of assigned' : ' of assinged and unassigned') .
($config->{skip_unprintable} ? ', printable' : ', with unprintable') .
(!$config->{skip_unprintable} ?
($config->{replace_unprintable} ? ' codepoints replaced' : ' codepoints as-is') :
' codepoints'
) .
' in the range ' .
(sprintf '%#x', $config->{start}) .
'-' .
(sprintf '%#x', $config->{end}) .
($config->{newlines_every} ? '' : ', as a long string without newlines');
my $html_pre_custom = $html_pre;
$html_pre_custom =~ s|UTF\-8 codepoint sequence|$title|;
my $filename = ${output_directory} . ($config->{separator} ? '' : 'un') . 'separated/' . ${filebasename};
if ($config->{writefiles} =~ /te?xt/) {
open FH, ">${filename}.txt" or die "cannot open $filename: $!";
print FH $utf8_seq;
close FH;
}
if ($config->{writefiles} =~ /html/) {
open FH, ">${filename}_unescaped.html" or die "cannot open $filename: $!";
print FH $html_pre_custom, $utf8_seq, $html_post;
close FH;
}
# open FH, ">${output_directory}${filebasename}_escaped.html";
# print FH $html_pre_custom, HTML::Entities::encode_entities($utf8_seq), $html_post;
# close FH;
print "Output $title ($codepoints_printed codepoints)\n";
}
return $utf8_seq;
}

@ -1,6 +1,5 @@
[package]
authors = ["Sayan Nandan <ohsayan@outlook.com>"]
build = "build.rs"
edition = "2021"
name = "skyd"
version = "0.8.0"
@ -35,10 +34,6 @@ winapi = { version = "0.3.9", features = ["fileapi"] }
# external deps
libc = "0.2.147"
[target.'cfg(unix)'.build-dependencies]
# external deps
cc = "1.0.82"
[dev-dependencies]
# external deps
rand = "0.8.5"

@ -1,9 +0,0 @@
fn main() {
#[cfg(unix)]
{
println!("cargo:rerun-if-changed=native/flock-posix.c");
cc::Build::new()
.file("native/flock-posix.c")
.compile("libflock-posix.a");
}
}

@ -1,60 +0,0 @@
/*
* Created on Fri Aug 07 2020
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2020, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
#include <errno.h>
#include <sys/file.h>
/* Acquire an exclusive lock for a file with the given descriptor */
int lock_exclusive(int descriptor) {
if (descriptor < 0) {
return EBADF;
}
if (flock(descriptor, LOCK_EX) == -1) {
return errno;
}
return 0;
}
int try_lock_exclusive(int descriptor) {
if (descriptor < 0) {
return EBADF;
}
if (flock(descriptor, LOCK_EX | LOCK_NB) == -1) {
return errno;
}
return 0;
}
/* Unlock a file with the given descriptor */
int unlock(int descriptor) {
if (descriptor < 0) {
return EBADF;
}
if (flock(descriptor, LOCK_UN) == -1) {
return errno;
}
return 0;
}

@ -137,7 +137,7 @@ async fn run_blocking_stmt(
let d_s = (drop & Token![space].eq(a) & last_id) as u8 * 6;
let d_m = (drop & Token![model].eq(a) & last_id) as u8 * 7;
let fc = sysctl as u8 | c_s | c_m | a_s | a_m | d_s | d_m;
state.cursor_ahead();
state.cursor_ahead_if(!sysctl);
static BLK_EXEC: [fn(Global, &ClientLocalState, RawSlice<Token<'static>>) -> QueryResult<()>;
8] = [
|_, _, _| Err(QueryError::QLUnknownStatement), // unknown
@ -174,7 +174,7 @@ fn blocking_exec_sysctl(
/*
currently supported: sysctl create user, sysctl drop user
*/
if state.remaining() != 2 {
if state.remaining() < 2 {
return Err(QueryError::QLInvalidSyntax);
}
let (a, b) = (state.fw_read(), state.fw_read());

@ -154,6 +154,16 @@ impl<Fs: RawFSInterface> SystemStore<Fs> {
_fs: PhantomData,
}
}
fn _try_sync_or(&self, auth: &mut SysAuth, rb: impl FnOnce(&mut SysAuth)) -> QueryResult<()> {
match self.sync_db(auth) {
Ok(()) => Ok(()),
Err(e) => {
error!("failed to sync system store: {e}");
rb(auth);
Err(e.into())
}
}
}
/// Create a new user with the given details
pub fn create_new_user(&self, username: String, password: String) -> QueryResult<()> {
// TODO(@ohsayan): we want to be very careful with this
@ -166,10 +176,9 @@ impl<Fs: RawFSInterface> SystemStore<Fs> {
.unwrap()
.into_boxed_slice(),
));
self.sync_db_or_rollback(|| {
self._try_sync_or(&mut auth, |auth| {
auth.users.remove(_username.as_str());
})?;
Ok(())
})
}
Entry::Occupied(_) => Err(QueryError::SysAuthError),
}
@ -181,12 +190,9 @@ impl<Fs: RawFSInterface> SystemStore<Fs> {
return Err(QueryError::SysAuthError);
}
match auth.users.remove_entry(username) {
Some((username, user)) => {
self.sync_db_or_rollback(|| {
Some((username, user)) => self._try_sync_or(&mut auth, |auth| {
let _ = auth.users.insert(username, user);
})?;
Ok(())
}
}),
None => Err(QueryError::SysAuthError),
}
}

@ -38,7 +38,7 @@ use crate::engine::{
fn parse<'a, Qd: QueryData<'a>>(state: &mut State<'a, Qd>) -> QueryResult<UserMeta<'a>> {
/*
user add [username] with { password: [password], ... }
[username] with { password: [password], ... }
^cursor
7 tokens
*/
@ -61,7 +61,7 @@ fn parse<'a, Qd: QueryData<'a>>(state: &mut State<'a, Qd>) -> QueryResult<UserMe
token_buffer[0].uck_read_lit()
};
state.poison_if_not(maybe_username.kind().tag_class() == TagClass::Str);
if state.not_exhausted() & !state.okay() {
if state.not_exhausted() | !state.okay() {
// we shouldn't have more tokens
return Err(QueryError::QLInvalidSyntax);
}

@ -82,17 +82,8 @@ impl<Fs: RawFSInterface> SystemStore<Fs> {
) -> RuntimeResult<(Self, SystemStoreInitState)> {
Self::open_with_name(Self::SYSDB_PATH, Self::SYSDB_COW_PATH, auth, run_mode)
}
pub fn sync_db_or_rollback(&self, rb: impl FnOnce()) -> RuntimeResult<()> {
match self.sync_db() {
Ok(()) => Ok(()),
Err(e) => {
rb();
Err(e)
}
}
}
pub fn sync_db(&self) -> RuntimeResult<()> {
self._sync_with(Self::SYSDB_PATH, Self::SYSDB_COW_PATH)
pub fn sync_db(&self, auth: &SysAuth) -> RuntimeResult<()> {
self._sync_with(Self::SYSDB_PATH, Self::SYSDB_COW_PATH, auth)
}
pub fn open_with_name(
sysdb_name: &str,
@ -103,7 +94,7 @@ impl<Fs: RawFSInterface> SystemStore<Fs> {
match SDSSFileIO::open_or_create_perm_rw::<spec::SysDBV1>(sysdb_name)? {
FileOpen::Created(new) => {
let me = Self::_new(SysConfig::new_auth(auth, run_mode));
me._sync(new)?;
me._sync(new, &me.system_store().auth_data().read())?;
Ok((me, SystemStoreInitState::Created))
}
FileOpen::Existing((ex, _)) => {
@ -114,7 +105,7 @@ impl<Fs: RawFSInterface> SystemStore<Fs> {
}
impl<Fs: RawFSInterface> SystemStore<Fs> {
fn _sync(&self, mut f: SDSSFileIO<Fs>) -> RuntimeResult<()> {
fn _sync(&self, mut f: SDSSFileIO<Fs>, auth: &SysAuth) -> RuntimeResult<()> {
let cfg = self.system_store();
// prepare our flat file
let mut map: DictGeneric = into_dict!(
@ -125,7 +116,6 @@ impl<Fs: RawFSInterface> SystemStore<Fs> {
Self::SYS_KEY_AUTH => DictGeneric::new(),
);
let auth_key = map.get_mut(Self::SYS_KEY_AUTH).unwrap();
let auth = cfg.auth_data().read();
let auth_key = auth_key.as_dict_mut().unwrap();
auth_key.insert(
Self::SYS_KEY_AUTH_USERS.into(),
@ -148,9 +138,9 @@ impl<Fs: RawFSInterface> SystemStore<Fs> {
let buf = super::inf::enc::enc_dict_full::<super::inf::map::GenericDictSpec>(&map);
f.fsynced_write(&buf)
}
fn _sync_with(&self, target: &str, cow: &str) -> RuntimeResult<()> {
fn _sync_with(&self, target: &str, cow: &str, auth: &SysAuth) -> RuntimeResult<()> {
let f = SDSSFileIO::create::<spec::SysDBV1>(cow)?;
self._sync(f)?;
self._sync(f, auth)?;
Fs::fs_rename_file(cow, target)
}
fn restore_and_sync(
@ -185,7 +175,7 @@ impl<Fs: RawFSInterface> SystemStore<Fs> {
);
let slf = Self::_new(new_syscfg);
// now sync
slf._sync_with(fname, fcow_name)?;
slf._sync_with(fname, fcow_name, &slf.system_store().auth_data().read())?;
Ok((slf, state))
}
fn _restore(mut f: SDSSFileIO<Fs>, run_mode: ConfigMode) -> RuntimeResult<SysConfig> {

@ -61,6 +61,7 @@ const TEXT: &str = "
";
type IoResult<T> = std::io::Result<T>;
const SKY_PID_FILE: &str = ".sky_pid";
fn main() {
Builder::new()
@ -68,12 +69,14 @@ fn main() {
.init();
println!("{TEXT}\nSkytable v{VERSION} | {URL}\n");
let run = || {
engine::set_context_init("locking PID file");
let pid_file = util::os::FileLock::new(SKY_PID_FILE)?;
let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name("server")
.enable_all()
.build()
.unwrap();
runtime.block_on(async move {
let g = runtime.block_on(async move {
engine::set_context_init("binding system signals");
let signal = util::os::TerminationSignal::init()?;
let (config, global) = tokio::task::spawn_blocking(|| engine::load_all())
@ -82,12 +85,14 @@ fn main() {
let g = global.global.clone();
engine::start(signal, config, global).await?;
engine::RuntimeResult::Ok(g)
})
})?;
engine::RuntimeResult::Ok((pid_file, g))
};
match run() {
Ok(g) => {
Ok((_, g)) => {
info!("completing cleanup before exit");
engine::finish(g);
std::fs::remove_file(SKY_PID_FILE).expect("failed to remove PID file");
println!("Goodbye!");
}
Err(e) => {

@ -27,8 +27,6 @@
//! Dark compiler arts and hackery to defy the normal. Use at your own
//! risk
use core::mem;
#[cold]
#[inline(never)]
pub const fn cold() {}
@ -67,18 +65,6 @@ pub const fn hot<T>(v: T) -> T {
v
}
/// # Safety
/// The caller is responsible for ensuring lifetime validity
pub const unsafe fn extend_lifetime<'a, 'b, T>(inp: &'a T) -> &'b T {
mem::transmute(inp)
}
/// # Safety
/// The caller is responsible for ensuring lifetime validity
pub unsafe fn extend_lifetime_mut<'a, 'b, T>(inp: &'a mut T) -> &'b mut T {
mem::transmute(inp)
}
#[cold]
#[inline(never)]
pub fn cold_rerr<T, E>(e: E) -> Result<T, E> {

@ -48,39 +48,39 @@ impl FileLock {
{
use winapi::um::{
fileapi::LockFileEx,
minwinbase::{LOCKFILE_EXCLUSIVE_LOCK, OVERLAPPED},
minwinbase::{LOCKFILE_EXCLUSIVE_LOCK, LOCKFILE_FAIL_IMMEDIATELY, OVERLAPPED},
winnt::HANDLE,
};
let handle = file.as_raw_handle();
let mut overlapped = OVERLAPPED::default();
let result = unsafe {
LockFileEx(
handle as HANDLE,
LOCKFILE_EXCLUSIVE_LOCK,
LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY,
0,
u32::MAX,
u32::MAX,
&mut overlapped,
)
};
if result == 0 {
return Err(io::Error::last_os_error());
return Err(io::Error::new(
io::ErrorKind::AlreadyExists,
"file is already locked",
));
}
return Ok(Self { file, handle });
}
#[cfg(unix)]
{
use libc::{flock, LOCK_EX};
let result = unsafe { flock(file.as_raw_fd(), LOCK_EX) };
use libc::{flock, LOCK_EX, LOCK_NB};
let result = unsafe { flock(file.as_raw_fd(), LOCK_EX | LOCK_NB) };
if result != 0 {
return Err(io::Error::last_os_error());
return Err(io::Error::new(
io::ErrorKind::AlreadyExists,
"file is already locked",
));
}
return Ok(Self { file });
}
}

@ -24,19 +24,17 @@
*
*/
use std::io::Read;
use rand::seq::SliceRandom;
use {
rand::{
distributions::{uniform::SampleUniform, Alphanumeric},
rngs::ThreadRng,
seq::SliceRandom,
Rng,
},
std::{
collections::hash_map::RandomState,
hash::{BuildHasher, Hash, Hasher},
io::Read,
},
};

Loading…
Cancel
Save