Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixChangeFeedHangWhenUsingStaleContainerRid #43729

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ private[cosmos] object SparkBridgeInternal {
private[cosmos] def clearCollectionCache(container: CosmosAsyncContainer, obsoleteRid: String): Unit = {
val clientWrapper = container.getDatabase.getDocClientWrapper

val link = container.getLinkWithoutTrailingSlash;
val link = CosmosAsyncContainer.getLinkWithoutTrailingSlash(container.getLink)

val obsoleteValue = new DocumentCollection
obsoleteValue.setResourceId(obsoleteRid)
Expand All @@ -84,7 +84,7 @@ private[cosmos] object SparkBridgeInternal {
.getDatabase
.getDocClientWrapper
.getCollectionCache
.resolveByNameAsync(null, container.getLinkWithoutTrailingSlash, null)
.resolveByNameAsync(null, CosmosAsyncContainer.getLinkWithoutTrailingSlash(container.getLink), null)
.block()

if (documentCollectionHolder != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package com.azure.cosmos;

import com.azure.cosmos.implementation.DocumentCollection;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.RetryAnalyzer;
import com.azure.cosmos.implementation.Utils;
Expand All @@ -18,6 +19,7 @@
import com.azure.cosmos.implementation.guava25.collect.Multimap;
import com.azure.cosmos.implementation.query.CompositeContinuationToken;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.implementation.throughputControl.TestItem;
import com.azure.cosmos.models.ChangeFeedPolicy;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.CosmosContainerProperties;
Expand Down Expand Up @@ -147,6 +149,15 @@ public static Object[][] changeFeedSplitHandlingDataProvider() {
};
}

@DataProvider(name = "changeFeedWithStaleContainerRidDataProvider")
public static Object[][] changeFeedWithStaleContainerRidDataProvider() {
return new Object[][]{
// re-created container RU, multi-partition container
{ 400, false },
{ 10100, true }
};
}

@Factory(dataProvider = "simpleClientBuildersWithDirect")
public CosmosContainerChangeFeedTest(CosmosClientBuilder clientBuilder) {
super(clientBuilder);
Expand Down Expand Up @@ -1029,6 +1040,161 @@ public void changeFeedQueryCompleteAfterAvailableNow(
}
}

@Test(groups = { "emulator" }, dataProvider = "changeFeedWithStaleContainerRidDataProvider", timeOut = 4 * TIMEOUT)
public void changeFeedQueryWithStaleCollectionRidInContinuationToken(
int throughput,
boolean isMultiPartitionContainer
) throws InterruptedException {
// this test is to validate when using stale container rid in the continuationToken, query change feed will return BadRequestException and does not hang
String testContainerId = UUID.randomUUID().toString();

try {
CosmosContainerProperties containerProperties = new CosmosContainerProperties(testContainerId, "/mypk");
CosmosAsyncContainer testContainer =
createCollection(
this.createdAsyncDatabase,
containerProperties,
new CosmosContainerRequestOptions(),
400);

String testContainerRid = testContainer.read().block().getProperties().getResourceId();

// create items
for (int i = 0; i < 10; i++) {
testContainer.createItem(TestItem.createNewItem()).block();
}

// using query changeFeed
logger.info("Doing initial changeFeed query on the container " + testContainerId);
AtomicReference<String> continuationToken = new AtomicReference<>();
CosmosChangeFeedRequestOptions changeFeedRequestOptions =
CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(FeedRange.forFullRange());
testContainer.queryChangeFeed(changeFeedRequestOptions, TestItem.class)
.byPage()
.doOnNext(response -> {
continuationToken.set(response.getContinuationToken());
})
.blockLast();

logger.info("Delete the container");
testContainer.delete().block();

Thread.sleep(Duration.ofSeconds(5).toMillis());
logger.info("Re-create the container");
testContainer =
createCollection(
this.createdAsyncDatabase,
containerProperties,
new CosmosContainerRequestOptions(),
throughput);
Thread.sleep(Duration.ofSeconds(5).toMillis());
List<FeedRange> feedRanges = testContainer.getFeedRanges().block();
if (isMultiPartitionContainer) {
assertThat(feedRanges.size()).isGreaterThan(1);
} else {
assertThat(feedRanges.size()).isEqualTo(1);
}

String reCreatedContainerRid = testContainer.read().block().getProperties().getResourceId();
assertThat(testContainerRid).isNotEqualTo(reCreatedContainerRid);

logger.info("Using continuation token with incorrect containerRid");
changeFeedRequestOptions = CosmosChangeFeedRequestOptions.createForProcessingFromContinuation(continuationToken.get());
try {
testContainer.queryChangeFeed(changeFeedRequestOptions, TestItem.class)
.byPage()
.blockLast();
fail("ChangeFeed query request should fail when using incorrect collectionRid in the continuation token");
} catch (CosmosException e) {
assertThat(e.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.BADREQUEST);
assertThat(e.getSubStatusCode()).isEqualTo(HttpConstants.SubStatusCodes.INCORRECT_CONTAINER_RID_SUB_STATUS);
}
} finally {
safeDeleteCollection(this.createdAsyncDatabase.getContainer(testContainerId));
}
}

@Test(groups = { "emulator" }, dataProvider = "changeFeedWithStaleContainerRidDataProvider", timeOut = 4 * TIMEOUT)
public void changeFeedQueryWithCorrectContainerRidWithStaledClient(
int throughput,
boolean isMultiPartitionContainer
) throws InterruptedException {
// this test is to validate when container re-created, using correct continuation token on client with stale cache, the request will succeed
String testContainerId = UUID.randomUUID().toString();
CosmosAsyncClient newClient = null;

try {
CosmosContainerProperties containerProperties = new CosmosContainerProperties(testContainerId, "/mypk");
CosmosAsyncContainer testContainer =
createCollection(
this.createdAsyncDatabase,
containerProperties,
new CosmosContainerRequestOptions(),
400);

String testContainerRid = testContainer.read().block().getProperties().getResourceId();

// create items
for (int i = 0; i < 10; i++) {
testContainer.createItem(TestItem.createNewItem()).block();
}

// using query changeFeed
logger.info("Doing initial changeFeed query on the container " + testContainerId);
CosmosChangeFeedRequestOptions changeFeedRequestOptions =
CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(FeedRange.forFullRange());
testContainer.queryChangeFeed(changeFeedRequestOptions, TestItem.class)
.byPage()
.blockLast();

logger.info("Delete the container");
testContainer.delete().block();

Thread.sleep(Duration.ofSeconds(5).toMillis());
logger.info("Re-create the container through a different client");
newClient = this.getClientBuilder().buildAsyncClient();
CosmosAsyncDatabase databaseWithNewClient = newClient.getDatabase(this.createdAsyncDatabase.getId());
CosmosAsyncContainer testContainerWithNewClient =
createCollection(
databaseWithNewClient,
containerProperties,
new CosmosContainerRequestOptions(),
throughput);
Thread.sleep(Duration.ofSeconds(5).toMillis());
List<FeedRange> feedRanges = testContainerWithNewClient.getFeedRanges().block();
if (isMultiPartitionContainer) {
assertThat(feedRanges.size()).isGreaterThan(1);
} else {
assertThat(feedRanges.size()).isEqualTo(1);
}

String reCreatedContainerRid = testContainerWithNewClient.read().block().getProperties().getResourceId();
assertThat(testContainerRid).isNotEqualTo(reCreatedContainerRid);

logger.info("Creating items in the re-created container");
for (int i = 0; i < 10; i++) {
testContainerWithNewClient.createItem(TestItem.createNewItem()).block();
}
logger.info("query changeFeed from re-created container");
AtomicReference<String> continuationToken = new AtomicReference<>();
testContainerWithNewClient
.queryChangeFeed(changeFeedRequestOptions, TestItem.class)
.byPage()
.doOnNext(response -> continuationToken.set(response.getContinuationToken()))
.blockLast();

logger.info("Using the continuation token from the re-created container on previous stale client");
changeFeedRequestOptions = CosmosChangeFeedRequestOptions.createForProcessingFromContinuation(continuationToken.get());
testContainer
.queryChangeFeed(changeFeedRequestOptions, TestItem.class)
.byPage()
.blockLast();
} finally {
safeDeleteCollection(this.createdAsyncDatabase.getContainer(testContainerId));
safeClose(newClient);
}
}

void insertDocuments(
int partitionCount,
int documentCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1775,7 +1775,7 @@ public void readYouWriteWithNoExplicitRegionSwitching(
shouldBloomFilterBeAccessed,
new PartitionKey(possiblePartitionKeyWhichSawRequestsInSecondPreferredRegion),
expectedCosmosContainerProperties.getPartitionKeyDefinition(),
resolvedContainer.getLinkWithoutTrailingSlash(),
CosmosAsyncContainer.getLinkWithoutTrailingSlash(resolvedContainer.getLink()),
normalizedSecondPreferredRegion,
documentClient
);
Expand Down Expand Up @@ -1845,7 +1845,7 @@ public void readYouWriteWithExplicitRegionSwitching(
shouldBloomFilterBeAccessed,
new PartitionKey(possiblePartitionKeyWhichSawRequestsInSecondPreferredRegion),
expectedCosmosContainerProperties.getPartitionKeyDefinition(),
resolvedContainer.getLinkWithoutTrailingSlash(),
CosmosAsyncContainer.getLinkWithoutTrailingSlash(resolvedContainer.getLink()),
normalizedSecondPreferredRegion,
documentClient
);
Expand Down Expand Up @@ -1904,7 +1904,7 @@ public void readManyWithNoExplicitRegionSwitching(
shouldBloomFilterBeAccessed,
new PartitionKey(possiblePartitionKeyWhichSawRequestsInSecondPreferredRegion),
expectedCosmosContainerProperties.getPartitionKeyDefinition(),
resolvedContainer.getLinkWithoutTrailingSlash(),
CosmosAsyncContainer.getLinkWithoutTrailingSlash(resolvedContainer.getLink()),
normalizedSecondPreferredRegion,
documentClient
);
Expand Down Expand Up @@ -1958,7 +1958,7 @@ public void readManyWithExplicitRegionSwitching(
shouldBloomFilterBeAccessed,
new PartitionKey(possiblePartitionKeyWhichSawRequestsInSecondPreferredRegion),
expectedCosmosContainerProperties.getPartitionKeyDefinition(),
resolvedContainer.getLinkWithoutTrailingSlash(),
CosmosAsyncContainer.getLinkWithoutTrailingSlash(resolvedContainer.getLink()),
normalizedSecondPreferredRegion,
documentClient
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.azure.cosmos.implementation.changefeed.epkversion;

import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.PartitionKeyRangeIsSplittingException;
import com.azure.cosmos.implementation.changefeed.CancellationTokenSource;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
Expand All @@ -20,16 +21,20 @@
import com.azure.cosmos.implementation.feedranges.FeedRangeContinuation;
import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyRangeImpl;
import com.azure.cosmos.models.ChangeFeedProcessorItem;
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
import org.mockito.Mockito;
import org.testng.annotations.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.util.UUID;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

public class PartitionProcessorImplTests {
private static final ImplementationBridgeHelpers.CosmosAsyncContainerHelper.CosmosAsyncContainerAccessor containerAccessor =
ImplementationBridgeHelpers.CosmosAsyncContainerHelper.getCosmosAsyncContainerAccessor();

@Test
public void partitionSplitHappenOnFirstRequest() {
Expand All @@ -42,21 +47,30 @@ public void partitionSplitHappenOnFirstRequest() {

ChangeFeedState changeFeedState = this.getChangeFeedStateWithContinuationToken();
CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class);
Mockito
.when(containerAccessor.extractCollectionRid(containerMock))
.thenReturn(Mono.just(changeFeedState.getContainerRid()));
ChangeFeedProcessorOptions requestOptions = new ChangeFeedProcessorOptions()
.setStartContinuation(changeFeedState.toString()).setMaxItemCount(10);
ProcessorSettings processorSettings = new ProcessorSettings(changeFeedState, containerMock);
processorSettings.withMaxItemCount(10);

Lease leaseMock = Mockito.mock(ServiceItemLeaseV1.class);
Mockito
.when(leaseMock.getContinuationToken())
.thenReturn(changeFeedState.getContinuation().getCurrentContinuationToken().getToken());

Mockito
.when(leaseMock.getContinuationState(changeFeedState.getContainerRid(), changeFeedState.getMode()))
.thenReturn(changeFeedState);
LeaseCheckpointer leaseCheckpointerMock = Mockito.mock(LeaseCheckpointer.class);
PartitionCheckpointer partitionCheckpointer = new PartitionCheckpointerImpl(leaseCheckpointerMock, leaseMock);

PartitionProcessorImpl<ChangeFeedProcessorItem> partitionProcessor = new PartitionProcessorImpl<>(
observerMock,
changeFeedContextClientMock,
processorSettings,
containerMock,
requestOptions,
changeFeedState.getContainerRid(),
partitionCheckpointer,
leaseMock,
ChangeFeedProcessorItem.class,
Expand Down
Loading
Loading