Elasticsearch数据源

Elasticsearch数据源为您提供读取和写入Elasticsearch双向通道的功能,本文为您介绍DataWorks的Elasticsearch数据同步的能力支持情况。

背景信息

Elasticsearch在公共资源组上支持Elasticsearch 5.x版本,在Serverless资源组(推荐)和独享数据集成资源组上支持Elasticsearch 5.x、6.x、7.x和8.x版本。

说明

Elasticsearch是遵从Apache开源条款的一款开源产品,是当前主流的企业级搜索引擎。Elasticsearch是一个基于Lucene的搜索和数据分析工具,它提供分布式服务。Elasticsearch核心概念同数据库核心概念的对应关系如下所示。

Relational DB(实例)-> Databases(数据库)-> Tables(表)-> Rows(一行数据)-> Columns(一行数据的一列)
Elasticsearch        -> Index              -> Types       -> Documents       -> Fields

Elasticsearch中可以有多个索引或数据库,每个索引可以包括多个类型或表,每个类型可以包括多个文档或行,每个文档可以包括多个字段或列。Elasticsearch Writer插件使用Elasticsearch的Rest API接口,批量把从Reader读入的数据写入Elasticsearch中。

支持的版本

DataWorks平台目前仅支持配置阿里云Elasticsearch 5.x、6.x、7.x和8.x版本数据源,不支持配置自建Elasticsearch数据源。

使用限制

离线读写

  • Elasticsearch Reader会获取Server端shard信息用于数据同步,需要确保在任务同步中Server端的shards处于存活状态,否则会存在数据不一致风险。

  • 如果您使用的是6.x及以上版本,支持使用Serverless资源组(推荐)独享数据集成资源组

  • 不支持同步scaled_float类型的字段。

  • 不支持同步字段中带有关键字 $ref的索引。

支持的字段类型

类型

离线读(Elasticsearch Reader)

离线写(Elasticsearch Writer)

实时写

binary

支持

支持

支持

boolean

支持

支持

支持

keyword

支持

支持

支持

constant_keyword

不支持

不支持

不支持

wildcard

不支持

不支持

不支持

long

支持

支持

支持

integer

支持

支持

支持

short

支持

支持

支持

byte

支持

支持

支持

double

支持

支持

支持

float

支持

支持

支持

half_float

不支持

不支持

不支持

scaled_float

不支持

不支持

不支持

unsigned_long

不支持

不支持

不支持

date

支持

支持

支持

date_nanos

不支持

不支持

不支持

alias

不支持

不支持

不支持

object

支持

支持

支持

flattened

不支持

不支持

不支持

nested

支持

支持

支持

join

不支持

不支持

不支持

integer_range

支持

支持

支持

float_range

支持

支持

支持

long_range

支持

支持

支持

double_range

支持

支持

支持

date_range

支持

支持

支持

ip_range

不支持

支持

支持

ip

支持

支持

支持

version

支持

支持

支持

murmur3

不支持

不支持

不支持

aggregate_metric_double

不支持

不支持

不支持

histogram

不支持

不支持

不支持

text

支持

支持

支持

annotated-text

不支持

不支持

不支持

completion

支持

不支持

不支持

search_as_you_type

不支持

不支持

不支持

token_count

支持

不支持

不支持

dense_vector

不支持

不支持

不支持

rank_feature

不支持

不支持

不支持

rank_features

不支持

不支持

不支持

geo_point

支持

支持

支持

geo_shape

支持

支持

支持

point

不支持

不支持

不支持

shape

不支持

不支持

不支持

percolator

不支持

不支持

不支持

string

支持

支持

支持

工作原理

Elasticsearch Reader的工作原理如下:

  • 通过Elasticsearch的_searchscrollslice(即游标分片)方式实现,slice结合数据集成任务的task多线程分片机制使用。

  • 根据Elasticsearch中的Mapping配置,转换数据类型。

更多详情请参见Elasticsearch官方文档

说明

Elasticsearch Reader会获取Server端shard信息用于数据同步,需要确保在任务同步中Server端的shards处于存活状态,否则会存在数据不一致风险。

基本配置

重要

实际运行时,请删除下述代码中的注释。

