丁丁丁啊 posted on 2025-12-8 21:23:00

doc-llm-autotest, an LLM-based automated document testing platform: improving the reliability of the worker service

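<p>I. Reliability analysis</p>
<p>As the architecture diagram shows, the worker blocks while it waits for the LLM service to respond. If the worker's container crashes during that wait, the task's status stays at processing indefinitely. Because the Redis message carrying its task_id has already been consumed, nothing can detect the task and retry it.</p>
<p>To cover this case we add a reaper (inspection) service that periodically requeues tasks still in the processing state after a timeout. The reaper could scan the task table in MySQL, but to avoid the extra database load we instead keep a new processing queue in Redis that holds the task_ids currently being executed, plus a processing_ts hash that records when each one started. The reaper reads both structures to recover tasks stuck in an abnormal state.</p>
<p>The worker is adapted accordingly: an atomic operation ensures that taking a task and placing it in the processing queue cannot be interrupted halfway.</p>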
<p><img src="https://img2024.cnblogs.com/blog/3130244/202512/3130244-20251208192629813-842590394.png"></p>
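<p>The design above can be illustrated with a small in-memory model. This is only a sketch: the deques and the dict stand in for the Redis ready list, processing list, and processing_ts hash, and the names enqueue, claim, and ack are hypothetical helpers, not part of the project.</p>

```python
from collections import deque

ready = deque()        # stands in for the Redis ready list
processing = deque()   # stands in for the Redis processing list
processing_ts = {}     # stands in for the Redis processing_ts hash


def enqueue(task_id):
    """Producer side: push a task onto the ready queue."""
    ready.appendleft(task_id)


def claim(now_ts):
    """Move one task from ready to processing and record its start time."""
    if not ready:
        return None
    task_id = ready.pop()
    processing.appendleft(task_id)
    processing_ts[task_id] = now_ts
    return task_id


def ack(task_id):
    """Called when a worker finishes a task normally."""
    processing.remove(task_id)
    processing_ts.pop(task_id, None)


enqueue(51)
enqueue(52)
first = claim(now_ts=1000)    # worker A takes task 51 and finishes it
ack(first)
second = claim(now_ts=1005)   # worker B takes task 52 and then crashes: no ack
```

<p>After the simulated crash, task 52 is no longer in ready, but it is still visible in processing together with its start time, which is exactly what the reaper needs in order to find it.</p>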
<p>II. Implementation</p>
<p>1. doc_llm_test_worker now uses an atomic operation to move a task from ready to processing, then records the time at which processing started.</p>
<div class="cnblogs_code">
<pre>import json
import logging
import time

# redis_client and process_task are defined elsewhere in the worker module.
# raw_item.decode() below implies the client returns bytes replies.

TASK_QUEUE_READY_KEY = "docllm:queue:ready"
TASK_QUEUE_PROCESSING_KEY = "docllm:queue:processing"
TASK_PROCESSING_TS_KEY = "docllm:hash:processing_ts"


def worker_loop():
    """Main loop of the document-check task worker."""
    logging.info("doc_llm_test_worker started, waiting for tasks...")
    while True:
        try:
            # Atomically move one item from ready to processing; wait up to 10 s.
            raw_item = redis_client.brpoplpush(
                TASK_QUEUE_READY_KEY, TASK_QUEUE_PROCESSING_KEY, timeout=10
            )
            if not raw_item:
                time.sleep(5)
                continue  # no task available, try again

            try:
                payload_str = raw_item.decode("utf-8")
                data = json.loads(payload_str)
                task_id = int(data["task_id"])
            except Exception:
                # Malformed item: log it and drop it from the processing queue.
                logging.exception(f"invalid processing queue item: {raw_item!r}")
                redis_client.lrem(TASK_QUEUE_PROCESSING_KEY, 1, raw_item)
                continue

            # Record the start time so the reaper can detect a timeout.
            start_ts = int(time.time())
            redis_client.hset(TASK_PROCESSING_TS_KEY, task_id, start_ts)

            try:
                process_task(task_id)
            finally:
                # Whether the task succeeded or raised, clear its bookkeeping.
                redis_client.lrem(TASK_QUEUE_PROCESSING_KEY, 1, raw_item)
                redis_client.hdel(TASK_PROCESSING_TS_KEY, task_id)
        except Exception:
            logging.exception("unexpected error in worker loop, sleep 3s")
            time.sleep(3)</pre>
</div>
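<p>One gap worth noting: brpoplpush is atomic, but the hset that records the start time is a separate command, so a crash between the two would leave an item in processing with no timestamp. A possible way to close that gap is a Lua script, which Redis executes atomically. The sketch below is an assumption rather than the project's code: claim_task and CLAIM_SCRIPT are hypothetical names, and because blocking commands are not allowed inside scripts it uses RPOPLPUSH, so the caller has to poll.</p>

```python
import time

READY_KEY = "docllm:queue:ready"
PROCESSING_KEY = "docllm:queue:processing"
PROCESSING_TS_KEY = "docllm:hash:processing_ts"

# Runs atomically inside Redis: move one item and stamp its start time.
CLAIM_SCRIPT = """
local item = redis.call('RPOPLPUSH', KEYS[1], KEYS[2])
if not item then
  return false
end
local ok, data = pcall(cjson.decode, item)
if ok and type(data) == 'table' and data['task_id'] then
  redis.call('HSET', KEYS[3], tostring(data['task_id']), ARGV[1])
end
return item
"""


def claim_task(client, now_ts=None):
    """Return the raw queue item, or None when the ready queue is empty.

    `client` is expected to be a redis-py client; only its eval() method is used.
    """
    if now_ts is None:
        now_ts = int(time.time())
    return client.eval(
        CLAIM_SCRIPT, 3, READY_KEY, PROCESSING_KEY, PROCESSING_TS_KEY, now_ts
    )
```

<p>With this variant the worker would call claim_task(redis_client) in its loop and sleep briefly when it returns None. Malformed items are still moved to processing without a timestamp, so the worker's own validation and lrem step remain necessary.</p>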
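<p>2. Add the reaper service, which periodically recovers tasks that have timed out in the processing state. Recovery means both resetting the task's status and putting it back on the queue.</p>
<p>The timeout is set with PROCESSING_TIMEOUT_SECONDS = 600.</p>
<p>A task counts as timed out when:</p>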
<div class="cnblogs_code">
<pre>now_ts - start_ts &gt; PROCESSING_TIMEOUT_SECONDS</pre>
</div>
<p data-start="911" data-end="917">Such a task is handled as follows:</p>
<ul data-start="919" data-end="985">
<li data-start="919" data-end="946">
<p data-start="921" data-end="946">The worker is considered to have failed (it crashed or hung)</p>
</li>
<li data-start="947" data-end="961">
<p data-start="949" data-end="961">Its status must be reset to pending</p>
</li>
<li data-start="962" data-end="985">
<p data-start="964" data-end="985">It is pushed back onto the ready queue for another worker to pick up</p>
</li>
</ul>
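<p>The timeout rule can be sketched as a pure function. This is a minimal illustration: find_stuck_tasks is a hypothetical helper, and the dict stands in for the contents of the processing_ts hash.</p>

```python
PROCESSING_TIMEOUT_SECONDS = 600


def find_stuck_tasks(start_times, now_ts, timeout=PROCESSING_TIMEOUT_SECONDS):
    """Return the ids of tasks whose processing time exceeds the timeout."""
    return sorted(
        task_id
        for task_id, start_ts in start_times.items()
        if now_ts - int(start_ts) > timeout
    )


# task_id -> start timestamp in seconds
start_times = {51: 1000, 52: 1500, 53: 1699}
stuck = find_stuck_tasks(start_times, now_ts=1700)
```

<p>Here only task 51 has been running for more than 600 seconds, so it is the only one selected for recovery.</p>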
<p>task_service is extended so that the reaper can update the task status in the database in step with Redis:</p>
<div class="cnblogs_code">
<pre>from sqlalchemy import func, update

# get_session, TaskDocLLM and TaskStatus come from the project's own modules.


def mark_task_processing(task_id: int) -&gt; bool:
    """Called when a worker has just taken a task: pending -&gt; processing."""
    with get_session() as session:
        stmt = (
            update(TaskDocLLM)
            .where(
                TaskDocLLM.task_id == task_id,
                TaskDocLLM.status == TaskStatus.pending,
            )
            .values(
                status=TaskStatus.processing,
                processing_started_at=func.now(),
            )
        )
        result = session.execute(stmt)
        session.commit()
        # rowcount is 1 only if this call performed the transition.
        return result.rowcount == 1


def reclaim_task(task_id: int, timeout_dt) -&gt; bool:
    """
    Reset a timed-out task to pending so that it can be queued again.

    :param timeout_dt: datetime; the task is reclaimed only if it started before this time
    """
    with get_session() as session:
        stmt = (
            update(TaskDocLLM)
            .where(
                TaskDocLLM.task_id == task_id,
                TaskDocLLM.status == TaskStatus.processing,
                TaskDocLLM.processing_started_at &lt; timeout_dt,
            )
            .values(
                status=TaskStatus.pending,
                retry_count=TaskDocLLM.retry_count + 1,
                processing_started_at=None,
                result=None,
            )
        )
        result = session.execute(stmt)
        session.commit()
        # Only one caller can match status == processing, so only one gets True.
        return result.rowcount == 1</pre>
</div>
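<p>The conditional UPDATE in reclaim_task works as a compare-and-set: the row changes only if it is still in the expected state, so of two competing callers only one sees rowcount == 1. The sketch below reproduces the idea with the standard-library sqlite3 module instead of MySQL and SQLAlchemy. The table layout and the reclaim function are simplified assumptions, not the project's schema.</p>

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task ("
    "task_id INTEGER PRIMARY KEY, status TEXT, started_at INTEGER, retry_count INTEGER)"
)
conn.execute("INSERT INTO task VALUES (51, 'processing', 1000, 0)")


def reclaim(conn, task_id, threshold_ts):
    """Reset the task to pending only if it is still processing and old enough."""
    cur = conn.execute(
        "UPDATE task "
        "SET status = 'pending', retry_count = retry_count + 1, started_at = NULL "
        "WHERE task_id = ? AND status = 'processing' AND started_at < ?",
        (task_id, threshold_ts),
    )
    conn.commit()
    return cur.rowcount == 1


first = reclaim(conn, 51, threshold_ts=1100)    # first reaper wins
second = reclaim(conn, 51, threshold_ts=1100)   # second reaper finds nothing to update
row = conn.execute(
    "SELECT status, retry_count FROM task WHERE task_id = 51"
).fetchone()
```

<p>Only the caller that receives True goes on to requeue the task, which is what keeps a task from being pushed onto the ready queue twice.</p>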
<p>Add the reaper function reaper_loop, which finds timed-out tasks and restores their state. It has to:</p>
<ol data-start="666" data-end="752">
<li data-start="666" data-end="698">
<p data-start="669" data-end="698"><strong data-start="669" data-end="681">Scan the tasks being processed</strong> (those in the processing queue)</p>
</li>
<li data-start="699" data-end="722">
<p data-start="702" data-end="722"><strong data-start="702" data-end="722">Check whether the time elapsed since start_ts exceeds the timeout</strong></p>
</li>
<li data-start="723" data-end="752">
<p data-start="726" data-end="752"><strong data-start="726" data-end="752">Atomically reset the task status in the database, then push the task back onto the ready queue</strong></p>
</li>
</ol>
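<p>The reaper must also cope with multiple workers. Each worker runs its own reaper thread, and two of them must not reclaim the same processing task at the same time, or the task could be queued twice. This is why reclaim_task uses an atomic UPDATE.</p>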
<div class="cnblogs_code">
<pre>from datetime import datetime, timedelta


def reaper_loop():
    """Scan the processing queue and recover tasks that have timed out."""
    logging.info(
        "doc_llm_reaper started, interval=%ss, timeout=%ss",
        REAPER_INTERVAL_SECONDS, PROCESSING_TIMEOUT_SECONDS,
    )
    while True:
        try:
            now_ts = int(time.time())
            timeout_border_ts = now_ts - PROCESSING_TIMEOUT_SECONDS
            # Compared against processing_started_at, which is written by func.now();
            # this assumes the database clock is also in UTC.
            timeout_threshold_dt = datetime.utcnow() - timedelta(seconds=PROCESSING_TIMEOUT_SECONDS)

            items = redis_client.lrange(TASK_QUEUE_PROCESSING_KEY, 0, -1)
            if not items:
                time.sleep(REAPER_INTERVAL_SECONDS)
                continue
            for raw in items:
                try:
                    payload_str = raw.decode("utf-8")
                    payload = json.loads(payload_str)
                    task_id = payload.get("task_id")
                    task_name = payload.get("task_name")
                except Exception:
                    # Malformed item: drop it from the processing queue.
                    redis_client.lrem(TASK_QUEUE_PROCESSING_KEY, 1, raw)
                    continue

                start_ts_raw = redis_client.hget(TASK_PROCESSING_TS_KEY, task_id)
                if start_ts_raw is None:
                    continue  # no start time recorded, nothing to compare
                start_ts = int(start_ts_raw)
                if start_ts &gt; timeout_border_ts:
                    continue  # still within the timeout
                logging.warning(f"doc_llm_reaper: task {task_id} seems stuck, start_ts={start_ts}, now_ts={now_ts}")

                # Atomic UPDATE in the database; only one reaper gets True.
                ok = task_service.reclaim_task(task_id, timeout_threshold_dt)
                if not ok:
                    continue
                redis_client.lrem(TASK_QUEUE_PROCESSING_KEY, 1, raw)
                redis_client.hdel(TASK_PROCESSING_TS_KEY, task_id)

                new_payload = json.dumps(
                    {"task_id": task_id, "task_name": task_name}, ensure_ascii=False
                )
                redis_client.lpush(TASK_QUEUE_READY_KEY, new_payload)
                logging.info(f"doc_llm_reaper: task {task_id} reclaimed and requeued to READY")
        except Exception:
            logging.exception("unexpected error in reaper loop, retrying after the interval")
        time.sleep(REAPER_INTERVAL_SECONDS)</pre>
</div>
<p>Alongside the main worker loop, a separate thread runs the reaper in a loop:</p>
<div class="cnblogs_code">
<pre>import threading


def start_reaper_thread():
    # daemon=True: the reaper thread does not keep the process alive on exit.
    reaper_thread = threading.Thread(target=reaper_loop, name="doc_llm_reaper", daemon=True)
    reaper_thread.start()
    return reaper_thread


if __name__ == "__main__":
    setup_logging()
    init_llm()
    start_reaper_thread()  # the reaper runs in the background
    worker_loop()          # the worker loop runs in the main thread</pre>
</div>
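<p>A daemon thread is simply killed when the process exits. If a clean shutdown is wanted, one option is to drive the loop with a threading.Event instead of time.sleep. The sketch below is an assumption, not the project's code: run_periodically and fake_scan are hypothetical names, and fake_scan stands in for one pass of the reaper.</p>

```python
import threading


def run_periodically(task, interval_seconds, stop_event):
    """Call task() every interval_seconds until stop_event is set."""
    while not stop_event.is_set():
        task()
        # Unlike time.sleep, wait() returns early once a stop is requested.
        stop_event.wait(interval_seconds)


stop_event = threading.Event()
calls = []


def fake_scan():
    """Stand-in for one reaper pass; asks the loop to stop after three passes."""
    calls.append(1)
    if len(calls) >= 3:
        stop_event.set()


thread = threading.Thread(
    target=run_periodically,
    args=(fake_scan, 0.01, stop_event),
    name="doc_llm_reaper",
    daemon=True,
)
thread.start()
thread.join(timeout=5)
```

<p>In a real worker the main thread would call stop_event.set() from its shutdown path and then join the reaper thread.</p>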
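<p>III. Test and verification</p>
<p>We set up a scenario in which the worker is killed while it is processing a task. At that point MySQL holds one task whose status is processing, Redis has no message for this task_id in doc_llm:task_queue:ready, and only the doc_llm:task_queue:processing queue and the doc_llm:hash:processing_ts hash contain entries.</p>
<p>In other words, the task was consumed, but the worker failure caused it to be lost. Without the reaper, the only option would be to delete this task by hand, since nothing would ever run it again. What we expect instead is that, after a restart, reaper_loop detects the stuck task and requeues it. The timeout is set to 10 minutes.</p>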
<p><img src="https://img2024.cnblogs.com/blog/3130244/202512/3130244-20251208215713929-478925103.png"></p>
<p>&nbsp;</p>
<p><img src="https://img2024.cnblogs.com/blog/3130244/202512/3130244-20251208215604060-1667620268.png"></p>
<p>&nbsp;</p>
<p>Now restart the worker, which also starts the reaper:</p>
<p>&nbsp;</p>
<p><img src="https://img2024.cnblogs.com/blog/3130244/202512/3130244-20251208220238792-1877105069.png"></p>
<p>The log shows that task 51 was picked up by the reaper and went back into testing:</p>
<p><img src="https://img2024.cnblogs.com/blog/3130244/202512/3130244-20251208220426405-1996267293.png"></p>
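<p>The task eventually ran to completion, and the database recorded the retry count, timestamps, and related fields, which shows that the reaper works.</p>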
<p><img src="https://img2024.cnblogs.com/blog/3130244/202512/3130244-20251208220629134-692394998.png"></p>
<p>&nbsp;</p><br><br>
Source: https://www.cnblogs.com/xiaojp65536/p/19323722