本文向您介绍如何将定时SQL任务同步到其他Project中。
配置文件说明
[
{
"target_schedule_sql_config": {
"s_sql_job_name": "sql-1704166982-166294",
"project": "sls-ml-demo",
"endpoint": "cn-chengdu.log.aliyuncs.com"
},
"newly_job_config": {
"description": "",
"displayName": "",
"fromTime": 0,
"toTime": 0,
"source": {
"project": "sls-ml-demo",
"logstore": "cdn_access_log",
"endpoint": "cn-chengdu.log.aliyuncs.com",
"roleArn": "acs:ram::xxxxx:role/aliyunlogetlrole"
},
"destination": {
"project": "sls-ml-demo",
"logstore": "test_temp",
"endpoint": "cn-chengdu-intranet.log.aliyuncs.com",
"roleArn": "acs:ram::xxxxx:role/aliyunlogetlrole"
}
}
}
]
以上配置内容中:
- target_schedule_sql_config字段:描述原始定时SQL任务的基本信息。其中s_sql_job_name为任务名,您可以通过控制台查看定时SQL任务基础信息。
- newly_job_config字段:描述将上述定时SQL任务复制到哪个Project中。
  - description和displayName字段:如果为空字符串,则使用原始Job的相关字段进行填充。
  - fromTime和toTime字段:用来设置新创建任务的开始时间和结束时间。如果设置的值小于等于零,则复用原始Job的时间区间。
  - source字段:用来设置新的Job创建在哪个Project中。
  - destination字段:用来设置新的Job中的SQL结果数据存储到哪个Logstore中。如果destination和source在相同的Region,则将destination中的endpoint设置为私网地址,可以降低流量费用。
  - roleArn字段:代表写目标授权角色ARN。
示例代码
# -*- coding: utf-8 -*-
import json
import time
from aliyun.log import *
from aliyun.log.scheduled_sql import *
# AccessKey ID / secret passed to every LogClient built by get_sls_client.
# They are empty placeholders here — presumably to be filled in before running.
global_ak_id = ""
global_ak_key = ""
# Cache of LogClient instances keyed by endpoint (read and written by get_sls_client).
client_map = {}
def get_sls_client(endpoint: str) -> LogClient:
    """Return the cached LogClient for ``endpoint``, creating it on first use.

    Clients are stored in the module-level ``client_map`` keyed by endpoint and
    are built with the module-level ``global_ak_id`` / ``global_ak_key``.

    Args:
        endpoint: Endpoint string the client is constructed with.

    Returns:
        The LogClient associated with ``endpoint``.
    """
    if endpoint in client_map:
        return client_map[endpoint]
    # Construct a client only on a cache miss so that cache hits do not
    # allocate a throwaway LogClient.
    sls_client = LogClient(endpoint, global_ak_id, global_ak_key)
    client_map[endpoint] = sls_client
    return sls_client
def check_store_item(store_item: dict) -> None:
    """Validate that a store config carries the required, non-empty entries.

    Args:
        store_item: Mapping expected to contain the keys "project",
            "logstore" and "endpoint".

    Raises:
        ValueError: If one of the required keys is absent, or its value is
            empty.
    """
    for key in ("project", "logstore", "endpoint"):
        if key not in store_item:
            raise ValueError(f"logstore config miss key {key}")
        # Truthiness check: comparing len(...) against "" can never be true,
        # which would let empty values pass validation.
        if not store_item[key]:
            raise ValueError(f"logstore config miss value [{key}]")
def get_schedule_sql_job_config(endpoint: str, project: str, ssql_job_name: str) -> dict:
    """Fetch the definition of an existing scheduled SQL job.

    Args:
        endpoint: Endpoint used to obtain the client via ``get_sls_client``.
        project: Project that owns the job.
        ssql_job_name: Name of the scheduled SQL job to look up.

    Returns:
        The value returned by ``get_scheduled_sql()`` on the client's response.
    """
    response = get_sls_client(endpoint).get_scheduled_sql(project, ssql_job_name)
    return response.get_scheduled_sql()
def make_schedule_sql_name() -> str:
    """Generate a job name of the form ``sql-<unix seconds>-<suffix>``.

    The suffix is the current nanosecond timestamp modulo 1,000,000, e.g.
    ``sql-1704166982-166294``.

    Returns:
        The generated job name.
    """
    seconds = int(time.time())
    suffix = time.time_ns() % 1000000
    return f"sql-{seconds}-{suffix}"
