当前位置: 首页 > news >正文

免费网站建设软件推荐莞城区做网站

免费网站建设软件推荐,莞城区做网站,罗湖网站的建设,有哪些建设工程类网站姓名#xff1a; 总分#xff1a;Hadoop、Hive、HBase、数据集成、Scala阶段测试 一、选择题#xff08;共20道#xff0c;每道0.5分#xff09; 1、下面哪个程序负责HDFS数据存储#xff08; C #xff09; A. NameNode B. Jobtracher C. DataNode D. Sec… 姓名 总分Hadoop、Hive、HBase、数据集成、Scala阶段测试 一、选择题共20道每道0.5分 1、下面哪个程序负责HDFS数据存储 C A. NameNode B. Jobtracher C. DataNode D. SecondaryNameNode 2、下列哪个属性是hdfs-site.xml中的配置 C B A. fs.defaultFS B. dfs.replication C. yarn.resourcemanager.address D. mapreduce.framework.name 解析 fs.defaultFS用于指定HDFSHadoop Distributed File System的默认文件系统URI。它是Java代码访问HDFS时使用的路径。 dfs.replication该配置项用于设置HDFS中数据块的副本数。 yarn.resourcemanager.address用于指定 YARN ResourceManager 的 RPC 服务器地址和端口号。ResourceManager 是 YARN 架构中的核心组件之一负责接收客户端提交的作业如 MapReduce 任务、Spark 任务等并为这些作业分配资源如内存、CPU以在集群中的 NodeManager 上执行。 mapreduce.framework.name该配置项用于指定MapReduce作业的运行框架。在Hadoop 2.x及更高版本中MapReduce作业通常运行在YARN上因此该配置项的值通常为yarn。它告诉MapReduce框架应该使用YARN来管理其作业的执行。 3、Hadoop-3.x集群中的HDFS的默认的数据块的大小是 D A. 256M B.32M C.64M D.128M 4、Hadoop-3.x集群中的HDFS的默认的副本块的个数是 C A. 1 B. 2 C. 3 D. 4 5、请问以下哪个命令组成是错误的 C A.bin/hadoop fs -cat /data/c.txt B. sbin/hdfs dfsadmin -report C. bin/hdfs namenode -format D.sbin/stop-dfs.sh 解析 使用hadoop fs命令来访问HDFS文件系统-cat选项用于查看指定文件这里是/data/c.txt的内容。 用于请求HDFS报告其状态包括健康报告、数据节点报告等。 在Hadoop中hdfs namenode命令应该位于sbin目录下而不是bin目录。这个命令用于格式化HDFS的NameNode通常是在HDFS首次设置或需要重置HDFS元数据时使用的。 用于停止HDFS守护进程NameNode和DataNode。sbin目录下包含了Hadoop的管理脚本如启动和停止服务的脚本。 6、以下与HDFS类似的框架是 C A. NTFS B. FAT32 C. GFS D.EXT3 解析 HDFSHadoop Distributed File SystemHadoop分布式文件系统是一个高度容错性的系统设计用来部署在低廉的硬件上并能提供高吞吐量的数据访问适合大规模数据集上的应用。 GFSGoogle File SystemGoogle文件系统是一个可扩展的分布式文件系统同样用于大型的、分布式的、对大量数据进行访问的应用。 NTFSNew Technology File System是Windows操作系统的一个文件系统NTFS主要用于个人计算机和小型服务器的数据存储和管理。 FAT32File Allocation Table 32是另一种文件系统主要用于小型的存储设备如U盘和早期的硬盘驱动器。 EXT3Third extended filesystem是Linux系统的一种常见的文件系统主要用于Linux操作系统的数据存储和管理。 7、HBase启动不需要哪个进程 D A. HMaster B. HRegionServer C. QuorumPeerMain D. NodeManager A. HMaster这是HBase的主节点进程负责协调集群中的所有RegionServer并处理元数据操作。因此HBase启动时需要HMaster进程。 B. HRegionServer这是负责处理数据的读写请求以及Region的负载均衡的进程。HBase启动时需要RegionServer进程来提供数据服务。 C. QuorumPeerMain这是ZooKeeper服务器的主类。如果HBase配置为使用ZooKeeper进行协调和管理那么ZooKeeper包括QuorumPeerMain进程将是HBase启动过程中的一部分。虽然它不是HBase本身的进程但HBase的运行依赖于ZooKeeper因此在某种程度上可以认为HBase启动时需要ZooKeeper包括QuorumPeerMain的支持。 D. NodeManager这是YARNYet Another Resource Negotiator框架中的一部分用于管理集群中的节点资源如CPU、内存等。YARN是Hadoop生态系统中的一个资源管理和任务调度的框架与HBase的核心功能不直接相关。HBase并不依赖于YARN或NodeManager来运行其基本的数据存储和查询功能。因此HBase启动时不需要NodeManager进程。 8、下列哪个是纯离线数据采集工具 B A. FlinkX B. Sqoop C. Flume D. Canal FlinkX是一款基于Flink的分布式离线/实时数据同步插件可实现多种异构数据源高效的数据同步。 Sqoop是一个纯离线的数据采集工具主要用于将关系型数据库如MySQL中的数据导入到Hadoop的HDFS中或者从HDFS导出到关系型数据库中。高度依赖MapReduce和YARN Flume是一个分布式、高可靠性和高可用性的海量日志收集系统主要用于实时采集日志数据。 Canal是一个基于MySQL数据库增量日志解析的实时数据同步工具它主要用于提供增量数据订阅和消费。 9、Map的输出结果首先被写入 A A. 内存 B. 缓存 C. 磁盘 D. 以上都正确 Map的输出结果首先被写入的是一个内存缓冲区这个缓冲区可以看作是内存的一部分。 10、MapReduce与HBase的关系哪些描述是正确的 B A. 两者不可或缺MapReduce是HBase可以正常运行的保证 B. 两者不是强关联关系没有MapReduceHBase可以正常运行 C. MapReduce不可以直接访问HBase D. 它们之间没有任何关系 A. 两者不可或缺MapReduce是HBase可以正常运行的保证 这个描述并不准确。虽然MapReduce和HBase都是Hadoop生态系统中的重要组件但它们各自承担着不同的职责。HBase是一个开源的、分布式的非关系型数据库而MapReduce是一种用于大规模数据处理的编程模型。HBase的正常运行并不直接依赖于MapReduce它可以独立运行并通过其他方式如HBase自带的计算框架Coprocessor进行数据处理。 B. 两者不是强关联关系没有MapReduceHBase可以正常运行 这个描述是正确的。HBase和MapReduce是两个独立的项目它们之间的关系是相互依赖但非强制的。HBase可以独立于MapReduce运行并通过其他机制如HBase Shell、HBase API等进行数据的读写和管理。同时MapReduce也可以处理非HBase来源的数据如HDFS上的文件等。 C. MapReduce不可以直接访问HBase 这个描述不完全准确。实际上MapReduce可以通过HBase提供的HBaseTableInputFormat类来直接访问HBase中的数据。这个类允许MapReduce任务将HBase表作为输入源从而可以对HBase中的数据进行读取和处理。 D. 它们之间没有任何关系 这个描述显然是错误的。HBase和MapReduce都是Hadoop生态系统中的关键组件它们之间存在着紧密的联系和交互。HBase可以作为MapReduce任务的输入或输出源而MapReduce则可以对HBase中的数据进行高效的并行处理。 11.下列哪个不是Spark的执行模式 C A. Local B. YARN C. Mesos D. HDFS A. Local这是Spark的一个执行模式其中Spark应用程序在单个JVM进程中运行通常用于开发、测试和调试目的。在这个模式下Spark不需要启动集群而是在本地机器上运行非常适合小规模数据处理和快速原型开发。因此A选项是Spark的一个执行模式。 B. YARNYARNYet Another Resource Negotiator是Apache Hadoop的资源管理器用于在Hadoop集群上管理资源和调度任务。Spark可以在YARN上运行将YARN作为集群管理器以管理集群资源和调度Spark任务。YARN模式包括yarn-client和yarn-cluster两种运行模式分别适用于不同的场景。因此B选项也是Spark的一个执行模式。 C. Mesos虽然Mesos是一个开源的集群管理器用于管理跨多种框架包括Spark的集群资源但它本身并不是Spark的一种执行模式。相反Spark可以在Mesos上运行利用Mesos提供的资源管理和调度功能。然而在描述Spark的执行模式时我们通常不会说“Mesos是Spark的一个执行模式”而是说“Spark可以在Mesos上运行”。因此C选项不是Spark的直接执行模式。 D. HDFSHDFSHadoop Distributed File System是Apache Hadoop的分布式文件系统用于存储大数据集。它并不是Spark的执行模式而是Spark可以访问的一种数据存储系统。Spark可以从HDFS中读取数据进行处理并将结果写回到HDFS中。然而HDFS与Spark的执行模式是两个不同的概念。因此D选项同样不是Spark的执行模式。 12.在Spark中什么机制用于加速迭代计算 B A. Checkpointing B. Caching C. Broadcasting D. Partitioning 解析 Caching缓存 缓存机制是Spark中用于优化迭代计算的重要手段。Spark允许用户将RDD弹性分布式数据集或DataFrame等数据集缓存到内存中以便在后续的计算中重用。 Checkpointing检查点 虽然Checkpointing也是Spark中的一种容错机制但它主要用于在系统故障或节点故障时恢复数据而不是直接用于加速迭代计算。 Broadcasting广播变量 Broadcasting是Spark中用于优化数据传输的一种机制它允许将大变量如模型参数、大配置对象等从Driver端广播到所有Executor端以减少每个Task的数据传输量。 Partitioning分区 Partitioning是Spark中对数据进行分布式存储和计算的基础。通过合理的分区策略可以将大数据集分割成多个小数据集并分布到不同的节点上进行并行处理。然而分区本身并不直接加速迭代计算而是为并行计算提供了基础。 下列哪个函数属于转换操作Transformation而不是行动操作Action C A. count() B. collect() C. filter() D. saveAsTextFile() 在Spark中persist和cache方法有何区别 B A) cache是persist的一个别名二者完全相同。 B) persist提供多种存储级别而cache总是使用默认的存储级别。 C) cache用于DataFrame而persist用于RDD。 D) persist用于数据的持久化而cache用于数据的临时存储。 在Spark中什么是“窄依赖”Narrow Dependency与“宽依赖”Wide Dependency它们如何影响数据的并行处理 A A) 窄依赖表示每个父RDD分区映射到子RDD的一个分区而宽依赖涉及多个父分区到一个子分区导致shuffle。 B) 宽依赖意味着数据不需要重分布而窄依赖则需要shuffle。 C) 窄依赖和宽依赖都涉及到数据的shuffle只是程度不同。 D) 窄依赖与宽依赖仅在DataFrame中存在对RDD没有意义。 Spark的Job和Stage在执行过程中如何划分 D A) Job由一系列Stage组成每个Stage对应于一个shuffle操作。 B) Job和Stage是同义词没有区别。 C) Stage由一系列Job组成用于并行执行不同的任务。 D) Job是由用户提交的任务Stage是DAGScheduler为优化执行计划而创建的最小执行单元。 解析 Job的定义 Job是Spark中由用户提交的任务通常是由一个Action操作如collect、count、save、reduce等触发的。每个Action操作都会生成一个Job。 Stage的划分 Stage是Spark中Job处理过程要分为的几个阶段。DAGScheduler有向无环图调度器会根据RDD之间的依赖关系特别是宽依赖如shuffle操作将Job划分为多个Stage。划分Stage的依据是RDD之间的宽窄依赖。宽依赖如groupByKey、reduceByKey、join等会导致shuffle操作从而需要在不同节点间重新分配数据。每当遇到宽依赖时DAGScheduler就会切分出一个新的Stage。Stage的数量取决于程序中宽依赖即shuffle操作的数量。每个Stage包含一组可以并行执行的任务Tasks。 Task的定义 Task是Spark中任务运行的最小单位最终是以Task为单位运行在Executor中的。一个Stage会包含多个Task这些Task的数量通常取决于Stage中最后一个RDD的分区数。Task的内容与Stage相同但当分区数量为n时会有n个相同效果的Task被分发到执行程序中执行。 在Spark中mapPartitions与map操作有何区别以及在什么情况下使用mapPartitions更合适 B A) mapPartitions和map都是对每个元素进行操作没有区别。 B) mapPartitions可以访问整个分区的数据适用于需要对分区内的数据进行全局操作的场景。 C) map操作可以改变分区的数量而mapPartitions不能。 D) mapPartitions是map的别名用于提高代码可读性。 Spark的Kryo序列化库如何帮助提高性能 B A) Kryo增加了序列化的复杂度但提高了数据的完整性。 B) Kryo序列化库提供了一种更紧凑、更快的序列化方式减少了网络传输和磁盘I/O的开销。 C) Kryo只用于Spark的内部通信对外部数据无影响。 D) Kryo序列化库是默认的序列化方式不需要配置。 在Spark中SparkSession与SparkContext的关系是什么为何推荐使用SparkSession A A) SparkSession是SparkContext的封装提供了更高级的功能如SQL查询和数据源管理SparkSession简化了API提高了易用性。 B) SparkSession和SparkContext可以互换使用没有推荐使用的原因。 C) SparkContext是SparkSession的前身SparkSession仅用于Spark SQL。 D) SparkSession用于管理执行器SparkContext用于管理Driver程序。 Spark的Broadcast Join与Shuffle Hash Join有何区别在何种情况下应优先考虑使用Broadcast Join D A A) Broadcast Join将较小的表广播到每个节点减少shuffle成本Shuffle Hash Join需要更多网络传输适用于大表间的连接。 B) Broadcast Join和Shuffle Hash Join没有区别只是名称不同。 C) Shuffle Hash Join总是优于Broadcast Join因为它更通用。 D) Broadcast Join用于小数据集Shuffle Hash Join用于大数据集但具体选择与数据大小无关。 解析 Broadcast Join和Shuffle Hash Join是Spark SQL中处理连接Join操作的两种不同策略。Broadcast Join适用于连接操作中的一个小表它会将这个小表广播到每个节点上从而避免了大表的shuffle操作减少了网络传输成本。而Shuffle Hash Join则适用于大数据集之间的连接它需要对数据进行shuffle操作来确保连接的正确性。因此在连接操作中的一个小表时应优先考虑使用Broadcast Join。注意实际选择哪种连接策略还取决于其他因素如数据分布、集群配置等但数据大小是一个重要的考虑因素。 二、填空题共20分每空0.5分 1、启动hdfs的shell脚本是 sh xxx.sh start-dfs.sh 解析 启动hdfs的shell脚本是start-dfs.sh。这个脚本用于启动Hadoop分布式文件系统HDFS的所有守护进程包括NameNode和DataNode等。 2、Block是HDFS的基本存储单元默认大小是 128 MB 3、MapReduce默认输入的格式化类 InputFormat TextInputFormat 解析 MapReduce默认输入的格式化类是TextInputFormat。这是MapReduce的默认输入格式它读取文件的行作为输入其中行的字节偏移量作为键Key行的内容作为值Value。 4、Hadoop三大组件 HDFS 、 MapReduce 、 Yarn 5、Hiveserver2默认的端口 10000 解析 Hiveserver2默认的端口是10000。Hiveserver2是Hive的一个服务组件它允许用户通过JDBC或ODBC等协议远程连接到Hive并执行SQL查询。 6、HBase的RowKey设计三大原则 唯一性 、 散列性 、 长度适中 解析 唯一性确保RowKey的唯一性以便能够唯一标识表中的每一行数据。散列性设计RowKey时应该考虑其散列性避免大量数据集中在少数几个Region上从而导致热点问题。长度适中RowKey的长度应该适中不宜过长也不宜过短。过长的RowKey会占用较多的存储空间而过短的RowKey则可能增加数据倾斜的风险。 7、在Spark中一个 RDD 由一系列的 分区 组成每个Stage由一组 RDD 构成而Stage之间的依赖关系通常由 行动算子 操作触发。 8、hive中数据文件默认存储格式是 txt TextFile 解析 hive中数据文件默认存储格式是TextFile。TextFile是一种简单的文本格式数据以行为单位进行存储每行数据之间通过换行符分隔。 9、spark core中缓存的实现方式有几种 使用cache存储到内存中 使用checkpoint存储到磁盘中 10、hive中sql转变成mr经过4个器分别是解析器 编译器 、 优化器 、 执行器 三、判断题共10道每道1分 1、Block Size是不可以修改的 F 2、如果NameNode意外终止SecondaryNameNode会接替它使集群继续工作 F 3、MapReduce 切片的大小等于 block的大小 F 4、在HBase中由HMaster负责用户的IO请求 F 5、MapReduce中map任务的数量可以自己指定 T 6、DataX只能用于离线数据采集 T 7、Flume运行时需要依赖MapReduce T F 解析 Apache Flume是一个分布式、可靠且可用的服务用于高效地收集、聚合和移动大量日志数据。它并不依赖MapReduce来运行而是可以独立于Hadoop生态系统运行。 8、MapReduce中环形缓冲区默认大小为128M F 9、Spark的SparkContext和SparkSession可以同时存在于同一个应用中SparkContext提供了更多低级的API而SparkSession则提供了高层的API包括SQL和数据源支持。 T 10、Spark的map操作是懒惰求值的只有在触发行动操作时才会执行计算。 T 四、简答题共5道每道4分 1、用自己的语言描述SecondaryNameNode的作用。 SecondaryNameNode作为NameNode的秘书帮助NameNode处理事务。 SecondaryNameNode是用来帮助NameNode完成元数据信息合并从角色上看属于NameNode的“秘书” 1.定期合并FsImage和Edits文件 提供HDFS元数据的冷备份 监控HDFS状态 提升HDFS的可靠性和性能 2、用自己的语言描述spark的数据倾斜优化方式。 1.使用Hive ETL预处理数据如果导致数据倾斜的是Hive表。如果该Hive表中的数据本身很不均匀(比如某个 key对应了100万数据其他key才对应了10条数据)。此时可以评估一下是否可以通过Hive来进行数据预处理(即通过Hive ETL预先对 数据按照key进行聚合或者是预先和其他表进行join)然后在Spark作业中针对的数据源就不是 原来的Hive表了而是预处理后的Hive表。此时由于数据已经预先进行过聚合或join操作了那么 在Spark作业中也就不需要使用原先的shuffle类算子执行这类操作了。 2.过滤少数导致倾斜的key如果发现导致倾斜的key就少数几个而且对计算本身的影响并不大。如果我们判断那少数几个数据量特别多的key对作业的执行和计算结果不是特别 重要的话那么干脆就直接过滤掉那少数几个key。比如在Spark SQL中可以使用where子句过滤 掉这些key或者在Spark Core中对RDD执行filter算子过滤掉这些key。如果需要每次作业执行时 动态判定哪些key的数据量最多然后再进行过滤那么可以使用sample算子对RDD进行采样然后 计算出每个key的数量取数据量最多的key过滤掉即可。 3.提高shuffle操作的并行度在对RDD执行shuffle算子时给shuffle算子传入一个参数比如 reduceByKey(1000)该参数就设置了这个shuffle算子执行时shuffle read task的数量。对于 Spark SQL中的shuffle类语句比如group by、join等需要设置一个参数即 spark.sql.shuffle.partitions该参数代表了shuffle read task的并行度该值默认是200对于很 多场景来说都有点过小。增加shuffle read task的数量可以让原本分配给一个task的多个key分配给多个 task从而让每个task处理比原来更少的数据。4.双重聚合对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by 语句进行分组聚合时比较适用这种方案。这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合先给每个key 都打上一个随机数比如10以内的随机数此时原先一样的key就变成不一样的了比如(hello, 1) (hello, 1) (hello, 1) (hello, 1)就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着 对打上随机数后的数据执行reduceByKey等聚合操作进行局部聚合那么局部聚合结果就会 变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉就会变成(hello,2)(hello,2)再次 进行全局聚合操作就可以得到最终结果了比如(hello, 4)。 5.将reduce join转为map join在对RDD使用join类操作或者是在Spark SQL中使用join语句时而且join操作中 的一个RDD或表的数据量比较小(比如几百M或者一两G)比较适用此方案。 不使用join算子进行连接操作而使用Broadcast变量与map类算子实现join操作 进而完全规避掉shuffle类的操作彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过 collect算子拉取到Driver端的内存中来然后对其创建一个Broadcast变量;接着对另外一个RDD 执行map类算子在算子函数内从Broadcast变量中获取较小RDD的全量数据与当前RDD的每 一条数据按照连接key进行比对如果连接key相同的话那么就将两个RDD的数据用你需要的方式 连接起来。 6.采样倾斜key并分拆join操作两个RDD/Hive表进行join的时候如果数据量都比较大无法采用“解决方案五 ”那么此时可以看一下两个RDD/Hive表中的key分布情况。如果出现数据倾斜是因为其中某一 个RDD/Hive表中的少数几个key的数据量过大而另一个RDD/Hive表中的所有key都分布比较均 匀那么采用这个解决方案是比较合适的。思路对包含少数几个数据量过大的key的那个RDD通过sample算子采样出一份样本来然后统计一下每个 key的数量计算出来数据量最大的是哪 几个key。 然后将这几个key对应的数据从原来的RDD中拆分出来形成一个单独的RDD并给每个key都打上n以 内的随机数作为前缀而不会导致倾斜的大部分key形成另外一个RDD。 接着将需要join的另一个RDD也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD将每条数 据膨胀成n条数据这n条数据都按顺序附加一个0~n的前缀不会导致倾斜的大部分key也形成另外一个 RDD。 再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join此时就可以将原先相同的key打 散成n份分散到多个task中去进行join了。 而另外两个普通的RDD就照常join即可。 最后将两次join的结果使用union算子合并起来即可就是最终的join结果。 7.使用随机前缀和扩容RDD进行join如果在进行join操作时RDD中有大量的key导致数据倾斜那么进行分拆key也没 什么意义。该方案的实现思路基本和“解决方案六”类似首先查看RDD/Hive表中的数据分布情况找到那个造成 数据倾斜的RDD/Hive表比如有多个key都对应了超过1万条数据。 然后将该RDD的每条数据都打上一个n以内的随机前缀。 同时对另外一个正常的RDD进行扩容将每条数据都扩容成n条数据扩容出来的每条数据都依次打上一 个0~n的前缀。 最后将两个处理后的RDD进行join即可。3、用自己的语言描述诉MapReduce流程。 一、输入分片Input Splitting 过程描述在MapReduce作业开始前输入文件或文件夹首先被划分为多个InputSplit输入分片。默认情况下每个HDFS的block数据块对应一个InputSplit。这样做的目的是将大文件分割成多个小块以便并行处理。分片大小分片的大小通常与HDFS的数据块大小相同默认是128MB但也可以根据作业需求进行调整。分片时不考虑数据集整体而是逐个针对每一个文件单独切片。 二、Map阶段 InputFormat类使用InputFormat类的子类如TextInputFormat把输入文件夹划分为InputSplit。RecordReader类每个InputSplit通过RecordReader类被解析成一个个key,value键值对。在TextInputFormat中默认是每行的起始偏移量作为key每行的内容作为value。Mapper类框架调用Mapper类中的map函数输入是k1,v1键值对输出是k2,v2键值对。程序员可以覆盖map函数实现自己的逻辑。 三、Combiner阶段可选 过程描述Combiner是一个本地化的reduce操作发生在map端。它的主要作用是减少map端的输出从而减少shuffle过程中网络传输的数据量提高作业的执行效率。注意Combiner的输出是Reducer的输入因此它绝不能改变最终的计算结果。Combiner适合于等幂操作如累加、最大值等。 四、Shuffle阶段 分区在map函数处理完数据后输出的k2,v2键值对会根据key进行分区不同的分区由不同的reduce task处理。分区操作通常通过哈希函数实现。排序与合并在写入环形缓冲区之前数据会先进行排序默认采用快速排序算法。当缓冲区满默认为80%容量时数据会溢出到磁盘文件中并在溢出前完成排序。多个溢出文件在最终输出前会进行归并排序合并成一个大的有序文件。数据传输map任务完成后reduce任务会启动数据copy线程通过HTTP方式请求map任务所在的NodeManager以获取输出文件。 五、Reduce阶段 数据合并Reduce任务将接收到的所有map任务的输出数据已分区且区内有序进行合并相同key的value值会被放到同一个集合中。Reducer类框架调用Reducer类中的reduce函数对合并后的数据进行处理最终输出结果。OutputFormat类使用OutputFormat类的子类如TextOutputFormat将最终结果输出到文件或数据库等存储介质中。 4、谈谈Hive的优化。 1.本地模式运行当处理一些小任务时可以选择本地模式运行这样会使得任务执行的速度会很快。 2.JVM模式在处理一些需要很多资源的任务时可以先申请一部分的资源等运行结束后再将资源释放。 3.严格模式启动严格模式禁止分区表的全表扫描查询数据时必须加limit禁止笛卡尔积。 4.hive join的数据倾斜问题当小表join小表时不用去管它当小表join大表时小表放在join的左边当大表join大表时应当考虑是否会出现某个reduce数据量过大的情况。空key过滤当有大量数据同时放入一个reduce时应当观察该rowkey一般来说该rowkey对应的数据都是异常数据需要使用sql语句对其进行过滤。空key转换当有大量的数据都对应一个空的rowkey时需要给这些数据随机分配一个rowkey使它们均匀的分布到一些reduce中。 5.自定义map和reduce的数量一般不去修改它。 5、用自己的语言描述诉spark的资源调度和任务调度流程。 spark的资源调度driver向resourcemanager申请资源resourcemanager选择一个空闲的子节点开启applicationmaster任务applicationmaster向resourcemanager提交申请资源开启executor的任务。applicationmaster选择一个空闲子节点开启executor 开启完毕后applicationmaster将executor开启的消息发送给driver让driver发送执行任务。 spark的任务调度流程driver端遇到action算子触发任务执行将任务提交到有向无环图DAGscheduler中根据RDD的血缘关系划分划分stage将RDD中的分区封装成taskset任务发送到TASKscheduler。TASKscheduler取出taskset任务根据RDD 的提供最优的任务执行计划只移动计算不移动数据开始对执行任务。 spark的资源调度 1、Driver提交作业命令 2、向ResourceMananger申请资源 3、ResourceMananger检查一些权限、资源空间在一个相对空闲的子节点上开启一个ApplicationMaster的进程 4、ApplicationMaster向ResourceMananger申请资源启动Executor 5、ResourceMananger启动Executor 6、Executor反向注册给Driver告诉Driver资源申请成功可以发送任务 spark的任务调度流程 7、Driver遇到一个行动算子触发整个作业调度 8、先将整个作业交给DAG有向无环图 DAG Scheduler 9、根据RDD之间的血缘关系找出宽窄依赖有没有shuffle的产生 10、通过宽窄依赖划分stage阶段 11、根据stage阶段将stage中的task任务封装成一个taskSet对象 12、发送给后续的 Task Scheduler Task Scheduler 13、从DAG Scheduler发送过来的taskSet中取出task任务 14、根据RDD五大特性的最后一大特性只移动计算不移动数据将task任务发送到对应的Executor的线程池中执行 五、代码题50分 1、spark sql数据分析以及可视化30分 疫情期间各类政府媒体及社交网站均发布了相关疫情每日统计数据下面基于数据仓库工具Hive请你统计分析相关疫情数据。 提示 数据字段为日期date、省份province、城市city、新增确诊confirm、新增出院heal、新增死亡dead、消息来源source 部分数据截图 题目 请基于covid19.csv数据将数据导入到Hive中使用spark on hive读取数据使用纯SQL完成下列统计分析 请自行在Hive按照数据结构创建对应的表并加载数据 请给出代码语句及结果截图 1、统计湖北省每月新增出院病例总数最多的前3个城市8分 输出[月份城市每月新增出院病例总数排名] create table bigdata30_test3.covid ( dates String, province String, city String, confirm Int, heal Int, dead Int, source String )ROW FORMAT DELIMITED FIELDS TERMINATED BY , LOCATION /bigdata30/data/;select tt1.month, tt1.city, tt1.counts as counts, tt1.rank from (select t1.month, t1.city, t1.counts as counts, -- row_number数据相同也不会导致排名重复 row_number() over(partition by t1.month order by t1.counts desc) rank from (select province, city, -- 只取出月份 subString(dates,0,2) as month, count(heal) as counts from bigdata30_test3.covid where province 湖北 and city ! 境外输入-英国 -- 只取出月份用来分组 group by city,province,subString(dates,0,2)) t1) tt1 -- 取每个排序的前三名 WHERE tt1.rank 3;结果 1月 武汉市 24 1 1月 荆门市 10 2 1月 荆州市 10 3 2月 武汉市 62 1 2月 黄石市 54 2 2月 黄冈市 46 3 3月 武汉市 41 1 3月 鄂州市 32 2 3月 孝感市 28 3 4月 武汉市 29 1 4月 荆门市 1 2 4月 襄阳市 1 32、统计安徽省每月新增确诊人数同比 同比 当月指标 - 上月指标/ 上月指标 6分 输出[月份每月新增确诊人数上月新增确诊人数同比] 将该需求结果写入到mysql中使用fileBI作图柱状图 4分 -- 纯sql DBeaver中执行 select t1.month, t1.counts, LAG(counts,1,-1) over(order by t1.month) as last_counts, casewhen LAG(counts,1,-1) over(order by t1.month) 0 then 没有上一个月的数据else round(((t1.counts - LAG(counts,1,-1) over(order by t1.month)) / LAG(counts,1,-1) over(order by t1.month)),2) end as rate from (select subString(dates,0,2) as month, province, count(confirm) as counts from bigdata30_test3.covid where province 安徽 group by province,subString(dates,0,2)) t1;-- MySQL建表 create table dataMysql ( months varchar(30), counts varchar(30), last_counts varchar(30), rate varchar(30) )// 为了将数据写入到MySQL使用sparksqlpackage com.shujia.DSLexamimport org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SparkSession}import java.sql.{Connection, DriverManager, PreparedStatement}object Exam2_1 {def main(args: Array[String]): Unit {val sparkSession: SparkSession SparkSession.builder().master(local).appName(考试大题一)//参数设置的优先级代码优先级 命令优先级 配置文件优先级.config(spark.sql.shuffle.partitions, 1).enableHiveSupport() // 开启hive的配置.getOrCreate()sparkSession.sql(use bigdata30_test3)//truncate false 时完整地显示某列值不进行任何截断。val dataDF: DataFrame sparkSession.sql(|select|t1.month,|t1.counts,|LAG(counts,1,-1) over(order by t1.month) as last_counts,|case| when LAG(counts,1,-1) over(order by t1.month) 0 then 0| else round(((t1.counts - LAG(counts,1,-1) over(order by t1.month)) / LAG(counts,1,-1) over(order by t1.month)),2)|end as rate|from|(select|subString(dates,0,2) as month,|province,|count(confirm) as counts|from|bigdata30_test3.covid|where province 安徽|group by province,subString(dates,0,2)) t1|.stripMargin)dataDF.foreach((rdd: Row) {//注册驱动Class.forName(com.mysql.jdbc.Driver)//创建数据库连接对象val conn: Connection DriverManager.getConnection(jdbc:mysql://master:3306/exam?useUnicodetruecharacterEncodingUTF-8useSSLfalse,root,123456)//创建预编译对象val statement: PreparedStatement conn.prepareStatement(insert into dataMysql values(?,?,?,?))statement.setString(1, rdd.getAs[String](month))statement.setLong(2, rdd.getAs[Long](counts))statement.setLong(3, rdd.getAs[Long](last_counts))statement.setDouble(4, rdd.getAs[Double](rate))// 执行sql语句statement.executeUpdate()statement.close()conn.close()})} } fineBI作图 3、统计安徽省各城市连续新增确诊人数、连续新增确诊开始日期、连续新增确诊结束日期及连续新增确诊天数12分 输出[城市连续新增确诊人数连续新增确诊开始日期连续新增确诊结束日期连续新增确诊天数] select DISTINCT tt1.city, max(tt1.confirm) over(partition by tt1.city,tt1.flag,SPLIT(tt1.new_day, -)[0]) - min(tt1.confirm) over(partition by tt1.city,tt1.flag,SPLIT(tt1.new_day, -)[0]) as add_confirm, min(tt1.new_day) over(partition by tt1.city,tt1.flag,SPLIT(tt1.new_day, -)[0]) as start_date, max(tt1.new_day) over(partition by tt1.city,tt1.flag,SPLIT(tt1.new_day, -)[0]) as end_date, count(1) over(partition by tt1.city,tt1.flag,SPLIT(tt1.new_day, -)[0]) as counts from (select *, CAST(SPLIT(t1.new_day, -)[1] AS INT) AS day_of_month, (CAST(SPLIT(t1.new_day, -)[1] AS INT) - t1.rank) as flag from (select * , from_unixtime(unix_timestamp(dates,MM月dd日),MM-dd) as new_day, row_number() over(partition by city order by dates) rank from bigdata30_test3.covid where province 安徽 and source 安徽卫健委 and heal IS NOT NULL and confirm IS NOT NULL and dead IS NOT NULL) t1) tt1-- 连续新增确诊人数应该是求出的数据应该是本组的最后一条的confirm减去本组第一天的confirm而不是下面的一组中的最大的confirm减去最小的confirm 该如何求解 max(tt1.confirm) over(partition by tt1.city,tt1.flag,SPLIT(tt1.new_day, -)[0]) -min(tt1.confirm) over(partition by tt1.city,tt1.flag,SPLIT(tt1.new_day, -)[0]) as add_confirm,-- 解决方案FIRST_VALUE()和LAST_VALUE()函数分别获取了每个分组的第一天和最后一天的确诊人数-- 为了避免下述中出现的分组全出现在结果中的问题使用FIRST_VALUE()和LAST_VALUE()函数时 -- 最好指定ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING SELECT DISTINCTtt1.city,LAST_VALUE(tt1.confirm) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, -)[0] ORDER BY tt1.new_day ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) -FIRST_VALUE(tt1.confirm) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, -)[0] ORDER BY tt1.new_day ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS add_confirm,MIN(tt1.new_day) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, -)[0]) AS start_date,MAX(tt1.new_day) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, -)[0]) AS end_date,COUNT(1) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, -)[0]) AS counts FROM(SELECT*,CAST(SPLIT(t1.new_day, -)[1] AS INT) AS day_of_month,(CAST(SPLIT(t1.new_day, -)[1] AS INT) - t1.rank) AS flagFROM(SELECT *,FROM_UNIXTIME(UNIX_TIMESTAMP(dates, MM月dd日), MM-dd) AS new_day,ROW_NUMBER() OVER (PARTITION BY city ORDER BY dates) AS rankFROM bigdata30_test3.covidWHERE province 安徽 AND source 安徽卫健委 AND heal IS NOT NULL AND confirm IS NOT NULL AND dead IS NOT NULL) t1) tt1;-- 注 在SQL中ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING是窗口函数如FIRST_VALUE(), LAST_VALUE(), ROW_NUMBER(), SUM(), AVG()等的一个子句用于指定窗口的范围。 UNBOUNDED PRECEDING表示窗口的起始点是分区中的第一行。 UNBOUNDED FOLLOWING表示窗口的结束点是分区中的最后一行。对于FIRST_VALUE()和LAST_VALUE()这样的函数它们通常需要一个明确的窗口定义来确定“第一”和“最后”是基于什么范围来计算的。如果不提供ROWS BETWEEN子句某些数据库系统可能会报错因为它们不知道应该基于哪些行来计算这些值。-- 不加也可正常执行但是不能加上ORDER BY tt1.new_day否则会出现整个分组都出现在最终的结果里 形如 |合肥市| 0| 01-28| 01-30| 3| |合肥市| 10| 01-28| 01-30| 3| |合肥市| 7| 01-28| 01-30| 3| SELECT DISTINCTtt1.city,LAST_VALUE(tt1.confirm) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, -)[0]) -FIRST_VALUE(tt1.confirm) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, -)[0]) AS add_confirm,MIN(tt1.new_day) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, -)[0]) AS start_date,MAX(tt1.new_day) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, -)[0]) AS end_date,COUNT(1) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, -)[0]) AS counts FROM(SELECT*,CAST(SPLIT(t1.new_day, -)[1] AS INT) AS day_of_month,(CAST(SPLIT(t1.new_day, -)[1] AS INT) - t1.rank) AS flagFROM(SELECT *,FROM_UNIXTIME(UNIX_TIMESTAMP(dates, MM月dd日), MM-dd) AS new_day,ROW_NUMBER() OVER (PARTITION BY city ORDER BY dates) AS rankFROM bigdata30_test3.covidWHERE province 安徽 AND source 安徽卫健委 AND heal IS NOT NULL AND confirm IS NOT NULL AND dead IS NOT NULL) t1) tt1;没指定ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING又加上了order by dates: 会出现下面这种情况 2、spark DSL数据分析20分 现有三份数据结构如下 live_types 直播间信息表 结构live_id live_type ​ 直播间id 直播间类型 live_events 用户访问直播间记录表 结构user_id live_id start_time end_time ​ 用户id 直播间id 开始时间 结束时间 user_info 用户信息表 结构user_id user_name ​ 用户id 用户名 题目 请给出结果截图及Scala代码 1、统计每位用户观看不同类型直播的次数6分 输出[用户id用户名直播间类型次数] import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.{DataFrame, SparkSession}object Exam2 {def main(args: Array[String]): Unit {val sparkSession: SparkSession SparkSession.builder().master(local).appName(社保练习).getOrCreate()import org.apache.spark.sql.functions._import sparkSession.implicits._// user_id live_id start_time end_time// 用户id 直播间id 开始时间 结束时间val live_events: DataFrame sparkSession.read.format(csv).option(sep, \t).schema(user_id INT,live_id INT,start_time timestamp,end_time timestamp).load(spark/data_exam/live_events.txt) // live_events.show()//live_id live_type// 直播间id 直播间类型val live_types: DataFrame sparkSession.read.format(csv).option(sep, \t).schema(live_id INT,live_type String).load(spark/data_exam/live_types.txt) // live_types.show()//user_id user_name// 用户id 用户名val user_info: DataFrame sparkSession.read.format(csv).option(sep, \t).schema(user_id INT,user_name String).load(spark/data_exam/user_info.txt) // user_info.show()/*** 1、统计每位用户观看不同类型直播的次数6分* 输出[用户id用户名直播间类型次数]*/live_events.withColumn(count,count(expr(1)) over Window.partitionBy($user_id,$live_id))// TODO 设置表与表之间进行左连接.join(live_types, live_events(live_id) live_types(live_id), left).join(user_info, live_events(user_id) user_info(user_id), left)/*** 两张表中的字段名相同要注明字段所属表* 否则会报 Reference user_id is ambiguous, could be: user_id, user_id.*/.select(live_events(user_id),$user_name,$live_type,$count).distinct().show()------------------------------ |user_id|user_name|live_type|count| ------------------------------ | 106| Lucy| music| 1| | 100| LiHua| game| 1| | 102| Tom| food| 1| | 104| Bush| game| 1| | 105| Jam| game| 1| | 102| Tom| music| 1| | 100| LiHua| food| 2| | 101| Bob| food| 1| | 101| Bob| game| 1| | 102| Tom| game| 1| | 104| Bush| food| 1| ------------------------------2、统计每位用户累计观看直播时长按时长降序排列6分 输出[用户id用户名累计时长] /*** 2、统计每位用户累计观看直播时长按时长降序排列6分* 输出[用户id用户名累计时长]*/// 100 1 2021-12-01 19:00:00 2021-12-01 19:28:00 start_time timestamp,end_timelive_events.withColumn(times,(unix_timestamp($end_time,yyyy-MM-dd HH:mm:ss) - unix_timestamp($start_time,yyyy-MM-dd HH:mm:ss))).withColumn(all_times, sum($times) over Window.partitionBy($user_id)).join(user_info,user_id)// TODO 时间戳 / 60 在最后查询时可以将秒转换成分钟.select($user_id,$user_name,$all_times / 60).distinct().orderBy($all_times.desc).show()-------------------------------- |user_id|user_name|(all_times / 60)| -------------------------------- | 104| Bush| 178.0| | 101| Bob| 163.0| | 102| Tom| 140.0| | 106| Lucy| 129.0| | 100| LiHua| 110.0| | 105| Jam| 8.0| --------------------------------3、统计不同类型直播用户累计观看时长降序排名8分 输出[直播间id直播间类型用户id用户名累计时长排名] /*** 3、统计不同类型直播用户累计观看时长降序排名8分* 输出[直播间id直播间类型用户id用户名累计时长排名]*/live_events//TODO 在开始得出时间戳的时候就将其转换成以分钟为单位.withColumn(times, (unix_timestamp($end_time, yyyy-MM-dd HH:mm:ss) - unix_timestamp($start_time, yyyy-MM-dd HH:mm:ss)) / 60).withColumn(all_times, sum($times) over Window.partitionBy($user_id)).join(user_info, user_id).join(live_types, live_id).withColumn(rank, row_number() over Window.partitionBy($live_type).orderBy($all_times.desc)).select($live_id,$live_type,$user_id,$user_name,$all_times,$rank).show()--------------------------------------------- |live_id|live_type|user_id|user_name|all_times|rank| --------------------------------------------- | 1| food| 104| Bush| 178.0| 1| | 1| food| 101| Bob| 163.0| 2| | 1| food| 102| Tom| 140.0| 3| | 1| food| 100| LiHua| 110.0| 4| | 1| food| 100| LiHua| 110.0| 5| | 3| music| 102| Tom| 140.0| 1| | 3| music| 106| Lucy| 129.0| 2| | 2| game| 104| Bush| 178.0| 1| | 2| game| 101| Bob| 163.0| 2| | 2| game| 102| Tom| 140.0| 3| | 2| game| 100| LiHua| 110.0| 4| | 2| game| 105| Jam| 8.0| 5| ---------------------------------------------
http://www.pierceye.com/news/896649/

