当前位置: 首页 > news >正文

怎么优化整站自己做购物网站好吗

怎么优化整站,自己做购物网站好吗,asp flash网站模板,wordpress报名系统转自#xff1a;http://www.cnblogs.com/xlturing/p/spark.html 前言 在使用Spark Streaming的过程中对于计算产生结果的进行持久化时#xff0c;我们往往需要操作数据库#xff0c;去统计或者改变一些值。最近一个实时消费者处理任务#xff0c;在使用spark streaming进行…转自http://www.cnblogs.com/xlturing/p/spark.html 前言 在使用Spark Streaming的过程中对于计算产生结果的进行持久化时我们往往需要操作数据库去统计或者改变一些值。最近一个实时消费者处理任务在使用spark streaming进行实时的数据流处理时我需要将计算好的数据更新到hbase和mysql中所以本文对spark操作hbase和mysql的内容进行总结并且对自己踩到的一些坑进行记录。 Spark Streaming持久化设计模式 DStreams输出操作 print打印driver结点上每个Dstream中的前10个batch元素常用于开发和调试saveAsTextFiles(prefix, [suffix])将当前Dstream保存为文件每个interval batch的文件名命名规则基于prefix和suffixprefix-TIME_IN_MS[.suffix].saveAsObjectFiles(prefix, [suffix])将当前的Dstream内容作为Java可序列化对象的序列化文件进行保存每个interval batch的文件命名规则基于prefix和suffix: prefix-TIME_IN_MS[.suffix].saveAsHadoopFiles(prefix, [suffix])将Dstream以hadoop文件的形式进行保存每个interval batch的文件命名规则基于prefix和suffix: prefix-TIME_IN_MS[.suffix].foreachRDD(func)最通用的输出操作可以对从数据流中产生的每一个RDD应用函数_fun_。通常_fun_会将每个RDD中的数据保存到外部系统如将RDD保存到文件或者通过网络连接保存到数据库。值得注意的是_fun_执行在跑应用的driver进程中并且通常会包含RDD action以促使数据流RDD开始计算。使用foreachRDD的设计模式 dstream.foreachRDD对于开发而言提供了很大的灵活性但在使用时也要避免很多常见的坑。我们通常将数据保存到外部系统中的流程是建立远程连接-通过连接传输数据到远程系统-关闭连接。针对这个流程我们很直接的想到了下面的程序代码 dstream.foreachRDD { rdd val connection createNewConnection() // executed at the driver rdd.foreach { record connection.send(record) // executed at the worker } } 在spark踩坑记——初试中对spark的worker和driver进行了整理我们知道在集群模式下上述代码中的connection需要通过序列化对象的形式从driver发送到worker但是connection是无法在机器之间传递的即connection是无法序列化的这样可能会引起_serialization errors (connection object not serializable)_的错误。为了避免这种错误我们将conenction在worker当中建立代码如下 dstream.foreachRDD { rdd rdd.foreach { record val connection createNewConnection()connection.send(record)connection.close() } } 似乎这样问题解决了但是细想下我们在每个rdd的每条记录当中都进行了connection的建立和关闭这会导致不必要的高负荷并且降低整个系统的吞吐量。所以一个更好的方式是使用_rdd.foreachPartition_即对于每一个rdd的partition建立唯一的连接(注每个partition是内的rdd是运行在同一worker之上的)代码如下 dstream.foreachRDD { rdd rdd.foreachPartition { partitionOfRecords val connection createNewConnection()partitionOfRecords.foreach(record connection.send(record))connection.close()} } 这样我们降低了频繁建立连接的负载通常我们在连接数据库时会使用连接池把连接池的概念引入代码优化如下 dstream.foreachRDD { rdd rdd.foreachPartition { partitionOfRecords // ConnectionPool is a static, lazily initialized pool of connectionsval connection ConnectionPool.getConnection()partitionOfRecords.foreach(record connection.send(record))ConnectionPool.returnConnection(connection) // return to the pool for future reuse} } 通过持有一个静态连接池对象我们可以重复利用connection而进一步优化了连接建立的开销从而降低了负载。另外值得注意的是同数据库的连接池类似我们这里所说的连接池同样应该是lazy的按需建立连接并且及时的收回超时的连接。另外值得注意的是 如果在spark streaming中使用了多次foreachRDD它们之间是按照程序顺序向下执行的Dstream对于输出操作的执行策略是lazy的所以如果我们在foreachRDD中不添加任何RDD action那么系统仅仅会接收数据然后将数据丢弃。Spark访问Hbase 上面我们阐述了将spark streaming的Dstream输出到外部系统的基本设计模式这里我们阐述如何将Dstream输出到Hbase集群。 Hbase通用连接类 Scala连接Hbase是通过zookeeper获取信息所以在配置时需要提供zookeeper的相关信息如下 import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.Connection import org.apache.hadoop.hbase.HConstants import org.apache.hadoop.hbase.client.ConnectionFactory object HbaseUtil extends Serializable { private val conf HBaseConfiguration.create() private val para Conf.hbaseConfig // Conf为配置类获取hbase的配置 conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, para.get(port).getOrElse(2181)) conf.set(HConstants.ZOOKEEPER_QUORUM, para.get(quorum).getOrElse(127-0-0-1)) // hosts private val connection ConnectionFactory.createConnection(conf) def getHbaseConn: Connection connection } 根据网上资料Hbase的连接的特殊性我们并没有使用连接池 Hbase输出操作 我们以put操作为例演示将上述设计模式应用到Hbase输出操作当中 dstream.foreachRDD(rdd {if (!rdd.isEmpty) {rdd.foreachPartition(partitionRecords {val connection HbaseUtil.getHbaseConn // 获取Hbase连接partitionRecords.foreach(data {val tableName TableName.valueOf(tableName)val t connection.getTable(tableName)try { val put new Put(Bytes.toBytes(_rowKey_)) // row key // column, qualifier, value put.addColumn(_column_.getBytes, _qualifier_.getBytes, _value_.getBytes) Try(t.put(put)).getOrElse(t.close()) // do some log显示在worker上 } catch { case e: Exception // log error e.printStackTrace() } finally { t.close() } }) }) // do some log(显示在driver上) } }) 关于Hbase的其他操作可以参考Spark 下操作 HBase1.0.0 新 API 填坑记录 重点记录在连接Hbase过程中配置HConstants.ZOOKEEPER_QUORUM的问题 由于Hbase的连接不能直接使用ip地址进行访问,往往需要配置hosts,例如我在上述代码段中127-0-0-1任意我们在hosts中需要配置 127-0-0-1 127.0.0.1 在单机情况下我们只需要配置一台zookeeper所在Hbase的hosts即可但是当切换到Hbase集群是遇到一个诡异的bug问题描述在foreachRDD中将Dstream保存到Hbase时会卡住并且没有任何错误信息爆出没错它就是卡住没反应问题分析由于Hbase集群有多台机器而我们只配置了一台Hbase机器的hosts这样导致Spark集群在访问Hbase时不断的去寻找但却找不到就卡在那里解决方式对每个worker上的hosts配置了所有hbase的节点ip问题解决 Spark访问Mysql 同访问Hbase类似我们也需要有一个可序列化的类来建立Mysql连接这里我们利用了Mysql的C3P0连接池 MySQL通用连接类 import java.sql.Connection import java.util.Propertiesimport com.mchange.v2.c3p0.ComboPooledDataSource class MysqlPool extends Serializable { private val cpds: ComboPooledDataSource new ComboPooledDataSource(true) private val conf Conf.mysqlConfig try { cpds.setJdbcUrl(conf.get(url).getOrElse(jdbc:mysql://127.0.0.1:3306/test_bee?useUnicodetrueamp;characterEncodingUTF-8)); cpds.setDriverClass(com.mysql.jdbc.Driver); cpds.setUser(conf.get(username).getOrElse(root)); cpds.setPassword(conf.get(password).getOrElse()) cpds.setMaxPoolSize(200) cpds.setMinPoolSize(20) cpds.setAcquireIncrement(5) cpds.setMaxStatements(180) } catch { case e: Exception e.printStackTrace() } def getConnection: Connection { try { return cpds.getConnection(); } catch { case ex: Exception ex.printStackTrace() null } } } object MysqlManager { var mysqlManager: MysqlPool _ def getMysqlManager: MysqlPool { synchronized { if (mysqlManager null) { mysqlManager new MysqlPool } } mysqlManager } } 我们利用c3p0建立Mysql连接池然后访问的时候每次从连接池中取出连接用于数据传输。 Mysql输出操作 同样利用之前的foreachRDD设计模式将Dstream输出到mysql的代码如下 dstream.foreachRDD(rdd {if (!rdd.isEmpty) {rdd.foreachPartition(partitionRecords {//从连接池中获取一个连接val conn MysqlManager.getMysqlManager.getConnectionval statement conn.createStatementtry {conn.setAutoCommit(false)partitionRecords.foreach(record { val sql insert into table... // 需要执行的sql操作 statement.addBatch(sql) }) statement.executeBatch conn.commit } catch { case e: Exception // do some log } finally { statement.close() conn.close() } }) } }) 值得注意的是: 我们在提交Mysql的操作的时候并不是每条记录提交一次而是采用了批量提交的形式所以需要将conn.setAutoCommit(false)这样可以进一步提高mysql的效率。如果我们更新Mysql中带索引的字段时会导致更新速度较慢这种情况应想办法避免如果不可避免那就硬上吧T^T)部署 提供一下Spark连接Mysql和Hbase所需要的jar包的maven配置 dependency!-- Hbase --groupIdorg.apache.hbase/groupId artifactIdhbase-client/artifactId version1.0.0/version /dependency dependency groupIdorg.apache.hbase/groupId artifactIdhbase-common/artifactId version1.0.0/version /dependency dependency groupIdorg.apache.hbase/groupId artifactIdhbase-server/artifactId version1.0.0/version /dependency dependency!-- Mysql -- groupIdmysql/groupId artifactIdmysql-connector-java/artifactId version5.1.31/version /dependency dependency groupIdc3p0/groupId artifactIdc3p0/artifactId version0.9.1.2/version /dependency 参考文献 Spark Streaming Programming GuideHBase介绍Spark 下操作 HBase1.0.0 新 APISpark开发快速入门kafka-spark-streaming-mysqlscala实时数据处理示例Spark Streaming 中使用c3p0连接池操作mysql数据库
http://www.pierceye.com/news/149471/

