本文介绍如何通过Lindorm计算引擎访问Kafka数据。您可以将Kafka实例中的数据加载到Lindorm计算引擎中,结合Lindorm计算引擎的其他数据源数据,完成复杂的数据生产作业。
前提条件
操作步骤
启动Beeline,并访问Lindorm计算引擎服务。具体操作,请参见使用Beeline访问JDBC服务。
登录云消息队列 Kafka 版控制台,获取访问Kafka数据的连接信息。
在实例详情页面的接入点信息区域,获取Kafka实例的域名接入点。
在Topic管理页面,获取Kafka实例的Topic名称。
在Lindorm计算引擎中创建Kafka Topic对应的Hive临时表。
CREATE TEMPORARY TABLE kafka_tbl USING kafka OPTIONS(table_options);
USING kafka
中Kafka为指定数据源。Kafka表无需指定Schema,Lindorm计算引擎提供默认的Kafka Schema,不可修改,Schema信息如下:列名称
数据类型
说明
key
binary
该记录在Kafka中的Key信息。
value
binary
该记录在Kafka中的Value信息。
topic
string
该记录所属的Kafka Topic。
partition
int
该记录所属的Partition。
offset
bigint
该记录在Partition的offset。
timestamp
timestamp
该记录的时间戳。
timestampType
int
该记录时间戳的类型:
0:CreateTime。
1:LogAppendTime。
详细信息,请参见timestampType。
table_options
参数说明:参数
是否必选
说明
示例值
kafka.bootstrap.servers
是
Kafka实例的域名接入点。
alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092
subscribe
是
Kafka实例的Topic名称。
topic_to_read
startingTimestamp
是
待访问Topic数据片段开始时间的时间戳,单位为毫秒。
说明您可以使用Spark SQL unix_timestamp函数将时间转换为UNIX时间戳。
1682406000000
endingTimestamp
是
待访问Topic数据片段结束时间的时间戳,单位为毫秒。
说明您可以使用Spark SQL unix_timestamp函数将时间转换为UNIX时间戳。
1682409600000
startingOffsetsByTimestampStrategy
否
如果Kafka实例中部分Partition中没有数据,需要添加该参数。
latest
创建Kafka表支持多种类型参数,更多参数请参考:Structured Streaming + Kafka Integration Guide。
示例:
CREATE TEMPORARY TABLE kafka_tbl USING kafka OPTIONS( "kafka.bootstrap.servers"="alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092", "subscribe"="topic_to_read", "startingTimestamp"=1682406000000, "endingTimestamp"=1682409600000);
查询Kafka表的数据。
查询表Kafka_tbl中Schema的数据。
DESCRIBE kafka_tbl;
查询表Kafka_tbl中的数据。
SELECT * FROM kafka_tbl LIMIT 10;
使用Spark函数提取Kafka中的数据。例如,查询表Kafka_tbl中Value为
{"content": "kafka record"}
的数据。SELECT get_json_object(cast(value as string), '$.content') FROM kafka_tbl LIMIT 10;
返回结果:
Output: kafka record
(可选)实践:将Kafka数据导入Hive表
如果您有数据分析等相关需求,可以参考以下步骤将Kafka中的数据导入Hive表。
假设域名接入点为kafka_addr:9092,topic名称为topic1的Kafka实例中有两条写入时间在2023-04-25 15:00:00至2023-04-25 16:00:00之间的数据,具体内容为:
{"id": 1, "name": "name1"}
{"id": 2, "name": "name2"}
现在需要将这两条数据写入Hive表中,便于后续进行数据分析。
创建Kafka源表。
CREATE TEMPORARY TABLE kafka_src_tbl USING kafka OPTIONS( "kafka.bootstrap.servers"="kafka_addr:9092", "subscribe"="topic1", "startingTimestamp"=1682406000000, "endingTimestamp"=1682409600000);
参数的详细说明,请参见参数说明。
创建Hive目标表。
CREATE TABLE kafka_target_tbl(id LONG, name STRING) USING parquet;
将Kafka源表中所有数据解析后写入Hive目标表中。
INSERT INTO kafka_target_tbl SELECT cast(get_json_object(cast(value as string), '$.id') as long), get_json_object(cast(value as string), '$.name') FROM kafka_src_tbl;
查询Kafka导入Hive表中的数据。
SELECT * FROM kafka_target_tbl LIMIT 10;
返回结果:
Output: +-----+--------+ | id | name | +-----+--------+ | 1 | name1 | | 2 | name2 | +-----+--------+