Skip to content

Commit

Permalink
fixed sync block
Browse files Browse the repository at this point in the history
  • Loading branch information
FewJuho committed Nov 1, 2024
1 parent db47a99 commit 5b32fcd
Showing 1 changed file with 117 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,70 +62,108 @@
* Communication channel with failover and partition awareness.
*/
final class ReliableChannel implements AutoCloseable {
/** Channel factory. */
/**
* Channel factory.
*/
private final BiFunction<ClientChannelConfiguration, ClientConnectionMultiplexer, ClientChannel> chFactory;

/** Client channel holders for each configured address. */
/**
* Client channel holders for each configured address.
*/
private volatile List<ClientChannelHolder> channels;

/** Limit of attempts to execute each service. */
/**
* Limit of attempts to execute each service.
*/
private volatile int attemptsLimit;

/** Index of the current channel. */
/**
* Index of the current channel.
*/
private volatile int curChIdx = -1;

/** Partition awareness enabled. */
/**
* Partition awareness enabled.
*/
final boolean partitionAwarenessEnabled;

/** Cache partition awareness context. */
/**
* Cache partition awareness context.
*/
private final ClientCacheAffinityContext affinityCtx;

/** Nodes discovery context. */
/**
* Nodes discovery context.
*/
private final ClientDiscoveryContext discoveryCtx;

/** Client configuration. */
/**
* Client configuration.
*/
private final ClientConfiguration clientCfg;

/** Logger. */
/**
* Logger.
*/
private final IgniteLogger log;

/** Node channels. */
/**
* Node channels.
*/
private final Map<UUID, ClientChannelHolder> nodeChannels = new ConcurrentHashMap<>();

/** Channels reinit was scheduled. */
/**
* Channels reinit was scheduled.
*/
private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();

/** Timestamp of start of channels reinitialization. */
/**
* Timestamp of start of channels reinitialization.
*/
private volatile long startChannelsReInit;

/** Timestamp of finish of channels reinitialization. */
/**
* Timestamp of finish of channels reinitialization.
*/
private volatile long finishChannelsReInit;

/** Affinity map update is in progress. */
/**
* Affinity map update is in progress.
*/
private final AtomicBoolean affinityUpdateInProgress = new AtomicBoolean();

/** Channel is closed. */
/**
* Channel is closed.
*/
private volatile boolean closed;

/** Fail (disconnect) listeners. */
/**
* Fail (disconnect) listeners.
*/
private final ArrayList<Runnable> chFailLsnrs = new ArrayList<>();

/** Guard channels and curChIdx together. */
/**
* Guard channels and curChIdx together.
*/
private final ReadWriteLock curChannelsGuard = new ReentrantReadWriteLock();

/** Connection manager. */
/**
* Connection manager.
*/
private final ClientConnectionMultiplexer connMgr;

/** Open channels counter. */
/**
* Open channels counter.
*/
private final AtomicInteger channelsCnt = new AtomicInteger();

/**
* Constructor.
*/
ReliableChannel(
BiFunction<ClientChannelConfiguration, ClientConnectionMultiplexer, ClientChannel> chFactory,
ClientConfiguration clientCfg,
IgniteBinary binary
BiFunction<ClientChannelConfiguration, ClientConnectionMultiplexer, ClientChannel> chFactory,
ClientConfiguration clientCfg,
IgniteBinary binary
) {
if (chFactory == null)
throw new NullPointerException("chFactory");
Expand Down Expand Up @@ -154,7 +192,9 @@ final class ReliableChannel implements AutoCloseable {
log.debug("ReliableChannel created");
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override public synchronized void close() {
if (log.isDebugEnabled())
log.debug("ReliableChannel stopping");
Expand All @@ -166,7 +206,7 @@ final class ReliableChannel implements AutoCloseable {
List<ClientChannelHolder> holders = channels;

if (holders != null) {
for (ClientChannelHolder hld: holders)
for (ClientChannelHolder hld : holders)
hld.close();
}

Expand All @@ -177,11 +217,11 @@ final class ReliableChannel implements AutoCloseable {
/**
* Send request and handle response.
*
* @throws ClientException Thrown by {@code payloadWriter} or {@code payloadReader}.
* @throws ClientException Thrown by {@code payloadWriter} or {@code payloadReader}.
* @throws ClientAuthenticationException When user name or password is invalid.
* @throws ClientAuthorizationException When user has no permission to perform operation.
* @throws ClientProtocolError When failed to handshake with server.
* @throws ClientServerError When failed to process request on server.
* @throws ClientAuthorizationException When user has no permission to perform operation.
* @throws ClientProtocolError When failed to handshake with server.
* @throws ClientServerError When failed to process request on server.
*/
public <T> T service(
ClientOperation op,
Expand All @@ -194,11 +234,11 @@ public <T> T service(
/**
* Send request to one of the passed nodes and handle response.
*
* @throws ClientException Thrown by {@code payloadWriter} or {@code payloadReader}.
* @throws ClientException Thrown by {@code payloadWriter} or {@code payloadReader}.
* @throws ClientAuthenticationException When user name or password is invalid.
* @throws ClientAuthorizationException When user has no permission to perform operation.
* @throws ClientProtocolError When failed to handshake with server.
* @throws ClientServerError When failed to process request on server.
* @throws ClientAuthorizationException When user has no permission to perform operation.
* @throws ClientProtocolError When failed to handshake with server.
* @throws ClientServerError When failed to process request on server.
*/
public <T> T service(
ClientOperation op,
Expand All @@ -220,9 +260,9 @@ public <T> T service(
* Send request and handle response asynchronously.
*/
public <T> IgniteClientFuture<T> serviceAsync(
ClientOperation op,
Consumer<PayloadOutputChannel> payloadWriter,
Function<PayloadInputChannel, T> payloadReader
ClientOperation op,
Consumer<PayloadOutputChannel> payloadWriter,
Function<PayloadInputChannel, T> payloadReader
) throws ClientException, ClientError {
CompletableFuture<T> fut = new CompletableFuture<>();

Expand Down Expand Up @@ -254,7 +294,9 @@ private <T> void handleServiceAsync(
}
}

/** */
/**
*
*/
private <T> Object applyOnClientChannelAsync(
final CompletableFuture<T> fut,
ClientChannel ch,
Expand Down Expand Up @@ -306,7 +348,7 @@ private <T> Object applyOnClientChannelAsync(
* Send request without payload and handle response.
*/
public <T> T service(ClientOperation op, Function<PayloadInputChannel, T> payloadReader)
throws ClientException, ClientError {
throws ClientException, ClientError {
return service(op, null, payloadReader);
}

Expand Down Expand Up @@ -451,9 +493,9 @@ private boolean affinityInfoIsUpToDate(int cacheId) {
return false;

Boolean result = applyOnNodeChannel(nodeId, channel ->
channel.service(ClientOperation.CACHE_PARTITIONS,
affinityCtx::writePartitionsUpdateRequest,
affinityCtx::readPartitionsUpdateResponse),
channel.service(ClientOperation.CACHE_PARTITIONS,
affinityCtx::writePartitionsUpdateRequest,
affinityCtx::readPartitionsUpdateResponse),
failures
);

Expand Down Expand Up @@ -554,9 +596,6 @@ else if (scheduledChannelsReinit.get() && !partitionAwarenessEnabled) {
}
}

/**
* Asynchronously try to establish a connection to all configured servers.
*/
/**
* Asynchronously try to establish a connection to all configured servers.
*/
Expand All @@ -581,7 +620,6 @@ private void initAllChannelsAsync() {
);
}


/**
* Topology version change detected on the channel.
*
Expand Down Expand Up @@ -808,7 +846,9 @@ private <T> T applyOnNodeChannel(
return null;
}

/** */
/**
*
*/
<T> T applyOnDefaultChannel(Function<ClientChannel, T> function, ClientOperation op) {
return applyOnDefaultChannel(function, op, null);
}
Expand Down Expand Up @@ -889,7 +929,9 @@ private <T> T applyOnDefaultChannel(
throw composeException(failures);
}

/** */
/**
*
*/
private ClientConnectionException composeException(List<ClientConnectionException> failures) {
if (F.isEmpty(failures))
return null;
Expand All @@ -905,7 +947,8 @@ private ClientConnectionException composeException(List<ClientConnectionExceptio
* Try apply specified {@code function} on a channel corresponding to {@code tryNodeId}.
* If failed then apply the function on any available channel.
*/
private <T> T applyOnNodeChannelWithFallback(UUID tryNodeId, Function<ClientChannel, T> function, ClientOperation op) {
private <T> T applyOnNodeChannelWithFallback(UUID tryNodeId, Function<ClientChannel, T> function,
ClientOperation op) {
ClientChannelHolder hld = nodeChannels.get(tryNodeId);

List<ClientConnectionException> failures = null;
Expand All @@ -932,7 +975,9 @@ private <T> T applyOnNodeChannelWithFallback(UUID tryNodeId, Function<ClientChan
return applyOnDefaultChannel(function, op, failures);
}

/** Get retry limit. */
/**
* Get retry limit.
*/
private int getRetryLimit() {
List<ClientChannelHolder> holders = channels;

Expand All @@ -944,7 +989,9 @@ private int getRetryLimit() {
return clientCfg.getRetryLimit() > 0 ? Math.min(clientCfg.getRetryLimit(), size) : size;
}

/** Determines whether specified operation should be retried. */
/**
* Determines whether specified operation should be retried.
*/
private boolean shouldRetry(ClientOperation op, int iteration, ClientConnectionException exception) {
ClientOperationType opType = op.toPublicOperationType();

Expand Down Expand Up @@ -983,7 +1030,9 @@ ClientCacheAffinityContext affinityContext() {
return affinityCtx;
}

/** */
/**
*
*/
private boolean isConnectionEstablished(UUID node) {
ClientChannelHolder chHolder = nodeChannels.get(node);

Expand All @@ -1000,19 +1049,29 @@ private boolean isConnectionEstablished(UUID node) {
*/
@SuppressWarnings("PackageVisibleInnerClass") // Visible for tests.
class ClientChannelHolder {
/** Channel configuration. */
/**
* Channel configuration.
*/
private volatile ClientChannelConfiguration chCfg;

/** Channel. */
/**
* Channel.
*/
private volatile ClientChannel ch;

/** ID of the last server node that {@link #ch} is or was connected to. */
/**
* ID of the last server node that {@link #ch} is or was connected to.
*/
private volatile UUID serverNodeId;

/** Address that holder is bind to (chCfg.addr) is not in use now. So close the holder. */
/**
* Address that holder is bind to (chCfg.addr) is not in use now. So close the holder.
*/
private volatile boolean close;

/** Timestamps of reconnect retries. */
/**
* Timestamps of reconnect retries.
*/
private final long[] reconnectRetries;

/**
Expand Down Expand Up @@ -1062,6 +1121,9 @@ private ClientChannel getOrCreateChannel(boolean ignoreThrottling)
throw new ClientConnectionException("Channel is closed [addresses=" + getAddresses() + ']');

if (ch == null) {
if (ch != null)
return ch;

synchronized (this) {
if (close)
throw new ClientConnectionException("Channel is closed [addresses=" + getAddresses() + ']');
Expand All @@ -1088,7 +1150,6 @@ private ClientChannel getOrCreateChannel(boolean ignoreThrottling)
nodeChannels.putIfAbsent(channel.serverNodeId(), this);
}
}

ch = channel;

channelsCnt.incrementAndGet();
Expand Down

0 comments on commit 5b32fcd

Please sign in to comment.