# upload_and_register_backup_set.py: 用于上传全量和日志备份数据,解析对应的元数据,并进行备份集元数据的注册。
# -*- coding: utf-8 -*-
# This file is auto-generated, don't edit it. Thanks.
import os
import argparse
import re
import time
import json
from datetime import datetime
import oss2
from alibabacloud_openapi_util.client import Client as OpenApiUtilClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_openapi.client import Client as OpenApiClient
from alibabacloud_tea_util import models as util_models
import xtrabackup_info_parser
import xtrabackup_log_parser
def init_command_args():
    """
    Parse this script's command-line options and echo the ones that were given.

    Every option is optional at the argparse level. Each option with a truthy
    value is printed as ``name: value`` in declaration order; the AccessKey id
    and secret are printed masked. A blank separator is printed at the end.

    :return: the ``argparse.Namespace`` holding every option (unset ones are None).
    """
    arg_parser = argparse.ArgumentParser(description="A sample command-line parser.")
    option_specs = (
        ("access_key_id", "Aliyun AccessKeyId."),
        ("access_key_secret", "Aliyun AccessKeySecret."),
        ("endpoint", "Aliyun API Endpoint."),
        ("region_code", "Aliyun DataSource RegionCode."),
        ("datasource_id", "Aliyun DataSourceId."),
        ("data_type", "BackupSet DataType: FullBackup | LogBackup."),
        ("file_path", "BackupSet File Path."),
        ("xtrabackup_info_path", "Xtrabackup Info Path."),
        ("xtrabackup_log_path", "Xtrabackup Log Path."),
        ("begin_time", "Binlog Begin Time."),
        ("end_time", "Binlog End Time."),
    )
    for option_name, help_text in option_specs:
        arg_parser.add_argument(f"--{option_name}", help=help_text)
    parsed = arg_parser.parse_args()

    # Credentials are never echoed in clear text.
    masked_options = {"access_key_id", "access_key_secret"}
    for option_name, _ in option_specs:
        value = getattr(parsed, option_name)
        if not value:
            continue
        shown = "************" if option_name in masked_options else value
        print(f"{option_name}: {shown}")
    print('\n')
    return parsed
def date_to_unix_timestamp(date_str):
    """
    Convert a ``'YYYY-MM-DD HH:MM:SS'`` string to a Unix timestamp in milliseconds.

    The string is interpreted in the host's local time zone (``time.mktime``),
    and the result is whole seconds scaled by 1000.

    :param date_str: date string in ``%Y-%m-%d %H:%M:%S`` format
    :return: milliseconds since the epoch, always a multiple of 1000
    :raises ValueError: if ``date_str`` does not match the format
    """
    local_struct = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S").timetuple()
    whole_seconds = int(time.mktime(local_struct))
    return whole_seconds * 1000
def create_oss_client(params):
    """
    Build an OSS bucket client that authenticates with STS temporary credentials.

    :param params: mapping with keys ``AccessKeyId``, ``AccessKeySecret``,
        ``SecurityToken``, ``BucketName`` and ``OssEndpoint``
    :return: an ``oss2.Bucket`` bound to that bucket and endpoint
    :raises KeyError: if any of the required keys is missing
    """
    required_fields = ('AccessKeyId', 'AccessKeySecret', 'SecurityToken', 'BucketName', 'OssEndpoint')
    key_id, key_secret, sts_token, bucket, oss_endpoint = (params[field] for field in required_fields)
    sts_auth = oss2.StsAuth(key_id, key_secret, sts_token)
    return oss2.Bucket(sts_auth, oss_endpoint, bucket)
