1.下载与 Python 版本对应的 setuptools 和 pip 源码包
下载地址:
setuptools:https://pypi.org/project/setuptools/#files(选择.tar.gz或.whl)
pip:https://pypi.org/project/pip/#files(选择.tar.gz或.whl)
建议下载最新版本的.tar.gz源码包(兼容性更好)
2.将下载的setuptools-xxx.tar.gz和pip-xxx.tar.gz通过 U 盘、共享文件夹等方式拷贝到麒麟 V10 系统中(如/root/offline_packages目录)
3.安装 setuptools(pip 依赖)
# 进入安装包目录
cd /root/offline_packages
# 解压setuptools
tar -zxvf setuptools-xxx.tar.gz
cd setuptools-xxx
# 用系统自带的python3安装
python3 setup.py install
4.安装 pip
# 返回安装包目录
cd ..
# 解压pip
tar -zxvf pip-xxx.tar.gz
cd pip-xxx
# 安装pip
python3 setup.py install
5.验证安装
pip3 --version # 或 pip --version(若系统默认指向python3)
1.在有网络的机器上下载包(该机器的操作系统、CPU 架构和 Python 版本应与目标机器一致,否则下载的包可能无法在目标机器上安装)
# 创建存储目录
mkdir offline_packages
# 下载dmPython及其依赖
pip download dmPython -d ./offline_packages
# 下载minio及其依赖
pip download minio -d ./offline_packages
2.将包拷贝到目标机器
通过 U 盘、移动硬盘或其他方式,将offline_packages目录拷贝到需要离线安装的机器上。
3.在目标机器上进行离线安装
# 安装dmPython
pip install --no-index --find-links=./offline_packages dmPython
# 安装minio
pip install --no-index --find-links=./offline_packages minio
# -*- coding: utf-8 -*-
"""将达梦数据库中的文件上传到 MinIO 并按要求处理。
依赖:
pip install dmPython minio
运行示例:
python scripts/upload_dm_to_minio.py
"""
from __future__ import annotations
import logging
import os
import sys
from contextlib import closing
from datetime import datetime
from pathlib import Path
from typing import Iterator, Tuple
try:
import dmPython # type: ignore
except ModuleNotFoundError as exc: # pragma: no cover
raise SystemExit(
"未找到 dmPython 库,请先使用 `pip install dmPython` 安装后再运行此脚本"
) from exc
from minio import Minio
from minio.error import S3Error
# --------------------------- Configuration --------------------------- #
# Connection settings and credentials can be overridden with an environment
# variable of the same name, so secrets do not have to be edited into this
# file. The literals below are the defaults used when a variable is unset.
DM_HOST = os.environ.get("DM_HOST", "127.0.0.1")
DM_PORT = int(os.environ.get("DM_PORT", "5237"))
DM_USER = os.environ.get("DM_USER", "SYSDBA")
DM_PASSWORD = os.environ.get("DM_PASSWORD", "SYSDBA001")
# Schema, table and column names are interpolated into the SQL text, so they
# stay as literals in the source rather than being read from the environment.
DM_SCHEMA = "TEST_SCHEMA"
DM_TABLE = "t_files"
DM_FIELD_ROWGUID = "ROW_GUID"
DM_FIELD_FILEPATH = "FILE_PATH"
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "127.0.0.1:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "admin")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "admin")
MINIO_BUCKET = os.environ.get("MINIO_BUCKET", "test")
# Plain HTTP by default; set MINIO_SECURE to 1/true/yes/on to use HTTPS.
MINIO_SECURE = os.environ.get("MINIO_SECURE", "false").strip().lower() in {
    "1",
    "true",
    "yes",
    "on",
}
# Directory that the FILE_PATH values read from the database are relative to.
BASE_DIR = Path("/data")
# The failure log is written next to this script.
LOG_FILE = Path(__file__).with_name("upload_failures.log")
# --------------------------- Logging setup --------------------------- #
# Create the log file's directory up front so the FileHandler can open it.
LOG_FILE.parent.mkdir(parents=True, exist_ok=True)

# Console output goes to stdout and carries INFO and above.
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)

# The on-disk failure log only receives ERROR and above.
file_handler = logging.FileHandler(LOG_FILE, encoding="utf-8")
file_handler.setLevel(logging.ERROR)

logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
    handlers=[file_handler, console_handler],
)
logger = logging.getLogger(__name__)
# --------------------------- 数据库相关 --------------------------- #
def iter_attachments() -> Iterator[Tuple[str, str]]:
    """Yield ``(row_guid, file_path)`` pairs read from the DM database.

    Connects with the module-level ``DM_*`` settings and selects the two
    configured columns from ``DM_SCHEMA.DM_TABLE``, skipping rows where either
    column is NULL. Rows are fetched one at a time; the cursor and the
    connection are closed when the generator is exhausted or closed.
    """
    query = (
        f"SELECT {DM_FIELD_ROWGUID}, {DM_FIELD_FILEPATH} "
        f"FROM {DM_SCHEMA}.{DM_TABLE} "
        f"WHERE {DM_FIELD_ROWGUID} IS NOT NULL AND {DM_FIELD_FILEPATH} IS NOT NULL"
    )
    connection = dmPython.connect(
        user=DM_USER,
        password=DM_PASSWORD,
        server=DM_HOST,
        port=DM_PORT,
    )
    with closing(connection), closing(connection.cursor()) as cursor:
        cursor.execute(query)
        # iter(callable, sentinel) keeps calling fetchone() until it yields None.
        for record in iter(cursor.fetchone, None):
            yield record[0], record[1]
