当前位置: 首页 > news >正文

外国做动漫图片的网站叫什么名字网站建设家乡网页设计模板

外国做动漫图片的网站叫什么名字,网站建设家乡网页设计模板,公司网站建设周期及费用,网站导航栏固定1、读取json格式的文件创建DataFrame 注意#xff1a; 1、可以两种方式读取json格式的文件。 2、df.show()默认显示前20行数据。 3、DataFrame原生API可以操作DataFrame。 4、注册成临时表时#xff0c;表中的列默认按ascii顺序显示列。 df.createTempView(mytab…1、读取json格式的文件创建DataFrame 注意 1、可以两种方式读取json格式的文件。 2、df.show()默认显示前20行数据。 3、DataFrame原生API可以操作DataFrame。 4、注册成临时表时表中的列默认按ascii顺序显示列。 df.createTempView(mytable) df.createOrReplaceTempView(mytable) df.createGlobalTempView(mytable) df.createOrReplaceGlobalTempView(mytable) Session.sql(select * from global_temp.mytable).show() 5、DataFrame是一个Row类型的RDDdf.rdd()/df.javaRdd()。 java SparkConf conf new SparkConf(); conf.setMaster(local).setAppName(jsonfile); SparkContext sc new SparkContext(conf);//创建sqlContext SQLContext sqlContext new SQLContext(sc);/*** DataFrame的底层是一个一个的RDD RDD的泛型是Row类型。* 以下两种方式都可以读取json格式的文件*/ DataFrame df sqlContext.read().format(json).load(sparksql/json); // DataFrame df2 sqlContext.read().json(sparksql/json.txt); // df2.show();/*** DataFrame转换成RDD*/ RDDRow rdd df.rdd(); /*** 显示 DataFrame中的内容默认显示前20行。如果现实多行要指定多少行show(行数)* 注意当有多个列时显示的列先后顺序是按列的ascii码先后显示。*/ // df.show(); /*** 树形的形式显示schema信息*/ df.printSchema(); /*** dataFram自带的API 操作DataFrame*///select name from table// df.select(name).show();//select name age10 as addage from tabledf.select(df.col(name),df.col(age).plus(10).alias(addage)).show();//select name ,age from table where age19df.select(df.col(name),df.col(age)).where(df.col(age).gt(19)).show();//select count(*) from table group by agedf.groupBy(df.col(age)).count().show();/*** 将DataFrame注册成临时的一张表这张表临时注册到内存中是逻辑上的表不会雾化到磁盘*/df.registerTempTable(jtable);DataFrame sql sqlContext.sql(select age,count(1) from jtable group by age);DataFrame sql2 sqlContext.sql(select * from jtable);sc.stop(); scala: 1.val session SparkSession.builder().appName(jsonData).master(local).getOrCreate() 2.// val frame: DataFrame session.read.json(./data/json) 3.val frame session.read.format(json).load(./data/json) 4.frame.show(100) 5.frame.printSchema() 6. 7./** 8.* DataFrame API 操作 9.*/ 10.//select name ,age from table 11.frame.select(name,age).show(100) 12. 13.//select name,age 10 as addage from table 14.frame.select(frame.col(name),frame.col(age).plus(10).as(addage)).show(100) 15. 16.//select name,age from table where age 19 17.frame.select(name,age).where(frame.col(age).(19)).show(100) 18.frame.filter(age19).show(100) 19. 20.//select name ,age from table order by name asc ,age desc 21.import session.implicits._ 22.frame.sort($name.asc,frame.col(age).desc).show(100) 23. 24.//select name ,age from table where age is not null 25.frame.filter(age is not null).show() 26. 27./** 28.* 创建临时表 29.*/ 30.frame.createTempView(mytable) 31.session.sql(select name ,age from mytable where age 19).show() 32.frame.createOrReplaceTempView(mytable) 33.frame.createGlobalTempView(mytable) 34.frame.createOrReplaceGlobalTempView(mytable) 35. 36./** 37.* dataFrame 转换成RDD 38.*/ 39.val rdd: RDD[Row] frame.rdd 40.rdd.foreach(row{ 41. val name row.getAs[String](name) 42. val age row.getAs[Long](age) 43. println(sname is $name ,age is $age) 44.}) 2、通过json格式的RDD创建DataFrame java: SparkConf conf new SparkConf(); conf.setMaster(local).setAppName(jsonRDD); JavaSparkContext sc new JavaSparkContext(conf); SQLContext sqlContext new SQLContext(sc); JavaRDDString nameRDD sc.parallelize(Arrays.asList({\name\:\zhangsan\,\age\:\18\},{\name\:\lisi\,\age\:\19\},{\name\:\wangwu\,\age\:\20\} )); JavaRDDString scoreRDD sc.parallelize(Arrays.asList( {\name\:\zhangsan\,\score\:\100\}, {\name\:\lisi\,\score\:\200\}, {\name\:\wangwu\,\score\:\300\} ));DataFrame namedf sqlContext.read().json(nameRDD); DataFrame scoredf sqlContext.read().json(scoreRDD); namedf.registerTempTable(name); scoredf.registerTempTable(score);DataFrame result sqlContext.sql(select name.name,name.age,score.score from name,score where name.name score.name); result.show();sc.stop(); scala: 1.val session SparkSession.builder().appName(jsonData).master(local).getOrCreate() 2.val jsonList List[String]( 3. {name:zhangsan,age:18}, 4. {name:lisi,age:19}, 5. {name:wangwu,age:20}, 6. {name:maliu,age:21}, 7. {name:tainqi,age:22} 8.) 9. 10.import session.implicits._ 11.val jsds: Dataset[String] jsonList.toDS() 12.val df session.read.json(jsds) 13.df.show() 14. 15./** 16.* Spark 1.6 17.*/ 18.val jsRDD: RDD[String] session.sparkContext.parallelize(jsonList) 19.val frame: DataFrame session.read.json(jsRDD) 20.frame.show()3、非json格式的RDD创建DataFrame 1、通过反射的方式将非json格式的RDD转换成DataFrame不建议使用 自定义类要可序列化自定义类的访问级别是PublicRDD转成DataFrame后会根据映射将字段按Assci码排序将DataFrame转换成RDD时获取字段两种方式,一种是df.getInt(0)下标获取不推荐使用另一种是df.getAs(“列名”)获取推荐使用 /** * 注意 * 1.自定义类必须是可序列化的 * 2.自定义类访问级别必须是Public * 3.RDD转成DataFrame会把自定义类中字段的名称按assci码排序 */ SparkConf conf new SparkConf(); conf.setMaster(local).setAppName(RDD); JavaSparkContext sc new JavaSparkContext(conf); SQLContext sqlContext new SQLContext(sc); JavaRDDString lineRDD sc.textFile(sparksql/person.txt); JavaRDDPerson personRDD lineRDD.map(new FunctionString, Person() {/*** */private static final long serialVersionUID 1L;Overridepublic Person call(String s) throws Exception {Person p new Person();p.setId(s.split(,)[0]);p.setName(s.split(,)[1]);p.setAge(Integer.valueOf(s.split(,)[2]));return p;} }); /** * 传入进去Person.class的时候sqlContext是通过反射的方式创建DataFrame * 在底层通过反射的方式获得Person的所有field结合RDD本身就生成了DataFrame */ DataFrame df sqlContext.createDataFrame(personRDD, Person.class); df.show(); df.registerTempTable(person); sqlContext.sql(select name from person where id 2).show();/** * 将DataFrame转成JavaRDD * 注意 * 1.可以使用row.getInt(0),row.getString(1)...通过下标获取返回Row类型的数据但是要注意列顺序问题---不常用 * 2.可以使用row.getAs(列名)来获取对应的列值。 * */ JavaRDDRow javaRDD df.javaRDD(); JavaRDDPerson map javaRDD.map(new FunctionRow, Person() {/*** */private static final long serialVersionUID 1L;Overridepublic Person call(Row row) throws Exception {Person p new Person();//p.setId(row.getString(1));//p.setName(row.getString(2));//p.setAge(row.getInt(0));p.setId((String)row.getAs(id));p.setName((String)row.getAs(name));p.setAge((Integer)row.getAs(age));return p;} }); map.foreach(new VoidFunctionPerson() {/*** */private static final long serialVersionUID 1L;Overridepublic void call(Person t) throws Exception {System.out.println(t);} });sc.stop(); scala 1.case class MyPerson(id:Int,name:String,age:Int,score:Double) 2. 3.object Test { 4. def main(args: Array[String]): Unit { 5. val session SparkSession.builder().appName(jsonData).master(local).getOrCreate() 6. val peopleInfo: RDD[String] session.sparkContext.textFile(./data/people.txt) 7. val personRDD : RDD[MyPerson] peopleInfo.map(info { 8.MyPerson(info.split(,)(0).toInt,info.split(,)(1),info.split(,)(2).toInt,info.split(,)(3).toDouble) 9. }) 10. import session.implicits._ 11. val ds personRDD.toDS() 12. ds.createTempView(mytable) 13. session.sql(select * from mytable ).show() 14. } 15.} 2、动态创建Schema将非json格式的RDD转换成DataFrame java: SparkConf conf new SparkConf(); conf.setMaster(local).setAppName(rddStruct); JavaSparkContext sc new JavaSparkContext(conf); SQLContext sqlContext new SQLContext(sc); JavaRDDString lineRDD sc.textFile(./sparksql/person.txt); /*** 转换成Row类型的RDD*/ JavaRDDRow rowRDD lineRDD.map(new FunctionString, Row() {/*** */private static final long serialVersionUID 1L;Overridepublic Row call(String s) throws Exception {return RowFactory.create(String.valueOf(s.split(,)[0]),String.valueOf(s.split(,)[1]),Integer.valueOf(s.split(,)[2]));} }); /*** 动态构建DataFrame中的元数据一般来说这里的字段可以来源自字符串也可以来源于外部数据库*/ ListStructField asList Arrays.asList(DataTypes.createStructField(id, DataTypes.StringType, true),DataTypes.createStructField(name, DataTypes.StringType, true),DataTypes.createStructField(age, DataTypes.IntegerType, true) );StructType schema DataTypes.createStructType(asList); DataFrame df sqlContext.createDataFrame(rowRDD, schema);df.show(); sc.stop(); scala: 1.val session SparkSession.builder().appName(jsonData).master(local).getOrCreate() 2.val peopleInfo: RDD[String] session.sparkContext.textFile(./data/people.txt) 3. 4.val rowRDD: RDD[Row] peopleInfo.map(info { 5. val id info.split(,)(0).toInt 6. val name info.split(,)(1) 7. val age info.split(,)(2).toInt 8. val score info.split(,)(3).toDouble 9. Row(id, name, age, score) 10.}) 11.val structType: StructType StructType(Array[StructField]( 12. StructField(id, IntegerType), 13. StructField(name, StringType), 14. StructField(age, IntegerType), 15. StructField(score, DoubleType) 16.)) 17.val frame: DataFrame session.createDataFrame(rowRDD,structType) 18.frame.createTempView(mytable) 19.session.sql(select * from mytable ).show() 4、读取parquet文件创建DataFrame 注意 可以将DataFrame存储成parquet文件。保存成parquet文件的方式有两种 df.write().mode(SaveMode.Overwrite)format(parquet).save(./sparksql/parquet); df.write().mode(SaveMode.Overwrite).parquet(./sparksql/parquet); SaveMode指定文件保存时的模式。 Overwrite覆盖 Append追加 ErrorIfExists如果存在就报错 Ignore如果存在就忽略 java: SparkConf conf new SparkConf(); conf.setMaster(local).setAppName(parquet); JavaSparkContext sc new JavaSparkContext(conf); SQLContext sqlContext new SQLContext(sc); JavaRDDString jsonRDD sc.textFile(sparksql/json); DataFrame df sqlContext.read().json(jsonRDD); /*** 将DataFrame保存成parquet文件SaveMode指定存储文件时的保存模式* 保存成parquet文件有以下两种方式*/ df.write().mode(SaveMode.Overwrite).format(parquet).save(./sparksql/parquet); df.write().mode(SaveMode.Overwrite).parquet(./sparksql/parquet); df.show(); /*** 加载parquet文件成DataFrame * 加载parquet文件有以下两种方式 */DataFrame load sqlContext.read().format(parquet).load(./sparksql/parquet); load sqlContext.read().parquet(./sparksql/parquet); load.show();sc.stop(); scala: 1.val session SparkSession.builder().appName(jsonData).master(local).getOrCreate() 2.val frame: DataFrame session.read.json(./data/json) 3.frame.show() 4.frame.write.mode(SaveMode.Overwrite).parquet(./data/parquet) 5. 6.val df: DataFrame session.read.format(parquet).load(./data/parquet) 7.df.createTempView(mytable) 8.session.sql(select count(*) from mytable ).show() 5、读取JDBC中的数据创建DataFrame(MySql为例) 两种方式创建DataFrame java: SparkConf conf new SparkConf(); conf.setMaster(local).setAppName(mysql); JavaSparkContext sc new JavaSparkContext(conf); SQLContext sqlContext new SQLContext(sc); /*** 第一种方式读取MySql数据库表加载为DataFrame*/ MapString, String options new HashMapString,String(); options.put(url, jdbc:mysql://192.168.179.4:3306/spark); options.put(driver, com.mysql.jdbc.Driver); options.put(user, root); options.put(password, 123456); options.put(dbtable, person); DataFrame person sqlContext.read().format(jdbc).options(options).load(); person.show(); person.registerTempTable(person); /*** 第二种方式读取MySql数据表加载为DataFrame*/ DataFrameReader reader sqlContext.read().format(jdbc); reader.option(url, jdbc:mysql://192.168.179.4:3306/spark); reader.option(driver, com.mysql.jdbc.Driver); reader.option(user, root); reader.option(password, 123456); reader.option(dbtable, score); DataFrame score reader.load(); score.show(); score.registerTempTable(score);DataFrame result sqlContext.sql(select person.id,person.name,score.score from person,score where person.name score.name); result.show(); /*** 将DataFrame结果保存到Mysql中*/ Properties properties new Properties(); properties.setProperty(user, root); properties.setProperty(password, 123456); result.write().mode(SaveMode.Overwrite).jdbc(jdbc:mysql://192.168.179.4:3306/spark, result, properties);sc.stop(); scala: 1.val session SparkSession.builder().appName(jsonData).master(local).getOrCreate() 2. 3.val prop new Properties() 4.prop.setProperty(user,root) 5.prop.setProperty(password,123456) 6./** 7.* 第一种方式 8.*/ 9.val df1 session.read.jdbc(jdbc:mysql://192.168.179.14:3306/spark,person,prop) 10.df1.show() 11.df1.createTempView(person) 12. 13./** 14.* 第二种方式 15.*/ 16.val map Map[String,String]( 17. url - jdbc:mysql://192.168.179.14:3306/spark, 18. driver - com.mysql.jdbc.Driver, 19. user - root, 20. password - 123456, 21. dbtable - score 22.) 23.val df2 session.read.format(jdbc).options(map).load() 24.df2.show() 25. 26./** 27.* 第三种方式 28.*/ 29.val df3 session.read.format(jdbc) 30. .option(url, jdbc:mysql://192.168.179.14:3306/spark) 31. .option(driver, com.mysql.jdbc.Driver) 32. .option(user, root) 33. .option(password, 123456) 34. .option(dbtable, score) 35. .load() 36.df3.show() 37.df3.createTempView(score) 38. 39.val result session.sql(select person.id,person.name,person.age,score.score from person ,score where person.id score.id) 40. 41.result.show() 42.//将结果保存到mysql中 43.result.write.mode(SaveMode.Overwrite).jdbc(jdbc:mysql://192.168.179.14:3306/spark,result,prop) 44.
http://www.pierceye.com/news/854879/

