diff --git a/keiko-core/keiko-core.gradle b/keiko-core/keiko-core.gradle index 26f289e847..b04f500ba6 100644 --- a/keiko-core/keiko-core.gradle +++ b/keiko-core/keiko-core.gradle @@ -7,7 +7,7 @@ dependencies { api "org.springframework:spring-context" implementation "com.netflix.spectator:spectator-api" - implementation "javax.annotation:javax.annotation-api" + implementation "jakarta.annotation:jakarta.annotation-api" testImplementation project(":keiko-test-common") } diff --git a/keiko-core/src/main/kotlin/com/netflix/spinnaker/q/QueueProcessor.kt b/keiko-core/src/main/kotlin/com/netflix/spinnaker/q/QueueProcessor.kt index e05f271983..93932f5b21 100644 --- a/keiko-core/src/main/kotlin/com/netflix/spinnaker/q/QueueProcessor.kt +++ b/keiko-core/src/main/kotlin/com/netflix/spinnaker/q/QueueProcessor.kt @@ -24,7 +24,7 @@ import com.netflix.spinnaker.q.metrics.NoHandlerCapacity import java.time.Duration import java.util.Random import java.util.concurrent.RejectedExecutionException -import javax.annotation.PostConstruct +import jakarta.annotation.PostConstruct import org.slf4j.Logger import org.slf4j.LoggerFactory.getLogger import org.springframework.scheduling.annotation.Scheduled diff --git a/keiko-redis-spring/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueConfiguration.kt b/keiko-redis-spring/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueConfiguration.kt index 4ce3c6838e..51a0770d03 100644 --- a/keiko-redis-spring/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueConfiguration.kt +++ b/keiko-redis-spring/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueConfiguration.kt @@ -37,6 +37,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration +import redis.clients.jedis.Connection import redis.clients.jedis.HostAndPort import redis.clients.jedis.Jedis import redis.clients.jedis.JedisCluster @@ -138,7 +139,7 @@ class RedisQueueConfiguration { @Value("\${redis.connection:redis://localhost:6379}") connection: String, @Value("\${redis.timeout:2000}") timeout: Int, @Value("\${redis.maxattempts:4}") maxAttempts: Int, - redisPoolConfig: GenericObjectPoolConfig + redisPoolConfig: GenericObjectPoolConfig ): JedisCluster { URI.create(connection).let { cx -> val port = if (cx.port == -1) Protocol.DEFAULT_PORT else cx.port diff --git a/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/AbstractRedisQueue.kt b/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/AbstractRedisQueue.kt index a0783d8a94..45775e30ac 100644 --- a/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/AbstractRedisQueue.kt +++ b/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/AbstractRedisQueue.kt @@ -17,7 +17,6 @@ import java.util.Optional import org.slf4j.Logger import redis.clients.jedis.Jedis import redis.clients.jedis.Transaction -import redis.clients.jedis.commands.JedisClusterCommands import redis.clients.jedis.commands.JedisCommands abstract class AbstractRedisQueue( @@ -79,27 +78,15 @@ abstract class AbstractRedisQueue( internal fun JedisCommands.hgetInt(key: String, field: String, default: Int = 0) = hget(key, field)?.toInt() ?: default - internal fun JedisClusterCommands.hgetInt(key: String, field: String, default: Int = 0) = - hget(key, field)?.toInt() ?: default - internal fun JedisCommands.zismember(key: String, member: String) = zrank(key, member) != null - internal fun JedisClusterCommands.zismember(key: String, member: String) = - zrank(key, member) != null - internal fun JedisCommands.anyZismember(key: String, members: Set) = members.any { zismember(key, it) } - internal fun JedisClusterCommands.anyZismember(key: String, members: Set) = - members.any { zismember(key, it) } - internal fun JedisCommands.firstFingerprint(key: String, fingerprint: Fingerprint) = fingerprint.all.firstOrNull { zismember(key, it) } - internal fun JedisClusterCommands.firstFingerprint(key: String, fingerprint: Fingerprint) = - fingerprint.all.firstOrNull { zismember(key, it) } - @Deprecated("Hashes the attributes property, which is mutable") internal fun Message.hashV1() = Hashing diff --git a/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisClusterQueue.kt b/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisClusterQueue.kt index bc0212195b..f3b3be4af8 100644 --- a/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisClusterQueue.kt +++ b/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisClusterQueue.kt @@ -360,11 +360,14 @@ class RedisClusterQueue( fun JedisCluster.multi(block: Transaction.() -> Unit) = getConnectionFromSlot(JedisClusterCRC16.getSlot(queueKey)) .use { c -> - c.multi() - .let { tx -> - tx.block() - tx.exec() - } + // Send MULTI command to start a transaction + c.sendCommand(redis.clients.jedis.Protocol.Command.MULTI) + // Wrap the connection in a Transaction object + val tx = Transaction(c) + // Execute the block on the Transaction + tx.block() + // Commit the transaction + tx.exec() } private fun ackMessage(fingerprint: String) { diff --git a/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisQueue.kt b/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisQueue.kt index 273c181f7d..81824bd0f9 100644 --- a/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisQueue.kt +++ b/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisQueue.kt @@ -51,7 +51,7 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.scheduling.annotation.Scheduled import redis.clients.jedis.Jedis -import redis.clients.jedis.commands.ScriptingCommands +import redis.clients.jedis.commands.ScriptingKeyCommands import redis.clients.jedis.exceptions.JedisDataException import redis.clients.jedis.params.ZAddParams.zAddParams import redis.clients.jedis.util.Pool @@ -341,7 +341,7 @@ class RedisQueue( } } - internal fun ScriptingCommands.readMessageWithLock(): Triple? { + internal fun ScriptingKeyCommands.readMessageWithLock(): Triple? { try { val response = evalsha( readMessageWithLockScriptSha, diff --git a/keiko-sql/keiko-sql.gradle b/keiko-sql/keiko-sql.gradle index 1be7b119b1..5565698903 100644 --- a/keiko-sql/keiko-sql.gradle +++ b/keiko-sql/keiko-sql.gradle @@ -13,7 +13,8 @@ dependencies { implementation "io.spinnaker.kork:kork-sql" implementation "de.huxhorn.sulky:de.huxhorn.sulky.ulid" implementation "io.github.resilience4j:resilience4j-retry" - implementation "javax.validation:validation-api" + implementation "io.github.resilience4j:resilience4j-vavr" + implementation "jakarta.validation:jakarta.validation-api" implementation "org.jooq:jooq" testImplementation project(":keiko-tck") diff --git a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlQueueProperties.kt b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlQueueProperties.kt index 1117f23cab..317b4036fb 100644 --- a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlQueueProperties.kt +++ b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlQueueProperties.kt @@ -3,8 +3,8 @@ package com.netflix.spinnaker.config import com.netflix.spinnaker.kork.sql.config.RetryProperties import com.netflix.spinnaker.kork.sql.config.SqlRetryProperties import java.time.Duration -import javax.validation.constraints.Pattern -import javax.validation.constraints.Positive +import jakarta.validation.constraints.Pattern +import jakarta.validation.constraints.Positive import org.springframework.boot.context.properties.ConfigurationProperties import org.springframework.validation.annotation.Validated diff --git a/orca-clouddriver/orca-clouddriver.gradle b/orca-clouddriver/orca-clouddriver.gradle index 8413e261f0..9decc6736a 100644 --- a/orca-clouddriver/orca-clouddriver.gradle +++ b/orca-clouddriver/orca-clouddriver.gradle @@ -34,7 +34,7 @@ dependencies { implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-yaml") implementation("io.kubernetes:client-java") implementation("com.amazonaws:aws-java-sdk-lambda") - implementation("javax.validation:validation-api") + implementation("jakarta.validation:jakarta.validation-api") implementation("org.jetbrains:annotations") implementation("com.fasterxml.jackson.datatype:jackson-datatype-guava") diff --git a/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/pipeline/image/DeleteImageStage.java b/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/pipeline/image/DeleteImageStage.java index f32524b121..91ba5fbb6c 100644 --- a/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/pipeline/image/DeleteImageStage.java +++ b/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/pipeline/image/DeleteImageStage.java @@ -20,9 +20,9 @@ import com.netflix.spinnaker.orca.api.pipeline.graph.TaskNode; import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution; import com.netflix.spinnaker.orca.clouddriver.tasks.image.DeleteImageTask; +import jakarta.validation.constraints.NotNull; import java.util.Set; import javax.annotation.Nonnull; -import javax.validation.constraints.NotNull; import org.springframework.stereotype.Component; @Component diff --git a/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/pipeline/job/UpdateJobProcessesStage.java b/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/pipeline/job/UpdateJobProcessesStage.java index 520bd9d303..140eddbc38 100644 --- a/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/pipeline/job/UpdateJobProcessesStage.java +++ b/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/pipeline/job/UpdateJobProcessesStage.java @@ -22,8 +22,8 @@ import com.netflix.spinnaker.orca.clouddriver.tasks.MonitorKatoTask; import com.netflix.spinnaker.orca.clouddriver.tasks.job.UpdateJobProcessesTask; import com.netflix.spinnaker.orca.clouddriver.tasks.servergroup.ServerGroupCacheForceRefreshTask; +import jakarta.validation.constraints.NotNull; import javax.annotation.Nonnull; -import javax.validation.constraints.NotNull; import org.springframework.stereotype.Component; @Component diff --git a/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/pipeline/launchconfigurations/DeleteLaunchConfigurationStage.java b/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/pipeline/launchconfigurations/DeleteLaunchConfigurationStage.java index 5bd584fcdd..e341132007 100644 --- a/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/pipeline/launchconfigurations/DeleteLaunchConfigurationStage.java +++ b/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/pipeline/launchconfigurations/DeleteLaunchConfigurationStage.java @@ -21,9 +21,9 @@ import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution; import com.netflix.spinnaker.orca.clouddriver.tasks.MonitorKatoTask; import com.netflix.spinnaker.orca.clouddriver.tasks.launchconfigurations.DeleteLaunchConfigurationTask; +import jakarta.validation.constraints.NotNull; import java.util.Set; import javax.annotation.Nonnull; -import javax.validation.constraints.NotNull; import lombok.Data; import org.springframework.stereotype.Component; diff --git a/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/pipeline/launchtemplates/DeleteLaunchTemplateStage.java b/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/pipeline/launchtemplates/DeleteLaunchTemplateStage.java index f5c80ca9bf..a1e59f40ef 100644 --- a/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/pipeline/launchtemplates/DeleteLaunchTemplateStage.java +++ b/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/pipeline/launchtemplates/DeleteLaunchTemplateStage.java @@ -21,9 +21,9 @@ import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution; import com.netflix.spinnaker.orca.clouddriver.tasks.MonitorKatoTask; import com.netflix.spinnaker.orca.clouddriver.tasks.launchtemplates.DeleteLaunchTemplateTask; +import jakarta.validation.constraints.NotNull; import java.util.Set; import javax.annotation.Nonnull; -import javax.validation.constraints.NotNull; import lombok.Data; import org.springframework.stereotype.Component; diff --git a/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/pipeline/servergroup/UpsertDisruptionBudgetStage.java b/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/pipeline/servergroup/UpsertDisruptionBudgetStage.java index 9cb088fb54..77bf742a39 100644 --- a/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/pipeline/servergroup/UpsertDisruptionBudgetStage.java +++ b/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/pipeline/servergroup/UpsertDisruptionBudgetStage.java @@ -22,8 +22,8 @@ import com.netflix.spinnaker.orca.clouddriver.tasks.MonitorKatoTask; import com.netflix.spinnaker.orca.clouddriver.tasks.servergroup.ServerGroupCacheForceRefreshTask; import com.netflix.spinnaker.orca.clouddriver.tasks.servergroup.UpsertDisruptionBudgetTask; +import jakarta.validation.constraints.NotNull; import javax.annotation.Nonnull; -import javax.validation.constraints.NotNull; import org.springframework.stereotype.Component; @Component diff --git a/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/pipeline/servergroup/strategies/lambda/LambdaTrafficUpdateStrategyInjector.java b/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/pipeline/servergroup/strategies/lambda/LambdaTrafficUpdateStrategyInjector.java index d4797ba5f1..d03bf5e590 100644 --- a/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/pipeline/servergroup/strategies/lambda/LambdaTrafficUpdateStrategyInjector.java +++ b/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/pipeline/servergroup/strategies/lambda/LambdaTrafficUpdateStrategyInjector.java @@ -16,9 +16,9 @@ package com.netflix.spinnaker.orca.clouddriver.pipeline.servergroup.strategies.lambda; +import jakarta.annotation.PostConstruct; import java.util.HashMap; import java.util.Map; -import javax.annotation.PostConstruct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; diff --git a/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/pipeline/snapshot/DeleteSnapshotStage.java b/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/pipeline/snapshot/DeleteSnapshotStage.java index 72be1d1ada..bc0555aece 100644 --- a/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/pipeline/snapshot/DeleteSnapshotStage.java +++ b/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/pipeline/snapshot/DeleteSnapshotStage.java @@ -21,9 +21,9 @@ import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution; import com.netflix.spinnaker.orca.clouddriver.tasks.MonitorKatoTask; import com.netflix.spinnaker.orca.clouddriver.tasks.snapshot.DeleteSnapshotTask; +import jakarta.validation.constraints.NotNull; import java.util.Set; import javax.annotation.Nonnull; -import javax.validation.constraints.NotNull; import org.springframework.stereotype.Component; @Component diff --git a/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/tasks/image/DeleteImageTask.java b/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/tasks/image/DeleteImageTask.java index 96f760463f..ded215d278 100644 --- a/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/tasks/image/DeleteImageTask.java +++ b/orca-clouddriver/src/main/java/com/netflix/spinnaker/orca/clouddriver/tasks/image/DeleteImageTask.java @@ -26,12 +26,12 @@ import com.netflix.spinnaker.orca.clouddriver.model.TaskId; import com.netflix.spinnaker.orca.clouddriver.pipeline.image.DeleteImageStage; import com.netflix.spinnaker.orca.clouddriver.utils.CloudProviderAware; +import jakarta.validation.ConstraintViolation; +import jakarta.validation.Validation; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.annotation.Nonnull; -import javax.validation.ConstraintViolation; -import javax.validation.Validation; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; diff --git a/orca-clouddriver/src/test/java/com/netflix/spinnaker/orca/clouddriver/pipeline/providers/aws/lambda/LambdaTrafficRoutingStageTest.java b/orca-clouddriver/src/test/java/com/netflix/spinnaker/orca/clouddriver/pipeline/providers/aws/lambda/LambdaTrafficRoutingStageTest.java index 8f8c6bdff8..67fec8cf60 100644 --- a/orca-clouddriver/src/test/java/com/netflix/spinnaker/orca/clouddriver/pipeline/providers/aws/lambda/LambdaTrafficRoutingStageTest.java +++ b/orca-clouddriver/src/test/java/com/netflix/spinnaker/orca/clouddriver/pipeline/providers/aws/lambda/LambdaTrafficRoutingStageTest.java @@ -18,6 +18,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf; import com.fasterxml.jackson.databind.ObjectMapper; import com.netflix.spinnaker.orca.StageResolver; @@ -82,6 +83,7 @@ public void LambdaTrafficRoutingStageIntegrationTest() throws Exception { this.mockMvc .perform( MockMvcRequestBuilders.post("/orchestrate") + .with(csrf()) .content(content) .contentType(MediaType.APPLICATION_JSON)) .andReturn(); diff --git a/orca-clouddriver/src/test/kotlin/com/netflix/spinnaker/orca/clouddriver/pipeline/KubernetesPreconfiguredJobSpec.kt b/orca-clouddriver/src/test/kotlin/com/netflix/spinnaker/orca/clouddriver/pipeline/KubernetesPreconfiguredJobSpec.kt index 1d59d6bf66..e2f1b99222 100644 --- a/orca-clouddriver/src/test/kotlin/com/netflix/spinnaker/orca/clouddriver/pipeline/KubernetesPreconfiguredJobSpec.kt +++ b/orca-clouddriver/src/test/kotlin/com/netflix/spinnaker/orca/clouddriver/pipeline/KubernetesPreconfiguredJobSpec.kt @@ -28,6 +28,7 @@ import io.mockk.verify import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc import org.springframework.http.MediaType +import org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf import org.springframework.test.context.TestPropertySource import org.springframework.test.web.servlet.MockMvc import org.springframework.test.web.servlet.get @@ -75,6 +76,7 @@ class KubernetesPreconfiguredJobSpec : JUnit5Minutests { every { katoRestService.requestOperations(any(), any(), any()) } returns TaskId("1") val resp = subject.post("/orchestrate") { + with(csrf()) contentType = MediaType.APPLICATION_JSON content = pipeline }.andReturn().response diff --git a/orca-core-tck/src/main/groovy/com/netflix/spinnaker/orca/pipeline/persistence/PipelineExecutionRepositoryTck.groovy b/orca-core-tck/src/main/groovy/com/netflix/spinnaker/orca/pipeline/persistence/PipelineExecutionRepositoryTck.groovy index 9d20903b06..748dc52e64 100644 --- a/orca-core-tck/src/main/groovy/com/netflix/spinnaker/orca/pipeline/persistence/PipelineExecutionRepositoryTck.groovy +++ b/orca-core-tck/src/main/groovy/com/netflix/spinnaker/orca/pipeline/persistence/PipelineExecutionRepositoryTck.groovy @@ -26,7 +26,7 @@ import com.netflix.spinnaker.orca.pipeline.model.support.TriggerDeserializer import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.ExecutionCriteria import com.netflix.spinnaker.orca.sql.PipelineRefTriggerDeserializerSupplier import com.netflix.spinnaker.orca.sql.pipeline.persistence.PipelineRefTrigger -import rx.schedulers.Schedulers +import io.reactivex.rxjava3.schedulers.Schedulers import spock.lang.Specification import spock.lang.Subject import spock.lang.Unroll @@ -68,7 +68,7 @@ abstract class PipelineExecutionRepositoryTck ext repository().store(succeededExecution) def pipelines = repository().retrievePipelinesForPipelineConfigId( "pipeline-1", new ExecutionCriteria(pageSize: 5, statuses: ["RUNNING", "SUCCEEDED", "TERMINAL"]) - ).subscribeOn(Schedulers.io()).toList().toBlocking().single() + ).subscribeOn(Schedulers.io()).toList().blockingGet() then: pipelines*.id.sort() == [runningExecution.id, succeededExecution.id].sort() @@ -76,7 +76,7 @@ abstract class PipelineExecutionRepositoryTck ext when: pipelines = repository().retrievePipelinesForPipelineConfigId( "pipeline-1", new ExecutionCriteria(pageSize: 5, statuses: ["RUNNING"]) - ).subscribeOn(Schedulers.io()).toList().toBlocking().single() + ).subscribeOn(Schedulers.io()).toList().blockingGet() then: pipelines*.id.sort() == [runningExecution.id].sort() @@ -84,7 +84,7 @@ abstract class PipelineExecutionRepositoryTck ext when: pipelines = repository().retrievePipelinesForPipelineConfigId( "pipeline-1", new ExecutionCriteria(pageSize: 5, statuses: ["TERMINAL"]) - ).subscribeOn(Schedulers.io()).toList().toBlocking().single() + ).subscribeOn(Schedulers.io()).toList().blockingGet() then: pipelines.isEmpty() @@ -109,7 +109,7 @@ abstract class PipelineExecutionRepositoryTck ext repository().store(succeededExecution) def orchestrations = repository().retrieveOrchestrationsForApplication( runningExecution.application, new ExecutionCriteria(pageSize: 5, statuses: ["RUNNING", "SUCCEEDED", "TERMINAL"]) - ).subscribeOn(Schedulers.io()).toList().toBlocking().single() + ).subscribeOn(Schedulers.io()).toList().blockingGet() then: orchestrations*.id.sort() == [runningExecution.id, succeededExecution.id].sort() @@ -117,7 +117,7 @@ abstract class PipelineExecutionRepositoryTck ext when: orchestrations = repository().retrieveOrchestrationsForApplication( runningExecution.application, new ExecutionCriteria(pageSize: 5, statuses: ["RUNNING"]) - ).subscribeOn(Schedulers.io()).toList().toBlocking().single() + ).subscribeOn(Schedulers.io()).toList().blockingGet() then: orchestrations*.id.sort() == [runningExecution.id].sort() @@ -125,7 +125,7 @@ abstract class PipelineExecutionRepositoryTck ext when: orchestrations = repository().retrieveOrchestrationsForApplication( runningExecution.application, new ExecutionCriteria(pageSize: 5, statuses: ["TERMINAL"]) - ).subscribeOn(Schedulers.io()).toList().toBlocking().single() + ).subscribeOn(Schedulers.io()).toList().blockingGet() then: orchestrations.isEmpty() @@ -247,7 +247,7 @@ abstract class PipelineExecutionRepositoryTck ext repository().store(pipeline) expect: - repository().retrieve(PIPELINE).toBlocking().first().id == pipeline.id + repository().retrieve(PIPELINE).blockingFirst().id == pipeline.id with(repository().retrieve(pipeline.type, pipeline.id)) { id == pipeline.id @@ -316,7 +316,7 @@ abstract class PipelineExecutionRepositoryTck ext thrown ExecutionNotFoundException and: - repository().retrieve(PIPELINE).toList().toBlocking().first() == [] + repository().retrieve(PIPELINE).toList().blockingGet() == [] } def "updateStatus sets startTime to current time if new status is RUNNING"() { @@ -629,7 +629,7 @@ abstract class PipelineExecutionRepositoryTck ext .setPageSize(limit) expect: - with(repository().retrieve(type, criteria).toList().toBlocking().single()) { + with(repository().retrieve(type, criteria).toList().blockingGet()) { size() == expectedResults type.every { it == type } if (statuses) { diff --git a/orca-core/orca-core.gradle b/orca-core/orca-core.gradle index 36bb692228..1e82604220 100644 --- a/orca-core/orca-core.gradle +++ b/orca-core/orca-core.gradle @@ -29,7 +29,7 @@ dependencies { api("io.spinnaker.kork:kork-plugins") api("io.spinnaker.kork:kork-security") api("io.spinnaker.kork:kork-telemetry") - api("io.reactivex:rxjava") + api("io.reactivex.rxjava3:rxjava") api(project(":orca-api")) implementation("com.github.ben-manes.caffeine:guava") @@ -49,8 +49,8 @@ dependencies { implementation("com.fasterxml.jackson.module:jackson-module-kotlin") implementation("org.apache.commons:commons-lang3") implementation("org.apache.httpcomponents:httpclient") - implementation("javax.servlet:javax.servlet-api:4.0.1") - implementation("javax.validation:validation-api") + implementation("jakarta.servlet:jakarta.servlet-api") + implementation("jakarta.validation:jakarta.validation-api") implementation("com.jayway.jsonpath:json-path") implementation("org.yaml:snakeyaml") implementation("org.apache.groovy:groovy") diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/config/OrcaConfiguration.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/config/OrcaConfiguration.java index f7087ae7d4..515e26aa49 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/config/OrcaConfiguration.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/config/OrcaConfiguration.java @@ -50,6 +50,8 @@ import com.netflix.spinnaker.orca.pipeline.StageDefinitionBuilderFactory; import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository; import com.netflix.spinnaker.orca.pipeline.util.ContextParameterProcessor; +import io.reactivex.rxjava3.core.Scheduler; +import io.reactivex.rxjava3.schedulers.Schedulers; import java.time.Clock; import java.time.Duration; import java.util.Collection; @@ -73,8 +75,6 @@ import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; -import rx.Scheduler; -import rx.schedulers.Schedulers; @Configuration @ComponentScan({ diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/config/TaskOverrideConfigurationProperties.kt b/orca-core/src/main/java/com/netflix/spinnaker/orca/config/TaskOverrideConfigurationProperties.kt index 0b21a14831..b665248496 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/config/TaskOverrideConfigurationProperties.kt +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/config/TaskOverrideConfigurationProperties.kt @@ -1,13 +1,11 @@ package com.netflix.spinnaker.orca.config import org.springframework.boot.context.properties.ConfigurationProperties -import org.springframework.boot.context.properties.ConstructorBinding /** * Task override configuration to use while planning stages. */ @ConfigurationProperties("task-overrides") -@ConstructorBinding public class TaskOverrideConfigurationProperties( /** * list of task overrides. @@ -15,7 +13,6 @@ public class TaskOverrideConfigurationProperties( public var overrideDefinitions: List = listOf() ) { - @ConstructorBinding public class TaskOverrideDefinition( /** * Candidate stage in which we are looking to replace task definition diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/jackson/OrcaObjectMapper.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/jackson/OrcaObjectMapper.java index c152f1e73e..966880181b 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/jackson/OrcaObjectMapper.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/jackson/OrcaObjectMapper.java @@ -21,8 +21,14 @@ import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_USING_DEFAULT_VALUE; import static com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATE_TIMESTAMPS_AS_NANOSECONDS; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.Version; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonSerializer; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.module.SimpleAbstractTypeResolver; import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.datatype.guava.GuavaModule; @@ -39,6 +45,8 @@ import com.netflix.spinnaker.orca.pipeline.model.PipelineExecutionImpl; import com.netflix.spinnaker.orca.pipeline.model.StageExecutionImpl; import com.netflix.spinnaker.orca.pipeline.model.TaskExecutionImpl; +import java.io.IOException; +import org.springframework.http.HttpMethod; public class OrcaObjectMapper { private OrcaObjectMapper() {} @@ -71,8 +79,14 @@ public static ObjectMapper newInstance() { instance.registerModule(module); + SimpleModule httpMethodModule = new SimpleModule(); + httpMethodModule.addSerializer(HttpMethod.class, new HttpMethodSerializer()); + httpMethodModule.addDeserializer(HttpMethod.class, new HttpMethodDeserializer()); + instance.registerModule(httpMethodModule); + return instance; } + /** * Return an ObjectMapper instance that can be reused. Do not change the configuration of this * instance as it will be shared across the entire application, use {@link #newInstance()} @@ -83,4 +97,19 @@ public static ObjectMapper newInstance() { public static ObjectMapper getInstance() { return INSTANCE; } + + static class HttpMethodSerializer extends JsonSerializer { + @Override + public void serialize(HttpMethod value, JsonGenerator gen, SerializerProvider serializer) + throws IOException { + gen.writeString(value.name().toUpperCase()); + } + } + + static class HttpMethodDeserializer extends JsonDeserializer { + @Override + public HttpMethod deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + return HttpMethod.valueOf(p.getText().toUpperCase()); + } + } } diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/notifications/AbstractPollingNotificationAgent.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/notifications/AbstractPollingNotificationAgent.java index 3bbed00214..5a004e8e2f 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/notifications/AbstractPollingNotificationAgent.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/notifications/AbstractPollingNotificationAgent.java @@ -19,11 +19,11 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.netflix.spinnaker.kork.discovery.RemoteStatusChangedEvent; +import jakarta.annotation.PreDestroy; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import javax.annotation.PreDestroy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/notifications/scheduling/OldPipelineCleanupPollingNotificationAgent.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/notifications/scheduling/OldPipelineCleanupPollingNotificationAgent.java index c79a175605..8503de1e48 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/notifications/scheduling/OldPipelineCleanupPollingNotificationAgent.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/notifications/scheduling/OldPipelineCleanupPollingNotificationAgent.java @@ -25,6 +25,9 @@ import com.netflix.spinnaker.orca.notifications.AbstractPollingNotificationAgent; import com.netflix.spinnaker.orca.notifications.NotificationClusterLock; import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.functions.Predicate; import java.time.Clock; import java.time.Instant; import java.time.temporal.ChronoUnit; @@ -40,8 +43,6 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; -import rx.Observable; -import rx.functions.Func1; @Component @ConditionalOnExpression( @@ -54,10 +55,10 @@ public class OldPipelineCleanupPollingNotificationAgent extends AbstractPollingN private final Logger log = LoggerFactory.getLogger(OldPipelineCleanupPollingNotificationAgent.class); - private Func1 filter = - new Func1() { + private Predicate filter = + new Predicate() { @Override - public Boolean call(PipelineExecution execution) { + public boolean test(PipelineExecution execution) { if (!COMPLETED_STATUSES.contains(execution.getStatus().toString())) { return false; } @@ -72,7 +73,7 @@ public Boolean call(PipelineExecution execution) { } }; - private Func1 mapper = + private Function mapper = execution -> new PipelineExecutionDetails( execution.getId(), @@ -162,7 +163,7 @@ protected void tick() { private void cleanupApp(Observable observable) { List allPipelines = - observable.filter(filter).map(mapper).toList().toBlocking().single(); + observable.filter(filter).map(mapper).toList().blockingGet(); Map> groupedPipelines = new HashMap<>(); allPipelines.forEach( diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/notifications/scheduling/TopApplicationExecutionCleanupPollingNotificationAgent.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/notifications/scheduling/TopApplicationExecutionCleanupPollingNotificationAgent.java index 57147aa774..4a12a9645b 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/notifications/scheduling/TopApplicationExecutionCleanupPollingNotificationAgent.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/notifications/scheduling/TopApplicationExecutionCleanupPollingNotificationAgent.java @@ -30,6 +30,9 @@ import com.netflix.spinnaker.orca.notifications.NotificationClusterLock; import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository; import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.ExecutionCriteria; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.functions.Predicate; import java.time.Instant; import java.util.HashMap; import java.util.List; @@ -42,8 +45,6 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; -import rx.Observable; -import rx.functions.Func1; @Component @ConditionalOnExpression( @@ -53,12 +54,12 @@ public class TopApplicationExecutionCleanupPollingNotificationAgent private final Logger log = LoggerFactory.getLogger(getClass()); - private Func1 filter = + private Predicate filter = (PipelineExecution execution) -> execution.getStatus().isComplete() || Instant.ofEpochMilli(execution.getBuildTime()) .isBefore(Instant.now().minus(31, DAYS)); - private Func1 mapper = + private Function mapper = (PipelineExecution execution) -> { Map builder = new HashMap<>(); builder.put("id", execution.getId()); @@ -139,8 +140,7 @@ protected void tick() { } private void cleanup(Observable observable, String application, String type) { - List executions = - observable.filter(filter).map(mapper).toList().toBlocking().single(); + List executions = observable.filter(filter).map(mapper).toList().blockingGet(); executions.sort(comparing(a -> (Long) Optional.ofNullable(a.get("startTime")).orElse(0L))); if (executions.size() > threshold) { List removingPipelineExecutions = diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt index 70c1251e45..ba67f82c11 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt @@ -29,7 +29,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression import org.springframework.context.ApplicationContext import org.springframework.context.annotation.Primary import org.springframework.stereotype.Component -import rx.Observable +import io.reactivex.rxjava3.core.Observable import javax.annotation.Nonnull /** @@ -301,9 +301,9 @@ class DualExecutionRepository( sorter: ExecutionComparator? ): MutableList { val result = Observable.merge( - Observable.from(primary.retrieveOrchestrationsForApplication(application, criteria, sorter)), - Observable.from(previous.retrieveOrchestrationsForApplication(application, criteria, sorter)) - ).toList().toBlocking().single().distinctBy { it.id }.toMutableList() + Observable.fromIterable(primary.retrieveOrchestrationsForApplication(application, criteria, sorter)), + Observable.fromIterable(previous.retrieveOrchestrationsForApplication(application, criteria, sorter)) + ).toList().blockingGet().distinctBy { it.id }.toMutableList() return if (sorter != null) { result.asSequence().sortedWith(sorter as Comparator).toMutableList() @@ -338,16 +338,16 @@ class DualExecutionRepository( override fun retrieveBufferedExecutions(): MutableList { return Observable.merge( - Observable.from(primary.retrieveBufferedExecutions()), - Observable.from(previous.retrieveBufferedExecutions()) - ).toList().toBlocking().single().distinctBy { it.id }.toMutableList() + Observable.fromIterable(primary.retrieveBufferedExecutions()), + Observable.fromIterable(previous.retrieveBufferedExecutions()) + ).toList().blockingGet().distinctBy { it.id }.toMutableList() } override fun retrieveAllApplicationNames(executionType: ExecutionType?): MutableList { return Observable.merge( - Observable.from(primary.retrieveAllApplicationNames(executionType)), - Observable.from(previous.retrieveAllApplicationNames(executionType)) - ).toList().toBlocking().single().distinct().toMutableList() + Observable.fromIterable(primary.retrieveAllApplicationNames(executionType)), + Observable.fromIterable(previous.retrieveAllApplicationNames(executionType)) + ).toList().blockingGet().distinct().toMutableList() } override fun retrieveAllApplicationNames( @@ -355,9 +355,9 @@ class DualExecutionRepository( minExecutions: Int ): MutableList { return Observable.merge( - Observable.from(primary.retrieveAllApplicationNames(executionType, minExecutions)), - Observable.from(previous.retrieveAllApplicationNames(executionType, minExecutions)) - ).toList().toBlocking().single().distinct().toMutableList() + Observable.fromIterable(primary.retrieveAllApplicationNames(executionType, minExecutions)), + Observable.fromIterable(previous.retrieveAllApplicationNames(executionType, minExecutions)) + ).toList().blockingGet().distinct().toMutableList() } override fun hasExecution(type: ExecutionType, id: String): Boolean { @@ -366,8 +366,8 @@ class DualExecutionRepository( override fun retrieveAllExecutionIds(type: ExecutionType): MutableList { return Observable.merge( - Observable.from(primary.retrieveAllExecutionIds(type)), - Observable.from(previous.retrieveAllExecutionIds(type)) - ).toList().toBlocking().single().distinct().toMutableList() + Observable.fromIterable(primary.retrieveAllExecutionIds(type)), + Observable.fromIterable(previous.retrieveAllExecutionIds(type)) + ).toList().blockingGet().distinct().toMutableList() } } diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java index 393872e9f2..9bf82963bf 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java @@ -22,11 +22,11 @@ import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType; import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution; import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution; +import io.reactivex.rxjava3.core.Observable; import java.time.Instant; import java.util.*; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import rx.Observable; public interface ExecutionRepository { void store(@Nonnull PipelineExecution execution); diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt index d8a35103e7..f2c50d4b0b 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt @@ -23,7 +23,7 @@ import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.ExecutionComparator import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.ExecutionCriteria -import rx.Observable +import io.reactivex.rxjava3.core.Observable import java.lang.System.currentTimeMillis import java.time.Instant import java.util.concurrent.ConcurrentHashMap @@ -157,11 +157,11 @@ class InMemoryExecutionRepository : ExecutionRepository { } override fun retrieve(type: ExecutionType): Observable { - return Observable.from(storageFor(type).values) + return Observable.fromIterable(storageFor(type).values) } override fun retrieve(type: ExecutionType, criteria: ExecutionCriteria): Observable { - return Observable.from(storageFor(type).values) + return Observable.fromIterable(storageFor(type).values) } override fun store(execution: PipelineExecution) { @@ -235,7 +235,7 @@ class InMemoryExecutionRepository : ExecutionRepository { application: String, criteria: ExecutionCriteria ): Observable { - return Observable.from( + return Observable.fromIterable( orchestrations.values .filter { it.application == application } .applyCriteria(criteria) @@ -255,7 +255,7 @@ class InMemoryExecutionRepository : ExecutionRepository { } override fun retrievePipelinesForApplication(application: String): Observable { - return Observable.from( + return Observable.fromIterable( pipelines.values .filter { it.application == application } ) @@ -270,7 +270,7 @@ class InMemoryExecutionRepository : ExecutionRepository { pipelineConfigId: String, criteria: ExecutionCriteria ): Observable { - return Observable.from( + return Observable.fromIterable( pipelines.values .filter { it.pipelineConfigId == pipelineConfigId } .applyCriteria(criteria) diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/tasks/DependsOnExecutionTask.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/tasks/DependsOnExecutionTask.java index dfb91c3a43..b50a48d1e5 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/tasks/DependsOnExecutionTask.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/tasks/DependsOnExecutionTask.java @@ -26,9 +26,9 @@ import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution; import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionNotFoundException; import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository; +import jakarta.validation.constraints.NotNull; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; -import javax.validation.constraints.NotNull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/util/ArtifactUtils.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/util/ArtifactUtils.java index e98b228192..a42b8961a1 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/util/ArtifactUtils.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/util/ArtifactUtils.java @@ -34,6 +34,7 @@ import com.netflix.spinnaker.orca.pipeline.model.StageExecutionImpl; import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository; import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.ExecutionCriteria; +import io.reactivex.rxjava3.schedulers.Schedulers; import java.io.IOException; import java.util.Collection; import java.util.Collections; @@ -53,7 +54,6 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import rx.schedulers.Schedulers; @Component @NonnullByDefault @@ -303,8 +303,7 @@ private Optional getExecutionForPipelineId( .retrievePipelinesForPipelineConfigId(pipelineId, criteria) .subscribeOn(Schedulers.io()) .toList() - .toBlocking() - .single() + .blockingGet() .stream() .min(startTimeOrId); } diff --git a/orca-core/src/test/groovy/com/netflix/spinnaker/orca/notifications/scheduling/OldPipelineCleanupPollingNotificationAgentSpec.groovy b/orca-core/src/test/groovy/com/netflix/spinnaker/orca/notifications/scheduling/OldPipelineCleanupPollingNotificationAgentSpec.groovy index 21186a3c20..77bb68b03a 100644 --- a/orca-core/src/test/groovy/com/netflix/spinnaker/orca/notifications/scheduling/OldPipelineCleanupPollingNotificationAgentSpec.groovy +++ b/orca-core/src/test/groovy/com/netflix/spinnaker/orca/notifications/scheduling/OldPipelineCleanupPollingNotificationAgentSpec.groovy @@ -22,6 +22,7 @@ import com.netflix.spinnaker.orca.pipeline.model.PipelineExecutionImpl import com.netflix.spinnaker.orca.pipeline.model.TaskExecutionImpl import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository import spock.lang.Specification +import io.reactivex.rxjava3.core.Observable import java.time.Clock import java.time.Duration @@ -50,15 +51,15 @@ class OldPipelineCleanupPollingNotificationAgentSpec extends Specification { ).filter expect: - filter.call(pipeline { + filter.test(pipeline { status = ExecutionStatus.SUCCEEDED startTime = Duration.ofDays(1).toMillis() }) == true - filter.call(pipeline { + filter.test(pipeline { status = ExecutionStatus.RUNNING startTime = Duration.ofDays(1).toMillis() }) == false - filter.call(pipeline { + filter.test(pipeline { status = ExecutionStatus.SUCCEEDED startTime = Duration.ofDays(3).toMillis() }) == false @@ -88,7 +89,7 @@ class OldPipelineCleanupPollingNotificationAgentSpec extends Specification { ).mapper expect: - with(mapper.call(pipeline)) { + with(mapper.apply(pipeline)) { id == "ID1" application == "orca" pipelineConfigId == "P1" @@ -122,7 +123,7 @@ class OldPipelineCleanupPollingNotificationAgentSpec extends Specification { } def executionRepository = Mock(ExecutionRepository) { 1 * retrieveAllApplicationNames(PIPELINE) >> ["orca"] - 1 * retrievePipelinesForApplication("orca") >> rx.Observable.from(pipelines) + 1 * retrievePipelinesForApplication("orca") >> Observable.fromIterable(pipelines) } def pipelineDependencyCleanupOperator = Mock(PipelineDependencyCleanupOperator) def agent = new OldPipelineCleanupPollingNotificationAgent( diff --git a/orca-core/src/test/groovy/com/netflix/spinnaker/orca/notifications/scheduling/TopApplicationPipelineExecutionCleanupPollingNotificationAgentSpec.groovy b/orca-core/src/test/groovy/com/netflix/spinnaker/orca/notifications/scheduling/TopApplicationPipelineExecutionCleanupPollingNotificationAgentSpec.groovy index f9f23e4bf9..15343c46e0 100644 --- a/orca-core/src/test/groovy/com/netflix/spinnaker/orca/notifications/scheduling/TopApplicationPipelineExecutionCleanupPollingNotificationAgentSpec.groovy +++ b/orca-core/src/test/groovy/com/netflix/spinnaker/orca/notifications/scheduling/TopApplicationPipelineExecutionCleanupPollingNotificationAgentSpec.groovy @@ -24,6 +24,7 @@ import com.netflix.spinnaker.orca.pipeline.model.TaskExecutionImpl import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository import spock.lang.Specification import spock.lang.Unroll +import io.reactivex.rxjava3.core.Observable import java.util.concurrent.atomic.AtomicInteger @@ -53,7 +54,7 @@ class TopApplicationPipelineExecutionCleanupPollingNotificationAgentSpec extends status = s } - filter.call(pipeline) == (s == ExecutionStatus.SUCCEEDED) + filter.test(pipeline) == (s == ExecutionStatus.SUCCEEDED) } } @@ -76,7 +77,7 @@ class TopApplicationPipelineExecutionCleanupPollingNotificationAgentSpec extends ).mapper expect: - with(mapper.call(pipeline)) { + with(mapper.apply(pipeline)) { id == "ID1" startTime == 1000 pipelineConfigId == "P1" @@ -92,7 +93,7 @@ class TopApplicationPipelineExecutionCleanupPollingNotificationAgentSpec extends def executionRepository = Mock(ExecutionRepository) { 1 * retrieveAllApplicationNames(_, _) >> ["app1"] - 1 * retrieveOrchestrationsForApplication("app1", _) >> rx.Observable.from(orchestrations) + 1 * retrieveOrchestrationsForApplication("app1", _) >> Observable.fromIterable(orchestrations) } def pipelineDependencyCleanupOperator = Mock(PipelineDependencyCleanupOperator) def agent = new TopApplicationExecutionCleanupPollingNotificationAgent( diff --git a/orca-core/src/test/groovy/com/netflix/spinnaker/orca/pipeline/util/ArtifactUtilsSpec.groovy b/orca-core/src/test/groovy/com/netflix/spinnaker/orca/pipeline/util/ArtifactUtilsSpec.groovy index c3308625e2..9418e30f47 100644 --- a/orca-core/src/test/groovy/com/netflix/spinnaker/orca/pipeline/util/ArtifactUtilsSpec.groovy +++ b/orca-core/src/test/groovy/com/netflix/spinnaker/orca/pipeline/util/ArtifactUtilsSpec.groovy @@ -24,7 +24,7 @@ import com.netflix.spinnaker.kork.artifacts.model.ExpectedArtifact import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus import com.netflix.spinnaker.orca.pipeline.model.DefaultTrigger import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository -import rx.Observable +import io.reactivex.rxjava3.core.Observable import spock.lang.Specification import static com.netflix.spinnaker.orca.test.model.ExecutionBuilder.pipeline diff --git a/orca-echo/orca-echo.gradle b/orca-echo/orca-echo.gradle index 3a38547e95..25b24d0be4 100644 --- a/orca-echo/orca-echo.gradle +++ b/orca-echo/orca-echo.gradle @@ -25,7 +25,7 @@ dependencies { implementation(project(":orca-dry-run")) implementation("io.spinnaker.kork:kork-core") implementation("org.springframework.boot:spring-boot-autoconfigure") - implementation("javax.validation:validation-api") + implementation("jakarta.validation:jakarta.validation-api") implementation("io.spinnaker.fiat:fiat-core:$fiatVersion") implementation("io.spinnaker.fiat:fiat-api:$fiatVersion") implementation("io.spinnaker.kork:kork-retrofit") diff --git a/orca-echo/src/main/java/com/netflix/spinnaker/orca/echo/JiraService.java b/orca-echo/src/main/java/com/netflix/spinnaker/orca/echo/JiraService.java index 053f5279c1..7322ee5abd 100644 --- a/orca-echo/src/main/java/com/netflix/spinnaker/orca/echo/JiraService.java +++ b/orca-echo/src/main/java/com/netflix/spinnaker/orca/echo/JiraService.java @@ -20,11 +20,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import com.netflix.spinnaker.security.AuthenticatedRequest; +import jakarta.validation.ConstraintViolation; +import jakarta.validation.Validation; +import jakarta.validation.constraints.NotNull; import java.util.*; import java.util.stream.Collectors; -import javax.validation.ConstraintViolation; -import javax.validation.Validation; -import javax.validation.constraints.NotNull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import retrofit.client.Response; diff --git a/orca-front50/src/test/groovy/com/netflix/spinnaker/orca/front50/DependentPipelineStarterSpec.groovy b/orca-front50/src/test/groovy/com/netflix/spinnaker/orca/front50/DependentPipelineStarterSpec.groovy index 2d8a8d9548..cb16602b47 100644 --- a/orca-front50/src/test/groovy/com/netflix/spinnaker/orca/front50/DependentPipelineStarterSpec.groovy +++ b/orca-front50/src/test/groovy/com/netflix/spinnaker/orca/front50/DependentPipelineStarterSpec.groovy @@ -45,7 +45,7 @@ import com.netflix.spinnaker.orca.pipelinetemplate.v1schema.render.JinjaRenderer import org.slf4j.MDC import org.springframework.context.ApplicationContext import org.springframework.context.support.StaticApplicationContext -import rx.Observable +import io.reactivex.rxjava3.core.Observable import spock.lang.Specification import spock.lang.Subject diff --git a/orca-interlink/orca-interlink.gradle b/orca-interlink/orca-interlink.gradle index ef577656c9..2a1d2bc0fb 100644 --- a/orca-interlink/orca-interlink.gradle +++ b/orca-interlink/orca-interlink.gradle @@ -25,7 +25,7 @@ dependencies { implementation(project(":orca-core")) implementation("com.amazonaws:aws-java-sdk-sqs") - implementation("javax.validation:validation-api") + implementation("jakarta.validation:jakarta.validation-api") compileOnly("org.projectlombok:lombok") annotationProcessor("org.projectlombok:lombok") diff --git a/orca-interlink/src/main/java/com/netflix/spinnaker/orca/interlink/events/InterlinkEvent.java b/orca-interlink/src/main/java/com/netflix/spinnaker/orca/interlink/events/InterlinkEvent.java index 8469422002..30cfc68e11 100644 --- a/orca-interlink/src/main/java/com/netflix/spinnaker/orca/interlink/events/InterlinkEvent.java +++ b/orca-interlink/src/main/java/com/netflix/spinnaker/orca/interlink/events/InterlinkEvent.java @@ -22,7 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType; import com.netflix.spinnaker.orca.pipeline.CompoundExecutionOperator; -import javax.validation.constraints.NotNull; +import jakarta.validation.constraints.NotNull; /** * Common interface for interlink events diff --git a/orca-migration/src/main/kotlin/com/netflix/spinnaker/orca/pipeline/persistence/migration/OrchestrationMigrationAgent.kt b/orca-migration/src/main/kotlin/com/netflix/spinnaker/orca/pipeline/persistence/migration/OrchestrationMigrationAgent.kt index c93eb0745b..7300cebb3a 100644 --- a/orca-migration/src/main/kotlin/com/netflix/spinnaker/orca/pipeline/persistence/migration/OrchestrationMigrationAgent.kt +++ b/orca-migration/src/main/kotlin/com/netflix/spinnaker/orca/pipeline/persistence/migration/OrchestrationMigrationAgent.kt @@ -60,10 +60,9 @@ class OrchestrationMigrationAgent( else -> 0 } } - .limit(1000) + .take(1000) .toList() - .toBlocking() - .single() + .blockingGet() if (unmigratedOrchestrations.isNotEmpty()) { log.info("${unmigratedOrchestrations.size} orchestrations to migrate ($applicationName) [$index/${allApplications.size}]") diff --git a/orca-migration/src/main/kotlin/com/netflix/spinnaker/orca/pipeline/persistence/migration/PipelineMigrationAgent.kt b/orca-migration/src/main/kotlin/com/netflix/spinnaker/orca/pipeline/persistence/migration/PipelineMigrationAgent.kt index 8771c5a822..041fd4d900 100644 --- a/orca-migration/src/main/kotlin/com/netflix/spinnaker/orca/pipeline/persistence/migration/PipelineMigrationAgent.kt +++ b/orca-migration/src/main/kotlin/com/netflix/spinnaker/orca/pipeline/persistence/migration/PipelineMigrationAgent.kt @@ -54,8 +54,7 @@ class PipelineMigrationAgent( .retrievePipelinesForPipelineConfigId(pipelineConfigId, executionCriteria) .filter { !previouslyMigratedPipelineIds.contains(it.id) } .toList() - .toBlocking() - .single() + .blockingGet() if (unmigratedPipelines.isNotEmpty()) { log.info("${unmigratedPipelines.size} pipelines to migrate ($pipelineConfigId) [$index/${allPipelineConfigIds.size}]") diff --git a/orca-peering/orca-peering.gradle b/orca-peering/orca-peering.gradle index 780a87d5d2..98b34e4de7 100644 --- a/orca-peering/orca-peering.gradle +++ b/orca-peering/orca-peering.gradle @@ -21,6 +21,7 @@ dependencies { implementation(project(":orca-core")) implementation("io.spinnaker.kork:kork-sql") + implementation("io.github.resilience4j:resilience4j-vavr") implementation("org.jooq:jooq") implementation("org.springframework.boot:spring-boot-autoconfigure") diff --git a/orca-pipelinetemplate/orca-pipelinetemplate.gradle b/orca-pipelinetemplate/orca-pipelinetemplate.gradle index c4657be27b..889d9e3bf7 100644 --- a/orca-pipelinetemplate/orca-pipelinetemplate.gradle +++ b/orca-pipelinetemplate/orca-pipelinetemplate.gradle @@ -21,7 +21,7 @@ dependencies { implementation("io.spinnaker.kork:kork-exceptions") implementation("com.squareup.okhttp3:okhttp") implementation("commons-codec:commons-codec") - implementation("javax.validation:validation-api") + implementation("jakarta.validation:jakarta.validation-api") compileOnly("org.projectlombok:lombok") annotationProcessor("org.projectlombok:lombok") diff --git a/orca-pipelinetemplate/src/main/java/com/netflix/spinnaker/orca/pipelinetemplate/PipelineTemplatePreprocessor.kt b/orca-pipelinetemplate/src/main/java/com/netflix/spinnaker/orca/pipelinetemplate/PipelineTemplatePreprocessor.kt index 47729f6c6a..0684f4eb95 100644 --- a/orca-pipelinetemplate/src/main/java/com/netflix/spinnaker/orca/pipelinetemplate/PipelineTemplatePreprocessor.kt +++ b/orca-pipelinetemplate/src/main/java/com/netflix/spinnaker/orca/pipelinetemplate/PipelineTemplatePreprocessor.kt @@ -27,7 +27,7 @@ import com.netflix.spinnaker.orca.pipelinetemplate.handler.PipelineTemplateError import com.netflix.spinnaker.orca.pipelinetemplate.handler.SchemaVersionHandler import com.netflix.spinnaker.orca.pipelinetemplate.v2schema.model.V2PipelineTemplate import javax.annotation.Nonnull -import javax.annotation.PostConstruct +import jakarta.annotation.PostConstruct import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired import org.springframework.core.annotation.Order diff --git a/orca-pipelinetemplate/src/main/java/com/netflix/spinnaker/orca/pipelinetemplate/PipelineTemplateService.java b/orca-pipelinetemplate/src/main/java/com/netflix/spinnaker/orca/pipelinetemplate/PipelineTemplateService.java index a3ff7776d5..c146a61447 100644 --- a/orca-pipelinetemplate/src/main/java/com/netflix/spinnaker/orca/pipelinetemplate/PipelineTemplateService.java +++ b/orca-pipelinetemplate/src/main/java/com/netflix/spinnaker/orca/pipelinetemplate/PipelineTemplateService.java @@ -99,9 +99,7 @@ public PipelineExecution retrievePipelineOrNewestExecution( try { return executionRepository .retrievePipelinesForPipelineConfigId(pipelineConfigId, criteria) - .toSingle() - .toBlocking() - .value(); + .blockingFirst(); } catch (NoSuchElementException e) { throw new ExecutionNotFoundException( "No pipeline execution could be found for config id " diff --git a/orca-pipelinetemplate/src/main/java/com/netflix/spinnaker/orca/pipelinetemplate/v1schema/handler/v2/V2TemplateLoaderHandler.java b/orca-pipelinetemplate/src/main/java/com/netflix/spinnaker/orca/pipelinetemplate/v1schema/handler/v2/V2TemplateLoaderHandler.java index 900d16104b..cb48924454 100644 --- a/orca-pipelinetemplate/src/main/java/com/netflix/spinnaker/orca/pipelinetemplate/v1schema/handler/v2/V2TemplateLoaderHandler.java +++ b/orca-pipelinetemplate/src/main/java/com/netflix/spinnaker/orca/pipelinetemplate/v1schema/handler/v2/V2TemplateLoaderHandler.java @@ -30,10 +30,10 @@ import com.netflix.spinnaker.orca.pipelinetemplate.v1schema.render.v2.V2RenderUtil; import com.netflix.spinnaker.orca.pipelinetemplate.v2schema.model.V2PipelineTemplate; import com.netflix.spinnaker.orca.pipelinetemplate.v2schema.model.V2TemplateConfiguration; +import jakarta.validation.constraints.NotNull; import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import javax.validation.constraints.NotNull; public class V2TemplateLoaderHandler implements Handler { diff --git a/orca-queue-redis/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueShovelConfiguration.kt b/orca-queue-redis/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueShovelConfiguration.kt index fd00ee4c19..4d8e7f1224 100644 --- a/orca-queue-redis/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueShovelConfiguration.kt +++ b/orca-queue-redis/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueShovelConfiguration.kt @@ -41,6 +41,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration +import redis.clients.jedis.Connection import redis.clients.jedis.HostAndPort import redis.clients.jedis.Jedis import redis.clients.jedis.JedisCluster @@ -84,7 +85,7 @@ class RedisQueueShovelConfiguration { @Value("\${redis.connection-previous}") previousConnection: String, @Value("\${redis.timeout:2000}") timeout: Int, @Value("\${redis.maxattempts:4}") maxAttempts: Int, - redisPoolConfig: GenericObjectPoolConfig, + redisPoolConfig: GenericObjectPoolConfig, registry: Registry ): JedisCluster { if (mainConnection == previousConnection) { diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/QueueShovel.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/QueueShovel.kt index f52494946e..c37f52f49c 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/QueueShovel.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/QueueShovel.kt @@ -24,7 +24,7 @@ import com.netflix.spinnaker.q.Message import com.netflix.spinnaker.q.Queue import java.time.Duration import java.time.Instant -import javax.annotation.PostConstruct +import jakarta.annotation.PostConstruct import org.slf4j.LoggerFactory import org.springframework.scheduling.annotation.Scheduled diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/ZombieExecutionService.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/ZombieExecutionService.kt index 90358b6626..71f16130d9 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/ZombieExecutionService.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/ZombieExecutionService.kt @@ -30,8 +30,8 @@ import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Qualifier import org.springframework.boot.autoconfigure.condition.ConditionalOnBean import org.springframework.stereotype.Component -import rx.Scheduler -import rx.schedulers.Schedulers +import io.reactivex.rxjava3.core.Scheduler +import io.reactivex.rxjava3.schedulers.Schedulers /** * Logic related to operating zombie pipeline executions. @@ -70,8 +70,7 @@ class ZombieExecutionService( .filter { hasBeenAroundAWhile(it, minimumInactivity) } .filter(this::queueHasNoMessages) .toList() - .toBlocking() - .first() + .blockingGet() } /** diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/admin/HydrateQueueCommand.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/admin/HydrateQueueCommand.kt index 46cc9ea22f..dfc5e50584 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/admin/HydrateQueueCommand.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/admin/HydrateQueueCommand.kt @@ -44,7 +44,7 @@ import java.time.Instant import kotlin.reflect.full.memberProperties import org.slf4j.LoggerFactory import org.springframework.stereotype.Component -import rx.Observable +import io.reactivex.rxjava3.core.Observable /** * Hydrates (best attempt) the queue from current ExecutionRepository state. @@ -71,13 +71,11 @@ class HydrateQueueCommand( .retrieveRunning() .filter(pInTimeWindow) .toList() - .toBlocking() - .first() + .blockingGet() } else { executionRepository.retrieveSingleRunning(p1.executionId) .toList() - .toBlocking() - .first() + .blockingGet() } val output = HydrateQueueOutput( @@ -245,7 +243,7 @@ class HydrateQueueCommand( ?: true private fun ExecutionRepository.retrieveRunning(): Observable = - rx.Observable.merge( + Observable.merge( retrieve(ExecutionType.ORCHESTRATION, ExecutionRepository.ExecutionCriteria().setStatuses(RUNNING)), retrieve(ExecutionType.PIPELINE, ExecutionRepository.ExecutionCriteria().setStatuses(RUNNING)) ) @@ -262,9 +260,9 @@ class HydrateQueueCommand( } } return if (execution == null || execution.status != RUNNING) { - rx.Observable.empty() + Observable.empty() } else { - rx.Observable.just(execution) + Observable.just(execution) } } diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/OrcaMessageHandler.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/OrcaMessageHandler.kt index b91a83b9f4..4f5e0020f0 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/OrcaMessageHandler.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/OrcaMessageHandler.kt @@ -126,8 +126,7 @@ internal interface OrcaMessageHandler : MessageHandler { .retrievePipelinesForPipelineConfigId(configId, criteria) .filter { it.id != id } .count() - .toBlocking() - .first() >= maxConcurrentExecutions + .blockingGet() >= maxConcurrentExecutions } else -> false } @@ -138,8 +137,7 @@ internal interface OrcaMessageHandler : MessageHandler { .retrievePipelinesForPipelineConfigId(configId, criteria) .filter { it.id != id } .count() - .toBlocking() - .first() > 0 + .blockingGet() > 0 } } } diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/pending/PendingExecutionAgent.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/pending/PendingExecutionAgent.kt index 99348417c8..e371b470e7 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/pending/PendingExecutionAgent.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/pending/PendingExecutionAgent.kt @@ -63,11 +63,11 @@ class PendingExecutionAgent( for (configId in pendingConfigIds) { val runningPipelines = executionRepository.retrievePipelinesForPipelineConfigId(configId, singleRunningCriteria) - .toList().toBlocking().single() + .toList().blockingGet() if (runningPipelines.isEmpty()) { val lastCompletedPipeline = executionRepository.retrievePipelinesForPipelineConfigId(configId, lastCompletedCriteria) - .toList().toBlocking().single() + .toList().blockingGet() val purgeQueue = if (lastCompletedPipeline.any()) { !(lastCompletedPipeline.first().isKeepWaitingPipelines) diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/ZombieExecutionServiceTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/ZombieExecutionServiceTest.kt index 6a56475946..cb272783ba 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/ZombieExecutionServiceTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/ZombieExecutionServiceTest.kt @@ -36,8 +36,8 @@ import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.jetbrains.spek.api.lifecycle.CachingMode import org.jetbrains.spek.subject.SubjectSpek -import rx.Observable -import rx.schedulers.Schedulers +import io.reactivex.rxjava3.core.Observable +import io.reactivex.rxjava3.schedulers.Schedulers object ZombieExecutionServiceTest : SubjectSpek({ @@ -50,7 +50,7 @@ object ZombieExecutionServiceTest : SubjectSpek({ repository, queue, clock, - Optional.of(Schedulers.immediate()) + Optional.of(Schedulers.trampoline()) ) } @@ -68,6 +68,7 @@ object ZombieExecutionServiceTest : SubjectSpek({ beforeGroup { whenever(repository.retrieve(ExecutionType.PIPELINE, criteria)) doReturn Observable.just(pipeline) + whenever(repository.retrieve(ExecutionType.ORCHESTRATION, criteria)) doReturn Observable.empty() whenever(queue.containsMessage(any())) doReturn true } @@ -90,6 +91,7 @@ object ZombieExecutionServiceTest : SubjectSpek({ beforeGroup { whenever(repository.retrieve(pipeline.type, criteria)) doReturn Observable.just(pipeline) + whenever(repository.retrieve(ExecutionType.ORCHESTRATION, criteria)) doReturn Observable.empty() whenever(queue.containsMessage(any())) doReturn true } @@ -112,6 +114,7 @@ object ZombieExecutionServiceTest : SubjectSpek({ beforeGroup { whenever(repository.retrieve(ExecutionType.PIPELINE, criteria)) doReturn Observable.just(pipeline) + whenever(repository.retrieve(ExecutionType.ORCHESTRATION, criteria)) doReturn Observable.empty() whenever(queue.containsMessage(any())) doReturn false } diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/admin/HydrateQueueCommandTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/admin/HydrateQueueCommandTest.kt index 63d286a4e0..93cdb195be 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/admin/HydrateQueueCommandTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/admin/HydrateQueueCommandTest.kt @@ -59,7 +59,7 @@ import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.jetbrains.spek.api.lifecycle.CachingMode import org.jetbrains.spek.subject.SubjectSpek -import rx.Observable +import io.reactivex.rxjava3.core.Observable object HydrateQueueCommandTest : SubjectSpek({ diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/RestartStageHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/RestartStageHandlerTest.kt index 1fe66b7924..5f44ab737b 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/RestartStageHandlerTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/RestartStageHandlerTest.kt @@ -73,7 +73,7 @@ import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.lifecycle.CachingMode.GROUP import org.jetbrains.spek.subject.SubjectSpek import org.springframework.test.web.client.ExpectedCount.once -import rx.Observable +import io.reactivex.rxjava3.core.Observable object RestartStageHandlerTest : SubjectSpek({ @@ -480,7 +480,7 @@ object RestartStageHandlerTest : SubjectSpek({ whenever(repository.retrieve(message.executionType, message.executionId)) doReturn pipeline whenever(repository.retrievePipelinesForPipelineConfigId( pipeline.pipelineConfigId, - ExecutionRepository.ExecutionCriteria().setPageSize(2).setStatuses(RUNNING))) doReturn Observable.from(listOf(runningPipeline)) + ExecutionRepository.ExecutionCriteria().setPageSize(2).setStatuses(RUNNING))) doReturn Observable.fromIterable(listOf(runningPipeline)) } afterGroup(::resetMocks) @@ -527,7 +527,7 @@ object RestartStageHandlerTest : SubjectSpek({ whenever(repository.retrieve(message.executionType, message.executionId)) doReturn pipeline whenever(repository.retrievePipelinesForPipelineConfigId( pipeline.pipelineConfigId, - ExecutionRepository.ExecutionCriteria().setPageSize(2).setStatuses(RUNNING))) doReturn Observable.from(listOf(runningPipeline)) + ExecutionRepository.ExecutionCriteria().setPageSize(2).setStatuses(RUNNING))) doReturn Observable.fromIterable(listOf(runningPipeline)) } afterGroup(::resetMocks) diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartExecutionHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartExecutionHandlerTest.kt index 1cc2da8d7b..a463ffd1ed 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartExecutionHandlerTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartExecutionHandlerTest.kt @@ -57,7 +57,7 @@ import org.jetbrains.spek.api.dsl.on import org.jetbrains.spek.api.lifecycle.CachingMode.GROUP import org.jetbrains.spek.subject.SubjectSpek import org.springframework.context.ApplicationEventPublisher -import rx.Observable.just +import io.reactivex.rxjava3.core.Observable.just object StartExecutionHandlerTest : SubjectSpek({ diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/metrics/ZombieExecutionCheckingAgentTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/metrics/ZombieExecutionCheckingAgentTest.kt index d7a9d97140..34942de4b0 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/metrics/ZombieExecutionCheckingAgentTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/metrics/ZombieExecutionCheckingAgentTest.kt @@ -21,6 +21,7 @@ import com.netflix.spectator.api.Registry import com.netflix.spectator.api.Tag import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus.RUNNING import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus.SUCCEEDED +import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType.ORCHESTRATION import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType.PIPELINE import com.netflix.spinnaker.orca.api.test.pipeline import com.netflix.spinnaker.orca.notifications.NotificationClusterLock @@ -38,6 +39,7 @@ import com.nhaarman.mockito_kotlin.reset import com.nhaarman.mockito_kotlin.verify import com.nhaarman.mockito_kotlin.verifyNoMoreInteractions import com.nhaarman.mockito_kotlin.whenever +import io.reactivex.rxjava3.core.Observable import java.time.Duration import java.time.Instant.now import java.time.temporal.ChronoUnit.HOURS @@ -48,8 +50,8 @@ import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.jetbrains.spek.api.lifecycle.CachingMode.GROUP import org.jetbrains.spek.subject.SubjectSpek -import rx.Observable.just -import rx.schedulers.Schedulers +import io.reactivex.rxjava3.core.Observable.just +import io.reactivex.rxjava3.schedulers.Schedulers object ZombieExecutionCheckingAgentTest : SubjectSpek({ @@ -71,7 +73,7 @@ object ZombieExecutionCheckingAgentTest : SubjectSpek({ val clusterLock: NotificationClusterLock = mock() diff --git a/orca-redis/orca-redis.gradle b/orca-redis/orca-redis.gradle index a755cbca0a..45537ffbb1 100644 --- a/orca-redis/orca-redis.gradle +++ b/orca-redis/orca-redis.gradle @@ -6,7 +6,7 @@ dependencies { implementation(project(":orca-core")) implementation(project(":orca-front50")) - implementation("io.reactivex:rxjava") + implementation("io.reactivex.rxjava3:rxjava") implementation("net.logstash.logback:logstash-logback-encoder") testImplementation(project(":orca-core-tck")) diff --git a/orca-redis/src/main/java/com/netflix/spinnaker/orca/config/JedisConfiguration.java b/orca-redis/src/main/java/com/netflix/spinnaker/orca/config/JedisConfiguration.java index 480ed44ea4..1d53cf52f7 100644 --- a/orca-redis/src/main/java/com/netflix/spinnaker/orca/config/JedisConfiguration.java +++ b/orca-redis/src/main/java/com/netflix/spinnaker/orca/config/JedisConfiguration.java @@ -15,13 +15,13 @@ */ package com.netflix.spinnaker.orca.config; +import io.reactivex.rxjava3.core.Scheduler; +import io.reactivex.rxjava3.schedulers.Schedulers; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import rx.Scheduler; -import rx.schedulers.Schedulers; @Configuration public class JedisConfiguration { diff --git a/orca-redis/src/main/java/com/netflix/spinnaker/orca/config/RedisConfiguration.java b/orca-redis/src/main/java/com/netflix/spinnaker/orca/config/RedisConfiguration.java index 4d9765aff1..1a4745ebd1 100644 --- a/orca-redis/src/main/java/com/netflix/spinnaker/orca/config/RedisConfiguration.java +++ b/orca-redis/src/main/java/com/netflix/spinnaker/orca/config/RedisConfiguration.java @@ -30,6 +30,7 @@ import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository; import com.netflix.spinnaker.orca.pipeline.persistence.jedis.RedisExecutionRepository; import groovy.util.logging.Slf4j; +import io.reactivex.rxjava3.core.Scheduler; import java.time.Clock; import java.util.Collections; import java.util.Optional; @@ -44,7 +45,6 @@ import org.springframework.context.annotation.*; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisPoolConfig; -import rx.Scheduler; @Slf4j @Configuration diff --git a/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java b/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java index e8d1721eb8..42dac247f1 100644 --- a/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java +++ b/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java @@ -27,8 +27,8 @@ import static java.lang.System.currentTimeMillis; import static java.util.Collections.*; import static net.logstash.logback.argument.StructuredArguments.value; -import static redis.clients.jedis.ListPosition.AFTER; -import static redis.clients.jedis.ListPosition.BEFORE; +import static redis.clients.jedis.args.ListPosition.AFTER; +import static redis.clients.jedis.args.ListPosition.BEFORE; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; @@ -44,6 +44,12 @@ import com.netflix.spinnaker.orca.jackson.OrcaObjectMapper; import com.netflix.spinnaker.orca.pipeline.model.*; import com.netflix.spinnaker.orca.pipeline.persistence.*; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Scheduler; +import io.reactivex.rxjava3.functions.BiFunction; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.functions.Supplier; +import io.reactivex.rxjava3.schedulers.Schedulers; import java.io.IOException; import java.util.*; import java.util.concurrent.Executors; @@ -58,16 +64,10 @@ import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import redis.clients.jedis.ListPosition; import redis.clients.jedis.Response; -import redis.clients.jedis.ScanParams; -import redis.clients.jedis.ScanResult; -import rx.Observable; -import rx.Scheduler; -import rx.functions.Func0; -import rx.functions.Func1; -import rx.functions.Func2; -import rx.schedulers.Schedulers; +import redis.clients.jedis.args.ListPosition; +import redis.clients.jedis.params.ScanParams; +import redis.clients.jedis.resps.ScanResult; public class RedisExecutionRepository implements ExecutionRepository { @@ -351,7 +351,7 @@ public void updateStatus( execution -> criteria.getStatuses().contains(execution.getStatus())); } if (criteria.getPageSize() > 0) { - observable = observable.limit(criteria.getPageSize()); + observable = observable.take(criteria.getPageSize()); } return observable; }) @@ -426,15 +426,16 @@ public void delete(@Nonnull ExecutionType type, @Nonnull List idsToDelet })); } - Func2, Func1>> fnBuilder = - (RedisClientDelegate redisClientDelegate, Iterable pipelineIds) -> - (String key) -> - !criteria.getStatuses().isEmpty() - ? pipelineIds - : redisClientDelegate.withCommandsClient( - p -> { - return p.zrevrange(key, 0, (criteria.getPageSize() - 1)); - }); + BiFunction, Function>> + fnBuilder = + (RedisClientDelegate redisClientDelegate, Iterable pipelineIds) -> + (String key) -> + !criteria.getStatuses().isEmpty() + ? pipelineIds + : redisClientDelegate.withCommandsClient( + p -> { + return p.zrevrange(key, 0, (criteria.getPageSize() - 1)); + }); /* * Construct an observable that will retrieve pipelines from the primary redis @@ -444,13 +445,18 @@ public void delete(@Nonnull ExecutionType type, @Nonnull List idsToDelet currentPipelineIds = currentPipelineIds.subList(0, Math.min(criteria.getPageSize(), currentPipelineIds.size())); - Observable currentObservable = - retrieveObservable( - PIPELINE, - executionsByPipelineKey(pipelineConfigId), - fnBuilder.call(redisClientDelegate, currentPipelineIds), - queryByAppScheduler, - redisClientDelegate); + Observable currentObservable = null; + try { + currentObservable = + retrieveObservable( + PIPELINE, + executionsByPipelineKey(pipelineConfigId), + fnBuilder.apply(redisClientDelegate, currentPipelineIds), + queryByAppScheduler, + redisClientDelegate); + } catch (Throwable e) { + throw new RuntimeException(); + } if (previousRedisClientDelegate.isPresent()) { /* @@ -464,13 +470,18 @@ public void delete(@Nonnull ExecutionType type, @Nonnull List idsToDelet previousPipelineIds.subList( 0, Math.min(criteria.getPageSize(), previousPipelineIds.size())); - Observable previousObservable = - retrieveObservable( - PIPELINE, - executionsByPipelineKey(pipelineConfigId), - fnBuilder.call(previousRedisClientDelegate.get(), previousPipelineIds), - queryByAppScheduler, - previousRedisClientDelegate.get()); + Observable previousObservable = null; + try { + previousObservable = + retrieveObservable( + PIPELINE, + executionsByPipelineKey(pipelineConfigId), + fnBuilder.apply(previousRedisClientDelegate.get(), previousPipelineIds), + queryByAppScheduler, + previousRedisClientDelegate.get()); + } catch (Throwable e) { + throw new RuntimeException(); + } // merge primary + secondary observables return Observable.merge(currentObservable, previousObservable); @@ -609,18 +620,19 @@ public List retrieveAllPipelinesForPipelineConfigIdsBetweenBu })); } - Func2, Func1>> fnBuilder = - (RedisClientDelegate redisClientDelegate, Iterable orchestrationIds) -> - (String key) -> - (redisClientDelegate.withCommandsClient( - c -> { - if (!criteria.getStatuses().isEmpty()) { - return orchestrationIds; - } - List unfiltered = new ArrayList<>(c.smembers(key)); - return unfiltered.subList( - 0, Math.min(criteria.getPageSize(), unfiltered.size())); - })); + BiFunction, Function>> + fnBuilder = + (RedisClientDelegate redisClientDelegate, Iterable orchestrationIds) -> + (String key) -> + (redisClientDelegate.withCommandsClient( + c -> { + if (!criteria.getStatuses().isEmpty()) { + return orchestrationIds; + } + List unfiltered = new ArrayList<>(c.smembers(key)); + return unfiltered.subList( + 0, Math.min(criteria.getPageSize(), unfiltered.size())); + })); /* * Construct an observable that will retrieve orchestrations frcm the primary redis @@ -631,13 +643,18 @@ public List retrieveAllPipelinesForPipelineConfigIdsBetweenBu currentOrchestrationIds.subList( 0, Math.min(criteria.getPageSize(), currentOrchestrationIds.size())); - Observable currentObservable = - retrieveObservable( - ORCHESTRATION, - allOrchestrationsKey, - fnBuilder.call(redisClientDelegate, currentOrchestrationIds), - queryByAppScheduler, - redisClientDelegate); + Observable currentObservable = null; + try { + currentObservable = + retrieveObservable( + ORCHESTRATION, + allOrchestrationsKey, + fnBuilder.apply(redisClientDelegate, currentOrchestrationIds), + queryByAppScheduler, + redisClientDelegate); + } catch (Throwable e) { + throw new RuntimeException(); + } if (previousRedisClientDelegate.isPresent()) { /* @@ -651,13 +668,18 @@ public List retrieveAllPipelinesForPipelineConfigIdsBetweenBu previousOrchestrationIds.subList( 0, Math.min(criteria.getPageSize(), previousOrchestrationIds.size())); - Observable previousObservable = - retrieveObservable( - ORCHESTRATION, - allOrchestrationsKey, - fnBuilder.call(previousRedisClientDelegate.get(), previousOrchestrationIds), - queryByAppScheduler, - previousRedisClientDelegate.get()); + Observable previousObservable = null; + try { + previousObservable = + retrieveObservable( + ORCHESTRATION, + allOrchestrationsKey, + fnBuilder.apply(previousRedisClientDelegate.get(), previousOrchestrationIds), + queryByAppScheduler, + previousRedisClientDelegate.get()); + } catch (Throwable e) { + throw new RuntimeException(); + } // merge primary + secondary observables return Observable.merge(currentObservable, previousObservable); @@ -685,8 +707,7 @@ public List retrieveOrchestrationsForApplication( }) .subscribeOn(Schedulers.io()) .toList() - .toBlocking() - .single(); + .blockingGet(); if (sorter != null) { executions.sort(sorter); @@ -775,8 +796,7 @@ PIPELINE, allBufferedExecutionsKey(PIPELINE), queryAllScheduler, d), return Observable.merge(observables) .filter(e -> e.getStatus() == BUFFERED) .toList() - .toBlocking() - .single(); + .blockingGet(); } @Nonnull @@ -1191,7 +1211,7 @@ private Stream getExecutionForPipelineConfigId( Long buildTimeStartBoundary, Long buildTimeEndBoundary) { String executionsKey = executionsByPipelineKey(pipelineConfigId); - Set executionIds = + List executionIds = delegate.withCommandsClient( c -> { return c.zrangeByScore(executionsKey, buildTimeStartBoundary, buildTimeEndBoundary); @@ -1251,7 +1271,7 @@ protected Observable retrieveObservable( Scheduler scheduler, RedisClientDelegate redisClientDelegate) { - Func0>> fnBuilder = + Supplier>> fnBuilder = () -> (String key) -> redisClientDelegate.withCommandsClient( @@ -1259,14 +1279,18 @@ protected Observable retrieveObservable( return c.smembers(key); }); - return retrieveObservable(type, lookupKey, fnBuilder.call(), scheduler, redisClientDelegate); + try { + return retrieveObservable(type, lookupKey, fnBuilder.get(), scheduler, redisClientDelegate); + } catch (Throwable e) { + throw new RuntimeException(); + } } @SuppressWarnings("unchecked") protected Observable retrieveObservable( ExecutionType type, String lookupKey, - Func1> lookupKeyFetcher, + Function> lookupKeyFetcher, Scheduler scheduler, RedisClientDelegate redisClientDelegate) { return Observable.just(lookupKey) @@ -1274,7 +1298,7 @@ protected Observable retrieveObservable( .buffer(chunkSize) .flatMap( (Collection ids) -> - Observable.from(ids) + Observable.fromIterable(ids) .flatMap( (String executionId) -> { try { diff --git a/orca-redis/src/main/java/com/netflix/spinnaker/orca/telemetry/RedisPoolMetricsPostProcessor.java b/orca-redis/src/main/java/com/netflix/spinnaker/orca/telemetry/RedisPoolMetricsPostProcessor.java index f13464b515..0522954390 100644 --- a/orca-redis/src/main/java/com/netflix/spinnaker/orca/telemetry/RedisPoolMetricsPostProcessor.java +++ b/orca-redis/src/main/java/com/netflix/spinnaker/orca/telemetry/RedisPoolMetricsPostProcessor.java @@ -17,8 +17,6 @@ package com.netflix.spinnaker.orca.telemetry; import com.netflix.spectator.api.Registry; -import java.lang.reflect.Field; -import org.apache.commons.pool2.impl.GenericObjectPool; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; @@ -37,11 +35,8 @@ public RedisPoolMetricsPostProcessor(Registry registry) { @SuppressWarnings("unchecked") @Override - protected void applyMetrics(JedisPool bean, String beanName) - throws NoSuchFieldException, IllegalAccessException { - final Field poolAccess = Pool.class.getDeclaredField("internalPool"); - poolAccess.setAccessible(true); - GenericObjectPool pool = (GenericObjectPool) poolAccess.get(bean); + protected void applyMetrics(JedisPool bean, String beanName) { + final Pool pool = bean; registry.gauge( registry.createId("redis.connectionPool.maxIdle", "poolName", beanName), pool, diff --git a/orca-redis/src/test/groovy/com/netflix/spinnaker/orca/pipeline/persistence/jedis/JedisPipelineExecutionRepositorySpec.groovy b/orca-redis/src/test/groovy/com/netflix/spinnaker/orca/pipeline/persistence/jedis/JedisPipelineExecutionRepositorySpec.groovy index 979e4c50ab..e067b196df 100644 --- a/orca-redis/src/test/groovy/com/netflix/spinnaker/orca/pipeline/persistence/jedis/JedisPipelineExecutionRepositorySpec.groovy +++ b/orca-redis/src/test/groovy/com/netflix/spinnaker/orca/pipeline/persistence/jedis/JedisPipelineExecutionRepositorySpec.groovy @@ -118,7 +118,7 @@ class JedisPipelineExecutionRepositorySpec extends PipelineExecutionRepositoryTc jedis.sadd("allJobs:pipeline", id) when: - def result = repository.retrieve(PIPELINE).toList().toBlocking().first() + def result = repository.retrieve(PIPELINE).toList().blockingGet() then: result.isEmpty() @@ -145,7 +145,7 @@ class JedisPipelineExecutionRepositorySpec extends PipelineExecutionRepositoryTc then: jedis.zrange(RedisExecutionRepository.executionsByPipelineKey(pipeline.pipelineConfigId), 0, 1) == [ pipeline.id - ] as Set + ] as List when: repository.delete(pipeline.type, pipeline.id) @@ -155,7 +155,7 @@ class JedisPipelineExecutionRepositorySpec extends PipelineExecutionRepositoryTc thrown ExecutionNotFoundException and: - repository.retrieve(PIPELINE).toList().toBlocking().first() == [] + repository.retrieve(PIPELINE).toList().blockingGet() == [] jedis.zrange(RedisExecutionRepository.executionsByPipelineKey(pipeline.pipelineConfigId), 0, 1).isEmpty() } @@ -199,7 +199,7 @@ class JedisPipelineExecutionRepositorySpec extends PipelineExecutionRepositoryTc when: def retrieved = repository.retrieveOrchestrationsForApplication("orca", new ExecutionCriteria(pageSize: limit)) - .toList().toBlocking().first() + .toList().blockingGet() then: retrieved.size() == actual @@ -310,7 +310,7 @@ class JedisPipelineExecutionRepositorySpec extends PipelineExecutionRepositoryTc when: // TODO-AJ limits are current applied to each backing redis def retrieved = repository.retrieveOrchestrationsForApplication("orca", new ExecutionCriteria(pageSize: 2)) - .toList().toBlocking().first() + .toList().blockingGet() then: // orchestrations are stored in an unsorted set and results are non-deterministic @@ -335,7 +335,7 @@ class JedisPipelineExecutionRepositorySpec extends PipelineExecutionRepositoryTc when: repository.delete(orchestration1.type, orchestration1.id) def retrieved = repository.retrieveOrchestrationsForApplication("orca", new ExecutionCriteria(pageSize: 2)) - .toList().toBlocking().first() + .toList().blockingGet() then: retrieved*.id == [orchestration2.id] @@ -343,7 +343,7 @@ class JedisPipelineExecutionRepositorySpec extends PipelineExecutionRepositoryTc when: repository.delete(orchestration2.type, orchestration2.id) retrieved = repository.retrieveOrchestrationsForApplication("orca", new ExecutionCriteria(pageSize: 2)) - .toList().toBlocking().first() + .toList().blockingGet() then: retrieved.isEmpty() @@ -459,7 +459,7 @@ class JedisPipelineExecutionRepositorySpec extends PipelineExecutionRepositoryTc when: // TODO-AJ limits are current applied to each backing redis def retrieved = repository.retrievePipelinesForPipelineConfigId("pipeline-1", new ExecutionCriteria(pageSize: 2)) - .toList().toBlocking().first() + .toList().blockingGet() then: // pipelines are stored in a sorted sets and results should be reverse buildTime ordered @@ -486,7 +486,7 @@ class JedisPipelineExecutionRepositorySpec extends PipelineExecutionRepositoryTc when: repository.delete(pipeline1.type, pipeline1.id) def retrieved = repository.retrievePipelinesForPipelineConfigId("pipeline-1", new ExecutionCriteria(pageSize: 2)) - .toList().toBlocking().first() + .toList().blockingGet() then: retrieved*.id == [pipeline2.id] @@ -494,7 +494,7 @@ class JedisPipelineExecutionRepositorySpec extends PipelineExecutionRepositoryTc when: repository.delete(pipeline2.type, pipeline2.id) retrieved = repository.retrievePipelinesForPipelineConfigId("pipeline-1", new ExecutionCriteria(pageSize: 2)) - .toList().toBlocking().first() + .toList().blockingGet() then: retrieved.isEmpty() diff --git a/orca-retrofit/orca-retrofit.gradle b/orca-retrofit/orca-retrofit.gradle index 85b879eaff..0ec086dc27 100644 --- a/orca-retrofit/orca-retrofit.gradle +++ b/orca-retrofit/orca-retrofit.gradle @@ -24,7 +24,7 @@ dependencies { api("com.jakewharton.retrofit:retrofit1-okhttp3-client") implementation(project(":orca-core")) - implementation("io.reactivex:rxjava") + implementation("io.reactivex.rxjava3:rxjava") implementation("io.spinnaker.kork:kork-retrofit") implementation("com.jakewharton.retrofit:retrofit1-okhttp3-client:1.1.0") implementation "com.google.guava:guava" diff --git a/orca-sql/orca-sql.gradle b/orca-sql/orca-sql.gradle index 2f0ca54757..077306b912 100644 --- a/orca-sql/orca-sql.gradle +++ b/orca-sql/orca-sql.gradle @@ -29,6 +29,7 @@ dependencies { implementation("io.spinnaker.kork:kork-telemetry") implementation("com.netflix.spectator:spectator-api") implementation("io.spinnaker.kork:kork-core") + implementation "io.github.resilience4j:resilience4j-vavr" implementation("org.springframework:spring-jdbc") implementation("org.springframework:spring-tx") implementation("org.springframework.boot:spring-boot-autoconfigure") diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt index c0a8bf18f6..072600cc94 100644 --- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt @@ -77,7 +77,7 @@ import org.jooq.impl.DSL.timestampSub import org.jooq.impl.DSL.value import org.slf4j.LoggerFactory import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource -import rx.Observable +import io.reactivex.rxjava3.core.Observable import java.io.ByteArrayOutputStream import java.lang.System.currentTimeMillis import java.nio.charset.StandardCharsets @@ -409,7 +409,7 @@ class SqlExecutionRepository( ?: throw ExecutionNotFoundException("No $type found for $id") override fun retrieve(type: ExecutionType): Observable = - Observable.from( + Observable.fromIterable( fetchExecutions { pageSize, cursor -> selectExecutions(type, pageSize, cursor) } @@ -444,13 +444,13 @@ class SqlExecutionRepository( } ) - return Observable.from(select.fetchExecutions()) + return Observable.fromIterable(select.fetchExecutions()) } } override fun retrievePipelinesForApplication(application: String): Observable = withPool(readPoolName) { - Observable.from( + Observable.fromIterable( fetchExecutions { pageSize, cursor -> selectExecutions(PIPELINE, pageSize, cursor) { it.where(field("application").eq(application)) @@ -644,7 +644,7 @@ class SqlExecutionRepository( ) } - return Observable.from(select.fetchExecutions()) + return Observable.fromIterable(select.fetchExecutions()) } } @@ -652,7 +652,7 @@ class SqlExecutionRepository( application: String, criteria: ExecutionCriteria ): Observable { - return Observable.from(retrieveOrchestrationsForApplication(application, criteria, NATURAL_ASC)) + return Observable.fromIterable(retrieveOrchestrationsForApplication(application, criteria, NATURAL_ASC)) } override fun retrieveOrchestrationsForApplication( @@ -769,10 +769,10 @@ class SqlExecutionRepository( .setPageSize(100) .setStatuses(BUFFERED) .let { criteria -> - rx.Observable.merge( + Observable.merge( retrieve(ORCHESTRATION, criteria, partitionName), retrieve(PIPELINE, criteria, partitionName) - ).toList().toBlocking().single() + ).toList().blockingGet() } override fun retrieveAllApplicationNames(type: ExecutionType?): List { diff --git a/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/cleanup/OldPipelineCleanupPollingNotificationAgentSpec.groovy b/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/cleanup/OldPipelineCleanupPollingNotificationAgentSpec.groovy index 2d7d90a51d..4bf0824d49 100644 --- a/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/cleanup/OldPipelineCleanupPollingNotificationAgentSpec.groovy +++ b/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/cleanup/OldPipelineCleanupPollingNotificationAgentSpec.groovy @@ -110,14 +110,14 @@ abstract class OldPipelineCleanupPollingNotificationAgentSpec extends Specificat executionRepository.store(buildExecution(app, 2)) when: - def allExecutions = executionRepository.retrievePipelinesForApplication(app).toList().toBlocking().first().unique() + def allExecutions = executionRepository.retrievePipelinesForApplication(app).toList().blockingGet().unique() then: allExecutions.size() == 12 when: cleanupAgent.tick() - allExecutions = executionRepository.retrievePipelinesForApplication(app).toList().toBlocking().first().unique() + allExecutions = executionRepository.retrievePipelinesForApplication(app).toList().blockingGet().unique() then: // preserve any execution more recent than `thresholdDays` _AND_ @@ -135,14 +135,14 @@ abstract class OldPipelineCleanupPollingNotificationAgentSpec extends Specificat executionRepository.store(buildExecution(app, 49)) when: - def allExecutions = executionRepository.retrievePipelinesForApplication(app).toList().toBlocking().first().unique() + def allExecutions = executionRepository.retrievePipelinesForApplication(app).toList().blockingGet().unique() then: allExecutions.size() == 12 when: cleanupAgent.tick() - allExecutions = executionRepository.retrievePipelinesForApplication(app).toList().toBlocking().first().unique() + allExecutions = executionRepository.retrievePipelinesForApplication(app).toList().blockingGet().unique() then: // preserve any execution more recent than `thresholdDays` _AND_ diff --git a/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlPipelineExecutionRepositorySpec.groovy b/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlPipelineExecutionRepositorySpec.groovy index 5c256bb864..51e5bab513 100644 --- a/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlPipelineExecutionRepositorySpec.groovy +++ b/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlPipelineExecutionRepositorySpec.groovy @@ -36,7 +36,7 @@ import com.netflix.spinnaker.orca.pipeline.model.StageExecutionImpl import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository import com.netflix.spinnaker.orca.pipeline.persistence.PipelineExecutionRepositoryTck import org.jooq.impl.DSL -import rx.schedulers.Schedulers +import io.reactivex.rxjava3.schedulers.Schedulers import de.huxhorn.sulky.ulid.ULID import spock.lang.AutoCleanup import spock.lang.Shared @@ -126,7 +126,7 @@ abstract class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepos repo.store(e) then: - def pipelines = repo.retrieve(PIPELINE).toList().toBlocking().single() + def pipelines = repo.retrieve(PIPELINE).toList().blockingGet() pipelines.size() == 1 pipelines[0].id == e.id pipelines[0].stages.size() == 1 @@ -268,7 +268,7 @@ abstract class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepos repo.store(e) then: - def pipelines = repo.retrieve(PIPELINE).toList().toBlocking().single() + def pipelines = repo.retrieve(PIPELINE).toList().blockingGet() pipelines.size() == 1 pipelines[0].id == id pipelines[0].stages.size() == 1 @@ -287,7 +287,7 @@ abstract class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepos repo.store(e) then: - def pipelines = repo.retrieve(PIPELINE).toList().toBlocking().single() + def pipelines = repo.retrieve(PIPELINE).toList().blockingGet() pipelines.size() == 1 pipelines[0].id == id pipelines[0].stages.size() == 1 @@ -309,7 +309,7 @@ abstract class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepos repo.store(e) then: - def pipelines = repo.retrieve(PIPELINE).toList().toBlocking().single() + def pipelines = repo.retrieve(PIPELINE).toList().blockingGet() pipelines.size() == 1 pipelines[0].id == id pipelines[0].stages.size() == 1 @@ -360,14 +360,14 @@ abstract class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepos repo.store(orig) when: - def pipelines = repo.retrieve(PIPELINE).toList().toBlocking().single() + def pipelines = repo.retrieve(PIPELINE).toList().blockingGet() then: pipelines.size() == 1 when: repo.delete(PIPELINE, orig.id) - pipelines = repo.retrieve(PIPELINE).toList().toBlocking().single() + pipelines = repo.retrieve(PIPELINE).toList().blockingGet() then: pipelines.size() == 0 @@ -382,14 +382,14 @@ abstract class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepos repo.store(orig) when: - def pipelines = repo.retrieve(PIPELINE).toList().toBlocking().single() + def pipelines = repo.retrieve(PIPELINE).toList().blockingGet() then: pipelines.size() == 1 when: repo.delete(PIPELINE, id) - pipelines = repo.retrieve(PIPELINE).toList().toBlocking().single() + pipelines = repo.retrieve(PIPELINE).toList().blockingGet() then: pipelines.size() == 0 @@ -408,7 +408,7 @@ abstract class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepos e.stages.add(new StageExecutionImpl(e, "wait", "wait stage 2", [bar: 'BAR'])) repo.store(e) - def pipelines = repo.retrieve(PIPELINE).toList().toBlocking().single() + def pipelines = repo.retrieve(PIPELINE).toList().blockingGet() def storedStages = e.stages def loadedStages = pipelines*.stages.flatten() @@ -434,7 +434,7 @@ abstract class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepos e.stages.add(new StageExecutionImpl(e, "wait", "wait stage 2", [bar: 'BAR'])) repo.store(e) - def pipelines = repo.retrieve(PIPELINE).toList().toBlocking().single() + def pipelines = repo.retrieve(PIPELINE).toList().blockingGet() def storedStages = e.stages as Set def loadedStages = pipelines*.stages.flatten() as Set @@ -457,7 +457,7 @@ abstract class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepos stage.name = "wait stage updated" repo.storeStage(stage) - def pipelines = repo.retrieve(PIPELINE).toList().toBlocking().single() + def pipelines = repo.retrieve(PIPELINE).toList().blockingGet() then: pipelines.size() == 1 @@ -483,10 +483,9 @@ abstract class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepos when: def results = repository .retrieveOrchestrationsForApplication("spinnaker", criteria) - .subscribeOn(Schedulers.immediate()) + .subscribeOn(Schedulers.trampoline()) .toList() - .toBlocking() - .first() + .blockingGet() then: with(results) { @@ -514,10 +513,9 @@ abstract class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepos when: def results = repository .retrieveOrchestrationsForApplication("spinnaker", criteria) - .subscribeOn(Schedulers.immediate()) + .subscribeOn(Schedulers.trampoline()) .toList() - .toBlocking() - .first() + .blockingGet() then: results.isEmpty() @@ -543,10 +541,9 @@ abstract class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepos when: def results = repository .retrieveOrchestrationsForApplication("spinnaker", criteria) - .subscribeOn(Schedulers.immediate()) + .subscribeOn(Schedulers.trampoline()) .toList() - .toBlocking() - .first() + .blockingGet() then: with(results) { @@ -576,10 +573,9 @@ abstract class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepos when: def results = repository .retrieveOrchestrationsForApplication("spinnaker", criteria) - .subscribeOn(Schedulers.immediate()) + .subscribeOn(Schedulers.trampoline()) .toList() - .toBlocking() - .first() + .blockingGet() then: with(results) { diff --git a/orca-sql/src/test/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepositoryTest.kt b/orca-sql/src/test/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepositoryTest.kt index 1d0632282a..f48e843907 100644 --- a/orca-sql/src/test/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepositoryTest.kt +++ b/orca-sql/src/test/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepositoryTest.kt @@ -48,6 +48,7 @@ import org.testcontainers.DockerClientFactory import java.lang.System.currentTimeMillis import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource import javax.sql.DataSource +import io.reactivex.rxjava3.core.Observable class SqlExecutionRepositoryTest : JUnit5Minutests { @@ -309,7 +310,7 @@ class SqlExecutionRepositoryTest : JUnit5Minutests { sqlExecutionRepository.store(pipelineExecution2) val observable = sqlExecutionRepository.retrievePipelinesForApplication("application-2") - val executions = observable.toList().toBlocking().single() + val executions = observable.toList().blockingGet() assertThat(executions.map(PipelineExecution::getApplication).single()).isEqualTo("application-2") } } diff --git a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/OperationsController.groovy b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/OperationsController.groovy index 57346fd6d4..14dc2cfbfb 100644 --- a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/OperationsController.groovy +++ b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/OperationsController.groovy @@ -49,7 +49,7 @@ import org.springframework.beans.factory.annotation.Autowired import org.springframework.web.bind.annotation.* import retrofit.http.Query -import javax.servlet.http.HttpServletResponse +import jakarta.servlet.http.HttpServletResponse import static com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType.ORCHESTRATION import static com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType.PIPELINE diff --git a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/PipelineTemplateController.groovy b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/PipelineTemplateController.groovy index d90d344a9d..a7cc7d24f9 100644 --- a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/PipelineTemplateController.groovy +++ b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/PipelineTemplateController.groovy @@ -22,7 +22,7 @@ import com.netflix.spinnaker.orca.pipelinetemplate.v1schema.converter.PipelineTe import com.netflix.spinnaker.orca.pipelinetemplate.v1schema.model.PipelineTemplate import com.netflix.spinnaker.orca.pipelinetemplate.v1schema.model.TemplateConfiguration.TemplateSource import groovy.util.logging.Slf4j -import javax.servlet.http.HttpServletResponse +import jakarta.servlet.http.HttpServletResponse import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression import org.springframework.http.HttpStatus diff --git a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/ProjectController.groovy b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/ProjectController.groovy index c65c8dae49..3ac56dbf32 100644 --- a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/ProjectController.groovy +++ b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/ProjectController.groovy @@ -24,7 +24,8 @@ import com.netflix.spinnaker.orca.front50.Front50Service import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository import org.springframework.beans.factory.annotation.Autowired import org.springframework.web.bind.annotation.* -import rx.schedulers.Schedulers +import io.reactivex.rxjava3.schedulers.Schedulers +import io.reactivex.rxjava3.core.Observable @RestController class ProjectController { @@ -63,9 +64,9 @@ class ProjectController { statuses: (statuses.split(",") as Collection) ) - def allPipelines = rx.Observable.merge(pipelineConfigIds.collect { + def allPipelines = Observable.merge(pipelineConfigIds.collect { executionRepository.retrievePipelinesForPipelineConfigId(it, executionCriteria) - }).subscribeOn(Schedulers.io()).toList().toBlocking().single().sort(startTimeOrId) + }).subscribeOn(Schedulers.io()).toList().blockingGet().sort(startTimeOrId) return allPipelines } diff --git a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy index 3f4c4b9f79..5421f0e3ee 100644 --- a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy +++ b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy @@ -43,7 +43,8 @@ import org.springframework.security.access.prepost.PostFilter import org.springframework.security.access.prepost.PreAuthorize import org.springframework.security.access.prepost.PreFilter import org.springframework.web.bind.annotation.* -import rx.schedulers.Schedulers +import io.reactivex.rxjava3.schedulers.Schedulers +import io.reactivex.rxjava3.core.Observable import java.nio.charset.Charset import java.time.Clock @@ -136,7 +137,7 @@ class TaskController { @PostFilter("hasPermission(filterObject.application, 'APPLICATION', 'READ')") @RequestMapping(value = "/tasks", method = RequestMethod.GET) List list() { - executionRepository.retrieve(ORCHESTRATION).toBlocking().iterator.collect { + executionRepository.retrieve(ORCHESTRATION).blockingIterable().collect { convert it } } @@ -226,13 +227,13 @@ class TaskController { if (executionIds) { List ids = executionIds.split(',') - List executions = rx.Observable.from(ids.collect { + List executions = Observable.fromIterable(ids.collect { try { executionRepository.retrieve(PIPELINE, it) } catch (ExecutionNotFoundException e) { null } - }).subscribeOn(Schedulers.io()).toList().toBlocking().single().findAll() + }).subscribeOn(Schedulers.io()).toList().blockingGet().findAll() if (!expand) { unexpandPipelineExecutions(executions) @@ -242,9 +243,9 @@ class TaskController { } List ids = pipelineConfigIds.split(',') - List allPipelines = rx.Observable.merge(ids.collect { + List allPipelines = Observable.merge(ids.collect { executionRepository.retrievePipelinesForPipelineConfigId(it, executionCriteria) - }).subscribeOn(Schedulers.io()).toList().toBlocking().single().sort(startTimeOrId) + }).subscribeOn(Schedulers.io()).toList().blockingGet().sort(startTimeOrId) if (!expand) { unexpandPipelineExecutions(allPipelines) @@ -620,10 +621,10 @@ class TaskController { optimizedGetPipelineExecutions(application, allFront50PipelineConfigIds, executionCriteria) ) } else { - allPipelineExecutions = rx.Observable.merge(allFront50PipelineConfigIds.collect { + allPipelineExecutions = Observable.merge(allFront50PipelineConfigIds.collect { log.debug("processing pipeline config id: $it") executionRepository.retrievePipelinesForPipelineConfigId(it, executionCriteria) - }).subscribeOn(Schedulers.io()).toList().toBlocking().single() + }).subscribeOn(Schedulers.io()).toList().blockingGet() } allPipelineExecutions.sort(startTimeOrId) diff --git a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/web/config/WebConfiguration.groovy b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/web/config/WebConfiguration.groovy index 30beef99ef..eab2da5edc 100644 --- a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/web/config/WebConfiguration.groovy +++ b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/web/config/WebConfiguration.groovy @@ -18,13 +18,13 @@ package com.netflix.spinnaker.orca.web.config import groovy.util.logging.Slf4j -import javax.servlet.Filter -import javax.servlet.FilterChain -import javax.servlet.FilterConfig -import javax.servlet.ServletException -import javax.servlet.ServletRequest -import javax.servlet.ServletResponse -import javax.servlet.http.HttpServletResponse +import jakarta.servlet.Filter +import jakarta.servlet.FilterChain +import jakarta.servlet.FilterConfig +import jakarta.servlet.ServletException +import jakarta.servlet.ServletRequest +import jakarta.servlet.ServletResponse +import jakarta.servlet.http.HttpServletResponse import com.fasterxml.jackson.databind.ObjectMapper import com.netflix.spectator.api.Registry import com.netflix.spinnaker.fiat.shared.EnableFiatAutoConfig @@ -32,7 +32,6 @@ import com.netflix.spinnaker.filters.AuthenticatedRequestFilter import com.netflix.spinnaker.kork.web.interceptors.MetricsInterceptor import com.netflix.spinnaker.orca.jackson.OrcaObjectMapper import groovy.transform.CompileStatic -import org.springframework.beans.factory.annotation.Autowire import org.springframework.boot.web.servlet.FilterRegistrationBean import org.springframework.context.annotation.Bean import org.springframework.context.annotation.ComponentScan @@ -63,7 +62,7 @@ class WebConfiguration { } } - @Bean(name = "objectMapper", autowire = Autowire.BY_TYPE) ObjectMapper orcaObjectMapper() { + @Bean(name = "objectMapper") ObjectMapper orcaObjectMapper() { OrcaObjectMapper.getInstance() } diff --git a/orca-web/src/main/java/com/netflix/spinnaker/config/EnhancedMonitoringConfiguration.java b/orca-web/src/main/java/com/netflix/spinnaker/config/EnhancedMonitoringConfiguration.java index 86336aeb8d..2397b49c5b 100644 --- a/orca-web/src/main/java/com/netflix/spinnaker/config/EnhancedMonitoringConfiguration.java +++ b/orca-web/src/main/java/com/netflix/spinnaker/config/EnhancedMonitoringConfiguration.java @@ -22,6 +22,7 @@ import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType; import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution; import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository; +import io.reactivex.rxjava3.schedulers.Schedulers; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -33,7 +34,6 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.Scheduled; -import rx.schedulers.Schedulers; @Configuration @EnableConfigurationProperties(EnhancedMonitoringConfigurationProperties.class) @@ -85,8 +85,7 @@ void refresh() { .setStatuses(ExecutionStatus.RUNNING)) .subscribeOn(Schedulers.io()) .toList() - .toBlocking() - .single(); + .blockingGet(); orchestrationCountPerApplication.get(application).set(executions.size()); } catch (Exception e) { log.error( diff --git a/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/OperationsControllerSpec.groovy b/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/OperationsControllerSpec.groovy index 0cbad2e2eb..2875d4f131 100644 --- a/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/OperationsControllerSpec.groovy +++ b/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/OperationsControllerSpec.groovy @@ -28,7 +28,7 @@ import com.netflix.spinnaker.orca.clouddriver.service.JobService import com.netflix.spinnaker.orca.exceptions.PipelineTemplateValidationException import com.netflix.spinnaker.orca.front50.Front50Service -import javax.servlet.http.HttpServletResponse +import jakarta.servlet.http.HttpServletResponse import com.netflix.spinnaker.kork.common.Header import com.netflix.spinnaker.orca.igor.BuildService import com.netflix.spinnaker.orca.jackson.OrcaObjectMapper @@ -49,7 +49,7 @@ import org.springframework.http.HttpMethod import org.springframework.http.MediaType import org.springframework.test.web.servlet.setup.MockMvcBuilders import retrofit2.mock.Calls -import rx.Observable +import io.reactivex.rxjava3.core.Observable import spock.lang.Specification import spock.lang.Subject import spock.lang.Unroll diff --git a/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/TaskControllerSpec.groovy b/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/TaskControllerSpec.groovy index 7f03042fe5..86f577dda7 100644 --- a/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/TaskControllerSpec.groovy +++ b/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/TaskControllerSpec.groovy @@ -39,6 +39,7 @@ import org.springframework.test.web.servlet.MockMvc import org.springframework.test.web.servlet.setup.MockMvcBuilders import spock.lang.Specification import spock.lang.Unroll +import io.reactivex.rxjava3.core.Observable import java.time.Clock import java.time.Instant @@ -92,7 +93,7 @@ class TaskControllerSpec extends Specification { then: 1 * executionRepository.retrieve(ORCHESTRATION) >> { - return rx.Observable.empty() + return Observable.empty() } } @@ -114,7 +115,7 @@ class TaskControllerSpec extends Specification { void 'step names are properly translated'() { given: - executionRepository.retrieve(ORCHESTRATION) >> rx.Observable.from([orchestration { + executionRepository.retrieve(ORCHESTRATION) >> Observable.fromIterable([orchestration { id = "1" application = "covfefe" stage { @@ -195,7 +196,7 @@ class TaskControllerSpec extends Specification { MockHttpServletResponse response = mockMvc.perform(get('/tasks')).andReturn().response then: - 1 * executionRepository.retrieve(ORCHESTRATION) >> rx.Observable.from([]) + 1 * executionRepository.retrieve(ORCHESTRATION) >> Observable.fromIterable([]) response.status == 200 response.contentAsString == '[]' } @@ -228,7 +229,7 @@ class TaskControllerSpec extends Specification { [pipelineConfigId: "2", id: 'older3', application: app, startTime: clock.instant().minus(daysOfExecutionHistory + 1, DAYS).minus(4, HOURS).toEpochMilli()] ] - executionRepository.retrievePipelinesForPipelineConfigId("1", _) >> rx.Observable.from(pipelines.findAll { + executionRepository.retrievePipelinesForPipelineConfigId("1", _) >> Observable.fromIterable(pipelines.findAll { it.pipelineConfigId == "1" }.collect { config -> pipeline { @@ -238,7 +239,7 @@ class TaskControllerSpec extends Specification { pipelineConfigId = config.pipelineConfigId } }) - executionRepository.retrievePipelinesForPipelineConfigId("2", _) >> rx.Observable.from(pipelines.findAll { + executionRepository.retrievePipelinesForPipelineConfigId("2", _) >> Observable.fromIterable(pipelines.findAll { it.pipelineConfigId == "2" }.collect { config -> pipeline { @@ -307,7 +308,7 @@ class TaskControllerSpec extends Specification { [pipelineConfigId: "3", id: "started-5", application: "covfefe", startTime: clock.instant().minus(daysOfExecutionHistory, DAYS).minus(2, HOURS).toEpochMilli(), id: 'old-3'] ] - executionRepository.retrievePipelinesForPipelineConfigId("1", _) >> rx.Observable.from(pipelines.findAll { + executionRepository.retrievePipelinesForPipelineConfigId("1", _) >> Observable.fromIterable(pipelines.findAll { it.pipelineConfigId == "1" }.collect { config -> pipeline { @@ -317,7 +318,7 @@ class TaskControllerSpec extends Specification { pipelineConfigId = config.pipelineConfigId } }) - executionRepository.retrievePipelinesForPipelineConfigId("2", _) >> rx.Observable.from(pipelines.findAll { + executionRepository.retrievePipelinesForPipelineConfigId("2", _) >> Observable.fromIterable(pipelines.findAll { it.pipelineConfigId == "2" }.collect { config -> pipeline { @@ -327,7 +328,7 @@ class TaskControllerSpec extends Specification { pipelineConfigId = config.pipelineConfigId } }) - executionRepository.retrievePipelinesForPipelineConfigId("3", _) >> rx.Observable.from(pipelines.findAll { + executionRepository.retrievePipelinesForPipelineConfigId("3", _) >> Observable.fromIterable(pipelines.findAll { it.pipelineConfigId == "3" }.collect { config -> pipeline { diff --git a/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/AdminControllerTest.kt b/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/AdminControllerTest.kt index 23c4bc9c31..967a466fe5 100644 --- a/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/AdminControllerTest.kt +++ b/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/AdminControllerTest.kt @@ -25,6 +25,7 @@ import dev.minutest.rootContext import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc import org.springframework.http.MediaType +import org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf import org.springframework.test.web.servlet.MockMvc import org.springframework.test.web.servlet.post import strikt.api.expectThat @@ -53,6 +54,7 @@ class AdminControllerTest : JUnit5Minutests { expectThat(discoveryActivator.enabled).isTrue() val response = mockMvc.post("/admin/instance/enabled") { + with(csrf()) contentType = MediaType.APPLICATION_JSON content = jacksonObjectMapper().writeValueAsString(mapOf("enabled" to false)) }.andReturn().response diff --git a/orca-webhook/src/main/java/com/netflix/spinnaker/orca/webhook/tasks/MonitorWebhookTask.java b/orca-webhook/src/main/java/com/netflix/spinnaker/orca/webhook/tasks/MonitorWebhookTask.java index 4ed13651a1..2a23413a54 100644 --- a/orca-webhook/src/main/java/com/netflix/spinnaker/orca/webhook/tasks/MonitorWebhookTask.java +++ b/orca-webhook/src/main/java/com/netflix/spinnaker/orca/webhook/tasks/MonitorWebhookTask.java @@ -127,7 +127,7 @@ public TaskResult execute(StageExecution stage) { } catch (HttpStatusCodeException e) { var statusCode = e.getStatusCode(); - if (shouldRetry(statusCode, stageData)) { + if (shouldRetry(HttpStatus.valueOf(statusCode.value()), stageData)) { log.warn( "Failed to get webhook status from {} with statusCode={}, will retry", stageData.statusEndpoint, @@ -172,7 +172,7 @@ public TaskResult execute(StageExecution stage) { } monitor.setBody(response.getBody()); - monitor.setStatusCode(response.getStatusCode()); + monitor.setStatusCode(HttpStatus.valueOf(response.getStatusCode().value())); monitor.setStatusCodeValue(response.getStatusCode().value()); if (!response.getHeaders().isEmpty()) { diff --git a/orca-webhook/src/main/java/com/netflix/spinnaker/orca/webhook/tasks/WebhookResponseProcessor.java b/orca-webhook/src/main/java/com/netflix/spinnaker/orca/webhook/tasks/WebhookResponseProcessor.java index 59f9728f90..7e8ff9cf05 100644 --- a/orca-webhook/src/main/java/com/netflix/spinnaker/orca/webhook/tasks/WebhookResponseProcessor.java +++ b/orca-webhook/src/main/java/com/netflix/spinnaker/orca/webhook/tasks/WebhookResponseProcessor.java @@ -82,7 +82,7 @@ public TaskResult process(ResponseEntity response, Exception exceptionRe private TaskResult processReceivedHttpStatusException(HttpStatusCodeException e) { var webhookOutput = new WebhookStage.WebhookResponseStageData(); - webhookOutput.setStatusCode(e.getStatusCode()); + webhookOutput.setStatusCode(HttpStatus.valueOf(e.getStatusCode().value())); webhookOutput.setStatusCodeValue(e.getStatusCode().value()); if (e.getResponseHeaders() != null) { webhookOutput.setHeaders(e.getResponseHeaders().toSingleValueMap()); @@ -90,7 +90,9 @@ private TaskResult processReceivedHttpStatusException(HttpStatusCodeException e) if (!StringUtils.isEmpty(e.getResponseBodyAsString())) { webhookOutput.setBody(processResponseBodyAsJson(e.getResponseBodyAsString())); } - TaskResult result = processReceivedFailureStatusCode(e.getStatusCode(), webhookOutput); + TaskResult result = + processReceivedFailureStatusCode( + HttpStatus.valueOf(e.getStatusCode().value()), webhookOutput); log.warn(webhookOutput.getError(), e); return result; } @@ -162,7 +164,7 @@ private TaskResult processResponse(ResponseEntity response) { Map stageOutput = new HashMap<>(); var webhookOutput = new WebhookStage.WebhookResponseStageData(); stageOutput.put("webhook", webhookOutput); - webhookOutput.setStatusCode(response.getStatusCode()); + webhookOutput.setStatusCode(HttpStatus.valueOf(response.getStatusCode().value())); webhookOutput.setStatusCodeValue(response.getStatusCode().value()); if (response.getBody() != null) { @@ -171,7 +173,7 @@ private TaskResult processResponse(ResponseEntity response) { if (!response.getHeaders().isEmpty()) { webhookOutput.setHeaders(response.getHeaders().toSingleValueMap()); } - HttpStatus status = response.getStatusCode(); + HttpStatus status = HttpStatus.valueOf(response.getStatusCode().value()); if (status.is2xxSuccessful() || status.is3xxRedirection()) { diff --git a/orca-webhook/src/test/groovy/com/netflix/spinnaker/orca/webhook/pipeline/WebhookStageSpec.groovy b/orca-webhook/src/test/groovy/com/netflix/spinnaker/orca/webhook/pipeline/WebhookStageSpec.groovy index 8089590c2a..abe8b7649c 100644 --- a/orca-webhook/src/test/groovy/com/netflix/spinnaker/orca/webhook/pipeline/WebhookStageSpec.groovy +++ b/orca-webhook/src/test/groovy/com/netflix/spinnaker/orca/webhook/pipeline/WebhookStageSpec.groovy @@ -19,6 +19,7 @@ package com.netflix.spinnaker.orca.webhook.pipeline import com.fasterxml.jackson.databind.ObjectMapper import com.netflix.spinnaker.kork.exceptions.UserException import com.netflix.spinnaker.orca.api.pipeline.graph.TaskNode +import com.netflix.spinnaker.orca.jackson.OrcaObjectMapper import com.netflix.spinnaker.orca.pipeline.model.PipelineExecutionImpl import com.netflix.spinnaker.orca.pipeline.model.StageExecutionImpl import com.netflix.spinnaker.orca.pipeline.tasks.WaitTask @@ -95,7 +96,7 @@ class WebhookStageSpec extends Specification { def 'json format is respected'() { given: def json = JsonOutput.toJson([cancelMethod: methodString, method: methodString]) - def mapper = new ObjectMapper() + def mapper = OrcaObjectMapper.getInstance() when: def data = mapper.readValue(json, WebhookStage.StageData)