网站建设 军报,上海广告公司电话号码,深圳市建设工程交易服务中心网,wordpress新建页面common模块回顾
app BaseApp: 作为其他子模块中使用Flink - StreamAPI的父类#xff0c;实现了StreamAPI中的通用逻辑#xff0c;在其他子模块中只需编写关于数据处理的核心逻辑。BaseSQLApp: 作为其他子模块中使用Flink- SQLAPI的父类。在里面设置了使用SQL API的环境、并行…common模块回顾
app BaseApp: 作为其他子模块中使用Flink - StreamAPI的父类实现了StreamAPI中的通用逻辑在其他子模块中只需编写关于数据处理的核心逻辑。BaseSQLApp: 作为其他子模块中使用Flink- SQLAPI的父类。在里面设置了使用SQL API的环境、并行度、检查点等固定逻辑。 bean存放其他子模块中使用到的javaBean对象因为如果一直使用jsonObject对象调用数据的话需要使用类似getString(字段名)的方式没有直接使用javaBean对象那么方便。constant 存储字符串常量为了保证一致性如果某个常量修改时只需在这里修改即可对整个项目进行修改 function DorisMapFunction将javaBean对象转换为对应的json字符串对象并且将驼峰式命名方式修改为蛇形命名方式。便于写入doris。 util DateFormateUtilFlinkSinkUtilFlinkSourceUtilHBaseUtilIkUtilJdbcUtilSQLUtil getUpsertKafakaSQL: 一定要声明主键支持撤回流getDorisSinkSQL: 用于写入Doris
dim层回顾
Flink-cdc监控mysql中的维度配置表将监控的数据流做成广播流将广播流和读取数据的主流进行connect主流数据根据广播流的配置信息进行分流注意需要先提前缓存一次配置表信息达到动态拆分数据表的效果
dwd层FlinkSQL回顾
注意join时会将所有数据都存储到内存中需要考虑设置TTL大表join小表时可以考虑使用lookup join如果数据流有明确的先后关系时考虑使用Interval join 在支付成功模块由于订单详情表处理时已经存在撤回流但支付成功模块也是使用left join方式调用订单详情数据会导致产生两次撤回流。在后续dws层处理时要注意对数据进行去重过滤。 dws层回顾
如何判断使用FlinkSQL还是StreamAPI 如果比较标准化, 比如简单的开窗聚合一般使用FlinkSQL如果需要使用状态处理数据比如判断是否为独立用户使用StreamAPI
交易域sku粒度订单下单各窗口汇总 需求分析从Kafka订单明细主题读取数据过滤null数据并按照唯一键对数据去重按照SKU维度分组统计原始金额、活动减免金额、优惠券减免金额和订单金额并关联维度信息将数据写入Doris交易域SKU粒度下单各窗口汇总表 思路分析 方案一按照订单ID进行分组根据业务逻辑设置定时器取最后一个数据进行发送方案二将度量值存放到状态中每次新数据到达时将新的度量值减去状态中的度量值 具体实现 因为需要使用状态故使用BaseApp; 设置端口号10029并发度4消费者组为类名消费者主题名称为dwd订单详情读取dwd下单主题数据 stream.print()过滤清洗: 去掉null数据, stream.flatMap(new FlatMapFunction())ts: 水位线不能为空进行位数的修正如果是10位的使用 jsonObj.put(ts, ts*1000)id: keyby的关键字不能为空sku_id: group by的粒度关键字也不能为空 添加水位线 网络延迟5L添加数据的泛型提取数据中的ts作为水位线注意观察ts的位数需要为13位毫秒级 修正度量值转换数据结构 使用id关键字进行分组使用process算子中的状态来进行处理stream.process(new KeyedProcessFunction)返回值为对应的javabean对象在状态中存储上一次的度量值大小只保存30秒在processElement()方法中获取状态中的度量值使用前需要判空如果为空设置为0之后才能进行数值计算。创建对应的bean对象度量值都减去状态中的度量值和更新状态中当前的度量值 分组开窗聚合 使用skuId进行keyby分组后使用window算子进行开窗,设置窗口时间注意Time属于org.apache.flink.streaming.api.windowing.time.Time.seconds()使用reduce算子进行聚合计算, 聚合时需要累积所有度量值new ProcessWindowFunction()获取窗口信息, startTime, EndTime, curTime, 获取到后写入javaBean对象中 关联维度信息 先分组聚合再关联维度信息的原因关联维度信息需要join操作是很耗费性能的大操作。先聚合数据能大幅度减少数据量。启动HBase查看对的sku_info表中是否存储着对应的维度信息获取外部连接需要使用生命周期方法(openclose在整个算子执行过程中只运行一次);对应的关联维度信息即RichMapFunction()在map方法中使用HBase的API读取表格数据使用读取到的字段补全原本的信息 创建HBase的API读取表格数据 get 获取table创建get对象调用get方法获取数据写入jsonObj 写出到Doris
维度关联优化
旁路缓存独立缓存服务有redis, memcache.
使用旁路缓存时要注意保持数据的一致性如果数据发生修改和删除直接删除redis中的数据。
同步旁路缓存模式
引入Jedis相关依赖
dependencygroupIdredis.clients/groupIdartifactIdjedis/artifactId
/dependency创建Redis工具类RedisUtil在RichMapFunction中的open和close方法中获取和关闭HBase和Redisd的连接。拼接对应的redisRowKey读取Redis缓存的数据jsonObj的字符串判断redis读取到的数据是否为空 没有数据需要读取HBasejsonObj HBaseUtil.getCells(), 读取到数据后使用jedis.setex()存储到redisredis有缓存直接返回 进行维度关联
Dim层写入HBase修正
在dim层将数据写入HBase时需要同时获取Redis的连接。判断redis中的缓存是否发生变化 判断数据类型是修改或删除时删除Redis中对应的数据拼写数据的rowkey使用jedis.del(rediskey)来删除