快乐肥猪仔 發表於 2026-1-13 14:35:28

SpringBoot WebSocket多消息推送过程

<div id="navCategory"><h5 class="catalogue">目录</h5><ul class="first_class_ul"><li>实现功能</li><li>添加依赖</li><li>websocket配置类</li><li>websocket拦截器</li><li>websocket处理器</li><li>websocket工具类封装</li><li>websocket控制器</li><li>测试地址</li><li>总结</li></ul></div><p class="maodian"></p><h2>实现功能</h2>
<ul><li>1. 给某个分组推送消息</li><li>2. 给所有分组推送消息</li><li>3. 给所有用户推送消息</li><li>4. 给某一个用户单独推送消息(在分组中)</li><li>5. 给某一个用户推送消息(不在在分组中)</li><li>6. 用户可能存在多个分组</li><li>7. 用户多设备登录</li><li>8. 监控连接心跳(后端实现不需要前端实现配合)</li><li>9. 失败超过5次关闭连接</li></ul>
<p>使用公平锁支持并发。</p>
<p class="maodian"></p><h2>添加依赖</h2>
<div class="jb51code"><pre class="brush:xml;">      &lt;!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-websocket --&gt;
      &lt;dependency&gt;
            &lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
            &lt;artifactId&gt;spring-boot-starter-websocket&lt;/artifactId&gt;
      &lt;/dependency&gt;</pre></div>
<p class="maodian"></p><h2>websocket配置类</h2>
<div class="jb51code"><pre class="brush:java;">/**
* @Description websocket配置类
* @Author WangKun
* @Date 2025/8/10 19:02
* @Version
*/
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    /**
   * @Description websocket拦截器配置端点
      * @param registry WebSocket处理程序注册表,用于注册WebSocket处理器和拦截器
   * @Throws 可能抛出WebSocket相关的异常
   * @Return void 无返回值
   * @Date 2025-08-10 19:14:42
   * @Author WangKun
   **/
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
      // 注册WebSocket处理器,并设置URL路径为"/ws/{id}"
      registry.addHandler(webSocketHandler(), "/ws/{id}")
                // 允许所有来源的跨域请求
                .setAllowedOrigins("*");
    }

    /**
   * @Description websocket核心处理器
   * @Throws 可能抛出Bean创建相关的异常
   * @Return JWebSocketManager 返回WebSocket管理器的实例
   * @Date 2025-08-10 19:13:48
   * @Author WangKun
   **/
    @Bean
    public WebSocketManager webSocketManager() {
      return new WebSocketManager();
    }

    /**
   * @Description websocket拦截处理
   * @Throws 可能抛出Bean创建相关的异常
   * @Return JWebSocketHandler 返回WebSocket处理器的实例
   * @Date 2025-08-10 19:15:20
   * @Author WangKun
   **/
    @Bean // 将此方法返回的对象注册为Spring容器中的Bean
    public WebSocketHandler webSocketHandler() {
      // 创建WebSocket处理器实例,并注入WebSocket管理器
      return new WebSocketHandler(webSocketManager());
    }
}</pre></div>
<p class="maodian"></p><h2>websocket拦截器</h2>
<div class="jb51code"><pre class="brush:java;">/**
* @Description websocket拦截器
* @Author WangKun
* @Date 2025/8/10 19:14
* @Version
*/
@Slf4j
public class WebSocketHandler extends TextWebSocketHandler {

    private final WebSocketManager manager; // WebSocket管理器,用于管理WebSocket会话

    /**
   * @Description 构造器注入核心
      * @param manager WebSocket管理器实例
   * @Throws
   * @Return
   * @Date 2025-08-21 09:27:35
   * @Author WangKun
   **/
    public WebSocketHandler(WebSocketManager manager) {
      this.manager = manager;
    }

