网站建设在线商城,莱芜金点子招工启事,个人微信小程序开发,北京市专业网站制作企业声明
本文提炼于个人练手项目#xff0c;其中的实现逻辑不一定标准#xff0c;实现思路没有参考权威的文档和教程#xff0c;仅为个人思考得出#xff0c;因此可能存在较多本人未考虑到的情况和漏洞#xff0c;因此仅供参考#xff0c;如果大家觉得有问题#xff0c;恳…声明
本文提炼于个人练手项目其中的实现逻辑不一定标准实现思路没有参考权威的文档和教程仅为个人思考得出因此可能存在较多本人未考虑到的情况和漏洞因此仅供参考如果大家觉得有问题恳请大家指出有问题的地方
如果对客户端的实现感兴趣可以转身查看【UniApp开发小程序】私聊功能uniapp界面实现 (买家、卖家 沟通商品信息)【后端基于若依管理系统开发】
聊天数据查询管理
数据库设计
【私信表】
Vo
package com.ruoyi.common.core.domain.vo;import lombok.Data;import java.util.Date;/*** Author dam* create 2023/8/22 21:39*/
Data
public class ChatUserVo {private Long userId;private String userAvatar;private String userName;private String userNickname;/*** 最后一条消息的内容*/private String lastChatContent;/*** 最后一次聊天的日期*/private Date lastChatDate;/*** 未读消息数量*/private Integer unReadChatNum;
}
Controller
其中两个方法较为重要介绍如下
listChatUserVo当用户进入消息界面的时候需要查询出最近聊天的用户其中还需要展示一些信息如ChatUserVo的属性listChat该方法用于查询对方最近和自己的私聊内容当用户查询了这些私聊内容默认用户已经看过了将这些私聊内容设置为已读状态
package com.shm.controller;import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletResponse;import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.ruoyi.common.core.domain.entity.Chat;
import com.ruoyi.common.core.domain.vo.ChatUserVo;
import com.shm.service.IChatService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.ruoyi.common.annotation.Log;
import com.ruoyi.common.core.controller.BaseController;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.enums.BusinessType;
import com.ruoyi.common.utils.poi.ExcelUtil;
import com.ruoyi.common.core.page.TableDataInfo;/*** 聊天数据Controller** author dam* date 2023-08-19*/
RestController
RequestMapping(/market/chat)
Api
public class ChatController extends BaseController {Autowiredprivate IChatService chatService;/*** 查询聊天数据列表*/PreAuthorize(ss.hasPermi(market:chat:list))GetMapping(/list)public TableDataInfo list(Chat chat) {startPage();ListChat list chatService.list(new QueryWrapperChat(chat));return getDataTable(list);}/*** 查询最近和自己聊天的用户*/ApiOperation(listChatUserVo)PreAuthorize(ss.hasPermi(market:chat:list))GetMapping(/listChatUserVo)public TableDataInfo listChatUserVo() {startPage();String username getLoginUser().getUsername();ListChatUserVo list chatService.listChatUserVo(username);return getDataTable(list);}/*** 查询用户和自己最近的聊天信息*/ApiOperation(listUsersChatWithMe)PreAuthorize(ss.hasPermi(market:chat:list))GetMapping(/listChat/{toUsername})public TableDataInfo listChat(PathVariable(toUsername) String toUsername) {String curUsername getLoginUser().getUsername();startPage();ListChat list chatService.listChat(curUsername, toUsername);for (Chat chat : list) {System.out.println(chat:chat.toString());}System.out.println();// 查出的数据如果消息是对方发的且是未读状态重新设置为已读ListLong unReadIdList list.stream().filter((item1) - {if (item1.getIsRead() 0 item1.getFromWho().equals(toUsername)) {return true;} else {return false;}}).map(item2 - {return item2.getId();}).collect(Collectors.toList());System.out.println(将 unReadIdList.toString()设置为已读);if (unReadIdList.size() 0) {// 批量设置私聊为已读状态chatService.batchRead(unReadIdList);}return getDataTable(list);}/*** 导出聊天数据列表*/PreAuthorize(ss.hasPermi(market:chat:export))Log(title 聊天数据, businessType BusinessType.EXPORT)PostMapping(/export)public void export(HttpServletResponse response, Chat chat) {ListChat list chatService.list(new QueryWrapperChat(chat));ExcelUtilChat util new ExcelUtilChat(Chat.class);util.exportExcel(response, list, 聊天数据数据);}/*** 获取聊天数据详细信息*/PreAuthorize(ss.hasPermi(market:chat:query))GetMapping(value /getInfo/{id})public AjaxResult getInfo(PathVariable(id) Long id) {return success(chatService.getById(id));}/*** 新增聊天数据*/PreAuthorize(ss.hasPermi(market:chat:add))Log(title 聊天数据, businessType BusinessType.INSERT)PostMappingpublic AjaxResult add(RequestBody Chat chat) {return toAjax(chatService.save(chat));}/*** 修改聊天数据*/PreAuthorize(ss.hasPermi(market:chat:edit))Log(title 聊天数据, businessType BusinessType.UPDATE)PutMappingpublic AjaxResult edit(RequestBody Chat chat) {return toAjax(chatService.updateById(chat));}/*** 删除聊天数据*/PreAuthorize(ss.hasPermi(market:chat:remove))Log(title 聊天数据, businessType BusinessType.DELETE)DeleteMapping(/{ids})public AjaxResult remove(PathVariable ListLong ids) {return toAjax(chatService.removeByIds(ids));}
}
Service
package com.shm.service.impl;import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.ruoyi.common.core.domain.entity.Chat;
import com.ruoyi.common.core.domain.vo.ChatUserVo;
import com.shm.mapper.ChatMapper;
import com.shm.service.IChatService;
import org.springframework.stereotype.Service;import java.util.List;/*** author 17526* description 针对表【chat(聊天数据表)】的数据库操作Service实现* createDate 2023-08-19 21:12:49*/
Service
public class IChatServiceImpl extends ServiceImplChatMapper, Chatimplements IChatService {/*** 查询最近和自己聊天的用户** return*/Overridepublic ListChatUserVo listChatUserVo(String username) {return baseMapper.listChatUserVo(username);}/*** 查询用户和自己最近的聊天信息** param curUsername* param toUsername* return*/Overridepublic ListChat listChat(String curUsername, String toUsername) {return baseMapper.listChat(curUsername, toUsername);}Overridepublic void batchRead(ListLong unReadIdList) {baseMapper.batchRead(unReadIdList);}
}Mapper
package com.shm.mapper;import com.ruoyi.common.core.domain.entity.Chat;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.ruoyi.common.core.domain.vo.ChatUserVo;
import org.apache.ibatis.annotations.Param;import java.util.List;/**
* author 17526
* description 针对表【chat(聊天数据表)】的数据库操作Mapper
* createDate 2023-08-19 21:12:49
* Entity com.ruoyi.common.core.domain.entity.Chat
*/
public interface ChatMapper extends BaseMapperChat {ListChatUserVo listChatUserVo(Param(username) String username);ListChat listChat(Param(curUsername) String curUsername, Param(toUsername) String toUsername);void batchRead(Param(unReadIdList) ListLong unReadIdList);
}【xml文件】
?xml version1.0 encodingUTF-8?
!DOCTYPE mapperPUBLIC -//mybatis.org//DTD Mapper 3.0//ENhttp://mybatis.org/dtd/mybatis-3-mapper.dtd
mapper namespacecom.shm.mapper.ChatMapperresultMap idBaseResultMap typecom.ruoyi.common.core.domain.entity.Chatid propertyid columnid jdbcTypeBIGINT/result propertycreateTime columncreate_time jdbcTypeTIMESTAMP/result propertyupdateTime columnupdate_time jdbcTypeTIMESTAMP/result propertyisDeleted columnis_deleted jdbcTypeTINYINT/result propertyfromWho columnfrom_who jdbcTypeBIGINT/result propertytoWho columnto_who jdbcTypeBIGINT/result propertycontent columncontent jdbcTypeVARCHAR/result propertypicUrl columnpic_url jdbcTypeVARCHAR//resultMapsql idBase_Column_Listid,create_time,update_time,is_deleted,from,to,content,pic_url/sqlupdate idbatchReadupdate chat set is_read 1 where id inforeach collectionunReadIdList itemchatId separator, open( close)#{chatId}/foreach/updateselect idlistChatUserVo resultTypecom.ruoyi.common.core.domain.vo.ChatUserVoSELECT(CASE WHEN c.from_who#{username} THEN c.to_who ELSE c.from_who END) AS userName,c.content AS lastChatContent,c.create_time AS lastChatDate,u.user_id AS userId,u.avatar AS userAvatar,u.nick_name AS userNickname,ur.unReadNum as unReadChatNumFROM(SELECTMAX(id) AS chatId,CASEWHEN from_who #{username} THEN to_whoELSE from_whoEND AS unameFROM chatWHERE from_who #{username} OR to_who #{username}GROUP BY uname) AS tINNER JOIN chat c ON c.id t.chatIdLEFT JOIN sys_user u ON t.uname u.user_nameLEFT JOIN (SELECT from_who, SUM(CASE WHEN is_read1 THEN 0 ELSE 1 END) AS unReadNum FROM chat WHERE is_deleted0 AND to_who #{username} GROUP BY from_who) ur ON ur.from_who t.unameORDER BY c.create_time DESC/selectselect idlistChat resultTypecom.ruoyi.common.core.domain.entity.ChatSELECT*FROMchatWHERE( from_who #{curUsername} AND to_who #{toUsername} )OR ( to_who #{curUsername} AND from_who #{toUsername} )ORDER BYcreate_time DESC/select
/mapper
【查询最近聊天的用户的用户名和那条消息的id】 因为id是自增的所以最新的那条消息的id肯定最大因此可以使用MAX(id)来获取最近的消息
SELECT MAX(id) AS chatId,CASE WHEN from_who admin THEN to_whoELSE from_whoEND AS unameFROM chatWHERE from_who admin OR to_who adminGROUP BY uname【内连接私信表获取消息的其他信息】
INNER JOIN chat c ON c.id t.chatId 【左连接用户表获取用户的相关信息】
LEFT JOIN sys_user u ON t.uname u.user_name【左联接私信表获取未读对方消息的数量】 CASE WHEN is_read1 THEN 0 ELSE 1 END 如果已读说明未读数量为0否则为1
LEFT JOIN (SELECT from_who, SUM(CASE WHEN is_read1 THEN 0 ELSE 1 END) AS unReadNum FROM chat WHERE is_deleted0 AND to_who admin GROUP BY from_who) ur ON ur.from_who t.uname【最后按照用户和自己最后聊天的时间来降序排序】
ORDER BY c.create_time DESCWebSocket引入
为什么使用WebSocket
WebSocket不仅支持客户端向服务端发送消息同时也支持服务端向客户端发送消息这样才能完成私聊的功能。即 用户1--服务端--用户2
依赖
!-- websocket --
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-websocket/artifactId
/dependency配置类
package com.shm.config;import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;Configuration
public class WebSocketConfig {/*** 注入一个ServerEndpointExporter,* 该Bean会自动注册使用ServerEndpoint注解 声明的websocket endpoint*/Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}}WebSocket服务
需要注意的是Websocket是多例模式无法直接使用Autowired注解来注入rabbitTemplate需要使用下面的方式其中rabbitTemplate为静态变量
private static RabbitTemplate rabbitTemplate;Autowiredpublic void setRabbitTemplate(RabbitTemplate rabbitTemplate) {WebSocketServer.rabbitTemplate rabbitTemplate;}package com.shm.component;import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.common.core.domain.entity.Chat;
import com.shm.component.delay.DelayQueueManager;
import com.shm.component.delay.DelayTask;
import com.shm.constant.RabbitMqConstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** author websocket服务*/
ServerEndpoint(value /websocket/{username})
Component//将WebSocketServer注册为spring的一个bean
public class WebSocketServer {private static final Logger log LoggerFactory.getLogger(WebSocketServer.class);/*** 记录当前在线连接的客户端的session*/public static final MapString, Session usernameAndSessionMap new ConcurrentHashMap();/*** 记录正在进行的聊天的发出者和接收者*/public static final MapString, Integer fromToMap new ConcurrentHashMap();/*** 用户Session保留时间如果超过该时间用户还没有给服务端发送消息认为用户下线删除其Session* 注意该时间需要比客户端的心跳时间更长*/private static final long expire 6000;// websocket为多例模式无法直接注入需要换成下面的方式
// Autowired
// RabbitTemplate rabbitTemplate;private static RabbitTemplate rabbitTemplate;Autowiredpublic void setRabbitTemplate(RabbitTemplate rabbitTemplate) {WebSocketServer.rabbitTemplate rabbitTemplate;}Autowiredprivate static DelayQueueManager delayQueueManager;Autowiredpublic void setDelayQueueManager(DelayQueueManager delayQueueManager) {WebSocketServer.delayQueueManager delayQueueManager;}/*** 浏览器和服务端连接建立成功之后会调用这个方法*/OnOpenpublic void onOpen(Session session, PathParam(username) String username) {usernameAndSessionMap.put(username, session);// 建立延时任务如果到expire时间客户端还是没有和服务器有任何交互的话就删除该用户的session表示该用户下线delayQueueManager.put(new DelayTask(username, expire));log.info(有新用户加入username{}, 当前在线人数为{}, username, usernameAndSessionMap.size());}/*** 连接关闭调用的方法*/OnClosepublic void onClose(Session session, PathParam(username) String username) {usernameAndSessionMap.remove(username);log.info(有一连接关闭移除username{}的用户session, 当前在线人数为{}, username, usernameAndSessionMap.size());}/*** 发生错误的时候会调用这个方法*/OnErrorpublic void onError(Session session, Throwable error) {log.error(发生错误);error.printStackTrace();}/*** 服务端发送消息给客户端*/public void sendMessage(String message, Session toSession) {try {log.info(服务端给客户端[{}]发送消息{}, toSession.getId(), message);toSession.getBasicRemote().sendText(message);} catch (Exception e) {log.error(服务端发送消息给客户端失败, e);}}/*** onMessage方法是一个消息的中转站* 1、首先接受浏览器端socket.send发送过来的json数据* 2、然后解析其数据找到消息要发送给谁* 3、最后将数据发送给相应的人** param message 客户端发送过来的消息 数据格式{from:user1,to:admin,text:你好呀}*/OnMessagepublic void onMessage(String message, Session session, PathParam(username) String username) {
// log.info(服务端接收到 {} 的消息消息内容是:{}, username, message);// 收到用户的信息删除之前的延时任务创建新的延时任务delayQueueManager.put(new DelayTask(username, expire));if (!usernameAndSessionMap.containsKey(username)) {// 可能用户挂机了一段时间被下线了后面又重新回来发信息了需要重新将用户和session添加字典中usernameAndSessionMap.put(username, session);}// 将json字符串转化为json对象JSONObject obj JSON.parseObject(message);String status (String) obj.get(status);// 获取消息的内容String text (String) obj.get(text);// 查看消息要发送给哪个用户String to (String) obj.get(to);String fromToKey username - to;String toFromKey to - username;if (status ! null) {if (status.equals(start)) {fromToMap.put(fromToKey, 1);} else if (status.equals(end)) {System.out.println(移除销毁的fromToKey: fromToKey);fromToMap.remove(fromToKey);} else if (status.equals(ping)) {// 更新用户对应的时间戳
// usernameAndTimeStampMap.put(username, System.currentTimeMillis());}} else {// 封装数据发送给消息队列Chat chat new Chat();chat.setFromWho(username);chat.setToWho(to);chat.setContent(text);chat.setIsRead(0);// chat.setPicUrl();// 根据to来获取相应的session然后通过session将消息内容转发给相应的用户Session toSession usernameAndSessionMap.get(to);if (toSession ! null) {JSONObject jsonObject new JSONObject();// 设置消息来源的用户名jsonObject.put(from, username);// 设置消息内容jsonObject.put(text, text);// 服务端发送消息给目标客户端this.sendMessage(jsonObject.toString(), toSession);log.info(发送消息给用户 {} 消息内容是{} , toSession, jsonObject.toString());if (fromToMap.containsKey(toFromKey)) {chat.setIsRead(1);}} else {log.info(发送失败未找到用户 {} 的session, to);}rabbitTemplate.convertAndSend(RabbitMqConstant.CHAT_STORAGE_EXCHANGE, RabbitMqConstant.CHAT_STORAGE_ROUTER_KEY, chat);}}}RabbitMQ引入
为什么使用消息队列
在用户之间进行聊天的时候需要将用户的聊天数据存储到数据库中但是如果大量用户同时在线的话可能同一时间发送的消息数量太多如果同时将这些消息存储到数据库中会给数据库带来较大的压力使用RabbitMQ可以先把要存储的数据放到消息队列然后数据库服务器压力没这么大的时候就会从消息队列中获取数据来存储这样可以分散数据库的压力。但是如果用户是直接从数据库获取消息的话消息可能有一定的延迟如果用户之间正在聊天的话消息则不会延迟因为聊天内容会立刻通过WebSocket发送给对方。
依赖
!-- rabbitMQ--
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency启动类添加注解
在启动类的上方添加EnableRabbit注解
常量类
因为有多处会使用到队列命名等信息创建一个常量类来保存相关信息
package com.shm.constant;public class RabbitMqConstant {public static final String CHAT_STORAGE_QUEUE shm.chat-storage.queue;public static final String CHAT_STORAGE_EXCHANGE shm.chat-storage-event-exchange;public static final String CHAT_STORAGE_ROUTER_KEY shm.chat-storage.register;
}
使用配置类创建队列、交换机、绑定关系
package com.shm.config;import com.shm.constant.RabbitMqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class MyRabbitConfig {/*** 使用JSON序列化机制进行消息转换* return*/Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}/*** 私信存储队列** return*/Beanpublic Queue chatStorageQueue() {Queue queue new Queue(RabbitMqConstant.CHAT_STORAGE_QUEUE, true, false, false);return queue;}/*** 私信存储交换机* 创建交换机由于只需要一个队列创建direct交换机** return*/Beanpublic Exchange chatStorageExchange() {//durable持久化return new DirectExchange(RabbitMqConstant.CHAT_STORAGE_EXCHANGE, true, false);}/*** 创建私信存储 交换机和队列的绑定关系** return*/Beanpublic Binding chatStorageBinding() {return new Binding(RabbitMqConstant.CHAT_STORAGE_QUEUE,Binding.DestinationType.QUEUE,RabbitMqConstant.CHAT_STORAGE_EXCHANGE,RabbitMqConstant.CHAT_STORAGE_ROUTER_KEY,null);}}消息监听器
创建一个消息监听类来监听队列的消息然后调用相关的逻辑来处理信息本文主要的处理是将私信内容存储到数据库中
package com.shm.listener;import com.rabbitmq.client.Channel;
import com.ruoyi.common.core.domain.entity.Chat;
import com.shm.constant.RabbitMqConstant;
import com.shm.service.IChatService;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.io.IOException;Service
/*** 注意类上面需要RabbitListener注解*/
RabbitListener(queues RabbitMqConstant.CHAT_STORAGE_QUEUE)
public class ChatStorageListener {Autowiredprivate IChatService chatService;RabbitHandlerpublic void handleStockLockedRelease(Chat chat, Message message, Channel channel) throws IOException {try {boolean save chatService.save(chat);//解锁成功手动确认消息才从MQ中删除channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {//只要有异常拒绝消息让消息重新返回队列让别的消费者继续解锁channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}}}
发送消息到消息队列
WebSocketServer为Websocket后端服务代码其中的onMessage方法会接受客户端发送过来的消息当接收到消息的时候将消息发送给消息队列
// 封装数据发送给消息队列
Chat chat new Chat();
chat.setFromWho(username);
chat.setToWho(to);
chat.setContent(text);
chat.setPicUrl();
rabbitTemplate.convertAndSend(RabbitMqConstant.CHAT_STORAGE_EXCHANGE,RabbitMqConstant.CHAT_STORAGE_ROUTER_KEY,chat);延时任务
为什么使用延时任务
为了更好地感知用户的在线状态在用户连接了WebSocket或者发送消息之后建立一个延时任务如果到达了所设定的延时时间就删除用户的Session认为用户已经下线如果在延时期间之内用户发送了新消息或者发送了心跳信号证明该用户还处于在线状态删除前面的延时任务并创建新的延时任务
延时任务类
package com.shm.component.delay;import lombok.Data;
import lombok.Getter;import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;/*** Author dam* create 2023/8/25 15:12*/
Getter
public class DelayTask implements Delayed {/*** 用户名*/private final String userName;/*** 任务的真正执行时间*/private final long executeTime;/*** 任务延时多久执行*/private final long expire;/*** param expire 任务需要延时的时间*/public DelayTask(String userName, long expire) {this.userName userName;this.executeTime expire System.currentTimeMillis();this.expire expire;}/*** 根据给定的时间单位返回与此对象关联的剩余延迟时间* * param unit the time unit 时间单位* return 返回剩余延迟零值或负值表示延迟已经过去*/Overridepublic long getDelay(TimeUnit unit) {return unit.convert(this.executeTime - System.currentTimeMillis(), unit);}Overridepublic int compareTo(Delayed o) {return 0;}
}
延时任务管理
package com.shm.component.delay;import com.shm.component.WebSocketServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executors;/*** Author dam* create 2023/8/25 15:12*/
Component
Slf4j
public class DelayQueueManager implements CommandLineRunner {private final DelayQueueDelayTask delayQueue new DelayQueue();private final MapString, DelayTask usernameAndDelayTaskMap new ConcurrentHashMap();/*** 加入到延时队列中** param task*/public void put(DelayTask task) {// 因为一个用户只能对应一个延时任务所以如果已经存在了延时任务将其进行删除if (usernameAndDelayTaskMap.containsKey(task.getUserName())) {this.remove(task.getUserName());}delayQueue.put(task);usernameAndDelayTaskMap.put(task.getUserName(), task);}/*** 取消延时任务** param username 要删除的任务的用户名* return*/public boolean remove(String username) {DelayTask remove usernameAndDelayTaskMap.remove(username);return delayQueue.remove(remove);}Overridepublic void run(String... args) throws Exception {this.executeThread();}/*** 延时任务执行线程*/private void executeThread() {while (true) {try {DelayTask task delayQueue.take();//执行任务processTask(task);} catch (InterruptedException e) {break;}}}/*** 执行延时任务** param task*/private void processTask(DelayTask task) {// 删除该用户的session表示用户下线WebSocketServer.usernameAndSessionMap.remove(task.getUserName());log.error(执行定时任务{}下线, task.getUserName());}}