当前位置: 首页 > news >正文

怎样创造一个网站长沙景点必去

怎样创造一个网站,长沙景点必去,wordpress速度慢图片,哪个网站做期货数字币文章目录 第1关#xff1a;kafka - 初体验任务描述相关知识Kafka 简述Kafka 应用场景Kafka 架构组件kafka 常用命令 编程要求测试说明答案代码 第2关#xff1a;生产者 #xff08;Producer #xff09; - 简单模式任务描述相关知识Producer 简单模式Producer 的开发步骤Ka… 文章目录 第1关kafka - 初体验任务描述相关知识Kafka 简述Kafka 应用场景Kafka 架构组件kafka 常用命令 编程要求测试说明答案代码 第2关生产者 Producer - 简单模式任务描述相关知识Producer 简单模式Producer 的开发步骤Kafka 常用配置参数 编程要求测试说明答案代码 第3关消费者 Consumer- 自动提交偏移量任务描述相关知识Kafka 消费者开发步骤自动提交偏移量的优劣 编程要求测试说明答案代码 第4关消费者 Consumer - 手动提交偏移量任务描述相关知识Kafka 两种手动提交方式 编程要求测试说明答案代码 第1关kafka - 初体验 任务描述 本关任务使用 Kafka 命令创建一个副本数量为1、分区数量为3的 Topic 。 相关知识 为了完成本关任务你需要掌握1.如何使用 Kafka 的常用命令。 课程视频《Kafka简介》 Kafka 简述 类 JMS 消息队列结合 JMS 中的两种模式可以有多个消费者主动拉取数据在 JMS 中只有点对点模式才有消费者主动拉取数据。 Kafka 是一个生产-消费模型。 Producer 消息生产者就是向 Kafka Broker 发消息的客户端。 Consumer 消息消费者向 Kafka Broker 取消息的客户端。 Topic 我们可以理解为一个队列。 Consumer Group CG这是 Kafka 用来实现一个 Topic 消息的广播发给所有的 Consumer 和单播发给任意一个 Consumer 的手段。一个 Topic 可以有多个CG。Topic 的消息会复制不是真的复制是概念上的到所有的 CG 但每个 Partion 只会把消息发给该 CG 中的一个 Consumer 。如果需要实现广播只要每个 Consumer 有一个独立的 CG 就可以了。要实现单播只要所有的 Consumer 在同一个 CG。用CG 还可以将 Consumer 进行自由的分组而不需要多次发送消息到不同的 Topic。 Broker 一台 Kafka 服务器就是一个 Broker 。一个集群由多个Broker组成。一个 Broker 可以容纳多个 Topic。 Partition 为了实现扩展性一个非常大的 Topic 可以分布到多个Broker即服务器上一个 Topic 可以分为多个 Partition 每个 Partition 是一个有序的队列。Partition 中的每条消息都会被分配一个有序的 Id Offset 。Kafka 只保证按一个 Partition 中的顺序将消息发给 Consumer 不保证一个 Topic 的整体多个 Partition间的顺序。 Offset Kafka 的存储文件都是按照 Offset . index 来命名用Offset 做名字的好处是方便查找。例如你想找位于2049的位置只要找到 2048 . index 的文件即可。当然 the first offset 就是 00000000000 . index。 Kafka 应用场景 日志收集一个公司可以用 Kafka 可以收集各种服务的 Log 通过Kafka 以统一接口服务的方式开放给各种 Consumer 例如 Hadoop 、Hbase 、Solr 等。消息系统解耦和生产者和消费者、缓存消息等。用户活动跟踪Kafka 经常被用来记录 Web 用户或者 App 用户的各种活动如浏览网页、搜索、点击等活动这些活动信息被各个服务器发布到 Kafka 的Topic 中然后订阅者通过订阅这些 Topic 来做实时的监控分析或者装载到 Hadoop 、数据仓库中做离线分析和挖掘。运营指标Kafka 也经常用来记录运营监控数据。包括收集各种分布式应用的数据生产各种操作的集中反馈比如报警和报告。流式处理比如 Spark streaming 和 Storm、Flink。事件源。 Kafka 架构组件 Kafka 中发布订阅的对象是 Topic。我们可以为每类数据创建一个 Topic 把向 Topic 发布消息的客户端称作 Producer 从 Topic 订阅消息的客户端称作 Consumer 。Producers 和 Consumers 可以同时从多个 Topic 读写数据。一个 Kafka 集群由一个或多个 Broker 服务器组成它负责持久化和备份具体的 Kafka 消息。 kafka 常用命令 查看当前服务器中的所有 Topic bin/kafka-topics.sh --list --zookeeper zk01:2181创建 Topic ./kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 3 --topic first 说明replication-factor 是指副本数量partitions 是指分区数量 删除 Topic bin/kafka-topics.sh --delete --zookeeper zk01:2181 --topic test 需要 server.properties 中设置 delete.topic.enable true 否则只是标记删除或者直接重启。通过 Shell 命令发送消息 kafka-console-producer.sh --broker-list kafka01:9092 --topic demo通过 Shell 消费消息 bin/kafka-console-consumer.sh --zookeeper zk01:2181 --from-beginning --topic test1查看消费位置 kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181 --group testGroup查看某个 Topic 的详情 kafka-topics.sh --topic test --describe --zookeeper zk01:2181 说明 此处的 zk01 是 Zookeeper 的 IP 地址 kafka01 是 Broker 的 IP 地址 编程要求 根据提示在右侧编辑器补充代码完成以下任务。 创建一个副本数量为1、分区数量为3、名为 demo 的 Topic查看所有 Topic查看名为 demo 的 Topic 的详情信息 注意Broker 的 IP 为127.0.0.1Zookeeper 的 IP 为127.0.0.1 扩展任务 使用一个命令行开启 Kafka Producer Shell 窗口并对名为 demo 的 Topic 进行数据生产使用另一个命令行开启 Kafka Customer Shell 窗口并对名为 demo 的 Topic进行消费 说明扩展任务没有进行评测此任务目的是为了让用户体验下 Kafka 的数据生产与数据消费的两个环节更好理解 Kafka 测试说明 平台会对你的命令进行检验并运行你只需要按照任务需求补充右侧编辑器的代码然后点击评测就ok了。 - - 特别注意为了确保运行拥有一个正常的运行环境请在评测之前重置下运行环境 答案代码 命令行代码 kafka-server-start.sh /opt/kafka_2.11-1.1.0/config/server.propertiesshell 文件 #!/bin/bash#1.创建一个副本数量为1、分区数量为3、名为 demo 的 Topic kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 3 --topic demo#2.查看所有Topic kafka-topics.sh --list --zookeeper 127.0.0.1:2181#3.查看名为demo的Topic的详情信息 kafka-topics.sh --topic demo --describe --zookeeper 127.0.0.1:2181 第2关生产者 Producer - 简单模式 任务描述 本关任务编写一个 Kafka 的 Producer 进行数据生产。 相关知识 为了完成本关任务你需要掌握1.如何使用 Kafka 的 Producer API 进行数据生产。 课程视频《使用Python生产消费kafka的数据》 Producer 简单模式 Producer 采用默认分区方式将消息散列的发送到各个分区当中。 Producer 的开发步骤 创建配置文件对象 Properties props new Properties(); 设置连接 Kakfa 的基本参数如下 props.put(bootstrap.servers, kafka-01:9092,kafka-02:9092,kafka-03:9092); props.put(acks, 1); props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer); props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);创建 Kafka 生产者对象ProducerString, String producer new KafkaProducer(props); 发送消息producer.send(new ProducerRecordString, String(Topic, key, value)); Kafka 常用配置参数 名称说明默认值有效值重要性bootstrap.serverskafka集群的broker-list如 hadoop01:9092,hadoop02:9092无必选key.serializerkey的序列化器ByteArraySerializer StringSerializer必选value.serializervalue的序列化器ByteArraySerializer StringSerializer必选acks确保生产者可靠性设置有三个选项 acks0不等待成功返回 acks1等Leader写成功返回 acksall等Leader和所有ISR中的Follower写成功返回all也可以用-1代替-10,1,-1,all建议必选buffer.memoryProducer总体内存大小33554432不要超过物理内存根据实际情况调整建议必选batch.size每个partition的未发送消息大小16384根据实际情况调整建议必选 编程要求 根据提示在右侧编辑器补充代码使用 Kafka Producer API 对名为 demo 的 Topic 进行数据生产。 测试说明 平台会对你的命令进行检验并运行你只需要按照任务需求补充右侧编辑器的代码然后点击评测就ok了。 - - 答案代码 用 conf/server.properties 如果用 config/server.properties 的话需要把 log.dirs 和 num.partitions 这两个配置改了 cd $KAFKA_HOME/ vim config/server.properties# 使用 :/log.dirs 找到位置或者直接 :$ 在最后一行加 log.dirs/export/servers/logs/kafka/ num.partitions2命令行执行代码 # kafka 依赖 zookeeper所以需要先启动 zookeeper 服务 cd $ZOOKEEPER_HOME/ bin/zkServer.sh start conf/zoo.cfg # 启动 Kafka 服务 cd $KAFKA_HOME/ bin/kafka-server-start.sh -daemon conf/server.propertiesJava 代码 package net.educoder; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; /*** kafka producer 简单模式*/ public class App {public static void main(String[] args) {/*** 1.创建配置文件对象一般采用 Properties*//**----------------begin-----------------------*/Properties props new Properties();/**-----------------end-------------------------*//*** 2.设置kafka的一些参数* bootstrap.servers -- kafka的连接地址 127.0.0.1:9092* key、value的序列化类 --org.apache.kafka.common.serialization.StringSerializer* acks1-1,0*//**-----------------begin-----------------------*/props.put(bootstrap.servers, 127.0.0.1:9092);props.put(acks, 1);props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);/**-----------------end-------------------------*//*** 3.构建kafkaProducer对象*//**-----------------begin-----------------------*/ProducerString, String producer new KafkaProducer(props);/**-----------------end-------------------------*/for (int i 0; i 100; i) {ProducerRecordString, String record new ProducerRecord(demo, i , i );/*** 4.发送消息*//**-----------------begin-----------------------*/producer.send(record);/**-----------------end-------------------------*/}producer.close();} } 第3关消费者 Consumer- 自动提交偏移量 任务描述 本关任务编写一个 Kafka 消费者并设置自动提交偏移量进行数据消费。 相关知识 为了完成本关任务你需要掌握1.如何编写 Kafka 消费者2.如何使用自动提交偏移量。 Kafka 消费者开发步骤 创建配置文件对象 Properties props new Properties(); 设置连接 Kakfa 的基本参数如下 //设置kafka集群的地址 props.put(bootstrap.servers, 127.0.0.1:9092); //设置消费者组组名字自定义组名字相同的消费者在一个组 props.put(group.id, g1); //开启offset自动提交 props.put(enable.auto.commit, true); //自动提交时间间隔 props.put(auto.commit.interval.ms, 1000); //序列化器 props.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer); props.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);创建 Kafka 消费者对象 KafkaConsumerString, String consumer new KafkaConsumer(props); 订阅主题 Topic consumer.subscribe(Arrays.asList(demo)); 消费 Topic 的数据 while (true) {ConsumerRecordsString, String records consumer.poll(100);for (ConsumerRecordString, String record : records)System.out.printf(offset %d, key %s, value %s%n, record.offset(), record.key(), record.value());}自动提交偏移量的优劣 消费者拉取数据之后自动提交偏移量不关心后续对消息的处理是否正确。 优点消费快适用于数据一致性弱的业务场景缺点消息很容易丢失 编程要求 使用 Kafka Consumer API 对名为 demo 的 Topic 进行消费并设置自动提交偏移量。 测试说明 平台会对你的命令进行检验并运行你只需要按照任务需求补充右侧编辑器的代码然后点击评测就 ok 了。 - - 答案代码 package net.educoder; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class App {public static void main(String[] args) {Properties props new Properties();/**--------------begin----------------*///1.设置kafka集群的地址props.put(bootstrap.servers, 127.0.0.1:9092);//2.设置消费者组组名字自定义组名字相同的消费者在一个组props.put(group.id, g1);//3.开启offset自动提交props.put(enable.auto.commit, true);//4.自动提交时间间隔props.put(auto.commit.interval.ms, 1000);//5.序列化器props.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);props.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);/**---------------end---------------*//**--------------begin----------------*///6.创建kafka消费者KafkaConsumerString, String consumer new KafkaConsumer(props);//7.订阅kafka的topicconsumer.subscribe(Arrays.asList(demo));/**---------------end---------------*/int i 1;while (true) {/**----------------------begin--------------------------------*///8.poll消息数据,返回的变量为crsConsumerRecordsString, String crs consumer.poll(100);for (ConsumerRecordString, String cr : crs) {System.out.println(consume data: i);i;}/**----------------------end--------------------------------*/if (i 10) {return;}}} } 第4关消费者 Consumer - 手动提交偏移量 任务描述 本关任务编写一个 Kafka 消费者并使用手动提交偏移量进行数据消费。 相关知识 为了完成本关任务你需要掌握1.如何编写 Kafka 消费者2.如何手动提交偏移量。 Kafka 两种手动提交方式 异步提交( CommitAsync ) 异步模式下提交失败也不会尝试提交。消费者线程不会被阻塞因为异步操作可能在提交偏移量操作结果未返回时就开始下一次拉取操作。 同步提交( CommitSync ) 同步模式下提交失败时一直尝试提交直到遇到无法重试才结束。同步方式下消费者线程在拉取消息时会被阻塞直到偏移量提交操作成功或者在提交过程中发生错误。 注意实现手动提交前需要在创建消费者时关闭自动提交设置enable.auto.commitfalse 编程要求 根据提示在右侧编辑器补充代码使用 Kafka Producer API 对名为 demo 的 Topic 进行数据生产 测试说明 平台会对你的命令进行检验并运行你只需要按照任务需求补充右侧编辑器的代码然后点击评测就ok了。 - - 答案代码 命令行代码 # kafka 依赖 zookeeper所以需要先启动 zookeeper 服务 cd $ZOOKEEPER_HOME/ bin/zkServer.sh start conf/zoo.cfg # 启动 Kafka 服务 cd $KAFKA_HOME/ bin/kafka-server-start.sh -daemon conf/server.propertiesJava 代码 package net.educoder; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; public class App {public static void main(String[] args){Properties props new Properties();/**-----------------begin------------------------*///1.设置kafka集群的地址props.put(bootstrap.servers, 127.0.0.1:9092);//2.设置消费者组组名字自定义组名字相同的消费者在一个组props.put(group.id, g1);//3.关闭offset自动提交props.put(enable.auto.commit, false);//4.序列化器props.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);props.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);/**-----------------end------------------------*//**-----------------begin------------------------*///5.实例化一个消费者KafkaConsumerString, String consumer new KafkaConsumer(props);//6.消费者订阅主题订阅名为demo的主题consumer.subscribe(Arrays.asList(demo));/**-----------------end------------------------*/final int minBatchSize 10;ListConsumerRecordString, String buffer new ArrayList();while (true) {ConsumerRecordsString, String records consumer.poll(100);for (ConsumerRecordString, String record : records) {buffer.add(record);}if (buffer.size() minBatchSize) {for (ConsumerRecord bf : buffer) {System.out.printf(offset %d, key %s, value %s%n, bf.offset(), bf.key(), bf.value());}/**-----------------begin------------------------*///7.手动提交偏移量consumer.commitSync();/**-----------------end------------------------*/buffer.clear();return;}}} }
http://www.pierceye.com/news/309038/

