Redis(Pipeline)

Redis(Pipeline)

 Pipeline:“管道”,和很多设计模式中的“管道”具有同样的概念,pipleline的操作,将明确client与server端的交互,都是“单向的”:你可以将多个command,依次发给server,但在此期间,你将无法获得单个command的响应数据,此后你可以关闭“请求”,然后依次获取每个command的响应结果。

    从简单来说,在IO操作层面,对于client而言,就是一次批量的连续的“write”请求,然后是批量的连续的“read”操作。其实对于底层Socket-IO而言,对于client而言,只不过是多次write,然后一次read的操作;对于server端,input通道上read到数据之后,就会立即被实施,也会和非pipeline一样在output通道上输出执行的结果,只不过此时数据会被阻塞在网络缓冲区上,直到client端开始read或者关闭链接。神秘的面纱已被解开,或许你也能创造一个pipeline的实现。

    非pipleline模式:

    Request---->执行

    ---->Response

    Request---->执行

    ---->Response

    Pipeline模式下:

    Request---->执行,Server将响应结果队列化

    Request---->执行,Server将响应结果队列化

    ---->Response

    ---->Response

    Client端根据Redis的数据协议,将响应结果进行解析,并将结果做类似于“队列化”的操作。

 
Java代码  收藏代码
  1. public void pipeline(){  
  2.         String key = "pipeline-test";  
  3.         String old = jedis.get(key);  
  4.         if(old != null){  
  5.             System.out.println("Key:" + key + ",old value:" + old);  
  6.         }  
  7.         //代码模式1,这种模式是最常见的方式  
  8.         Pipeline p1 = jedis.pipelined();  
  9.         p1.incr(key);  
  10.         System.out.println("Request incr");  
  11.         p1.incr(key);  
  12.         System.out.println("Request incr");  
  13.         //结束pipeline,并开始从相应中获得数据  
  14.         List<Object> responses = p1.syncAndReturnAll();  
  15.         if(responses == null || responses.isEmpty()){  
  16.             throw new RuntimeException("Pipeline error: no response...");  
  17.         }  
  18.         for(Object resp : responses){  
  19.             System.out.println("Response:" + resp.toString());//注意,此处resp的类型为Long  
  20.         }  
  21.         //代码模式2  
  22.         Pipeline p2 = jedis.pipelined();  
  23.         Response<Long> r1 = p2.incr(key);  
  24.         try{  
  25.             r1.get();  
  26.         }catch(Exception e){  
  27.             System.out.println("Error,you cant get() before sync,because IO of response hasn't begin..");  
  28.         }  
  29.         Response<Long> r2 = p2.incr(key);  
  30.         p2.sync();  
  31.         System.out.println("Pipeline,mode 2,--->" + r1.get());  
  32.         System.out.println("Pipeline,mode 2,--->" + r2.get());  
  33.           
  34.     }  
     不过需要明确一下,pipeline和“事务”是两个完全不同的概念,pipeline只是表达“交互”中操作的传递的方向性,pipeline也可以在事务中运行,也可以不在。无论如何,pipeline中发送的每个command都会被server立即执行,如果执行失败,将会在此后的相应中得到信息;也就是pipeline并不是表达“所有command都一起成功”的语义;但是如果pipeline的操作被封装在事务中,那么将有事务来确保操作的成功与失败(事实上,Redis的事务,仍然不像严格意义上的事务,稍后介绍)。
Java代码  收藏代码
  1. public void txPipeline(){  
  2.     String key = "pipeline-test";  
  3.     String old = jedis.get(key);  
  4.     if(old != null){  
  5.         System.out.println("Key:" + key + ",old value:" + old);  
  6.     }  
  7.     Pipeline p1 = jedis.pipelined();  
  8.     p1.multi();//开启事务  
  9.     p1.incr(key);  
  10.     System.out.println("Request incr");  
  11.     p1.incr(key);  
  12.     System.out.println("Request incr");  
  13.     Response<List<Object>> txresult= p1.exec();//提交事务  
  14.     p1.sync();//关闭pipeline  
  15.     //结束pipeline,并开始从相应中获得数据  
  16.     List<Object> responses = txresult.get();  
  17.     if(responses == null || responses.isEmpty()){  
  18.         throw new RuntimeException("Pipeline error: no response...");  
  19.     }  
  20.     for(Object resp : responses){  
  21.         System.out.println("Response:" + resp.toString());//注意,此处resp的类型为Long  
  22.     }  
  23. }  
     Pipeline在某些场景下非常有用,比如有多个command需要被“及时的”提交,而且他们对相应结果没有互相依赖,而且对结果响应也无需立即获得,那么pipeline就可以充当这种“批处理”的工具;而且在一定程度上,可以较大的提升性能,性能提升的原因主要是TCP链接中较少了“交互往返”的时间。

    不过在编码时请注意,pipeline期间将“独占”链接,此期间将不能进行非“管道”类型的其他操作,直到pipeline关闭;比如在上述代码中间,使用jedis.set(key,value)等操作都将抛出异常。

    如果你的pipeline的指令集很庞大,为了不干扰链接中的其他操作,你可以为pipeline操作新建Client链接,让pipeline和其他正常操作分离在2个client中。不过pipeline事实上所能容忍的操作个数,和socket-output缓冲区大小/返回结果的数据尺寸都有很大的关系;同时也意味着每个redis-server同时所能支撑的pipeline链接的个数,也是有限的,这将受限于server的物理内存或网络接口的缓冲能力。

 

    使用场景举例:因为业务需要,我们需要把用户的操作过程记录在日志中以方便以后的统计,每隔3个小时生成一个新的日志文件,那么后台处理线程,将会扫描日志文件并将每条日志输出为“operation”:1,即表示操作次数为1;如果每个operation都发送一个command,事实上性能是很差的,而且是没有必要的;那么我们就可以使用pipeline批量提交即可。