    /**
   * @param session WebSocket会话对象
   * @Description 初始化会话
   * @Throws 当会话建立过程中出现异常时抛出
   * @Return void
   * @Date 2025-08-10 19:17:57
   * @Author WangKun
   **/
    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
      manager.addSession(extractId(session), session);
    }

    /**
   * @param session WebSocket会话对象
   * @param message 接收到的文本消息
   * @Description PONG消息处理
   * @Throws 当消息处理过程中出现异常时抛出
   * @Return void
   * @Date 2025-08-10 19:18:01
   * @Author WangKun
   **/
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) {
      String payload = message.getPayload();
      log.info("收到来自[{}]的PONG消息: {}", extractId(session), payload);
      if ("PONG".equals(payload)) {
            manager.handlePongMessage(session);
      }
    }

    /**
   * @param session WebSocket会话对象
   * @param message 接收到的PONG消息
   * @Description 消息处理
   * @Throws 当消息处理过程中出现异常时抛出
   * @Return void
   * @Date 2025-08-10 19:18:08
   * @Author WangKun
   **/
    @Override
    public void handlePongMessage(WebSocketSession session, PongMessage message) {
      manager.handlePongMessage(session);
    }

    /**
   * @param session WebSocket会话对象
   * @param status 会话关闭状态
   * @Description 会话关闭
   * @Throws 当会话关闭过程中出现异常时抛出
   * @Return void
   * @Date 2025-08-10 19:18:13
   * @Author WangKun
   **/
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
      manager.removeToSession(session);
    }

    /**
   * @param session WebSocket会话对象
   * @Description 拦截请求参数处理
   * @Throws 当URI解析过程中出现异常时抛出
   * @Return java.lang.String 返回从URI中提取的ID
   * @Date 2025-08-10 19:18:22
   * @Author WangKun
   **/
    private String extractId(WebSocketSession session) {
      // 从URI路径提取ID:/ws/{id}
      String path = session.getUri().getPath();
      return path.substring(path.lastIndexOf('/') + 1);
    }
}</pre></div>
<p class="maodian"></p><h2>websocket处理器</h2>
<div class="jb51code"><pre class="brush:java;">/**
* @Description websocket处理器
* @Author WangKun
* @Date 2025/8/10 19:20
* @Version
*/
@Slf4j
public class WebSocketManager {

    // ID -&gt; 设备会话集合 (支持多设备)
    private final ConcurrentMap&lt;String, Set&lt;WebSocketSession&gt;&gt; SESSIONS = new ConcurrentHashMap&lt;&gt;();
    // 会话 -&gt; ID
    private final ConcurrentMap&lt;WebSocketSession, String&gt; SESSION_MAP = new ConcurrentHashMap&lt;&gt;();
    // 分组ID -&gt; ID集合
    private final ConcurrentMap&lt;String, Set&lt;String&gt;&gt; GROUPS = new ConcurrentHashMap&lt;&gt;();
    // 会话失败计数器 (Session -&gt; 失败次数)
    private final ConcurrentMap&lt;WebSocketSession, Integer&gt; FAILURE_COUNTS = new ConcurrentHashMap&lt;&gt;();
    // 心跳状态监控 (Session -&gt; 最后活跃时间)
    private final ConcurrentMap&lt;WebSocketSession, Long&gt; LAST_ACTIVE_TIMES = new ConcurrentHashMap&lt;&gt;();
    // 公平锁,确保所有操作的原子性
    private final ReentrantLock MAIN_LOCK = new ReentrantLock(true);
    // 心跳状态监控
    private final ConcurrentMap&lt;WebSocketSession, Long&gt; lastPongTimes = new ConcurrentHashMap&lt;&gt;();
    private final ConcurrentMap&lt;WebSocketSession, ScheduledFuture&lt;?&gt;&gt; pingTasks = new ConcurrentHashMap&lt;&gt;();
    // 心跳间隔25秒
    private static final long HEARTBEAT_INTERVAL = 25000;
    // 心跳超时40秒
    private static final long HEARTBEAT_TIMEOUT = 40000;
    // 心跳PING NIO包装
    private static final ByteBuffer PING_PAYLOAD = ByteBuffer.wrap(new byte[]{0x1});

