Tair Serverless KV客户端限流处理

当Tair Serverless KV实例遇到流量突增时,后台会自动扩容至原峰值的两倍。默认情况下,扩容期间超过原峰值的那部分请求会排队等待(与开源Redis行为一致)。如果希望被限流的请求直接返回错误,可将参数return-err-when-throttle设置为yes,此时被限流的请求将返回THROTTLED报错。对于该报错,您可以选择重试请求或放弃请求,下文将展示重试请求的代码示例。

Jedis

  1. 引入Jedis依赖。

    <!-- 以5.2.0版本为例 -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>5.2.0</version>
    </dependency>
  2. 代码示例。

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPoolConfig;
    import redis.clients.jedis.exceptions.JedisException;
    
    /**
     * Demonstrates retrying Jedis commands that fail with a THROTTLED error.
     *
     * <p>Four threads issue SET commands through one shared {@link JedisPool}. A command whose
     * exception message contains "THROTTLED" is retried with exponential backoff; any other
     * {@link JedisException} is rethrown immediately.
     */
    public class JedisThrottledTest {
        private static final Logger logger = LoggerFactory.getLogger(JedisThrottledTest.class);

        private static final int MAX_RETRY = 10; // max attempts per command, including the first one

        /** Substring that marks a throttling error in the exception message. */
        private static final String THROTTLED_MARKER = "THROTTLED";

        /** Number of threads that generate load concurrently. */
        private static final int WORKER_THREADS = 4;

        /**
         * Entry point.
         *
         * @param args {@code <host> <port> <password>} of the instance to connect to
         */
        public static void main(String[] args) {
            if (args.length < 3) {
                System.out.println("Usage: java -jar JedisThrottledTest.jar <host> <port> <password>");
                return;
            }

            String host = args[0];
            int port = Integer.parseInt(args[1]);
            String password = args[2];

            JedisPoolConfig poolConfig = new JedisPoolConfig();
            poolConfig.setMaxTotal(32);
            poolConfig.setMaxIdle(32);
            poolConfig.setMinIdle(16);

            // The pool is shared by all worker threads and stays open for the life of the process.
            JedisPool jedisPool = new JedisPool(poolConfig, host, port, 3000, password);
            for (int t = 0; t < WORKER_THREADS; t++) {
                new Thread(() -> {
                    // An exception escaping executeWithRetry terminates this worker thread.
                    for (int i = 0; i < Integer.MAX_VALUE; i++) {
                        executeWithRetry(jedisPool, "key" + i, "value" + i);
                    }
                }).start();
            }
        }

        /**
         * Executes {@code SET key value}, retrying while the server reports throttling.
         *
         * @param jedisPool pool to borrow a connection from for every attempt
         * @param key       key to write
         * @param value     value to write
         * @throws JedisException   if the error is not a throttling error, or if the command is
         *                          still throttled after {@link #MAX_RETRY} attempts
         * @throws RuntimeException if the thread is interrupted while waiting between attempts
         */
        private static void executeWithRetry(JedisPool jedisPool, String key, String value) {
            int retryCount = 0;
            while (retryCount < MAX_RETRY) {
                // try-with-resources closes the borrowed connection before the catch block runs,
                // so no connection is held while this thread sleeps.
                try (Jedis jedis = jedisPool.getResource()) {
                    jedis.set(key, value);
                    return;
                } catch (JedisException e) {
                    String message = e.getMessage();
                    // A null message cannot be a throttling error; rethrow the original exception
                    // instead of failing with a NullPointerException.
                    if (message == null || !message.contains(THROTTLED_MARKER)) {
                        throw e;
                    }
                    logger.info("Throttled error occurred (attempt {}): {}", retryCount, message);
                    retryCount++;
                    if (retryCount >= MAX_RETRY) {
                        logger.info("Max retry attempts reached.");
                        throw e;
                    }
                    sleepBeforeRetry(retryCount);
                }
            }
        }

        /**
         * Sleeps for 2^retryCount seconds before the next attempt.
         *
         * @param retryCount number of attempts that have failed so far (1 to MAX_RETRY - 1)
         */
        private static void sleepBeforeRetry(int retryCount) {
            long sleepMillis = (1L << retryCount) * 1000L;
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException ie) {
                // Restore the interrupt flag so code further up the stack can observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException("Thread interrupted during retry delay", ie);
            }
        }
    }

