SpanId, traceId is lost when a coroutine is resumed on a different thread #243

Closed
andrejczyn opened this issue May 2, 2023 · 3 comments

andrejczyn commented May 2, 2023

We are trying to migrate our services (Kotlin, coroutines, WebFlux, Spring) to Spring Boot 3. Unfortunately, we cannot get tracing to work correctly in our setup. It looks like the span context is lost when a coroutine is resumed on a different thread.

Environment
Spring, Kotlin, coroutines

  • Micrometer version 1.10.5
  • Otel 1.0.3
  • context-propagation: 1.0.2
    I also tried upgrading the Micrometer, OTel, and Reactor dependencies to the latest snapshots, but it didn't help.

To Reproduce
I've created a sample project to demonstrate the behavior:
https://github.com/andrejczyn/context-propagation-bug/blob/main/src/main/kotlin/com/andrejczyn/DemoApplication.kt
In short, when we call this endpoint:

    @GetMapping("/")
    suspend fun log() {
        logger.info { "Before ${Thread.currentThread().name} ${Span.current()}" }
        delay(10)
        logger.info { "After ${Thread.currentThread().name} ${Span.current()}" }
    }
}

we see that in the second log line the MDC trace and span IDs are empty and Span.current() reports an all-zero ("0000...") traceId and spanId:

2023-05-02T16:49:55.583+02:00  INFO [demo,470c8f2984f2cb5ddfd024818ba2af3e,81cf79a8a20cd1ce] 48439 --- [-2 @coroutine#1] com.andrejczyn.LogController             : Before reactor-http-nio-2 @coroutine#1 PropagatedSpan{ImmutableSpanContext{traceId=470c8f2984f2cb5ddfd024818ba2af3e, spanId=81cf79a8a20cd1ce, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}}
2023-05-02T16:49:55.598+02:00  INFO [demo,,] 48439 --- [or @coroutine#1] com.andrejczyn.LogController             : After kotlinx.coroutines.DefaultExecutor @coroutine#1 PropagatedSpan{ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}}

Is there a way to get the trace and span IDs propagated correctly across coroutine suspension points?
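
The two log lines above are consistent with the current span being held in thread-local storage: the "Before" line runs on reactor-http-nio-2, while the "After" line runs on kotlinx.coroutines.DefaultExecutor. The sketch below reproduces only that mechanism with plain JDK threads. It does not use Micrometer, OpenTelemetry, or coroutines, and currentTraceId, propagating, and readOnOtherThread are hypothetical names introduced for illustration.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical stand-in for the thread-local that holds the current span.
val currentTraceId = ThreadLocal<String?>()

// Captures the caller's value now and restores it around the task
// on whichever thread ends up running it.
fun <T> propagating(task: () -> T): Callable<T> {
    val captured = currentTraceId.get()
    return Callable {
        val previous = currentTraceId.get()
        currentTraceId.set(captured)
        try {
            task()
        } finally {
            // Put back whatever the worker thread had before.
            currentTraceId.set(previous)
        }
    }
}

// Reads the thread-local from a fresh worker thread, with or without propagation.
fun readOnOtherThread(propagate: Boolean): String? {
    val executor = Executors.newSingleThreadExecutor()
    try {
        val read = { currentTraceId.get() }
        val task: Callable<String?> = if (propagate) propagating(read) else Callable(read)
        return executor.submit(task).get()
    } finally {
        executor.shutdown()
    }
}

fun main() {
    currentTraceId.set("470c8f2984f2cb5ddfd024818ba2af3e")
    println("without propagation: ${readOnOtherThread(propagate = false)}")
    println("with propagation:    ${readOnOtherThread(propagate = true)}")
}
```

Without the wrapper the worker thread sees no value, which mirrors the empty IDs in the "After" line. Whether the real tracing setup behaves exactly this way depends on how the span is stored and restored, so treat this as an illustration of the symptom rather than a diagnosis.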

@jonatan-ivanov jonatan-ivanov transferred this issue from micrometer-metrics/micrometer May 2, 2023
@jonatan-ivanov
Member

Might be related: #174

@marcingrzejszczak
Contributor

Closing in favour of #174


maxxis95 commented Jul 25, 2024

@marcingrzejszczak Hi, I'm trying to start a coroutine from a GatewayFilter. The tracer works fine until execution enters this flatMap: "return super.writeWith(fluxBody.buffer().flatMap { dataBuffers: List<DataBuffer> -> ...".
Up to that point I can get the current span and trace ID. Inside the flatMap, the tracer returns null for the current context.
While researching, I found that the following lines are needed to enable automatic propagation, so I have them set up:

    Hooks.enableAutomaticContextPropagation()
    ContextRegistry.getInstance().registerThreadLocalAccessor(ObservationAwareSpanThreadLocalAccessor(tracer))
    ObservationThreadLocalAccessor.getInstance().observationRegistry = observationRegistry
    Metrics.observationRegistry(observationRegistry)

Can someone tell me how to propagate the trace ID into reactive processing such as this flatMap?

    @Component
    class BodyModifyingFilter(
        private val linkExpansionAndTranslationService: LinkExpansionAndTranslationService,
        private val observationRegistry: ObservationRegistry,
        private val tracer: Tracer
    ) : GatewayFilter, Ordered {

        override fun filter(exchange: ServerWebExchange, chain: GatewayFilterChain): Mono<Void> {
            val decoratedResponse: ServerHttpResponseDecorator = object : ServerHttpResponseDecorator(exchange.response) {
                override fun writeWith(body: Publisher<out DataBuffer>): Mono<Void> {
                    if (body is Flux<*>) {
                        val fluxBody: Flux<out DataBuffer> = Flux.from(body as Publisher<DataBuffer>)
                        // Buffer the whole response body, then rewrite it in one pass
                        return super.writeWith(fluxBody.buffer().flatMap { dataBuffers: List<DataBuffer> ->
                            val bufferFactory = exchange.response.bufferFactory()
                            val joined = bufferFactory.join(dataBuffers)
                            val bytes = ByteArray(joined.readableByteCount())
                            joined.read(bytes)
                            DataBufferUtils.release(joined) // Release the memory for the data buffer
                            val securityContextMono = ReactiveSecurityContextHolder.getContext()

                            securityContextMono.flatMap { securityContext ->
                                // Start the coroutine; this is where the current span is no longer available
                                mono(SecurityCoroutineContext(securityContext, tracer)) {
                                    linkExpansionAndTranslationService.processBody(exchange, bytes)
                                }.flatMap { processedBytes ->
                                    val buffer = bufferFactory.wrap(processedBytes)
                                    exchange.response.headers.contentLength = processedBytes.size.toLong()
                                    Mono.just(buffer)
                                }
                            }
                        })
                    }
                    return super.writeWith(body) // Fallback for other types of publishers
                }
            }

            return chain.filter(exchange.mutate().response(decoratedResponse).build())
        }
    }
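
The thread does not show how SecurityCoroutineContext is implemented. As a rough illustration of how a coroutine context element can re-apply a thread-local value each time the coroutine resumes, here is a minimal sketch built on kotlinx.coroutines' ThreadContextElement. It assumes kotlinx-coroutines-core is on the classpath. The names currentTraceId, TraceIdElement, and traceIdAfterDelay are hypothetical, and a plain String stands in for the real span; this is not the Micrometer or Spring integration.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ThreadContextElement
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical stand-in for the thread-local that holds the current span.
val currentTraceId = ThreadLocal<String?>()

// Hypothetical element: installs traceId on the thread each time the coroutine
// resumes there, and restores the thread's previous value when it suspends.
class TraceIdElement(private val traceId: String?) : ThreadContextElement<String?> {
    companion object Key : CoroutineContext.Key<TraceIdElement>

    override val key: CoroutineContext.Key<TraceIdElement>
        get() = Key

    override fun updateThreadContext(context: CoroutineContext): String? {
        val previous = currentTraceId.get()
        currentTraceId.set(traceId)
        return previous
    }

    override fun restoreThreadContext(context: CoroutineContext, oldState: String?) {
        currentTraceId.set(oldState)
    }
}

suspend fun traceIdAfterDelay(): String? {
    delay(10) // the coroutine may resume on a different thread here
    return currentTraceId.get()
}

fun main(): Unit = runBlocking {
    currentTraceId.set("470c8f2984f2cb5ddfd024818ba2af3e")
    val captured = currentTraceId.get()

    // No element: worker threads never had the value set.
    val lost = withContext(Dispatchers.Default) { traceIdAfterDelay() }
    // With the element: the value is re-installed on every resume.
    val kept = withContext(Dispatchers.Default + TraceIdElement(captured)) { traceIdAfterDelay() }

    println("without element: $lost")
    println("with element:    $kept")
}
```

In the filter above, an element like this would have to capture the span on a thread where it is still available, which according to the comment is not the case inside the flatMap. That is why it remains only a sketch of the coroutine side of the problem.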
