Python
Psycopg是Python编程语言新设计的PostgreSQL数据库适配器。由于Hologres兼容PostgreSQL 11,因此您可以通过Psycopg访问Hologres。本文将指导您使用Psycopg 3访问Hologres。
前提条件
已安装3.7及以上版本的Python环境。
安装Psycopg 3
执行如下命令安装Psycopg 3。
pip install --upgrade pip # 升级 pip 到 20.3 以上版本
pip install "psycopg[binary]"
连接Hologres
Psycopg 3安装完成之后,您可以执行如下操作并连接Hologres。
加载Psycopg 3。
您可以执行以下命令,加载安装的Psycopg 3。
import psycopg
创建数据库连接。
您可以通过
psycopg.connect()
函数连接Hologres,具体语法和参数说明如下所示。conn = psycopg.connect( host="<Endpoint>", port=<Port>, dbname="<databases>", user="<Access ID>", password="<Access Key>", keepalives=<keepalives>, keepalives_idle=<keepalives_idle>, keepalives_interval=<keepalives_interval>, keepalives_count=<keepalives_count> )
参数
描述
Endpoint
Hologres实例的网络地址和端口。
进入Hologres管理控制台,选择左侧导航栏实例列表,单击目标实例,在实例详情页网络信息中获取网络地址和端口。
重要请根据代码运行所在网络环境选择正确的网络地址和端口,否则将无法正常连接。
Port
databases
Hologres创建的数据库名称。
Access ID
当前阿里云账号的AccessKey ID。
您可以单击AccessKey 管理,获取AccessKey ID。
Access Key
当前阿里云账号的AccessKey Secret。
keepalives
可选(推荐使用),连接方式:
1表示使用长连接。
0表示非长连接。
keepalives_idle
空闲时,保持连接连通的时间间隔,单位秒(s)。
keepalives_interval
没得到回应时,等待重新尝试保持连通的时间间隔,单位秒(s)。
keepalives_count
尝试重新保持连通最大次数。
代码示例如下。
conn = psycopg.connect( host="<Endpoint>", port=<Port>, dbname="<databases>", user="<Access ID>", password="<Access Key>", keepalives=1, # 保持连接 keepalives_idle=130, # 空闲时,每130秒保持连接连通 keepalives_interval=10, # 没得到回应时,等待10秒重新尝试保持连通 keepalives_count=15, # 尝试最多15次重新保持连通 application_name="<Application Name>" )
说明配置Application Name参数可以帮助您在历史慢Query列表中快速查看发起请求的应用。
使用Hologres
当您成功连接Hologres数据库之后,即可通过Psycopg 3进行数据开发操作。如下内容将指导您创建表、插入数据、查询和释放资源等操作。如果需要使用Fixed Plan能力实现更高性能的读写操作,需要配置相关GUC参数,请参见Fixed Plan加速SQL执行。
创建游标。
在进行数据开发之前,您需要执行命令
cur = conn.cursor()
来创建连接的游标。数据开发。
创建表
您可以执行如下命令,创建一个表
holo_test
并定义表的数据类型为integer。您也可以根据业务需求定义表名称和数据类型。cur.execute("CREATE TABLE holo_test (num integer);")
插入数据
您可以执行如下命令,为创建的表
holo_test
插入数据1~1000。cur.execute("INSERT INTO holo_test SELECT generate_series(%s, %s)", (1, 1000))
查询数据
cur.execute("SELECT sum(num) FROM holo_test;") cur.fetchone()
提交事务。
上述示例存在DDL、DML和DQL三种情况,您需要在每个SQL后执行命令
conn.commit()
提交事务,才能确保操作已经提交。建议您直接在conn
连接代码之后把autocommit参数设置为true,实现SQL命令的自动提交。示例代码如下:同步调用示例
conn = psycopg.connect( host="<Endpoint>", port=<Port>, dbname="<databases>", user="<Access ID>", password="<Access Key>", keepalives=1, # 保持连接 keepalives_idle=130, # 空闲时,每130秒保持连接连通 keepalives_interval=10, # 没得到回应时,等待10秒重新尝试保持连通 keepalives_count=15, # 尝试最多15次重新保持连通 application_name="<Application Name>" ) conn.autocommit = "True"
异步调用示例
async with await psycopg.AsyncConnection.connect( host="<Endpoint>", port=<Port>, dbname="<databases>", user="<Access ID>", password="<Access Key>", application_name="<Application Name>", autocommit = "True" ) as aconn: async with aconn.cursor() as acur: await acur.execute( "INSERT INTO test (num, data) VALUES (%s, %s)", (100, "abc'def")) await acur.execute("SELECT * FROM test") await acur.fetchone() # will return (1, 100, "abc'def") async for record in acur: print(record)
释放资源。
为避免影响后续的操作,当操作执行完成后,您需要执行如下命令关闭游标并断开数据库连接。
cur.close() conn.close()
Pandas DataFrame快速写入Hologres最佳实践
使用Python时,经常会使用Pandas将数据转换为DataFrame,并对DataFrame进行处理,最终将DataFrame导入Hologres,此时希望将DataFrame快速导入Hologres。
# pip install Pandas==1.5.1
推荐使用COPY模式进行写入,Python示例代码如下。
import psycopg
import pandas as pd
# 连接Hologres
conn = psycopg.connect(
host="hgpostcn-cn-xxxxx-cn-hangzhou.hologres.aliyuncs.com",
port=80,
dbname="db",
user="xxx",
password="xxx",
application_name="psycopg3"
)
cur = conn.cursor()
# 删除多余的表
cur.execute("""
DROP TABLE IF EXISTS df_data;
""")
conn.commit()
# 创建测试表,用于写入数据
cur.execute("""
CREATE TABLE IF NOT EXISTS df_data(
col1 int,
col2 int,
col3 int,
primary key(col1)
);
""")
conn.commit()
# 构建dataframe
data = [('1','1','1'),('2','2','2')]
cols = ('col1','col2','col3')
pd_data = pd.DataFrame(data, columns=cols)
# 一批一批写
# 在这里用StringIO将DataFrame转换成CSV格式字符串
from io import StringIO
# 创建一个缓冲区
buffer = StringIO()
# 将DataFrame写入缓冲区CSV格式
pd_data.to_csv(buffer, index=False, header=False)
# 将缓冲区位置重置到开始
buffer.seek(0)
with cur.copy("COPY df_data(col1,col2,col3) FROM STDIN WITH (STREAM_MODE TRUE,ON_CONFLICT UPDATE,FORMAT CSV);") as copy:
while data := buffer.read(1024):
copy.write(data)
conn.commit()
# 查看数据
cur.execute("SELECT * FROM df_data")
cur.fetchone()
cur.commit()
查看历史查询,验证已经使用COPY方式写入数据至Hologres。