南山樵夫 發表於 2021-10-18 00:45:00

Go WebSocket 实现

<p>WebSocket是HTML5下的产物,能更好的节省服务器资源和带宽。常见场景:html5多人游戏、聊天室、协同编辑、基于实时位置的应用、股票实时报价、弹幕、视频会议、QQ,微信、等等... ...</p>
<h2 id="websocket-vs-http">websocket VS http</h2>
<p><strong>相似</strong>:</p>
<p>都是应用层协议,都基于tcp传输协议<br>
跟http有良好的兼容性,ws和http的默认端口都是80,wss和https的默认端口都是443<br>
websocket在握手阶段采用http发送数据</p>
<p><strong>差异</strong>:</p>
<p>http是半双工,而websocket通过多路复用实现了全双工<br>
http只能由client主动发起数据请求,而websocket还可以由server主动向client推送数据。在需要及时刷新的场景中,http只能靠client高频地轮询,浪费严重<br>
http是短连接(也可以实现长连接, HTTP1.1 的连接默认使用长连接),每次数据请求都得经过三次握手重新建立连接,而websocket是长连接<br>
http长连接中每次请求都要带上header,而websocket在传输数据阶段不需要带header</p>
<h2 id="websocket握手协议">websocket握手协议</h2>
<p><strong>Request Header</strong></p>
<p>Sec-Websocket-Version:13<br>
Upgrade:websocket<br>
Connection:Upgrade<br>
Sec-Websocket-Key:duR0pUQxNgBJsRQKj2Jxsw==</p>
<p><strong>Response Header</strong></p>
<p>Upgrade:websocket<br>
Connection:Upgrade<br>
Sec-Websocket-Accept:a1y2oy1zvgHsVyHMx+hZ1AYrEHI=</p>
<pre><code class="language-bash">Upgrade:websocket和Connection:Upgrade指明使用WebSocket协议
Sec-WebSocket-Version 指定Websocket协议版本
Sec-WebSocket-Key是一个Base64 encode的值,是浏览器随机生成的
服务端收到Sec-WebSocket-Key后拼接上一个固定的GUID,进行一次SHA-1摘要,再转成Base64编码,得到Sec-WebSocket-Accept返回给客户端。客户端对本地的Sec-WebSocket-Key执行同样的操作跟服务端返回的结果进行对比,如果不一致会返回错误关闭连接。如此操作是为了把websocket header跟http header区分开
</code></pre>
<p><strong>websocket发送的消息类型有5种</strong>:TextMessag、BinaryMessage、CloseMessag、PingMessage、PongMessage<br>
TextMessag和BinaryMessage分别表示发送文本消息和二进制消息<br>
CloseMessage关闭帧,接收方收到这个消息就关闭连接<br>
PingMessage和PongMessage是保持心跳的帧,发送方接收方是PingMessage,接收方发送方是PongMessage,目前浏览器没有相关api发送ping给服务器,只能由服务器发ping给浏览器,浏览器返回pong消息</p>
<h2 id="gorillawebsocket-概述">gorilla/websocket 概述</h2>
<p>Upgrader用于升级 http 请求,把 http 请求升级为长连接的 WebSocket。结构如下:</p>
<pre><code class="language-go">type Upgrader struct {
    // 升级 websocket 握手完成的超时时间
    HandshakeTimeout time.Duration

    // io 操作的缓存大小,如果不指定就会自动分配。
    ReadBufferSize, WriteBufferSize int

    // 写数据操作的缓存池,如果没有设置值,write buffers 将会分配到链接生命周期里。
    WriteBufferPool BufferPool

    //按顺序指定服务支持的协议,如值存在,则服务会从第一个开始匹配客户端的协议。
    Subprotocols []string

    // http 的错误响应函数,如果没有设置 Error 则,会生成 http.Error 的错误响应。
    Error func(w http.ResponseWriter, r *http.Request, status int, reason error)

    // 如果请求Origin标头可以接受,CheckOrigin将返回true。 如果CheckOrigin为nil,则使用安全默认值:如果Origin请求头存在且原始主机不等于请求主机头,则返回false。
    // 请求检查函数,用于统一的链接检查,以防止跨站点请求伪造。如果不检查,就设置一个返回值为true的函数
    CheckOrigin func(r *http.Request) bool

    // EnableCompression 指定服务器是否应尝试协商每个邮件压缩(RFC 7692)。 将此值设置为true并不能保证将支持压缩。 目前仅支持“无上下文接管”模式
    EnableCompression bool
}
</code></pre>
<p>func (*Upgrader) Upgrade 函数将 http 升级到 WebSocket 协议。</p>
<pre><code class="language-go">// responseHeader包含在对客户端升级请求的响应中。
// 使用responseHeader指定cookie(Set-Cookie)和应用程序协商的子协议(Sec-WebSocket-Protocol)。
// 如果升级失败,则升级将使用HTTP错误响应回复客户端
// 返回一个 Conn 指针,使用 Conn 读写数据与客户端通信。
func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (*Conn, error)
</code></pre>
<h2 id="websocket实例">WebSocket实例</h2>
<h3 id="servergo">Server.go</h3>
<pre><code class="language-go">package main

