本文为您介绍ClickHouse Reader支持的数据类型、字段映射和数据源等参数及配置示例。

说明 仅支持阿里云ClickHouse。

ClickHouse Reader插件实现了从ClickHouse读取数据。在底层实现上,ClickHouse Reader通过JDBC连接远程ClickHouse数据库,并执行相应的SQL语句,从ClickHouse库中读取数据。

使用限制

  • ClickHouse Reader仅支持使用独享数据集成资源组概述,不支持使用公共资源组自定义资源组概述
  • ClickHouse Reader使用JDBC连接ClickHouse,且仅支持使用JDBC Statement读取数据。
  • ClickHouse Reader支持筛选部分列、列换序等功能,您可以自行填写列。
  • 您需要确认驱动和您的ClickHouse服务之间的兼容能力,数据库驱动使用如下版本。
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.2.4.ali2-SNAPSHOT</version>
    </dependency>

背景信息

ClickHouse Reader面向ETL开发工程师,通过ClickHouse Reader从ClickHouse读取数据。在底层实现上,ClickHouse Reader通过JDBC连接远程ClickHouse数据库,ClickHouse Reader会根据您的配置生成相应的ClickHouse SQL查询语句,匹配数据集成框架各Writer的写入协议,并利用各引擎暴露的写入接口写入其他引擎。

参数说明

参数 描述 是否必选 默认值
datasource 数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须要与添加的数据源名称保持一致。
table 所选取的需要同步的表。使用JSON数据进行描述。
说明 table必须包含在connection配置单元中。
fetchSize 该配置项定义了插件和数据库服务器端每次批量数据获取条数,该值决定了数据同步系统和服务器端的网络交互次数,能够提升数据抽取性能。
说明 fetchSize值过大会造成数据同步进程OOM,需要根据ClickHouse负载情况递增。
1,024
column 需要读取的ClickHouse数据,字段之间用英文逗号分隔。例如"column": ["id", "name", "age"]
说明 column配置项必须指定,不能为空。
jdbcUrl 到源端数据库的JDBC连接信息,jdbcUrl包含在connection配置单元中。
  • 在一个数据库上只能配置一个值。
  • jdbcUrl的格式和ClickHouse官方一致,并可以连接附加参数信息。例如:jdbc:clickhouse://localhost:3306/test?user=root&password=&useUnicode=true&characterEncoding=gbk &autoReconnect=true&failOverReadOnly=false
username 数据源的用户名。
password 数据源指定用户名的密码。
splitPk ClickHouse进行数据抽取时,如果指定splitPk,表示您希望使用splitPk代表的字段进行数据分片,数据同步因此会启动并发任务进行数据同步,提高数据同步的效能。
说明 当配置了splitPk时,fetchSize参数为必填项。
where 筛选条件,在实际业务场景中,往往会选择当天的数据进行同步,将where条件指定为gmt_create>$bizdate

where条件可以有效地进行业务增量同步。如果不填写where语句,包括不提供where的key或value,数据同步均视作同步全量数据。

向导开发介绍

  1. 选择数据源。
    配置同步任务的数据来源数据去向选择数据源
    参数 描述
    数据源 即上述参数说明中的datasource,通常填写您配置的数据源名称。
    即上述参数说明中的table
    数据过滤 即上述参数说明中的where。您将要同步数据的筛选条件,暂时不支持limit关键字过滤。SQL语法与选择的数据源一致。
    切分键 即上述参数说明中的splitPk。您可以将源数据表中某一列作为切分键,建议使用主键或有索引的列作为切分键,仅支持类型为整型的字段。
    读取数据时,根据配置的字段进行数据分片,实现并发读取,可以提升数据同步效率。
    说明 切分键与数据同步中的选择来源有关,配置数据来源时才显示切分键配置项。
  2. 字段映射,即上述参数说明中的column
    左侧的源头表字段和右侧的目标表字段为一一对应关系。单击添加一行可以增加单个字段。鼠标放至需要删除的字段上,即可单击删除图标进行删除 。字段映射
    参数 描述
    同名映射 单击同名映射,可以根据名称建立相应的映射关系,请注意匹配数据类型。
    同行映射 单击同行映射,可以在同行建立相应的映射关系,请注意匹配数据类型。
    取消映射 单击取消映射,可以取消建立的映射关系。
    自动排版 可以根据相应的规律自动排版。
    手动编辑源表字段 请手动编辑字段,一行表示一个字段,首尾空行会被采用,其它空行会被忽略。
    添加一行 添加一行的功能如下所示:
    • 可以输入常量,输入的值需要使用英文单引号。例如,'abc''123'等。
    • 可以配合调度参数使用。例如,${bizdate}等。
    • 可以输入关系数据库支持的函数。例如,now()count(1)等。
    • 如果您输入的值无法解析,则类型显示为未识别
  3. 通道控制。通道配置
    参数 描述
    任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。
    同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。
    错误记录数 错误记录数,表示脏数据的最大容忍条数。
    分布式处理能力

    数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组概述新增和使用独享数据集成资源组

脚本开发介绍

脚本配置示例如下,详情请参见上述参数说明。
说明 实际执行时,请删除下述代码中的注释。
{
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "clickhouse", //插件名。
            "parameter": {
                "fetchSize":1024,//该配置项定义了插件和数据库服务器端每次批量数据获取条数。
                "datasource": "example",
                "column": [   //列名。
                    "id",
                    "name"
                ],
                "where": "",    //过滤条件。
                "splitPk": "",  //切分键。
                "table": ""    //表名。
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "clickhouse",
            "parameter": {
                "postSql": [
                    "update @table set db_modify_time = now() where db_id = 1"
                ],
                "datasource": "example",    //数据源。
                "batchByteSize": "67108864",
                "column": [
                    "id",
                    "name"
                ],
                "writeMode": "insert",
                "encoding": "UTF-8",
                "batchSize": 1024,
                "table": "ClickHouse_table",
                "preSql": [
                    "delete from @table where db_id = -1"
                ]
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "executeMode": null,
        "errorLimit": {
            "record": "0"  //同步过程中的错误记录限流数。
        },
        "speed": {
         "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1 //作业并发数。
            "mbps":"12",//限流
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}