Skip to content
This repository was archived by the owner on Jun 10, 2022. It is now read-only.

Add sync/async sockets differentiation for broker #212

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 44 additions & 24 deletions src/Broker.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Kafka\Sasl\Plain;
use Kafka\Sasl\Scram;
use function array_keys;
use function array_walk_recursive;
use function explode;
use function in_array;
use function serialize;
Expand All @@ -18,6 +19,9 @@ class Broker
{
use SingletonTrait;

public const SOCKET_MODE_ASYNC = 0;
public const SOCKET_MODE_SYNC = 1;

/**
* @var int
*/
Expand All @@ -43,6 +47,9 @@ class Broker
*/
private $dataSockets = [];

/** @var SocketFactory */
private $socketFactory;

/**
* @var callable|null
*/
Expand All @@ -53,6 +60,11 @@ class Broker
*/
private $config;

public function setSocketFactory(SocketFactory $socketFactory): void
{
$this->socketFactory = $socketFactory;
}

public function setProcess(callable $process): void
{
$this->process = $process;
Expand Down Expand Up @@ -134,12 +146,12 @@ public function getBrokers(): array
return $this->brokers;
}

public function getMetaConnect(string $key, bool $modeSync = false): ?CommonSocket
public function getMetaConnect(string $key, int $mode = self::SOCKET_MODE_ASYNC): ?CommonSocket
{
return $this->getConnect($key, 'metaSockets', $modeSync);
return $this->getConnect($key, 'metaSockets', $mode);
}

public function getRandConnect(bool $modeSync = false): ?CommonSocket
public function getRandConnect(int $mode = self::SOCKET_MODE_ASYNC): ?CommonSocket
{
$nodeIds = array_keys($this->brokers);
shuffle($nodeIds);
Expand All @@ -148,24 +160,24 @@ public function getRandConnect(bool $modeSync = false): ?CommonSocket
return null;
}

return $this->getMetaConnect((string) $nodeIds[0], $modeSync);
return $this->getMetaConnect((string) $nodeIds[0], $mode);
}

public function getDataConnect(string $key, bool $modeSync = false): ?CommonSocket
public function getDataConnect(string $key, int $mode = self::SOCKET_MODE_ASYNC): ?CommonSocket
{
return $this->getConnect($key, 'dataSockets', $modeSync);
return $this->getConnect($key, 'dataSockets', $mode);
}

