Elasticsearch数据源为您提供读取和写入Elasticsearch双向通道的功能,本文为您介绍DataWorks的Elasticsearch数据同步的能力支持情况。
适用范围
Elasticsearch在公共资源组上支持Elasticsearch 5.x版本,在Serverless资源组(推荐)和独享数据集成资源组上支持Elasticsearch 5.x、6.x、7.x和8.x版本。
- Serverless资源组的详情请参见使用Serverless资源组。 
- 独享数据集成资源组的详情请参见使用独享数据集成资源组。 
Elasticsearch是遵从Apache开源条款的一款开源产品,是当前主流的企业级搜索引擎。Elasticsearch是一个基于Lucene的搜索和数据分析工具,它提供分布式服务。Elasticsearch核心概念同数据库核心概念的对应关系如下所示。
| Elasticsearch | 关系型数据库 | 
| Elasticsearch(实例) | Relational DB(实例) | 
| Index | Databases(数据库) | 
| Types | Tables(表) | 
| Documents | Rows(一行数据) | 
| Fields | Columns(一行数据的一列) | 
Elasticsearch中可以有多个索引或数据库,每个索引可以包括多个类型或表,每个类型可以包括多个文档或行,每个文档可以包括多个字段或列。Elasticsearch Writer插件使用Elasticsearch的Rest API接口,批量把从Reader读入的数据写入Elasticsearch中。
支持的版本
DataWorks平台目前仅支持配置阿里云Elasticsearch 5.x、6.x、7.x和8.x版本数据源,不支持配置自建Elasticsearch数据源。
使用限制
Elasticsearch数据源在进行离线读写时会受到以下限制:
- Elasticsearch Reader会获取Server端shard信息用于数据同步,需要确保在任务同步中Server端的shards处于存活状态,否则会存在数据不一致风险。 
- 如果您使用的是6.x及以上版本,支持使用Serverless资源组(推荐)和独享数据集成资源组。 
- 不支持同步scaled_float类型的字段。 
- 不支持同步字段中带有关键字 - $ref的索引。
支持的字段类型
| 类型 | 离线读(Elasticsearch Reader) | 离线写(Elasticsearch Writer) | 实时写 | 
| binary | 支持 | 支持 | 支持 | 
| boolean | 支持 | 支持 | 支持 | 
| keyword | 支持 | 支持 | 支持 | 
| constant_keyword | 不支持 | 不支持 | 不支持 | 
| wildcard | 不支持 | 不支持 | 不支持 | 
| long | 支持 | 支持 | 支持 | 
| integer | 支持 | 支持 | 支持 | 
| short | 支持 | 支持 | 支持 | 
| byte | 支持 | 支持 | 支持 | 
| double | 支持 | 支持 | 支持 | 
| float | 支持 | 支持 | 支持 | 
| half_float | 不支持 | 不支持 | 不支持 | 
| scaled_float | 不支持 | 不支持 | 不支持 | 
| unsigned_long | 不支持 | 不支持 | 不支持 | 
| date | 支持 | 支持 | 支持 | 
| date_nanos | 不支持 | 不支持 | 不支持 | 
| alias | 不支持 | 不支持 | 不支持 | 
| object | 支持 | 支持 | 支持 | 
| flattened | 不支持 | 不支持 | 不支持 | 
| nested | 支持 | 支持 | 支持 | 
| join | 不支持 | 不支持 | 不支持 | 
| integer_range | 支持 | 支持 | 支持 | 
| float_range | 支持 | 支持 | 支持 | 
| long_range | 支持 | 支持 | 支持 | 
| double_range | 支持 | 支持 | 支持 | 
| date_range | 支持 | 支持 | 支持 | 
| ip_range | 不支持 | 支持 | 支持 | 
| ip | 支持 | 支持 | 支持 | 
| version | 支持 | 支持 | 支持 | 
| murmur3 | 不支持 | 不支持 | 不支持 | 
| aggregate_metric_double | 不支持 | 不支持 | 不支持 | 
| histogram | 不支持 | 不支持 | 不支持 | 
| text | 支持 | 支持 | 支持 | 
| annotated-text | 不支持 | 不支持 | 不支持 | 
| completion | 支持 | 不支持 | 不支持 | 
| search_as_you_type | 不支持 | 不支持 | 不支持 | 
| token_count | 支持 | 不支持 | 不支持 | 
| dense_vector | 不支持 | 不支持 | 不支持 | 
| rank_feature | 不支持 | 不支持 | 不支持 | 
| rank_features | 不支持 | 不支持 | 不支持 | 
| geo_point | 支持 | 支持 | 支持 | 
| geo_shape | 支持 | 支持 | 支持 | 
| point | 不支持 | 不支持 | 不支持 | 
| shape | 不支持 | 不支持 | 不支持 | 
| percolator | 不支持 | 不支持 | 不支持 | 
| string | 支持 | 支持 | 支持 | 
工作原理
Elasticsearch Reader的工作原理如下:
- 通过Elasticsearch的_searchscrollslice(即游标分片)方式实现,slice结合数据集成任务的task多线程分片机制使用。 
- 根据Elasticsearch中的Mapping配置,转换数据类型。 
更多详情请参见Elasticsearch官方文档。
Elasticsearch Reader会获取Server端shard信息用于数据同步,需要确保在任务同步中Server端的shards处于存活状态,否则会存在数据不一致风险。
基本配置
实际运行时,请删除下述代码中的注释。
{
 "order":{
  "hops":[
   {
    "from":"Reader",
    "to":"Writer"
   }
  ]
 },
 "setting":{
  "errorLimit":{
   "record":"0" //错误记录数。
  },
  "jvmOption":"",
  "speed":{
   "concurrent":3,//并发数
   "throttle":true,//
                     "mbps":"12",//限流,此处1mbps = 1MB/s。
  }
 },
 "steps":[
  {
   "category":"reader",
   "name":"Reader",
   "parameter":{
    "column":[ //读取列。
     "id",
     "name"
    ],
    "endpoint":"", //服务地址。
    "index":"",  //索引。
    "password":"",  //密码。
    "scroll":"",  //scroll标志。
    "search":"",  //查询query参数,与Elasticsearch的query内容相同,使用_search api,重命名为search。
    "type":"default",
    "username":""  //用户名。
   },
   "stepType":"elasticsearch"
  },
  {
   "stepType": "elasticsearch",
            "parameter": {
                "column": [ //写入列
                    {
                        "name": "id",
                        "type": "integer"
                    },
                    {
                        "name": "name",
                        "type": "text"
                    }
                ],
                "index": "test",   //写入索引
                 "indexType": "",   //写入索引类型,es7不填
                "actionType": "index",  //写入方式
                "cleanup": false,         //是否重建索引
                "datasource": "test",   //数据源名称
                "primaryKeyInfo": {     //主键取值方式
                    "fieldDelimiterOrigin": ",",
                    "column": [
                        "id"
                    ],
                    "type": "specific",
                    "fieldDelimiter": ","
                },
                "dynamic": false,  //动态映射
                "batchSize": 1024   //批量写文档数
            },
            "name": "Writer",
            "category": "writer"
  }
 ],
 "type":"job",
 "version":"2.0" //版本号。
}高级功能
- 支持全量拉取 - 支持将Elasticsearch中一个文档的所有内容拉取为一个字段。配置详情请参见场景一:全量拉取。 
- 支持提取半结构化到结构化数据 - 分类 - 描述 - 相关文档 - 产生背景 - Elasticsearch中的数据特征为字段不固定,且有中文名、数据使用深层嵌套的形式。为更好地方便下游业务对数据的计算和存储需求,特推出从半结构化到结构化的转换解决方案。 - — - 实现原理 - 将Elasticsearch获取到的JSON数据,利用JSON工具的路径获取特性,将嵌套数据扁平化为一维结构的数据。然后将数据映射至结构化数据表中,拆分Elasticsearch复合结构数据至多个结构化数据表。 - — - 解决方案 - JSON有嵌套的情况,通过path路径来解决。 - 属性 
- 属性.子属性 
- 属性[0].子属性 
 - 附属信息有一对多的情况,需要进行拆表拆行处理,进行遍历。 - 属性[*].子属性 - 数组归并,一个字符串数组内容,归并为一个属性,并进行去重。 - 属性[] - 多属性合一,将多个属性合并为一个属性。 - 属性1,属性2 - 多属性选择处理。 - 属性1|属性2 
