试客网站 源码,网络营销专业主要学什么,wordpress dropship,企业概况的内容这一节#xff0c;我们学习用户消息是如何发送的。
消息的分类
spring websocket将消息分为两种#xff0c;一种是给指定的用户发送#xff08;用户消息#xff09;#xff0c;一种是广播消息#xff0c;即给所有用户发送消息。那怎么区分这两种消息呢?那就是用前缀了…这一节我们学习用户消息是如何发送的。
消息的分类
spring websocket将消息分为两种一种是给指定的用户发送用户消息一种是广播消息即给所有用户发送消息。那怎么区分这两种消息呢?那就是用前缀了。
用户消息的前缀
不配置的情况下默认用户消息的前缀是/user也可以通过下面的方式来配置用户消息
Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {/*** stompClient.subscribe(/user/topic/subNewMsg,...)* 这个时候后端推送消息应该这么写* msgOperations.convertAndSendToUser(username, /topic/subNewMsg, msg);* 即去掉了/user前缀*/registry.setUserDestinationPrefix(WsConstants.USER_DESTINATION_PREFIX);}默认情况下/user是用户消息前缀那么前端订阅的代码可以这么写 //订阅用户消息topic1stompClient.subscribe(/user/topic/answer, function (response) {//do something});后端的发送消息的代码可以这么写注意在这里发送的时候调用的convertAndSendToUser没有带/user前缀 private final SimpMessageSendingOperations msgOperations;public void echo(Principal principal, Msg msg) {msgOperations.convertAndSendToUser(username, /topic/answer, msg);}广播消息的前缀
广播消息没有默认值必须显示地指定配置广播消息的前缀是这么配置通过/topic或者/queue前缀来订阅的就是广播消息
Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {registry.enableSimpleBroker(/topic, /queue)//配置stomp协议里, server返回的心跳.setHeartbeatValue(new long[]{10000L, 10000L})//配置发送心跳的scheduler.setTaskScheduler(new DefaultManagedTaskScheduler());}前端代码可以这么写
//订阅广播消息topicstompClient.subscribe(/topic/boardCast/hello, function (response) {// do something});后端代码可以这么写 private final SimpMessageSendingOperations msgOperations;public void echo2(Msg msg) {log.info(收到的消息为:{}, msg.getContent());msgOperations.convertAndSend(/topic/boardCast/hello, hello boardCast Message);}发送用户消息源码分析
用户订阅过程
发送消息本质上就是从内存中找到注册的用户通过用户名找到用户会话在从用户会话中找到该用户的订阅如果该用户有该订阅那么就发送消息给前端。
总结一下用户和会话之间的关系如下图 如果这块不太熟悉建议回顾这篇文章了解一下用户用户会话订阅之间的关系【stomp 实战】Spring websocket 用户订阅和会话的管理源码分析
我们通过Debug来看一下前端执行用户订阅经历了哪些过程。 假设当前登录用户是1001 stompClient.subscribe(/user/topic/answer, function (response) {//do something});该用户建立连接并且绑定1001的用户会话后执行后端的订阅注册 DefaultSimpUserRegistry响应订阅事件代码如下 可以看到当前的sessionIddestination 将订阅放到一个subscriptions的map里面。缓存在内存中。
用户消息的发送
后端代码是这么写的我们来调试一下 private final SimpMessageSendingOperations msgOperations;public void echo(Principal principal, Msg msg) {msgOperations.convertAndSendToUser(username, /topic/answer, msg);}经过层层调用发现调到了下面的方法 发现我们的发送目的地变成了这个this.destinationPrefix user destination 通过调试时发现值如上图所示。 也就是说我们的发送目的变成了/user用户名我们传的入参/topic/answer 然后再进入下面的代码
//AbstractMessageSendingTemplateOverridepublic void convertAndSend(D destination, Object payload, Nullable MapString, Object headers,Nullable MessagePostProcessor postProcessor) throws MessagingException {//对消息进行转换对象转字符串或者字节数组之类的Message? message doConvert(payload, headers, postProcessor);//调用Send发送send(destination, message);}做了两个事
对消息进行转换对象转字符串或者字节数组之类的调用Send发送
再来看下send方法 Overridepublic void send(D destination, Message? message) {doSend(destination, message);}再调用doSend,由子类SimpMessagingTemplate实现。
//SimpMessagingTemplateOverrideprotected void doSend(String destination, Message? message) {Assert.notNull(destination, Destination must not be null);SimpMessageHeaderAccessor simpAccessor MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class);if (simpAccessor ! null) {if (simpAccessor.isMutable()) {simpAccessor.setDestination(destination);simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE);simpAccessor.setImmutable();sendInternal(message);return;}else {// Try and keep the original accessor typesimpAccessor (SimpMessageHeaderAccessor) MessageHeaderAccessor.getMutableAccessor(message);initHeaders(simpAccessor);}}else {simpAccessor SimpMessageHeaderAccessor.wrap(message);initHeaders(simpAccessor);}simpAccessor.setDestination(destination);simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE);message MessageBuilder.createMessage(message.getPayload(), simpAccessor.getMessageHeaders());sendInternal(message);}
其中最关键的是sendInternal
private void sendInternal(Message? message) {String destination SimpMessageHeaderAccessor.getDestination(message.getHeaders());Assert.notNull(destination, Destination header required);long timeout this.sendTimeout;boolean sent (timeout 0 ? this.messageChannel.send(message, timeout) : this.messageChannel.send(message));if (!sent) {throw new MessageDeliveryException(message,Failed to send message to destination destination within timeout: timeout);}}然后再通过messageChannel来发送此条消息。
//AbstractMessageChannelOverridepublic final boolean send(Message? message, long timeout) {Assert.notNull(message, Message must not be null);Message? messageToUse message;ChannelInterceptorChain chain new ChannelInterceptorChain();boolean sent false;try {messageToUse chain.applyPreSend(messageToUse, this);if (messageToUse null) {return false;}sent sendInternal(messageToUse, timeout);chain.applyPostSend(messageToUse, this, sent);chain.triggerAfterSendCompletion(messageToUse, this, sent, null);return sent;}catch (Exception ex) {chain.triggerAfterSendCompletion(messageToUse, this, sent, ex);if (ex instanceof MessagingException) {throw (MessagingException) ex;}throw new MessageDeliveryException(messageToUse,Failed to send message to this, ex);}catch (Throwable err) {MessageDeliveryException ex2 new MessageDeliveryException(messageToUse, Failed to send message to this, err);chain.triggerAfterSendCompletion(messageToUse, this, sent, ex2);throw ex2;}}构造了一个拦截链在发送前可以进行前置处理和后置处理。这个拦截链就是扩展的关键了。我们可以定义自己的拦截器在发送消息前后进行拦截处理。这里spring给我们的扩展点。通过sendInternal将消息发送出去
再来看下sendInternal方法,进入子类ExecutorSubscribableChannel
//ExecutorSubscribableChannelOverridepublic boolean sendInternal(Message? message, long timeout) {for (MessageHandler handler : getSubscribers()) {SendTask sendTask new SendTask(message, handler);if (this.executor null) {sendTask.run();}else {this.executor.execute(sendTask);}}return true;}可以看到通过这个Channel找到messageHandler这个messageHandler有多个依次将消息进行处理。 这里取到的有两个messageHandler
SimpleBrokerMessageHandlerUserDestinationMessageHandler
进入SendTask看一下run方法
//
public void run() {Message? message this.inputMessage;try {message applyBeforeHandle(message);if (message null) {return;}this.messageHandler.handleMessage(message);triggerAfterMessageHandled(message, null);}catch (Exception ex) {triggerAfterMessageHandled(message, ex);if (ex instanceof MessagingException) {throw (MessagingException) ex;}String description Failed to handle message to this in this.messageHandler;throw new MessageDeliveryException(message, description, ex);}catch (Throwable err) {String description Failed to handle message to this in this.messageHandler;MessageDeliveryException ex2 new MessageDeliveryException(message, description, err);triggerAfterMessageHandled(message, ex2);throw ex2;}
}这里的关键点是this.messageHandler.handleMessage(message); 首先会进入SimpleBrokerMessageHandler的handleMessage 可以看到这里直接跳出去了。 SimpleBrokerMessageHandler的作用就是看是不是我们配置的广播消息的前缀要满足这个条件才能发送消息。我们配置的前缀是/topic,/queue这里destination前缀是/user所以提前返回不处理。 然后我们还有一个UserDestinationMessageHandler会继续处理。 这里对destination进行了处理发现生成了一个result对象这里解析出一个targetDestinations可以看到我们的destination变成了下面的样子 /topic/answer-usero2zuy4zg
这个的构成实际上就是把/user前缀去掉然后加上-user后面加上sessionId就是当前会话的id最后再以这个新生成的targetDestination将消息发送出去
这里的messagingTemplate就是SimpMessagingTemplate。又会回到上面分析的代码。
SimpMessagingTemplate调用messageChannel来发送消息messageChannel中会取得两个messageHandler来处理。 像不像递归调用 不过这一次由于我们的destination已经变成了/topic/answer-usero2zuy4zg。这时候在进入SimpleBrokerMessageHandler时情况就不一样了 由于destination变成了/topic开头的此时我们不会跳出去会找到用户-user后面跟了SessionId订阅将消息发送出去
可以看到我们找到了一个用户订阅。
其实是每个用户订阅时会将/user前缀去掉将用户的destination改写成了如下形式 /user/topic/hello-/topic/hello-user{sessionId} 所以经过UserDestinationMessageHandler处理改写后的destination可以通过destination找到用户会话将此消息发送出去。 到此我们的用户消息的发送就分析完了
总结
发送用户消息的整个过程如下:
SimpMessageSendingOperations.convertAndSendToUser接口发送用户消息这里不传/user前缀注意一下接着SimpMessagingTemplate进行消息的发送SimpMessagingTemplate会交由MessageChannelMessageChannel将会调用MessageHandler来处理消息有以下两个MessageHandler SimpleBrokerMessageHandlerUserDestinationMessageHandler 经过MessageHandler的处理destination由/user/topic/answer,变成了/topic/answer-usero2zuy4zg。改写后的destination可以找到用户会话将此消息发送出去