全部产品
云市场

流数据同步DataConnector

更新时间:2019-03-06 10:06:14

流式数据同步DataConnector

DataHub DataConnector是把DataHub服务中的流式数据同步到其他云产品中的功能,目前支持将Topic中的数据实时/准实时同步到MaxCompute(ODPS)、OSS、ElasticSearch、RDS Mysql、ADS、TableStore中。用户只需要向DataHub中写入一次数据,并在DataHub服务中配置好同步功能,便可以在各个云产品中使用这份数据。

注意事项:

  1. 目前所有DataConnector均仅支持同一Region的云服务之间同步数据,不支持同步数据到跨Region的服务。
  2. DataConnector所配置的目标服务Endpoint需要填写相应的内网域名(经典网络),不支持使用公网域名同步。
  3. 数据同步目前仅支持at least once语义,在网络服务异常等小概率场景下可能会导致目的端的数据产生重复,需要做去重处理。

DataConnector支持系统

目标系统 时效性 是否支持vpc 描述
MaxCompute(ODPS) 准实时,通常情况5分钟延迟 No 同步Topic中流式数据到离线MaxCompute表,字段类型名称需一一对应,且DataHub中必须包含一列(或多列)MaxCompute表中分区列对应的字段
OSS 实时 No 同步数据到对象存储OSS指定Bucket的文件中,将以csv格式保存
ElasticSearch 实时 Yes 同步数据到ElasticSearch指定Index中,Shard之间数据同步不保证时序,所以需将同样ID的数据写入相同的Shard中
Mysql 实时 Yes 同步数据到指定的Rds Mysql表中
ADS 实时 No 同步数据到指定的ADS表中
TableStore 实时 No 同步数据到指定的TableStore表中

同步数据到MaxCompute

如何创建

创建Connector主要需要如下前置条件:

  1. 准备对应的MaxCompute表,该表字段类型、名称、顺序必须与DataHub Topic字段完全一致,如果三个条件中的任意一个不满足,则归档Connector无法创建。字段类型对应表见后表。

  2. 访问MaxCompute账号的设置,该账号必须具备该MaxCompute的Project的CreateInstance权限和归档MaxCompute表的Desc、Alter、Update权限,建议使用一个特殊最小权限的账号(如何配置访问MaxCompute账号权限?)。建议使用RAM用户账号(如何创建RAM用户账号?)。

  3. DataHub Topic的Owner/Creator账号, 才有相应的权限操作Connector,包括创建,删除等。

  4. 只支持将TUPLE类型的DataHub Topic同步到MaxCompute表中

操作流程: Project列表->Project查看->Topic查看->点击归档MaxCompute->填写配置,点击创建

  1. 进入Topic的详情页面:

    create_connector

  2. 选择同步MaxCompute并填写相关配置:

    create_odps_connector_detail

  3. 在DataConnector页面查看数据归档状态:

    odps_connector_status

配置说明

名称 是否必须 描述
MaxCompute Project yes MaxComputeProject名称
MaxCompute Table yes MaxCompute表名称
AccessId yes 访问MaxCompute的阿里云账号AccessId
AccessKey yes 访问MaxCompute的阿里云账号AccessKey
分区选项 yes SYSTEM_TIME、EVENT_TIME、USER_DEFINE三种模式,SystemTime模式会使用写入时间转化为字符串进行分区,EventTime模式会根据topic中固定的event_time字段时间进行分区(需要在创建Topic时增加一个TIMESTAMP类型名称为event_time的字段,并且写入数据时向这个字段写入其微秒时间),UserDefine模式将会直接使用用户自定义的分区字段字符串分区
分区范围 yes 划分分区的时间间隔,在SYSTEM_TIME、EVENT_TIME两种模式下生效,最少为15分钟
分区格式定义 yes 目前仅支持固定格式,未来将会开放为自定义格式,目前格式下,若为15分钟的分区范围,则会产生ds=20170704,hh=01,mm=15这样的分区