创建数据源
在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见数据源管理,详细的配置参数解释可在配置界面查看对应参数的文案提示。
数据同步任务开发
数据同步任务的配置入口和通用配置流程可参见下文的配置指导。
单表离线同步任务配置指导
- 脚本模式配置的全量参数和脚本Demo请参见下文的附录一:脚本Demo与参数说明。 
单表实时写同步任务配置指导
操作流程请参见DataStudio侧实时同步任务配置。
整库离线写、单表/整库全增量实时写同步任务配置指导
操作流程请参见整库实时同步任务配置。
附录一:脚本Demo与参数说明
离线任务脚本配置方式
如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见脚本模式配置,以下为您介绍脚本模式下数据源的参数配置详情。
Reader脚本Demo
{
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    },
    "setting":{
        "errorLimit":{
            "record":"0" //错误记录数。
        },
        "jvmOption":"",
        "speed":{
            "concurrent":3,
            "throttle":false
        }
    },
    "steps":[
        {
            "category":"reader",
            "name":"Reader",
            "parameter":{
                "column":[ //读取列。
                    "id",
                    "name"
                ],
                "endpoint":"http://es-cn-xxx.elasticsearch.aliyuncs.com:9200", //服务地址。
                "index":"aliyun_es_xx",  //索引。
                "password":"*******",  //密码。
                "multiThread":true,
                "scroll":"5m",  //scroll标志。
                "pageSize":5000,
                "connTimeOut":600000,
                "readTimeOut":600000,
                "retryCount":30,
                "retrySleepTime":"10000",
                "search":{
                            "range":{
                                "gmt_modified":{
                                    "gte":0
                                }
                            }
                        },  //查询query参数,与Elasticsearch的query内容相同,使用_search api,重命名为search。
                "type":"doc",
                "username":"aliyun_di"  //用户名。
            },
            "stepType":"elasticsearch"
        },
        {
            "category":"writer",
            "name":"Writer",
            "parameter":{ },
            "stepType":"stream"
        }
    ],
    "type":"job",
    "version":"2.0" //版本号。
}Reader脚本参数
| 参数 | 描述 | 是否必选 | 默认值 | 
| datasource | 数据源名称,脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。 | 是 | 无 | 
| index | Elasticsearch中的index名。 | 是 | 无 | 
| type | Elasticsearch中index的type名。 | 否 | index名 | 
| search | Elasticsearch的query参数。 | 是 | 无 | 
| pageSize | 每次读取数据的条数。 | 否 | 100 | 
| scroll | Elasticsearch的分页参数,设置游标存放时间。 
 | 是 | 无 | 
