Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-22662 : Snapshot check as distributed process #11391

Merged
Show file tree
Hide file tree
Changes from 87 commits
Commits
Show all changes
93 commits
Select commit Hold shift + click to select a range
1e4cee5
raw
Vladsz83 Jun 11, 2024
c5916ea
test fixes
Vladsz83 Jun 14, 2024
a995a1b
raw tests
Vladsz83 Jun 15, 2024
57d92b3
Merge branch 'master' into checkSnpAsDistrProc
Vladsz83 Jun 22, 2024
bdc625e
raw tests
Vladsz83 Jun 24, 2024
01b5d73
Merge branch 'master' into checkSnpAsDistrProc
Vladsz83 Jun 24, 2024
f5c46c0
Merge remote-tracking branch 'my/checkSnpAsDistrProc' into checkSnpAs…
Vladsz83 Jun 24, 2024
c7636c5
stabilized tests alpha
Vladsz83 Jun 25, 2024
87e0740
better tests
Vladsz83 Jun 27, 2024
15cdc22
better tests
Vladsz83 Jun 27, 2024
a908f73
+ raw node stopped tests
Vladsz83 Jul 1, 2024
7154eb8
+ non-baseline tests
Vladsz83 Jul 1, 2024
3940fd2
+ new baseline node test
Vladsz83 Jul 2, 2024
338d0cd
+ tests
Vladsz83 Jul 2, 2024
3dd6431
Merge remote-tracking branch 'my/checkSnpAsDistrProc' into checkSnpAs…
Vladsz83 Jul 2, 2024
88cf1e7
Merge branch 'master' into checkSnpAsDistrProc
Vladsz83 Jul 2, 2024
7e3290d
beta
Vladsz83 Jul 2, 2024
59167e5
test fixes
Vladsz83 Jul 3, 2024
69c9a2e
test fixes
Vladsz83 Jul 3, 2024
6eb067a
test fixes
Vladsz83 Jul 3, 2024
4002bdb
test fixes. manual review
Vladsz83 Jul 3, 2024
a11d453
test fixes. manual review
Vladsz83 Jul 3, 2024
0f2903d
fix
Vladsz83 Jul 4, 2024
68e5e48
fix
Vladsz83 Jul 4, 2024
fe2f7ae
errors processing fix
Vladsz83 Jul 8, 2024
f8575ca
Merge branch 'master' into IGNITE-22662_SnapshotCheckToDistributedPro…
Vladsz83 Jul 8, 2024
feefce9
renaming
Vladsz83 Jul 10, 2024
3b32f1a
renaming
Vladsz83 Jul 10, 2024
53f8f6d
minority
Vladsz83 Jul 10, 2024
27e2463
moved tests
Vladsz83 Jul 10, 2024
01bb0fc
Merge remote-tracking branch 'ignite/IGNITE-22662__snapshot_refactori…
Vladsz83 Jul 10, 2024
aa60631
merged snp checker
Vladsz83 Jul 10, 2024
2b23416
Merge branch 'IGNITE-22662__snapshot_refactoring' into IGNITE-22662_S…
Vladsz83 Jul 10, 2024
e724395
fix
Vladsz83 Jul 10, 2024
df68043
- log
Vladsz83 Jul 11, 2024
1105e3f
clean local request fix
Vladsz83 Jul 11, 2024
fca7aed
minority
Vladsz83 Jul 11, 2024
691de08
review fixes
Vladsz83 Jul 11, 2024
dcbabdc
review fixes
Vladsz83 Jul 15, 2024
c899d8c
exec service to the checker
Vladsz83 Jul 15, 2024
300dcd2
fix
Vladsz83 Jul 15, 2024
8938cd7
review fixes
Vladsz83 Jul 16, 2024
458a463
review fixes
Vladsz83 Jul 16, 2024
ac63542
review fixes
Vladsz83 Jul 16, 2024
0397b24
review fixes
Vladsz83 Jul 16, 2024
bb0db3f
review fixes
Vladsz83 Jul 17, 2024
1a51e85
review fixes
Vladsz83 Jul 18, 2024
d98be51
+assertTrue in the tests on the conditional waitings
Vladsz83 Jul 18, 2024
991eb8b
Merge remote-tracking branch 'my/checkSnpAsDistrProc' into checkSnpAs…
Vladsz83 Jul 28, 2024
fc2eeeb
log fix
Vladsz83 Jul 28, 2024
2dda2e7
Merge branch 'master' into checkSnpAsDistrProc
Vladsz83 Jul 29, 2024
d25e931
review fixes
Vladsz83 Jul 29, 2024
830a8fb
Merge branch 'master' into checkSnpAsDistrProc
Vladsz83 Jul 29, 2024
28d49b6
Merge remote-tracking branch 'ignite/IGNITE-22662__snapshot_refactori…
Vladsz83 Jul 29, 2024
31f9251
interrupt fix
Vladsz83 Jul 29, 2024
6c475b5
- 'clean()'. Better initiatorId / coordinatorId
Vladsz83 Jul 29, 2024
b11def8
test fix
Vladsz83 Jul 30, 2024
c5baee9
refactoring: context
Vladsz83 Jul 31, 2024
8bf2b0d
better phases processing
Vladsz83 Aug 1, 2024
3d4b6e7
better error processing
Vladsz83 Aug 1, 2024
ec08cbb
+ onNodeLeft()
Vladsz83 Aug 1, 2024
526496b
renaming
Vladsz83 Aug 2, 2024
683a494
-initiatorId. - opNodeId
Vladsz83 Aug 2, 2024
553a639
checking metas on every node
Vladsz83 Aug 5, 2024
f36e236
revert changes of SnpOpRequest
Vladsz83 Aug 5, 2024
67f4983
NPE fix, minors
Vladsz83 Aug 5, 2024
d569bbc
review fixes
Vladsz83 Aug 6, 2024
b07f368
includeHandles and snpChecker fix
Vladsz83 Aug 6, 2024
1a5ba42
review fixes
Vladsz83 Aug 6, 2024
bbc4c75
review fixes
Vladsz83 Aug 7, 2024
d6efa75
+ctx cleaning
Vladsz83 Aug 7, 2024
aba4ead
fix
Vladsz83 Aug 9, 2024
f754569
codestyle
Vladsz83 Aug 9, 2024
16b6c2b
review fixes
Vladsz83 Aug 9, 2024
857b9f7
NPE fix
Vladsz83 Aug 9, 2024
a1c41a4
codestyle
Vladsz83 Aug 9, 2024
da9f916
+ locProcFut
Vladsz83 Aug 14, 2024
cdc3e0e
fix
Vladsz83 Aug 15, 2024
744b524
renaming
Vladsz83 Aug 15, 2024
e4962b3
review fixes minors
Vladsz83 Aug 15, 2024
dfc2b46
checkstyle
Vladsz83 Aug 16, 2024
db4aa41
test fixes
Vladsz83 Aug 19, 2024
37defc5
test fix
Vladsz83 Aug 19, 2024
beaa40c
major future hang fix
Vladsz83 Aug 20, 2024
fae23e6
Merge remote-tracking branch 'my/checkSnpAsDistrProc' into checkSnpAs…
Vladsz83 Aug 20, 2024
e281152
checkstyle fix
Vladsz83 Aug 20, 2024
57e2fea
log spam fix
Vladsz83 Aug 21, 2024
8d2069d
review fix. revert cleaning on node leave
Vladsz83 Aug 22, 2024
11f0be3
- isStopping
Vladsz83 Aug 22, 2024
9c1918d
Revert "- isStopping"
Vladsz83 Aug 22, 2024
0db7831
+ sync on stop
Vladsz83 Aug 22, 2024
e0706b1
+ sync on stopping
Vladsz83 Aug 22, 2024
b0db74d
fix
Vladsz83 Aug 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotChecker;
import org.apache.ignite.internal.processors.cache.verify.GridNotIdleException;
import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
import org.apache.ignite.internal.processors.task.GridInternal;
Expand All @@ -63,6 +64,7 @@
import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import static java.util.Collections.emptyMap;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA;
import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.GRID_NOT_IDLE_MSG;
Expand Down Expand Up @@ -143,8 +145,8 @@ public class VerifyBackupPartitionsTaskV2 extends ComputeTaskAdapter<CacheIdleVe
* @return Idle verify job result constructed from results of remote executions.
*/
public static IdleVerifyResultV2 reduce0(List<ComputeJobResult> results) {
Map<PartitionKeyV2, List<PartitionHashRecordV2>> clusterHashes = new HashMap<>();
Map<ClusterNode, Exception> ex = new HashMap<>();
Map<ClusterNode, Map<PartitionKeyV2, PartitionHashRecordV2>> hashes = new HashMap<>();

for (ComputeJobResult res : results) {
if (res.getException() != null) {
Expand All @@ -153,19 +155,10 @@ public static IdleVerifyResultV2 reduce0(List<ComputeJobResult> results) {
continue;
}

Map<PartitionKeyV2, PartitionHashRecordV2> nodeHashes = res.getData();

for (Map.Entry<PartitionKeyV2, PartitionHashRecordV2> e : nodeHashes.entrySet()) {
List<PartitionHashRecordV2> records = clusterHashes.computeIfAbsent(e.getKey(), k -> new ArrayList<>());

records.add(e.getValue());
}
hashes.put(res.getNode(), res.getData());
}

if (results.size() != ex.size())
return new IdleVerifyResultV2(clusterHashes, ex);
else
return new IdleVerifyResultV2(ex);
return SnapshotChecker.reduceHashesResults(hashes, ex);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache.persistence.snapshot;

import java.io.Serializable;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.util.distributed.DistributedProcess;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;

/**
* Snapshot operation start request for {@link DistributedProcess} initiate message.
*/
abstract class AbstractSnapshotOperationRequest implements Serializable {
/** Serial version uid. */
private static final long serialVersionUID = 0L;

/** Request ID. */
@GridToStringInclude
private final UUID reqId;

/** Snapshot name. */
@GridToStringInclude
private final String snpName;

/** Snapshot directory path. */
@GridToStringInclude
private final String snpPath;

/** List of cache group names. */
@GridToStringInclude
private final Collection<String> grps;

/** Start time. */
@GridToStringInclude
private final long startTime;

/** IDs of the nodes that must be alive to complete the operation. */
@GridToStringInclude
private final Set<UUID> nodes;

/**
* @param reqId Request ID.
* @param snpName Snapshot name.
* @param snpPath Snapshot directory path.
* @param grps List of cache group names.
* @param incIdx Incremental snapshot index.
* @param nodes IDs of the nodes that must be alive to complete the operation.
*/
protected AbstractSnapshotOperationRequest(
UUID reqId,
String snpName,
String snpPath,
@Nullable Collection<String> grps,
int incIdx,
Collection<UUID> nodes
) {
this.reqId = reqId;
this.snpName = snpName;
this.grps = grps;
this.snpPath = snpPath;
this.nodes = new HashSet<>(nodes);
this.startTime = System.currentTimeMillis();
}

/** @return Request ID. */
@Nullable public UUID requestId() {
return reqId;
}

/** @return Snapshot name. */
public String snapshotName() {
return snpName;
}

/** @return Snapshot directory path. */
public String snapshotPath() {
return snpPath;
}

/** @return List of cache group names. */
public @Nullable Collection<String> groups() {
return grps;
}

/** @return Start time. */
public long startTime() {
return startTime;
}

/** @return IDs of the nodes that must be alive to complete the operation. */
public Set<UUID> nodes() {
return nodes;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(AbstractSnapshotOperationRequest.class, this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
/** Take snapshot operation procedure. */
private final DistributedProcess<SnapshotOperationRequest, SnapshotOperationResponse> startSnpProc;

/** Snapshot full validation distributed process. */
private final SnapshotCheckProcess checkSnpProc;

/** Check previously performed snapshot operation and delete uncompleted files if we need. */
private final DistributedProcess<SnapshotOperationRequest, SnapshotOperationResponse> endSnpProc;

Expand Down Expand Up @@ -488,6 +491,8 @@ public IgniteSnapshotManager(GridKernalContext ctx) {

restoreCacheGrpProc = new SnapshotRestoreProcess(ctx, locBuff);

checkSnpProc = new SnapshotCheckProcess(ctx);

// Manage remote snapshots.
snpRmtMgr = new SequentialRemoteSnapshotManager();

Expand Down Expand Up @@ -720,8 +725,11 @@ public static String partDeltaFileName(int partId) {
@Override protected void stop0(boolean cancel) {
busyLock.block();

IgniteCheckedException stopErr = new NodeStoppingException("Node is stopping.");

try {
restoreCacheGrpProc.interrupt(new NodeStoppingException("Node is stopping."));
restoreCacheGrpProc.interrupt(stopErr);
checkSnpProc.interrupt(stopErr);

// Try stop all snapshot processing if not yet.
for (AbstractSnapshotFutureTask<?> sctx : locSnpTasks.values())
Expand Down Expand Up @@ -1890,13 +1898,16 @@ public IgniteInternalFuture<SnapshotPartitionsVerifyTaskResult> checkSnapshot(
A.ensure(grps == null || grps.stream().filter(Objects::isNull).collect(Collectors.toSet()).isEmpty(),
"Collection of cache groups names cannot contain null elements.");

GridFutureAdapter<SnapshotPartitionsVerifyTaskResult> res = new GridFutureAdapter<>();

if (log.isInfoEnabled()) {
log.info("The check snapshot procedure started [snpName=" + name + ", snpPath=" + snpPath +
", incIdx=" + incIdx + ", grps=" + grps + ']');
", incIdx=" + incIdx + ", grps=" + grps + ", validateParts=" + check + ']');
}

if (check && incIdx < 1)
return checkSnpProc.start(name, snpPath, grps, includeCustomHandlers);

GridFutureAdapter<SnapshotPartitionsVerifyTaskResult> res = new GridFutureAdapter<>();

GridKernalContext kctx0 = cctx.kernalContext();

Collection<ClusterNode> bltNodes = F.view(cctx.discovery().serverNodes(AffinityTopologyVersion.NONE),
Expand Down Expand Up @@ -2507,18 +2518,23 @@ public static String snapshotMetaFileName(String consId) {
* @param grpName Cache group name.
* @param partId Partition id.
* @param pageStore File page store to iterate over.
* @param cacheSharedCtx Related cache shared context. If {@code null}, a temporary one is created using
* {@link GridCacheSharedContext#builder()}.
* @return Iterator over partition.
* @throws IgniteCheckedException If and error occurs.
*/
public GridCloseableIterator<CacheDataRow> partitionRowIterator(GridKernalContext ctx,
String grpName,
int partId,
FilePageStore pageStore
FilePageStore pageStore,
@Nullable GridCacheSharedContext<?, ?> cacheSharedCtx
) throws IgniteCheckedException {
CacheObjectContext coctx = new CacheObjectContext(ctx, grpName, null, false,
false, false, false, false);

GridCacheSharedContext<?, ?> sctx = GridCacheSharedContext.builder().build(ctx, null);
GridCacheSharedContext<?, ?> sctx = cacheSharedCtx == null
? GridCacheSharedContext.builder().build(ctx, null)
: cacheSharedCtx;

return new DataPageIterator(sctx, coctx, pageStore, partId);
}
Expand Down Expand Up @@ -2571,7 +2587,7 @@ public GridCloseableIterator<CacheDataRow> partitionRowIterator(String snpName,
snpPart::toPath,
val -> {});

GridCloseableIterator<CacheDataRow> partIter = partitionRowIterator(cctx.kernalContext(), grpName, partId, pageStore);
GridCloseableIterator<CacheDataRow> partIter = partitionRowIterator(cctx.kernalContext(), grpName, partId, pageStore, null);

return new GridCloseableIteratorAdapter<CacheDataRow>() {
/** {@inheritDoc} */
Expand Down
Loading