当前位置: 首页 > news >正文

菏泽网站建设公司有哪些天眼查企业查询赵宝荣

菏泽网站建设公司有哪些,天眼查企业查询赵宝荣,海外网站导航,做网站公司赚钱么这里写目录标题 Hello word 模式添加依赖生产者消费者获取信道工具类 Work Queues模式消费者代码 C1开启多线程运行启动 消费者代码 C2生产者代码 消息应答自动应答消息应答的方法Multiple 的解释消息自动重新入队消息手动应答代码消费者API 队列持久化消息持久化不公平分发消息… 这里写目录标题 Hello word 模式添加依赖生产者消费者获取信道工具类 Work Queues模式消费者代码 C1开启多线程运行启动 消费者代码 C2生产者代码 消息应答自动应答消息应答的方法Multiple 的解释消息自动重新入队消息手动应答代码消费者API 队列持久化消息持久化不公平分发消息预取值确认发布单个确认发布批量确认发布异步批量确认发布如何处理异步未确认消息 Hello word 模式 “ P”是我们的生产者“ C”是我们的消费者。中间的框是一个队列-RabbitMQ 代表使用者保留的消息缓冲区 添加依赖 !--rabbitmq 依赖客户端--dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.8.0/version/dependency!--操作文件流的一个依赖--dependencygroupIdcommons-io/groupIdartifactIdcommons-io/artifactIdversion2.6/version/dependency生产者 package com.wlj.rabbitmq.one;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.wlj.rabbitmq.util.MQUtil;import java.io.IOException; import java.util.concurrent.TimeoutException; /***创建人 wlj*创建时间 2023/7/20*描述 消息生产者 Hello World简单队列模式*/ public class Producer {private static String QUEUE_NAMEhello;public static void main(String[] args) {//创建一个连接工厂ConnectionFactory factory new ConnectionFactory();//工厂IP连接MQ的队列factory.setHost(localhost);//设置用户名factory.setUsername(guest);factory.setPassword(guest);try {//创建连接Connection connection factory.newConnection();//获取信道Channel channel connection.createChannel();//生成一个队列/*** 生成一个队列* 1.队列名称* 2.队列里面的消息是否持久化 默认消息存储在内存中* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除* 5.其他参数*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);String messagehello world2;/*** 发送一个消息* 1.发送到那个交换机* 2.路由的 key 是哪个* 3.其他的参数信息* 4.发送消息的消息体*/channel.basicPublish(,QUEUE_NAME,null,message.getBytes());System.out.println(消息发送完毕);} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}} 消费者 package com.wlj.rabbitmq.one;import com.rabbitmq.client.*; /***创建人 wlj*创建时间 2023/7/20*描述 消息消费者 Hello World简单队列模式*/ public class Consumer {private static String QUEUE_NAMEhello;public static void main(String[] args) {//创建有一个连接工厂ConnectionFactory factory new ConnectionFactory();//工厂IP连接MQ的队列factory.setHost(localhost);//设置用户名factory.setUsername(guest);factory.setPassword(guest);//创建连接try {Connection connection factory.newConnection();//获取信道Channel channel connection.createChannel();/***消费者消费消息* 1.消费那个队列* 2. 消费成功之后 是否手动应答 true 自动应答 false 手动应答* 3.消费者成功消费的回调* 4. 消费者取消消费的回调*///接收消息DeliverCallback deliverCallback(conusmerTag,message)-{System.out.println(消费的消息 new String( message.getBody()));};//取消接收消息的回调CancelCallback cancelCallback (e)-{System.out.println(消费消息被中断);};channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);} catch (Exception e) {e.printStackTrace();}} } 获取信道工具类 package com.wlj.rabbitmq.util;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.util.concurrent.TimeoutException;/*** 创建人 wlj* 创建时间 2023/7/19* 描述 连接MQ获取信道工具类*/ public class MQUtil {public static Channel getMQ( ) {//创建一个连接工厂ConnectionFactory factory new ConnectionFactory();//工厂IP连接MQ的队列factory.setHost(localhost);//设置用户名factory.setUsername(guest);factory.setPassword(guest);try {//创建连接Connection connection factory.newConnection();//获取信道Channel channel connection.createChannel();return channel;} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}return null;}} Work Queues模式 一个生产者对应多个消费者。每个消费者是竞争关系一个消息只能被一个消费者消费 消费者代码 C1 package com.wlj.rabbitmq.two;import com.rabbitmq.client.Channel; import com.wlj.rabbitmq.util.MQUtil;import java.io.IOException;/***创建人 wlj*创建时间 2023/7/20*描述 这是一个工作线程 也是一个消费者*/ public class WorkerConsumer {private static String QUEUE_NAMEhello;public static void main(String[] args) {Channel channel MQUtil.getMQ();System.out.println(C1);//消息的接收try {channel.basicConsume(QUEUE_NAME,true,(tag,msg)-{System.out.println(消费的消息 new String(msg.getBody()));},e-{System.out.println(消费取消);});} catch (IOException e) {}} } 开启多线程运行 启动 消费者代码 C2 只需要更改输出语句即可 System.out.println(C2);生产者代码 从控制台输入 发送消息 package com.wlj.rabbitmq.two;import com.rabbitmq.client.*; import com.wlj.rabbitmq.util.MQUtil;import java.util.Scanner;/***创建人 wlj*创建时间 2023/7/20*描述 消息消费者 工作线程模式*/ public class WorkerProducer {private static String QUEUE_NAMEhello;public static void main(String[] args) {Channel channel MQUtil.getMQ();try {channel.queueDeclare(QUEUE_NAME, false, false, false, null);//从控制台当中接受信息Scanner scanner new Scanner(System.in);while (scanner.hasNext()){String message scanner.next();/*** 生成一个队列* 1.队列名称* 2.队列里面的消息是否持久化 默认消息存储在内存中* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除* 5.其他参数*/channel.basicPublish(,QUEUE_NAME,null,message.getBytes());System.out.println(发送消息完成:message);}} catch (Exception e) {e.printStackTrace();}} } 启动C1、C2和生产者进行测试 消息应答 消费者完成一个任务可能需要一段时间如果其中一个消费者处理一个长的任务并仅只完成 了部分突然它挂掉了会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息便立即将该消 息标记为删除。在这种情况下突然有个消费者挂掉了我们将丢失正在处理的消息。以及后续 发送给该消费这的消息因为它无法接收到。 为了保证消息在发送过程中不丢失rabbitmq 引入消息应答机制消息应答就是:消费者在接 收到消息并且处理该消息之后告诉 rabbitmq 它已经处理了rabbitmq 可以把该消息删除了。 自动应答 消息发送后立即被认为已经传送成功这种模式需要在高吞吐量和数据传输安全性方面做权 衡,因为这种模式如果消息在接收到之前消费者那边出现连接或者 channel 关闭那么消息就丢 失了,当然另一方面这种模式消费者那边可以传递过载的消息没有对传递的消息数量进行限制 当然这样有可能使得消费者这边由于接收太多还来不及处理的消息导致这些消息的积压最终 使得内存耗尽最终这些消费者线程被操作系统杀死所以这种模式仅适用在消费者可以高效并 以某种速率能够处理这些消息的情况下使用。 消息应答的方法 Channel.basicAck(用于肯定确认) RabbitMQ 已知道该消息并且成功的处理消息可以将其丢弃了Channel.basicNack(用于否定确认)Channel.basicReject(用于否定确认) 与 Channel.basicNack 相比少一个参数 不处理该消息了直接拒绝可以将其丢弃了 Multiple 的解释 手动应答的好处是可以批量应答并且减少网络拥堵 multiple 的 true 和 false 代表不同意思 true 代表批量应答 channel 上未应答的消息 比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答 false 同上面相比 只会应答 tag8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答 消息自动重新入队 如果消费者由于某些原因失去连接(其通道已关闭连接已关闭或 TCP 连接丢失)导致消息 未发送 ACK 确认RabbitMQ 将了解到消息未完全处理并将对其重新排队。如果此时其他消费者 可以处理它将很快将其重新分发给另一个消费者。这样即使某个消费者偶尔死亡也可以确 保不会丢失任何消息。 消息手动应答代码 默认消息采用的是自动应答所以我们要想实现消息消费过程中不丢失需要在接收消息是把自动应答改 为手动应答。之后需要在消息处理完成之后进行手动应答 消费者API ** 首先是消费消息时取消自动应答** 消费消息完成时进行手动应答 package com.wlj.rabbitmq.two;import com.rabbitmq.client.Channel; import com.wlj.rabbitmq.util.MQUtil;import java.io.IOException;/***创建人 wlj*创建时间 2023/7/20*描述 这是一个工作线程 也是一个消费者*/ public class WorkerConsumer {private static String QUEUE_NAMEhello;public static void main(String[] args) {Channel channel MQUtil.getMQ();System.out.println(C2);//消息的接收try {/***消费者消费消息* 1.消费那个队列* 2. 消费成功之后 是否手动应答 true 自动应答 false 手动应答* 3.消费者成功消费的回调* 4. 消费者取消消费的回调*///取消自动应答channel.basicConsume(QUEUE_NAME,false,(tag,msg)-{System.out.println(消费的消息 new String(msg.getBody()));//进行手动应答 参数一消息标记tag 参数二false代表只应答接收到的那个传递的消息。true代表为应答所有消息包括传递过来的消息channel.basicAck(msg.getEnvelope().getDeliveryTag(),true);},e-{System.out.println(消费取消);});} catch (IOException e) {}} } 队列持久化 队列为开启持久化时。MQ重启队列就会被删除掉如果想要队列实现持久化需要在声明队列的时候吧durable参数设置为持久化true boolean durable true;channel.queueDeclare(QUEUE_NAME,durable,false,false,null);需要注意的是,如果之前生声明的队列不是持久化的,需要把原来的队列删除之后,重新声明持久化队列 查看mq队列列表出现D就是持久化成功 消息持久化 要想让消息实现持久化需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN 添 加这个属性 可以理解为告诉队列把消息实现持久化保存到磁盘上 boolean durable true;channel.queueDeclare(QUEUE_NAME,durable,false,false,null);String messagehello world2;/*** 发送一个消息* 1.发送到那个交换机* 2.路由的 key 是哪个* 3.其他的参数信息* 4.发送消息的消息体*/channel.basicPublish(,QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘但是 这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完消息还在缓存的一个间隔点。此时并没 有真正写入磁盘。持久性保证并不强但是对于我们的简单任务队列而言这已经绰绰有余了。如果需要 更强有力的持久化策略参考MQ的发布确认。 不公平分发 MQ默认是采用轮询的方式分发消息但是有的消费者处理很慢就会导致消息积压可以设置不公平分发消费者进行应答之后才会接收下一条消息 生产者代码无变动 package com.wlj.rabbitmq.bugongpingfenfa;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties;import java.io.IOException; import java.util.Scanner; import java.util.concurrent.TimeoutException;/***创建人 wlj*创建时间 2023/7/20*描述 消息生产者 Hello World简单队列模式*/ public class Producer {private static String QUEUE_NAMEhello3;public static void main(String[] args) {//创建一个连接工厂ConnectionFactory factory new ConnectionFactory();//工厂IP连接MQ的队列factory.setHost(localhost);//设置用户名factory.setUsername(guest);factory.setPassword(guest);try {//创建连接Connection connection factory.newConnection();//获取信道Channel channel connection.createChannel();//生成一个队列/*** 生成一个队列* 1.队列名称* 2.队列里面的消息是否持久化 默认消息存储在内存中* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除* 5.其他参数*///从控制台当中接受信息channel.queueDeclare(QUEUE_NAME, false, false, false, null);Scanner scanner new Scanner(System.in);while (scanner.hasNext()) {String message scanner.next();//发布消息channel.basicPublish(, QUEUE_NAME, null, message.getBytes());System.out.println(发送消息完成: message);}} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}} 消费者代码 在消息消费之前 channel.basicQos(1); 同时开通了手动应答手动应答应该先启动消费者在启动生产者 package com.wlj.rabbitmq.bugongpingfenfa;import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.wlj.rabbitmq.util.MQUtil; import com.wlj.rabbitmq.util.ThreadSleep;import java.io.IOException;/***创建人 wlj*创建时间 2023/7/20*描述 进行不公平分发测试处理慢的消费者接受的消息少处理快的消费者处理的消息多*/ public class WorkerConsumer {private static String QUEUE_NAMEhello3;public static void main(String[] args) {Channel channel MQUtil.getMQ();System.out.println(50秒消费的消息); //消息的接收try {channel.basicQos(1);/***消费者消费消息* 1.消费那个队列* 2. 消费成功之后 是否手动应答 true 自动应答 false 手动应答* 3.消费者成功消费的回调* 4. 消费者取消消费的回调*///接收消息DeliverCallback deliverCallback(conusmerTag, message)-{System.out.println(消费的消息 new String( message.getBody()));ThreadSleep.sleep(50000);channel.basicAck(message.getEnvelope().getDeliveryTag(),false);System.out.println(应答结束);};//取消接收消息的回调CancelCallback cancelCallback (e)-{System.out.println(消费消息被中断);};channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);} catch (IOException e) {}} } 进行测试 启动消费者一获取消息之后睡眠十秒再应答。启动消费者二获取消息之后睡眠二十秒再应答。启动生产者测试发送第一条消息由第一个消费者消费。再发送第二条消息此时消费者二是空闲的所以消费者二消费消息发送第三条数据如果第一个消费者十秒结束进行应答那么会得到第三条消息。如果没有进行应答则不能接收到消息 所以得出结论不公平分发是消费者消息处理完应答之后才会接收到下一条消息 消息预取值 确认发布 这是一种简单的确认方式。他是一种同步确认发布的方式也就是发布一个消息之后只有它被确认发布后续的消息才能被发布。waitForCnfirmsOrDie(long)这个方法只有在消息被确认的时候才返回如果指定时间内没有被确认则会抛出异常单个确认发布 生产者API 需要使用 信道开启发布确认 package com.wlj.rabbitmq.three;import com.rabbitmq.client.Channel; import com.wlj.rabbitmq.util.MQUtil;import java.util.Scanner;/*** 创建人 wlj* 创建时间 单个发布确认模式* 描述*/ public class ComfirmSelecr {private static String QUEUE_NAME confirms;public static void main(String[] args) {Channel channel MQUtil.getMQ();try {//开启发布确认channel.confirmSelect();channel.queueDeclare(QUEUE_NAME, false, false, false, null);long l System.currentTimeMillis();for (int i 0; i 1000; i) {//发送消息channel.basicPublish(, QUEUE_NAME, null, (i).getBytes());System.out.println(发布确认模式: i);boolean b channel.waitForConfirms(1000);if (b) {System.out.println(发布成功);}}long l1 System.currentTimeMillis();System.out.println(发布时长 (l1 - l));} catch (Exception e) {e.printStackTrace();}} } 批量确认发布 package com.wlj.rabbitmq.three;import com.rabbitmq.client.Channel; import com.wlj.rabbitmq.util.MQUtil;/*** 创建人 wlj* 创建时间 批量发布确认模式* 描述*/ public class BatchComfirmSelecr {private static String QUEUE_NAME confirms;public static void main(String[] args) {Channel channel MQUtil.getMQ();//开启发布确认try {channel.confirmSelect();channel.queueDeclare(QUEUE_NAME, false, false, false, null);long l System.currentTimeMillis();for (int i 0; i 1000; i) {/*** 生成一个队列* 1.队列名称* 2.队列里面的消息是否持久化 默认消息存储在内存中* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除* 5.其他参数*/channel.basicPublish(, QUEUE_NAME, null, (i ).getBytes());System.out.println(发布确认模式: i);// 每一百个一次确认发布if (i % 100 0) {boolean b channel.waitForConfirms(1000);if (b) {System.out.println(发布成功);}}}long l1 System.currentTimeMillis();System.out.println(发布时长 (l1 - l));} catch (Exception e) {e.printStackTrace();}} } 异步批量确认发布 需要添加添加一个异步确认的监听器 处理已被处理的消息或者是未被处理的消息 channel.addConfirmListener(ackCallback, nackCallback); package com.wlj.rabbitmq.three;import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmCallback; import com.wlj.rabbitmq.util.MQUtil;import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap;/*** 创建人 wlj* 创建时间 批量发布确认模式* 描述*/ public class SyncBatchComfirmSelecr {private static String QUEUE_NAME confirms;public static void main(String[] args) {Channel channel MQUtil.getMQ();//开启发布确认try {channel.confirmSelect();/*** 线程安全有序的一个哈希表适用于高并发的情况* 1.轻松的将序号与消息进行关联* 2.轻松批量删除条目 只要给到序列号* 3.支持并发访问*/ConcurrentSkipListMapLong, String outstandingConfirms newConcurrentSkipListMap();/*** 确认收到消息的一个回调* 1.消息序列号* 2.true 可以确认小于等于当前序列号的消息* false 确认当前序列号消息*/ConfirmCallback ackCallback (sequenceNumber, multiple) - {String message outstandingConfirms.get(sequenceNumber);System.out.println(发布的消息--------------- message 已被确认序列号 sequenceNumber);//是否是批量确认if (multiple) {//返回的是小于等于当前序列号的未确认消息 是一个 mapConcurrentNavigableMapLong, String confirmed outstandingConfirms.headMap(sequenceNumber, true);//清除该部分未确认消息confirmed.clear();} else {//只清除当前序列号的消息outstandingConfirms.remove(sequenceNumber);}};ConfirmCallback nackCallback (sequenceNumber, multiple) - {String message outstandingConfirms.get(sequenceNumber);System.out.println(发布的消息************* message 未被确认序列号 sequenceNumber);};/*** 添加一个异步确认的监听器* 1.确认收到消息的回调* 2.未收到消息的回调*/// channel.addConfirmListener(ackCallback, null);channel.addConfirmListener(ackCallback, nackCallback);channel.queueDeclare(QUEUE_NAME, false, false, false, null);long l System.currentTimeMillis();for (int i 0; i 1000; i) {String message 消息 i;/*** channel.getNextPublishSeqNo()获取下一个消息的序列号* 通过序列号与消息体进行一个关联* 全部都是未确认的消息体*/outstandingConfirms.put(channel.getNextPublishSeqNo(), message);/*** 生成一个队列* 1.队列名称* 2.队列里面的消息是否持久化 默认消息存储在内存中* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除* 5.其他参数*/channel.basicPublish(, QUEUE_NAME, null, (i ).getBytes());}long l1 System.currentTimeMillis();System.out.println(异步批量确认发布时长 (l1 - l));} catch (Exception e) {e.printStackTrace();}} } 如何处理异步未确认消息 最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列 比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传 递.上面代码就是用了此方法,发消息是进行记录,在未确认回调函数里面进行处理
http://www.pierceye.com/news/512558/

相关文章:

  • 网站主色调简介怎么说本地常州微信网站建设
  • 电子商务网站数据库建设怎样推广一个网站
  • illustrator 学习网站wordpress外链产品
  • 电脑端网站一般做多宽最好网页游戏制作成本
  • 怎样做好手机网站建设wordpress开启xml rpc
  • 泉州企业网站建设公司做外贸要建什么网站
  • 找人做网站价格永城网站设计公司
  • 如何让网站做网页适配深圳市门户网站建设多少钱
  • 中金超钒 网站建设淘客推广方法
  • 网站建设的基本流程域名备案需要哪些资料
  • 怎么查找网站死链怎么自己做微信推送新闻
  • 做网站的人能看到浏览的人的信息吗怎么刷网站权重
  • 有了域名搭建网站详细步骤服务外包有哪些
  • 外贸网站样式传扬互动网站建设公司
  • 企业网站建设需要哪些资料信息免费推广工具
  • 网站怎么更新网页内容如何把自己的产品放到网上卖
  • jQuery网站建设中倒计时代码提高工作效率的重要性
  • 网站建设业务介绍深圳观澜网站建设
  • 最简单的做网站网站开发项目需求文档
  • wordpress网站打开速度小程序搜索排名帝搜sem880官网
  • 台州做网站公司企业网站seo策略
  • 专业网站建设推广网络营销推广方法和手段有哪些
  • 莘县做网站推广2345浏览器官方网站
  • 深圳网站建设公司为什mrskinlove wordpress
  • html 网站建设中模板网络营销推广与策划
  • 企业管理网站模板asp.net做电商网站设计
  • 萧山建站wordpress主题机制
  • ps可以做网站动态图网页设计参考板式
  • 温州集团网站建设西昌市网站建设公司
  • 奇想网站建设wordpress分页调用代码