其实并不是实现session共享，而是通过redis的发布订阅，让集群中的所有服务器都尝试用自己持有的session发送一下消息。比如说userId的连接在第35台服务器上，集群共有100台服务器，那么当第1台服务器收到消息、需要通知userId时，并不是去找到第35台服务器，而是通知所有的服务器给userId发一条消息；其他99台服务器上没有userId的session，那就发送不成功。
1、配置redis
package com.kakarote.crm.config;import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kakarote.crm.constant.RedisConstants;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;import java.time.Duration;Configuration
public class CrmTemplateConfig {Value(${spring.redis.host})private String redisHost;Value(${spring.redis.port})private int redisPort;Value(${spring.redis.password})private String redisHasrdpwd;Value(${spring.redis.database})private Integer database;Bean(name crmRedisTemplate)public RedisTemplate redisTemplate() {RedisTemplateString, Object template new RedisTemplate();template.setConnectionFactory(connectionFactory(database, redisHost, redisPort, redisHasrdpwd));template.setKeySerializer(new StringRedisSerializer());template.setValueSerializer(new GenericJackson2JsonRedisSerializer());template.setHashKeySerializer(new StringRedisSerializer());template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());return template;}public RedisConnectionFactory connectionFactory(int database, String hostName, int port, String password) {RedisStandaloneConfiguration configuration new RedisStandaloneConfiguration();configuration.setHostName(hostName);configuration.setPort(port);if (StringUtils.isNotBlank(password)) {configuration.setPassword(password);}if (database ! 
0) {configuration.setDatabase(database);}GenericObjectPoolConfig genericObjectPoolConfig new GenericObjectPoolConfig();genericObjectPoolConfig.setMaxIdle(10);genericObjectPoolConfig.setMinIdle(10);genericObjectPoolConfig.setMaxTotal(100);genericObjectPoolConfig.setMaxWaitMillis(3000);LettuceClientConfiguration clientConfig LettucePoolingClientConfiguration.builder().commandTimeout(Duration.ofMillis(8000)).poolConfig(genericObjectPoolConfig).build();LettuceConnectionFactory lettuce new LettuceConnectionFactory(configuration, clientConfig);lettuce.afterPropertiesSet();return lettuce;}/*** Redis消息监听器容器* 这个容器加载了RedisConnectionFactory和消息监听器* 可以添加多个监听不同话题的redis监听器只需要把消息监听器和相应的消息订阅处理器绑定该消息监听器* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理** return redis消息监听容器*/BeanSuppressWarnings(all)public RedisMessageListenerContainer container(RedisMessageListener listener) {RedisMessageListenerContainer container new RedisMessageListenerContainer();// 监听所有库的key过期事件container.setConnectionFactory(connectionFactory(database, redisHost, redisPort, redisHasrdpwd));// 所有的订阅消息都需要在这里进行注册绑定,new PatternTopic(TOPIC_NAME1)表示发布的主题信息// 可以添加多个 messageListener配置不同的通道container.addMessageListener(listener, new PatternTopic(RedisConstants.WEBSOCKET_REDIS_TOPIC));/*** 设置序列化对象* 特别注意1. 发布的时候需要设置序列化订阅方也需要设置序列化* 2. 设置序列化对象必须放在[加入消息监听器]这一步后面否则会导致接收器接收不到消息*/Jackson2JsonRedisSerializer seria new Jackson2JsonRedisSerializer(Object.class);ObjectMapper objectMapper new ObjectMapper();objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);seria.setObjectMapper(objectMapper);container.setTopicSerializer(seria);return container;}
}
2、配置RedisMessageListener
package com.kakarote.crm.config;import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSONObject;
import com.kakarote.crm.constant.CrmConst;
import com.kakarote.crm.entity.BO.MessageDto;
import com.kakarote.crm.websocket.TransferCallWebSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;Slf4j
Component
public class RedisMessageListener implements MessageListener {Autowiredprivate RedisTemplateString, Object crmRedisTemplate;Overridepublic void onMessage(Message message, byte[] pattern) {// 接收的topiclog.info(RedisMessageListener-接收到消息1channel: new String(pattern));try {//序列化对象特别注意发布的时候需要设置序列化订阅方也需要设置序列化MessageDto messageDto (MessageDto) crmRedisTemplate.getValueSerializer().deserialize(message.getBody());log.info(RedisMessageListener-接收到消息2channel {}, messageDto {}, new String(pattern), messageDto);if(messageDto null){log.info(RedisMessageListener-messageDto null无消息进行发送! message {}, JSONUtil.toJsonStr(message));return;}if(CrmConst.NOTICE_MSG.equals(messageDto.getTitle())){JSONObject content messageDto.getContent();String toUserId content.getString(toUserId);String fromUserId content.getString(fromUserId);JSONObject msg content.getJSONObject(msg);String resp TransferCallWebSocket.sendMsgByUserId(fromUserId, toUserId, JSONUtil.toJsonStr(msg));if(!resp.equals(success)){log.info(RedisMessageListener-发送弹框消息,resp {},content {}, resp, content);}}}catch (Exception e){log.info(RedisMessageListener-监听消息处理失败失败原因 {}, e , e.getMessage(), e);}}
}
3、静态类
/**
 * Constants class.
 *
 * <p>Created: 2021/6/17 16:21
 */
public final class RedisConstants {

