当前位置: 首页 > news >正文

苏州 网站建设页面永久升级

苏州 网站建设,页面永久升级,优普南通网站建设,优秀企业宣传ppt文章目录 今日内容1 Kafka1.1 消息中间件对比1.2 kafka介绍1.3 kafka安装及配置1.4 kafka案例1.4.1 导入kafka客户端1.4.2 编写生产者消费者1.4.3 启动测试1.4.4 多消费者启动 1.5 kafka分区机制1.5.1 topic剖析 1.6 kafka高可用设计1.7 kafka生产者详解1.7.1 同步发送1.7.2 异… 文章目录 今日内容1 Kafka1.1 消息中间件对比1.2 kafka介绍1.3 kafka安装及配置1.4 kafka案例1.4.1 导入kafka客户端1.4.2 编写生产者消费者1.4.3 启动测试1.4.4 多消费者启动 1.5 kafka分区机制1.5.1 topic剖析 1.6 kafka高可用设计1.7 kafka生产者详解1.7.1 同步发送1.7.2 异步发送1.7.3 参数详解1.7.3.1 ack1.7.3.2 retries1.7.3.3 消息压缩 1.8 kafka消费者详解1.8.1 消费者组1.8.2 消息有序性1.8.3 提交和偏移量1.8.3.1 同步提交1.8.3.2 异步提交1.8.3.3 同步异步混合提交 1.9 Spring集成kafka1.9.1 导入依赖1.9.2 创建配置文件1.9.3 创建生产者1.9.4 创建消费者1.9.5 启动类1.9.6 测试 1.10 kafka传递对象1.10.1 创建User1.10.2 添加User的发送和接收 2 自媒体文章上下架2.1 接口定义2.2 Controller2.3 Service2.4 通知Article修改文章配置2.4.1 导入kafka依赖2.4.2 在Nacos中配置kafka的生产者2.4.3 自媒体通知Article2.4.4 在Nacos中配置kafka的消费者2.4.5 配置ap_article_config表2.4.6 article端监听2.4.7 测试 今日内容 1 Kafka 1.1 消息中间件对比 1.2 kafka介绍 1.3 kafka安装及配置 Docker安装zookeeper 拉取镜像 docker pull zookeeper:3.4.14创建容器 docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14Docker安装kafka 下载镜像 docker pull wurstmeister/kafka:2.12-2.3.1创建容器 docker run -d --name kafka \ --env KAFKA_ADVERTISED_HOST_NAME192.168.204.129 \ --env KAFKA_ZOOKEEPER_CONNECT192.168.204.129:2181 \ --env KAFKA_ADVERTISED_LISTENERSPLAINTEXT://192.168.204.129:9092 \ --env KAFKA_LISTENERSPLAINTEXT://0.0.0.0:9092 \ --env KAFKA_HEAP_OPTS-Xmx256M -Xms256M \ --nethost wurstmeister/kafka:2.12-2.3.1docker run -d --name kafka \ --env KAFKA_ADVERTISED_HOST_NAME192.168.204.129 \ --env KAFKA_ZOOKEEPER_CONNECT192.168.204.129:2181 \ --env KAFKA_ADVERTISED_LISTENERSPLAINTEXT://192.168.204.129:9092 \ --env KAFKA_LISTENERSPLAINTEXT://0.0.0.0:9092 \ --env KAFKA_HEAP_OPTS-Xmx256M -Xms256M \ -p 9092:9092 wurstmeister/kafka:2.12-2.3.1-p 9092:9092做端口映射 1.4 kafka案例 1.4.1 导入kafka客户端 在heima-leadnews-test模块中创建kafka-demo的模块 dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId /dependency1.4.2 编写生产者消费者 创建com.heima.kafka.sample包 下面两个类ConsumerQuickStart和ProducerQuickStart类 生产者 package com.heima.kafka.sample; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** 生产者*/ public class ProducerQuickStart {public static void main(String[] args) {//1.kafka的配置信息Properties properties new Properties();//kafka的连接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.204.129:9092);//发送失败失败的重试次数properties.put(ProducerConfig.RETRIES_CONFIG,5);//消息key的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);//消息value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);//2.生产者对象KafkaProducerString,String producer new KafkaProducerString, String(properties);/*** 第一个参数topic 第二个参数key 第三个参数value*///封装发送的消息ProducerRecordString,String record new ProducerRecordString, String(topic-first,key-001,hello kafka);//3.发送消息producer.send(record);//4.关闭消息通道必须关闭否则消息发送不成功producer.close();}}消费者 public class ConsumerQuickStart {public static void main(String[] args) {//1.添加kafka的配置信息Properties properties new Properties();//kafka的连接地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.204.129:9092);//消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, group1);//消息的反序列化器properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);//2.消费者对象KafkaConsumerString, String consumer new KafkaConsumerString, String(properties);//3.订阅主题consumer.subscribe(Collections.singletonList(topic-first));//当前线程一直处于监听状态while (true) {//4.获取消息ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());}}}}1.4.3 启动测试 消费者成功收到消息 1.4.4 多消费者启动 同一个组下只能有一个消费者的收到消息 如果想一对多则需要将消费者放在不同组中 1.5 kafka分区机制 1.5.1 topic剖析 ProducerRecordString,String record new ProducerRecordString, String(topic-first,key-001,0,hello kafka);在发送消息时可以指定分区partition 1.6 kafka高可用设计 1.7 kafka生产者详解 1.7.1 同步发送 /*** 第一个参数topic 第二个参数key 第三个参数value*/ //封装发送的消息 ProducerRecordString,String record new ProducerRecordString, String(topic-first,key-001,hello kafka);//3.发送消息 //producer.send(record);//3.1 同步发送消息 RecordMetadata recordMetadata producer.send(record).get(); System.out.println(同步发送消息结果topicrecordMetadata.topic(),partitionrecordMetadata.partition(),offsetrecordMetadata.offset());发送结果 同步发送消息结果topictopic-first,partition0,offset11.7.2 异步发送 //3.2 异步发送消息 producer.send(record, new Callback() {Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e!null){e.printStackTrace();}else{System.out.println(异步发送消息结果topicrecordMetadata.topic(),partitionrecordMetadata.partition(),offsetrecordMetadata.offset());}} });发送结果 异步发送消息结果topictopic-first,partition0,offset21.7.3 参数详解 1.7.3.1 ack 1.7.3.2 retries 1.7.3.3 消息压缩 1.8 kafka消费者详解 1.8.1 消费者组 1.8.2 消息有序性 1.8.3 提交和偏移量 手动提交 //手动提交偏移量 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);1.8.3.1 同步提交 把enable.auto.commit设置为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量commitSync()将会提交poll返回的最新的偏移量所以在处理完所有记录后要确保调用了commitSync()方法。否则还是会有消息丢失的风险。 只要没有发生不可恢复的错误commitSync()方法会一直尝试直至提交成功如果提交失败也可以记录到错误日志里。 while (true){ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String record : records) {System.out.println(record.value());System.out.println(record.key());try {consumer.commitSync();//同步提交当前最新的偏移量}catch (CommitFailedException e){System.out.println(记录提交失败的异常e);}} }1.8.3.2 异步提交 手动提交有一个缺点那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率但这个会增加消息重复的概率和自动提交一样。另外一个解决办法是使用异步提交的API commitAsync()。 while (true){ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String record : records) {System.out.println(record.value());System.out.println(record.key());}consumer.commitAsync(new OffsetCommitCallback() {Overridepublic void onComplete(MapTopicPartition, OffsetAndMetadata map, Exception e) {if(e!null){System.out.println(记录错误的提交偏移量 map,异常信息e);}}}); }1.8.3.3 同步异步混合提交 异步提交也有个缺点那就是如果服务器返回提交失败异步提交不会进行重试。 相比较起来同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为如果同时存在多个异步提交进行重试可能会导致位移覆盖。 举个例子假如我们发起了一个异步提交commitA此时的提交位移为2000随后又发起了一个异步提交commitB且位移为3000commitA提交失败但commitB提交成功此时commitA进行重试并成功的话会将实际上将已经提交的位移从3000回滚到2000导致消息重复消费。 try {while (true){ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String record : records) {System.out.println(record.value());System.out.println(record.key());}consumer.commitAsync();} }catch (Exception e){e.printStackTrace();System.out.println(记录错误信息e); }finally {try {consumer.commitSync();}finally {consumer.close();} }1.9 Spring集成kafka 1.9.1 导入依赖 在kafka-demo中导入依赖 dependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependency!-- kafkfa --dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactIdexclusionsexclusiongroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId/exclusion/exclusions/dependencydependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactId/dependency /dependencies1.9.2 创建配置文件 在resources下创建文件application.yml server:port: 9991 spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.204.129:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer1.9.3 创建生产者 创建com.heima.kafka.controller.HelloController类负责发送消息 RestController public class HelloController {Autowiredprivate KafkaTemplateString, String kafkaTemplate;GetMapping(/hello)public String hello() {kafkaTemplate.send(itcast-topic, hello kafka);return success;} }1.9.4 创建消费者 建com.heima.kafka.listener.HelloListener类负责监听消息 Component public class HelloListener {KafkaListener(topics itcast-topic)public void listen(String message) {if(!StringUtils.isEmpty(message)) {System.out.println(message message);}} }1.9.5 启动类 SpringBootApplication public class KafkaAppication {public static void main(String[] args) {SpringApplication.run(KafkaAppication.class, args);} }1.9.6 测试 打开localhost:9991/hello 已经接收到消息 1.10 kafka传递对象 1.10.1 创建User 创建com.heima.kafka.pojo.User Data public class User {private String username;private Integer age; }1.10.2 添加User的发送和接收 使用fastjson进行转换 Controller GetMapping(/user) public String user() {User user new User();user.setUsername(zhangsan);user.setAge(20);kafkaTemplate.send(user-topic, JSON.toJSONString(user));return success; }Listener KafkaListener(topics user-topic) public void listenUser(String message) {if(!StringUtils.isEmpty(message)) {User user JSON.parseObject(message, User.class);System.out.println(user);} }2 自媒体文章上下架 2.1 接口定义 2.2 Controller PostMapping(/down_or_up) public ResponseResult downOrUp(RequestBody WmNewsDto wmNewsDto){return wmNewsService.downOrUp(wmNewsDto); }2.3 Service 接口 ResponseResult downOrUp(WmNewsDto wmNewsDto);实现 Override public ResponseResult downOrUp(WmNewsDto wmNewsDto) {// 1.参数检查if(wmNewsDto.getId()null){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,文章id不能为空);}// 2.查询文章WmNews wmNews getById(wmNewsDto.getId());if(wmNews null){return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,文章不存在);}// 3.修改文章状态if(!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,只有已发布的文章才能上下架);}if(wmNewsDto.getEnable()null){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,enable不能为空);}wmNews.setEnable(wmNewsDto.getEnable());updateById(wmNews);// 4.返回结果return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); }2.4 通知Article修改文章配置 2.4.1 导入kafka依赖 在heima-leadnews-common模块下导入kafka依赖 !-- kafkfa -- dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId /dependency dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId /dependency2.4.2 在Nacos中配置kafka的生产者 在自媒体端的nacos配置中心配置kafka的生产者在heima-leadnews-wemedia下的配置文件中配置kafka spring:kafka:bootstrap-servers: 192.168.204.129:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer2.4.3 自媒体通知Article 创建com.heima.common.constants.mNewsMessageConstants常量类保存kafka的topic. public class WmNewsMessageConstants {public static final String WM_NEWS_UP_OR_DOWN_TOPICwm.news.up.or.down.topic; }注入kafka Autowired private KafkaTemplateString,String kafkaTemplate;发送消息通知article端修改文章配置 //发送消息通知article端修改文章配置 if(wmNews.getArticleId() ! null){MapString,Object map new HashMap();map.put(articleId,wmNews.getArticleId());map.put(enable,dto.getEnable());kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map)); }2.4.4 在Nacos中配置kafka的消费者 在article端的nacos配置中心配置kafka的消费者 spring:kafka:bootstrap-servers: 192.168.204.129:9092consumer:group-id: ${spring.application.name}key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer2.4.5 配置ap_article_config表 因为需要修改ap_article_config所以需要创建对应service和mapper 创捷Servicecom.heima.article.service.ApArticleConfigService接口 public interface ApArticleConfigService extends IServiceApArticleConfig {/*** 修改文章配置* param map*/public void updateByMap(Map map); }实现 Service Slf4j Transactional public class ApArticleConfigServiceImpl extends ServiceImplApArticleConfigMapper, ApArticleConfig implements ApArticleConfigService {/*** 修改文章配置* param map*/Overridepublic void updateByMap(Map map) {//0 下架 1 上架Object enable map.get(enable);boolean isDown true;if(enable.equals(1)){isDown false;}//修改文章配置update(Wrappers.ApArticleConfiglambdaUpdate().eq(ApArticleConfig::getArticleId,map.get(articleId)).set(ApArticleConfig::getIsDown,isDown));} }2.4.6 article端监听 在article端编写监听接收数据 Component Slf4j public class ArtilceIsDownListener {Autowiredprivate ApArticleConfigService apArticleConfigService;KafkaListener(topics WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)public void onMessage(String message){if(StringUtils.isNotBlank(message)){Map map JSON.parseObject(message, Map.class);apArticleConfigService.updateByMap(map);log.info(article端文章配置修改articleId{},map.get(articleId));}} }2.4.7 测试 启动相应启动类 打开自媒体管理界面准备下架这个新闻 下架该文件发现两张表都已经修改完美进行下架 说明我们kafka的消息传递已经成功。
http://www.pierceye.com/news/380162/

