当前位置: 首页 > news >正文

综合网站系统北京官网开发

综合网站系统,北京官网开发,网站推广渠道有哪些,企业所得税怎么算公式文章目录 一、flink 流式读取文件夹、文件二、flink 写入文件系统——StreamFileSink三、查看完整代码 一、flink 流式读取文件夹、文件 Apache Flink针对文件系统实现了一个可重置的source连接器#xff0c;将文件看作流来读取数据。如下面的例子所示#xff1a; StreamExe… 文章目录 一、flink 流式读取文件夹、文件二、flink 写入文件系统——StreamFileSink三、查看完整代码 一、flink 流式读取文件夹、文件 Apache Flink针对文件系统实现了一个可重置的source连接器将文件看作流来读取数据。如下面的例子所示 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();TextInputFormat textInputFormat new TextInputFormat(null);DataStreamSourceString source env.readFile(textInputFormat, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 30000L);StreamExecutionEnvironment.readFile()接收如下参数 FileInputFormat参数负责读取文件中的内容。文件路径。如果文件路径指向单个文件那么将会读取这个文件。如果路径指向一个文件夹FileInputFormat将会扫描文件夹中所有的文件。PROCESS_CONTINUOUSLY将会周期性的扫描文件以便扫描到文件新的改变。30000L表示多久扫描一次监听的文件。 FileInputFormat是一个特定的InputFormat用来从文件系统中读取文件。FileInputFormat分两步读取文件。首先扫描文件系统的路径然后为所有匹配到的文件创建所谓的input splits。一个input split将会定义文件上的一个范围一般通过读取的开始偏移量和读取长度来定义。在将一个大的文件分割成一堆小的splits以后这些splits可以分发到不同的读任务这样就可以并行的读取文件了。FileInputFormat的第二步会接收一个input split读取被split定义的文件范围然后返回对应的数据。 DataStream应用中使用的FileInputFormat需要实现CheckpointableInputFormat接口。这个接口定义了方法来做检查点和重置文件片段的当前的读取位置。 在Flink 1.7中Flink提供了一些类这些类继承了FileInputFormat并实现了CheckpointableInputFormat接口。TextInputFormat一行一行的读取文件而CsvInputFormat使用逗号分隔符来读取文件。 二、flink 写入文件系统——StreamFileSink 该Sink不但可以将数据写入到各种文件系统中而且整合了checkpoint机制来保证Exacly Once语义还可以对文件进行分桶存储还支持以列式存储的格式写入功能更强大。 streamFileSink中输出的文件其生命周期会经历3中状态 in-progress Files 当前文件正在写入中Pending Files 当处于 In-progress 状态的文件关闭closed了就变为 Pending 状态Finished Files 在成功的 Checkpoint 后Pending 状态将变为 Finished 状态 下面是一个简答的例子 , 将接收到的数据流 ,写入到文件中保存 ! 数据文件格式是行式存储格式 BucketAssignerString, String assigner new DateTimeBucketAssigner(yyyy-MM-dd, ZoneId.of(Asia/Shanghai));StreamingFileSinkString fileSink StreamingFileSink.StringforRowFormat(new Path(savePath),new SimpleStringEncoder(UTF-8)).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.MINUTES.toMillis(20))//至少包含 20 分钟的数据.withInactivityInterval(TimeUnit.MINUTES.toMillis(20))//最近 20 分钟没有收到新的数据.withMaxPartSize(1024 * 1024 * 1024)//文件大小已达到 1 GB.build()).withBucketAssigner(assigner).build();其中特别说明了如果使用 FileSink 在 STREAMING 模式的时候必须开启 checkpoint不然的话会导致每个分片文件一直处于 in-progress 或者 pending 状态不能保证整个写入流程的安全性。 所以在我们上述的示例中我们并未开启 checkpoint 导致写出文件一直处于 inprogress 状态。如果加上 checkpoint 后 ​ 将数据以列式存储的格式输出到文件中 三、查看完整代码 import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.io.TextInputFormat; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.streaming.api.functions.source.FileProcessingMode;import java.time.ZoneId; import java.util.concurrent.TimeUnit;public class WordTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//2.设置CK状态后端env.setStateBackend(new FsStateBackend(hdfs://nameservice1/tmp/kafka_test/data/chatgpt/mnbvc/checkpoint));env.enableCheckpointing(1000*60*3);// 每 ** ms 开始一次 checkpointenv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 设置模式为精确一次env.getCheckpointConfig().setCheckpointTimeout(1000*60*5);// Checkpoint 必须在** ms内完成否则就会被抛弃env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 同一时间只允许一个 checkpoint 进行env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);// 确认 checkpoints 之间的时间会进行 ** msenv.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);// 允许两个连续的 checkpoint 错误env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10,TimeUnit.SECONDS)));//重启策略重启3次间隔10s// 使用 externalized checkpoints这样 checkpoint 在作业取消后仍就会被保留env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);String sourcePath hdfs://nameservice1/ec/data/chatgpt/mnbvc/mnbvc_website/format_com;String savePath hdfs://nameservice1/ec/data/chatgpt/mnbvc/mnbvc_website/format_filter_01;TextInputFormat textInputFormat new TextInputFormat(null);DataStreamSourceString source env.readFile(textInputFormat, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 30000L);BucketAssignerString, String assigner new DateTimeBucketAssigner(yyyy-MM-dd, ZoneId.of(Asia/Shanghai));StreamingFileSinkString fileSink StreamingFileSink.StringforRowFormat(new Path(savePath),new SimpleStringEncoder(UTF-8)).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.MINUTES.toMillis(20))//至少包含 20 分钟的数据.withInactivityInterval(TimeUnit.MINUTES.toMillis(20))//最近 20 分钟没有收到新的数据.withMaxPartSize(1024 * 1024 * 1024)//文件大小已达到 1 GB.build()).withBucketAssigner(assigner).build();source.map(line - JSONObject.parseObject(line)).filter(line - line.getString(text).length() 200 line.getInteger(id) % 7 0).map(line - JSON.toJSONString(line)).addSink(fileSink);env.execute();} }
http://www.pierceye.com/news/293152/

