北京网站建设软件,有做火币网这种网站的吗,c2c网站建设实例,聚美优品的网站建设Flink 系列文章
1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接
13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…Flink 系列文章
1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接
13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置如何处理更新结果、时态表、流上的join、流上的确定性以及查询配置 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例1 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例2 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例3 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例4 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例6 17、Flink 之Table API: Table API 支持的操作1 17、Flink 之Table API: Table API 支持的操作2 18、Flink的SQL 支持的操作和语法 19、Flink 的Table API 和 SQL 中的内置函数及示例1 19、Flink 的Table API 和 SQL 中的自定义函数及示例2 19、Flink 的Table API 和 SQL 中的自定义函数及示例3 19、Flink 的Table API 和 SQL 中的自定义函数及示例4 20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL可以直接提交 SQL 任务到集群上 21、Flink 的table API与DataStream API 集成1- 介绍及入门示例、集成说明 21、Flink 的table API与DataStream API 集成2- 批处理模式和inser-only流处理 21、Flink 的table API与DataStream API 集成3- changelog流处理、管道示例、类型转换和老版本转换示例 21、Flink 的table API与DataStream API 集成完整版 22、Flink 的table api与sql之创建表的DDL 24、Flink 的table api与sql之Catalogs介绍、类型、java api和sql实现ddl、java api和sql操作catalog-1 24、Flink 的table api与sql之Catalogsjava api操作数据库、表-2 24、Flink 的table api与sql之Catalogsjava api操作视图-3 24、Flink 的table api与sql之Catalogsjava api操作分区与函数-4 25、Flink 的table api与sql之函数(自定义函数示例) 26、Flink 的SQL之概览与入门示例 27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例1 27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例2 27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例3 27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例4 27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例5 27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介绍及详细示例6 27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例7 28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 语句 29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE1 29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE2 30、Flink SQL之SQL 客户端通过kafka和filesystem的例子介绍了配置文件使用-表、视图等 31、Flink的SQL Gateway介绍及示例 32、Flink table api和SQL 之用户自定义 Sources Sinks实现及详细示例 33、Flink 的Table API 和 SQL 中的时区 35、Flink 的 Formats 之CSV 和 JSON Format 36、Flink 的 Formats 之Parquet 和 Orc Format 41、Flink之Hive 方言介绍及详细示例 40、Flink 的Apache Kafka connectorkafka source的介绍及使用示例-1 40、Flink 的Apache Kafka connectorkafka sink的介绍及使用示例-2 40、Flink 的Apache Kafka connectorkafka source 和sink 说明及使用示例 完整版 42、Flink 的table api与sql之Hive Catalog 43、Flink之Hive 读写及详细验证示例 44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的 文章目录 Flink 系列文章一、Apache Kafka 连接器3、kafka sourcefunction4、kafka sink1、使用示例1、Flink 1.13版本实现2、Flink 1.17版本实现3、说明 2、序列化器3、容错4、监控 5、kafka producer6、kafka 连接器指标7、启用 Kerberos 身份验证8、升级到最近的连接器版本9、问题排查1、数据丢失2、UnknownTopicOrPartitionException3、ProducerFencedException 本文介绍了kafka的版本功能更能换、作为sink的使用、连接器的指标、身份认证、版本升级和问题排查几个主要 方面关于常用的功能均以可运行的示例进行展示并提供完整的验证步骤。 本专题为了便于阅读以及整体查阅分为三个部分 40、Flink 的Apache Kafka connectorkafka source的介绍及使用示例-1 40、Flink 的Apache Kafka connectorkafka sink的介绍及使用示例-2 40、Flink 的Apache Kafka connectorkafka source 和sink 说明及使用示例 完整版 本文依赖kafka集群能正常使用。 本文分为9个部分即sink、producer/sourceFlink版本升级、连接器指标、身份认证、版本升级及问题排查。 本文的示例是在Flink 1.17版本中运行。
一、Apache Kafka 连接器
Flink 提供了 Apache Kafka 连接器使用精确一次Exactly-once的语义在 Kafka topic 中读取和写入数据。
3、kafka sourcefunction
FlinkKafkaConsumer 已被弃用并将在 Flink 1.17 中移除请改用 KafkaSource。 1.13版本的实现参考本文开头的示例。
4、kafka sink
KafkaSink 可将数据流写入一个或多个 Kafka topic。
1、使用示例
Kafka sink 提供了构建类来创建 KafkaSink 的实例。
以下代码片段展示了如何将字符串数据按照至少一次at lease once的语义保证写入 Kafka topic
1、Flink 1.13版本实现
实现代码
import java.util.Properties;
import java.util.Random;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;/*** author alanchan**/
public class TestKafkaSinkDemo {public static void test1() throws Exception {// 1、envStreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2、source-主题:alan_source// 准备kafka连接参数Properties propSource new Properties();propSource.setProperty(bootstrap.servers, 192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092);// 集群地址propSource.setProperty(group.id, flink_kafka);propSource.setProperty(auto.offset.reset, latest);propSource.setProperty(flink.partition-discovery.interval-millis, 5000);propSource.setProperty(enable.auto.commit, true);// 自动提交的时间间隔propSource.setProperty(auto.commit.interval.ms, 2000);FlinkKafkaConsumerString kafkaSource new FlinkKafkaConsumerString(alan_source, new SimpleStringSchema(), propSource);// 使用kafkaSourceDataStreamString kafkaDS env.addSource(kafkaSource);// 3、transformation-统计单词个数SingleOutputStreamOperatorString result kafkaDS.flatMap(new FlatMapFunctionString, Tuple2String, Integer() {private Random ran new Random();Overridepublic void flatMap(String value, CollectorTuple2String, Integer out) throws Exception {String[] arr value.split(,);for (String word : arr) {out.collect(Tuple2.of(word, 1));}}}).keyBy(t - t.f0).sum(1).map(new MapFunctionTuple2String, Integer, String() {Overridepublic String map(Tuple2String, Integer value) throws Exception {System.out.println(输出 value.f0 - value.f1);return value.f0 - value.f1;}});// 4、sink-主题alan_sinkProperties propSink new Properties();propSink.setProperty(bootstrap.servers, 192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092);propSink.setProperty(transaction.timeout.ms, 5000);FlinkKafkaProducerString kafkaSink new FlinkKafkaProducer(alan_sink, new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), propSink,FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-toleranceresult.addSink(kafkaSink);// 5、executeenv.execute();}public static void main(String[] args) throws Exception {test1();}}验证 1、创建kafka 主题 alan_source 和 alan_sink 2、驱动程序观察运行控制台 3、通过命令往alan_source 写入数据同时消费 alan_sink 主题的数据
## kafka生产数据
[alanchanserver2 bin]$ kafka-console-producer.sh --broker-list 192.168.10.41:9092 --topic alan_source
alan,alach,alanchan,hello
alan_chan,hi,flink
alan,flink,good
alan,alach,alanchan,hello
hello,123
## kafka消费数据
[alanchanserver2 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_sink --from-beginning
alanchan-1
hello-1
alan-1
alach-1
flink-1
alan_chan-1
hi-1
alan-2
flink-2
good-1
alanchan-2
hello-2
alan-3
alach-2
hello-3
123-1
4、应用程序控制台输出
2、Flink 1.17版本实现
代码实现
import java.util.Properties;
import java.util.Random;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;/*** author alanchan**/
public class TestKafkaSinkDemo {public static void test2() throws Exception {// 1、envStreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2、 sourceKafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092).setTopics(alan_nsource).setGroupId(flink_kafka).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamString kafkaDS env.fromSource(source, WatermarkStrategy.noWatermarks(), Kafka Source);// 3、 transformationDataStreamString result kafkaDS.flatMap(new FlatMapFunctionString, Tuple2String, Integer() {Overridepublic void flatMap(String value, CollectorTuple2String, Integer out) throws Exception {String[] arr value.split(,);for (String word : arr) {out.collect(Tuple2.of(word, 1));}}}).keyBy(t - t.f0).sum(1).map(new MapFunctionTuple2String, Integer, String() {Overridepublic String map(Tuple2String, Integer value) throws Exception {System.out.println(输出 value.f0 - value.f1);return value.f0 - value.f1;}});// 4、 sinkKafkaSinkString kafkaSink KafkaSink.Stringbuilder().setBootstrapServers(192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic(alan_nsink).setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();result.sinkTo(kafkaSink);// 5、executeenv.execute();}public static void main(String[] args) throws Exception {
// test1();test2();}}
验证 1、创建kafka 主题 alan_nsource 和 alan_nsink 2、驱动程序观察运行控制台 3、通过命令往alan_nsource 写入数据同时消费 alan_nsink 主题的数据
## kafka生产数据
[alanchanserver2 bin]$ kafka-console-producer.sh --broker-list 192.168.10.41:9092 --topic alan_nsource
alan,alach,alanchan,hello
alan_chan,hi,flink
alan,flink,good
alan,alach,alanchan,hello
hello,123
## kafka消费数据
[alanchanserver2 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_nsink --from-beginning
alanchan-1
hello-1
alan-1
alach-1
flink-1
alan_chan-1
hi-1
alan-2
flink-2
good-1
alanchan-2
alach-2
alan-3
hello-2
hello-3
123-1
4、应用程序控制台输出
3、说明
以下属性在构建 KafkaSink 时是必须指定的
Bootstrap servers, setBootstrapServers(String)消息序列化器Serializer, setRecordSerializer(KafkaRecordSerializationSchema)如果使用DeliveryGuarantee.EXACTLY_ONCE 的语义保证则需要使用 setTransactionalIdPrefix(String)
2、序列化器
构建时需要提供 KafkaRecordSerializationSchema 来将输入数据转换为 Kafka 的 ProducerRecord。Flink 提供了 schema 构建器 以提供一些通用的组件例如消息键key/消息体value序列化、topic 选择、消息分区同样也可以通过实现对应的接口来进行更丰富的控制。
KafkaRecordSerializationSchema.builder().setTopicSelector(new TopicSelector() {Overridepublic String apply(Object t) {//设置选择的 topic return alan_nsink;}}).setValueSerializationSchema(new SimpleStringSchema()).setKeySerializationSchema(new SimpleStringSchema()).setPartitioner(new FlinkFixedPartitioner()).build()示例代码
import java.util.Properties;
import java.util.Random;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.TopicSelector;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;/*** author alanchan**/
public class TestKafkaSinkDemo {public static void test3() throws Exception {// 1、envStreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2、 sourceKafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092).setTopics(alan_nsource).setGroupId(flink_kafka).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamString kafkaDS env.fromSource(source, WatermarkStrategy.noWatermarks(), Kafka Source);// 3、 transformationDataStreamString result kafkaDS.flatMap(new FlatMapFunctionString, Tuple2String, Integer() {Overridepublic void flatMap(String value, CollectorTuple2String, Integer out) throws Exception {String[] arr value.split(,);for (String word : arr) {out.collect(Tuple2.of(word, 1));}}}).keyBy(t - t.f0).sum(1).map(new MapFunctionTuple2String, Integer, String() {Overridepublic String map(Tuple2String, Integer value) throws Exception {System.out.println(输出 value.f0 - value.f1);return value.f0 - value.f1;}});// 4、 sinkKafkaSinkString kafkaSink KafkaSink.Stringbuilder().setBootstrapServers(192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopicSelector(new TopicSelector() {Overridepublic String apply(Object t) {//设置选择的 topic return alan_nsink;}}).setValueSerializationSchema(new SimpleStringSchema()).setKeySerializationSchema(new SimpleStringSchema()).setPartitioner(new FlinkFixedPartitioner()).build()).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();result.sinkTo(kafkaSink);// 5、executeenv.execute();}public static void main(String[] args) throws Exception {test3();}}
验证 1、创建kafka 主题 alan_nsource 和 alan_nsink 2、驱动程序观察运行控制台 3、通过命令往alan_nsource 写入数据同时消费 alan_nsink 主题的数据
## kafka生产数据
[alanchanserver2 bin]$ kafka-console-producer.sh --broker-list 192.168.10.41:9092 --topic alan_nsource
alan,alach,alanchan,hello
alan_chan,hi,flink
alan,flink,good
alan,alach,alanchan,hello
hello,123
## kafka消费数据
[alanchanserver2 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_nsink --from-beginning
alanchan-1
hello-1
alan-1
alach-1
flink-1
alan_chan-1
hi-1
alan-2
flink-2
good-1
alanchan-2
alach-2
alan-3
hello-2
hello-3
123-1
4、应用程序控制台输出
其中消息体value序列化方法和 topic 的选择方法是必须指定的此外也可以通过 setKafkaKeySerializer(Serializer) 或 setKafkaValueSerializer(Serializer) 来使用 Kafka 提供而非 Flink 提供的序列化器。
3、容错
KafkaSink 总共支持三种不同的语义保证DeliveryGuarantee。对于 DeliveryGuarantee.AT_LEAST_ONCE 和 DeliveryGuarantee.EXACTLY_ONCEFlink checkpoint 必须启用。
默认情况下 KafkaSink 使用 DeliveryGuarantee.NONE。
以下是对不同语义保证的解释
DeliveryGuarantee.NONE 不提供任何保证消息有可能会因 Kafka broker 的原因发生丢失或因 Flink 的故障发生重复。DeliveryGuarantee.AT_LEAST_ONCE: sink 在 checkpoint 时会等待 Kafka 缓冲区中的数据全部被 Kafka producer 确认。消息不会因 Kafka broker 端发生的事件而丢失但可能会在 Flink 重启时重复因为 Flink 会重新处理旧数据。DeliveryGuarantee.EXACTLY_ONCE: 该模式下Kafka sink 会将所有数据通过在 checkpoint 时提交的事务写入。因此如果 consumer 只读取已提交的数据参见 Kafka consumer 配置 isolation.level在 Flink 发生重启时不会发生数据重复。然而这会使数据在 checkpoint 完成时才会可见因此请按需调整 checkpoint 的间隔。请确认事务 ID 的前缀transactionIdPrefix对不同的应用是唯一的以保证不同作业的事务 不会互相影响此外强烈建议将 Kafka 的事务超时时间调整至远大于 checkpoint 最大间隔 最大重启时间否则 Kafka 对未提交事务的过期处理会导致数据丢失。
关于容错请参考文章9、Flink四大基石之Checkpoint容错机制详解及示例checkpoint配置、重启策略、手动恢复checkpoint和savepoint
4、监控
Kafka sink 会在不同的范围Scope中汇报下列指标。
5、kafka producer
FlinkKafkaProducer 已被弃用并将在 Flink 1.15 中移除请改用 KafkaSink。
关于Flink 1.13版本的实现请参考上文中的示例。
6、kafka 连接器指标
Flink 的 Kafka 连接器通过 Flink 的指标系统提供一些指标来帮助分析 connector 的行为。 各个版本的 Kafka producer 和 consumer 会通过 Flink 的指标系统汇报 Kafka 内部的指标。 该 Kafka 文档列出了所有汇报的指标。
同样也可通过将 Kafka source 在该章节描述的 register.consumer.metrics或 Kafka sink 的 register.producer.metrics 配置设置为 false 来关闭 Kafka 指标的注册。
7、启用 Kerberos 身份验证
Flink 通过 Kafka 连接器提供了一流的支持可以对 Kerberos 配置的 Kafka 安装进行身份验证。只需在 flink-conf.yaml 中配置 Flink。 像这样为 Kafka 启用 Kerberos 身份验证
1、通过设置以下内容配置 Kerberos 票据
security.kerberos.login.use-ticket-cache默认情况下这个值是 trueFlink 将尝试在 kinit 管理的票据缓存中使用 Kerberos 票据。注意在 YARN 上部署的 Flink jobs 中使用 Kafka 连接器时使用票据缓存的 Kerberos 授权将不起作用。security.kerberos.login.keytab 和 security.kerberos.login.principal要使用 Kerberos keytabs需为这两个属性设置值。
2、将 KafkaClient 追加到 security.kerberos.login.contexts这告诉 Flink 将配置的 Kerberos 票据提供给 Kafka 登录上下文以用于 Kafka 身份验证。
一旦启用了基于 Kerberos 的 Flink 安全性后只需在提供的属性配置中包含以下两个设置通过传递给内部 Kafka 客户端即可使用 Flink Kafka Consumer 或 Producer 向 Kafk a进行身份验证
将 security.protocol 设置为 SASL_PLAINTEXT默认为 NONE用于与 Kafka broker 进行通信的协议。使用独立 Flink 部署时也可以使用 SASL_SSL请在此处查看如何为 SSL 配置 Kafka 客户端。将 sasl.kerberos.service.name 设置为 kafka默认为 kafka此值应与用于 Kafka broker 配置的 sasl.kerberos.service.name 相匹配。客户端和服务器配置之间的服务名称不匹配将导致身份验证失败。
有关 Kerberos 安全性 Flink 配置的更多信息请参见这里。你也可以在这里进一步了解 Flink 如何在内部设置基于 kerberos 的安全性。
该部分由于没有环境未做验证内容来至于官网。将来拟计划以专栏的形式介绍该部分内容。
8、升级到最近的连接器版本
通用的升级步骤概述见 升级 Jobs 和 Flink 版本指南。对于 Kafka你还需要遵循这些步骤
不要同时升级 Flink 和 Kafka 连接器确保你对 Consumer 设置了 group.id在 Consumer 上设置 setCommitOffsetsOnCheckpoints(true)以便读 offset 提交到 Kafka。务必在停止和恢复 savepoint 前执行此操作。你可能需要在旧的连接器版本上进行停止/重启循环来启用此设置。在 Consumer 上设置 setStartFromGroupOffsets(true)以便我们从 Kafka 获取读 offset。这只会在 Flink 状态中没有读 offset 时生效这也是为什么下一步非要重要的原因。修改 source/sink 分配到的 uid。这会确保新的 source/sink 不会从旧的 sink/source 算子中读取状态。使用 --allow-non-restored-state 参数启动新 job因为我们在 savepoint 中仍然有先前连接器版本的状态。
9、问题排查 如果在使用 Flink 时对 Kafka 有问题Flink 只封装 KafkaConsumer 或 KafkaProducer你的问题可能独立于 Flink有时可以通过升级 Kafka broker 程序、重新配置 Kafka broker 程序或在 Flink 中重新配置 KafkaConsumer 或 KafkaProducer 来解决。 一句话大概是kafka的问题或配置的kafka的问题和flink关系不大。
下面列出了一些常见问题的示例。
1、数据丢失
根据你的 Kafka 配置即使在 Kafka 确认写入后你仍然可能会遇到数据丢失。特别要记住在 Kafka 的配置中设置以下属性
ackslog.flush.interval.messageslog.flush.interval.mslog.flush.*
上述选项的默认值是很容易导致数据丢失的。请参考 Kafka 文档以获得更多的解释。
2、UnknownTopicOrPartitionException
导致此错误的一个可能原因是正在进行新的 leader 选举例如在重新启动 Kafka broker 之后或期间。这是一个可重试的异常因此 Flink job 应该能够重启并恢复正常运行。也可以通过更改 producer 设置中的 retries 属性来规避。但是这可能会导致重新排序消息反过来可以通过将 max.in.flight.requests.per.connection 设置为 1 来避免不需要的消息。
3、ProducerFencedException
这个错误是由于 FlinkKafkaProducer 所生成的 transactional.id 与其他应用所使用的的产生了冲突。多数情况下由于 FlinkKafkaProducer 产生的 ID 都是以 taskName “-” operatorUid 为前缀的这些产生冲突的应用也是使用了相同 Job Graph 的 Flink Job。 我们可以使用 setTransactionalIdPrefix() 方法来覆盖默认的行为为每个不同的 Job 分配不同的 transactional.id 前缀来解决这个问题。
以上本文介绍了kafka的版本功能更能换、作为sink的使用、连接器的指标、身份认证、版本升级和问题排查几个主要 方面关于常用的功能均以可运行的示例进行展示并提供完整的验证步骤。 本专题为了便于阅读以及整体查阅分为三个部分 40、Flink 的Apache Kafka connectorkafka source的介绍及使用示例-1 40、Flink 的Apache Kafka connectorkafka sink的介绍及使用示例-2 40、Flink 的Apache Kafka connectorkafka source 和sink 说明及使用示例 完整版