当您需要将某个Prometheus实例的数据导出,进行自定义业务处理时,您可以选择Prometheus数据投递服务。本文介绍如何使用Prometheus数据投递功能将实例数据投递至Kafka并进行消费处理。
前提条件
已接入Prometheus实例,具体操作,请参见:
已部署投递目标云消息队列Kafka版,并创建Topic等资源。具体操作,请参见概述。
已开通事件总线EventBridge服务。具体操作,请参见开通事件总线EventBridge并授权。
注意事项
选择专有网络进行数据投递时,如果Prometheus实例所在VPC与目标VPC不在同一个,您需要保证目标VPC内的vSwitch的IP已加入Prometheus实例的白名单中,否则会导致网络不通。vSwitch的网段信息可以在专有网络控制台的交换机详情页面获取。
支持数据投递的数据源列表。
实例类型
说明
Prometheus for 云服务
除cloud-product-prometheus名称开头的免费实例
Prometheus for 容器服务
无
Prometheus for 应用监控
无
Prometheus for Flink Serverless
无
Prometheus for Kubernetes
无
通用
除通过OpenTelemetry地址上报上来的数据
步骤一:创建投递任务
在左侧导航栏,单击数据投递。
在数据投递页面,单击顶部菜单栏选择目标地域,然后单击新建任务。
在对话框中输入任务名称和任务描述后,单击确定。
在任务编辑页面,配置数据源和投递目标。
单击+ 添加数据源,配置以下参数,然后单击确定。
配置项
说明
示例
Prometheus实例
被投递的Prometheus数据源。
c78cb8273c02*****
数据过滤
设置需要过滤的指标标签,支持正则表达式。多个条件需要换行,条件需要同时满足,才会投递。
__name__=AliyunEcs_CPUUtilization|AliyunEcs_memory_usedutilization regionId=cn-hangzhou id=i-2ze0mxp.*
单击添加目标,选择目标类型为阿里云消息队列Kafka版,请按照表单所需填写其余信息,然后单击确定。
配置完成后,在任务编辑页面,单击确定,然后单击保存创建投递任务。
步骤二:查看Prometheus监控数据
为减轻投递目标的压力,投递至Kafka的Prometheus监控数据为经过Snappy标准化压缩后的JsonArray数据。更多信息,请参见Snappy压缩格式。
方式一:通过控制台查看
在概览页面的资源分布区域,选择地域。
在实例列表页面,单击目标实例名称。
在左侧导航栏,单击Topic 管理,然后单击目标Topic操作列的详情,在云监控或消息查询页签查看您导入的数据。
方式二:通过客户端查看
初始化Kafka客户端,请参见单Consumer订阅消息。
在
KafkaConsumerDemo.java
文件中添加以下代码。以下为初始化Kafka客户端后,消费数据并使用Snappy解压的示例:public static void main(String[] args) { // 请先初始化kafka consumer while (true){ try { ConsumerRecords<String, byte[]> records = consumer.poll(1000); //必须在下次poll之前消费完这些数据, 且总耗时不得超过SESSION_TIMEOUT_MS_CONFIG。 //建议开一个单独的线程池来消费消息,然后异步返回结果。 for (ConsumerRecord<String, byte[]> record : records) { byte[] compressedData = record.value(); byte[] data = Snappy.uncompress(compressedData); System.out.println(new String(data)); } } catch (Exception e) { try { Thread.sleep(1000); } catch (Throwable ignore) { } e.printStackTrace(); } } }
编译并运行
KafkaConsumerDemo.java
文件,您可看到以下JSON格式的指标数据输出。[{ "instance": "*****", "pod": "*****", "rejected": "false", "type": "validate", "pod_name": "*****", "endpoint": "http-metrics", "__name__": "apiserver_admission_controller_admission_duration_seconds_bucket", "service": "kubernetes", "name": "*****", "namespace": "default", "le": "2.5", "job": "apiserver", "operation": "UPDATE", "value": "675.0", "timestamp": "1698732988354" }, { "instance": "*****", "pod": "*****", "rejected": "false", "type": "validate", "pod_name": "*****", "endpoint": "http-metrics", "__name__": "apiserver_admission_controller_admission_duration_seconds_bucket", "service": "kubernetes", "name": "*****", "namespace": "default", "le": "+Inf", "job": "apiserver", "operation": "UPDATE", "value": "675.0", "timestamp": "1698732988354" }, { "instance": "*****", "pod": "*****", "rejected": "false", "type": "validate", "pod_name": "*****", "endpoint": "http-metrics", "__name__": "apiserver_admission_controller_admission_duration_seconds_bucket", "service": "kubernetes", "name": "*****", "namespace": "default", "le": "0.005", "job": "apiserver", "operation": "UPDATE", "value": "1037.0", "timestamp": "1698732988519" }, { "instance": "*****", "pod": "*****", "rejected": "false", "type": "validate", "pod_name": "*****", "endpoint": "http-metrics", "__name__": "apiserver_admission_controller_admission_duration_seconds_bucket", "service": "kubernetes", "name": "*****", "namespace": "default", "le": "0.025", "job": "apiserver", "operation": "UPDATE", "value": "1037.0", "timestamp": "1698732988519" }]