本文档主要介绍资源函数的语法规则,包括参数解释、函数示例等。

函数列表

类型 函数 说明
本地 res_local 获取当前数据加工任务的高级参数信息。
RDS数据库 res_rds_mysql 从RDS-MySQL中获取特定数据库表格的数据,支持定期刷新。
日志服务 res_log_logstore_pull 在加工当前的Logstore数据时,拉取另一个Logstore中的数据。支持持续拉取与表格维护。
OSS res_oss_file 从OSS中获取特定Bucket下的文件内容。

res_local

  • 函数格式
    res_local(param, default=None, type="auto")
  • 参数说明
    参数名称 参数类型 是否必填 说明
    param String 数据加工任务的高级参数名。
    default 任意 param设置的参数值不存在时的默认值,默认None。
    type String 输出时的数据格式。
    • auto:默认值,首先将原始值转化为JSON格式,当失败时直接返回其字符串形式。
    • JSON:将原始值转化为JSON格式,如果失败则会返回default的值。
    • raw:原始字符串形式。
  • 返回结果

    根据参数配置,返回JSON格式的数据或者原始字符串。

  • JSON转换示例
    • 成功示例
      原始字符串 返回值 返回值类型
      1 1 整数
      1.2 1.2 浮点
      true True 布尔
      false False 布尔
      "123" 123 字符串
      null None None
      ["v1", "v2", "v3"] ["v1", "v2", "v3"] 列表
      ["v1", 3, 4.0] ["v1", 3, 4.0] 列表
      {"v1": 100, "v2": "good"} {"v1": 100, "v2": "good"} 列表
      {"v1": {"v11": 100, "v2": 200}, "v3": "good"} {"v1": {"v11": 100, "v2": 200}, "v3": "good"} 列表
    • 失败示例
      如下形式的原始字符串没有适配的JSON转换,会返回字符串。
      原始字符串 返回值 说明
      (1,2,3) "(1,2,3)" 元组不支持,需要使用列表形式。
      True "True" 布尔类型只有true和false两种形式。
      {1: 2, 3: 4} "{1: 2, 3: 4}" 字典的关键字只能是字符串。
  • 函数示例:从高级参数中获取信息然后赋值给local。
    控制台高级参数设置如下:高级参数
    原始日志
    content: 1
    加工规则
    e_set("local", res_local('endpoint'))
    加工结果
    content: 1
    local: hangzhou

