Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-22319 Fixed node crashing if a snapshot restore cancelled due to network issues #11361

Merged
merged 24 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
1ba83f5
IGNITE-22319 Node crashes if a snapshot restore cancelled due to netw…
NSAmelchev May 23, 2024
803c190
IGNITE-22319 Node crashes if a snapshot restore cancelled due to netw…
NSAmelchev May 24, 2024
4b55d31
IGNITE-22319 Node crashes if a snapshot restore cancelled due to netw…
NSAmelchev Jun 5, 2024
b5077d1
IGNITE-22319 Node crashes if a snapshot restore cancelled due to netw…
NSAmelchev Jun 6, 2024
7c86431
Merge remote-tracking branch 'refs/remotes/apache/master' into ignite…
NSAmelchev Jun 6, 2024
8b88ea9
IGNITE-22319 Node crashes if a snapshot restore cancelled due to netw…
NSAmelchev Jun 6, 2024
e461ba4
IGNITE-22319 Node crashes if a snapshot restore cancelled due to netw…
NSAmelchev Jun 6, 2024
6dbebea
Review fixes
NSAmelchev Jun 10, 2024
4df1317
Review fixes
NSAmelchev Jun 10, 2024
b877084
Review fixes
NSAmelchev Jun 10, 2024
72678c6
Review fixes
NSAmelchev Jun 11, 2024
38b3739
Merge remote-tracking branch 'refs/remotes/apache/master' into ignite…
NSAmelchev Jul 4, 2024
1b01fae
wip
NSAmelchev Jul 11, 2024
66964b3
Merge remote-tracking branch 'refs/remotes/apache/master' into ignite…
NSAmelchev Jul 11, 2024
53bde1d
wip
NSAmelchev Jul 11, 2024
155223e
review fixes
NSAmelchev Jul 23, 2024
4117e2e
review fixes
NSAmelchev Jul 25, 2024
003558a
Merge remote-tracking branch 'refs/remotes/apache/master' into ignite…
NSAmelchev Jul 25, 2024
78d0d42
review fixes
NSAmelchev Jul 26, 2024
9b11203
review fixes
NSAmelchev Jul 30, 2024
cd779d3
Merge remote-tracking branch 'refs/remotes/apache/master' into ignite…
NSAmelchev Jul 30, 2024
1d801f5
review fixes
NSAmelchev Jul 30, 2024
925d5fb
review fixes
NSAmelchev Jul 30, 2024
d266ed4
revert test
NSAmelchev Jul 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1958,6 +1958,14 @@ public void removeTransmissionHandler(Object topic) {
"on local node [nodeId=" + ctx.localNodeId() + ']'));
}

/**
* @param topic The topic to interrupt receiver from.
* @param ex Exception.
*/
public void interruptTransmissionReceiver(Object topic, Exception ex) {
interruptReceiver(rcvCtxs.remove(topic), ex);
}

