服装网站建设需求分析,食品包装设计要点,青岛抖音广告,怎样在工商局网站上做网登生产者通过producerRecord 对象封装消息主题、消息的value#xff08;内容#xff09;、timestamp(时间戳)等 生产者通过send()方法发送消息#xff0c;send()方法会经过如下几步 1. 首先将消息交给拦截器#xff08;Interceptor#xff09;处理, 拦截器对生产者而言内容、timestamp(时间戳)等 生产者通过send()方法发送消息send()方法会经过如下几步 1. 首先将消息交给拦截器Interceptor处理, 拦截器对生产者而言对所有消息都是生效的拦截器也支持链式编程责任器链的效果拦截器一般将一些通用的功能加进来通常在消息发送前producer回调逻辑前对消息做一些定制化需求消息头部添加消息的属性等 2. 接下来交给序列化器SerializerKey的序列化器和value的序列化器对消息的key和value进行序列化序列化为字节数组 3. 然后将序列化的结果交给分区器(Partitioner)分区器有3种策略来计算消息应该属于哪个分区 在producerRecord中直接指定分区分区器会直接将消息放到指定分区 如果没有指定分区器但是消息有key,分区器会根据消息的key计算hash值根据主题分区数量取模来决定将消息放到哪个分区 如果没有指定分区、也没有指定key分区器会以轮询Round Robin的方式给消息分配分区 在这里插入图片描述 消息经过以上拦截器-序列化器-分区器 进行加工后会将消息放到RecordAccumulator缓冲区,对每个分区都会有一个单独的缓冲区经过分区器计算出分区号之后不同的消息就会分配给不同的缓冲区缓冲区里面消息也是有序的我们可以指定对缓冲区里的消息进行分批次也可以指定缓冲区大小 当缓冲区中消息达到条件会按批次发送到broker对应分区上 broker将接收到的消息进行刷盘持久化 一个消息发出去之后服务器broker会返回给producer响应producer再来判断消息是否发送成功 broker返回元数据信息 - 落盘成功 -生产者继续发送后面消息 broker返回元数据信息 - 落盘失败 - 生产者设置了重试次数 - producer 会将消息重新放入缓冲区进行排队等待再次发送当一个消息发送失败重试需要重发消息是放到缓冲区队尾 生产者去缓冲区重试发送 生产者在重试消息时消息的顺序就错了那怎么保证消息的有序性呢 针对这种情况可以做一个配置 参数max.in.flight.requests.per.connection表示producer 在收到broker响应之前可以发送多少批消息默认5 设置此值是1表示broker在响应之前producer不能再向同一个broker发送请求就是我确认一批你再发下一批这样可以保证消息有序性对消息顺序要求不高情况可以不考虑 补充 Producer 创建时会创建一个Sender线程IO线程设置为守护线程 Producer 创建时会创建缓冲区 Producer 生产消息内部是一个异步流程Sender线程不断轮询RecordAccumulator,满足条件后进行真正的网络IO发送消息 RecordAccumulator缓冲区 对每一个分区都有一个缓冲区 每个分区的缓冲区中消息也是有序的可以指定缓冲区中的消息按批次发送 缓冲区大小达到batch.size默认16KB在缓冲区等待时间 lingger.ms 达到上限以上两个条件满足一个即发送一批 可以指定整个缓冲区的大小 批次的概念很好理解缓冲区就像一辆公交车有两种发车方式一是人满了就发车一是等5分钟就发车不管是人满了还是到5分钟了发车go~ 分批发送可以减少网络IO节省带宽使用减少网络传输的压力提升吞吐量 一个批次消息发送后通过网络发往Kafka指定分区然后刷盘到broker如果Producer设置了retries参数值0,那么允许消息发送失败进行重试重试机制由客户端Producer内部实现Broker端消息落盘成功会返回元数据给生产者 通过阻塞直接返回 同步发送通过回调函数返回异步发送