add Header support to filter? #1
Hey Jiming! So you're suggesting adjusting the interface to look something like this?

/**
 * Define the filter behavior.
 * A return value of TRUE means the record WILL be shown.
 * A return value of FALSE means the record will NOT be shown.
 *
 * @param topic Name of the topic the message came from.
 * @param partition Partition the message came from.
 * @param offset Offset the message came from.
 * @param key Deserialized key object.
 * @param value Deserialized value object.
 * @param headers Headers associated with the record.
 * @return True means the record WILL be shown. False means the record will NOT be shown.
 */
boolean includeRecord(final String topic, final int partition, final long offset, final Object key, final Object value, final Map<String, byte[]> headers);
Actually, it looks like there's no constraint in Kafka's current implementation of Headers to ensure that header keys are unique, so perhaps using a Map here is not the right answer.
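(For illustration only, not part of the original thread: a minimal sketch of that duplicate-key behavior using Kafka's Headers; the "trace-id" header key and its values are made-up examples.)

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class DuplicateHeaderKeysSketch {
    public static void main(final String[] args) {
        final RecordHeaders headers = new RecordHeaders();
        headers.add("trace-id", "first".getBytes(StandardCharsets.UTF_8));
        headers.add("trace-id", "second".getBytes(StandardCharsets.UTF_8)); // the same key may be added again

        // Iterate every value stored under the key.
        for (final Header header : headers.headers("trace-id")) {
            System.out.println(new String(header.value(), StandardCharsets.UTF_8));
        }

        // Or read only the most recently added value.
        final Header last = headers.lastHeader("trace-id");
        System.out.println(new String(last.value(), StandardCharsets.UTF_8));
    }
}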
@Crim, I think you can add a method without the key and value, and have it executed before the existing one. Headers usually contain much smaller values. In that case:

// you could use Kafka's Headers type instead of a Map here
boolean includeRecordByHeader(final String topic, final int partition, final long offset, final Map<String, byte[]> headers) {
    if (headers.containsKey("isMan") && "true".equals(new String(headers.get("isMan")))) {
        return true;
    } else {
        return false;
    }
}
So the RecordFilter is invoked after the key & message have already been deserialized by the underlying Kafka consumer; it's injected using Kafka's interceptor interface.
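(For readers unfamiliar with that injection point, here is a rough, hypothetical sketch of a header check inside Kafka's ConsumerInterceptor.onConsume. The class name and the "isMan" header key are assumptions for illustration, not this project's actual code.)

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class HeaderFilteringInterceptor implements ConsumerInterceptor<Object, Object> {

    @Override
    public ConsumerRecords<Object, Object> onConsume(final ConsumerRecords<Object, Object> records) {
        // Records are already deserialized at this point; keep only those carrying the expected header.
        final Map<TopicPartition, List<ConsumerRecord<Object, Object>>> kept = new HashMap<>();
        for (final ConsumerRecord<Object, Object> record : records) {
            if (record.headers().lastHeader("isMan") != null) { // hypothetical header key
                kept.computeIfAbsent(new TopicPartition(record.topic(), record.partition()), tp -> new ArrayList<>())
                    .add(record);
            }
        }
        return new ConsumerRecords<>(kept);
    }

    @Override
    public void onCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) { /* no-op */ }

    @Override
    public void close() { /* no-op */ }

    @Override
    public void configure(final Map<String, ?> configs) { /* no-op */ }
}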
I see. Then perhaps going with your proposal is fine, only changing the headers parameter to Kafka's Headers type:

boolean includeRecord(final String topic, final int partition, final long offset, final Object key, final Object value, final Headers headers);
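(To make that signature concrete, a hypothetical implementation sketch; the class name and the "isMan" header key are borrowed from the earlier comment for illustration and are not part of the project.)

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

public class IsManRecordFilter {
    /**
     * Keeps only records whose last "isMan" header equals "true".
     */
    public boolean includeRecord(final String topic, final int partition, final long offset,
                                 final Object key, final Object value, final Headers headers) {
        final Header header = headers.lastHeader("isMan");
        return header != null && "true".equals(new String(header.value(), StandardCharsets.UTF_8));
    }
}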
Hi,
Kafka already supports headers on records, which is a very useful feature for filtering messages. What about adding header support to the RecordFilter?
Thanks!
Jiming