相关文章:

  • 柳州网站建设优化推广wordpress 不显示菜单
  • 网站死循环网站备案和域名备案区别
  • 做网站要学会什么语言装修公司网站模板下载
  • 门户网站建设自查报告网站关键词快速排名技术
  • 如何建网站费用多少全国工商企业查询平台
  • 兰州新区建站什么是网络营销取得成功的基础
  • 南昌 网站 公司wordpress迁移后媒体库丢失
  • 做移动网站点击软件cnzz网站建设
  • 高质量网站外链建设大揭秘做网站之前需要准备什么条件
  • 睢宁做网站百度一下做网站
  • 做国外购物网站国家高职示范校建设网站
  • 网站建设福州公司山西省大同市网站建设公司
  • 浙江网站建设推荐wordpress 增加小工具
  • 个人网站是商业的吗北京网站建设设计
  • 手机网站收费怎么停止网站
  • 网站建设 金疙瘩计划杭州小程序制作公司排行榜
  • 德泰诺网站建设软件著作权登记证书
  • 商标设计网页seo外包公司兴田德润官方地址
  • 网站开发人员岗位成功营销案例分享
  • 赤峰做网站的公司湘潭哪里做网站
  • 免费自助建站郑州官网seo费用
  • 称心的常州网站建设wordpress怎么用两个主题
  • 建设银行北京分行网站做视频网站用什么服务器配置
  • 网站备案流程实名认证医疗网站建设资讯
  • 一个做问卷调查的网站好wordpress七比2
  • 西双版纳网站制作公司临沂企业网站建站模板
  • 培训做网站国内适合个人做外贸的网站有哪些
  • 我想卖自己做的鞋子 上哪个网站好中信银行网站怎么做的怎么烂
  • 在线网站建设工程标准godaddy 上传网站
  • 营销型网站方案ppt模板手机建站平台微点