本文为您介绍Elasticsearch Reader的工作原理、功能和参数。

工作原理

Elasticsearch Reader仅支持Elasticsearch5.x版本,其工作原理如下:
  • 通过Elasticsearch的_search+scroll+slice(即游标+分片)方式实现,slice结合数据集成任务的task多线程分片机制使用。
  • 根据Elasticsearch中的Mapping配置,转换数据类型。

更多详情请参见Elasticsearch官方文档

基本配置

{
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    },
    "setting":{
        "errorLimit":{
            "record":"0" //错误记录数。
        },
        "jvmOption":"",
        "speed":{
            "concurrent":3,
            "throttle":false
        }
    },
    "steps":[
        {
            "category":"reader",
            "name":"Reader",
            "parameter":{
                "column":[ //读取列。
                    "id",
                    "name"
                ],
                "endpoint":"", //服务地址。
                "index":"",  //索引。
                "password":"",  //密码。
                "scroll":"",  //scroll标志。
                "search":"",  //查询query参数,与Elasticsearch的query内容相同,使用_search api,重命名为search。
                "type":"default",
                "username":""  //用户名。
            },
            "stepType":"elasticsearch"
        },
        {
            "category":"writer",
            "name":"Writer",
            "parameter":{ },
            "stepType":"stream"
        }
    ],
    "type":"job",
    "version":"2.0" //版本号。
}

高级功能

  • 支持全量拉取

    支持将Elasticsearch中一个文档的所有内容拉取为一个字段。

  • 支持提取半结构化到结构化数据
    分类 描述
    产生背景 Elasticsearch中的数据特征为字段不固定,且有中文名、数据使用深层嵌套的形式。为更好地方便下游业务对数据的计算和存储需求,特推出从半结构化到结构化的转换解决方案。
    实现原理 将Elasticsearch获取到的JSON数据,利用JSON工具的路径获取特性,将嵌套数据扁平化为一维结构的数据。然后将数据映射至结构化数据表中,拆分Elasticsearch复合结构数据至多个结构化数据表。
    解决方案
    • JSON有嵌套的情况,通过path路径来解决:
      • 属性
      • 属性.子属性
      • 属性[0].子属性
    • 附属信息有一对多的情况,需要进行拆表拆行处理,进行遍历。

      属性[*].子属性

    • 数组归并,一个字符串数组内容,归并为一个属性,并进行去重。

      属性[] 去重

    • 多属性合一,将多个属性合并为一个属性。

      属性1,属性2

    • 多属性选择处理

      属性1|属性2

参数说明

参数 描述 是否必选 默认值
endpoint Elasticsearch的连接地址。
username http auth中的username
password http auth中的password
index Elasticsearch中的index名。
type Elasticsearch中indextype名。 index名
pageSize 每次读取数据的条数。 100
search Elasticsearch的query参数。
scroll Elasticsearch的分页参数,设置游标存放时间。
sort 返回结果的排序字段。
retryCount 失败后重试的次数。 300
connTimeOut 客户端连接超时时间。 600,000
readTimeOut 客户端读取超时时间。 600,000
multiThread http请求,是否有多线程。 true
column Elasticsearch所支持的字段类型。

脚本开发介绍

配置一个从Elasticsearch读取数据的JSON示例,使用脚本开发的详情请参见通过脚本模式配置任务
{
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    },
    "setting":{
        "errorLimit":{
            "record":"0" //错误记录数。
        },
        "jvmOption":"",
        "speed":{
            "concurrent":3,
            "throttle":false
        }
    },
    "steps":[
        {
            "category":"reader",
            "name":"Reader",
            "parameter":{
                "column":[ //读取列。
                    "id",
                    "name"
                ],
                "endpoint":"http://es-cn-xxx.elasticsearch.aliyuncs.com:9200", //服务地址。
                "index":"aliyun_es_xx",  //索引。
                "password":"*******",  //密码。
                "multiThread":true,
                "scroll":"5m",  //scroll标志。
                "pageSize":5000,
                "connTimeOut":600000,
                "readTimeOut":600000,
                "retryCount":30,
                "retrySleepTime":"10000",
                "search":{
                            "range":{
                                "gmt_modified":{
                                    "gte":0
                                }
                            }
                        },  //查询query参数,与Elasticsearch的query内容相同,使用_search api,重命名为search。
                "type":"doc",
                "username":"aliyun_di"  //用户名。
            },
            "stepType":"elasticsearch"
        },
        {
            "category":"writer",
            "name":"Writer",
            "parameter":{ },
            "stepType":"stream"
        }
    ],
    "type":"job",
    "version":"2.0" //版本号。
}