Version notes
Flink and Kafka version numbers must be compatible. The combination verified here:
Flink 1.17.1 + kafka_2.12-3.3.1
Add the Kafka connector dependency
Upload flink-sql-connector-kafka-1.17.1.jar to Flink's lib directory
Download the connector jar from Maven Repository:
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/1.17.1
Note: flink-sql-connector-kafka-1.17.1.jar is the fat jar that bundles the Kafka client classes; the thin flink-connector-kafka-1.17.1.jar actually used below does not, which is why kafka-clients-3.3.1.jar has to be copied in by hand later.
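If the node has direct internet access, the jar can also be pulled straight from Maven Central (a convenience sketch; the repo1.maven.org path follows the standard Maven repository layout for this artifact):
[hadoop@node2 ~]$ wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.1/flink-connector-kafka-1.17.1.jar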
Copy the jar into Flink's lib directory:
[hadoop@node2 ~]$ cp flink-connector-kafka-1.17.1.jar $FLINK_HOME/lib
Distribute flink-connector-kafka-1.17.1.jar to the other nodes:
[hadoop@node2 ~]$ xsync $FLINK_HOME/lib/flink-connector-kafka-1.17.1.jar
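To confirm the jar actually landed on every node, a quick loop over ssh works (a sketch assuming $FLINK_HOME points to the same path on each host; xsync is the author's own sync script):
[hadoop@node2 ~]$ for h in node2 node3 node4; do ssh $h "ls $FLINK_HOME/lib | grep kafka"; done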
Start a yarn-session
[hadoop@node2 ~]$ myhadoop.sh start
[hadoop@node2 ~]$ yarn-session.sh -d
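To confirm the session is up before moving on, the standard YARN CLI can list running applications (not part of the original steps):
[hadoop@node2 ~]$ yarn application -list -appStates RUNNING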
Start the Kafka cluster
[hadoop@node2 ~]$ zk.sh start
[hadoop@node2 ~]$ kf.sh start
Create the Kafka topic
List the existing topics:
[hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list
If ws1 is not among them, create it:
[hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --create --replication-factor 1 --partitions 1 --topic ws1
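Optionally, verify the partition and replica layout of the new topic (a quick sanity check, not in the original write-up):
[hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --describe --topic ws1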
Plain Kafka table
'connector' = 'kafka'
Enter the Flink SQL client:
[hadoop@node2 ~]$ sql-client.sh embedded -s yarn-session
...
(some log output omitted)
...
Create a mapping table for Kafka:
CREATE TABLE t1(
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', -- FROM 'xxxx' can be omitted when the column name matches the metadata key; VIRTUAL marks a read-only column
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  id int,
  ts bigint,
  vc int
)
WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094',
  'properties.group.id' = 'test',
  -- startup modes: earliest-offset, latest-offset, group-offsets, timestamp and specific-offsets
  'scan.startup.mode' = 'earliest-offset',
  -- 'fixed' is Flink's partitioner: each parallel subtask writes to a single Kafka partition
  'sink.partitioner' = 'fixed',
  'topic' = 'ws1',
  'format' = 'json'
);
The table can be used both to read data from Kafka and to write data to Kafka.
Insert data into the Kafka table
If the source table does not exist yet, create it first; if it already exists, there is no need to create it again.
CREATE TABLE source (
  id INT,
  ts BIGINT,
  vc INT
) WITH (
  'connector' = 'datagen', -- built-in data generator
  'rows-per-second' = '1',
  'fields.id.kind' = 'random',
  'fields.id.min' = '1',
  'fields.id.max' = '10',
  'fields.ts.kind' = 'sequence', -- ts counts from 1 to 1000000, then the source stops
  'fields.ts.start' = '1',
  'fields.ts.end' = '1000000',
  'fields.vc.kind' = 'random',
  'fields.vc.min' = '1',
  'fields.vc.max' = '100'
);
Insert the source table into t1:
insert into t1(id,ts,vc) select * from source;
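To confirm that rows are actually reaching the ws1 topic, it can be consumed directly with the stock console consumer (a verification sketch, not in the original steps; Ctrl+C stops it):
[hadoop@node2 ~]$ kafka-console-consumer.sh --bootstrap-server node2:9092 --topic ws1 --from-beginning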
If running the insert fails with this error:
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer
If the same error keeps coming back, copy kafka-clients-3.3.1.jar from Kafka's libs directory into Flink's lib directory. Note that the sql-client must be restarted afterwards, and the yarn-session must be restarted as well (important):
cp $KAFKA_HOME/libs/kafka-clients-3.3.1.jar $FLINK_HOME/lib
Check that the copy succeeded:
$ ls $FLINK_HOME/lib
Restart the sql-client and rerun the statements; the successful session looks like this:
Flink SQL> CREATE TABLE t1(
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', -- FROM 'xxxx' can be omitted when the column name matches the metadata key; VIRTUAL marks a read-only column
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  id int,
  ts bigint,
  vc int
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094',
  'properties.group.id' = 'test',
  -- earliest-offset, latest-offset, group-offsets, timestamp and specific-offsets
  'scan.startup.mode' = 'earliest-offset',
  -- 'fixed': each parallel subtask writes to a single Kafka partition
  'sink.partitioner' = 'fixed',
  'topic' = 'ws1',
  'format' = 'json'
);
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE source (
  id INT,
  ts BIGINT,
  vc INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.id.kind' = 'random',
  'fields.id.min' = '1',
  'fields.id.max' = '10',
  'fields.ts.kind' = 'sequence',
  'fields.ts.start' = '1',
  'fields.ts.end' = '1000000',
  'fields.vc.kind' = 'random',
  'fields.vc.min' = '1',
  'fields.vc.max' = '100'
);
[INFO] Execute statement succeed.
Flink SQL> insert into t1(id,ts,vc) select * from source;
2024-06-14 10:45:30,125 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory (/home/hadoop/soft/flink-1.17.1/conf) already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 10:45:30,673 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 10:45:31,027 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 10:45:31,227 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node3:41749 of application application_1718331886020_0001.
insert into t1(id,ts,vc) select * from source;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: b1765f969c3ae637bd4c8100efbb0c4e
Query the Kafka table
select * from t1;
This fails with:
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerRecord
Restart the yarn-session (the running session was launched before kafka-clients-3.3.1.jar was added to lib, so its TaskManagers do not have the class on their classpath) and redo the steps; this time they succeed:
Flink SQL> CREATE TABLE t1(
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', -- FROM 'xxxx' can be omitted when the column name matches the metadata key; VIRTUAL marks a read-only column
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  id int,
  ts bigint,
  vc int
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094',
  'properties.group.id' = 'test',
  -- earliest-offset, latest-offset, group-offsets, timestamp and specific-offsets
  'scan.startup.mode' = 'earliest-offset',
  -- 'fixed': each parallel subtask writes to a single Kafka partition
  'sink.partitioner' = 'fixed',
  'topic' = 'ws1',
  'format' = 'json'
);
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE source (
  id INT,
  ts BIGINT,
  vc INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.id.kind' = 'random',
  'fields.id.min' = '1',
  'fields.id.max' = '10',
  'fields.ts.kind' = 'sequence',
  'fields.ts.start' = '1',
  'fields.ts.end' = '1000000',
  'fields.vc.kind' = 'random',
  'fields.vc.min' = '1',
  'fields.vc.max' = '100'
);
[INFO] Execute statement succeed.
Flink SQL> insert into t1(id,ts,vc) select * from source;
2024-06-14 11:22:17,971 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory (/home/hadoop/soft/flink-1.17.1/conf) already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 11:22:18,422 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 11:22:18,895 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 11:22:19,052 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node4:38788 of application application_1718331886020_0002.
insert into t1(id,ts,vc) select * from source;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 84292f84d1fce4756ccd8ae294b6163a
Flink SQL> select * from t1;
2024-06-14 11:23:38,338 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory (/home/hadoop/soft/flink-1.17.1/conf) already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 11:23:38,606 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 11:23:38,617 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 11:23:38,649 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node4:38788 of application application_1718331886020_0002.
select * from t1;
[INFO] Result retrieval cancelled.
Flink SQL>
upsert-kafka table
'connector' = 'upsert-kafka'
If the table involves update operations, the plain kafka connector cannot handle them; in that case, use the Upsert Kafka connector.
Create an upsert-kafka mapping table (a primary key must be defined):
CREATE TABLE t2(
  id int,
  sumVC int,
  primary key (id) NOT ENFORCED
)
WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'node2:9092',
  'topic' = 'ws2',
  'key.format' = 'json',
  'value.format' = 'json'
);
If Kafka has no topic named ws2, it will be created automatically.
Insert into the upsert-kafka table
insert into t2 select id,sum(vc) sumVC from source group by id;
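To see what upsert-kafka actually writes, ws2 can be consumed with keys printed; each aggregate update for an id appends a new record keyed by that id (a verification sketch, not in the original steps):
[hadoop@node2 ~]$ kafka-console-consumer.sh --bootstrap-server node2:9092 --topic ws2 --from-beginning --property print.key=true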
Query the upsert-kafka table
upsert-kafka cannot start reading from a specified offset; it always reads the topic from the beginning. Only then can it reconstruct the complete update history of the data, which it displays with changelog markers such as +I, -U, and +U.
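With tableau mode on (set below), those markers show up in the op column. A purely hypothetical sketch of the shape of the output (values invented for illustration, not from the original session):
+----+-------------+-------------+
| op |          id |       sumVC |
+----+-------------+-------------+
| +I |           5 |          34 |
| -U |           5 |          34 |
| +U |           5 |          71 |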
Set the display mode:
SET sql-client.execution.result-mode = 'tableau';
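Besides tableau, the client also supports the table and changelog result modes; changelog keeps every +I/-U/+U row visible as a stream, which makes the update process easy to follow (an aside, not in the original):
SET sql-client.execution.result-mode = 'changelog';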
Query the t2 table:
select * from t2;
If no rows come back, the reason is that the earlier source table already generated up to the end of its sequence (ts = 1000000) and stopped producing data.
Open the Flink Web UI, cancel all running jobs, and redo the steps; the successful run follows.
Drop the tables
Flink SQL> show tables;
+------------+
| table name |
+------------+
|     source |
|         t1 |
|         t2 |
+------------+
3 rows in set
Flink SQL> drop table source;
Flink SQL> drop table t1;
Flink SQL> drop table t2;
Create the tables again:
CREATE TABLE source (
  id INT,
  ts BIGINT,
  vc INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.id.kind' = 'random',
  'fields.id.min' = '1',
  'fields.id.max' = '10',
  'fields.ts.kind' = 'sequence',
  'fields.ts.start' = '1',
  'fields.ts.end' = '1000000',
  'fields.vc.kind' = 'random',
  'fields.vc.min' = '1',
  'fields.vc.max' = '100'
);
CREATE TABLE t2(
  id int,
  sumVC int,
  primary key (id) NOT ENFORCED
)
WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'node2:9092',
  'topic' = 'ws2',
  'key.format' = 'json',
  'value.format' = 'json'
);
Set the display mode:
SET sql-client.execution.result-mode = 'tableau';
Query the table:
select * from t2;
Done. Enjoy it!