初吻给奶瓶 發表於 2026-1-17 17:22:00

[python]Flask - Tracking ID的设计

<h2 id="前言">前言</h2>
<p>在实际业务中,根据 <code>tracking_id</code> 追溯一条请求的完整处理路径是比较常见的需求。借助 Flask 自带的全局对象 <code>g</code> 以及钩子函数,可以很容易地为每条请求添加 <code>tracking_id</code>,并在日志中自动记录。</p>
<p>主要内容:</p>
<ul>
<li>如何为每条请求添加 <code>tracking_id</code></li>
<li>如何为日志自动添加 <code>tracking_id</code> 记录</li>
<li>如何自定义响应类,实现统一的响应格式,并在响应头中添加 <code>tracking_id</code></li>
<li>视图函数单元测试示例</li>
<li>Gunicorn 配置</li>
</ul>
<h2 id="项目结构">项目结构</h2>
<p>虽然内容看起来很多,但 tracking_id 的实现其实很简单。本文按照生产项目的规范组织了代码,添加了 Gunicorn 配置和单元测试代码,以及规范了日志格式和 JSON 响应格式。</p>
<pre><code>├── apis
│   ├── common
│   │   ├── common.py
│   │   └── __init__.py
│   └── __init__.py
├── gunicorn.conf.py
├── handles
│   └── user.py
├── logs
│   ├── access.log
│   └── error.log
├── main.py
├── middlewares
│   ├── __init__.py
│   └── tracking_id.py
├── pkgs
│   └── log
│       ├── app_log.py
│       └── __init__.py
├── pyproject.toml
├── pytest.ini
├── README.md
├── responses
│   ├── __init__.py
│   └── json_response.py
├── tests
│   └── apis
│       └── test_common.py
├── tmp
│   └── gunicorn.pid
└── uv.lock
</code></pre>
<p>安装依赖</p>
<pre><code class="language-shell">uv add flask
uv add gunicorn gevent# 生产环境部署一般依赖这两个
uv add --dev pytest         # 测试库
</code></pre>
<h2 id="实现添加-tracking_id-的中间件">实现添加 tracking_id 的中间件</h2>
<p>代码文件:<code>middlewares/tracking_id.py</code></p>
<pre><code class="language-python">from uuid import uuid4

from flask import Flask, Response, g, request


def tracking_id_middleware(app: Flask):
    """
    跟踪 ID 中间件
    为每个请求生成或获取跟踪 ID,用于追踪请求链路
    """
   
    @app.before_request
    def tracking_id_before_request():
      """
      请求前处理函数
      检查请求头中是否包含 X-Tracking-ID,如果没有则生成一个新的 UUID 作为跟踪 ID
      并将其存储到 Flask 的全局对象 g 中,供后续处理使用
      """
      # 从请求头中获取 X-Tracking-ID
      tracking_id = request.headers.get("X-Tracking-ID")
      if not tracking_id:
            # 如果请求头中没有 X-Tracking-ID,则生成一个新的 UUID
            tracking_id = str(uuid4())
      # 将跟踪 ID 存储到 Flask 的全局对象 g 中,供后续处理使用
      g.tracking_id = tracking_id

    @app.after_request
    def tracking_id_after_request(response: Response):
      """
      请求后处理函数
      将跟踪 ID 添加到响应头中,以便客户端知道本次请求的跟踪 ID
      """
      # 检查响应头中是否已经有 X-Tracking-ID
      tracking_id = response.headers.get("X-Tracking-ID", "")
      if not tracking_id:
            # 如果响应头中没有 X-Tracking-ID,则从全局对象 g 中获取
            tracking_id = g.get("tracking_id", "")
            # 将跟踪 ID 添加到响应头中
            response.headers["X-Tracking-ID"] = tracking_id
      return response

    # 返回应用实例
    return app
</code></pre>
<p>代码文件 <code>middlewares/__init__.py</code>,方便其他模块导入</p>
<pre><code class="language-python">from .tracking_id import tracking_id_middleware

__all__ = [
    "tracking_id_middleware",
]
</code></pre>
<h2 id="日志模块---自动记录-tracking_id">日志模块 - 自动记录 tracking_id</h2>
<p>实现一个简单的输出到控制台的日志模块,日志格式为 JSON,自动添加 tracking_id 到日志中,避免手动在 <code>logger.info()</code> 这类方法中传入 <code>tracking_id</code>。</p>
<p>代码文件 <code>pkgs/log/app_log.py</code></p>
<pre><code class="language-python">import json
import logging
import sys

from flask import g


