当前位置: 首页 > news >正文

深圳地区5g微波网站建设计划vps wordpress lnmp

深圳地区5g微波网站建设计划,vps wordpress lnmp,网站关键词优化步骤,诸城网络科技网站建设背景#xff1a; 广播状态可以用于规则表或者配置表的实时更新#xff0c;本文就是用一个欺诈检测的flink作业作为例子看一下BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用 BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用 1.首先看主流…背景 广播状态可以用于规则表或者配置表的实时更新本文就是用一个欺诈检测的flink作业作为例子看一下BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用 BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用 1.首先看主流程主流程中使用了两个Broadcast广播的状态这两个Broadcast广播的状态是独立的 // 这里面包含规则广播状态的两次使用方法,分别在DynamicKeyFunction处理函数和DynamicAlertFunction处理函数,注意这两个处理函数中的广播状态是独立的也就是需要分别维度不能共享// Processing pipeline setupDataStreamAlert alerts transactions.connect(rulesStream).process(new DynamicKeyFunction()).uid(DynamicKeyFunction).name(Dynamic Partitioning Function).keyBy((keyed) - keyed.getKey()).connect(rulesStream).process(new DynamicAlertFunction()).uid(DynamicAlertFunction).name(Dynamic Rule Evaluation Function);2.BroadcastProcessFunction的处理这里面会维护这个算子本身的广播状态并把所有的事件扩散发送到下一个算子 public class DynamicKeyFunctionextends BroadcastProcessFunctionTransaction, Rule, KeyedTransaction, String, Integer {Overridepublic void open(Configuration parameters) {}// 这里会把每个事件结合上广播状态中的每个规则生成N条记录流转到下一个算子Overridepublic void processElement(Transaction event, ReadOnlyContext ctx, CollectorKeyedTransaction, String, Integer out)throws Exception {ReadOnlyBroadcastStateInteger, Rule rulesState ctx.getBroadcastState(Descriptors.rulesDescriptor);forkEventForEachGroupingKey(event, rulesState, out);}// 独立维护广播状态,可以在广播状态中新增删除或者清空广播状态Overridepublic void processBroadcastElement(Rule rule, Context ctx, CollectorKeyedTransaction, String, Integer out) throws Exception {log.info({}, rule);BroadcastStateInteger, Rule broadcastState ctx.getBroadcastState(Descriptors.rulesDescriptor);handleRuleBroadcast(rule, broadcastState);if (rule.getRuleState() RuleState.CONTROL) {handleControlCommand(rule.getControlType(), broadcastState);}}}static void handleRuleBroadcast(Rule rule, BroadcastStateInteger, Rule broadcastState)throws Exception {switch (rule.getRuleState()) {case ACTIVE:case PAUSE:broadcastState.put(rule.getRuleId(), rule);break;case DELETE:broadcastState.remove(rule.getRuleId());break;}}3.KeyedBroadcastProcessFunction的处理,这里面也是会维护这个算子本身的广播状态此外还有键值分区状态特别注意的是在处理广播元素时可以用applyToKeyedState方法对所有的键值分区状态应用某个方法对于ontimer方法依然可以访问键值分区状态和广播状态 /** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* License); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an AS IS BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.ververica.field.dynamicrules.functions;import static com.ververica.field.dynamicrules.functions.ProcessingUtils.addToStateValuesSet; import static com.ververica.field.dynamicrules.functions.ProcessingUtils.handleRuleBroadcast;import com.ververica.field.dynamicrules.Alert; import com.ververica.field.dynamicrules.FieldsExtractor; import com.ververica.field.dynamicrules.Keyed; import com.ververica.field.dynamicrules.Rule; import com.ververica.field.dynamicrules.Rule.ControlType; import com.ververica.field.dynamicrules.Rule.RuleState; import com.ververica.field.dynamicrules.RuleHelper; import com.ververica.field.dynamicrules.RulesEvaluator.Descriptors; import com.ververica.field.dynamicrules.Transaction; import java.math.BigDecimal; import java.util.*; import java.util.Map.Entry; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.accumulators.SimpleAccumulator; import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.MeterView; import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; import org.apache.flink.util.Collector;/** Implements main rule evaluation and alerting logic. */ Slf4j public class DynamicAlertFunctionextends KeyedBroadcastProcessFunctionString, KeyedTransaction, String, Integer, Rule, Alert {private static final String COUNT COUNT_FLINK;private static final String COUNT_WITH_RESET COUNT_WITH_RESET_FLINK;private static int WIDEST_RULE_KEY Integer.MIN_VALUE;private static int CLEAR_STATE_COMMAND_KEY Integer.MIN_VALUE 1;private transient MapStateLong, SetTransaction windowState;private Meter alertMeter;private MapStateDescriptorLong, SetTransaction windowStateDescriptor new MapStateDescriptor(windowState,BasicTypeInfo.LONG_TYPE_INFO,TypeInformation.of(new TypeHintSetTransaction() {}));Overridepublic void open(Configuration parameters) {windowState getRuntimeContext().getMapState(windowStateDescriptor);alertMeter new MeterView(60);getRuntimeContext().getMetricGroup().meter(alertsPerSecond, alertMeter);}// 键值分区状态和广播状态联合处理在这个方法中可以更新键值分区状态然后广播状态只能读取Overridepublic void processElement(KeyedTransaction, String, Integer value, ReadOnlyContext ctx, CollectorAlert out)throws Exception {long currentEventTime value.getWrapped().getEventTime();addToStateValuesSet(windowState, currentEventTime, value.getWrapped());long ingestionTime value.getWrapped().getIngestionTimestamp();ctx.output(Descriptors.latencySinkTag, System.currentTimeMillis() - ingestionTime);Rule rule ctx.getBroadcastState(Descriptors.rulesDescriptor).get(value.getId());if (noRuleAvailable(rule)) {log.error(Rule with ID {} does not exist, value.getId());return;}if (rule.getRuleState() Rule.RuleState.ACTIVE) {Long windowStartForEvent rule.getWindowStartFor(currentEventTime);long cleanupTime (currentEventTime / 1000) * 1000;ctx.timerService().registerEventTimeTimer(cleanupTime);SimpleAccumulatorBigDecimal aggregator RuleHelper.getAggregator(rule);for (Long stateEventTime : windowState.keys()) {if (isStateValueInWindow(stateEventTime, windowStartForEvent, currentEventTime)) {aggregateValuesInState(stateEventTime, aggregator, rule);}}BigDecimal aggregateResult aggregator.getLocalValue();boolean ruleResult rule.apply(aggregateResult);ctx.output(Descriptors.demoSinkTag,Rule rule.getRuleId() | value.getKey() : aggregateResult.toString() - ruleResult);if (ruleResult) {if (COUNT_WITH_RESET.equals(rule.getAggregateFieldName())) {evictAllStateElements();}alertMeter.markEvent();out.collect(new Alert(rule.getRuleId(), rule, value.getKey(), value.getWrapped(), aggregateResult));}}}//维护广播状态新增/删除或者整个清空,值得注意的是处理广播元素时可以对所有的键值分区状态应用某个函数比如这里当收到某个属于控制消息的广播消息时使用applyToKeyedState方法把所有的键值分区状态都清空Overridepublic void processBroadcastElement(Rule rule, Context ctx, CollectorAlert out)throws Exception {log.info({}, rule);BroadcastStateInteger, Rule broadcastState ctx.getBroadcastState(Descriptors.rulesDescriptor);handleRuleBroadcast(rule, broadcastState);updateWidestWindowRule(rule, broadcastState);if (rule.getRuleState() RuleState.CONTROL) {handleControlCommand(rule, broadcastState, ctx);}}private void handleControlCommand(Rule command, BroadcastStateInteger, Rule rulesState, Context ctx) throws Exception {ControlType controlType command.getControlType();switch (controlType) {case EXPORT_RULES_CURRENT:for (Map.EntryInteger, Rule entry : rulesState.entries()) {ctx.output(Descriptors.currentRulesSinkTag, entry.getValue());}break;case CLEAR_STATE_ALL:ctx.applyToKeyedState(windowStateDescriptor, (key, state) - state.clear());break;case CLEAR_STATE_ALL_STOP:rulesState.remove(CLEAR_STATE_COMMAND_KEY);break;case DELETE_RULES_ALL:IteratorEntryInteger, Rule entriesIterator rulesState.iterator();while (entriesIterator.hasNext()) {EntryInteger, Rule ruleEntry entriesIterator.next();rulesState.remove(ruleEntry.getKey());log.info(Removed Rule {}, ruleEntry.getValue());}break;}}private boolean isStateValueInWindow(Long stateEventTime, Long windowStartForEvent, long currentEventTime) {return stateEventTime windowStartForEvent stateEventTime currentEventTime;}private void aggregateValuesInState(Long stateEventTime, SimpleAccumulatorBigDecimal aggregator, Rule rule) throws Exception {SetTransaction inWindow windowState.get(stateEventTime);if (COUNT.equals(rule.getAggregateFieldName())|| COUNT_WITH_RESET.equals(rule.getAggregateFieldName())) {for (Transaction event : inWindow) {aggregator.add(BigDecimal.ONE);}} else {for (Transaction event : inWindow) {BigDecimal aggregatedValue FieldsExtractor.getBigDecimalByName(rule.getAggregateFieldName(), event);aggregator.add(aggregatedValue);}}}private boolean noRuleAvailable(Rule rule) {// This could happen if the BroadcastState in this CoProcessFunction was updated after it was// updated and used in DynamicKeyFunctionif (rule null) {return true;}return false;}private void updateWidestWindowRule(Rule rule, BroadcastStateInteger, Rule broadcastState)throws Exception {Rule widestWindowRule broadcastState.get(WIDEST_RULE_KEY);if (rule.getRuleState() ! Rule.RuleState.ACTIVE) {return;}if (widestWindowRule null) {broadcastState.put(WIDEST_RULE_KEY, rule);return;}if (widestWindowRule.getWindowMillis() rule.getWindowMillis()) {broadcastState.put(WIDEST_RULE_KEY, rule);}}// ontimer方法中可以访问/更新键值分区状态读取广播状态此外ontimer方法和processElement方法以及processBroadcastElement方法是同步的不需要考虑并发访问的问题Overridepublic void onTimer(final long timestamp, final OnTimerContext ctx, final CollectorAlert out)throws Exception {Rule widestWindowRule ctx.getBroadcastState(Descriptors.rulesDescriptor).get(WIDEST_RULE_KEY);OptionalLong cleanupEventTimeWindow Optional.ofNullable(widestWindowRule).map(Rule::getWindowMillis);OptionalLong cleanupEventTimeThreshold cleanupEventTimeWindow.map(window - timestamp - window);cleanupEventTimeThreshold.ifPresent(this::evictAgedElementsFromWindow);}private void evictAgedElementsFromWindow(Long threshold) {try {IteratorLong keys windowState.keys().iterator();while (keys.hasNext()) {Long stateEventTime keys.next();if (stateEventTime threshold) {keys.remove();}}} catch (Exception ex) {throw new RuntimeException(ex);}}private void evictAllStateElements() {try {IteratorLong keys windowState.keys().iterator();while (keys.hasNext()) {keys.next();keys.remove();}} catch (Exception ex) {throw new RuntimeException(ex);}} } ps: ontimer方法和processElement方法是同步访问的没有并发的问题所以不需要考虑同时更新键值分区状态的线程安全问题 参考文献 https://flink.apache.org/2020/01/15/advanced-flink-application-patterns-vol.1-case-study-of-a-fraud-detection-system/
http://www.pierceye.com/news/925892/

