胡竹林 發表於 2025-11-21 09:49:27

golang使用zookeeper进行CURD

<div id="navCategory"><h5 class="catalogue">目录</h5><ul class="first_class_ul"><li><a href="#_label0">一、Zookeeper入门</a></li><ul class="second_class_ul"><li><a href="#_lab2_0_0">1.1. Zookeeper简介</a></li></ul><li><a href="#_label1">二.启动zookeeper</a></li><ul class="second_class_ul"></ul><li><a href="#_label2">三.核心包</a></li><ul class="second_class_ul"></ul><li><a href="#_label3">四.Golang实现Zookeeper核心功能</a></li><ul class="second_class_ul"><li><a href="#_lab2_3_1">4.1 建立连接</a></li><li><a href="#_lab2_3_2">4.2创建节点</a></li><li><a href="#_lab2_3_3">4.3查询节点</a></li><li><a href="#_lab2_3_4">4.4 节点是否存在</a></li><li><a href="#_lab2_3_5">4.5删除节点</a></li><li><a href="#_lab2_3_6">4.6 修改节点内容</a></li><li><a href="#_lab2_3_7">4.7获取目录信息</a></li></ul><li><a href="#_label4">五.watch</a></li><ul class="second_class_ul"></ul></ul></div><p class="maodian"><a name="_label0"></a></p><h2>一、Zookeeper入门</h2>
<p class="maodian"><a name="_lab2_0_0"></a></p><h3>1.1. Zookeeper简介</h3>
<p>Zookeeper是一个分布式数据库(程序协调服务),Hadoop子项目;以树状方式维护节点方数据的增、删、改、查;通过监听可以获取相应消息事件;<br />节点类型:</p>
<ul><li>持久节点:一直存储在服务器上(0)</li><li>临时节点:会话失效、节点自动清理(FlagEphemeral)</li><li>顺序节点:节点创建时自动分配序列号(FlagSequence)</li></ul>
<p class="maodian"><a name="_label1"></a></p><h2>二.启动zookeeper</h2>
<p style="text-align:center"><img alt="" src="https://img.jbzj.com/file_images/article/202511/2025112109481128.jpg" /></p>
<p class="maodian"><a name="_label2"></a></p><h2>三.核心包</h2>
<p>go-zookeeper</p>
<div class="jb51code"><pre class="brush:go;">go get github.com/samuel/go-zookeeper/zk
文档地址: http://godoc.org/github.com/samuel/go-zookeeper/zk
</pre></div>
<p class="maodian"><a name="_label3"></a></p><h2>四.Golang实现Zookeeper核心功能</h2>
<p class="maodian"><a name="_lab2_3_1"></a></p><h3>4.1 建立连接</h3>
<div class="jb51code"><pre class="brush:go;">func GetConnect(zkList []string) (conn *zk.Conn) {
        // 创建监听的option,用于初始化zk
        conn, _, err := zk.Connect(zkList, 10*time.Second)
        if err != nil {
                fmt.Println(err)
        }
        return
}
</pre></div>
<p class="maodian"><a name="_lab2_3_2"></a></p><h3>4.2创建节点</h3>
<div class="jb51code"><pre class="brush:go;">func Create(conn *zk.Conn, path string, data []byte, flags int32, acl int32) (val string, err error) {
        //flags有4种取值:
        //0:永久,除非手动删除
        //zk.FlagEphemeral = 1:短暂,session断开则改节点也被删除
        //zk.FlagSequence= 2:会自动在节点后面添加序号
        //3:Ephemeral和Sequence,即,短暂且自动添加序号
        // 获取访问控制权限
        var acls []zk.ACL
        if acl == 0 {
                /**
                PermRead = 1 &lt;&lt; iota   1
                PermWrite            2
                PermCreate             4
                PermDelete             8
                PermAdmin             16
                PermAll = 0x1f      31
                */
                acls = zk.WorldACL(zk.PermAll)
        } else {
                acls = zk.WorldACL(acl)
        }

        val, err = conn.Create(path, data, flags, acls)
        return
}

var (
        zkList = []string{"127.0.0.1:2181"}
        path   = "/root"
)

func TestCreate(t *testing.T) {
        conn := GetConnect(zkList)

        defer conn.Close()
        //创建节点
        val, err := Create(conn, path, []byte("root value"), 0, 31)
        if err != nil {
                fmt.Printf("创建失败: %v\n", err)
                return
        }
        fmt.Printf("创建: %s 成功", val)
}
</pre></div>
<p class="maodian"><a name="_lab2_3_3"></a></p><h3>4.3查询节点</h3>
<div class="jb51code"><pre class="brush:go;">// 查
func Get(conn *zk.Conn, path string) (dataStr string, stat *zk.Stat, err error) {
        data, stat, err := conn.Get(path)
        if err != nil {
                return "", nil, err
        }
        return string(data), stat, err
}

func TestGet(t *testing.T) {
        conn := GetConnect(zkList)

        defer conn.Close()
        //查询节点
        val, _, err := Get(conn, path)
        if err != nil {
                fmt.Printf("查询%s失败, err: %v\n", path, err)
                return
        }
        fmt.Printf("%s 的值为 %s\n", path, val)
}

