API discussion #113

Conversation
object Producer {

  def flow[K, V](producerProvider: () => KafkaProducer[K, V]): Flow[ProducerRecord[K, V], Future[ProducerRecord[K, V]], NotUsed] =
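For orientation, a minimal usage sketch of this signature; Producer.flow refers to the stub above, and the topic, serializers and bootstrap servers are placeholder assumptions:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProducerFlowExample extends App {
  implicit val system = ActorSystem("example")
  implicit val materializer = ActorMaterializer()

  // Placeholder factory: bootstrap servers and serializers are assumptions.
  def createProducer(): KafkaProducer[String, String] = {
    val props = new java.util.Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    new KafkaProducer[String, String](props)
  }

  Source(List(new ProducerRecord("topic1", "key", "value")))
    .via(Producer.flow[String, String](() => createProducer()))
    .mapAsync(parallelism = 4)(identity) // flatten the per-record Future
    .runWith(Sink.foreach(r => println(s"sent: $r")))
}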
The Future was discussed in akka/akka-meta#14, but is it really worth it?
What about exposing RecordMetadata?
I'm probably missing something, but when I looked at RecordMetadata it didn't add anything that's not in the ProducerRecord (topic, partition, offset).
ProducerRecord does not have an offset. RecordMetadata is the result of producing ProducerRecord(s), and contains the offset of the sent record.
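For illustration, a minimal sketch of a variant that surfaces RecordMetadata, using only the plain Kafka client callback API; flowWithMetadata and its exact shape are assumptions, not the proposed API:

import scala.concurrent.{Future, Promise}
import akka.NotUsed
import akka.stream.scaladsl.Flow
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

// Illustrative variant: emit the send result instead of echoing the input record.
def flowWithMetadata[K, V](producer: KafkaProducer[K, V]): Flow[ProducerRecord[K, V], Future[RecordMetadata], NotUsed] =
  Flow[ProducerRecord[K, V]].map { record =>
    val done = Promise[RecordMetadata]()
    producer.send(record, new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
        if (exception != null) done.failure(exception) // producer errors surface here too
        else done.success(metadata)                    // topic, partition and offset of the sent record
    })
    done.future
  }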
How are you intending to handle errors, by the way? How does the user of the flow know why it failed, for instance when producing fails with MessageSizeTooLargeException?
@RayRoestenburg ah, that was the missing piece, thanks. It should of course be part of the output.
Regarding errors, isn't the stream simply completed with failure?
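To illustrate that expectation, a small sketch reusing the hypothetical flowWithMetadata from above; records, producer, a materializer and an ExecutionContext are assumed to be in scope, and RecordTooLargeException is the new client's analogue of the size error mentioned earlier:

import scala.concurrent.Future
import akka.Done
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.common.errors.RecordTooLargeException

val done: Future[Done] =
  Source(records)
    .via(flowWithMetadata(producer))
    .mapAsync(parallelism = 1)(identity) // a failed send fails the stream here
    .runWith(Sink.ignore)

// The materialized value carries the concrete producer exception.
done.failed.foreach {
  case e: RecordTooLargeException => println(s"record too large: $e")
  case other                      => println(s"stream failed: $other")
}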
@patriknw re: errors, that sounds good. Never mind, I was looking at some code and mixed some things up, time to sleep.
I am glad to see a detailed API without GraphDSL at all. Looks very promising 👍

Looks like Option 3 can be implemented on top of Option 1. My initial idea of an API for reactive-kafka is to separate it into a low level and a high level API. The low level API is just a stream based abstraction on top of the Kafka client abstraction; it does not have any shortcuts (like …). The high level API is designed to provide a smooth user experience and is built on top of the low level API. This is Option 3.

What if we keep both APIs?
case class ConsumerShape[K, V](
  commit: Inlet[CommitMsg],
  messages: Outlet[ConsumerRecord[K, V]],
  confirmation: Outlet[CommitConfirmation]
Why is this needed? Can't this be handled by the Consumer internally? Also, if the commit is lost, won't the Consumer replay the message eventually anyway?
To provide at-most-once delivery you have to know that your offset is committed.
I don't understand this scenario, but I haven't used Kafka before. Can you give me a scenario with steps for how this could work? I always find it strange if an ack is acked, especially if the side that receives the final ack is not the one that can do the recovery (replay).
Assume we have a stream process which is represented as A => B. We need at-most-once delivery. For me the implementation would look like:

- fetch message
- commit offset and wait for commit confirmation
- perform A => B
- publish to next topic

I may be wrong about the implementation, so please correct me; a rough sketch of these steps follows below.
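A rough sketch of those steps under one possible shape, where each consumed message carries a side-channel commit handle; CommittableMessage, process and publish are illustrative assumptions, not the proposed API:

import scala.concurrent.{ExecutionContext, Future}
import akka.{Done, NotUsed}
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.clients.producer.ProducerRecord

// Hypothetical message type: the consumed key/value plus a commit handle.
final case class CommittableMessage[K, V](key: K, value: V, commit: () => Future[Done])

// At-most-once: commit first, process and publish only after the commit is
// confirmed. A crash in between loses the message; it is never redelivered.
def atMostOnce[K, V](
    source: Source[CommittableMessage[K, V], NotUsed], // 1. fetch messages
    process: V => V,                                   // the A => B step
    publish: ProducerRecord[K, V] => Future[Done]
)(implicit ec: ExecutionContext, mat: Materializer): Future[Done] =
  source
    .mapAsync(1)(msg => msg.commit().map(_ => msg))    // 2. commit offset, wait for confirmation
    .map(msg => new ProducerRecord("next-topic", msg.key, process(msg.value))) // 3. perform A => B
    .mapAsync(1)(publish)                              // 4. publish to next topic
    .runWith(Sink.ignore)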
Ok, I see. But I don't see why we should expose this to users; in at-most-once delivery there is already the option of losing messages, so I don't see why the Source itself cannot handle this internally. In other words, what does exposing this extra confirmation buy compared to simply handling it in the source itself?
Sounds interesting. Do we then agree that we should go forward with Option 3 as the "high level" API and explore an appropriate low level API?
@patriknw yes, I agree that we need an API with 20% of the complexity that solves 80% of the tasks. But I am not sure about rich messages with commit on board. Here is why: #113 (comment).
P.S. Not sure != don't like. I am just not sure and am trying to find out why.
Thanks for all the great thoughts. Regarding rich messages, we don't intend to hide the metadata, such as topic, partition and offset. The user can act on that information to decide when to invoke commitOffset. The commitOffset method would talk to the Source via a side channel (probably implemented with a Promise).
The reason we prefer a side channel (the commitOffset method) over a stream based inlet for commit messages is only that we think it would be too complicated for the user to wire up such graphs. A sketch of the side-channel idea follows below.
@patriknw, @drewhk, @ktoso, @rkuhn could you also please comment on #113 (comment)? Am I right about the stage lifecycle contract?
Yes, of course the stage lifecycle must match the Kafka client lifecycle. But this has no influence on the API, the issues are the same, no matter whether you use streams or an async notification backchannel (i.e. there is an inherent problem in shutting down a KafkaSource while waiting for outstanding confirmations—because these might never arrive).
The thing is that the reverse direction works as well: you can implement Option 1 on top of Option 3.
I also think that we will need both styles of API, but I would opt for Option 3 first. Also, there will always be use cases which we don't cover; I would be happy if we could keep complexity low.
Thanks for a very valuable discussion. Implementation will follow in #115.
As a complement to the descriptions in akka/akka-meta#13 and akka/akka-meta#14 I have created API stubs for options 1 and 3 with example usage.
Let's discuss!
What other typical usage scenarios are there?