数据迁移指南

本指南为您提供了从Amazon DynamoDB迁移至PolarDB PostgreSQL详尽的操作步骤和最佳实践。PolarDB提供了一套专用的迁移工具,通过全量同步与增量同步相结合的方式,帮助您实现平滑、低停机时间的数据迁移。

迁移流程概述

迁移过程主要分为四个阶段,由nimo-shake(数据同步)和nimo-full-check(数据校验)两个核心工具协同完成。

image
  1. 全量同步(Full Synchronization)

    • 工具:nimo-shake

    • 过程:工具首先自动在目标PolarDB集群中创建与源端一致的表结构。随后,通过并发Scan操作高效读取源端数据库的全量数据,并使用BatchWriteItem批量写入目标集群。

  2. 增量同步(Incremental Synchronization)

    • 工具:nimo-shake

    • 过程:全量同步完成后,工具会自动利用AWS DynamoDB Streams机制,实时捕获源端自迁移启动以来的所有数据变更(增、删、改),并将其同步到目标集群,确保数据最终一致。该过程支持断点续传。

  3. 一致性校验(Consistency Validation)

    • 工具:nimo-full-check

    • 过程:在数据同步期间或之后,可随时运行此工具。它会并发地从源端和目标端读取数据,按主键进行比对,并生成详细的差异报告,以验证数据完整性。

  4. 业务割接(Business Cutover)

    • 过程:当增量数据延迟极低且一致性校验无差异后,短暂停止业务写入,待所有数据同步完毕,即可将应用连接切换至PolarDB集群,完成迁移。

注意事项

  • 性能影响:数据迁移过程,尤其是全量同步阶段,会对源数据库和目标数据库产生一定的读写负载。建议您在业务低峰期执行迁移,并提前评估数据库的承载能力。

  • 安全配置:在业务割接前,建议对目标PolarDB集群的写入权限进行管控,仅允许数据同步工具的账号写入,防止意外数据污染。

前提条件

在开始迁移前,请确保您已完成以下准备工作:

  1. 获取迁移工具包:NimoShake.tar.gz。其中包含nimo-shakenimo-full-check两种迁移工具包。

  2. 为已有集群或新集群开启兼容DynamoDB能力,并获取DynamoDB访问地址创建DynamoDB专用账号用于API访问的身份凭证(AccessKey)。

  3. 获取源端AWS DynamoDB的访问凭证(AccessKey IDSecret Access Key)。

  4. 为需要迁移的源端DynamoDB表开启DynamoDB Streams功能。

迁移实施步骤

