一个高性能的 .NET MQTT 客户端与服务器库
<h2 id="前言">前言</h2><p>在物联网(IoT)蓬勃发展的今天,MQTT 协议已经成为设备通信的事实标准。无论是智能家居、工业自动化还是车联网,MQTT 都扮演着至关重要的角色。今天,我要为大家介绍一个完全使用 C# 实现的高性能 MQTT 库</p>
<p>这个库不仅提供了完整的 MQTT 客户端实现,还包含了一个功能齐全的 Broker 服务器,支持桥接、集群等企业级特性。</p>
<h2 id="核心特性">核心特性</h2>
<h3 id="协议支持">协议支持</h3>
<ul>
<li><strong>MQTT 3.1.1</strong> - 完整支持</li>
<li><strong>MQTT 5.0</strong> - 完整支持(包括用户属性、消息过期、主题别名等新特性)</li>
<li><strong>MQTT-SN</strong> - 基于 UDP 的轻量级 MQTT 变体,适合受限设备</li>
<li><strong>CoAP</strong> - 约束应用协议网关支持</li>
</ul>
<h3 id="性能特性">性能特性</h3>
<ul>
<li>高性能异步实现</li>
<li>零不必要的内存分配</li>
<li>缓冲区池技术</li>
<li>支持 10000+ 并发连接</li>
</ul>
<h3 id="企业级功能">企业级功能</h3>
<ul>
<li>Broker 桥接(多 Broker 消息同步)</li>
<li>集群支持(去中心化 P2P 架构)</li>
<li>灵活的认证与授权机制</li>
<li>TLS/SSL 加密传输</li>
<li>持久会话与离线消息存储</li>
</ul>
<h3 id="框架支持">框架支持</h3>
<ul>
<li>.NET 6.0</li>
<li>.NET 8.0</li>
<li>.NET 10.0</li>
</ul>
<hr>
<h2 id="技术实现">技术实现</h2>
<p>本项目采用了大量现代 .NET 高性能技术,下面详细介绍核心技术点。</p>
<h3 id="内存管理技术">内存管理技术</h3>
<h4 id="spant-和-memoryt---零拷贝处理">Span<T> 和 Memory<T> - 零拷贝处理</h4>
<p>项目使用 <code>ref struct</code> 实现的二进制读写器,完全在栈上分配,避免堆内存压力:</p>
<pre><code class="language-csharp">// 零拷贝的二进制读取器
public ref struct MqttBinaryReader
{
private readonly ReadOnlySpan<byte> _buffer;
private int _position;
// 零拷贝切片操作
public ReadOnlySpan<byte> ReadBytes(int count)
{
var span = _buffer.Slice(_position, count);
_position += count;
return span;
}
}
</code></pre>
<p><strong>技术优势</strong>:</p>
<ul>
<li><code>ref struct</code> 只能在栈上分配,无 GC 压力</li>
<li><code>ReadOnlySpan<byte></code> 支持零拷贝切片</li>
<li>避免大量字节数组复制操作</li>
</ul>
<h4 id="arraypoolt---缓冲区复用">ArrayPool<T> - 缓冲区复用</h4>
<p>使用共享内存池减少频繁的内存分配:</p>
<pre><code class="language-csharp">// 从共享池租借缓冲区
var buffer = ArrayPool<byte>.Shared.Rent(1024);
try
{
await stream.ReadAsync(buffer.AsMemory(0, length), cancellationToken);
// 处理数据...
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);// 归还缓冲区
}
</code></pre>
<h4 id="stackalloc---小缓冲区栈分配">stackalloc - 小缓冲区栈分配</h4>
<p>对于小型临时缓冲区,直接在栈上分配:</p>
<pre><code class="language-csharp">// 4 字节的可变长度编码缓冲区,栈分配
Span<byte> remainingLengthBytes = stackalloc byte;
var size = EncodeRemainingLength(length, remainingLengthBytes);
</code></pre>
<h3 id="异步编程模型">异步编程模型</h3>
<h4 id="asyncawait--configureawait">async/await + ConfigureAwait</h4>
<p>所有 IO 操作均采用异步模式,并使用 <code>ConfigureAwait(false)</code> 优化:</p>
<pre><code class="language-csharp">public async Task<MqttConnectResult> ConnectAsync(CancellationToken cancellationToken = default)
{
// 建立 TCP 连接
await _tcpClient.ConnectAsync(host, port, cancellationToken).ConfigureAwait(false);
// TLS 握手
if (Options.UseTls)
{
await sslStream.AuthenticateAsClientAsync(sslOptions, cancellationToken).ConfigureAwait(false);
}
// 发送 CONNECT 报文
await SendPacketAsync(connectPacket, cancellationToken).ConfigureAwait(false);
}
</code></pre>
<h4 id="channelt---高性能事件队列">Channel<T> - 高性能事件队列</h4>
<p>Broker 使用有界通道实现非阻塞的事件分发:</p>
<pre><code class="language-csharp">public sealed class MqttBrokerEventDispatcher
{
private readonly Channel<BrokerEvent> _eventChannel;
public MqttBrokerEventDispatcher(int capacity = 10000)
{
// 有界通道,队列满时丢弃最旧事件
_eventChannel = Channel.CreateBounded<BrokerEvent>(new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.DropOldest,
SingleReader = true,
SingleWriter = false
});
}
// 非阻塞事件发送
public void Dispatch<TEventArgs>(BrokerEventType type, TEventArgs args, EventHandler<TEventArgs>? handler)
{
_eventChannel.Writer.TryWrite(new BrokerEvent(type, args, handler));
}
}
</code></pre>
<h4 id="taskcompletionsource---请求响应模式">TaskCompletionSource - 请求/响应模式</h4>
<p>实现 QoS 1/2 的确认等待机制:</p>
<pre><code class="language-csharp">private readonly Dictionary<ushort, TaskCompletionSource<object?>> _pendingPackets = new();
private async Task<object?> WaitForPacketAsync(ushort packetId, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);
_pendingPackets = tcs;
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(TimeSpan.FromSeconds(30));// 30 秒超时
using var registration = cts.Token.Register(() => tcs.TrySetCanceled());
return await tcs.Task.ConfigureAwait(false);
}
</code></pre>
<h4 id="semaphoreslim---发送同步">SemaphoreSlim - 发送同步</h4>
<p>确保报文发送的串行化:</p>
<pre><code class="language-csharp">private readonly SemaphoreSlim _sendLock = new(1, 1);
private async Task SendPacketBytesAsync(byte[] packet, CancellationToken cancellationToken)
{
await _sendLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
await _stream.WriteAsync(packet.AsMemory(), cancellationToken).ConfigureAwait(false);
await _stream.FlushAsync(cancellationToken).ConfigureAwait(false);
}
finally
{
_sendLock.Release();
}
}
</code></pre>
<h3 id="编译器优化">编译器优化</h3>
<h4 id="methodimpl-特性">MethodImpl 特性</h4>
<p>针对不同场景使用合适的编译器优化指令:</p>
<pre><code class="language-csharp">// 强制内联 - 用于频繁调用的短方法
public ushort ReadUInt16()
{
var value = (ushort)((_buffer << 8) | _buffer);
_position += 2;
return value;
}
// 最积极的优化 - 用于热路径
private static async Task<int> DecodeRemainingLengthAsync(Stream stream, CancellationToken ct)
{
// 可变长度解码实现...
}
// 禁止内联 - 避免异常处理代码膨胀热路径
private void ThrowIfDisposed()
{
if (_disposed) throw new ObjectDisposedException(nameof(MqttClient));
}
</code></pre>
<h3 id="网络编程">网络编程</h3>
<h4 id="多层传输抽象">多层传输抽象</h4>
<p>支持 TCP、UDP 等多种传输方式:</p>
<pre><code class="language-csharp">public interface ITransportConnection : IAsyncDisposable
{
string ConnectionId { get; }
TransportType TransportType { get; }
EndPoint? RemoteEndPoint { get; }
bool IsConnected { get; }
ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default);
ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default);
ValueTask FlushAsync(CancellationToken cancellationToken = default);
}
</code></pre>
<h4 id="tlsssl-支持">TLS/SSL 支持</h4>
<p>使用 SslStream 实现加密传输,支持 TLS 1.2 和 TLS 1.3:</p>
<pre><code class="language-csharp">var sslOptions = new SslClientAuthenticationOptions
{
TargetHost = Options.Host,
EnabledSslProtocols = SslProtocols.Tls12 | SslProtocols.Tls13,
ClientCertificates = Options.ClientCertificate != null
? new X509CertificateCollection { Options.ClientCertificate }
: null
};
await sslStream.AuthenticateAsClientAsync(sslOptions, cancellationToken);
</code></pre>
<h3 id="协议序列化">协议序列化</h3>
<h4 id="工厂模式--延迟初始化">工厂模式 + 延迟初始化</h4>
<p>协议处理器采用单例 + 延迟初始化模式:</p>
<pre><code class="language-csharp">public static class MqttProtocolHandlerFactory
{
private static readonly Lazy<IMqttProtocolHandler> _v311Handler =
new(() => new V311ProtocolHandler());
private static readonly Lazy<IMqttProtocolHandler> _v500Handler =
new(() => new V500ProtocolHandler());
public static IMqttProtocolHandler GetHandler(MqttProtocolVersion version)
{
return version switch
{
MqttProtocolVersion.V311 => _v311Handler.Value,
MqttProtocolVersion.V500 => _v500Handler.Value,
_ => throw new NotSupportedException()
};
}
}
</code></pre>
<h4 id="可变长度整数编码">可变长度整数编码</h4>
<p>MQTT 协议特有的可变长度编码,1-4 字节可表示 0 到 268,435,455:</p>
<pre><code class="language-csharp">public uint ReadVariableByteInteger()
{
uint value = 0;
int multiplier = 1;
byte encodedByte;
do
{
encodedByte = _buffer;
value += (uint)((encodedByte & 0x7F) * multiplier);
multiplier *= 128;
} while ((encodedByte & 0x80) != 0);
return value;
}
</code></pre>
<h3 id="并发数据结构">并发数据结构</h3>
<h4 id="concurrentdictionary---线程安全集合">ConcurrentDictionary - 线程安全集合</h4>
<p>用于管理客户端会话和订阅:</p>
<pre><code class="language-csharp">private readonly ConcurrentDictionary<string, MqttClientSession> _sessions = new();
private readonly ConcurrentDictionary<string, MqttApplicationMessage> _retainedMessages = new();
public int ConnectedClients => _sessions.Count;
public IEnumerable<MqttClientSession> Sessions => _sessions.Values;
</code></pre>
<h3 id="设计模式应用">设计模式应用</h3>
<table>
<thead>
<tr>
<th>模式</th>
<th>应用场景</th>
<th>示例</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>工厂模式</strong></td>
<td>协议处理器创建</td>
<td><code>MqttProtocolHandlerFactory</code></td>
</tr>
<tr>
<td><strong>策略模式</strong></td>
<td>不同协议版本实现</td>
<td><code>V311ProtocolHandler</code> / <code>V500ProtocolHandler</code></td>
</tr>
<tr>
<td><strong>建造者模式</strong></td>
<td>报文构建</td>
<td><code>IPublishPacketBuilder</code> / <code>IConnectPacketBuilder</code></td>
</tr>
<tr>
<td><strong>观察者模式</strong></td>
<td>事件系统</td>
<td><code>MessageReceived</code> / <code>ClientConnected</code></td>
</tr>
<tr>
<td><strong>装饰器模式</strong></td>
<td>传输层 TLS</td>
<td><code>SslStream</code> 装饰 <code>NetworkStream</code></td>
</tr>
<tr>
<td><strong>单例模式</strong></td>
<td>协议处理器缓存</td>
<td>全局共享的处理器实例</td>
</tr>
</tbody>
</table>
<h3 id="技术栈总结">技术栈总结</h3>
<table>
<thead>
<tr>
<th>类别</th>
<th>技术</th>
<th>作用</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>内存管理</strong></td>
<td><code>Span<T></code>, <code>Memory<T></code>, <code>ref struct</code>, <code>ArrayPool<T></code>, <code>stackalloc</code></td>
<td>零拷贝、栈分配、缓冲区复用</td>
</tr>
<tr>
<td><strong>异步编程</strong></td>
<td><code>async/await</code>, <code>Channel<T></code>, <code>TaskCompletionSource</code>, <code>SemaphoreSlim</code></td>
<td>高效并发、非阻塞事件处理</td>
</tr>
<tr>
<td><strong>编译优化</strong></td>
<td><code>AggressiveInlining</code>, <code>AggressiveOptimization</code>, <code>NoInlining</code></td>
<td>JIT 编译器优化提示</td>
</tr>
<tr>
<td><strong>网络层</strong></td>
<td><code>TcpClient</code>, <code>TcpListener</code>, <code>SslStream</code>, 传输抽象</td>
<td>多协议支持、安全传输</td>
</tr>
<tr>
<td><strong>并发集合</strong></td>
<td><code>ConcurrentDictionary</code>, <code>ConcurrentQueue</code></td>
<td>线程安全的数据结构</td>
</tr>
<tr>
<td><strong>序列化</strong></td>
<td>自定义二进制读写器、可变长度编码</td>
<td>高效的协议解析</td>
</tr>
</tbody>
</table>
<hr>
<h2 id="性能优化建议">性能优化建议</h2>
<h3 id="客户端优化">客户端优化</h3>
<ol>
<li><strong>选择合适的 QoS</strong>:大多数场景 QoS 1 就足够了,QoS 2 开销较大</li>
<li><strong>批量发送</strong>:如果有大量消息,考虑合并后发送</li>
<li><strong>合理设置 KeepAlive</strong>:根据网络环境调整,一般 60 秒即可</li>
<li><strong>使用持久会话</strong>:如果需要接收离线消息,设置 <code>CleanSession = false</code></li>
</ol>
<h3 id="broker-优化">Broker 优化</h3>
<ol>
<li><strong>调整最大连接数</strong>:根据服务器性能设置 <code>MaxConnections</code></li>
<li><strong>限制消息大小</strong>:设置 <code>MaxMessageSize</code> 防止恶意大消息</li>
<li><strong>离线消息限制</strong>:设置 <code>MaxOfflineMessagesPerClient</code> 防止内存溢出</li>
<li><strong>使用集群</strong>:高可用场景使用集群部署</li>
</ol>
<hr>
<h2 id="客户端使用指南">客户端使用指南</h2>
<h3 id="基础连接">基础连接</h3>
<pre><code class="language-csharp">using System.Net.MQTT;
// 配置客户端选项
var options = new MqttClientOptions
{
Host = "localhost",
Port = 1883,
ClientId = "my-iot-device",
CleanSession = true
};
// 创建客户端
using var client = new MqttClient(options);
// 连接到 Broker
var result = await client.ConnectAsync();
if (result.IsSuccess)
{
Console.WriteLine("连接成功!");
}
</code></pre>
<h3 id="订阅主题">订阅主题</h3>
<pre><code class="language-csharp">// 订阅单个主题
await client.SubscribeAsync("sensors/temperature", MqttQualityOfService.AtLeastOnce);
// 使用通配符订阅多个主题
await client.SubscribeAsync("sensors/#", MqttQualityOfService.AtLeastOnce);// 多级通配符
await client.SubscribeAsync("sensors/+/status", MqttQualityOfService.AtMostOnce);// 单级通配符
</code></pre>
<h3 id="接收消息">接收消息</h3>
<pre><code class="language-csharp">client.MessageReceived += (sender, e) =>
{
Console.WriteLine($"收到消息:");
Console.WriteLine($"主题: {e.Message.Topic}");
Console.WriteLine($"内容: {e.Message.PayloadAsString}");
Console.WriteLine($"QoS: {e.Message.QualityOfService}");
};
</code></pre>
<h3 id="发布消息">发布消息</h3>
<pre><code class="language-csharp">// 简单发布
await client.PublishAsync("sensors/temperature", "25.5");
// 指定 QoS 发布
await client.PublishAsync("sensors/humidity", "60%", MqttQualityOfService.AtLeastOnce);
// 发布保留消息
await client.PublishAsync("device/status", "online", MqttQualityOfService.AtLeastOnce, retain: true);
// 使用完整的消息对象
var message = MqttApplicationMessage.Create(
topic: "sensors/data",
payload: "{\"temp\": 25.5, \"humidity\": 60}",
qos: MqttQualityOfService.ExactlyOnce,
retain: false
);
await client.PublishAsync(message);
</code></pre>
<h3 id="遗嘱消息last-will">遗嘱消息(Last Will)</h3>
<p>遗嘱消息会在客户端异常断开时自动发布:</p>
<pre><code class="language-csharp">var options = new MqttClientOptions
{
Host = "localhost",
ClientId = "my-device",
WillMessage = MqttApplicationMessage.Create(
topic: "devices/my-device/status",
payload: "offline",
qos: MqttQualityOfService.AtLeastOnce,
retain: true
)
};
</code></pre>
<h3 id="tls-加密连接">TLS 加密连接</h3>
<pre><code class="language-csharp">var options = new MqttClientOptions
{
Host = "secure-broker.example.com",
Port = 8883,
UseTls = true,
// 可选:客户端证书
ClientCertificate = new X509Certificate2("client.pfx", "password")
};
</code></pre>
<h3 id="自动重连">自动重连</h3>
<pre><code class="language-csharp">var options = new MqttClientOptions
{
Host = "localhost",
AutoReconnect = true,
ReconnectDelayMs = 5000// 5秒后重连
};
client.Connected += (s, e) => Console.WriteLine("已连接");
client.Disconnected += (s, e) => Console.WriteLine("连接断开,正在重连...");
</code></pre>
<h3 id="完整客户端示例">完整客户端示例</h3>
<pre><code class="language-csharp">using System.Net.MQTT;
var options = new MqttClientOptions
{
Host = "localhost",
Port = 1883,
ClientId = $"client-{Guid.NewGuid():N}",
Username = "user",
Password = "password",
CleanSession = true,
KeepAliveSeconds = 60,
AutoReconnect = true
};
using var client = new MqttClient(options);
// 设置事件处理
client.Connected += (s, e) => Console.WriteLine("[事件] 已连接到 Broker");
client.Disconnected += (s, e) => Console.WriteLine("[事件] 连接已断开");
client.MessageReceived += (s, e) =>
{
Console.WriteLine($"[消息] {e.Message.Topic}: {e.Message.PayloadAsString}");
};
// 连接
var result = await client.ConnectAsync();
if (!result.IsSuccess)
{
Console.WriteLine($"连接失败: {result.ReasonCode}");
return;
}
// 订阅
await client.SubscribeAsync("test/#", MqttQualityOfService.AtLeastOnce);
// 发布测试消息
for (int i = 0; i < 10; i++)
{
await client.PublishAsync("test/counter", i.ToString());
await Task.Delay(1000);
}
// 断开连接
await client.DisconnectAsync();
</code></pre>
<hr>
<h2 id="服务器broker使用指南">服务器(Broker)使用指南</h2>
<h3 id="启动基础-broker">启动基础 Broker</h3>
<pre><code class="language-csharp">using System.Net.MQTT.Broker;
var options = new MqttBrokerOptions
{
Port = 1883,
AllowAnonymous = true,
EnableRetainedMessages = true,
MaxConnections = 10000
};
using var broker = new MqttBroker(options);
// 启动服务器
await broker.StartAsync();
Console.WriteLine("MQTT Broker 已启动,监听端口 1883");
// 保持运行
await Task.Delay(Timeout.Infinite);
// 停止服务器
await broker.StopAsync();
</code></pre>
<h3 id="配置认证">配置认证</h3>
<pre><code class="language-csharp">// 使用简单认证器
broker.Authenticator = new SimpleAuthenticator()
.AddUser("admin", "admin123")
.AddUser("device1", "device1pass")
.AddUser("device2", "device2pass");
var options = new MqttBrokerOptions
{
Port = 1883,
AllowAnonymous = false// 禁用匿名访问
};
</code></pre>
<h3 id="自定义认证器">自定义认证器</h3>
<pre><code class="language-csharp">public class MyAuthenticator : IMqttAuthenticator
{
public Task<MqttAuthenticationResult> AuthenticateAsync(
MqttAuthenticationContext context,
CancellationToken cancellationToken)
{
// 从数据库验证用户
if (ValidateFromDatabase(context.Username, context.Password))
{
return Task.FromResult(MqttAuthenticationResult.Success());
}
return Task.FromResult(MqttAuthenticationResult.Failure(
MqttConnectReasonCode.BadUserNameOrPassword));
}
}
broker.Authenticator = new MyAuthenticator();
</code></pre>
<h3 id="broker-事件处理">Broker 事件处理</h3>
<pre><code class="language-csharp">// 客户端连接事件
broker.ClientConnected += (s, e) =>
{
Console.WriteLine($"[连接] 客户端 {e.Session.ClientId} 已连接");
Console.WriteLine($"地址: {e.Session.RemoteEndpoint}");
Console.WriteLine($"当前连接数: {broker.ConnectedClients}");
};
// 客户端断开事件
broker.ClientDisconnected += (s, e) =>
{
Console.WriteLine($"[断开] 客户端 {e.Session.ClientId} 已断开");
};
// 客户端订阅事件
broker.ClientSubscribed += (s, e) =>
{
Console.WriteLine($"[订阅] {e.Session.ClientId} 订阅了 {e.TopicFilter}");
};
// 消息发布事件
broker.MessagePublished += (s, e) =>
{
Console.WriteLine($"[消息] {e.Message.Topic}: {e.Message.PayloadAsString}");
Console.WriteLine($"来自: {e.SourceClientId}");
};
// 消息发布前拦截(可以阻止消息发布)
broker.MessagePublishing += (s, e) =>
{
// 检查敏感主题
if (e.Message.Topic.StartsWith("admin/") && e.SourceClientId != "admin")
{
e.Cancel = true;// 阻止非管理员发布到 admin 主题
}
};
</code></pre>
<h3 id="tls-配置">TLS 配置</h3>
<pre><code class="language-csharp">var options = new MqttBrokerOptions
{
// 普通端口
Port = 1883,
// TLS 端口
UseTls = true,
TlsPort = 8883,
ServerCertificate = new X509Certificate2("server.pfx", "password"),
RequireClientCertificate = false
};
</code></pre>
<hr>
<h2 id="高级功能">高级功能</h2>
<h3 id="broker-桥接">Broker 桥接</h3>
<p>桥接功能允许将多个 Broker 连接起来,实现消息的跨 Broker 同步。</p>
<pre><code class="language-csharp">var broker = new MqttBroker(new MqttBrokerOptions { Port = 2883 });
// 添加桥接到父 Broker
var bridge = broker.AddBridge(new MqttBridgeOptions
{
Name = "parent-bridge",
RemoteHost = "parent-broker.example.com",
RemotePort = 1883,
ClientId = "bridge-client-1",
// 上行规则:本地消息 -> 远程 Broker
UpstreamRules =
{
new MqttBridgeRule { LocalTopicFilter = "sensor/#", Enabled = true },
new MqttBridgeRule { LocalTopicFilter = "device/+/data", Enabled = true }
},
// 下行规则:远程消息 -> 本地
DownstreamRules =
{
new MqttBridgeRule { LocalTopicFilter = "commands/#", Enabled = true },
new MqttBridgeRule { LocalTopicFilter = "config/#", Enabled = true }
}
});
// 桥接事件
bridge.Connected += (s, e) => Console.WriteLine("桥接已连接");
bridge.MessageForwarded += (s, e) =>
{
var direction = e.Direction == BridgeDirection.Upstream ? "上行" : "下行";
Console.WriteLine($"[桥接-{direction}] {e.OriginalTopic}");
};
// 获取统计信息
var stats = bridge.GetStatistics();
Console.WriteLine($"上行消息: {stats.UpstreamMessageCount}");
Console.WriteLine($"下行消息: {stats.DownstreamMessageCount}");
</code></pre>
<h3 id="集群部署">集群部署</h3>
<p>集群功能实现了去中心化的 P2P 架构,任何节点都可以独立运行,支持自动故障检测和恢复。</p>
<pre><code class="language-csharp">var broker = new MqttBroker(new MqttBrokerOptions { Port = 1883 });
// 启用集群
broker.EnableCluster(new MqttClusterOptions
{
NodeId = "node-1",
ClusterName = "my-cluster",
ClusterPort = 11883,
SeedNodes = new List<string>
{
"node2.example.com:11883",
"node3.example.com:11883"
},
HeartbeatIntervalMs = 5000,
NodeTimeoutMs = 15000,
EnableDeduplication = true// 防止消息重复
});
// 集群事件
broker.Cluster!.PeerJoined += (s, e) =>
Console.WriteLine($"节点加入: {e.Peer.NodeId}");
broker.Cluster!.PeerLeft += (s, e) =>
Console.WriteLine($"节点离开: {e.Peer.NodeId}");
broker.Cluster!.MessageForwarded += (s, e) =>
Console.WriteLine($"消息转发: {e.Topic}");
await broker.StartAsync();
</code></pre>
<h3 id="mqtt-sn-网关">MQTT-SN 网关</h3>
<p>MQTT-SN 是基于 UDP 的轻量级协议,适合资源受限的嵌入式设备:</p>
<pre><code class="language-csharp">var options = new MqttBrokerOptions
{
Port = 1883,
EnableMqttSn = true,
MqttSnPort = 1885
};
</code></pre>
<h3 id="coap-网关">CoAP 网关</h3>
<p>CoAP 网关允许 CoAP 设备与 MQTT 生态系统互通:</p>
<pre><code class="language-csharp">var options = new MqttBrokerOptions
{
Port = 1883,
EnableCoAP = true,
CoapPort = 5683,
CoapMqttPrefix = "mqtt"
};
// CoAP 客户端可以通过以下方式访问 MQTT 主题:
// GET coap://broker:5683/mqtt/sensors/temperature
// PUT coap://broker:5683/mqtt/sensors/temperature (发布消息)
</code></pre>
<hr>
<h2 id="qos-服务质量">QoS 服务质量</h2>
<p>MQTT 定义了三种服务质量级别:</p>
<table>
<thead>
<tr>
<th>QoS</th>
<th>名称</th>
<th>说明</th>
<th>适用场景</th>
</tr>
</thead>
<tbody>
<tr>
<td>0</td>
<td>At Most Once</td>
<td>最多一次,不保证送达</td>
<td>传感器数据,丢失可接受</td>
</tr>
<tr>
<td>1</td>
<td>At Least Once</td>
<td>至少一次,可能重复</td>
<td>重要数据,可处理重复</td>
</tr>
<tr>
<td>2</td>
<td>Exactly Once</td>
<td>恰好一次,保证送达且不重复</td>
<td>计费、订单等关键数据</td>
</tr>
</tbody>
</table>
<pre><code class="language-csharp">// QoS 0 - 最多一次
await client.PublishAsync("sensor/temp", "25", MqttQualityOfService.AtMostOnce);
// QoS 1 - 至少一次
await client.PublishAsync("alert/fire", "detected", MqttQualityOfService.AtLeastOnce);
// QoS 2 - 恰好一次
await client.PublishAsync("order/create", orderJson, MqttQualityOfService.ExactlyOnce);
</code></pre>
<hr>
<h2 id="主题通配符">主题通配符</h2>
<table>
<thead>
<tr>
<th>通配符</th>
<th>说明</th>
<th>示例</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>+</code></td>
<td>匹配单个层级</td>
<td><code>sensor/+/temp</code> 匹配 <code>sensor/room1/temp</code></td>
</tr>
<tr>
<td><code>#</code></td>
<td>匹配多个层级</td>
<td><code>sensor/#</code> 匹配 <code>sensor/room1/temp</code> 和 <code>sensor/room1/humidity</code></td>
</tr>
</tbody>
</table>
<pre><code class="language-csharp">// 订阅所有房间的温度
await client.SubscribeAsync("sensor/+/temperature", MqttQualityOfService.AtLeastOnce);
// 订阅所有传感器数据
await client.SubscribeAsync("sensor/#", MqttQualityOfService.AtLeastOnce);
</code></pre>
<hr>
<h2 id="mqtt-50-新特性">MQTT 5.0 新特性</h2>
<p>如果你使用 MQTT 5.0 协议,可以利用以下新特性:</p>
<pre><code class="language-csharp">var options = new MqttClientOptions
{
Host = "localhost",
ProtocolVersion = MqttProtocolVersion.V500// 使用 MQTT 5.0
};
// 创建带有 MQTT 5.0 属性的消息
var message = MqttApplicationMessage.CreateWithProperties(
topic: "request/data",
payload: "{\"query\": \"temperature\"}",
qos: MqttQualityOfService.AtLeastOnce,
retain: false,
// MQTT 5.0 特有属性
responseTopic: "response/client1", // 响应主题
correlationData: Encoding.UTF8.GetBytes("req-123"),// 关联数据
messageExpiryInterval: 60, // 消息60秒后过期
contentType: "application/json", // 内容类型
userProperties: new List<MqttUserProperty> // 用户自定义属性
{
new MqttUserProperty("version", "1.0"),
new MqttUserProperty("source", "sensor-hub")
}
);
await client.PublishAsync(message);
</code></pre>
<hr>
<h2 id="总结">总结</h2>
<p>是一个功能完整、性能优秀的 .NET MQTT 库,具有以下优势:</p>
<ul>
<li><strong>完整的协议支持</strong>:MQTT 3.1.1、MQTT 5.0、MQTT-SN、CoAP 全覆盖</li>
<li><strong>高性能设计</strong>:异步 IO、零分配、缓冲区池</li>
<li><strong>企业级特性</strong>:桥接、集群、认证授权</li>
<li><strong>易于使用</strong>:简洁的 API 设计,丰富的示例代码</li>
<li><strong>现代化</strong>:支持最新的 .NET 版本</li>
</ul>
<p>无论你是构建 IoT 平台、实现设备通信,还是搭建消息中间件,这个库都能满足你的需求。</p>
<h2 id="相关链接">相关链接</h2>
<p>源码地址:https://github.com/hnlyf1688/mqtt</p>
</div>
<div id="MySignature" role="contentinfo">
中国.NET协会(http://www.dotnet.org.cn)<br/>
腾讯企鹅群:45132984<img border="0" src="http://pub.idqqimg.com/wpa/images/group.png" alt="中国.NET协会" title="中国.NET协会"><br/>
博客园地址:http://http://www.cnblogs.com/dotnet-org-cn<br/>
国内唯一一个以非盈利的.NET协会,致力打造国内具有权威性、价值性的.NET协会。<br><br>
来源:https://www.cnblogs.com/dotnet-org-cn/p/19473369
頁:
[1]