Skip to content

Commit

Permalink
Feature: chunked encoding (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
WyriHaximus authored Sep 14, 2016
1 parent 19dd5ac commit 7662852
Show file tree
Hide file tree
Showing 5 changed files with 412 additions and 5 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Interesting events emitted by Request:
Interesting events emitted by Response:

* `data`: Passes a chunk of the response body as first argument and a Response
object itself as second argument.
object itself as second argument. When a response encounters a chunked encoded response it will parse it transparently for the user of `Response` and removing the `Transfer-Encoding` header.
* `error`: An error occurred.
* `end`: The response has been fully received. If an error
occurred, it is passed as first argument.
Expand Down Expand Up @@ -55,6 +55,5 @@ $loop->run();
## TODO

* gzip content encoding
* chunked transfer encoding
* keep-alive connections
* following redirections
204 changes: 204 additions & 0 deletions src/ChunkedStreamDecoder.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
<?php

namespace React\HttpClient;

use Evenement\EventEmitterTrait;
use Exception;
use React\Stream\ReadableStreamInterface;
use React\Stream\Util;
use React\Stream\WritableStreamInterface;

/**
* @internal
*/
class ChunkedStreamDecoder implements ReadableStreamInterface
{
const CRLF = "\r\n";

use EventEmitterTrait;

/**
* @var string
*/
protected $buffer = '';

/**
* @var int
*/
protected $remainingLength = 0;

/**
* @var bool
*/
protected $nextChunkIsLength = true;

/**
* @var ReadableStreamInterface
*/
protected $stream;

/**
* @var bool
*/
protected $closed = false;

/**
* @var bool
*/
protected $reachedEnd = false;

/**
* @param ReadableStreamInterface $stream
*/
public function __construct(ReadableStreamInterface $stream)
{
$this->stream = $stream;
$this->stream->on('data', array($this, 'handleData'));
$this->stream->on('end', array($this, 'handleEnd'));
Util::forwardEvents($this->stream, $this, [
'error',
]);
}

/** @internal */
public function handleData($data)
{
$this->buffer .= $data;

do {
$bufferLength = strlen($this->buffer);
$continue = $this->iterateBuffer();
$iteratedBufferLength = strlen($this->buffer);
} while (
$continue &&
$bufferLength !== $iteratedBufferLength &&
$iteratedBufferLength > 0
);

if ($this->buffer === false) {
$this->buffer = '';
}
}

protected function iterateBuffer()
{
if (strlen($this->buffer) <= 1) {
return false;
}

if ($this->nextChunkIsLength) {
$crlfPosition = strpos($this->buffer, static::CRLF);
if ($crlfPosition === false && strlen($this->buffer) > 1024) {
$this->emit('error', [
new Exception('Chunk length header longer then 1024 bytes'),
]);
$this->close();
return false;
}
if ($crlfPosition === false) {
return false; // Chunk header hasn't completely come in yet
}
$this->nextChunkIsLength = false;
$lengthChunk = substr($this->buffer, 0, $crlfPosition);
if (strpos($lengthChunk, ';') !== false) {
list($lengthChunk) = explode(';', $lengthChunk, 2);
}
if (dechex(hexdec($lengthChunk)) !== $lengthChunk) {
$this->emit('error', [
new Exception('Unable to validate "' . $lengthChunk . '" as chunk length header'),
]);
$this->close();
return false;
}
$this->remainingLength = hexdec($lengthChunk);
$this->buffer = substr($this->buffer, $crlfPosition + 2);
return true;
}

if ($this->remainingLength > 0) {
$chunkLength = $this->getChunkLength();
if ($chunkLength === 0) {
return true;
}
$this->emit('data', array(
substr($this->buffer, 0, $chunkLength),
$this
));
$this->remainingLength -= $chunkLength;
$this->buffer = substr($this->buffer, $chunkLength);
return true;
}

$this->nextChunkIsLength = true;
$this->buffer = substr($this->buffer, 2);

if (substr($this->buffer, 0, 3) === "0\r\n") {
$this->reachedEnd = true;
$this->emit('end');
$this->close();
return false;
}
return true;
}

protected function getChunkLength()
{
$bufferLength = strlen($this->buffer);

if ($bufferLength >= $this->remainingLength) {
return $this->remainingLength;
}

This comment has been minimized.

Copy link
@mdrost

mdrost Mar 19, 2017

Contributor

return min($bufferLength, $this->remainingLength); is much more readable.


return $bufferLength;
}

public function pause()
{
$this->stream->pause();
}

public function resume()
{
$this->stream->resume();
}

public function isReadable()
{
return $this->stream->isReadable();
}

public function pipe(WritableStreamInterface $dest, array $options = array())
{
Util::pipe($this, $dest, $options);

return $dest;
}

public function close()
{
$this->closed = true;
return $this->stream->close();
}

/** @internal */
public function handleEnd()
{
if ($this->closed) {
return;
}

if ($this->buffer === '' && $this->reachedEnd) {
$this->emit('end');
$this->close();
return;
}

$this->emit(
'error',
[
new Exception('Stream ended with incomplete control code')
]
);
$this->close();
}
}
18 changes: 15 additions & 3 deletions src/Response.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,22 @@ public function __construct(DuplexStreamInterface $stream, $protocol, $version,
$this->code = $code;
$this->reasonPhrase = $reasonPhrase;
$this->headers = $headers;
$normalizedHeaders = array_change_key_case($headers, CASE_LOWER);

$stream->on('data', array($this, 'handleData'));
$stream->on('error', array($this, 'handleError'));
$stream->on('end', array($this, 'handleEnd'));
if (isset($normalizedHeaders['transfer-encoding']) && strtolower($normalizedHeaders['transfer-encoding']) === 'chunked') {
$this->stream = new ChunkedStreamDecoder($stream);

foreach ($this->headers as $key => $value) {
if (strcasecmp('transfer-encoding', $key) === 0) {
unset($this->headers[$key]);
break;
}
}
}

$this->stream->on('data', array($this, 'handleData'));
$this->stream->on('error', array($this, 'handleError'));
$this->stream->on('end', array($this, 'handleEnd'));
}

public function getProtocol()
Expand Down
143 changes: 143 additions & 0 deletions tests/DecodeChunkedStreamTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
<?php

namespace React\Tests\HttpClient;

use Exception;
use React\HttpClient\ChunkedStreamDecoder;
use React\Stream\ThroughStream;

class DecodeChunkedStreamTest extends TestCase
{
public function provideChunkedEncoding()
{
return [
'data-set-1' => [
["4\r\nWiki\r\n5\r\npedia\r\ne\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n"],
],
'data-set-2' => [
["4\r\nWiki\r\n", "5\r\npedia\r\ne\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n"],
],
'data-set-3' => [
["4\r\nWiki\r\n", "5\r\n", "pedia\r\ne\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n"],
],
'data-set-4' => [
["4\r\nWiki\r\n", "5\r\n", "pedia\r\ne\r\n in\r\n", "\r\nchunks.\r\n0\r\n\r\n"],
],
'data-set-5' => [
["4\r\n", "Wiki\r\n", "5\r\n", "pedia\r\ne\r\n in\r\n", "\r\nchunks.\r\n0\r\n\r\n"],
],
'data-set-6' => [
["4\r\n", "Wiki\r\n", "5\r\n", "pedia\r\ne; foo=[bar,beer,pool,cue,win,won]\r\n", " in\r\n", "\r\nchunks.\r\n0\r\n\r\n"],
],
'header-fields' => [
["4; foo=bar\r\n", "Wiki\r\n", "5\r\n", "pedia\r\ne\r\n", " in\r\n", "\r\nchunks.\r\n", "0\r\n\r\n"],
],
'character-for-charactrr' => [
str_split("4\r\nWiki\r\n5\r\npedia\r\ne\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n"),
],
'extra-newline-in-wiki-character-for-chatacter' => [
str_split("6\r\nWi\r\nki\r\n5\r\npedia\r\ne\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n"),
"Wi\r\nkipedia in\r\n\r\nchunks."
],
'extra-newline-in-wiki' => [
["6\r\nWi\r\n", "ki\r\n5\r\npedia\r\ne\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n"],
"Wi\r\nkipedia in\r\n\r\nchunks."
],
];
}

/**
* @test
* @dataProvider provideChunkedEncoding
*/
public function testChunkedEncoding(array $strings, $expected = "Wikipedia in\r\n\r\nchunks.")
{
$stream = new ThroughStream();
$response = new ChunkedStreamDecoder($stream);
$buffer = '';
$response->on('data', function ($data) use (&$buffer) {
$buffer .= $data;
});
$response->on('error', function (Exception $exception) {
throw $exception;
});
foreach ($strings as $string) {
$stream->write($string);
}
$this->assertSame($expected, $buffer);
}

public function provideInvalidChunkedEncoding()
{
return [
'chunk-body-longer-than-header-suggests' => [
["4\r\nWiwot40n98w3498tw3049nyn039409t34\r\n", "ki\r\n5\r\npedia\r\ne\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n"],
],
'invalid-header-charactrrs' => [
str_split("xyz\r\nWi\r\nki\r\n5\r\npedia\r\ne\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n")
],
'header-chunk-to-long' => [
str_split(str_repeat('a', 2015) . "\r\nWi\r\nki\r\n5\r\npedia\r\ne\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n")
],
];
}

/**
* @test
* @dataProvider provideInvalidChunkedEncoding
* @expectedException Exception
*/
public function testInvalidChunkedEncoding(array $strings)
{
$stream = new ThroughStream();
$response = new ChunkedStreamDecoder($stream);
$response->on('error', function (Exception $exception) {
throw $exception;
});
foreach ($strings as $string) {
$stream->write($string);
}
}

public function testHandleEnd()
{
$ended = false;
$stream = new ThroughStream();
$response = new ChunkedStreamDecoder($stream);
$response->on('end', function () use (&$ended) {
$ended = true;
});

$stream->write("4\r\nWiki\r\n0\r\n\r\n");

$this->assertTrue($ended);
}

public function testHandleEndIncomplete()
{
$exception = null;
$stream = new ThroughStream();
$response = new ChunkedStreamDecoder($stream);
$response->on('error', function ($e) use (&$exception) {
$exception = $e;
});

$stream->end("4\r\nWiki");

$this->assertInstanceOf('Exception', $exception);
}

public function testHandleEndTrailers()
{
$ended = false;
$stream = new ThroughStream();
$response = new ChunkedStreamDecoder($stream);
$response->on('end', function () use (&$ended) {
$ended = true;
});

$stream->write("4\r\nWiki\r\n0\r\nabc: def\r\nghi: klm\r\n\r\n");

$this->assertTrue($ended);
}
}
Loading

0 comments on commit 7662852

Please sign in to comment.