怎么优化整站,自己做购物网站好吗,asp flash网站模板,wordpress报名系统转自#xff1a;http://www.cnblogs.com/xlturing/p/spark.html 前言 在使用Spark Streaming的过程中对于计算产生结果的进行持久化时#xff0c;我们往往需要操作数据库#xff0c;去统计或者改变一些值。最近一个实时消费者处理任务#xff0c;在使用spark streaming进行…转自http://www.cnblogs.com/xlturing/p/spark.html 前言 在使用Spark Streaming的过程中对于计算产生结果的进行持久化时我们往往需要操作数据库去统计或者改变一些值。最近一个实时消费者处理任务在使用spark streaming进行实时的数据流处理时我需要将计算好的数据更新到hbase和mysql中所以本文对spark操作hbase和mysql的内容进行总结并且对自己踩到的一些坑进行记录。 Spark Streaming持久化设计模式 DStreams输出操作 print打印driver结点上每个Dstream中的前10个batch元素常用于开发和调试saveAsTextFiles(prefix, [suffix])将当前Dstream保存为文件每个interval batch的文件名命名规则基于prefix和suffixprefix-TIME_IN_MS[.suffix].saveAsObjectFiles(prefix, [suffix])将当前的Dstream内容作为Java可序列化对象的序列化文件进行保存每个interval batch的文件命名规则基于prefix和suffix: prefix-TIME_IN_MS[.suffix].saveAsHadoopFiles(prefix, [suffix])将Dstream以hadoop文件的形式进行保存每个interval batch的文件命名规则基于prefix和suffix: prefix-TIME_IN_MS[.suffix].foreachRDD(func)最通用的输出操作可以对从数据流中产生的每一个RDD应用函数_fun_。通常_fun_会将每个RDD中的数据保存到外部系统如将RDD保存到文件或者通过网络连接保存到数据库。值得注意的是_fun_执行在跑应用的driver进程中并且通常会包含RDD action以促使数据流RDD开始计算。使用foreachRDD的设计模式 dstream.foreachRDD对于开发而言提供了很大的灵活性但在使用时也要避免很多常见的坑。我们通常将数据保存到外部系统中的流程是建立远程连接-通过连接传输数据到远程系统-关闭连接。针对这个流程我们很直接的想到了下面的程序代码  dstream.foreachRDD { rdd val connection  createNewConnection()  // executed at the driver rdd.foreach { record  connection.send(record) // executed at the worker } }  在spark踩坑记——初试中对spark的worker和driver进行了整理我们知道在集群模式下上述代码中的connection需要通过序列化对象的形式从driver发送到worker但是connection是无法在机器之间传递的即connection是无法序列化的这样可能会引起_serialization errors (connection object not serializable)_的错误。为了避免这种错误我们将conenction在worker当中建立代码如下 dstream.foreachRDD { rdd rdd.foreach { record val connection  createNewConnection()connection.send(record)connection.close() } } 似乎这样问题解决了但是细想下我们在每个rdd的每条记录当中都进行了connection的建立和关闭这会导致不必要的高负荷并且降低整个系统的吞吐量。所以一个更好的方式是使用_rdd.foreachPartition_即对于每一个rdd的partition建立唯一的连接(注每个partition是内的rdd是运行在同一worker之上的)代码如下 dstream.foreachRDD { rdd rdd.foreachPartition { partitionOfRecords val connection  createNewConnection()partitionOfRecords.foreach(record  connection.send(record))connection.close()}
} 这样我们降低了频繁建立连接的负载通常我们在连接数据库时会使用连接池把连接池的概念引入代码优化如下 dstream.foreachRDD { rdd rdd.foreachPartition { partitionOfRecords // ConnectionPool is a static, lazily initialized pool of connectionsval connection  ConnectionPool.getConnection()partitionOfRecords.foreach(record  connection.send(record))ConnectionPool.returnConnection(connection)  // return to the pool for future reuse}
} 通过持有一个静态连接池对象我们可以重复利用connection而进一步优化了连接建立的开销从而降低了负载。另外值得注意的是同数据库的连接池类似我们这里所说的连接池同样应该是lazy的按需建立连接并且及时的收回超时的连接。另外值得注意的是 如果在spark streaming中使用了多次foreachRDD它们之间是按照程序顺序向下执行的Dstream对于输出操作的执行策略是lazy的所以如果我们在foreachRDD中不添加任何RDD action那么系统仅仅会接收数据然后将数据丢弃。Spark访问Hbase 上面我们阐述了将spark streaming的Dstream输出到外部系统的基本设计模式这里我们阐述如何将Dstream输出到Hbase集群。 Hbase通用连接类 Scala连接Hbase是通过zookeeper获取信息所以在配置时需要提供zookeeper的相关信息如下 import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.hbase.HConstants import org.apache.hadoop.hbase.client.ConnectionFactory object HbaseUtil extends Serializable { private val conf  HBaseConfiguration.create() private val para  Conf.hbaseConfig // Conf为配置类获取hbase的配置 conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, para.get(port).getOrElse(2181)) conf.set(HConstants.ZOOKEEPER_QUORUM, para.get(quorum).getOrElse(127-0-0-1)) // hosts private val connection  ConnectionFactory.createConnection(conf) def getHbaseConn: Connection  connection } 根据网上资料Hbase的连接的特殊性我们并没有使用连接池 Hbase输出操作 我们以put操作为例演示将上述设计模式应用到Hbase输出操作当中 dstream.foreachRDD(rdd  {if (!rdd.isEmpty) {rdd.foreachPartition(partitionRecords  {val connection  HbaseUtil.getHbaseConn // 获取Hbase连接partitionRecords.foreach(data  {val tableName  TableName.valueOf(tableName)val t  connection.getTable(tableName)try { val put  new Put(Bytes.toBytes(_rowKey_)) // row key // column, qualifier, value put.addColumn(_column_.getBytes, _qualifier_.getBytes, _value_.getBytes) Try(t.put(put)).getOrElse(t.close()) // do some log显示在worker上 } catch { case e: Exception  // log error e.printStackTrace() } finally { t.close() } }) }) // do some log(显示在driver上) } }) 关于Hbase的其他操作可以参考Spark 下操作 HBase1.0.0 新 API 填坑记录 重点记录在连接Hbase过程中配置HConstants.ZOOKEEPER_QUORUM的问题  由于Hbase的连接不能直接使用ip地址进行访问,往往需要配置hosts,例如我在上述代码段中127-0-0-1任意我们在hosts中需要配置 127-0-0-1 127.0.0.1  在单机情况下我们只需要配置一台zookeeper所在Hbase的hosts即可但是当切换到Hbase集群是遇到一个诡异的bug问题描述在foreachRDD中将Dstream保存到Hbase时会卡住并且没有任何错误信息爆出没错它就是卡住没反应问题分析由于Hbase集群有多台机器而我们只配置了一台Hbase机器的hosts这样导致Spark集群在访问Hbase时不断的去寻找但却找不到就卡在那里解决方式对每个worker上的hosts配置了所有hbase的节点ip问题解决 Spark访问Mysql 同访问Hbase类似我们也需要有一个可序列化的类来建立Mysql连接这里我们利用了Mysql的C3P0连接池 MySQL通用连接类 import java.sql.Connection
import java.util.Propertiesimport com.mchange.v2.c3p0.ComboPooledDataSource class MysqlPool extends Serializable { private val cpds: ComboPooledDataSource  new ComboPooledDataSource(true) private val conf  Conf.mysqlConfig try { cpds.setJdbcUrl(conf.get(url).getOrElse(jdbc:mysql://127.0.0.1:3306/test_bee?useUnicodetrueamp;characterEncodingUTF-8)); cpds.setDriverClass(com.mysql.jdbc.Driver); cpds.setUser(conf.get(username).getOrElse(root)); cpds.setPassword(conf.get(password).getOrElse()) cpds.setMaxPoolSize(200) cpds.setMinPoolSize(20) cpds.setAcquireIncrement(5) cpds.setMaxStatements(180) } catch { case e: Exception  e.printStackTrace() } def getConnection: Connection  { try { return cpds.getConnection(); } catch { case ex: Exception  ex.printStackTrace() null } } } object MysqlManager { var mysqlManager: MysqlPool  _ def getMysqlManager: MysqlPool  { synchronized { if (mysqlManager  null) { mysqlManager  new MysqlPool } } mysqlManager } } 我们利用c3p0建立Mysql连接池然后访问的时候每次从连接池中取出连接用于数据传输。 Mysql输出操作 同样利用之前的foreachRDD设计模式将Dstream输出到mysql的代码如下 dstream.foreachRDD(rdd  {if (!rdd.isEmpty) {rdd.foreachPartition(partitionRecords  {//从连接池中获取一个连接val conn  MysqlManager.getMysqlManager.getConnectionval statement  conn.createStatementtry {conn.setAutoCommit(false)partitionRecords.foreach(record  { val sql  insert into table... // 需要执行的sql操作 statement.addBatch(sql) }) statement.executeBatch conn.commit } catch { case e: Exception  // do some log } finally { statement.close() conn.close() } }) } }) 值得注意的是: 我们在提交Mysql的操作的时候并不是每条记录提交一次而是采用了批量提交的形式所以需要将conn.setAutoCommit(false)这样可以进一步提高mysql的效率。如果我们更新Mysql中带索引的字段时会导致更新速度较慢这种情况应想办法避免如果不可避免那就硬上吧T^T)部署 提供一下Spark连接Mysql和Hbase所需要的jar包的maven配置 dependency!-- Hbase --groupIdorg.apache.hbase/groupId artifactIdhbase-client/artifactId version1.0.0/version /dependency dependency groupIdorg.apache.hbase/groupId artifactIdhbase-common/artifactId version1.0.0/version /dependency dependency groupIdorg.apache.hbase/groupId artifactIdhbase-server/artifactId version1.0.0/version /dependency dependency!-- Mysql -- groupIdmysql/groupId artifactIdmysql-connector-java/artifactId version5.1.31/version /dependency dependency groupIdc3p0/groupId artifactIdc3p0/artifactId version0.9.1.2/version /dependency 参考文献 Spark Streaming Programming GuideHBase介绍Spark 下操作 HBase1.0.0 新 APISpark开发快速入门kafka-spark-streaming-mysqlscala实时数据处理示例Spark Streaming 中使用c3p0连接池操作mysql数据库