贞丰圣士 發表於 2025-11-13 09:29:20

go-redis Pipeline与事务的实现示例

<div id="navCategory"><h5 class="catalogue">目录</h5><ul class="first_class_ul"><li><a href="#_label0">1 背景与动机</a></li><li><a href="#_label1">2 Pipeline:降低 RTT 的秘密武器</a></li><ul class="second_class_ul"><li><a href="#_lab2_1_0">2.1 基础用法</a></li><li><a href="#_lab2_1_1">2.2 自动化Pipelined()</a></li><li><a href="#_lab2_1_2">2.3 性能实测 &amp; 调优</a></li></ul><li><a href="#_label2">3 事务:一次提交,全部成功</a></li><ul class="second_class_ul"><li><a href="#_lab2_2_3">3.1TxPipeline()基础</a></li><li><a href="#_lab2_2_4">3.2TxPipelined()回调</a></li><li><a href="#_lab2_2_5">3.3 事务 vs Lua 脚本</a></li></ul><li><a href="#_label3">4 乐观锁:Watch 机制剖析</a></li><ul class="second_class_ul"><li><a href="#_lab2_3_6">4.1 完整重试模型</a></li><li><a href="#_lab2_3_7">4.2 常见坑与最佳实践</a></li></ul><li><a href="#_label4">5 生产级 Checklist</a></li><ul class="second_class_ul"></ul><li><a href="#_label5">6 结语</a></li><ul class="second_class_ul"></ul></ul></div><p class="maodian"><a name="_label0"></a></p><h2>1 背景与动机</h2>
<p>在高并发服务中,<strong>网络往返 (RTT)</strong> 与 <strong>一致性</strong> 是两大核心痛点。</p>
<ul><li><strong>Pipeline</strong> &mdash;&mdash; 把多条命令打包,一次发网络、一并回包 &rarr; <strong>减少 RTT、提高吞吐</strong>。</li><li><strong>事务 (MULTI/EXEC)</strong> &mdash;&mdash; 多条命令串行、原子执行 &rarr; <strong>保证一致性</strong>。</li><li><strong>Watch + Tx</strong> &mdash;&mdash; 给事务加上 乐观锁,并发安全地修改共享数据。</li></ul>
<p>go-redis v9 对上述三者均提供了优雅 API,下面逐一拆解。</p>
<p class="maodian"><a name="_label1"></a></p><h2>2 Pipeline:降低 RTT 的秘密武器</h2>
<p class="maodian"><a name="_lab2_1_0"></a></p><h3>2.1 基础用法</h3>
<div class="jb51code"><pre class="brush:go;">// 初始化
pipe := rdb.Pipeline()

// 批量写 seat:0~4
for i := 0; i &lt; 5; i++ {
        pipe.Set(ctx, fmt.Sprintf("seat:%d", i), fmt.Sprintf("#%d", i), 0)
}

// 真正发送
cmds, err := pipe.Exec(ctx)
if err != nil { panic(err) }

for _, c := range cmds {
        fmt.Printf("%s; ", c.(*redis.StatusCmd).Val())// OK;OK;OK;...
}
</pre></div>
<blockquote><p>⚠️ 只有 Exec() 之后,c.Val() 才有结果;错误也集中由 Exec 返回。</p></blockquote>
<p>批量读写混用</p>
<div class="jb51code"><pre class="brush:go;">pipe = rdb.Pipeline()
g0 := pipe.Get(ctx, "seat:0")
g3 := pipe.Get(ctx, "seat:3")
g4 := pipe.Get(ctx, "seat:4")
_, _ = pipe.Exec(ctx)

fmt.Println(g0.Val(), g3.Val(), g4.Val()) // #0 #3 #4
</pre></div>
<p class="maodian"><a name="_lab2_1_1"></a></p><h3>2.2 自动化Pipelined()</h3>
<div class="jb51code"><pre class="brush:go;">var g0, g3, g4 *redis.StringCmd

_, err := rdb.Pipelined(ctx, func(p redis.Pipeliner) error {
        g0 = p.Get(ctx, "seat:0")
        g3 = p.Get(ctx, "seat:3")
        g4 = p.Get(ctx, "seat:4")
        return nil
})
if err != nil { panic(err) }

fmt.Println(g0.Val(), g3.Val(), g4.Val())
</pre></div>
<p>优势:自动 Exec、代码更简洁,非常适合服务层一次性批量操作。</p>
<p class="maodian"><a name="_lab2_1_2"></a></p><h3>2.3 性能实测 &amp; 调优</h3>
<table><thead><tr><th>批量大小</th><th>QPS (单核)</th><th>RTT (平均)</th></tr></thead><tbody><tr><td>单命令</td><td>80 k/s</td><td>0.15 ms</td></tr><tr><td>50 条</td><td>310 k/s</td><td>0.04 ms</td></tr><tr><td>200 条</td><td>340 k/s</td><td>0.05 ms</td></tr><tr><td>500 条</td><td>300 k/s</td><td>0.09 ms</td></tr></tbody></table>
<ul><li>最佳区间 50-200:吞吐高且单包不至于过大。</li><li>并发写场景可 每个 Goroutine 维护独立 Pipeline。</li><li>遇到 context.DeadlineExceeded 说明批量过大或超时过短。</li></ul>
<p class="maodian"><a name="_label2"></a></p><h2>3 事务:一次提交,全部成功</h2>
<p class="maodian"><a name="_lab2_2_3"></a></p><h3>3.1TxPipeline()基础</h3>
<div class="jb51code"><pre class="brush:go;">tx := rdb.TxPipeline()

