当前位置: 首页 > news >正文

正能量网站免费进入无需下载中园建设银行官方网站

正能量网站免费进入无需下载,中园建设银行官方网站,如何在手机上开自己的网站,对网站开发流程的了解星光下的赶路人star的个人主页 内心的平静始于不再让他人掌控你的感情 文章目录 0、前言1、窗口#xff08;Window#xff09;1.1 窗口的概念1.2 窗口的分类1.3 窗口API概览1.4 窗口分配器#xff08;Window Assigner#xff09;1.4.1 时间窗口1.4.2 计数窗口 1.5 窗口函数…                        星光下的赶路人star的个人主页 内心的平静始于不再让他人掌控你的感情 文章目录 0、前言1、窗口Window1.1 窗口的概念1.2 窗口的分类1.3 窗口API概览1.4 窗口分配器Window Assigner1.4.1 时间窗口1.4.2 计数窗口 1.5 窗口函数1.5.1 增量聚合函数ReduceFunction/AggregateFunction1.5.2 全窗口函数Full Window Functions1.5.3 增量聚合和全窗口函数的结合使用 1.6 其它API1.6.1 触发器Trigger1.6.2 移除器Evictor 0、前言 在批处理统计中我们可以等待一批数据都到齐后统一处理。但是在实时处理统计中我们是来一条就得处理一条那么我们怎么统计最近一段时间内的数据呢引入“窗口”。 所谓的“窗口”一般就是划定的一段时间范围也就是“时间窗”对在这范围内的数据进行处理就是所谓的窗口计算。所以窗口和时间往往是分不开的。接下来我们就深入了解一下Flink中的时间语义和窗口的应用。 1、窗口Window 1.1 窗口的概念 Flink是一种流式计算引擎主要是来处理无界数据流的数据源源不断、无穷无尽。想要更加方便高效地处理无界流一种方式就是将无限数据切割成有限的“数据块”进行处理这就是所谓的“窗口”Window。 注意Flink中窗口并不是静态准备好的而是动态创建——当有落在这个窗口区间范围的数据达到时才创建对应的窗口。另外这里我们认为到达窗口结束时间时窗口就触发计算并关闭事实上“触发计算”和“窗口关闭”两个行为也可以分开. 1.2 窗口的分类 上面的其实是最简单的例子是最简单的一种时间窗口。在Flink中窗口的应用非常灵活我们可以使用各种不同类型的窗口来实现需求。接下来我们就从不同的角度对Flink中内置的窗口做一个分类说明。 1、按照驱动类型分类 2、按照窗口分配数据的规则分类 根据分配数据的规则窗口的具体实现可以分为四类滚动窗口Tumbling Window、滑动窗口Sliding Window、会话窗口Session Window以及全局窗口Global Window。 1滚动窗口Tumbling Window 滚动窗口有固定的大小是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠也不会有间隔是“首尾相接”的状态。这是最简单的窗口形式每一个数据都会被分配到一个窗口而且只会属于一个窗口。 2滑动窗口Sliding Window 3会话窗口 4全局窗口 就是把所有数据当做在同一个窗口。这种窗口没有结束的时候默认是不会做触发计算的。如果希望它能对数据进行计算处理还需要自定义触发器。 1.3 窗口API概览 1、按键分区Keyed和非按键分区Non-Keyed 在定义窗口操作之前首先要确定到底是基于按键分区Keyed的数据流KeyedStream来开窗还是直接在没有按键分区的DataStream上开窗。也就是说在调用窗口算子之前是否有keyBy操作 1按键分区窗口Keyed Window 经过按键分区KeyBy操作后数据流会按照key被分为多条逻辑流Logical Streams这就是KeyedStream。基于KeyedStream进行窗口操作时窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到一个并行子任务而窗口操作会基于每个key进行单独的处理。所以可以认为每个key上都定义了一组窗口各自独立地进行统计计算。 在代码实现上我们需要先对DataStream调用.keyBy()进行按键分区然后再调用.window()定义窗口。 stream.keyBy(...).window(...)2非按键分区 如果没有进行keyBy那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务task上执行就相当于并行度变成了1。 在代码中直接基于DataStream调用.windowAll()定义窗口。 stream.windowAll(...)注意对于非按键分区的窗口操作手动调大窗口算子的并行度也是无效的windowAll本身就是一个非并行的操作。 2、代码中窗口API的调用 窗口操作主要有两个部分窗口分配器Window Assigners和窗口函数Window Functions。 stream.keyBy(key selector).window(window assigner).aggregate(window function)其中.window()方法需要传入一个窗口分配器它指明了窗口的类型而后面的.aggregate()方法传入一个窗口函数作为参数它用来定义窗口具体的处理逻辑。窗口分配器有各种形式而窗口函数的调用方法也不止.aggregate()一种。 1.4 窗口分配器Window Assigner 定义窗口分配器Window Assaginer是构建窗口算子的第一步它的作用就是定义数据应该被分配到哪个窗口。所以可以说窗口分配器其实就是在指定窗口的类型。 窗口分配器最通用的定义方式就是调用.window()方法。这个方法需要传入一个WindowAssiger作为参数返回WindowedStream。如果是非按键分区窗口那么直接调用.windowAll()方法同样传入一个WindowAssigner返回的是AllWindowedStream。 窗口按照驱动类型可以分成时间窗口和计数窗口而按照具体的分配规则又有滚动窗口、滑动窗口、会话窗口、全局窗口四种。除去需要自定义的全局窗口外其他常用的类型Flink中都给出了内置的分配器的实现。 1.4.1 时间窗口 时间窗口是最常用的窗口类型又可以细分为滚动、滑动和会话三种。 1滚动处理时间窗口 窗口分配器由类TumblingProcessingTimeWindows提供需要调用它的静态方法.of()。 stream.keyBy(...).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(...)这里.of()方法需要传入一个Time类型的参数size表示滚动窗口的大小我们这里创建了一个长度为5秒的滚动窗口。 另外.of()还有一个重载方法可以传入两个Time类型的参数size和offset。第一个参数当然还是窗口大小第二个参数则表示窗口起始点的偏移量。 2滑动处理时间窗口 窗口分配器由类SlidingProcessingTimeWindows提供同样需要调用它的静态方法.of()。 stream.keyBy(...).window(SlidingProcessingTimeWindows.of(Time.seconds(10)Time.seconds(5))).aggregate(...)这里.of()方法需要传入两个Time类型的参数size和slide前者表示滑动窗口的大小后者表示滑动窗口的滑动步长。我们这里创建了一个长度为10秒、滑动步长为5秒的滑动窗口。 滑动窗口同样可以追加第三个参数用于指定窗口起始点的偏移量用法与滚动窗口完全一致。 3处理时间会话窗口 窗口分配器由类ProcessingTimeSessionWindows提供需要调用它的静态方法.withGap()或者.withDynamicGap()。 stream.keyBy(...).window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))).aggregate(...)这里.withGap()方法需要传入一个Time类型的参数size表示会话的超时时间也就是最小间隔session gap。我们这里创建了静态会话超时时间为10秒的会话窗口。 另外还可以调用withDynamicGap()方法定义session gap的动态提取逻辑。 4滚动事件时间窗口 窗口分配器由类TumblingEventTimeWindows提供用法与滚动处理事件窗口完全一致。 stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.seconds(5))).aggregate(...)5滑动事件窗口 窗口分配器由类SlidingEventTimeWindows提供用法与滑动处理事件窗口完全一致。 stream.keyBy(...).window(SlidingEventTimeWindows.of(Time.seconds(10)Time.seconds(5))).aggregate(...)6事件时间会话窗口 窗口分配器由类EventTimeSessionWindows提供用法与处理事件会话窗口完全一致。 stream.keyBy(...).window(EventTimeSessionWindows.withGap(Time.seconds(10))).aggregate(...)1.4.2 计数窗口 计数窗口概念非常简单本身底层是基于全局窗口Global Window实现的。Flink为我们提供了非常方便的接口直接调用.countWindow()方法。根据分配规则的不同又可以分为滚动计数窗口和滑动计数窗口两类下面我们就来看它们的具体实现。 1滚动计数窗口 滚动计数窗口只需要传入一个长整型的参数size表示窗口的大小。 stream.keyBy(...).countWindow(10)我们定义了一个长度为10的滚动计数窗口当窗口中元素数量达到10的时候就会触发计算执行并关闭窗口。 2滚动计数窗口 与滚动计数窗口类似不过需要在.countWindow()调用时传入两个参数size和slide前者表示窗口大小后者表示滑动步长。 stream.keyBy(...).countWindow(103)我们定义了一个长度为10、滑动步长为3的滑动计数窗口。每个窗口统计10个数据每隔3个数据就统计输出一次结果。 3全局窗口 全局窗口是计数窗口的底层实现一般在需要自定义窗口时使用。它的定义同样是直接调用.window()分配器由GlobalWindows类提供。 stream.keyBy(...).window(GlobalWindows.create());需要注意使用全局窗口必须自行定义触发器才能实现窗口计算否则起不到任何作用。 1.5 窗口函数 定义了窗口分配器我们只是知道了数据属于哪个窗口可以将数据收集起来了至于收集起来到底是要做什么还得看窗口函数。所以在窗口分配器之后必须再接上一个定义窗口如何计算的操作这就是所谓的“窗口函数”Window Functions。 窗口函数定义了要对窗口中收集的数据做的计算操作根据处理的方式可以分为两类增量聚合函数和全窗口函数。 1.5.1 增量聚合函数ReduceFunction/AggregateFunction 窗口将数据收集起来最基本的处理操作当然就是进行聚合。我们可以每来一个数据就在之前结果上聚合一次这就是“增量聚合”。 典型的增量聚合函数有两个ReduceFunction和AggregateFunction。 1、归约聚合ReduceFunction 代码示例求水位累加值 /*** 特点* 两两聚合* 输入和输出的类型一样*/ public class Demo06_Reduce {public static void main(String[] args) throws Exception {//创建Flink配置类空参创建的话都是默认值Configuration configuration new Configuration();//修改配置类中的WebUI端口号configuration.setInteger(rest.port,3333);//创建Flink环境并且传入配置对象StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(configuration);env.socketTextStream(hadoop102,9999)//映射.map(new WaterSensorFunction())//全局计数滑动窗口.countWindowAll(3)//累加.reduce((waterSensor, t1) - {t1.setVc(waterSensor.getVc()t1.getVc());return t1;}).print();env.execute();}} 测试截图 2、聚合函数AggregateFunction ReduceFunction可以解决大多数归约聚合的问题但是这个接口有一个限制就是聚合状态的类型、数据结果的类型必须和输入数据类型一样。 Flink Window API中的aggragate就突破了这个限制可以定义更加灵活的窗口聚合操作。这个方法需要传入一个AggregateFunction的实现类作为参数。 AggregateFunction可以看作是ReduceFunction通用版本这里有三种类型输入类型IN、累加器类型ACC和输出类型OUT。输入类型IN就是输入流中元素的数据类型累加器类型ACC则是我们进行聚合的中间状态类型而数据类型当然就是最终计算结果的类型了。 接口中有四个方法 createAccumulator():创建一个累加器这就是为聚合创建一个初始状态每个聚合任务只会调用一次。add():将输入的元素添加到累加器中。getResult():从累加器中提取聚合的输出结果merge():合并两个累加器并将合并后的状态作为一个累加器返回 所以可以看到AggregateFunction的工作原理是先调用createAccumulator()方法为任务初始化一个状态累加器而后每来一个数据就调用一次add方法对数据进行聚合得到的结果保存在状态中等到了窗口需要输出时再调用getResult方法得到计算结果。很明显与ReduceFunction相同AggregateFunction也是增量式的聚合而由于输入、中间状态、输出的类型可以不同使得应用更加灵活方便。 代码实现 /*** 输入和输出的类型不一样sum、max、min、minBy、maxBy、reduce就不行了**可以考虑用aggregate*/ public class Demo07_Aggregate {public static void main(String[] args) throws Exception {//创建Flink配置类空参创建的话都是默认值Configuration configuration new Configuration();//修改配置类中的WebUI端口号configuration.setInteger(rest.port,3333);//创建Flink环境并且传入配置对象StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(configuration);env.socketTextStream(hadoop102,9999).map(new WaterSensorFunction())//全局计数滚动窗口.countWindowAll(3)/*** 统计窗口中所有数据的vc之和* AggregateFunctionIN,ACC,OUT* IN:输入窗口中元素的类型* ACC累加器。聚合中使用的中间的缓存类型* OUT输出的类型** 以上三种类型都可以不一致*///输出 vc之和xxx.aggregate(new AggregateFunctionWaterSensor, Integer, String() {//创建一个累加器对象 (在一个窗口创建时执行一次)Overridepublic Integer createAccumulator() {System.out.println(我是一个累加器);return 0;}//把输入的每个元素累加到累加器上Overridepublic Integer add(WaterSensor waterSensor, Integer integer) {return integerwaterSensor.getVc();}//输出最终结果在窗口关闭执行时执行一次Overridepublic String getResult(Integer integer) {System.out.println(我是输出最终结果);return vc之和integer;}//不用写。在DataSetAPI批处理中才要实现Overridepublic Integer merge(Integer integer, Integer acc1) {return null;}}).print();env.execute();} } 测试截图 另外Flink也为窗口的聚合提供了一系列预定义的简单聚合方法可以直接基于WindowedStream调用。主要包括.sum()/max()/maxBy()/min()/minBy()与KeyedStream的简单聚合非常相似。它们的底层其实都是通过AggregateFunction来实现的。 1.5.2 全窗口函数Full Window Functions 有些场景下我们要做的计算必须基于全部的数据才有效这时做增量聚合就没什么意义了另外输出的结果可能要包含上下文中的一些信息比如窗口的起始时间这是增量聚合函数做不到的。 所以我们还需要有更丰富的窗口计算方式。窗口操作中的另一大类就是全窗口函数。与增量函数不同全局窗口函数首先需要收集窗口中的数据并在内部缓存起来等到窗口要输出结果的时候再取出数据进行计算。 在Flink中全局窗口函数也是有两种:WindowFunction和ProcessWindowFunction。 1、窗口函数WindowFunction WindowFunction字面上就是“窗口函数”它其实是老版本的通用窗口函数接口。我们可以基于WindowedStream调用.apply()方法传入一个WindowFunction的实现类。 stream.keyBy(key selector).window(window assigner).apply(new MyWindowFunction());这个类中可以获取到包含窗口所有数据的可迭代集合Iterable还可以拿到窗口Window本身的信息。 不过WindowFunction能提供的上下文信息比较少也没有更高级的功能。事实上它的作用可以被ProcessWindowFunction全覆盖所以之后可能会逐渐弃用。 2、处理窗口函数ProcessWindowFunction ProcessWindowFunction是Window API中最底层的通用窗口函数接口。之所以说它“最底层”是因为除了可以拿到窗口中的所有数据之外ProcessWindowFunction还可以获取到一个“上下文对象”Context。这个上下文对象非常强大不仅能够获取窗口信息还可以访问当前的时间和状态信息。这里的时间就包括了处理时间processing time和事件时间水位线event time watermark。这就使得ProcessWindowFunction更加灵活、功能更加丰富其实就是一个增强版的WindowFunction。 public class WindowProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 9999).map(new WaterSensorFunction());KeyedStreamWaterSensor, String sensorKS sensorDS.keyBy(sensor - sensor.getId());// 1. 窗口分配器WindowedStreamWaterSensor, String, TimeWindow sensorWS sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));SingleOutputStreamOperatorString process sensorWS.process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {Overridepublic void process(String s, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long count elements.spliterator().estimateSize();long windowStartTs context.window().getStart();long windowEndTs context.window().getEnd();String windowStart DateFormatUtils.format(windowStartTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(windowEndTs, yyyy-MM-dd HH:mm:ss.SSS);out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据 elements.toString());}});process.print();env.execute();} }测试截图 1.5.3 增量聚合和全窗口函数的结合使用 在实际应用中我们往往希望兼具这两者的优点把它们结合在一起使用。Flink的Window API就给我们实现了这样的用法。 我们之前在调用WindowedStream的.reduce()和.aggregate()方法时只是简单地直接传入了一个ReduceFunction或AggregateFunction进行增量聚合。除此之外其实还可以传入第二个参数一个全窗口函数可以是WindowFunction或者ProcessWindowFunction。 // ReduceFunction与WindowFunction结合 public R SingleOutputStreamOperatorR reduce(ReduceFunctionT reduceFunctionWindowFunctionTRKW function) // ReduceFunction与ProcessWindowFunction结合 public R SingleOutputStreamOperatorR reduce(ReduceFunctionT reduceFunctionProcessWindowFunctionTRKW function)// AggregateFunction与WindowFunction结合 public ACCVR SingleOutputStreamOperatorR aggregate(AggregateFunctionTACCV aggFunctionWindowFunctionVRKW windowFunction)// AggregateFunction与ProcessWindowFunction结合 public ACCVR SingleOutputStreamOperatorR aggregate(AggregateFunctionTACCV aggFunction,ProcessWindowFunctionVRKW windowFunction)这样调用的处理机制是基于第一个参数增量聚合函数来处理窗口数据每来一个数据就做一次聚合等到窗口需要触发计算的时候就调用第二个参数全局窗口函数的处理逻辑输出结果。需要注意的是这里的全窗口函数就不再缓存所有数据了而是直接将增量聚合函数的结果拿来当做了Iterable类型的输入。 public class WindowAggregateAndProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 9999).map(new WaterSensorFunction());KeyedStreamWaterSensor, String sensorKS sensorDS.keyBy(sensor - sensor.getId());// 1. 窗口分配器WindowedStreamWaterSensor, String, TimeWindow sensorWS sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 2. 窗口函数/*** 增量聚合 Aggregate 全窗口 process* 1、增量聚合函数处理数据 来一条计算一条* 2、窗口触发时 增量聚合的结果只有一条 传递给 全窗口函数* 3、经过全窗口函数的处理包装后输出** 结合两者的优点* 1、增量聚合 来一条计算一条存储中间的计算结果占用的空间少* 2、全窗口函数 可以通过 上下文 实现灵活的功能*/// sensorWS.reduce() //也可以传两个SingleOutputStreamOperatorString result sensorWS.aggregate(new MyAgg(),new MyProcess());result.print();env.execute();}public static class MyAgg implements AggregateFunctionWaterSensor, Integer, String {Overridepublic Integer createAccumulator() {System.out.println(创建累加器);return 0;}Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println(调用add方法,valuevalue);return accumulator value.getVc();}Overridepublic String getResult(Integer accumulator) {System.out.println(调用getResult方法);return accumulator.toString();}Overridepublic Integer merge(Integer a, Integer b) {System.out.println(调用merge方法);return null;}}// 全窗口函数的输入类型 增量聚合函数的输出类型public static class MyProcess extends ProcessWindowFunctionString,String,String,TimeWindow {Overridepublic void process(String s, Context context, IterableString elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS);long count elements.spliterator().estimateSize();out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据 elements.toString());}} }测试截图 1.6 其它API 对于一个窗口算子而言窗口分配器和窗口函数是必不可少的。除此之外Flink还提供了其他一些可选的API让我们可以更加灵活地控制窗口行为。 触发器和移除器在日常使用中很少会用到这里仅仅简单介绍其语法格式。 1.6.1 触发器Trigger 触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”本质上就是执行窗口函数所以可以认为是计算得到的结果并输出的过程。 基于WindowedStream调用.triggrt()方法就可以传入一个自定义的窗口触发器Trigger·。 stream.keyBy(...).window(...).trigger(new MyTrigger())1.6.2 移除器Evictor 移除器主要用来定义移除某些数据的逻辑。基于WindowedStream调用.evictor()方法就可以传人一个自定义的移除器Evictor。Evictor是一个接口不同的窗口类型都有各自预实现的移除器。 stream.keyBy(...).window(...).evictor(new MyEvictor())您的支持是我创作的无限动力 希望我能为您的未来尽绵薄之力 如有错误谢谢指正若有收获谢谢赞美
http://www.pierceye.com/news/669843/

