使用Kafka(离线与实时)

MaxCompute与Kafka的集成能够提供高效、可靠的数据处理和分析能力,适用于需要实时处理、大规模数据流和复杂数据分析的场景。本文介绍消息队列Kafka版和自建Kafka数据的写入流程,以及自建Kafka数据的写入示例。

Kafka数据写入MaxCompute流程:阿里云全托管Kafka

MaxCompute与消息队列Kafka版服务紧密集成,借助消息队列Kafka版服务的MaxCompute Sink Connector,无需第三方工具及二次开发,即可满足将指定Topic数据持续导入MaxCompute数据表的需求,操作详情请参见创建MaxCompute Sink Connector

Kafka数据写入MaxCompute流程:自建开源Kafka

前提条件

  • 已部署V2.2及以上版本的Kafka服务(推荐最新版本V3.4.0),并已创建Kafka Topic信息。

  • 已创建MaxCompute项目和表。具体操作,请参见创建MaxCompute项目创建表

注意事项

Kafka-connector服务支持TEXTCSVJSONFLATTEN类型的Kafka数据写入,不同类型的注意事项详情如下。关于数据类型的详情介绍,请参见数据类型说明

  • TEXTJSON类型的Kafka数据写入MaxCompute时,MaxCompute表要求如下:

    字段名称

    字段类型

    是否为固定字段

    topic

    STRING

    partition

    BIGINT

    offset

    BIGINT

    key

    • TEXT类型Kafka数据写入时,字段类型必须为STRING。

    • JSON类型Kafka数据写入时,根据写入的数据类型设置,支持STRING与JSON。

    需要将Kafka消息的中的Key值同步到MaxCompute表中时,此字段为固定字段。关于Kafka消息同步到MaxCompute的模式,详情请参见mode

    value

    • TEXT类型Kafka数据写入时,字段类型必须为STRING。

    • JSON类型Kafka数据写入时,根据写入的数据类型设置,支持STRING与JSON。

    需要将Kafka消息的中的Value值同步到MaxCompute表中时,此字段为固定字段。关于Kafka消息同步到MaxCompute的模式,详情请参见mode

    pt

    STRING(分区字段)

  • FLATTENCSV类型的Kafka数据写入MaxCompute时,必须包含以下字段和字段类型,您可以根据写入数据的内容自定义其他字段。

    字段名称

    字段类型

    topic

    STRING

    partition

    BIGINT

    offset

    BIGINT

    pt

    STRING(分区字段)

    • CSV类型的Kafka数据写入MaxCompute表中时,MaxCompute表中自定义的字段顺序和字段类型,必须与Kafka写入的数据保持一致,以确保数据能正确写入。

    • FLATTEN类型的Kafka数据写入MaxCompute表中时,MaxCompute表中自定义的字段名称必须Kafka数据中字段名称保持一致,以确保数据能正确写入。

      例如:要写入的FLATTEN类型的Kafka数据内容为{"A":a,"B":"b","C":{"D":"d","E":"e"}},那MaxCompute表信息如下所示。

      CREATE TABLE IF NOT EXISTS table_flatten(
       topic STRING,
       `partition` BIGINT,
       `offset` BIGINT,
       A BIGINT,
       B STRING,
       C JSON
      ) PARTITIONED BY (pt STRING);

配置并启动Kafka-connector服务

  1. 以Linux环境为例,在命令窗口执行以下命令或下载链接,下载kafka-connector-2.0.jar包。

    wget http://maxcompute-repo.oss-cn-hangzhou.aliyuncs.com/kafka/kafka-connector-2.0.jar

    为防止依赖冲突,建议在$KAFKA_HOME/libs下新建一个子文件夹,例如connector,用来放置kafka-connector-2.0.jar

    说明

    kafka-connector-2.0.jar包与Kafka的部署环境不一致,配置并启动Kafka-connector服务的操作详情,请参见配置Kafka-connector

  2. $KAFKA_HOME/config目录下,配置connect-distributed.properties文件。

    connect-distributed.properties文件中补充以下内容。

    ##新增以下内容
    plugin.path=<KAFKA_HOME>/libs/connector
    
    ##更新key.converter和value.converter参数值
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter  
  3. $KAFKA_HOME/路径下,执行以下命令,启动Kafka-connector服务。

    ##启动命令
    bin/connect-distributed.sh config/connect-distributed.properties &