    // 线程池
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4);

    public WebSocketManager() {
      // 心跳检测任务
      scheduler.scheduleAtFixedRate(this::checkHeartbeats, 10, 10, TimeUnit.SECONDS);
      // 会话清理任务
      scheduler.scheduleAtFixedRate(this::cleanExpiredSessions, 5, 5, TimeUnit.MINUTES);
    }

    /**
   * @param id
   * @param session
   * @Description 建立连接添加到session
   * @Throws
   * @Return void
   * @Date 2025-08-10 19:37:14
   * @Author WangKun
   **/
    public void addSession(String id, WebSocketSession session) {
      MAIN_LOCK.lock();
      try {
            if (!SESSION_MAP.containsKey(session)) {
                // 会话管理
                SESSIONS.computeIfAbsent(id, k -&gt; ConcurrentHashMap.newKeySet()).add(session);
                SESSION_MAP.put(session, id);
                // 初始化心跳状态
                lastPongTimes.put(session, System.currentTimeMillis());
                startPingTask(session);
                log.info("[连接建立] {}: 会话:{}", id, session.getId());
            }
      } finally {
            MAIN_LOCK.unlock();
      }
    }

    /**
   * @param session
   * @Description 移除session中的
   * @Throws
   * @Return void
   * @Date 2025-08-10 19:40:11
   * @Author WangKun
   **/
    public void removeToSession(WebSocketSession session) {
      MAIN_LOCK.lock();
      try {
            String id = SESSION_MAP.get(session);
            if (id != null) {
                // 清理会话
                Set&lt;WebSocketSession&gt; sessions = SESSIONS.get(id);
                if (sessions != null) {
                  sessions.remove(session);
                  if (sessions.isEmpty()) {
                        SESSIONS.remove(id);
                  }
                }
                // 清理心跳任务
                stopPingTask(session);
                lastPongTimes.remove(session);
                SESSION_MAP.remove(session);

                log.info("[连接关闭] {}:会话{}: ", id, session.getId());
            }
      } finally {
            MAIN_LOCK.unlock();
      }
    }

    /**
   * @param session
   * @Description 开始心跳PING
   * @Throws
   * @Return void
   * @Date 2025-08-10 19:41:06
   * @Author WangKun
   **/
    private void startPingTask(WebSocketSession session) {
      ScheduledFuture&lt;?&gt; task = scheduler.scheduleAtFixedRate(() -&gt; {
            MAIN_LOCK.lock();
            try {
                if (session.isOpen()) {
                  try {
                        // 标准WebSocket Ping发送
                        if (session instanceof StandardWebSocketSession) {
                            ((StandardWebSocketSession) session).getNativeSession()
                                    .getAsyncRemote().sendPing(PING_PAYLOAD);
                        } else {
                            session.sendMessage(new PingMessage(PING_PAYLOAD));
                        }
                        log.info(" 会话{}: ", session.getId());
                  } catch (Exception e) {
                        log.info(" 会话{}: 错误{}: {}", session.getId(), e.getMessage(), session.getUri());
                        closeSession(session);
                  }
                }
            } finally {
                MAIN_LOCK.unlock();
            }
      }, HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
      pingTasks.put(session, task);
    }

    /**
   * @param session
   * @Description 心跳停止
   * @Throws
   * @Return void
   * @Date 2025-08-10 19:41:55
   * @Author WangKun
   **/
    private void stopPingTask(WebSocketSession session) {
      ScheduledFuture&lt;?&gt; task = pingTasks.remove(session);
      if (task != null) {
            task.cancel(true);
      }
    }

    /**
   * @param session
   * @Description 心跳PONG接收
   * @Throws
   * @Return void
   * @Date 2025-08-10 19:42:20
   * @Author WangKun
   **/
    public void handlePongMessage(WebSocketSession session) {
      MAIN_LOCK.lock();
      try {
            lastPongTimes.put(session, System.currentTimeMillis());
            log.info(" 会话{}: {}", session.getId(), session.getUri());
      } finally {
            MAIN_LOCK.unlock();
      }
    }

    /**
   * @param id
   * @param groupId
   * @Description 添加至分组
   * @Throws
   * @Return void
   * @Date 2025-08-10 19:43:20
   * @Author WangKun
   **/
    public boolean addToGroup(String id, String groupId) {
      MAIN_LOCK.lock();
      if (StringUtils.isBlank(id) &amp;&amp; StringUtils.isEmpty(id)) {
            log.info("加入分组失败,id 为空");
            return false;
      }
      if (StringUtils.isBlank(groupId) &amp;&amp; StringUtils.isEmpty(groupId)) {
            log.info("加入分组失败,groupId 为空");
            return false;
      }
      try {
            boolean flag = GROUPS.computeIfAbsent(groupId, k -&gt; ConcurrentHashMap.newKeySet()).add(id);
            log.info("[加入分组] {}: → 分组{}:(当前组内: {})", id, groupId, GROUPS.get(groupId).size());
            return flag;
      } finally {
            MAIN_LOCK.unlock();
      }
    }

    /**
   * @param id
   * @param groupId
   * @Description 从分组移除
   * @Throws
   * @Return void
   * @Date 2025-08-10 19:44:10
   * @Author WangKun
   **/
    public boolean removeToGroup(String id, String groupId) {
      MAIN_LOCK.lock();
      try {
            if (StringUtils.isBlank(id) &amp;&amp; StringUtils.isEmpty(id)) {
                log.info("移除分组失败,id 为空");
                return false;
            }
            if (StringUtils.isBlank(groupId) &amp;&amp; StringUtils.isEmpty(groupId)) {
                log.info("移除分组失败,groupId 为空");
                return false;
            }
            Set&lt;String&gt; groups = GROUPS.get(groupId);
            if (groups != null) {
                groups.remove(id);
                if (groups.isEmpty()) {
                  GROUPS.remove(groupId);
                }
                log.info("[离开分组] {}:← 分组{}: ", id, groupId);
                return true;
            }
            return false;
      } finally {
            MAIN_LOCK.unlock();
      }
    }

    /**
   * @param id
   * @param message
   * @Description 消息发送,多设备
   * @Throws
   * @Return boolean
   * @Date 2025-08-10 19:44:48
   * @Author WangKun
   **/
    public boolean sendMessages(String id, String message) {
      MAIN_LOCK.lock();
      try {
            if (StringUtils.isBlank(id) &amp;&amp; StringUtils.isEmpty(id)) {
                log.info("消息发送失败,id 为空");
                return false;
            }
            Set&lt;WebSocketSession&gt; sessions = SESSIONS.get(id);
            if (sessions == null || sessions.isEmpty()) {
                log.info("[单发消息失败] {}: 无活跃会话", id);
                return false;
            }
            // 创建副本防止并发修改
            List&lt;WebSocketSession&gt; sessionsCopy = new ArrayList&lt;&gt;(sessions);
            boolean allSuccess = true;
            for (WebSocketSession session : sessionsCopy) {
                if (validateSession(session)) {
                  allSuccess = false;
                  continue;
                }
                allSuccess &amp;= sendMessage(session, message);
            }
            log.info("[单发消息完成] {}: 设备数{}:成功{} ", id, sessionsCopy.size(), allSuccess ? "全部" : "部分");
            return allSuccess;
      } finally {
            MAIN_LOCK.unlock();
      }
    }

    /**
   * @param groupId
   * @param message
   * @Description 按组发送
   * @Throws
   * @Return boolean
   * @Date 2025-08-10 19:46:15
   * @Author WangKun
   **/
    public boolean sendGroupMessages(String groupId, String message) {
      MAIN_LOCK.lock();
      try {
            if (StringUtils.isBlank(groupId)&amp;&amp; StringUtils.isEmpty(message)) {
                log.info("[按组发失败] groudId为空");
                return false;
            }
            Set&lt;String&gt; ids = GROUPS.get(groupId);
            if (ids == null || ids.isEmpty()) {
                log.info("[按组发失败] 分组{}: 无成员", groupId);
                return false;
            }
            boolean allSuccess = true;
            // 创建副本防止并发修改
            for (String id : new ArrayList&lt;&gt;(ids)) {
                allSuccess &amp;= sendMessages(id, message);
            }
            log.info("[组发完成] 分组{}:成员数{}:成功{}", groupId, ids.size(), allSuccess ? "全部" : "部分");
            return allSuccess;
      } finally {
            MAIN_LOCK.unlock();
      }
    }

    /**
   * @param message
   * @Description 全局广播无论在不在分组
   * @Throws
   * @Return boolean
   * @Date 2025-08-10 19:47:33
   * @Author WangKun
   **/
    public boolean sendGlobalMessages(String message) {
      MAIN_LOCK.lock();
      try {
            if (SESSIONS.isEmpty()) {
                log.info("[广播失败] 无活跃");
                return false;
            }
            boolean allSuccess = true;
            int totalSessions = 0;
            // 创建副本防止并发修改
            Set&lt;String&gt; ids = new HashSet&lt;&gt;(SESSIONS.keySet());
            for (String id : ids) {
                Set&lt;WebSocketSession&gt; sessions = SESSIONS.get(id);
                if (sessions != null) {
                  // 创建会话副本
                  List&lt;WebSocketSession&gt; sessionsCopy = new ArrayList&lt;&gt;(sessions);
                  totalSessions += sessionsCopy.size();
                  for (WebSocketSession session : sessionsCopy) {
                        if (validateSession(session)) {
                            allSuccess = false;
                            continue;
                        }
                        allSuccess &amp;= sendMessage(session, message);
                  }
                }
            }
            log.info("[广播完成] 总{}: 总设备{}: 成功{} ", ids.size(), totalSessions, allSuccess ? "全部" : "部分");
            return allSuccess;
      } finally {
            MAIN_LOCK.unlock();
      }
    }

    /**
   * @param session
   * @param message
   * @Description 会话发送消息
   * @Throws
   * @Return boolean
   * @Date 2025-08-10 19:49:00
   * @Author WangKun
   **/
    private boolean sendMessage(WebSocketSession session, String message) {
      try {
            if (!session.isOpen()) {
                throw new IllegalStateException("会话已关闭");
            }
            // 会话级别的同步,锁,防止乱发
            synchronized (session) {
                session.sendMessage(new TextMessage(message));
                recordSuccessfulSend(session);
                return true;
            }
      } catch (Exception e) {
            handleSendFailure(session, e);
            return false;
      }
    }

    /**
   * @param session
   * @Description 发送记录 ,心跳更新状态
   * @Throws
   * @Return void
   * @Date 2025-08-10 19:50:41
   * @Author WangKun
   **/
    private void recordSuccessfulSend(WebSocketSession session) {
      FAILURE_COUNTS.put(session, 0);
      LAST_ACTIVE_TIMES.put(session, System.currentTimeMillis());
    }

    /**
   * @param session
   * @param e
   * @Description 失败关闭连接
   * @Throws
   * @Return void
   * @Date 2025-08-10 19:51:10
   * @Author WangKun
   **/
    private void handleSendFailure(WebSocketSession session, Exception e) {
      MAIN_LOCK.lock();
      try {
            int failures = FAILURE_COUNTS.getOrDefault(session, 0) + 1;
            FAILURE_COUNTS.put(session, failures);
            log.info("[发送失败] 会话{}: 次数{}:原因{}: ", session.getId(), failures, e.getMessage());
            if (failures &gt;= 5) {
                log.info("[自动清理] 达到失败上限 会话: {}", session.getId());
                closeSession(session);
            }
      } finally {
            MAIN_LOCK.unlock();
      }
    }

    /**
   * @param
   * @Description 心跳健康监控
   * @Throws
   * @Return void
   * @Date 2025-08-10 19:52:24
   * @Author WangKun
   **/
    private void checkHeartbeats() {
      MAIN_LOCK.lock();
      try {
            long currentTime = System.currentTimeMillis();
            new ArrayList&lt;&gt;(lastPongTimes.keySet()).forEach(session -&gt; {
                Long lastPong = lastPongTimes.get(session);
                if (lastPong != null &amp;&amp; currentTime - lastPong &gt; HEARTBEAT_TIMEOUT) {
                  log.info("[心跳超时] 会话{}:最后Pong{}:前 ", session.getId(), currentTime - lastPong);
                  closeSession(session);
                }
            });
      } finally {
            MAIN_LOCK.unlock();
      }
    }

    /**
   * @param session
   * @Description 检验session
   * @Throws
   * @Return boolean
   * @Date 2025-08-10 19:53:35
   * @Author WangKun
   **/
    private boolean validateSession(WebSocketSession session) {
      if (!session.isOpen()) {
            removeToSession(session);
            return true;
      }
      return false;
    }

    /**
   * @param session
   * @Description 关闭session, 会话结束
   * @Throws
   * @Return void
   * @Date 2025-08-10 19:54:07
   * @Author WangKun
   **/
    private void closeSession(WebSocketSession session) {
      try {
            if (session.isOpen()) {
                session.close(CloseStatus.SESSION_NOT_RELIABLE);
            }
      } catch (Exception e) {
            log.info("[关闭异常] 会话{}:原因{}: ", session.getId(), e.getMessage());
      } finally {
            removeToSession(session);
      }
    }

    /**
   * @param
   * @Description 清理无效会话
   * @Throws
   * @Return void
   * @Date 2025-08-10 19:55:08
   * @Author WangKun
   **/
    private void cleanExpiredSessions() {
      MAIN_LOCK.lock();
      try {
            new ArrayList&lt;&gt;(SESSION_MAP.keySet()).forEach(session -&gt; {
                if (!session.isOpen()) {
                  removeToSession(session);
                }
            });
      } finally {
            MAIN_LOCK.unlock();
      }
    }

}</pre></div>
<p class="maodian"></p><h2>websocket工具类封装</h2>
<p>(SpringUtils就是一个自己封装获取spring上下文的工具)</p>
<div class="jb51code"><pre class="brush:java;">/**
* @Description websocket工具类
* @Author WangKun
* @Date 2025/8/11 10:08
* @Version
*/
@Component
public class WebSocketUtils {

