星星港湾 發表於 2026-1-12 18:10:00

一个高性能的 .NET MQTT 客户端与服务器库

<h2 id="前言">前言</h2>
<p>在物联网(IoT)蓬勃发展的今天,MQTT 协议已经成为设备通信的事实标准。无论是智能家居、工业自动化还是车联网,MQTT 都扮演着至关重要的角色。今天,我要为大家介绍一个完全使用 C# 实现的高性能 MQTT 库</p>
<p>这个库不仅提供了完整的 MQTT 客户端实现,还包含了一个功能齐全的 Broker 服务器,支持桥接、集群等企业级特性。</p>
<h2 id="核心特性">核心特性</h2>
<h3 id="协议支持">协议支持</h3>
<ul>
<li><strong>MQTT 3.1.1</strong> - 完整支持</li>
<li><strong>MQTT 5.0</strong> - 完整支持(包括用户属性、消息过期、主题别名等新特性)</li>
<li><strong>MQTT-SN</strong> - 基于 UDP 的轻量级 MQTT 变体,适合受限设备</li>
<li><strong>CoAP</strong> - 约束应用协议网关支持</li>
</ul>
<h3 id="性能特性">性能特性</h3>
<ul>
<li>高性能异步实现</li>
<li>零不必要的内存分配</li>
<li>缓冲区池技术</li>
<li>支持 10000+ 并发连接</li>
</ul>
<h3 id="企业级功能">企业级功能</h3>
<ul>
<li>Broker 桥接(多 Broker 消息同步)</li>
<li>集群支持(去中心化 P2P 架构)</li>
<li>灵活的认证与授权机制</li>
<li>TLS/SSL 加密传输</li>
<li>持久会话与离线消息存储</li>
</ul>
<h3 id="框架支持">框架支持</h3>
<ul>
<li>.NET 6.0</li>
<li>.NET 8.0</li>
<li>.NET 10.0</li>
</ul>
<hr>
<h2 id="技术实现">技术实现</h2>
<p>本项目采用了大量现代 .NET 高性能技术,下面详细介绍核心技术点。</p>
<h3 id="内存管理技术">内存管理技术</h3>
<h4 id="spant-和-memoryt---零拷贝处理">Span&lt;T&gt; 和 Memory&lt;T&gt; - 零拷贝处理</h4>
<p>项目使用 <code>ref struct</code> 实现的二进制读写器,完全在栈上分配,避免堆内存压力:</p>
<pre><code class="language-csharp">// 零拷贝的二进制读取器
public ref struct MqttBinaryReader
{
    private readonly ReadOnlySpan&lt;byte&gt; _buffer;
    private int _position;

    // 零拷贝切片操作
   
    public ReadOnlySpan&lt;byte&gt; ReadBytes(int count)
    {
      var span = _buffer.Slice(_position, count);
      _position += count;
      return span;
    }
}
</code></pre>
<p><strong>技术优势</strong>:</p>
<ul>
<li><code>ref struct</code> 只能在栈上分配,无 GC 压力</li>
<li><code>ReadOnlySpan&lt;byte&gt;</code> 支持零拷贝切片</li>
<li>避免大量字节数组复制操作</li>
</ul>
<h4 id="arraypoolt---缓冲区复用">ArrayPool&lt;T&gt; - 缓冲区复用</h4>
<p>使用共享内存池减少频繁的内存分配:</p>
<pre><code class="language-csharp">// 从共享池租借缓冲区
var buffer = ArrayPool&lt;byte&gt;.Shared.Rent(1024);
try
{
    await stream.ReadAsync(buffer.AsMemory(0, length), cancellationToken);
    // 处理数据...
}
finally
{
    ArrayPool&lt;byte&gt;.Shared.Return(buffer);// 归还缓冲区
}
</code></pre>
<h4 id="stackalloc---小缓冲区栈分配">stackalloc - 小缓冲区栈分配</h4>
<p>对于小型临时缓冲区,直接在栈上分配:</p>
<pre><code class="language-csharp">// 4 字节的可变长度编码缓冲区,栈分配
Span&lt;byte&gt; remainingLengthBytes = stackalloc byte;
var size = EncodeRemainingLength(length, remainingLengthBytes);
</code></pre>
<h3 id="异步编程模型">异步编程模型</h3>
<h4 id="asyncawait--configureawait">async/await + ConfigureAwait</h4>
<p>所有 IO 操作均采用异步模式,并使用 <code>ConfigureAwait(false)</code> 优化:</p>
<pre><code class="language-csharp">public async Task&lt;MqttConnectResult&gt; ConnectAsync(CancellationToken cancellationToken = default)
{
    // 建立 TCP 连接
    await _tcpClient.ConnectAsync(host, port, cancellationToken).ConfigureAwait(false);

    // TLS 握手
    if (Options.UseTls)
    {
      await sslStream.AuthenticateAsClientAsync(sslOptions, cancellationToken).ConfigureAwait(false);
    }

    // 发送 CONNECT 报文
    await SendPacketAsync(connectPacket, cancellationToken).ConfigureAwait(false);
}
</code></pre>
<h4 id="channelt---高性能事件队列">Channel&lt;T&gt; - 高性能事件队列</h4>
<p>Broker 使用有界通道实现非阻塞的事件分发:</p>
<pre><code class="language-csharp">public sealed class MqttBrokerEventDispatcher
{
    private readonly Channel&lt;BrokerEvent&gt; _eventChannel;

    public MqttBrokerEventDispatcher(int capacity = 10000)
    {
      // 有界通道,队列满时丢弃最旧事件
      _eventChannel = Channel.CreateBounded&lt;BrokerEvent&gt;(new BoundedChannelOptions(capacity)
      {
            FullMode = BoundedChannelFullMode.DropOldest,
            SingleReader = true,
            SingleWriter = false
      });
    }

    // 非阻塞事件发送
    public void Dispatch&lt;TEventArgs&gt;(BrokerEventType type, TEventArgs args, EventHandler&lt;TEventArgs&gt;? handler)
    {
      _eventChannel.Writer.TryWrite(new BrokerEvent(type, args, handler));
    }
}
</code></pre>
<h4 id="taskcompletionsource---请求响应模式">TaskCompletionSource - 请求/响应模式</h4>
<p>实现 QoS 1/2 的确认等待机制:</p>
<pre><code class="language-csharp">private readonly Dictionary&lt;ushort, TaskCompletionSource&lt;object?&gt;&gt; _pendingPackets = new();