配置并启动Kafka-connector任务

  1. 创建并配置odps-sink-connector.json配置文件,并将odps-sink-connector.json文件上传至任意位置。

    odps-sink-connector.json配置文件内容与参数介绍如下。

    {
      "name": "Kafka connector task name",
      "config": {
        "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
        "tasks.max": "3",
        "topics": "your_topic",
        "endpoint": "endpoint",
        "tunnel_endpoint": "your_tunnel endpoint",
        "project": "project",
        "schema":"default",
        "table": "your_table",
        "account_type": "account type (STS or ALIYUN)",
        "access_id": "access id",
        "access_key": "access key",
        "account_id": "account id for sts",
        "sts.endpoint": "sts endpoint",
        "region_id": "region id for sts",
        "role_name": "role name for sts",
        "client_timeout_ms": "STS Token valid period (ms)",
        "format": "TEXT",
        "mode": "KEY",
        "partition_window_type": "MINUTE",
        "use_streaming": false,
        "buffer_size_kb": 65536,
        "sink_pool_size":"150",
        "record_batch_size":"8000",
        "runtime.error.topic.name":"kafka topic when runtime errors happens",
        "runtime.error.topic.bootstrap.servers":"kafka bootstrap servers of error topic queue",
        "skip_error":"false"
      }
    }
    • 公共参数

      参数名

      是否必填

      说明

      name

      任务名称,且名称必须保持唯一。

      connector.class

      启动Kafka connector服务的类名,默认值为com.aliyun.odps.kafka.connect.MaxComputeSinkConnector

      tasks.max

      Kafka connector中消费者进程最大个数,必须为大于0的整数。

      topics

      Kafka的Topic名称。

      endpoint

      MaxCompute服务的连接地址。

      您需要根据创建MaxCompute项目时选择的地域以及网络连接方式配置Endpoint。各地域及网络对应的Endpoint值,请参见Endpoint

      tunnel_endpoint

      Tunnel服务的外网访问链接。

      如果您未配置Tunnel Endpoint,Tunnel会自动路由到MaxCompute服务所在网络对应的Tunnel Endpoint。如果您配置了Tunnel Endpoint,则以配置为准,不进行自动路由。

      各地域及网络对应的Tunnel Endpoint值,请参见Endpoint

      project

      访问的目标MaxCompute项目名称。

      schema

      • 若目标MaxCompute项目配置Schema三层模型,则需要此参数,且默认值为default

      • 若目标MaxCompute项目未配置Schema三层模型,则无需配置此参数。

      关于Schema的介绍详情,请参见Schema操作

      table

      目标MaxCompute项目的表名称。

      format

      写入的消息格式。取值如下:

      • TEXT(默认值):消息的格式为字符串。

      • BINARY:消息的格式为字节数组。

      • CSV:消息的格式为逗号(,)分隔的字符串。

      • JSON:消息格式为JSON数据类型的字符串。关于MaxCompute JSON类型的详情,请参见MaxCompute JSON类型使用指南(试用Beta版本)

      • FLATTEN:消息格式为JSON数据类型的字符串,JSON中的Key和Value会被解析,写入到对应的MaxCompute表中,其中JSON数据中的Key和需要与MaxCompute的表列名对应。

      关于不同格式消息导入的案例,详情请参见使用示例

      mode

      消息同步到MaxCompute的模式。取值说明如下:

      • KEY:只保留消息的Key,并将Key值写入目标MaxCompute表中。

      • VALUE:只保留消息的Value,并将Value值写入目标MaxCompute表中。

      • DEFAULT(默认值):同时保留消息的Key和Value,并将Key和Value值都写入目标MaxCompute表中。

        DEFAULT模式下,只支持TEXTBINARY格式数据写入。

      partition_window_type

      按照系统时间进行数据分区。取值为DAYHOUR(默认值)、MINUTE

      use_streaming

      是否使用流式数据通道。取值说明如下:

      • false(默认值):不使用。

      • true:使用。

      buffer_size_kb

      odps partition writer内部缓冲区的大小,单位KB。默认65536 KB。

      sink_pool_size

      多线程写入的最大线程数,默认为系统CPU核数。

      record_batch_size

      一个Kafka-connector任务内部的一个线程最多可以一次并行发送消息数量。

      skip_error

      是否跳过发生未知错误的记录。取值说明如下:

      • false(默认值):不会跳过。

      • true:跳过。

        说明
        • skip_errorfalse且未配置runtime.error.topic.name参数,若遇到未知错误,会停止后续的数据写入,进程会被阻塞并在日志中抛出异常。

        • skip_error取值trueruntime.error.topic.name未配置,写入数据的进程会继续写入,异常数据会被丢弃。

        • skip_errorfalse且已配置runtime.error.topic.name参数,写入数据的进程会继续写入,异常数据会被记录到runtime.error.topic.name配置的Topic

        异常数据处理示例详情,请参见异常数据处理示例

      runtime.error.topic.name

      将数据写入时发生的未知错误的数据写入至Kafka的Topic名称。

      runtime.error.topic.bootstrap.servers

      将数据写入时发生的未知错误的数据写入至Kafka的bootstrap servers地址。

      account_type

      访问目标MaxCompute服务的方式,支持STSALIYUN两种方式,默认ALIYUN

      不同方式访问MaxCompute需要配置不同的访问凭证参数,详情请参见通过ALIYUN方式访问MaxCompute通过STS方式访问MaxComput

    • 通过ALIYUN方式访问MaxCompute,除公共参数外还需配置以下参数。

      参数名

      说明

      access_id

      阿里云账号或RAM账号的AccessKey ID。

      您可以进入AccessKey管理页面获取AccessKey ID。

      access_key

      AccessKey ID对应的AccessKey Secret。

      您可以进入AccessKey管理页面获取AccessKey Secret。

    • 通过STS方式访问MaxCompute,除公共参数外还需配置以下参数。

      参数名

      说明

      account_id

      访问目标MaxCompute项目的账号ID。您可以进入账号中心查看您的账号ID。

      region_id

      访问目标MaxCompute项目的地域ID。各地域对应的地域ID,请参见服务接入点

      role_name

      访问目标MaxCompute项目的角色名称。您可以进入角色页面查看角色名称。

      client_timeout_ms

      STS Token刷新的时间间隔,单位为毫秒(ms),默认值为11(ms)。

      sts.endpoint

      使用临时安全令牌(STS)进行身份认证时需要的STS 服务地址。

      各地域及网络对应的Endpoint值,请参见服务接入点

  2. 执行以下命令,启动Kafka-connector数据传输任务。

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @odps-sink-connector.json