注意

  • 支持MaxCompute分区表

    1. 分区选项选择SYSTEM_TIME模式,表结构对应如下:
    2. MaxCompute表: table_test(f1 string, f2 string, f3 double) partitioned by (ds string, hh string, mm string)
    3. 对应Topic应为如下的Schema:
    4. Topic: topic_test(f1 string, f2 string, f3 double)
    5. 数据同步时根据写入时间确定ds/hh/mm的值写入MaxCompute
    1. 分区选项选择EVENT_TIME模式,表结构对应如下:
    2. MaxCompute表: table_test(f1 string, f2 string, f3 double) partitioned by (ds string, hh string,mm string)
    3. 对应Topic应为如下的Schema:
    4. Topic: topic_test(f1 string, f2 string, f3 double, event_time timestamp)
    5. 数据同步时根据event_time字段时间戳确定ds/hh/mm的值写入MaxCompute
    1. 分区选项选择USER_DEFINE模式,表结构对应如下:
    2. MaxCompute表: table_test(f1 string, f2 string, f3 double) partitioned by (ds string, pt string)
    3. 对应Topic应为如下的Schema:
    4. Topic: topic_test(f1 string, f2 string, f3 double, ds string, pt string)
    5. 数据同步时根据dspt的数据值,写入对应分区
  • USER_DEFINE模式下MaxCompute分区字段内容必须非空UTF8字符串,否则归档任务将无法正常运行或数据无效被丢弃。

  • USER_DEFINE模式下MaxCompute的分区字段的值必须符合MaxCompute对分区字段值的范围要求,否则归档任务将无法正常运行。

  • USER_DEFINE模式下请尽量避免产生大量分区值,如随机分区或者较细粒度的多级分区。短时间穿插写入大量不同分区数据的情况下,Connector可能会因为创建分区受限而出现异常。属于同一个分区的数据尽可能连续写入,会对性能有较大提升。

  • 分区选项为EventTime模式时,MaxCompute表中是不包含event_time字段的,DataHub表中需要有event_time字段。同时,若DataHub表中的event_time字段的数据值是null时,会降级使用数据写入DataHub的SystemTime写入对应分区。event_time中的值需要写入单位是微秒的时间戳,如果发现所有的数据都同步到MaxCompute的同一个分区,请检查event_time中的时间戳单位是否正确。

  • 数据归档的频率为每个Shard每5分钟或者Shard中新写入的数据量达到64MB,DataConnector服务会批量进行一次数据归档进入MaxCompute表的操作。所以数据写入DataHub Topic后至多5分钟后在MaxCompute可以被查询到。

  • 如无特殊需求与场景要求,请尽可能选用SYSTEM_TIME模式进行同步,稳定性与性能会有较大提升。

DataHub与MaxCompute新增字段

只支持添加列,不支持修改和删除,操作的顺序为:

  1. 修改MaxCompute,MaxCompute中给表添加列可以参考MaxCompute相关文档。
  2. 修改Datahub,JAVA SDK的用法如下:
    1. Field field = new Field("YOUR_FIELD_NAME", FieldType.STRING);
    2. datahubClient.appendField("projectName", "topicName", field);
  3. 修改Connector,JAVA SDK的用法如下:
    1. String fieldName = "YOUR_COLUMN_NAME";
    2. datahubClient.appendConnectorField("projectName", "topicName", ConnectorType.SINK_ODPS, fieldName);
  4. 重启connector

DataHub与MaxCompute字段类型对应表

MaxCompute表中的类型 DataHub Topic中的类型
STRING STRING
DOUBLE DOUBLE
BIGINT BIGINT
DATETIME TIMESTAMP
BOOLEAN BOOLEAN
DECIMAL DECIMAL
MAP 不支持
ARRAY 不支持

同步数据到ElasticSearch集群

