diff --git a/src/AsyncClient.php b/src/AsyncClient.php index dfb3aab..1e60633 100644 --- a/src/AsyncClient.php +++ b/src/AsyncClient.php @@ -107,6 +107,7 @@ public function __construct(Observable $client) )); } + // If this event represents the connection_established event set the timeout if ($event->getEvent() === 'pusher:connection_established') { $this->setActivityTimeout($event); } @@ -167,6 +168,7 @@ public static function create(LoopInterface $loop, string $app, Resolver $resolv */ public function channel(string $channel): Observable { + // Only join a channel once if (isset($this->channels[$channel])) { return $this->channels[$channel]; } @@ -176,12 +178,14 @@ public function channel(string $channel): Observable return $event->getChannel() !== '' && $event->getChannel() === $channel; }); + // Observable representing channel events $events = Observable::create(function ( ObserverInterface $observer ) use ( $channel, $channelMessages ) { + // Subscribe to channel messages but filter out internal events $subscription = $channelMessages ->filter(function (Event $event) { return $event->getEvent() !== 'pusher_internal:subscription_succeeded'; @@ -231,6 +235,7 @@ public function send(array $message): bool */ private function handleLowLevelError(Throwable $throwable) { + // Only allow certain, relevant, exceptions if (!($throwable instanceof WebsocketErrorException) && !($throwable instanceof RuntimeException) && !($throwable instanceof PusherErrorException)