Skip to content

Commit

Permalink
feat: handle private and presence auth
Browse files Browse the repository at this point in the history
  • Loading branch information
L3o-pold committed Feb 6, 2023
1 parent 3bc39c6 commit e0f4340
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 38 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ For more examples see the [examples directory](examples).
This project aims to be 100% compatible with [Pusher's features](https://pusher.com/features) in `v1.3`.

- [X] Subscribe to channels
- [ ] Presence channels
- [ ] Authentication
- [x] Presence channels
- [x] Authentication

# License

Expand Down
14 changes: 8 additions & 6 deletions src/ApiSettings.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,23 @@ public static function getVersion(string $version = ''): string
* Create WebSocket URL for given App ID.
*
* @param string $appId
* @return string
* @return array
*/
public static function createUrl(string $appId, string $cluster = null): string
public static function createUrl(string $appId, string $cluster = null, string $host = null): string
{
$query = [
'client' => 'api-clients/pusher (https://php-api-clients.org/clients/pusher)',
'protocol' => 7,
'version' => ApiSettings::getVersion(),
];

$host = ($cluster !== null) ? "ws-{$cluster}.pusher.com" : 'ws.pusherapp.com';
if (!$host) {
$host = ($cluster !== null) ? "ws-{$cluster}.pusher.com" : 'ws.pusherapp.com';
}

return 'wss://'.$host.'/app/' .
$appId .
'?' . \http_build_query($query)
;
$appId .
'?' . \http_build_query($query)
;
}
}
120 changes: 94 additions & 26 deletions src/AsyncClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
use React\EventLoop\LoopInterface;
use RuntimeException;
use Rx\Observable;
use function Rx\p;
use Rx\Scheduler;
use Rx\Subject\Subject;
use Rx\Websocket\WebsocketErrorException;
use Throwable;
use function Rx\p;

