【RabbitMQ】与ASP.NET Core集成
<h2>本章目标</h2><ul>
<li>
<p class="ds-markdown-paragraph">掌握在ASP.NET Core中配置和依赖注入RabbitMQ服务。</p>
</li>
<li>
<p class="ds-markdown-paragraph">学习使用<code>IHostedService</code>/<code>BackgroundService</code>实现常驻消费者服务。</p>
</li>
<li>
<p class="ds-markdown-paragraph">实现基于RabbitMQ的请求-响应模式。</p>
</li>
<li>
<p class="ds-markdown-paragraph">构建完整的微服务间异步通信解决方案。</p>
</li>
<li>
<p class="ds-markdown-paragraph">学习配置管理和健康检查。</p>
</li>
</ul>
<hr>
<h2>一、理论部分</h2>
<h3>1. ASP.NET Core集成模式</h3>
<p class="ds-markdown-paragraph">将RabbitMQ集成到ASP.NET Core应用程序时,我们需要考虑几个关键方面:</p>
<ul>
<li>
<p class="ds-markdown-paragraph">依赖注入:正确管理连接和通道的生命周期。</p>
</li>
<li>
<p class="ds-markdown-paragraph">托管服务:实现后台消息消费者。</p>
</li>
<li>
<p class="ds-markdown-paragraph">配置管理:从配置文件读取RabbitMQ连接设置。</p>
</li>
<li>
<p class="ds-markdown-paragraph">健康检查:监控RabbitMQ连接状态。</p>
</li>
<li>
<p class="ds-markdown-paragraph">日志记录:使用ASP.NET Core的日志系统。</p>
</li>
</ul>
<h3>2. 生命周期管理</h3>
<ul>
<li>
<p class="ds-markdown-paragraph">IConnection:建议注册为单例,因为创建TCP连接开销大。</p>
</li>
<li>
<p class="ds-markdown-paragraph">IModel:建议注册为瞬态或作用域,因为通道不是线程安全的。</p>
</li>
<li>
<p class="ds-markdown-paragraph">生产者服务:可以注册为作用域或瞬态。</p>
</li>
<li>
<p class="ds-markdown-paragraph">消费者服务:通常在托管服务中管理。</p>
</li>
</ul>
<h3>3. 托管服务(Hosted Services)</h3>
<p class="ds-markdown-paragraph">ASP.NET Core提供了<code>IHostedService</code>接口和<code>BackgroundService</code>基类,用于实现长时间运行的后台任务。这是实现RabbitMQ消费者的理想方式。</p>
<h3>4. 微服务架构中的消息模式</h3>
<ul>
<li>
<p class="ds-markdown-paragraph">异步命令:发送指令但不期待立即响应。</p>
</li>
<li>
<p class="ds-markdown-paragraph">事件通知:广播状态变化。</p>
</li>
<li>
<p class="ds-markdown-paragraph">请求-响应:类似RPC,但通过消息中间件。</p>
</li>
</ul>
<hr>
<h2>二、实操部分:构建订单处理微服务</h2>
<p class="ds-markdown-paragraph">我们将创建一个完整的订单处理系统,包含:</p>
<ul>
<li>
<p class="ds-markdown-paragraph">Order.API:接收HTTP订单请求,发布消息</p>
</li>
<li>
<p class="ds-markdown-paragraph">OrderProcessor.BackgroundService:后台处理订单</p>
</li>
<li>
<p class="ds-markdown-paragraph">订单状态查询API</p>
</li>
<li>
<p class="ds-markdown-paragraph">健康检查</p>
</li>
<li>
<p class="ds-markdown-paragraph">配置管理</p>
</li>
</ul>
<h3>第1步:创建项目结构</h3>
<div class="md-code-block md-code-block-light">
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)"># 创建解决方案
dotnet new sln </span>-<span style="color: rgba(0, 0, 0, 1)">n OrderSystem
# 创建Web API项目
dotnet new webapi </span>-<span style="color: rgba(0, 0, 0, 1)">n Order.API
dotnet new classlib </span>-<span style="color: rgba(0, 0, 0, 1)">n Order.Core
dotnet new classlib </span>-<span style="color: rgba(0, 0, 0, 1)">n Order.Infrastructure
dotnet new classlib </span>-<span style="color: rgba(0, 0, 0, 1)">n OrderProcessor.Service
# 添加到解决方案
dotnet sln add Order.API</span>/<span style="color: rgba(0, 0, 0, 1)">Order.API.csproj
dotnet sln add Order.Core</span>/<span style="color: rgba(0, 0, 0, 1)">Order.Core.csproj
dotnet sln add Order.Infrastructure</span>/<span style="color: rgba(0, 0, 0, 1)">Order.Infrastructure.csproj
dotnet sln add OrderProcessor.Service</span>/<span style="color: rgba(0, 0, 0, 1)">OrderProcessor.Service.csproj
# 添加项目引用
dotnet add Order.API reference Order.Core
dotnet add Order.API reference Order.Infrastructure
dotnet add OrderProcessor.Service reference Order.Core
dotnet add OrderProcessor.Service reference Order.Infrastructure
dotnet add Order.Infrastructure reference Order.Core
# 添加NuGet包
cd Order.API
dotnet add package RabbitMQ.Client
dotnet add package Microsoft.Extensions.Diagnostics.HealthChecks
cd ..</span>/<span style="color: rgba(0, 0, 0, 1)">Order.Infrastructure
dotnet add package RabbitMQ.Client
dotnet add package Microsoft.Extensions.Configuration
cd ..</span>/<span style="color: rgba(0, 0, 0, 1)">OrderProcessor.Service
dotnet add package RabbitMQ.Client</span></pre>
</div>
</div>
<h3>第2步:定义领域模型(Order.Core)</h3>
<p class="ds-markdown-paragraph">Models/Order.cs</p>
<div class="md-code-block md-code-block-light">
<div class="md-code-block-banner-wrap">
<div class="md-code-block-banner md-code-block-banner-lite">
<div class="_121d384">
<div class="d2a24f03">
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">namespace</span><span style="color: rgba(0, 0, 0, 1)"> Order.Core.Models
{
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)"> Order
{
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">string</span> Id { <span style="color: rgba(0, 0, 255, 1)">get</span>; <span style="color: rgba(0, 0, 255, 1)">set</span>; } =<span style="color: rgba(0, 0, 0, 1)"> Guid.NewGuid().ToString();
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">string</span> CustomerId { <span style="color: rgba(0, 0, 255, 1)">get</span>; <span style="color: rgba(0, 0, 255, 1)">set</span><span style="color: rgba(0, 0, 0, 1)">; }
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">string</span> ProductId { <span style="color: rgba(0, 0, 255, 1)">get</span>; <span style="color: rgba(0, 0, 255, 1)">set</span><span style="color: rgba(0, 0, 0, 1)">; }
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">int</span> Quantity { <span style="color: rgba(0, 0, 255, 1)">get</span>; <span style="color: rgba(0, 0, 255, 1)">set</span><span style="color: rgba(0, 0, 0, 1)">; }
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">decimal</span> TotalAmount { <span style="color: rgba(0, 0, 255, 1)">get</span>; <span style="color: rgba(0, 0, 255, 1)">set</span><span style="color: rgba(0, 0, 0, 1)">; }
</span><span style="color: rgba(0, 0, 255, 1)">public</span> OrderStatus Status { <span style="color: rgba(0, 0, 255, 1)">get</span>; <span style="color: rgba(0, 0, 255, 1)">set</span>; } =<span style="color: rgba(0, 0, 0, 1)"> OrderStatus.Pending;
</span><span style="color: rgba(0, 0, 255, 1)">public</span> DateTime CreatedAt { <span style="color: rgba(0, 0, 255, 1)">get</span>; <span style="color: rgba(0, 0, 255, 1)">set</span>; } =<span style="color: rgba(0, 0, 0, 1)"> DateTime.UtcNow;
</span><span style="color: rgba(0, 0, 255, 1)">public</span> DateTime? ProcessedAt { <span style="color: rgba(0, 0, 255, 1)">get</span>; <span style="color: rgba(0, 0, 255, 1)">set</span><span style="color: rgba(0, 0, 0, 1)">; }
}
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">enum</span><span style="color: rgba(0, 0, 0, 1)"> OrderStatus
{
Pending,
Processing,
Completed,
Failed,
Cancelled
}
}</span></pre>
</div>
</div>
</div>
</div>
</div>
</div>
<p class="ds-markdown-paragraph">Messages/OrderMessage.cs</p>
<div class="md-code-block md-code-block-light">
<div class="md-code-block-banner-wrap">
<div class="md-code-block-banner md-code-block-banner-lite">
<div class="_121d384">
<div class="d2a24f03">
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">namespace</span><span style="color: rgba(0, 0, 0, 1)"> Order.Core.Messages
{
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)"> OrderMessage
{
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">string</span> OrderId { <span style="color: rgba(0, 0, 255, 1)">get</span>; <span style="color: rgba(0, 0, 255, 1)">set</span><span style="color: rgba(0, 0, 0, 1)">; }
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">string</span> CustomerId { <span style="color: rgba(0, 0, 255, 1)">get</span>; <span style="color: rgba(0, 0, 255, 1)">set</span><span style="color: rgba(0, 0, 0, 1)">; }
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">string</span> ProductId { <span style="color: rgba(0, 0, 255, 1)">get</span>; <span style="color: rgba(0, 0, 255, 1)">set</span><span style="color: rgba(0, 0, 0, 1)">; }
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">int</span> Quantity { <span style="color: rgba(0, 0, 255, 1)">get</span>; <span style="color: rgba(0, 0, 255, 1)">set</span><span style="color: rgba(0, 0, 0, 1)">; }
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">decimal</span> TotalAmount { <span style="color: rgba(0, 0, 255, 1)">get</span>; <span style="color: rgba(0, 0, 255, 1)">set</span><span style="color: rgba(0, 0, 0, 1)">; }
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">string</span> Action { <span style="color: rgba(0, 0, 255, 1)">get</span>; <span style="color: rgba(0, 0, 255, 1)">set</span>; } <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> "create", "cancel"</span>
<span style="color: rgba(0, 0, 0, 1)"> }
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)"> OrderStatusMessage
{
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">string</span> OrderId { <span style="color: rgba(0, 0, 255, 1)">get</span>; <span style="color: rgba(0, 0, 255, 1)">set</span><span style="color: rgba(0, 0, 0, 1)">; }
</span><span style="color: rgba(0, 0, 255, 1)">public</span> OrderStatus Status { <span style="color: rgba(0, 0, 255, 1)">get</span>; <span style="color: rgba(0, 0, 255, 1)">set</span><span style="color: rgba(0, 0, 0, 1)">; }
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">string</span> Message { <span style="color: rgba(0, 0, 255, 1)">get</span>; <span style="color: rgba(0, 0, 255, 1)">set</span><span style="color: rgba(0, 0, 0, 1)">; }
</span><span style="color: rgba(0, 0, 255, 1)">public</span> DateTime Timestamp { <span style="color: rgba(0, 0, 255, 1)">get</span>; <span style="color: rgba(0, 0, 255, 1)">set</span>; } =<span style="color: rgba(0, 0, 0, 1)"> DateTime.UtcNow;
}
}</span></pre>
</div>
</div>
</div>
</div>
</div>
</div>
<h3>第3步:基础设施层(Order.Infrastructure)</h3>
<p class="ds-markdown-paragraph">Services/IRabbitMQConnection.cs</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> RabbitMQ.Client;
</span><span style="color: rgba(0, 0, 255, 1)">namespace</span><span style="color: rgba(0, 0, 0, 1)"> Order.Infrastructure.Services
{
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">interface</span><span style="color: rgba(0, 0, 0, 1)"> IRabbitMQConnection : IDisposable
{
</span><span style="color: rgba(0, 0, 255, 1)">bool</span> IsConnected { <span style="color: rgba(0, 0, 255, 1)">get</span><span style="color: rgba(0, 0, 0, 1)">; }
IModel CreateModel();
</span><span style="color: rgba(0, 0, 255, 1)">bool</span><span style="color: rgba(0, 0, 0, 1)"> TryConnect();
}
}</span></pre>
</div>
<p class="ds-markdown-paragraph">Services/RabbitMQConnection.cs</p>
<div class="md-code-block md-code-block-light">
<div class="md-code-block-banner-wrap">
<div class="md-code-block-banner md-code-block-banner-lite">
<div class="_121d384">
<div class="d2a24f03">
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> System.Net.Sockets;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> Microsoft.Extensions.Logging;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> RabbitMQ.Client;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> RabbitMQ.Client.Events;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> RabbitMQ.Client.Exceptions;
</span><span style="color: rgba(0, 0, 255, 1)">namespace</span><span style="color: rgba(0, 0, 0, 1)"> Order.Infrastructure.Services
{
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)"> RabbitMQConnection : IRabbitMQConnection
{
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">readonly</span><span style="color: rgba(0, 0, 0, 1)"> IConnectionFactory _connectionFactory;
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">readonly</span> ILogger<RabbitMQConnection><span style="color: rgba(0, 0, 0, 1)"> _logger;
</span><span style="color: rgba(0, 0, 255, 1)">private</span><span style="color: rgba(0, 0, 0, 1)"> IConnection _connection;
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">bool</span><span style="color: rgba(0, 0, 0, 1)"> _disposed;
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">readonly</span> <span style="color: rgba(0, 0, 255, 1)">object</span> _syncRoot = <span style="color: rgba(0, 0, 255, 1)">new</span> <span style="color: rgba(0, 0, 255, 1)">object</span><span style="color: rgba(0, 0, 0, 1)">();
</span><span style="color: rgba(0, 0, 255, 1)">public</span> RabbitMQConnection(IConnectionFactory connectionFactory, ILogger<RabbitMQConnection><span style="color: rgba(0, 0, 0, 1)"> logger)
{
_connectionFactory </span>=<span style="color: rgba(0, 0, 0, 1)"> connectionFactory;
_logger </span>=<span style="color: rgba(0, 0, 0, 1)"> logger;
}
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">bool</span> IsConnected => _connection != <span style="color: rgba(0, 0, 255, 1)">null</span> && _connection.IsOpen && !<span style="color: rgba(0, 0, 0, 1)">_disposed;
</span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> IModel CreateModel()
{
</span><span style="color: rgba(0, 0, 255, 1)">if</span> (!<span style="color: rgba(0, 0, 0, 1)">IsConnected)
{
</span><span style="color: rgba(0, 0, 255, 1)">throw</span> <span style="color: rgba(0, 0, 255, 1)">new</span> InvalidOperationException(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">No RabbitMQ connections are available to perform this action</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
}
</span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> _connection.CreateModel();
}
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">bool</span><span style="color: rgba(0, 0, 0, 1)"> TryConnect()
{
</span><span style="color: rgba(0, 0, 255, 1)">lock</span><span style="color: rgba(0, 0, 0, 1)"> (_syncRoot)
{
</span><span style="color: rgba(0, 0, 255, 1)">if</span> (IsConnected) <span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">;
_logger.LogInformation(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RabbitMQ Client is trying to connect</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)">
{
_connection </span>=<span style="color: rgba(0, 0, 0, 1)"> _connectionFactory.CreateConnection();
_connection.ConnectionShutdown </span>+=<span style="color: rgba(0, 0, 0, 1)"> OnConnectionShutdown;
_connection.CallbackException </span>+=<span style="color: rgba(0, 0, 0, 1)"> OnCallbackException;
_connection.ConnectionBlocked </span>+=<span style="color: rgba(0, 0, 0, 1)"> OnConnectionBlocked;
_logger.LogInformation(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RabbitMQ Client acquired a persistent connection to '{HostName}' and is subscribed to failure events</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
_connectionFactory.HostName);
</span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">;
}
</span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (BrokerUnreachableException ex)
{
_logger.LogError(ex, </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RabbitMQ connection failed: {Message}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, ex.Message);
</span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">;
}
</span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (SocketException ex)
{
_logger.LogError(ex, </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RabbitMQ connection failed: {Message}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, ex.Message);
</span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">;
}
}
}
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">void</span> OnConnectionBlocked(<span style="color: rgba(0, 0, 255, 1)">object</span><span style="color: rgba(0, 0, 0, 1)"> sender, ConnectionBlockedEventArgs e)
{
</span><span style="color: rgba(0, 0, 255, 1)">if</span> (_disposed) <span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)">;
_logger.LogWarning(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">A RabbitMQ connection is blocked. Reason: {Reason}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, e.Reason);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 这里可以实现重连逻辑</span>
<span style="color: rgba(0, 0, 0, 1)"> TryConnect();
}
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">void</span> OnCallbackException(<span style="color: rgba(0, 0, 255, 1)">object</span><span style="color: rgba(0, 0, 0, 1)"> sender, CallbackExceptionEventArgs e)
{
</span><span style="color: rgba(0, 0, 255, 1)">if</span> (_disposed) <span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)">;
_logger.LogWarning(e.Exception, </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">A RabbitMQ connection throw exception. Trying to re-connect...</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 这里可以实现重连逻辑</span>
<span style="color: rgba(0, 0, 0, 1)"> TryConnect();
}
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">void</span> OnConnectionShutdown(<span style="color: rgba(0, 0, 255, 1)">object</span><span style="color: rgba(0, 0, 0, 1)"> sender, ShutdownEventArgs reason)
{
</span><span style="color: rgba(0, 0, 255, 1)">if</span> (_disposed) <span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)">;
_logger.LogWarning(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">A RabbitMQ connection is on shutdown. Trying to re-connect...</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 这里可以实现重连逻辑</span>
<span style="color: rgba(0, 0, 0, 1)"> TryConnect();
}
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> Dispose()
{
</span><span style="color: rgba(0, 0, 255, 1)">if</span> (_disposed) <span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)">;
_disposed </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">;
</span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)">
{
_connection</span>?<span style="color: rgba(0, 0, 0, 1)">.Dispose();
}
</span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (IOException ex)
{
_logger.LogCritical(ex, </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Error disposing RabbitMQ connection</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
}
}
}
}</span></pre>
</div>
</div>
</div>
</div>
</div>
</div>
<p class="ds-markdown-paragraph">Services/IOrderPublisher.cs</p>
<div class="md-code-block md-code-block-light">
<div class="md-code-block-banner-wrap">
<div class="md-code-block-banner md-code-block-banner-lite">
<div class="_121d384">
<div class="d2a24f03">
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> Order.Core.Messages;
</span><span style="color: rgba(0, 0, 255, 1)">namespace</span><span style="color: rgba(0, 0, 0, 1)"> Order.Infrastructure.Services
{
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">interface</span><span style="color: rgba(0, 0, 0, 1)"> IOrderPublisher
{
Task PublishOrderCreatedAsync(OrderMessage order);
Task PublishOrderStatusAsync(OrderStatusMessage status);
}
}</span></pre>
</div>
</div>
</div>
</div>
</div>
</div>
<p class="ds-markdown-paragraph">Services/OrderPublisher.cs</p>
<div class="md-code-block md-code-block-light">
<div class="md-code-block-banner-wrap">
<div class="md-code-block-banner md-code-block-banner-lite">
<div class="_121d384">
<div class="d2a24f03">
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> System.Text;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> System.Text.Json;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> Microsoft.Extensions.Logging;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> Order.Core.Messages;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> RabbitMQ.Client;
</span><span style="color: rgba(0, 0, 255, 1)">namespace</span><span style="color: rgba(0, 0, 0, 1)"> Order.Infrastructure.Services
{
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)"> OrderPublisher : IOrderPublisher
{
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">readonly</span><span style="color: rgba(0, 0, 0, 1)"> IRabbitMQConnection _connection;
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">readonly</span> ILogger<OrderPublisher><span style="color: rgba(0, 0, 0, 1)"> _logger;
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">const</span> <span style="color: rgba(0, 0, 255, 1)">string</span> ExchangeName = <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">order.events</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">;
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">const</span> <span style="color: rgba(0, 0, 255, 1)">string</span> OrderCreatedRoutingKey = <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">order.created</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">;
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">const</span> <span style="color: rgba(0, 0, 255, 1)">string</span> OrderStatusRoutingKey = <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">order.status</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">;
</span><span style="color: rgba(0, 0, 255, 1)">public</span> OrderPublisher(IRabbitMQConnection connection, ILogger<OrderPublisher><span style="color: rgba(0, 0, 0, 1)"> logger)
{
_connection </span>=<span style="color: rgba(0, 0, 0, 1)"> connection;
_logger </span>=<span style="color: rgba(0, 0, 0, 1)"> logger;
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 确保交换机和队列存在</span>
<span style="color: rgba(0, 0, 0, 1)"> InitializeInfrastructure();
}
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> InitializeInfrastructure()
{
</span><span style="color: rgba(0, 0, 255, 1)">using</span> <span style="color: rgba(0, 0, 255, 1)">var</span> channel =<span style="color: rgba(0, 0, 0, 1)"> _connection.CreateModel();
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 声明主题交换机</span>
channel.ExchangeDeclare(ExchangeName, ExchangeType.Topic, durable: <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 声明订单创建队列</span>
channel.QueueDeclare(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">order.created.queue</span><span style="color: rgba(128, 0, 0, 1)">"</span>, durable: <span style="color: rgba(0, 0, 255, 1)">true</span>, exclusive: <span style="color: rgba(0, 0, 255, 1)">false</span>, autoDelete: <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
channel.QueueBind(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">order.created.queue</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, ExchangeName, OrderCreatedRoutingKey);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 声明订单状态队列</span>
channel.QueueDeclare(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">order.status.queue</span><span style="color: rgba(128, 0, 0, 1)">"</span>, durable: <span style="color: rgba(0, 0, 255, 1)">true</span>, exclusive: <span style="color: rgba(0, 0, 255, 1)">false</span>, autoDelete: <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
channel.QueueBind(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">order.status.queue</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, ExchangeName, OrderStatusRoutingKey);
_logger.LogInformation(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RabbitMQ infrastructure initialized</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
}
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">async</span><span style="color: rgba(0, 0, 0, 1)"> Task PublishOrderCreatedAsync(OrderMessage order)
{
</span><span style="color: rgba(0, 0, 255, 1)">await</span> PublishMessageAsync(order, OrderCreatedRoutingKey, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">OrderCreated</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
}
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">async</span><span style="color: rgba(0, 0, 0, 1)"> Task PublishOrderStatusAsync(OrderStatusMessage status)
{
</span><span style="color: rgba(0, 0, 255, 1)">await</span> PublishMessageAsync(status, OrderStatusRoutingKey, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">OrderStatus</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
}
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">async</span> Task PublishMessageAsync<T>(T message, <span style="color: rgba(0, 0, 255, 1)">string</span> routingKey, <span style="color: rgba(0, 0, 255, 1)">string</span><span style="color: rgba(0, 0, 0, 1)"> messageType)
{
</span><span style="color: rgba(0, 0, 255, 1)">if</span> (!<span style="color: rgba(0, 0, 0, 1)">_connection.IsConnected)
{
_connection.TryConnect();
}
</span><span style="color: rgba(0, 0, 255, 1)">using</span> <span style="color: rgba(0, 0, 255, 1)">var</span> channel =<span style="color: rgba(0, 0, 0, 1)"> _connection.CreateModel();
</span><span style="color: rgba(0, 0, 255, 1)">var</span> json =<span style="color: rgba(0, 0, 0, 1)"> JsonSerializer.Serialize(message);
</span><span style="color: rgba(0, 0, 255, 1)">var</span> body =<span style="color: rgba(0, 0, 0, 1)"> Encoding.UTF8.GetBytes(json);
</span><span style="color: rgba(0, 0, 255, 1)">var</span> properties =<span style="color: rgba(0, 0, 0, 1)"> channel.CreateBasicProperties();
properties.Persistent </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">;
properties.ContentType </span>= <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">application/json</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">;
properties.Type </span>=<span style="color: rgba(0, 0, 0, 1)"> messageType;
</span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)">
{
channel.BasicPublish(
exchange: ExchangeName,
routingKey: routingKey,
mandatory: </span><span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">,
basicProperties: properties,
body: body);
_logger.LogInformation(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Published {MessageType} message for Order {OrderId}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
messageType, GetOrderId(message));
}
</span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (Exception ex)
{
_logger.LogError(ex, </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Error publishing {MessageType} message for Order {OrderId}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
messageType, GetOrderId(message));
</span><span style="color: rgba(0, 0, 255, 1)">throw</span><span style="color: rgba(0, 0, 0, 1)">;
}
</span><span style="color: rgba(0, 0, 255, 1)">await</span><span style="color: rgba(0, 0, 0, 1)"> Task.CompletedTask;
}
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">static</span> <span style="color: rgba(0, 0, 255, 1)">string</span> GetOrderId<T><span style="color: rgba(0, 0, 0, 1)">(T message)
{
</span><span style="color: rgba(0, 0, 255, 1)">return</span> message <span style="color: rgba(0, 0, 255, 1)">switch</span><span style="color: rgba(0, 0, 0, 1)">
{
OrderMessage order </span>=><span style="color: rgba(0, 0, 0, 1)"> order.OrderId,
OrderStatusMessage status </span>=><span style="color: rgba(0, 0, 0, 1)"> status.OrderId,
_ </span>=> <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">unknown</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">
};
}
}
}</span></pre>
</div>
</div>
</div>
</div>
</div>
</div>
<h3>第4步:Order.API项目配置</h3>
<p class="ds-markdown-paragraph">appsettings.json</p>
<div class="md-code-block md-code-block-light">
<div class="md-code-block-banner-wrap">
<div class="md-code-block-banner md-code-block-banner-lite">
<div class="_121d384">
<div class="d2a24f03">
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">{
</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RabbitMQ</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">: {
</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">HostName</span><span style="color: rgba(128, 0, 0, 1)">"</span>: <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">localhost</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">UserName</span><span style="color: rgba(128, 0, 0, 1)">"</span>: <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">myuser</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Password</span><span style="color: rgba(128, 0, 0, 1)">"</span>: <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">mypassword</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Port</span><span style="color: rgba(128, 0, 0, 1)">"</span>: <span style="color: rgba(128, 0, 128, 1)">5672</span><span style="color: rgba(0, 0, 0, 1)">,
</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">VirtualHost</span><span style="color: rgba(128, 0, 0, 1)">"</span>: <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">/</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">
},
</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Logging</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">: {
</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">LogLevel</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">: {
</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Default</span><span style="color: rgba(128, 0, 0, 1)">"</span>: <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Information</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Microsoft.AspNetCore</span><span style="color: rgba(128, 0, 0, 1)">"</span>: <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Warning</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">
}
},
</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">AllowedHosts</span><span style="color: rgba(128, 0, 0, 1)">"</span>: <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">*</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">
}</span></pre>
</div>
</div>
</div>
</div>
</div>
<pre></pre>
</div>
<p class="ds-markdown-paragraph">Program.cs</p>
<div class="md-code-block md-code-block-light">
<div class="md-code-block-banner-wrap">
<div class="md-code-block-banner md-code-block-banner-lite">
<div class="_121d384">
<div class="d2a24f03">
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> Order.API.Controllers;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> Order.API.Services;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> Order.Core.Models;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> Order.Infrastructure.Services;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> RabbitMQ.Client;
</span><span style="color: rgba(0, 0, 255, 1)">var</span> builder =<span style="color: rgba(0, 0, 0, 1)"> WebApplication.CreateBuilder(args);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Add services to the container.</span>
<span style="color: rgba(0, 0, 0, 1)">builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Configure RabbitMQ</span>
builder.Services.AddSingleton<IConnectionFactory>(sp =><span style="color: rgba(0, 0, 0, 1)">
{
</span><span style="color: rgba(0, 0, 255, 1)">var</span> configuration = sp.GetRequiredService<IConfiguration><span style="color: rgba(0, 0, 0, 1)">();
</span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> ConnectionFactory
{
HostName </span>= configuration[<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RabbitMQ:HostName</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">],
UserName </span>= configuration[<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RabbitMQ:UserName</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">],
Password </span>= configuration[<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RabbitMQ:Password</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">],
Port </span>= <span style="color: rgba(0, 0, 255, 1)">int</span>.Parse(configuration[<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RabbitMQ:Port</span><span style="color: rgba(128, 0, 0, 1)">"</span>] ?? <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">5672</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">),
VirtualHost </span>= configuration[<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RabbitMQ:VirtualHost</span><span style="color: rgba(128, 0, 0, 1)">"</span>] ?? <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">/</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
DispatchConsumersAsync </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">
};
});
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Register RabbitMQ services</span>
builder.Services.AddSingleton<IRabbitMQConnection, RabbitMQConnection><span style="color: rgba(0, 0, 0, 1)">();
builder.Services.AddScoped</span><IOrderPublisher, OrderPublisher><span style="color: rgba(0, 0, 0, 1)">();
builder.Services.AddScoped</span><IOrderService, OrderService><span style="color: rgba(0, 0, 0, 1)">();
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Add Health Checks</span>
<span style="color: rgba(0, 0, 0, 1)">builder.Services.AddHealthChecks()
.AddRabbitMQ(provider </span>=><span style="color: rgba(0, 0, 0, 1)">
{
</span><span style="color: rgba(0, 0, 255, 1)">var</span> factory = provider.GetRequiredService<IConnectionFactory><span style="color: rgba(0, 0, 0, 1)">();
</span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> factory.CreateConnection();
}, name: </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">rabbitmq</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Add hosted service for status updates consumer</span>
builder.Services.AddHostedService<OrderStatusConsumerService><span style="color: rgba(0, 0, 0, 1)">();
</span><span style="color: rgba(0, 0, 255, 1)">var</span> app =<span style="color: rgba(0, 0, 0, 1)"> builder.Build();
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Configure the HTTP request pipeline.</span>
<span style="color: rgba(0, 0, 255, 1)">if</span><span style="color: rgba(0, 0, 0, 1)"> (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}
app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Add health check endpoint</span>
app.MapHealthChecks(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">/health</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
app.Run();</span></pre>
</div>
</div>
</div>
</div>
</div>
</div>
<p class="ds-markdown-paragraph">Services/IOrderService.cs</p>
<div class="md-code-block md-code-block-light">
<div class="md-code-block-banner-wrap">
<div class="md-code-block-banner md-code-block-banner-lite">
<div class="_121d384">
<div class="d2a24f03">
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> Order.Core.Models;
</span><span style="color: rgba(0, 0, 255, 1)">namespace</span><span style="color: rgba(0, 0, 0, 1)"> Order.API.Services
{
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">interface</span><span style="color: rgba(0, 0, 0, 1)"> IOrderService
{
Task</span><Order> CreateOrderAsync(<span style="color: rgba(0, 0, 255, 1)">string</span> customerId, <span style="color: rgba(0, 0, 255, 1)">string</span> productId, <span style="color: rgba(0, 0, 255, 1)">int</span> quantity, <span style="color: rgba(0, 0, 255, 1)">decimal</span><span style="color: rgba(0, 0, 0, 1)"> unitPrice);
Task</span><Order?> GetOrderAsync(<span style="color: rgba(0, 0, 255, 1)">string</span><span style="color: rgba(0, 0, 0, 1)"> orderId);
Task UpdateOrderStatusAsync(</span><span style="color: rgba(0, 0, 255, 1)">string</span><span style="color: rgba(0, 0, 0, 1)"> orderId, OrderStatus status);
}
}</span></pre>
</div>
</div>
</div>
</div>
</div>
</div>
<p class="ds-markdown-paragraph">Services/OrderService.cs</p>
<div class="cnblogs_code"><img id="code_img_closed_224d9537-a296-41b8-a874-bcd15190cdfb" class="code_img_closed lazyload" data-src="http://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"><img id="code_img_opened_224d9537-a296-41b8-a874-bcd15190cdfb" class="code_img_opened lazyload" style="display: none" data-src="http://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif">
<div id="cnblogs_code_open_224d9537-a296-41b8-a874-bcd15190cdfb" class="cnblogs_code_hide">
<pre><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> Order.Core.Messages;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> Order.Core.Models;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> Order.Infrastructure.Services;
</span><span style="color: rgba(0, 0, 255, 1)">namespace</span><span style="color: rgba(0, 0, 0, 1)"> Order.API.Services
{
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)"> OrderService : IOrderService
{
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">readonly</span><span style="color: rgba(0, 0, 0, 1)"> IOrderPublisher _orderPublisher;
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">readonly</span> ILogger<OrderService><span style="color: rgba(0, 0, 0, 1)"> _logger;
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 内存存储用于演示(生产环境应该用数据库)</span>
<span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">static</span> <span style="color: rgba(0, 0, 255, 1)">readonly</span> Dictionary<<span style="color: rgba(0, 0, 255, 1)">string</span>, Order> _orders = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)">();
</span><span style="color: rgba(0, 0, 255, 1)">public</span> OrderService(IOrderPublisher orderPublisher, ILogger<OrderService><span style="color: rgba(0, 0, 0, 1)"> logger)
{
_orderPublisher </span>=<span style="color: rgba(0, 0, 0, 1)"> orderPublisher;
_logger </span>=<span style="color: rgba(0, 0, 0, 1)"> logger;
}
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">async</span> Task<Order> CreateOrderAsync(<span style="color: rgba(0, 0, 255, 1)">string</span> customerId, <span style="color: rgba(0, 0, 255, 1)">string</span> productId, <span style="color: rgba(0, 0, 255, 1)">int</span> quantity, <span style="color: rgba(0, 0, 255, 1)">decimal</span><span style="color: rgba(0, 0, 0, 1)"> unitPrice)
{
</span><span style="color: rgba(0, 0, 255, 1)">var</span> order = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Order
{
CustomerId </span>=<span style="color: rgba(0, 0, 0, 1)"> customerId,
ProductId </span>=<span style="color: rgba(0, 0, 0, 1)"> productId,
Quantity </span>=<span style="color: rgba(0, 0, 0, 1)"> quantity,
TotalAmount </span>= quantity *<span style="color: rgba(0, 0, 0, 1)"> unitPrice,
Status </span>=<span style="color: rgba(0, 0, 0, 1)"> OrderStatus.Pending
};
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 保存到内存</span>
_orders =<span style="color: rgba(0, 0, 0, 1)"> order;
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 发布订单创建事件</span>
<span style="color: rgba(0, 0, 255, 1)">var</span> orderMessage = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> OrderMessage
{
OrderId </span>=<span style="color: rgba(0, 0, 0, 1)"> order.Id,
CustomerId </span>=<span style="color: rgba(0, 0, 0, 1)"> order.CustomerId,
ProductId </span>=<span style="color: rgba(0, 0, 0, 1)"> order.ProductId,
Quantity </span>=<span style="color: rgba(0, 0, 0, 1)"> order.Quantity,
TotalAmount </span>=<span style="color: rgba(0, 0, 0, 1)"> order.TotalAmount,
Action </span>= <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">create</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">
};
</span><span style="color: rgba(0, 0, 255, 1)">await</span><span style="color: rgba(0, 0, 0, 1)"> _orderPublisher.PublishOrderCreatedAsync(orderMessage);
_logger.LogInformation(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Order {OrderId} created and published</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, order.Id);
</span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> order;
}
</span><span style="color: rgba(0, 0, 255, 1)">public</span> Task<Order?> GetOrderAsync(<span style="color: rgba(0, 0, 255, 1)">string</span><span style="color: rgba(0, 0, 0, 1)"> orderId)
{
_orders.TryGetValue(orderId, </span><span style="color: rgba(0, 0, 255, 1)">out</span> <span style="color: rgba(0, 0, 255, 1)">var</span><span style="color: rgba(0, 0, 0, 1)"> order);
</span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> Task.FromResult(order);
}
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">async</span> Task UpdateOrderStatusAsync(<span style="color: rgba(0, 0, 255, 1)">string</span><span style="color: rgba(0, 0, 0, 1)"> orderId, OrderStatus status)
{
</span><span style="color: rgba(0, 0, 255, 1)">if</span> (_orders.TryGetValue(orderId, <span style="color: rgba(0, 0, 255, 1)">out</span> <span style="color: rgba(0, 0, 255, 1)">var</span><span style="color: rgba(0, 0, 0, 1)"> order))
{
order.Status </span>=<span style="color: rgba(0, 0, 0, 1)"> status;
order.ProcessedAt </span>=<span style="color: rgba(0, 0, 0, 1)"> DateTime.UtcNow;
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 发布状态更新</span>
<span style="color: rgba(0, 0, 255, 1)">var</span> statusMessage = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> OrderStatusMessage
{
OrderId </span>=<span style="color: rgba(0, 0, 0, 1)"> orderId,
Status </span>=<span style="color: rgba(0, 0, 0, 1)"> status,
Message </span>= $<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Order {status.ToString().ToLower()}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">
};
</span><span style="color: rgba(0, 0, 255, 1)">await</span><span style="color: rgba(0, 0, 0, 1)"> _orderPublisher.PublishOrderStatusAsync(statusMessage);
_logger.LogInformation(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Order {OrderId} status updated to {Status}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, orderId, status);
}
}
}
}</span></pre>
</div>
<span class="cnblogs_code_collapse">View Code</span></div>
<p class="ds-markdown-paragraph">Services/OrderStatusConsumerService.cs</p>
<div class="md-code-block md-code-block-light">
<div class="md-code-block-banner-wrap">
<div class="md-code-block-banner md-code-block-banner-lite">
<div class="_121d384">
<div class="d2a24f03">
<div class="cnblogs_code"><img id="code_img_closed_f3b3d861-a0b0-4b53-9a4e-b8f843c16fdd" class="code_img_closed lazyload" data-src="http://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"><img id="code_img_opened_f3b3d861-a0b0-4b53-9a4e-b8f843c16fdd" class="code_img_opened lazyload" style="display: none" data-src="http://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif">
<div id="cnblogs_code_open_f3b3d861-a0b0-4b53-9a4e-b8f843c16fdd" class="cnblogs_code_hide">
<pre><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> System.Text;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> System.Text.Json;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> Microsoft.Extensions.Options;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> Order.API.Services;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> Order.Core.Messages;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> Order.Infrastructure.Services;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> RabbitMQ.Client;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> RabbitMQ.Client.Events;
</span><span style="color: rgba(0, 0, 255, 1)">namespace</span><span style="color: rgba(0, 0, 0, 1)"> Order.API.Services
{
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)"> OrderStatusConsumerService : BackgroundService
{
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">readonly</span><span style="color: rgba(0, 0, 0, 1)"> IRabbitMQConnection _connection;
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">readonly</span><span style="color: rgba(0, 0, 0, 1)"> IServiceProvider _serviceProvider;
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">readonly</span> ILogger<OrderStatusConsumerService><span style="color: rgba(0, 0, 0, 1)"> _logger;
</span><span style="color: rgba(0, 0, 255, 1)">private</span><span style="color: rgba(0, 0, 0, 1)"> IModel _channel;
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">const</span> <span style="color: rgba(0, 0, 255, 1)">string</span> QueueName = <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">order.status.queue</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">;
</span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> OrderStatusConsumerService(
IRabbitMQConnection connection,
IServiceProvider serviceProvider,
ILogger</span><OrderStatusConsumerService><span style="color: rgba(0, 0, 0, 1)"> logger)
{
_connection </span>=<span style="color: rgba(0, 0, 0, 1)"> connection;
_serviceProvider </span>=<span style="color: rgba(0, 0, 0, 1)"> serviceProvider;
_logger </span>=<span style="color: rgba(0, 0, 0, 1)"> logger;
InitializeChannel();
}
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> InitializeChannel()
{
</span><span style="color: rgba(0, 0, 255, 1)">if</span> (!<span style="color: rgba(0, 0, 0, 1)">_connection.IsConnected)
{
_connection.TryConnect();
}
_channel </span>=<span style="color: rgba(0, 0, 0, 1)"> _connection.CreateModel();
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 确保队列存在(已经在Publisher中声明,这里做双重保险)</span>
_channel.QueueDeclare(QueueName, durable: <span style="color: rgba(0, 0, 255, 1)">true</span>, exclusive: <span style="color: rgba(0, 0, 255, 1)">false</span>, autoDelete: <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
_channel.BasicQos(</span><span style="color: rgba(128, 0, 128, 1)">0</span>, <span style="color: rgba(128, 0, 128, 1)">1</span>, <span style="color: rgba(0, 0, 255, 1)">false</span>); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 公平分发</span>
<span style="color: rgba(0, 0, 0, 1)">
_logger.LogInformation(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">OrderStatusConsumerService channel initialized</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
}
</span><span style="color: rgba(0, 0, 255, 1)">protected</span> <span style="color: rgba(0, 0, 255, 1)">override</span> <span style="color: rgba(0, 0, 255, 1)">async</span><span style="color: rgba(0, 0, 0, 1)"> Task ExecuteAsync(CancellationToken stoppingToken)
{
stoppingToken.ThrowIfCancellationRequested();
</span><span style="color: rgba(0, 0, 255, 1)">var</span> consumer = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> AsyncEventingBasicConsumer(_channel);
consumer.Received </span>+= <span style="color: rgba(0, 0, 255, 1)">async</span> (model, ea) =><span style="color: rgba(0, 0, 0, 1)">
{
</span><span style="color: rgba(0, 0, 255, 1)">var</span> body =<span style="color: rgba(0, 0, 0, 1)"> ea.Body.ToArray();
</span><span style="color: rgba(0, 0, 255, 1)">var</span> message =<span style="color: rgba(0, 0, 0, 1)"> Encoding.UTF8.GetString(body);
</span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)">
{
</span><span style="color: rgba(0, 0, 255, 1)">await</span><span style="color: rgba(0, 0, 0, 1)"> ProcessMessageAsync(message);
_channel.BasicAck(ea.DeliveryTag, </span><span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
}
</span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (Exception ex)
{
_logger.LogError(ex, </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Error processing message: {Message}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, message);
_channel.BasicNack(ea.DeliveryTag, </span><span style="color: rgba(0, 0, 255, 1)">false</span>, <span style="color: rgba(0, 0, 255, 1)">false</span>); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 不重新入队</span>
<span style="color: rgba(0, 0, 0, 1)"> }
};
_channel.BasicConsume(QueueName, </span><span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">, consumer);
_logger.LogInformation(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">OrderStatusConsumerService started consuming</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 0, 255, 1)">await</span><span style="color: rgba(0, 0, 0, 1)"> Task.CompletedTask;
}
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">async</span> Task ProcessMessageAsync(<span style="color: rgba(0, 0, 255, 1)">string</span><span style="color: rgba(0, 0, 0, 1)"> message)
{
</span><span style="color: rgba(0, 0, 255, 1)">using</span> <span style="color: rgba(0, 0, 255, 1)">var</span> scope =<span style="color: rgba(0, 0, 0, 1)"> _serviceProvider.CreateScope();
</span><span style="color: rgba(0, 0, 255, 1)">var</span> orderService = scope.ServiceProvider.GetRequiredService<IOrderService><span style="color: rgba(0, 0, 0, 1)">();
</span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)">
{
</span><span style="color: rgba(0, 0, 255, 1)">var</span> statusMessage = JsonSerializer.Deserialize<OrderStatusMessage><span style="color: rgba(0, 0, 0, 1)">(message);
</span><span style="color: rgba(0, 0, 255, 1)">if</span> (statusMessage != <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">)
{
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 这里可以处理状态更新,比如更新数据库、发送通知等</span>
_logger.LogInformation(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Received order status update: {OrderId} -> {Status}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
statusMessage.OrderId, statusMessage.Status);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 在实际应用中,这里可能会更新数据库中的订单状态
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> await orderService.UpdateOrderStatusAsync(statusMessage.OrderId, statusMessage.Status);</span>
<span style="color: rgba(0, 0, 0, 1)"> }
}
</span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (JsonException ex)
{
_logger.LogError(ex, </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Error deserializing message: {Message}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, message);
</span><span style="color: rgba(0, 0, 255, 1)">throw</span><span style="color: rgba(0, 0, 0, 1)">;
}
}
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">override</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> Dispose()
{
_channel</span>?<span style="color: rgba(0, 0, 0, 1)">.Close();
_channel</span>?<span style="color: rgba(0, 0, 0, 1)">.Dispose();
</span><span style="color: rgba(0, 0, 255, 1)">base</span><span style="color: rgba(0, 0, 0, 1)">.Dispose();
}
}
}</span></pre>
</div>
<span class="cnblogs_code_collapse">View Code</span></div>
</div>
</div>
</div>
</div>
</div>
<p class="ds-markdown-paragraph">Controllers/OrdersController.cs</p>
<div class="md-code-block md-code-block-light">
<div class="md-code-block-banner-wrap">
<div class="md-code-block-banner md-code-block-banner-lite">
<div class="_121d384">
<div class="d2a24f03">
<div class="cnblogs_code"><img id="code_img_closed_9090d75e-ceaa-4cad-aba9-e82a942819bf" class="code_img_closed lazyload" data-src="http://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"><img id="code_img_opened_9090d75e-ceaa-4cad-aba9-e82a942819bf" class="code_img_opened lazyload" style="display: none" data-src="http://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif">
<div id="cnblogs_code_open_9090d75e-ceaa-4cad-aba9-e82a942819bf" class="cnblogs_code_hide">
<pre><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> Microsoft.AspNetCore.Mvc;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> Order.API.Services;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> Order.Core.Models;
</span><span style="color: rgba(0, 0, 255, 1)">namespace</span><span style="color: rgba(0, 0, 0, 1)"> Order.API.Controllers
{
</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">)]
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)"> OrdersController : ControllerBase
{
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">readonly</span><span style="color: rgba(0, 0, 0, 1)"> IOrderService _orderService;
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">readonly</span> ILogger<OrdersController><span style="color: rgba(0, 0, 0, 1)"> _logger;
</span><span style="color: rgba(0, 0, 255, 1)">public</span> OrdersController(IOrderService orderService, ILogger<OrdersController><span style="color: rgba(0, 0, 0, 1)"> logger)
{
_orderService </span>=<span style="color: rgba(0, 0, 0, 1)"> orderService;
_logger </span>=<span style="color: rgba(0, 0, 0, 1)"> logger;
}
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">async</span> Task<ActionResult<Order>><span style="color: rgba(0, 0, 0, 1)"> CreateOrder( CreateOrderRequest request)
{
</span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)">
{
</span><span style="color: rgba(0, 0, 255, 1)">var</span> order = <span style="color: rgba(0, 0, 255, 1)">await</span><span style="color: rgba(0, 0, 0, 1)"> _orderService.CreateOrderAsync(
request.CustomerId,
request.ProductId,
request.Quantity,
request.UnitPrice);
</span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> Ok(order);
}
</span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (Exception ex)
{
_logger.LogError(ex, </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Error creating order</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 0, 255, 1)">return</span> StatusCode(<span style="color: rgba(128, 0, 128, 1)">500</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Error creating order</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
}
}
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">async</span> Task<ActionResult<Order>> GetOrder(<span style="color: rgba(0, 0, 255, 1)">string</span><span style="color: rgba(0, 0, 0, 1)"> orderId)
{
</span><span style="color: rgba(0, 0, 255, 1)">var</span> order = <span style="color: rgba(0, 0, 255, 1)">await</span><span style="color: rgba(0, 0, 0, 1)"> _orderService.GetOrderAsync(orderId);
</span><span style="color: rgba(0, 0, 255, 1)">if</span> (order == <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">)
{
</span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> NotFound();
}
</span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> Ok(order);
}
</span><span style="color: rgba(0, 0, 255, 1)">public</span> ActionResult<IEnumerable<Order>><span style="color: rgba(0, 0, 0, 1)"> GetOrders()
{
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 这里只是演示,实际应该从数据库获取</span>
<span style="color: rgba(0, 0, 255, 1)">return</span> Ok(<span style="color: rgba(0, 0, 255, 1)">new</span> List<Order><span style="color: rgba(0, 0, 0, 1)">());
}
}
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)"> CreateOrderRequest
{
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">string</span> CustomerId { <span style="color: rgba(0, 0, 255, 1)">get</span>; <span style="color: rgba(0, 0, 255, 1)">set</span><span style="color: rgba(0, 0, 0, 1)">; }
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">string</span> ProductId { <span style="color: rgba(0, 0, 255, 1)">get</span>; <span style="color: rgba(0, 0, 255, 1)">set</span><span style="color: rgba(0, 0, 0, 1)">; }
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">int</span> Quantity { <span style="color: rgba(0, 0, 255, 1)">get</span>; <span style="color: rgba(0, 0, 255, 1)">set</span><span style="color: rgba(0, 0, 0, 1)">; }
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">decimal</span> UnitPrice { <span style="color: rgba(0, 0, 255, 1)">get</span>; <span style="color: rgba(0, 0, 255, 1)">set</span><span style="color: rgba(0, 0, 0, 1)">; }
}
}</span></pre>
</div>
<span class="cnblogs_code_collapse">View Code</span></div>
</div>
</div>
</div>
</div>
</div>
<h3>第5步:订单处理器服务(OrderProcessor.Service)</h3>
<p class="ds-markdown-paragraph">Program.cs</p>
<div class="md-code-block md-code-block-light">
<div class="md-code-block-banner-wrap">
<div class="md-code-block-banner md-code-block-banner-lite">
<div class="_121d384">
<div class="d2a24f03">
<div class="cnblogs_code"><img id="code_img_closed_9d0efc35-2b7c-4c29-b510-e4db1a906898" class="code_img_closed lazyload" data-src="http://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"><img id="code_img_opened_9d0efc35-2b7c-4c29-b510-e4db1a906898" class="code_img_opened lazyload" style="display: none" data-src="http://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif">
<div id="cnblogs_code_open_9d0efc35-2b7c-4c29-b510-e4db1a906898" class="cnblogs_code_hide">
<pre><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> Order.Core.Messages;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> Order.Infrastructure.Services;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> OrderProcessor.Service.Services;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> RabbitMQ.Client;
</span><span style="color: rgba(0, 0, 255, 1)">var</span> builder =<span style="color: rgba(0, 0, 0, 1)"> Host.CreateApplicationBuilder(args);
builder.Services.AddHostedService</span><OrderProcessorService><span style="color: rgba(0, 0, 0, 1)">();
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Configure RabbitMQ</span>
builder.Services.AddSingleton<IConnectionFactory>(sp =><span style="color: rgba(0, 0, 0, 1)">
{
</span><span style="color: rgba(0, 0, 255, 1)">var</span> configuration = sp.GetRequiredService<IConfiguration><span style="color: rgba(0, 0, 0, 1)">();
</span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> ConnectionFactory
{
HostName </span>= configuration[<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RabbitMQ:HostName</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">],
UserName </span>= configuration[<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RabbitMQ:UserName</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">],
Password </span>= configuration[<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RabbitMQ:Password</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">],
Port </span>= <span style="color: rgba(0, 0, 255, 1)">int</span>.Parse(configuration[<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RabbitMQ:Port</span><span style="color: rgba(128, 0, 0, 1)">"</span>] ?? <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">5672</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">),
VirtualHost </span>= configuration[<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">RabbitMQ:VirtualHost</span><span style="color: rgba(128, 0, 0, 1)">"</span>] ?? <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">/</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
DispatchConsumersAsync </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">
};
});
builder.Services.AddSingleton</span><IRabbitMQConnection, RabbitMQConnection><span style="color: rgba(0, 0, 0, 1)">();
builder.Services.AddScoped</span><IOrderPublisher, OrderPublisher><span style="color: rgba(0, 0, 0, 1)">();
builder.Services.AddLogging();
</span><span style="color: rgba(0, 0, 255, 1)">var</span> host =<span style="color: rgba(0, 0, 0, 1)"> builder.Build();
host.Run();</span></pre>
</div>
<span class="cnblogs_code_collapse">View Code</span></div>
</div>
</div>
</div>
</div>
</div>
<p class="ds-markdown-paragraph">Services/OrderProcessorService.cs</p>
<div class="md-code-block md-code-block-light">
<div class="md-code-block-banner-wrap">
<div class="md-code-block-banner md-code-block-banner-lite">
<div class="_121d384">
<div class="d2a24f03">
<div class="cnblogs_code"><img id="code_img_closed_4c23c76b-8832-405d-b3fa-58431230be35" class="code_img_closed lazyload" data-src="http://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"><img id="code_img_opened_4c23c76b-8832-405d-b3fa-58431230be35" class="code_img_opened lazyload" style="display: none" data-src="http://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif">
<div id="cnblogs_code_open_4c23c76b-8832-405d-b3fa-58431230be35" class="cnblogs_code_hide">
<pre><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> System.Text;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> System.Text.Json;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> Microsoft.Extensions.Logging;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> Order.Core.Messages;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> Order.Infrastructure.Services;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> RabbitMQ.Client;
</span><span style="color: rgba(0, 0, 255, 1)">using</span><span style="color: rgba(0, 0, 0, 1)"> RabbitMQ.Client.Events;
</span><span style="color: rgba(0, 0, 255, 1)">namespace</span><span style="color: rgba(0, 0, 0, 1)"> OrderProcessor.Service.Services
{
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)"> OrderProcessorService : BackgroundService
{
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">readonly</span><span style="color: rgba(0, 0, 0, 1)"> IRabbitMQConnection _connection;
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">readonly</span><span style="color: rgba(0, 0, 0, 1)"> IOrderPublisher _orderPublisher;
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">readonly</span> ILogger<OrderProcessorService><span style="color: rgba(0, 0, 0, 1)"> _logger;
</span><span style="color: rgba(0, 0, 255, 1)">private</span><span style="color: rgba(0, 0, 0, 1)"> IModel _channel;
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">const</span> <span style="color: rgba(0, 0, 255, 1)">string</span> QueueName = <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">order.created.queue</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">;
</span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> OrderProcessorService(
IRabbitMQConnection connection,
IOrderPublisher orderPublisher,
ILogger</span><OrderProcessorService><span style="color: rgba(0, 0, 0, 1)"> logger)
{
_connection </span>=<span style="color: rgba(0, 0, 0, 1)"> connection;
_orderPublisher </span>=<span style="color: rgba(0, 0, 0, 1)"> orderPublisher;
_logger </span>=<span style="color: rgba(0, 0, 0, 1)"> logger;
InitializeChannel();
}
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> InitializeChannel()
{
</span><span style="color: rgba(0, 0, 255, 1)">if</span> (!<span style="color: rgba(0, 0, 0, 1)">_connection.IsConnected)
{
_connection.TryConnect();
}
_channel </span>=<span style="color: rgba(0, 0, 0, 1)"> _connection.CreateModel();
_channel.QueueDeclare(QueueName, durable: </span><span style="color: rgba(0, 0, 255, 1)">true</span>, exclusive: <span style="color: rgba(0, 0, 255, 1)">false</span>, autoDelete: <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
_channel.BasicQos(</span><span style="color: rgba(128, 0, 128, 1)">0</span>, <span style="color: rgba(128, 0, 128, 1)">1</span>, <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
_logger.LogInformation(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">OrderProcessorService channel initialized</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
}
</span><span style="color: rgba(0, 0, 255, 1)">protected</span> <span style="color: rgba(0, 0, 255, 1)">override</span> <span style="color: rgba(0, 0, 255, 1)">async</span><span style="color: rgba(0, 0, 0, 1)"> Task ExecuteAsync(CancellationToken stoppingToken)
{
stoppingToken.ThrowIfCancellationRequested();
</span><span style="color: rgba(0, 0, 255, 1)">var</span> consumer = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> AsyncEventingBasicConsumer(_channel);
consumer.Received </span>+= <span style="color: rgba(0, 0, 255, 1)">async</span> (model, ea) =><span style="color: rgba(0, 0, 0, 1)">
{
</span><span style="color: rgba(0, 0, 255, 1)">var</span> body =<span style="color: rgba(0, 0, 0, 1)"> ea.Body.ToArray();
</span><span style="color: rgba(0, 0, 255, 1)">var</span> message =<span style="color: rgba(0, 0, 0, 1)"> Encoding.UTF8.GetString(body);
</span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)">
{
</span><span style="color: rgba(0, 0, 255, 1)">await</span><span style="color: rgba(0, 0, 0, 1)"> ProcessOrderAsync(message);
_channel.BasicAck(ea.DeliveryTag, </span><span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
}
</span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (Exception ex)
{
_logger.LogError(ex, </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Error processing order: {Message}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, message);
_channel.BasicNack(ea.DeliveryTag, </span><span style="color: rgba(0, 0, 255, 1)">false</span>, <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">);
}
};
_channel.BasicConsume(QueueName, </span><span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">, consumer);
_logger.LogInformation(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">OrderProcessorService started consuming orders</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 0, 255, 1)">await</span><span style="color: rgba(0, 0, 0, 1)"> Task.CompletedTask;
}
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">async</span> Task ProcessOrderAsync(<span style="color: rgba(0, 0, 255, 1)">string</span><span style="color: rgba(0, 0, 0, 1)"> message)
{
</span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)">
{
</span><span style="color: rgba(0, 0, 255, 1)">var</span> orderMessage = JsonSerializer.Deserialize<OrderMessage><span style="color: rgba(0, 0, 0, 1)">(message);
</span><span style="color: rgba(0, 0, 255, 1)">if</span> (orderMessage == <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">)
{
_logger.LogWarning(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Received invalid order message: {Message}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, message);
</span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)">;
}
_logger.LogInformation(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Processing order {OrderId} for customer {CustomerId}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
orderMessage.OrderId, orderMessage.CustomerId);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 模拟订单处理逻辑</span>
<span style="color: rgba(0, 0, 255, 1)">await</span><span style="color: rgba(0, 0, 0, 1)"> ProcessOrderBusinessLogic(orderMessage);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 发布处理完成状态</span>
<span style="color: rgba(0, 0, 255, 1)">var</span> statusMessage = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> OrderStatusMessage
{
OrderId </span>=<span style="color: rgba(0, 0, 0, 1)"> orderMessage.OrderId,
Status </span>=<span style="color: rgba(0, 0, 0, 1)"> Order.Core.Models.OrderStatus.Completed,
Message </span>= <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Order processed successfully</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">
};
</span><span style="color: rgba(0, 0, 255, 1)">await</span><span style="color: rgba(0, 0, 0, 1)"> _orderPublisher.PublishOrderStatusAsync(statusMessage);
_logger.LogInformation(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Order {OrderId} processed successfully</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, orderMessage.OrderId);
}
</span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (JsonException ex)
{
_logger.LogError(ex, </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Error deserializing order message: {Message}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, message);
</span><span style="color: rgba(0, 0, 255, 1)">throw</span><span style="color: rgba(0, 0, 0, 1)">;
}
}
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">async</span><span style="color: rgba(0, 0, 0, 1)"> Task ProcessOrderBusinessLogic(OrderMessage orderMessage)
{
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 模拟复杂的业务逻辑处理</span>
_logger.LogInformation(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Starting business logic for order {OrderId}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, orderMessage.OrderId);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 模拟处理时间</span>
<span style="color: rgba(0, 0, 255, 1)">var</span> random = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Random();
</span><span style="color: rgba(0, 0, 255, 1)">var</span> processingTime = random.Next(<span style="color: rgba(128, 0, 128, 1)">2000</span>, <span style="color: rgba(128, 0, 128, 1)">5000</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 0, 255, 1)">await</span><span style="color: rgba(0, 0, 0, 1)"> Task.Delay(processingTime);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 模拟10%的失败率</span>
<span style="color: rgba(0, 0, 255, 1)">if</span> (random.Next(<span style="color: rgba(128, 0, 128, 1)">0</span>, <span style="color: rgba(128, 0, 128, 1)">10</span>) == <span style="color: rgba(128, 0, 128, 1)">0</span><span style="color: rgba(0, 0, 0, 1)">)
{
</span><span style="color: rgba(0, 0, 255, 1)">throw</span> <span style="color: rgba(0, 0, 255, 1)">new</span> Exception(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Simulated business logic failure</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
}
_logger.LogInformation(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Business logic completed for order {OrderId}</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, orderMessage.OrderId);
}
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">override</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> Dispose()
{
_channel</span>?<span style="color: rgba(0, 0, 0, 1)">.Close();
_channel</span>?<span style="color: rgba(0, 0, 0, 1)">.Dispose();
</span><span style="color: rgba(0, 0, 255, 1)">base</span><span style="color: rgba(0, 0, 0, 1)">.Dispose();
}
}
}</span></pre>
</div>
<span class="cnblogs_code_collapse">View Code</span></div>
</div>
</div>
</div>
</div>
</div>
<h3>第6步:运行与测试</h3>
<ol start="1">
<li>
<p class="ds-markdown-paragraph">启动服务</p>
<div class="md-code-block md-code-block-light">
<div class="md-code-block-banner-wrap">
<div class="md-code-block-banner md-code-block-banner-lite">
<div class="_121d384">
<div class="d2a24f03">
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)"># 终端1:启动Order.API
cd Order.API
dotnet run
# 终端2:启动OrderProcessor.Service
cd OrderProcessor.Service
dotnet run</span></pre>
</div>
</div>
</div>
</div>
</div>
</div>
</li>
<li>
<p class="ds-markdown-paragraph">测试API</p>
<div class="md-code-block md-code-block-light">
<div class="md-code-block-banner-wrap">
<div class="md-code-block-banner md-code-block-banner-lite">
<div class="_121d384">
<div class="d2a24f03">
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)"># 创建订单
curl </span>-X POST <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">https://localhost:7000/api/orders</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)"> \
</span>-H <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Content-Type: application/json</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)"> \
</span>-d <span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(128, 0, 0, 1)">{</span>
<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">customerId</span><span style="color: rgba(128, 0, 0, 1)">"</span>: <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">customer-123</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">productId</span><span style="color: rgba(128, 0, 0, 1)">"</span>: <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">product-456</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">,
</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">quantity</span><span style="color: rgba(128, 0, 0, 1)">"</span>: <span style="color: rgba(128, 0, 128, 1)">2</span><span style="color: rgba(0, 0, 0, 1)">,
</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">unitPrice</span><span style="color: rgba(128, 0, 0, 1)">"</span>: <span style="color: rgba(128, 0, 128, 1)">29.99</span><span style="color: rgba(0, 0, 0, 1)">
}</span><span style="color: rgba(128, 0, 0, 1)">'
</span><span style="color: rgba(0, 0, 0, 1)">
# 查询订单状态
curl </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">https://localhost:7000/api/orders/{orderId}</span><span style="color: rgba(128, 0, 0, 1)">"</span></pre>
</div>
</div>
</div>
</div>
</div>
</div>
</li>
<li>
<p class="ds-markdown-paragraph">测试健康检查</p>
<div class="md-code-block md-code-block-light">
<div class="md-code-block-banner-wrap">
<div class="md-code-block-banner md-code-block-banner-lite">
<div class="_121d384">
<div class="d2a24f03">
<div class="cnblogs_code">
<pre>GET https:<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">localhost:7000/health</span></pre>
</div>
</div>
</div>
</div>
</div>
</div>
</li>
<li>
<p class="ds-markdown-paragraph">观察日志输出</p>
<ul>
<li>
<p class="ds-markdown-paragraph">Order.API:接收HTTP请求,发布订单创建消息</p>
</li>
<li>
<p class="ds-markdown-paragraph">OrderProcessor.Service:消费订单消息,处理业务逻辑,发布状态更新</p>
</li>
<li>
<p class="ds-markdown-paragraph">Order.API:消费状态更新消息</p>
</li>
</ul>
</li>
<li>
<p class="ds-markdown-paragraph">测试错误场景</p>
<ul>
<li>
<p class="ds-markdown-paragraph">停止RabbitMQ服务,观察重连机制</p>
</li>
<li>
<p class="ds-markdown-paragraph">停止OrderProcessor.Service,观察消息堆积</p>
</li>
<li>
<p class="ds-markdown-paragraph">重启服务,观察消息恢复处理</p>
</li>
</ul>
</li>
</ol>
<h3>第7步:高级特性 - 配置重试和 resilience</h3>
<p class="ds-markdown-paragraph">在Order.Infrastructure中添加Polly支持:</p>
<div class="md-code-block md-code-block-light">
<div class="md-code-block-banner-wrap">
<div class="md-code-block-banner md-code-block-banner-lite">
<div class="_121d384">
<div class="d2a24f03">
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 添加NuGet包</span>
<span style="color: rgba(0, 0, 0, 1)">dotnet add package Polly
dotnet add package Microsoft.Extensions.Http.Polly
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 在Program.cs中添加重试策略</span>
builder.Services.AddHttpClient(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">retry-client</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">)
.AddTransientHttpErrorPolicy(policy </span>=><span style="color: rgba(0, 0, 0, 1)">
policy.WaitAndRetryAsync(</span><span style="color: rgba(128, 0, 128, 1)">3</span>, retryAttempt =><span style="color: rgba(0, 0, 0, 1)">
TimeSpan.FromSeconds(Math.Pow(</span><span style="color: rgba(128, 0, 128, 1)">2</span>, retryAttempt))));</pre>
</div>
</div>
</div>
</div>
</div>
<pre></pre>
</div>
<hr>
<h2>本章总结</h2>
<p class="ds-markdown-paragraph">在这一章中,我们成功地将RabbitMQ集成到ASP.NET Core应用程序中,构建了一个完整的微服务系统:</p>
<ol start="1">
<li>
<p class="ds-markdown-paragraph">依赖注入配置:正确管理RabbitMQ连接和通道的生命周期。</p>
</li>
<li>
<p class="ds-markdown-paragraph">托管服务:使用<code>BackgroundService</code>实现长时间运行的消费者服务。</p>
</li>
<li>
<p class="ds-markdown-paragraph">领域驱动设计:采用分层架构,分离关注点。</p>
</li>
<li>
<p class="ds-markdown-paragraph">消息序列化:使用JSON序列化消息体。</p>
</li>
<li>
<p class="ds-markdown-paragraph">健康检查:集成RabbitMQ健康监控。</p>
</li>
<li>
<p class="ds-markdown-paragraph">错误处理:实现完善的错误处理和日志记录。</p>
</li>
<li>
<p class="ds-markdown-paragraph">配置管理:从配置文件读取连接字符串。</p>
</li>
</ol>
<p class="ds-markdown-paragraph">这个架构为构建生产级的微服务系统提供了坚实的基础。在下一章,我们将学习RabbitMQ的RPC模式。</p><br><br>
来源:https://www.cnblogs.com/jixingsuiyuan/p/19167628
頁:
[1]