当前位置: 首页 > news >正文

上海创意网站建设pw网站更换域名

上海创意网站建设,pw网站更换域名,游戏网站建设免费,长沙工程建设管理中心网站[TOC]DStream的各种transformationTransformation Meaningmap(func) 对DStream中的各个元素进行func函数操作#xff0c;然后返回一个新的DStream.flatMap(func) 与map方法类似#xff0c;只不过各个输入项可以被输出为零个或多个输出项filter(func) 过滤出所有函数func返回值…[TOC]DStream的各种transformationTransformation Meaningmap(func) 对DStream中的各个元素进行func函数操作然后返回一个新的DStream.flatMap(func) 与map方法类似只不过各个输入项可以被输出为零个或多个输出项filter(func) 过滤出所有函数func返回值为true的DStream元素并返回一个新的DStreamrepartition(numPartitions) 增加或减少DStream中的分区数从而改变DStream的并行度union(otherStream) 将源DStream和输入参数为otherDStream的元素合并并返回一个新的DStream.count() 通过对DStreaim中的各个RDD中的元素进行计数然后返回只有一个元素的RDD构成的DStreamreduce(func) 对源DStream中的各个RDD中的元素利用func进行聚合操作然后返回只有一个元素的RDD构成的新的DStream.countByValue() 对于元素类型为K的DStream返回一个元素为(K,Long)键值对形式的新的DStreamLong对应的值为源DStream中各个RDD的key出现的次数reduceByKey(func, [numTasks]) 利用func函数对源DStream中的key进行聚合操作然后返回新的(KV)对构成的DStreamjoin(otherStream, [numTasks]) 输入为(K,V)、(K,W)类型的DStream返回一个新的(K(VW)类型的DStreamcogroup(otherStream, [numTasks]) 输入为(K,V)、(K,W)类型的DStream返回一个新的 (K, Seq[V], Seq[W]) 元组类型的DStreamtransform(func) 通过RDD-to-RDD函数作用于源码DStream中的各个RDD可以是任意的RDD操作从而返回一个新的RDDupdateStateByKey(func) 根据于key的前置状态和key的新值对key进行更新返回一个新状态的DstreamWindow 函数可以看到很多都是在RDD中已经有的transformation算子操作所以这里只关注transform、updateStateByKey和window函数transformation之transform操作DStream transform1、transform操作应用在DStream上时可以用于执行任意的RDD到RDD的转换操作。它可以用于实现DStream API中所没有提供的操作。比如说DStream API中并没有提供将一个DStream中的每个batch与一个特定的RDD进行join的操作。但是我们自己就可以使用transform操作来实现该功能。2、DStream.join()只能join其他DStream。在DStream每个batch的RDD计算出来之后会去跟其他DStream的RDD进行join。案例测试代码如下package cn.xpleaf.bigdata.spark.scala.streaming.p1import org.apache.log4j.{Level, Logger}import org.apache.spark.SparkConfimport org.apache.spark.rdd.RDDimport org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}/*** 使用Transformation之transform来完成在线黑名单过滤* 需求* 将日志数据中来自于ip[27.19.74.143, 110.52.250.126]实时过滤掉* 数据格式* 27.19.74.143##2016-05-30 17:38:20##GET /static/image/common/faq.gif HTTP/1.1##200##1127*/object _06SparkStreamingTransformOps {def main(args: Array[String]): Unit {if (args null || args.length 2) {System.err.println(Parameter Errors! Usage: |hostname: 监听的网络socket的主机名或ip地址|port 监听的网络socket的端口.stripMargin)System.exit(-1)}Logger.getLogger(org.apache.spark).setLevel(Level.OFF)val conf new SparkConf().setAppName(_01SparkStreamingNetWorkOps.getClass.getSimpleName).setMaster(local[2])val ssc new StreamingContext(conf, Seconds(2))val hostname args(0).trimval port args(1).trim.toInt//黑名单数据val blacklist List((27.19.74.143, true), (110.52.250.126, true))// val blacklist List(27.19.74.143, 110.52.250.126)val blacklistRDD:RDD[(String, Boolean)] ssc.sparkContext.parallelize(blacklist)val linesDStream:ReceiverInputDStream[String] ssc.socketTextStream(hostname, port)// 如果用到一个DStream和rdd进行操作无法使用dstream直接操作只能使用transform来进行操作val filteredDStream:DStream[String] linesDStream.transform(rdd {val ip2InfoRDD:RDD[(String, String)] rdd.map{line {(line.split(##)(0), line)}}/** A(M) B(N)两张表* across join* 交叉连接没有on条件的连接会产生笛卡尔积(M*N条记录) 不能用* inner join* 等值连接取A表和B表的交集也就是获取在A和B中都有的数据没有的剔除掉 不能用* left outer join* 外链接最常用就是左外连接(将左表中所有的数据保留右表中能够对应上的数据正常显示在右表中对应不上显示为null)* 可以通过非空判断是左外连接达到inner join的结果*/val joinedInfoRDD:RDD[(String, (String, Option[Boolean]))] ip2InfoRDD.leftOuterJoin(blacklistRDD)joinedInfoRDD.filter{case (ip, (line, joined)) {joined None}}//执行过滤操作.map{case (ip, (line, joined)) line}})filteredDStream.print()ssc.start()ssc.awaitTermination()ssc.stop() // stop中的boolean参数设置为true关闭该ssc对应的SparkContext默认为false只关闭自身}}nc中产生数据[uplookinguplooking01 ~]$ nc -lk 489327.19.74.143##2016-05-30 17:38:20##GET /data/attachment/common/c8/common_2_verify_icon.png HTTP/1.1##200##582110.52.250.126##2016-05-30 17:38:20##GET /static/js/logging.js?y7a HTTP/1.1##200##6038.35.201.144##2016-05-30 17:38:20##GET /uc_server/avatar.php?uid29331sizemiddle HTTP/1.1##301##-输出结果如下-------------------------------------------Time: 1526006084000 ms-------------------------------------------8.35.201.144##2016-05-30 17:38:20##GET /uc_server/avatar.php?uid29331sizemiddle HTTP/1.1##301##-transformation之updateStateByKey操作概述1、Spark Streaming的updateStateByKey可以DStream中的数据进行按key做reduce操作然后对各个批次的数据进行累加。2、 updateStateByKey 解释以DStream中的数据进行按key做reduce操作然后对各个批次的数据进行累加在有新的数据信息进入或更新时可以让用户保持想要的任何状。使用这个功能需要完成两步1) 定义状态可以是任意数据类型2) 定义状态更新函数用一个函数指定如何使用先前的状态从输入流中的新值更新状态。对于有状态操作要不断的把当前和历史的时间切片的RDD累加计算随着时间的流失计算的数据规模会变得越来越大3、要思考的是如果数据量很大的时候或者对性能的要求极为苛刻的情况下可以考虑将数据放在Redis或者tachyon或者ignite上4、注意updateStateByKey操作要求必须开启Checkpoint机制。案例Scala版测试代码如下package cn.xpleaf.bigdata.spark.scala.streaming.p1import org.apache.log4j.{Level, Logger}import org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}import org.apache.spark.streaming.{Seconds, StreamingContext}/*** 状态函数updateStateByKey* 更新key的状态(就是key对应的value)** 通常的作用计算某个key截止到当前位置的状态* 统计截止到目前为止的word对应count* 要想完成截止到目前为止的操作必须将历史的数据和当前最新的数据累计起来所以需要一个地方来存放历史数据* 这个地方就是checkpoint目录**/object _07SparkStreamingUpdateStateByKeyOps {def main(args: Array[String]): Unit {if (args null || args.length 2) {System.err.println(Parameter Errors! Usage: |hostname: 监听的网络socket的主机名或ip地址|port 监听的网络socket的端口.stripMargin)System.exit(-1)}val hostname args(0).trimval port args(1).trim.toIntLogger.getLogger(org.apache.spark).setLevel(Level.OFF)val conf new SparkConf().setAppName(_07SparkStreamingUpdateStateByKeyOps.getClass.getSimpleName).setMaster(local[2])val ssc new StreamingContext(conf, Seconds(2))ssc.checkpoint(hdfs://ns1/checkpoint/streaming/usb)// 接收到的当前批次的数据val linesDStream:ReceiverInputDStream[String] ssc.socketTextStream(hostname, port)// 这是记录下来的当前批次的数据val rbkDStream:DStream[(String, Int)] linesDStream.flatMap(_.split( )).map((_, 1)).reduceByKey(__)val usbDStream:DStream[(String, Int)] rbkDStream.updateStateByKey(updateFunc)usbDStream.print()ssc.start()ssc.awaitTermination()ssc.stop() // stop中的boolean参数设置为true关闭该ssc对应的SparkContext默认为false只关闭自身}/*** param seq 当前批次的key对应的数据* param history 历史key对应的数据可能有可能没有* return*/def updateFunc(seq: Seq[Int], history: Option[Int]): Option[Int] {var sum seq.sumif(history.isDefined) {sum history.get}Option[Int](sum)}}nc产生数据[uplookinguplooking01 ~]$ nc -lk 4893hello hellohello you hello he hello me输出结果如下-------------------------------------------Time: 1526009358000 ms-------------------------------------------(hello,2)18/05/11 11:29:18 INFO WriteAheadLogManager for Thread: Attempting to clear 0 old log files in hdfs://ns1/checkpoint/streaming/usb/receivedBlockMetadata older than 1526009338000:-------------------------------------------Time: 1526009360000 ms-------------------------------------------(hello,5)(me,1)(you,1)(he,1)18/05/11 11:29:20 INFO WriteAheadLogManager for Thread: Attempting to clear 0 old log files in hdfs://ns1/checkpoint/streaming/usb/receivedBlockMetadata older than 1526009340000:-------------------------------------------Time: 1526009362000 ms-------------------------------------------(hello,5)(me,1)(you,1)(he,1)Java版用法略有不同主要是 状态更新函数的写法上有区别如下package cn.xpleaf.bigdata.spark.java.streaming.p1;import com.google.common.base.Optional;import org.apache.log4j.Level;import org.apache.log4j.Logger;import org.apache.spark.SparkConf;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.api.java.JavaDStream;import org.apache.spark.streaming.api.java.JavaPairDStream;import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;import org.apache.spark.streaming.api.java.JavaStreamingContext;import scala.Tuple2;import java.util.Arrays;import java.util.List;public class _02SparkStreamingUpdateStateByKeyOps {public static void main(String[] args) {if(args null || args.length 2) {System.err.println(Parameter Errors! Usage: );System.exit(-1);}Logger.getLogger(org.apache.spark).setLevel(Level.OFF);SparkConf conf new SparkConf().setAppName(_02SparkStreamingUpdateStateByKeyOps.class.getSimpleName()).setMaster(local[2]);JavaStreamingContext jsc new JavaStreamingContext(conf, Durations.seconds(2));jsc.checkpoint(hdfs://ns1/checkpoint/streaming/usb);String hostname args[0].trim();int port Integer.valueOf(args[1].trim());JavaReceiverInputDStream lineDStream jsc.socketTextStream(hostname, port);//默认的持久化级别MEMORY_AND_DISK_SER_2JavaDStream wordsDStream lineDStream.flatMap(new FlatMapFunction() {Overridepublic Iterable call(String line) throws Exception {return Arrays.asList(line.split( ));}});JavaPairDStream pairsDStream wordsDStream.mapToPair(word - {return new Tuple2(word, 1);});JavaPairDStream rbkDStream pairsDStream.reduceByKey(new Function2() {Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1 v2;}});// 做历史的累计操作JavaPairDStream usbDStream rbkDStream.updateStateByKey(new Function2, Optional, Optional() {Overridepublic Optional call(List current, Optional history) throws Exception {int sum 0;for (int i : current) {sum i;}if (history.isPresent()) {sum history.get();}return Optional.of(sum);}});usbDStream.print();jsc.start();//启动流式计算jsc.awaitTermination();//等待执行结束jsc.close();}}transformation之window操作DStream window 滑动窗口Spark Streaming提供了滑动窗口操作的支持从而让我们可以对一个滑动窗口内的数据执行计算操作。每次掉落在窗口内的RDD的数据会被聚合起来执行计算操作然后生成的RDD会作为window DStream的一个RDD。比如下图中就是对每三秒钟的数据执行一次滑动窗口计算这3秒内的3个RDD会被聚合起来进行处理然后过了两秒钟又会对最近三秒内的数据执行滑动窗口计算。所以每个滑动窗口操作都必须指定两个参数窗口长度以及滑动间隔而且这两个参数值都必须是batch间隔的整数倍。1.红色的矩形就是一个窗口窗口hold的是一段时间内的数据流。2.这里面每一个time都是时间单元在官方的例子中每隔window size是3 time unit, 而且每隔2个单位时间窗口会slide一次。所以基于窗口的操作需要指定2个参数window length - The duration of the window (3 in the figure)slide interval - The interval at which the window-based operation is performed (2 in the figure).1.窗口大小个人感觉是一段时间内数据的容器。2.滑动间隔就是我们可以理解的cron表达式吧。举个例子吧还是以最著名的wordcount举例每隔10秒统计一下过去30秒过来的数据。// Reduce last 30 seconds of data, every 10 secondsval windowedWordCounts pairs.reduceByKeyAndWindow(_ _, Seconds(30), Seconds(10))DSstream window滑动容器功能window 对每个滑动窗口的数据执行自定义的计算countByWindow 对每个滑动窗口的数据执行count操作reduceByWindow 对每个滑动窗口的数据执行reduce操作reduceByKeyAndWindow 对每个滑动窗口的数据执行reduceByKey操作countByValueAndWindow 对每个滑动窗口的数据执行countByValue操作案例测试代码如下package cn.xpleaf.bigdata.spark.scala.streaming.p1import org.apache.log4j.{Level, Logger}import org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}import org.apache.spark.streaming.{Seconds, StreamingContext}/***窗口函数window* 每隔多长时间(滑动频率slideDuration)统计过去多长时间(窗口长度windowDuration)中的数据* 需要注意的就是窗口长度和滑动频率* windowDuration M*batchIntervalslideDuration N*batchInterval*/object _08SparkStreamingWindowOps {def main(args: Array[String]): Unit {if (args null || args.length 2) {System.err.println(Parameter Errors! Usage: |hostname: 监听的网络socket的主机名或ip地址|port 监听的网络socket的端口.stripMargin)System.exit(-1)}val hostname args(0).trimval port args(1).trim.toIntLogger.getLogger(org.apache.spark).setLevel(Level.OFF)val conf new SparkConf().setAppName(_08SparkStreamingWindowOps.getClass.getSimpleName).setMaster(local[2])val ssc new StreamingContext(conf, Seconds(2))// 接收到的当前批次的数据val linesDStream:ReceiverInputDStream[String] ssc.socketTextStream(hostname, port)val pairsDStream:DStream[(String, Int)] linesDStream.flatMap(_.split( )).map((_, 1))// 每隔4s统计过去6s中产生的数据val retDStream:DStream[(String, Int)] pairsDStream.reduceByKeyAndWindow(__, windowDuration Seconds(6), slideDuration Seconds(4))retDStream.print()ssc.start()ssc.awaitTermination()ssc.stop() // stop中的boolean参数设置为true关闭该ssc对应的SparkContext默认为false只关闭自身}}nc产生数据[uplookinguplooking01 ~]$ nc -lk 4893hello youhello hehello mehello youhello he输出结果如下-------------------------------------------Time: 1526016316000 ms-------------------------------------------(hello,4)(me,1)(you,2)(he,1)-------------------------------------------Time: 1526016320000 ms-------------------------------------------(hello,5)(me,1)(you,2)(he,2)-------------------------------------------Time: 1526016324000 ms-------------------------------------------DStream的output操作以及foreachRDDDStream output操作1、print 打印每个batch中的前10个元素主要用于测试或者是不需要执行什么output操作时用于简单触发一下job。2、saveAsTextFile(prefix, [suffix]) 将每个batch的数据保存到文件中。每个batch的文件的命名格式为prefix-TIME_IN_MS[.suffix]3、saveAsObjectFile 同上但是将每个batch的数据以序列化对象的方式保存到SequenceFile中。4、saveAsHadoopFile 同上将数据保存到Hadoop文件中5、foreachRDD 最常用的output操作遍历DStream中的每个产生的RDD进行处理。可以将每个RDD中的数据写入外部存储比如文件、数据库、缓存等。通常在其中是针对RDD执行action操作的比如foreach。DStream foreachRDD详解相关内容其实在Spark开发调优中已经有相关的说明。通常在foreachRDD中都会创建一个Connection比如JDBC Connection然后通过Connection将数据写入外部存储。误区一在RDD的foreach操作外部创建Connection这种方式是错误的因为它会导致Connection对象被序列化后传输到每个Task中。而这种Connection对象实际上一般是不支持序列化的也就无法被传输。dstream.foreachRDD { rdd val connection createNewConnection()rdd.foreach { record connection.send(record)}}误区二在RDD的foreach操作内部创建Connection这种方式是可以的但是效率低下。因为它会导致对于RDD中的每一条数据都创建一个Connection对象。而通常来说Connection的创建是很消耗性能的。dstream.foreachRDD { rdd rdd.foreach { record val connection createNewConnection()connection.send(record)connection.close()}}DStream foreachRDD合理使用合理方式一使用RDD的foreachPartition操作并且在该操作内部创建Connection对象这样就相当于是为RDD的每个partition创建一个Connection对象节省资源的多了。dstream.foreachRDD { rdd rdd.foreachPartition { partitionOfRecords val connection createNewConnection()partitionOfRecords.foreach(record connection.send(record))connection.close()}}合理方式二自己手动封装一个静态连接池使用RDD的foreachPartition操作并且在该操作内部从静态连接池中通过静态方法获取到一个连接使用之后再还回去。这样的话甚至在多个RDD的partition之间也可以复用连接了。而且可以让连接池采取懒创建的策略并且空闲一段时间后将其释放掉。dstream.foreachRDD { rdd rdd.foreachPartition { partitionOfRecords val connection ConnectionPool.getConnection()partitionOfRecords.foreach(record connection.send(record))ConnectionPool.returnConnection(connection)}}foreachRDD 与foreachPartition实现实战需要注意的是(1)、你最好使用forEachPartition函数来遍历RDD并且在每台Work上面创建数据库的connection。(2)、如果你的数据库并发受限可以通过控制数据的分区来减少并发。(3)、在插入MySQL的时候最好使用批量插入。(4)确保你写入的数据库过程能够处理失败因为你插入数据库的过程可能会经过网络这可能导致数据插入数据库失败。(5)、不建议将你的RDD数据写入到MySQL等关系型数据库中。这部分内容其实可以参考开发调优部分的案例只是那里并没有foreachRDD因为其并没有使用DStream但是原理是一样的因为最终都是针对RDD来进行操作的。
http://www.pierceye.com/news/56400/

