|
WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,允许服务器和客户端之间进行实时双向通信。
基本使用
1. 创建 WebSocket 连接
// 创建 WebSocket 连接
const socket = new WebSocket('ws://localhost:8080');
// 或者使用安全连接
const secureSocket = new WebSocket('wss://example.com/socket');
2. WebSocket 事件
// 连接建立时触发
socket.onopen = function(event) {
console.log('连接已建立');
socket.send('Hello Server!');
};
// 接收到消息时触发
socket.onmessage = function(event) {
console.log('收到消息:', event.data);
// 处理接收到的数据
};
// 发生错误时触发
socket.onerror = function(error) {
console.error('WebSocket 错误:', error);
};
// 连接关闭时触发
socket.onclose = function(event) {
console.log('连接关闭', event.code, event.reason);
// 可以在这里尝试重连
};
完整示例
客户端示例
<!DOCTYPE html>
<html>
<head>
<title>WebSocket 示例</title>
</head>
<body>
<div>
<input type="text" id="messageInput" placeholder="输入消息">
<button onclick="sendMessage()">发送</button>
</div>
<div id="messages"></div>
<script>
// 创建 WebSocket 连接
const socket = new WebSocket('ws://localhost:8080');
const messagesDiv = document.getElementById('messages');
// 连接建立
socket.onopen = function() {
addMessage('系统', '连接成功!');
};
// 接收消息
socket.onmessage = function(event) {
try {
const data = JSON.parse(event.data);
addMessage(data.sender, data.message);
} catch (e) {
addMessage('系统', event.data);
}
};
// 错误处理
socket.onerror = function(error) {
addMessage('系统', '连接错误');
};
// 连接关闭
socket.onclose = function() {
addMessage('系统', '连接已关闭');
};
// 发送消息
function sendMessage() {
const input = document.getElementById('messageInput');
const message = input.value.trim();
if (message) {
socket.send(JSON.stringify({
type: 'message',
content: message,
timestamp: new Date().toISOString()
}));
input.value = '';
}
}
// 显示消息
function addMessage(sender, text) {
const msgElement = document.createElement('div');
msgElement.innerHTML = `<strong>${sender}:</strong> ${text}`;
messagesDiv.appendChild(msgElement);
messagesDiv.scrollTop = messagesDiv.scrollHeight;
}
// 关闭连接(页面卸载时)
window.addEventListener('beforeunload', function() {
if (socket.readyState === WebSocket.OPEN) {
socket.close(1000, '用户离开页面');
}
});
</script>
</body>
</html>
Node.js 服务器端示例
// 使用 ws 库
const WebSocket = require('ws');
// 创建 WebSocket 服务器
const wss = new WebSocket.Server({ port: 8080 });
console.log('WebSocket 服务器启动在 ws://localhost:8080');
// 连接处理
wss.on('connection', function connection(ws) {
console.log('新客户端连接');
// 发送欢迎消息
ws.send(JSON.stringify({
type: 'system',
message: '欢迎连接到服务器!'
}));
// 接收客户端消息
ws.on('message', function incoming(message) {
console.log('收到消息:', message);
try {
const data = JSON.parse(message);
// 广播消息给所有客户端
wss.clients.forEach(function each(client) {
if (client !== ws && client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify({
type: 'message',
sender: '用户',
message: data.content,
timestamp: new Date().toISOString()
}));
}
});
} catch (error) {
console.error('消息解析错误:', error);
}
});
// 连接关闭
ws.on('close', function() {
console.log('客户端断开连接');
});
// 错误处理
ws.on('error', function(error) {
console.error('WebSocket 错误:', error);
});
});
WebSocket 状态
// 检查连接状态
switch(socket.readyState) {
case WebSocket.CONNECTING: // 0 - 连接中
console.log('连接中...');
break;
case WebSocket.OPEN: // 1 - 已连接
console.log('已连接');
break;
case WebSocket.CLOSING: // 2 - 关闭中
console.log('正在关闭...');
break;
case WebSocket.CLOSED: // 3 - 已关闭
console.log('已关闭');
break;
}
高级特性
1. 心跳检测
// 心跳检测
let heartbeatInterval;
socket.onopen = function() {
console.log('连接建立');
// 开始心跳
heartbeatInterval = setInterval(() => {
if (socket.readyState === WebSocket.OPEN) {
socket.send(JSON.stringify({ type: 'ping' }));
}
}, 30000);
};
socket.onclose = function() {
// 清除心跳
clearInterval(heartbeatInterval);
};
2. 重连机制
class WebSocketClient {
constructor(url) {
this.url = url;
this.socket = null;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
this.reconnectDelay = 1000;
}
connect() {
this.socket = new WebSocket(this.url);
this.socket.onopen = () => {
console.log('连接成功');
this.reconnectAttempts = 0;
};
this.socket.onclose = (event) => {
console.log('连接断开,尝试重连...');
this.reconnect();
};
this.socket.onerror = (error) => {
console.error('连接错误:', error);
};
}
reconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts);
setTimeout(() => {
console.log(`第 ${this.reconnectAttempts} 次重连`);
this.connect();
}, delay);
} else {
console.error('重连次数已达上限');
}
}
send(data) {
if (this.socket.readyState === WebSocket.OPEN) {
this.socket.send(data);
}
}
}
3. 二进制数据传输
// 发送二进制数据
socket.onopen = function() {
// 发送 ArrayBuffer
const buffer = new ArrayBuffer(4);
const view = new Uint8Array(buffer);
view[0] = 1;
view[1] = 2;
view[2] = 3;
view[3] = 4;
socket.send(buffer);
// 发送 Blob
const blob = new Blob(['Hello'], { type: 'text/plain' });
socket.send(blob);
};
// 接收二进制数据
socket.binaryType = 'arraybuffer'; // 或 'blob'
socket.onmessage = function(event) {
if (event.data instanceof ArrayBuffer) {
// 处理 ArrayBuffer
const view = new Uint8Array(event.data);
console.log('收到二进制数据:', view);
} else {
// 处理文本数据
console.log('收到文本数据:', event.data);
}
};
Spring Boot 中使用 WebSocket
添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
基础配置类
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
// 消息代理前缀
config.enableSimpleBroker("/topic", "/queue");
// 应用目的地前缀
config.setApplicationDestinationPrefixes("/app");
// 用户目的地前缀(一对一消息)
config.setUserDestinationPrefix("/user");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 注册 WebSocket 端点
registry.addEndpoint("/ws")
.setAllowedOriginPatterns("*")
.withSockJS(); // 支持 SockJS 降级
// 也可以添加多个端点
registry.addEndpoint("/ws-native")
.setAllowedOriginPatterns("*");
}
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
// 配置传输限制
registration.setMessageSizeLimit(128 * 1024); // 消息大小限制 128KB
registration.setSendTimeLimit(20 * 1000); // 发送超时 20秒
registration.setSendBufferSizeLimit(512 * 1024); // 发送缓冲区限制 512KB
}
}
控制器示例
@Controller
public class WebSocketController {
// 注入消息模板
@Autowired
private SimpMessagingTemplate messagingTemplate;
/**
* 处理客户端发送的消息
* 目的地:/app/chat
*/
@MessageMapping("/chat")
@SendTo("/topic/messages")
public ChatMessage handleMessage(ChatMessage message) {
message.setTimestamp(new Date());
System.out.println("收到消息: " + message.getContent());
return message;
}
/**
* 发送广播消息
*/
@GetMapping("/broadcast")
public void broadcast(String content) {
ChatMessage message = new ChatMessage();
message.setContent(content);
message.setSender("系统");
message.setTimestamp(new Date());
// 发送到 /topic/messages
messagingTemplate.convertAndSend("/topic/messages", message);
}
/**
* 发送点对点消息
*/
@GetMapping("/sendToUser")
public void sendToUser(String userId, String content) {
ChatMessage message = new ChatMessage();
message.setContent(content);
message.setSender("管理员");
message.setTimestamp(new Date());
// 发送给指定用户:/user/{userId}/queue/messages
messagingTemplate.convertAndSendToUser(
userId,
"/queue/messages",
message
);
}
}
// 消息实体类
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ChatMessage {
private String sender;
private String content;
private Date timestamp;
}
连接拦截器
@Component
public class WebSocketInterceptor extends ChannelInterceptorAdapter {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor =
MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
// 连接建立时处理
String token = accessor.getFirstNativeHeader("token");
// 验证 token...
System.out.println("用户连接: " + accessor.getSessionId());
} else if (StompCommand.DISCONNECT.equals(accessor.getCommand())) {
// 连接断开时处理
System.out.println("用户断开: " + accessor.getSessionId());
}
return message;
}
}
原生 Java WebSocket(JSR 356)
注解方式
@ServerEndpoint("/chat/{userId}")
@Component
public class ChatEndpoint {
// 存储所有连接
private static final Map<String, Session> sessions = new ConcurrentHashMap<>();
// 存储用户ID和session的映射
private static final Map<String, String> userSessionMap = new ConcurrentHashMap<>();
/**
* 连接建立时调用
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
System.out.println("连接建立: " + session.getId() + ", 用户: " + userId);
// 保存连接
sessions.put(session.getId(), session);
userSessionMap.put(userId, session.getId());
// 通知其他用户有新用户上线
broadcast("系统", "用户 " + userId + " 上线了");
}
/**
* 收到消息时调用
*/
@OnMessage
public void onMessage(String message, Session session,
@PathParam("userId") String userId) {
System.out.println("收到消息: " + message + " from: " + userId);
try {
// 解析消息
JSONObject json = new JSONObject(message);
String content = json.getString("content");
String toUserId = json.optString("to", null);
if (toUserId != null) {
// 私聊消息
sendToUser(userId, toUserId, content);
} else {
// 群发消息
broadcast(userId, content);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 连接关闭时调用
*/
@OnClose
public void onClose(Session session, @PathParam("userId") String userId) {
System.out.println("连接关闭: " + session.getId());
// 移除连接
sessions.remove(session.getId());
userSessionMap.remove(userId);
// 通知其他用户
broadcast("系统", "用户 " + userId + " 下线了");
}
/**
* 发生错误时调用
*/
@OnError
public void onError(Session session, Throwable error) {
System.out.println("连接错误: " + session.getId());
error.printStackTrace();
}
/**
* 广播消息给所有用户
*/
private void broadcast(String sender, String content) {
JSONObject message = new JSONObject();
message.put("sender", sender);
message.put("content", content);
message.put("timestamp", System.currentTimeMillis());
message.put("type", "broadcast");
// 发送给所有连接的客户端
for (Session session : sessions.values()) {
if (session.isOpen()) {
try {
session.getAsyncRemote().sendText(message.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
/**
* 发送私聊消息
*/
private void sendToUser(String fromUserId, String toUserId, String content) {
String toSessionId = userSessionMap.get(toUserId);
if (toSessionId != null) {
Session toSession = sessions.get(toSessionId);
if (toSession != null && toSession.isOpen()) {
try {
JSONObject message = new JSONObject();
message.put("sender", fromUserId);
message.put("content", content);
message.put("timestamp", System.currentTimeMillis());
message.put("type", "private");
toSession.getAsyncRemote().sendText(message.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
编程方式(继承 Endpoint 类)
@ServerEndpoint("/game")
public class GameEndpoint extends Endpoint {
private static final Set<Session> sessions = Collections.synchronizedSet(new HashSet<>());
@Override
public void onOpen(Session session, EndpointConfig config) {
System.out.println("新连接: " + session.getId());
sessions.add(session);
// 添加消息处理器
session.addMessageHandler(new MessageHandler.Whole<String>() {
@Override
public void onMessage(String message) {
System.out.println("收到: " + message);
// 处理游戏逻辑
handleGameMessage(session, message);
}
});
// 发送欢迎消息
try {
JSONObject welcome = new JSONObject();
welcome.put("type", "welcome");
welcome.put("message", "欢迎加入游戏!");
welcome.put("sessionId", session.getId());
session.getBasicRemote().sendText(welcome.toString());
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void onClose(Session session, CloseReason closeReason) {
System.out.println("连接关闭: " + session.getId());
sessions.remove(session);
// 通知其他玩家
broadcastPlayerLeft(session.getId());
}
@Override
public void onError(Session session, Throwable thr) {
System.err.println("连接错误: " + session.getId());
thr.printStackTrace();
}
private void handleGameMessage(Session session, String message) {
try {
JSONObject json = new JSONObject(message);
String type = json.getString("type");
switch (type) {
case "move":
// 处理移动
handlePlayerMove(session, json);
break;
case "chat":
// 处理聊天
handleChatMessage(session, json);
break;
default:
System.out.println("未知消息类型: " + type);
}
} catch (Exception e) {
e.printStackTrace();
}
}
private void handlePlayerMove(Session session, JSONObject moveData) {
// 处理玩家移动逻辑
// 广播给所有玩家
broadcastGameUpdate(moveData);
}
private void handleChatMessage(Session session, JSONObject chatData) {
// 广播聊天消息
JSONObject broadcastMsg = new JSONObject();
broadcastMsg.put("type", "chat");
broadcastMsg.put("sender", session.getId());
broadcastMsg.put("message", chatData.getString("message"));
broadcastMsg.put("timestamp", System.currentTimeMillis());
broadcast(broadcastMsg.toString());
}
private void broadcast(String message) {
synchronized (sessions) {
for (Session s : sessions) {
if (s.isOpen()) {
try {
s.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
}
配置文件
application.yml 配置
spring:
websocket:
# WebSocket 配置
enabled: true
server:
# 服务器配置
port: 8080
servlet:
context-path: /api
# 自定义配置
websocket:
max-sessions: 1000
heartbeat-interval: 30000
max-message-size: 128KB
心跳检测和连接管理
@Component
public class WebSocketHeartbeat {
@Autowired
private SimpMessagingTemplate messagingTemplate;
private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
@PostConstruct
public void init() {
// 每30秒发送一次心跳
scheduler.scheduleAtFixedRate(() -> {
try {
messagingTemplate.convertAndSend("/topic/heartbeat",
Map.of("timestamp", System.currentTimeMillis(),
"type", "heartbeat"));
} catch (Exception e) {
e.printStackTrace();
}
}, 0, 30, TimeUnit.SECONDS);
}
@PreDestroy
public void destroy() {
scheduler.shutdown();
}
}
消息编码器/解码器
// 自定义消息编解码器
@Component
public class ChatMessageConverter implements MessageConverter {
@Override
public Message<?> toMessage(Object payload, MessageHeaders headers) {
if (payload instanceof ChatMessage) {
ChatMessage msg = (ChatMessage) payload;
byte[] bytes = serializeMessage(msg);
return MessageBuilder.createMessage(bytes, headers);
}
return null;
}
@Override
public Object fromMessage(Message<?> message, Class<?> targetClass) {
if (targetClass == ChatMessage.class) {
byte[] bytes = (byte[]) message.getPayload();
return deserializeMessage(bytes);
}
return null;
}
private byte[] serializeMessage(ChatMessage message) {
try {
return new ObjectMapper().writeValueAsBytes(message);
} catch (Exception e) {
throw new RuntimeException("序列化失败", e);
}
}
private ChatMessage deserializeMessage(byte[] bytes) {
try {
return new ObjectMapper().readValue(bytes, ChatMessage.class);
} catch (Exception e) {
throw new RuntimeException("反序列化失败", e);
}
}
}
集群支持
@Configuration
@EnableRedisRepositories
public class RedisConfig {
@Bean
public RedisMessageListenerContainer redisContainer(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));
return template;
}
}
// Redis 广播消息
@Component
public class RedisMessagePublisher {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public void publish(String channel, Object message) {
redisTemplate.convertAndSend(channel, message);
}
}
@Component
public class RedisMessageSubscriber implements MessageListener {
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
// 处理从 Redis 收到的消息
// 转发给 WebSocket 客户端
String channel = new String(pattern);
String msg = new String(message.getBody());
messagingTemplate.convertAndSend("/topic/" + channel, msg);
}
}
Spring Boot 的 STOMP 实现更加完整和易于使用,而原生 WebSocket 则更加灵活。
来源:https://www.cnblogs.com/syf0824/p/19368779 |