class JSONFormatter(logging.Formatter):
    """日志格式化器,输出 JSON 格式的日志。"""

    def format(self, record: logging.LogRecord) -&gt; str:
      log_record = {
            "@timestamp": self.formatTime(record, "%Y-%m-%dT%H:%M:%S%z"),
            "level": record.levelname,
            "name": record.name,
            # "processName": record.processName,# 如需记录进程名可取消注释
            "tracking_id": getattr(record, "tracking_id", None),
            "loc": "%s:%d" % (record.filename, record.lineno),
            "func": record.funcName,
            "message": record.getMessage(),
      }

      return json.dumps(log_record, ensure_ascii=False, default=str)


class TrackingIDFilter(logging.Filter):
    """日志过滤器,为日志记录添加 tracking_id。"""

    def filter(self, record):
      record.tracking_id = g.get("tracking_id", None)
      return True


def _setup_console_handler(level: int) -&gt; logging.StreamHandler:
    """设置控制台日志处理器。

    Args:
      level (int): 日志级别。
    """
    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(level)
    handler.setFormatter(JSONFormatter())
    return handler


def setup_app_logger(level: int = logging.INFO, name: str = "app") -&gt; logging.Logger:
    logger = logging.getLogger(name)

    if logger.hasHandlers():
      return logger

    logger.setLevel(level)
    logger.propagate = False

    logger.addHandler(_setup_console_handler(level))
    logger.addFilter(TrackingIDFilter())

    return logger
</code></pre>
<p>在 <code>pkgs/log/__init__.py</code> 中初始化 <code>logger</code>,实现单例调用。</p>
<pre><code class="language-python">from .app_log import setup_app_logger

logger = setup_app_logger()

__all__ = ["logger"]
</code></pre>
<h2 id="自定义响应类">自定义响应类</h2>
<p>规范 JSON 类型的响应格式,并在响应头中添加 <code>X-Tracking-ID</code> 和 <code>X-DateTime</code>。</p>
<p>代码文件 <code>responses/json_response.py</code></p>
<pre><code class="language-python">import json
from datetime import datetime
from http import HTTPStatus
from typing import Any

from flask import Response, g, request


class JsonResponse(Response):
    def __init__(
      self,
      data: Any = None,
      code: HTTPStatus = HTTPStatus.OK,
      msg: str = "this is a json response",
    ):
      x_tracking_id = g.get("tracking_id", "")
      x_datetime = datetime.now().astimezone().isoformat(timespec="seconds")
      resp_headers = {
            "Content-Type": "application/json",
            "X-Tracking-ID": x_tracking_id,
            "X-DateTime": x_datetime,
      }
      try:
            resp = json.dumps(
                {
                  "code": code.value,
                  "msg": msg,
                  "data": data,
                },
                ensure_ascii=False,
                default=str,
            )
      except Exception as e:
            resp = json.dumps(
                {
                  "code": HTTPStatus.INTERNAL_SERVER_ERROR.value,
                  "msg": f"Response serialization error: {str(e)}",
                  "data": None,
                }
            )
      super().__init__(response=resp, status=code.value, headers=resp_headers)


class Success(JsonResponse):
    def __init__(self, data: Any = None, msg: str = ""):
      if not msg:
            msg = f"{request.method} {request.path} success"
      super().__init__(data=data, code=HTTPStatus.OK, msg=msg)


class Fail(JsonResponse):
    def __init__(self, msg: str = "", data: Any = None):
      if not msg:
            msg = f"{request.method} {request.path} failed"
      super().__init__(data=data, code=HTTPStatus.INTERNAL_SERVER_ERROR, msg=msg)


class ArgumentNotFound(JsonResponse):
    def __init__(self, msg: str = "", data: Any = None):
      if not msg:
            msg = f"{request.method} {request.path} argument not found"
      super().__init__(data=data, code=HTTPStatus.BAD_REQUEST, msg=msg)


class ArgumentInvalid(JsonResponse):
    def __init__(self, msg: str = "", data: Any = None):
      if not msg:
            msg = f"{request.method} {request.path} argument invalid"
      super().__init__(data=data, code=HTTPStatus.BAD_REQUEST, msg=msg)


class AuthFailed(JsonResponse):
    """HTTP 状态码: 401"""

    def __init__(self, msg: str = "", data: Any = None):
      if not msg:
            msg = f"{request.method} {request.path} auth failed"
      super().__init__(data=data, code=HTTPStatus.UNAUTHORIZED, msg=msg)


