查企业数据要去什么网站,免费商城小程序模板,wordpress安装说明,网站seo怎么填写本篇文章将带大家运行 Flink 最简单的程序 WordCount。先实践后理论#xff0c;对其基本输入输出、编程代码有初步了解#xff0c;后续篇章再对 Flink 的各种概念和架构进行介绍。 下面将从创建项目开始#xff0c;介绍如何创建出一个 Flink 项目#xff1b;然后从 DataStr…本篇文章将带大家运行 Flink 最简单的程序 WordCount。先实践后理论对其基本输入输出、编程代码有初步了解后续篇章再对 Flink 的各种概念和架构进行介绍。 下面将从创建项目开始介绍如何创建出一个 Flink 项目然后从 DataStream 流处理和 FlinkSQL 执行两种方式来带大家学习 WordCount 程序的开发。 Flink 各版本之间变化较多之前版本的函数在后续版本可能不再支持。跟随学习时请尽量选择和笔者同版本的 Flink。本文使用的 Flink 版本是 1.13.2。
一、创建项目
在很多其他教程中会看到如下来创建 Flink 程序的方式。虽然简单方便但对初学者来说不知道初始化项目的时候做了什么如果报错了也不知道该如何排查。 mvn archetype:generate -DarchetypeGroupIdorg.apache.flink -DarchetypeArtifactIdflink-quickstart-java -DarchetypeVersion1.13.2 通过指定 Maven 工程的三要素即 GroupId、ArtifactId、Version 来创建一个新的工程。同时 Flink 给我提供了更为方便的创建 Flink 工程的方法 curl https://flink.apache.org/q/quickstart.sh | bash -s 1.13.2 因此我们手动来创建一个 Maven 项目看看到底如何创建出一个 Flink 项目。 1、通过 IDEA 创建一个 Maven 项目
2、pom.xml 添加 这里我们选择的是 Flink 1.13.2 版本Flink 1.14 之后部分类和函数有变化可自行探索。 propertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetflink.version1.13.2/flink.version !-- 1.14 之后部分类和函数有变化可自行探索 --target.java.version1.8/target.java.versionscala.binary.version2.12/scala.binary.versionmaven.compiler.source${target.java.version}/maven.compiler.sourcemaven.compiler.target${target.java.version}/maven.compiler.target/propertiesdependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java_${scala.binary.version}/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients_${scala.binary.version}/artifactIdversion${flink.version}/versionscopeprovided/scope/dependency/dependencies二、DataStream WordCount
一编写程序
基础项目环境已经搞好了接下来我们模仿一个流式环境监听本地的 Socket 端口使用 Flink 统计流入的不同单词个数。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class SocketTextStreamWordCount {public static void main(String[] args) throws Exception {//参数检查if (args.length ! 2) {// System.err.println(USAGE:\nSocketTextStreamWordCount hostname port);// return;args new String[]{127.0.0.1, 9000};}String hostname args[0];Integer port Integer.parseInt(args[1]);// 创建 streaming execution environmentfinal StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 获取数据DataStreamSourceString stream env.socketTextStream(hostname, port);// 计数SingleOutputStreamOperatorTuple2String, Integer sum stream.flatMap(new LineSplitter()).keyBy(0).sum(1);sum.print();env.execute(Java WordCount from SocketTextStream Example);}public static final class LineSplitter implements FlatMapFunctionString, Tuple2String, Integer {Overridepublic void flatMap(String s, CollectorTuple2String, Integer collector) {String[] tokens s.toLowerCase().split(\\W);for (String token: tokens) {if (token.length() 0) {collector.collect(new Tuple2String, Integer(token, 1));}}}}
}
二测试
接下来我们进行程序测试。 我们在本地使用 netcat 命令启动一个端口
nc -l 9000然后启动程序能看到控制台一些输出
接下来在 nc 中输入
$ nc -l 9000
hello world
flink flink flink回到我们的程序能看到统计的输出
3 (hello,1)
6 (world,1)
8 (flink,1)
8 (flink,2)
8 (flink,3)三如果有报错
如果出现执行报错
Exception in thread main java.lang.NoClassDefFoundError: org/apache/flink/api/java/io/TextInputFormatat com.shuofxz.SocketTextStreamWordCount.main(SocketTextStreamWordCount.java:25)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.java.io.TextInputFormatat java.net.URLClassLoader.findClass(URLClassLoader.java:387)at java.lang.ClassLoader.loadClass(ClassLoader.java:419)at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)at java.lang.ClassLoader.loadClass(ClassLoader.java:352)... 1 more在 IDE 中把 「Add dependencies with “Provided” scope to classpath」勾选上
三、Flink Table SQL WordCount
一介绍 FlinkSQL
Flink SQL 是 Flink 实时计算为简化计算模型降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。 上面单词统计的逻辑可以转化为下面的 SQL。 直接来看这个 SQL
select word as word, sum(frequency) as frequency from WordCount group by wordWordCount 是要进行单词统计的表我们会先做一些处理将输入的单词都存放到这个表中表我们定义为两列(word, frequency)初始转化输入每个单词占一行frequency 都是 1然后就可以按照 SQL 的逻辑来进行统计聚合了。
其中WordCount 表数据如下
wordfrequencyhello1world1flink1flink1flink1
那么接下来我们看如何写一个 FlinkSQL 的程序。
二环境和程序
首先添加 FlinkSQL 需要的依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java-bridge_${scala.binary.version}/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner-blink_${scala.binary.version}/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner_${scala.binary.version}/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-scala-bridge_2.11/artifactIdversion${flink.version}/version/dependency程序如下
public class SQLWordCount {public static void main(String[] args) throws Exception {// 创建上下文环境ExecutionEnvironment fbEnv ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment fbTableEnv BatchTableEnvironment.create(fbEnv);// 读取一行模拟数据作为输入String words hello world flink flink flink;String[] split words.split(\\W);ArrayListWC list new ArrayList();for (String word : split) {WC wc new WC(word, 1);list.add(wc);}DataSourceWC input fbEnv.fromCollection(list);// DataSet 转 SQL指定字段名Table table fbTableEnv.fromDataSet(input, word,frequency);table.printSchema();// 注册为一个表fbTableEnv.createTemporaryView(WordCount, table);Table table1 fbTableEnv.sqlQuery(select word as word, sum(frequency) as frequency from WordCount group by word);DataSetWC ds1 fbTableEnv.toDataSet(table1, WC.class);ds1.printToErr();}public static class WC {public String word;public long frequency;public WC() {}public WC(String word, long frequency) {this.word word;this.frequency frequency;}Overridepublic String toString() {return word , frequency;}}
}执行结果输出
(word STRING,frequency BIGINT
)
flink, 3
world, 1
hello, 1四、小结
本篇手把手的带大家搭建起 Flink Maven 项目然后使用 DataStream 和 FlinkSQL 两种方式来学习 WordCount 单词计数这一最简单最经典的 Flink 程序开发。跟着步骤一步步执行下来大家应该对 Flink 程序基本执行流程有个初步的了解为后续的学习打下了基础。