/**
* This method must be used prior to opening a {@link TransmissionSender} by calling
* {@link #openTransmissionSender(UUID, Object)} to ensure that remote and local nodes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
import org.apache.ignite.internal.util.distributed.DistributedProcess;
import org.apache.ignite.internal.util.distributed.InitMessage;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
Expand Down Expand Up @@ -3785,47 +3784,32 @@ public synchronized void stop() {

while ((r = queue.poll()) != null)
r.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));

NSAmelchev marked this conversation as resolved.
Show resolved Hide resolved
Set<RemoteSnapshotFilesRecevier> futs = activeTasks();
GridCompoundFuture<Void, Void> stopFut = new GridCompoundFuture<>();

try {
for (IgniteInternalFuture<Void> fut : futs)
stopFut.add(fut);

stopFut.markInitialized().get();
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}

/**
* @param nodeId A node left the cluster.
*/
public void onNodeLeft(UUID nodeId) {
Set<RemoteSnapshotFilesRecevier> futs = activeTasks();
public synchronized void onNodeLeft(UUID nodeId) {
NSAmelchev marked this conversation as resolved.
Show resolved Hide resolved
if (active == null || active.isDone())
return;

ClusterTopologyCheckedException ex = new ClusterTopologyCheckedException("The node from which a snapshot has been " +
"requested left the grid");

futs.forEach(t -> {
if (active.rmtNodeId.equals(nodeId))
interruptActive(ex);

queue.forEach(t -> {
if (t.rmtNodeId.equals(nodeId))
t.acceptException(ex);
});
}

/**
* @return The set of currently scheduled tasks, some of them may be already completed.
*/
private Set<RemoteSnapshotFilesRecevier> activeTasks() {
Set<RemoteSnapshotFilesRecevier> futs = new HashSet<>(queue);

RemoteSnapshotFilesRecevier active0 = active;
/** Interrupts current active task (if present) to safely shedule next. */
private void interruptActive(Exception e) {
cctx.kernalContext().io().interruptTransmissionReceiver(DFLT_INITIAL_SNAPSHOT_TOPIC, e);
NSAmelchev marked this conversation as resolved.
Show resolved Hide resolved

if (active0 != null)
futs.add(active0);

return futs;
active.acceptException(e);
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -3895,20 +3879,22 @@ private Set<RemoteSnapshotFilesRecevier> activeTasks() {
else if (msg instanceof SnapshotFilesFailureMessage) {
SnapshotFilesFailureMessage respMsg0 = (SnapshotFilesFailureMessage)msg;

RemoteSnapshotFilesRecevier task = active;
synchronized (this) {
RemoteSnapshotFilesRecevier task = active;
NSAmelchev marked this conversation as resolved.
Show resolved Hide resolved

if (task == null || !task.reqId.equals(respMsg0.id())) {
if (log.isInfoEnabled()) {
log.info("A stale snapshot response message has been received. Will be ignored " +
"[fromNodeId=" + nodeId + ", response=" + respMsg0 + ']');
}
if (task == null || !task.reqId.equals(respMsg0.id())) {
if (log.isInfoEnabled()) {
log.info("A stale snapshot response message has been received. Will be ignored " +
"[fromNodeId=" + nodeId + ", response=" + respMsg0 + ']');
}

return;
}
return;
}

if (respMsg0.errorMessage() != null) {
task.acceptException(new IgniteCheckedException("Request cancelled. The snapshot operation stopped " +
"on the remote node with an error: " + respMsg0.errorMessage()));
if (respMsg0.errorMessage() != null) {
interruptActive(new IgniteCheckedException("Request cancelled. The snapshot operation " +
"stopped on the remote node with an error: " + respMsg0.errorMessage()));
}
}
}
}
Expand Down Expand Up @@ -3941,7 +3927,7 @@ else if (msg instanceof SnapshotFilesFailureMessage) {
}

/** {@inheritDoc} */
@Override public void onException(UUID nodeId, Throwable ex) {
@Override public synchronized void onException(UUID nodeId, Throwable ex) {
NSAmelchev marked this conversation as resolved.
Show resolved Hide resolved
RemoteSnapshotFilesRecevier task = active;

if (task == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1200,8 +1200,7 @@ private IgniteInternalFuture<Boolean> preload(UUID reqId) {
}
catch (Exception ex) {
opCtx0.errHnd.accept(ex);

NSAmelchev marked this conversation as resolved.
Show resolved Hide resolved
return new GridFinishedFuture<>(ex);
retFut.onDone(ex);
}

return retFut;
Expand Down Expand Up @@ -1887,7 +1886,7 @@ private static class SnapshotRestoreContext {
private volatile Map<Integer, StoredCacheData> cfgs = Collections.emptyMap();

/** Graceful shutdown future. */
private volatile IgniteFuture<?> stopFut;
NSAmelchev marked this conversation as resolved.
Show resolved Hide resolved
private volatile IgniteFuture<Void> stopFut;

/** Operation start time. */
private final long startTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
Expand Down Expand Up @@ -64,6 +66,7 @@
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
import static org.apache.ignite.testframework.GridTestUtils.assertContains;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;

/** */
public class IgniteSnapshotRestoreFromRemoteTest extends IgniteClusterSnapshotRestoreBaseTest {
Expand Down Expand Up @@ -323,6 +326,46 @@ public void testSnapshotCachesStoppedIfLoadingFailOnRemote() throws Exception {
ensureCacheAbsent(dfltCacheCfg);
}

/** @throws Exception If failed. */
@Test
public void testRestoreConnectionLost() throws Exception {
IgniteEx coord = startDedicatedGrids(SECOND_CLUSTER_PREFIX, 2);

copyAndShuffle(snpParts, G.allGrids());

// Start a new node without snapshot working directory.
IgniteEx emptyNode = startDedicatedGrid(SECOND_CLUSTER_PREFIX, 2);

emptyNode.cluster().state(ClusterState.ACTIVE);

emptyNode.cache(DEFAULT_CACHE_NAME).destroy();
awaitPartitionMapExchange();
NSAmelchev marked this conversation as resolved.
Show resolved Hide resolved

CountDownLatch latch = new CountDownLatch(1);

IgniteSnapshotManager mgr = snp(coord);
mgr.remoteSnapshotSenderFactory(new BiFunction<String, UUID, SnapshotSender>() {
NSAmelchev marked this conversation as resolved.
Show resolved Hide resolved
@Override public SnapshotSender apply(String s, UUID uuid) {
return new DelegateSnapshotSender(log, mgr.snapshotExecutorService(), mgr.remoteSnapshotSenderFactory(s, uuid)) {
@Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
delegate.sendPart0(part, cacheDirName, pair, length);

latch.countDown();
}
};
}
});

// Restore all cache groups.
IgniteFuture<Void> fut = emptyNode.snapshot().restoreSnapshot(SNAPSHOT_NAME, null);

latch.await(TIMEOUT, TimeUnit.MILLISECONDS);

coord.close();
NSAmelchev marked this conversation as resolved.
Show resolved Hide resolved

assertThrowsWithCause(() -> fut.get(TIMEOUT), IgniteException.class);
}

/**
* @param snpParts Snapshot parts.
* @param toNodes List of toNodes to copy parts to.
Expand Down
Loading