君政 發表於 2025-7-21 14:50:00

SSE客户端C++实现(使用libcurl)

<p><strong>1、概述</strong></p>
<p><strong>&nbsp; &nbsp;&nbsp;</strong>SSE协议 的全称是 Server-Sent Events(服务器发送事件),本质是基于 HTTP 协议的 “单向实时推送技术”——只有服务器能主动给客户端发消息,除了发送订阅请求外,客户端只能接收数据。SSE消息是纯文本格式,SSE标准支持自动重连。</p>
<p><strong>2、SSE与WebSocket</strong></p>
<p>&nbsp; 底层协议:SSE底层使用http协议(http/1.1 chunked编码传输或http/2&nbsp;<em>DATA帧序列</em>),其消息格式为纯文本。WebSocket握手(建立连接)通过http实现,连接建立后使用WebSocket协议(底层使用tcp协议),WebSocket协议帧头比http/1.1要小,而且SSE消息使用文本格式,WebSocket则可以为文本 / 二进制,所以WebSocket通信效率要高。</p>
<p>&nbsp; 通信特点:WebSocket支持双向通信,SSE是单向通信。SSE标准支持自动重连,WebSocket则需要自己实现自动重连功能。</p>
<p>&nbsp; 浏览器支持:SSE、WebSocket都属于JS原生标准,可以直接在浏览器里使用。</p>
<p>&nbsp; 开发运维:SSE支持自动重连,WebSocket则需要自己实现心跳、连接状态判断等,SSE开发要比WebSocket简单。SSE使用http协议,所以可以复用http生态(端口、配置等),而WebSocket需要单独配置ws/wss协议、Nginx 转发。</p>
<p>&nbsp; 适用场景:SSE——系统通知、实时文本数据推送。WebSocket——实时聊天、实时请求-应答(保持连接的请求-应答)。</p>
<p><strong>3、SSE服务实现</strong></p>
<p>&nbsp;&nbsp;实现SSE服务的话,推荐使用Spring MVC的SseEmitter(适用于传统的 Spring Boot Web 项目)或Spring WebFlux的Flux&lt;ServerSentEvent&gt;(响应式非阻塞编程,适合高并发的场景)。</p>
<ul>
<li class="ybc-li-component ybc-li-component_ol">
<div class="ybc-p"><strong>安全考虑</strong>:</div>
<ul class="ybc-ul-component">
<li class="ybc-li-component ybc-li-component_ul">
<div class="ybc-p">使用HTTPS防止中间人攻击</div>
</li>
<li class="ybc-li-component ybc-li-component_ul">
<div class="ybc-p">验证连接权限(如JWT Token)</div>
</li>
<li class="ybc-li-component ybc-li-component_ul">
<div class="ybc-p">防止CSRF攻击</div>
</li>
</ul>
</li>
<li class="ybc-li-component ybc-li-component_ol">
<div class="ybc-p"><strong>监控与日志</strong>:</div>
<ul class="ybc-ul-component">
<li class="ybc-li-component ybc-li-component_ul">
<div class="ybc-p">记录连接数、消息发送成功率</div>
</li>
</ul>
</li>
</ul>
<p><strong>4、nginx与SSE</strong></p>
<p>&nbsp;&nbsp;Nginx可能会开启缓冲(Buffering),比如试图收集足够多的数据(凑满4KB 或 8KB)后再进行转发,这就破坏了 SSE 的实时性,使用 Nginx 反向代理 SSE 服务,必须显式关闭缓冲:</p>
<p>&nbsp;&nbsp;<img src="https://img2024.cnblogs.com/blog/593856/202604/593856-20260408102336219-1548397377.png"></p>
<p>&nbsp;&nbsp;使用 Nginx 反向代理SSE服务的话,调整read_timeout和proxy_read_timeout以禁用读取超时,read_timeout、send_timeout是nginx读取、写入客户的超时,proxy_read_timeout、proxy_send_timeout是nginx读取、写入后端服务的超时,proxy_connect_timeout是nginx连接后端服务的超时。</p>
<p><strong>5、SSE数据包格式</strong></p>
<p>&nbsp; SSE消息由一个或多个message组成,每个message由"\n\n"分隔。一个message可以由一个或多个filed组成,每个filed由\n分隔,filed有data、id、event、retry四种类型。</p>
<p>&nbsp; data表示消息数据,如"data:value\n"。</p>
<p>&nbsp; id表示数据包编号,当连接断开重连的时候,客户端应该发送一个 HTTP 头,里面包含一个特殊的Last-Event-ID头信息,值为收到的最后一条数据的id。</p>
<p>&nbsp; event表示自定义的事件类型。</p>
<p>&nbsp; retry表示浏览器重新发起连接的间隔时间,当时间间隔到期,客户端应该重新发起连接。</p>
<p>&nbsp;&nbsp;还可以有仅以冒号开头的行,表示注释。通常服务器每隔一段时间就会向浏览器发送一个注释,保持连接不中断。</p>
<p>&nbsp; 包含一条message的消息:</p>
<div class="cnblogs_code">
<pre>data: {<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">username</span><span style="color: rgba(128, 0, 0, 1)">"</span>: <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">bobby</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">time</span><span style="color: rgba(128, 0, 0, 1)">"</span>: <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">02:33:48</span><span style="color: rgba(128, 0, 0, 1)">"</span>}</pre>
</div>
<p>&nbsp; 包含两条message的消息:</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(128, 0, 0, 1)">data:this is message A

data:this is
data:message B</span></pre>
</div>
<p>&nbsp; 包含三条message的消息:</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(128, 0, 0, 1)">:explan text

data:this is message A

data:this is
data:message B</span></pre>
</div>
<p><strong>6、SSE客户端实现</strong></p>
<p><strong>&nbsp; </strong>SSE向服务端发送订阅推送请求的话,使用GET方式,如果需要发送参数的话,通过<span class="qk-md-strong complete">URL 查询参数或设置http请求头实现</span>。</p>
<div class="cnblogs_code">
<pre>#include &lt;thread&gt;<span style="color: rgba(0, 0, 0, 1)">
#include </span>&lt;functional&gt;<span style="color: rgba(0, 0, 0, 1)">

size_t dataCallback(</span><span style="color: rgba(0, 0, 255, 1)">void</span>* ptr, size_t size, size_t nmemb, <span style="color: rgba(0, 0, 255, 1)">void</span>*<span style="color: rgba(0, 0, 0, 1)"> param)
{
    </span><span style="color: rgba(0, 0, 255, 1)">if</span> (ptr == NULL || size == <span style="color: rgba(128, 0, 128, 1)">0</span> || param ==<span style="color: rgba(0, 0, 0, 1)"> NULL)
      </span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(128, 0, 128, 1)">0</span><span style="color: rgba(0, 0, 0, 1)">;

    </span><span style="color: rgba(0, 0, 255, 1)">int</span> s = size *<span style="color: rgba(0, 0, 0, 1)"> nmemb;
    std::</span><span style="color: rgba(0, 0, 255, 1)">string</span> strMsg((<span style="color: rgba(0, 0, 255, 1)">char</span>*<span style="color: rgba(0, 0, 0, 1)">)ptr, s);
    CSSEClient</span>* client = static_cast&lt;CSSEClient*&gt;<span style="color: rgba(0, 0, 0, 1)">(param);
    </span><span style="color: rgba(0, 0, 255, 1)">if</span> (client &amp;&amp; client-&gt;<span style="color: rgba(0, 0, 0, 1)">onMessage)
      client</span>-&gt;<span style="color: rgba(0, 0, 0, 1)">onMessage(strMsg);

    </span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> s;
}

</span><span style="color: rgba(0, 0, 255, 1)">int</span> progressCallback(<span style="color: rgba(0, 0, 255, 1)">void</span>*<span style="color: rgba(0, 0, 0, 1)"> param, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
    CSSEClient</span>* client = static_cast&lt;CSSEClient*&gt;<span style="color: rgba(0, 0, 0, 1)">(param);
    </span><span style="color: rgba(0, 0, 255, 1)">return</span> client-&gt;isExit() ? <span style="color: rgba(128, 0, 128, 1)">1</span> : <span style="color: rgba(128, 0, 128, 1)">0</span>;<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 返回非0值会中断curl_easy_perform()</span>
<span style="color: rgba(0, 0, 0, 1)">}

</span><span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)"> CSSEClient
{
</span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)">:
    CSSEClient(</span><span style="color: rgba(0, 0, 255, 1)">const</span> std::<span style="color: rgba(0, 0, 255, 1)">string</span>&amp;<span style="color: rgba(0, 0, 0, 1)"> strURL) {
      curl_global_init(CURL_GLOBAL_DEFAULT);
      _strURL </span>=<span style="color: rgba(0, 0, 0, 1)"> strURL;
      _thread </span>= std::thread(&amp;CSSEClient::run, <span style="color: rgba(0, 0, 255, 1)">this</span><span style="color: rgba(0, 0, 0, 1)">);
    }
    </span><span style="color: rgba(0, 0, 255, 1)">virtual</span> ~<span style="color: rgba(0, 0, 0, 1)">CSSEClient() {
      _bExit </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">;
      _thread.join();
      curl_global_cleanup();
    }
    </span><span style="color: rgba(0, 0, 255, 1)">bool</span> isExit() { <span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> _bExit; }
    std::function</span>&lt;<span style="color: rgba(0, 0, 255, 1)">void</span>(<span style="color: rgba(0, 0, 255, 1)">const</span> std::<span style="color: rgba(0, 0, 255, 1)">string</span>&amp;)&gt;<span style="color: rgba(0, 0, 0, 1)"> onMessage;
