当前位置: 首页 > news >正文

合肥网站设计哪家公司好上海企业名录大全黄页

合肥网站设计哪家公司好,上海企业名录大全黄页,成品网站seo,山东省建筑工程信息网Kraft模式安全认证 前章内容聊到了Kafka的Kraft集群的配置及使用。本篇再来说说kafka的安全认证方面的配置#xff0c;。 Kafka提供了多种方式来进行安全认证#xff0c;包括身份认证、授权和加密传输。一些常用的Kafka安全认证方式#xff1a; SSL/TLS#xff1a;使用S…Kraft模式安全认证 前章内容聊到了Kafka的Kraft集群的配置及使用。本篇再来说说kafka的安全认证方面的配置。 Kafka提供了多种方式来进行安全认证包括身份认证、授权和加密传输。一些常用的Kafka安全认证方式 SSL/TLS使用SSL/TLS协议来加密Kafka与客户端之间的通信保证数据的机密性和完整性。可以通过配置Kafka的SSL证书、密钥和信任的CA证书来启用SSL/TLS。客户端也需要使用相应的证书与Kafka进行通信。SASLSimple Authentication and Security Layer使用SASL进行身份认证。Kafka支持多种SASL机制如PLAIN、GSSAPI等。可以通过配置Kafka的SASL机制和用户凭证用户名和密码、密钥等来启用SASL身份认证。ACLAccess Control List使用ACL进行授权管理。ACL允许你配置哪些用户或组可以访问Kafka的哪些主题和分区并对其进行读取或写入权限的控制。ACL的配置可以在Kafka的配置文件中进行。 这些安全认证方式可以单独使用也可以组合使用以实现更高级别的安全性。为了配置Kafka的安全认证需要对Kafka和客户端进行相应的配置并生成所需的证书和凭证。 本文针对SASL进行身份认证 开始配置 服务器数量有限暂时使用单机部署kafka集群此文给予配置参考实际还是要按项目的真实情况去处理了。 准备3个kafka分别是kafka01、kafka02、kafka03分别到它们的config/kraft/server.properties中做配置 kafka01的server.properties process.rolesbroker,controller node.id1 controller.quorum.voters1localhost:19093,2localhost:29093,3localhost:39093 listenersSASL_PLAINTEXT://:19092,CONTROLLER://:19093 sasl.enabled.mechanismsPLAIN sasl.mechanism.inter.broker.protocolPLAIN inter.broker.listener.nameSASL_PLAINTEXT advertised.listenersSASL_PLAINTEXT://192.168.8.122:19092 controller.listener.namesCONTROLLER log.dirs/wlh/kafka01/datakafka02 process.rolesbroker,controller node.id2 controller.quorum.voters1localhost:19093,2localhost:29093,3localhost:39093 listenersSASL_PLAINTEXT://:29092,CONTROLLER://:29093 sasl.enabled.mechanismsPLAIN sasl.mechanism.inter.broker.protocolPLAIN inter.broker.listener.nameSASL_PLAINTEXT advertised.listenersSASL_PLAINTEXT://192.168.8.122:29092 controller.listener.namesCONTROLLER log.dirs/wlh/kafka02/datakafka03 process.rolesbroker,controller node.id3 controller.quorum.voters1localhost:19093,2localhost:29093,3localhost:39093 listenersSASL_PLAINTEXT://:39092,CONTROLLER://:39093 sasl.enabled.mechanismsPLAIN sasl.mechanism.inter.broker.protocolPLAIN inter.broker.listener.nameSASL_PLAINTEXT advertised.listenersSASL_PLAINTEXT://192.168.8.122:39092 controller.listener.namesCONTROLLER log.dirs/wlh/kafka03/data先确保你的kafka的数据目录是空的执行下删除(后面初始化时会自动创建目录) rm -rf /wlh/kafka01/data /wlh/kafka02/data /wlh/kafka03/data创建一个kafka sasl认证的服务配置 可以在kafka的config目录下新建一个kafka_server_jaas.conf文件然后认证信息写好 KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredserviceNamekafkausernameadminpasswordeystar8888user_kafkakafka1234; };上面的配置中声明了管理员为admin密码是eystar8888并且声明了一个用户名为kafka密码是kafka1234的用户客户端连接时使用用户为kafka可以成功进行认证。 而需要注意的是上面的配置中的分号;不能少否则就掉坑里了。 配置kafka服务的启动脚本 上面设置好sasl认证的配置后我们需要在kafka启动的服务脚本中将此配置加入进去。 可以直接修改bin/kafka-server-start.sh亦或者拷贝一份kafka-server-start.sh命名为kafka-server-start-saal.sh(名称自定义即可) export KAFKA_OPTS-Djava.security.auth.login.config/wlh/kafka01/config/kafka_server_jaas.confkafka02和kafka03同样这样配置好 export KAFKA_OPTS-Djava.security.auth.login.config/wlh/kafka02/config/kafka_server_jaas.confexport KAFKA_OPTS-Djava.security.auth.login.config/wlh/kafka03/config/kafka_server_jaas.conf开始执行启动kafka集群 # 生成一个uuid后面需要用 /wlh/kafka01/bin/kafka-storage.sh random-uuid# 格式化存储 /wlh/kafka01/bin/kafka-storage.sh format -t xtzWWN4bTjitpL3kfd9s5g -c /wlh/kafka01/config/kraft/server.properties /wlh/kafka02/bin/kafka-storage.sh format -t xtzWWN4bTjitpL3kfd9s5g -c /wlh/kafka02/config/kraft/server.properties /wlh/kafka03/bin/kafka-storage.sh format -t xtzWWN4bTjitpL3kfd9s5g -c /wlh/kafka03/config/kraft/server.properties开始启动kafka(-daemon后台启动) # 分别启动它们 /wlh/kafka01/bin/kafka-server-start-saal.sh -daemon /wlh/kafka01/config/kraft/server.properties /wlh/kafka02/bin/kafka-server-start-saal.sh -daemon /wlh/kafka02/config/kraft/server.properties /wlh/kafka03/bin/kafka-server-start-saal.sh -daemon /wlh/kafka03/config/kraft/server.properties服务启动完成。。。 Tip:服务器端口要打开服务器端口要打开端口打开或者关了防火墙也行。 使用java进行连接 无论是使用kafka的API还是直接使用spring集成kafka都是可以的。 我这里就采用kafka的API方式了。 导入kafka-clients依赖 dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion2.7.2/version /dependency在application.properties中配置相关属性注意spring.kafka.jaas-config是结尾是有一个分号;的若不写是连接不到kafka的。 spring.kafka.bootstrap-servers192.168.8.122:19092,192.168.8.122:29092,192.168.8.122:39092 spring.kafka.jaas-configorg.apache.kafka.common.security.plain.PlainLoginModule required usernameadmin passwordkafka1234; spring.kafka.topicstest在java配置类中进行接收并且创建生产者和消费者 package xxx.xxx.xxx; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component;import java.util.Properties;/*** author wlh* date 2023/8/10*/ ConditionalOnProperty(spring.kafka.bootstrap-servers) Component public class KafkaProperties {Value(${spring.kafka.bootstrap-servers})private String bootstrapServer;Value(${spring.kafka.jaas-config})private String jaasConfig;public static String topics;Value(${spring.kafka.topics})private void setTopics(String topics) {KafkaProperties.topics topics;}/*** 获取生产者配置** return 配置信息*/public Properties getProducerProperties() {Properties properties new Properties();properties.put(bootstrap.servers, bootstrapServer);String SERIALIZER org.apache.kafka.common.serialization.StringSerializer;properties.put(key.serializer, SERIALIZER);properties.put(value.serializer, SERIALIZER);fillSecurityProperties(properties);return properties;}// 消费者配置public Properties getConsumerProperties() {Properties properties new Properties();properties.put(bootstrap.servers, bootstrapServer);properties.put(group.id, test); // group.id可以自定义String DESERIALIZER org.apache.kafka.common.serialization.StringDeserializer;properties.put(key.deserializer, DESERIALIZER);properties.put(value.deserializer, DESERIALIZER);fillSecurityProperties(properties);return properties;}// 安全认证的配置private void fillSecurityProperties(Properties properties) {properties.setProperty(security.protocol, SecurityProtocol.SASL_PLAINTEXT.name);String SASL_MECHANISM PLAIN;properties.put(SaslConfigs.SASL_MECHANISM, SASL_MECHANISM);properties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);}} 创建生产者和消费者 package xxx.xxx.xxx; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;import java.util.Arrays; import java.util.Collections; import java.util.List;/*** author wlh* date 2023/08/10*/ ConditionalOnProperty(spring.kafka.bootstrap-servers) Slf4j RequiredArgsConstructor Configuration public class KafkaConfig {private final KafkaProperties kafkaProperties;// 创建生产者Beanpublic KafkaProducerString, String kafkaProducer() {return new KafkaProducer(kafkaProperties.getProducerProperties());}// 创建消费者Beanpublic KafkaConsumerString, String kafkaConsumer() {KafkaConsumerString, String kafkaConsumer new KafkaConsumer(kafkaProperties.getConsumerProperties());ListString topicList Collections.singletonList(test); // 这里写死了可自行扩展kafkaConsumer.subscribe(topicList);log.info(消息订阅成功! topic:{}, topicList);log.info(消费者配置{}, kafkaProperties.getConsumerProperties().toString());return kafkaConsumer;}}信息发送的Util工具类 package xxx.xxx.xxx;import com.alibaba.excel.util.StringUtils; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component;import java.util.Arrays; import java.util.Collections; import java.util.List;Component Slf4j public class KafkaSendUtil {AutowiredKafkaProducerString, String kafkaProducer;Asyncpublic void sendMsg(String topic, String msg) {ListString topics;if (StringUtils.isBlank(topic)) {topics Arrays.asList(KafkaProperties.topics.split(,));} else {topics Collections.singletonList(topic);}for (String sendTopic : topics) {ProducerRecordString, String record new ProducerRecord(sendTopic, msg);log.info(正在发送kafka数据数据{}, msg);kafkaProducer.send(record);}}}实例 简单做一个实例调通一下数据。监听方式可以不按照本文的本文只是做测试。 kafka消费者监听器 package xxx.xxx.xxx;import lombok.extern.slf4j.Slf4j; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component;import java.util.Arrays; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;Slf4j Component public class KafkaListener implements ApplicationRunner {public static ExecutorService executorService Executors.newFixedThreadPool(2);Overridepublic void run(ApplicationArguments args) {log.info(监听服务启动!);executorService.execute(() - {MessageHandler kafkaListenMessageHandler SpringBeanUtils.getBean(MessageHandler.class);kafkaListenMessageHandler.onMessage(SpringBeanUtils.getBean(kafkaConsumer), Arrays.asList(test)); // 这里是监听的kafka的topic这里写死了自己扩展即可});} }Bean的工具类 package com.bjmetro.top.global.kafka;import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component;SuppressWarnings(unchecked) Component public class SpringBeanUtils implements ApplicationContextAware {private static ApplicationContext applicationContext;Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {SpringBeanUtils.applicationContext applicationContext;}public static T T getBean(String beanName) {if (applicationContext.containsBean(beanName)) {return (T) applicationContext.getBean(beanName);} else {return null;}}public static T T getBean(ClassT clazz) {return applicationContext.getBean(clazz);} }消费者处理消息 package com.bjmetro.top.global.kafka;import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets; import java.util.List;Slf4j Component public class MessageHandler {void onMessage(KafkaConsumer kafkaConsumer, ListString topic) {log.info(队列开始监听topic {}, topic);while (true) {ConsumerRecordsString, String records kafkaConsumer.poll(1000);for (ConsumerRecordString, String record : records) {log.info(partition:{} offset {}, key {}, value {}, record.partition(), record.offset(), record.key(), record.value());try {String messageData new String(record.value().getBytes(), StandardCharsets.UTF_8);System.out.println(收到消息 messageData);} catch (Exception e) {log.error(消息处理异常);}}}}}做一个消息推送的接口 Autowired KafkaSendUtil sendUtil; PostMapping(/kafka/send) public ResponseResult sendKafka(RequestParam(msg) String msg) {sendUtil.sendMsg(null, msg); // 这里topic传空默认从application.properties中取了return new ResponseResult(ResponseConstant.CODE_OK, ResponseConstant.MSG_OK); }访问一下看消费者日志
http://www.pierceye.com/news/895709/

