本文介绍如何配置访问控制、获取连接凭证,并使用不同语言的SDK连接DynamoDB兼容版实例。
步骤一:配置访问控制与获取连接信息
在连接数据库前,您需要完成安全设置,并从控制台获取连接凭证。
创建账号:在实例详情页的左侧导航栏,单击账号管理 ,创建用于访问数据库的账号和密码。更多信息,请参见创建和管理账号。
设置白名单:在实例详情页的左侧导航栏,单击白名单设置,添加允许访问该实例的客户端IP地址或IP地址段。更多信息,请参见设置白名单。
获取连接凭证:完成白名单设置后,在实例详情页的连接信息区域,获取连接所需的信息:
实例连接地址:找到连接地址并复制。该地址用作SDK中Endpoint参数的值。实例默认提供专有网络连接地址,如需公网访问可单击公网访问右侧申请按钮开通,开通公网访问不会产生额外费用。
CA证书:在连接地址右侧,单击下载CA证书,获取CA证书文件(
ApsaraDB-CA-Chain.pem)。该文件用于验证服务端证书,确保连接安全。
步骤二:连接实例与执行数据操作
您可以使用任何兼容DynamoDB的SDK连接实例并执行数据操作。在应用程序代码中,您需要配置以下核心参数:
Endpoint:设置为您在步骤一获取的实例连接地址。
身份凭证 (Credentials):使用您在步骤一创建的账号和密码进行身份验证。
CA证书路径:可以以通过以下两种方式配置 CA 证书:
直接在代码中指定证书文件路径(例如修改例如
verify或ca_path参数)。通过设置环境变量(如
AWS_CA_BUNDLE或NODE_EXTRA_CA_CERTS)实现无代码侵入式加载。更多操作细节请查阅 AWS SDK相关说明。
以下是不同编程语言的示例代码:
Python
import boto3
from botocore.exceptions import ClientError
def test_dynamodb_operations():
# 1. 配置连接参数
# 替换为您的实例连接地址
endpoint = "https://tt-2****b.tairskvddb.rds.aliyuncs.com:6379"
# 替换为您的CA证书文件路径,如果使用AWS_CA_BUNDLE环境变量则不需要此参数
ca_path = '/root/ApsaraDB-CA-Chain.pem'
# 替换为您的账号和密码:使用"账号:密码"格式,例如 'testaccount:Rp829dlwa'
access_key_id = 'testaccount:Rp829dlwa'
# 2. 创建DynamoDB资源
dynamodb = boto3.resource(
'dynamodb',
region_name='anystring', # 可以为空字符串或任意字符串
endpoint_url=endpoint,
verify=ca_path,
aws_access_key_id=access_key_id,
aws_secret_access_key='anystring' # 可以为空字符串或任意字符串
)
table = dynamodb.Table('Song')
# 3. 写入数据 (PutItem)
print("--- 正在测试 PutItem ---")
try:
response = table.put_item(
Item={
'Artist': '周杰伦', # 分区键 (Hash Key)
'SongTitle': '七里香', # 排序键 (Range Key)
'Album': '七里香', # 属性 (Attribute)
'Year': 2004 # 属性 (Attribute)
}
)
print("写入成功: HTTPStatusCode", response['ResponseMetadata']['HTTPStatusCode'])
except ClientError as e:
print(f"写入失败: {e.response['Error']['Message']}")
# 4. 读取数据 (GetItem)
print("\n--- 正在测试 GetItem ---")
try:
response = table.get_item(
Key={
'Artist': '周杰伦',
'SongTitle': '七里香'
}
)
item = response.get('Item')
if item:
print("读取成功:", item)
else:
print("未找到对应的项目")
except ClientError as e:
print(f"读取失败: {e.response['Error']['Message']}")
if __name__ == "__main__":
test_dynamodb_operations()Java
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.http.TlsTrustManagersProvider;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.*;
import software.amazon.awssdk.utils.AttributeMap;
import javax.net.ssl.TrustManagerFactory;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;
public class DynamoDBTest {
public static void main(String[] args) {
// 1. 配置参数
// 替换为您的实例连接地址
String endpoint = "https://tt-2****b.tairskvddb.rds.aliyuncs.com:6379";
String caPath = "/root/ApsaraDB-CA-Chain.pem";
// 替换为您的账号和密码:使用"账号:密码"格式,例如 'testaccount:Rp829dlwa'
String accessKeyId = "testaccount:Rp829dlwa";
String secretAccessKey = "anystring"; // 兼容性需要,可填写任意值
try {
// 2. 创建 DynamoDB 客户端
DynamoDbClient ddb = DynamoDbClient.builder()
.endpointOverride(URI.create(endpoint))
.region(Region.US_EAST_1) // 随便指定一个
.credentialsProvider(StaticCredentialsProvider.create(
AwsBasicCredentials.create(accessKeyId, secretAccessKey)))
.httpClient(ApacheHttpClient.builder()
.tlsTrustManagersProvider(createTrustManagersProvider(caPath))
.build())
.overrideConfiguration(ClientOverrideConfiguration.builder()
.retryPolicy(p -> p.numRetries(3))
.build())
.build();
String tableName = "Song";
// 4. 写入数据 (PutItem)
System.out.println("--- 正在测试 PutItem ---");
Map<String, AttributeValue> item = new HashMap<>();
item.put("Artist", AttributeValue.builder().s("周杰伦").build());
item.put("SongTitle", AttributeValue.builder().s("七里香").build());
item.put("Album", AttributeValue.builder().s("七里香").build());
item.put("Year", AttributeValue.builder().n("2004").build());
PutItemRequest putRequest = PutItemRequest.builder()
.tableName(tableName)
.item(item)
.build();
PutItemResponse putResponse = ddb.putItem(putRequest);
System.out.println("写入成功: HTTP StatusCode " + putResponse.sdkHttpResponse().statusCode());
// 5. 读取数据 (GetItem)
System.out.println("\n--- 正在测试 GetItem ---");
Map<String, AttributeValue> key = new HashMap<>();
key.put("Artist", AttributeValue.builder().s("周杰伦").build());
key.put("SongTitle", AttributeValue.builder().s("七里香").build());
GetItemRequest getRequest = GetItemRequest.builder()
.tableName(tableName)
.key(key)
.build();
Map<String, AttributeValue> result = ddb.getItem(getRequest).item();
if (result != null && !result.isEmpty()) {
System.out.println("读取成功: " + result);
} else {
System.out.println("未找到对应的项目");
}
} catch (Exception e) {
System.err.println("操作失败: " + e.getMessage());
e.printStackTrace();
}
}
/**
* 读取指定路径的证书并创建 TrustManager
*/
private static TlsTrustManagersProvider createTrustManagersProvider(String certPath) {
return () -> {
try (InputStream is = Files.newInputStream(Paths.get(certPath))) {
CertificateFactory cf = CertificateFactory.getInstance("X.509");
Collection<? extends Certificate> certs = cf.generateCertificates(is);
KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
keyStore.load(null, null);
// 遍历并添加所有证书到 KeyStore
int index = 0;
for (Certificate cert : certs) {
keyStore.setCertificateEntry("custom-ca-" + (index++), cert);
}
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(keyStore);
return tmf.getTrustManagers();
} catch (Exception e) {
throw new RuntimeException("无法加载证书: " + certPath, e);
}
};
}
}GO
// 本示例使用设置AWS_CA_BUNDLE环境变量的方式设置CA 信息
package main
import (
"context"
"fmt"
"log"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
)
// 定义数据结构
type Song struct {
Artist string `dynamodbav:"Artist"`
SongTitle string `dynamodbav:"SongTitle"`
Album string `dynamodbav:"Album"`
Year int `dynamodbav:"Year"`
}
func main() {
// 替换为实例连接地址,设置白名单后,可在实例详情页连接信息区域获取
endpoint := "https://tt-2****b.tairskvddb.rds.aliyuncs.com:6379"
// 替换为您的账号和密码:使用"账号:密码"格式,例如 'testaccount:Rp829dlwa'
accessKeyID := "testaccount:Rp829dlwa"
// 3. 初始化 AWS 配置
cfg, err := config.LoadDefaultConfig(context.TODO(),
config.WithRegion("anystring"), // 阿里云兼容接口通常可填任意值
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKeyID, "anystring", "")),
)
if err != nil {
log.Fatalf("无法加载配置: %v", err)
}
// 创建 DynamoDB 客户端并指定 Endpoint
client := dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) {
o.BaseEndpoint = aws.String(endpoint)
})
tableName := "Song"
// 4. 写入数据 (PutItem)
fmt.Println("--- 正在测试 PutItem ---")
item := Song{
Artist: "周杰伦",
SongTitle: "七里香",
Album: "七里香",
Year: 2004,
}
av, err := attributevalue.MarshalMap(item)
if err != nil {
log.Fatalf("序列化失败: %v", err)
}
_, err = client.PutItem(context.TODO(), &dynamodb.PutItemInput{
TableName: aws.String(tableName),
Item: av,
})
if err != nil {
fmt.Printf("写入失败: %v\n", err)
} else {
fmt.Println("写入成功")
}
// 5. 读取数据 (GetItem)
fmt.Println("\n--- 正在测试 GetItem ---")
key, err := attributevalue.MarshalMap(map[string]string{
"Artist": "周杰伦",
"SongTitle": "七里香",
})
out, err := client.GetItem(context.TODO(), &dynamodb.GetItemInput{
TableName: aws.String(tableName),
Key: key,
})
if err != nil {
fmt.Printf("读取失败: %v\n", err)
} else if out.Item == nil {
fmt.Println("未找到对应的项目")
} else {
var result Song
err = attributevalue.UnmarshalMap(out.Item, &result)
fmt.Printf("读取成功: %+v\n", result)
}
}Node.js
// 本示例使用配置环境变量NODE_EXTRA_CA_CERTS的方式指定CA证书
const { DynamoDBClient } = require("@aws-sdk/client-dynamodb");
const { DynamoDBDocument } = require("@aws-sdk/lib-dynamodb");
const { NodeHttpHandler } = require("@smithy/node-http-handler"); // 或者从 @aws-sdk/node-http-handler 导入
const https = require("https");
const fs = require("fs");
async function testDynamoDBOperations() {
// 替换为实例连接地址,设置白名单后,可在实例详情页连接信息区域获取
const endpoint = "https://tt-2****b.tairskvddb.rds.aliyuncs.com:6379";
// 替换为您的账号和密码:使用"账号:密码"格式,例如 'testaccount:Rp829dlwa'
const accessKeyId = "testaccount:Rp829dlwa";
const secretAccessKey = "anystring"; // 阿里云兼容接口可填任意值
// 3. 初始化客户端
const baseClient = new DynamoDBClient({
endpoint: endpoint,
region: "anystring", // 阿里云兼容接口可填任意值
credentials: {
accessKeyId: accessKeyId,
secretAccessKey: secretAccessKey
},
maxAttempts: 3
});
// 使用 DocumentClient 简化数据操作
const ddbDoc = DynamoDBDocument.from(baseClient);
const tableName = "Song";
// 4. 写入数据 (PutItem)
console.log("--- 正在测试 PutItem ---");
try {
const response = await ddbDoc.put({
TableName: tableName,
Item: {
Artist: '周杰伦',
SongTitle: '七里香',
Album: '七里香',
Year: 2004
}
});
console.log("写入成功: HTTPStatusCode", response.$metadata.httpStatusCode);
} catch (err) {
console.error("写入失败:", err.message);
}
// 5. 读取数据 (GetItem)
console.log("\n--- 正在测试 GetItem ---");
try {
const response = await ddbDoc.get({
TableName: tableName,
Key: {
Artist: '周杰伦',
SongTitle: '七里香'
}
});
if (response.Item) {
console.log("读取成功:", response.Item);
} else {
console.log("未找到对应的项目");
}
} catch (err) {
console.error("读取失败:", err.message);
}
}
testDynamoDBOperations();该文章对您有帮助吗?