Skip to content

Commit

Permalink
h2spec ETA: 140/147
Browse files Browse the repository at this point in the history
- h2 converter takes into account the window of the converted stream
- update windows upon receiving a new initial_window setting
- track negative windows

Signed-off-by: Eloi DEMOLIS <[email protected]>
  • Loading branch information
Wonshtrum authored and Keksoj committed Jul 19, 2024
1 parent c7f709a commit 8ae52a9
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 47 deletions.
2 changes: 1 addition & 1 deletion lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ hdrhistogram = "^7.5.4"
hex = "^0.4.3"
hpack = "^0.3.0"
idna = "^0.5.0"
kawa = { version = "^0.6.6", default-features = false }
kawa = { version = "^0.6.7", default-features = false }
libc = "^0.2.155"
memchr = "^2.7.2"
mio = { version = "^1.0.0", features = ["os-poll", "os-ext", "net"] }
Expand Down
23 changes: 17 additions & 6 deletions lib/src/protocol/mux/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::protocol::{
};

pub struct H2BlockConverter<'a> {
pub window: i32,
pub stream_id: StreamId,
pub encoder: &'a mut hpack::Encoder<'static>,
pub out: Vec<u8>,
Expand Down Expand Up @@ -122,12 +123,21 @@ impl<'a, T: AsBuffer> BlockConverter<T> for H2BlockConverter<'a> {
}
Block::Chunk(Chunk { data }) => {
let mut header = [0; 9];
let payload_len = match &data {
Store::Empty => 0,
Store::Detached(s) | Store::Slice(s) => s.len,
Store::Static(s) => s.len() as u32,
Store::Alloc(a, i) => a.len() as u32 - i,
let payload_len = data.len();
let (data, payload_len, can_continue) = if self.window >= payload_len as i32 {
// the window is wide enought to send the entire chunk
(data, payload_len as u32, true)
} else if self.window > 0 {
// we split the chunk to fit in the window
let (before, after) = data.split(self.window as usize);
kawa.blocks.push_front(Block::Chunk(Chunk { data: after }));
(before, self.window as u32, false)
} else {
// the window can't take any more bytes, return the chunk to the blocks
kawa.blocks.push_front(Block::Chunk(Chunk { data }));
return false;
};
self.window -= payload_len as i32;
gen_frame_header(
&mut header,
&FrameHeader {
Expand All @@ -140,7 +150,8 @@ impl<'a, T: AsBuffer> BlockConverter<T> for H2BlockConverter<'a> {
.unwrap();
kawa.push_out(Store::from_slice(&header));
kawa.push_out(data);
kawa.push_delimiter()
kawa.push_delimiter();
return can_continue;
}
Block::Flags(Flags {
end_header,
Expand Down
80 changes: 56 additions & 24 deletions lib/src/protocol/mux/h2.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::{cmp::min, collections::HashMap};

use rusty_ulid::Ulid;
use sozu_command::ready::Ready;
Expand All @@ -9,7 +9,7 @@ use crate::{
converter, debug_kawa, forcefully_terminate_answer,
parser::{
self, error_code_to_str, Frame, FrameHeader, FrameType, H2Error, Headers, ParserError,
ParserErrorKind, StreamDependency, WindowUpdate,
ParserErrorKind, WindowUpdate,
},
pkawa, serializer, set_default_answer, update_readiness_after_read,
update_readiness_after_write, BackendStatus, Context, Endpoint, GenericHttpStream,
Expand Down Expand Up @@ -127,13 +127,13 @@ pub struct ConnectionH2<Front: SocketHandler> {
impl<Front: SocketHandler> std::fmt::Debug for ConnectionH2<Front> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ConnectionH2")
.field("expect", &self.expect_read)
.field("position", &self.position)
.field("state", &self.state)
.field("expect", &self.expect_read)
.field("readiness", &self.readiness)
.field("local_settings", &self.local_settings)
.field("peer_settings", &self.peer_settings)
.field("socket", &self.socket.socket_ref())
.field("state", &self.state)
.field("streams", &self.streams)
.field("zero", &self.zero.storage.meter(20))
.field("window", &self.window)
Expand Down Expand Up @@ -185,15 +185,6 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
return self.force_disconnect();
}
}
// (
// H2State::Frame(FrameHeader {
// payload_len,
// frame_type: FrameType::Data,
// flags,
// stream_id,
// }),
// _,
// ) => {}
_ => {}
}
return MuxResult::Continue;
Expand Down Expand Up @@ -340,9 +331,7 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
Some(_) => {}
None => return self.goaway(H2Error::InternalError),
}
} else if header.frame_type != FrameType::Priority
&& header.frame_type != FrameType::WindowUpdate
{
} else if header.frame_type != FrameType::Priority {
println_!(
"ONLY HEADERS AND PRIORITY CAN BE RECEIVED ON IDLE/CLOSED STREAMS"
);
Expand Down Expand Up @@ -546,6 +535,7 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
}

let mut converter = converter::H2BlockConverter {
window: 0,
stream_id: 0,
encoder: &mut self.encoder,
out: Vec::new(),
Expand All @@ -557,10 +547,16 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
'outer: for stream_id in priorities {
let global_stream_id = *self.streams.get(stream_id).unwrap();
let stream = &mut context.streams[global_stream_id];
let kawa = stream.wbuffer(&self.position);
let parts = stream.split(&self.position);
let kawa = parts.wbuffer;
if kawa.is_main_phase() || kawa.is_error() {
let window = min(*parts.window, self.window);
converter.window = window;
converter.stream_id = *stream_id;
kawa.prepare(&mut converter);
let consumed = window - converter.window;
*parts.window -= consumed;
self.window -= consumed;
debug_kawa(kawa);
}
while !kawa.out.is_empty() {
Expand Down Expand Up @@ -828,7 +824,7 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
1 => { self.peer_settings.settings_header_table_size = v },
2 => { self.peer_settings.settings_enable_push = v == 1; is_error |= v > 1 },
3 => { self.peer_settings.settings_max_concurrent_streams = v },
4 => { self.peer_settings.settings_initial_window_size = v; is_error |= v >= 1<<31 },
4 => { is_error |= self.update_initial_window_size(v, context) },
5 => { self.peer_settings.settings_max_frame_size = v; is_error |= v >= 1<<24 || v < 1<<14 },
6 => { self.peer_settings.settings_max_header_list_size = v },
8 => { self.peer_settings.settings_enable_connect_protocol = v == 1; is_error |= v > 1 },
Expand Down Expand Up @@ -881,23 +877,29 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
}) => {
let increment = increment as i32;
if stream_id == 0 {
if increment > i32::MAX - self.window {
return self.goaway(H2Error::FlowControlError);
if let Some(window) = self.window.checked_add(increment) {
if self.window <= 0 && window > 0 {
self.readiness.interest.insert(Ready::WRITABLE);
}
self.window = window;
} else {
self.window += increment;
return self.goaway(H2Error::FlowControlError);
}
} else {
if let Some(global_stream_id) = self.streams.get(&stream_id) {
let stream = &mut context.streams[*global_stream_id];
if increment > i32::MAX - stream.window {
if let Some(window) = stream.window.checked_add(increment) {
if stream.window <= 0 && window > 0 {
self.readiness.interest.insert(Ready::WRITABLE);
}
stream.window = window;
} else {
return self.reset_stream(
*global_stream_id,
context,
endpoint,
H2Error::FlowControlError,
);
} else {
stream.window += increment;
}
} else {
println_!(
Expand All @@ -911,6 +913,36 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
MuxResult::Continue
}

fn update_initial_window_size<L>(&mut self, value: u32, context: &mut Context<L>) -> bool
where
L: ListenerHandler + L7ListenerHandler,
{
if value >= 1 << 31 {
return true;
}
let delta = value as i32 - self.peer_settings.settings_initial_window_size as i32;
println!(
"INITIAL_WINDOW_SIZE: {} -> {} => {}",
self.peer_settings.settings_initial_window_size, value, delta
);
let mut open_window = false;
for (i, stream) in context.streams.iter_mut().enumerate() {
println!(
" - stream_{i}: {} -> {}",
stream.window,
stream.window + delta
);
open_window |= stream.window <= 0 && stream.window + delta > 0;
stream.window += delta;
}
println_!("UPDATE INIT WINDOW: {open_window} {:?}", self.readiness);
if open_window {
self.readiness.interest.insert(Ready::WRITABLE);
}
self.peer_settings.settings_initial_window_size = value;
false
}

pub fn force_disconnect(&mut self) -> MuxResult {
self.state = H2State::Error;
match self.position {
Expand Down
7 changes: 5 additions & 2 deletions lib/src/protocol/mux/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ impl<Front: SocketHandler> Connection<Front> {
state: H2State::ClientPreface,
streams: HashMap::new(),
timeout_container,
window: 1 << 16,
window: (1 << 16) - 1,
zero: kawa::Kawa::new(kawa::Kind::Request, kawa::Buffer::new(buffer)),
}))
}
Expand Down Expand Up @@ -306,7 +306,7 @@ impl<Front: SocketHandler> Connection<Front> {
state: H2State::ClientPreface,
streams: HashMap::new(),
timeout_container,
window: 1 << 16,
window: (1 << 16) - 1,
zero: kawa::Kawa::new(kawa::Kind::Request, kawa::Buffer::new(buffer)),
}))
}
Expand Down Expand Up @@ -586,6 +586,7 @@ pub struct Stream {
/// This struct allows to mutably borrow the read and write buffers (dependant on the position)
/// as well as the context of a Stream at the same time
pub struct StreamParts<'a> {
pub window: &'a mut i32,
pub rbuffer: &'a mut GenericHttpStream,
pub wbuffer: &'a mut GenericHttpStream,
pub context: &'a mut HttpContext,
Expand Down Expand Up @@ -616,11 +617,13 @@ impl Stream {
pub fn split(&mut self, position: &Position) -> StreamParts<'_> {
match position {
Position::Client(_) => StreamParts {
window: &mut self.window,
rbuffer: &mut self.back,
wbuffer: &mut self.front,
context: &mut self.context,
},
Position::Server => StreamParts {
window: &mut self.window,
rbuffer: &mut self.front,
wbuffer: &mut self.back,
context: &mut self.context,
Expand Down
14 changes: 0 additions & 14 deletions lib/src/protocol/mux/pkawa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,6 @@ use crate::{
},
};

trait AdHocStore {
fn len(&self) -> usize;
}
impl AdHocStore for Store {
fn len(&self) -> usize {
match self {
Store::Empty => 0,
Store::Slice(slice) | Store::Detached(slice) => slice.len(),
Store::Static(s) => s.len(),
Store::Alloc(a, i) => a.len() - *i as usize,
}
}
}

pub fn handle_header<C>(
decoder: &mut hpack::Decoder,
prioriser: &mut Prioriser,
Expand Down

0 comments on commit 8ae52a9

Please sign in to comment.