使用示例

TEXT类型数据写入

  1. 数据准备。

    • 通过使用本地客户端(odpscmd)连接或其他可以运行MaxCompute SQL的工具,创建目标MaxCompute表。

      CREATE TABLE IF NOT EXISTS table_text(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        key STRING,
        value STRING
      ) PARTITIONED BY (pt STRING);
    • 创建Kafka数据。

      $KAFKA_HOME/bin/目录下,执行以下命令,创建Kafka Topic。以topic_text为例。

      sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_text

      执行以下命令,创建Kafka消息。

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_text --property parse.key=true
      >123    abc
      >456    edf
  2. (可选)启动Kafka-connector服务。具体操作,请参见配置并启动Kafka-connector服务

    说明

    Kafka-connector服务已启动,可跳过此步骤。

  3. 创建并配置odps-sink-connector.json文件,并将odps-sink-connector.json文件上传至任意位置。本文以$KAFKA_HOME/config路径为例。

    odps-sink-connector.json文件内容示例如下,关于odps-sink-connector.json文件详情介绍,请参见配置并启动Kafka-connector任务

    {
        "name": "odps-test-text",
        "config": {
          "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
          "tasks.max": "3",
          "topics": "topic_text",
          "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
          "project": "project_name",
          "schema":"default",
          "table": "table_text",
          "account_type": "ALIYUN",
          "access_id": "LTAI5tM2iHkTd4W69nof****",
          "access_key": "S0uZvwDYDa56WZ1tjVmA67z1YS****",
          "partition_window_type": "MINUTE",
          "mode":"VALUE",
          "format":"TEXT",
          "sink_pool_size":"150",
          "record_batch_size":"9000",
          "buffer_size_kb":"600000"
        }
      }
  4. 执行以下命令,启动Kafka-connector数据传输任务。

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. 结果验证。

    通过使用本地客户端(odpscmd)连接或其他可以运行MaxCompute SQL的工具,执行如下命令,查询数据写入结果。

    set odps.sql.allow.fullscan=true;
    select * from table_text;

    返回结果如下:

    # 这里由于我们odps-sink-connector.json配置文件中的mode值为VALUE,所以只保留value的内容,key字段为NULL
    
    +-------+------------+------------+-----+-------+----+
    | topic | partition  | offset     | key | value | pt |
    +-------+------------+------------+-----+-------+----+
    | topic_text | 0      | 0          | NULL | abc   | 07-13-2023 21:13 |
    | topic_text | 0      | 1          | NULL | edf   | 07-13-2023 21:13 |
    +-------+------------+------------+-----+-------+----+

