网站建设方案书例子,网站建设服务合同范本,虹口基础微网站开发,文昌建设局网站简介#xff1a; FlinkHologres亿级用户实时UV精确去重最佳实践
UV、PV计算#xff0c;因为业务需求不同#xff0c;通常会分为两种场景#xff1a;
离线计算场景#xff1a;以T1为主#xff0c;计算历史数据实时计算场景#xff1a;实时计算日常新增的数据#xff0…简介 FlinkHologres亿级用户实时UV精确去重最佳实践
UV、PV计算因为业务需求不同通常会分为两种场景
离线计算场景以T1为主计算历史数据实时计算场景实时计算日常新增的数据对用户标签去重
针对离线计算场景Hologres基于RoaringBitmap提供超高基数的UV计算只需进行一次最细粒度的预聚合计算也只生成一份最细粒度的预聚合结果表就能达到亚秒级查询。具体详情可以参见往期文章Hologres如何支持超高基数UV计算(基于RoaringBitmap实现)
对于实时计算场景可以使用FlinkHologres方式并基于RoaringBitmap实时对用户标签去重。这样的方式可以较细粒度的实时得到用户UV、PV数据同时便于根据需求调整最小统计窗口如最近5分钟的UV实现类似实时监控的效果更好的在大屏等BI展示。相较于以天、周、月等为单位的去重更适合在活动日期进行更细粒度的统计并且通过简单的聚合也可以得到较大时间单位的统计结果。
主体思想
Flink将流式数据转化为表与维表进行JOIN操作再转化为流式数据。此举可以利用Hologres维表的insertIfNotExists特性结合自增字段实现高效的uid映射。Flink把关联的结果数据按照时间窗口进行处理根据查询维度使用RoaringBitmap进行聚合并将查询维度以及聚合的uid存放在聚合结果表其中聚合出的uid结果放入Hologres的RoaringBitmap类型的字段中。查询时与离线方式相似直接按照查询条件查询聚合结果表并对其中关键的RoaringBitmap字段做or运算后并统计基数即可得出对应用户数。处理流程如下图所示方案最佳实践
1.创建相关基础表
1创建表uid_mapping为uid映射表用于映射uid到32位int类型。
RoaringBitmap类型要求用户ID必须是32位int类型且越稠密越好即用户ID最好连续。常见的业务系统或者埋点中的用户ID很多是字符串类型或Long类型因此需要使用uid_mapping类型构建一张映射表。映射表利用Hologres的SERIAL类型自增的32位int来实现用户映射的自动管理和稳定映射。由于是实时数据, 设置该表为行存表以提高Flink维表实时JOIN的QPS。
BEGIN;
CREATE TABLE public.uid_mapping (
uid text NOT NULL,
uid_int32 serial,
PRIMARY KEY (uid)
);
--将uid设为clustering_key和distribution_key便于快速查找其对应的int32值
CALL set_table_property(public.uid_mapping, clustering_key, uid);
CALL set_table_property(public.uid_mapping, distribution_key, uid);
CALL set_table_property(public.uid_mapping, orientation, row);
COMMIT; 2创建表dws_app为基础聚合表用于存放在基础维度上聚合后的结果。
使用RoaringBitmap前需要创建RoaringBitmap extention同时也需要Hologres实例为0.10版本
CREATE EXTENSION IF NOT EXISTS roaringbitmap;
为了更好性能建议根据基础聚合表数据量合理的设置Shard数但建议基础聚合表的Shard数设置不超过计算资源的Core数。推荐使用以下方式通过Table Group来设置Shard数
--新建shard数为16的Table Group
--因为测试数据量百万级其中后端计算资源为100core设置shard数为16
BEGIN;
CREATE TABLE tg16 (a int); --Table Group哨兵表
call set_table_property(tg16, shard_count, 16);
COMMIT;
相比离线结果表此结果表增加了时间戳字段用于实现以Flink窗口周期为单位的统计。结果表DDL如下
BEGIN;
create table dws_app(country text,prov text,city text, ymd text NOT NULL, --日期字段timetz TIMESTAMPTZ, --统计时间戳可以实现以Flink窗口周期为单位的统计uid32_bitmap roaringbitmap, -- 使用roaringbitmap记录uvprimary key(country, prov, city, ymd, timetz)--查询维度和时间作为主键防止重复插入数据
);
CALL set_table_property(public.dws_app, orientation, column);
--日期字段设为clustering_key和event_time_column便于过滤
CALL set_table_property(public.dws_app, clustering_key, ymd);
CALL set_table_property(public.dws_app, event_time_column, ymd);
--等价于将表放在shard数为16的table group
call set_table_property(public.dws_app, colocate_with, tg16);
--group by字段设为distribution_key
CALL set_table_property(public.dws_app, distribution_key, country,prov,city);
COMMIT;
2.Flink实时读取数据并更新dws_app基础聚合表
完整示例源码请见alibabacloud-hologres-connectors examples
1Flink 流式读取数据源DataStream并转化为源表Table
//此处使用csv文件作为数据源也可以是kafka等
DataStreamSource odsStream env.createInput(csvInput, typeInfo);
// 与维表join需要添加proctime字段详见https://help.aliyun.com/document_detail/62506.html
Table odsTable tableEnv.fromDataStream(odsStream,$(uid),$(country),$(prov),$(city),$(ymd),$(proctime).proctime());
// 注册到catalog环境
tableEnv.createTemporaryView(odsTable, odsTable);2将源表与Hologres维表uid_mapping进行关联
其中维表使用insertIfNotExists参数即查询不到数据时自行插入uid_int32字段便可以利用Hologres的serial类型自增创建。
// 创建Hologres维表其中nsertIfNotExists表示查询不到则自行插入
String createUidMappingTable String.format(create table uid_mapping_dim( uid string, uid_int32 INT ) with ( connectorhologres, dbname %s, //Hologres DB名 tablename %s,//Hologres 表名 username %s, //当前账号access id password %s, //当前账号access key endpoint %s, //Hologres endpoint insertifnotexiststrue ),database, dimTableName, username, password, endpoint);
tableEnv.executeSql(createUidMappingTable);
// 源表与维表join
String odsJoinDim SELECT ods.country, ods.prov, ods.city, ods.ymd, dim.uid_int32 FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim ON ods.uid dim.uid;
Table joinRes tableEnv.sqlQuery(odsJoinDim); 3将关联结果转化为DataStream通过Flink时间窗口处理结合RoaringBitmap进行聚合
DataStreamTuple6String, String, String, String, Timestamp, byte[] processedSource source// 筛选需要统计的维度country, prov, city, ymd.keyBy(0, 1, 2, 3)// 滚动时间窗口此处由于使用读取csv模拟输入流采用ProcessingTime实际使用中可使用EventTime.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))// 触发器可以在窗口未结束时获取聚合结果.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1))).aggregate(// 聚合函数根据key By筛选的维度进行聚合new AggregateFunctionTuple5String, String, String, String, Integer,RoaringBitmap,RoaringBitmap() {Overridepublic RoaringBitmap createAccumulator() {return new RoaringBitmap();}Overridepublic RoaringBitmap add(Tuple5String, String, String, String, Integer in,RoaringBitmap acc) {// 将32位的uid添加到RoaringBitmap进行去重acc.add(in.f4);return acc;}Overridepublic RoaringBitmap getResult(RoaringBitmap acc) {return acc;}Overridepublic RoaringBitmap merge(RoaringBitmap acc1, RoaringBitmap acc2) {return RoaringBitmap.or(acc1, acc2);}},//窗口函数输出聚合结果new WindowFunctionRoaringBitmap,Tuple6String, String, String, String, Timestamp, byte[],Tuple,TimeWindow() {Overridepublic void apply(Tuple keys,TimeWindow timeWindow,IterableRoaringBitmap iterable,CollectorTuple6String, String, String, String, Timestamp, byte[] out)throws Exception {RoaringBitmap result iterable.iterator().next();// 优化RoaringBitmapresult.runOptimize();// 将RoaringBitmap转化为字节数组以存入Holo中byte[] byteArray new byte[result.serializedSizeInBytes()];result.serialize(ByteBuffer.wrap(byteArray));// 其中 Tuple6.f4(Timestamp) 字段表示以窗口长度为周期进行统计以秒为单位out.collect(new Tuple6(keys.getField(0),keys.getField(1),keys.getField(2),keys.getField(3),new Timestamp(timeWindow.getEnd() / 1000 * 1000),byteArray));}}); 4写入结果表
需要注意的是Hologres中RoaringBitmap类型在Flink中对应Byte数组类型
// 计算结果转换为表
Table resTable tableEnv.fromDataStream(processedSource,$(country),$(prov),$(city),$(ymd),$(timest),$(uid32_bitmap));
// 创建Hologres结果表, 其中Hologres的RoaringBitmap类型通过Byte数组存入
String createHologresTable String.format(create table sink( country string, prov string, city string, ymd string, timetz timestamp, uid32_bitmap BYTES ) with ( connectorhologres, dbname %s, tablename %s, username %s, password %s, endpoint %s, connectionSize %s, mutatetype insertOrReplace ),database, dwsTableName, username, password, endpoint, connectionSize);
tableEnv.executeSql(createHologresTable);
// 写入计算结果到dws表
tableEnv.executeSql(insert into sink select * from resTable);
3.数据查询
查询时从基础聚合表(dws_app)中按照查询维度做聚合计算查询bitmap基数得出group by条件下的用户数
查询某天内各个城市的uv
--运行下面RB_AGG运算查询可执行参数先关闭三阶段聚合开关默认关闭,性能更好
set hg_experimental_enable_force_three_stage_aggoff SELECT country,prov,city,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
FROM dws_app
WHERE ymd 20210329
GROUP BY country,prov,city
; 查询某段时间内各个省份的uv
--运行下面RB_AGG运算查询可执行参数先关闭三阶段聚合开关默认关闭,性能更好
set hg_experimental_enable_force_three_stage_aggoff SELECT country,prov,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
FROM dws_app
WHERE time 2021-04-19 18:00:0008 and time 2021-04-19 19:00:0008
GROUP BY country,prov
;
原文链接 本文为阿里云原创内容未经允许不得转载。