Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: handle private and presence auth #48

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ For more examples see the [examples directory](examples).
This project aims to be 100% compatible with [Pusher's features](https://pusher.com/features) in `v1.3`.

- [X] Subscribe to channels
- [ ] Presence channels
- [ ] Authentication
- [x] Presence channels
- [x] Authentication

# License

Expand Down
14 changes: 8 additions & 6 deletions src/ApiSettings.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,23 @@ public static function getVersion(string $version = ''): string
* Create WebSocket URL for given App ID.
*
* @param string $appId
* @return string
* @return array
*/
public static function createUrl(string $appId, string $cluster = null): string
public static function createUrl(string $appId, string $cluster = null, string $host = null): string
{
$query = [
'client' => 'api-clients/pusher (https://php-api-clients.org/clients/pusher)',
'protocol' => 7,
'version' => ApiSettings::getVersion(),
];

$host = ($cluster !== null) ? "ws-{$cluster}.pusher.com" : 'ws.pusherapp.com';
if (!$host) {
$host = ($cluster !== null) ? "ws-{$cluster}.pusher.com" : 'ws.pusherapp.com';
}

return 'wss://'.$host.'/app/' .
$appId .
'?' . \http_build_query($query)
;
$appId .
'?' . \http_build_query($query)
;
}
}
120 changes: 94 additions & 26 deletions src/AsyncClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
use React\EventLoop\LoopInterface;
use RuntimeException;
use Rx\Observable;
use function Rx\p;
use Rx\Scheduler;
use Rx\Subject\Subject;
use Rx\Websocket\WebsocketErrorException;
use Throwable;
use function Rx\p;

