Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
NSAmelchev committed Jul 11, 2024
1 parent 38b3739 commit 1b01fae
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1958,14 +1958,6 @@ public void removeTransmissionHandler(Object topic) {
"on local node [nodeId=" + ctx.localNodeId() + ']'));
}

/**
* Removes the entry for {@code topic} from {@code rcvCtxs} and passes the removed value to
* {@code interruptReceiver} together with {@code ex}.
* <p>
* NOTE(review): for a topic with no registered entry the removed value is presumably {@code null};
* confirm that {@code interruptReceiver} tolerates that.
*
* @param topic The topic to interrupt receiver from.
* @param ex Exception.
*/
public void interruptTransmissionReceiver(Object topic, Exception ex) {
interruptReceiver(rcvCtxs.remove(topic), ex);
}

/**
* This method must be used prior to opening a {@link TransmissionSender} by calling
* {@link #openTransmissionSender(UUID, Object)} to ensure that remote and local nodes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@
import org.apache.ignite.internal.processors.marshaller.MappedName;
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.util.BasicRateLimiter;
import org.apache.ignite.internal.util.GridBusyLock;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
Expand Down Expand Up @@ -721,6 +722,8 @@ public static String partDeltaFileName(int partId) {
busyLock.block();

try {
snpRmtMgr.stop();

restoreCacheGrpProc.interrupt(new NodeStoppingException("Node is stopping."));

// Try stop all snapshot processing if not yet.
Expand All @@ -729,8 +732,6 @@ public static String partDeltaFileName(int partId) {

locSnpTasks.clear();

snpRmtMgr.stop();

synchronized (snpOpMux) {
if (clusterSnpFut != null) {
clusterSnpFut.onDone(new NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG));
Expand Down Expand Up @@ -3738,33 +3739,33 @@ private class SequentialRemoteSnapshotManager implements TransmissionHandler, Gr
/** {@code true} if the node is stopping. */
private boolean stopping;

/**
* @param next New task for scheduling.
*/
public synchronized void submit(IgniteSnapshotManager.RemoteSnapshotFilesRecevier next) {
/** @param next New task for scheduling. */
public void submit(IgniteSnapshotManager.RemoteSnapshotFilesRecevier next) {
assert next != null;

if (stopping) {
next.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
synchronized (this) {
if (stopping) {
next.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));

return;
}
return;
}

RemoteSnapshotFilesRecevier curr = active;
if (active != null && !active.isDone()) {
queue.offer(next);

if (curr == null || curr.isDone()) {
next.listen(this::scheduleNext);
return;
}

active = next;

next.init();
active.listen(this::scheduleNext);
}
else
queue.offer(next);

next.init();
}

/** Schedule next async receiver. */
private synchronized void scheduleNext() {
private void scheduleNext() {
RemoteSnapshotFilesRecevier next = queue.poll();

if (next == null)
Expand All @@ -3774,44 +3775,42 @@ private synchronized void scheduleNext() {
}

/** Stopping handler. */
public synchronized void stop() {
stopping = true;

IgniteException err = new IgniteException(SNP_NODE_STOPPING_ERR_MSG);

if (active != null)
active.acceptException(err);
public void stop() {
synchronized (this) {
stopping = true;
}

RemoteSnapshotFilesRecevier r;

while ((r = queue.poll()) != null)
r.acceptException(err);
}
r.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));

/**
* @param nodeId A node left the cluster.
*/
public synchronized void onNodeLeft(UUID nodeId) {
if (active == null || active.isDone())
return;
if (active != null)
active.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
}

/** @param nodeId A node left the cluster. */
public void onNodeLeft(UUID nodeId) {
ClusterTopologyCheckedException ex = new ClusterTopologyCheckedException("The node from which a snapshot has been " +
"requested left the grid");

if (active.rmtNodeId.equals(nodeId))
interruptActive(ex);

queue.forEach(t -> {
if (t.rmtNodeId.equals(nodeId))
t.acceptException(ex);
queue.forEach(r -> {
if (r.rmtNodeId.equals(nodeId))
r.acceptException(ex);
});
}

/** Interrupts current active task (if present) to safely schedule next. */
private void interruptActive(Exception e) {
cctx.kernalContext().io().interruptTransmissionReceiver(DFLT_INITIAL_SNAPSHOT_TOPIC, e);
RemoteSnapshotFilesRecevier active0 = active;

active.acceptException(e);
if (active0 != null && !active0.isDone() && active0.rmtNodeId.equals(nodeId)) {
// Covers the case where the task was started, but the remote node neither started the transmission nor sent a
// SnapshotFilesFailureMessage: the task is failed on timeout. Otherwise, the task will fail via TransmissionHandler#onException.
cctx.kernalContext().timeout().addTimeoutObject(
new GridTimeoutObjectAdapter(2 * cctx.kernalContext().config().getNetworkTimeout()) {
@Override public void onTimeout() {
active0.acceptException(ex);
}
});
}
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -3881,20 +3880,20 @@ private void interruptActive(Exception e) {
else if (msg instanceof SnapshotFilesFailureMessage) {
SnapshotFilesFailureMessage respMsg0 = (SnapshotFilesFailureMessage)msg;

synchronized (this) {
if (active == null || !active.reqId.equals(respMsg0.id())) {
if (log.isInfoEnabled()) {
log.info("A stale snapshot response message has been received. Will be ignored " +
"[fromNodeId=" + nodeId + ", response=" + respMsg0 + ']');
}
RemoteSnapshotFilesRecevier task = active;

return;
if (task == null || !task.reqId.equals(respMsg0.id())) {
if (log.isInfoEnabled()) {
log.info("A stale snapshot response message has been received. Will be ignored " +
"[fromNodeId=" + nodeId + ", response=" + respMsg0 + ']');
}

if (respMsg0.errorMessage() != null) {
interruptActive(new IgniteCheckedException("Request cancelled. The snapshot operation " +
"stopped on the remote node with an error: " + respMsg0.errorMessage()));
}
return;
}

if (respMsg0.errorMessage() != null) {
task.acceptException(new IgniteCheckedException("Request cancelled. The snapshot operation stopped " +
"on the remote node with an error: " + respMsg0.errorMessage()));
}
}
}
Expand Down Expand Up @@ -3927,13 +3926,15 @@ else if (msg instanceof SnapshotFilesFailureMessage) {
}

/** {@inheritDoc} */
@Override public synchronized void onException(UUID nodeId, Throwable ex) {
if (active == null)
@Override public void onException(UUID nodeId, Throwable ex) {
RemoteSnapshotFilesRecevier task = active;

if (task == null)
return;

assert active.rmtNodeId.equals(nodeId);
assert task.rmtNodeId.equals(nodeId);

active.acceptException(ex);
task.acceptException(ex);
}

/** {@inheritDoc} */
Expand Down

0 comments on commit 1b01fae

Please sign in to comment.