{
 "order":{
  "hops":[
   {
    "from":"Reader",
    "to":"Writer"
   }
  ]
 },
 "setting":{
  "errorLimit":{
   "record":"0" //错误记录数。
  },
  "jvmOption":"",
  "speed":{
   "concurrent":3,//并发数
   "throttle":true,//
                     "mbps":"12",//限流,此处1mbps = 1MB/s。
  }
 },
 "steps":[
  {
   "category":"reader",
   "name":"Reader",
   "parameter":{
    "column":[ //读取列。
     "id",
     "name"
    ],
    "endpoint":"", //服务地址。
    "index":"",  //索引。
    "password":"",  //密码。
    "scroll":"",  //scroll标志。
    "search":"",  //查询query参数,与Elasticsearch的query内容相同,使用_search api,重命名为search。
    "type":"default",
    "username":""  //用户名。
   },
   "stepType":"elasticsearch"
  },
  {
   "stepType": "elasticsearch",
            "parameter": {
                "column": [ //写入列
                    {
                        "name": "id",
                        "type": "integer"
                    },
                    {
                        "name": "name",
                        "type": "text"
                    }
                ],
                "index": "test",   //写入索引
                 "indexType": "",   //写入索引类型,es7不填
                "actionType": "index",  //写入方式
                "cleanup": false,         //是否重建索引
                "datasource": "test",   //数据源名称
                "primaryKeyInfo": {     //主键取值方式
                    "fieldDelimiterOrigin": ",",
                    "column": [
                        "id"
                    ],
                    "type": "specific",
                    "fieldDelimiter": ","
                },
                "dynamic": false,  //动态映射
                "batchSize": 1024   //批量写文档数
            },
            "name": "Writer",
            "category": "writer"
  }
 ],
 "type":"job",
 "version":"2.0" //版本号。
}

高级功能

  • 支持全量拉取

    支持将Elasticsearch中一个文档的所有内容拉取为一个字段。配置详情请参见场景一:全量拉取

  • 支持提取半结构化到结构化数据

    分类

    描述

    相关文档

    产生背景

    Elasticsearch中的数据特征为字段不固定,且有中文名、数据使用深层嵌套的形式。为更好地方便下游业务对数据的计算和存储需求,特推出从半结构化到结构化的转换解决方案。

    实现原理

    将Elasticsearch获取到的JSON数据,利用JSON工具的路径获取特性,将嵌套数据扁平化为一维结构的数据。然后将数据映射至结构化数据表中,拆分Elasticsearch复合结构数据至多个结构化数据表。

    解决方案

    JSON有嵌套的情况,通过path路径来解决。

    • 属性

    • 属性.子属性

    • 属性[0].子属性

    场景二:嵌套或对象字段属性同步

    附属信息有一对多的情况,需要进行拆表拆行处理,进行遍历。

    属性[*].子属性

    场景三:数组属性拆分为多行

    数组归并,一个字符串数组内容,归并为一个属性,并进行去重。

    属性[]

    场景四:数组属性去重归并

    多属性合一,将多个属性合并为一个属性。

    属性1,属性2

    场景五:多属性合一同步

    多属性选择处理。

    属性1|属性2

    场景六:多属性选择同步

创建数据源

在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建并管理数据源详细的配置参数解释可在配置界面查看对应参数的文案提示

数据同步任务开发

数据同步任务的配置入口和通用配置流程可参见下文的配置指导。

单表离线同步任务配置指导

单表实时写同步任务配置指导

操作流程请参见DataStudio侧实时同步任务配置

整库离线写、单表/整库全增量实时写同步任务配置指导

操作流程请参见数据集成侧同步任务配置

附录一:脚本Demo与参数说明

离线任务脚本配置方式

如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。

Reader脚本Demo

{
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    },
    "setting":{
        "errorLimit":{
            "record":"0" //错误记录数。
        },
        "jvmOption":"",
        "speed":{
            "concurrent":3,
            "throttle":false
        }
    },
    "steps":[
        {
            "category":"reader",
            "name":"Reader",
            "parameter":{
                "column":[ //读取列。
                    "id",
                    "name"
                ],
                "endpoint":"http://es-cn-xxx.elasticsearch.aliyuncs.com:9200", //服务地址。
                "index":"aliyun_es_xx",  //索引。
                "password":"*******",  //密码。
                "multiThread":true,
                "scroll":"5m",  //scroll标志。
                "pageSize":5000,
                "connTimeOut":600000,
                "readTimeOut":600000,
                "retryCount":30,
                "retrySleepTime":"10000",
                "search":{
                            "range":{
                                "gmt_modified":{
                                    "gte":0
                                }
                            }
                        },  //查询query参数,与Elasticsearch的query内容相同,使用_search api,重命名为search。
                "type":"doc",
                "username":"aliyun_di"  //用户名。
            },
            "stepType":"elasticsearch"
        },
        {
            "category":"writer",
            "name":"Writer",
            "parameter":{ },
            "stepType":"stream"
        }
    ],
    "type":"job",
    "version":"2.0" //版本号。
}

