本指南为您提供了从Amazon DynamoDB迁移至PolarDB PostgreSQL版详尽的操作步骤和最佳实践。PolarDB提供了一套专用的迁移工具,通过全量同步与增量同步相结合的方式,帮助您实现平滑、低停机时间的数据迁移。
迁移流程概述
迁移过程主要分为五个阶段,由nimo-shake(数据同步,包括全量同步与增量同步)、nimo-full-check(数据校验)和PolarDBBackSync(数据反向同步)三个核心工具协同完成。
全量同步(Full Synchronization)
工具:
nimo-shake过程:工具首先自动在目标PolarDB集群中创建与源端一致的表结构。随后,通过并发
Scan操作高效读取源端数据库的全量数据,并使用BatchWriteItem批量写入目标集群。
增量同步(Incremental Synchronization)
工具:
nimo-shake过程:全量同步完成后,工具会自动利用AWS DynamoDB Streams机制,实时捕获源端自迁移启动以来的所有数据变更(增、删、改),并将其同步到目标集群,确保数据最终一致。该过程支持断点续传。
一致性校验(Consistency Validation)
工具:
nimo-full-check过程:在数据同步期间或之后,可随时运行此工具。它会并发地从源端和目标端读取数据,按主键进行比对,并生成详细的差异报告,以验证数据完整性。
(可选)反向同步(Reverse Synchronization)
工具:
PolarDBBackSync.jar(基于阿里云实时计算Flink版)过程:验证完数据一致性后,为确保业务回滚时数据的完整性,可以创建从PolarDB PostgreSQL版到源端DynamoDB的反向同步。该工具基于Flink实时捕获源端PolarDB的变更数据,并根据变更类型调用DynamoDB的PutItem或DeleteItem接口同步更新DynamoDB的数据。
业务割接(Business Cutover)
过程:当增量数据延迟极低且一致性校验无差异后,短暂停止业务写入,待所有数据同步完毕,即可将应用连接切换至PolarDB集群,完成迁移。
注意事项
性能影响:数据迁移过程,尤其是全量同步阶段,会对源数据库和目标数据库产生一定的读写负载。建议您在业务低峰期执行迁移,并提前评估数据库的承载能力。
安全配置:在业务割接前,建议对目标PolarDB集群的写入权限进行管控,仅允许数据同步工具的账号写入,防止意外数据污染。
准备工作
在开始迁移前,请确保您已完成以下准备工作:
获取工具包:
迁移工具包:NimoShake.tar.gz。其中包含
nimo-shake和nimo-full-check两种迁移工具包。(可选)反向迁移工具包:PolarDBBackSync.jar。
PolarDB集群:
为已有集群或新集群开启兼容DynamoDB能力,并获取DynamoDB访问地址和创建DynamoDB专用账号用于API访问的身份凭证(AccessKey)。
(可选)参数配置:若需配置反向同步,则需将PolarDB集群的
wal_level参数修改为logical。由于该参数的调整需重启集群,建议在整体迁移流程开始之前完成此项设置。
AWS DynamoDB:
获取AWS DynamoDB的访问凭证(AccessKey ID和Secret Access Key)。
为需要迁移的源端DynamoDB表开启DynamoDB Streams功能。
运行环境:准备一台ECS实例或其他能够与PolarDB集群及AWS DynamoDB连接的服务器,以便运行迁移工具包。
迁移实施步骤
步骤一:配置并启动数据同步
解压
nimo-shake工具包,进入NimoShake/nimo-shake目录,编辑配置文件conf/nimo-shake.conf。以下是核心配置项说明:参数
说明
示例值
sync_mode同步模式。
all表示全量+增量,full表示仅全量。allsource.access_key_id源端AWS DynamoDB的AccessKey ID。
AKIAIOSFODNN7...source.secret_access_key源端AWS DynamoDB的Secret Access Key。
wJalrXUtnFEMI...source.region源端AWS DynamoDB所在的区域。
cn-north-1source.endpoint_url源端连接地址。对于AWS DynamoDB,此项留空。
-
target.address目标PolarDB的DynamoDB访问地址(含http://与端口)。
http://pe-xxx.rwlb.rds...target.access_key_id目标PolarDB的DynamoDB账号AccessKey。
your-polardb-access-keytarget.secret_access_key目标PolarDB的DynamoDB账号SecretKey。
your-polardb-secret-keyfilter.collection.white表过滤白名单,表示仅同步指定表。多个表之间用
;相隔。说明不可与黑名单同时使用,同时指定时,代表全部同步。
c1;c2filter.collection.black表过滤黑名单,表示同步时,过滤指定表不进行同步。多个表之间用
;相隔。说明不可与白名单同时使用,同时指定时,代表全部同步。
c1;c2根据您的操作系统架构,选择对应的二进制文件启动同步任务。
说明建议您在后台
nohup运行,避免因终端服务断开而导致同步任务中断。# 返回nimo-shake目录 cd .. # 以Linux x86-64架构为例启动任务 nohup ./bin/nimo-shake.linux.amd64 -conf=./conf/nimo-shake.conf > nimo-shake.log 2>&1 &程序将首先进行全量同步,完成后自动转入增量同步阶段,并持续运行。
步骤二:执行数据一致性校验
进入
NimoShake/nimo-full-check目录,编辑其配置文件conf/nimo-full-check.conf。配置项与nimo-shake基本一致,需分别填写源端和目标端的连接信息。启动校验任务。
# 返回nimo-full-check目录 cd .. # 以Linux x86-64架构为例 nohup ./bin/nimo-full-check.linux.amd64 -conf=./conf/nimo-full-check.conf > nimo-full-check.log 2>&1 &校验工具会在终端实时输出进度,并将详细日志和数据差异报告分别存放在
logs/和nimo-full-check-diff/目录中。
步骤三:(可选)配置反向同步
在准备进行业务割接前,您可以预先配置从PolarDB PostgreSQL版到源端DynamoDB的反向同步链路。该链路在业务正式切换至PolarDB期间启动,用于实现数据的反向回流,为业务回滚提供保障。配置流程如下:
环境准备
确保PolarDB参数
wal_level的值为logical。创建高权限数据库账号并授权:
(可选)如果您尚未创建高权限账号,请前往PolarDB控制台,在集群的中创建高权限账号。
授权:在集群的中,找到
polardb_internal_dynamodb数据库,并修改其Owner为高权限账号。
创建逻辑复制槽和发布:使用高权限账号连接至
polardb_internal_dynamodb数据库,执行以下SQL命令以创建一个逻辑复制槽,并向DynamoDB账号授予复制权限,最终发布包含该数据库下所有表的订阅。您可以查看所创建逻辑复制槽的活跃状态。-- 创建逻辑复制槽,'flink_slot' 名称需与后续 Flink 配置保持一致 SELECT * FROM pg_create_logical_replication_slot('flink_slot', 'pgoutput'); -- 为之前创建的 DynamoDB 专用账号授予 REPLICATION 权限 -- 将 <your_dynamodb_user> 替换为您的 DynamoDB 专用账号名 ALTER ROLE <your_dynamodb_user> REPLICATION; -- 为所有表创建发布,以便逻辑复制槽可以捕获变更 CREATE PUBLICATION dbz_publication FOR ALL TABLES; -- 检查复制槽状态,此时 flink_slot 的 active 应为 f (false) SELECT * FROM pg_replication_slots;开通并配置Flink:
开通实时计算Flink版,并创建一个Flink工作空间。
重要Flink工作空间需与PolarDB集群位于同一个VPC下。
为Flink工作空间配置公网访问,使其能够连接AWS DynamoDB。
配置PolarDB集群的IP白名单:
在Flink控制台,单击工作空间的详情按钮,在工作空间详情页面获取其网段信息。

前往PolarDB控制台,在集群的中新增IP白名单分组,将Flink的网段信息添加进去。
验证PolarDB集群与Flink工作空间连通性:
在Flink控制台,进入工作空间,单击右上角的网络探测图标。

填写PolarDB集群主节点的私有地址与端口,单击探测。
弹窗提示网络探测成功连通,即集群白名单配置正确。
部署Flink作业
下载反向同步工具:PolarDBBackSync.jar。
准备配置文件:创建名为
application.yaml的配置文件,内容如下:source: # PolarDB主节点的私网地址 hostname: pc-xxx.pg.polardb.rds.aliyuncs.com # PolarDB主节点的私网端口 port: 5432 # 之前创建的逻辑复制槽的名称 slotName: flink_slot target: # 目标 AWS DynamoDB 的 region region: cn-north-1 # (可选) 表过滤配置,whiteTableSet 和 blackTableSet 只能声明一个 filter: # whiteTableSet: 需要反向回流的表 # blackTableSet: 不需要反向回流的表 whiteTableSet: blackTableSet: # Flink 作业的检查点(checkpoint)间隔,单位毫秒 checkpoint: interval: 3000上传文件:进入Flink控制台,找到并进入目标工作空间,在文件管理页面上传
PolarDBBackSync.jar和application.yaml。安全存储凭证:为避免明文暴露密钥,建议使用Flink的变量管理功能存储敏感信息。在变量管理页面新增以下四个变量:
变量名称
变量值
polardbusernamePolarDB的DynamoDB账号。
polardbpasswordPolarDB的DynamoDB账号密码。
dynamodbakAWS DynamoDB的AccessKey。
dynamodbskAWS DynamoDB的SecretKey。
部署并启动作业:
进入作业运维页面,选择。
填写以下主要参数,其他参数可根据业务环境进行配置。然后单击部署。
参数名称
填写参考
部署模式
固定为流模式。
部署名称
填写作业部署名称,此处以PolarDBBackSync为例。
引擎版本
固定为vvr-11.3-jdk11-flink-1.20。
JAR URI
选择已上传的
PolarDBBackSync.jar。Entry Point Class
固定为
org.example.PolarDBCdcJob。Entry Point Main Arguments
固定为:
--polardbusername ${secret_values.polardbusername}--polardbpassword ${secret_values.polardbpassword}--dynamodbak ${secret_values.dynamodbak}--dynamodbsk ${secret_values.dynamodbsk}附加依赖文件
选择已上传的
application.yaml部署成功后,单击。
验证与清理
验证:在作业启动后,使用高权限账号连接至PolarDB集群的
polardb_internal_dynamodb数据库,并执行SELECT * FROM pg_replication_slots;。若复制槽flink_slot的active字段变为t(true),则表示Flink作业已成功连接。此时即可在PolarDB集群内开始导入业务流量。清理:当不再需要进行反向同步时,您可执行以下步骤以释放相关资源节省费用。
实时计算Flink版:
停止作业:前往Flink控制台,在目标工作空间中,进入作业运维页面,找到目标作业并单击停止
释放实例:返回Flink控制台,找到目标工作空间,单击释放资源。
PolarDB集群:使用高权限账号连接至
polardb_internal_dynamodb数据库,执行以下命令删除逻辑复制槽。SELECT pg_drop_replication_slot('flink_slot');
步骤四:执行业务割接
增量同步延迟较低且数据一致性校验无差异后,可计划业务割接。当您准备好进行最终的业务切换时,请遵循以下严谨的步骤:
最终校验:在计划的停机窗口前,反复运行一致性校验工具,直至确认增量同步延迟极低,且数据差异数量降至0或可接受的范围内。
停止源端写入:在停机窗口开始时,暂停所有向源端AWS DynamoDB写入数据的业务应用。
等待同步完成:观察
nimo-shake的日志,确认已无新的增量数据需要同步。最后一次校验:再次运行
nimo-full-check工具,确保源端和目标端的数据完全一致。停止同步工具:在确认数据完全同步后,停止
nimo-shake进程。切换应用连接:停止业务应用,将业务应用的数据库连接配置,从AWS DynamoDB的地址切换为PolarDB的DynamoDB访问地址。
(可选)开启反向同步任务:在业务正式切换至PolarDB期间开启反向同步任务,基于阿里云实时计算Flink版实现数据的反向回流,为业务回滚提供保障。
启动业务:重启业务应用。至此,割接完成。
(可选)停止反向同步任务:在割接完成且业务稳定运行一段时间后,确认数据一致性满足业务需求后,即可安全停止反向同步任务(停止Flink作业与释放相关资源)。
附录:为测试环境模拟实时业务流量
如果您希望在测试环境中模拟一个真实的、持续有数据写入的迁移场景,可以使用以下Go语言示例代码。该代码会向源端DynamoDB表周期性地写入和更新数据。
此步骤仅用于测试和验证迁移流程,在实际生产迁移中无需执行。
package main
import (
"context"
"fmt"
"log"
"math/rand"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)
// --- Configuration for your source AWS DynamoDB ---
var (
region = "cn-north-1" // AWS DynamoDB region
accessKey = "your-aws-access-key" // AWS DynamoDB access key
secretKey = "your-aws-secret-key" // AWS DynamoDB secret key
)
// --- Helper function to create a DynamoDB client ---
func createClient() (*dynamodb.Client, context.Context) {
ctx := context.Background()
sdkConfig, err := config.LoadDefaultConfig(ctx, config.WithRegion(region))
if err != nil {
log.Fatalf("Failed to load AWS config: %v", err)
}
client := dynamodb.NewFromConfig(sdkConfig, func(o *dynamodb.Options) {
o.Credentials = credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")
})
return client, ctx
}
// --- Function to create a table and populate it with initial data ---
func initializeData(client *dynamodb.Client, ctx context.Context) {
tableName := "src1" // Example table name
// Create table if not exists
_, err := client.CreateTable(ctx, &dynamodb.CreateTableInput{
TableName: &tableName,
AttributeDefinitions: []types.AttributeDefinition{
{AttributeName: aws.String("pk"), AttributeType: types.ScalarAttributeTypeS},
},
KeySchema: []types.KeySchemaElement{
{AttributeName: aws.String("pk"), KeyType: types.KeyTypeHash},
},
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(100),
WriteCapacityUnits: aws.Int64(100),
},
})
if err != nil {
// Ignore if table already exists, fail on other errors
if _, ok := err.(*types.ResourceInUseException); !ok {
log.Fatalf("CreateTable failed for %s: %v", tableName, err)
}
}
fmt.Printf("Waiting for table '%s' to become active...\n", tableName)
waiter := dynamodb.NewTableExistsWaiter(client)
err = waiter.Wait(ctx, &dynamodb.DescribeTableInput{TableName: &tableName}, 5*time.Minute)
if err != nil {
log.Fatalf("Waiter failed for table %s: %v", tableName, err)
}
// Insert 100 sample items
for i := 0; i < 100; i++ {
pk := fmt.Sprintf("%s_user_%03d", tableName, i)
item := map[string]types.AttributeValue{
"pk": &types.AttributeValueMemberS{Value: pk},
"val": &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", i)},
}
client.PutItem(ctx, &dynamodb.PutItemInput{TableName: &tableName, Item: item})
}
fmt.Printf("Inserted 100 initial items into '%s'.\n", tableName)
}
// --- Function to simulate continuous business traffic ---
func simulateTraffic(client *dynamodb.Client, ctx context.Context) {
tableName := "src1"
fmt.Println("Starting periodic updates to simulate traffic. Press Ctrl+C to stop.")
i := 0
for {
pk := fmt.Sprintf("%s_user_%03d", tableName, i%100)
newValue := fmt.Sprintf("%d", rand.Intn(1000))
_, err := client.UpdateItem(ctx, &dynamodb.UpdateItemInput{
TableName: &tableName,
Key: map[string]types.AttributeValue{
"pk": &types.AttributeValueMemberS{Value: pk},
},
ExpressionAttributeValues: map[string]types.AttributeValue{
":newval": &types.AttributeValueMemberN{Value: newValue},
},
UpdateExpression: aws.String("SET val = :newval"),
})
if err != nil {
fmt.Printf("Update error: %v\n", err)
} else {
fmt.Printf("Updated pk=%s with new val=%s\n", pk, newValue)
}
i++
time.Sleep(1 * time.Second) // Update one record per second
}
}
func main() {
client, ctx := createClient()
initializeData(client, ctx)
simulateTraffic(client, ctx)
}