|
|
|
@ -54,7 +54,7 @@ use {
|
|
|
|
|
sq definition
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
#[derive(Debug, PartialEq)]
|
|
|
|
|
pub struct SQuery<'a> {
|
|
|
|
|
buf: &'a [u8],
|
|
|
|
|
q_window: usize,
|
|
|
|
@ -76,7 +76,9 @@ impl<'a> SQuery<'a> {
|
|
|
|
|
scanint
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
fn scan_usize_guaranteed_termination(scanner: &mut BufferedScanner) -> Result<usize, ()> {
|
|
|
|
|
fn scan_usize_guaranteed_termination(
|
|
|
|
|
scanner: &mut BufferedScanner,
|
|
|
|
|
) -> Result<usize, ExchangeError> {
|
|
|
|
|
let mut ret = 0usize;
|
|
|
|
|
let mut stop = scanner.rounded_eq(b'\n');
|
|
|
|
|
while !scanner.eof() & !stop {
|
|
|
|
@ -89,7 +91,7 @@ fn scan_usize_guaranteed_termination(scanner: &mut BufferedScanner) -> Result<us
|
|
|
|
|
.map(|int| int.checked_add((next_byte & 0x0f) as usize))
|
|
|
|
|
{
|
|
|
|
|
Some(Some(int)) if next_byte.is_ascii_digit() => ret = int,
|
|
|
|
|
_ => return Err(()),
|
|
|
|
|
_ => return Err(ExchangeError::NotAsciiByteOrOverflow),
|
|
|
|
|
}
|
|
|
|
|
stop = scanner.rounded_eq(b'\n');
|
|
|
|
|
}
|
|
|
|
@ -100,11 +102,11 @@ fn scan_usize_guaranteed_termination(scanner: &mut BufferedScanner) -> Result<us
|
|
|
|
|
if stop {
|
|
|
|
|
Ok(ret)
|
|
|
|
|
} else {
|
|
|
|
|
Err(())
|
|
|
|
|
Err(ExchangeError::UnterminatedInteger)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Clone, Copy)]
|
|
|
|
|
#[derive(Clone, Copy, PartialEq)]
|
|
|
|
|
struct Usize {
|
|
|
|
|
v: isize,
|
|
|
|
|
}
|
|
|
|
@ -181,14 +183,14 @@ impl Usize {
|
|
|
|
|
states
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
#[derive(Debug, PartialEq)]
|
|
|
|
|
pub enum ExchangeState {
|
|
|
|
|
Initial,
|
|
|
|
|
Simple(SQState),
|
|
|
|
|
Pipeline(PipeState),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
#[derive(Debug, PartialEq)]
|
|
|
|
|
pub struct SQState {
|
|
|
|
|
packet_s: Usize,
|
|
|
|
|
}
|
|
|
|
@ -199,7 +201,7 @@ impl SQState {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
#[derive(Debug, PartialEq)]
|
|
|
|
|
pub struct PipeState {
|
|
|
|
|
packet_s: Usize,
|
|
|
|
|
}
|
|
|
|
@ -216,12 +218,22 @@ impl Default for ExchangeState {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Debug, PartialEq)]
|
|
|
|
|
pub enum ExchangeResult<'a> {
|
|
|
|
|
NewState(ExchangeState),
|
|
|
|
|
Simple(SQuery<'a>),
|
|
|
|
|
Pipeline(Pipeline<'a>),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Debug, PartialEq, Clone, Copy)]
|
|
|
|
|
#[repr(u8)]
|
|
|
|
|
pub enum ExchangeError {
|
|
|
|
|
UnknownFirstByte,
|
|
|
|
|
NotAsciiByteOrOverflow,
|
|
|
|
|
UnterminatedInteger,
|
|
|
|
|
IncorrectQuerySizeOrMoreBytes,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub struct Exchange<'a> {
|
|
|
|
|
scanner: BufferedScanner<'a>,
|
|
|
|
|
}
|
|
|
|
@ -234,13 +246,16 @@ impl<'a> Exchange<'a> {
|
|
|
|
|
pub fn try_complete(
|
|
|
|
|
scanner: BufferedScanner<'a>,
|
|
|
|
|
state: ExchangeState,
|
|
|
|
|
) -> Result<(ExchangeResult, usize), ()> {
|
|
|
|
|
) -> Result<(ExchangeResult, usize), ExchangeError> {
|
|
|
|
|
Self::new(scanner).complete(state)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<'a> Exchange<'a> {
|
|
|
|
|
fn complete(mut self, state: ExchangeState) -> Result<(ExchangeResult<'a>, usize), ()> {
|
|
|
|
|
fn complete(
|
|
|
|
|
mut self,
|
|
|
|
|
state: ExchangeState,
|
|
|
|
|
) -> Result<(ExchangeResult<'a>, usize), ExchangeError> {
|
|
|
|
|
match state {
|
|
|
|
|
ExchangeState::Initial => {
|
|
|
|
|
if compiler::likely(self.scanner.has_left(Self::MIN_Q_SIZE)) {
|
|
|
|
@ -251,7 +266,7 @@ impl<'a> Exchange<'a> {
|
|
|
|
|
match first_byte {
|
|
|
|
|
b'S' => self.process_simple(SQState::new(Usize::new_unflagged(0))),
|
|
|
|
|
b'P' => self.process_pipe(PipeState::new(Usize::new_unflagged(0))),
|
|
|
|
|
_ => return Err(()),
|
|
|
|
|
_ => return Err(ExchangeError::UnknownFirstByte),
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
Ok(ExchangeResult::NewState(state))
|
|
|
|
@ -262,29 +277,38 @@ impl<'a> Exchange<'a> {
|
|
|
|
|
}
|
|
|
|
|
.map(|ret| (ret, self.scanner.cursor()))
|
|
|
|
|
}
|
|
|
|
|
fn process_simple(&mut self, mut sq_state: SQState) -> Result<ExchangeResult<'a>, ()> {
|
|
|
|
|
fn process_simple(
|
|
|
|
|
&mut self,
|
|
|
|
|
mut sq_state: SQState,
|
|
|
|
|
) -> Result<ExchangeResult<'a>, ExchangeError> {
|
|
|
|
|
// try to complete the packet size if needed
|
|
|
|
|
sq_state.packet_s.update_scanned(&mut self.scanner)?;
|
|
|
|
|
if sq_state.packet_s.flag() & self.scanner.remaining_size_is(sq_state.packet_s.int()) {
|
|
|
|
|
sq_state
|
|
|
|
|
.packet_s
|
|
|
|
|
.update_scanned(&mut self.scanner)
|
|
|
|
|
.map_err(|_| ExchangeError::NotAsciiByteOrOverflow)?;
|
|
|
|
|
if sq_state.packet_s.flag() & self.scanner.has_left(sq_state.packet_s.int()) {
|
|
|
|
|
// we have the full packet size and the required data
|
|
|
|
|
let q_window = scan_usize_guaranteed_termination(&mut self.scanner)?;
|
|
|
|
|
let nonzero = (q_window != 0) & (sq_state.packet_s.int() != 0);
|
|
|
|
|
if compiler::likely(self.scanner.remaining_size_is(q_window) & nonzero) {
|
|
|
|
|
if compiler::likely(self.scanner.remaining_size_is(sq_state.packet_s.int()) & nonzero) {
|
|
|
|
|
// this check is important because the client might have given us an incorrect q size
|
|
|
|
|
Ok(ExchangeResult::Simple(SQuery::new(
|
|
|
|
|
self.scanner.current_buffer(),
|
|
|
|
|
q_window,
|
|
|
|
|
)))
|
|
|
|
|
} else {
|
|
|
|
|
Err(())
|
|
|
|
|
Err(ExchangeError::IncorrectQuerySizeOrMoreBytes)
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
Ok(ExchangeResult::NewState(ExchangeState::Simple(sq_state)))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
fn process_pipe(&mut self, mut pipe_s: PipeState) -> Result<ExchangeResult<'a>, ()> {
|
|
|
|
|
fn process_pipe(&mut self, mut pipe_s: PipeState) -> Result<ExchangeResult<'a>, ExchangeError> {
|
|
|
|
|
// try to complete the packet size if needed
|
|
|
|
|
pipe_s.packet_s.update_scanned(&mut self.scanner)?;
|
|
|
|
|
pipe_s
|
|
|
|
|
.packet_s
|
|
|
|
|
.update_scanned(&mut self.scanner)
|
|
|
|
|
.map_err(|_| ExchangeError::NotAsciiByteOrOverflow)?;
|
|
|
|
|
if pipe_s.packet_s.flag() & self.scanner.remaining_size_is(pipe_s.packet_s.int()) {
|
|
|
|
|
// great, we have the entire packet
|
|
|
|
|
Ok(ExchangeResult::Pipeline(Pipeline::new(
|
|
|
|
@ -300,6 +324,7 @@ impl<'a> Exchange<'a> {
|
|
|
|
|
pipeline
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
#[derive(Debug, PartialEq)]
|
|
|
|
|
pub struct Pipeline<'a> {
|
|
|
|
|
scanner: BufferedScanner<'a>,
|
|
|
|
|
}
|
|
|
|
@ -310,7 +335,7 @@ impl<'a> Pipeline<'a> {
|
|
|
|
|
scanner: BufferedScanner::new(buf),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
pub fn next_query(&mut self) -> Result<Option<SQuery<'a>>, ()> {
|
|
|
|
|
pub fn next_query(&mut self) -> Result<Option<SQuery<'a>>, ExchangeError> {
|
|
|
|
|
let nonzero = self.scanner.buffer_len() != 0;
|
|
|
|
|
if self.scanner.eof() & nonzero {
|
|
|
|
|
Ok(None)
|
|
|
|
@ -318,13 +343,13 @@ impl<'a> Pipeline<'a> {
|
|
|
|
|
let query_size = scan_usize_guaranteed_termination(&mut self.scanner)?;
|
|
|
|
|
let param_size = scan_usize_guaranteed_termination(&mut self.scanner)?;
|
|
|
|
|
let (full_size, overflow) = param_size.overflowing_add(query_size);
|
|
|
|
|
if compiler::likely(self.scanner.remaining_size_is(full_size) & !overflow) {
|
|
|
|
|
if compiler::likely(self.scanner.has_left(full_size) & !overflow) {
|
|
|
|
|
Ok(Some(SQuery {
|
|
|
|
|
buf: self.scanner.current_buffer(),
|
|
|
|
|
q_window: query_size,
|
|
|
|
|
}))
|
|
|
|
|
} else {
|
|
|
|
|
Err(())
|
|
|
|
|
Err(ExchangeError::IncorrectQuerySizeOrMoreBytes)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|