找客户app,成都seo学徒,如何查询网站服务商,萧县住房和城乡建设局网站流式读取文件数据 from pyspark.sql import SparkSession
ss SparkSession.builder.getOrCreate()
# todo 注意1#xff1a;流式读取目录下的文件 --》一定一定要是目录#xff0c;不是具体的文件#xff0c;# 目录下产生新文件会进行读取# todo 注意点2#xff1…流式读取文件数据 from pyspark.sql import SparkSession
ss SparkSession.builder.getOrCreate()
# todo 注意1流式读取目录下的文件 --》一定一定要是目录不是具体的文件# 目录下产生新文件会进行读取# todo 注意点2csv和JSON必须指定schema 以前的JSON文件是不要指定df_csv ss.readStream.csv(‘hdfs://node1:8020/目录’) df_json ss.readStream.json(‘hdfs://node1:8020/目录’)
# todo 每个options都不一样options2 { ‘host’:‘192.168.88.100’, ‘port’:9999 }
options{ # 每个批次读取1个文件 ‘maxFilesPerTrigger’:1, ‘latestFirst’:‘true’ }
df_json.writeStream.start(format‘console’,outputMode‘complete’).awaitTermination()
流式读取文件的注意点 删除已经处理的文件文件一 你修改了文件一的内容不修改文件名你再次上传会发现它不去读取 但是你不修改文件内容修改文件名你再上传会发现它还会去读取 场景某天你上传一个文件发现它不做任何读取和处理你需要考虑这个文件名以前是否处理过了。 文件的读取方式在实际开发中用的比较少每生产一条数据就要生成一个文件单单正对流处理 但是如果将多条数据收集之后同一写入文件那就变成了和批处理方式一样的开发 文件读取数据的参数指定 当spark读不过来的时候可以调整latestFirst设置为True就会处理最新的文件 true时就会将所有相同文件名认定为同一个文件不管全部路径是否相同这就涉及到相同的路径不会连续处理 上面刚说的