CSV类型数据写入

  1. 数据准备。

    • 通过使用本地客户端(odpscmd)连接或其他可以运行MaxCompute SQL的工具,创建目标MaxCompute表。

      CREATE TABLE IF NOT EXISTS table_csv(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        id BIGINT,
        name STRING,
        region STRING
      ) PARTITIONED BY (pt STRING);
    • 创建Kafka数据。

      $KAFKA_HOME/bin/目录下,执行以下命令,创建Kafka Topic。以topic_csv为例。

      sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_csv

      执行以下命令,创建Kafka消息。

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_csv --property parse.key=true
      >123	1103,zhangsan,china
      >456	1104,lisi,usa
  2. (可选)启动Kafka-connector服务。具体操作,请参见配置并启动Kafka-connector服务

    说明

    Kafka-connector服务已启动,可跳过此步骤。

  3. 创建并配置odps-sink-connector.json文件,并将odps-sink-connector.json文件上传至任意位置。本文以$KAFKA_HOME/config路径为例。

    odps-sink-connector.json文件内容示例如下,关于odps-sink-connector.json文件详情介绍,请参见配置并启动Kafka-connector任务

    {
        "name": "odps-test-csv",
        "config": {
          "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
          "tasks.max": "3",
          "topics": "topic_csv",
          "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
          "project": "project_name",    
          "schema":"default",
          "table": "table_csv",
          "account_type": "ALIYUN",
          "access_id": "LTAI5tM2iHkTd4W69nof****",
          "access_key": "S0uZvwDYDa56WZ1tjVmA67z1YS****",
          "partition_window_type": "MINUTE",
          "format":"CSV",
          "mode":"VALUE",
          "sink_pool_size":"150",
          "record_batch_size":"9000",
          "buffer_size_kb":"600000"
        }
      }
    
  4. 执行以下命令,启动Kafka-connector数据传输任务。

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. 结果验证。

    通过使用本地客户端(odpscmd)连接或其他可以运行MaxCompute SQL的工具,执行如下命令,查询数据写入结果。

    set odps.sql.allow.fullscan=true;
    select * from table_csv;

    返回结果如下:

    +-------+------------+------------+------------+------+--------+----+
    | topic | partition  | offset     | id         | name | region | pt |
    +-------+------------+------------+------------+------+--------+----+
    | csv_test | 0       | 0          | 1103       | zhangsan | china  | 07-14-2023 00:10 |
    | csv_test | 0       | 1          | 1104       | lisi | usa    | 07-14-2023 00:10 |
    +-------+------------+------------+------------+------+--------+----+