def create_schedule_sql(s_sql_config: dict):
    """Clone an existing scheduled SQL job according to one sync config entry.

    The job named in ``s_sql_config["target_schedule_sql_config"]`` is fetched,
    its schedule and configuration are copied, the overrides found in
    ``s_sql_config["newly_job_config"]`` are applied, and the result is created
    under a freshly generated name in the ``source`` project.

    Notes:
        1. Only the query-related configuration of the original job is
           copied; confirm the start and end time of the newly created job.
        2. Check whether the source and destination logstores are in the same
           region. If they are, use the intranet endpoint; otherwise the
           public endpoint must be used, which incurs traffic fees.

    Args:
        s_sql_config: One config entry with the keys
            "target_schedule_sql_config" (s_sql_job_name, project, endpoint)
            and "newly_job_config" (source, destination, plus the optional
            description, displayName, fromTime and toTime overrides).

    Raises:
        KeyError: If a required key is missing from the config entry or from
            the fetched job definition.
        ValueError: Propagated from ``check_store_item`` for an invalid
            source or destination section.
    """

    def make_scheduled_sql_schedule(origin_ssql_job: dict):
        """Build a JobSchedule that mirrors the original job's schedule."""
        origin_job_schedule = origin_ssql_job["schedule"]
        job_schedule = JobSchedule()
        job_schedule.setJobName("")
        job_schedule.setDisplayName("")
        job_schedule.setDescription("")
        job_schedule.setType(origin_job_schedule["type"])
        job_schedule.setInterval(origin_job_schedule["interval"])
        job_schedule.setDelay(origin_job_schedule["delay"])
        job_schedule.setRunImmediately(origin_job_schedule["runImmediately"])
        # These fields are copied only when the original schedule has them.
        optional_setters = (
            ("hour", job_schedule.setHour),
            ("dayOfWeek", job_schedule.setDayOfWeek),
            ("timeZone", job_schedule.setTimeZone),
            ("cronExpression", job_schedule.setCronExpression),
        )
        for key, setter in optional_setters:
            if key in origin_job_schedule:
                setter(origin_job_schedule[key])
        return job_schedule

    def make_scheduled_sql_config(origin_ssql_job: dict, from_time: int, to_time: int, source_store_config: dict, dest_store_config: dict):
        """Copy the original configuration and apply the non-empty overrides.

        Every overridable field falls back to the original job's value when
        the override is empty (or, for the time bounds, not greater than 0).
        """
        origin_job_config = origin_ssql_job["configuration"]
        schedule_sql_config = ScheduledSQLConfiguration()
        schedule_sql_config.setScript(origin_job_config["script"])
        schedule_sql_config.setSqlType(origin_job_config["sqlType"])

        # Role ARNs: a configured value wins over the original job's.
        schedule_sql_config.setRoleArn(
            source_store_config.get("roleArn") or origin_job_config["roleArn"]
        )
        # Set exactly once, so the configured destination role is not
        # clobbered by the original job's destRoleArn afterwards.
        schedule_sql_config.setDestRoleArn(
            dest_store_config.get("roleArn") or origin_job_config["destRoleArn"]
        )

        schedule_sql_config.setSourceLogstore(
            source_store_config["logstore"] or origin_job_config["sourceLogstore"]
        )
        schedule_sql_config.setDestEndpoint(
            dest_store_config["endpoint"] or origin_job_config["destEndpoint"]
        )
        schedule_sql_config.setDestProject(
            dest_store_config["project"] or origin_job_config["destProject"]
        )
        schedule_sql_config.setDestLogstore(
            dest_store_config["logstore"] or origin_job_config["destLogstore"]
        )

        schedule_sql_config.setMaxRetries(origin_job_config["maxRetries"])
        schedule_sql_config.setMaxRunTimeInSeconds(origin_job_config["maxRunTimeInSeconds"])
        schedule_sql_config.setDataFormat(origin_job_config["dataFormat"])
        schedule_sql_config.setResourcePool(origin_job_config["resourcePool"])

        # Time bounds: values <= 0 mean "reuse the original job's bound".
        schedule_sql_config.setFromTime(
            from_time if from_time > 0 else origin_job_config["fromTime"]
        )
        schedule_sql_config.setFromTimeExpr(origin_job_config["fromTimeExpr"])
        schedule_sql_config.setToTime(
            to_time if to_time > 0 else origin_job_config["toTime"]
        )
        schedule_sql_config.setToTimeExpr(origin_job_config["toTimeExpr"])
        schedule_sql_config.setParameters(origin_job_config["parameters"])
        return schedule_sql_config

    # Fetch the job that serves as the template.
    target_job_config = s_sql_config["target_schedule_sql_config"]
    ssql_job_name = target_job_config["s_sql_job_name"]
    project = target_job_config["project"]
    endpoint = target_job_config["endpoint"]
    ssql_job = get_schedule_sql_job_config(endpoint, project, ssql_job_name)

    # Validate where the new job lives and where it writes its results.
    newly_job_config = s_sql_config["newly_job_config"]
    source_config = newly_job_config["source"]
    dest_config = newly_job_config["destination"]
    check_store_item(source_config)
    check_store_item(dest_config)

    from_time = newly_job_config.get("fromTime", 0)
    to_time = newly_job_config.get("toTime", 0)
    schedule_sql_config = make_scheduled_sql_config(ssql_job, from_time, to_time, source_config, dest_config)
    job_schedule = make_scheduled_sql_schedule(ssql_job)

    scheduled_sql = ScheduledSQL()
    job_name = make_schedule_sql_name()
    scheduled_sql.setName(job_name)
    scheduled_sql.setConfiguration(schedule_sql_config)
    scheduled_sql.setSchedule(job_schedule)
    # An empty description / displayName falls back to the original job's.
    scheduled_sql.setDescription(
        newly_job_config.get("description") or ssql_job["description"]
    )
    scheduled_sql.setDisplayName(
        newly_job_config.get("displayName") or ssql_job["displayName"]
    )

    # The new job is created in the project named by the "source" section.
    sls_client = get_sls_client(source_config["endpoint"])
    sls_client.create_scheduled_sql(source_config["project"], scheduled_sql)
    print(f"sync to \n\tsrc_project {source_config}\n\tdest_project {dest_config}\n\tjob_name {job_name}")
if __name__ == "__main__":
    # JSON file holding a list of sync entries, one per job to clone.
    sync_store_config_path = "./sls_tools/sync_ssql.json"
    # Read explicitly as UTF-8 so non-ASCII values load the same way
    # regardless of the platform's default locale encoding.
    with open(sync_store_config_path, "r", encoding="utf-8") as reader:
        sync_map = json.load(reader)
    for ssql_config in sync_map:
        try:
            create_schedule_sql(ssql_config)
        except Exception as e:
            # Best-effort batch: report the failure and continue with the
            # next entry instead of aborting the whole run.
            print(e)
该文章对您有帮助吗?
- 本页导读
- 配置文件说明
- 示例代码