素婧流年 發表於 2025-10-12 16:42:00

Python psycopg2 类库使用学习总结

<h2 id="实践环境">实践环境</h2>
<p>openGauss 6.0.0 TLS企业版</p>
<p>python3 .9.13</p>
<p>psycopg2 2.9.10</p>
<h2 id="实践操作">实践操作</h2>
<pre><code class="language-python"># -*- coding:utf-8 -*-

import psycopg2

if __name__ == '__main__':   
    # 连接方式1
    # connection_str = 'host=192.168.88.139 port=15400 dbname=testdb user=testacc password=test1234#'
    # conn = psycopg2.connect(connection_str)
   
    # 连接方式2
    # 注意:
    # host: 如果未提供,默认为UNIX socket
    # port: 如果未提供,默认为5432
    # database 仅用于关键词参数,不能用于连接参数字符串中
    conn = psycopg2.connect(host='192.168.88.139', port=15400, database='testdb', user='testacc', password='test1234#')
   
    cursor = conn.cursor()
   
    cursor.execute('SELECT datname AS "Database", pg_catalog.pg_get_userbyid(datdba) AS "Owner" FROM pg_database')
    res = cursor.fetchall()
    print(res)# 输出形如 [('template1', 'omm'), ('template0', 'omm'), ('testdb', 'omm'), ('postgres', 'omm')]
   
    # 删除表
    cursor.execute('DROP TABLE IF EXISTS t_user')
    #print(cursor.fetchall())# 会报错:psycopg2.ProgrammingError: no results to fetch
   
    create_tb_sql = '''CREATE TABLE IF NOT EXISTS t_user (
                        user_id    INT PRIMARY KEY,
                        username   VARCHAR(50) NOT NULL UNIQUE,
                        password   VARCHAR(60) NOT NULL,
                        email      VARCHAR(100) UNIQUE
                  ) WITH (
                        fillfactor = 85-- 行存表填充因子(预留15%空间用于更新)
                  )'''
    cursor.execute(create_tb_sql)
    cursor.execute('commit') # 注意,如果没有这个,不会提交到数据库,即执行完上述语句后,数据库public模式下依然看不到数据表
   
    # 插入数据--使用 %s 占位符
    cursor.execute('INSERT INTO t_user (user_id, username, password, email) VALUES (%s, %s, %s, %s)',
                   (1, '赖某某', '123456', 'testemail1@163.com'))
    print(cursor.rowcount)# 获取execute 产生记录数 输出:1
   
    cursor.execute('INSERT INTO t_user (user_id, username, password, email) VALUES (%s, %s, %s, %s)',
                   )
   
    # 插入数据--使用 %(field_name)s 占位符
    cursor.execute('INSERT INTO t_user (user_id, username, password, email) VALUES (%(user_id)s, %(username)s, %(password)s, %(email)s)',
                   {'user_id':3, 'username': '肖某某', 'password': '123456', 'email':'testemail3@163.com'})

    # 插入数据-插入多条
    cursor.executemany('INSERT INTO t_user (user_id, username, password, email) VALUES (%s, %s, %s, %s)',
                   [(4, 'testacc1', '123456', 'testemail5@163.com'),
                        (5, 'testacc2', '123456', 'testemail6@163.com')])
    # 最后提交
    cursor.execute('commit')# 注意,如果没有这个,不会提交到数据库,即执行完上述语句后,数据库表中依然查不到对应数据

    cursor.execute('SELECT * FROM testdb.public.t_user')
    print(cursor.fetchall()) # 输出包含对应记录的list
   
    # cursor.execute("SELECT (%s % 2) = 0 AS even", (10,))# WRONG
    cursor.execute("SELECT (%s %% 2) = 0 AS even", (10,))# correct
    print(cursor.fetchall()) # 输出:[(True,)]
   
    cursor.execute('SELECT * FROM t_user WHERE user_id=1')
    print(cursor.fetchone())# 输出:(1, '赖某某', '123456', 'testemail1@163.com')
    print(cursor.fetchone())# 输出:None
   
    cursor.execute('SELECT * FROM t_user')
    print(cursor.rowcount)# 输出:5
   
    # 只取部分记录
    print(cursor.fetchmany(3))
    # 输出:[(1, '赖某某', '123456', 'testemail1@163.com'), (2, '林某某', '123456', 'testemail2@163.com'), (3, '王某某', '123456', 'testemail3@163.com')]
   
    cursor.execute('SELECT * FROM t_user WHERE user_id=10')
    res = cursor.fetchall()
    print(res) # 输出:[]
   
    cursor.close()
    conn.close()
