高端建设网站,哪家专门做特卖网站,东莞网站建设对比,秦皇岛建设里小区目录 Work queues 工作队列模式 Pub/Sub 订阅模式
Routing路由模式
Topics通配符模式 工作模式总结 Work queues 工作队列模式
C1和C2属于竞争关系#xff0c;一个消息只有一个消费者可以取到。 代码部分只需要用两个消费者进程监听同一个队里即可。
两个消费者呈现竞争关…目录 Work queues 工作队列模式 Pub/Sub 订阅模式
Routing路由模式
Topics通配符模式 工作模式总结 Work queues 工作队列模式
C1和C2属于竞争关系一个消息只有一个消费者可以取到。 代码部分只需要用两个消费者进程监听同一个队里即可。
两个消费者呈现竞争关系。
用一个生产者推送10条消息 for(int i0;i10;i){String bodyihello rabbitmq!!!;channel.basicPublish(,work_queues,null,body.getBytes());}
两个监听的消费者接收情况如下。 Pub/Sub 订阅模式
一个生产者发送消息后有两个消费者可以收到消息。
生产者把消息发给交换机交换机再把消息通过Routes路由分发给不同的队列。 //发送消息
public class producer_PubSub {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factorynew ConnectionFactory();//2.设置参数factory.setHost(); //设置ip地址。默认为127.0.0.1factory.setPort(5672); //端口 默认值5672factory.setVirtualHost(/itcast); //设置虚拟机 默认值/factory.setUsername(yhy); //用户名默认值guestfactory.setPassword(); //密码默认值guest//3.创建连接ConnectionConnection connection factory.newConnection();//4.创建ChannelChannel channel connection.createChannel();/** exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, MapString, Object arguments)* 参数:* 1.exchange : 交换价名称* 2.type : 交换机类型 有四种* DIRECT(direct), 定向FANOUT(fanout), 扇形(广播)发送消息到每一个与之绑定队列TOPIC(topic), 通配符的方式HEADERS(headers); 参数匹配*3.durable :是否持久化* 4.autoDelete:是否自动删除* 5.internal: 内部使用。一般false* 6.arguments:参数* *///5.创建交换机String exchangeNametest_fanout;channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);//6.创建队列String queue1Nametest_fanout_queue1;String queue2Nametest_fanout_queue2;channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);/** queueBind(String queue, String exchange, String routingKey)* 参数:* queue:队列名* exchange:交换机名称* routingKey:路由键绑定规则* 如果交换机类型为fanoutroutingKey设置为* *///7.绑定队列和交换机channel.queueBind(queue1Name,exchangeName,);channel.queueBind(queue2Name,exchangeName,);String body日志信息:调用了findAll方法;//8.发送消息channel.basicPublish(exchangeName,,null,body.getBytes());//9.释放资源channel.close();connection.close();}
} 运行之后两个队列里面就会多一条消息 两个消费者的代码大同小异只是绑定的队列名不同这里只给其中一个
public class consumer_PubSub1 {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factorynew ConnectionFactory();//2.设置参数factory.setHost(); //设置ip地址。默认为127.0.0.1factory.setPort(5672); //端口 默认值5672factory.setVirtualHost(/itcast); //设置虚拟机 默认值/factory.setUsername(yhy); //用户名默认值guestfactory.setPassword(); //密码默认值guest//3.创建连接ConnectionConnection connection factory.newConnection();//4.创建ChannelChannel channel connection.createChannel();String queue1Nametest_fanout_queue1;String queue2Nametest_fanout_queue2;/** basicConsume(String queue, boolean autoAck, Consumer callback)* 参数:* 1.队列名称* 2.autoAck:是否自动确认* 3.callback:回调对象* *///6.接收消息Consumer consumernew DefaultConsumer(channel){/** 回调方法当收到消息后会自动执行该方法* 1.consumerTag:标识* 2.envelope :获取一些信息交换机路由key...* 3.properties: 配置信息* 4.body: 数据* */Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println(consumerTag:consumerTag);
// System.out.println(Exchange:envelope.getExchange());
// System.out.println(RoutingKey:envelope.getRoutingKey());
// System.out.println(properties:properties);System.out.println(body:new String(body));System.out.println(将日志信息打印到控制台......);}};channel.basicConsume(queue1Name,true,consumer);//不需要关闭资源}
} 控制台输出有 Routing路由模式 对于特定级别的信息会发送到别的队列如上图的error在发送消息时也会有一个routing只要和后面的队列对应上就可以发送到对应队列。 生产者代码
//发送消息
public class producer_Routing {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factorynew ConnectionFactory();//2.设置参数factory.setHost(); //设置ip地址。默认为127.0.0.1factory.setPort(5672); //端口 默认值5672factory.setVirtualHost(/itcast); //设置虚拟机 默认值/factory.setUsername(yhy); //用户名默认值guestfactory.setPassword(); //密码默认值guest//3.创建连接ConnectionConnection connection factory.newConnection();//4.创建ChannelChannel channel connection.createChannel();/** exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, MapString, Object arguments)* 参数:* 1.exchange : 交换价名称* 2.type : 交换机类型 有四种* DIRECT(direct), 定向FANOUT(fanout), 扇形(广播)发送消息到每一个与之绑定队列TOPIC(topic), 通配符的方式HEADERS(headers); 参数匹配*3.durable :是否持久化* 4.autoDelete:是否自动删除* 5.internal: 内部使用。一般false* 6.arguments:参数* *///5.创建交换机String exchangeNametest_direct;channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);//6.创建队列String queue1Nametest_direct_queue1;String queue2Nametest_direct_queue2;channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);/** queueBind(String queue, String exchange, String routingKey)* 参数:* queue:队列名* exchange:交换机名称* routingKey:路由键绑定规则* 如果交换机类型为fanoutroutingKey设置为* *///7.绑定队列和交换机//队列1绑定errorchannel.queueBind(queue1Name,exchangeName,error);//队列2绑定error,infowarningchannel.queueBind(queue2Name,exchangeName,info);channel.queueBind(queue2Name,exchangeName,error);channel.queueBind(queue2Name,exchangeName,warning);String body日志信息:调用了findAll方法,级别:info,error,warning;//8.发送消息channel.basicPublish(exchangeName,error,null,body.getBytes());//9.释放资源channel.close();connection.close();}
}
消费者代码两个消费者就绑定队列名不一样
public class consumer_Routing1 {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factorynew ConnectionFactory();//2.设置参数factory.setHost(); //设置ip地址。默认为127.0.0.1factory.setPort(5672); //端口 默认值5672factory.setVirtualHost(/itcast); //设置虚拟机 默认值/factory.setUsername(yhy); //用户名默认值guestfactory.setPassword(); //密码默认值guest//3.创建连接ConnectionConnection connection factory.newConnection();//4.创建ChannelChannel channel connection.createChannel();String queue1Nametest_direct_queue1;String queue2Nametest_direct_queue2;/** basicConsume(String queue, boolean autoAck, Consumer callback)* 参数:* 1.队列名称* 2.autoAck:是否自动确认* 3.callback:回调对象* *///6.接收消息Consumer consumernew DefaultConsumer(channel){/** 回调方法当收到消息后会自动执行该方法* 1.consumerTag:标识* 2.envelope :获取一些信息交换机路由key...* 3.properties: 配置信息* 4.body: 数据* */Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println(consumerTag:consumerTag);
// System.out.println(Exchange:envelope.getExchange());
// System.out.println(RoutingKey:envelope.getRoutingKey());
// System.out.println(properties:properties);System.out.println(body:new String(body));System.out.println(将日志信息存储到数据库);}};channel.basicConsume(queue1Name,true,consumer);//不需要关闭资源}
} Topics通配符模式
发送消息时设定的routingkey会和后面的routingkey进行匹配。 生产者代码:
//发送消息
public class producer_Topic {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factorynew ConnectionFactory();//2.设置参数factory.setHost(); //设置ip地址。默认为127.0.0.1factory.setPort(5672); //端口 默认值5672factory.setVirtualHost(/itcast); //设置虚拟机 默认值/factory.setUsername(yhy); //用户名默认值guestfactory.setPassword(); //密码默认值guest//3.创建连接ConnectionConnection connection factory.newConnection();//4.创建ChannelChannel channel connection.createChannel();/** exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, MapString, Object arguments)* 参数:* 1.exchange : 交换价名称* 2.type : 交换机类型 有四种* DIRECT(direct), 定向FANOUT(fanout), 扇形(广播)发送消息到每一个与之绑定队列TOPIC(topic), 通配符的方式HEADERS(headers); 参数匹配*3.durable :是否持久化* 4.autoDelete:是否自动删除* 5.internal: 内部使用。一般false* 6.arguments:参数* *///5.创建交换机String exchangeNametest_topic;channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);//6.创建队列String queue1Nametest_topic_queue1;String queue2Nametest_topic_queue2;channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);/** queueBind(String queue, String exchange, String routingKey)* 参数:* queue:队列名* exchange:交换机名称* routingKey:路由键绑定规则* 如果交换机类型为fanoutroutingKey设置为* *///7.绑定队列和交换机// routing key 系统的名称.日志的级别。//需求:所有error级别的日志存入数据库所有order系统的日志存入数据库channel.queueBind(queue1Name,exchangeName,#.error);channel.queueBind(queue1Name,exchangeName,order.*);channel.queueBind(queue2Name,exchangeName,*.*);String body日志信息:调用了findAll方法;//8.发送消息channel.basicPublish(exchangeName,goods.error,null,body.getBytes());//9.释放资源channel.close();connection.close();}
}消费者代码
public class consumer_Topic1 {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factorynew ConnectionFactory();//2.设置参数factory.setHost(); //设置ip地址。默认为127.0.0.1factory.setPort(5672); //端口 默认值5672factory.setVirtualHost(/itcast); //设置虚拟机 默认值/factory.setUsername(yhy); //用户名默认值guestfactory.setPassword(); //密码默认值guest//3.创建连接ConnectionConnection connection factory.newConnection();//4.创建ChannelChannel channel connection.createChannel();String queue1Nametest_topic_queue1;String queue2Nametest_topic_queue2;/** basicConsume(String queue, boolean autoAck, Consumer callback)* 参数:* 1.队列名称* 2.autoAck:是否自动确认* 3.callback:回调对象* *///6.接收消息Consumer consumernew DefaultConsumer(channel){/** 回调方法当收到消息后会自动执行该方法* 1.consumerTag:标识* 2.envelope :获取一些信息交换机路由key...* 3.properties: 配置信息* 4.body: 数据* */Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println(consumerTag:consumerTag);
// System.out.println(Exchange:envelope.getExchange());
// System.out.println(RoutingKey:envelope.getRoutingKey());
// System.out.println(properties:properties);System.out.println(body:new String(body));System.out.println(将日志信息存储到数据库);}};channel.basicConsume(queue1Name,true,consumer);//不需要关闭资源}
} 工作模式总结