res_rds_mysql

  • 函数格式
    res_rds_mysql(address, username, password, database, table=None, sql=None, fields=None, fetch_include_data=None, fetch_exclude_data=None, refresh_interval=0,base_retry_back_off=1,max_retry_back_off=60,use_ssl=False)
    说明
    • RDS(MySQL)白名单:当您的RDS(MySQL)中配置了白名单时,请将白名单设置为0.0.0.0使所有IP地址都可访问。这种方式会增加数据库的风险,如果您需要设置指定的日志服务IP地址,请提工单。后续日志服务将提供更安全的方式支持白名单功能。
    • 访问网络:支持通过公网和阿里云内网访问RDS(MySQL),内网访问方式请参见数据加工支持通过内网访问RDS数据库
  • 参数说明
    参数名称 参数类型 是否必填 说明
    address String 域名或地址,当端口号不是3306时以地址:port形式来配置。目前仅支持公网地址。
    username String 连接数据库的用户名。
    password String 连接数据库的密码。
    database String 要连接的数据库名。
    table String 要获取的表格名,如果配置了sql,则该参数可以不用设置。
    sql String 要获取的表格名,如果配置了table,则该参数可以不用设置。
    fields 字符串列表或字符串别名列表 字符串列表或者字符串映射列表,如果不填默认使用tablesql返回的所有列。例如需要将["user_id", "province", "city", "name", "age"]name改名为user_name时,可以通过["user_id", "province", "city", ("name", "user_name"), ("nickname", "nick_name"), "age"]
    说明 当同时设置sqlfields参数时,仅会执行sql参数中的sql语句,fields参数将不生效。
    fetch_include_data 搜索字符串 字符串白名单机制,必须满足fetch_include_data才会保留,否则丢弃。
    fetch_exclude_data 搜索字符串 字符串黑名单机制,必须满足fetch_exclude_data才会丢弃,否则保留。
    refresh_interval 数字字符串或数字 从rds拉取数据的时间间隔,单位为秒。默认为0,表示紧全量拉取一次。
    base_retry_back_off Number 拉取失败后重新拉取的时间间隔,默认值为1,单位为秒。
    max_retry_back_off int 加工任务失败时,重试请求的最大时间间隔。默认值为60,单位为秒,建议使用默认值。
    primary_keys String/List 设置该字段,会将数据库表的数据以dict的形式保存到内存中,其中dict的key为此处设置的primary_keys参数值,dict的value为拉取下来的数据库表的整行数据。在匹配的时候使用key/value的模式匹配数据库表中的数据。具体请参见该函数的最后一个示例。
    说明
    • 当数据库表的数据量较大时推荐使用该设置。
    • 此处设置的primary_keys的值必须存在于从数据库表拉取的字段中。
    use_ssl Bool 是否使用SSL协议进行安全连接,默认值为false表示不使用SSL协议。
    说明 如果rds-mysql开启SSL,使用该参数会使用SSL通道进行连接,但是不校验服务端的ca证书。目前暂时不支持使用服务端生成ca证书进行连接。
  • 返回结果

    返回多列表格,其字段由fields定义。

  • 错误处理

    如果拉取过程出错则会抛出异常,此时数据加工任务不会停止,而是按照base_retry_back_off参数设置的时间间隔进行退火重试。假设首次重试的时间间隔为1s,如果重试失败,下次重试的时间间隔为上一次的2倍,以此类推,直到达到max_retry_back_off参数设定的值。如果依然失败,则后续将一直按照max_retry_back_off参数设置的时间间隔进行重试。如果重试成功,则重试间隔会恢复到初始值。

  • 函数示例
    • 提取数据库名为test_db,表名为test_table的数据,每隔300秒重新拉取一次表数据。
      res_rds_mysql(address="rds mysql 数据库host 地址",username="xxx",password="xxx",database="test_db",table="test_table",refresh_interval=300)
    • 不提取test_table表中status字段值为delete的数据。
      res_rds_mysql(address="rds mysql 数据库host 地址",username="xxx",password="xxx",database="test_db",table="test_table",refresh_interval=300,fetch_exclude_data="'status':'delete'")
    • 仅提取test_table表中status字段值为exit的数据。
      res_rds_mysql(address="rds mysql 数据库host 地址",username="xxx",password="xxx",database="test_db",table="test_table",refresh_interval=300,fetch_include_data="'status':'exit'")
    • 提取test_table表中status字段值为exit的数据,并且该数据不包含name字段值为aliyun的数据。
      res_rds_mysql(address="rds mysql 数据库host 地址",username="xxx",password="xxx",database="test_db",table="test_table",refresh_interval=300,fetch_exclude_data="'status':'exit'",fetch_exclude_data="'name':'aliyun'")
    • 使用primary_keys主键模式
      • 数据库表table
        userid city_name city_number
        10001 beijing 12345
      • 源Logstore数据
        # 数据1
        userid:10001
        gdp:1000
        
        #数据2
        userid:10002
        gdp:800
      • 加工规则
        e_table_map(res_rds_mysql(..., primary_keys="userid"),"userid",["city_name","city_number"])
      • 加工结果
        # 数据1
        userid:10001
        gdp:1000
        city_name: beijing
        city_number:12345
        
        #数据2
        userid:10002
        gdp:800
      • 工作原理

        当不使用主键模式时,会使用遍历的形式去逐行匹配表数据,拉取的表数据在内存中的存放形式为[{"userid":"10001","city_name":"beijing","city_number":"12345"}],效率较低,但是占用内存小,适合小数据量下使用。而使用主键模式的时候,会把primary_keys的值作为key提取出来,拉取的表数据在内存中的存放形式为{"10001":{"userid":"10001","city_name":"beijing","city_number":"12345"}},这样匹配的时候会使用key/value模式进行哈希匹配,效率非常快,适合大数据量下使用。

