From 4618230a1d8c03c6d31bbc1cca0ff562fcfd7939 Mon Sep 17 00:00:00 2001 From: Arnaud Le Blanc Date: Tue, 9 Jun 2015 20:08:13 +0200 Subject: [PATCH] Emit drain event when the request is ready to receive more data --- src/Request.php | 16 ++++++++--- tests/RequestTest.php | 62 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 74 insertions(+), 4 deletions(-) diff --git a/src/Request.php b/src/Request.php index 67ae3a1..8d96f6b 100644 --- a/src/Request.php +++ b/src/Request.php @@ -32,6 +32,8 @@ class Request implements WritableStreamInterface private $response; private $state = self::STATE_INIT; + private $pendingWrites = array(); + public function __construct(ConnectorInterface $connector, RequestData $requestData) { $this->connector = $connector; @@ -89,9 +91,17 @@ public function write($data) return $this->stream->write($data); } - $this->on('headers-written', function ($this) use ($data) { - $this->write($data); - }); + if (!count($this->pendingWrites)) { + $this->on('headers-written', function ($this) { + foreach ($this->pendingWrites as $pw) { + $this->write($pw); + } + $this->pendingWrites = array(); + $this->emit('drain', array($this)); + }); + } + + $this->pendingWrites[] = $data; if (self::STATE_WRITING_HEAD > $this->state) { $this->writeHead(); diff --git a/tests/RequestTest.php b/tests/RequestTest.php index eb17727..1193c90 100644 --- a/tests/RequestTest.php +++ b/tests/RequestTest.php @@ -7,6 +7,8 @@ use React\Stream\Stream; use React\Promise\FulfilledPromise; use React\Promise\RejectedPromise; +use React\Promise; +use React\Promise\Deferred; class RequestTest extends TestCase { @@ -288,6 +290,53 @@ public function writeWithAPostRequestShouldSendToTheStream() $request->handleData("\r\nbody"); } + /** @test */ + public function writeWithAPostRequestShouldSendBodyAfterHeadersAndEmitDrainEvent() + { + $requestData = new RequestData('POST', 'http://www.example.com'); + $request = new Request($this->connector, $requestData); + + $resolveConnection = $this->successfulAsyncConnectionMock(); + + $this->stream + ->expects($this->at(4)) + ->method('write') + ->with($this->matchesRegularExpression("#^POST / HTTP/1\.0\r\nHost: www.example.com\r\nUser-Agent:.*\r\n\r\n$#")); + $this->stream + ->expects($this->at(5)) + ->method('write') + ->with($this->identicalTo("some")); + $this->stream + ->expects($this->at(6)) + ->method('write') + ->with($this->identicalTo("post")); + $this->stream + ->expects($this->at(7)) + ->method('write') + ->with($this->identicalTo("data")); + + $factory = $this->createCallableMock(); + $factory->expects($this->once()) + ->method('__invoke') + ->will($this->returnValue($this->response)); + + $request->setResponseFactory($factory); + + $this->assertFalse($request->write("some")); + $this->assertFalse($request->write("post")); + + $request->once('drain', function () use ($request) { + $request->write("data"); + $request->end(); + }); + + $resolveConnection(); + + $request->handleData("HTTP/1.0 200 OK\r\n"); + $request->handleData("Content-Type: text/plain\r\n"); + $request->handleData("\r\nbody"); + } + /** @test */ public function pipeShouldPipeDataIntoTheRequestBody() { @@ -387,11 +436,22 @@ public function requestShouldRelayErrorEventsFromResponse() private function successfulConnectionMock() { + call_user_func($this->successfulAsyncConnectionMock()); + } + + private function successfulAsyncConnectionMock() + { + $deferred = new Deferred(); + $this->connector ->expects($this->once()) ->method('create') ->with('www.example.com', 80) - ->will($this->returnValue(new FulfilledPromise($this->stream))); + ->will($this->returnValue($deferred->promise())); + + return function () use ($deferred) { + $deferred->resolve($this->stream); + }; } private function rejectedConnectionMock()