class ResourceConflict(JsonResponse):
    """HTTP 状态码: 409"""

    def __init__(self, msg: str = "", data: Any = None):
      if not msg:
            msg = f"{request.method} {request.path} resource conflict"
      super().__init__(data=data, code=HTTPStatus.CONFLICT, msg=msg)


class ResourceNotFound(JsonResponse):
    """HTTP 状态码: 404"""

    def __init__(self, msg: str = "", data: Any = None):
      if not msg:
            msg = f"{request.method} {request.path} resource not found"
      super().__init__(data=data, code=HTTPStatus.NOT_FOUND, msg=msg)


class ResourceForbidden(JsonResponse):
    """HTTP 状态码: 403"""

    def __init__(self, msg: str = "", data: Any = None):
      if not msg:
            msg = f"{request.method} {request.path} resource forbidden"
      super().__init__(data=data, code=HTTPStatus.FORBIDDEN, msg=msg)

</code></pre>
<p>代码文件 <code>responses/__init__.py</code>,方便其他模块调用。</p>
<pre><code class="language-python">from .json_response import (
    ArgumentInvalid,
    ArgumentNotFound,
    AuthFailed,
    Fail,
    JsonResponse,
    ResourceConflict,
    ResourceForbidden,
    ResourceNotFound,
    Success,
)

__all__ = [
    "JsonResponse",
    "Success",
    "Fail",
    "ArgumentNotFound",
    "ArgumentInvalid",
    "AuthFailed",
    "ResourceConflict",
    "ResourceNotFound",
    "ResourceForbidden",
]
</code></pre>
<h2 id="编写视图函数">编写视图函数</h2>
<p>代码文件 <code>apis/common/common.py</code>。以下定义了 5 个路由,主要用于测试响应类是否正常返回 JSON 格式。</p>
<pre><code class="language-python">from datetime import datetime

from flask import Blueprint

from handles import user as user_handle
from pkgs.log import logger
from responses import Success

route = Blueprint("common_apis", __name__, url_prefix="/api")


@route.get("/health")
def health_check():
    # print(g.get("tracking_id", "no-tracking-id"))
    logger.info("Health check")
    return Success(data="OK")


@route.get("/users")
def get_users():
    users = user_handle.get_users()
    return Success(data=users)


@route.get("/names")
def get_names():
    names = ["Alice", "Bob", "Charlie"]
    return Success(data=names)


@route.get("/item")
def get_item():
    item = {"id": 101, "name": "Sample Item", "price": 29.99, "now": datetime.now()}
    return Success(data=item)


@route.get("/error")
def get_error():
    raise Exception("This is a test exception")

</code></pre>
<p><code>GET /api/users</code> 调用了 <code>handles/</code> 中的代码,模拟查询数据库。<code>handles/user.py</code> 中的代码如下:</p>
<pre><code class="language-python">import time
from typing import Any, Dict, List


def get_users() -&gt; List]:
    # 模拟查询用户数据
    time.sleep(0.1)# 模拟延迟
    users = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
    return users
</code></pre>
<p>代码文件 <code>apis/common/__init__.py</code> 中导入各个蓝图并统一暴露。由于示例代码只定义了一个蓝图,所以这里写得很简单。如果有多个蓝图,可以把蓝图都添加到一个列表中,在 Flask 应用中一次性遍历注册。</p>
<pre><code class="language-python">from .common import route
# from .common import route as common_route

# routes = [
#   common_route,
# ]

__all__ = ["route"]
</code></pre>
<p>代码文件 <code>apis/__init__.py</code> 中提供 Flask 应用的工厂函数。</p>
<pre><code class="language-python">import traceback

from flask import Flask

from apis.common import route as common_route
from middlewares import tracking_id_middleware
from responses import Fail, ResourceNotFound
from pkgs.log import logger



# 错误处理器
def error_handler_notfound(error):
    return ResourceNotFound()


def error_handler_generic(error):
    logger.error(traceback.format_exc())
    return Fail(data=str(error))



def create_app() -&gt; Flask:
    app = Flask(__name__)

    # 注册中间件
    app = tracking_id_middleware(app)

    # 注册错误处理器
    app.errorhandler(Exception)(error_handler_generic)
    app.errorhandler(404)(error_handler_notfound)

    # 注册蓝图
    app.register_blueprint(common_route)

    return app

__all__ = [
    "create_app",
]
</code></pre>
<p>入口代码文件 <code>main.py</code></p>
<pre><code class="language-python">from apis import create_app

app = create_app()

if __name__ == "__main__":
    app.run(host="127.0.0.1", port=8000, debug=False)