    /**
   * websocket处理器资源
   **/
    private static final WebSocketManager WEB_SOCKET_MANAGER = SpringUtils.getBean(WebSocketManager.class);


    /**
   * @param groupId
   * @param id
   * @Description 添加至分组
   * @Throws
   * @Return boolean
   * @Date 2025-08-11 10:14:27
   * @Author WangKun
   **/
    public static boolean addToGroup(String id, String groupId) {
      return WEB_SOCKET_MANAGER.addToGroup(id, groupId);
    }

    /**
   * @param id
   * @param groupId
   * @Description 移除分组
   * @Throws
   * @Return boolean
   * @Date 2025-08-11 10:44:47
   * @Author WangKun
   **/
    public static boolean removeToGroup(String id, String groupId) {
      return WEB_SOCKET_MANAGER.removeToGroup(id, groupId);
    }

    /**
   * @param id
   * @param message
   * @Description 指定单发消息
   * @Throws
   * @Return boolean
   * @Date 2025-08-11 11:03:14
   * @Author WangKun
   **/
    public static boolean sendMessage(String id, String message) {
      return WEB_SOCKET_MANAGER.sendMessages(id, message);
    }

    /**
   * @param groupId
   * @param message
   * @Description 指定分组发消息
   * @Throws
   * @Return boolean
   * @Date 2025-08-11 11:09:22
   * @Author WangKun
   **/
    public static boolean sendGroupMessages(String groupId, String message) {
      return WEB_SOCKET_MANAGER.sendGroupMessages(groupId, message);
    }

