宋小睿吖 發表於 2025-6-28 11:09:44

.net8创建tcp服务接收数据通过websocket广播的实现代码

<div id="navCategory"><h5 class="catalogue">目录</h5><ul class="first_class_ul"><li><a href="#_label0">注册TCP服务器 注册WebSocket中间件</a></li><li><a href="#_label1">tcp服务实现</a></li><li><a href="#_label2">WebSocket服务</a></li></ul></div><p class="maodian"><a name="_label0"></a></p><h2>注册TCP服务器 注册WebSocket中间件</h2>
<div class="jb51code"><pre class="brush:csharp;">using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Text.Json;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.AspNetCore.WebSockets;
var builder = WebApplication.CreateBuilder(args);
// 注册TCP服务
builder.Services.AddSingleton&lt;TcpServer&gt;();
builder.Services.AddHostedService(sp =&gt; sp.GetRequiredService&lt;TcpServer&gt;());
// 注册WebSocket中间件
builder.Services.AddSingleton&lt;WebSocketManager&gt;();
builder.WebHost.UseUrls("http://*:5000");//指定websocket端口号
var app = builder.Build();
// WebSocket中间件
app.UseWebSockets();
app.Use(async (context, next) =&gt;
{
    if (context.WebSockets.IsWebSocketRequest)
    {
      var webSocketManager = context.RequestServices.GetRequiredService&lt;WebSocketManagement&gt;();
      var webSocket = await context.WebSockets.AcceptWebSocketAsync();
      await webSocketManager.HandleWebSocketConnectionAsync(webSocket);
    }
    else
    {
      await next(context);
    }
});
app.Run();</pre></div>
<p class="maodian"><a name="_label1"></a></p><h2>tcp服务实现</h2>
<div class="jb51code"><pre class="brush:csharp;">public class TcpServer : BackgroundService
{
    private readonly WebSocketManagement _webSocketManager;
    private const int Port = 8081;
    private const int PacketSize = 14;
    private const int CheckSumSize = 2;
    private TcpListener? _listener;
    public TcpServer(WebSocketManagement webSocketManager)
    {
      _webSocketManager = webSocketManager;
    }
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
      _listener = new TcpListener(IPAddress.Any, Port);
      _listener.Start();
      Console.WriteLine($"TCP server started on port {Port}");
      try
      {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                  var client = await _listener.AcceptTcpClientAsync(stoppingToken);
                  _ = HandleClientAsync(client, stoppingToken);
                }
                catch (OperationCanceledException)
                {
                  // 服务停止时正常退出
                  break;
                }
            }
      }
      finally
      {
            _listener.Stop();
            Console.WriteLine("TCP server stopped");
      }
    }
    private async Task HandleClientAsync(TcpClient client, CancellationToken ct)
    {
      var clientId = Guid.NewGuid().ToString();
      Console.WriteLine($"Client connected: {clientId}");
      using (client)
      {
            byte[] buffer = new byte;
            var stream = client.GetStream();
            while (!ct.IsCancellationRequested)
            {
                try
                {
                  int bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length);
                  if (bytesRead &gt; 0)
                  {
                        byte[] receivedData = new byte;
                        Array.Copy(buffer, receivedData, bytesRead);
                        Log.Information($"收到 {bytesRead} bytes 来自仪器.");
                        Console.WriteLine($"收到 {bytesRead} bytes 来自仪器.");
                        // 解析数据并生成应答
                        var result = await ParseData(receivedData/*, out byte[] response*/);
                        if (result.success)
                        {
                            Log.Information(result.message);
                            Console.WriteLine(result.message);
                            //响应发送
                            if (result.response.Length &gt; 0) await stream.WriteAsync(result.response, 0, result.response.Length);
                            Log.Information($"响应发送.");
                            Console.WriteLine("响应发送.");
                        }
                        else
                        {
                            Log.Information($"Error: {result.message}");
                            Console.WriteLine($"Error: {result.message}");
                        }
                  }
                }
                catch (IOException ex)
                {
                  Console.WriteLine($"Client {clientId} connection error: {ex.Message}");
                  return;
                }
                catch (ObjectDisposedException)
                {
                  Console.WriteLine($"Client {clientId} connection closed");
                  return;
                }
                catch (Exception ex)
                {
                  Console.WriteLine(ex.Message, $"Error processing client {clientId}");
                  return;
                }
            }
      }
    }
    //血透
    private const int ZL_PacketLength = 14;
    public const byte ZL_START_CODE_UPLOAD = 0x55; // 血压计上传数据开始码
    private static readonly byte[] ZL_Header = { 0x55, 0xAA };
    // 解析数据包
    private async Task&lt;(bool success, byte[] response, string message)&gt; ParseData(byte[] buffer/*, out byte[] response*/)
    {
      //response = null;
      // 检查前导码
      if (buffer.Length == ZL_PacketLength &amp;&amp; buffer == ZL_Header &amp;&amp; buffer == ZL_Header)
      {//
            var result =await HemodialysisZLMonitor(buffer );
            return (result.success,new byte, result.message);
      }
      else
      {
            return (false, new byte, "前导码错误");
      }
      //return (result.success, result.message);
    }
    /// &lt;summary&gt;
    /// 仪器(
    /// &lt;/summary&gt;
    /// &lt;param name="buffer"&gt;&lt;/param&gt;
    /// &lt;param name="response"&gt;&lt;/param&gt;
    /// &lt;returns&gt;&lt;/returns&gt;
    private async Task&lt;(bool success , string message)&gt; HemodialysisZLMonitor(byte[] buffer)
    {
      // 计算校验和(前12字节的累加和)
      ushort calculatedChecksum = 0;
      for (int i = 0; i &lt; 12; i++)
            calculatedChecksum += buffer;
      // 读取数据包中的校验和(大端序)
      ushort packetChecksum = (ushort)((buffer &lt;&lt; 8) | buffer);
      // 校验和验证
      if (calculatedChecksum != packetChecksum)
            throw new ArgumentException($"Checksum mismatch: calculated 0x{calculatedChecksum:X4}, received 0x{packetChecksum:X4}");
      // 解析数据
      var result = new ZL_ParsedData
      {
            DeviceType = buffer,
            DeviceId = (uint)((buffer &lt;&lt; 16) | (buffer &lt;&lt; 8) | buffer),
            HasOtherAlarm = buffer != 0,
            DataIdentifier = buffer
      };
      // 处理运行模式/权值
      byte modeWeight = buffer;
      if (modeWeight &gt;= 10 &amp;&amp; modeWeight &lt;= 17)
            result.OperationMode = GetModeName(modeWeight);
      else
            result.DataWeight = modeWeight;
      // 解析报警标志
      result.AlarmFlags = ParseAlarmFlags(buffer);
      // 解析数据值(2字节)
      ushort rawValue = (ushort)((buffer &lt;&lt; 8) | buffer);
      // 特殊处理有符号数据
      if (new[] { 0x0A, 0x0B, 0x0E }.Contains(result.DataIdentifier))
            rawValue = (ushort)(short)rawValue; // 保持二进制表示,后续转换为double
      // 应用权值处理
      result.DataValue = ApplyDataWeight(rawValue, result.DataWeight, result.DataIdentifier);
      // 设置数据名称和单位
      (result.DataName, result.Unit) = GetDataInfo(result.DataIdentifier);
      //return result;
      Console.WriteLine("Parsing successful!");
      Console.WriteLine($"设备类型: 0x{result.DeviceType:X2}");//Device Type
      Console.WriteLine($"设备id: 0x{result.DeviceId}");//十进制 Device ID   16进制:0x{result.DeviceId:X6}
      Console.WriteLine($"运行模式: {result.OperationMode ?? "N/A"}");//Operation Mode
      Console.WriteLine($"数据权值: {result.DataWeight}");//Data Weight
      Console.WriteLine($"报警标识: [漏血BloodLeak: {result.AlarmFlags.BloodLeak}, " +//Alarms
                        $"液位LiquidLevel: {result.AlarmFlags.LiquidLevel}, " +
                        $"气泡Bubble: {result.AlarmFlags.Bubble}, " +
                        $"动脉压ArterialPressure: {result.AlarmFlags.ArterialPressure}, " +
                        $"跨膜压TransmembranePressure: {result.AlarmFlags.TransmembranePressure}, " +
                        $"静脉压VenousPressure: {result.AlarmFlags.VenousPressure}, " +
                        $"温度Temperature: {result.AlarmFlags.Temperature}, " +
                        $"电导Conductivity: {result.AlarmFlags.Conductivity}]");
      Console.WriteLine($"其他报警: {result.HasOtherAlarm}");//Other Alarms
      Console.WriteLine($"数据标识: 0x{result.DataIdentifier:X2} ({result.DataName})");//数据标识
      Console.WriteLine($"数据值: {result.DataValue} {result.Unit}");//数据值
      WebSocketSend webSocketSend = new WebSocketSend { Name= result.DataName,Value= result.DataValue };
      await _webSocketManager.BroadcastAsync(Convert.ToString(result.DeviceId), webSocketSend);
      return (true,"");
    }
    private ZL_AlarmFlags ParseAlarmFlags(byte flagByte)
    {
      return new ZL_AlarmFlags
      {
            BloodLeak = (flagByte &amp; 0x01) != 0,
            LiquidLevel = (flagByte &amp; 0x02) != 0,
            Bubble = (flagByte &amp; 0x04) != 0,
            ArterialPressure = (flagByte &amp; 0x08) != 0,
            TransmembranePressure = (flagByte &amp; 0x10) != 0,
            VenousPressure = (flagByte &amp; 0x20) != 0,
            Temperature = (flagByte &amp; 0x40) != 0,
            Conductivity = (flagByte &amp; 0x80) != 0
      };
    }
    private string GetModeName(byte mode)
    {
      return mode switch
      {
            10 =&gt; "Dialysis",
            12 =&gt; "LowSuper",
            13 =&gt; "SingleSuper",
            14 =&gt; "BloodReturn",
            15 =&gt; "Precharge",
            16 =&gt; "SelfTest",
            17 =&gt; "Disinfection",
            _ =&gt; "Unknown"
      };
    }
    private double ApplyDataWeight(ushort rawValue, int weight, byte dataId)
    {
      // 特殊处理电导值(数据标识0x09)
      if (dataId == 0x09 &amp;&amp; weight &gt; 4)
            return rawValue; // 按整数显示
      return weight switch
      {
            0 or 15 =&gt; rawValue,          // 整数
            &gt; 0 and &lt;= 4 =&gt; rawValue / Math.Pow(10, weight), // 小数处理
            _ =&gt; rawValue               // 默认按整数处理
      };
    }
    private (string name, string unit) GetDataInfo(byte dataId)
    {
      var dataMap = new Dictionary&lt;byte, (string, string)&gt;
    {
      { 0x01, ("dehydration", "L") },//脱水1
      { 0x02, ("currentDehydration", "L") },//当前的脱水2
      { 0x03, ("dehydrationSpeed", "L/h") },//脱水速度3
      { 0x04, ("bloodPumpFlow", "ml/min") },//血泵流量4
      { 0x05, ("auxiliaryPump", "") },//辅助泵5
      { 0x06, ("syringe", "ml/h") },//注射器6
      { 0x07, ("dialysateFlow", "") },//透析液流量7
      { 0x08, ("dialysateTemperature", "°C") },//透析液温度8
      { 0x09, ("dialysateConductivity", "mS/cm") },//透析液电导9
      { 0x0A, ("venousPressure", "") },//静脉压力A
      { 0x0B, ("transmembranePressure", "") },//跨膜压力B
      { 0x0C, ("dialyzedTime", "min") },//已透析时间C
      { 0x0D, ("remainingTime", "min") },//剩余透析时间D
      { 0x0E, ("arterialPressure", "") },//动脉压E
      { 0x0F, ("sphygmomanometerHigh", "") },//血压计测量 高压F
      { 0x10, ("sphygmomanometerLow", "") },//血压计测量 低压10
      { 0x11, ("heartRate", "bpm") }//心率11
    };
      return dataMap.TryGetValue(dataId, out var info)
            ? info
            : ("Unknown", "");
    }
}</pre></div>
<p class="maodian"><a name="_label2"></a></p><h2>WebSocket服务</h2>
<div class="jb51code"><pre class="brush:csharp;">public class WebSocketManagement
{
    private readonly ConcurrentDictionary&lt;string, WebSocket&gt; _sockets = new();
    public async Task HandleWebSocketConnectionAsync(WebSocket webSocket)
    {
      var buffer = new byte;
      var result = await webSocket.ReceiveAsync(new ArraySegment&lt;byte&gt;(buffer), CancellationToken.None);
      if (result.MessageType == WebSocketMessageType.Text)
      {
            var deviceId = Encoding.UTF8.GetString(buffer, 0, result.Count);
            _sockets = webSocket;
            Console.WriteLine("socket消息:" + deviceId);
      }
      while (webSocket.State == WebSocketState.Open)
      {
            await Task.Delay(100);
      }
    }
    public async Task BroadcastAsync(string socketId, dynamic data)
    {
      var json = JsonConvert.SerializeObject(data);
      var buffer = Encoding.UTF8.GetBytes(json);
      _sockets.TryGetValue(socketId, out WebSocket socket);
      if (socket!=null&amp;&amp;socket.State == WebSocketState.Open)
      {
            await socket.SendAsync(new ArraySegment&lt;byte&gt;(buffer), WebSocketMessageType.Text, true, CancellationToken.None);
      }
      else
      {
            _sockets.TryRemove(socketId, out _);
      }
    }
}</pre></div>
<p>测试数据发送接收</p>
<div class="jb51code"><pre class="brush:csharp;"># PowerShell
$data = ](0x55, 0xAA, 0x00, 0x26, 0x35, 0xB2, 0x00, 0x00, 0x01, 0x0E, 0x00, 0x00, 0x02, 0x1B)
$client = New-Object System.Net.Sockets.TcpClient('localhost', 8081)
$stream = $client.GetStream()
$stream.Write($data, 0, $data.Length)
$client.Close()</pre></div>
<p style="text-align:center"><img alt="" src="https://img.jbzj.com/file_images/article/202506/2025062811063126.png" /></p>
<p>使用 WebSocket 测试工具:</p>
<p>浏览器开发者工具</p>
<p>https://websocketking.com/</p>
頁: [1]
查看完整版本: .net8创建tcp服务接收数据通过websocket广播的实现代码