如何创建

  • 创建ElasticSearch DataConnector主要需要如下前置条件:

    1. ElasticSearch服务域名。
    2. 访问ElasticSearch账号信息,目前仅支持SearchGuard方式鉴权。
    3. DataHub Topic的Owner/Creator账号, 才有相应的权限操作DataConnector,包括创建,删除等。
  • 操作流程: Project列表->Project查看->Topic查看->点击创建DataConnector->选择ElasticSearch填写配置,点击创建

    1. 进入Topic的详情页面:

      create_connector

    2. 选择同步ElasticSearch并填写相关配置:

      create_es_connector_detail

    3. 在DataConnector页面查看数据归档状态:

      es_connector_status

配置说明

名称 是否必须 描述
ElasticSearch Endpoint yes ElasticSearch服务地址,需要填写端口,例如xxxxx.aliyuncs.com:9200
ElasticSearch Index yes ElasticSearch Index名称,会将Topic数据同步到该Index中
UserName yes 访问ElasticSearch账号
Password yes 访问ElasticSearch密码
BatchSize yes 每次batch写入ES的大小,通常建议64KB-128KB
Type属性列 yes ElasticSearch Doc的Type,指定一列或多列组合为Type,根据每条记录的Type值决定写入该Index的某个Type中(指定多列的情况类似ID属性列)
ID属性列 no ElasticSearch Doc的Id,指定DataHub中一列或多列组合成为ID,需要尽可能保证该ID不重复,相同ID的数据写入同一个Shard,否则无法保证同步后的时序。(若指定多列作为ID列,将会把每条记录这些列的值通过“|”分隔符组合在一起作为ID)
是否使用Proxy模式 yes 公网ES默认请选用true
VPC ID yes VPC模式下必选,填写ES所在VPC的ID
ES Instance ID yes ES的实例ID,需要在ES控制台获取,并且在末尾加上“-worker”关键字

注意事项

  • 数据时序性:DataHub中每一个Shard是独立的通道服务,若相同ID和Type的数据先后写入不同2个Shard中,由于同步数据的任务是后台异步执行,因此只能保证同一个Shard中的数据同步的时序性,Shard之间数据无法保证,所以强烈建议同样ID+Type的数据写入同一个Shard以保证写入ElasticSearch的时序性。

  • ID与Type属性列详细介绍:

  • ElasticSearch中数据标识的两个关键字分别是Type与Id,每条数据必属于唯一一个Type,也携带一个唯一ID,相同Type+ID的数据重复写入将会Overwrite并且更新Version。
  • DataHub同步ElasticSearch时必须指定某一列或多列(多列将会以“|”作为分隔符拼接字符串)作为Type及ID属性,最佳实现情况下ID属性列数据或组合字符串需要尽可能唯一或不指定,不指定的情况下ES将会生成随机ID在索引速度优化上会有较大提升,Type属性列需要尽可能能够均衡切分数据。例如如下场景:
    1. DataHub Schema : f1 string, f2 string, f3 string, f4 string
    2. Type 属性列选择: f1
    3. ID 属性列选择: f2,f3
    4. 数据1 ["test1","test2","test3","test4"] 写入ES Type:"test1", ID:"test2|test3"
    5. 数据2 ["test5","test6","test7","test8"] 写入ES Type:"test5", ID:"test6|test7"
    1. DataHub Schema : f1 string, f2 string, f3 string, f4 string
    2. Type 属性列选择: f1
    3. ID 属性列选择:
    4. 数据1 ["test1","test2","test3","test4"] 写入ES Type:"test1", ID:"随机生成"
    5. 数据2 ["test5","test6","test7","test8"] 写入ES Type:"test5", ID:"随机生成"
  • 数据同步的时效性正常情况为秒级,即数据写入DataHub中后若干秒内将会同步到ElasticSearch,网络异常的情况可能造成重复数据Update,但是保证数据At least once语义。
  • ElasticSearch中指定的Index建议提前创建,若未提前创建同步数据时将会使用默认配置自动创建(仅5个ES Shard),建议DataHub中的Shard数与ElasticSearch Index的Shard数量保持一致或一个量级,否则鉴于ElasticSearch的写入性能可能造成数据延迟积压。

