Upload user automatic backup data

This topic describes how to upload local backup files to the corresponding data source when you use the user automatic backup feature of Data Disaster Recovery (DBS) to back up data to the cloud. It provides the complete upload scripts and explains how they work.

Preparations

  • Configure a data source in advance and obtain its data source ID. For details, see Add a data source.

  • Create a RAM user, grant it permissions to manage the relevant services, and prepare its AccessKeyId and AccessKeySecret. For details, see Create a RAM user and Grant permissions to a RAM user.

Limitations

Currently, only MySQL 5.5 data sources are supported.

Environment dependencies

  • Command-line tools: Bash, Python 3.

  • Python packages: oss2, alibabacloud_openapi_util, alibabacloud_tea_openapi, alibabacloud_tea_util.

## Install the Alibaba Cloud OpenAPI SDK
pip3 install --upgrade pip
pip3 install -i https://mirrors.aliyun.com/pypi/simple/ oss2 alibabacloud_openapi_util alibabacloud_tea_openapi alibabacloud_tea_util
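
Before running the scripts, you can verify the dependencies with a quick import check (an illustrative snippet, not part of the original tooling):

# Illustrative check: confirm the packages required by the upload scripts import cleanly.
import oss2
import alibabacloud_openapi_util
import alibabacloud_tea_openapi
import alibabacloud_tea_util

print("oss2 version:", oss2.__version__)
print("All DBS upload dependencies imported successfully.")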

Bash script processing flow

  1. Configure the Alibaba Cloud AccessKeyId and AccessKeySecret, the DBS OpenAPI endpoint, and the corresponding data source ID.

    #!/bin/bash
    ## An Alibaba Cloud account AccessKey grants access to all APIs and is high-risk. We strongly recommend that you create and use a RAM user for API access and routine O&M.
    ## Do not store the AccessKey ID and AccessKey secret in code; this risks credential leakage. You can store them in a configuration file as needed.
    
    AK=<ALIBABA_CLOUD_ACCESS_KEY_ID>
    SK=<ALIBABA_CLOUD_ACCESS_KEY_SECRET>
    DBS_API=dbs-api.<region ID of your data source, e.g. cn-hangzhou>.aliyuncs.com
    DATASOURCE_ID=<your data source ID>
  2. Run the xtrabackup backup command, saving the backup data to the specified directory and redirecting the error log to the xtrabackup.log file.

    ## Get the script's current path
    BASE_PATH=$(cd `dirname $0`; pwd)
    
    TS=`date +%Y%m%d_%H%M%S`
    XTRA_DATA_DIR=~/tool/xtrabackup_data/$TS
    
    mkdir -p $XTRA_DATA_DIR
    
    ## Run the xtrabackup backup command and redirect the error log to xtrabackup.log
    ~/innobackupex --defaults-file=/etc/my.cnf --user='<your database account, e.g. root>' --password='<your database password, e.g. root>' --host='<your database host, e.g. localhost>' --port=<your database port, e.g. 3306> --socket='/var/lib/mysql/mysql.sock' --parallel=4 $XTRA_DATA_DIR/ 2>$XTRA_DATA_DIR/xtrabackup.log
    • The xtrabackup.log file is parsed later to obtain the full backup set's metadata: backup start time, backup end time, consistency point, and the corresponding binlog name.

    • Limitation:

      The xtrabackup backup command does not currently support the compression and encryption parameters (--compress --compress-threads and --encrypt --encrypt-key-file --encrypt-threads); remove these parameters before running the command.

  3. Compress the directory-based full backup data with gzip and package it into a tar.gz file.

    ## Get the directory of the current xtrabackup backup
    backup_dir=`ls $XTRA_DATA_DIR | grep -v xtrabackup.log | head -n1`
    echo -e "\033[33mexecute innobackupex success, backup_dir: $backup_dir" && echo -n -e "\033[0m" && chmod 755 $XTRA_DATA_DIR/$backup_dir
    
    ## Package into a tar.gz file
    cd $XTRA_DATA_DIR/$backup_dir && tar -czvf ../$backup_dir.tar.gz . && file ../$backup_dir.tar.gz
    echo -e "\033[33mpackage to $backup_dir.tar.gz" && echo -n -e "\033[0m" && sleep 2
    • Limitation: only file-based backup data can be uploaded, in the following formats:

      • tar: the backup directory packaged with tar.

      • tar.gz: the backup directory packaged with tar and then compressed with gzip.

    • Notes:

      • Before packaging with tar, change the directory permissions with chmod 755.

      • Run the tar packaging and compression from the root of the backup directory. (An optional archive sanity check is sketched after this list.)

  4. Call the Python script upload_and_register_backup_set.py to upload the backup data and register the backup set metadata.

    ## Call the Python script to upload the backup data and register the backup set metadata
    python3 $BASE_PATH/upload_and_register_backup_set.py --access_key_id $AK --access_key_secret $SK --endpoint $DBS_API --datasource_id $DATASOURCE_ID --region_code=cn-hangzhou --data_type=FullBackup --file_path=$XTRA_DATA_DIR/$backup_dir.tar.gz --xtrabackup_log_path=$XTRA_DATA_DIR/xtrabackup.log

    Parameter                Description
    --access_key_id          The Alibaba Cloud AccessKeyId.
    --access_key_secret      The Alibaba Cloud AccessKeySecret.
    --endpoint               The DBS OpenAPI endpoint. See Service endpoints.
    --datasource_id          The DBS data source ID.
    --region_code            The region of the data source.
    --data_type              The backup data type: FullBackup for a full backup set, LogBackup for a log backup.
    --file_path              The path of the full backup data file.
    --xtrabackup_log_path    The path of the xtrabackup.log file produced by the xtrabackup command during the full backup.
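
As an optional sanity check before upload (not part of the original flow), you can confirm that the archive is a readable tar.gz whose entries are rooted at the top of the archive, as required above. A minimal Python sketch with a placeholder path:

# Hypothetical check: verify the tar.gz opens and its entries are relative,
# matching the packaging convention described in step 3.
import tarfile

archive = "/path/to/backup.tar.gz"  # placeholder path
with tarfile.open(archive, "r:gz") as tf:
    names = tf.getnames()
    print(f"{len(names)} entries; first few: {names[:5]}")
    assert all(not n.startswith("/") for n in names), "absolute paths found in archive"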

Python script processing flow

  1. The main entry point:

    Depending on whether the data_type argument is FullBackup or LogBackup, the script builds the extra_meta information that metadata registration depends on:

    Note
    • BINLOG_FILE: the corresponding binlog name.

    • dataBegin: the backup start time, as a millisecond timestamp.

    • dataEnd: the backup end time, as a millisecond timestamp.

    • consistentTime: the consistency point, as a second-level timestamp.

    • extra_meta format for a full backup:

      {
        'BINLOG_FILE':'mysql-bin.001',
        'version':'5.5',
        'dataBegin':17274********,
        'dataEnd':17274********,
        'consistentTime':17274********
      }
    • extra_meta format for a log backup:

      {
        'dataBegin':17274********,
        'dataEnd':17274********
      }
  2. Call the OssUploader.upload_and_register_backup_set method to upload the backup data and register the backup set metadata.

    if __name__ == '__main__':
        args = init_command_args()
        uploader = OssUploader(args.access_key_id, args.access_key_secret,
                               args.endpoint, args.region_code, args.datasource_id)
    
        # Full and log backups build extraMeta in different ways
        extra_meta = '{}'
        if args.data_type == 'FullBackup':
            obj = {}
            if args.xtrabackup_log_path is not None:
                obj = xtrabackup_log_parser.analyze_slave_status(logpath=args.xtrabackup_log_path)
            elif args.xtrabackup_info_path is not None:
                parser = xtrabackup_info_parser.ExtraMetaParser(file_path=args.xtrabackup_info_path)
                obj = parser.get_extra_meta()
            extra_meta = {'BINLOG_FILE': obj.get('BINLOG_FILE'),
                          'version': obj.get("SERVER_VERSION"),
                          'dataBegin': date_to_unix_timestamp(obj.get("START_TIME")),
                          'dataEnd': date_to_unix_timestamp(obj.get("END_TIME")),
                          'consistentTime': int(date_to_unix_timestamp(obj.get("END_TIME")) / 1000)}
            extra_meta = json.dumps(extra_meta)
    
        elif args.data_type == 'LogBackup':
            obj = {'dataBegin': date_to_unix_timestamp(args.begin_time),
                   'dataEnd': date_to_unix_timestamp(args.end_time)}
            extra_meta = json.dumps(obj)
        print(f"get extra meta json string: {extra_meta}")
    
        # Upload the data and register the backup set metadata
        uploader.upload_and_register_backup_set(file_path=args.file_path, data_type=args.data_type, extra_meta=extra_meta)
  3. The backup data upload flow of the OssUploader.upload_and_register_backup_set method (the response fields this flow consumes are summarized after this list).

    class OssUploader:
    
        def __init__(self, access_key_id, access_key_secret, endpoint, region_code, datasource_id):
            self.access_key_id = access_key_id
            self.access_key_secret = access_key_secret
            self.endpoint = endpoint
            self.region_code = region_code
            self.datasource_id = datasource_id
    
            config = open_api_models.Config(access_key_id, access_key_secret)
            # For the endpoint, see https://api.aliyun.com/product/Rds
            config.endpoint = endpoint
            self.client = OpenApiClient(config)
    
        """
        注册备份集元数据
        """
        def configure_backup_set_info(self, req_param):
            params = open_api_models.Params(
                # API name
                action='ConfigureBackupSetInfo',
                # API version
                version='2021-01-01',
                # API protocol
                protocol='HTTPS',
                # HTTP method
                method='POST',
                auth_type='AK',
                style='RPC',
                # API path
                pathname='/',
                # Request body content type
                req_body_type='json',
                # Response body content type
                body_type='json'
            )
            # runtime options
            runtime = util_models.RuntimeOptions()
            request = open_api_models.OpenApiRequest(
                query=OpenApiUtilClient.query(req_param)
            )
            # The return value is a Map containing three kinds of data: the response body (body), the response headers (headers), and the HTTP status code (statusCode).
            print(f"ConfigureBackupSetInfo request: {req_param}")
            data = self.client.call_api(params, request, runtime)
    
            print(f"ConfigureBackupSetInfo response: {data}")
            return data['body']['Data']
    
        """
        获取oss上传信息
        """
        def describe_bak_datasource_storage_access_info(self, req_param):
            params = open_api_models.Params(
                # API name
                action='DescribeBakDataSourceStorageAccessInfo',
                # API version
                version='2021-01-01',
                # API protocol
                protocol='HTTPS',
                # HTTP method
                method='POST',
                auth_type='AK',
                style='RPC',
                # API path
                pathname='/',
                # Request body content type
                req_body_type='json',
                # Response body content type
                body_type='json'
            )
            # runtime options
            runtime = util_models.RuntimeOptions()
            request = open_api_models.OpenApiRequest(
                query=OpenApiUtilClient.query(req_param)
            )
            # The return value is a Map containing three kinds of data: the response body (body), the response headers (headers), and the HTTP status code (statusCode).
            print(f"DescribeBakDataSourceStorageAccessInfo request: {req_param}")
            data = self.client.call_api(params, request, runtime)
    
            print(f"DescribeBakDataSourceStorageAccessInfo response: {data}")
            return data['body']['Data']
    
        def _fetch_oss_access_info(self, params):
            info = self.describe_bak_datasource_storage_access_info({
                'RegionId': params['RegionId'],
                'DataSourceId': params['DataSourceId'],
                'RegionCode': params['RegionCode'],
                'BackupType': params['BackupType'],
                'BackupSetId': params['BackupSetId']
            })
            return info['OssAccessInfo']
    
        def upload_and_register_backup_set(self, file_path, data_type, extra_meta):
            filename = os.path.basename(file_path)
            params = {'BackupMode': 'Automated', 'BackupMethod': 'Physical', 'RegionId': self.region_code,
                      'RegionCode': self.region_code, 'DataSourceId': self.datasource_id, 'BackupSetName': filename,
                      'ExtraMeta': extra_meta, 'BackupType': data_type, 'UploadStatus': 'WaitingUpload'}
    
            # Register the backup set for the first time; this returns the backup set ID
            data = self.configure_backup_set_info(params)
            params['BackupSetId'] = data['BackupSetId']
            print(f"------ configure_backup_set_info success: {file_path}, {data_type}, {params['BackupSetId']}, WaitingUpload\n")
    
            # Upload the data to OSS
            oss_info = self._fetch_oss_access_info(params)
            oss_client = create_oss_client(oss_info)
            upload_oss_file(oss_client, file_path, oss_info['ObjectKey'])
            print(f"------ upload_oss_file success: {file_path}, {data_type}, {params['BackupSetId']}\n")
    
            # Mark the backup set upload as complete
            params['UploadStatus'] = 'UploadSuccess'
            self.configure_backup_set_info(params)
            print(f"------ configure_backup_set_info success: {file_path}, {data_type}, {params['BackupSetId']}, UploadSuccess\n")

Complete upload scripts

The backup upload tooling consists of two parts:

Bash script

The execution entry point; replace the parameters with the actual values for your environment.

boot_backup.sh: simulates the local xtrabackup full backup flow.

#!/bin/bash
## An Alibaba Cloud account AccessKey grants access to all APIs and is high-risk. We strongly recommend that you create and use a RAM user for API access and routine O&M.
## Do not store the AccessKey ID and AccessKey secret in code; this risks credential leakage. You can store them in a configuration file as needed.

AK=<ALIBABA_CLOUD_ACCESS_KEY_ID>
SK=<ALIBABA_CLOUD_ACCESS_KEY_SECRET>
DBS_API=dbs-api.<region ID of your data source, e.g. cn-hangzhou>.aliyuncs.com
DATASOURCE_ID=<your data source ID>

## Get the script's current path
BASE_PATH=$(cd `dirname $0`; pwd)

TS=`date +%Y%m%d_%H%M%S`
XTRA_DATA_DIR=~/tool/xtrabackup_data/$TS

mkdir -p $XTRA_DATA_DIR

## Run the xtrabackup backup command and redirect the error log to xtrabackup.log
~/innobackupex --defaults-file=/etc/my.cnf --user='root' --password='root' --host='localhost' --port=3306 --socket='/var/lib/mysql/mysql.sock' --parallel=4 $XTRA_DATA_DIR/ 2>$XTRA_DATA_DIR/xtrabackup.log

## Get the directory of the current xtrabackup backup
backup_dir=`ls $XTRA_DATA_DIR | grep -v xtrabackup.log | head -n1`
echo -e "\033[33mexecute innobackupex success, backup_dir: $backup_dir" && echo -n -e "\033[0m" && chmod 755 $XTRA_DATA_DIR/$backup_dir

## Package into a tar.gz file
cd $XTRA_DATA_DIR/$backup_dir && tar -czvf ../$backup_dir.tar.gz . && file ../$backup_dir.tar.gz
echo -e "\033[33mpackage to $backup_dir.tar.gz" && echo -n -e "\033[0m" && sleep 2

## Call the Python script to upload the backup data and register the backup set metadata
python3 $BASE_PATH/upload_and_register_backup_set.py --access_key_id $AK --access_key_secret $SK --endpoint $DBS_API --datasource_id $DATASOURCE_ID --region_code=cn-hangzhou --data_type=FullBackup --file_path=$XTRA_DATA_DIR/$backup_dir.tar.gz --xtrabackup_log_path=$XTRA_DATA_DIR/xtrabackup.log
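
The script above registers a FullBackup. For a LogBackup (for example, a single binlog file), the Python script is invoked with --data_type=LogBackup and explicit --begin_time/--end_time values instead of an xtrabackup log. A hypothetical sketch that assembles such a command (the file name and times are placeholders; the argument names come from the script's argparse definitions below):

# Hypothetical LogBackup invocation builder; prints the command to run.
import shlex

cmd = [
    "python3", "upload_and_register_backup_set.py",
    "--access_key_id", "<ALIBABA_CLOUD_ACCESS_KEY_ID>",
    "--access_key_secret", "<ALIBABA_CLOUD_ACCESS_KEY_SECRET>",
    "--endpoint", "dbs-api.cn-hangzhou.aliyuncs.com",
    "--datasource_id", "<your data source ID>",
    "--region_code", "cn-hangzhou",
    "--data_type", "LogBackup",
    "--file_path", "/path/to/mysql-bin.000001",  # placeholder binlog file
    # LogBackup requires explicit bounds in "YYYY-MM-DD HH:MM:SS" format,
    # which date_to_unix_timestamp() converts to millisecond timestamps.
    "--begin_time", "2024-09-25 17:40:01",
    "--end_time", "2024-09-25 17:46:58",
]
print(" ".join(shlex.quote(c) for c in cmd))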

Python scripts

  • upload_and_register_backup_set.py: uploads full and log backup data, parses the corresponding metadata, and registers the backup set metadata. (An alternative multipart-upload sketch appears after this list.)

    # -*- coding: utf-8 -*-
    # This file is auto-generated, don't edit it. Thanks.
    import os
    import argparse
    import re
    import time
    import json
    from datetime import datetime
    
    import oss2
    from alibabacloud_openapi_util.client import Client as OpenApiUtilClient
    from alibabacloud_tea_openapi import models as open_api_models
    from alibabacloud_tea_openapi.client import Client as OpenApiClient
    from alibabacloud_tea_util import models as util_models
    
    import xtrabackup_info_parser
    import xtrabackup_log_parser
    
    def init_command_args():
        parser = argparse.ArgumentParser(description="A sample command-line parser.")
        parser.add_argument("--access_key_id", help="Aliyun AccessKeyId.")
        parser.add_argument("--access_key_secret", help="Aliyun AccessKeySecret.")
        parser.add_argument("--endpoint", help="Aliyun API Endpoint.")
        parser.add_argument("--region_code", help="Aliyun DataSource RegionCode.")
        parser.add_argument("--datasource_id", help="Aliyun DataSourceId.")
        parser.add_argument("--data_type", help="BackupSet DataType: FullBackup | LogBackup.")
        parser.add_argument("--file_path", help="BackupSet File Path.")
        parser.add_argument("--xtrabackup_info_path", help="Xtrabackup Info Path.")
        parser.add_argument("--xtrabackup_log_path", help="Xtrabackup Log Path.")
        parser.add_argument("--begin_time", help="Binlog Begin Time.")
        parser.add_argument("--end_time", help="Binlog End Time.")
    
        args = parser.parse_args()
        if args.access_key_id:
            print(f"access_key_id: ************")
        if args.access_key_secret:
            print(f"access_key_secret: ************")
        if args.endpoint:
            print(f"endpoint: {args.endpoint}")
        if args.region_code:
            print(f"region_code: {args.region_code}")
        if args.datasource_id:
            print(f"datasource_id: {args.datasource_id}")
        if args.data_type:
            print(f"data_type: {args.data_type}")
        if args.file_path:
            print(f"file_path: {args.file_path}")
        if args.xtrabackup_info_path:
            print(f"xtrabackup_info_path: {args.xtrabackup_info_path}")
        if args.xtrabackup_log_path:
            print(f"xtrabackup_log_path: {args.xtrabackup_log_path}")
        if args.begin_time:
            print(f"begin_time: {args.begin_time}")
        if args.end_time:
            print(f"end_time: {args.end_time}")
    
        print('\n')
        return args
    
    
    def date_to_unix_timestamp(date_str):
        dt_obj = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
        # Convert the datetime to a time tuple, then to a second-level timestamp with time.mktime
        timestamp_seconds = time.mktime(dt_obj.timetuple())
        return int(timestamp_seconds) * 1000
    
    
    def create_oss_client(params):
        # Alibaba Cloud OSS credentials
        access_key_id = params['AccessKeyId']
        access_key_secret = params['AccessKeySecret']
        security_token = params['SecurityToken']
        bucket_name = params['BucketName']
        endpoint = params['OssEndpoint']
    
        # Initialize the OSS client
        auth = oss2.StsAuth(access_key_id, access_key_secret, security_token)
        return oss2.Bucket(auth, endpoint, bucket_name)
    
    
    def upload_oss_file(oss_client, file_path, object_key):
        """
        分片上传大文件到OSS
        :param oss_client:
        :param file_path: 本地文件路径
        :param object_key: OSS中的对象键,即文件名
        """
        # 设置分片大小,单位为字节,默认1MB
        part_size = 1024 * 1024 * 5
        # Initiate the multipart upload
        upload_id = oss_client.init_multipart_upload(object_key).upload_id
    
        # Open the file and read its contents
        with open(file_path, 'rb') as file_obj:
            parts = []
            while True:
                data = file_obj.read(part_size)
                if not data:
                    break
                # Upload one part
                result = oss_client.upload_part(object_key, upload_id, len(parts) + 1, data)
                parts.append(oss2.models.PartInfo(len(parts) + 1, result.etag))
    
            # Complete the multipart upload
            oss_client.complete_multipart_upload(object_key, upload_id, parts)
    
    
    class OssUploader:
    
        def __init__(self, access_key_id, access_key_secret, endpoint, region_code, datasource_id):
            self.access_key_id = access_key_id
            self.access_key_secret = access_key_secret
            self.endpoint = endpoint
            self.region_code = region_code
            self.datasource_id = datasource_id
    
            config = open_api_models.Config(access_key_id, access_key_secret)
            # For the endpoint, see https://api.aliyun.com/product/Rds
            config.endpoint = endpoint
            self.client = OpenApiClient(config)
    
        """
        注册备份集元数据
        """
        def configure_backup_set_info(self, req_param):
            params = open_api_models.Params(
                # API name
                action='ConfigureBackupSetInfo',
                # API version
                version='2021-01-01',
                # API protocol
                protocol='HTTPS',
                # HTTP method
                method='POST',
                auth_type='AK',
                style='RPC',
                # API path
                pathname='/',
                # Request body content type
                req_body_type='json',
                # Response body content type
                body_type='json'
            )
            # runtime options
            runtime = util_models.RuntimeOptions()
            request = open_api_models.OpenApiRequest(
                query=OpenApiUtilClient.query(req_param)
            )
            # The return value is a Map containing three kinds of data: the response body (body), the response headers (headers), and the HTTP status code (statusCode).
            print(f"ConfigureBackupSetInfo request: {req_param}")
            data = self.client.call_api(params, request, runtime)
    
            print(f"ConfigureBackupSetInfo response: {data}")
            return data['body']['Data']
    
        """
        获取oss上传信息
        """
        def describe_bak_datasource_storage_access_info(self, req_param):
            params = open_api_models.Params(
                # API name
                action='DescribeBakDataSourceStorageAccessInfo',
                # API version
                version='2021-01-01',
                # API protocol
                protocol='HTTPS',
                # HTTP method
                method='POST',
                auth_type='AK',
                style='RPC',
                # API path
                pathname='/',
                # Request body content type
                req_body_type='json',
                # Response body content type
                body_type='json'
            )
            # runtime options
            runtime = util_models.RuntimeOptions()
            request = open_api_models.OpenApiRequest(
                query=OpenApiUtilClient.query(req_param)
            )
            # The return value is a Map containing three kinds of data: the response body (body), the response headers (headers), and the HTTP status code (statusCode).
            print(f"DescribeBakDataSourceStorageAccessInfo request: {req_param}")
            data = self.client.call_api(params, request, runtime)
    
            print(f"DescribeBakDataSourceStorageAccessInfo response: {data}")
            return data['body']['Data']
    
        def _fetch_oss_access_info(self, params):
            info = self.describe_bak_datasource_storage_access_info({
                'RegionId': params['RegionId'],
                'DataSourceId': params['DataSourceId'],
                'RegionCode': params['RegionCode'],
                'BackupType': params['BackupType'],
                'BackupSetId': params['BackupSetId']
            })
            return info['OssAccessInfo']
    
        def upload_and_register_backup_set(self, file_path, data_type, extra_meta):
            filename = os.path.basename(file_path)
            params = {'BackupMode': 'Automated', 'BackupMethod': 'Physical', 'RegionId': self.region_code,
                      'RegionCode': self.region_code, 'DataSourceId': self.datasource_id, 'BackupSetName': filename,
                      'ExtraMeta': extra_meta, 'BackupType': data_type, 'UploadStatus': 'WaitingUpload'}
    
            # Register the backup set for the first time; this returns the backup set ID
            data = self.configure_backup_set_info(params)
            params['BackupSetId'] = data['BackupSetId']
            print(f"------ configure_backup_set_info success: {file_path}, {data_type}, {params['BackupSetId']}, WaitingUpload\n")
    
            # Upload the data to OSS
            oss_info = self._fetch_oss_access_info(params)
            oss_client = create_oss_client(oss_info)
            upload_oss_file(oss_client, file_path, oss_info['ObjectKey'])
            print(f"------ upload_oss_file success: {file_path}, {data_type}, {params['BackupSetId']}\n")
    
            # Mark the backup set upload as complete
            params['UploadStatus'] = 'UploadSuccess'
            self.configure_backup_set_info(params)
            print(f"------ configure_backup_set_info success: {file_path}, {data_type}, {params['BackupSetId']}, UploadSuccess\n")
    
    
    if __name__ == '__main__':
        args = init_command_args()
        uploader = OssUploader(args.access_key_id, args.access_key_secret,
                               args.endpoint, args.region_code, args.datasource_id)
    
        # Full and log backups build extraMeta in different ways
        extra_meta = '{}'
        if args.data_type == 'FullBackup':
            obj = {}
            if args.xtrabackup_log_path is not None:
                obj = xtrabackup_log_parser.analyze_slave_status(logpath=args.xtrabackup_log_path)
            elif args.xtrabackup_info_path is not None:
                parser = xtrabackup_info_parser.ExtraMetaParser(file_path=args.xtrabackup_info_path)
                obj = parser.get_extra_meta()
            extra_meta = {'BINLOG_FILE': obj.get('BINLOG_FILE'),
                          'version': obj.get("SERVER_VERSION"),
                          'dataBegin': date_to_unix_timestamp(obj.get("START_TIME")),
                          'dataEnd': date_to_unix_timestamp(obj.get("END_TIME")),
                          'consistentTime': int(date_to_unix_timestamp(obj.get("END_TIME")) / 1000)}
            extra_meta = json.dumps(extra_meta)
    
        elif args.data_type == 'LogBackup':
            obj = {'dataBegin': date_to_unix_timestamp(args.begin_time),
                   'dataEnd': date_to_unix_timestamp(args.end_time)}
            extra_meta = json.dumps(obj)
        print(f"get extra meta json string: {extra_meta}")
    
        # Upload the data and register the backup set metadata
        uploader.upload_and_register_backup_set(file_path=args.file_path, data_type=args.data_type, extra_meta=extra_meta)
  • xtrabackup_info_parser.py: parses the metadata from the xtrabackup_info backup file.

    # -*- coding: utf-8 -*-
    # This file is auto-generated, don't edit it. Thanks.
    import re
    import json
    
    
    class ExtraMetaParser:
        def __init__(self, file_path):
            self.file_path = file_path
    
        def _parse_xtrabackup_info(self):
            config_data = {}
            with open(self.file_path, 'r') as file:
                for line in file:
                    line = line.strip()
                    if line and not line.startswith('#'):
                        key, value = line.split('=', 1)
                        config_data[key.strip()] = value.strip()
            return config_data
    
        def get_extra_meta(self):
            config_data = self._parse_xtrabackup_info()
            print(f"xtrabackup_info: {config_data}")
    
            binlog_pos = config_data.get("binlog_pos")
            pattern = f"filename '(.*)', position (.*)"
            match = re.search(pattern, binlog_pos)
    
            return {'BINLOG_FILE': match.group(1),
                    'SERVER_VERSION': config_data.get("server_version"),
                    'START_TIME': config_data.get("start_time"),
                    'END_TIME': config_data.get("end_time")}
  • xtrabackup_log_parser.py: parses the metadata from the xtrabackup.log file produced by the backup. (A short demo follows this list.)

    #!/usr/bin/python
    # coding:utf8
    
    import io
    import re
    import sys
    from datetime import datetime
    
    from six import binary_type, text_type
    
    
    def parse_date_part(date_part, time_part):
        """
        根据给定的日期部分和时间部分,解析并返回完整的日期时间字符串。
        """
        # Get the first two digits of the current year (the century)
        current_century = datetime.now().strftime("%Y")[:2]
    
        year_short = date_part[:2]
        # Assemble the full year
        year_full = current_century + year_short
        date_full = year_full + date_part[2:]
        datetime_str = date_full + " " + time_part
    
        dt = datetime.strptime(datetime_str, "%Y%m%d %H:%M:%S")
        formatted_datetime = dt.strftime("%Y-%m-%d %H:%M:%S")
        return text_type(formatted_datetime)
    
    
    def analyze_slave_status(logpath=None):
        slave_status = {}
        start_time_pattern = (
            r"(\d{6}) (\d{2}:\d{2}:\d{2}) .*Connecting to MySQL server host:"
        )
        """
            240925 17:46:58 completed OK!
            240925 02:22:58 innobackupex: completed OK!
            240925 02:22:58 xtrabackup: completed OK!
        """
        end_time_pattern = r"(\d{6}) (\d{2}:\d{2}:\d{2}) .*completed OK!"
        with io.open(logpath, "rb") as fp:
            lines = fp.read().splitlines()
            for i in reversed(range(len(lines))):
                line = lines[i]
                if isinstance(line, binary_type):
                    line = line.decode("utf-8")
    
                m = re.search(start_time_pattern, line)
                if m:
                    # Extract the date and time parts
                    date_part = m.group(1)
                    time_part = m.group(2)
                    slave_status["START_TIME"] = parse_date_part(date_part, time_part)
                    continue
    
                m = re.search(r"Using server version (\S*)", line)
                if m:
                    slave_status["SERVER_VERSION"] = text_type(m.group(1))
                    continue
    
                m = re.search("MySQL binlog position:", line)
                if m:
                    binlog_line = line
                    m = re.search(r"filename '(\S*)'", binlog_line)
                    if m:
                        slave_status["BINLOG_FILE"] = text_type(m.group(1))
                    m = re.search(r"position (\d+)", binlog_line)
                    m2 = re.search(r"position '(\d+)'", binlog_line)
                    if m:
                        try:
                            slave_status["BINLOG_POS"] = int(m.group(1))
                        except ValueError:
                            pass
                    elif m2:
                        try:
                            slave_status["BINLOG_POS"] = int(m2.group(1))
                        except ValueError:
                            pass
                    continue
    
                m = re.search("consistent_time (\d+)", line)
                if m:
                    try:
                        slave_status["CONSISTENT_TIME"] = int(m.group(1))
                    except ValueError:
                        pass
                    continue
    
                m = re.search(end_time_pattern, line)
                if m:
                    date_part = m.group(1)
                    time_part = m.group(2)
                    slave_status["END_TIME"] = parse_date_part(date_part, time_part)
                    continue
    
        return slave_status
    
    if __name__ == "__main__":
        logpath = sys.argv[1]
        slave_status = analyze_slave_status(logpath)
        print(slave_status)
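
To see the log parser in action, you can feed it a few synthetic lines modeled on the samples quoted in analyze_slave_status (an illustrative demo; it assumes xtrabackup_log_parser.py is on the Python path and six is installed):

# Illustrative demo: write a minimal synthetic xtrabackup.log and parse it.
import os
import tempfile

import xtrabackup_log_parser

sample_log = b"""\
240925 17:40:01 innobackupex: Connecting to MySQL server host: localhost
240925 17:40:02 Using server version 5.5.62-log
MySQL binlog position: filename 'mysql-bin.000001', position 107
240925 17:46:58 completed OK!
"""

with tempfile.NamedTemporaryFile(suffix=".log", delete=False) as f:
    f.write(sample_log)
    path = f.name

try:
    status = xtrabackup_log_parser.analyze_slave_status(logpath=path)
    # Expected keys: START_TIME, SERVER_VERSION, BINLOG_FILE, BINLOG_POS, END_TIME
    print(status)
finally:
    os.unlink(path)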
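
Separately, note that upload_oss_file in upload_and_register_backup_set.py implements multipart upload by hand. The oss2 SDK also provides a higher-level helper that manages part splitting and parallelism; a hedged alternative sketch using the same oss2.Bucket client that create_oss_client returns:

# Alternative sketch: let oss2's resumable_upload manage the parts.
import oss2

def upload_oss_file_resumable(oss_client, file_path, object_key):
    # oss_client is the oss2.Bucket returned by create_oss_client
    oss2.resumable_upload(
        oss_client, object_key, file_path,
        part_size=5 * 1024 * 1024,  # same 5 MB parts as the manual version
        num_threads=4,              # upload parts in parallel
    )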