Reader脚本参数

参数

描述

是否必选

默认值

datasource

数据源名称,脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。

index

Elasticsearch中的index名。

type

Elasticsearch中indextype名。

index名

search

Elasticsearch的query参数。

pageSize

每次读取数据的条数。

100

scroll

Elasticsearch的分页参数,设置游标存放时间。

  • 设置的过小时,如果获取两页数据间隔时间超出scroll,会导致游标过期,进而丢失数据。

  • 设置的过大时,如果同一时刻发起的查询过多,超出服务端max_open_scroll_context配置时,会导致数据查询报错。

strictMode

以严格模式读取Elasticsearch中的数据,当出现Elasticsearch的shard.failed时会停止读取,避免读取少数据。

true

sort

返回结果的排序字段。

retryCount

失败后重试的次数。

300

connTimeOut

客户端连接超时时间。

600,000

readTimeOut

客户端读取超时时间。

600,000

multiThread

http请求,是否有多线程。

true

preemptiveAuth

http是否使用抢先模式请求

false

retrySleepTime

失败后重试的时间间隔。

1000

discovery

是否开启节点发现。

  • true:与集群中随机一个节点进行连接。启用节点发现将轮询并定期更新客户机中的服务器列表,并对发现的节点发起查询请求。

  • false:对配置的endpoint发起查询请求。

false

compression

是否使用GZIP压缩请求正文,使用时需要在es节点上启用http.compression设置。

false

dateFormat

待同步字段存在date类型,且该字段mapping没有format配置时,需要配置dateFormat参数。配置形式如下: "dateFormat" : "yyyy-MM-dd||yyyy-MM-dd HH:mm:ss",该配置需要包含同步date类型字段的所有格式。

full

是否将全文档内容作为一个字段同步至目标端,将Elasticsearch的查询数据作为一个字段,配置详情请参见场景一:全量拉取

multi

该配置是一个高级功能具有五种用法,两个子属性分别为multi.keymulti.mult,配置详情请参见高级功能中表格内容。

Writer脚本Demo

{
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1, //作业并发数。
            "mbps":"12"//限流,此处1mbps = 1MB/s。
        }
    },
    "steps": [
        {
            "category": "reader",
            "name": "Reader",
            "parameter": {

            },
            "stepType": "stream"
        },
        {
            "category": "writer",
            "name": "Writer",
            "parameter": {
                "datasource":"xxx",
                "index": "test-1",
                "type": "default",
                "cleanup": true,
                "settings": {
                        "number_of_shards": 1,
                        "number_of_replicas": 0
                },
                "discovery": false,
                "primaryKeyInfo":{
                    "type":"pk",    
                     "fieldDelimiter":",",
                     "column":[]
                    },
                "batchSize": 1000,
                "dynamic":false,
                "esPartitionColumn":[
                    {
                        "name":"col1",  
                        "comment":"xx", 
                        "type":"STRING" 
                        }
                     ],
                "column": [
                    {
                        "name": "pk",
                        "type": "id"
                    },
                    {
                        "name": "col_ip",
                        "type": "ip"
                    },
                    {
                        "name": "col_array",
                        "type": "long",
                        "array": true,
                    },
                    {
                        "name": "col_double",
                        "type": "double"
                    },
                    {
                        "name": "col_long",
                        "type": "long"
                    },
                    {
                        "name": "col_integer",
                        "type": "integer"
                    {
                        "name": "col_keyword",
                        "type": "keyword"
                    },
                    {
                        "name": "col_text",
                        "type": "text",
                        "analyzer": "ik_max_word",
                        "other_params":
                            {
                                "doc_values": false
                            },
                    },
                    {
                        "name": "col_geo_point",
                        "type": "geo_point"
                    },
                    {
                        "name": "col_date",
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss"
                    },
                    {
                        "name": "col_nested1",
                        "type": "nested"
                    },
                    {
                        "name": "col_nested2",
                        "type": "nested"
                    },
                    {
                        "name": "col_object1",
                        "type": "object"
                    },
                    {
                        "name": "col_object2",
                        "type": "object"
                    },
                    {
                        "name": "col_integer_array",
                        "type": "integer",
                        "array": true
                    },
                    {
                        "name": "col_geo_shape",
                        "type": "geo_shape",
                        "tree": "quadtree",
                        "precision": "10m"
                    }
                ]
            },
            "stepType": "elasticsearch"
        }
    ],
    "type": "job",
    "version": "2.0"
}
说明

