当前位置: 首页 > news >正文

网站制作技术网站建设一般好久到期

网站制作技术,网站建设一般好久到期,企业主体负责人和网站负责人,php网站开发好学吗【IDEASpark Streaming 3.4.1Dstream监控套接字流统计WordCount保存至MySQL8】 把DStream写入到MySQL数据库中 Spark 3.4.1MySQL 8.0.30sbt 1.9.2 文章目录 【IDEASpark Streaming 3.4.1Dstream监控套接字流统计WordCount保存至MySQL8】前言一、背景说明二、使用步骤1.引入库2…【IDEASpark Streaming 3.4.1Dstream监控套接字流统计WordCount保存至MySQL8】 把DStream写入到MySQL数据库中 Spark 3.4.1MySQL 8.0.30sbt 1.9.2 文章目录 【IDEASpark Streaming 3.4.1Dstream监控套接字流统计WordCount保存至MySQL8】前言一、背景说明二、使用步骤1.引入库2.开发代码运行测试 总结 前言 需要基于Spark Streaming 将实时监控的套接字流统计WordCount结果保存至MySQL 提示本项目通过sbt控制依赖 一、背景说明 在Spark应用中外部系统经常需要使用到Spark DStream处理后的数据因此需要采用输出操作把DStream的数据输出到数据库或者文件系统中 Spark Streaming是一个基于Spark的实时计算框架它可以从多种数据源消费数据并对数据进行高效、可扩展、容错的处理。Spark Streaming的工作原理有以下几个步骤 数据接收Spark Streaming可以从各种输入源接收数据如Kafka、Flume、Twitter、Kinesis等然后将数据分发到Spark集群中的不同节点上。每个节点上有一个接收器Receiver负责接收数据并将数据存储在内存或磁盘中。数据划分Spark Streaming将连续的数据流划分为一系列小批量Batch的数据每个批次包含一定时间间隔内的数据。这个时间间隔称为批处理间隔Batch Interval可以根据应用的需求进行设置。每个批次的数据都被封装成一个RDDRDD是Spark的核心数据结构表示一个不可变的分布式数据集。数据处理Spark Streaming对每个批次的RDD进行转换和输出操作实现对流数据的处理和分析。转换操作可以使用Spark Core提供的各种函数如map、reduce、join等也可以使用Spark Streaming提供的一些特殊函数如window、updateStateByKey等。输出操作可以将处理结果保存到外部系统中如HDFS、数据库等。数据输出Spark Streaming将处理结果以DStream的形式输出DStream是一系列连续的RDD组成的序列表示一个离散化的数据流。DStream可以被进一步转换或输出到其他系统中。 DStream有状态转换操作是指在Spark Streaming中对DStream进行一些基于历史数据或中间结果的转换从而得到一个新的DStream。 二、使用步骤 1.引入库 ThisBuild / version : 0.1.0-SNAPSHOTThisBuild / scalaVersion : 2.13.11lazy val root (project in file(.)).settings(name : SparkLearning,idePackagePrefix : Some(cn.lh.spark),libraryDependencies org.apache.spark %% spark-sql % 3.4.1,libraryDependencies org.apache.spark %% spark-core % 3.4.1,libraryDependencies org.apache.hadoop % hadoop-auth % 3.3.6,libraryDependencies org.apache.spark %% spark-streaming % 3.4.1,libraryDependencies org.apache.spark %% spark-streaming-kafka-0-10 % 3.4.1,libraryDependencies org.apache.spark %% spark-mllib % 3.4.1 % provided,libraryDependencies mysql % mysql-connector-java % 8.0.30 ) 2.开发代码 为了实现通过spark Streaming 监控控制台输入需要开发两个代码 NetworkWordCountStatefultoMysql.scalaStreamingSaveMySQL8.scala NetworkWordCountStatefultoMysql.scala package cn.lh.spark import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} object NetworkWordCountStatefultoMysql { def main(args: Array[String]): Unit { // 定义状态更新函数 val updateFunc (values: Seq[Int], state: Option[Int]) { val currentCount values.foldLeft(0)(_ _) val previousCount state.getOrElse(0) Some(currentCount previousCount) } // 设置log4j日志级别 StreamingExamples.setStreamingLogLevels() val conf: SparkConf new SparkConf().setAppName(NetworkCountStateful).setMaster(local[2]) val scc: StreamingContext new StreamingContext(conf, Seconds(5)) // 设置检查点具有容错机制 scc.checkpoint(F:\\niit\\2023\\2023_2\\Spark\\codes\\checkpoint) val lines: ReceiverInputDStream[String] scc.socketTextStream(192.168.137.110, 9999) val words: DStream[String] lines.flatMap(_.split( )) val wordDstream: DStream[(String, Int)] words.map(x (x, 1)) val stateDstream: DStream[(String, Int)] wordDstream.updateStateByKey[Int](updateFunc) // 打印出状态 stateDstream.print() // 将统计结果保存到MySQL中 stateDstream.foreachRDD(rdd { val repartitionedRDD rdd.repartition(3) repartitionedRDD.foreachPartition(StreamingSaveMySQL8.writeToMySQL) }) scc.start() scc.awaitTermination() scc.stop() } }StreamingSaveMySQL8.scala package cn.lh.spark import java.sql.DriverManager object StreamingSaveMySQL8 { // 定义写入 MySQL 的函数 def writeToMySQL(iter: Iterator[(String,Int)]): Unit { // 保存到MySQL val ip 192.168.137.110 val port 3306 val db sparklearning val username lh val pwd Lh123456! val jdbcurl sjdbc:mysql://$ip:$port/$db val conn DriverManager.getConnection(jdbcurl, username, pwd) val statement conn.prepareStatement(INSERT INTO wordcount (word,count) VALUES (?,?)) try { // 写入数据 iter.foreach { wc statement.setString(1, wc._1.trim) statement.setInt(2, wc._2.toInt) statement.executeUpdate() } } catch { case e:Exception e.printStackTrace() } finally { if(statement ! null){ statement.close() } if(conn!null){ conn.close() } } } } 运行测试 准备工作 提前在mysql中新建数据表保存Spark Streaming写入的数据 启动nc -lk 9999 启动 NetworkWordCountStatefultoMysql.scala ![[Pasted image 20230804214904.png]] 在nc端口输入字符再分别到idea控制台和MySQL检查结果 总结 本次实验通过IDEA基于Spark Streaming 3.4.1开发程序监控套接字流并统计字符串实现实时统计单词出现的数量。试验成功相对简单。 后期改善点如下 通过配置文件读取mysql数据库相应的配置信息不要写死在代码里写入数据时sql语句【插入的表信息】可以在调用方法时当作参数输入iter: Iterator[(String,Int)] 应用泛型插入表时自动保存插入时间 欢迎各位开发者一同改进代码有问题有疑问提出来交流。谢谢
http://www.pierceye.com/news/548702/

相关文章:

  • 网站备案幕布照片怎么算合格北京专业网站开发
  • 浙江建设职业技术学院塘栖校区网站网络推广方法有几种
  • 呢图网站场建设封面长安镇做网站
  • 搜索关键词的网站黄山做网站
  • 网络建站系统去除wordpress主题头部作者信息
  • 鼓楼微网站开发番禺有经验的网站建设
  • 对网站建设心得进入公众号继续阅读怎么弄
  • 网站建设公司投诉电话可以先做网站后备案吗
  • 顺的品牌网站设计价位网站开发合同书
  • 用html5制作个人网站深圳百度国际大厦
  • 网络网站公司网站管理平台扩展插件
  • 个人小说网站怎么做把自己做的网页变成网站
  • led网站制作网站登录后不显示内容
  • 推广网站公司兴义市住房和城乡建设局网站
  • 营销网站建设的公司哪家好创建网站超链接
  • 苏州网站优化排名推广企业网站建设价格
  • 在线制作论坛网站wordpress建立好的网站
  • 网站的功能规范商城网站静态模板下载
  • 双语言网站源码制作网页的软件哪个好
  • 政务网站建设需求网站首页页面代码
  • 网站产品详情页怎么做的用服务器做网站空间
  • 河北网站制作报价长春市建设技工学校网站
  • 盘锦做网站专家常州网站建设企业网站制作
  • 关于建设网站的报告wordpress 视频 广告插件
  • 生态养殖网站模板网赌网站建设多少钱
  • wordpress is长沙百度提升优化
  • 福州网站建设哪个好网页被禁止浏览怎么解决
  • 缩短链接的网站磁力猫引擎
  • 佛山网站到首页排名网站开发招标任务书
  • 网站建设相关专业手机网站建设软件有哪些