import (
        "fmt"
        "github.com/gorilla/websocket"
        "net"
        "net/http"
        "os"
        "strconv"
        "time"
)

type (
        Request struct {
                A int
                B int
        }
        Response struct {
                Sum int
        }
        WsServer struct {
                listener net.Listener
                addr   string
                upgrade*websocket.Upgrader
        }
)

func CheckError(err error) {
        if err != nil {
                fmt.Println(err)
                os.Exit(1)
        }
}

func NewWsServer(port int) *WsServer {
        ws := new(WsServer)
        ws.addr = "0.0.0.0:" + strconv.Itoa(port)
        ws.upgrade = &amp;websocket.Upgrader{
                HandshakeTimeout: 2 * time.Second,
                ReadBufferSize:   1024,
                WriteBufferSize:1024,
                Error:            func(w http.ResponseWriter, r *http.Request, status int, reason error) {},
                CheckOrigin:      func(r *http.Request) bool { return true },
        }
        return ws
}

func (ws *WsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        if r.URL.Path != "/add" {
                httpCode := http.StatusInternalServerError
                phrase := http.StatusText(httpCode)
                http.Error(w, phrase, httpCode)
        }
        for key, values := range r.Header {
                fmt.Printf("%s:%v\n", key, values)
        }
        conn, err := ws.upgrade.Upgrade(w, r, nil)
        if err != nil {
                fmt.Printf("upgrade from http to websocket failed : %v\n", err)
        }
        defer conn.Close()
        _ = conn.SetReadDeadline(time.Now().Add(30 * time.Second))
        for {
                var request Request
                err = conn.ReadJSON(&amp;request)
                if err != nil {
                        fmt.Printf("Mage read error: %v\n", err)
                        break
                }
                fmt.Printf("receive request a=%d b=%d\n", request.A, request.B)
                sum := request.A + request.B
                response := Response{
                        Sum: sum,
                }
                err = conn.WriteJSON(&amp;response)
                CheckError(err)
        }
}

func main() {
        ws := NewWsServer(3434)
        listener, err := net.Listen("tcp", ws.addr)
        CheckError(err)
        ws.listener = listener
        err = http.Serve(listener, ws)
        CheckError(err)
}

</code></pre>
<h3 id="clientgo">Client.go</h3>
<pre><code class="language-go">package main

import (
        "fmt"
        "github.com/gorilla/websocket"
        "net/http"
        "os"
        "time"
)
type (
        Request struct {
                A int
                B int
        }
        Response struct {
                Sum int
        }
)

func CheckError(err error){
        if err != nil {
                fmt.Println(err)
                os.Exit(1)
        }
}


func main(){
        dialer := &amp;websocket.Dialer{}
        header := http.Header{
                "name":[]string{"Tome","Jim"},
        }
        conn, resp, err := dialer.Dial("ws://127.0.0.1:3434/add",header)
        CheckError(err)
        for key,values := range resp.Header {
                fmt.Printf("%s:%v\n",key,values)
        }
        deferconn.Close()

        for {
                request := Request{A: 3,B: 9}
                err = conn.WriteJSON(request)
                CheckError(err)

                var response Response
                err = conn.ReadJSON(&amp;response)
                fmt.Printf("response sum=%d\n",response.Sum)
                time.Sleep(time.Second)
        }

}