</code></pre>
<h2 id="简单运行测试">简单运行测试</h2>
<ol>
<li>启动应用</li>
</ol>
<pre><code class="language-bash"># 方式1, 直接启动, 用于简单测试
python main.py

# 方式2, 使用 gunicorn, 这是生产环境启动方式. 配置文件默认路径即 ./gunicorn.conf.py
gunicorn main:app
</code></pre>
<ol start="2">
<li>curl 请求 <code>/api/health</code>。可以看到响应头中已经有了 <code>X-Tracking-ID</code> 和 <code>X-DateTime</code></li>
</ol>
<pre><code class="language-bash">$ curl -v http://127.0.0.1:8000/api/health
*   Trying 127.0.0.1:8000...
* Connected to 127.0.0.1 (127.0.0.1) port 8000
* using HTTP/1.x
&gt; GET /api/health HTTP/1.1
&gt; Host: 127.0.0.1:8000
&gt; User-Agent: curl/8.14.1
&gt; Accept: */*
&gt;
* Request completely sent off
&lt; HTTP/1.1 200 OK
&lt; Server: gunicorn
&lt; Date: Sat, 17 Jan 2026 08:41:07 GMT
&lt; Connection: keep-alive
&lt; Content-Type: application/json
&lt; X-Tracking-ID: 1f0adb8d-9bee-49d4-873f-31aa1437da60
&lt; X-DateTime: 2026-01-17T16:41:07+08:00
&lt; Content-Length: 61
&lt;
* Connection #0 to host 127.0.0.1 left intact
{"code": 200, "msg": "GET /api/health success", "data": "OK"}
</code></pre>
<ol start="3">
<li>curl 请求 <code>/api/users</code>。手动指定请求头中的 <code>X-Tracking-ID</code>,响应时也会保持相同的 ID。</li>
</ol>
<pre><code class="language-bash">$ curl -v http://127.0.0.1:8000/api/users -H 'X-Tracking-ID:123456'
*   Trying 127.0.0.1:8000...
* Connected to 127.0.0.1 (127.0.0.1) port 8000
* using HTTP/1.x
&gt; GET /api/users HTTP/1.1
&gt; Host: 127.0.0.1:8000
&gt; User-Agent: curl/8.14.1
&gt; Accept: */*
&gt; X-Tracking-ID:123456
&gt;
* Request completely sent off
&lt; HTTP/1.1 200 OK
&lt; Server: gunicorn
&lt; Date: Sat, 17 Jan 2026 08:44:37 GMT
&lt; Connection: keep-alive
&lt; Content-Type: application/json
&lt; X-Tracking-ID: 123456
&lt; X-DateTime: 2026-01-17T16:44:37+08:00
&lt; Content-Length: 110
&lt;
* Connection #0 to host 127.0.0.1 left intact
{"code": 200, "msg": "GET /api/users success", "data": [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]}
</code></pre>
<h2 id="编写单元测试">编写单元测试</h2>
<p>使用 pytest 进行单元测试,这里只是一个简单的示例</p>
<h3 id="配置-pytest">配置 pytest</h3>
<p>配置文件 <code>pytest.ini</code></p>
<pre><code class="language-ini">
testpaths = "tests"
pythonpath = "."
</code></pre>
<h3 id="测试代码">测试代码</h3>
<p>代码文件 <code>tests/apis/test_common.py</code></p>
<pre><code class="language-python">from typing import Generator
from unittest.mock import MagicMock, patch

import pytest
from flask import Flask
from flask.testing import FlaskClient

from apis.common import route as common_route


@pytest.fixture
def app() -&gt; Generator:
    app = Flask(__name__)
    app.config.update(
      {
            "TESTING": True,
            "DEBUG": False,
      }
    )
    app.register_blueprint(common_route)
    yield app


@pytest.fixture
def client(app: Flask) -&gt; FlaskClient:
    return app.test_client()


class TestGetHealth:
    def test_get_health_success(self, client: FlaskClient) -&gt; None:
      resp = client.get("/api/health")
      assert resp.status_code == 200

      resp_headers = resp.headers
      assert resp_headers.get("Content-Type") == "application/json"
      assert "X-Tracking-ID" in resp_headers
      assert "X-DateTime" in resp_headers

      resp_body = resp.json
      assert resp_body == {
            "code": 200,
            "msg": "GET /api/health success",
            "data": "OK",
      }