| strictMode | 以严格模式读取Elasticsearch中的数据,当出现Elasticsearch的shard.failed时会停止读取,避免读取到少量数据 | 否 | true | 
| sort | 返回结果的排序字段。 | 否 | 无 | 
| retryCount | 失败后重试的次数。 | 否 | 300 | 
| connTimeOut | 客户端连接超时时间。 | 否 | 600,000 | 
| readTimeOut | 客户端读取超时时间。 | 否 | 600,000 | 
| multiThread | http请求,是否有多线程。 | 否 | true | 
| preemptiveAuth | http是否使用抢先模式请求 | 否 | false | 
| retrySleepTime | 失败后重试的时间间隔。 | 否 | 1000 | 
| discovery | 是否开启节点发现。 
 | 否 | false | 
| compression | 是否使用GZIP压缩请求正文,使用时需要在es节点上启用http.compression设置。 | 否 | false | 
| dateFormat | 待同步字段存在date类型,且该字段mapping没有format配置时,需要配置dateFormat参数。配置形式如下:  | 否 | 无 | 
| full | 是否将全文档内容作为一个字段同步至目标端,将Elasticsearch的查询数据作为一个字段,配置详情请参见场景一:全量拉取。 | 否 | 无 | 
| multi | 该配置是一个高级功能,具有五种用法,两个子属性分别为 | 否 | 无 | 
Writer脚本Demo
{
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1, //作业并发数。
            "mbps":"12"//限流,此处1mbps = 1MB/s。
        }
    },
    "steps": [
        {
            "category": "reader",
            "name": "Reader",
            "parameter": {
            },
            "stepType": "stream"
        },
        {
            "category": "writer",
            "name": "Writer",
            "parameter": {
                "datasource":"xxx",
                "index": "test-1",
                "type": "default",
                "cleanup": true,
                "settings": {
                        "number_of_shards": 1,
                        "number_of_replicas": 0
                },
                "discovery": false,
                "primaryKeyInfo":{
                    "type":"pk",    
                     "fieldDelimiter":",",
                     "column":[]
                    },
                "batchSize": 1000,
                "dynamic":false,
                "esPartitionColumn":[
                    {
                        "name":"col1",  
                        "comment":"xx", 
                        "type":"STRING" 
                        }
                     ],
                "column": [
                    {
                        "name": "pk",
                        "type": "id"
                    },
                    {
                        "name": "col_ip",
                        "type": "ip"
                    },
                    {
                        "name": "col_array",
                        "type": "long",
                        "array": true,
                    },
                    {
                        "name": "col_double",
                        "type": "double"
                    },
                    {
                        "name": "col_long",
                        "type": "long"
                    },
                    {
                        "name": "col_integer",
                        "type": "integer"
                    {
                        "name": "col_keyword",
                        "type": "keyword"
                    },
                    {
                        "name": "col_text",
                        "type": "text",
                        "analyzer": "ik_max_word",
                        "other_params":
                            {
                                "doc_values": false
                            },
                    },
                    {
                        "name": "col_geo_point",
                        "type": "geo_point"
                    },
                    {
                        "name": "col_date",
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss"
                    },
                    {
                        "name": "col_nested1",
                        "type": "nested"
                    },
                    {
                        "name": "col_nested2",
                        "type": "nested"
                    },
                    {
                        "name": "col_object1",
                        "type": "object"
                    },
                    {
                        "name": "col_object2",
                        "type": "object"
                    },
                    {
                        "name": "col_integer_array",
                        "type": "integer",
                        "array": true
                    },
                    {
                        "name": "col_geo_shape",
                        "type": "geo_shape",
                        "tree": "quadtree",
                        "precision": "10m"
                    }
                ]
            },
            "stepType": "elasticsearch"
        }
    ],
    "type": "job",
    "version": "2.0"
}VPC环境的Elasticsearch运行在默认资源组会存在网络不通的情况。您需要使用Serverless资源组(推荐)和独享数据集成资源组,才能连通VPC进行数据同步。添加资源的详情请参见Serverless资源组。
Writer脚本参数
| 参数 | 描述 | 是否必选 | 默认值 | 
| datasource | 选择需要同步的Elasticsearch数据源,若还未在DataWorks创建该数据源,请先创建,详情请参见配置Elasticsearch数据源。 | 是 | 无 | 
| index | Elasticsearch中的index名。 | 是 | 无 | 
| indexType | Elasticsearch中index的type名。 | 否 | Elasticsearch | 
| cleanup | 定义当前任务在索引index已存在的情况是否要删除数据。 
 | 否 | false | 
