网站建设与域名备案,wordpress 腾讯验证码,百度输入法,英山县城乡建设规划局网站大数据技术之Kafka#xff1a;一篇文章带你学会Kafka
第1章Kafka概述
1.1 消息队列 #xff08;1#xff09;点对点模式#xff08;一对一#xff0c;消费者主动拉取数据#xff0c;消息收到后消息清除#xff09;点对点模型通常是一个基于拉取或者轮询的消息传送模型…大数据技术之Kafka一篇文章带你学会Kafka
第1章Kafka概述
1.1 消息队列 1点对点模式一对一消费者主动拉取数据消息收到后消息清除点对点模型通常是一个基于拉取或者轮询的消息传送模型这种模型从队列中请求信息而不是将消息推送到客户端。这个模型的特点是发送到队列的消息被一个且只有一个接收者接收处理即使有多个消息监听者也是如此。 2发布/订阅模式一对多数据生产后推送给所有订阅者 发布订阅模型则是一个基于推送的消息传送模型。发布订阅模型可以有多种不同的订阅者临时订阅者只在主动监听主题时才接收消息而持久订阅者则监听主题的所有消息即使当前订阅者不可用处于离线状态。
1.2 为什么需要消息队列
1解耦 允许你独立的扩展或修改两边的处理过程只要确保它们遵守同样的接口约束。 2冗余 消息队列把数据进行持久化直到它们已经被完全处理通过这一方式规避了数据丢失风险。许多消息队列所采用的插入-获取-删除范式中在把一个消息从队列中删除之前需要你的处理系统明确的指出该消息已经被处理完毕从而确保你的数据被安全的保存直到你使用完毕。 3扩展性 因为消息队列解耦了你的处理过程所以增大消息入队和处理的频率是很容易的只要另外增加处理过程即可。 4灵活性 峰值处理能力 在访问量剧增的情况下应用仍然需要继续发挥作用但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力而不会因为突发的超负荷的请求而完全崩溃。 5可恢复性 系统的一部分组件失效时不会影响到整个系统。消息队列降低了进程间的耦合度所以即使一个处理消息的进程挂掉加入队列中的消息仍然可以在系统恢复后被处理。 6顺序保证 在大多使用场景下数据处理的顺序都很重要。大部分消息队列本来就是排序的并且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内的消息的有序性 7缓冲 有助于控制和优化数据流经过系统的速度解决生产消息和消费消息的处理速度不一致的情况。 8异步通信 很多时候用户不想也不需要立即处理消息。消息队列提供了异步处理机制允许用户把一个消息放入队列但并不立即处理它。想向队列中放入多少消息就放多少然后在需要的时候再去处理它们。
1.3 什么是Kafka
在流式计算中Kafka一般用来缓存数据Storm通过消费Kafka的数据进行计算。 1Apache Kafka是一个开源消息系统由Scala写成。是由Apache软件基金会开发的一个开源消息系统项目。 2Kafka最初是由LinkedIn公司开发并于2011年初开源。2012年10月从Apache Incubator毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。 3Kafka是一个分布式消息队列。Kafka对消息保存时根据Topic进行归类发送消息者称为Producer消息接受者称为Consumer此外kafka集群有多个kafka实例组成每个实例(server)称为broker。 4无论是kafka集群还是consumer都依赖于zookeeper集群保存一些meta信息来保证系统可用性。
1.4 Kafka架构 1Producer 消息生产者就是向kafka broker发消息的客户端 2Consumer 消息消费者向kafka broker取消息的客户端 3Topic 可以理解为一个队列 4 Consumer Group CG这是kafka用来实现一个topic消息的广播发给所有的consumer和单播发给任意一个consumer的手段。一个topic可以有多个CG。topic的消息会复制不是真的复制是概念上的到所有的CG但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic 5Broker 一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic 6Partition为了实现扩展性一个非常大的topic可以分布到多个broker即服务器上一个topic可以分为多个partition每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的idoffset。kafka只保证按一个partition中的顺序将消息发给consumer不保证一个topic的整体多个partition间的顺序 7Offsetkafka的存储文件都是按照offset.kafka来命名用offset做名字的好处是方便查找。例如你想找位于2049的位置只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。
第2章 Kafka集群部署
2.1 环境准备 2.1.1 集群规划
hadoop102 hadoop103 hadoop104
zk zk zk
kafka kafka kafka2.1.2 jar包下载 http://kafka.apache.org/downloads.html 2.2 Kafka集群部署 1解压安装包
[atguiguhadoop102 software]$ tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/2修改解压后的文件名称
[atguiguhadoop102 module]$ mv kafka_2.11-0.11.0.0/ kafka3在/opt/module/kafka目录下创建logs文件夹
[atguiguhadoop102 kafka]$ mkdir logs4修改配置文件
[atguiguhadoop102 kafka]$ cd config/
[atguiguhadoop102 config]$ vi server.properties输入以下内容
#broker的全局唯一编号不能重复
broker.id0
#删除topic功能使能
delete.topic.enabletrue
#处理网络请求的线程数量
num.network.threads3
#用来处理磁盘IO的现成数量
num.io.threads8
#发送套接字的缓冲区大小
socket.send.buffer.bytes102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes102400
#请求套接字的缓冲区大小
socket.request.max.bytes104857600
#kafka运行日志存放的路径
log.dirs/opt/module/kafka/logs
#topic在当前broker上的分区个数
num.partitions1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir1
#segment文件保留的最长时间超时将被删除
log.retention.hours168
#配置连接Zookeeper集群地址
zookeeper.connecthadoop102:2181,hadoop103:2181,hadoop104:21815配置环境变量
[atguiguhadoop102 module]$ sudo vi /etc/profile#KAFKA_HOME
export KAFKA_HOME/opt/module/kafka
export PATH$PATH:$KAFKA_HOME/bin[atguiguhadoop102 module]$ source /etc/profile6分发安装包
[atguiguhadoop102 module]$ xsync kafka/注意分发之后记得配置其他机器的环境变量 7分别在hadoop103和hadoop104上修改配置文件/opt/module/kafka/config/server.properties中的broker.id1、broker.id2 注broker.id不得重复 8启动集群
依次在hadoop102、hadoop103、hadoop104节点上启动kafka
[atguiguhadoop102 kafka]$ bin/kafka-server-start.sh config/server.properties
[atguiguhadoop103 kafka]$ bin/kafka-server-start.sh config/server.properties
[atguiguhadoop104 kafka]$ bin/kafka-server-start.sh config/server.properties 9关闭集群
[atguiguhadoop102 kafka]$ bin/kafka-server-stop.sh stop
[atguiguhadoop103 kafka]$ bin/kafka-server-stop.sh stop
[atguiguhadoop104 kafka]$ bin/kafka-server-stop.sh stop2.3 Kafka命令行操作 1查看当前服务器中的所有topic
[atguiguhadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list2创建topic
[atguiguhadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 \
--create --replication-factor 3 --partitions 1 --topic first选项说明 –topic 定义topic名 –replication-factor 定义副本数 –partitions 定义分区数
3删除topic
[atguiguhadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 \
--delete --topic first需要server.properties中设置delete.topic.enabletrue否则只是标记删除或者直接重启。 4发送消息
[atguiguhadoop102 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic first
hello world
atguigu atguigu5消费消息
[atguiguhadoop103 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --from-beginning --topic first–from-beginning会把first主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。 6查看某个Topic的详情
[atguiguhadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 \
--describe --topic first标题文本样式列表图片链接目录代码片表格注脚注释自定义列表LaTeX 数学公式插入甘特图插入UML图插入Mermaid流程图插入Flowchart流程图插入类图快捷键 标题复制
第3章 Kafka工作流程分析 3.1 Kafka生产过程分析 3.1.1 写入方式 producer采用推push模式将消息发布到broker每条消息都被追加append到分区patition中属于顺序写磁盘顺序写磁盘效率比随机写内存要高保障kafka吞吐率。 3.1.2 分区Partition 消息发送时都被发送到一个topic其本质就是一个目录而topic是由一些Partition Logs(分区日志)组成其组织结构如下图所示 我们可以看到每个Partition中的消息都是有序的生产的消息被不断追加到Partition log上其中的每一个消息都被赋予了一个唯一的offset值。 1分区的原因 1方便在集群中扩展每个Partition可以通过调整以适应它所在的机器而一个topic又可以有多个Partition组成因此整个集群就可以适应任意大小的数据了 2可以提高并发因为可以以Partition为单位读写了。 2分区的原则 1指定了patition则直接使用 2未指定patition但指定key通过对key的value进行hash出一个patition 3patition和key都未指定使用轮询选出一个patition。 DefaultPartitioner类
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {ListPartitionInfo partitions cluster.partitionsForTopic(topic);int numPartitions partitions.size();if (keyBytes null) {int nextValue nextValue(topic);ListPartitionInfo availablePartitions cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() 0) {int part Utils.toPositive(nextValue) % availablePartitions.size();return availablePartitions.get(part).partition();} else {// no partitions are available, give a non-available partitionreturn Utils.toPositive(nextValue) % numPartitions;}} else {// hash the keyBytes to choose a partitionreturn Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}3.1.3 副本Replication 同一个partition可能会有多个replication对应 server.properties 配置中的 default.replication.factorN。没有replication的情况下一旦broker 宕机其上所有 patition 的数据都不可被消费同时producer也不能再将数据存于其上的patition。引入replication之后同一个partition可能会有多个replication而这时需要在这些replication之间选出一个leaderproducer和consumer只与这个leader交互其它replication作为follower从leader 中复制数据。 3.1.4 写入流程 producer写入消息流程如下 1producer先从zookeeper的 /brokers/…/state节点找到该partition的leader 2producer将消息发送给该leader 3leader将消息写入本地log 4followers从leader pull消息写入本地log后向leader发送ACK 5leader收到所有ISR中的replication的ACK后增加HWhigh watermark最后commit 的offset并向producer发送ACK 3.2 Broker 保存消息 3.2.1 存储方式 物理上把topic分成一个或多个patition对应 server.properties 中的num.partitions3配置每个patition物理上对应一个文件夹该文件夹存储该patition的所有消息和索引文件如下
[atguiguhadoop102 logs]$ ll
drwxrwxr-x. 2 atguigu atguigu 4096 8月 6 14:37 first-0
drwxrwxr-x. 2 atguigu atguigu 4096 8月 6 14:35 first-1
drwxrwxr-x. 2 atguigu atguigu 4096 8月 6 14:37 first-2
[atguiguhadoop102 logs]$ cd first-0
[atguiguhadoop102 first-0]$ ll
-rw-rw-r--. 1 atguigu atguigu 10485760 8月 6 14:33 00000000000000000000.index
-rw-rw-r--. 1 atguigu atguigu 219 8月 6 15:07 00000000000000000000.log
-rw-rw-r--. 1 atguigu atguigu 10485756 8月 6 14:33 00000000000000000000.timeindex
-rw-rw-r--. 1 atguigu atguigu 8 8月 6 14:37 leader-epoch-checkpoint3.2.2 存储策略 无论消息是否被消费kafka都会保留所有消息。有两种策略可以删除旧数据 1基于时间log.retention.hours168 2基于大小log.retention.bytes1073741824 需要注意的是因为Kafka读取特定消息的时间复杂度为O(1)即与文件大小无关所以这里删除过期文件与提高 Kafka 性能无关。
3.2.3 Zookeeper存储结构 注意producer不在zk中注册消费者在zk中注册。 3.3 Kafka消费过程分析 kafka提供了两套consumer API高级Consumer API和低级Consumer API。 3.3.1 高级API 1高级API优点 高级API 写起来简单 不需要自行去管理offset系统通过zookeeper自行管理。 不需要管理分区副本等情况.系统自动管理。 消费者断线会自动根据上一次记录在zookeeper中的offset去接着获取数据默认设置1分钟更新一下zookeeper中存的offset 可以使用group来区分对同一个topic 的不同程序访问分离开来不同的group记录不同的offset这样不同程序读取同一个topic才不会因为offset互相影响 2高级API缺点 不能自行控制offset对于某些特殊需求来说 不能细化控制如分区、副本、zk等 3.3.2 低级API 1低级 API 优点 能够让开发者自己控制offset想从哪里读取就从哪里读取。 自行控制连接分区对分区自定义进行负载均衡 对zookeeper的依赖性降低如offset不一定非要靠zk存储自行存储offset即可比如存在文件或者内存中 2低级API缺点 太过复杂需要自行控制offset连接哪个分区找到分区leader 等。 3.3.3 消费者组 消费者是以consumer group消费者组的方式工作由一个或者多个消费者组成一个组共同消费一个topic。每个分区在同一时间只能由group中的一个消费者读取但是多个group可以同时消费这个partition。在图中有一个由三个消费者组成的group有一个消费者读取主题中的两个分区另外两个分别读取一个分区。某个消费者读取某个分区也可以叫做某个消费者是某个分区的拥有者。 在这种情况下消费者可以通过水平扩展的方式同时读取大量的消息。另外如果一个消费者失败了那么其他的group成员会自动负载均衡读取之前失败的消费者读取的分区。 3.3.4 消费方式 consumer采用pull拉模式从broker中读取数据。 push推模式很难适应消费速率不同的消费者因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息但是这样很容易造成consumer来不及处理消息典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。 对于Kafka而言pull模式更合适它可简化broker的设计consumer可自主控制消费消息的速率同时consumer可以自己控制消费方式——即可批量消费也可逐条消费同时还能选择不同的提交方式从而实现不同的传输语义。 pull模式不足之处是如果kafka没有数据消费者可能会陷入循环中一直等待数据到达。为了避免这种情况我们在我们的拉请求中有参数允许消费者请求在等待数据到达的“长轮询”中进行阻塞并且可选地等待到给定的字节数以确保大的传输大小。 3.3.5 消费者组案例 1需求测试同一个消费者组中的消费者同一时刻只能有一个消费者消费。 2案例实操 1在hadoop102、hadoop103上修改/opt/module/kafka/config/consumer.properties配置文件中的group.id属性为任意组名。
[atguiguhadoop103 config]$ vi consumer.properties
group.idatguigu2在hadoop102、hadoop103上分别启动消费者
[atguiguhadoop102 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --topic first --consumer.config config/consumer.properties
[atguiguhadoop103 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first --consumer.config config/consumer.properties3在hadoop104上启动生产者[atguiguhadoop104 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic first
hello world4查看hadoop102和hadoop103的接收者。同一时刻只有一个消费者接收到消息。第4章 Kafka API实战
4.1 环境准备 1启动zk和kafka集群在kafka集群中打开一个消费者
[atguiguhadoop102 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --topic first2导入pom依赖
dependencies!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion0.11.0.0/version/dependency!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka_2.12/artifactIdversion0.11.0.0/version/dependency
/dependencies4.2 Kafka生产者Java API 4.2.1 创建生产者过时的API
package com.atguigu.kafka;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;public class OldProducer {SuppressWarnings(deprecation)public static void main(String[] args) {Properties properties new Properties();properties.put(metadata.broker.list, hadoop102:9092);properties.put(request.required.acks, 1);properties.put(serializer.class, kafka.serializer.StringEncoder);ProducerInteger, String producer new ProducerInteger,String(new ProducerConfig(properties));KeyedMessageInteger, String message new KeyedMessageInteger, String(first, hello world);producer.send(message );}
}4.2.2 创建生产者新API
package com.atguigu.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;public class NewProducer {public static void main(String[] args) {Properties props new Properties();// Kafka服务端的主机名和端口号props.put(bootstrap.servers, hadoop103:9092);// 等待所有副本节点的应答props.put(acks, all);// 消息发送最大尝试次数props.put(retries, 0);// 一批消息处理大小props.put(batch.size, 16384);// 请求延时props.put(linger.ms, 1);// 发送缓存区内存大小props.put(buffer.memory, 33554432);// key序列化props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);// value序列化props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);ProducerString, String producer new KafkaProducer(props);for (int i 0; i 50; i) {producer.send(new ProducerRecordString, String(first, Integer.toString(i), hello world- i));}producer.close();}
}4.2.3 创建生产者带回调函数新API
package com.atguigu.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;public class CallBackProducer {public static void main(String[] args) {Properties props new Properties();// Kafka服务端的主机名和端口号props.put(bootstrap.servers, hadoop103:9092);// 等待所有副本节点的应答props.put(acks, all);// 消息发送最大尝试次数props.put(retries, 0);// 一批消息处理大小props.put(batch.size, 16384);// 增加服务端请求延时props.put(linger.ms, 1);
// 发送缓存区内存大小props.put(buffer.memory, 33554432);// key序列化props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);// value序列化props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);KafkaProducerString, String kafkaProducer new KafkaProducer(props);for (int i 0; i 50; i) {kafkaProducer.send(new ProducerRecordString, String(first, hello i), new Callback() {Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (metadata ! null) {System.err.println(metadata.partition() --- metadata.offset());}}});}kafkaProducer.close();}
}4.2.4 自定义分区生产者 0需求将所有数据存储到topic的第0号分区上 1定义一个类实现Partitioner接口重写里面的方法过时API
package com.atguigu.kafka;
import java.util.Map;
import kafka.producer.Partitioner;public class CustomPartitioner implements Partitioner {public CustomPartitioner() {super();}Overridepublic int partition(Object key, int numPartitions) {// 控制分区return 0;}
}
2自定义分区新API
package com.atguigu.kafka;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;public class CustomPartitioner implements Partitioner {Overridepublic void configure(MapString, ? configs) {}Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 控制分区return 0;}Overridepublic void close() {}
}3在代码中调用
package com.atguigu.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;public class PartitionerProducer {public static void main(String[] args) {Properties props new Properties();// Kafka服务端的主机名和端口号props.put(bootstrap.servers, hadoop103:9092);// 等待所有副本节点的应答props.put(acks, all);// 消息发送最大尝试次数props.put(retries, 0);// 一批消息处理大小props.put(batch.size, 16384);// 增加服务端请求延时props.put(linger.ms, 1);// 发送缓存区内存大小props.put(buffer.memory, 33554432);// key序列化props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);// value序列化props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);// 自定义分区props.put(partitioner.class, com.atguigu.kafka.CustomPartitioner);ProducerString, String producer new KafkaProducer(props);producer.send(new ProducerRecordString, String(first, 1, atguigu));producer.close();}
}4测试 1在hadoop102上监控/opt/module/kafka/logs/目录下first主题3个分区的log日志动态变化情况
[atguiguhadoop102 first-0]$ tail -f 00000000000000000000.log
[atguiguhadoop102 first-1]$ tail -f 00000000000000000000.log
[atguiguhadoop102 first-2]$ tail -f 00000000000000000000.log2发现数据都存储到指定的分区了。4.3 Kafka消费者Java API 4.3.1 高级API 0在控制台创建发送者
[atguiguhadoop104 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic first
hello world1创建消费者过时API
package com.atguigu.kafka.consume;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;public class CustomConsumer {SuppressWarnings(deprecation)public static void main(String[] args) {Properties properties new Properties();properties.put(zookeeper.connect, hadoop102:2181);properties.put(group.id, g1);properties.put(zookeeper.session.timeout.ms, 500);properties.put(zookeeper.sync.time.ms, 250);properties.put(auto.commit.interval.ms, 1000);// 创建消费者连接器ConsumerConnector consumer Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));HashMapString, Integer topicCount new HashMap();topicCount.put(first, 1);MapString, ListKafkaStreambyte[], byte[] consumerMap consumer.createMessageStreams(topicCount);KafkaStreambyte[], byte[] stream consumerMap.get(first).get(0);ConsumerIteratorbyte[], byte[] it stream.iterator();while (it.hasNext()) {System.out.println(new String(it.next().message()));}}
}2官方提供案例自动维护消费情况新API
package com.atguigu.kafka.consume;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;public class CustomNewConsumer {public static void main(String[] args) {Properties props new Properties();// 定义kakfa 服务的地址不需要将所有broker指定上 props.put(bootstrap.servers, hadoop102:9092);// 制定consumer group props.put(group.id, test);// 是否自动确认offset props.put(enable.auto.commit, true);// 自动确认offset的时间间隔 props.put(auto.commit.interval.ms, 1000);// key的序列化类props.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);// value的序列化类 props.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);// 定义consumer KafkaConsumerString, String consumer new KafkaConsumer(props);// 消费者订阅的topic, 可同时订阅多个 consumer.subscribe(Arrays.asList(first, second,third));while (true) {// 读取数据读取超时时间为100ms ConsumerRecordsString, String records consumer.poll(100);for (ConsumerRecordString, String record : records)System.out.printf(offset %d, key %s, value %s%n, record.offset(), record.key(), record.value());}}
}4.3.2 低级API 实现使用低级API读取指定topic指定partition,指定offset的数据。 1消费者使用低级API 的主要步骤 2方法描述 3代码
package com.atguigu;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;public class SimpleExample {private ListString m_replicaBrokers new ArrayList();public SimpleExample() {m_replicaBrokers new ArrayList();}public static void main(String args[]) {SimpleExample example new SimpleExample();// 最大读取消息数量long maxReads Long.parseLong(3);// 要订阅的topicString topic test1;// 要查找的分区int partition Integer.parseInt(0);// broker节点的ipListString seeds new ArrayList();seeds.add(192.168.9.102);seeds.add(192.168.9.103);seeds.add(192.168.9.104);// 端口int port Integer.parseInt(9092);try {example.run(maxReads, topic, partition, seeds, port);} catch (Exception e) {System.out.println(Oops: e);e.printStackTrace();}}public void run(long a_maxReads, String a_topic, int a_partition, ListString a_seedBrokers, int a_port) throws Exception {// 获取指定Topic partition的元数据PartitionMetadata metadata findLeader(a_seedBrokers, a_port, a_topic, a_partition);if (metadata null) {System.out.println(Cant find metadata for Topic and Partition. Exiting);return;}if (metadata.leader() null) {System.out.println(Cant find Leader for Topic and Partition. Exiting);return;}String leadBroker metadata.leader().host();String clientName Client_ a_topic _ a_partition;SimpleConsumer consumer new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);long readOffset getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);int numErrors 0;while (a_maxReads 0) {if (consumer null) {consumer new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);}FetchRequest req new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();FetchResponse fetchResponse consumer.fetch(req);if (fetchResponse.hasError()) {numErrors;// Something went wrong!short code fetchResponse.errorCode(a_topic, a_partition);System.out.println(Error fetching data from the Broker: leadBroker Reason: code);if (numErrors 5)break;if (code ErrorMapping.OffsetOutOfRangeCode()) {// We asked for an invalid offset. For simple case ask for// the last element to resetreadOffset getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);continue;}consumer.close();consumer null;leadBroker findNewLeader(leadBroker, a_topic, a_partition, a_port);continue;}numErrors 0;long numRead 0;for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {long currentOffset messageAndOffset.offset();if (currentOffset readOffset) {System.out.println(Found an old offset: currentOffset Expecting: readOffset);continue;}readOffset messageAndOffset.nextOffset();ByteBuffer payload messageAndOffset.message().payload();byte[] bytes new byte[payload.limit()];payload.get(bytes);System.out.println(String.valueOf(messageAndOffset.offset()) : new String(bytes, UTF-8));numRead;a_maxReads--;}if (numRead 0) {try {Thread.sleep(1000);} catch (InterruptedException ie) {}}}if (consumer ! null)consumer.close();}public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {TopicAndPartition topicAndPartition new TopicAndPartition(topic, partition);MapTopicAndPartition, PartitionOffsetRequestInfo requestInfo new HashMapTopicAndPartition, PartitionOffsetRequestInfo();requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));kafka.javaapi.OffsetRequest request new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);OffsetResponse response consumer.getOffsetsBefore(request);if (response.hasError()) {System.out.println(Error fetching data Offset Data the Broker. Reason: response.errorCode(topic, partition));return 0;}long[] offsets response.offsets(topic, partition);return offsets[0];}private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {for (int i 0; i 3; i) {boolean goToSleep false;PartitionMetadata metadata findLeader(m_replicaBrokers, a_port, a_topic, a_partition);if (metadata null) {goToSleep true;} else if (metadata.leader() null) {goToSleep true;} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) i 0) {// first time through if the leader hasnt changed give// ZooKeeper a second to recover// second time, assume the broker did recover before failover,// or it was a non-Broker issue//goToSleep true;} else {return metadata.leader().host();}if (goToSleep) {Thread.sleep(1000);}}System.out.println(Unable to find new leader after Broker failure. Exiting);throw new Exception(Unable to find new leader after Broker failure. Exiting);}private PartitionMetadata findLeader(ListString a_seedBrokers, int a_port, String a_topic, int a_partition) {PartitionMetadata returnMetaData null;loop:for (String seed : a_seedBrokers) {SimpleConsumer consumer null;try {consumer new SimpleConsumer(seed, a_port, 100000, 64 * 1024, leaderLookup);ListString topics Collections.singletonList(a_topic);TopicMetadataRequest req new TopicMetadataRequest(topics);kafka.javaapi.TopicMetadataResponse resp consumer.send(req);ListTopicMetadata metaData resp.topicsMetadata();for (TopicMetadata item : metaData) {for (PartitionMetadata part : item.partitionsMetadata()) {if (part.partitionId() a_partition) {returnMetaData part;break loop;}}}} catch (Exception e) {System.out.println(Error communicating with Broker [ seed ] to find Leader for [ a_topic , a_partition ] Reason: e);} finally {if (consumer ! null)consumer.close();}}if (returnMetaData ! null) {m_replicaBrokers.clear();for (BrokerEndPoint replica : returnMetaData.replicas()) {m_replicaBrokers.add(replica.host());}}return returnMetaData;}
}第5章 Kafka producer拦截器(interceptor)
5.1 拦截器原理 Producer拦截器(interceptor)是在Kafka 0.10版本被引入的主要用于实现clients端的定制化控制逻辑。 对于producer而言interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求比如修改消息等。同时producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor其定义的方法包括 1configure(configs) 获取配置信息和初始化数据时调用。 2onSend(ProducerRecord) 该方法封装进KafkaProducer.send方法中即它运行在用户主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作但最好保证不要修改消息所属的topic和分区否则会影响目标分区的计算 3onAcknowledgement(RecordMetadata, Exception) 该方法会在消息被应答或消息发送失败时调用并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中因此不要在该方法中放入很重的逻辑否则会拖慢producer的消息发送效率 4close 关闭interceptor主要用于执行一些资源清理工作 如前所述interceptor可能被运行在多个线程中因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor则producer将按照指定顺序调用它们并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。 5.2 拦截器案例 1需求 实现一个简单的双interceptor组成的拦截链。第一个interceptor会在消息发送前将时间戳信息加到消息value的最前部第二个interceptor会在消息发送后更新成功发送消息数或失败发送消息数。 2案例实操 1增加时间戳拦截器
package com.atguigu.kafka.interceptor;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;public class TimeInterceptor implements ProducerInterceptorString, String {Overridepublic void configure(MapString, ? configs) {}Overridepublic ProducerRecordString, String onSend(ProducerRecordString, String record) {// 创建一个新的record把时间戳写入消息体的最前部return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),System.currentTimeMillis() , record.value().toString());}Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}Overridepublic void close() {}
}2统计发送消息成功和发送失败消息数并在producer关闭时打印这两个计数器
package com.atguigu.kafka.interceptor;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;public class CounterInterceptor implements ProducerInterceptorString, String{private int errorCounter 0;private int successCounter 0;Overridepublic void configure(MapString, ? configs) {}Overridepublic ProducerRecordString, String onSend(ProducerRecordString, String record) {return record;}Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {// 统计成功和失败的次数if (exception null) {successCounter;} else {errorCounter;}}Overridepublic void close() {// 保存结果System.out.println(Successful sent: successCounter);System.out.println(Failed sent: errorCounter);}
}3producer主程序
package com.atguigu.kafka.interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;public class InterceptorProducer {public static void main(String[] args) throws Exception {// 1 设置配置信息Properties props new Properties();props.put(bootstrap.servers, hadoop102:9092);props.put(acks, all);props.put(retries, 0);props.put(batch.size, 16384);props.put(linger.ms, 1);props.put(buffer.memory, 33554432);props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);// 2 构建拦截链ListString interceptors new ArrayList();interceptors.add(com.atguigu.kafka.interceptor.TimeInterceptor); interceptors.add(com.atguigu.kafka.interceptor.CounterInterceptor); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);String topic first;ProducerString, String producer new KafkaProducer(props);// 3 发送消息for (int i 0; i 10; i) {ProducerRecordString, String record new ProducerRecord(topic, message i);producer.send(record);}// 4 一定要关闭producer这样才会调用interceptor的close方法producer.close();}
}3测试 1在kafka上启动消费者然后运行客户端java程序。
[atguiguhadoop102 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --from-beginning --topic first1501904047034,message0
1501904047225,message1
1501904047230,message2
1501904047234,message3
1501904047236,message4
1501904047240,message5
1501904047243,message6
1501904047246,message7
1501904047249,message8
1501904047252,message92观察java平台控制台输出数据如下
Successful sent: 10
Failed sent: 0第6章 Kafka Streams
6.1 概述 6.1.1 Kafka Streams Kafka Streams。Apache Kafka开源项目的一个组成部分。是一个功能强大易于使用的库。用于在Kafka上构建高可分布式、拓展性容错的应用程序。 6.1.2 Kafka Streams特点 1功能强大 高扩展性弹性容错 2轻量级 无需专门的集群 一个库而不是框架 3完全集成 100%的Kafka 0.10.0版本兼容 易于集成到现有的应用程序 4实时性 毫秒级延迟 并非微批处理 窗口允许乱序数据 允许迟到数据 6.1.3 为什么要有Kafka Stream 当前已经有非常多的流式处理系统最知名且应用最多的开源流式处理系统有Spark Streaming和Apache Storm。Apache Storm发展多年应用广泛提供记录级别的处理能力当前也支持SQL on Stream。而Spark Streaming基于Apache Spark可以非常方便与图计算SQL处理等集成功能强大对于熟悉其它Spark应用开发的用户而言使用门槛低。另外目前主流的Hadoop发行版如Cloudera和Hortonworks都集成了Apache Storm和Apache Spark使得部署更容易。 既然Apache Spark与Apache Storm拥用如此多的优势那为何还需要Kafka Stream呢主要有如下原因。 第一Spark和Storm都是流式处理框架而Kafka Stream提供的是一个基于Kafka的流式处理类库。框架要求开发者按照特定的方式去开发逻辑部分供框架调用。开发者很难了解框架的具体运行方式从而使得调试成本高并且使用受限。而Kafka Stream作为流式处理类库直接提供具体的类给开发者调用整个应用的运行方式主要由开发者控制方便使用和调试。 第二虽然Cloudera与Hortonworks方便了Storm和Spark的部署但是这些框架的部署仍然相对复杂。而Kafka Stream作为类库可以非常方便的嵌入应用程序中它对应用的打包和部署基本没有任何要求。 第三就流式处理系统而言基本都支持Kafka作为数据源。例如Storm具有专门的kafka-spout而Spark也提供专门的spark-streaming-kafka模块。事实上Kafka基本上是主流的流式处理系统的标准数据源。换言之大部分流式系统中都已部署了Kafka此时使用Kafka Stream的成本非常低。 第四使用Storm或Spark Streaming时需要为框架本身的进程预留资源如Storm的supervisor和Spark on YARN的node manager。即使对于应用实例而言框架本身也会占用部分资源如Spark Streaming需要为shuffle和storage预留内存。但是Kafka作为类库不占用系统资源。 第五由于Kafka本身提供数据持久化因此Kafka Stream提供滚动部署和滚动升级以及重新计算的能力。 第六由于Kafka Consumer Rebalance机制Kafka Stream可以在线动态调整并行度。 6.2 Kafka Stream数据清洗案例 0需求 实时处理单词带有””前缀的内容。例如输入”atguiguximenqing”最终处理成“ximenqing” 1需求分析 2案例实操 1创建一个工程并添加jar包 2创建主类
package com.atguigu.kafka.stream;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;public class Application {public static void main(String[] args) {// 定义输入的topicString from first;// 定义输出的topicString to second;// 设置参数Properties settings new Properties();settings.put(StreamsConfig.APPLICATION_ID_CONFIG, logFilter);settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hadoop102:9092);StreamsConfig config new StreamsConfig(settings);// 构建拓扑TopologyBuilder builder new TopologyBuilder();builder.addSource(SOURCE, from).addProcessor(PROCESS, new ProcessorSupplierbyte[], byte[]() {Overridepublic Processorbyte[], byte[] get() {// 具体分析处理return new LogProcessor();}}, SOURCE).addSink(SINK, to, PROCESS);// 创建kafka streamKafkaStreams streams new KafkaStreams(builder, config);streams.start();}
}3具体业务处理
package com.atguigu.kafka.stream;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;public class LogProcessor implements Processorbyte[], byte[] {private ProcessorContext context;Overridepublic void init(ProcessorContext context) {this.context context;}Overridepublic void process(byte[] key, byte[] value) {String input new String(value);// 如果包含“”则只保留该标记后面的内容if (input.contains()) {input input.split()[1].trim();// 输出到下一个topiccontext.forward(logProcessor.getBytes(), input.getBytes());}else{context.forward(logProcessor.getBytes(), input.getBytes());}}Overridepublic void punctuate(long timestamp) {}Overridepublic void close() {}
}4运行程序 5在hadoop104上启动生产者
[atguiguhadoop104 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic firsthelloworld
hatguigu
hahaha6在hadoop103上启动消费者
[atguiguhadoop103 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --from-beginning --topic secondworld
atguigu
hahaha第7章 扩展
7.1 Kafka与Flume比较 在企业中必须要清楚流式数据采集框架flume和kafka的定位是什么 flumecloudera公司研发: 适合多个生产者 适合下游数据消费者不多的情况 适合数据安全性要求不高的操作 适合与Hadoop生态圈对接的操作。 kafkalinkedin公司研发: 适合数据下游消费众多的情况 适合数据安全性要求较高的操作支持replication。 因此我们常用的一种模型是 线上数据 -- flume -- kafka -- flume(根据情景增删该流程) -- HDFS 7.2 Flume与kafka集成 1配置flume(flume-kafka.conf)
# define
a1.sources r1
a1.sinks k1
a1.channels c1# source
a1.sources.r1.type exec
a1.sources.r1.command tail -F -c 0 /opt/module/datas/flume.log
a1.sources.r1.shell /bin/bash -c# sink
a1.sinks.k1.type org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic first
a1.sinks.k1.kafka.flumeBatchSize 20
a1.sinks.k1.kafka.producer.acks 1
a1.sinks.k1.kafka.producer.linger.ms 1# channel
a1.channels.c1.type memory
a1.channels.c1.capacity 1000
a1.channels.c1.transactionCapacity 100# bind
a1.sources.r1.channels c1
a1.sinks.k1.channel c12 启动kafkaIDEA消费者 3 进入flume根目录下启动flume
$ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf4 向 /opt/module/datas/flume.log里追加数据查看kafka消费者消费情况
$ echo hello /opt/module/datas/flume.log