final class AsyncClient
{
Expand Down Expand Up @@ -38,19 +38,32 @@ final class AsyncClient
*/
private $client;

/**
* @var string|null
*/
private $authEndpoint;

/**
* @var array|null
*/
private $authEndpointHeaders;

/**
* @var Observable
*/
private $connected;

/**
* @internal
* @param Subject $client
* @param Subject $client
*
* @throws \InvalidArgumentException
* @internal
*/
public function __construct(Subject $client)
public function __construct(Subject $client, string $authEndpoint = null, array $authEndpointHeaders = null)
{
$this->client = $client;
$this->client = $client;
$this->authEndpoint = $authEndpoint;
$this->authEndpointHeaders = $authEndpointHeaders;

/** @var Observable $events */
$events = $client
Expand All @@ -77,17 +90,21 @@ public function __construct(Subject $client)
}

/**
* @param LoopInterface $loop
* @param string $app Application ID
* @param Resolver $resolver Optional DNS resolver
* @throws \InvalidArgumentException
* @param LoopInterface $loop
* @param string $app Application ID
* @param Resolver $resolver Optional DNS resolver
*
* @return AsyncClient
* @throws \InvalidArgumentException
*/
public static function create(
LoopInterface $loop,
string $app,
Resolver $resolver = null,
string $cluster = null
string $cluster = null,
string $host = null,
string $authEndpoint = null,
array $authEndpointHeaders = null,
): AsyncClient {
try {
Scheduler::setDefaultFactory(function () use ($loop) {
Expand All @@ -97,16 +114,19 @@ public static function create(
}

return new self(
WebSocket::createFactory(ApiSettings::createUrl($app, $cluster), false, [], $loop, $resolver)
WebSocket::createFactory(ApiSettings::createUrl($app, $cluster, $host), false, [], $loop, $resolver),
$authEndpoint,
$authEndpointHeaders
);
}

/**
* Listen on a channel.
*
* @param string $channel Channel to listen on
* @throws \InvalidArgumentException
* @param string $channel Channel to listen on
*
* @return Observable
* @throws \InvalidArgumentException
*/
public function channel(string $channel): Observable
{
Expand All @@ -121,9 +141,15 @@ public function channel(string $channel): Observable
});

$subscribe = $this->connected
->do(function () use ($channel): void {
->do(function (Event $event) use ($channel): void {
$authKey = $channelData = null;

if (str_starts_with($channel, 'private-') || str_starts_with($channel, 'presence-')) {
[$authKey, $channelData] = $this->generateAuthToken($channel, $event->getData()['socket_id']);
}

// Subscribe to pusher channel after connected
$this->send(Event::subscribeOn($channel));
$this->send(Event::subscribeOn($channel, $authKey, $channelData));
})
->flatMapTo(Observable::empty());

Expand Down Expand Up @@ -152,7 +178,6 @@ public function channel(string $channel): Observable
* Send a message through the client.
*
* @param array $message Message to send, will be json encoded
*
*/
public function send(array $message): void
{
Expand All @@ -163,7 +188,8 @@ public function send(array $message): void
* Returns an observable of TimeoutException.
* The timeout observable will get cancelled every time a new event is received.
*
* @param Observable $events
* @param Observable $events
*
* @return Observable
*/
private function timeout(Observable $events): Observable
Expand All @@ -184,20 +210,22 @@ private function timeout(Observable $events): Observable
}

return Observable::never()
->timeout($time)
->catch(function () use ($time) {
// ping (do something that causes incoming stream to get a message)
$this->send(Event::ping());
// this timeout will actually timeout with a TimeoutException - causing
// everything above this to dispose
return Observable::never()->timeout($time);
});
->timeout($time)
->catch(function () use ($time) {
// ping (do something that causes incoming stream to get a message)
$this->send(Event::ping());
// this timeout will actually timeout with a TimeoutException - causing
// everything above this to dispose
return Observable::never()->timeout($time);
});
});
}

/**
* Handle errors as described at https://pusher.com/docs/pusher_protocol#error-codes.
* @param Throwable $throwable
*
* @param Throwable $throwable
*
* @return Observable
*/
private function handleLowLevelError(Throwable $throwable): Observable
Expand Down Expand Up @@ -233,4 +261,44 @@ private function handleLowLevelError(Throwable $throwable): Observable

return Observable::timer($this->delay);
}

/**
* @throws \Exception
*/
private function generateAuthToken(string $channel, string $socketId): array
{
if (!$this->authEndpoint) {
throw new \Exception('No auth endpoint is configured to connect private or presence channel.');
}

$curl = curl_init();

curl_setopt_array($curl, [
CURLOPT_URL => $this->authEndpoint,
CURLOPT_RETURNTRANSFER => true,
CURLOPT_MAXREDIRS => 10,
CURLOPT_TIMEOUT => 5,
CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_1_1,
CURLOPT_CUSTOMREQUEST => 'POST',
CURLOPT_POSTFIELDS => ['channel_name' => $channel, 'socket_id' => $socketId, 'user_data' => []],
CURLOPT_HTTPHEADER => $this->authEndpointHeaders,
]);

$response = curl_exec($curl);
$responseCode = curl_getinfo($curl, CURLINFO_RESPONSE_CODE);

curl_close($curl);

if ($responseCode !== 200) {
throw new \Exception('Can\'t generate auth token for ' . $channel . '. Response code ' . $responseCode);
}

$response = json_decode($response, true);

if (!isset($response['auth'])) {
throw new \Exception('Invalid response for auth token.');
}

return [$response['auth'], $response['channel_data'] ?? null];
}
}
27 changes: 23 additions & 4 deletions src/Event.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,23 @@ public function __construct(string $event, array $data, string $channel = '')

public static function createFromMessage(array $message): self
{
$data = [];

if (isset($message['data'])) {
$data = $message['data'];
if (!\is_array($message['data'])) {
$data = \json_decode($message['data'], true);
}
}

return new self(
$message['event'],
\is_array($message['data']) ? $message['data'] : \json_decode($message['data'], true),
$data,
$message['channel'] ?? ''
);
}

public function jsonSerialize()
public function jsonSerialize(): mixed
{
return \json_encode(['event' => $this->event, 'data' => $this->data, 'channel' => $this->channel]);
}
Expand Down Expand Up @@ -84,9 +93,19 @@ public static function connectionEstablished(Event $event): bool
return $event->getEvent() === 'pusher:connection_established';
}

public static function subscribeOn(string $channel): array
public static function subscribeOn(string $channel, string $authKey = null, string $channelData = null): array
{
return ['event' => 'pusher:subscribe', 'data' => ['channel' => $channel]];
$data = ['channel' => $channel];

if ($authKey) {
$data['auth'] = $authKey;
}

if ($channelData) {
$data['channel_data'] = $channelData;
}

return ['event' => 'pusher:subscribe', 'data' => $data];
}

public static function unsubscribeOn(string $channel): array
Expand Down