连接DynamoDB兼容版实例

更新时间:
复制为 MD 格式

本文介绍如何配置访问控制、获取连接凭证,并使用不同语言的SDK连接DynamoDB兼容版实例。

步骤一:配置访问控制与获取连接信息

在连接数据库前,您需要完成安全设置,并从控制台获取连接凭证。

  1. 创建账号:在实例详情页的左侧导航栏,单击账号管理 ,创建用于访问数据库的账号和密码。更多信息,请参见创建和管理账号

  2. 设置白名单:在实例详情页的左侧导航栏,单击白名单设置,添加允许访问该实例的客户端IP地址或IP地址段。更多信息,请参见设置白名单

  3. 获取连接凭证:完成白名单设置后,在实例详情页的连接信息区域,获取连接所需的信息:

    • 实例连接地址:找到连接地址并复制。该地址用作SDKEndpoint参数的值。实例默认提供专有网络连接地址,如需公网访问可单击公网访问右侧申请按钮开通,开通公网访问不会产生额外费用。

    • CA证书:在连接地址右侧,单击下载CA证书,获取CA证书文件(ApsaraDB-CA-Chain.pem)。该文件用于验证服务端证书,确保连接安全。

步骤二:连接实例与执行数据操作

您可以使用任何兼容DynamoDBSDK连接实例并执行数据操作。在应用程序代码中,您需要配置以下核心参数:

  • Endpoint:设置为您在步骤一获取的实例连接地址。

  • 身份凭证 (Credentials):使用您在步骤一创建的账号和密码进行身份验证。

  • CA证书路径:可以以通过以下两种方式配置 CA 证书:

    • 直接在代码中指定证书文件路径(例如修改例如verifyca_path参数)。

    • 通过设置环境变量(如 AWS_CA_BUNDLE 或 NODE_EXTRA_CA_CERTS)实现无代码侵入式加载。更多操作细节请查阅 AWS SDK相关说明

以下是不同编程语言的示例代码:

Python

import boto3
from botocore.exceptions import ClientError

def test_dynamodb_operations():
    # 1. 配置连接参数
    # 替换为您的实例连接地址
    endpoint = "https://tt-2****b.tairskvddb.rds.aliyuncs.com:6379"
    # 替换为您的CA证书文件路径,如果使用AWS_CA_BUNDLE环境变量则不需要此参数
    ca_path = '/root/ApsaraDB-CA-Chain.pem'
    # 替换为您的账号和密码:使用"账号:密码"格式,例如 'testaccount:Rp829dlwa'
    access_key_id = 'testaccount:Rp829dlwa'

    # 2. 创建DynamoDB资源
    dynamodb = boto3.resource(
        'dynamodb', 
        region_name='anystring',  # 可以为空字符串或任意字符串
        endpoint_url=endpoint,
        verify=ca_path,
        aws_access_key_id=access_key_id,  
        aws_secret_access_key='anystring'  # 可以为空字符串或任意字符串
    ) 
        
    table = dynamodb.Table('Song')

    # 3. 写入数据 (PutItem)
    print("--- 正在测试 PutItem ---")
    try:
        response = table.put_item(
            Item={
                'Artist': '周杰伦',      # 分区键 (Hash Key)
                'SongTitle': '七里香',   # 排序键 (Range Key)
                'Album': '七里香',       # 属性 (Attribute)
                'Year': 2004            # 属性 (Attribute)
            }
        )
        print("写入成功: HTTPStatusCode", response['ResponseMetadata']['HTTPStatusCode'])
    except ClientError as e:
        print(f"写入失败: {e.response['Error']['Message']}")

    # 4. 读取数据 (GetItem)
    print("\n--- 正在测试 GetItem ---")
    try:
        response = table.get_item(
            Key={
                'Artist': '周杰伦',
                'SongTitle': '七里香'
            }
        )
        
        item = response.get('Item')
        if item:
            print("读取成功:", item)
        else:
            print("未找到对应的项目")
            
    except ClientError as e:
        print(f"读取失败: {e.response['Error']['Message']}")

if __name__ == "__main__":
    test_dynamodb_operations()

Java

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.http.TlsTrustManagersProvider;  
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.*;
import software.amazon.awssdk.utils.AttributeMap;

import javax.net.ssl.TrustManagerFactory;  
import java.io.InputStream; 
import java.net.URI;
import java.nio.file.Files;  
import java.nio.file.Paths; 
import java.security.KeyStore;  
import java.security.cert.Certificate; 
import java.security.cert.CertificateFactory;  
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;

