当前位置: 首页 > news >正文

网站建设费属于文化事业建设费软件工程师发展前景

网站建设费属于文化事业建设费,软件工程师发展前景,印象笔记配置到wordpress,网站排版类型spark-streaming连接消费nsq目的使用 NSQ作为消息流使用 spark-streaming 进行消费对数据进行清洗后#xff0c;保存到hive仓库中连接方案1、编写Spark Streaming Custom Receivers(spark-streaming 自定义接收器)#xff0c;详细见文档2、使用 nsq 官方提供的Java程序连接包…spark-streaming连接消费nsq目的使用 NSQ作为消息流使用 spark-streaming 进行消费对数据进行清洗后保存到hive仓库中连接方案1、编写Spark Streaming Custom Receivers(spark-streaming 自定义接收器)详细见文档2、使用 nsq 官方提供的Java程序连接包 JavaNSQClient 详细见文档详细代码自定义连接器ReliableNSQReceiver.scalaimport com.github.brainlag.nsq.callbacks.NSQMessageCallbackimport com.github.brainlag.nsq.lookup.DefaultNSQLookupimport com.github.brainlag.nsq.{NSQConsumer, NSQMessage}import org.apache.spark.internal.Loggingimport org.apache.spark.storage.StorageLevelimport org.apache.spark.streaming.receiver.Receiverclass MessageCallbacks(store_fun:String Unit) extends NSQMessageCallback with Logging {def message(message: NSQMessage): Unit {val s new String(message.getMessage())store_fun(s)message.finished()}}/* 自定义连接器 */class ReliableNSQReceiver(host: String, port: Int, topic: String, channel: String)extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {var consumer: NSQConsumer nulldef onStart() {// 启动通过连接接收数据的线程new Thread(Socket Receiver) {override def run() { receive() }}.start()}def onStop() {logInfo(Stopped receiving)consumer.close}/** 接收数据 */private def receive() {try {val lookup new DefaultNSQLookuplookup.addLookupAddress(host, port)consumer new NSQConsumer(lookup, topic, channel, new MessageCallbacks(store))consumer.start} catch {case e: java.net.ConnectException restart(Error connecting to host : port, e)case t: Throwable restart(Error receiving data, t)}}}使用连接器import com.google.gson.JsonParserimport org.apache.spark.SparkConfimport org.apache.spark.internal.Loggingimport org.apache.spark.sql.{DataFrame, SparkSession}import org.apache.spark.streaming.dstream.DStreamimport org.apache.spark.streaming.{Seconds, StreamingContext}/** 在定义一个 context 之后,您必须执行以下操作.* 通过创建输入 DStreams 来定义输入源.* 通过应用转换和输出操作 DStreams 定义流计算(streaming computations).* 开始接收输入并且使用 streamingContext.start() 来处理数据.* 使用 streamingContext.awaitTermination() 等待处理被终止(手动或者由于任何错误).* 使用 streamingContext.stop() 来手动的停止处理.*/object ELKStreaming extends Logging{def main(args: Array[String]): Unit {if (args.length 4) {System.err.println(Usage: ELKStreaming )System.exit(1)}logInfo(start )StreamingExamples.setStreamingLogLevels()val sparkConf new SparkConf().setAppName(ELKStreaming).setMaster(yarn).set(hive.metastore.uris, thrift://hadoop15.bigdata.org:9083)// 创建一个批次间隔为10val ssc new StreamingContext(sparkConf, Seconds(args(2).toInt))// 使用自定义的NSQReceiverval lines ssc.receiverStream(new ReliableNSQReceiver(args(0), args(1).toInt, log, scalatest))val hiveStream: DStream[(String, String)] lines.map(line prefix_exit(line))// 将计算后的数据保存到hive中hiveStream.foreachRDD(rdd {// 利用SparkConf来初始化SparkSession。val sparkSession: SparkSession SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()// 导入隐式转换来将RDDimport sparkSession.implicits._// 将RDD转换成DFval df: DataFrame rdd.toDF(str, ymd)// 取出表中的字段logInfo(df count df.count)df.createOrReplaceTempView(spark_logs)sparkSession.sql(insert into args(3) partition (ymd) select str,ymd from spark_logs)})ssc.start()ssc.awaitTermination()}def prefix_exit(line:String):(String,String) {// 对数据进行清洗计算val obj new JsonParser().parse(line).getAsJsonObjectval data_str1 obj.get(recv_timestamp).toString().split(T|Z|\)val data_str2 data_str1(1).split(-)val data_str3 data_str2(1)/data_str2(2)/data_str2(0) data_str1(2) [I] obj.get(index_type).toString().split(\)(1) lineval data_str4 data_str2(0)data_str2(1)data_str2(2)(data_str3.toString(), data_str4.toString())}}
http://www.pierceye.com/news/917818/

相关文章:

  • 设计得很好的企业网站wordpress 标签云
  • 杂志网站模板苏州设计网页网站好
  • 设计理论网站清远市发布
  • 长沙本土网站制作公司wordpress thegem
  • 网站后台 源码重庆电力公司网站
  • 泰安企业网站建设电话廉江手机网站建设公司
  • 自已建网站微信登录珠海绿网科技有限公司
  • 大良网站制作太原建筑公司网站
  • 网站开发的交付文档抖音代运营费用明细
  • 自适应网站建设沈阳网站安全建设需求
  • 列表主题wordpress国外seo综合查询
  • 装修网站怎么做推广做百度网站每年的费用多少
  • 网站搭建免费视频教程省企联网站建设要求
  • 天津大学生专业做网站建设网站价格
  • 携程网站建设进度及实施过程文具电子商务网站开发内容
  • 怎么查看网站打开速度网站源码整站下载
  • 北京城乡住房建设部网站常见的网络营销推广方式有哪些
  • 做网站的成本费用钱宝网站怎么做任务
  • 网站上的格式用html怎么做部队网站设计
  • 帮客户做网站内容社交网站有哪些如何做
  • 网站开发与设计实训总结两千字公众号制作的网站开发
  • 一个公司做2个产品网站怎么做的用html5做的网站素材
  • 内乡网站建设咸阳网站建设报价
  • 企业网站多少钱扶余手机网站开发
  • 做外汇网站卖判刑多少年如何找回网站后台密码
  • 怎么做优惠券网站asp.net mvc 5网站开发之美
  • 网站底部浮动电话广告福建住房和城乡建设部网站
  • 建站之星破解版wordpress 置顶排序
  • c2c网站代表和网址涟源市建设局网站
  • 哪个网站有免费的模板免费网上商城系统