Python数据复制

更新时间:2025-03-07 06:29:04

数据复制是以异步(近实时)方式将源Bucket中的文件(Object)以及对Object的创建、更新和删除等操作自动复制到目标Bucket。OSS支持跨区域复制(Cross-Region Replication)和同区域复制(Same-Region Replication)。

注意事项

  • 本文示例代码以华东1(杭州)的地域IDcn-hangzhou为例,默认使用外网Endpoint,如果您希望通过与OSS同地域的其他阿里云产品访问OSS,请使用内网Endpoint。关于OSS支持的RegionEndpoint的对应关系,请参见OSS地域和访问域名

  • 阿里云账号默认拥有数据复制的相关权限。如果您希望通过RAM用户或者STS的方式执行数据复制相关操作,例如:

    • 开启数据复制,您必须拥有oss:PutBucketReplication权限。

    • 开启或关闭数据复制时间控制(RTC)功能,您必须拥有oss:PutBucketRtc权限。

    • 查看数据复制规则,您必须拥有oss:GetBucketReplication权限。

    • 查看可复制的目标地域,您必须拥有oss:GetBucketReplicationLocation权限。

    • 查看数据复制进度,您必须拥有oss:GetBucketReplicationProgress权限。

    • 关闭数据复制,您必须拥有oss:DeleteBucketReplication权限

示例代码

开启数据复制

重要

开启数据复制前,请确保源存储空间与目标存储空间同时处于非版本化或已启用版本控制状态。

以下代码用于开启数据复制,将源Bucket中的数据复制到相同或不同地域下的目标Bucket。

import argparse
import alibabacloud_oss_v2 as oss

# 创建一个命令行参数解析器
parser = argparse.ArgumentParser(description="put bucket replication sample")
# 添加需要的命令行参数
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
parser.add_argument('--sync_role', help='The role that you want to authorize OSS to use to replicate data', required=True)
parser.add_argument('--target_bucket', help='The destination bucket to which data is replicated', required=True)
parser.add_argument('--target_location', help='The region in which the destination bucket is located', required=True)

def main():
    # 解析命令行参数
    args = parser.parse_args()

    # 从环境变量中加载访问凭证信息
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 使用SDK的默认配置
    cfg = oss.config.load_default()
    # 设置访问凭证提供者
    cfg.credentials_provider = credentials_provider
    # 设置区域
    cfg.region = args.region
    # 如果指定了endpoint,则设置endpoint到配置中
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 创建OSS客户端
    client = oss.Client(cfg)

    # 配置并执行PutBucketReplication请求
    result = client.put_bucket_replication(oss.PutBucketReplicationRequest(
            bucket=args.bucket,  # 源存储空间名称
            replication_configuration=oss.ReplicationConfiguration(
                rules=[oss.ReplicationRule(
                    source_selection_criteria=oss.ReplicationSourceSelectionCriteria(
                        sse_kms_encrypted_objects=oss.SseKmsEncryptedObjects(
                            status=oss.StatusType.ENABLED,
                        ),
                    ),
                    rtc=oss.ReplicationTimeControl(
                        status='disabled',  # 禁用复制时间控制
                    ),
                    destination=oss.ReplicationDestination(
                        bucket=args.target_bucket,  # 目标存储空间名称
                        location=args.target_location,  # 目标存储空间所在区域
                        transfer_type=oss.TransferType.INTERNAL,  # 传输类型
                    ),
                    historical_object_replication=oss.HistoricalObjectReplicationType.DISABLED,  # 禁用历史数据复制功能
                    sync_role=args.sync_role,  # 同步角色
                    status='Disabled',  # 禁用该规则
                    prefix_set=oss.ReplicationPrefixSet(
                        prefixs=['aaa/', 'bbb/'],  # 前缀集,用于指定哪些前缀的对象需要复制
                    ),
                    action='ALL',  # 所有操作
                )],
            ),
    ))

    # 打印请求结果的状态码和请求ID
    print(f'status code: {result.status_code}, request id: {result.request_id}')

if __name__ == "__main__":
    main()

查看数据复制规则

以下代码用于查看Bucket的数据复制规则。

import argparse
import alibabacloud_oss_v2 as oss