</pre></div>
<p class="maodian"><a name="_lab2_3_4"></a></p><h3>4.4 节点是否存在</h3>
<div class="jb51code"><pre class="brush:go;">func Exists(conn *zk.Conn, path string) (exist bool, err error) {
        exist, _, err = conn.Exists(path)
        return
}

func TestExist(t *testing.T) {
        conn := GetConnect(zkList)

        defer conn.Close()
        //是否存在
        val, err := Exists(conn, path)
        if err != nil {
                fmt.Printf("查询%s失败, err: %v\n", path, err)
                return
        }
        if val {
                fmt.Printf("%s 存在\n", path)
        } else {
                fmt.Printf("%s 不存在\n", path)
        }
}

</pre></div>
<p class="maodian"><a name="_lab2_3_5"></a></p><h3>4.5删除节点</h3>
<div class="jb51code"><pre class="brush:go;">//删除 cas支持
func Del(conn *zk.Conn, path string) (err error) {
        _, sate, _ := Get(conn, path)
        fmt.Println(sate)
        err = conn.Delete(path, sate.Version)
        return err
}

func TestDel(t *testing.T) {
        conn := GetConnect(zkList)

        defer conn.Close()
        // 删除
        err := Del(conn, path)
        if err != nil {
                fmt.Printf("数据删除失败: %v\n", err)
                return
        }
        fmt.Println("数据删除成功")
}

</pre></div>
<p class="maodian"><a name="_lab2_3_6"></a></p><h3>4.6 修改节点内容</h3>
<div class="jb51code"><pre class="brush:go;">// 改 CAS支持
// 可以通过此种方式保证原子性
func Modify(conn *zk.Conn, path string, newData []byte) (sate *zk.Stat, err error) {
        _, sate, _ = conn.Get(path)
        fmt.Println(sate)
        sate, err = conn.Set(path, newData, sate.Version)
        return
}


func TestModify(t *testing.T) {
        conn := GetConnect(zkList)

        defer conn.Close()
        newData := []byte("hello zookeeper")
        stat, err := Modify(conn, path, newData)
        if err != nil {
                fmt.Printf("数据修改失败: %v\n", err)
                return
        }
        fmt.Printf("数据修改成功,stat %v\n", stat)
}

</pre></div>
<p class="maodian"><a name="_lab2_3_7"></a></p><h3>4.7获取目录信息</h3>
<div class="jb51code"><pre class="brush:go;">func Children(conn *zk.Conn, path string) (data []string, err error) {
        data, _, err = conn.Children(path)
        return
}

func TestChildren(t *testing.T) {
        conn := GetConnect(zkList)

        defer conn.Close()
        data, err := Children(conn, "/")
        if err != nil {
                fmt.Printf("获取数据失败: %v\n", err)
                return
        }
        fmt.Printf("获取数据成功,data %v\n", data)
}

</pre></div>
<p class="maodian"><a name="_label4"></a></p><h2>五.watch</h2>
<div class="jb51code"><pre class="brush:go;">func callback(event zk.Event) {
    fmt.Println("&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;")
    fmt.Println("path:", event.Path)
    fmt.Println("type:", event.Type.String())
    fmt.Println("state:", event.State.String())
    fmt.Println("&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;")
}

func ZKOperateWatchTest() {
    fmt.Printf("ZKOperateWatchTest\n")

    option := zk.WithEventCallback(callback)
    var hosts = []string{"localhost:2181"}
    conn, _, err := zk.Connect(hosts, time.Second*5, option)
    if err != nil {
      fmt.Println(err)
      return
    }
    defer conn.Close()

    var path1 = "/zk_test_go1"
    var data1 = []byte("zk_test_go1_data1")
    exist, s, _, err := conn.ExistsW(path1)
    if err != nil {
      fmt.Println(err)
      return
    }

    fmt.Printf("path[%s] exist[%t]\n", path1, exist)
    fmt.Printf("state:\n")
    fmt.Printf("%s\n", ZkStateStringFormat(s))

    // try create
    var acls = zk.WorldACL(zk.PermAll)
    p, err_create := conn.Create(path1, data1, zk.FlagEphemeral, acls)
    if err_create != nil {
      fmt.Println(err_create)
      return
    }
    fmt.Printf("created path[%s]\n", p)

    time.Sleep(time.Second * 2)

    exist, s, _, err = conn.ExistsW(path1)
    if err != nil {
      fmt.Println(err)
      return
    }

    fmt.Printf("path[%s] exist[%t] after create\n", path1, exist)
    fmt.Printf("state:\n")
    fmt.Printf("%s\n", ZkStateStringFormat(s))

    // delete
    conn.Delete(path1, s.Version)

    exist, s, _, err = conn.ExistsW(path1)
    if err != nil {
      fmt.Println(err)
      return
    }
    fmt.Printf("path[%s] exist[%t] after delete\n", path1, exist)
    fmt.Printf("state:\n")
    fmt.Printf("%s\n", ZkStateStringFormat(s))
}
</pre></div>
<p>不止会打印watch监听的节点信息,还有打印session会话的状态</p>
頁: [1]
查看完整版本: golang使用zookeeper进行CURD