当前位置: 首页 > news >正文

知名企业门户网站建设联系电话营销型和展示型网站的区别

知名企业门户网站建设联系电话,营销型和展示型网站的区别,建筑效果图素材网站,网站备案 英文队列这种数据结构都不陌生#xff0c;特点就是先进先出。有很多常用的消息中间件可以有现成的该部分功能#xff0c;这里使用zookeeper基于发布订阅模式来实现分布式队列。对应的会有一个生产者和一个消费者。 这里理论上还是使用顺序节点。生产者不断产生新的顺序子节点特点就是先进先出。有很多常用的消息中间件可以有现成的该部分功能这里使用zookeeper基于发布订阅模式来实现分布式队列。对应的会有一个生产者和一个消费者。 这里理论上还是使用顺序节点。生产者不断产生新的顺序子节点消费者watcher监听节点新增事件来消费消息。 生产者 CuratorFramework client ... client.start(); String path /testqueue; client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(path,11.getBytes()) 消费者 CuratorFramework client ... client.start(); String path /testqueue; PathChildrenCache pathCache new PathChildrenCache(client,path,true); pathCache.getListenable().addListener(new PathChildrenCacheListener() {Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {if(event.getType() PathChildrenCacheEvent.Type.CHILD_ADDED){ChildData data event.getData();//handle msgclient.delete().forPath(data.getPath());}} }); pathCache.start();使用curator queue 先来使用基本的队列类DistributedQueue。 DistributedQueue的初始化需要提交准备几个参数 client连接就不多说了: CuratorFramework client ...QueueSerializer这个主要是用来指定对消息data进行序列化和反序列化 这里就搞一个简单的字符串类型 QueueSerializerString serializer new QueueSerializerString() {Overridepublic byte[] serialize(String item) {return item.getBytes();}Overridepublic String deserialize(byte[] bytes) {return new String(bytes);} };QueueConsumer消息consumer当有新消息来的时候会调用consumer.consumeMessage()来处理消息 这里也搞个简单的string类型的处理consumer QueueConsumerString consumer new QueueConsumerString() {Overridepublic void consumeMessage(String s) throws Exception {System.out.println(receive msg:s);}Overridepublic void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {//TODO} };队列消息发布 //队列节点路径 String queuePath /queue; //使用上面准备的几个参数构造DistributedQueue对象 DistributedQueueString queue QueueBuilder.builder(client,consumer,serializer,queuePath).buildQueue(); queue.start(); //调用put方法生产消息 queue.put(hello); queue.put(msg); Thread.sleep(2000); queue.put(3);这样在启动测试程序在consumer的consumeMessage方法就会收到queue.put的消息。 这里有个问题有没有发现在初始化queue的时候需要指定consumer那岂不是只能同一个程序中生产消费何来的分布式 其实这里在queue对象创建的时候consumer可以为null这个时候queue就只生产消息。具体的逻辑需要看下DistributedQueue类的源码。 在DistributedQueue类的构造函数有一步设置isProducerOnly属性 isProducerOnly (consumer null);然后在start()方法会根据isProducerOnly来判断启动方式 if ( !isProducerOnly || (maxItems ! QueueBuilder.NOT_SET) ) {childrenCache.start(); }if ( !isProducerOnly ) {service.submit(new CallableObject(){Overridepublic Object call(){runLoop();return null;}}); }这里看到consumer为空两个if不成立不会初始化对那个的消息消费逻辑wather监听。只需要在另一个程序里创建queue启动时指定consumer即可。 源码分析 先从消息的发布也就是put方法 首先调用makeItemPath()获取创建节点路径 ZKPaths.makePath(queuePath, QUEUE_ITEM_NAME);这里QUEUE_ITEM_NAME“queue-”。 然后调用internalPut()方法来创建节点路径 //先累加消息数量putCount putCount.incrementAndGet(); //使用serializer序列化消息数据 byte[] bytes ItemSerializer.serialize(multiItem, serializer); //根据background来创建节点 if ( putInBackground ) {doPutInBackground(item, path, givenMultiItem, bytes); } else {doPutInForeground(item, path, givenMultiItem, bytes); }看doPutInForeground里就是具体的创建节点了 //创建节点 client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(path, bytes); //哦错了这里putCount不是总消息数是正在创建消息数创建完再回减 synchronized(putCount) {putCount.decrementAndGet();putCount.notifyAll(); }//如果有对应的lisener依次调用 putListenerContainer.forEach(listener - {if ( item ! null ){listener.putCompleted(item);}else{listener.putMultiCompleted(givenMultiItem);} });消息的发布就完成了。 然后是消息的consumer这里肯定是使用的watcher。这里还是回到前面start方法处根据isProducerOnly属性判断有两步操作 1、childrenCache.start(); childrenCache初始化是在queue的构造函数里 childrenCache new ChildrenCache(client, queuePath)其start方法会调用 private final CuratorWatcher watcher new CuratorWatcher() {Overridepublic void process(WatchedEvent event) throws Exception{if ( !isClosed.get() ){sync(true);}} };private final BackgroundCallback callback new BackgroundCallback(){Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception{if ( event.getResultCode() KeeperException.Code.OK.intValue() ){setNewChildren(event.getChildren());}}};void start() throws Exception{sync(true);}private synchronized void sync(boolean watched) throws Exception{if ( watched ){//走这里client.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path);}else{client.getChildren().inBackground(callback).forPath(path);}} 这里先把代码都贴上看到内部定义了一个watcher和callback。这里inBackground就是watcher到事件使用callback进行处理最后是调用到setNewChildren方法 private synchronized void setNewChildren(ListString newChildren) {if ( newChildren ! null ){Data currentData children.get();//将数据设置到children变量里消息版本1children.set(new Data(newChildren, currentData.version 1));//notifyAll() 等待线程获取消息notifyFromCallback();} }这里有引入了一个children变量然后将数据设置到了该变量里。 private final AtomicReferenceData children new AtomicReferenceData(new Data(Lists.StringnewArrayList(), 0));children其实是线程间通信一个共享数据容器变量。这里设置了数据然后具体的数据消费在下一步。 2、线程池里丢了个任务去执行runLoop();方法。 回到DistributedQueue.start的第二步执行runLoop()方法看名字就应该知道了一直轮询获取消息。 还是来看代码吧 private void runLoop() {long currentVersion -1;long maxWaitMs -1;//while一直轮询while ( state.get() State.STARTED ){try{//从childrenCache里获取数据ChildrenCache.Data data (maxWaitMs 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion);currentVersion data.version;ListString children Lists.newArrayList(data.children);sortChildren(children); // makes sure items are processed in the correct orderif ( children.size() 0 ){maxWaitMs getDelay(children.get(0));if ( maxWaitMs 0 ){continue;}}else{continue;}/**处理数据 这里取出消息后会删除节点然后使用serializer反序列化节点数据调用consumer.consumeMessage来处理消息**/processChildren(children, currentVersion);}}} }这里获取数据使用了childrenCache.blockingNextGetData synchronized Data blockingNextGetData(long startVersion, long maxWait, TimeUnit unit) throws InterruptedException {long startMs System.currentTimeMillis();boolean hasMaxWait (unit ! null);long maxWaitMs hasMaxWait ? unit.toMillis(maxWait) : -1;//数据版本没变一直wait等待while ( startVersion children.get().version ){if ( hasMaxWait ){long elapsedMs System.currentTimeMillis() - startMs;long thisWaitMs maxWaitMs - elapsedMs;if ( thisWaitMs 0 ){break;}wait(thisWaitMs);}else{wait();}}return children.get(); }这里就有wait阻塞等消息当消息来时候会被唤醒。 其它类型队列 curator对优先队列(DistributedPriorityQueue)、延迟队列(DistributedDelayQueue)都有对应的实现有兴趣的自己看吧。
http://www.pierceye.com/news/193934/

