Skip to content

Commit

Permalink
[HttpClient] Call dns properly with promises, use promises internally
Browse files Browse the repository at this point in the history
  • Loading branch information
igorw committed Nov 26, 2012
1 parent a4ef120 commit 3f39f80
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 81 deletions.
62 changes: 31 additions & 31 deletions ConnectionManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
namespace React\HttpClient;

use React\EventLoop\LoopInterface;
use React\Stream\Stream;
use React\Dns\Resolver\Resolver;
use React\Stream\Stream;
use React\Promise\Deferred;
use React\Promise\FulfilledPromise;
use React\Promise\RejectedPromise;

class ConnectionManager implements ConnectionManagerInterface
{
Expand All @@ -17,73 +20,70 @@ public function __construct(LoopInterface $loop, Resolver $resolver)
$this->resolver = $resolver;
}

public function getConnection($callback, $host, $port)
public function getConnection($host, $port)
{
$that = $this;
$this->resolve(function ($address, $error = null) use ($that, $callback, $host, $port) {
if ($error) {
call_user_func($callback, null, new \RuntimeException(
sprintf("failed to resolve %s", $host),
0,
$error
));
return;
}
$that->getConnectionForAddress($callback, $address, $port);
}, $host);

return $this
->resolveHostname($host)
->then(function ($address) use ($port, $that) {
return $that->getConnectionForAddress($address, $port);
});
}

public function getConnectionForAddress($callback, $address, $port)
public function getConnectionForAddress($address, $port)
{
$url = $this->getSocketUrl($address, $port);

$socket = stream_socket_client($url, $errno, $errstr, ini_get("default_socket_timeout"), STREAM_CLIENT_CONNECT | STREAM_CLIENT_ASYNC_CONNECT);

if (!$socket) {
call_user_func($callback, null, new \RuntimeException(
return new RejectedPromise(new \RuntimeException(
sprintf("connection to %s:%d failed: %s", $addresss, $port, $errstr),
$errno
));
return;
}

stream_set_blocking($socket, 0);

// wait for connection

$loop = $this->loop;
$that = $this;
return $this
->waitForStreamOnce($socket)
->then(array($this, 'handleConnectedSocket'));
}

$this->loop->addWriteStream($socket, function () use ($that, $callback, $socket, $loop) {
protected function waitForStreamOnce($stream)
{
$deferred = new Deferred();

$loop->removeWriteStream($socket);
$loop = $this->loop;

$that->handleConnectedSocket($callback, $socket);
$this->loop->addWriteStream($stream, function ($stream) use ($loop, $deferred) {
$loop->removeWriteStream($stream);
$deferred->resolve($stream);
});

return $deferred->promise();
}

public function handleConnectedSocket($callback, $socket)
public function handleConnectedSocket($socket)
{
call_user_func($callback, new Stream($socket, $this->loop));
return new Stream($socket, $this->loop);
}

protected function getSocketUrl($host, $port)
{
return sprintf('tcp://%s:%s', $host, $port);
}

protected function resolve($callback, $host)
protected function resolveHostname($host)
{
if (false !== filter_var($host, FILTER_VALIDATE_IP)) {
call_user_func($callback, $host);
return;
return new FulfilledPromise($host);
}

$this->resolver->resolve($host, function ($address) use ($callback) {
call_user_func($callback, $address);
}, function ($error) use ($callback) {
call_user_func($callback, null, $error);
});
return $this->resolver->resolve($host);
}
}

2 changes: 1 addition & 1 deletion ConnectionManagerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@

interface ConnectionManagerInterface
{
public function getConnection($callback, $host, $port);
public function getConnection($host, $port);
}
47 changes: 20 additions & 27 deletions Request.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,32 +54,28 @@ public function writeHead()
$streamRef = &$this->stream;
$stateRef = &$this->state;

$this->connect(function ($stream, \Exception $error = null) use ($that, $request, &$streamRef, &$stateRef) {
if (!$stream) {
$that->closeError(new \RuntimeException(
"Connection failed",
0,
$error
));
return;
}
$this
->connect()
->then(
function ($stream) use ($that, $request, &$streamRef, &$stateRef) {
$streamRef = $stream;

$streamRef = $stream;
$stream->on('drain', array($that, 'handleDrain'));
$stream->on('data', array($that, 'handleData'));
$stream->on('end', array($that, 'handleEnd'));
$stream->on('error', array($that, 'handleError'));

$stream->on('drain', array($that, 'handleDrain'));
$stream->on('data', array($that, 'handleData'));
$stream->on('end', array($that, 'handleEnd'));
$stream->on('error', array($that, 'handleError'));
$request->setProtocolVersion('1.0');
$headers = (string) $request;

$request->setProtocolVersion('1.0');
$headers = (string) $request;
$stream->write($headers);

$stream->write($headers);
$stateRef = Request::STATE_HEAD_WRITTEN;

$stateRef = Request::STATE_HEAD_WRITTEN;

$that->emit('headers-written', array($that));
});
$that->emit('headers-written', array($that));
},
array($this, 'handleError')
);
}

public function write($data)
Expand Down Expand Up @@ -213,16 +209,13 @@ protected function parseResponse($data)
return array($response, $parsed['body']);
}

protected function connect($callback)
protected function connect()
{
$host = $this->request->getHost();
$port = $this->request->getPort();
$connectionManager = $this->connectionManager;
$that = $this;

$connectionManager->getConnection(function ($stream, $error = null) use ($that, $callback) {
call_user_func($callback, $stream, $error);
}, $host, $port);
return $this->connectionManager
->getConnection($host, $port);
}

public function setResponseFactory($factory)
Expand Down
49 changes: 27 additions & 22 deletions SecureConnectionManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,44 @@

use React\EventLoop\LoopInterface;
use React\Stream\Stream;
use React\Promise\Deferred;
use React\Promise\ResolverInterface;

class SecureConnectionManager extends ConnectionManager
{
public function handleConnectedSocket($callback, $socket)
public function handleConnectedSocket($socket)
{
$loop = $this->loop;
$that = $this;

$enableCrypto = function () use ($callback, $socket, $loop) {
$deferred = new Deferred();

$result = stream_socket_enable_crypto($socket, true, STREAM_CRYPTO_METHOD_TLS_CLIENT);

if (true === $result) {
// crypto was successfully enabled
$loop->removeWriteStream($socket);
$loop->removeReadStream($socket);
call_user_func($callback, new Stream($socket, $loop));

} else if (false === $result) {
// an error occured
$loop->removeWriteStream($socket);
$loop->removeReadStream($socket);
call_user_func($callback, null);

} else {
// need more data, will retry
}
$enableCrypto = function () use ($that, $socket, $deferred) {
$that->enableCrypto($socket, $deferred);
};

$this->loop->addWriteStream($socket, $enableCrypto);
$this->loop->addReadStream($socket, $enableCrypto);

$enableCrypto();

return $deferred->promise();
}
}

public function enableCrypto($socket, ResolverInterface $resolver)
{
$result = stream_socket_enable_crypto($socket, true, STREAM_CRYPTO_METHOD_TLS_CLIENT);

if (true === $result) {
$this->loop->removeWriteStream($socket);
$this->loop->removeReadStream($socket);

$resolver->resolve(new Stream($socket, $this->loop));
} else if (false === $result) {
$this->loop->removeWriteStream($socket);
$this->loop->removeReadStream($socket);

$resolver->reject();
} else {
// need more data, will retry
}
}
}

0 comments on commit 3f39f80

Please sign in to comment.