小精灵网站在线做语文,计算机网络应用主要学什么,wordpress添加go,wordpress设置页面访问权限默认#xff1a;从topic中指定的group上次消费的位置开始消费。所以必须配置group.id参数从消费者组提交的偏移量开始读取分区#xff08;kafka或zookeeper中#xff09;。如果找不到分区的偏移量#xff0c;auto.offset.reset将使用属性中的设置。如果是默认行为(setStart…默认从topic中指定的group上次消费的位置开始消费。所以必须配置group.id参数从消费者组提交的偏移量开始读取分区kafka或zookeeper中。如果找不到分区的偏移量auto.offset.reset将使用属性中的设置。如果是默认行为(setStartFromGroupOffsets),那么任务从检查点重启按照重启前的offset进行消费如果直接重启不从检查点重启并且group.id不变程序会按照上次提交的offset的位置继续消费。如果group.id改变了则程序按照auto.offset.reset设置的属性进行消费。但是如果程序带有状态的算子还是建议使用检查点重启。final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(5000);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);Properties props new Properties();
props.setProperty(bootstrap.servers,KAFKA_BROKER);
props.setProperty(zookeeper.connect, ZK_HOST);
props.setProperty(group.id,GROUP_ID);
props.setProperty(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);
props.setProperty(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);
FlinkKafkaConsumer011String consumer new FlinkKafkaConsumer011(TOPIC, new SimpleStringSchema(), props);consumer.setStartFromGroupOffsets();注意以下五种方式运行时优先级都比KafkaProperties中配置的auto.offset.reset优先级高。方式一 : 指定topic, 指定partition的offset位置MapKafkaTopicPartition, Long offsets new HashedMap();
offsets.put(new KafkaTopicPartition(topic_name, 0), 11L);
offsets.put(new KafkaTopicPartition(topic_name, 1), 22L);
offsets.put(new KafkaTopicPartition(topic_name, 2), 33L);
consumer.setStartFromSpecificOffsets(offsets);MapKafkaTopicPartition, Long Long参数指定的offset位置KafkaTopicPartition构造函数有两个参数第一个为topic名字第二个为分区数.如果使用者需要读取在提供的偏移量映射中没有指定偏移量的分区则它将回退到setStartFromGroupOffsets()该特定分区的默认组偏移行为。当作业从故障中自动恢复或使用保存点手动恢复时这些起始位置配置方法不会影响起始位置。在恢复时每个Kafka分区的起始位置由存储在保存点或检查点中的偏移量确定。consumer.setStartFromSpecificOffsets(offsets);方式二: 从topic中最初的数据开始消费consumer.setStartFromEarliest();方式三: 从指定的时间戳开始consumer.setStartFromTimestamp(1559801580000l);对于每个分区时间戳大于或等于指定时间戳的记录将用作起始位置。如果分区的最新记录早于时间戳则只会从最新记录中读取分区。在此模式下Kafka中的已提交偏移将被忽略不会用作起始位置。时间戳指的是kafka中消息自带的时间戳。方式四 从最新的数据开始消费consumer.setStartFromLatest();方式五同一默认参见 https://mp.weixin.qq.com/s?__bizMzU5Mzk3MDA3Mwmid2247483866idx2sn6a3b458caf5bebf0171f9fbd834b7517chksmfe09172cc97e9e3a590f5ea2978d078b1b46d94f86bd344173fa69c1d63790b09d2fe173bffbtoken1856795336langzh_CN#rd