同步OSS

如何创建

  • 创建OSS DataConnector主要需要如下前置条件:

    1. OSS服务域名以及创建Bucket。

    2. 使用主账号登录并在下列授权链接中,授权服务角色AliyunDataHubDefaultRole,使得DataHub可以访问您的OSS资源。授权链接

    3. 登录DataHub控制台创建OSS DataConnector。

  • 操作流程: Project列表->Project查看->Topic查看->点击创建DataConnector->选择OSS填写配置,点击创建

    1. 进入Topic的详情页面:

      create_connector

    2. 选择同步OSS并填写相关配置:

      create_oss_connector_detail

    3. 在DataConnector页面查看数据归档状态:

      oss_connector_status

配置说明

名称 是否必须 描述
OSS Endpoint yes OSS服务地址,请使用经典网络域名,内部网络仅支持http,暂不支持https
OSS Bucket yes OSS Bucket名称数据将要同步的目的bucket
目录前缀 yes OSS目录前缀(prefix),数据将同步到bucket的该目录下
时间切分间隔 yes 单位分钟,同步任务根据该时间间隔按数据写到datahub的时间进行目录划分,例如该配置的值为5时,2017/02/05 12:00到2017/02/05 12:05的数据将同步到 bucket/prefix/201702051200/ 路径下

注意事项

  1. 需保证Bucket已存在,Bucket不存在或者删除均会导致同步任务停止。
  2. 数据写入DataHub后,满4MB或者时间达到1分钟,往OSS进行同步;数据写入OSS的所以最大延迟为1分钟。
  3. 在控制台创建OSS DataConnector的时间格式无法自定义,目前均默认为%Y%m%d%H%M,例如上述的 201702051200。
  4. 同步任务将在配置所定义的OSS路径下以随机文件名创建文件,每个Shard对应一个文件,当文件大小达到5GB时,将创建新文件。
  5. OSS同步任务创建的文件属于Appendable文件,无法跨Bucket拷贝。
  6. 文件格式:目前只支持DataHub Topic为TUPLE类型,OSS文件为标准CSV格式,即CSV文件中每行数据对应DataHub中一条记录。

同步数据到Mysql

如何创建

  • 创建Mysql DataConnector主要需要如下前置条件:

    1. Mysql相关信息,包括Host、Port、User、Password、Database、Table。
    2. DataHub Topic的Owner/Creator账号, 才有相应的权限操作DataConnector,包括创建,删除等。、
    3. Mysql中Database与Table必须事先创建好并且格式与DataHub一致
  • 操作流程: Project列表->Project查看->Topic查看->点击创建DataConnector->选择Mysql填写配置,点击创建

    1. 进入Topic的详情页面:

      create_connector

    2. 选择同步Mysql并填写相关配置:

      create_mysql_connector_detail

    3. 在DataConnector页面查看数据归档状态:

      mysql_connector_status

配置说明

名称 是否必须 描述
Mysql Host yes Mysql服务地址
Mysql Port yes Mysql端口
Mysql Database yes 同步Mysql的Database
Mysql Table yes 同步Mysql的Table
User yes 访问Mysql账号
Password yes 访问Mysql密码
模式 yes 选择ReplaceInto或者IgnoreInto

注意事项

  • ReplaceInto与IgnoreInto: ReplaceInto模式下,会使用replace into语句将数据插入,反之,IgnoreInto会使用insert ignore方式插入数据库(replace into将根据主键覆盖记录,ignore into将忽略冲突进行写入)
  • 数据同步的时效性正常情况为秒级,即数据写入DataHub中后若干秒内将会同步到Mysql,网络异常的情况可能造成重复数据Update,但是保证数据At least once语义。
  • Mysql的写入性能直接影响同步的性能,若RDS实例性能过低,可能导致DataHub同步较慢,数据堆积,严重情况可能因为同步延迟超过生命周期从而丢失数据。

