数据迁移指南

本指南为您提供了从Amazon DynamoDB迁移至PolarDB PostgreSQL详尽的操作步骤和最佳实践。PolarDB提供了一套专用的迁移工具,通过全量同步与增量同步相结合的方式,帮助您实现平滑、低停机时间的数据迁移。

迁移流程概述

迁移过程主要分为五个阶段,由nimo-shake(数据同步,包括全量同步与增量同步)、nimo-full-check(数据校验)和PolarDBBackSync(数据反向同步)三个核心工具协同完成。

image
  1. 全量同步(Full Synchronization)

    • 工具:nimo-shake

    • 过程:工具首先自动在目标PolarDB集群中创建与源端一致的表结构。随后,通过并发Scan操作高效读取源端数据库的全量数据,并使用BatchWriteItem批量写入目标集群。

  2. 增量同步(Incremental Synchronization)

    • 工具:nimo-shake

    • 过程:全量同步完成后,工具会自动利用AWS DynamoDB Streams机制,实时捕获源端自迁移启动以来的所有数据变更(增、删、改),并将其同步到目标集群,确保数据最终一致。该过程支持断点续传。

  3. 一致性校验(Consistency Validation)

    • 工具:nimo-full-check

    • 过程:在数据同步期间或之后,可随时运行此工具。它会并发地从源端和目标端读取数据,按主键进行比对,并生成详细的差异报告,以验证数据完整性。

  4. (可选)反向同步(Reverse Synchronization)

    • 工具:PolarDBBackSync.jar (基于阿里云实时计算Flink)

    • 过程:验证完数据一致性后,为确保业务回滚时数据的完整性,可以创建从PolarDB PostgreSQL到源端DynamoDB的反向同步。该工具基于Flink实时捕获源端PolarDB的变更数据,并根据变更类型调用DynamoDBPutItemDeleteItem接口同步更新DynamoDB的数据。

  5. 业务割接(Business Cutover)

    • 过程:当增量数据延迟极低且一致性校验无差异后,短暂停止业务写入,待所有数据同步完毕,即可将应用连接切换至PolarDB集群,完成迁移。

注意事项

  • 性能影响:数据迁移过程,尤其是全量同步阶段,会对源数据库和目标数据库产生一定的读写负载。建议您在业务低峰期执行迁移,并提前评估数据库的承载能力。

  • 安全配置:在业务割接前,建议对目标PolarDB集群的写入权限进行管控,仅允许数据同步工具的账号写入,防止意外数据污染。

准备工作

在开始迁移前,请确保您已完成以下准备工作:

  1. 获取工具包

    1. 迁移工具包:NimoShake.tar.gz。其中包含nimo-shakenimo-full-check两种迁移工具包。

    2. (可选)反向迁移工具包:PolarDBBackSync.jar

  2. PolarDB集群

    1. 为已有集群或新集群开启兼容DynamoDB能力,并获取DynamoDB访问地址创建DynamoDB专用账号用于API访问的身份凭证(AccessKey)。

    2. (可选)参数配置:若需配置反向同步,则需将PolarDB集群的wal_level参数修改为logical。由于该参数的调整需重启集群,建议在整体迁移流程开始之前完成此项设置。

  3. AWS DynamoDB

    1. 获取AWS DynamoDB的访问凭证(AccessKey IDSecret Access Key)。

    2. 为需要迁移的源端DynamoDB表开启DynamoDB Streams功能。

  4. 运行环境:准备一台ECS实例或其他能够与PolarDB集群及AWS DynamoDB连接的服务器,以便运行迁移工具包。

迁移实施步骤

