当前位置: 首页 > news >正文

在线网站源码提取网站怎么快速做收录

在线网站源码提取,网站怎么快速做收录,赣州推广团队,wordpress 自定义表单插件1. 生产消息 1.1 生产消息的基本步骤 #xff08;一#xff09;创建Map类型的配置对象#xff0c;根据场景增加相应的配置属性#xff1a; 参数名参数作用类型默认值推荐值bootstrap.servers集群地址#xff0c;格式为#xff1a;brokerIP1:端口号,brokerIP2:端口号必…1. 生产消息 1.1 生产消息的基本步骤 一创建Map类型的配置对象根据场景增加相应的配置属性 参数名参数作用类型默认值推荐值bootstrap.servers集群地址格式为brokerIP1:端口号,brokerIP2:端口号必须key.serializer对生产数据Key进行序列化的类完整名称必须Kafka提供的字符串序列化类StringSerializervalue.serializer对生产数据Value进行序列化的类完整名称必须Kafka提供的字符串序列化类StringSerializerinterceptor.classes拦截器类名多个用逗号隔开可选batch.size数据批次字节大小。此大小会和数据最大估计值进行比较取大值。估值6121keySize1valueSize11可选16Kretries重试次数可选整型最大值0或整型最大值request.timeout.ms请求超时时间可选30slinger.ms数据批次在缓冲区中停留时间可选acks请求应答类型all(-1), 0, 1可选all(-1)根据数据场景进行设置retry.backoff.ms两次重试之间的时间间隔可选100msbuffer.memory数据收集器缓冲区内存大小可选32M64Mmax.in.flight.requests.per.connection每个节点连接的最大同时处理请求的数量可选5小于等于5enable.idempotence幂等性可选true根据数据场景进行设置partitioner.ignore.keys是否放弃使用数据key选择分区可选falsepartitioner.class分区器类名可选null 二创建待发送数据 在 Kafka 中传递的数据我们称之为消息message或记录record所以Kafka发送数据前需要将待发送的数据封装为指定的数据类型 相关属性必须在构建数据模型时指定其中主题和value的值时必须要传递的。如果配置中开启了自动创建主题那么 Topic 主题可以不存在。value 就是我们需要真正传递的数据了而 Key 可以用于数据的分区定位。 三创建生产者对象发送生产的数据 根据前面提供的配置信息创建生产者对象通过这个生产者对象向 Kafka 服务器节点发送数据而具体的发送是由生产者对象创建时内部构件的多个组件实现的多个组件的关系类似与生产者消费者模式。 1数据生产者KafkaProducer生产者对象用于对我们的数据进行必要的转换和处理将处理后的数据放入到数据收集器中类似于生产者消费者模式下的生产者。 如果配置拦截器栈interceptor.classes那么将数据进行拦截处理。某一个拦截器出现异常并不会影响后续的拦截器处理。因为发送的数据为 KV 数据所以需要根据配置信息中的序列化对象对数据中 Key 和 Value 分别进行序列化处理。计算数据嗦发送的分区位置。将数据追加到数据收集器中。 2数据收集器RecordAccumulator用于收集转换我们生产的数据蕾西与生产者消费者模式下的缓冲区。为了优化数据的传输Kafka 并不是生产一条数据就向 Broker 发送一条数据而是通过合并单条消息进行批量批次发送提高吞吐量减少带宽消耗。 默认情况下一个发送批次的数据容量为 16k这个可以通过参数 batch.size进行改善。批次是和分区进行绑定的。也就是说发往同一个分区的数据会进行合并形成一个批次。如果当前批次能容纳数据那么直接将数据追加到批次中即可如果不能容纳数据那么会产生新的批次放入到当前分区的批次队列中这个队列使用的是 Java 双端队列 Deque。旧的批次关闭不再接收新的数据等待发送。 3数据发送器Sender线程对象用于从收集器中获取数据向服务节点发送。类似于生产者消费者模式下的消费者。因为是线程对象所以启动后会不断轮询获取数据收集器中已经关闭的批次数据。对批次进行整合后再发送到 Broker 节点中 因为数据真正发送的地方是 Broker 节点不是分区。所以需要将从数据收集器中收集到的批次数据按照可用 Broker 节点重新组合成List集合。将组合后的节点List批次的数据封装成客户端请求请求键为Produce发送到网络客户端对象的缓冲区由网络客户端对象通过网络发送给 Broker 节点。Broker 节点获取客户端请求并根据请求键进行后续的数据处理向分区中增加数据。 1.2 生产消息的基本代码 // TODO 配置属性集合 MapString, Object configMap new HashMap(); // TODO 配置属性Kafka服务器集群地址 configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092); // TODO 配置属性Kafka生产的数据为KV对所以在生产数据进行传输前需要分别对K,V进行对应的序列化操作 configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer); configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer); // TODO 创建Kafka生产者对象建立Kafka连接 // 构造对象时需要传递配置参数 KafkaProducerString, String producer new KafkaProducer(configMap); // TODO 准备数据,定义泛型 // 构造对象时需要传递 【Topic主题名称】【Key】【Value】三个参数 ProducerRecordString, String record new ProducerRecordString, String(test, key1, value1 ); // TODO 生产发送数据 producer.send(record); // TODO 关闭生产者连接 producer.close();1.3 发送消息 1.3.1 拦截器 生产者 API 在数据准备好发送给 Kafka 服务器之前允许我们对生产的数据进行统一的处理比如校验整合数据等等。这些处理我们是可以通过 Kafka 提供的拦截器完成。 这里的拦截器是可以配置多个的。执行时会按照声明顺序执行完一个后再执行下一个。并且某一个拦截器如果出现异常只会跳出当前拦截器逻辑并不会影响后续拦截器的处理。所以开发时需要将拦截器的这种处理方法考虑进去。 1.3.1.1 增加拦截器类 1实现生产者拦截器接口 ProducerInterceptor /*** TODO 自定义数据拦截器* 1. 实现Kafka提供的生产者接口ProducerInterceptor* 2. 定义数据泛型 K, V* 3. 重写方法* onSend* onAcknowledgement* close* configure*/ public class KafkaInterceptorMock implements ProducerInterceptorString, String {Overridepublic ProducerRecordString, String onSend(ProducerRecordString, String record) {return record;}Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}Overridepublic void close() {}Overridepublic void configure(MapString, ? configs) {} }2实现接口中的方法根据业务功能重写具体的方法 方法名作用onSend数据发送前会执行此方法进行数据发送前的预处理onAcknowledgement数据发送后获取应答时会执行此方法close生产者关闭时会执行此方法完成一些资源回收和释放的操作configure创建生产者对象的时候会执行此方法可以根据场景对生产者对象的配置进行统一修改或转换。 1.3.1.2 配置拦截器 public class ProducerInterceptorTest {public static void main(String[] args) {MapString, Object configMap new HashMap();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);configMap.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put( ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, KafkaInterceptorMock.class.getName());KafkaProducerString, String producer null;try {producer new KafkaProducer(configMap);for ( int i 0; i 1; i ) {ProducerRecordString, String record new ProducerRecordString, String(test, key i, value i);final FutureRecordMetadata send producer.send(record);}} catch ( Exception e ) {e.printStackTrace();} finally {if ( producer ! null ) {producer.close();}}} }1.3.2 回调方法 Kafka 发送数据时可以同时传递回调对象Callback用于对数据的发送结果进行对应处理具体代码实现采用匿名类或 Lambda 表达式都可以。 public class KafkaProducerASynTest {public static void main(String[] args) {MapString, Object configMap new HashMap();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);KafkaProducerString, String producer new KafkaProducer(configMap);// TODO 循环生产数据for ( int i 0; i 1; i ) {// TODO 创建数据ProducerRecordString, String record new ProducerRecordString, String(test, key i, value i);// TODO 发送数据producer.send(record, new Callback() {// TODO 回调对象public void onCompletion(RecordMetadata recordMetadata, Exception e) {// TODO 当数据发送成功后会回调此方法System.out.println(数据发送成功 recordMetadata.timestamp());}});}producer.close();} }1.3.3 异步发送 Kafka 发送数据时底层的实现类似于生产者消费者模式。对应的底层会由主线程代码作为生产者向缓冲区中放数据而数据发送线程会从缓冲区中获取数据进行发送。Broker 接收到数据后进行后续处理。 如果 Kafka 通过主线程代码将一条数据放入到缓冲区后无需等待数据的后续发送过程就直接发送下一条数据的场合我们就称之为异步发送。 public class KafkaProducerASynTest {public static void main(String[] args) {MapString, Object configMap new HashMap();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);KafkaProducerString, String producer new KafkaProducer(configMap);// TODO 循环生产数据for ( int i 0; i 10; i ) {// TODO 创建数据ProducerRecordString, String record new ProducerRecordString, String(test, key i, value i);// TODO 发送数据producer.send(record, new Callback() {// TODO 回调对象public void onCompletion(RecordMetadata recordMetadata, Exception e) {// TODO 当数据发送成功后会回调此方法System.out.println(数据发送成功 recordMetadata.timestamp());}});// TODO 发送当前数据System.out.println(发送数据);}producer.close();} }1.3.4 同步发送 Kafka 发送数据时底层的实现类似于生产者消费者模式。对应的底层会由主线程代码作为生产者向缓冲区中放数据而数据发送线程会从缓冲区中获取数据进行发送。Broker 接收到数据后进行后续处理。 如果 Kafka 通过主线程代码将一条数据放入到缓冲区后需等待数据的后续发送操作的应答状态才能发送下一条数据的场合我们就称之为同步发送。所以这里的所谓同步就是生产数据的线程需要等待线程的应答响应结果。 代码实现上采用的是 JDK1.5 增加的JUC 并发编程的 Future 接口的 get 方法实现。 public class KafkaProducerASynTest {public static void main(String[] args) throws Exception {MapString, Object configMap new HashMap();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);KafkaProducerString, String producer new KafkaProducer(configMap);// TODO 循环生产数据for ( int i 0; i 10; i ) {// TODO 创建数据ProducerRecordString, String record new ProducerRecordString, String(test, key i, value i);// TODO 发送数据producer.send(record, new Callback() {// TODO 回调对象public void onCompletion(RecordMetadata recordMetadata, Exception e) {// TODO 当数据发送成功后会回调此方法System.out.println(数据发送成功 recordMetadata.timestamp());}}).get();// TODO 发送当前数据System.out.println(发送数据);}producer.close();} }
http://www.pierceye.com/news/55940/

