dengzedong
2025-01-03 c9e48bd2dff2b5766589024cf7264189b5f2a05c
提交 | 用户 | 时间
e7c126 1 package com.iailab.framework.websocket.core.handler;
H 2
3 import cn.hutool.core.util.StrUtil;
4 import cn.hutool.core.util.TypeUtil;
5 import com.iailab.framework.common.util.json.JsonUtils;
6 import com.iailab.framework.tenant.core.util.TenantUtils;
7 import com.iailab.framework.websocket.core.listener.WebSocketMessageListener;
8 import com.iailab.framework.websocket.core.message.JsonWebSocketMessage;
9 import com.iailab.framework.websocket.core.util.WebSocketFrameworkUtils;
10 import lombok.extern.slf4j.Slf4j;
11 import org.springframework.web.socket.TextMessage;
12 import org.springframework.web.socket.WebSocketHandler;
13 import org.springframework.web.socket.WebSocketSession;
14 import org.springframework.web.socket.handler.TextWebSocketHandler;
15
16 import java.lang.reflect.Type;
17 import java.util.HashMap;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.Objects;
21 import java.util.function.Consumer;
22
23 /**
24  * JSON 格式 {@link WebSocketHandler} 实现类
25  *
26  * 基于 {@link JsonWebSocketMessage#getType()} 消息类型,调度到对应的 {@link WebSocketMessageListener} 监听器。
27  *
28  * @author iailab
29  */
30 @Slf4j
31 public class JsonWebSocketMessageHandler extends TextWebSocketHandler {
32
33     /**
34      * type 与 WebSocketMessageListener 的映射
35      */
36     private final Map<String, WebSocketMessageListener<Object>> listeners = new HashMap<>();
37
38     @SuppressWarnings({"rawtypes", "unchecked"})
39     public JsonWebSocketMessageHandler(List<? extends WebSocketMessageListener> listenersList) {
40         listenersList.forEach((Consumer<WebSocketMessageListener>)
41                 listener -> listeners.put(listener.getType(), listener));
42     }
43
44     @Override
45     protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
46         // 1.1 空消息,跳过
47         if (message.getPayloadLength() == 0) {
48             return;
49         }
50         // 1.2 ping 心跳消息,直接返回 pong 消息。
51         if (message.getPayloadLength() == 4 && Objects.equals(message.getPayload(), "ping")) {
52             session.sendMessage(new TextMessage("pong"));
53             return;
54         }
55
56         // 2.1 解析消息
57         try {
58             JsonWebSocketMessage jsonMessage = JsonUtils.parseObject(message.getPayload(), JsonWebSocketMessage.class);
59             if (jsonMessage == null) {
60                 log.error("[handleTextMessage][session({}) message({}) 解析为空]", session.getId(), message.getPayload());
61                 return;
62             }
63             if (StrUtil.isEmpty(jsonMessage.getType())) {
64                 log.error("[handleTextMessage][session({}) message({}) 类型为空]", session.getId(), message.getPayload());
65                 return;
66             }
67             // 2.2 获得对应的 WebSocketMessageListener
68             WebSocketMessageListener<Object> messageListener = listeners.get(jsonMessage.getType());
69             if (messageListener == null) {
70                 log.error("[handleTextMessage][session({}) message({}) 监听器为空]", session.getId(), message.getPayload());
71                 return;
72             }
73             // 2.3 处理消息
74             Type type = TypeUtil.getTypeArgument(messageListener.getClass(), 0);
75             Object messageObj = JsonUtils.parseObject(jsonMessage.getContent(), type);
76             Long tenantId = WebSocketFrameworkUtils.getTenantId(session);
77             TenantUtils.execute(tenantId, () -> messageListener.onMessage(session, messageObj));
78         } catch (Throwable ex) {
79             log.error("[handleTextMessage][session({}) message({}) 处理异常]", session.getId(), message.getPayload());
80         }
81     }
82
83 }