刘入铭 發表於 2026-2-8 16:55:00

SQLAlchemy中使用UPSERT

<h2 id="前言">前言</h2>
<p>SQLite 和 PostgreSQL 都支持 UPSERT 操作,即"有则更新,无则新增"。冲突列必须有唯一约束。</p>
<p>语法:</p>
<ul>
<li>PostgreSQL: <code>INSERT ... ON CONFLICT (column) DO UPDATE/NOTHING</code></li>
<li>SQLite: <code>INSERT ... ON CONFLICT(column) DO UPDATE/NOTHING</code>。注意括号位置</li>
</ul>
<table>
<thead>
<tr>
<th>场景</th>
<th>PostgreSQL</th>
<th>SQLite</th>
<th>说明</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>基本 UPSERT</strong></td>
<td><code>ON CONFLICT (col) DO UPDATE SET ...</code></td>
<td><code>ON CONFLICT(col) DO UPDATE SET ...</code></td>
<td>括号位置略有不同</td>
</tr>
<tr>
<td><strong>冲突忽略</strong></td>
<td><code>ON CONFLICT (col) DO NOTHING</code></td>
<td><code>ON CONFLICT(col) DO NOTHING</code></td>
<td>相同</td>
</tr>
<tr>
<td><strong>引用新值</strong></td>
<td><code>EXCLUDED.col</code></td>
<td><code>excluded.col</code></td>
<td>PostgreSQL 大写,SQLite 小写</td>
</tr>
<tr>
<td><strong>返回结果</strong></td>
<td><code>RETURNING *</code></td>
<td><code>RETURNING *</code></td>
<td>相同</td>
</tr>
<tr>
<td><strong>条件更新</strong></td>
<td><code>WHERE condition</code></td>
<td>不支持 WHERE</td>
<td>SQLite 限制</td>
</tr>
</tbody>
</table>
<h2 id="注意事项">注意事项</h2>
<ul>
<li>冲突列必须有唯一约束</li>
<li>PostgreSQL 和 SQLite 的语法相似,但仍有细微差别。使用原生 SQL 时需要注意。</li>
<li>SQLite 在 UPSERT 时不支持 WHERE 子句,需要改用 CASE 表达式或应用层过滤。</li>
<li>SQLite 3.35+ 版本才支持 RETURNING</li>
</ul>
<h2 id="excluded-和-returning">EXCLUDED 和 RETURNING</h2>
<h3 id="excluded">EXCLUDED</h3>
<p><code>EXCLUDED</code> 表示冲突时被拦截的新值。</p>
<pre><code class="language-sql">INSERT INTO users (email, name, age)
VALUES ('test@example.com', '新名字', 30)
ON CONFLICT (email) DO UPDATE SET
    name = EXCLUDED.name,   -- ← 引用新值 "新名字"
    age = EXCLUDED.age      -- ← 引用新值 30
</code></pre>
<table>
<thead>
<tr>
<th>场景</th>
<th>表达式</th>
<th>含义</th>
<th>示例值</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>原表字段</strong></td>
<td><code>users.name</code></td>
<td>冲突行的<strong>当前值</strong></td>
<td>"老名字"</td>
</tr>
<tr>
<td><strong>新值字段</strong></td>
<td><code>EXCLUDED.name</code></td>
<td>试图插入的<strong>新值</strong></td>
<td>"新名字"</td>
</tr>
<tr>
<td><strong>混合计算</strong></td>
<td><code>users.age + EXCLUDED.age</code></td>
<td>原值 + 新值</td>
<td>25 + 30 = 55</td>
</tr>
</tbody>
</table>
<p><strong>示例 1:累加库存</strong></p>
<pre><code class="language-sql">-- 商品库存累加:原库存 100 + 新增 50 = 150
INSERT INTO products (sku, stock)
VALUES ('IPHONE15', 50)
ON CONFLICT (sku) DO UPDATE SET
    stock = products.stock + EXCLUDED.stock-- 100 + 50
