机械行业营销型网站,wordpress获取当前分类下的子分类,网页设计一级页面,做一个网页一般多少钱文章目录 前言一、分布式锁的通用实现思路二、ZK实现分布式锁的思路三、ZK实现分布式锁的编码实现1、核心工具类实现2、测试代码编写线程安全问题复现使用上面封装的ZkLockHelper实现的分布式锁 优点缺点 总结 前言
就像上篇文章zookeeper全系列学习之统一配置获取说的#x… 文章目录 前言一、分布式锁的通用实现思路二、ZK实现分布式锁的思路三、ZK实现分布式锁的编码实现1、核心工具类实现2、测试代码编写线程安全问题复现使用上面封装的ZkLockHelper实现的分布式锁 优点缺点 总结 前言
就像上篇文章zookeeper全系列学习之统一配置获取说的有了naocs谁还用zk做配置中心呢一样现在项目中用zk实现分布式锁的估计也很少了但是我认为它其实是有存在的价值的因为它的临时顺序节点的特点当客户端不可用时他能及时识别从而避免客户端开线程去主动删除无论是为了学习还是工作亦或是为了拓展知识面我们还是了解下为好下面开始正文 一、分布式锁的通用实现思路
分布式锁的概念以及常规解决方案可以参考之前的博客聊聊分布式锁的解决方案今天我们先分析下分布式锁的实现思路
首先需要保证唯一性即某一时点只能有一个线程访问某一资源比方说待办短信通知功能每天早上九点短信提醒所有工单的处理人处理工单假设服务部署了20个容器那么早上九点的时候会有20个线程启动准备发送短信此时我们只能让一个线程执行短信发送否则用户会收到20条相同的短信其次需要考虑下何时应该释放锁这又分三种情况一是拿到锁的线程正常结束另一种是获取锁的线程异常退出还有种是获取锁的线程一直阻塞第一种情况直接释放即可第二种情况可以通过定义下锁的过期时间然后通过定时任务去释放锁zk的话直接通过临时节点即可最后一种阻塞的情况也可以通过定时任务来释放但是需要根据业务来综合判断如果业务本身就是长时间耗时的操作那么锁的过期时间就得设置的久一点最后当拿到锁的线程释放锁的时候如何通知其他线程可以抢锁了呢 这里简单介绍两种解决方案一种是所有需要锁的线程主动轮询固定时间去访问下看锁是否释放但是这种方案无端增加服务器压力并且时效性无法保证另一种就是zk的watch监听锁所在的目录一有变化立马得到通知
二、ZK实现分布式锁的思路
zk通过每个线程在同一父目录下创建临时有序节点然后通过比较节点的id大小来实现分布式锁功能再通过zk的watch机制实时获取节点的状态如果被删除立即重新争抢锁具体流程见线图 提示需要关注下图里判断自身不是最小节点时的监听情况为什么不监听父节点原因图里已有描述这里就不再赘述 三、ZK实现分布式锁的编码实现
1、核心工具类实现
通过不断的调试我封装了一个ZkLockHelper类里面封装了上锁和释放锁的方法为了方便我将zk的一些监听和回调机智也融合到一起了并没有抽出来下面贴上该类的全部代码
package com.darling.service.zookeeper.lock;import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.platform.commons.util.StringUtils;import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;/*** description:* author: dll* date: Created in 2022/11/4 8:41* version:* modified By:*/
Data
Slf4j
public class ZkLockHelper implements AsyncCallback.StringCallback, AsyncCallback.StatCallback,Watcher, AsyncCallback.ChildrenCallback {private final String lockPath /lockItem;ZooKeeper zkClient;String threadName;CountDownLatch cd new CountDownLatch(1);private String pathName;/*** 上锁*/public void tryLock() {try {log.info(线程:{}正在创建节点,threadName);zkClient.create(lockPath,(threadName).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this,AAA);log.info(线程:{}正在阻塞......,threadName);// 由于上面是异步创建所以这里需要阻塞住当前线程cd.await();} catch (InterruptedException e) {e.printStackTrace();}}/*** 释放锁*/public void unLock() {try {zkClient.delete(pathName,-1);System.out.println(threadName 工作结束....);} catch (Exception e) {e.printStackTrace();}}/*** create方法的回调创建成功后在此处获取/DCSLock的子目录比较节点ID是否最小是则拿到锁。。。* param rc 状态码* param path create方法的path入参* param ctx create方法的上下文入参* param name 创建成功的临时有序节点的名称即在path的后面加上了zk维护的自增ID* 注意如果创建的不是有序节点那么此处的name和path的内容一致*/Overridepublic void processResult(int rc, String path, Object ctx, String name) {log.info(processResult,rx:{},path:{},ctx:{},name:{},rc,path,ctx.toString(),name);if (StringUtils.isNotBlank(name)) {try {pathName name ;// 此处path需注意要写/zkClient.getChildren(/, false,this,123);
// ListString children zkClient.getChildren(/, false);
// log.info(threadName:{},children:{},threadName,children);
// // 给children排序
// Collections.sort(children);
// int i children.indexOf(pathName.substring(1));
// // 判断自身是否第一个
// if (Objects.equals(i,0)) {
// // 是第一个则表示抢到了锁
// log.info(线程{}抢到了锁,threadName);
// cd.countDown();
// }else {
// // 表示没抢到锁
// log.info(线程{}抢锁失败重新注册监听器,threadName);
// zkClient.exists(/children.get(i-1),this,this,AAA);
// }} catch (Exception e) {e.printStackTrace();}}}/*** exists方法的回调此处暂不做处理* param rc* param path* param ctx* param stat*/Overridepublic void processResult(int rc, String path, Object ctx, Stat stat) {}/*** exists的watch监听* param event*/Overridepublic void process(WatchedEvent event) {//如果第一个线程锁释放了等价于第一个线程删除了节点此时只有第二个线程会监控的到switch (event.getType()) {case None:break;case NodeCreated:break;case NodeDeleted:zkClient.getChildren(/, false,this,123);
// // 此处path需注意要写/
// ListString children null;
// try {
// children zkClient.getChildren(/, false);
// } catch (KeeperException e) {
// e.printStackTrace();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// log.info(threadName:{},children:{},threadName,children);
// // 给children排序
// Collections.sort(children);
// int i children.indexOf(pathName.substring(1));
// // 判断自身是否第一个
// if (Objects.equals(i,0)) {
// // 是第一个则表示抢到了锁
// log.info(线程{}抢到了锁,threadName);
// cd.countDown();
// }else {
// /**
// * 表示没抢到锁需要判断前置节点存不存在其实这里并不是特别关心前置节点存不存在所以其回调可以不处理
// * 但是这里关注的前置节点的监听当前置节点监听到被删除时就是其他线程抢锁之时
// */
// zkClient.exists(/children.get(i-1),this,this,AAA);
// }break;case NodeDataChanged:break;case NodeChildrenChanged:break;}}/*** getChildren方法的回调* param rc* param path* param ctx* param children*/Overridepublic void processResult(int rc, String path, Object ctx, ListString children) {try {log.info(threadName:{},children:{}, threadName, children);if (Objects.isNull(children)) {return;}// 给children排序Collections.sort(children);int i children.indexOf(pathName.substring(1));// 判断自身是否第一个if (Objects.equals(i, 0)) {// 是第一个则表示抢到了锁log.info(线程{}抢到了锁, threadName);cd.countDown();} else {// 表示没抢到锁log.info(线程{}抢锁失败重新注册监听器, threadName);/*** 表示没抢到锁需要判断前置节点存不存在其实这里并不是特别关心前置节点存不存在所以其回调可以不处理* 但是这里关注的前置节点的监听当前置节点监听到被删除时就是其他线程抢锁之时*/zkClient.exists(/ children.get(i - 1), this, this, AAA);}} catch (Exception e) {e.printStackTrace();}}
}提示代码中注释的代码块可以关注下原本是直接阻塞式编程将获取所有子节点并释放锁的操作直接写在getChildren方法的回调里后来发现当节点被删除时我们还要重新抢锁那么代码就冗余了于是结合响应式编程的思想将这段核心代码放到getChildren方法的回调里这样代码简洁了并且可以让业务更只关注于getChildren这件事了 2、测试代码编写
线程安全问题复现
package com.darling.service.zookeeper.lock;import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;/*** description: 开启是个线程给i做递减操作未加锁的情况下会有线程安全问题* author: dll* date: Created in 2022/11/8 8:32* version:* modified By:*/
Slf4j
public class ZkLockTest02 {private int i 10;Testpublic void test() throws InterruptedException {for (int n 0; n 10; n) {new Thread(new Runnable() {SneakyThrowsOverridepublic void run() {Thread.sleep(100);incre();}}).start();}Thread.sleep(5000);log.info(i {},i);}/*** i递减 线程不安全*/public void incre(){
// i.incrementAndGet();log.info(当前线程{},i {},Thread.currentThread().getName(),i--);}
}上面代码运行结果如下
使用上面封装的ZkLockHelper实现的分布式锁
package com.darling.service.zookeeper.lock;import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;/*** description: 使用zk实现的分布式锁解决线程安全问题* author: dll* date: Created in 2022/11/8 8:32* version:* modified By:*/
Slf4j
public class ZkLockTest03 {ZooKeeper zkClient;Beforepublic void conn (){zkClient ZkUtil.getZkClient();}Afterpublic void close (){try {zkClient.close();} catch (InterruptedException e) {e.printStackTrace();}}private int i 10;Testpublic void test() throws InterruptedException {for (int n 0; n 10; n) {new Thread(new Runnable() {SneakyThrowsOverridepublic void run() {Thread.sleep(100);ZkLockHelper zkHelper new ZkLockHelper();// 这里给zkHelper设置threadName是为了后续调试的时候日志打印便于观察存在的问题String threadName Thread.currentThread().getName();zkHelper.setThreadName(threadName);zkHelper.setZkClient(zkClient);// tryLock上锁zkHelper.tryLock();incre();log.info(线程{}正在执行业务代码...,threadName);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}// 释放锁zkHelper.unLock();}}).start();}while (true) {}}/*** i递减 线程不安全*/public void incre(){
// i.incrementAndGet();log.info(☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆当前线程{},i {},Thread.currentThread().getName(),i--);}
}运行结果如下 由于日志中掺杂着zk的日志所有此处并未截全但是也能看到i是在按规律递减的不会出现通过线程拿到相同值的情况 #四、zk实现分布式锁的优缺点 优点
集群部署不存在单点故障问题统一视图 zk集群每个节点对外提供的数据是一致的数据一致性有所报障临时有序节点 zk提供临时有序节点这样当客户端失去连接时会自动释放锁不用像其他方案一样当拿到锁的实例服务不可用时需要定时任务去删除锁临时节点的特性就是当客户端失去连接会自动删除watch能力加持 当获取不到锁时无需客户端定期轮询争抢只需watch前一节点即可当有变化时会及时通知比普通方案即及时又高效注意这里最好只watch前一节点如果watch整个父目录的话当客户端并发较大时会不断有请求进出zk给zk性能带来压力
缺点
与单机版redis比较的话性能肯定较差但是当客户端集群足够庞大且业务量足够多时肯定还是集群更加稳定极端情况下还是会出现多个线程抢到同一把锁的问题假设某个线程拿到锁后还没执行业务代码就进入长时间的垃圾收集STW了此时与zk的连接也会消失然后此时别的线程的watch会被触发从而抢到锁去执行了但是当stw的线程恢复过来时继续执行自身的业务代码此时就会出现不一致的问题了当然个人认为这种设想太过极端了毕竟如果stw时间过长肯定会影响整个集群的性能的,所以我感觉可以不必考虑真的要解决那么再加上mysql乐观锁吧 总结
好了zk实现分布式锁的编码实现就到这了后续有时间再写redis、数据库实现的其实思路缕清了编码实现还是相对简单的