当前位置: 首页 > news >正文

房地产集团网站建设方案福州网站建设 联系yanktcn 04

房地产集团网站建设方案,福州网站建设 联系yanktcn 04,做英文网站建设,做网站送推广RDD的持久化 RDD缓存 当RDD被重复使用#xff0c;或者计算该RDD比较容易出错#xff0c;而且需要消耗比较多的资源和时间的时候#xff0c;我们就可以将该RDD缓存起来。 主要作用: 提升Spark程序的计算效率 注意事项: RDD的缓存可以存储在内存或者是磁盘上#xff0c;甚至…RDD的持久化 RDD缓存 当RDD被重复使用或者计算该RDD比较容易出错而且需要消耗比较多的资源和时间的时候我们就可以将该RDD缓存起来。 主要作用: 提升Spark程序的计算效率 注意事项: RDD的缓存可以存储在内存或者是磁盘上甚至可以存储在Executor进程的堆外内存中。主要是放在内存中因此缓存的数据是不太稳定可靠。 由于是临时存储可能会存在丢失所以缓存操作并不会将RDD之间的依赖关系给截断掉(丢失掉)因为当缓存 失效后可以全部重新计算 缓存的API都是Lazy惰性的如果需要触发缓存操作推荐调用count算子因为运行效率高 设置缓存的API: rdd.cache(): 将RDD的数据缓存储内存中rdd.persist(缓存的级别/位置): 将RDD的数据存储在指定位置手动清理缓存API:rdd.unpersist() 默认情况下当整个Spark应用程序执行完成后缓存数据会自动失效会被自动删除缓存的级别/位置DISK_ONLY: 只存储在磁盘DISK_ONLY_2: 只存储在磁盘并且有2个副本DISK_ONLY_3: 只存储在磁盘并且有3个副本MEMORY_ONLY: 只存储在内存中MEMORY_ONLY_2: 只存储在内存中并且有2个副本MEMORY_AND_DISK: 存储在内存和磁盘中先放在内存再放在磁盘MEMORY_AND_DISK_2: 存储在内存和磁盘中先放在内存再放在磁盘并且有2个副本OFF_HEAP: Executor进程的堆外内存工作中最常用的是: MEMORY_AND_DISK和MEMORY_AND_DISK_2。优先推荐使用MEMORY_AND_DISK演示缓存的使用操作: import timefrom pyspark import SparkConf, SparkContext, StorageLevel import os import jieba# 绑定指定的Python解释器 os.environ[SPARK_HOME] /export/server/spark os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3 os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3# 需要过滤的关键词黑名单 keyword_black_list [,.,的,com]# ctrlaltM将代码封装成函数/方法 # 3.2- 需求一统计每个关键词出现了多少次。先提取需要操作的字段并且分词这一步类似WordCount中的对每行进行切分处理再仿照WordCount实现。 def top10_keyword():keyword_rdd etl_rdd.flatMap(lambda line_tup: list(jieba.cut(line_tup[2])))# print(keyword_rdd.take(10))# 数据结构转变。将单词变成元组# keyword_map_rdd keyword_rdd.filter(lambda word:word! or word!.).map(lambda word:(word,1))keyword_map_rdd keyword_rdd.filter(lambda word: word not in keyword_black_list).map(lambda word: (word, 1))# 分组聚合操作keyword_result_rdd keyword_map_rdd.reduceByKey(lambda agg, curr: agg curr)# print(keyword_result_rdd.take(100))# 对结果中关键词的次数降序排序取TOP10keyword_result keyword_result_rdd.top(10, keylambda tup: tup[1])print(keyword_result)# 3.3- 需求二统计每个用户每个搜索内容点击的次数 def content():# 从原始的6个字段中提取出2个字段得到 (用户,搜索内容)new_tup_tmp_rdd etl_rdd.map(lambda tup: (tup[1], tup[2]))# 数据格式转换输入(张三,鸡你太美) - hello输出((张三,鸡你太美),1) - (hello,1)new_tup_rdd new_tup_tmp_rdd.map(lambda tup: (tup, 1))# new_tup_rdd new_tup_tmp_rdd.map(lambda tup:(tup[0],tup[1],1))# 分组聚合content_result new_tup_rdd.reduceByKey(lambda agg, curr: agg curr)print(content_result.take(10))if __name__ __main__:# 1- 创建SparkContextconf SparkConf().setAppName(sogou_demo).setMaster(local[*])sc SparkContext(confconf)# 2- 数据输入init_rdd sc.textFile(file:///export/data/spark_core/data/SogouQ.sample)print(ETL处理前数据条数,init_rdd.count())# 3- 数据处理# 3.1- ETL数据的清洗、转换、加载split()默认按照空白字符进行切分。例如空格、制表符、回车换行符等map和flatMap的主要区别flatMap对每一个元素处理以后会将结果打平/压扁到一个更大的容器当中。map_rdd init_rdd.map(lambda line:line.split())# print(调用map算子后的内容,map_rdd.take(10))# flatmap_rdd init_rdd.flatMap(lambda line: line.split())# print(调用flatMap算子后的内容,flatmap_rdd.take(10))# 过滤掉每行中没有6个字段的数据filter_rdd map_rdd.filter(lambda line_list: len(line_list)6)# 数据结构转换为了演示而演示etl_rdd filter_rdd.map(lambda line_list:(line_list[0],line_list[1],line_list[2][1:-1], # 省略前后的中括号line_list[3],line_list[4],line_list[5]))# 设置缓存。并且调用count算子触发操作# etl_rdd.cache().count()etl_rdd.persist(storageLevelStorageLevel.MEMORY_AND_DISK).count()print(ETL处理后数据条数, etl_rdd.count())# 3.2- 需求一统计每个关键词出现了多少次# top10_keyword()# 3.3- 需求二统计每个用户每个搜索内容点击的次数content()time.sleep(20)# 手动清理缓存。你对哪个RDD设置了缓存那么你就对那个RDD清理缓存。也需要调用count算子触发。etl_rdd.unpersist().count()time.sleep(100)# 5- 释放资源sc.stop()无缓存的DAG流程图显示 有缓存的DAG流程图显示 RDD的checkpoint检查点 RDD缓存主要是将数据存储在内存中是临时存储不太稳定它主要是用来提升程序运行效率的。RDD的checkpoint(检查点)主要是将数据存储在HDFS上是持久化存储。而HDFS存储数据有3副本的机制让数据更加安全可靠。 checkpoint认为使用磁盘或者HDFS存储数据之后数据非常的安全可靠因此checkpoint会将RDD间的依赖关系给删除/丢弃掉。因此如果checkpoint的数据真的出现了问题是无法在从头开始计算。 checkpoint主要作用: 提高程序的容错性 注意事项: checkpoint可以将数据存储在磁盘或者HDFS上主要是将数据存储在HDFS上。 相关API: sc.setCheckpointDir(存储路径): 设置checkpoint数据存放路径 rdd.checkpoint(): 对指定RDD启用checkpoint rdd.count(): 触发checkpoint 代码演示: import timefrom pyspark import SparkConf, SparkContext, StorageLevel import os import jieba# 绑定指定的Python解释器 os.environ[SPARK_HOME] /export/server/spark os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3 os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3# 需要过滤的关键词黑名单 keyword_black_list [,.,的,com]# ctrlaltM将代码封装成函数/方法 # 3.2- 需求一统计每个关键词出现了多少次。先提取需要操作的字段并且分词这一步类似WordCount中的对每行进行切分处理再仿照WordCount实现。 def top10_keyword():keyword_rdd etl_rdd.flatMap(lambda line_tup: list(jieba.cut(line_tup[2])))# print(keyword_rdd.take(10))# 数据结构转变。将单词变成元组# keyword_map_rdd keyword_rdd.filter(lambda word:word! or word!.).map(lambda word:(word,1))keyword_map_rdd keyword_rdd.filter(lambda word: word not in keyword_black_list).map(lambda word: (word, 1))# 分组聚合操作keyword_result_rdd keyword_map_rdd.reduceByKey(lambda agg, curr: agg curr)# print(keyword_result_rdd.take(100))# 对结果中关键词的次数降序排序取TOP10keyword_result keyword_result_rdd.top(10, keylambda tup: tup[1])print(keyword_result)# 3.3- 需求二统计每个用户每个搜索内容点击的次数 def content():# 从原始的6个字段中提取出2个字段得到 (用户,搜索内容)new_tup_tmp_rdd etl_rdd.map(lambda tup: (tup[1], tup[2]))# 数据格式转换输入(张三,鸡你太美) - hello输出((张三,鸡你太美),1) - (hello,1)new_tup_rdd new_tup_tmp_rdd.map(lambda tup: (tup, 1))# new_tup_rdd new_tup_tmp_rdd.map(lambda tup:(tup[0],tup[1],1))# 分组聚合content_result new_tup_rdd.reduceByKey(lambda agg, curr: agg curr)print(content_result.take(10))if __name__ __main__:# 1- 创建SparkContextconf SparkConf().setAppName(sogou_demo).setMaster(local[*])sc SparkContext(confconf)# 设置checkpoint路径sc.setCheckpointDir(hdfs://node1:8020/chk)# 2- 数据输入init_rdd sc.textFile(file:///export/data/spark_core/data/SogouQ.sample)print(ETL处理前数据条数,init_rdd.count())# 3- 数据处理# 3.1- ETL数据的清洗、转换、加载split()默认按照空白字符进行切分。例如空格、制表符、回车换行符等map和flatMap的主要区别flatMap对每一个元素处理以后会将结果打平/压扁到一个更大的容器当中。map_rdd init_rdd.map(lambda line:line.split())# print(调用map算子后的内容,map_rdd.take(10))# flatmap_rdd init_rdd.flatMap(lambda line: line.split())# print(调用flatMap算子后的内容,flatmap_rdd.take(10))# 过滤掉每行中没有6个字段的数据filter_rdd map_rdd.filter(lambda line_list: len(line_list)6)# 数据结构转换为了演示而演示etl_rdd filter_rdd.map(lambda line_list:(line_list[0],line_list[1],line_list[2][1:-1], # 省略前后的中括号line_list[3],line_list[4],line_list[5]))# 对指定RDD启用checkpointetl_rdd.checkpoint()# 调用count算子触发checkpoint操作etl_rdd.count()print(ETL处理后数据条数, etl_rdd.count())# 3.2- 需求一统计每个关键词出现了多少次# top10_keyword()# 3.3- 需求二统计每个用户每个搜索内容点击的次数content()time.sleep(1000)# 5- 释放资源sc.stop()没有设置检查点正常的DAG执行流图 设置检查点后 缓存和checkpoint的区别 1- 数据存储位置不同 缓存: 存储在内存或者磁盘 或者 堆外内存中 checkpoint检查点: 可以将数据存储在磁盘或者HDFS上, 在集群模式下, 仅能保存到HDFS上 2- 数据生命周期: 缓存: 当程序执行完成后, 或者手动调用unpersist 缓存都会被删除 checkpoint检查点: 即使程序退出后, checkpoint检查点的数据依然是存在的, 不会删除, 需要手动删除 3- 血缘关系: 缓存: 不会截断RDD之间的血缘关系, 因为缓存数据有可能是失效, 当失效后, 需要重新回溯计算操作 checkpoint检查点: 会截断掉依赖关系, 因为checkpoint将数据保存到更加安全可靠的位置, 不会发生数据丢失的问题, 当执行失败的时候, 也不需要重新回溯执行 4- 主要作用不同 缓存: 提高Spark程序的运行效率 checkpoint检查点: 提高Spark程序的容错性 思考既然持久化的方案有两种那么在生产环境中应该使用哪种方案呢? 在同一个项目中推荐缓存和checkpoint(检查点)同时配合使用。使用顺序如下: 在代码中先设置缓存再设置checkpoint检查点然后再一同使用Action算子触发推荐使用count算子。因为这个顺序只会有一次IO写的过程。实际过程如下: 程序会优先从缓存中读取数据如果发现缓存中没有数据。再从checkpoint中读取数据并且接着将读取到的数据重新在内存中放置一份后续还是优先从缓存中读取测试: import timefrom pyspark import SparkConf, SparkContext, StorageLevel import os import jieba# 绑定指定的Python解释器 os.environ[SPARK_HOME] /export/server/spark os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3 os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3# 需要过滤的关键词黑名单 keyword_black_list [,.,的,com]# ctrlaltM将代码封装成函数/方法 # 3.2- 需求一统计每个关键词出现了多少次。先提取需要操作的字段并且分词这一步类似WordCount中的对每行进行切分处理再仿照WordCount实现。 def top10_keyword():keyword_rdd etl_rdd.flatMap(lambda line_tup: list(jieba.cut(line_tup[2])))# print(keyword_rdd.take(10))# 数据结构转变。将单词变成元组# keyword_map_rdd keyword_rdd.filter(lambda word:word! or word!.).map(lambda word:(word,1))keyword_map_rdd keyword_rdd.filter(lambda word: word not in keyword_black_list).map(lambda word: (word, 1))# 分组聚合操作keyword_result_rdd keyword_map_rdd.reduceByKey(lambda agg, curr: agg curr)# print(keyword_result_rdd.take(100))# 对结果中关键词的次数降序排序取TOP10keyword_result keyword_result_rdd.top(10, keylambda tup: tup[1])print(keyword_result)# 3.3- 需求二统计每个用户每个搜索内容点击的次数 def content():# 从原始的6个字段中提取出2个字段得到 (用户,搜索内容)new_tup_tmp_rdd etl_rdd.map(lambda tup: (tup[1], tup[2]))# 数据格式转换输入(张三,鸡你太美) - hello输出((张三,鸡你太美),1) - (hello,1)new_tup_rdd new_tup_tmp_rdd.map(lambda tup: (tup, 1))# new_tup_rdd new_tup_tmp_rdd.map(lambda tup:(tup[0],tup[1],1))# 分组聚合content_result new_tup_rdd.reduceByKey(lambda agg, curr: agg curr)print(content_result.take(10))if __name__ __main__:# 1- 创建SparkContextconf SparkConf().setAppName(sogou_demo).setMaster(local[*])sc SparkContext(confconf)# 设置checkpoint路径sc.setCheckpointDir(hdfs://node1:8020/chk)# 2- 数据输入init_rdd sc.textFile(file:///export/data/spark_core/data/SogouQ.sample)print(ETL处理前数据条数,init_rdd.count())# 3- 数据处理# 3.1- ETL数据的清洗、转换、加载split()默认按照空白字符进行切分。例如空格、制表符、回车换行符等map和flatMap的主要区别flatMap对每一个元素处理以后会将结果打平/压扁到一个更大的容器当中。map_rdd init_rdd.map(lambda line:line.split())# print(调用map算子后的内容,map_rdd.take(10))# flatmap_rdd init_rdd.flatMap(lambda line: line.split())# print(调用flatMap算子后的内容,flatmap_rdd.take(10))# 过滤掉每行中没有6个字段的数据filter_rdd map_rdd.filter(lambda line_list: len(line_list)6)# 数据结构转换为了演示而演示etl_rdd filter_rdd.map(lambda line_list:(line_list[0],line_list[1],line_list[2][1:-1], # 省略前后的中括号line_list[3],line_list[4],line_list[5]))# 先缓存 etl_rdd.persist(storageLevelStorageLevel.MEMORY_AND_DISK)# 再checkpointetl_rdd.checkpoint()# 最后调用count算子一同触发etl_rdd.count()print(ETL处理后数据条数, etl_rdd.count())# 3.2- 需求一统计每个关键词出现了多少次# top10_keyword()# 3.3- 需求二统计每个用户每个搜索内容点击的次数content()time.sleep(1000)# 5- 释放资源sc.stop()DAG有向无环图 Spark内核调度 RDD的依赖 RDD依赖一个RDD的形成可能是由一个或者多个RDD得到的此时这个RDD和之前的RDD之间产生依赖关系。 在Spark中RDD之间的依赖关系主要有二种类型 窄依赖 作用: 能够让Spark程序并行计算。也就是一个分区数据计算出现问题以后其他的分区计算不受到任何影响特点: 父RDD的分区和子RDD的分区关系是一对一的关系。 也就是父RDD分区的数据会整个被下游子RDD的分区接收宽依赖 作用: 划分Stage的重要依据。宽依赖也叫做Shuffle依赖 特点: 父RDD的分区和子RDD的分区关系是一对多的关系。也就是父RDD的分区数据会被分成多份给到下游子RDD的多个分区所接收。注意: 如果有宽依赖shuffle下游的其他操作必须等待shuffle执行完成以后才能够继续执行。为了避免数据不完整在实际使用中不需要纠结哪些算子会存在shuffle以需求为目标**。虽然shuffle的存在会影响一定的效率, 但是以完成任务为准则**该用那个算子就使用那个算子即可不要过分纠结。 算子中一般以ByKey结尾的会发生shuffle另外是重分区算子会发生shuffleDAG和Stage DAG有向无环图主要描述一段执行任务从开始一直往下走不允许出现回调操作 Spark应用程序中遇到一个Action算子就会触发形成一个Job任务的产生。 对于每一个Job的任务都会产生一个DAG执行流程图那么这个流程图是如何形成的呢? 层级关系 1- 一个Spark应用程序 - 遇到一个Action算子就会触发形成一个Job任务 2- 一个Job任务只有一个DAG有向无环图 3- 一个DAG有向无环图 - 有多个Stage 4- 一个Stage - 有多个Task线程 5- 一个RDD - 有多个分区 6- 一个分区会被一个Task线程所处理DAG执行流程图形成和Stage划分 1- Spark应用程序遇到Action算子后就会触发一个Job任务的产生。Job任务会将它所依赖的所有算子全部加载进来形成一个Stage2- 接着从Action算子从后往前进行回溯遇到窄依赖就将算子放在同一个Stage当中如果遇到宽依赖就划分形成新的Stage。最后一直回溯完成细化剖析Stage内部的流程 默认并行度的值确认: 因为是使用textFile读取HDFS上的文件因此RDD分区数max(文件的block块的数量, defaultMinPartition)。继续需要知道defaultMinPartition的值是多少。defaultMinPartitionmin(spark.default.parallelism,2)取最小值。最终我们确认spark.default.parallelism的参数值就能够最终确认RDD的分区数有多少个spark.default.parallelism参数值确认过程如下 1- 如果有父RDD就取父RDD的最大分区数 2- 如果没有父RDD根据集群模式进行取值2.1- 本地模式机器的最大CPU核数2.2- 了解Mesos默认是82.3- 其他模式所有执行节点上的核总数或2以较大者为准Spark Shuffle Spark中shuffle的发展历程: 1- 在1.1版本以前Spark采用Hash shuffle (优化前 和 优化后)2- 在1.1版本的时候Spark推出了Sort Shuffle3- 在1.5版本的时候Spark引入钨丝计划(优化为主)4- 在1.6版本的时候将钨丝计划合并到sortShuffle中5- 在2.0版本的时候将Hash Shuffle移除将Hash shuffle方案移植到Sort Shuffle在优化前的Hash shuffle: 存在的问题上游map端的每个Task会产生与下游Task个数相等的小文件个数。这种情况会导致上游有非常多的小文件。另外下游reduce端来拉取文件的时候会有大量的网络IO和磁盘IO过程因为要打开和读取多个小文件。经过优化后的Hash shuffle 变成了由每个Executor进程产生与下游Task个数相等的小文件数。这样可以大量减小小文件的产生以及降低下游拉取文件时候的网络IO和磁盘IO过程Sort shuffle: Sort Shuffle分成了两种: 普通机制和bypass机制。具体使用哪种由Spark底层决定。普通机制的运行过程: 每个上游Task线程处理数据数据处理完以后先放在内存中。接着对内存中的数据进行分区、排序。将内存中的数据溢写到磁盘形成一个个的小文件。溢写完成以后会将多个小文件合并成一个大的磁盘文件。并且针对每个大的磁盘文件会提供一个索引文件。接着是下游Task根据索引文件来读取相应的数据。bypass机制: 就是在普通机制的基础上省略了排序的过程bypass机制的触发条件是 1- 上游RDD的分区数量最多不能超过200个 2- 上游不能对数据进行提前聚合操作因为提前聚合需要先进行分组操作而分组的操作实际上是有排序的操作Job调度流程 主要是讨论在Driver内部是如何调度任务 1- Driver进程启动后底层PY4J创建SparkContext顶级对象。在创建该对象的过程中还会创建另外两个对象分别是: DAGScheduler和TaskScheduler DAGScheduler: DAG调度器。将Job任务形成DAG有向无环图和划分Stage的阶段 TaskScheduler: Task调度器。将Task线程分配给到具体的Executor执行 2- 一个Spark程序遇到一个Action算子就会触发产生一个Job任务。SparkContext将Job任务给到DAG调度器拿到Job任务后会将Job任务形成DAG有向无环图和划分Stage的阶段。并且会确定每个Stage阶段有多少个Task线程会将众多的Task线程放到TaskSet的集合中。DAG调度器将TaskSet集合给到Task调度器 3- Task调度器拿到TaskSet集合以后将Task分配给到给到具体的Executor执行。底层是基于SchedulerBackend调度队列来实现的。 4- Executor开始执行任务。并且Driver会监控各个Executor的执行状态直到所有的Executor执行完成就认为任务运行结束 5- 后续过程和之前一样 Spark RDD 并行度 整个Spark应用中影响并行度的因素有以下两个原因: 1- 资源的并行度: Executor数量 和 CPU核心数 以及 内存的大小2- 数据的并行度: Task的线程数 和 分区数量 一般将Task线程数设置为CPU核数的2-3倍。另外每个线程分配3-5GB的内存资源。如何设置并行度: 说明: spark.default.parallelism该参数是SparkCore中的参数。该参数只会影响shuffle以后的分区数量。 另外该参数对parallelize并行化本地集合创建的RDD不起作用。
http://www.pierceye.com/news/269089/

