当前位置: 首页 > news >正文

沈阳市三好街网站建设公司股票发行ipo和seo是什么意思

沈阳市三好街网站建设公司,股票发行ipo和seo是什么意思,网页制作百度百科,wordpress跟php前言 冬天学习成本太高了#xff0c;每天冻得要死#xff0c;自习室人满为患#xff0c;确实是辛苦。学校基本的硬件条件差的一批#xff08;图书馆贼小贼偏僻、老教室暖气还没有地板热、空教室还得自己一个一个挨着找#xff09;#xff0c;个体无法改变环境只能顺应了每天冻得要死自习室人满为患确实是辛苦。学校基本的硬件条件差的一批图书馆贼小贼偏僻、老教室暖气还没有地板热、空教室还得自己一个一个挨着找个体无法改变环境只能顺应了艹受不了了去校长信箱轰tnd。 今天学习 Flink 处理函数学完这一块就剩状态管理、容错机制和 Flink SQL 了坚持坚持。学完再好好回顾回顾最后就是把剩余的一些框架Kafka、Flume等补齐了。 1、处理函数 之前所介绍的流处理 API无论是基本的转换、聚合还是更为复杂的窗口操作其实都是基于 DataStream 进行转换的所以可以统称为 DataStream API这也是 Flink 编程的核心。而我们知道为了让代码有更强大的表现力和易用性Flink 本身提供了多层 APIDataStream API 只是其中之一如图 在更底层我们可以不定义任何具体的算子比如 mapfilter或者 window而只是提炼出一个统一的“处理”process操作——它是所有转换算子的一个概括性的表达可以自定义处理逻辑我们之前可以从 process 函数中获得 上下文对象 ctx、实现侧输出流等所以这一层接口就被叫作“处理函数”process function。在处理函数中我们直面的就是数据流中最基本的元素数据事件event、状态state以及时间time。这就相当于对流有了完全的控制权。处理函数比较抽象没有具体的操作所以对于一些常见的简单应用比如求和、开窗口会显得有些麻烦不过正是因为它不限定具体做什么所以理论上我们可以做任何事情实现所有需求。就相当于我们 Spark 中的 RDD 编程它是最底层的东西所以一些上层无法实现的它都可以实现。 所以总结一句话就是只要是现有的算子实现不了的直接上 process 即可。 1.1、基本处理函数 1.1.1、处理函数的功能和基本使用 我们之前学习的转换算子一般只是针对某种具体操作来定义的能够拿到的信息比较有限。比如 map 算子我们实现的 MapFunction 中只能获取到当前的数据定义它转换之后的形式而像窗口聚合这样的复杂操作AggregateFunction 中除数据外还可以获取到当前的状态以累加器 Accumulator 形式。另外我们还介绍过富函数类比如 RichMapFunction它提供了获取运行时上下文的方法 getRuntimeContext()可以拿到状态还有并行度、任务名称之类的运行时信息。         但是无论哪种算子如果我们想要访问事件的时间戳或者当前的水位线信息都是完全做不到的。在定义生成规则之后水位线会源源不断地产生像数据一样在任务间流动可我们却不能像数据一样去处理它因为跟时间相关的操作目前我们只会用窗口来处理。而在很多应用需求中要求我们对时间有更精细的控制需要能够获取水位线甚至要“把控时间”、定义什么时候做什么事这就不是基本的时间窗口能够实现的了。         这就需要我们使用——处理函数ProcessFunction了。处理函数提供了一个“定时服务”TimerService我们可以通过它访问流中的事件event、时间戳timestamp、水位线watermark甚至可以注册“定时事件”。而且处理函数继承了 AbstractRichFunction 抽象类所以拥有富函数类的所有特性同样可以访问状态state和其他运行时信息。此外处理函数还可以直接将数据输出到侧输出流side output中。所以处理函数是最为灵活的处理方法可以实现各种自定义的业务逻辑同时也是整个 DataStream API 的底层基础。         处理函数的使用与基本的转换操作类似只需要直接基于 DataStream 调用.process()方法就可以了。方法需要传入一个 ProcessFunction 作为参数用来定义处理逻辑我们之前通过给 process 方法传入一个实现了 ProcessFunction 抽象类的匿名内部类来实现侧输出流、通过给 process 方法传入一个实现了 CoProcessFunction 抽象类的匿名内部类来实现合流 。 stream.process(new MyProcessFunction()); 这里 ProcessFunction 不是接口而是一个抽象类继承了 AbstractRichFunctionMyProcessFunction 是它的一个具体实现。所以所有的处理函数都是富函数RichFunction富函数可以调用的东西这里同样都可以调用。 1.1.2、ProcessFunction 解析 在源码中我们可以看到抽象类 ProcessFunction 继承了 AbstractRichFunction有两个泛型类型参数I 表示 Input也就是输入的数据类型O 表示 Output也就是处理完成之后输出的数据类型。         内部单独定义了两个方法一个是必须要实现的抽象方法.processElement()另一个是非抽象方法.onTimer()。 package org.apache.flink.streaming.api.functions;import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.streaming.api.TimeDomain; import org.apache.flink.streaming.api.TimerService; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag;PublicEvolving public abstract class ProcessFunctionI, O extends AbstractRichFunction {// ...// 核心处理逻辑public abstract void processElement(I value, Context ctx, CollectorO out) throws Exception;// 定时器public void onTimer(long timestamp, OnTimerContext ctx, CollectorO out) throws Exception {}// ... }1. 抽象方法.processElement() 用于“处理元素”定义了处理的核心逻辑。这个方法对于流中的每个元素都会调用一次参数包括三个输入数据值 value上下文 ctx以及“收集器”Collectorout。方法没有返回值处理之后的输出数据是通过收集器 out 来定义的。 value当前流中的输入元素也就是正在处理的数据类型与流中数据类型一致。ctx类型是 ProcessFunction 中定义的内部抽象类 Context表示当前运行的上下文可以获取到当前的时间戳并提供了用于查询时间和注册定时器的“定时服务”(TimerService)以及可以将数据发送到“侧输出流”side output的方法.output()。 Context 抽象类定义如下 public abstract class Context {public abstract Long timestamp();public abstract TimerService timerService();public abstract X void output(OutputTagX outputTag, X value);} out“收集器”类型为 Collector用于返回输出数据。使用方式与 flatMap算子中的收集器完全一样直接调用 out.collect()方法就可以向下游发出一个数据。这个方法可以多次调用也可以不调用。 通过几个参数的分析不难发现ProcessFunction 可以轻松实现 flatMap 这样的基本转换功能当然 map、filter 更不在话下而通过富函数提供的获取上下文方法.getRuntimeContext()也可以自定义状态state进行处理这也就能实现聚合操作的功能了。自定义状态的具体实现我们会在后面学到 “状态管理” 的时候再说。 2. 非抽象方法.onTimer() 用于定义定时触发的操作这是一个非常强大、也非常有趣的功能。这个方法只有在注册好的定时器触发的时候才会调用而定时器是通过“定时服务”TimerService 来注册的。 打个比方注册定时器timer就是设了一个闹钟到了设定时间就会响而.onTimer()中定义的就是闹钟响的时候要做的事。所以它本质上是一个基于时间的“回调”callback方法通过时间的进展来触发在事件时间语义下就是由水位线watermark来触发了。         与.processElement()类似定时方法.onTimer()也有三个参数时间戳timestamp上下文ctx以及收集器out。这里的 timestamp 是指设定好的触发时间事件时间语义下当然就是水位线了。另外这里同样有上下文和收集器所以也可以调用定时服务TimerService以及任意输出处理之后的数据。 既然有.onTimer()方法做定时触发我们用 ProcessFunction 也可以自定义数据按照时间分组、定时触发计算输出结果这其实就实现了窗口window的功能。所以说 ProcessFunction是真正意义上的终极奥义用它可以实现一切功能。我们也可以看到处理函数都是基于事件触发的。水位线就如同插入流中的一条数据一样只不过处理真正的数据事件调用是.processElement()方法而处理水位线事件调用的是.onTimer()。 这里需要注意的是上面的.onTimer()方法只是定时器触发时的操作而定时器timer真正的设置需要用到上下文 ctx 中的定时服务。在 Flink 中只有“按键分区流”KeyedStream才支持设置定时器的操作所以之前的代码中我们并没有使用定时器。所以基于不同类型的流可以使用不同的处理函数它们之间还是有一些微小的区别的。接下来我们就介绍一下处理函数的分类。 1.1.3、处理函数的分类 1. 处理函数的功能和使用 Flink 中的处理函数其实是一个大家族ProcessFunction 只是其中一员。我们知道DataStream 在调用一些转换方法之后有可能生成新的流类型例如调用 .keyBy() 之后得到 KeyedStream进而再调用.window()之后得到 WindowedStream。对于不同类型的流其实都可以直接调用.process()方法进行自定义处理这时传入的参数就都叫作处理函数。当然它们尽管本质相同都是可以访问状态和时间信息的底层 API可彼此之间也会有所差异。Flink 提供了 8 个不同的处理函数 ProcessFunction。最基本的处理函数基于 DataStream 直接调用.process()时作为参数传入。KeyedProcessFunction。对流按键分区后的处理函数基于 KeyedStream 调用.process()时作为参数传入。要想使用定时器比如基于 KeyedStream。ProcessWindowFunction。开窗之后的处理函数也是全窗口函数的代表。基于 WindowedStream 调用.process() 时作为参数传入。ProcessAllWindowFunction。同样是开窗之后的处理函数没有 keyby 的话传这个基于 AllWindowedStream 调用.process()时作为参数传入。CoProcessFunction。合并connect两条流之后的处理函数基于 ConnectedStreams 调用.process()时作为参数传入。关于流的连接合并操作我们会在后续章节详细介绍。ProcessJoinFunction。间隔连接interval join两条流之后的处理函数基于 IntervalJoined 调用.process()时作为参数传入。BroadcastProcessFunction。广播连接流处理函数基于 BroadcastConnectedStream 调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream是一个未 keyBy 的普通 DataStream 与一个广播流BroadcastStream做连接conncet之后的产物。KeyedBroadcastProcessFunction。按键分区的广播连接流处理函数同样是基于 BroadcastConnectedStream 调用.process()时作为参数传入。与 BroadcastProcessFunction 不同的是这时的广播连接流是一个 KeyedStream与广播流BroadcastStream做连接之后的产物。 接下来我们就对经常用到的 KeyedProcessFunction 和 ProcessWindowFunction 的具体用法展开详细说明。 2.1、按键分区处理函数KeyedProcessFunction 在 Flink 程序中为了实现数据的聚合统计或者开窗计算之类的功能我们一般都要先用 keyBy 算子对数据流进行“按键分区”得到一个 KeyedStream。也就是指定一个键key按照它的哈希值hash code将数据分成不同的“组”然后分配到不同的并行子任务上执行计算这相当于做了一个逻辑分流的操作从而可以充分利用并行计算的优势实时处理海量数据。         另外我们在上节中也提到只有在 KeyedStream 中才支持使用 TimerService 设置定时器的操作。所以一般情况下我们都是先做了 keyBy 分区之后再去定义处理操作代码中更加常见的处理函数是 KeyedProcessFunction最基本的 ProcessFunction 反而使用率没那么高。接下来我们就先从定时服务TimerService入手详细讲解 KeyedProcessFunction 的用法 2.1.1、定时器Timer和定时服务TimerService KeyedProcessFunction 的一个特色就是可以灵活地使用定时器。定时器timers是处理函数中进行时间相关操作的主要机制。在.onTimer()方法中可以实现定时处理的逻辑而它能触发的前提就是之前曾经注册过定时器、并且现在已经到了触发时间。注册定时器的功能是通过上下文中提供的“定时服务”TimerService来实现的。         定时服务与当前运行的环境有关。前面已经介绍过ProcessFunction 的上下文Context中提供了.timerService()方法可以直接返回一个 TimerService 对象。TimerService 是 Flink 关于时间和定时器的基础服务接口包含以下六个方法 public abstract TimerService timerService(); TimerService 是 Flink 关于时间和定时器的基础服务接口包含以下六个方法 // 获取当前的处理时间 long currentProcessingTime(); // 获取当前的水位线事件时间 long currentWatermark(); // 注册处理时间定时器当处理时间超过 time 时触发 void registerProcessingTimeTimer(long time); // 注册事件时间定时器当水位线超过 time 时触发 void registerEventTimeTimer(long time); // 删除触发时间为 time 的处理时间定时器 void deleteProcessingTimeTimer(long time); // 删除触发时间为 time 的处理时间定时器 void deleteEventTimeTimer(long time);六个方法可以分成两大类基于处理时间和基于事件时间。而对应的操作主要有三个获取当前时间注册定时器以及删除定时器。 需要注意尽管处理函数中都可以直接访问TimerService不过只有基于 KeyedStream 的处理函数才能去调用注册和删除定时器的方法未作按键分区的 DataStream 不支持定时器操作只能获取当前时间。 对于处理时间和事件时间这两种类型的定时器TimerService 内部会用一个优先队列将它们的时间戳timestamp保存起来排队等待执行。可以认为定时器其实是 KeyedStream上处理算子的一个状态它以时间戳作为区分。所以 TimerService 会以键key和时间戳为标准对定时器进行去重也就是说对于每个 key 和时间戳最多只有一个定时器如果注册了多次onTimer()方法也将只被调用一次。这样一来我们在代码中就方便了很多可以肆无忌惮地对一个 key 注册定时器而不用担心重复定义——因为一个时间戳上的定时器只会触发一次。 基于 KeyedStream 注册定时器时会传入一个定时器触发的时间戳这个时间戳的定时器对于每个 key 都是有效的。这样我们的代码并不需要做额外的处理底层就可以直接对不同key 进行独立的处理操作了。         利用这个特性有时我们可以故意降低时间戳的精度来减少定时器的数量从而提高处理性能。比如我们可以在设置定时器时只保留整秒数那么定时器的触发频率就是最多 1 秒一次。 1. 事件时间定时器 我们通过 Socket 接收一个无序的数据流WaterSensor类型并指定允许迟到 3s然后进行一个 keyBy 之后得到 KeyedStream我们给它定义一个定时器当事件时间进展到 5s 的时候触发一次 public class KeyedProcessTimerDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(localhost, 9999).map(new WaterSensorFunction())// todo 指定 watermark 策略我们直接使用实现好的.assignTimestampsAndWatermarks(WatermarkStrategy// 指定watermark的生成: 泛型方法需要指定数据类型乱序的watermark 需要设置等待时间.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)) // 等待3s// 指定如何从数据中提取事件时间.withTimestampAssigner((WaterSensor sensor, long recordTimestamp)- { // System.out.println(数据 sensor ,recordTs recordTimestamp);return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms}));KeyedStreamWaterSensor, String sensorKs sensorDS.keyBy(WaterSensor::getId);// todo Process:keyedSingleOutputStreamOperatorString process sensorKs.process(/*** KeyedProcessFunctionK, T, R* K: key 的类型* T: data 的类型* R: return 的类型*/new KeyedProcessFunctionString, WaterSensor, String() {/*** 来一条数据调用一次这个方法* param value 数据* param ctx 上下文* param out 采集器* throws Exception*/Overridepublic void processElement(WaterSensor value, Context ctx, CollectorString out) throws Exception {// 从数据中提取出来的时间如果没有则返回 nullLong currentEventTime ctx.timestamp();// 定时器TimerService timerService ctx.timerService();// 注册定时器 - 事件时间timerService.registerEventTimeTimer(5000L); // 事件时间进展到 5s 时触发闹钟(定时器)System.out.println(当前的 keyctx.getCurrentKey()当前时间为 currentEventTime ,注册了一个5s的定时器);// 注册定时器 - 处理时间 // timerService.registerProcessingTimeTimer();// 删除定时器 - 事件时间 // timerService.deleteEventTimeTimer();// 删除定时器 - 处理时间 // timerService.deleteProcessingTimeTimer();// 获取当前的处理时间 - 系统时间long currentPs timerService.currentProcessingTime();// 获取当前水位线long watermark timerService.currentWatermark();}/*** 定义定时器(闹钟)触发时的响应逻辑,对于同一个key,onTimer只会被触发一次!!* param timestamp 当前时间进展* param ctx 上下文* param out 采集器* throws Exception*/Overridepublic void onTimer(long timestamp, OnTimerContext ctx, CollectorString out) throws Exception {super.onTimer(timestamp, ctx, out);System.out.println(当前的 key ctx.getCurrentKey()现在时间为 timestamp 定时器触发);}});process.print();env.execute();} }输入数据 s1,1,1 s1,3,3 s1,5,5 s1,8,8 s1,9,9输出数据 当前keys1,当前时间为 1000,注册了一个5s的定时器 当前keys1,当前时间为 3000,注册了一个5s的定时器 当前keys1,当前时间为 5000,注册了一个5s的定时器 当前keys1,当前时间为 8000,注册了一个5s的定时器 当前keys1,当前时间为 9000,注册了一个5s的定时器 当前的 key s1现在时间为 5000定时器触发 首先可以看到每来一条数据都会注册一个定时器我们还可以发现当数据进展到 8s 的时候按说我们设置的最多等待 3s 而这里 8-35 按说应该达到触发条件了可是却没有触发。其实对于触发器来说它这个时候的时间其实是 (8s-3s-1ms4999ms)其实离触发还差 1ms所以当我们数据的事件时间为 9s 的时候就会发现定时器终于被触发了。 注意事件事件语义下对于同一个 key 定时器只触发一次对于相同 key 的数据Flink 会根据 key 对定时器进行去重。 2. 处理时间定时器 和上面一样既然是处理时间的话我们数据中带的事件时间就没用了这里我们给每个来的数据定义一个五秒后的定时器 public class KeyedProcessTimerDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(localhost, 9999).map(new WaterSensorFunction())// todo 指定 watermark 策略我们直接使用实现好的.assignTimestampsAndWatermarks(WatermarkStrategy// 指定watermark的生成: 泛型方法需要指定数据类型乱序的watermark 需要设置等待时间.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)) // 等待3s// 指定如何从数据中提取事件时间.withTimestampAssigner((WaterSensor sensor, long recordTimestamp)- { // System.out.println(数据 sensor ,recordTs recordTimestamp);return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms}));KeyedStreamWaterSensor, String sensorKs sensorDS.keyBy(WaterSensor::getId);// todo Process:keyedSingleOutputStreamOperatorString process sensorKs.process(/*** KeyedProcessFunctionK, T, R* K: key 的类型* T: data 的类型* R: return 的类型*/new KeyedProcessFunctionString, WaterSensor, String() {/*** 来一条数据调用一次这个方法* param value 数据* param ctx 上下文* param out 采集器* throws Exception*/Overridepublic void processElement(WaterSensor value, Context ctx, CollectorString out) throws Exception {// todo 1.获取定时器并注册TimerService timerService ctx.timerService();// todo 1.1 注册事件时间定时器// 事件时间 也就是当前数据中的 watermark如果没有则返回 null // Long currentEventTime ctx.timestamp();// 注册定时器 - 事件时间 // timerService.registerEventTimeTimer(5000L); // 事件时间进展到 5s 时触发闹钟(定时器) // System.out.println(当前keyctx.getCurrentKey(),当前时间为 currentEventTime ,注册了一个5s的定时器);// todo 1.2 注册处理时间定时器// 处理时间 也就是当前的处理时间 - 系统时间long currentPs timerService.currentProcessingTime();// 注册定时器 - 处理时间timerService.registerProcessingTimeTimer(currentPs5000L); // 当处理时间为 当前时间5s 触发闹钟System.out.println(当前keyctx.getCurrentKey(),当前时间为 currentPs ,注册了一个5s后的定时器);// 删除定时器 - 事件时间 // timerService.deleteEventTimeTimer();// 删除定时器 - 处理时间 // timerService.deleteProcessingTimeTimer();// 获取当前水位线long watermark timerService.currentWatermark();}// todo 2.定义触发定时器逻辑/*** 定义定时器(闹钟)触发时的响应逻辑,对于同一个key,onTimer只会被触发一次!!* param timestamp 当前时间进展* param ctx 上下文* param out 采集器* throws Exception*/Overridepublic void onTimer(long timestamp, OnTimerContext ctx, CollectorString out) throws Exception {super.onTimer(timestamp, ctx, out);System.out.println(当前的 key ctx.getCurrentKey()现在时间为 timestamp 定时器触发);}});process.print();env.execute();} }测试输入 s1,1,1 s1,2,2 s1,3,3 输出 当前keys1,当前时间为 1703043149860,注册了一个5s后的定时器 当前keys1,当前时间为 1703043151317,注册了一个5s后的定时器 当前keys1,当前时间为 1703043152807,注册了一个5s后的定时器 当前的 key s1现在时间为 1703043154860定时器触发 当前的 key s1现在时间为 1703043156317定时器触发 当前的 key s1现在时间为 1703043157807定时器触发 可以看到处理时间语义下对于同一个 key 它有可能会触发多次。 3. watermark 的滞后性 Overridepublic void processElement(WaterSensor value, Context ctx, CollectorString out) throws Exception {// todo 1.获取定时器并注册TimerService timerService ctx.timerService();// 获取当前水位线long watermark timerService.currentWatermark();System.out.println(当前数据value,当前watermarkwatermark); } 输入  s1,1,1 s1,5,5 s1,9,9 输出 当前数据WaterSensor{ids1, ts1, vc1},当前watermark-9223372036854775808 当前数据WaterSensor{ids1, ts5, vc5},当前watermark-2001 当前数据WaterSensor{ids1, ts9, vc9},当前watermark1999 可以看到当我们的数据 {s1,1,1} 到达后watermark 并不是 1-3-1ms -2001 而是 watermark 的初始值 Inerger.MIN_VALUE这是因为我们的水位线总是插入到数据后面的而 processElement 方法一次只能处理一个数据所以当数据  {s1,1,1}  处理完毕之后 watermark-2001 才会进入 processElement 方法并更新 watermark。 定时器 - 总结 只有 KeyedStream 才有定时器事件时间定时器通过数据的 watermark 来触发 注意watermark 当前最大事件时间 - 最大等待时间 -1ms在 processElement 中获取到的 watermark 是上一次的 watermark 因为 watermark 是在数据后面进入 processElement 方法的。 2.3、窗口处理函数 除了 KeyedProcessFunction 另外一大类常用的处 理 函 数 就是基于窗口的ProcessWindowFunction 和 ProcessAllWindowFunction 了。 2.3.1、窗口处理函数的使用 进行窗口计算我们可以直接调用现成的简单聚合方法sum/max/min,也可以通过用.reduce()或.aggregate()来自定义一般的增量聚合函数ReduceFunction/AggregateFucntion而对于更加复杂、需要窗口信息和额外状态的一些场景我们还可以直接使用全窗口函数apply/process、把数据全部收集保存在窗口内等到触发窗口计算时再统一处理。窗口处理函数就是一种典型的全窗口函数。 窗口处理函数 ProcessWindowFunction 的使用与其他窗口函数类似 也是基于WindowedStream 直接调用方法就可以只不过这时调用的是 .process()。就像我们之前窗口那一章节写的全窗口函数 public class WindowProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(localhost, 9999).map(new WaterSensorFunction());KeyedStreamWaterSensor, String sensorKs sensorDS.keyBy(WaterSensor::getId);// todo 1. 指定窗口分配器基于处理时间的滚动窗口WindowedStreamWaterSensor, String, TimeWindow tumblingWindow sensorKs.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));// todo 2. 指定窗口函数全窗口函数SingleOutputStreamOperatorString process tumblingWindow.process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {/**** param key 分组的 key* param context 上下文* param elements 全窗口存的数据* param out 采集器* throws Exception*/Overridepublic void process(String key, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();SimpleDateFormat sdf new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);String start sdf.format(new Date(startTs));String end sdf.format(new Date(endTs));long size elements.spliterator().estimateSize();out.collect(key key 的窗口[ start , end ]包含 size 条数据 elements.toString());}});process.print();env.execute();} } 2.3.2、ProcessWindowFnction 解析 ProcessWindowFunction 既是处理函数又是全窗口函数。从名字上也可以推测出它的本质似乎更倾向于“窗口函数”一些。事实上它的用法也确实跟其他处理函数有很大不同。我们可以从源码中的定义看到这一点 public abstract class ProcessWindowFunctionIN, OUT, KEY, W extends Window extends AbstractRichFunction {public abstract void process( KEY key, Context context, IterableIN elements, CollectorOUT out) throws Exception;public void clear(Context context) throws Exception {}public abstract class Context implements java.io.Serializable {...} }ProcessWindowFunction 依然是一个继承了 AbstractRichFunction 的抽象类它有四个类型参数 INinput数据流中窗口任务的输入数据类型。OUToutput窗口任务进行计算之后的输出数据类型。KEY数据中键 key 的类型。W窗口的类型是 Window 的子类型。一般情况下我们定义时间窗口W就是 TimeWindow。而内部定义的方法跟我们之前熟悉的处理函数就有所区别了。 因为全窗口函数不是逐个处理元素的所以处理数据的方法在这里并不是.processElement()而是改了.process()。方法包含四个参数。 key窗口做统计计算基于的键也就是之前 keyBy 用来分区的字段。context当前窗口进行计算的上下文它的类型就是 ProcessWindowFunction内部定义的抽象类 Context。elements窗口收集到用来计算的所有数据这是一个可迭代的集合类型。out用来发送数据输出计算结果的收集器类型为 Collector。 可以明显看出这里的参数不再是一个输入数据而是窗口中所有数据的集合一个迭代器对象。而上下文context 所包含的内容也跟其他处理函数有所差别 public abstract class Context implements java.io.Serializable {public abstract W window();public abstract long currentProcessingTime();public abstract long currentWatermark();public abstract KeyedStateStore windowState();public abstract KeyedStateStore globalState();public abstract X void output(OutputTagX outputTag, X value); } 除了可以通过.output()方法定义侧输出流不变外其他部分都有所变化 这里不再持有TimerService 对象只能通过 currentProcessingTime()和 currentWatermark()来获取当前时间所以失去了设置定时器的功能另外由于当前不是只处理一个数据所以也不再提供.timestamp()方法。 与此同时也增加了一些获取其他信息的方法 可以通过.window()直接获取到当前的窗口对象也可以通过.windowState()和.globalState()获取到当前自定义的窗口状态和全局状态。 注意这里的“窗口状态”是自定义的不包括窗口本身已经有的状态针对当前 key、当前窗口有效而“全局状态”同样是自定义的状态针对当前 key 的所有窗口有效。所以我们会发现ProcessWindowFunction 中除了.process()方法外并没有.onTimer()方法而是多出了一个.clear()方法。从名字就可以看出这主要是方便我们进行窗口的清理工作。如果我们自定义了窗口状态那么必须在.clear()方法中进行显式地清除避免内存溢出。 这里有一个问题没有了定时器那窗口处理函数就失去了一个最给力的武器如果我们希望有一些定时操作又该怎么做呢其实仔细思考会发现对于窗口而言它本身的定义就包含了一个触发计算的时间点其实一般情况下是没有必要再去做定时操作的。如果非要这么干Flink也提供了另外的途径——使用窗口触发器Trigger。在触发器中也有一个TriggerContext它可以起到类似 TimerService 的作用获取当前时间、注册和删除定时器另外还可以获取当前的状态。这样设计无疑会让处理流程更加清晰——定时操作也是一种“触发”所以我们就让所有的触发操作归触发器管而所有处理数据的操作则归窗口函数管。 至于另一种窗口处理函数 ProcessAllWindowFunction它的用法非常类似。区别在于它基于的是 AllWindowedStream相当于对没有 keyBy 的数据流直接开窗并调用.process()方法但如果没有进行 keyBy那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务task上执行就相当于并行度变成了 1。所以在实际应用中一般不推荐使用这种方式。在代码中直接基于 DataStream 调用.windowAll()定义窗口 stream.windowAll( TumblingEventTimeWindows.of(Time.seconds(10)) ).process(new MyProcessAllWindowFunction()); 2.4、应用案例 - Top N 案例需求实时统计一段时间内出现次数最多的水位。例如统计最近10s内出现最多的两个水位并且每5s更新一次。我们知道这可以通过一个滑动窗口来实现于是就需要开滑动窗口收集传感器的数据按照不同的水位进行统计而后汇总排序并最终输出前两名。其实这就是著名的“Top N”问题。 2.4.1、使用 ProcessAllWindowFunction 我们的数据类型 WaterSenor 的三个属性id传感器idts事件时间vc水位高度 /*** 案例 不同水位出现的次数的 topN*/ public class TopNDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(localhost, 9999).map(new WaterSensorFunction())// todo 指定 watermark 策略我们直接使用实现好的.assignTimestampsAndWatermarks(WatermarkStrategy// 指定watermark的生成: 泛型方法需要指定数据类型乱序的watermark 需要设置等待时间.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)) // 等待3s// 指定如何从数据中提取事件时间.withTimestampAssigner((WaterSensor sensor, long recordTimestamp)- { // System.out.println(数据 sensor ,recordTs recordTimestamp);return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms}));// todo 思路1 不做 keyBy 直接使用一个 hashMapvc,count 来累加 统一vc的count值// 窗口大小:10s,步长:5ssensorDS.windowAll(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5))).process(new ProcessAllWindowFunctionWaterSensor, String, TimeWindow() {Overridepublic void process(Context context, IterableWaterSensor elements, CollectorString out) throws Exception {// 定义一个hashMapMapInteger,Integer map new HashMap();for (WaterSensor waterSensor : elements) {int vc waterSensor.vc;map.put(vc,map.getOrDefault(vc,0)1);}// 排序输出top2,利用 list 对map根据value进行排序ListTuple2Integer, Integer list new ArrayList();for (Integer vc : map.keySet()) {list.add(Tuple2.of(vc,map.get(vc)));}list.sort(new ComparatorTuple2Integer, Integer() {Overridepublic int compare(Tuple2Integer, Integer o1, Tuple2Integer, Integer o2) {return o2.f1-o1.f1;}});StringBuilder builder new StringBuilder();builder.append(\n);// 防止越界考虑list的size可能不够两个for (int i 0; i Math.min(list.size(),2); i) {Tuple2Integer, Integer tuple list.get(i);builder.append(top).append(i1).append(: );builder.append(tuple.f0).append( - );builder.append(tuple.f1).append(\n);}builder.append(窗口结束时间).append(DateFormatUtils.format(context.window().getEnd(), yyyy-MM-dd HH:mm:ss.SSS));builder.append(\n);out.collect(builder.toString());}}).print();env.execute();}}注意滑动窗口一定有第一个步长时被触发到达第 1 个步长触发第 1 个窗口到达第 2 个步长触发第 2 个窗口。 上面我们定义了一个大小为 10 滑动步长为 5 的窗口并且等待时间为 3 注意 等待时间内来的数据如果不在窗口范围内并不会计入到窗口中窗口区间是左闭右开的 所以这里的窗口 第一个窗口[-5,5注意第一个窗口并不是[0,10)第一个窗口[0,10... 测试输入数据 s1,1,1 s2,2,1 s3,3,2 s4,4,2 s5,5,2 // 窗口范围是左闭右开的这里的 2 并不计数这里达到第一个滑动步长所以要等待3s s6,6,1 s7,7,3 s8,8,3 // 此时才触发第一次计算 s10,10,2 s12,12,1 s13,13,4 // 达到第二个滑动步长再次触发计算 输出结果 // [-5,5)的结果 top1: 1 - 2 top2: 2 - 2 窗口结束时间1970-01-01 08:00:05.000 // [0,10)的结果 top1: 1 - 3 top2: 2 - 3 窗口结束时间1970-01-01 08:00:10.000 2.4.2、使用 KeyedProcessFunction 上面我们没有按键区分直接将所有数据放在一个分区上进行了开窗操作。这相当于将并行度强行设置为 1在实际应用中是要尽量避免的因为如果数据量很大一个并行度的情况下机器受不了而且全窗口函数是在最后窗口要关闭滚动窗口或者移动滑动窗口时才对有窗口内的数据进行计算所以计算压力可想而知所以 Flink 官方也并不推荐使用 AllWindowedStream 进行处理。另外我们在全窗口函数中定义了 HashMap来统计 水位 的出现次数计算过程是要先收集齐所有数据、然后再逐一遍历更新 HashMap这显然不够高效。如果我们可以利用增量聚合函数的特性每来一条数据就更新一次该水位出现的次数那么到窗口触发计算时只需要做排序输出就可以了。 所以优化的思路就是先按照 vc 对数据进行 keyBy 分区然后开窗进行增量聚合。所以我们先用增量聚合函数 AggregateFunction 对每个 vc 的次数进行统计然后结合 ProcessWindowFunction 排序输出最终结果。 总结 我们首先根据数据的 vc 进行 keyBy 分区开窗根据需求开一个滑动窗口窗口大小10s滑动步长5s使用聚合函数aggregate结合全窗口函数ProcessWindowFunction先把每个区的数据相同 vc的次数统计出来得到 count然后使用全窗口函数把返回值封装成 Tuple3 vccountendWindow的格式因为我们要根据不同窗口范围进行统计 TopN上面最终的结果是一个普通的 DataStream我们需要再根据窗口范围上面Tuple3 中的 endWindow字段进行一个 keyBy 分区把每个范围的数据放到一起进行统一排序这里使用 hashMapkeywindowEndvalueTuple3vc,count,windowEnd 进行存储使用 arrayList 进行排序。使用定时器当 processElement 数据到齐后进行触发计算输出。 /*** 案例 不同水位出现的次数的 topN*/ public class KeyedProcessFunctionTopNDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(localhost, 9999).map(new WaterSensorFunction())// todo 指定 watermark 策略我们直接使用实现好的.assignTimestampsAndWatermarks(WatermarkStrategy// 指定watermark的生成: 泛型方法需要指定数据类型乱序的watermark 需要设置等待时间.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)) // 等待3s// 指定如何从数据中提取事件时间.withTimestampAssigner((WaterSensor sensor, long recordTimestamp) - { // System.out.println(数据 sensor ,recordTs recordTimestamp);return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms}));// todo 思路2 使用 keyedProcessFunction 实现KeyedStreamWaterSensor, Integer keyedStream sensorDS.keyBy(sensor - sensor.vc);SingleOutputStreamOperatorTuple3Integer, Integer, Long windowAgg keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))// AggregateFunction 3个泛型参数: 输入类型,累加器类型,输出类型.aggregate(new AggregateFunctionWaterSensor, Integer, Integer() {// 累加器初始值Overridepublic Integer createAccumulator() {return 0;}// 累加过程: 直接1,毕竟我们都是相同keyOverridepublic Integer add(WaterSensor value, Integer accumulator) {return accumulator 1;}// 累加结果直接返回Overridepublic Integer getResult(Integer accumulator) {return accumulator;}Overridepublic Integer merge(Integer a, Integer b) {return null;}}, // ProcessWindowFunction的4个泛型参数: 输入类型、输出类型、键类型、窗口类型// 这里由于我们后面要根据输出结果区分数据是来自哪个窗口的,所以使用了Tuple3vc,count,windowEnd 带上了结束窗口的标签new ProcessWindowFunctionInteger, Tuple3Integer, Integer, Long, Integer, TimeWindow() {Overridepublic void process(Integer key, Context context, IterableInteger elements, CollectorTuple3Integer, Integer, Long out) throws Exception {// 迭代器只有一条数据 所以 iterator.next() 就是它的全部数据了Integer count elements.iterator().next();long windowEnd context.window().getEnd();out.collect(Tuple3.of(key, count, windowEnd));}});/*** windowAgg:SingOutputStreamOperator 的聚合结果* vc1,count100,windowEnd10s* vc2,count70,windowEnd10s* vc3,count80,windowEnd10s* 开窗聚合后就变成了普通的数据流SingOutputStreamOperator继承自 DataStream,所以我们自己给聚合结果打上了窗口结束的标签(windowEnd)*/// 2. 按照窗口结束标签进行 keyBy 保证同一窗口时间范围的数据统一处理,之后再排序windowAgg.keyBy(r - r.f2).process(new TopN(2)).print();env.execute();}public static class TopN extends KeyedProcessFunctionLong,Tuple3Integer,Integer,Long,String{private MapLong,ListTuple3Integer,Integer,Long map;private int threshold;public TopN(int threshold) {this.threshold threshold;map new HashMap();}// Tuple3Integer, Integer, Long value : Tuple3的元素类型: vc,count,windowEndOverridepublic void processElement(Tuple3Integer, Integer, Long value, Context ctx, CollectorString out) throws Exception {// 进入这个方法的都只是一条数据,要排序就需要都到齐才行// 1. 存到 hashMapLong windowEnd value.f2;if (map.containsKey(windowEnd)){ListTuple3Integer, Integer, Long list map.get(windowEnd);list.add(value);}else {ListTuple3Integer, Integer, Long list new ArrayList();list.add(value);map.put(windowEnd,list);}// 注册一个定时器windowEnd1ms 触发// 因为同一个窗口范围应该同时输出,只不过是一条一条调用processElement方法1ms就够执行完了ctx.timerService().registerEventTimeTimer(windowEnd 1);// 这里 out 不用操作}// 定时器触发逻辑Overridepublic void onTimer(long timestamp, OnTimerContext ctx, CollectorString out) throws Exception {super.onTimer(timestamp, ctx, out);// 同一个窗口的计算结果攒齐了需要开启排序和取 top N// 1. 排序Long windowEnd ctx.getCurrentKey();ListTuple3Integer, Integer, Long list map.get(windowEnd);list.sort((o1,o2) - o2.f1-o1.f1);// 2. 取 topNStringBuilder builder new StringBuilder();builder.append(\n);// 防止越界,考虑list的size可能不够两个for (int i 0; i Math.min(list.size(),threshold); i) {Tuple3Integer, Integer, Long tuple list.get(i);builder.append(top).append(i1).append(: );builder.append(tuple.f0).append( - );builder.append(tuple.f1).append(\n);builder.append(窗口结束时间).append(DateFormatUtils.format(tuple.f2, yyyy-MM-dd HH:mm:ss.SSS));builder.append(\n);builder.append(\n);}// list 用完就可以及时销毁了,节省空间list.clear();out.collect(builder.toString());}} }输入数据 s1,1,1 s1,2,1 s1,5,2 s1,8,3 s1,9,1 // 第一个窗口范围 [-5,5),但是等待时间3s所以8s进行输出,但是我们触发器1ms所以9s才输出 s1,10,1 s1,13,2 s1,14,3 // 同理,第二个窗口范围 [0,10),14s才输出输出结果 top1: 1 - 2 窗口结束时间1970-01-01 08:00:05.000top1: 1 - 3 窗口结束时间1970-01-01 08:00:10.000top2: 2 - 1 窗口结束时间1970-01-01 08:00:10.0002.5、侧输出流Side Output 上下文对象 ctx 提供了侧输出流方法 output(OutTag,value) 或者如果我们是 keyedStream.processElement() 的话还可以在 .onTimer() 方法中调用上下文的.output()方法就可以了。我们之前用过好多次了。 /*** 案例 不同水位出现的次数的 topN*/ public class SideOutputDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(localhost, 9999).map(new WaterSensorFunction())// todo 指定 watermark 策略我们直接使用实现好的.assignTimestampsAndWatermarks(WatermarkStrategy// 指定watermark的生成: 泛型方法需要指定数据类型乱序的watermark 需要设置等待时间.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)) // 等待3s// 指定如何从数据中提取事件时间.withTimestampAssigner((WaterSensor sensor, long recordTimestamp) - { // System.out.println(数据 sensor ,recordTs recordTimestamp);return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms}));OutputTagString warnTag new OutputTag(warn, Types.STRING);SingleOutputStreamOperatorWaterSensor process sensorDS.keyBy(sensor - sensor.id).process(new KeyedProcessFunctionString, WaterSensor, WaterSensor() {// KeyedProcessFunction泛型参数类型key类型、输入类型、主流输出类型Overridepublic void processElement(WaterSensor value, Context ctx, CollectorWaterSensor out) throws Exception {// 使用侧输出流告警if (value.vc 10) {ctx.output(warnTag, 当前水位 value.vc 阈值10!!!);}out.collect(value);}});process.print(main);process.getSideOutput(warnTag).print(warn);env.execute();} }输入数据 s1,1,1 s1,2,2 s1,8,8 s1,10,10 s1,12,12 输出结果 main WaterSensor{ids1, ts1, vc1} main WaterSensor{ids1, ts2, vc2} main WaterSensor{ids1, ts8, vc8} main WaterSensor{ids1, ts10, vc10} warn 当前水位12阈值10!!! main WaterSensor{ids1, ts12, vc12} 总结 这一块知识点特别挺多与前面的窗口知识关联紧密都必须熟悉掌握对感兴趣的事并不能算是一种痛苦享受知识越来越丰富的过程吧。
http://www.pierceye.com/news/414755/

