
Real-Time Logistics Data Warehouse: Building the Warehouse (DWS), Part 1

Series

- Real-Time Logistics Data Warehouse: Building the Collection Pipeline
- Real-Time Logistics Data Warehouse: Building the Warehouse
- Real-Time Logistics Data Warehouse: Building the Warehouse (DIM)
- Real-Time Logistics Data Warehouse: Building the Warehouse (DWD), Part 1
- Real-Time Logistics Data Warehouse: Building the Warehouse (DWD), Part 2
- Real-Time Logistics Data Warehouse: Building the Warehouse (DWS), Part 1

Contents

- Preface
- I. Writing the code
  - 1. Creating the files
  - 2. The main program
  - 3. Windowed aggregation: MyTriggerFunction, MyAggregationFunction
  - 4. Dimension enrichment: DimAsyncFunction, DimJoinFunction, ThreadPoolUtil, DimUtil, JedisUtil, HbaseUtil, DwsBoundOrgSortDayBean, adding the dimension fields, MyBroadcastProcessFunction, DimSinkFunction
  - 5. Writing to ClickHouse: ClickHouseUtil, TransientSink
- II. Testing the code
  - 1. Starting the programs
  - 2. Adjusting the Kafka partitions
  - 3. Creating the ClickHouse tables: database, table, materialized view, checking the result
- Summary

Preface

In this post we count sorting operations per organization: compute each organization's sorting count for the current day, enrich it with city and province dimension information, and write the result to the corresponding ClickHouse table. The statistics must refresh every ten seconds. (The overall flow diagram from the original post is omitted here; in outline: Kafka, then windowed aggregation, then dimension enrichment, then ClickHouse.)

I. Writing the code

1. Creating the files

1. The main program
2. The bean class
3. A custom trigger
4. A custom aggregate function
5. Query methods added to HbaseUtil
6. The JedisUtil utility class
7. The DimUtil utility class, which uses a side cache to speed up dimension lookups
8. Changes to the TmsConfigDimBean bean
9. Passing the op field along
10. The thread-pool utility class
11. The async join function DimAsyncFunction
12. The ClickHouseUtil utility class

Those are the Java files this post creates or modifies. Two more deserve a word of explanation:

13. DimSinkFunction: when dimension data is updated, delete the corresponding entries from Redis.

14. The TransientSink annotation: some fields help with the computation but must not be written to ClickHouse. We add a custom annotation; when writing out, we read each field's TransientSink annotation and skip the field whenever the annotation is present.
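To make item 14 concrete, here is a minimal sketch (ExampleBean is a hypothetical class for illustration; the annotation itself is defined in the ClickHouse section below, and the field names are borrowed from the bean defined later):

```java
import com.atguigu.tms.realtime.beans.TransientSink;

public class ExampleBean {
    String curDate;   // bound to a ? placeholder and written to ClickHouse

    @TransientSink
    String joinOrgId; // used only while joining dimensions; skipped by the sink
}
```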
2. The main program

The tasks DwsBoundOrgSortDay has to carry out are laid out in the flowchart of the original post: read the DWD sorting stream from Kafka, convert it to beans, assign watermarks, key by organization id, open a one-day window with a ten-second trigger, aggregate, enrich with organization and region dimensions, and write to ClickHouse.

```java
package com.atguigu.tms.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.app.func.DimAsyncFunction;
import com.atguigu.tms.realtime.app.func.MyAggregationFunction;
import com.atguigu.tms.realtime.app.func.MyTriggerFunction;
import com.atguigu.tms.realtime.beans.DwdBoundSortBean;
import com.atguigu.tms.realtime.beans.DwsBoundOrgSortDayBean;
import com.atguigu.tms.realtime.utils.*;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

public class DwsBoundOrgSortDay {
    public static void main(String[] args) throws Exception {
        // Prepare the environment
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        env.setParallelism(4);

        // Read from Kafka
        String topic = "tms_dwd_bound_sort";
        String groupId = "dws_tms_dwd_bound_sort";
        KafkaSource<String> kafkaSource = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> kafkaStrDS = env
                .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");

        // Convert the stream elements: jsonStr -> bean
        SingleOutputStreamOperator<DwsBoundOrgSortDayBean> dwsBoundOrgSortDayBeanSingleOutputStreamOperator = kafkaStrDS.map(
                new MapFunction<String, DwsBoundOrgSortDayBean>() {
                    @Override
                    public DwsBoundOrgSortDayBean map(String jsonStr) throws Exception {
                        DwdBoundSortBean dwdBoundSortBean = JSON.parseObject(jsonStr, DwdBoundSortBean.class);
                        return DwsBoundOrgSortDayBean.builder()
                                .orgId(dwdBoundSortBean.getOrgId())
                                .sortCountBase(1L)
                                .ts(dwdBoundSortBean.getTs() + 8 * 60 * 60 * 1000L)
                                .build();
                    }
                });

        // Assign watermarks and extract the event-time field
        SingleOutputStreamOperator<DwsBoundOrgSortDayBean> withWatermarkDS =
                dwsBoundOrgSortDayBeanSingleOutputStreamOperator.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<DwsBoundOrgSortDayBean>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<DwsBoundOrgSortDayBean>() {
                                    @Override
                                    public long extractTimestamp(DwsBoundOrgSortDayBean boundOrgSortDayBean, long recordTimestamp) {
                                        return boundOrgSortDayBean.getTs();
                                    }
                                }));
        // withWatermarkDS.print("###");

        // Key by organization id
        KeyedStream<DwsBoundOrgSortDayBean, String> keyedDS =
                withWatermarkDS.keyBy(DwsBoundOrgSortDayBean::getOrgId);

        // Open a one-day tumbling event-time window
        WindowedStream<DwsBoundOrgSortDayBean, String, TimeWindow> windowDS =
                keyedDS.window(TumblingEventTimeWindows.of(Time.days(1L)));

        // Attach the custom trigger
        WindowedStream<DwsBoundOrgSortDayBean, String, TimeWindow> triggerDS =
                windowDS.trigger(new MyTriggerFunction<DwsBoundOrgSortDayBean>());

        // Aggregate
        SingleOutputStreamOperator<DwsBoundOrgSortDayBean> aggregateDS = triggerDS.aggregate(
                new MyAggregationFunction<DwsBoundOrgSortDayBean>() {
                    @Override
                    public DwsBoundOrgSortDayBean add(DwsBoundOrgSortDayBean value, DwsBoundOrgSortDayBean accumulator) {
                        if (accumulator == null) {
                            return value;
                        }
                        accumulator.setSortCountBase(accumulator.getSortCountBase() + 1);
                        return accumulator;
                    }
                },
                new ProcessWindowFunction<DwsBoundOrgSortDayBean, DwsBoundOrgSortDayBean, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<DwsBoundOrgSortDayBean> elements, Collector<DwsBoundOrgSortDayBean> out) throws Exception {
                        for (DwsBoundOrgSortDayBean element : elements) {
                            // Window start time
                            long stt = context.window().getStart();
                            // Shift the window time back by 8 hours and format it
                            element.setCurDate(DateFormatUtil.toDate(stt - 8 * 60 * 60 * 1000L));
                            element.setTs(System.currentTimeMillis());
                            out.collect(element);
                        }
                    }
                });

        // Dimension enrichment (city and province) with async I/O
        // Join the organization dimension to get the organization name
        SingleOutputStreamOperator<DwsBoundOrgSortDayBean> withOrgNameDS = AsyncDataStream.unorderedWait(
                aggregateDS,
                new DimAsyncFunction<DwsBoundOrgSortDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsBoundOrgSortDayBean sortDayBean, JSONObject dimInfoJsonObj) {
                        sortDayBean.setOrgName(dimInfoJsonObj.getString("org_name"));
                        String orgParentId = dimInfoJsonObj.getString("org_parent_id");
                        sortDayBean.setJoinOrgId(orgParentId != null ? orgParentId : sortDayBean.getOrgId());
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsBoundOrgSortDayBean sortDayBean) {
                        return Tuple2.of("id", sortDayBean.getOrgId());
                    }
                },
                60,
                TimeUnit.SECONDS);

        // Fill in the city id
        SingleOutputStreamOperator<DwsBoundOrgSortDayBean> withCityIdDS = AsyncDataStream.unorderedWait(
                withOrgNameDS,
                new DimAsyncFunction<DwsBoundOrgSortDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsBoundOrgSortDayBean sortDayBean, JSONObject dimInfoJsonObj) {
                        sortDayBean.setCityId(dimInfoJsonObj.getString("region_id"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsBoundOrgSortDayBean sortDayBean) {
                        return Tuple2.of("id", sortDayBean.getJoinOrgId());
                    }
                },
                60,
                TimeUnit.SECONDS);

        // Join the region dimension: use the city id to get the city name
        // and the id of the province the city belongs to
        SingleOutputStreamOperator<DwsBoundOrgSortDayBean> withCityNameAndProvinceIdDS = AsyncDataStream.unorderedWait(
                withCityIdDS,
                new DimAsyncFunction<DwsBoundOrgSortDayBean>("dim_base_region_info") {
                    @Override
                    public void join(DwsBoundOrgSortDayBean sortDayBean, JSONObject dimInfoJsonObj) {
                        sortDayBean.setCityName(dimInfoJsonObj.getString("name"));
                        sortDayBean.setProvinceId(dimInfoJsonObj.getString("parent_id"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsBoundOrgSortDayBean sortDayBean) {
                        return Tuple2.of("id", sortDayBean.getCityId());
                    }
                },
                60, TimeUnit.SECONDS);

        // Join the region dimension again: use the province id to get the province name
        SingleOutputStreamOperator<DwsBoundOrgSortDayBean> withProvinceDS = AsyncDataStream.unorderedWait(
                withCityNameAndProvinceIdDS,
                new DimAsyncFunction<DwsBoundOrgSortDayBean>("dim_base_region_info") {
                    @Override
                    public void join(DwsBoundOrgSortDayBean sortDayBean, JSONObject dimInfoJsonObj) {
                        sortDayBean.setProvinceName(dimInfoJsonObj.getString("name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsBoundOrgSortDayBean sortDayBean) {
                        return Tuple2.of("id", sortDayBean.getProvinceId());
                    }
                },
                60, TimeUnit.SECONDS);

        withProvinceDS.print();

        // Write the joined result to ClickHouse
        withProvinceDS.addSink(
                ClickHouseUtil.getJdbcSink("insert into dws_bound_org_sort_day_base values(?,?,?,?,?,?,?,?,?)"));

        env.execute();
    }
}
```
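One knob worth knowing about: `AsyncDataStream.unorderedWait` lets results overtake each other, which is fine here because every record is enriched independently. Flink's API also accepts an optional capacity argument that bounds the number of in-flight requests; a hedged sketch (`orgNameDimFunction` is hypothetical shorthand for the first DimAsyncFunction above, and 100 is an illustrative value, not a tuned one):

```java
// Cap concurrent async lookups so a slow HBase cannot accumulate
// an unbounded number of in-flight requests.
SingleOutputStreamOperator<DwsBoundOrgSortDayBean> withOrgNameCapped =
        AsyncDataStream.unorderedWait(aggregateDS, orgNameDimFunction, 60, TimeUnit.SECONDS, 100);
```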
Now we follow the calls the main program makes and implement the remaining files.

3. Windowed aggregation

Nothing new is used before the window opens, so we skip straight to the trigger.

1. MyTriggerFunction

The custom trigger:

```java
package com.atguigu.tms.realtime.app.func;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Custom trigger: fire the window computation every 10 seconds
public class MyTriggerFunction<T> extends Trigger<T, TimeWindow> {
    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ValueStateDescriptor<Boolean> valueStateDescriptor =
                new ValueStateDescriptor<>("isFirstState", Boolean.class);
        ValueState<Boolean> isFirstState = ctx.getPartitionedState(valueStateDescriptor);
        Boolean isFirst = isFirstState.value();
        if (isFirst == null) {
            // First element of the window: update the state...
            isFirstState.update(true);
            // ...and register a timer: round the current event time down to the
            // 10 s boundary, then fire 10 s later
            ctx.registerEventTimeTimer(timestamp - timestamp % 10000L + 10000L);
        } else if (isFirst) {
            isFirstState.update(false);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    // time is the event time at which the timer fires
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        long end = window.getEnd();
        if (time < end) {
            if (time + 10000L < end) {
                // Re-register for the next 10 s boundary
                ctx.registerEventTimeTimer(time + 10000L);
            }
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}
```
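A quick worked example of the alignment arithmetic in onElement (the timestamp value is illustrative):

```java
long timestamp = 1_700_000_123_456L;             // event time of the first element
long floored   = timestamp - timestamp % 10000L; // 1_700_000_120_000: floor to a 10 s boundary
long fireAt    = floored + 10000L;               // 1_700_000_130_000: the next boundary
System.out.println(fireAt - timestamp);          // 6544 ms until the first FIRE
// Each firing re-registers fireAt + 10000L, so FIREs land on 10 s boundaries
// until the one-day window ends.
```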
2. MyAggregationFunction

The custom aggregate function:

```java
package com.atguigu.tms.realtime.app.func;

import org.apache.flink.api.common.functions.AggregateFunction;

public abstract class MyAggregationFunction<T> implements AggregateFunction<T, T, T> {
    @Override
    public T createAccumulator() {
        return null;
    }

    @Override
    public T getResult(T accumulator) {
        return accumulator;
    }

    @Override
    public T merge(T a, T b) {
        return null;
    }
}
```

4. Dimension enrichment

1. DimAsyncFunction

The async I/O function:

```java
package com.atguigu.tms.realtime.app.func;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.beans.DimJoinFunction;
import com.atguigu.tms.realtime.commom.TmsConfig;
import com.atguigu.tms.realtime.utils.DimUtil;
import com.atguigu.tms.realtime.utils.ThreadPoolUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.concurrent.ExecutorService;

public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> implements DimJoinFunction<T> {
    private String tableName;

    public DimAsyncFunction(String tableName) {
        this.tableName = tableName;
    }

    private ExecutorService executorService;

    @Override
    public void open(Configuration parameters) throws Exception {
        executorService = ThreadPoolUtil.getInstance();
    }

    @Override
    public void asyncInvoke(T obj, ResultFuture<T> resultFuture) throws Exception {
        // Grab a thread from the pool and issue the request asynchronously
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                // Use the stream element to build the primary- or foreign-key query condition
                Tuple2<String, String> keyNameAndValue = getCondition(obj);
                // Look up the dimension object with that condition
                JSONObject dimInfoJsonObj = DimUtil.getDimInfo(TmsConfig.HBASE_NAMESPACE, tableName, keyNameAndValue);
                // Copy the dimension attributes onto the stream element
                if (dimInfoJsonObj != null) {
                    join(obj, dimInfoJsonObj);
                }
                // Pass the element downstream
                resultFuture.complete(Collections.singleton(obj));
            }
        });
    }
}
```
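By default, a lookup that exceeds the 60-second timeout configured in the main program fails the job. If that is too harsh, RichAsyncFunction inherits Flink's timeout hook, which a subclass could override; a hedged sketch of a method that could be added to DimAsyncFunction (the pass-through fallback is a design choice of this sketch, not part of the original project):

```java
@Override
public void timeout(T input, ResultFuture<T> resultFuture) throws Exception {
    // Emit the record without enrichment instead of throwing a TimeoutException.
    resultFuture.complete(Collections.singleton(input));
}
```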
2. DimJoinFunction

The methods DimAsyncFunction needs its subclasses to implement are pulled out into the DimJoinFunction interface:

```java
package com.atguigu.tms.realtime.beans;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple2;

public interface DimJoinFunction<T> {
    void join(T obj, JSONObject dimInfoJsonObj);

    Tuple2<String, String> getCondition(T obj);
}
```

3. ThreadPoolUtil

The utility class that creates the thread pool used by the async I/O:

```java
package com.atguigu.tms.realtime.utils;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolUtil {
    private static volatile ThreadPoolExecutor poolExecutor;

    public static ThreadPoolExecutor getInstance() {
        // Double-checked locking on the volatile field; the method itself
        // does not need to be synchronized as well
        if (poolExecutor == null) {
            synchronized (ThreadPoolUtil.class) {
                if (poolExecutor == null) {
                    poolExecutor = new ThreadPoolExecutor(
                            4,
                            20,
                            300,
                            TimeUnit.SECONDS,
                            new LinkedBlockingDeque<Runnable>(Integer.MAX_VALUE));
                }
            }
        }
        return poolExecutor;
    }
}
```

4. DimUtil

During dimension joins we fetch dimension rows from HBase. To speed up the lookups, we put Redis in front as a side cache (cache-aside), as the flow diagram in the original post shows: check Redis first, fall back to HBase on a miss, then backfill Redis with a 24-hour TTL.

```java
package com.atguigu.tms.realtime.utils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import redis.clients.jedis.Jedis;

@Slf4j
public class DimUtil {
    public static JSONObject getDimInfo(String namespace, String tableName, Tuple2<String, String> nameAndValue) {
        // Field name and value used as the query condition
        String keyName = nameAndValue.f0;
        String keyValue = nameAndValue.f1;
        // Build the Redis key
        String redisKey = "dim:" + tableName.toLowerCase() + ":" + keyName + "_" + keyValue;
        // Redis client
        Jedis jedis = null;
        // Dimension data fetched from Redis
        String dimJsonStr = null;
        // Result holder
        JSONObject dimJsonObj = null;
        // Try the cache first
        try {
            jedis = JedisUtil.getJedis();
            dimJsonStr = jedis.get(redisKey);
            if (StringUtils.isNotEmpty(dimJsonStr)) {
                // Cache hit
                dimJsonObj = JSON.parseObject(dimJsonStr);
            } else {
                // Cache miss: query HBase instead
                if ("id".equals(keyName)) {
                    dimJsonObj = HbaseUtil.getRowByPrimaryKey(namespace, tableName, nameAndValue);
                } else {
                    dimJsonObj = HbaseUtil.getRowByForeignKey(namespace, tableName, nameAndValue);
                }
                if (dimJsonObj != null && jedis != null) {
                    // Backfill the cache with a 24-hour TTL
                    jedis.setex(redisKey, 3600 * 24, dimJsonObj.toJSONString());
                }
            }
        } catch (Exception e) {
            log.error("Exception while querying dimension data from Redis", e);
        } finally {
            if (jedis != null) {
                System.out.println("Closing the Jedis client");
                jedis.close();
            }
        }
        return dimJsonObj;
    }

    // Delete cached dimension data from Redis
    public static void delCached(String tableName, Tuple2<String, String> keyNameAndValue) {
        String keyName = keyNameAndValue.f0;
        String keyValue = keyNameAndValue.f1;
        String redisKey = "dim:" + tableName.toLowerCase() + ":" + keyName + "_" + keyValue;
        Jedis jedis = null;
        try {
            jedis = JedisUtil.getJedis();
            jedis.del(redisKey);
        } catch (Exception e) {
            log.error("Exception while evicting cached dimension data from Redis", e);
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
}
```
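A hedged usage sketch of the cache-aside path (the table name comes from this project; the row id is illustrative and assumes the row has been synced to HBase):

```java
// First call: Redis miss, HBase lookup, cache backfill.
JSONObject org = DimUtil.getDimInfo(TmsConfig.HBASE_NAMESPACE, "dim_base_organ", Tuple2.of("id", "1"));
System.out.println(org);
// Second call with the same key: served straight from Redis.
System.out.println(DimUtil.getDimInfo(TmsConfig.HBASE_NAMESPACE, "dim_base_organ", Tuple2.of("id", "1")));
```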
5. JedisUtil

The Jedis client used to connect to Redis.

First add the dependency to the pom:

```xml
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.3.0</version>
</dependency>
```

```java
package com.atguigu.tms.realtime.utils;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class JedisUtil {
    private static JedisPool jedisPool;

    static {
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(1000);
        poolConfig.setMaxIdle(5);
        poolConfig.setMinIdle(5);
        poolConfig.setBlockWhenExhausted(true);
        poolConfig.setMaxWaitMillis(2000L);
        poolConfig.setTestOnBorrow(true);
        jedisPool = new JedisPool(poolConfig, "hadoop102", 6379, 10000);
    }

    public static Jedis getJedis() {
        System.out.println("Creating a Jedis client");
        Jedis jedis = jedisPool.getResource();
        return jedis;
    }

    public static void main(String[] args) {
        Jedis jedis = getJedis();
        String pong = jedis.ping();
        System.out.println(pong);
    }
}
```
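Since Jedis implements AutoCloseable and close() returns a pooled connection to the pool, callers can also use try-with-resources instead of the explicit finally blocks used above; a minor stylistic alternative, not a change to the project code:

```java
try (Jedis jedis = JedisUtil.getJedis()) {
    System.out.println(jedis.ping()); // connection goes back to the pool automatically
}
```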
6. HbaseUtil

Earlier we implemented table creation and inserts in HbaseUtil; now we add the query methods.

```java
package com.atguigu.tms.realtime.utils;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.commom.TmsConfig;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class HbaseUtil {
    private static Connection conn;

    static {
        try {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", TmsConfig.hbase_zookeeper_quorum);
            conn = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // Create a table
    public static void createTable(String nameSpace, String tableName, String... families) {
        Admin admin = null;
        try {
            if (families.length < 1) {
                System.out.println("At least one column family is required");
                return;
            }
            admin = conn.getAdmin();
            // Check whether the table already exists
            if (admin.tableExists(TableName.valueOf(nameSpace, tableName))) {
                System.out.println(nameSpace + ":" + tableName + " already exists");
                return;
            }
            TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(nameSpace, tableName));
            // Add the column families
            for (String family : families) {
                ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family)).build();
                builder.setColumnFamily(familyDescriptor);
            }
            admin.createTable(builder.build());
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (admin != null) {
                try {
                    admin.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    // Insert a row into HBase
    public static void putPow(String namespace, String tableName, Put put) {
        BufferedMutator mutator = null;
        try {
            BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(namespace, tableName));
            params.writeBufferSize(5 * 1024 * 1024);
            params.setWriteBufferPeriodicFlushTimeoutMs(3000L);
            mutator = conn.getBufferedMutator(params);
            mutator.mutate(put);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (mutator != null) {
                try {
                    mutator.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    // Query one row from an HBase table by primary key
    public static JSONObject getRowByPrimaryKey(String namespace, String tableName, Tuple2<String, String> rowKeyNameAndKey) {
        Table table = null;
        JSONObject dimJsonObj = null;
        String rowKeyName = rowKeyNameAndKey.f0;
        String rowKeyValue = rowKeyNameAndKey.f1;
        try {
            table = conn.getTable(TableName.valueOf(namespace, tableName));
            Result result = table.get(new Get(Bytes.toBytes(rowKeyValue)));
            Cell[] cells = result.rawCells();
            if (cells.length > 0) {
                dimJsonObj = new JSONObject();
                dimJsonObj.put(rowKeyName, rowKeyValue);
                for (Cell cell : cells) {
                    dimJsonObj.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
                }
            } else {
                System.out.println("No matching dimension data found in the HBase table");
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (table != null) {
                try {
                    table.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        return dimJsonObj;
    }

    // Query one row from an HBase table by foreign key
    public static JSONObject getRowByForeignKey(String namespace, String tableName, Tuple2<String, String> foreignKeyNameAndKey) {
        Table table = null;
        JSONObject dimJsonObj = null;
        try {
            table = conn.getTable(TableName.valueOf(namespace, tableName));
            Scan scan = new Scan();
            String foreignKeyName = foreignKeyNameAndKey.f0;
            String foreignKeyValue = foreignKeyNameAndKey.f1;
            SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
                    Bytes.toBytes("info"),
                    Bytes.toBytes(foreignKeyName),
                    CompareOperator.EQUAL,
                    Bytes.toBytes(foreignKeyValue));
            singleColumnValueFilter.setFilterIfMissing(true);
            scan.setFilter(singleColumnValueFilter);
            ResultScanner scanner = table.getScanner(scan);
            Result result = scanner.next();
            if (result != null) {
                Cell[] cells = result.rawCells();
                if (cells.length > 0) {
                    dimJsonObj = new JSONObject();
                    dimJsonObj.put("id", Bytes.toString(result.getRow()));
                    for (Cell cell : cells) {
                        dimJsonObj.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
                    }
                } else {
                    System.out.println("No matching dimension data found in the HBase table");
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (table != null) {
                try {
                    table.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        return dimJsonObj;
    }
}
```

7. DwsBoundOrgSortDayBean

The custom bean holding the fields we write to ClickHouse:

```java
package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;

@Data
@Builder
public class DwsBoundOrgSortDayBean {
    // Statistics date
    String curDate;

    // Organization ID
    String orgId;

    // Organization name
    String orgName;

    // Organization ID used to join for province/city info; not written to ClickHouse
    @TransientSink
    String joinOrgId;

    // City ID
    String cityId;

    // City name
    String cityName;

    // Province ID
    String provinceId;

    // Province name
    String provinceName;

    // Sorting count
    Long sortCountBase;

    // Timestamp
    Long ts;
}
```

8. Adding the dimension fields

The code needs the dimension tables' foreign-key fields, so we modify the MySQL config table and add a foreign-key column:

```sql
DROP TABLE IF EXISTS tms_config_dim;
CREATE TABLE tms_config_dim (
    source_table varchar(200) NOT NULL COMMENT 'source table',
    sink_table   varchar(200) DEFAULT NULL COMMENT 'target table',
    sink_family  varchar(200) DEFAULT NULL COMMENT 'target table column family',
    sink_columns varchar(200) DEFAULT NULL COMMENT 'target table columns',
    sink_pk      varchar(256) DEFAULT NULL COMMENT 'primary-key field',
    foreign_keys varchar(256) DEFAULT NULL COMMENT 'foreign-key query fields',
    PRIMARY KEY (source_table)
) ENGINE = InnoDB DEFAULT CHARSET = utf8 COMMENT 'real-time logistics config table';
```

Then re-import the config data and run DimApp once to sync it; see the DIM-layer post for the details.
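With the config table refreshed and the query methods in place, a hedged smoke test of the two lookups (the row id is illustrative, and `org_parent_id` is one of the foreign keys configured above):

```java
JSONObject byPk = HbaseUtil.getRowByPrimaryKey(TmsConfig.HBASE_NAMESPACE,
        "dim_base_organ", Tuple2.of("id", "1"));
JSONObject byFk = HbaseUtil.getRowByForeignKey(TmsConfig.HBASE_NAMESPACE,
        "dim_base_organ", Tuple2.of("org_parent_id", "1"));
System.out.println(byPk + "\n" + byFk);
```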
9. MyBroadcastProcessFunction

When we built the DIM layer, processElement in MyBroadcastProcessFunction filtered the foreign-key fields out. Now we need them, so we add them back.

Insert the following block right before the data is forwarded:

```java
// Preparation for clearing the Redis cache: pass along the operation type
// and the foreign-key name/value pairs
String op = jsonObj.getString("op");
if ("u".equals(op)) {
    afterJsonObj.put("op", op);
    // Foreign-key names configured for this dimension table
    String foreignKeys = tmsConfigDimBean.getForeignKeys();
    // JSON object holding this table's foreign-key names and values
    JSONObject foreignjsonObj = new JSONObject();
    if (StringUtils.isNotEmpty(foreignKeys)) {
        String[] foreignNameArr = foreignKeys.split(",");
        for (String foreignName : foreignNameArr) {
            // Pre-update row
            JSONObject before = jsonObj.getJSONObject("before");
            String foreignKeyBefore = before.getString(foreignName);
            // Whether or not the foreign key itself changed, pass the
            // pre-update value so the stale cache entry can be invalidated
            foreignjsonObj.put(foreignName, foreignKeyBefore);
        }
    }
    afterJsonObj.put("foreign_key", foreignjsonObj);
}
```

The complete code:

```java
package com.atguigu.tms.realtime.app.func;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.beans.TmsConfigDimBean;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.*;
import java.util.*;

// Custom class handling both the main stream and the broadcast stream
public class MyBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
    private MapStateDescriptor<String, TmsConfigDimBean> mapStateDescriptor;
    private Map<String, TmsConfigDimBean> configMap = new HashMap<>();
    private String username;
    private String password;

    public MyBroadcastProcessFunction(MapStateDescriptor<String, TmsConfigDimBean> mapStateDescriptor, String[] args) {
        this.mapStateDescriptor = mapStateDescriptor;
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        this.username = parameterTool.get("mysql-username", "root");
        this.password = parameterTool.get("mysql-password", "000000");
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Preload the config table over JDBC
        Class.forName("com.mysql.cj.jdbc.Driver");
        String url = "jdbc:mysql://hadoop102:3306/tms_config?useSSL=false&useUnicode=true"
                + "&user=" + username + "&password=" + password
                + "&charset=utf8&TimeZone=Asia/Shanghai";
        Connection conn = DriverManager.getConnection(url);
        PreparedStatement ps = conn.prepareStatement("select * from tms_config.tms_config_dim");
        ResultSet rs = ps.executeQuery();
        ResultSetMetaData metaData = rs.getMetaData();
        while (rs.next()) {
            JSONObject jsonObj = new JSONObject();
            for (int i = 1; i <= metaData.getColumnCount(); i++) {
                String columnName = metaData.getColumnName(i);
                Object columValue = rs.getObject(i);
                jsonObj.put(columnName, columValue);
            }
            TmsConfigDimBean tmsConfigDimBean = jsonObj.toJavaObject(TmsConfigDimBean.class);
            configMap.put(tmsConfigDimBean.getSourceTable(), tmsConfigDimBean);
        }
        rs.close();
        ps.close();
        conn.close();
        super.open(parameters);
    }

    @Override
    public void processElement(JSONObject jsonObj, BroadcastProcessFunction<JSONObject, String, JSONObject>.ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        // Name of the business table this change belongs to
        String table = jsonObj.getString("table");
        // Broadcast state
        ReadOnlyBroadcastState<String, TmsConfigDimBean> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        // Look up the config for this table, first in the broadcast state and
        // then in the preloaded map
        TmsConfigDimBean tmsConfigDimBean;
        if ((tmsConfigDimBean = broadcastState.get(table)) != null || (tmsConfigDimBean = configMap.get(table)) != null) {
            // A config entry exists, so this is dimension data.
            // "after" holds the affected row of the business table
            JSONObject afterJsonObj = jsonObj.getJSONObject("after");
            // Data masking
            switch (table) {
                // Employee data
                case "employee_info":
                    String empPassword = afterJsonObj.getString("password");
                    String empRealName = afterJsonObj.getString("real_name");
                    String idCard = afterJsonObj.getString("id_card");
                    String phone = afterJsonObj.getString("phone");

                    // Mask the fields
                    empPassword = DigestUtils.md5Hex(empPassword);
                    empRealName = empRealName.charAt(0) + empRealName.substring(1).replaceAll(".", "\\*");
                    // Kept for reference: the mock id_card values are randomly
                    // generated and don't match the standard format, so this
                    // check stays commented out
                    // idCard = idCard.matches("(^[1-9]\\d{5}(18|19|([23]\\d))\\d{2}((0[1-9])|(10|11|12))(([0-2][1-9])|10|20|30|31)\\d{3}[0-9Xx]$)|(^[1-9]\\d{5}\\d{2}((0[1-9])|(10|11|12))(([0-2][1-9])|10|20|30|31)\\d{2}$)")
                    //         ? DigestUtils.md5Hex(idCard) : null;
                    phone = phone.matches("^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\\d{8}$")
                            ? DigestUtils.md5Hex(phone) : null;

                    afterJsonObj.put("password", empPassword);
                    afterJsonObj.put("real_name", empRealName);
                    afterJsonObj.put("id_card", idCard);
                    afterJsonObj.put("phone", phone);
                    break;
                // Courier data
                case "express_courier":
                    String workingPhone = afterJsonObj.getString("working_phone");
                    workingPhone = workingPhone.matches("^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\\d{8}$")
                            ? DigestUtils.md5Hex(workingPhone) : null;
                    afterJsonObj.put("working_phone", workingPhone);
                    break;
                // Truck-driver data
                case "truck_driver":
                    String licenseNo = afterJsonObj.getString("license_no");
                    licenseNo = DigestUtils.md5Hex(licenseNo);
                    afterJsonObj.put("license_no", licenseNo);
                    break;
                // Truck data
                case "truck_info":
                    String truckNo = afterJsonObj.getString("truck_no");
                    String deviceGpsId = afterJsonObj.getString("device_gps_id");
                    String engineNo = afterJsonObj.getString("engine_no");

                    truckNo = DigestUtils.md5Hex(truckNo);
                    deviceGpsId = DigestUtils.md5Hex(deviceGpsId);
                    engineNo = DigestUtils.md5Hex(engineNo);

                    afterJsonObj.put("truck_no", truckNo);
                    afterJsonObj.put("device_gps_id", deviceGpsId);
                    afterJsonObj.put("engine_no", engineNo);
                    break;
                // Truck-model data
                case "truck_model":
                    String modelNo = afterJsonObj.getString("model_no");
                    modelNo = DigestUtils.md5Hex(modelNo);
                    afterJsonObj.put("model_no", modelNo);
                    break;
                // User-address data
                case "user_address":
                    String addressPhone = afterJsonObj.getString("phone");
                    addressPhone = addressPhone.matches("^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\\d{8}$")
                            ? DigestUtils.md5Hex(addressPhone) : null;
                    afterJsonObj.put("phone", addressPhone);
                    break;
                // User data
                case "user_info":
                    String passwd = afterJsonObj.getString("passwd");
                    String realName = afterJsonObj.getString("real_name");
                    String phoneNum = afterJsonObj.getString("phone_num");
                    String email = afterJsonObj.getString("email");

                    // Mask the fields
                    passwd = DigestUtils.md5Hex(passwd);
                    if (StringUtils.isNotEmpty(realName)) {
                        realName = DigestUtils.md5Hex(realName);
                        afterJsonObj.put("real_name", realName);
                    }
                    phoneNum = phoneNum.matches("^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\\d{8}$")
                            ? DigestUtils.md5Hex(phoneNum) : null;
                    email = email.matches("^[a-zA-Z0-9_-]+@[a-zA-Z0-9_-]+(\\.[a-zA-Z0-9_-]+)+$")
                            ? DigestUtils.md5Hex(email) : null;
                    afterJsonObj.put("birthday", DateFormatUtil.toDate(afterJsonObj.getInteger("birthday") * 24 * 60 * 60 * 1000L));
                    afterJsonObj.put("passwd", passwd);
                    afterJsonObj.put("phone_num", phoneNum);
                    afterJsonObj.put("email", email);
                    break;
            }

            // Drop the dimension attributes we do not need
            String sinkColumns = tmsConfigDimBean.getSinkColumns();
            filterColum(afterJsonObj, sinkColumns);

            // Attach the target table name
            String sinkTable = tmsConfigDimBean.getSinkTable();
            afterJsonObj.put("sink_table", sinkTable);

            // Attach the row key
            String sinkPk = tmsConfigDimBean.getSinkPk();
            afterJsonObj.put("sink_pk", sinkPk);

            // Preparation for clearing the Redis cache: pass along the
            // operation type and the foreign-key name/value pairs
            String op = jsonObj.getString("op");
            if ("u".equals(op)) {
                afterJsonObj.put("op", op);
                // Foreign-key names configured for this dimension table
                String foreignKeys = tmsConfigDimBean.getForeignKeys();
                // JSON object holding this table's foreign-key names and values
                JSONObject foreignjsonObj = new JSONObject();
                if (StringUtils.isNotEmpty(foreignKeys)) {
                    String[] foreignNameArr = foreignKeys.split(",");
                    for (String foreignName : foreignNameArr) {
                        // Pre-update row
                        JSONObject before = jsonObj.getJSONObject("before");
                        String foreignKeyBefore = before.getString(foreignName);
                        // Whether or not the foreign key itself changed, pass the
                        // pre-update value so the stale cache entry can be invalidated
                        foreignjsonObj.put(foreignName, foreignKeyBefore);
                    }
                }
                afterJsonObj.put("foreign_key", foreignjsonObj);
            }

            // Forward the dimension data
            out.collect(afterJsonObj);
        }
    }

    private void filterColum(JSONObject afterJsonObj, String sinkColumns) {
        String[] fieldArr = sinkColumns.split(",");
        List<String> fieldList = Arrays.asList(fieldArr);
        Set<Map.Entry<String, Object>> entrySet = afterJsonObj.entrySet();
        entrySet.removeIf(entry -> !fieldList.contains(entry.getKey()));
    }

    @Override
    public void processBroadcastElement(String jsonStr, BroadcastProcessFunction<JSONObject, String, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {
        JSONObject jsonObj = JSON.parseObject(jsonStr);
        // Broadcast state
        BroadcastState<String, TmsConfigDimBean> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        // Operation type applied to the config table
        String op = jsonObj.getString("op");
        if ("d".equals(op)) {
            String sourceTable = jsonObj.getJSONObject("before").getString("source_table");
            broadcastState.remove(sourceTable);
            configMap.remove(sourceTable);
        } else {
            TmsConfigDimBean configDimBean = jsonObj.getObject("after", TmsConfigDimBean.class);
            String sourceTable = configDimBean.getSourceTable();
            broadcastState.put(sourceTable, configDimBean);
            configMap.put(sourceTable, configDimBean);
        }
    }
}
```
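For context, this is roughly how the function is wired up in DimApp (from the DIM-layer post; the stream variable names here are assumptions, not the project's exact identifiers):

```java
// Broadcast the config-table CDC stream and connect it to the main dimension stream.
MapStateDescriptor<String, TmsConfigDimBean> mapStateDescriptor =
        new MapStateDescriptor<>("mapStateDescriptor", String.class, TmsConfigDimBean.class);
BroadcastStream<String> broadcastDS = configStrDS.broadcast(mapStateDescriptor);
SingleOutputStreamOperator<JSONObject> dimDS = dimJsonObjDS
        .connect(broadcastDS)
        .process(new MyBroadcastProcessFunction(mapStateDescriptor, args));
```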
10. DimSinkFunction

Add the cache-eviction code:

```java
// If the dimension data changed, clear the cached copies in Redis
if ("u".equals(op)) {
    // Evict the cache entry keyed by this row's primary key
    DimUtil.delCached(sinkTable, Tuple2.of("id", jsonObj.getString("id")));
    // Evict the cache entries keyed by this row's foreign keys
    Set<Map.Entry<String, Object>> set = foreignKeyJsonObj.entrySet();
    for (Map.Entry<String, Object> entry : set) {
        DimUtil.delCached(sinkTable, Tuple2.of(entry.getKey(), entry.getValue().toString()));
    }
}
```

The complete code:

```java
package com.atguigu.tms.realtime.app.func;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.commom.TmsConfig;
import com.atguigu.tms.realtime.utils.DimUtil;
import com.atguigu.tms.realtime.utils.HbaseUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.hadoop.hbase.client.Put;

import java.util.Map;
import java.util.Set;

public class DimSinkFunction implements SinkFunction<JSONObject> {
    @Override
    public void invoke(JSONObject jsonObj, Context context) throws Exception {
        // Target table name and row key
        String sinkTable = jsonObj.getString("sink_table");
        String sinkPk = jsonObj.getString("sink_pk");
        jsonObj.remove("sink_table");
        jsonObj.remove("sink_pk");

        String op = jsonObj.getString("op");
        jsonObj.remove("op");

        JSONObject foreignKeyJsonObj = jsonObj.getJSONObject("foreign_key");
        jsonObj.remove("foreign_key");

        // Walk every key-value pair in the json
        Set<Map.Entry<String, Object>> entrySet = jsonObj.entrySet();
        Put put = new Put(jsonObj.getString(sinkPk).getBytes());
        for (Map.Entry<String, Object> entry : entrySet) {
            if (!sinkPk.equals(entry.getKey())) {
                put.addColumn("info".getBytes(), entry.getKey().getBytes(), entry.getValue().toString().getBytes());
            }
        }
        System.out.println("Inserting data into the HBase table");
        HbaseUtil.putPow(TmsConfig.HBASE_NAMESPACE, sinkTable, put);

        // If the dimension data changed, clear the cached copies in Redis
        if ("u".equals(op)) {
            // Evict the cache entry keyed by this row's primary key
            DimUtil.delCached(sinkTable, Tuple2.of("id", jsonObj.getString("id")));
            // Evict the cache entries keyed by this row's foreign keys
            Set<Map.Entry<String, Object>> set = foreignKeyJsonObj.entrySet();
            for (Map.Entry<String, Object> entry : set) {
                DimUtil.delCached(sinkTable, Tuple2.of(entry.getKey(), entry.getValue().toString()));
            }
        }
    }
}
```
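One detail worth double-checking: the key delCached deletes must be byte-for-byte identical to the one getDimInfo builds. For a dim_base_organ row with id 23 (an illustrative value), both sides produce:

```java
String redisKey = "dim:" + "dim_base_organ".toLowerCase() + ":" + "id" + "_" + "23";
// -> "dim:dim_base_organ:id_23"
```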
5. Writing to ClickHouse

1. ClickHouseUtil

First bring in the required dependencies:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.1.0-1.17</version>
</dependency>

<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.2</version>
    <exclusions>
        <exclusion>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```

ClickHouseUtil:

```java
package com.atguigu.tms.realtime.utils;

import com.atguigu.tms.realtime.beans.TransientSink;
import com.atguigu.tms.realtime.commom.TmsConfig;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.lang.reflect.Field;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class ClickHouseUtil {
    // Build a SinkFunction
    public static <T> SinkFunction<T> getJdbcSink(String sql) {
        SinkFunction<T> sinkFunction = JdbcSink.<T>sink(
                sql,
                new JdbcStatementBuilder<T>() {
                    @Override
                    public void accept(PreparedStatement ps, T obj) throws SQLException {
                        // Bind the object's fields to the ? placeholders:
                        // reflect on the runtime class of the stream element
                        Field[] fieldsArr = obj.getClass().getDeclaredFields();
                        // Walk all declared fields
                        int skipNum = 0;
                        for (int i = 0; i < fieldsArr.length; i++) {
                            Field field = fieldsArr[i];
                            // Skip fields marked as not to be persisted
                            TransientSink transientSink = field.getAnnotation(TransientSink.class);
                            if (transientSink != null) {
                                skipNum++;
                                continue;
                            }
                            // Make private fields accessible
                            field.setAccessible(true);
                            try {
                                Object fieldValue = field.get(obj);
                                ps.setObject(i + 1 - skipNum, fieldValue);
                            } catch (IllegalAccessException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }
                },
                new JdbcExecutionOptions.Builder()
                        .withBatchSize(5000)
                        .withBatchIntervalMs(3000L)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName(TmsConfig.CLICKHOUSE_DRIVER)
                        .withUrl(TmsConfig.CLICKHOUSE_URL)
                        .build());
        return sinkFunction;
    }
}
```

2. TransientSink

```java
package com.atguigu.tms.realtime.beans;

// Custom annotation marking fields that should not be written to ClickHouse

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TransientSink {
}
```

II. Testing the code

1. Starting the programs

Following the code's dependencies, we need to start: HDFS, ZooKeeper, Kafka, HBase, Redis, ClickHouse, OdsApp, DwdBoundRelevantApp, DimApp, and DwsBoundOrgSortDay. DimApp only needs to run once to bring the dimension data up to date.

2. Adjusting the Kafka partitions

When reading from Kafka, make sure the topic has 4 partitions to match the job's parallelism; otherwise some source subtasks stay idle, the watermark cannot advance, and the windowed aggregation never fires:

```shell
kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic tms_dwd_bound_sort --partitions 4
```

3. Creating the ClickHouse tables

1. Database

```sql
DROP DATABASE IF EXISTS tms_realtime;
CREATE DATABASE IF NOT EXISTS tms_realtime;
USE tms_realtime;
```

2. Table

```sql
DROP TABLE IF EXISTS dws_bound_org_sort_day_base;
CREATE TABLE IF NOT EXISTS dws_bound_org_sort_day_base (
    cur_date        Date   COMMENT 'statistics date',
    org_id          String COMMENT 'organization ID',
    org_name        String COMMENT 'organization name',
    city_id         String COMMENT 'city ID',
    city_name       String COMMENT 'city name',
    province_id     String COMMENT 'province ID',
    province_name   String COMMENT 'province name',
    sort_count_base UInt64 COMMENT 'sorting count',
    ts              UInt64 COMMENT 'timestamp'
) ENGINE = MergeTree
ORDER BY (cur_date, org_id, org_name, city_id, city_name, province_id, province_name);
```
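Note how this base table lines up with the sink call in the main program: the bean declares ten fields, joinOrgId carries @TransientSink, and the remaining nine are bound in declaration order to the nine placeholders, matching the column order above. (This relies on getDeclaredFields returning fields in declaration order, which the JDK does in practice but does not formally guarantee.) A sketch that just restates the call from the main program:

```java
// curDate, orgId, orgName, cityId, cityName, provinceId, provinceName, sortCountBase, ts
// map onto cur_date ... ts; joinOrgId is skipped by ClickHouseUtil's reflection loop.
withProvinceDS.addSink(ClickHouseUtil.getJdbcSink(
        "insert into dws_bound_org_sort_day_base values(?,?,?,?,?,?,?,?,?)"));
```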
3. Materialized view

```sql
DROP VIEW IF EXISTS dws_bound_org_sort_day;
CREATE MATERIALIZED VIEW IF NOT EXISTS dws_bound_org_sort_day (
    cur_date Date,
    org_id String,
    org_name String,
    city_id String,
    city_name String,
    province_id String,
    province_name String,
    sort_count AggregateFunction(argMax, UInt64, UInt64)
) ENGINE = AggregatingMergeTree()
ORDER BY (cur_date, org_id, org_name, city_id, city_name, province_id, province_name)
SETTINGS index_granularity = 8192
AS
SELECT
    cur_date,
    org_id,
    org_name,
    city_id,
    city_name,
    province_id,
    province_name,
    argMaxState(sort_count_base, ts) AS sort_count
FROM dws_bound_org_sort_day_base
GROUP BY
    cur_date, org_id, org_name, city_id, city_name, province_id, province_name;
```

Because the Flink job re-emits a running total for the same key every ten seconds, the view keeps only the value with the largest ts per key: argMaxState stores partial aggregation state at insert time, and argMaxMerge at query time collapses it to the latest count.

4. Checking the result

Once the job is running and producing data, wait for it to finish, then query ClickHouse:

```shell
clickhouse-client -m
```

The -m flag enables multi-line mode, so a statement can span several lines and is submitted with a semicolon.

```sql
SELECT
    cur_date,
    org_id,
    org_name,
    city_id,
    city_name,
    province_id,
    province_name,
    argMaxMerge(sort_count) AS sort_count
FROM dws_bound_org_sort_day
GROUP BY
    cur_date,
    org_id,
    org_name,
    city_id,
    city_name,
    province_id,
    province_name
LIMIT 10;
```

Summary

This part of the DWS layer is now complete. To make the files easier to manage, I have open-sourced the project on GitHub.

Project address: https://github.com/lcc-666/tms-parent