相关文章:

  • 建设建材网站的目的做网站一定要用服务器吗
  • 域名与网站建设wordpress子主题安全
  • 黄冈建设信息网站深圳建筑行业招聘网
  • 做公司网站可以抄别人的吗公司网站设计哪家公司好
  • 网站开发用什么技术宣传片拍摄制作报价单
  • 新农村建设网站知乎网站服务器空间选择
  • 网站怎么做百度才会收录网站建设项目规划书
  • 新网站推广前端培训多少钱
  • 湖北网站建设搭建厦门高端网站建设公司
  • 广东大唐建设网站如何开发微信公众号小程序
  • 山东电商网站建设wordpress问答社区模板
  • 建网站的书籍服务平台管理系统
  • 衡水哪个公司做网站好什么做书籍的网站好
  • 网站tdk设置界面可信网站的认证
  • 免费发布推广信息网站企业管理系统大全免费
  • 做网站没有成本费用如何做账用QQ群做网站排名
  • 建设官方企业网站北京软件开发公司找和丰软件专业
  • 北京燕华工程建设有限公司网站青岛网站建设公
  • 深圳市外贸网站建设如何做网站跳转登入
  • 网页游戏折扣充值平台南宁seo网站推广服务
  • 志愿海南网站天津做胎儿鉴定网站
  • 网站建设可以自己建设服务器吗深圳ui设计师招聘
  • 网站制作与网页建设配音阁在线制作网站
  • 网站的构思设计素材网站源码
  • 手机百度 网站提交做网站没灵感
  • 一般网站的宽度是多少室内设计说明200字
  • 雅安交通建设集团网站微商城网站建设公司的价格
  • 利用ionic做的网站广州移动 网站设计
  • 网站后台支持的字体前端开发基础知识
  • 服务器网站管理系统wordpress使用攻略