相关文章:

  • 制作网站公司图片山东省建设工程质量监督总站网站
  • 物流网站模板免费长沙推广型网站建设
  • 电商网站策划做网站知乎
  • 彩票网站开发是否合法网站开发中遇到的主要问题
  • 网站建设 人员 年终总结表白网站制作器
  • 怎么发布个人网站上海网站制作推广
  • 外国人做汉字网站网站访问量过大
  • 南昌做公司网站哪家好手机端网站自动弹出营销qq
  • 网站开发参考文献2015年后出售网站平台
  • 做外国网站买域名上海网站建设的英文
  • 好看的静态网站信产部网站备案
  • 怎样建设网站 需要哪些条件wordpress安装主题要多久
  • 高端网站设计平台高端网站设计企业印象笔记wordpress同步
  • 汽车网站建设的目的公司简介模板设计图片
  • 做外贸的社交网站怎么攻击网站吗
  • 网站布局手机百度网址大全
  • 企业网站做多大擦边球做网站挣钱
  • 网站怎么备份做网站建设要学多久
  • 怎样做买东西的网站外汇期货喊单网站怎么做的
  • 博客网站推荐郑州哪里做网站
  • 贵州建设职业技术学院网站网站开发 多语言
  • 网站后台管理系统怎么进重庆建设工程安全管理局网站
  • 移动网站开发的视频下载百度网盘下载官网
  • 在百度备案网站建设工程检测网
  • 广州企业网站营销电话公司网站怎么做啊
  • 如何利用视频网站做推广网站开发管理学什么
  • 福建漳发建设有限公司网站做网站申请什么商标
  • 专门做房产的网站上海网站开发毕业生
  • 网站域名已经解析但没有被百度等搜索引擎收录怎么办可以做投票功能的网站
  • 重庆网站设计总部什么是社交电商平台