云下及他云数据库备份管理

数据灾备(DBS)除支持阿里云数据库和阿里云ECS自建数据库的灾备外,还支持对云下及其他云平台数据库进行灾备。

注意事项

若备份库表存在表结构不合理、大表、大字段等情况,备份实例的规格过小可能会导致后续备份实例资源不足,从而引发备份异常。因此,建议您在创建时选择较高规格的备份实例,以免后续备份出现异常。

操作步骤

产品自动备份

购买备份实例(逻辑备份)

  1. 登录数据管理DMS 5.0
  2. 在顶部菜单栏中,选择安全与规范(DBS) > 数据灾备(DBS) > 灾备数据源

    说明

    若您使用的是极简模式的控制台,请单击控制台左上角的2023-01-28_15-57-17.png图标,选择全部功能 > 安全与规范(DBS) > 数据灾备(DBS) > 灾备数据源

  3. 在上方选择地域,在云下及他云数据库 > 产品自动备份页签下,单击目标数据源ID进入数据源详情页。

  4. 备份策略页面中,单击配置备份策略

  5. 选择备份计划页面单击购买备份计划,进入DBS售卖页。

  6. 配置如下参数,单击页面右下角的立即购买

    配置项

    说明

    商品类型

    请选择备份实例(包年包月)

    备份实例地域

    选择要存放备份数据的地域。

    说明

    请确保备份实例所在地域与ECS实例所在地域相同。

    数据源类型

    当前仅支持MySQL

    规格

    规格越高,备份与恢复的性能越高,支持的规格为:serverless(仅MySQL逻辑备份支持)、micro(入门型)、small(低配型)、medium(中配型)、large(高配型)、xlarge(高配型-无流量上限)。

    说明
    • 如果数据库实例(例如生产环境的数据库)需要高性能的备份实例快速执行备份与恢复任务,建议选择xlarge或large规格,获取更高的备份恢复性能。

    • 对备份恢复性能(速度)要求不高,您可以通过计算,选择性价比最高的备份实例规格。更多信息,请参见如何选择备份实例规格

    • 若备份库表存在表结构不合理、大表、大字段等情况,备份实例的规格过小可能会导致后续备份实例资源不足,从而引发备份异常。因此,建议您在创建时选择较高规格的备份实例,以免后续备份出现异常。

    备份方式

    请选择逻辑备份

    存储空间

    您创建时无需选择容量,后续根据实际存入DBS内置存储中的数据量计费。计费详情,请参见存储费用

    资源组

    配置资源组。选择默认或自定义的资源组,方便备份实例管理。

    购买数量

    按需选择购买数量,多个数据库实例需要创建多个备份实例,例如您希望备份数据库实例A与数据库实例B,需要购买2个备份实例。

    购买时长

    选择该备份实例的购买时长。

  7. 确认订单页面,确认订单信息,阅读并选中服务协议,单击去支付

    支付成功后,请返回步骤5的选择备份计划页面,单击已完成支付,即可查看到已创建的备份实例。

    image

配置备份策略

  1. 选择备份计划页面选中待配置的备份实例,并单击下一步

    image

  2. 选择库表页面,选中需要备份的库或表,单击image移动到已选择对象框中,单击提交

    image..png

  3. 提交成功后,您可以在备份策略页面的逻辑备份页签下,单击启动按钮启动备份。

    单击启动后系统会立即发起一次全量和增量备份任务。

    image

    说明

    如果您期望先完成一些其他操作(例如修改备份策略等),您也可以选择当前暂不启动备份,但后续系统会根据备份策略在备份时间自动启动备份。

用户自动备份

重要
  • 目前仅支持MySQL 5.5版本的数据源。

  • 目前仅支持华东1(杭州)地域。

配置备份源以及上传备份文件

  1. 登录数据管理DMS 5.0
  2. 在顶部菜单栏中,选择安全与规范(DBS) > 数据灾备(DBS) > 灾备数据源

    说明

    若您使用的是极简模式的控制台,请单击控制台左上角的2023-01-28_15-57-17.png图标,选择全部功能 > 安全与规范(DBS) > 数据灾备(DBS) > 灾备数据源

  3. 在上方选择地域,在云下及他云数据库页签下,根据数据源类型选择新增入口。

  4. 单击新增数据源,在弹出的对话框中,配置如下信息,并选中目标备份计划,单击页面右下角下一步

    image

    配置项

    说明

    数据源名称

    建议配置具有业务意义的名称,便于后续识别。

    引擎类型

    数据库引擎类型,当前仅支持MySQL。

    引擎版本

    选择数据库引擎的版本。

    引擎参数

    {"lower_case_table_names":1}

    若没有可选备份计划,单击购买备份计划,进入DBS售卖页,购买备份计划。

    说明

    商品类型

    请选择备份实例(包年包月),不支持按量付费。

    备份实例地域

    选择要存放备份数据的地域。

    数据源类型

    选择MySQL

    规格

    选择xmicro。该规格提供的免费数据量额度等,请参见备份计划规格

    备份方式

    选择物理备份

    存储空间

    您创建时无需选择容量,后续根据实际存入DBS内置存储中的数据量计费。计费详情,请参见存储费用

    资源组

    配置资源组。选择默认或自定义的资源组,方便备份实例管理。

    购买数量

    按需选择购买数量,多个数据库实例需要创建多个备份实例,例如您希望备份数据库实例A与数据库实例B,需要购买2个备份实例。

    购买时长

    选择该备份实例的购买时长。

  5. 将备份集上传到指定Bucket中。上传方法,请参见用户自动备份数据上传指南

  6. 上传完成后,单击确定

