diff --git a/ext/hyper_ruby/src/lib.rs b/ext/hyper_ruby/src/lib.rs index 54f785c..dbd892d 100644 --- a/ext/hyper_ruby/src/lib.rs +++ b/ext/hyper_ruby/src/lib.rs @@ -27,7 +27,7 @@ use crossbeam_channel; use hyper::service::service_fn; use hyper::{Error, Request as HyperRequest, Response as HyperResponse, StatusCode}; -use hyper::body::Incoming; +use hyper::body::{Body, Incoming}; use hyper_util::rt::TokioIo; use hyper_util::server::conn::auto; use http_body_util::BodyExt; @@ -494,7 +494,17 @@ async fn handle_request( debug!("Headers: {:?}", req.headers()); let (parts, body) = req.into_parts(); - + + // Capture the declared body length before consuming the body. Hyper's + // `Incoming::size_hint().exact()` mirrors the inbound Content-Length + // (populated from the header for H2 as well as H1), and is the only + // signal we have here: hyper converts `RST_STREAM(NO_ERROR|CANCEL)` — + // the codes browsers send on navigation cancellation — into a clean + // end-of-body rather than surfacing the reset (see + // hyper-1.6.0/src/body/incoming.rs:~249). We therefore validate the + // declared length against what we actually collected. + let declared_len = body.size_hint().exact(); + // Collect the body with timeout let body_bytes = match timeout( std::time::Duration::from_millis(recv_timeout), @@ -510,9 +520,19 @@ async fn handle_request( return Ok(create_timeout_response()); } }; - + debug!("Collected body size: {}", body_bytes.len()); + if let Some(declared) = declared_len { + if declared != body_bytes.len() as u64 { + warn!( + "Body truncated: declared {} bytes, received {} — likely RST_STREAM(CANCEL); rejecting request", + declared, body_bytes.len() + ); + return Ok(create_bad_request_response("Body length does not match Content-Length")); + } + } + let hyper_request = HyperRequest::from_parts(parts, body_bytes); let is_grpc = grpc::is_grpc_request(&hyper_request); debug!("Is gRPC: {}", is_grpc); @@ -628,11 +648,19 @@ fn create_too_many_requests_response(error_message: &str) -> HyperResponse HyperResponse { + HyperResponse::builder() + .status(StatusCode::BAD_REQUEST) + .header("content-type", "text/plain") + .body(BodyWithTrailers::new(Bytes::from(error_message.to_string()), None)) + .unwrap() +} + #[magnus::init] fn init(ruby: &Ruby) -> Result<(), MagnusError> { let module = ruby.define_module("HyperRuby")?; diff --git a/test/test_h2_stream_reset.rb b/test/test_h2_stream_reset.rb new file mode 100644 index 0000000..14c8429 --- /dev/null +++ b/test/test_h2_stream_reset.rb @@ -0,0 +1,112 @@ +# frozen_string_literal: true + +require "test_helper" +require "httpx" # with_configured_server builds an HTTPX client (unused here) +require "socket" +require "timeout" + +# Regression test for a silent-truncation bug: when an HTTP/2 peer sends +# HEADERS + a partial DATA frame + RST_STREAM (the frame sequence a browser +# produces when it cancels an in-flight request on page navigation), the +# server currently hands the truncated body to the Ruby handler as if the +# request had completed normally. Downstream consumers (Kafka, etc.) then +# see a short body alongside the original Content-Length header, producing +# confusingly "truncated" payloads. +# +# Correct behaviour: a stream that ends via RST_STREAM (not END_STREAM) is +# not a completed request; the handler should not run. +class TestH2StreamReset < HyperRubyTest + PORT = 3010 + PARTIAL_BYTES = 16_384 # default HTTP/2 SETTINGS_MAX_FRAME_SIZE + CLAIMED_CONTENT_LENGTH = 450_403 + + def test_rst_stream_after_partial_data_does_not_invoke_handler + invocations = [] + mutex = Mutex.new + + handler = lambda do |request| + mutex.synchronize do + invocations << { + path: request.path, + body_size: request.body_size, + content_length: request.header("content-length"), + } + end + HyperRuby::Response.new(200, { "Content-Type" => "text/plain" }, "ok") + end + + config = { bind_address: "127.0.0.1:#{PORT}", tokio_threads: 1, recv_timeout: 1_000 } + + with_configured_server(config, handler) do + send_h2_headers_data_rst( + host: "127.0.0.1", + port: PORT, + partial_bytes: PARTIAL_BYTES, + claimed_content_length: CLAIMED_CONTENT_LENGTH, + ) + + # Give the server time to hand off to the worker if it's going to. + sleep 0.2 + end + + mutex.synchronize do + assert_empty invocations, + "handler should not have been invoked for a RST_STREAM'd request, but it ran with: #{invocations.inspect}" + end + end + + private + + H2_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".b + TYPE_DATA = 0x0 + TYPE_HEADERS = 0x1 + TYPE_RST_STREAM = 0x3 + TYPE_SETTINGS = 0x4 + FLAG_END_HEADERS = 0x4 + FLAG_ACK = 0x1 + RST_CODE_CANCEL = 0x8 # what Chromium sends on navigation + + def h2_frame(type, flags, stream_id, payload) + len = payload.bytesize + [len >> 16 & 0xff, len >> 8 & 0xff, len & 0xff, + type, flags, stream_id & 0x7fffffff].pack("CCCCCN") + payload + end + + # Minimal HPACK: "literal header field, never indexed, new name" with + # 7-bit string lengths. Names and values must fit in 126 bytes — fine for + # everything we send here. + def hpack_literal(name, value) + raise "name too long" if name.bytesize > 126 + raise "value too long" if value.bytesize > 126 + [0x10, name.bytesize].pack("CC") + name.b + + [value.bytesize].pack("C") + value.b + end + + def send_h2_headers_data_rst(host:, port:, partial_bytes:, claimed_content_length:) + sock = TCPSocket.new(host, port) + sock.write(H2_PREFACE) + sock.write(h2_frame(TYPE_SETTINGS, 0, 0, "".b)) + sock.write(h2_frame(TYPE_SETTINGS, FLAG_ACK, 0, "".b)) + + headers = "".b + headers << hpack_literal(":method", "POST") + headers << hpack_literal(":scheme", "http") + headers << hpack_literal(":path", "/rst-truncated") + headers << hpack_literal(":authority", "#{host}:#{port}") + headers << hpack_literal("content-type", "application/octet-stream") + headers << hpack_literal("content-length", claimed_content_length.to_s) + + sock.write(h2_frame(TYPE_HEADERS, FLAG_END_HEADERS, 1, headers)) + sock.write(h2_frame(TYPE_DATA, 0, 1, "X".b * partial_bytes)) + sock.write(h2_frame(TYPE_RST_STREAM, 0, 1, [RST_CODE_CANCEL].pack("N"))) + + # Drain anything the server may have written before we hung up; we don't + # care what it is. + begin + Timeout.timeout(0.5) { sock.read(4096) } + rescue Timeout::Error + end + ensure + sock&.close + end +end