大连企业网站制作,如何查看百度搜索指数,章丘建设网站,莱芜信息港Flink中的分流
在Flink中将数据流切分为多个子数据流#xff0c;子数据流称为”旁路输出数据流“。 #mermaid-svg-bnbf0HOpEUsgi9nh {font-family:trebuchet ms,verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-bnbf0HOpEUsgi9nh .error-icon{…Flink中的分流
在Flink中将数据流切分为多个子数据流子数据流称为”旁路输出数据流“。 #mermaid-svg-bnbf0HOpEUsgi9nh {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-bnbf0HOpEUsgi9nh .error-icon{fill:#552222;}#mermaid-svg-bnbf0HOpEUsgi9nh .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-bnbf0HOpEUsgi9nh .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-bnbf0HOpEUsgi9nh .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-bnbf0HOpEUsgi9nh .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-bnbf0HOpEUsgi9nh .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-bnbf0HOpEUsgi9nh .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-bnbf0HOpEUsgi9nh .marker{fill:#333333;stroke:#333333;}#mermaid-svg-bnbf0HOpEUsgi9nh .marker.cross{stroke:#333333;}#mermaid-svg-bnbf0HOpEUsgi9nh svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-bnbf0HOpEUsgi9nh .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-bnbf0HOpEUsgi9nh .cluster-label text{fill:#333;}#mermaid-svg-bnbf0HOpEUsgi9nh .cluster-label span{color:#333;}#mermaid-svg-bnbf0HOpEUsgi9nh .label text,#mermaid-svg-bnbf0HOpEUsgi9nh span{fill:#333;color:#333;}#mermaid-svg-bnbf0HOpEUsgi9nh .node rect,#mermaid-svg-bnbf0HOpEUsgi9nh .node circle,#mermaid-svg-bnbf0HOpEUsgi9nh .node ellipse,#mermaid-svg-bnbf0HOpEUsgi9nh .node polygon,#mermaid-svg-bnbf0HOpEUsgi9nh .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-bnbf0HOpEUsgi9nh .node .label{text-align:center;}#mermaid-svg-bnbf0HOpEUsgi9nh .node.clickable{cursor:pointer;}#mermaid-svg-bnbf0HOpEUsgi9nh .arrowheadPath{fill:#333333;}#mermaid-svg-bnbf0HOpEUsgi9nh .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-bnbf0HOpEUsgi9nh .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-bnbf0HOpEUsgi9nh .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-bnbf0HOpEUsgi9nh .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-bnbf0HOpEUsgi9nh .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-bnbf0HOpEUsgi9nh .cluster text{fill:#333;}#mermaid-svg-bnbf0HOpEUsgi9nh .cluster span{color:#333;}#mermaid-svg-bnbf0HOpEUsgi9nh div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-bnbf0HOpEUsgi9nh :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} 拆分流 正常处理 异常处理 数据读取 合法入库 异常监控 拆分流数据的方式
Split已经废弃不推荐使用FliterSideOut推荐使用
Fliter分流的Java实现 public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 指标明细DataStreamString detailMessage KafkaConfigUtil.buildSource(env).map((MapFunctionString, String) kafkaMessage - {JSONObject jsonobject null;try {jsonobject JSONObject.parseObject(kafkaMessage);} catch (Exception e) {LOG.warn(报文格式错误:{}, kafkaMessage);}if (null jsonobject || jsonobject.isEmpty()) {LOG.warn(报文内容不合法:{}, JSONObject.toJSONString(jsonobject));} else {if (!EventsServiceEnum.MapReduce.getValue().equals(jsonobject.get(service)) !EventsServiceEnum.Spark.getValue().equals(jsonobject.get(service))) {LOG.warn(报文所属服务不存在:{}, JSONObject.toJSONString(jsonobject));}}return JSONObject.toJSONString(jsonobject);});// 将原始流中包含demo的数据筛选出来DataStreamString diagnosisMessages detailMessage.filter((FilterFunctionString) kafkaMessage - (kafkaMessage.contains(demo))).map((MapFunctionString, String) sparkMessage - {// 为达到实验效果进行日志输出LOG.info([is demo message]:{}, sparkMessage);return sparkMessage;});env.execute(Flink Streaming Java API Skeleton);}
SideOut分流的Java实现 public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();System.out.println(【SideOutputDemo】);// 指标明细DataStreamString mainMessage KafkaConfigUtil.buildSource(env).map((MapFunctionString, String) kafkaMessage - {JSONObject jsonobject null;try {jsonobject JSONObject.parseObject(kafkaMessage);} catch (Exception e) {LOG.warn(报文格式错误:{}, kafkaMessage);}if (null jsonobject || jsonobject.isEmpty()) {LOG.warn(报文内容不合法:{}, JSONObject.toJSONString(jsonobject));} else {if (!EventsServiceEnum.MapReduce.getValue().equals(jsonobject.get(service)) !EventsServiceEnum.Spark.getValue().equals(jsonobject.get(service))) {LOG.warn(报文所属服务不存在:{}, JSONObject.toJSONString(jsonobject));}}return JSONObject.toJSONString(jsonobject);});// 定义一个切分(旁路输出)final OutputTagString outputTag new OutputTagString(Spark_END) {};SingleOutputStreamOperatorString sp mainMessage.process(new ProcessFunctionString, String() {Overridepublic void processElement(String s, Context context, CollectorString collector) throws Exception {// 向常规流主流中添加数据collector.collect(s);// 向旁路输出流中添加数据if (s.contains(AppPhaseEnum.Spark_APP_End.getValue())) {context.output(outputTag, s);}}});sp.map((MapFunctionString, String) sparkMessage - {LOG.info(主流的数据: {}, sparkMessage);return sparkMessage;});DataStreamString tag sp.getSideOutput(outputTag);tag.map((MapFunctionString, String) sparkMessage - {LOG.info(旁路[{}]的数据: {}, outputTag.getId(), sparkMessage);return sparkMessage;});env.execute(Flink Streaming Java API Skeleton);}SideOutPut 是 Flink 框架推荐的分流方法在使用 SideOutPut 时需要按照以下步骤进行 为每个分支流定义一个 SideOutPut。 为定义好的 SideOutPut发出数据。只有以下特定的函数才能通过Context上下文对象向旁路输出的SideOutPut发送数据。 ProcessFunction处理函数单流输入函数KeyedProcessFunction处理函数单流输入函数CoProcessFunction处理函数双流流输入函数KeyedCoProcessFunction处理函数双流流输入函数ProcessWindowFunction窗口函数全量计算函数ProcessAllWindowFunction窗口函数全量计算函数它与 ProcessWindowFunction 类似但是它会对窗口中的所有数据进行处理而不是仅处理触发窗口计算的数据。 例子中使用ProcessFunction实现流拆分。 根据SideOutPut 的ID标识获取旁路输出流进行数据继续处理。
拆分方式对比Split不支持链式拆分切分得到的流是不能进行再次切分的Fliter多分支流需要多次遍历原始流进行筛选。浪费集群的资源SideOut以多次进行拆分的支持链式拆分。