重庆建设造价工程信息网站,关于中秋节网页设计实训报告,企业如何对自己的网站进行建设,dw网页制作成品12页7.1 顺序性场景
7.1.1 场景概述
假设我们要传输一批订单到另一个系统#xff0c;那么订单对应状态的演变是有顺序性要求的。
已下单 → 已支付 → 已确认
不允许错乱#xff01; 7.1.2 顺序级别
1#xff09;全局有序#xff1a;
串行化。每条经过kafka的消息必须严格…7.1 顺序性场景
7.1.1 场景概述
假设我们要传输一批订单到另一个系统那么订单对应状态的演变是有顺序性要求的。
已下单 → 已支付 → 已确认
不允许错乱 7.1.2 顺序级别
1全局有序
串行化。每条经过kafka的消息必须严格保障有序性。
这就要求kafka单通道每个groupid下单消费者
极大的影响性能现实业务下几乎没必要 2局部有序
业务局部有序。同一条订单有序即可不同订单可以并行处理。不同订单的顺序前后无所谓
充分利用kafka多分区的并发性只需要想办法让需要顺序的一批数据进同一分区即可。 7.1.3 实现方案
1发送端
指定key发送keyorder.id即可案例回顾4.2.3PartitionProducer
2发送中
给队列配置多分区保障并发性。
3读取端
单消费者显然不合理
吞吐量显然上不去kafka开多个分区还有何意义
所以开多个消费者指定分区消费理想状况下每个分区配一个。
但是这个吞吐量依然有限那如何处理呢
方案多线程
在每个消费者上再开多线程是个解决办法。但是要警惕顺序性被打破
参考下图thread处理后会将data变成 2-1-3 改进接收后分发二级内存队列
消费者取到消息后不做处理根据key二次分发到多个阻塞队列。
再开启多个线程每个队列分配一个线程处理。提升吞吐量 7.1.4 代码验证
1新建一个sort队列2个分区
2启动order项目
源码参考
SortedProducer顺序性发送端
SortedConsumer顺序性消费端 - 阻塞队列实现方便大家理解设计思路
SortedConsumer2顺序性消费端 - 线程池实现现实中推荐这种方式 3通过swagger请求 7.2 海量同步场景
假设大数据部门需要大屏来展示用户的打车订单情况需要把订单数据送入druid
这里不涉及顺序只要下单就传输但是对实时性和并发量要求较高 7.2.1 常规架构
在下单完成mysql后通过程序代码打印直接进入kafka
或者logback和kafka集成通过log输送
优点
更符合常规的思维。将数据送给想要的部门
缺点
耦合度高将kafka发送消息嵌入了订单下单的主业务形成代码入侵。
下单不关心也不应该关注送入kafka的情况一旦kafka不可用程序受影响
7.2.2 解耦合
借助canal监听订单表的数据变化不再影响主业务。 7.2.3 部署实现
1mysql部署注意需要打开binlog8.0 默认处于开启状态#启动mysql8
docker run --name mysql8 -v /opt/kafka/data/mysql8:/var/lib/mysql -p 3306:3306 -e TZAsia/Shanghai -e MYSQL_ROOT_PASSWORD123456 -d daocloud.io/mysql:8.0
连上mysql执行以下sql添加canal用户CREATE USER canal IDENTIFIED BY canal;
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO canal%;
FLUSH PRIVILEGES;
ALTER USER canal% IDENTIFIED WITH mysql_native_password BY canal;
创建订单表CREATE TABLE orders (id int unsigned NOT NULL AUTO_INCREMENT,name varchar(255) DEFAULT NULL,PRIMARY KEY (id)
);2canal部署#canal.properties
#附带资料里有放到服务器 /opt/kafka/data/canal/ 目录下
#修改servers为你的kafka的机器地址
canal.serverMode kafka
kafka.bootstrap.servers 192.168.10.30:10903,192.168.10.30:10904
#docker-compose.yml
#附带资料里有canal.yml随便找个目录重命名为docker-compose.yml
#修改mysql的链接信息的链接信息
#然后在当前目录下执行 docker-compose up -d
version: 2
services:canal:image: canal/canal-servercontainer_name: canalrestart: alwaysports:- 10908:11111environment:#mysql的链接信息canal.instance.master.address: 192.168.10.30:3306canal.instance.dbUsername: canalcanal.instance.dbPassword: canal#投放到kafka的哪个主题要提前准备好canal.mq.topic: canalvolumes:- /opt/kafka/data/canal/canal.properties:/home/admin/canal-server/conf/canal.properties3数据通道验证进入kafka容器用上面3.2.4里的命令行方式监听canal队列./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic canal在mysql上创建orders表增删数据试一下mysql insert into orders (name) values (张三);
Query OK, 1 row affected (0.03 sec)在kafka控制台可以看到同步的消息{data:[{id:1,name:张三}],database:canal,es:1611657853000,id:5,isDdl:false,mysqlType:{id:int unsigned,name:varchar(255)},old:null,pkNames:[id],sql:,sqlType:{id:4,name:12},table:orders,ts:1611657853802,type:INSERT}数据通道已打通还缺少的是druid作为消费端来接收消息4druid部署#druid.yml
#在附带资料里有
#随便找个目录执行
docker-compose -f druid.yml up -d5验证配置druid的数据源从kafka读取数据验证数据可以正确进入druid。 7.3 kafka监控
7.3.1 eagle简介
Kafka Eagle监控系统是一款用来监控Kafka集群的工具支持管理多个Kafka集群、管理Kafka主题包含查看、删除、创建等、消费者组合消费者实例监控、消息阻塞告警、Kafka集群健康状态查看等。 7.3.2 部署
推荐docker-compose启动
将配备的资料中 eagle.yml 拷贝到服务器任意目录
修改对应的ip地址为你服务器的地址 #注意ip地址192.168.10.30全部换成你自己服务器的version: 3
services:zookeeper:image: zookeeper:3.4.13kafka-1:container_name: kafka-1image: wurstmeister/kafka:2.12-2.2.2ports:- 10903:9092- 10913:10913environment:KAFKA_BROKER_ID: 1 HOST_IP: 192.168.10.30KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181#docker部署必须设置外部可访问ip和端口否则注册进zk的地址将不可达造成外部无法连接KAFKA_ADVERTISED_HOST_NAME: 192.168.10.30KAFKA_ADVERTISED_PORT: 10903 KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremotetrue -Dcom.sun.management.jmxremote.authenticatefalse -Dcom.sun.management.jmxremote.sslfalse -Djava.rmi.server.hostname192.168.10.30 -Dcom.sun.management.jmxremote.rmi.port10913JMX_PORT: 10913volumes:- /etc/localtime:/etc/localtimedepends_on:- zookeeper kafka-2:container_name: kafka-2image: wurstmeister/kafka:2.12-2.2.2ports:- 10904:9092- 10914:10914environment:KAFKA_BROKER_ID: 2 HOST_IP: 192.168.10.30KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_HOST_NAME: 192.168.10.30KAFKA_ADVERTISED_PORT: 10904 KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremotetrue -Dcom.sun.management.jmxremote.authenticatefalse -Dcom.sun.management.jmxremote.sslfalse -Djava.rmi.server.hostname192.168.10.30 -Dcom.sun.management.jmxremote.rmi.port10914JMX_PORT: 10914volumes:- /etc/localtime:/etc/localtimedepends_on:- zookeeper eagle:image: gui66497/kafka_eaglecontainer_name: kerestart: alwaysdepends_on:- kafka-1- kafka-2ports:- 10907:8048environment:ZKSERVER: zookeeper:2181执行 docker-compose -f eagle.yml up -d
7.3.3 使用说明
访问 http://192.168.10.30:10907/ke/
默认用户名密码 admin/ 123456
如果要删除topic等操作需要管理token keadmin 与km到底选哪个呢
界面美观程度和监控曲线优于km有登录权限控制功能操作上不如km简单直白但是km需要配置一定的连接信息