MaxCompute与Kafka的集成能够提供高效、可靠的数据处理和分析能力,适用于需要实时处理、大规模数据流和复杂数据分析的场景。本文介绍消息队列Kafka版和自建Kafka数据的写入流程,以及自建Kafka数据的写入示例。
Kafka数据写入MaxCompute流程:阿里云全托管Kafka
MaxCompute与消息队列Kafka版服务紧密集成,借助消息队列Kafka版服务的MaxCompute Sink Connector,无需第三方工具及二次开发,即可满足将指定Topic数据持续导入MaxCompute数据表的需求,操作详情请参见创建MaxCompute Sink Connector。
Kafka数据写入MaxCompute流程:自建开源Kafka
前提条件
- 已部署V2.2及以上版本的Kafka服务(推荐最新版本V3.4.0),并已创建Kafka Topic信息。 
- 已创建MaxCompute项目和表。具体操作,请参见创建MaxCompute项目和创建表。 
注意事项
Kafka-connector服务支持TEXT、CSV、JSON和FLATTEN类型的Kafka数据写入,不同类型的注意事项详情如下。关于数据类型的详情介绍,请参见数据类型说明。
- TEXT和JSON类型的Kafka数据写入MaxCompute时,MaxCompute表要求如下: - 字段名称 - 字段类型 - 是否为固定字段 - topic - STRING - 是 - partition - BIGINT - 是 - offset - BIGINT - 是 - key - TEXT类型Kafka数据写入时,字段类型必须为STRING。 
- JSON类型Kafka数据写入时,根据写入的数据类型设置,支持STRING与JSON。 
 - 需要将Kafka消息的中的Key值同步到MaxCompute表中时,此字段为固定字段。关于Kafka消息同步到MaxCompute的模式,详情请参见mode。 - value - TEXT类型Kafka数据写入时,字段类型必须为STRING。 
- JSON类型Kafka数据写入时,根据写入的数据类型设置,支持STRING与JSON。 
 - 需要将Kafka消息的中的Value值同步到MaxCompute表中时,此字段为固定字段。关于Kafka消息同步到MaxCompute的模式,详情请参见mode。 - pt - STRING(分区字段) - 是 
- FLATTEN和CSV类型的Kafka数据写入MaxCompute时,必须包含以下字段和字段类型,您可以根据写入数据的内容自定义其他字段。 - 字段名称 - 字段类型 - topic - STRING - partition - BIGINT - offset - BIGINT - pt - STRING(分区字段) - CSV类型的Kafka数据写入MaxCompute表中时,MaxCompute表中自定义的字段顺序和字段类型,必须与Kafka写入的数据保持一致,以确保数据能正确写入。 
- FLATTEN类型的Kafka数据写入MaxCompute表中时,MaxCompute表中自定义的字段名称必须Kafka数据中字段名称保持一致,以确保数据能正确写入。 - 例如:要写入的FLATTEN类型的Kafka数据内容为 - {"A":a,"B":"b","C":{"D":"d","E":"e"}},那MaxCompute表信息如下所示。- CREATE TABLE IF NOT EXISTS table_flatten( topic STRING, `partition` BIGINT, `offset` BIGINT, A BIGINT, B STRING, C JSON ) PARTITIONED BY (pt STRING);
 