相关文章:

  • 克隆网站后台做系统用哪个网站好
  • html5 手机网站页面实例wordpress 路由404
  • 百度地图嵌入公司网站wordpress如何去掉分类里面的大字
  • 山东住房与城乡建设网站够完美网站建设
  • 班级网站建设首页报告如何查询一个网站是否备案
  • 艺术设计类网站石家庄公司的网站设计
  • 舞钢网站建设企业做网站需要什么软件
  • 网站开发上市公司专业的网站建设价格低
  • 备案网站有哪些资料公司名字大全四个字
  • 网站推广预期达到的目标建湖人才网手机版
  • 营销网站设计公司排名wordpress图片缓冲
  • 山西建设官方网站第三方网站流量统计
  • 企业网站用wordpress龙岗网站建设网站排名优化
  • 成都建设网站哪家好事件营销的特点
  • 如何利用模板做网站视频wordpress手机版边侧导航
  • 网站制作在哪里找wordpress 设置登陆界面
  • 济南seo网站建设上海seo网站优化_搜索引擎排名_优化型企业网站建设_锦鱼网络
  • 深圳网站备影楼网站建设
  • asp网站开门桂林市区
  • dw个人网站主页怎么做网站前端用什么语言
  • 网站建设是平面设计吗网站如何做中英文双语言
  • 网站关键词先后论坛网站在线生成
  • 无为建设局网站安装wordpress xampp
  • 广药网站建设试卷wordpress人力资源模板下载
  • 电商网站规划的开发背景明年做那个网站能致富
  • 网站建设及托管合同wordpress页面批量生成二维码
  • 益阳市住房和建设局 网站哪些网站可做矿机期货
  • 网站开发哪里有html5网站赏析
  • 襄阳网站建设八零后做的网站怎么上传到网上运行
  • 学网站开发培训学校专业集团门户网站建设费用