文档

数据加工新旧版本对比与升级

更新时间:

本文介绍数据加工(新版)与数据加工(旧版)服务的差异,以及如何将旧版本任务升级到新版本。

新旧版本对比

对比项

数据加工(新版)

数据加工(旧版)

数据处理语法

数据处理SPL,参考SPL语法

数据加工DSL,参考数据加工语法

支持场景

  1. 数据规整、信息提取。

  2. 数据清洗与过滤。

  3. 数据分发至多目标、或动态目标。

  1. 数据规整、信息提取。

  2. 数据清洗与过滤。

  3. 数据分发至多目标、或动态目标。

  4. 信息富化:数据维表,IPv4解析等。

  5. 跨地域同步数据(公网)。

CreateETL - 创建数据加工任务(接口对比)

  1. 添加配置项lang,值为SPL。

  2. 配置项script值为SPL规则。

  3. 输出目标sink中添加配置项datasets。

  1. 无配置项lang。

  2. 配置项script值为DSL脚本。

  3. 输出目标sink中无配置项datasets。

源数据Logstore消费组依赖

不依赖

依赖

新旧版本选择

场景考量

目前,数据加工(新版)暂未支持数据富化等场景。如果需要从SLS Logstore,OSS Object、RDS Table等数据源关联维表信息,进行IP地理位置解析,或者跨地域同步数据,请选择数据加工(旧版)。

计费考量

通过数据加工服务的技术迭代升级,数据加工(新版)提供了更具优势的使用成本,仅为数据加工(旧版)的1/3,具体参考按使用功能计费模式计费项。所以,在需求场景已经支持时,推荐使用数据加工(新版)。

新旧版本语法对比

相对于数据加工(旧版)DSL语言,日志服务SPL语法提升了易用性,具体如下:

  • 数据加工(旧版)DSL语法作为Python语法子集,需要函数式编程,使用过程中存在较多语法符号冗余。相较而言,日志服务SPL语言是类Shell指令式语法,在最大程度减少语法符号的冗余。

    • 旧版本使用函数v引用字段值v("field"),SPL则直接引用字段,比如| where field='ERROR'

    • 旧版本函数调用func(arg1, arg2),转为SPL指令| cmd arg1, arg2,编写更简洁。

  • 数据加工(旧版)DSL的定义中,字段值固定为字符串类型,类型转换的中间结果不支持保留。SPL语言支持处理过程中临时字段类型保持,请参见类型保持

    • 数据加工(旧版):

    如下DSL脚本,需两次调用ct_int函数。

    e_set("ms", ct_float(v("sec"))*1000)
    e_keep(ct_float(v("ms")) > 500)
    • 数据加工(新版):

    对应的SPL逻辑则更为简洁,无需两次转换类型。

    | extend ms=cast(sec as double)*1000
    | where ms>1024
  • 复用日志服务SQL函数,无需额外的理解成本,详情请参考函数概览

迁移到新版

如果您已有运行中的数据加工(旧版)任务,可以升级至新版,任务升级过程说明:

升级前注意事项

任务升级至新版以后,不支持回退至旧版。

升级中注意事项

  • 升级过程须将旧版本DSL脚本翻译为SPL规则,并详细检查处理结果后方可保存并升级。

  • 升级已有数据处理逻辑,需使用SPL语言实现与现有一致的数据处理需求,请参见升级SPL规则语法对照

升级后注意事项

  • 数据加工服务保证数据的完整性,自动将当前的数据消费点位迁移至数据加工(新版),升级后将从该点位继续消费数据。

  • 任务升级完成后,观测和监控数据加工(新版)任务运行状态,请参见观测与监控数据加工(新版)任务

  • 升级保存后,任务的观测仪表盘仅显示升级之后的运行指标,请到名为internal-etl-log的Logstore中查看。

  • 任务升级操作完成后,需要刷新页面,加载数据加工(新版)任务概览。

