Common Ways to Work with Data Using SQLAlchemy in a Python + FastAPI Project
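<p>Backend projects built with Python and FastAPI spend much of their time reading and writing data. This post walks through several common ways to do that with SQLAlchemy in a Python + FastAPI project.</p><p>When building a backend with FastAPI, SQLAlchemy, and Pydantic, database access here goes through SQLAlchemy's asynchronous API. In practice, database operations usually live in a base class that concrete classes inherit from, which reduces duplicated code and improves reuse. To explain how SQLAlchemy is used, however, it is simpler to look at a few common database operations one at a time.</p>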
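<h3>1. Introduction to SQLAlchemy</h3>
<div><strong>SQLAlchemy</strong> is a powerful and flexible Python SQL toolkit and object-relational mapping (ORM) library. It is widely used in Python projects that work with relational databases, offering high-level ORM features while keeping full control over the underlying SQL. <code>SQLAlchemy</code> lets developers interact with the database through Python code without writing SQL by hand, and it also supports raw SQL for complex queries. The list below shows how <strong>SQLAlchemy</strong> concepts correspond to familiar database objects.</div>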
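<div>Engine: the connection source; wraps the database driver (dialect) and the connection pool</div>
<div>Session: the unit of work that manages transactions; queries start here</div>
<div>Model: a table, defined as a class</div>
<div>Column: a column</div>
<div>Query: a set of rows; conditions can be chained</div>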
<div> </div>
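<div>When using SQLAlchemy, its constructs are usually mapped to database objects as follows.</div>
<div><strong>Database table (in SQLAlchemy)</strong>: represented by a <code>Table</code> object or by a class derived from the <code>Declarative Base</code>.</div>
<div><strong>Mapping</strong>: each database table corresponds to one class in SQLAlchemy; that class inherits from the <code>Base</code> returned by <code>declarative_base()</code>.</div>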
<div>
<div class="cnblogs_code">
<pre>from sqlalchemy import Column, Integer, String, create_engine
# SQLAlchemy 1.4+: import from sqlalchemy.orm (the sqlalchemy.ext.declarative path is deprecated)
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class User(Base):
    __tablename__ = 'users'  # database table name

    id = Column(Integer, primary_key=True)
    name = Column(String)
    email = Column(String)</pre>
</div>
<p><strong>Database column:</strong> represented by a <code>Column</code> object. Each column of a database table is a <code>Column</code> object in SQLAlchemy, defined as an attribute of the model class.</p>
<div>
<div class="cnblogs_code">
<pre>id = Column(Integer, primary_key=<span style="color: rgba(0, 0, 0, 1)">True)
name </span>= Column(String(50))</pre>
</div>
<p><strong>Database row:</strong> each instance of a model class represents one row of the table. In SQLAlchemy, a row is created by instantiating the model class.</p>
<div>
<div class="cnblogs_code">
<pre>new_user = User(id=1, name=<span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(128, 0, 0, 1)">John Doe</span><span style="color: rgba(128, 0, 0, 1)">'</span>, email=<span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(128, 0, 0, 1)">john@example.com</span><span style="color: rgba(128, 0, 0, 1)">'</span>)</pre>
</div>
<p><strong>Primary key</strong>: defined with the <code>primary_key=True</code> argument.</p>
</div>
</div>
<div>
<div class="cnblogs_code">
<pre>id = Column(Integer, primary_key=True)</pre>
</div>
<p><strong>Foreign key: </strong>represented by a <code>ForeignKey</code> object.</p>
<div>
<div class="cnblogs_code">
<pre>from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship


class Address(Base):
    __tablename__ = 'addresses'

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'))  # references users.id
    # Pairs with User.addresses, which declares back_populates="user"
    user = relationship('User', back_populates='addresses')</pre>
</div>
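<p><strong>Relationships: </strong>represented by <code>relationship</code>. Relationships between database tables are defined in SQLAlchemy through <code>relationship</code>; the attribute below belongs on the <code>User</code> class and pairs with <code>Address.user</code>.</p>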
<div>
<div class="cnblogs_code">
<pre>addresses = relationship(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Address</span><span style="color: rgba(128, 0, 0, 1)">"</span>, back_populates=<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">user</span><span style="color: rgba(128, 0, 0, 1)">"</span>)</pre>
</div>
<p> </p>
</div>
</div>
</div>
</div>
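<p>To tie the pieces above together, here is a minimal sketch that defines two related models, creates the tables, and reads a row back. It makes several assumptions that are not in the original: it uses a synchronous <code>Session</code> with an in-memory SQLite database so that it runs without an async driver, and the <code>street</code> column and the <code>demo</code> function are purely illustrative.</p>

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, select
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    email = Column(String(100))
    # One-to-many: one user has many addresses
    addresses = relationship("Address", back_populates="user")


class Address(Base):
    __tablename__ = "addresses"

    id = Column(Integer, primary_key=True)
    street = Column(String(100))  # illustrative column, not part of the article
    user_id = Column(Integer, ForeignKey("users.id"))
    user = relationship("User", back_populates="addresses")


def demo() -> tuple:
    # In-memory SQLite keeps the sketch self-contained (assumption)
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        user = User(name="John Doe", email="john@example.com")
        user.addresses.append(Address(street="1 Main St"))
        session.add(user)  # the address is saved through the relationship cascade
        session.commit()
        loaded = session.execute(
            select(User).where(User.name == "John Doe")
        ).scalars().one()
        return loaded.email, len(loaded.addresses)
```

<p>With the async API used in the rest of this post, the same statements would be run with <code>await db.execute(...)</code> on an <code>AsyncSession</code>.</p>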
<h3>2. Common single-table operations</h3>
<p>The examples below use asynchronous calls to show how to work with data in a single table.</p>
<div class="cnblogs_code">
<pre>async def get(self, db: AsyncSession, id: Any) -> Any:
    """Get a single object by primary key."""
    if isinstance(id, str):
        # String keys are compared case-insensitively
        query = select(self.model).filter(func.lower(self.model.id) == id.lower())
    else:
        query = select(self.model).filter(self.model.id == id)

    result = await db.execute(query)
    item = result.scalars().first()
    return item</pre>
</div>
<p>If primary key comparisons must match the column type exactly (PostgreSQL, for example, is strict and requires both sides of a comparison to have the same type), we need to obtain the primary key type when the base class or CRUD class is initialized.</p>
<div class="cnblogs_code">
<pre>class BaseCrud(Generic[ModelType, PrimaryKeyType, PageDtoType, DtoType]):
    """
    Base class for CRUD operations. Type parameters:
    * `ModelType`: SQLAlchemy model class
    * `PrimaryKeyType`: type of the primary key
    * `PageDtoType`: input class for paged queries
    * `DtoType`: data transfer object class, e.g. the single-object DTO for create and update
    """

    def __init__(self, model: Type[ModelType]):
        """
        Base object for database access (CRUD).
        * `model`: A SQLAlchemy model class
        """
        self.model = model  # model type
        # Resolve the primary key column type at runtime.
        # inspect(model).primary_key is a tuple of columns; take the first one
        # (this assumes a single-column primary key).
        pk_column = inspect(model).primary_key[0]
        self._pk_type = pk_column.type.python_type  # int / str</pre>
</div>
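<p>The runtime lookup of the primary key type can be tried in isolation. The sketch below is illustrative only: <code>IntKeyModel</code>, <code>StrKeyModel</code>, <code>primary_key_python_type</code>, and <code>coerce_primary_key</code> are hypothetical names, and it assumes a single-column primary key.</p>

```python
from sqlalchemy import Column, Integer, String, inspect
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class IntKeyModel(Base):  # hypothetical model with an integer key
    __tablename__ = "int_key_demo"
    id = Column(Integer, primary_key=True)


class StrKeyModel(Base):  # hypothetical model with a string key
    __tablename__ = "str_key_demo"
    id = Column(String(36), primary_key=True)


def primary_key_python_type(model) -> type:
    """Return the Python type of the model's single primary key column."""
    pk_columns = inspect(model).primary_key  # tuple of Column objects
    if len(pk_columns) != 1:
        raise ValueError("this sketch only handles single-column primary keys")
    return pk_columns[0].type.python_type


def coerce_primary_key(model, raw_id):
    """Convert raw_id (e.g. a path parameter) to the primary key's Python type."""
    pk_type = primary_key_python_type(model)
    try:
        return pk_type(raw_id)
    except (TypeError, ValueError) as exc:
        raise ValueError(f"Invalid primary key value: {raw_id!r}") from exc
```

<p>For example, <code>coerce_primary_key(IntKeyModel, "42")</code> yields the integer <code>42</code>, so the value sent to the database has the same type as the column.</p>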
<p>The single-table get method can then be adjusted to convert the incoming id to the primary key type before comparing, so that the lookup also works on PostgreSQL with its strict type checking.</p>
<div class="cnblogs_code">
<pre>    async def get(self, db: AsyncSession, id: PrimaryKeyType) -> Optional[ModelType]:
        """Get a single object by primary key."""
        # Convert id to the primary key type; self._pk_type is set in the constructor
        try:
            id = self._pk_type(id)
        except Exception:
            raise ValueError(f"Invalid primary key type: {id}")

        if isinstance(id, str):
            query = select(self.model).filter(func.lower(self.model.id) == id.lower())
        else:
            query = select(self.model).filter(self.model.id == id)

        result = await db.execute(query)
        item = result.scalars().first()
        return item</pre>
</div>
<p>Deletion can be handled in the same way.</p>
<div class="cnblogs_code">
<pre>from sqlalchemy.orm import Session, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import delete as sa_delete, update as sa_update, func

    async def delete_byid(self, db: AsyncSession, id: PrimaryKeyType) -> bool:
        """Delete a single object by primary key.

        :param id: primary key value
        """
        # Convert id to the primary key type; self._pk_type is set in the constructor
        try:
            id = self._pk_type(id)
        except Exception:
            raise ValueError(f"Invalid primary key type: {id}")

        if isinstance(id, str):
            del_query = sa_delete(self.model).where(
                func.lower(self.model.id) == id.lower()
            )
        else:
            del_query = sa_delete(self.model).where(self.model.id == id)

        result = await db.execute(del_query)
        await db.commit()
        return result.rowcount > 0  # True if at least one row was deleted</pre>
</div>
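<p>For queries or filters with multiple conditions, use either <strong>where</strong> or <strong>filter</strong>. In SQLAlchemy, <code>select(...).where(...)</code> and <code>select(...).filter(...)</code> both build the query condition; on <code>select()</code>, <code>filter</code> is a synonym for <code>where</code>, so the two statements below are equivalent.</p>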
<div class="cnblogs_code">
<pre>query = select(self.model).where(self.model.id ==<span style="color: rgba(0, 0, 0, 1)"> id)
query </span>= select(self.model).filter(self.model.id == id)</pre>
</div>
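<p>The equivalence can be checked by rendering both statements to SQL. This is a small sketch; the <code>Item</code> model and the <code>render_both</code> helper are hypothetical and only serve the illustration.</p>

```python
from sqlalchemy import Column, Integer, String, select
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Item(Base):  # hypothetical model used only for this illustration
    __tablename__ = "items"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))


def render_both(item_id: int) -> tuple:
    """Build the same condition with where() and filter() and render both as SQL text."""
    with_where = select(Item).where(Item.id == item_id)
    with_filter = select(Item).filter(Item.id == item_id)
    return str(with_where), str(with_filter)
```

<p>Both strings returned by <code>render_both</code> are identical, so the choice between the two methods is a matter of style.</p>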
<p>Multiple conditions can be combined with SQLAlchemy's <code>and_</code> and <code>or_</code> functions.</p>
<div class="cnblogs_code">
<pre>from sqlalchemy import (Table, Column, and_, or_, asc, desc, select, func, distinct, text, Integer)

# ...

match expression:
    case "and":
        # All conditions must hold
        query = await db.execute(
            select(self.model)
            .filter(and_(*where_list))
            .order_by(*order_by_list)
        )
    case "or":
        # At least one condition must hold
        query = await db.execute(
            select(self.model)
            .filter(or_(*where_list))
            .order_by(*order_by_list)
        )</pre>
</div>
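<p>The following sketch shows one way to choose the combinator from a string and inspect the resulting SQL without touching a database. The <code>Product</code> model and the <code>build_query</code> helper are assumptions made for the example; they are not part of the original code.</p>

```python
from sqlalchemy import Column, Integer, String, and_, or_, select
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Product(Base):  # hypothetical model
    __tablename__ = "products"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    status = Column(Integer)


def build_query(where_list, expression: str = "and"):
    """Combine the conditions in where_list with AND or OR."""
    combinators = {"and": and_, "or": or_}
    if expression not in combinators:
        raise ValueError(f"Unsupported expression: {expression!r}")
    stmt = select(Product)
    if where_list:  # calling and_()/or_() with no arguments is deprecated, so skip the filter
        stmt = stmt.where(combinators[expression](*where_list))
    return stmt
```

<p>Printing <code>build_query([Product.status == 0, Product.name == "x"], "or")</code> shows the two conditions joined with <code>OR</code> in the <code>WHERE</code> clause.</p>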
<p>Comparing a mapped attribute (an <code>InstrumentedAttribute</code> such as <code>User.id</code>) with a value produces a SQL condition. A method can accept any number of such conditions through a variadic parameter, which is convenient for multi-condition queries.</p>
<div class="cnblogs_code">
<pre>    async def get_all_by_attributes(
        self, db: AsyncSession, *attributes: InstrumentedAttribute, sorting: str = ""
    ) -> List[ModelType] | None:
        """Get the list of objects that match the given column conditions.

        :param sorting: format: "name asc" or "name asc,age desc"
        :param attributes: conditions built from SQLAlchemy InstrumentedAttribute objects; several may be passed
            Example: User.id != 1 or User.username == "JohnDoe"
        """
        order_by_list = parse_sort_string(sorting, self.model)
        # All conditions are combined with AND
        query = select(self.model).filter(and_(*attributes)).order_by(*order_by_list)
        result = await db.execute(query)
        return result.scalars().all()</pre>
</div>
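<p>The method above relies on a <code>parse_sort_string</code> helper that is not shown in this post. A minimal sketch of what such a helper might look like follows; its behavior (validation against the model's columns, default ascending order) and the <code>Person</code> model are assumptions, not the original implementation.</p>

```python
from sqlalchemy import Column, Integer, String, asc, desc, select
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Person(Base):  # hypothetical model
    __tablename__ = "people"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    age = Column(Integer)


def parse_sort_string(sorting: str, model) -> list:
    """Turn "name asc,age desc" into a list of ORDER BY clauses (assumed behavior)."""
    order_by_list = []
    for part in sorting.split(","):
        tokens = part.split()
        if not tokens:
            continue  # empty input or stray comma
        field_name = tokens[0]
        direction = tokens[1].lower() if len(tokens) > 1 else "asc"
        # Accept only real columns so user input never reaches SQL as raw text
        if field_name not in model.__table__.columns.keys():
            raise ValueError(f"Unknown sort field: {field_name!r}")
        if direction not in ("asc", "desc"):
            raise ValueError(f"Unknown sort direction: {direction!r}")
        column = getattr(model, field_name)
        order_by_list.append(desc(column) if direction == "desc" else asc(column))
    return order_by_list
```

<p>Passing the result to <code>select(Person).order_by(*clauses)</code> produces an <code>ORDER BY</code> clause with one entry per field, in the order given.</p>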
<p>For example, a multi-condition query on the Material model looks like this; every argument after <code>db</code> is one of the <code>*attributes</code> conditions.</p>
<div class="cnblogs_code">
<pre>items = await super().get_all_by_attributes(
    db,
    # Each comparison below is one condition; they are combined with AND
    Material.id == vercol.id,
    Material.vercol == vercol.vercol,
    Material.ischecked == 0,
    Material.status == 0,
)</pre>
</div>
<p>The same approach can be used to count records, or to check whether a record matching several conditions exists.</p>
<p><img src="https://img2024.cnblogs.com/blog/8867/202512/8867-20251225103430193-1873231330.png" alt="image" width="944" height="532" loading="lazy"></p>
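<p>As a rough text counterpart to the screenshot, here is a sketch of counting and existence checks driven by the same kind of conditions. The function names, the simplified <code>Material</code> model, and the use of a synchronous <code>Session</code> with in-memory SQLite are all assumptions made so that the example runs on its own.</p>

```python
from sqlalchemy import Column, Integer, String, create_engine, func, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Material(Base):  # simplified stand-in for the article's Material model
    __tablename__ = "materials"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    status = Column(Integer, default=0)


def count_by_attributes(db: Session, model, *attributes) -> int:
    """Count the rows that satisfy all given conditions."""
    stmt = select(func.count()).select_from(model).where(*attributes)
    return db.execute(stmt).scalar_one()


def exists_by_attributes(db: Session, model, *attributes) -> bool:
    """Return True if at least one row satisfies all given conditions."""
    stmt = select(model.id).where(*attributes).limit(1)
    return db.execute(stmt).first() is not None


def demo() -> tuple:
    engine = create_engine("sqlite:///:memory:")  # assumption: throwaway database
    Base.metadata.create_all(engine)
    with Session(engine) as db:
        db.add_all([
            Material(name="steel", status=0),
            Material(name="wood", status=0),
            Material(name="glass", status=1),
        ])
        db.commit()
        return (
            count_by_attributes(db, Material, Material.status == 0),
            exists_by_attributes(db, Material, Material.name == "glass", Material.status == 0),
        )
```

<p>In the async CRUD class, the same statements would be executed with <code>await db.execute(stmt)</code>.</p>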
<p>For insert and update operations, the method can accept either a DTO object or a dictionary as input, as shown below.</p>
<div class="cnblogs_code">
<pre>    async def update(self, db: AsyncSession, obj_in: DtoType | dict) -> bool:
        """Update an object.

        :param obj_in: input data, either a DTO object or a dict
        """
        try:
            if isinstance(obj_in, dict):
                obj_id = obj_in.get("id")
                if obj_id is None:
                    raise ValueError("id is required for update")
                update_data = obj_in
            else:
                obj_id = obj_in.id
                # update_data = vars(obj_in)
                update_data = obj_in.model_dump(exclude_unset=True)

            query = select(self.model).filter(self.model.id == obj_id)
            result = await db.execute(query)
            db_obj = result.scalars().first()

            if db_obj:
                # Update the object's fields
                for field, value in update_data.items():
                    # Skip private attributes that start with "_"
                    if field.startswith("_"):
                        continue
                    setattr(db_obj, field, value)

                # Callback invoked before the update is committed
                self.on_before_update(update_data, db_obj)

                # Commit the transaction
                await db.commit()
                return True
            else:
                return False
        except SQLAlchemyError as e:
            self.logger.error(f"update failed: {e}")
            await db.rollback()  # make sure the transaction is rolled back on error
            return False</pre>
</div>
<p>When inserting or updating data, some fields are usually filled in automatically, such as the creator, creation date, editor, and edit date. This can be factored into a separate method that subclasses may override, with the base class providing the default handling.</p>
<div class="cnblogs_code">
<pre>    def on_before_update(self, update_data: dict, db_obj: ModelType) -> None:
        """Callback invoked before an object is updated; subclasses may override it.

        Field values can be set with setattr(db_obj, field, value).
        """
        setattr(db_obj, "edittime", datetime.now())

        user: CurrentUserIns = get_current_user()
        if user:
            # Record who made the change
            setattr(db_obj, "editor", user.fullname)
            setattr(db_obj, "editor_id", user.id)
            setattr(db_obj, "company_id", user.company_id)
            setattr(db_obj, "companyname", user.companyname)</pre>
</div>
<p>Sometimes we need the distinct values of a field, for example as the data source of a dynamic drop-down list. This can be wrapped in a function like the following.</p>
<div class="cnblogs_code">
<pre>    async def get_field_list(self, db: AsyncSession, field_name: str) -> Iterable:
        """Get the list of distinct values of the given field.

        :param field_name: field name
        """
        field = getattr(self.model, field_name)
        query = select(distinct(field))  # SELECT DISTINCT field
        result = await db.execute(query)
        return result.scalars().all()</pre>
</div>
<p> </p>
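<h3>3. Multi-table join operations</h3>
<p>Multi-table operations are also very common. For example, dictionary types and dictionary items are stored in two tables, and retrieving the data requires joining them.</p>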
<p><img src="https://img2024.cnblogs.com/blog/8867/202512/8867-20251225110211988-1085898697.png" alt="image" width="238" height="326" loading="lazy"></p>
<p>Below is the method in the dictionary CRUD class that joins the dictionary type table to fetch records.</p>
<div class="cnblogs_code">
<pre>    async def get_dict_by_typename(self, db: AsyncSession, dicttype_name: str) -> dict:
        """Get all dictionary items of the given dictionary type name as a name-to-value dict."""
        result = await db.execute(
            select(self.model)
            .join(DictType, DictType.id == self.model.dicttype_id)  # join the dictionary type table
            .filter(DictType.name == dicttype_name)  # filter by dictionary type name
            .order_by(DictData.seq)  # sort
        )
        items = result.scalars().all()

        # Use a name that does not shadow the built-in dict
        result_dict = {}
        for info in items:
            if info.name not in result_dict:
                result_dict[info.name] = info.value  # keep the first value for each name
        return result_dict</pre>
</div>
<p>To build a tree list from a self-referencing table, we can load the nodes under a given parent together with their children, as follows.</p>
<div class="cnblogs_code">
<pre>    async def get_tree(self, db: AsyncSession, pid: str) -> list:
        """Get the top-level dictionary types and the items beneath them."""
        # Conditional expression: use "-1" when pid is None or blank, otherwise keep pid
        pid = "-1" if not pid or pid.strip() == "" else pid
        result = await db.execute(
            select(self.model)
            .filter(self.model.pid == pid)
            .options(selectinload(DictType.children))  # eagerly load each node's children
        )
        nodes = result.scalars().all()
        return nodes</pre>
</div>
<p>Assume the following example table structure (ORM models) for users and articles.</p>
<div class="cnblogs_code">
<pre>class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String)
    email = Column(String)

    # One user has many articles
    articles = relationship("Article", back_populates="author")


class Article(Base):
    __tablename__ = "articles"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String)
    content = Column(Text)
    user_id = Column(Integer, ForeignKey("users.id"))

    # Each article belongs to one user
    author = relationship("User", back_populates="articles")</pre>
</div>
<p>The following function returns the joined records.</p>
<div class="cnblogs_code">
<pre>async def get_user_articles(db: AsyncSession):
    # Inner join: only users that have at least one article are returned
    stmt = (
        select(User, Article)
        .join(Article, Article.user_id == User.id)
    )
    result = await db.execute(stmt)
    return result.all()  # [(User(), Article()), ...]</pre>
</div>
<p>If users without articles should be included as well, use <code>outerjoin</code> instead.</p>
<div class="cnblogs_code">
<pre>async def get_users_with_optional_articles(db: AsyncSession):
    # Left outer join: every user is returned
    stmt = (
        select(User, Article)
        .outerjoin(Article, Article.user_id == User.id)
    )
    result = await db.execute(stmt)
    return result.all()  # users appear even if they have no articles (Article is then None)</pre>
</div>
<p> </p>
<p>To load all users together with their articles, use <code>selectinload</code> as shown below. Note that this does not filter out users who have no articles; their <code>articles</code> collection is simply empty.</p>
<div class="cnblogs_code">
<pre>async def get_users_with_articles(db: AsyncSession):
    stmt = select(User).options(selectinload(User.articles))  # eagerly load the relationship
    result = await db.execute(stmt)
    return result.scalars().all()</pre>
</div>
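<p><code data-start="1468" data-end="1482">selectinload</code> issues two SQL statements (one for the users and one for their articles), but it is efficient and does not produce a Cartesian product of rows, which makes it well suited to loading collections.</p>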
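<p>The two-statement behavior can be observed by recording the SQL the engine sends. The sketch below is only an illustration under stated assumptions: it uses a synchronous <code>Session</code> with in-memory SQLite instead of the async session used in this post, trimmed-down models, and a hypothetical helper named <code>count_selects_with_selectinload</code>.</p>

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, event, select
from sqlalchemy.orm import Session, declarative_base, relationship, selectinload

Base = declarative_base()


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    articles = relationship("Article", back_populates="author")


class Article(Base):
    __tablename__ = "articles"
    id = Column(Integer, primary_key=True)
    title = Column(String(100))
    user_id = Column(Integer, ForeignKey("users.id"))
    author = relationship("User", back_populates="articles")


def count_selects_with_selectinload() -> tuple:
    engine = create_engine("sqlite:///:memory:")  # assumption: throwaway database
    Base.metadata.create_all(engine)
    with Session(engine) as db:
        db.add_all([
            User(name="ann", articles=[Article(title="a1"), Article(title="a2")]),
            User(name="bob"),  # a user without articles
        ])
        db.commit()

    statements = []

    def record(conn, cursor, statement, parameters, context, executemany):
        statements.append(statement)

    # Start recording only after the setup statements have run
    event.listen(engine, "before_cursor_execute", record)
    with Session(engine) as db:
        stmt = select(User).options(selectinload(User.articles)).order_by(User.id)
        users = db.execute(stmt).scalars().all()
        # Collections are already loaded, so this access issues no further SQL
        article_counts = [len(u.articles) for u in users]

    selects = [s for s in statements if s.lstrip().upper().startswith("SELECT")]
    return len(selects), article_counts
```

<p>The helper returns the number of <code>SELECT</code> statements issued and the number of articles per user, which makes it easy to compare against a plain lazy-loading query.</p>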
<p>Joining tables also lets us select individual columns from each table and combine them in one result.</p>
<div class="cnblogs_code">
<pre>async def get_articles_with_author(db: AsyncSession):
    stmt = (
        select(Article.title, User.name.label("author"))
        .join(User, Article.user_id == User.id)
    )
    rows = await db.execute(stmt)
    return rows.mappings().all()  # dict-like rows: [{'title': ..., 'author': ...}]</pre>
</div>
<p>Filtering combined with pagination can be implemented as follows. An explicit <code>order_by</code> is included so that pages are stable between requests.</p>
<div class="cnblogs_code">
<pre>async def search_articles(db: AsyncSession, keyword: str, page: int = 1, size: int = 10):
    stmt = (
        select(Article, User.name.label("author"))
        .join(User)  # join condition is inferred from the foreign key
        .filter(Article.title.contains(keyword))
        .order_by(Article.id)  # deterministic ordering for pagination
        .offset((page - 1) * size)
        .limit(size)
    )
    result = await db.execute(stmt)
    return result.mappings().all()</pre>
</div>
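<p>Paged endpoints usually need the total number of matching rows in addition to the current page. The sketch below builds both statements from one shared condition; the helper names <code>page_to_offset</code> and <code>build_page_queries</code> and the trimmed-down <code>Article</code> model are assumptions for illustration, and the statements are only constructed here, not executed.</p>

```python
from sqlalchemy import Column, Integer, String, func, select
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Article(Base):  # trimmed-down stand-in for the article's model
    __tablename__ = "articles"
    id = Column(Integer, primary_key=True)
    title = Column(String(100))


def page_to_offset(page: int, size: int) -> int:
    """Translate a 1-based page number into a row offset."""
    if page < 1 or size < 1:
        raise ValueError("page and size must be positive")
    return (page - 1) * size


def build_page_queries(keyword: str, page: int = 1, size: int = 10):
    """Return (items statement, total-count statement) sharing one filter."""
    condition = Article.title.contains(keyword)
    items_stmt = (
        select(Article)
        .where(condition)
        .order_by(Article.id)  # stable order so pages do not overlap
        .offset(page_to_offset(page, size))
        .limit(size)
    )
    total_stmt = select(func.count()).select_from(Article).where(condition)
    return items_stmt, total_stmt
```

<p>With an <code>AsyncSession</code>, the two statements would be run with <code>await db.execute(items_stmt)</code> and <code>await db.execute(total_stmt)</code>, and the count read with <code>scalar_one()</code>.</p>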
<p> </p>
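<p>A permission management system usually has users, roles, and an association table that links users to roles. Their relationship typically looks like this:</p>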
<p><img src="https://img2024.cnblogs.com/blog/8867/202512/8867-20251225122040874-818263456.png" alt="image" width="729" height="363" loading="lazy"></p>
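<p>The UI usually provides a way to select users for the association, for example maintaining the list of users that belong to a role.</p>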
<p><img src="https://img2024.cnblogs.com/blog/8867/202512/8867-20251225121122302-651245416.png" alt="image" width="481" height="426" loading="lazy"></p>
<p>Let's look at the recommended way to model this in SQLAlchemy.</p>
<div class="cnblogs_code">
<pre>from sqlalchemy import Table, Column, Integer, ForeignKey
from sqlalchemy.orm import relationship, Mapped, mapped_column
from database import Base

# --- Association (middle) table ---
role_user = Table(
    "role_user",
    Base.metadata,
    Column("user_id", ForeignKey("users.id"), primary_key=True),
    Column("role_id", ForeignKey("roles.id"), primary_key=True),
)

class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column()
    roles: Mapped[list["Role"]] = relationship(
        secondary=role_user,
        back_populates="users",
        lazy="selectin",
    )

class Role(Base):
    __tablename__ = "roles"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column()
    users: Mapped[list["User"]] = relationship(
        secondary=role_user,
        back_populates="roles",
        lazy="selectin",
    )</pre>
</div>
<p>When declaring a many-to-many relationship in SQLAlchemy, the <code data-start="22" data-end="33">secondary</code> parameter accepts either <strong data-start="41" data-end="53">the table name as a string</strong> or <strong data-start="59" data-end="84">an already defined association table object (a Table object).</strong></p>
<p><strong data-start="59" data-end="84">① secondary="role_user": pass the table name as a string</strong></p>
<div class="cnblogs_code">
<pre>roles = relationship(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Role</span><span style="color: rgba(128, 0, 0, 1)">"</span>, secondary=<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">role_user</span><span style="color: rgba(128, 0, 0, 1)">"</span>, back_populates=<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">users</span><span style="color: rgba(128, 0, 0, 1)">"</span>)</pre>
</div>
<p><strong>② secondary=role_user: pass the association table object (recommended)</strong></p>
<div class="cnblogs_code">
<pre>role_user = Table(
    "role_user",
    Base.metadata,
    Column("user_id", ForeignKey("users.id"), primary_key=True),
    Column("role_id", ForeignKey("roles.id"), primary_key=True),
)

# Inside the User class body:
roles = relationship("Role", secondary=role_user, back_populates="users")</pre>
</div>
<p>To fetch the users that belong to a given role, we can join through the association table:</p>
<div class="cnblogs_code">
<pre>async def get_users_by_role(db: AsyncSession, role_id: int) -> list:
    stmt = (
        select(User)
        .join(role_user, role_user.c.user_id == User.id)
        .where(role_user.c.role_id == role_id)
    )
    result = await db.execute(stmt)
    return result.scalars().all()</pre>
</div>
<p>The same rows can also be fetched with the relationship's any() operator, which renders an EXISTS subquery instead of a JOIN:</p>
<div class="cnblogs_code">
<pre>select(User).filter(User.roles.any(id=role_id))</pre>
</div>
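<p>To write user-role associations, the function below first checks the association table for existing links and then inserts only the missing ones.</p>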
<div class="cnblogs_code">
<pre>from sqlalchemy import select, insert

async def add_users_to_role(db: AsyncSession, role_id: int, user_ids: list):
    # 1. Query the user_ids already linked to this role
    stmt = select(role_user.c.user_id).where(role_user.c.role_id == role_id)
    res = await db.execute(stmt)
    existing_user_ids = {row[0] for row in res.fetchall()}
    # 2. Keep only the user_ids that are not linked yet
    new_user_ids = [uid for uid in user_ids if uid not in existing_user_ids]
    if not new_user_ids:
        return 0  # nothing new to add
    # 3. Bulk insert the new links
    values = [{"user_id": uid, "role_id": role_id} for uid in new_user_ids]
    stmt = insert(role_user).values(values)
    await db.execute(stmt)
    await db.commit()
    return len(new_user_ids)</pre>
</div>
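<p>One detail worth guarding against in the filtering step: if the caller passes the same user id twice, the bulk insert would try to write the same (user_id, role_id) pair twice and violate the composite primary key. A minimal sketch of a filter that also removes duplicates is shown here. The helper name <code>pick_new_user_ids</code> is hypothetical and not part of the original code:</p>

```python
def pick_new_user_ids(user_ids: list[int], existing_user_ids: set[int]) -> list[int]:
    """Return the ids from user_ids that are not linked yet, without duplicates.

    Hypothetical helper: dict.fromkeys() drops repeated ids while keeping the
    caller's order, so a later bulk INSERT never sees the same pair twice.
    """
    unique_ids = dict.fromkeys(user_ids)
    return [uid for uid in unique_ids if uid not in existing_user_ids]


new_ids = pick_new_user_ids([3, 1, 3, 2], existing_user_ids={1})
print(new_ids)  # [3, 2]
```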
<p>To insert a single link, the following approach works:</p>
<div class="cnblogs_code">
<pre>from sqlalchemy import select, insert

async def add_user_to_role(db: AsyncSession, role_id: int, user_id: int) -> bool:
    # 1. Check whether the link already exists
    stmt = select(role_user).where(
        role_user.c.role_id == role_id,
        role_user.c.user_id == user_id,
    )
    res = await db.execute(stmt)
    exists = res.first()
    if exists:
        return False  # already linked, do not insert again
    # 2. Insert the link
    stmt = insert(role_user).values(user_id=user_id, role_id=role_id)
    await db.execute(stmt)
    await db.commit()
    return True</pre>
</div>
<p>To delete a user-role association, use the following function:</p>
<div class="cnblogs_code">
<pre>from sqlalchemy import and_
from sqlalchemy import delete as sa_delete

async def remove_user(db: AsyncSession, role_id: int, user_id: int) -> bool:
    """Remove a role-user association."""
    stmt = sa_delete(role_user).where(
        and_(
            role_user.c.role_id == role_id,
            role_user.c.user_id == user_id,
        )
    )
    result = await db.execute(stmt)
    await db.commit()
    # rowcount is the number of deleted rows
    return result.rowcount > 0</pre>
</div>
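<p>The rowcount check can be tried out in isolation. The sketch below is not the article's async code: it assumes a synchronous in-memory SQLite engine and a Core-only table without foreign keys, and the helper name <code>remove_link</code> is hypothetical. It deletes the same link twice to show both outcomes:</p>

```python
from sqlalchemy import Column, Integer, MetaData, Table, create_engine, delete, insert

metadata = MetaData()
# Simplified association table for the demo (no foreign keys, assumption)
role_user = Table(
    "role_user",
    metadata,
    Column("user_id", Integer, primary_key=True),
    Column("role_id", Integer, primary_key=True),
)

engine = create_engine("sqlite://")  # in-memory database, synchronous driver
metadata.create_all(engine)


def remove_link(conn, role_id: int, user_id: int) -> bool:
    """Delete one link and report whether a row was actually removed."""
    stmt = delete(role_user).where(
        role_user.c.role_id == role_id,
        role_user.c.user_id == user_id,
    )
    return conn.execute(stmt).rowcount > 0


with engine.begin() as conn:  # commits on successful exit
    conn.execute(insert(role_user).values(user_id=7, role_id=1))
    first = remove_link(conn, role_id=1, user_id=7)   # the link exists
    second = remove_link(conn, role_id=1, user_id=7)  # already gone

print(first, second)  # True False
```

<p>Passing several conditions to .where() joins them with AND, so the explicit and_() is optional.</p>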
<p> </p>
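<p>These are the common ways of working with data through SQLAlchemy in a Python+FastAPI backend project: single-table operations, multi-table joins, and the definition and maintenance of association tables. They are the patterns we run into most often when handling everyday data.</p>
<p>I hope this article has been helpful. Thanks for reading.</p>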
</div>
<div id="MySignature" role="contentinfo">
<div style="border-right-color: #cccccc; border-right-width: 1px; border-right-style: solid; padding-right: 5px; border-top-color: #cccccc; border-top-width: 1px; border-top-style: solid; padding-left: 4px; font-size: 13px; padding-bottom: 4px; border-left-color: #cccccc; border-left-width: 1px; border-left-style: solid; width: 98%; padding-top: 4px; border-bottom-color: #cccccc; border-bottom-width: 1px; border-bottom-style: solid; background-color: #eeeeee;">
<img src="http://www.cnblogs.com/Images/OutliningIndicators/None.gif" align="top" alt>
<span style="color: #000000"><span class="Apple-tab-span" style="white-space: pre"></span>
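Focused on code generation tools, .Net/Python framework architecture and software development, and a range of Vue.js front-end technologies. Author of framework products including the Winform development framework/hybrid development framework, WeChat development framework, Bootstrap development framework, ABP development framework, SqlSugar development framework, and Python development framework.
<br> Please credit the source when reposting. Author: 伍华聪 http://www.iqidi.com <br> </span></div><br><br>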
Source: https://www.cnblogs.com/wuhuacong/p/19396612