    /**
   * @param message
   * @Description 全局广播发消息
   * @Throws
   * @Return boolean
   * @Date 2025-08-11 11:16:54
   * @Author WangKun
   **/
    public static boolean sendGlobalMessages(String message) {
      return WEB_SOCKET_MANAGER.sendGlobalMessages(message);
    }
}
</pre></div>
<p class="maodian"></p><h2>websocket控制器</h2>
<div class="jb51code"><pre class="brush:java;">/**
* @Description websocket控制器
* @Author WangKun
* @Date 2025/8/10 19:59
* @Version
*/
@RestController
@RequestMapping("/api/websocket")
public class WebSocketController {

    private final WebSocketManager manager;

    /**
   * @param manager
   * @Description 构造函数,通过依赖注入方式初始化WebSocket管理器
   * @Throws
   * @Return
   * @Date 2025-08-21 09:25:44
   * @Author WangKun
   **/
    public WebSocketController(WebSocketManager manager) {
      this.manager = manager;
    }

    /**
   * @param
   * @Description 测试连接地址
   * @Throws
   * @Return java.lang.String
   * @Date 2025-08-10 19:59:26
   * @Author WangKun
   **/
    @GetMapping("/test") // 处理GET请求,用于测试WebSocket连接
    public String testConnection() {
      WebSocketUtils.addToGroup("test", "test");
      WebSocketUtils.sendGlobalMessages("压测");
      return "ws://localhost:8099/ws/{id}";
    }

