04_kafka_java-api — using the Java API for topic create/delete/query and publish/subscribe (subscribe / assign)

Contents
Create and query topics via the API
pom.xml
log4j.properties
Producer and consumer API
Custom producer partitioning strategy
Custom serializer
Custom producer interceptor
Offset commit control
Acknowledgement (acks) and retries
Idempotent messages
Producer transactions
Consumer-producer transactions
Using the API to create, delete, and query topics
Note: the brokers are addressed by hostname, so the IP-to-hostname mappings must be configured on the development machine.
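For example, on Linux or macOS the mappings go in /etc/hosts (on Windows, C:\Windows\System32\drivers\etc\hosts). The IP addresses below are placeholders; substitute your actual broker addresses:

192.168.1.101  kafka_1
192.168.1.102  kafka_2
192.168.1.103  kafka_3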
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.qww</groupId>
    <artifactId>kafka-demo</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.2.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/log4j/log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.9</version>
        </dependency>
    </dependencies>
</project>

log4j.properties
log4j.rootLogger=info,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%p %d{yyyy-MM-dd HH:mm:ss} %c - %m%n
Create and query topics
package cn.qww.topic;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;

public class TopicCreate {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Connection parameters
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka_1:9092,kafka_2:9092,kafka_3:9092");
        KafkaAdminClient adminClient = (KafkaAdminClient) KafkaAdminClient.create(props);

        // Create a topic: name, 2 partitions, replication factor 3
        List<NewTopic> newTopics = Arrays.asList(new NewTopic("topic03", 2, (short) 3));
        CreateTopicsResult topics = adminClient.createTopics(newTopics);
        Map<String, KafkaFuture<Void>> values = topics.values();
        System.out.println(values);

        // Delete a topic
        // adminClient.deleteTopics(Arrays.asList("topic02"));

        listTopic(adminClient);
        describeTopic(adminClient);
        adminClient.close();
    }

    /**
     * List topics
     */
    private static void listTopic(KafkaAdminClient adminClient) throws ExecutionException, InterruptedException {
        KafkaFuture<Set<String>> nameFutures = adminClient.listTopics().names();
        for (String name : nameFutures.get()) {
            System.out.println("topic_name: " + name);
        }
    }

    /**
     * Describe a topic
     */
    private static void describeTopic(KafkaAdminClient adminClient) throws ExecutionException, InterruptedException {
        DescribeTopicsResult describeTopics = adminClient.describeTopics(Arrays.asList("topic02"));
        Map<String, TopicDescription> tdm = describeTopics.all().get();
        for (Map.Entry<String, TopicDescription> entry : tdm.entrySet()) {
            System.out.println(entry.getKey() + "\t" + entry.getValue());
        }
    }
}
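The deleteTopics call commented out above also returns a result object whose futures can be awaited. A minimal sketch, assuming org.apache.kafka.clients.admin.DeleteTopicsResult is imported and "topic02" exists:

// Sketch: delete a topic and block until the broker confirms it
DeleteTopicsResult deleted = adminClient.deleteTopics(Arrays.asList("topic02"));
deleted.all().get(); // throws ExecutionException if the deletion fails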
Producer and consumer API
producer
package cn.qww.curpdr;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Producer {
    public static void main(String[] args) throws InterruptedException {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka_1:9092,kafka_2:9092,kafka_3:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            Thread.sleep(100);
            ProducerRecord<String, String> record = new ProducerRecord<>("topic03", "key_" + i, "value_" + i);
            producer.send(record);
        }
        producer.close();
    }
}

consumer
package cn.qww.curpdr;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Iterator;
import java.util.Properties;
import java.util.regex.Pattern;

public class Consumer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka_1:9092,kafka_2:9092,kafka_3:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");

        // 2. Create the consumer
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // 3. Subscribe to all topics whose names start with "topic"
            consumer.subscribe(Pattern.compile("^topic.*$"));
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
                Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
                while (recordIterator.hasNext()) {
                    ConsumerRecord<String, String> record = recordIterator.next();
                    String key = record.key();
                    String value = record.value();
                    long offset = record.offset();
                    int partition = record.partition();
                    System.out.println("key: " + key + ", value: " + value + ", partition: " + partition + ", offset: " + offset);
                }
            }
        }
    }
}

Custom producer partitioning strategy

Default partitioning: a record with a key is assigned to a partition by hashing the key; a record without a key is distributed round-robin. (A send-callback sketch after the producer below shows how to observe the chosen partition.)

Custom partitioner class
package cn.qww;

public class Config {
    public static String BOOTSTRAP_SERVERS_CONFIG = "kafka_1:9092,kafka_2:9092,kafka_3:9092";
}

---
package cn.qww.c_custompartition;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomPartition implements Partitioner {

    private AtomicInteger atomicInteger = new AtomicInteger(0);

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null || keyBytes.length == 0) {
            // No key: round-robin over the partitions
            int i = atomicInteger.addAndGet(1);
            int nodeI = (i & Integer.MAX_VALUE) % numPartitions;
            System.out.println(nodeI);
            return nodeI;
        } else {
            // With a key: hash the key bytes, same approach as the default partitioner
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    @Override
    public void close() {
        System.out.println("close");
    }

    @Override
    public void configure(Map<String, ?> configs) {
        System.out.println("configure");
    }
}
Producer
package cn.qww.c_custompartition;

import cn.qww.Config;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerUseCustomPartition {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartition.class.getName());

        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 6; i++) {
            // No key: the custom partitioner falls back to round-robin
            ProducerRecord<String, String> record = new ProducerRecord<>("topic04", "value_" + i);
            // With a key, the partitioner hashes instead:
            // ProducerRecord<String, String> record = new ProducerRecord<>("topic04", "key_" + i, "value_" + i);
            producer.send(record);
        }
        producer.close();
    }
}
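To observe the partitioning decision from the producer side (rather than only in the consumer's printout), the plain producer.send(record) call in the loop above could be replaced with the callback form; a minimal sketch:

// Sketch: the send callback reports which partition each record landed on
producer.send(record, (metadata, exception) -> {
    if (exception == null) {
        System.out.println("sent to partition " + metadata.partition() + " @offset " + metadata.offset());
    } else {
        exception.printStackTrace();
    }
});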
Consumer
package cn.qww.c_custompartition;

import cn.qww.Config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Iterator;
import java.util.Properties;
import java.util.regex.Pattern;

public class ConsumerTest {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");

        // 2. Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 3. Subscribe to all topics whose names start with "topic"
        consumer.subscribe(Pattern.compile("^topic.*$"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
            while (recordIterator.hasNext()) {
                ConsumerRecord<String, String> record = recordIterator.next();
                String key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                System.out.println("key: " + key + ", value: " + value + ", partition: " + partition + ", offset: " + offset);
            }
        }
    }
}
Custom serializer
Serializer
package cn.qww.d_serializer;

import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Serializer;

import java.io.Serializable;
import java.util.Map;

public class ObjectSerializer implements Serializer<Object> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        System.out.println("configure");
    }

    @Override
    public byte[] serialize(String topic, Object data) {
        // SerializationUtils comes from the commons-lang3 dependency
        return SerializationUtils.serialize((Serializable) data);
    }

    @Override
    public void close() {
        System.out.println("close");
    }
}

Deserializer

package cn.qww.d_serializer;

import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

public class ObjectDeserializer implements Deserializer<Object> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        System.out.println("configure");
    }

    @Override
    public Object deserialize(String topic, byte[] data) {
        return SerializationUtils.deserialize(data);
    }

    @Override
    public void close() {
        System.out.println("close");
    }
}

Custom class

package cn.qww.d_serializer;

import java.io.Serializable;

public class CustomObj implements Serializable {

    private Integer id;
    private String name;

    public CustomObj(Integer id, String name) {
        this.id = id;
        this.name = name;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "CustomObj{id=" + id + ", name='" + name + "'}";
    }
}

Consumer

package cn.qww.d_serializer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

public class SerConsumer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ObjectDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");

        // 2. Create the consumer
        KafkaConsumer<String, CustomObj> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic02"));
        while (true) {
            ConsumerRecords<String, CustomObj> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, CustomObj>> recordIterator = consumerRecords.iterator();
            while (recordIterator.hasNext()) {
                ConsumerRecord<String, CustomObj> record = recordIterator.next();
                String key = record.key();
                CustomObj value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                System.out.println("key: " + key + ", value: " + value + ", partition: " + partition + ", offset: " + offset);
            }
        }
    }
}

Producer

package cn.qww.d_serializer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class SerProducer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectSerializer.class.getName());

        // 2. Create the producer
        KafkaProducer<String, CustomObj> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, CustomObj> record = new ProducerRecord<>("topic02", "key" + i, new CustomObj(i, "name_" + i));
            producer.send(record);
        }
        producer.close();
    }
}
Custom producer interceptor

Interceptor class

package cn.qww.e_interceptors;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Wrap the record: decorate key and value, and attach a header
        ProducerRecord<String, String> wrapRecord = new ProducerRecord<>(record.topic(), record.key() + "_suf", record.value() + "_val_suf");
        wrapRecord.headers().add("header:", "header_val".getBytes());
        System.out.println("topic: " + wrapRecord.topic() + ", partition: " + wrapRecord.partition());
        return wrapRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("metadata: " + metadata + ", exception: " + exception);
    }

    @Override
    public void close() {
        System.out.println("close");
    }

    @Override
    public void configure(Map<String, ?> configs) {
        System.out.println("configure");
    }
}

Consumer

package cn.qww.e_interceptors;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

public class IcptConsumer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");

        // 2. Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic01"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
            while (recordIterator.hasNext()) {
                ConsumerRecord<String, String> record = recordIterator.next();
                String key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                System.out.println("key: " + key + ", value: " + value + ", partition: " + partition + ", offset: " + offset + ", header: " + record.headers());
            }
        }
    }
}

Producer

package cn.qww.e_interceptors;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class IcptProducer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName());

        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("topic01", "key" + i, "error" + i);
            producer.send(record);
        }
        producer.close();
    }
}
Offset commit control
auto.offset.reset controls where a consumer group starts reading when it connects to a topic for the first time (i.e. no committed offset exists yet).
package cn.qww.f_offset_autocommit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

public class AutoOffsetResetConfigConsumer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // On first access, read the historical messages
        // props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "g2");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // 2. Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 3. Subscribe
        consumer.subscribe(Arrays.asList("topic01"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
            while (recordIterator.hasNext()) {
                ConsumerRecord<String, String> record = recordIterator.next();
                String key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                System.out.println("key: " + key + ", value: " + value + ", partition: " + partition + ", offset: " + offset);
            }
        }
    }
}
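For reference, auto.offset.reset accepts three values (per the Kafka consumer configuration); shown here as a sketch against the props above:

// "latest" (the default): start from the end of the log, i.e. only new records
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
// "earliest": start from the beginning of the log, replaying history
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// "none": throw an exception to the consumer if no committed offset exists
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");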
Disabling auto-commit
package cn.qww.f_offset_autocommit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;

public class CloseAutoCommitConsumer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "g4");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // 2. Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic01"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
            while (recordIterator.hasNext()) {
                ConsumerRecord<String, String> record = recordIterator.next();
                String key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                // Commit the next offset to read (current offset + 1) manually
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                offsets.put(new TopicPartition(record.topic(), partition), new OffsetAndMetadata(offset + 1));
                consumer.commitAsync(offsets, (offsets1, exception) -> System.out.println("committed: " + offsets1));
                System.out.println("key: " + key + ", value: " + value + ", partition: " + partition + ", offset: " + offset);
            }
        }
    }
}
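A synchronous variant, for cases where the commit must be confirmed before processing continues; a minimal sketch that could replace the commitAsync call above (assumption: blocking per record is acceptable for the workload):

// Sketch: commitSync blocks until the broker acknowledges the offset commit,
// trading throughput for a stronger guarantee than commitAsync
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
consumer.commitSync(offsets);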
Adjusting the auto-commit interval: auto.commit.interval.ms
package cn.qww.f_offset_autocommit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Iterator;
import java.util.Properties;
import java.util.regex.Pattern;

public class ModifyCommitIntervalConsumer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // Auto-commit every 10 seconds instead of the default interval
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);

        // 2. Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 3. Subscribe to all topics whose names start with "topic"
        consumer.subscribe(Pattern.compile("^topic.*$"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
            while (recordIterator.hasNext()) {
                ConsumerRecord<String, String> record = recordIterator.next();
                String key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                System.out.println("key: " + key + ", value: " + value + ", partition: " + partition + ", offset: " + offset);
            }
        }
    }
}

Producer
package cn.qww.f_offset_autocommit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Producer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("topic01", "key" + i, "value" + i);
            producer.send(record);
        }
        producer.close();
    }
}

Acknowledgement (acks) and retries
Producer configured with acks and retries
package cn.qww.g_acks_retries;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.IOException;
import java.util.Properties;

public class AckRetriesProducer {
    public static void main(String[] args) throws IOException {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // A 1 ms request timeout forces sends to time out so the retries can be observed
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1);
        // Wait for all in-sync replicas to acknowledge
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 4);

        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>("topic04", "key-ack", "value-retries");
        producer.send(record);
        producer.flush();
        producer.close();
        // System.in.read();
    }
}
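With the 1 ms request timeout above, sends are expected to time out; a send callback makes the final outcome visible once the retries are exhausted. A minimal sketch that could replace the plain producer.send(record) call:

// Sketch: the callback fires once the send finally succeeds or all retries fail
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        System.out.println("send failed after retries: " + exception);
    } else {
        System.out.println("acknowledged: " + metadata);
    }
});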
Consumer
package cn.qww.g_acks_retries;

import cn.qww.Config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

public class ConsumerTest {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");

        // 2. Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic04"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
            while (recordIterator.hasNext()) {
                ConsumerRecord<String, String> record = recordIterator.next();
                String key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                System.out.println("key: " + key + ", value: " + value + ", partition: " + partition + ", offset: " + offset);
            }
        }
    }
}
Idempotent messages
The producer enables idempotent delivery with enable.idempotence=true; the broker then deduplicates retried sends using the producer id and per-partition sequence numbers.
package cn.qww.i_idempotence;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class IdempotenceProducer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // A 1 ms request timeout forces retries, so deduplication can be observed
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1);
        // acks=-1 is equivalent to acks=all
        props.put(ProducerConfig.ACKS_CONFIG, "-1");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>("topic04", "key-idempotence", "value-idempotence");
        producer.send(record);
        producer.flush();
        producer.close();
    }
}

Consumer
package cn.qww.i_idempotence;

import cn.qww.Config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

public class ConsumerTest {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");

        // 2. Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic04"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
            while (recordIterator.hasNext()) {
                ConsumerRecord<String, String> record = recordIterator.next();
                String key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                System.out.println("key: " + key + ", value: " + value + ", partition: " + partition + ", offset: " + offset);
            }
        }
    }
}

Producer transactions
Producer with transactions enabled. Note that configuring transactional.id also makes the producer idempotent.
package cn.qww.h_transaction;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.UUID;

public class TxProducer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // The transactional id must be unique
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-id" + UUID.randomUUID().toString());
        // Batching configuration
        /*
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 20);
        // Wait up to 10 ms for a batch to fill
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        */

        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions(); // initialize transactions
        try {
            producer.beginTransaction(); // begin the transaction
            for (int i = 0; i < 10; i++) {
                if (i == 7) {
                    // Deliberately throw ArithmeticException so the transaction aborts
                    int k = i / 0;
                    System.out.println(k);
                }
                ProducerRecord<String, String> record = new ProducerRecord<>("topic04", "key_tx_" + i, "tx_val_" + i);
                producer.send(record);
                producer.flush();
            }
            producer.commitTransaction(); // commit the transaction
        } catch (Exception e) {
            e.printStackTrace();
            producer.abortTransaction(); // abort the transaction
        }
        producer.close();
    }
}
Consumer with isolation level read_committed, so it only sees records from committed transactions
package cn.qww.h_transaction;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

public class ReadCommittedConsumer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");
        // Default is read_uncommitted
        // props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        // 2. Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic04"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
            while (recordIterator.hasNext()) {
                ConsumerRecord<String, String> record = recordIterator.next();
                String key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                System.out.println("key: " + key + ", value: " + value + ", partition: " + partition + ", offset: " + offset);
            }
        }
    }
}
Consumer-producer transactions

Flow: upstream producer —(msg)→ topic04 → mid-stream processor —(msg_processed)→ topic02 → downstream consumer

Upstream producer
package cn.qww.h_transaction.conxpdr;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.UUID;

public class SrcProducer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // The transactional id must be unique
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-id" + UUID.randomUUID().toString());
        // Batching configuration
        /*
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 20);
        // Wait up to 10 ms for a batch to fill
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        */

        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions(); // initialize transactions
        try {
            producer.beginTransaction(); // begin the transaction
            for (int i = 0; i < 10; i++) {
                /*
                if (i == 7) {
                    int k = i / 0; // would abort the transaction
                    System.out.println(k);
                }
                */
                ProducerRecord<String, String> record = new ProducerRecord<>("topic04", "key_tx_" + i, "tx_val_" + i);
                producer.send(record);
                producer.flush();
            }
            producer.commitTransaction(); // commit the transaction
        } catch (Exception e) {
            e.printStackTrace();
            producer.abortTransaction(); // abort the transaction
        }
        producer.close();
    }
}
Consumer used to verify that the upstream producer really sent the messages (optional)
package cn.qww.h_transaction.conxpdr;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

public class ReadCommittedConsumer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");
        // Default is read_uncommitted
        // props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        // 2. Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic04"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
            while (recordIterator.hasNext()) {
                ConsumerRecord<String, String> record = recordIterator.next();
                String key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                System.out.println("key: " + key + ", value: " + value + ", partition: " + partition + ", offset: " + offset);
            }
        }
    }
}
Mid-stream processor: reads from the upstream topic and sends the processed data to the downstream topic, with the whole consume-process-produce cycle handled in a single transaction.
package cn.qww.h_transaction.conxpdr;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;

public class TxConsumerProducer {
    public static void main(String[] args) {
        // Consumer: manual offset management, read only committed records
        Properties propsConsumer = new Properties();
        propsConsumer.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        propsConsumer.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        propsConsumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        propsConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, "group02");
        propsConsumer.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsConsumer.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(propsConsumer);
        consumer.subscribe(Arrays.asList("topic04"));

        // Producer: transactional
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-id");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions(); // initialize transactions

        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
                Iterator<ConsumerRecord<String, String>> consumerRecordIterator = consumerRecords.iterator();
                // Begin transaction control
                producer.beginTransaction();
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                while (consumerRecordIterator.hasNext()) {
                    ConsumerRecord<String, String> record = consumerRecordIterator.next();
                    // Create the downstream record
                    ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic02", record.key(), record.value());
                    producer.send(producerRecord);
                    // Track the consumed offsets
                    offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
                }
                // Commit the consumed offsets and produced records atomically
                producer.sendOffsetsToTransaction(offsets, "group02");
                producer.commitTransaction();
            }
        } catch (Exception e) {
            producer.abortTransaction(); // abort the transaction
        } finally {
            producer.close();
        }
    }
}
Consumer of the downstream topic
package cn.qww.h_transaction.conxpdr;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

public class DownStreamConsumer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");
        // Default is read_uncommitted
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        // 2. Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic02"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
            while (recordIterator.hasNext()) {
                ConsumerRecord<String, String> record = recordIterator.next();
                String key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                System.out.println("key: " + key + ", value: " + value + ", partition: " + partition + ", offset: " + offset);
            }
        }
    }
}