本文为您介绍Elasticsearch Reader的工作原理、功能和参数。

使用限制

  • DataWorks平台目前仅支持配置阿里云Elasticsearch5.x、6.x、7.x版本数据源,不支持配置自建Elasticsearch数据源。
  • 不支持同步scaled_float类型的字段。

工作原理

Elasticsearch在公共资源组上支持Elasticsearch5.x版本,在独享数据集成资源组上支持Elasticsearch5.x、6.x和7.x版本。独享数据集成资源组的详情请参见新增和使用独享数据集成资源组

Elasticsearch Reader的工作原理如下:
  • 通过Elasticsearch的_searchscrollslice(即游标分片)方式实现,slice结合数据集成任务的task多线程分片机制使用。
  • 根据Elasticsearch中的Mapping配置,转换数据类型。

更多详情请参见Elasticsearch官方文档

基本配置

注意 实际运行时,请删除下述代码中的注释。
{
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    },
    "setting":{
        "errorLimit":{
            "record":"0" //错误记录数。
        },
        "jvmOption":"",
        "speed":{
            "concurrent":3,//并发数
            "throttle":true,//
                     "mbps":"12",//限流
        }
    },
    "steps":[
        {
            "category":"reader",
            "name":"Reader",
            "parameter":{
                "column":[ //读取列。
                    "id",
                    "name"
                ],
                "endpoint":"", //服务地址。
                "index":"",  //索引。
                "password":"",  //密码。
                "scroll":"",  //scroll标志。
                "search":"",  //查询query参数,与Elasticsearch的query内容相同,使用_search api,重命名为search。
                "type":"default",
                "username":""  //用户名。
            },
            "stepType":"elasticsearch"
        },
        {
            "category":"writer",
            "name":"Writer",
            "parameter":{ },
            "stepType":"stream"
        }
    ],
    "type":"job",
    "version":"2.0" //版本号。
}

高级功能

  • 支持全量拉取

    支持将Elasticsearch中一个文档的所有内容拉取为一个字段。

  • 支持提取半结构化到结构化数据
    分类 描述
    产生背景 Elasticsearch中的数据特征为字段不固定,且有中文名、数据使用深层嵌套的形式。为更好地方便下游业务对数据的计算和存储需求,特推出从半结构化到结构化的转换解决方案。
    实现原理 将Elasticsearch获取到的JSON数据,利用JSON工具的路径获取特性,将嵌套数据扁平化为一维结构的数据。然后将数据映射至结构化数据表中,拆分Elasticsearch复合结构数据至多个结构化数据表。
    解决方案
    • JSON有嵌套的情况,通过path路径来解决:
      • 属性
      • 属性.子属性
      • 属性[0].子属性
    • 附属信息有一对多的情况,需要进行拆表拆行处理,进行遍历。

      属性[*].子属性

    • 数组归并,一个字符串数组内容,归并为一个属性,并进行去重。

      属性[] 去重

    • 多属性合一,将多个属性合并为一个属性。

      属性1,属性2

    • 多属性选择处理

      属性1|属性2

参数说明

参数 描述 是否必选 默认值
datasource 数据源名称,脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。
index Elasticsearch中的index名。
type Elasticsearch中indextype名。 index名
search Elasticsearch的query参数。
pageSize 每次读取数据的条数。 100
scroll Elasticsearch的分页参数,设置游标存放时间。
  • 设置的过小时,如果获取两页数据间隔时间超出scroll,会导致游标过期,进而丢失数据。
  • 设置的过大时,如果同一时刻发起的查询过多,超出服务端max_open_scroll_context配置时,会导致数据查询报错。
sort 返回结果的排序字段。
retryCount 失败后重试的次数。 300
connTimeOut 客户端连接超时时间。 600,000
readTimeOut 客户端读取超时时间。 600,000
multiThread http请求,是否有多线程。 true

向导开发介绍

打开新建的数据同步节点,即可进行同步任务的配置,详情请参见通过向导模式配置离线同步任务

