网站备案最新备案号,百度推广总部电话,wordpress本地评论插件,免费收录网站大全【README】
本文记录了flink sink操作#xff0c;输出目的存储器#xff08;中间件#xff09;包括
kafka#xff1b;es#xff1b;db#xff1b;等等有很多#xff1b;本文只给出了 sink2kafka的代码#xff1b;
本文使用的flink为 1.14.4 版本#xff1b;
本文部…【README】
本文记录了flink sink操作输出目的存储器中间件包括
kafkaesdb等等有很多本文只给出了 sink2kafka的代码
本文使用的flink为 1.14.4 版本
本文部分内容参考了 flink 官方文档如下
Kafka | Apache Flinkhttps://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/datastream/kafka/ 【1】 flink sink2kafka
1场景
消费上游topic hello0415的数据并把数据流输出到下游kafka topic hell0416如我们在java框架中把数据库日志发送到 topic1 然后我想统计执行时间大于3秒的sql则需要把筛选后的sql 发送到 下游 topic2 就可以使用sink 来完成
2代码
/*** Description flink流输出到kafka下沉* author xiao tang* version 1.0.0* createTime 2022年04月16日*/
public class SinkTest1_Kafka {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4); // 设置全局并行度// 创建flink连接kafkaKafkaSource kafkaSource KafkaSource.Stringbuilder().setValueOnlyDeserializer(new SimpleStringSchema()).setProperties(KafkaConsumerProps._INS.getProps()).setTopics(hello0415).setGroupId(flink).build();DataStreamString kafkaStream env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), kafkaSource);// kafka生产者属性Properties kafkaProducerProps new Properties();kafkaProducerProps.put(ProducerConfig.ACKS_CONFIG, all);kafkaProducerProps.put(ProducerConfig.RETRIES_CONFIG, 3);kafkaProducerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 1 * KfkNumConst._1K);kafkaProducerProps.put(ProducerConfig.LINGER_MS_CONFIG, 1);kafkaProducerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M);// 把kafka数据流输出到sink topic hello0416KafkaSinkString sink KafkaSink.Stringbuilder().setKafkaProducerConfig(kafkaProducerProps).setBootstrapServers(192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic(hello0416).setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();// 添加到sinkkafkaStream.sinkTo(sink);// 打印streamkafkaStream.print(kafkaStream);// 执行env.execute(kafkaSinkJob);}
}效果 【补充】
kafka 消费者属性
public enum KafkaConsumerProps {_INS;/* 1.创建kafka生产者的配置信息 */Properties props new Properties();private KafkaConsumerProps() {/*2.指定连接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, G1);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, latest);}public Properties getProps() {return props;}
}