当前位置: 首页 > news >正文

网站开发教程免费wordpress禁用客户端登录

网站开发教程免费,wordpress禁用客户端登录,营销型网站怎么建设,wordpress 评论调用使用Flink实现MySQL到Kafka的数据流转换 本篇博客将介绍如何使用Flink将数据从MySQL数据库实时传输到Kafka#xff0c;这是一个常见的用例#xff0c;适用于需要实时数据connector的场景。 环境准备 在开始之前#xff0c;确保你的环境中已经安装了以下软件#xff1a;…使用Flink实现MySQL到Kafka的数据流转换 本篇博客将介绍如何使用Flink将数据从MySQL数据库实时传输到Kafka这是一个常见的用例适用于需要实时数据connector的场景。 环境准备 在开始之前确保你的环境中已经安装了以下软件 Apache Flink 准备相关pom依赖 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdorg.example/groupIdartifactIdEastMoney/artifactIdversion1.0-SNAPSHOT/versionpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetproject.build.sourceEncodingUTF-8/project.build.sourceEncoding/propertiesdependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients_2.11/artifactIdversion1.14.0/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-scala-bridge_2.11/artifactIdversion1.14.0/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner_2.11/artifactIdversion1.14.0/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-scala_2.11/artifactIdversion1.14.0/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc_2.11/artifactIdversion1.14.0/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-csv/artifactIdversion1.14.0/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka_2.11/artifactIdversion1.14.0/version/dependencydependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion8.0.25/version/dependency/dependencies/projectMySQL数据库初始化mysql表 CREATE TABLE t_stock_code_price (id bigint NOT NULL AUTO_INCREMENT,code varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 股票代码,name varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 股票名称,close double DEFAULT NULL COMMENT 最新价,change_percent double DEFAULT NULL COMMENT 涨跌幅,change double DEFAULT NULL COMMENT 涨跌额,volume double DEFAULT NULL COMMENT 成交量手,amount double DEFAULT NULL COMMENT 成交额,amplitude double DEFAULT NULL COMMENT 振幅,turnover_rate double DEFAULT NULL COMMENT 换手率,peration double DEFAULT NULL COMMENT 市盈率,volume_rate double DEFAULT NULL COMMENT 量比,hign double DEFAULT NULL COMMENT 最高,low double DEFAULT NULL COMMENT 最低,open double DEFAULT NULL COMMENT 今开,previous_close double DEFAULT NULL COMMENT 昨收,pb double DEFAULT NULL COMMENT 市净率,create_time varchar(64) NOT NULL COMMENT 写入时间,PRIMARY KEY (id) ) ENGINEInnoDB AUTO_INCREMENT5605 DEFAULT CHARSETutf8mb4 COLLATEutf8mb4_0900_ai_ciKafka消息队列 1. 启动zookeeperzkServer start 2. 启动kafka服务kafka-server-start /opt/homebrew/etc/kafka/server.properties 3. 创建topickafka-topics --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic east_money 4. 消费数据kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic east_money --from-beginning步骤解释 获取流执行环境首先我们通过StreamExecutionEnvironment.getExecutionEnvironment获取Flink的流执行环境并设置其运行模式为流处理模式。 创建流表环境接着我们通过StreamTableEnvironment.create创建一个流表环境这个环境允许我们使用SQL语句来操作数据流。 val senv StreamExecutionEnvironment.getExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv StreamTableEnvironment.create(senv)定义MySQL数据源表我们使用一个SQL语句创建了一个临时表t_stock_code_price这个表代表了我们要从MySQL读取的数据结构和连接信息。 val source_table |CREATE TEMPORARY TABLE t_stock_code_price (| id BIGINT NOT NULL,| code STRING NOT NULL,| name STRING NOT NULL,| close DOUBLE,| change_percent DOUBLE,| change DOUBLE,| volume DOUBLE,| amount DOUBLE,| amplitude DOUBLE,| turnover_rate DOUBLE,| peration DOUBLE,| volume_rate DOUBLE,| hign DOUBLE,| low DOUBLE,| open DOUBLE,| previous_close DOUBLE,| pb DOUBLE,| create_time STRING NOT NULL,| PRIMARY KEY (id) NOT ENFORCED|) WITH (| connector jdbc,| url jdbc:mysql://localhost:3306/mydb,| driver com.mysql.cj.jdbc.Driver,| table-name t_stock_code_price,| username root,| password 12345678|)|.stripMargintEnv.executeSql(source_table)定义Kafka目标表然后我们定义了一个Kafka表re_stock_code_price_kafka指定了Kafka的连接参数和表结构。 tEnv.executeSql(CREATE TABLE re_stock_code_price_kafka ( id BIGINT, code STRING, name STRING, close DOUBLE, change_percent DOUBLE, change DOUBLE, volume DOUBLE, amount DOUBLE, amplitude DOUBLE, turnover_rate DOUBLE, operation DOUBLE, volume_rate DOUBLE, high DOUBLE, low DOUBLE, open DOUBLE, previous_close DOUBLE, pb DOUBLE, create_time STRING, rise int) WITH ( connector kafka, topic east_money, properties.bootstrap.servers 127.0.0.1:9092, properties.group.id mysql2kafka, scan.startup.mode earliest-offset, format csv, csv.field-delimiter , ))数据转换和写入最后我们执行了一个插入操作将从MySQL读取的数据转换这里通过case when语句添加了一个新字段rise并写入到Kafka中。这个可以实现任何的sql etl 来满足我们的需求。 tEnv.executeSql(insert into re_stock_code_price_kafka select *,case when change_percent0 then 1 else 0 end as rise from t_stock_code_price)全部代码 package org.eastimport org.apache.flink.api.common.RuntimeExecutionMode import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.bridge.scala.StreamTableEnvironmentobject Mysql2Kafka {def main(args: Array[String]): Unit {val senv StreamExecutionEnvironment.getExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv StreamTableEnvironment.create(senv)val source_table |CREATE TEMPORARY TABLE t_stock_code_price (| id BIGINT NOT NULL,| code STRING NOT NULL,| name STRING NOT NULL,| close DOUBLE,| change_percent DOUBLE,| change DOUBLE,| volume DOUBLE,| amount DOUBLE,| amplitude DOUBLE,| turnover_rate DOUBLE,| peration DOUBLE,| volume_rate DOUBLE,| hign DOUBLE,| low DOUBLE,| open DOUBLE,| previous_close DOUBLE,| pb DOUBLE,| create_time STRING NOT NULL,| PRIMARY KEY (id) NOT ENFORCED|) WITH (| connector jdbc,| url jdbc:mysql://localhost:3306/mydb,| driver com.mysql.cj.jdbc.Driver,| table-name t_stock_code_price,| username root,| password 12345678|)|.stripMargintEnv.executeSql(source_table)val result tEnv.executeSql(select * from t_stock_code_price)result.print()tEnv.executeSql(CREATE TABLE re_stock_code_price_kafka ( id BIGINT, code STRING, name STRING, close DOUBLE, change_percent DOUBLE, change DOUBLE, volume DOUBLE, amount DOUBLE, amplitude DOUBLE, turnover_rate DOUBLE, operation DOUBLE, volume_rate DOUBLE, high DOUBLE, low DOUBLE, open DOUBLE, previous_close DOUBLE, pb DOUBLE, create_time STRING, rise int) WITH ( connector kafka, topic east_money, properties.bootstrap.servers 127.0.0.1:9092, properties.group.id mysql2kafka, scan.startup.mode earliest-offset, format csv, csv.field-delimiter , ))tEnv.executeSql(insert into re_stock_code_price_kafka select *,case when change_percent0 then 1 else 0 end as rise from t_stock_code_price)} }如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业学生毕设等。不限于pythonjava大数据模型训练等。
http://www.pierceye.com/news/813555/