您需要在数据同步任务的编辑页面进行以下配置:
  1. 选择数据源。
    配置同步任务的数据来源数据去向ES
    参数 描述
    数据源 通常填写您配置的数据源名称。
    索引 Elasticsearch中的index名。
    检索查询条件 Elasticsearch的query参数。
    分页大小 每次读取数据的条数,默认为100。
    游标时间 分页参数,设置游标存放时间。
    高级配置 高级配置包括以下内容:
    • 排序方式:返回结果的排序字段。
    • 全文作为一列:是否将Elasticsearch的数据拉取为一个字段。

      例如,Elasticsearch Reader需要读取Elasticsearch的所有数据作为一列同步至MaxCompute,则需要设置全文作为一列。设置Elasticsearch Reader中的column为contentcontenthits[]中的一行信息_source全内容。

      说明 _id属性为Elasticsearch数据的固有属性,目前无法通过数据集成同步任务单独抽取并写入目的端。您可以将Elasticsearch Reader中全文作为一列参数配置为(即JSON脚本中full参数配置为true),将Elasticsearch中的每个数据都作为一个字段同步到目的端,然后在目的端使用get_json_object函数或其他JSON处理函数,将_id值单独取出来做后续处理。
    • 拆多行数组列名:是否将数组进行列拆多行的处理,需要辅助设置子属性。
  2. 字段映射,即上述参数说明中的column
    左侧的源头表字段和右侧的目标表字段为一一对应关系。单击添加一行可以增加单个字段,鼠标放至需要删除的字段上,即可单击删除图标进行删除 。
    说明 来源或目标端有Lindom、HBase、Tair、Elasticsearch数据源,字段无需连线,直接编辑即可保存。
    字段映射
    参数 描述
    同名映射 单击同名映射,可以根据名称建立相应的映射关系,请注意匹配数据类型。
    同行映射 单击同行映射,可以在同行建立相应的映射关系,请注意匹配数据类型。
    取消映射 单击取消映射,可以取消建立的映射关系。
    自动排版 可以根据相应的规律自动排版。
    添加一行 单击添加一行,您可以输入以下类型的字段:
    • 可以配合调度参数使用,例如${bizdate}等。
    • 可以输入关系数据库支持的函数,例如now()count(1)等。
    • 如果您输入的值无法解析,则类型显示为未识别。
  3. 通道控制。通道配置
    参数 描述
    任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。
    同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。
    错误记录数 错误记录数,表示脏数据的最大容忍条数。
    分布式处理能力

    数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组概述新增和使用独享数据集成资源组

脚本开发介绍

配置一个从Elasticsearch读取数据的JSON示例,使用脚本开发的详情请参见通过脚本模式配置任务
注意 实际运行时,请删除下述代码中的注释。
{
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    },
    "setting":{
        "errorLimit":{
            "record":"0" //错误记录数。
        },
        "jvmOption":"",
        "speed":{
            "concurrent":3,
            "throttle":false
        }
    },
    "steps":[
        {
            "category":"reader",
            "name":"Reader",
            "parameter":{
                "column":[ //读取列。
                    "id",
                    "name"
                ],
                "endpoint":"http://es-cn-xxx.elasticsearch.aliyuncs.com:9200", //服务地址。
                "index":"aliyun_es_xx",  //索引。
                "password":"*******",  //密码。
                "multiThread":true,
                "scroll":"5m",  //scroll标志。
                "pageSize":5000,
                "connTimeOut":600000,
                "readTimeOut":600000,
                "retryCount":30,
                "retrySleepTime":"10000",
                "search":{
                            "range":{
                                "gmt_modified":{
                                    "gte":0
                                }
                            }
                        },  //查询query参数,与Elasticsearch的query内容相同,使用_search api,重命名为search。
                "type":"doc",
                "username":"aliyun_di"  //用户名。
            },
            "stepType":"elasticsearch"
        },
        {
            "category":"writer",
            "name":"Writer",
            "parameter":{ },
            "stepType":"stream"
        }
    ],
    "type":"job",
    "version":"2.0" //版本号。
}

配置数据集成资源组

  1. 单击数据同步任务编辑页面右侧的数据集成资源组配置
  2. 根据提示选择对应的独享数据集成资源组
    独享数据集成资源组-zh
    说明
    • (推荐)数据集成资源组配置页面默认支持选择独享数据集成资源组,为确保数据同步的稳定性和性能要求,推荐使用独享数据集成资源组。
    • 如果您需要选择公共资源组,请在页面右下方单击更多选项,在弹出的警告对话框单击确认,在数据集成资源组配置子页面进行选择。关于自定义数据集成资源组和公共资源组,详情请参见公共资源组