Preface

Kafka depends heavily on ZooKeeper, so a ZooKeeper cluster must be set up first. ZooKeeper is written in Java and runs on the JVM, so a Java environment is the first prerequisite.

(ps1: This tutorial assumes your CentOS system already has network access; configuring IPs is not covered here.)
(ps2: If you don't have wget, install it first: yum install wget)
(ps3: Stay organized. Put one thing here and another there, and before long you won't remember where anything is installed. Everything downloaded in this tutorial goes under /usr/local.)
(ps4: Kafka ships with a built-in ZooKeeper, so you could probably skip the ZooKeeper section, but I configure a standalone cluster here anyway. I haven't tried the built-in one.)

I. Configure the JDK

Oracle does not allow the JDK package to be downloaded from its website directly via wget. If you wget the address below, what you get is a 5 KB web page, not the JDK package. (Monopolies can do as they please.) Run java -version first to check whether a JDK is already installed; my system didn't have one.

1. Download from the official site

The official download page for JDK 8 is:

https://www.oracle.com/technetwork/java/javase/downloads/java-archive-javase8u211-later-5573849.html

2. Upload and extract

Upload the package to /usr/local on the server (I used Xftp), then extract it and rename the extracted folder:

```
tar -zxvf jdk-8u221-linux-x64.tar.gz
mv jdk1.8.0_221 jdk1.8
```

3. Configure environment variables

```
vim /etc/profile
```

Append the following:

```
#java environment
export JAVA_HOME=/usr/local/jdk1.8
export CLASSPATH=.:${JAVA_HOME}/jre/lib/rt.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar
export PATH=$PATH:${JAVA_HOME}/bin
```

Then run this command to apply the changes:

```
source /etc/profile
```

II. Set up the ZooKeeper cluster

1. Download ZooKeeper

Create a zookeeper directory and download into it. If the connection is refused at this step, retry a few times; mine only succeeded on the second request.

```
mkdir /usr/local/zookeeper
cd /usr/local/zookeeper
wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
```

When the download completes, extract it and rename it to zookeeper1 (we will copy it for the second and third instances once it is configured):

```
tar -zxvf zookeeper-3.4.6.tar.gz
mv zookeeper-3.4.6 zookeeper1
```

2. Create the data and logs folders

Create both under the zookeeper1 directory, then create a myid file under data whose content is 1:

```
mkdir /usr/local/zookeeper/zookeeper1/data /usr/local/zookeeper/zookeeper1/logs
echo 1 > /usr/local/zookeeper/zookeeper1/data/myid
```

3. Modify zoo.cfg

```
cd /usr/local/zookeeper/zookeeper1/conf/
cp zoo_sample.cfg zoo.cfg
```

After these two steps the zoo.cfg file exists. Edit it so that it contains:

```
dataDir=/usr/local/zookeeper/zookeeper1/data
dataLogDir=/usr/local/zookeeper/zookeeper1/logs
server.1=192.168.233.11:2888:3888
server.2=192.168.233.11:2889:3889
server.3=192.168.233.11:2890:3890
```

4. Set up zookeeper2

First copy and rename:

```
cd /usr/local/zookeeper/
cp -r zookeeper1 zookeeper2
```

Then adjust the copied configuration:

```
vim zookeeper2/conf/zoo.cfg
```

Change the trailing 1 to 2 in three places: the dataDir path, the dataLogDir path, and clientPort (2181 becomes 2182).

```
vim zookeeper2/data/myid
```

Change the value in myid to 2 as well.

5. Set up zookeeper3

Same as above: copy and rename.

```
cp -r zookeeper1 zookeeper3
vim zookeeper3/conf/zoo.cfg
```

Change the same three places to 3 (clientPort becomes 2183).

```
vim zookeeper3/data/myid
```

Change the value to 3.

6. Test the ZooKeeper cluster

```
cd /usr/local/zookeeper/zookeeper1/bin/
```

Since starting everything takes quite a few commands, here is a simple startup script:

```
vim start
```

The content of start:

```
cd /usr/local/zookeeper/zookeeper1/bin/
./zkServer.sh start ../conf/zoo.cfg
cd /usr/local/zookeeper/zookeeper2/bin/
./zkServer.sh start ../conf/zoo.cfg
cd /usr/local/zookeeper/zookeeper3/bin/
./zkServer.sh start ../conf/zoo.cfg
```

And a connection script:

```
vim login
```

The content of login:

```
./zkCli.sh -server 192.168.233.11:2181,192.168.233.11:2182,192.168.233.11:2183
```

With the scripts in place, start everything:

```
sh start
sh login
```

The cluster is now up, and that wraps up ZooKeeper. Since zkCli occupies the current terminal, open a new SSH tab (in Xshell, right-click the tab and create a new SSH channel) and continue with Kafka in the new window.

III. Set up the Kafka cluster

1. Download Kafka

First create a kafka directory, then download into it:

```
mkdir /usr/local/kafka
cd /usr/local/kafka/
wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
```

After the download succeeds, extract it:

```
tar -zxvf kafka_2.11-1.1.0.tgz
```

2. Modify the cluster configuration

First enter the config directory:

```
cd /usr/local/kafka/kafka_2.11-1.1.0/config
```

Modify server.properties so that it contains:

```
broker.id=0
log.dirs=/tmp/kafka-logs
listeners=PLAINTEXT://192.168.233.11:9092
```

Make two copies of server.properties:

```
cp server.properties server2.properties
cp server.properties server3.properties
```

Modify server2.properties (vim server2.properties); the key changes are:

```
broker.id=1
log.dirs=/tmp/kafka-logs1
listeners=PLAINTEXT://192.168.233.11:9093
```

Modify server3.properties the same way, with:

```
broker.id=2
log.dirs=/tmp/kafka-logs2
listeners=PLAINTEXT://192.168.233.11:9094
```

3. Start Kafka

Again, write a small script in the bin directory:

```
cd ../bin/
vim start
```

The script content (each broker is started in the background):

```
./kafka-server-start.sh ../config/server.properties &
./kafka-server-start.sh ../config/server2.properties &
./kafka-server-start.sh ../config/server3.properties &
```

Running jps should now show three Kafka processes.

4. Create a topic

```
cd /usr/local/kafka/kafka_2.11-1.1.0
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
```
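As a side note, the same topic can also be created from Java code via the AdminClient API that ships with kafka-clients. Below is a minimal sketch; the class name is my own, and the broker list simply mirrors the listeners configured above.

```
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // The three broker addresses configured above (assumption: same host/ports as this tutorial)
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.233.11:9092,192.168.233.11:9093,192.168.233.11:9094");
        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 3 -- same as the CLI command above
            NewTopic topic = new NewTopic("my-replicated-topic", 1, (short) 3);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
```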
When the topic is created, Kafka prints a few log lines, and in the running ZooKeeper session you can look the topic up:

```
ls /brokers/topics
```

Check the topic's state:

```
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
```

You can see there are three brokers: 1, 2, and 0. The leader is broker 1; since there is only one partition, it is partition 0. Replicas (the replica list) is 1,2,0, and ISR (in-sync, i.e. the currently alive replicas) is also 1,2,0.

5. Start a producer

```
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
```

(In the console producer you cannot use backspace or the arrow keys to edit, so the typed lines can look a bit messy. Em...)

6. Start a consumer

```
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
```

As you can see, the consumer starts consuming automatically as soon as it is up. Producing one more message on the producer side shows it being captured right away.

IV. Integrate with Spring Boot

First, consult the spring-kafka compatibility matrix (a table of spring-kafka versions against kafka-clients versions); if the versions don't match, Spring Boot will throw an exception at startup. (ps: I have already walked every wrong path for you o(╥﹏╥)o. My kafka-clients is 1.1.0 and my spring-kafka is 2.2.2; the middle column can be ignored for now.)

Back to the topic: after two hours of fiddling it finally works, and I almost cried... The problems were essentially all jar version mismatches. I have adjusted the steps above accordingly, so hopefully you can get through this tutorial in a single pass.

1. The pom file

```
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.1.RELEASE</version>
        <relativePath/>
    </parent>
    <groupId>com.gzky</groupId>
    <artifactId>study</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>study</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-redis</artifactId>
            <version>1.3.8.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.2.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
```

The crucial parts of the pom are these two versions:

```
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.1.RELEASE</version>
    <relativePath/>
</parent>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.0.RELEASE</version>
</dependency>
```

2. application.yml

```
spring:
  redis:
    cluster:
      # key TTL: keys are automatically deleted once they expire
      expire-seconds: 120
      # command timeout; an error is raised when it is exceeded
      command-timeout: 5000
      # redis cluster nodes; domain names are resolved to obtain the addresses
      nodes: 192.168.233.11:9001,192.168.233.11:9002,192.168.233.11:9003,192.168.233.11:9004,192.168.233.11:9005,192.168.233.11:9006
  kafka:
    # kafka broker addresses (can be multiple)
    bootstrap-servers: 192.168.233.11:9092,192.168.233.11:9093,192.168.233.11:9094
    producer:
      retries: 0
      # number of messages sent per batch
      batch-size: 16384
      buffer-memory: 33554432
      # serializers for the message key and body
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # default consumer group id
      group-id: test-group
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 100
      # deserializers for the message key and body
      # (note: the consumer side takes deserializers, not serializers)
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

server:
  port: 8085
  servlet:
    #context-path: /redis
    context-path: /kafka
```

If you haven't set up Redis, just delete the spring.redis section. If you want to learn how to configure a Redis cluster, see 《Redis集群redis-cluster的搭建及集成springboot》.
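As an aside, if you prefer Java configuration over application.yml, the producer side of the yml above roughly translates into the following @Configuration beans. This is a minimal sketch: the class name and package are hypothetical, and the values simply mirror the yml.

```
package com.gzky.study.config;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Mirrors spring.kafka.bootstrap-servers and producer.* from application.yml
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.233.11:9092,192.168.233.11:9093,192.168.233.11:9094");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```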
3. The producer

```
package com.gzky.study.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * Kafka producer utility class
 *
 * @author biws
 * @date 2019/12/17
 **/
@Component
public class KfkaProducer {

    private static Logger logger = LoggerFactory.getLogger(KfkaProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Produce data
     * @param str the message payload
     */
    public void send(String str) {
        logger.info("Producing message: " + str);
        kafkaTemplate.send("testTopic", str);
    }
}
```

4. The consumer

```
package com.gzky.study.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Kafka consumer: listens for messages
 *
 * @author biws
 * @date 2019/12/17
 **/
@Component
public class KafkaConsumerListener {

    private static Logger logger = LoggerFactory.getLogger(KafkaConsumerListener.class);

    @KafkaListener(topics = "testTopic")
    public void onMessage(String str) {
        //insert(str); // insert into the database here if needed
        logger.info("Received: " + str);
        System.out.println("Received: " + str);
    }
}
```

5. The external interface

```
package com.gzky.study.controller;

import com.gzky.study.utils.KfkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * Kafka REST interface
 *
 * @author biws
 * @date 2019/12/17
 **/
@RestController
public class KafkaController {

    @Autowired
    KfkaProducer kfkaProducer;

    /**
     * Produce a message
     * @param str the message payload
     * @return true on success
     */
    @RequestMapping(value = "/sendKafkaWithTestTopic", method = RequestMethod.GET)
    @ResponseBody
    public boolean sendTopic(@RequestParam String str) {
        kfkaProducer.send(str);
        return true;
    }
}
```

6. Testing with Postman

First start a console listener on the server (from the Kafka root directory). The command below must use the concrete server IP, not localhost; that is a pitfall I fell into. I recommend restarting the cluster at this point. To stop Kafka:

```
cd /usr/local/kafka/kafka_2.11-1.1.0/bin
./kafka-server-stop.sh ../config/server.properties
./kafka-server-stop.sh ../config/server2.properties
./kafka-server-stop.sh ../config/server3.properties
```

Check with jps here and wait until all the Kafka processes are gone (kill any that won't stop), then start Kafka again:

```
./kafka-server-start.sh ../config/server.properties &
./kafka-server-start.sh ../config/server2.properties &
./kafka-server-start.sh ../config/server3.properties &
```

Once Kafka is up, start a consumer listening on the topic:

```
cd /usr/local/kafka/kafka_2.11-1.1.0
bin/kafka-console-consumer.sh --bootstrap-server 192.168.233.11:9092 --from-beginning --topic testTopic
```

All the test messages I had typed earlier were picked up. Now start the Spring Boot service, produce a message with Postman, and enjoy the result: the message is received both by the server-side console consumer and by the listener inside the project.
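If you'd rather verify the endpoint without Postman, here is a minimal sketch that calls the controller with plain java.net classes. The class name is hypothetical, and it assumes the app runs locally on port 8085 with context-path /kafka, as configured above.

```
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;

public class SendKafkaTest {
    public static void main(String[] args) throws Exception {
        String msg = URLEncoder.encode("hello from java", "UTF-8");
        // Same GET endpoint that Postman would hit
        URL url = new URL("http://localhost:8085/kafka/sendKafkaWithTestTopic?str=" + msg);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream()))) {
            // Prints "true" on success; the message should then appear
            // in both the console consumer and the @KafkaListener log
            System.out.println(in.readLine());
        }
    }
}
```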