RETURNING stock;
</code></pre>
<p><strong>示例 2:仅更新非空字段</strong></p>
<pre><code class="language-sql">-- 如果新值为 NULL,保留原值
INSERT INTO users (email, name, age)
VALUES ('test@example.com', '新名字', NULL)
ON CONFLICT (email) DO UPDATE SET
    name = COALESCE(EXCLUDED.name, users.name),-- 新名字
    age = COALESCE(EXCLUDED.age, users.age)      -- 保留原 age
</code></pre>
<p><strong>示例 3:时间戳更新</strong></p>
<pre><code class="language-sql">-- 更新时刷新 updated_at
INSERT INTO users (email, name)
VALUES ('test@example.com', '新名字')
ON CONFLICT (email) DO UPDATE SET
    name = EXCLUDED.name,
    updated_at = NOW()-- PostgreSQL
    -- updated_at = CURRENT_TIMESTAMP-- SQLite
</code></pre>
<h3 id="returning">RETURNING</h3>
<p><code>RETURNING</code> 用于返回操作结果。在 <code>INSERT</code>/<code>UPDATE</code>/<code>DELETE</code> 后<strong>直接返回指定列</strong>,避免额外 <code>SELECT</code> 查询:</p>
<pre><code class="language-sql">INSERT INTO users (email, name)
VALUES ('test@example.com', '张三')
RETURNING id, email, name, created_at;
</code></pre>
<p><strong>示例 1:插入后立即获取 ID</strong></p>
<pre><code class="language-python"># PostgreSQL / SQLite 3.35+
sql = text("""
    INSERT INTO users (email, name)
    VALUES (:email, :name)
    RETURNING id, email, created_at
""")

result = await session.execute(sql, {"email": "test@example.com", "name": "张三"})
user = result.mappings().first()
print(user["id"])# 直接获取 ID
</code></pre>
<p><strong>示例 2:UPSERT 后统一返回</strong></p>
<pre><code class="language-sql">-- 无论插入还是更新,都返回最终状态
INSERT INTO users (email, name, login_count)
VALUES ('test@example.com', '张三', 1)
ON CONFLICT (email) DO UPDATE SET
    name = EXCLUDED.name,
    login_count = users.login_count + 1-- 累加登录次数
RETURNING
    id,
    email,
    name,
    login_count,
    CASE
      WHEN xmax = 0 THEN 'inserted'-- PostgreSQL 特有:xmax=0 表示插入
      ELSE 'updated'
    END AS action
</code></pre>
<p><strong>示例 3:批量操作返回所有结果</strong></p>
<pre><code class="language-sql">-- PostgreSQL 支持批量 RETURNING
INSERT INTO users (email, name)
VALUES
    ('a@example.com', 'A'),
    ('b@example.com', 'B')
ON CONFLICT (email) DO UPDATE SET
    name = EXCLUDED.name
RETURNING id, email, name;
</code></pre>
<p>Python 处理批量返回:</p>
<pre><code class="language-python">result = await session.execute(sql)
users =
# [{'id': 1, 'email': 'a@example.com', 'name': 'A'}, ...]
</code></pre>
<h3 id="示例用户登录计数器">示例:用户登录计数器</h3>
<pre><code class="language-python">async def record_user_login(session: AsyncSession, email: str, name: str) -&gt; dict:
    """
    用户登录计数器:
    - 新用户:插入,login_count = 1
    - 老用户:更新,login_count += 1
    - 返回最终状态 + 操作类型
    """
    sql = text("""
      INSERT INTO users (
            email, name, login_count, last_login, created_at
      ) VALUES (
            :email, :name, 1, :now, :now
      )
      ON CONFLICT (email) DO UPDATE SET
            name = EXCLUDED.name,                        -- 更新用户名
            login_count = users.login_count + 1,         -- 累加登录次数
            last_login = EXCLUDED.last_login               -- 更新最后登录时间
      RETURNING
            id,
            email,
            name,
            login_count,
            last_login,
            created_at,
            CASE
                WHEN xmax = 0 THEN 'inserted'
                ELSE 'updated'
            END AS action-- PostgreSQL 特有:区分插入/更新
    """)
   
    now = datetime.utcnow()
    result = await session.execute(
      sql,
      {"email": email, "name": name, "now": now}
    )
   
    row = result.mappings().first()
    return dict(row) if row else None