日志服务控制台升级

  1. 登录日志服务控制台

  2. 在Project列表区域,单击目标Project。

    image

  3. 在左侧导航栏中,选择任务管理 > 数据加工

  4. 在加工任务列表中,单击目标加工任务。

  5. 数据加工概览(旧版)页签中,查看加工任务详情,确认旧版本数据加工任务正常运行。

    image

  6. 单击修改规则按钮,进入编辑页面后,查看数据加工(旧版)DSL脚本。

  7. 单机切换至数据加工(新版),进入新版编辑界面,通过SPL编写数据处理需求。

    可通过单击按钮切换至数据加工(新版)或者切换至数据加工(旧版)进行切换,在代码编辑框中写对应的SPL规则、或者数据加工(旧版)SDL。

    使用数据加工(新版)调试功能,请参见调试SPL规则。详细检查更新后的SPL规则符合处理需求。

    image

  8. 单击修改数据加工(新版),然后单击确认

数据加工API升级

日志服务API更新已有数据加工任务,请参见UpdateETL - 更新数据加工任务,日志服务OpenAPI SDK安装请参考SDK安装。基于Python SDK升级任务至数据加工(新版)的脚本参考如下,执行脚本将更新、并重启数据加工任务。

import os
import sys
from typing import List
from alibabacloud_sls20201230.client import Client as Sls20201230Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_sls20201230 import models as sls_20201230_models


