From 3f39f80581e3f613aa1ca027d4c5f52f2774920e Mon Sep 17 00:00:00 2001 From: Igor Wiedler Date: Mon, 26 Nov 2012 02:22:07 +0100 Subject: [PATCH] [HttpClient] Call dns properly with promises, use promises internally --- ConnectionManager.php | 62 +++++++++++++++++----------------- ConnectionManagerInterface.php | 2 +- Request.php | 47 +++++++++++--------------- SecureConnectionManager.php | 49 +++++++++++++++------------ 4 files changed, 79 insertions(+), 81 deletions(-) diff --git a/ConnectionManager.php b/ConnectionManager.php index 718adba..6047858 100644 --- a/ConnectionManager.php +++ b/ConnectionManager.php @@ -3,8 +3,11 @@ namespace React\HttpClient; use React\EventLoop\LoopInterface; -use React\Stream\Stream; use React\Dns\Resolver\Resolver; +use React\Stream\Stream; +use React\Promise\Deferred; +use React\Promise\FulfilledPromise; +use React\Promise\RejectedPromise; class ConnectionManager implements ConnectionManagerInterface { @@ -17,54 +20,56 @@ public function __construct(LoopInterface $loop, Resolver $resolver) $this->resolver = $resolver; } - public function getConnection($callback, $host, $port) + public function getConnection($host, $port) { $that = $this; - $this->resolve(function ($address, $error = null) use ($that, $callback, $host, $port) { - if ($error) { - call_user_func($callback, null, new \RuntimeException( - sprintf("failed to resolve %s", $host), - 0, - $error - )); - return; - } - $that->getConnectionForAddress($callback, $address, $port); - }, $host); + + return $this + ->resolveHostname($host) + ->then(function ($address) use ($port, $that) { + return $that->getConnectionForAddress($address, $port); + }); } - public function getConnectionForAddress($callback, $address, $port) + public function getConnectionForAddress($address, $port) { $url = $this->getSocketUrl($address, $port); $socket = stream_socket_client($url, $errno, $errstr, ini_get("default_socket_timeout"), STREAM_CLIENT_CONNECT | STREAM_CLIENT_ASYNC_CONNECT); if (!$socket) { - call_user_func($callback, null, new \RuntimeException( + return new RejectedPromise(new \RuntimeException( sprintf("connection to %s:%d failed: %s", $addresss, $port, $errstr), $errno )); - return; } stream_set_blocking($socket, 0); // wait for connection - $loop = $this->loop; - $that = $this; + return $this + ->waitForStreamOnce($socket) + ->then(array($this, 'handleConnectedSocket')); + } - $this->loop->addWriteStream($socket, function () use ($that, $callback, $socket, $loop) { + protected function waitForStreamOnce($stream) + { + $deferred = new Deferred(); - $loop->removeWriteStream($socket); + $loop = $this->loop; - $that->handleConnectedSocket($callback, $socket); + $this->loop->addWriteStream($stream, function ($stream) use ($loop, $deferred) { + $loop->removeWriteStream($stream); + $deferred->resolve($stream); }); + + return $deferred->promise(); } - public function handleConnectedSocket($callback, $socket) + public function handleConnectedSocket($socket) { - call_user_func($callback, new Stream($socket, $this->loop)); + return new Stream($socket, $this->loop); } protected function getSocketUrl($host, $port) @@ -72,18 +77,13 @@ protected function getSocketUrl($host, $port) return sprintf('tcp://%s:%s', $host, $port); } - protected function resolve($callback, $host) + protected function resolveHostname($host) { if (false !== filter_var($host, FILTER_VALIDATE_IP)) { - call_user_func($callback, $host); - return; + return new FulfilledPromise($host); } - $this->resolver->resolve($host, function ($address) use ($callback) { - call_user_func($callback, $address); - }, function ($error) use ($callback) { - call_user_func($callback, null, $error); - }); + return $this->resolver->resolve($host); } } diff --git a/ConnectionManagerInterface.php b/ConnectionManagerInterface.php index 0bd0e80..02ccca5 100644 --- a/ConnectionManagerInterface.php +++ b/ConnectionManagerInterface.php @@ -4,5 +4,5 @@ interface ConnectionManagerInterface { - public function getConnection($callback, $host, $port); + public function getConnection($host, $port); } diff --git a/Request.php b/Request.php index c8d6998..d8c4e07 100644 --- a/Request.php +++ b/Request.php @@ -54,32 +54,28 @@ public function writeHead() $streamRef = &$this->stream; $stateRef = &$this->state; - $this->connect(function ($stream, \Exception $error = null) use ($that, $request, &$streamRef, &$stateRef) { - if (!$stream) { - $that->closeError(new \RuntimeException( - "Connection failed", - 0, - $error - )); - return; - } + $this + ->connect() + ->then( + function ($stream) use ($that, $request, &$streamRef, &$stateRef) { + $streamRef = $stream; - $streamRef = $stream; + $stream->on('drain', array($that, 'handleDrain')); + $stream->on('data', array($that, 'handleData')); + $stream->on('end', array($that, 'handleEnd')); + $stream->on('error', array($that, 'handleError')); - $stream->on('drain', array($that, 'handleDrain')); - $stream->on('data', array($that, 'handleData')); - $stream->on('end', array($that, 'handleEnd')); - $stream->on('error', array($that, 'handleError')); + $request->setProtocolVersion('1.0'); + $headers = (string) $request; - $request->setProtocolVersion('1.0'); - $headers = (string) $request; + $stream->write($headers); - $stream->write($headers); + $stateRef = Request::STATE_HEAD_WRITTEN; - $stateRef = Request::STATE_HEAD_WRITTEN; - - $that->emit('headers-written', array($that)); - }); + $that->emit('headers-written', array($that)); + }, + array($this, 'handleError') + ); } public function write($data) @@ -213,16 +209,13 @@ protected function parseResponse($data) return array($response, $parsed['body']); } - protected function connect($callback) + protected function connect() { $host = $this->request->getHost(); $port = $this->request->getPort(); - $connectionManager = $this->connectionManager; - $that = $this; - $connectionManager->getConnection(function ($stream, $error = null) use ($that, $callback) { - call_user_func($callback, $stream, $error); - }, $host, $port); + return $this->connectionManager + ->getConnection($host, $port); } public function setResponseFactory($factory) diff --git a/SecureConnectionManager.php b/SecureConnectionManager.php index b1ec33b..be4532a 100644 --- a/SecureConnectionManager.php +++ b/SecureConnectionManager.php @@ -4,39 +4,44 @@ use React\EventLoop\LoopInterface; use React\Stream\Stream; +use React\Promise\Deferred; +use React\Promise\ResolverInterface; class SecureConnectionManager extends ConnectionManager { - public function handleConnectedSocket($callback, $socket) + public function handleConnectedSocket($socket) { - $loop = $this->loop; + $that = $this; - $enableCrypto = function () use ($callback, $socket, $loop) { + $deferred = new Deferred(); - $result = stream_socket_enable_crypto($socket, true, STREAM_CRYPTO_METHOD_TLS_CLIENT); - - if (true === $result) { - // crypto was successfully enabled - $loop->removeWriteStream($socket); - $loop->removeReadStream($socket); - call_user_func($callback, new Stream($socket, $loop)); - - } else if (false === $result) { - // an error occured - $loop->removeWriteStream($socket); - $loop->removeReadStream($socket); - call_user_func($callback, null); - - } else { - // need more data, will retry - } + $enableCrypto = function () use ($that, $socket, $deferred) { + $that->enableCrypto($socket, $deferred); }; $this->loop->addWriteStream($socket, $enableCrypto); $this->loop->addReadStream($socket, $enableCrypto); - $enableCrypto(); + + return $deferred->promise(); } -} + public function enableCrypto($socket, ResolverInterface $resolver) + { + $result = stream_socket_enable_crypto($socket, true, STREAM_CRYPTO_METHOD_TLS_CLIENT); + + if (true === $result) { + $this->loop->removeWriteStream($socket); + $this->loop->removeReadStream($socket); + + $resolver->resolve(new Stream($socket, $this->loop)); + } else if (false === $result) { + $this->loop->removeWriteStream($socket); + $this->loop->removeReadStream($socket); + $resolver->reject(); + } else { + // need more data, will retry + } + } +}