配置并启动Kafka-connector服务
- 以Linux环境为例,在命令窗口执行以下命令或下载链接,下载 - kafka-connector-2.0.jar包。- wget http://maxcompute-repo.oss-cn-hangzhou.aliyuncs.com/kafka/kafka-connector-2.0.jar- 为防止依赖冲突,建议在 - $KAFKA_HOME/libs下新建一个子文件夹,例如- connector,用来放置- kafka-connector-2.0.jar包。说明- 若 - kafka-connector-2.0.jar包与Kafka的部署环境不一致,配置并启动- Kafka-connector服务的操作详情,请参见配置Kafka-connector。
- 在 - $KAFKA_HOME/config目录下,配置- connect-distributed.properties文件。- 在 - connect-distributed.properties文件中补充以下内容。- ##新增以下内容 plugin.path=<KAFKA_HOME>/libs/connector ##更新key.converter和value.converter参数值 key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter
- 在 - $KAFKA_HOME/路径下,执行以下命令,启动- Kafka-connector服务。- ##启动命令 bin/connect-distributed.sh config/connect-distributed.properties &
配置并启动Kafka-connector任务
- 创建并配置 - odps-sink-connector.json配置文件,并将- odps-sink-connector.json文件上传至任意位置。- odps-sink-connector.json配置文件内容与参数介绍如下。- { "name": "Kafka connector task name", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "your_topic", "endpoint": "endpoint", "tunnel_endpoint": "your_tunnel endpoint", "project": "project", "schema":"default", "table": "your_table", "account_type": "account type (STS or ALIYUN)", "access_id": "access id", "access_key": "access key", "account_id": "account id for sts", "sts.endpoint": "sts endpoint", "region_id": "region id for sts", "role_name": "role name for sts", "client_timeout_ms": "STS Token valid period (ms)", "format": "TEXT", "mode": "KEY", "partition_window_type": "MINUTE", "use_streaming": false, "buffer_size_kb": 65536, "sink_pool_size":"150", "record_batch_size":"8000", "runtime.error.topic.name":"kafka topic when runtime errors happens", "runtime.error.topic.bootstrap.servers":"kafka bootstrap servers of error topic queue", "skip_error":"false" } }- 公共参数 - 参数名 - 是否必填 - 说明 - name - 是 - 任务名称,且名称必须保持唯一。 - connector.class - 是 - 启动 - Kafka connector服务的类名,默认值为- com.aliyun.odps.kafka.connect.MaxComputeSinkConnector。- tasks.max - 是 - Kafka connector中消费者进程最大个数,必须为大于0的整数。- topics - 是 - Kafka的Topic名称。 - endpoint - 是 - MaxCompute服务的连接地址。 - 您需要根据创建MaxCompute项目时选择的地域以及网络连接方式配置Endpoint。各地域及网络对应的Endpoint值,请参见Endpoint。 - tunnel_endpoint - 否 - Tunnel服务的外网访问链接。 - 如果您未配置Tunnel Endpoint,Tunnel会自动路由到MaxCompute服务所在网络对应的Tunnel Endpoint。如果您配置了Tunnel Endpoint,则以配置为准,不进行自动路由。 - 各地域及网络对应的Tunnel Endpoint值,请参见Endpoint。 - project - 是 - 访问的目标MaxCompute项目名称。 - schema - 否 - 若目标MaxCompute项目配置Schema三层模型,则需要此参数,且默认值为default。 
- 若目标MaxCompute项目未配置Schema三层模型,则无需配置此参数。 
 - 关于Schema的介绍详情,请参见Schema操作。 - table - 是 - 目标MaxCompute项目的表名称。 - format - 否 - 写入的消息格式。取值如下: - TEXT(默认值):消息的格式为字符串。 
- BINARY:消息的格式为字节数组。 
- CSV:消息的格式为逗号(,)分隔的字符串。 
- JSON:消息格式为JSON数据类型的字符串。关于MaxCompute JSON类型的详情,请参见MaxCompute JSON类型使用指南。 
- FLATTEN:消息格式为JSON数据类型的字符串,JSON中的Key和Value会被解析,写入到对应的MaxCompute表中,其中JSON数据中的Key和需要与MaxCompute的表列名对应。 
 - 关于不同格式消息导入的案例,详情请参见使用示例。 - mode - 否 - 消息同步到MaxCompute的模式。取值说明如下: - KEY:只保留消息的Key,并将Key值写入目标MaxCompute表中。 
- VALUE:只保留消息的Value,并将Value值写入目标MaxCompute表中。 
- DEFAULT(默认值):同时保留消息的Key和Value,并将Key和Value值都写入目标MaxCompute表中。 - DEFAULT模式下,只支持TEXT和BINARY格式数据写入。 
 - partition_window_type - 否 - 按照系统时间进行数据分区。取值为DAY、HOUR(默认值)、MINUTE。 - use_streaming - 否 - 是否使用流式数据通道。取值说明如下: - false(默认值):不使用。 
