Golang中SSE协议实现方法
<div id="navCategory"><h5 class="catalogue">目录</h5><ul class="first_class_ul"><li><a href="#_label0">Gin流式返回实现方法</a></li><li><a href="#_label1">🧑‍💻 User</a></li><li><a href="#_label2">🤖 Assistant</a></li><li><a href="#_label3">1. 使用 Server-Sent Events (SSE)</a></li><li><a href="#_label4">2. 手动控制响应流</a></li><li><a href="#_label5">3. 结合上下文取消机制</a></li><li><a href="#_label6">4. JSON 流式返回</a></li><li><a href="#_label7">5. 文件流式下载</a></li><li><a href="#_label8">6. 前端示例</a></li><li><a href="#_label9">关键要点</a></li><li><a href="#_label10">🧑‍💻 User</a></li><li><a href="#_label11">🤖 Assistant</a></li><li><a href="#_label12">响应格式解析</a></li><li><a href="#_label13">Gin 框架实现代码示例</a></li><li><a href="#_label14">更完整的实现(包含错误处理)</a></li><li><a href="#_label15">前端接收示例</a></li><li><a href="#_label16">关键技术点</a></li><li><a href="#_label17">🧑‍💻 User</a></li><li><a href="#_label18">🤖 Assistant</a></li><li><a href="#_label19">1. 使用 HTTP 标准库实现流式读取</a></li><li><a href="#_label20">2. 完整的 SSE 客户端实现</a></li><li><a href="#_label21">3. 使用 Context 支持取消操作</a></li><li><a href="#_label22">4. 使用第三方库简化操作</a></li><li><a href="#_label23">5. 简单的命令行聊天客户端示例</a></li><li><a href="#_label24">关键要点</a></li></ul></div><p class="maodian"><a name="_label0"></a></p><h2>Gin流式返回实现方法</h2><p class="maodian"><a name="_label1"></a></p><p class="maodian"><a name="_label10"></a></p><p class="maodian"><a name="_label17"></a></p><h2>🧑‍💻 User</h2>
<p>gin框架如何做到流式返回</p>
<p class="maodian"><a name="_label2"></a></p><p class="maodian"><a name="_label11"></a></p><p class="maodian"><a name="_label18"></a></p><h2>🤖 Assistant</h2>
<p>在 Gin 框架中实现流式返回主要通过以下几个关键技术点:</p>
<p class="maodian"><a name="_label3"></a></p><h2>1. 使用 Server-Sent Events (SSE)</h2>
<p>SSE 是实现流式返回的常用方式:</p>
<div class="jb51code"><pre class="brush:go;">package main
import (
"fmt"
"net/http"
"time"
"github.com/gin-gonic/gin"
)
func main() {
r := gin.Default()
r.GET("/stream", func(c *gin.Context) {
// 设置响应头
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
c.Header("Access-Control-Allow-Origin", "*")
// 创建通道用于发送数据
messageChan := make(chan string)
// 模拟数据生成
go func() {
defer close(messageChan)
for i := 0; i < 10; i++ {
message := fmt.Sprintf("消息 %d: 当前时间 %v", i, time.Now().Format("15:04:05"))
messageChan <- message
time.Sleep(1 * time.Second)
}
}()
// 流式返回数据
c.Stream(func(w io.Writer) bool {
if message, ok := <-messageChan; ok {
// SSE 格式: data: {message}\n\n
c.SSEvent("message", message)
return true
}
return false
})
})
r.Run(":8080")
}</pre></div>
<p class="maodian"><a name="_label4"></a></p><h2>2. 手动控制响应流</h2>
<p>更底层的方式是直接操作 ResponseWriter:</p>
<div class="jb51code"><pre class="brush:go;">r.GET("/stream-data", func(c *gin.Context) {
c.Header("Content-Type", "text/plain")
c.Header("Transfer-Encoding", "chunked")
flusher, ok := c.Writer.(http.Flusher)
if !ok {
c.String(http.StatusInternalServerError, "Streaming not supported")
return
}
for i := 0; i < 10; i++ {
fmt.Fprintf(c.Writer, "数据块 %d\n", i)
flusher.Flush() // 立即发送到客户端
time.Sleep(1 * time.Second)
}
})</pre></div>
<p class="maodian"><a name="_label5"></a></p><h2>3. 结合上下文取消机制</h2>
<p>处理客户端中断连接的情况:</p>
<div class="jb51code"><pre class="brush:go;">r.GET("/stream-with-context", func(c *gin.Context) {
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
flusher, _ := c.Writer.(http.Flusher)
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-c.Request.Context().Done():
// 客户端断开连接
fmt.Println("客户端断开连接")
return
case t := <-ticker.C:
data := fmt.Sprintf("时间: %s", t.Format("15:04:05"))
c.SSEvent("update", data)
flusher.Flush()
}
}
})</pre></div>
<p class="maodian"><a name="_label6"></a></p><h2>4. JSON 流式返回</h2>
<p>对于结构化数据的流式返回:</p>
<div class="jb51code"><pre class="brush:json;">r.GET("/stream-json", func(c *gin.Context) {
c.Header("Content-Type", "application/x-ndjson") // 换行分隔的JSON
flusher, _ := c.Writer.(http.Flusher)
for i := 0; i < 5; i++ {
data := gin.H{
"id": i,
"time": time.Now().Unix(),
"data": fmt.Sprintf("项目 %d", i),
}
c.Render(-1, gin.JSON{Data: data})
fmt.Fprintln(c.Writer) // 添加换行符
flusher.Flush()
time.Sleep(1 * time.Second)
}
})</pre></div>
<p class="maodian"><a name="_label7"></a></p><h2>5. 文件流式下载</h2>
<p>大文件的流式下载:</p>
<div class="jb51code"><pre class="brush:go;">r.GET("/download-large-file", func(c *gin.Context) {
filePath := "./large-file.txt"
c.Header("Content-Type", "application/octet-stream")
c.Header("Content-Disposition", "attachment; filename=large-file.txt")
file, err := os.Open(filePath)
if err != nil {
c.String(http.StatusNotFound, "文件不存在")
return
}
defer file.Close()
// 获取文件信息
fileInfo, _ := file.Stat()
c.Header("Content-Length", fmt.Sprintf("%d", fileInfo.Size()))
// 流式传输文件内容
io.Copy(c.Writer, file)
})</pre></div>
<p class="maodian"><a name="_label8"></a></p><h2>6. 前端示例</h2>
<p>对应的前端代码:</p>
<div class="jb51code"><pre class="brush:xhtml;"><!DOCTYPE html>
<html>
<head>
<title>SSE 示例</title>
</head>
<body>
<div id="messages"></div>
<script>
const eventSource = new EventSource('/stream');
eventSource.onmessage = function(event) {
const messageDiv = document.createElement('div');
messageDiv.textContent = event.data;
document.getElementById('messages').appendChild(messageDiv);
};
eventSource.onerror = function(event) {
console.error('SSE 错误:', event);
};
</script>
</body>
</html></pre></div>
<p class="maodian"><a name="_label9"></a></p><p class="maodian"><a name="_label24"></a></p><h2>关键要点</h2>
<ul><li><strong>设置正确的响应头</strong>:<code>text/event-stream</code>、<code>no-cache</code>、<code>keep-alive</code></li><li><strong>使用 Flusher</strong>:确保数据立即发送到客户端</li><li><strong>处理连接中断</strong>:通过上下文监听客户端断开</li><li><strong>错误处理</strong>:妥善处理流式传输中的错误</li><li><strong>资源清理</strong>:确保 goroutine 和连接正确关闭</li></ul>
<p>这种方式特别适合实时数据推送、大文件下载、长时间运行的任务进度更新等场景。</p>
<h2>🧑‍💻 User</h2>
<p>这个服务是如何返回的:</p>
<p>data: {"id":"019b84c169efb22b959b99a7914e4712","object":"chat.completion.chunk","created":1767458826,"model":"deepseek-ai/DeepSeek-V3.2","choices":[{"index":0,"delta":{"content":"","reasoning_content":null,"role":"assistant"},"finish_reason":"stop"}],"system_fingerprint":"","usage":{"prompt_tokens":23,"completion_tokens":19,"total_tokens":42,"completion_tokens_details":{"reasoning_tokens":0}}}</p>
<p>data: </p>
<h2>🤖 Assistant</h2>
<p>这是一个典型的 <strong>Server-Sent Events (SSE)</strong> 流式响应格式。让我详细解析这种返回方式的工作原理:</p>
<p class="maodian"><a name="_label12"></a></p><h2>响应格式解析</h2>
<div class="jb51code"><pre class="brush:json;">// 每条数据都以 "data: " 开头,后跟 JSON 数据
data: {"id":"019b84c169efb22b959b99a7914e4712","object":"chat.completion.chunk","created":1767458826,"model":"deepseek-ai/DeepSeek-V3.2","choices":[{"index":0,"delta":{"content":"服务。","reasoning_content":null,"role":"assistant"},"finish_reason":null}],"usage":{...}}
// 最后以 结束
data: </pre></div>
<p class="maodian"><a name="_label13"></a></p><h2>Gin 框架实现代码示例</h2>
<div class="jb51code"><pre class="brush:go;">package main
import (
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/gin-gonic/gin"
)
func main() {
r := gin.Default()
r.GET("/chat/stream", func(c *gin.Context) {
// 设置 SSE 响应头
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
c.Header("Access-Control-Allow-Origin", "*")
// 检查是否支持流式传输
flusher, ok := c.Writer.(http.Flusher)
if !ok {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Streaming not supported"})
return
}
// 模拟 AI 对话的逐词返回
messages := []string{"服务。", "请问有什么", "可以帮", "您的吗", "?那我", "先退", "下了。"}
for i, message := range messages {
// 构建响应数据
response := gin.H{
"id": "019b84c169efb22b959b99a7914e4712",
"object":"chat.completion.chunk",
"created": time.Now().Unix(),
"model": "deepseek-ai/DeepSeek-V3.2",
"choices": []gin.H{
{
"index": 0,
"delta": gin.H{
"content": message,
"reasoning_content": nil,
"role": "assistant",
},
"finish_reason": nil,
},
},
"usage": gin.H{
"prompt_tokens": 23,
"completion_tokens": 5 + i*2, // 模拟token计数
"total_tokens": 28 + i*2,
"completion_tokens_details": gin.H{
"reasoning_tokens": 0,
},
},
}
// 转换为 JSON
jsonData, _ := json.Marshal(response)
// 按照 SSE 格式写入:data: {json}\n\n
fmt.Fprintf(c.Writer, "data: %s\n\n", string(jsonData))
flusher.Flush() // 立即刷新到客户端
time.Sleep(200 * time.Millisecond) // 模拟处理延迟
}
// 发送结束标志
fmt.Fprintf(c.Writer, "data: \n\n")
flusher.Flush()
})
r.Run(":8080")
}</pre></div>
<p class="maodian"><a name="_label14"></a></p><h2>更完整的实现(包含错误处理)</h2>
<div class="jb51code"><pre class="brush:go;">func streamingChatHandler(c *gin.Context) {
// SSE 头设置
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
c.Header("Access-Control-Allow-Origin", "*")
flusher, ok := c.Writer.(http.Flusher)
if !ok {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Streaming not supported"})
return
}
// 监听客户端断开连接
ctx := c.Request.Context()
// 使用 Gin 的 Stream 方法(推荐)
c.Stream(func(w io.Writer) bool {
select {
case <-ctx.Done():
return false // 客户端断开,停止流式传输
default:
// 生成聊天数据
chunk := generateChatChunk()
if chunk == nil {
// 发送结束信号
c.SSEvent("", "")
return false
}
// 发送数据块
c.SSEvent("", chunk)
return true
}
})
}
func generateChatChunk() interface{} {
// 这里可以连接真实的 AI 模型
// 返回数据块结构
return mapinterface{}{
"id": generateID(),
"object":"chat.completion.chunk",
"choices": []mapinterface{}{
{
"delta": mapinterface{}{
"content": getNextWord(),
},
},
},
}
}</pre></div>
<p class="maodian"><a name="_label15"></a></p><h2>前端接收示例</h2>
<div class="jb51code"><pre class="brush:xhtml;"><!DOCTYPE html>
<html>
<head>
<title>SSE Chat</title>
</head>
<body>
<div id="chat-output"></div>
<script>
const chatOutput = document.getElementById('chat-output');
const eventSource = new EventSource('/chat/stream');
eventSource.onmessage = function(event) {
if (event.data === '') {
eventSource.close();
return;
}
try {
const data = JSON.parse(event.data);
if (data.choices && data.choices.delta.content) {
chatOutput.innerHTML += data.choices.delta.content;
}
} catch (error) {
console.error('解析错误:', error);
}
};
eventSource.onerror = function(event) {
console.error('SSE 错误:', event);
eventSource.close();
};
</script>
</body>
</html></pre></div>
<p class="maodian"><a name="_label16"></a></p><h2>关键技术点</h2>
<ul><li><strong>SSE 格式</strong>:每条消息以 <code>data:</code> 开头,以 <code>\n\n</code> 结束</li><li><strong>流式刷新</strong>:使用 <code>Flusher</code> 立即发送数据到客户端</li><li><strong>连接管理</strong>:监听上下文取消以处理客户端断开</li><li><strong>数据分块</strong>:将完整响应拆分为多个小块逐步发送</li><li><strong>结束标志</strong>:使用 <code></code> 标记流式传输结束</li></ul>
<p>这种模式广泛应用于 ChatGPT、Claude 等 AI 服务的流式对话功能。</p>
<h2>🧑‍💻 User</h2>
<p>golang发起请求是流式输出的时候可不可以支持流式print出来,而不是等待全部结束才输出</p>
<h2>🤖 Assistant</h2>
<p>当然可以!Golang 发起请求时也可以实现流式输出,而不是等待全部结束才输出。以下是几种实现方式:</p>
<p class="maodian"><a name="_label19"></a></p><h2>1. 使用 HTTP 标准库实现流式读取</h2>
<div class="jb51code"><pre class="brush:go;">package main
import (
"bufio"
"fmt"
"io"
"net/http"
"time"
)
func main() {
// 创建请求
req, err := http.NewRequest("GET", "http://localhost:8080/chat/stream", nil)
if err != nil {
panic(err)
}
// 设置接受流式响应
req.Header.Set("Accept", "text/event-stream")
req.Header.Set("Cache-Control", "no-cache")
// 发送请求
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
panic(err)
}
defer resp.Body.Close()
// 创建读取器
reader := bufio.NewReader(resp.Body)
fmt.Println("开始接收流式响应:")
for {
// 逐行读取
line, err := reader.ReadBytes('\n')
if err != nil {
if err == io.EOF {
break
}
fmt.Printf("读取错误: %v\n", err)
break
}
// 处理 SSE 格式的数据行
lineStr := string(line)
if len(lineStr) > 6 && lineStr[:6] == "data: " {
data := lineStr
if data == "\n" {
fmt.Println("\n流式传输结束")
break
}
// 这里可以解析 JSON 并提取内容
fmt.Printf("收到数据块: %s", data)
}
}
}</pre></div>
<p class="maodian"><a name="_label20"></a></p><h2>2. 完整的 SSE 客户端实现</h2>
<div class="jb51code"><pre class="brush:go;">package main
import (
"bufio"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
)
type StreamResponse struct {
ID string `json:"id"`
Objectstring `json:"object"`
Choices []struct {
Delta struct {
Content string `json:"content"`
} `json:"delta"`
} `json:"choices"`
}
func main() {
streamSSE("http://localhost:8080/chat/stream")
}
func streamSSE(url string) {
req, _ := http.NewRequest("GET", url, nil)
req.Header.Set("Accept", "text/event-stream")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
fmt.Printf("请求错误: %v\n", err)
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
fmt.Printf("HTTP错误: %s\n", resp.Status)
return
}
scanner := bufio.NewScanner(resp.Body)
var fullResponse strings.Builder
fmt.Print("AI回复: ")
for scanner.Scan() {
line := scanner.Text()
if line == "" {
continue // 跳过空行
}
if strings.HasPrefix(line, "data: ") {
data := line
if data == "" {
fmt.Println("\n\n=== 流式传输完成 ===")
break
}
// 解析 JSON
var chunk StreamResponse
if err := json.Unmarshal([]byte(data), &chunk); err != nil {
fmt.Printf("JSON解析错误: %v\n", err)
continue
}
// 输出内容
if len(chunk.Choices) > 0 && chunk.Choices.Delta.Content != "" {
content := chunk.Choices.Delta.Content
fmt.Print(content)
fullResponse.WriteString(content)
}
}
}
if err := scanner.Err(); err != nil {
fmt.Printf("读取流错误: %v\n", err)
}
fmt.Printf("\n完整回复: %s\n", fullResponse.String())
}</pre></div>
<p class="maodian"><a name="_label21"></a></p><h2>3. 使用 Context 支持取消操作</h2>
<div class="jb51code"><pre class="brush:go;">package main
import (
"bufio"
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"
)
func main() {
// 创建可取消的上下文
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 监听中断信号
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
go func() {
<-sigCh
fmt.Println("\n接收到中断信号,停止流式接收...")
cancel()
}()
// 启动流式接收
if err := streamWithContext(ctx, "http://localhost:8080/chat/stream"); err != nil {
fmt.Printf("错误: %v\n", err)
}
}
func streamWithContext(ctx context.Context, url string) error {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return err
}
req.Header.Set("Accept", "text/event-stream")
client := &http.Client{
Timeout: 0, // 无超时限制
}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
// 创建带缓冲的读取器
reader := bufio.NewReader(resp.Body)
buffer := make([]byte, 1024)
fmt.Println("开始流式接收 (按Ctrl+C停止):")
fmt.Print("> ")
for {
select {
case <-ctx.Done():
fmt.Println("\n用户主动停止")
return nil
default:
// 非阻塞读取
n, err := reader.Read(buffer)
if err != nil {
if err == io.EOF {
fmt.Println("\n流结束")
return nil
}
return err
}
if n > 0 {
data := string(buffer[:n])
processSSEData(data)
}
time.Sleep(50 * time.Millisecond) // 避免CPU占用过高
}
}
}
func processSSEData(data string) {
lines := strings.Split(data, "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
if strings.HasPrefix(line, "data: ") {
content := line
if content == "" {
fmt.Println("\n[传输完成]")
return
}
// 简单的文本输出(可根据需要解析JSON)
fmt.Print(content)
}
}
}</pre></div>
<p class="maodian"><a name="_label22"></a></p><h2>4. 使用第三方库简化操作</h2>
<div class="jb51code"><pre class="brush:go;">package main
import (
"fmt"
"github.com/r3labs/sse"
)
func main() {
client := sse.NewClient("http://localhost:8080/chat/stream")
fmt.Println("使用SSE库接收流式响应:")
// 订阅事件
err := client.Subscribe("message", func(msg *sse.Event) {
// 收到消息时立即处理
fmt.Printf("收到消息: %s\n", string(msg.Data))
})
if err != nil {
fmt.Printf("订阅错误: %v\n", err)
}
// 保持运行
select {}
}</pre></div>
<p class="maodian"><a name="_label23"></a></p><h2>5. 简单的命令行聊天客户端示例</h2>
<div class="jb51code"><pre class="brush:go;">package main
import (
"bufio"
"encoding/json"
"fmt"
"net/http"
"os"
"strings"
)
func main() {
reader := bufio.NewReader(os.Stdin)
for {
fmt.Print("你: ")
input, _ := reader.ReadString('\n')
input = strings.TrimSpace(input)
if input == "exit" {
break
}
// 发送请求并流式接收回复
streamChatResponse(input)
fmt.Println()
}
}
func streamChatResponse(prompt string) {
url := fmt.Sprintf("http://localhost:8080/chat?message=%s", prompt)
req, _ := http.NewRequest("GET", url, nil)
req.Header.Set("Accept", "text/event-stream")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
fmt.Printf("请求错误: %v\n", err)
return
}
defer resp.Body.Close()
scanner := bufio.NewScanner(resp.Body)
fmt.Print("AI: ")
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "data: ") {
data := line
if data == "" {
break
}
var result mapinterface{}
if json.Unmarshal([]byte(data), &result) == nil {
if choices, ok := result["choices"].([]interface{}); ok && len(choices) > 0 {
if choice, ok := choices.(mapinterface{}); ok {
if delta, ok := choice["delta"].(mapinterface{}); ok {
if content, ok := delta["content"].(string); ok {
fmt.Print(content)
}
}
}
}
}
}
}
}</pre></div>
<h2>关键要点</h2>
<ul><li>使用 <code>bufio.Scanner</code> 或 <code>bufio.Reader</code> 逐行读取响应</li><li><strong>识别 SSE 格式</strong>:<code>data:</code> 前缀和 <code></code> 结束标记</li><li><strong>实时刷新输出</strong>:使用 <code>fmt.Print</code> 而不是缓冲的打印</li><li><strong>处理上下文取消</strong>:支持用户中断流式接收</li><li><strong>错误处理</strong>:妥善处理网络中断和解析错误</li></ul>
<p>这样就能实现类似 ChatGPT 的流式输出效果,每个数据块到达时立即显示,而不是等待全部完成。</p>
頁:
[1]