钓鱼网站开发,公司网站设计的公司,ps做的网站图片好大,重庆响应式网页建设公司Flink Window 窗口
在Flink流式计算中#xff0c;最重要的转换就是窗口转换Window#xff0c;在DataStream转换图中#xff0c;可以发现处处都可以对DataStream进行窗口Window计算。 窗口#xff08;window#xff09;就是从 Streaming 到 Batch 的一个桥梁。窗口将无界流…Flink Window 窗口
在Flink流式计算中最重要的转换就是窗口转换Window在DataStream转换图中可以发现处处都可以对DataStream进行窗口Window计算。 窗口window就是从 Streaming 到 Batch 的一个桥梁。窗口将无界流unbounded data stream划分很多有界流bounded stream对无界流进行计算。 在实际业务需求中往往说窗口指的就是基于时间Time窗口比如最近1分钟内数据指的就是1分钟时间内产生的数据放在窗口中。 Flink Window 窗口的结构中有两个必须的两个操作 第一、窗口分配器Window Assigner将数据流中的元素分配到对应的窗口。第二、窗口函数Window Function当满足窗口触发条件后对窗口内的数据使用窗口处理函数Window Function进行处理常用的有reduce、aggregate、process。其他的trigger、evictor则是窗口的触发和销毁过程中的附加选项主要面向需要更多自定义的高级编程者如果不设置则会使用默认的配置。 上图是窗口的生命周期示意图假如设置的是一个10分钟的滚动窗口第一个窗口的起始时间是0:00结束时间是0:10后面以此类推。当数据流中的元素流入后窗口分配器会根据时间Event Time或Processing Time分配给相应的窗口。相应窗口满足了触发条件比如已经到了窗口的结束时间会触发相应的Window Function进行计算。 在Flink计算引擎中支持窗口类型有很多种几乎所有Streaming流式计算引擎需要实现窗口都支持。 1、时间窗口TimeWindow 按照时间间隔划分出窗口并对窗口中数据进行计如每xx分钟统计最近xx分钟的数据划分为**滚动Tumbling窗口和滑动Sliding**窗口 2、计数窗口CountWindow 按照数据条目数进行设置窗口比如每10条数据统计一次划分为**滚动Tumbling窗口和滑动Sliding**窗口[此种方式窗口计算在实际项目中使用不多但是有些特殊业务需要需要使用此场景。]
package com.lyj.sx.flink.day05;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;public class CountWindowAllDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());DataStreamSourceString source env.socketTextStream(192.168.25.62, 8889);SingleOutputStreamOperatorInteger map source.map(Integer::parseInt);//不keyBy直接划分窗口//窗口内的数据达到5条就生成一个窗口然后对窗口内的数据进行计算AllWindowedStreamInteger, GlobalWindow win map.countWindowAll(5);SingleOutputStreamOperatorInteger sum win.sum(0);sum.print();env.execute(pxj);}
}