当前位置: 首页 > news >正文

重庆网站建设外贸狼雨seo网站

重庆网站建设外贸,狼雨seo网站,公司网络销售网络推广方案,东莞网站排名优化费用文章目录 一. StreamTask核心组件与能力二. OneInputStreamTask接入网络数据并处理三. 处理数据1. StreamElement类别2. 业务数据处理逻辑 四. 小结 先来看数据是如何经过网络写入下游Task节点并通过算子进行处理的#xff0c;这里以OneInputStreamTask为例进行说明。 一. St… 文章目录 一. StreamTask核心组件与能力二. OneInputStreamTask接入网络数据并处理三. 处理数据1. StreamElement类别2. 业务数据处理逻辑 四. 小结 先来看数据是如何经过网络写入下游Task节点并通过算子进行处理的这里以OneInputStreamTask为例进行说明。 一. StreamTask核心组件与能力 如代码OneInputStreamTask.init()方法包含了初始化StreamTask主要核心组件的逻辑。 OneInputStreamTask public void init() throws Exception {StreamConfig configuration getConfiguration();int numberOfInputs configuration.getNumberOfInputs();if (numberOfInputs 0) {// 创建CheckpointedInputGateCheckpointedInputGate inputGate createCheckpointedInputGate();TaskIOMetricGroup taskIOMetricGroup getEnvironment().getMetricGroup().getIOMetricGroup();taskIOMetricGroup.gauge(checkpointAlignmentTime, inputGate::getAlignmentDurationNanos);// 创建DataOutput组件DataOutputIN output createDataOutput();StreamTaskInputIN input createTaskInput(inputGate, output);// 创建StreamOneInputProcessorinputProcessor new StreamOneInputProcessor(input,output,getCheckpointLock(),operatorChain);}headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge); getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue); 创建CheckpointedInputGateCheckpointedInputGate是对InputGate进行封装实现对CheckpointBarrier对齐的功能。此组件可以接入上游Task实例写入指定InputChannel中的Buffer数据。创建DataOutput组件在StreamTaskInput中会将 接入的数据 通过DataOutput组件输出到算子链的HeaderOperator中。创建StreamTaskInput组件用于接收数据将InputGate和DataOutput作为内部成员完成对数据的接入和输出。创建StreamOneInputProcessor数据处理器此组件会被Task线程模型调度并执行实现周期性地从StreamTaskInput组件中读取数据元素并处理。 小结 OneInputStreamTask初始化过程中包括创建StreamTaskInput和DataOutput组件。 接下来了解StreamTask如何利用StreamTaskInput和DataOutput完成数据元素的接收并发送到算子链中进行处理。 二. OneInputStreamTask接入网络数据并处理 StreamTask.processInput()方法定义了处理数据的主要流程。 数据最终会通过MailboxProcessor调度与执行调用StreamOneInputProcessor.processInput()方法完成数据元素的获取和处理调度StreamOneInputProcessor组件串联并运行StreamTaskInput组件、DataOutput组件和OperatorChain组件最终完成数据元素的处理操作。 StreamTask.processInput() protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {InputStatus status inputProcessor.processInput();// 上游如果还有数据则继续等待执行if (status InputStatus.MORE_AVAILABLE recordWriter.isAvailable()) {return;}// 上游如果没有数据则发送控制消息到控制器if (status InputStatus.END_OF_INPUT) {controller.allActionsCompleted();return;}CompletableFuture? jointFuture getInputOutputJointFuture(status);MailboxDefaultAction.Suspension suspendedDefaultAction controller.suspendDefaultAction();jointFuture.thenRun(suspendedDefaultAction::resume); }接下来详细看StreamOneInputProcessor.processInput() emitNext()通过StreamTaskNetworkInput接收数据元素并返回InputStatus判断数据元素是否全部消费完毕。emitNext()会将DataOutput作为参数传递到方法内部用于将数据元素输出到算子链中。 public InputStatus processInput() throws Exception {InputStatus status input.emitNext(output);if (status InputStatus.END_OF_INPUT) {synchronized (lock) {operatorChain.endHeadOperatorInput(1);}}return status; }StreamTaskNetworkInput.emitNext()处理数据逻辑。 //BufferOrEvent代表数据元素可以是Buffer类型也可以是事件类型 //比如CheckpointBarrier、TaskEvent等事件。public InputStatus emitNext(DataOutputT output) throws Exception {while (true) {// 从Deserializer中获取数据元素if (currentRecordDeserializer ! null) {DeserializationResult result currentRecordDeserializer.getNextRecord(deserializationDelegate);// 如果DeserializationResult对应的Buffer数据已经被消费则回收Buffer if (result.isBufferConsumed()) {currentRecordDeserializer.getCurrentBuffer().recycleBuffer();currentRecordDeserializer null;}// 如果result是完整的数据元素则调用processElement()方法进行处理if (result.isFullRecord()) {processElement(deserializationDelegate.getInstance(), output);return InputStatus.MORE_AVAILABLE;}}// 从checkpointedInputGate中拉取数据//如果bufferOrEvent为空则判断checkpointedInputGate是否已经关闭如果已经关闭了则直接返回END_OF_INPUT状态否则返回NOTHING_AVAILABLE状态。OptionalBufferOrEvent bufferOrEvent checkpointedInputGate.pollNext();// 如果有数据则调用processBufferOrEvent()方法进行处理if (bufferOrEvent.isPresent()) {processBufferOrEvent(bufferOrEvent.get());} else {// 如果checkpointedInputGate已关闭则返回END_OF_INPUTif (checkpointedInputGate.isFinished()) {checkState(checkpointedInputGate.getAvailableFuture().isDone(), Finished BarrierHandler should be available);if (!checkpointedInputGate.isEmpty()) {throw new IllegalStateException(Trailing data in checkpoint barrier handler.);}return InputStatus.END_OF_INPUT;}return InputStatus.NOTHING_AVAILABLE;}} }三. 处理数据 1. StreamElement类别 StreamElement具体类别有StreamRecord、StreamStatus以及Watermark其中StreamRecord就是需要处理的业务数据Watermark则是上游传递下来的Watermark事件。 //StreamTaskNetworkInput.processElement() private void processElement(StreamElement recordOrMark, DataOutputT output) throws Exception {// StreamRecord类型if (recordOrMark.isRecord()){output.emitRecord(recordOrMark.asRecord());// Watermark类型} else if (recordOrMark.isWatermark()) {statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel);// LatencyMarker类型} else if (recordOrMark.isLatencyMarker()) {output.emitLatencyMarker(recordOrMark.asLatencyMarker());// StreamStatus类型} else if (recordOrMark.isStreamStatus()) {statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel);} else {throw new UnsupportedOperationException(Unknown type of StreamElement);} } 2. 业务数据处理逻辑 对于业务数据调用output.emitRecord(recordOrMark.asRecord())方法进行数据元素的输出操作然后通过DataOutput输出到算子链中进行处理。 如下方法调用operator处理实际就是在创建StreamTaskNetworkOutput时指定的算子链HeaderOperator。 OneInputStreamTask.StreamTaskNetworkOutput.emitRecord() public void emitRecord(StreamRecordIN record) throws Exception {synchronized (lock) {//累加器计算消费数量numRecordsIn.inc();//通过算子链处理operator.setKeyContextElement1(record);operator.processElement(record);} }四. 小结 Flink从InputGate中拉取数据元素并进行反序列化操作转换成StreamElement类型后再调用StreamTaskNetworkOutput.emitRecord()方法将数据元素推送到OperatorChain的HeaderOperator中进行处理。 《Flink设计与实现核心原理与源码解析》 – 张利兵
http://www.pierceye.com/news/998964/