- true:使用。 
 - buffer_size_kb - 否 - odps partition writer内部缓冲区的大小,单位KB。默认65536 KB。 - sink_pool_size - 否 - 多线程写入的最大线程数,默认为系统CPU核数。 - record_batch_size - 否 - 一个Kafka-connector任务内部的一个线程最多可以一次并行发送消息数量。 - skip_error - 否 - 是否跳过发生未知错误的记录。取值说明如下: - false(默认值):不会跳过。 
- true:跳过。 说明- 当skip_error为false且未配置runtime.error.topic.name参数,若遇到未知错误,会停止后续的数据写入,进程会被阻塞并在日志中抛出异常。 
- 当skip_error取值true且runtime.error.topic.name未配置,写入数据的进程会继续写入,异常数据会被丢弃。 
- 当skip_error为false且已配置runtime.error.topic.name参数,写入数据的进程会继续写入,异常数据会被记录到runtime.error.topic.name配置的Topic中。 
 - 异常数据处理示例详情,请参见异常数据处理示例。 
 - runtime.error.topic.name - 否 - 将数据写入时发生的未知错误的数据写入至Kafka的Topic名称。 - runtime.error.topic.bootstrap.servers - 否 - 将数据写入时发生的未知错误的数据写入至Kafka的bootstrap servers地址。 - account_type - 是 - 访问目标MaxCompute服务的方式,支持STS、ALIYUN两种方式,默认ALIYUN。 - 不同方式访问MaxCompute需要配置不同的访问凭证参数,详情请参见通过ALIYUN方式访问MaxCompute和通过STS方式访问MaxComput。 
- 通过ALIYUN方式访问MaxCompute,除公共参数外还需配置以下参数。 - 参数名 - 说明 - access_id - 阿里云账号或RAM账号的AccessKey ID。 - 您可以进入AccessKey管理页面获取AccessKey ID。 - access_key - AccessKey ID对应的AccessKey Secret。 
- 通过STS方式访问MaxCompute,除公共参数外还需配置以下参数。 - 参数名 - 说明 - account_id - 访问目标MaxCompute项目的账号ID。您可以进入账号中心查看您的账号ID。 - region_id - 访问目标MaxCompute项目的地域ID。各地域对应的地域ID,请参见服务接入点。 - role_name - 访问目标MaxCompute项目的角色名称。您可以进入角色页面查看角色名称。 - client_timeout_ms - STS Token刷新的时间间隔,单位为毫秒(ms),默认值为11(ms)。 - sts.endpoint - 使用临时安全令牌(STS)进行身份认证时需要的STS 服务地址。 - 各地域及网络对应的Endpoint值,请参见服务接入点。 
 
- 执行以下命令,启动Kafka-connector数据传输任务。 - curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @odps-sink-connector.json
使用示例
TEXT类型数据写入
- 数据准备。 - 通过使用本地客户端(odpscmd)连接或其他可以运行MaxCompute SQL的工具,创建目标MaxCompute表。 - CREATE TABLE IF NOT EXISTS table_text( topic STRING, `partition` BIGINT, `offset` BIGINT, key STRING, value STRING ) PARTITIONED BY (pt STRING);
- 创建Kafka数据。 - 在 - $KAFKA_HOME/bin/目录下,执行以下命令,创建Kafka Topic。以- topic_text为例。- sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_text- 执行以下命令,创建Kafka消息。 - sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_text --property parse.key=true >123 abc >456 edf
 