# 使用示例
user = await record_user_login(session, "test@example.com", "张三")
print(f"{user['action']} user {user['email']} with {user['login_count']} logins")
# 输出: inserted user test@example.com with 1 logins
# 或: updated user test@example.com with 5 logins
</code></pre>
<h2 id="示例数据模型类">示例数据模型类</h2>
<pre><code class="language-python">from sqlalchemy import Column, Integer, String, UniqueConstraint
from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"
   
    id = Column(Integer, primary_key=True, autoincrement=True)
    email = Column(String(100), unique=True, nullable=False)# 唯一约束
    name = Column(String(50))
    age = Column(Integer)
    balance = Column(Integer, default=0)
   
    __table_args__ = (
      UniqueConstraint("email", name="uq_users_email"),
    )

class Product(Base):
    __tablename__ = "products"
   
    id = Column(Integer, primary_key=True)
    sku = Column(String(50), unique=True, nullable=False)# 唯一 SKU
    name = Column(String(100))
    stock = Column(Integer, default=0)
    price = Column(Integer)
</code></pre>
<h2 id="orm-方式">ORM 方式</h2>
<p>注意 <code>insert</code> 的导入路径。</p>
<h3 id="基本示例">基本示例</h3>
<pre><code class="language-python">from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
from sqlalchemy import insert

async def upsert_user_orm(session: AsyncSession, user_data: dict) -&gt; dict:
    """
    UPSERT 用户(ORM 风格)
    如果 email 冲突则更新,否则插入
    """
   
    # 方式 1:使用通用 insert(推荐⭐)
    # SQLAlchemy 会根据方言自动选择正确的语法
    stmt = (
      insert(User)
      .values(**user_data)
      .on_conflict_do_update(
            index_elements=["email"],# 冲突检测列(唯一约束)
            set_={
                "name": user_data["name"],
                "age": user_data.get("age"),
                "updated_at": func.now()# 假设有 updated_at 列
            }
      )
      .returning(User)# 返回插入/更新后的行
    )
   
    result = await session.execute(stmt)
    user = result.scalar_one()
   
    return {
      "id": user.id,
      "email": user.email,
      "name": user.name,
      "age": user.age
    }

async def upsert_user_ignore(session: AsyncSession, user_data: dict) -&gt; bool:
    """
    UPSERT 但冲突时忽略(DO NOTHING)
    """
    stmt = (
      insert(User)
      .values(**user_data)
      .on_conflict_do_nothing(
            index_elements=["email"]
      )
    )
   
    result = await session.execute(stmt)
    return result.rowcount &gt; 0# 返回是否插入成功
</code></pre>
<h3 id="条件更新仅更新特定字段">条件更新:仅更新特定字段</h3>
<pre><code class="language-python">async def upsert_user_conditional(session: AsyncSession, user_data: dict) -&gt; dict:
    """
    UPSERT:冲突时只更新非空字段
    """
    stmt = (
      insert(User)
      .values(**user_data)
      .on_conflict_do_update(
            index_elements=["email"],
            set_={
                "name": user_data["name"],
                # 条件:只有提供了 age 才更新
                "age": user_data.get("age", User.age),# 保持原值
            },
            # 可选:添加 WHERE 条件
            where=User.email == user_data["email"]
      )
      .returning(User)
    )
   
    result = await session.execute(stmt)
    return result.mappings().first()