</code></pre>
<h2 id="多人聊天室案例">多人聊天室案例</h2>
<p><img src="https://img2020.cnblogs.com/blog/1774189/202110/1774189-20211018002124500-974360576.png" alt="" loading="lazy"></p>
<p><strong>Hub</strong>:持有每一个client的指针,broadcast管道里有数据时,把它写入每一个client的send管道中,注销client时关闭client的send管道。</p>
<p><strong>client</strong>:前端(Browser)请求建立websocket连接时,为这条websocket连接专门启用一个协程,创建一个client,把前端请求发来的数据写入到hub中的broadcast管道中,把自身管道里的数据发送写入给前端,跟前端的连接断开时,请求从hub中注销自己。</p>
<p><strong>前端(Browser)</strong>:当打开浏览器界面时,前端会请求建立websocket连接,关闭浏览器界面时会主动关闭websocket连接。</p>
<p>存活监测:当hub发现client的send管道写不进数据时,把client注销掉,client给websocket连接设置一个读超时,并周期性地给前端发ping消息,如果没有收到pong消息,则下一次的conn.read()会报超时错误,此时client关闭websocket连接。</p>
<h3 id="hubgo">hub.go</h3>
<pre><code class="language-go">package main

type Hub struct {
        clients    map[*Client]bool //维护所有的client
        broadcastchan []byte      //广播消息
        register   chan *Client   //注册
        unregister chan *Client   //注销

}

func NewHub() *Hub {
        return &amp;Hub{
                clients:    make(map[*Client]bool),
                broadcast:make(chan []byte), //同步管道,确保hub消息不堆积,同时多个client给hub发数据会阻塞
                register:   make(chan *Client),
                unregister: make(chan *Client),
        }
}

func (hub *Hub) Run() {
        for {
                select {
                case client := &lt;-hub.register:
                        //client上线,注册
                        hub.clients = true
                case client := &lt;-hub.unregister:
                        //查询当前client是否存在
                        if _, exists := hub.clients; exists {
                                //注销client 通道
                                close(client.send)
                                //删除注销的client
                                delete(hub.clients, client)
                        }
                case msg := &lt;-hub.broadcast:
                        //将message广播给每一位client
                        for client := range hub.clients {
                                select {
                                case client.send &lt;- msg:
                                //异常client处理
                                default:
                                        close(client.send)
                                        //删除异常的client
                                        delete(hub.clients, client)
                                }
                        }
                }
        }
}

</code></pre>
<h3 id="clientgo-1">client.go</h3>
<pre><code class="language-go">package main

import (
        "bytes"
        "fmt"
        "github.com/gorilla/websocket"
        "net/http"
        "time"
)

var (
        pongWait         = 60 * time.Second//等待时间
        pingPeriod       = 9 * pongWait / 10 //周期54s
        maxMsgSize int64 = 512               //消息最大长度
        writeWait      = 10 * time.Second//
)
var (
        newLine = []byte{'\n'}
        space   = []byte{' '}
)
var upgrader = websocket.Upgrader{
        HandshakeTimeout: 2 * time.Second, //握手超时时间
        ReadBufferSize:   1024,            //读缓冲大小
        WriteBufferSize:1024,            //写缓冲大小
        CheckOrigin:      func(r *http.Request) bool { return true },
        Error:            func(w http.ResponseWriter, r *http.Request, status int, reason error) {},
}

type Client struct {
        send      chan []byte
        hub       *Hub
        conn      *websocket.Conn
        frontName []byte //前端的名字,用于展示在消息前面
}

