模拟网站开发,四叶天代理ip官网,青岛建手机网站哪家好,南京网站推广哪家便宜RabbitMQ的交换机的四种类型
一、Direct类型交换机 Direct类型交换机 ///1.在发送消息的时候#xff0c;通过Direct类型的路由转发#xff1b; //要求Direct类型交换机和队列绑定#xff1b;绑定需要一个标识#xff0c;生产者在发送消息的时候#xff0c;也需要指定一个…RabbitMQ的交换机的四种类型
一、Direct类型交换机 Direct类型交换机 ///1.在发送消息的时候通过Direct类型的路由转发 //要求Direct类型交换机和队列绑定绑定需要一个标识生产者在发送消息的时候也需要指定一个标识消息发送给交换机以后交换机进行标识的匹配知道和交换机绑定队列完全吻合的标识只要是匹配到了就把消息通过交换机转发给当前绑定标识吻合的这个队列中去 //Direct类型的交换机可以做点啥 //如果说有一个生产者发送了很多消息需要把消息分类处理 //消息需要分几类就可以定义几个队列分别把队列和交换机在绑定的是偶分别给出不同的表示发送消息的时候就给出不同标识就可以把消息发送到不同的经过分类的队列中去了 //记录日志记录日志分类的记录如果是异常就需要另外的处理 //还需要来一个所有日志的记录 //定义一个记录所有日志的队列定义一个专门为异常日志存在的队列 //定义一个Dirct类型的交换机分别绑定不同的标识日志生产出来以后就可以根据日志的类型不同发给路由把类型作为标识路由匹配后就可以转发到不同的队列中去中就可以把日志分类处理 生产者
public class DirectExchangeProducer
{public static void Send(){var factory new ConnectionFactory();factory.HostName localhost;//RabbitMQ服务在本地运行factory.UserName guest;//用户名factory.Password guest;//密码 using (var connection factory.CreateConnection()){using (IModel channel connection.CreateModel()){#region 删除队列和交换机channel.ExchangeDelete(DirectExChange);channel.QueueDelete(DirectExchangeLogAllQueue);channel.QueueDelete(DirectExchangeErrorQueue);#endregionchannel.QueueDeclare(queue: DirectExchangeLogAllQueue, durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueDeclare(queue: DirectExchangeErrorQueue, durable: true, exclusive: false, autoDelete: false, arguments: null); //交换机的类型typeExchangeTypechannel.ExchangeDeclare(exchange: DirectExChange, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);//定义四种类型的日志string[] logtypes new string[] { debug, info, warn, error };//把交换机和队列绑定把所有的日志类型作为标识绑定起来//DirectExchangeLogAllQueue用来接受所有的日志消息//交换机和队列可以绑定多个标识foreach (string logtype in logtypes){channel.QueueBind(queue: DirectExchangeLogAllQueue,exchange: DirectExChange,routingKey: logtype);}//针对异常处理的这里DirectExChange 绑定DirectExchangeErrorQueue只指定一个标识就是errorchannel.QueueBind(queue: DirectExchangeErrorQueue,exchange: DirectExChange,routingKey: error);//通过取模得到四种类型的日志各自25个日志信息ListLogMsgModel logList new ListLogMsgModel();for (int i 1; i 100; i){if (i % 4 0){logList.Add(new LogMsgModel() { LogType info, Msg Encoding.UTF8.GetBytes($info第{i}条信息) });}if (i % 4 1){logList.Add(new LogMsgModel() { LogType debug, Msg Encoding.UTF8.GetBytes($debug第{i}条信息) });}if (i % 4 2){logList.Add(new LogMsgModel() { LogType warn, Msg Encoding.UTF8.GetBytes($warn第{i}条信息) });}if (i % 4 3){logList.Add(new LogMsgModel() { LogType error, Msg Encoding.UTF8.GetBytes($error第{i}条信息) });}}Console.WriteLine(生产者发送100条日志信息);logList logList.OrderBy(l l.LogType).ToList();//发送日志信息foreach (var log in logList){channel.BasicPublish(exchange: DirectExChange,routingKey: log.LogType,basicProperties: null,body: log.Msg);Console.WriteLine(${Encoding.UTF8.GetString(log.Msg)} 已发送~~);}}}}public class LogMsgModel{public string LogType { get; set; }public byte[] Msg { get; set; }}
}消费者
public class DirectExchangeConsumerLogAll
{public static void Consumption(){var factory new ConnectionFactory();factory.HostName localhost;//RabbitMQ服务在本地运行factory.UserName guest;//用户名factory.Password guest;//密码 using (var connection factory.CreateConnection()){using (IModel channel connection.CreateModel()){channel.QueueDeclare(queue: DirectExchangeLogAllQueue, durable: true, exclusive: false, autoDelete: false, arguments: null);channel.ExchangeDeclare(exchange: DirectExChange, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);string[] logtypes new string[] { debug, info, warn, error };foreach (string logtype in logtypes){channel.QueueBind(queue: DirectExchangeLogAllQueue,exchange: DirectExChange,routingKey: logtype);}//消费队列中的所有消息 var consumer new EventingBasicConsumer(channel);consumer.Received (model, ea) {var body ea.Body;var message Encoding.UTF8.GetString(body.ToArray());Console.WriteLine($【{message}】写入文本~~);};//处理消息channel.BasicConsume(queue: DirectExchangeLogAllQueue,autoAck: true,consumer: consumer);Console.ReadLine();}}}
}二、fanout 类型交换机 ///fanout类型的Exchange路由规则非常简单它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。 //交换机和队列绑定不需要指定标识对于生产者发过来的消息发给交换机以后只要是整个交换机和队列有绑定交换机就转发给队列 //生产者发送的消息都可以转发给和他绑定额队列 //广播式发布订阅模式一个生产者把消息发送过去多个消费者都可以接受到了 生产者
public class FanoutExchange
{public static void Send(){var factory new ConnectionFactory();factory.HostName localhost;//RabbitMQ服务在本地运行factory.UserName guest;//用户名factory.Password guest;//密码 using (var connection factory.CreateConnection()){using (IModel channel connection.CreateModel()){channel.QueueDeclare(queue: FanoutExchangeZhaoxi001, durable: true, exclusive: false, autoDelete: false, arguments: null);channel.QueueDeclare(queue: FanoutExchangeZhaoxi002, durable: true, exclusive: false, autoDelete: false, arguments: null);//在这里声明一个Fanout 类型的交换机channel.ExchangeDeclare(exchange: FanoutExchange, type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);//交换机绑定队列不需要标识channel.QueueBind(queue: FanoutExchangeZhaoxi001, exchange: FanoutExchange, routingKey: string.Empty, arguments: null);channel.QueueBind(queue: FanoutExchangeZhaoxi002, exchange: FanoutExchange, routingKey: string.Empty, arguments: null);//在控制台输入消息按enter键发送消息int i 1;while (true){var message $通知{i};if (i10){Console.WriteLine(请输入通知~~);message Console.ReadLine();} var body Encoding.UTF8.GetBytes(message);//基本发布channel.BasicPublish(exchange: FanoutExchange,routingKey: string.Empty,basicProperties: null,body: body);Console.WriteLine($通知【{message}】已发送到队列);Thread.Sleep(2000);i;}}}}
}消费者
public class FanoutExchange
{public static void Consumption(){var factory new ConnectionFactory();factory.HostName localhost;//RabbitMQ服务在本地运行factory.UserName guest;//用户名factory.Password guest;//密码 using (var connection factory.CreateConnection()){//创建通道channelusing (var channel connection.CreateModel()){channel.QueueDeclare(queue: FanoutExchangeZhaoxi001, durable: true, exclusive: false, autoDelete: false, arguments: null);channel.QueueDeclare(queue: FanoutExchangeZhaoxi002, durable: true, exclusive: false, autoDelete: false, arguments: null);//在这里声明一个Fanout 类型的交换机channel.ExchangeDeclare(exchange: FanoutExchange, type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);//交换机绑定队列不需要标识channel.QueueBind(queue: FanoutExchangeZhaoxi001, exchange: FanoutExchange, routingKey: string.Empty, arguments: null);channel.QueueBind(queue: FanoutExchangeZhaoxi002, exchange: FanoutExchange, routingKey: string.Empty, arguments: null);//定义消费者 var consumer new EventingBasicConsumer(channel);consumer.Received (model, ea) {var body ea.Body;var message Encoding.UTF8.GetString(body.ToArray());//只是为了演示并没有存入文本文件Console.WriteLine($消费者0接收成功【{message}】邮件通知);};Console.WriteLine(消费者0:通知服务准备就绪...);//处理消息channel.BasicConsume(queue: FanoutExchangeZhaoxi002,autoAck: true,consumer: consumer);Console.ReadLine();}}}public static void Consumption1(){var factory new ConnectionFactory();factory.HostName localhost;//RabbitMQ服务在本地运行factory.UserName guest;//用户名factory.Password guest;//密码 using (var connection factory.CreateConnection()){//创建通道channelusing (var channel connection.CreateModel()){channel.QueueDeclare(queue: FanoutExchangeZhaoxi001, durable: true, exclusive: false, autoDelete: false, arguments: null);channel.QueueDeclare(queue: FanoutExchangeZhaoxi002, durable: true, exclusive: false, autoDelete: false, arguments: null);//在这里声明一个Fanout 类型的交换机channel.ExchangeDeclare(exchange: FanoutExchange, type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);//交换机绑定队列不需要标识channel.QueueBind(queue: FanoutExchangeZhaoxi001, exchange: FanoutExchange, routingKey: string.Empty, arguments: null);channel.QueueBind(queue: FanoutExchangeZhaoxi002, exchange: FanoutExchange, routingKey: string.Empty, arguments: null);//定义消费者 var consumer new EventingBasicConsumer(channel);consumer.Received (model, ea) {var body ea.Body;var message Encoding.UTF8.GetString(body.ToArray());//只是为了演示并没有存入文本文件Console.WriteLine($消费者1接收成功【{message}】邮件通知);};Console.WriteLine(消费者1通知服务准备就绪...);//处理消息channel.BasicConsume(queue: FanoutExchangeZhaoxi001,autoAck: true,consumer: consumer);Console.ReadLine();}}}
}三、Topic 类型交换机 Topic交换机可以做到模糊匹配 Exchange绑定队列需要制定标识 标识 可以有自己的规则标识可以有占位符、通配符*/#*匹配一个单词、#匹配多个单词在Direct基础上加上模糊匹配 生产者
public class TopicExchange
{public static void Send(){var factory new ConnectionFactory();factory.HostName localhost;//RabbitMQ服务在本地运行factory.UserName guest;//用户名factory.Password guest;//密码 using (var connection factory.CreateConnection()){using (IModel channel connection.CreateModel()){//声明一个Topic类型的交换机channel.ExchangeDeclare(exchange: TopicExchange, type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null);channel.QueueDeclare(queue: ChinaQueue, durable: true, exclusive: false, autoDelete: false, arguments: null);channel.QueueDeclare(queue: newsQueue, durable: true, exclusive: false, autoDelete: false, arguments: null);channel.QueueBind(queue: ChinaQueue, exchange: TopicExchange, routingKey: China.#, arguments: null);channel.QueueBind(queue: newsQueue, exchange: TopicExchange, routingKey: #.news, arguments: null);{string message 来自中国的新闻消息。。。。;var body Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: TopicExchange, routingKey: China.news, basicProperties: null, body: body);Console.WriteLine($消息【{message}】已发送到队列);}{string message 来自中国的天气消息。。。。;var body Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: TopicExchange, routingKey: China.weather, basicProperties: null, body: body);Console.WriteLine($消息【{message}】已发送到队列);}{string message 来自美国的新闻消息。。。。;var body Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: TopicExchange, routingKey: usa.news, basicProperties: null, body: body);Console.WriteLine($消息【{message}】已发送到队列);}{string message 来自美国的天气消息。。。。;var body Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: TopicExchange, routingKey: usa.weather, basicProperties: null, body: body);Console.WriteLine($消息【{message}】已发送到队列);}}}}
}消费者
public class TopicExchange
{public static void Consumption(){var factory new ConnectionFactory();factory.HostName localhost;//RabbitMQ服务在本地运行factory.UserName guest;//用户名factory.Password guest;//密码 using (var connection factory.CreateConnection()){using (IModel channel connection.CreateModel()){channel.ExchangeDeclare(exchange: TopicExchange, type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null);channel.QueueDeclare(queue: ChinaQueue, durable: true, exclusive: false, autoDelete: false, arguments: null);channel.QueueBind(queue: ChinaQueue, exchange: TopicExchange, routingKey: China.#, arguments: null);//定义消费者 var consumer new EventingBasicConsumer(channel);consumer.Received (model, ea) {var body ea.Body;var message Encoding.UTF8.GetString(body.ToArray());Console.WriteLine($接收成功【{message}】);};//处理消息channel.BasicConsume(queue: ChinaQueue,autoAck: true,consumer: consumer);Console.WriteLine(对来自于中国的消息比较感兴趣的 消费者);}}}
}四、Headers 类型交换机 //规则headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对以及x-match参数x-match参数是字符串类型可以设置为any或者all。如果设置为any意思就是只要匹配到了headers表中的任何一对键值即可all则代表需要全部匹配。 生产者
public class HeaderExchange
{public static void Send(){var factory new ConnectionFactory();factory.HostName localhost;//RabbitMQ服务在本地运行factory.UserName guest;//用户名factory.Password guest;//密码 using (var connection factory.CreateConnection()){using (var channel connection.CreateModel()){//声明Headers类型的交换机HeaderExchangechannel.ExchangeDeclare(exchange: HeaderExchange, type: ExchangeType.Headers, durable: false, autoDelete: false, arguments: null);channel.QueueDeclare(queue: HeaderExchangeAllqueue, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.QueueDeclare(queue: HeaderExchangeAnyqueue, durable: false, exclusive: false, autoDelete: false, arguments: null);Console.WriteLine(生产者准备就绪....);//绑定的时候需要给arguments 指定一个字典的实例根据字典中的 { x-match,all/any},//如果{ x-match,all}, 发送消息的时候带的参数列表必须和arguments参数中除了x-match以外其他的必须都具备才能转发到对应的队列中去//如果{ x-match,any},发送消息的时候带的参数列表必须和arguments参数中除了x-match以外任何一个能够匹配就转发到该队列中去channel.QueueBind(queue: HeaderExchangeAllqueue, exchange: HeaderExchange, routingKey: string.Empty,arguments: new Dictionarystring, object {{ x-match,all},{ teacher,Richard},{ pass,123}});{string message teacher和pass都相同时发送的消息;IBasicProperties props channel.CreateBasicProperties();props.Headers new Dictionarystring, object() {{ teacher,Richard},{ pass,123}};var body Encoding.UTF8.GetBytes(message);//基本发布channel.BasicPublish(exchange: HeaderExchange,routingKey: string.Empty,basicProperties: props,body: body);Console.WriteLine($消息【{message}】已发送);}{string message teacher和pass有一个不相同时发送的消息;var props channel.CreateBasicProperties();props.Headers new Dictionarystring, object() {{ teacher,Richard},{ pass,234}};var body Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: HeaderExchange,routingKey: string.Empty,basicProperties: props,body: body);Console.WriteLine($消息【{message}】已发送);}Console.WriteLine(**************************************************************);{channel.QueueBind(queue: HeaderExchangeAnyqueue, exchange: HeaderExchange, routingKey: string.Empty,arguments: new Dictionarystring, object {{ x-match,any},{ teacher,Richard},{ pass,123},});string msg teacher和pass完全相同时发送的消息;var props channel.CreateBasicProperties();props.Headers new Dictionarystring, object() {{ teacher,Richard},{ pass,123}};var body Encoding.UTF8.GetBytes(msg);channel.BasicPublish(exchange: HeaderExchange,routingKey: string.Empty,basicProperties: props,body: body);Console.WriteLine($消息【{msg}】已发送);}{string msg teacher和pass有一个不相同时发送的消息;var props channel.CreateBasicProperties();props.Headers new Dictionarystring, object() {{ teacher,Richard},{ pass,234}};var body Encoding.UTF8.GetBytes(msg);channel.BasicPublish(exchange: HeaderExchange,routingKey: string.Empty,basicProperties: props,body: body);Console.WriteLine($消息【{msg}】已发送);}}}Console.ReadKey();}
}如果{ “x-match”,“all”}, 发送消息的时候带的参数列表必须和arguments参数中除了x-match以外其他的必须都具备才能转发到对应的队列中去
如果{ “x-match”,“any”},发送消息的时候带的参数列表必须和arguments参数中除了x-match以外任何一个能够匹配就转发到该队列中去
消费者
消费者同前面因为这种交换机是对发布者的限制