Using Flink to stream data from MySQL to Kafka
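This post shows how to use Flink to move data from a MySQL database into Kafka. It is a common use case and fits scenarios where data has to be connected from a database to a message queue in real time.
Environment setup
Before you start, make sure the following software is installed in your environment:
Apache Flink
Prepare the required pom dependencies: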
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>EastMoney</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.25</version>
        </dependency>
    </dependencies>
</project>
MySQL database
Initialize the MySQL table:
CREATE TABLE `t_stock_code_price` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'stock code',
  `name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'stock name',
  `close` double DEFAULT NULL COMMENT 'latest price',
  `change_percent` double DEFAULT NULL COMMENT 'price change (percent)',
  `change` double DEFAULT NULL COMMENT 'price change (amount)',
  `volume` double DEFAULT NULL COMMENT 'trading volume (lots)',
  `amount` double DEFAULT NULL COMMENT 'turnover amount',
  `amplitude` double DEFAULT NULL COMMENT 'amplitude',
  `turnover_rate` double DEFAULT NULL COMMENT 'turnover rate',
  `peration` double DEFAULT NULL COMMENT 'price-to-earnings ratio',
  `volume_rate` double DEFAULT NULL COMMENT 'volume ratio',
  `hign` double DEFAULT NULL COMMENT 'high',
  `low` double DEFAULT NULL COMMENT 'low',
  `open` double DEFAULT NULL COMMENT 'open',
  `previous_close` double DEFAULT NULL COMMENT 'previous close',
  `pb` double DEFAULT NULL COMMENT 'price-to-book ratio',
  `create_time` varchar(64) NOT NULL COMMENT 'write time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5605 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
Kafka message queue
1. Start ZooKeeper:
   zkServer start
2. Start the Kafka service:
   kafka-server-start /opt/homebrew/etc/kafka/server.properties
3. Create the topic:
   kafka-topics --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic east_money
4. Consume the data:
   kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic east_money --from-beginning
Step-by-step explanation
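Get the stream execution environment: first, we obtain Flink's stream execution environment through StreamExecutionEnvironment.getExecutionEnvironment and set its runtime mode to streaming.
Create the stream table environment: next, we create a stream table environment with StreamTableEnvironment.create, which lets us operate on data streams with SQL statements.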
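val senv = StreamExecutionEnvironment.getExecutionEnvironment
  .setRuntimeMode(RuntimeExecutionMode.STREAMING)
val tEnv = StreamTableEnvironment.create(senv)
Define the MySQL source table: we use a SQL statement to create a temporary table, t_stock_code_price, that describes the structure of the data to read from MySQL and how to connect to it. Note that the jdbc connector reads the table as a bounded scan, so the job copies the rows that exist when it runs.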
// Column names are back-quoted because some of them (for example close and open) are SQL keywords.
val source_table =
  """
    |CREATE TEMPORARY TABLE t_stock_code_price (
    |  `id` BIGINT NOT NULL,
    |  `code` STRING NOT NULL,
    |  `name` STRING NOT NULL,
    |  `close` DOUBLE,
    |  `change_percent` DOUBLE,
    |  `change` DOUBLE,
    |  `volume` DOUBLE,
    |  `amount` DOUBLE,
    |  `amplitude` DOUBLE,
    |  `turnover_rate` DOUBLE,
    |  `peration` DOUBLE,
    |  `volume_rate` DOUBLE,
    |  `hign` DOUBLE,
    |  `low` DOUBLE,
    |  `open` DOUBLE,
    |  `previous_close` DOUBLE,
    |  `pb` DOUBLE,
    |  `create_time` STRING NOT NULL,
    |  PRIMARY KEY (`id`) NOT ENFORCED
    |) WITH (
    |  'connector' = 'jdbc',
    |  'url' = 'jdbc:mysql://localhost:3306/mydb',
    |  'driver' = 'com.mysql.cj.jdbc.Driver',
    |  'table-name' = 't_stock_code_price',
    |  'username' = 'root',
    |  'password' = '12345678'
    |)
    |""".stripMargin
tEnv.executeSql(source_table)
Define the Kafka target table: then we define a Kafka table, re_stock_code_price_kafka, specifying the Kafka connection parameters and the table schema.
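tEnv.executeSql(
  """
    |CREATE TABLE re_stock_code_price_kafka (
    |  `id` BIGINT,
    |  `code` STRING,
    |  `name` STRING,
    |  `close` DOUBLE,
    |  `change_percent` DOUBLE,
    |  `change` DOUBLE,
    |  `volume` DOUBLE,
    |  `amount` DOUBLE,
    |  `amplitude` DOUBLE,
    |  `turnover_rate` DOUBLE,
    |  `operation` DOUBLE,
    |  `volume_rate` DOUBLE,
    |  `high` DOUBLE,
    |  `low` DOUBLE,
    |  `open` DOUBLE,
    |  `previous_close` DOUBLE,
    |  `pb` DOUBLE,
    |  `create_time` STRING,
    |  `rise` INT
    |) WITH (
    |  'connector' = 'kafka',
    |  'topic' = 'east_money',
    |  'properties.bootstrap.servers' = '127.0.0.1:9092',
    |  'properties.group.id' = 'mysql2kafka',
    |  'scan.startup.mode' = 'earliest-offset',
    |  'format' = 'csv',
    |  'csv.field-delimiter' = ','
    |)
    |""".stripMargin)
Transform and write the data: finally, we run an INSERT that transforms the rows read from MySQL (here a CASE WHEN expression adds a new field, rise) and writes them to Kafka. Any SQL ETL logic can go in this statement to meet your needs. The columns are matched by position, so the differently spelled names in the two tables (peration/operation, hign/high) do not matter.
// await() blocks until the job finishes, which keeps a local run (for example from an IDE) alive.
tEnv.executeSql(
  "insert into re_stock_code_price_kafka " +
    "select *, case when change_percent > 0 then 1 else 0 end as rise " +
    "from t_stock_code_price"
).await()
Full code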
package org.east

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Mysql2Kafka {
  def main(args: Array[String]): Unit = {
    // Stream execution environment in streaming mode, plus a table environment for SQL.
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
      .setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

    // MySQL source table. Column names are back-quoted because some of them
    // (for example close and open) are SQL keywords.
    val source_table =
      """
        |CREATE TEMPORARY TABLE t_stock_code_price (
        |  `id` BIGINT NOT NULL,
        |  `code` STRING NOT NULL,
        |  `name` STRING NOT NULL,
        |  `close` DOUBLE,
        |  `change_percent` DOUBLE,
        |  `change` DOUBLE,
        |  `volume` DOUBLE,
        |  `amount` DOUBLE,
        |  `amplitude` DOUBLE,
        |  `turnover_rate` DOUBLE,
        |  `peration` DOUBLE,
        |  `volume_rate` DOUBLE,
        |  `hign` DOUBLE,
        |  `low` DOUBLE,
        |  `open` DOUBLE,
        |  `previous_close` DOUBLE,
        |  `pb` DOUBLE,
        |  `create_time` STRING NOT NULL,
        |  PRIMARY KEY (`id`) NOT ENFORCED
        |) WITH (
        |  'connector' = 'jdbc',
        |  'url' = 'jdbc:mysql://localhost:3306/mydb',
        |  'driver' = 'com.mysql.cj.jdbc.Driver',
        |  'table-name' = 't_stock_code_price',
        |  'username' = 'root',
        |  'password' = '12345678'
        |)
        |""".stripMargin
    tEnv.executeSql(source_table)

    // Print the source rows to check that the MySQL connection works.
    val result = tEnv.executeSql("select * from t_stock_code_price")
    result.print()

    // Kafka target table: same columns as the source plus the derived rise flag.
    tEnv.executeSql(
      """
        |CREATE TABLE re_stock_code_price_kafka (
        |  `id` BIGINT,
        |  `code` STRING,
        |  `name` STRING,
        |  `close` DOUBLE,
        |  `change_percent` DOUBLE,
        |  `change` DOUBLE,
        |  `volume` DOUBLE,
        |  `amount` DOUBLE,
        |  `amplitude` DOUBLE,
        |  `turnover_rate` DOUBLE,
        |  `operation` DOUBLE,
        |  `volume_rate` DOUBLE,
        |  `high` DOUBLE,
        |  `low` DOUBLE,
        |  `open` DOUBLE,
        |  `previous_close` DOUBLE,
        |  `pb` DOUBLE,
        |  `create_time` STRING,
        |  `rise` INT
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'east_money',
        |  'properties.bootstrap.servers' = '127.0.0.1:9092',
        |  'properties.group.id' = 'mysql2kafka',
        |  'scan.startup.mode' = 'earliest-offset',
        |  'format' = 'csv',
        |  'csv.field-delimiter' = ','
        |)
        |""".stripMargin)

    // Add the rise flag and write to Kafka. await() blocks until the job
    // finishes, which keeps a local run (for example from an IDE) alive.
    tEnv.executeSql(
      "insert into re_stock_code_price_kafka " +
        "select *, case when change_percent > 0 then 1 else 0 end as rise " +
        "from t_stock_code_price"
    ).await()
  }
}
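To make the transformation step more concrete, here is a minimal plain-Scala sketch of what the CASE WHEN expression in the INSERT statement computes for each row. It does not use Flink. StockRow, RiseFlag, rise and toCsvLine are hypothetical names introduced only for this illustration, the row is trimmed to three of the table's columns, and the comma-joined line is a simplification (no quoting or escaping) rather than the exact output of Flink's csv format.

```scala
// Hypothetical, trimmed-down row: only three of the table's columns are modeled.
// changePercent is an Option because the change_percent column is nullable.
final case class StockRow(id: Long, code: String, changePercent: Option[Double])

object RiseFlag {
  // Mirrors: CASE WHEN change_percent > 0 THEN 1 ELSE 0 END
  // With a NULL change_percent the SQL comparison is not true,
  // so the ELSE branch applies and the flag is 0.
  def rise(changePercent: Option[Double]): Int =
    if (changePercent.exists(_ > 0)) 1 else 0

  // Joins the fields with commas and appends the rise flag as the last field.
  // Assumption: a missing change_percent is rendered as an empty field.
  def toCsvLine(row: StockRow): String =
    Seq(
      row.id.toString,
      row.code,
      row.changePercent.map(_.toString).getOrElse(""),
      rise(row.changePercent).toString
    ).mkString(",")

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      StockRow(1L, "600000", Some(1.5)),  // price went up
      StockRow(2L, "000001", Some(-0.3)), // price went down
      StockRow(3L, "300750", None)        // no change_percent recorded
    )
    rows.map(toCsvLine).foreach(println)
  }
}
```

The same shape works for other row-level ETL rules: each rule is a small function over one row, which is what the SQL expression in the SELECT list expresses declaratively.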