public class DynamoDBTest {

    public static void main(String[] args) {
        // 1. 配置参数
        // 替换为您的实例连接地址
        String endpoint = "https://tt-2****b.tairskvddb.rds.aliyuncs.com:6379";
        String caPath = "/root/ApsaraDB-CA-Chain.pem";
        // 替换为您的账号和密码:使用"账号:密码"格式,例如 'testaccount:Rp829dlwa'
        String accessKeyId = "testaccount:Rp829dlwa"; 
        String secretAccessKey = "anystring"; // 兼容性需要,可填写任意值

        try {
            // 2. 创建 DynamoDB 客户端
            DynamoDbClient ddb = DynamoDbClient.builder()
                .endpointOverride(URI.create(endpoint))
                .region(Region.US_EAST_1) // 随便指定一个
                .credentialsProvider(StaticCredentialsProvider.create(
                        AwsBasicCredentials.create(accessKeyId, secretAccessKey)))
                .httpClient(ApacheHttpClient.builder()
                        .tlsTrustManagersProvider(createTrustManagersProvider(caPath))
                        .build())
                .overrideConfiguration(ClientOverrideConfiguration.builder()
                        .retryPolicy(p -> p.numRetries(3))
                        .build())
                .build();

            String tableName = "Song";

            // 4. 写入数据 (PutItem)
            System.out.println("--- 正在测试 PutItem ---");
            Map<String, AttributeValue> item = new HashMap<>();
            item.put("Artist", AttributeValue.builder().s("周杰伦").build());
            item.put("SongTitle", AttributeValue.builder().s("七里香").build());
            item.put("Album", AttributeValue.builder().s("七里香").build());
            item.put("Year", AttributeValue.builder().n("2004").build());

            PutItemRequest putRequest = PutItemRequest.builder()
                    .tableName(tableName)
                    .item(item)
                    .build();

            PutItemResponse putResponse = ddb.putItem(putRequest);
            System.out.println("写入成功: HTTP StatusCode " + putResponse.sdkHttpResponse().statusCode());

            // 5. 读取数据 (GetItem)
            System.out.println("\n--- 正在测试 GetItem ---");
            Map<String, AttributeValue> key = new HashMap<>();
            key.put("Artist", AttributeValue.builder().s("周杰伦").build());
            key.put("SongTitle", AttributeValue.builder().s("七里香").build());

            GetItemRequest getRequest = GetItemRequest.builder()
                    .tableName(tableName)
                    .key(key)
                    .build();

            Map<String, AttributeValue> result = ddb.getItem(getRequest).item();
            if (result != null && !result.isEmpty()) {
                System.out.println("读取成功: " + result);
            } else {
                System.out.println("未找到对应的项目");
            }

        } catch (Exception e) {
            System.err.println("操作失败: " + e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * 读取指定路径的证书并创建 TrustManager
     */
    private static TlsTrustManagersProvider createTrustManagersProvider(String certPath) {
        return () -> {
            try (InputStream is = Files.newInputStream(Paths.get(certPath))) {
                CertificateFactory cf = CertificateFactory.getInstance("X.509");
                Collection<? extends Certificate> certs = cf.generateCertificates(is);

                KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
                keyStore.load(null, null);

                // 遍历并添加所有证书到 KeyStore
                int index = 0;
                for (Certificate cert : certs) {
                    keyStore.setCertificateEntry("custom-ca-" + (index++), cert);
                }

                TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
                tmf.init(keyStore);

                return tmf.getTrustManagers();
            } catch (Exception e) {
                throw new RuntimeException("无法加载证书: " + certPath, e);
            }
        };
    }
}

GO

// 本示例使用设置AWS_CA_BUNDLE环境变量的方式设置CA 信息
package main

import (
        "context"
        "fmt"
        "log"
        "github.com/aws/aws-sdk-go-v2/aws"
        "github.com/aws/aws-sdk-go-v2/config"
        "github.com/aws/aws-sdk-go-v2/credentials"
        "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
        "github.com/aws/aws-sdk-go-v2/service/dynamodb"
)

// 定义数据结构
type Song struct {
        Artist    string `dynamodbav:"Artist"`
        SongTitle string `dynamodbav:"SongTitle"`
        Album     string `dynamodbav:"Album"`
        Year      int    `dynamodbav:"Year"`
}

func main() {
        // 替换为实例连接地址,设置白名单后,可在实例详情页连接信息区域获取
        endpoint := "https://tt-2****b.tairskvddb.rds.aliyuncs.com:6379"
        // 替换为您的账号和密码:使用"账号:密码"格式,例如 'testaccount:Rp829dlwa'
        accessKeyID := "testaccount:Rp829dlwa"


        // 3. 初始化 AWS 配置
        cfg, err := config.LoadDefaultConfig(context.TODO(),
                config.WithRegion("anystring"), // 阿里云兼容接口通常可填任意值
                config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKeyID, "anystring", "")),
        )
        if err != nil {
                log.Fatalf("无法加载配置: %v", err)
        }

        // 创建 DynamoDB 客户端并指定 Endpoint
        client := dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) {
                o.BaseEndpoint = aws.String(endpoint)
        })

        tableName := "Song"

        // 4. 写入数据 (PutItem)
        fmt.Println("--- 正在测试 PutItem ---")
        item := Song{
                Artist:    "周杰伦",
                SongTitle: "七里香",
                Album:     "七里香",
                Year:      2004,
        }

        av, err := attributevalue.MarshalMap(item)
        if err != nil {
                log.Fatalf("序列化失败: %v", err)
        }

        _, err = client.PutItem(context.TODO(), &dynamodb.PutItemInput{
                TableName: aws.String(tableName),
                Item:      av,
        })
        if err != nil {
                fmt.Printf("写入失败: %v\n", err)
        } else {
                fmt.Println("写入成功")
        }

        // 5. 读取数据 (GetItem)
        fmt.Println("\n--- 正在测试 GetItem ---")
        key, err := attributevalue.MarshalMap(map[string]string{
                "Artist":    "周杰伦",
                "SongTitle": "七里香",
        })

        out, err := client.GetItem(context.TODO(), &dynamodb.GetItemInput{
                TableName: aws.String(tableName),
                Key:       key,
        })
        if err != nil {
                fmt.Printf("读取失败: %v\n", err)
        } else if out.Item == nil {
                fmt.Println("未找到对应的项目")
        } else {
                var result Song
                err = attributevalue.UnmarshalMap(out.Item, &result)
                fmt.Printf("读取成功: %+v\n", result)
        }
}