相关文章:

  • 宜昌网站建设选择宜昌慧享互动线上店免费推广的软件
  • 网站建设主流语言织梦网站流动广告代码
  • 南京做网站公司哪个网站上做ppt比较好看的
  • 在服务器上搭建网站中国建设银行淮南分行网站
  • 网站建设什么服务器品牌哪个好南京企业制作网站
  • 太原有哪些做网站的公司如何伪原创 网站
  • 设计好的网站网站策划方案详解
  • 建网站潞城哪家强?企业网络推广技巧
  • 怎么建设网站让国外看wordpress 公司内网
  • 虚拟主机购买网站网站值不值得做seo
  • 长沙网站排名优化如何在网站做电子杂志
  • 石家庄科技网站在线解压zip网站
  • 不良网站举报中心官网做网站必须买云虚拟主机吗
  • 网站建设实习wordpress 登陆 插件下载
  • 做耳鼻喉医院网站多少钱北京网站建设营销
  • 济南网站建设就选搜点网络ok外贸平台补贴政策
  • 网站建设 学校百度快照优化培训班
  • 做阀门的英文网站怎么写西安seo服务公司排名
  • 淘宝客网站如何做推广古董手表网站
  • 网站虚拟主机查询企业文化建设的内容有哪些
  • 财经大学网站建设apicloud wordpress
  • 平面设计网站排行榜刚进外贸公司一个月多少钱
  • 企业网站最下面的那栏叫啥广州编程培训机构哪里好
  • 怎么学建设网站网站建设敬请期待图片素材
  • 滴滴出行的网站是哪家公司做的新媒体营销课程心得体会
  • 中国室内设计师联盟网站浙江手机版建站系统开发
  • 网站开源代码模版广州公司注册核名查询系统官网
  • 海外网站seo丹阳市住房建设管理局网站
  • 定制公司网站沙市做网站weisword
  • 平湖模板网站建设公司网站建设项目报告书