管道传输(Pipeline)

当您有批量操作、提高命令执行性能等需求时,您可以使用Redis管道传输(Pipeline,后面称为Pipeline)机制。Pipeline可以将多个命令同时发给服务端,减少网络延迟,并提高性能。云数据库Tair(兼容 Redis)支持原生Redis Pipeline。

Pipeline简介

通常情况下,客户端与Redis服务端通信时采用的是Ping-pong网络交互模式,Ping-pong模式是指客户端(Client)发送一个命令后会等待命令的执行结果,在客户端收到服务器端(Server)返回的结果后,再发送下一个命令,以此类推。

Redis也支持Pipeline模式,不同于Ping-pong模式,Pipeline模式类似流水线的工作模式:客户端发送一个命令后无需等待执行结果,会继续发送其他命令;在全部请求发送完毕后,客户端关闭请求,开始接收响应,收到执行结果后再与之前发送的命令按顺序进行一一匹配。在Pipeline模式的具体实现中,大部分Redis客户端采用批处理的方式,即一次发送多个命令,在接收完所有命令执行结果后再返回给上层业务。

下图为Ping-pong模式与Pipeline模式的网络通信示意图。pipeline

使用Pipeline可通过降低网络往返时延(Round-trip time,简称RTT),减少read()write()的系统调用以及进程上下文切换次数,以提升程序的执行效率与性能。

Pipeline在某些场景下非常有效,例如有多个操作命令需要被迅速提交至服务器端,但用户并不依赖每个操作返回的响应结果,对结果响应也无需立即获得,那么Pipeline就可以用来作为优化性能的批处理工具。

重要

使用Pipeline时客户端将独占与服务器端的连接,此期间将不能进行其他“非Pipeline”类型操作,直至Pipeline被关闭;如果要同时执行其他操作,可以为Pipeline操作单独建立一个连接,将其与常规操作分开。

更多信息,请参见Redis pipeline

注意事项

  • Pipeline不能保证原子性。

    Pipeline模式只是将客户端发送命令的方式改为发送批量命令,而服务端在处理批量命令的数据流时,仍然是解析出多个单命令并按顺序执行,各个命令相互独立,即服务端仍有可能在该过程中执行其他客户端的命令。如需保证原子性,请使用事务或Lua脚本。

  • 若Pipeline执行过程中发生错误,不支持回滚。

    Pipeline没有事务的特性,如待执行命令的前后存在依赖关系,请勿使用Pipeline。

    说明

    某些客户端(例如redis-py)在实现Pipeline时使用事务命令MULTI、EXEC进行伪装,请您在使用过程中关注Pipeline与事务的区别,否则可能会产生报错,关于事务的限制请参见Redis transactions

  • 由于服务端以及部分客户端存在缓存区限制,建议单次Pipeline中不要使用过多的命令。

  • Pipeline的本质为客户端与服务端的交互模式,与服务端的架构无关,因此集群架构代理模式、集群架构直连模式以及读写分离架构实例均支持Pipeline。

    说明

    由于集群架构本身具有一定限制,例如不支持在单个命令中访问跨Slot的Key、当访问到不属于本节点的数据时会产生-MOVED错误等,请在集群架构中使用Pipeline时确保Pipeline内部的命令符合集群架构的可执行条件,具体限制请参见集群架构与读写分离架构实例的命令限制

代码示例

性能对比

如下代码将演示使用Pipeline与不使用Pipeline的性能对比。

package pipeline.kvstore.aliyun.com;
import java.util.Date;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
public class RedisPipelinePerformanceTest {
        static final String host = "xxxxxx.m.cnhza.kvstore.aliyuncs.com";
        static final int port = 6379;
        static final String password = "password";
        public static void main(String[] args) {
            Jedis jedis = new Jedis(host, port);
                //ApsaraDB for Redis的实例密码
                String authString = jedis.auth(password);// password
                if (!authString.equals("OK")) {
                    System.err.println("AUTH Failed: " + authString);
                    jedis.close();
                    return;
                }
                //连续执行多次命令操作
                final int COUNT=5000;
                String key = "KVStore-Tanghan";
                // 1 ---不使用pipeline操作---
                jedis.del(key);//初始化key
                Date ts1 = new Date();
                for (int i = 0; i < COUNT; i++) {
                    //发送一个请求,并接收一个响应(Send Request and  Receive Response)
                    jedis.incr(key);
                }
                Date ts2 = new Date();
                System.out.println("不用Pipeline > value为:"+jedis.get(key)+" > 操作用时:" + (ts2.getTime() - ts1.getTime())+ "ms");
                //2 ----对比使用pipeline操作---
                jedis.del(key);//初始化key
                Pipeline p1 = jedis.pipelined();
                Date ts3 = new Date();
                for (int i = 0; i < COUNT; i++) {
                    //发出请求 Send Request 
                    p1.incr(key);
                }
                //接收响应 Receive Response
                p1.sync();
                Date ts4 = new Date();
                System.out.println("使用Pipeline > value为:"+jedis.get(key)+" > 操作用时:" + (ts4.getTime() - ts3.getTime())+ "ms");
                jedis.close();
        }
    }

