From b9161f1b9c590f71b1a09c67fe284c9e5a8d5fa1 Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Thu, 4 Apr 2024 17:54:44 +0530 Subject: [PATCH] net: Add more pipeline tests in protocol --- server/src/engine/net/protocol/tests.rs | 104 ++++++++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/server/src/engine/net/protocol/tests.rs b/server/src/engine/net/protocol/tests.rs index 7d7986bd..c0ad111a 100644 --- a/server/src/engine/net/protocol/tests.rs +++ b/server/src/engine/net/protocol/tests.rs @@ -748,3 +748,107 @@ fn pipeline() { }) } } + +fn run_staged(full_payload: &[u8], f: impl Fn(ExchangeResult)) { + let mut read_amount = 0; + let mut state = ExchangeState::default(); + let mut cursor = 0; + loop { + let buffer = &full_payload[..read_amount]; + let scanner = unsafe { BufferedScanner::new_with_cursor(buffer, cursor) }; + match Exchange::try_complete(scanner, state).unwrap() { + (result, new_cursor) => match result { + ExchangeResult::NewState(new_state) => { + state = new_state; + cursor = new_cursor; + } + state => { + f(state); + break; + } + }, + } + read_amount += 1; + } +} + +#[test] +fn staged_simple_query() { + for eq in [ + EQuery::new( + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz123456789".repeat(1000), + &[], + ), + EQuery::new( + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz123456789".repeat(1000), + &["hello", "world"], + ), + ] { + run_staged(eq.payload.as_bytes(), |state| match state { + ExchangeResult::NewState(_) => unreachable!(), + ExchangeResult::Simple(s) => { + assert_eq!(s.query(), eq.query.as_bytes()); + assert_eq!(s.params(), eq.payload[eq.param_range.clone()].as_bytes()); + } + ExchangeResult::Pipeline(_) => unreachable!(), + }); + } +} + +#[test] +fn staged_pipeline() { + for epipe in [ + EPipe::new([ + // small query with no params + EPipeQuery::new("create space myspace".to_owned(), &[]), + // small query with params + EPipeQuery::new( + "insert into myspace.mymodel(?, ?)".to_owned(), + &["sayan", "elx"], + ), + // giant query with no params + EPipeQuery::new( + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz123456789".repeat(1000), + &[], + ), + // giant query with params + EPipeQuery::new( + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz123456789".repeat(1000), + &["hello", "world"], + ), + ]), + EPipe::new([ + // giant query with no params + EPipeQuery::new( + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz123456789".repeat(1000), + &[], + ), + // giant query with params + EPipeQuery::new( + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz123456789".repeat(1000), + &["hello", "world"], + ), + // small query with no params + EPipeQuery::new("create space myspace".to_owned(), &[]), + // small query with params + EPipeQuery::new( + "insert into myspace.mymodel(?, ?)".to_owned(), + &["sayan", "elx"], + ), + ]), + ] { + run_staged(epipe.payload.as_bytes(), |result| match result { + ExchangeResult::NewState(_) => unreachable!(), + ExchangeResult::Simple(_) => unreachable!(), + ExchangeResult::Pipeline(p) => { + p.into_iter() + .zip(epipe.queries.iter()) + .for_each(|(spq, epq)| { + let spq = spq.unwrap(); + assert_eq!(spq.query(), epq.q.as_bytes()); + assert_eq!(spq.params(), epq.payload[epq.p_range.clone()].as_bytes()); + }) + } + }) + } +}