def upload_oss_file(oss_client, file_path, object_key, preferred_part_size=5 * 1024 * 1024):
    """
    Upload a local file to OSS using a multipart upload.

    The part size starts at ``preferred_part_size`` and is enlarged by
    ``oss2.determine_part_size`` when needed, so very large backup files stay
    within OSS's maximum part count. If anything fails after the multipart upload
    has been initiated, the upload is aborted so its parts are not left behind in
    the bucket, and the original exception is re-raised.

    :param oss_client: bucket client (``oss2.Bucket``, e.g. from create_oss_client)
    :param file_path: local file path
    :param object_key: object key in OSS, i.e. the destination file name
    :param preferred_part_size: preferred part size in bytes (default 5 MiB)
    :raises OSError: if the local file cannot be stat'ed or read
    """
    # Stat first: a missing file fails here, before any multipart upload is created.
    total_size = os.path.getsize(file_path)
    # determine_part_size returns total_size for files smaller than the preferred
    # size (0 for an empty file); fall back so read() is never asked for 0 bytes.
    part_size = oss2.determine_part_size(total_size, preferred_size=preferred_part_size) or preferred_part_size
    upload_id = oss_client.init_multipart_upload(object_key).upload_id
    try:
        parts = []
        with open(file_path, 'rb') as file_obj:
            # Part numbers are 1-based.
            chunks = iter(lambda: file_obj.read(part_size), b'')
            for part_number, data in enumerate(chunks, start=1):
                result = oss_client.upload_part(object_key, upload_id, part_number, data)
                parts.append(oss2.models.PartInfo(part_number, result.etag))
        oss_client.complete_multipart_upload(object_key, upload_id, parts)
    except BaseException:
        # Includes KeyboardInterrupt: an interrupted long upload must still be cleaned up.
        try:
            oss_client.abort_multipart_upload(object_key, upload_id)
        except Exception as abort_err:  # best-effort cleanup; the original error is what matters
            print(f"abort_multipart_upload failed for {object_key}: {abort_err}")
        raise
class OssUploader:
    """
    Register backup-set metadata through the OpenAPI and upload backup files to OSS.

    Both OpenAPI actions used here are RPC-style calls (version 2021-01-01) made
    with the AccessKey pair given to the constructor. Requests and responses are
    printed, with credential fields masked.
    """

    # Keys whose values are credentials; they are masked whenever a request or
    # response is printed (the storage-access response carries STS credentials).
    _SENSITIVE_KEYS = frozenset({'AccessKeyId', 'AccessKeySecret', 'SecurityToken'})
    _MASK = '************'

    def __init__(self, access_key_id, access_key_secret, endpoint, region_code, datasource_id):
        self.access_key_id = access_key_id
        self.access_key_secret = access_key_secret
        self.endpoint = endpoint
        self.region_code = region_code
        self.datasource_id = datasource_id
        config = open_api_models.Config(access_key_id=access_key_id, access_key_secret=access_key_secret)
        # For endpoints see https://api.aliyun.com/product/Rds
        config.endpoint = endpoint
        self.client = OpenApiClient(config)

    @classmethod
    def _redact(cls, value):
        """Return a copy of ``value`` with credential fields masked, for safe printing."""
        if isinstance(value, dict):
            return {key: cls._MASK if key in cls._SENSITIVE_KEYS else cls._redact(item)
                    for key, item in value.items()}
        if isinstance(value, list):
            return [cls._redact(item) for item in value]
        return value

    def _call_rpc_api(self, action, req_param):
        """
        Invoke an RPC-style OpenAPI action and return ``body['Data']`` of the response.

        :param action: OpenAPI action name
        :param req_param: request parameters, sent as the query string
        :return: the ``Data`` element of the response body
        """
        params = open_api_models.Params(
            action=action,
            version='2021-01-01',
            protocol='HTTPS',
            method='POST',
            auth_type='AK',
            style='RPC',
            pathname='/',
            # Request and response bodies are JSON.
            req_body_type='json',
            body_type='json',
        )
        runtime = util_models.RuntimeOptions()
        request = open_api_models.OpenApiRequest(
            query=OpenApiUtilClient.query(req_param)
        )
        print(f"{action} request: {self._redact(req_param)}")
        # call_api returns a map holding the response body, headers and statusCode.
        data = self.client.call_api(params, request, runtime)
        print(f"{action} response: {self._redact(data)}")
        return data['body']['Data']

    def configure_backup_set_info(self, req_param):
        """
        Register or update backup-set metadata (action ConfigureBackupSetInfo).

        :param req_param: request parameters
        :return: ``body['Data']`` of the response (callers read ``BackupSetId`` from it)
        """
        return self._call_rpc_api('ConfigureBackupSetInfo', req_param)

    def describe_bak_datasource_storage_access_info(self, req_param):
        """
        Fetch OSS upload information (action DescribeBakDataSourceStorageAccessInfo).

        :param req_param: request parameters
        :return: ``body['Data']`` of the response (callers read ``OssAccessInfo`` from it)
        """
        return self._call_rpc_api('DescribeBakDataSourceStorageAccessInfo', req_param)

    def _fetch_oss_access_info(self, params):
        """Return the ``OssAccessInfo`` for the backup set identified in ``params``."""
        request_keys = ('RegionId', 'DataSourceId', 'RegionCode', 'BackupType', 'BackupSetId')
        info = self.describe_bak_datasource_storage_access_info({key: params[key] for key in request_keys})
        return info['OssAccessInfo']

    def upload_and_register_backup_set(self, file_path, data_type, extra_meta):
        """
        Register a backup set, upload its file to OSS, then mark the upload as done.

        Steps: register with UploadStatus=WaitingUpload (which yields the
        BackupSetId), fetch OSS access info for that set, upload the file, and
        re-register with UploadStatus=UploadSuccess.

        :param file_path: local path of the backup file; its basename becomes BackupSetName
        :param data_type: BackupType, e.g. FullBackup or LogBackup
        :param extra_meta: ExtraMeta JSON string
        :raises FileNotFoundError: if ``file_path`` is not an existing regular file;
            raised before anything is registered
        """
        # Check first: otherwise the backup set would be registered as
        # WaitingUpload and then left behind when the upload fails to open the file.
        if not os.path.isfile(file_path):
            raise FileNotFoundError(f"Backup file not found: {file_path}")
        filename = os.path.basename(file_path)
        params = {'BackupMode': 'Automated', 'BackupMethod': 'Physical', 'RegionId': self.region_code,
                  'RegionCode': self.region_code, 'DataSourceId': self.datasource_id, 'BackupSetName': filename,
                  'ExtraMeta': extra_meta, 'BackupType': data_type, 'UploadStatus': 'WaitingUpload'}
        # First registration returns the backup set id.
        data = self.configure_backup_set_info(params)
        params['BackupSetId'] = data['BackupSetId']
        print(f"------ configure_backup_set_info success: {file_path}, {data_type}, {params['BackupSetId']}, WaitingUpload\n")
        # Upload the data to OSS.
        # NOTE(review): if this step raises, the backup set stays registered as WaitingUpload.
        oss_info = self._fetch_oss_access_info(params)
        oss_client = create_oss_client(oss_info)
        upload_oss_file(oss_client, file_path, oss_info['ObjectKey'])
        print(f"------ upload_oss_file success: {file_path}, {data_type}, {params['BackupSetId']}\n")
        # Mark the backup set upload as complete.
        params['UploadStatus'] = 'UploadSuccess'
        self.configure_backup_set_info(params)
        print(f"------ configure_backup_set_info success: {file_path}, {data_type}, {params['BackupSetId']}, UploadSuccess\n")
