高要网站建设,wordpress比织梦安全吗,高校学校网站建设,傻瓜式自助建站系统前言
在跑调度任务时候#xff0c;有时候子任务需要依赖前置任务的输出#xff0c;但类似读取 Parquet 或者 Orc 文件时#xff0c;如果不判断目录是否为空#xff0c;在输出为空时会报错#xff0c;所以需要 check 一下#xff0c;此外Hadoop通常在写入数据时会在目录中…前言
在跑调度任务时候有时候子任务需要依赖前置任务的输出但类似读取 Parquet 或者 Orc 文件时如果不判断目录是否为空在输出为空时会报错所以需要 check 一下此外Hadoop通常在写入数据时会在目录中生成一个名为_SUCCESS的文件来表示写入操作已成功完成我们在检测时要排除这个文件 HDFS API 判断
from py4j.java_gateway import java_import
from pyspark.sql import SparkSession# 初始化SparkSession
spark SparkSession.builder.appName(Example).getOrCreate()# 导入Hadoop FileSystem类
java_import(spark._jvm, org.apache.hadoop.fs.Path)
java_import(spark._jvm, org.apache.hadoop.fs.FileSystem)# 定义要检查的路径
FEATURE_OUTPUT_PATH your_path_here# 获取Hadoop Configuration
hadoop_conf spark._jsc.hadoopConfiguration()# 获取FileSystem对象
fs spark._jvm.FileSystem.get(hadoop_conf)# 检查路径是否存在
path spark._jvm.Path(FEATURE_OUTPUT_PATH)if fs.exists(path):# 获取目录下所有的文件和子目录status_list fs.listStatus(path)non_success_files [file_status.getPath().getName() for file_status in status_list iffile_status.getPath().getName() ! _SUCCESS]# 检查除_SUCCESS文件外是否还有其他文件if non_success_files:# 读取Parquet文件table spark.read.format(parquet).option(header, true).load(FEATURE_OUTPUT_PATH)else:print(The directory is empty or only contains a _SUCCESS file.)
else:print(The path does not exist.)本地 Shell 判断
注意这段脚本能使用的前提是执行的机器上已经安装和配置了 HDFS 的 shell 命令
import subprocessoutsubprocess.check_output(hadoop fs -ls /tmp/file.txt,shellTrue)outout.strip()outout.split(\n)for l in out:if l.endswith(.txt):print file exitelse:print file not exit