本文为您介绍Elasticsearch Writer支持的数据类型、写入方式、字段映射和数据源等参数及配置示例。
支持的Elasticsearch版本
支持的字段类型
类型 | 离线读(Elasticsearch Reader) | 离线写(Elasticsearch Writer) | 实时写 |
---|---|---|---|
binary | 支持 | 支持 | 支持 |
boolean | 支持 | 支持 | 支持 |
keyword | 支持 | 支持 | 支持 |
constant_keyword | 不支持 | 不支持 | 不支持 |
wildcard | 不支持 | 不支持 | 不支持 |
long | 支持 | 支持 | 支持 |
integer | 支持 | 支持 | 支持 |
short | 支持 | 支持 | 支持 |
byte | 支持 | 支持 | 支持 |
double | 支持 | 支持 | 支持 |
float | 支持 | 支持 | 支持 |
half_float | 不支持 | 不支持 | 不支持 |
scaled_float | 不支持 | 不支持 | 不支持 |
unsigned_long | 不支持 | 不支持 | 不支持 |
date | 支持 | 支持 | 支持 |
date_nanos | 不支持 | 不支持 | 不支持 |
alias | 不支持 | 不支持 | 不支持 |
object | 支持 | 支持 | 支持 |
flattened | 不支持 | 不支持 | 不支持 |
nested | 支持 | 支持 | 支持 |
join | 不支持 | 不支持 | 不支持 |
integer_range | 支持 | 支持 | 支持 |
float_range | 支持 | 支持 | 支持 |
long_range | 支持 | 支持 | 支持 |
double_range | 支持 | 支持 | 支持 |
date_range | 支持 | 支持 | 支持 |
ip_range | 不支持 | 支持 | 支持 |
ip | 支持 | 支持 | 支持 |
version | 支持 | 支持 | 支持 |
murmur3 | 不支持 | 不支持 | 不支持 |
aggregate_metric_double | 不支持 | 不支持 | 不支持 |
histogram | 不支持 | 不支持 | 不支持 |
text | 支持 | 支持 | 支持 |
annotated-text | 不支持 | 不支持 | 不支持 |
completion | 支持 | 不支持 | 不支持 |
search_as_you_type | 不支持 | 不支持 | 不支持 |
token_count | 支持 | 不支持 | 不支持 |
dense_vector | 不支持 | 不支持 | 不支持 |
rank_feature | 不支持 | 不支持 | 不支持 |
rank_features | 不支持 | 不支持 | 不支持 |
geo_point | 支持 | 支持 | 支持 |
geo_shape | 支持 | 支持 | 支持 |
point | 不支持 | 不支持 | 不支持 |
shape | 不支持 | 不支持 | 不支持 |
percolator | 不支持 | 不支持 | 不支持 |
string | 支持 | 支持 | 支持 |
背景信息
Elasticsearch在公共资源组上支持Elasticsearch5.x版本,在独享数据集成资源组上支持Elasticsearch5.x、6.x和7.x版本。独享数据集成资源组的详情请参见新增和使用独享数据集成资源组。
Relational DB(实例)-> Databases(数据库)-> Tables(表)-> Rows(一行数据)-> Columns(一行数据的一列)
Elasticsearch -> Index -> Types -> Documents -> Fields
Elasticsearch中可以有多个索引或数据库,每个索引可以包括多个类型或表,每个类型可以包括多个文档或行,每个文档可以包括多个字段或列。Elasticsearch Writer插件使用Elasticsearch的Rest API接口,批量把从Reader读入的数据写入Elasticsearch中。
参数说明
参数 | 描述 | 是否必选 | 默认值 |
---|---|---|---|
datasource | 选择需要同步的Elasticsearch数据源,若还未在DataWorks创建该数据源,请先创建,详情请参见配置Elasticsearch数据源。 | 是 | 无 |
index | Elasticsearch中的index名。 | 是 | 无 |
indexType | Elasticsearch中index的type名。 | 否 | Elasticsearch |
cleanup | 定义当前任务在索引index已存在的情况是否要删除数据。
| 否 | false |
batchSize | 定义同步任务一次性插入ElasticSearch的Document条数。 | 否 | 1,000 |
trySize | 定义往ElasticSearch写入数据失败后的重试次数。 | 否 | 30 |
timeout | 客户端超时时间。 | 否 | 600,000 |
discovery | 任务是否启动节点发现功能。
| 否 | false |
compression | HTTP请求,开启压缩。 | 否 | true |
multiThread | HTTP请求,是否有多线程。 | 否 | true |
ignoreWriteError | 忽略写入错误,不重试,继续写入。 | 否 | false |
ignoreParseError | 忽略解析数据格式错误,继续写入。 | 否 | true |
alias | Elasticsearch的别名类似于数据库的视图机制,为索引my_index创建一个别名my_index_alias,对my_index_alias的操作与my_index的操作一致。 配置alias表示在数据导入完成后,为指定的索引创建别名。 | 否 | 无 |
aliasMode | 数据导入完成后增加别名的模式,包括append(增加模式)和exclusive(只留这一个):
后续会转换别名为实际的索引名称,别名可以用来进行索引迁移和多个索引的查询统一,并可以用来实现视图的功能。 | 否 | append |
settings | 创建index时的settings,与Elasticsearch官方一致。 | 否 | 无 |
column | column用来配置文档的多个字段Filed信息,具体每个字段项可以配置name(名称)、type(类型)等基础配置,以及Analyzer、Format和Array等扩展配置。 Elasticsearch所支持的字段类型如下所示。
列类型的说明如下:
如果需要在column中配置除了type以外的属性值,您可以使用other_params参数,该参数配置在column中,在update mappings时,用于描述column中除了type以外的Elasticsearch属性信息。
如果您希望源端数据写入为Elasticsearch时按照数组类型写入,您可按照json格式或指定分隔符的方式来解析源端数据。配置详情请参见附录:ElasticSearch写入的格式期望是数组类型。 | 是 | 无 |
dynamic | 定义当在文档中发现未存在的字段时,同步任务是否通过Elasticsearch动态映射机制为字段添加映射。
Elasticsearch 7.x版本的默认type为_doc。使用Elasticsearch的自动mappings时,请配置_doc和esVersion为7。 您需要转换为脚本模式,添加一个版本参数: | 否 | false |
actionType | 表示Elasticsearch在数据写出时的action类型,目前数据集成支持index和update两种actionType,默认值为index:
| 否 | index |
primaryKeyInfo | 定义当前写入ElasticSearch的主键取值方式。
| 是 | specific |
esPartitionColumn | 定义写入ElasticSearch时是否开启分区,用于修改ElasticSearch中的routing的参数。
| 否 | false |
enableWriteNull | 该参数用于是否支持将来源端的空值字段同步至Elasticsearch。取值如下:
| 否 | false |
脚本开发介绍
通过脚本模式开发的详情请参见通过脚本模式配置离线同步任务。
{
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1, //作业并发数。
"mbps":"12"//限流
}
},
"steps": [
{
"category": "reader",
"name": "Reader",
"parameter": {
},
"stepType": "stream"
},
{
"category": "writer",
"name": "Writer",
"parameter": {
"datasource": "xxx",
"index": "test-1",
"type": "default",
"cleanup": true,
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"discovery": false,
"primaryKeyInfo": {
"type": "pk",
"fieldDelimiter": ",",
"column": []
},
"batchSize": 1000,
"dynamic": false,
"esPartitionColumn": [
{
"name": "col1",
"comment": "xx",
"type": "STRING"
}
],
"column": [
{
"name": "pk",
"type": "id"
},
{
"name": "col_ip",
"type": "ip"
},
{
"name": "col_array",
"type": "long",
"array": true,
},
{
"name": "col_double",
"type": "double"
},
{
"name": "col_long",
"type": "long"
},
{
"name": "col_integer",
"type": "integer"
{
"name": "col_keyword",
"type": "keyword"
},
{
"name": "col_text",
"type": "text",
"analyzer": "ik_max_word",
"other_params":
{
"doc_values": false
},
},
{
"name": "col_geo_point",
"type": "geo_point"
},
{
"name": "col_date",
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
},
{
"name": "col_nested1",
"type": "nested"
},
{
"name": "col_nested2",
"type": "nested"
},
{
"name": "col_object1",
"type": "object"
},
{
"name": "col_object2",
"type": "object"
},
{
"name": "col_integer_array",
"type": "integer",
"array": true
},
{
"name": "col_geo_shape",
"type": "geo_shape",
"tree": "quadtree",
"precision": "10m"
}
]
},
"stepType": "elasticsearch"
}
],
"type": "job",
"version": "2.0"
}
向导模式开发介绍
打开新建的数据同步节点,即可进行同步任务的配置,通用配置详情请参见通过向导模式配置离线同步任务,本文为您介绍向导模式配置数据同步至Elasticsearch。
- 选择数据源。配置同步任务的数据来源和数据去向。
参数 描述 数据源 即上述参数说明中的datasource。 索引 即上述参数说明中的index。 是否删除原索引 即上述参数说明中的cleanup。 写入类型 即上述参数说明中的ActionType,支持两种写入类型:插入(index)、更新(update)。 使用ElasticSearch的自动mappings 即上述参数说明中的dynamic。 主键取值方式 即上述参数说明中的primaryKeyInfo。 批量插入条数 即上述参数说明中的batchSize。 开启分区 即上述参数说明中的esPartitionColumn。 启用节点发现 即上述参数说明中的discovery。 Settings 即上述参数说明中的settings。 - 字段映射,即上述参数说明中的column。左侧的源头表字段和右侧的目标表字段为一一对应的关系。
- 通道控制
参数 描述 任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。 同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。 错误记录数 错误记录数,表示脏数据的最大容忍条数。 分布式处理能力 数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组和新增和使用独享数据集成资源组。
附录:ElasticSearch写入的格式期望是数组类型
支持以下两种方式将源端数据按照数组类型写入ElasticSearch。
- 按json格式解析源端数据
例如:源端数据为
"[1,2,3,4,5]"
,配置json_array=true对其进行解析,同步将以数组格式写入ElasticSearch。"parameter" : { { "name":"docs_1", "type":"keyword", "json_array":true } }
- 按分隔符解析源端数据
例如:源端数据为
"1,2,3,4,5"
, 配置分隔符splitter=","对其进行解析,同步将以数组格式写入ElasticSearch。说明 一个任务仅支持配置一种分隔符,splitter全局唯一,不支持多array字段配置为不同的分隔符。例如源端字段列col1="1,2,3,4,5" , col2="6-7-8-9-10", splitter无法针对每列单独配置使用。"parameter" : { "column": [ { "name": "docs_2", "array": true, "type": "long" } ], "splitter":","//注意:splitter配置与column配置同级。 }