private async Task&lt;object?&gt; WaitForPacketAsync(ushort packetId, CancellationToken cancellationToken)
{
    var tcs = new TaskCompletionSource&lt;object?&gt;(TaskCreationOptions.RunContinuationsAsynchronously);
    _pendingPackets = tcs;

    using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
    cts.CancelAfter(TimeSpan.FromSeconds(30));// 30 秒超时

    using var registration = cts.Token.Register(() =&gt; tcs.TrySetCanceled());
    return await tcs.Task.ConfigureAwait(false);
}
</code></pre>
<h4 id="semaphoreslim---发送同步">SemaphoreSlim - 发送同步</h4>
<p>确保报文发送的串行化:</p>
<pre><code class="language-csharp">private readonly SemaphoreSlim _sendLock = new(1, 1);

private async Task SendPacketBytesAsync(byte[] packet, CancellationToken cancellationToken)
{
    await _sendLock.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
      await _stream.WriteAsync(packet.AsMemory(), cancellationToken).ConfigureAwait(false);
      await _stream.FlushAsync(cancellationToken).ConfigureAwait(false);
    }
    finally
    {
      _sendLock.Release();
    }
}
</code></pre>
<h3 id="编译器优化">编译器优化</h3>
<h4 id="methodimpl-特性">MethodImpl 特性</h4>
<p>针对不同场景使用合适的编译器优化指令:</p>
<pre><code class="language-csharp">// 强制内联 - 用于频繁调用的短方法

public ushort ReadUInt16()
{
    var value = (ushort)((_buffer &lt;&lt; 8) | _buffer);
    _position += 2;
    return value;
}

// 最积极的优化 - 用于热路径

private static async Task&lt;int&gt; DecodeRemainingLengthAsync(Stream stream, CancellationToken ct)
{
    // 可变长度解码实现...
}

// 禁止内联 - 避免异常处理代码膨胀热路径

