游戏平台十大排名,南宁seo优化公司,wordpress 注册 中文,做企业网站要多少钱前言
Redis发布订阅#xff08;Pub/Sub#xff09;是Redis提供的一种消息传递机制#xff0c;它使用“发布者-订阅者”#xff08;publisher-subscriber#xff09;模式来处理消息传递。在这种模式下#xff0c;发布者将消息发布到一组订阅者中#xff0c;而无需关心谁…前言
Redis发布订阅Pub/Sub是Redis提供的一种消息传递机制它使用“发布者-订阅者”publisher-subscriber模式来处理消息传递。在这种模式下发布者将消息发布到一组订阅者中而无需关心谁是订阅者也不需要知道订阅者是否收到了消息。
发布者和订阅者模式允许多个客户端之间建立一个复杂的通信拓扑。在这种模式下发布者可以发布消息到一个特定的主题订阅者可以订阅一个或多个主题并在发布者发布消息时收到消息。由于发布者和订阅者不必直接连接因此发布者和订阅者可以完全独立地运行只要它们都连接到Redis实例即可。
Redis发布订阅支持多种消息类型包括文本、字节数组和数字等。 Redis还支持订阅者识别特定消息通过模式匹配功能可以基于主题模式或模式来检索消息。Redis还提供了许多API来帮助您实现发布/订阅模式因此您可以使用Redis的发布/订阅功能来构建分布式应用程序。
Redis Pub/Sub(发布/订阅) 命令
Redis发布/订阅Pub/Sub分为两种
第一种基于频道(Channel)的发布/订阅。第二种基于模式(pattern)的发布/订阅。
确实Redis提供了一系列的Pub/Sub命令来支持基于频道和基于模式的发布/订阅模式。以下是一些常用的Pub/Sub命令
基于频道的发布/订阅
发布消息到指定频道
PUBLISH channel message例如
PUBLISH my-channel Hello, Redis!这将向名为my-channel的频道发布消息Hello, Redis!。
订阅一个或多个频道
SUBSCRIBE channel channel ...例如
SUBSCRIBE my-channel your-channel这将订阅my-channel和your-channel两个频道。
取消订阅一个或多个频道
UNSUBSCRIBE [channel channel ...]例如
UNSUBSCRIBE my-channel your-channel这将取消订阅my-channel和your-channel两个频道。
基于模式的发布/订阅
订阅一个或多个匹配模式
PSUBSCRIBE pattern pattern ...例如
PSUBSCRIBE news-*这将订阅所有以news-开头的频道。
取消订阅一个或多个匹配模式
PUNSUBSCRIBE [pattern pattern ...]例如
PUNSUBSCRIBE news-*这将取消订阅所有以news-开头的频道。
注意Pub/Sub命令可以在客户端和服务器之间进行通信用于实现消息的发布和订阅。这些命令是异步执行的发送命令后订阅者将在接收到消息时收到通知。 Pub/Sub是一个强大的工具用于实现实时消息传递和事件通知。
实战示例
基于MessageListener实现
创建消息接收者
创建一个接收消息的Bean。
package com.example.demo.redis;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;/*** 实现MessageListener的监听类*/
Slf4j
Component
public class RedisMessageSubscriber implements MessageListener {Autowiredprivate MessageProcessor messageProcessor;Overridepublic void onMessage(Message message, byte[] pattern) {String channel new String(message.getChannel());String body new String(message.getBody());log.info( 当前执行的方法onMessage);// 处理消息messageProcessor.processMessage(channel, body);}}创建消息处理器
创建一个处理接收到的消息的Bean。
package com.example.demo.redis;
import org.springframework.stereotype.Service;Service
public class MessageProcessor {public void processMessage(String channel, String message) {System.out.println(Received message: message from channel: channel);// 在这里进行具体的消息处理逻辑}
}创建消息发送者
创建一个发送消息的Bean。
Redis有两种发布/订阅模式
基于频道(Channel)的发布/订阅基于模式(pattern)的发布/订阅
package com.example.demo.redis;// RedisMessagePublisher.javaimport org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;Component
public class RedisMessagePublisher {Autowiredprivate RedisTemplateString, String redisTemplate;public void publishMessage(String channel, String message) {// 基于模式(pattern)的发布/订阅// redisTemplate.convertAndSend(your-pattern-channel-1, Hello, Redis!);// 基于频道(Channel)的发布/订阅redisTemplate.convertAndSend(channel, message);}
}使用消息发送者发送消息
在需要发送消息的地方注入RedisMessagePublisher并使用它来发送消息。
package com.example.demo.redis;// MessageController.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;RestController
RequestMapping(/api/message)
public class MessageController {Autowiredprivate RedisMessagePublisher messagePublisher;GetMapping(/send)public String sendMessage(RequestParam(value channel) String channel , RequestParam(value messgage) String messgage) {messagePublisher.publishMessage(your-channel, Hello, Redis!);messagePublisher.publishMessage(channel, messgage);return Message sent successfully!;}
}相关原理说明
发布/订阅模型 Redis提供了一种发布/订阅Pub/Sub模型其中消息发送者发布者将消息发送到一个或多个通道而消息接收者订阅者则监听一个或多个通道以接收消息。消息监听器 RedisMessageSubscriber 实现了 MessageListener 接口它监听指定通道上的消息。在这里我们将接收到的消息传递给 MessageProcessor 进行处理。消息处理器 MessageProcessor 是一个简单的服务用于处理接收到的消息。在实际应用中你可以在这里添加业务逻辑来处理消息。消息发布者 RedisMessagePublisher 用于发布消息到指定的通道。在 sendMessage 方法中我们使用 convertAndSend 方法将消息发送到名为 “your-channel” 的通道。消息发送端点 MessageController 是一个简单的REST控制器用于触发消息发送。在这里我们通过调用 messagePublisher.publishMessage 来发送消息。
总体来说这个实现充分利用了Redis的发布/订阅功能通过将消息发送者、消息接收者和消息处理器分离使系统更加模块化和灵活。
RedisConfig配置
package com.example.demo.redis;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;Slf4j
Configuration
public class RedisConfig {/*** 配置订阅* 基于MessageListenerAdapter和RedisMessageListenerContainer* param redisMessageSubscriber* return*/Beanpublic MessageListenerAdapter messageListenerAdapter(RedisMessageSubscriber redisMessageSubscriber) {return new MessageListenerAdapter(redisMessageSubscriber, handleMessage);}Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory,MessageListenerAdapter messageListenerAdapter) {RedisMessageListenerContainer container new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 添加消息监听器和监听的频道// 基于频道的发布/订阅container.addMessageListener(messageListenerAdapter, new ChannelTopic(your-channel));// 基于模式的发布/订阅container.addMessageListener(messageListenerAdapter, new PatternTopic(your-pattern-*));return container;}/*** 基于MessageListener的配置* 直接使用RedisMessageSubscriber* param connectionFactory* param redisMessageSubscriber* return*/Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory, RedisMessageSubscriber redisMessageSubscriber) {RedisMessageListenerContainer container new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 在这里设置你的动态频道名称可以从配置文件或其他地方获取String dynamicChannel your-channel;// 基于频道的发布/订阅ChannelTopic channelTopic new ChannelTopic(dynamicChannel);// 基于模式的发布/订阅// container.addMessageListener(redisMessageSubscriber, new PatternTopic(your-pattern-*));// 添加消息监听器和监听的动态频道container.addMessageListener(redisMessageSubscriber, channelTopic);return container;}// /**
// * 多频道示例
// * param connectionFactory
// * param redisMessageSubscriber
// * return
// */
// Bean
// public RedisMessageListenerContainer redisMessageListenerContainerMoreTopic(
// RedisConnectionFactory connectionFactory,
// RedisMessageSubscriber redisMessageSubscriber
// ) {
// RedisMessageListenerContainer container new RedisMessageListenerContainer();
// container.setConnectionFactory(connectionFactory);
//
// // 在这里设置你的动态频道名称列表可以从配置文件或其他地方获取
// ListString dynamicChannels Arrays.asList(channel1, channel2, channel3);
//
// // 创建包含所有频道的ChannelTopic列表
// ListChannelTopic channelTopics createChannelTopics(dynamicChannels);
//
// // 添加消息监听器和监听的多个频道
// for (ChannelTopic channelTopic : channelTopics) {
// container.addMessageListener(redisMessageSubscriber, channelTopic);
// }
//
// return container;
// }
//
// private ListChannelTopic createChannelTopics(ListString channelNames) {
// // 使用动态频道名称创建ChannelTopic列表
// ListChannelTopic channelTopics new ArrayList();
// for (String channelName : channelNames) {
// channelTopics.add(new ChannelTopic(channelName));
// }
// return channelTopics;
// }
}这段代码是用于配置并创建 RedisMessageListenerContainer 的方法。RedisMessageListenerContainer 是 Spring Data Redis 提供的一个用于监听 Redis 消息的容器。以下是对代码的详细解释
方法签名
RedisConnectionFactory connectionFactory这是用于创建 Redis 连接的工厂。RedisMessageSubscriber redisMessageSubscriber这是一个 Redis 消息订阅者用于处理接收到的消息。
创建 RedisMessageListenerContainer 实例
RedisMessageListenerContainer container new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);创建一个新的 RedisMessageListenerContainer 实例。将 connectionFactory 设置为容器的连接工厂用于创建连接到 Redis 的连接。
设置动态频道名称
String dynamicChannel your-channel;定义一个动态频道名称可以从配置文件或其他地方获取。
创建 ChannelTopic 对象
ChannelTopic channelTopic new ChannelTopic(dynamicChannel);创建一个 ChannelTopic 对象表示基于频道的发布/订阅其中 dynamicChannel 是频道名称。
添加消息监听器和频道
container.addMessageListener(redisMessageSubscriber, channelTopic);将 redisMessageSubscriber 添加为消息监听器用于处理接收到的消息。指定要监听的频道这里使用了基于频道的发布/订阅模式。
返回 RedisMessageListenerContainer 实例
return container;返回配置好的 RedisMessageListenerContainer 实例。
通过以上步骤这段代码的目的是创建一个配置好的 RedisMessageListenerContainer该容器已设置好连接工厂、消息监听器以及要监听的动态频道。当 Redis 中的指定频道发布消息时redisMessageSubscriber 的 onMessage 方法将被调用来处理消息。这是一种基于频道的发布/订阅模式允许应用程序实时地接收并处理消息。
自定义监听的回调函数
创建消息接收者
不需要实现MessageListener接口
package com.example.demo.redis;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** 自定义的监听类*/
Slf4j
Component
public class RedisMessageCustomSubscriber {Autowiredprivate MessageProcessor messageProcessor;/*** 自定义的回调函数* param message* param channel*/public void handleMessage(String message, String channel) {log.info( 当前执行的方法handleMessage);messageProcessor.processMessage(channel, message);}
}RedisConfig配置
package com.example.demo.redis;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;Slf4j
Configuration
public class RedisConfig {/*** 配置订阅* 基于MessageListenerAdapter和RedisMessageListenerContainer* param redisMessageSubscriber* return*/Beanpublic MessageListenerAdapter messageListenerAdapter(RedisMessageSubscriber redisMessageSubscriber) {return new MessageListenerAdapter(redisMessageSubscriber, handleMessage);}Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory,MessageListenerAdapter messageListenerAdapter) {RedisMessageListenerContainer container new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 添加消息监听器和监听的频道// 基于频道的发布/订阅container.addMessageListener(messageListenerAdapter, new ChannelTopic(your-channel));// 基于模式的发布/订阅container.addMessageListener(messageListenerAdapter, new PatternTopic(your-pattern-*));return container;}// /**
// * 基于MessageListener的配置
// * 直接使用RedisMessageSubscriber
// * param connectionFactory
// * param redisMessageSubscriber
// * return
// */
// Bean
// public RedisMessageListenerContainer redisMessageListenerContainer(
// RedisConnectionFactory connectionFactory, RedisMessageSubscriber redisMessageSubscriber) {
//
// RedisMessageListenerContainer container new RedisMessageListenerContainer();
// container.setConnectionFactory(connectionFactory);
//
// // 在这里设置你的动态频道名称可以从配置文件或其他地方获取
// String dynamicChannel your-channel;
//
// // 基于频道的发布/订阅
// ChannelTopic channelTopic new ChannelTopic(dynamicChannel);
//
// // 基于模式的发布/订阅
// // container.addMessageListener(redisMessageSubscriber, new PatternTopic(your-pattern-*));
//
// // 添加消息监听器和监听的动态频道
// container.addMessageListener(redisMessageSubscriber, channelTopic);
//
// return container;
// }// /**
// * 多频道示例
// * param connectionFactory
// * param redisMessageSubscriber
// * return
// */
// Bean
// public RedisMessageListenerContainer redisMessageListenerContainerMoreTopic(
// RedisConnectionFactory connectionFactory,
// RedisMessageSubscriber redisMessageSubscriber
// ) {
// RedisMessageListenerContainer container new RedisMessageListenerContainer();
// container.setConnectionFactory(connectionFactory);
//
// // 在这里设置你的动态频道名称列表可以从配置文件或其他地方获取
// ListString dynamicChannels Arrays.asList(channel1, channel2, channel3);
//
// // 创建包含所有频道的ChannelTopic列表
// ListChannelTopic channelTopics createChannelTopics(dynamicChannels);
//
// // 添加消息监听器和监听的多个频道
// for (ChannelTopic channelTopic : channelTopics) {
// container.addMessageListener(redisMessageSubscriber, channelTopic);
// }
//
// return container;
// }
//
// private ListChannelTopic createChannelTopics(ListString channelNames) {
// // 使用动态频道名称创建ChannelTopic列表
// ListChannelTopic channelTopics new ArrayList();
// for (String channelName : channelNames) {
// channelTopics.add(new ChannelTopic(channelName));
// }
// return channelTopics;
// }
}这段代码配置了两个 Bean 方法一个用于创建 MessageListenerAdapter 实例另一个用于创建 RedisMessageListenerContainer 实例。以下是详细解释
创建 MessageListenerAdapter 实例
Bean
public MessageListenerAdapter messageListenerAdapter(RedisMessageSubscriber redisMessageSubscriber) {return new MessageListenerAdapter(redisMessageSubscriber, handleMessage);
}
通过 Bean 注解创建一个 MessageListenerAdapter 实例。将 RedisMessageSubscriber 对象传递给构造函数表示这个适配器将调用 RedisMessageSubscriber 的方法来处理消息。第二个参数 handleMessage 表示要调用的消息处理方法的名称。
创建 RedisMessageListenerContainer 实例
Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory,MessageListenerAdapter messageListenerAdapter
) {RedisMessageListenerContainer container new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);通过 Bean 注解创建一个 RedisMessageListenerContainer 实例。将 RedisConnectionFactory 传递给构造函数表示这个容器将使用的连接工厂。
添加消息监听器和频道
// 添加消息监听器和监听的频道
// 基于频道的发布/订阅
container.addMessageListener(messageListenerAdapter, new ChannelTopic(your-channel));// 基于模式的发布/订阅
container.addMessageListener(messageListenerAdapter, new PatternTopic(your-pattern-*));
使用 container.addMessageListener 方法添加消息监听器messageListenerAdapter。第一个参数是消息监听器适配器它会调用 RedisMessageSubscriber 的 handleMessage 方法来处理消息。第二个参数是 ChannelTopic 对象表示基于频道的发布/订阅模式监听指定的频道。第三个参数是 PatternTopic 对象表示基于模式的发布/订阅模式监听指定模式的频道。
返回 RedisMessageListenerContainer 实例
return container;返回配置好的 RedisMessageListenerContainer 实例。
通过这样的配置RedisMessageListenerContainer 已经设置好了连接工厂和消息监听器并分别基于频道和基于模式的发布/订阅模式来监听相应的消息。当 Redis 中的指定频道发布消息时handleMessage 方法将被调用来处理消息。
区别
MessageListenerAdapter 和 RedisMessageListenerContainer 是 Spring Data Redis 提供的两个重要组件用于实现 Redis 消息监听的机制。
MessageListenerAdapter
MessageListenerAdapter 是一个适配器用于将普通的 Java 对象POJO转换为 Redis 消息监听器。它通过反射调用目标对象的方法来处理接收到的消息。在你的 POJO 类中你可以定义一个或多个方法来处理不同类型的消息。
主要特点和用法
将普通的 Java 对象转换为 Redis 的消息监听器。可以指定调用目标对象的特定方法来处理消息。通过设置消息转换器可以支持多种消息格式如 JSON、XML 等。提供了一种简化消息处理逻辑的方式避免了直接实现 MessageListener 接口的繁琐性。
RedisMessageListenerContainer
RedisMessageListenerContainer 是 Spring 提供的容器用于管理 Redis 消息的监听器。它可以注册一个或多个消息监听器并在接收到消息时调用相应的处理方法。该容器还负责管理连接到 Redis 的连接工厂以及监听的频道或模式。
主要特点和用法
管理 Redis 连接工厂确保连接的创建和关闭。注册消息监听器并在接收到消息时调用相应的处理方法。支持基于频道和基于模式的发布/订阅模式。提供了灵活的配置选项如消息转换器、错误处理器等。
总体而言MessageListenerAdapter 和 RedisMessageListenerContainer 是一对重要的组件它们使得在 Spring 应用中实现 Redis 消息监听变得更加简单和灵活。