当前位置: 首页 > news >正文

网站建设属于什么类目手机网站开发需要哪些人才

网站建设属于什么类目,手机网站开发需要哪些人才,爱心互助网站开发,做百度网站优化多少钱这篇文章简单介绍如何在ubuntu上安装kafka#xff0c;并使用kafka完成消息的发送和接收。 一、安装kafka 访问kafka官网Apache Kafka#xff0c;然后点击快速开始 紧接着#xff0c;点击Download 最后点击下载链接下载安装包 如果下载缓慢#xff0c;博主已经把安装包上传… 这篇文章简单介绍如何在ubuntu上安装kafka并使用kafka完成消息的发送和接收。 一、安装kafka 访问kafka官网Apache Kafka然后点击快速开始 紧接着点击Download 最后点击下载链接下载安装包 如果下载缓慢博主已经把安装包上传到百度网盘 链接https://pan.baidu.com/s/1nZ1duIt64ZVUsimaQ1meZA?pwd3aoh 提取码3aoh --来自百度网盘超级会员V3的分享 二、启动kafka 经过上一步下载完成后按照页面的提示启动kafka 1、通过远程连接工具如finalshell、xshell上传kafka_2.13-3.6.0.tgz到服务器上的usr目录 2、切换到usr目录解压kafka_2.13-3.6.0.tgz cd /usrtar -zxzf kafka_2.13-3.6.0.tgz 3、启动zookeeper 修改配置文件confg/zookeeper.properties修改一下数据目录 dataDir/usr/local/zookeeper 然后通过以下命令启动kafka自带的zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties 4、启动kafka 修改配置文件confg/server.properties修改一下kafka保存日志的目录 log.dirs/usr/local/kafka/logs 然后新开一个连接窗口通过以下命令启动kafka bin/kafka-server-start.sh config/server.properties 三、kafka发送、接收消息 创建topic bin/kafka-topics.sh --create --topic hello --bootstrap-server localhost:9092 生产消息 往刚刚创建的topic里发送消息可以一次性发送多条消息点击CtrlC完成发送 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hello 消费消息 消费最新的消息 新开一个连接窗口在命令行输入以下命令拉取topic为hello上的消息 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello 消费之前的消息 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic hello 指定偏移量消费 指定从第几条消息开始消费这里--offset参数设置的偏移量是从0开始的。 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --partition 0 --offset 1 --topic hello 消息的分组消费 每个消费者都可以指定一个消费者组 kafka 中的同一条消息只能被同一个消费者组下的某一个消费 者消费。而不属于同一个消费者组的其他消费者也可以消费到这一条消息。 通过以下命令在启动消费者时设置分组 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.idhelloGroup --topic hello 四、Java中使用kafka 通过maven官网搜索kafka的maven依赖版本 https://central.sonatype.com/search?qkafkahttps://central.sonatype.com/search?qkafka然后通过IntelliJ IDEA创建一个maven项目kafka在pom.xml中添加kafka的依赖 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdorg.example/groupIdartifactIdkafka/artifactIdversion1.0-SNAPSHOT/versionpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.target/propertiesdependenciesdependencygroupIdorg.apache.kafka/groupIdartifactIdkafka_2.12/artifactIdversion3.6.0/version/dependency/dependencies /project 创建消息生产者 生产者工厂类 package producer;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig;import java.util.Properties;/*** 消息生产者工厂类* author heyunlin* version 1.0*/ public class MessageProducerFactory {private static final String BOOTSTRAP_SERVERS 192.168.254.128:9092;public static ProducerString, String getProducer() {//PART1:设置发送者相关属性Properties props new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);return new KafkaProducer(props);}} 测试发送消息 package producer;import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata;/*** author heyunlin* version 1.0*/ public class MessageProducer {private static final String TOPIC hello;public static void main(String[] args) {ProducerRecordString, String record new ProducerRecord(TOPIC, 1, Message From Producer.);ProducerString, String producer MessageProducerFactory.getProducer();// 同步发送消息producer.send(record);// 异步发送消息producer.send(record, new Callback() {Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {String topic recordMetadata.topic();long offset recordMetadata.offset();int partition recordMetadata.partition();String message recordMetadata.toString();System.out.println(topic topic);System.out.println(offset offset);System.out.println(message message);System.out.println(partition partition);}});// 加上这行代码才会发送消息producer.close();}} 创建消息消费者 消费者工厂类 package consumer;import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;/*** 消息生产者工厂类* author heyunlin* version 1.0*/ public class MessageConsumerFactory {private static final String BOOTSTRAP_SERVERS 192.168.254.128:9092;public static ConsumerString, String getConsumer() {//PART1:设置发送者相关属性Properties props new Properties();//kafka地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//每个消费者要指定一个groupprops.put(ConsumerConfig.GROUP_ID_CONFIG, helloGroup);//key序列化类props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);//value序列化类props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);return new KafkaConsumer(props);}} 测试消费消息 package consumer;import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords;import java.time.Duration; import java.util.Collections;/*** author heyunlin* version 1.0*/ public class MessageConsumer {private static final String TOPIC hello;public static void main(String[] args) {ConsumerString, String consumer MessageConsumerFactory.getConsumer();consumer.subscribe(Collections.singletonList(TOPIC));while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofNanos(100));for (ConsumerRecordString, String record : records) {System.out.println(record.key() : record.value());}// 提交偏移量避免消息重复推送consumer.commitSync(); // 同步提交// consumer.commitAsync(); // 异步提交}}} 五、springboot整合kafka 开始前的准备工作 然后通过IntelliJ IDEA创建一个springboot项目springboot-kafka在pom.xml中添加kafka的依赖 dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId /dependency 然后修改application.yml添加kafka相关配置 spring:kafka:bootstrap-servers: 192.168.254.128:9092producer:acks: 1retries: 3batch-size: 16384properties:linger:ms: 0buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: helloGroupenable-auto-commit: falseauto-commit-interval: 1000auto-offset-reset: latestproperties:request:timeout:ms: 18000session:timeout:ms: 12000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer 创建消息生产者 package com.example.springboot.kafka.producer;import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController;/*** author heyunlin* version 1.0*/ RestController RequestMapping(path /producer, produces application/json;charsetutf-8) public class KafkaProducer {private final KafkaTemplateString, Object kafkaTemplate;Autowiredpublic KafkaProducer(KafkaTemplateString, Object kafkaTemplate) {this.kafkaTemplate kafkaTemplate;}RequestMapping(value /sendMessage, method RequestMethod.GET)public String sendMessage(String message) {kafkaTemplate.send(hello, message);return 发送成功~;}} 创建消息消费者 package com.example.springboot.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component;/*** author heyunlin* version 1.0*/ Component public class KafkaConsumer {KafkaListener(topics hello)public void receiveMessage(ConsumerRecordString, String record) {String topic record.topic();long offset record.offset();int partition record.partition();System.out.println(topic topic);System.out.println(offset offset);System.out.println(partition partition);}} 然后访问网址http://localhost:8080/producer/sendMessage?messagehello往topic为hello的消息队列发送消息。控制台打印了参数成功监听到发送的消息。 文章涉及的项目已经上传到gitee按需获取~ Java中操作kafka的基本项目https://gitee.com/he-yunlin/kafka.git springboot整合kafka案例项目https://gitee.com/he-yunlin/springboot-kafka.git
http://www.pierceye.com/news/415261/