Node.js

// 本示例使用配置环境变量NODE_EXTRA_CA_CERTS的方式指定CA证书
const { DynamoDBClient } = require("@aws-sdk/client-dynamodb");
const { DynamoDBDocument } = require("@aws-sdk/lib-dynamodb");
const { NodeHttpHandler } = require("@smithy/node-http-handler"); // 或者从 @aws-sdk/node-http-handler 导入
const https = require("https");
const fs = require("fs");

async function testDynamoDBOperations() {
    // 替换为实例连接地址,设置白名单后,可在实例详情页连接信息区域获取
    const endpoint = "https://tt-2****b.tairskvddb.rds.aliyuncs.com:6379";

    // 替换为您的账号和密码:使用"账号:密码"格式,例如 'testaccount:Rp829dlwa'
    const accessKeyId = "testaccount:Rp829dlwa";
    const secretAccessKey = "anystring";  // 阿里云兼容接口可填任意值

    // 3. 初始化客户端
    const baseClient = new DynamoDBClient({
        endpoint: endpoint,
        region: "anystring", // 阿里云兼容接口可填任意值
        credentials: {
            accessKeyId: accessKeyId,
            secretAccessKey: secretAccessKey
        },
        maxAttempts: 3
    });

    // 使用 DocumentClient 简化数据操作
    const ddbDoc = DynamoDBDocument.from(baseClient);
    const tableName = "Song";

    // 4. 写入数据 (PutItem)
    console.log("--- 正在测试 PutItem ---");
    try {
        const response = await ddbDoc.put({
            TableName: tableName,
            Item: {
                Artist: '周杰伦',
                SongTitle: '七里香',
                Album: '七里香',
                Year: 2004
            }
        });
        console.log("写入成功: HTTPStatusCode", response.$metadata.httpStatusCode);
    } catch (err) {
        console.error("写入失败:", err.message);
    }

    // 5. 读取数据 (GetItem)
    console.log("\n--- 正在测试 GetItem ---");
    try {
        const response = await ddbDoc.get({
            TableName: tableName,
            Key: {
                Artist: '周杰伦',
                SongTitle: '七里香'
            }
        });

        if (response.Item) {
            console.log("读取成功:", response.Item);
        } else {
            console.log("未找到对应的项目");
        }
    } catch (err) {
        console.error("读取失败:", err.message);
    }
}

testDynamoDBOperations();