人生没有重来 發表於 2026-4-10 18:00:00

WebSocket实现实时通知

<h1 id="后端">后端</h1>
<h2 id="引入依赖">引入依赖</h2>
<pre><code class="language-xml">&lt;!-- WebSocket支持 --&gt;
&lt;dependency&gt;
    &lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
    &lt;artifactId&gt;spring-boot-starter-websocket&lt;/artifactId&gt;
&lt;/dependency&gt;
</code></pre>
<h2 id="添加server">添加Server</h2>
<pre><code class="language-java">
import com.ruoyi.common.websocket.WebSocketUsers;
import jakarta.websocket.*;
import jakarta.websocket.server.PathParam;
import jakarta.websocket.server.ServerEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

/**
* WebSocket服务端
* 路径格式:/websocket/{userId} 用于区分不同用户
*/
@Component
@ServerEndpoint("/websocket/{userId}")
public class WebSocketServer {

    private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);

    /**
   * 当前连接数(在线人数)
   */
    private static AtomicInteger onlineCount = new AtomicInteger(0);

    /**
   * 当前会话
   */
    private Session session;

    /**
   * 用户ID
   */
    private Long userId;

    /**
   * 连接建立成功调用的方法
   */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") Long userId) {
      this.session = session;
      this.userId = userId;

      // 使用会话管理类添加用户
      WebSocketUsers.addUser(userId, session);

      onlineCount.incrementAndGet();
      log.info("用户 {} 连接WebSocket,当前在线人数:{}", userId, onlineCount.get());

      // 发送连接成功消息
      try {
            String message = "{\"type\":\"connected\",\"message\":\"WebSocket连接成功\",\"userId\":" + userId + "}";
            session.getBasicRemote().sendText(message);
      } catch (IOException e) {
            log.error("发送欢迎消息失败", e);
      }
    }

    /**
   * 连接关闭调用的方法
   */
    @OnClose
    public void onClose() {
      // 使用会话管理类移除用户
      WebSocketUsers.removeUser(this.userId);

      onlineCount.decrementAndGet();
      log.info("用户 {} 断开连接,当前在线人数:{}", userId, onlineCount.get());
    }

    /**
   * 收到客户端消息后调用的方法
   */
    @OnMessage
    public void onMessage(String message, Session session) {
      log.info("收到用户 {} 的消息: {}", userId, message);

      // 处理心跳消息
      try {
            if (message.contains("ping")) {
                String reply = "{\"type\":\"pong\",\"timestamp\":" + System.currentTimeMillis() + "}";
                session.getBasicRemote().sendText(reply);
            } else {
                // 简单回复客户端
                String reply =
                        "{\"type\":\"reply\",\"message\":\"服务器收到消息\",\"timestamp\":" + System.currentTimeMillis() + "}";
                session.getBasicRemote().sendText(reply);
            }
      } catch (IOException e) {
            log.error("回复消息失败", e);
      }
    }

    /**
   * 发生错误时调用
   */
    @OnError
    public void onError(Session session, Throwable error) {
      log.error("WebSocket连接发生错误,用户ID:{}", this.userId, error);
      // 发生错误时,移除用户会话
      if (this.userId != null) {
            WebSocketUsers.removeUser(this.userId);
      }
    }

    /**
   * 获取当前在线人数
   */
    public static int getOnlineCount() {
      return onlineCount.get();
    }

    /**
   * 发送消息到指定用户(你业务里调用的就是这个方法!)
   *
   * @param message 要推送的消息内容
   * @param userId指定用户ID
   */
    public void sendInfo(String message, Long userId) {
      log.info("发送消息到用户:{},消息内容:{}", userId, message);

      // 判断用户是否在线
      Session session = WebSocketUsers.getSession(userId);
      if (userId != null &amp;&amp; session != null) {
            try {
                // 异步发送消息给前端
                session.getBasicRemote().sendText(message);
            } catch (Exception e) {
                log.error("发送消息到用户【{}】异常:{}", userId, e.getMessage());
            }
      } else {
            log.warn("用户【{}】不在线,消息暂存", userId);
      }
    }
}
</code></pre>
<h2 id="websocket用户会话管理">WebSocket用户会话管理</h2>
<pre><code class="language-java">import jakarta.websocket.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

