net: Add more pipeline tests in protocol

next
Sayan Nandan 6 months ago
parent 356560feea
commit b9161f1b9c
No known key found for this signature in database
GPG Key ID: 0EBD769024B24F0A

@ -748,3 +748,107 @@ fn pipeline() {
}) })
} }
} }
fn run_staged(full_payload: &[u8], f: impl Fn(ExchangeResult)) {
let mut read_amount = 0;
let mut state = ExchangeState::default();
let mut cursor = 0;
loop {
let buffer = &full_payload[..read_amount];
let scanner = unsafe { BufferedScanner::new_with_cursor(buffer, cursor) };
match Exchange::try_complete(scanner, state).unwrap() {
(result, new_cursor) => match result {
ExchangeResult::NewState(new_state) => {
state = new_state;
cursor = new_cursor;
}
state => {
f(state);
break;
}
},
}
read_amount += 1;
}
}
#[test]
fn staged_simple_query() {
for eq in [
EQuery::new(
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz123456789".repeat(1000),
&[],
),
EQuery::new(
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz123456789".repeat(1000),
&["hello", "world"],
),
] {
run_staged(eq.payload.as_bytes(), |state| match state {
ExchangeResult::NewState(_) => unreachable!(),
ExchangeResult::Simple(s) => {
assert_eq!(s.query(), eq.query.as_bytes());
assert_eq!(s.params(), eq.payload[eq.param_range.clone()].as_bytes());
}
ExchangeResult::Pipeline(_) => unreachable!(),
});
}
}
#[test]
fn staged_pipeline() {
for epipe in [
EPipe::new([
// small query with no params
EPipeQuery::new("create space myspace".to_owned(), &[]),
// small query with params
EPipeQuery::new(
"insert into myspace.mymodel(?, ?)".to_owned(),
&["sayan", "elx"],
),
// giant query with no params
EPipeQuery::new(
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz123456789".repeat(1000),
&[],
),
// giant query with params
EPipeQuery::new(
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz123456789".repeat(1000),
&["hello", "world"],
),
]),
EPipe::new([
// giant query with no params
EPipeQuery::new(
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz123456789".repeat(1000),
&[],
),
// giant query with params
EPipeQuery::new(
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz123456789".repeat(1000),
&["hello", "world"],
),
// small query with no params
EPipeQuery::new("create space myspace".to_owned(), &[]),
// small query with params
EPipeQuery::new(
"insert into myspace.mymodel(?, ?)".to_owned(),
&["sayan", "elx"],
),
]),
] {
run_staged(epipe.payload.as_bytes(), |result| match result {
ExchangeResult::NewState(_) => unreachable!(),
ExchangeResult::Simple(_) => unreachable!(),
ExchangeResult::Pipeline(p) => {
p.into_iter()
.zip(epipe.queries.iter())
.for_each(|(spq, epq)| {
let spq = spq.unwrap();
assert_eq!(spq.query(), epq.q.as_bytes());
assert_eq!(spq.params(), epq.payload[epq.p_range.clone()].as_bytes());
})
}
})
}
}

Loading…
Cancel
Save