private void ThrowIfDisposed()
{
    if (_disposed) throw new ObjectDisposedException(nameof(MqttClient));
}
</code></pre>
<h3 id="网络编程">网络编程</h3>
<h4 id="多层传输抽象">多层传输抽象</h4>
<p>支持 TCP、UDP 等多种传输方式:</p>
<pre><code class="language-csharp">public interface ITransportConnection : IAsyncDisposable
{
    string ConnectionId { get; }
    TransportType TransportType { get; }
    EndPoint? RemoteEndPoint { get; }
    bool IsConnected { get; }

    ValueTask&lt;int&gt; ReadAsync(Memory&lt;byte&gt; buffer, CancellationToken cancellationToken = default);
    ValueTask WriteAsync(ReadOnlyMemory&lt;byte&gt; buffer, CancellationToken cancellationToken = default);
    ValueTask FlushAsync(CancellationToken cancellationToken = default);
}
</code></pre>
<h4 id="tlsssl-支持">TLS/SSL 支持</h4>
<p>使用 SslStream 实现加密传输,支持 TLS 1.2 和 TLS 1.3:</p>
<pre><code class="language-csharp">var sslOptions = new SslClientAuthenticationOptions
{
    TargetHost = Options.Host,
    EnabledSslProtocols = SslProtocols.Tls12 | SslProtocols.Tls13,
    ClientCertificates = Options.ClientCertificate != null
      ? new X509CertificateCollection { Options.ClientCertificate }
      : null
};

await sslStream.AuthenticateAsClientAsync(sslOptions, cancellationToken);
</code></pre>
<h3 id="协议序列化">协议序列化</h3>
<h4 id="工厂模式--延迟初始化">工厂模式 + 延迟初始化</h4>
<p>协议处理器采用单例 + 延迟初始化模式:</p>
<pre><code class="language-csharp">public static class MqttProtocolHandlerFactory
{
    private static readonly Lazy&lt;IMqttProtocolHandler&gt; _v311Handler =
      new(() =&gt; new V311ProtocolHandler());
    private static readonly Lazy&lt;IMqttProtocolHandler&gt; _v500Handler =
      new(() =&gt; new V500ProtocolHandler());

    public static IMqttProtocolHandler GetHandler(MqttProtocolVersion version)
    {
      return version switch
      {
            MqttProtocolVersion.V311 =&gt; _v311Handler.Value,
            MqttProtocolVersion.V500 =&gt; _v500Handler.Value,
            _ =&gt; throw new NotSupportedException()
      };
    }
}
</code></pre>
<h4 id="可变长度整数编码">可变长度整数编码</h4>
<p>MQTT 协议特有的可变长度编码,1-4 字节可表示 0 到 268,435,455:</p>
<pre><code class="language-csharp">public uint ReadVariableByteInteger()
{
    uint value = 0;
    int multiplier = 1;
    byte encodedByte;

    do
    {
      encodedByte = _buffer;
      value += (uint)((encodedByte &amp; 0x7F) * multiplier);
      multiplier *= 128;
    } while ((encodedByte &amp; 0x80) != 0);

    return value;
}
</code></pre>
<h3 id="并发数据结构">并发数据结构</h3>
<h4 id="concurrentdictionary---线程安全集合">ConcurrentDictionary - 线程安全集合</h4>
<p>用于管理客户端会话和订阅:</p>
<pre><code class="language-csharp">private readonly ConcurrentDictionary&lt;string, MqttClientSession&gt; _sessions = new();
private readonly ConcurrentDictionary&lt;string, MqttApplicationMessage&gt; _retainedMessages = new();