VPC环境的Elasticsearch运行在默认资源组会存在网络不通的情况。您需要使用Serverless资源组(推荐)和独享数据集成资源组,才能连通VPC进行数据同步。添加资源的详情请参见Serverless资源组

Writer脚本参数

参数

描述

是否必选

默认值

datasource

选择需要同步的Elasticsearch数据源,若还未在DataWorks创建该数据源,请先创建,详情请参见配置Elasticsearch数据源

index

Elasticsearch中的index名。

indexType

Elasticsearch中index的type名。

Elasticsearch

cleanup

定义当前任务在索引index已存在的情况是否要删除数据。

  • 是(true):导入数据前删除原来的索引并重建同名索引,此操作会删除该索引下的数据。

  • 否(false):导入数据前保留索引中已存在的数据。

false

batchSize

定义同步任务一次性插入ElasticSearch的Document条数。

1,000

trySize

定义往ElasticSearch写入数据失败后的重试次数。

30

timeout

客户端超时时间。

600,000

discovery

任务是否启动节点发现功能。

  • true:与集群中随机一个节点进行连接。启用节点发现将轮询并定期更新客户机中的服务器列表。

  • false:与Elasticsearch集群进行连接。

false

compression

HTTP请求,开启压缩。

true

multiThread

HTTP请求,是否有多线程。

true

ignoreWriteError

忽略写入错误,不重试,继续写入。

false

ignoreParseError

忽略解析数据格式错误,继续写入。

true

alias

Elasticsearch的别名类似于数据库的视图机制,为索引my_index创建一个别名my_index_alias,对my_index_alias的操作与my_index的操作一致。

配置alias表示在数据导入完成后,为指定的索引创建别名。

aliasMode

数据导入完成后增加别名的模式,包括append(增加模式)和exclusive(只留这一个):

  • aliasModeappend时,表示追加当前索引至别名alias映射中(一个别名对应多个索引)。

  • aliasModeexclusive时,表示首先删除别名alias,再添加当前索引至别名alias映射中(一个别名对应一个索引)。

后续会转换别名为实际的索引名称,别名可以用来进行索引迁移和多个索引的查询统一,并可以用来实现视图的功能。

append

settings

创建index时的settings,与Elasticsearch官方一致。

column

column用来配置文档的多个字段Filed信息,具体每个字段项可以配置name(名称)、type(类型)等基础配置,以及AnalyzerFormatArray等扩展配置。

Elasticsearch所支持的字段类型如下所示。

- id  //type id对应Elasticsearch中的_id,可以理解为唯一主键。写入时,相同id的数据会被覆盖,且不会被索引。
- string
- text
- keyword
- long
- integer
- short
- byte
- double
- float
- date
- boolean
- binary
- integer_range
- float_range
- long_range
- double_range
- date_range
- geo_point
- geo_shape
- ip
- token_count
- array
- object
- nested

列类型的说明如下:

  • 列类型为text类型时,可以配置analyzer(分词器)、normsindex_options等参数,示例如下。

    {
        "name": "col_text",
        "type": "text",
        "analyzer": "ik_max_word"
        }
  • 列类型为Date类型时,您可配置如下两种方式解析源端数据,配置方式请保持一致。

    • 方式一:根据reader端读取字段的内容直接写入es data字段:

      • 配置origin:true必填,让读取字段的内容直接写入es data

      • 配置"format",表示在通过es writer创建mapping时,该字段需要设置format属性。示例如下:

          {
             "parameter":{
               "column":[{
                   "name": "col_date",
                   "type": "date",
                   "format": "yyyy-MM-dd HH:mm:ss",
                   "origin": true
                }]
           }
        }
    • 方式二:(时区转换)如果需要数据集成帮助您进行时区转换,可添加Timezone参数。示例如下:

      配置的"format"表示数据集成在做时区转换时,解析的时间格式如下:

        {
           "parameter" :{
             "column": [{
                "name": "col_date",
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss",
               "Timezone": "UTC"
             }]
         }
      }
  • 列类型为地理形状geo_shape时,可以配置tree(geohash或quadtree)、precision(精度)属性,示例如下。

    {
        "name": "col_geo_shape",
        "type": "geo_shape",
        "tree": "quadtree",
        "precision": "10m"
        }

