当前位置: 首页 > news >正文

网站做推广的企业广告推广免费发布

网站做推广的企业,广告推广免费发布,继续浏览此网站(不推荐),物业网站宣传册怎么做关于storm的基础,参照我这篇文章:流式计算storm 关于并发和并行,参照我这篇文章:并发和并行 关于storm的并行度解释,参照我这篇文章:storm的并行度解释 关于storm的流分组策略,参照我这篇文章:storm的流分组策略 关于storm的消息可靠机制,参照我这篇文章:storm的消息可靠机制 …关于storm的基础,参照我这篇文章:流式计算storm 关于并发和并行,参照我这篇文章:并发和并行 关于storm的并行度解释,参照我这篇文章:storm的并行度解释 关于storm的流分组策略,参照我这篇文章:storm的流分组策略 关于storm的消息可靠机制,参照我这篇文章:storm的消息可靠机制 storm简介 Storm是一个免费并开源的分布式实时计算系统。利用Storm可以很容易做到可靠地处理无限的数据流像Hadoop批量处理大数据一样Storm可以实时处理数据。Storm简单可以使用任何编程语言。 storm核心组件 1.Nimbus 相当于storm的master,负责资源分配和任务调度,一个普通的storm集群只有一个nimbus(京东是对nimbus做了集群,加入了选举等概念,防止nimbus突然挂掉) 2.Supervisor 相当于storm的slave,负责接收Nimbus分配的任务,管理和启动所有的Worker 3.Worker 一个Worker就是一个jvm进程,对应一个Topology程序,可以有多个Executor 4.Executor 一个Executor就是一个线程,默认对应一个task,也可以设置成对应多个task 5.Task 一个Task是一个实例(spot/bolt),有多少个task就会new多少个bolt,task是storm中进行计算的最小的运行单位 6.Topology 拓扑结构,一个计算任务场景对应一个拓扑结构,拓扑结构中对声明spout和bolt直接的关系 7.Spout 是拓扑结构中的数据来源,可以向多个bolt发送数据,Spout 既可以定义为可靠的数据源也可以定义为不可靠的数据源 8.Bolt 真正的数据处理部分,一个bolt可以发给多个bolt,多个bolt也可以发给一个bolt 9.Component Spout和Bolt都是Component,Storm定义了一个名叫IComponent的总接口 全家谱如下绿色部分是我们最常用、比较简单的部分。红色部分是与事务相关的 spout和bolt的关系: 整体的topology结构: storm使用zookeeper来协调集群中的多个节点,但并不是用zookeeper来传递消息 zookeeper可以看这个 Nimbus和Supervisor都是无状态的,他们的心跳都由zookeeper协调 storm优点 1.使用简单,容易上手 2.可扩展,可以调整正在运行的topologies的并行度 3.容错,可靠,当工作节点宕了,storm会尝试重启另一个,而且Nimbus和Supervisors都是无状态的,死掉重启都不影响 4.无数据丢失,Storm的抽象组件确保了数据至少处理一次,即使使用消息队列系统失败时,也能确保消息被处理 5.支持多种编程语言,Storm用Thrift定义和提交topologies.由于Thrift能被任何一种编程语言使用,因此,topologies也能被任何一种编程语言定义和使用。 6.容易部署和操作 7.高性能,低延迟 storm入门案例 ( 实时统计单次个数 ) 首先导入maven依赖 dependencygroupIdorg.apache.storm/groupIdartifactIdstorm-core/artifactIdversion1.0.4/version/dependency1.先写一个Spout,确定数据源,实际应用中一般是接入kafka等消息,入门案例使用随机字符串代替 /*** 向后端发射tuple数据流* author soul**/ public class SentenceSpout extends BaseRichSpout {//BaseRichSpout是ISpout接口和IComponent接口的简单实现接口对用不到的方法提供了默认的实现private SpoutOutputCollector collector;private String[] sentences {my name is soul,im a boy,i have a dog,my dog has fleas,my girl friend is beautiful};private int index0;/*** open()方法中是ISpout接口中定义在Spout组件初始化时被调用。* open()接受三个参数:一个包含Storm配置的Map,一个TopologyContext对象提供了topology中组件的信息,SpoutOutputCollector对象提供发射tuple的方法。* 在这个例子中,我们不需要执行初始化,只是简单的存储在一个SpoutOutputCollector实例变量。*/Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {// TODO Auto-generated method stubthis.collector collector;}/*** nextTuple()方法是任何Spout实现的核心。* Storm调用这个方法向输出的collector发出tuple。* 在这里,我们只是发出当前索引的句子并增加该索引准备发射下一个句子。*/Overridepublic void nextTuple() {//collector.emit(new Values(hello world this is a test));// TODO Auto-generated method stubthis.collector.emit(new Values(sentences[index]));index;if (indexsentences.length) {index0;}Utils.sleep(1000);}/*** declareOutputFields是在IComponent接口中定义的所有Storm的组件spout和bolt都必须实现这个接口* 用于告诉Storm流组件将会发出那些数据流每个流的tuple将包含的字段*/Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stubdeclarer.declare(new Fields(sentence));//告诉组件发出数据流包含sentence字段}}2.写第一个bolt,将Spout传过来的Tuple拆成一个个的单次,循环发给下一个bolt /*** 订阅sentence spout发射的tuple流实现分割单词* author soul**/ public class SplitSentenceBolt extends BaseRichBolt {//BaseRichBolt是IComponent和IBolt接口的实现//继承这个类就不用去实现本例不关心的方法private OutputCollector collector;/*** prepare()方法类似于ISpout 的open()方法。* 这个方法在blot初始化时调用可以用来准备bolt用到的资源,比如数据库连接。* 本例子和SentenceSpout类一样,SplitSentenceBolt类不需要太多额外的初始化,* 所以prepare()方法只保存OutputCollector对象的引用。*/Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {// TODO Auto-generated method stubthis.collectorcollector;}/*** SplitSentenceBolt核心功能是在类IBolt定义execute()方法这个方法是IBolt接口中定义。* 每次Bolt从流接收一个订阅的tuple都会调用这个方法。* 本例中,收到的元组中查找“sentence”的值,* 并将该值拆分成单个的词,然后按单词发出新的tuple。*/Overridepublic void execute(Tuple input) {// TODO Auto-generated method stubString sentence input.getStringByField(sentence);String[] words sentence.split( );for (String word : words) {this.collector.emit(new Values(word));//向下一个bolt发射数据}}/*** plitSentenceBolt类定义一个元组流,每个包含一个字段(“word”)。*/Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stubdeclarer.declare(new Fields(word));}}3.再写一个bolt,一方面接收上个bolt传过来的单次,另一方面将相同单次出现的次数记录下来,并将现在的结果传给下个bolt /*** 订阅 split sentence bolt的输出流实现单词计数并发送当前计数给下一个bolt* author soul**/ public class WordCountBolt extends BaseRichBolt {private OutputCollector collector;//存储单词和对应的计数private HashMapString, Long counts null;//注不可序列化对象需在prepare中实例化/*** 大部分实例变量通常是在prepare()中进行实例化这个设计模式是由topology的部署方式决定的* 因为在部署拓扑时,组件spout和bolt是在网络上发送的序列化的实例变量。* 如果spout或bolt有任何non-serializable实例变量在序列化之前被实例化(例如,在构造函数中创建)* 会抛出NotSerializableException并且拓扑将无法发布。* 本例中因为HashMap 是可序列化的,所以可以安全地在构造函数中实例化。* 但是通常情况下最好是在构造函数中对基本数据类型和可序列化的对象进行复制和实例化* 而在prepare()方法中对不可序列化的对象进行实例化。*/Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {// TODO Auto-generated method stubthis.collector collector;this.counts new HashMapString, Long();}/*** 在execute()方法中,我们查找的收到的单词的计数(如果不存在初始化为0)* 然后增加计数并存储,发出一个新的词和当前计数组成的二元组。* 发射计数作为流允许拓扑的其他bolt订阅和执行额外的处理。*/Overridepublic void execute(Tuple input) {// TODO Auto-generated method stubString word input.getStringByField(word);Long count this.counts.get(word);if (count null) {count 0L;//如果不存在初始化为0}count;//增加计数this.counts.put(word, count);//存储计数this.collector.emit(new Values(word,count));}/****/Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stub//声明一个输出流其中tuple包括了单词和对应的计数向后发射//其他bolt可以订阅这个数据流进一步处理declarer.declare(new Fields(word,count));}}4.再写一个bolt,接收上个bolt传过来的单次统计结果,在控制台打印.实际最后一步一般会将数据结果存在非关系型数据库中,比如存入HBase或者Redis中 /*** 生成一份报告* author soul**/ public class ReportBolt extends BaseRichBolt {private HashMapString, Long counts null;//保存单词和对应的计数Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {// TODO Auto-generated method stubthis.counts new HashMapString, Long();}Overridepublic void execute(Tuple input) {// TODO Auto-generated method stubString word input.getStringByField(word);Long count input.getLongByField(count);this.counts.put(word, count);//实时输出System.out.println(结果:this.counts);}Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stub//这里是末端bolt不需要发射数据流这里无需定义}/*** cleanup是IBolt接口中定义* Storm在终止一个bolt之前会调用这个方法* 本例我们利用cleanup()方法在topology关闭时输出最终的计数结果* 通常情况下cleanup()方法用来释放bolt占用的资源如打开的文件句柄或数据库连接* 但是当Storm拓扑在一个集群上运行IBolt.cleanup()方法不能保证执行这里是开发模式生产环境不要这样做。*/Overridepublic void cleanup(){System.out.println(---------- FINAL COUNTS -----------);ArrayListString keys new ArrayListString();keys.addAll(this.counts.keySet());Collections.sort(keys);for(String key : keys){System.out.println(key : this.counts.get(key));}System.out.println(----------------------------);}}5.写拓扑结构,将前面四步的Spout和Bolt组成一个拓扑结构,直接运行主方法就能看到结果,这个是Storm的本地模式,将提交的方法稍作修改,就可以变成集群模式,实际都是集群模式,将这些代码打成jar包传到Nimbus上,运行在集群中 /*** 实现单词计数topology**/ public class App {private static final String SENTENCE_SPOUT_ID sentence-spout;private static final String SPLIT_BOLT_ID split-bolt;private static final String COUNT_BOLT_ID count-bolt;private static final String REPORT_BOLT_ID report-bolt;private static final String TOPOLOGY_NAME word-count-topology;public static void main( String[] args ) //throws Exception{//System.out.println( Hello World! );//实例化spout和boltSentenceSpout spout new SentenceSpout();SplitSentenceBolt splitBolt new SplitSentenceBolt();WordCountBolt countBolt new WordCountBolt();ReportBolt reportBolt new ReportBolt();TopologyBuilder builder new TopologyBuilder();//创建了一个TopologyBuilder实例//TopologyBuilder提供流式风格的API来定义topology组件之间的数据流//builder.setSpout(SENTENCE_SPOUT_ID, spout);//注册一个sentence spout//设置两个Executeor(线程)默认一个builder.setSpout(SENTENCE_SPOUT_ID, spout,2);// SentenceSpout -- SplitSentenceBolt//注册一个bolt并订阅sentence发射出的数据流shuffleGrouping方法告诉Storm要将SentenceSpout发射的tuple随机均匀的分发给SplitSentenceBolt的实例//builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);//SplitSentenceBolt单词分割器设置4个Task2个Executeor(线程)builder.setBolt(SPLIT_BOLT_ID, splitBolt,2).setNumTasks(4).shuffleGrouping(SENTENCE_SPOUT_ID);// SplitSentenceBolt -- WordCountBolt//fieldsGrouping将含有特定数据的tuple路由到特殊的bolt实例中//这里fieldsGrouping()方法保证所有“word”字段相同的tuuple会被路由到同一个WordCountBolt实例中//builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping( SPLIT_BOLT_ID, new Fields(word));//WordCountBolt单词计数器设置4个Executeor(线程)builder.setBolt(COUNT_BOLT_ID, countBolt,4).fieldsGrouping( SPLIT_BOLT_ID, new Fields(word));// WordCountBolt -- ReportBolt//globalGrouping是把WordCountBolt发射的所有tuple路由到唯一的ReportBoltbuilder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);Config config new Config();//Config类是一个HashMapString,Object的子类用来配置topology运行时的行为//设置worker数量//config.setNumWorkers(2);LocalCluster cluster new LocalCluster();//本地提交cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());Utils.sleep(10000);cluster.killTopology(TOPOLOGY_NAME);cluster.shutdown();} }storm与其他流式计算框架的对比 1.Spark Streaming 在处理前按时间间隔预先将其切分为一段一段的批处理作业. Spark针对持续性数据流的抽象称为DStreamDiscretizedStream, 一个DStream是一个微批处理micro-batching的RDD弹性分布式数据集, 而RDD则是一种分布式数据集能够以两种方式并行运作分别是任意函数和滑动窗口数据的转换。 2.Flink 针对流数据和批数据的分布式处理引擎 原生的流处理系统, 其所要处理的主要场景就是流数据,批数据只是流数据的一个极限特例而已 Flink 会把所有任务当成流来处理 3.Storm 原生的流处理系统,可以做到毫秒级处理
http://www.pierceye.com/news/615308/

相关文章:

  • 宁波建网站选哪家好一点wordpress手机全部显示
  • 如何注册属于自己的网站做列表的网站
  • 网站公司seo杭州网站建设模板
  • 网站内链如何布局优化大师下载
  • 如何做网站需求表格清单电影购买网站怎么设计
  • 有口碑的常州网站建设家政公司网站建设方案
  • 用户体验设计师吉林网站seo
  • 便宜营销型网站建设优化建站多网站绑定域名
  • 什么网站教人做3d效果图网站建设电话销售不被挂断
  • 村级网站建设 不断增强免费logo设计图案创意
  • 做网站优化有什么途径什么类型的公司需要做建设网站的
  • 计算机毕设代做网站深圳自适应网站开发
  • 万网主机建设网站流程idc 网站备案
  • 收费用的网站怎么做珠海网站关键词推广
  • 学技巧网站制作网站建设税率多少
  • 高端网站设计平台网页设计模板的网站
  • 万网云服务器网站上线网站开发开票税率
  • 西安高端网站制作公司网站开发需要哪些知识
  • 不错的网站建设公网站建设产品展示型的
  • 泰安住房和城乡建设局网站东莞网站推广哪家好信息
  • 个人网站制作的选题意义简短干净三字公司起名
  • 网站卡密代理怎么做网站建设有关表格
  • 易语言可以做网站么永久免费linux云主机
  • 什么网站可以免费做视频软件网站广告推广价格
  • 网站建设手机软件黄页88收费吗
  • 郑州网站建设多少钱wordpress分享获得积分
  • 贵阳网站设计模板建设工程监理招标网站
  • 上海专业的网页设计公司百度推广优化怎么做的
  • 河南城乡建设厅网站wordpress 主题 字体
  • 网站编辑的工作内容深圳网站设计公司有哪些