res_log_logstore_pull

  • 函数格式
    res_log_logstore_pull(endpoint, ak_id, ak_secret, project, logstore, fields, from_time="begin", to_time="end", fetch_include_data=None, fetch_exclude_data=None, primary_keys=None, upsert_data=None, delete_data=None,max_retry_back_off=60,fetch_interval=2,base_retry_back_off=1)
  • 参数说明
    参数名称 参数类型 是否必填 说明
    endpoint String 服务入口,默认为https地址,也可以配置为http。特殊情况下使用非80/443端口。例如http://入口地址:port。关于日志服务入口请参见服务入口
    ak_id String 账户的AccessKeyID,请勿明文配置。
    ak_secret String 账户的AccessKey密钥,请勿明文配置。
    project String 要拉取数据的项目名称。
    logstore String 要提取数据的项目下的日志库名称。
    fields 字符串列表或字符串别名列表 字符串列表或者字符串映射列表,日志中不包含某个字段时,则该字段的值为空。例如需要将["user_id", "province", "city", "name", "age"]name改名为user_name时,可以通过["user_id", "province", "city", ("name", "user_name"), ("nickname", "nick_name"), "age"]
    from_time 字符串 首次开始拉取日志的服务器时间。支持如下时间格式:
    • Unix时间戳。
    • 时间字符串。
    • 指定字符串。例如beginend
    • 表达式:dt_类函数返回的时间。例如dt_totimestamp(dt_truncate(dt_today(tz="Asia/Shanghai"), day=op_neg(-1)))表示昨天拉取日志的开始时间,如果当前时间是2019-5-5 10:10:10+8:00,则上述表达式表示时间2019-5-4 0:0:0+8:00
    默认值为begin,表示会从第一条数据开始拉取。
    to_time 字符串 首次结束读取日志的服务器时间。支持如下时间格式:
    • Unix时间戳。
    • 时间字符串。
    • 指定字符串。例如beginend
    • 表达式:dt_类函数返回的时间。
    默认值为指定字符串end,表示当前的最后一条日志。

    不配置或者配置为None表示持续拉取最新的日志。

    说明 如果填入的是一个未来时间,只会将该Logstore所有数据拉取完毕,并不会开启持续拉取任务。
    fetch_include_data 搜索字符串 字符串白名单机制,必须满足fetch_include_data才会匹配,否则丢弃。
    fetch_exclude_data 搜索字符串 字符串黑名单机制,必须满足fetch_exclude_data才会丢弃,否则保留。
    primary_keys 字符串列表 维护表格时的关键字段列表。如果fields中对关键字段进行修改,这里应使用修改后的字段名,将修改后的字段作为主键。
    说明 primary_keys参数当前只支持单个字符串,且必须存在于fields字段中,并且仅支持要拉取数据的目标Logstore中只有一个Shard的情况,多个Shard暂时不支持。
    fetch_interval Int 开启持续提取任务时,每次提取请求的时间间隔,默认为2秒。该值必须大于或者等于1秒。
    delete_data 搜索字符串 对满足条件且配置了primary_keys的数据,在表格中进行删除操作。更多条件匹配请参见查询字符串语法
    base_retry_back_off Number 拉取数据失败后重新拉取的时间间隔,默认值为1,单位为秒。
    max_retry_back_off Int 拉取数据失败后,重试请求的最大时间间隔,默认值为60,单位为秒。建议使用默认值。
  • 返回结果

    返回多列表格。

  • 错误处理

    如果拉取过程出错则会抛出异常,此时数据加工任务不会停止,而是按照base_retry_back_off参数设置的时间间隔进行退火重试。假设首次重试的时间间隔为1s,如果重试失败,下次重试的时间间隔为上一次的2倍,以此类推,直到达到max_retry_back_off参数设定的值。如果依然失败,则后续将一直按照max_retry_back_off参数设置的时间间隔进行重试。如果重试成功,则重试间隔会恢复到初始值。

  • 函数示例
    # logstore中数据样例
    
    time:1234567
    __topic__:None
    key1:value1
    key2:value2
    
    ......
    • 拉取项目test_projecttest_logstore日志库的数据,并且仅拉取日志中key1key2两个字段对应的数据,指定时间范围为从日志开始写入到结束,仅拉取一次。
      res_log_logstore_pull("endpoint", "ak_id", "ak_secret", "test_project", "test_logstore", ["key1","key2"], from_time="begin", to_time="end")
    • 将上述仅拉取一次设置为持续拉取,持续拉取的时间间隔为30秒。
      res_log_logstore_pull("endpoint", "ak_id", "ak_secret", "test_project", "test_logstore", ["key1","key2"], from_time="begin", to_time=None,fetch_interval=30)
    • 设置黑名单,不拉取包含key1:value1的数据。
      res_log_logstore_pull("endpoint", "ak_id", "ak_secret", "test_project", "test_logstore", ["key1","key2"], from_time="begin", to_time=None,fetch_interval=30,fetch_exclude_data="key1:value1")
    • 设置白名单,仅拉取包含key1:value1的数据。
      res_log_logstore_pull("endpoint", "ak_id", "ak_secret", "test_project", "test_logstore", ["key1","key2"], from_time="begin", to_time=None,fetch_interval=30,fetch_include_data="key1:value1")

