Skip to content

Commit

Permalink
fix: Include metadata in StatusRuntimeException (#1906)
Browse files Browse the repository at this point in the history
  • Loading branch information
jtjeferreira authored Feb 23, 2024
1 parent 701dce6 commit 65989ff
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 15 deletions.
29 changes: 18 additions & 11 deletions runtime/src/main/scala/akka/grpc/internal/AkkaHttpClientUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ import akka.annotation.InternalApi
import akka.event.LoggingAdapter
import akka.grpc.GrpcProtocol.GrpcProtocolReader
import akka.grpc.{ GrpcClientSettings, GrpcResponseMetadata, GrpcSingleResponse, ProtobufSerializer }
import akka.grpc.scaladsl.StringEntry
import akka.http.scaladsl.model.HttpEntity.{ Chunk, Chunked, LastChunk, Strict }
import akka.http.scaladsl.{ ClientTransport, ConnectionContext, Http }
import akka.http.scaladsl.model.{ AttributeKey, HttpHeader, HttpRequest, HttpResponse, RequestResponseAssociation, Uri }
import akka.http.scaladsl.model._
import akka.http.scaladsl.settings.ClientConnectionSettings
import akka.stream.{ Materializer, OverflowStrategy }
import akka.stream.scaladsl.{ Keep, Sink, Source }
Expand Down Expand Up @@ -220,7 +221,11 @@ object AkkaHttpClientUtils {
.watchTermination()((_, done) =>
done.onComplete(_ => trailerPromise.trySuccess(immutable.Seq.empty)))
case Strict(_, data) =>
trailerPromise.success(immutable.Seq.empty)
val rawTrailers = response.attribute(AttributeKeys.trailer).map(_.headers).getOrElse(Seq.empty)
val trailers = rawTrailers.map(h => HttpHeader.parse(h._1, h._2)).collect {
case HttpHeader.ParsingResult.Ok(header, _) => header
}
trailerPromise.success(trailers)
Source.single[ByteString](data)
case _ =>
response.entity.discardBytes()
Expand Down Expand Up @@ -281,19 +286,21 @@ object AkkaHttpClientUtils {
response: HttpResponse,
trailers: Seq[HttpHeader]): StatusRuntimeException = {
val allHeaders = response.headers ++ trailers
val metadata: io.grpc.Metadata =
new MetadataImpl(allHeaders.map(h => (h.name, StringEntry(h.value))).toList).toGoogleGrpcMetadata()
allHeaders.find(_.name == "grpc-status").map(_.value) match {
case None =>
new StatusRuntimeException(
mapHttpStatus(response)
.withDescription("No grpc-status found")
.augmentDescription(s"When calling rpc service: ${requestUri.toString()}"))
val status = mapHttpStatus(response)
.withDescription("No grpc-status found")
.augmentDescription(s"When calling rpc service: ${requestUri.toString()}")
new StatusRuntimeException(status, metadata)
case Some(statusCode) =>
val description = allHeaders.find(_.name == "grpc-message").map(_.value)
new StatusRuntimeException(
Status
.fromCodeValue(statusCode.toInt)
.withDescription(description.orNull)
.augmentDescription(s"When calling rpc service: ${requestUri.toString()}"))
val status = Status
.fromCodeValue(statusCode.toInt)
.withDescription(description.orNull)
.augmentDescription(s"When calling rpc service: ${requestUri.toString()}")
new StatusRuntimeException(status, metadata)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@ package akka.grpc.internal
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpEntity.Strict
import akka.http.scaladsl.model.HttpResponse
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.model.headers.RawHeader
import akka.testkit.TestKit
import akka.util.ByteString
import io.grpc.{ Status, StatusRuntimeException }
import io.grpc.{ Metadata, Status, StatusRuntimeException }
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.Span
Expand All @@ -41,12 +40,35 @@ class AkkaHttpClientUtilsSpec extends TestKit(ActorSystem()) with AnyWordSpecLik

"map a strict 200 response with non-0 gRPC error code to a failed stream" in {
val requestUri = Uri("https://example.com/GuestExeSample/GrpcHello")
val responseHeaders = List(RawHeader("grpc-status", "9"), RawHeader("custom-key", "custom-value-in-header"))
val response =
Future.successful(HttpResponse(OK, responseHeaders, Strict(GrpcProtocolNative.contentType, ByteString.empty)))
val source = AkkaHttpClientUtils.responseToSource(requestUri, response, null, false)

val failure = source.run().failed.futureValue
failure.asInstanceOf[StatusRuntimeException].getStatus.getCode should be(Status.Code.FAILED_PRECONDITION)
failure.asInstanceOf[StatusRuntimeException].getTrailers.get(key) should be("custom-value-in-header")
}

"map a strict 200 response with non-0 gRPC error code with a trailer to a failed stream with trailer metadata" in {
val requestUri = Uri("https://example.com/GuestExeSample/GrpcHello")
val responseHeaders = List(RawHeader("grpc-status", "9"))
val responseTrailers = Trailer(RawHeader("custom-key", "custom-trailer-value") :: Nil)
val response = Future.successful(
HttpResponse(OK, List(RawHeader("grpc-status", "9")), Strict(GrpcProtocolNative.contentType, ByteString.empty)))
new HttpResponse(
OK,
responseHeaders,
Map.empty[AttributeKey[_], Any].updated(AttributeKeys.trailer, responseTrailers),
Strict(GrpcProtocolNative.contentType, ByteString.empty),
HttpProtocols.`HTTP/1.1`))
val source = AkkaHttpClientUtils.responseToSource(requestUri, response, null, false)

val failure = source.run().failed.futureValue
failure.asInstanceOf[StatusRuntimeException].getStatus.getCode should be(Status.Code.FAILED_PRECONDITION)
failure.asInstanceOf[StatusRuntimeException].getTrailers should not be null
failure.asInstanceOf[StatusRuntimeException].getTrailers.get(key) should be("custom-trailer-value")
}

lazy val key = Metadata.Key.of("custom-key", Metadata.ASCII_STRING_MARSHALLER)
}
}

0 comments on commit 65989ff

Please sign in to comment.