pc 网站建设,苏州seo网站诊断,网站用花生壳nas做存储,西宁网站建设报价壹君博贴心接着上文kafka的简述#xff0c;这一章我们一探kafka生产者是如何发送消息到消息服务器的。 代码的入口还是从 kafkaTemplate.send开始 最终我们就会到 org.springframework.kafka.core.KafkaTemplate#doSend方法 这里的关键就是 org.apache.kafka.clients.producer.Producer#… 接着上文kafka的简述这一章我们一探kafka生产者是如何发送消息到消息服务器的。 代码的入口还是从 kafkaTemplate.send开始 最终我们就会到 org.springframework.kafka.core.KafkaTemplate#doSend方法 这里的关键就是 org.apache.kafka.clients.producer.Producer#send(org.apache.kafka.clients.producer.ProducerRecordK,V, org.apache.kafka.clients.producer.Callback) 我们再一路点击下去一直到 org.apache.kafka.clients.producer.KafkaProducer#doSend方法 这里将步骤分为五步 1.更新MetadataMetadata用于存储部分topic数据
2.将发送内容序列化
3.如果我们有多个分区的话在这里会根据算法选择相应的分区
4.向accumulator写入数据accumulator是一种ConcurrentMapTopicPartition, DequeRecordBatch batches;结构在这里对发送数据做零时缓存 5.缓存的够多了唤醒线程发送数据。 所以看到这里我们就明白了kafka不是直接将数据发送到服务器。而是缓存到内存中知道大于batchsize才去做发送 接下来我们看下sender线程做了什么 直接来到 org.apache.kafka.clients.producer.internals.Sender#run(long) 1.连接的获取 org.apache.kafka.clients.NetworkClient#initiateConnect 具体的connect代码如下 首先与kafka serve端建立了一个non blocking 的SocketChannel然后将该channel注册到一个java.nio.channels.Selector上面并注册OP_CONNECT事件。 接下来我们再看下消息的发送 首先调用 client.send(request, now); 这个方法最终会调用 org.apache.kafka.common.network.KafkaChannel#setSend 为每个request注册 OP_WRITE事件 同时把send传递进来 接下来调用 this.client.poll(pollTimeout, now); 这个的调用链是 org.apache.kafka.common.network.Selector#poll---- org.apache.kafka.common.network.Selector#pollSelectionKeys--- 这里的 key.isWritable() 就是我们上文注册写事件当所有的都准备好了我们调用channel将消息发送到服务端 到这里我们就知道了kafka发送消息的大致流程。本文并没有对细节深入只想对kafka做出快速的了解。 转载于:https://www.cnblogs.com/xmzJava/p/9536351.html