当前位置: 首页 > news >正文

网站灰色 代码安徽省建设厅官方网站进不去

网站灰色 代码,安徽省建设厅官方网站进不去,网站开发榜单规则,校园网站建设价格基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL 1.准备阶段1.1 准备教程所需要的组件1.2 下载 Flink 和所需要的依赖包1.3 准备数据1.3.1 在 MySQL 数据库中准备数据1.3.2 在 Postgres 数据库中准备数据 2.启动 Flink 集群和 Flink SQL CLI3.在 Flink SQL CLI 中使用… 基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL 1.准备阶段1.1 准备教程所需要的组件1.2 下载 Flink 和所需要的依赖包1.3 准备数据1.3.1 在 MySQL 数据库中准备数据1.3.2 在 Postgres 数据库中准备数据 2.启动 Flink 集群和 Flink SQL CLI3.在 Flink SQL CLI 中使用 Flink DDL 创建表4.关联订单数据并且将其写入 Elasticsearch 中5.环境清理 这篇教程将展示如何基于 Flink CDC 快速构建 MySQL 和 Postgres 的流式 ETL。本教程的演示都将在 Flink SQL CLI 中进行只涉及 SQL无需一行 Java / Scala 代码也无需安装 IDE。 假设我们正在经营电子商务业务商品和订单的数据存储在 MySQL 中订单对应的物流信息存储在 Postgres 中。 对于订单表为了方便进行分析我们希望让它关联上其对应的商品和物流信息构成一张宽表并且实时把它写到 ElasticSearch 中。 接下来的内容将介绍如何使用 Flink MySQL / Postgres CDC 来实现这个需求系统的整体架构如下图所示 1.准备阶段 准备一台已经安装了 Docker 的 Linux 或者 MacOS 电脑。 1.1 准备教程所需要的组件 接下来的教程将以 docker-compose 的方式准备所需要的组件。 使用下面的内容创建一个 docker-compose.yml 文件 version: 2.1 services:postgres:image: debezium/example-postgres:1.1ports:- 5432:5432environment:- POSTGRES_DBpostgres- POSTGRES_USERpostgres- POSTGRES_PASSWORDpostgresmysql:image: debezium/example-mysql:1.1ports:- 3306:3306environment:- MYSQL_ROOT_PASSWORD123456- MYSQL_USERmysqluser- MYSQL_PASSWORDmysqlpwelasticsearch:image: elastic/elasticsearch:7.6.0environment:- cluster.namedocker-cluster- bootstrap.memory_locktrue- ES_JAVA_OPTS-Xms512m -Xmx512m- discovery.typesingle-nodeports:- 9200:9200- 9300:9300ulimits:memlock:soft: -1hard: -1nofile:soft: 65536hard: 65536kibana:image: elastic/kibana:7.6.0ports:- 5601:5601该 Docker Compose 中包含的容器有 MySQL商品表 products 和 订单表 orders 将存储在该数据库中 这两张表将和 Postgres 数据库中的物流表 shipments 进行关联得到一张包含更多信息的订单表 enriched_orders。Postgres物流表 shipments 将存储在该数据库中。Elasticsearch最终的订单表 enriched_orders 将写到 Elasticsearch。Kibana用来可视化 ElasticSearch 的数据。 在 docker-compose.yml 所在目录下执行下面的命令来启动本教程需要的组件 docker-compose up -d该命令将以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。你可以通过 docker ps 来观察上述的容器是否正常启动了也可以通过访问 http://localhost:5601/ 来查看 Kibana 是否运行正常。 1.2 下载 Flink 和所需要的依赖包 下载 Flink 1.18.0 并将其解压至目录 flink-1.18.0。 下载以下列出的依赖包并将它们放到目录 flink-1.18.0/lib/ 下 flink-sql-connector-elasticsearch7-3.0.1-1.17.jarflink-sql-connector-mysql-cdc-2.5-SNAPSHOT.jarflink-sql-connector-postgres-cdc-2.5-SNAPSHOT.jar 注下载链接只对已发布的版本有效SNAPSHOT 版本需要本地基于 master 或 release 分支编译。 1.3 准备数据 1.3.1 在 MySQL 数据库中准备数据 进入 MySQL 容器 docker-compose exec mysql mysql -u root -p 123456创建数据库和表 products、orders并插入数据。 -- MySQL CREATE DATABASE mydb; USE mydb; CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512) ); ALTER TABLE products AUTO_INCREMENT 101;INSERT INTO products VALUES (default,scooter,Small 2-wheel scooter),(default,car battery,12V car battery),(default,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3),(default,hammer,12oz carpenters hammer),(default,hammer,14oz carpenters hammer),(default,hammer,16oz carpenters hammer),(default,rocks,box of assorted rocks),(default,jacket,water resistent black wind breaker),(default,spare tire,24 inch spare tire);CREATE TABLE orders (order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,order_date DATETIME NOT NULL,customer_name VARCHAR(255) NOT NULL,price DECIMAL(10, 5) NOT NULL,product_id INTEGER NOT NULL,order_status BOOLEAN NOT NULL -- Whether order has been placed ) AUTO_INCREMENT 10001;INSERT INTO orders VALUES (default, 2020-07-30 10:08:22, Jark, 50.50, 102, false),(default, 2020-07-30 10:11:09, Sally, 15.00, 105, false),(default, 2020-07-30 12:00:30, Edward, 25.25, 106, false);1.3.2 在 Postgres 数据库中准备数据 进入 Postgres 容器 docker-compose exec postgres psql -h localhost -U postgres创建表 shipments并插入数据。 -- PG CREATE TABLE shipments (shipment_id SERIAL NOT NULL PRIMARY KEY,order_id SERIAL NOT NULL,origin VARCHAR(255) NOT NULL,destination VARCHAR(255) NOT NULL,is_arrived BOOLEAN NOT NULL ); ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO shipments VALUES (default,10001,Beijing,Shanghai,false),(default,10002,Hangzhou,Shanghai,false),(default,10003,Shanghai,Hangzhou,false);2.启动 Flink 集群和 Flink SQL CLI 使用下面的命令跳转至 Flink 目录下 cd flink-1.18.0使用下面的命令启动 Flink 集群 ./bin/start-cluster.sh启动成功的话可以在 http://localhost:8081/ 访问到 Flink Web UI如下所示 使用下面的命令启动 Flink SQL CLI ./bin/sql-client.sh启动成功后可以看到如下的页面 3.在 Flink SQL CLI 中使用 Flink DDL 创建表 首先开启 checkpoint每隔 3 秒做一次 checkpoint -- Flink SQL Flink SQL SET execution.checkpointing.interval 3s;然后, 对于数据库中的表 productsordersshipments 使用 Flink SQL CLI 创建对应的表用于同步这些底层数据库表的数据。 -- Flink SQL Flink SQL CREATE TABLE products (id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED) WITH (connector mysql-cdc,hostname localhost,port 3306,username root,password 123456,database-name mydb,table-name products);Flink SQL CREATE TABLE orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,PRIMARY KEY (order_id) NOT ENFORCED) WITH (connector mysql-cdc,hostname localhost,port 3306,username root,password 123456,database-name mydb,table-name orders);Flink SQL CREATE TABLE shipments (shipment_id INT,order_id INT,origin STRING,destination STRING,is_arrived BOOLEAN,PRIMARY KEY (shipment_id) NOT ENFORCED) WITH (connector postgres-cdc,hostname localhost,port 5432,username postgres,password postgres,database-name postgres,schema-name public,table-name shipments,slot.name flink);最后创建 enriched_orders 表 用来将关联后的订单数据写入 Elasticsearch 中。 -- Flink SQL Flink SQL CREATE TABLE enriched_orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,product_name STRING,product_description STRING,shipment_id INT,origin STRING,destination STRING,is_arrived BOOLEAN,PRIMARY KEY (order_id) NOT ENFORCED) WITH (connector elasticsearch-7,hosts http://localhost:9200,index enriched_orders);4.关联订单数据并且将其写入 Elasticsearch 中 使用 Flink SQL 将订单表 order 与 商品表 products物流信息表 shipments 关联并将关联后的订单信息写入 Elasticsearch 中。 -- Flink SQL Flink SQL INSERT INTO enriched_ordersSELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrivedFROM orders AS oLEFT JOIN products AS p ON o.product_id p.idLEFT JOIN shipments AS s ON o.order_id s.order_id;现在就可以在 Kibana 中看到包含商品和物流信息的订单数据。 首先访问 http://localhost:5601/app/kibana#/management/kibana/index_pattern创建 index patternenriched_orders。 然后就可以在 http://localhost:5601/app/kibana#/discover 看到写入的数据了。 接下来修改 MySQL 和 Postgres 数据库中表的数据Kibana 中显示的订单数据也将实时更新 在 MySQL 的 orders 表中插入一条数据 --MySQL INSERT INTO orders VALUES (default, 2020-07-30 15:22:00, Jark, 29.71, 104, false);在 Postgres 的 shipment 表中插入一条数据 --PG INSERT INTO shipments VALUES (default,10004,Shanghai,Beijing,false);在 MySQL 的 orders 表中更新订单的状态 --MySQL UPDATE orders SET order_status true WHERE order_id 10004;在 Postgres 的 shipment 表中更新物流的状态 --PG UPDATE shipments SET is_arrived true WHERE shipment_id 1004;在 MySQL 的 orders 表中删除一条数据 --MySQL DELETE FROM orders WHERE order_id 10004;每执行一步就刷新一次 Kibana可以看到 Kibana 中显示的订单数据将实时更新如下所示 5.环境清理 本教程结束后在 docker-compose.yml 文件所在的目录下执行如下命令停止所有容器 docker-compose down在 Flink 所在目录 flink-1.18.0 下执行如下命令停止 Flink 集群 ./bin/stop-cluster.sh
http://www.pierceye.com/news/546915/