# 创建一个命令行参数解析器
parser = argparse.ArgumentParser(description="get bucket replication sample")
# 添加必需的命令行参数
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')

def main():
    # 解析命令行参数
    args = parser.parse_args()

    # 从环境变量中加载访问凭证
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 使用SDK的默认配置
    cfg = oss.config.load_default()
    # 设置访问凭证提供者
    cfg.credentials_provider = credentials_provider
    # 设置区域
    cfg.region = args.region
    # 如果提供了endpoint,则设置endpoint到配置中
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 创建OSS客户端
    client = oss.Client(cfg)

    # 获取存储空间的复制规则
    result = client.get_bucket_replication(oss.GetBucketReplicationRequest(
            bucket=args.bucket,
    ))

    # 打印响应的基本信息
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' replication configuration: {result.replication_configuration}')

    # 如果存在数据复制规则,则打印详细的复制规则信息
    if result.replication_configuration.rules:
        for r in result.replication_configuration.rules:
            print(f'result: source selection criteria: {r.source_selection_criteria}, '
                  f'rtc: {r.rtc}, destination: {r.destination}, '
                  f'historical object replication: {r.historical_object_replication}, '
                  f'sync role: {r.sync_role}, status: {r.status}, '
                  f'encryption configuration: {r.encryption_configuration}, '
                  f'id: {r.id}, prefix set: {r.prefix_set}, action: {r.action}')

if __name__ == "__main__":
    main()

设置数据复制时间控制(RTC)

以下代码用于为已有的跨区域复制规则开启或关闭数据复制时间控制(RTC)功能。

import argparse
import alibabacloud_oss_v2 as oss

# 创建一个命令行参数解析器
parser = argparse.ArgumentParser(description="put bucket rtc sample")

# 添加必需的命令行参数
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
parser.add_argument('--status', help='Specifies whether to enable RTC. Valid values: disabled, enabled', default='disabled')
parser.add_argument('--rule_id', help='The ID of the data replication rule for which you want to configure RTC.', required=True)

def main():
    # 解析命令行参数
    args = parser.parse_args()

    # 从环境变量中加载访问凭证
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 使用SDK的默认配置
    cfg = oss.config.load_default()
    # 设置访问凭证提供者
    cfg.credentials_provider = credentials_provider
    # 设置区域
    cfg.region = args.region
    # 如果提供了endpoint,则设置endpoint到配置中
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 创建OSS客户端
    client = oss.Client(cfg)

    # 配置RTC(Replication Time Control)并将其应用于指定的存储空间
    result = client.put_bucket_rtc(oss.PutBucketRtcRequest(
            bucket=args.bucket,
            rtc_configuration=oss.RtcConfiguration(
                rtc=oss.ReplicationTimeControl(
                    status=args.status,
                ),
                id=args.rule_id,
            ),
    ))

    # 打印响应的基本信息
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id}')

if __name__ == "__main__":
    main()

查看可复制的目标地域

以下代码用于查看Bucket的数据可复制的目标地域列表。

import argparse
import alibabacloud_oss_v2 as oss

# 创建一个命令行参数解析器
parser = argparse.ArgumentParser(description="get bucket replication location sample")
# 添加必需的命令行参数
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')


def main():
    # 解析命令行参数
    args = parser.parse_args()

    # 从环境变量中加载访问凭证
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 使用SDK的默认配置
    cfg = oss.config.load_default()
    # 设置访问凭证提供者
    cfg.credentials_provider = credentials_provider
    # 设置区域
    cfg.region = args.region
    # 如果提供了endpoint,则设置endpoint到配置中
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 创建OSS客户端
    client = oss.Client(cfg)

    # 获取存储空间的复制位置信息
    result = client.get_bucket_replication_location(oss.GetBucketReplicationLocationRequest(
            bucket=args.bucket,
    ))

    # 打印响应的基本信息
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' replication location: {result.replication_location},'
          f' location transfer type constraint: {result.replication_location.location_transfer_type_constraint},'
          # f' location: {result.replication_location.location_transfer_type_constraint.location_transfer_types[0].location},'
          # f' transfer types: {result.replication_location.location_transfer_type_constraint.location_transfer_types[0].transfer_types},'
          # f' location: {result.replication_location.location_transfer_type_constraint.location_transfer_types[1].location},'
          # f' transfer types: {result.replication_location.location_transfer_type_constraint.location_transfer_types[1].transfer_types},'
          f' locationrtc constraint: {result.replication_location.locationrtc_constraint},'
    )

    # 如果存在位置传输类型约束信息,则打印详细的位置和传输类型信息
    if result.replication_location.location_transfer_type_constraint.location_transfer_types:
        for r in result.replication_location.location_transfer_type_constraint.location_transfer_types:
            print(f'result: location: {r.location}, transfer types: {r.transfer_types}')