class TestGetUsers:
    @patch("apis.common.common.user_handle.get_users")
    def test_get_users(self, mock_get_users: MagicMock, client: FlaskClient) -&gt; None:
      # mock user.get_users() 的返回值
      mock_get_users.return_value = [
            {"id": 1, "name": "Alice123"},
            {"id": 2, "name": "Bob456"},
      ]

      # 发送请求
      resp = client.get("/api/users")
      assert resp.status_code == 200

      resp_headers = resp.headers
      assert resp_headers.get("Content-Type") == "application/json"
      assert "X-Tracking-ID" in resp_headers
      assert "X-DateTime" in resp_headers

      # resp_body = resp.json

      mock_get_users.assert_called_once()

</code></pre>
<h3 id="执行测试">执行测试</h3>
<pre><code class="language-shell">pytest -vv
</code></pre>
<h2 id="配置-gunicorn">配置 Gunicorn</h2>
<p>代码文件 <code>gunicorn.conf.py</code>。简单配置了一些启动参数,以及请求日志的格式。</p>
<pre><code class="language-python"># Gunicorn 配置文件
from pathlib import Path
from multiprocessing import cpu_count
import gunicorn.glogging
from datetime import datetime

class CustomLogger(gunicorn.glogging.Logger):
    def atoms(self, resp, req, environ, request_time):
      """
      重写 atoms 方法来自定义日志占位符
      """
      # 获取默认的所有占位符数据
      atoms = super().atoms(resp, req, environ, request_time)
      
      # 自定义 't' (时间戳) 的格式
      now = datetime.now().astimezone()
      atoms['t'] = now.isoformat(timespec="seconds")
      
      return atoms
   

# 预加载应用代码
preload_app = True

# 工作进程数量:通常是 CPU 核心数的 2 倍加 1
# workers = int(cpu_count() * 2 + 1)
workers = 2

# 使用 gevent 异步 worker 类型,适合 I/O 密集型应用
# 注意:gevent worker 不使用 threads 参数,而是使用协程进行并发处理
worker_class = "gevent"

# 每个 gevent worker 可处理的最大并发连接数
worker_connections = 2000

# 绑定地址和端口
bind = "127.0.0.1:8000"

# 进程名称
proc_name = "flask-dev"

# PID 文件路径
pidfile = str(Path(__file__).parent / "tmp" / "gunicorn.pid")

logger_class = CustomLogger
access_log_format = (
    '{"@timestamp": "%(t)s", '
    '"remote_addr": "%(h)s", '
    '"protocol": "%(H)s", '
    '"host": "%({host}i)s", '
    '"request_method": "%(m)s", '
    '"request_path": "%(U)s", '
    '"status_code": %(s)s, '
    '"response_length": %(b)s, '
    '"referer": "%(f)s", '
    '"user_agent": "%(a)s", '
    '"x_tracking_id": "%({x-tracking-id}i)s", '
    '"request_time": %(L)s}'
)

# 访问日志路径
accesslog = str(Path(__file__).parent / "logs" / "access.log")

# 错误日志路径
errorlog = str(Path(__file__).parent / "logs" / "error.log")

# 日志级别
loglevel = "debug"
</code></pre>
<p>输出的日志格式。可以看到日志格式符合 JSON 规范,便于 Filebeat 收集后在 Kibana 上检索。</p>
<pre><code class="language-shell">$ tail -n 1 logs/access.log | python3 -m json.tool
{
    "@timestamp": "2026-01-17T16:44:37+08:00",
    "remote_addr": "127.0.0.1",
    "protocol": "HTTP/1.1",
    "host": "127.0.0.1:8000",
    "request_method": "GET",
    "request_path": "/api/users",
    "status_code": 200,
    "response_length": 110,
    "referer": "-",
    "user_agent": "curl/8.14.1",
    "x_tracking_id": "123456",
    "request_time": 0.102042
}
</code></pre>
<h2 id="补充">补充</h2>
<h3 id="全局对象-g-的注意事项">全局对象 g 的注意事项</h3>
<ol>
<li><code>g</code> 不是进程或线程共享的全局变量,请只在请求处理流程中使用 <code>g</code>。</li>
<li>如果视图函数中启动了后台线程或异步任务,在子线程中直接访问 <code>g</code> 通常会报错或获取不到数据。这时建议显式传递数据。</li>
<li>不要在 <code>g</code> 中存储大文件或数据对象,否则会占用过高内存。</li>
<li><code>g</code> 不是 <code>session</code>。</li>
</ol>


</div>
<div id="MySignature" role="contentinfo">
    <p>本文来自博客园,作者:花酒锄作田,转载请注明原文链接:https://www.cnblogs.com/XY-Heruo/p/19496753</p><br><br>
来源:https://www.cnblogs.com/XY-Heruo/p/19496753
頁: [1]
查看完整版本: [python]Flask - Tracking ID的设计