DataHub与Mysql字段类型对应表

DataHub Topic中的类型 Mysql表中的类型
STRING VARCHAR
DOUBLE DOUBLE
BIGINT BIGINT
DECIMAL DECIMAL
TIMESTAMP TIMESTAMP / BIGINT
BOOLEAN BOOLEAN / TINYINT

同步数据到ADS

如何创建

创建ADS DataConnector主要需要如下前置条件:

  • ADS相关信息,包括Host、Port、User、Password、Database、Table。
  • DataHub Topic的Owner/Creator账号, 才有相应的权限操作DataConnector,包括创建,删除等。
  • ADS中Database与Table必须事先创建好并且格式与DataHub一致,且Table必须为RealTime类型。
  • 获取ADS内网地址,需要使用ADS内网地址创建,内网地址的获取方式请参见ADS使用手册或咨询阿里云技术人员。

操作流程: Project列表->Project查看->Topic查看->点击创建DataConnector->选择ADS填写配置,点击创建

  1. 进入Topic的详情页面:

    create_connector

  2. 选择同步ADS并填写相关配置:

    create_ads_connector_detail

  3. 在DataConnector页面查看数据归档状态:

    ads_connector_status

配置说明

名称 是否必须 描述
ADS Host yes ADS服务内网地址
ADS Port yes ADS端口
ADS Database yes 同步ADS的Database
ADS Table yes 同步ADS的Table
User yes 访问ADS账号
Password yes 访问ADS密码
模式 yes 选择ReplaceInto或者IgnoreInto

注意事项

  • ReplaceInto与IgnoreInto: ReplaceInto模式下,会使用replace into语句将数据插入,反之,IgnoreInto会使用insert方式插入数据库(replace into将根据主键覆盖记录,ignore into将忽略冲突进行写入)
  • 数据同步的时效性正常情况为秒级,即数据写入DataHub中后若干秒内将会同步到ADS,网络异常的情况可能造成重复数据Update,但是保证数据At least once语义。
  • ADS的写入性能直接影响同步的性能,若ADS实例性能过低,可能导致DataHub同步较慢,数据堆积,严重情况可能因为同步延迟超过生命周期从而丢失数据。

DataHub与ADS字段类型对应表

同步数据到TableStore

如何创建

  • 创建TableStore DataConnector主要需要如下前置条件:

    1. TableStore相关信息,包括TableStore服务的Endpoint、TableStore实例及其对应的Table。
    2. DataHub Topic的Owner/Creator账号, 才有相应的权限操作DataConnector,包括创建,删除等。
    3. TableStore表的主键列必须在DataHub Topic下有字段一一对应(定义顺序可以不一致)。
    4. 授权服务角色AliyunDataHubDefaultRole,使得DataHub可以访问您的TableStore资源。授权链接
  • 操作流程: Project列表->Project查看->Topic查看->点击创建DataConnector->选择TableStore填写配置,点击创建

    1. 进入Topic的详情页面:

      create_connector

    2. 选择同步TableStore并填写相关配置:

      create_ots_dataconnector_detail

    3. 在DataConnector页面查看数据归档状态:

      ots_connector_status.png

配置说明

名称 是否必须 描述
TableStore Endpoint yes TableStore服务内网地址
TableStore实例名 yes TableStore实例名
TableStore表名 yes 同步到TableStore的表

注意事项

  • TableStore表的主键列必须在对应的DataHub Topic下存在同名的字段,由于DataHub大小写不敏感,所以TableStore的主键名如果存在大写字母,那么在DataHub Topic有对应小写字段。
  • 同步TableStore表的DataConnector任务仅支持TUPLE类型Topic。
  • TableStore服务端自身的限制最大每次批量写入的行数为200,具体TableStore的相关限制请参见使用限制