diff --git a/Project.toml b/Project.toml index ec81981..11dbe24 100644 --- a/Project.toml +++ b/Project.toml @@ -1,6 +1,6 @@ name = "RustyObjectStore" uuid = "1b5eed3d-1f46-4baa-87f3-a4a892b23610" -version = "0.6.5" +version = "0.7.0" [deps] DocStringExtensions = "ffbed154-4ef7-542d-bbb7-c09d3a79fcae" @@ -16,7 +16,7 @@ ReTestItems = "1" Sockets = "1" Test = "1" julia = "1.8" -object_store_ffi_jll = "0.6.5" +object_store_ffi_jll = "0.7.0" [extras] CloudBase = "85eb1798-d7c4-4918-bb13-c944d38e27ed" diff --git a/src/RustyObjectStore.jl b/src/RustyObjectStore.jl index 451f0e8..8f656c7 100644 --- a/src/RustyObjectStore.jl +++ b/src/RustyObjectStore.jl @@ -73,7 +73,7 @@ end const DEFAULT_CONFIG = StaticConfig( n_threads=0, - cache_capacity=20, + cache_capacity=100, cache_ttl_secs=30 * 60, cache_tti_secs=5 * 60, multipart_put_threshold=10 * 1024 * 1024, @@ -82,17 +82,60 @@ const DEFAULT_CONFIG = StaticConfig( concurrency_limit=512 ) +function default_panic_hook() + println("Rust thread panicked, exiting the process") + exit(1) +end + const _OBJECT_STORE_STARTED = Ref(false) const _INIT_LOCK::ReentrantLock = ReentrantLock() +_PANIC_HOOK::Function = default_panic_hook struct InitException <: Exception msg::String return_code::Cint end -function default_panic_hook() - println("Rust thread panicked, exiting the process") - exit(1) +Base.@ccallable function panic_hook_wrapper()::Cint + global _PANIC_HOOK + _PANIC_HOOK() + return 0 +end + +# This is the callback that Rust calls to notify a Julia task of a completed operation. +# The argument is transparent to Rust and is simply what gets passed from Julia in the handle +# argument of the @ccall. Currently we pass a pointer to a Base.Event that must be notified to +# wakeup the appropriate task. +Base.@ccallable function notify_result(event_ptr::Ptr{Nothing})::Cint + event = unsafe_pointer_to_objref(event_ptr)::Base.Event + notify(event) + return 0 +end + +# A dict of all tasks that are waiting some result from Rust +# and should thus not be garbage collected. +# This copies the behavior of Base.preserve_handle. +const tasks_in_flight = IdDict{Task, Int64}() +const preserve_task_lock = Threads.SpinLock() +function preserve_task(x::Task) + @lock preserve_task_lock begin + v = get(tasks_in_flight, x, 0)::Int + tasks_in_flight[x] = v + 1 + end + nothing +end +function unpreserve_task(x::Task) + @lock preserve_task_lock begin + v = get(tasks_in_flight, x, 0)::Int + if v == 0 + error("unbalanced call to unpreserve_task for $(typeof(x))") + elseif v == 1 + pop!(tasks_in_flight, x) + else + tasks_in_flight[x] = v - 1 + end + end + nothing end """ @@ -117,23 +160,15 @@ function init_object_store( config::StaticConfig=DEFAULT_CONFIG; on_rust_panic::Function=default_panic_hook ) + global _PANIC_HOOK @lock _INIT_LOCK begin if _OBJECT_STORE_STARTED[] return nothing end - cond = Base.AsyncCondition() - errormonitor(Threads.@spawn begin - while true - wait(cond) - try - on_rust_panic() - catch e - @error "Custom panic hook failed" exception=(e, catch_backtrace()) - end - end - end) - panic_cond_handle = cond.handle - res = @ccall rust_lib.start(config::StaticConfig, panic_cond_handle::Ptr{Cvoid})::Cint + _PANIC_HOOK = on_rust_panic + panic_fn_ptr = @cfunction(panic_hook_wrapper, Cint, ()) + fn_ptr = @cfunction(notify_result, Cint, (Ptr{Nothing},)) + res = @ccall rust_lib.start(config::StaticConfig, panic_fn_ptr::Ptr{Nothing}, fn_ptr::Ptr{Nothing})::Cint if res != 0 throw(InitException("Failed to initialise object store runtime.", res)) end @@ -161,10 +196,10 @@ function throw_on_error(response, operation, exception) return :( $(esc(:($response.result == 1))) ? throw($exception($response_error_to_string($(esc(response)), $operation))) : $(nothing) ) end -function ensure_wait(cond::Base.AsyncCondition) +function ensure_wait(event::Base.Event) for i in 1:20 try - return wait(cond) + return wait(event) catch e @error "cannot skip this wait point to prevent UB, ignoring exception: $(e)" end @@ -174,12 +209,12 @@ function ensure_wait(cond::Base.AsyncCondition) exit(1) end -function wait_or_cancel(cond::Base.AsyncCondition, response) +function wait_or_cancel(event::Base.Event, response) try - return wait(cond) + return wait(event) catch e @ccall rust_lib.cancel_context(response.context::Ptr{Cvoid})::Cint - ensure_wait(cond) + ensure_wait(event) @ccall rust_lib.destroy_cstring(response.error_message::Ptr{Cchar})::Cint rethrow() finally @@ -597,9 +632,17 @@ end function rust_message_to_reason(msg::AbstractString) - if contains(msg, "tcp connect error: deadline has elapsed") || - contains(msg, "tcp connect error: Connection refused") || - contains(msg, "error trying to connect: dns error") + if ( + contains(msg, "connection error") + || contains(msg, "tcp connect error") + || contains(msg, "error trying to connect") + || contains(msg, "client error (Connect)") + ) && ( + contains(msg, "deadline has elapsed") + || contains(msg, "Connection refused") + || contains(msg, "Connection reset by peer") + || contains(msg, "dns error") + ) return ConnectionError() elseif contains(msg, "Client error with status") m = match(r"Client error with status (\d+) ", msg) @@ -626,7 +669,8 @@ function rust_message_to_reason(msg::AbstractString) return UnknownError() end elseif contains(msg, "connection closed before message completed") || - contains(msg, "end of file before message length reached") + contains(msg, "end of file before message length reached") || + contains(msg, "Connection reset by peer") return EarlyEOF() elseif contains(msg, "timed out") return TimeoutError() @@ -664,28 +708,32 @@ Fetches the data bytes at `path` and writes them to the given `buffer`. function get_object!(buffer::AbstractVector{UInt8}, path::String, conf::AbstractConfig) response = Response() size = length(buffer) - cond = Base.AsyncCondition() - cond_handle = cond.handle + ct = current_task() + event = Base.Event() + handle = pointer_from_objref(event) config = into_config(conf) while true - result = GC.@preserve buffer config response cond begin + preserve_task(ct) + result = GC.@preserve buffer config response event try result = @ccall rust_lib.get( path::Cstring, buffer::Ref{Cuchar}, size::Culonglong, config::Ref{Config}, response::Ref{Response}, - cond_handle::Ptr{Cvoid} + handle::Ptr{Cvoid} )::Cint - wait_or_cancel(cond, response) + wait_or_cancel(event, response) result + finally + unpreserve_task(ct) end if result == 2 # backoff - sleep(1.0) + sleep(0.01) continue end @@ -719,28 +767,32 @@ Atomically writes the data bytes in `buffer` to `path`. function put_object(buffer::AbstractVector{UInt8}, path::String, conf::AbstractConfig) response = Response() size = length(buffer) - cond = Base.AsyncCondition() - cond_handle = cond.handle + ct = current_task() + event = Base.Event() + handle = pointer_from_objref(event) config = into_config(conf) while true - result = GC.@preserve buffer config response cond begin + preserve_task(ct) + result = GC.@preserve buffer config response event try result = @ccall rust_lib.put( path::Cstring, buffer::Ref{Cuchar}, size::Culonglong, config::Ref{Config}, response::Ref{Response}, - cond_handle::Ptr{Cvoid} + handle::Ptr{Cvoid} )::Cint - wait_or_cancel(cond, response) + wait_or_cancel(event, response) result + finally + unpreserve_task(ct) end if result == 2 # backoff - sleep(1.0) + sleep(0.01) continue end @@ -766,26 +818,30 @@ Send a delete request to the object store. """ function delete_object(path::String, conf::AbstractConfig) response = Response() - cond = Base.AsyncCondition() - cond_handle = cond.handle + ct = current_task() + event = Base.Event() + handle = pointer_from_objref(event) config = into_config(conf) while true - result = GC.@preserve config response cond begin + preserve_task(ct) + result = GC.@preserve config response event try result = @ccall rust_lib.delete( path::Cstring, config::Ref{Config}, response::Ref{Response}, - cond_handle::Ptr{Cvoid} + handle::Ptr{Cvoid} )::Cint - wait_or_cancel(cond, response) + wait_or_cancel(event, response) result + finally + unpreserve_task(ct) end if result == 2 # backoff - sleep(1.0) + sleep(0.01) continue end @@ -841,18 +897,22 @@ function Base.eof(io::ReadStream) return false else response = ReadResponseFFI() - cond = Base.AsyncCondition() - cond_handle = cond.handle - GC.@preserve io response cond begin + ct = current_task() + event = Base.Event() + handle = pointer_from_objref(event) + preserve_task(ct) + GC.@preserve io response event try result = @ccall rust_lib.is_end_of_stream( io.ptr::Ptr{Cvoid}, response::Ref{ReadResponseFFI}, - cond_handle::Ptr{Cvoid} + handle::Ptr{Cvoid} )::Cint @assert result == 0 - wait_or_cancel(cond, response) + wait_or_cancel(event, response) + finally + unpreserve_task(ct) end try @@ -982,29 +1042,33 @@ Send a get request to the object store returning a stream of object data. """ function get_object_stream(path::String, conf::AbstractConfig; size_hint::Int=0, decompress::String="") response = ReadStreamResponseFFI() - cond = Base.AsyncCondition() - cond_handle = cond.handle + ct = current_task() + event = Base.Event() + handle = pointer_from_objref(event) config = into_config(conf) hint = convert(UInt64, size_hint) while true - result = GC.@preserve config response cond begin + preserve_task(ct) + result = GC.@preserve config response event try result = @ccall rust_lib.get_stream( path::Cstring, hint::Culonglong, decompress::Cstring, config::Ref{Config}, response::Ref{ReadStreamResponseFFI}, - cond_handle::Ptr{Cvoid} + handle::Ptr{Cvoid} )::Cint - wait_or_cancel(cond, response) + wait_or_cancel(event, response) result + finally + unpreserve_task(ct) end if result == 2 # backoff - sleep(1.0) + sleep(0.01) continue end @@ -1030,19 +1094,23 @@ function _unsafe_read(stream::ReadStream, dest::Ptr{UInt8}, bytes_to_read::Int) end response = ReadResponseFFI() - cond = Base.AsyncCondition() - cond_handle = cond.handle - GC.@preserve stream dest response cond begin + ct = current_task() + event = Base.Event() + handle = pointer_from_objref(event) + preserve_task(ct) + GC.@preserve stream dest response event try result = @ccall rust_lib.read_from_stream( stream.ptr::Ptr{Cvoid}, dest::Ptr{UInt8}, bytes_to_read::Culonglong, bytes_to_read::Culonglong, response::Ref{ReadResponseFFI}, - cond_handle::Ptr{Cvoid} + handle::Ptr{Cvoid} )::Cint - wait_or_cancel(cond, response) + wait_or_cancel(event, response) + finally + unpreserve_task(ct) end try @@ -1142,27 +1210,31 @@ Send a put request to the object store returning a stream to write data into. """ function put_object_stream(path::String, conf::AbstractConfig; compress::String="") response = WriteStreamResponseFFI() - cond = Base.AsyncCondition() - cond_handle = cond.handle + ct = current_task() + event = Base.Event() + handle = pointer_from_objref(event) config = into_config(conf) while true - result = GC.@preserve config response cond begin + preserve_task(ct) + result = GC.@preserve config response event try result = @ccall rust_lib.put_stream( path::Cstring, compress::Cstring, config::Ref{Config}, response::Ref{WriteStreamResponseFFI}, - cond_handle::Ptr{Cvoid} + handle::Ptr{Cvoid} )::Cint - wait_or_cancel(cond, response) + wait_or_cancel(event, response) result + finally + unpreserve_task(ct) end if result == 2 # backoff - sleep(1.0) + sleep(0.01) continue end @@ -1222,18 +1294,22 @@ function shutdown!(stream::WriteStream) end response = WriteResponseFFI() - cond = Base.AsyncCondition() - cond_handle = cond.handle - GC.@preserve stream response cond begin + ct = current_task() + event = Base.Event() + handle = pointer_from_objref(event) + GC.@preserve stream response event try + preserve_task(ct) result = @ccall rust_lib.shutdown_write_stream( stream.ptr::Ptr{Cvoid}, response::Ref{WriteResponseFFI}, - cond_handle::Ptr{Cvoid} + handle::Ptr{Cvoid} )::Cint @assert result == 0 - wait_or_cancel(cond, response) + wait_or_cancel(event, response) + finally + unpreserve_task(ct) end try @@ -1296,21 +1372,25 @@ function _unsafe_write(stream::WriteStream, input::Ptr{UInt8}, nbytes::Int; flus end response = WriteResponseFFI() - cond = Base.AsyncCondition() - cond_handle = cond.handle - GC.@preserve stream response cond begin + ct = current_task() + event = Base.Event() + handle = pointer_from_objref(event) + GC.@preserve stream response event try + preserve_task(ct) result = @ccall rust_lib.write_to_stream( stream.ptr::Ptr{Cvoid}, input::Ptr{UInt8}, nbytes::Culonglong, flush::Cuchar, response::Ref{WriteResponseFFI}, - cond_handle::Ptr{Cvoid} + handle::Ptr{Cvoid} )::Cint @assert result == 0 - wait_or_cancel(cond, response) + wait_or_cancel(event, response) + finally + unpreserve_task(ct) end try diff --git a/test/aws_s3_exception_tests.jl b/test/aws_s3_exception_tests.jl index 59ebe64..243145a 100644 --- a/test/aws_s3_exception_tests.jl +++ b/test/aws_s3_exception_tests.jl @@ -33,7 +33,7 @@ @test false # Should have thrown an error catch err @test err isa RustyObjectStore.GetException - @test err.msg == "failed to process get with error: Supplied buffer was too small" + @test occursin("Supplied buffer was too small", err.msg) end end @@ -253,6 +253,148 @@ end # @testitem return nrequests[] end + function dummy_cb(handle::Ptr{Cvoid}) + return nothing + end + + function test_tcp_reset(method) + @assert method === :GET || method === :PUT + nrequests = Ref(0) + + (port, tcp_server) = Sockets.listenany(8082) + @async begin + while true + sock = Sockets.accept(tcp_server) + _ = read(sock, 4) + nrequests[] += 1 + ccall( + :uv_tcp_close_reset, + Cint, + (Ptr{Cvoid}, Ptr{Cvoid}), + sock.handle, @cfunction(dummy_cb, Cvoid, (Ptr{Cvoid},)) + ) + end + end + + baseurl = "http://127.0.0.1:$port" + conf = AWSConfig(; + region=region, + bucket_name=container, + access_key_id=dummy_access_key_id, + secret_access_key=dummy_secret_access_key, + host=baseurl, + opts=ClientOptions(; + max_retries=max_retries, + retry_timeout_secs=retry_timeout_secs + ) + ) + + try + method === :GET && get_object!(zeros(UInt8, 5), "blob", conf) + method === :PUT && put_object(codeunits("a,b,c"), "blob", conf) + @test false # Should have thrown an error + catch e + method === :GET && @test e isa RustyObjectStore.GetException + method === :PUT && @test e isa RustyObjectStore.PutException + @test occursin("reset by peer", e.msg) + @test is_connection(e) + finally + close(tcp_server) + end + return nrequests[] + end + + function test_get_stream_reset() + nrequests = Ref(0) + + (port, tcp_server) = Sockets.listenany(8083) + http_server = HTTP.listen!(tcp_server) do http::HTTP.Stream + nrequests[] += 1 + HTTP.setstatus(http, 200) + HTTP.setheader(http, "Content-Length" => "20") + HTTP.startwrite(http) + write(http, "not enough") + socket = HTTP.IOExtras.tcpsocket(HTTP.Connections.getrawstream(http)) + ccall( + :uv_tcp_close_reset, + Cint, + (Ptr{Cvoid}, Ptr{Cvoid}), + socket.handle, @cfunction(dummy_cb, Cvoid, (Ptr{Cvoid},)) + ) + close(http.stream) + end + + baseurl = "http://127.0.0.1:$port" + conf = AWSConfig(; + region=region, + bucket_name=container, + access_key_id=dummy_access_key_id, + secret_access_key=dummy_secret_access_key, + host=baseurl, + opts=ClientOptions(; + max_retries=max_retries, + retry_timeout_secs=retry_timeout_secs + ) + ) + + try + get_object!(zeros(UInt8, 20), "blob", conf) + @test false # Should have thrown an error + catch e + @test e isa RustyObjectStore.GetException + @test occursin("Connection reset by peer", e.msg) + @test is_early_eof(e) + finally + Threads.@spawn HTTP.forceclose(http_server) + end + # wait(http_server) + return nrequests[] + end + + function test_get_stream_timeout() + nrequests = Ref(0) + + (port, tcp_server) = Sockets.listenany(8083) + http_server = HTTP.listen!(tcp_server) do http::HTTP.Stream + nrequests[] += 1 + HTTP.setstatus(http, 200) + HTTP.setheader(http, "Content-Length" => "20") + HTTP.setheader(http, "Last-Modified" => "Tue, 15 Oct 2019 12:45:26 GMT") + HTTP.setheader(http, "ETag" => "123") + HTTP.startwrite(http) + write(http, "not enough") + sleep(10) + close(http.stream) + end + + baseurl = "http://127.0.0.1:$port" + conf = AWSConfig(; + region=region, + bucket_name=container, + access_key_id=dummy_access_key_id, + secret_access_key=dummy_secret_access_key, + host=baseurl, + opts=ClientOptions(; + max_retries=max_retries, + retry_timeout_secs=retry_timeout_secs, + request_timeout_secs + ) + ) + + try + get_object!(zeros(UInt8, 20), "blob", conf) + @test false # Should have thrown an error + catch e + @test e isa RustyObjectStore.GetException + @test occursin("operation timed out", e.msg) + @test is_timeout(e) + finally + Threads.@spawn HTTP.forceclose(http_server) + end + # wait(http_server) + return nrequests[] + end + function test_status(method, response_status, headers=nothing) @assert method === :GET || method === :PUT nrequests = Ref(0) @@ -534,11 +676,28 @@ end # @testitem @test nrequests == 1 + max_retries end + @testset "TCP reset" begin + nrequests = test_tcp_reset(:GET) + @test nrequests == 1 + max_retries + nrequests = test_tcp_reset(:PUT) + @test nrequests == 1 + max_retries + end + @testset "Incomplete GET body" begin nrequests = test_get_stream_error() @test nrequests == 1 + max_retries end + @testset "Incomplete GET body reset" begin + nrequests = test_get_stream_reset() + @test nrequests == 1 + max_retries + end + + @testset "Incomplete GET body timeout" begin + nrequests = test_get_stream_timeout() + @test nrequests == 1 + max_retries + end + @testset "Cancellation" begin nrequests = test_cancellation() @test nrequests == 1 diff --git a/test/azure_blobs_exception_tests.jl b/test/azure_blobs_exception_tests.jl index 59d17be..f2dbeb6 100644 --- a/test/azure_blobs_exception_tests.jl +++ b/test/azure_blobs_exception_tests.jl @@ -31,7 +31,7 @@ @test false # Should have thrown an error catch err @test err isa RustyObjectStore.GetException - @test err.msg == "failed to process get with error: Supplied buffer was too small" + @test occursin("Supplied buffer was too small", err.msg) end end @@ -51,7 +51,7 @@ @test false # Should have thrown an error catch err @test err isa RustyObjectStore.GetException - @test err.msg == "failed to process get with error: Supplied buffer was too small" + @test occursin("Supplied buffer was too small", err.msg) end end @@ -351,6 +351,148 @@ end # @testitem return nrequests[] end + function dummy_cb(handle::Ptr{Cvoid}) + return nothing + end + + function test_tcp_reset(method) + @assert method === :GET || method === :PUT + nrequests = Ref(0) + + (port, tcp_server) = Sockets.listenany(8082) + @async begin + while true + sock = Sockets.accept(tcp_server) + _ = read(sock, 4) + nrequests[] += 1 + ccall( + :uv_tcp_close_reset, + Cint, + (Ptr{Cvoid}, Ptr{Cvoid}), + sock.handle, @cfunction(dummy_cb, Cvoid, (Ptr{Cvoid},)) + ) + end + end + + baseurl = "http://127.0.0.1:$port/$account/$container/" + conf = AzureConfig(; + storage_account_name=account, + container_name=container, + storage_account_key=shared_key_from_azurite, + host=baseurl, + opts=ClientOptions(; + max_retries=max_retries, + retry_timeout_secs=retry_timeout_secs, + request_timeout_secs + ) + ) + + try + method === :GET && get_object!(zeros(UInt8, 5), "blob", conf) + method === :PUT && put_object(codeunits("a,b,c"), "blob", conf) + @test false # Should have thrown an error + catch e + method === :GET && @test e isa RustyObjectStore.GetException + method === :PUT && @test e isa RustyObjectStore.PutException + @test occursin("reset by peer", e.msg) + @test is_connection(e) + finally + close(tcp_server) + end + return nrequests[] + end + + function test_get_stream_reset() + nrequests = Ref(0) + + (port, tcp_server) = Sockets.listenany(8083) + http_server = HTTP.listen!(tcp_server) do http::HTTP.Stream + nrequests[] += 1 + HTTP.setstatus(http, 200) + HTTP.setheader(http, "Content-Length" => "20") + HTTP.setheader(http, "Last-Modified" => "Tue, 15 Oct 2019 12:45:26 GMT") + HTTP.setheader(http, "ETag" => "123") + HTTP.startwrite(http) + write(http, "not enough") + socket = HTTP.IOExtras.tcpsocket(HTTP.Connections.getrawstream(http)) + ccall( + :uv_tcp_close_reset, + Cint, + (Ptr{Cvoid}, Ptr{Cvoid}), + socket.handle, @cfunction(dummy_cb, Cvoid, (Ptr{Cvoid},)) + ) + close(http.stream) + end + + baseurl = "http://127.0.0.1:$port/$account/$container/" + conf = AzureConfig(; + storage_account_name=account, + container_name=container, + storage_account_key=shared_key_from_azurite, + host=baseurl, + opts=ClientOptions(; + max_retries=max_retries, + retry_timeout_secs=retry_timeout_secs + ) + ) + + try + get_object!(zeros(UInt8, 20), "blob", conf) + @test false # Should have thrown an error + catch e + @test e isa RustyObjectStore.GetException + @test occursin("Connection reset by peer", e.msg) + @test is_early_eof(e) + finally + Threads.@spawn HTTP.forceclose(http_server) + end + # wait(http_server) + return nrequests[] + end + + function test_get_stream_timeout() + nrequests = Ref(0) + + (port, tcp_server) = Sockets.listenany(8083) + http_server = HTTP.listen!(tcp_server) do http::HTTP.Stream + nrequests[] += 1 + HTTP.setstatus(http, 200) + HTTP.setheader(http, "Content-Length" => "20") + HTTP.setheader(http, "Last-Modified" => "Tue, 15 Oct 2019 12:45:26 GMT") + HTTP.setheader(http, "ETag" => "123") + HTTP.startwrite(http) + write(http, "not enough") + sleep(10) + close(http.stream) + end + + baseurl = "http://127.0.0.1:$port/$account/$container/" + conf = AzureConfig(; + storage_account_name=account, + container_name=container, + storage_account_key=shared_key_from_azurite, + host=baseurl, + opts=ClientOptions(; + max_retries=max_retries, + retry_timeout_secs=retry_timeout_secs, + request_timeout_secs + ) + ) + + try + get_object!(zeros(UInt8, 20), "blob", conf) + @test false # Should have thrown an error + catch e + @test e isa RustyObjectStore.GetException + @test occursin("operation timed out", e.msg) + @test is_timeout(e) + finally + Threads.@spawn HTTP.forceclose(http_server) + end + # wait(http_server) + return nrequests[] + end + function test_timeout(method, message, wait_secs::Int = 60) @assert method === :GET || method === :PUT nrequests = Ref(0) @@ -594,11 +736,28 @@ end # @testitem @test nrequests == 1 + max_retries end + @testset "TCP reset" begin + nrequests = test_tcp_reset(:GET) + @test nrequests == 1 + max_retries + nrequests = test_tcp_reset(:PUT) + @test nrequests == 1 + max_retries + end + @testset "Incomplete GET body" begin nrequests = test_get_stream_error() @test nrequests == 1 + max_retries end + @testset "Incomplete GET body reset" begin + nrequests = test_get_stream_reset() + @test nrequests == 1 + max_retries + end + + @testset "Incomplete GET body timeout" begin + nrequests = test_get_stream_timeout() + @test nrequests == 1 + max_retries + end + @testset "Cancellation" begin nrequests = test_cancellation() @test nrequests == 1