if __name__ == "__main__":
    main()

查看数据复制进度

说明

数据复制进度分为历史数据复制进度和新写入数据复制进度。

  • 历史数据复制进度用百分比表示,仅对开启了历史数据复制的存储空间有效。

  • 新写入数据复制进度用新写入数据的时间点表示,代表这个时间点之前的数据已复制完成。

以下代码用于查看Bucket中指定规则ID的数据复制进度。

import argparse
import alibabacloud_oss_v2 as oss

# 创建一个命令行参数解析器
parser = argparse.ArgumentParser(description="get bucket replication progress sample")
# 添加必需的命令行参数
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
parser.add_argument('--rule_id', help='The ID of the data replication rule for which you want to configure RTC.', required=True)

def main():
    # 解析命令行参数
    args = parser.parse_args()

    # 从环境变量中加载访问凭证
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 使用SDK的默认配置
    cfg = oss.config.load_default()
    # 设置访问凭证提供者
    cfg.credentials_provider = credentials_provider
    # 设置区域
    cfg.region = args.region
    # 如果提供了endpoint,则设置endpoint到配置中
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 创建OSS客户端
    client = oss.Client(cfg)

    # 获取存储空间的复制进度
    result = client.get_bucket_replication_progress(oss.GetBucketReplicationProgressRequest(
            bucket=args.bucket,
            rule_id=args.rule_id,
    ))

    # 打印响应的基本信息
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' replication progress: {result.replication_progress}')

    # 如果存在数据复制规则,则打印详细的复制规则信息
    if result.replication_progress.rules:
        for r in result.replication_progress.rules:
            print(f'result: historical object replication: {r.historical_object_replication}, '
                  f'progress: {r.progress}, '
                  f'id: {r.id}, '
                  f'prefix set: {r.prefix_set}, '
                  f'action: {r.action}, '
                  f'destination: {r.destination}, '
                  f'status: {r.status}')

if __name__ == "__main__":
    main()

关闭数据复制

通过删除存储空间的复制规则,您可以关闭源存储空间到目标存储空间的数据复制关系。

以下代码用于删除Bucket中指定规则ID的数据复制关系。

import argparse
import alibabacloud_oss_v2 as oss

# 创建一个命令行参数解析器
parser = argparse.ArgumentParser(description="delete bucket replication sample")

# 添加必需的命令行参数
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
parser.add_argument('--rule_id', help='The ID of the data replication rule for which you want to configure RTC.', required=True)

def main():
    # 解析命令行参数
    args = parser.parse_args()

    # 从环境变量中加载访问凭证
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 使用SDK的默认配置
    cfg = oss.config.load_default()

    # 设置访问凭证提供者
    cfg.credentials_provider = credentials_provider

    # 设置区域
    cfg.region = args.region

    # 如果提供了endpoint,则设置endpoint到配置中
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 创建OSS客户端
    client = oss.Client(cfg)

    # 删除存储空间的复制规则
    result = client.delete_bucket_replication(oss.DeleteBucketReplicationRequest(
            bucket=args.bucket,
            replication_rules=oss.ReplicationRules(
                ids=[args.rule_id],
            ),
    ))

    # 打印响应的基本信息
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},')

if __name__ == "__main__":
    main()
  • 本页导读 (1)
  • 注意事项
  • 示例代码
  • 开启数据复制
  • 查看数据复制规则
  • 设置数据复制时间控制(RTC)
  • 查看可复制的目标地域
  • 查看数据复制进度
  • 关闭数据复制