重庆网站建设外贸,狼雨seo网站,公司网络销售网络推广方案,东莞网站排名优化费用文章目录 一. StreamTask核心组件与能力二. OneInputStreamTask接入网络数据并处理三. 处理数据1. StreamElement类别2. 业务数据处理逻辑 四. 小结 先来看数据是如何经过网络写入下游Task节点并通过算子进行处理的#xff0c;这里以OneInputStreamTask为例进行说明。
一. St… 文章目录 一. StreamTask核心组件与能力二. OneInputStreamTask接入网络数据并处理三. 处理数据1. StreamElement类别2. 业务数据处理逻辑 四. 小结 先来看数据是如何经过网络写入下游Task节点并通过算子进行处理的这里以OneInputStreamTask为例进行说明。
一. StreamTask核心组件与能力
如代码OneInputStreamTask.init()方法包含了初始化StreamTask主要核心组件的逻辑。
OneInputStreamTask
public void init() throws Exception {StreamConfig configuration getConfiguration();int numberOfInputs configuration.getNumberOfInputs();if (numberOfInputs 0) {// 创建CheckpointedInputGateCheckpointedInputGate inputGate createCheckpointedInputGate();TaskIOMetricGroup taskIOMetricGroup getEnvironment().getMetricGroup().getIOMetricGroup();taskIOMetricGroup.gauge(checkpointAlignmentTime, inputGate::getAlignmentDurationNanos);// 创建DataOutput组件DataOutputIN output createDataOutput();StreamTaskInputIN input createTaskInput(inputGate, output);// 创建StreamOneInputProcessorinputProcessor new StreamOneInputProcessor(input,output,getCheckpointLock(),operatorChain);}headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue); 创建CheckpointedInputGateCheckpointedInputGate是对InputGate进行封装实现对CheckpointBarrier对齐的功能。此组件可以接入上游Task实例写入指定InputChannel中的Buffer数据。创建DataOutput组件在StreamTaskInput中会将 接入的数据 通过DataOutput组件输出到算子链的HeaderOperator中。创建StreamTaskInput组件用于接收数据将InputGate和DataOutput作为内部成员完成对数据的接入和输出。创建StreamOneInputProcessor数据处理器此组件会被Task线程模型调度并执行实现周期性地从StreamTaskInput组件中读取数据元素并处理。 小结 OneInputStreamTask初始化过程中包括创建StreamTaskInput和DataOutput组件。 接下来了解StreamTask如何利用StreamTaskInput和DataOutput完成数据元素的接收并发送到算子链中进行处理。
二. OneInputStreamTask接入网络数据并处理
StreamTask.processInput()方法定义了处理数据的主要流程。 数据最终会通过MailboxProcessor调度与执行调用StreamOneInputProcessor.processInput()方法完成数据元素的获取和处理调度StreamOneInputProcessor组件串联并运行StreamTaskInput组件、DataOutput组件和OperatorChain组件最终完成数据元素的处理操作。 StreamTask.processInput()
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {InputStatus status inputProcessor.processInput();// 上游如果还有数据则继续等待执行if (status InputStatus.MORE_AVAILABLE recordWriter.isAvailable()) {return;}// 上游如果没有数据则发送控制消息到控制器if (status InputStatus.END_OF_INPUT) {controller.allActionsCompleted();return;}CompletableFuture? jointFuture getInputOutputJointFuture(status);MailboxDefaultAction.Suspension suspendedDefaultAction controller.suspendDefaultAction();jointFuture.thenRun(suspendedDefaultAction::resume);
}接下来详细看StreamOneInputProcessor.processInput() emitNext()通过StreamTaskNetworkInput接收数据元素并返回InputStatus判断数据元素是否全部消费完毕。emitNext()会将DataOutput作为参数传递到方法内部用于将数据元素输出到算子链中。 public InputStatus processInput() throws Exception {InputStatus status input.emitNext(output);if (status InputStatus.END_OF_INPUT) {synchronized (lock) {operatorChain.endHeadOperatorInput(1);}}return status;
}StreamTaskNetworkInput.emitNext()处理数据逻辑。 //BufferOrEvent代表数据元素可以是Buffer类型也可以是事件类型
//比如CheckpointBarrier、TaskEvent等事件。public InputStatus emitNext(DataOutputT output) throws Exception {while (true) {// 从Deserializer中获取数据元素if (currentRecordDeserializer ! null) {DeserializationResult result currentRecordDeserializer.getNextRecord(deserializationDelegate);// 如果DeserializationResult对应的Buffer数据已经被消费则回收Buffer if (result.isBufferConsumed()) {currentRecordDeserializer.getCurrentBuffer().recycleBuffer();currentRecordDeserializer null;}// 如果result是完整的数据元素则调用processElement()方法进行处理if (result.isFullRecord()) {processElement(deserializationDelegate.getInstance(), output);return InputStatus.MORE_AVAILABLE;}}// 从checkpointedInputGate中拉取数据//如果bufferOrEvent为空则判断checkpointedInputGate是否已经关闭如果已经关闭了则直接返回END_OF_INPUT状态否则返回NOTHING_AVAILABLE状态。OptionalBufferOrEvent bufferOrEvent checkpointedInputGate.pollNext();// 如果有数据则调用processBufferOrEvent()方法进行处理if (bufferOrEvent.isPresent()) {processBufferOrEvent(bufferOrEvent.get());} else {// 如果checkpointedInputGate已关闭则返回END_OF_INPUTif (checkpointedInputGate.isFinished()) {checkState(checkpointedInputGate.getAvailableFuture().isDone(), Finished BarrierHandler should be available);if (!checkpointedInputGate.isEmpty()) {throw new IllegalStateException(Trailing data in checkpoint barrier handler.);}return InputStatus.END_OF_INPUT;}return InputStatus.NOTHING_AVAILABLE;}}
}三. 处理数据
1. StreamElement类别
StreamElement具体类别有StreamRecord、StreamStatus以及Watermark其中StreamRecord就是需要处理的业务数据Watermark则是上游传递下来的Watermark事件。
//StreamTaskNetworkInput.processElement()
private void processElement(StreamElement recordOrMark, DataOutputT output) throws Exception {// StreamRecord类型if (recordOrMark.isRecord()){output.emitRecord(recordOrMark.asRecord());// Watermark类型} else if (recordOrMark.isWatermark()) {statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel);// LatencyMarker类型} else if (recordOrMark.isLatencyMarker()) {output.emitLatencyMarker(recordOrMark.asLatencyMarker());// StreamStatus类型} else if (recordOrMark.isStreamStatus()) {statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel);} else {throw new UnsupportedOperationException(Unknown type of StreamElement);}
} 2. 业务数据处理逻辑
对于业务数据调用output.emitRecord(recordOrMark.asRecord())方法进行数据元素的输出操作然后通过DataOutput输出到算子链中进行处理。
如下方法调用operator处理实际就是在创建StreamTaskNetworkOutput时指定的算子链HeaderOperator。
OneInputStreamTask.StreamTaskNetworkOutput.emitRecord()
public void emitRecord(StreamRecordIN record) throws Exception {synchronized (lock) {//累加器计算消费数量numRecordsIn.inc();//通过算子链处理operator.setKeyContextElement1(record);operator.processElement(record);}
}四. 小结
Flink从InputGate中拉取数据元素并进行反序列化操作转换成StreamElement类型后再调用StreamTaskNetworkOutput.emitRecord()方法将数据元素推送到OperatorChain的HeaderOperator中进行处理。 《Flink设计与实现核心原理与源码解析》 – 张利兵