/**
* WebSocket用户会话管理
* 存储所有在线用户的会话信息
*/
public class WebSocketUsers {
    private static final Logger log = LoggerFactory.getLogger(WebSocketUsers.class);

    /**
   * 用户会话存储
   * key: userId (用户ID)
   * value: Session (WebSocket会话)
   */
    private static ConcurrentHashMap&lt;Long, Session&gt; USERS = new ConcurrentHashMap&lt;&gt;();

    /**
   * 添加用户会话
   */
    public static void addUser(Long userId, Session session) {
      USERS.put(userId, session);
      log.info("用户 {} 加入WebSocket连接,当前在线人数:{}", userId, USERS.size());
    }

    /**
   * 移除用户会话
   */
    public static void removeUser(Long userId) {
      Session session = USERS.remove(userId);
      if (session != null) {
            try {
                if (session.isOpen()) {
                  session.close();
                }
            } catch (IOException e) {
                log.error("关闭会话失败", e);
            }
      }
      log.info("用户 {} 断开WebSocket连接,当前在线人数:{}", userId, USERS.size());
    }

    /**
   * 根据会话对象移除用户
   */
    public static boolean removeUserBySession(Session session) {
      Long userId = null;
      for (Long key : USERS.keySet()) {
            if (USERS.get(key).equals(session)) {
                userId = key;
                break;
            }
      }
      if (userId != null) {
            removeUser(userId);
            return true;
      }
      return false;
    }

    /**
   * 获取用户会话
   */
    public static Session getSession(Long userId) {
      return USERS.get(userId);
    }

    /**
   * 获取在线用户数
   */
    public static int getOnlineCount() {
      return USERS.size();
    }

    /**
   * 判断用户是否在线
   */
    public static boolean isOnline(Long userId) {
      return USERS.containsKey(userId) &amp;&amp; USERS.get(userId).isOpen();
    }

    /**
   * 发送消息给指定用户
   */
    public static void sendMessage(Long userId, String message) {
      Session session = USERS.get(userId);
      if (session != null &amp;&amp; session.isOpen()) {
            try {
                session.getBasicRemote().sendText(message);
                log.debug("发送消息给用户 {}: {}", userId, message);
            } catch (IOException e) {
                log.error("发送消息给用户 {} 失败", userId, e);
                // 发送失败,移除该用户会话
                removeUser(userId);
            }
      }
    }

    /**
   * 发送消息给所有在线用户
   */
    public static void sendMessageToAll(String message) {
      for (Long userId : USERS.keySet()) {
            sendMessage(userId, message);
      }
    }
}
</code></pre>
<h1 id="前端">前端</h1>
<pre><code class="language-javascript">created() {
this.initWebSocket()
},
beforeDestroy() {
this.closeWebSocket()
},
methods: {
    // 初始化WebSocket
    initWebSocket() {
      if (typeof WebSocket === 'undefined') {
      this.$message.warning("您的浏览器不支持WebSocket,无法实时获取通知数量")
      return
      }
      const userId = this.$store.state.user.id
      // 若依原生ws地址
      const wsUrl = `ws://${process.env.VUE_APP_BASE_URL}/websocket/${userId}`
      this.ws = new WebSocket(wsUrl)
      // 收到消息 → 立即刷新通知列表
      this.ws.onmessage = (evt) =&gt; {
      console.log("收到实时通知:", evt.data)
      try {
          JSON.parse(evt.data)
      } catch (e) {
          this.initCount()
      }
      }
    },
    // 关闭
    closeWebSocket() {
      this.ws &amp;&amp; this.ws.close()
    }
}
</code></pre>


</div>
<div id="MySignature" role="contentinfo">
    <p>本文来自博客园,作者:喵师傅,转载请注明原文链接:https://www.cnblogs.com/wywblogs/p/19848237</p><br><br>
来源:https://www.cnblogs.com/wywblogs/p/19848237
頁: [1]
查看完整版本: WebSocket实现实时通知