</code></pre>
<h3 id="批量-upsert">批量 UPSERT</h3>
<pre><code class="language-python">async def bulk_upsert_users(session: AsyncSession, users: list) -&gt; int:
    """
    批量 UPSERT 用户
    """
    stmt = (
      insert(User)
      .values(users)
      .on_conflict_do_update(
            index_elements=["email"],
            set_={
                "name": insert(User).excluded.name,# 使用 excluded 表示新值
                "age": insert(User).excluded.age,
            }
      )
    )
   
    result = await session.execute(stmt)
    return result.rowcount
</code></pre>
<h3 id="使用-excluded-引用新值">使用 EXCLUDED 引用新值</h3>
<pre><code class="language-python">async def upsert_product_with_stock(session: AsyncSession, product_data: dict) -&gt; dict:
    """
    UPSERT 产品:冲突时累加库存
    """
    stmt = (
      insert(Product)
      .values(**product_data)
      .on_conflict_do_update(
            index_elements=["sku"],
            set_={
                # 累加库存:原库存 + 新库存
                "stock": Product.stock + insert(Product).excluded.stock,
                # 更新其他字段
                "name": insert(Product).excluded.name,
                "price": insert(Product).excluded.price,
            }
      )
      .returning(Product)
    )
   
    result = await session.execute(stmt)
    return result.mappings().first()
</code></pre>
<h3 id="用户服务">用户服务</h3>
<pre><code class="language-python">class UserService:
    """用户服务(支持 UPSERT)"""
   
    def __init__(self, session: AsyncSession):
      self.session = session
   
    async def create_or_update(self, email: str, name: str, age: int | None = None) -&gt; dict:
      """创建或更新用户"""
      stmt = (
            insert(User)
            .values(
                email=email,
                name=name,
                age=age,
                created_at=datetime.utcnow()
            )
            .on_conflict_do_update(
                index_elements=["email"],
                set_={
                  "name": name,
                  "age": age,
                  "updated_at": datetime.utcnow()
                }
            )
            .returning(User)
      )
      
      result = await self.session.execute(stmt)
      user = result.scalar_one()
      
      return {
            "id": user.id,
            "email": user.email,
            "name": user.name,
            "age": user.age
      }
   
    async def bulk_create_or_update(self, users: list) -&gt; int:
      """批量创建或更新"""
      stmt = (
            insert(User)
            .values(users)
            .on_conflict_do_update(
                index_elements=["email"],
                set_={
                  "name": insert(User).excluded.name,
                  "age": insert(User).excluded.age,
                  "updated_at": datetime.utcnow()
                }
            )
      )
      
      result = await self.session.execute(stmt)
      return result.rowcount
   
    async def create_if_not_exists(self, email: str, name: str) -&gt; bool:
      """仅当不存在时创建"""
      stmt = (
            insert(User)
            .values(
                email=email,
                name=name,
                created_at=datetime.utcnow()
            )
            .on_conflict_do_nothing(
                index_elements=["email"]
            )
      )
      
      result = await self.session.execute(stmt)
      return result.rowcount &gt; 0# True = 插入成功,False = 已存在
</code></pre>
<h2 id="原生-sql">原生 SQL</h2>
<h3 id="基本示例-1">基本示例</h3>
<p><strong>PostgreSQL</strong></p>
<pre><code class="language-python">async def upsert_user_pg(session: AsyncSession, user_data: dict) -&gt; dict | None:
    """
    PostgreSQL 原生 UPSERT
    """
    sql = text("""
      INSERT INTO users (email, name, age, created_at)
      VALUES (:email, :name, :age, :created_at)
      ON CONFLICT (email) DO UPDATE-- 冲突列
      SET
            name = EXCLUDED.name,      -- EXCLUDED 表示新插入的值
            age = EXCLUDED.age,
            updated_at = NOW()
      RETURNING id, email, name, age
    """)
   
    result = await session.execute(
      sql,
      {
            "email": user_data["email"],
            "name": user_data["name"],
            "age": user_data.get("age"),
            "created_at": datetime.utcnow()
      }
    )
   
    row = result.mappings().first()
    return dict(row) if row else None
