使用SLS日志监控服务监控RDS PostgreSQL实例会话

监控实例会话有助于更好地了解当前数据库的状态,进而排查数据库性能问题。本文介绍如何利用pg_stat_activity系统视图、阿里云日志服务(SLS)和云监控(CloudMonitor),实现对RDS PostgreSQL会话级别的监控。通过这种方法,可以对系统的状态和性能进行分析,从而了解数据库在过去一段时间内的运行情况。

背景信息

数据库系统通常是较为庞大的应用,当其负载较高时,往往需要耗费大量的内存、CPU、IO和网络资源。RDS PostgreSQL采用进程模型,每一个会话都对应一个后台进程。对这些会话进行监控,有助于更好地了解当前数据库的状态,进而排查数据库的性能瓶颈。

pg_stat_activity系统视图

pg_stat_activity视图提供了关于系统当前正在运行的所有会话的信息。通过定期查询的方式,将会话信息保留下来,相当于每隔一段时间对系统进行一次快照。这不仅有助于了解系统的状态,还能帮助排查数据库的性能问题。

会话信息参数

pg_stat_activity输出内容中各参数的含义如下所示。

参数名称

数据类型

说明

datid

oid

进程连接的数据库OID。

datname

name

进程连接的数据库名称。

pid

integer

进程ID。

leader_pid

integer

并行处理或者应用工作的领导进程,如果没有适用的领导进程或工作者进程,则该字段为 NULL。

usesysid

oid

登录到该进程的用户OID。

usename

name

登录到该进程的用户名。

application_name

text

连接到该进程的应用程序名称。

client_addr

inet

连接到该进程的客户端IP地址。

client_hostname

text

连接到客户端的主机名,通过反向DNS查找client_addr获得。

client_port

integer

客户端使用的TCP端口号,如果使用Unix套接字,取值为-1。

backend_start

timestamp with time zone

进程的开始时间。

xact_start

timestamp with time zone

进程当前事务的开始时间,如果没有活动事务,则为NULL。

query_start

timestamp with time zone

当前正在进行的查询的开始时间。

state_change

timestamp with time zone

最后一次更改状态的时间。

wait_event_type

text

进程中正在等待的事件类型。

wait_event

text

进程中正在等待的事件名称,若无正在等待的事件,则取值为NULL。

state

text

当前进程的整体状态。

backend_xid

xid

当前进程的顶层事务ID。

backend_xmin

xid

当前进程的最小事务ID。

query_id

bigint

此进程最近一次查询的ID。

query

text

此进程最近一次查询的文本。

backend_type

text

进程的类型。

会话信息采集SQL

信息采集SQL示例如下。其中leader_id字段仅适用于RDS PostgreSQL13及以后的大版本,您可以根据实例的实际版本调整采集SQL。

会话信息采集SQL

SELECT
    (
        CASE
            WHEN leader_pid is NULL THEN pid
            ELSE leader_pid
        END
    ) AS leader_pid,
    (
        CASE
            WHEN state_change <= now() AND state != 'active' THEN extract(
                epoch
                FROM
                    state_change - query_start
            )
            ELSE extract(
                epoch
                FROM
                    now() - query_start
            )
        END
    ) AS query_duration,
    (
        CASE
            WHEN wait_event_type is NULL THEN 'CPU'
            ELSE coalesce(wait_event_type || '.' || wait_event, '')
        END
    ) AS wait_entry,
    query_id,
    (
        CASE
            WHEN state = 'active' THEN 'running'
            ELSE 'finished'
        END
    ) AS execute_state,
    query,
    datname,
    application_name,
    client_hostname,
    query_start
FROM
    pg_stat_activity
WHERE
    usename NOT IN ('aurora', 'replicator')
    AND backend_type IN ('client backend','parallel worker');

前提条件

  • 已创建RAM用户并为其授予如下权限。具体操作,请参见创建RAM用户并完成授权

    • AliyunRDSFullAccess:管理云数据库服务(RDS)的权限。

    • AliyunLogFullAccess:管理日志服务(Log)的权限。

    • AliyunCloudMonitorFullAccess:管理云监控的权限。

  • 已创建日志项目(Project)和日志库(LogStore)。具体操作,请参见开通日志服务

  • 已获取RDS PostgreSQL实例目标数据库的账号和密码。具体操作,请参见账号与权限

