访问Kafka数据

本文介绍如何通过Lindorm计算引擎访问Kafka数据。您可以将Kafka实例中的数据加载到Lindorm计算引擎中,结合Lindorm计算引擎的其他数据源数据,完成复杂的数据生产作业。

前提条件

操作步骤

  1. 启动Beeline,并访问Lindorm计算引擎服务。具体操作,请参见使用Beeline访问JDBC服务

  2. 登录云消息队列 Kafka 版控制台,获取访问Kafka数据的连接信息。

    1. 实例详情页面的接入点信息区域,获取Kafka实例的域名接入点。

    2. Topic管理页面,获取Kafka实例的Topic名称。

  3. 在Lindorm计算引擎中创建Kafka Topic对应的Hive临时表。

    CREATE TEMPORARY TABLE kafka_tbl USING kafka OPTIONS(table_options);

    USING kafka中Kafka为指定数据源。Kafka表无需指定Schema,Lindorm计算引擎提供默认的Kafka Schema,不可修改,Schema信息如下:

    列名称

    数据类型

    说明

    key

    binary

    该记录在Kafka中的Key信息。

    value

    binary

    该记录在Kafka中的Value信息。

    topic

    string

    该记录所属的Kafka Topic。

    partition

    int

    该记录所属的Partition。

    offset

    bigint

    该记录在Partition的offset。

    timestamp

    timestamp

    该记录的时间戳。

    timestampType

    int

    该记录时间戳的类型:

    • 0:CreateTime。

    • 1:LogAppendTime。

    详细信息,请参见timestampType

    table_options参数说明:

    参数

    是否必选

    说明

    示例值

    kafka.bootstrap.servers

    Kafka实例的域名接入点。

    alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092

    subscribe

    Kafka实例的Topic名称。

    topic_to_read

    startingTimestamp

    待访问Topic数据片段开始时间的时间戳,单位为毫秒。

    说明

    您可以使用Spark SQL unix_timestamp函数将时间转换为UNIX时间戳。

    1682406000000

    endingTimestamp

    待访问Topic数据片段结束时间的时间戳,单位为毫秒。

    说明

    您可以使用Spark SQL unix_timestamp函数将时间转换为UNIX时间戳。

    1682409600000

    startingOffsetsByTimestampStrategy

    如果Kafka实例中部分Partition中没有数据,需要添加该参数。

    latest

    创建Kafka表支持多种类型参数,更多参数请参考:Structured Streaming + Kafka Integration Guide

    示例:

    CREATE TEMPORARY TABLE kafka_tbl USING kafka OPTIONS(
      "kafka.bootstrap.servers"="alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092",
      "subscribe"="topic_to_read",
      "startingTimestamp"=1682406000000,
      "endingTimestamp"=1682409600000);
  4. 查询Kafka表的数据。

    • 查询表Kafka_tbl中Schema的数据。

      DESCRIBE kafka_tbl;
    • 查询表Kafka_tbl中的数据。

      SELECT * FROM kafka_tbl LIMIT 10;
    • 使用Spark函数提取Kafka中的数据。例如,查询表Kafka_tbl中Value为{"content": "kafka record"}的数据。

      SELECT get_json_object(cast(value as string), '$.content') FROM kafka_tbl LIMIT 10;

      返回结果:

      Output: kafka record

(可选)实践:将Kafka数据导入Hive表

如果您有数据分析等相关需求,可以参考以下步骤将Kafka中的数据导入Hive表。

假设域名接入点为kafka_addr:9092,topic名称为topic1的Kafka实例中有两条写入时间在2023-04-25 15:00:00至2023-04-25 16:00:00之间的数据,具体内容为:

{"id": 1, "name": "name1"}
{"id": 2, "name": "name2"}

现在需要将这两条数据写入Hive表中,便于后续进行数据分析。

  1. 创建Kafka源表。

    CREATE TEMPORARY TABLE kafka_src_tbl USING kafka OPTIONS(
      "kafka.bootstrap.servers"="kafka_addr:9092",
      "subscribe"="topic1",
      "startingTimestamp"=1682406000000,
      "endingTimestamp"=1682409600000);

    参数的详细说明,请参见参数说明

  2. 创建Hive目标表。

    CREATE TABLE kafka_target_tbl(id LONG, name STRING) USING parquet;
  3. 将Kafka源表中所有数据解析后写入Hive目标表中。

    INSERT INTO kafka_target_tbl
        SELECT
          cast(get_json_object(cast(value as string), '$.id') as long),
          get_json_object(cast(value as string), '$.name')
        FROM
          kafka_src_tbl;
  4. 查询Kafka导入Hive表中的数据。

    SELECT * FROM kafka_target_tbl LIMIT 10;

    返回结果:

    Output:
    +-----+--------+
    | id  |  name  |
    +-----+--------+
    | 1   | name1  |
    | 2   | name2  |
    +-----+--------+