ZooKeeper(分布式锁)

ZooKeeper(分布式锁)


    分布式锁是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调它们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要通过互斥来防止彼此干扰、保证一致性,在这种情况下,便需要使用分布式锁。


    进程需要访问共享数据时, 就在"/locks"节点下创建一个sequence类型的子节点, 称为thisPath. 当thisPath在所有子节点中最小时, 说明该进程获得了锁. 进程获得锁之后, 就可以访问共享资源了. 访问完成后, 需要将thisPath删除. 锁由新的最小的子节点获得.
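有了清晰的思路之后, 还需要补充一些细节. 进程如何知道thisPath是所有子节点中最小的呢? 可以在创建thisPath之后, 通过getChildren方法获取子节点列表并排序: 如果thisPath排在第一位, 说明该进程已经获得了锁; 否则在列表中找到排名比thisPath前1位的节点, 称为waitPath, 然后在waitPath上注册监听. 当waitPath被删除后, 进程获得通知, 此时需要重新获取子节点列表, 确认thisPath是否已经成为最小的节点(waitPath也可能是因为其所属进程掉线而被删除, 而它前面还有更小的节点), 确认之后才说明该进程获得了锁.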



分布式锁的demo:


package com.sw.lock;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class DistributedLock implements Lock, Watcher{
 private ZooKeeper zk = null;

 private String root = "/locks";//根
 private String lockName;//竞争资源的标志
 private String waitNode;//等待前一个锁
 private String myZnode;//当前锁

 private CountDownLatch latch;//计数器

 private int sessionTimeout = 5000;

 private boolean isGetLock = false;

 static volatile AtomicInteger count = new AtomicInteger(0);

 private DistributedLock(){}

 public static DistributedLock instanceLock(String lockName){
        return new DistributedLock(lockName);
 }

 /**
  * 创建分布式锁,使用前请确认config配置的zookeeper服务可用
  * @param config 127.0.0.1:2181
  * @param lockName 竞争资源标志,lockName中不能包含单词lock
  */
    private DistributedLock(String lockName){
       this.lockName = lockName;
       // 创建一个与服务器的连接
       try {
    	   zk = initZk();
    	   Stat stat = zk.exists(root, false);
    	   if(stat == null){
    		   // 创建根节点
    		   zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
    	   }
       } catch (KeeperException e) {
   		   throw new LockException(e);
       } catch (InterruptedException e) {
   		   throw new LockException(e);
       }
    }

 /**
  * zookeeper节点的监视器
 */
    public void process(WatchedEvent event) {
    	if(this.latch != null) {
            this.latch.countDown();
        }
    }

    public void lock() {
    	try {
    		if(this.tryLock()){
    			//System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");
    			return;
    		}
    		else{
    			waitForLock(waitNode, sessionTimeout);//等待锁
    		}
    	} catch (KeeperException e) {
    		throw new LockException(e);
    	} catch (InterruptedException e) {
    		throw new LockException(e);
    	}
    }

    public boolean tryLock() {
    	try {
    		String splitStr = "_lock_";
    		if(lockName.contains(splitStr)){
    			throw new LockException("lockName can not contains \\u000B");
    		}
    		//创建临时子节点
    		myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
    		//System.out.println(myZnode + " is created ");

    		//取出所有子节点
    		List<String> subNodes = zk.getChildren(root, false);
    		//取出所有lockName的锁
    		List<String> lockObjNodes = new ArrayList<String>();
    		for (String node : subNodes) {
    			String _node = node.split(splitStr)[0];
    			if(_node.equals(lockName)){
    				lockObjNodes.add(node);
    			}
    		}
    		Collections.sort(lockObjNodes);
    		//System.out.println(myZnode + "==" + lockObjNodes.get(0));

    		if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
    			//如果是最小的节点,则表示取得锁
    			return true;
    		}
    		//如果不是最小的节点,找到比自己小1的节点
    		String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
    		waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
    	} catch (KeeperException e) {
    		throw new LockException(e);
    	} catch (InterruptedException e) {
    		throw new LockException(e);
    	}
    	return false;
    }

    @SuppressWarnings("finally")
 public boolean tryLock(long time, TimeUnit unit) {
    	try {
    		if(this.tryLock()){
    			return true;
    		}
           return waitForLock(waitNode,time);
    	} catch (Exception e) {
    		throw new LockException(e);
    	}finally{
        	return false;
    	}
    }

    private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {
        Stat stat = zk.exists(root + "/" + lower,true);
        //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
        if(stat != null){
        	//System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);
        	this.latch = new CountDownLatch(1);
        	isGetLock = this.latch.await(waitTime, TimeUnit.MILLISECONDS);
        	this.latch = null;
        }
        return true;
    }

    public void unlock() {
    	try {
    		//System.out.println("unlock " + myZnode);
    		zk.delete(myZnode,-1);
    		myZnode = null;
    		//zk.close();
    	} catch (InterruptedException e) {
    		throw new LockException(e);
    	} catch (KeeperException e) {
    		throw new LockException(e);
    	}
    }

    public synchronized ZooKeeper initZk() {
 try {
 if(zk==null){
 zk = new ZooKeeper("121.40.121.133:4181", sessionTimeout,this);
 }

 } catch (IOException e) {
    		throw new LockException("zk init connect fail" + e.getMessage());
 //System.err.println("zk init connect fail" + e.getMessage());
 }
 return zk;
 }

    public void lockInterruptibly() throws InterruptedException {
    	this.lock();
    }

    public Condition newCondition() {
    	return null;
    }


 public boolean isGetLock() {
 return isGetLock;
 }

 class LockException extends RuntimeException {
 private static final long serialVersionUID = 1L;
 public LockException(String e){
 super(e);
 }
 public LockException(Exception e){
 super(e);
 }
 }

    public static void main(String[] args) throws Exception {
        final long starttime = System.currentTimeMillis();

    	for(int i=0;i<30;i++){
 new Thread(new Runnable() {
 @Override
 public void run() {
 DistributedLock lock = DistributedLock.instanceLock("mylock");;
 while(true){
 try {
 lock.lock();

 count.incrementAndGet();
    		System.err.println(System.currentTimeMillis()+"|"+Thread.currentThread().getId() + " | lock value: " + count.get());

 } catch (Exception e) {
 e.printStackTrace();
 }finally{
 lock.unlock();
 long endtime = System.currentTimeMillis();
                            System.err.println(count.get()/((endtime-starttime)/1000)+"/s");
 }
 }
 }
 }).start();

 }
    	//Thread.sleep(10000);
 }
 } 
 
 

基于curator的实现

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

import com.mogoroom.core.util.log.Logger;
import com.mogoroom.core.util.log.LoggerFactory;

public class DistributedLock extends IDistributedLock{
 public static final Logger LOGGER = LoggerFactory.getLogger(DistributedLock.class);
 
 
 public static long value = 1;
 public static String servers = "192.168.30.107:2181";
 
 private static CuratorFramework curator = null;
 private static zkListener listener = null;
 private static ThreadLocal<InterProcessMutex> lockLocal = new ThreadLocal<InterProcessMutex>();
 
 
 public synchronized static void init(String servers,String pwd){
 if(curator==null){
 curator = CuratorFrameworkFactory.builder().retryPolicy(new ExponentialBackoffRetry(1000, 3)).connectString(servers).build();
 listener = new zkListener();
    curator.getConnectionStateListenable().addListener(listener) ;  
    curator.start();
 }
 }
 public static boolean acquire(String key){
 try {
 InterProcessMutex lock = new InterProcessMutex(DistributedLock.curator, key);
 lock.acquire(3,TimeUnit.SECONDS);
    lockLocal.set(lock);
 }catch (IllegalMonitorStateException e) {
 LOGGER.warn("DistributedLock acquire error",e);
 return false;
 }catch (Exception e) {
 LOGGER.warn("DistributedLock acquire error",e);
 return false;
 }
 return true;
 }
 public static boolean acquire(String key, long timeout){
 try {
 InterProcessMutex lock = new InterProcessMutex(DistributedLock.curator, key);
 lock.acquire(timeout,TimeUnit.SECONDS);
    lockLocal.set(lock);
 } catch (Exception e) {
 LOGGER.warn("DistributedLock acquire error",e);
 return false;
 }
 return true;
 }
 public static void release(String key) {
 try {
 InterProcessMutex lock = lockLocal.get();
            lock.release();
        } catch (Exception e) {
 LOGGER.warn("DistributedLock release error",e);
        }
 }
 
 
 public static void main (String[] args) {
 DistributedLock.init("192.168.30.107:2181","");
 
        
    long start = System.currentTimeMillis();
    Executor pool = Executors.newFixedThreadPool(10);
    for (int i = 0; i < 300; i ++) {
        pool.execute(new Runnable() {
            public void run() {
            	while(true){
            		boolean getLock = false;
            		try {
            			if(DistributedLock.acquire("/lock", 2)){
            				getLock = true;
            				value++;
                    System.out.println(Thread.currentThread().getName()+","+value);
                    //Thread.sleep(1);
            	    long end = System.currentTimeMillis();
            	    long tps = value/((end-start)/1000);
                    System.out.println("tps count/s : "+tps);
            			}
                } catch (Exception e) {
                    e.printStackTrace();
                }finally{
                	if(getLock){
                	DistributedLock.release("/lock");
                	}
                }
            	}
            }
        });
    }
 }
}

class zkListener implements ConnectionStateListener{
 public static final Logger LOGGER = LoggerFactory.getLogger(DistributedLock.class);

 @Override
 public void stateChanged(CuratorFramework client, ConnectionState state) {  
        if (state == ConnectionState.LOST) {  
            //连接丢失  
 LOGGER.warn("DistributedLock lost session with zookeeper");

        } else if (state == ConnectionState.CONNECTED) {  
            //连接新建  
        	System.out.println("connected with zookeeper");  
 LOGGER.warn("DistributedLock connected with zookeeper");
        } else if (state == ConnectionState.RECONNECTED) {  
 LOGGER.warn("DistributedLock reconnected with zookeeper");
              
        }  
    } 
 
}