当前位置: 首页 > news >正文

建立个网站需要多少钱织梦建站教程视频

建立个网站需要多少钱,织梦建站教程视频,建一个区域网站需要多少资金,wordpress固定连接打不开Spark-Streaming概述 Spark Streaming 用于流式数据的处理。 和 Spark 基于 RDD 的概念很相似#xff0c;Spark Streaming 使用离散化流(discretized stream)作为抽象表示#xff0c;叫作 DStream。 DStream 是随时间推移而收到的数据的序列。 Spark-Streaming的特点…Spark-Streaming概述 Spark Streaming 用于流式数据的处理。 和 Spark 基于 RDD 的概念很相似Spark Streaming 使用离散化流(discretized stream)作为抽象表示叫作 DStream。 DStream 是随时间推移而收到的数据的序列。 Spark-Streaming的特点易用、容错、易整合到spark体系。 Spark-Streaming架构 DStream实操 案例词频统计 idea中运行 import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext}object wordcount {def main(args: Array[String]): Unit {// 创建 SparkConf 对象设置运行模式为本地多线程应用名为 streamingval sparkConf new SparkConf().setMaster(local[*]).setAppName(streaming)// 创建 StreamingContext 对象设置批处理间隔为 3 秒val ssc new StreamingContext(sparkConf, Seconds(3))// 从指定的主机和端口接收文本流数据val lineStreams ssc.socketTextStream(node01, 9999)// 将每行文本拆分为单词val wordStreams lineStreams.flatMap(_.split( ))// 为每个单词映射为 (单词, 1) 的键值对val wordAndOneStreams wordStreams.map((_, 1))// 按单词进行分组并对每个单词的计数进行累加val wordAndCountStreams wordAndOneStreams.reduceByKey(_ _)// 打印每个批次中每个单词的计数结果wordAndCountStreams.print()// 启动流式计算ssc.start()// 等待计算终止ssc.awaitTermination()} }在虚拟机中输入 nc -lk 9999   并输入数据 结果 解析 对数据的操作也是按照 RDD 为单位来进行的 计算过程由 Spark Engine 来完成 DStream 创建 RDD队列 案例 循环创建几个 RDD将 RDD 放入队列。通过 SparkStream 创建 Dstream计算 WordCount 代码 import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.rdd.RDD import scala.collection.mutableobject RDD {def main(args: Array[String]): Unit {// 创建 SparkConf 对象设置运行模式为本地多线程应用名为 RDDStreamval sparkConf new SparkConf().setMaster(local[*]).setAppName(RDDStream)// 创建 StreamingContext 对象设置批处理间隔为 4 秒val ssc new StreamingContext(sparkConf, Seconds(4))// 创建一个可变队列用于存储 RDDval rddQueue new mutable.Queue[RDD[Int]]()// 从队列中创建输入流oneAtATime 为 false 表示可以同时处理多个 RDDval inputStream ssc.queueStream(rddQueue, oneAtATime false)// 将输入流中的每个元素映射为 (元素, 1) 的键值对val mappedStream inputStream.map((_, 1))// 按键对键值对进行聚合统计每个键的出现次数val reducedStream mappedStream.reduceByKey(_ _)// 打印每个批次中每个键的计数结果reducedStream.print()// 启动流式计算ssc.start()// 循环 5 次每次向队列中添加一个 RDD并休眠 2 秒for (i - 1 to 5) {rddQueue ssc.sparkContext.makeRDD(1 to 300, 10)Thread.sleep(2000)}// 等待计算终止ssc.awaitTermination()} }运行结果 自定义数据源 自定义数据源 import java.io.{BufferedReader, InputStreamReader} import java.net.Socket import java.nio.charset.StandardCharsetsimport org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receiver.Receiverimport scala.util.control.NonFatalclass CustomerReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {override def onStart(): Unit {new Thread(Socket Receiver) {override def run(): Unit {receive()}}.start()}def receive(): Unit {var socket: Socket nullvar reader: BufferedReader nulltry {socket new Socket(host, port)reader new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))var input: String reader.readLine()while (!isStopped() input ! null) {store(input)input reader.readLine()}} catch {case NonFatal(e) restart(Error receiving data, e)} finally {if (reader ! null) {try {reader.close()} catch {case NonFatal(e) println(sError closing reader: ${e.getMessage})}}if (socket ! null) {try {socket.close()} catch {case NonFatal(e) println(sError closing socket: ${e.getMessage})}}}restart(Restarting receiver)}override def onStop(): Unit {} } 使用自定义的数据源采集数据 object sparkConf {def main(args: Array[String]): Unit {try {// 创建 SparkConf 对象设置运行模式为本地多线程应用名为 streamval sparkConf new SparkConf().setMaster(local[*]).setAppName(stream)// 创建 StreamingContext 对象设置批处理间隔为 5 秒val ssc new StreamingContext(sparkConf, Seconds(5))// 使用自定义 Receiver 创建输入流val lineStream ssc.receiverStream(new CustomerReceiver(node01, 9999))// 将每行文本拆分为单词val wordStream lineStream.flatMap(_.split( ))// 为每个单词映射为 (单词, 1) 的键值对val wordAndOneStream wordStream.map((_, 1))// 按单词进行分组并对每个单词的计数进行累加val wordAndCountStream wordAndOneStream.reduceByKey(_ _)// 打印每个批次中每个单词的计数结果wordAndCountStream.print()// 启动流式计算ssc.start()// 等待计算终止ssc.awaitTermination() }}}
http://www.pierceye.com/news/266534/

相关文章:

  • 绵阳网站建设优化甘肃省安装建设集团公司网站
  • wordpress建站知乎广告设计软件coreldraw教程
  • wordpress注册无法发送邮件保定seo外包服务商
  • 进口外贸网站有哪些wordpress百度统计代码
  • 建筑网站排行国外网站备案流程
  • dw做网站一般是多大的尺寸网站开发运行环境论文
  • 湖北省建设厅政务公开网站聊城开发app公司
  • 石家庄网站建设接单金融软件网站建设公司排名
  • 企企业业网网站站建建设设哪个网站可以做纸箱
  • 国外专门做视频翻译网站吗山西时代网站建设
  • 云南省城乡住房与建设厅网站杭州网站制作平台公司
  • 程序员做网站美工能过关吗深圳品牌折扣店
  • 地产网站设计怎么做网贷网站
  • 公司网站是如何搭建的跨境电商被骗血本无归
  • 品牌网站建设目标vps怎么做多个网站
  • 普陀区建设工程质检网站网站建设 工作方案
  • 三河做网站开发公司虚列成本
  • 网站公司建设网站首页注册资本可以随便填吗
  • 网站做链接的意义是什么意思网站设计与制作
  • 快速开发网站的应用程序网站高中建设工具
  • 备案期间网站可以做竞价吗网站开发四川
  • 盐城网站app建设竣工验收备案查询
  • 河南省建设厅八大员网站相城网页设计
  • 建设电子商务网站要多少钱怎么自己开公司
  • 网站设计分析怎么写5年的室内设计师收入
  • 珠海网站建设服务谷歌关键词排名查询工具
  • 三网站合一系统优化最好的安卓手机
  • 那几个网站可以做h5产品经理培训哪个机构好
  • 吉林市做网站阿里巴巴国际站怎么运营
  • 中国网站排名100网站建设属于销售费用