From ba3220e1d72f45e378af56e4630298ee07b0fa92 Mon Sep 17 00:00:00 2001 From: ekawinataa Date: Wed, 24 Jul 2024 18:20:12 +0700 Subject: [PATCH] feat: Add Retryable Configuration for GRPC Sink (using CEL) (#44) Add new config to enable response based retry in gRPC Sink Config Example Parameter: SINK_GRPC_RESPONSE_RETRY_CEL_EXPRESSION ="GenericResponse.success == false && GenericResponse.errors.exists(e, e.code == "400")" Reason for the PR: EGLC users who want to migrate expect this feature to be supported in Firehose Notes : CEL Expression: https://cel.dev/ cel-java: https://github.com/google/cel-java * [feat] Add CEL evaluator and add it to grpc sink * Remove blank check * remove unintended change * [feat] add success field checking * [feat] add evaluator method * [feat] handle descriptor update * Add test for evaluator * Update test * Update SINK_GRPC_RESPONSE_RETRY_CEL_EXPRESSION default value * Add test for GrpcSink * Rename descriptor to payloadDescriptor * Checkstyle update * Refactor instantiation logic to separate method * Remove schema refresh * Remove projectnessie and use implementation from cel-java * update checkstyle * Add comment * Update docs * Move the evaluator instantiation to factory method * Remove unused sink config * Add more testcases * revert protoc version * Add more test cases * Add more comprehensive documentation * Rename default class and update docs * Refactor typical cel functionality to util class * Add checking for expression result * Use built in UnsupportedOperationException * Update build-info-extractor * Update to classpath("org.jfrog.buildinfo:build-info-extractor-gradle:4.33.1") * Remove OperationNotSupportedException.java * Remove jfrog build info on dependencies * - Tidy up tests - Make exception message more verbose * Bump version * Makes error type for retryable error configurable through env * Add 1 more test case * Update default value * Use default value of true on CEL Expression config to retry on default case. Remove implementation of DefaultGrpcResponsePayloadEvaluator.java --- docs/docs/sinks/grpc-sink.md | 43 +++++++ .../firehose/config/GrpcSinkConfig.java | 11 ++ .../GrpcSinkRetryErrorTypeConverter.java | 14 ++ .../GrpcResponseCelPayloadEvaluator.java | 61 +++++++++ .../firehose/evaluator/PayloadEvaluator.java | 16 +++ .../firehose/sink/grpc/GrpcSink.java | 29 ++++- .../firehose/sink/grpc/GrpcSinkFactory.java | 17 ++- .../GrpcSinkRetryErrorTypeConverterTest.java | 34 +++++ .../GrpcResponseCelPayloadEvaluatorTest.java | 121 ++++++++++++++++++ .../firehose/sink/grpc/GrpcSinkTest.java | 83 +++++++++++- 10 files changed, 418 insertions(+), 11 deletions(-) create mode 100644 src/main/java/com/gotocompany/firehose/config/converter/GrpcSinkRetryErrorTypeConverter.java create mode 100644 src/main/java/com/gotocompany/firehose/evaluator/GrpcResponseCelPayloadEvaluator.java create mode 100644 src/main/java/com/gotocompany/firehose/evaluator/PayloadEvaluator.java create mode 100644 src/test/java/com/gotocompany/firehose/converter/GrpcSinkRetryErrorTypeConverterTest.java create mode 100644 src/test/java/com/gotocompany/firehose/evaluator/GrpcResponseCelPayloadEvaluatorTest.java diff --git a/docs/docs/sinks/grpc-sink.md b/docs/docs/sinks/grpc-sink.md index a4b5f26e8..384018731 100644 --- a/docs/docs/sinks/grpc-sink.md +++ b/docs/docs/sinks/grpc-sink.md @@ -82,3 +82,46 @@ Defines the amount of time (in milliseconds) gRPC clients are willing to wait fo - Example value: `1000` - Type: `optional` + +### `SINK_GRPC_RESPONSE_RETRY_CEL_EXPRESSION` + +Defines the CEL(Common Expression Language) expression used to evaluate whether gRPC sink call should be retried or not based on the gRPC response. +The given CEL expression should evaluate to a boolean value. If the expression evaluates to true, the unsuccessful gRPC sink call will be retried, otherwise it won't. +Currently, we support all standard CEL macro including: has, all, exists, exists_one, map, map_filter, filter +For more information about CEL please refer to this documentation : https://github.com/google/cel-spec/blob/master/doc/langdef.md + +- Example value: `com.gotocompany.generic.GrpcResponse.success == false && com.gotocompany.generic.GenericResponse.errors.exists(e, int(e.code) >= 400 && int(e.code) <= 500)` +- Type: `optional` +- Default value: `` +- Use cases : + Example response proto : + ``` + syntax = "proto3"; + package com.gotocompany.generic; + + GenericResponse { + bool success = 1; + repeated Error errors = 2; + } + + Error { + string code = 1; + string reason = 2; + } + ``` + + Example retry rule : + - Retry on specific error code : `com.gotocompany.generic.GenericResponse.errors.exists(e, e.code == "400")` + - Retry on specific error code range : `com.gotocompany.generic.GenericResponse.errors.exists(e, int(e.code) >= 400 && int(e.code) <= 500)` + - Retry on error codes outside from specific error codes : `com.gotocompany.generic.GenericResponse.errors.exists(e, !(int(e.code) in [400, 500, 600]))` + - Disable retry on all cases : `false` + - Retry on all error codes : `true` + +### `SINK_GRPC_RESPONSE_RETRY_ERROR_TYPE` + +Defines the ErrorType to assign for a retryable error. This is used in conjunction with `SINK_GRPC_RESPONSE_RETRY_CEL_EXPRESSION` and `ERROR_TYPES_FOR_RETRY`. +Value must be defined in com.gotocompany.depot.error.ErrorType + +- Example value: `SINK_RETRYABLE_ERROR` +- Type: `optional` +- Default Value: `DEFAULT_ERROR` diff --git a/src/main/java/com/gotocompany/firehose/config/GrpcSinkConfig.java b/src/main/java/com/gotocompany/firehose/config/GrpcSinkConfig.java index 81a2ae5f3..03aaad343 100644 --- a/src/main/java/com/gotocompany/firehose/config/GrpcSinkConfig.java +++ b/src/main/java/com/gotocompany/firehose/config/GrpcSinkConfig.java @@ -1,6 +1,8 @@ package com.gotocompany.firehose.config; +import com.gotocompany.depot.error.ErrorType; import com.gotocompany.firehose.config.converter.GrpcMetadataConverter; +import com.gotocompany.firehose.config.converter.GrpcSinkRetryErrorTypeConverter; import org.aeonbits.owner.Config; import java.util.Map; @@ -31,6 +33,15 @@ public interface GrpcSinkConfig extends AppConfig { @Config.Key("SINK_GRPC_ARG_DEADLINE_MS") Long getSinkGrpcArgDeadlineMS(); + @Config.Key("SINK_GRPC_RESPONSE_RETRY_CEL_EXPRESSION") + @DefaultValue("true") + String getSinkGrpcResponseRetryCELExpression(); + + @Config.Key("SINK_GRPC_RESPONSE_RETRY_ERROR_TYPE") + @DefaultValue("DEFAULT_ERROR") + @ConverterClass(GrpcSinkRetryErrorTypeConverter.class) + ErrorType getSinkGrpcRetryErrorType(); + @Key("SINK_GRPC_METADATA") @DefaultValue("") @ConverterClass(GrpcMetadataConverter.class) diff --git a/src/main/java/com/gotocompany/firehose/config/converter/GrpcSinkRetryErrorTypeConverter.java b/src/main/java/com/gotocompany/firehose/config/converter/GrpcSinkRetryErrorTypeConverter.java new file mode 100644 index 000000000..25da0603a --- /dev/null +++ b/src/main/java/com/gotocompany/firehose/config/converter/GrpcSinkRetryErrorTypeConverter.java @@ -0,0 +1,14 @@ +package com.gotocompany.firehose.config.converter; + +import com.gotocompany.depot.error.ErrorType; +import org.aeonbits.owner.Converter; + +import java.lang.reflect.Method; +import java.util.Locale; + +public class GrpcSinkRetryErrorTypeConverter implements Converter { + @Override + public ErrorType convert(Method method, String s) { + return ErrorType.valueOf(s.trim().toUpperCase(Locale.ROOT)); + } +} diff --git a/src/main/java/com/gotocompany/firehose/evaluator/GrpcResponseCelPayloadEvaluator.java b/src/main/java/com/gotocompany/firehose/evaluator/GrpcResponseCelPayloadEvaluator.java new file mode 100644 index 000000000..b234455a1 --- /dev/null +++ b/src/main/java/com/gotocompany/firehose/evaluator/GrpcResponseCelPayloadEvaluator.java @@ -0,0 +1,61 @@ +package com.gotocompany.firehose.evaluator; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.gotocompany.firehose.utils.CelUtils; +import dev.cel.common.types.CelKind; +import dev.cel.compiler.CelCompiler; +import dev.cel.runtime.CelRuntime; +import dev.cel.runtime.CelRuntimeFactory; +import lombok.extern.slf4j.Slf4j; + +/** + * Implementation of PayloadEvaluator that evaluates gRPC responses using CEL (Common Expression Language). + */ +@Slf4j +public class GrpcResponseCelPayloadEvaluator implements PayloadEvaluator { + + private final Descriptors.Descriptor descriptor; + private CelRuntime.Program celProgram; + + /** + * Constructs a GrpcResponseCelPayloadEvaluator with the specified descriptor and CEL expression. + * + * @param descriptor the descriptor of the gRPC message + * @param celExpression the CEL expression to evaluate against the message + */ + public GrpcResponseCelPayloadEvaluator(Descriptors.Descriptor descriptor, String celExpression) { + this.descriptor = descriptor; + buildCelEnvironment(celExpression); + } + + /** + * Evaluates the given gRPC message payload using the CEL program. + * + * @param payload the gRPC message to be evaluated + * @return true if the payload passes the evaluation, false otherwise + */ + @Override + public boolean evaluate(Message payload) { + if (!descriptor.getFullName().equals(payload.getDescriptorForType().getFullName())) { + throw new IllegalArgumentException(String.format("Payload %s does not match descriptor %s", + payload.getDescriptorForType().getFullName(), descriptor.getFullName())); + } + return (boolean) CelUtils.evaluate(this.celProgram, payload); + } + + /** + * Builds the CEL environment required to evaluate the CEL expression. + * + * @param celExpression the CEL expression to evaluate against the message + * @throws IllegalArgumentException if the CEL expression is invalid or if the evaluator cannot be constructed + */ + private void buildCelEnvironment(String celExpression) { + CelCompiler celCompiler = CelUtils.initializeCelCompiler(this.descriptor); + CelRuntime celRuntime = CelRuntimeFactory.standardCelRuntimeBuilder() + .build(); + this.celProgram = CelUtils.initializeCelProgram(celExpression, celRuntime, celCompiler, + celType -> celType.kind().equals(CelKind.BOOL)); + } + +} diff --git a/src/main/java/com/gotocompany/firehose/evaluator/PayloadEvaluator.java b/src/main/java/com/gotocompany/firehose/evaluator/PayloadEvaluator.java new file mode 100644 index 000000000..7b8ba480b --- /dev/null +++ b/src/main/java/com/gotocompany/firehose/evaluator/PayloadEvaluator.java @@ -0,0 +1,16 @@ +package com.gotocompany.firehose.evaluator; + +/** + * A generic interface for evaluating payloads. + * + * @param the type of payload to be evaluated + */ +public interface PayloadEvaluator { + /** + * Evaluates the given payload. + * + * @param payload the payload to be evaluated + * @return true if the payload passes the evaluation, false otherwise + */ + boolean evaluate(T payload); +} diff --git a/src/main/java/com/gotocompany/firehose/sink/grpc/GrpcSink.java b/src/main/java/com/gotocompany/firehose/sink/grpc/GrpcSink.java index c08312844..048c057ef 100644 --- a/src/main/java/com/gotocompany/firehose/sink/grpc/GrpcSink.java +++ b/src/main/java/com/gotocompany/firehose/sink/grpc/GrpcSink.java @@ -1,7 +1,11 @@ package com.gotocompany.firehose.sink.grpc; - +import com.gotocompany.depot.error.ErrorInfo; +import com.gotocompany.depot.error.ErrorType; +import com.gotocompany.firehose.config.GrpcSinkConfig; +import com.gotocompany.firehose.evaluator.PayloadEvaluator; +import com.gotocompany.firehose.exception.DefaultException; import com.gotocompany.firehose.exception.DeserializerException; import com.gotocompany.firehose.message.Message; import com.gotocompany.firehose.metrics.FirehoseInstrumentation; @@ -21,13 +25,21 @@ public class GrpcSink extends AbstractSink { private final GrpcClient grpcClient; + private final StencilClient stencilClient; + private final GrpcSinkConfig grpcSinkConfig; private List messages; - private StencilClient stencilClient; + private PayloadEvaluator retryEvaluator; - public GrpcSink(FirehoseInstrumentation firehoseInstrumentation, GrpcClient grpcClient, StencilClient stencilClient) { + public GrpcSink(FirehoseInstrumentation firehoseInstrumentation, + GrpcClient grpcClient, + StencilClient stencilClient, + GrpcSinkConfig grpcSinkConfig, + PayloadEvaluator retryEvaluator) { super(firehoseInstrumentation, "grpc"); this.grpcClient = grpcClient; this.stencilClient = stencilClient; + this.grpcSinkConfig = grpcSinkConfig; + this.retryEvaluator = retryEvaluator; } @Override @@ -43,6 +55,7 @@ protected List execute() throws Exception { if (!success) { getFirehoseInstrumentation().logWarn("Grpc Service returned error"); failedMessages.add(message); + setRetryableErrorInfo(message, response); } } getFirehoseInstrumentation().logDebug("Failed messages count: {}", failedMessages.size()); @@ -60,4 +73,14 @@ public void close() throws IOException { this.messages = new ArrayList<>(); stencilClient.close(); } + + private void setRetryableErrorInfo(Message message, DynamicMessage dynamicMessage) { + boolean eligibleToRetry = retryEvaluator.evaluate(dynamicMessage); + if (eligibleToRetry) { + getFirehoseInstrumentation().logDebug("Retrying grpc service"); + message.setErrorInfo(new ErrorInfo(new DefaultException("Retryable gRPC Error"), grpcSinkConfig.getSinkGrpcRetryErrorType())); + return; + } + message.setErrorInfo(new ErrorInfo(new DefaultException("Non Retryable gRPC Error"), ErrorType.SINK_NON_RETRYABLE_ERROR)); + } } diff --git a/src/main/java/com/gotocompany/firehose/sink/grpc/GrpcSinkFactory.java b/src/main/java/com/gotocompany/firehose/sink/grpc/GrpcSinkFactory.java index e06276b18..c8b315b73 100644 --- a/src/main/java/com/gotocompany/firehose/sink/grpc/GrpcSinkFactory.java +++ b/src/main/java/com/gotocompany/firehose/sink/grpc/GrpcSinkFactory.java @@ -1,8 +1,11 @@ package com.gotocompany.firehose.sink.grpc; +import com.google.protobuf.Message; import com.gotocompany.firehose.config.AppConfig; import com.gotocompany.firehose.config.GrpcSinkConfig; +import com.gotocompany.firehose.evaluator.GrpcResponseCelPayloadEvaluator; +import com.gotocompany.firehose.evaluator.PayloadEvaluator; import com.gotocompany.firehose.metrics.FirehoseInstrumentation; import com.gotocompany.firehose.proto.ProtoToMetadataMapper; import com.gotocompany.firehose.sink.grpc.client.GrpcClient; @@ -32,15 +35,21 @@ public static AbstractSink create(Map configuration, StatsDRepor grpcConfig.getSinkGrpcServiceHost(), grpcConfig.getSinkGrpcServicePort(), grpcConfig.getSinkGrpcMethodUrl(), grpcConfig.getSinkGrpcResponseSchemaProtoClass()); firehoseInstrumentation.logDebug(grpcSinkConfig); ManagedChannel managedChannel = ManagedChannelBuilder.forAddress(grpcConfig.getSinkGrpcServiceHost(), grpcConfig.getSinkGrpcServicePort()) - .keepAliveTime(grpcConfig.getSinkGrpcArgKeepaliveTimeMS(), TimeUnit.MILLISECONDS) - .keepAliveTimeout(grpcConfig.getSinkGrpcArgKeepaliveTimeoutMS(), TimeUnit.MILLISECONDS) - .usePlaintext().build(); + .keepAliveTime(grpcConfig.getSinkGrpcArgKeepaliveTimeMS(), TimeUnit.MILLISECONDS) + .keepAliveTimeout(grpcConfig.getSinkGrpcArgKeepaliveTimeoutMS(), TimeUnit.MILLISECONDS) + .usePlaintext().build(); AppConfig appConfig = ConfigFactory.create(AppConfig.class, configuration); ProtoToMetadataMapper protoToMetadataMapper = new ProtoToMetadataMapper(stencilClient.get(appConfig.getInputSchemaProtoClass()), grpcConfig.getSinkGrpcMetadata()); GrpcClient grpcClient = new GrpcClient(new FirehoseInstrumentation(statsDReporter, GrpcClient.class), grpcConfig, managedChannel, stencilClient, protoToMetadataMapper); firehoseInstrumentation.logInfo("GRPC connection established"); + PayloadEvaluator grpcResponseRetryEvaluator = instantiatePayloadEvaluator(grpcConfig, stencilClient); + return new GrpcSink(new FirehoseInstrumentation(statsDReporter, GrpcSink.class), grpcClient, stencilClient, grpcConfig, grpcResponseRetryEvaluator); + } - return new GrpcSink(new FirehoseInstrumentation(statsDReporter, GrpcSink.class), grpcClient, stencilClient); + private static PayloadEvaluator instantiatePayloadEvaluator(GrpcSinkConfig grpcSinkConfig, StencilClient stencilClient) { + return new GrpcResponseCelPayloadEvaluator( + stencilClient.get(grpcSinkConfig.getSinkGrpcResponseSchemaProtoClass()), + grpcSinkConfig.getSinkGrpcResponseRetryCELExpression()); } } diff --git a/src/test/java/com/gotocompany/firehose/converter/GrpcSinkRetryErrorTypeConverterTest.java b/src/test/java/com/gotocompany/firehose/converter/GrpcSinkRetryErrorTypeConverterTest.java new file mode 100644 index 000000000..26e04d3e1 --- /dev/null +++ b/src/test/java/com/gotocompany/firehose/converter/GrpcSinkRetryErrorTypeConverterTest.java @@ -0,0 +1,34 @@ +package com.gotocompany.firehose.converter; + +import com.gotocompany.depot.error.ErrorType; +import com.gotocompany.firehose.config.converter.GrpcSinkRetryErrorTypeConverter; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +import java.util.Arrays; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + + +public class GrpcSinkRetryErrorTypeConverterTest { + @Test + public void shouldConvertToAppropriateEnumType() { + Map stringToExpectedValue = Arrays.stream(ErrorType.values()) + .collect(Collectors.toMap(ErrorType::toString, Function.identity())); + GrpcSinkRetryErrorTypeConverter grpcSinkRetryErrorTypeConverter = new GrpcSinkRetryErrorTypeConverter(); + + stringToExpectedValue.keySet().stream() + .forEach(key -> { + ErrorType expectedValue = stringToExpectedValue.get(key); + ErrorType actualValue = grpcSinkRetryErrorTypeConverter.convert(null, key); + Assertions.assertEquals(expectedValue, actualValue); + }); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowExceptionForInvalidValue() { + GrpcSinkRetryErrorTypeConverter grpcSinkRetryErrorTypeConverter = new GrpcSinkRetryErrorTypeConverter(); + grpcSinkRetryErrorTypeConverter.convert(null, "ErrorType.UNREGISTERED"); + } +} diff --git a/src/test/java/com/gotocompany/firehose/evaluator/GrpcResponseCelPayloadEvaluatorTest.java b/src/test/java/com/gotocompany/firehose/evaluator/GrpcResponseCelPayloadEvaluatorTest.java new file mode 100644 index 000000000..6d99cba10 --- /dev/null +++ b/src/test/java/com/gotocompany/firehose/evaluator/GrpcResponseCelPayloadEvaluatorTest.java @@ -0,0 +1,121 @@ +package com.gotocompany.firehose.evaluator; + +import com.google.protobuf.Message; +import com.gotocompany.firehose.consumer.GenericError; +import com.gotocompany.firehose.consumer.GenericResponse; +import com.gotocompany.firehose.consumer.TestMessage; +import org.junit.Before; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +public class GrpcResponseCelPayloadEvaluatorTest { + + private static final String CEL_EXPRESSION = "GenericResponse.success == false && GenericResponse.errors.exists(e, e.code == \"400\")"; + private PayloadEvaluator grpcPayloadEvaluator; + + @Before + public void setup() { + this.grpcPayloadEvaluator = new GrpcResponseCelPayloadEvaluator(GenericResponse.getDescriptor(), CEL_EXPRESSION); + } + + @Test + public void shouldEvaluateResponseToTrueWhenCELExpressionMatchesPayload() { + GenericResponse genericResponse = GenericResponse.newBuilder() + .setSuccess(false) + .setDetail("Detail Message") + .addErrors(GenericError.newBuilder() + .setCode("400") + .setEntity("GoFin") + .setCause("Unknown") + .build()) + .build(); + + boolean result = grpcPayloadEvaluator.evaluate(genericResponse); + + Assertions.assertTrue(result); + } + + @Test + public void shouldEvaluateResponseToFalseWhenCELExpressionDoesntMatchPayload() { + GenericResponse genericResponse = GenericResponse.newBuilder() + .setSuccess(false) + .setDetail("Detail Message") + .addErrors(GenericError.newBuilder() + .setCode("50000") + .setEntity("GoFin") + .setCause("Unknown") + .build()) + .build(); + + boolean result = grpcPayloadEvaluator.evaluate(genericResponse); + + Assertions.assertFalse(result); + } + + @Test + public void shouldEvaluateResponseToTrueWhenCelExpressionMatchesRange() { + PayloadEvaluator evaluator = forCELExpression("GenericResponse.errors.exists(e, int(e.code) >= 1000 && int(e.code) <= 2000)"); + GenericResponse genericResponse = GenericResponse.newBuilder() + .setSuccess(false) + .setDetail("Detail Message") + .addErrors(GenericError.newBuilder() + .setCode("1500") + .setEntity("GoFin") + .setCause("Unknown") + .build()) + .build(); + + boolean result = evaluator.evaluate(genericResponse); + + Assertions.assertTrue(result); + } + + @Test + public void shouldEvaluateResponseToFalseWhenCelExpressionMatchesRangeAndNotInSet() { + PayloadEvaluator evaluator = forCELExpression("GenericResponse.errors.exists(e, int(e.code) >= 1000 && int(e.code) <= 2000 && !(int(e.code) in [1500, 1600]))"); + GenericResponse genericResponse = GenericResponse.newBuilder() + .setSuccess(false) + .setDetail("Detail Message") + .addErrors(GenericError.newBuilder() + .setCode("1500") + .setEntity("GoFin") + .setCause("Unknown") + .build()) + .build(); + + boolean result = evaluator.evaluate(genericResponse); + + Assertions.assertFalse(result); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIllegalArgumentExceptionWhenCelValidationFailed() { + forCELExpression("GenericResponse.nonExistField == true"); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIllegalArgumentExceptionWhenPayloadIsNotRecognizedByDescriptor() { + TestMessage unregisteredPayload = TestMessage.newBuilder() + .setOrderUrl("url") + .build(); + + grpcPayloadEvaluator.evaluate(unregisteredPayload); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIllegalArgumentExceptionWhenCelExpressionContainsUnregisteredMacro() { + String expressionWithUnregisteredMacro = "GenericResponse.errors.nonStandardMacro(e, e.code == \"400\")"; + + forCELExpression(expressionWithUnregisteredMacro); + } + + @Test(expected = UnsupportedOperationException.class) + public void shouldThrowOperationNotSupportedExceptionWhenCelExpressionResultIsNotBoolean() { + forCELExpression("GenericResponse.errors"); + } + + private static PayloadEvaluator forCELExpression(String celExpression) { + return new GrpcResponseCelPayloadEvaluator(GenericResponse.getDescriptor(), celExpression); + } + +} diff --git a/src/test/java/com/gotocompany/firehose/sink/grpc/GrpcSinkTest.java b/src/test/java/com/gotocompany/firehose/sink/grpc/GrpcSinkTest.java index 0e66daa2a..9a9a1b464 100644 --- a/src/test/java/com/gotocompany/firehose/sink/grpc/GrpcSinkTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/grpc/GrpcSinkTest.java @@ -1,5 +1,11 @@ package com.gotocompany.firehose.sink.grpc; +import com.google.protobuf.InvalidProtocolBufferException; +import com.gotocompany.firehose.config.GrpcSinkConfig; +import com.gotocompany.firehose.consumer.GenericError; +import com.gotocompany.firehose.consumer.GenericResponse; +import com.gotocompany.firehose.evaluator.GrpcResponseCelPayloadEvaluator; +import com.gotocompany.firehose.evaluator.PayloadEvaluator; import com.gotocompany.firehose.exception.DeserializerException; import com.gotocompany.firehose.message.Message; import com.gotocompany.firehose.metrics.FirehoseInstrumentation; @@ -12,6 +18,7 @@ import org.apache.kafka.common.header.internals.RecordHeaders; import org.junit.Before; import org.junit.Test; +import org.junit.jupiter.api.Assertions; import org.mockito.Mock; import java.io.IOException; @@ -40,10 +47,23 @@ public class GrpcSinkTest { @Mock private FirehoseInstrumentation firehoseInstrumentation; + @Mock + private GrpcSinkConfig grpcSinkConfig; + + @Mock + private PayloadEvaluator defaultGrpcResponsePayloadEvaluator; + + private PayloadEvaluator grpcResponsePayloadEvaluator; @Before public void setUp() { initMocks(this); - sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient); + when(grpcSinkConfig.getSinkGrpcRetryErrorType()).thenReturn(ErrorType.SINK_RETRYABLE_ERROR); + when(defaultGrpcResponsePayloadEvaluator.evaluate(any())).thenReturn(true); + this.grpcResponsePayloadEvaluator = new GrpcResponseCelPayloadEvaluator( + GenericResponse.getDescriptor(), + "GenericResponse.success == false && GenericResponse.errors.exists(e, e.code == \"4000\")" + ); + sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient, grpcSinkConfig, defaultGrpcResponsePayloadEvaluator); } @Test @@ -72,20 +92,21 @@ public void shouldReturnBackListOfFailedMessages() throws IOException, Deseriali TestGrpcResponse build = TestGrpcResponse.newBuilder().setSuccess(false).build(); DynamicMessage response = DynamicMessage.parseFrom(build.getDescriptorForType(), build.toByteArray()); when(grpcClient.execute(any(), any(RecordHeaders.class))).thenReturn(response); + List failedMessages = sink.pushMessage(Collections.singletonList(message)); assertFalse(failedMessages.isEmpty()); assertEquals(1, failedMessages.size()); - verify(firehoseInstrumentation, times(1)).logInfo("Preparing {} messages", 1); verify(firehoseInstrumentation, times(1)).logDebug("Response: {}", response); verify(firehoseInstrumentation, times(1)).logWarn("Grpc Service returned error"); verify(firehoseInstrumentation, times(1)).logDebug("Failed messages count: {}", 1); + verify(firehoseInstrumentation, times(1)).logDebug("Retrying grpc service"); } @Test public void shouldCloseStencilClient() throws IOException { - sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient); + sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient, grpcSinkConfig, this.grpcResponsePayloadEvaluator); sink.close(); verify(stencilClient, times(1)).close(); @@ -93,9 +114,63 @@ public void shouldCloseStencilClient() throws IOException { @Test public void shouldLogWhenClosingConnection() throws IOException { - sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient); + sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient, grpcSinkConfig, this.grpcResponsePayloadEvaluator); sink.close(); verify(firehoseInstrumentation, times(1)).logInfo("GRPC connection closing"); } + + @Test + public void shouldReturnFailedMessagesWithRetryableErrorsWhenCELExpressionMatches() throws InvalidProtocolBufferException { + sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient, grpcSinkConfig, this.grpcResponsePayloadEvaluator); + Message payload = new Message(new byte[]{}, new byte[]{}, "topic", 0, 1); + GenericResponse response = GenericResponse.newBuilder() + .setSuccess(false) + .setDetail("detail") + .addErrors(GenericError.newBuilder() + .setCode("4000") + .setCause("cause") + .setEntity("gtf") + .build()) + .build(); + DynamicMessage dynamicMessage = DynamicMessage.parseFrom( + response.getDescriptorForType(), + response.toByteArray() + ); + when(grpcClient.execute(any(), any())) + .thenReturn(dynamicMessage); + sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient, grpcSinkConfig, grpcResponsePayloadEvaluator); + + List result = sink.pushMessage(Collections.singletonList(payload)); + + Assertions.assertEquals(1, result.size()); + Assertions.assertEquals(result.get(0).getErrorInfo().getErrorType(), ErrorType.SINK_RETRYABLE_ERROR); + } + + @Test + public void shouldReturnFailedMessagesWithNonRetryableErrorsWhenCELExpressionDoesntMatch() throws InvalidProtocolBufferException { + sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient, grpcSinkConfig, this.grpcResponsePayloadEvaluator); + Message payload = new Message(new byte[]{}, new byte[]{}, "topic", 0, 1); + GenericResponse response = GenericResponse.newBuilder() + .setSuccess(false) + .setDetail("detail") + .addErrors(GenericError.newBuilder() + .setCode("not-exist-code") + .setCause("cause") + .setEntity("gtf") + .build()) + .build(); + DynamicMessage dynamicMessage = DynamicMessage.parseFrom( + response.getDescriptorForType(), + response.toByteArray() + ); + when(grpcClient.execute(any(), any())) + .thenReturn(dynamicMessage); + sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient, grpcSinkConfig, grpcResponsePayloadEvaluator); + + List result = sink.pushMessage(Collections.singletonList(payload)); + + Assertions.assertEquals(1, result.size()); + Assertions.assertEquals(result.get(0).getErrorInfo().getErrorType(), ErrorType.SINK_NON_RETRYABLE_ERROR); + } }