网站策划机构,n127网推广,织梦后台生成网站地图,做窗帘的厂家网站一、写在前面
在实际的生产环境中#xff0c;我们经常会把Flink处理的数据写入MySQL、Doris等数据库中#xff0c;下面以MySQL为例#xff0c;使用JDBC的方式将Flink的数据实时数据写入MySQL。
二、代码示例
2.1 版本说明 flink.version1.14.6/flink.version…一、写在前面
在实际的生产环境中我们经常会把Flink处理的数据写入MySQL、Doris等数据库中下面以MySQL为例使用JDBC的方式将Flink的数据实时数据写入MySQL。
二、代码示例
2.1 版本说明 flink.version1.14.6/flink.versionspark.version2.4.3/spark.versionhadoop.version2.8.5/hadoop.versionhbase.version1.4.9/hbase.versionhive.version2.3.5/hive.versionjava.version1.8/java.versionscala.version2.11.8/scala.versionmysql.version8.0.22/mysql.versionscala.binary.version2.11/scala.binary.version2.2 导入相关依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc_2.11/artifactIdversion${flink.version}/version
/dependency
!--mysql连接器依赖--
dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion8.0.22/version
/dependency2.3 连接数据库创建表
mysql CREATE TABLE ws ( id varchar(100) NOT NULL,ts bigint(20) DEFAULT NULL,vc int(11) DEFAULT NULL, PRIMARY KEY (id) ) ENGINEInnoDB DEFAULT CHARSETutf82.4 创建POJO类
package com.flink.POJOs;import java.util.Objects;/*** TODO POJO类的特点* 类是公有public的* 有一个无参的构造方法* 所有属性都是公有public的* 所有属性的类型都是可以序列化的*/
public class WaterSensor {//类的公共属性public String id;public Long ts;public Integer vc;//无参构造方法public WaterSensor() {//System.out.println(调用了无参数的构造方法);}public WaterSensor(String id, Long ts, Integer vc) {this.id id;this.ts ts;this.vc vc;}//生成get和set方法public void setId(String id) {this.id id;}public void setTs(Long ts) {this.ts ts;}public void setVc(Integer vc) {this.vc vc;}public String getId() {return id;}public Long getTs() {return ts;}public Integer getVc() {return vc;}//重写toString方法Overridepublic String toString() {return WaterSensor{ id id \ , ts ts , vc vc };}//重写equals和hasCode方法Overridepublic boolean equals(Object o) {if (this o) return true;if (o null || getClass() ! o.getClass()) return false;WaterSensor that (WaterSensor) o;return id.equals(that.id) ts.equals(that.ts) vc.equals(that.vc);}Overridepublic int hashCode() {return Objects.hash(id, ts, vc);}
}
//scala的case类2.5 自定义map函数
package com.flink.POJOs;import org.apache.flink.api.common.functions.MapFunction;public class WaterSensorMapFunction implements MapFunctionString, WaterSensor {Overridepublic WaterSensor map(String value) throws Exception {String[] datas value.split(,);return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}
}2.5 Flink2MySQL
package com.flink.DataStream.Sink;import com.flink.POJOs.WaterSensor;
import com.flink.POJOs.WaterSensorMapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.sql.PreparedStatement;
import java.sql.SQLException;/*** Flink 输出到 MySQLJDBC*/
public class flinkSinkJdbc {public static void main(String[] args) throws Exception {//TODO 创建Flink上下文执行环境StreamExecutionEnvironment streamExecutionEnvironment StreamExecutionEnvironment.getExecutionEnvironment();streamExecutionEnvironment.setParallelism(1);//TODO SourceDataStreamSourceString dataStreamSource streamExecutionEnvironment.socketTextStream(localhost, 8888);//TODO TransferSingleOutputStreamOperatorWaterSensor waterSensorSingleOutputStreamOperator dataStreamSource.map(new WaterSensorMapFunction());/**TODO 写入 mysql* 1、只能用老的 sink 写法* 2、JDBCSink 的 4 个参数:* 第一个参数 执行的 sql一般就是 insert into* 第二个参数 预编译 sql 对占位符填充值* 第三个参数 执行选项 ----攒批、重试* 第四个参数 连接选项----url、用户名、密码*/SinkFunctionWaterSensor sinkFunction JdbcSink.sink(insert into ws values(?,?,?),new JdbcStatementBuilderWaterSensor() {Overridepublic void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {preparedStatement.setString(1, waterSensor.getId());preparedStatement.setLong(2, waterSensor.getTs());preparedStatement.setInt(3, waterSensor.getVc());System.out.println(数据写入成功(waterSensor.getId(),waterSensor.getTs(),waterSensor.getVc()));}}, JdbcExecutionOptions.builder().withMaxRetries(3) // 重试次数.withBatchSize(100) // 批次的大小条数.withBatchIntervalMs(3000) // 批次的时间.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl(jdbc:mysql://localhost:3306/dw?serverTimezoneAsia/ShanghaiuseUnicodetruecharacterEncodingUTF-8).withUsername(root).withPassword(********).withConnectionCheckTimeoutSeconds(60) // 重试的超时时间.build());//TODO 写入到MysqlwaterSensorSingleOutputStreamOperator.addSink(sinkFunction);streamExecutionEnvironment.execute();}
}2.6 启动necat、Flink观察数据库写入情况
nc -lk 9999 #启动necat、并监听8888端口写入数据启动Flink程序 查看数据库写入是否正常