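doc-llm-autotest, an LLM-based automated document testing platform: improving the reliability of the worker service
<p>I. Reliability analysis</p><p>The architecture diagram shows that the worker blocks while it waits for the LLM service to respond. If the worker's container crashes during that wait, the task's status stays at processing indefinitely. Because the Redis message carrying the task_id has already been consumed, nothing remains to identify the task and retry it.</p>
<p>To cover this case, we add an inspection (reaper) service that periodically requeues tasks that are in the processing state and have timed out. The service could query the task table in MySQL, but to limit the load on the database we instead build two new structures in Redis: a processing queue that stores the task_id of every task being executed, and a processing_ts hash that stores when each task started. The inspection service reads both to find tasks in an abnormal state and recover them.</p>
<p>The worker logic is adapted accordingly: an atomic operation ensures that taking a task from the ready queue and placing it in the processing queue cannot be interrupted halfway.</p>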
<p><img src="https://img2024.cnblogs.com/blog/3130244/202512/3130244-20251208192629813-842590394.png"></p>
<p>II. Implementation</p>
<p>1. In doc_llm_test_worker, add an atomic operation that moves a task from ready to processing, and record the time at which processing starts.</p>
<div class="cnblogs_code">
<pre>import json
import logging
import time

# redis_client and process_task are defined elsewhere in the project

TASK_QUEUE_READY_KEY = "docllm:queue:ready"
TASK_QUEUE_PROCESSING_KEY = "docllm:queue:processing"
TASK_PROCESSING_TS_KEY = "docllm:hash:processing_ts"


def worker_loop():
    """Main loop of the document-check task worker."""
    logging.info("doc_llm_test_worker started, waiting for tasks...")
    while True:
        try:
            # Atomically move one item from ready to processing (blocks for up to 10s)
            raw_item = redis_client.brpoplpush(TASK_QUEUE_READY_KEY, TASK_QUEUE_PROCESSING_KEY, timeout=10)
            if not raw_item:
                time.sleep(5)
                continue  # no task available, start the next round

            try:
                payload_str = raw_item.decode("utf-8")
                data = json.loads(payload_str)
                task_id = int(data["task_id"])
            except Exception:
                logging.exception(f"invalid processing queue item: {raw_item!r}")
                redis_client.lrem(TASK_QUEUE_PROCESSING_KEY, 1, raw_item)
                continue

            # Record when processing started so that the reaper can detect timeouts
            start_ts = int(time.time())
            redis_client.hset(TASK_PROCESSING_TS_KEY, task_id, start_ts)
            try:
                process_task(task_id)
            finally:
                # Done (or failed inside this process): clear the in-flight markers
                redis_client.lrem(TASK_QUEUE_PROCESSING_KEY, 1, raw_item)
                redis_client.hdel(TASK_PROCESSING_TS_KEY, task_id)
        except Exception:
            logging.exception("unexpected error in worker loop, sleep 3s")
            time.sleep(3)</pre>
</div>
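<p>2. Add the inspection service, which periodically recovers tasks that are in the processing state and have timed out. Recovery has to both requeue the task and restore its status.</p>
<p>Set the parameter PROCESSING_TIMEOUT_SECONDS = 600.</p>
<p>Timeout condition:</p>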
<div class="cnblogs_code">
<pre>now_ts - start_ts > PROCESSING_TIMEOUT_SECONDS</pre>
</div>
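<p>As a minimal sketch, the condition can be written as a small pure function, which makes the boundary behaviour easy to check. The function name is_timed_out is hypothetical and does not appear in the project.</p>

```python
PROCESSING_TIMEOUT_SECONDS = 600  # value taken from the text above


def is_timed_out(start_ts: int, now_ts: int,
                 timeout: int = PROCESSING_TIMEOUT_SECONDS) -> bool:
    """Return True when the task has been processing for longer than `timeout` seconds."""
    return now_ts - start_ts > timeout
```

<p>Note that reaper_loop, shown later in this post, compares start_ts against now_ts - PROCESSING_TIMEOUT_SECONDS and therefore also reclaims a task at exactly 600 seconds; the one-second difference at the boundary should not matter in practice.</p>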
<p data-start="911" data-end="917">A task that meets this condition is handled as follows:</p>
<ul data-start="919" data-end="985">
<li data-start="919" data-end="946">
<p data-start="921" data-end="946">It is treated as a failed run (the worker crashed or hung)</p>
</li>
<li data-start="947" data-end="961">
<p data-start="949" data-end="961">Its status is reset to pending</p>
</li>
<li data-start="962" data-end="985">
<p data-start="964" data-end="985">It is pushed back onto the ready queue for another worker to pick up</p>
</li>
</ul>
<p>Adapt task_service so that the inspection service can update the task status in the database in step with Redis:</p>
<div class="cnblogs_code">
<pre>from sqlalchemy import func, update  # assuming SQLAlchemy, which these calls match

# get_session, TaskDocLLM and TaskStatus are defined elsewhere in the project


def mark_task_processing(task_id: int) -> bool:
    """Called when a worker has just taken a task: pending -> processing."""
    with get_session() as session:
        stmt = (
            update(TaskDocLLM).where(
                TaskDocLLM.task_id == task_id,
                TaskDocLLM.status == TaskStatus.pending
            ).values(
                status=TaskStatus.processing,
                processing_started_at=func.now()
            )
        )
        result = session.execute(stmt)
        session.commit()
        # rowcount is 1 only if this call performed the transition
        return result.rowcount == 1


def reclaim_task(task_id: int, timeout_dt) -> bool:
    """
    Reset a timed-out task to pending so that it can be requeued.
    :param timeout_dt: datetime; only tasks that started before it are reclaimed
    """
    with get_session() as session:
        stmt = (
            update(TaskDocLLM).where(
                TaskDocLLM.task_id == task_id,
                TaskDocLLM.status == TaskStatus.processing,
                TaskDocLLM.processing_started_at &lt; timeout_dt
            ).values(
                status=TaskStatus.pending,
                retry_count=TaskDocLLM.retry_count + 1,
                processing_started_at=None,
                result=None
            )
        )
        result = session.execute(stmt)
        session.commit()
        return result.rowcount == 1</pre>
</div>
<p>Add the inspection function reaper_loop, which selects timed-out tasks and restores their state. It must:</p>
<ol data-start="666" data-end="752">
<li data-start="666" data-end="698">
<p data-start="669" data-end="698"><strong data-start="669" data-end="681">Scan the tasks being processed</strong> (in the processing queue)</p>
</li>
<li data-start="699" data-end="722">
<p data-start="702" data-end="722"><strong data-start="702" data-end="722">Check whether start_ts has timed out</strong></p>
</li>
<li data-start="723" data-end="752">
<p data-start="726" data-end="752"><strong data-start="726" data-end="752">Atomically reset the task status in the database, then push the task back onto the ready queue</strong></p>
</li>
</ol>
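<p>The first two steps can be sketched without Redis as a pure function over the queue contents and the recorded start times. This is only an illustration: select_stuck_task_ids and its parameters are hypothetical names, and the real reaper_loop reads the same data from Redis.</p>

```python
import json
from typing import Dict, List


def select_stuck_task_ids(processing_items: List[bytes],
                          start_ts_by_task: Dict[int, int],
                          now_ts: int,
                          timeout: int) -> List[int]:
    """Return the ids of tasks whose recorded start time is at least `timeout` seconds old."""
    stuck = []
    for raw in processing_items:
        try:
            task_id = int(json.loads(raw.decode("utf-8"))["task_id"])
        except (ValueError, KeyError, TypeError):
            continue  # malformed payload: not a reclaim candidate
        start_ts = start_ts_by_task.get(task_id)
        if start_ts is None:
            continue  # no start time recorded for this task
        if now_ts - start_ts >= timeout:
            stuck.append(task_id)
    return stuck
```

<p>Keeping the selection logic free of I/O like this would make it possible to unit-test the timeout rules separately from the Redis and MySQL calls.</p>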
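<p>The reaper must also guarantee that, when several workers are running, their reaper threads do not grab the same processing task at the same time, which could enqueue it more than once. This is why reclaim_task uses an atomic UPDATE: only the reaper whose UPDATE actually changes the row goes on to requeue the task.</p>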
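<p>The effect of the guarded UPDATE can be tried out in isolation. The sketch below uses an in-memory SQLite table in place of MySQL, with a hypothetical, simplified schema (an integer started_at instead of a datetime column); it only illustrates that a second reclaim attempt on the same task matches no row.</p>

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task (task_id INTEGER PRIMARY KEY, status TEXT, "
    "started_at INTEGER, retry_count INTEGER DEFAULT 0)"
)
conn.execute("INSERT INTO task (task_id, status, started_at) VALUES (51, 'processing', 1000)")
conn.commit()


def reclaim(conn: sqlite3.Connection, task_id: int, threshold_ts: int) -> bool:
    """Flip processing -> pending only if the row is still processing and old enough."""
    cur = conn.execute(
        "UPDATE task SET status = 'pending', retry_count = retry_count + 1, "
        "started_at = NULL "
        "WHERE task_id = ? AND status = 'processing' AND started_at < ?",
        (task_id, threshold_ts),
    )
    conn.commit()
    return cur.rowcount == 1


first = reclaim(conn, 51, threshold_ts=1100)   # matches the row and updates it
second = reclaim(conn, 51, threshold_ts=1100)  # status is no longer 'processing'
print(first, second)  # True False
```

<p>Because the status check and the status change happen in one statement, whichever reaper runs second sees rowcount 0 and skips the requeue.</p>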
<div class="cnblogs_code">
<pre>import json
import logging
import time
from datetime import datetime, timedelta

# redis_client, task_service and the *_SECONDS settings are defined elsewhere in the project


def reaper_loop():
    """Inspect the processing queue and recover timed-out tasks."""
    logging.info("doc_llm_reaper started, interval=%ss, timeout=%ss", REAPER_INTERVAL_SECONDS, PROCESSING_TIMEOUT_SECONDS)
    while True:
        try:
            now_ts = int(time.time())
            timeout_border_ts = now_ts - PROCESSING_TIMEOUT_SECONDS
            # Compared with processing_started_at, which is written by the database's
            # NOW(); this is only correct if the database session time zone is UTC
            timeout_threshold_dt = datetime.utcnow() - timedelta(seconds=PROCESSING_TIMEOUT_SECONDS)
            items = redis_client.lrange(TASK_QUEUE_PROCESSING_KEY, 0, -1)
            if not items:
                time.sleep(REAPER_INTERVAL_SECONDS)
                continue

            for raw in items:
                try:
                    payload_str = raw.decode("utf-8")
                    payload = json.loads(payload_str)
                    # A missing or non-numeric task_id counts as a malformed item
                    task_id = int(payload["task_id"])
                    task_name = payload.get("task_name")
                except Exception:
                    # Malformed item: drop it from the processing queue
                    redis_client.lrem(TASK_QUEUE_PROCESSING_KEY, 1, raw)
                    continue

                start_ts_raw = redis_client.hget(TASK_PROCESSING_TS_KEY, task_id)
                if start_ts_raw is None:
                    continue  # no start time recorded (yet), skip this item
                start_ts = int(start_ts_raw)
                if start_ts > timeout_border_ts:
                    continue  # still within the timeout

                logging.warning(f"doc_llm_reaper: task {task_id} seems stuck, start_ts={start_ts}, now_ts={now_ts}")
                # Atomic UPDATE: only one reaper gets True for a given task
                ok = task_service.reclaim_task(task_id, timeout_threshold_dt)
                if not ok:
                    continue

                redis_client.lrem(TASK_QUEUE_PROCESSING_KEY, 1, raw)
                redis_client.hdel(TASK_PROCESSING_TS_KEY, task_id)
                new_payload = json.dumps(
                    {"task_id": task_id, "task_name": task_name}, ensure_ascii=False
                )
                redis_client.lpush(TASK_QUEUE_READY_KEY, new_payload)
                logging.info(f"doc_llm_reaper: task {task_id} reclaimed and requeued to READY")
        except Exception:
            logging.exception("unexpected error in reaper loop")
        # Wait one interval before the next scan, whether or not the scan failed
        time.sleep(REAPER_INTERVAL_SECONDS)</pre>
</div>
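<p>In reaper_loop, the LREM, HDEL and LPUSH calls are sent as three separate commands, so a crash between them could leave a reclaimed task in neither queue. One possible hardening, sketched here as an assumption and not as part of the original service, is to queue the three commands in a redis-py MULTI/EXEC pipeline. The helper name requeue_atomically is hypothetical, and redis_client is expected to be a redis-py client.</p>

```python
import json

TASK_QUEUE_READY_KEY = "docllm:queue:ready"
TASK_QUEUE_PROCESSING_KEY = "docllm:queue:processing"
TASK_PROCESSING_TS_KEY = "docllm:hash:processing_ts"


def requeue_atomically(redis_client, raw, task_id, task_name):
    """Run LREM + HDEL + LPUSH as one MULTI/EXEC transaction (hypothetical helper)."""
    new_payload = json.dumps({"task_id": task_id, "task_name": task_name}, ensure_ascii=False)
    pipe = redis_client.pipeline(transaction=True)
    pipe.lrem(TASK_QUEUE_PROCESSING_KEY, 1, raw)    # remove the stale in-flight item
    pipe.hdel(TASK_PROCESSING_TS_KEY, task_id)      # clear its start time
    pipe.lpush(TASK_QUEUE_READY_KEY, new_payload)   # hand the task to the next worker
    return pipe.execute()  # sends the queued commands together
```

<p>This only groups the Redis side. The MySQL update in reclaim_task still commits separately, so a crash between the database commit and the pipeline remains possible and would need its own recovery path.</p>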
<p>Alongside the main worker loop, start a thread that runs the inspection in a loop:</p>
<div class="cnblogs_code">
<pre>import threading


def start_reaper_thread():
    # Daemon thread: it exits together with the main worker process
    reaper_thread = threading.Thread(target=reaper_loop, name="doc_llm_reaper", daemon=True)
    reaper_thread.start()
    return reaper_thread


if __name__ == "__main__":
    setup_logging()
    init_llm()
    start_reaper_thread()
    worker_loop()</pre>
</div>
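<p>A daemon thread is simply abandoned when the process exits. If a cleaner shutdown is ever needed, one option is to wait on a threading.Event instead of calling time.sleep, so the loop can be stopped promptly. The sketch below is an assumption about how that could look; run_periodically and one_round are hypothetical names, and the step function stands in for one reaper scan.</p>

```python
import threading


def run_periodically(step, interval_seconds: float, stop_event: threading.Event) -> None:
    """Call `step()` repeatedly until `stop_event` is set, waiting between rounds."""
    while not stop_event.is_set():
        step()
        # Event.wait returns as soon as the event is set, unlike time.sleep
        stop_event.wait(interval_seconds)


stop_event = threading.Event()
rounds = []


def one_round():
    rounds.append(1)
    if len(rounds) == 3:
        stop_event.set()  # demo only: stop after three rounds


thread = threading.Thread(target=run_periodically, args=(one_round, 0.01, stop_event),
                          name="doc_llm_reaper_demo", daemon=True)
thread.start()
thread.join(timeout=5)
print(len(rounds))  # 3
```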
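<p>III. Test and verification</p>
<p>We construct a scenario in which the worker is deliberately killed while it is processing a task. The environment then contains one task whose MySQL status is processing, and in Redis doc_llm:task_queue:ready holds no message for this task_id; only doc_llm:task_queue:processing and doc_llm:hash:processing_ts contain entries for it.</p>
<p>In other words, the task was consumed but then lost because the worker failed. Without inspection, the only option would be to delete this task by hand, since nothing would ever trigger it again. What we expect instead is that, after the restart, the reaper_loop thread detects the anomaly and requeues the task. The timeout is set to 10 minutes.</p>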
<p><img src="https://img2024.cnblogs.com/blog/3130244/202512/3130244-20251208215713929-478925103.png"></p>
<p><img src="https://img2024.cnblogs.com/blog/3130244/202512/3130244-20251208215604060-1667620268.png"></p>
<p>Now restart the worker with the inspection service enabled:</p>
<p><img src="https://img2024.cnblogs.com/blog/3130244/202512/3130244-20251208220238792-1877105069.png"></p>
<p>The log shows that task 51 was picked up by the inspection and entered testing again:</p>
<p><img src="https://img2024.cnblogs.com/blog/3130244/202512/3130244-20251208220426405-1996267293.png"></p>
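<p>The task was eventually executed, and the database recorded the retry count, timestamps and other details, which shows that the inspection service works.</p>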
<p><img src="https://img2024.cnblogs.com/blog/3130244/202512/3130244-20251208220629134-692394998.png"></p>
Source: https://www.cnblogs.com/xiaojp65536/p/19323722