public int ConnectedClients =&gt; _sessions.Count;
public IEnumerable&lt;MqttClientSession&gt; Sessions =&gt; _sessions.Values;
</code></pre>
<h3 id="设计模式应用">设计模式应用</h3>
<table>
<thead>
<tr>
<th>模式</th>
<th>应用场景</th>
<th>示例</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>工厂模式</strong></td>
<td>协议处理器创建</td>
<td><code>MqttProtocolHandlerFactory</code></td>
</tr>
<tr>
<td><strong>策略模式</strong></td>
<td>不同协议版本实现</td>
<td><code>V311ProtocolHandler</code> / <code>V500ProtocolHandler</code></td>
</tr>
<tr>
<td><strong>建造者模式</strong></td>
<td>报文构建</td>
<td><code>IPublishPacketBuilder</code> / <code>IConnectPacketBuilder</code></td>
</tr>
<tr>
<td><strong>观察者模式</strong></td>
<td>事件系统</td>
<td><code>MessageReceived</code> / <code>ClientConnected</code></td>
</tr>
<tr>
<td><strong>装饰器模式</strong></td>
<td>传输层 TLS</td>
<td><code>SslStream</code> 装饰 <code>NetworkStream</code></td>
</tr>
<tr>
<td><strong>单例模式</strong></td>
<td>协议处理器缓存</td>
<td>全局共享的处理器实例</td>
</tr>
</tbody>
</table>
<h3 id="技术栈总结">技术栈总结</h3>
<table>
<thead>
<tr>
<th>类别</th>
<th>技术</th>
<th>作用</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>内存管理</strong></td>
<td><code>Span&lt;T&gt;</code>, <code>Memory&lt;T&gt;</code>, <code>ref struct</code>, <code>ArrayPool&lt;T&gt;</code>, <code>stackalloc</code></td>
<td>零拷贝、栈分配、缓冲区复用</td>
</tr>
<tr>
<td><strong>异步编程</strong></td>
<td><code>async/await</code>, <code>Channel&lt;T&gt;</code>, <code>TaskCompletionSource</code>, <code>SemaphoreSlim</code></td>
<td>高效并发、非阻塞事件处理</td>
</tr>
<tr>
<td><strong>编译优化</strong></td>
<td><code>AggressiveInlining</code>, <code>AggressiveOptimization</code>, <code>NoInlining</code></td>
<td>JIT 编译器优化提示</td>
</tr>
<tr>
<td><strong>网络层</strong></td>
<td><code>TcpClient</code>, <code>TcpListener</code>, <code>SslStream</code>, 传输抽象</td>
<td>多协议支持、安全传输</td>
</tr>
<tr>
<td><strong>并发集合</strong></td>
<td><code>ConcurrentDictionary</code>, <code>ConcurrentQueue</code></td>
<td>线程安全的数据结构</td>
</tr>
<tr>
<td><strong>序列化</strong></td>
<td>自定义二进制读写器、可变长度编码</td>
<td>高效的协议解析</td>
</tr>
</tbody>
</table>
<hr>
<h2 id="性能优化建议">性能优化建议</h2>
<h3 id="客户端优化">客户端优化</h3>
<ol>
<li><strong>选择合适的 QoS</strong>:大多数场景 QoS 1 就足够了,QoS 2 开销较大</li>
<li><strong>批量发送</strong>:如果有大量消息,考虑合并后发送</li>
<li><strong>合理设置 KeepAlive</strong>:根据网络环境调整,一般 60 秒即可</li>
<li><strong>使用持久会话</strong>:如果需要接收离线消息,设置 <code>CleanSession = false</code></li>
</ol>
<h3 id="broker-优化">Broker 优化</h3>
<ol>
<li><strong>调整最大连接数</strong>:根据服务器性能设置 <code>MaxConnections</code></li>
<li><strong>限制消息大小</strong>:设置 <code>MaxMessageSize</code> 防止恶意大消息</li>
<li><strong>离线消息限制</strong>:设置 <code>MaxOfflineMessagesPerClient</code> 防止内存溢出</li>
<li><strong>使用集群</strong>:高可用场景使用集群部署</li>
</ol>
<hr>
<h2 id="客户端使用指南">客户端使用指南</h2>
<h3 id="基础连接">基础连接</h3>
<pre><code class="language-csharp">using System.Net.MQTT;

// 配置客户端选项
var options = new MqttClientOptions
{
    Host = "localhost",
    Port = 1883,
    ClientId = "my-iot-device",
    CleanSession = true
};

// 创建客户端
using var client = new MqttClient(options);

// 连接到 Broker
var result = await client.ConnectAsync();

if (result.IsSuccess)
{
    Console.WriteLine("连接成功!");
}
</code></pre>
<h3 id="订阅主题">订阅主题</h3>
<pre><code class="language-csharp">// 订阅单个主题
await client.SubscribeAsync("sensors/temperature", MqttQualityOfService.AtLeastOnce);