相关文章:

  • 网站开发集成环境国内html5网站欣赏
  • iis7.5 没有默认网站北京seo的排名优化
  • 两学一做网站是多少钱营销型网站策划怎么做
  • 渭南做网站的自建房设计图
  • 移动网站建设价格线上推广专员是干嘛的
  • 做化妆刷的外贸网站企业网站托管备案
  • 湖南省建设干部学校 网站金融直播室网站建设
  • 贵州建设厅特殊工种考试网站photoshop平面设计教学视频
  • 怎么推广我的网站代理网站推荐
  • wordpress主题站模板做网站跟做APP哪个容易
  • 杭州网站建设公司推荐网站建设优化服务渠道
  • php是网站开发语言吗做网站前端需要编程基础吗
  • python 网站开发 前端企业信用信息系统官网
  • 公司网站设计有哪些使用技巧呢商城网站建设怎么收费
  • 东莞做网站平台安阳营销型网站建设
  • 如何查看网站开发语言百度排行榜风云榜
  • 泉州 网站建设公司首选广告设计公司名字有寓意有创意
  • 天津个人做网站慈利网站制作
  • 专门做推广的网站吗宿迁房价2023年最新房价
  • 0基础12天精通网站建设网站建设 全网推广
  • 东莞网站营销推广公司移动应用开发案例
  • 妇科医院网站建设怎么做网站建设培训心得体会
  • 网站建设 管理正能量网站入口地址
  • 做网站没有创意Wordpress国际收款
  • 网站推广关键词工具wap网站分享到微信
  • 哪个网站可以给图片做链接做网站的公司在哪
  • 搬瓦工可以长期做网站广告制作开票大类是什么
  • 高级网站开发工信部小企业门户网站建设
  • 网站建站知识秦皇岛汽车网站制作
  • 建站之星极速版app开发需求