Skip to content

Commit

Permalink
Ensure schema is consistent while bootstrapping (#583)
Browse files Browse the repository at this point in the history
  • Loading branch information
rhuffy authored Dec 12, 2024
1 parent 71d8357 commit 52c815f
Show file tree
Hide file tree
Showing 4 changed files with 265 additions and 24 deletions.
71 changes: 71 additions & 0 deletions src/java/com/palantir/cassandra/utils/SchemaAgreementCheck.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.cassandra.utils;

import java.net.InetAddress;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Supplier;

import com.google.common.annotations.VisibleForTesting;

import org.apache.cassandra.config.Schema;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;

/**
 * Checks whether the schema version advertised by every relevant endpoint matches the
 * local schema version.
 *
 * <p>Both the local version and the endpoint states are obtained through suppliers so the
 * check can be exercised without live {@link Schema} / {@link Gossiper} singletons.
 */
public class SchemaAgreementCheck
{
    private final Supplier<UUID> localSchemaVersionSupplier;
    private final Supplier<Set<Map.Entry<InetAddress, EndpointState>>> endpointStatesSupplier;

    /**
     * Creates a check that reads the local version from {@code Schema.instance} and the
     * endpoint states from {@code Gossiper.instance}.
     */
    public SchemaAgreementCheck()
    {
        this(Schema.instance::getVersion, Gossiper.instance::getEndpointStates);
    }

    /**
     * Creates a check backed by the given suppliers.
     *
     * @param localSchemaVersionSupplier source of the local schema version
     * @param endpointStatesSupplier     source of the (address, endpoint state) entries to inspect
     */
    @VisibleForTesting
    SchemaAgreementCheck(Supplier<UUID> localSchemaVersionSupplier, Supplier<Set<Map.Entry<InetAddress, EndpointState>>> endpointStatesSupplier)
    {
        this.localSchemaVersionSupplier = localSchemaVersionSupplier;
        this.endpointStatesSupplier = endpointStatesSupplier;
    }

    /**
     * Reports whether all endpoints agree with the local schema version.
     *
     * <p>Endpoints whose status equals {@link VersionedValue#STATUS_LEFT} are ignored. Any
     * other endpoint must carry a {@link ApplicationState#SCHEMA} value equal to the local
     * version; an endpoint without that value counts as disagreement. When there are no
     * endpoints left to inspect the result is {@code true}.
     *
     * @return {@code true} if no inspected endpoint disagrees with the local schema version
     */
    public boolean isSchemaInAgreement()
    {
        // Read the local version once so every endpoint is compared against the same value.
        UUID localVersion = localSchemaVersionSupplier.get();

        for (Map.Entry<InetAddress, EndpointState> entry : endpointStatesSupplier.get())
        {
            EndpointState peerState = entry.getValue();

            // Endpoints that have left the ring do not take part in the agreement.
            if (VersionedValue.STATUS_LEFT.equals(peerState.getStatus()))
            {
                continue;
            }

            // Stop at the first disagreeing endpoint.
            if (!advertisesVersion(localVersion, peerState))
            {
                return false;
            }
        }
        return true;
    }

    /**
     * Tells whether the endpoint's SCHEMA application state is present and equal to the
     * given local version.
     */
    private boolean advertisesVersion(UUID localVersion, EndpointState peerState)
    {
        VersionedValue advertised = peerState.getApplicationState(ApplicationState.SCHEMA);
        if (advertised == null)
        {
            return false;
        }
        UUID peerVersion = UUID.fromString(advertised.value);
        return localVersion.equals(peerVersion);
    }
}
51 changes: 27 additions & 24 deletions src/java/org/apache/cassandra/service/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.google.common.util.concurrent.*;
import com.palantir.cassandra.db.BootstrappingSafetyException;
import com.palantir.cassandra.settings.LocalQuorumReadForSerialCasSetting;
import com.palantir.cassandra.utils.SchemaAgreementCheck;
import com.palantir.logsafe.Safe;
import com.palantir.logsafe.SafeArg;
import org.apache.cassandra.schema.LegacySchemaTables;
Expand Down Expand Up @@ -933,11 +934,7 @@ private void joinTokenRing(int delay, boolean autoBootstrap, Collection<String>

if (!noPreviousDataFound)
{
recordNonTransientError(NonTransientError.BOOTSTRAP_ERROR,
ImmutableMap.of("previousDataFound", "true"));
unsafeDisableNode();
// leave node in non-transient error state and prevent it from bootstrapping into the cluster
throw new BootstrappingSafetyException("Detected data from previous bootstrap, failing.");
recordBootstrapErrorAndThrow("previousDataFound");
}

if (SystemKeyspace.bootstrapInProgress())
Expand All @@ -950,21 +947,17 @@ private void joinTokenRing(int delay, boolean autoBootstrap, Collection<String>
}
setMode(Mode.JOINING, "waiting for ring information", true);
// first sleep the delay to make sure we see all our peers
for (int i = 0; i < delay; i += 1000)
{
// if we see schema, we can proceed to the next check directly
if (!Schema.instance.getVersion().equals(Schema.emptyVersion))
{
logger.debug("got schema: {}", Schema.instance.getVersion());
break;
}
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
}
Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);

// if our schema hasn't matched yet, keep sleeping until it does
// (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful)
while (!MigrationManager.isReadyForBootstrap())
SchemaAgreementCheck schemaAgreementCheck = new SchemaAgreementCheck();
while (!MigrationManager.isReadyForBootstrap() || !schemaAgreementCheck.isSchemaInAgreement())
{
setMode(Mode.JOINING, "waiting for schema information to complete", true);
logger.info(
"Local schema version {} is not consistent with peers, waiting for schema to become consistent",
SafeArg.of("localSchemaVersion", Schema.instance.getVersion().toString()));
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
}
setMode(Mode.JOINING, "schema complete, ready to bootstrap", true);
Expand Down Expand Up @@ -1045,9 +1038,7 @@ private void joinTokenRing(int delay, boolean autoBootstrap, Collection<String>
dataAvailable = bootstrap(bootstrapTokens);
if (!dataAvailable)
{
recordNonTransientError(NonTransientError.BOOTSTRAP_ERROR, ImmutableMap.of("streamingFailed", "true"));
unsafeDisableNode();
throw new BootstrappingSafetyException("Bootstrap streaming failed.");
recordBootstrapErrorAndThrow("streamingFailed");
}
logger.info("Bootstrap streaming complete. Waiting to finish bootstrap. Not becoming an active ring " +
"member. Use JMX (StorageService->finishBootstrap()) to finalize ring joining.");
Expand All @@ -1057,11 +1048,8 @@ private void joinTokenRing(int delay, boolean autoBootstrap, Collection<String>
boolean timeoutExceeded = !finishBootstrapCondition.await(BOOTSTRAP_SAFETY_CHECK_GRACE_PERIOD_MINUTES, MINUTES);
if (timeoutExceeded)
{
recordNonTransientError(NonTransientError.BOOTSTRAP_ERROR, ImmutableMap.of("bootstrapSafetyCheckFailed", "true"));
unsafeDisableNode();
String message = "Finish bootstrap was not called within 30 minutes. Bootstrap safety check failed.";
logger.error(message);
throw new BootstrappingSafetyException(message);
logger.error("Finish bootstrap was not called within 30 minutes. Bootstrap safety check failed.");
recordBootstrapErrorAndThrow("bootstrapSafetyCheckFailed");
}
}
catch (InterruptedException e)
Expand Down Expand Up @@ -1597,7 +1585,15 @@ private boolean bootstrap(final Collection<Token> tokens)
BootStrapper bootstrapper = new BootStrapper(FBUtilities.getBroadcastAddress(), tokens, tokenMetadata);
bootstrapper.addProgressListener(progressSupport);
bootstrapListeners.forEach(bootstrapper::addProgressListener);

final UUID initialLocalSchemaVersion = Schema.instance.getVersion();

ListenableFuture<StreamState> bootstrapStream = bootstrapper.bootstrap(streamStateStore, !replacing && useStrictConsistency); // handles token update

if (!MigrationManager.isReadyForBootstrap() || !initialLocalSchemaVersion.equals(Schema.instance.getVersion()))
{
recordBootstrapErrorAndThrow("schemaChangeWhilePreparingStreams");
}
try
{
bootstrapStream.get();
Expand Down Expand Up @@ -1708,6 +1704,13 @@ public Set<Map<String, String>> getNonTransientErrors() {
return ImmutableSet.copyOf(nonTransientErrors);
}

/**
 * Records a {@code BOOTSTRAP_ERROR} non-transient error tagged with the given reason,
 * disables the node, and aborts by throwing.
 *
 * <p>This method never returns normally: the node is left in the non-transient error
 * state and the exception prevents it from bootstrapping into the cluster.
 *
 * @param reason key stored (with value {@code "true"}) in the error attributes; also used
 *               as the exception message
 * @throws BootstrappingSafetyException always
 */
private void recordBootstrapErrorAndThrow(@Safe String reason) throws BootstrappingSafetyException
{
    Map<String, String> errorAttributes = ImmutableMap.of(reason, "true");
    recordNonTransientError(NonTransientError.BOOTSTRAP_ERROR, errorAttributes);
    unsafeDisableNode();
    throw new BootstrappingSafetyException(reason);
}

public void recordNonTransientError(NonTransientError nonTransientError, Map<String, String> attributes) {
setMode(Mode.NON_TRANSIENT_ERROR, String.format("Non transient error of type %s", nonTransientError.toString()), true);
NonTransientErrorMetrics.instance.record(nonTransientError);
Expand Down
140 changes: 140 additions & 0 deletions test/unit/com/palantir/cassandra/utils/SchemaAgreementCheckTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.cassandra.utils;

import java.util.Collections;
import java.util.List;
import java.util.UUID;

import com.google.common.collect.ImmutableMap;
import com.google.common.net.InetAddresses;
import org.junit.Test;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.EndpointStateFactory;
import org.apache.cassandra.gms.VersionedValue;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Unit tests for {@link SchemaAgreementCheck}. Each test feeds the check a fixed local
 * schema version and three endpoint states (127.0.0.1 - 127.0.0.3) through suppliers.
 */
public class SchemaAgreementCheckTest
{
    private static final VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner());

    /** Every endpoint is in normal status and advertises the local version. */
    @Test
    public void checkSchemaAgreement_passes()
    {
        UUID localVersion = UUID.randomUUID();
        EndpointState agreeingPeer = createNormal(localVersion);

        SchemaAgreementCheck check = threeEndpointCheck(localVersion, agreeingPeer, agreeingPeer, agreeingPeer);

        assertThat(check.isSchemaInAgreement()).isTrue();
    }

    /** One endpoint has status and tokens but no SCHEMA application state. */
    @Test
    public void checkSchemaAgreement_failsOnNullSchema()
    {
        UUID localVersion = UUID.randomUUID();
        EndpointState agreeingPeer = createNormal(localVersion);

        List<Token> tokens = randomTokens();
        EndpointState peerWithoutSchema = EndpointStateFactory.create();
        peerWithoutSchema.addApplicationState(ApplicationState.STATUS, valueFactory.normal(tokens));
        peerWithoutSchema.addApplicationState(ApplicationState.TOKENS, valueFactory.tokens(tokens));

        SchemaAgreementCheck check = threeEndpointCheck(localVersion, agreeingPeer, agreeingPeer, peerWithoutSchema);

        assertThat(check.isSchemaInAgreement()).isFalse();
    }

    /** One endpoint in normal status advertises a different schema version. */
    @Test
    public void checkSchemaAgreement_failsOnIncorrectSchema()
    {
        UUID localVersion = UUID.randomUUID();
        EndpointState agreeingPeer = createNormal(localVersion);
        EndpointState disagreeingPeer = createNormal(UUID.randomUUID());

        SchemaAgreementCheck check = threeEndpointCheck(localVersion, agreeingPeer, agreeingPeer, disagreeingPeer);

        assertThat(check.isSchemaInAgreement()).isFalse();
    }

    /** An endpoint in left status is skipped even though its schema version differs. */
    @Test
    public void checkSchemaAgreement_ignoresLeftNode()
    {
        UUID localVersion = UUID.randomUUID();
        UUID otherVersion = UUID.randomUUID();
        EndpointState agreeingPeer = createNormal(localVersion);
        EndpointState departedPeer = createLeft(otherVersion);

        SchemaAgreementCheck check = threeEndpointCheck(localVersion, agreeingPeer, agreeingPeer, departedPeer);

        assertThat(check.isSchemaInAgreement()).isTrue();
    }

    /** An endpoint with no STATUS application state is still compared, and here it disagrees. */
    @Test
    public void checkSchemaAgreement_doesNotIgnoreNullStatus()
    {
        UUID localVersion = UUID.randomUUID();
        UUID otherVersion = UUID.randomUUID();
        EndpointState agreeingPeer = createNormal(localVersion);

        List<Token> tokens = randomTokens();
        EndpointState peerWithoutStatus = EndpointStateFactory.create();
        peerWithoutStatus.addApplicationState(ApplicationState.SCHEMA, valueFactory.schema(otherVersion));
        peerWithoutStatus.addApplicationState(ApplicationState.TOKENS, valueFactory.tokens(tokens));

        SchemaAgreementCheck check = threeEndpointCheck(localVersion, agreeingPeer, agreeingPeer, peerWithoutStatus);

        assertThat(check.isSchemaInAgreement()).isFalse();
    }

    /**
     * Builds a check whose local version is {@code localVersion} and whose endpoint states
     * are the three given states keyed by 127.0.0.1, 127.0.0.2 and 127.0.0.3 respectively.
     */
    private static SchemaAgreementCheck threeEndpointCheck(UUID localVersion, EndpointState first, EndpointState second, EndpointState third)
    {
        return new SchemaAgreementCheck(() -> localVersion,
                                        () -> ImmutableMap.of(InetAddresses.forString("127.0.0.1"), first,
                                                              InetAddresses.forString("127.0.0.2"), second,
                                                              InetAddresses.forString("127.0.0.3"), third).entrySet());
    }

    /** Returns a single-element list holding one random token from the configured partitioner. */
    private static List<Token> randomTokens()
    {
        return Collections.singletonList(DatabaseDescriptor.getPartitioner().getRandomToken());
    }

    /** Creates an endpoint state with normal status, the given schema version and random tokens. */
    private static EndpointState createNormal(UUID schema)
    {
        List<Token> tokens = randomTokens();
        return endpointWith(valueFactory.normal(tokens), schema, tokens);
    }

    /** Creates an endpoint state with left status, the given schema version and random tokens. */
    private static EndpointState createLeft(UUID schema)
    {
        List<Token> tokens = randomTokens();
        return endpointWith(valueFactory.left(tokens, 1000), schema, tokens);
    }

    /** Assembles an endpoint state carrying STATUS, SCHEMA and TOKENS application states. */
    private static EndpointState endpointWith(VersionedValue status, UUID schema, List<Token> tokens)
    {
        EndpointState endpoint = EndpointStateFactory.create();
        endpoint.addApplicationState(ApplicationState.STATUS, status);
        endpoint.addApplicationState(ApplicationState.SCHEMA, valueFactory.schema(schema));
        endpoint.addApplicationState(ApplicationState.TOKENS, valueFactory.tokens(tokens));
        return endpoint;
    }
}
27 changes: 27 additions & 0 deletions test/unit/org/apache/cassandra/gms/EndpointStateFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.gms;

/**
 * Test helper that builds {@link EndpointState} instances for unit tests.
 *
 * <p>NOTE(review): presumably lives in this package because the {@code EndpointState}
 * constructor is not accessible from other packages - confirm.
 */
public class EndpointStateFactory
{
    /**
     * Creates a fresh endpoint state backed by a {@code HeartBeatState} constructed with 0.
     *
     * @return a new {@link EndpointState}
     */
    public static EndpointState create()
    {
        HeartBeatState initialHeartBeat = new HeartBeatState(0);
        return new EndpointState(initialHeartBeat);
    }
}

0 comments on commit 52c815f

Please sign in to comment.