相关文章:

  • 06628 网页制作与网站建设深圳建筑人才网为什么电脑打不开
  • 企业网站建设方讯快速建站代理
  • 全面的基础微网站开发wordpress首页插件
  • 陕西省住房和城乡建设厅网站上怎么打印证书中盛客户管理软件
  • html网站标题怎么做的国外免费推广平台有哪些
  • 网站制作com cn域名有什么区别网站制作哪家好
  • 平湖网站设计北京广告公司名录
  • 不良网站进入窗口免费正能量安全的南昌网站制作
  • 商品交换电子商务网站开发网站首页制作公司
  • wordpress全站备份建设网站和推广
  • 广州市官网网站建设哪家好上海营销型网站建设公司
  • 江山网站制作瑞安自适应网站建设
  • 生意网官方网站高端建设网站
  • 公司网站建设南宁腾讯企业邮箱登录入口手机版
  • 简历网站推荐做网站公司是干什么的
  • 网站备案率是什么会展相关app和网站的建设情况
  • 南京网站设计网站建设上海网站域名备案处
  • 做网站市场分析三视觉平面设计网
  • 网站建设中++模板企业网站部署计划
  • 房产部门成立网站wordpress站内搜索次数
  • 网站建设合同管辖地广州敏城建设工程有限公司网站
  • 班级网站主页设计模板购买网站域名空间
  • 做响应式网站最大宽度景观设计公司起名
  • 有小广告的网站适合40岁女人的培训班
  • html5网站建设有什么网站用名字做图片
  • 合肥珍岛公司做网站推广怎么样关键词排名优化如何
  • 做讲课ppt的网站郑州市建设局官方网站
  • 邢台集团网站建设报价免费推广网站有哪些
  • 龙华网站建设营销推广广东东莞区号
  • 徐汇网站开发培训企业建网站报价