// 使用通配符订阅多个主题
await client.SubscribeAsync("sensors/#", MqttQualityOfService.AtLeastOnce);// 多级通配符
await client.SubscribeAsync("sensors/+/status", MqttQualityOfService.AtMostOnce);// 单级通配符
</code></pre>
<h3 id="接收消息">接收消息</h3>
<pre><code class="language-csharp">client.MessageReceived += (sender, e) =&gt;
{
    Console.WriteLine($"收到消息:");
    Console.WriteLine($"主题: {e.Message.Topic}");
    Console.WriteLine($"内容: {e.Message.PayloadAsString}");
    Console.WriteLine($"QoS: {e.Message.QualityOfService}");
};
</code></pre>
<h3 id="发布消息">发布消息</h3>
<pre><code class="language-csharp">// 简单发布
await client.PublishAsync("sensors/temperature", "25.5");

// 指定 QoS 发布
await client.PublishAsync("sensors/humidity", "60%", MqttQualityOfService.AtLeastOnce);

// 发布保留消息
await client.PublishAsync("device/status", "online", MqttQualityOfService.AtLeastOnce, retain: true);

// 使用完整的消息对象
var message = MqttApplicationMessage.Create(
    topic: "sensors/data",
    payload: "{\"temp\": 25.5, \"humidity\": 60}",
    qos: MqttQualityOfService.ExactlyOnce,
    retain: false
);
await client.PublishAsync(message);
</code></pre>
<h3 id="遗嘱消息last-will">遗嘱消息(Last Will)</h3>
<p>遗嘱消息会在客户端异常断开时自动发布:</p>
<pre><code class="language-csharp">var options = new MqttClientOptions
{
    Host = "localhost",
    ClientId = "my-device",
    WillMessage = MqttApplicationMessage.Create(
      topic: "devices/my-device/status",
      payload: "offline",
      qos: MqttQualityOfService.AtLeastOnce,
      retain: true
    )
};
</code></pre>
<h3 id="tls-加密连接">TLS 加密连接</h3>
<pre><code class="language-csharp">var options = new MqttClientOptions
{
    Host = "secure-broker.example.com",
    Port = 8883,
    UseTls = true,
    // 可选:客户端证书
    ClientCertificate = new X509Certificate2("client.pfx", "password")
};
</code></pre>
<h3 id="自动重连">自动重连</h3>
<pre><code class="language-csharp">var options = new MqttClientOptions
{
    Host = "localhost",
    AutoReconnect = true,
    ReconnectDelayMs = 5000// 5秒后重连
};

client.Connected += (s, e) =&gt; Console.WriteLine("已连接");
client.Disconnected += (s, e) =&gt; Console.WriteLine("连接断开,正在重连...");
</code></pre>
<h3 id="完整客户端示例">完整客户端示例</h3>
<pre><code class="language-csharp">using System.Net.MQTT;

var options = new MqttClientOptions
{
    Host = "localhost",
    Port = 1883,
    ClientId = $"client-{Guid.NewGuid():N}",
    Username = "user",
    Password = "password",
    CleanSession = true,
    KeepAliveSeconds = 60,
    AutoReconnect = true
};

using var client = new MqttClient(options);

// 设置事件处理
client.Connected += (s, e) =&gt; Console.WriteLine("[事件] 已连接到 Broker");
client.Disconnected += (s, e) =&gt; Console.WriteLine("[事件] 连接已断开");
client.MessageReceived += (s, e) =&gt;
{
    Console.WriteLine($"[消息] {e.Message.Topic}: {e.Message.PayloadAsString}");
};

// 连接
var result = await client.ConnectAsync();
if (!result.IsSuccess)
{
    Console.WriteLine($"连接失败: {result.ReasonCode}");
    return;
}

// 订阅
await client.SubscribeAsync("test/#", MqttQualityOfService.AtLeastOnce);

// 发布测试消息
for (int i = 0; i &lt; 10; i++)
{
    await client.PublishAsync("test/counter", i.ToString());
    await Task.Delay(1000);
}

