feat: transfer dedup field optimization #1010158081121734120
wencong1724427771 committed Jan 21, 2025
1 parent 921e4f9 commit a50144a
Showing 10 changed files with 128 additions and 69 deletions.
14 changes: 14 additions & 0 deletions bklog/apps/log_databus/handlers/collector_scenario/base.py
@@ -97,6 +97,20 @@ def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT)
"""
raise NotImplementedError()

@staticmethod
def get_unique_field_list(field_list: list, target_fields: list, sort_fields: list):
"""
获取唯一字段列表
:param field_list: 字段列表
:param target_fields: 定位字段
:param sort_fields: 排序字段
"""
if target_fields:
field_list.extend(target_fields)
if sort_fields:
field_list.extend(sort_fields)
return sorted(set(field_list))

    @classmethod
    def change_data_stream(
        cls, collector_config: CollectorConfig, mq_topic: Optional[str] = None, mq_partition: int = 1
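For reference, a minimal usage sketch of the new helper (the field values below are illustrative, not taken from this diff):

from apps.log_databus.handlers.collector_scenario.base import CollectorScenario

# Merge the scenario's built-in dedup fields with user-configured
# target (locating) and sort fields, then de-duplicate and sort.
unique_fields = CollectorScenario.get_unique_field_list(
    field_list=["cloudId", "serverIp", "gseIndex", "dtEventTimeStamp"],
    target_fields=["serverIp", "path"],  # illustrative locating fields
    sort_fields=["gseIndex"],            # illustrative sort fields
)
print(unique_fields)
# ['cloudId', 'dtEventTimeStamp', 'gseIndex', 'path', 'serverIp']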
26 changes: 16 additions & 10 deletions bklog/apps/log_databus/handlers/collector_scenario/custom.py
@@ -19,27 +19,33 @@
We undertake not to change the open source license (MIT license) applicable to the current version of
the project delivered to anyone in the future.
"""
from django.utils.translation import ugettext as _

from apps.log_databus.constants import EtlConfig
from apps.log_databus.handlers.collector_scenario.base import CollectorScenario
from django.utils.translation import ugettext as _


class CustomCollectorScenario(CollectorScenario):
    @classmethod
    def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT):
    def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT, **kwargs):
        """
        Get the collector's built-in standard fields
        """
        unique_field_list = cls.get_unique_field_list(
            field_list=[
                "cloudId",
                "serverIp",
                "path",
                "gseIndex",
                "iterationIndex",
                "dtEventTimeStamp",
            ],
            target_fields=kwargs.get("target_fields"),
            sort_fields=kwargs.get("sort_fields"),
        )
        return {
            "option": {
                "es_unique_field_list": [
                    "cloudId",
                    "serverIp",
                    "path",
                    "gseIndex",
                    "iterationIndex",
                    "dtEventTimeStamp",
                ],
                "es_unique_field_list": unique_field_list,
                "separator_node_source": "",
                "separator_node_action": "",
                "separator_node_name": "",
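A hedged sketch of how a caller can now thread the dedup fields into a scenario's built-in config (argument values are illustrative; without the optional kwargs the behaviour is unchanged):

from apps.log_databus.handlers.collector_scenario.custom import CustomCollectorScenario

# The extra fields are folded into es_unique_field_list, de-duplicated and sorted.
built_in = CustomCollectorScenario.get_built_in_config(
    es_version="7.X",
    target_fields=["serverIp", "path"],  # illustrative
    sort_fields=["gseIndex"],            # illustrative
)
print(built_in["option"]["es_unique_field_list"])
# ['cloudId', 'dtEventTimeStamp', 'gseIndex', 'iterationIndex', 'path', 'serverIp']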
23 changes: 14 additions & 9 deletions bklog/apps/log_databus/handlers/collector_scenario/kafka.py
@@ -224,20 +224,25 @@ def parse_steps(cls, steps):
        return params

    @classmethod
    def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT):
    def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT, **kwargs):
        """
        Get the collector's built-in standard fields
        """
        unique_field_list = cls.get_unique_field_list(
            field_list=[
                "cloudId",
                "serverIp",
                "gseIndex",
                "iterationIndex",
                "bk_host_id",
                "dtEventTimeStamp",
            ],
            target_fields=kwargs.get("target_fields"),
            sort_fields=kwargs.get("sort_fields"),
        )
        return {
            "option": {
                "es_unique_field_list": [
                    "cloudId",
                    "serverIp",
                    "gseIndex",
                    "iterationIndex",
                    "bk_host_id",
                    "dtEventTimeStamp",
                ],
                "es_unique_field_list": unique_field_list,
                "separator_node_source": "",
                "separator_node_action": "",
                "separator_node_name": "",
@@ -106,20 +106,25 @@ def parse_steps(cls, steps):
return {"redis_hosts": []}

@classmethod
def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT):
def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT, **kwargs):
"""
获取采集器标准字段
"""
unique_field_list = cls.get_unique_field_list(
field_list=[
"cloudId",
"serverIp",
"gseIndex",
"iterationIndex",
"bk_host_id",
"dtEventTimeStamp",
],
target_fields=kwargs.get("target_fields"),
sort_fields=kwargs.get("sort_fields"),
)
return {
"option": {
"es_unique_field_list": [
"cloudId",
"serverIp",
"gseIndex",
"iterationIndex",
"bk_host_id",
"dtEventTimeStamp",
],
"es_unique_field_list": unique_field_list,
"separator_node_source": "",
"separator_node_action": "",
"separator_node_name": "",
25 changes: 15 additions & 10 deletions bklog/apps/log_databus/handlers/collector_scenario/row.py
@@ -210,21 +210,26 @@ def parse_steps(cls, steps):
        return params

    @classmethod
    def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT):
    def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT, **kwargs):
        """
        Get the collector's built-in standard fields
        """
        unique_field_list = cls.get_unique_field_list(
            field_list=[
                "cloudId",
                "serverIp",
                "path",
                "gseIndex",
                "iterationIndex",
                "bk_host_id",
                "dtEventTimeStamp",
            ],
            target_fields=kwargs.get("target_fields"),
            sort_fields=kwargs.get("sort_fields"),
        )
        return {
            "option": {
                "es_unique_field_list": [
                    "cloudId",
                    "serverIp",
                    "path",
                    "gseIndex",
                    "iterationIndex",
                    "bk_host_id",
                    "dtEventTimeStamp",
                ],
                "es_unique_field_list": unique_field_list,
                "separator_node_source": "",
                "separator_node_action": "",
                "separator_node_name": "",
25 changes: 15 additions & 10 deletions bklog/apps/log_databus/handlers/collector_scenario/section.py
@@ -219,21 +219,26 @@ def parse_steps(cls, steps):
        return params

    @classmethod
    def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT):
    def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT, **kwargs):
        """
        Get the collector's built-in standard fields
        """
        unique_field_list = cls.get_unique_field_list(
            field_list=[
                "cloudId",
                "serverIp",
                "path",
                "gseIndex",
                "iterationIndex",
                "bk_host_id",
                "dtEventTimeStamp",
            ],
            target_fields=kwargs.get("target_fields"),
            sort_fields=kwargs.get("sort_fields"),
        )
        return {
            "option": {
                "es_unique_field_list": [
                    "cloudId",
                    "serverIp",
                    "path",
                    "gseIndex",
                    "iterationIndex",
                    "bk_host_id",
                    "dtEventTimeStamp",
                ],
                "es_unique_field_list": unique_field_list,
                "separator_node_source": "",
                "separator_node_action": "",
                "separator_node_name": "",
25 changes: 14 additions & 11 deletions bklog/apps/log_databus/handlers/collector_scenario/syslog.py
@@ -19,8 +19,6 @@
We undertake not to change the open source license (MIT license) applicable to the current version of
the project delivered to anyone in the future.
"""
from collections import OrderedDict

from django.utils.translation import ugettext as _

from apps.log_databus.constants import EtlConfig, LogPluginInfo, PluginParamLogicOpEnum
@@ -126,20 +124,25 @@ def parse_steps(cls, steps):
return {"syslog_protocol": "", "syslog_port": 0, "syslog_conditions": []}

@classmethod
def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT):
def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT, **kwargs):
"""
获取采集器标准字段
"""
unique_field_list = cls.get_unique_field_list(
field_list=[
"cloudId",
"serverIp",
"gseIndex",
"iterationIndex",
"bk_host_id",
"dtEventTimeStamp",
],
target_fields=kwargs.get("target_fields"),
sort_fields=kwargs.get("sort_fields"),
)
built_in_config = {
"option": {
"es_unique_field_list": [
"cloudId",
"serverIp",
"gseIndex",
"iterationIndex",
"bk_host_id",
"dtEventTimeStamp",
],
"es_unique_field_list": unique_field_list,
"separator_node_source": "",
"separator_node_action": "",
"separator_node_name": "",
25 changes: 15 additions & 10 deletions bklog/apps/log_databus/handlers/collector_scenario/wineventlog.py
@@ -123,21 +123,26 @@ def parse_steps(cls, steps):
        }

    @classmethod
    def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT):
    def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT, **kwargs):
        """
        Get the collector's built-in standard fields
        """
        unique_field_list = cls.get_unique_field_list(
            field_list=[
                "cloudId",
                "serverIp",
                "winEventId",
                "winEventChannel",
                "winEventRecordId",
                "bk_host_id",
                "dtEventTimeStamp",
            ],
            target_fields=kwargs.get("target_fields"),
            sort_fields=kwargs.get("sort_fields"),
        )
        return {
            "option": {
                "es_unique_field_list": [
                    "cloudId",
                    "serverIp",
                    "winEventId",
                    "winEventChannel",
                    "winEventRecordId",
                    "bk_host_id",
                    "dtEventTimeStamp",
                ],
                "es_unique_field_list": unique_field_list,
                "separator_node_source": "",
                "separator_node_action": "",
                "separator_node_name": "",
7 changes: 7 additions & 0 deletions bklog/apps/log_databus/handlers/etl/transfer.py
@@ -108,6 +108,11 @@ def update_or_create(
        # raise CollectorResultTableIDDuplicateException(
        #     CollectorResultTableIDDuplicateException.MESSAGE.format(result_table_id=table_id)
        # )
        index_set_obj = LogIndexSet.objects.filter(index_set_id=self.data.index_set_id).first()
        if sort_fields is None and index_set_obj:
            sort_fields = index_set_obj.sort_fields
        if target_fields is None and index_set_obj:
            target_fields = index_set_obj.target_fields

        # 1. meta - create/update the result table
        etl_storage = EtlStorage.get_instance(etl_config=etl_config)
@@ -123,6 +128,8 @@ def update_or_create(
            es_version=cluster_info["cluster_config"]["version"],
            hot_warm_config=cluster_info["cluster_config"].get("custom_option", {}).get("hot_warm_config"),
            es_shards=es_shards,
            sort_fields=sort_fields,
            target_fields=target_fields,
        )

        if not view_roles:
4 changes: 4 additions & 0 deletions bklog/apps/log_databus/handlers/etl_storage/base.py
@@ -425,6 +425,8 @@ def update_or_create_result_table(
        hot_warm_config: dict = None,
        es_shards: int = settings.ES_SHARDS,
        index_settings: dict = None,
        sort_fields: list = None,
        target_fields: list = None,
    ):
        """
        Create or update the result table
@@ -440,6 +442,8 @@ def update_or_create_result_table(
        :param hot_warm_config: hot/warm data config
        :param es_shards: number of ES shards
        :param index_settings: index settings
        :param sort_fields: sort fields
        :param target_fields: target (locating) fields
        """
        from apps.log_databus.handlers.collector import build_result_table_id

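The body of update_or_create_result_table is not shown in this diff; a hedged sketch of how the new parameters are presumably forwarded to the scenario's get_built_in_config (the get_instance factory and the wrapper function below are assumptions for illustration, not part of the commit):

from apps.log_databus.handlers.collector_scenario.base import CollectorScenario

def build_built_in_config(collector_scenario_id, es_version, sort_fields=None, target_fields=None):
    # Assumed wiring: resolve the scenario and pass the dedup fields through,
    # so es_unique_field_list picks up the custom target/sort fields.
    scenario = CollectorScenario.get_instance(collector_scenario_id)  # assumed factory method
    return scenario.get_built_in_config(
        es_version=es_version,
        target_fields=target_fields,
        sort_fields=sort_fields,
    )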
