当前位置: 首页 > news >正文

网站发的文章如何优化网站建设支出

网站发的文章如何优化,网站建设支出,如何做网站 优帮云,wordpress客户使用的后端目录 0. 相关文章链接 1. RDD队列 1.1. 用法及说明 1.2. 案例实操 2. 自定义数据源 2.1. 用法和说明 2.2. 案例实操 3. Kafka数据源 3.1. 版本选型 3.2. Kafka 0-8 Receiver 模式#xff08;当前3.x版本不适用#xff09; 3.3. Kafka 0-8 Direct 模式#xff08;…目录 0. 相关文章链接 1. RDD队列 1.1. 用法及说明 1.2. 案例实操 2. 自定义数据源 2.1. 用法和说明 2.2. 案例实操 3. Kafka数据源 3.1. 版本选型 3.2. Kafka 0-8 Receiver 模式当前3.x版本不适用 3.3. Kafka 0-8 Direct 模式当前3.x版本不适用 3.4. Kafka 0-10 Direct 模式3.x版本中使用此模式 0. 相关文章链接 Spark文章汇总  1. RDD队列 1.1. 用法及说明 测试过程中可以通过使用 ssc.queueStream(queueOfRDDs)来创建 DStream每一个推送到这个队列中的 RDD都会作为一个 DStream 处理。  1.2. 案例实操 需求循环创建几个 RDD将 RDD 放入队列。通过 SparkStream 创建 Dstream计算WordCount编写代码 import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext}import scala.collection.mutableobject StreamTest {def main(args: Array[String]): Unit {//1.初始化Spark配置信息val sparkConf: SparkConf new SparkConf().setMaster(local[*]).setAppName(StreamTest)//2.初始化SparkStreamingContextval ssc: StreamingContext new StreamingContext(sparkConf, Seconds(3))//3.创建RDD队列val rddQueue: mutable.Queue[RDD[Int]] new mutable.Queue[RDD[Int]]()//4.创建QueueInputDStreamval inputStream: InputDStream[Int] ssc.queueStream(rddQueue, oneAtATime false)//5.处理队列中的RDD数据val mappedStream: DStream[(Int, Int)] inputStream.map(((_: Int), 1))val reducedStream: DStream[(Int, Int)] mappedStream.reduceByKey((_: Int) _)//6.打印结果reducedStream.print()//7.启动任务ssc.start()//8.循环创建并向RDD队列中放入RDDfor (i - 1 to 5) {rddQueue ssc.sparkContext.makeRDD(1 to 300, 10)Thread.sleep(2000)}ssc.awaitTermination()}} 数据展示 ------------------------------------------- Time: 1689147795000 ms ------------------------------------------- (84,1) (96,1) (120,1) (180,1) (276,1) (156,1) (216,1) (300,1) (48,1) (240,1) ...------------------------------------------- Time: 1689147798000 ms ------------------------------------------- (84,2) (96,2) (120,2) (180,2) (276,2) (156,2) (216,2) (300,2) (48,2) (240,2) ...------------------------------------------- Time: 1689147801000 ms ------------------------------------------- (84,1) (96,1) (120,1) (180,1) (276,1) (156,1) (216,1) (300,1) (48,1) (240,1) ...------------------------------------------- Time: 1689147804000 ms ------------------------------------------- (84,1) (96,1) (120,1) (180,1) (276,1) (156,1) (216,1) (300,1) (48,1) (240,1) ...------------------------------------------- Time: 1689147807000 ms -------------------------------------------------------------------------------------- Time: 1689147810000 ms ------------------------------------------- 2. 自定义数据源 2.1. 用法和说明 需要继承 Receiver并实现 onStart、 onStop 方法来自定义数据源采集。 2.2. 案例实操 需求自定义数据源实现监控某个端口号获取该端口号内容。自定义数据源 import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receiver.Receiver import java.io.{BufferedReader, InputStreamReader} import java.net.Socket import java.nio.charset.StandardCharsetsclass CustomerReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {//最初启动的时候调用该方法作用为读数据并将数据发送给Sparkoverride def onStart(): Unit {new Thread(Socket Receiver) {override def run() {receive()}}.start()}//读数据并将数据发送给Sparkdef receive(): Unit {//创建一个Socketval socket: Socket new Socket(host, port)//定义一个变量用来接收端口传过来的数据var input: String null//创建一个BufferedReader用于读取端口传来的数据val reader: BufferedReader new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))//读取数据input reader.readLine()//当receiver没有关闭并且输入数据不为空则循环发送数据给Sparkwhile (!isStopped() input ! null) {store(input)input reader.readLine()}//跳出循环则关闭资源reader.close()socket.close()//重启任务restart(restart)}override def onStop(): Unit {}} 使用自定义的数据源采集数据 import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext}object StreamTest {def main(args: Array[String]): Unit {//1.初始化Spark配置信息val sparkConf: SparkConf new SparkConf().setMaster(local[*]).setAppName(StreamTest)//2.初始化SparkStreamingContextval ssc: StreamingContext new StreamingContext(sparkConf, Seconds(3))//3.创建自定义receiver的Streamingval lineStream: ReceiverInputDStream[String] ssc.receiverStream(new CustomerReceiver(localhost, 9999))//4.将每一行数据做切分形成一个个单词val wordStream: DStream[String] lineStream.flatMap(_.split( ))//5.将单词映射成元组word,1val wordAndOneStream: DStream[(String, Int)] wordStream.map((_, 1))//6.将相同的单词次数做统计val wordAndCountStream: DStream[(String, Int)] wordAndOneStream.reduceByKey(_ _)//7.打印wordAndCountStream.print()//8.启动SparkStreamingContextssc.start()ssc.awaitTermination()}} 展示结果 ------------------------------------------- Time: 1689148212000 ms -------------------------------------------------------------------------------------- Time: 1689148215000 ms ------------------------------------------- (abc,2) (hello,1)------------------------------------------- Time: 1689148218000 ms ------------------------------------------- 3. Kafka数据源 3.1. 版本选型 ReceiverAPI需要一个专门的 Executor 去接收数据然后发送给其他的 Executor 做计算。存在的问题接收数据的 Executor 和计算的 Executor 速度会有所不同特别在接收数据的 Executor 速度大于计算的 Executor 速度会导致计算数据的节点内存溢出。早期版本中提供此方式当前版本不适用。DirectAPI是由计算的 Executor 来主动消费 Kafka 的数据速度由自身控制。  3.2. Kafka 0-8 Receiver 模式当前3.x版本不适用 需求通过 SparkStreaming 从 Kafka 读取数据并将读取过来的数据做简单计算最终打印到控制台导入依赖 dependencygroupIdorg.apache.spark/groupIdartifactIdspark-streaming-kafka-0-8_2.11/artifactIdversion2.4.5/version /dependency 编写代码 import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext}object StreamTest {def main(args: Array[String]): Unit {//1.初始化Spark配置信息val sparkConf: SparkConf new SparkConf().setMaster(local[*]).setAppName(StreamTest)//2.初始化SparkStreamingContextval ssc: StreamingContext new StreamingContext(sparkConf, Seconds(3))//3.读取Kafka数据创建DStream(基于Receive方式)val kafkaDStream: ReceiverInputDStream[(String, String)] KafkaUtils.createStream(ssc,bigdata01:2181,bigdata02:2181,bigdata03:2181,StreamTest,Map[String, Int](test - 1))//4.计算WordCountkafkaDStream.map {case (_, value) (value, 1)}.reduceByKey(_ _).print()//5.开启任务ssc.start()ssc.awaitTermination()}} 3.3. Kafka 0-8 Direct 模式当前3.x版本不适用 需求通过 SparkStreaming 从 Kafka 读取数据并将读取过来的数据做简单计算最终打印到控制台导入依赖 dependencygroupIdorg.apache.spark/groupIdartifactIdspark-streaming-kafka-0-8_2.11/artifactIdversion2.4.5/version /dependency 编写代码自动维护 offset import kafka.serializer.StringDecoder import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext}object StreamTest {def main(args: Array[String]): Unit {//1.初始化Spark配置信息val sparkConf: SparkConf new SparkConf().setMaster(local[*]).setAppName(StreamTest)//2.初始化SparkStreamingContext并设置CKval ssc: StreamingContext new StreamingContext(sparkConf, Seconds(3))ssc.checkpoint(./checkpoint)//3.定义Kafka参数val kafkaPara: Map[String, String] Map[String, String](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG - bigdata01:9092,bigdata02:9092,bigdata03:9092,ConsumerConfig.GROUP_ID_CONFIG - StreamTest)//4.读取Kafka数据val kafkaDStream: InputDStream[(String, String)] KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,kafkaPara,Set(test))//5.计算WordCountkafkaDStream.map((_: (String, String))._2).flatMap((_: String).split( )).map(((_: String), 1)).reduceByKey((_: Int) (_: Int)).print()//6. 开启任务ssc.start()ssc.awaitTermination()}} 编写代码手动维护 offset import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.StringDecoder import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange} import org.apache.spark.streaming.{Seconds, StreamingContext}object StreamTest {def main(args: Array[String]): Unit {//1.初始化Spark配置信息val sparkConf: SparkConf new SparkConf().setMaster(local[*]).setAppName(StreamTest)//2.初始化SparkStreamingContext并设置CKval ssc: StreamingContext new StreamingContext(sparkConf, Seconds(3))ssc.checkpoint(./checkpoint)//3.定义Kafka参数val kafkaPara: Map[String, String] Map[String, String](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG - bigdata01:9092,bigdata02:9092,bigdata03:9092,ConsumerConfig.GROUP_ID_CONFIG - StreamTest)//4.获取上一次启动最后保留的OffsetgetOffset(MySQL)val fromOffsets: Map[TopicAndPartition, Long] Map[TopicAndPartition, Long](TopicAndPartition(test, 0) - 20)//5.读取Kafka数据创建DStreamval kafkaDStream: InputDStream[String] KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](ssc,kafkaPara,fromOffsets,(m: MessageAndMetadata[String, String]) m.message())//6.获取当前消费数据的offset信息并保存到创建的数组里// 注意// 使用的方法为通过transform算子将这个批次的数据转换成RDD然后使用asInstanceOf方法将RDD转换成HasOffsetRanges即可以获取offsetRanges// transform算子的用法是将这个批次的DStream转换成RDD但是transform是转换算子所以如果没有使用行动算子那其内部的内容不会进行运算var offsetRanges: Array[OffsetRange] Array.emptykafkaDStream.transform {rdd: RDD[String] {offsetRanges rdd.asInstanceOf[HasOffsetRanges].offsetRangesfor (offset - offsetRanges) {println(s${offset.topic}:${offset.partition}:${offset.fromOffset}:${offset.untilOffset})}rdd.flatMap((_: String).split( )).map(((_: String), 1)).reduceByKey((_: Int) (_: Int))}}//7.打印Offset信息kafkaDStream.foreachRDD {rdd: RDD[String] {for (offset - rdd.asInstanceOf[HasOffsetRanges].offsetRanges) {println(s${offset.topic}:${offset.partition}:${offset.fromOffset}:${offset.untilOffset})}}}//8.开启任务ssc.start()ssc.awaitTermination()}} 3.4. Kafka 0-10 Direct 模式3.x版本中使用此模式 需求通过 SparkStreaming 从 Kafka 读取数据并将读取过来的数据做简单计算最终打印到控制台导入依赖 dependencygroupIdorg.apache.spark/groupIdartifactIdspark-streaming-kafka-0-10_2.12/artifactIdversion${spark.version}/versionscopeprovided/scope /dependency dependencygroupIdcom.fasterxml.jackson.core/groupIdartifactIdjackson-core/artifactIdversion2.10.1/versionscopeprovided/scope /dependency 编写代码 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} import org.apache.spark.streaming.{Seconds, StreamingContext}object StreamTest {def main(args: Array[String]): Unit {//1.初始化Spark配置信息val sparkConf: SparkConf new SparkConf().setMaster(local[*]).setAppName(StreamTest)//2.初始化SparkStreamingContext并设置CKval ssc: StreamingContext new StreamingContext(sparkConf, Seconds(3))ssc.checkpoint(./checkpoint)//3.定义Kafka参数val kafkaPara: Map[String, Object] Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG - bigdata01:9092,bigdata01:9092,bigdata01:9092,ConsumerConfig.GROUP_ID_CONFIG - StreamTest,ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG - org.apache.kafka.common.serialization.StringDeserializer,ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG - org.apache.kafka.common.serialization.StringDeserializer)//4.读取Kafka数据创建DStreamval kafkaDStream: InputDStream[ConsumerRecord[String, String]] KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Set(testTopic), kafkaPara))//5.将每条消息的KV取出val valueDStream: DStream[String] kafkaDStream.map((record: ConsumerRecord[String, String]) record.value())//6.计算WordCountvalueDStream.flatMap(_.split( )).map((_, 1)).reduceByKey(_ _).print()//7.开启任务ssc.start()ssc.awaitTermination()}} 可以通过命令行查看Kafka对应Topic的消费进度 bin/kafka-consumer-groups.sh --describe --bootstrap-server bigdata01:9092 --group testTopic注其他Spark相关系列文章链接由此进 -  Spark文章汇总
http://www.pierceye.com/news/687957/

