开发购物平台网站费用,智能网站建设商家,wordpress 1g内存够吗,网站建设就业培训目录
1. kafka 下载
2. 修改配置文件
2.1 文件夹内容
2.2 创建一个 data 空文件夹
2.3 修改 zookeeper.properties 配置文件
2.4 修改 server.properties 配置文件
2.5 创建 zk.cmd windows脚本文件
2.6 创建 kfk.cmd windows脚本文件
3. 启动…目录
1. kafka 下载
2. 修改配置文件
2.1 文件夹内容
2.2 创建一个 data 空文件夹
2.3 修改 zookeeper.properties 配置文件
2.4 修改 server.properties 配置文件
2.5 创建 zk.cmd windows脚本文件
2.6 创建 kfk.cmd windows脚本文件
3. 启动 kafka
4. 创建主题生产者消费者演示
4.1 创建 topic 主题
4.2 命令行创建生产者
4.3 命令行创建消费者
4.4 生产者发送消息供消费者消费
5. demo 书写
5.1 创建项目
5.2 引入依赖 5.3 生产者测试类代码书写
5.4 消费者测试类代码书写 1. kafka 下载
本片是小白入门篇所以我们不以Linux操作系统为例选择大多数小白都用的windows。
ksfka下载链接如下点击链接进入官网即可下载
温馨提示JDK版本至少需要1.8高版本也可兼容
Apache Kafkahttps://kafka.apache.org/downloads本篇中以kafka_2.1.3-3.6.1版本为例直接点击对应的版本下载即可tgz包就类似于我们常见的zip下载完成之后解压即可。 下载完毕我们就可以解压得到 kafka 了 解压之后就可以得到 kafka 文件了 2. 修改配置文件
2.1 文件夹内容
打开文件夹后可以发现内部含有bin文件夹config配置夹libs依赖夹等和JDKmaven 问价夹的格式如出一辙 2.2 创建一个 data 空文件夹
后续需要用来存放日志文件只要创建完成就可以了kafka启动后会自动生成日志文件 2.3 修改 zookeeper.properties 配置文件
我们点击进入config文件夹找到 zookeeper.properties 配置文件双击进行修改 然后我们找到 dataDir 将它的值修改为我们刚才创建的 data 文件的路径还要注意一点在后面还要多加一个 /zk因为一会还要配置 server.properties 所以要用将她们两个区分开 2.4 修改 server.properties 配置文件
和刚才一样我们双击修改 server.properties 配置文件 我们修改 log.dirs 的值为刚才创建的 data 文件夹的路径在路径末尾再添加上 /kafka 用来和刚才的zk做区分kafka 文件夹用来存放kafka的日志文件zk 文件夹用来存放zoopeeper的日志文件 2.5 创建 zk.cmd windows脚本文件 以记事本的方式打开然后加入下面这句话
这句话的含义就是启动 Zookeeper 并且启动文件为 zookeeper.properties
call bin/windows/zookeeper-server-start.bat config/zookeeper.properties
2.6 创建 kfk.cmd windows脚本文件
仍然以记事本的方式打开然后加入下面这句话
这句话的含义就是启动 kafka 并且启动文件为 server.properties
call bin/windows/kafka-server-start.bat config/server.properties
此时我们的 kafka 文件夹中就多了我们刚刚创建的 data 文件夹kafka.cmd 脚本文件zk.cmd 脚本文件 3. 启动 kafka
经过第二部的配置现在一切都已经准备就绪我们只需要双击 zk.cmd 和 kafka.cmd 脚本文件启动kafka
这里需要注意一点必须先启动双击 zk.cmd 启动 zookeeper
再双击 kafka.cmd 启动 kafka关闭的时候需要先关闭 kafka再关闭 zookeeper 4. 创建主题生产者消费者演示
4.1 创建 topic 主题
我们来到 bin 文件夹下的 windows 文件夹打开 cmd 命令窗口运行下方命令
# --bootstrap-server localhost:9092 配置服务器连接此处为本机9092为kafka默认端口号
# --topic test 创建topic主题主题名称为 test
# --create 创建topic主题命令
kafka-topics.bat --bootstrap-server localhost:9092 --topic test --create 4.2 命令行创建生产者
仍然是在 windows 文件夹下新建一个命令窗口刚才我们已经创建出了名为 topic 的主题现在运行如下命令启动脚本文件创建生产者连接上我们的 topic 主题
# 运行 kafka-console-producer.bat 脚本创建生产者连接本机9092端口名为 test 的主题
kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test 运行如下所示会出现一个小箭头就说明我们链接主题成功我们生产者发布的主题都会发送到 topic 主题中供消费者去消费使用 4.3 命令行创建消费者
# 运行 kafka-console-consumer.bat 脚本创建消费者连接本机9092端口名为 test 的主题
windowskafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
运行成功不会有任何显示 4.4 生产者发送消息供消费者消费
如下图所示我在生产者命令窗口输入 hello kafka点击回车我们就可以在消费者中命令窗口中看到发送过来的 hello kafka 消息 5. demo 书写
5.1 创建项目 5.2 引入依赖
在 maven 项目的 pom.xml 文件中加入下方依赖
dependenciesdependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion3.6.1/version/dependency/dependencies 5.3 生产者测试类代码书写
我们随便创建一个类即可名字随意取代码逻辑备有注释
public class KafkaProducerTest {public static void main(String[] args) {// TODO 创建配置对象// 创建生产者对象又分为两步// 1. 创建配置对象集合MapString, Object configMap new HashMap();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,localhost:9092);// 2. 配置数据 Key-Value 的序列化方式configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// TODO 创建生产者对象// 将配置对象集合作为参数传入// 返回值就是生产者对象KafkaProducerString, String kafkaProducer new KafkaProducerString, String(configMap);// TODO 创建数据// 自己定义一个数据传入三个参数第一个参数为主题第二个参数为数据的key,第三个参数为数据的valueProducerRecordString,String record new ProducerRecord(test,first,hello kafka);// TODO 发送数据kafkaProducer.send(record);// TODO 关闭生产者对象kafkaProducer.close();}
}
5.4 消费者测试类代码书写
public class KafkaConsumerTest {public static void main(String[] args) {// TODO 创建消费者配置对象// 创建消费者配置对象集合HashMapString, Object consumerConfig new HashMap();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,localhost:9092);consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// TODO 创建消费者对象KafkaConsumerString,String consumer new KafkaConsumerString, String(consumerConfig);// TODO 订阅消费主体主题名称为 testconsumer.subscribe(Collections.singletonList(test));// TODO 从主题中获取数据消费final ConsumerRecordsString, String datas consumer.poll(100);for (ConsumerRecordString,String data : datas){System.out.println(data);}// TODO 关闭消费者对象consumer.close();}
}