相关文章:

  • 晋江网站建设公司电脑培训网
  • 电子商务网站开发的题网站关键词排名怎么提升
  • 在百度网站备案查询上显示未备案是什么意思wordpress资源分享主题
  • 夏县做网站郑州做商城网站
  • 网站首页推荐网络服务提供者发现用户利用其网络服务对未成年
  • 中外网站建设区别微信软文是什么意思
  • 苏州网站建设极简幕枫卫浴网站建设
  • 优秀企业网站欣赏网站的备案怎么处理
  • 怎样做古玩网站毕业设计开题报告网站开发
  • 西安网站 建设app注册推广
  • 丹徒网站建设公司代理公司注册价格
  • 网站建站建设网站中国商标商标查询网
  • 机械加工网站平台南京app制作开发公司
  • 用vs2008做网站教程seo推广网址
  • 正规制作网站公司哪家好视觉传达设计专业作品集
  • 做网站多少钱特惠西宁君博s网站网站建设多少钱
  • 建筑模版东莞网站建设技术支持手机网站开发学习
  • 专业网站建设效果显著做设计找参考的设计网站有那些
  • 最新网站建设技术2022年新闻摘抄简短
  • 手机网站总是自动跳转最吃香的男生十大手艺
  • 免费网站推广软件哪个好企业vi设计公司价格
  • 自助建网站不需要域名番禺网站优化平台
  • 一般建设网站的常见问题国家企业信用信息公示官网
  • 韩国美容网站 模板互联网大赛官网入口
  • 太原网站开发哪家好wordpress怎么贴代码
  • 深圳网站设计与制作网站建设公司海南
  • 做网站需要什么cailiao网站项目申报书建设规模
  • wordpress手机网站模板wordpress分类设置seo
  • 哪个网站设计好互助网站制作公司
  • 网站建设评估报告惠民建设局网站