合肥网站设计哪家公司好,上海企业名录大全黄页,成品网站seo,山东省建筑工程信息网Kraft模式安全认证
前章内容聊到了Kafka的Kraft集群的配置及使用。本篇再来说说kafka的安全认证方面的配置#xff0c;。
Kafka提供了多种方式来进行安全认证#xff0c;包括身份认证、授权和加密传输。一些常用的Kafka安全认证方式#xff1a;
SSL/TLS#xff1a;使用S…Kraft模式安全认证
前章内容聊到了Kafka的Kraft集群的配置及使用。本篇再来说说kafka的安全认证方面的配置。
Kafka提供了多种方式来进行安全认证包括身份认证、授权和加密传输。一些常用的Kafka安全认证方式
SSL/TLS使用SSL/TLS协议来加密Kafka与客户端之间的通信保证数据的机密性和完整性。可以通过配置Kafka的SSL证书、密钥和信任的CA证书来启用SSL/TLS。客户端也需要使用相应的证书与Kafka进行通信。SASLSimple Authentication and Security Layer使用SASL进行身份认证。Kafka支持多种SASL机制如PLAIN、GSSAPI等。可以通过配置Kafka的SASL机制和用户凭证用户名和密码、密钥等来启用SASL身份认证。ACLAccess Control List使用ACL进行授权管理。ACL允许你配置哪些用户或组可以访问Kafka的哪些主题和分区并对其进行读取或写入权限的控制。ACL的配置可以在Kafka的配置文件中进行。
这些安全认证方式可以单独使用也可以组合使用以实现更高级别的安全性。为了配置Kafka的安全认证需要对Kafka和客户端进行相应的配置并生成所需的证书和凭证。
本文针对SASL进行身份认证
开始配置
服务器数量有限暂时使用单机部署kafka集群此文给予配置参考实际还是要按项目的真实情况去处理了。
准备3个kafka分别是kafka01、kafka02、kafka03分别到它们的config/kraft/server.properties中做配置
kafka01的server.properties
process.rolesbroker,controller
node.id1
controller.quorum.voters1localhost:19093,2localhost:29093,3localhost:39093
listenersSASL_PLAINTEXT://:19092,CONTROLLER://:19093
sasl.enabled.mechanismsPLAIN
sasl.mechanism.inter.broker.protocolPLAIN
inter.broker.listener.nameSASL_PLAINTEXT
advertised.listenersSASL_PLAINTEXT://192.168.8.122:19092
controller.listener.namesCONTROLLER
log.dirs/wlh/kafka01/datakafka02
process.rolesbroker,controller
node.id2
controller.quorum.voters1localhost:19093,2localhost:29093,3localhost:39093
listenersSASL_PLAINTEXT://:29092,CONTROLLER://:29093
sasl.enabled.mechanismsPLAIN
sasl.mechanism.inter.broker.protocolPLAIN
inter.broker.listener.nameSASL_PLAINTEXT
advertised.listenersSASL_PLAINTEXT://192.168.8.122:29092
controller.listener.namesCONTROLLER
log.dirs/wlh/kafka02/datakafka03
process.rolesbroker,controller
node.id3
controller.quorum.voters1localhost:19093,2localhost:29093,3localhost:39093
listenersSASL_PLAINTEXT://:39092,CONTROLLER://:39093
sasl.enabled.mechanismsPLAIN
sasl.mechanism.inter.broker.protocolPLAIN
inter.broker.listener.nameSASL_PLAINTEXT
advertised.listenersSASL_PLAINTEXT://192.168.8.122:39092
controller.listener.namesCONTROLLER
log.dirs/wlh/kafka03/data先确保你的kafka的数据目录是空的执行下删除(后面初始化时会自动创建目录)
rm -rf /wlh/kafka01/data /wlh/kafka02/data /wlh/kafka03/data创建一个kafka sasl认证的服务配置 可以在kafka的config目录下新建一个kafka_server_jaas.conf文件然后认证信息写好
KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredserviceNamekafkausernameadminpasswordeystar8888user_kafkakafka1234;
};上面的配置中声明了管理员为admin密码是eystar8888并且声明了一个用户名为kafka密码是kafka1234的用户客户端连接时使用用户为kafka可以成功进行认证。
而需要注意的是上面的配置中的分号;不能少否则就掉坑里了。 配置kafka服务的启动脚本 上面设置好sasl认证的配置后我们需要在kafka启动的服务脚本中将此配置加入进去。
可以直接修改bin/kafka-server-start.sh亦或者拷贝一份kafka-server-start.sh命名为kafka-server-start-saal.sh(名称自定义即可) export KAFKA_OPTS-Djava.security.auth.login.config/wlh/kafka01/config/kafka_server_jaas.confkafka02和kafka03同样这样配置好
export KAFKA_OPTS-Djava.security.auth.login.config/wlh/kafka02/config/kafka_server_jaas.confexport KAFKA_OPTS-Djava.security.auth.login.config/wlh/kafka03/config/kafka_server_jaas.conf开始执行启动kafka集群 # 生成一个uuid后面需要用
/wlh/kafka01/bin/kafka-storage.sh random-uuid# 格式化存储
/wlh/kafka01/bin/kafka-storage.sh format -t xtzWWN4bTjitpL3kfd9s5g -c /wlh/kafka01/config/kraft/server.properties
/wlh/kafka02/bin/kafka-storage.sh format -t xtzWWN4bTjitpL3kfd9s5g -c /wlh/kafka02/config/kraft/server.properties
/wlh/kafka03/bin/kafka-storage.sh format -t xtzWWN4bTjitpL3kfd9s5g -c /wlh/kafka03/config/kraft/server.properties开始启动kafka(-daemon后台启动)
# 分别启动它们
/wlh/kafka01/bin/kafka-server-start-saal.sh -daemon /wlh/kafka01/config/kraft/server.properties
/wlh/kafka02/bin/kafka-server-start-saal.sh -daemon /wlh/kafka02/config/kraft/server.properties
/wlh/kafka03/bin/kafka-server-start-saal.sh -daemon /wlh/kafka03/config/kraft/server.properties服务启动完成。。。
Tip:服务器端口要打开服务器端口要打开端口打开或者关了防火墙也行。
使用java进行连接
无论是使用kafka的API还是直接使用spring集成kafka都是可以的。
我这里就采用kafka的API方式了。
导入kafka-clients依赖
dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion2.7.2/version
/dependency在application.properties中配置相关属性注意spring.kafka.jaas-config是结尾是有一个分号;的若不写是连接不到kafka的。
spring.kafka.bootstrap-servers192.168.8.122:19092,192.168.8.122:29092,192.168.8.122:39092
spring.kafka.jaas-configorg.apache.kafka.common.security.plain.PlainLoginModule required usernameadmin passwordkafka1234;
spring.kafka.topicstest在java配置类中进行接收并且创建生产者和消费者
package xxx.xxx.xxx;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;import java.util.Properties;/*** author wlh* date 2023/8/10*/
ConditionalOnProperty(spring.kafka.bootstrap-servers)
Component
public class KafkaProperties {Value(${spring.kafka.bootstrap-servers})private String bootstrapServer;Value(${spring.kafka.jaas-config})private String jaasConfig;public static String topics;Value(${spring.kafka.topics})private void setTopics(String topics) {KafkaProperties.topics topics;}/*** 获取生产者配置** return 配置信息*/public Properties getProducerProperties() {Properties properties new Properties();properties.put(bootstrap.servers, bootstrapServer);String SERIALIZER org.apache.kafka.common.serialization.StringSerializer;properties.put(key.serializer, SERIALIZER);properties.put(value.serializer, SERIALIZER);fillSecurityProperties(properties);return properties;}// 消费者配置public Properties getConsumerProperties() {Properties properties new Properties();properties.put(bootstrap.servers, bootstrapServer);properties.put(group.id, test); // group.id可以自定义String DESERIALIZER org.apache.kafka.common.serialization.StringDeserializer;properties.put(key.deserializer, DESERIALIZER);properties.put(value.deserializer, DESERIALIZER);fillSecurityProperties(properties);return properties;}// 安全认证的配置private void fillSecurityProperties(Properties properties) {properties.setProperty(security.protocol, SecurityProtocol.SASL_PLAINTEXT.name);String SASL_MECHANISM PLAIN;properties.put(SaslConfigs.SASL_MECHANISM, SASL_MECHANISM);properties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);}}
创建生产者和消费者
package xxx.xxx.xxx;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.Arrays;
import java.util.Collections;
import java.util.List;/*** author wlh* date 2023/08/10*/
ConditionalOnProperty(spring.kafka.bootstrap-servers)
Slf4j
RequiredArgsConstructor
Configuration
public class KafkaConfig {private final KafkaProperties kafkaProperties;// 创建生产者Beanpublic KafkaProducerString, String kafkaProducer() {return new KafkaProducer(kafkaProperties.getProducerProperties());}// 创建消费者Beanpublic KafkaConsumerString, String kafkaConsumer() {KafkaConsumerString, String kafkaConsumer new KafkaConsumer(kafkaProperties.getConsumerProperties());ListString topicList Collections.singletonList(test); // 这里写死了可自行扩展kafkaConsumer.subscribe(topicList);log.info(消息订阅成功! topic:{}, topicList);log.info(消费者配置{}, kafkaProperties.getConsumerProperties().toString());return kafkaConsumer;}}信息发送的Util工具类
package xxx.xxx.xxx;import com.alibaba.excel.util.StringUtils;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;import java.util.Arrays;
import java.util.Collections;
import java.util.List;Component
Slf4j
public class KafkaSendUtil {AutowiredKafkaProducerString, String kafkaProducer;Asyncpublic void sendMsg(String topic, String msg) {ListString topics;if (StringUtils.isBlank(topic)) {topics Arrays.asList(KafkaProperties.topics.split(,));} else {topics Collections.singletonList(topic);}for (String sendTopic : topics) {ProducerRecordString, String record new ProducerRecord(sendTopic, msg);log.info(正在发送kafka数据数据{}, msg);kafkaProducer.send(record);}}}实例
简单做一个实例调通一下数据。监听方式可以不按照本文的本文只是做测试。
kafka消费者监听器
package xxx.xxx.xxx;import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;Slf4j
Component
public class KafkaListener implements ApplicationRunner {public static ExecutorService executorService Executors.newFixedThreadPool(2);Overridepublic void run(ApplicationArguments args) {log.info(监听服务启动!);executorService.execute(() - {MessageHandler kafkaListenMessageHandler SpringBeanUtils.getBean(MessageHandler.class);kafkaListenMessageHandler.onMessage(SpringBeanUtils.getBean(kafkaConsumer), Arrays.asList(test)); // 这里是监听的kafka的topic这里写死了自己扩展即可});}
}Bean的工具类
package com.bjmetro.top.global.kafka;import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;SuppressWarnings(unchecked)
Component
public class SpringBeanUtils implements ApplicationContextAware {private static ApplicationContext applicationContext;Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {SpringBeanUtils.applicationContext applicationContext;}public static T T getBean(String beanName) {if (applicationContext.containsBean(beanName)) {return (T) applicationContext.getBean(beanName);} else {return null;}}public static T T getBean(ClassT clazz) {return applicationContext.getBean(clazz);}
}消费者处理消息
package com.bjmetro.top.global.kafka;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;
import java.util.List;Slf4j
Component
public class MessageHandler {void onMessage(KafkaConsumer kafkaConsumer, ListString topic) {log.info(队列开始监听topic {}, topic);while (true) {ConsumerRecordsString, String records kafkaConsumer.poll(1000);for (ConsumerRecordString, String record : records) {log.info(partition:{} offset {}, key {}, value {}, record.partition(), record.offset(), record.key(), record.value());try {String messageData new String(record.value().getBytes(), StandardCharsets.UTF_8);System.out.println(收到消息 messageData);} catch (Exception e) {log.error(消息处理异常);}}}}}做一个消息推送的接口
Autowired
KafkaSendUtil sendUtil;
PostMapping(/kafka/send)
public ResponseResult sendKafka(RequestParam(msg) String msg) {sendUtil.sendMsg(null, msg); // 这里topic传空默认从application.properties中取了return new ResponseResult(ResponseConstant.CODE_OK, ResponseConstant.MSG_OK);
}访问一下看消费者日志