相关文章:

  • 怎么自己做网站挂到百度上通号建设集团有限公司
  • 建设图片展示网站林萌荣温州市网页制作
  • 企业门户网站内容建设如何一个空间做多个网站
  • 行业网站源码网站建设福永附近网络公司
  • 建设网站哪个便宜ps做网站的优点
  • 网站制作中的更多怎么做盘锦市建设银行网站
  • 广西响应式网站建设男女做暧网站
  • 网站建设中心开发公司对施工单位管理措施
  • 青岛网站建设方案优化宋祖儿在哪个网站做网红
  • 莆田网站制作价格wordpress占用带宽
  • 网站用图片做背景搜索引擎推广一般包括哪些
  • 网站首页设计html代码可以发广告的平台
  • 做网站的技术哪个简单泰安吧贴吧
  • 网站制作厂家政务网站开发方案
  • 爱站工具卡片式网站
  • 计算机网站开发图片湛江城乡建设局网站
  • 广州个性化网站开发代做网站关键词
  • 如何开发一个手机网站北京推广网站
  • 企业网站建设合作合同wordpress国产定制主题
  • 万网虚拟机怎么做两个网站网页设计实训步骤
  • 福田做网站公司怎么选择wordpress怎样在列表页使用瀑布流
  • 做导航网站用多大的空间广州天河区有哪些大学
  • 广州市城乡建设部网站首页做婚礼设计在哪个网站下载素材
  • 网站建设推广服务合同范本什么是电子商务专业?
  • 青岛网站建设公司电话棋牌室的网站怎么做
  • 网站更改公司需要重新备案吗传媒网站
  • 海诚网站建设青岛李村网站设计公司
  • 哪个网站可以宣传做的蛋糕网站商城微信支付接口申请
  • 如何做淘客推广网站可信赖的手机网站设计
  • 西城专业网站建设公司哪家好外贸网站优化谷歌关键词排名