Valkey-java

  1. 引入Valkey-java依赖。

    重要

    必须使用 valkey-java 5.4.0 及以上版本。

    <dependency>
        <groupId>io.valkey</groupId>
        <artifactId>valkey-java</artifactId>
        <version>5.4.0</version>
    </dependency>
  2. 代码示例。

    import java.time.Duration;
    
    import io.valkey.DefaultJedisClientConfig;
    import io.valkey.ExceptionHandler;
    import io.valkey.HostAndPort;
    import io.valkey.UnifiedJedis;
    import io.valkey.providers.PooledConnectionProvider;
    import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * Demonstrates handling THROTTLED errors with a valkey-java {@link ExceptionHandler}.
     *
     * <p>A callback that sleeps with exponential backoff is registered for error messages
     * containing "THROTTLED", and four threads then issue SET commands through one shared
     * {@link UnifiedJedis}.
     */
    public class ThrottledTest {
        private static final Logger logger = LoggerFactory.getLogger(ThrottledTest.class);

        /** Number of threads that generate load concurrently. */
        private static final int WORKER_THREADS = 4;

        /**
         * Error callback that sleeps for an exponentially growing delay, capped at 512 seconds.
         *
         * <p>The attempt counter belongs to the callback instance and is never reset, so once
         * the cap is reached every later invocation sleeps for the maximum delay.
         * NOTE(review): main registers a single instance while several threads issue commands;
         * whether the library calls onError concurrently is not visible here. A lost increment
         * would only shorten one delay.
         */
        static class ExponentialBackoffCallback implements ExceptionHandler.ErrorCallback {
            /** Largest exponent used for the delay: 2^9 = 512 seconds. */
            private static final int MAX_BACKOFF_EXPONENT = 9;

            private int attempt = 0;

            /**
             * Sleeps for 2^attempt seconds (at most 2^MAX_BACKOFF_EXPONENT) and then advances
             * the attempt counter.
             *
             * @param errorMessage error message that triggered this callback
             */
            @Override
            public void onError(String errorMessage) {
                // Capping the exponent and using long arithmetic keeps the delay positive; an
                // uncapped int computation overflows and makes Thread.sleep reject the value.
                long sleepSeconds = 1L << Math.min(attempt, MAX_BACKOFF_EXPONENT);
                try {
                    logger.info("Sleeping for {} seconds before handling: {}", sleepSeconds, errorMessage);
                    Thread.sleep(sleepSeconds * 1000L);
                } catch (InterruptedException ie) {
                    // Restore the interrupt flag rather than silently discarding the interrupt.
                    Thread.currentThread().interrupt();
                }
                if (attempt < MAX_BACKOFF_EXPONENT) {
                    attempt++;
                }
            }
        }

        /**
         * Entry point.
         *
         * @param args {@code <host> <port> <password>} of the instance to connect to
         */
        public static void main(String[] args) {
            if (args.length < 3) {
                System.out.println("Usage: java -jar ThrottledTest.jar <host> <port> <password>");
                return;
            }

            String host = args[0];
            int port = Integer.parseInt(args[1]);
            String password = args[2];

            GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
            poolConfig.setMaxTotal(32);
            poolConfig.setMaxIdle(32);
            poolConfig.setMinIdle(16);

            int maxAttempts = 100; // max attempts to retry
            Duration maxTotalRetriesDuration = Duration.ofSeconds(1000); // max total retries duration
            PooledConnectionProvider provider = new PooledConnectionProvider(new HostAndPort(host, port),
                DefaultJedisClientConfig.builder().password(password).build(), poolConfig);

            // Run the backoff callback for every error whose message contains "THROTTLED".
            ExceptionHandler handler = new ExceptionHandler();
            handler.register(
                message -> message.contains("THROTTLED"),
                new ExponentialBackoffCallback()
            );

            UnifiedJedis unifiedJedis = new UnifiedJedis(provider, maxAttempts, maxTotalRetriesDuration, handler);
            for (int t = 0; t < WORKER_THREADS; t++) { // use 4 threads to generate a high QPS
                new Thread(() -> {
                    for (int i = 0; i < Integer.MAX_VALUE; i++) {
                        try {
                            unifiedJedis.set("" + i, "" + i);
                        } catch (Exception e) {
                            // Log and continue with the next key so the worker keeps running.
                            logger.error("Error occurred {}", e.getMessage());
                        }
                    }
                }).start();
            }
        }
    }

