本文档介绍如何通过资源函数从其他Logstore中获取数据对日志数据进行富化。

资源函数res_log_logstore_pull支持使用两种模式从其他Logstore中获取数据。
  • 获取指定时间间隔内Logstore的数据内容。
  • 不设置结束时间,持续获取目标Logstore内容。
关于函数的更多详细说明请参见res_log_logstore_pull
说明 res_log_logstore_pull是一个单独的语法,只负责从目标Logstore获取数据,自己本身并没有做任何富化的操作,所以不建议单独使用res_log_logstore_pull,结合e_table_mape_search_table_map语句一起使用才有意义。

样例数据

假设我们有两个Logstore,source_logstore是存储个人信息,target_logstore是酒店存储客人入住信息 。现在我们将酒店的入住信息拿来做富化。
说明 此处采用pull_log接口获取数据,富化的Logstore并不依赖索引。
个人信息source_logstore
topic:xxx
city:xxx
cid:12345
name:maki


topic:xxx
city:xxx
cid:12346
name:vicky

topic:xxx
city:xxx
cid:12347
name:mary
酒店入住信息target_logstore
time:1567038284
status:check in
cid:12345
name:maki
room_number:1111

time:1567038284
status:check in
cid:12346
name:vicky
room_number:2222

time:1567038500
status:check in
cid:12347
name:mary
room_number:3333

time:1567038500
status:leave
cid:12345
name:maki
room_number:1111
基本语法。
res_log_logstore_pull(
        endpoint,
        ak_id,
        ak_secret,
        project,
        logstore,
        fields,
        from_time=None,
        to_time=None,
        fetch_include_data=None,
        fetch_exclude_data=None,
        primary_keys=None,
        delete_data=None,
        refresh_interval_max=60,
        fetch_interval=2):

获取指定时间内所有数据

说明 此处的时间是日志获取时间。
  • DSL编排语法。
    res_log_logstore_pull(..., ["cid","name","room_number"],from_time=1567038284,to_time=1567038500)
  • 获取到的数据。
    #这里我们的语法中field填入了cid,name,room_number三个字段,并且指定了时间范围,将会获取这个时间范围内,Logstore所有数据的这三个字段的值
    
    cid:12345
    name:maki
    room_number:1111
    
    cid:12346
    name:vicky
    room_number:2222
    
    cid:12347
    name:mary
    room_number:3333
    
    cid:12345
    name:maki
    room_number:1111

设置黑白名单过滤数据

  • 设置白名单。
    • DSL 编排语法。
      # 设置白名单,只有room_number值等于1111的数据会被拉取下来。
      res_log_logstore_pull(..., ["cid","name","room_number","status"],from_time=1567038284,to_time=1567038500,fetch_include_data="room_number:1111")
    • 获取到的数据。
      # 设置了ferch_include_data白名单,只有包含room_numver:1111的数据会被拉取下来,其他数据不会被拉取。
      
      status: check in
      cid:12345
      name:maki
      room_number:1111
      
      status:leave
      cid:12345
      name:maki
      room_number:1111
  • 设置黑名单。
    • DSL 编排语法。
      res_log_logstore_pull(..., ["cid","name","room_number","status"],from_time=1567038284,to_time=1567038500,fetch_exclude_data="room_number:1111")
    • 获取到的数据。
      # 设置黑名单fetch_exclude_data,当数据包含room_number:1111的时候丢弃这条数据。
      status:check in
      cid:12346
      name:vicky
      room_number:2222
      
      
      status:check in
      cid:12347
      name:mary
      room_number:3333
  • 同时设置黑白名单。
    • DSL编排语法。
      res_log_logstore_pull(..., ["cid","name","room_number","status"],from_time=1567038284,to_time=1567038500,fetch_exclude_data="status:leave",fetch_include_data="status:check in")
    • 获取到的数据。
      # 黑白名单同时存在的情况下,优先进行黑名单数据的匹配,然后在匹配白名单。
      #黑名单填入的是status:leave的值,当数据包含status:leave的值时候,数据会被直接丢弃。
      #然后匹配白名单,白名单我们填入的是status:check in当数据包含status: check in的值时候,该数据才会被拉取下来。
      status:check in
      cid:12345
      name:maki
      room_number:1111
      
      
      status:check in
      cid:12346
      name:vicky
      room_number:2222
      
      
      status:check in
      cid:12347
      name:mary
      room_number:3333