    /** UTF-8 character set name. */
    public static final String UTF8 = "UTF-8";

    /** Name of the Redis topic that the message listener container subscribes to. */
    public static final String WEBSOCKET_REDIS_TOPIC = "websocket_topic";

    public static final String TRANSFER_NOTICE = "transferCallNotice";

    public static final String NOTICE_MSG = "noticeMessage";

    /** Constants holder; not meant to be instantiated. */
    private RedisConstants() {
    }
}
// 4. Message body
package com.kakarote.crm.entity.BO;import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;AllArgsConstructor
NoArgsConstructor
Data
public class MessageDto implements Serializable {private String data;private String title;private JSONObject content;
}
5、业务类像通道发送消息 /*** 向通道发布消息*/public boolean convertAndSend(String channel, Object message) {if (StringUtil.isBlank(channel)) {return false;}try {crmRedisTemplate.convertAndSend(channel, message);log.info(发送消息成功channel{}message{}, channel, message);return true;} catch (Exception e) {log.info(发送消息失败channel{}message{}, 失败原因 {}, e , channel, message, e.getMessage(), e);e.printStackTrace();}return false;}6、websocket配置
Configuration
ComponentScan
EnableAutoConfiguration
public class WebSocketConfiguration implements ServletContextInitializer {Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}Beanpublic TaskScheduler taskScheduler(){ThreadPoolTaskScheduler taskScheduler new ThreadPoolTaskScheduler();taskScheduler.setPoolSize(10);taskScheduler.initialize();return taskScheduler;}Overridepublic void onStartup(ServletContext servletContext) throws ServletException {servletContext.addListener(WebAppRootListener.class);servletContext.setInitParameter(org.apache.tomcat.websocket.textBufferSize,52428800);servletContext.setInitParameter(org.apache.tomcat.websocket.binaryBufferSize,52428800);}
}7、websocket Controller类
ServerEndpoint(/crmDzhWebsocket/transferWebsocket/{userId})
Component
Slf4j
public class TransferCallWebSocket {/*** 当前在线连接数*/private static AtomicInteger onlineCount new AtomicInteger(0);/*** 用来存放每个客户端对应的 WebSocketServer 对象*/private static final ConcurrentHashMapString, Session webSocketMap new ConcurrentHashMap();/*** 与某个客户端的连接会话需要通过它来给客户端发送数据*/private Session session;/*** 接收 userId*/private String userIdKey ;/*** 连接建立成功调用的方法*/OnOpenpublic void onOpen(Session session, PathParam(userId) String userId) {this.session session;this.userIdKey userId;if (webSocketMap.containsKey(userId)) {webSocketMap.remove(userId);webSocketMap.put(userId, session);} else {webSocketMap.put(userId, session);addOnlineCount();}log.info(转接通知用户连接: userId ,当前总在线人数为: getOnlineCount());try {sendMessage(success);} catch (IOException e) {log.error(转接通知用户: userId ,网络异常!!!!!!);log.info(转接通知用户连接: userId ,网络异常!!!!!!);}}/*** 连接关闭调用的方法*/OnClosepublic void onClose() {if (webSocketMap.containsKey(userIdKey)) {webSocketMap.remove(userIdKey);subOnlineCount();}log.info(转接通知用户退出: userIdKey ,当前总在线人数为: getOnlineCount());}/*** 收到客户端消息后调用的方法** param message 客户端发送过来的消息*/OnMessagepublic void onMessage(String message, Session session) {try {if (ping.equals(message)) {webSocketMap.get(this.userIdKey).getBasicRemote().sendText(pong);return;}log.info(this.userIdKey {}, message {}, this.userIdKey, message);} catch (IOException e) {log.error(转接通知发送消息失败失败原因 {}, e , e.getMessage(), e);e.printStackTrace();}}public static String sendMsgByUserId(String fromUserId, String toUserId, String msg) throws IOException {if(webSocketMap.get(toUserId) ! 
null){try {webSocketMap.get(toUserId).getBasicRemote().sendText(msg);return success;}catch (Exception e){log.error(发送消息失败,fromUserId {}, toUserId {}, fromUserId, toUserId);return e.getMessage();}}return userId: toUserId 当前不在会话中;}/*** 发生错误时调用** param session* param error*/OnErrorpublic void onError(Session session, Throwable error) {log.info(用户错误: session.getId() ,原因: error.getMessage());}/*** 实现服务器主动推送*/public void sendMessage(String message) throws IOException {this.session.getBasicRemote().sendText(message);}public static synchronized AtomicInteger getOnlineCount() {return onlineCount;}public static synchronized void addOnlineCount() {TransferCallWebSocket.onlineCount.getAndIncrement();}public static synchronized void subOnlineCount() {TransferCallWebSocket.onlineCount.getAndDecrement();}}