文档

基于Python的应用开发

更新时间:

本文介绍如何在Python中通过PyMySQL连接Lindorm宽表引擎。

前提条件

  • 已安装Python环境,且Python为3.7及以上版本。

  • 已开通MySQL协议兼容功能。如何开通,请参见开通MySQL协议兼容功能

  • 已将客户端IP添加至白名单,具体操作请参见设置白名单

操作步骤

  1. 安装PyMySQL。您也可以通过pip install PyMySQL语句,使用pip进行安装。

  2. 创建连接并配置连接参数。

    connection = pymysql.connect(host='ld-uf6k8yqb741t3****-proxy-sql-lindorm.lindorm.rds.aliyuncs.com', port=33060, user='user', passwd='test', db='default')
    with connection.cursor() as statement:

    参数说明

    参数

    说明

    host

    Lindorm宽表引擎的MySQL兼容地址。如何获取,请参见查看连接地址

    重要
    • 如果应用部署在ECS实例,建议您通过专有网络访问Lindorm实例,可获得更高的安全性和更低的网络延迟。

    • 如果应用部署在本地,在通过公网连接Lindorm实例前,需在控制台开通公网地址。开通方式:在控制台选择数据库连接 > 宽表引擎,在宽表引擎页签单击开通公网地址

    • 通过专有网络访问Lindorm实例,host请填写MySQL兼容地址对应的专有网络地址。通过公网访问Lindorm实例,host请填写MySQL兼容地址对应的公网地址。

    port

    Lindorm宽表引擎MySQL协议的端口,固定为33060。

    user

    如果您忘记用户密码,可以通过Lindorm宽表引擎的集群管理系统修改密码。具体操作,请参见修改用户密码

    passwd

    db

    需要连接的数据库名称。默认连接default数据库。

  3. 通过宽表SQL语法使用Lindorm宽表引擎。以创建表为例。

    sql_create_table = "create table if not exists test_python(c1 integer, c2 integer, primary key(c1))"
    print(sql_create_table)
    statement.execute(sql_create_table)

完整示例

完整示例代码如下:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import pymysql
# 打开数据库连接。
# host为Lindorm宽表引擎MySQL兼容协议的连接地址。
# port为Lindorm宽表引擎MYSQL兼容协议的端口33060。
# user为Lindorm宽表引擎的用户名。
# passwd为Lindorm宽表引擎的密码。
# db是需要连接的数据库名。
connection = pymysql.connect(host='ld-uf6k8yqb741t3****-proxy-sql-lindorm.lindorm.rds.aliyuncs.com', port=33060, user='user', passwd='test', db='default')
with connection.cursor() as statement:
    # 创建表
    sql_create_table = "create table if not exists test_python(c1 integer, c2 integer, primary key(c1))"
    print(sql_create_table)
    statement.execute(sql_create_table)

    # 插入一行数据
    sql_upsert = "upsert into test_python(c1, c2) values(1,1)"
    print(sql_upsert)
    statement.execute(sql_upsert)

    # 插入多行数据
    with connection.cursor() as stat:
        sql_upsert = "upsert into test_python(c1, c2) values(%s,%s)"
        print(sql_upsert)
        stat.executemany(sql_upsert, [(2, 2), (3, 3)])

    # 删除数据
    sql_delete = "delete from test_python where c1=2"
    print(sql_delete)
    statement.execute(sql_delete)

    # 修改数据
    sql_update = "upsert into test_python(c1, c2) values(1,10)"
    print(sql_update)
    statement.execute(sql_update)

    # 查询
    sql_select = "select * from test_python"
    print(sql_select)
    statement.execute(sql_select)
    rows = statement.fetchall()
    print(rows)


# 关闭连接
connection.close()

执行成功后将返回如下结果:

create table if not exists test_python(c1 integer, c2 integer, primary key(c1))
upsert into test_python(c1, c2) values(1,1)
upsert into test_python(c1, c2) values(%s,%s)
delete from test_python where c1=2
upsert into test_python(c1, c2) values(1,10)
select * from test_python
((1, 10), (3, 3))
  • 本页导读 (1)
文档反馈