当前位置: 首页 > news >正文

分类信息的网站排名怎么做姜堰网页设计

分类信息的网站排名怎么做,姜堰网页设计,公司建设网站需要什么资质,青岛制作网站软件版本说明 Flink和kafka的版本号有一定的匹配关系#xff0c;操作成功的版本#xff1a; Flink1.17.1kafka_2.12-3.3.1 添加kafka连接器依赖 将flink-sql-connector-kafka-1.17.1.jar上传到flink的lib目录下 下载flink-sql-connector-kafka连接器jar包 https://mvnreposi…版本说明 Flink和kafka的版本号有一定的匹配关系操作成功的版本 Flink1.17.1kafka_2.12-3.3.1 添加kafka连接器依赖 将flink-sql-connector-kafka-1.17.1.jar上传到flink的lib目录下 下载flink-sql-connector-kafka连接器jar包 https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/1.17.1 上传到flink的lib目录下 [hadoopnode2 ~]$ cp flink-connector-kafka-1.17.1.jar $FLINK_HOME/lib 分发flink-connector-kafka-1.17.1.jar xsync $FLINK_HOME/lib/flink-connector-kafka-1.17.1.jar 启动yarn-session [hadoopnode2 ~]$ myhadoop.sh start [hadoopnode2 ~]$ yarn-session.sh -d启动kafka集群 [hadoopnode2 ~]$ zk.sh start [hadoopnode2 ~]$ kf.sh start 创建kafka主题 查看主题 [hadoopnode2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list ​ 如果没有ws1,则创建 [hadoopnode2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --create --replication-factor 1 --partitions 1 --topic ws1 ​ 普通Kafka表 connector kafka 进入Flink SQL客户端 [hadoopnode2 ~]$ sql-client.sh embedded -s yarn-session ... 省略若干日志输出 ... Flink SQL 创建Kafka的映射表 CREATE TABLE t1( event_time TIMESTAMP(3) METADATA FROM timestamp,--列名和元数据名一致可以省略 FROM xxxx, VIRTUAL表示只读partition BIGINT METADATA VIRTUAL,offset BIGINT METADATA VIRTUAL, id int, ts bigint , vc int ) WITH (connector kafka,properties.bootstrap.servers node2:9092,node3:9092,node4:9094,properties.group.id test, -- earliest-offset, latest-offset, group-offsets, timestamp and specific-offsetsscan.startup.mode earliest-offset,-- fixed为flink实现的分区器一个并行度只写往kafka一个分区 sink.partitioner fixed,topic ws1,format json ); 可以往kafka读数据也可以往kafka写数据。 插入数据到Kafka表 如果没有source表先创建source表如果source表存在则不需要再创建。 CREATE TABLE source ( id INT, ts BIGINT, vc INT ) WITH ( connector datagen, rows-per-second1, fields.id.kindrandom, fields.id.min1, fields.id.max10, fields.ts.kindsequence, fields.ts.start1, fields.ts.end1000000, fields.vc.kindrandom, fields.vc.min1, fields.vc.max100 ); 把source表插入t1表 insert into t1(id,ts,vc) select * from source; 如果报错 [ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer 依然同样错误还不行把kafka libs目录下的kafka-clients-3.3.1.jar把jar包发到Flink的lib目录同时也注意重启sql-client、yarn-session也要重启重要 cp $KAFKA_HOME/libs/kafka-clients-3.3.1.jar $FLINK_HOME/lib 查看是否复制成功 $ ls $FLINK_HOME/lib 重启sql-client重新操作成功如下 Flink SQL CREATE TABLE t1( event_time TIMESTAMP(3) METADATA FROM timestamp,--列名和元数据名一致可以省略 FROM xxxx, VIRTUAL表示只读partition BIGINT METADATA VIRTUAL,offset BIGINT METADATA VIRTUAL,id int, ts bigint , vc int )WITH (connector kafka,properties.bootstrap.servers node2:9092,node3:9092,node4:9094,properties.group.id test,-- earliest-offset, latest-offset, group-offsets, timestamp and specific-offsetsscan.startup.mode earliest-offset,-- fixed为flink实现的分区器一个并度只写往kafka一个分区sink.partitioner fixed,topic ws1,format json); [INFO] Execute statement succeed. ​ Flink SQL CREATE TABLE source ( id INT, ts BIGINT, vc INT) WITH ( connector datagen, rows-per-second1, fields.id.kindrandom, fields.id.min1, fields.id.max10, fields.ts.kindsequence, fields.ts.start1, fields.ts.end1000000, fields.vc.kindrandom, fields.vc.min1, fields.vc.max100); [INFO] Execute statement succeed. ​ Flink SQL insert into t1(id,ts,vc) select * from source;2024-06-14 10:45:30,125 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil       [] - The configuration directory (/home/hadoop/soft/flink-1.17.1/conf) already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file. 2024-06-14 10:45:30,673 INFO org.apache.hadoop.yarn.client.RMProxy                       [] - Connecting to ResourceManager at node3/192.168.193.143:8032 2024-06-14 10:45:31,027 INFO org.apache.flink.yarn.YarnClusterDescriptor                 [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2024-06-14 10:45:31,227 INFO org.apache.flink.yarn.YarnClusterDescriptor                 [] - Found Web Interface node3:41749 of application application_1718331886020_0001. insert into t1(id,ts,vc) select * from source; [INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: b1765f969c3ae637bd4c8100efbb0c4e ​ 查询Kafka表 select * from t1; 报错 [ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerRecord​ 重启yarn session重新操作成功如下 Flink SQL CREATE TABLE t1( event_time TIMESTAMP(3) METADATA FROM timestamp,--列名和元数据名一致可以省略 FROM xxxx, VIRTUAL表示只读partition BIGINT METADATA VIRTUAL,offset BIGINT METADATA VIRTUAL,id int, ts bigint , vc int )WITH (connector kafka,properties.bootstrap.servers node2:9092,node3:9092,node4:9094,properties.group.id test,-- earliest-offset, latest-offset, group-offsets, timestamp and specific-offsetsscan.startup.mode earliest-offset,-- fixed为flink实现的分区器一个并??度只写往kafka一个分区sink.partitioner fixed,topic ws1,format json); [INFO] Execute statement succeed. ​ Flink SQL CREATE TABLE source ( id INT, ts BIGINT, vc INT) WITH ( connector datagen, rows-per-second1, fields.id.kindrandom, fields.id.min1, fields.id.max10, fields.ts.kindsequence, fields.ts.start1, fields.ts.end1000000, fields.vc.kindrandom, fields.vc.min1, fields.vc.max100); [INFO] Execute statement succeed. ​ Flink SQL insert into t1(id,ts,vc) select * from source;2024-06-14 11:22:17,971 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil       [] - The configuration directory (/home/hadoop/soft/flink-1.17.1/conf) already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file. 2024-06-14 11:22:18,422 INFO org.apache.hadoop.yarn.client.RMProxy                       [] - Connecting to ResourceManager at node3/192.168.193.143:8032 2024-06-14 11:22:18,895 INFO org.apache.flink.yarn.YarnClusterDescriptor                 [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2024-06-14 11:22:19,052 INFO org.apache.flink.yarn.YarnClusterDescriptor                 [] - Found Web Interface node4:38788 of application application_1718331886020_0002. insert into t1(id,ts,vc) select * from source; [INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: 84292f84d1fce4756ccd8ae294b6163a ​ ​ Flink SQL select * from t1;2024-06-14 11:23:38,338 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil       [] - The configuration directory (/home/hadoop/soft/flink-1.17.1/conf) already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file. 2024-06-14 11:23:38,606 INFO org.apache.hadoop.yarn.client.RMProxy                       [] - Connecting to ResourceManager at node3/192.168.193.143:8032 2024-06-14 11:23:38,617 INFO org.apache.flink.yarn.YarnClusterDescriptor                 [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2024-06-14 11:23:38,649 INFO org.apache.flink.yarn.YarnClusterDescriptor                 [] - Found Web Interface node4:38788 of application application_1718331886020_0002. select * from t1; [INFO] Result retrieval cancelled. ​ Flink SQL ​ upsert-kafka表 connector upsert-kafka 如果当前表存在更新操作那么普通的kafka连接器将无法满足此时可以使用Upsert Kafka连接器。 创建upsert-kafka的映射表(必须定义主键) CREATE TABLE t2( id int , sumVC int ,primary key (id) NOT ENFORCED ) WITH (connector upsert-kafka,properties.bootstrap.servers node2:9092,topic ws2,key.format json,value.format json ); 如果没有kafka名为ws2的topic将自动被创建。 插入upsert-kafka表 insert into t2 select id,sum(vc) sumVC from source group by id; 查询upsert-kafka表 upsert-kafka 无法从指定的偏移量读取只会从主题的源读取。如此才知道整个数据的更新过程。并且通过 -UUI 等符号来显示数据的变化过程。 设置显示模式 SET sql-client.execution.result-modetableau; 查询t2表数据 select * from t2; 如果发现没有输出数据原因是之前的source表已经生成到end1000000就不再生成数据了。 进入Flink Web UIcancel掉所有running job重新操作成功如下 删除表 Flink SQL show tables; ------------ | table name | ------------ |     source | |         t1 | |         t2 | ------------ 3 rows in set ​ Flink SQL drop table source; Flink SQL drop table t1; Flink SQL drop table t2; 创建表 CREATE TABLE source ( id INT, ts BIGINT, vc INT ) WITH ( connector datagen, rows-per-second1, fields.id.kindrandom, fields.id.min1, fields.id.max10, fields.ts.kindsequence, fields.ts.start1, fields.ts.end1000000, fields.vc.kindrandom, fields.vc.min1, fields.vc.max100 ); CREATE TABLE t2( id int , sumVC int ,primary key (id) NOT ENFORCED ) WITH (connector upsert-kafka,properties.bootstrap.servers node2:9092,topic ws2,key.format json,value.format json ); 设置显示模式 SET sql-client.execution.result-modetableau; 查询表 select * from t2; 完成enjoy it!
http://www.pierceye.com/news/681498/