JSON类型数据写入

  1. 数据准备。

    • 通过使用本地客户端(odpscmd)连接或其他可以运行MaxCompute SQL的工具,创建目标MaxCompute表。

      CREATE TABLE IF NOT EXISTS table_json(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        key STRING,
        value JSON
      ) PARTITIONED BY (pt STRING);
    • 创建Kafka数据。

      $KAFKA_HOME/bin/目录下,执行以下命令,创建Kafka Topic。以topic_json为例。

      sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_json

      执行以下命令,创建Kafka消息。

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_json --property parse.key=true
      >123    {"id":123,"name":"json-1","region":"beijing"}                         
      >456    {"id":456,"name":"json-2","region":"hangzhou"}
  2. (可选)启动Kafka-connector服务。具体操作,请参见配置并启动Kafka-connector服务

    说明

    Kafka-connector服务已启动,可跳过此步骤。

  3. 创建并配置odps-sink-connector.json文件,并将odps-sink-connector.json文件上传至任意位置。本文以$KAFKA_HOME/config路径为例。

    odps-sink-connector.json文件内容示例如下,关于odps-sink-connector.json文件详情介绍,请参见配置并启动Kafka-connector任务

    {
        "name": "odps-test-json",
        "config": {
          "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
          "tasks.max": "3",
          "topics": "topic_json",
          "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
          "project": "project_name",    
          "schema":"default",
          "table": "table_json",
          "account_type": "ALIYUN",
          "access_id": "LTAI5tM2iHkTd4W69nof****",
          "access_key": "S0uZvwDYDa56WZ1tjVmA67z1YS****",
          "partition_window_type": "MINUTE",
          "mode":"VALUE",
          "format":"JSON",
          "sink_pool_size":"150",
          "record_batch_size":"9000",
          "buffer_size_kb":"600000"
        }
      }
    
  4. 执行以下命令,启动Kafka-connector数据传输任务。

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. 结果验证。

    通过使用本地客户端(odpscmd)连接或其他可以运行MaxCompute SQL的工具,执行如下命令,查询数据写入结果。

    set odps.sql.allow.fullscan=true;
    select * from table_json;

    返回结果如下:

    # json 数据被成功写入value字段中
    +-------+------------+------------+-----+-------+----+
    | topic | partition  | offset     | key | value | pt |
    +-------+------------+------------+-----+-------+----+
    | Topic_json | 0      | 0          | NULL | {"id":123,"name":"json-1","region":"beijing"} | 07-14-2023 00:28 |
    | Topic_json | 0      | 1          | NULL | {"id":456,"name":"json-2","region":"hangzhou"} | 07-14-2023 00:28 |
    +-------+------------+------------+-----+-------+----+

FLATTEN类型数据写入

  1. 数据准备。

    • 通过使用本地客户端(odpscmd)连接或其他可以运行MaxCompute SQL的工具,创建目标MaxCompute表。

      CREATE TABLE IF NOT EXISTS table_flatten(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        id BIGINT,
        name STRING,
        extendinfo JSON
      ) PARTITIONED BY (pt STRING);
    • 创建Kafka数据。

      $KAFKA_HOME/bin/目录下,执行以下命令,创建Kafka Topic。以topic_flatten为例。

      ./kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_flatten

      执行以下命令,创建Kafka消息。

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_flatten --property parse.key=true
      >123  {"id":123,"name":"json-1","extendinfo":{"region":"beijing","sex":"M"}}                         
      >456  {"id":456,"name":"json-2","extendinfo":{"region":"hangzhou","sex":"W"}}

  2. (可选)启动Kafka-connector服务。具体操作,请参见配置并启动Kafka-connector服务

    说明

    Kafka-connector服务已启动,可跳过此步骤。

  3. 创建并配置odps-sink-connector.json文件,并将odps-sink-connector.json文件上传至任意位置。本文以$KAFKA_HOME/config路径为例。

    odps-sink-connector.json文件内容示例如下,关于odps-sink-connector.json文件详情介绍,请参见配置并启动Kafka-connector任务

    {
        "name": "odps-test-flatten",
        "config": {
          "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
          "tasks.max": "3",
          "topics": "topic_flatten",
          "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
          "project": "project_name",    
          "schema":"default",
          "table": "table_flatten",
          "account_type": "ALIYUN",
          "access_id": "LTAI5tM2iHkTd4W69nof****",
          "access_key": "S0uZvwDYDa56WZ1tjVmA67z1YS****",
          "partition_window_type": "MINUTE",
          "mode":"VALUE",
          "format":"FLATTEN",
          "sink_pool_size":"150",
          "record_batch_size":"9000",
          "buffer_size_kb":"600000"
        }
      }
    
  4. 执行以下命令,启动Kafka-connector任务。

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. 结果验证。

    通过使用本地客户端(odpscmd)连接或其他可以运行MaxCompute SQL的工具,执行如下命令,查询数据写入结果。

    set odps.sql.allow.fullscan=true;
    select * from table_flatten;

    返回结果如下:

    # json数据被解析写入MaxCompute表中,且支持json嵌套类型exteninfo为JSON字段
    +-------+------------+--------+-----+------+------------+----+
    | topic | partition  | offset | id  | name | extendinfo | pt |
    +-------+------------+--------+-----+------+------------+----+
    | topic_flatten | 0   | 0      | 123 | json-1 | {"sex":"M","region":"beijing"} | 07-14-2023 01:33 |
    | topic_flatten | 0   | 1      | 456 | json-2 | {"sex":"W","region":"hangzhou"} | 07-14-2023 01:33 |
    +-------+------------+--------+-----+------+------------+----+