class EtlUpgrade:
    @staticmethod
    def create_client() -> Sls20201230Client:
        config = open_api_models.Config(
            # 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。,
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。,
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # Endpoint 请参考 https://api.aliyun.com/product/Sls
        config.endpoint = f'cn-chengdu.log.aliyuncs.com'
        return Sls20201230Client(config)

    @staticmethod
    def upgrade(args: List[str]) -> None:
        client = EtlUpgrade.create_client()
        try:
            resp = client.get_etl('my-project', 'etl-1715564059-320063')
        except Exception as error:
            print(error)
            raise

        # 更新数据加工配置,升级至新版
        # SPL规则,请详细检查更新后的SPL规则符合处理需求
        job = resp.to_map()
        req = sls_20201230_models.UpdateETLRequest().from_map(job["body"])
        req.configuration.lang = "SPL"
        req.configuration.script = "* | where cast(Status as bigint)=200"
        for sink in req.configuration.sinks:
            sink.datasets = ["__UNNAMED__"]

        try:
            # 更新、并重启数据加工任务
            client.update_etl('my-project', 'etl-1715564059-320063', req)
        except Exception as error:
            print(error)
            raise


if __name__ == '__main__':
    EtlUpgrade.upgrade(sys.argv[1:])
package main

import (
	"fmt"
	openapi "github.com/alibabacloud-go/darabonba-openapi/v2/client"
	sls20201230 "github.com/alibabacloud-go/sls-20201230/v6/client"
	"github.com/alibabacloud-go/tea/tea"
	"os"
)

// 创建SLS客户端
func createClient(accessKeyId, accessKeySecret, endpoint string) (_result *sls20201230.Client, _err error) {
	config := &openapi.Config{
		Endpoint:        tea.String(endpoint),
		AccessKeyId:     tea.String(accessKeyId),
		AccessKeySecret: tea.String(accessKeySecret),
		Protocol:        tea.String("http"),
	}

	_result, _err = sls20201230.NewClient(config)
	return _result, _err
}

// 升级ETL作业
func upgrade(client *sls20201230.Client, project, etlJobName string) (_err error) {
	// 获取ETL作业信息
	getETLResp, _err := client.GetETL(tea.String(project), tea.String(etlJobName))
	if _err != nil {
		fmt.Println(_err)
		return _err
	}

	// 准备更新ETL作业请求
	updateETLReq := sls20201230.UpdateETLRequest{
		Configuration: getETLResp.Body.Configuration,
		DisplayName:   tea.String(fmt.Sprint(etlJobName, "-update")),
		Description:   tea.String("this is update"),
	}

	// 更新语言和脚本
	updateETLReq.Configuration.Lang = tea.String("SPL")
	updateETLReq.Configuration.Script = tea.String("* | where cast(Status as bigint)=200")

	// 更新sink datasets
	for _, sink := range updateETLReq.Configuration.Sinks {
		sink.Datasets = []*string{tea.String("__UNNAMED__")}
	}

	// 更新ETL作业
	_, _err = client.UpdateETL(tea.String(project), tea.String(etlJobName), &updateETLReq)
	if _err != nil {
		fmt.Println(_err)
		return _err
	}
	return nil
}

func main() {
	// 从环境变量获取必要的配置
	accessKeyId, _ := os.LookupEnv("ALIBABA_CLOUD_ACCESS_KEY_ID")
	accessKeySecret, _ := os.LookupEnv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
	endpoint := "cn-chengdu.log.aliyuncs.com"

	// 创建SLS客户端
	client, err := createClient(accessKeyId, accessKeySecret, endpoint)
	if err != nil {
		fmt.Println("创建客户端失败:", err)
		return
	}

	// 升级ETL作业
	err = upgrade(client, "my-project", "etl-1715564059-320063")
	if err != nil {
		fmt.Println("ETL升级失败:", err)
		return
	}
	fmt.Println("ETL升级成功")
}

升级SPL规则语法对照

日志服务SPL语言在数据处理场景下与SQL的使用对照请参考SPL与SQL的使用场景对照

数据同步(无需处理逻辑)

旧版

数据加工(旧版)DSL脚本为空

新版

数据加工(新版)SPL规则为空

数据筛选与过滤:文本类型精确匹配

旧版

e_keep(v("level") == "ERROR")或者

e_drop(v("level") != "ERROR")或者

e_if(v("level") != "ERROR", e_drop())或者

e_keep(e_search("level==ERROR"))

新版

| where level='ERROR'

数据筛选与过滤:数值类型过滤

旧版

e_keep(ct_int(v("status"))>=400)

新版

| where cast(status as bigint)>=400

数据筛选与过滤:模糊匹配

旧版

e_keep(op_in(v("level"), "ERROR"))或者

e_keep(e_search("level: ERROR")或者

e_if(op_not_in(v("level"), "ERROR"), e_drop())

新版

| where level like '%ERROR%'

新增字段,比如单个关键信息提取或者构造

旧版

  1. 正则提取单个信息

e_set("version", regex_select(v("data"), r'"version":\d+'))

  1. JSON提取单个信息,数据加工(旧版)JSON查询语言JMES语法

e_set("version", json_select(v("data"), "version"))

新版

  1. 正则提取单个信息

| extend version=regexp_extract(data, '"version":\d+')

  1. JSON提取单个信息,数据加工(新版)JSON对象路径引用JsonPath

| extend version=json_extract(data, '$.version')

时间信息解析与格式化

旧版

  1. 提取日志时间字段__time__

e_set(
    "__time__", 
    dt_parsetimestamp(
        v("time"), 
        fmt="%Y/%m/%d %H-%M-%S",
    ),
)

  1. 时间格式规范化

e_set(
    "__time__", 
    dt_parsetimestamp(
        v("time"), 
        fmt="%Y/%m/%d %H-%M-%S",
    ),
)

新版

  1. 提取日志时间字段__time__

| extend time=date_parse(time, '%Y/%m/%d %H-%i-%S')

| extend __time__=cast(to_unixtime(time) as bigint)

  1. 时间格式规范化

| extend time=date_parse(time, '%Y/%m/%d %H-%i-%S')

| extend time=date_format(time, '%Y-%m-%d %H:%i:%S')

字段处理与筛选

旧版

  1. 精确选择字段。

e_keep_fields("__tag__:node", "path", regex=False)

  1. 按模式选择字段。

e_keep_fields("__tag__:.*", regex=True)

  1. 原地重命名部分字段。

e_rename("__tag__:node", node)

  1. 按模式排除字段。

e_drop_fields("__tag__:.*", regex=True)

新版

  1. 精确选择字段。

| project node="__tag__:node", path

  1. 按模式选择字段。

| project -wildcard "__tag__:*"

  1. 原地重命名部分字段。

| project-rename node="__tag__:node"

  1. 按模式排除字段。

| project-away -wildcard "__tag__:*"

正则提取多个字段

旧版

e_regex("data", r"(\S+)\s+(\w+)", ["time", "level"])

新版

| parse-regexp data, '(\S+)\s+(\w+)' as time, level

JSON Object键值信息展开为数据字段

旧版

数据加工(旧版)JSON查询语言请参见JMES语法

e_json("data", depth=1, jmes="x.y.z")

新版

数据加工(新版)JSON对象路径引用请参见JsonPath

| parse-json -path='$.x.y.z' data

CSV格式内容提取为数据字段

旧版

e_csv("data", ["time", "addr", "user"], sep="\0", quote='"')

新版

  1. 单字符分隔符请参见CSV RFC 4180

| parse-csv -delim='\0' -quote='"' data as time, addr, user

  1. 多字符分隔符。

| parse-csv -delim='^_^' data AS time, addr, user