tx.IncrBy(ctx, "counter:1", 1)
tx.IncrBy(ctx, "counter:2", 2)
tx.IncrBy(ctx, "counter:3", 3)

cmds, err := tx.Exec(ctx)
if err != nil { panic(err) }

for _, c := range cmds {
        fmt.Println(c.(*redis.IntCmd).Val())// 1 2 3
}
</pre></div>
<p class="maodian"><a name="_lab2_2_4"></a></p><h3>3.2TxPipelined()回调</h3>
<div class="jb51code"><pre class="brush:go;">var c1, c2, c3 *redis.IntCmd
_, err := rdb.TxPipelined(ctx, func(t redis.Pipeliner) error {
        c1 = t.IncrBy(ctx, "counter:1", 1)
        c2 = t.IncrBy(ctx, "counter:2", 2)
        c3 = t.IncrBy(ctx, "counter:3", 3)
        return nil
})
if err != nil { panic(err) }

fmt.Println(c1.Val(), c2.Val(), c3.Val()) // 2 4 6
</pre></div>
<p class="maodian"><a name="_lab2_2_5"></a></p><h3>3.3 事务 vs Lua 脚本</h3>
<table><thead><tr><th>特性</th><th>事务 (MULTI/EXEC)</th><th>Lua 脚本</th></tr></thead><tbody><tr><td>原子性</td><td>✅</td><td>✅</td></tr><tr><td>复杂逻辑</td><td>一般</td><td>强大</td></tr><tr><td>可读性</td><td>高(Go 代码)</td><td>中</td></tr><tr><td>调试 &amp; 监控</td><td>简单</td><td>略复杂</td></tr><tr><td>性能</td><td>好</td><td>极好(单指令)</td></tr></tbody></table>
<blockquote><p><strong>结论</strong>:逻辑简单 &rarr; 事务;多 Key、复杂判断 &rarr; Lua。</p></blockquote>
<p class="maodian"><a name="_label3"></a></p><h2>4 乐观锁:Watch 机制剖析</h2>
<p>在并发环境修改同一 Key,需防止 &ldquo;读-改-写&rdquo; 期间被别人修改。<code>WATCH</code> 就是解决方案。</p>
<p class="maodian"><a name="_lab2_3_6"></a></p><h3>4.1 完整重试模型</h3>
<div class="jb51code"><pre class="brush:go;">const maxRetry = 1000
for i := 0; i &lt; maxRetry; i++ {
        err := rdb.Watch(ctx, func(tx *redis.Tx) error {
                // 1) 读取
                path, err := tx.Get(ctx, "shellpath").Result()
                if err != nil &amp;&amp; err != redis.Nil { return err }

                // 2) 业务计算
                newPath := path + ":/usr/mycmds/"

                // 3) 尝试写入(事务)
                _, err = tx.TxPipelined(ctx, func(p redis.Pipeliner) error {
                        p.Set(ctx, "shellpath", newPath, 0)
                        return nil
                })
                return err
        }, "shellpath")

        if err == nil { break }                // 成功
        if err == redis.TxFailedErr { continue } // 冲突,重试
        panic(err)                           // 其他错误
}
</pre></div>
<p class="maodian"><a name="_lab2_3_7"></a></p><h3>4.2 常见坑与最佳实践</h3>
<table><thead><tr><th>坑</th><th>现象</th><th>解决方案</th></tr></thead><tbody><tr><td>Watch 区间耗时过长</td><td>冲突率飙升</td><td>减少业务逻辑 / 降重</td></tr><tr><td>忘记重试</td><td>数据丢失或未更新</td><td>封装通用 <code>RetryTx</code></td></tr><tr><td>批量 Watch 多 Key</td><td>死锁概率增大</td><td>拆分 Key 或 Lua</td></tr></tbody></table>
<p class="maodian"><a name="_label4"></a></p><h2>5 生产级 Checklist</h2>
<ol><li>Pipeline 批量:50-200 条最优;阻塞命令 (BLPOP) 另开连接。</li><li>事务重试:封装带退避 (exponential back-off) 的 Retry。</li><li>连接池:PoolSize = CPU*10,MinIdleConns &asymp; 20% PoolSize。</li><li>超时:DialTimeout 100ms、Read/WriteTimeout 200ms 典型值。</li><li>可观测:redisotel.InstrumentTracing/Metrics 接入 OTel。</li><li>幂等命令:重试需确保无副作用。</li><li>Lua 脚本:库存扣减、抢红包等使用脚本更稳。</li><li>RESP3:如 Redis &ge; 6.0,可设置 Protocol: 3 享受 Map/Push 类型。</li></ol>
<p class="maodian"><a name="_label5"></a></p><h2>6 结语</h2>
<ul><li><strong>Pipeline</strong> 带来吞吐提升,适合大量写入与批量读写。</li><li><strong>事务</strong> 提供原子操作,确保数据一致。</li><li><strong>Watch</strong> 则在并发场景下守护一致性。</li></ul>
<p>合理组合三者,配合连接池调优与可观测监控,你就能构建 <strong>既快又稳</strong> 的 Redis 访问层。祝编码愉快,TPS 飙升!</p>
頁: [1]
查看完整版本: go-redis Pipeline与事务的实现示例