苏州 网站建设,页面永久升级,优普南通网站建设,优秀企业宣传ppt文章目录 今日内容1 Kafka1.1 消息中间件对比1.2 kafka介绍1.3 kafka安装及配置1.4 kafka案例1.4.1 导入kafka客户端1.4.2 编写生产者消费者1.4.3 启动测试1.4.4 多消费者启动 1.5 kafka分区机制1.5.1 topic剖析 1.6 kafka高可用设计1.7 kafka生产者详解1.7.1 同步发送1.7.2 异… 文章目录 今日内容1 Kafka1.1 消息中间件对比1.2 kafka介绍1.3 kafka安装及配置1.4 kafka案例1.4.1 导入kafka客户端1.4.2 编写生产者消费者1.4.3 启动测试1.4.4 多消费者启动 1.5 kafka分区机制1.5.1 topic剖析 1.6 kafka高可用设计1.7 kafka生产者详解1.7.1 同步发送1.7.2 异步发送1.7.3 参数详解1.7.3.1 ack1.7.3.2 retries1.7.3.3 消息压缩 1.8 kafka消费者详解1.8.1 消费者组1.8.2 消息有序性1.8.3 提交和偏移量1.8.3.1 同步提交1.8.3.2 异步提交1.8.3.3 同步异步混合提交 1.9 Spring集成kafka1.9.1 导入依赖1.9.2 创建配置文件1.9.3 创建生产者1.9.4 创建消费者1.9.5 启动类1.9.6 测试 1.10 kafka传递对象1.10.1 创建User1.10.2 添加User的发送和接收 2 自媒体文章上下架2.1 接口定义2.2 Controller2.3 Service2.4 通知Article修改文章配置2.4.1 导入kafka依赖2.4.2 在Nacos中配置kafka的生产者2.4.3 自媒体通知Article2.4.4 在Nacos中配置kafka的消费者2.4.5 配置ap_article_config表2.4.6 article端监听2.4.7 测试 今日内容 1 Kafka
1.1 消息中间件对比 1.2 kafka介绍 1.3 kafka安装及配置 Docker安装zookeeper
拉取镜像
docker pull zookeeper:3.4.14创建容器
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14Docker安装kafka
下载镜像
docker pull wurstmeister/kafka:2.12-2.3.1创建容器
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME192.168.204.129 \
--env KAFKA_ZOOKEEPER_CONNECT192.168.204.129:2181 \
--env KAFKA_ADVERTISED_LISTENERSPLAINTEXT://192.168.204.129:9092 \
--env KAFKA_LISTENERSPLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS-Xmx256M -Xms256M \
--nethost wurstmeister/kafka:2.12-2.3.1docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME192.168.204.129 \
--env KAFKA_ZOOKEEPER_CONNECT192.168.204.129:2181 \
--env KAFKA_ADVERTISED_LISTENERSPLAINTEXT://192.168.204.129:9092 \
--env KAFKA_LISTENERSPLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS-Xmx256M -Xms256M \
-p 9092:9092 wurstmeister/kafka:2.12-2.3.1-p 9092:9092做端口映射
1.4 kafka案例 1.4.1 导入kafka客户端
在heima-leadnews-test模块中创建kafka-demo的模块
dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId
/dependency1.4.2 编写生产者消费者
创建com.heima.kafka.sample包
下面两个类ConsumerQuickStart和ProducerQuickStart类
生产者
package com.heima.kafka.sample;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** 生产者*/
public class ProducerQuickStart {public static void main(String[] args) {//1.kafka的配置信息Properties properties new Properties();//kafka的连接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.204.129:9092);//发送失败失败的重试次数properties.put(ProducerConfig.RETRIES_CONFIG,5);//消息key的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);//消息value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);//2.生产者对象KafkaProducerString,String producer new KafkaProducerString, String(properties);/*** 第一个参数topic 第二个参数key 第三个参数value*///封装发送的消息ProducerRecordString,String record new ProducerRecordString, String(topic-first,key-001,hello kafka);//3.发送消息producer.send(record);//4.关闭消息通道必须关闭否则消息发送不成功producer.close();}}消费者
public class ConsumerQuickStart {public static void main(String[] args) {//1.添加kafka的配置信息Properties properties new Properties();//kafka的连接地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.204.129:9092);//消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, group1);//消息的反序列化器properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);//2.消费者对象KafkaConsumerString, String consumer new KafkaConsumerString, String(properties);//3.订阅主题consumer.subscribe(Collections.singletonList(topic-first));//当前线程一直处于监听状态while (true) {//4.获取消息ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());}}}}1.4.3 启动测试
消费者成功收到消息 1.4.4 多消费者启动
同一个组下只能有一个消费者的收到消息 如果想一对多则需要将消费者放在不同组中 1.5 kafka分区机制 1.5.1 topic剖析 ProducerRecordString,String record new ProducerRecordString, String(topic-first,key-001,0,hello kafka);在发送消息时可以指定分区partition
1.6 kafka高可用设计 1.7 kafka生产者详解
1.7.1 同步发送 /*** 第一个参数topic 第二个参数key 第三个参数value*/
//封装发送的消息
ProducerRecordString,String record new ProducerRecordString, String(topic-first,key-001,hello kafka);//3.发送消息
//producer.send(record);//3.1 同步发送消息
RecordMetadata recordMetadata producer.send(record).get();
System.out.println(同步发送消息结果topicrecordMetadata.topic(),partitionrecordMetadata.partition(),offsetrecordMetadata.offset());发送结果
同步发送消息结果topictopic-first,partition0,offset11.7.2 异步发送 //3.2 异步发送消息
producer.send(record, new Callback() {Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e!null){e.printStackTrace();}else{System.out.println(异步发送消息结果topicrecordMetadata.topic(),partitionrecordMetadata.partition(),offsetrecordMetadata.offset());}}
});发送结果
异步发送消息结果topictopic-first,partition0,offset21.7.3 参数详解
1.7.3.1 ack 1.7.3.2 retries 1.7.3.3 消息压缩 1.8 kafka消费者详解
1.8.1 消费者组 1.8.2 消息有序性 1.8.3 提交和偏移量 手动提交
//手动提交偏移量
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);1.8.3.1 同步提交
把enable.auto.commit设置为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量commitSync()将会提交poll返回的最新的偏移量所以在处理完所有记录后要确保调用了commitSync()方法。否则还是会有消息丢失的风险。
只要没有发生不可恢复的错误commitSync()方法会一直尝试直至提交成功如果提交失败也可以记录到错误日志里。 while (true){ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String record : records) {System.out.println(record.value());System.out.println(record.key());try {consumer.commitSync();//同步提交当前最新的偏移量}catch (CommitFailedException e){System.out.println(记录提交失败的异常e);}}
}1.8.3.2 异步提交
手动提交有一个缺点那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率但这个会增加消息重复的概率和自动提交一样。另外一个解决办法是使用异步提交的API commitAsync()。 while (true){ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String record : records) {System.out.println(record.value());System.out.println(record.key());}consumer.commitAsync(new OffsetCommitCallback() {Overridepublic void onComplete(MapTopicPartition, OffsetAndMetadata map, Exception e) {if(e!null){System.out.println(记录错误的提交偏移量 map,异常信息e);}}});
}1.8.3.3 同步异步混合提交
异步提交也有个缺点那就是如果服务器返回提交失败异步提交不会进行重试。
相比较起来同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为如果同时存在多个异步提交进行重试可能会导致位移覆盖。
举个例子假如我们发起了一个异步提交commitA此时的提交位移为2000随后又发起了一个异步提交commitB且位移为3000commitA提交失败但commitB提交成功此时commitA进行重试并成功的话会将实际上将已经提交的位移从3000回滚到2000导致消息重复消费。 try {while (true){ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String record : records) {System.out.println(record.value());System.out.println(record.key());}consumer.commitAsync();}
}catch (Exception e){e.printStackTrace();System.out.println(记录错误信息e);
}finally {try {consumer.commitSync();}finally {consumer.close();}
}1.9 Spring集成kafka
1.9.1 导入依赖
在kafka-demo中导入依赖
dependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependency!-- kafkfa --dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactIdexclusionsexclusiongroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId/exclusion/exclusions/dependencydependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactId/dependency
/dependencies1.9.2 创建配置文件
在resources下创建文件application.yml
server:port: 9991
spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.204.129:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer1.9.3 创建生产者
创建com.heima.kafka.controller.HelloController类负责发送消息
RestController
public class HelloController {Autowiredprivate KafkaTemplateString, String kafkaTemplate;GetMapping(/hello)public String hello() {kafkaTemplate.send(itcast-topic, hello kafka);return success;}
}1.9.4 创建消费者
建com.heima.kafka.listener.HelloListener类负责监听消息
Component
public class HelloListener {KafkaListener(topics itcast-topic)public void listen(String message) {if(!StringUtils.isEmpty(message)) {System.out.println(message message);}}
}1.9.5 启动类
SpringBootApplication
public class KafkaAppication {public static void main(String[] args) {SpringApplication.run(KafkaAppication.class, args);}
}1.9.6 测试
打开localhost:9991/hello 已经接收到消息
1.10 kafka传递对象 1.10.1 创建User
创建com.heima.kafka.pojo.User
Data
public class User {private String username;private Integer age;
}1.10.2 添加User的发送和接收
使用fastjson进行转换
Controller
GetMapping(/user)
public String user() {User user new User();user.setUsername(zhangsan);user.setAge(20);kafkaTemplate.send(user-topic, JSON.toJSONString(user));return success;
}Listener
KafkaListener(topics user-topic)
public void listenUser(String message) {if(!StringUtils.isEmpty(message)) {User user JSON.parseObject(message, User.class);System.out.println(user);}
}2 自媒体文章上下架 2.1 接口定义 2.2 Controller
PostMapping(/down_or_up)
public ResponseResult downOrUp(RequestBody WmNewsDto wmNewsDto){return wmNewsService.downOrUp(wmNewsDto);
}2.3 Service
接口
ResponseResult downOrUp(WmNewsDto wmNewsDto);实现
Override
public ResponseResult downOrUp(WmNewsDto wmNewsDto) {// 1.参数检查if(wmNewsDto.getId()null){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,文章id不能为空);}// 2.查询文章WmNews wmNews getById(wmNewsDto.getId());if(wmNews null){return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,文章不存在);}// 3.修改文章状态if(!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,只有已发布的文章才能上下架);}if(wmNewsDto.getEnable()null){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,enable不能为空);}wmNews.setEnable(wmNewsDto.getEnable());updateById(wmNews);// 4.返回结果return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}2.4 通知Article修改文章配置
2.4.1 导入kafka依赖
在heima-leadnews-common模块下导入kafka依赖
!-- kafkfa --
dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId
/dependency
dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId
/dependency2.4.2 在Nacos中配置kafka的生产者
在自媒体端的nacos配置中心配置kafka的生产者在heima-leadnews-wemedia下的配置文件中配置kafka
spring:kafka:bootstrap-servers: 192.168.204.129:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer2.4.3 自媒体通知Article
创建com.heima.common.constants.mNewsMessageConstants常量类保存kafka的topic.
public class WmNewsMessageConstants {public static final String WM_NEWS_UP_OR_DOWN_TOPICwm.news.up.or.down.topic;
}注入kafka
Autowired
private KafkaTemplateString,String kafkaTemplate;发送消息通知article端修改文章配置
//发送消息通知article端修改文章配置
if(wmNews.getArticleId() ! null){MapString,Object map new HashMap();map.put(articleId,wmNews.getArticleId());map.put(enable,dto.getEnable());kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));
}2.4.4 在Nacos中配置kafka的消费者
在article端的nacos配置中心配置kafka的消费者
spring:kafka:bootstrap-servers: 192.168.204.129:9092consumer:group-id: ${spring.application.name}key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer2.4.5 配置ap_article_config表
因为需要修改ap_article_config所以需要创建对应service和mapper 创捷Servicecom.heima.article.service.ApArticleConfigService接口
public interface ApArticleConfigService extends IServiceApArticleConfig {/*** 修改文章配置* param map*/public void updateByMap(Map map);
}实现
Service
Slf4j
Transactional
public class ApArticleConfigServiceImpl extends ServiceImplApArticleConfigMapper, ApArticleConfig implements ApArticleConfigService {/*** 修改文章配置* param map*/Overridepublic void updateByMap(Map map) {//0 下架 1 上架Object enable map.get(enable);boolean isDown true;if(enable.equals(1)){isDown false;}//修改文章配置update(Wrappers.ApArticleConfiglambdaUpdate().eq(ApArticleConfig::getArticleId,map.get(articleId)).set(ApArticleConfig::getIsDown,isDown));}
}2.4.6 article端监听
在article端编写监听接收数据
Component
Slf4j
public class ArtilceIsDownListener {Autowiredprivate ApArticleConfigService apArticleConfigService;KafkaListener(topics WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)public void onMessage(String message){if(StringUtils.isNotBlank(message)){Map map JSON.parseObject(message, Map.class);apArticleConfigService.updateByMap(map);log.info(article端文章配置修改articleId{},map.get(articleId));}}
}2.4.7 测试
启动相应启动类 打开自媒体管理界面准备下架这个新闻 下架该文件发现两张表都已经修改完美进行下架 说明我们kafka的消息传递已经成功。