当Tair Serverless KV实例遇到流量突增时,后台会自动扩容至原峰值的两倍。扩容期间,超过原峰值的那部分请求会收到THROTTLED报错。对于此报错,您可以选择重试请求或者放弃请求,下面展示重试请求的代码示例。
Jedis
引入Jedis依赖。
<!-- 以5.2.0版本为例 --> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>5.2.0</version> </dependency>
代码示例。
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; import redis.clients.jedis.exceptions.JedisException; public class JedisThrottledTest { private static final Logger logger = LoggerFactory.getLogger(JedisThrottledTest.class); private static final int MAX_RETRY = 10; // max retry attempts public static void main(String[] args) { if (args.length < 3) { System.out.println("Usage: java -jar JedisThrottledTest.jar <host> <port> <password>"); return; } String host = args[0]; int port = Integer.parseInt(args[1]); String password = args[2]; JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(32); poolConfig.setMaxIdle(32); poolConfig.setMinIdle(16); JedisPool jedisPool = new JedisPool(poolConfig, host, port, 3000, password); for (int i = 0; i < 4; i++) { new Thread(new Runnable() { @Override public void run() { for (int i = 0; i < Integer.MAX_VALUE; i++) { executeWithRetry(jedisPool, "key" + i, "value" + i); } } }).start(); } } private static void executeWithRetry(JedisPool jedisPool, String key, String value) { int retryCount = 0; while (retryCount < MAX_RETRY) { try (Jedis jedis = jedisPool.getResource()) { jedis.set(key, value); break; } catch (JedisException e) { if (e.getMessage().contains("THROTTLED")) { logger.info("Throttled error occurred (attempt " + retryCount + "): " + e.getMessage()); retryCount++; if (retryCount >= MAX_RETRY) { logger.info("Max retry attempts reached."); throw e; } try { int sleepTime = (int)Math.pow(2, retryCount); Thread.sleep(sleepTime * 1000); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new RuntimeException("Thread interrupted during retry delay", ie); } } else { throw e; } } } } }
Valkey-java
引入Valkey-java依赖。
重要:必须使用 valkey-java 5.4.0 及以上版本。
<dependency> <groupId>io.valkey</groupId> <artifactId>valkey-java</artifactId> <version>5.4.0</version> </dependency>
代码示例。
import java.time.Duration; import io.valkey.DefaultJedisClientConfig; import io.valkey.ExceptionHandler; import io.valkey.HostAndPort; import io.valkey.UnifiedJedis; import io.valkey.providers.PooledConnectionProvider; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ThrottledTest { private static final Logger logger = LoggerFactory.getLogger(ThrottledTest.class); /** * exponential backoff. */ static class ExponentialBackoffCallback implements ExceptionHandler.ErrorCallback { private int attempt = 0; @Override public void onError(String errorMessage) { int sleepTime = (int)Math.pow(2, attempt); try { logger.info("Sleeping for " + sleepTime + " seconds before handling: " + errorMessage); Thread.sleep(sleepTime * 1000); } catch (InterruptedException ie) { // ignore error } attempt++; } } public static void main(String[] args) { if (args.length < 3) { System.out.println("Usage: java -jar ThrottledTest.jar <host> <port> <password>"); return; } String host = args[0]; int port = Integer.parseInt(args[1]); String password = args[2]; GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); poolConfig.setMaxTotal(32); poolConfig.setMaxIdle(32); poolConfig.setMinIdle(16); int maxAttempts = 100; // max attempts to retry Duration maxTotalRetriesDuration = Duration.ofSeconds(1000); // max total retries duration PooledConnectionProvider provider = new PooledConnectionProvider(new HostAndPort(host, port), DefaultJedisClientConfig.builder().password(password).build(), poolConfig); ExceptionHandler handler = new ExceptionHandler(); handler.register( message -> message.contains("THROTTLED"), new ExponentialBackoffCallback() ); UnifiedJedis unifiedJedis = new UnifiedJedis(provider, maxAttempts, maxTotalRetriesDuration, handler); for (int i = 0; i < 4; i++) { // use 4 thread to make high qps new Thread(new Runnable() { @Override public void run() { for (int i = 0; i < Integer.MAX_VALUE; i++) 
{ try { unifiedJedis.set("" + i, "" + i); } catch (Exception e) { logger.error("Error occurred {}", e.getMessage()); } } } }).start(); } } }
redis-py
本示例基于 redis-py 6.1.1 版本。
import sys
import time
import threading
import logging
from redis import Redis, RedisError
# Configure logging at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Attempt limit used by execute_with_retry: once this many THROTTLED failures
# have occurred for one command, the error is re-raised.
MAX_RETRY = 10 # max retry attempts
def execute_with_retry(redis_client, key, value):
    """Set ``key`` to ``value``, retrying with backoff on THROTTLED errors.

    Each throttled failure is followed by a sleep of ``2 ** n`` seconds,
    where ``n`` is the number of failures so far, before the next attempt.

    Args:
        redis_client: Client object exposing ``set(key, value)``.
        key: Key to write.
        value: Value to store under ``key``.

    Raises:
        RedisError: Immediately for any error whose text does not contain
            "THROTTLED", or after ``MAX_RETRY`` throttled attempts.
    """
    for attempt in range(MAX_RETRY):
        try:
            redis_client.set(key, value)
            return  # Success: nothing left to retry.
        except RedisError as e:
            if "THROTTLED" not in str(e):
                logger.error("Non-throttled Redis error: %s", e)
                # Bare raise keeps the original traceback intact.
                raise
            logger.info("Throttled error occurred (attempt %d): %s", attempt, e)
            failures = attempt + 1
            if failures >= MAX_RETRY:
                logger.info("Max retry attempts reached.")
                raise
            time.sleep(2 ** failures)