异常数据处理示例

  1. 数据准备。

    • 通过使用本地客户端(odpscmd)连接或其他可以运行MaxCompute SQL的工具,创建目标MaxCompute表。

      CREATE TABLE IF NOT EXISTS table_flatten(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        id BIGINT,
        name STRING,
        extendinfo JSON
      ) PARTITIONED BY (pt STRING);
    • 创建Kafka数据。

      $KAFKA_HOME/bin/目录下,执行以下命令,创建Kafka Topic。

      • topic_abnormalTopic。

        sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_abnormal
      • runtime_error异常消息Topic。

        sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic runtime_error
        说明

        当数据写入发生未知错误(通常是Kafka数据与MaxCompute表格式不匹配),异常数据会被写入到runtime_errorTopic中。

      执行以下命令,创建Kafka消息。

      以下消息中,其中一条数据格式与目标MaxCompute表格式不匹配。

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic flatten_test --property parse.key=true
      
      >100  {"id":100,"name":"json-3","extendinfo":{"region":"beijing","gender":"M"}}                         
      >101  {"id":101,"name":"json-4","extendinfos":"null"}
      >102	{"id":102,"name":"json-5","extendinfo":{"region":"beijing","gender":"M"}} 
  2. (可选)启动Kafka-connector服务。具体操作,请参见配置并启动Kafka-connector服务

    说明

    Kafka-connector服务已启动,可跳过此步骤。

  3. 创建并配置odps-sink-connector.json文件,并将odps-sink-connector.json文件上传至任意位置。本文以$KAFKA_HOME/config路径为例。

    odps-sink-connector.json文件内容示例如下,关于odps-sink-connector.json文件详情介绍,请参见配置并启动Kafka-connector任务

    {
      "name": "odps-test-runtime-error",
      "config": {
        "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
        "tasks.max": "3",
        "topics": "topic_abnormal",
        "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
        "project": "project_name",
        "schema":"default",
        "table": "test_flatten",
        "account_type": "ALIYUN",
        "access_id": "LTAI5tM2iHkTd4W69nof****",
        "access_key": "S0uZvwDYDa56WZ1tjVmA67z1YS****",
        "partition_window_type": "MINUTE",
        "mode":"VALUE",
        "format":"FLATTEN",
        "sink_pool_size":"150",
        "record_batch_size":"9000",
        "buffer_size_kb":"600000",
        "runtime.error.topic.name":"runtime_error",
        "runtime.error.topic.bootstrap.servers":"http://XXXX",
        "skip_error":"false"
      }
    }
    
  4. 执行以下命令,启动Kafka-connector任务。

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. 结果验证。

    • 查询MaxCompute表数据

      通过使用本地客户端(odpscmd)连接或其他可以运行MaxCompute SQL的工具,执行如下命令,查询数据写入结果。

      set odps.sql.allow.fullscan=true;
      select * from table_flatten;

      返回结果如下:

      # 我们看到最后两条数据,因为设置了skip_error参数为true,所以id为101的数据没有被写入MaxCompute,且没有block后面数据的写入。
      +-------+------------+------------+------------+------+------------+----+
      | topic | partition  | offset     | id         | name | extendinfo | pt |
      +-------+------------+------------+------------+------+------------+----+
      | flatten_test | 0          | 0          | 123        | json-1 | {"gender":"M","region":"beijing"} | 07-14-2023 01:33 |
      | flatten_test | 0          | 1          | 456        | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 01:33 |
      | flatten_test | 0          | 0          | 123        | json-1 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 |
      | flatten_test | 0          | 1          | 456        | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 13:16 |
      | flatten_test | 0          | 2          | 100        | json-3 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 |
      | flatten_test | 0          | 4          | 102        | json-5 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 |
      +-------+------------+------------+------------+------+------------+----+
    • 查询runtime_errorTopic的消息

      $KAFKA_HOME/bin/目录下,执行以下命令,查看消息写入结果。

      sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic runtime_error --from-beginning

      返回结果如下:

      # 异常数据被成功写入runtime_error消息队列中
      {"id":101,"name":"json-4","extendinfos":"null"}