查看备份信息

用户自动备份页签,单击目标数据源ID。

image

image

配置备份策略

  1. 用户自动备份页签,单击目标数据源操作列的查看备份策略

    image

    image

  2. 配置完成之后,单击确定

看和下载备份数

  1. 用户自动备份页签,单击目标数据源ID。

  2. 单击左侧导航栏中的备份数据

    说明

    用户执行上传数据脚本并完成数据源新建后,当用户新产生一个备份集数据,数据备份管理系统会同步这个备份数据到备份数据页面。

    image

  3. 单击目标备份集操作列的下载按钮获取下载链接,将备份集下载到本地。

    image

创建恢复任务

说明

恢复任务要求数据备份页面上必须存在已生成的备份集,并且该备份集的状态需为完成

  1. 用户自动备份页签,单击目标数据源ID。

  2. 单击左侧导航栏中的备份数据,单击创建恢复任务,手动配置恢复参数。

    image

    参数名称

    说明

    恢复任务名称

    请输入本次恢复任务的名称。建议配置具有业务意义的名称,便于后续识别。

    恢复位置

    选择恢复实例位置,默认为恢复到RDS新实例。

    数据库所在位置

    选择数据库所在位置,默认为RDS。

    实例地区

    选择实例所在地区,目前仅支持华东1(杭州)地区。

    VPC

    选择创建的专有网络。

    VSwitch

    选择创建的交换机。

    实例系列

    选择恢复的实例系列。

    实例规格

    选择恢复的实例规格。

    存储空间

    选择所需存储空间。

    恢复方式

    选择恢复方式,目前仅支持按时间点恢复。

    还原时间

    选择还原时间,可选择恢复时间范围请参考恢复方式参数后的数据。

  3. 配置完成之后单击提交,会生成一个按所选还原时间点恢复的恢复任务,该任务信息将以列表的形式展示在恢复任务页面。

查看恢复任务

  1. 用户自动备份页签,单击目标数据源ID。

  2. 单击左侧导航栏中的任务管理 > 恢复任务

    image

  3. 点击恢复结果列中的实例ID,可跳转至恢复的RDS实例基本信息页面。

查看恢复演练

  1. 用户自动备份页签,单击目标数据源ID。

  2. 单击左侧导航栏中的恢复演练

    恢复演练概览指标

    查看恢复演练概览,包含恢复任务成功率恢复时长均值数据备份演练覆盖率日志备份演练覆盖率

    image

    名称

    说明

    恢复任务成功率

    在恢复时间点在筛选时间范围内的恢复任务成功率。

    恢复时长均值

    在筛选时间范围内恢复成功任务的平均耗时。

    数据备份演练覆盖率

    在筛选时间范围内数据备份在开源MySQL或RDS MySQL上执行过恢复演练的覆盖率。

    日志备份演练覆盖率

    在筛选时间范围内的日志备份在开源MySQL或RDS MySQL上执行过恢复演练的覆盖率。

    恢复演练时间轴

    通过时间轴展示在筛选时间范围内的各个时间点的恢复演练详情,在时间轴上点击鼠标左键可以查看当前时间点的演练信息。

    image

    备份数据恢复演练详情

    单击数据备份,查看数据备份恢复演练详情列表。单击演练结果列实例ID,进入恢复的rds 实例的基本信息页面。

    image

    单击日志备份,可以查看日志备份恢复演练详情列表。

    image

用户自动备份数据上传指南

准备工作

  • 提前配置数据源,获得数据源ID,如何配置数据源请参见添加数据源

  • 创建一个RAM用户并授予管理对应产品的权限,准备好AccessKeyId和AccessKeySecret信息。具体操作,请参见创建RAM用户为RAM用户授权

环境依赖

  • 命令工具:Bash、Python3。

  • Python相关:oss2、alibabacloud_openapi_util、alibabacloud_tea_openapi、alibabacloud_tea_util。

## 安装阿里云OpenAPI SDK
pip3 install --upgrade pip
pip3 install -i https://mirrors.aliyun.com/pypi/simple/ oss2 alibabacloud_openapi_util alibabacloud_tea_openapi alibabacloud_tea_util