# --------------------------- MinIO 相关 --------------------------- #
def ensure_bucket_exists(client: Minio, bucket_name: str) -> None:
    """Create ``bucket_name`` through ``client`` unless it already exists."""
    if client.bucket_exists(bucket_name):
        return
    client.make_bucket(bucket_name)
def build_object_name(row_guid: str, source_path: Path) -> str:
    """Return the object name for a row: its GUID plus the file's extension.

    The extension (leading dot included) is taken from ``source_path``; when
    the file has none, ``row_guid`` is returned unchanged.
    """
    extension = source_path.suffix
    return f"{row_guid}{extension}" if extension else row_guid
def upload_one_file(client: Minio, bucket: str, row_guid: str, file_path: Path) -> None:
    """Upload ``file_path`` into ``bucket`` and log the success at INFO level.

    The object name is produced by ``build_object_name``. Nothing is caught
    here, so any exception raised by ``fput_object`` reaches the caller.
    """
    target_name = build_object_name(row_guid, file_path)
    local_path = file_path.as_posix()
    client.fput_object(bucket, target_name, local_path)
    logger.info("上传成功:%s -> %s/%s", file_path, bucket, target_name)
def log_failure(file_path: Path, exc: Exception) -> None:
    """Record a failed upload at ERROR level.

    The entry contains the file path, the current local time, the file size in
    bytes and the error itself. A size of ``-1`` means the size could not be
    determined.

    Args:
        file_path: Local file whose upload failed.
        exc: The error that caused the failure.
    """
    try:
        size = file_path.stat().st_size
    except (OSError, ValueError):
        # A single stat() replaces the former exists()-then-stat() pair: the
        # file may be missing, unreadable, or removed between the two calls,
        # and the failure logger itself must never raise and abort the batch.
        # ValueError covers paths the OS rejects outright (embedded NUL byte).
        size = -1
    logger.error(
        "上传失败 | 路径: %s | 时间: %s | 大小: %s | 错误: %s",
        file_path,
        datetime.now().isoformat(timespec="seconds"),
        size,
        exc,
    )
# --------------------------- 主流程 --------------------------- #
def _is_within_base(candidate: Path, base: Path) -> bool:
    """Return True when *candidate*, normalised lexically, lies inside *base*.

    Only ``.`` and ``..`` segments are collapsed; symlinks are not followed, so
    this is a textual check on the path as written.
    """
    try:
        anchor = os.path.commonpath([os.path.normpath(base)])
        shared = os.path.commonpath([anchor, os.path.normpath(candidate)])
    except ValueError:
        # commonpath() raises for mixed absolute/relative paths and for paths
        # on different Windows drives; neither can be inside *base*.
        return False
    return shared == anchor


def main() -> None:
    """Upload every file listed in the database to MinIO, then delete it locally.

    For each ``(row_guid, file_path)`` pair from ``iter_attachments`` the path
    is taken relative to ``BASE_DIR``, uploaded to ``MINIO_BUCKET`` and, once
    the upload succeeded, removed from disk. Rows whose path leaves
    ``BASE_DIR``, whose file is missing, or whose upload fails are reported via
    ``log_failure`` and counted as failed. A file that uploads but cannot be
    deleted is only warned about and still counts as processed.
    """
    client = Minio(
        MINIO_ENDPOINT,
        access_key=MINIO_ACCESS_KEY,
        secret_key=MINIO_SECRET_KEY,
        secure=MINIO_SECURE,
    )
    ensure_bucket_exists(client, MINIO_BUCKET)

    processed = 0
    failed = 0
    for row_guid, file_path_value in iter_attachments():
        # Strip leading separators so the value is joined under BASE_DIR
        # instead of replacing it as an absolute path.
        relative_path = file_path_value.lstrip("/\\")
        candidate = BASE_DIR / relative_path

        # A value such as "../etc/passwd" would climb out of BASE_DIR, and the
        # file is deleted after upload, so such rows are refused up front.
        if not _is_within_base(candidate, BASE_DIR):
            log_failure(candidate, PermissionError("路径超出基础目录"))
            failed += 1
            continue

        full_path = candidate.resolve()
        if not full_path.exists():
            log_failure(full_path, FileNotFoundError("文件不存在"))
            failed += 1
            continue

        try:
            upload_one_file(client, MINIO_BUCKET, row_guid, full_path)
        except (FileNotFoundError, S3Error, OSError) as exc:
            log_failure(full_path, exc)
            failed += 1
            continue

        # Delete only after a successful upload; a failed delete is not
        # treated as a failed row because the object is already stored.
        try:
            full_path.unlink()
            logger.info("已删除本地文件:%s", full_path)
        except OSError as exc:
            logger.warning("文件删除失败:%s,错误:%s", full_path, exc)
        processed += 1

    logger.info("处理完成,总计:%s,失败:%s", processed + failed, failed)
if __name__ == "__main__":
    # Script entry point: any uncaught error is logged with its traceback and
    # the process exits with status 1.
    try:
        main()
    except Exception as err:  # pragma: no cover
        logger.exception("脚本运行异常: %s", err)
        raise SystemExit(1)