相关文章:

  • 动力无限西安网站建设wordpress图片到本地
  • 重庆微信营销网站建设seo快照推广
  • dedecms小说网站模板如何以目录形式访问网站
  • 有哪些可以做网站的平台怎样把录的视频做一下传到网站
  • 网站域名如何续费福建建筑人才服务中心
  • 站长平台怎么添加网站网站中文名注册
  • 唐四薪php网站开发答案想制作一个网站要多少钱
  • ppt网站建设答案东台网页定制
  • 做网站 侵权如何制作手机版网站
  • 代发网站建设app网站软件
  • 家居企业网站建设如何国家企业信息公示系统全国
  • 网站平台如何推广wordpress登录页面显示ip
  • 如何做网站百度排名优化推广的目的是什么
  • 重庆忠县网站建设公司哪家专业芷江建设局的工作人员网站
  • 戴尔电脑网站建设方案范文室内设计方案图
  • 餐厅网站建设策划方案网站建设存在的具体问题
  • 竞价页面网站做优化广告商对接平台
  • 网站后台如何修改密码wordpress调整页面布局
  • 东莞热点网站建设莱州人才网
  • 线上渠道推广网站的优化哪个好
  • 群晖可以做网站服务器微信朋友圈怎么发链接那种网页怎么制作
  • wordpress 公司网站网站和公众号的区别
  • 数据库跟网站seo流程
  • 网站首页关键词设置网站的整体风格
  • wordpress源码站整站源码制作wordpress插件
  • 海口网站建设就q479185700上墙网站空间后台怎么进入
  • 四川电子有限公司 - 手机网站如何做网站家具导购
  • 网站经营性备案修改wordpress自带小工具
  • 网站怎么建设可以发图评论网站建设制作公
  • 做销售的网站设计公司是做什么的