当前位置: 首页 > news >正文

南昌网站开发公司哪家公司好网站建设接单

南昌网站开发公司哪家公司好,网站建设接单,电子工程网,wordpress 有点尴尬诶1、概述 在生产场景值#xff0c;经常需要和上游、下游对数#xff0c;离线场景可以直接 group by 再 count #xff0c;但是实时场景中#xff0c;如果使用 kafka 作为中间件#xff0c;中间经过几个 job 的过滤转化后#xff0c;再对照像 Doris 或 Clickhouse 中最终层…1、概述 在生产场景值经常需要和上游、下游对数离线场景可以直接 group by 再 count 但是实时场景中如果使用 kafka 作为中间件中间经过几个 job 的过滤转化后再对照像 Doris 或 Clickhouse 中最终层的数据如果出现缺失很难判断是哪一层缺失的。 2、使用 侧流输出处理时间的滚动窗口状态进行数据量级统计 package com.flink.feature.windowcount;import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag;/*** 1、先输入数据* 1* 1* 1* 1* 输出结果* main:8 业务处理1* main:1 业务处理1* main:2 业务处理1* main:3 业务处理1* 每10秒每个key接受到的数据量:2 (1698913020000,1698913030000,窗口统计1,4)** 2、再输入数据* 1* 2* 2* 3* 3* 4* 4* 输出结果* main:4 业务处理1* main:5 业务处理2* main:6 业务处理2* main:7 业务处理3* main:8 业务处理3* main:1 业务处理4* main:2 业务处理4* 每10秒每个key接受到的数据量:2 (1698913030000,1698913040000,窗口统计1,1)* 每10秒每个key接受到的数据量:7 (1698913030000,1698913040000,窗口统计4,2)* 每10秒每个key接受到的数据量:6 (1698913030000,1698913040000,窗口统计2,2)* 每10秒每个key接受到的数据量:6 (1698913030000,1698913040000,窗口统计3,2)*/public class UseWindowValidateData {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();OutputTagTuple2String,Integer windowCountTag new OutputTagTuple2String,Integer(window_count){};DataStreamSourceString source env.socketTextStream(localhost, 8888);SingleOutputStreamOperatorString process source.process(new ProcessFunctionString, String() {Overridepublic void processElement(String input, ProcessFunctionString, String.Context ctx, CollectorString collector) throws Exception {ctx.output(windowCountTag,new Tuple2(窗口统计input,1));collector.collect(业务处理 input);}});process.getSideOutput(windowCountTag).keyBy(new KeySelectorTuple2String,Integer, String() {Overridepublic String getKey(Tuple2String,Integer tp) throws Exception {return tp.f0;}}).window(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunctionTuple2String, Integer, Tuple4String,String,String,Integer, String, TimeWindow() {private MapStateString, Integer mapState;Overridepublic void open(Configuration parameters) throws Exception {MapStateDescriptorString, Integer stateDescriptor new MapStateDescriptor(map-state, String.class, Integer.class);mapState getRuntimeContext().getMapState(stateDescriptor);}Overridepublic void process(String key, ProcessWindowFunctionTuple2String, Integer, Tuple4String, String, String, Integer, String, TimeWindow.Context ctx, IterableTuple2String, Integer elements, CollectorTuple4String, String, String, Integer out) throws Exception {for (Tuple2String, Integer tp : elements) {Integer res mapState.get(tp.f0);if (res null) {res 0;}res 1;mapState.put(tp.f0, res);}out.collect(new Tuple4(String.valueOf(ctx.window().getStart()),String.valueOf(ctx.window().getEnd()),key,mapState.get(key)));// 每个窗口计算后清空状态mapState.clear();}}).print(每10秒每个key接受到的数据量);process.print(main);env.execute();} } 3、测试结果 1先输入数据 1 1 1 1 输出结果 main:8 业务处理1 main:1 业务处理1 main:2 业务处理1 main:3 业务处理1 每10秒每个key接受到的数据量:2 (1698913020000,1698913030000,窗口统计1,4) 2再输入数据 1 2 2 3 3 4 4 输出结果 main:4 业务处理1 main:5 业务处理2 main:6 业务处理2 main:7 业务处理3 main:8 业务处理3 main:1 业务处理4 main:2 业务处理4 每10秒每个key接受到的数据量:2 (1698913030000,1698913040000,窗口统计1,1) 每10秒每个key接受到的数据量:7 (1698913030000,1698913040000,窗口统计4,2) 每10秒每个key接受到的数据量:6 (1698913030000,1698913040000,窗口统计2,2) 每10秒每个key接受到的数据量:6 (1698913030000,1698913040000,窗口统计3,2)
http://www.pierceye.com/news/206070/

相关文章:

  • 宁波网站制作哪家强调用wordpress的文章编辑器
  • 在线制作手机网站公司网站建设厂家
  • 在线分析网站一个小外贸公司怎么开
  • 给自己的公司做网站怎么做好电脑手机一体网站
  • 精通网站建设 全能建站密码pdf电商网站设计理念
  • 百度推广建设网站是不是合发手机网站的必要性
  • 企业网站建设是什么实现的物质基础和技术支撑现货交易平台代理
  • 网站建设的描述长沙发布app
  • 好的设计作品网站代理网站建设
  • 做网站的软件m开头网站建设公司问候语
  • 做网站需要工商证吗app软件开发价格
  • 做足球原创短视频网站网站建设永远在路上
  • 做seo为什么要了解网站苏州做网站公司
  • 这几年做哪些网站能致富网站开发账务处理
  • 网站的版权信息做阿里巴巴网站卖货咋样
  • 找项目去哪个网站成都哪里有做网站的公司
  • 网站推广的方法及特点国外专门做童装的网站
  • 企业网站开发模型图wordpress 侧边导航
  • 济南网站系统优化网站建设属于什么专业
  • 114啦建站程序页面效果好的网站
  • 龙华网站建设-信科网络电子商务网站建设和技术现状
  • 网站备案有效期wordpress 评论图片
  • 搭建网站需要哪些步骤wordpress 主题使用
  • 网站怎么发布做微商天眼官方网站
  • qq群网站制作异常网站服务器失去响应
  • aspnet网站开发模板紫光华宇拼音输入法官方下载
  • 东莞网站设计价格wordpress的配置dns
  • 韶关网站建设公司电子商务网站建设考试重点
  • 网站左侧 导航小红书广告投放平台
  • 资阳住房和城乡建设厅网站重庆建设网站建站