Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Add an Action to Setting Index Priority (#241)
Browse files Browse the repository at this point in the history
* add action for setting index priority

* resolve PR comments

* unit test for unhappy path of set index priority action

* bump version of ism-config schema to 3 from 2
  • Loading branch information
bowenlan-amzn authored Jun 25, 2020
1 parent b4ab545 commit c011253
Show file tree
Hide file tree
Showing 11 changed files with 399 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexstatemanagement.action

import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ActionConfig.ActionType
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.IndexPriorityActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.indexpriority.AttemptSetIndexPriorityStep
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService

class IndexPriorityAction(
clusterService: ClusterService,
client: Client,
managedIndexMetaData: ManagedIndexMetaData,
config: IndexPriorityActionConfig
) : Action(ActionType.INDEX_PRIORITY, config, managedIndexMetaData) {

private val attemptSetIndexPriorityStep = AttemptSetIndexPriorityStep(clusterService, client, config, managedIndexMetaData)
private val steps = listOf(attemptSetIndexPriorityStep)

override fun getSteps(): List<Step> = steps

override fun getStepToExecute(): Step = attemptSetIndexPriorityStep
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ abstract class ActionConfig(
REPLICA_COUNT("replica_count"),
FORCE_MERGE("force_merge"),
NOTIFICATION("notification"),
SNAPSHOT("snapshot");
SNAPSHOT("snapshot"),
INDEX_PRIORITY("index_priority");

override fun toString(): String {
return type
Expand Down Expand Up @@ -96,6 +97,7 @@ abstract class ActionConfig(
ActionType.FORCE_MERGE.type -> actionConfig = ForceMergeActionConfig.parse(xcp, index)
ActionType.NOTIFICATION.type -> actionConfig = NotificationActionConfig.parse(xcp, index)
ActionType.SNAPSHOT.type -> actionConfig = SnapshotActionConfig.parse(xcp, index)
ActionType.INDEX_PRIORITY.type -> actionConfig = IndexPriorityActionConfig.parse(xcp, index)
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in Action.")
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action

import com.amazon.opendistroforelasticsearch.indexstatemanagement.action.Action
import com.amazon.opendistroforelasticsearch.indexstatemanagement.action.IndexPriorityAction
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.xcontent.ToXContent
import org.elasticsearch.common.xcontent.ToXContentObject
import org.elasticsearch.common.xcontent.XContentBuilder
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentParser.Token
import org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.elasticsearch.script.ScriptService
import java.io.IOException

data class IndexPriorityActionConfig(
val indexPriority: Int,
val index: Int
) : ToXContentObject, ActionConfig(ActionType.INDEX_PRIORITY, index) {

init {
require(indexPriority >= 0) { "IndexPriorityActionConfig index_priority value must be a non-negative number" }
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
super.toXContent(builder, params).startObject(ActionType.INDEX_PRIORITY.type)
builder.field(INDEX_PRIORITY_FIELD, indexPriority)
return builder.endObject().endObject()
}

override fun isFragment(): Boolean = super<ToXContentObject>.isFragment()

override fun toAction(
clusterService: ClusterService,
scriptService: ScriptService,
client: Client,
managedIndexMetaData: ManagedIndexMetaData
): Action = IndexPriorityAction(clusterService, client, managedIndexMetaData, this)

companion object {
const val INDEX_PRIORITY_FIELD = "priority"

@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser, index: Int): IndexPriorityActionConfig {
var indexPriority: Int? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
INDEX_PRIORITY_FIELD -> indexPriority = xcp.intValue()
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in IndexPriorityActionConfig.")
}
}

return IndexPriorityActionConfig(
indexPriority = requireNotNull(indexPriority) { "$INDEX_PRIORITY_FIELD is null" },
index = index
)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexstatemanagement.step.indexpriority

import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.IndexPriorityActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.settings.Settings

class AttemptSetIndexPriorityStep(
val clusterService: ClusterService,
val client: Client,
val config: IndexPriorityActionConfig,
managedIndexMetaData: ManagedIndexMetaData
) : Step("attempt_set_index_priority", managedIndexMetaData) {

private val logger = LogManager.getLogger(javaClass)
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

override fun isIdempotent() = true

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
val indexPriority = config.indexPriority
try {
logger.info("Executing $name on ${managedIndexMetaData.index}")
val updateSettingsRequest = UpdateSettingsRequest()
.indices(managedIndexMetaData.index)
.settings(Settings.builder().put("index.priority", indexPriority))
val response: AcknowledgedResponse = client.admin().indices()
.suspendUntil { updateSettings(updateSettingsRequest, it) }

if (response.isAcknowledged) {
logger.info("Successfully executed $name on ${managedIndexMetaData.index}")
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to "Successfully set index priority to $indexPriority")
} else {
stepStatus = StepStatus.FAILED
info = mapOf("message" to "Failed to set index priority to $indexPriority")
}
} catch (e: Exception) {
logger.error("Failed to set index priority [index=${managedIndexMetaData.index}]", e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to "Failed to set index priority to $indexPriority")
val errorMessage = e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
info = mutableInfo.toMap()
}
}

override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData {
return currentMetaData.copy(
stepMetaData = StepMetaData(name, getStepStartTime().toEpochMilli(), stepStatus),
transitionTo = null,
info = info
)
}
}
9 changes: 8 additions & 1 deletion src/main/resources/mappings/opendistro-ism-config.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"_meta" : {
"schema_version": 2
"schema_version": 3
},
"dynamic": "strict",
"properties": {
Expand Down Expand Up @@ -175,6 +175,13 @@
}
}
},
"index_priority": {
"properties": {
"priority": {
"type": "integer"
}
}
},
"close": {
"type": "object"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ class IndexStateManagementIndicesIT : IndexStateManagementRestTestCase() {
val policyId = ESTestCase.randomAlphaOfLength(10)
client().makeRequest("PUT", "$POLICY_BASE_URI/$policyId", emptyMap(), policy.toHttpEntity())
assertIndexExists(INDEX_STATE_MANAGEMENT_INDEX)
verifyIndexSchemaVersion(INDEX_STATE_MANAGEMENT_INDEX, 2)
verifyIndexSchemaVersion(INDEX_STATE_MANAGEMENT_INDEX, 3)
}

fun `test update management index mapping with new schema version`() {
assertIndexDoesNotExist(INDEX_STATE_MANAGEMENT_INDEX)

val mapping = indexStateManagementMappings.trimStart('{').trimEnd('}')
.replace("\"schema_version\": 2", "\"schema_version\": 0")
.replace("\"schema_version\": 3", "\"schema_version\": 0")

createIndex(INDEX_STATE_MANAGEMENT_INDEX, Settings.builder().put("index.hidden", true).build(), mapping)
assertIndexExists(INDEX_STATE_MANAGEMENT_INDEX)
Expand All @@ -43,7 +43,7 @@ class IndexStateManagementIndicesIT : IndexStateManagementRestTestCase() {
client().makeRequest("PUT", "$POLICY_BASE_URI/$policyId", emptyMap(), policy.toHttpEntity())

assertIndexExists(INDEX_STATE_MANAGEMENT_INDEX)
verifyIndexSchemaVersion(INDEX_STATE_MANAGEMENT_INDEX, 2)
verifyIndexSchemaVersion(INDEX_STATE_MANAGEMENT_INDEX, 3)
}

fun `test changing policy on an index that hasn't initialized yet check schema version`() {
Expand All @@ -58,7 +58,7 @@ class IndexStateManagementIndicesIT : IndexStateManagementRestTestCase() {
assertEquals("Policy id does not match", policy.id, managedIndexConfig.policyID)

val mapping = "{" + indexStateManagementMappings.trimStart('{').trimEnd('}')
.replace("\"schema_version\": 2", "\"schema_version\": 0")
.replace("\"schema_version\": 3", "\"schema_version\": 0")

val entity = StringEntity(mapping, ContentType.APPLICATION_JSON)
client().makeRequest(RestRequest.Method.PUT.toString(),
Expand All @@ -72,7 +72,7 @@ class IndexStateManagementIndicesIT : IndexStateManagementRestTestCase() {
RestRequest.Method.POST.toString(),
"${RestChangePolicyAction.CHANGE_POLICY_BASE_URI}/$index", emptyMap(), changePolicy.toHttpEntity())

verifyIndexSchemaVersion(INDEX_STATE_MANAGEMENT_INDEX, 2)
verifyIndexSchemaVersion(INDEX_STATE_MANAGEMENT_INDEX, 3)

assertAffectedIndicesResponseIsEqual(mapOf(FAILURES to false, FAILED_INDICES to emptyList<Any>(), UPDATED_INDICES to 1), response.asMap())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,12 @@ abstract class IndexStateManagementRestTestCase : ESRestTestCase() {
return (indexSettings[indexName]!!["settings"]!!["index.number_of_replicas"] as String).toInt()
}

@Suppress("UNCHECKED_CAST")
protected fun getIndexPrioritySetting(indexName: String): Int {
val indexSettings = getIndexSettings(indexName) as Map<String, Map<String, Map<String, Any?>>>
return (indexSettings[indexName]!!["settings"]!!["index.priority"] as String).toInt()
}

@Suppress("UNCHECKED_CAST")
protected fun getUuid(indexName: String): String {
val indexSettings = getIndexSettings(indexName) as Map<String, Map<String, Map<String, Any?>>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.N
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ReadOnlyActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ReadWriteActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ReplicaCountActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.IndexPriorityActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.RolloverActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.SnapshotActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.coordinator.ClusterStateManagedIndexConfig
Expand Down Expand Up @@ -145,6 +146,10 @@ fun randomReplicaCountActionConfig(numOfReplicas: Int = ESRestTestCase.randomInt
return ReplicaCountActionConfig(index = 0, numOfReplicas = numOfReplicas)
}

fun randomIndexPriorityActionConfig(indexPriority: Int = ESRestTestCase.randomIntBetween(0, 100)): IndexPriorityActionConfig {
return IndexPriorityActionConfig(index = 0, indexPriority = indexPriority)
}

fun randomForceMergeActionConfig(
maxNumSegments: Int = ESRestTestCase.randomIntBetween(1, 50)
): ForceMergeActionConfig {
Expand Down Expand Up @@ -345,6 +350,11 @@ fun ReplicaCountActionConfig.toJsonString(): String {
return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string()
}

fun IndexPriorityActionConfig.toJsonString(): String {
val builder = XContentFactory.jsonBuilder()
return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string()
}

fun ForceMergeActionConfig.toJsonString(): String {
val builder = XContentFactory.jsonBuilder()
return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexstatemanagement.action

import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementRestTestCase
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Policy
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.State
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.IndexPriorityActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomErrorNotification
import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.Locale

class IndexPriorityActionIT : IndexStateManagementRestTestCase() {

private val testIndexName = javaClass.simpleName.toLowerCase(Locale.ROOT)

fun `test basic index priority`() {
val indexName = "${testIndexName}_index_1"
val policyID = "${testIndexName}_testPolicyName_1"
val actionConfig = IndexPriorityActionConfig(50, 0)
val states = listOf(State(name = "SetPriorityState", actions = listOf(actionConfig), transitions = listOf()))
val policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)

createPolicy(policy, policyID)

createIndex(indexName, policyID)

val managedIndexConfig = getExistingManagedIndexConfig(indexName)
// Change the runJob start time so the job will trigger in 2 seconds
updateManagedIndexConfigStartTime(managedIndexConfig)

// ism policy initialized
waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }

// change the runJob start time to change index priority
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertEquals("Index did not set index_priority to ${actionConfig.indexPriority}", actionConfig.indexPriority, getIndexPrioritySetting(indexName)) }
}
}
Loading

0 comments on commit c011253

Please sign in to comment.