相关文章:

  • 网站备案号查电话号码商场网站开发
  • 手机网站建站教育模板下载泰州公司注册
  • 如何做商业网站推广西安市城乡建设管理局网站的公示栏
  • 上海做兼职哪个网站腾讯企业邮箱域名是什么
  • 霸州网站制作棋牌网站建设源码
  • 茶叶网站制作模板网页设计在安阳工资多少
  • 网站建设项目验收方案自己做捕鱼网站能不能挣钱
  • 微信网页网站怎么做我为群众办实事实践活动
  • 建设银行发卡银行网站福州 网站设计
  • 网站备案号码舟山高端网站建设
  • 买奢侈品代工厂做的产品的网站名建立网站 英语怎么说
  • 网站访问者qq计算机等级培训机构
  • 可以让外国人做问卷调查的网站济南优化seo网站建设公司
  • odoo做网站创建企业需要什么条件
  • 山西省旅游网站建设分析wordpress 个人介绍
  • 山东高级网站建设赚钱
  • 做网站大概要多少钱新建网站的外链多久生效
  • 天河区建设网站品牌网站建设小8蝌蚪
  • 深圳市企业网站seo点击软件小程序游戏开发公司
  • 南宁企业网站设计公怎么进wordpress
  • 商务网站建设一万字做视频剪辑接私活的网站
  • 网站开发绪论phpstudy建wordpress
  • 网站建设的基本流程有哪些wordpress产品页布局
  • 写过太原的网站免费漫画大全免费版
  • 毕业设计做系统好还是网站好冠县网站建设公司
  • 网站管理制度建设开发一个网站需要多少时间
  • 高校网站建设说明书微信公众号涨粉 网站
  • 深圳网站建设公司哪里好中国施工企业管理协会官网
  • 网站自动抢注步步高学习机进网站怎么做
  • 带域名的网站打不开深圳网站优化多少钱