相关文章:

  • 产品包装设计素材网站均安建网站
  • 甘肃建设网站找别人做网站需要注意什么
  • php做电子商城网站创业找项目
  • 网站建设建设公司有哪些帝国手机网站cms系统
  • 网站设计包括什么软件房产交易网上预约平台
  • 企业做网站有什么好处坏处四川住房城乡和城乡建设厅网站首页
  • 小学学校网站建设计划广州seo顾问服务
  • 做淘宝素材网站哪个好用网站制作网站建设需要多少钱
  • 住房建设部官方网站设计费计取wordpress仿百度搜索主题
  • 云建站平台哪家好沈阳百度seo关键词排名优化软件
  • 响应式网站设计的优点国内优秀设计网站
  • 网站开发集成环境国内html5网站欣赏
  • iis7.5 没有默认网站北京seo的排名优化
  • 两学一做网站是多少钱营销型网站策划怎么做
  • 渭南做网站的自建房设计图
  • 移动网站建设价格线上推广专员是干嘛的
  • 做化妆刷的外贸网站企业网站托管备案
  • 湖南省建设干部学校 网站金融直播室网站建设
  • 贵州建设厅特殊工种考试网站photoshop平面设计教学视频
  • 怎么推广我的网站代理网站推荐
  • wordpress主题站模板做网站跟做APP哪个容易
  • 杭州网站建设公司推荐网站建设优化服务渠道
  • php是网站开发语言吗做网站前端需要编程基础吗
  • python 网站开发 前端企业信用信息系统官网
  • 公司网站设计有哪些使用技巧呢商城网站建设怎么收费
  • 东莞做网站平台安阳营销型网站建设
  • 如何查看网站开发语言百度排行榜风云榜
  • 泉州 网站建设公司首选广告设计公司名字有寓意有创意
  • 天津个人做网站慈利网站制作
  • 专门做推广的网站吗宿迁房价2023年最新房价