Python

更新时间: 2025-06-06 17:30:07

Psycopg是Python编程语言新设计的PostgreSQL数据库适配器。由于Hologres兼容PostgreSQL 11,因此您可以通过Psycopg访问Hologres。本文将指导您使用Psycopg 3访问Hologres。

前提条件

已安装3.7及以上版本的Python环境。

安装Psycopg 3

执行如下命令安装Psycopg 3

pip  install   --upgrade pip             # 升级 pip 到 20.3 以上版本
pip install "psycopg[binary]"

连接Hologres

Psycopg 3安装完成之后,您可以执行如下操作并连接Hologres。

  1. 加载Psycopg 3。

    您可以执行以下命令,加载安装的Psycopg 3。

    import psycopg
  2. 创建数据库连接。

    您可以通过psycopg.connect()函数连接Hologres,具体语法和参数说明如下所示。

    conn = psycopg.connect(
        host="<Endpoint>",
        port=<Port>, 
        dbname="<databases>", 
        user="<Access ID>", 
        password="<Access Key>",
        keepalives=<keepalives>, 
        keepalives_idle=<keepalives_idle>,
        keepalives_interval=<keepalives_interval>, 
        keepalives_count=<keepalives_count>
    )
    

    参数

    描述

    Endpoint

    Hologres实例的网络地址和端口。

    进入Hologres管理控制台,选择左侧导航栏实例列表,单击目标实例,在实例详情网络信息中获取网络地址和端口。

    重要

    请根据代码运行所在网络环境选择正确的网络地址和端口,否则将无法正常连接。

    Port

    databases

    Hologres创建的数据库名称。

    Access ID

    当前阿里云账号的AccessKey ID。

    您可以单击AccessKey 管理,获取AccessKey ID。

    Access Key

    当前阿里云账号的AccessKey Secret。

    keepalives

    可选(推荐使用),连接方式:

    • 1表示使用长连接。

    • 0表示非长连接。

    keepalives_idle

    空闲时,保持连接连通的时间间隔,单位秒(s)。

    keepalives_interval

    没得到回应时,等待重新尝试保持连通的时间间隔,单位秒(s)。

    keepalives_count

    尝试重新保持连通最大次数。

    代码示例如下。

    conn = psycopg.connect(
        host="<Endpoint>",
        port=<Port>, 
        dbname="<databases>", 
        user="<Access ID>", 
        password="<Access Key>",
        keepalives=1, # 保持连接
        keepalives_idle=130, # 空闲时,每130秒保持连接连通
        keepalives_interval=10, # 没得到回应时,等待10秒重新尝试保持连通
        keepalives_count=15, # 尝试最多15次重新保持连通
        application_name="<Application Name>"
    )
    说明

    配置Application Name参数可以帮助您在历史慢Query列表中快速查看发起请求的应用。

使用Hologres

当您成功连接Hologres数据库之后,即可通过Psycopg 3进行数据开发操作。如下内容将指导您创建表、插入数据、查询和释放资源等操作。如果需要使用Fixed Plan能力实现更高性能的读写操作,需要配置相关GUC参数,请参见Fixed Plan加速SQL执行

  1. 创建游标。

    在进行数据开发之前,您需要执行命令cur = conn.cursor()来创建连接的游标。

  2. 数据开发。

    1. 创建表

      您可以执行如下命令,创建一个表holo_test并定义表的数据类型为integer。您也可以根据业务需求定义表名称和数据类型。

      cur.execute("CREATE TABLE holo_test (num integer);")
    2. 插入数据

      您可以执行如下命令,为创建的表holo_test插入数据1~1000。

      cur.execute("INSERT INTO holo_test SELECT generate_series(%s, %s)", (1, 1000))
    3. 查询数据

      cur.execute("SELECT sum(num) FROM holo_test;")
      cur.fetchone()
  3. 提交事务。

    上述示例存在DDL、DML和DQL三种情况,您需要在每个SQL后执行命令conn.commit()提交事务,才能确保操作已经提交。建议您直接在conn连接代码之后把autocommit参数设置为true,实现SQL命令的自动提交。示例代码如下:

    • 同步调用示例

      conn = psycopg.connect(
          host="<Endpoint>",
          port=<Port>, 
          dbname="<databases>", 
          user="<Access ID>", 
          password="<Access Key>",
          keepalives=1, # 保持连接
          keepalives_idle=130, # 空闲时,每130秒保持连接连通
          keepalives_interval=10, # 没得到回应时,等待10秒重新尝试保持连通
          keepalives_count=15, # 尝试最多15次重新保持连通
          application_name="<Application Name>"
      )
      conn.autocommit = "True"
    • 异步调用示例

      async with await psycopg.AsyncConnection.connect(
          host="<Endpoint>",
          port=<Port>, 
          dbname="<databases>", 
          user="<Access ID>", 
          password="<Access Key>",
          application_name="<Application Name>",
          autocommit = "True"
          ) as aconn:
          async with aconn.cursor() as acur:
              await acur.execute(
                  "INSERT INTO test (num, data) VALUES (%s, %s)",
                  (100, "abc'def"))
              await acur.execute("SELECT * FROM test")
              await acur.fetchone()
              # will return (1, 100, "abc'def")
              async for record in acur:
                  print(record)
  4. 释放资源。

    为避免影响后续的操作,当操作执行完成后,您需要执行如下命令关闭游标并断开数据库连接。

    cur.close()
    conn.close()

Pandas DataFrame快速写入Hologres最佳实践

使用Python时,经常会使用Pandas将数据转换为DataFrame,并对DataFrame进行处理,最终将DataFrame导入Hologres,此时希望将DataFrame快速导入Hologres。

# pip install Pandas==1.5.1

推荐使用COPY模式进行写入,Python示例代码如下。

import psycopg
import pandas as pd

# 连接Hologres
conn = psycopg.connect(
    host="hgpostcn-cn-xxxxx-cn-hangzhou.hologres.aliyuncs.com",
    port=80,
    dbname="db",
    user="xxx",
    password="xxx",
    application_name="psycopg3"
)

cur = conn.cursor()

# 删除多余的表
cur.execute("""
            DROP TABLE IF EXISTS df_data;
            """)
conn.commit()

# 创建测试表,用于写入数据
cur.execute("""
            CREATE TABLE IF NOT EXISTS df_data(
                col1 int,
                col2 int,
                col3 int,
                primary key(col1)
            );
            """)
conn.commit()

# 构建dataframe
data = [('1','1','1'),('2','2','2')]
cols = ('col1','col2','col3')
pd_data = pd.DataFrame(data, columns=cols)




# 一批一批写
# 在这里用StringIO将DataFrame转换成CSV格式字符串
from io import StringIO

# 创建一个缓冲区
buffer = StringIO()
        
# 将DataFrame写入缓冲区CSV格式
pd_data.to_csv(buffer, index=False, header=False)
        
# 将缓冲区位置重置到开始
buffer.seek(0)

with cur.copy("COPY df_data(col1,col2,col3) FROM STDIN WITH (STREAM_MODE TRUE,ON_CONFLICT UPDATE,FORMAT CSV);") as copy:
    while data := buffer.read(1024):
        copy.write(data)
conn.commit()

# 查看数据
cur.execute("SELECT * FROM df_data")
cur.fetchone()
cur.commit()

查看历史查询,验证已经使用COPY方式写入数据至Hologres。历史慢Query

上一篇: JDBC 下一篇: 向量计算SDK
阿里云首页 实时数仓 Hologres 相关技术圈