如果需要在column中配置除了type以外的属性值,您可以使用other_params参数,该参数配置在column中,在update mappings时,用于描述column中除了type以外的Elasticsearch属性信息。

 {
   "name": "guid",
   "other_params":
    {
       "doc_values": false
      },
    "type": "text"
  }

如果您希望源端数据写入为Elasticsearch时按照数组类型写入,您可按照JSON格式或指定分隔符的方式来解析源端数据。配置详情请参见附录二:ElasticSearch写入的格式期望是数组类型

dynamic

定义当在文档中发现未存在的字段时,同步任务是否通过Elasticsearch动态映射机制为字段添加映射。

  • true:保留Elasticsearch的自动mappings映射。

  • false:默认值,不填写默认为false,根据同步任务配置的column生成并更新Elasticsearch的mappings映射。

Elasticsearch 7.x版本的默认type_doc。使用Elasticsearch的自动mappings时,请配置_docesVersion为7。

您需要转换为脚本模式,添加一个版本参数:"esVersion": "7"

false

actionType

表示Elasticsearch在数据写出时的action类型,目前数据集成支持indexupdate两种actionType,默认值为index

  • index:底层使用了Elasticsearch SDK的Index.Builder构造批量请求。Elasticsearch index插入时,需要首先判断插入的文档数据中是否指定ID:

    • 如果没有指定ID,Elasticsearch会默认生成一个唯一ID。该情况下会直接添加文档至Elasticsearch中。

    • 如果已指定ID,会进行更新(替换整个文档),且不支持针对特定Field进行修改。

      说明

      此处的更新并非Elasticsearch中的更新(替换部分指定列替换)。

  • update:根据用户指定的ID进行文档更新,如果ID值在索引中不存在则插入文档,存在则更新指定的column字段内容(其他文档字段内容不变)。每次update完成都会获取整个文档信息,从而实现针对特定字段进行修改。这里update不支持条件筛选,仅根据指定ID值进行更新操作。由于每次更新都需要获取一遍原始文档,因此对性能同步能会有较大影响。

    说明

    设置action类型为update时,您需要设置主键primaryKeyInfo

index

primaryKeyInfo

定义当前写入ElasticSearch的主键取值方式。

  • 业务主键(pk):_id 的值指定为某一个字段。

    "parameter":{
    "primaryKeyInfo":{
    "type":"pk",
    "column":["id"]}
    }
  • 联合主键(specific):_id 的值指定为某几个字段的值拼接,分隔符为您设置的主键分隔符fieldDelimiter

    说明

    其中字段名为eswriter的待写入字段,向导模式拉取主键列配置时只包含Elasticsearch索引中已存在的字段。

    "parameter":{
    "primaryKeyInfo":{
    "type":"specific",
    "fieldDelimiter":",",
    "column":["col1","col2"]}
    }
  • 无主键(nopk):_id在写入ElasticSearch时系统自动生成。

    "primaryKeyInfo":{
    "type":"nopk"
    }

specific

esPartitionColumn

定义写入ElasticSearch时是否开启分区,用于修改ElasticSearch中的routing的参数。

  • 开启分区:把指定列的value通过分隔符空串连接指定为routing的值,在写入时,插入或更新指定shard中的doc,开启分区的情况下您需要指定分区列。

    {    "esPartitionColumn": [
            {
                "name":"col1",
                "comment":"xx",
                "type":"STRING"
                }
            ],
        }
  • 不开启分区:不填写该参数,默认使用_id作为routing起到将文档均匀分布到多个分片上防止数据倾斜的作用。

false

enableWriteNull

该参数用于是否支持将来源端的空值字段同步至Elasticsearch。取值如下:

  • true:支持。同步后,Elasticsearch中对应字段的value为空。

  • false:不支持。来源端的空值字段无法同步至Elasticsearch,即在Elasticsearch中不显示该字段。

true

附录二:ElasticSearch写入的格式期望是数组类型

