golang使用zookeeper进行CURD
<div id="navCategory"><h5 class="catalogue">目录</h5><ul class="first_class_ul"><li><a href="#_label0">一、Zookeeper入门</a></li><ul class="second_class_ul"><li><a href="#_lab2_0_0">1.1. Zookeeper简介</a></li></ul><li><a href="#_label1">二.启动zookeeper</a></li><ul class="second_class_ul"></ul><li><a href="#_label2">三.核心包</a></li><ul class="second_class_ul"></ul><li><a href="#_label3">四.Golang实现Zookeeper核心功能</a></li><ul class="second_class_ul"><li><a href="#_lab2_3_1">4.1 建立连接</a></li><li><a href="#_lab2_3_2">4.2创建节点</a></li><li><a href="#_lab2_3_3">4.3查询节点</a></li><li><a href="#_lab2_3_4">4.4 节点是否存在</a></li><li><a href="#_lab2_3_5">4.5删除节点</a></li><li><a href="#_lab2_3_6">4.6 修改节点内容</a></li><li><a href="#_lab2_3_7">4.7获取目录信息</a></li></ul><li><a href="#_label4">五.watch</a></li><ul class="second_class_ul"></ul></ul></div><p class="maodian"><a name="_label0"></a></p><h2>一、Zookeeper入门</h2><p class="maodian"><a name="_lab2_0_0"></a></p><h3>1.1. Zookeeper简介</h3>
<p>Zookeeper是一个分布式数据库(程序协调服务),Hadoop子项目;以树状方式维护节点方数据的增、删、改、查;通过监听可以获取相应消息事件;<br />节点类型:</p>
<ul><li>持久节点:一直存储在服务器上(0)</li><li>临时节点:会话失效、节点自动清理(FlagEphemeral)</li><li>顺序节点:节点创建时自动分配序列号(FlagSequence)</li></ul>
<p class="maodian"><a name="_label1"></a></p><h2>二.启动zookeeper</h2>
<p style="text-align:center"><img alt="" src="https://img.jbzj.com/file_images/article/202511/2025112109481128.jpg" /></p>
<p class="maodian"><a name="_label2"></a></p><h2>三.核心包</h2>
<p>go-zookeeper</p>
<div class="jb51code"><pre class="brush:go;">go get github.com/samuel/go-zookeeper/zk
文档地址: http://godoc.org/github.com/samuel/go-zookeeper/zk
</pre></div>
<p class="maodian"><a name="_label3"></a></p><h2>四.Golang实现Zookeeper核心功能</h2>
<p class="maodian"><a name="_lab2_3_1"></a></p><h3>4.1 建立连接</h3>
<div class="jb51code"><pre class="brush:go;">func GetConnect(zkList []string) (conn *zk.Conn) {
// 创建监听的option,用于初始化zk
conn, _, err := zk.Connect(zkList, 10*time.Second)
if err != nil {
fmt.Println(err)
}
return
}
</pre></div>
<p class="maodian"><a name="_lab2_3_2"></a></p><h3>4.2创建节点</h3>
<div class="jb51code"><pre class="brush:go;">func Create(conn *zk.Conn, path string, data []byte, flags int32, acl int32) (val string, err error) {
//flags有4种取值:
//0:永久,除非手动删除
//zk.FlagEphemeral = 1:短暂,session断开则改节点也被删除
//zk.FlagSequence= 2:会自动在节点后面添加序号
//3:Ephemeral和Sequence,即,短暂且自动添加序号
// 获取访问控制权限
var acls []zk.ACL
if acl == 0 {
/**
PermRead = 1 << iota 1
PermWrite 2
PermCreate 4
PermDelete 8
PermAdmin 16
PermAll = 0x1f 31
*/
acls = zk.WorldACL(zk.PermAll)
} else {
acls = zk.WorldACL(acl)
}
val, err = conn.Create(path, data, flags, acls)
return
}
var (
zkList = []string{"127.0.0.1:2181"}
path = "/root"
)
func TestCreate(t *testing.T) {
conn := GetConnect(zkList)
defer conn.Close()
//创建节点
val, err := Create(conn, path, []byte("root value"), 0, 31)
if err != nil {
fmt.Printf("创建失败: %v\n", err)
return
}
fmt.Printf("创建: %s 成功", val)
}
</pre></div>
<p class="maodian"><a name="_lab2_3_3"></a></p><h3>4.3查询节点</h3>
<div class="jb51code"><pre class="brush:go;">// 查
func Get(conn *zk.Conn, path string) (dataStr string, stat *zk.Stat, err error) {
data, stat, err := conn.Get(path)
if err != nil {
return "", nil, err
}
return string(data), stat, err
}
func TestGet(t *testing.T) {
conn := GetConnect(zkList)
defer conn.Close()
//查询节点
val, _, err := Get(conn, path)
if err != nil {
fmt.Printf("查询%s失败, err: %v\n", path, err)
return
}
fmt.Printf("%s 的值为 %s\n", path, val)
}
</pre></div>
<p class="maodian"><a name="_lab2_3_4"></a></p><h3>4.4 节点是否存在</h3>
<div class="jb51code"><pre class="brush:go;">func Exists(conn *zk.Conn, path string) (exist bool, err error) {
exist, _, err = conn.Exists(path)
return
}
func TestExist(t *testing.T) {
conn := GetConnect(zkList)
defer conn.Close()
//是否存在
val, err := Exists(conn, path)
if err != nil {
fmt.Printf("查询%s失败, err: %v\n", path, err)
return
}
if val {
fmt.Printf("%s 存在\n", path)
} else {
fmt.Printf("%s 不存在\n", path)
}
}
</pre></div>
<p class="maodian"><a name="_lab2_3_5"></a></p><h3>4.5删除节点</h3>
<div class="jb51code"><pre class="brush:go;">//删除 cas支持
func Del(conn *zk.Conn, path string) (err error) {
_, sate, _ := Get(conn, path)
fmt.Println(sate)
err = conn.Delete(path, sate.Version)
return err
}
func TestDel(t *testing.T) {
conn := GetConnect(zkList)
defer conn.Close()
// 删除
err := Del(conn, path)
if err != nil {
fmt.Printf("数据删除失败: %v\n", err)
return
}
fmt.Println("数据删除成功")
}
</pre></div>
<p class="maodian"><a name="_lab2_3_6"></a></p><h3>4.6 修改节点内容</h3>
<div class="jb51code"><pre class="brush:go;">// 改 CAS支持
// 可以通过此种方式保证原子性
func Modify(conn *zk.Conn, path string, newData []byte) (sate *zk.Stat, err error) {
_, sate, _ = conn.Get(path)
fmt.Println(sate)
sate, err = conn.Set(path, newData, sate.Version)
return
}
func TestModify(t *testing.T) {
conn := GetConnect(zkList)
defer conn.Close()
newData := []byte("hello zookeeper")
stat, err := Modify(conn, path, newData)
if err != nil {
fmt.Printf("数据修改失败: %v\n", err)
return
}
fmt.Printf("数据修改成功,stat %v\n", stat)
}
</pre></div>
<p class="maodian"><a name="_lab2_3_7"></a></p><h3>4.7获取目录信息</h3>
<div class="jb51code"><pre class="brush:go;">func Children(conn *zk.Conn, path string) (data []string, err error) {
data, _, err = conn.Children(path)
return
}
func TestChildren(t *testing.T) {
conn := GetConnect(zkList)
defer conn.Close()
data, err := Children(conn, "/")
if err != nil {
fmt.Printf("获取数据失败: %v\n", err)
return
}
fmt.Printf("获取数据成功,data %v\n", data)
}
</pre></div>
<p class="maodian"><a name="_label4"></a></p><h2>五.watch</h2>
<div class="jb51code"><pre class="brush:go;">func callback(event zk.Event) {
fmt.Println(">>>>>>>>>>>>>>>>>>>")
fmt.Println("path:", event.Path)
fmt.Println("type:", event.Type.String())
fmt.Println("state:", event.State.String())
fmt.Println("<<<<<<<<<<<<<<<<<<<")
}
func ZKOperateWatchTest() {
fmt.Printf("ZKOperateWatchTest\n")
option := zk.WithEventCallback(callback)
var hosts = []string{"localhost:2181"}
conn, _, err := zk.Connect(hosts, time.Second*5, option)
if err != nil {
fmt.Println(err)
return
}
defer conn.Close()
var path1 = "/zk_test_go1"
var data1 = []byte("zk_test_go1_data1")
exist, s, _, err := conn.ExistsW(path1)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("path[%s] exist[%t]\n", path1, exist)
fmt.Printf("state:\n")
fmt.Printf("%s\n", ZkStateStringFormat(s))
// try create
var acls = zk.WorldACL(zk.PermAll)
p, err_create := conn.Create(path1, data1, zk.FlagEphemeral, acls)
if err_create != nil {
fmt.Println(err_create)
return
}
fmt.Printf("created path[%s]\n", p)
time.Sleep(time.Second * 2)
exist, s, _, err = conn.ExistsW(path1)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("path[%s] exist[%t] after create\n", path1, exist)
fmt.Printf("state:\n")
fmt.Printf("%s\n", ZkStateStringFormat(s))
// delete
conn.Delete(path1, s.Version)
exist, s, _, err = conn.ExistsW(path1)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("path[%s] exist[%t] after delete\n", path1, exist)
fmt.Printf("state:\n")
fmt.Printf("%s\n", ZkStateStringFormat(s))
}
</pre></div>
<p>不止会打印watch监听的节点信息,还有打印session会话的状态</p>
頁:
[1]