|
1、概述
SSE协议 的全称是 Server-Sent Events(服务器发送事件),本质是基于 HTTP 协议的 “单向实时推送技术”——只有服务器能主动给客户端发消息,除了发送订阅请求外,客户端只能接收数据。SSE消息是纯文本格式,SSE标准支持自动重连。
2、SSE与WebSocket
底层协议:SSE底层使用http协议(http/1.1 chunked编码传输或http/2 DATA帧序列),其消息格式为纯文本。WebSocket握手(建立连接)通过http实现,连接建立后使用WebSocket协议(底层使用tcp协议),WebSocket协议帧头比http/1.1要小,而且SSE消息使用文本格式,WebSocket则可以为文本 / 二进制,所以WebSocket通信效率要高。
通信特点:WebSocket支持双向通信,SSE是单向通信。SSE标准支持自动重连,WebSocket则需要自己实现自动重连功能。
浏览器支持:SSE、WebSocket都属于JS原生标准,可以直接在浏览器里使用。
开发运维:SSE支持自动重连,WebSocket则需要自己实现心跳、连接状态判断等,SSE开发要比WebSocket简单。SSE使用http协议,所以可以复用http生态(端口、配置等),而WebSocket需要单独配置ws/wss协议、Nginx 转发。
适用场景:SSE——系统通知、实时文本数据推送。WebSocket——实时聊天、实时请求-应答(保持连接的请求-应答)。
3、SSE服务实现
实现SSE服务的话,推荐使用Spring MVC的SseEmitter(适用于传统的 Spring Boot Web 项目)或Spring WebFlux的Flux<ServerSentEvent>(响应式非阻塞编程,适合高并发的场景)。
-
安全考虑:
-
使用HTTPS防止中间人攻击
-
验证连接权限(如JWT Token)
-
防止CSRF攻击
-
监控与日志:
4、nginx与SSE
Nginx可能会开启缓冲(Buffering),比如试图收集足够多的数据(凑满4KB 或 8KB)后再进行转发,这就破坏了 SSE 的实时性,使用 Nginx 反向代理 SSE 服务,必须显式关闭缓冲:
使用 Nginx 反向代理SSE服务的话,调整read_timeout和proxy_read_timeout以禁用读取超时,read_timeout、send_timeout是nginx读取、写入客户的超时,proxy_read_timeout、proxy_send_timeout是nginx读取、写入后端服务的超时,proxy_connect_timeout是nginx连接后端服务的超时。
5、SSE数据包格式
SSE消息由一个或多个message组成,每个message由"\n\n"分隔。一个message可以由一个或多个filed组成,每个filed由\n分隔,filed有data、id、event、retry四种类型。
data表示消息数据,如"data:value\n"。
id表示数据包编号,当连接断开重连的时候,客户端应该发送一个 HTTP 头,里面包含一个特殊的Last-Event-ID头信息,值为收到的最后一条数据的id。
event表示自定义的事件类型。
retry表示浏览器重新发起连接的间隔时间,当时间间隔到期,客户端应该重新发起连接。
还可以有仅以冒号开头的行,表示注释。通常服务器每隔一段时间就会向浏览器发送一个注释,保持连接不中断。
包含一条message的消息:
data: {"username": "bobby", "time": "02:33:48"}
包含两条message的消息:
data:this is message A
data:this is
data:message B
包含三条message的消息:
:explan text
data:this is message A
data:this is
data:message B
6、SSE客户端实现
SSE向服务端发送订阅推送请求的话,使用GET方式,如果需要发送参数的话,通过URL 查询参数或设置http请求头实现。
#include <thread>
#include <functional>
size_t dataCallback(void* ptr, size_t size, size_t nmemb, void* param)
{
if (ptr == NULL || size == 0 || param == NULL)
return 0;
int s = size * nmemb;
std::string strMsg((char*)ptr, s);
CSSEClient* client = static_cast<CSSEClient*>(param);
if (client && client->onMessage)
client->onMessage(strMsg);
return s;
}
int progressCallback(void* param, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
CSSEClient* client = static_cast<CSSEClient*>(param);
return client->isExit() ? 1 : 0; // 返回非0值会中断curl_easy_perform()
}
class CSSEClient
{
public:
CSSEClient(const std::string& strURL) {
curl_global_init(CURL_GLOBAL_DEFAULT);
_strURL = strURL;
_thread = std::thread(&CSSEClient::run, this);
}
virtual ~CSSEClient() {
_bExit = true;
_thread.join();
curl_global_cleanup();
}
bool isExit() { return _bExit; }
std::function<void(const std::string&)> onMessage;
private:
void run() {
while (!_bExit) {
subscribeSSE();
if (_bExit)
break;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
void subscribeSSE() {
CURL* curl = curl_easy_init();
if (!curl) return;
struct curl_slist* headers = NULL;
headers = curl_slist_append(headers, "Accept: text/event-stream");
headers = curl_slist_append(headers, "Cache-Control: no-cache");
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
curl_easy_setopt(curl, CURLOPT_URL, _strURL.c_str());
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, dataCallback);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 10L);
curl_easy_setopt(curl, CURLOPT_TIMEOUT, 0L);
curl_easy_setopt(curl, CURLOPT_TCP_KEEPALIVE, 1L); //开启TCP心跳
curl_easy_setopt(curl, CURLOPT_TCP_KEEPIDLE, 60L); //空闲60秒后探测
curl_easy_setopt(curl, CURLOPT_TCP_KEEPINTVL, 10L); //探测间隔10秒
curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0L); // 开启传输进度(已下载/上传数据量、总数据量等)回调
curl_easy_setopt(curl, CURLOPT_XFERINFOFUNCTION, progressCallback); //设置传输进度回调方法,即使没有数据传输该方法也会被调用(通常每秒1次),所以可以在该方法中判断是否需要退出
curl_easy_setopt(curl, CURLOPT_XFERINFODATA, this); //设置传输回调方法的第一个参数
CURLcode res = curl_easy_perform(curl);
if (res == CURLE_ABORTED_BY_CALLBACK) {
//SSE connection aborted by user;
}
else {
//error
}
curl_easy_cleanup(curl);
}
std::thread _thread;
std::string _strURL;
std::atomic_bool _bExit = false;
};
比如,客户端订阅用户相关数据的变化:
CSSEClient _sseClient("http://test-api.baidu.com.cn:8080/SSE/subscribe/userID")
对于需要认证的SSE服务,需添加CURLOPT_USERPWD或OAuth头。
来源:https://www.cnblogs.com/milanleon/p/18995770 |