支持以下两种方式将源端数据按照数组类型写入ElasticSearch。

  • 按JSON格式解析源端数据

    例如:源端数据为"[1,2,3,4,5]",配置json_array=true对其进行解析,同步将以数组格式写入ElasticSearch。

    "parameter" : {
      {
        "name":"docs_1",
        "type":"keyword",
        "json_array":true
      }
    }
  • 按分隔符解析源端数据

    例如:源端数据为"1,2,3,4,5", 配置分隔符splitter=","对其进行解析,同步将以数组格式写入ElasticSearch。

    说明

    一个任务仅支持配置一种分隔符,splitter全局唯一,不支持多array字段配置为不同的分隔符。例如源端字段列col1="1,2,3,4,5" , col2="6-7-8-9-10", splitter无法针对每列单独配置使用。

    "parameter" : {
          "column": [
            {
              "name": "docs_2",
              "array": true,
              "type": "long"
            }
          ],
          "splitter":","//注意:splitter配置与column配置同级。
    }

附录三:场景示例

场景一:全量拉取

  • 背景说明:将Elasticsearch中文档查询的结果拉取为一个字段。

  • 配置示例:

    
    ## 读端:Elasticsearch中的原始数据
    "hits": [
        {
            "_index": "mutiltest_1",
            "_type": "_doc",
            "_id": "IXgdO4MB4GR_1DmrjTXP",
            "_score": 1.0,
            "_source": {
                "feature1": "value1",
                "feature2": "value2",
                "feature3": "value3"
            }
        }]
    
    ##数据集成Elasticsearch Reader插件配置
    "parameter": {
      "column": [
          "content"
      ],
      "full":true
    }
    
    ##写端结果:同步至目标端1行1列
    {"_index":"mutiltest_1","_type":"_doc","_id":"IXgdO4MB4GR_1DmrjTXP","_source":{"feature1":"value1","feature2":"value2","feature3":"value3"},"sort":["IXgdO4MB4GR_1DmrjTXP"]}

