做网站要写多少行代码,网易云音乐网页版,做的网站 如何在局域网内访问,wordpress更好用吗生产者发送到对应的分区有以下几种方式#xff1a;(1)指定了patition#xff0c;则直接使用#xff1b;(可以查阅对应的java api, 有多种参数)(2)未指定patition但指定key#xff0c;通过对key的value进行hash出一个patition#xff1b;(3)patition和key都未指定#xff…生产者发送到对应的分区有以下几种方式(1)指定了patition则直接使用(可以查阅对应的java api, 有多种参数)(2)未指定patition但指定key通过对key的value进行hash出一个patition(3)patition和key都未指定使用轮询选出一个patition。但是kafka提供了自定义分区算法的功能,由业务手动实现分布1、实现一个自定义分区类custompartitioner实现partitionerimport org.apache.kafka.clients.producer.partitioner;import org.apache.kafka.common.cluster;import java.util.map;public class custompartitioner implements partitioner {/**** param topic 当前的发送的topic* param key 当前的key值* param keybytes 当前的key的字节数组* param value 当前的value值* param valuebytes 当前的value的字节数组* param cluster* return*/overridepublic int partition(string topic, object key, byte[] keybytes, object value, byte[] valuebytes, cluster cluster) {//这边根据返回值就是分区号, 这边就是固定发送到三号分区return 3;}overridepublic void close() {}overridepublic void configure(map configs) {}}2、producer配置文件指定具体的分区类// 具体的分区类props.put(producerconfig.partitioner_class_config, kafka.custompartitioner);技巧可以使用producerconfig中提供的配置producerconfigkafka producer拦截器拦截器(interceptor)是在kafka 0.10版本被引入的。interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求比如修改消息等。许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。所使用的类为org.apache.kafka.clients.producer.producerinterceptor我们可以编码测试下1、定义消息拦截器实现消息处理(可以是加时间戳等等unid等等。)import org.apache.kafka.clients.producer.producerinterceptor;import org.apache.kafka.clients.producer.producerrecord;import org.apache.kafka.clients.producer.recordmetadata;import java.util.map;import java.util.uuid;public class messageinterceptor implements producerinterceptor {overridepublic void configure(map configs) {system.out.println(这是messageinterceptor的configure方法);}/*** 这个是消息发送之前进行处理** param record* return*/overridepublic producerrecord onsend(producerrecord record) {// 创建一个新的record把uuid入消息体的最前部system.out.println(为消息添加uuid);return new producerrecord(record.topic(), record.partition(), record.timestamp(), record.key(),uuid.randomuuid().tostring().replace(-, ) , record.value());}/*** 这个是生产者回调函数调用之前处理* param metadata* param exception*/overridepublic void onacknowledgement(recordmetadata metadata, exception exception) {system.out.println(messageinterceptor拦截器的onacknowledgement方法);}overridepublic void close() {system.out.println(messageinterceptor close 方法);}}2、定义计数拦截器import java.util.map;import org.apache.kafka.clients.producer.producerinterceptor;import org.apache.kafka.clients.producer.producerrecord;import org.apache.kafka.clients.producer.recordmetadata;public class counterinterceptor implements producerinterceptor{private int errorcounter 0;private int successcounter 0;overridepublic void configure(map configs) {system.out.println(这是counterinterceptor的configure方法);}overridepublic producerrecord onsend(producerrecord record) {system.out.println(counterinterceptor计数过滤器不对消息做任何操作);return record;}overridepublic void onacknowledgement(recordmetadata metadata, exception exception) {// 统计成功和失败的次数system.out.println(counterinterceptor过滤器执行统计失败和成功数量);if (exception null) {successcounter;} else {errorcounter;}}overridepublic void close() {// 保存结果system.out.println(successful sent: successcounter);system.out.println(failed sent: errorcounter);}}3、producer客户端import org.apache.kafka.clients.producer.*;import java.util.arraylist;import java.util.list;import java.util.properties;public class producer1 {public static void main(string[] args) throws exception {properties props new properties();// kafka服务端的主机名和端口号props.put(bootstrap.servers, localhost:9092);// 等待所有副本节点的应答props.put(acks, all);// 消息发送最大尝试次数props.put(retries, 0);// 一批消息处理大小props.put(batch.size, 16384);// 请求延时可能生产数据太快了props.put(linger.ms, 1);// 发送缓存区内存大小数据是先放到生产者的缓冲区props.put(buffer.memory, 33554432);// key序列化props.put(key.serializer, org.apache.kafka.common.serialization.stringserializer);// value序列化props.put(value.serializer, org.apache.kafka.common.serialization.stringserializer);// 具体的分区类props.put(producerconfig.partitioner_class_config, kafka.custompartitioner);//定义拦截器list interceptors new arraylist();interceptors.add(kafka.messageinterceptor);interceptors.add(kafka.counterinterceptor);props.put(producerconfig.interceptor_classes_config, interceptors);producer producer new kafkaproducer(props);for (int i 0; i 1; i) {producer.send(new producerrecord(test_0515, i , xxx- i), new callback() {public void oncompletion(recordmetadata recordmetadata, exception e) {system.out.println(这是producer回调函数);}});}/*system.out.println(现在执行关闭producer);producer.close();*/producer.close();}}总结我们可以知道拦截器链各个方法的执行顺序假如有a、b拦截器在一个拦截器链中(1)执行a的configure方法执行b的configure方法(2)执行a的onsend方法b的onsend方法(3)生产者发送完毕后执行a的onacknowledgement方法b的onacknowledgement方法。(4)执行producer自身的callback回调函数。(5)执行a的close方法b的close方法。以上就是本文的全部内容希望对大家的学习有所帮助也希望大家多多支持萬仟网。如您对本文有疑问或者有任何想说的请点击进行留言回复万千网友为您解惑