Consuming NSQ with Spark Streaming

Goal: use NSQ as the message stream, consume it with Spark Streaming, clean the data, and save the result into a Hive warehouse.

Connection options:

1. Write a Spark Streaming custom receiver (see the Spark "Custom Receivers" documentation for details).
2. Use JavaNSQClient, the Java client library for NSQ (see its documentation for details).

Custom receiver, ReliableNSQReceiver.scala:

```scala
import com.github.brainlag.nsq.callbacks.NSQMessageCallback
import com.github.brainlag.nsq.lookup.DefaultNSQLookup
import com.github.brainlag.nsq.{NSQConsumer, NSQMessage}
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

/** Callback invoked by the NSQ consumer for every message. */
class MessageCallbacks(store_fun: String => Unit) extends NSQMessageCallback with Logging {

  def message(message: NSQMessage): Unit = {
    val s = new String(message.getMessage())
    // Hand the message body to Spark, then acknowledge it to NSQ.
    store_fun(s)
    message.finished()
  }
}

/* Custom receiver */
class ReliableNSQReceiver(host: String, port: Int, topic: String, channel: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  var consumer: NSQConsumer = null

  def onStart() {
    // Start the thread that connects to NSQ and receives data.
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    logInfo("Stopped receiving")
    consumer.close()
  }

  /** Receive data */
  private def receive() {
    try {
      val lookup = new DefaultNSQLookup
      lookup.addLookupAddress(host, port)
      consumer = new NSQConsumer(lookup, topic, channel, new MessageCallbacks(store))
      consumer.start()
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        restart("Error receiving data", t)
    }
  }
}
```
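Before wiring the receiver into Spark, it can be useful to check NSQ connectivity with JavaNSQClient on its own. Below is a minimal sketch that only uses the calls already shown above; the nsqlookupd address 127.0.0.1:4161 and the sleep at the end are assumptions for illustration, not part of the original post.

```scala
import com.github.brainlag.nsq.callbacks.NSQMessageCallback
import com.github.brainlag.nsq.lookup.DefaultNSQLookup
import com.github.brainlag.nsq.{NSQConsumer, NSQMessage}

object NSQSmokeTest {
  def main(args: Array[String]): Unit = {
    // Point the lookup client at nsqlookupd (host and port are placeholders).
    val lookup = new DefaultNSQLookup
    lookup.addLookupAddress("127.0.0.1", 4161)

    // Print each message body and acknowledge it with finished().
    val consumer = new NSQConsumer(lookup, "log", "scalatest", new NSQMessageCallback {
      def message(message: NSQMessage): Unit = {
        println(new String(message.getMessage()))
        message.finished()
      }
    })
    consumer.start()

    // Keep the JVM alive for a while so messages can arrive.
    Thread.sleep(30000)
  }
}
```

If messages from the `log` topic show up here, the same lookup host and port can be passed to the receiver below.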
Using the receiver:

```scala
import com.google.gson.JsonParser
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * After defining a context, you have to:
 *  1. Define the input sources by creating input DStreams.
 *  2. Define the streaming computations by applying transformations and output operations to the DStreams.
 *  3. Start receiving and processing data with streamingContext.start().
 *  4. Wait for the processing to stop (manually or due to any error) with streamingContext.awaitTermination().
 *  5. Stop the processing manually with streamingContext.stop().
 */
object ELKStreaming extends Logging {

  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println("Usage: ELKStreaming <host> <port> <batch-interval-seconds> <hive-table>")
      System.exit(1)
    }
    logInfo("start")
    StreamingExamples.setStreamingLogLevels()
    val sparkConf = new SparkConf()
      .setAppName("ELKStreaming")
      .setMaster("yarn")
      .set("hive.metastore.uris", "thrift://hadoop15.bigdata.org:9083")
    // Create a StreamingContext; the batch interval (e.g. 10s) comes from args(2).
    val ssc = new StreamingContext(sparkConf, Seconds(args(2).toInt))
    // Use the custom NSQReceiver.
    val lines = ssc.receiverStream(new ReliableNSQReceiver(args(0), args(1).toInt, "log", "scalatest"))
    val hiveStream: DStream[(String, String)] = lines.map(line => prefix_exit(line))
    // Save the cleaned data into Hive.
    hiveStream.foreachRDD(rdd => {
      // Initialise the SparkSession from the same SparkConf.
      val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
      // Import the implicits needed to convert the RDD to a DataFrame.
      import sparkSession.implicits._
      val df: DataFrame = rdd.toDF("str", "ymd")
      logInfo("df count " + df.count)
      // Expose the fields through a temp view and insert them into the target table.
      df.createOrReplaceTempView("spark_logs")
      sparkSession.sql("insert into " + args(3) + " partition (ymd) select str, ymd from spark_logs")
    })
    ssc.start()
    ssc.awaitTermination()
  }

  /** Clean one raw JSON line into (str, ymd). */
  def prefix_exit(line: String): (String, String) = {
    val obj = new JsonParser().parse(line).getAsJsonObject
    // recv_timestamp looks like "2019-07-01T12:30:45Z"; split it into date and time parts.
    val data_str1 = obj.get("recv_timestamp").toString().split("T|Z|\"")
    val data_str2 = data_str1(1).split("-")
    // Rebuild a log-style prefix: MM/dd/yyyy HH:mm:ss [I] <index_type> <raw line>.
    val data_str3 = data_str2(1) + "/" + data_str2(2) + "/" + data_str2(0) + " " + data_str1(2) + " [I] " +
      obj.get("index_type").toString().split("\"")(1) + " " + line
    // yyyyMMdd, used as the Hive partition value.
    val data_str4 = data_str2(0) + data_str2(1) + data_str2(2)
    (data_str3.toString(), data_str4.toString())
  }
}
```
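The `insert into ... partition (ymd)` statement assumes the target Hive table already exists and that dynamic partition inserts are allowed. Here is a rough sketch of that preparation; the table name `my_logs` and the column types are assumptions, since the original post does not show the DDL.

```scala
import org.apache.spark.sql.SparkSession

object PrepareHiveTable {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("PrepareHiveTable")
      .enableHiveSupport()
      .getOrCreate()

    // Allow dynamic partition inserts such as "insert into ... partition (ymd)".
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")

    // One STRING column for the cleaned line, partitioned by the yyyyMMdd value.
    spark.sql(
      """CREATE TABLE IF NOT EXISTS my_logs (
        |  str STRING
        |) PARTITIONED BY (ymd STRING)
        |STORED AS PARQUET""".stripMargin)
  }
}
```

With the table in place, the streaming job can be submitted with spark-submit, passing the nsqlookupd host, port, batch interval and the Hive table name as the four arguments.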