相关文章:

  • 烟台免费网站建设宝应网站开发
  • 用网站做淘宝客的人多吗3liang 设计网站 源码
  • 实训小结网站建设国内外最新新闻
  • 最新网站排名优化方法云龙徐州网站开发
  • 扬州做网站多少钱免费拿货的代理商
  • html做校园网站服装设计图片
  • 做三网站推广一般给多少钱
  • 网站关键词的写法牛肉煲的做法
  • 网站权限怎么设置吉林电商网站建设报价
  • wordpress修改站点名wordpress 插件 调用
  • vs2015做的网站广东省白云区属于哪个市
  • 微信群投票网站怎么做佳木斯做网站公司
  • 建设网站用哪个主机好阳西哪里有做网站
  • 沈阳市有做网站的公司中文企业网站html模板
  • 破解织梦做的网站有什么页游传奇平台好
  • 临安网站开发网站建设做什么费用
  • 辽宁建设工程信息网网站python 网站开发
  • 企业网站.net免费做ppt的网站
  • 浦城 做网站wordpress下载页面
  • 广西住房城乡建设部网站网站优化怎么看
  • 网站建设负责人证明网络营销的10个特点
  • 泉州市服务好的网站设计塘沽网吧开门了吗
  • 商城网站建设哪家公司好wordpress输出到模板
  • 建站报价网站建设培训学校
  • 杭州高端网站定制手机网站开发应注意
  • 深圳网站建设选云聚达做二手元器件那个网站查价格
  • 网站建设公司企业模板微网站开发制作
  • 北京网站制作计划合理的网站结构
  • 网站建设如何搭建框架兰州seo排名
  • 网站作为医院形象建设cms搭建网站