相关文章:

  • 一站式网站建设与运营wordpress后台代码修改
  • 企业品牌类网站有哪些做网站建设的公司是什么类型
  • 自己制作的网站怎么做分页2022建站市场
  • 网贷审核网站怎么做wordpress 文章列表页
  • 搬家网站建设公司西安是哪个省市
  • php 网站 整合 数据库智能建站系统个人网站
  • 福田区罗湖区宝安区龙华区seo上首页排名
  • 网站建设业务员提成企业网站 需求
  • 做淘宝客网站 首选霍常亮国外网页设计
  • 天津小型企业网站设计方案网页升级访问每天自动更新 下载
  • 好的学习网站打广告壹搜网站建设优化排名
  • 响应式设计 手机网站手机自己制作app软件
  • 东方头条网站源码杭州正晖建设工程有限公司网站
  • 阿里巴巴网站建设与维护深圳民治网站建设
  • 郑州短视频代运营seo外链是什么
  • 网站建设公司 经营资质wordpress文学
  • 手机网站建设请示常州建设网站公司哪家好
  • 网站开发报价ppt重庆沙坪坝有哪些大学
  • 牛商网做的包装盒网站怎么在门户网站上发布
  • 北京网络公司建站成品app直播源码下载
  • 帮忙建站的公司百度收录好的网站排名
  • 芯火信息做网站怎么样郑州网站建设老牌公司
  • 龙华营销型网站建设在线生成短链接网址
  • 深圳做公司网站关键词规划师工具
  • 长春市建设信息网站sem代运营推广公司
  • 宜昌网站建设平台有经验的盐城网站开发
  • wordpress 众筹网站模板wordpress首页只显示一篇文章
  • 嘉兴seo网站推广网页设计与制作课程结构
  • 江苏 网站 备案百度站长之家工具
  • 新加坡 网站建设专业简历制作网站有哪些