池州专业网站建设,开发公司工程队营销的支持,百度推广关键词查询,店面装修设计Zookeeper 能保证数据的强一致性#xff0c;用户任何时候都可以相信集群中每个节点的数据都是相同的。一个用户创建一个节点作为锁#xff0c;另一个用户检测该节点#xff0c;如果存在#xff0c;代表别的用户已经锁住#xff0c;如果不存在#xff0c;则可以创建一个节… Zookeeper 能保证数据的强一致性用户任何时候都可以相信集群中每个节点的数据都是相同的。一个用户创建一个节点作为锁另一个用户检测该节点如果存在代表别的用户已经锁住如果不存在则可以创建一个节点代表拥有一个锁。 本篇内容包括Demo 概述、代码实现、测试结果 文章目录一、Demo 概述1、关于 zookeeper “命名服务协调”2、Demo 设计3、Demo 前提二、代码实现1、引用 Maven 依赖2、ConnectionWatcher 类创建 Zookeeper 连接3、ActiveKeyValueStore 类读写 Zookeeper 数据4、ZkLock 类实现分布式锁三、测试结果一、Demo 概述
1、关于 zookeeper “命名服务协调”
Zookeeper 能保证数据的强一致性用户任何时候都可以相信集群中每个节点的数据都是相同的。一个用户创建一个节点作为锁另一个用户检测该节点如果存在代表别的用户已经锁住如果不存在则可以创建一个节点代表拥有一个锁。
2、Demo 设计
分布式锁本质就是多个资源竞争者对一份资源的排他占有
我们设置多个线程分别在同一 path 下创建节点没个线程获取当前 path 下子节点看最小子节点是否为自身是则加锁成功更好的方式是用 Watcher 对前一个地址监控这里图方便用子节点排序取最小的方式 线程加锁成功后执行任务执行完毕后解锁
3、Demo 前提
参考Mac通过Docker安装Zookeeper集群 二、代码实现
1、引用 Maven 依赖 !-- 选择对应的Zookeeper版本 --dependencygroupIdorg.apache.zookeeper/groupIdartifactIdzookeeper/artifactIdversion3.7.0/version/dependency2、ConnectionWatcher 类创建 Zookeeper 连接
import java.io.IOException;
import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;public class ConnectionWatcher implements Watcher {private final CountDownLatch connectedSignal new CountDownLatch(1);private static final int SESSION_TIMEOUT 5000;protected ZooKeeper zk;public void connect(String hosts) throws IOException, InterruptedException {zk new ZooKeeper(hosts, SESSION_TIMEOUT, this);connectedSignal.await();}Overridepublic void process(WatchedEvent event) {if (event.getState() Event.KeeperState.SyncConnected) {connectedSignal.countDown();}}public void close() throws InterruptedException {zk.close();}}3、ActiveKeyValueStore 类读写 Zookeeper 数据
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;public class ActiveKeyValueStore extends ConnectionWatcher {private static final Charset CHARSET StandardCharsets.UTF_8;int state 0;/*** 写入节点数据** param path 节点地址* param value 数据值* throws InterruptedException 中断异常* throws KeeperException ZooKeeper异常*/public void write(String path, String value) throws InterruptedException, KeeperException {Stat stat zk.exists(path, false);if (stat null) {if (value null) {zk.create(path, null,ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);} else {zk.create(path, value.getBytes(CHARSET),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}} else {if (value null) {zk.setData(path, null, -1);} else {zk.setData(path, value.getBytes(CHARSET), -1);}}}public boolean lock(String path, String name) throws InterruptedException, KeeperException {boolean flag tryLock(path, name);if (flag) {state;}return flag;}public boolean tryLock(String path, String name) throws InterruptedException, KeeperException {String lockPath path / name;zk.create(lockPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);ListString waits readChildren(path, null);Collections.sort(waits);if (waits.get(0).equals(name)) {return true;}CountDownLatch latch new CountDownLatch(1);for (int i 0; i waits.size(); i) {String cur waits.get(i);if (!cur.equalsIgnoreCase(name)) {continue;}String prePath path / waits.get(i - 1);zk.exists(prePath, new Watcher() {Overridepublic void process(WatchedEvent event) {latch.countDown();}});break;}latch.await();return true;}public boolean unlock(String path, String name) {if (state 1) {state--;return true;}String lockPath path / name;try {Stat stat zk.exists(lockPath, false);int version stat.getVersion();zk.delete(lockPath, version);state--;return true;} catch (Exception e) {System.out.println(unlock: lockPath ,exception,);}return false;}/*** 获取所有子节点** param path 节点地址* param watcher watcher* return 所有子节点* throws InterruptedException 中断异常* throws KeeperException ZooKeeper异常*/public ListString readChildren(String path, Watcher watcher) throws InterruptedException, KeeperException {ListString childrens null;if (watcher null) {childrens zk.getChildren(path, false);} else {childrens zk.getChildren(path, watcher, null);}return childrens;}
}4、ZkLock 类实现分布式锁
import lombok.SneakyThrows;
import org.apache.zookeeper.KeeperException;import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;public class ZkLock {/*** 开启的线程数模拟多客户端操作*/private static final int CLIENTS_NUM 3;private final ActiveKeyValueStore store;public ZkLock(String hosts) throws IOException, InterruptedException {//定义一个类store new ActiveKeyValueStore();//连接Zookeeperstore.connect(hosts);}public static void testLock() {//线程计数器控制业务的执行final CountDownLatch countDownLatch new CountDownLatch(CLIENTS_NUM);for (int i 0; i CLIENTS_NUM; i) {new Thread() {Overridepublic void run() {}}.start();}try {// 堵塞线程任务执行完后释放countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) throws IOException, InterruptedException, KeeperException {String hosts localhost:2181;ZkLock zkLock new ZkLock(hosts);// 创建父节点zkLock.store.write(/lock4, 父亲节点);//CountDownLatch latch new CountDownLatch(CLIENTS_NUM);for (int i 0; i CLIENTS_NUM; i) {int finalI i;new Thread() {SneakyThrowsOverridepublic void run() {String name Thread- String.valueOf(finalI);zkLock.store.lock(/lock4, name);TimeUnit.SECONDS.sleep(2);System.out.println(线程- name 执行完毕);latch.countDown();zkLock.store.unlock(/lock4, name);}}.start();}latch.await();System.out.println(end ...);}}三、测试结果
ZkLock 代码测试结果如下
线程-Thread-0执行完毕
线程-Thread-1执行完毕
线程-Thread-2执行完毕
end ...通过 ZkLock 打印的信息可以看出已经成功模拟实现分布式锁