    /**
   * @param id      用户ID
   * @param groupId 分组ID
   * @Description 添加到分组
   * @Throws
   * @Return java.lang.String
   * @Date 2025-08-10 19:59:42
   * @Author WangKun
   **/
    @PostMapping("/addToGroup")
    public String addToGroup(@RequestParam String id, @RequestParam String groupId) {
      boolean flag = manager.addToGroup(id, groupId);
      return "[" + id + "]添加到分组[" + groupId + "]:" + flag;
    }

    /**
   * @param id      用户ID
   * @param groupId 分组ID
   * @Description 从分组移除
   * @Throws
   * @Return java.lang.String
   * @Date 2025-08-10 19:59:48
   * @Author WangKun
   **/
    @PostMapping("/removeToGroup")
    public String removeToGroup(@RequestParam String id, @RequestParam String groupId) {
      boolean flag = manager.removeToGroup(id, groupId);
      return "[" + id + "]从分组[" + groupId + "]移除:" + flag;
    }

    /**
   * @param id      用户ID
   * @param message 要发送的消息内容
   * @Description 发送消息
   * @Throws
   * @Return java.lang.String
   * @Date 2025-08-10 19:59:56
   * @Author WangKun
   **/
    @PostMapping("/sendMessages")
    public String sendMessages(@RequestParam String id, @RequestParam String message) {
      boolean success = manager.sendMessages(id, message);
      return success ? "发送成功" : "发送失败(无活跃连接)";
    }

