Our big data team works on data quality evaluation. The automated quality-check and monitoring platform is built on Django, and the MR jobs are written in Python as well. (We later ran into an ORC compression problem that we could not solve from Python, so the jobs are being rewritten in Java.) Here is an example of writing MapReduce in Python.

To quote a line: Hadoop Streaming is a programming tool shipped with Hadoop that lets you use any executable or script file as the Mapper and Reducer.

1. First, some background. Our data lives in Hive; the table looks like the listing below. We parse the table metadata and merge it with the data files on HDFS, which makes processing easier. The partition keys are year/month/day.

```
hive (gulfstream_ods)> desc g_order;
OK
col_name                data_type       comment
order_id                bigint          order id
driver_id               bigint          driver id, 0 before a driver takes the order
driver_phone            string          driver phone number
passenger_id            bigint          passenger id
passenger_phone         string          passenger phone number
car_id                  int             pickup vehicle id
area                    int             city id
district                string          city district code
type                    int             order timing: 0 real-time, 1 reservation
current_lng             decimal(19,6)   passenger longitude when the order was placed
current_lat             decimal(19,6)   passenger latitude when the order was placed
starting_name           string          pickup point name
starting_lng            decimal(19,6)   pickup longitude
starting_lat            decimal(19,6)   pickup latitude
dest_name               string          destination name
dest_lng                decimal(19,6)   destination longitude
dest_lat                decimal(19,6)   destination latitude
driver_start_distance   int             road distance from driver to pickup point, in meters
start_dest_distance     int             road distance from pickup point to destination, in meters
departure_time          string          departure time (reservation time for reserved orders, order time for real-time orders)
strive_time             string          time the order was successfully taken by a driver
consult_time            string          negotiation time
arrive_time             string          time the driver tapped "I have arrived"
setoncar_time           string          boarding time (unused for now)
begin_charge_time       string          time the driver tapped "start charging"
finish_time             string          completion time
year                    string
month                   string
day                     string

# Partition Information
# col_name              data_type       comment

year                    string
month                   string
day                     string
```

2. Parsing the metadata. The function below parses the table structure. We then serialize the result into the file desc.gulfstream_ods.g_order, and this configuration file is uploaded to the Hadoop cluster together with the MR scripts.

```python
import subprocess
from subprocess import Popen

def desc_table(db, table):
    process = Popen('hive -e "desc %s.%s"' % (db, table),
                    shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = process.communicate()
    is_column = True
    structure_list = list()
    column_list = list()
    for line in stdout.split('\n'):
        value_list = list()
        # stop at the first blank/short line (the partition-information section follows)
        if not line or len(line.split()) < 2:
            break
        if is_column:
            # the first row holds the headers: col_name, data_type, comment
            column_list = line.split()
            is_column = False
            continue
        else:
            value_list = line.split()
        structure_dict = dict(zip(column_list, value_list))
        structure_list.append(structure_dict)
    return structure_list
```
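The post stores the parsed structure in desc.gulfstream_ods.g_order but never shows the dump step. Below is a minimal sketch of it, assuming the JSON format that the mappers' deserialize() expects; the script name gen_desc.py and the module name desc_util are hypothetical (they just refer to the desc_table() snippet above).

```python
# gen_desc.py -- hypothetical helper that writes the parsed Hive metadata to the
# desc.<db>.<table> file shipped to the cluster via -file in the launcher script
import json
import sys

from desc_util import desc_table   # assumes desc_table() above was saved as desc_util.py

if __name__ == '__main__':
    db, table = sys.argv[1], sys.argv[2]      # e.g. gulfstream_ods g_order
    structure_list = desc_table(db, table)
    with open('desc.%s.%s' % (db, table), 'w') as fw:
        fw.write(json.dumps(structure_list))
```

Running this once per table (e.g. `python gen_desc.py gulfstream_ods g_order`) produces the file that index_columns() reads on every map task.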
3. The Hadoop Streaming launcher script:

```bash
#!/bin/bash

source /etc/profile
source ~/.bash_profile

# Hadoop home
echo "HADOOP_HOME: $HADOOP_HOME"
HADOOP=$HADOOP_HOME/bin/hadoop

DB=$1
TABLE=$2
YEAR=$3
MONTH=$4
DAY=$5
echo "$DB--$TABLE--$YEAR--$MONTH--$DAY"

if [ $DB == "gulfstream_ods" ]; then
    DB_NAME="gulfstream"
else
    DB_NAME=$DB
fi
TABLE_NAME=$TABLE

# input path
input_path="/user/xiaoju/data/bi/$DB_NAME/$TABLE_NAME/$YEAR/$MONTH/$DAY/*"
# success-marker file name
input_mark="_SUCCESS"
echo $input_path
# output path
output_path="/user/bigdata-t/QA/yangfan/$DB_NAME/$TABLE_NAME/$YEAR/$MONTH/$DAY"
output_mark="_SUCCESS"
echo $output_path

# throttling parameters
capacity_mapper=500
capacity_reducer=200
map_num=10
reducer_num=10
queue_name="root.dashujudidiyanjiuyuan-zhinengpingtaibu.datapolicy-develop"

# job name
job_name="DW_Monitor_${DB_NAME}_${TABLE_NAME}_${YEAR}${MONTH}${DAY}"
mapper="python mapper.py $DB $TABLE_NAME"
reducer="python reducer.py"

$HADOOP fs -rmr $output_path
$HADOOP jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
    -jobconf mapred.job.name="$job_name" \
    -jobconf mapred.job.queue.name=$queue_name \
    -jobconf mapred.map.tasks=$map_num \
    -jobconf mapred.reduce.tasks=$reducer_num \
    -jobconf mapred.map.capacity=$capacity_mapper \
    -jobconf mapred.reduce.capacity=$capacity_reducer \
    -input $input_path \
    -output $output_path \
    -file ./mapper.py \
    -file ./reducer.py \
    -file ./utils.py \
    -file ./desc.${DB}.${TABLE_NAME} \
    -mapper "$mapper" \
    -reducer "$reducer"
if [ $? -ne 0 ]; then
    echo "$DB_NAME $TABLE_NAME $YEAR $MONTH $DAY run failed"
fi
$HADOOP fs -touchz ${output_path}/$output_mark
rm -rf ./${DB_NAME}.${TABLE_NAME}.${YEAR}-${MONTH}-${DAY}
$HADOOP fs -get $output_path/part-00000 ./${DB_NAME}.${TABLE_NAME}.${YEAR}-${MONTH}-${DAY}
```

4. This is an upgraded WordCount: the first feature counts orders per area, the second counts orders per hour of the day.

Mapper script:

```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys
import json
import pickle
reload(sys)
sys.setdefaultencoding('utf-8')


# match each row against the table metadata and yield it as a dict
def read_from_input(file, separator, columns):
    for line in file:
        if line is None or line == '':
            continue
        data_list = mapper_input(line, separator)
        if not data_list:
            continue
        item = None
        # the last 3 columns (year/month/day) are partition keys and are not in the data files
        if len(data_list) == len(columns) - 3:
            item = dict(zip(columns, data_list))
        elif len(data_list) == len(columns):
            item = dict(zip(columns, data_list))
        if not item:
            continue
        yield item


def index_columns(db, table):
    with open('desc.%s.%s' % (db, table), 'r') as fr:
        structure_list = deserialize(fr.read())
        return [column.get('col_name') for column in structure_list]


# map entry point
def main(separator, columns):
    items = read_from_input(sys.stdin, separator, columns)
    mapper_result = {}
    for item in items:
        mapper_plugin_1(item, mapper_result)
        mapper_plugin_2(item, mapper_result)


def mapper_plugin_1(item, mapper_result):
    # in practice the key could be different app keys; it decides which reducer a record
    # is sent to, and records with the same route go to the same reducer
    key = 'route1'
    area = item.get('area')
    district = item.get('district')
    order_id = item.get('order_id')
    if not area or not district or not order_id:
        return
    mapper_output(key, {'area': area, 'district': district, 'order_id': order_id, 'count': 1})


def mapper_plugin_2(item, mapper_result):
    key = 'route2'
    strive_time = item.get('strive_time')
    order_id = item.get('order_id')
    if not strive_time or not order_id:
        return
    try:
        day_hour = strive_time.split(':')[0]
        mapper_output(key, {'order_id': order_id, 'strive_time': strive_time, 'count': 1, 'day_hour': day_hour})
    except Exception, ex:
        pass


def serialize(data, type='json'):
    if type == 'json':
        try:
            return json.dumps(data)
        except Exception, ex:
            return ''
    elif type == 'pickle':
        try:
            return pickle.dumps(data)
        except Exception, ex:
            return ''
    else:
        return ''


def deserialize(data, type='json'):
    if type == 'json':
        try:
            return json.loads(data)
        except Exception, ex:
            return []
    elif type == 'pickle':
        try:
            return pickle.loads(data)
        except Exception, ex:
            return []
    else:
        return []


def mapper_input(line, separator='\t'):
    try:
        return line.split(separator)
    except Exception, ex:
        return None


def mapper_output(key, data, separator='\t'):
    key = str(key)
    data = serialize(data)
    print '%s%s%s' % (key, separator, data)
    # print >> sys.stderr, '%s%s%s' % (key, separator, data)


if __name__ == '__main__':
    db = sys.argv[1]
    table = sys.argv[2]
    columns = index_columns(db, table)
    main('||', columns)
```

Reducer script:

```python
#!/usr/bin/env python
# vim: set fileencoding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import json
import pickle
from itertools import groupby
from operator import itemgetter


def read_from_mapper(file, separator):
    for line in file:
        yield reducer_input(line)


def main(separator='\t'):
    reducer_result = {}
    line_list = read_from_mapper(sys.stdin, separator)
    for route_key, group in groupby(line_list, itemgetter(0)):
        if route_key is None:
            continue
        reducer_result.setdefault(route_key, {})
        if route_key == 'route1':
            reducer_plugin_1(route_key, group, reducer_result)
            reducer_output(route_key, reducer_result[route_key])
        if route_key == 'route2':
            reducer_plugin_2(route_key, group, reducer_result)
            reducer_output(route_key, reducer_result[route_key])


def reducer_plugin_1(route_key, group, reducer_result):
    for _, data in group:
        if data is None or len(data) == 0:
            continue
        if not data.get('area') or not data.get('district') or not data.get('count'):
            continue
        key = '_'.join([data.get('area'), data.get('district')])
        reducer_result[route_key].setdefault(key, 0)
        reducer_result[route_key][key] += int(data.get('count'))
    # print >> sys.stderr, '%s' % json.dumps(reducer_result[route_key])


def reducer_plugin_2(route_key, group, reducer_result):
    for _, data in group:
        if data is None or len(data) == 0:
            continue
        if not data.get('order_id') or not data.get('strive_time') or not data.get('count') or not data.get('day_hour'):
            continue
        key = data.get('day_hour')
        reducer_result[route_key].setdefault(key, {})
        reducer_result[route_key][key].setdefault('count', 0)
        reducer_result[route_key][key].setdefault('order_list', [])
        reducer_result[route_key][key]['count'] += int(data.get('count'))
        if len(reducer_result[route_key][key]['order_list']) < 100:
            reducer_result[route_key][key]['order_list'].append(data.get('order_id'))
    # print >> sys.stderr, '%s' % json.dumps(reducer_result[route_key])


def serialize(data, type='json'):
    if type == 'json':
        try:
            return json.dumps(data)
        except Exception, ex:
            return ''
    elif type == 'pickle':
        try:
            return pickle.dumps(data)
        except Exception, ex:
            return ''
    else:
        return ''


def deserialize(data, type='json'):
    if type == 'json':
        try:
            return json.loads(data)
        except Exception, ex:
            return []
    elif type == 'pickle':
        try:
            return pickle.loads(data)
        except Exception, ex:
            return []
    else:
        return []


def reducer_input(data, separator='\t'):
    data_list = data.strip().split(separator, 2)
    key = data_list[0]
    data = deserialize(data_list[1])
    return [key, data]


def reducer_output(key, data, separator='\t'):
    key = str(key)
    data = serialize(data)
    print '%s\t%s' % (key, data)
    # print >> sys.stderr, '%s\t%s' % (key, data)


if __name__ == '__main__':
    main()
```
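Before launching on the cluster, the mapper and reducer can be smoke-tested locally by emulating the shuffle with sort. This is only a sketch: sample.g_order is a hypothetical local extract of one partition ('||'-separated columns, without the three partition columns), and desc.gulfstream_ods.g_order must sit in the working directory.

```bash
# local pipeline test: mapper -> sort (stands in for the shuffle) -> reducer
cat sample.g_order \
    | python mapper.py gulfstream_ods g_order \
    | sort -t $'\t' -k1,1 \
    | python reducer.py \
    > local_result.txt
```

Sorting on the first tab-separated field is enough here, because the reducer only relies on groupby(..., itemgetter(0)) seeing each route key as one contiguous run of lines.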
5. The previous version suffered from a slow reduce phase, for two reasons. First, because of how the route keys are designed, every record with the same route is sent to the same reducer, so a single reducer carries most of the load and performance drops. Second, the cluster runs on virtual machines, so raw performance is poor to begin with. The improved version below addresses this by doing a preliminary aggregation in the mapper, which relieves the reducers' computational load.

Mapper script:

```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys
import json
import pickle
reload(sys)
sys.setdefaultencoding('utf-8')


# match each row against the table metadata and yield it as a dict
def read_from_input(file, separator, columns):
    for line in file:
        if line is None or line == '':
            continue
        data_list = mapper_input(line, separator)
        if not data_list:
            continue
        item = None
        # the last 3 columns (year/month/day) are partition keys and are not in the data files
        if len(data_list) == len(columns) - 3:
            item = dict(zip(columns, data_list))
        elif len(data_list) == len(columns):
            item = dict(zip(columns, data_list))
        if not item:
            continue
        yield item


def index_columns(db, table):
    with open('desc.%s.%s' % (db, table), 'r') as fr:
        structure_list = deserialize(fr.read())
        return [column.get('col_name') for column in structure_list]


# map entry point: aggregate per-key partial results in memory, then emit them once at the end
def main(separator, columns):
    items = read_from_input(sys.stdin, separator, columns)
    mapper_result = {}
    for item in items:
        mapper_plugin_1(item, mapper_result)
        mapper_plugin_2(item, mapper_result)
    for route_key, route_value in mapper_result.iteritems():
        for key, value in route_value.iteritems():
            ret_dict = dict()
            ret_dict['route_key'] = route_key
            ret_dict['key'] = key
            ret_dict.update(value)
            mapper_output('route_total', ret_dict)


def mapper_plugin_1(item, mapper_result):
    # in practice the key could be different app keys; records with the same route go to the same reducer
    key = 'route1'
    area = item.get('area')
    district = item.get('district')
    order_id = item.get('order_id')
    if not area or not district or not order_id:
        return
    try:
        # aggregate the totals inside the mapper
        mapper_result.setdefault(key, {})
        mapper_result[key].setdefault('_'.join([area, district]), {})
        mapper_result[key]['_'.join([area, district])].setdefault('count', 0)
        mapper_result[key]['_'.join([area, district])].setdefault('order_id', [])
        mapper_result[key]['_'.join([area, district])]['count'] += 1
        if len(mapper_result[key]['_'.join([area, district])]['order_id']) < 10:
            mapper_result[key]['_'.join([area, district])]['order_id'].append(order_id)
    except Exception, ex:
        pass


def mapper_plugin_2(item, mapper_result):
    key = 'route2'
    strive_time = item.get('strive_time')
    order_id = item.get('order_id')
    if not strive_time or not order_id:
        return
    try:
        day_hour = strive_time.split(':')[0]
        # aggregate the totals inside the mapper
        mapper_result.setdefault(key, {})
        mapper_result[key].setdefault(day_hour, {})
        mapper_result[key][day_hour].setdefault('count', 0)
        mapper_result[key][day_hour].setdefault('order_id', [])
        mapper_result[key][day_hour]['count'] += 1
        if len(mapper_result[key][day_hour]['order_id']) < 10:
            mapper_result[key][day_hour]['order_id'].append(order_id)
    except Exception, ex:
        pass


def serialize(data, type='json'):
    if type == 'json':
        try:
            return json.dumps(data)
        except Exception, ex:
            return ''
    elif type == 'pickle':
        try:
            return pickle.dumps(data)
        except Exception, ex:
            return ''
    else:
        return ''


def deserialize(data, type='json'):
    if type == 'json':
        try:
            return json.loads(data)
        except Exception, ex:
            return []
    elif type == 'pickle':
        try:
            return pickle.loads(data)
        except Exception, ex:
            return []
    else:
        return []


def mapper_input(line, separator='\t'):
    try:
        return line.split(separator)
    except Exception, ex:
        return None


def mapper_output(key, data, separator='\t'):
    key = str(key)
    data = serialize(data)
    print '%s%s%s' % (key, separator, data)
    # print >> sys.stderr, '%s%s%s' % (key, separator, data)


if __name__ == '__main__':
    db = sys.argv[1]
    table = sys.argv[2]
    columns = index_columns(db, table)
    main('||', columns)
```

Reducer script:

```python
#!/usr/bin/env python
# vim: set fileencoding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import json
import pickle
from itertools import groupby
from operator import itemgetter


def read_from_mapper(file, separator):
    for line in file:
        yield reducer_input(line)


def main(separator='\t'):
    reducer_result = {}
    line_list = read_from_mapper(sys.stdin, separator)
    for route_key, group in groupby(line_list, itemgetter(0)):
        if route_key is None:
            continue
        reducer_result.setdefault(route_key, {})
        if route_key == 'route_total':
            reducer_total(route_key, group, reducer_result)
            reducer_output(route_key, reducer_result[route_key])


def reducer_total(route_key, group, reducer_result):
    for _, data in group:
        if data is None or len(data) == 0:
            continue
        if data.get('route_key') == 'route1':
            # merge the per-mapper partial counts for area_district keys
            reducer_result[route_key].setdefault(data.get('key'), {})
            reducer_result[route_key][data.get('key')].setdefault('count', 0)
            reducer_result[route_key][data.get('key')].setdefault('order_id', [])
            reducer_result[route_key][data.get('key')]['count'] += data.get('count')
            for order_id in data.get('order_id'):
                if len(reducer_result[route_key][data.get('key')]['order_id']) < 10:
                    reducer_result[route_key][data.get('key')]['order_id'].append(order_id)
        elif data.get('route_key') == 'route2':
            # merge the per-mapper partial counts for day_hour keys
            reducer_result[route_key].setdefault(data.get('key'), {})
            reducer_result[route_key][data.get('key')].setdefault('count', 0)
            reducer_result[route_key][data.get('key')].setdefault('order_id', [])
            reducer_result[route_key][data.get('key')]['count'] += data.get('count')
            for order_id in data.get('order_id'):
                if len(reducer_result[route_key][data.get('key')]['order_id']) < 10:
                    reducer_result[route_key][data.get('key')]['order_id'].append(order_id)
        else:
            pass


def serialize(data, type='json'):
    if type == 'json':
        try:
            return json.dumps(data)
        except Exception, ex:
            return ''
    elif type == 'pickle':
        try:
            return pickle.dumps(data)
        except Exception, ex:
            return ''
    else:
        return ''


def deserialize(data, type='json'):
    if type == 'json':
        try:
            return json.loads(data)
        except Exception, ex:
            return []
    elif type == 'pickle':
        try:
            return pickle.loads(data)
        except Exception, ex:
            return []
    else:
        return []


def reducer_input(data, separator='\t'):
    data_list = data.strip().split(separator, 2)
    key = data_list[0]
    data = deserialize(data_list[1])
    return [key, data]


def reducer_output(key, data, separator='\t'):
    key = str(key)
    data = serialize(data)
    print '%s\t%s' % (key, data)
    # print >> sys.stderr, '%s\t%s' % (key, data)


if __name__ == '__main__':
    main()
```
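A quick way to see what the in-mapper aggregation buys is to compare how many records each mapper version hands to the shuffle on the same sample. The names sample.g_order, mapper_v1.py and mapper_v2.py below are hypothetical stand-ins for the local extract and the two mapper versions shown above.

```bash
# shuffle volume of the original mapper: roughly one record per input row per plugin
cat sample.g_order | python mapper_v1.py gulfstream_ods g_order | wc -l
# shuffle volume of the pre-aggregating mapper: one record per distinct area_district / hour key
cat sample.g_order | python mapper_v2.py gulfstream_ods g_order | wc -l
```

The trade-off is that mapper_result is now held in memory for the whole map task, so this only works while the number of distinct keys per mapper stays small.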
Problems encountered:

1. "The DiskSpace quota of /user/bigdata/qa is exceeded". We hit this right after the reducers finished: the disk quota of that HDFS path was used up, and freeing space under it resolves the error.
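A sketch of how the quota problem can be confirmed and cleared with the standard HDFS shell; the path to delete is a placeholder and should point at job output that is safe to drop.

```bash
# show the name/space quotas of the directory and how much of each remains
hadoop fs -count -q /user/bigdata/qa

# free space by deleting stale job output; -skipTrash releases the blocks immediately
hadoop fs -rm -r -skipTrash <old_output_path>
```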
Reposted from: https://www.cnblogs.com/kangoroo/p/6151104.html