将Prometheus数据投递至Kafka

当您需要将某个Prometheus实例的数据导出,进行自定义业务处理时,您可以选择Prometheus数据投递服务。本文介绍如何使用Prometheus数据投递功能将实例数据投递至Kafka并进行消费处理。

前提条件

注意事项

  • 选择专有网络进行数据投递时,如果Prometheus实例所在VPC与目标VPC不在同一个,您需要保证目标VPC内的vSwitchIP已加入Prometheus实例的白名单中,否则会导致网络不通。vSwitch的网段信息可以在专有网络控制台的交换机详情页面获取。444.jpg

  • 支持数据投递的数据源列表。

    实例类型

    说明

    Prometheus for 云服务

    cloud-product-prometheus名称开头的免费实例

    Prometheus for 容器服务

    Prometheus for 应用监控

    Prometheus for Flink Serverless

    Prometheus for Kubernetes

    通用

    除通过OpenTelemetry地址上报上来的数据

步骤一:创建投递任务

  1. 登录ARMS控制台

  2. 在左侧导航栏选择Prometheus监控 > 数据投递,进入可观测监控Prometheus版的数据投递页面。

  3. 数据投递页面,单击顶部菜单栏选择目标地域,然后单击新建任务

  4. 在对话框中输入任务名称任务描述后,单击确定

  5. 任务编辑页面,配置数据源和投递目标。

    1. 单击+ 添加数据源,配置以下参数,然后单击确定

      配置项

      说明

      示例

      Prometheus实例

      被投递的Prometheus数据源。

      c78cb8273c02*****

      数据过滤

      设置需要过滤的指标标签,支持正则表达式。多个条件需要换行,条件需要同时满足,才会投递。

      __name__=AliyunEcs_CPUUtilization|AliyunEcs_memory_usedutilization
      regionId=cn-hangzhou
      id=i-2ze0mxp.*
    2. 单击添加目标,选择目标类型阿里云消息队列Kafka,请按照表单所需填写其余信息,然后单击确定

  6. 配置完成后,在任务编辑页面,单击确定,然后单击保存创建投递任务。

步骤二:查看Prometheus监控数据

说明

为减轻投递目标的压力,投递至KafkaPrometheus监控数据为经过Snappy标准化压缩后的JsonArray数据。更多信息,请参见Snappy压缩格式

方式一:通过控制台查看

  1. 登录云消息队列 Kafka 版控制台

  2. 概览页面的资源分布区域,选择地域。

  3. 实例列表页面,单击目标实例名称。

  4. 在左侧导航栏,单击Topic 管理,然后单击目标Topic操作列的详情,在云监控消息查询页签查看您导入的数据。

    image

方式二:通过客户端查看

  1. 初始化Kafka客户端,请参见Consumer订阅消息

  2. KafkaConsumerDemo.java文件中添加以下代码。以下为初始化Kafka客户端后,消费数据并使用Snappy解压的示例:

    public static void main(String[] args) {
    
            // 请先初始化kafka consumer
            
            while (true){
                try {
                    ConsumerRecords<String, byte[]> records = consumer.poll(1000);
                    //必须在下次poll之前消费完这些数据, 且总耗时不得超过SESSION_TIMEOUT_MS_CONFIG。
                    //建议开一个单独的线程池来消费消息,然后异步返回结果。
                    for (ConsumerRecord<String, byte[]> record : records) {
                        byte[] compressedData = record.value();
                        byte[] data = Snappy.uncompress(compressedData);
                        
                        System.out.println(new String(data));
                    }
                } catch (Exception e) {
                    try {
                        Thread.sleep(1000);
                    } catch (Throwable ignore) {
    
                    }
    
                    e.printStackTrace();
                }
            }
    }
  3. 编译并运行KafkaConsumerDemo.java文件,您可看到以下JSON格式的指标数据输出。

    [{
    	"instance": "*****",
    	"pod": "*****",
    	"rejected": "false",
    	"type": "validate",
    	"pod_name": "*****",
    	"endpoint": "http-metrics",
    	"__name__": "apiserver_admission_controller_admission_duration_seconds_bucket",
    	"service": "kubernetes",
    	"name": "*****",
    	"namespace": "default",
    	"le": "2.5",
    	"job": "apiserver",
    	"operation": "UPDATE",
    	"value": "675.0",
    	"timestamp": "1698732988354"
    }, {
    	"instance": "*****",
    	"pod": "*****",
    	"rejected": "false",
    	"type": "validate",
    	"pod_name": "*****",
    	"endpoint": "http-metrics",
    	"__name__": "apiserver_admission_controller_admission_duration_seconds_bucket",
    	"service": "kubernetes",
    	"name": "*****",
    	"namespace": "default",
    	"le": "+Inf",
    	"job": "apiserver",
    	"operation": "UPDATE",
    	"value": "675.0",
    	"timestamp": "1698732988354"
    }, {
    	"instance": "*****",
    	"pod": "*****",
    	"rejected": "false",
    	"type": "validate",
    	"pod_name": "*****",
    	"endpoint": "http-metrics",
    	"__name__": "apiserver_admission_controller_admission_duration_seconds_bucket",
    	"service": "kubernetes",
    	"name": "*****",
    	"namespace": "default",
    	"le": "0.005",
    	"job": "apiserver",
    	"operation": "UPDATE",
    	"value": "1037.0",
    	"timestamp": "1698732988519"
    }, {
    	"instance": "*****",
    	"pod": "*****",
    	"rejected": "false",
    	"type": "validate",
    	"pod_name": "*****",
    	"endpoint": "http-metrics",
    	"__name__": "apiserver_admission_controller_admission_duration_seconds_bucket",
    	"service": "kubernetes",
    	"name": "*****",
    	"namespace": "default",
    	"le": "0.025",
    	"job": "apiserver",
    	"operation": "UPDATE",
    	"value": "1037.0",
    	"timestamp": "1698732988519"
    }]