Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions ext/hyper_ruby/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crossbeam_channel;

use hyper::service::service_fn;
use hyper::{Error, Request as HyperRequest, Response as HyperResponse, StatusCode};
use hyper::body::Incoming;
use hyper::body::{Body, Incoming};
use hyper_util::rt::TokioIo;
use hyper_util::server::conn::auto;
use http_body_util::BodyExt;
Expand Down Expand Up @@ -494,7 +494,17 @@ async fn handle_request(
debug!("Headers: {:?}", req.headers());

let (parts, body) = req.into_parts();


// Capture the declared body length before consuming the body. Hyper's
// `Incoming::size_hint().exact()` mirrors the inbound Content-Length
// (populated from the header for H2 as well as H1), and is the only
// signal we have here: hyper converts `RST_STREAM(NO_ERROR|CANCEL)` —
// the codes browsers send on navigation cancellation — into a clean
// end-of-body rather than surfacing the reset (see
// hyper-1.6.0/src/body/incoming.rs:~249). We therefore validate the
// declared length against what we actually collected.
let declared_len = body.size_hint().exact();

// Collect the body with timeout
let body_bytes = match timeout(
std::time::Duration::from_millis(recv_timeout),
Expand All @@ -510,9 +520,19 @@ async fn handle_request(
return Ok(create_timeout_response());
}
};

debug!("Collected body size: {}", body_bytes.len());

if let Some(declared) = declared_len {
if declared != body_bytes.len() as u64 {
warn!(
"Body truncated: declared {} bytes, received {} — likely RST_STREAM(CANCEL); rejecting request",
declared, body_bytes.len()
);
return Ok(create_bad_request_response("Body length does not match Content-Length"));
}
}

let hyper_request = HyperRequest::from_parts(parts, body_bytes);
let is_grpc = grpc::is_grpc_request(&hyper_request);
debug!("Is gRPC: {}", is_grpc);
Expand Down Expand Up @@ -628,11 +648,19 @@ fn create_too_many_requests_response(error_message: &str) -> HyperResponse<BodyW
let builder = HyperResponse::builder()
.status(StatusCode::TOO_MANY_REQUESTS)
.header("content-type", "text/plain");

builder.body(BodyWithTrailers::new(Bytes::from(error_message.to_string()), None))
.unwrap()
}

fn create_bad_request_response(error_message: &str) -> HyperResponse<BodyWithTrailers> {
HyperResponse::builder()
.status(StatusCode::BAD_REQUEST)
.header("content-type", "text/plain")
.body(BodyWithTrailers::new(Bytes::from(error_message.to_string()), None))
.unwrap()
}

#[magnus::init]
fn init(ruby: &Ruby) -> Result<(), MagnusError> {
let module = ruby.define_module("HyperRuby")?;
Expand Down
112 changes: 112 additions & 0 deletions test/test_h2_stream_reset.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# frozen_string_literal: true

require "test_helper"
require "httpx" # with_configured_server builds an HTTPX client (unused here)
require "socket"
require "timeout"

# Regression test for a silent-truncation bug: when an HTTP/2 peer sends
# HEADERS + a partial DATA frame + RST_STREAM (the frame sequence a browser
# produces when it cancels an in-flight request on page navigation), the
# server currently hands the truncated body to the Ruby handler as if the
# request had completed normally. Downstream consumers (Kafka, etc.) then
# see a short body alongside the original Content-Length header, producing
# confusingly "truncated" payloads.
#
# Correct behaviour: a stream that ends via RST_STREAM (not END_STREAM) is
# not a completed request; the handler should not run.
class TestH2StreamReset < HyperRubyTest
PORT = 3010
PARTIAL_BYTES = 16_384 # default HTTP/2 SETTINGS_MAX_FRAME_SIZE
CLAIMED_CONTENT_LENGTH = 450_403

def test_rst_stream_after_partial_data_does_not_invoke_handler
invocations = []
mutex = Mutex.new

handler = lambda do |request|
mutex.synchronize do
invocations << {
path: request.path,
body_size: request.body_size,
content_length: request.header("content-length"),
}
end
HyperRuby::Response.new(200, { "Content-Type" => "text/plain" }, "ok")
end

config = { bind_address: "127.0.0.1:#{PORT}", tokio_threads: 1, recv_timeout: 1_000 }

with_configured_server(config, handler) do
send_h2_headers_data_rst(
host: "127.0.0.1",
port: PORT,
partial_bytes: PARTIAL_BYTES,
claimed_content_length: CLAIMED_CONTENT_LENGTH,
)

# Give the server time to hand off to the worker if it's going to.
sleep 0.2
end

mutex.synchronize do
assert_empty invocations,
"handler should not have been invoked for a RST_STREAM'd request, but it ran with: #{invocations.inspect}"
end
end

private

H2_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".b
TYPE_DATA = 0x0
TYPE_HEADERS = 0x1
TYPE_RST_STREAM = 0x3
TYPE_SETTINGS = 0x4
FLAG_END_HEADERS = 0x4
FLAG_ACK = 0x1
RST_CODE_CANCEL = 0x8 # what Chromium sends on navigation

def h2_frame(type, flags, stream_id, payload)
len = payload.bytesize
[len >> 16 & 0xff, len >> 8 & 0xff, len & 0xff,
type, flags, stream_id & 0x7fffffff].pack("CCCCCN") + payload
end

# Minimal HPACK: "literal header field, never indexed, new name" with
# 7-bit string lengths. Names and values must fit in 126 bytes — fine for
# everything we send here.
def hpack_literal(name, value)
raise "name too long" if name.bytesize > 126
raise "value too long" if value.bytesize > 126
[0x10, name.bytesize].pack("CC") + name.b +
[value.bytesize].pack("C") + value.b
end

def send_h2_headers_data_rst(host:, port:, partial_bytes:, claimed_content_length:)
sock = TCPSocket.new(host, port)
sock.write(H2_PREFACE)
sock.write(h2_frame(TYPE_SETTINGS, 0, 0, "".b))
sock.write(h2_frame(TYPE_SETTINGS, FLAG_ACK, 0, "".b))

headers = "".b
headers << hpack_literal(":method", "POST")
headers << hpack_literal(":scheme", "http")
headers << hpack_literal(":path", "/rst-truncated")
headers << hpack_literal(":authority", "#{host}:#{port}")
headers << hpack_literal("content-type", "application/octet-stream")
headers << hpack_literal("content-length", claimed_content_length.to_s)

sock.write(h2_frame(TYPE_HEADERS, FLAG_END_HEADERS, 1, headers))
sock.write(h2_frame(TYPE_DATA, 0, 1, "X".b * partial_bytes))
sock.write(h2_frame(TYPE_RST_STREAM, 0, 1, [RST_CODE_CANCEL].pack("N")))

# Drain anything the server may have written before we hung up; we don't
# care what it is.
begin
Timeout.timeout(0.5) { sock.read(4096) }
rescue Timeout::Error
end
ensure
sock&.close
end
end
Loading