net: Fix pipeline impl and add basic pipe test

next
Sayan Nandan 6 months ago
parent 6414b05fa6
commit b65fb6041a
No known key found for this signature in database
GPG Key ID: 0EBD769024B24F0A

@ -86,15 +86,15 @@ fn scan_usize_guaranteed_termination(
let mut ret = 0usize; let mut ret = 0usize;
let mut stop = scanner.rounded_eq(b'\n'); let mut stop = scanner.rounded_eq(b'\n');
while !scanner.eof() & !stop { while !scanner.eof() & !stop {
let next_byte = unsafe { let this_byte = unsafe {
// UNSAFE(@ohsayan): loop invariant // UNSAFE(@ohsayan): loop invariant
scanner.next_byte() scanner.next_byte()
}; };
match ret match ret
.checked_mul(10) .checked_mul(10)
.map(|int| int.checked_add((next_byte & 0x0f) as usize)) .map(|int| int.checked_add((this_byte & 0x0f) as usize))
{ {
Some(Some(int)) if next_byte.is_ascii_digit() => ret = int, Some(Some(int)) if this_byte.is_ascii_digit() => ret = int,
_ => return Err(ExchangeError::NotAsciiByteOrOverflow), _ => return Err(ExchangeError::NotAsciiByteOrOverflow),
} }
stop = scanner.rounded_eq(b'\n'); stop = scanner.rounded_eq(b'\n');
@ -357,22 +357,33 @@ impl<'a> Pipeline<'a> {
scanner: BufferedScanner::new(buf), scanner: BufferedScanner::new(buf),
} }
} }
pub fn next_query(&mut self) -> Result<Option<SQuery<'a>>, ExchangeError> { }
impl<'a> Iterator for Pipeline<'a> {
type Item = Result<SQuery<'a>, ExchangeError>;
fn next(&mut self) -> Option<Self::Item> {
let nonzero = self.scanner.buffer_len() != 0; let nonzero = self.scanner.buffer_len() != 0;
if self.scanner.eof() & nonzero { if self.scanner.eof() & nonzero {
Ok(None) None
} else { } else {
let query_size = scan_usize_guaranteed_termination(&mut self.scanner)?; let mut ret = || {
let param_size = scan_usize_guaranteed_termination(&mut self.scanner)?; let query_size = scan_usize_guaranteed_termination(&mut self.scanner)?;
let (full_size, overflow) = param_size.overflowing_add(query_size); let param_size = scan_usize_guaranteed_termination(&mut self.scanner)?;
if compiler::likely(self.scanner.has_left(full_size) & !overflow) { let (full_size, overflow) = param_size.overflowing_add(query_size);
Ok(Some(SQuery { if compiler::likely(self.scanner.has_left(full_size) & !overflow) {
buf: self.scanner.current_buffer(), let block = unsafe {
q_window: query_size, // UNSAFE(@ohsayan): checked in above branch
})) self.scanner.next_chunk_variable(full_size)
} else { };
Err(ExchangeError::IncorrectQuerySizeOrMoreBytes) Ok(SQuery {
} buf: block,
q_window: query_size,
})
} else {
Err(ExchangeError::IncorrectQuerySizeOrMoreBytes)
}
};
Some(ret())
} }
} }
} }

@ -363,12 +363,12 @@ async fn exec_pipe<'a, S: Socket>(
con: &mut BufWriter<S>, con: &mut BufWriter<S>,
cs: &mut ClientLocalState, cs: &mut ClientLocalState,
global: &Global, global: &Global,
mut pipe: Pipeline<'a>, pipe: Pipeline<'a>,
) -> IoResult<()> { ) -> IoResult<()> {
loop { let mut pipe = pipe.into_iter();
match pipe.next_query() { while let Some(query) = pipe.next() {
Ok(None) => break Ok(()), match query {
Ok(Some(q)) => { Ok(q) => {
write_response( write_response(
engine::core::exec::dispatch_to_executor(global, cs, q).await, engine::core::exec::dispatch_to_executor(global, cs, q).await,
con, con,
@ -381,8 +381,9 @@ async fn exec_pipe<'a, S: Socket>(
.to_le_bytes(); .to_le_bytes();
con.write_all(&[ResponseType::Error.value_u8(), a, b]) con.write_all(&[ResponseType::Error.value_u8(), a, b])
.await?; .await?;
break Ok(()); return Ok(());
} }
} }
} }
Ok(())
} }

@ -363,8 +363,7 @@ impl EQuery {
/* /*
prepare the "back" of the payload prepare the "back" of the payload
*/ */
let encoded_params: String = params.iter().flat_map(|param| param.chars()).collect(); let total_size = query.len() + params.iter().map(|p| p.len()).sum::<usize>();
let total_size = query.len() + encoded_params.len();
let total_size_string = format!("{total_size}\n"); let total_size_string = format!("{total_size}\n");
/* /*
@ -531,3 +530,67 @@ fn simple_query() {
}) })
} }
} }
/*
pipeline
*/
fn pipe_query<const N: usize>(q: &str, p: [&str; N]) -> String {
let mut buffer = String::new();
buffer.extend(q.len().to_string().chars());
buffer.push('\n');
buffer.extend(
p.iter()
.map(|_p| _p.len())
.sum::<usize>()
.to_string()
.chars(),
);
buffer.push('\n');
buffer.extend(q.chars());
for p_ in p {
buffer.push_str(p_);
}
buffer
}
fn pipe<const N: usize>(queries: [String; N]) -> String {
let packed_queries = queries.concat();
format!("P{}\n{packed_queries}", packed_queries.len())
}
#[test]
fn full_pipe_scan() {
let pipeline_buffer = pipe([
pipe_query("create space myspace", []),
pipe_query(
"create model myspace.mymodel(username: string, password: string)",
[],
),
pipe_query("insert into myspace.mymodel(?, ?)", ["sayan", "cake"]),
]);
let (pipeline, cursor) = Exchange::try_complete(
BufferedScanner::new(pipeline_buffer.as_bytes()),
ExchangeState::default(),
)
.unwrap();
assert_eq!(cursor, pipeline_buffer.len());
let pipeline: Vec<SQuery<'_>> = match pipeline {
ExchangeResult::Pipeline(p) => p.into_iter().map(Result::unwrap).collect(),
_ => panic!("expected pipeline got: {:?}", pipeline),
};
assert_eq!(
pipeline,
vec![
SQuery::_new(b"create space myspace", "create space myspace".len()),
SQuery::_new(
b"create model myspace.mymodel(username: string, password: string)",
"create model myspace.mymodel(username: string, password: string)".len()
),
SQuery::_new(
b"insert into myspace.mymodel(?, ?)sayancake",
"insert into myspace.mymodel(?, ?)".len()
)
]
);
}

Loading…
Cancel
Save