本文主要介绍如何将数据加工(旧版)升级到数据加工(新版)。
升级概述
如果您已有运行中的数据加工(旧版)任务,可以升级至数据加工(新版),任务升级过程说明:
数据加工服务为保证数据的完整性,自动将当前的数据消费点位迁移至数据加工(新版),升级后将从该点位继续消费数据。
升级已有数据处理逻辑,需使用SPL语言实现与现有一致的数据处理需求,请参考升级SPL规则语法对照。
任务升级完成后,观测和监控数据加工(新版)任务运行状态,请参考观测与监控数据加工(新版)任务。
任务升级至新版以后,不支持回退至旧版。
升级过程须将旧版本DSL脚本翻译为SPL规则,并详细检查处理结果后方可保存并升级。
升级后,任务的观测仪表盘仅显示升级后的运行指标,如需查看升级前运行指标,请参见数据加工仪表盘。
任务升级操作完成后,需要刷新页面,加载数据加工(新版)任务概览。
日志服务控制台升级
登录日志服务控制台。
在Project列表区域,单击目标Project。
在左侧导航栏中,选择
。在加工任务列表中,单击目标加工任务。
在数据加工概览(旧版)页签中,查看加工任务详情,确认旧版本数据加工任务正常运行。
单击修改规则按钮,进入编辑页面后,查看数据加工(旧版)DSL脚本。
单击切换至数据加工(新版),进入新版编辑界面,通过SPL编写数据处理需求。
可通过单击按钮切换至数据加工(新版)或者切换至数据加工(旧版)进行切换,在代码编辑框中写对应的SPL规则、或者数据加工(旧版)SDL。
使用数据加工(新版)调试功能,请参见调试SPL规则。详细检查更新后的SPL规则符合处理需求。
单击修改数据加工(新版),然后单击确认。
数据加工API升级
日志服务API更新已有数据加工任务,请参见UpdateETL - 更新数据加工任务。
日志服务OpenAPI SDK安装请参考SDK安装。基于Python SDK升级任务至数据加工(新版)的脚本参考如下,执行脚本将更新、并重启数据加工任务。
import os
import sys
from typing import List
from alibabacloud_sls20201230.client import Client as Sls20201230Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_sls20201230 import models as sls_20201230_models
class EtlUpgrade:
@staticmethod
def create_client() -> Sls20201230Client:
config = open_api_models.Config(
# 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。,
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。,
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# Endpoint 请参考 https://api.aliyun.com/product/Sls
config.endpoint = f'cn-chengdu.log.aliyuncs.com'
return Sls20201230Client(config)
@staticmethod
def upgrade(args: List[str]) -> None:
client = EtlUpgrade.create_client()
try:
resp = client.get_etl('my-project', 'etl-1715564059-320063')
except Exception as error:
print(error)
raise
# 更新数据加工配置,升级至新版
# SPL规则,请详细检查更新后的SPL规则符合处理需求
job = resp.to_map()
req = sls_20201230_models.UpdateETLRequest().from_map(job["body"])
req.configuration.lang = "SPL"
req.configuration.script = "* | where cast(Status as bigint)=200"
for sink in req.configuration.sinks:
sink.datasets = ["__UNNAMED__"]
try:
# 更新、并重启数据加工任务
client.update_etl('my-project', 'etl-1715564059-320063', req)
except Exception as error:
print(error)
raise
if __name__ == '__main__':
EtlUpgrade.upgrade(sys.argv[1:])
日志服务OpenAPI SDK安装请参考SDK安装。基于Go SDK升级任务至数据加工(新版)的脚本参考如下,执行脚本将更新、并重启数据加工任务。
package main
import (
"fmt"
openapi "github.com/alibabacloud-go/darabonba-openapi/v2/client"
sls20201230 "github.com/alibabacloud-go/sls-20201230/v6/client"
"github.com/alibabacloud-go/tea/tea"
"os"
)
// 创建SLS客户端
func createClient(accessKeyId, accessKeySecret, endpoint string) (_result *sls20201230.Client, _err error) {
config := &openapi.Config{
Endpoint: tea.String(endpoint),
AccessKeyId: tea.String(accessKeyId),
AccessKeySecret: tea.String(accessKeySecret),
Protocol: tea.String("http"),
}
_result, _err = sls20201230.NewClient(config)
return _result, _err
}
// 升级ETL作业
func upgrade(client *sls20201230.Client, project, etlJobName string) (_err error) {
// 获取ETL作业信息
getETLResp, _err := client.GetETL(tea.String(project), tea.String(etlJobName))
if _err != nil {
fmt.Println(_err)
return _err
}
// 准备更新ETL作业请求
updateETLReq := sls20201230.UpdateETLRequest{
Configuration: getETLResp.Body.Configuration,
DisplayName: tea.String(fmt.Sprint(etlJobName, "-update")),
Description: tea.String("this is update"),
}
// 更新语言和脚本
updateETLReq.Configuration.Lang = tea.String("SPL")
updateETLReq.Configuration.Script = tea.String("* | where cast(Status as bigint)=200")
// 更新sink datasets
for _, sink := range updateETLReq.Configuration.Sinks {
sink.Datasets = []*string{tea.String("__UNNAMED__")}
}
// 更新ETL作业
_, _err = client.UpdateETL(tea.String(project), tea.String(etlJobName), &updateETLReq)
if _err != nil {
fmt.Println(_err)
return _err
}
return nil
}
func main() {
// 从环境变量获取必要的配置
accessKeyId, _ := os.LookupEnv("ALIBABA_CLOUD_ACCESS_KEY_ID")
accessKeySecret, _ := os.LookupEnv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
endpoint := "cn-chengdu.log.aliyuncs.com"
// 创建SLS客户端
client, err := createClient(accessKeyId, accessKeySecret, endpoint)
if err != nil {
fmt.Println("创建客户端失败:", err)
return
}
// 升级ETL作业
err = upgrade(client, "my-project", "etl-1715564059-320063")
if err != nil {
fmt.Println("ETL升级失败:", err)
return
}
fmt.Println("ETL升级成功")
}
- 本页导读 (1)
- 升级概述
- 日志服务控制台升级
- 数据加工API升级