步骤一:配置并启动数据同步

  1. 解压nimo-shake工具包,进入NimoShake/nimo-shake目录,编辑配置文件conf/nimo-shake.conf。以下是核心配置项说明:

    参数

    说明

    示例值

    sync_mode

    同步模式。all表示全量+增量,full表示仅全量。

    all

    source.access_key_id

    源端AWS DynamoDBAccessKey ID。

    AKIAIOSFODNN7...

    source.secret_access_key

    源端AWS DynamoDBSecret Access Key。

    wJalrXUtnFEMI...

    source.region

    源端AWS DynamoDB所在的区域。

    cn-north-1

    source.endpoint_url

    源端连接地址。对于AWS DynamoDB,此项留空。

    -

    target.address

    目标PolarDBDynamoDB访问地址(含http://与端口)。

    http://pe-xxx.rwlb.rds...

    target.access_key_id

    目标PolarDBDynamoDB账号AccessKey。

    your-polardb-access-key

    target.secret_access_key

    目标PolarDBDynamoDB账号SecretKey。

    your-polardb-secret-key

    filter.collection.white

    表过滤白名单,表示仅同步指定表。多个表之间用;相隔。

    说明

    不可与黑名单同时使用,同时指定时,代表全部同步。

    c1;c2

    filter.collection.black

    表过滤黑名单,表示同步时,过滤指定表不进行同步。多个表之间用;相隔。

    说明

    不可与白名单同时使用,同时指定时,代表全部同步。

    c1;c2

  2. 根据您的操作系统架构,选择对应的二进制文件启动同步任务。

    说明

    建议您在后台nohup运行,避免因终端服务断开而导致同步任务中断。

    # 返回nimo-shake目录
    cd ..
    # 以Linux x86-64架构为例启动任务
    nohup ./bin/nimo-shake.linux.amd64 -conf=./conf/nimo-shake.conf > nimo-shake.log 2>&1 &

    程序将首先进行全量同步,完成后自动转入增量同步阶段,并持续运行。

步骤二:执行数据一致性校验

  1. 进入NimoShake/nimo-full-check目录,编辑其配置文件conf/nimo-full-check.conf。配置项与nimo-shake基本一致,需分别填写源端和目标端的连接信息。

  2. 启动校验任务。

    # 返回nimo-full-check目录
    cd ..
    # 以Linux x86-64架构为例
    nohup ./bin/nimo-full-check.linux.amd64 -conf=./conf/nimo-full-check.conf > nimo-full-check.log 2>&1 &

    校验工具会在终端实时输出进度,并将详细日志和数据差异报告分别存放在logs/nimo-full-check-diff/目录中。 image.png

步骤三:(可选)配置反向同步

在准备进行业务割接前,您可以预先配置从PolarDB PostgreSQL到源端DynamoDB的反向同步链路。该链路在业务正式切换至PolarDB期间启动,用于实现数据的反向回流,为业务回滚提供保障。配置流程如下:

环境准备

  1. 确保PolarDB参数wal_level的值为logical

  2. 创建高权限数据库账号并授权

    1. (可选)如果您尚未创建高权限账号,请前往PolarDB控制台,在集群的配置与管理 > 账号管理中创建高权限账号。

    2. 授权:在集群的配置与管理 > 数据库管理中,找到polardb_internal_dynamodb数据库,并修改其Owner为高权限账号。

  3. 创建逻辑复制槽和发布:使用高权限账号连接至polardb_internal_dynamodb数据库,执行以下SQL命令以创建一个逻辑复制槽,并向DynamoDB账号授予复制权限,最终发布包含该数据库下所有表的订阅。您可以查看所创建逻辑复制槽的活跃状态。

    -- 创建逻辑复制槽,'flink_slot' 名称需与后续 Flink 配置保持一致
    SELECT * FROM pg_create_logical_replication_slot('flink_slot', 'pgoutput');
    
    -- 为之前创建的 DynamoDB 专用账号授予 REPLICATION 权限
    -- 将 <your_dynamodb_user> 替换为您的 DynamoDB 专用账号名
    ALTER ROLE <your_dynamodb_user> REPLICATION;
    
    -- 为所有表创建发布,以便逻辑复制槽可以捕获变更
    CREATE PUBLICATION dbz_publication FOR ALL TABLES;
    
    -- 检查复制槽状态,此时 flink_slot 的 active 应为 f (false)
    SELECT * FROM pg_replication_slots;
  4. 开通并配置Flink

    1. 开通实时计算Flink,并创建一个Flink工作空间。

      重要

      Flink工作空间需与PolarDB集群位于同一个VPC下。

    2. Flink工作空间配置公网访问,使其能够连接AWS DynamoDB。

  5. 配置PolarDB集群的IP白名单

    1. Flink控制台,单击工作空间的详情按钮,在工作空间详情页面获取其网段信息。image

    2. 前往PolarDB控制台,在集群的配置与管理 > 集群白名单新增IP白名单分组,将Flink的网段信息添加进去。

  6. 验证PolarDB集群与Flink工作空间连通性

    1. Flink控制台,进入工作空间,单击右上角的网络探测图标。image

    2. 填写PolarDB集群主节点的私有地址与端口,单击探测

    3. 弹窗提示网络探测成功连通,即集群白名单配置正确。

部署Flink作业

  1. 下载反向同步工具PolarDBBackSync.jar

  2. 准备配置文件:创建名为application.yaml的配置文件,内容如下:

    source:
        # PolarDB主节点的私网地址
        hostname: pc-xxx.pg.polardb.rds.aliyuncs.com
        # PolarDB主节点的私网端口
        port: 5432
        # 之前创建的逻辑复制槽的名称
        slotName: flink_slot
    
    target:
      # 目标 AWS DynamoDB 的 region
      region: cn-north-1
    
    # (可选) 表过滤配置,whiteTableSet 和 blackTableSet 只能声明一个
    filter:
      # whiteTableSet: 需要反向回流的表
      # blackTableSet: 不需要反向回流的表
      whiteTableSet:
      blackTableSet:
    
    # Flink 作业的检查点(checkpoint)间隔,单位毫秒
    checkpoint:
      interval: 3000
  3. 上传文件:进入Flink控制台,找到并进入目标工作空间,在文件管理页面上传PolarDBBackSync.jarapplication.yaml

  4. 安全存储凭证:为避免明文暴露密钥,建议使用Flink变量管理功能存储敏感信息。在变量管理页面新增以下四个变量:

    变量名称

    变量值

    polardbusername

    PolarDBDynamoDB账号。

    polardbpassword

    PolarDBDynamoDB账号密码。

    dynamodbak

    AWS DynamoDBAccessKey。

    dynamodbsk

    AWS DynamoDBSecretKey。

  5. 部署并启动作业

    1. 进入作业运维页面,选择部署作业 > JAR作业

    2. 填写以下主要参数,其他参数可根据业务环境进行配置。然后单击部署

      参数名称

      填写参考

      部署模式

      固定为流模式

      部署名称

      填写作业部署名称,此处以PolarDBBackSync为例。

      引擎版本

      固定为vvr-11.3-jdk11-flink-1.20

      JAR URI

      选择已上传的PolarDBBackSync.jar

      Entry Point Class

      固定为org.example.PolarDBCdcJob

      Entry Point Main Arguments

      固定为:

      --polardbusername ${secret_values.polardbusername}

      --polardbpassword ${secret_values.polardbpassword}

      --dynamodbak ${secret_values.dynamodbak}

      --dynamodbsk ${secret_values.dynamodbsk}

      附加依赖文件

      选择已上传的application.yaml

    3. 部署成功后,单击启动 > 无状态启动

验证与清理

  • 验证:在作业启动后,使用高权限账号连接至PolarDB集群的polardb_internal_dynamodb数据库,并执行SELECT * FROM pg_replication_slots;。若复制槽flink_slotactive字段变为t(true),则表示Flink作业已成功连接。此时即可在PolarDB集群内开始导入业务流量。

  • 清理:当不再需要进行反向同步时,您可执行以下步骤以释放相关资源节省费用。

    • 实时计算Flink

      • 停止作业:前往Flink控制台,在目标工作空间中,进入作业运维页面,找到目标作业并单击停止

      • 释放实例:返回Flink控制台,找到目标工作空间,单击释放资源

    • PolarDB集群:使用高权限账号连接至polardb_internal_dynamodb数据库,执行以下命令删除逻辑复制槽。

      SELECT pg_drop_replication_slot('flink_slot');

步骤四:执行业务割接

增量同步延迟较低且数据一致性校验无差异后,可计划业务割接。当您准备好进行最终的业务切换时,请遵循以下严谨的步骤:

  1. 最终校验:在计划的停机窗口前,反复运行一致性校验工具,直至确认增量同步延迟极低,且数据差异数量降至0或可接受的范围内。

  2. 停止源端写入:在停机窗口开始时,暂停所有向源端AWS DynamoDB写入数据的业务应用。

  3. 等待同步完成:观察nimo-shake的日志,确认已无新的增量数据需要同步。

  4. 最后一次校验:再次运行nimo-full-check工具,确保源端和目标端的数据完全一致。

  5. 停止同步工具:在确认数据完全同步后,停止nimo-shake进程。

  6. 切换应用连接:停止业务应用,将业务应用的数据库连接配置,从AWS DynamoDB的地址切换为PolarDBDynamoDB访问地址。

  7. (可选)开启反向同步任务:在业务正式切换至PolarDB期间开启反向同步任务,基于阿里云实时计算Flink实现数据的反向回流,为业务回滚提供保障。

  8. 启动业务:重启业务应用。至此,割接完成。

  9. (可选)停止反向同步任务:在割接完成且业务稳定运行一段时间后,确认数据一致性满足业务需求后,即可安全停止反向同步任务(停止Flink作业与释放相关资源)

附录:为测试环境模拟实时业务流量

如果您希望在测试环境中模拟一个真实的、持续有数据写入的迁移场景,可以使用以下Go语言示例代码。该代码会向源端DynamoDB表周期性地写入和更新数据。

说明

此步骤仅用于测试和验证迁移流程,在实际生产迁移中无需执行。

package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

// --- Configuration for your source AWS DynamoDB ---
var (
    region    = "cn-north-1"       // AWS DynamoDB region
    accessKey = "your-aws-access-key" // AWS DynamoDB access key
    secretKey = "your-aws-secret-key" // AWS DynamoDB secret key
)

// --- Helper function to create a DynamoDB client ---
func createClient() (*dynamodb.Client, context.Context) {
    ctx := context.Background()
    sdkConfig, err := config.LoadDefaultConfig(ctx, config.WithRegion(region))
    if err != nil {
        log.Fatalf("Failed to load AWS config: %v", err)
    }
    client := dynamodb.NewFromConfig(sdkConfig, func(o *dynamodb.Options) {
        o.Credentials = credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")
    })
    return client, ctx
}

// --- Function to create a table and populate it with initial data ---
func initializeData(client *dynamodb.Client, ctx context.Context) {
    tableName := "src1" // Example table name

    // Create table if not exists
    _, err := client.CreateTable(ctx, &dynamodb.CreateTableInput{
        TableName: &tableName,
        AttributeDefinitions: []types.AttributeDefinition{
            {AttributeName: aws.String("pk"), AttributeType: types.ScalarAttributeTypeS},
        },
        KeySchema: []types.KeySchemaElement{
            {AttributeName: aws.String("pk"), KeyType: types.KeyTypeHash},
        },
        ProvisionedThroughput: &types.ProvisionedThroughput{
            ReadCapacityUnits:  aws.Int64(100),
            WriteCapacityUnits: aws.Int64(100),
        },
    })
    if err != nil {
		// Ignore if table already exists, fail on other errors
        if _, ok := err.(*types.ResourceInUseException); !ok {
			log.Fatalf("CreateTable failed for %s: %v", tableName, err)
		}
    }

    fmt.Printf("Waiting for table '%s' to become active...\n", tableName)
    waiter := dynamodb.NewTableExistsWaiter(client)
    err = waiter.Wait(ctx, &dynamodb.DescribeTableInput{TableName: &tableName}, 5*time.Minute)
    if err != nil {
        log.Fatalf("Waiter failed for table %s: %v", tableName, err)
    }

    // Insert 100 sample items
    for i := 0; i < 100; i++ {
        pk := fmt.Sprintf("%s_user_%03d", tableName, i)
        item := map[string]types.AttributeValue{
            "pk":  &types.AttributeValueMemberS{Value: pk},
            "val": &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", i)},
        }
        client.PutItem(ctx, &dynamodb.PutItemInput{TableName: &tableName, Item: item})
    }
    fmt.Printf("Inserted 100 initial items into '%s'.\n", tableName)
}

// --- Function to simulate continuous business traffic ---
func simulateTraffic(client *dynamodb.Client, ctx context.Context) {
    tableName := "src1"
    fmt.Println("Starting periodic updates to simulate traffic. Press Ctrl+C to stop.")
    i := 0
    for {
        pk := fmt.Sprintf("%s_user_%03d", tableName, i%100)
        newValue := fmt.Sprintf("%d", rand.Intn(1000))
        _, err := client.UpdateItem(ctx, &dynamodb.UpdateItemInput{
            TableName: &tableName,
            Key: map[string]types.AttributeValue{
                "pk": &types.AttributeValueMemberS{Value: pk},
            },
            ExpressionAttributeValues: map[string]types.AttributeValue{
                ":newval": &types.AttributeValueMemberN{Value: newValue},
            },
            UpdateExpression: aws.String("SET val = :newval"),
        })
        if err != nil {
            fmt.Printf("Update error: %v\n", err)
        } else {
            fmt.Printf("Updated pk=%s with new val=%s\n", pk, newValue)
        }
        i++
        time.Sleep(1 * time.Second) // Update one record per second
    }
}

func main() {
    client, ctx := createClient()
    initializeData(client, ctx)
    simulateTraffic(client, ctx)
}