Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow @Streaming for Flow #760

Open
ag2s20150909 opened this issue Jan 10, 2025 · 0 comments
Open

Allow @Streaming for Flow #760

ag2s20150909 opened this issue Jan 10, 2025 · 0 comments
Labels
enhancement New feature or request

Comments

@ag2s20150909
Copy link

Is your feature request related to a problem? Please describe.

Allow `@Streaming` to be used on functions that return a `Flow`.
#756

Describe the solution you'd like

.......
        // Content negotiation setup: regular JSON via `ktjson`, plus the line-based
        // converter registered once per streaming content type.
        install(ContentNegotiation) {
            json(ktjson)
            // Newline-delimited JSON responses.
            register(NDJson, NdjsonContentConverter(ktjson))
            // Server-sent event responses go through the same converter.
            register(SSE, NdjsonContentConverter(ktjson))
        }

.....


/**
 * Example response payload used by the streaming endpoint.
 *
 * @property aa string value serialized under the name `aa`.
 * @property bb string value serialized under the name `bb`.
 */
@Serializable
data class FooResponse(
    @SerialName("aa") val aa: String,
    @SerialName("bb") val bb: String,
)


/**
 * Proposed service declaration: a streaming endpoint whose result is exposed
 * directly as a [Flow] of decoded items.
 */
interface TestService {

    /** Issues `GET /api/test` as a streaming request and yields each decoded [FooResponse]. */
    @Streaming
    @GET("/api/test")
    suspend fun test(): Flow<FooResponse>
}



/** Content type of newline-delimited JSON streams (`application/x-ndjson`). */
private val NDJson: ContentType = ContentType("application", "x-ndjson")

/** Content type of server-sent event streams (`text/event-stream`). */
private val SSE: ContentType = ContentType("text", "event-stream")

/**
 * [ContentConverter] that exposes a line-oriented response body as a [Flow] of decoded objects.
 *
 * Every non-blank line of the body is decoded with [format]. A line that starts with
 * `data:` (server-sent events) has that prefix removed before decoding; any other line
 * is decoded as-is (newline-delimited JSON).
 *
 * @param format string format used both to resolve the element serializer and to decode each line.
 */
class NdjsonContentConverter(private val format: StringFormat) : ContentConverter {

    /**
     * Wraps [content] in a flow that emits one decoded element per non-blank line.
     *
     * @param charset unused by this converter.
     * @param typeInfo requested type; must be [Flow] with a resolvable type argument.
     * @param content response body that is read line by line while the flow is collected.
     * @return a flow of decoded elements that completes when [content] has no more lines.
     * @throws IllegalStateException if [typeInfo] is not a [Flow] or its element type cannot be resolved.
     */
    override suspend fun deserialize(
        charset: Charset,
        typeInfo: TypeInfo,
        content: ByteReadChannel,
    ): Any? {
        check(typeInfo.type == Flow::class) {
            "For NdjsonContentConverter the return type must be kotlinx.coroutines.flow.Flow ,like  Flow<Foo> or Flow<out Foo>"
        }
        val elementType = checkNotNull(typeInfo.upperBoundType(0)) {
            "NdjsonContentConverter could not resolve the element type of $typeInfo"
        }
        val loader = format.serializersModule.serializer(elementType.type.javaObjectType)

        return callbackFlow<Any> {
            while (true) {
                // readUTF8Line() returns null once the body is exhausted.
                val line = content.readUTF8Line() ?: break
                if (line.isBlank()) continue

                // SSE lines carry the payload after "data:"; NDJSON lines are the payload.
                val payload = if (line.startsWith("data:")) line.removePrefix("data:") else line

                // Decode with the injected format and suspend (rather than block) on back-pressure.
                send(format.decodeFromString(loader, payload))
            }
            // Complete the flow at end of stream; awaiting close here would suspend forever.
            close()
        }
    }
}

Describe alternatives you've considered

/**
 * Workaround variant of the service: the endpoint returns the raw [HttpStatement]
 * and the caller turns the body into a flow itself.
 */
interface TestService {

    /** Prepares `GET /api/test` as a streaming request without consuming the body. */
    @Streaming
    @GET("/api/test")
    suspend fun _test(): HttpStatement
}

/**
 * Streams the body of [TestService._test] as a flow of [FooResponse].
 *
 * Every non-blank line is decoded with `ktjson`. A line starting with `data:`
 * (server-sent events) has that prefix removed first; any other line is decoded
 * as-is (newline-delimited JSON).
 *
 * @return a flow that emits one [FooResponse] per line and completes when the body ends.
 */
fun TestService.testFlow(): Flow<FooResponse> = callbackFlow {
    val channel: ByteReadChannel = _test().body()
    while (true) {
        // readUTF8Line() returns null once the body is exhausted.
        val line = channel.readUTF8Line() ?: break
        if (line.isBlank()) continue

        // SSE lines carry the payload after "data:"; NDJSON lines are the payload.
        val payload = if (line.startsWith("data:")) line.removePrefix("data:") else line

        // Suspend (rather than block a thread) when the collector is slower than the stream.
        send(ktjson.decodeFromString(FooResponse.serializer(), payload))
    }
    // Complete the flow at end of stream; awaiting close here would suspend forever.
    close()
}

or

/**
 * Executes this statement and exposes its line-oriented body as a flow of [T].
 *
 * Every non-blank line is decoded with [format]. A line starting with `data:`
 * (server-sent events) has that prefix removed first; any other line is decoded
 * as-is (newline-delimited JSON).
 *
 * @param format string format used to decode each line; defaults to `ktjson`.
 * @return a flow that emits one [T] per line and completes when the body ends.
 */
inline fun <reified T> HttpStatement.asFlow(format: StringFormat = ktjson): Flow<T> = callbackFlow {
    val content: ByteReadChannel = this@asFlow.body()
    while (true) {
        // readUTF8Line() returns null once the body is exhausted.
        val line = content.readUTF8Line() ?: break
        if (line.isBlank()) continue

        // SSE lines carry the payload after "data:"; NDJSON lines are the payload.
        val payload = if (line.startsWith("data:")) line.removePrefix("data:") else line

        // Suspend (rather than block a thread) when the collector is slower than the stream.
        send(format.decodeFromString<T>(payload))
    }
    // Complete the flow at end of stream; awaiting close here would suspend forever.
    close()
}

Additional context

No response

@ag2s20150909 ag2s20150909 added the enhancement New feature or request label Jan 10, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

1 participant