场景二:嵌套或对象字段属性同步

  • 背景说明:Object对象或nested嵌套字段的属性时,通过path路径来解决。

  • 配置形式:

    • 属性

    • 属性.子属性

    • 属性[0].子属性

  • 脚本配置:

    "multi":{
        "multi":true
    }
    说明

    向导模式暂不支持配置。

  • 配置示例:

    ## 读端:Elasticsearch中的原始数据
    "hits": [
        {
            "_index": "mutiltest_1",
            "_type": "_doc",
            "_id": "7XAOOoMB4GR_1Dmrrust",
            "_score": 1.0,
            "_source": {
                "level1": {
                    "level2": [
                        {
                            "level3": "testlevel3_1"
                        },
                        {
                            "level3": "testlevel3_2"
                        }
                    ]
                }
            }
        }
    ]
    ##数据集成Elasticsearch reader插件配置
    "parameter": {
      "column": [
          "level1",
          "level1.level2",
          "level1.level2[0]",
          "level1.level2.level3"
      ],
      "multi":{
            "multi":true
        }
    }
    
    ##写端结果:1行数据4列
    column1(level1):            {"level2":[{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]}
    column2(level1.level2):     [{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]
    column3(level1.level2[0]):  {"level3":"testlevel3_1"}
    column4(level1.level2.level3):  null
    说明
    • 获取的节点上层有数组时结果为null,如上样例获取level1.level2.level3不会报错,同步结果为null,需要配置为level1.level2[0].level3或level1.level2[1].level3,当前不支持level1.level2[*].level3。

    • 不支持key出现"."的数据, 如"level1.level2":{"level3":"testlevel3_1"}, 此时该条数据获取结果为null。

场景三:数组属性拆分为多行

  • 背景说明:附属信息有一对多的情况,需要将数组列拆成多行。

  • 配置形式:属性[*].子属性

  • 效果示意:源端数据{ "splitKey" :[1,2,3,4,5]},拆完后写到目标端为5行:{"splitKey[0]":1,"splitKey[1]":2,"splitKey[2]":3,"splitKey[3]":4,"splitKey[4]":5}

  • 脚本配置:

    "multi":{   
           "multi":true,    
            "key": "headers"
    }
    说明
    • 向导模式下配置拆多行数组列名,会自动生成脚本配置,具有相同效果。

    • value必须为List,否则会报错。

  • 配置示例:

    ## 读端:Elasticsearch中的原始数据
    [
        {
            "_index": "lmtestjson",
            "_type": "_doc",
            "_id": "nhxmIYMBKDL4VkVLyXRN",
            "_score": 1.0,
            "_source": {
                "headers": [
                    {
                        "remoteip": "192.0.2.1"
                    },
                    {
                        "remoteip": "192.0.2.2"
                    }
                ]
            }
        },
        {
            "_index": "lmtestjson",
            "_type": "_doc",
            "_id": "wRxsIYMBKDL4VkVLcXqf",
            "_score": 1.0,
            "_source": {
                "headers": [
                    {
                        "remoteip": "192.0.2.3"
                    },
                    {
                        "remoteip": "192.0.2.4"
                    }
                ]
            }
        }
    ]
    ##数据集成Elasticsearch reader插件配置
    {
       "column":[
          "headers[*].remoteip"
      ]
      "multi":{
          "multi":true,
          "key": "headers"
      }
    }
    
    ##写端结果:4行
    192.0.2.1
    192.0.2.2
    192.0.2.3
    192.0.2.4

场景四:数组属性去重归并

  • 背景说明:数组去重归并,将一个数组属性去重归并后写入为字符串属性,数组属性可以为子属性如name1.name2,去重采用tostring结果作为标准。

  • 配置形式:属性[]。

    column里面带有 [] 关键字就会认为对该属性做去重归并。

  • 脚本配置:

    "multi":{
        "multi":true
    }
    说明

    向导模式暂不支持配置。

  • 配置示例:

    ## 读端:Elasticsearch中的原始数据
    "hits": [
    {
        "_index": "mutiltest_1",
        "_type": "_doc",
        "_id": "4nbUOoMB4GR_1Dmryj8O",
        "_score": 1.0,
        "_source": {
            "feature1": [
                "value1",
                "value1",
                "value2",
                "value2",
                "value3"
            ]
        }
    }
    ]
    ##数据集成Elasticsearch reader插件配置
    "parameter": {
      "column":[
            "feature1[]"
      ],
      "multi":{
            "multi":true
        }
    }
    
    ##写端结果:1行1列数据
    "value1,value2,value3"

场景五:多属性合一同步

  • 背景说明:多属性选择处理,返回第一个有值的属性,都不存在时将写入null。

  • 配置形式:属性1|属性2|...

    column里面带有 "|"关键字就会对该项做多属性选择。

  • 脚本配置:

    "multi":{    
        "multi":true
    }
    说明

    向导模式暂不支持该配置。

  • 配置示例:

    ##读端:Elasticsearch中的原始数据
    "hits": [
        {
            "_index": "mutiltest_1",
            "_type": "_doc",
            "_id": "v3ShOoMB4GR_1DmrZN22",
            "_score": 1.0,
            "_source": {
                "feature1": "feature1",
                "feature2": [
                    1,
                    2,
                    3
                ],
                "feature3": {
                    "child": "feature3"
                }
            }
        }]
    
    ##数据集成Elasticsearch reade插件配置
    "parameter": {
      "column":[
            "feature1|feature2|feature3"
      ],
      "multi":{
            "multi":true
        }
    }
    
    ##写端结果:1行1列数据
    "feature1"

场景六:多属性选择同步

  • 背景说明:多属性选择处理 ,返回第一个有值的属性,都不存在时写入null。

  • 配置形式:属性1|属性2|...

    column里面带有 "|"关键字就会对该项做多属性选择

  • 脚本配置:

    "multi":{
        "multi":true
    }
    说明

    向导模式暂不支持该配置。

  • 配置示例:

    ##读端:Elasticsearch中的原始数据
    "hits": [
        {
            "_index": "mutiltest_1",
            "_type": "_doc",
            "_id": "v3ShOoMB4GR_1DmrZN22",
            "_score": 1.0,
            "_source": {
                "feature1": "feature1",
                "feature2": [
                    1,
                    2,
                    3
                ],
                "feature3": {
                    "child": "feature3"
                }
            }
        }]
    ##数据集成Elasticsearch reader插件配置
    "parameter": {
      "column":[
            "feature1,feature2,feature3"
      ],
      "multi":{
            "multi":true
        }
    }
    
    ##写端结果:1行1列数据
    "feature1,[1,2,3],{"child":"feature3"}"

相关文档

数据集成支持其他更多数据源接入,更多信息,请参见支持的数据源及同步方案