Skip to content

Commit

Permalink
IGNITE-22662 : Snapshot check as distributed process (#11391)
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladsz83 authored Aug 22, 2024
1 parent 325640d commit 8ef9bcf
Show file tree
Hide file tree
Showing 14 changed files with 1,649 additions and 271 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotChecker;
import org.apache.ignite.internal.processors.cache.verify.GridNotIdleException;
import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
import org.apache.ignite.internal.processors.task.GridInternal;
Expand All @@ -63,6 +64,7 @@
import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import static java.util.Collections.emptyMap;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA;
import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.GRID_NOT_IDLE_MSG;
Expand Down Expand Up @@ -143,8 +145,8 @@ public class VerifyBackupPartitionsTaskV2 extends ComputeTaskAdapter<CacheIdleVe
* @return Idle verify job result constructed from results of remote executions.
*/
public static IdleVerifyResultV2 reduce0(List<ComputeJobResult> results) {
Map<PartitionKeyV2, List<PartitionHashRecordV2>> clusterHashes = new HashMap<>();
Map<ClusterNode, Exception> ex = new HashMap<>();
Map<ClusterNode, Map<PartitionKeyV2, PartitionHashRecordV2>> hashes = new HashMap<>();

for (ComputeJobResult res : results) {
if (res.getException() != null) {
Expand All @@ -153,19 +155,10 @@ public static IdleVerifyResultV2 reduce0(List<ComputeJobResult> results) {
continue;
}

Map<PartitionKeyV2, PartitionHashRecordV2> nodeHashes = res.getData();

for (Map.Entry<PartitionKeyV2, PartitionHashRecordV2> e : nodeHashes.entrySet()) {
List<PartitionHashRecordV2> records = clusterHashes.computeIfAbsent(e.getKey(), k -> new ArrayList<>());

records.add(e.getValue());
}
hashes.put(res.getNode(), res.getData());
}

if (results.size() != ex.size())
return new IdleVerifyResultV2(clusterHashes, ex);
else
return new IdleVerifyResultV2(ex);
return SnapshotChecker.reduceHashesResults(hashes, ex);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache.persistence.snapshot;

import java.io.Serializable;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.util.distributed.DistributedProcess;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;

/**
 * Base class holding the parameters shared by snapshot operation requests. Instances are used as the start
 * request of a {@link DistributedProcess} initiate message.
 * <p>
 * Field names and their declaration order are kept stable on purpose: the class is {@link Serializable} and
 * {@link #toString()} is produced from the annotated fields.
 */
abstract class AbstractSnapshotOperationRequest implements Serializable {
    /** Serial version uid. */
    private static final long serialVersionUID = 0L;

    /** Unique ID of this request. */
    @GridToStringInclude
    private final UUID reqId;

    /** Name of the snapshot the operation works with. */
    @GridToStringInclude
    private final String snpName;

    /** Path to the snapshot directory. */
    @GridToStringInclude
    private final String snpPath;

    /** Names of the cache groups, may be {@code null}. */
    @GridToStringInclude
    private final Collection<String> grps;

    /** Creation time of this request in milliseconds, taken from {@link System#currentTimeMillis()}. */
    @GridToStringInclude
    private final long startTime;

    /** IDs of the nodes that must stay alive for the operation to complete. */
    @GridToStringInclude
    private final Set<UUID> nodes;

    /**
     * Creates a request and stamps it with the current time.
     *
     * @param reqId Request ID.
     * @param snpName Snapshot name.
     * @param snpPath Snapshot directory path.
     * @param grps List of cache group names. Stored by reference, not copied.
     * @param incIdx Incremental snapshot index. Not stored by this class.
     *      NOTE(review): presumably kept by the subclasses that need it - confirm.
     * @param nodes IDs of the nodes that must be alive to complete the operation. Copied into a new set.
     */
    protected AbstractSnapshotOperationRequest(
        UUID reqId,
        String snpName,
        String snpPath,
        @Nullable Collection<String> grps,
        int incIdx,
        Collection<UUID> nodes
    ) {
        // Plain assignments, in the order of the parameters.
        this.reqId = reqId;
        this.snpName = snpName;
        this.snpPath = snpPath;
        this.grps = grps;

        // The request keeps its own copy of the node IDs.
        Set<UUID> nodeIds = new HashSet<>(nodes);

        this.nodes = nodeIds;

        // The timestamp is taken last, once all the arguments have been accepted.
        startTime = System.currentTimeMillis();
    }

    /**
     * @return Request ID.
     */
    public @Nullable UUID requestId() {
        return reqId;
    }

    /**
     * @return Snapshot name.
     */
    public String snapshotName() {
        return snpName;
    }

    /**
     * @return Snapshot directory path.
     */
    public String snapshotPath() {
        return snpPath;
    }

    /**
     * @return List of cache group names, or {@code null} if none was passed to the constructor.
     */
    @Nullable public Collection<String> groups() {
        return grps;
    }

    /**
     * @return Start time: the value of {@link System#currentTimeMillis()} at the moment the request was created.
     */
    public long startTime() {
        return startTime;
    }

    /**
     * @return IDs of the nodes that must be alive to complete the operation. This is the internal set itself,
     *      not a copy.
     */
    public Set<UUID> nodes() {
        return nodes;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(AbstractSnapshotOperationRequest.class, this);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
/** Take snapshot operation procedure. */
private final DistributedProcess<SnapshotOperationRequest, SnapshotOperationResponse> startSnpProc;

/** Snapshot full validation distributed process. */
private final SnapshotCheckProcess checkSnpProc;

/** Check previously performed snapshot operation and delete uncompleted files if needed. */
private final DistributedProcess<SnapshotOperationRequest, SnapshotOperationResponse> endSnpProc;

Expand Down Expand Up @@ -488,6 +491,8 @@ public IgniteSnapshotManager(GridKernalContext ctx) {

restoreCacheGrpProc = new SnapshotRestoreProcess(ctx, locBuff);

checkSnpProc = new SnapshotCheckProcess(ctx);

// Manage remote snapshots.
snpRmtMgr = new SequentialRemoteSnapshotManager();

Expand Down Expand Up @@ -720,8 +725,11 @@ public static String partDeltaFileName(int partId) {
@Override protected void stop0(boolean cancel) {
busyLock.block();

IgniteCheckedException stopErr = new NodeStoppingException("Node is stopping.");

try {
restoreCacheGrpProc.interrupt(new NodeStoppingException("Node is stopping."));
restoreCacheGrpProc.interrupt(stopErr);
checkSnpProc.interrupt(stopErr);

// Try stop all snapshot processing if not yet.
for (AbstractSnapshotFutureTask<?> sctx : locSnpTasks.values())
Expand Down Expand Up @@ -1890,13 +1898,16 @@ public IgniteInternalFuture<SnapshotPartitionsVerifyTaskResult> checkSnapshot(
A.ensure(grps == null || grps.stream().filter(Objects::isNull).collect(Collectors.toSet()).isEmpty(),
"Collection of cache groups names cannot contain null elements.");

GridFutureAdapter<SnapshotPartitionsVerifyTaskResult> res = new GridFutureAdapter<>();

if (log.isInfoEnabled()) {
log.info("The check snapshot procedure started [snpName=" + name + ", snpPath=" + snpPath +
", incIdx=" + incIdx + ", grps=" + grps + ']');
", incIdx=" + incIdx + ", grps=" + grps + ", validateParts=" + check + ']');
}

if (check && incIdx < 1)
return checkSnpProc.start(name, snpPath, grps, includeCustomHandlers);

GridFutureAdapter<SnapshotPartitionsVerifyTaskResult> res = new GridFutureAdapter<>();

GridKernalContext kctx0 = cctx.kernalContext();

Collection<ClusterNode> bltNodes = F.view(cctx.discovery().serverNodes(AffinityTopologyVersion.NONE),
Expand Down Expand Up @@ -2507,20 +2518,20 @@ public static String snapshotMetaFileName(String consId) {
* @param grpName Cache group name.
* @param partId Partition id.
* @param pageStore File page store to iterate over.
* @param cacheSharedCtx Related cache shared context.
* @return Iterator over partition.
* @throws IgniteCheckedException If an error occurs.
*/
public GridCloseableIterator<CacheDataRow> partitionRowIterator(GridKernalContext ctx,
String grpName,
int partId,
FilePageStore pageStore
FilePageStore pageStore,
GridCacheSharedContext<?, ?> cacheSharedCtx
) throws IgniteCheckedException {
CacheObjectContext coctx = new CacheObjectContext(ctx, grpName, null, false,
false, false, false, false);

GridCacheSharedContext<?, ?> sctx = GridCacheSharedContext.builder().build(ctx, null);

return new DataPageIterator(sctx, coctx, pageStore, partId);
return new DataPageIterator(cacheSharedCtx, coctx, pageStore, partId);
}

/**
Expand Down Expand Up @@ -2571,7 +2582,8 @@ public GridCloseableIterator<CacheDataRow> partitionRowIterator(String snpName,
snpPart::toPath,
val -> {});

GridCloseableIterator<CacheDataRow> partIter = partitionRowIterator(cctx.kernalContext(), grpName, partId, pageStore);
GridCloseableIterator<CacheDataRow> partIter = partitionRowIterator(cctx.kernalContext(), grpName, partId, pageStore,
GridCacheSharedContext.builder().build(cctx.kernalContext(), null));

return new GridCloseableIteratorAdapter<CacheDataRow>() {
/** {@inheritDoc} */
Expand Down
Loading

0 comments on commit 8ef9bcf

Please sign in to comment.