Add a header magic

We add a header magic '0x0D' or the CR byte. This acts as a boundary
between multiple queries on the same connection
next
Sayan Nandan 3 years ago
parent 88b4eb88a1
commit 0bdbd81f92

@ -24,6 +24,9 @@
*
*/
/// The header magic (a '\r' or CR)
const START_HEADER_MAGIC: u8 = 0x0D;
#[derive(Debug)]
pub(super) struct Parser<'a> {
cursor: usize,
@ -85,8 +88,9 @@ impl<'a> Parser<'a> {
/// This function will return the number of bytes this sizeline has (this is usually the number of items in
/// the following line)
/// This **will forward the cursor itself**
fn read_sizeline(&mut self) -> ParseResult<usize> {
if let Some(b'#') = self.buffer.get(self.cursor) {
fn read_sizeline(&mut self, opt_char: Option<u8>) -> ParseResult<usize> {
let opt_char: u8 = opt_char.unwrap_or(b'#');
if let Some(opt_char) = self.buffer.get(self.cursor) {
// Good, we found a #; time to move ahead
self.incr_cursor();
// Now read the remaining line
@ -122,8 +126,8 @@ impl<'a> Parser<'a> {
///
/// This **will forward the cursor itself**
fn parse_metaframe_get_datagroup_count(&mut self) -> ParseResult<usize> {
// This will give us the `#<m>\n`
let metaframe_sizeline = self.read_sizeline()?;
// This will give us the `\r<m>\n`
let metaframe_sizeline = self.read_sizeline(Some(START_HEADER_MAGIC))?;
// Now we want to read `*<n>\n`
let our_chunk = self.read_until(metaframe_sizeline)?;
if our_chunk[0] == b'*' {
@ -142,7 +146,7 @@ impl<'a> Parser<'a> {
/// This will return the number of items in a datagroup
fn parse_datagroup_get_group_size(&mut self) -> ParseResult<usize> {
// This will give us `#<p>\n`
let dataframe_sizeline = self.read_sizeline()?;
let dataframe_sizeline = self.read_sizeline(None)?;
// Now we want to read `&<q>\n`
let our_chunk = self.read_until(dataframe_sizeline)?;
if our_chunk[0] == b'&' {
@ -161,7 +165,7 @@ impl<'a> Parser<'a> {
/// for the next datagroup element
fn parse_next_datagroup_element(&mut self) -> ParseResult<Vec<u8>> {
// So we need to read the sizeline for this element first!
let element_size = self.read_sizeline()?;
let element_size = self.read_sizeline(None)?;
// Now we want to read the element itself
let mut ret = Vec::with_capacity(element_size);
ret.extend_from_slice(self.read_until(element_size)?);
@ -177,20 +181,19 @@ impl<'a> Parser<'a> {
for _ in 0..len {
elements.push(self.parse_next_datagroup_element()?);
}
if self.parse_next_datagroup_element().is_ok() {
// they sent one more element when we already have what we expected? that's unexpected
Err(ParseError::UnexpectedByte)
} else {
Ok(elements)
}
Ok(elements)
}
pub fn parse(mut self) -> Result<(Query, usize), ParseError> {
let number_of_queries = self.parse_metaframe_get_datagroup_count()?;
if number_of_queries == 1 {
// This is a simple query
let single_group = self.parse_next_actiongroup()?;
if self.parse_next_actiongroup().is_ok() {
// they sent one more actiongroup? that's unexpected
// The below line defaults to false if no item is there in the buffer
// or it checks if the next time is a \r char; if it is, then it is the beginning
// of the next query
if self.buffer.get(self.cursor).map_or(false, |v| *v != b'\r') {
// the next item isn't the beginning of a query but something else?
// that doesn't look right!
Err(ParseError::UnexpectedByte)
} else {
Ok((Query::SimpleQuery(single_group), self.cursor))
@ -202,12 +205,7 @@ impl<'a> Parser<'a> {
for _ in 0..number_of_queries {
queries.push(self.parse_next_actiongroup()?);
}
if self.parse_next_actiongroup().is_ok() {
// they sent one more actiongroup? that's unexpected
Err(ParseError::UnexpectedByte)
} else {
Ok((Query::PipelinedQuery(queries), self.cursor))
}
Ok((Query::PipelinedQuery(queries), self.cursor))
}
}
}
@ -216,13 +214,13 @@ impl<'a> Parser<'a> {
fn test_sizeline_parse() {
let sizeline = "#125\n".as_bytes();
let mut parser = Parser::new(&sizeline);
assert_eq!(125, parser.read_sizeline().unwrap());
assert_eq!(125, parser.read_sizeline(None).unwrap());
assert_eq!(parser.cursor, sizeline.len());
}
#[test]
fn test_metaframe_parse() {
let metaframe = "#2\n*2\n".as_bytes();
let metaframe = "\r2\n*2\n".as_bytes();
let mut parser = Parser::new(&metaframe);
assert_eq!(2, parser.parse_metaframe_get_datagroup_count().unwrap());
assert_eq!(parser.cursor, metaframe.len());
@ -263,7 +261,7 @@ fn test_parse_actiongroup_single() {
#[test]
fn test_complete_query_packet_parse() {
let query_packet = "#2\n*1\n#2\n&2\n#3\nGET\n#3\nfoo\n".as_bytes();
let query_packet = "\r2\n*1\n#2\n&2\n#3\nGET\n#3\nfoo\n".as_bytes();
let (res, forward_by) = Parser::new(&query_packet).parse().unwrap();
assert_eq!(
res,

Loading…
Cancel
Save