final class AsyncClient
{
Expand Down Expand Up @@ -38,19 +38,32 @@ final class AsyncClient
*/
private $client;

/**
* @var string|null
*/
private $authEndpoint;

/**
* @var array|null
*/
private $authEndpointHeaders;

/**
* @var Observable
*/
private $connected;

/**
* @internal
* @param Subject $client
* @param Subject $client
*
* @throws \InvalidArgumentException
* @internal
*/
public function __construct(Subject $client)
public function __construct(Subject $client, string $authEndpoint = null, array $authEndpointHeaders = null)
{
$this->client = $client;
$this->client = $client;
$this->authEndpoint = $authEndpoint;
$this->authEndpointHeaders = $authEndpointHeaders;

/** @var Observable $events */
$events = $client
Expand All @@ -77,17 +90,21 @@ public function __construct(Subject $client)
}

/**
* @param LoopInterface $loop
* @param string $app Application ID
* @param Resolver $resolver Optional DNS resolver
* @throws \InvalidArgumentException
* @param LoopInterface $loop
* @param string $app Application ID
* @param Resolver $resolver Optional DNS resolver
*
* @return AsyncClient
* @throws \InvalidArgumentException
*/
public static function create(
LoopInterface $loop,
string $app,
Resolver $resolver = null,
string $cluster = null
string $cluster = null,
string $host = null,
string $authEndpoint = null,
array $authEndpointHeaders = null,
): AsyncClient {
try {
Scheduler::setDefaultFactory(function () use ($loop) {
Expand All @@ -97,16 +114,19 @@ public static function create(
}

return new self(
WebSocket::createFactory(ApiSettings::createUrl($app, $cluster), false, [], $loop, $resolver)
WebSocket::createFactory(ApiSettings::createUrl($app, $cluster, $host), false, [], $loop, $resolver),
$authEndpoint,
$authEndpointHeaders
);
}

/**
* Listen on a channel.
*
* @param string $channel Channel to listen on
* @throws \InvalidArgumentException
* @param string $channel Channel to listen on
*
* @return Observable
* @throws \InvalidArgumentException
*/
public function channel(string $channel): Observable
{
Expand All @@ -121,9 +141,15 @@ public function channel(string $channel): Observable
});

$subscribe = $this->connected
->do(function () use ($channel): void {
->do(function (Event $event) use ($channel): void {
$authKey = $channelData = null;

if (str_starts_with($channel, 'private-') || str_starts_with($channel, 'presence-')) {
[$authKey, $channelData] = $this->generateAuthToken($channel, $event->getData()['socket_id']);
}

// Subscribe to pusher channel after connected
$this->send(Event::subscribeOn($channel));
$this->send(Event::subscribeOn($channel, $authKey, $channelData));
})
->flatMapTo(Observable::empty());

Expand Down Expand Up @@ -152,7 +178,6 @@ public function channel(string $channel): Observable
* Send a message through the client.
*
* @param array $message Message to send, will be json encoded
*
*/
public function send(array $message): void
{
Expand All @@ -163,7 +188,8 @@ public function send(array $message): void
* Returns an observable of TimeoutException.
* The timeout observable will get cancelled every time a new event is received.
*
* @param Observable $events
* @param Observable $events
*
* @return Observable
*/
private function timeout(Observable $events): Observable
Expand All @@ -184,20 +210,22 @@ private function timeout(Observable $events): Observable
}

return Observable::never()
->timeout($time)
->catch(function () use ($time) {
// ping (do something that causes incoming stream to get a message)
$this->send(Event::ping());
// this timeout will actually timeout with a TimeoutException - causing
// everything above this to dispose
return Observable::never()->timeout($time);
});
->timeout($time)
->catch(function () use ($time) {
// ping (do something that causes incoming stream to get a message)
$this->send(Event::ping());
// this timeout will actually timeout with a TimeoutException - causing
// everything above this to dispose
return Observable::never()->timeout($time);
});
});
}

/**
* Handle errors as described at https://pusher.com/docs/pusher_protocol#error-codes.
* @param Throwable $throwable
*
* @param Throwable $throwable
*
* @return Observable
*/
private function handleLowLevelError(Throwable $throwable): Observable
Expand Down Expand Up @@ -233,4 +261,44 @@ private function handleLowLevelError(Throwable $throwable): Observable

return Observable::timer($this->delay);
}

/**
* @throws \Exception
*/
private function generateAuthToken(string $channel, string $socketId): array
{
if (!$this->authEndpoint) {
throw new \Exception('No auth endpoint is configured to connect private or presence channel.');
}

$curl = curl_init();

curl_setopt_array($curl, [
CURLOPT_URL => $this->authEndpoint,
CURLOPT_RETURNTRANSFER => true,
CURLOPT_MAXREDIRS => 10,
CURLOPT_TIMEOUT => 5,
CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_1_1,
CURLOPT_CUSTOMREQUEST => 'POST',
CURLOPT_POSTFIELDS => ['channel_name' => $channel, 'socket_id' => $socketId, 'user_data' => []],
CURLOPT_HTTPHEADER => $this->authEndpointHeaders,
]);

$response = curl_exec($curl);
$responseCode = curl_getinfo($curl, CURLINFO_RESPONSE_CODE);

curl_close($curl);

if ($responseCode !== 200) {
throw new \Exception('Can\'t generate auth token for ' . $channel . '. Response code ' . $responseCode);
}

$response = json_decode($response, true);

if (!isset($response['auth'])) {
throw new \Exception('Invalid response for auth token.');
}

return [$response['auth'], $response['channel_data'] ?? null];
}
}
23 changes: 19 additions & 4 deletions src/Event.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,23 @@ public function __construct(string $event, array $data, string $channel = '')

public static function createFromMessage(array $message): self
{
$data = [];

if (isset($message['data'])) {
$data = $message['data'];
if (!\is_array($message['data'])) {
$data = \json_decode($message['data'], true);
}
}

return new self(
$message['event'],
\is_array($message['data']) ? $message['data'] : \json_decode($message['data'], true),
$data,
$message['channel'] ?? ''
);
}

public function jsonSerialize()
public function jsonSerialize(): mixed
{
return \json_encode(['event' => $this->event, 'data' => $this->data, 'channel' => $this->channel]);
}
Expand Down Expand Up @@ -84,9 +93,15 @@ public static function connectionEstablished(Event $event): bool
return $event->getEvent() === 'pusher:connection_established';
}

public static function subscribeOn(string $channel): array
public static function subscribeOn(string $channel, string $authKey = null): array
{
return ['event' => 'pusher:subscribe', 'data' => ['channel' => $channel]];
$data = ['channel' => $channel];

if ($authKey) {
$data['auth'] = $authKey;
}

return ['event' => 'pusher:subscribe', 'data' => $data];
}

public static function unsubscribeOn(string $channel): array
Expand Down

0 comments on commit e0f4340

Please sign in to comment.