本文介绍如何通过Lindorm计算引擎访问Kafka数据。您可以将Kafka实例中的数据加载到Lindorm计算引擎中,结合Lindorm计算引擎的其他数据源数据,完成复杂的数据生产作业。
前提条件
操作步骤
- 启动Beeline,并访问Lindorm计算引擎服务。具体操作,请参见使用Beeline访问JDBC服务。
- 登录消息队列Kafka版控制台,获取访问Kafka数据的连接信息。
- 在实例详情页面的接入点信息区域,获取Kafka实例的域名接入点。
- 在Topic管理页面,获取Kafka实例的Topic名称。
- 在Lindorm计算引擎中创建Kafka Topic对应的Hive临时表。
CREATE TEMPORARY TABLE kafka_tbl USING kafka OPTIONS(table_options);
USING kafka
中Kafka为指定数据源。Kafka表无需指定Schema,Lindorm计算引擎提供默认的Kafka Schema,不可修改,Schema信息如下:列名称 数据类型 说明 key binary 该记录在Kafka中的Key信息。 value binary 该记录在Kafka中的Value信息。 topic string 该记录所属的Kafka Topic。 partition int 该记录所属的Partition。 offset bigint 该记录在Partition的offset。 timestamp timestamp 该记录的时间戳。 timestampType int 该记录时间戳的类型: - 0:CreateTime。
- 1:LogAppendTime。
table_options
参数说明:
创建Kafka表支持多种类型参数,更多参数请参考:Structured Streaming + Kafka Integration Guide。参数 是否可选 说明 示例值 kafka.bootstrap.servers 是 Kafka实例的域名接入点。 alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092 subscribe 是 Kafka实例的Topic名称。 topic_to_read startingTimestamp 是 待访问Topic数据片段开始时间的时间戳,单位为毫秒。 说明 您可以使用Spark SQL unix_timestamp函数将时间转换为UNIX时间戳。1682406000000 endingTimestamp 是 待访问Topic数据片段结束时间的时间戳,单位为毫秒。 说明 您可以使用Spark SQL unix_timestamp函数将时间转换为UNIX时间戳。1682409600000 startingOffsetsByTimestampStrategy 否 如果Kafka实例中部分Partition中没有数据,需要添加该参数。 latest 示例:CREATE TEMPORARY TABLE kafka_tbl USING kafka OPTIONS( "kafka.bootstrap.servers"="alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092", "subscribe"="topic_to_read", "startingTimestamp"=1682406000000, "endingTimestamp"=1682409600000);
- 查询Kafka表的数据。
- 查询表Kafka_tbl中Schema的数据。
DESCRIBE kafka_tbl;
- 查询表Kafka_tbl中的数据。
SELECT * FROM kafka_tbl LIMIT 10;
- 使用Spark函数提取Kafka中的数据。例如,查询表Kafka_tbl中Value为
{"content": "kafka record"}
的数据。
返回结果:SELECT get_json_object(cast(value as string), '$.content') FROM kafka_tbl LIMIT 10;
Output: kafka record
- 查询表Kafka_tbl中Schema的数据。
(可选)实践:将Kafka数据导入Hive表
如果您有数据分析等相关需求,可以参考以下步骤将Kafka中的数据导入Hive表。
假设域名接入点为kafka_addr:9092,topic名称为topic1的Kafka实例中有两条写入时间在2023-04-25 15:00:00至2023-04-25 16:00:00之间的数据,具体内容为:
{"id": 1, "name": "name1"}
{"id": 2, "name": "name2"}
现在需要将这两条数据写入Hive表中,便于后续进行数据分析。- 创建Kafka源表。
参数的详细说明,请参见参数说明。CREATE TEMPORARY TABLE kafka_src_tbl USING kafka OPTIONS( "kafka.bootstrap.servers"="kafka_addr:9092", "subscribe"="topic1", "startingTimestamp"=1682406000000, "endingTimestamp"=1682409600000);
- 创建Hive目标表。
CREATE TABLE kafka_target_tbl(id LONG, name STRING) USING parquet;
- 将Kafka源表中所有数据解析后写入Hive目标表中。
INSERT INTO kafka_target_tbl SELECT cast(get_json_object(cast(value as string), '$.id') as long), get_json_object(cast(value as string), '$.name') FROM kafka_src_tbl;
- 查询Kafka导入Hive表中的数据。
返回结果:SELECT * FROM kafka_target_tbl LIMIT 10;
Output: +-----+--------+ | id | name | +-----+--------+ | 1 | name1 | | 2 | name2 | +-----+--------+