if __name__ == '__main__':
args = init_command_args()
uploader = OssUploader(args.access_key_id, args.access_key_secret,
args.endpoint, args.region_code, args.datasource_id)
# 全量和日志备份分别通过不同方式构造extraMeta
extra_meta = '{}'
if args.data_type == 'FullBackup':
obj = {}
if args.xtrabackup_log_path is not None:
obj = xtrabackup_log_parser.analyze_slave_status(logpath=args.xtrabackup_log_path)
elif args.xtrabackup_info_path is not None:
parser = xtrabackup_info_parser.ExtraMetaParser(file_path=args.xtrabackup_info_path)
obj = parser.get_extra_meta()
extra_meta = {'BINLOG_FILE': obj.get('BINLOG_FILE'),
'version': obj.get("SERVER_VERSION"),
'dataBegin': date_to_unix_timestamp(obj.get("START_TIME")),
'dataEnd': date_to_unix_timestamp(obj.get("END_TIME")),
'consistentTime': int(date_to_unix_timestamp(obj.get("END_TIME")) / 1000)}
extra_meta = json.dumps(extra_meta)
elif args.data_type == 'LogBackup':
obj = {'dataBegin': date_to_unix_timestamp(args.begin_time),
'dataEnd': date_to_unix_timestamp(args.end_time)}
extra_meta = json.dumps(obj)
print(f"get extra meta json string: {extra_meta}")
# 上传数据,并注册备份集元信息
uploader.upload_and_register_backup_set(file_path=args.file_path, data_type=args.data_type, extra_meta=extra_meta)