Psycopg是Python编程语言新设计的PostgreSQL数据库适配器。由于Hologres兼容PostgreSQL 11,因此您可以通过Psycopg访问Hologres。本文将指导您使用Psycopg 3访问Hologres。
前提条件
已安装3.7及以上版本的Python环境。
安装Psycopg 3
执行如下命令安装Psycopg 3。
pip  install   --upgrade pip             # 升级 pip 到 20.3 以上版本
pip install "psycopg[binary]"
连接Hologres
Psycopg 3安装完成之后,您可以执行如下操作并连接Hologres。
- 加载Psycopg 3。 - 您可以执行以下命令,加载安装的Psycopg 3。 - import psycopg
- 创建数据库连接。 - 您可以通过 - psycopg.connect()函数连接Hologres,具体语法和参数说明如下所示。- conn = psycopg.connect( host="<Endpoint>", port=<Port>, dbname="<databases>", user="<Access ID>", password="<Access Key>", keepalives=<keepalives>, keepalives_idle=<keepalives_idle>, keepalives_interval=<keepalives_interval>, keepalives_count=<keepalives_count> )- 参数 - 描述 - Endpoint - Hologres实例的网络地址和端口。 - 进入Hologres管理控制台,选择左侧导航栏实例列表,单击目标实例,在实例详情页网络信息中获取网络地址和端口。 重要- 请根据代码运行所在网络环境选择正确的网络地址和端口,否则将无法正常连接。 - Port - databases - Hologres创建的数据库名称。 - Access ID - 当前阿里云账号的AccessKey ID。 - 您可以单击AccessKey 管理,获取AccessKey ID。 - Access Key - 当前阿里云账号的AccessKey Secret。 - keepalives - 可选(推荐使用),连接方式: - 1表示使用长连接。 
- 0表示非长连接。 
 - keepalives_idle - 空闲时,保持连接连通的时间间隔,单位秒(s)。 - keepalives_interval - 没得到回应时,等待重新尝试保持连通的时间间隔,单位秒(s)。 - keepalives_count - 尝试重新保持连通最大次数。 - 代码示例如下。 - conn = psycopg.connect( host="<Endpoint>", port=<Port>, dbname="<databases>", user="<Access ID>", password="<Access Key>", keepalives=1, # 保持连接 keepalives_idle=130, # 空闲时,每130秒保持连接连通 keepalives_interval=10, # 没得到回应时,等待10秒重新尝试保持连通 keepalives_count=15, # 尝试最多15次重新保持连通 application_name="<Application Name>" )说明- 配置Application Name参数可以帮助您在历史慢Query列表中快速查看发起请求的应用。 
使用Hologres
当您成功连接Hologres数据库之后,即可通过Psycopg 3进行数据开发操作。如下内容将指导您创建表、插入数据、查询和释放资源等操作。如果需要使用Fixed Plan能力实现更高性能的读写操作,需要配置相关GUC参数,请参见Fixed Plan加速SQL执行。
- 创建游标。 - 在进行数据开发之前,您需要执行命令 - cur = conn.cursor()来创建连接的游标。
- 数据开发。 - 创建表 - 您可以执行如下命令,创建一个表 - holo_test并定义表的数据类型为integer。您也可以根据业务需求定义表名称和数据类型。- cur.execute("CREATE TABLE holo_test (num integer);")
- 插入数据 - 您可以执行如下命令,为创建的表 - holo_test插入数据1~1000。- cur.execute("INSERT INTO holo_test SELECT generate_series(%s, %s)", (1, 1000))
- 查询数据 - cur.execute("SELECT sum(num) FROM holo_test;") cur.fetchone()
 
- 提交事务。 - 上述示例存在DDL、DML和DQL三种情况,您需要在每个SQL后执行命令 - conn.commit()提交事务,才能确保操作已经提交。建议您直接在- conn连接代码之后把autocommit参数设置为true,实现SQL命令的自动提交。示例代码如下:- 同步调用示例 - conn = psycopg.connect( host="<Endpoint>", port=<Port>, dbname="<databases>", user="<Access ID>", password="<Access Key>", keepalives=1, # 保持连接 keepalives_idle=130, # 空闲时,每130秒保持连接连通 keepalives_interval=10, # 没得到回应时,等待10秒重新尝试保持连通 keepalives_count=15, # 尝试最多15次重新保持连通 application_name="<Application Name>" ) conn.autocommit = "True"
- 异步调用示例 - async with await psycopg.AsyncConnection.connect( host="<Endpoint>", port=<Port>, dbname="<databases>", user="<Access ID>", password="<Access Key>", application_name="<Application Name>", autocommit = "True" ) as aconn: async with aconn.cursor() as acur: await acur.execute( "INSERT INTO test (num, data) VALUES (%s, %s)", (100, "abc'def")) await acur.execute("SELECT * FROM test") await acur.fetchone() # will return (1, 100, "abc'def") async for record in acur: print(record)
 
- 释放资源。 - 为避免影响后续的操作,当操作执行完成后,您需要执行如下命令关闭游标并断开数据库连接。 - cur.close() conn.close()
Pandas DataFrame快速写入Hologres最佳实践
使用Python时,经常会使用Pandas将数据转换为DataFrame,并对DataFrame进行处理,最终将DataFrame导入Hologres,此时希望将DataFrame快速导入Hologres。
# pip install Pandas==1.5.1推荐使用COPY模式进行写入,Python示例代码如下。
import psycopg
import pandas as pd
# 连接Hologres
conn = psycopg.connect(
    host="hgpostcn-cn-xxxxx-cn-hangzhou.hologres.aliyuncs.com",
    port=80,
    dbname="db",
    user="xxx",
    password="xxx",
    application_name="psycopg3"
)
cur = conn.cursor()
# 删除多余的表
cur.execute("""
            DROP TABLE IF EXISTS df_data;
            """)
conn.commit()
# 创建测试表,用于写入数据
cur.execute("""
            CREATE TABLE IF NOT EXISTS df_data(
                col1 int,
                col2 int,
                col3 int,
                primary key(col1)
            );
            """)
conn.commit()
# 构建dataframe
data = [('1','1','1'),('2','2','2')]
cols = ('col1','col2','col3')
pd_data = pd.DataFrame(data, columns=cols)
# 一批一批写
# 在这里用StringIO将DataFrame转换成CSV格式字符串
from io import StringIO
# 创建一个缓冲区
buffer = StringIO()
        
# 将DataFrame写入缓冲区CSV格式
pd_data.to_csv(buffer, index=False, header=False)
        
# 将缓冲区位置重置到开始
buffer.seek(0)
with cur.copy("COPY df_data(col1,col2,col3) FROM STDIN WITH (STREAM_MODE TRUE,ON_CONFLICT UPDATE,FORMAT CSV);") as copy:
    while data := buffer.read(1024):
        copy.write(data)
conn.commit()
# 查看数据
cur.execute("SELECT * FROM df_data")
cur.fetchone()
cur.commit()查看历史查询,验证已经使用COPY方式写入数据至Hologres。