操作步骤

  1. 采集pg_stat_activity系统视图的会话信息,并定期将其发送至SLS的LogStore。

    本文结合SLS的Java SDK,提供一个定时采集并发送的示例。

    1. 安装SLS的Java SDK,详情请参见安装Java SDK

    2. 配置环境变量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET,详情请参见配置环境变量

    3. 以Maven项目为例,在pom.xml中导入相关的Maven依赖。

      pom.xml示例

              <dependency>
                  <groupId>com.aliyun.openservices</groupId>
                  <artifactId>aliyun-log</artifactId>
                  <version>0.6.75</version>
              </dependency>
              <dependency>
                  <groupId>org.postgresql</groupId>
                  <artifactId>postgresql</artifactId>
                  <version>42.2.18</version>
              </dependency>
              <dependency>
                  <groupId>com.aliyun</groupId>
                  <artifactId>tea-openapi</artifactId>
                  <version>0.3.2</version>
              </dependency>
              <dependency>
                  <groupId>com.aliyun</groupId>
                  <artifactId>tea-console</artifactId>
                  <version>0.0.1</version>
              </dependency>
              <dependency>
                  <groupId>com.aliyun</groupId>
                  <artifactId>tea-util</artifactId>
                  <version>0.2.21</version>
              </dependency>
              <!-- 添加 Lombok 依赖 -->
              <dependency>
                  <groupId>org.projectlombok</groupId>
                  <artifactId>lombok</artifactId>
                  <version>1.18.4</version> <!-- 确保使用最新版本 -->
                  <scope>provided</scope>
              </dependency>
              <dependency>
                  <groupId>org.projectlombok</groupId>
                  <artifactId>lombok</artifactId>
                  <version>RELEASE</version>
                  <scope>provided</scope>
              </dependency>
              <dependency>
                  <groupId>ch.qos.logback</groupId>
                  <artifactId>logback-classic</artifactId>
                  <version>1.2.3</version>
              </dependency>
              <dependency>
                  <groupId>ch.qos.logback</groupId>
                  <artifactId>logback-core</artifactId>
                  <version>1.2.3</version>
              </dependency>
              <dependency>
                  <groupId>org.slf4j</groupId>
                  <artifactId>slf4j-api</artifactId>
                  <version>1.7.30</version>
              </dependency>
              <dependency>
                  <groupId>com.aliyun</groupId>
                  <artifactId>rds20140815</artifactId>
                  <version>5.0.1</version>
              </dependency>
              <dependency>
                  <groupId>com.aliyun</groupId>
                  <artifactId>alibabacloud-sls20201230</artifactId>
                  <version>4.0.7</version>
              </dependency>
    4. 运行如下示例程序,定期采集实例会话信息,并将其发送至SLS的LogStore。

      PgMonitor.java示例

      package org.example;
      
      import com.aliyun.openservices.log.Client;
      import com.aliyun.openservices.log.common.LogItem;
      import com.aliyun.openservices.log.exception.LogException;
      import com.aliyun.openservices.log.request.PutLogsRequest;
      
      import java.sql.*;
      import java.util.ArrayList;
      import java.util.List;
      import java.util.concurrent.Executors;
      import java.util.concurrent.ScheduledExecutorService;
      import java.util.concurrent.TimeUnit;
      
      public class PgMonitor {
      
          // PostgreSQL 连接信息
          private static final String PG_URL = "<jdbc:postgresql://your-host:5432/mydb>";
          private static final String PG_USER = "<your-user>";
          private static final String PG_PASSWORD = "<your-passwd>";
      
          // 阿里云日志服务信息
          private static final String LOG_ENDPOINT = "<your-sls-endpoint>";
          private static final String LOG_PROJECT = "<your-project>";
          private static final String LOG_STORE = "<your-logStore>";
          private static final String ACCESS_KEY_ID = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
          private static final String ACCESS_KEY_SECRET = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
      
          // 定时任务执行间隔(分钟)
          private static final int INTERVAL_MINUTES = 1;
      
          public static void main(String[] args) {
              ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
              Runnable task = () -> {
                  try {
                      // 从 PostgreSQL 获取活动数据
                      List<LogItem> logItems = fetchPgStatActivity();
      
                      // 将数据发送到阿里云日志服务
                      sendLogsToSLS(logItems);
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
              };
      
              // 每隔 INTERVAL_MINUTES 分钟执行一次任务
              scheduler.scheduleAtFixedRate(task, 0, INTERVAL_MINUTES, TimeUnit.MINUTES);
          }
      
          private static List<LogItem> fetchPgStatActivity() throws SQLException {
              List<LogItem> logItems = new ArrayList<>();
      
              Connection conn = DriverManager.getConnection(PG_URL, PG_USER, PG_PASSWORD);
              Statement stmt = conn.createStatement();
              String query = "SELECT ( CASE WHEN leader_pid is NULL THEN pid ELSE leader_pid END ) as leader_pid, ( CASE WHEN state_change <= now() AND state != 'active' THEN extract( epoch from state_change - query_start ) ELSE extract( epoch from now() - query_start ) END ) AS query_duration, ( CASE WHEN wait_event_type is NULL THEN 'CPU' ELSE coalesce(wait_event_type || '.' || wait_event, '') END ) AS wait_entry, query_id, ( CASE WHEN state = 'active' THEN 'running' ELSE 'finished' END ) AS execute_state, query, datname, application_name, client_hostname, query_start FROM pg_stat_activity WHERE usename NOT IN ('aurora', 'replicator') AND backend_type IN ('client backend','parallel worker')";
              ResultSet rs = stmt.executeQuery(query);
      
              while (rs.next()) {
                  LogItem logItem = new LogItem();
                  logItem.PushBack("leader_pid", rs.getString("leader_pid"));
                  logItem.PushBack("query_duration", rs.getString("query_duration"));
                  logItem.PushBack("wait_entry", rs.getString("wait_entry"));
                  logItem.PushBack("query", rs.getString("query"));
                  logItem.PushBack("datname", rs.getString("datname"));
                  logItem.PushBack("application_name", rs.getString("application_name"));
                  logItem.PushBack("client_hostname", rs.getString("client_hostname"));
                  logItem.PushBack("query_start", rs.getString("query_start"));
                  logItems.add(logItem);
              }
      
              rs.close();
              stmt.close();
              conn.close();
      
              return logItems;
          }
      
          private static void sendLogsToSLS(List<LogItem> logItems) throws LogException {
              Client client = new Client(LOG_ENDPOINT, ACCESS_KEY_ID, ACCESS_KEY_SECRET);
              String topic = "pg_stat_activity";
              String source = "postgresql-monitor";
      
              PutLogsRequest request = new PutLogsRequest(LOG_PROJECT, LOG_STORE, topic, source, logItems);
              client.PutLogs(request);
          }
      }

      部分参数的含义如下,请用实际数值进行替换。

      参数

      描述

      示例

      PG_URL

      数据库实例链接地址串。包含需要监控的数据库名称。

      jdbc:postgresql://pgm-bp1c82mky1avip****.pg.rds.aliyuncs.com:5432/testdb01

      PG_USER

      数据库账号。

      testdbuser

      PG_PASSWORD

      数据库账号密码。

      ****

      LOG_ENDPOINT

      SLS服务入口,详情请参见服务入口

      cn-hangzhou.log.aliyuncs.com

      LOG_PROJECT

      SLS项目(Project)名称。

      rdspg-test

      LOG_STORE

      SLS日志库(Logstore)名称。

      rdspg-sls

      运行成功后,可以在SLS的LogStore中查询到实例会话的日志信息。详情请参见查询和分析日志

      重要

      在SLS的LogStore中创建索引后,才能对日志数据进行查询和分析。

  2. 将SLS日志接入到云监控。

    登录云监控控制台,创建SLS日志的监控指标,将SLS日志接入云监控。SLS日志接入云监控后,您可以创建监控大盘,通过监控大盘查看指定监控指标的监控图表。详情请参见管理SLS日志的监控指标

  3. (可选)通过RDS OpenAPI将慢日志和错误日志发生到SLS。

    通过监控实例会话、慢日志和错误日志信息,可以对系统的状态和性能进行分析,从而排查数据库性能问题。

    本文结合SLS的Java SDK,通过RDS的DescribeSlowLogRecords - 查看慢日志明细DescribeErrorLogs - 查看错误日志接口,定期获取RDS实例的慢查询日志和错误日志,并将其发送至SLS。示例程序如下。

    说明

    您可以通过API帮助文档中的调试按钮,进入该接口在OpenAPI门户的调试地址,查看该接口的安装及使用方法。

    PgCollectLogOpt.java示例

    package org.example;
    
    import com.aliyun.openservices.log.Client;
    import com.aliyun.openservices.log.common.LogItem;
    import com.aliyun.openservices.log.exception.LogException;
    import com.aliyun.openservices.log.request.PutLogsRequest;
    import lombok.extern.slf4j.Slf4j;
    
    import java.time.*;
    import java.time.format.DateTimeFormatter;
    import java.time.temporal.ChronoUnit;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    @Slf4j
    public class PgCollectLogOpt {
        private static final String LOG_ENDPOINT = "your-endpoint";
        private static final String LOG_PROJECT = "your-project";
        private static final String LOG_STORE = "your-log-store";
        private static final String ACCESS_KEY_ID = "your-access-key-id";
        private static final String ACCESS_KEY_SECRET = "your-access-key-secret";
        private static final int INTERVAL_MINUTES = 10;
        private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm'Z'");
        private static volatile String lastEndTime = getCurrentTime();
    
        public static com.aliyun.rds20140815.Client createClient() throws Exception {
            com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config()
                    .setAccessKeyId(ACCESS_KEY_ID)
                    .setAccessKeySecret(ACCESS_KEY_SECRET);
            config.endpoint = "rds.aliyuncs.com";
            return new com.aliyun.rds20140815.Client(config);
        }
    
        public static void main(String[] args) throws Exception {
            ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
            executor.scheduleAtFixedRate(() -> {
                try {
                    collectAndSendLogs();
                } catch (Exception e) {
                    log.error("Error during log collection and sending: ", e);
                }
            }, 0, INTERVAL_MINUTES, TimeUnit.MINUTES);
        }
    
        private static void collectAndSendLogs() throws Exception {
            com.aliyun.rds20140815.Client client = createClient();
            String instanceId = "pgm-bp1nz4ed24u6679d";
            String startTime = lastEndTime;
            String endTime = getNextEndTime(startTime);
    
            try {
                List<LogItem> slowLogs = getLogs(client, instanceId, startTime, endTime, LogType.SLOW);
                if (!slowLogs.isEmpty()) sendLogsToSLS(slowLogs);
    
                List<LogItem> errorLogs = getLogs(client, instanceId, startTime, endTime, LogType.ERROR);
                if (!errorLogs.isEmpty()) sendLogsToSLS(errorLogs);
    
                lastEndTime = endTime;
            } catch (Exception e) {
                log.error("Log collection error: ", e);
            }
        }
    
        private static List<LogItem> getLogs(com.aliyun.rds20140815.Client client, String instanceId, String startTime, String endTime, LogType logType) throws Exception {
            List<LogItem> logItems = new ArrayList<>();
            int pageNumber = 1, totalPage;
    
            do {
                totalPage = fetchAndProcessLogs(client, instanceId, startTime, endTime, logType, pageNumber, logItems);
                pageNumber++;
            } while (pageNumber <= totalPage);
    
            return logItems;
        }
    
        private static int fetchAndProcessLogs(com.aliyun.rds20140815.Client client, String instanceId, String startTime, String endTime, LogType logType, int pageNumber, List<LogItem> logItems) throws Exception {
            if (logType == LogType.SLOW) {
                var request = new com.aliyun.rds20140815.models.DescribeSlowLogRecordsRequest()
                        .setDBInstanceId(instanceId)
                        .setStartTime(startTime)
                        .setEndTime(endTime)
                        .setPageNumber(pageNumber)
                        .setPageSize(100);
                var response = client.describeSlowLogRecordsWithOptions(request, new com.aliyun.teautil.models.RuntimeOptions());
                var items = response.getBody().getItems().getSQLSlowRecord();
                items.forEach(item -> logItems.add(createLogItem(item.getExecutionStartTime(), item.getSQLText(), item.getHostAddress(), item.getDBName(), item.getQueryTimes().toString(), item.getLockTimes().toString())));
                return (int) Math.ceil((double) response.getBody().getTotalRecordCount() / response.getBody().getPageRecordCount());
            } else {
                var request = new com.aliyun.rds20140815.models.DescribeErrorLogsRequest()
                        .setDBInstanceId(instanceId)
                        .setStartTime(startTime)
                        .setEndTime(endTime)
                        .setPageNumber(pageNumber)
                        .setPageSize(100);
                var response = client.describeErrorLogsWithOptions(request, new com.aliyun.teautil.models.RuntimeOptions());
                var items = response.getBody().getItems().getErrorLog();
                items.forEach(item -> logItems.add(createLogItem(item.getCreateTime(), item.getErrorInfo())));
                return (int) Math.ceil((double) response.getBody().getTotalRecordCount() / response.getBody().getPageRecordCount());
            }
        }
    
        private static LogItem createLogItem(String utcTime, String... fields) {
            LogItem logItem = new LogItem();
            String collectTime = convertToBeijingTime(utcTime);
            logItem.PushBack("collectTime", collectTime);
            String[] fieldNames = {"sql", "hostAddress", "dbName", "queryTimes", "lockTimes", "errorInfo"};
    
            for (int i = 0; i < fields.length; i++) {
                logItem.PushBack(fieldNames[i], fields[i]);
            }
    
            return logItem;
        }
    
        private static String convertToBeijingTime(String utcTime) {
            Instant instant = Instant.parse(utcTime);
            ZonedDateTime utcDateTime = instant.atZone(ZoneId.of("UTC"));
            ZonedDateTime beijingDateTime = utcDateTime.withZoneSameInstant(ZoneId.of("Asia/Shanghai"));
            return DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXXX").format(beijingDateTime);
        }
    
        private static void sendLogsToSLS(List<LogItem> logItems) throws LogException {
            Client slsClient = new Client(LOG_ENDPOINT, ACCESS_KEY_ID, ACCESS_KEY_SECRET);
            PutLogsRequest request = new PutLogsRequest(LOG_PROJECT, LOG_STORE, "pg_stat_activity", "postgresql-monitor", logItems);
            slsClient.PutLogs(request);
        }
    
        private static String getCurrentTime() {
            return formatToIsoInstantWithoutMillis(Instant.now().minus(Duration.ofMinutes(10)).atZone(ZoneId.of("UTC")));
        }
    
        private static String getNextEndTime(String startTime) {
            Instant startInstant = Instant.parse(startTime.replace("Z", ":00Z"));
            return formatToIsoInstantWithoutMillis(startInstant.atZone(ZoneId.of("UTC")).plusMinutes(10).plusSeconds(1));
        }
    
        private static String formatToIsoInstantWithoutMillis(ZonedDateTime zdt) {
            return DATE_FORMATTER.format(zdt.truncatedTo(ChronoUnit.MINUTES));
        }
    
        private enum LogType {
            SLOW, ERROR;
        }
    }
    

    部分参数的含义如下,请用实际数值进行替换。

    参数

    描述

    示例

    LOG_ENDPOINT

    SLS服务入口,详情请参见服务入口

    cn-hangzhou.log.aliyuncs.com

    LOG_PROJECT

    SLS项目(Project)名称。

    rdspg-test

    LOG_STORE

    SLS日志库(Logstore)名称。

    rdspg-sls

    instanceId

    数据库实例ID。

    pgm-bp1c82mky1av****

  4. 在云监控中,根据监控的指标创建报警规则,当监控指标达到报警条件时,云监控自动给您发送报警通知。详情请参见创建报警规则