</span><span style="color: rgba(0, 0, 255, 1)">private</span><span style="color: rgba(0, 0, 0, 1)">:
    </span><span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> run() {
      </span><span style="color: rgba(0, 0, 255, 1)">while</span> (!<span style="color: rgba(0, 0, 0, 1)">_bExit) {
            subscribeSSE();
            </span><span style="color: rgba(0, 0, 255, 1)">if</span><span style="color: rgba(0, 0, 0, 1)"> (_bExit)
                </span><span style="color: rgba(0, 0, 255, 1)">break</span><span style="color: rgba(0, 0, 0, 1)">;
            std::this_thread::sleep_for(std::chrono::seconds(</span><span style="color: rgba(128, 0, 128, 1)">1</span><span style="color: rgba(0, 0, 0, 1)">));
      }
    }
    </span><span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> subscribeSSE() {
      CURL</span>* curl =<span style="color: rgba(0, 0, 0, 1)"> curl_easy_init();
      </span><span style="color: rgba(0, 0, 255, 1)">if</span> (!curl) <span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)">;

      </span><span style="color: rgba(0, 0, 255, 1)">struct</span> curl_slist* headers =<span style="color: rgba(0, 0, 0, 1)"> NULL;
      headers </span>= curl_slist_append(headers, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Accept: text/event-stream</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
      headers </span>= curl_slist_append(headers, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Cache-Control: no-cache</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
      curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);

      curl_easy_setopt(curl, CURLOPT_URL, _strURL.c_str());
      curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, dataCallback);
      curl_easy_setopt(curl, CURLOPT_WRITEDATA, </span><span style="color: rgba(0, 0, 255, 1)">this</span><span style="color: rgba(0, 0, 0, 1)">);
      curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, </span><span style="color: rgba(128, 0, 128, 1)">10L</span><span style="color: rgba(0, 0, 0, 1)">);
      curl_easy_setopt(curl, CURLOPT_TIMEOUT, </span><span style="color: rgba(128, 0, 128, 1)">0L</span><span style="color: rgba(0, 0, 0, 1)">);

      curl_easy_setopt(curl, CURLOPT_TCP_KEEPALIVE, </span><span style="color: rgba(128, 0, 128, 1)">1L</span>); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">开启TCP心跳</span>
      curl_easy_setopt(curl, CURLOPT_TCP_KEEPIDLE, <span style="color: rgba(128, 0, 128, 1)">60L</span>); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">空闲60秒后探测</span>
      curl_easy_setopt(curl, CURLOPT_TCP_KEEPINTVL, <span style="color: rgba(128, 0, 128, 1)">10L</span>); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">探测间隔10秒</span>
