北京网站建设咸宁,缙云建设局网站,应用市场免费下载安装,如何加强校园网站建设前言
下面是zookeeper和kafka的官网下载地址#xff0c;大家可以学习下载
zookeeper下载地址#xff1a;http://zookeeper.apache.org/releases.html
kafka下载地址#xff1a;http://kafka.apache.org/downloads.html
1、添加依赖
在 pom.xml 文件中添加kafka依赖大家可以学习下载
zookeeper下载地址http://zookeeper.apache.org/releases.html
kafka下载地址http://kafka.apache.org/downloads.html
1、添加依赖
在 pom.xml 文件中添加kafka依赖依赖如下 dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependency2、配置Kafka信息
在 application.properties或 application.yml文件中配置 Kafka 的相关信息下面是一个简单的示例
#kafka地址多个地址使用,分隔
spring.kafka.bootstrap-servers127.0.0.1:9092
#消费者组ID
spring.kafka.consumer.group-idmyGroup
#序列化和反序列化
spring.kafka.consumer.key-deserializerorg.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializerorg.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializerorg.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializerorg.apache.kafka.common.serialization.StringSerializer3、发送消息
因为我们是springboot项目已经集成了KafkaTemplate我们可以直接使用KafkaTemplate来发送消息
下面我编写一个发送消息的生产者
/*** 消息生产者*/
Component
Slf4j
public class KafkaProducer {Autowiredprivate KafkaTemplateString,String kafkaTemplate;/*** 发送消息* param topic 主题* param msg 消息*/public void send(String topic,String msg){kafkaTemplate.send(topic,msg).addCallback(new ListenableFutureCallbackSendResultString, String() {Overridepublic void onFailure(Throwable ex) {log.error(发送消息失败{}, ex);}Overridepublic void onSuccess(SendResultString, String result) {log.info(发送消息成功:{});}});}/*** 发送消息* param topic* param msg*/public void send(String topic, Object msg) {send(topic, JSONObject.toJSONString(msg));}}编写好生产者之后我们就可以使用生产者发送消息如下 Autowiredprivate KafkaProducer kafkaProducer;GetMapping(send)public void sendMsg(){kafkaProducer.send(my-topic,hello world);}如果想定制KafkaTemplate那么可以在配置类进行配置如下所示
Configuration
public class KafakaConfig {Value(${spring.kafka.bootstrap-servers})private String bootstrapServers;/*** 配置属性* return*/Beanpublic MapString, Object producerConfigs() {MapString, Object props new HashMap();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);return props;}Beanpublic ProducerFactoryString, String producerFactory() {return new DefaultKafkaProducerFactory(producerConfigs());}/*** 定制KafkaTemplate* return*/Beanpublic KafkaTemplateString, String kafkaTemplate() {KafkaTemplateString, String kafkaTemplate new KafkaTemplate(producerFactory());kafkaTemplate.setDefaultTopic(myGroup);return kafkaTemplate;}}4、消费消息
使用 KafkaListener 注解创建 Kafka 消费者并监听指定的主题。接收到消息后可以通过方法参数来接收消息
Slf4j
Component
public class KafkaConsumer {/*** 消费my-topic主题的消息* param message*/KafkaListener(topics my-topic,groupId myGroup1)public void receiveMessage(String message){log.info(消费消息message);}
}同一消费者组只会有一个消费者进行消费如果想配置多个消费者同时处理可以使用 KafkaListener 注解来配置多个消费者。每个消费者需要配置不同的 group-id监听主题一致如下所示就会有两个消费者同时消费
Slf4j
Component
public class KafkaConsumer {KafkaListener(topics my-topic,groupId myGroup1)public void receiveMessage(String message){log.info(消费消息message);}KafkaListener(topics my-topic,groupId myGroup2)public void receiveMessage2(String message){log.info(消费消息message);}}