当前位置: 首页 > news >正文

怎么给自己的网站设置关键词网站备份还原

怎么给自己的网站设置关键词,网站备份还原,seo查询 站长工具,毕设做网站答辩会要求当场演示吗文章目录 使用api 实现 topic 增删改查pom.xmllog4j.properties创建、查询 Topic生产者、消费者 api自定义 生产者分区发送策略自定义序列化器自定义 生产者拦截器offset 提交控制确认-acks 与 重试-retries幂等消息生产者事务生产者消费者事务 04_kafka_java-api 使用api 实现… 文章目录 使用api 实现 topic 增删改查pom.xmllog4j.properties创建、查询 Topic生产者、消费者 api自定义 生产者分区发送策略自定义序列化器自定义 生产者拦截器offset 提交控制确认-acks 与 重试-retries幂等消息生产者事务生产者消费者事务 04_kafka_java-api 使用api 实现增删改查 发布订阅 sub/ assign 使用api 实现 topic 增删改查 需要额外注意的是 ip-主机名 主机名映射需要在开发机器上配置 pom.xml ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcn.qww/groupIdartifactIdkafka-demo/artifactIdversion1.0/versionpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetproject.build.sourceEncodingUTF-8/project.build.sourceEncoding/propertiesdependencies!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion2.2.0/version/dependency!-- https://mvnrepository.com/artifact/log4j/log4j --dependencygroupIdlog4j/groupIdartifactIdlog4j/artifactIdversion1.2.17/version/dependency!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api --dependencygroupIdorg.slf4j/groupIdartifactIdslf4j-api/artifactIdversion1.7.25/version/dependency!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 --dependencygroupIdorg.slf4j/groupIdartifactIdslf4j-log4j12/artifactIdversion1.7.25/version/dependency!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 --dependencygroupIdorg.apache.commons/groupIdartifactIdcommons-lang3/artifactIdversion3.9/version/dependency/dependencies/projectlog4j.properties log4j.rootLogger info,consolelog4j.appender.console org.apache.log4j.ConsoleAppender log4j.appender.console.Target System.out log4j.appender.console.layout org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern %p %d{yyyy-MM-dd HH:mm:ss} %c - %m%n 创建、查询 Topic package cn.qww.topic;import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.DescribeTopicsResult; import org.apache.kafka.clients.admin.KafkaAdminClient; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.KafkaFuture;import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutionException;public class TopicCreate {public static void main(String[] args) throws ExecutionException, InterruptedException {//配置连接参数Properties props new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_1:9092,kafka_2:9092,kafka_3:9092);KafkaAdminClient adminClient (KafkaAdminClient) KafkaAdminClient.create(props);//创建TopicsListNewTopic newTopics Arrays.asList(new NewTopic(topic03, 2, (short) 3));CreateTopicsResult topics adminClient.createTopics(newTopics);MapString, KafkaFutureVoid values topics.values();System.out.println(values);//删除Topic// adminClient.deleteTopics(Arrays.asList(topic02));listTopic(adminClient);describeTopic(adminClient);adminClient.close();}/*** 查询topics*/private static void listTopic(KafkaAdminClient adminClient) throws ExecutionException, InterruptedException {KafkaFutureSetString nameFutures adminClient.listTopics().names();for (String name : nameFutures.get()) {System.out.println(topic_name: name);}}/*** describeTopic*/private static void describeTopic(KafkaAdminClient adminClient) throws ExecutionException, InterruptedException {DescribeTopicsResult describeTopics adminClient.describeTopics(Arrays.asList(topic02));MapString, TopicDescription tdm describeTopics.all().get();for (Map.EntryString, TopicDescription entry : tdm.entrySet()) {System.out.println(entry.getKey() \t entry.getValue());}} } 生产者、消费者 api producer package cn.qww.curpdr;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class Producer {public static void main(String[] args) throws InterruptedException {//1.创建链接参数Properties propsnew Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,kafka_1:9092,kafka_2:9092,kafka_3:9092);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//2.创建生产者KafkaProducerString,String producer new KafkaProducer(props);for(int i 0; i 10; i){Thread.sleep(100);ProducerRecordString, String record new ProducerRecord(topic03, key_ i, value_ i);producer.send(record);}producer.close();} }consumer package cn.qww.curpdr;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.Iterator; import java.util.Properties; import java.util.regex.Pattern;public class Consumer {public static void main(String[] args) {//1.创建Kafka链接参数Properties propsnew Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,kafka_1:9092,kafka_2:9092,kafka_3:9092);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG,group01);//2.创建Topic消费者try (KafkaConsumerString, String consumer new KafkaConsumer(props)) {//3.订阅topic开头的消息队列consumer.subscribe(Pattern.compile(^topic.*$));while (true) {ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(1));IteratorConsumerRecordString, String recordIterator consumerRecords.iterator();while (recordIterator.hasNext()) {ConsumerRecordString, String record recordIterator.next();String key record.key();String value record.value();long offset record.offset();int partition record.partition();System.out.println(key: key ,value: value ,partition: partition ,offset: offset);}}}} }自定义 生产者分区发送策略 默认生产者分发策略消息有key 使用hash没有key 使用轮询 自定义分区类 package cn.qww;public class Config {public static String BOOTSTRAP_SERVERS_CONFIG kafka_1:9092,kafka_2:9092,kafka_3:9092; }--- package cn.qww.c_custompartition;import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.utils.Utils;import java.util.Map; import java.util.concurrent.atomic.AtomicInteger;public class CustomPartition implements Partitioner {private AtomicInteger atomicInteger new AtomicInteger(0);Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {int numPartitions cluster.partitionsForTopic(topic).size();if (keyBytes null || keyBytes.length 0) {int i atomicInteger.addAndGet(1);int nodeI (i Integer.MAX_VALUE) % numPartitions;System.out.println(nodeI);return nodeI;} else {return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}Overridepublic void close() {System.out.println(close);}Overridepublic void configure(MapString, ? configs) {System.out.println(configure);} } 生产者 package cn.qww.c_custompartition;import cn.qww.Config; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class ProducerUseCustomPartition {public static void main(String[] args) {//1.创建链接参数Properties props new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartition.class.getName());//2.创建生产者KafkaProducerString, String producer new KafkaProducer(props);for (Integer i 0; i 6; i) {ProducerRecordString, String record new ProducerRecord(topic04, value_ i); // ProducerRecordString, String record new ProducerRecord(topic04, key_ i,value_ i);producer.send(record);}producer.close();} } 消费者 package cn.qww.c_custompartition;import cn.qww.Config; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.Iterator; import java.util.Properties; import java.util.regex.Pattern;public class ConsumerTest {public static void main(String[] args) {//1.创建Kafka链接参数Properties propsnew Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG,group01);//2.创建Topic消费者KafkaConsumerString,String consumer new KafkaConsumer(props);//3.订阅topic开头的消息队列consumer.subscribe(Pattern.compile(^topic.*$));while (true){ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(1));IteratorConsumerRecordString, String recordIterator consumerRecords.iterator();while (recordIterator.hasNext()){ConsumerRecordString, String record recordIterator.next();String key record.key();String value record.value();long offset record.offset();int partition record.partition();System.out.println(key:key,value:value,partition:partition,offset:offset);}}} } 自定义序列化器 序列化器 package cn.qww.d_serializer;import org.apache.commons.lang3.SerializationUtils; import org.apache.kafka.common.serialization.Serializer;import java.io.Serializable; import java.util.Map;public class ObjectSerializer implements SerializerObject {Overridepublic void configure(MapString, ? configs, boolean isKey) {System.out.println(configure);}Overridepublic byte[] serialize(String topic, Object data) {// commons-lang3 包return SerializationUtils.serialize((Serializable) data);}Overridepublic void close() {System.out.println(close);} } 反序列化器 package cn.qww.d_serializer;import org.apache.commons.lang3.SerializationUtils; import org.apache.kafka.common.serialization.Deserializer;import java.util.Map;public class ObjectDeserializer implements DeserializerObject {Overridepublic void configure(MapString, ? configs, boolean isKey) {System.out.println(configure);}Overridepublic Object deserialize(String topic, byte[] data) {return SerializationUtils.deserialize(data);}Overridepublic void close() {System.out.println(close);} } 自定义类 package cn.qww.d_serializer;import java.io.Serializable;public class CustomObj implements Serializable {private Integer id;private String name;public CustomObj(Integer id, String name) {this.id id;this.name name;}public Integer getId() {return id;}public void setId(Integer id) {this.id id;}public String getName() {return name;}public void setName(String name) {this.name name;}Overridepublic String toString() {return CustomObj{ id id , name name \ };} } 消费者 package cn.qww.d_serializer;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.Arrays; import java.util.Iterator; import java.util.Properties; import java.util.regex.Pattern;public class SerConsumer {public static void main(String[] args) {//1.创建Kafka链接参数Properties propsnew Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,ObjectDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG,group01);//2.创建Topic消费者KafkaConsumerString, CustomObj consumernew KafkaConsumerString, CustomObj(props);consumer.subscribe(Arrays.asList(topic02));while (true){ConsumerRecordsString, CustomObj consumerRecords consumer.poll(Duration.ofSeconds(1));IteratorConsumerRecordString, CustomObj recordIterator consumerRecords.iterator();while (recordIterator.hasNext()){ConsumerRecordString, CustomObj record recordIterator.next();String key record.key();CustomObj value record.value();long offset record.offset();int partition record.partition();System.out.println(key:key,value:value,partition:partition,offset:offset);}}} } 生产者 package cn.qww.d_serializer;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class SerProducer {public static void main(String[] args) {//1.创建链接参数Properties props new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectSerializer.class.getName());//2.创建生产者KafkaProducerString, CustomObj producer new KafkaProducerString, CustomObj(props);for (Integer i 0; i 10; i) {ProducerRecordString, CustomObj record new ProducerRecord(topic02, key i, new CustomObj(i, name_ i));producer.send(record);}producer.close();} } 自定义 生产者拦截器 拦截器类 package cn.qww.e_interceptors;import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;public class CustomProducerInterceptor implements ProducerInterceptor {Overridepublic ProducerRecord onSend(ProducerRecord record) {ProducerRecord wrapRecord new ProducerRecord(record.topic(), record.key() _suf, record.value() _val_suf);wrapRecord.headers().add(header:, header_val.getBytes());System.out.println(topic: wrapRecord.topic() , partition: wrapRecord.partition());return wrapRecord;}Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {System.out.println(metadata: metadata , exception: exception);}Overridepublic void close() {System.out.println(close);}Overridepublic void configure(MapString, ? configs) {System.out.println(configure);} } 消费者 package cn.qww.e_interceptors;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.Arrays; import java.util.Iterator; import java.util.Properties; import java.util.regex.Pattern;public class IcptConsumer {public static void main(String[] args) {//1.创建Kafka链接参数Properties props new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, group01);//2.创建Topic消费者KafkaConsumerString, String consumer new KafkaConsumerString, String(props);consumer.subscribe(Arrays.asList(topic01));while (true) {ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(1));IteratorConsumerRecordString, String recordIterator consumerRecords.iterator();while (recordIterator.hasNext()) {ConsumerRecordString, String record recordIterator.next();String key record.key();String value record.value();long offset record.offset();int partition record.partition();System.out.println(key: key ,value: value ,partition: partition ,offset: offset ,header: record.headers());}}} } 生产者 package cn.qww.e_interceptors;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class IcptProducer {public static void main(String[] args) {//1.创建链接参数Properties props new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName());//2.创建生产者KafkaProducerString, String producer new KafkaProducerString, String(props);for (Integer i 0; i 10; i) {ProducerRecordString, String record new ProducerRecord(topic01, key i, error i);producer.send(record);}producer.close();} } offset 提交控制 第一次连接消息队列 auto.offset.reset 设置 package cn.qww.f_offset_autocommit;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.Arrays; import java.util.Iterator; import java.util.Properties;public class AutoOffsetResetConfigConsumer {public static void main(String[] args) {//1.创建Kafka链接参数Properties props new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 第一次访问时 读到历史的消息 // props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,earliest); // props.put(ConsumerConfig.GROUP_ID_CONFIG, group01);props.put(ConsumerConfig.GROUP_ID_CONFIG, g2);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest);//2.创建Topic消费者KafkaConsumerString, String consumer new KafkaConsumer(props);//3.订阅topic开头的消息队列consumer.subscribe(Arrays.asList(topic01));while (true) {ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(1));IteratorConsumerRecordString, String recordIterator consumerRecords.iterator();while (recordIterator.hasNext()) {ConsumerRecordString, String record recordIterator.next();String key record.key();String value record.value();long offset record.offset();int partition record.partition();System.out.println(key: key ,value: value ,partition: partition ,offset: offset);}}} } 关闭自动提交 package cn.qww.f_offset_autocommit;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Properties; import java.util.regex.Pattern;public class CloseAutoCommitConsumer {public static void main(String[] args) {//1.创建Kafka链接参数Properties props new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, g4);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest);//2.创建Topic消费者KafkaConsumerString, String consumer new KafkaConsumer(props);consumer.subscribe(Arrays.asList(topic01));while (true) {ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(1));IteratorConsumerRecordString, String recordIterator consumerRecords.iterator();while (recordIterator.hasNext()) {ConsumerRecordString, String record recordIterator.next();String key record.key();String value record.value();long offset record.offset();int partition record.partition();MapTopicPartition, OffsetAndMetadata offsets new HashMapTopicPartition, OffsetAndMetadata();offsets.put(new TopicPartition(record.topic(), partition), new OffsetAndMetadata(offset 1));consumer.commitAsync(offsets, (offsets1, exception) - System.out.println(完成 offsets1 提交));System.out.println(key: key ,value: value ,partition: partition ,offset: offset);}}} } 调整自动提交间隔 auto.commit.interval.ms package cn.qww.f_offset_autocommit;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.Iterator; import java.util.Properties; import java.util.regex.Pattern;public class ModifyCommitIntervalConsumer {public static void main(String[] args) {//1.创建Kafka链接参数Properties propsnew Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG,group01);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,10000);//2.创建Topic消费者KafkaConsumerString,String consumernew KafkaConsumerString, String(props);//3.订阅topic开头的消息队列consumer.subscribe(Pattern.compile(^topic.*$));while (true){ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(1));IteratorConsumerRecordString, String recordIterator consumerRecords.iterator();while (recordIterator.hasNext()){ConsumerRecordString, String record recordIterator.next();String key record.key();String value record.value();long offset record.offset();int partition record.partition();System.out.println(key:key,value:value,partition:partition,offset:offset);}}} }生产者 package cn.qww.f_offset_autocommit;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class Producer {public static void main(String[] args) {//1.创建链接参数Properties props new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//2.创建生产者KafkaProducerString, String producer new KafkaProducerString, String(props);for (Integer i 0; i 10; i) {ProducerRecordString, String record new ProducerRecord(topic01, key i, value i);producer.send(record);}producer.close();} }确认-acks 与 重试-retries 生产者配置 acks 与 retries package cn.qww.g_acks_retries;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer;import java.io.IOException; import java.util.Properties;public class AckRetriesProducer {public static void main(String[] args) throws IOException {//1.创建链接参数Properties props new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1);props.put(ProducerConfig.ACKS_CONFIG, all);props.put(ProducerConfig.RETRIES_CONFIG, 4);//2.创建生产者KafkaProducerString, String producer new KafkaProducerString, String(props);ProducerRecordString, String record new ProducerRecord(topic04, key-ack, value-retries );producer.send(record);producer.flush();producer.close(); // System.in.read();} } 消费者 package cn.qww.g_acks_retries;import cn.qww.Config; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.Arrays; import java.util.Iterator; import java.util.Properties; import java.util.regex.Pattern;public class ConsumerTest {public static void main(String[] args) {//1.创建Kafka链接参数Properties props new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, group01);//2.创建Topic消费者KafkaConsumerString, String consumer new KafkaConsumer(props);consumer.subscribe(Arrays.asList(topic04));while (true) {ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(1));IteratorConsumerRecordString, String recordIterator consumerRecords.iterator();while (recordIterator.hasNext()) {ConsumerRecordString, String record recordIterator.next();String key record.key();String value record.value();long offset record.offset();int partition record.partition();System.out.println(key: key ,value: value ,partition: partition ,offset: offset);}}} } 幂等消息 生产者设置幂等消息enable.idempotencetrue package cn.qww.i_idempotence;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class IdempotenceProducer {public static void main(String[] args) {//1.创建链接参数Properties props new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1);props.put(ProducerConfig.ACKS_CONFIG, -1);props.put(ProducerConfig.RETRIES_CONFIG, 3);props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);//2.创建生产者KafkaProducerString, String producer new KafkaProducerString, String(props);ProducerRecordString, String record new ProducerRecord(topic04, key-idempotence, value-idempotence );producer.send(record);producer.flush();producer.close();} }消费者 package cn.qww.i_idempotence;import cn.qww.Config; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.Arrays; import java.util.Iterator; import java.util.Properties;public class ConsumerTest {public static void main(String[] args) {//1.创建Kafka链接参数Properties props new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, group01);//2.创建Topic消费者KafkaConsumerString, String consumer new KafkaConsumer(props);consumer.subscribe(Arrays.asList(topic04));while (true) {ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(1));IteratorConsumerRecordString, String recordIterator consumerRecords.iterator();while (recordIterator.hasNext()) {ConsumerRecordString, String record recordIterator.next();String key record.key();String value record.value();long offset record.offset();int partition record.partition();System.out.println(key: key ,value: value ,partition: partition ,offset: offset);}}} }生产者事务 生产者开启事务 package cn.qww.h_transaction;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties; import java.util.UUID;public class TxProducer {public static void main(String[] args) {//1.创建链接参数Properties props new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 事务 id 具有唯一性props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, tx-id UUID.randomUUID().toString());// 配置批处理size/*props.put(ProducerConfig.BATCH_SIZE_CONFIG, 20);// batch 中数据不足时等待时间 10msprops.put(ProducerConfig.LINGER_MS_CONFIG, 10);*///2.创建生产者KafkaProducerString, String producer new KafkaProducerString, String(props);producer.initTransactions();//初始化事务try {producer.beginTransaction();//开启事务for (Integer i 0; i 10; i) {if (i 7) {int k i / 0;System.out.println(k);}ProducerRecordString, String record new ProducerRecord(topic04, key_tx_ i, tx_val_ i);producer.send(record);producer.flush();}producer.commitTransaction();//提交事务} catch (Exception e) {e.printStackTrace();producer.abortTransaction();//终止事务}producer.close();} } 消费者 设置读已提交read_committed package cn.qww.h_transaction;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.Arrays; import java.util.Iterator; import java.util.Properties; import java.util.regex.Pattern;public class ReadCommittedConsumer {public static void main(String[] args) {//1.创建Kafka链接参数Properties propsnew Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG,group01);// 默认值 read_uncommitted // props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,read_committed);//2.创建Topic消费者KafkaConsumerString,String consumernew KafkaConsumerString, String(props);consumer.subscribe(Arrays.asList(topic04));while (true){ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(1));IteratorConsumerRecordString, String recordIterator consumerRecords.iterator();while (recordIterator.hasNext()){ConsumerRecordString, String record recordIterator.next();String key record.key();String value record.value();long offset record.offset();int partition record.partition();System.out.println(key:key,value:value,partition:partition,offset:offset);}}} } 生产者消费者事务 上游生产者—(msg)— 【topic04 中游业务 —(msg_processed)— topic02】 下游消费者 上游生产者 package cn.qww.h_transaction.conxpdr;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties; import java.util.UUID;public class SrcProducer {public static void main(String[] args) {//1.创建链接参数Properties props new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 事务 id 具有唯一性props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, tx-id UUID.randomUUID().toString());// 配置批处理size/*props.put(ProducerConfig.BATCH_SIZE_CONFIG, 20);// batch 中数据不足时等待时间 10msprops.put(ProducerConfig.LINGER_MS_CONFIG, 10);*///2.创建生产者KafkaProducerString, String producer new KafkaProducerString, String(props);producer.initTransactions();//初始化事务try {producer.beginTransaction();//开启事务for (Integer i 0; i 10; i) {/*if (i 7) {int k i / 0;System.out.println(k);}*/ProducerRecordString, String record new ProducerRecord(topic04, key_tx_ i, tx_val_ i);producer.send(record);producer.flush();}producer.commitTransaction();//提交事务} catch (Exception e) {e.printStackTrace();producer.abortTransaction();//终止事务}producer.close();} } 消费者用于测试上游生产者确实已经发送消息可以没有 package cn.qww.h_transaction.conxpdr;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.Arrays; import java.util.Iterator; import java.util.Properties;public class ReadCommittedConsumer {public static void main(String[] args) {//1.创建Kafka链接参数Properties propsnew Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG,group01);// 默认值 read_uncommitted // props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,read_committed);//2.创建Topic消费者KafkaConsumerString,String consumernew KafkaConsumerString, String(props);consumer.subscribe(Arrays.asList(topic04));while (true){ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(1));IteratorConsumerRecordString, String recordIterator consumerRecords.iterator();while (recordIterator.hasNext()){ConsumerRecordString, String record recordIterator.next();String key record.key();String value record.value();long offset record.offset();int partition record.partition();System.out.println(key:key,value:value,partition:partition,offset:offset);}}} } 中游业务从上游读取数据并将处理后的数据发送到下游topic整体在一个事务里处理 package cn.qww.h_transaction.conxpdr;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Properties;public class TxConsumerProducer {public static void main(String[] args) {Properties propsConsumernew Properties();propsConsumer.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);propsConsumer.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());propsConsumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());propsConsumer.put(ConsumerConfig.GROUP_ID_CONFIG,group02);propsConsumer.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);propsConsumer.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,read_committed);KafkaConsumerString, String consumer new KafkaConsumerString, String(propsConsumer);consumer.subscribe(Arrays.asList(topic04));Properties propsnew Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,tx-id);KafkaProducerString,String producer new KafkaProducerString, String(props);producer.initTransactions();//初始化事务try{while(true){ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(1));IteratorConsumerRecordString, String consumerRecordIterator consumerRecords.iterator();//开启事务控制producer.beginTransaction();MapTopicPartition, OffsetAndMetadata offsets new HashMap();while (consumerRecordIterator.hasNext()){ConsumerRecordString, String record consumerRecordIterator.next();//创建RecordProducerRecordString,String producerRecordnew ProducerRecordString,String(topic02,record.key(),record.value());producer.send(producerRecord);//记录元数据offsets.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset()1));}//提交事务producer.sendOffsetsToTransaction(offsets,group02);producer.commitTransaction();}}catch (Exception e){producer.abortTransaction();//终止事务}finally {producer.close();}} } 下游topic 的消费者 package cn.qww.h_transaction.conxpdr;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.Arrays; import java.util.Iterator; import java.util.Properties;public class DownStreamConsumer {public static void main(String[] args) {//1.创建Kafka链接参数Properties propsnew Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG,group01);// 默认值 read_uncommittedprops.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,read_committed);//2.创建Topic消费者KafkaConsumerString,String consumernew KafkaConsumerString, String(props);consumer.subscribe(Arrays.asList(topic02));while (true){ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(1));IteratorConsumerRecordString, String recordIterator consumerRecords.iterator();while (recordIterator.hasNext()){ConsumerRecordString, String record recordIterator.next();String key record.key();String value record.value();long offset record.offset();int partition record.partition();System.out.println(key:key,value:value,partition:partition,offset:offset);}}} }
http://www.pierceye.com/news/853056/