相关文章:

  • 南宁市网站建设公司给人做网站网站
  • 怎么查网站的备案号网站页面设计需求
  • 手机网站怎么做的好网页设计参考书籍
  • 网站建设和网络推广方案网站建设技术可行性分析
  • 免费建站网站自助建站的网站建站淘宝网站开始怎么做的
  • 旅游电网站建设目标公司注册成本
  • 建设婚恋网站基本功能有哪些毕业设计网页
  • 广州贸易网站杭州关键词推广优化方案
  • 怎么注册自己的网站wordpress静态设置
  • 网站收录有什么好处仿糗事百科wordpress
  • 面试网站建设工程师鞍山市城市建设管理局网站
  • 电商网站建设与管理柳州网站建设多少钱
  • 网站的访问量统计怎么做企业网站建设基本标准
  • 网站开发服务属于什么行业先做网站再备案吗
  • 做零售去哪个外贸网站专业做网站较好的公司
  • 网站运营职业分析2233网页游戏大全
  • 深圳网站制作880网站建设课设总结
  • 瑶海区网站建设公司中铁建设集团有限公司基础设施事业部
  • wordpress 用js网站备案 seo
  • 网站一级域名和二级域名区别自己怎么做外贸英文网站
  • 南京网站定制南京wordpress安装提示500错误
  • 网站图片优化免费网站建设凡科
  • 网站开发项目团队网页设计图片与文字的研究
  • 百度网站建设策划书范文做暧在线网站
  • 松江新城做网站公司国产成年做视频网站
  • 杭州网站推广服务网站单页面怎么做
  • 房地产网站建设案例wordpress 判断移动端
  • 网站开发过程文档网站代码需要注意什么问题
  • 怎么选一个适合自己的网站wordpress怎么修改后台登录地址
  • 网页制作与网站建设自考西安千秋网络科技有限公司