本指南为您提供了从Amazon DynamoDB迁移至PolarDB PostgreSQL版详尽的操作步骤和最佳实践。PolarDB提供了一套专用的迁移工具,通过全量同步与增量同步相结合的方式,帮助您实现平滑、低停机时间的数据迁移。
迁移流程概述
迁移过程主要分为四个阶段,由nimo-shake
(数据同步)和nimo-full-check
(数据校验)两个核心工具协同完成。
全量同步(Full Synchronization)
工具:
nimo-shake
过程:工具首先自动在目标PolarDB集群中创建与源端一致的表结构。随后,通过并发
Scan
操作高效读取源端数据库的全量数据,并使用BatchWriteItem
批量写入目标集群。
增量同步(Incremental Synchronization)
工具:
nimo-shake
过程:全量同步完成后,工具会自动利用AWS DynamoDB Streams机制,实时捕获源端自迁移启动以来的所有数据变更(增、删、改),并将其同步到目标集群,确保数据最终一致。该过程支持断点续传。
一致性校验(Consistency Validation)
工具:
nimo-full-check
过程:在数据同步期间或之后,可随时运行此工具。它会并发地从源端和目标端读取数据,按主键进行比对,并生成详细的差异报告,以验证数据完整性。
业务割接(Business Cutover)
过程:当增量数据延迟极低且一致性校验无差异后,短暂停止业务写入,待所有数据同步完毕,即可将应用连接切换至PolarDB集群,完成迁移。
注意事项
性能影响:数据迁移过程,尤其是全量同步阶段,会对源数据库和目标数据库产生一定的读写负载。建议您在业务低峰期执行迁移,并提前评估数据库的承载能力。
安全配置:在业务割接前,建议对目标PolarDB集群的写入权限进行管控,仅允许数据同步工具的账号写入,防止意外数据污染。
前提条件
在开始迁移前,请确保您已完成以下准备工作:
获取迁移工具包:NimoShake.tar.gz。其中包含
nimo-shake
和nimo-full-check
两种迁移工具包。为已有集群或新集群开启兼容DynamoDB能力,并获取DynamoDB访问地址和创建DynamoDB专用账号用于API访问的身份凭证(AccessKey)。
获取源端AWS DynamoDB的访问凭证(AccessKey ID和Secret Access Key)。
为需要迁移的源端DynamoDB表开启DynamoDB Streams功能。
迁移实施步骤
步骤一:配置并启动数据同步
解压
nimo-shake
工具包,编辑配置文件conf/nimo-shake.conf
。以下是核心配置项说明:参数
说明
示例值
sync_mode
同步模式。
all
表示全量+增量,full
表示仅全量。all
source.access_key_id
源端AWS DynamoDB的AccessKey ID。
AKIAIOSFODNN7...
source.secret_access_key
源端AWS DynamoDB的Secret Access Key。
wJalrXUtnFEMI...
source.region
源端AWS DynamoDB所在的区域。
cn-north-1
source.endpoint_url
源端连接地址。对于AWS DynamoDB,此项留空。
-
target.address
目标PolarDB的DynamoDB访问地址(含http://与端口)。
http://pe-xxx.rwlb.rds...
target.access_key_id
目标PolarDB的DynamoDB账号AccessKey。
your-polardb-access-key
target.secret_access_key
目标PolarDB的DynamoDB账号SecretKey。
your-polardb-secret-key
filter.collection.white
表过滤白名单,表示仅同步指定表。多个表之间用
;
相隔。说明不可与黑名单同时使用,同时指定时,代表全部同步。
c1;c2
filter.collection.black
表过滤黑名单,表示同步时,过滤指定表不进行同步。多个表之间用
;
相隔。说明不可与白名单同时使用,同时指定时,代表全部同步。
c1;c2
根据您的操作系统架构,选择对应的二进制文件启动同步任务。
说明建议您在后台运行,避免因终端服务断开而导致同步任务中断。
# 以Linux x86-64架构为例 ./bin/nimo-shake.linux.amd64 -conf=./conf/nimo-shake.conf
程序将首先进行全量同步,完成后自动转入增量同步阶段,并持续运行。
步骤二:执行数据一致性校验
编辑其配置文件
conf/nimo-full-check.conf
。配置项与nimo-shake
基本一致,需分别填写源端和目标端的连接信息。启动校验任务。
# 以Linux x86-64架构为例 ./bin/nimo-shake.linux.amd64 -conf=./conf/nimo-shake.conf
校验工具会在终端实时输出进度,并将详细日志和数据差异报告分别存放在
logs/
和nimo-full-check-diff/
目录中。
步骤三:执行业务割接
当您准备好进行最终的业务切换时,请遵循以下严谨的步骤:
最终校验:在计划的停机窗口前,反复运行一致性校验工具,直至确认增量同步延迟极低,且数据差异数量降至0或可接受的范围内。
停止源端写入:在停机窗口开始时,暂停所有向源端AWS DynamoDB写入数据的业务应用。
等待同步完成:观察
nimo-shake
的日志,确认已无新的增量数据需要同步。最后一次校验:再次运行
nimo-full-check
工具,确保源端和目标端的数据完全一致。停止同步工具:在确认数据完全同步后,停止
nimo-shake
进程。切换应用连接:将业务应用的数据库连接配置,从AWS DynamoDB的地址切换为PolarDB的DynamoDB访问地址。
启动业务:重启业务应用。至此,割接完成。
附录:为测试环境模拟实时业务流量
如果您希望在测试环境中模拟一个真实的、持续有数据写入的迁移场景,可以使用以下Go语言示例代码。该代码会向源端DynamoDB表周期性地写入和更新数据。
此步骤仅用于测试和验证迁移流程,在实际生产迁移中无需执行。
package main
import (
"context"
"fmt"
"log"
"math/rand"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)
// --- Configuration for your source AWS DynamoDB ---
var (
region = "cn-north-1" // AWS DynamoDB region
accessKey = "your-aws-access-key" // AWS DynamoDB access key
secretKey = "your-aws-secret-key" // AWS DynamoDB secret key
)
// --- Helper function to create a DynamoDB client ---
func createClient() (*dynamodb.Client, context.Context) {
ctx := context.Background()
sdkConfig, err := config.LoadDefaultConfig(ctx, config.WithRegion(region))
if err != nil {
log.Fatalf("Failed to load AWS config: %v", err)
}
client := dynamodb.NewFromConfig(sdkConfig, func(o *dynamodb.Options) {
o.Credentials = credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")
})
return client, ctx
}
// --- Function to create a table and populate it with initial data ---
func initializeData(client *dynamodb.Client, ctx context.Context) {
tableName := "src1" // Example table name
// Create table if not exists
_, err := client.CreateTable(ctx, &dynamodb.CreateTableInput{
TableName: &tableName,
AttributeDefinitions: []types.AttributeDefinition{
{AttributeName: aws.String("pk"), AttributeType: types.ScalarAttributeTypeS},
},
KeySchema: []types.KeySchemaElement{
{AttributeName: aws.String("pk"), KeyType: types.KeyTypeHash},
},
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(100),
WriteCapacityUnits: aws.Int64(100),
},
})
if err != nil {
// Ignore if table already exists, fail on other errors
if _, ok := err.(*types.ResourceInUseException); !ok {
log.Fatalf("CreateTable failed for %s: %v", tableName, err)
}
}
fmt.Printf("Waiting for table '%s' to become active...\n", tableName)
waiter := dynamodb.NewTableExistsWaiter(client)
err = waiter.Wait(ctx, &dynamodb.DescribeTableInput{TableName: &tableName}, 5*time.Minute)
if err != nil {
log.Fatalf("Waiter failed for table %s: %v", tableName, err)
}
// Insert 100 sample items
for i := 0; i < 100; i++ {
pk := fmt.Sprintf("%s_user_%03d", tableName, i)
item := map[string]types.AttributeValue{
"pk": &types.AttributeValueMemberS{Value: pk},
"val": &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", i)},
}
client.PutItem(ctx, &dynamodb.PutItemInput{TableName: &tableName, Item: item})
}
fmt.Printf("Inserted 100 initial items into '%s'.\n", tableName)
}
// --- Function to simulate continuous business traffic ---
func simulateTraffic(client *dynamodb.Client, ctx context.Context) {
tableName := "src1"
fmt.Println("Starting periodic updates to simulate traffic. Press Ctrl+C to stop.")
i := 0
for {
pk := fmt.Sprintf("%s_user_%03d", tableName, i%100)
newValue := fmt.Sprintf("%d", rand.Intn(1000))
_, err := client.UpdateItem(ctx, &dynamodb.UpdateItemInput{
TableName: &tableName,
Key: map[string]types.AttributeValue{
"pk": &types.AttributeValueMemberS{Value: pk},
},
ExpressionAttributeValues: map[string]types.AttributeValue{
":newval": &types.AttributeValueMemberN{Value: newValue},
},
UpdateExpression: aws.String("SET val = :newval"),
})
if err != nil {
fmt.Printf("Update error: %v\n", err)
} else {
fmt.Printf("Updated pk=%s with new val=%s\n", pk, newValue)
}
i++
time.Sleep(1 * time.Second) // Update one record per second
}
}
func main() {
client, ctx := createClient()
initializeData(client, ctx)
simulateTraffic(client, ctx)
}