diff --git a/README.md b/README.md index 1750a94..66132f0 100644 --- a/README.md +++ b/README.md @@ -47,8 +47,8 @@ For more examples see the [examples directory](examples). This project aims to be 100% compatible with [Pusher's features](https://pusher.com/features) in `v1.3`. - [X] Subscribe to channels -- [ ] Presence channels -- [ ] Authentication +- [x] Presence channels +- [x] Authentication # License diff --git a/src/ApiSettings.php b/src/ApiSettings.php index 544144f..b1179ec 100644 --- a/src/ApiSettings.php +++ b/src/ApiSettings.php @@ -31,9 +31,9 @@ public static function getVersion(string $version = ''): string * Create WebSocket URL for given App ID. * * @param string $appId - * @return string + * @return array */ - public static function createUrl(string $appId, string $cluster = null): string + public static function createUrl(string $appId, string $cluster = null, string $host = null): string { $query = [ 'client' => 'api-clients/pusher (https://php-api-clients.org/clients/pusher)', @@ -41,11 +41,13 @@ public static function createUrl(string $appId, string $cluster = null): string 'version' => ApiSettings::getVersion(), ]; - $host = ($cluster !== null) ? "ws-{$cluster}.pusher.com" : 'ws.pusherapp.com'; + if (!$host) { + $host = ($cluster !== null) ? "ws-{$cluster}.pusher.com" : 'ws.pusherapp.com'; + } return 'wss://'.$host.'/app/' . - $appId . - '?' . \http_build_query($query) - ; + $appId . + '?' . \http_build_query($query) + ; } } diff --git a/src/AsyncClient.php b/src/AsyncClient.php index 9915d24..1acf4ed 100644 --- a/src/AsyncClient.php +++ b/src/AsyncClient.php @@ -6,11 +6,11 @@ use React\EventLoop\LoopInterface; use RuntimeException; use Rx\Observable; -use function Rx\p; use Rx\Scheduler; use Rx\Subject\Subject; use Rx\Websocket\WebsocketErrorException; use Throwable; +use function Rx\p; final class AsyncClient { @@ -38,19 +38,32 @@ final class AsyncClient */ private $client; + /** + * @var string|null + */ + private $authEndpoint; + + /** + * @var array|null + */ + private $authEndpointHeaders; + /** * @var Observable */ private $connected; /** - * @internal - * @param Subject $client + * @param Subject $client + * * @throws \InvalidArgumentException + * @internal */ - public function __construct(Subject $client) + public function __construct(Subject $client, string $authEndpoint = null, array $authEndpointHeaders = null) { - $this->client = $client; + $this->client = $client; + $this->authEndpoint = $authEndpoint; + $this->authEndpointHeaders = $authEndpointHeaders; /** @var Observable $events */ $events = $client @@ -77,17 +90,21 @@ public function __construct(Subject $client) } /** - * @param LoopInterface $loop - * @param string $app Application ID - * @param Resolver $resolver Optional DNS resolver - * @throws \InvalidArgumentException + * @param LoopInterface $loop + * @param string $app Application ID + * @param Resolver $resolver Optional DNS resolver + * * @return AsyncClient + * @throws \InvalidArgumentException */ public static function create( LoopInterface $loop, string $app, Resolver $resolver = null, - string $cluster = null + string $cluster = null, + string $host = null, + string $authEndpoint = null, + array $authEndpointHeaders = null, ): AsyncClient { try { Scheduler::setDefaultFactory(function () use ($loop) { @@ -97,16 +114,19 @@ public static function create( } return new self( - WebSocket::createFactory(ApiSettings::createUrl($app, $cluster), false, [], $loop, $resolver) + WebSocket::createFactory(ApiSettings::createUrl($app, $cluster, $host), false, [], $loop, $resolver), + $authEndpoint, + $authEndpointHeaders ); } /** * Listen on a channel. * - * @param string $channel Channel to listen on - * @throws \InvalidArgumentException + * @param string $channel Channel to listen on + * * @return Observable + * @throws \InvalidArgumentException */ public function channel(string $channel): Observable { @@ -121,9 +141,15 @@ public function channel(string $channel): Observable }); $subscribe = $this->connected - ->do(function () use ($channel): void { + ->do(function (Event $event) use ($channel): void { + $authKey = $channelData = null; + + if (str_starts_with($channel, 'private-') || str_starts_with($channel, 'presence-')) { + [$authKey, $channelData] = $this->generateAuthToken($channel, $event->getData()['socket_id']); + } + // Subscribe to pusher channel after connected - $this->send(Event::subscribeOn($channel)); + $this->send(Event::subscribeOn($channel, $authKey, $channelData)); }) ->flatMapTo(Observable::empty()); @@ -152,7 +178,6 @@ public function channel(string $channel): Observable * Send a message through the client. * * @param array $message Message to send, will be json encoded - * */ public function send(array $message): void { @@ -163,7 +188,8 @@ public function send(array $message): void * Returns an observable of TimeoutException. * The timeout observable will get cancelled every time a new event is received. * - * @param Observable $events + * @param Observable $events + * * @return Observable */ private function timeout(Observable $events): Observable @@ -184,20 +210,22 @@ private function timeout(Observable $events): Observable } return Observable::never() - ->timeout($time) - ->catch(function () use ($time) { - // ping (do something that causes incoming stream to get a message) - $this->send(Event::ping()); - // this timeout will actually timeout with a TimeoutException - causing - // everything above this to dispose - return Observable::never()->timeout($time); - }); + ->timeout($time) + ->catch(function () use ($time) { + // ping (do something that causes incoming stream to get a message) + $this->send(Event::ping()); + // this timeout will actually timeout with a TimeoutException - causing + // everything above this to dispose + return Observable::never()->timeout($time); + }); }); } /** * Handle errors as described at https://pusher.com/docs/pusher_protocol#error-codes. - * @param Throwable $throwable + * + * @param Throwable $throwable + * * @return Observable */ private function handleLowLevelError(Throwable $throwable): Observable @@ -233,4 +261,44 @@ private function handleLowLevelError(Throwable $throwable): Observable return Observable::timer($this->delay); } + + /** + * @throws \Exception + */ + private function generateAuthToken(string $channel, string $socketId): array + { + if (!$this->authEndpoint) { + throw new \Exception('No auth endpoint is configured to connect private or presence channel.'); + } + + $curl = curl_init(); + + curl_setopt_array($curl, [ + CURLOPT_URL => $this->authEndpoint, + CURLOPT_RETURNTRANSFER => true, + CURLOPT_MAXREDIRS => 10, + CURLOPT_TIMEOUT => 5, + CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_1_1, + CURLOPT_CUSTOMREQUEST => 'POST', + CURLOPT_POSTFIELDS => ['channel_name' => $channel, 'socket_id' => $socketId, 'user_data' => []], + CURLOPT_HTTPHEADER => $this->authEndpointHeaders, + ]); + + $response = curl_exec($curl); + $responseCode = curl_getinfo($curl, CURLINFO_RESPONSE_CODE); + + curl_close($curl); + + if ($responseCode !== 200) { + throw new \Exception('Can\'t generate auth token for ' . $channel . '. Response code ' . $responseCode); + } + + $response = json_decode($response, true); + + if (!isset($response['auth'])) { + throw new \Exception('Invalid response for auth token.'); + } + + return [$response['auth'], $response['channel_data'] ?? null]; + } } diff --git a/src/Event.php b/src/Event.php index a1a4e9b..1154164 100644 --- a/src/Event.php +++ b/src/Event.php @@ -33,14 +33,23 @@ public function __construct(string $event, array $data, string $channel = '') public static function createFromMessage(array $message): self { + $data = []; + + if (isset($message['data'])) { + $data = $message['data']; + if (!\is_array($message['data'])) { + $data = \json_decode($message['data'], true); + } + } + return new self( $message['event'], - \is_array($message['data']) ? $message['data'] : \json_decode($message['data'], true), + $data, $message['channel'] ?? '' ); } - public function jsonSerialize() + public function jsonSerialize(): mixed { return \json_encode(['event' => $this->event, 'data' => $this->data, 'channel' => $this->channel]); } @@ -84,9 +93,19 @@ public static function connectionEstablished(Event $event): bool return $event->getEvent() === 'pusher:connection_established'; } - public static function subscribeOn(string $channel): array + public static function subscribeOn(string $channel, string $authKey = null, string $channelData = null): array { - return ['event' => 'pusher:subscribe', 'data' => ['channel' => $channel]]; + $data = ['channel' => $channel]; + + if ($authKey) { + $data['auth'] = $authKey; + } + + if ($channelData) { + $data['channel_data'] = $channelData; + } + + return ['event' => 'pusher:subscribe', 'data' => $data]; } public static function unsubscribeOn(string $channel): array