package org.example;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.LogItem;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.request.PutLogsRequest;
import lombok.extern.slf4j.Slf4j;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Slf4j
public class PgCollectLogOpt {
private static final String LOG_ENDPOINT = "your-endpoint";
private static final String LOG_PROJECT = "your-project";
private static final String LOG_STORE = "your-log-store";
private static final String ACCESS_KEY_ID = "your-access-key-id";
private static final String ACCESS_KEY_SECRET = "your-access-key-secret";
private static final int INTERVAL_MINUTES = 10;
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm'Z'");
private static volatile String lastEndTime = getCurrentTime();
public static com.aliyun.rds20140815.Client createClient() throws Exception {
com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config()
.setAccessKeyId(ACCESS_KEY_ID)
.setAccessKeySecret(ACCESS_KEY_SECRET);
config.endpoint = "rds.aliyuncs.com";
return new com.aliyun.rds20140815.Client(config);
}
public static void main(String[] args) throws Exception {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleAtFixedRate(() -> {
try {
collectAndSendLogs();
} catch (Exception e) {
log.error("Error during log collection and sending: ", e);
}
}, 0, INTERVAL_MINUTES, TimeUnit.MINUTES);
}
private static void collectAndSendLogs() throws Exception {
com.aliyun.rds20140815.Client client = createClient();
String instanceId = "pgm-bp1nz4ed24u6679d";
String startTime = lastEndTime;
String endTime = getNextEndTime(startTime);
try {
List<LogItem> slowLogs = getLogs(client, instanceId, startTime, endTime, LogType.SLOW);
if (!slowLogs.isEmpty()) sendLogsToSLS(slowLogs);
List<LogItem> errorLogs = getLogs(client, instanceId, startTime, endTime, LogType.ERROR);
if (!errorLogs.isEmpty()) sendLogsToSLS(errorLogs);
lastEndTime = endTime;
} catch (Exception e) {
log.error("Log collection error: ", e);
}
}
private static List<LogItem> getLogs(com.aliyun.rds20140815.Client client, String instanceId, String startTime, String endTime, LogType logType) throws Exception {
List<LogItem> logItems = new ArrayList<>();
int pageNumber = 1, totalPage;
do {
totalPage = fetchAndProcessLogs(client, instanceId, startTime, endTime, logType, pageNumber, logItems);
pageNumber++;
} while (pageNumber <= totalPage);
return logItems;
}
private static int fetchAndProcessLogs(com.aliyun.rds20140815.Client client, String instanceId, String startTime, String endTime, LogType logType, int pageNumber, List<LogItem> logItems) throws Exception {
if (logType == LogType.SLOW) {
var request = new com.aliyun.rds20140815.models.DescribeSlowLogRecordsRequest()
.setDBInstanceId(instanceId)
.setStartTime(startTime)
.setEndTime(endTime)
.setPageNumber(pageNumber)
.setPageSize(100);
var response = client.describeSlowLogRecordsWithOptions(request, new com.aliyun.teautil.models.RuntimeOptions());
var items = response.getBody().getItems().getSQLSlowRecord();
items.forEach(item -> logItems.add(createLogItem(item.getExecutionStartTime(), item.getSQLText(), item.getHostAddress(), item.getDBName(), item.getQueryTimes().toString(), item.getLockTimes().toString())));
return (int) Math.ceil((double) response.getBody().getTotalRecordCount() / response.getBody().getPageRecordCount());
} else {
var request = new com.aliyun.rds20140815.models.DescribeErrorLogsRequest()
.setDBInstanceId(instanceId)
.setStartTime(startTime)
.setEndTime(endTime)
.setPageNumber(pageNumber)
.setPageSize(100);
var response = client.describeErrorLogsWithOptions(request, new com.aliyun.teautil.models.RuntimeOptions());
var items = response.getBody().getItems().getErrorLog();
items.forEach(item -> logItems.add(createLogItem(item.getCreateTime(), item.getErrorInfo())));
return (int) Math.ceil((double) response.getBody().getTotalRecordCount() / response.getBody().getPageRecordCount());
}
}
private static LogItem createLogItem(String utcTime, String... fields) {
LogItem logItem = new LogItem();
String collectTime = convertToBeijingTime(utcTime);
logItem.PushBack("collectTime", collectTime);
String[] fieldNames = {"sql", "hostAddress", "dbName", "queryTimes", "lockTimes", "errorInfo"};
for (int i = 0; i < fields.length; i++) {
logItem.PushBack(fieldNames[i], fields[i]);
}
return logItem;
}
private static String convertToBeijingTime(String utcTime) {
Instant instant = Instant.parse(utcTime);
ZonedDateTime utcDateTime = instant.atZone(ZoneId.of("UTC"));
ZonedDateTime beijingDateTime = utcDateTime.withZoneSameInstant(ZoneId.of("Asia/Shanghai"));
return DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXXX").format(beijingDateTime);
}
private static void sendLogsToSLS(List<LogItem> logItems) throws LogException {
Client slsClient = new Client(LOG_ENDPOINT, ACCESS_KEY_ID, ACCESS_KEY_SECRET);
PutLogsRequest request = new PutLogsRequest(LOG_PROJECT, LOG_STORE, "pg_stat_activity", "postgresql-monitor", logItems);
slsClient.PutLogs(request);
}
private static String getCurrentTime() {
return formatToIsoInstantWithoutMillis(Instant.now().minus(Duration.ofMinutes(10)).atZone(ZoneId.of("UTC")));
}
private static String getNextEndTime(String startTime) {
Instant startInstant = Instant.parse(startTime.replace("Z", ":00Z"));
return formatToIsoInstantWithoutMillis(startInstant.atZone(ZoneId.of("UTC")).plusMinutes(10).plusSeconds(1));
}
private static String formatToIsoInstantWithoutMillis(ZonedDateTime zdt) {
return DATE_FORMATTER.format(zdt.truncatedTo(ChronoUnit.MINUTES));
}
private enum LogType {
SLOW, ERROR;
}
}