六安做网站的公司,wordpress教程 好看,做料理网站关键词怎么设置,合肥建设银行官网招聘网站Flink状态管理 状态概述状态分类 键控、按键分区状态概述值状态 ValueState列表状态 ListStateMap状态 MapState归约状态 ReducingState聚合状态 Aggregating State 算子状态概述列表状态 ListState联合列表状态 UnionListState广播状态 Broadcast State 状态有效期 (TTL)概述S… Flink状态管理 状态概述状态分类 键控、按键分区状态概述值状态 ValueState列表状态 ListStateMap状态 MapState归约状态 ReducingState聚合状态 Aggregating State 算子状态概述列表状态 ListState联合列表状态 UnionListState广播状态 Broadcast State 状态有效期 (TTL)概述StateTtlConfig 配置对象参数说明清理使用示例 状态后端 State Backend概述可用状态后端状态后端的配置 状态
概述 在流处理任务中数据会以连续的流的形式输入到Flink中而状态计算允许我们跟踪和处理这些输入数据的状态信息。状态可以是任何需要记录和使用的数据例如聚合计数、累积结果、窗口中的中间状态等。 Flink中的状态管理是指在流处理任务中对数据的状态进行有效管理和维护的过程。状态管理是非常重要的因为它允许我们在流式处理中维护和操作数据的状态信息以实现复杂的计算逻辑和应用需求。 状态分类 在Flink中Flink状态有两种系统状态Managed State和原始状态Raw State。通常使用系统状态而原始状态则需要自定义实现。 系统状态根据数据集是否按照某一个Key进行分区将状态分为算子状态Operator State和按键分区状态Keyed State。 1.系统状态 由Flink管理的全局状态可以在整个应用程序中共享。系统状态与算子或键无关可以被整个应用程序中的所有算子访问和更新。 2.原始状态 原始状态是一种低级别的状态表示形式它提供了一种灵活的方式来定义和管理状态。它允许开发人员自定义状态的存储和访问方式以满足特定的需求。 3.算子状态 用于在算子之间维护中间结果、聚合状态等。它与具体的算子实例绑定与其他算子实例的状态相互独立。算子状态是分布式的可以在故障恢复时进行检查点和状态恢复。 一个算子任务会按照并行度分为多个并行子任务执行而不同的子任务会占据不同的任务槽task slot。由于不同的slot在计算资源上是物理隔离的所以Flink能管理的状态在并行任务间是无法共享的每个状态只能针对当前子任务的实例有效。 4.按键分区状态 与流的键相关联的状态用于存储和管理与每个键相关的数据信息。按键分区状态能在Keyed Stream或Keyed ProcessFunction中使用。它会根据键将数据进行分区保证相同键的数据会被同一个状态管理。 很多有状态的操作如聚合、窗口都是要先做keyBy进行按键分区之后任务所进行的所有计算都应该只针对当前key有效所以状态也应该按照key彼此隔离。 键控、按键分区状态
概述 按键分区状态Keyed State是任务按照键key来访问和维护的状态。它就是以key为作用范围进行隔离。 注意 使用按键分区状态必须基于Keyed Stream。没有进行keyBy分区的Data Stream即使转换算子实现了对应的富函数类也不能通过运行时上下文访问按键分区状态。 Keyed State在Flink中分为不同类型具体支持的状态类型如下所示
ValueStateT:存储和访问单个值的状态通常是一个单一的状态值。它可以用于存储中间结果、累加器等ListStateT:存储和访问元素列表的状态通常用于按键分区的列表操作MapStateUK,UV:存储和访问键值对的状态通常用于需要以键-值对形式存储和检索数据的情况AggregatingStateIN,OUT:使用用户定义的聚合函数来逐个聚合元素的状态通常用于对数据进行聚合操作如计算平均值ReducingStateT:使用用户定义的reduce函数来逐个聚合元素的状态通常用于聚合操作如求和值状态 ValueState 值状态ValueState是Flink中的一种状态类型用于存储和访问单个值。它可以用于在状态中保存和维护一个单一的值。 值状态通常用于在状态中存储一些需要随时间更新的值例如计数器、累加器、最大/最小值等。 接口如下
// T是泛型表示状态的数据内容可以是任何具体的数据类型
public interface ValueStateT extends State {// 获取当前状态的值T value() throws IOException;// 对状态进行更新传入的参数value就是要覆写的状态值void update(T var1) throws IOException;
}创建一个状态描述器StateDescriptor来提供状态的基本信息状态描述器构造方法如下
public class ValueStateDescriptorT extends StateDescriptorValueStateT, T {// 需要传入状态的名称和类型public ValueStateDescriptor(String name, TypeInformationT typeInfo) {super(name, typeInfo, (Object)null);}
}当前输入数据与上一条数据差值比较 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperatorTuple2String, Integer streamSource env.socketTextStream(IP, 8086)// 将输入数据转换为Tuple2.map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String value) throws Exception {String[] split value.split(,);return Tuple2.of(split[0], Integer.valueOf(split[1]));}})// 指定 watermark策略.assignTimestampsAndWatermarks(// 定义Watermark策略WatermarkStrategy.Tuple2String, IntegerforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) - value.f1 * 1000L));// keyBy分区KeyedStreamTuple2String, Integer, String keyByStream streamSource.keyBy(a - a.f0);keyByStream.process(new MyKeyedProcessFunction()).print();env.execute();}public static class MyKeyedProcessFunction extends KeyedProcessFunctionString, Tuple2String, Integer, String {// 定义状态ValueStateInteger lastState;/*** 在open方法中初始化状态** param configuration* throws Exception*/Overridepublic void open(Configuration configuration) throws Exception {super.open(configuration);// 创建一个状态描述器StateDescriptor来提供状态的基本信息 传入状态的名称和存储类型lastState getRuntimeContext().getState(new ValueStateDescriptorInteger(lastState, Types.INT));}Overridepublic void processElement(Tuple2String, Integer value, Context ctx, CollectorString out) throws Exception {// 取出上一条数据的水位值,注意Integer默认值是nullint lastValue lastState.value() null ? 0 : lastState.value();// 求水位线差值的绝对值5的数据Integer currentValue value.f1;if (Math.abs(currentValue - lastValue) 5) {out.collect(窗口 value.f0 数据 value 当前值 currentValue 上一条数据值 lastValue 差值5);}// 更新状态里的水位值lastState.update(currentValue);}}输入测试数据
nc -lk 8086
key1,5
key1,7
key1,13
key1,20
key1,10控制台输出结果
窗口key1 数据(key1,13) 当前值13 上一条数据值7 差值5
窗口key1 数据(key1,20) 当前值20 上一条数据值13 差值5
窗口key1 数据(key1,10) 当前值10 上一条数据值20 差值5列表状态 ListState 列表状态ListState是Flink中的一种状态类型用于存储和访问元素列表。它可以用于在状态中保存和维护一组元素并对列表中的元素进行添加、删除和访问操作。 列表状态通常用于需要在状态中保存多个元素的场景例如累积计算、聚合操作或缓冲区管理等。 在ListState接口中同样有一个类型参数T表示列表中数据的类型。
public interface ListStateT extends MergingStateT, IterableT {void update(ListT var1) throws Exception;void addAll(ListT var1) throws Exception;
}ListState也提供了一系列的方法来操作状态使用方式与一般的List非常相似。
IterableT get()获取当前的列表状态返回的是一个可迭代类型IterableTupdate(ListT values)传入一个列表values直接对状态进行覆盖add(T value)在状态列表中添加一个元素valueaddAll(ListT values)向列表中添加多个元素以列表values形式传入void clear() 清空List状态 本组数据ListState的状态描述器就叫作ListStateDescriptor用法跟ValueStateDescriptor完全一致。
定义一个描述列表状态的描述符。描述符指定状态的名称和类型状态描述器构造方法如下
public class ListStateDescriptorT extends StateDescriptorListStateT, ListT {public ListStateDescriptor(String name, TypeInformationT elementTypeInfo) {super(name, new ListTypeInfo(elementTypeInfo), (Object)null);}
} 取流中3个最大值且排序
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperatorTuple2String, Integer streamSource env.socketTextStream(IP, 8086)// 将输入数据转换为Tuple2.map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String value) throws Exception {String[] split value.split(,);return Tuple2.of(split[0], Integer.valueOf(split[1]));}})// 指定 watermark策略.assignTimestampsAndWatermarks(// 定义Watermark策略WatermarkStrategy.Tuple2String, IntegerforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) - value.f1 * 1000L));// keyBy分区KeyedStreamTuple2String, Integer, String keyByStream streamSource.keyBy(a - a.f0);keyByStream.process(new MyKeyedProcessFunction()).print();env.execute();}public static class MyKeyedProcessFunction extends KeyedProcessFunctionString, Tuple2String, Integer, String {// 定义状态ListStateInteger listState;/*** 在open方法中初始化状态** param configuration* throws Exception*/Overridepublic void open(Configuration configuration) throws Exception {super.open(configuration);// 创建一个状态描述器StateDescriptor来提供状态的基本信息 传入状态的名称和存储类型listState getRuntimeContext().getListState(new ListStateDescriptorInteger(listState, Types.INT));}Overridepublic void processElement(Tuple2String, Integer value, Context ctx, CollectorString out) throws Exception {// 来一条数据则存list状态里listState.add(value.f1);// 从list状态拿出来,得到一个IterableIterableInteger iterableList listState.get();// 拷贝到List中ListInteger list new ArrayList();for (Integer val : iterableList) {list.add(val);}// 对List进行降序排序list.sort((o1, o2) - o2 - o1);// list中的个数是连续变大的一但超过3个就立即清理if (list.size() 3) {// 元素清除清除第4个list.remove(3);}out.collect(keyBy value.f0 当前数据 value 最大3个水位值 list.toString());// 更新list状态listState.update(list);}}key1,1
key1,5
key1,7
key1,8
key1,9keyBykey1 当前数据(key1,1) 最大3个水位值[1]
keyBykey1 当前数据(key1,5) 最大3个水位值[5, 1]
keyBykey1 当前数据(key1,7) 最大3个水位值[7, 5, 1]
keyBykey1 当前数据(key1,8) 最大3个水位值[8, 7, 5]
keyBykey1 当前数据(key1,9) 最大3个水位值[9, 8, 7]Map状态 MapState Map 状态MapState是 Flink 中的一种状态类型用于存储和访问键值对。它可以用于在状态中保存和维护一组键值对。 Map 状态通常用于需要根据键进行查找和更新的场景例如缓存、索引、关联操作等。对应的是MapStateUK, UV接口有UK、UV两个泛型分别表示保存的key和value的类型。 MapState提供了操作映射状态的方法与Map的使用非常类似。另外MapState也提供了获取整个映射相关信息的方法
public interface MapStateUK, UV extends State {// 传入一个key作为参数查询对应的value值UV get(UK var1) throws Exception;// 传入一个键值对更新key对应的value值void put(UK var1, UV var2) throws Exception;// 将传入的映射map中所有的键值对全部添加到映射状态中void putAll(MapUK, UV var1) throws Exception;// 将指定key对应的键值对删除void remove(UK var1) throws Exception;// 判断是否存在指定的key返回一个boolean值boolean contains(UK var1) throws Exception;// 获取映射状态中所有的键值对IterableMap.EntryUK, UV entries() throws Exception;// 获取映射状态中所有的键key返回一个可迭代Iterable类型IterableUK keys() throws Exception;// 获取映射状态中所有的值value返回一个可迭代Iterable类型IterableUV values() throws Exception;// 获取迭代器IteratorMap.EntryUK, UV iterator() throws Exception;// 判断映射是否为空返回一个boolean值boolean isEmpty() throws Exception;
}模拟统计 数字 出现频率计数 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperatorTuple2String, Integer streamSource env.socketTextStream(IP, 8086)// 将输入数据转换为Tuple2.map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String value) throws Exception {String[] split value.split(,);return Tuple2.of(split[0], Integer.valueOf(split[1]));}})// 指定 watermark策略.assignTimestampsAndWatermarks(// 定义Watermark策略WatermarkStrategy.Tuple2String, IntegerforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) - value.f1 * 1000L));// keyBy分区KeyedStreamTuple2String, Integer, String keyByStream streamSource.keyBy(a - a.f0);keyByStream.process(new MyKeyedProcessFunction()).print();env.execute();}public static class MyKeyedProcessFunction extends KeyedProcessFunctionString, Tuple2String, Integer, String {// 定义状态MapStateInteger, Integer mapState;/*** 在open方法中初始化状态** param configuration* throws Exception*/Overridepublic void open(Configuration configuration) throws Exception {super.open(configuration);// 创建一个状态描述器StateDescriptor来提供状态的基本信息 传入状态的名称和存储类型mapState getRuntimeContext().getMapState(new MapStateDescriptorInteger, Integer(mapState, Types.INT, Types.INT));}/*** 模拟统计 数字 出现频率计数*/Overridepublic void processElement(Tuple2String, Integer value, Context ctx, CollectorString out) throws Exception {// 判断是否存在对应的keyInteger number value.f1;if (mapState.contains(number)) {// 包含key直接对value1Integer count mapState.get(number);mapState.put(number, count);} else {// 不包含key初始化mapState.put(number, 1);}out.collect(keyBy value.f0 数字 number 出现次数 mapState.get(number));}}nc -lk 8086
key1,1
key1,1
key1,2
key1,3
key1,2
key1,1keyBykey1 数字1 出现次数1
keyBykey1 数字1 出现次数2
keyBykey1 数字2 出现次数1
keyBykey1 数字3 出现次数1
keyBykey1 数字2 出现次数2
keyBykey1 数字1 出现次数3归约状态 ReducingState 归约状态Reducing State是 Flink 中一种特殊类型的状态用于对输入流进行归约操作。归约操作将输入流中的元素逐个进行聚合生成一个汇总的结果值。不同于普通的 Map、List 或 Value 状态归约状态可以在接收到新的元素时对当前的状态值进行相应的归约操作。 归约状态ReducingState类似于值状态不过需要对添加进来的所有数据进行归约将归约聚合之后的值作为状态保存下来。 使用接口ReducingState调用的方法类似于ListState只不过它保存的只是一个聚合值调用.add()方法时不是在状态列表里添加元素而是直接把新数据和之前的状态进行归约并用得到的结果更新状态。
// 对Reducing状态获取结果
OUT get() throws Exception;
// 对Reducing状态添加数据
void add(IN var1) throws Exception;
// 对Reducing状态清空数据
void clear();创建一个状态描述器StateDescriptor来提供状态的基本信息状态描述器构造方法如下
public class ReducingStateDescriptorT extends StateDescriptorReducingStateT, T {private final ReduceFunctionT reduceFunction;public ReducingStateDescriptor(String name, ReduceFunctionT reduceFunction, TypeInformationT typeInfo) {super(name, typeInfo, (Object)null);this.reduceFunction (ReduceFunction)Preconditions.checkNotNull(reduceFunction);}
}使用归约状态来计算输入流中的累计和 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperatorTuple2String, Integer streamSource env.socketTextStream(IP, 8086)// 将输入数据转换为Tuple2.map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String value) throws Exception {String[] split value.split(,);return Tuple2.of(split[0], Integer.valueOf(split[1]));}})// 指定 watermark策略.assignTimestampsAndWatermarks(// 定义Watermark策略WatermarkStrategy.Tuple2String, IntegerforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) - value.f1 * 1000L));// keyBy分区KeyedStreamTuple2String, Integer, String keyByStream streamSource.keyBy(a - a.f0);keyByStream.process(new MyKeyedProcessFunction()).print();env.execute();}public static class MyKeyedProcessFunction extends KeyedProcessFunctionString, Tuple2String, Integer, String {// 定义状态ReducingStateInteger reducingState;/*** 在open方法中初始化状态** param configuration* throws Exception*/Overridepublic void open(Configuration configuration) throws Exception {super.open(configuration);// 创建一个状态描述器StateDescriptor来提供状态的基本信息 传入状态的名称、reduceFunction、存储类型reducingState getRuntimeContext().getReducingState(new ReducingStateDescriptorInteger(reducingState, new ReduceFunctionInteger() {Overridepublic Integer reduce(Integer value1, Integer value2) throws Exception {return value1 value2;}}, Types.INT));}Overridepublic void processElement(Tuple2String, Integer value, Context ctx, CollectorString out) throws Exception {// 来一条数据则添加到reducing状态里reducingState.add(value.f1);// 对本组的Reducing状态获取结果Integer sum reducingState.get();out.collect(keyBy value.f0 当前数据 value 水位值合计 sum);}}key1,1
key1,2
key1,3keyBykey1 当前数据(key1,1) 水位值合计1
keyBykey1 当前数据(key1,2) 水位值合计3
keyBykey1 当前数据(key1,3) 水位值合计6聚合状态 Aggregating State 聚合状态是Flink 中一种特殊类型的状态用于对输入流进行聚合操作。聚合操作将输入流中的元素逐个进行聚合并生成一个汇总的结果值。与归约状态不同聚合状态可以在接收到新的元素时根据自定义的聚合逻辑对当前的状态值进行增量聚合。 AggregatingState接口相关方法
// 对Reducing状态获取结果
OUT get() throws Exception;
// 对Reducing状态添加数据
void add(IN var1) throws Exception;
// 对Reducing状态清空数据
void clear();与归约状态不同的是它的聚合逻辑是由在描述器中传入一个更加一般化的聚合函数AggregateFunction来定义的 。里面通过一个累加器Accumulator来表示状态所以聚合的状态类型可以跟添加进来的数据类型完全不同使用更加灵活。
public class AggregatingStateDescriptorIN, ACC, OUT extends StateDescriptorAggregatingStateIN, OUT, ACC {private final AggregateFunctionIN, ACC, OUT aggFunction;public AggregatingStateDescriptor(String name, AggregateFunctionIN, ACC, OUT aggFunction, TypeInformationACC stateType) {super(name, stateType, (Object)null);this.aggFunction (AggregateFunction)Preconditions.checkNotNull(aggFunction);}
}模拟统计 数字 出现频率计数 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperatorTuple2String, Integer streamSource env.socketTextStream(IP, 8086)// 将输入数据转换为Tuple2.map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String value) throws Exception {String[] split value.split(,);return Tuple2.of(split[0], Integer.valueOf(split[1]));}})// 指定 watermark策略.assignTimestampsAndWatermarks(// 定义Watermark策略WatermarkStrategy.Tuple2String, IntegerforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) - value.f1 * 1000L));// keyBy分区KeyedStreamTuple2String, Integer, String keyByStream streamSource.keyBy(a - a.f0);keyByStream.process(new MyKeyedProcessFunction()).print();env.execute();}/*** param1 键的类型* param2 输入类型* param3 输出元素的类型*/public static class MyKeyedProcessFunction extends KeyedProcessFunctionString, Tuple2String, Integer, String {// 定义状态AggregatingStateInteger, HashMapInteger, Integer aggregatingState;/*** 在open方法中初始化状态** param configuration* throws Exception*/Overridepublic void open(Configuration configuration) throws Exception {super.open(configuration);// 创建一个状态描述器StateDescriptor来提供状态的基本信息 传入状态的名称、AggregateFunction、累加器类型aggregatingState getRuntimeContext().getAggregatingState(new AggregatingStateDescriptorInteger, HashMapInteger, Integer, HashMapInteger, Integer(aggregatingState,new MyAggregateFunction(),TypeInformation.of(new TypeHintHashMapInteger, Integer() {})));}Overridepublic void processElement(Tuple2String, Integer value, Context ctx, CollectorString out) throws Exception {// 将水位值添加到聚合状态中aggregatingState.add(value.f1);// 从聚合状态中获取结果HashMapInteger, Integer res aggregatingState.get();out.collect(keyBy value.f0 数字 value.f1 出现次数 res.get(value.f1));}}/*** param1 聚合的值的类型 (输入值)* param2 累加器的类型 (中间聚合状态)* param3 聚合结果的类型*/public static class MyAggregateFunction implements AggregateFunctionInteger, HashMapInteger, Integer, HashMapInteger, Integer {// 创建累加器类型HashMapInteger, IntegerOverridepublic HashMapInteger, Integer createAccumulator() {HashMapInteger, Integer map new HashMap();return map;}Overridepublic HashMapInteger, Integer add(Integer value, HashMapInteger, Integer accumulator) {if (accumulator.containsKey(value)) {Integer sum accumulator.get(value) 1;accumulator.put(value, sum);} else {accumulator.put(value, 1);}return accumulator;}Overridepublic HashMapInteger, Integer getResult(HashMapInteger, Integer accumulator) {return accumulator;}Overridepublic HashMapInteger, Integer merge(HashMapInteger, Integer a, HashMapInteger, Integer b) {return null;}}key1,1
key1,2
key1,3
key1,2
key1,3
key1,2keyBykey1 数字1 出现次数1
keyBykey1 数字2 出现次数1
keyBykey1 数字3 出现次数1
keyBykey1 数字2 出现次数2
keyBykey1 数字3 出现次数2
keyBykey1 数字2 出现次数3算子状态
概述 算子状态Operator State是 Flink 中一种用于保存和管理算子Operator状态的机制。算子状态通常用于在算子之间保持一些中间结果或者用于保存全局信息。 算子状态是一个算子并行实例上定义的状态作用范围被限定为当前算子任务。算子状态跟数据的key无关所以不同key的数据只要被分发到同一个并行子任务就会访问到同一个算子状态。 算子状态一般用在Source或Sink等与外部系统连接的算子上或者完全没有key定义的场景。 当算子的并行度发生变化时算子状态也支持在并行的算子任务实例之间做重组分配。根据状态的类型不同重组分配的方案也会不同。 在Flink中算子任务可以分为无状态和有状态两种情况。
无状态算子 无状态的算子任务只需要观察每个独立事件根据当前输入的数据直接转换输出结果。例如基本转换算子map、filter、flatMap等计算时不依赖其他数据就都属于无状态的算子。 有状态算子 有状态的算子任务除当前数据之外还需要一些其他数据来得到计算结果。其他数据就是所谓的状态。例如聚合算子、窗口算子都属于有状态的算子。 算子状态有以下几个特点
算子状态是与算子实例绑定的每个算子实例都会维护自己的状态。这意味着在并行计算中每个并行实例都会有独立的状态算子状态可以是一种类型也可以是多种类型的组合。常见的算子状态类型包括 ValueState、ListState、MapState 等算子状态可以在算子实例之间进行快速的备份和恢复以保证程序的容错性算子状态可以存储在内存中也可以通过配置选择将其存储在外部持久化存储系统中如 RocksDB算子状态也支持不同的结构类型主要有三种ListState、UnionListState和BroadcastState。
列表状态 ListState 在算子状态的上下文中不会按键分别处理状态每一个并行子任务上会保留一个列表 当算子并行度进行缩放调整时算子的列表状态中的所有元素项会被统一收集起来相当于把多个分区的列表合并成了一个大列表然后再均匀地分配给所有并行任务。 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 从socket接收数据流DataStreamSourceString source env.socketTextStream(IP, 8086);source.map(new MyMapFunction()).print();env.execute();}// 实现CheckpointedFunction接口public static class MyMapFunction implements MapFunctionString, Integer, CheckpointedFunction {// 本地变量private Integer count 0;// 定义状态private ListStateInteger state;Overridepublic Integer map(String value) throws Exception {return count;}/*** 本地变量持久化将本地变量拷贝到算子状态中,开启checkpoint时才会调用*/Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {System.out.println(snapshotState...);// 清空算子状态state.clear();// 将 本地变量 添加到 算子状态 中state.add(count);}/*** 初始化本地变量程序启动和恢复时 从状态中把数据添加到本地变量每个子任务调用一次*/Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {System.out.println(initializeState...);// 从上下文初始化 子状态state context.getOperatorStateStore().getListState(new ListStateDescriptorInteger(state, Types.INT));// 从算子状态中把数据拷贝到本地变量if (context.isRestored()) {for (Integer val : state.get()) {count val;}}}}初始化本地变量方法与并行度设置有关
initializeState...
initializeState...输入测试数据
1
2
3
41 1
2 1
1 2
2 2联合列表状态 UnionListState 它与常规列表状态的区别在于算子并行度进行缩放调整时对于状态的分配方式不同。 在并行度调整时常规列表状态是轮询分配状态项而联合列表状态的算子则会直接广播状态的完整列表。 并行度缩放之后的并行子任务就获取到了联合后完整的大列表可以自行选择要使用的状态项和要丢弃的状态项。 /*** 初始化本地变量程序启动和恢复时 从状态中把数据添加到本地变量每个子任务调用一次*/Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {System.out.println(initializeState...);// 从上下文初始化 子状态state context.getOperatorStateStore().getUnionListState(new ListStateDescriptorInteger(union-state, Types.INT));// 从算子状态中把数据拷贝到本地变量if (context.isRestored()) {for (Integer val : state.get()) {count val;}}}广播状态 Broadcast State 广播状态是 Flink 中一种特殊的算子状态类型可用于在流处理任务中将数据广播到所有并行任务中共享和访问。它适用于将少量的全局信息广播到算子的每个实例以便进行更灵活的计算。 因为广播状态在每个并行子任务上的实例都一样所以在并行度调整的时候就比较简单只要复制一份到新的并行任务就可以实现扩展。而对于并行度缩小的情况可以将多余的并行子任务连同状态直接删除因为状态都是复制出来的并不会丢失 广播状态具有以下特点
广播状态只需要占用少量的内存因为它通常用于存储比较小的全局数据或配置信息广播状态在整个任务中共享使得每个算子实例都可以访问广播状态中的数据而无需进行网络通信广播状态在任务开始时被广播并分发到每个算子实例保持数据的一致性更改广播状态示例 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 从socket接收数据流SingleOutputStreamOperatorTuple2String, Integer sourceMap env.socketTextStream(IP, 8086)// 将输入数据转换为Tuple2.map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String value) throws Exception {String[] split value.split(,);return Tuple2.of(split[0], Integer.valueOf(split[1]));}});// 广播配置流DataStreamSourceString dataStreamSource env.socketTextStream(IP, 8087);// 使用给定的名称和给定的类型信息新建一个MapStateDescriptorMapStateDescriptorString, Integer broadcastMapState new MapStateDescriptor(broadcast-state, Types.STRING, Types.INT);// 得到广播流BroadcastStreamString broadcastStream dataStreamSource.broadcast(broadcastMapState);// 数据流和广播配置流进行关联BroadcastConnectedStreamTuple2String, Integer, String broadcastConnectedStream sourceMap.connect(broadcastStream);// 调用 processbroadcastConnectedStream.process(new BroadcastProcessFunctionTuple2String, Integer, String, String() {/*** 数据流的处理* 数据流只能读取广播状态不能修改* param value 非广播侧的输入类型* param ctx 广播端的输入类型* param out 运算符的输出类型*/Overridepublic void processElement(Tuple2String, Integer value, ReadOnlyContext ctx, CollectorString out) throws Exception {// 通过上下文获取广播状态取出值ReadOnlyBroadcastStateString, Integer broadcastState ctx.getBroadcastState(broadcastMapState);Integer configValue broadcastState.get(myConfig);// 注意刚启动时可能是数据流的第一条数据先来configValue (configValue null ? 0 : configValue);if (value.f1 configValue) {out.collect(输入数字 value.f1 广播状态值 configValue);} else {out.collect(输入数字 value.f1 广播状态值 configValue);}}/*** 广播后配置流的处理* 只有广播流才能修改广播状态*/Overridepublic void processBroadcastElement(String value, Context ctx, CollectorString out) throws Exception {// 通过上下文获取广播状态往里面写数据BroadcastStateString, Integer broadcastState ctx.getBroadcastState(broadcastMapState);broadcastState.put(myConfig, Integer.valueOf(value));}}).print();env.execute();}输入测试数据
nc -lk 8086
key1,1
key1,2
key1,3输出
1 输入数字1 广播状态值0
2 输入数字2 广播状态值0
1 输入数字3 广播状态值0更改广播状态
nc -lk 8087
5输入测试数据
nc -lk 8086
key1,6
key1,8输出
2 输入数字6 广播状态值5
1 输入数字8 广播状态值5状态有效期 (TTL)
概述 状态效期、生存时间State TTLTime-to-Live是 Flink 中的一个功能用于为状态设置过期时间。通过设置状态生存时间可以自动清理过期的状态数据避免无限增长的状态。 任何类型的keyed state都可以有 有效期 (TTL)。如果配置了TTL且状态值已过期则会尽最大可能清除对应的值 所有状态类型都支持单元素的TTL。 这意味着列表元素和映射元素将独立到期。 StateTtlConfig 配置对象 配置状态的TTL时需要创建一个StateTtlConfig配置对象然后调用状态描述器的enableTimeToLive()方法启动TTL功能。 创建一个StateTtlConfig配置对象 StateTtlConfig stateTtlConfig StateTtlConfig.newBuilder(Time.seconds(10)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();启动TTL功能 ValueStateDescriptorString valueStateDescriptor new ValueStateDescriptor(MyState, String.class);valueStateDescriptor.enableTimeToLive(stateTtlConfig);参数说明
newBuilder() 状态TTL配置的构造器方法必须调用返回一个Builder之后再调用.build()方法就可以得到StateTtlConfig。方法需要传入一个Time作为参数这就是设定的状态生存时间。 setUpdateType() 设置更新类型。更新类型指定了什么时候更新状态失效时间 DisabledTTL 已禁用。这意味着状态不会过期它将一直保持有效直到显式删除或状态存储由于其他原因而被清理OnCreateAndWrite表示只有创建状态和更改状态写操作时更新失效时间。配置默认为OnCreateAndWriteOnReadAndWrite表示无论读写操作都会更新失效时间也就是只要对状态进行了访问就表明它是活跃的从而延长生存时间setStateVisibility() 设置状态的可见性。状态可见性是指因为清除操作并不是实时的当状态过期之后还可能继续存在如果对它进行访问能否正常读取到是一个问题 NeverReturnExpired默认行为表示从不返回过期值也就是只要过期就认为它已经被清除不能继续读取ReturnExpireDefNotCleanedUp如果过期状态还存在就返回它的值清理
过期数据的清理 默认情况下过期数据会在读取的时候被删除同时会有后台线程定期清理StateBackend支持。可以通过StateTtlConfig配置关闭后台清理 StateTtlConfig ttlConfig StateTtlConfig.newBuilder(Time.seconds(1)).disableCleanupInBackground().build();全量快照时进行清理 可以启用全量快照时进行清理的策略这可以减少整个快照的大小。当前实现中不会清理本地的状态但从上次快照恢复时不会恢复那些已经删除的过期数据。 StateTtlConfig ttlConfig StateTtlConfig.newBuilder(Time.seconds(1)).cleanupFullSnapshot().build();增量数据清理
在状态访问或处理时进行会在存储后端保留一个所有状态的惰性全局迭代器。 每次触发增量清理时从迭代器中选择已经过期的数进行清理。 StateTtlConfig ttlConfig StateTtlConfig.newBuilder(Time.seconds(1))/*** cleanupSize 每次清理时检查状态的条目数在每个状态访问时触发* runCleanupForEveryRecord 表示是否在处理每条记录时触发清理*/.cleanupIncrementally(10, true).build();在RocksDB压缩时清理 如果使用RocksDBstatebackend则会启用Flink为RocksDB定制的压缩过滤器。RocksDB会周期性的对数据进行合并压缩从而减少存储空间。Flink提供的RocksDB压缩过滤器会在压缩时过滤掉已经过期的状态数据。 StateTtlConfig ttlConfig StateTtlConfig.newBuilder(Time.seconds(1)).cleanupInRocksdbCompactFilter(1000).build();使用示例 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperatorTuple2String, Integer streamSource env.socketTextStream(IP, 8086)// 将输入数据转换为Tuple2.map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String value) throws Exception {String[] split value.split(,);return Tuple2.of(split[0], Integer.valueOf(split[1]));}})// 指定 watermark策略.assignTimestampsAndWatermarks(// 定义Watermark策略WatermarkStrategy.Tuple2String, IntegerforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) - value.f1 * 1000L));// keyBy分区KeyedStreamTuple2String, Integer, String keyByStream streamSource.keyBy(a - a.f0);keyByStream.process(new MyKeyedProcessFunction()).print();env.execute();}public static class MyKeyedProcessFunction extends KeyedProcessFunctionString, Tuple2String, Integer, String {// 定义状态ValueStateInteger lastState;/*** 在open方法中初始化状态** param configuration* throws Exception*/Overridepublic void open(Configuration configuration) throws Exception {super.open(configuration);// 创建StateTtlConfigStateTtlConfig stateTtlConfig StateTtlConfig.newBuilder(Time.seconds(5)) // 过期时间5s
// .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 状态 创建和写入更新 时更新过期时间.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) // 状态 读取、创建和写入更新 时更新 过期时间.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不返回过期的状态值.build();// 状态描述器 启用TTLValueStateDescriptorInteger stateDescriptor new ValueStateDescriptor(lastState, Types.INT);stateDescriptor.enableTimeToLive(stateTtlConfig);this.lastState getRuntimeContext().getState(stateDescriptor);}Overridepublic void processElement(Tuple2String, Integer value, Context ctx, CollectorString out) throws Exception {// 获取状态值Integer lastValue lastState.value();out.collect(keyBy value.f0 状态值: lastValue);// 更新状态lastState.update(value.f1);}}快速输入测试数据
key1,1
key1,2
key1,4keyBykey1 状态值: null
keyBykey1 状态值: 1
keyBykey1 状态值: 2等待超过5秒输入测试数据
key1,6keyBykey1 状态值: null状态后端 State Backend
概述 状态后端是 Flink 中用于管理和持久化状态数据的机制。状态后端负责将算子状态和键控状态Keyed State存储在可靠且可恢复的存储系统中并提供对状态数据的读取和写入操作。 状态后端主要负责管理本地状态的存储方式和位置 可用状态后端
Flink内置了以下这些开箱即用的state backends 如果不设置默认使用HashMapStateBackend。两种状态后端最大的区别就在于本地状态存放在哪里 HashMapStateBackend: 哈希表状态后端EmbeddedRocksDBStateBackend:内嵌RocksDB状态后端1.HashMapStateBackend 在HashMapStateBackend内部数据以Java对象的形式存储在堆中。Key/value形式的状态和窗口算子会持有一个hash table其中存储着状态值、触发器。 具有以下特点
高性能由于状态存储在内存中哈希表状态后端提供极快的数据读取和写入性能低延迟状态的访问速度非常快因为无需进行磁盘或网络访问低容错性哈希表状态后端不提供持久化能力即在故障发生时可能会丢失状态数据。适用于开发和调试环境或对数据一致性要求较低的场景2.EmbeddedRocksDBStateBackend 将状态数据存储在硬盘上的RocksDB数据库中RocksDB是一种内嵌的key-value存储介质可以把数据持久化到本地硬盘。配置开启后会将处理中的数据全部放入RocksDB数据库中RocksDB默认存储在TaskManager的本地数据目录里。 RocksDB的状态数据被存储为序列化的字节数组读写操作需要序列化/反序列化因此状态的访问性能要差一些。另外因为做了序列化key的比较也会按照字节进行而不是直接调用.hashCode()和.equals()方法。 不同于HashMapStateBackend中的java对象状态数据被以序列化字节数组的方式存储需要序列化、反序列化因此key之间的比较是以字节序的形式进行而不是使用Java的调用.hashCode()和.equals()方法。 执行是异步快照不会因为保存检查点而阻塞数据的处理并且还提供了增量式保存检查点的机制在很多情况下可以大大提升保存效率。 具有以下特点
持久化和可恢复性内嵌RocksDB状态后端可将状态数据持久化到磁盘并在故障发生时能够恢复状态数据高容量由于状态存储在磁盘上内嵌RocksDB状态后端可以处理大规模的状态数据中等性能相较于哈希表状态后端内嵌RocksDB状态后端的读写性能略低。但由于RocksDB是一个高效的键值存储引擎它仍然提供了相对较好的读写性能状态后端的配置 默认状态后端是由集群配置文件flink-conf.yaml指定的配置的键名称为state.backend。 默认配置对集群上运行的所有作业都有效可以通过更改配置值来改变默认的状态后端。还可以在代码中为当前作业单独配置状态后端这个配置会覆盖掉集群配置文件的默认值。 1.配置默认全局的状态后端
在flink-conf.yaml中可以使用state.backend来配置默认状态后端。
# 默认状态后端,哈希表状态后端
state.backend.type: hashmap# 内嵌RocksDB状态后端
state.backend.type: rocksdb# 定义检查点和元数据写入的目录
state.checkpoints.dir: hdfs://node01:8020/flink/checkpoints2.设置每个Job的状态后端
使用hashmap状态后端
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
HashMapStateBackend hashMapStateBackend new HashMapStateBackend();env.setStateBackend(hashMapStateBackend);使用rocksdb状态后端
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend new EmbeddedRocksDBStateBackend();env.setStateBackend(embeddedRocksDBStateBackend);注意Flink发行版中默认包含了RocksDB(解压的Flink安装包)在IDE中使用rocksdb状态后端需要为Flink项目添加依赖
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-statebackend-rocksdb/artifactIdversion${flink.version}/version
/dependency3.提交参数指定
flink run-application -t yarn-application-p 2 -Dstate.backend.typerocksdb -c 全类名 jar包