相关文章:

  • 建站的费用服务器搭建网站环境
  • 查看公司信息的网站旅游网站效果图
  • 娄底网站制作重庆专题片制作
  • 网站建设佰金手指科杰十七织梦淘客网站
  • 财务系统seo西安
  • 如何做好网站建设的关键重点网站地图那么建设
  • 打开山东城市建设职业学院网站自己网站做优化的有权利卖么
  • 境外电商网站建设sem推广优化
  • 五站合一自建网站制作网站用什么软件有哪些
  • 查法人信息的网站开发公司一季度汇报
  • 国外的购物网站有哪些安徽省住房和城乡建设厅官方网站
  • 网站策划需要什么能力网页游戏平台软件
  • phpmysql网站开发网络结构
  • 微官网和移动网站区别论坛网站建设多少钱
  • 怎么做公司网站优化凡科h5登录入口
  • 做电影网站如何推广方案房产网络平台
  • 站长工具 seo查询python爬数据做网站
  • 网站 底部医院网站建设的要求
  • asp网站静态化seo关键词排名优化软件怎么选
  • wordpress apache版本北京seo招聘
  • 南京玄武网站建设信息服务公司的经营范围有哪些
  • 旅游网站建设与翻译wordpress 显示作者
  • 网站建设与维护报告总结国家外汇管理局网站怎么做报告
  • 南沙区网站建设网站开发人员薪酬
  • 设计外贸英文网站简述网站开发的流程
  • 电商网站设计是干什么的如何建设cpa影视网站
  • wordpress设置阅读全文什么是seo搜索引擎优化
  • 网站名重复网站建设的经验之谈
  • 网站优化软件排名器有含义的公司名
  • 像wordpress一样的网站吗老徐蜂了网站策划书