go使用go-redis操作redis 连接类型,pipeline, 发布订阅
<p>内容:</p><p>一 . 客户端Client(普通模式,主从模式,哨兵模式)<br>二. conn连接(连接, pipeline, 发布订阅等)<br>三. 示例程序(连接, pipeline, 发布订阅等)<br>客户端<br>Client 普通模式的客户端<br>go redis依据用途提供了多种客户端创建的函数, 如下:</p>
<p>func NewClient(opt *Options) *Client<br>func NewFailoverClient(failoverOpt *FailoverOptions) *Client<br>func (c *Client) Context() context.Context<br>func (c *Client) Do(args ...interface{}) *Cmd<br>func (c *Client) DoContext(ctx context.Context, args ...interface{}) *Cmd<br>func (c *Client) Options() *Options<br>func (c *Client) PSubscribe(channels ...string) *PubSub<br>func (c *Client) Pipeline() Pipeliner<br>func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error)<br>func (c *Client) PoolStats() *PoolStats<br>func (c *Client) Process(cmd Cmder) error<br>func (c *Client) ProcessContext(ctx context.Context, cmd Cmder) error<br>func (c *Client) SetLimiter(l Limiter) *Client<br>func (c *Client) Subscribe(channels ...string) *PubSub<br>func (c *Client) TxPipeline() Pipeliner<br>func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error)<br>func (c *Client) Watch(fn func(*Tx) error, keys ...string) error<br>func (c *Client) WithContext(ctx context.Context) *Client<br>1<br>2<br>3<br>4<br>5<br>6<br>7<br>8<br>9<br>10<br>11<br>12<br>13<br>14<br>15<br>16<br>17<br>18<br>NewClient 创建一个普通连接</p>
<p>NewFailoverClient 具有故障检测以及故障转移的client</p>
<p>PSubscribe / Subscribe 发布订阅模式的client</p>
<p>Pipeline 启用pipeline管道模式的client</p>
<p>PoolStats 连接池状态</p>
<p>Close 关闭连接</p>
<p>集群模式的ClusterClient<br>func NewClusterClient(opt *ClusterOptions) *ClusterClient<br>func (c *ClusterClient) Close() error<br>func (c *ClusterClient) Context() context.Context<br>func (c *ClusterClient) DBSize() *IntCmd<br>func (c *ClusterClient) Do(args ...interface{}) *Cmd<br>func (c *ClusterClient) DoContext(ctx context.Context, args ...interface{}) *Cmd<br>func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error<br>func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error<br>func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error<br>func (c *ClusterClient) Options() *ClusterOptions<br>func (c *ClusterClient) PSubscribe(channels ...string) *PubSub<br>func (c *ClusterClient) Pipeline() Pipeliner<br>func (c *ClusterClient) Pipelined(fn func(Pipeliner) error) ([]Cmder, error)<br>func (c *ClusterClient) PoolStats() *PoolStats<br>func (c *ClusterClient) Process(cmd Cmder) error<br>func (c *ClusterClient) ProcessContext(ctx context.Context, cmd Cmder) error<br>func (c *ClusterClient) ReloadState() error<br>func (c *ClusterClient) Subscribe(channels ...string) *PubSub<br>func (c *ClusterClient) TxPipeline() Pipeliner<br>func (c *ClusterClient) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error)<br>func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error<br>func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient<br>1<br>2<br>3<br>4<br>5<br>6<br>7<br>8<br>9<br>10<br>11<br>12<br>13<br>14<br>15<br>16<br>17<br>18<br>19<br>20<br>21<br>22<br>方法与client大致相同</p>
<p>哨兵SentinelClient<br>func NewSentinelClient(opt *Options) *SentinelClient<br>func (c *SentinelClient) CkQuorum(name string) *StringCmd<br>func (c SentinelClient) Close() error<br>func (c *SentinelClient) Context() context.Context<br>func (c *SentinelClient) Failover(name string) *StatusCmd<br>func (c *SentinelClient) FlushConfig() *StatusCmd<br>func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd<br>func (c *SentinelClient) Master(name string) *StringStringMapCmd<br>func (c *SentinelClient) Masters() *SliceCmd<br>func (c *SentinelClient) Monitor(name, ip, port, quorum string) *StringCmd<br>func (c *SentinelClient) PSubscribe(channels ...string) *PubSub<br>func (c *SentinelClient) Ping() *StringCmd<br>func (c *SentinelClient) Process(cmd Cmder) error<br>func (c *SentinelClient) ProcessContext(ctx context.Context, cmd Cmder) error<br>func (c *SentinelClient) Remove(name string) *StringCmd<br>func (c *SentinelClient) Reset(pattern string) *IntCmd<br>func (c *SentinelClient) Sentinels(name string) *SliceCmd<br>func (c *SentinelClient) Set(name, option, value string) *StringCmd<br>func (c *SentinelClient) Slaves(name string) *SliceCmd<br>func (c SentinelClient) String() string<br>func (c *SentinelClient) Subscribe(channels ...string) *PubSub<br>func (c *SentinelClient) WithContext(ctx context.Context) *SentinelClient<br>1<br>2<br>3<br>4<br>5<br>6<br>7<br>8<br>9<br>10<br>11<br>12<br>13<br>14<br>15<br>16<br>17<br>18<br>19<br>20<br>21<br>22<br>连接:<br>创建完客户端之后,需要与redis建立连接才能够发送请求进行使用</p>
<p>在连接层, 提供了很多封装后的函数</p>
<p>普通连接Conn<br>例如:</p>
<p>涉及很多函数, 函数名称基本可以通过redis的命令进行类比找到,不一一列举</p>
<p>func (c Conn) Append(key, value string) *IntCmd<br>func (c Conn) Auth(password string) *StatusCmd<br>func (c Conn) BLPop(timeout time.Duration, keys ...string) *StringSliceCmd<br>func (c Conn) BRPop(timeout time.Duration, keys ...string) *StringSliceCmd<br>func (c Conn) BRPopLPush(source, destination string, timeout time.Duration) *StringCmd<br>func (c Conn) BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd<br>func (c Conn) BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd<br>func (c Conn) BgRewriteAOF() *StatusCmd<br>func (c Conn) BgSave() *StatusCmd<br>func (c Conn) BitCount(key string, bitCount *BitCount) *IntCmd<br>func (c Conn) BitField(key string, args ...interface{}) *IntSliceCmd<br>func (c Conn) BitOpAnd(destKey string, keys ...string) *IntCmd</p>
<p>....</p>
<p>func (c Conn) HKeys(key string) *StringSliceCmd<br>func (c Conn) HLen(key string) *IntCmd<br>func (c Conn) HMGet(key string, fields ...string) *SliceCmd<br>func (c Conn) HMSet(key string, fields map[string]interface{}) *StatusCmd<br>func (c Conn) HScan(key string, cursor uint64, match string, count int64) *ScanCmd<br>func (c Conn) HSet(key, field string, value interface{}) *BoolCmd<br>func (c Conn) HSetNX(key, field string, value interface{}) *BoolCmd<br>func (c Conn) HVals(key string) *StringSliceCmd<br>1<br>2<br>3<br>4<br>5<br>6<br>7<br>8<br>9<br>10<br>11<br>12<br>13<br>14<br>15<br>16<br>17<br>18<br>19<br>20<br>21<br>22<br>23<br>Pipeline<br>pipeline 管道批量执行命令,可以节约带宽</p>
<p>提供的方法与conn基本一致</p>
<p>func (c Pipeline) Append(key, value string) *IntCmd<br>func (c Pipeline) Auth(password string) *StatusCmd<br>func (c Pipeline) BLPop(timeout time.Duration, keys ...string) *StringSliceCmd<br>func (c Pipeline) BRPop(timeout time.Duration, keys ...string) *StringSliceCmd<br>func (c Pipeline) BRPopLPush(source, destination string, timeout time.Duration) *StringCmd<br>func (c Pipeline) BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd<br>func (c Pipeline) BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd<br>func (c Pipeline) BgRewriteAOF() *StatusCmd</p>
<p>.....</p>
<p><br>1<br>2<br>3<br>4<br>5<br>6<br>7<br>8<br>9<br>10<br>11<br>12<br>PubSub<br>发布订阅模式</p>
<p>func (c *PubSub) Channel() <-chan *Message<br>func (c *PubSub) ChannelSize(size int) <-chan *Message<br>func (c *PubSub) ChannelWithSubscriptions(size int) <-chan interface{}<br>func (c *PubSub) Close() error<br>func (c *PubSub) PSubscribe(patterns ...string) error<br>func (c *PubSub) PUnsubscribe(patterns ...string) error<br>func (c *PubSub) Ping(payload ...string) error<br>func (c *PubSub) Receive() (interface{}, error)<br>func (c *PubSub) ReceiveMessage() (*Message, error)<br>func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error)<br>func (c *PubSub) String() string<br>func (c *PubSub) Subscribe(channels ...string) error<br>func (c *PubSub) Unsubscribe(channels ...string) error<br>1<br>2<br>3<br>4<br>5<br>6<br>7<br>8<br>9<br>10<br>11<br>12<br>13<br>可以建立channel Channel()</p>
<p>设置超时时间ReceiveTimeout</p>
<p>Subscribe订阅</p>
<p>Unsubscribe取消订阅</p>
<p>PSubscribe 按模式(pattern)匹配订阅频道等</p>
<p>排他 TX<br>func (c Tx) Get(key string) *StringCmd<br>func (c Tx) GetBit(key string, offset int64) *IntCmd<br>func (c Tx) GetRange(key string, start, end int64) *StringCmd<br>func (c Tx) GetSet(key string, value interface{}) *StringCmd<br>func (c Tx) HDel(key string, fields ...string) *IntCmd<br>func (c Tx) HExists(key, field string) *BoolCmd<br>func (c Tx) HGet(key, field string) *StringCmd<br>func (c Tx) HGetAll(key string) *StringStringMapCmd<br>func (c Tx) HIncrBy(key, field string, incr int64) *IntCmd<br>func (c Tx) HIncrByFloat(key, field string, incr float64) *FloatCmd<br>func (c Tx) HKeys(key string) *StringSliceCmd<br>1<br>2<br>3<br>4<br>5<br>6<br>7<br>8<br>9<br>10<br>11<br>在conn的基础上加入排他性功能</p>
<p>提供的函数及方法与conn基本相同, 可以参考conn进行使用</p>
<p>示例程序<br>示例:</p>
<p>func ExampleClient() {<br> redisdb := redis.NewClient(&redis.Options{<br> Addr: "192.168.137.18:6379",<br> Password: "", // no password set<br> DB: 0,// use default DB<br> })</p>
<p> err := redisdb.Set("key", "value", 0).Err()<br> if err != nil {<br> panic(err)<br> }</p>
<p> val, err := redisdb.Get("key").Result()<br> if err != nil {<br> panic(err)<br> }<br> fmt.Println("key", val)</p>
<p> val2, err := redisdb.Get("missing_key").Result()<br> if err == redis.Nil {<br> fmt.Println("missing_key does not exist")<br> } else if err != nil {<br> panic(err)<br> } else {<br> fmt.Println("missing_key", val2)<br> }</p>
<p>}</p>
<p>func main() {<br> ExampleClient()<br>}<br>1<br>2<br>3<br>4<br>5<br>6<br>7<br>8<br>9<br>10<br>11<br>12<br>13<br>14<br>15<br>16<br>17<br>18<br>19<br>20<br>21<br>22<br>23<br>24<br>25<br>26<br>27<br>28<br>29<br>30<br>31<br>32<br>输出:</p>
<p>key value<br>missing_key does not exist<br>1<br>2<br>NewClient创建连接:<br>redisdb := redis.NewClient(&redis.Options{<br> Addr: "localhost:6379", // use default Addr<br> Password: "", // no password set<br> DB: 0, // use default DB<br>})</p>
<p>pong, err := redisdb.Ping().Result()<br>fmt.Println(pong, err)<br>1<br>2<br>3<br>4<br>5<br>6<br>7<br>8<br>如果使用tls/ssl 则在Options参数 TLSConfig *tls.Config 中指定</p>
<p>主从故障转移NewFailoverClient<br>redisdb := redis.NewFailoverClient(&redis.FailoverOptions{<br> MasterName: "master",<br> SentinelAddrs: []string{":26379"},<br>})<br>redisdb.Ping()<br>1<br>2<br>3<br>4<br>5<br>集群NewClusterClient<br>redisdb := redis.NewClusterClient(&redis.ClusterOptions{<br> Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},<br>})<br>redisdb.Ping()<br>1<br>2<br>3<br>4<br>url样式建立连接 ParseURL<br>opt, err := redis.ParseURL("redis://:qwerty@localhost:6379/1")<br>if err != nil {<br> panic(err)<br>}<br>fmt.Println("addr is", opt.Addr)<br>fmt.Println("db is", opt.DB)<br>fmt.Println("password is", opt.Password)</p>
<p>// Create client as usual.<br>_ = redis.NewClient(opt)<br>1<br>2<br>3<br>4<br>5<br>6<br>7<br>8<br>9<br>10<br>pipeline<br>pipe := redisdb.Pipeline()</p>
<p>incr := pipe.Incr("pipeline_counter")<br>pipe.Expire("pipeline_counter", time.Hour)</p>
<p>// Execute<br>//<br>// INCR pipeline_counter<br>// EXPIRE pipeline_counter 3600<br>//<br>// using one redisdb-server roundtrip.<br>_, err := pipe.Exec()<br>fmt.Println(incr.Val(), err)<br>1<br>2<br>3<br>4<br>5<br>6<br>7<br>8<br>9<br>10<br>11<br>12<br>13<br>Pipelined</p>
<p>var incr *redis.IntCmd<br>_, err := redisdb.Pipelined(func(pipe redis.Pipeliner) error {<br> incr = pipe.Incr("pipelined_counter")<br> pipe.Expire("pipelined_counter", time.Hour)<br> return nil<br>})<br>fmt.Println(incr.Val(), err)<br>1<br>2<br>3<br>4<br>5<br>6<br>7<br>排他TxPipeline</p>
<p>pipe := redisdb.TxPipeline()</p>
<p>incr := pipe.Incr("tx_pipeline_counter")<br>pipe.Expire("tx_pipeline_counter", time.Hour)</p>
<p>// Execute<br>//<br>// MULTI<br>// INCR tx_pipeline_counter<br>// EXPIRE tx_pipeline_counter 3600<br>// EXEC<br>//<br>// using one redisdb-server roundtrip.<br>_, err := pipe.Exec()<br>fmt.Println(incr.Val(), err)<br>1<br>2<br>3<br>4<br>5<br>6<br>7<br>8<br>9<br>10<br>11<br>12<br>13<br>14<br>15<br>rdb := redis.NewClient(&redis.Options{<br> Addr: ":6379",<br>})<br>rdb.AddHook(redisHook{})</p>
<p>rdb.Pipelined(func(pipe redis.Pipeliner) error {<br> pipe.Ping()<br> pipe.Ping()<br> return nil<br>})<br>1<br>2<br>3<br>4<br>5<br>6<br>7<br>8<br>9<br>10<br>发布订阅<br>sub := client.Subscribe(queryResp)<br>iface, err := sub.Receive()<br>if err != nil {<br> // handle error<br>}</p>
<p>// Should be *Subscription, but others are possible if other actions have been<br>// taken on sub since it was created.<br>switch iface.(type) {<br>case *Subscription:<br> // subscribe succeeded<br>case *Message:<br> // received first message<br>case *Pong:<br> // pong received<br>default:<br> // handle error<br>}</p>
<p>ch := sub.Channel()<br>1<br>2<br>3<br>4<br>5<br>6<br>7<br>8<br>9<br>10<br>11<br>12<br>13<br>14<br>15<br>16<br>17<br>18<br>19<br>20<br>示例程序</p>
<p>package main</p>
<p>import (<br> "github.com/go-redis/redis"<br> "time"<br> "fmt"<br>)</p>
<p><br>func ExampleClient() {<br> redisdb := redis.NewClient(&redis.Options{<br> Addr: "192.168.137.18:6379",<br> Password: "", // no password set<br> DB: 0,// use default DB<br> })<br> //rdb.AddHook()</p>
<p> pubsub := redisdb.Subscribe("mychannel1")</p>
<p> // Wait for confirmation that subscription is created before publishing anything.<br> _, err := pubsub.Receive()<br> if err != nil {<br> panic(err)<br> }</p>
<p> // Go channel which receives messages.<br> ch := pubsub.Channel()</p>
<p> // Publish a message.<br> err = redisdb.Publish("mychannel1", "hello").Err()<br> if err != nil {<br> panic(err)<br> }</p>
<p> time.AfterFunc(time.Second, func() {<br> // When pubsub is closed channel is closed too.<br> _ = pubsub.Close()<br> })</p>
<p> // Consume messages.<br> for msg := range ch {<br> fmt.Println(msg.Channel, msg.Payload)<br> }<br>}</p>
<p>func main() {<br> ExampleClient()<br>}<br>1<br>2<br>3<br>4<br>5<br>6<br>7<br>8<br>9<br>10<br>11<br>12<br>13<br>14<br>15<br>16<br>17<br>18<br>19<br>20<br>21<br>22<br>23<br>24<br>25<br>26<br>27<br>28<br>29<br>30<br>31<br>32<br>33<br>34<br>35<br>36<br>37<br>38<br>39<br>40<br>41<br>42<br>43<br>44<br>45<br>46<br>47<br>48<br>输出:</p>
<p>mychannel1 hello<br>1<br>示例2</p>
<p>func ExampleClient2() {<br> redisdb := redis.NewClient(&redis.Options{<br> Addr: "192.168.137.18:6379",<br> Password: "", // no password set<br> DB: 0,// use default DB<br> })<br> //rdb.AddHook()<br> pubsub := redisdb.Subscribe("mychannel2")<br> defer pubsub.Close()</p>
<p> for i := 0; i < 2; i++ {<br> // ReceiveTimeout is a low level API. Use ReceiveMessage instead.<br> msgi, err := pubsub.ReceiveTimeout(time.Second)<br> if err != nil {<br> break<br> }</p>
<p> switch msg := msgi.(type) {<br> case *redis.Subscription:<br> fmt.Println("subscribed to", msg.Channel)</p>
<p> _, err := redisdb.Publish("mychannel2", "hello").Result()<br> if err != nil {<br> panic(err)<br> }<br> case *redis.Message:<br> fmt.Println("received", msg.Payload, "from", msg.Channel)<br> default:<br> panic("unreached")<br> }<br> }</p>
<p>}<br>1<br>2<br>3<br>4<br>5<br>6<br>7<br>8<br>9<br>10<br>11<br>12<br>13<br>14<br>15<br>16<br>17<br>18<br>19<br>20<br>21<br>22<br>23<br>24<br>25<br>26<br>27<br>28<br>29<br>30<br>31<br>32<br>33<br>输出:</p>
<p>subscribed to mychannel2<br>received hello from mychannel2<br>————————————————<br>版权声明:本文为CSDN博主「comprel」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。<br>原文链接:https://blog.csdn.net/comprel/article/details/96716708</p><br><br>
来源:https://www.cnblogs.com/ExMan/p/11493192.html
頁:
[1]