From 6dbe0fcfaab628d53824d602d6e5beb035c1c2ca Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Mon, 10 May 2021 11:10:23 +0530 Subject: [PATCH] Add parsing for complete query --- server/src/protocol/parserv2.rs | 64 +++++++++++++++++++++++++++++---- 1 file changed, 57 insertions(+), 7 deletions(-) diff --git a/server/src/protocol/parserv2.rs b/server/src/protocol/parserv2.rs index 27c5b6a1..50b7a448 100644 --- a/server/src/protocol/parserv2.rs +++ b/server/src/protocol/parserv2.rs @@ -36,6 +36,14 @@ enum ParseError { UnexpectedByte, } +type ActionGroup = Vec>; + +#[derive(Debug, PartialEq)] +enum Query { + SimpleQuery(ActionGroup), + PipelinedQuery(Vec), +} + type ParseResult = Result; impl<'a> Parser<'a> { @@ -118,7 +126,7 @@ impl<'a> Parser<'a> { let metaframe_sizeline = self.read_sizeline()?; // Now we want to read `*\n` let our_chunk = self.read_until(metaframe_sizeline)?; - if our_chunk[0] == b'!' { + if our_chunk[0] == b'*' { // Good, this will tell us the number of actions // Let us attempt to read the usize from this point onwards // that is excluding the '!' (so 1..) @@ -169,7 +177,38 @@ impl<'a> Parser<'a> { for _ in 0..len { elements.push(self.parse_next_datagroup_element()?); } - Ok(elements) + if self.parse_next_datagroup_element().is_ok() { + // they sent one more element when we already have what we expected? that's unexpected + Err(ParseError::UnexpectedByte) + } else { + Ok(elements) + } + } + fn parse(mut self) -> Result<(Query, usize), ParseError> { + let number_of_queries = self.parse_metaframe_get_datagroup_count()?; + if number_of_queries == 1 { + // This is a simple query + let single_group = self.parse_next_actiongroup()?; + if self.parse_next_actiongroup().is_ok() { + // they sent one more actiongroup? that's unexpected + Err(ParseError::UnexpectedByte) + } else { + Ok((Query::SimpleQuery(single_group), self.cursor)) + } + } else { + // This is a pipelined query + // We'll first make space for all the actiongroups + let mut queries = Vec::with_capacity(number_of_queries); + for _ in 0..number_of_queries { + queries.push(self.parse_next_actiongroup()?); + } + if self.parse_next_actiongroup().is_ok() { + // they sent one more actiongroup? that's unexpected + Err(ParseError::UnexpectedByte) + } else { + Ok((Query::PipelinedQuery(queries), self.cursor)) + } + } } } @@ -183,7 +222,7 @@ fn test_sizeline_parse() { #[test] fn test_metaframe_parse() { - let metaframe = "#2\n!2\n".as_bytes(); + let metaframe = "#2\n*2\n".as_bytes(); let mut parser = Parser::new(&metaframe); assert_eq!(2, parser.parse_metaframe_get_datagroup_count().unwrap()); assert_eq!(parser.cursor, metaframe.len()); @@ -193,10 +232,7 @@ fn test_metaframe_parse() { fn test_actiongroup_size_parse() { let dataframe_layout = "#6\n&12345\n".as_bytes(); let mut parser = Parser::new(&dataframe_layout); - assert_eq!( - 12345, - parser.parse_datagroup_get_group_size().unwrap() - ); + assert_eq!(12345, parser.parse_datagroup_get_group_size().unwrap()); assert_eq!(parser.cursor, dataframe_layout.len()); } @@ -224,3 +260,17 @@ fn test_parse_actiongroup_single() { ); assert_eq!(parser.cursor, actiongroup.len()); } + +#[test] +fn test_complete_query_packet_parse() { + let query_packet = "#2\n*1\n#2\n&2\n#3\nGET\n#3\nfoo\n".as_bytes(); + let (res, forward_by) = Parser::new(&query_packet).parse().unwrap(); + assert_eq!( + res, + Query::SimpleQuery(vec![ + "GET".as_bytes().to_owned(), + "foo".as_bytes().to_owned() + ]) + ); + assert_eq!(forward_by, query_packet.len()); +}