Skip to content

Commit

Permalink
Correctly autoRead through the pipeline (#52)
Browse files Browse the repository at this point in the history
Motivation:

When autoRead is on, the pipeline must observe the read calls in order
to be able to exert backpressure. Otherwise, autoRead is a
zero-backpressure mode, which isn't great.

Correctly call pipeline.read instead of self.read0 to avoid this.

Modifications:

- Updated NIOTSConnectionChannel to call pipeline.read().

Result:

Backpressure can be exerted.
  • Loading branch information
Lukasa authored and weissi committed Jul 23, 2019
1 parent eec3aed commit 6cba688
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Sources/NIOTransportServices/NIOTSConnectionChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ extension NIOTSConnectionChannel: StateManagedChannel {
/// A function that will trigger a socket read if necessary.
internal func readIfNeeded0() {
if self.options.autoRead {
self.read0()
self.pipeline.read()
}
}
}
Expand Down
71 changes: 71 additions & 0 deletions Tests/NIOTransportServicesTests/NIOTSConnectionChannelTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -642,5 +642,76 @@ class NIOTSConnectionChannelTests: XCTestCase {
XCTFail("Unexpected error: \(error)")
}
}

func testAutoReadTraversesThePipeline() throws {
// This test is driven entirely by a channel handler inserted into the client channel.
final class TestHandler: ChannelDuplexHandler {
typealias InboundIn = ByteBuffer
typealias OutboundIn = ByteBuffer
typealias OutboundOut = ByteBuffer

var readCount = 0

private let testCompletePromise: EventLoopPromise<Void>

init(testCompletePromise: EventLoopPromise<Void>) {
self.testCompletePromise = testCompletePromise
}

func read(context: ChannelHandlerContext) {
self.readCount += 1
context.read()
}

func channelActive(context: ChannelHandlerContext) {
var buffer = context.channel.allocator.buffer(capacity: 12)
buffer.writeString("Hello, world!")

context.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil)
}

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
var buffer = self.unwrapInboundIn(data)
if buffer.readString(length: buffer.readableBytes) == "Hello, world!" {
self.testCompletePromise.succeed(())
}
}
}

let testCompletePromise = self.group.next().makePromise(of: Void.self)
let testHandler = TestHandler(testCompletePromise: testCompletePromise)
let listener = try assertNoThrowWithValue(NIOTSListenerBootstrap(group: self.group)
.childChannelInitializer { channel in channel.pipeline.addHandler(EchoHandler()) }
.bind(host: "localhost", port: 0).wait())
defer {
XCTAssertNoThrow(try listener.close().wait())
}

let connectBootstrap = NIOTSConnectionBootstrap(group: self.group)
.channelInitializer { channel in channel.pipeline.addHandler(testHandler) }

let connection = try assertNoThrowWithValue(connectBootstrap.connect(to: listener.localAddress!).wait())
defer {
XCTAssertNoThrow(try connection.close().wait())
}

// Let the test run.
XCTAssertNoThrow(try testCompletePromise.futureResult.wait())

// When the test is completed, we expect the following:
//
// 1. channelActive, which leads to a write and flush.
// 2. read, triggered by autoRead.
// 3. channelRead, enabled by the read above, which completes our promise.
// 4. IN THE SAME EVENT LOOP TICK, read(), triggered by autoRead.
//
// Thus, once the test has completed we can enter the event loop and check the read count.
// We expect 2.
XCTAssertNoThrow(try connection.eventLoop.submit {
XCTAssertEqual(testHandler.readCount, 2)
}.wait())


}
}
#endif
14 changes: 14 additions & 0 deletions Tests/NIOTransportServicesTests/NIOTSEndToEndTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,20 @@ import Foundation
import Network


func assertNoThrowWithValue<T>(_ body: @autoclosure () throws -> T, defaultValue: T? = nil, message: String? = nil, file: StaticString = #file, line: UInt = #line) throws -> T {
do {
return try body()
} catch {
XCTFail("\(message.map { $0 + ": " } ?? "")unexpected error \(error) thrown", file: file, line: line)
if let defaultValue = defaultValue {
return defaultValue
} else {
throw error
}
}
}


final class EchoHandler: ChannelInboundHandler {
typealias InboundIn = Any
typealias OutboundOut = Any
Expand Down

0 comments on commit 6cba688

Please sign in to comment.