Commit 82b39ab

v2.5.0
1 parent: c7a1d6a

6 files changed: +285 −5 lines changed

Advanced/Admin-API.md

+35 −3
@@ -389,7 +389,7 @@ end
## Renaming a Consumer Group

-!!! Warning "Never rename active consumer groups"
+!!! Warning "Never Rename Active Consumer Groups"

This method should **not** be used on actively running consumer groups, as it involves creating a temporary consumer to handle offset migration. Running this operation on active groups may cause unexpected behavior.

@@ -409,9 +409,41 @@ When using `rename_consumer_group`, the method ensures that offsets from the old
If the new consumer group already exists, the offsets from the old group will be merged into it. This may result in the continuation of message processing from the combined offsets, so plan accordingly.

## Copying a Consumer Group

!!! warning "Never Copy Active Consumer Groups"

This method should **not** be used on actively running consumer groups, as it involves creating a temporary consumer to handle offset migration. Running this operation on active groups may cause unexpected behavior.

The `#copy_consumer_group` method in Karafka Admin API allows you to copy offsets from an existing consumer group to another while preserving its consumption state for specific topics. This functionality is useful when creating a duplicate consumer group with the same consumption progress as an existing one.

```ruby
Karafka::Admin.copy_consumer_group(
  'source_group_name',
  'target_group_name',
  ['topic1', 'topic2']
)
```

When using `#copy_consumer_group`, the method ensures that offsets from the source consumer group are transferred to the target one, maintaining continuity in message consumption. You need to specify which topics should have their offsets copied during the process, giving you control over what gets migrated.

!!! Tip "Offset Merger with Existing Consumer Groups"

If the target consumer group already exists, the offsets from the source group will be merged into it. This may result in the continuation of message processing from the combined offsets, so plan accordingly.

The method returns `true` if offsets were successfully copied or `false` if there was nothing to copy (for example, if the source consumer group doesn't exist or has no committed offsets for the specified topics).

This functionality is particularly useful for:

- Creating backup consumer groups before making significant changes
- Testing new consumer configurations with the same consumption progress
- Setting up disaster recovery scenarios

Unlike `#rename_consumer_group`, this method preserves the source consumer group, allowing both groups to exist simultaneously.
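For illustration, here is a minimal sketch of a backup-before-change flow that checks the boolean return value. The group and topic names are hypothetical; only the `Karafka::Admin.copy_consumer_group` call itself comes from the documented API above.

```ruby
# Hypothetical backup flow: duplicate the committed offsets of a stopped
# consumer group before deploying a risky change. Names are illustrative.
copied = Karafka::Admin.copy_consumer_group(
  'orders_processor',          # source group (must not be running)
  'orders_processor_backup',   # target group that receives the offsets
  ['orders_states']            # only these topics get their offsets copied
)

# `false` means there was nothing to copy (missing group or no committed offsets)
puts 'Nothing to back up' unless copied
```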
## Deleting a Consumer Group

-!!! warning "Never delete active consumer groups"
+!!! warning "Never Delete Active Consumer Groups"

This method should only be used for consumer groups **not** actively used. Deleting a consumer group that is currently in use (running) can lead to data loss, inconsistencies, or unexpected behavior in your Kafka cluster.

@@ -425,7 +457,7 @@ Karafka::Admin.delete_consumer_group('your_consumer_group_name')
## Changing an Offset of a Consumer Group

-!!! warning "Never alter active consumer groups"
+!!! warning "Never Alter Active Consumer Groups"

This method should only be used for consumer groups **not** actively used. Altering a consumer group that is currently in use (running) can lead to data loss, inconsistencies, or unexpected behavior in your Kafka cluster.

Advanced/Concurrency-and-Multithreading.md

+21
@@ -181,3 +181,24 @@ Karafka provides an advanced operation mode known as Swarm, designed to optimize
In Swarm Mode, Karafka forks multiple independent processes, each capable of running concurrently. This approach allows the framework to manage and supervise these processes effectively, ensuring high availability and resilience. By doing so, Karafka can better distribute the workload across available CPU cores, minimizing bottlenecks and maximizing processing speed.

Swarm has its own section. You can read about it [here](Swarm-Multi-Process).

## Setting Thread Priority

Karafka supports explicit thread priority configuration. Adjusting thread priorities can mitigate performance issues caused by mixed workloads, particularly by reducing latency when running IO-bound and CPU-bound tasks concurrently.

Karafka processing threads have a default priority set to `-1`. Lowering this priority further can significantly reduce tail latency for IO-bound tasks, ensuring more balanced resource allocation, especially in scenarios with CPU-intensive workloads that could monopolize the Global VM Lock (GVL).

```ruby
class KarafkaApp < Karafka::App
  setup do |config|
    # Lower worker thread priority to prevent CPU-bound tasks from starving IO-bound threads
    config.worker_thread_priority = -3
  end
end
```
Negative priority values (such as the `-3` used above) are most beneficial in scenarios where CPU-intensive workloads could otherwise monopolize the Global VM Lock (GVL) and increase tail latency for IO-bound tasks.

!!! tip "Thread Priority and GVL"

Ruby employs a Global VM Lock (GVL) that ensures only one thread executes Ruby code at a time. The Ruby VM switches threads roughly every 100ms (thread quantum) unless the GVL is explicitly released (such as during IO operations). CPU-intensive tasks holding the GVL for the entire quantum period can significantly increase latency for other threads, especially those performing quick IO tasks. Adjusting thread priority mitigates this issue by influencing scheduling decisions and allowing shorter, IO-bound threads more frequent access to the CPU.
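To make the effect concrete, below is a minimal plain-Ruby sketch (not Karafka internals): a CPU-bound thread competing with an IO-bound thread under the GVL, with the CPU-bound thread de-prioritized so the IO-bound thread gets scheduled more often.

```ruby
# Plain-Ruby illustration only. Lowering the CPU-bound thread's priority
# gives the IO-bound thread more frequent access to the scheduler.
cpu_thread = Thread.new do
  Thread.current.priority = -3       # de-prioritize CPU-heavy work
  100_000.times { (1..100).reduce(:*) } # keeps the GVL busy
end

io_thread = Thread.new do
  10.times { sleep(0.01) } # releases the GVL while sleeping
end

[cpu_thread, io_thread].each(&:join)
```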

Consuming-Messages.md

+1 −1
@@ -348,7 +348,7 @@ iterator = ::Karafka::Pro::Iterator.new(
)

iterator.each do |message|
-  # Cast to integer because headers are always string
+  # Cast to integer because headers are always strings or arrays of strings
  next unless message.headers['user-id'].to_i == 5

  user_5_events << message
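Since a header value may also arrive as an array of strings, a more defensive variant of this check (illustrative only, reusing the `iterator` and `user_5_events` from the example above) could normalize the value first:

```ruby
iterator.each do |message|
  # Array() wraps a single string and leaves an array of strings untouched
  user_ids = Array(message.headers['user-id']).map(&:to_i)
  next unless user_ids.include?(5)

  user_5_events << message
end
```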

Pro/Enhanced-Dead-Letter-Queue.md

+34
@@ -356,3 +356,37 @@ When implementing a custom DLQ strategy in Karafka, the `#call` method is expect
</tr>
</tbody>
</table>

## Dynamic DLQ Target Topic

Karafka Pro also supports the dynamic determination of the DLQ target topic. This feature is useful when the target DLQ topic may vary depending on runtime conditions or message metadata.

To enable dynamic DLQ target topics, set the `topic:` option to `:strategy` in your routing configuration. Your strategy class's `#call` method should then return an array instead of a single symbol:

- The first element is the symbol representing the action (`:retry`, `:dispatch`, `:skip`).
- The second element specifies the dynamically determined target DLQ topic.

```ruby
class DynamicDlqStrategy
  def call(errors_tracker, attempt)
    if errors_tracker.last.is_a?(SpecialError)
      [:dispatch, 'dlq_topic_for_specials']
    else
      [:dispatch, 'dlq_topic_for_anything_else']
    end
  end
end

class KarafkaApp < Karafka::App
  routes.draw do
    topic :orders_states do
      consumer OrdersStatesConsumer

      dead_letter_queue(
        topic: :strategy,
        strategy: DynamicDlqStrategy.new
      )
    end
  end
end
```
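As a further, purely hypothetical sketch, the `attempt` argument from the `#call` signature could be combined with the error class to pick the target topic. The topic names and the threshold are made up; only the return shape (`[action, topic]`) follows the contract described above.

```ruby
require 'json'

# Hypothetical strategy: exhausted messages go to a shared topic, parse
# failures to a dedicated one, everything else to a generic DLQ topic.
class AttemptAwareDlqStrategy
  def call(errors_tracker, attempt)
    return [:dispatch, 'dlq_exhausted'] if attempt >= 5

    if errors_tracker.last.is_a?(JSON::ParserError)
      [:dispatch, 'dlq_malformed_payloads']
    else
      [:dispatch, 'dlq_generic']
    end
  end
end
```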

Pro/Virtual-Partitions.md

+137 −1
@@ -82,12 +82,27 @@ Below is a list of arguments the `#virtual_partitions` topic method accepts.
<td><code>#call</code></td>
<td>Reducer for VPs key. It allows for a custom reducer to achieve enhanced parallelization when the default reducer is insufficient.</td>
</tr>
<tr>
<td><code>distribution</code></td>
<td>Symbol</td>
<td>
Strategy used to distribute messages across virtual partitions:
<ul style="margin-top: 10px;">
<li>
<code>:consistent</code> (default) ensures messages with the same key always go to the same virtual partition, maintaining consistency across batches.
</li>
<li>
<code>:balanced</code> distributes work evenly across workers while preserving message order within key groups, improving utilization by up to 50% for uneven workloads.
</li>
</ul>
</td>
</tr>
</tbody>
</table>

## Messages Distribution

-Message distribution is based on the outcome of the `virtual_partitions` settings. Karafka will make sure to distribute work into jobs with a similar number of messages in them (as long as possible). It will also take into consideration the current `concurrency` setting and the `max_partitions` setting defined within the `virtual_partitions` method.
+Message distribution is based on the outcome of the `virtual_partitions` settings. Karafka will make sure to distribute work into jobs with a similar number of messages in them (as long as possible). It will also take into consideration the current `concurrency` setting, the `max_partitions` setting defined within the `virtual_partitions` method, and the configured `distribution` strategy.

Below is a diagram illustrating an example partitioning flow of a single partition's data. Each job will be picked by a separate worker and executed in parallel (or concurrently when IO is involved).

@@ -187,6 +202,127 @@ routes.draw do
end
```

### Distribution Strategies

Karafka's Virtual Partitions feature provides two distribution strategies to determine how messages are allocated across consumer instances:

- `:consistent` (default)
- `:balanced`

These strategies give you flexibility in optimizing message distribution based on your specific workload characteristics and processing approach.

#### Consistent Distribution (Default)

By default, Karafka uses a consistent distribution strategy that ensures messages with the same partitioner result are always assigned to the same virtual partition consumer. This provides predictable and stable message routing, particularly important for stateful processing or when message order within a key group must be preserved across multiple batches.

```ruby
routes.draw do
  topic :orders_states do
    consumer OrdersStatesConsumer

    virtual_partitions(
      partitioner: ->(message) { message.headers['order_id'] },
      # Default - each key always gets routed to the same virtual partition
      # This provides consistent multi-batch distribution
      distribution: :consistent
    )
  end
end
```

The consistent distribution strategy ensures that (a small illustration follows this list):

1. The same virtual partition always processes messages with the same partitioner outcome
2. Distribution remains stable between batches
3. Per-key ordering is strictly maintained
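For intuition, here is a minimal sketch of a stable key-to-virtual-partition mapping. It assumes a simple CRC-based reduction and is not necessarily Karafka's exact default reducer; the key and partition count are illustrative.

```ruby
require 'zlib'

# Illustration only: a stable hash keeps a given key on the same virtual
# partition id, no matter which other keys appear in the batch.
def consistent_virtual_partition(key, max_partitions)
  Zlib.crc32(key.to_s) % max_partitions
end

consistent_virtual_partition('order-123', 5) # => same id on every call
```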
However, consistent distribution can sometimes lead to suboptimal resource utilization when certain keys contain significantly more messages than others, potentially leaving some worker threads idle while others are overloaded.

#### Balanced Distribution

Karafka also supports a balanced distribution strategy that dynamically distributes workloads across available workers, potentially improving resource utilization by up to 50%. This strategy prioritizes even work distribution while maintaining message order within each key group.

```ruby
routes.draw do
  topic :orders_states do
    consumer OrdersStatesConsumer

    virtual_partitions(
      partitioner: ->(message) { message.headers['order_id'] },
      # Balanced distribution for more even workload distribution
      distribution: :balanced
    )
  end
end
```

The balanced distribution strategy operates as follows (a sketch of this assignment follows the list):

1. Messages are grouped by their partition key (as determined by the partitioner)
2. Key groups are sorted by size (number of messages) in descending order
3. Each key group is assigned to the worker with the least current workload
4. Messages within each group maintain their offset order
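Below is a minimal sketch of this greedy assignment. It is illustrative only and not Karafka's internal code; the method name and arguments are hypothetical.

```ruby
# Greedy "balanced" assignment: group messages by partitioner key, sort key
# groups by size (descending), and always hand the next group to the virtual
# partition with the least work assigned so far.
def balanced_assign(messages, workers_count, partitioner)
  groups = messages.group_by { |message| partitioner.call(message) }

  loads = Array.new(workers_count, 0)        # per-worker message counts
  assignments = Array.new(workers_count) { [] }

  groups.values.sort_by { |group| -group.size }.each do |group|
    target = loads.index(loads.min)          # least-loaded worker
    assignments[target].concat(group)        # offset order inside the group is kept
    loads[target] += group.size
  end

  assignments
end
```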
This approach ensures that:

- Larger message groups are processed first
- Work is distributed more evenly across available workers
- Message order within each key group is preserved within a single batch
- All available worker threads are utilized effectively

##### Important Considerations for Balanced Distribution
When using the balanced distribution strategy, keep in mind:

- **Cross-batch assignment is not guaranteed** - Unlike consistent distribution, the same key may be assigned to different virtual partitions across different batches
- **Stateful processing considerations** - If your consumer maintains state for specific keys across multiple batches, consistent distribution may still be more appropriate
- **Messages with the same key are never split** - While keys may be assigned to different virtual partitions in different batches, all messages with the same key in a single batch will be processed together

#### Choosing the Right Distribution Strategy

Consider these factors when selecting a distribution strategy:

<table border="1">
<thead>
<tr>
<th>Use <code>:consistent</code> when:</th>
<th>Use <code>:balanced</code> when:</th>
</tr>
</thead>
<tbody>
<tr>
<td>Processing requires stable assignment of keys to workers across batches</td>
<td>Processing is stateless or state is managed externally</td>
</tr>
<tr>
<td>You're implementing window-based aggregations spanning multiple polls</td>
<td>Maximizing worker thread utilization is a priority</td>
</tr>
<tr>
<td>Predictable routing is more important than even utilization</td>
<td>Message keys have highly variable message counts</td>
</tr>
<tr>
<td>Keys have relatively similar message counts</td>
<td>You want to optimize for throughput with uneven workloads</td>
</tr>
</tbody>
</table>

#### Performance Comparison

The balanced distribution strategy can significantly improve resource utilization in high-throughput scenarios with uneven message distribution. Internal benchmarks show improvements of up to 50% in throughput for workloads where:

- Message keys have highly variable message counts
- Processing is IO-bound (such as database operations)
- Worker threads would otherwise be underutilized with consistent distribution

The performance gains are most significant when:

1. Some keys contain many more messages than others
2. The total number of keys is greater than the number of available worker threads
3. Message processing involves IO operations that can benefit from concurrent execution

## Managing Number of Virtual Partitions

By default, Karafka will create at most `Karafka::App.config.concurrency` concurrent Virtual Partitions. This approach allows Karafka to occupy all the threads under optimal conditions.

WaterDrop/Usage.md

+57
@@ -51,6 +51,63 @@ Here are all the things you can provide in the message hash:
Keep in mind that the message you want to send should be either binary or stringified (`to_s`, `to_json`, etc.).

## Headers

Kafka headers allow you to attach key-value metadata to messages, which can be helpful for routing, filtering, tracing, and more. WaterDrop supports headers via the `headers:` key in message hashes.

### Format

Kafka headers are optional and must be provided as a `Hash`. According to [KIP-82](https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers), each header key must be a string, and each value must be either:

- a **string**, or
- an **array of strings**.

This means WaterDrop supports both forms:

```ruby
# Single value per header
headers: {
  'request-id' => '123abc',
  'source' => 'payment-service'
}
```

```ruby
# Multiple values per header key (KIP-82-compliant)
headers: {
  'flags' => ['internal', 'async'],
  'source' => ['payment-service']
}
```

### Example Usage

#### Sync with headers

```ruby
producer.produce_sync(
  topic: 'my-topic',
  payload: 'payload-with-headers',
  headers: {
    'request-id' => 'abc-123',
    'tags' => ['blue', 'fast']
  }
)
```

#### Async with headers

```ruby
producer.produce_async(
  topic: 'my-topic',
  payload: 'payload-with-headers',
  headers: {
    'tenant-id' => 'tenant-42',
    'features' => ['beta', 'test']
  }
)
```
## Delivery Results

When dispatching messages using WaterDrop, you can choose between receiving a delivery report or a delivery handle, depending on whether you perform synchronous or asynchronous dispatches.
