net: Add illegal packet escape for pipeline

Additional changes:
- Added changelog entry
next
Sayan Nandan 6 months ago
parent b9161f1b9c
commit 2dd6537af0
No known key found for this signature in database
GPG Key ID: 0EBD769024B24F0A

@ -2,6 +2,12 @@
All changes in this project will be noted in this file. All changes in this project will be noted in this file.
## Version 0.8.2
### Additions
- Skyhash/2.1: Restored support for pipelines
## Version 0.8.1 ## Version 0.8.1
### Additions ### Additions

@ -53,12 +53,14 @@ use {
}, },
}, },
super::{IoResult, QueryLoopResult, Socket}, super::{IoResult, QueryLoopResult, Socket},
crate::engine::{ crate::{
self, engine::{
core::system_db::VerifyUser, core::{exec, system_db::VerifyUser},
error::{QueryError, QueryResult}, error::{QueryError, QueryResult},
fractal::{Global, GlobalInstanceLike}, fractal::{Global, GlobalInstanceLike},
mem::{BufferedScanner, IntegerRepr}, mem::{BufferedScanner, IntegerRepr},
},
util::compiler,
}, },
bytes::{Buf, BytesMut}, bytes::{Buf, BytesMut},
tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter}, tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter},
@ -348,17 +350,17 @@ async fn exec_simple<S: Socket>(
global: &Global, global: &Global,
query: SQuery<'_>, query: SQuery<'_>,
) -> IoResult<()> { ) -> IoResult<()> {
write_response( write_response(exec::dispatch_to_executor(global, cs, query).await, con).await
engine::core::exec::dispatch_to_executor(global, cs, query).await,
con,
)
.await
} }
/* /*
pipeline pipeline
---
malformed packets
*/ */
const ILLEGAL_PACKET_ESCAPE: u8 = 0xFF;
async fn exec_pipe<'a, S: Socket>( async fn exec_pipe<'a, S: Socket>(
con: &mut BufWriter<S>, con: &mut BufWriter<S>,
cs: &mut ClientLocalState, cs: &mut ClientLocalState,
@ -368,20 +370,10 @@ async fn exec_pipe<'a, S: Socket>(
let mut pipe = pipe.into_iter(); let mut pipe = pipe.into_iter();
while let Some(query) = pipe.next() { while let Some(query) = pipe.next() {
match query { match query {
Ok(q) => { Ok(q) => write_response(exec::dispatch_to_executor(global, cs, q).await, con).await?,
write_response(
engine::core::exec::dispatch_to_executor(global, cs, q).await,
con,
)
.await?
}
Err(_) => { Err(_) => {
// respond with error return compiler::cold_call(|| async { con.write_u8(ILLEGAL_PACKET_ESCAPE).await })
let [a, b] = (QueryError::SysNetworkSystemIllegalClientPacket.value_u8() as u16) .await
.to_le_bytes();
con.write_all(&[ResponseType::Error.value_u8(), a, b])
.await?;
return Ok(());
} }
} }
} }

Loading…
Cancel
Save