数据灾备(DBS)除支持阿里云数据库和阿里云ECS自建数据库的灾备外,还支持对云下及其他云平台数据库进行灾备。
注意事项
若备份库表存在表结构不合理、大表、大字段等情况,备份实例的规格过小可能会导致后续备份实例资源不足,从而引发备份异常。因此,建议您在创建时选择较高规格的备份实例,以免后续备份出现异常。
操作步骤
产品自动备份
购买备份实例(逻辑备份)
- 登录数据管理DMS 5.0。
在顶部菜单栏中,选择安全与规范(DBS) > 数据灾备(DBS) > 灾备数据源。
说明若您使用的是极简模式的控制台,请单击控制台左上角的图标,选择全部功能 > 安全与规范(DBS) > 数据灾备(DBS) > 灾备数据源。
在上方选择地域,在云下及他云数据库 > 产品自动备份页签下,单击目标数据源ID进入数据源详情页。
在备份策略页面中,单击配置备份策略。
在选择备份计划页面单击购买备份计划,进入DBS售卖页。
配置如下参数,单击页面右下角的立即购买。
配置项
说明
商品类型
请选择备份实例(包年包月)。
备份实例地域
选择要存放备份数据的地域。
说明请确保备份实例所在地域与ECS实例所在地域相同。
数据源类型
当前仅支持MySQL。
规格
规格越高,备份与恢复的性能越高,支持的规格为:serverless(仅MySQL逻辑备份支持)、micro(入门型)、small(低配型)、medium(中配型)、large(高配型)、xlarge(高配型-无流量上限)。
说明如果数据库实例(例如生产环境的数据库)需要高性能的备份实例快速执行备份与恢复任务,建议选择xlarge或large规格,获取更高的备份恢复性能。
对备份恢复性能(速度)要求不高,您可以通过计算,选择性价比最高的备份实例规格。更多信息,请参见如何选择备份实例规格。
若备份库表存在表结构不合理、大表、大字段等情况,备份实例的规格过小可能会导致后续备份实例资源不足,从而引发备份异常。因此,建议您在创建时选择较高规格的备份实例,以免后续备份出现异常。
备份方式
请选择逻辑备份。
存储空间
您创建时无需选择容量,后续根据实际存入DBS内置存储中的数据量计费。计费详情,请参见存储费用。
资源组
配置资源组。选择默认或自定义的资源组,方便备份实例管理。
购买数量
按需选择购买数量,多个数据库实例需要创建多个备份实例,例如您希望备份数据库实例A与数据库实例B,需要购买2个备份实例。
购买时长
选择该备份实例的购买时长。
在确认订单页面,确认订单信息,阅读并选中服务协议,单击去支付。
支付成功后,请返回步骤5的选择备份计划页面,单击已完成支付,即可查看到已创建的备份实例。
配置备份策略
在选择备份计划页面选中待配置的备份实例,并单击下一步。
在选择库表页面,选中需要备份的库或表,单击移动到已选择对象框中,单击提交。
提交成功后,您可以在备份策略页面的逻辑备份页签下,单击启动按钮启动备份。
单击启动后系统会立即发起一次全量和增量备份任务。
说明如果您期望先完成一些其他操作(例如修改备份策略等),您也可以选择当前暂不启动备份,但后续系统会根据备份策略在备份时间自动启动备份。
用户自动备份
目前仅支持MySQL 5.5版本的数据源。
目前仅支持华东1(杭州)地域。
配置备份源以及上传备份文件
- 登录数据管理DMS 5.0。
在顶部菜单栏中,选择安全与规范(DBS) > 数据灾备(DBS) > 灾备数据源。
说明若您使用的是极简模式的控制台,请单击控制台左上角的图标,选择全部功能 > 安全与规范(DBS) > 数据灾备(DBS) > 灾备数据源。
在上方选择地域,在云下及他云数据库页签下,根据数据源类型选择新增入口。
单击新增数据源,在弹出的对话框中,配置如下信息,并选中目标备份计划,单击页面右下角下一步。
配置项
说明
数据源名称
建议配置具有业务意义的名称,便于后续识别。
引擎类型
数据库引擎类型,当前仅支持MySQL。
引擎版本
选择数据库引擎的版本。
引擎参数
{"lower_case_table_names":1}
若没有可选备份计划,单击购买备份计划,进入DBS售卖页,购买备份计划。
说明
商品类型
请选择备份实例(包年包月),不支持按量付费。
备份实例地域
选择要存放备份数据的地域。
数据源类型
选择MySQL。
规格
选择xmicro。该规格提供的免费数据量额度等,请参见备份计划规格。
备份方式
选择物理备份。
存储空间
您创建时无需选择容量,后续根据实际存入DBS内置存储中的数据量计费。计费详情,请参见存储费用。
资源组
配置资源组。选择默认或自定义的资源组,方便备份实例管理。
购买数量
按需选择购买数量,多个数据库实例需要创建多个备份实例,例如您希望备份数据库实例A与数据库实例B,需要购买2个备份实例。
购买时长
选择该备份实例的购买时长。
将备份集上传到指定Bucket中。上传方法,请参见用户自动备份数据上传指南。
上传完成后,单击确定。
查看备份信息
在用户自动备份页签,单击目标数据源ID。
配置备份策略
在用户自动备份页签,单击目标数据源操作列的查看备份策略。
配置完成之后,单击确定。
查看和下载备份数据
在用户自动备份页签,单击目标数据源ID。
单击左侧导航栏中的备份数据。
说明用户执行上传数据脚本并完成数据源新建后,当用户新产生一个备份集数据,数据备份管理系统会同步这个备份数据到备份数据页面。
单击目标备份集操作列的下载按钮获取下载链接,将备份集下载到本地。
创建恢复任务
恢复任务要求数据备份页面上必须存在已生成的备份集,并且该备份集的状态需为完成。
在用户自动备份页签,单击目标数据源ID。
单击左侧导航栏中的备份数据,单击创建恢复任务,手动配置恢复参数。
参数名称
说明
恢复任务名称
请输入本次恢复任务的名称。建议配置具有业务意义的名称,便于后续识别。
恢复位置
选择恢复实例位置,默认为恢复到RDS新实例。
数据库所在位置
选择数据库所在位置,默认为RDS。
实例地区
选择实例所在地区,目前仅支持华东1(杭州)地区。
VPC
选择创建的专有网络。
VSwitch
选择创建的交换机。
实例系列
选择恢复的实例系列。
实例规格
选择恢复的实例规格。
存储空间
选择所需存储空间。
恢复方式
选择恢复方式,目前仅支持按时间点恢复。
还原时间
选择还原时间,可选择恢复时间范围请参考恢复方式参数后的数据。
配置完成之后单击提交,会生成一个按所选还原时间点恢复的恢复任务,该任务信息将以列表的形式展示在恢复任务页面。
查看恢复任务
在用户自动备份页签,单击目标数据源ID。
单击左侧导航栏中的任务管理 > 恢复任务。
点击恢复结果列中的实例ID,可跳转至恢复的RDS实例基本信息页面。
查看恢复演练
在用户自动备份页签,单击目标数据源ID。
单击左侧导航栏中的恢复演练。
恢复演练概览指标
查看恢复演练概览,包含恢复任务成功率、恢复时长均值、数据备份演练覆盖率、日志备份演练覆盖率。
名称
说明
恢复任务成功率
在恢复时间点在筛选时间范围内的恢复任务成功率。
恢复时长均值
在筛选时间范围内恢复成功任务的平均耗时。
数据备份演练覆盖率
在筛选时间范围内数据备份在开源MySQL或RDS MySQL上执行过恢复演练的覆盖率。
日志备份演练覆盖率
在筛选时间范围内的日志备份在开源MySQL或RDS MySQL上执行过恢复演练的覆盖率。
恢复演练时间轴
通过时间轴展示在筛选时间范围内的各个时间点的恢复演练详情,在时间轴上点击鼠标左键可以查看当前时间点的演练信息。
备份数据恢复演练详情
单击数据备份,查看数据备份恢复演练详情列表。单击演练结果列实例ID,进入恢复的rds 实例的基本信息页面。
单击日志备份,可以查看日志备份恢复演练详情列表。
用户自动备份数据上传指南
准备工作
环境依赖
命令工具:Bash、Python3。
Python相关:oss2、alibabacloud_openapi_util、alibabacloud_tea_openapi、alibabacloud_tea_util。
## 安装阿里云OpenAPI SDK
pip3 install --upgrade pip
pip3 install -i https://mirrors.aliyun.com/pypi/simple/ oss2 alibabacloud_openapi_util alibabacloud_tea_openapi alibabacloud_tea_util
完整上传脚本
备份数据上传脚本分为两部分:
Bash脚本
脚本执行处理入口,按需替换实际执行的参数。
boot_backup.sh:负责模拟本地xtrabackup全量备份的流程。
#!/bin/bash
## 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维
## 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险,您可以根据业务需要,保存到配置文件里
AK=<ALIBABA_CLOUD_ACCESS_KEY_ID>
SK=<ALIBABA_CLOUD_ACCESS_KEY_SECRET>
DBS_API=dbs-api.<您的数据源所在地域ID,例如cn-hangzhou>.aliyuncs.com
DATASOURCE_ID=<您的数据源ID>
## 获取脚本当前路径
BASE_PATH=$(cd `dirname $0`; pwd)
TS=`date +%Y%m%d_%H%M%S`
XTRA_DATA_DIR=~/tool/xtrabackup_data/$TS
mkdir -p $XTRA_DATA_DIR
## 执行xtrabackup备份命令,将错误日志输出的到xtrabackup.log文件
~/innobackupex --defaults-file=/etc/my.cnf --user='root' --password='root' --host='localhost' --port=3306 --socket='/var/lib/mysql/mysql.sock' --parallel=4 $XTRA_DATA_DIR/ 2>$XTRA_DATA_DIR/xtrabackup.log
## 获取当前xtrabackup备份的目录
backup_dir=`ls $XTRA_DATA_DIR | grep -v xtrabackup.log | head -n1`
echo -e "\033[33mexecute innobackupex success, backup_dir: $backup_dir" && echo -n -e "\033[0m" && chmod 755 $XTRA_DATA_DIR/$backup_dir
## 打包成tar.gz文件
cd $XTRA_DATA_DIR/$backup_dir && tar -czvf ../$backup_dir.tar.gz . && file ../$backup_dir.tar.gz
echo -e "\033[33mpackage to $backup_dir.tar.gz" && echo -n -e "\033[0m" && sleep 2
## 调用Python脚本进行上传备份数据,并注册备份集元数据
python3 $BASE_PATH/upload_and_register_backup_set.py --access_key_id $AK --access_key_secret $SK --endpoint $DBS_API --datasource_id $DATASOURCE_ID --region_code=cn-hangzhou --data_type=FullBackup --file_path=$XTRA_DATA_DIR/$backup_dir.tar.gz --xtrabackup_log_path=$XTRA_DATA_DIR/xtrabackup.log
Python脚本
upload_and_register_backup_set.py:用于上传全量和日志备份数据,解析对应的元数据,并进行备份集元数据的注册。
# -*- coding: utf-8 -*- # This file is auto-generated, don't edit it. Thanks. import os import argparse import re import time import json from datetime import datetime import oss2 from alibabacloud_openapi_util.client import Client as OpenApiUtilClient from alibabacloud_tea_openapi import models as open_api_models from alibabacloud_tea_openapi.client import Client as OpenApiClient from alibabacloud_tea_util import models as util_models import xtrabackup_info_parser import xtrabackup_log_parser def init_command_args(): parser = argparse.ArgumentParser(description="A sample command-line parser.") parser.add_argument("--access_key_id", help="Aliyun AccessKeyId.") parser.add_argument("--access_key_secret", help="Aliyun AccessKeySecret.") parser.add_argument("--endpoint", help="Aliyun API Endpoint.") parser.add_argument("--region_code", help="Aliyun DataSource RegionCode.") parser.add_argument("--datasource_id", help="Aliyun DataSourceId.") parser.add_argument("--data_type", help="BackupSet DataType: FullBackup | LogBackup.") parser.add_argument("--file_path", help="BackupSet File Path.") parser.add_argument("--xtrabackup_info_path", help="Xtrabackup Info Path.") parser.add_argument("--xtrabackup_log_path", help="Xtrabackup Log Path.") parser.add_argument("--begin_time", help="Binlog Begin Time.") parser.add_argument("--end_time", help="Binlog End Time.") args = parser.parse_args() if args.access_key_id: print(f"access_key_id: ************") if args.access_key_secret: print(f"access_key_secret: ************") if args.endpoint: print(f"endpoint: {args.endpoint}") if args.region_code: print(f"region_code: {args.region_code}") if args.datasource_id: print(f"datasource_id: {args.datasource_id}") if args.data_type: print(f"data_type: {args.data_type}") if args.file_path: print(f"file_path: {args.file_path}") if args.xtrabackup_info_path: print(f"xtrabackup_info_path: {args.xtrabackup_info_path}") if args.xtrabackup_log_path: print(f"xtrabackup_log_path: {args.xtrabackup_log_path}") if args.begin_time: print(f"begin_time: {args.begin_time}") if args.end_time: print(f"end_time: {args.end_time}") print('\n') return args def date_to_unix_timestamp(date_str): dt_obj = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S") # 使用.time()方法得到时间元组,然后用time.mktime转换为秒级时间戳 timestamp_seconds = time.mktime(dt_obj.timetuple()) return int(timestamp_seconds) * 1000 def create_oss_client(params): # 阿里云OSS认证信息 access_key_id = params['AccessKeyId'] access_key_secret = params['AccessKeySecret'] security_token = params['SecurityToken'] bucket_name = params['BucketName'] endpoint = params['OssEndpoint'] # 初始化OSS客户端 auth = oss2.StsAuth(access_key_id, access_key_secret, security_token) return oss2.Bucket(auth, endpoint, bucket_name) def upload_oss_file(oss_client, file_path, object_key): """ 分片上传大文件到OSS :param oss_client: :param file_path: 本地文件路径 :param object_key: OSS中的对象键,即文件名 """ # 设置分片大小,单位为字节,默认1MB part_size = 1024 * 1024 * 5 # 初始化分片上传 upload_id = oss_client.init_multipart_upload(object_key).upload_id # 打开文件并读取内容 with open(file_path, 'rb') as file_obj: parts = [] while True: data = file_obj.read(part_size) if not data: break # 上传分片 result = oss_client.upload_part(object_key, upload_id, len(parts) + 1, data) parts.append(oss2.models.PartInfo(len(parts) + 1, result.etag)) # 完成分片上传 oss_client.complete_multipart_upload(object_key, upload_id, parts) class OssUploader: def __init__(self, access_key_id, access_key_secret, endpoint, region_code, datasource_id): self.access_key_id = access_key_id self.access_key_secret = access_key_secret self.endpoint = endpoint self.region_code = region_code self.datasource_id = datasource_id config = open_api_models.Config(access_key_id, access_key_secret) # Endpoint 请参考 https://api.aliyun.com/product/Rds config.endpoint = endpoint self.client = OpenApiClient(config) """ 注册备份集元数据 """ def configure_backup_set_info(self, req_param): params = open_api_models.Params( # 接口名称, action='ConfigureBackupSetInfo', # 接口版本, version='2021-01-01', # 接口协议, protocol='HTTPS', # 接口 HTTP 方法, method='POST', auth_type='AK', style='RPC', # 接口 PATH, pathname='/', # 接口请求体内容格式, req_body_type='json', # 接口响应体内容格式, body_type='json' ) # runtime options runtime = util_models.RuntimeOptions() request = open_api_models.OpenApiRequest( query=OpenApiUtilClient.query(req_param) ) # 返回值为 Map 类型,可从 Map 中获得三类数据:响应体 body、响应头 headers、HTTP 返回的状态码 statusCode。 print(f"ConfigureBackupSetInfo request: {req_param}") data = self.client.call_api(params, request, runtime) print(f"ConfigureBackupSetInfo response: {data}") return data['body']['Data'] """ 获取oss上传信息 """ def describe_bak_datasource_storage_access_info(self, req_param): params = open_api_models.Params( # 接口名称, action='DescribeBakDataSourceStorageAccessInfo', # 接口版本, version='2021-01-01', # 接口协议, protocol='HTTPS', # 接口 HTTP 方法, method='POST', auth_type='AK', style='RPC', # 接口 PATH, pathname='/', # 接口请求体内容格式, req_body_type='json', # 接口响应体内容格式, body_type='json' ) # runtime options runtime = util_models.RuntimeOptions() request = open_api_models.OpenApiRequest( query=OpenApiUtilClient.query(req_param) ) # 返回值为 Map 类型,可从 Map 中获得三类数据:响应体 body、响应头 headers、HTTP 返回的状态码 statusCode。 print(f"DescribeBakDataSourceStorageAccessInfo request: {req_param}") data = self.client.call_api(params, request, runtime) print(f"DescribeBakDataSourceStorageAccessInfo response: {data}") return data['body']['Data'] def _fetch_oss_access_info(self, params): info = self.describe_bak_datasource_storage_access_info({ 'RegionId': params['RegionId'], 'DataSourceId': params['DataSourceId'], 'RegionCode': params['RegionCode'], 'BackupType': params['BackupType'], 'BackupSetId': params['BackupSetId'] }) return info['OssAccessInfo'] def upload_and_register_backup_set(self, file_path, data_type, extra_meta): filename = os.path.basename(file_path) params = {'BackupMode': 'Automated', 'BackupMethod': 'Physical', 'RegionId': self.region_code, 'RegionCode': self.region_code, 'DataSourceId': self.datasource_id, 'BackupSetName': filename, 'ExtraMeta': extra_meta, 'BackupType': data_type, 'UploadStatus': 'WaitingUpload'} # 首次注册备份集,返回备份集ID data = self.configure_backup_set_info(params) params['BackupSetId'] = data['BackupSetId'] print(f"------ configure_backup_set_info success: {file_path}, {data_type}, {params['BackupSetId']}, WaitingUpload\n") # 上传数据到oss oss_info = self._fetch_oss_access_info(params) oss_client = create_oss_client(oss_info) upload_oss_file(oss_client, file_path, oss_info['ObjectKey']) print(f"------ upload_oss_file success: {file_path}, {data_type}, {params['BackupSetId']}\n") # 标记备份集上传完成 params['UploadStatus'] = 'UploadSuccess' self.configure_backup_set_info(params) print(f"------ configure_backup_set_info success: {file_path}, {data_type}, {params['BackupSetId']}, UploadSuccess\n") if __name__ == '__main__': args = init_command_args() uploader = OssUploader(args.access_key_id, args.access_key_secret, args.endpoint, args.region_code, args.datasource_id) # 全量和日志备份分别通过不同方式构造extraMeta extra_meta = '{}' if args.data_type == 'FullBackup': obj = {} if args.xtrabackup_log_path is not None: obj = xtrabackup_log_parser.analyze_slave_status(logpath=args.xtrabackup_log_path) elif args.xtrabackup_info_path is not None: parser = xtrabackup_info_parser.ExtraMetaParser(file_path=args.xtrabackup_info_path) obj = parser.get_extra_meta() extra_meta = {'BINLOG_FILE': obj.get('BINLOG_FILE'), 'version': obj.get("SERVER_VERSION"), 'dataBegin': date_to_unix_timestamp(obj.get("START_TIME")), 'dataEnd': date_to_unix_timestamp(obj.get("END_TIME")), 'consistentTime': int(date_to_unix_timestamp(obj.get("END_TIME")) / 1000)} extra_meta = json.dumps(extra_meta) elif args.data_type == 'LogBackup': obj = {'dataBegin': date_to_unix_timestamp(args.begin_time), 'dataEnd': date_to_unix_timestamp(args.end_time)} extra_meta = json.dumps(obj) print(f"get extra meta json string: {extra_meta}") # 上传数据,并注册备份集元信息 uploader.upload_and_register_backup_set(file_path=args.file_path, data_type=args.data_type, extra_meta=extra_meta)
xtrabackup_info_parser.py:使用备份文件xtrabackup_info进行元数据解析。
# -*- coding: utf-8 -*- # This file is auto-generated, don't edit it. Thanks. import re import json class ExtraMetaParser: def __init__(self, file_path): self.file_path = file_path pass def _parse_xtrabackup_info(self): config_data = {} with open(self.file_path, 'r') as file: for line in file: line = line.strip() if line and not line.startswith('#'): key, value = line.split('=', 1) config_data[key.strip()] = value.strip() return config_data def get_extra_meta(self): config_data = self._parse_xtrabackup_info() print(f"xtrabackup_info: {config_data}") binlog_pos = config_data.get("binlog_pos") pattern = f"filename '(.*)', position (.*)" match = re.search(pattern, binlog_pos) return {'BINLOG_FILE': match.group(1), 'SERVER_VERSION': config_data.get("server_version"), 'START_TIME': config_data.get("start_time"), 'END_TIME': config_data.get("end_time")}
xtrabackup_log_parser.py:使用备份产生的日志文件xtrabackup.log进行元数据解析。
#!/usr/bin/python # coding:utf8 import io import re import sys from datetime import datetime from six import binary_type, text_type def parse_date_part(date_part, time_part): """ 根据给定的日期部分和时间部分,解析并返回完整的日期时间字符串。 """ # 获取当前年份的前两位 current_century = datetime.now().strftime("%Y")[:2] year_short = date_part[:2] # 组合完整年份 year_full = current_century + year_short date_full = year_full + date_part[2:] datetime_str = date_full + " " + time_part dt = datetime.strptime(datetime_str, "%Y%m%d %H:%M:%S") formatted_datetime = dt.strftime("%Y-%m-%d %H:%M:%S") return text_type(formatted_datetime) def analyze_slave_status(logpath=None): slave_status = {} start_time_pattern = ( r"(\d{6}) (\d{2}:\d{2}:\d{2}) .*Connecting to MySQL server host:" ) """ 240925 17:46:58 completed OK! 240925 02:22:58 innobackupex: completed OK! 240925 02:22:58 xtrabackup: completed OK! """ end_time_pattern = r"(\d{6}) (\d{2}:\d{2}:\d{2}) .*completed OK!" with io.open(logpath, "rb") as fp: lines = fp.read().splitlines() for i in reversed(range(len(lines))): line = lines[i] if isinstance(line, binary_type): line = line.decode("utf-8") m = re.search(start_time_pattern, line) if m: # 提取日期和时间部分 date_part = m.group(1) time_part = m.group(2) slave_status["START_TIME"] = parse_date_part(date_part, time_part) continue m = re.search(r"Using server version (\S*)", line) if m: slave_status["SERVER_VERSION"] = text_type(m.group(1)) continue m = re.search("MySQL binlog position:", line) if m: binlog_line = line m = re.search(r"filename '(\S*)'", binlog_line) if m: slave_status["BINLOG_FILE"] = text_type(m.group(1)) m = re.search(r"position (\d+)", binlog_line) m2 = re.search(r"position '(\d+)'", binlog_line) if m: try: slave_status["BINLOG_POS"] = int(m.group(1)) except ValueError: pass elif m2: try: slave_status["BINLOG_POS"] = int(m2.group(1)) except ValueError: pass continue m = re.search("consistent_time (\d+)", line) if m: try: slave_status["CONSISTENT_TIME"] = int(m.group(1)) except ValueError: pass continue m = re.search(end_time_pattern, line) if m: date_part = m.group(1) time_part = m.group(2) slave_status["END_TIME"] = parse_date_part(date_part, time_part) continue return slave_status if __name__ == "__main__": logpath = sys.argv[1] slave_status = analyze_slave_status(logpath) print(slave_status)
Bash脚本处理流程
配置阿里云AccessKeyId和AccessKeySecret、DBS OpenAPI Endpoint,以及对应的数据源ID。
#!/bin/bash ## 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维 ## 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险,您可以根据业务需要,保存到配置文件里 AK=<ALIBABA_CLOUD_ACCESS_KEY_ID> SK=<ALIBABA_CLOUD_ACCESS_KEY_SECRET> DBS_API=dbs-api.<您的数据源所在地域ID,例如cn-hangzhou>.aliyuncs.com DATASOURCE_ID=<您的数据源ID>
执行xtrabackup备份命令,将备份数据保存到指定的目录,同时将错误日志输出到xtrabackup.log文件中。
## 获取脚本当前路径 BASE_PATH=$(cd `dirname $0`; pwd) TS=`date +%Y%m%d_%H%M%S` XTRA_DATA_DIR=~/tool/xtrabackup_data/$TS mkdir -p $XTRA_DATA_DIR ## 执行xtrabackup备份命令,将错误日志输出的到xtrabackup.log文件 ~/innobackupex --defaults-file=/etc/my.cnf --user='<您的数据库账号,例如root>' --password='<您的数据库密码,例如root>' --host='<您的数据库IP,例如localhost>' --port=<您的数据库端口号,例如3306> --socket='/var/lib/mysql/mysql.sock' --parallel=4 $XTRA_DATA_DIR/ 2>$XTRA_DATA_DIR/xtrabackup.log
后续将解析xtrabackup.log文件获得全量备份集的元数据信息:备份开始时间、备份结束时间、一致性时间点、对应Binlog名称。
限制说明:
当前执行xtrabackup备份命令,暂不支持压缩和加密参数(--compress --compress-threads和--encrypt --encrypt-key-file --encrypt-threads),使用xtrabackup备份命令时请先删除对应的参数。
将文件目录形式的全量备份数据进行gzip压缩和打包成tar.gz格式的文件。
## 获取当前xtrabackup备份的目录 backup_dir=`ls $XTRA_DATA_DIR | grep -v xtrabackup.log | head -n1` echo -e "\033[33mexecute innobackupex success, backup_dir: $backup_dir" && echo -n -e "\033[0m" && chmod 755 $XTRA_DATA_DIR/$backup_dir ## 打包成tar.gz文件 cd $XTRA_DATA_DIR/$backup_dir && tar -czvf ../$backup_dir.tar.gz . && file ../$backup_dir.tar.gz echo -e "\033[33mpackage to $backup_dir.tar.gz" && echo -n -e "\033[0m" && sleep 2
限制说明:当前仅支持文件格式的备份数据上传,支持如下格式:
tar:目录文件进行tar打包。
tar.gz:目录文件进行tar打包,然后进行gzip压缩。
注意事项:
使用tar命令打包前,需要chmod 755命令修改文件目录权限。
需进入文件夹根目录,进行tar命令的打包和压缩。
调用Python脚本upload_and_register_backup_set.py进行备份数据的上传,同时注册备份集元数据。
## 调用Python脚本进行上传备份数据,并注册备份集元数据 python3 $BASE_PATH/upload_and_register_backup_set.py --access_key_id $AK --access_key_secret $SK --endpoint $DBS_API --datasource_id $DATASOURCE_ID --region_code=cn-hangzhou --data_type=FullBackup --file_path=$XTRA_DATA_DIR/$backup_dir.tar.gz --xtrabackup_log_path=$XTRA_DATA_DIR/xtrabackup.log
参数
说明
--access_key_id
阿里云AccessKeyId。
--access_key_secret
阿里云AccessKeySecret。
--endpoint
DBS OpenAPI对应的Endpoint。请参见服务接入点。
--datasource_id
DBS对应的数据源ID。
--region_code
对应地域信息。
--data_type
备份数据类型,全量备份集:FullBackup,日志备份:LogBackup。
--file_path
全量备份数据路径。
--xtrabackup_log_path
全量备份xtrabackup命令产生的xtrabackup.log文件路径。
Python脚本处理流程
main函数主入口:
根据data_type传参为FullBackug或者LogBackup,分别构造元数据注册依赖的extra_meta信息:
说明BINLOG_FILE:对应Binlog名称。
dataBegin:备份开始时间,毫秒级别时间戳。
dataEnd:备份结束时间,毫秒级别时间戳。
consistentTime:一致性时间点,秒级时间戳。
全量备份extra_meta格式:
{ 'BINLOG_FILE':'mysql-bin.001', 'version':'5.5', 'dataBegin':17274********, 'dataEnd':17274********, 'consistentTime':17274******** }
日志备份extra_meta格式:
{ 'dataBegin':17274********, 'dataEnd':17274******** }
调用OssUploader.upload_and_register_backup_set方法进行备份数据上传和元数据注册。
if __name__ == '__main__': args = init_command_args() uploader = OssUploader(args.access_key_id, args.access_key_secret, args.endpoint, args.region_code, args.datasource_id) # 全量和日志备份分别通过不同方式构造extraMeta extra_meta = '{}' if args.data_type == 'FullBackup': obj = {} if args.xtrabackup_log_path is not None: obj = xtrabackup_log_parser.analyze_slave_status(logpath=args.xtrabackup_log_path) elif args.xtrabackup_info_path is not None: parser = xtrabackup_info_parser.ExtraMetaParser(file_path=args.xtrabackup_info_path) obj = parser.get_extra_meta() extra_meta = {'BINLOG_FILE': obj.get('BINLOG_FILE'), 'version': obj.get("SERVER_VERSION"), 'dataBegin': date_to_unix_timestamp(obj.get("START_TIME")), 'dataEnd': date_to_unix_timestamp(obj.get("END_TIME")), 'consistentTime': int(date_to_unix_timestamp(obj.get("END_TIME")) / 1000)} extra_meta = json.dumps(extra_meta) elif args.data_type == 'LogBackup': obj = {'dataBegin': date_to_unix_timestamp(args.begin_time), 'dataEnd': date_to_unix_timestamp(args.end_time)} extra_meta = json.dumps(obj) print(f"get extra meta json string: {extra_meta}") # 上传数据,并注册备份集元信息 uploader.upload_and_register_backup_set(file_path=args.file_path, data_type=args.data_type, extra_meta=extra_meta)
OssUploader.upload_and_register_backup_set方法备份数据上传流程。
class OssUploader: def __init__(self, access_key_id, access_key_secret, endpoint, region_code, datasource_id): self.access_key_id = access_key_id self.access_key_secret = access_key_secret self.endpoint = endpoint self.region_code = region_code self.datasource_id = datasource_id config = open_api_models.Config(access_key_id, access_key_secret) # Endpoint 请参考 https://api.aliyun.com/product/Rds config.endpoint = endpoint self.client = OpenApiClient(config) """ 注册备份集元数据 """ def configure_backup_set_info(self, req_param): params = open_api_models.Params( # 接口名称, action='ConfigureBackupSetInfo', # 接口版本, version='2021-01-01', # 接口协议, protocol='HTTPS', # 接口 HTTP 方法, method='POST', auth_type='AK', style='RPC', # 接口 PATH, pathname='/', # 接口请求体内容格式, req_body_type='json', # 接口响应体内容格式, body_type='json' ) # runtime options runtime = util_models.RuntimeOptions() request = open_api_models.OpenApiRequest( query=OpenApiUtilClient.query(req_param) ) # 返回值为 Map 类型,可从 Map 中获得三类数据:响应体 body、响应头 headers、HTTP 返回的状态码 statusCode。 print(f"ConfigureBackupSetInfo request: {req_param}") data = self.client.call_api(params, request, runtime) print(f"ConfigureBackupSetInfo response: {data}") return data['body']['Data'] """ 获取oss上传信息 """ def describe_bak_datasource_storage_access_info(self, req_param): params = open_api_models.Params( # 接口名称, action='DescribeBakDataSourceStorageAccessInfo', # 接口版本, version='2021-01-01', # 接口协议, protocol='HTTPS', # 接口 HTTP 方法, method='POST', auth_type='AK', style='RPC', # 接口 PATH, pathname='/', # 接口请求体内容格式, req_body_type='json', # 接口响应体内容格式, body_type='json' ) # runtime options runtime = util_models.RuntimeOptions() request = open_api_models.OpenApiRequest( query=OpenApiUtilClient.query(req_param) ) # 返回值为 Map 类型,可从 Map 中获得三类数据:响应体 body、响应头 headers、HTTP 返回的状态码 statusCode。 print(f"DescribeBakDataSourceStorageAccessInfo request: {req_param}") data = self.client.call_api(params, request, runtime) print(f"DescribeBakDataSourceStorageAccessInfo response: {data}") return data['body']['Data'] def _fetch_oss_access_info(self, params): info = self.describe_bak_datasource_storage_access_info({ 'RegionId': params['RegionId'], 'DataSourceId': params['DataSourceId'], 'RegionCode': params['RegionCode'], 'BackupType': params['BackupType'], 'BackupSetId': params['BackupSetId'] }) return info['OssAccessInfo'] def upload_and_register_backup_set(self, file_path, data_type, extra_meta): filename = os.path.basename(file_path) params = {'BackupMode': 'Automated', 'BackupMethod': 'Physical', 'RegionId': self.region_code, 'RegionCode': self.region_code, 'DataSourceId': self.datasource_id, 'BackupSetName': filename, 'ExtraMeta': extra_meta, 'BackupType': data_type, 'UploadStatus': 'WaitingUpload'} # 首次注册备份集,返回备份集ID data = self.configure_backup_set_info(params) params['BackupSetId'] = data['BackupSetId'] print(f"------ configure_backup_set_info success: {file_path}, {data_type}, {params['BackupSetId']}, WaitingUpload\n") # 上传数据到oss oss_info = self._fetch_oss_access_info(params) oss_client = create_oss_client(oss_info) upload_oss_file(oss_client, file_path, oss_info['ObjectKey']) print(f"------ upload_oss_file success: {file_path}, {data_type}, {params['BackupSetId']}\n") # 标记备份集上传完成 params['UploadStatus'] = 'UploadSuccess' self.configure_backup_set_info(params) print(f"------ configure_backup_set_info success: {file_path}, {data_type}, {params['BackupSetId']}, UploadSuccess\n")