| batchSize | 定义同步任务一次性插入ElasticSearch的Document条数。 | 否 | 1,000 | 
| trySize | 定义往ElasticSearch写入数据失败后的重试次数。 | 否 | 30 | 
| timeout | 客户端超时时间。 | 否 | 600,000 | 
| discovery | 任务是否启动节点发现功能。 
 | 否 | false | 
| compression | HTTP请求,开启压缩。 | 否 | true | 
| multiThread | HTTP请求,是否有多线程。 | 否 | true | 
| ignoreWriteError | 忽略写入错误,不重试,继续写入。 | 否 | false | 
| ignoreParseError | 忽略解析数据格式错误,继续写入。 | 否 | true | 
| alias | Elasticsearch的别名类似于数据库的视图机制,为索引my_index创建一个别名my_index_alias,对my_index_alias的操作与my_index的操作一致。 配置alias表示在数据导入完成后,为指定的索引创建别名。 | 否 | 无 | 
| aliasMode | 数据在导入完成后增加别名的模式,包括append(增加模式)和exclusive(只留这一个): 
 后续会转换别名为实际的索引名称,别名可以用来进行索引迁移和多个索引的查询统一,并可以用来实现视图的功能。 | 否 | append | 
| settings | 创建index时的settings,与Elasticsearch官方一致。 | 否 | 无 | 
| column | column用来配置文档的多个字段Filed信息,具体每个字段项可以配置name(名称)、type(类型)等基础配置,以及Analyzer、Format和Array等扩展配置。 Elasticsearch所支持的字段类型如下所示。 列类型的说明如下: 
 如果需要在column中配置除了type以外的属性值,您可以使用other_params参数,该参数配置在column中,在update mappings时,用于描述column中除了type以外的Elasticsearch属性信息。 如果您希望源端数据写入为Elasticsearch时按照数组类型写入,您可按照JSON格式或指定分隔符的方式来解析源端数据。配置详情请参见附录二:ElasticSearch写入的格式期望是数组类型。 | 是 | 无 | 