res_oss_file

  • 函数格式
    
    res_oss_file(endpoint, ak_id, ak_key, bucket, file, format='text', change_detect_interval=0,base_retry_back_off=1, max_retry_back_off=60,encoding='utf8',error='ignore')
  • 参数说明
    参数名称 参数类型 是否必填 说明
    endpoint String 服务入口,默认为https地址,也可以配置为http。特殊情况下使用非80/443端口。例如http://入口地址:port。关于OSS对应的服务入口请参见访问域名和数据中心
    说明 建议OSS和数据加工日志库在同一个region下,内网的网络稳定且速度快。相反公网比较消耗带宽且网络不稳定。
    ak_id String 账户的AccessKeyID,请勿明文配置。
    ak_key String 账户的AccessKey密钥,请勿明文配置。
    bucket String 将待提取文件内容的Bucket名称。
    file String 待提取项目下的文件路径。例如test/data.txt,不能用/开头。
    format String 文件输出形式。
    • text:以文本形式输出。
    • binary:以字节流形式输出。
    change_detect_interval String 从OSS拉取文件的时间间隔。拉取时会检查文件是否有更新,如果有更新则刷新。默认为0,表示不刷新,仅在程序启动时拉取一次数据。
    base_retry_back_off Number 拉取失败后重新拉取的时间间隔,默认值为1,单位为秒。
    max_retry_back_off int 拉取数据失败后,重试请求的最大时间间隔,默认值为60,单位为秒。建议使用默认值。
    encoding String 编码方式。当formattext的时候,该参数默认为utf8
    error String 设置不同错误的处理方案。默认为ignore。 其他可能值为strictreplacexmlcharrefreplace。该值仅在编码上报UnicodeError错误消息时有效。
  • 返回结果

    返回字节流形式或text形式的文件数据。

  • 错误处理

    如果拉取过程出错则会抛出异常,此时数据加工任务不会停止,而是按照base_retry_back_off参数设置的时间间隔进行退火重试。假设首次重试的时间间隔为1s,如果重试失败,下次重试的时间间隔为上一次的2倍,以此类推,直到达到max_retry_back_off参数设定的值。如果依然失败,则后续将一直按照max_retry_back_off参数设置的时间间隔进行重试。如果重试成功,则重试间隔会恢复到初始值。

  • 示例1:从OSS中提取JSON格式的数据。
    • JSON内容
      {
        "users": [
          {
              "name": "user1",
              "login_historys": [
                {
                  "date": "2019-10-10 0:0:0",
                  "login_ip": "1.1.1.1"
                },
                {
                  "date": "2019-10-10 1:0:0",
                  "login_ip": "1.1.1.1"
                }
              ]
          },
          {
              "name": "user2",
              "login_historys": [
                {
                  "date": "2019-10-11 0:0:0",
                  "login_ip": "1.1.1.2"
                },
                {
                  "date": "2019-10-11 1:0:0",
                  "login_ip": "1.1.1.3"
                },
                {
                  "date": "2019-10-11 1:1:0",
                  "login_ip": "1.1.1.5"
                }
              ]
          }
        ]
      }
    • 原始日志
      content: 123
    • 加工规则
      e_set("json_parse",json_parse(res_oss_file(endpoint='http://oss-cn-hangzhou.aliyuncs.com',ak_id='your ak_id',
                                                                 ak_key='your ak_key',
                                                                 bucket='your bucket', file='testjson.json')))
    • 加工结果
      content: 123
          prjson_parse: 
      '{
        "users": [
          {
              "name": "user1",
              "login_historys": [
                {
                  "date": "2019-10-10 0:0:0",
                  "login_ip": "1.1.1.1"
                },
                {
                  "date": "2019-10-10 1:0:0",
                  "login_ip": "1.1.1.1"
                }
              ]
          },
          {
              "name": "user2",
              "login_historys": [
                {
                  "date": "2019-10-11 0:0:0",
                  "login_ip": "1.1.1.2"
                },
                {
                  "date": "2019-10-11 1:0:0",
                  "login_ip": "1.1.1.3"
                },
                {
                  "date": "2019-10-11 1:1:0",
                  "login_ip": "1.1.1.5"
                }
              ]
          }
        ]
      }'
  • 示例2:从OSS中提取test.txt文本内容。
    • 文本内容
      Test bytes
    • 原始日志
      content: 123
    • 加工规则
      e_set("test_txt",res_oss_file(endpoint='http://oss-cn-hangzhou.aliyuncs.com',ak_id='your ak_id',
                                                                 ak_key='your ak_key',
                                                                 bucket='your bucket', file='test.txt'))
    • 加工结果
      content: 123
      test_txt: Test bytes