官方网站建设专家磐石网络,网易企业邮箱邮件怎么撤回,做网站安全联盟解,小学学校网站建设计划书案例与解决方案汇总页#xff1a;阿里云实时计算产品案例解决方案汇总 一. 背景介绍
如利用blinkMQ实现流计算中的延时统计问题一文中所描述的场景#xff0c;我们将其简化为以下案例#xff1a; 实时流的数据源结构如下#xff1a;
物流订单号支付时间仓接… 案例与解决方案汇总页阿里云实时计算产品案例解决方案汇总 一. 背景介绍
如利用blinkMQ实现流计算中的延时统计问题一文中所描述的场景我们将其简化为以下案例 实时流的数据源结构如下
物流订单号支付时间仓接单时间仓出库时间LP12018-08-01 08:00 LP12018-08-01 08:002018-08-01 09:00 LP22018-08-01 09:10 LP22018-08-01 09:102018-08-01 09:50 LP22018-08-01 09:102018-08-01 09:502018-08-01 12:00
我们期望通过以上数据源按照支付日期统计每个仓库的仓接单量、仓出库量、仓接单超2H未出库单量、仓接单超6H未出库单量。可以看出其中LP1仓接单时间是2018-08-01 09:00但一直到2018-08-01 12:00点之前一直都没有出库LP1满足仓接单超2H未出库的行为。
该场景的难点就在于订单未出库。而对于TT中的源消息流订单未出库TT就不会下发新的消息不下发新的消息blink就无法被触发计算。而针对上述的场景对于LP1我们需要在仓接单时间是2018-08-01 09:002H也就是2018-08-01 11:00的之后就要知道LP1已经仓接单但超2H未出库了。
二. 解决方案
本文主要是利用blink CEP来实现上述场景具体实现步骤如下所述。 第一步在source DDL中定义event_timestamp并定义sink如下
----定义source
create table sourcett_dwd_ri
(lg_order_code varchar comment 物流订单号,ded_pay_time varchar comment 支付时间,store_code varchar comment 仓库编码,store_name varchar comment 仓库名称,wms_create_time varchar comment 仓接单时间,wms_consign_create_time varchar comment 仓出库时间,evtstamp as case when coalesce(wms_create_time, ) then to_timestamp(wms_create_time, yyyy-MM-dd HH:mm:ss)else to_timestamp(1970-01-01 00:00:00, yyyy-MM-dd HH:mm:ss)end --构造event_timestamp如果源表本身带有消息的occur_time,可直接选择occur_time作为event_timestamp,WATERMARK FOR evtstamp AS withOffset(evtstamp, 10000) --设置延迟10秒处理
)
with
(typett,topicdwd_ri,accessKeyxxxxxx,accessIdxxxxxx,lengthCheckPAD,nullValues\\N|
);----定义sink
create table sink_hybrid_blink_cep
(ded_pay_date varchar comment 支付日期,store_code varchar comment 仓库编码,store_name varchar comment 仓库名称,wms_create_ord_cnt bigint comment 仓接单量,wms_confirm_ord_cnt bigint comment 仓出库量,wmsin_nowmsout_2h_ord_cnt bigint comment 仓接单超2小时未出库单量,wmsin_nowmsout_6h_ord_cnt bigint comment 仓接单超6小时未出库单量 ,sub_partition bigint comment 二级分区支付日期),PRIMARY KEY (ded_pay_date, store_code, sub_partition)
)
with
(typePetaData,url xxxxxx,tableNameblink_cep,userNamexxxxxx,passwordxxxxxx,bufferSize30000,batchSize3000,batchWriteTimeoutMs15000
);第二步根据blink CEP的标准语义进行改写如下
create view blink_cep_v1
as
select 仓接单-仓出库超时 as timeout_type,lg_order_code,wms_create_time as start_time,wms_consign_create_time as end_time
from source_dwd_csn_whc_lgt_fl_ord_ri
MATCH_RECOGNIZE
(PARTITION BY lg_order_codeORDER BY evtstampMEASURESe1.wms_create_time as wms_create_time,e2.wms_consign_create_time as wms_consign_create_timeONE ROW PER MATCH WITH TIMEOUT ROWS --重要必须设置延迟也下发AFTER MATCH SKIP TO NEXT ROWPATTERN (e1 - e2) WITHIN INTERVAL 6 HOUREMIT TIMEOUT (INTERVAL 2 HOUR, INTERVAL 6 HOUR)DEFINEe1 as e1.wms_create_time is not null and e1.wms_consign_create_time is null,e2 as e2.wms_create_time is not null and e2.wms_consign_create_time is not null
)
where wms_create_time is not null --重要可以大大减少进入CEP的消息量
and wms_consign_create_time is null --重要可以大大减少进入CEP的消息量
;
第三步根据blink的执行机制我们通过源实时流sourcett_dwd_ri与超时消息流blink_cep_v1关联来触发blink对超时消息进行聚合操作如下
create view blink_cep_v2
as
select a.lg_order_code as lg_order_code,last_value(a.store_code ) as store_code,last_value(a.store_name ) as store_name,last_value(a.ded_pay_time ) as ded_pay_time,last_value(a.wms_create_time ) as wms_create_time,last_value(a.real_wms_confirm_time ) as real_wms_confirm_time,last_value(case when coalesce(a.wms_create_time, ) and coalesce(a.real_wms_confirm_time, ) and now() - unix_timestamp(a.wms_create_time,yyyy-MM-dd HH:mm:ss) 7200then Y else N end) as flag_01,last_value(case when coalesce(a.wms_create_time, ) and coalesce(a.real_wms_confirm_time, ) and now() - unix_timestamp(a.wms_create_time,yyyy-MM-dd HH:mm:ss) 21600then Y else N end) as flag_02
from(select lg_order_code as lg_order_code,last_value(store_code ) as store_code,last_value(store_name ) as store_name,last_value(ded_pay_time ) as ded_pay_time,last_value(wms_create_time ) as wms_create_time,last_value(wms_consign_create_time) as real_wms_confirm_timefrom sourcett_dwd_rigroup by lg_order_code) a
left outer join(select lg_order_code,count(*) as cntfrom blink_cep_v1group by lg_order_code) b
on a.lg_order_code b.lg_order_code
group by a.lg_order_code
;insert into sink_hybrid_blink_cep
select regexp_replace(substring(a.ded_pay_time, 1, 10), -, ) as ded_pay_date,a.store_code,max(a.store_name) as store_name,count(case when coalesce(a.wms_create_time, ) then a.lg_order_code end) as wmsin_ord_cnt,count(case when coalesce(a.real_wms_confirm_time, ) then a.lg_order_code end) as wmsout_ord_cnt,count(case when a.flag_01 Y then a.lg_order_code end) as wmsin_nowmsout_2h_ord_cnt,count(case when a.flag_02 Y then a.lg_order_code end) as wmsin_nowmsout_6h_ord_cnt,cast(regexp_replace(SUBSTRING(ded_pay_time, 1, 10), -, ) as bigint) as sub_partition
from blink_cep_v2 as t1
where coalesce(lg_cancel_time, )
and coalesce(ded_pay_time, )
group by regexp_replace(substring(ded_pay_time, 1, 10), -, ),a.store_code
;
三. 问题拓展
blink CEP的参数比较多要完全看懂着实需要一些时间但CEP的强大是毋庸置疑的。CEP不仅可以解决物流场景中的超时统计问题风控中的很多场景也是信手拈来。这里有一个风控中的场景通过上述物流案例的用法我们是否能推敲出这个场景的用法呢 风控案例测试数据如下
刷卡时间银行卡ID刷卡地点2018-04-13 12:00:001WW2018-04-13 12:05:001WW12018-04-13 12:10:001WW22018-04-13 12:20:001WW
我们认为当一张银行卡在10min之内在不同的地点被刷卡大于等于两次我们就期望对消费者出发预警机制。
blink CEP是万能的么答案是否定的当消息乱序程度比较高的时候实时性和准确性就成了一对矛盾的存在。要想实时性比较高必然要求设置的offset越小越好但offset设置比较小就直接可能导致很多eventtimewatermark-offset的消息直接被丢弃准确性很难保证。比如在CP回传物流详情的时候经常回传的时间跟实操的时间差异很大实操时间是10点但回传时间是15点如果以实操时间作为eventtime可能就会导致这种差异很大的消息被直接丢掉无法进入CEP进而无法触发CEP后续的计算在使用CEP的过程中应该注意这一点。
四. 作者简介
花名缘桥来自菜鸟-CTO-数据部-仓配数据研发主要负责菜鸟仓配业务的离线和实时数据仓库建设以及创新数据技术和工具的探索和应用。 原文链接 本文为云栖社区原创内容未经允许不得转载。