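doc-llm-autotest, an LLM-based automated document testing platform: users submit files for document testing
<h1>1. Technology Selection and Feature Design</h1><p>We use a MinIO service to relay and store files. The user submits a file to doc-llm-controller, and the control plane stores it in MinIO, associated with the ID of the current task. doc-llm-worker polls Redis, finds a task waiting to run, takes its ID, fetches the file from MinIO by that ID, parses the file into structured information, and submits it to the LLM for document testing.</p>
<p>The flow for this part of the feature looks roughly like this:</p>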
<p><img src="https://img2024.cnblogs.com/blog/3130244/202512/3130244-20251208151020126-1797538158.png"></p>
<p>Correspondingly, adding the file store and fetch steps to the overall business flow gives the following:</p>
<p> </p>
<p><img src="https://img2024.cnblogs.com/blog/3130244/202512/3130244-20251208150918767-865933305.png"></p>
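<h1>2. MinIO Configuration and Usage</h1>
<p>Installing and deploying MinIO: we deploy the MinIO service from its Docker image, exposing port 9000 (the S3 API) for our own services and port 9001 for the web console:</p>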
<div class="cnblogs_code">
<pre>docker run -d --name doc-llm-minio \
  -p 9000:9000 -p 9001:9001 \
  --restart=always \
  -e MINIO_ROOT_USER=root \
  -e MINIO_ROOT_PASSWORD=password \
  -v /home/workspace/minio:/data \
  minio/minio:latest server /data --console-address ":9001"</pre>
</div>
<p>Calling the MinIO service from Python:</p>
<div class="cnblogs_code">
<pre># Install the MinIO Python client
pip install minio</pre>
</div>
<div class="cnblogs_code">
<pre>from minio import Minio
from minio.error import S3Error
import io

# Configure the MinIO client
client = Minio(
    "localhost:9000",
    access_key="root",
    secret_key="password",  # must match MINIO_ROOT_PASSWORD in the docker run command
    secure=False,
)
bucket_name = "doc-llm-bucket"

# Create the bucket if it does not exist yet
try:
    if not client.bucket_exists(bucket_name):
        client.make_bucket(bucket_name)
    else:
        print(f"Bucket '{bucket_name}' already exists.")
except S3Error as e:
    print(f"Error occurred: {e}")


# Upload a local file to MinIO
def upload_file(local_file_path, object_name):
    try:
        client.fput_object(bucket_name, object_name, local_file_path)
        print(f"'{local_file_path}' is successfully uploaded as '{object_name}' to bucket '{bucket_name}'.")
    except S3Error as e:
        print(f"Error occurred while uploading: {e}")


# Download an object to a local file
def download_file(object_name, local_file_path):
    try:
        client.fget_object(bucket_name, object_name, local_file_path)
        print(f"'{object_name}' is successfully downloaded to '{local_file_path}'.")
    except S3Error as e:
        print(f"Error occurred while downloading: {e}")


# List all objects in the bucket
def list_files():
    try:
        objects = client.list_objects(bucket_name)
        print(f"Objects in bucket '{bucket_name}':")
        for obj in objects:
            print(f"- {obj.object_name} (size: {obj.size} bytes)")
    except S3Error as e:
        print(f"Error occurred while listing objects: {e}")


# Delete the given object
def delete_file(object_name):
    try:
        client.remove_object(bucket_name, object_name)
        print(f"'{object_name}' is successfully deleted from bucket '{bucket_name}'.")
    except S3Error as e:
        print(f"Error occurred while deleting: {e}")</pre>
</div>
<p>The test results are as follows:</p>
<p><img src="https://img2024.cnblogs.com/blog/3130244/202512/3130244-20251208151849870-907674598.png"></p>
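<h1>3. Adapting the Control Plane Service doc-llm-controller</h1>
<h3>Overall approach:</h3>
<p>When the API layer receives a create-task request that carries a file, it first inserts a task row into MySQL with the doc field set to __PENDING_FILE__. Once it has the task ID, it calls the file push service to upload the file to MinIO together with the task ID, and afterwards updates the task's doc field to f"minio://{MINIO_BUCKET}/{object_name}".</p>
<p>That completes the control plane's part of the work.</p>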
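<p>The three steps above can be sketched with in-memory stand-ins. This is a minimal illustration under stated assumptions, not the project's code: the tasks and objects dictionaries and the create_file_task function are hypothetical names that play the roles of the MySQL task table, the MinIO bucket, and the endpoint logic.</p>

```python
from typing import Dict

PENDING_MARK = "__PENDING_FILE__"
MINIO_BUCKET = "doc-llm-bucket"

# In-memory stand-ins (assumptions for illustration only)
tasks: Dict[int, dict] = {}     # plays the role of the MySQL task table
objects: Dict[str, bytes] = {}  # plays the role of the MinIO bucket


def create_file_task(task_name: str, filename: str, content: bytes) -> int:
    # Step 1: insert the task with a placeholder so doc is never empty
    task_id = len(tasks) + 1
    tasks[task_id] = {"task_name": task_name, "doc": PENDING_MARK}

    # Step 2: store the file under an object name that carries the task ID
    object_name = f"{task_id}_{filename}"
    objects[object_name] = content

    # Step 3: write the storage path back to the doc field
    tasks[task_id]["doc"] = f"minio://{MINIO_BUCKET}/{object_name}"
    return task_id


task_id = create_file_task("demo", "readme.txt", b"hello")
print(tasks[task_id]["doc"])  # minio://doc-llm-bucket/1_readme.txt
```

<p>Because the placeholder is written before the upload, anything that reads the row between steps 1 and 3 sees __PENDING_FILE__, which is the case the worker has to handle.</p>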
<h3>services layer:</h3>
<p>Add file_service.py, which wraps the calls to the MinIO service.</p>
<div class="cnblogs_code">
<pre># Code sample (excerpt)
def _ensure_bucket():
    """Make sure the bucket exists"""
    if not _minio_client.bucket_exists(MINIO_BUCKET):
        _minio_client.make_bucket(MINIO_BUCKET)


def save_task_file(task_id: int, file_obj: FileStorage) -> str:
    """
    Store the user's uploaded file in MinIO. Object name format: {task_id}_{orig_filename}
    Returns the value saved to the doc field in the database, e.g. minio://doc-llm-bucket/123_xxx.docx
    """
    ...
    doc_path = f"minio://{MINIO_BUCKET}/{object_name}"
    return doc_path</pre>
</div>
<p>Add an update-doc method to both doc_check_service and task_service:</p>
<div class="cnblogs_code">
<pre># doc_check_service
def update_task_doc(task_id: int, doc: str) -> None:
    """Update the task's doc field"""
    task = task_service.get_task_by_id(task_id)
    if not task:
        raise TaskNotFoundError(f"Task {task_id} does not exist")
    task_service.update_task_doc(task_id, doc)


# task_service
def update_task_doc(task_id: int, doc: str) -> None:
    """Update the task's doc field"""
    with get_session() as session:
        task = session.scalar(
            select(TaskDocLLM).where(TaskDocLLM.task_id == task_id)
        )
        if not task:
            raise ValueError(f"Task {task_id} does not exist")
        task.doc = doc</pre>
</div>
<p>Update the endpoint function so that it accepts both input modes, inline text and an uploaded text file:</p>
<div class="cnblogs_code">
<pre>@bp.route("/tasks/", methods=["POST"])
def create_doc_task():
    # Check whether this is a file upload
    if request.content_type and "multipart/form-data" in request.content_type:
        return _create_task_with_file()
    # Otherwise fall back to the original JSON logic
    return _create_task_with_json()


def _create_task_with_json():
    ...
    task_id = doc_check_service.submit_doc_task(task_name, doc, product, feature)
    ...


def _create_task_with_file():
    ...
    try:
        # 1. Insert the task first, with a placeholder doc so the field is non-empty
        placeholder_doc = "__PENDING_FILE__"
        task_id = doc_check_service.submit_doc_task(
            task_name=task_name,
            doc=placeholder_doc,
            product=product,
            feature=feature,
        )
        # 2. Upload the file to MinIO
        doc_path = file_service.save_task_file(task_id, file_obj)
        # 3. Write the MinIO path back to the doc field
        doc_check_service.update_task_doc(task_id, doc_path)
        ...</pre>
</div>
<p>A quick test of the endpoint with Postman shows that it works as expected:</p>
<p>Request:</p>
<p><img src="https://img2024.cnblogs.com/blog/3130244/202512/3130244-20251208161045464-2020315156.png"></p>
<p>The Flask logs, the database, and MinIO all look correct, and the data is consistent across the three in this test:</p>
<p><img src="https://img2024.cnblogs.com/blog/3130244/202512/3130244-20251208161448502-304665951.png"></p>
<p> </p>
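<h1>4. Adapting the Data Plane Service doc-llm-worker</h1>
<p>The current data flow, in chronological order: the task is first written to MySQL, with the doc field holding the pending placeholder; then the task_id is written to Redis; next the file is uploaded to MinIO; finally the doc field in MySQL is updated to the file's MinIO path. As a result, the worker may dequeue a task_id before the file for that task has been uploaded.</p>
<p>The original doc-llm-worker logic was: read the Redis queue to find a task to run, read the doc text from MySQL, and call the LLM to test it. The data plane therefore needs a few adaptations:</p>
<h3>1. Downloading files for file-based tasks</h3>
<p>To download files from MinIO, add a function to the file_service layer:</p>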
<div class="cnblogs_code">
<pre>def download_file(bucket: str, object_name: str) -> bytes:
    """
    Download a file from MinIO and return its content as bytes.
    Usage:
        content = download_file("doc-llm-bucket", "15_readme.txt")
        text = content.decode("utf-8")
    """
    response = None
    try:
        response = _minio_client.get_object(bucket, object_name)
        return response.read()
    except S3Error as e:
        raise RuntimeError(f"Download from minio failed: {e}") from e
    finally:
        # Close the response and return the HTTP connection to the pool
        if response is not None:
            response.close()
            response.release_conn()</pre>
</div>
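<p>The bytes returned by download_file still have to be decoded before they can be sent to the LLM. The sketch below shows how decoding as UTF-8 with errors="replace" behaves; the helper name bytes_to_text is hypothetical. Text that is not UTF-8 does not raise an error but comes back with U+FFFD replacement characters, so binary formats such as .docx (which are zip archives) would likely need a dedicated parser rather than a plain decode.</p>

```python
def bytes_to_text(content: bytes) -> str:
    """Decode as UTF-8, replacing undecodable bytes with U+FFFD."""
    return content.decode("utf-8", errors="replace")


utf8_bytes = "文档测试".encode("utf-8")
gbk_bytes = "文档测试".encode("gbk")  # same text, different encoding

print(bytes_to_text(utf8_bytes))             # 文档测试
print("\ufffd" in bytes_to_text(gbk_bytes))  # True: the text was silently damaged
```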
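<h3>2. Parsing the file</h3>
<p>If doc is plain text, the old logic applies; if it is a MinIO path, the file is downloaded and then decoded into text; if it is the pending placeholder, the worker waits until the file upload has finished.</p>
<p>Add doc_loader.py:</p>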
<div class="cnblogs_code">
<pre># app/worker/doc_loader.py
import logging
from typing import Tuple

from app.services import file_service

PENDING_MARK = "__PENDING_FILE__"


# Exceptions referenced below and by process_task; their definitions are not
# shown in the original excerpt, so these minimal classes are an assumption.
class DocPendingError(Exception):
    """The file for this task has not been uploaded yet."""


class DocPathError(Exception):
    """The doc field is empty or is not a usable path."""


def _is_minio_path(doc: str) -> bool:
    """
    Check whether doc is a MinIO path:
    - /bucket/object_name
    - minio://bucket/object_name
    """
    ...


def _parse_minio_path(doc: str) -> Tuple[str, str]:
    """Parse the doc field into (bucket, object_name)"""
    ...


def load_doc_for_task(task) -> str:
    """
    Given a task object, return the actual doc text (str) to send to the LLM.
    1. doc == "__PENDING_FILE__" -> raise DocPendingError
    2. doc is a MinIO path (/bucket/obj) -> download from MinIO and decode
    3. anything else -> treat as plain text and return it unchanged
    """
    doc = (task.doc or "").strip()
    if not doc:
        raise DocPathError(f"task {task.id} doc is empty")
    if doc == PENDING_MARK:
        raise DocPendingError(f"task {task.id} doc is still pending file upload")
    if _is_minio_path(doc):
        bucket, object_name = _parse_minio_path(doc)
        logging.info(
            f"task {task.id} doc is minio path, bucket={bucket}, object={object_name}"
        )
        content_bytes = file_service.download_file(bucket, object_name)
        return content_bytes.decode("utf-8", errors="replace")
    return doc</pre>
</div>
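<p>The bodies of _is_minio_path and _parse_minio_path are elided above. As a rough sketch of what such helpers could look like, the version below handles only the minio://bucket/object_name form; the function names without the leading underscore are hypothetical, and this is not the author's implementation. It deliberately does not accept the bare /bucket/object_name form, since plain text that happens to start with a slash could then be mistaken for a path.</p>

```python
from typing import Tuple

MINIO_SCHEME = "minio://"


def is_minio_path(doc: str) -> bool:
    """True only for the minio://bucket/object_name form."""
    return doc.startswith(MINIO_SCHEME)


def parse_minio_path(doc: str) -> Tuple[str, str]:
    """Split minio://bucket/object_name into (bucket, object_name)."""
    if not is_minio_path(doc):
        raise ValueError(f"not a minio path: {doc!r}")
    rest = doc[len(MINIO_SCHEME):]
    # Split at the first slash only, so object names may contain slashes
    bucket, sep, object_name = rest.partition("/")
    if not bucket or not sep or not object_name:
        raise ValueError(f"malformed minio path: {doc!r}")
    return bucket, object_name


print(parse_minio_path("minio://doc-llm-bucket/15_readme.txt"))
# ('doc-llm-bucket', '15_readme.txt')
```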
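<h3>3. Worker handling</h3>
<p>The worker reads the Redis queue and looks up the task by its ID. If the doc field of a file task is still "__PENDING_FILE__", the worker blocks and waits until the field has been updated to "minio://{bucket}/{object_name}", then downloads the file from MinIO and processes it. doc_llm_test_worker is adapted accordingly.</p>
<p>Add a blocking wait function:</p>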
<div class="cnblogs_code">
<pre>def wait_for_doc_ready(task_id: int):
    """
    While doc == "__PENDING_FILE__", wait for the control plane to update the doc field.
    Raises an exception if the field is still not updated after the maximum number of retries.
    """
    PENDING_RETRY_INTERVAL = 2  # seconds between polls
    PENDING_RETRY_MAX = 5  # at most 5 polls, about 10 seconds in total
    for i in range(PENDING_RETRY_MAX):
        time.sleep(PENDING_RETRY_INTERVAL)
        task = task_service.get_task_by_id(task_id)
        if not task:
            raise RuntimeError(f"task {task_id} disappeared during pending wait")
        doc = (task.doc or "").strip()
        if doc != doc_loader.PENDING_MARK:
            logging.info(f"task {task_id} doc is ready after {i+1} retries: {doc}")
            return task
        logging.info(f"task {task_id} doc still pending (retry {i+1}/{PENDING_RETRY_MAX})")
    raise RuntimeError(f"task {task_id} doc still pending after max retries")</pre>
</div>
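<p>The same bounded-polling idea can be written so that it is easy to test without a database or real delays. The sketch below is one possible variant, not the project's code: wait_until_ready, fetch_doc, and the injectable sleep parameter are hypothetical names. Unlike the function above, it checks once before the first sleep.</p>

```python
import time
from typing import Callable, Optional

PENDING_MARK = "__PENDING_FILE__"


def wait_until_ready(
    fetch_doc: Callable[[], Optional[str]],
    interval: float = 2.0,
    max_retries: int = 5,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll fetch_doc until it returns something other than the placeholder."""
    for attempt in range(max_retries + 1):
        doc = (fetch_doc() or "").strip()
        if doc != PENDING_MARK:
            return doc
        if attempt < max_retries:
            sleep(interval)  # wait before the next poll
    raise TimeoutError(f"doc still pending after {max_retries} retries")


# Usage with a fake source that becomes ready on the third read
values = iter([PENDING_MARK, PENDING_MARK, "minio://doc-llm-bucket/1_readme.txt"])
slept = []
result = wait_until_ready(lambda: next(values), sleep=slept.append)
print(result, slept)
```

<p>Passing the sleep function in as a parameter lets a test record the waits instead of actually pausing.</p>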
<p>Adapt the document processing function process_task:</p>
<div class="cnblogs_code">
<pre>def process_task(task_id: int):
    """Process a document check task"""
    logging.info(f"start process task {task_id}")
    ...
    try:
        try:
            doc_text = doc_loader.load_doc_for_task(task)
        except doc_loader.DocPendingError:
            # The file has not been uploaded yet: wait, then load again
            logging.info(f"task {task_id} doc pending, waiting...")
            try:
                task = wait_for_doc_ready(task_id)
                doc_text = doc_loader.load_doc_for_task(task)
            except Exception as e2:
                logging.error(f"task {task_id} pending wait failed: {e2}")
                task_service.mark_task_failed(task_id, str(e2))
                return
        except doc_loader.DocPathError as e:
            logging.error(f"task {task_id} invalid doc path: {e}")
            task_service.mark_task_failed(task_id, str(e))
            return
        ...</pre>
</div>
<p>Test results:</p>
<p>Database rows:</p>
<p><img src="https://img2024.cnblogs.com/blog/3130244/202512/3130244-20251208175014355-1733011035.png"></p>
<p>Worker processing log:</p>
<p><img src="https://img2024.cnblogs.com/blog/3130244/202512/3130244-20251208175055612-1096007653.png"></p>
<p>Final result:</p>
<p><img src="https://img2024.cnblogs.com/blog/3130244/202512/3130244-20251208175132905-163029416.png"></p>
<p> </p>
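<h1>5. Adapting the Frontend to the API</h1>
<p>Add a file upload option to the UI alongside the existing text input, to match the updated endpoint.</p>
<p>The old text input mode:</p>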
<p><img src="https://img2024.cnblogs.com/blog/3130244/202512/3130244-20251208180201497-1757389066.png"></p>
<p>The newly supported file input mode:</p>
<p><img src="https://img2024.cnblogs.com/blog/3130244/202512/3130244-20251208180439151-1348151369.png"></p>
<p> </p><br><br>
Source: https://www.cnblogs.com/xiaojp65536/p/19321881