潘志宝
5 天以前 bab43330bf6f48bdb7bfb258611f51bb05ef56fe
提交 | 用户 | 时间
e7c126 1 package com.iailab.framework.websocket.core.sender.kafka;
H 2
3 import com.iailab.framework.websocket.core.sender.AbstractWebSocketMessageSender;
4 import com.iailab.framework.websocket.core.sender.WebSocketMessageSender;
5 import com.iailab.framework.websocket.core.session.WebSocketSessionManager;
6 import lombok.extern.slf4j.Slf4j;
7 import org.springframework.kafka.core.KafkaTemplate;
8
9 import java.util.concurrent.ExecutionException;
10
11 /**
12  * 基于 Kafka 的 {@link WebSocketMessageSender} 实现类
13  *
14  * @author iailab
15  */
16 @Slf4j
17 public class KafkaWebSocketMessageSender extends AbstractWebSocketMessageSender {
18
19     private final KafkaTemplate<Object, Object> kafkaTemplate;
20
21     private final String topic;
22
23     public KafkaWebSocketMessageSender(WebSocketSessionManager sessionManager,
24                                        KafkaTemplate<Object, Object> kafkaTemplate,
25                                        String topic) {
26         super(sessionManager);
27         this.kafkaTemplate = kafkaTemplate;
28         this.topic = topic;
29     }
30
31     @Override
32     public void send(Integer userType, Long userId, String messageType, String messageContent) {
33         sendKafkaMessage(null, userId, userType, messageType, messageContent);
34     }
35
36     @Override
37     public void send(Integer userType, String messageType, String messageContent) {
38         sendKafkaMessage(null, null, userType, messageType, messageContent);
39     }
40
41     @Override
42     public void send(String sessionId, String messageType, String messageContent) {
43         sendKafkaMessage(sessionId, null, null, messageType, messageContent);
44     }
45
46     /**
47      * 通过 Kafka 广播消息
48      *
49      * @param sessionId Session 编号
50      * @param userId 用户编号
51      * @param userType 用户类型
52      * @param messageType 消息类型
53      * @param messageContent 消息内容
54      */
55     private void sendKafkaMessage(String sessionId, Long userId, Integer userType,
56                                   String messageType, String messageContent) {
57         KafkaWebSocketMessage mqMessage = new KafkaWebSocketMessage()
58                 .setSessionId(sessionId).setUserId(userId).setUserType(userType)
59                 .setMessageType(messageType).setMessageContent(messageContent);
60         try {
61             kafkaTemplate.send(topic, mqMessage).get();
62         } catch (InterruptedException | ExecutionException e) {
63             log.error("[sendKafkaMessage][发送消息({}) 到 Kafka 失败]", mqMessage, e);
64         }
65     }
66
67 }