本文介绍如何通过Lindorm计算引擎访问Kafka数据。您可以将Kafka实例中的数据加载到Lindorm计算引擎中,结合Lindorm计算引擎的其他数据源数据,完成复杂的数据生产作业。

前提条件

操作步骤

  1. 启动Beeline,并访问Lindorm计算引擎服务。具体操作,请参见使用Beeline访问JDBC服务
  2. 登录消息队列Kafka版控制台,获取访问Kafka数据的连接信息。
    1. 实例详情页面的接入点信息区域,获取Kafka实例的域名接入点。
    2. Topic管理页面,获取Kafka实例的Topic名称。
  3. 在Lindorm计算引擎中创建Kafka Topic对应的Hive临时表。
    CREATE TEMPORARY TABLE kafka_tbl USING kafka OPTIONS(table_options);
    USING kafka中Kafka为指定数据源。Kafka表无需指定Schema,Lindorm计算引擎提供默认的Kafka Schema,不可修改,Schema信息如下:
    列名称数据类型说明
    keybinary该记录在Kafka中的Key信息。
    valuebinary该记录在Kafka中的Value信息。
    topicstring该记录所属的Kafka Topic。
    partitionint该记录所属的Partition。
    offsetbigint该记录在Partition的offset。
    timestamptimestamp该记录的时间戳。
    timestampTypeint该记录时间戳的类型:
    • 0:CreateTime。
    • 1:LogAppendTime。
    详细信息,请参见timestampType
    table_options参数说明:
    参数是否可选说明示例值
    kafka.bootstrap.serversKafka实例的域名接入点。alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092
    subscribeKafka实例的Topic名称。topic_to_read
    startingTimestamp待访问Topic数据片段开始时间的时间戳,单位为毫秒。
    说明 您可以使用Spark SQL unix_timestamp函数将时间转换为UNIX时间戳。
    1682406000000
    endingTimestamp待访问Topic数据片段结束时间的时间戳,单位为毫秒。
    说明 您可以使用Spark SQL unix_timestamp函数将时间转换为UNIX时间戳。
    1682409600000
    startingOffsetsByTimestampStrategy如果Kafka实例中部分Partition中没有数据,需要添加该参数。latest
    创建Kafka表支持多种类型参数,更多参数请参考:Structured Streaming + Kafka Integration Guide
    示例:
    CREATE TEMPORARY TABLE kafka_tbl USING kafka OPTIONS(
      "kafka.bootstrap.servers"="alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092",
      "subscribe"="topic_to_read",
      "startingTimestamp"=1682406000000,
      "endingTimestamp"=1682409600000);
  4. 查询Kafka表的数据。
    • 查询表Kafka_tbl中Schema的数据。
      DESCRIBE kafka_tbl;
    • 查询表Kafka_tbl中的数据。
      SELECT * FROM kafka_tbl LIMIT 10;
    • 使用Spark函数提取Kafka中的数据。例如,查询表Kafka_tbl中Value为{"content": "kafka record"}的数据。
      SELECT get_json_object(cast(value as string), '$.content') FROM kafka_tbl LIMIT 10;
      返回结果:
      Output: kafka record

(可选)实践:将Kafka数据导入Hive表

如果您有数据分析等相关需求,可以参考以下步骤将Kafka中的数据导入Hive表。

假设域名接入点为kafka_addr:9092,topic名称为topic1的Kafka实例中有两条写入时间在2023-04-25 15:00:00至2023-04-25 16:00:00之间的数据,具体内容为:
{"id": 1, "name": "name1"}
{"id": 2, "name": "name2"}
现在需要将这两条数据写入Hive表中,便于后续进行数据分析。
  1. 创建Kafka源表。
    CREATE TEMPORARY TABLE kafka_src_tbl USING kafka OPTIONS(
      "kafka.bootstrap.servers"="kafka_addr:9092",
      "subscribe"="topic1",
      "startingTimestamp"=1682406000000,
      "endingTimestamp"=1682409600000);
    参数的详细说明,请参见参数说明
  2. 创建Hive目标表。
    CREATE TABLE kafka_target_tbl(id LONG, name STRING) USING parquet;
  3. 将Kafka源表中所有数据解析后写入Hive目标表中。
    INSERT INTO kafka_target_tbl
        SELECT
          cast(get_json_object(cast(value as string), '$.id') as long),
          get_json_object(cast(value as string), '$.name')
        FROM
          kafka_src_tbl;
  4. 查询Kafka导入Hive表中的数据。
    SELECT * FROM kafka_target_tbl LIMIT 10;
    返回结果:
    Output:
    +-----+--------+
    | id  |  name  |
    +-----+--------+
    | 1   | name1  |
    | 2   | name2  |
    +-----+--------+