| dynamic | 定义当在文档中发现不存在的字段时,同步任务是否通过Elasticsearch动态映射机制为字段添加映射。 
 Elasticsearch 7.x版本的默认type为_doc。使用Elasticsearch的自动mappings时,请配置_doc和esVersion为7。 您需要转换为脚本模式,添加一个版本参数: 重要  如果遇到字段映射相关报错,可开启此参数尝试解决,但此方法可能导致字段类型与预期不符或数据异常,请根据数据结构评估风险后再决定是否启用。 | 否 | false | 
| actionType | 表示Elasticsearch在数据写出时的action类型,目前数据集成支持index和update两种actionType,默认值为index: 
 | 否 | index | 
| primaryKeyInfo | 定义当前写入ElasticSearch的主键取值方式。 
 | 是 | specific | 
| esPartitionColumn | 定义写入ElasticSearch时是否开启分区,用于修改ElasticSearch中的routing的参数。 
 | 否 | false | 
| enableWriteNull | 该参数用于是否支持将来源端的空值字段同步至Elasticsearch。取值如下: 
 | 否 | true | 
附录二:ElasticSearch写入的格式期望是数组类型
支持以下两种方式将源端数据按照数组类型写入ElasticSearch。
- 按JSON格式解析源端数据 - 例如:源端数据为 - "[1,2,3,4,5]",配置json_array=true对其进行解析,同步将以数组格式写入ElasticSearch。- "parameter" : { { "name":"docs_1", "type":"keyword", "json_array":true } }
- 按分隔符解析源端数据 - 例如:源端数据为 - "1,2,3,4,5",配置分隔符splitter=","对其进行解析,同步将以数组格式写入ElasticSearch。说明- 一个任务仅支持配置一种分隔符,splitter全局唯一,不支持多array字段配置为不同的分隔符。例如源端字段列 - col1="1,2,3,4,5",- col2="6-7-8-9-10", splitter无法针对每列单独配置使用。- "parameter" : { "column": [ { "name": "docs_2", "array": true, "type": "long" } ], "splitter":","//注意:splitter配置与column配置同级。 }
附录三:场景示例
场景一:全量拉取
- 背景说明:将Elasticsearch中文档查询的结果拉取为一个字段。 
- 配置示例: - ## 读端:Elasticsearch中的原始数据 "hits": [ { "_index": "mutiltest_1", "_type": "_doc", "_id": "IXgdO4MB4GR_1DmrjTXP", "_score": 1.0, "_source": { "feature1": "value1", "feature2": "value2", "feature3": "value3" } }] ##数据集成Elasticsearch Reader插件配置 "parameter": { "column": [ "content" ], "full":true } ##写端结果:同步至目标端1行1列 {"_index":"mutiltest_1","_type":"_doc","_id":"IXgdO4MB4GR_1DmrjTXP","_source":{"feature1":"value1","feature2":"value2","feature3":"value3"},"sort":["IXgdO4MB4GR_1DmrjTXP"]}
场景二:嵌套或对象字段属性同步
- 背景说明:Object对象或nested嵌套字段的属性时,通过path路径来解决。 
- 配置形式: - 属性 
- 属性.子属性 
- 属性[0].子属性 
 
- 脚本配置: - "multi":{ "multi":true }说明- 向导模式暂不支持配置。 
- 配置示例: - ## 读端:Elasticsearch中的原始数据 "hits": [ { "_index": "mutiltest_1", "_type": "_doc", "_id": "7XAOOoMB4GR_1Dmrrust", "_score": 1.0, "_source": { "level1": { "level2": [ { "level3": "testlevel3_1" }, { "level3": "testlevel3_2" } ] } } } ] ##数据集成Elasticsearch reader插件配置 "parameter": { "column": [ "level1", "level1.level2", "level1.level2[0]", "level1.level2.level3" ], "multi":{ "multi":true } } ##写端结果:1行数据4列 column1(level1): {"level2":[{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]} column2(level1.level2): [{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}] column3(level1.level2[0]): {"level3":"testlevel3_1"} column4(level1.level2.level3): null说明- 获取的节点上层有数组时结果为null,如上样例获取level1.level2.level3不会报错,但同步结果为null,需要配置为level1.level2[0].level3或level1.level2[1].level3,当前不支持level1.level2[*].level3。 
- 不支持key出现"."的数据,如"level1.level2":{"level3":"testlevel3_1"},此时该条数据获取结果为null。 
 
场景三:数组属性拆分为多行
- 背景说明:附属信息有一对多的情况,需要将数组列拆成多行。 
- 配置形式:属性[*].子属性 
- 效果示意:源端数据{ "splitKey" :[1,2,3,4,5]},拆完后写到目标端为5行:{"splitKey[0]":1,"splitKey[1]":2,"splitKey[2]":3,"splitKey[3]":4,"splitKey[4]":5} 
- 脚本配置: - "multi":{ "multi":true, "key": "headers" }说明- 向导模式下配置拆多行数组列名,会自动生成脚本配置,具有相同效果。 
- value必须为List,否则会报错。 
 
- 配置示例: - ## 读端:Elasticsearch中的原始数据 [ { "_index": "lmtestjson", "_type": "_doc", "_id": "nhxmIYMBKDL4VkVLyXRN", "_score": 1.0, "_source": { "headers": [ { "remoteip": "192.0.2.1" }, { "remoteip": "192.0.2.2" } ] } }, { "_index": "lmtestjson", "_type": "_doc", "_id": "wRxsIYMBKDL4VkVLcXqf", "_score": 1.0, "_source": { "headers": [ { "remoteip": "192.0.2.3" }, { "remoteip": "192.0.2.4" } ] } } ] ##数据集成Elasticsearch reader插件配置 { "column":[ "headers[*].remoteip" ] "multi":{ "multi":true, "key": "headers" } } ##写端结果:4行 192.0.2.1 192.0.2.2 192.0.2.3 192.0.2.4
场景四:数组属性去重归并
- 背景说明:数组去重归并,将一个数组属性去重归并后写入为字符串属性,数组属性可以为子属性如name1.name2,去重采用tostring结果作为标准。 
- 配置形式:属性[]。 - column里面带有 [] 关键字就会被认为对该属性做去重归并。 
- 脚本配置: - "multi":{ "multi":true }说明- 向导模式暂不支持配置。 
- 配置示例: - ## 读端:Elasticsearch中的原始数据 "hits": [ { "_index": "mutiltest_1", "_type": "_doc", "_id": "4nbUOoMB4GR_1Dmryj8O", "_score": 1.0, "_source": { "feature1": [ "value1", "value1", "value2", "value2", "value3" ] } } ] ##数据集成Elasticsearch reader插件配置 "parameter": { "column":[ "feature1[]" ], "multi":{ "multi":true } } ##写端结果:1行1列数据 "value1,value2,value3"
场景五:多属性合一同步
- 背景说明:多属性选择处理,返回第一个有值的属性;如果都不存在时将写入null。 
- 配置形式:属性1|属性2|... - column里面带有 "|"关键字就会对该项做多属性选择。 
- 脚本配置: - "multi":{ "multi":true }说明- 向导模式暂不支持该配置。 
- 配置示例: - ##读端:Elasticsearch中的原始数据 "hits": [ { "_index": "mutiltest_1", "_type": "_doc", "_id": "v3ShOoMB4GR_1DmrZN22", "_score": 1.0, "_source": { "feature1": "feature1", "feature2": [ 1, 2, 3 ], "feature3": { "child": "feature3" } } }] ##数据集成Elasticsearch reade插件配置 "parameter": { "column":[ "feature1|feature2|feature3" ], "multi":{ "multi":true } } ##写端结果:1行1列数据 "feature1"
场景六:多属性选择同步
- 背景说明:多属性选择处理,返回第一个有值的属性,若都不存在时写入null。 
- 配置形式:属性1|属性2|... - column里面带有 "|"关键字就会对该项做多属性选择 
- 脚本配置: - "multi":{ "multi":true }说明- 向导模式暂不支持该配置。 
- 配置示例: - ##读端:Elasticsearch中的原始数据 "hits": [ { "_index": "mutiltest_1", "_type": "_doc", "_id": "v3ShOoMB4GR_1DmrZN22", "_score": 1.0, "_source": { "feature1": "feature1", "feature2": [ 1, 2, 3 ], "feature3": { "child": "feature3" } } }] ##数据集成Elasticsearch reader插件配置 "parameter": { "column":[ "feature1,feature2,feature3" ], "multi":{ "multi":true } } ##写端结果:1行1列数据 "feature1,[1,2,3],{"child":"feature3"}"
相关文档
数据集成支持其他更多数据源接入,更多信息,请参见支持的数据源及同步方案。