当前位置: 首页 > news >正文

南华网站建设东莞seo网站推广

南华网站建设,东莞seo网站推广,中国水利教育培训网站,建筑导航网站目录 前置知识课程内容一、Zookeeper Java客户端实战1.1 Zookeeper 原生Java客户端使用1.2 Curator开源客户端使用快速开始使用示例 二、Zookeeper在分布式命名服务中的实战2.1 分布式API目录2.2 分布式节点的命名2.3 分布式的ID生成器 三、zookeeper实现分布式队列3.1 设计思路… 目录 前置知识课程内容一、Zookeeper Java客户端实战1.1 Zookeeper 原生Java客户端使用1.2 Curator开源客户端使用快速开始使用示例 二、Zookeeper在分布式命名服务中的实战2.1 分布式API目录2.2 分布式节点的命名2.3 分布式的ID生成器 三、zookeeper实现分布式队列3.1 设计思路3.2 使用Apache Curator实现分布式队列 学习总结感谢 前置知识 在学习本节课之前至少需要掌握Zookeeper的节点特性以及基本操作。 《【Zookeeper专题】Zookeeper特性与节点数据类型详解》 课程内容 一、Zookeeper Java客户端实战 Zookeeper的客户端有很多这边主要介绍的是两种 Zookeeper官方的Java客户端API第三方的Java客户端APICurator ZooKeeper官方的客户端API提供了基本的操作例如创建会话、增删查改节点等就是对原有命令交互式客户端的封装。不过Zookeeper官方客户端封装度比较低使用起来不是很方便。这种不方便体现在 ZooKeeper的Watcher监测是一次性的每次触发之后都需要重新进行注册会话超时之后没有实现重连机制异常处理烦琐ZooKeeper提供了很多异常对于开发人员来说可能根本不知道应该如何处理这些抛出的异常仅提供了简单的byte[]数组类型的接口没有提供Java POJO级别的序列化数据处理接口创建节点时如果抛出异常需要自行检查节点是否存在无法实现级联删除 当然不便之处不止这些不管怎样在实际开发中我们通常不是很建议使用官方API的。 1.1 Zookeeper 原生Java客户端使用 使用前先引入客户端的依赖 !-- zookeeper client -- dependencygroupIdorg.apache.zookeeper/groupIdartifactIdzookeeper/artifactIdversion3.8.0/version /dependency然后是代码示例 public class ZkClientDemo {private static final String CONNECT_STRlocalhost:2181;private final static String CLUSTER_CONNECT_STR192.168.65.156:2181,192.168.65.190:2181,192.168.65.200:2181;public static void main(String[] args) throws Exception {final CountDownLatch countDownLatchnew CountDownLatch(1);ZooKeeper zooKeeper new ZooKeeper(CLUSTER_CONNECT_STR,4000, new Watcher() {Overridepublic void process(WatchedEvent event) {if(Event.KeeperState.SyncConnectedevent.getState() event.getType() Event.EventType.None){//如果收到了服务端的响应事件连接成功countDownLatch.countDown();System.out.println(连接建立);}}});System.out.printf(连接中);countDownLatch.await();//CONNECTEDSystem.out.println(zooKeeper.getState());//创建持久节点zooKeeper.create(/user,fox.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);} }客户端主要的API有 create(path, data, acl,createMode)创建一个给定路径的 znode并在 znode 保存 data[]的数据createMode指定 znode 的类型。 delete(path, version)如果给定 path上的znode的版本和给定的version匹配删除znode。 exists(path, watch)判断给定 path 上的 znode 是否存在并在 znode 设置一个 watch。 getData(path, watch)返回给定 path 上的 znode 数据并在 znode 设置一个 watch。 setData(path, data, version)如果给定 path 上的 znode 的版本和给定的 version 匹配设置 znode 数据。 getChildren(path, watch)返回给定 path 上的 znode 的孩子 znode 名字并在 znode 设置一个 watch。 sync(path)把客户端 session 连接节点和 leader 节点进行同步 以上这些API主要的特点如下 所有获取 znode 数据的 API 都可以设置一个 watch 用来监控 znode 的变化所有更新 znode 数据的 API 都有两个版本即无条件更新版本和条件更新版本。如果 version 为 -1更新为无条件更新。否则只有给定的 version 和 znode 当前的 version 一样才会进行更新这样的更新是条件更新。所有的方法都有同步和异步两个版本。同步版本的方法发送请求给 ZooKeeper 并等待服务器的响应。异步版本把请求放入客户端的请求队列然后马上返回。异步版本通过 callback 来接受来自服务端的响应不过ZK有一点不好的是对于同步异步方法没有在方法名上显示注明sync/async而是体现在请求参数callback上 例如这边简单演示一下同步跟异步创建节点方法。 // 同步创建并且返回创建节点的路径信息 Test public void createTest() throws KeeperException, InterruptedException {String path zooKeeper.create(ZK_NODE, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);log.info(created path: {},path); }// 异步创建 // 看最后一个lambda表达式 Test public void createAsycTest() throws InterruptedException {zooKeeper.create(ZK_NODE, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT,(rc, path, ctx, name) - log.info(rc {},path {},ctx {},name {},rc,path,ctx,name),context);TimeUnit.SECONDS.sleep(Integer.MAX_VALUE); }其余API这里就不演示了大家伙感兴趣的可以回头去试试。 1.2 Curator开源客户端使用 Curator是Netflix公司开源的一套ZooKeeper客户端框架和ZkClient一样它解决了非常底层的细节开发工作包括连接、重连、反复注册Watcher的问题以及NodeExistsException异常等。Curator还为ZooKeeper客户端框架提供了一些比较普遍的、开箱即用的、分布式开发用的解决方案例如Recipe、共享锁服务、Master选举机制和分布式计算器等帮助开发者避免了“重复造轮子”的无效开发工作。 快速开始 引入maven依赖 Curator的使用包含了几个包 curator-framework是对ZooKeeper的底层API的一些封装curator-client提供了一些客户端的操作例如重试策略等curator-recipes封装了一些高级特性如Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等 !-- zookeeper client -- dependencygroupIdorg.apache.zookeeper/groupIdartifactIdzookeeper/artifactIdversion3.8.0/version /dependency!--curator-- dependencygroupIdorg.apache.curator/groupIdartifactIdcurator-recipes/artifactIdversion5.1.0/versionexclusionsexclusiongroupIdorg.apache.zookeeper/groupIdartifactIdzookeeper/artifactId/exclusion/exclusions /dependency创建一个客户端 在使用curator-framework包操作ZooKeeper前首先要创建一个客户端实例。这是一个CuratorFramework类型的对象有两种方法 使用工厂类CuratorFrameworkFactory的静态newClient()方法 // 重试策略 RetryPolicy retryPolicy new ExponentialBackoffRetry(1000, 3) //创建客户端实例 CuratorFramework client CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy); //启动客户端 client.start();使用工厂类CuratorFrameworkFactory的静态builder构造者方法 //随着重试次数增加重试时间间隔变大,指数倍增长baseSleepTimeMs * Math.max(1, random.nextInt(1 (retryCount 1))) RetryPolicy retryPolicy new ExponentialBackoffRetry(1000, 3);CuratorFramework client CuratorFrameworkFactory.builder().connectString(192.168.128.129:2181).sessionTimeoutMs(5000) // 会话超时时间.connectionTimeoutMs(5000) // 连接超时时间.retryPolicy(retryPolicy).namespace(base) // 包含隔离名称.build(); client.start();buidler调用链函数说明 connectionString服务器地址列表在指定服务器地址列表的时候可以是一个地址也可以是多个地址。如果是多个地址那么每个服务器地址列表用逗号分隔, 如 host1:port1,host2:port2,host3port3retryPolicy重试策略当客户端异常退出或者与服务端失去连接的时候可以通过设置客户端重新连接 ZooKeeper 服务端。而 Curator 提供了 一次重试、多次重试等不同种类的实现方式。在 Curator内部可以通过判断服务器返回的 keeperException 的状态代码来判断是否进行重试处理如果返回的是 OK 表示一切操作都没有问题而 SYSTEMERROR 表示系统或服务端错误 超时时间Curator 客户端创建过程中有两个超时时间的设置。一个是 sessionTimeoutMs 会话超时时间用来设置该条会话在 ZooKeeper 服务端的失效时间。另一个是 connectionTimeoutMs 客户端创建会话的超时时间用来限制客户端发起一个会话连接到接收 ZooKeeper 服务端应答的时间。sessionTimeoutMs 作用在服务端而 connectionTimeoutMs 作用在客户端。 使用示例 创建节点 创建节点的方式如下面的代码所示回顾我们之前课程中讲到的内容描述一个节点要包括节点的类型即临时节点还是持久节点、节点的数据信息、节点是否是有序节点等属性和性质。 Test public void testCreate() throws Exception {String path curatorFramework.create().forPath(/curator-node);curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath(/curator-node,some-data.getBytes())log.info(curator create node :{} successfully.,path); }在 Curator 中可以使用 create 函数创建数据节点并通过 withMode 函数指定节点类型持久化节点临时节点顺序节点临时顺序节点持久化顺序节点等默认是持久化节点之后调用 forPath函数来指定节点的路径和数据信息。 一次性创建带层级结构的节点 Test public void testCreateWithParent() throws Exception {String pathWithParent/node-parent/sub-node-1;String path curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);log.info(curator create node :{} successfully.,path); }获取数据 Test public void testGetData() throws Exception {byte[] bytes curatorFramework.getData().forPath(/curator-node);log.info(get data from node :{} successfully.,new String(bytes)); }更新数据 我们通过客户端实例的 setData() 方法更新 ZooKeeper 服务上的数据节点在setData 方法的后边通过 forPath 函数来指定更新的数据节点路径以及要更新的数据。 Test public void testSetData() throws Exception {curatorFramework.setData().forPath(/curator-node,changed!.getBytes());byte[] bytes curatorFramework.setData().forPath(/curator-node);log.info(get data from node /curator-node :{} successfully.,new String(bytes)); }删除节点 Test public void testDelete() throws Exception {String pathWithParent/node-parent;curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent); }guaranteed该函数的功能如字面意思一样主要起到一个保障删除成功的作用其底层工作方式是只要该客户端的会话有效就会在后台持续发起删除请求直到该数据节点在 ZooKeeper 服务端被删除。 deletingChildrenIfNeeded指定了该函数后系统在删除该数据节点的时候会以递归的方式直接删除其子节点以及子节点的子节点。 异步接口 Curator 引入了BackgroundCallback 接口用来处理服务器端返回来的信息这个处理过程是在异步线程中调用默认在 EventThread 中调用也可以自定义线程池。 public interface BackgroundCallback {/*** Called when the async background operation completes** param client the client* param event operation result details* throws Exception errors*/public void processResult(CuratorFramework client, CuratorEvent event) throws Exception; }如上接口主要参数为 client 客户端和服务端事件 event。inBackground异步处理默认在EventThread中执行 Test public void test() throws Exception {curatorFramework.getData().inBackground((item1, item2) - {log.info( background: {}, item2);}).forPath(ZK_NODE);TimeUnit.SECONDS.sleep(Integer.MAX_VALUE); }或者使用自定义线程池 Test public void test() throws Exception {ExecutorService executorService Executors.newSingleThreadExecutor();curatorFramework.getData().inBackground((item1, item2) - {log.info( background: {}, item2);},executorService).forPath(ZK_NODE);TimeUnit.SECONDS.sleep(Integer.MAX_VALUE); }Curator 监听器 我们知道ZK的一大特色便是他们的监听机制。Curator在监听方面相比于原生的客户端Curator将重复注册、事件信息等进行了高度封装让用户做到开箱即用。并且在监听事件返回了详细的信息包括变动的节点路径节点值等等这是原生API所没有的。 Curator 引入了 Cache 来实现对 Zookeeper 服务端事件监听Cache 事件监听可以理解为一个本地缓存视图与远程 Zookeeper 视图的对比过程。 官方推荐的节点监听API有 NodeCache已过期对某一个节点进行监听监听事件包括指定路径的增删改等操作 Slf4j public class NodeCacheTest extends AbstractCuratorTest{public static final String NODE_CACHE/node-cache;Testpublic void testNodeCacheTest() throws Exception {createIfNeed(NODE_CACHE);NodeCache nodeCache new NodeCache(curatorFramework, NODE_CACHE);nodeCache.getListenable().addListener(new NodeCacheListener() {Overridepublic void nodeChanged() throws Exception {log.info({} path nodeChanged: ,NODE_CACHE);printNodeData();}});nodeCache.start();}public void printNodeData() throws Exception {byte[] bytes curatorFramework.getData().forPath(NODE_CACHE);log.info(data: {},new String(bytes));} }PathChildrenCache已过期对指定路径节点的一级子目录监听不对该节点的操作监听。换句话说就是对其子目录的增删改操作监听 Slf4j public class PathCacheTest extends AbstractCuratorTest{public static final String PATH/path-cache;Testpublic void testPathCache() throws Exception {createIfNeed(PATH);PathChildrenCache pathChildrenCache new PathChildrenCache(curatorFramework, PATH, true);pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {log.info(event: {},event);}});// 如果设置为true则在首次启动时就会缓存节点内容到Cache中pathChildrenCache.start(true);} }TreeCache已过期综合NodeCache和PathChildrenCahce的特性是对整个目录进行监听可以设置监听深度 public class TreeCacheTest extends AbstractCuratorTest{public static final String TREE_CACHE/tree-path;Testpublic void testTreeCache() throws Exception {createIfNeed(TREE_CACHE);TreeCache treeCache new TreeCache(curatorFramework, TREE_CACHE);treeCache.getListenable().addListener(new TreeCacheListener() {Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {log.info( tree cache: {},event);}});treeCache.start();} }CuratorCache上面的几个节点缓存API其实已经过期了最近的版本开始使用CuratorCache单个接口来替代它们在使用上也更为简单。我们来小小的看一下该类的创建api 如上所示构建节点缓存的build()方法提供了一个可选的参数options。Options是一个内部枚举类型如果不指定默认是缓存【给定节点开始的整个节点树】。 下面是一个简单的使用示例 package org.tuling.zk.curator;import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.framework.recipes.cache.CuratorCache; import org.apache.curator.framework.recipes.cache.CuratorCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode;public class TestCuratorCache {private final static String CLUSTER_CONNECT_STR114.132.46.145:2181;public static void main(String[] args) throws Exception {//构建客户端实例CuratorFramework curator CuratorFrameworkFactory.builder().connectString(CLUSTER_CONNECT_STR).retryPolicy(new ExponentialBackoffRetry(1000,3)) // 设置重试策略.build();//启动客户端curator.start();assert curator.getState().equals(CuratorFrameworkState.STARTED);curator.blockUntilConnected();if(curator.checkExists().forPath(/father) ! null) {curator.delete().deletingChildrenIfNeeded().forPath(/father);}// 创建CuratorCache实例基于路径/father/son/grandson1这里说的路径都是基于命名空间下的路径// 缓存构建选项是SINGLE_NODE_CACHECuratorCache cache CuratorCache.build(curator, /father/son/grandson1,CuratorCache.Options.SINGLE_NODE_CACHE);// 创建一系列CuratorCache监听器都是通过lambda表达式指定CuratorCacheListener listener CuratorCacheListener.builder()// 初始化完成时调用.forInitialized(() - System.out.println([forInitialized] : Cache initialized))// 添加或更改缓存中的数据时调用.forCreatesAndChanges((oldNode, node) - System.out.printf([forCreatesAndChanges] : Node changed: Old: [%s] New: [%s]\n,oldNode, node))// 添加缓存中的数据时调用.forCreates(childData - System.out.printf([forCreates] : Node created: [%s]\n, childData))// 更改缓存中的数据时调用.forChanges((oldNode, node) - System.out.printf([forChanges] : Node changed: Old: [%s] New: [%s]\n,oldNode, node))// 删除缓存中的数据时调用.forDeletes(childData - System.out.printf([forDeletes] : Node deleted: data: [%s]\n, childData))// 添加、更改或删除缓存中的数据时调用.forAll((type, oldData, data) - System.out.printf([forAll] : type: [%s] [%s] [%s]\n, type, oldData, data)).build();// 给CuratorCache实例添加监听器cache.listenable().addListener(listener);// 启动CuratorCachecache.start();// 创建节点/father/son/grandson1curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(/father/son/grandson1, data.getBytes());// 创建节点/father/son/grandson1/testcurator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(/father/son/grandson1/test, test.getBytes());// 创建节点/father/son/grandson1/test/test2curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(/father/son/grandson1/test/test2, test2.getBytes());// 更改节点/father/son/grandson1的数据curator.setData().forPath(/father/son/grandson1, new data.getBytes());// 更改节点/father/son/grandson1/test的数据curator.setData().forPath(/father/son/grandson1/test, new test.getBytes());// 删除节点/father/son/grandson1curator.delete().deletingChildrenIfNeeded().forPath(/father/son/grandson1);Thread.sleep(10000000);} }二、Zookeeper在分布式命名服务中的实战 所谓命名服务其实就是为系统中的资源提供标识能力被命名的服务比如是集群中的某个机器提供服务的地址或者远程对象。ZooKeeper的命名服务主要是利用ZooKeeper节点的树形分层结构和子节点的顺序维护能力来为分布式系统中的资源命名。典型的用到了分布式命名服务的场景有 分布式API目录分布式节点命名分布式ID生成器 2.1 分布式API目录 分布式API目录即为分布式系统中各种API接口服务的名称、链接地址提供类似JNDIJava命名和目录接口中的文件系统的功能。借助于ZooKeeper的树形分层结构就能提供分布式的API调用功能。 在Dubbo中就是使用了当前方式。使用ZooKeeper维护的全局服务接口API的地址列表。大致的思路为 服务提供者Service Provider在启动的时候向ZooKeeper上的指定节点/dubbo/${serviceName}/providers写入自己的API地址这个操作就相当于服务的公开。服务消费者Consumer启动的时候订阅节点/dubbo/{serviceName}/providers下的服务提供者的URL地址获得所有服务提供者的API 大概的模型图如下 2.2 分布式节点的命名 一个分布式系统通常会由很多的节点组成节点的数量不是固定的而是不断动态变化的。比如说当业务不断膨胀和流量洪峰到来时大量的节点可能会动态加入到集群中。而一旦流量洪峰过去了就需要下线大量的节点。再比如说由于机器或者网络的原因一些节点会主动离开集群。 如何为大量的动态节点命名呢一种简单的办法是可以通过配置文件手动为每一个节点命名。但是如果节点数据量太大或者说变动频繁手动命名则是不现实的这就需要用到分布式节点的命名服务。 可用于生成集群节点的编号的方案 使用数据库的自增ID特性用数据表存储机器的MAC地址或者IP来维护使用ZooKeeper持久顺序节点的顺序特性来维护节点的NodeId编号 在第2种方案中集群节点命名服务的基本流程是 启动节点服务连接ZooKeeper检查命名服务根节点是否存在如果不存在就创建系统的根节点在根节点下创建一个临时顺序ZNode节点取回ZNode的编号把它作为分布式系统中节点的NODEID如果临时节点太多可以根据需要删除临时顺序ZNode节点 2.3 分布式的ID生成器 在分布式系统中分布式ID生成器的使用场景非常之多 大量的数据记录需要分布式ID。大量的系统消息需要分布式ID。大量的请求日志如restful的操作记录需要唯一标识以便进行后续的用户行为分析和调用链路分析。分布式节点的命名服务往往也需要分布式ID。… … 传统的数据库自增主键已经不能满足需求。在分布式系统环境中迫切需要一种全新的唯一ID系统这种系统需要满足以下需求 全局唯一不能出现重复ID高可用ID生成系统是基础系统被许多关键系统调用一旦宕机就会造成严重影响 市面上分布式的ID生成器方案大致如下 Java的UUID分布式缓存Redis生成ID利用Redis的原子操作INCR和INCRBY生成全局唯一的IDTwitter的SnowFlake算法雪花算法ZooKeeper生成ID利用ZooKeeper的顺序节点生成全局唯一的IDMongoDb的ObjectId:MongoDB是一个分布式的非结构化NoSQL数据库每插入一条记录会自动生成全局唯一的一个“_id”字段值它是一个12字节的字符串可以作为分布式系统中全局唯一的ID 我们这里介绍一下基于Zookeeper实现分布式ID生成器 基于Zookeeper实现分布式ID生成器 在ZooKeeper节点的四种类型中其中有以下两种类型具备自动编号的能力 PERSISTENT_SEQUENTIAL持久化顺序节点EPHEMERAL_SEQUENTIAL临时顺序节点 ZooKeeper的每一个节点都会为它的第一级子节点维护一份顺序编号会记录每个子节点创建的先后顺序这个顺序编号是分布式同步的也是全局唯一的。 可以通过创建ZooKeeper的临时顺序节点的方法生成全局唯一的ID Slf4j public class IDMaker extends CuratorBaseOperations {private String createSeqNode(String pathPefix) throws Exception {CuratorFramework curatorFramework getCuratorFramework();//创建一个临时顺序节点String destPath curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(pathPefix);return destPath;}public String makeId(String path) throws Exception {String str createSeqNode(path);if(null ! str){//获取末尾的序号int index str.lastIndexOf(path);if(index0){indexpath.length();return indexstr.length() ? str.substring(index):;}}return str;} }Test public void testMarkId() throws Exception {IDMaker idMaker new IDMaker();idMaker.init();String pathPrefix /idmarker/id-;for(int i0;i5;i){new Thread(()-{for (int j0;j10;j){String id null;try {id idMaker.makeId(pathPrefix);log.info({}线程第{}个创建的id为{},Thread.currentThread().getName(),j,id);} catch (Exception e) {e.printStackTrace();}}},threadi).start();}Thread.sleep(Integer.MAX_VALUE); }测试结果如下 基于Zookeeper实现SnowFlakeID算法 Twitter推特的SnowFlake算法是一种著名的分布式服务器用户ID生成算法。SnowFlake算法所生成的ID是一个64bit的长整型数字这个64bit被划分成四个部分其中后面三个部分分别表示时间戳、工作机器ID、序列号。 SnowFlakeID的四个部分具体介绍如下 1第一位占用1 bit其值始终是0没有实际作用 2时间戳占用41 bit精确到毫秒总共可以容纳约69年的时间 3工作机器id占用10 bit最多可以容纳1024个节点 4序列号占用12 bit。这个值意味着在同一毫秒同一节点上可以生成4096个id这已经是相当可观了 在工作节点达到1024顶配的场景下SnowFlake算法在同一毫秒最多可以生成的ID数量为 1024 * 4096 4194304在绝大多数并发场景下都是够用的。 SnowFlake算法的优点 生成ID时不依赖于数据库完全在内存生成高性能和高可用性容量大每秒可生成几百万个IDID呈趋势递增后续插入数据库的索引树时性能较高 SnowFlake算法的缺点 依赖于系统时钟的一致性如果某台机器的系统时钟回拨了有可能造成ID冲突或者ID乱序在启动之前如果这台机器的系统时间回拨过那么有可能出现ID重复的危险 基于ZK实现雪花算法的代码示例如下体现在第三部分机器id上 public class SnowflakeIdGenerator {/*** 单例*/public static SnowflakeIdGenerator instance new SnowflakeIdGenerator();/*** 初始化单例** param workerId 节点Id,最大8091* return the 单例*/public synchronized void init(long workerId) {if (workerId MAX_WORKER_ID) {// zk分配的workerId过大throw new IllegalArgumentException(woker Id wrong: workerId);}instance.workerId workerId;}private SnowflakeIdGenerator() {}/*** 开始使用该算法的时间为: 2017-01-01 00:00:00*/private static final long START_TIME 1483200000000L;/*** worker id 的bit数最多支持8192个节点*/private static final int WORKER_ID_BITS 13;/*** 序列号支持单节点最高每毫秒的最大ID数1024*/private final static int SEQUENCE_BITS 10;/*** 最大的 worker id 8091* -1 的补码二进制全1右移13位, 然后取反*/private final static long MAX_WORKER_ID ~(-1L WORKER_ID_BITS);/*** 最大的序列号1023* -1 的补码二进制全1右移10位, 然后取反*/private final static long MAX_SEQUENCE ~(-1L SEQUENCE_BITS);/*** worker 节点编号的移位*/private final static long WORKER_ID_SHIFT SEQUENCE_BITS;/*** 时间戳的移位*/private final static long TIMESTAMP_LEFT_SHIFT WORKER_ID_BITS SEQUENCE_BITS;/*** 该项目的worker 节点 id*/private long workerId;/*** 上次生成ID的时间戳*/private long lastTimestamp -1L;/*** 当前毫秒生成的序列*/private long sequence 0L;/*** Next id long.** return the nextId*/public Long nextId() {return generateId();}/*** 生成唯一id的具体实现*/private synchronized long generateId() {long current System.currentTimeMillis();if (current lastTimestamp) {// 如果当前时间小于上一次ID生成的时间戳说明系统时钟回退过出现问题返回-1return -1;}if (current lastTimestamp) {// 如果当前生成id的时间还是上次的时间那么对sequence序列号进行1sequence (sequence 1) MAX_SEQUENCE;if (sequence MAX_SEQUENCE) {// 当前毫秒生成的序列数已经大于最大值那么阻塞到下一个毫秒再获取新的时间戳current this.nextMs(lastTimestamp);}} else {// 当前的时间戳已经是下一个毫秒sequence 0L;}// 更新上次生成id的时间戳lastTimestamp current;// 进行移位操作生成int64的唯一ID//时间戳右移动23位long time (current - START_TIME) TIMESTAMP_LEFT_SHIFT;//workerId 右移动10位long workerId this.workerId WORKER_ID_SHIFT;return time | workerId | sequence;}/*** 阻塞到下一个毫秒*/private long nextMs(long timeStamp) {long current System.currentTimeMillis();while (current timeStamp) {current System.currentTimeMillis();}return current;} }三、zookeeper实现分布式队列 常见的消息队列有:RabbitMQRocketMQKafka等。Zookeeper作为一个分布式的小文件管理系统同样能实现简单的队列功能。但是Zookeeper不适合大数据量存储官方并不推荐作为队列使用但由于实现简单集群搭建较为便利因此在一些吞吐量不高的小型系统中还是比较好用的。 3.1 设计思路 创建队列根节点在Zookeeper中创建一个持久节点用作队列的根节点。所有队列元素的节点将放在这个根节点下实现入队操作当需要将一个元素添加到队列时可以在队列的根节点下创建一个临时有序节点。节点的数据可以包含队列元素的信息实现出队操作当需要从队列中取出一个元素时可以执行以下操作 获取根节点下的所有子节点找到具有最小序号的子节点获取该节点的数据删除该节点返回节点的数据 代码示例如下 /*** 入队* param data* throws Exception*/ public void enqueue(String data) throws Exception {// 创建临时有序子节点zk.create(QUEUE_ROOT /queue-, data.getBytes(StandardCharsets.UTF_8),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); }/*** 出队* return* throws Exception*/ public String dequeue() throws Exception {while (true) {ListString children zk.getChildren(QUEUE_ROOT, false);if (children.isEmpty()) {return null;}Collections.sort(children);for (String child : children) {String childPath QUEUE_ROOT / child;try {byte[] data zk.getData(childPath, false, null);zk.delete(childPath, -1);return new String(data, StandardCharsets.UTF_8);} catch (KeeperException.NoNodeException e) {// 节点已被其他消费者删除尝试下一个节点}}} }3.2 使用Apache Curator实现分布式队列 Apache Curator是一个ZooKeeper客户端的封装库提供了许多高级功能包括分布式队列。 public class CuratorDistributedQueueDemo {private static final String QUEUE_ROOT /curator_distributed_queue;public static void main(String[] args) throws Exception {CuratorFramework client CuratorFrameworkFactory.newClient(localhost:2181,new ExponentialBackoffRetry(1000, 3));client.start();// 定义队列序列化和反序列化QueueSerializerString serializer new QueueSerializerString() {Overridepublic byte[] serialize(String item) {return item.getBytes();}Overridepublic String deserialize(byte[] bytes) {return new String(bytes);}};// 定义队列消费者QueueConsumerString consumer new QueueConsumerString() {Overridepublic void consumeMessage(String message) throws Exception {System.out.println(消费消息: message);}Overridepublic void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {}};// 创建分布式队列DistributedQueueString queue QueueBuilder.builder(client, consumer, serializer, QUEUE_ROOT).buildQueue();queue.start();// 生产消息for (int i 0; i 5; i) {String message Task- i;System.out.println(生产消息: message);queue.put(message);Thread.sleep(1000);}Thread.sleep(10000);queue.close();client.close();} }学习总结 学习了zookeeper客户端Curator的使用 感谢 感谢【51CTO博客】大佬【作者ITKaven】的文章。《ZooKeeper Curator框架之数据缓存与监听CuratorCache》
http://www.pierceye.com/news/178519/

相关文章:

  • 网上购物的网站开发背景wordpress more标签使用教程
  • 多语言网站建设方案新建网站的价格
  • 企业网站服务器的选择企业网站建设市场分析
  • 阜阳做网站的公司网页制作自学教程
  • 阿里巴巴吧网站怎么做网页设计师属于什么部门
  • 望京网站建设公司红酒专业网站建设
  • 兰州市城市建设设计院网站游戏网站搭建需要多少钱
  • 网站建设易客vi设计公司 深圳
  • 白银网站运行网站建设客户去哪里找
  • 建湖网站设计云浮网站设计
  • 招聘网站的简历可以做几份vue可以做pc网站吗
  • 高端个性化网站建设版面设计经历了哪几个阶段
  • wordpress本地图片一个网站完整详细的seo优化方案
  • 试玩网站怎么做5g影视
  • 宝宝投票网站怎么做网站首页添加代码
  • 网站开发分类网站建设的目标
  • 做旅游的网站有哪些专业做鞋子的网站
  • 深圳旅游网站开发新余网络推广
  • 平台网站建设ppt雪梨直播
  • 建设外贸类网站互联网下的网络营销
  • 网站开发需要的知识WordPress要什么环境
  • wordpress 多站点设置欧洲c2c平台
  • 赤峰网站开发公司做网站的公司怎么推销
  • 深圳福田区区住房和建设局网站wordpress 小程序开发
  • 网站建设与推cctv-10手机网站模板用什么做
  • 网站的建站方案网络科技有限公司
  • ps做图游戏下载网站有哪些内容广州网站(建设信科网络)
  • 专做皮鞋销售网站seo网站优化方案
  • 街区网站建设的意义做外贸网站 怎么收钱
  • 北京网站制作公司兴田德润可信赖给钱做h事都行的网站名