    /**
   * @param groupId 分组ID
   * @param message 要发送的消息内容
   * @Description 给分组发送消息
   * @Throws
   * @Return java.lang.String
   * @Date 2025-08-10 20:00:06
   * @Author WangKun
   **/
    @PostMapping("/sendGroupMessages")
    public String sendToGroup(@RequestParam String groupId, @RequestParam String message) {
      boolean success = manager.sendGroupMessages(groupId, message);
      return success ? "发送成功" : "发送失败(分组不存在或无成员)";
    }

    /**
   * @param message 要广播的消息内容
   * @Description 全局广播
   * @Throws
   * @Return java.lang.String
   * @Date 2025-08-10 20:00:13
   * @Author WangKun
   **/
    @PostMapping("/sendGlobalMessages")
    public String broadcast(@RequestParam String message) {
      boolean success = manager.sendGlobalMessages(message);
      return success ? "广播成功" : "部分发送失败";
    }
}</pre></div>
<p class="maodian"></p><h2>测试地址</h2>
<p>ws://localhost:8099/ws/user1</p>
<p style="text-align:center"><img alt="" src="https://img.jbzj.com/file_images/article/202601/2026011310155059.png" /></p>
<p>剩下的测试地址都在控制中。</p>
<p class="maodian"></p><h2>总结</h2>
<p>以上为个人经验,希望能给大家一个参考,也希望大家多多支持琼殿技术社区。</p>
                           
                            <div class="art_xg">
                              <b>您可能感兴趣的文章:</b><ul><li>SpringBoot中使用原生WebSocket详解</li><li>SpringBoot分布式WebSocket的实现指南</li><li>SpringBoot实现WebSocket通信过程解读</li><li>深入浅出SpringBoot&nbsp;WebSocket构建实时应用全面指南</li><li>利用SpringBoot与WebSocket实现实时双向通信功能</li><li>Springboot整合WebSocket&nbsp;实现聊天室功能</li></ul>
                            </div>

                        </div>
                        <!--endmain-->
頁: [1]
查看完整版本: SpringBoot WebSocket多消息推送过程