</code></pre>
<p><strong>问题</strong></p>
<h3 id="1使用psycopg2-连接数据时遇到如下报错">1、使用psycopg2 连接数据时遇到如下报错:</h3>
<pre><code>psycopg2.OperationalError: connection to server at "192.168.88.139", port 15400 failed: none of the server's SASL authentication mechanisms are supported
</code></pre>
<p>前提:</p>
<p>opengauss<code>pg_hba.conf</code> 关键配置</p>
<pre><code class="language-properties"># IPv4 local connections:
...
host    all    all    0.0.0.0/0    sha256
</code></pre>
<p>解决方法</p>
<p>1、编辑opengauss服务器<code>postgresql.conf</code>配置文件,修改<code>password_encryption_type</code> 为1</p>
<pre><code class="language-properties">password_encryption_type = 1            #Password storage type, 0 is md5 for PG, 1 is sha256 + md5, 2 is sha256 only
</code></pre>
<p>2、重启数据库服务器</p>
<p>3、修改连接数据库所用用户的密码</p>
<h2 id="类库封装">类库封装</h2>
<p>测试用数据表</p>
<pre><code class="language-sql">CREATE TABLE IF NOT EXISTS test (
id    INT PRIMARY KEY,
log_message   VARCHAR(60) NOT NULL
) WITH (
fillfactor = 85-- 行存表填充因子(预留15%空间用于更新)
)
</code></pre>
<pre><code class="language-sql"># -*- coding:utf-8 -*-


import re
import traceback
import psycopg2
from utils.log import logger

