怎么学做电子商务网站,知果果网站谁做的,代做网站和说明书,免费企业建站本笔记内容为黑马头条项目的kafka及异步通知文章上下架部分
目录
一、kafka概述
二、kafka安装配置
三、kafka入门
四、kafka高可用设计
1、集群
2、备份机制(Replication#xff09;
五、kafka生产者详解
1、发送类型
2、参数详解
六、kafka消费者详解
1、消费者…本笔记内容为黑马头条项目的kafka及异步通知文章上下架部分
目录
一、kafka概述
二、kafka安装配置
三、kafka入门
四、kafka高可用设计
1、集群
2、备份机制(Replication
五、kafka生产者详解
1、发送类型
2、参数详解
六、kafka消费者详解
1、消费者组
2、消息有序性
3、提交和偏移量
七、springboot集成kafka
1、入门
2、传递消息为对象
八、自媒体文章上下架功能完成
1、需求分析
2、流程说明
3、接口定义
4、自媒体文章上下架-功能实现
5、消息通知article端文章上下架 一、kafka概述 消息中间件对比
特性ActiveMQRabbitMQRocketMQKafka开发语言javaerlangjavascala单机吞吐量万级万级10万级100万级时效性msusmsms级以内可用性高主从高主从非常高分布式非常高分布式功能特性成熟的产品、较全的文档、各种协议支持好并发能力强、性能好、延迟低MQ功能比较完善扩展性佳只支持主要的MQ功能主要应用于大数据领域
消息中间件对比-选择建议
消息中间件建议Kafka追求高吞吐量适合产生大量数据的互联网服务的数据收集业务RocketMQ可靠性要求很高的金融互联网领域,稳定性高经历了多次阿里双11考验RabbitMQ性能较好社区活跃度高数据量没有那么大优先选择功能比较完备的RabbitMQ
kafka介绍
Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。kafka官网Apache Kafka
kafka介绍-名词解释 producer发布消息的对象称之为主题生产者Kafka topic producer topicKafka将消息分门别类每一类的消息称之为一个主题Topic consumer订阅消息并处理发布的消息的对象称之为主题消费者consumers broker已发布的消息保存在一组服务器中称之为Kafka集群。集群中的每一个服务器都是一个代理Broker。 消费者可以订阅一个或多个主题topic并从Broker拉数据从而消费这些已发布的消息。
二、kafka安装配置 Kafka对于zookeeper是强依赖保存kafka相关的节点数据所以安装Kafka之前必须先安装zookeeper Docker安装zookeeper 下载镜像
docker pull zookeeper:3.4.14
创建容器
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14 Docker安装kafka 下载镜像
docker pull wurstmeister/kafka:2.12-2.3.1
创建容器
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERSPLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERSPLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS-Xmx256M -Xms256M \
--nethost wurstmeister/kafka:2.12-2.3.1
三、kafka入门 生产者发送消息多个消费者只能有一个消费者接收到消息 生产者发送消息多个消费者都可以接收到消息
1创建kafka-demo项目导入依赖
dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId
/dependency
2生产者发送消息
package com.heima.kafka.sample;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** 生产者*/
public class ProducerQuickStart {public static void main(String[] args) {//1.kafka的配置信息Properties properties new Properties();//kafka的连接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.200.130:9092);//发送失败失败的重试次数properties.put(ProducerConfig.RETRIES_CONFIG,5);//消息key的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);//消息value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);//2.生产者对象KafkaProducerString,String producer new KafkaProducerString, String(properties);//封装发送的消息ProducerRecordString,String record new ProducerRecordString, String(itheima-topic,100001,hello kafka);//3.发送消息producer.send(record);//4.关闭消息通道必须关闭否则消息发送不成功producer.close();}}
3消费者接收消息
package com.heima.kafka.sample;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*** 消费者*/
public class ConsumerQuickStart {public static void main(String[] args) {//1.添加kafka的配置信息Properties properties new Properties();//kafka的连接地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.200.130:9092);//消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, group2);//消息的反序列化器properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);//2.消费者对象KafkaConsumerString, String consumer new KafkaConsumerString, String(properties);//3.订阅主题consumer.subscribe(Collections.singletonList(itheima-topic));//当前线程一直处于监听状态while (true) {//4.获取消息ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());}}}}
总结 生产者发送消息多个消费者订阅同一个主题只能有一个消费者收到消息一对一 生产者发送消息多个消费者订阅同一个主题所有消费者都能收到消息一对多
四、kafka高可用设计 1、集群 Kafka 的服务器端由被称为 Broker 的服务进程构成即一个 Kafka 集群由多个 Broker 组成 这样如果集群中某一台机器宕机其他机器上的 Broker 也依然能够对外提供服务。这其实就是 Kafka 提供高可用的手段之一
2、备份机制(Replication Kafka 中消息的备份又叫做 副本Replica
Kafka 定义了两类副本 领导者副本Leader Replica 追随者副本Follower Replica
同步方式 ISRin-sync replica需要同步复制保存的follower 如果leader失效后需要选出新的leader选举的原则如下 第一选举时优先从ISR中选定因为这个列表中follower的数据是与leader同步的 第二如果ISR列表中的follower都不行了就只能从其他follower中选取 极端情况就是所有副本都失效了这时有两种方案 第一等待ISR中的一个活过来选为Leader数据可靠但活过来的时间不确定 第二选择第一个活过来的Replication不一定是ISR中的选为leader以最快速度恢复可用性但数据不一定完整 五、kafka生产者详解 1、发送类型 同步发送 使用send()方法发送它会返回一个Future对象调用get()方法进行等待就可以知道消息是否发送成功
RecordMetadata recordMetadata producer.send(kvProducerRecord).get();
System.out.println(recordMetadata.offset()); 异步发送 调用send()方法并指定一个回调函数服务器在返回响应时调用函数
//异步消息发送
producer.send(kvProducerRecord, new Callback() {Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e ! null){System.out.println(记录异常信息到日志表中);}System.out.println(recordMetadata.offset());}
});
2、参数详解 ack 代码的配置方式
//ack配置 消息确认机制
prop.put(ProducerConfig.ACKS_CONFIG,all);
参数的选择说明
确认机制说明acks0生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险但是速度最快acks1默认值只要集群首领节点收到消息生产者就会收到一个来自服务器的成功响应acksall只有当所有参与赋值的节点全部收到消息时生产者才会收到一个来自服务器的成功响应 retries 生产者从服务器收到的错误有可能是临时性错误在这种情况下retries参数的值决定了生产者可以重发消息的次数如果达到这个次数生产者会放弃重试返回错误默认情况下生产者会在每次重试之间等待100ms
代码中配置方式
//重试次数
prop.put(ProducerConfig.RETRIES_CONFIG,10); 消息压缩 默认情况下 消息发送时不会被压缩。
代码中配置方式
//数据压缩
prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,lz4);
压缩算法说明snappy占用较少的 CPU 却能提供较好的性能和相当可观的压缩比 如果看重性能和网络带宽建议采用lz4占用较少的 CPU 压缩和解压缩速度较快压缩比也很客观gzip占用较多的 CPU但会提供更高的压缩比网络带宽有限可以使用这种算法
使用压缩可以降低网络传输开销和存储开销而这往往是向 Kafka 发送消息的瓶颈所在。
六、kafka消费者详解 1、消费者组 消费者组Consumer Group 指的就是由一个或多个消费者组成的群体 一个发布在Topic上消息被分发给此消费者组中的一个消费者 所有的消费者都在一个组中那么这就变成了queue模型 所有的消费者都在不同的组中那么就完全变成了发布-订阅模型
2、消息有序性
应用场景 即时消息中的单对单聊天和群聊保证发送方消息发送顺序与接收方的顺序一致 充值转账两个渠道在同一个时间进行余额变更短信通知必须要有顺序
topic分区中消息只能由消费者组中的唯一一个消费者处理所以消息肯定是按照先后顺序进行处理的。但是它也仅仅是保证Topic的一个分区顺序处理不能保证跨分区的消息先后处理顺序。 所以如果你想要顺序的处理Topic的所有消息那就只提供一个分区。
3、提交和偏移量
kafka不会像其他JMS队列那样需要得到消费者的确认消费者可以使用kafka来追踪消息在分区的位置偏移量
消费者会往一个叫做_consumer_offset的特殊主题发送消息消息里包含了每个分区的偏移量。如果消费者发生崩溃或有新的消费者加入群组就会触发再均衡 正常的情况 如果消费者2挂掉以后会发生再均衡消费者2负责的分区会被其他消费者进行消费
再均衡后不可避免会出现一些问题
问题一 如果提交偏移量小于客户端处理的最后一个消息的偏移量那么处于两个偏移量之间的消息就会被重复处理。
问题二 如果提交的偏移量大于客户端的最后一个消息的偏移量那么处于两个偏移量之间的消息将会丢失。 如果想要解决这些问题还要知道目前kafka提交偏移量的方式 提交偏移量的方式有两种分别是自动提交偏移量和手动提交
自动提交偏移量
当enable.auto.commit被设置为true提交方式就是让消费者自动提交偏移量每隔5秒消费者会自动把从poll()方法接收的最大偏移量提交上去
手动提交 当enable.auto.commit被设置为false可以有以下三种提交方式 提交当前偏移量同步提交 异步提交 同步和异步组合提交
1.提交当前偏移量同步提交
把enable.auto.commit设置为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量commitSync()将会提交poll返回的最新的偏移量所以在处理完所有记录后要确保调用了commitSync()方法。否则还是会有消息丢失的风险。
只要没有发生不可恢复的错误commitSync()方法会一直尝试直至提交成功如果提交失败也可以记录到错误日志里。
while (true){ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String record : records) {System.out.println(record.value());System.out.println(record.key());try {consumer.commitSync();//同步提交当前最新的偏移量}catch (CommitFailedException e){System.out.println(记录提交失败的异常e);}}
}
2.异步提交
手动提交有一个缺点那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率但这个会增加消息重复的概率和自动提交一样。另外一个解决办法是使用异步提交的API。
while (true){ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String record : records) {System.out.println(record.value());System.out.println(record.key());}consumer.commitAsync(new OffsetCommitCallback() {Overridepublic void onComplete(MapTopicPartition, OffsetAndMetadata map, Exception e) {if(e!null){System.out.println(记录错误的提交偏移量 map,异常信息e);}}});
}
3.同步和异步组合提交
异步提交也有个缺点那就是如果服务器返回提交失败异步提交不会进行重试。相比较起来同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为如果同时存在多个异步提交进行重试可能会导致位移覆盖。
举个例子假如我们发起了一个异步提交commitA此时的提交位移为2000随后又发起了一个异步提交commitB且位移为3000commitA提交失败但commitB提交成功此时commitA进行重试并成功的话会将实际上将已经提交的位移从3000回滚到2000导致消息重复消费。
try {while (true){ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String record : records) {System.out.println(record.value());System.out.println(record.key());}consumer.commitAsync();}
}catch (Exception e){e.printStackTrace();System.out.println(记录错误信息e);
}finally {try {consumer.commitSync();}finally {consumer.close();}
}
七、springboot集成kafka 1、入门
1.导入spring-kafka依赖信息
dependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependency!-- kafkfa --dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactIdexclusionsexclusiongroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId/exclusion/exclusions/dependencydependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactId/dependency
/dependencies
2.在resources下创建文件application.yml
server:port: 9991
spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3.消息生产者
package com.heima.kafka.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;RestController
public class HelloController {Autowiredprivate KafkaTemplateString,String kafkaTemplate;GetMapping(/hello)public String hello(){kafkaTemplate.send(itcast-topic,黑马程序员);return ok;}
}
4.消息消费者
package com.heima.kafka.listener;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;Component
public class HelloListener {KafkaListener(topics itcast-topic)public void onMessage(String message){if(!StringUtils.isEmpty(message)){System.out.println(message);}}
}
2、传递消息为对象
目前springboot整合后的kafka因为序列化器是StringSerializer这个时候如果需要传递对象可以有两种方式
方式一可以自定义序列化器对象类型众多这种方式通用性不强本章节不介绍
方式二可以把要传递的对象进行转json字符串接收消息后再转为对象即可本项目采用这种方式 发送消息 GetMapping(/hello)
public String hello(){User user new User();user.setUsername(xiaowang);user.setAge(18);kafkaTemplate.send(user-topic, JSON.toJSONString(user));return ok;
} 接收消息 package com.heima.kafka.listener;import com.alibaba.fastjson.JSON;
import com.heima.kafka.pojo.User;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;Component
public class HelloListener {KafkaListener(topics user-topic)public void onMessage(String message){if(!StringUtils.isEmpty(message)){User user JSON.parseObject(message, User.class);System.out.println(user);}}
}
八、自媒体文章上下架功能完成 1、需求分析 已发表且已上架的文章可以下架 已发表且已下架的文章可以上架
2、流程说明 3、接口定义
说明接口路径/api/v1/news/down_or_up请求方式POST参数DTO响应结果ResponseResult
DTO
Data
public class WmNewsDto {private Integer id;/*** 是否上架 0 下架 1 上架*/private Short enable;}
ResponseResult 4、自媒体文章上下架-功能实现
1.接口定义
在heima-leadnews-wemedia工程下的WmNewsController新增方法
PostMapping(/down_or_up)
public ResponseResult downOrUp(RequestBody WmNewsDto dto){return null;
}
在WmNewsDto中新增enable属性 完整的代码如下
package com.heima.model.wemedia.dtos;import lombok.Data;import java.util.Date;
import java.util.List;Data
public class WmNewsDto {private Integer id;/*** 标题*/private String title;/*** 频道id*/private Integer channelId;/*** 标签*/private String labels;/*** 发布时间*/private Date publishTime;/*** 文章内容*/private String content;/*** 文章封面类型 0 无图 1 单图 3 多图 -1 自动*/private Short type;/*** 提交时间*/private Date submitedTime; /*** 状态 提交为1 草稿为0*/private Short status;/*** 封面图片列表 多张图以逗号隔开*/private ListString images;/*** 上下架 0 下架 1 上架*/private Short enable;
}
2.业务层编写
在WmNewsService新增方法
/*** 文章的上下架* param dto* return*/
public ResponseResult downOrUp(WmNewsDto dto);
实现方法
/*** 文章的上下架* param dto* return*/
Override
public ResponseResult downOrUp(WmNewsDto dto) {//1.检查参数if(dto.getId() null){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);}//2.查询文章WmNews wmNews getById(dto.getId());if(wmNews null){return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,文章不存在);}//3.判断文章是否已发布if(!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,当前文章不是发布状态不能上下架);}//4.修改文章enableif(dto.getEnable() ! null dto.getEnable() -1 dto.getEnable() 2){update(Wrappers.WmNewslambdaUpdate().set(WmNews::getEnable,dto.getEnable()).eq(WmNews::getId,wmNews.getId()));}return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
3.控制器
PostMapping(/down_or_up)
public ResponseResult downOrUp(RequestBody WmNewsDto dto){return wmNewsService.downOrUp(dto);
}
测试
5、消息通知article端文章上下架
1.在heima-leadnews-common模块下导入kafka依赖
!-- kafkfa --
dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId
/dependency
dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId
/dependency
2.在自媒体端的nacos配置中心配置kafka的生产者
spring:kafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
3.在自媒体端文章上下架后发送消息
//发送消息通知article端修改文章配置
if(wmNews.getArticleId() ! null){MapString,Object map new HashMap();map.put(articleId,wmNews.getArticleId());map.put(enable,dto.getEnable());kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));
}
常量类
public class WmNewsMessageConstants {public static final String WM_NEWS_UP_OR_DOWN_TOPICwm.news.up.or.down.topic;
}
4.在article端的nacos配置中心配置kafka的消费者
spring:kafka:bootstrap-servers: 192.168.200.130:9092consumer:group-id: ${spring.application.name}key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
5.在article端编写监听接收数据
package com.heima.article.listener;import com.alibaba.fastjson.JSON;
import com.heima.article.service.ApArticleConfigService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.util.Map;Component
Slf4j
public class ArtilceIsDownListener {Autowiredprivate ApArticleConfigService apArticleConfigService;KafkaListener(topics WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)public void onMessage(String message){if(StringUtils.isNotBlank(message)){Map map JSON.parseObject(message, Map.class);apArticleConfigService.updateByMap(map);log.info(article端文章配置修改articleId{},map.get(articleId));}}
}
6.修改ap_article_config表的数据
新建ApArticleConfigService
package com.heima.article.service;import com.baomidou.mybatisplus.extension.service.IService;
import com.heima.model.article.pojos.ApArticleConfig;import java.util.Map;public interface ApArticleConfigService extends IServiceApArticleConfig {/*** 修改文章配置* param map*/public void updateByMap(Map map);
}
实现类
package com.heima.article.service.impl;import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.heima.article.mapper.ApArticleConfigMapper;
import com.heima.article.service.ApArticleConfigService;
import com.heima.model.article.pojos.ApArticleConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import java.util.Map;Service
Slf4j
Transactional
public class ApArticleConfigServiceImpl extends ServiceImplApArticleConfigMapper, ApArticleConfig implements ApArticleConfigService {/*** 修改文章配置* param map*/Overridepublic void updateByMap(Map map) {//0 下架 1 上架Object enable map.get(enable);boolean isDown true;if(enable.equals(1)){isDown false;}//修改文章配置update(Wrappers.ApArticleConfiglambdaUpdate().eq(ApArticleConfig::getArticleId,map.get(articleId)).set(ApArticleConfig::getIsDown,isDown));}
}
结束