Skip to content

Commit

Permalink
Merge pull request #33 from arnaud-lb/drain-event
Browse files Browse the repository at this point in the history
Emit drain event when the request is ready to receive more data
  • Loading branch information
WyriHaximus committed Jun 16, 2015
2 parents f382515 + 4618230 commit 391e1e7
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 4 deletions.
16 changes: 13 additions & 3 deletions src/Request.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ class Request implements WritableStreamInterface
private $response;
private $state = self::STATE_INIT;

private $pendingWrites = array();

public function __construct(ConnectorInterface $connector, RequestData $requestData)
{
$this->connector = $connector;
Expand Down Expand Up @@ -89,9 +91,17 @@ public function write($data)
return $this->stream->write($data);
}

$this->on('headers-written', function ($this) use ($data) {
$this->write($data);
});
if (!count($this->pendingWrites)) {
$this->on('headers-written', function ($this) {
foreach ($this->pendingWrites as $pw) {
$this->write($pw);
}
$this->pendingWrites = array();
$this->emit('drain', array($this));
});
}

$this->pendingWrites[] = $data;

if (self::STATE_WRITING_HEAD > $this->state) {
$this->writeHead();
Expand Down
62 changes: 61 additions & 1 deletion tests/RequestTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
use React\Stream\Stream;
use React\Promise\FulfilledPromise;
use React\Promise\RejectedPromise;
use React\Promise;
use React\Promise\Deferred;

class RequestTest extends TestCase
{
Expand Down Expand Up @@ -288,6 +290,53 @@ public function writeWithAPostRequestShouldSendToTheStream()
$request->handleData("\r\nbody");
}

/** @test */
public function writeWithAPostRequestShouldSendBodyAfterHeadersAndEmitDrainEvent()
{
$requestData = new RequestData('POST', 'http://www.example.com');
$request = new Request($this->connector, $requestData);

$resolveConnection = $this->successfulAsyncConnectionMock();

$this->stream
->expects($this->at(4))
->method('write')
->with($this->matchesRegularExpression("#^POST / HTTP/1\.0\r\nHost: www.example.com\r\nUser-Agent:.*\r\n\r\n$#"));
$this->stream
->expects($this->at(5))
->method('write')
->with($this->identicalTo("some"));
$this->stream
->expects($this->at(6))
->method('write')
->with($this->identicalTo("post"));
$this->stream
->expects($this->at(7))
->method('write')
->with($this->identicalTo("data"));

$factory = $this->createCallableMock();
$factory->expects($this->once())
->method('__invoke')
->will($this->returnValue($this->response));

$request->setResponseFactory($factory);

$this->assertFalse($request->write("some"));
$this->assertFalse($request->write("post"));

$request->once('drain', function () use ($request) {
$request->write("data");
$request->end();
});

$resolveConnection();

$request->handleData("HTTP/1.0 200 OK\r\n");
$request->handleData("Content-Type: text/plain\r\n");
$request->handleData("\r\nbody");
}

/** @test */
public function pipeShouldPipeDataIntoTheRequestBody()
{
Expand Down Expand Up @@ -387,11 +436,22 @@ public function requestShouldRelayErrorEventsFromResponse()

private function successfulConnectionMock()
{
call_user_func($this->successfulAsyncConnectionMock());
}

private function successfulAsyncConnectionMock()
{
$deferred = new Deferred();

$this->connector
->expects($this->once())
->method('create')
->with('www.example.com', 80)
->will($this->returnValue(new FulfilledPromise($this->stream)));
->will($this->returnValue($deferred->promise()));

return function () use ($deferred) {
$deferred->resolve($this->stream);
};
}

private function rejectedConnectionMock()
Expand Down

0 comments on commit 391e1e7

Please sign in to comment.