package com.iailab.framework.websocket.core.handler; import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.TypeUtil; import com.iailab.framework.common.util.json.JsonUtils; import com.iailab.framework.tenant.core.util.TenantUtils; import com.iailab.framework.websocket.core.listener.WebSocketMessageListener; import com.iailab.framework.websocket.core.message.JsonWebSocketMessage; import com.iailab.framework.websocket.core.util.WebSocketFrameworkUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler; import java.lang.reflect.Type; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.function.Consumer; /** * JSON 格式 {@link WebSocketHandler} 实现类 * * 基于 {@link JsonWebSocketMessage#getType()} 消息类型,调度到对应的 {@link WebSocketMessageListener} 监听器。 * * @author iailab */ @Slf4j public class JsonWebSocketMessageHandler extends TextWebSocketHandler { /** * type 与 WebSocketMessageListener 的映射 */ private final Map> listeners = new HashMap<>(); @SuppressWarnings({"rawtypes", "unchecked"}) public JsonWebSocketMessageHandler(List listenersList) { listenersList.forEach((Consumer) listener -> listeners.put(listener.getType(), listener)); } @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { // 1.1 空消息,跳过 if (message.getPayloadLength() == 0) { return; } // 1.2 ping 心跳消息,直接返回 pong 消息。 if (message.getPayloadLength() == 4 && Objects.equals(message.getPayload(), "ping")) { session.sendMessage(new TextMessage("pong")); return; } // 2.1 解析消息 try { JsonWebSocketMessage jsonMessage = JsonUtils.parseObject(message.getPayload(), JsonWebSocketMessage.class); if (jsonMessage == null) { log.error("[handleTextMessage][session({}) message({}) 解析为空]", session.getId(), message.getPayload()); return; } if (StrUtil.isEmpty(jsonMessage.getType())) { log.error("[handleTextMessage][session({}) message({}) 类型为空]", session.getId(), message.getPayload()); return; } // 2.2 获得对应的 WebSocketMessageListener WebSocketMessageListener messageListener = listeners.get(jsonMessage.getType()); if (messageListener == null) { log.error("[handleTextMessage][session({}) message({}) 监听器为空]", session.getId(), message.getPayload()); return; } // 2.3 处理消息 Type type = TypeUtil.getTypeArgument(messageListener.getClass(), 0); Object messageObj = JsonUtils.parseObject(jsonMessage.getContent(), type); Long tenantId = WebSocketFrameworkUtils.getTenantId(session); TenantUtils.execute(tenantId, () -> messageListener.onMessage(session, messageObj)); } catch (Throwable ex) { log.error("[handleTextMessage][session({}) message({}) 处理异常]", session.getId(), message.getPayload()); } } }