怎样加强企业网站建设,网络营销的现状,网站开发应聘信息,wordpress创建知识库这篇文章继续进行有关使用MapReduce进行数据密集型处理的书中实现算法的系列文章。 第一部分可以在这里找到。 在上一篇文章中#xff0c;我们讨论了使用本地聚合技术来减少通过网络进行混洗和传输的数据量的方法。 减少传输的数据量是提高MapReduce作业效率的主要方法之一。 … 这篇文章继续进行有关使用MapReduce进行数据密集型处理的书中实现算法的系列文章。 第一部分可以在这里找到。 在上一篇文章中我们讨论了使用本地聚合技术来减少通过网络进行混洗和传输的数据量的方法。 减少传输的数据量是提高MapReduce作业效率的主要方法之一。 单词计数MapReduce作业用于演示本地聚合。 由于结果只需要总数因此我们可以为合并器重新使用相同的化简器因为更改加数的顺序或分组不会影响总和。 但是如果您想要平均水平呢 然后由于计算平均值的平均值不等于原始数字集的平均值因此相同的方法将行不通。 尽管有了一点见识我们仍然可以使用本地聚合。 对于这些示例我们将使用Hadoop最终指南书中使用的NCDC天气数据集的示例。 我们将计算1901年每个月的平均温度。可以在MapReduce的数据密集型处理的第3.1.3章中找到组合器和映射器内组合选项的平均值算法。 一种尺寸并不适合所有人 上一次我们介绍了两种用于在MapReduce作业中减少数据的方法Hadoop组合器和映射器内组合方法。 Hadoop框架将组合器视为一种优化并且无法保证调用组合器的次数如果有的话。 结果映射器必须以减速器期望的形式发出数据因此如果不涉及组合器则最终结果不会更改。 要针对计算平均值进行调整我们需要返回到映射器并更改其输出。 映射器更改 在单词计数示例中未优化的映射器仅发出单词和1的计数。合并器和映射器内组合映射器通过将每个单词保留为哈希映射中的键总计数为n来优化此输出。值。 每次看到一个单词计数都将增加1。在这种设置下如果未调用组合器则缩减器将接收到该单词作为键并将一长串的1s加在一起从而得到相同的输出当然使用映射器内组合映射器可以避免此问题因为可以保证合并结果是映射器代码的一部分。 为了计算平均值我们将使基本映射器发出一个字符串键将天气观测的年和月连接在一起和一个自定义可写对象称为TemperatureAveragingPair。 TemperatureAveragingPair对象将包含两个数字IntWritables获取的温度和一个计数。 我们将从Hadoop权威指南中获取MaximumTemperatureMapper并以此为灵感来创建AverageTemperatureMapper public class AverageTemperatureMapper extends MapperLongWritable, Text, Text, TemperatureAveragingPair {//sample line of weather data//002902907099999190101010600464333023450FM-12000599999V0202701N015919999999N0000001N9-0078199999102001ADDGF10899199999999999private Text outText new Text();private TemperatureAveragingPair pair new TemperatureAveragingPair();private static final int MISSING 9999;Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line value.toString();String yearMonth line.substring(15, 21);int tempStartPosition 87;if (line.charAt(tempStartPosition) ) {tempStartPosition 1;}int temp Integer.parseInt(line.substring(tempStartPosition, 92));if (temp ! MISSING) {outText.set(yearMonth);pair.set(temp, 1);context.write(outText, pair);}}
} 通过使映射器输出键和TemperatureAveragingPair对象无论调用组合器如何我们的MapReduce程序都可以保证具有正确的结果。 合路器 我们需要减少发送的数据量因此我们将对温度求和对计数求和并分别存储。 这样我们将减少发送的数据但保留计算正确平均值所需的格式。 如果/在调用组合器时它将采用所有传入的TemperatureAveragingPair对象并为同一键发出单个TemperatureAveragingPair对象其中包含温度和计数值的总和。 这是合并器的代码 public class AverageTemperatureCombiner extends ReducerText,TemperatureAveragingPair,Text,TemperatureAveragingPair {private TemperatureAveragingPair pair new TemperatureAveragingPair();Overrideprotected void reduce(Text key, IterableTemperatureAveragingPair values, Context context) throws IOException, InterruptedException {int temp 0;int count 0;for (TemperatureAveragingPair value : values) {temp value.getTemp().get();count value.getCount().get();}pair.set(temp,count);context.write(key,pair);}
} 但是我们非常有兴趣确保我们减少了发送到reducer的数据量因此我们将看看下一步如何实现。 在Mapper合并平均值中 类似于单词计数示例为了计算平均值映射器内组合映射器将使用哈希图将连接的年月作为键将TemperatureAveragingPair作为值。 每次获得相同的年月组合时我们都会将对对象从地图中取出添加温度并将计数增加一个。 调用cleanup方法后我们将发出所有对及其各自的键 public class AverageTemperatureCombiningMapper extends MapperLongWritable, Text, Text, TemperatureAveragingPair {//sample line of weather data//002902907099999190101010600464333023450FM-12000599999V0202701N015919999999N0000001N9-0078199999102001ADDGF10899199999999999private static final int MISSING 9999;private MapString,TemperatureAveragingPair pairMap new HashMapString,TemperatureAveragingPair();Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line value.toString();String yearMonth line.substring(15, 21);int tempStartPosition 87;if (line.charAt(tempStartPosition) ) {tempStartPosition 1;}int temp Integer.parseInt(line.substring(tempStartPosition, 92));if (temp ! MISSING) {TemperatureAveragingPair pair pairMap.get(yearMonth);if(pair null){pair new TemperatureAveragingPair();pairMap.put(yearMonth,pair);}int temps pair.getTemp().get() temp;int count pair.getCount().get() 1;pair.set(temps,count);}}Overrideprotected void cleanup(Context context) throws IOException, InterruptedException {SetString keys pairMap.keySet();Text keyText new Text();for (String key : keys) {keyText.set(key);context.write(keyText,pairMap.get(key));}}
} 通过遵循在映射调用之间跟踪数据的相同模式我们可以通过实现映射器内合并策略来实现可靠的数据减少。 同样的注意事项适用于在对映射器的所有调用中保持状态但是考虑使用这种方法可以提高处理效率这是值得考虑的。 减速器 在这一点上编写我们的reducer很容易为每个键列出一对配对将所有温度和计数求和然后将温度的总和除以计数的总和。 public class AverageTemperatureReducer extends ReducerText, TemperatureAveragingPair, Text, IntWritable {private IntWritable average new IntWritable();Overrideprotected void reduce(Text key, IterableTemperatureAveragingPair values, Context context) throws IOException, InterruptedException {int temp 0;int count 0;for (TemperatureAveragingPair pair : values) {temp pair.getTemp().get();count pair.getCount().get();}average.set(temp / count);context.write(key, average);}
} 结果 使用合并器和映射器内合并映射器选项可以预测结果从而显着减少数据输出。 未优化的映射器选项 12/10/10 23:05:28 INFO mapred.JobClient: Reduce input groups12
12/10/10 23:05:28 INFO mapred.JobClient: Combine output records0
12/10/10 23:05:28 INFO mapred.JobClient: Map input records6565
12/10/10 23:05:28 INFO mapred.JobClient: Reduce shuffle bytes111594
12/10/10 23:05:28 INFO mapred.JobClient: Reduce output records12
12/10/10 23:05:28 INFO mapred.JobClient: Spilled Records13128
12/10/10 23:05:28 INFO mapred.JobClient: Map output bytes98460
12/10/10 23:05:28 INFO mapred.JobClient: Total committed heap usage (bytes)269619200
12/10/10 23:05:28 INFO mapred.JobClient: Combine input records0
12/10/10 23:05:28 INFO mapred.JobClient: Map output records6564
12/10/10 23:05:28 INFO mapred.JobClient: SPLIT_RAW_BYTES108
12/10/10 23:05:28 INFO mapred.JobClient: Reduce input records6564 组合器选项 12/10/10 23:07:19 INFO mapred.JobClient: Reduce input groups12
12/10/10 23:07:19 INFO mapred.JobClient: Combine output records12
12/10/10 23:07:19 INFO mapred.JobClient: Map input records6565
12/10/10 23:07:19 INFO mapred.JobClient: Reduce shuffle bytes210
12/10/10 23:07:19 INFO mapred.JobClient: Reduce output records12
12/10/10 23:07:19 INFO mapred.JobClient: Spilled Records24
12/10/10 23:07:19 INFO mapred.JobClient: Map output bytes98460
12/10/10 23:07:19 INFO mapred.JobClient: Total committed heap usage (bytes)269619200
12/10/10 23:07:19 INFO mapred.JobClient: Combine input records6564
12/10/10 23:07:19 INFO mapred.JobClient: Map output records6564
12/10/10 23:07:19 INFO mapred.JobClient: SPLIT_RAW_BYTES108
12/10/10 23:07:19 INFO mapred.JobClient: Reduce input records12 映射器内合并选项 12/10/10 23:09:09 INFO mapred.JobClient: Reduce input groups12
12/10/10 23:09:09 INFO mapred.JobClient: Combine output records0
12/10/10 23:09:09 INFO mapred.JobClient: Map input records6565
12/10/10 23:09:09 INFO mapred.JobClient: Reduce shuffle bytes210
12/10/10 23:09:09 INFO mapred.JobClient: Reduce output records12
12/10/10 23:09:09 INFO mapred.JobClient: Spilled Records24
12/10/10 23:09:09 INFO mapred.JobClient: Map output bytes180
12/10/10 23:09:09 INFO mapred.JobClient: Total committed heap usage (bytes)269619200
12/10/10 23:09:09 INFO mapred.JobClient: Combine input records0
12/10/10 23:09:09 INFO mapred.JobClient: Map output records12
12/10/10 23:09:09 INFO mapred.JobClient: SPLIT_RAW_BYTES108
12/10/10 23:09:09 INFO mapred.JobClient: Reduce input records12 计算结果 注意示例文件中的温度以摄氏度* 10为单位 未优化 合路器 映射器内合并器映射器 190101 -25 190102 -91 190103 -49 190104 22 190105 76 190106 146 190107 192 190108 170 190109 114 190110 86 190111 -16 190112 -77 190101 -25 190102 -91 190103 -49 190104 22 190105 76 190106 146 190107 192 190108 170 190109 114 190110 86 190111 -16 190112 -77 190101 -25 190102 -91 190103 -49 190104 22 190105 76 190106 146 190107 192 190108 170 190109 114 190110 86 190111 -16 190112 -77 结论 对于简单的情况可以将reducer重用为组合器和更复杂的情况对于如何构造数据同时仍能从本地聚集数据以提高处理效率有所了解我们已经介绍了本地聚集。 进一步阅读 Jimmy Lin和Chris Dyer 使用MapReduce进行的数据密集型处理 Hadoop Tom White 的权威指南 来自博客的源代码 Hadoop API MRUnit用于单元测试Apache Hadoop映射减少工作 Gutenberg项目提供了大量纯文本格式的书籍非常适合在本地测试Hadoop作业。 参考 使用MapReduce进行数据密集型文本处理-本地聚合第二部分来自我们的JCG合作伙伴 Bill Bejeck来自“ 随机编码思考”博客。 翻译自: https://www.javacodegeeks.com/2012/10/mapreduce-working-through-data-2.html