-
Notifications
You must be signed in to change notification settings - Fork 25.3k
Suspend Index throttling when relocating #128797
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Suspend Index throttling when relocating #128797
Conversation
Pinging @elastic/es-distributed-indexing (Team:Distributed Indexing) |
I noticed during testing that throttling gets disabled once a shard is moved. I guess this is because of the way the engine is created for the relocated shard. But I haven't had a chance to dig into the relocation code to verify that this is expected behaviour. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for working on this. Left a number of comments.
indexShard.suspendThrottling(); | ||
waitUntilBlocked(ActionListener.assertOnce(onAcquired), timeout, timeUnit, executor); | ||
// TODO: Does this do anything ? Looks like the relocated shard does not have throttling enabled | ||
indexShard.resumeThrottling(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I would prefer to handle this outside this class, we can make a method in IndexShard
that wraps blockOperations
and does this, avoiding sending an object to this method and the effect on testing etc.
Also, notice that this is sort of incorrect as is in that we sometimes call this with the executor set to the generic thread pool. We should instead resume throttling when the listener is called, that will handle all cases.
@@ -2752,6 +2758,40 @@ public void deactivateThrottling() { | |||
} | |||
} | |||
|
|||
public boolean isIndexingPaused() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This does not seem necessary to expose outside IndexShard
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was calling it from RelocationIT. I can remove it. Just for my understanding, why is it risky to expose this ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed in the latest upload
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It exposes internal state from the engine. As such it is not "risky", but exposing more than necessary breaks encapsulation. In particular this one is only there for testing and can be fetched just as easily without this. The IndexShard interface is huge and I'd like to keep the surface it has down.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for explaining Henning
} else { | ||
indexingPaused = engine.isIndexingPaused(); | ||
} | ||
if (indexingPaused) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer to just suspend throttling regardless of pausing or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I agree that is safer because what if throttling activation sneaks in concurrently with the "acquire permits" operation and an indexing thread gets paused, then we might not be able to acquire permits.
return (indexingPaused); | ||
} | ||
|
||
public boolean suspendThrottling() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do not seem to need the return value. I think we should remove it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, removed.
logger.info("--> index more docs so we have something in the translog"); | ||
for (int i = 10; i < 20; i++) { | ||
prepareIndex("test").setId(Integer.toString(i)).setSource("field", "value" + i).get(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not folllow why this is important to the test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not. I wrote this test by modifying testRelocationWhileIndexingRandom()
so it's just a carry over from there. Removed it.
assertHitCount(prepareSearch("test").setSize(0), 20); | ||
|
||
logger.info("--> relocate the shard from node1 to node2"); | ||
ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand("test", 0, node_1, node_2)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer to set an allocation rule through index settings. Someting like index.routing.allocation.include._id = node_2
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure I follow this comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or maybe I do. Like this ?
updateIndexSettings(Settings.builder().put("index.routing.allocation.include._id", node_2), "test");
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you want me to change this everywhere in this file ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure how to make that work. I tried this:
updateIndexSettings(Settings.builder().put("index.routing.allocation.include._id", nodes[toNode]), "test");
ensureGreen(ACCEPTABLE_RELOCATION_TIME, "test");
But it looks like this is not enough to ensure that the shard has moved to the target node.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You need to use ._name
if you use node_2
, like done here (though that one excludes, you can do that too - or use include, both should work).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you!
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); | ||
|
||
// Relocated shard is not throttled | ||
assertThat(shard.isIndexingPaused(), equalTo(false)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems surprising, why is it not throttled?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I initially thought it might be because the node that we relocate the shard to does not have PAUSE_THROTTLING enabled. But that doesn't help either. So I am guessing it has to do with how we do the relocation, wouldn't we have to recreate the engine on the new node and it probably will not transfer throttling ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But wait, this is the original source shard we are talking about, not the relocated target shard, so it should have throttling enabled after we resume throttling. I will need to look into this a bit more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I figured it out, it's because the engine is null for the source shard. I will just get rid of this check, I don't think it is useful.
|
||
// Relocated shard is not throttled | ||
assertThat(shard.isIndexingPaused(), equalTo(false)); | ||
logger.info("--> verifying count after relocation ..."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to wait for future.actionGet()
before we can assert the below?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well I figured that if the relocation had gone ahead, the indexing request that future is responsible for must have completed already. But I can add it anyways.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sort of. The problem is that we do not (safely) wait for the thread to be blocked on the lock. So in principle, the indexing request may not have blocked relocation at all and only be done on the target node. Hence it may not have completed yet.
String[] nodes = new String[numberOfNodes]; | ||
logger.info("--> starting [node1] ..."); | ||
nodes[0] = internalCluster().startNode(); | ||
nodes[0] = internalCluster().startNode(Settings.builder().put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), true)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we do this randomly? No need to not verify this for the regular throttling too.
nodes[0] = internalCluster().startNode(Settings.builder().put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), true)); | |
nodes[0] = internalCluster().startNode(Settings.builder().put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), randomBoolean())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure good idea. Took me a minute to parse "No need to not verify" 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for my double negated danglish 🙂
IndexShard shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0); | ||
shard.activateThrottling(); | ||
// Verify that indexing is paused for the throttled shard | ||
assertBusy(() -> { assertThat(shard.isIndexingPaused(), equalTo(true)); }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This may not work with the random enable above, but I am also not sure it is important.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup I changed it to check for throttling which will cover both cases.
…exingForPermits Refresh
…exingForPermits Refresh branch
…exingForPermits Refresh branch
…exingForPermits Refresh branch
…exingForPermits Refresh
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left more comments.
IndexShard shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0); | ||
shard.activateThrottling(); | ||
// Verify that indexing is paused for the throttled shard | ||
assertThat(shard.isIndexingPaused(), equalTo(true)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am unsure what value this check brings? Could:
assertThat(shard.isIndexingPaused(), equalTo(true)); | |
assertThat(shard.getEngine().isThrottled(), equalTo(true)); |
suffice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure.
logger.info("--> Try to index a doc while indexing is paused"); | ||
IndexRequestBuilder indexRequestBuilder = prepareIndex("test").setId(Integer.toString(20)).setSource("field", "value" + 20); | ||
var future = indexRequestBuilder.execute(); | ||
expectThrows(ElasticsearchException.class, () -> future.actionGet(10, TimeUnit.SECONDS)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think 10s is too long here, perhaps 500ms? Otherwise IIUC, this test always takes minimum 10s?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it takes 10s right now. Will change it to 500 ms
|
||
// Relocated shard is not throttled | ||
assertThat(shard.isIndexingPaused(), equalTo(false)); | ||
logger.info("--> verifying count after relocation ..."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sort of. The problem is that we do not (safely) wait for the thread to be blocked on the lock. So in principle, the indexing request may not have blocked relocation at all and only be done on the target node. Hence it may not have completed yet.
assertHitCount(prepareSearch("test").setSize(0), 20); | ||
|
||
logger.info("--> relocate the shard from node1 to node2"); | ||
ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand("test", 0, node_1, node_2)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You need to use ._name
if you use node_2
, like done here (though that one excludes, you can do that too - or use include, both should work).
prepareCreate("test", indexSettings(1, 0)).get(); | ||
|
||
logger.info("--> index 10 docs"); | ||
for (int i = 0; i < 10; i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let us randomize the doc count to signal that 10 is not a special number:
for (int i = 0; i < 10; i++) { | |
int numDocs = between(1,10); | |
for (int i = 0; i < numDocs; i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
*/ | ||
public abstract void resumeThrottling(); | ||
|
||
public abstract boolean isIndexingPaused(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe my comment above could remove the last usage of this - in that case we should remove the method too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
return (indexingPaused); | ||
} | ||
|
||
public void suspendThrottling() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public void suspendThrottling() { | |
private void suspendThrottling() { |
} | ||
} | ||
|
||
public void resumeThrottling() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public void resumeThrottling() { | |
private void resumeThrottling() { |
@@ -90,7 +90,7 @@ public void blockOperations( | |||
waitUntilBlocked(ActionListener.assertOnce(onAcquired), timeout, timeUnit, executor); | |||
} | |||
|
|||
private void waitUntilBlocked(ActionListener<Releasable> onAcquired, long timeout, TimeUnit timeUnit, Executor executor) { | |||
protected void waitUntilBlocked(ActionListener<Releasable> onAcquired, long timeout, TimeUnit timeUnit, Executor executor) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd like to keep these methods private.
indexShardOperationPermits.delayOperations(); | ||
// In case indexing is paused on the shard, suspend throttling so that any currently paused task can | ||
// go ahead and release the indexing permit it holds. | ||
suspendThrottling(); | ||
indexShardOperationPermits.waitUntilBlocked(ActionListener.assertOnce(onAcquired), timeout, timeUnit, executor); | ||
// TODO: Does this do anything ? Looks like the relocated shard does not have throttling enabled | ||
resumeThrottling(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can just use blockOperations
instead, rather than expose the underlying private methods.
Also, we should resume the throttle in the action listener instead.
indexShardOperationPermits.delayOperations(); | |
// In case indexing is paused on the shard, suspend throttling so that any currently paused task can | |
// go ahead and release the indexing permit it holds. | |
suspendThrottling(); | |
indexShardOperationPermits.waitUntilBlocked(ActionListener.assertOnce(onAcquired), timeout, timeUnit, executor); | |
// TODO: Does this do anything ? Looks like the relocated shard does not have throttling enabled | |
resumeThrottling(); | |
// In case indexing is paused on the shard, suspend throttling so that any currently paused task can | |
// go ahead and release the indexing permit it holds. | |
suspendThrottling(); | |
try { | |
indexShardOperationPermits.blockOperations(ActionListener.runAfter(onAcquired, this::resumeThrottling), timeout, timeUnit, executor); | |
} catch (IndexShardClosedException e) { | |
resumeThrottling(); | |
throw e; | |
} |
We can change the exception throw in delayOperations
to use the listener instead in a follow-up (I worry it might have side-effects, so best to tackle separately). Happy to leave with the catch too though not too pretty.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah ok, you want to suspendThrottling() before we start delaying operations even. That should be fine too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, or rather I do not want to expose the difference from the permits class.
…exingForPermits Refresh branch
…uma/elasticsearch into 05192025/UnpauseIndexingForPermits pull
…exingForPermits refresh branch
…uma/elasticsearch into 05192025/UnpauseIndexingForPermits Refresh branch
…exingForPermits refresh branch
Addresses ES-11770.
If index throttling is enabled such that it pauses all indexing threads that try to index into a shard, this can starve other tasks such as relocation that try to acquire all indexing permits. This PR addresses this by suspending throttling to allow the indexing threads that are holding the permits to pass.