完整上传脚本

备份数据上传脚本分为两部分:

Bash脚本

脚本执行处理入口,按需替换实际执行的参数。

boot_backup.sh:负责模拟本地xtrabackup全量备份的流程。

#!/bin/bash
## 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维
## 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险,您可以根据业务需要,保存到配置文件里

AK=<ALIBABA_CLOUD_ACCESS_KEY_ID>
SK=<ALIBABA_CLOUD_ACCESS_KEY_SECRET>
DBS_API=dbs-api.<您的数据源所在地域ID,例如cn-hangzhou>.aliyuncs.com
DATASOURCE_ID=<您的数据源ID>

## 获取脚本当前路径
BASE_PATH=$(cd `dirname $0`; pwd)

TS=`date +%Y%m%d_%H%M%S`
XTRA_DATA_DIR=~/tool/xtrabackup_data/$TS

mkdir -p $XTRA_DATA_DIR

## 执行xtrabackup备份命令,将错误日志输出的到xtrabackup.log文件
~/innobackupex --defaults-file=/etc/my.cnf --user='root' --password='root' --host='localhost' --port=3306 --socket='/var/lib/mysql/mysql.sock' --parallel=4 $XTRA_DATA_DIR/ 2>$XTRA_DATA_DIR/xtrabackup.log

## 获取当前xtrabackup备份的目录
backup_dir=`ls $XTRA_DATA_DIR | grep -v xtrabackup.log | head -n1`
echo -e "\033[33mexecute innobackupex success, backup_dir: $backup_dir" && echo -n -e "\033[0m" && chmod 755 $XTRA_DATA_DIR/$backup_dir

## 打包成tar.gz文件
cd $XTRA_DATA_DIR/$backup_dir && tar -czvf ../$backup_dir.tar.gz . && file ../$backup_dir.tar.gz
echo -e "\033[33mpackage to $backup_dir.tar.gz" && echo -n -e "\033[0m" && sleep 2

## 调用Python脚本进行上传备份数据,并注册备份集元数据
python3 $BASE_PATH/upload_and_register_backup_set.py --access_key_id $AK --access_key_secret $SK --endpoint $DBS_API --datasource_id $DATASOURCE_ID --region_code=cn-hangzhou --data_type=FullBackup --file_path=$XTRA_DATA_DIR/$backup_dir.tar.gz --xtrabackup_log_path=$XTRA_DATA_DIR/xtrabackup.log

