Implementing NAS Disaster Recovery with HBR

无川

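This article describes how to implement disaster recovery for cloud file data by combining NAS with Hybrid Backup Recovery (HBR), the Alibaba Cloud backup service.

Overview

As part of their digital transformation, more and more enterprises are migrating business systems to the cloud or building cloud-native systems, and along the way they choose cloud-based disaster recovery. This avoids the construction time and cost of an on-premises disaster recovery IDC and its network, and gives access to the convenience of cloud resources. The core of business disaster recovery is data disaster recovery. On-premises or in the cloud, it covers both structured and unstructured data. Alibaba Cloud Elastic Block Storage (EBS) and Object Storage Service (OSS) already support replication, but some application systems still keep their unstructured data on NAS, either for performance reasons or because of the cost of modifying the application. Alibaba Cloud General-purpose NAS does not currently support replication, so disaster recovery cannot rely on the capabilities of NAS alone. The rest of this article shows how NAS and HBR together fill that gap.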

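Solution Overview

Disaster recovery for a business system spans several layers, typically three: application hosts (ECS), the database (PolarDB-X), and storage (General-purpose NAS). This solution combines Server Load Balancer (SLB), multi-zone ECS instances, and PolarDB-X to make the business system and the database system highly available. For storage, it combines Alibaba Cloud HBR with a custom restore script that periodically restores data from the backup vault to a disaster recovery NAS file system, which provides same-city disaster recovery for the files stored on NAS. This article focuses on disaster recovery for NAS in the cloud.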

Applicable Scenarios

Disaster recovery for data stored on Alibaba Cloud NAS

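Solution Architecture

Solution Advantages

Application Disaster Recovery

To provide users with a more stable and reliable load balancing service, Alibaba Cloud SLB is deployed across multiple zones in most regions, which enables cross-data-center disaster recovery within a region. When the data center in the primary zone fails or becomes unavailable, SLB can switch to the data center in the secondary zone and restore service within a very short time (about 30 seconds). When the primary zone recovers, SLB automatically switches back to the data center in the primary zone.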

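Database Disaster Recovery

Cross-zone disaster recovery for PolarDB-X can be implemented with DTS. Access: the PolarDB-X instance is deployed in a primary/secondary layout across data centers, a cross-data-center SLB exposes a single VIP, and ECS instances access the PolarDB-X instance through a domain name. During a switchover the domain name and the VIP stay the same, so the switchover is transparent to the business. High availability: during a normal high-availability switchover, HA monitors the database status at both sites and performs the primary/secondary switchover. For details, see: Implementing cross-zone disaster recovery for PolarDB-X with DTS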

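Implementing the NAS Disaster Recovery Solution

Prerequisites

Two NAS file systems have been created and mounted on the business system: the production NAS file system to be protected, in the primary zone, and the disaster recovery NAS file system, in the secondary zone.

An ECS instance has been provisioned in advance to host the restore script, which periodically restores data from the backup vault to the disaster recovery NAS.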
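
The restore script that runs on this ECS instance needs an AccessKey pair, a region ID, and the IDs of the two NAS file systems. One option is to keep these values out of the script file and read them from environment variables. The following is a minimal sketch of that idea; the variable names (HBR_AK_ID and so on) and the load_settings helper are hypothetical and not part of HBR or the Alibaba Cloud SDK.

```python
import os

# Hypothetical environment variable names; pick whatever fits your deployment.
REQUIRED_VARS = (
    'HBR_AK_ID',
    'HBR_AK_SECRET',
    'HBR_REGION_ID',
    'HBR_SOURCE_NAS_ID',
    'HBR_TARGET_NAS_ID',
)


def load_settings(env=None):
    """Return the required settings as a dict, or raise if any is missing."""
    env = os.environ if env is None else env
    # Treat unset and empty values the same way: both count as missing.
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError('missing settings: ' + ', '.join(missing))
    return {name: env[name] for name in REQUIRED_VARS}
```

Failing early on a missing value keeps a misconfigured scheduled run from reaching the API with placeholder credentials.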

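Procedure

Step 1: Create a backup plan for the production NAS

  1. Log on to the Hybrid Backup Recovery console.

  2. In the left-side navigation pane, choose Backup > NAS Backup.

  3. In the upper-left corner of the top navigation bar, select the region.

  4. On the Alibaba Cloud NAS tab, click Backup File System in the upper-right corner.

  5. In the Backup File System panel, configure the parameters, and then click OK.

  6. For details, see the NAS Backup documentation.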
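
Once the plan exists, it can be useful to confirm that it is visible through the API before relying on it. The sketch below only interprets a response dictionary. It assumes the response shape that the restore script later in this article reads (TotalCount, then BackupPlans > BackupPlan > VaultId). The backup_plan_vaults helper is hypothetical, and the sample dictionary is hand-written for illustration, not real API output.

```python
def backup_plan_vaults(response):
    """Return the VaultId of every backup plan found in a response dict."""
    if not response or not response.get('TotalCount'):
        return []
    # Tolerate a missing or empty BackupPlans section instead of raising.
    plans = (response.get('BackupPlans') or {}).get('BackupPlan') or []
    return [plan.get('VaultId') for plan in plans if plan.get('VaultId')]


# Hand-written sample in the assumed shape.
sample = {
    'TotalCount': 1,
    'BackupPlans': {'BackupPlan': [{'VaultId': 'v-example', 'SourceType': 'NAS'}]},
}
print(backup_plan_vaults(sample))  # prints ['v-example']
```

An empty list means that no plan was found for the file system, which is the case in which the restore script logs a message and exits.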

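Step 2: Deploy the restore script to restore data from the backup vault to the disaster recovery NAS file system

  1. Use a scheduled script to restore the files backed up in HBR from the backup vault to the disaster recovery NAS in the secondary zone.

  2. Deploy the script on an ECS instance in the secondary zone, and mount the disaster recovery NAS on that ECS instance over NFS with write permission.

The backup vault restore script is as follows: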
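
The write-permission requirement in the second step can be checked from the ECS instance before the script is scheduled. The following is a minimal sketch that creates and removes a temporary file under a directory. The is_writable_dir helper and the mount path in the usage note are hypothetical.

```python
import os
import tempfile


def is_writable_dir(path):
    """Return True if a file can be created and removed under path."""
    if not os.path.isdir(path):
        return False
    try:
        # The temporary file is deleted automatically when the block exits.
        with tempfile.NamedTemporaryFile(dir=path, prefix='.hbr_write_test_'):
            pass
        return True
    except OSError:
        # Covers permission errors and read-only mounts.
        return False
```

For example, is_writable_dir('/mnt/dr_nas') would be expected to return True once the disaster recovery NAS is mounted read-write at that (assumed) mount point.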

#  coding=utf-8
import json
import logging
import time
import os

from aliyunsdkcore import client

from aliyunsdkhbr.request.v20170908.DescribeTaskRequest import DescribeTaskRequest

from aliyunsdkhbr.request.v20170908.DescribeBackupJobs2Request import DescribeBackupJobs2Request
from aliyunsdkhbr.request.v20170908.GetNasToRestoreRequest import GetNasToRestoreRequest
from aliyunsdkhbr.request.v20170908.SearchHistoricalSnapshotsRequest import SearchHistoricalSnapshotsRequest
from aliyunsdkhbr.request.v20170908.DescribeNasFileSystemsRequest import DescribeNasFileSystemsRequest
from aliyunsdkhbr.request.v20170908.CreateRestoreJobRequest import CreateRestoreJobRequest
from aliyunsdkhbr.request.v20170908.DescribeRestoreJobs2Request import DescribeRestoreJobs2Request
from aliyunsdkhbr.request.v20170908.DescribeBackupPlansRequest import DescribeBackupPlansRequest

logging.basicConfig(filename='hbr_nas_task.log', level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%a, %d %b %Y %H:%M:%S')

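# Placeholders: replace with the production NAS ID, the disaster recovery NAS ID,
# your AccessKey ID and secret, and the region ID before running.
NAS_ID = 'source_nas_id'
RESTORE_NAS_ID = 'target_nas_id'
AK_ID = 'akID'
AK_SK = 'akSK'
REGION_ID = 'regionID'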

clt = client.AcsClient(AK_ID, AK_SK, REGION_ID)


# Query the backup plans of a NAS file system
def describe_backup_plans(nas_id):
    request = DescribeBackupPlansRequest()
    request.set_SourceType('NAS')
    request.set_Filterss([{'Key': 'FileSystemId', 'Values': [nas_id, ]}, ])
    request.set_PageNumber(1)
    request.set_PageSize(10)

    response = _send_request(request)
    return response


# Query the backup jobs of a NAS file system
def nas_backup_jobs(nas_id):
    request = DescribeBackupJobs2Request()
    request.set_SourceType('NAS')
    request.set_Filterss([{'Key': 'FileSystemId', 'Values': [nas_id, ]}, ])
    request.set_PageNumber(1)
    request.set_PageSize(10)

    response = _send_request(request)
    return response


# Search the backup snapshots of a NAS file system in a vault
def search_historical_snapshots(nas_id, vault_id, nas_c_time):
    data = [
        {
            'field': 'vaultId',
            'value': vault_id,
            'operation': 'MATCH_TERM'
        },
        {
            'field': 'fileSystemId',
            'value': nas_id,
            'operation': 'MATCH_TERM'
        },
        {
            'field': 'createTime',
            'value': nas_c_time,
            'operation': 'MATCH_TERM'
        }
    ]
    request = SearchHistoricalSnapshotsRequest()
    request.set_SourceType('NAS')
    request.set_Limit(10)
    request.set_Query(json.dumps(data))
    request.set_NextToken('')

    response = _send_request(request)
    return response


def describe_nas_filesystems():
    request = DescribeNasFileSystemsRequest()
    response = _send_request(request)
    return response


# Create a NAS restore job
def create_restore_job(snapshot_id, snapshot_hash, vault_id, nas_id):
    nas_response = describe_nas_filesystems()
    restore_nas_c_time = ''
    for i in nas_response.get('FileSystems').get('FileSystem'):
        if nas_id == i.get('FileSystemId'):
            restore_nas_c_time = i.get('CreateTime')
    data = {
        'includes': [],
        'excludes': []
    }
    request = CreateRestoreJobRequest()
    request.set_SourceType('NAS')
    request.set_RestoreType('NAS')
    request.set_VaultId(vault_id)
    request.set_SnapshotId(snapshot_id)
    request.set_SnapshotHash(snapshot_hash)
    request.set_TargetPath('/')
    request.set_TargetFileSystemId(nas_id)
    request.set_TargetCreateTime(restore_nas_c_time)
    request.set_Options(json.dumps(data))

    response = _send_request(request)
    return response


# Query the restore jobs of the target NAS file system
def describe_restore_jobs(nas_id):
    request = DescribeRestoreJobs2Request()
    request.set_RestoreType('NAS')
    request.set_Filterss([{'Key': 'TargetFileSystemId', 'Values': [nas_id, ]}, ])
    request.set_PageNumber(1)
    request.set_PageSize(10)
    response = _send_request(request)
    return response


# Query the restore jobs that are still running
def describe_running_restore_jobs(nas_id):
    request = DescribeRestoreJobs2Request()
    request.set_RestoreType('NAS')
    request.set_Filterss([{'Key': 'TargetFileSystemId', 'Values': [nas_id, ]}, {'Key': 'Status', 'Values': ['RUNNING', ]}])
    request.set_PageNumber(1)
    request.set_PageSize(10)
    response = _send_request(request)
    return response


def my_describe_task(task_id):
    request = DescribeTaskRequest()
    request.set_TaskId(task_id)
    time.sleep(60)
    response = _send_request(request)
    while response.get('Progress') not in [-1, 100]:
        time.sleep(30)
        response = _send_request(request)
    return response


# send open api request
def _send_request(request):
    request.set_accept_format('json')
    try:
        response_str = clt.do_action_with_exception(request)
        logging.info(response_str)
        response_detail = json.loads(response_str)
        return response_detail
    except Exception as e:
        logging.error(e)
        # Re-raise so callers do not continue with a None response.
        raise


# Check whether a restore job has already been created for this backup job.
# Processed job IDs are kept in a local file named after the NAS ID.
def check_history_job(nas_id, job_id):
    if not os.path.exists(nas_id):
        return 0
    with open(nas_id) as nas_job_f:
        return 1 if (job_id + '\n') in nas_job_f.readlines() else 0


# Record a backup job ID once its restore job has been created.
def record_history_job(nas_id, job_id):
    with open(nas_id, 'a+') as nas_job_f:
        nas_job_f.write(job_id + '\n')


def hbr_nas_task(backup_nas, restore_nas):
    logging.info('Do hbr_nas_task!')
    plan_response = describe_backup_plans(backup_nas)
    if plan_response.get('TotalCount'):
        logging.info('NAS file system ({}) has {} backup plan(s).'.format(backup_nas, plan_response.get('TotalCount')))
    else:
        logging.info('NAS file system ({}) has no backup plan.'.format(backup_nas))
        return
    backup_job_response = nas_backup_jobs(backup_nas)
    latest_job = None
    if backup_job_response.get('TotalCount'):
        latest_job = backup_job_response.get('BackupJobs').get('BackupJob')[0]
    if latest_job is None or latest_job.get('Status') != 'COMPLETE':
        logging.info('The latest backup job of NAS file system ({}) is not complete.'.format(backup_nas))
        return
    logging.info('The latest backup job of NAS file system ({}) is complete.'.format(backup_nas))
    if check_history_job(backup_nas, latest_job.get('JobId')):
        logging.info('A restore job has already been created for this backup.')
        return
    restore_running_job_response = describe_running_restore_jobs(restore_nas)
    if restore_running_job_response.get('TotalCount'):
        logging.info('NAS file system ({}) has a restore job in progress; '
                     'no new restore job is created this time.'.format(restore_nas))
        return
    plan = plan_response.get('BackupPlans').get('BackupPlan')[0]
    snapshot_response = search_historical_snapshots(
        backup_nas, plan.get('VaultId'), str(plan.get('CreateTime')))
    snapshot = snapshot_response.get('Snapshots').get('Snapshot')[0]
    if snapshot.get('JobId') != latest_job.get('JobId'):
        logging.info('A newer backup job of NAS file system ({}) has completed; run the task again.'.format(backup_nas))
        return 1
    restore_create_response = create_restore_job(
        snapshot.get('SnapshotId'), snapshot.get('SnapshotHash'), snapshot.get('VaultId'), restore_nas)
    logging.info('Restore job {} created.'.format(restore_create_response.get('RestoreId')))
    # Mark the backup as handled only after the restore job exists, so a run that
    # exits early (for example, while another restore is in progress) is retried later.
    record_history_job(backup_nas, latest_job.get('JobId'))
    restore_job_response = describe_restore_jobs(restore_nas)
    restore_job = restore_job_response.get('RestoreJobs').get('RestoreJob')[0]
    logging.info('Restore job ID: {}, status: {}'.format(restore_job.get('RestoreId'), restore_job.get('Status')))


if __name__ == '__main__':
    logging.info("Do HBR by OpenApi!")
    hbr_nas_task(NAS_ID, RESTORE_NAS_ID)
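
hbr_nas_task returns 1 when it finds that a newer backup has finished and the task should be run again, but the entry point above ignores that value. The following is a minimal sketch of a wrapper that acts on it. The run_until_settled helper, its parameter names, and its defaults are assumptions for illustration, not part of the original script.

```python
import time


def run_until_settled(task, max_attempts=3, delay_seconds=60, sleep=time.sleep):
    """Call task() until it returns something other than 1.

    Returns the number of attempts that were made. The sleep function is
    injectable so the wrapper can be exercised without real waiting.
    """
    for attempt in range(1, max_attempts + 1):
        if task() != 1:
            return attempt
        # Wait before the next attempt, but not after the last one.
        if attempt < max_attempts:
            sleep(delay_seconds)
    return max_attempts
```

Under these assumptions, the last line of the script could become run_until_settled(lambda: hbr_nas_task(NAS_ID, RESTORE_NAS_ID)), with the script itself still triggered on a schedule (for example, by cron) on the ECS instance.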