<span style="color: rgba(0, 0, 0, 1)">
      curl_easy_setopt(curl, CURLOPT_NOPROGRESS, </span><span style="color: rgba(128, 0, 128, 1)">0L</span>);<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 开启传输进度(已下载/上传数据量、总数据量等)回调</span>
      curl_easy_setopt(curl, CURLOPT_XFERINFOFUNCTION, progressCallback); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">设置传输进度回调方法,即使没有数据传输该方法也会被调用(通常每秒1次),所以可以在该方法中判断是否需要退出</span>
      curl_easy_setopt(curl, CURLOPT_XFERINFODATA, <span style="color: rgba(0, 0, 255, 1)">this</span>); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">设置传输回调方法的第一个参数</span>
<span style="color: rgba(0, 0, 0, 1)">
      CURLcode res </span>=<span style="color: rgba(0, 0, 0, 1)"> curl_easy_perform(curl);
      </span><span style="color: rgba(0, 0, 255, 1)">if</span> (res ==<span style="color: rgba(0, 0, 0, 1)"> CURLE_ABORTED_BY_CALLBACK) {
            </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">SSE connection aborted by user;</span>
<span style="color: rgba(0, 0, 0, 1)">      }
      </span><span style="color: rgba(0, 0, 255, 1)">else</span><span style="color: rgba(0, 0, 0, 1)"> {
            </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">error</span>
<span style="color: rgba(0, 0, 0, 1)">      }

      curl_easy_cleanup(curl);
    }

    std::thread _thread;
    std::</span><span style="color: rgba(0, 0, 255, 1)">string</span><span style="color: rgba(0, 0, 0, 1)"> _strURL;
    std::atomic_bool _bExit </span>= <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">;
};</span></pre>
</div>
<p>&nbsp; 比如,客户端订阅用户相关数据的变化:</p>
<div class="cnblogs_code">
<pre>CSSEClient _sseClient(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">http://test-api.baidu.com.cn:8080/SSE/subscribe/userID</span><span style="color: rgba(128, 0, 0, 1)">"</span>)</pre>
</div>
<p>&nbsp;&nbsp;对于需要认证的SSE服务,需添加CURLOPT_USERPWD或OAuth头。</p>
<p>&nbsp;&nbsp;</p><br><br>
来源:https://www.cnblogs.com/milanleon/p/18995770
頁: [1]
查看完整版本: SSE客户端C++实现(使用libcurl)