相关文章:

  • 如何申请cn域名做网站wordpress 企业网站主题
  • 网站建设 锋云科技公司东莞有什么比较好的网站公司
  • json取数据做网站做问卷哪个网站好
  • 做特产网站的原因手机网站建设技术
  • 唐山企业网站建设公司wordpress 插件 破解
  • 西安自助建站系统做360网站快速排名软件
  • 青岛响应式网站设计规划排版网站
  • 吉林省住房和建设厅网站免费的海报设计软件
  • 求创意设计分享的网站做国产免费视频网站
  • 易橙云做的网站怎么样做网站的设计理念
  • 费县住房和城乡建设局网站谷歌找网站后台
  • 青岛网站建设最便宜应用商城app下载
  • 陕西省建设部官方网站青岛网站制作案例
  • 珠海中国建设银行招聘信息网站刘金鹏做网站
  • 广州住房建设部网站php学校网站建设
  • 企业网站建设顾问网站美工做专题尺寸多少
  • 第一代网站建设技术网站建设前期规划方案范文
  • 网站建设基础心得蓝色的包装材料企业网站模板
  • thinkphp网站开发实战教程做厂房出租有那些推广网站
  • 怎么设自己的网站wordpress后台登陆很慢
  • 响水做网站网站方案书什么东西
  • 青岛seo网站排名优化wordpress页面伪静态
  • 汕尾东莞网站建设wordpress 反斜杠
  • 免费养殖网站模板jquery 的网站模板下载地址
  • 东莞市手机网站建设wordpress异步上传图片
  • 网站阵地建设管理郑州做网站多少钱
  • 自建站平台官方网站建设手机银行
  • 手机端的网站怎么做的苏州网站开发建设
  • wordpress 中型网站重庆seo网站管理
  • 网站有那些风格佛山小程序开发公司