redis-py

本示例基于 redis-py 6.1.1版本。

import sys
import time
import threading
import logging
from redis import Redis, RedisError

# Configure logging so that records at INFO level and above are emitted.
logging.basicConfig(level=logging.INFO)
# Logger used by every function in this script.
logger = logging.getLogger(__name__)

# Maximum number of attempts execute_with_retry makes for one command (first try included).
MAX_RETRY = 10  # max retry attempts

def execute_with_retry(redis_client, key, value):
    """Run ``SET key value``, backing off exponentially while the server throttles.

    Up to ``MAX_RETRY`` attempts are made. After the n-th throttled attempt the
    function sleeps ``2 ** n`` seconds before trying again.

    Raises:
        RedisError: if the error text does not contain "THROTTLED", or if the
            command is still throttled on the last allowed attempt.
    """
    for attempt in range(MAX_RETRY):
        try:
            redis_client.set(key, value)
        except RedisError as err:
            # Anything other than throttling is not retried.
            if "THROTTLED" not in str(err):
                logger.error(f"Non-throttled Redis error: {err}")
                raise err

            logger.info(f"Throttled error occurred (attempt {attempt}): {err}")
            failed_attempts = attempt + 1
            if failed_attempts >= MAX_RETRY:
                logger.info("Max retry attempts reached.")
                raise err
            time.sleep(2 ** failed_attempts)
        else:
            # The command succeeded; nothing more to do.
            return

def worker(redis_client):
    """Write ``key<n>``/``value<n>`` pairs forever, moving to the next n only after success.

    Any exception raised by ``execute_with_retry`` is logged with its traceback,
    and the same key is tried again after a one-second pause.
    """
    counter = 0
    while True:
        try:
            execute_with_retry(redis_client, f"key{counter}", f"value{counter}")
        except Exception as err:
            logger.exception(f"Unexpected error in worker: {err}")
            # Pause so a persistent failure does not turn into a busy loop.
            time.sleep(1)
        else:
            counter += 1

def main():
    """Connect to the instance and generate SET traffic from background threads.

    Reads ``<host> <port> <password>`` from ``sys.argv``, checks the connection
    with ``PING``, starts daemon threads running ``worker`` on one shared client,
    and then blocks until the user presses Ctrl+C.
    """
    if len(sys.argv) < 4:
        print("Usage: python script.py <host> <port> <password>")
        return

    host = sys.argv[1]
    port = int(sys.argv[2])
    password = sys.argv[3]

    redis_client = Redis(
        host=host,
        port=port,
        password=password,
        socket_timeout=3,
        decode_responses=True
    )

    # Test the connection
    try:
        redis_client.ping()
        logger.info("Successfully connected to Redis")
    except RedisError as e:
        logger.error(f"Failed to connect to Redis: {e}")
        return

    # Create and start 10 worker threads that share the client
    worker_count = 10
    for i in range(worker_count):
        thread = threading.Thread(target=worker, args=(redis_client,))
        # Daemon threads do not keep the process alive once main() returns.
        thread.daemon = True
        thread.start()
        logger.info(f"Started worker thread {i}")

    # Keep the main thread running
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Program interrupted. Exiting...")

    # The workers loop forever, so joining them here would block shutdown
    # indefinitely. Returning lets the interpreter exit and end the daemon threads.

# Run main() only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()