相关文章:

  • wordpress 建单页网站个人博客网站模板源码
  • 电商网站数据中心建设方案网站开发程序排名
  • vi毕业设计代做网站苏州模板网站专业设计
  • 济南php网站开发dw做网站简单首页
  • iis5.1 发布网站什么网站可以做机票行程单
  • 做网站公司排名山东省住房和城乡建设部网站
  • 山东建站管理系统亚洲成品1688进入
  • 什么是大型门户网站对网站开发的理解500字
  • 枣庄市建设项目环评备案网站爱情动作片做网站
  • 足球网站建设意义企业网站建设参考资料
  • 网站怎么注册啊动画制作专业就业前景
  • 营销型网站怎么建设js网站跳转代码
  • 福建城建设厅官方网站公司网站建设费会计处理
  • 深圳建设网站龙岗网站建设网页设计与制作课程思政
  • 建筑公司网站建设方案网站怎么建设
  • 如何搭建一个自己上传视频的网站做微商网站发帖免费教程
  • 建设部标准规范网站成考报名系统入口官网
  • 国外哪个网站做c 挣钱购物网站开发 项目描述
  • 中国企业网站建设网站如何去分析
  • 网站建设好后能修改吗佛山 技术支持 骏域网站建设
  • 网站开发是什重庆网站网络推广推广
  • 汕头网站开发企业查询官网免费查询一下
  • 文明网i中国精神文明建设门户网站seo优化课程
  • 乐从网站建设公司自建的电子网站如何做推广
  • 电子商务网站建设与管理的考试html做网站经验技巧
  • 创建qq网站吗郑州华久做网站
  • 网店怎么做网站seo啥意思怎么做
  • 专业做网站优化需要多久劳务公司网站建设方案
  • 门户网站建设厂商名录苏州seo排名公司
  • 四个平台建设网站不显示图片用群晖nas做网站