Python脚本

  • upload_and_register_backup_set.py:用于上传全量和日志备份数据,解析对应的元数据,并进行备份集元数据的注册。

    # -*- coding: utf-8 -*-
    # This file is auto-generated, don't edit it. Thanks.
    import os
    import argparse
    import re
    import time
    import json
    from datetime import datetime
    
    import oss2
    from alibabacloud_openapi_util.client import Client as OpenApiUtilClient
    from alibabacloud_tea_openapi import models as open_api_models
    from alibabacloud_tea_openapi.client import Client as OpenApiClient
    from alibabacloud_tea_util import models as util_models
    
    import xtrabackup_info_parser
    import xtrabackup_log_parser
    
    def init_command_args():
        parser = argparse.ArgumentParser(description="A sample command-line parser.")
        parser.add_argument("--access_key_id", help="Aliyun AccessKeyId.")
        parser.add_argument("--access_key_secret", help="Aliyun AccessKeySecret.")
        parser.add_argument("--endpoint", help="Aliyun API Endpoint.")
        parser.add_argument("--region_code", help="Aliyun DataSource RegionCode.")
        parser.add_argument("--datasource_id", help="Aliyun DataSourceId.")
        parser.add_argument("--data_type", help="BackupSet DataType: FullBackup | LogBackup.")
        parser.add_argument("--file_path", help="BackupSet File Path.")
        parser.add_argument("--xtrabackup_info_path", help="Xtrabackup Info Path.")
        parser.add_argument("--xtrabackup_log_path", help="Xtrabackup Log Path.")
        parser.add_argument("--begin_time", help="Binlog Begin Time.")
        parser.add_argument("--end_time", help="Binlog End Time.")
    
        args = parser.parse_args()
        if args.access_key_id:
            print(f"access_key_id: ************")
        if args.access_key_secret:
            print(f"access_key_secret: ************")
        if args.endpoint:
            print(f"endpoint: {args.endpoint}")
        if args.region_code:
            print(f"region_code: {args.region_code}")
        if args.datasource_id:
            print(f"datasource_id: {args.datasource_id}")
        if args.data_type:
            print(f"data_type: {args.data_type}")
        if args.file_path:
            print(f"file_path: {args.file_path}")
        if args.xtrabackup_info_path:
            print(f"xtrabackup_info_path: {args.xtrabackup_info_path}")
        if args.xtrabackup_log_path:
            print(f"xtrabackup_log_path: {args.xtrabackup_log_path}")
        if args.begin_time:
            print(f"begin_time: {args.begin_time}")
        if args.end_time:
            print(f"end_time: {args.end_time}")
    
        print('\n')
        return args
    
    
    def date_to_unix_timestamp(date_str):
        dt_obj = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
        # 使用.time()方法得到时间元组,然后用time.mktime转换为秒级时间戳
        timestamp_seconds = time.mktime(dt_obj.timetuple())
        return int(timestamp_seconds) * 1000
    
    
    def create_oss_client(params):
        # 阿里云OSS认证信息
        access_key_id = params['AccessKeyId']
        access_key_secret = params['AccessKeySecret']
        security_token = params['SecurityToken']
        bucket_name = params['BucketName']
        endpoint = params['OssEndpoint']
    
        # 初始化OSS客户端
        auth = oss2.StsAuth(access_key_id, access_key_secret, security_token)
        return oss2.Bucket(auth, endpoint, bucket_name)
    
    
    def upload_oss_file(oss_client, file_path, object_key):
        """
        分片上传大文件到OSS
        :param oss_client:
        :param file_path: 本地文件路径
        :param object_key: OSS中的对象键,即文件名
        """
        # 设置分片大小,单位为字节,默认1MB
        part_size = 1024 * 1024 * 5
        # 初始化分片上传
        upload_id = oss_client.init_multipart_upload(object_key).upload_id
    
        # 打开文件并读取内容
        with open(file_path, 'rb') as file_obj:
            parts = []
            while True:
                data = file_obj.read(part_size)
                if not data:
                    break
                # 上传分片
                result = oss_client.upload_part(object_key, upload_id, len(parts) + 1, data)
                parts.append(oss2.models.PartInfo(len(parts) + 1, result.etag))
    
            # 完成分片上传
            oss_client.complete_multipart_upload(object_key, upload_id, parts)
    
    
    class OssUploader:
    
        def __init__(self, access_key_id, access_key_secret, endpoint, region_code, datasource_id):
            self.access_key_id = access_key_id
            self.access_key_secret = access_key_secret
            self.endpoint = endpoint
            self.region_code = region_code
            self.datasource_id = datasource_id
    
            config = open_api_models.Config(access_key_id, access_key_secret)
            # Endpoint 请参考 https://api.aliyun.com/product/Rds
            config.endpoint = endpoint
            self.client = OpenApiClient(config)
    
        """
        注册备份集元数据
        """
        def configure_backup_set_info(self, req_param):
            params = open_api_models.Params(
                # 接口名称,
                action='ConfigureBackupSetInfo',
                # 接口版本,
                version='2021-01-01',
                # 接口协议,
                protocol='HTTPS',
                # 接口 HTTP 方法,
                method='POST',
                auth_type='AK',
                style='RPC',
                # 接口 PATH,
                pathname='/',
                # 接口请求体内容格式,
                req_body_type='json',
                # 接口响应体内容格式,
                body_type='json'
            )
            # runtime options
            runtime = util_models.RuntimeOptions()
            request = open_api_models.OpenApiRequest(
                query=OpenApiUtilClient.query(req_param)
            )
            # 返回值为 Map 类型,可从 Map 中获得三类数据:响应体 body、响应头 headers、HTTP 返回的状态码 statusCode。
            print(f"ConfigureBackupSetInfo request: {req_param}")
            data = self.client.call_api(params, request, runtime)
    
            print(f"ConfigureBackupSetInfo response: {data}")
            return data['body']['Data']
    
        """
        获取oss上传信息
        """
        def describe_bak_datasource_storage_access_info(self, req_param):
            params = open_api_models.Params(
                # 接口名称,
                action='DescribeBakDataSourceStorageAccessInfo',
                # 接口版本,
                version='2021-01-01',
                # 接口协议,
                protocol='HTTPS',
                # 接口 HTTP 方法,
                method='POST',
                auth_type='AK',
                style='RPC',
                # 接口 PATH,
                pathname='/',
                # 接口请求体内容格式,
                req_body_type='json',
                # 接口响应体内容格式,
                body_type='json'
            )
            # runtime options
            runtime = util_models.RuntimeOptions()
            request = open_api_models.OpenApiRequest(
                query=OpenApiUtilClient.query(req_param)
            )
            # 返回值为 Map 类型,可从 Map 中获得三类数据:响应体 body、响应头 headers、HTTP 返回的状态码 statusCode。
            print(f"DescribeBakDataSourceStorageAccessInfo request: {req_param}")
            data = self.client.call_api(params, request, runtime)
    
            print(f"DescribeBakDataSourceStorageAccessInfo response: {data}")
            return data['body']['Data']
    
        def _fetch_oss_access_info(self, params):
            info = self.describe_bak_datasource_storage_access_info({
                'RegionId': params['RegionId'],
                'DataSourceId': params['DataSourceId'],
                'RegionCode': params['RegionCode'],
                'BackupType': params['BackupType'],
                'BackupSetId': params['BackupSetId']
            })
            return info['OssAccessInfo']
    
        def upload_and_register_backup_set(self, file_path, data_type, extra_meta):
            filename = os.path.basename(file_path)
            params = {'BackupMode': 'Automated', 'BackupMethod': 'Physical', 'RegionId': self.region_code,
                      'RegionCode': self.region_code, 'DataSourceId': self.datasource_id, 'BackupSetName': filename,
                      'ExtraMeta': extra_meta, 'BackupType': data_type, 'UploadStatus': 'WaitingUpload'}
    
            # 首次注册备份集,返回备份集ID
            data = self.configure_backup_set_info(params)
            params['BackupSetId'] = data['BackupSetId']
            print(f"------ configure_backup_set_info success: {file_path}, {data_type}, {params['BackupSetId']}, WaitingUpload\n")
    
            # 上传数据到oss
            oss_info = self._fetch_oss_access_info(params)
            oss_client = create_oss_client(oss_info)
            upload_oss_file(oss_client, file_path, oss_info['ObjectKey'])
            print(f"------ upload_oss_file success: {file_path}, {data_type}, {params['BackupSetId']}\n")
    
            # 标记备份集上传完成
            params['UploadStatus'] = 'UploadSuccess'
            self.configure_backup_set_info(params)
            print(f"------ configure_backup_set_info success: {file_path}, {data_type}, {params['BackupSetId']}, UploadSuccess\n")
    
    
    if __name__ == '__main__':
        args = init_command_args()
        uploader = OssUploader(args.access_key_id, args.access_key_secret,
                               args.endpoint, args.region_code, args.datasource_id)
    
        # 全量和日志备份分别通过不同方式构造extraMeta
        extra_meta = '{}'
        if args.data_type == 'FullBackup':
            obj = {}
            if args.xtrabackup_log_path is not None:
                obj = xtrabackup_log_parser.analyze_slave_status(logpath=args.xtrabackup_log_path)
            elif args.xtrabackup_info_path is not None:
                parser = xtrabackup_info_parser.ExtraMetaParser(file_path=args.xtrabackup_info_path)
                obj = parser.get_extra_meta()
            extra_meta = {'BINLOG_FILE': obj.get('BINLOG_FILE'),
                          'version': obj.get("SERVER_VERSION"),
                          'dataBegin': date_to_unix_timestamp(obj.get("START_TIME")),
                          'dataEnd': date_to_unix_timestamp(obj.get("END_TIME")),
                          'consistentTime': int(date_to_unix_timestamp(obj.get("END_TIME")) / 1000)}
            extra_meta = json.dumps(extra_meta)
    
        elif args.data_type == 'LogBackup':
            obj = {'dataBegin': date_to_unix_timestamp(args.begin_time),
                   'dataEnd': date_to_unix_timestamp(args.end_time)}
            extra_meta = json.dumps(obj)
        print(f"get extra meta json string: {extra_meta}")
    
        # 上传数据,并注册备份集元信息
        uploader.upload_and_register_backup_set(file_path=args.file_path, data_type=args.data_type, extra_meta=extra_meta)
  • xtrabackup_info_parser.py:使用备份文件xtrabackup_info进行元数据解析。

    # -*- coding: utf-8 -*-
    # This file is auto-generated, don't edit it. Thanks.
    import re
    import json
    
    
    class ExtraMetaParser:
        def __init__(self, file_path):
            self.file_path = file_path
            pass
    
        def _parse_xtrabackup_info(self):
            config_data = {}
            with open(self.file_path, 'r') as file:
                for line in file:
                    line = line.strip()
                    if line and not line.startswith('#'):
                        key, value = line.split('=', 1)
                        config_data[key.strip()] = value.strip()
            return config_data
    
        def get_extra_meta(self):
            config_data = self._parse_xtrabackup_info()
            print(f"xtrabackup_info: {config_data}")
    
            binlog_pos = config_data.get("binlog_pos")
            pattern = f"filename '(.*)', position (.*)"
            match = re.search(pattern, binlog_pos)
    
            return {'BINLOG_FILE': match.group(1),
                    'SERVER_VERSION': config_data.get("server_version"),
                    'START_TIME': config_data.get("start_time"),
                    'END_TIME': config_data.get("end_time")}
  • xtrabackup_log_parser.py:使用备份产生的日志文件xtrabackup.log进行元数据解析。

    #!/usr/bin/python
    # coding:utf8
    
    import io
    import re
    import sys
    from datetime import datetime
    
    from six import binary_type, text_type
    
    
    def parse_date_part(date_part, time_part):
        """
        根据给定的日期部分和时间部分,解析并返回完整的日期时间字符串。
        """
        # 获取当前年份的前两位
        current_century = datetime.now().strftime("%Y")[:2]
    
        year_short = date_part[:2]
        # 组合完整年份
        year_full = current_century + year_short
        date_full = year_full + date_part[2:]
        datetime_str = date_full + " " + time_part
    
        dt = datetime.strptime(datetime_str, "%Y%m%d %H:%M:%S")
        formatted_datetime = dt.strftime("%Y-%m-%d %H:%M:%S")
        return text_type(formatted_datetime)
    
    
    def analyze_slave_status(logpath=None):
        slave_status = {}
        start_time_pattern = (
            r"(\d{6}) (\d{2}:\d{2}:\d{2}) .*Connecting to MySQL server host:"
        )
        """
            240925 17:46:58 completed OK!
            240925 02:22:58 innobackupex: completed OK!
            240925 02:22:58 xtrabackup: completed OK!
        """
        end_time_pattern = r"(\d{6}) (\d{2}:\d{2}:\d{2}) .*completed OK!"
        with io.open(logpath, "rb") as fp:
            lines = fp.read().splitlines()
            for i in reversed(range(len(lines))):
                line = lines[i]
                if isinstance(line, binary_type):
                    line = line.decode("utf-8")
    
                m = re.search(start_time_pattern, line)
                if m:
                    # 提取日期和时间部分
                    date_part = m.group(1)
                    time_part = m.group(2)
                    slave_status["START_TIME"] = parse_date_part(date_part, time_part)
                    continue
    
                m = re.search(r"Using server version (\S*)", line)
                if m:
                    slave_status["SERVER_VERSION"] = text_type(m.group(1))
                    continue
    
                m = re.search("MySQL binlog position:", line)
                if m:
                    binlog_line = line
                    m = re.search(r"filename '(\S*)'", binlog_line)
                    if m:
                        slave_status["BINLOG_FILE"] = text_type(m.group(1))
                    m = re.search(r"position (\d+)", binlog_line)
                    m2 = re.search(r"position '(\d+)'", binlog_line)
                    if m:
                        try:
                            slave_status["BINLOG_POS"] = int(m.group(1))
                        except ValueError:
                            pass
                    elif m2:
                        try:
                            slave_status["BINLOG_POS"] = int(m2.group(1))
                        except ValueError:
                            pass
                    continue
    
                m = re.search("consistent_time (\d+)", line)
                if m:
                    try:
                        slave_status["CONSISTENT_TIME"] = int(m.group(1))
                    except ValueError:
                        pass
                    continue
    
                m = re.search(end_time_pattern, line)
                if m:
                    date_part = m.group(1)
                    time_part = m.group(2)
                    slave_status["END_TIME"] = parse_date_part(date_part, time_part)
                    continue
    
        return slave_status
    
    if __name__ == "__main__":
        logpath = sys.argv[1]
        slave_status = analyze_slave_status(logpath)
        print(slave_status)
    