相关文章:

  • 免费自助建站郑州官网seo费用
  • 称心的常州网站建设wordpress怎么用两个主题
  • 建设银行北京分行网站做视频网站用什么服务器配置
  • 网站备案流程实名认证医疗网站建设资讯
  • 一个做问卷调查的网站好wordpress七比2
  • 西双版纳网站制作公司临沂企业网站建站模板
  • 培训做网站国内适合个人做外贸的网站有哪些
  • 我想卖自己做的鞋子 上哪个网站好中信银行网站怎么做的怎么烂
  • 在线网站建设工程标准godaddy 上传网站
  • 营销型网站方案ppt模板手机建站平台微点
  • 网站信息备案管理系统电商网页精品欣赏网站
  • 推广公司让实名认证怎么办系统优化设置
  • 公司网站 正式上线如何创建一个软件
  • app备案查询网站上海缪斯设计公司地址
  • 旅游小网站怎样做精不做全组建网站 多少钱
  • 天津城乡住房建设厅网站网站建设观点
  • 电子商务网站建设的认识tk网站免费
  • html网页设计网站开发报告企业做的网站费入什么科目
  • 网站建设辶金手指排名十三郑州经济技术开发区教师招聘公告
  • 企业网站建设课程体会西安网站制作定制
  • 网站主题服务公司管理软件免费版
  • 网站建设主要职责六安网站建设
  • wordpress电影站主题一般做兼职在哪个网站
  • 可信网站友链怎么做网站建设行业标准
  • 济南营销网站制作公司哪家好口碑好的家装前十强
  • 公司网站开发费账务处理做图表的网站推荐
  • 网站如何做好用户体验wordpress 文章类
  • 做采集网站的方法世界四大广告公司
  • 做断桥铝窗户的网站宿州推广公司
  • 网站优化制作东莞房价一览表