本文介绍数据加工(新版)与数据加工(旧版)服务的差异,以及如何将旧版本任务升级到新版本。
新旧版本对比
对比项 | 数据加工(新版) | 数据加工(旧版) |
数据处理语法 | 数据处理SPL,参考SPL语法。
| 数据加工DSL,参考数据加工语法。 |
支持场景 |
|
|
CreateETL - 创建数据加工任务(接口对比) |
|
|
源数据Logstore消费组依赖 | 不依赖 | 依赖 |
新旧版本选择
场景考量
目前,数据加工(新版)暂未支持数据富化等场景。如果需要从SLS Logstore,OSS Object、RDS Table等数据源关联维表信息,进行IP地理位置解析,或者跨地域同步数据,请选择数据加工(旧版)。
计费考量
通过数据加工服务的技术迭代升级,数据加工(新版)提供了更具优势的使用成本,仅为数据加工(旧版)的1/3,具体参考按使用功能计费模式计费项。所以,在需求场景已经支持时,推荐使用数据加工(新版)。
新旧版本语法对比
相对于数据加工(旧版)DSL语言,日志服务SPL语法提升了易用性,具体如下:
数据加工(旧版)DSL语法作为Python语法子集,需要函数式编程,使用过程中存在较多语法符号冗余。相较而言,日志服务SPL语言是类Shell指令式语法,在最大程度减少语法符号的冗余。
旧版本使用函数
v
引用字段值v("field")
,SPL则直接引用字段,比如| where field='ERROR'
。旧版本函数调用
func(arg1, arg2)
,转为SPL指令| cmd arg1, arg2
,编写更简洁。
数据加工(旧版)DSL的定义中,字段值固定为字符串类型,类型转换的中间结果不支持保留。SPL语言支持处理过程中临时字段类型保持,请参见类型保持。
数据加工(旧版):
如下DSL脚本,需两次调用ct_int函数。
e_set("ms", ct_float(v("sec"))*1000) e_keep(ct_float(v("ms")) > 500)
数据加工(新版):
对应的SPL逻辑则更为简洁,无需两次转换类型。
| extend ms=cast(sec as double)*1000 | where ms>1024
复用日志服务SQL函数,无需额外的理解成本,详情请参考函数概览。
升级到新版
如果您已有运行中的数据加工(旧版)任务,可以升级至数据加工(新版),任务升级过程说明:
数据加工服务为保证数据的完整性,自动将当前的数据消费点位迁移至数据加工(新版),升级后将从该点位继续消费数据。
升级已有数据处理逻辑,需使用SPL语言实现与现有一致的数据处理需求,请参考升级SPL规则语法对照。
任务升级完成后,观测和监控数据加工(新版)任务运行状态,请参考观测与监控数据加工(新版)任务。
任务升级至新版以后,不支持回退至旧版。
升级过程须将旧版本DSL脚本翻译为SPL规则,并详细检查处理结果后方可保存并升级。
升级后,任务的观测仪表盘仅显示升级后的运行指标,如需查看升级前运行指标,请参见数据加工仪表盘。
任务升级操作完成后,需要刷新页面,加载数据加工(新版)任务概览。
日志服务控制台升级
登录日志服务控制台。
在Project列表区域,单击目标Project。
在左侧导航栏中,选择
。在加工任务列表中,单击目标加工任务。
在数据加工概览(旧版)页签中,查看加工任务详情,确认旧版本数据加工任务正常运行。
单击修改规则按钮,进入编辑页面后,查看数据加工(旧版)DSL脚本。
单机切换至数据加工(新版),进入新版编辑界面,通过SPL编写数据处理需求。
可通过单击按钮切换至数据加工(新版)或者切换至数据加工(旧版)进行切换,在代码编辑框中写对应的SPL规则、或者数据加工(旧版)SDL。
使用数据加工(新版)调试功能,请参见调试SPL规则。详细检查更新后的SPL规则符合处理需求。
单击修改数据加工(新版),然后单击确认。
数据加工API升级
日志服务API更新已有数据加工任务,请参见UpdateETL - 更新数据加工任务。
Python
日志服务OpenAPI SDK安装请参考SDK安装。基于Python SDK升级任务至数据加工(新版)的脚本参考如下,执行脚本将更新、并重启数据加工任务。
import os
import sys
from typing import List
from alibabacloud_sls20201230.client import Client as Sls20201230Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_sls20201230 import models as sls_20201230_models
class EtlUpgrade:
@staticmethod
def create_client() -> Sls20201230Client:
config = open_api_models.Config(
# 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。,
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。,
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# Endpoint 请参考 https://api.aliyun.com/product/Sls
config.endpoint = f'cn-chengdu.log.aliyuncs.com'
return Sls20201230Client(config)
@staticmethod
def upgrade(args: List[str]) -> None:
client = EtlUpgrade.create_client()
try:
resp = client.get_etl('my-project', 'etl-1715564059-320063')
except Exception as error:
print(error)
raise
# 更新数据加工配置,升级至新版
# SPL规则,请详细检查更新后的SPL规则符合处理需求
job = resp.to_map()
req = sls_20201230_models.UpdateETLRequest().from_map(job["body"])
req.configuration.lang = "SPL"
req.configuration.script = "* | where cast(Status as bigint)=200"
for sink in req.configuration.sinks:
sink.datasets = ["__UNNAMED__"]
try:
# 更新、并重启数据加工任务
client.update_etl('my-project', 'etl-1715564059-320063', req)
except Exception as error:
print(error)
raise
if __name__ == '__main__':
EtlUpgrade.upgrade(sys.argv[1:])
Go
日志服务OpenAPI SDK安装请参考SDK安装。基于Go SDK升级任务至数据加工(新版)的脚本参考如下,执行脚本将更新、并重启数据加工任务。
package main
import (
"fmt"
openapi "github.com/alibabacloud-go/darabonba-openapi/v2/client"
sls20201230 "github.com/alibabacloud-go/sls-20201230/v6/client"
"github.com/alibabacloud-go/tea/tea"
"os"
)
// 创建SLS客户端
func createClient(accessKeyId, accessKeySecret, endpoint string) (_result *sls20201230.Client, _err error) {
config := &openapi.Config{
Endpoint: tea.String(endpoint),
AccessKeyId: tea.String(accessKeyId),
AccessKeySecret: tea.String(accessKeySecret),
Protocol: tea.String("http"),
}
_result, _err = sls20201230.NewClient(config)
return _result, _err
}
// 升级ETL作业
func upgrade(client *sls20201230.Client, project, etlJobName string) (_err error) {
// 获取ETL作业信息
getETLResp, _err := client.GetETL(tea.String(project), tea.String(etlJobName))
if _err != nil {
fmt.Println(_err)
return _err
}
// 准备更新ETL作业请求
updateETLReq := sls20201230.UpdateETLRequest{
Configuration: getETLResp.Body.Configuration,
DisplayName: tea.String(fmt.Sprint(etlJobName, "-update")),
Description: tea.String("this is update"),
}
// 更新语言和脚本
updateETLReq.Configuration.Lang = tea.String("SPL")
updateETLReq.Configuration.Script = tea.String("* | where cast(Status as bigint)=200")
// 更新sink datasets
for _, sink := range updateETLReq.Configuration.Sinks {
sink.Datasets = []*string{tea.String("__UNNAMED__")}
}
// 更新ETL作业
_, _err = client.UpdateETL(tea.String(project), tea.String(etlJobName), &updateETLReq)
if _err != nil {
fmt.Println(_err)
return _err
}
return nil
}
func main() {
// 从环境变量获取必要的配置
accessKeyId, _ := os.LookupEnv("ALIBABA_CLOUD_ACCESS_KEY_ID")
accessKeySecret, _ := os.LookupEnv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
endpoint := "cn-chengdu.log.aliyuncs.com"
// 创建SLS客户端
client, err := createClient(accessKeyId, accessKeySecret, endpoint)
if err != nil {
fmt.Println("创建客户端失败:", err)
return
}
// 升级ETL作业
err = upgrade(client, "my-project", "etl-1715564059-320063")
if err != nil {
fmt.Println("ETL升级失败:", err)
return
}
fmt.Println("ETL升级成功")
}
升级SPL规则语法对照
日志服务SPL语言在数据处理场景下与SQL的使用对照请参考SPL与SQL的使用场景对照。
数据同步(无需处理逻辑) | |
旧版 | 数据加工(旧版)DSL脚本为空 |
新版 | 数据加工(新版)SPL规则为空 |
数据筛选与过滤:文本类型精确匹配 | |
旧版 |
|
新版 |
|
数据筛选与过滤:数值类型过滤 | |
旧版 |
|
新版 |
|
数据筛选与过滤:模糊匹配 | |
旧版 |
|
新版 |
|
新增字段,比如单个关键信息提取或者构造 | |
旧版 |
|
新版 |
|
时间信息解析与格式化 | |
旧版 |
|
新版 |
|
字段处理与筛选 | |
旧版 |
|
新版 |
|
正则提取多个字段 | |
旧版 |
|
新版 |
|
JSON Object键值信息展开为数据字段 | |
旧版 | 数据加工(旧版)JSON查询语言请参见JMES语法。
|
新版 | 数据加工(新版)JSON对象路径引用请参见JsonPath。
|
CSV格式内容提取为数据字段 | |
旧版 |
|
新版 |
|