当前位置: 首页 > news >正文

企业标准建站做网站编程

企业标准建站,做网站编程,企业微网站怎么做,市场营销具体是做什么的【README】 本文记录了 窗口算子的触发器trigger和 evictor清理器#xff1b; trigger触发器#xff1a;决定了一个窗口#xff08;由 window assigner 定义#xff09;何时可以被 window function 处理#xff1b;evictor清理器#xff1a; evictor 可以在 trigger 触…【README】 本文记录了 窗口算子的触发器trigger和 evictor清理器 trigger触发器决定了一个窗口由 window assigner 定义何时可以被 window function 处理evictor清理器 evictor 可以在 trigger 触发后、调用窗口函数之前或之后从窗口中删除元素【1】触发器trigger 1Trigger 接口提供了五个方法来响应不同的事件 onElement() 方法在每个元素被加入窗口时调用。onEventTime() 方法在注册的 event-time timer 触发时调用。onProcessingTime() 方法在注册的 processing-time timer 触发时调用。onMerge() 方法与有状态的 trigger 相关。该方法会在两个窗口合并时 将窗口对应 trigger 的状态进行合并比如使用会话窗口时。最后clear() 方法处理在对应窗口被移除时所需的逻辑。 2有两点需要注意 前三个方法通过返回 TriggerResult 来决定 trigger 如何应对到达窗口的事件。应对方案有以下几种 CONTINUE: 什么也不做FIRE: 触发计算PURGE: 清空窗口内的元素FIRE_AND_PURGE: 触发计算计算结束后清空窗口内的元素上面的任意方法都可以用来注册 processing-time 或 event-time timer。 3触发Fire与清除Purge 当 trigger 认定一个窗口可以被计算时它就会触发也就是返回 FIRE 或 FIRE_AND_PURGE。 【1.1】触发器代码 1以滑动计数窗口为例 DataStreamTuple2String, BigDecimal windowAggStream sensorStream.keyBy(SensorReadingWindow::getId)// 按照id 分组.countWindow(5, 2) // 滑动计数窗口 countWindow() 如下 public WindowedStreamT, KEY, GlobalWindow countWindow(long size, long slide) {return this.window(GlobalWindows.create()).evictor(CountEvictor.of(size)).trigger(CountTrigger.of(slide));} CountTrigger.of() 就是返回计数触发器 public static W extends Window CountTriggerW of(long maxCount) {return new CountTrigger(maxCount);} 其中 maxCount等于滑动步长2CountTrigger计数触发器定义如下 对于 onElement方法当 计数值大于 maxCount时则触发生成新窗口而maxCount等于滑动步长2 具体代码如下 public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {ReducingStateLong count (ReducingState)ctx.getPartitionedState(this.stateDesc);count.add(1L);if ((Long)count.get() this.maxCount) {count.clear();return TriggerResult.FIRE;} else {return TriggerResult.CONTINUE;}} 即对于 7个元素的流会在第2 4 6 个元素生成新窗口因为滑动步长为2 【2】清理器 evictor  1定义Evictor 可以在 trigger 触发后、调用窗口函数之前或之后从窗口中删除元素。 2Evictor 接口提供了两个方法实现此功能 evictBefore() 包含在调用窗口函数前的逻辑而 evictAfter() 包含在窗口函数调用之后的逻辑。 在调用窗口函数之前被移除的元素不会被窗口函数计算。 图1 CountEvictor类定义 3清理器分类 Flink 内置有三个 evictor CountEvictor: 仅记录用户指定数量的元素一旦窗口中的元素超过这个数量多余的元素会从窗口缓存的开头移除DeltaEvictor: 接收 DeltaFunction 和 threshold 参数计算最后一个元素与窗口缓存中所有元素的差值 并移除差值大于或等于 threshold 的元素。TimeEvictor: 接收 interval 参数以毫秒表示。 它会找到窗口中元素的最大 timestamp max_ts 并移除比 max_ts - interval 小的所有元素。【2.1】代码示例 对于 countWindow 计数窗口方法其使用了 Evictor如下 public WindowedStreamT, KEY, GlobalWindow countWindow(long size, long slide) {return this.window(GlobalWindows.create()).evictor(CountEvictor.of(size)).trigger(CountTrigger.of(slide));} CountEvictor 定义见图1其长度为 窗口大小 【3】滑动计数窗口代码 1滑动计数窗口参数 窗口大小5滑动步长2即生成新窗口频率每2条数据就会生成一个新窗口 2代码 /*** Description 滑动计数窗口算子* author xiao tang* version 1.0.0* createTime 2022年04月17日*/ public class WindowTest2_CountWindow {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket读取数据DataStreamString fileStream env.readTextFile(D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensorTimeWindow.txt);// 转换为 SensorReader pojo类型DataStreamSensorReadingWindow sensorStream fileStream.map(x - {String[] arr x.split(,);return new SensorReadingWindow(arr[0], arr[1], new BigDecimal(arr[2]));});// 滑动计数窗口进行增量聚合来计算温度均值采用 Tuple2 DataStreamTuple2String, BigDecimal windowAggStream sensorStream.keyBy(SensorReadingWindow::getId)// 按照id 分组.countWindow(5, 2) // 滑动计数窗口.aggregate(new AggregateFunctionSensorReadingWindow, Tuple3String, BigDecimal, Integer, Tuple2String, BigDecimal() {Overridepublic Tuple3String, BigDecimal, Integer createAccumulator() {return new Tuple3(, BigDecimal.ZERO, 0); // 初始值}Overridepublic Tuple3String, BigDecimal, Integer add(SensorReadingWindow sensorReading, Tuple3String, BigDecimal, Integer accumulator) {return new Tuple3(sensorReading.getId(), sensorReading.getTemperature().add(accumulator.f1).setScale(2,BigDecimal.ROUND_HALF_UP), accumulator.f21);}Overridepublic Tuple2String, BigDecimal getResult(Tuple3String, BigDecimal, Integer accumulator) {return new Tuple2(accumulator.f0, accumulator.f1.divide(new BigDecimal(accumulator.f2)).setScale(2,BigDecimal.ROUND_HALF_UP));}Overridepublic Tuple3String, BigDecimal, Integer merge(Tuple3String, BigDecimal, Integer a, Tuple3String, BigDecimal, Integer b) {return new Tuple3String, BigDecimal, Integer(a.f0, a.f1.add(b.f1), a.f2 b.f2);}});// 打印windowAggStream.print(slideCountWindowAggStream);// 执行env.execute(slideCountWindowAggStreamJob);} } sensor文本 sensor1,2022-04-17 22:07:01,36.1 sensor2,2022-04-17 22:07:02,36.2 sensor1,2022-04-17 22:07:03,36.3 sensor2,2022-04-17 22:07:04,36.4 sensor1,2022-04-17 22:07:05,36.5 sensor1,2022-04-17 22:07:06,36.6 sensor1,2022-04-17 22:07:07,36.7 打印结果 slideCountWindowAggStream (sensor1,36.20) slideCountWindowAggStream (sensor2,36.30) slideCountWindowAggStream (sensor1,36.38) 3分析 sensor1 的温度是 36.1、 36.3、 36.5、 36.6、 36.7 sensor2 的温度是 36.2、36.4 所以 sensor2 的温度均值是 36.3 打印结果没有问题 对于 sensor1 的温度均值 又每2个生成一个新窗口则 第1个窗口的均值是 数据 36.1 和 36.3  的均值 第2个窗口的均值是数据{36.1  36.3  36.5 36.6 } 4个数据的均值四舍五入 【4】 滑动计数窗口代码变体 把sensor文本修改为多了一条数据第8条数据 sensor1,2022-04-17 22:07:01,36.1 sensor2,2022-04-17 22:07:02,36.2 sensor1,2022-04-17 22:07:03,36.3 sensor2,2022-04-17 22:07:04,36.4 sensor1,2022-04-17 22:07:05,36.5 sensor1,2022-04-17 22:07:06,36.6 sensor1,2022-04-17 22:07:07,36.7 sensor1,2022-04-17 22:07:07,36.7 打印结果为 slideCountWindowAggStream (sensor1,36.20) slideCountWindowAggStream (sensor2,36.30) slideCountWindowAggStream (sensor1,36.38) slideCountWindowAggStream (sensor1,36.56) 分析 【4】与【3】的源代码相同但输入数据sensor和打印结果不同原因如下 【4】有8条数据按照key分组如下 sensor1 的温度是 36.1、 36.3、 36.5、 36.6、 36.7 、36.7 sensor2 的温度是 36.2、36.4 sensor2的数据没变下文只讨论 sensor1的数据又每2个生成一个新窗口则窗口列表如下 第1个窗口的均值是 数据 36.1 和 36.3  的均值 36.2第2个窗口的均值是 数据 36.1 36.336.5, 36.6  的均值 36.375的四舍五入值 36.38第3个窗口的均值是 数据  36.336.5, 36.6 36.7, 36.7  的均值 36.56  的四舍五入值 36.56注意第3个窗口的数据项不是 36.1, 36.336.5, 36.6 36.7, 36.7因为窗口大小为5包不下6个数字 【小结】 经过以上分析本文应该是把窗口计算规则讲明白了 在进行窗口聚合时需要关注2个参数即 窗口大小滑动步长
http://www.pierceye.com/news/746179/