class PostgreSQLCli:
    def __init__(self, db_name='', db_host='', port=3306, user='', password='', connect_timeout=15):
      try:
            self.dbconn = None
            self.host = db_host
            self.port = port
            self.user = user
            self.passwd = password
            self.db_name = db_name
            self.connect_timeout = connect_timeout
            self.connect_config = {'host': self.host, 'port': self.port, 'user': self.user, 'password': self.passwd, 'database': self.db_name}
            self.__connect_database()
            logger.debug('初始化数据库连接成功(数据库:%s)' % self.db_name)
      except Exception as e:
            raise Exception('初始化数据库(%s)连接失败:%s' % (self.db_name, traceback.format_exc()))

    def __connect_database(self):
      self.dbconn = psycopg2.connect(**self.connect_config)
      
    def insert(self, query, params=None):
      '''插入单条数据
      示例:
      :query "INSERT INTO test (x) VALUES(%(x)s)"
      :params {'x': 100}
      
      :query "INSERT INTO test (x) VALUES(%s)"
      :params 或者(100,)
      '''
      
      try:
            db_cursor = self.dbconn.cursor()
      except Exception:
            logger.error('获取数据库游标失败:%s,正在尝试重新连接数据库' % traceback.format_exc())
            self.__connect_database()
            db_cursor = self.dbconn.cursor()
      try:
            db_cursor.execute(query, params)
            db_cursor.execute('commit')
            db_cursor.close()
      except Exception:
            db_cursor.execute('rollback')
            db_cursor.close()
            raise Exception(f'执行数据库插入操作({query})失败:{traceback.format_exc()}')
   
    def insert_many(self, query, params=None):
      '''插入多条数据
      示例:
      :query "INSERT INTO test (x) VALUES(%(x)s)"
      :params [{'x': 100}, {'x': 101}]

      :query "INSERT INTO test (x) VALUES(%s)"
      :params [, ] 或者[(100,), (101, )]
      '''
      
      try:
            db_cursor = self.dbconn.cursor()
      except Exception:
            logger.error('获取数据库游标失败:%s,正在尝试重新连接数据库' % traceback.format_exc())
            self.__connect_database()
            db_cursor = self.dbconn.cursor()
      try:
            db_cursor.executemany(query, params)
            db_cursor.execute('commit')
            db_cursor.close()
      except Exception:
            db_cursor.execute('rollback')
            db_cursor.close()
            raise Exception(f'执行数据库批量插入操作({query})失败:{traceback.format_exc()}')
      
      
    def delete(self, query, params=None):
      '''例子:
      :query DELETE FROM test WHERE id = %(id)s'
      :params {'id': 1}
      当然,也可以把参数放到query中 DELETE FROM test WHERE id = 2
      '''
      
      try:
            db_cursor = self.dbconn.cursor()
      except Exception:
            logger.error('获取数据库游标失败:%s,正在尝试重新连接数据库' % traceback.format_exc())
            self.__connect_database()
            db_cursor = self.dbconn.cursor()
      try:
            db_cursor.execute(query, params)
            db_cursor.execute('commit')
            db_cursor.close()
      except Exception:
            db_cursor.execute('rollback')
            db_cursor.close()
            raise Exception(f'执行数据库删除操作({query})失败:{traceback.format_exc()}')


    def update(self, query, params=None):
      '''
      例子:
               :query "UPDATE test SET log_message=%(log_message)s WHERE id = %(id)s"
               :params {'log_message':'log message', 'id': 2}
               当然,也可以把参数放到query中 UPDATE test SET log_message='log message' WHERE id = 2
      '''
      try:
            db_cursor = self.dbconn.cursor()
      except Exception:
            logger.error('获取数据库游标失败:%s,正在尝试重新连接数据库' % traceback.format_exc())
            self.__connect_database()
            db_cursor = self.dbconn.cursor()
      try:
            db_cursor.execute(query, params)
            db_cursor.execute('commit')
            db_cursor.close()
      except Exception:
            db_cursor.execute('rollback')
            db_cursor.close()
            raise Exception(f'执行数据库更新操作({query})失败:{traceback.format_exc()}')
      

    def select(self, query, params=None):
      '''查询结果最多只包含一条记录
      
      示例:查询获取获取id为2的记录
      :query'SELECT * FROM test WHERE id = %(id)s'
      :param{'id': 2}

      当然,也可以把参数放到query中 SELECT * FROM test WHERE id = 2
      '''

      result = []
      try:
            db_cursor = self.dbconn.cursor()
      except Exception:
            logger.error('获取数据库游标失败:%s,正在尝试重新连接数据库' % traceback.format_exc())
            self.__connect_database()
            db_cursor = self.dbconn.cursor()

      try:
            db_cursor.execute(query, params)
            query_result = db_cursor.fetchall()
            if query_result:
                result = query_result
            db_cursor.close()
      except Exception:
            db_cursor.close()
            raise Exception(f'执行数据库查询操作({query})失败:{traceback.format_exc()}')

      return result


    def select_many(self, query, params=None):
      '''查询,查询结果包含多条记录'''

      try:
            db_cursor = self.dbconn.cursor()
      except Exception:
            logger.error('获取数据库游标失败:%s,正在尝试重新连接数据库(%s)' % (traceback.format_exc(), self.db_name))
            self.__connect_database()
            db_cursor = self.dbconn.cursor()
            
      try:
            db_cursor.execute(query, params)
            query_result = db_cursor.fetchall()
            db_cursor.close()
      except Exception:
            db_cursor.close()
            raise Exception(f'执行数据库查询操作({query})失败:{traceback.format_exc()}')

      return query_result


    def close(self):
      if self.dbconn:
            self.dbconn.close()
            self.dbconn = None

    def __del__(self):
      self.close()
      
      