func (client *Client) read() {
        defer func() {
                //hub中注销client
                client.hub.unregister &lt;- client
                fmt.Printf("close connection to %s\n", client.conn.RemoteAddr().String())
                //关闭websocket管道
                client.conn.Close()
        }()
        //一次从管管中读取的最大长度
        client.conn.SetReadLimit(maxMsgSize)
        //连接中,每隔54秒向客户端发一次ping,客户端返回pong,所以把SetReadDeadline设为60秒,超过60秒后不允许读
        _ = client.conn.SetReadDeadline(time.Now().Add(pongWait))
        //心跳
        client.conn.SetPongHandler(func(appData string) error {
                //每次收到pong都把deadline往后推迟60秒
                _ = client.conn.SetReadDeadline(time.Now().Add(pongWait))
                return nil
        })

        for {
                //如果前端主动断开连接,运行会报错,for循环会退出。注册client时,hub中会关闭client.send管道
                _, msg, err := client.conn.ReadMessage()
                if err != nil {
                        //如果以意料之外的关闭状态关闭,就打印日志
                        if websocket.IsUnexpectedCloseError(err, websocket.CloseAbnormalClosure, websocket.CloseGoingAway) {
                                fmt.Printf("read from websocket err: %v\n", err)
                        }
                        //ReadMessage失败,关闭websocket管道、注销client,退出
                        break
                } else {
                        //换行符替换成空格,去除首尾空格
                        message := bytes.TrimSpace(bytes.Replace(msg, newLine, space, -1))
                        if len(client.frontName) == 0 {
                                //赋给frontName,不进行广播
                                client.frontName = message
                                fmt.Printf("%s online\n", string(client.frontName))
                        } else {
                                //要广播的内容前面加上front的名字,从websocket连接里读出数据,发给hub的broadcast
                                client.hub.broadcast &lt;- bytes.Join([][]byte{client.frontName, message}, []byte(": "))
                        }
                }
        }
}

//从hub的broadcast那儿读限数据,写到websocket连接里面去
func (client *Client) write() {
        //给前端发心跳,看前端是否还存活
        ticker := time.NewTicker(pingPeriod)
        defer func() {
                //ticker不用就stop,防止协程泄漏
                ticker.Stop()
                fmt.Printf("close connection to %s\n", client.conn.RemoteAddr().String())
                //给前端写数据失败,关闭连接
                client.conn.Close()
        }()

        for {
                select {
                //正常情况是hub发来了数据。如果前端断开了连接,read()会触发client.send管道的关闭,该case会立即执行。从而执行!ok里的return,从而执行defer
                case msg, ok := &lt;-client.send:
                        //client.send该管道被hub关闭
                        if !ok {
                                //写一条关闭信息就可以结束一切
                                _ = client.conn.WriteMessage(websocket.CloseMessage, []byte{})
                                return
                        }
                        //10秒内必须把信息写给前端(写到websocket连接里去),否则就关闭连接
                        _ = client.conn.SetWriteDeadline(time.Now().Add(writeWait))
                        //通过NextWriter创建一个新的writer,主要是为了确保上一个writer已经被关闭,即它想写的内容已经flush到conn里去
                        if writer, err := client.conn.NextWriter(websocket.TextMessage); err != nil {
                                return
                        } else {
                                _, _ = writer.Write(msg)
                                _, _ = writer.Write(newLine) //每发一条消息,都加一个换行符
                                //为了提升性能,如果client.send里还有消息,则趁这一次都写给前端
                                n := len(client.send)
                                for i := 0; i &lt; n; i++ {
                                        _, _ = writer.Write(&lt;-client.send)
                                        _, _ = writer.Write(newLine)
                                }
                                if err := writer.Close(); err != nil {
                                        return //结束一切
                                }
                        }
                case &lt;-ticker.C:
                        _ = client.conn.SetWriteDeadline(time.Now().Add(writeWait))
                        //心跳保持,给浏览器发一个PingMessage,等待浏览器返回PongMessage
                        if err := client.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
                                return //写websocket连接失败,说明连接出问题了,该client可以over了
                        }
                }
        }
}

func ServeWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
        conn, err := upgrader.Upgrade(w, r, nil) //http升级为websocket协议
        if err != nil {
                fmt.Printf("upgrade error: %v\n", err)
                return
        }
        fmt.Printf("connect to client %s\n", conn.RemoteAddr().String())
        //每来一个前端请求,就会创建一个client
        client := &amp;Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
        //向hub注册client
        client.hub.register &lt;- client

        //启动子协程,运行ServeWs的协程退出后子协程也不会能出
        //websocket是全双工模式,可以同时read和write
        go client.read()
        go client.write()
}

</code></pre>
<h3 id="maingo">main.go</h3>
<pre><code class="language-go">package main

import (
        "flag"
        "fmt"
        "net/http"
)