</code></pre>
<p><strong>SQLite</strong></p>
<pre><code class="language-python">async def upsert_user_sqlite(session: AsyncSession, user_data: dict) -&gt; dict | None:
    """
    SQLite 原生 UPSERT(语法与 PostgreSQL 几乎相同)
    """
    sql = text("""
      INSERT INTO users (email, name, age, created_at)
      VALUES (:email, :name, :age, :created_at)
      ON CONFLICT(email) DO UPDATE SET-- SQLite 语法稍有不同
            name = excluded.name,
            age = excluded.age,
            updated_at = CURRENT_TIMESTAMP
      RETURNING id, email, name, age
    """)
   
    result = await session.execute(
      sql,
      {
            "email": user_data["email"],
            "name": user_data["name"],
            "age": user_data.get("age"),
            "created_at": datetime.utcnow()
      }
    )
   
    row = result.mappings().first()
    return dict(row) if row else None
</code></pre>
<h3 id="冲突时忽略">冲突时忽略</h3>
<pre><code class="language-python">async def insert_or_ignore_user(session: AsyncSession, user_data: dict) -&gt; bool:
    """
    插入用户,如果冲突则忽略
    """
    # PostgreSQL
    sql = text("""
      INSERT INTO users (email, name, age, created_at)
      VALUES (:email, :name, :age, :created_at)
      ON CONFLICT (email) DO NOTHING
    """)
   
    # SQLite(语法相同)
    # sql = text("""
    #   INSERT INTO users (email, name, age, created_at)
    #   VALUES (:email, :name, :age, :created_at)
    #   ON CONFLICT(email) DO NOTHING
    # """)
   
    result = await session.execute(
      sql,
      {
            "email": user_data["email"],
            "name": user_data["name"],
            "age": user_data.get("age"),
            "created_at": datetime.utcnow()
      }
    )
   
    return result.rowcount &gt; 0# 返回是否插入成功
</code></pre>
<h3 id="批量-upsert-1">批量 UPSERT</h3>
<pre><code class="language-python">async def bulk_upsert_products(session: AsyncSession, products: list) -&gt; int:
    """
    批量 UPSERT 产品(原生 SQL)
    """
    # PostgreSQL
    sql = text("""
      INSERT INTO products (sku, name, stock, price, created_at)
      VALUES (
            :sku, :name, :stock, :price, :created_at
      )
      ON CONFLICT (sku) DO UPDATE SET
            name = EXCLUDED.name,
            stock = products.stock + EXCLUDED.stock,-- 累加库存
            price = EXCLUDED.price,
            updated_at = NOW()
    """)
   
    # 批量执行
    for product in products:
      await session.execute(
            sql,
            {
                "sku": product["sku"],
                "name": product["name"],
                "stock": product.get("stock", 0),
                "price": product.get("price", 0),
                "created_at": datetime.utcnow()
            }
      )
   
    return len(products)
</code></pre>
<h3 id="部分更新--条件判断">部分更新 + 条件判断</h3>
<pre><code class="language-python">async def upsert_user_smart(session: AsyncSession, user_data: dict) -&gt; dict | None:
    """
    智能 UPSERT:
    - 如果提供了 age,才更新 age
    - 如果提供了 name,才更新 name
    - 更新 updated_at
    """
    sql = text("""
      INSERT INTO users (email, name, age, created_at)
      VALUES (:email, :name, :age, :created_at)
      ON CONFLICT (email) DO UPDATE SET
            name = COALESCE(:name, users.name),-- 如果新值为 NULL,保持原值
            age = COALESCE(:age, users.age),
            updated_at = NOW()
      RETURNING id, email, name, age, updated_at
    """)
   
    result = await session.execute(
      sql,
      {
            "email": user_data["email"],
            "name": user_data.get("name"),# 可能为 None
            "age": user_data.get("age"),    # 可能为 None
            "created_at": datetime.utcnow()
      }
    )
   
    row = result.mappings().first()
    return dict(row) if row else None
