陈耀良 發表於 2022-1-24 08:03:00

聊一聊如何用C#轻松完成一个SAGA分布式事务

<h2 id="背景">背景</h2>
<p>银行跨行转账业务是一个典型分布式事务场景,假设 A 需要跨行转账给B,那么就涉及两个银行的数据,无法通过一个数据库的本地事务保证转账的 ACID ,只能够通过分布式事务来解决。</p>
<p>市面上使用比较多的分布式事务框架,支持 SAGA 的,大部分都是 JAVA 为主的,没有提供 C# 的对接方式,或者是对接难度大,一定程度上让人望而却步。</p>
<p>这里推荐一下叶东富大佬的分布式事务框架 dtm,一款跨语言的开源分布式事务管理器,优雅的解决了幂等、空补偿、悬挂等分布式事务难题。提供了简单易用、高性能、易水平扩展的分布式事务解决方案。</p>
<p>老黄在搜索相关分布式事务资料的时候,他写的文章都是相对比较好理解的,也就是这样关注到了 dtm 这个项目。</p>
<p>下面就基于这个框架来实践一下银行转账的例子。</p>
<p>前置工作</p>
<pre><code>dotnet add package Dtmcli --version 0.3.0
</code></pre>
<h2 id="成功的-saga">成功的 SAGA</h2>
<p>先来看一下一个成功完成的 SAGA 时序图。</p>
<p><img src="https://img2022.cnblogs.com/blog/558945/202201/558945-20220122113130946-312924907.png" alt="" loading="lazy"></p>
<p>上图的微服务1,对应我们示例的 OutApi,也就是转钱出去的那个服务。</p>
<p>微服务2,对应我们示例的 InApi,也就是转钱进来的那个服务。</p>
<p>下面是两个服务的正向操作和补偿操作的处理。</p>
<p>OutApi</p>
<pre><code class="language-c#">app.MapPost("/api/TransOut", (string branch_id, string gid, string op, TransRequest req) =&gt;
{
    // 进行 数据库操作
    Console.WriteLine($"用户【{req.UserId}】转出【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

app.MapPost("/api/TransOutCompensate", (string branch_id, string gid, string op, TransRequest req) =&gt;
{
    // 进行 数据库操作
    Console.WriteLine($"用户【{req.UserId}】转出【{req.Amount}】补偿操作,gid={gid}, branch_id={branch_id}, op={op}");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});
</code></pre>
<p>InApi</p>
<pre><code class="language-c#">app.MapPost("/api/TransIn", (string branch_id, string gid, string op, TransRequest req) =&gt;
{
    Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

app.MapPost("/api/TransInCompensate", (string branch_id, string gid, string op, TransRequest req) =&gt;
{
    Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】补偿操作,gid={gid}, branch_id={branch_id}, op={op}");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});
</code></pre>
<blockquote>
<p>注:示例为了简单,没有进行实际的数据库操作。</p>
</blockquote>
<p>到此各个子事务的处理已经 OK 了,然后是开启 SAGA 事务,进行分支调用</p>
<pre><code class="language-cs">var userOutReq = new TransRequest() { UserId = "1", Amount = -30 };
var userInReq = new TransRequest() { UserId = "2", Amount = 30 };

var ct = new CancellationToken();
var gid = await dtmClient.GenGid(ct);
var saga = new Saga(dtmClient, gid)
    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
    .Add(inApi + "/TransIn", inApi + "/TransInCompensate", userInReq)
    ;

var flag = await saga.Submit(ct);

Console.WriteLine($"case1, {gid} saga 提交结果 = {flag}");
</code></pre>
<p>到这里,一个完整的 SAGA 分布式事务就编写完成了。</p>
<p>搭建好 dtm 的环境后,运行上面的例子,会看到下面的输出。</p>
<p><img src="https://img2022.cnblogs.com/blog/558945/202201/558945-20220122113147353-593841516.png" alt="" loading="lazy"></p>
<p>当然,上面的情况太理想了,转出转入都是一次性就成功了。</p>
<p>但是实际上我们会遇到许许多多的问题,最常见的应该就是网络故障了。</p>
<p>下面来看一个异常的 SAGA 示例</p>
<h2 id="异常的-saga">异常的 SAGA</h2>
<p>做一个假设,用户1的转出是正常的,但是用户2在转入的时候出现了问题。</p>
<p>由于事务已经提交给 dtm 了,按照 SAGA 事务的协议,dtm 会重试未完成的操作。</p>
<p>这个时候用户2 这边会出现什么样的情况呢?</p>
<ol>
<li>转入其实成功了,但是 dtm 收到错误 (网络故障等)</li>
<li>转入没有成功,直接告诉 dtm 失败了 (应用异常等)</li>
</ol>
<p>无论是那一种,dtm 都会进行重试操作。这个时候会发生什么呢?我们继续往下看。</p>
<p>先看一下事务失败交互的时序图</p>
<p><img src="https://img2022.cnblogs.com/blog/558945/202201/558945-20220122113158552-1511687700.png" alt="" loading="lazy"></p>
<p>再通过调整上面成功的例子,来比较直观的看看出现的情况。</p>
<p>在 InApi 加多一个转入失败的处理接口</p>
<pre><code class="language-cs">app.MapPost("/api/TransInError", (string branch_id, string gid, string op, TransRequest req) =&gt;
{
    Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】正向操作--失败,gid={gid}, branch_id={branch_id}, op={op}");

    //return Results.BadRequest();
    return Results.Ok(TransResponse.BuildFailureResponse());
});
</code></pre>
<p>失败的返回有两种,一种是状态码大于 400,一种是状态码是 200 并且响应体包含 FAILURE,上面的例子是第二种</p>
<p>调整一下调用方,把转入正向操作替换成上面这个返回错误的接口。</p>
<pre><code class="language-cs">var saga = new Saga(dtmClient, gid)
    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
    .Add(inApi + "/TransInError", inApi + "/TransInCompensate", userInReq);
</code></pre>
<p>运行结果如下:</p>
<p><img src="https://img2022.cnblogs.com/blog/558945/202201/558945-20220122113213257-320616258.png" alt="" loading="lazy"></p>
<p>在这个例子中,只考虑补偿/重试成功的情况下。</p>
<p>用户1 转出的 30 块钱最终是回到了他的帐号上,他没有出现损失。</p>
<p>用户2 就有点苦逼了,转入没有成功,返回了失败,还触发了转入的补偿机制,结果就是把用户2 还没进帐的 30 块钱给多扣了,这个就是上面的情况2,常见的空补偿问题。</p>
<p>这个时候就要在进行转入补偿的时候做一系列的判断,转入有没有成功,转出有没有失败等等,把业务变的十分复杂。</p>
<p>如果出现了上述的情况1,会发生什么呢?</p>
<p>用户2 第一次已经成功转入 30 块钱,返回的也是成功,但是网络出了点问题,导致 dtm 认为失败了,它就会进行重试,相当于用户2还会收到第二个转入 30 块钱的请求!也就是说这次转帐,用户2 会进账 60 块钱,翻倍了,也就是说这个请求不是幂等。</p>
<p>同样的,要处理这个问题,在进行转入的正向操作中也要进行一系列的判断,同样会把复杂度上升一个级别。</p>
<p>前面有提到 dtm 提供了子事务屏障的功能,保证了幂等、空补偿等常见问题。</p>
<p><img src="https://img2022.cnblogs.com/blog/558945/202201/558945-20220122113226683-229896877.png" alt="" loading="lazy"></p>
<p>再来看看这个子事务屏障的功能有没有帮我们简化上面异常处理。</p>
<h2 id="子事务屏障">子事务屏障</h2>
<p>子事务屏障,需要根据 <strong>trans_type</strong>,<strong>gid</strong>,<strong>branch_id</strong> 和 <strong>op</strong> 四个内容进行创建。</p>
<p>这4个内容 dtm 在回调时会放在 querysting 上面。</p>
<p>客户端里面提供了 <strong>IBranchBarrierFactory</strong> 来供我们使用。</p>
<h3 id="空补偿">空补偿</h3>
<p>针对上面的异常情况(用户2 凭空消失 30 块钱),对转入的补偿进行子事务屏障的改造。</p>
<pre><code class="language-cs">app.MapPost("/api/BarrierTransInCompensate", async (string branch_id, string gid, string op, string trans_type, TransRequest req, IBranchBarrierFactory factory) =&gt;
{
    var barrier = factory.CreateBranchBarrier(trans_type, gid, branch_id, op);

    using var db = Db.GeConn();
    await barrier.Call(db, async (tx) =&gt;
    {
      // 转入失败的情况下,不应该输出下面这个
      Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】补偿操作,gid={gid}, branch_id={branch_id}, op={op}");
      // tx 参数是事务,可和本地事务一起提交回滚
      await Task.CompletedTask;
    });

    Console.WriteLine($"子事务屏障-补偿操作,gid={gid}, branch_id={branch_id}, op={op}");
    return Results.Ok(TransResponse.BuildSucceedResponse());
});
</code></pre>
<p><strong>Call</strong> 方法就是关键所在了,需要传入一个 DbConnection 和真正的业务操作,这里的业务操作就是在控制台输出补偿操作的信息。</p>
<p>同样的,我们再调整一下调用方,把转入补偿操作替换成上面带子事务屏障的接口。</p>
<pre><code class="language-cs">var saga = new Saga(dtmClient, gid)
    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
    .Add(inApi + "/TransInError", inApi + "/BarrierTransInCompensate", userInReq)
    ;
</code></pre>
<p>再来运行这个例子。</p>
<p><img src="https://img2022.cnblogs.com/blog/558945/202201/558945-20220122113241788-1732829651.png" alt="" loading="lazy"></p>
<p>会发现转入的补偿操作并没执行,控制台没有输出补偿信息,而是输出了</p>
<pre><code>Will not exec busiCall, isNullCompensation=True, isDuplicateOrPend=False
</code></pre>
<p>这个就表明了,这个请求是个空补偿,是不应该执行业务方法的,既空操作。</p>
<p>再来看一下,转入成功的,但是 dtm 收到了失败的信号,不断重试造成重复请求的情况。</p>
<h3 id="幂等">幂等</h3>
<p>针对用户2 转入两次 30 块钱的异常情况,对转入的正向操作进行子事务屏障的改造。</p>
<pre><code class="language-cs">app.MapPost("/api/BarrierTransIn", async (string branch_id, string gid, string op, string trans_type, TransRequest req, IBranchBarrierFactory factory) =&gt;
{
    Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】请求来了!!! gid={gid}, branch_id={branch_id}, op={op}");

    var barrier = factory.CreateBranchBarrier(trans_type, gid, branch_id, op);

    using var db = Db.GeConn();
    await barrier.Call(db, async (tx) =&gt;
    {
      var c = Interlocked.Increment(ref _errCount);

      // 模拟一个超时执行
      if (c &gt; 0 &amp;&amp; c &lt; 2) await Task.Delay(10000);

      Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}");
      await Task.CompletedTask;
    });

    return Results.Ok(TransResponse.BuildSucceedResponse());
});
</code></pre>
<p>这里通过一个超时执行来让 dtm 进行转入正向操作的重试。</p>
<p>同样的,我们再调整一下调用方,把转入的正向操作也替换成上面带子事务屏障的接口。</p>
<pre><code class="language-cs">var saga = new Saga(dtmClient, gid)
    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
    .Add(inApi + "/BarrierTransIn", inApi + "/BarrierTransInCompensate", userInReq)
    ;
</code></pre>
<p>再来运行这个例子。</p>
<p><img src="https://img2022.cnblogs.com/blog/558945/202201/558945-20220122113315447-1502875629.png" alt="" loading="lazy"></p>
<p>可以看到转入的正向操作确实是触发了多次,第一次实际上是成功,只是响应比较慢,导致 dtm 认为是失败了,触发了第二次请求,但是第二次请求并没有执行业务操作,而是输出了</p>
<pre><code>Will not exec busiCall, isNullCompensation=False, isDuplicateOrPend=True
</code></pre>
<p>这个就表明了,这个请求是个重复请求,是不应该执行业务方法的,保证了幂等。</p>
<p>到这里,可以看出,子事务屏障确实解决了幂等和空补偿的问题,<strong>大大降低了业务判断的复杂度和出错的可能性</strong>。</p>
<h2 id="写在最后">写在最后</h2>
<p>在这篇文章里,也通过几个例子,完整给出了编写一个 SAGA 事务的过程,涵盖了正常成功完成,异常情况,以及成功回滚的情况。希望对研究分布式事务的您有所帮助。</p>
<p>本文示例代码: DtmSagaSample</p>
<p>参考资料</p>
<ul>
<li>用Go轻松完成一个SAGA分布式事务,保姆级教程</li>
<li>线上的分布式事务是什么样的?以python的saga为例</li>
<li>dtm-labs/dtmcli-csharp</li>
</ul>


</div>
<div id="MySignature" role="contentinfo">
    <div style="text-align: center;">
    <img src="https://images.cnblogs.com/cnblogs_com/catcher1994/1933755/o_210220013029ewm2.jpg" />
</div>
<div class="signclass">
    <div>
      如果您认为这篇文章还不错或者有所收获,可以点击右下角的<strong style="color: red">【推荐】</strong>按钮,因为你的支持是我继续写作,分享的最大动力!
    </div>
    <div>
      <div>作者:Catcher Wong ( 黄文清 )</div>
      <div>来源:http://catcher1994.cnblogs.com/</div>
      <div>声明:
            本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。如果您发现博客中出现了错误,或者有更好的建议、想法,请及时与我联系!!如果想找我私下交流,可以私信或者加我微信。
      </div>
    </div>
</div><br><br>
来源:https://www.cnblogs.com/catcher1994/p/csharp-dtm-saga.html
頁: [1]
查看完整版本: 聊一聊如何用C#轻松完成一个SAGA分布式事务