在输入了正确的云数据库Tair(兼容 Redis)实例访问地址和密码之后,运行以上Java程序,输出结果如下。从中可以看出使用pipeline的性能要快的多。

不用Pipeline > value为:5000 > 操作用时:5844ms
使用Pipeline > value为:5000 > 操作用时:78ms

响应数据(Response)的处理方式

在Jedis中使用Pipeline时,对于响应数据(Response)的处理有两种方式,详情请参见以下代码示例。

package pipeline.kvstore.aliyun.com;
import java.util.List;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;
    public class PipelineClientTest {
        static final String host = "xxxxxxxx.m.cnhza.kvstore.aliyuncs.com";
        static final int port = 6379;
        static final String password = "password";
        public static void main(String[] args) {
            Jedis jedis = new Jedis(host, port);
                // 实例密码
                String authString = jedis.auth(password);// password
                if (!authString.equals("OK")) {
                    System.err.println("AUTH Failed: " + authString);
                    jedis.close();
                    return;
                }
                String key = "KVStore-Test1";
                jedis.del(key);//初始化
                // -------- 方法1
                Pipeline p1 = jedis.pipelined();
                System.out.println("-----方法1-----");
                for (int i = 0; i < 5; i++) {
                    p1.incr(key);
                    System.out.println("Pipeline发送请求");
                }
                // 发送请求完成,开始接收响应
                System.out.println("发送请求完成,开始接收响应");
                List<Object> responses = p1.syncAndReturnAll();
                if (responses == null || responses.isEmpty()) {
                    jedis.close();
                    throw new RuntimeException("Pipeline error: 没有接收到响应");
                }
                for (Object resp : responses) {
                    System.out.println("Pipeline接收响应Response: " + resp.toString());
                }
                System.out.println();
                //-------- 方法2
                System.out.println("-----方法2-----");
                jedis.del(key);//初始化
                Pipeline p2 = jedis.pipelined();  
                //需要先声明Response
                Response<Long> r1 = p2.incr(key); 
                System.out.println("Pipeline发送请求");
                Response<Long> r2 = p2.incr(key);
                System.out.println("Pipeline发送请求");
                Response<Long> r3 = p2.incr(key);
                System.out.println("Pipeline发送请求");
                Response<Long> r4 = p2.incr(key);  
                System.out.println("Pipeline发送请求");
                Response<Long> r5 = p2.incr(key);
                System.out.println("Pipeline发送请求");
                try{  
                    r1.get();  //此时还未开始接收响应,所以此操作会出错
                }catch(Exception e){  
                    System.out.println(" <<< Pipeline error:还未开始接收响应  >>> ");  
                }  
             // 发送请求完成,开始接收响应
                System.out.println("发送请求完成,开始接收响应");
                p2.sync();  
                System.out.println("Pipeline接收响应Response: " + r1.get());  
                System.out.println("Pipeline接收响应Response: " + r2.get());  
                System.out.println("Pipeline接收响应Response: " + r3.get());
                System.out.println("Pipeline接收响应Response: " + r4.get());
                System.out.println("Pipeline接收响应Response: " + r5.get());
                jedis.close();
            }
    }

在输入了正确的云数据库Tair(兼容 Redis)实例访问地址和密码之后,运行以上Java程序,输出结果如下:

-----方法1-----
Pipeline发送请求
Pipeline发送请求
Pipeline发送请求
Pipeline发送请求
Pipeline发送请求
发送请求完成,开始接收响应
Pipeline接收响应Response: 1
Pipeline接收响应Response: 2
Pipeline接收响应Response: 3
Pipeline接收响应Response: 4
Pipeline接收响应Response: 5
-----方法2-----
Pipeline发送请求
Pipeline发送请求
Pipeline发送请求
Pipeline发送请求
Pipeline发送请求
 <<< Pipeline error:还未开始接收响应  >>> 
发送请求完成,开始接收响应
Pipeline接收响应Response: 1
Pipeline接收响应Response: 2
Pipeline接收响应Response: 3
Pipeline接收响应Response: 4
Pipeline接收响应Response: 5