Bash脚本处理流程

  1. 配置阿里云AccessKeyId和AccessKeySecret、DBS OpenAPI Endpoint,以及对应的数据源ID。

    #!/bin/bash
    ## 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维
    ## 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险,您可以根据业务需要,保存到配置文件里
    
    AK=<ALIBABA_CLOUD_ACCESS_KEY_ID>
    SK=<ALIBABA_CLOUD_ACCESS_KEY_SECRET>
    DBS_API=dbs-api.<您的数据源所在地域ID,例如cn-hangzhou>.aliyuncs.com
    DATASOURCE_ID=<您的数据源ID>
  2. 执行xtrabackup备份命令,将备份数据保存到指定的目录,同时将错误日志输出到xtrabackup.log文件中。

    ## 获取脚本当前路径
    BASE_PATH=$(cd `dirname $0`; pwd)
    
    TS=`date +%Y%m%d_%H%M%S`
    XTRA_DATA_DIR=~/tool/xtrabackup_data/$TS
    
    mkdir -p $XTRA_DATA_DIR
    
    ## 执行xtrabackup备份命令,将错误日志输出的到xtrabackup.log文件
    ~/innobackupex --defaults-file=/etc/my.cnf --user='<您的数据库账号,例如root>' --password='<您的数据库密码,例如root>' --host='<您的数据库IP,例如localhost>' --port=<您的数据库端口号,例如3306> --socket='/var/lib/mysql/mysql.sock' --parallel=4 $XTRA_DATA_DIR/ 2>$XTRA_DATA_DIR/xtrabackup.log
    • 后续将解析xtrabackup.log文件获得全量备份集的元数据信息:备份开始时间、备份结束时间、一致性时间点、对应Binlog名称。

    • 限制说明:

      当前执行xtrabackup备份命令,暂不支持压缩和加密参数(--compress --compress-threads和--encrypt --encrypt-key-file --encrypt-threads),使用xtrabackup备份命令时请先删除对应的参数。

  3. 将文件目录形式的全量备份数据进行gzip压缩和打包成tar.gz格式的文件。

    ## 获取当前xtrabackup备份的目录
    backup_dir=`ls $XTRA_DATA_DIR | grep -v xtrabackup.log | head -n1`
    echo -e "\033[33mexecute innobackupex success, backup_dir: $backup_dir" && echo -n -e "\033[0m" && chmod 755 $XTRA_DATA_DIR/$backup_dir
    
    ## 打包成tar.gz文件
    cd $XTRA_DATA_DIR/$backup_dir && tar -czvf ../$backup_dir.tar.gz . && file ../$backup_dir.tar.gz
    echo -e "\033[33mpackage to $backup_dir.tar.gz" && echo -n -e "\033[0m" && sleep 2
    • 限制说明:当前仅支持文件格式的备份数据上传,支持如下格式:

      • tar:目录文件进行tar打包。

      • tar.gz:目录文件进行tar打包,然后进行gzip压缩。

    • 注意事项:

      • 使用tar命令打包前,需要chmod 755命令修改文件目录权限。

      • 需进入文件夹根目录,进行tar命令的打包和压缩。

  4. 调用Python脚本upload_and_register_backup_set.py进行备份数据的上传,同时注册备份集元数据。

    ## 调用Python脚本进行上传备份数据,并注册备份集元数据
    python3 $BASE_PATH/upload_and_register_backup_set.py --access_key_id $AK --access_key_secret $SK --endpoint $DBS_API --datasource_id $DATASOURCE_ID --region_code=cn-hangzhou --data_type=FullBackup --file_path=$XTRA_DATA_DIR/$backup_dir.tar.gz --xtrabackup_log_path=$XTRA_DATA_DIR/xtrabackup.log

    参数

    说明

    --access_key_id

    阿里云AccessKeyId。

    --access_key_secret

    阿里云AccessKeySecret。

    --endpoint

    DBS OpenAPI对应的Endpoint。请参见服务接入点

    --datasource_id

    DBS对应的数据源ID。

    --region_code

    对应地域信息。

    --data_type

    备份数据类型,全量备份集:FullBackup,日志备份:LogBackup。

    --file_path

    全量备份数据路径。

    --xtrabackup_log_path

    全量备份xtrabackup命令产生的xtrabackup.log文件路径。