// 断开连接
await client.DisconnectAsync();
</code></pre>
<hr>
<h2 id="服务器broker使用指南">服务器(Broker)使用指南</h2>
<h3 id="启动基础-broker">启动基础 Broker</h3>
<pre><code class="language-csharp">using System.Net.MQTT.Broker;

var options = new MqttBrokerOptions
{
    Port = 1883,
    AllowAnonymous = true,
    EnableRetainedMessages = true,
    MaxConnections = 10000
};

using var broker = new MqttBroker(options);

// 启动服务器
await broker.StartAsync();
Console.WriteLine("MQTT Broker 已启动,监听端口 1883");

// 保持运行
await Task.Delay(Timeout.Infinite);

// 停止服务器
await broker.StopAsync();
</code></pre>
<h3 id="配置认证">配置认证</h3>
<pre><code class="language-csharp">// 使用简单认证器
broker.Authenticator = new SimpleAuthenticator()
    .AddUser("admin", "admin123")
    .AddUser("device1", "device1pass")
    .AddUser("device2", "device2pass");

var options = new MqttBrokerOptions
{
    Port = 1883,
    AllowAnonymous = false// 禁用匿名访问
};
</code></pre>
<h3 id="自定义认证器">自定义认证器</h3>
<pre><code class="language-csharp">public class MyAuthenticator : IMqttAuthenticator
{
    public Task&lt;MqttAuthenticationResult&gt; AuthenticateAsync(
      MqttAuthenticationContext context,
      CancellationToken cancellationToken)
    {
      // 从数据库验证用户
      if (ValidateFromDatabase(context.Username, context.Password))
      {
            return Task.FromResult(MqttAuthenticationResult.Success());
      }

      return Task.FromResult(MqttAuthenticationResult.Failure(
            MqttConnectReasonCode.BadUserNameOrPassword));
    }
}

broker.Authenticator = new MyAuthenticator();
</code></pre>
<h3 id="broker-事件处理">Broker 事件处理</h3>
<pre><code class="language-csharp">// 客户端连接事件
broker.ClientConnected += (s, e) =&gt;
{
    Console.WriteLine($"[连接] 客户端 {e.Session.ClientId} 已连接");
    Console.WriteLine($"地址: {e.Session.RemoteEndpoint}");
    Console.WriteLine($"当前连接数: {broker.ConnectedClients}");
};

// 客户端断开事件
broker.ClientDisconnected += (s, e) =&gt;
{
    Console.WriteLine($"[断开] 客户端 {e.Session.ClientId} 已断开");
};

// 客户端订阅事件
broker.ClientSubscribed += (s, e) =&gt;
{
    Console.WriteLine($"[订阅] {e.Session.ClientId} 订阅了 {e.TopicFilter}");
};

// 消息发布事件
broker.MessagePublished += (s, e) =&gt;
{
    Console.WriteLine($"[消息] {e.Message.Topic}: {e.Message.PayloadAsString}");
    Console.WriteLine($"来自: {e.SourceClientId}");
};

// 消息发布前拦截(可以阻止消息发布)
broker.MessagePublishing += (s, e) =&gt;
{
    // 检查敏感主题
    if (e.Message.Topic.StartsWith("admin/") &amp;&amp; e.SourceClientId != "admin")
    {
      e.Cancel = true;// 阻止非管理员发布到 admin 主题
    }
};
</code></pre>
<h3 id="tls-配置">TLS 配置</h3>
<pre><code class="language-csharp">var options = new MqttBrokerOptions
{
    // 普通端口
    Port = 1883,

    // TLS 端口
    UseTls = true,
    TlsPort = 8883,
    ServerCertificate = new X509Certificate2("server.pfx", "password"),
    RequireClientCertificate = false
};
</code></pre>
<hr>
<h2 id="高级功能">高级功能</h2>
<h3 id="broker-桥接">Broker 桥接</h3>
<p>桥接功能允许将多个 Broker 连接起来,实现消息的跨 Broker 同步。</p>
<pre><code class="language-csharp">var broker = new MqttBroker(new MqttBrokerOptions { Port = 2883 });

