Series
Real-Time Logistics Data Warehouse: Building the Collection Pipeline
Real-Time Logistics Data Warehouse: Building the Warehouse
Real-Time Logistics Data Warehouse: Building the Warehouse (DIM)
Real-Time Logistics Data Warehouse: Building the Warehouse (DWD) I
Real-Time Logistics Data Warehouse: Building the Warehouse (DWD) II
Real-Time Logistics Data Warehouse: Building the Warehouse (DWS) I

Foreword
In this post we compute per-organization sorting statistics: count each organization's sorting operations for the current day, enrich the result with city and province dimension information, and write it to the corresponding ClickHouse table. The statistics must refresh every ten seconds.
The overall flow is shown in the figure below.

I. Writing the Code
1. File Creation

1. Main program
2. Entity beans
3. Custom trigger
4. Custom aggregate function
5. Query methods added to HbaseUtil
6. JedisUtil utility class
7. DimUtil utility class, which uses a side cache to optimize dimension lookups
8. Modified entity bean TmsConfigDimBean
9. Passing the op field
10. Thread-pool utility class
11. Asynchronous join function DimAsyncFunction
12. ClickHouseUtil utility class
13. DimSinkFunction

When dimension data is updated, it deletes the corresponding cached entries in Redis.
14. The TransientSink annotation

Some fields are useful for the computation but should not be written to ClickHouse. By adding a custom annotation, the sink can read each field's TransientSink annotation at write time and, depending on whether the annotation is present, skip that field. A minimal illustration of the mechanism is shown below.

The above are all the Java files we will create or modify in this post.
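Before diving in, here is a tiny, self-contained sketch of the annotation idea — the demo class and field values are made up for illustration; the real annotation and the sink that consumes it appear in sections 14 and 5 below:

```java
import java.lang.annotation.*;
import java.lang.reflect.Field;

public class TransientSinkDemo {
    // Field-level annotation, retained at runtime so reflection can see it
    @Target(ElementType.FIELD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface TransientSink {}

    static class Bean {
        String orgId = "1001";
        @TransientSink
        String joinOrgId = "2001"; // helper field, not written out
    }

    public static void main(String[] args) throws Exception {
        Bean bean = new Bean();
        for (Field f : Bean.class.getDeclaredFields()) {
            // Annotated fields are skipped at write-out time
            if (f.getAnnotation(TransientSink.class) != null) {
                continue;
            }
            f.setAccessible(true);
            System.out.println(f.getName() + " -> " + f.get(bean)); // prints only orgId
        }
    }
}
```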
2. Main Program

The tasks DwsBoundOrgSortDay needs to complete are shown in the flowchart below.
```java
package com.atguigu.tms.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.app.func.DimAsyncFunction;
import com.atguigu.tms.realtime.app.func.MyAggregationFunction;
import com.atguigu.tms.realtime.app.func.MyTriggerFunction;
import com.atguigu.tms.realtime.beans.DwdBoundSortBean;
import com.atguigu.tms.realtime.beans.DwsBoundOrgSortDayBean;
import com.atguigu.tms.realtime.utils.*;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

public class DwsBoundOrgSortDay {
    public static void main(String[] args) throws Exception {
        // Prepare the environment
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        env.setParallelism(4);

        // Read data from Kafka
        String topic = "tms_dwd_bound_sort";
        String groupId = "dws_tms_dwd_bound_sort";
        KafkaSource<String> kafkaSource = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> kafkaStrDS = env
                .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");

        // Convert the stream elements: jsonStr -> bean
        SingleOutputStreamOperator<DwsBoundOrgSortDayBean> dwsBoundOrgSortDayBeanSingleOutputStreamOperator = kafkaStrDS.map(
                new MapFunction<String, DwsBoundOrgSortDayBean>() {
                    @Override
                    public DwsBoundOrgSortDayBean map(String jsonStr) throws Exception {
                        DwdBoundSortBean dwdBoundSortBean = JSON.parseObject(jsonStr, DwdBoundSortBean.class);
                        return DwsBoundOrgSortDayBean.builder()
                                .orgId(dwdBoundSortBean.getOrgId())
                                .sortCountBase(1L)
                                .ts(dwdBoundSortBean.getTs() + 8 * 60 * 60 * 1000L)
                                .build();
                    }
                });

        // Assign watermarks and extract the event-time field
        SingleOutputStreamOperator<DwsBoundOrgSortDayBean> withWatermarkDS =
                dwsBoundOrgSortDayBeanSingleOutputStreamOperator.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<DwsBoundOrgSortDayBean>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<DwsBoundOrgSortDayBean>() {
                                    @Override
                                    public long extractTimestamp(DwsBoundOrgSortDayBean boundOrgSortDayBean, long recordTimestamp) {
                                        return boundOrgSortDayBean.getTs();
                                    }
                                }));
        // withWatermarkDS.print("###");

        // Key the stream by organization id
        KeyedStream<DwsBoundOrgSortDayBean, String> keyedDS =
                withWatermarkDS.keyBy(DwsBoundOrgSortDayBean::getOrgId);

        // Open a one-day tumbling event-time window
        WindowedStream<DwsBoundOrgSortDayBean, String, TimeWindow> windowDS =
                keyedDS.window(TumblingEventTimeWindows.of(Time.days(1L)));

        // Apply the custom trigger
        WindowedStream<DwsBoundOrgSortDayBean, String, TimeWindow> triggerDS =
                windowDS.trigger(new MyTriggerFunction<DwsBoundOrgSortDayBean>());

        // Aggregate
        SingleOutputStreamOperator<DwsBoundOrgSortDayBean> aggregateDS = triggerDS.aggregate(
                new MyAggregationFunction<DwsBoundOrgSortDayBean>() {
                    @Override
                    public DwsBoundOrgSortDayBean add(DwsBoundOrgSortDayBean value, DwsBoundOrgSortDayBean accumulator) {
                        if (accumulator == null) {
                            return value;
                        }
                        accumulator.setSortCountBase(accumulator.getSortCountBase() + 1);
                        return accumulator;
                    }
                },
                new ProcessWindowFunction<DwsBoundOrgSortDayBean, DwsBoundOrgSortDayBean, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<DwsBoundOrgSortDayBean> elements, Collector<DwsBoundOrgSortDayBean> out) throws Exception {
                        for (DwsBoundOrgSortDayBean element : elements) {
                            // Window start time
                            long stt = context.window().getStart();
                            // Shift the window time back 8 hours and format it
                            element.setCurDate(DateFormatUtil.toDate(stt - 8 * 60 * 60 * 1000L));
                            element.setTs(System.currentTimeMillis());
                            out.collect(element);
                        }
                    }
                });

        // Join the dimensions (city, province)
        // Join the organization dimension to get the organization name (async I/O)
        SingleOutputStreamOperator<DwsBoundOrgSortDayBean> withOrgNameDS = AsyncDataStream.unorderedWait(
                aggregateDS,
                new DimAsyncFunction<DwsBoundOrgSortDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsBoundOrgSortDayBean sortDayBean, JSONObject dimInfoJsonObj) {
                        sortDayBean.setOrgName(dimInfoJsonObj.getString("org_name"));
                        String orgParentId = dimInfoJsonObj.getString("org_parent_id");
                        sortDayBean.setJoinOrgId(orgParentId != null ? orgParentId : sortDayBean.getOrgId());
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsBoundOrgSortDayBean sortDayBean) {
                        return Tuple2.of("id", sortDayBean.getOrgId());
                    }
                },
                60, TimeUnit.SECONDS);

        // Supplement the city ID
        SingleOutputStreamOperator<DwsBoundOrgSortDayBean> withCityIdDS = AsyncDataStream.unorderedWait(
                withOrgNameDS,
                new DimAsyncFunction<DwsBoundOrgSortDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsBoundOrgSortDayBean sortDayBean, JSONObject dimInfoJsonObj) {
                        sortDayBean.setCityId(dimInfoJsonObj.getString("region_id"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsBoundOrgSortDayBean sortDayBean) {
                        return Tuple2.of("id", sortDayBean.getJoinOrgId());
                    }
                },
                60, TimeUnit.SECONDS);

        // Join the region dimension: use the city id to get the city name and the id of its province
        SingleOutputStreamOperator<DwsBoundOrgSortDayBean> withCityNameAndProvinceIdDS = AsyncDataStream.unorderedWait(
                withCityIdDS,
                new DimAsyncFunction<DwsBoundOrgSortDayBean>("dim_base_region_info") {
                    @Override
                    public void join(DwsBoundOrgSortDayBean sortDayBean, JSONObject dimInfoJsonObj) {
                        sortDayBean.setCityName(dimInfoJsonObj.getString("name"));
                        sortDayBean.setProvinceId(dimInfoJsonObj.getString("parent_id"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsBoundOrgSortDayBean sortDayBean) {
                        return Tuple2.of("id", sortDayBean.getCityId());
                    }
                },
                60, TimeUnit.SECONDS);

        // Join the region dimension again: use the province id to get the province name
        SingleOutputStreamOperator<DwsBoundOrgSortDayBean> withProvinceDS = AsyncDataStream.unorderedWait(
                withCityNameAndProvinceIdDS,
                new DimAsyncFunction<DwsBoundOrgSortDayBean>("dim_base_region_info") {
                    @Override
                    public void join(DwsBoundOrgSortDayBean sortDayBean, JSONObject dimInfoJsonObj) {
                        sortDayBean.setProvinceName(dimInfoJsonObj.getString("name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsBoundOrgSortDayBean sortDayBean) {
                        return Tuple2.of("id", sortDayBean.getProvinceId());
                    }
                },
                60, TimeUnit.SECONDS);

        withProvinceDS.print();

        // Write the joined result to ClickHouse
        withProvinceDS.addSink(ClickHouseUtil.getJdbcSink(
                "insert into dws_bound_org_sort_day_base values(?,?,?,?,?,?,?,?,?)"));

        env.execute();
    }
}
```

Now we implement the remaining files in the order the main program uses them.
3. Windowing and Aggregation

The steps before the windowing use nothing new, so we skip straight to it.
1. MyTriggerFunction

The custom trigger:
```java
package com.atguigu.tms.realtime.app.func;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Custom trigger: fires the window computation periodically instead of only at the window end
public class MyTriggerFunction<T> extends Trigger<T, TimeWindow> {
    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ValueStateDescriptor<Boolean> valueStateDescriptor =
                new ValueStateDescriptor<>("isFirstState", Boolean.class);
        ValueState<Boolean> isFirstState = ctx.getPartitionedState(valueStateDescriptor);
        Boolean isFirst = isFirstState.value();
        if (isFirst == null) {
            // First element of the window: update the state
            isFirstState.update(true);
            // Register a timer relative to the current event time, rounded down to the 10 s boundary
            ctx.registerEventTimeTimer(timestamp - timestamp % 10000L + 2000L);
        } else if (isFirst) {
            isFirstState.update(false);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    // time: the timestamp at which the event-time timer fires
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        long end = window.getEnd();
        if (time < end) {
            if (time + 2000L < end) {
                ctx.registerEventTimeTimer(time + 2000L);
            }
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}
```

2. MyAggregationFunction

The custom aggregate function:
```java
package com.atguigu.tms.realtime.app.func;

import org.apache.flink.api.common.functions.AggregateFunction;

public abstract class MyAggregationFunction<T> implements AggregateFunction<T, T, T> {
    @Override
    public T createAccumulator() {
        return null;
    }

    @Override
    public T getResult(T accumulator) {
        return accumulator;
    }

    @Override
    public T merge(T a, T b) {
        return null;
    }
}
```
4. Joining Dimension Information

1. DimAsyncFunction

The asynchronous I/O function:
```java
package com.atguigu.tms.realtime.app.func;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.beans.DimJoinFunction;
import com.atguigu.tms.realtime.commom.TmsConfig;
import com.atguigu.tms.realtime.utils.DimUtil;
import com.atguigu.tms.realtime.utils.ThreadPoolUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.concurrent.ExecutorService;

public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> implements DimJoinFunction<T> {
    private String tableName;
    private ExecutorService executorService;

    public DimAsyncFunction(String tableName) {
        this.tableName = tableName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        executorService = ThreadPoolUtil.getInstance();
    }

    @Override
    public void asyncInvoke(T obj, ResultFuture<T> resultFuture) throws Exception {
        // Take a thread from the pool and send the request asynchronously
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                // Extract the primary or foreign key used as the query condition from the stream object
                Tuple2<String, String> keyNameAndValue = getCondition(obj);
                // Look up the dimension object by that condition
                JSONObject dimInfoJsonObj = DimUtil.getDimInfo(TmsConfig.HBASE_NAMESPACE, tableName, keyNameAndValue);
                // Copy the dimension attributes onto the stream object
                if (dimInfoJsonObj != null) {
                    join(obj, dimInfoJsonObj);
                }
                // Emit the object downstream
                resultFuture.complete(Collections.singleton(obj));
            }
        });
    }
}
```

2. DimJoinFunction

The methods of DimAsyncFunction that subclasses must implement are pulled out into the DimJoinFunction interface:
```java
package com.atguigu.tms.realtime.beans;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple2;

public interface DimJoinFunction<T> {
    void join(T obj, JSONObject dimInfoJsonObj);

    Tuple2<String, String> getCondition(T obj);
}
```
3. ThreadPoolUtil

The utility class that builds the thread pool used by the async I/O function:
```java
package com.atguigu.tms.realtime.utils;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolUtil {
    private static volatile ThreadPoolExecutor poolExecutor;

    // Lazily created singleton pool, guarded by double-checked locking
    public static ThreadPoolExecutor getInstance() {
        if (poolExecutor == null) {
            synchronized (ThreadPoolUtil.class) {
                if (poolExecutor == null) {
                    poolExecutor = new ThreadPoolExecutor(
                            4, 20, 300, TimeUnit.SECONDS,
                            new LinkedBlockingDeque<Runnable>(Integer.MAX_VALUE));
                }
            }
        }
        return poolExecutor;
    }
}
```

4. DimUtil

During dimension joins we need to fetch dimension records from HBase. To speed up these lookups we introduce Redis as a side cache; the flow is shown in the figure below.
```java
package com.atguigu.tms.realtime.utils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import redis.clients.jedis.Jedis;

@Slf4j
public class DimUtil {
    public static JSONObject getDimInfo(String namespace, String tableName, Tuple2<String, String> nameAndValue) {
        // Field name and value used as the query condition
        String keyName = nameAndValue.f0;
        String keyValue = nameAndValue.f1;
        // Key under which this dimension is cached in Redis
        String redisKey = "dim:" + tableName.toLowerCase() + ":" + keyName + "_" + keyValue;
        // Redis client
        Jedis jedis = null;
        // Dimension data fetched from Redis
        String dimJsonStr = null;
        // Return value
        JSONObject dimJsonObj = null;
        try {
            // Query the cache first
            jedis = JedisUtil.getJedis();
            dimJsonStr = jedis.get(redisKey);
            if (StringUtils.isNotEmpty(dimJsonStr)) {
                // Cache hit
                dimJsonObj = JSON.parseObject(dimJsonStr);
            } else {
                // Cache miss: query HBase by primary or foreign key
                if ("id".equals(keyName)) {
                    dimJsonObj = HbaseUtil.getRowByPrimaryKey(namespace, tableName, nameAndValue);
                } else {
                    dimJsonObj = HbaseUtil.getRowByForeignKey(namespace, tableName, nameAndValue);
                }
                if (dimJsonObj != null && jedis != null) {
                    // Cache the result for 24 hours
                    jedis.setex(redisKey, 3600 * 24, dimJsonObj.toJSONString());
                }
            }
        } catch (Exception e) {
            log.error("Exception while querying dimension data from Redis", e);
        } finally {
            if (jedis != null) {
                System.out.println("closing the Jedis client");
                jedis.close();
            }
        }
        return dimJsonObj;
    }

    // Delete cached dimension data from Redis
    public static void delCached(String tableName, Tuple2<String, String> keyNameAndValue) {
        String keyName = keyNameAndValue.f0;
        String keyValue = keyNameAndValue.f1;
        String redisKey = "dim:" + tableName.toLowerCase() + ":" + keyName + "_" + keyValue;
        Jedis jedis = null;
        try {
            jedis = JedisUtil.getJedis();
            jedis.del(redisKey);
        } catch (Exception e) {
            log.error("Exception while clearing cached dimension data from Redis", e);
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
}
```

5. JedisUtil

The Jedis client used to connect to Redis. First add the dependency to the POM:
```xml
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.3.0</version>
</dependency>
```

```java
package com.atguigu.tms.realtime.utils;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class JedisUtil {
    private static JedisPool jedisPool;

    static {
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(1000);
        poolConfig.setMaxIdle(5);
        poolConfig.setMinIdle(5);
        poolConfig.setBlockWhenExhausted(true);
        poolConfig.setMaxWaitMillis(2000L);
        poolConfig.setTestOnBorrow(true);
        jedisPool = new JedisPool(poolConfig, "hadoop102", 6379, 10000);
    }

    public static Jedis getJedis() {
        System.out.println("creating a Jedis client");
        return jedisPool.getResource();
    }

    public static void main(String[] args) {
        Jedis jedis = getJedis();
        String pong = jedis.ping();
        System.out.println(pong);
    }
}
```
6. HbaseUtil

Earlier we implemented table creation and inserts in HbaseUtil; now we add the query operations.
```java
package com.atguigu.tms.realtime.utils;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.commom.TmsConfig;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class HbaseUtil {
    private static Connection conn;

    static {
        try {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", TmsConfig.hbase_zookeeper_quorum);
            conn = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // Create a table
    public static void createTable(String nameSpace, String tableName, String... families) {
        Admin admin = null;
        try {
            if (families.length < 1) {
                System.out.println("at least one column family is required");
                return;
            }
            admin = conn.getAdmin();
            // Check whether the table already exists
            if (admin.tableExists(TableName.valueOf(nameSpace, tableName))) {
                System.out.println(nameSpace + ":" + tableName + " already exists");
                return;
            }
            TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(nameSpace, tableName));
            // Add the column families
            for (String family : families) {
                ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family)).build();
                builder.setColumnFamily(familyDescriptor);
            }
            admin.createTable(builder.build());
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (admin != null) {
                try {
                    admin.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    // Insert a row into HBase
    public static void putPow(String namespace, String tableName, Put put) {
        BufferedMutator mutator = null;
        try {
            BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(namespace, tableName));
            params.writeBufferSize(5 * 1024 * 1024);
            params.setWriteBufferPeriodicFlushTimeoutMs(3000L);
            mutator = conn.getBufferedMutator(params);
            mutator.mutate(put);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (mutator != null) {
                try {
                    mutator.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    // Query one row from an HBase table by primary key
    public static JSONObject getRowByPrimaryKey(String namespace, String tableName, Tuple2<String, String> rowKeyNameAndKey) {
        Table table = null;
        JSONObject dimJsonObj = null;
        String rowKeyName = rowKeyNameAndKey.f0;
        String rowKeyValue = rowKeyNameAndKey.f1;
        try {
            table = conn.getTable(TableName.valueOf(namespace, tableName));
            Result result = table.get(new Get(Bytes.toBytes(rowKeyValue)));
            Cell[] cells = result.rawCells();
            if (cells.length > 0) {
                dimJsonObj = new JSONObject();
                dimJsonObj.put(rowKeyName, rowKeyValue);
                for (Cell cell : cells) {
                    dimJsonObj.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
                }
            } else {
                System.out.println("no matching dimension data found in the HBase table");
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (table != null) {
                try {
                    table.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        return dimJsonObj;
    }

    // Query one row from an HBase table by foreign key
    public static JSONObject getRowByForeignKey(String namespace, String tableName, Tuple2<String, String> foreignKeyNameAndKey) {
        Table table = null;
        JSONObject dimJsonObj = null;
        try {
            table = conn.getTable(TableName.valueOf(namespace, tableName));
            Scan scan = new Scan();
            String foreignKeyName = foreignKeyNameAndKey.f0;
            String foreignKeyValue = foreignKeyNameAndKey.f1;
            SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
                    Bytes.toBytes("info"), Bytes.toBytes(foreignKeyName),
                    CompareOperator.EQUAL, Bytes.toBytes(foreignKeyValue));
            singleColumnValueFilter.setFilterIfMissing(true);
            scan.setFilter(singleColumnValueFilter);
            ResultScanner scanner = table.getScanner(scan);
            Result result = scanner.next();
            if (result != null) {
                Cell[] cells = result.rawCells();
                if (cells.length > 0) {
                    dimJsonObj = new JSONObject();
                    dimJsonObj.put("id", Bytes.toString(result.getRow()));
                    for (Cell cell : cells) {
                        dimJsonObj.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
                    }
                } else {
                    System.out.println("no matching dimension data found in the HBase table");
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (table != null) {
                try {
                    table.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        return dimJsonObj;
    }
}
```
7. DwsBoundOrgSortDayBean

A bean holding the fields we write to ClickHouse. Note that the JDBC sink binds values by field declaration order: with joinOrgId skipped, the remaining nine fields line up one-to-one with the nine ? placeholders of the insert statement.
```java
package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;

@Data
@Builder
public class DwsBoundOrgSortDayBean {
    // Statistics date
    String curDate;

    // Organization ID
    String orgId;

    // Organization name
    String orgName;

    // Organization ID used when joining for province info; not written to ClickHouse
    @TransientSink
    String joinOrgId;

    // City ID
    String cityId;

    // City name
    String cityName;

    // Province ID
    String provinceId;

    // Province name
    String provinceName;

    // Sorting count
    Long sortCountBase;

    // Timestamp
    Long ts;
}
```
8. Supplementing Dimension Fields

The code above needs each dimension table's foreign-key fields, so we modify the MySQL configuration table to add a foreign-key column:
```sql
DROP TABLE IF EXISTS `tms_config_dim`;
CREATE TABLE `tms_config_dim` (
  `source_table` varchar(200) NOT NULL COMMENT 'source table',
  `sink_table`   varchar(200) DEFAULT NULL COMMENT 'target table',
  `sink_family`  varchar(200) DEFAULT NULL COMMENT 'target table column family',
  `sink_columns` varchar(200) DEFAULT NULL COMMENT 'target table columns',
  `sink_pk`      varchar(256) DEFAULT NULL COMMENT 'primary-key field',
  `foreign_keys` varchar(256) DEFAULT NULL COMMENT 'foreign-key lookup fields',
  PRIMARY KEY (`source_table`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='logistics real-time configuration table';
```

Then re-import the configuration data.
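For orientation, a config row for the organization dimension might look like the following — the values are illustrative only, inferred from the fields the code reads (dim_base_organ, org_name, region_id, org_parent_id); the real rows come from the provided import script:

```sql
-- Hypothetical example row; use the re-imported config data for actual values
INSERT INTO tms_config_dim
(source_table, sink_table, sink_family, sink_columns, sink_pk, foreign_keys)
VALUES ('base_organ', 'dim_base_organ', 'info',
        'id,org_name,org_level,region_id,org_parent_id',
        'id', 'region_id,org_parent_id');
```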
After that, run DimApp once to sync the data; see the DIM-layer post for the detailed steps.
9. MyBroadcastProcessFunction

When building the DIM layer we filtered the foreign keys out in the processElement method of MyBroadcastProcessFunction. Now we need them, so we add them back. Insert the following block before the data is emitted:
```java
// Preparation for clearing the Redis cache: pass along the operation type and the foreign-key k-v pairs
String op = jsonObj.getString("op");
if ("u".equals(op)) {
    afterJsonObj.put("op", op);
    // Foreign-key names this dimension table is joined on, read from the config table
    String foreignKeys = tmsConfigDimBean.getForeignKeys();
    // JSON object holding the foreign-key names and values of the current dimension table
    JSONObject foreignjsonObj = new JSONObject();
    if (StringUtils.isNotEmpty(foreignKeys)) {
        String[] foreignNameArr = foreignKeys.split(",");
        for (String foreignName : foreignNameArr) {
            // Value before the update
            JSONObject before = jsonObj.getJSONObject("before");
            String foreignKeyBefore = before.getString(foreignName);
            String foreignKeyAfter = afterJsonObj.getString(foreignName);
            if (!foreignKeyBefore.equals(foreignKeyAfter)) {
                // The foreign key itself was modified
                foreignjsonObj.put(foreignName, foreignKeyBefore);
            } else {
                foreignjsonObj.put(foreignName, foreignKeyBefore);
            }
        }
    }
    afterJsonObj.put("foreign_key", foreignjsonObj);
}
```

The complete code:
```java
package com.atguigu.tms.realtime.app.func;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.beans.TmsConfigDimBean;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.*;
import java.util.*;

// Custom class handling both the main stream and the broadcast stream
public class MyBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
    private MapStateDescriptor<String, TmsConfigDimBean> mapStateDescriptor;
    private Map<String, TmsConfigDimBean> configMap = new HashMap<>();
    private String username;
    private String password;

    public MyBroadcastProcessFunction(MapStateDescriptor<String, TmsConfigDimBean> mapStateDescriptor, String[] args) {
        this.mapStateDescriptor = mapStateDescriptor;
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        this.username = parameterTool.get("mysql-username", "root");
        this.password = parameterTool.get("mysql-password", "000000");
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Preload the config table via JDBC
        Class.forName("com.mysql.cj.jdbc.Driver");
        String url = "jdbc:mysql://hadoop102:3306/tms_config?useSSL=false&useUnicode=true"
                + "&user=" + username + "&password=" + password
                + "&charset=utf8&TimeZone=Asia/Shanghai";
        Connection conn = DriverManager.getConnection(url);
        PreparedStatement ps = conn.prepareStatement("select * from tms_config.tms_config_dim");
        ResultSet rs = ps.executeQuery();
        ResultSetMetaData metaData = rs.getMetaData();
        while (rs.next()) {
            JSONObject jsonObj = new JSONObject();
            for (int i = 1; i <= metaData.getColumnCount(); i++) {
                String columnName = metaData.getColumnName(i);
                Object columValue = rs.getObject(i);
                jsonObj.put(columnName, columValue);
            }
            TmsConfigDimBean tmsConfigDimBean = jsonObj.toJavaObject(TmsConfigDimBean.class);
            configMap.put(tmsConfigDimBean.getSourceTable(), tmsConfigDimBean);
        }
        rs.close();
        ps.close();
        conn.close();
        super.open(parameters);
    }

    @Override
    public void processElement(JSONObject jsonObj, BroadcastProcessFunction<JSONObject, String, JSONObject>.ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        // Name of the business table this change belongs to
        String table = jsonObj.getString("table");
        // Broadcast state
        ReadOnlyBroadcastState<String, TmsConfigDimBean> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        // Look up the config entry for this table in the broadcast state (or the preloaded map)
        TmsConfigDimBean tmsConfigDimBean;
        if ((tmsConfigDimBean = broadcastState.get(table)) != null || (tmsConfigDimBean = configMap.get(table)) != null) {
            // A config entry exists, so this is dimension data
            // "after" holds the affected row of the business table
            JSONObject afterJsonObj = jsonObj.getJSONObject("after");
            // Data masking
            switch (table) {
                // Employee info
                case "employee_info":
                    String empPassword = afterJsonObj.getString("password");
                    String empRealName = afterJsonObj.getString("real_name");
                    String idCard = afterJsonObj.getString("id_card");
                    String phone = afterJsonObj.getString("phone");
                    // Mask
                    empPassword = DigestUtils.md5Hex(empPassword);
                    empRealName = empRealName.charAt(0) + empRealName.substring(1).replaceAll(".", "\\*");
                    // The generated id_card values do not follow the standard format, so this check is commented out
                    // idCard = idCard.matches("(^[1-9]\\d{5}(18|19|([23]\\d))\\d{2}((0[1-9])|(10|11|12))(([0-2][1-9])|10|20|30|31)\\d{3}[0-9Xx]$)|(^[1-9]\\d{5}\\d{2}((0[1-9])|(10|11|12))(([0-2][1-9])|10|20|30|31)\\d{2}$)")
                    //         ? DigestUtils.md5Hex(idCard) : null;
                    phone = phone.matches("^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\\d{8}$")
                            ? DigestUtils.md5Hex(phone) : null;
                    afterJsonObj.put("password", empPassword);
                    afterJsonObj.put("real_name", empRealName);
                    afterJsonObj.put("id_card", idCard);
                    afterJsonObj.put("phone", phone);
                    break;
                // Courier info
                case "express_courier":
                    String workingPhone = afterJsonObj.getString("working_phone");
                    workingPhone = workingPhone.matches("^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\\d{8}$")
                            ? DigestUtils.md5Hex(workingPhone) : null;
                    afterJsonObj.put("working_phone", workingPhone);
                    break;
                // Truck driver info
                case "truck_driver":
                    String licenseNo = afterJsonObj.getString("license_no");
                    licenseNo = DigestUtils.md5Hex(licenseNo);
                    afterJsonObj.put("license_no", licenseNo);
                    break;
                // Truck info
                case "truck_info":
                    String truckNo = afterJsonObj.getString("truck_no");
                    String deviceGpsId = afterJsonObj.getString("device_gps_id");
                    String engineNo = afterJsonObj.getString("engine_no");
                    truckNo = DigestUtils.md5Hex(truckNo);
                    deviceGpsId = DigestUtils.md5Hex(deviceGpsId);
                    engineNo = DigestUtils.md5Hex(engineNo);
                    afterJsonObj.put("truck_no", truckNo);
                    afterJsonObj.put("device_gps_id", deviceGpsId);
                    afterJsonObj.put("engine_no", engineNo);
                    break;
                // Truck model info
                case "truck_model":
                    String modelNo = afterJsonObj.getString("model_no");
                    modelNo = DigestUtils.md5Hex(modelNo);
                    afterJsonObj.put("model_no", modelNo);
                    break;
                // User address info
                case "user_address":
                    String addressPhone = afterJsonObj.getString("phone");
                    addressPhone = addressPhone.matches("^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\\d{8}$")
                            ? DigestUtils.md5Hex(addressPhone) : null;
                    afterJsonObj.put("phone", addressPhone);
                    break;
                // User info
                case "user_info":
                    String passwd = afterJsonObj.getString("passwd");
                    String realName = afterJsonObj.getString("real_name");
                    String phoneNum = afterJsonObj.getString("phone_num");
                    String email = afterJsonObj.getString("email");
                    // Mask
                    passwd = DigestUtils.md5Hex(passwd);
                    if (StringUtils.isNotEmpty(realName)) {
                        realName = DigestUtils.md5Hex(realName);
                        afterJsonObj.put("real_name", realName);
                    }
                    phoneNum = phoneNum.matches("^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\\d{8}$")
                            ? DigestUtils.md5Hex(phoneNum) : null;
                    email = email.matches("^[a-zA-Z0-9_-]+@[a-zA-Z0-9_-]+(\\.[a-zA-Z0-9_-]+)+$")
                            ? DigestUtils.md5Hex(email) : null;
                    afterJsonObj.put("birthday", DateFormatUtil.toDate(afterJsonObj.getInteger("birthday") * 24 * 60 * 60 * 1000L));
                    afterJsonObj.put("passwd", passwd);
                    afterJsonObj.put("phone_num", phoneNum);
                    afterJsonObj.put("email", email);
                    break;
            }
            // Drop dimension attributes we do not need
            String sinkColumns = tmsConfigDimBean.getSinkColumns();
            filterColum(afterJsonObj, sinkColumns);
            // Attach the target table name
            String sinkTable = tmsConfigDimBean.getSinkTable();
            afterJsonObj.put("sink_table", sinkTable);
            // Attach the rowKey field
            String sinkPk = tmsConfigDimBean.getSinkPk();
            afterJsonObj.put("sink_pk", sinkPk);

            // Preparation for clearing the Redis cache: pass along the operation type and the foreign-key k-v pairs
            String op = jsonObj.getString("op");
            if ("u".equals(op)) {
                afterJsonObj.put("op", op);
                // Foreign-key names this dimension table is joined on, read from the config table
                String foreignKeys = tmsConfigDimBean.getForeignKeys();
                // JSON object holding the foreign-key names and values of the current dimension table
                JSONObject foreignjsonObj = new JSONObject();
                if (StringUtils.isNotEmpty(foreignKeys)) {
                    String[] foreignNameArr = foreignKeys.split(",");
                    for (String foreignName : foreignNameArr) {
                        // Value before the update
                        JSONObject before = jsonObj.getJSONObject("before");
                        String foreignKeyBefore = before.getString(foreignName);
                        String foreignKeyAfter = afterJsonObj.getString(foreignName);
                        if (!foreignKeyBefore.equals(foreignKeyAfter)) {
                            // The foreign key itself was modified
                            foreignjsonObj.put(foreignName, foreignKeyBefore);
                        } else {
                            foreignjsonObj.put(foreignName, foreignKeyBefore);
                        }
                    }
                }
                afterJsonObj.put("foreign_key", foreignjsonObj);
            }
            // Emit the dimension data
            out.collect(afterJsonObj);
        }
    }

    private void filterColum(JSONObject afterJsonObj, String sinkColumns) {
        String[] fieldArr = sinkColumns.split(",");
        List<String> fieldList = Arrays.asList(fieldArr);
        Set<Map.Entry<String, Object>> entrySet = afterJsonObj.entrySet();
        entrySet.removeIf(entry -> !fieldList.contains(entry.getKey()));
    }

    @Override
    public void processBroadcastElement(String jsonStr, BroadcastProcessFunction<JSONObject, String, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {
        JSONObject jsonObj = JSON.parseObject(jsonStr);
        // Broadcast state
        BroadcastState<String, TmsConfigDimBean> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        // Operation type applied to the config table
        String op = jsonObj.getString("op");
        if ("d".equals(op)) {
            String sourceTable = jsonObj.getJSONObject("before").getString("source_table");
            broadcastState.remove(sourceTable);
            configMap.remove(sourceTable);
        } else {
            TmsConfigDimBean configDimBean = jsonObj.getObject("after", TmsConfigDimBean.class);
            String sourceTable = configDimBean.getSourceTable();
            broadcastState.put(sourceTable, configDimBean);
            configMap.put(sourceTable, configDimBean);
        }
    }
}
```
10. DimSinkFunction

Add the cache-clearing code:

```java
// If the dimension data changed, clear the cached dimension data in Redis
if ("u".equals(op)) {
    // Delete the cache entry keyed by this dimension row's primary key
    DimUtil.delCached(sinkTable, Tuple2.of("id", jsonObj.getString("id")));
    // Delete the cache entries keyed by this dimension row's foreign keys
    Set<Map.Entry<String, Object>> set = foreignKeyJsonObj.entrySet();
    for (Map.Entry<String, Object> entry : set) {
        DimUtil.delCached(sinkTable, Tuple2.of(entry.getKey(), entry.getValue().toString()));
    }
}
```

The complete code:
```java
package com.atguigu.tms.realtime.app.func;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.commom.TmsConfig;
import com.atguigu.tms.realtime.utils.DimUtil;
import com.atguigu.tms.realtime.utils.HbaseUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.hadoop.hbase.client.Put;

import java.util.Map;
import java.util.Set;

public class DimSinkFunction implements SinkFunction<JSONObject> {
    @Override
    public void invoke(JSONObject jsonObj, Context context) throws Exception {
        // Target table name and rowKey field
        String sinkTable = jsonObj.getString("sink_table");
        String sinkPk = jsonObj.getString("sink_pk");
        jsonObj.remove("sink_table");
        jsonObj.remove("sink_pk");

        String op = jsonObj.getString("op");
        jsonObj.remove("op");
        JSONObject foreignKeyJsonObj = jsonObj.getJSONObject("foreign_key");
        jsonObj.remove("foreign_key");

        // Iterate over every key-value pair of the JSON object
        Set<Map.Entry<String, Object>> entrySet = jsonObj.entrySet();
        Put put = new Put(jsonObj.getString(sinkPk).getBytes());
        for (Map.Entry<String, Object> entry : entrySet) {
            if (!sinkPk.equals(entry.getKey())) {
                put.addColumn("info".getBytes(), entry.getKey().getBytes(), entry.getValue().toString().getBytes());
            }
        }
        System.out.println("inserting data into the HBase table");
        HbaseUtil.putPow(TmsConfig.HBASE_NAMESPACE, sinkTable, put);

        // If the dimension data changed, clear the cached dimension data in Redis
        if ("u".equals(op)) {
            // Delete the cache entry keyed by this dimension row's primary key
            DimUtil.delCached(sinkTable, Tuple2.of("id", jsonObj.getString("id")));
            // Delete the cache entries keyed by this dimension row's foreign keys
            Set<Map.Entry<String, Object>> set = foreignKeyJsonObj.entrySet();
            for (Map.Entry<String, Object> entry : set) {
                DimUtil.delCached(sinkTable, Tuple2.of(entry.getKey(), entry.getValue().toString()));
            }
        }
    }
}
```

5. Writing to ClickHouse
1. ClickHouseUtil

First add the required dependencies:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.1.0-1.17</version>
</dependency>
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.2</version>
    <exclusions>
        <exclusion>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```

ClickHouseUtil:
```java
package com.atguigu.tms.realtime.utils;

import com.atguigu.tms.realtime.beans.TransientSink;
import com.atguigu.tms.realtime.commom.TmsConfig;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.lang.reflect.Field;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class ClickHouseUtil {
    // Build a SinkFunction for the given insert statement
    public static <T> SinkFunction<T> getJdbcSink(String sql) {
        SinkFunction<T> sinkFunction = JdbcSink.<T>sink(
                sql,
                new JdbcStatementBuilder<T>() {
                    @Override
                    public void accept(PreparedStatement ps, T obj) throws SQLException {
                        // Bind the object's attributes to the ? placeholders:
                        // get the class of the current stream object and its declared fields
                        Field[] fieldsArr = obj.getClass().getDeclaredFields();
                        // Iterate over all fields
                        int skipNum = 0;
                        for (int i = 0; i < fieldsArr.length; i++) {
                            Field field = fieldsArr[i];
                            // Skip fields marked with the TransientSink annotation
                            TransientSink transientSink = field.getAnnotation(TransientSink.class);
                            if (transientSink != null) {
                                skipNum++;
                                continue;
                            }
                            // Allow access to private fields
                            field.setAccessible(true);
                            try {
                                Object fieldValue = field.get(obj);
                                ps.setObject(i + 1 - skipNum, fieldValue);
                            } catch (IllegalAccessException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }
                },
                new JdbcExecutionOptions.Builder()
                        .withBatchSize(5000)
                        .withBatchIntervalMs(3000L)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName(TmsConfig.CLICKHOUSE_DRIVER)
                        .withUrl(TmsConfig.CLICKHOUSE_URL)
                        .build());
        return sinkFunction;
    }
}
```
2. TransientSink
```java
package com.atguigu.tms.realtime.beans;

// Custom annotation used to mark fields that should not be written to ClickHouse

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TransientSink {
}
```
II. Testing the Code
1. Starting the Programs

Based on the code logic, the following need to be running: HDFS, ZooKeeper, Kafka, HBase, Redis, ClickHouse, plus the OdsApp, DwdBoundRelevantApp, DimApp, and DwsBoundOrgSortDay jobs. DimApp only needs to run once, to bring the dimension data up to date. A possible startup sequence is sketched below.
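The commands below are a sketch only: zk.sh and kf.sh are assumed to be the wrapper scripts from earlier posts in this series, and the Redis config path is environment-specific — adapt everything to your own cluster.

```bash
# Hypothetical helper scripts/paths -- adapt to your environment
start-dfs.sh                             # HDFS
zk.sh start                              # ZooKeeper (wrapper script)
kf.sh start                              # Kafka (wrapper script)
start-hbase.sh                           # HBase
redis-server /path/to/redis.conf         # Redis
sudo systemctl start clickhouse-server   # ClickHouse
# Then run OdsApp, DwdBoundRelevantApp, DimApp (once) and DwsBoundOrgSortDay,
# e.g. from the IDE or with `flink run`
```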
2. Adjusting Kafka Partitions

When reading from Kafka, make sure the topic has 4 partitions, matching the job's parallelism of 4. Otherwise some source subtasks receive no data, their watermarks never advance, and the windowed aggregation never fires.

```bash
kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic tms_dwd_bound_sort --partitions 4
```
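To confirm the change took effect, you can describe the topic (same broker address assumed):

```bash
kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic tms_dwd_bound_sort
```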
3. Creating the ClickHouse Tables

1. Create the database
```sql
DROP DATABASE IF EXISTS tms_realtime;
CREATE DATABASE IF NOT EXISTS tms_realtime;
USE tms_realtime;
```

2. Create the base table
```sql
DROP TABLE IF EXISTS dws_bound_org_sort_day_base;
CREATE TABLE IF NOT EXISTS dws_bound_org_sort_day_base
(
    cur_date        Date   COMMENT 'statistics date',
    org_id          String COMMENT 'organization ID',
    org_name        String COMMENT 'organization name',
    city_id         String COMMENT 'city ID',
    city_name       String COMMENT 'city name',
    province_id     String COMMENT 'province ID',
    province_name   String COMMENT 'province name',
    sort_count_base UInt64 COMMENT 'sorting count',
    ts              UInt64 COMMENT 'timestamp'
)
ENGINE = MergeTree
ORDER BY (cur_date, org_id, org_name, city_id, city_name, province_id, province_name);
```

3. Create the materialized view
```sql
DROP VIEW IF EXISTS dws_bound_org_sort_day;
CREATE MATERIALIZED VIEW IF NOT EXISTS dws_bound_org_sort_day
(
    cur_date      Date,
    org_id        String,
    org_name      String,
    city_id       String,
    city_name     String,
    province_id   String,
    province_name String,
    sort_count    AggregateFunction(argMax, UInt64, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY (cur_date, org_id, org_name, city_id, city_name, province_id, province_name)
SETTINGS index_granularity = 8192 AS
SELECT cur_date,
       org_id,
       org_name,
       city_id,
       city_name,
       province_id,
       province_name,
       argMaxState(sort_count_base, ts) AS sort_count
FROM dws_bound_org_sort_day_base
GROUP BY cur_date, org_id, org_name, city_id, city_name, province_id, province_name;
```

4. Viewing the Results
Once the programs are running and data is being generated, wait for processing to finish; then you can inspect the results in ClickHouse with the query below.

```bash
clickhouse-client -m
```

The -m flag lets the client accept multi-line statements, so you can press Enter inside a query.
```sql
SELECT cur_date,
       org_id,
       org_name,
       city_id,
       city_name,
       province_id,
       province_name,
       argMaxMerge(sort_count) AS sort_count
FROM dws_bound_org_sort_day
GROUP BY cur_date,
         org_id,
         org_name,
         city_id,
         city_name,
         province_id,
         province_name
LIMIT 10;
```

Summary
This wraps up this part of the DWS layer. To make file management easier, I have open-sourced the project on GitHub.
Project address: https://github.com/lcc-666/tms-parent