Python脚本处理流程

  1. main函数主入口:

    根据data_type传参为FullBackug或者LogBackup,分别构造元数据注册依赖的extra_meta信息:

    说明
    • BINLOG_FILE:对应Binlog名称。

    • dataBegin:备份开始时间,毫秒级别时间戳。

    • dataEnd:备份结束时间,毫秒级别时间戳。

    • consistentTime:一致性时间点,秒级时间戳。

    • 全量备份extra_meta格式:

      {
        'BINLOG_FILE':'mysql-bin.001',
        'version':'5.5',
        'dataBegin':17274********,
        'dataEnd':17274********,
        'consistentTime':17274********
      }
    • 日志备份extra_meta格式:

      {
        'dataBegin':17274********,
        'dataEnd':17274********
      }
  2. 调用OssUploader.upload_and_register_backup_set方法进行备份数据上传和元数据注册。

    if __name__ == '__main__':
        args = init_command_args()
        uploader = OssUploader(args.access_key_id, args.access_key_secret,
                               args.endpoint, args.region_code, args.datasource_id)
    
        # 全量和日志备份分别通过不同方式构造extraMeta
        extra_meta = '{}'
        if args.data_type == 'FullBackup':
            obj = {}
            if args.xtrabackup_log_path is not None:
                obj = xtrabackup_log_parser.analyze_slave_status(logpath=args.xtrabackup_log_path)
            elif args.xtrabackup_info_path is not None:
                parser = xtrabackup_info_parser.ExtraMetaParser(file_path=args.xtrabackup_info_path)
                obj = parser.get_extra_meta()
            extra_meta = {'BINLOG_FILE': obj.get('BINLOG_FILE'),
                          'version': obj.get("SERVER_VERSION"),
                          'dataBegin': date_to_unix_timestamp(obj.get("START_TIME")),
                          'dataEnd': date_to_unix_timestamp(obj.get("END_TIME")),
                          'consistentTime': int(date_to_unix_timestamp(obj.get("END_TIME")) / 1000)}
            extra_meta = json.dumps(extra_meta)
    
        elif args.data_type == 'LogBackup':
            obj = {'dataBegin': date_to_unix_timestamp(args.begin_time),
                   'dataEnd': date_to_unix_timestamp(args.end_time)}
            extra_meta = json.dumps(obj)
        print(f"get extra meta json string: {extra_meta}")
    
        # 上传数据,并注册备份集元信息
        uploader.upload_and_register_backup_set(file_path=args.file_path, data_type=args.data_type, extra_meta=extra_meta)
  3. OssUploader.upload_and_register_backup_set方法备份数据上传流程。

    class OssUploader:
    
        def __init__(self, access_key_id, access_key_secret, endpoint, region_code, datasource_id):
            self.access_key_id = access_key_id
            self.access_key_secret = access_key_secret
            self.endpoint = endpoint
            self.region_code = region_code
            self.datasource_id = datasource_id
    
            config = open_api_models.Config(access_key_id, access_key_secret)
            # Endpoint 请参考 https://api.aliyun.com/product/Rds
            config.endpoint = endpoint
            self.client = OpenApiClient(config)
    
        """
        注册备份集元数据
        """
        def configure_backup_set_info(self, req_param):
            params = open_api_models.Params(
                # 接口名称,
                action='ConfigureBackupSetInfo',
                # 接口版本,
                version='2021-01-01',
                # 接口协议,
                protocol='HTTPS',
                # 接口 HTTP 方法,
                method='POST',
                auth_type='AK',
                style='RPC',
                # 接口 PATH,
                pathname='/',
                # 接口请求体内容格式,
                req_body_type='json',
                # 接口响应体内容格式,
                body_type='json'
            )
            # runtime options
            runtime = util_models.RuntimeOptions()
            request = open_api_models.OpenApiRequest(
                query=OpenApiUtilClient.query(req_param)
            )
            # 返回值为 Map 类型,可从 Map 中获得三类数据:响应体 body、响应头 headers、HTTP 返回的状态码 statusCode。
            print(f"ConfigureBackupSetInfo request: {req_param}")
            data = self.client.call_api(params, request, runtime)
    
            print(f"ConfigureBackupSetInfo response: {data}")
            return data['body']['Data']
    
        """
        获取oss上传信息
        """
        def describe_bak_datasource_storage_access_info(self, req_param):
            params = open_api_models.Params(
                # 接口名称,
                action='DescribeBakDataSourceStorageAccessInfo',
                # 接口版本,
                version='2021-01-01',
                # 接口协议,
                protocol='HTTPS',
                # 接口 HTTP 方法,
                method='POST',
                auth_type='AK',
                style='RPC',
                # 接口 PATH,
                pathname='/',
                # 接口请求体内容格式,
                req_body_type='json',
                # 接口响应体内容格式,
                body_type='json'
            )
            # runtime options
            runtime = util_models.RuntimeOptions()
            request = open_api_models.OpenApiRequest(
                query=OpenApiUtilClient.query(req_param)
            )
            # 返回值为 Map 类型,可从 Map 中获得三类数据:响应体 body、响应头 headers、HTTP 返回的状态码 statusCode。
            print(f"DescribeBakDataSourceStorageAccessInfo request: {req_param}")
            data = self.client.call_api(params, request, runtime)
    
            print(f"DescribeBakDataSourceStorageAccessInfo response: {data}")
            return data['body']['Data']
    
        def _fetch_oss_access_info(self, params):
            info = self.describe_bak_datasource_storage_access_info({
                'RegionId': params['RegionId'],
                'DataSourceId': params['DataSourceId'],
                'RegionCode': params['RegionCode'],
                'BackupType': params['BackupType'],
                'BackupSetId': params['BackupSetId']
            })
            return info['OssAccessInfo']
    
        def upload_and_register_backup_set(self, file_path, data_type, extra_meta):
            filename = os.path.basename(file_path)
            params = {'BackupMode': 'Automated', 'BackupMethod': 'Physical', 'RegionId': self.region_code,
                      'RegionCode': self.region_code, 'DataSourceId': self.datasource_id, 'BackupSetName': filename,
                      'ExtraMeta': extra_meta, 'BackupType': data_type, 'UploadStatus': 'WaitingUpload'}
    
            # 首次注册备份集,返回备份集ID
            data = self.configure_backup_set_info(params)
            params['BackupSetId'] = data['BackupSetId']
            print(f"------ configure_backup_set_info success: {file_path}, {data_type}, {params['BackupSetId']}, WaitingUpload\n")
    
            # 上传数据到oss
            oss_info = self._fetch_oss_access_info(params)
            oss_client = create_oss_client(oss_info)
            upload_oss_file(oss_client, file_path, oss_info['ObjectKey'])
            print(f"------ upload_oss_file success: {file_path}, {data_type}, {params['BackupSetId']}\n")
    
            # 标记备份集上传完成
            params['UploadStatus'] = 'UploadSuccess'
            self.configure_backup_set_info(params)
            print(f"------ configure_backup_set_info success: {file_path}, {data_type}, {params['BackupSetId']}, UploadSuccess\n")