package com.iailab.framework.websocket.core.sender; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.StrUtil; import com.iailab.framework.common.util.json.JsonUtils; import com.iailab.framework.websocket.core.message.JsonWebSocketMessage; import com.iailab.framework.websocket.core.session.WebSocketSessionManager; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.List; /** * WebSocketMessageSender 实现类 * * @author iailab */ @Slf4j @RequiredArgsConstructor public abstract class AbstractWebSocketMessageSender implements WebSocketMessageSender { private final WebSocketSessionManager sessionManager; @Override public void send(Integer userType, Long userId, String messageType, String messageContent) { send(null, userType, userId, messageType, messageContent); } @Override public void send(Integer userType, String messageType, String messageContent) { send(null, userType, null, messageType, messageContent); } @Override public void send(String sessionId, String messageType, String messageContent) { send(sessionId, null, null, messageType, messageContent); } /** * 发送消息 * * @param sessionId Session 编号 * @param userType 用户类型 * @param userId 用户编号 * @param messageType 消息类型 * @param messageContent 消息内容 */ public void send(String sessionId, Integer userType, Long userId, String messageType, String messageContent) { // 1. 获得 Session 列表 List sessions = Collections.emptyList(); if (StrUtil.isNotEmpty(sessionId)) { WebSocketSession session = sessionManager.getSession(sessionId); if (session != null) { sessions = Collections.singletonList(session); } } else if (userType != null && userId != null) { sessions = (List) sessionManager.getSessionList(userType, userId); } else if (userType != null) { sessions = (List) sessionManager.getSessionList(userType); } if (CollUtil.isEmpty(sessions)) { log.info("[send][sessionId({}) userType({}) userId({}) messageType({}) messageContent({}) 未匹配到会话]", sessionId, userType, userId, messageType, messageContent); } // 2. 执行发送 doSend(sessions, messageType, messageContent); } /** * 发送消息的具体实现 * * @param sessions Session 列表 * @param messageType 消息类型 * @param messageContent 消息内容 */ public void doSend(Collection sessions, String messageType, String messageContent) { JsonWebSocketMessage message = new JsonWebSocketMessage().setType(messageType).setContent(messageContent); String payload = JsonUtils.toJsonString(message); // 关键,使用 JSON 序列化 sessions.forEach(session -> { // 1. 各种校验,保证 Session 可以被发送 if (session == null) { log.error("[doSend][session 为空, message({})]", message); return; } if (!session.isOpen()) { log.error("[doSend][session({}) 已关闭, message({})]", session.getId(), message); return; } // 2. 执行发送 try { session.sendMessage(new TextMessage(payload)); log.info("[doSend][session({}) 发送消息成功,message({})]", session.getId(), message); } catch (IOException ex) { log.error("[doSend][session({}) 发送消息失败,message({})]", session.getId(), message, ex); } }); } }