# Incremental Computation

When eKuiper applies aggregate functions to data within a window, the previous implementation split the continuous stream into windows according to the window definition and cached the data in memory. Only when the window closed was all of its data aggregated. The problem with this approach is that caching the raw data in memory before aggregation can amplify memory usage and cause OOM (out-of-memory) errors.

eKuiper now supports incremental computation for aggregate functions within a window, provided the aggregate function supports it. As stream data enters the window, the incremental computation folds each record into an intermediate state, so the full window of data no longer needs to be cached in memory.

You can check which aggregate functions support incremental computation in the [aggregate function list](../../sqls/functions/aggregate_functions.md).

## Enabling Incremental Computation

In the following scenario, we use `count` for aggregate computation within a window:

```json
{
  "id": "rule",
  "sql": "SELECT count(*) from demo group by countwindow(4)",
  "actions": [
    {
      "log": {}
    }
  ],
  "options": {}
}
```

For the above rule, we can query its execution plan through the [explain API](../../api/restapi/rules.md#query-rule-plan):

```txt
{"op":"ProjectPlan_0","info":"Fields:[ Call:{ name:count, args:[*] } ]"}
{"op":"WindowPlan_1","info":"{ length:4, windowType:COUNT_WINDOW, limit: 0 }"}
{"op":"DataSourcePlan_2","info":"StreamName: demo"}
```

This plan shows that when the rule runs, data is cached in memory until the window closes and is only then aggregated, which may cause high memory consumption.

We can enable incremental computation in the `options`, as shown in the following rule example:
|
||
```json | ||
{ | ||
"id": "rule", | ||
"sql": "SELECT count(*) from demo group by countwindow(4)", | ||
"actions": [ | ||
{ | ||
"log": { | ||
} | ||
} | ||
], | ||
"options" :{ | ||
"planOptimizeStrategy": { | ||
"enableIncrementalWindow": true | ||
} | ||
} | ||
} | ||
``` | ||

Then, check the execution plan again:

```txt
{"op":"ProjectPlan_0","info":"Fields:[ Call:{ name:bypass, args:[$$default.inc_agg_col_1] } ]"}
{"op":"IncAggWindowPlan_1","info":"wType:COUNT_WINDOW, funcs:[Call:{ name:inc_count, args:[*] }->inc_agg_col_1]"}
{"op":"DataSourcePlan_2","info":"StreamName: demo, StreamFields:[ inc_agg_col_1 ]"}
```

The execution plan has changed from `WindowPlan` to `IncAggWindowPlan`, indicating that data entering this window is computed immediately rather than cached in memory.
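The `inc_count` state above is just a counter; other incrementally computable aggregates keep slightly richer intermediate state. As a sketch (again illustrative, not eKuiper's code), an incremental `avg` can be maintained with a running sum and count, analogous to the generated `inc_agg_col_1` column in the plan:

```go
package main

import "fmt"

// incAvgState sketches the intermediate state an incremental avg could keep:
// a running sum and count, updated per event, with the final result derived
// only when the window closes. Illustrative only; this mirrors the idea
// behind plan columns like inc_agg_col_1, not eKuiper's actual code.
type incAvgState struct {
	sum   float64
	count int
}

// add folds one incoming event into the intermediate state.
func (s *incAvgState) add(v float64) {
	s.sum += v
	s.count++
}

// result derives the final aggregate when the window closes.
func (s *incAvgState) result() float64 {
	return s.sum / float64(s.count)
}

func main() {
	var s incAvgState
	for _, v := range []float64{1, 2, 3, 4} { // one countwindow(4) worth of events
		s.add(v)
	}
	fmt.Println(s.result()) // prints 2.5
}
```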

## Scenarios Where Incremental Computation Cannot Be Used

When a query contains an aggregate function that inherently cannot be computed incrementally, enabling incremental computation has no effect, as the following rule shows:
|
||
```json | ||
{ | ||
"id": "rule", | ||
"sql": "SELECT count(*), stddev(a) from demo group by countwindow(4)", | ||
"actions": [ | ||
{ | ||
"log": { | ||
} | ||
} | ||
], | ||
"options" :{ | ||
"planOptimizeStrategy": { | ||
"enableIncrementalWindow": true | ||
} | ||
} | ||
} | ||
``` | ||

Check the execution plan:
```txt
{"op":"ProjectPlan_0","info":"Fields:[ Call:{ name:count, args:[*] }, Call:{ name:stddev, args:[demo.a] } ]"}
{"op":"WindowPlan_1","info":"{ length:4, windowType:COUNT_WINDOW, limit: 0 }"}
{"op":"DataSourcePlan_2","info":"StreamName: demo"}
```

Because `stddev` is an aggregate function that does not support incremental computation, the execution plan for this rule does not enable incremental computation and falls back to the ordinary `WindowPlan`.
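To see why such a fallback happens, consider the textbook two-pass formula for standard deviation: the mean must be known before any squared deviation can be accumulated, so every value of the window has to be available at once. A minimal Go sketch of that two-pass shape (illustrative only, not eKuiper's `stddev` implementation):

```go
package main

import (
	"fmt"
	"math"
)

// twoPassStddev computes the population standard deviation with the
// textbook two-pass formula. It needs the entire window at once: the mean
// from the first pass is required before the second pass can sum squared
// deviations. Aggregates implemented this way force the engine to buffer
// the whole window rather than keep a small intermediate state.
// Illustrative sketch only, not eKuiper's actual stddev code.
func twoPassStddev(values []float64) float64 {
	mean := 0.0
	for _, v := range values { // pass 1: compute the mean
		mean += v
	}
	mean /= float64(len(values))

	sumSq := 0.0
	for _, v := range values { // pass 2: sum squared deviations from the mean
		sumSq += (v - mean) * (v - mean)
	}
	return math.Sqrt(sumSq / float64(len(values)))
}

func main() {
	fmt.Println(twoPassStddev([]float64{2, 4, 4, 4, 5, 5, 7, 9})) // prints 2
}
```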