- (可选)启动 - Kafka-connector服务。具体操作,请参见配置并启动Kafka-connector服务。说明- 若 - Kafka-connector服务已启动,可跳过此步骤。
- 创建并配置 - odps-sink-connector.json文件,并将- odps-sink-connector.json文件上传至任意位置。本文以- $KAFKA_HOME/config路径为例。- odps-sink-connector.json文件内容示例如下,关于- odps-sink-connector.json文件详情介绍,请参见配置并启动Kafka-connector任务。- { "name": "odps-test-text", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_text", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "table_text", "account_type": "ALIYUN", "access_id": "<yourAccessKeyId>", "access_key": "<yourAccessKeySecret>", "partition_window_type": "MINUTE", "mode":"VALUE", "format":"TEXT", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000" } }
- 执行以下命令,启动Kafka-connector数据传输任务。 - curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
- 结果验证。 - 通过使用本地客户端(odpscmd)连接或其他可以运行MaxCompute SQL的工具,执行如下命令,查询数据写入结果。 - set odps.sql.allow.fullscan=true; select * from table_text;- 返回结果如下: - # 这里由于我们odps-sink-connector.json配置文件中的mode值为VALUE,所以只保留value的内容,key字段为NULL +-------+------------+------------+-----+-------+----+ | topic | partition | offset | key | value | pt | +-------+------------+------------+-----+-------+----+ | topic_text | 0 | 0 | NULL | abc | 07-13-2023 21:13 | | topic_text | 0 | 1 | NULL | edf | 07-13-2023 21:13 | +-------+------------+------------+-----+-------+----+
CSV类型数据写入
- 数据准备。 - 通过使用本地客户端(odpscmd)连接或其他可以运行MaxCompute SQL的工具,创建目标MaxCompute表。 - CREATE TABLE IF NOT EXISTS table_csv( topic STRING, `partition` BIGINT, `offset` BIGINT, id BIGINT, name STRING, region STRING ) PARTITIONED BY (pt STRING);
- 创建Kafka数据。 - 在 - $KAFKA_HOME/bin/目录下,执行以下命令,创建Kafka Topic。以- topic_csv为例。- sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_csv- 执行以下命令,创建Kafka消息。 - sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_csv --property parse.key=true >123 1103,zhangsan,china >456 1104,lisi,usa
 
- (可选)启动 - Kafka-connector服务。具体操作,请参见配置并启动Kafka-connector服务。说明- 若 - Kafka-connector服务已启动,可跳过此步骤。
- 创建并配置 - odps-sink-connector.json文件,并将- odps-sink-connector.json文件上传至任意位置。本文以- $KAFKA_HOME/config路径为例。- odps-sink-connector.json文件内容示例如下,关于- odps-sink-connector.json文件详情介绍,请参见配置并启动Kafka-connector任务。- { "name": "odps-test-csv", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_csv", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "table_csv", "account_type": "ALIYUN", "access_id": "<yourAccessKeyId>", "access_key": "<yourAccessKeySecret>", "partition_window_type": "MINUTE", "format":"CSV", "mode":"VALUE", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000" } }
- 执行以下命令,启动Kafka-connector数据传输任务。 - curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
- 结果验证。 - 通过使用本地客户端(odpscmd)连接或其他可以运行MaxCompute SQL的工具,执行如下命令,查询数据写入结果。 - set odps.sql.allow.fullscan=true; select * from table_csv;- 返回结果如下: - +-------+------------+------------+------------+------+--------+----+ | topic | partition | offset | id | name | region | pt | +-------+------------+------------+------------+------+--------+----+ | csv_test | 0 | 0 | 1103 | zhangsan | china | 07-14-2023 00:10 | | csv_test | 0 | 1 | 1104 | lisi | usa | 07-14-2023 00:10 | +-------+------------+------------+------------+------+--------+----+
JSON类型数据写入
- 数据准备。 - 通过使用本地客户端(odpscmd)连接或其他可以运行MaxCompute SQL的工具,创建目标MaxCompute表。 - CREATE TABLE IF NOT EXISTS table_json( topic STRING, `partition` BIGINT, `offset` BIGINT, key STRING, value JSON ) PARTITIONED BY (pt STRING);
- 创建Kafka数据。 - 在 - $KAFKA_HOME/bin/目录下,执行以下命令,创建Kafka Topic。以- topic_json为例。- sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_json- 执行以下命令,创建Kafka消息。 - sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_json --property parse.key=true >123 {"id":123,"name":"json-1","region":"beijing"} >456 {"id":456,"name":"json-2","region":"hangzhou"}
 