// 添加桥接到父 Broker
var bridge = broker.AddBridge(new MqttBridgeOptions
{
    Name = "parent-bridge",
    RemoteHost = "parent-broker.example.com",
    RemotePort = 1883,
    ClientId = "bridge-client-1",

    // 上行规则:本地消息 -&gt; 远程 Broker
    UpstreamRules =
    {
      new MqttBridgeRule { LocalTopicFilter = "sensor/#", Enabled = true },
      new MqttBridgeRule { LocalTopicFilter = "device/+/data", Enabled = true }
    },

    // 下行规则:远程消息 -&gt; 本地
    DownstreamRules =
    {
      new MqttBridgeRule { LocalTopicFilter = "commands/#", Enabled = true },
      new MqttBridgeRule { LocalTopicFilter = "config/#", Enabled = true }
    }
});

// 桥接事件
bridge.Connected += (s, e) =&gt; Console.WriteLine("桥接已连接");
bridge.MessageForwarded += (s, e) =&gt;
{
    var direction = e.Direction == BridgeDirection.Upstream ? "上行" : "下行";
    Console.WriteLine($"[桥接-{direction}] {e.OriginalTopic}");
};

// 获取统计信息
var stats = bridge.GetStatistics();
Console.WriteLine($"上行消息: {stats.UpstreamMessageCount}");
Console.WriteLine($"下行消息: {stats.DownstreamMessageCount}");
</code></pre>
<h3 id="集群部署">集群部署</h3>
<p>集群功能实现了去中心化的 P2P 架构,任何节点都可以独立运行,支持自动故障检测和恢复。</p>
<pre><code class="language-csharp">var broker = new MqttBroker(new MqttBrokerOptions { Port = 1883 });

// 启用集群
broker.EnableCluster(new MqttClusterOptions
{
    NodeId = "node-1",
    ClusterName = "my-cluster",
    ClusterPort = 11883,
    SeedNodes = new List&lt;string&gt;
    {
      "node2.example.com:11883",
      "node3.example.com:11883"
    },
    HeartbeatIntervalMs = 5000,
    NodeTimeoutMs = 15000,
    EnableDeduplication = true// 防止消息重复
});

// 集群事件
broker.Cluster!.PeerJoined += (s, e) =&gt;
    Console.WriteLine($"节点加入: {e.Peer.NodeId}");

broker.Cluster!.PeerLeft += (s, e) =&gt;
    Console.WriteLine($"节点离开: {e.Peer.NodeId}");

broker.Cluster!.MessageForwarded += (s, e) =&gt;
    Console.WriteLine($"消息转发: {e.Topic}");

await broker.StartAsync();
</code></pre>
<h3 id="mqtt-sn-网关">MQTT-SN 网关</h3>
<p>MQTT-SN 是基于 UDP 的轻量级协议,适合资源受限的嵌入式设备:</p>
<pre><code class="language-csharp">var options = new MqttBrokerOptions
{
    Port = 1883,
    EnableMqttSn = true,
    MqttSnPort = 1885
};
</code></pre>
<h3 id="coap-网关">CoAP 网关</h3>
<p>CoAP 网关允许 CoAP 设备与 MQTT 生态系统互通:</p>
<pre><code class="language-csharp">var options = new MqttBrokerOptions
{
    Port = 1883,
    EnableCoAP = true,
    CoapPort = 5683,
    CoapMqttPrefix = "mqtt"
};

// CoAP 客户端可以通过以下方式访问 MQTT 主题:
// GET coap://broker:5683/mqtt/sensors/temperature
// PUT coap://broker:5683/mqtt/sensors/temperature (发布消息)
</code></pre>
<hr>
<h2 id="qos-服务质量">QoS 服务质量</h2>
<p>MQTT 定义了三种服务质量级别:</p>
<table>
<thead>
<tr>
<th>QoS</th>
<th>名称</th>
<th>说明</th>
<th>适用场景</th>
</tr>
</thead>
<tbody>
<tr>
<td>0</td>
<td>At Most Once</td>
<td>最多一次,不保证送达</td>
<td>传感器数据,丢失可接受</td>
</tr>
<tr>
<td>1</td>
<td>At Least Once</td>
<td>至少一次,可能重复</td>
<td>重要数据,可处理重复</td>
</tr>
<tr>
<td>2</td>
<td>Exactly Once</td>
<td>恰好一次,保证送达且不重复</td>
<td>计费、订单等关键数据</td>
</tr>
</tbody>
</table>
<pre><code class="language-csharp">// QoS 0 - 最多一次
await client.PublishAsync("sensor/temp", "25", MqttQualityOfService.AtMostOnce);

