dedecms 旅游网站模板,上海闵行做网站,资金盘网站建设,wordpress页面模板位置大纲 新建工程自定义无界流 使用打包、提交、运行工程代码 在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中#xff0c;我们让外部组件RabbitMQ充当了无界流的数据源#xff0c;使得Flink进行了流式处理。在《Java版Flink使用指南——将消息写入到RabbitMQ… 大纲 新建工程自定义无界流 使用打包、提交、运行工程代码 在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中我们让外部组件RabbitMQ充当了无界流的数据源使得Flink进行了流式处理。在《Java版Flink使用指南——将消息写入到RabbitMQ的队列中》一文中我们使用了Flink自带的数据生成器生成了有限数据从而让Flink以批处理形式运行了该任务。 本文我们将自定义一个无界流生成器以方便后续测试。
新建工程
我们新建一个名字叫UnboundedStreamGenerator的工程。 Archetypeorg.apache.flink:flink-quickstart-java 版本1.19.1
自定义无界流
新建src/main/java/org/example/generator/UnBoundedStreamGenerator.java 然后UnBoundedStreamGenerator实现RichSourceFunction接口
public abstract class RichSourceFunctionOUT extends AbstractRichFunctionimplements SourceFunctionOUT {private static final long serialVersionUID 1L;
}主要实现SourceFunction接口的run和cancel方法。run方法用来获取获取cancel方法用于终止任务。
package org.example.generator;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;public class UnBoundedStreamGenerator extends RichSourceFunctionLong {private volatile boolean isRunning true;Overridepublic void run(SourceContextLong ctx) throws Exception {long count 0L;while (isRunning) {Thread.sleep(1000); // Simulate delayctx.collect(count); // Emit data}}Overridepublic void cancel() {isRunning false;System.out.println(UnBoundedStreamGenerator canceled);}
}在run方法中我们每隔一秒产生一条数据且这个数字自增。
使用
我们使用addSource方法将该无界流生成器添加成数据源。然后将其输出到日志。
/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* License); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an AS IS BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.example;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.example.generator.UnBoundedStreamGenerator;/*** Skeleton for a Flink DataStream Job.** pFor a tutorial how to write a Flink application, check the* tutorials and examples on the a hrefhttps://flink.apache.orgFlink Website/a.** pTo package your application into a JAR file for execution, run* mvn clean package on the command line.** pIf you change the name of the main class (with the public static void main(String[] args))* method, change the respective entry in the POM.xml file (simply search for mainClass).*/
public class DataStreamJob {public static void main(String[] args) throws Exception {// Sets up the execution environment, which is the main entry point// to building Flink applications.final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new UnBoundedStreamGenerator()).name(Custom Stream Source).setParallelism(1) .print(); // For demonstration, print the stream to stdout// Execute program, beginning computation.env.execute(Flink Java API Skeleton);}
}
打包、提交、运行 使用下面命令查看日志输出
tail -f log/*然后我们在后台点击Cancel Job 可以看到输出
工程代码
https://github.com/f304646673/FlinkDemo