步骤一:配置并启动数据同步

  1. 解压nimo-shake工具包,编辑配置文件conf/nimo-shake.conf。以下是核心配置项说明:

    参数

    说明

    示例值

    sync_mode

    同步模式。all表示全量+增量,full表示仅全量。

    all

    source.access_key_id

    源端AWS DynamoDBAccessKey ID。

    AKIAIOSFODNN7...

    source.secret_access_key

    源端AWS DynamoDBSecret Access Key。

    wJalrXUtnFEMI...

    source.region

    源端AWS DynamoDB所在的区域。

    cn-north-1

    source.endpoint_url

    源端连接地址。对于AWS DynamoDB,此项留空。

    -

    target.address

    目标PolarDBDynamoDB访问地址(含http://与端口)。

    http://pe-xxx.rwlb.rds...

    target.access_key_id

    目标PolarDBDynamoDB账号AccessKey。

    your-polardb-access-key

    target.secret_access_key

    目标PolarDBDynamoDB账号SecretKey。

    your-polardb-secret-key

    filter.collection.white

    表过滤白名单,表示仅同步指定表。多个表之间用;相隔。

    说明

    不可与黑名单同时使用,同时指定时,代表全部同步。

    c1;c2

    filter.collection.black

    表过滤黑名单,表示同步时,过滤指定表不进行同步。多个表之间用;相隔。

    说明

    不可与白名单同时使用,同时指定时,代表全部同步。

    c1;c2

  2. 根据您的操作系统架构,选择对应的二进制文件启动同步任务。

    说明

    建议您在后台运行,避免因终端服务断开而导致同步任务中断。

    # 以Linux x86-64架构为例
    ./bin/nimo-shake.linux.amd64 -conf=./conf/nimo-shake.conf

    程序将首先进行全量同步,完成后自动转入增量同步阶段,并持续运行。

步骤二:执行数据一致性校验

  1. 编辑其配置文件conf/nimo-full-check.conf。配置项与nimo-shake基本一致,需分别填写源端和目标端的连接信息。

  2. 启动校验任务。

    # 以Linux x86-64架构为例
    ./bin/nimo-shake.linux.amd64 -conf=./conf/nimo-shake.conf

    校验工具会在终端实时输出进度,并将详细日志和数据差异报告分别存放在logs/nimo-full-check-diff/目录中。 image.png

步骤三:执行业务割接

当您准备好进行最终的业务切换时,请遵循以下严谨的步骤:

  1. 最终校验:在计划的停机窗口前,反复运行一致性校验工具,直至确认增量同步延迟极低,且数据差异数量降至0或可接受的范围内。

  2. 停止源端写入:在停机窗口开始时,暂停所有向源端AWS DynamoDB写入数据的业务应用。

  3. 等待同步完成:观察nimo-shake的日志,确认已无新的增量数据需要同步。

  4. 最后一次校验:再次运行nimo-full-check工具,确保源端和目标端的数据完全一致。

  5. 停止同步工具:在确认数据完全同步后,停止nimo-shake进程。

  6. 切换应用连接:将业务应用的数据库连接配置,从AWS DynamoDB的地址切换为PolarDBDynamoDB访问地址。

  7. 启动业务:重启业务应用。至此,割接完成。

附录:为测试环境模拟实时业务流量

如果您希望在测试环境中模拟一个真实的、持续有数据写入的迁移场景,可以使用以下Go语言示例代码。该代码会向源端DynamoDB表周期性地写入和更新数据。

说明

此步骤仅用于测试和验证迁移流程,在实际生产迁移中无需执行。

package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

// --- Configuration for your source AWS DynamoDB ---
var (
    region    = "cn-north-1"       // AWS DynamoDB region
    accessKey = "your-aws-access-key" // AWS DynamoDB access key
    secretKey = "your-aws-secret-key" // AWS DynamoDB secret key
)

// --- Helper function to create a DynamoDB client ---
func createClient() (*dynamodb.Client, context.Context) {
    ctx := context.Background()
    sdkConfig, err := config.LoadDefaultConfig(ctx, config.WithRegion(region))
    if err != nil {
        log.Fatalf("Failed to load AWS config: %v", err)
    }
    client := dynamodb.NewFromConfig(sdkConfig, func(o *dynamodb.Options) {
        o.Credentials = credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")
    })
    return client, ctx
}

// --- Function to create a table and populate it with initial data ---
func initializeData(client *dynamodb.Client, ctx context.Context) {
    tableName := "src1" // Example table name

    // Create table if not exists
    _, err := client.CreateTable(ctx, &dynamodb.CreateTableInput{
        TableName: &tableName,
        AttributeDefinitions: []types.AttributeDefinition{
            {AttributeName: aws.String("pk"), AttributeType: types.ScalarAttributeTypeS},
        },
        KeySchema: []types.KeySchemaElement{
            {AttributeName: aws.String("pk"), KeyType: types.KeyTypeHash},
        },
        ProvisionedThroughput: &types.ProvisionedThroughput{
            ReadCapacityUnits:  aws.Int64(100),
            WriteCapacityUnits: aws.Int64(100),
        },
    })
    if err != nil {
		// Ignore if table already exists, fail on other errors
        if _, ok := err.(*types.ResourceInUseException); !ok {
			log.Fatalf("CreateTable failed for %s: %v", tableName, err)
		}
    }

    fmt.Printf("Waiting for table '%s' to become active...\n", tableName)
    waiter := dynamodb.NewTableExistsWaiter(client)
    err = waiter.Wait(ctx, &dynamodb.DescribeTableInput{TableName: &tableName}, 5*time.Minute)
    if err != nil {
        log.Fatalf("Waiter failed for table %s: %v", tableName, err)
    }

    // Insert 100 sample items
    for i := 0; i < 100; i++ {
        pk := fmt.Sprintf("%s_user_%03d", tableName, i)
        item := map[string]types.AttributeValue{
            "pk":  &types.AttributeValueMemberS{Value: pk},
            "val": &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", i)},
        }
        client.PutItem(ctx, &dynamodb.PutItemInput{TableName: &tableName, Item: item})
    }
    fmt.Printf("Inserted 100 initial items into '%s'.\n", tableName)
}

// --- Function to simulate continuous business traffic ---
func simulateTraffic(client *dynamodb.Client, ctx context.Context) {
    tableName := "src1"
    fmt.Println("Starting periodic updates to simulate traffic. Press Ctrl+C to stop.")
    i := 0
    for {
        pk := fmt.Sprintf("%s_user_%03d", tableName, i%100)
        newValue := fmt.Sprintf("%d", rand.Intn(1000))
        _, err := client.UpdateItem(ctx, &dynamodb.UpdateItemInput{
            TableName: &tableName,
            Key: map[string]types.AttributeValue{
                "pk": &types.AttributeValueMemberS{Value: pk},
            },
            ExpressionAttributeValues: map[string]types.AttributeValue{
                ":newval": &types.AttributeValueMemberN{Value: newValue},
            },
            UpdateExpression: aws.String("SET val = :newval"),
        })
        if err != nil {
            fmt.Printf("Update error: %v\n", err)
        } else {
            fmt.Printf("Updated pk=%s with new val=%s\n", pk, newValue)
        }
        i++
        time.Sleep(1 * time.Second) // Update one record per second
    }
}

func main() {
    client, ctx := createClient()
    initializeData(client, ctx)
    simulateTraffic(client, ctx)
}