- (可选)启动 - Kafka-connector服务。具体操作,请参见配置并启动Kafka-connector服务。说明- 若 - Kafka-connector服务已启动,可跳过此步骤。
- 创建并配置 - odps-sink-connector.json文件,并将- odps-sink-connector.json文件上传至任意位置。本文以- $KAFKA_HOME/config路径为例。- odps-sink-connector.json文件内容示例如下,关于- odps-sink-connector.json文件详情介绍,请参见配置并启动Kafka-connector任务。- { "name": "odps-test-json", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_json", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "table_json", "account_type": "ALIYUN", "access_id": "<yourAccessKeyId>", "access_key": "<yourAccessKeySecret>", "partition_window_type": "MINUTE", "mode":"VALUE", "format":"JSON", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000" } }
- 执行以下命令,启动Kafka-connector数据传输任务。 - curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
- 结果验证。 - 通过使用本地客户端(odpscmd)连接或其他可以运行MaxCompute SQL的工具,执行如下命令,查询数据写入结果。 - set odps.sql.allow.fullscan=true; select * from table_json;- 返回结果如下: - # json 数据被成功写入value字段中 +-------+------------+------------+-----+-------+----+ | topic | partition | offset | key | value | pt | +-------+------------+------------+-----+-------+----+ | Topic_json | 0 | 0 | NULL | {"id":123,"name":"json-1","region":"beijing"} | 07-14-2023 00:28 | | Topic_json | 0 | 1 | NULL | {"id":456,"name":"json-2","region":"hangzhou"} | 07-14-2023 00:28 | +-------+------------+------------+-----+-------+----+
FLATTEN类型数据写入
- 数据准备。 - 通过使用本地客户端(odpscmd)连接或其他可以运行MaxCompute SQL的工具,创建目标MaxCompute表。 - CREATE TABLE IF NOT EXISTS table_flatten( topic STRING, `partition` BIGINT, `offset` BIGINT, id BIGINT, name STRING, extendinfo JSON ) PARTITIONED BY (pt STRING);
- 创建Kafka数据。 - 在 - $KAFKA_HOME/bin/目录下,执行以下命令,创建Kafka Topic。以- topic_flatten为例。- ./kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_flatten- 执行以下命令,创建Kafka消息。 - sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_flatten --property parse.key=true >123 {"id":123,"name":"json-1","extendinfo":{"region":"beijing","sex":"M"}} >456 {"id":456,"name":"json-2","extendinfo":{"region":"hangzhou","sex":"W"}}
 