相关文章:

  • 网站建设书店用户分几类网站建设项目采购公告
  • 如何做企业网站宣传wordpress站内搜索次数
  • 加盟招商推广网站如何做品牌运营与推广
  • 网站做分布式部署湖南平台网站建设设计
  • 沈阳市建设工程项目管理中心网站网络项目网
  • 沈阳网站建设成创输入网址跳到别的网站
  • 课程网站开发建设商务网站的费用
  • 资讯网站优化排名wordpress 删除所有文章
  • 旅游海外推广网站建设方案wordpress外观无法编辑
  • 品牌手表网站网站推广律师关键词有哪些
  • 卖视频会员个人网站怎么做推广网站的图片怎么做
  • 服务器关闭 网站被k微信公众号推广的好处
  • 工业设计招聘信息网站做网站首页轮播图代码
  • 央企网站开发手机网站 input
  • 千里马招标网站东莞网站推广行者seo08
  • 网络工程专业主要学什么百度seo课程
  • 网站定制开发收费标准是多少网站导航功能
  • 东莞网站(建设信科网络)公众号小程序开发公司
  • dw网站结构图怎么做4399电脑版网页链接
  • 网站服务器网址招聘seo专员
  • 个人网站模板psd主机服务器网站 怎么做
  • 网站开发公司的义务深圳 电子商务网站开发
  • 北京外贸网站设计备案宁波网站推广专业的建站优化公司
  • 政协系统网站建设织梦手机网站
  • 网站建设上海网站制作如何修改上线网站
  • 漫画网站建设教程网站描述怎么设置
  • 网站左侧树形导航怎么做农村网站做移动
  • 建立企业网站方案php做简单网站教程
  • 一个网站交互怎么做的银行营销活动方案
  • 网站读取速度慢58同城二手房出售