免费网站建设协议,建筑人才网官网 北京,百度网址大全址大全,天津港电子商务网Flink任务优化分享
1.背景介绍
线上计算任务在某版本上线之后发现每日的任务时长都需要三个多小时才能完成，计算时间超过了预估时间，通过Dolphinscheduler的每日调度任务看，在数据层 dwd 的数据分段任务存在严重的性能问题，每天…Flink任务优化分享
1.背景介绍
线上计算任务在某版本上线之后，发现每日的任务时长都需要三个多小时才能完成，计算时间超过了预估时间。通过 Dolphinscheduler 的每日调度任务看，在数据层 dwd 的数据分段任务存在严重的性能问题，每天的计算耗时将近40分钟，并且随着数据量的上涨时间越来越长，因此这个计算节点需要着重优化。
2.改进思路及实施
现在的大数据计算任务是用 Flink 执行的，因此优化的入手点就是从 Flink History Server 上看任务的执行计划，找到耗时较多的节点，以及是否有节点因为 sql 逻辑被重复执行导致耗时较高。
（原文此处为该任务执行计划的截图，图片转存失败，未能显示。）
如图所示，可以发现计算任务走了三个分叉。从 sql 最后的输出来看，只有两个 insert 表操作，所以这里至少有一条分叉是不必要的。然后就是找到分叉点的原因：为什么会导致任务分成了三个分支？这个就需要对着执行计划慢慢去理。界面上可以点开每个节点，看到它的执行计算优化之后的结果，然后来判断这一节点对应了 sql 的哪一步。着重需要判断的就是产生分支的那个节点：
Sort(orderBy[tenant_id ASC, room_id ASC, msg_start_time ASC]
) -
Calc(select[__etl_time__, date_id, tenant_id, brand_id, channel, channel_app_id, channel_session_type, msg_id, msg_start_time, msg_end_time, msg_from_id, msg_from_orig_id, msg_from_nk, msg_from_role, msg_to_ids, msg_to_users, msg_type, msg_content, msg_detail, group_chat_info, dialogue_id, room_id, operation_flags, recording_properties, asr_properties, metric_properties, tags, tag_properties, dialogue_properties, lastMsgEndTime, nextMsgStartTime, is_cut_by_msg_time, is_fit_specific_event, pre_is_fit_specific_event, fit_specific_row,CAST(FROM_UNIXTIME(w0$o0)) AS start_time,CAST(FROM_UNIXTIME(w0$o1)) AS end_time,CAST(w1$o0) AS fit_specific_rows,GenIsFitConsecutiveRowsAndTime(channel_session_type, tenant_id, CAST(w1$o0), CAST(CAST(FROM_UNIXTIME(w0$o0))), CAST(CAST(FROM_UNIXTIME(w0$o1))), is_fit_specific_event) AS is_fit_specific_flag,(is_cut_by_msg_time _UTF-16LE1:VARCHAR(2147483647) CHARACTER SET UTF-16LE) AS $39, // (GenIsFitConsecutiveRowsAndTime(channel_session_type, tenant_id, CAST(w1$o0), CAST(CAST(FROM_UNIXTIME(w0$o0))), CAST(CAST(FROM_UNIXTIME(w0$o1))), is_fit_specific_event) 1) AS $40]
) -
OverAggregate(partitionBy[tenant_id, room_id],orderBy[msg_start_time ASC],window#0[LAG(is_fit_specific_flag) AS w0$o0RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW],select[__etl_time__, date_id, tenant_id, brand_id, channel, channel_app_id, channel_session_type, msg_id, msg_start_time, msg_end_time, msg_from_id, msg_from_orig_id, msg_from_nk, msg_from_role, msg_to_ids, msg_to_users, msg_type, msg_content, msg_detail, group_chat_info, dialogue_id, room_id, operation_flags, recording_properties, asr_properties, metric_properties, tags, tag_properties, dialogue_properties, lastMsgEndTime, nextMsgStartTime, is_cut_by_msg_time, is_fit_specific_event, pre_is_fit_specific_event, fit_specific_row,start_time, end_time, fit_specific_rows, is_fit_specific_flag, $39, // 是否按时间切 $40, // 当前条是否为 1w0$o0 - pre_is_fit_specific_flag // 前一条是否满足特殊规则]
) - (Calc(select[date_id, tenant_id, channel_session_type, msg_id, msg_start_time, room_id, tags,IF(($39 OR (w0$o0 IS NULL AND $40) OR ((w0$o0 is_fit_specific_flag) IS TRUE AND w0$o0 IS NOT NULL)), 1, 0) AS is_cut_flag,CAST(tenant_id) AS $8,CAST(msg_start_time) AS $9,GenCutPointTypeByFeature(channel_session_type, tenant_id, tags) AS $10]) - OverAggregate(partitionBy[tenant_id, room_id],orderBy[msg_start_time ASC],window#0[COUNT(is_cut_flag) AS w0$o0,$SUM0(is_cut_flag) AS w0$o1RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW],select[date_id, tenant_id, channel_session_type, msg_id, msg_start_time, room_id, tags, is_cut_flag, $8, - tenant_id$9, - msg_start_time$10, - 特征切分点: START/END/NOw0$o0, - countw0$o1 - sum]) - Calc(select[CONCAT_WS(_UTF-16LE-, $8, room_id, date_id, CAST(CASE((w0$o0 0:BIGINT), w0$o1, null:INTEGER))) AS dialogue_id1,channel_session_type, tenant_id, msg_id, $9 AS $f4, tags, $10 AS cutPointType]), #####################CREATE TEMPORARY VIEW keep_cutpoint_view ASSELECT dialogue_id1, smoothRes.smoothResultVoMapFROM (SELECT dialogue_id1,smoothCutPoint(channel_session_type, tenant_id, dialogue_id1, msg_id, msg_start_time, tags, cutPointType) AS smoothResFROM gen_cut_type_by_feature_viewGROUP BY dialogue_id1);#####################Calc(select[__etl_time__, date_id, tenant_id, brand_id, channel, channel_app_id, channel_session_type, msg_id, msg_start_time, msg_end_time, msg_from_id, msg_from_orig_id, msg_from_nk, msg_from_role, msg_to_ids, msg_to_users, msg_type, msg_content, msg_detail, group_chat_info, dialogue_id, room_id, operation_flags, recording_properties, asr_properties, metric_properties, tags, tag_properties, dialogue_properties, lastMsgEndTime, nextMsgStartTime, is_cut_by_msg_time, is_fit_specific_event, pre_is_fit_specific_event, fit_specific_row, start_time, end_time, fit_specific_rows, is_fit_specific_flag, w0$o0 AS pre_is_fit_specific_flag, CASE(w0$o0 IS NULL, is_fit_specific_flag, (w0$o0 is_fit_specific_flag), 1, 0) AS is_cut_by_specific, 
IF(($39 OR (w0$o0 IS NULL AND $40) OR ((w0$o0 is_fit_specific_flag) IS TRUE AND w0$o0 IS NOT NULL)), 1, 0) AS is_cut_flag, IF((IF(($39 OR (w0$o0 IS NULL AND $40) OR ((w0$o0 is_fit_specific_flag) IS TRUE AND w0$o0 IS NOT NULL)), 1, 0) 1), _UTF-16LEstart, null:VARCHAR(2147483647) CHARACTER SET UTF-16LE) AS $42,CAST(tenant_id) AS $43, GenCutPointTypeByFeature(channel_session_type, tenant_id, tags) AS $44]) - OverAggregate(partitionBy[tenant_id, room_id],orderBy[msg_start_time ASC],window#0[COUNT(is_cut_flag) AS w0$o0,$SUM0(is_cut_flag) AS w0$o1RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select[__etl_time__, date_id, tenant_id, brand_id, channel, channel_app_id, channel_session_type, msg_id, msg_start_time, msg_end_time, msg_from_id, msg_from_orig_id, msg_from_nk, msg_from_role, msg_to_ids, msg_to_users, msg_type, msg_content, msg_detail, group_chat_info, dialogue_id, room_id, operation_flags, recording_properties, asr_properties, metric_properties, tags, tag_properties, dialogue_properties, lastMsgEndTime, nextMsgStartTime, is_cut_by_msg_time, is_fit_specific_event, pre_is_fit_specific_event, fit_specific_row, start_time, end_time, fit_specific_rows, is_fit_specific_flag, pre_is_fit_specific_flag, is_cut_by_specific, is_cut_flag, $42, - cut_point_type$43, $44, w0$o0, w0$o1]) - Calc(select[date_id, tenant_id, brand_id, channel, channel_app_id, channel_session_type, msg_id, msg_start_time, msg_end_time, msg_from_id, msg_from_orig_id, msg_from_nk, msg_from_role, msg_to_ids, msg_to_users, msg_type, msg_content, msg_detail, group_chat_info, room_id, operation_flags, recording_properties, asr_properties, metric_properties, tags, tag_properties, dialogue_properties, CONCAT_WS(_UTF-16LE-, $43, room_id, date_id, CAST(CASE((w0$o0 0:BIGINT), w0$o1, null:INTEGER))) AS dialogue_id1, $44 AS cutPointType])
)######################
根据时间 + 特殊规则先生成一轮 dialogue_id1
特征的切分点命中被提前下推到这个阶段执行
BUG & 优化点:
1. 平滑使用了 groupBy 再 join 回主表, 导致计算流走了分支, 导致部分计算逻辑重复执行了, 这一部分可以考虑用 over 聚合来做
   gen_fit_sprcific_flag_view -> gen_cut_flag_view -> gen_dialogue_id_by_cut_flag_view -> gen_cut_type_by_feature_view -> keep_cutpoint_view
2. CASE WHEN pre_is_fit_specific_flag IS NULL THEN is_fit_specific_flag
        WHEN pre_is_fit_specific_flag <> is_fit_specific_flag THEN 1
        WHEN pre_is_fit_specific_flag = is_fit_specific_flag THEN 0
        ELSE 0
   END AS is_cut_by_specific
   逻辑不对, 导致 GenIsFitConsecutiveRowsAndTime 被重复执行
3. IF(is_cut_flag = 1, 'start', CAST(NULL AS STRING)) AS cut_point_type, 需要判断一下是否还有必要
5. Sort(orderBy[dialogue_id1 ASC]
) -
SortAggregate(isMerge[false], groupBy[dialogue_id1], select[dialogue_id1, smoothCutPoint(channel_session_type, tenant_id, dialogue_id1, msg_id, $f4, tags, cutPointType) AS smoothRes]
) -
Calc(select[dialogue_id1, smoothRes.smoothResultVoMap AS smoothResultVoMap]
) - (Correlate(invocation[GetCutPointBySplit($cor7.smoothResultVoMap)],correlate[table(GetCutPointBySplit($cor7.smoothResultVoMap))],select[dialogue_id1,smoothResultVoMap,msgId,cutPointMap],rowType[RecordType(VARCHAR(2147483647) dialogue_id1, (VARCHAR(2147483647), (VARCHAR(2147483647), VARCHAR(2147483647)) MAP) MAP smoothResultVoMap,VARCHAR(2147483647) msgId,(VARCHAR(2147483647), VARCHAR(2147483647)) MAP cutPointMap)], joinType[INNER]) - Calc(select[dialogue_id1, msgId, ITEM(cutPointMap, _UTF-16LEisKeep) AS isKeep]),Correlate(invocation[GetCutPointBySplit($cor9.smoothResultVoMap)],correlate[table(GetCutPointBySplit($cor9.smoothResultVoMap))],select[dialogue_id1,smoothResultVoMap,msgId,cutPointMap], rowType[RecordType(VARCHAR(2147483647) dialogue_id1, (VARCHAR(2147483647), (VARCHAR(2147483647), VARCHAR(2147483647)) MAP) MAP smoothResultVoMap, VARCHAR(2147483647) msgId, (VARCHAR(2147483647), VARCHAR(2147483647)) MAP cutPointMap)], joinType[INNER]) - Calc(select[dialogue_id1, msgId, ITEM(cutPointMap, _UTF-16LEisKeep) AS isKeep]), Correlate(invocation[GetCutPointBySplit($cor8.smoothResultVoMap)], correlate[table(GetCutPointBySplit($cor8.smoothResultVoMap))], select[dialogue_id1,smoothResultVoMap,msgId,cutPointMap], rowType[RecordType(VARCHAR(2147483647) dialogue_id1, (VARCHAR(2147483647), (VARCHAR(2147483647), VARCHAR(2147483647)) MAP) MAP smoothResultVoMap, VARCHAR(2147483647) msgId, (VARCHAR(2147483647), VARCHAR(2147483647)) MAP cutPointMap)], joinType[INNER]) -Calc(select[dialogue_id1, msgId, ITEM(cutPointMap, _UTF-16LEisKeep) AS isKeep])
)######################
计算平滑逻辑
优化点:
1. smoothCutPoint 存在很大的性能问题, 改成基于 over 聚合的 udaf, 优化掉 GROUP BY + LATERAL TABLE + JOIN
对应sql如下：
-- Pre-optimization version of the segment-smoothing step:
-- GROUP BY aggregation -> LATERAL TABLE explode -> LEFT JOIN back to the detail rows.
-- NOTE(review): the published text had lost its '=' operators and string quotes;
-- they are restored below from context (the plan shows CONCAT_WS with '-' and
-- ITEM(cutPointMap, 'isKeep')). Confirm against the real job before reuse.

-- Tag every row with its feature-based cut-point type, driven by the configured feature data.
CREATE TEMPORARY VIEW gen_cut_type_by_feature_view AS
SELECT
    *,
    GenCutPointTypeByFeature(channel_session_type, tenant_id, tags) AS cutPointType
FROM gen_dialogue_id_by_cut_flag_view;

-- One row per dialogue_id1 holding the smoothing UDAF result.
CREATE TEMPORARY VIEW keep_cutpoint_view AS
SELECT
    dialogue_id1,
    smoothRes.smoothResultVoMap
FROM (
    SELECT
        dialogue_id1,
        smoothCutPoint(channel_session_type, tenant_id, dialogue_id1, msg_id, msg_start_time, tags, cutPointType) AS smoothRes
    FROM gen_cut_type_by_feature_view
    GROUP BY dialogue_id1
);

-- Explode the aggregated map back into one row per (dialogue_id1, msgId).
CREATE TEMPORARY VIEW keep_cutpoint_breakup AS
SELECT
    dialogue_id1,
    smoothResultVoMap,
    msgId,
    cutPointMap,
    cutPointMap['isKeep'] AS isKeep
FROM keep_cutpoint_view,
    LATERAL TABLE(GetCutPointBySplit(smoothResultVoMap)) AS T(msgId, cutPointMap);

-- Join the per-message smoothing result back onto the detail rows.
-- This join-back is what makes the plan fork and re-run upstream operators.
-- NOTE(review): isKeep is a VARCHAR map value in the plan, so the literal is assumed to be '0' — confirm.
CREATE TEMPORARY VIEW keep_cutpoint_join AS
SELECT
    t1.*,
    t2.isKeep,
    IF(t2.isKeep = '0', 'no', t1.cutPointType) AS curCutPointType,
    msg_start_time
FROM gen_cut_type_by_feature_view t1
LEFT JOIN keep_cutpoint_breakup t2
    ON t1.dialogue_id1 = t2.dialogue_id1
    AND t1.msg_id = t2.msgId;

-- Build the final dialogue_id: a running count of segment starts within (tenant_id, room_id, date_id).
CREATE TEMPORARY VIEW gen_dialogue_id_by_feature_view0 AS
SELECT
    date_id, tenant_id, brand_id, channel, channel_app_id, channel_session_type,
    msg_id, msg_start_time, msg_end_time,
    msg_from_id, msg_from_nk, msg_from_orig_id, msg_from_role,
    msg_to_ids, msg_to_users, msg_type, msg_content, msg_detail,
    group_chat_info, room_id, operation_flags,
    recording_properties, asr_properties, metric_properties,
    tags, tag_properties, dialogue_properties,
    dialogue_id1, cutPointType, curCutPointType, preCutPointType, isKeep,
    CONCAT_WS(
        '-',
        CAST(tenant_id AS STRING),
        room_id,
        date_id,
        CAST(
            -- A new segment starts on the first row, after an 'end' row, or on a 'start' row.
            SUM(IF(preCutPointType IS NULL OR preCutPointType = 'end' OR curCutPointType = 'start', 1, 0))
                OVER (PARTITION BY tenant_id, room_id, date_id ORDER BY msg_start_time)
            AS STRING
        )
    ) AS dialogue_id
FROM (
    SELECT
        date_id, tenant_id, brand_id, channel, channel_app_id, channel_session_type,
        msg_id, msg_start_time, msg_end_time,
        msg_from_id, msg_from_nk, msg_from_orig_id, msg_from_role,
        msg_to_ids, msg_to_users, msg_type, msg_content, msg_detail,
        group_chat_info, room_id, operation_flags,
        recording_properties, asr_properties, metric_properties,
        tags, tag_properties, dialogue_properties,
        dialogue_id1, cutPointType, curCutPointType, isKeep,
        LAG(curCutPointType) OVER (PARTITION BY dialogue_id1 ORDER BY msg_start_time) AS preCutPointType
    FROM keep_cutpoint_join
);

之前 sql 对于这个分段平滑逻辑的实现，是先根据 dialogue_id1 group by 数据，使用 udaf 去得到聚合结果，然后再通过 msg_id 将聚合结果 join 回原来的明细数据里。这种做法就会产生分岔，不仅性能差，而且会重复执行计算节点，导致耗时上升。这种做法在后边的相关性聚合也是差不多的。这样一分析问题就找到了，就是要把“聚合结果 join 回主表”这种做法换一种更高效的方式实现。具体改进思路就是将原来这种方式改成基于 over 聚合的 udaf，优化掉 GROUP BY + LATERAL TABLE + JOIN。
优化之后的sql
-- Optimized version of the segment-smoothing step: the smoothing UDAF runs as an
-- OVER aggregate, so each detail row carries the result and no join-back is needed.
-- NOTE(review): the published text had lost its comparison operators and string quotes;
-- they are restored below from context. Confirm against the real job before reuse.

-- Tag every row with its feature-based cut-point type, driven by the configured feature data.
CREATE TEMPORARY VIEW gen_cut_type_by_feature_view AS
SELECT
    *,
    GenCutPointTypeByFeature(channel_session_type, tenant_id, tags) AS cutPointType
FROM gen_dialogue_id_by_cut_flag_view;

-- Attach the smoothing result to every row of its dialogue_id1 partition, then look up
-- this row's own entry by msg_id.
-- NOTE(review): 'is_keep' is assumed to be a string map key and '0' a string value — confirm
-- against the UDAF's result type.
CREATE TEMPORARY VIEW keep_cutpoint_view AS
SELECT
    *,
    smooth_result[msg_id]['is_keep'] AS isKeep,
    CASE
        WHEN smooth_result[msg_id]['is_keep'] = '0' THEN 'no'
        ELSE cutPointType
    END AS curCutPointType
FROM (
    SELECT
        *,
        smoothCutPoint(channel_session_type, tenant_id, dialogue_id1, msg_id, msg_start_time, tags, cutPointType)
            OVER (PARTITION BY dialogue_id1) AS smooth_result
    FROM gen_cut_type_by_feature_view
);

-- Build the final dialogue_id: a running count of segment starts within (tenant_id, room_id, date_id).
CREATE TEMPORARY VIEW gen_dialogue_id_by_feature_view0 AS
SELECT
    date_id, tenant_id, brand_id, channel, channel_app_id, channel_session_type,
    msg_id, msg_start_time, msg_end_time,
    msg_from_id, msg_from_nk, msg_from_orig_id, msg_from_role,
    msg_to_ids, msg_to_users, msg_type, msg_content, msg_detail,
    group_chat_info, room_id, operation_flags,
    recording_properties, asr_properties, metric_properties,
    tags, tag_properties, dialogue_properties,
    CONCAT_WS(
        '-',
        CAST(tenant_id AS STRING),
        room_id,
        date_id,
        CAST(
            SUM(
                CASE
                    -- LAG now runs over the whole room/day, so a change of dialogue_id1
                    -- has to be detected explicitly.
                    -- NOTE(review): operator restored as '<>' — confirm.
                    WHEN dialogue_id1 <> preDialogueId
                        OR preCutPointType IS NULL
                        OR preCutPointType = 'end'
                        OR curCutPointType = 'start'
                    THEN 1
                    ELSE 0
                END
            ) OVER (PARTITION BY tenant_id, room_id, date_id ORDER BY msg_start_time)
            AS STRING
        )
    ) AS dialogue_id
FROM (
    SELECT
        date_id, tenant_id, brand_id, channel, channel_app_id, channel_session_type,
        msg_id, msg_start_time, msg_end_time,
        msg_from_id, msg_from_nk, msg_from_orig_id, msg_from_role,
        msg_to_ids, msg_to_users, msg_type, msg_content, msg_detail,
        group_chat_info, room_id, operation_flags,
        recording_properties, asr_properties, metric_properties,
        tags, tag_properties, dialogue_properties,
        dialogue_id1, cutPointType, curCutPointType,
        LAG(dialogue_id1) OVER (PARTITION BY tenant_id, room_id, date_id ORDER BY msg_start_time) AS preDialogueId,
        LAG(curCutPointType) OVER (PARTITION BY tenant_id, room_id, date_id ORDER BY msg_start_time) AS preCutPointType
    FROM keep_cutpoint_view
);

相关性的优化也是一样的思路：改成基于 over 聚合的 udaf，减少聚合结果 join 回原表的这种操作。
相关性sql对比
-- ===== Before: relevance computed with GROUP BY, then joined back to the detail rows =====
-- NOTE(review): the published text had lost its '=' operators and string quotes;
-- they are restored below from context. Confirm against the real job before reuse.

-- One row per dialogue holding the fields of the relevance UDAF result.
CREATE TEMPORARY VIEW dialogue_relevant_view AS
SELECT
    tenant_id,
    brand_id,
    channel,
    channel_app_id,
    channel_session_type,
    date_id,
    dialogue_id AS dialogue_id,
    res.relevant_config_version AS relevant_config_version,
    res.relevant_config AS relevant_config,
    res.metrics AS metrics,
    res.dialogue_relevant AS dialogue_relevant
FROM (
    SELECT
        dialogue_relevant_udaf(channel_session_type, tenant_id, msg_id, msg_start_time, msg_end_time, msg_from_role, tags) AS res,
        tenant_id,
        brand_id,
        channel,
        channel_app_id,
        channel_session_type,
        date_id,
        dialogue_id
    FROM gen_dialogue_id_by_feature_view
    GROUP BY tenant_id, brand_id, channel, channel_app_id, channel_session_type, date_id, dialogue_id
);

-- Join the per-dialogue relevance result back onto every message row.
-- NOTE(review): the first map_put key arguments are assumed to be the string literals
-- 'dialogue_relevant' and 'relevant_config' — confirm.
CREATE TEMPORARY VIEW dialogue_view_all AS
SELECT
    NOW() AS __etl_time__,
    a.date_id, a.tenant_id, a.brand_id, a.channel, a.channel_app_id, a.channel_session_type,
    a.msg_id, a.msg_start_time, a.msg_end_time,
    a.msg_from_id, a.msg_from_nk, a.msg_from_orig_id, a.msg_from_role,
    a.msg_to_ids, a.msg_to_users, a.msg_type, a.msg_content, a.msg_detail,
    a.group_chat_info, a.room_id, a.operation_flags,
    a.recording_properties, a.asr_properties, a.metric_properties,
    a.tags, a.tag_properties,
    map_put(
        map_put(a.dialogue_properties, 'dialogue_relevant', b.dialogue_relevant),
        'relevant_config',
        b.relevant_config
    ) AS dialogue_properties,
    a.dialogue_id1, a.cutPointType, a.curCutPointType, a.preCutPointType, a.isKeep,
    a.dialogue_id
FROM gen_dialogue_id_by_feature_view a
LEFT JOIN dialogue_relevant_view b
    ON a.tenant_id = b.tenant_id
    AND a.brand_id = b.brand_id
    AND a.channel = b.channel
    AND a.channel_app_id = b.channel_app_id
    AND a.channel_session_type = b.channel_session_type
    AND a.dialogue_id = b.dialogue_id;

-- ===== After: relevance UDAF evaluated as an OVER aggregate, no join-back =====

-- Every message row carries the relevance result of its dialogue partition in `res`.
CREATE TEMPORARY VIEW dialogue_view AS
SELECT
    date_id, tenant_id, brand_id, channel, channel_app_id, channel_session_type,
    msg_id, msg_start_time, msg_end_time,
    msg_from_id, msg_from_nk, msg_from_orig_id, msg_from_role,
    msg_to_ids, msg_to_users, msg_type, msg_content, msg_detail,
    group_chat_info, room_id, operation_flags,
    recording_properties, asr_properties, metric_properties,
    tags, tag_properties, dialogue_properties,
    dialogue_id,
    dialogue_relevant_udaf(channel_session_type, tenant_id, msg_id, msg_start_time, msg_end_time, msg_from_role, tags)
        OVER (PARTITION BY tenant_id, brand_id, channel, channel_app_id, channel_session_type, date_id, dialogue_id) AS res
FROM gen_dialogue_id_by_feature_view;

3.优化结果
优化之后的执行计划清爽很多，执行速度也有了明显提升，从原来将近40分钟的计算时长减少到7分钟，提升巨大。