dotnetty 内存泄漏的BUG修复了
<h2>一、前言</h2><p> 当你做的产品内存不稳定,CPU不稳定,内存在600MB-3G之内波动,cpu 在30%左右,就算你对外宣传支持可以十万设备,也不会有人相信,如果你做的产品直播推流内存一直稳定在60MB左右,cpu 在1%左右,我说带宽足够,支持1万人在线观看,客户对于这个产品也不会有所怀疑,通过一个月的努力我终于找出dotnetty 内存泄漏的问题所在,已经进行修复,以下是我现在运行的物联网平台,内存最少的是网关,有十几个协议主机运行,内存多的是业务服务并没有更新修复的dotnetty, 下面我要阐述问题所在</p>
<p><img src="https://img2024.cnblogs.com/blog/192878/202507/192878-20250718145115228-1618484333.png"></p>
<p> </p>
<p>HttpFlv:http://demo.kayakiot.cn:281/httpflv.html (黑衣人)</p>
<p> HttpFlv:http://demo.kayakiot.cn:281/httpflv1.html (大红包)</p>
<p>HttpFlv:http://demo.kayakiot.cn:281/httpflv2.html (鹿鼎记)</p>
<p>rtmp:rtmp://demo.kayakiot.cn:76/live1/livestream2 (黑衣人)</p>
<p>rtmp:rtmp://demo.kayakiot.cn:76/live1/livestream3 (大红包)</p>
<p>rtmp:rtmp://demo.kayakiot.cn:76/live1/livestream4(鹿鼎记)</p>
<p>注:测试服务器带宽只有8MB, httpflv 缓冲做的没有rtmp好,然后httpflv卡就多刷新几次</p>
<p> 凯亚 (Kayak) 是什么?</p>
<p> 凯亚(Kayak)是基于.NET8.0软件环境下的surging微服务引擎进行开发的, 平台包含了微服务和物联网平台。支持异步和响应式编程开发,功能包含了物模型,设备,产品,网络组件的统一管理和微服务平台下的注册中心,服务路由,模块,中间服务等管理。还有多协议适配(TCP,MQTT,UDP,CoAP,HTTP,Grpc,websocket,rtmp,httpflv,webservice,等),通过灵活多样的配置适配能够接入不同厂家不同协议等设备。并且通过设备告警,消息通知,数据可视化等功能。能够让你能快速建立起微服务物联网平台系统。</p>
<p> 凯亚物联网平台:http://demo.kayakiot.cn:3100(用户名:fanly 密码:123456)</p>
<p> 链路跟踪Skywalking V8:http://117.72.121.2:8080/</p>
<p> dotnetty:https://github.com/microsurging/DotNetty</p>
<p> surging 微服务引擎开源地址:https://github.com/fanliang11/surging(后面surging 会移动到microsurging进行维护)</p>
<h2>二、dump分析</h2>
<p>物联网平台 1天会增长90mb内存,这些是我不能接受的,因为并没有并发访问,然后我下载dump 文件进行分析,然后用windbg分析,没有大对象的占用</p>
<p><img src="https://img2024.cnblogs.com/blog/192878/202507/192878-20250718141745590-1626168307.png"></p>
<p> 以上没有问题,那就是线程阻塞了,输入!threads 进行分析,这么多空闲的线程(MTA),都把线程池资源耗光了,当超过最大值<span style="color: rgba(30, 30, 30, 1)">32767就崩溃死掉了</span></p>
<p><img src="https://img2024.cnblogs.com/blog/192878/202507/192878-20250718142201394-1124222664.png"></p>
<p> 然后再用VS可视化界面看到底有哪些线程运行,发现是dotnetty占用资源最多。那么下面来找出代码的问题</p>
<p><img src="https://img2024.cnblogs.com/blog/192878/202507/192878-20250718142611242-748742253.png"></p>
<h2>三、代码修改 </h2>
<p>dotnetty 创建线程无非就是EventExecutor,所以就把代码定位到SingleThreadEventExecutor 和LoopExecutor 类中,然后你会发现 Task.Factory.StartNew ,这个就是问题的关键了。</p>
<p>SingleThreadEventExecutor:</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">void</span> Loop(<span style="color: rgba(0, 0, 255, 1)">object</span><span style="color: rgba(0, 0, 0, 1)"> s)
{
SetCurrentExecutor(</span><span style="color: rgba(0, 0, 255, 1)">this</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">high CPU consumption tasks, running RunAllTasks in a dead loop, set TaskCreationOptions.LongRunning to avoid running out of thread pool resources. </span>
_ =<span style="color: rgba(0, 0, 0, 1)"> Task.Factory.StartNew( _loopCoreAciton,CancellationToken.None, TaskCreationOptions.None, _taskScheduler);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">Loop processing is too fast and generates a large number of loopCoreAciton task schedulers.
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">Using ManualResetEventSlim to process it is too late to wait, Using threadLock, LoopCore task schedulers will be released after execution</span>
}</pre>
</div>
<p>LoopExecutor:</p>
<div class="cnblogs_code">
<pre> <span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">static</span> <span style="color: rgba(0, 0, 255, 1)">void</span> Run(<span style="color: rgba(0, 0, 255, 1)">object</span><span style="color: rgba(0, 0, 0, 1)"> state)
{
</span><span style="color: rgba(0, 0, 255, 1)">var</span> loopExecutor =<span style="color: rgba(0, 0, 0, 1)"> (LoopExecutor)state;
loopExecutor.SetCurrentExecutor(loopExecutor);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">High CPU consumption tasks, run libuv's UV_RUN_DEFAULT mode in a loop, and set TaskCreationOptions. LongRunning can prevent thread pool resource depletion. . </span>
_ =<span style="color: rgba(0, 0, 0, 1)"> Task.Factory.StartNew(
executor </span>=><span style="color: rgba(0, 0, 0, 1)"> ((LoopExecutor)executor).StartLoop(), state,
CancellationToken.None,
TaskCreationOptions.AttachedToParent,</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> TaskCreationOptions.RunContinuationsAsynchronously?</span>
<span style="color: rgba(0, 0, 0, 1)"> loopExecutor.Scheduler);
}</span></pre>
</div>
<p>然后我试试改成<span style="color: rgba(0, 0, 0, 1)">TaskCreationOptions.LongRunning ,然后没有用,然后再把问题定位到任务调度<span style="color: rgba(0, 0, 0, 1)">_taskScheduler上,发现_executor.Execute(new TaskQueueNode(this, _tasks.Take()))这段代码就是导致内存的原因,因为线程池会分配一个线程去执行,如果任务执行时间比较长,就会导致一直占用线程池线程得不到释放,所以后面我就进行修改创建新的线程进行执行,代码如下:</span></span></p>
<p> </p>
<div class="cnblogs_code">
<pre> <span style="color: rgba(0, 0, 255, 1)">internal</span> <span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)"> AloneExecutorTaskScheduler : TaskScheduler
{
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">readonly</span><span style="color: rgba(0, 0, 0, 1)"> IEventExecutor _executor;
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">bool</span><span style="color: rgba(0, 0, 0, 1)"> _started;
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">readonly</span> BlockingCollection<Task> _tasks = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)">();
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">readonly</span><span style="color: rgba(0, 0, 0, 1)"> Thread[] _threads;
</span><span style="color: rgba(0, 0, 255, 1)">protected</span> <span style="color: rgba(0, 0, 255, 1)">override</span> IEnumerable<Task>? GetScheduledTasks() =><span style="color: rgba(0, 0, 0, 1)"> _tasks;
</span><span style="color: rgba(0, 0, 255, 1)">protected</span> <span style="color: rgba(0, 0, 255, 1)">override</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> QueueTask(Task task)
{
</span><span style="color: rgba(0, 0, 255, 1)">if</span><span style="color: rgba(0, 0, 0, 1)"> (_started)
{
_tasks.Add(task);
}
</span><span style="color: rgba(0, 0, 255, 1)">else</span><span style="color: rgba(0, 0, 0, 1)">
{
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> hack: enables this executor to be seen as default on Executor's worker thread.
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> This is a special case for SingleThreadEventExecutor.Loop initiated task.</span>
_started = <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">;
_ </span>=<span style="color: rgba(0, 0, 0, 1)"> TryExecuteTask(task);
}
}
</span><span style="color: rgba(0, 0, 255, 1)">protected</span> <span style="color: rgba(0, 0, 255, 1)">override</span> <span style="color: rgba(0, 0, 255, 1)">bool</span> TryExecuteTaskInline(Task task, <span style="color: rgba(0, 0, 255, 1)">bool</span> taskWasPreviouslyQueued) => <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">;
</span><span style="color: rgba(0, 0, 255, 1)">public</span> AloneExecutorTaskScheduler(IEventExecutor executor,<span style="color: rgba(0, 0, 255, 1)">int</span><span style="color: rgba(0, 0, 0, 1)"> threadCount)
{
_executor </span>=<span style="color: rgba(0, 0, 0, 1)"> executor;
_threads </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Thread;
</span><span style="color: rgba(0, 0, 255, 1)">for</span> (<span style="color: rgba(0, 0, 255, 1)">int</span> index = <span style="color: rgba(128, 0, 128, 1)">0</span>; index < threadCount; index++<span style="color: rgba(0, 0, 0, 1)">)
{
_threads </span>= <span style="color: rgba(0, 0, 255, 1)">new</span> Thread(_ =><span style="color: rgba(0, 0, 0, 1)">
{
</span><span style="color: rgba(0, 0, 255, 1)">while</span> (<span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">)
{
_executor.Execute(</span><span style="color: rgba(0, 0, 255, 1)">new</span> TaskQueueNode(<span style="color: rgba(0, 0, 255, 1)">this</span><span style="color: rgba(0, 0, 0, 1)">, _tasks.Take()));
}
});
}
Array.ForEach(_threads, it </span>=><span style="color: rgba(0, 0, 0, 1)"> it.Start());
}
</span><span style="color: rgba(0, 0, 255, 1)">sealed</span> <span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)"> TaskQueueNode : IRunnable
{
</span><span style="color: rgba(0, 0, 255, 1)">readonly</span><span style="color: rgba(0, 0, 0, 1)"> AloneExecutorTaskScheduler _scheduler;
</span><span style="color: rgba(0, 0, 255, 1)">readonly</span><span style="color: rgba(0, 0, 0, 1)"> Task _task;
</span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> TaskQueueNode(AloneExecutorTaskScheduler scheduler, Task task)
{
_scheduler </span>=<span style="color: rgba(0, 0, 0, 1)"> scheduler;
_task </span>=<span style="color: rgba(0, 0, 0, 1)"> task;
}
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> Run() =><span style="color: rgba(0, 0, 0, 1)"> _scheduler.TryExecuteTask(_task);
}
}</span></pre>
</div>
<p> </p>
<h2>三、总结</h2>
<p>dotnetty 最大的问题已经修复,我将会发布到nuget, 将由我发布dotnetty 1.0版本</p>
<p> </p><br><br>
来源:https://www.cnblogs.com/fanliang11/p/18991484
頁:
[1]