func serveHome(w http.ResponseWriter, r *http.Request) {
        //只允许访问根路径
        if r.URL.Path != "/" {
                http.Error(w, "Not Found", http.StatusNotFound)
                return
        }
        //只允许GET请求
        if r.Method != "GET" {
                http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
                return
        }
        http.ServeFile(w, r, "home.html")
}

func main() {
        //如果命令行不指定port参数,则默认为3434
        port := flag.String("port", "3434", "http service port")
        //解析命令行输入的port参数
        flag.Parse()
        hub := NewHub()
        go hub.Run()
        //注册每种请求对应的处理函数
        http.HandleFunc("/", serveHome)
        http.HandleFunc("/ws", func(rw http.ResponseWriter, r *http.Request) {
                ServeWs(hub, rw, r)
        })
        //如果启动成功,该行会一直阻塞,hub.run()会一直运行
        if err := http.ListenAndServe(":"+*port, nil); err != nil {
                fmt.Printf("start http service error: %s\n", err)
        }
}

//go run main.go --port 3434

</code></pre>
<h3 id="homehtml">home.html</h3>
<pre><code class="language-html">&lt;!DOCTYPE html&gt;
&lt;html lang="en"&gt;

&lt;head&gt;
    &lt;title&gt;聊天室&lt;/title&gt;
    &lt;script type="text/javascript"&gt;
      window.onload = function () {//页面打开时执行以下初始化内容
            var conn;
            var msg = document.getElementById("msg");
            var log = document.getElementById("log");

            function appendLog(item) {
                var doScroll = log.scrollTop &gt; log.scrollHeight - log.clientHeight - 1;
                log.appendChild(item);
                if (doScroll) {
                  log.scrollTop = log.scrollHeight - log.clientHeight;
                }
            }

            document.getElementById("form").onsubmit = function () {
                if (!conn) {
                  return false;
                }
                if (!msg.value) {
                  return false;
                }
                conn.send(msg.value);
                msg.value = "";
                return false;
            };

            if (window["WebSocket"]) {//如果支持websockte就尝试连接
                //从浏览器的开发者工具里看一下ws的请求头
                conn = new WebSocket("ws://127.0.0.1:3434/ws");//请求跟websocket服务端建立连接(注意端口要一致)。关闭浏览器页面时会自动断开连接
                conn.onclose = function (evt) {
                  var item = document.createElement("div")
                  item.innerHTML = "&lt;b&gt;Connection closed.&lt;/b&gt;";//连接关闭时打印一条信息
                  appendLog(item);
                };
                conn.onmessage = function (evt) {//如果conn里有消息
                  var messages = evt.data.split('\n');//用换行符分隔每条消息
                  for (var i = 0; i &lt; messages.length; i++) {
                        var item = document.createElement("div");
                        item.innerText = messages;//把消息逐条显示在屏幕上
                        appendLog(item);
                  }
                };
            } else {
                var item = document.createElement("div");
                item.innerHTML = "&lt;b&gt;Your browser does not support WebSockets.&lt;/b&gt;";
                appendLog(item);
            }
      };
    &lt;/script&gt;
    &lt;style type="text/css"&gt;
      html {
            overflow: hidden;
      }

      body {
            overflow: hidden;
            padding: 0;
            margin: 0;
            width: 100%;
            height: 100%;
            background: gray;
      }

      #log {
            background: white;
            margin: 0;
            padding: 0.5em 0.5em 0.5em 0.5em;
            position: absolute;
            top: 0.5em;
            left: 0.5em;
            right: 0.5em;
            bottom: 3em;
            overflow: auto;
      }

      #form {
            padding: 0 0.5em 0 0.5em;
            margin: 0;
            position: absolute;
            bottom: 1em;
            left: 0px;
            width: 100%;
            overflow: hidden;
      }
    &lt;/style&gt;
&lt;/head&gt;

&lt;body&gt;
    &lt;div id="log"&gt;&lt;/div&gt;
    &lt;form id="form"&gt;
      &lt;input type="submit" value="发送" /&gt;
      &lt;input type="text" id="msg" size="100" autofocus /&gt;
    &lt;/form&gt;
&lt;/body&gt;

&lt;/html&gt;
</code></pre><br><br>
来源:https://www.cnblogs.com/remixnameless/p/15418929.html
頁: [1]
查看完整版本: Go WebSocket 实现