相关文章:

  • 专业网站建设公创建app软件
  • 佛山哪家网站建设比较好互联网内容服务商有哪些
  • 商家在携程旅游网站怎样做宣传ppt免费下载素材库
  • 下载建设银行官方网站下载网站模块介绍
  • 网站定制开发公司推荐网站续费问题
  • 专注七星彩网站开发出租云服务器上建网站
  • 天津做网站.都找津坤科技中国菲律宾历史战绩
  • 网站建设合同的效力网站建设公司需要交税么
  • 色弱可以做网站开发吗建网站的步骤及方法
  • 卖衣服的网站排名discuz分类信息模板
  • 广西网站开发公司招聘网页制作软件
  • 网站框架一般用什么做dede搭建网站教程
  • 郑州网站开发比较好的网络公司网络推广公司有多少家
  • 银川做网站服务免费word模板
  • 个人网站备案方法网站开发制作云盘
  • 培训做网站传奇手游发布网站
  • 中国保密在线网站培训系统手机购物平台
  • 厦门网站制作建设沐风 wordpress 主题
  • 网站选择城市怎么做北京招聘信息
  • 一个做搞笑类视频的网站取名手机端企业网站源码
  • 房地产网站模板网站建设论文范文
  • 任丘网站制作公司pc网站制作公司
  • 惠州城乡规划建设局网站工程公司经营范围
  • 淮南服装网站建设地址巴彦淖尔网站建设公司
  • 如何让自己的网站被百度收录wordpress 悬浮网易云
  • 天津展示型网站建设外包腾讯云wordpress镜像
  • python做网站点登入没反映wordpress母公司
  • 中国建设培训网站查询系统地产项目网站建设ppt
  • 温州高端网站建设网站开发实验心得
  • 设计参考网站有哪些陕西省西安市事业单位招聘网