潘志宝
2024-11-21 d338b50afd6504a9676f0a26b3ecbcc844483e7c
提交 | 用户 | 时间
e7c126 1 package com.iailab.framework.websocket.core.sender.rocketmq;
H 2
3 import com.iailab.framework.websocket.core.sender.AbstractWebSocketMessageSender;
4 import com.iailab.framework.websocket.core.sender.WebSocketMessageSender;
5 import com.iailab.framework.websocket.core.session.WebSocketSessionManager;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.rocketmq.spring.core.RocketMQTemplate;
8
9 /**
10  * 基于 RocketMQ 的 {@link WebSocketMessageSender} 实现类
11  *
12  * @author iailab
13  */
14 @Slf4j
15 public class RocketMQWebSocketMessageSender extends AbstractWebSocketMessageSender {
16
17     private final RocketMQTemplate rocketMQTemplate;
18
19     private final String topic;
20
21     public RocketMQWebSocketMessageSender(WebSocketSessionManager sessionManager,
22                                           RocketMQTemplate rocketMQTemplate,
23                                           String topic) {
24         super(sessionManager);
25         this.rocketMQTemplate = rocketMQTemplate;
26         this.topic = topic;
27     }
28
29     @Override
30     public void send(Integer userType, Long userId, String messageType, String messageContent) {
31         sendRocketMQMessage(null, userId, userType, messageType, messageContent);
32     }
33
34     @Override
35     public void send(Integer userType, String messageType, String messageContent) {
36         sendRocketMQMessage(null, null, userType, messageType, messageContent);
37     }
38
39     @Override
40     public void send(String sessionId, String messageType, String messageContent) {
41         sendRocketMQMessage(sessionId, null, null, messageType, messageContent);
42     }
43
44     /**
45      * 通过 RocketMQ 广播消息
46      *
47      * @param sessionId Session 编号
48      * @param userId 用户编号
49      * @param userType 用户类型
50      * @param messageType 消息类型
51      * @param messageContent 消息内容
52      */
53     private void sendRocketMQMessage(String sessionId, Long userId, Integer userType,
54                                      String messageType, String messageContent) {
55         RocketMQWebSocketMessage mqMessage = new RocketMQWebSocketMessage()
56                 .setSessionId(sessionId).setUserId(userId).setUserType(userType)
57                 .setMessageType(messageType).setMessageContent(messageContent);
58         rocketMQTemplate.syncSend(topic, mqMessage);
59     }
60
61 }