// QoS 1 - 至少一次
await client.PublishAsync("alert/fire", "detected", MqttQualityOfService.AtLeastOnce);

// QoS 2 - 恰好一次
await client.PublishAsync("order/create", orderJson, MqttQualityOfService.ExactlyOnce);
</code></pre>
<hr>
<h2 id="主题通配符">主题通配符</h2>
<table>
<thead>
<tr>
<th>通配符</th>
<th>说明</th>
<th>示例</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>+</code></td>
<td>匹配单个层级</td>
<td><code>sensor/+/temp</code> 匹配 <code>sensor/room1/temp</code></td>
</tr>
<tr>
<td><code>#</code></td>
<td>匹配多个层级</td>
<td><code>sensor/#</code> 匹配 <code>sensor/room1/temp</code> 和 <code>sensor/room1/humidity</code></td>
</tr>
</tbody>
</table>
<pre><code class="language-csharp">// 订阅所有房间的温度
await client.SubscribeAsync("sensor/+/temperature", MqttQualityOfService.AtLeastOnce);

// 订阅所有传感器数据
await client.SubscribeAsync("sensor/#", MqttQualityOfService.AtLeastOnce);
</code></pre>
<hr>
<h2 id="mqtt-50-新特性">MQTT 5.0 新特性</h2>
<p>如果你使用 MQTT 5.0 协议,可以利用以下新特性:</p>
<pre><code class="language-csharp">var options = new MqttClientOptions
{
    Host = "localhost",
    ProtocolVersion = MqttProtocolVersion.V500// 使用 MQTT 5.0
};

// 创建带有 MQTT 5.0 属性的消息
var message = MqttApplicationMessage.CreateWithProperties(
    topic: "request/data",
    payload: "{\"query\": \"temperature\"}",
    qos: MqttQualityOfService.AtLeastOnce,
    retain: false,

    // MQTT 5.0 特有属性
    responseTopic: "response/client1",         // 响应主题
    correlationData: Encoding.UTF8.GetBytes("req-123"),// 关联数据
    messageExpiryInterval: 60,                   // 消息60秒后过期
    contentType: "application/json",             // 内容类型
    userProperties: new List&lt;MqttUserProperty&gt;   // 用户自定义属性
    {
      new MqttUserProperty("version", "1.0"),
      new MqttUserProperty("source", "sensor-hub")
    }
);

await client.PublishAsync(message);
</code></pre>
<hr>
<h2 id="总结">总结</h2>
<p>是一个功能完整、性能优秀的 .NET MQTT 库,具有以下优势:</p>
<ul>
<li><strong>完整的协议支持</strong>:MQTT 3.1.1、MQTT 5.0、MQTT-SN、CoAP 全覆盖</li>
<li><strong>高性能设计</strong>:异步 IO、零分配、缓冲区池</li>
<li><strong>企业级特性</strong>:桥接、集群、认证授权</li>
<li><strong>易于使用</strong>:简洁的 API 设计,丰富的示例代码</li>
<li><strong>现代化</strong>:支持最新的 .NET 版本</li>
</ul>
<p>无论你是构建 IoT 平台、实现设备通信,还是搭建消息中间件,这个库都能满足你的需求。</p>
<h2 id="相关链接">相关链接</h2>
<p>源码地址:https://github.com/hnlyf1688/mqtt</p>


</div>
<div id="MySignature" role="contentinfo">
    中国.NET协会(http://www.dotnet.org.cn)<br/>
腾讯企鹅群:45132984<img border="0" src="http://pub.idqqimg.com/wpa/images/group.png" alt="中国.NET协会" title="中国.NET协会"><br/>
博客园地址:http://http://www.cnblogs.com/dotnet-org-cn<br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;国内唯一一个以非盈利的.NET协会,致力打造国内具有权威性、价值性的.NET协会。<br><br>
来源:https://www.cnblogs.com/dotnet-org-cn/p/19473369
頁: [1]
查看完整版本: 一个高性能的 .NET MQTT 客户端与服务器库