</code></pre>
<h3 id="用户注册登录存在则更新最后登录时间">用户注册/登录:存在则更新最后登录时间</h3>
<pre><code class="language-python">async def register_or_login(session: AsyncSession, email: str, name: str) -&gt; dict:
    """
    用户注册或登录:
    - 新用户:插入
    - 老用户:更新最后登录时间
    """
    sql = text("""
      INSERT INTO users (email, name, last_login, created_at)
      VALUES (:email, :name, :now, :now)
      ON CONFLICT (email) DO UPDATE SET
            last_login = EXCLUDED.last_login,
            name = EXCLUDED.name-- 可选:更新用户名
      RETURNING id, email, name, last_login, created_at
    """)
   
    now = datetime.utcnow()
    result = await session.execute(
      sql,
      {"email": email, "name": name, "now": now}
    )
   
    return dict(result.mappings().first())
</code></pre>
<h3 id="库存累加">库存累加</h3>
<pre><code class="language-python">async def add_product_stock(session: AsyncSession, sku: str, quantity: int) -&gt; bool:
    """
    增加商品库存:
    - 商品不存在:插入
    - 商品存在:累加库存
    """
    sql = text("""
      INSERT INTO products (sku, stock, created_at)
      VALUES (:sku, :quantity, :now)
      ON CONFLICT (sku) DO UPDATE SET
            stock = products.stock + EXCLUDED.stock,
            updated_at = NOW()
    """)
   
    result = await session.execute(
      sql,
      {
            "sku": sku,
            "quantity": quantity,
            "now": datetime.utcnow()
      }
    )
   
    return result.rowcount &gt; 0
</code></pre>
<h3 id="用户积分累加">用户积分累加</h3>
<pre><code class="language-python">async def add_user_points(session: AsyncSession, user_id: int, points: int) -&gt; dict | None:
    """
    增加用户积分(累加)
    """
    sql = text("""
      INSERT INTO user_points (user_id, points, created_at)
      VALUES (:user_id, :points, :now)
      ON CONFLICT (user_id) DO UPDATE SET
            points = user_points.points + EXCLUDED.points,
            updated_at = NOW()
      RETURNING user_id, points
    """)
   
    result = await session.execute(
      sql,
      {
            "user_id": user_id,
            "points": points,
            "now": datetime.utcnow()
      }
    )
   
    row = result.mappings().first()
    return dict(row) if row else None
</code></pre>
<h3 id="标签计数">标签计数</h3>
<p>存在则 +1,不存在则创建:</p>
<pre><code class="language-python">async def increment_tag_count(session: AsyncSession, tag_name: str) -&gt; int:
    """
    标签计数:
    - 标签不存在:插入 count=1
    - 标签存在:count += 1
    """
    sql = text("""
      INSERT INTO tags (name, count, created_at)
      VALUES (:name, 1, :now)
      ON CONFLICT (name) DO UPDATE SET
            count = tags.count + 1,
            updated_at = NOW()
      RETURNING count
    """)
   
    result = await session.execute(
      sql,
      {"name": tag_name, "now": datetime.utcnow()}
    )
   
    return result.scalar() or 0
</code></pre>


</div>
<div id="MySignature" role="contentinfo">
    <p>本文来自博客园,作者:花酒锄作田,转载请注明原文链接:https://www.cnblogs.com/XY-Heruo/p/19592220</p><br><br>
来源:https://www.cnblogs.com/XY-Heruo/p/19592220
頁: [1]
查看完整版本: SQLAlchemy中使用UPSERT