1. webSocket实现客户端-服务端互发消息的底层原理
在Spring中,WebSocket是一种全双工的通信协议,允许服务器和客户端实时双向传输数据。WebSocket非常适合需要实时更新的场景,比如即时聊天、通知系统、实时数据推送等。
核心理解
消息推送给某个客户端
通过唯一的标识(通常是用户ID或Session ID)来确定目标客户端。 当然,因为用户可能更换浏览器或者设备,因此一般都使用用户id作为唯一标识来识别不同的客户端,当然需要维护用户id和webSocketSession的映射关系,因为互发消息的本质是通过webSocketSession进行操作的。消息推送给一批客户端
可以根据组、标签或特定条件筛选出一批客户端,并将消息广播给这些客户端。 此处筛选出来的客户端,本质上是一堆webSocketSession对象,对于服务器来讲,一个webSocket客户端就是一个webSocketSession对象。是否基于Session操作
Spring WebSocket的默认实现是通过Session
(或者StompSession
)来管理客户端连接。你可以通过WebSocket会话ID来操作特定的连接。
实现步骤:Spring WebSocket整合案例
1. 引入依赖
在pom.xml
中添加Spring WebSocket的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
2. 配置WebSocket
定义一个WebSocket配置类来注册WebSocket端点。
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
// 配置消息代理,用于将消息广播到客户端
config.enableSimpleBroker("/topic", "/queue"); // 广播路径前缀
config.setApplicationDestinationPrefixes("/app"); // 客户端向服务器发送消息的路径前缀
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 注册一个WebSocket端点,客户端通过此端点连接到服务器
registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS(); // 兼容SockJS
}
}
3. 控制层处理消息
定义一个控制器,处理客户端发送的消息以及服务器主动推送的消息。
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;
@Controller
public class WebSocketController {
private final SimpMessagingTemplate messagingTemplate;
public WebSocketController(SimpMessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}
// 处理客户端发送到 /app/message 的消息,并广播到 /topic/public
@MessageMapping("/message")
@SendTo("/topic/public")
public String broadcastMessage(String message) {
return "服务器接收到:" + message;
}
// 主动推送消息给某个客户端
public void sendMessageToClient(String sessionId, String message) {
messagingTemplate.convertAndSendToUser(sessionId, "/queue/private", message);
}
// 主动推送消息给所有客户端
public void sendMessageToAll(String message) {
messagingTemplate.convertAndSend("/topic/public", message);
}
}
4. 前端连接与测试
前端可以使用SockJS
和Stomp.js
来建立连接和处理消息。
<!DOCTYPE html>
<html>
<head>
<title>WebSocket Example</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.5.0/sockjs.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
</head>
<body>
<h1>WebSocket Example</h1>
<input type="text" id="message" placeholder="Enter message" />
<button onclick="sendMessage()">Send</button>
<div id="messages"></div>
<script>
let stompClient;
function connect() {
const socket = new SockJS('/ws');
stompClient = Stomp.over(socket);
stompClient.connect({}, () => {
console.log('Connected');
stompClient.subscribe('/topic/public', (message) => {
document.getElementById('messages').innerHTML += `<p>${message.body}</p>`;
});
});
}
function sendMessage() {
const message = document.getElementById('message').value;
stompClient.send('/app/message', {}, message);
}
connect();
</script>
</body>
</html>
关键点详解
- 单客户端消息推送
使用SimpMessagingTemplate
的convertAndSendToUser
方法,通过用户的sessionId
进行精准推送。messagingTemplate.convertAndSendToUser(sessionId, "/queue/private", "单用户消息内容");
- 群体消息推送
使用convertAndSend
将消息广播到特定主题路径:messagingTemplate.convertAndSend("/topic/public", "广播消息内容");
- 区分用户会话
如果需要用户标识,Spring提供Principal
来标记每个连接的用户ID。
测试步骤
- 启动Spring Boot应用。
- 打开前端页面,输入消息并发送。
- 观察页面中是否实时显示广播的消息。
- 通过服务端主动调用
sendMessageToClient
或sendMessageToAll
方法,推送测试消息到客户端。
扩展
自定义用户身份认证
可在WebSocket连接时使用HandshakeInterceptor
实现用户认证逻辑。持久化连接
对于重要的实时消息,可以结合消息队列(如RabbitMQ)实现消息持久化。集群环境
在多实例情况下,可以通过Redis或其他共享存储来管理WebSocket连接。
2. webSocket服务端实现消息的广播
在Spring WebSocket中,广播操作是将一条消息发送给符合某些条件的一组客户端。理解广播的关键在于消息的目标路径和客户端订阅的路径是否匹配,以及如何使用消息代理(Message Broker)管理这些消息。
广播的原理
订阅机制
WebSocket通信中,客户端通常需要先订阅某个主题(topic),然后服务器发送到该主题的消息会自动推送给所有订阅了这个主题的客户端。路径匹配
广播的关键是路径匹配。服务器发送的消息目标路径(destination)和客户端订阅路径(subscription path)匹配时,消息会被发送到客户端。消息代理
Spring WebSocket通常使用内置的消息代理(如简单代理enableSimpleBroker
)或外部代理(如RabbitMQ、ActiveMQ)来管理广播消息。消息代理负责将消息路由到订阅了该路径的所有客户端。
如何确定哪些客户端接收到消息
客户端订阅的主题
- 每个客户端在连接到WebSocket服务端后,会通过
subscribe
方法订阅一个或多个主题路径。 - 订阅路径如:
/topic/public
。
服务端广播的路径
- 服务端发送消息时,指定消息的目标路径。
- 例如:广播到
/topic/public
。
路径匹配逻辑
- 如果客户端订阅的路径与服务端发送消息的路径一致,客户端会接收到消息。
- 路径可以包含动态部分(如
/topic/user/{id}
),允许灵活控制接收范围。
动态广播场景
- 服务端可根据条件动态决定消息目标路径。
- 例如,发送到特定组的路径:
/topic/group/{groupId}
。
广播的关键配置
在Spring WebSocket中,广播的核心在于配置消息代理和路径规则。
配置消息代理
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic", "/queue"); // 配置广播前缀
config.setApplicationDestinationPrefixes("/app"); // 应用消息前缀
}
/topic
:广播路径前缀,适合群体消息推送。/queue
:点对点路径前缀,适合单用户消息推送。/app
:客户端发送给服务器的路径前缀。
广播操作流程
- 客户端订阅主题 客户端通过
subscribe
订阅路径:stompClient.subscribe('/topic/public', (message) => { console.log('收到广播消息:', message.body); });
- 服务端发送广播消息 服务端通过
SimpMessagingTemplate
发送消息到目标路径:messagingTemplate.convertAndSend("/topic/public", "广播消息内容");
- 消息路由到订阅的客户端 如果客户端已订阅
/topic/public
,会接收到这条消息。
广播操作的实现案例
服务端代码
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;
@Controller
public class BroadcastController {
private final SimpMessagingTemplate messagingTemplate;
public BroadcastController(SimpMessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}
// 广播消息到公共主题
public void broadcastToAll(String message) {
messagingTemplate.convertAndSend("/topic/public", message);
}
// 广播消息到某个组
public void broadcastToGroup(String groupId, String message) {
messagingTemplate.convertAndSend("/topic/group/" + groupId, message);
}
}
客户端代码
客户端使用SockJS
和Stomp.js
与服务器通信:
const socket = new SockJS('/ws');
const stompClient = Stomp.over(socket);
stompClient.connect({}, () => {
console.log('连接成功!');
// 订阅公共广播主题
stompClient.subscribe('/topic/public', (message) => {
console.log('公共广播消息:', message.body);
});
// 订阅组广播主题
stompClient.subscribe('/topic/group/123', (message) => {
console.log('组广播消息:', message.body);
});
});
广播的高级用法
1. 分组广播
通过路径区分组:
- 组1:
/topic/group/1
- 组2:
/topic/group/2
服务端发送到特定组:
messagingTemplate.convertAndSend("/topic/group/1", "组1的消息");
2. 条件广播
根据逻辑动态决定广播的路径:
if (condition) {
messagingTemplate.convertAndSend("/topic/condition/true", "满足条件的消息");
} else {
messagingTemplate.convertAndSend("/topic/condition/false", "不满足条件的消息");
}
3. 订阅匹配规则
使用动态路径:
stompClient.subscribe('/topic/user/123', (message) => {
console.log('用户123的消息:', message.body);
});
服务端广播到特定用户:
messagingTemplate.convertAndSend("/topic/user/123", "针对用户123的消息");
总结
- 广播的核心是通过路径匹配客户端订阅和服务端目标路径实现消息分发。
- 消息路由由消息代理负责,它根据路径将消息推送到所有匹配的客户端。
- 客户端和服务端的交互路径需要明确定义,以确保消息能正确分发。
- 分组广播和动态路径可以实现更加灵活的广播策略。
广播适合多用户实时数据更新的场景,熟悉路径配置和匹配机制是掌握WebSocket广播操作的关键。
3. SimpMessagingTemplate和webSocketSession
在实际项目中,通过 WebSocketSession
发送 WebSocket 消息是一种较底层的操作方式,相对于基于 SimpMessagingTemplate
的高层次实现,它允许开发者直接与 WebSocket 会话交互。这种方式常用于需要直接控制会话或处理高级需求的场景,例如点对点通信或精确推送。
使用 WebSocketSession
发送消息的原理
会话管理
每个客户端连接成功后,服务器端会生成一个唯一的WebSocketSession
对象,通过这个会话可以直接向特定客户端发送消息。 也就是说,实现消息发送的前提是webSocket客户端必须成功和服务器建立webSocket连接后,服务端生成一个webSocketSession对象之后才可以实现消息互发。消息发送
使用WebSocketSession
的sendMessage
方法,可以向客户端推送文本消息或二进制消息。 当然,当客户端主动向服务端进行消息发送时,只需要调用webSocket服务器的api接口,当作http请求一样调用即可。会话存储
为了向特定客户端发送消息,服务器需要保存所有连接的WebSocketSession
,通常会使用一个线程安全的容器来管理这些会话。 这也是webSocket能够实现服务器主动向客户端发送消息的关键,因为所有与服务器建立的会话对象webSocketSession都被服务器缓存起来了,进而才能通过webSocketSession进行消息的推送,推送到指定的客户端。
具体实现步骤
1. 配置 WebSocket 处理器
实现 WebSocketHandler
来管理 WebSocket 会话:
import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.util.concurrent.ConcurrentHashMap;
public class CustomWebSocketHandler extends TextWebSocketHandler {
// 用于存储所有连接的客户端会话
private static final ConcurrentHashMap<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// 在连接建立时保存会话,使用 sessionId 作为键
sessions.put(session.getId(), session);
System.out.println("客户端连接成功,SessionID: " + session.getId());
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
// 接收客户端发送的消息
String payload = message.getPayload();
System.out.println("接收到消息: " + payload);
// 回复消息给客户端
session.sendMessage(new TextMessage("服务端收到: " + payload));
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
// 移除断开的会话
sessions.remove(session.getId());
System.out.println("客户端断开连接,SessionID: " + session.getId());
}
// 广播消息给所有客户端
public void broadcast(String message) throws Exception {
for (WebSocketSession session : sessions.values()) {
if (session.isOpen()) {
session.sendMessage(new TextMessage(message));
}
}
}
// 发送消息给特定客户端
public void sendToClient(String sessionId, String message) throws Exception {
WebSocketSession session = sessions.get(sessionId);
if (session != null && session.isOpen()) {
session.sendMessage(new TextMessage(message));
}
}
}
2. 配置 WebSocket 端点
使用 WebSocketConfigurer
来注册端点并绑定自定义处理器:
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
private final CustomWebSocketHandler customWebSocketHandler;
public WebSocketConfig(CustomWebSocketHandler customWebSocketHandler) {
this.customWebSocketHandler = customWebSocketHandler;
}
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
// 注册 WebSocket 端点
registry.addHandler(customWebSocketHandler, "/ws")
.setAllowedOrigins("*"); // 配置跨域
}
}
3. 服务端发送消息
在业务逻辑中,可以通过 CustomWebSocketHandler
的实例发送消息:
- 广播消息:
customWebSocketHandler.broadcast("这是一条广播消息");
- 发送消息给特定客户端:
String sessionId = "某个客户端的SessionID"; customWebSocketHandler.sendToClient(sessionId, "这是一条点对点消息");
4. 客户端代码
前端通过 WebSocket 连接到服务端,并接收消息:
const socket = new WebSocket("ws://localhost:8080/ws");
// 连接成功
socket.onopen = () => {
console.log("WebSocket连接成功");
socket.send("Hello, Server!");
};
// 接收服务端消息
socket.onmessage = (event) => {
console.log("收到服务端消息:", event.data);
};
// 连接关闭
socket.onclose = () => {
console.log("WebSocket连接已关闭");
};
扩展功能
1. 会话认证
在 afterConnectionEstablished
方法中,验证连接的合法性,例如检查用户身份或授权:
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
String token = (String) session.getAttributes().get("token");
if (!isValidToken(token)) {
session.close(CloseStatus.NOT_ACCEPTABLE);
return;
}
sessions.put(session.getId(), session);
System.out.println("客户端认证通过,SessionID: " + session.getId());
}
2. 分组广播
可以为会话分配组,然后按组广播消息:
private final Map<String, Set<WebSocketSession>> groupSessions = new ConcurrentHashMap<>();
public void sendToGroup(String groupId, String message) throws Exception {
Set<WebSocketSession> group = groupSessions.get(groupId);
if (group != null) {
for (WebSocketSession session : group) {
if (session.isOpen()) {
session.sendMessage(new TextMessage(message));
}
}
}
}
3. 心跳检测
定期发送心跳消息,确保连接可用:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
public void startHeartbeat() {
executor.scheduleAtFixedRate(() -> {
for (WebSocketSession session : sessions.values()) {
if (session.isOpen()) {
try {
session.sendMessage(new TextMessage("ping"));
} catch (IOException e) {
e.printStackTrace();
}
}
}
}, 0, 30, TimeUnit.SECONDS);
}
总结
- 使用
WebSocketSession
是一种底层操作方式,适合直接控制客户端会话,特别是需要实现复杂推送逻辑时。 - 广播、点对点推送和分组推送可以通过管理会话集合实现。
- 在实际项目中,如果需求简单,推荐使用
SimpMessagingTemplate
;如果需要自定义推送逻辑,使用WebSocketSession
更灵活。
4. 请在进一步详细全面的解释使用SimpMessagingTemplate的方式以及它和WebSession的区别
使用 SimpMessagingTemplate
的方式及其区别
SimpMessagingTemplate
是 Spring 提供的基于消息代理的高级 WebSocket 消息处理工具,它和直接操作 WebSocketSession
的方式有本质的区别。SimpMessagingTemplate
构建在 STOMP 协议之上,通过简化消息发送与订阅流程,更适合处理广播、订阅和点对点消息传递的场景。
1. SimpMessagingTemplate 的特点
基于 STOMP 协议:
通过 STOMP 协议实现消息路由,支持主题订阅 (topic
)、点对点通信 (queue
) 等消息模式。消息代理:
借助内置或外部的消息代理(如 RabbitMQ、ActiveMQ),处理复杂的消息发布和订阅逻辑。高层封装:
提供模板方法,无需直接管理会话,开发者只需关心消息发送目标和内容。
2. 使用 SimpMessagingTemplate
的场景
- 广播消息:向所有订阅某主题的客户端发送消息。
- 点对点通信:发送给指定用户(支持用户唯一标识)。
- 分组推送:向某组订阅者发送消息。
3. 使用 SimpMessagingTemplate 的实现步骤
3.1 配置 WebSocket 端点
在 Spring 中配置 WebSocket 支持及 STOMP 端点:
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
// 配置消息代理
config.enableSimpleBroker("/topic", "/queue"); // 广播和点对点的消息前缀
config.setApplicationDestinationPrefixes("/app"); // 应用消息前缀
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 配置 STOMP 端点,供客户端连接
registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS(); // 支持 SockJS
}
}
3.2 使用 SimpMessagingTemplate 发送消息
在控制器或服务中注入 SimpMessagingTemplate
,用于发送消息:
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
@Service
public class WebSocketService {
private final SimpMessagingTemplate messagingTemplate;
public WebSocketService(SimpMessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}
// 广播消息
public void sendBroadcast(String message) {
messagingTemplate.convertAndSend("/topic/broadcast", message);
}
// 点对点消息
public void sendToUser(String username, String message) {
messagingTemplate.convertAndSendToUser(username, "/queue/messages", message);
}
}
3.3 客户端订阅消息
客户端通过 STOMP 协议订阅消息和发送消息。例如,使用 JavaScript 的 stomp.js
:
const socket = new SockJS('/ws');
const stompClient = Stomp.over(socket);
// 连接到 WebSocket 服务端
stompClient.connect({}, () => {
console.log("Connected");
// 订阅广播主题
stompClient.subscribe('/topic/broadcast', (message) => {
console.log("广播消息:", message.body);
});
// 订阅点对点消息
stompClient.subscribe('/user/queue/messages', (message) => {
console.log("点对点消息:", message.body);
});
// 向服务端发送消息
stompClient.send('/app/sayHello', {}, JSON.stringify({content: "Hello Server!"}));
});
4. WebSocketSession 与 SimpMessagingTemplate 的区别
特性 | WebSocketSession | SimpMessagingTemplate |
---|---|---|
协议层 | 仅支持 WebSocket 原生协议 | 基于 WebSocket + STOMP 协议 |
会话管理 | 开发者需手动管理 WebSocketSession | Spring 自动管理会话,不直接操作会话 |
消息模式 | 点对点,广播需自行实现 | 支持广播 (topic )、点对点 (queue ) 等模式 |
消息代理支持 | 无,直接通过 WebSocket 发送消息 | 支持内置或外部消息代理(如 RabbitMQ) |
扩展性 | 灵活,但开发成本高 | 提供多种开箱即用功能,简化复杂场景 |
适用场景 | 自定义会话管理或高级需求(如心跳检测) | 标准化消息传递需求,尤其是广播或订阅场景 |
集群支持 | 无内置支持,需要自行实现 | 借助消息代理可天然支持集群 |
5. 使用场景对比
- WebSocketSession:
- 适用于直接管理客户端会话的低级场景。
- 场景:精准点对点通信、定制化需求(如复杂认证流程、定时心跳检测)。
- SimpMessagingTemplate:
- 适用于需要广播或订阅模式的标准场景。
- 场景:实时推送通知(如新闻、股票更新)、点对点聊天应用。
6. 扩展功能(以 SimpMessagingTemplate 为例)
6.1 广播推送特定类型消息
通过设置自定义消息类,实现类型安全的推送:
public class NotificationMessage {
private String type;
private String content;
// Getters and setters
}
@Service
public class NotificationService {
private final SimpMessagingTemplate messagingTemplate;
public NotificationService(SimpMessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}
public void broadcastNotification(String type, String content) {
NotificationMessage message = new NotificationMessage();
message.setType(type);
message.setContent(content);
messagingTemplate.convertAndSend("/topic/notifications", message);
}
}
6.2 配合 Spring Security 实现用户认证
将 WebSocket 会话与用户身份绑定,确保消息发送给正确用户:
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.setUserDestinationPrefix("/user"); // 配置用户目的地前缀
}
结合 sendToUser
,发送给指定用户的消息:
messagingTemplate.convertAndSendToUser(username, "/queue/private", "私人消息");
总结
- WebSocketSession:提供了底层控制能力,适合自定义实现和精细化需求。
- SimpMessagingTemplate:提供了高层封装,结合消息代理支持广播、点对点通信和订阅模式,更适合标准化应用场景。
- 实际项目选择:简单推送和实时订阅场景优先使用
SimpMessagingTemplate
,复杂需求可结合WebSocketSession
。
5. SimpMessagingTemplate借助第三方的MQ实现代理?如何理解呢?
SimpMessagingTemplate 借助第三方 MQ 实现代理的理解
在 Spring WebSocket 中,SimpMessagingTemplate
是一种高级消息发送工具,它依赖消息代理(Message Broker
)来实现广播、点对点等复杂的消息传递场景。默认情况下,Spring 内置了一个简单的消息代理(基于内存),但对于更大规模的应用,特别是需要集群支持或跨系统通信时,可以通过整合第三方消息队列(如 RabbitMQ、ActiveMQ)来作为消息代理。
1. 消息代理的作用
消息代理(Message Broker)的主要职责是接收消息并分发消息,它在 WebSocket 的消息处理流程中充当“中间人”。具体作用如下:
管理订阅关系:
负责记录哪些客户端订阅了哪些主题(topic),以及用户消息的目的地(queue)。路由消息:
根据消息的目标地址(destination),将消息发送到相应的订阅者或队列中。消息持久化(视消息队列类型而定):
某些消息队列(如 RabbitMQ)支持将消息持久化以防丢失。支持集群和负载均衡:
第三方消息代理支持多节点部署,适合高并发场景。
2. SimpMessagingTemplate 与消息代理的关系
默认代理:内置简单代理
Spring 默认启用的简单代理是内存中的轻量级实现,只支持基本的广播和点对点功能,适合小型应用或开发测试阶段。外部代理:集成第三方 MQ
当需要更强大的消息功能(如分布式通信、消息持久化、负载均衡)时,可以配置 Spring 使用外部消息队列(RabbitMQ、ActiveMQ、Kafka 等)作为代理。
3. 使用第三方 MQ 作为消息代理的配置和流程
3.1 配置外部消息代理
以 RabbitMQ 为例,假设你已部署了 RabbitMQ 服务:
- 引入依赖
在 pom.xml
中添加 RabbitMQ 的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置 WebSocket 使用 RabbitMQ
在 WebSocket 配置类中启用 STOMP 协议并使用 RabbitMQ 作为代理:
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
// 配置外部消息代理(RabbitMQ)
config.enableStompBrokerRelay("/topic", "/queue") // 广播和点对点消息使用消息代理
.setRelayHost("localhost") // RabbitMQ 地址
.setRelayPort(61613) // STOMP 协议端口
.setClientLogin("guest") // RabbitMQ 登录信息
.setClientPasscode("guest");
// 应用程序目的地前缀
config.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS();
}
}
3.2 使用 SimpMessagingTemplate
配置完成后,SimpMessagingTemplate
的使用方式和默认简单代理完全一致。例如:
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
@Service
public class NotificationService {
private final SimpMessagingTemplate messagingTemplate;
public NotificationService(SimpMessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}
// 广播消息(通过 RabbitMQ 代理分发)
public void sendBroadcast(String message) {
messagingTemplate.convertAndSend("/topic/notifications", message);
}
// 点对点消息
public void sendToUser(String username, String message) {
messagingTemplate.convertAndSendToUser(username, "/queue/private", message);
}
}
3.3 消息发送的工作流程
以广播消息为例:
客户端订阅:
客户端订阅/topic/notifications
(通过 RabbitMQ 代理处理订阅关系)。服务端发送消息:
调用messagingTemplate.convertAndSend("/topic/notifications", message)
发送消息。消息代理分发:
RabbitMQ 代理负责将消息路由到所有订阅了/topic/notifications
的客户端。客户端接收消息:
每个订阅该主题的客户端收到消息。
4. 内置代理 vs 第三方代理
特性 | 内置简单代理 | 第三方代理(如 RabbitMQ) |
---|---|---|
实现方式 | Spring 内存中实现 | 借助外部 MQ(RabbitMQ、ActiveMQ 等) |
持久化支持 | 不支持 | 支持消息持久化 |
集群支持 | 不支持 | 支持,适合高并发场景 |
消息路由能力 | 基本的主题订阅和点对点消息 | 提供高级的路由功能 |
性能和吞吐量 | 适合小规模应用 | 支持高性能和大规模消息传递 |
配置复杂度 | 简单,无需额外配置 | 需要部署和配置 MQ |
典型应用场景 | 开发、测试、简单实时推送应用 | 分布式系统、生产环境、高并发场景 |
5. 适用场景
- 使用内置代理:
- 小型应用或开发阶段。
- 消息量较少,应用逻辑简单。
- 无需消息持久化和复杂路由。
- 使用第三方代理:
- 分布式系统需要跨服务传递消息。
- 高并发场景(如实时金融系统、物联网)。
- 需要持久化消息或延迟消息(MQ 支持延迟队列)。
总结
SimpMessagingTemplate
是一种高层工具,用于发送 WebSocket 消息,它本身不处理消息的存储和分发,而是依赖消息代理。- 内置简单代理适合小型应用,而第三方消息队列(如 RabbitMQ、Kafka)可以提升扩展性、可靠性和性能。
- 通过 STOMP 协议,Spring WebSocket 可以无缝整合外部消息代理,为复杂场景提供更强的支持。
6. 这种MQ代理情况下,请问各个客户端是和MQ服务器建立的webSocket连接吗?我有些迷惑
在使用消息代理(如 RabbitMQ、ActiveMQ)作为 WebSocket 的消息处理核心时,客户端是否直接与 MQ 服务器建立 WebSocket 连接,取决于具体的配置和消息流的设计。以下是对这种模式的详细解析:
1. 各个客户端是否直接与 MQ 建立 WebSocket 连接?
答:一般情况下,客户端并不直接与 MQ 服务器建立 WebSocket 连接。
而是通过应用服务器(Spring WebSocket Server)与客户端建立 WebSocket 连接,由应用服务器和 MQ 服务器之间通过 STOMP 协议进行通信。
2. 工作原理
当 Spring WebSocket 配置了消息代理(如 RabbitMQ)后,消息的流转过程可以分为两部分:
客户端与应用服务器的连接:
客户端通过 WebSocket 或 SockJS 连接到应用服务器。
客户端订阅特定的主题(如/topic/news
)或队列(如/queue/private
)。应用服务器维护 WebSocket 连接和会话(Session),并充当客户端的入口。
应用服务器与 MQ 的通信:
Spring WebSocket 将消息路由给消息代理(MQ)。
应用服务器通过 STOMP 协议和 MQ 服务器通信,将客户端发来的消息转发给 MQ。- MQ 处理消息的路由和分发:
- 根据订阅规则(主题或点对点队列),MQ 将消息分发给目标客户端的订阅者。
- MQ 支持持久化、路由和跨集群分发。
- 应用服务器从 MQ 获取消息:
应用服务器作为 MQ 的消费者,从 MQ 拉取或接收分发的消息,并通过 WebSocket 将消息发送给订阅的客户端。
3. 核心连接关系图
以下是这种架构的连接关系:
客户端 A -- WebSocket -- 应用服务器 -- STOMP -- MQ 服务器
客户端 B -- WebSocket -- 应用服务器 -- STOMP -- MQ 服务器
具体说明:
- 客户端:
- 只与 Spring 应用服务器建立 WebSocket 连接。
- 不直接连接 MQ 服务器。
- 应用服务器:
- 对客户端提供 WebSocket 连接管理和消息转发服务。
- 与 MQ 服务器通过 STOMP 协议通信。
- MQ 服务器:
- 处理消息的路由、分发、持久化等复杂逻辑。
- 应用服务器通过 STOMP 提交和订阅消息。
4. 为什么客户端不直接连接 MQ?
客户端不直接连接 MQ 的原因如下:
- WebSocket 连接管理复杂:
- MQ 服务器本身并非专门为管理 WebSocket 长连接设计,直接与大量客户端建立 WebSocket 连接会导致扩展性和连接管理问题。
- 应用服务器(如 Spring WebSocket)对 WebSocket 连接的管理更高效。
- 安全性和控制:
- 应用服务器可以对 WebSocket 请求进行认证和鉴权,而直接连接 MQ 可能暴露消息队列的配置和权限问题。
- 解耦客户端和 MQ:
- 客户端仅需处理 WebSocket 连接,与具体使用的 MQ(RabbitMQ、ActiveMQ、Kafka)无关,便于更换或优化底层实现。
- 协议支持:
- MQ 服务器通常使用 STOMP 协议,而 WebSocket 是一种低层的通信协议,直接连接可能需要额外的客户端支持。
- 多协议支持:
- Spring WebSocket 支持 WebSocket 和 SockJS,兼容性更高,能处理不支持原生 WebSocket 的浏览器。
5. 消息处理流程详解
以广播消息为例(使用 RabbitMQ 代理):
客户端订阅主题:
客户端通过 WebSocket 向应用服务器订阅/topic/news
。订阅关系存储:
应用服务器通过 STOMP 协议将订阅请求发送到 RabbitMQ。- 消息发送:
当应用服务器调用SimpMessagingTemplate.convertAndSend("/topic/news", message)
:- 消息被发送到 RabbitMQ 的
/topic/news
主题。 - RabbitMQ 处理消息路由,将消息投递给所有订阅
/topic/news
的消费者。
- 消息被发送到 RabbitMQ 的
- 消息转发给客户端:
- RabbitMQ 将消息推送到应用服务器(作为消费者)。
- 应用服务器通过 WebSocket 将消息发送给订阅该主题的客户端。
6. 总结
WebSocket 连接管理
- 客户端与 MQ 之间没有直接 WebSocket 连接。
- 客户端只与 Spring 应用服务器建立 WebSocket 连接。
应用服务器的作用
- 管理客户端的 WebSocket 会话。
- 作为客户端和消息代理之间的桥梁,负责转发消息。
消息代理的作用
- 负责复杂的消息路由、订阅管理和持久化。
- 提供集群支持以处理大规模分布式通信。
这种架构既充分利用了 MQ 的强大能力,又简化了客户端的实现复杂度,是一种高效且可扩展的设计方式。
7. 我貌似理解了。实际上web客户端和spring webSocket的关系,与是否引入MQ根本就没有直接联系。后端应用服务引入MQ的目的就是想将webSocket请求在后端服务器节点之间通过MQ进行转发而已。
是的,你理解得非常准确!让我们更系统地总结一下这种关系:
客户端与后端 WebSocket 服务的关系
1. 客户端和 Spring WebSocket
客户端通过 WebSocket 或 SockJS 与后端的 Spring WebSocket 服务建立连接。这是最基础的通信通道,无论是否引入 MQ,这个连接都是直接的。
- 客户端的职责:
- 建立 WebSocket 连接。
- 订阅主题或队列。
- 发送消息或接收消息。
- Spring WebSocket 的职责:
- 管理与客户端的 WebSocket 会话。
- 接收客户端的消息。
- 将消息推送给订阅的客户端。
2. 引入 MQ 是否影响客户端?
不影响客户端。
客户端只需要关心与 Spring WebSocket 的交互,它不知道也不需要知道后端是否使用 MQ。客户端的代码逻辑与是否引入 MQ 没有直接关系。
引入 MQ 的目的
引入 MQ 的主要目的是在后端服务器之间处理 WebSocket 消息的路由和转发。其核心目标如下:
1. 实现后端服务节点之间的消息分发
在分布式系统中,通常会有多个服务节点同时运行。客户端的 WebSocket 连接可能分布在不同的节点上,引入 MQ 后,可以实现跨节点的消息分发。
示例:
- 客户端 A 连接到服务节点 A,客户端 B 连接到服务节点 B。
- 如果客户端 A 发消息需要推送给客户端 B,节点 A 的消息会通过 MQ 转发到节点 B,再由节点 B 推送给客户端 B。
2. 提供可靠的消息中转机制
MQ 通常具备消息持久化、重试机制等特性,可以保证消息不会因为服务节点故障而丢失。
3. 解耦消息处理
使用 MQ 后,消息的生产者和消费者可以解耦。WebSocket 消息的发送和处理逻辑可以在不同的服务中实现,增强了系统的灵活性和扩展性。
流程对比
没有 MQ 的场景:
- 客户端与 WebSocket 服务建立连接。
- WebSocket 服务直接管理所有的消息转发。
- 缺点:当有多个服务节点时,需要复杂的逻辑来同步各节点之间的消息。
引入 MQ 的场景:
- 客户端与 WebSocket 服务建立连接(保持不变)。
- WebSocket 服务通过 MQ 在各服务节点之间路由和转发消息。
- 优点:服务节点之间的消息分发由 MQ 管理,简化了节点间通信逻辑。
总结
- 客户端与 Spring WebSocket 是直接连接的,这种关系与是否引入 MQ 无关。
- 引入 MQ 是为了后端服务器之间更高效地处理消息路由,特别是在分布式系统中。
- 客户端的逻辑无需感知 MQ 的存在,只需要处理与 WebSocket 服务的交互即可。
你可以把 MQ 想象成一个“消息中转站”,专门为后端服务节点之间的消息传递提供支持,而客户端仍然只与自己连接的服务节点进行直接通信。
8. 如果服务端自己发生某个动作事件后想把消息推送给所有的客户端,请问webSocket必须事先和服务端建立好webSocket连接才可以吗?如果某个session失效了或者下线了,此时服务端推送的消息怎么处理?直接丢失了吗?如何持久化服务端待推送的消息呢?等客户端上线后继续推送
这是一个关于 WebSocket 在实时消息推送场景中如何处理连接、消息持久化和重推送的关键问题。让我们详细解析。
1. 服务端推送消息需要客户端事先建立 WebSocket 连接吗?
是的,WebSocket 是基于双向连接的协议,客户端必须事先和服务端建立好 WebSocket 连接,服务端才能推送消息。
- 如果客户端没有建立 WebSocket 连接,服务端无法直接推送消息给它。
- WebSocket 是一个“实时”的协议,不像 HTTP 需要请求才能响应。只有连接建立后,服务端才能在任意时刻主动向客户端发送数据。
2. 如果某个客户端的 WebSocket 会话失效或者下线了,服务端推送消息会怎样处理?
(1) 客户端断开时会话失效
当客户端断开 WebSocket 连接时:
- 服务端的
WebSocketSession
对应的会话会被销毁。 - 此时尝试向这个会话推送消息会抛出异常(通常是
IllegalStateException
或IOException
)。
(2) 消息会丢失
在原生 WebSocket 协议中,消息没有持久化机制。
- 如果客户端断开连接,服务端无法缓冲待推送的消息。
- 消息直接丢失。
3. 如何持久化待推送的消息?
为了确保客户端离线期间的消息不会丢失,常用的解决方案是消息持久化。当客户端重新上线时,服务端可以继续推送这些待处理消息。
(1) 持久化方式
服务端需要将消息存储在某种可靠的存储介质中,例如:
- 数据库:如 MySQL、PostgreSQL,用于存储每个用户的待推送消息。
- 分布式消息队列:如 Kafka、RabbitMQ,用于暂存消息并定期消费。
- 缓存系统:如 Redis,快速存取未推送的消息。
(2) 存储策略
通常可以通过以下方式持久化消息:
- 消息内容存储:记录消息的具体内容和元数据(目标用户 ID、主题等)。
- 消息状态记录:标记每条消息是否已推送成功(成功、失败、待推送)。
4. 当客户端重新上线时,如何继续推送?
当客户端重新建立 WebSocket 连接后:
- 服务端识别客户端身份:
- 通过 WebSocket 握手时传递的身份信息(如用户 ID、Token)识别用户。
- 可以将用户的
WebSocketSession
与其用户 ID 绑定。
- 查询未推送消息:
- 根据用户 ID 从消息持久化存储中获取未推送的消息。
- 重新推送消息:
- 通过
SimpMessagingTemplate
或WebSocketSession
将消息逐条推送给客户端。 - 推送成功后,更新消息状态为已推送。
- 通过
示例代码
以 Spring WebSocket 为例:
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Autowired
private MessageRepository messageRepository; // 存储消息的持久化服务
// 客户端上线后触发的逻辑
public void handleClientReconnect(String userId) {
// 从数据库查询待推送消息
List<Message> pendingMessages = messageRepository.findPendingMessagesByUserId(userId);
// 推送消息
for (Message message : pendingMessages) {
messagingTemplate.convertAndSendToUser(userId, "/queue/notifications", message);
// 更新消息状态为已推送
messageRepository.updateMessageStatus(message.getId(), "SENT");
}
}
5. 常见的架构方案
在实际项目中,离线消息推送的设计通常包括以下几个部分:
(1) WebSocket 连接管理
- 服务端维护所有在线用户的 WebSocket 会话(
WebSocketSession
或类似的)。 - 断开连接时释放会话资源。
(2) 消息持久化
- 将每条待推送的消息存储在可靠的介质中(数据库、MQ、Redis 等)。
- 根据用户状态(在线/离线)决定是否立即推送。
(3) 离线消息推送
- 用户重新连接时,检查其待处理消息队列。
- 将消息从存储中取出并推送。
6. 典型的流程图
客户端 (Browser/App) <----> Spring WebSocket Server <----> 数据库/消息队列
↑ ↑
| |
建立连接 持久化消息
- 客户端与 WebSocket 服务建立连接。
- 客户端发送订阅或消息。
- 服务端处理消息:
- 如果目标客户端在线,直接推送。
- 如果目标客户端离线,持久化消息。
- 客户端重新上线时,服务端从存储中拉取待推送消息,逐条推送。
7. 总结
- WebSocket 必须建立连接才能推送消息。如果没有连接,服务端无法主动向客户端发送消息。
- 连接失效时消息会丢失,必须通过持久化机制保存待推送消息。
- 常见的解决方案是使用数据库或消息队列持久化消息,并在客户端重新上线时继续推送。
- Spring WebSocket 配合
SimpMessagingTemplate
和持久化机制,可以轻松实现上述功能,适合大多数场景。
9. 如果用户换了设备重新登录上线了,此时如何确定用户的webSocketSession呢?webSocketSession应该会发生变化吧?
是的,当用户换设备或者重新登录上线时,由于重新建立了 WebSocket 连接,服务端的 WebSocketSession
会发生变化。这会引发一个关键问题:如何在用户切换设备或者重新连接时,正确关联新的 WebSocketSession?
以下是详细的分析和解决方案:
1. 用户换设备或重新登录的场景分析
WebSocketSession 的特性
- 每次建立 WebSocket 连接,服务端都会为该连接分配一个新的
WebSocketSession
对象。 - 如果同一个用户在不同设备上登录,或者断开后重新登录,新的连接会导致之前的
WebSocketSession
失效。
问题
- 用户切换设备或重新登录后,服务端需要将新的
WebSocketSession
与用户正确绑定。 - 如果用户的旧
WebSocketSession
没有被清理,可能会导致消息推送到失效的连接上。
2. 解决方案:通过用户标识管理 WebSocketSession
要正确管理用户的 WebSocketSession,需要使用用户的唯一标识(如用户 ID)将 WebSocketSession 映射到具体的用户。
步骤:
(1) 用户标识的获取
- 在 WebSocket 握手阶段,通过认证方式获取用户的唯一标识(如 Token 或 UserID)。
- 常见方式:
- 在 URL 或 Header 中携带 Token:
- WebSocket 握手时,客户端通过 URL 或 Header 携带 Token。
- 服务端解析 Token,获取用户 ID。
- 通过 Spring Security 提供的认证信息:
- 如果应用已集成 Spring Security,可直接从
Principal
中获取用户信息。
- 如果应用已集成 Spring Security,可直接从
- 在 URL 或 Header 中携带 Token:
(2) 维护用户与 WebSocketSession 的映射
- 使用
ConcurrentHashMap
或类似的数据结构,维护用户 ID 和 WebSocketSession 的映射关系。 - 每当用户建立新连接时,更新此映射。
(3) 清理旧的 WebSocketSession
- 如果用户已有连接,断开旧连接,避免多端冲突。
- 清理无效的会话,释放资源。
3. 核心实现代码
以下是基于 Spring WebSocket 的实现:
(1) 维护用户与 WebSocketSession 的映射
import org.springframework.web.socket.WebSocketSession;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class WebSocketSessionManager {
// 用户ID和WebSocketSession的映射
private final ConcurrentHashMap<String, WebSocketSession> sessionMap = new ConcurrentHashMap<>();
// 添加用户会话
public void addSession(String userId, WebSocketSession session) {
// 如果已存在会话,断开旧会话
WebSocketSession oldSession = sessionMap.put(userId, session);
if (oldSession != null && oldSession.isOpen()) {
try {
oldSession.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 获取用户会话
public WebSocketSession getSession(String userId) {
return sessionMap.get(userId);
}
// 删除用户会话
public void removeSession(String userId) {
sessionMap.remove(userId);
}
}
(2) 在握手阶段绑定用户 ID
通过 WebSocket 握手拦截器获取用户信息:
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
import java.util.Map;
public class CustomHandshakeInterceptor extends HttpSessionHandshakeInterceptor {
@Override
public boolean beforeHandshake(
ServerHttpRequest request,
ServerHttpResponse response,
WebSocketHandler wsHandler,
Map<String, Object> attributes) throws Exception {
// 从 URL 参数或 Header 中获取用户 ID(例如通过 Token)
String userId = extractUserIdFromRequest(request);
if (userId != null) {
attributes.put("userId", userId); // 将用户ID存入WebSocket会话属性
return true;
}
return false; // 未找到用户ID,拒绝连接
}
private String extractUserIdFromRequest(ServerHttpRequest request) {
// 解析用户 ID 的逻辑,例如从Token中解析
return "mockedUserId"; // 示例返回值
}
}
(3) 在连接建立时绑定 WebSocketSession
使用 WebSocketSessionManager
来管理连接:
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
@Component
public class CustomWebSocketHandler extends TextWebSocketHandler {
@Autowired
private WebSocketSessionManager sessionManager;
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// 获取握手时存储的用户 ID
String userId = (String) session.getAttributes().get("userId");
if (userId != null) {
// 将用户ID和Session绑定
sessionManager.addSession(userId, session);
}
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
// 清理用户的会话
String userId = (String) session.getAttributes().get("userId");
if (userId != null) {
sessionManager.removeSession(userId);
}
}
}
4. 用户重新登录后的行为
- 绑定新 WebSocketSession:
- 当用户在新设备或重新登录时,会重新建立 WebSocket 连接。
- 服务端通过用户 ID 绑定新的 WebSocketSession,同时清理旧的会话。
- 消息推送到新的 WebSocketSession:
- 在消息推送时,直接从
WebSocketSessionManager
中获取用户的最新会话。 - 如果用户在线,推送消息。
- 如果用户不在线,消息持久化,等待用户重新上线后推送。
- 在消息推送时,直接从
推送消息示例
public void pushMessageToUser(String userId, String message) {
WebSocketSession session = sessionManager.getSession(userId);
if (session != null && session.isOpen()) {
try {
session.sendMessage(new TextMessage(message));
} catch (Exception e) {
e.printStackTrace();
}
} else {
// 用户不在线,持久化消息
saveMessageToDatabase(userId, message);
}
}
5. 总结
- WebSocketSession 是会随用户重连而变化的,必须基于用户 ID 维护映射关系。
- 通过拦截器或其他手段获取用户 ID,绑定到 WebSocketSession。
- 使用
ConcurrentHashMap
等数据结构管理用户 ID 和 WebSocketSession 的关系。 - 清理旧会话,确保消息推送到最新的 WebSocketSession 上。
- 对于离线用户,持久化消息,等待用户重新上线时继续推送。
10. 对于用户变更webSocketSession的场景,当用户下线后,消息的持久化应该是和用户的唯一标识比如用户id绑定在一起的吧?
是的,消息的持久化通常应该与用户的唯一标识(例如用户 ID)绑定在一起。这是因为用户在不同设备或多次重新登录后,WebSocketSession
会发生变化,而用户的唯一标识是稳定且不变的。通过绑定用户 ID,可以确保消息准确无误地推送到对应的用户,即使用户的会话发生了变化。
具体实现的逻辑
1. 消息与用户 ID 绑定
持久化离线消息时,需要以用户的唯一标识(如 userId
)作为关联键,保存到数据库或其他持久化存储中。例如:
- 数据库结构可以是:
+-----------+----------------+---------------------+ | messageId | userId | content | +-----------+----------------+---------------------+ | 1 | 1001 | "Hello User 1001" | | 2 | 1002 | "Your order is ..." | +-----------+----------------+---------------------+
- 保存消息时,绑定
userId
,确保用户可以在重新上线时重新接收到这些消息。
2. 消息的生命周期
- 消息产生:
- 当服务端需要推送消息给某个用户时,会检查用户的
WebSocketSession
是否存在且有效。 - 如果用户离线(
WebSocketSession
不存在或已关闭),将消息与用户 ID 绑定后存入持久化存储。
- 当服务端需要推送消息给某个用户时,会检查用户的
- 用户上线:
- 用户重新连接时,服务端从存储中查询该用户的离线消息。
- 推送这些消息到用户的新
WebSocketSession
后,清理已发送的离线消息。
- 消息过期:
- 根据业务需要,定期清理过期的或已处理的消息,避免持久化存储无限增长。
核心实现代码示例
以下是一个完整示例,展示如何处理消息持久化和推送的逻辑。
(1) 消息存储管理
假设我们用一个简单的内存数据结构来模拟数据库:
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.List;
@Component
public class OfflineMessageStore {
// 模拟数据库:userId -> 消息列表
private final ConcurrentHashMap<String, List<String>> messageMap = new ConcurrentHashMap<>();
// 保存离线消息
public void saveMessage(String userId, String message) {
messageMap.computeIfAbsent(userId, key -> new CopyOnWriteArrayList<>()).add(message);
}
// 获取离线消息
public List<String> getMessages(String userId) {
return messageMap.getOrDefault(userId, new CopyOnWriteArrayList<>());
}
// 清除已发送的消息
public void removeMessages(String userId) {
messageMap.remove(userId);
}
}
(2) 推送逻辑
在用户上线时推送离线消息:
@Component
public class MessagePusher {
@Autowired
private WebSocketSessionManager sessionManager;
@Autowired
private OfflineMessageStore messageStore;
public void pushMessage(String userId, String message) {
WebSocketSession session = sessionManager.getSession(userId);
if (session != null && session.isOpen()) {
try {
// 在线推送消息
session.sendMessage(new TextMessage(message));
} catch (Exception e) {
e.printStackTrace();
// 推送失败,重新保存为离线消息
messageStore.saveMessage(userId, message);
}
} else {
// 用户离线,存储消息
messageStore.saveMessage(userId, message);
}
}
public void pushOfflineMessages(String userId) {
WebSocketSession session = sessionManager.getSession(userId);
if (session != null && session.isOpen()) {
// 获取离线消息
List<String> messages = messageStore.getMessages(userId);
for (String message : messages) {
try {
// 推送离线消息
session.sendMessage(new TextMessage(message));
} catch (Exception e) {
e.printStackTrace();
// 推送失败,保留消息
messageStore.saveMessage(userId, message);
}
}
// 清除已推送的消息
messageStore.removeMessages(userId);
}
}
}
(3) 用户上线时处理离线消息
当用户重新连接后,可以直接调用 pushOfflineMessages
方法:
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// 获取用户 ID
String userId = (String) session.getAttributes().get("userId");
if (userId != null) {
// 绑定用户与 WebSocketSession
sessionManager.addSession(userId, session);
// 推送离线消息
messagePusher.pushOfflineMessages(userId);
}
}
关键点总结
- 消息持久化与用户 ID 绑定:
- 将消息与用户的唯一标识绑定,保证消息的针对性和一致性。
- 离线消息存储:
- 如果用户离线,消息会存储在持久化系统中(如数据库、Redis 等)。
- 重新上线处理:
- 用户重新上线时,拉取并推送离线消息,同时清理已处理的消息。
- 扩展到分布式:
- 如果服务是分布式部署,可以用 Redis 或 MQ 替代本地的
ConcurrentHashMap
来存储离线消息,确保多节点之间的数据一致性。
- 如果服务是分布式部署,可以用 Redis 或 MQ 替代本地的
持久化存储选择
根据业务需求,选择适合的持久化存储方式:
- 轻量级方案:
- 使用内存(
ConcurrentHashMap
)或本地文件。 - 适用于小规模、单节点部署。
- 使用内存(
- 中大型方案:
- 使用 Redis 存储离线消息,支持快速读写和分布式部署。
- 消息格式:
key = userId, value = [message1, message2, ...]
。
- 企业级方案:
- 使用消息队列(如 RabbitMQ 或 Kafka)管理离线消息,结合数据库存储持久化历史记录。
- 消息推送时从队列消费,确保消息可靠性。
通过这种方式,可以确保消息的推送准确性和可靠性,即使用户频繁切换设备或重新连接,消息也不会丢失。
11. webSocket消息的持久化,一般都有哪些存储策略?请列举业界比较成熟的持久化方案
在业界,WebSocket 消息持久化方案通常需要根据具体业务场景选择不同的策略。这些方案的目标是确保消息在用户离线、网络异常或系统故障时不会丢失,并在适当的时候重新推送给客户端。以下是一些成熟的持久化方案及其特点:
1. 基于内存的方案
描述
- 使用服务器内存(如
ConcurrentHashMap
或其他内存存储结构)暂存离线消息。 - 通常作为短期存储方案,适用于小规模部署和实时性较高的系统。
特点
- 优点:
- 实现简单,访问速度快。
- 适合轻量级应用或单节点服务。
- 缺点:
- 无法持久化,服务器重启或宕机会丢失消息。
- 不适合分布式架构,难以支持高并发。
使用场景
- 单节点小型应用,消息无需长期保存。
2. 使用 Redis 作为离线消息存储
描述
- 将离线消息存储在 Redis 中,通过用户 ID 或其他唯一标识进行分组。
- Redis 的数据结构(如
List
或SortedSet
)可以方便地管理用户的离线消息。
特点
- 优点:
- 支持高并发和快速读写,适合分布式部署。
- 方便实现消息的超时删除(通过 TTL 设置)。
- 可以灵活管理每个用户的消息队列(如基于优先级排序)。
- 缺点:
- 消息量大时可能增加存储成本。
- 需要定期清理过期或已处理的消息。
Redis 实现示例
- 消息存储格式:
- Key:
offline:messages:{userId}
- Value:
List
(存储消息内容)
- Key:
// 保存消息
redisTemplate.opsForList().leftPush("offline:messages:1001", "Hello User 1001");
// 获取消息
List<String> messages = redisTemplate.opsForList().range("offline:messages:1001", 0, -1);
// 删除消息
redisTemplate.delete("offline:messages:1001");
使用场景
- 中小型分布式应用,消息需要短期持久化(如 7 天内有效)。
3. 基于消息队列(MQ)
描述
- 使用消息队列(如 RabbitMQ、Kafka、RocketMQ 等)存储和转发消息。
- 当用户离线时,将消息存入队列;用户重新上线后,从队列中消费并推送消息。
特点
- 优点:
- 高吞吐量,支持大规模分布式系统。
- 自带消息重试、延时投递等特性,可靠性高。
- 可以设置消息过期时间或持久化到磁盘。
- 缺点:
- 需要额外维护 MQ 集群。
- 开发和部署成本较高。
RabbitMQ 示例
- 消息存储在
queue
中,消费者(推送逻辑)会在用户上线后消费消息。
// 推送消息到队列
rabbitTemplate.convertAndSend("exchange", "routingKey", "Hello User 1001");
// 消费消息
@RabbitListener(queues = "queueName")
public void consumeMessage(String message) {
System.out.println("Received message: " + message);
}
使用场景
- 大型分布式系统,尤其是需要严格消息顺序或高可靠性场景。
4. 使用数据库(如 MySQL、PostgreSQL)
描述
- 将离线消息持久化存储到数据库表中。
- 每条消息记录与用户唯一标识绑定,支持查询和定期清理。
特点
- 优点:
- 持久化程度高,适合存储历史消息。
- 容易扩展,可以记录消息状态(已读、未读)。
- 与现有业务数据库集成方便。
- 缺点:
- 性能瓶颈明显,不适合高并发场景。
- 对实时性要求高的场景可能存在延迟。
数据库结构示例
CREATE TABLE offline_messages (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
user_id VARCHAR(255) NOT NULL,
content TEXT NOT NULL,
status VARCHAR(20) DEFAULT 'UNREAD',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
// 保存消息
offlineMessageRepository.save(new OfflineMessage(userId, content));
// 获取消息
List<OfflineMessage> messages = offlineMessageRepository.findByUserIdAndStatus(userId, "UNREAD");
// 标记消息为已读
offlineMessageRepository.updateStatusByUserId(userId, "READ");
使用场景
- 需要长期保存历史消息,或需要对消息进行状态管理的场景。
5. 使用分布式存储(如 HBase、Cassandra)
描述
- 对于消息量巨大的系统,可以使用分布式存储(如 HBase、Cassandra)来持久化消息。
- 这些存储系统擅长处理大规模数据的写入和读取。
特点
- 优点:
- 适合大规模消息存储和查询。
- 支持分布式架构,高可用。
- 缺点:
- 部署和维护复杂度高。
- 需要额外开发高效的消息查询接口。
使用场景
- 超大规模社交网络或消息系统,如社交媒体、即时通信平台。
6. 结合多种方案
在实际系统中,往往会结合多种存储策略,以满足不同的业务需求。例如:
- 短期离线消息存储: 使用 Redis 暂存最近 7 天的消息。
- 长期消息记录: 使用数据库(如 MySQL)存储历史消息。
- 消息队列: 使用 RabbitMQ 或 Kafka 转发实时消息。
对比总结
方案 | 性能 | 持久化程度 | 实现复杂度 | 适用场景 |
---|---|---|---|---|
内存(如 Map) | 高 | 无 | 低 | 小型系统,实时性高 |
Redis | 较高 | 中 | 中 | 中型分布式系统,短期离线消息存储 |
消息队列(MQ) | 高 | 中 | 高 | 分布式系统,消息可靠性要求高 |
数据库(如 MySQL) | 中 | 高 | 中 | 需要长期保存消息 |
分布式存储 | 高 | 高 | 高 | 超大规模消息系统 |
推荐实践
- 中小型系统:
- 使用 Redis 作为离线消息的短期存储。
- 数据库用于存储消息历史记录。
- 大型分布式系统:
- 引入消息队列(如 RabbitMQ/Kafka)处理实时消息。
- Redis 暂存离线消息 + 数据库保存历史消息。
- 超大规模系统:
- 使用分布式存储(如 HBase)和 Kafka 配合,处理高并发和海量消息存储需求。
通过合理选择存储方案,可以有效提升系统的可靠性、性能和扩展性,同时满足 WebSocket 消息推送的业务需求。
12. 像国内的微信,这种离线消息一般存储多久?
像微信这样的大型即时通讯应用,其离线消息存储时间通常受多种因素影响,包括用户体验需求、存储成本、隐私保护政策等。以下是微信或类似即时通讯应用中离线消息存储的一些通用做法和时间范围(并非官方数据,而是基于业界经验的推测):
1. 离线消息存储时间
短期存储
- 一般为 7 天到 30 天。
- 适合未读的离线消息,消息在一定时间内保持可用。
- 如果用户在这段时间内上线,服务端会推送离线消息。
长期存储
- 部分关键消息(如系统通知、付费相关的信息)可能会存储更久,例如 90 天到 1 年。
- 如果用户长时间不在线,普通聊天消息可能会被删除,而不是无限期保存。
2. 不同消息类型的存储策略
- 普通聊天消息
- 存储时间:7 天到 30 天(微信的普通聊天消息据推测为 7 天)。
- 处理方式:如果用户未在规定时间内上线,未读消息可能被丢弃,或仅保留消息摘要。
- 群聊消息
- 存储时间:与普通消息相似,但可能增加对群成员的消息分发优化。
- 处理方式:部分消息可能在分发后清除,减少存储压力。
- 系统通知/关键消息
- 存储时间:更长,通常至少 90 天甚至更久(如交易相关信息)。
- 处理方式:用户上线后强制推送,确保用户接收。
- 音视频和大文件
- 存储时间:短期存储(7 天到 15 天)。
- 处理方式:音视频消息可能仅存储引用链接或缩略图,真正的媒体内容通过 CDN 提供下载,超期后删除。
3. 影响存储时长的因素
隐私政策
- 根据中国《个人信息保护法》和《网络安全法》,即时通讯应用需要遵循“最小化原则”,只保存必要的信息,避免长期或无期限存储用户消息。
用户体验
- 短期存储可以满足大多数用户的正常使用场景,同时避免积压过多消息,影响推送效率。
- 对于长期离线的用户,可以提示他们“消息已过期”而非重发所有消息。
存储成本
- 微信这类应用的用户规模庞大,每天产生的消息量可能达到数十亿条甚至更多。离线消息通常存储在高性能分布式存储中(如分布式数据库或对象存储),过长的保存周期会导致存储和运维成本快速增长。
场景和功能
- 企业微信或其他工作场景的即时通讯工具,可能提供更长的消息存储时间,甚至支持消息永久存储(如对企业管理员开放历史查询)。
4. 如何优化离线消息存储
1. 短期与长期结合
- 短期存储: Redis 或消息队列保存最近 7 天内的离线消息。
- 长期存储: 重要消息(如系统通知、交易相关)转存到数据库或分布式文件系统。
2. 压缩与分片
- 对离线消息进行压缩存储。
- 按用户分片存储,减少单点瓶颈。
3. 消息摘要
- 对于过期的消息,保留摘要(如“您有未读消息”),而不是保留完整内容。
4. CDN 加速
- 对于音视频消息或大文件,将内容存储到 CDN,提高分发效率,超期自动清理。
5. 国内其他即时通讯工具的实践
- 钉钉
- 商业用户和企业级服务支持更长的消息存储时间。
- 部分功能提供“消息回执”,确保重要消息不丢失。
- QQ
- 离线消息存储时间较短(可能为 7 天),普通用户无法查询过期消息。
- 会员服务可能会提供更长的消息历史查看功能。
- 企业微信
- 为企业用户提供消息永久存储和记录功能。
- 开发者可以接入历史消息存档 API,满足审计和合规需求。
6. 结论与建议
- 普通用户: 微信等应用的离线消息存储时间较短(通常为 7 天)。建议用户尽快上线读取未读消息。
- 业务需求: 如果您的系统需要实现类似的功能,可以结合 Redis 和分布式数据库实现短期和长期消息存储。
- 关键消息处理: 针对重要消息,确保离线消息能够以持久化方式与用户标识(如用户 ID)绑定,直到确认推送成功。
通过合理设计存储策略,不仅能满足业务需求,还能控制存储成本,提升系统性能。
文档信息
- 本文作者:Marshall