def worker(redis_client):
    """Write ``key<n>``/``value<n>`` pairs forever through ``redis_client``.

    The sequence number only advances after a successful write, so a failed
    pair is attempted again after a one-second pause.
    """
    seq = 0
    while True:
        try:
            execute_with_retry(redis_client, f"key{seq}", f"value{seq}")
        except Exception as e:
            logger.exception(f"Unexpected error in worker: {e}")
            # Pause so a persistent failure does not spin the loop.
            time.sleep(1)
        else:
            seq += 1
def main():
    """Connect to the instance and run the writer threads until interrupted.

    Expects ``<host> <port> <password>`` on the command line. Prints a usage
    message and returns when arguments are missing or the port is not an
    integer, and returns early when the initial PING fails.
    """
    usage = "Usage: python script.py <host> <port> <password>"
    if len(sys.argv) < 4:
        print(usage)
        return

    host = sys.argv[1]
    try:
        port = int(sys.argv[2])
    except ValueError:
        # A non-numeric port previously surfaced as an unhandled traceback.
        print(usage)
        return
    password = sys.argv[3]

    redis_client = Redis(
        host=host,
        port=port,
        password=password,
        socket_timeout=3,
        decode_responses=True,
    )

    # Test the connection before starting any load.
    try:
        redis_client.ping()
        logger.info("Successfully connected to Redis")
    except RedisError as e:
        logger.error(f"Failed to connect to Redis: {e}")
        return

    # Create and start 4 worker threads, matching the thread count used by
    # the Java examples. They are daemon threads so they cannot keep the
    # process alive once the main thread returns.
    worker_count = 4
    for i in range(worker_count):
        thread = threading.Thread(target=worker, args=(redis_client,), daemon=True)
        thread.start()
        logger.info(f"Started worker thread {i}")

    # Keep the main thread alive until the user presses Ctrl+C.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Program interrupted. Exiting...")
    # The workers loop forever, so joining them here would block shutdown
    # indefinitely; returning lets the interpreter exit and stop the daemons.


if __name__ == "__main__":
    main()
该文章对您有帮助吗?