相关文章:

  • 做网站比较专业的公司微信商城在哪里找
  • 网站建设开发的流程网站标题title怎么写
  • 网络营销的优势海宁网站怎么做seo
  • wordpress 英文主题南宁网站排名优化公司
  • 行业网站建设方案有专门做电商网站的CMS吗
  • 网站备案 快递公司变更流程
  • 简单的做图网站wordpress加密授权
  • 哪里做网站域名不用备案新华舆情监测平台
  • 品牌工厂网站建设qt 网站开发
  • xxx网站建设规划家庭服务网站的营销策略
  • 哪里可以做宝盈网站江门百度seo公司
  • 电子商务的网站建设名词解释如何建立官网
  • 网站建设维护外包群排名优化软件
  • 苏州专业建设网站镇江网站建设找思创网络
  • 长春网站排名提升seo关键词推广多少钱
  • 头条网站怎么做的在网站上放广告
  • 网站建设费的会计分录wordpress c博客
  • 网站开发语言字典使用apmserv本地搭建多个网站
  • 建网站费用记账北京时间网站建设
  • 兴化网站开发佛山营销网站建设联系方式
  • 安居客官网网站天津 网站设计制作公司
  • seo建站优化价格表中山网站建设品牌
  • wp网站源码聊城市住房和城乡建设局网站首页
  • 个人博客网站总结买东西的网站
  • 兰州新区小程序建站网站的漂浮广告怎么做
  • 用vs代码做网站线上拓客渠道有哪些
  • 微信网站界面如何免费创建自己的平台
  • 电商设计一般都是做什么潍坊网站seo外包
  • 大城怎么样做网站雄安建设工程信息网站
  • 郑州网站建设方案服务安全狗iis版删了以后 网站打不开