Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-22319 Fixed node crashing if a snapshot restore is cancelled due to network issues #11361

Merged
merged 24 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
1ba83f5
IGNITE-22319 Node crashes if a snapshot restore cancelled due to netw…
NSAmelchev May 23, 2024
803c190
IGNITE-22319 Node crashes if a snapshot restore cancelled due to netw…
NSAmelchev May 24, 2024
4b55d31
IGNITE-22319 Node crashes if a snapshot restore cancelled due to netw…
NSAmelchev Jun 5, 2024
b5077d1
IGNITE-22319 Node crashes if a snapshot restore cancelled due to netw…
NSAmelchev Jun 6, 2024
7c86431
Merge remote-tracking branch 'refs/remotes/apache/master' into ignite…
NSAmelchev Jun 6, 2024
8b88ea9
IGNITE-22319 Node crashes if a snapshot restore cancelled due to netw…
NSAmelchev Jun 6, 2024
e461ba4
IGNITE-22319 Node crashes if a snapshot restore cancelled due to netw…
NSAmelchev Jun 6, 2024
6dbebea
Review fixes
NSAmelchev Jun 10, 2024
4df1317
Review fixes
NSAmelchev Jun 10, 2024
b877084
Review fixes
NSAmelchev Jun 10, 2024
72678c6
Review fixes
NSAmelchev Jun 11, 2024
38b3739
Merge remote-tracking branch 'refs/remotes/apache/master' into ignite…
NSAmelchev Jul 4, 2024
1b01fae
wip
NSAmelchev Jul 11, 2024
66964b3
Merge remote-tracking branch 'refs/remotes/apache/master' into ignite…
NSAmelchev Jul 11, 2024
53bde1d
wip
NSAmelchev Jul 11, 2024
155223e
review fixes
NSAmelchev Jul 23, 2024
4117e2e
review fixes
NSAmelchev Jul 25, 2024
003558a
Merge remote-tracking branch 'refs/remotes/apache/master' into ignite…
NSAmelchev Jul 25, 2024
78d0d42
review fixes
NSAmelchev Jul 26, 2024
9b11203
review fixes
NSAmelchev Jul 30, 2024
cd779d3
Merge remote-tracking branch 'refs/remotes/apache/master' into ignite…
NSAmelchev Jul 30, 2024
1d801f5
review fixes
NSAmelchev Jul 30, 2024
925d5fb
review fixes
NSAmelchev Jul 30, 2024
d266ed4
revert test
NSAmelchev Jul 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
import org.apache.ignite.internal.util.distributed.DistributedProcess;
import org.apache.ignite.internal.util.distributed.InitMessage;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
Expand Down Expand Up @@ -722,6 +721,8 @@ public static String partDeltaFileName(int partId) {
busyLock.block();

try {
snpRmtMgr.stop();
NSAmelchev marked this conversation as resolved.
Show resolved Hide resolved

restoreCacheGrpProc.interrupt(new NodeStoppingException("Node is stopping."));

// Try stop all snapshot processing if not yet.
Expand All @@ -730,8 +731,6 @@ public static String partDeltaFileName(int partId) {

locSnpTasks.clear();

snpRmtMgr.stop();
NSAmelchev marked this conversation as resolved.
Show resolved Hide resolved

synchronized (snpOpMux) {
if (clusterSnpFut != null) {
clusterSnpFut.onDone(new NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG));
Expand Down Expand Up @@ -3739,33 +3738,33 @@ private class SequentialRemoteSnapshotManager implements TransmissionHandler, Gr
/** {@code true} if the node is stopping. */
private boolean stopping;

/**
* @param next New task for scheduling.
*/
public synchronized void submit(IgniteSnapshotManager.RemoteSnapshotFilesRecevier next) {
/** @param next New task for scheduling. */
public void submit(IgniteSnapshotManager.RemoteSnapshotFilesRecevier next) {
assert next != null;

if (stopping) {
next.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
synchronized (this) {
if (stopping) {
next.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));

return;
}
return;
}

RemoteSnapshotFilesRecevier curr = active;
if (active != null && !active.isDone()) {
queue.offer(next);

if (curr == null || curr.isDone()) {
next.listen(this::scheduleNext);
return;
}

active = next;

next.init();
active.listen(this::scheduleNext);
}
else
queue.offer(next);

next.init();
}

/** Schedule next async receiver. */
private synchronized void scheduleNext() {
private void scheduleNext() {
NSAmelchev marked this conversation as resolved.
Show resolved Hide resolved
NSAmelchev marked this conversation as resolved.
Show resolved Hide resolved
NSAmelchev marked this conversation as resolved.
Show resolved Hide resolved
RemoteSnapshotFilesRecevier next = queue.poll();

if (next == null)
Expand All @@ -3775,57 +3774,36 @@ private synchronized void scheduleNext() {
}

/** Stopping handler. */
public synchronized void stop() {
stopping = true;
public void stop() {
synchronized (this) {
stopping = true;
NSAmelchev marked this conversation as resolved.
Show resolved Hide resolved
}

if (active != null)
active.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
IgniteException ex = new IgniteException(SNP_NODE_STOPPING_ERR_MSG);

RemoteSnapshotFilesRecevier r;

while ((r = queue.poll()) != null)
r.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));

NSAmelchev marked this conversation as resolved.
Show resolved Hide resolved
Set<RemoteSnapshotFilesRecevier> futs = activeTasks();
GridCompoundFuture<Void, Void> stopFut = new GridCompoundFuture<>();
r.acceptException(ex);

try {
for (IgniteInternalFuture<Void> fut : futs)
stopFut.add(fut);

stopFut.markInitialized().get();
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
if (active != null)
active.acceptException(ex);
}

/**
* @param nodeId A node left the cluster.
*/
/** @param nodeId A node left the cluster. */
public void onNodeLeft(UUID nodeId) {
Set<RemoteSnapshotFilesRecevier> futs = activeTasks();
ClusterTopologyCheckedException ex = new ClusterTopologyCheckedException("The node from which a snapshot has been " +
"requested left the grid");

futs.forEach(t -> {
if (t.rmtNodeId.equals(nodeId))
t.acceptException(ex);
queue.forEach(r -> {
if (r.stopChecker.getAsBoolean() || r.rmtNodeId.equals(nodeId))
r.acceptException(ex);
});
}

/**
* @return The set of currently scheduled tasks, some of them may be already completed.
*/
private Set<RemoteSnapshotFilesRecevier> activeTasks() {
Set<RemoteSnapshotFilesRecevier> futs = new HashSet<>(queue);

RemoteSnapshotFilesRecevier active0 = active;

if (active0 != null)
futs.add(active0);
RemoteSnapshotFilesRecevier task = active;

return futs;
if (task != null && !task.isDone() && (task.stopChecker.getAsBoolean() || task.rmtNodeId.equals(nodeId)))
task.acceptException(ex);
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -3944,7 +3922,7 @@ else if (msg instanceof SnapshotFilesFailureMessage) {
@Override public void onException(UUID nodeId, Throwable ex) {
RemoteSnapshotFilesRecevier task = active;

if (task == null)
if (task == null || task.isDone())
return;

assert task.rmtNodeId.equals(nodeId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1201,7 +1201,7 @@ private IgniteInternalFuture<Boolean> preload(UUID reqId) {
catch (Exception ex) {
opCtx0.errHnd.accept(ex);

NSAmelchev marked this conversation as resolved.
Show resolved Hide resolved
return new GridFinishedFuture<>(ex);
retFut.onDone(ex);
}

return retFut;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public void testSnapshotRequestRemoteSourceNodeLeft() throws Exception {
latch.countDown();

assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), ClusterTopologyCheckedException.class,
"he node from which a snapshot has been requested left the grid");
"Remote node left the grid.");
}

/** @throws Exception If fails. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
Expand Down Expand Up @@ -64,6 +66,7 @@
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
import static org.apache.ignite.testframework.GridTestUtils.assertContains;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;

/** */
public class IgniteSnapshotRestoreFromRemoteTest extends IgniteClusterSnapshotRestoreBaseTest {
Expand Down Expand Up @@ -323,6 +326,58 @@ public void testSnapshotCachesStoppedIfLoadingFailOnRemote() throws Exception {
ensureCacheAbsent(dfltCacheCfg);
}

/**
 * Checks that a snapshot restore started on a node without local snapshot files fails with
 * an {@link IgniteException} when the node whose sender was intercepted is closed while
 * that sender is still blocked after handing a partition to the delegate.
 *
 * @throws Exception If failed.
 */
@Test
public void testRestoreConnectionLost() throws Exception {
IgniteEx coord = startDedicatedGrids(SECOND_CLUSTER_PREFIX, 2);

copyAndShuffle(snpParts, G.allGrids());

// Start a new node without snapshot working directory.
IgniteEx emptyNode = startDedicatedGrid(SECOND_CLUSTER_PREFIX, 2);

emptyNode.cluster().state(ClusterState.ACTIVE);

emptyNode.cache(DEFAULT_CACHE_NAME).destroy();

awaitPartitionMapExchange();
NSAmelchev marked this conversation as resolved.
Show resolved Hide resolved

// restoreStarted is released by the intercepted sender after a partition has been passed to the delegate;
// nodeStopped is released by this test once coord has been closed.
CountDownLatch restoreStarted = new CountDownLatch(1);
CountDownLatch nodeStopped = new CountDownLatch(1);

IgniteSnapshotManager mgr = snp(coord);

// Install a sender factory on coord's snapshot manager that wraps the sender produced by the regular factory.
mgr.remoteSnapshotSenderFactory(new BiFunction<String, UUID, SnapshotSender>() {
NSAmelchev marked this conversation as resolved.
Show resolved Hide resolved
@Override public SnapshotSender apply(String s, UUID uuid) {
return new DelegateSnapshotSender(log, mgr.snapshotExecutorService(), mgr.remoteSnapshotSenderFactory(s, uuid)) {
@Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
// Let the delegate send the part first, then notify the test that sending has begun.
delegate.sendPart0(part, cacheDirName, pair, length);

restoreStarted.countDown();

// Hold the sending thread until the test releases it or TIMEOUT milliseconds elapse.
try {
nodeStopped.await(TIMEOUT, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
// NOTE(review): the interrupt flag is not restored before rethrowing.
throw new RuntimeException(e);
}
}
};
}
});

// Restore all cache groups.
IgniteFuture<Void> fut = emptyNode.snapshot().restoreSnapshot(SNAPSHOT_NAME, null);

// NOTE(review): the boolean result of await is ignored, so a timeout here does not fail the test by itself.
restoreStarted.await(TIMEOUT, TimeUnit.MILLISECONDS);

// Close the node whose sender is currently blocked in sendPart0.
coord.close();
NSAmelchev marked this conversation as resolved.
Show resolved Hide resolved

// Unblock the sender thread only after the node has been closed.
nodeStopped.countDown();

// The restore future must complete exceptionally with IgniteException.
assertThrowsWithCause(() -> fut.get(TIMEOUT), IgniteException.class);
}

/**
* @param snpParts Snapshot parts.
* @param toNodes List of toNodes to copy parts to.
Expand Down