建立个网站需要多少钱,织梦建站教程视频,建一个区域网站需要多少资金,wordpress固定连接打不开Spark-Streaming概述
Spark Streaming 用于流式数据的处理。 和 Spark 基于 RDD 的概念很相似#xff0c;Spark Streaming 使用离散化流(discretized stream)作为抽象表示#xff0c;叫作 DStream。
DStream 是随时间推移而收到的数据的序列。
Spark-Streaming的特点…Spark-Streaming概述
Spark Streaming 用于流式数据的处理。 和 Spark 基于 RDD 的概念很相似Spark Streaming 使用离散化流(discretized stream)作为抽象表示叫作 DStream。
DStream 是随时间推移而收到的数据的序列。
Spark-Streaming的特点易用、容错、易整合到spark体系。
Spark-Streaming架构 DStream实操
案例词频统计
idea中运行
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}object wordcount {def main(args: Array[String]): Unit {// 创建 SparkConf 对象设置运行模式为本地多线程应用名为 streamingval sparkConf new SparkConf().setMaster(local[*]).setAppName(streaming)// 创建 StreamingContext 对象设置批处理间隔为 3 秒val ssc new StreamingContext(sparkConf, Seconds(3))// 从指定的主机和端口接收文本流数据val lineStreams ssc.socketTextStream(node01, 9999)// 将每行文本拆分为单词val wordStreams lineStreams.flatMap(_.split( ))// 为每个单词映射为 (单词, 1) 的键值对val wordAndOneStreams wordStreams.map((_, 1))// 按单词进行分组并对每个单词的计数进行累加val wordAndCountStreams wordAndOneStreams.reduceByKey(_ _)// 打印每个批次中每个单词的计数结果wordAndCountStreams.print()// 启动流式计算ssc.start()// 等待计算终止ssc.awaitTermination()}
}在虚拟机中输入 nc -lk 9999 并输入数据 结果 解析 对数据的操作也是按照 RDD 为单位来进行的
计算过程由 Spark Engine 来完成
DStream 创建
RDD队列
案例
循环创建几个 RDD将 RDD 放入队列。通过 SparkStream 创建 Dstream计算 WordCount
代码
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.rdd.RDD
import scala.collection.mutableobject RDD {def main(args: Array[String]): Unit {// 创建 SparkConf 对象设置运行模式为本地多线程应用名为 RDDStreamval sparkConf new SparkConf().setMaster(local[*]).setAppName(RDDStream)// 创建 StreamingContext 对象设置批处理间隔为 4 秒val ssc new StreamingContext(sparkConf, Seconds(4))// 创建一个可变队列用于存储 RDDval rddQueue new mutable.Queue[RDD[Int]]()// 从队列中创建输入流oneAtATime 为 false 表示可以同时处理多个 RDDval inputStream ssc.queueStream(rddQueue, oneAtATime false)// 将输入流中的每个元素映射为 (元素, 1) 的键值对val mappedStream inputStream.map((_, 1))// 按键对键值对进行聚合统计每个键的出现次数val reducedStream mappedStream.reduceByKey(_ _)// 打印每个批次中每个键的计数结果reducedStream.print()// 启动流式计算ssc.start()// 循环 5 次每次向队列中添加一个 RDD并休眠 2 秒for (i - 1 to 5) {rddQueue ssc.sparkContext.makeRDD(1 to 300, 10)Thread.sleep(2000)}// 等待计算终止ssc.awaitTermination()}
}运行结果 自定义数据源
自定义数据源
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsetsimport org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiverimport scala.util.control.NonFatalclass CustomerReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {override def onStart(): Unit {new Thread(Socket Receiver) {override def run(): Unit {receive()}}.start()}def receive(): Unit {var socket: Socket nullvar reader: BufferedReader nulltry {socket new Socket(host, port)reader new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))var input: String reader.readLine()while (!isStopped() input ! null) {store(input)input reader.readLine()}} catch {case NonFatal(e) restart(Error receiving data, e)} finally {if (reader ! null) {try {reader.close()} catch {case NonFatal(e) println(sError closing reader: ${e.getMessage})}}if (socket ! null) {try {socket.close()} catch {case NonFatal(e) println(sError closing socket: ${e.getMessage})}}}restart(Restarting receiver)}override def onStop(): Unit {}
} 使用自定义的数据源采集数据
object sparkConf {def main(args: Array[String]): Unit {try {// 创建 SparkConf 对象设置运行模式为本地多线程应用名为 streamval sparkConf new SparkConf().setMaster(local[*]).setAppName(stream)// 创建 StreamingContext 对象设置批处理间隔为 5 秒val ssc new StreamingContext(sparkConf, Seconds(5))// 使用自定义 Receiver 创建输入流val lineStream ssc.receiverStream(new CustomerReceiver(node01, 9999))// 将每行文本拆分为单词val wordStream lineStream.flatMap(_.split( ))// 为每个单词映射为 (单词, 1) 的键值对val wordAndOneStream wordStream.map((_, 1))// 按单词进行分组并对每个单词的计数进行累加val wordAndCountStream wordAndOneStream.reduceByKey(_ _)// 打印每个批次中每个单词的计数结果wordAndCountStream.print()// 启动流式计算ssc.start()// 等待计算终止ssc.awaitTermination()
}}}