当前位置: 首页 > news >正文

网站策划机构n127网推广

网站策划机构,n127网推广,织梦后台生成网站地图,做窗帘的厂家网站一、写在前面 在实际的生产环境中#xff0c;我们经常会把Flink处理的数据写入MySQL、Doris等数据库中#xff0c;下面以MySQL为例#xff0c;使用JDBC的方式将Flink的数据实时数据写入MySQL。 二、代码示例 2.1 版本说明 flink.version1.14.6/flink.version…一、写在前面 在实际的生产环境中我们经常会把Flink处理的数据写入MySQL、Doris等数据库中下面以MySQL为例使用JDBC的方式将Flink的数据实时数据写入MySQL。 二、代码示例 2.1 版本说明 flink.version1.14.6/flink.versionspark.version2.4.3/spark.versionhadoop.version2.8.5/hadoop.versionhbase.version1.4.9/hbase.versionhive.version2.3.5/hive.versionjava.version1.8/java.versionscala.version2.11.8/scala.versionmysql.version8.0.22/mysql.versionscala.binary.version2.11/scala.binary.version2.2 导入相关依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc_2.11/artifactIdversion${flink.version}/version /dependency !--mysql连接器依赖-- dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion8.0.22/version /dependency2.3 连接数据库创建表 mysql CREATE TABLE ws ( id varchar(100) NOT NULL,ts bigint(20) DEFAULT NULL,vc int(11) DEFAULT NULL, PRIMARY KEY (id) ) ENGINEInnoDB DEFAULT CHARSETutf82.4 创建POJO类 package com.flink.POJOs;import java.util.Objects;/*** TODO POJO类的特点* 类是公有public的* 有一个无参的构造方法* 所有属性都是公有public的* 所有属性的类型都是可以序列化的*/ public class WaterSensor {//类的公共属性public String id;public Long ts;public Integer vc;//无参构造方法public WaterSensor() {//System.out.println(调用了无参数的构造方法);}public WaterSensor(String id, Long ts, Integer vc) {this.id id;this.ts ts;this.vc vc;}//生成get和set方法public void setId(String id) {this.id id;}public void setTs(Long ts) {this.ts ts;}public void setVc(Integer vc) {this.vc vc;}public String getId() {return id;}public Long getTs() {return ts;}public Integer getVc() {return vc;}//重写toString方法Overridepublic String toString() {return WaterSensor{ id id \ , ts ts , vc vc };}//重写equals和hasCode方法Overridepublic boolean equals(Object o) {if (this o) return true;if (o null || getClass() ! o.getClass()) return false;WaterSensor that (WaterSensor) o;return id.equals(that.id) ts.equals(that.ts) vc.equals(that.vc);}Overridepublic int hashCode() {return Objects.hash(id, ts, vc);} } //scala的case类2.5 自定义map函数 package com.flink.POJOs;import org.apache.flink.api.common.functions.MapFunction;public class WaterSensorMapFunction implements MapFunctionString, WaterSensor {Overridepublic WaterSensor map(String value) throws Exception {String[] datas value.split(,);return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));} }2.5 Flink2MySQL package com.flink.DataStream.Sink;import com.flink.POJOs.WaterSensor; import com.flink.POJOs.WaterSensorMapFunction; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; import org.apache.flink.connector.jdbc.JdbcSink; import org.apache.flink.connector.jdbc.JdbcStatementBuilder; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.sql.PreparedStatement; import java.sql.SQLException;/*** Flink 输出到 MySQLJDBC*/ public class flinkSinkJdbc {public static void main(String[] args) throws Exception {//TODO 创建Flink上下文执行环境StreamExecutionEnvironment streamExecutionEnvironment StreamExecutionEnvironment.getExecutionEnvironment();streamExecutionEnvironment.setParallelism(1);//TODO SourceDataStreamSourceString dataStreamSource streamExecutionEnvironment.socketTextStream(localhost, 8888);//TODO TransferSingleOutputStreamOperatorWaterSensor waterSensorSingleOutputStreamOperator dataStreamSource.map(new WaterSensorMapFunction());/**TODO 写入 mysql* 1、只能用老的 sink 写法* 2、JDBCSink 的 4 个参数:* 第一个参数 执行的 sql一般就是 insert into* 第二个参数 预编译 sql 对占位符填充值* 第三个参数 执行选项 ----攒批、重试* 第四个参数 连接选项----url、用户名、密码*/SinkFunctionWaterSensor sinkFunction JdbcSink.sink(insert into ws values(?,?,?),new JdbcStatementBuilderWaterSensor() {Overridepublic void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {preparedStatement.setString(1, waterSensor.getId());preparedStatement.setLong(2, waterSensor.getTs());preparedStatement.setInt(3, waterSensor.getVc());System.out.println(数据写入成功(waterSensor.getId(),waterSensor.getTs(),waterSensor.getVc()));}}, JdbcExecutionOptions.builder().withMaxRetries(3) // 重试次数.withBatchSize(100) // 批次的大小条数.withBatchIntervalMs(3000) // 批次的时间.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl(jdbc:mysql://localhost:3306/dw?serverTimezoneAsia/ShanghaiuseUnicodetruecharacterEncodingUTF-8).withUsername(root).withPassword(********).withConnectionCheckTimeoutSeconds(60) // 重试的超时时间.build());//TODO 写入到MysqlwaterSensorSingleOutputStreamOperator.addSink(sinkFunction);streamExecutionEnvironment.execute();} }2.6 启动necat、Flink观察数据库写入情况 nc -lk 9999 #启动necat、并监听8888端口写入数据启动Flink程序 查看数据库写入是否正常
http://www.pierceye.com/news/670355/

相关文章:

  • 如何加快网站打开速度浦东新区建设机械网站
  • 求网站制作焦作网站建设的公司
  • 用python做网站不常见久久建筑网外墙岩棉保温板施工工艺
  • 做刷票的网站广告牌的样式大全
  • 手机登录网站怎么建设网站风格和功能设计方案
  • 网站报价天津网站在哪里建设
  • 湖北专业网站建设维修电话福清网站商城建设
  • 网站建设模块怎么使用线上注册公司流程和费用
  • 营销型网站设计内容wordpress加速优化插件
  • 设计坞网站官方下载4399网页游戏入口
  • 太原百度网站建设如何联系网站管理员
  • 海东高端网站建设公司视频网站 费用
  • 可以帮别人备案网站吗手机建网站公司
  • 四川建设厅网上查询网站信用网站系统建设方案
  • 克隆网站后台做系统用哪个网站好
  • html5 手机网站页面实例wordpress 路由404
  • 百度地图嵌入公司网站wordpress如何去掉分类里面的大字
  • 山东住房与城乡建设网站够完美网站建设
  • 班级网站建设首页报告如何查询一个网站是否备案
  • 艺术设计类网站石家庄公司的网站设计
  • 舞钢网站建设企业做网站需要什么软件
  • 网站开发上市公司专业的网站建设价格低
  • 备案网站有哪些资料公司名字大全四个字
  • 网站推广预期达到的目标建湖人才网手机版
  • 营销网站设计公司排名wordpress图片缓冲
  • 山西建设官方网站第三方网站流量统计
  • 企业网站用wordpress龙岗网站建设网站排名优化
  • 成都建设网站哪家好事件营销的特点
  • 如何利用模板做网站视频wordpress手机版边侧导航
  • 网站制作在哪里找wordpress 设置登陆界面