# 测试
if __name__ == '__main__':
    db_cli = PostgreSQLCli(db_name='testdb', db_host='192.168.88.139', port=15400, user='testacc', password='test1234#')
    db_cli.insert('INSERT INTO test (id, log_message) VALUES(%s, %s)', )
    db_cli.insert_many('INSERT INTO test (id, log_message) VALUES(%s, %s)', [,])
    res = db_cli.select_many('SELECT * FROM test')
    print(res) #输出:[(1, 'test message 1'), (2, 'test message 2'), (3, 'test message 3')]

    res = db_cli.select('SELECT * FROM test WHERE id = %(id)s', {'id': 2})
    print(res) # 输出:(2, 'test message 2')
    res = db_cli.select('SELECT * FROM test WHERE id = 2')
    print(res)

    db_cli.update("UPDATE test SET log_message = %(log_message)s WHERE id = %(id)s", {'log_message':'log_message %s' % datetime.now().strftime('%Y%m%d%H%M%S'), 'id': 2})
    res = db_cli.select('SELECT * FROM test WHERE id = 2')
    print(res)

    db_cli.update("UPDATE test SET log_message = '%s' WHERE id = 2" % ('log_message %s' % datetime.now().strftime('%Y%m%d%H%M%S')))
    res = db_cli.select('SELECT * FROM test WHERE id = 2')
    print(res)

    db_cli.delete('DELETE FROM test WHERE id IN (%(id)s, %(id2)s)', {'id': 2, 'id2': 3})
   
    res = db_cli.select_many('SELECT * FROM test')
    print(res)
</code></pre>
<p>注意:误区</p>
<p>当前驱动版本下验证,使用类似以下代码,尝试切换当前数据库至目标数据库test_db,然后获取获取test_db数据库中所有表</p>
<pre><code class="language-python">db_cli.select('USE `test_db`')
tables = db_cli.select('SHOW TABLES')
</code></pre>
<p>实际执行结果,<code>db_cli.select('SHOW TABLES')</code>总是返回初始化连接时连接的数据库的中的表。解决方法如下:</p>
<pre><code class="language-shell">tables = db_cli.select('SHOW TABLES FROM test_db')
</code></pre>
<p>注意:如果表名中存在特殊字符比如 / 时,表名需要加双引号,否则会报错</p>
<pre><code class="language-python">db_cli.select('SELECT * FROM db_name.schema_name."agent/defualt");
db_cli.select('SELECT * FROM "agent/defualt");
</code></pre>
<h2 id="参考链接">参考链接</h2>
<p>https://github.com/psycopg/psycopg2</p>
<p>https://www.psycopg.org/docs/usage.html</p>


</div>
<div id="MySignature" role="contentinfo">
    <div id="AllanboltSignature">
    <p id="PSignature" style="border: #330066 1px dashed; padding: 5px 10px; font-family: 微软雅黑; font-size: 11px">
      <span style="margin-left: 5px; font-weight: bold">作者:授客</span>
                <br>
      <span style="margin-left: 5px; font-weight: bold">微信/QQ:1033553122
                <br>
      <span style="margin-left: 5px; font-weight: bold">全国软件测试QQ交流群:7156436</span></span>
                <br>
      <span style="margin-left: 5px; font-weight: bold">Git地址:https://gitee.com/ishouke</span>
                <br>
      <span style="margin-left: 5px; font-weight: bold">友情提示:<span>限于时间仓促,文中可能存在错误,欢迎指正、评论!</span>
      <br>
                <span><span style="margin-left: 5px; font-weight: bold; color: red">作者五行缺钱,如果觉得文章对您有帮助,请扫描下边的二维码打赏作者,金额随意</span>,您的支持将是我继续创作的源动力,<span style="margin-left: 10px; font-weight: bold; color: red">打赏后如有任何疑问,请联系我!!!</span></span>
      <br>
                <span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;微信打赏&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
                支付宝打赏&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;全国软件测试交流QQ群&nbsp;&nbsp;<br>
                <img src="https://www.cnblogs.com/images/cnblogs_com/shouke/1368383/t_%E5%BE%AE%E4%BF%A1%E6%94%B6%E6%AC%BE%E7%A0%81.bmp">
                &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<img src="https://www.cnblogs.com/images/cnblogs_com/shouke/1368383/t_%E6%94%AF%E4%BB%98%E5%AE%9D%E6%94%B6%E6%AC%BE%E7%A0%81.bmp">
                &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<img src="https://www.cnblogs.com/images/cnblogs_com/shouke/1368383/t_qq%E7%BE%A4.bmp">
    </span></span></p>
</div><br><br>
来源:https://www.cnblogs.com/shouke/p/19134092
頁: [1]
查看完整版本: Python psycopg2 类库使用学习总结