手机网站设计只选亿企邦,全屏网站大小,网页即时聊天源码,北京朝阳区房价2022最新价格文章目录 1、Flink简介2、Flink部署2.1 本地模式2.1 Standalone模式部署2.2 Standalone模式下的高可用2.3 Yarn模式Yarn模式的高可用配置#xff1a;yarn模式中三种子模式的区别#xff1a; 3、并行度4、提交命令执行指定任务Application Mode VS yarn per-job 5、注意事项5、… 文章目录 1、Flink简介2、Flink部署2.1 本地模式2.1 Standalone模式部署2.2 Standalone模式下的高可用2.3 Yarn模式Yarn模式的高可用配置yarn模式中三种子模式的区别 3、并行度4、提交命令执行指定任务Application Mode VS yarn per-job 5、注意事项5、注意事项 1、Flink简介
Spark 和 Flink 一开始都都希望能够用同一个技术把流处理和批处理统一起来但他们走了完全不一样的两条路。前者是以批处理的技术为根本并尝试在批处理之上支持流计算后者则认为流计算技术是最基本的在流计算的基础之上支持批处理。通过Flink和Spark的对比来说
SparkFlink流批世界观一切都是由批次组成。离线数据是一个大批次而实时数据是由一个一个无限的小批次组成的。一切都是由流组成。离线数据是有界限的流实时数据是一个没有界限的流。计算模型微批处理模型秒级在批处理的基础上做流处理连续流模型毫秒级在流的基础上做批处理驱动时间驱动型:主动拉取数据即使没有数据到达一定时间也会去计算浪费资源事件驱动型:被动拉取数据如果没数据的时候什么也不干节省资源checkpoint小文件问题一个分区一个小文件重启任务会有很多小任务浪费资源无小文件问题exactly once自己实现exactly once保证窗口灵活的窗口语义Spark有的都有吞吐量大于Flink 2、Flink部署
开发模式idea本地模式(零配置)Standalone模式Yarn模式 Session-ClusterApplication ModePer-Job-Cluster
2.1 本地模式 上传Flink安装包flink-1.13.1-bin-scala_2.12.tgz到节点zyn-node01 解压 tar -zxvf flink-1.13.1-bin-scala_2.12.tgz -C /opt/module
cd /opt/module
cp -r flink-1.13.1 flink-local启动Flink集群 bin/start-cluster.sh
bin/stop-cluster.sh在hadoop102启动netcat #sudo yum install -y nc
nc -lk 9999命令行提交Flink命令 bin/flink run -m zyn-node01:8081 -c com.sunmi.day01.Flink03_Stream_Unbounded_WordCount /home/hadoop/jars/Flink-202307-1.0-SNAPSHOT-jar-with-dependencies.jar查看应用执行情况 http://zyn-node01:80812.1 Standalone模式部署 配置文件flink-conf.yaml jobmanager.rpc.address: zyn-node01workers、 zyn-node02
zyn-node03
zyn-node04
zyn-node05
zyn-node06分发至其他节点 启动集群 bin/start-cluster.sh提交命令执行任务 bin/flink run -m zyn-node01:8081 -c com.sunmi.day01.Flink03_Stream_Unbounded_WordCount /home/hadoop/jars/Flink-202307-1.0-SNAPSHOT-jar-with-dependencies.jar 通过8081端口访问WebUI 一台节点可以同时启动多个TaskManager 启动集群后再次启动bin/start-cluster.sh则一台节点会有两个TaskManager可通过jps查看 2.2 Standalone模式下的高可用
任何时候都有一个主 JobManager和多个备用 JobManagers以便在主节点失败时有备用 JobManagers 来接管集群。这可以避免单点故障一旦备 JobManager 接管集群作业就可以正常运行。主备 JobManager 实例之间没有明显的区别。每个 JobManager都可以充当主备节点。 修改配置文件flink-conf.yaml high-availability: zookeeper
high-availability.storageDir: hdfs://hadoop102:8020/flink/standalone/ha
high-availability.zookeeper.quorum: hadoop102:2181,hadoop103:2181,hadoop104:2181
high-availability.zookeeper.path.root: /flink-standalone
high-availability.cluster-id: /cluster_hpumasters hadoop102:8081
hadoop103:8081分发至其他节点 修改环境变量myenv.sh,并分发source export HADOOP_CLASSPATHhadoop classpath启动flink集群 先查看通过zookeeper客户端查看哪个是master然后kill掉master进行测试 zkCli.sh
get /flink-standalone/cluster_hpu/leader/rest_server_lock2.3 Yarn模式
独立部署Standalone模式由Flink自身提供计算资源无需其他框架提供资源这种方式降低了和其他第三方资源框架的耦合性独立性非常强。但是Flink主要是计算框架而不是资源调度框架所以本身提供的资源调度并不是它的强项所以还是和其他专业的资源调度框架集成更靠谱所以接下来我们来学习在强大的Yarn环境中Flink是如何使用的。
把Flink应用提交给Yarn的ResourceManager, Yarn的ResourceManager会申请容器从Yarn的NodeManager上面. Flink会创建JobManager和TaskManager在这些容器上.Flink会根据运行在JobManager上的job的需要的slot的数量动态的分配TaskManager资源。 复制flink-yarn cp -r flink-1.13.1 flink-yarn仅需配置/etc/profile.d/my.sh中配置并分发 export HADOOP_CLASSPATHhadoop classpath执行命令提交任务 bin/flink run -t yarn-per-job -c com.sunmi.day01.Flink03_Stream_Unbounded_WordCount /home/hadoop/jars/Flink-202307-1.0-SNAPSHOT-jar-with-dependencies.jar通过zyn-node038088查看任务 进入任务 Yarn中启动两个Container其中一个是JobManager一个TaskManager
Yarn模式的高可用配置
Standalone模式中, 同时启动多个Jobmanager, 一个为leader其他为standby的, 当leader挂了, 其他的才会有一个成为leader。
yarn的高可用是同时只启动一个Jobmanager, 当这个Jobmanager挂了之后, yarn会再次启动一个, 其实是利用的yarn的重试次数来实现的高可用。 yarn-site.xml propertynameyarn.resourcemanager.am.max-attempts/namevalue4/valuedescriptionThe maximum number of application master execution attempts./description
/propertyflink-conf.yaml yarn.application-attempts: 3
high-availability: zookeeper
high-availability.storageDir: hdfs://hadoop102:8020/flink/yarn/ha
high-availability.zookeeper.quorum: hadoop102:2181,hadoop103:2181,hadoop104:2181
high-availability.zookeeper.path.root: /flink-yarn启动yarn-session 杀死Jobmanager查看复活情况 注意: yarn-site.xml中是复活次数的上限, flink-conf.xml中的次数应该小于这个值。 测试过程中会发现一直kill不掉jobManager是因为除了重试次数这个机制外还有一个时间的机制Akka超时时间如果在一定的时间(这个时间很短)内jobManager重新拉取了几次还是挂掉的话那就会真正的挂掉。 yarn模式中三种子模式的区别 Session模式适合需要频繁提交的多个小job并且执行时间都不长因为flink会在yarn中启动一个session集群这个集群主要用来申请资源的后续提交的其他作业都会直接提交到这个session集群中不需要频繁创建flink集群这样效率会变高但是作业之间相互不隔离。 Session-Cluster模式需要先启动Flink集群向Yarn申请资源。以后提交任务都向这里提交。这个Flink集群会常驻在yarn集群中除非手动停止。在向Flink集群提交Job的时候, 如果资源被用完了,则新的Job不能正常提交。 缺点: 如果提交的作业中有长时间执行的大作业, 占用了该Flink集群的所有资源, 则后续无法提交新的job. per-job模式:一个Job会对应一个Flink集群每提交一个作业会根据自身的情况都会单独向yarn申请资源直到作业执行完成一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享Dispatcher和ResourceManager按需接受资源申请适合规模大长时间运行的作业。每次提交job都会创建一个新的flink集群任务之间互相独立互不影响方便管理。任务执行完成之后创建的集群也会消失。同时main方法是在本地上运行。 application Mode模式 每提交一个任务application可能会包含多个job一个application对应一个flink集群main方法是在集群中运行。 Application Mode会在Yarn上启动集群, 应用jar包的main函数(用户类的main函数)将会在JobManager上执行. 只要应用程序执行结束, Flink集群会马上被关闭。也可以手动停止集群。 与Per-Job-Cluster的区别就是Application Mode下, 用户的main函数式在集群中执行的并且当一个application中有多个job的话per-job模式则是一个job对应一个yarn中的application而ApplicationMode则这个application中对应多个job。 application Mode模式存在bug不使用。 bug每个job的id都为0000000而checkpoint依赖于id命名在hdfs集群上进行存储。这将导致错误发生。
3、并行度 并行度优先级 算子指定env全局指定提交参数配置文件 slot个数与并行度的关系 默认情况下slot个数等于流程序的并行度程序中最大算子的并行度 在有多个共享组时slot个数等于每个共享组中最大算子并行的和
4、提交命令执行指定任务
flink提交任务脚本参数: flink 类似于spark-submit用于提交作业 run 用来执行作业除了applicationMode模式不需要 run-application (applicaitonMode模式执行作业的命令) -t yarn模式中指定以yarn哪种模式运行的参数 -d 后台提交(断开与客户端的连接) -m 指定JobManager以及UI端口 -D 指定其他参数。比如多队列提交参数-Dyarn.application.queuehive -c 指定全类名
举例 本地模式 bin/flink run -m hadoop102:8081 -c com.hpu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jarstandalone模式 bin/flink run -m hadoop102:8081 -c com.hpu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jaryarn模式 per-job bin/flink run -d -t yarn-per-job -c com.hpu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar提交任务到Yarn的其他队列 bin/flink run -d -m yarn-cluster -yqu hive -c com.hpu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar老版本bin/flink run -d -t yarn-per-job -Dyarn.application.queuehive -c com.hpu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jarsession-cluster 启动一个Flink-session在Session上运行Job bin/yarn-session.sh -d bin/flink run -c com.hpu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jarbin/flink run -t yarn-session -Dyarn.application.idapplication_XXXX_YY -c com.hpu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar如果是1.12版本开启了Yarn模式的高可用上面指定yarn-session集群的命令不能用需要去掉 -t yarn-session 1.13版本已修复如果存在多个session集群可以指定application进行提交到指定session集群中。 bin/flink run -Dyarn.application.idapplication_XXXX_YY -c com.hpu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar在session中提交一个任务此时session对应的flink集群在yarn上的任务为2个container其中一个为JobManager一个为TaskManager。 bin/yarn-session.sh -d 启动以后会有一个container为JobManager bin/flink run -c com.sunmi.day01.Flink03_Stream_Unbounded_WordCount /home/hadoop/jars/Flink-202307-1.0-SNAPSHOT-jar-with-dependencies.jar提交一个任务后增加一个container为TaskManager bin/flink run -c com.sunmi.day01.Flink03_Stream_Unbounded_WordCount /home/hadoop/jars/Flink-202307-1.0-SNAPSHOT-jar-with-dependencies.jar再提交一个会再增加一个container存放对应的TaskManager flink默认配置conf/flink-conf.yaml,决定任务内存大小以及所需的slot个数。 application mode: bin/flink run-application -t yarn-application -c com.sunmi.day01.Flink03_Stream_Unbounded_WordCount /home/hadoop/jars/Flink-202307-1.0-SNAPSHOT-jar-with-dependencies.jarApplication Mode VS yarn per-job
分别使用Application Mode、yarn per-job两种方式提交任务观察application情况。
package com.sunmi.day01;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Flink04_Test_PerJob_ApplicationMode {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();test1(env);test2(env);test3(env);}public static void test1(StreamExecutionEnvironment env) throws Exception {DataStreamSourceString stringDataStreamSource env.fromElements(22222);stringDataStreamSource.map(new MapFunctionString, String() {Overridepublic String map(String value) throws Exception {return value;}}).print();env.execute();}public static void test2(StreamExecutionEnvironment env) throws Exception {DataStreamSourceString stringDataStreamSource env.fromElements(22222);stringDataStreamSource.map(new MapFunctionString, String() {Overridepublic String map(String value) throws Exception {return value;}}).print();env.execute();}public static void test3(StreamExecutionEnvironment env) throws Exception {DataStreamSourceString stringDataStreamSource env.socketTextStream(zyn-node01, 9999);stringDataStreamSource.map(new MapFunctionString, String() {Overridepublic String map(String value) throws Exception {return value;}}).print();env.execute();}
}application mode:
bin/flink run-application -t yarn-application -c com.sunmi.day01.Flink04_Test_PerJob_ApplicationMode /home/hadoop/jars/Flink-202307-1.0-SNAPSHOT-jar-with-dependencies.jar仅存在一个application。
per-job:
bin/flink run -d -t yarn-per-job -Dyarn.application.queuehive -c com.sunmi.day01.Flink04_Test_PerJob_ApplicationMode /home/hadoop/jars/Flink-202307-1.0-SNAPSHOT-jar-with-dependencies.jar3个job生成3个application。
停止任务的三种方式 通过 flink cancel jobid 在yarn网页端kill对应application 在flink网页端cancel 进入flink网页端有两种方式 通过提交任务时生成的链接进入 通过yarn对应application的applicationMaster代理进入
5、注意事项
在java语法的flink编程中调用一个方法有以下三种实现方式
自定义一个类实现接口 √写接口的匿名实现类 √写Lambda表达式 注意在写Lambda表达式的时候可能会因为类型擦除的原因报错解决方式如下 在方法的最后调用.returns(Types.类型)解决 比如 SingleOutputStreamOperatorTuple2String, Integer wordToOneDStream wordDStream.map(value - Tuple2.of(value, 1)).returns(Types.TUPLE(Types.STRING,Types.INT)); 外链图片转存中…(img-f84HwYXQ-1689861444196)] 通过yarn对应application的applicationMaster代理进入 [外链图片转存中…(img-AKY0p7au-1689861444196)]
5、注意事项
在java语法的flink编程中调用一个方法有以下三种实现方式
自定义一个类实现接口 √写接口的匿名实现类 √写Lambda表达式 注意在写Lambda表达式的时候可能会因为类型擦除的原因报错解决方式如下 在方法的最后调用.returns(Types.类型)解决 比如 SingleOutputStreamOperatorTuple2String, Integer wordToOneDStream wordDStream.map(value - Tuple2.of(value, 1)).returns(Types.TUPLE(Types.STRING,Types.INT));