DataX Writer插件实现了写入数据到StarRocks目的表的功能。在底层实现上,DataX Writer通过Stream Load以CSV或JSON格式导入数据至StarRocks。内部将Reader读取的数据进行缓存后批量导入至StarRocks,以提高写入性能。阿里云DataWorks已经集成了DataX导入的能力,可以同步MaxCompute数据到EMR StarRocks。本文为您介绍DataX Writer原理,以及如何使用DataWorks进行离线同步任务。

背景信息

DataX Writer总体的数据流为Source -> Reader -> DataX channel -> Writer -> StarRocks。

功能说明

环境准备

您可以下载DataX插件DataX源码进行测试:

测试时可以使用命令python datax.py --jvm="-Xms6G -Xmx6G" --loglevel=debug job.json

配置样例

从MySQL读取数据后导入至StarRocks。
{
    "job": {
        "setting": {
            "speed": {
                 "channel": 1
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "xxxx",
                        "password": "xxxx",
                        "column": [ "k1", "k2", "v1", "v2" ],
                        "connection": [
                            {
                                "table": [ "table1", "table2" ],
                                "jdbcUrl": [
                                     "jdbc:mysql://127.0.0.1:3306/datax_test1"
                                ]
                            },
                            {
                                "table": [ "table3", "table4" ],
                                "jdbcUrl": [
                                     "jdbc:mysql://127.0.0.1:3306/datax_test2"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "starrockswriter",
                    "parameter": {
                        "username": "xxxx",
                        "password": "xxxx",
                        "database": "xxxx",
                        "table": "xxxx",
                        "column": ["k1", "k2", "v1", "v2"],
                        "preSql": [],
                        "postSql": [],
                        "jdbcUrl": "jdbc:mysql://fe-c-*****-internal.starrocks.aliyuncs.com:9030/",
                        "loadUrl": ["fe-c-*****-internal.starrocks.aliyuncs.com:8030""],
                        "loadProps": {}
                    }
                }
            }
        ]
    }
}
相关参数描述如下表所示。
参数描述是否必选默认值
usernameStarRocks数据库的用户名。
passwordStarRocks数据库的密码。
databaseStarRocks数据库的名称。
tableStarRocks表的名称。
loadUrlStarRocks FE的地址,用于Stream Load,可以为多个FE地址,格式为fe_ip:fe_http_port
column目的表需要写入数据的字段,字段之间用英文逗号(,)分隔。例如,"column": ["id","name","age"]
重要 该参数必须指定。如果希望导入所有字段,可以使用["*"]
preSql写入数据到目的表前,会先执行设置的标准语句。
postSql写入数据到目的表后,会先执行设置的标准语句。
jdbcUrl目的数据库的JDBC连接信息,用于执行preSqlpostSql
maxBatchRows单次Stream Load导入的最大行数。500000(50W)
maxBatchSize单次Stream Load导入的最大字节数。104857600 (100M)
flushInterval上一次Stream Load结束至下一次开始的时间间隔。单位为ms。300000(ms)
loadPropsStream Load的请求参数,详情请参见Stream Load

类型转换

默认传入的数据均会被转为字符串,并以\t作为列分隔符,\n作为行分隔符,组成CSV文件进行Stream Load导入操作。类型转换示例如下:
  • 更改列分隔符,则loadProps配置如下。
    "loadProps": {
        "column_separator": "\\x01",
        "row_delimiter": "\\x02"
    }
  • 更改导入格式为JSON,则loadProps配置如下。
    "loadProps": {
        "format": "json",
        "strip_outer_array": true
    }

导入案例

重要 请确保RDS MySQL和StarRocks实例在同一个网络VPC和VSW下。
  • 创建MySQL源数据表
    create table `sr_db`.sr_table(id int, name varchar (1024) ,event_time  DATETIME);
    insert into `sr_db`.sr_table values (1,"aaa","2015-09-12 00:00:00"),(2,"bbb","2015-09-12 00:00:00");
  • 创建StarRocks数据表
    CREATE TABLE IF NOT EXISTS load_db.datax_into_tbl (
      id          INT,
      name           STRING,
      event_time  DATETIME
    ) ENGINE=OLAP
    DUPLICATE KEY(id, name)
    DISTRIBUTED BY HASH(id, name) BUCKETS 1
    PROPERTIES (
      "replication_num" = "1"
    );
  • 创建同步任务
    {
        "job": {
            "setting": {
                "speed": {
                     "channel": 1
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader",
                        "parameter": {
                            "username": "username",
                            "password": "***",
                            "column": [ "id", "name", "event_time" ],
                            "connection": [
                                {
                                    "table": [ "sr_table"],
                                    "jdbcUrl": [
                                         "jdbc:mysql://rm-*****.mysql.rds.aliyuncs.com:3306/sr_db"
                                    ]
                                }
                            ]
                        }
                    },
                   "writer": {
                        "name": "starrockswriter",
                        "parameter": {
                            "username": "admin",
                            "password": "****",
                            "database": "load_db",
                            "table": "datax_load_tbl",
                            "column": ["id", "name", "event_time"],
                            "preSql": [],
                            "postSql": [],
                            "jdbcUrl": "jdbc:mysql://fe-c-*****-internal.starrocks.aliyuncs.com:9030/",
                            "loadUrl": ["fe-c-*****-internal.starrocks.aliyuncs.com:8030"],
                            "loadProps": {}
                        }
                    }
                }
            ]
        }
    }
    返回信息如下所示。
    任务启动时刻                    : 2023-04-07 13:05:55
    任务结束时刻                    : 2023-04-07 13:06:05
    任务总计耗时                    :                 10s
    任务平均流量                    :                2B/s
    记录写入速度                    :              0rec/s
    读出记录总数                    :                   2
    读写失败总数                    :                   0

DataWorks离线同步使用方式

  1. 在DataWorks上创建工作空间,详情请参见创建工作空间
  2. 在DataWorks上创建测试表并上传数据到MaxCompute数据源,详情请参见建表并上传数据
  3. 创建StarRocks数据源。
    1. 在DataWorks的工作空间列表页面,单击目标工作空间操作列的数据集成
    2. 在左侧导航栏,单击数据源
    3. 单击右上角的新增数据源
    4. 新增数据源对话框中,新增StarRocks类型的数据源。
      add_StarRocks
  4. 创建离线同步任务流程。
    1. 新建业务流程,详情请参见创建业务流程
    2. 在目录业务流程,新建离线同步任务,详情请参见创建离线同步节点
      add_node
  5. 在StarRocks实例中查看数据,详情请参见查看元数据