public function getConnect(string $key, string $type, bool $modeSync = false): ?CommonSocket
public function getConnect(string $key, string $type, int $mode = self::SOCKET_MODE_ASYNC): ?CommonSocket
{
if (isset($this->{$type}[$key])) {
return $this->{$type}[$key];
if (isset($this->{$type}[$key][$mode])) {
return $this->{$type}[$key][$mode];
}

if (isset($this->brokers[$key])) {
$hostname = $this->brokers[$key];
if (isset($this->{$type}[$hostname])) {
return $this->{$type}[$hostname];
if (isset($this->{$type}[$hostname][$mode])) {
return $this->{$type}[$hostname][$mode];
}
}

Expand All @@ -182,19 +194,19 @@ public function getConnect(string $key, string $type, bool $modeSync = false): ?
[$host, $port] = explode(':', $key);
}

if ($host === null || $port === null || (! $modeSync && $this->process === null)) {
if ($host === null || $port === null || ($mode === self::SOCKET_MODE_ASYNC && $this->process === null)) {
return null;
}

try {
$socket = $this->getSocket((string) $host, (int) $port, $modeSync);
$socket = $this->getSocket((string) $host, (int) $port, $mode);

if ($socket instanceof Socket && $this->process !== null) {
$socket->setOnReadable($this->process);
}

$socket->connect();
$this->{$type}[$key] = $socket;
$this->{$type}[$key][$mode] = $socket;

return $socket;
} catch (\Throwable $e) {
Expand All @@ -205,30 +217,29 @@ public function getConnect(string $key, string $type, bool $modeSync = false): ?

public function clear(): void
{
foreach ($this->metaSockets as $key => $socket) {
$socket->close();
}
foreach ($this->dataSockets as $key => $socket) {
$sockets = [$this->metaSockets, $this->dataSockets];

array_walk_recursive($sockets, function (CommonSocket $socket): void {
$socket->close();
}
});

$this->brokers = [];
}

/**
* @throws \Kafka\Exception
*/
public function getSocket(string $host, int $port, bool $modeSync): CommonSocket
public function getSocket(string $host, int $port, int $mode): CommonSocket
{
$saslProvider = $this->judgeConnectionConfig();

if ($modeSync) {
return new SocketSync($host, $port, $this->config, $saslProvider);
if ($mode === self::SOCKET_MODE_SYNC) {
return $this->getSocketFactory()->createSocketSync($host, $port, $this->config, $saslProvider);
}

return new Socket($host, $port, $this->config, $saslProvider);
return $this->getSocketFactory()->createSocket($host, $port, $this->config, $saslProvider);
}


/**
* @throws \Kafka\Exception
*/
Expand Down Expand Up @@ -281,4 +292,13 @@ private function getSaslMechanismProvider(Config $config): SaslMechanism

throw new Exception(sprintf('"%s" is an invalid SASL mechanism', $mechanism));
}

private function getSocketFactory(): SocketFactory
{
if ($this->socketFactory === null) {
$this->socketFactory = new SocketFactory();
}

return $this->socketFactory;
}
}
4 changes: 2 additions & 2 deletions src/Producer/SyncProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public function send(array $recordSet): array
$sendData = $this->convertRecordSet($recordSet);
$result = [];
foreach ($sendData as $brokerId => $topicList) {
$connect = $broker->getDataConnect((string) $brokerId, true);
$connect = $broker->getDataConnect((string) $brokerId, Broker::SOCKET_MODE_SYNC);

if ($connect === null) {
return [];
Expand Down Expand Up @@ -118,7 +118,7 @@ public function syncMeta(): void
$broker = $this->getBroker();

foreach ($brokerHost as $host) {
$socket = $broker->getMetaConnect($host, true);
$socket = $broker->getMetaConnect($host, Broker::SOCKET_MODE_SYNC);

if ($socket === null) {
continue;
Expand Down
25 changes: 25 additions & 0 deletions src/SocketFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php
declare(strict_types=1);

namespace Kafka;

class SocketFactory
{
public function createSocket(
string $host,
int $port,
?Config $config = null,
?SaslMechanism $saslProvider = null
): Socket {
return new Socket($host, $port, $config, $saslProvider);
}

public function createSocketSync(
string $host,
int $port,
?Config $config = null,
?SaslMechanism $saslProvider = null
): SocketSync {
return new SocketSync($host, $port, $config, $saslProvider);
}
}
33 changes: 31 additions & 2 deletions tests/Base/BrokerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

use Kafka\Broker;
use Kafka\Socket;
use Kafka\SocketFactory;
use Kafka\SocketSync;
use PHPUnit\Framework\TestCase;

Expand Down Expand Up @@ -95,7 +96,7 @@ public function testData(): void
$this->assertEquals($topics, $broker->getTopics());
}

public function getConnect(): void
public function testGetConnect(): void
{
$broker = $this->getBroker();
$data = [
Expand Down Expand Up @@ -128,6 +129,34 @@ public function getConnect(): void
$this->assertNull($result);
}

public function testGetConnectSyncAndAsyncForTheSameBroker(): void
{
$socket = $this->getMockBuilder(Socket::class)
->disableOriginalConstructor()
->getMock();

$socketSync = $this->getMockBuilder(SocketSync::class)
->disableOriginalConstructor()
->getMock();

$socketFactory = $this->getMockBuilder(SocketFactory::class)
->setMethods(['createSocket', 'createSocketSync'])
->getMock();

$socketFactory->method('createSocket')
->willReturn($socket);
$socketFactory->method('createSocketSync')
->willReturn($socketSync);

$broker = $this->getBroker();
$broker->setSocketFactory($socketFactory);
$broker->setProcess(function (): void {
});

$this->assertSame($socket, $broker->getConnect('kafka:9092', 'metaSockets'));
$this->assertSame($socketSync, $broker->getConnect('kafka:9092', 'metaSockets', Broker::SOCKET_MODE_SYNC));
}

public function testConnectRandFalse(): void
{
$broker = $this->getBroker();
Expand All @@ -141,7 +170,7 @@ public function testGetSocketNotSetConfig(): void
$broker = $this->getBroker();
$hostname = '127.0.0.1';
$port = 9092;
$socket = $broker->getSocket($hostname, $port, true);
$socket = $broker->getSocket($hostname, $port, Broker::SOCKET_MODE_SYNC);

$this->assertInstanceOf(SocketSync::class, $socket);
}
Expand Down