- (可选)启动 - Kafka-connector服务。具体操作,请参见配置并启动Kafka-connector服务。说明- 若 - Kafka-connector服务已启动,可跳过此步骤。
- 创建并配置 - odps-sink-connector.json文件,并将- odps-sink-connector.json文件上传至任意位置。本文以- $KAFKA_HOME/config路径为例。- odps-sink-connector.json文件内容示例如下,关于- odps-sink-connector.json文件详情介绍,请参见配置并启动Kafka-connector任务。- { "name": "odps-test-flatten", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_flatten", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "table_flatten", "account_type": "ALIYUN", "access_id": "<yourAccessKeyId>", "access_key": "<yourAccessKeySecret>", "partition_window_type": "MINUTE", "mode":"VALUE", "format":"FLATTEN", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000" } }
- 执行以下命令,启动Kafka-connector任务。 - curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
- 结果验证。 - 通过使用本地客户端(odpscmd)连接或其他可以运行MaxCompute SQL的工具,执行如下命令,查询数据写入结果。 - set odps.sql.allow.fullscan=true; select * from table_flatten;- 返回结果如下: - # json数据被解析写入MaxCompute表中,且支持json嵌套类型exteninfo为JSON字段 +-------+------------+--------+-----+------+------------+----+ | topic | partition | offset | id | name | extendinfo | pt | +-------+------------+--------+-----+------+------------+----+ | topic_flatten | 0 | 0 | 123 | json-1 | {"sex":"M","region":"beijing"} | 07-14-2023 01:33 | | topic_flatten | 0 | 1 | 456 | json-2 | {"sex":"W","region":"hangzhou"} | 07-14-2023 01:33 | +-------+------------+--------+-----+------+------------+----+
异常数据处理示例
- 数据准备。 - 通过使用本地客户端(odpscmd)连接或其他可以运行MaxCompute SQL的工具,创建目标MaxCompute表。 - CREATE TABLE IF NOT EXISTS table_flatten( topic STRING, `partition` BIGINT, `offset` BIGINT, id BIGINT, name STRING, extendinfo JSON ) PARTITIONED BY (pt STRING);
- 创建Kafka数据。 - 在 - $KAFKA_HOME/bin/目录下,执行以下命令,创建Kafka Topic。- topic_abnormalTopic。- sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_abnormal
- runtime_error异常消息Topic。- sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic runtime_error说明- 当数据写入发生未知错误(通常是Kafka数据与MaxCompute表格式不匹配),异常数据会被写入到 - runtime_errorTopic中。
 - 执行以下命令,创建Kafka消息。 - 以下消息中,其中一条数据格式与目标MaxCompute表格式不匹配。 - sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic flatten_test --property parse.key=true >100 {"id":100,"name":"json-3","extendinfo":{"region":"beijing","gender":"M"}} >101 {"id":101,"name":"json-4","extendinfos":"null"} >102 {"id":102,"name":"json-5","extendinfo":{"region":"beijing","gender":"M"}}
 
- (可选)启动 - Kafka-connector服务。具体操作,请参见配置并启动Kafka-connector服务。说明- 若 - Kafka-connector服务已启动,可跳过此步骤。
- 创建并配置 - odps-sink-connector.json文件,并将- odps-sink-connector.json文件上传至任意位置。本文以- $KAFKA_HOME/config路径为例。- odps-sink-connector.json文件内容示例如下,关于- odps-sink-connector.json文件详情介绍,请参见配置并启动Kafka-connector任务。- { "name": "odps-test-runtime-error", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_abnormal", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "test_flatten", "account_type": "ALIYUN", "access_id": "<yourAccessKeyId>", "access_key": "<yourAccessKeySecret>", "partition_window_type": "MINUTE", "mode":"VALUE", "format":"FLATTEN", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000", "runtime.error.topic.name":"runtime_error", "runtime.error.topic.bootstrap.servers":"http://XXXX", "skip_error":"false" } }
- 执行以下命令,启动Kafka-connector任务。 - curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
- 结果验证。 - 查询MaxCompute表数据 - 通过使用本地客户端(odpscmd)连接或其他可以运行MaxCompute SQL的工具,执行如下命令,查询数据写入结果。 - set odps.sql.allow.fullscan=true; select * from table_flatten;- 返回结果如下: - # 我们看到最后两条数据,因为设置了skip_error参数为true,所以id为101的数据没有被写入MaxCompute,且没有block后面数据的写入。 +-------+------------+------------+------------+------+------------+----+ | topic | partition | offset | id | name | extendinfo | pt | +-------+------------+------------+------------+------+------------+----+ | flatten_test | 0 | 0 | 123 | json-1 | {"gender":"M","region":"beijing"} | 07-14-2023 01:33 | | flatten_test | 0 | 1 | 456 | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 01:33 | | flatten_test | 0 | 0 | 123 | json-1 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 | | flatten_test | 0 | 1 | 456 | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 13:16 | | flatten_test | 0 | 2 | 100 | json-3 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 | | flatten_test | 0 | 4 | 102 | json-5 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 | +-------+------------+------------+------------+------+------------+----+
- 查询 - runtime_errorTopic的消息- 在 - $KAFKA_HOME/bin/目录下,执行以下命令,查看消息写入结果。- sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic runtime_error --from-beginning- 返回结果如下: - # 异常数据被成功写入runtime_error消息队列中 {"id":101,"name":"json-4","extendinfos":"null"}