当您需要将某个Prometheus实例的数据导出,进行自定义业务处理时,您可以选择Prometheus数据投递服务。本文介绍如何使用Prometheus数据投递功能将实例数据投递至Kafka并进行消费处理。
前提条件
已接入Prometheus实例。具体操作,请参见快速入门。
已部署投递目标云消息队列Kafka版,并创建Topic等资源。具体操作,请参见快速入门。
已开通事件总线EventBridge服务。具体操作,请参见开通事件总线EventBridge并授权。
步骤一:创建投递任务
登录ARMS控制台。
在左侧导航栏,选择Prometheus监控,然后单击数据投递(beta),进入可观测监控Prometheus版的数据投递页面。
在数据投递页面,单击顶部菜单栏选择目标地域,然后单击新建任务。
在对话框中输入任务名称和任务描述后,单击确定。
在任务编辑页面,配置数据源和投递目标。
单击+ 添加数据源,配置以下参数,然后单击确定。
配置项
说明
示例
Prometheus实例
被投递的Prometheus数据源。
c78cb8273c02*****
数据过滤
根据白名单或黑名单模式填入需要过滤的指标,通过Label筛选投递数据。
支持正则表达式,多个条件换行,多个条件为且(&&)的关系。
__name__=rpc.* job=apiserver instance=192.*
攒批设置
指数据量超过多大投递一次数据或者超过多久投递一次数据。设置为空时使用默认值,推荐使用默认值。
100 MB
单击添加目标,选择目标类型为阿里云消息队列Kafka版,请按照表单所需填写其余信息,然后单击确定。
配置完成后,在任务编辑页面,单击确定,然后单击保存创建投递任务。
步骤二:查看Prometheus监控数据
为减轻投递目标的压力,投递至Kafka的Prometheus监控数据为经过Snappy标准化压缩后的JsonArray数据。更多信息,请参见Snappy压缩格式。
方式一:通过控制台查看
在概览页面的资源分布区域,选择地域。
在实例列表页面,单击目标实例名称。
在左侧导航栏,单击Topic 管理,然后单击目标Topic操作列的详情,在云监控或消息查询页签查看您导入的数据。
方式二:通过客户端查看
初始化Kafka客户端,请参见单Consumer订阅消息。
在
KafkaConsumerDemo.java
文件中添加以下代码。以下为初始化Kafka客户端后,消费数据并使用Snappy解压的示例:public static void main(String[] args) { // 请先初始化kafka consumer while (true){ try { ConsumerRecords<String, byte[]> records = consumer.poll(1000); //必须在下次poll之前消费完这些数据, 且总耗时不得超过SESSION_TIMEOUT_MS_CONFIG。 //建议开一个单独的线程池来消费消息,然后异步返回结果。 for (ConsumerRecord<String, byte[]> record : records) { byte[] compressedData = record.value(); byte[] data = Snappy.uncompress(compressedData); System.out.println(new String(data)); } } catch (Exception e) { try { Thread.sleep(1000); } catch (Throwable ignore) { } e.printStackTrace(); } } }
编译并运行
KafkaConsumerDemo.java
文件,您可看到以下JSON格式的指标数据输出。[{ "instance": "*****", "pod": "*****", "rejected": "false", "type": "validate", "pod_name": "*****", "endpoint": "http-metrics", "__name__": "apiserver_admission_controller_admission_duration_seconds_bucket", "service": "kubernetes", "name": "*****", "namespace": "default", "le": "2.5", "job": "apiserver", "operation": "UPDATE", "value": "675.0", "timestamp": "1698732988354" }, { "instance": "*****", "pod": "*****", "rejected": "false", "type": "validate", "pod_name": "*****", "endpoint": "http-metrics", "__name__": "apiserver_admission_controller_admission_duration_seconds_bucket", "service": "kubernetes", "name": "*****", "namespace": "default", "le": "+Inf", "job": "apiserver", "operation": "UPDATE", "value": "675.0", "timestamp": "1698732988354" }, { "instance": "*****", "pod": "*****", "rejected": "false", "type": "validate", "pod_name": "*****", "endpoint": "http-metrics", "__name__": "apiserver_admission_controller_admission_duration_seconds_bucket", "service": "kubernetes", "name": "*****", "namespace": "default", "le": "0.005", "job": "apiserver", "operation": "UPDATE", "value": "1037.0", "timestamp": "1698732988519" }, { "instance": "*****", "pod": "*****", "rejected": "false", "type": "validate", "pod_name": "*****", "endpoint": "http-metrics", "__name__": "apiserver_admission_controller_admission_duration_seconds_bucket", "service": "kubernetes", "name": "*****", "namespace": "default", "le": "0.025", "job": "apiserver", "operation": "UPDATE", "value": "1037.0", "timestamp": "1698732988519" }]
- 本页导读 (1)