信阳建设网站,开远市住房和城乡建设局网站,门户网站建设经济交流材料,展示型网站企业网站建设整体架构 整个生产者客户端由两个县城协调运行#xff0c;这两个线程分别为主线程和Sender线程#xff08;发送线程#xff09;。
主线程中由KafkaProducer创建消息#xff0c;然后通过可能的拦截器#xff0c;序列化器和分区器之后缓存到消息累加器#xff08;RecordAc…整体架构 整个生产者客户端由两个县城协调运行这两个线程分别为主线程和Sender线程发送线程。
主线程中由KafkaProducer创建消息然后通过可能的拦截器序列化器和分区器之后缓存到消息累加器RecordAccumulator。Sender线程负责从RecordAccumulator中获取消息并将其发送到kafka中。
RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送进而减少网络传输的资源消耗以提升性能。RecordAccumulator 缓存的大小可以通过生产者客户端参数 buffer.memory 配置默认值为 33554432B即32MB。如果生产者发送消息的速度超过发送到服务器的速度则会导致生产者空间不足这个时候 KafkaProducer 的 send() 方法调用要么被阻塞要么抛出异常这个取决于参数 max.block.ms 的配置此参数的默认值为60000即60秒。
主线程中发送过来的消息都会被追加到 RecordAccumulator 的某个双端队列Deque中在 RecordAccumulator 的内部为每个分区都维护了一个双端队列队列中的内容就是 ProducerBatch即 Deque。消息写入缓存时追加到双端队列的尾部Sender 读取消息时从双端队列的头部读取。注意 ProducerBatch 不是 ProducerRecordProducerBatch 中可以包含一至多个 ProducerRecord。通俗地说ProducerRecord 是生产者中创建的消息而 ProducerBatch 是指一个消息批次ProducerRecord 会被包含在 ProducerBatch 中这样可以使字节的使用更加紧凑。与此同时将较小的 ProducerRecord 拼凑成一个较大的 ProducerBatch也可以减少网络请求的次数以提升整体的吞吐量。
Sender 从 RecordAccumulator 中获取缓存的消息之后会进一步将原本分区, Deque ProducerBatch 的保存形式转变成 Node, List ProducerBatch 的形式其中 Node 表示 Kafka 集群的 broker 节点。对于网络连接来说生产者客户端是与具体的 broker 节点建立的连接也就是向具体的 broker 节点发送消息而并不关心消息属于哪一个分区而对于 KafkaProducer 的应用逻辑而言我们只关注向哪个分区中发送哪些消息所以在这里需要做一个应用逻辑层面到网络I/O层面的转换。
在转换成 Node, List 的形式之后Sender 还会进一步封装成 Node, Request 的形式这样就可以将 Request 请求发往各个 Node 了这里的 Request 是指 Kafka 的各种协议请求对于消息发送而言就是指具体的 ProduceRequest。
请求在从 Sender 线程发往 Kafka 之前还会保存到 InFlightRequests 中InFlightRequests 保存对象的具体形式为 MapNodeId, Deque它的主要作用是缓存了已经发出去但还没有收到响应的请求NodeId 是一个 String 类型表示节点的 id 编号。与此同时InFlightRequests 还提供了许多管理类的方法并且通过配置参数还可以限制每个连接也就是客户端与 Node 之间的连接最多缓存的请求数。这个配置参数为 max.in.flight.requests.per.connection默认值为5即每个连接最多只能缓存5个未响应的请求超过该数值之后就不能再向这个连接发送更多的请求了除非有缓存的请求收到了响应Response。通过比较 Deque 的 size 与这个参数的大小来判断对应的 Node 中是否已经堆积了很多未响应的消息如果真是如此那么说明这个 Node 节点负载较大或网络连接有问题再继续向其发送请求会增大请求超时的可能。
前面提及的 InFlightRequests 还可以获得 leastLoadedNode即所有 Node 中负载最小的那一个。这里的负载最小是通过每个 Node 在 InFlightRequests 中还未确认的请求决定的未确认的请求越多则认为负载越大。对于下图中的 InFlightRequests 来说图中展示了三个节点 Node0、Node1和Node2很明显 Node1 的负载最小。也就是说Node1 为当前的 leastLoadedNode。选择 leastLoadedNode 发送请求可以使它能够尽快发出避免因网络拥塞等异常而影响整体的进度。leastLoadedNode 的概念可以用于多个应用场合比如元数据请求、消费者组播协议的交互。 生产者重要参数
acks
acks1。默认值生产者发送消息之后只要分区的 leader 副本成功写入消息那么它就会收到来自服务端的成功响应。如果消息无法写入 leader 副本比如在 leader 副本崩溃、重新选举新的 leader 副本的过程中那么生产者就会收到一个错误的响应为了避免消息丢失生产者可以选择重发消息。如果消息写入 leader 副本并返回成功响应给生产者且在被其他 follower 副本拉取之前 leader 副本崩溃那么此时消息还是会丢失因为新选举的 leader 副本中并没有这条对应的消息。acks 设置为1是消息可靠性和吞吐量之间的折中方案。acks0。生产者发送消息之后不需要等待任何服务端的响应。如果在消息从发送到写入 Kafka 的过程中出现某些异常导致 Kafka 并没有收到这条消息那么生产者也无从得知消息也就丢失了。在其他配置环境相同的情况下acks 设置为0可以达到最大的吞吐量。acks-1或acksall。生产者在消息发送之后需要等待 ISR 中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。在其他配置环境相同的情况下acks 设置为 -1all 可以达到最强的可靠性。但这并不意味着消息就一定可靠因为ISR中可能只有 leader 副本这样就退化成了 acks1 的情况。要获得更高的消息可靠性需要配合 min.insync.replicas 等参数的联动
retries和retry.backoff.ms
retries 参数用来配置生产者重试的次数默认值为0即在发生异常的时候不进行任何重试动作。消息在从生产者发出到成功写入服务器之前可能发生一些临时性的异常比如网络抖动、leader 副本的选举等这种异常往往是可以自行恢复的生产者可以通过配置 retries 大于0的值以此通过内部重试来恢复而不是一味地将异常抛给生产者的应用程序。如果重试达到设定的次数那么生产者就会放弃重试并返回异常。不过并不是所有的异常都是可以通过重试来解决的比如消息太大超过 max.request.size 参数配置的值时这种方式就不可行了。
重试还和另一个参数 retry.backoff.ms 有关这个参数的默认值为100它用来设定两次重试之间的时间间隔避免无效的频繁重试。在配置 retries 和 retry.backoff.ms 之前最好先估算一下可能的异常恢复时间这样可以设定总的重试时间大于这个异常恢复时间以此来避免生产者过早地放弃重试。
Kafka 可以保证同一个分区中的消息是有序的。如果生产者按照一定的顺序发送消息那么这些消息也会顺序地写入分区进而消费者也可以按照同样的顺序消费它们。
对于某些应用来说顺序性非常重要比如 MySQL 的 binlog 传输如果出现错误就会造成非常严重的后果。如果将 retries 参数配置为非零值并且 max.in.flight.requests.per.connection 参数配置为大于1的值那么就会出现错序的现象如果第一批次消息写入失败而第二批次消息写入成功那么生产者会重试发送第一批次的消息此时如果第一批次的消息写入成功那么这两个批次的消息就出现了错序。一般而言在需要保证消息顺序的场合建议把参数 max.in.flight.requests.per.connection 配置为1而不是把 retries 配置为0不过这样也会影响整体的吞吐。