持续拉取目标Logstore数据

如果目标Logstore的数据是持续写入,我们需要持续的去拉取的时候,设置to_time参数为None就可以。同时可以通过fetch_interval设置拉取的时间间隔,通过refresh_interval_max设置当拉取遇到错误时重试的最大时间间隔。

DSL编排语法。
res_log_logstore_pull(..., ["cid","name","room_number","status"],from_time=1567038284,to_time=None,fetch_interval=15,refresh_interval_max=60)
# 需要注意的是,在持续拉取的过程中,如果遇到错误,服务器会一直退火重试,直到成功为止,不会停止数据加工进程。

开启主键维护获取目标Logstore数据

  • 注意事项:

    目前该功能仅支持所有数据存储在目标Logstore的一个Shard中,否则请不要使用该功能。多Shard场景会在以后版本中进行支持。

  • 背景:

    以上述个人信息source_logstore和酒店信息target_logstore的数据为例。因为Logstore中的数据只能写入无法删除,而有时候我们希望不要匹配已经删除的数据,此时就需要开启主键维护功能。

  • 需求演示:

    我们需要获取酒店信息target_logstore中,已经入住但还没离开的客人信息。当status=leave时表示客人已经离开酒店,不需要拉取该信息。

  • DSL编排语法:
    res_log_logstore_pull(..., ["cid","name","room_number","status","time"],from_time=1567038284,to_time=None,primary_keys="cid",delete_data="status:leave")
  • 得到的数据:
    ## 可以看到name为maki的客人的最后更新status为leave ,已经离开酒店,所以并没有将maki的数据拉取下来。
    time:1567038284
    status:check in
    cid:12346
    name:vicky
    room_number:2222
    
    time:1567038500
    status:check in
    cid:12347
    name:mary
    room_number:3333
说明 primary_keys目前只支持设置单字符串。需要设置Logstore数据中值唯一的字段,例如样例中的cid,类似数据库的唯一主键。当设置primary_keys的时候,delete_data也必须不为None,这样才有意义。

使用函数做数据富化

  • 使用e_table_map函数。
    • DSL编排语法
      # 使用e_table_map做富化
      e_table_map(res_log_logstore_pull(...,
              fields=["cid","room_number"],
              from_time="begin",
              ), "cid","room_number")
    • 富化后数据
      使用e_table_map将拉取的酒店信息和用户信息作了富化,将每个人居住的房间号room_number通过cid字段富化到了个人信息的数据当中。关于函数的使用请参见e_table_map
      topic:xxx
      city:xxx
      cid:12345
      name:maki
      room_nuber:1111
      
      topic:xxx
      city:xxx
      cid:12346
      name:vicky
      room_number:2222
      
      topic:xxx
      city:xxx
      cid:12347
      name:mary
      room_number:3333
  • 使用e_search_table_map函数。
    • DSL编排语法
      # 使用e_search_table_map做富化
      e_search_table_map(res_log_logstore_pull(...,
              fields=["cid","room_number"],
              from_time="begin",
              ), "cid=12346","room_number")
    • 富化后的数据
      使用e_search_table_map函数将个人信息和酒店入住信息做搜索匹配,将匹配酒店入住数据中cid=12346的数据,并且将命中的数据的room_number字段添加到源数据中。关于函数的使用请参见e_search_table_map
      topic:xxx
      city:xxx
      cid:12345
      name:maki
      room_nuber:2222
      
      topic:xxx
      city:xxx
      cid:12346
      name:vicky
      room_number:2222
      
      topic:xxx
      city:xxx
      cid:12347
      name:mary
      room_number:2222