当前位置: 首页 > news >正文

手机app界面设计网站网站信息内容建设自查

手机app界面设计网站,网站信息内容建设自查,云尚网络科技有限公司网站建设,电子商务网站开发实训文章目录 1. Kafka 生产者2. kafaka 命令行操作3. Kafka 生产者发送消息流程4. Kafka 生产者发送消息的3种方式1. 发送即忘记2. 同步发送3. 异步发送 5. Kafka 消息对象 ProducerRecord 1. Kafka 生产者 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序#xff0c;它们… 文章目录 1. Kafka 生产者2. kafaka 命令行操作3. Kafka 生产者发送消息流程4. Kafka 生产者发送消息的3种方式1. 发送即忘记2. 同步发送3. 异步发送 5. Kafka 消息对象 ProducerRecord 1. Kafka 生产者 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序它们负责将消息发送到 Kafka 集群中的一个或多个主题topic。生产者可以将消息发送到指定的主题也可以根据分区策略将消息发送到多个分区中。生产者可以以异步或同步方式发送消息并且可以配置消息的可靠性和持久性等属性。在 Kafka 中生产者是消息的源头它们将消息发送到 Kafka 集群中供消费者消费。 2. kafaka 命令行操作 ① 启动 Zookeeper 集群 [rootmaster01 bin]# pwd /root/ch/soft/zk/zk-01/bin [rootmaster01 bin]# ./zkServer.sh start[rootmaster01 bin]# pwd /root/ch/soft/zk/zk-02/bin [rootmaster01 bin]# ./zkServer.sh start[rootmaster01 bin]# pwd /root/ch/soft/zk/zk-03/bin [rootmaster01 bin]# ./zkServer.sh start② 启动 kafka 集群 [rootmaster01 kafka01]# pwd /root/ch/soft/kafka/kafka01 [rootmaster01 kafka01]# bin/kafka-server-start.sh config/server.properties[rootmaster01 kafka02]# pwd /root/ch/soft/kafka/kafka02 [rootmaster01 kafka02]# bin/kafka-server-start.sh config/server.properties[rootmaster01 kafka03]# pwd /root/ch/soft/kafka/kafka03 [rootmaster01 kafka03]# bin/kafka-server-start.sh config/server.properties③ 创建主题 test [rootmaster01 kafka01]# bin/kafka-topics.sh --zookeeper localhost:2183 --create --partitions 3 --replication-factor 2 --topic test Created topic test. [rootmaster01 kafka01]# bin/kafka-topics.sh --zookeeper localhost:2183 --describe --topic test Topic:test PartitionCount:3 ReplicationFactor:2 Configs: Topic: test Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1 Topic: test Partition: 1 Leader: 0 Replicas: 0,2 Isr: 0,2 Topic: test Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0④ 生产者发送消息到主题test [rootmaster01 kafka01]# bin/kafka-console-producer.sh --broker-list 10.65.132.2:9093 --topic test hello 你好kafka!⑤ 消费者消费主题test的消息 [rootmaster01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning hello 你好kafka!3. Kafka 生产者发送消息流程 ① 首先要构造一个 ProducerRecord 对象该对象可以声明主题Topic、分区Partition、键 Key以及值 Value主题和值是必须要声明的分区和键可以不用指定。 ② 调用send() 方法进行消息发送。 ③ 因为消息要到网络上进行传输所以必须进行序列化序列化器的作用就是把消息的 key 和value对象序列化成字节数组。 ④ 接下来数据传到分区器如果之间的 ProducerRecord 对象指定了分区那么分区器将不再做任何事直接把指定的分区返回如果没有那么分区器会根据 Key 来选择一个分区选择好分区之后生产者就知道该往哪个主题和分区发送记录了。 ⑤ 接着这条记录会被添加到一个记录批次里面这个批次里所有的消息会被发送到相同的主题和分区。会有一个独立的线程来把这些记录批次发送到相应的 Broker 上。 ⑥ Broker 服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka 就返回一个RecordMetaData 对象它包含了主题和分区信息以及记录在分区里的偏移量。如果写入失败 就会返回一个错误。生产者在收到错误之后会尝试重新发送消息几次之后如果还是失败 就返回错误信息。 4. Kafka 生产者发送消息的3种方式 发送消息主要有三种模式发后即忘记、同步及异步。在同步模式下程序会一直等待某个操作完成后才会继续执行下一个操作在异步模式下程序可以同时执行多个操作不会阻塞其他操作。 KafkaProducer 的 send() 方法用于向 Kafka 集群发送消息。该方法的语法如下 public interface ProducerK, V extends Closeable {FutureRecordMetadata send(ProducerRecordK, V record);FutureRecordMetadata send(ProducerRecordK, V record, Callback callback); }其中ProducerRecordK, V 表示要发送的消息记录K 和 V 分别表示键和值的类型。send() 方法返回一个 Future 对象表示异步发送消息的结果。 1. 发送即忘记 发送即忘记生产者发送消息后不会等待服务器的响应直接发送下一条消息。它只管往Kafka中发送消息而并不关心消息是否正确到达。在大多数情况下这种发送方式没有什么问题不过在某些时候比如发生不可重试异常时会造成消息的丢失。这种发送方式的性能最高可靠性也最差。 public class CustomProducer01 {private static final String brokerList 10.65.132.2:9093;private static final String topic test;public static Properties initConfig(){Properties properties new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());return properties;}public static void main(String[] args) {// kafka生产者属性配置Properties properties initConfig();// kafka生产者发送消息默认是异步发送方式KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);ProducerRecordString, String producerRecord new ProducerRecord(topic, 你好kafka!);try{// 发送消息kafkaProducer.send(producerRecord);}catch (Exception e){e.printStackTrace();}// 关闭资源kafkaProducer.close();} }cmd命令行窗口开启 kafka 消息者观察消费者是否接收到消息 [rootmaster01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning 你好kafka!2. 同步发送 send方法本身就是异步的send方法返回的Future对象可以使调用方稍后获得发送的结果。在执行send方法之后可以调用 get方法来阻塞等待Kafka的响应直到消息发送成功或者发生异常。如果发生异常那么就需要捕获异常并交由外层逻辑处理。 Future 接口源码 public interface FutureV {boolean cancel(boolean mayInterruptIfRunning);boolean isCancelled();boolean isDone();V get() throws InterruptedException, ExecutionException;V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }Future接口是Java中用于表示异步计算结果的接口。它定义了一些方法用于查询异步计算是否完成、获取计算结果等操作。 cancel方法用于取消异步计算isCancelled方法用于判断异步计算是否已经被取消isDone方法用于判断异步计算是否已经完成。get方法用于获取异步计算的结果如果计算还没有完成则该方法会阻塞直到计算完成。如果计算被取消则该方法会抛出CancellationException异常。如果计算抛出异常则该方法会抛出ExecutionException异常。get(long timeout, TimeUnit unit)方法与get方法类似但是它会在指定的时间内等待计算完成如果超时则会抛出TimeoutException异常。 Future 表示一个任务的生命周期并提供了相应的方法来判断任务是否已经完成或取消以及获取任务的结果和取消任务等。既然KafkaProducer.send方法的返回值是一个Future类型的对象那么完全可以用Java语言层面的技巧来丰富应用的实现比如使用Future中的 getlong timeoutTimeUnit unit方法实现可超时的阻塞。 public class CustomProducer01 {private static final String brokerList 10.65.132.2:9093;private static final String topic test;public static Properties initConfig(){Properties properties new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());return properties;}public static void main(String[] args) {// kafka生产者属性配置Properties properties initConfig();// kafka生产者发送消息默认是异步发送方式KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);ProducerRecordString, String producerRecord new ProducerRecord(topic, 你好kafka同步发送!);try{// 发送消息FutureRecordMetadata future kafkaProducer.send(producerRecord);// 获取异步计算的结果如果计算还没有完成则该方法会阻塞直到计算完成RecordMetadata recordMetadata future.get();System.out.println(metadata.topic() recordMetadata.topic());}catch (Exception e){e.printStackTrace();}// 关闭资源kafkaProducer.close();} }[rootmaster01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning 你好kafka! 你好kafka同步发送!在RecordMetadata对象里包含了消息的一些元数据信息比如当前消息的主题、分区号、分区中的偏移量offset、时间戳等。 3. 异步发送 生产者发送消息后不会等待服务器的响应而是通过回调函数来处理服务器的响应。回调函数会在 producer 收到 ack 时调用该方法有两个参数分别是元数据信息RecordMetadata和异常信息Exception如果 Exception 为 null说明消息发送成功如果 Exception 不为 null说明消息发送失败。 注意消息发送失败会自动重试不需要我们在回调函数中手动重试。 public class CustomProducer01 {private static final String brokerList 10.65.132.2:9093;private static final String topic test;public static Properties initConfig(){Properties properties new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());return properties;}public static void main(String[] args) {// kafka生产者属性配置Properties properties initConfig();// kafka生产者发送消息默认是异步发送方式KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);ProducerRecordString, String producerRecord new ProducerRecord(topic, 你好kafka异步发送带返回值!);try{// 发送消息kafkaProducer.send(producerRecord, new Callback() {Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {// 说明消息发送成功if(enull){System.out.println(metadata.topic() recordMetadata.topic());System.out.println(metadata.partition() recordMetadata.partition());}}});}catch (Exception e){e.printStackTrace();}// 关闭资源kafkaProducer.close();} }[rootmaster01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning 你好kafka! 你好kafka同步发送! 你好kafka异步发送带回调函数!Kafka生产者异步发送消息时可以通过指定回调函数来处理发送结果。当消息发送完成后回调函数会被调用以通知应用程序消息发送的结果。具体来说当生产者成功发送消息时回调函数会被传递一个RecordMetadata对象该对象包含了发送消息的相关信息如消息所在的分区、消息在分区中的偏移量等。如果发送消息失败则回调函数会被传递一个非空的Exception对象以指示发送失败的原因。 需要注意的是回调函数是在生产者的I/O线程中被调用的因此应该尽量避免在回调函数中执行耗时的操作以免影响生产者的性能。 5. Kafka 消息对象 ProducerRecord ① ProducerRecord 成员变量 public class ProducerRecordK, V {// 消息要发送到的主题private final String topic;// 消息要发送到的分区号如果为null则由Kafka自动选择分区private final Integer partition;// 消息的键private final K key;// 消息的值private final V value;// 消息的时间戳如果为null则使用当前时间戳private final Long timestamp;// 消息的头部信息private final Headers headers;// ..... }topic和partition字段分别代表消息要发往的主题和分区号。key是用来指定消息的键它不仅是消息的附加信息还可以用来计算分区号进而可以让消息发往特定的分区。前面提及消息以主题为单位进行归类而这个key可以让消息再进行二次归类同一个key的消息会被划分到同一个分区中。value是指消息体一般不为空如果为空则表示特定的消息。timestamp是指消息的时间戳它有CreateTime和LogAppendTime两种类型前者表示消息创建的时间后者表示消息追加到日志文件的时间。 ② ProducerRecord 构造函数 public class ProducerRecordK, V {private final String topic;private final Integer partition;private final Headers headers;private final K key;private final V value;private final Long timestamp;public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, IterableHeader headers) {if (topic null)throw new IllegalArgumentException(Topic cannot be null.);if (timestamp ! null timestamp 0)throw new IllegalArgumentException(String.format(Invalid timestamp: %d. Timestamp should always be non-negative or null., timestamp));if (partition ! null partition 0)throw new IllegalArgumentException(String.format(Invalid partition: %d. Partition number should always be non-negative or null., partition));this.topic topic;this.partition partition;this.key key;this.value value;this.timestamp timestamp;this.headers new RecordHeaders(headers);}public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {this(topic, partition, timestamp, key, value, null);}public ProducerRecord(String topic, Integer partition, K key, V value, IterableHeader headers) {this(topic, partition, null, key, value, headers);}public ProducerRecord(String topic, Integer partition, K key, V value) {this(topic, partition, null, key, value, null);}public ProducerRecord(String topic, K key, V value) {this(topic, null, null, key, value, null);}public ProducerRecord(String topic, V value) {this(topic, null, null, null, value, null);} }生产者发送消息的分区选择逻辑 若指定Partition ID则消息发送至指定的Partition若未指定Partition ID但指定了Key则消息会按照 hasy(key) 发送至对应Partition若既未指定Partition ID也没指定Key则消息会按照round-robin模式发送到每个Partition若同时指定了Partition ID和Key则消息只会发送到指定的Partition (Key不起作用代码逻辑决定)
http://www.pierceye.com/news/140297/

相关文章:

  • 惠州做网站公司网页游戏排行榜前十名歌
  • 会ps的如何做网站高等教材建筑电气久久建筑网
  • 甘肃住房城乡建设厅网站wordpress风格化页面
  • 起名网站建设免费找素材软件
  • 网站基本信息设置链接搜索
  • 广州海珠网站开发营销策划
  • 医院网站制作公司专门做spa的网站
  • 企业网页制作与网站设计网站必须天天更新吗
  • 乌苏市城乡建设局网站外贸网网站建设
  • html5网站开发实例书籍凡科建站代理
  • 与建设部网站网站注册登录页面设计
  • 企业网站推广计划免费最新如何建设网站教程视频
  • 17一起做网站普宁站好看个人网页模板
  • 民治营销网站专业网站建设价格最优
  • 免费的html网站做柜子喜欢上哪些网站看
  • 网站没备案怎么做加速现代装修风格三室两厅效果图
  • 互助平台网站建设网上商城怎么购物
  • 百度知道山东网站建设建设网站成本预算
  • 人人做免费网站网站建站是 什么
  • 以背景做网站视频为单位网站建设实施方案
  • 简洁大气企业网站模板西安个人做网站
  • 做一个网站需要到哪里做辽宁同鑫建设网站
  • 开发网站监控推荐扬中市建设局网站
  • 手机网站根目录简述一个网站设计的主要步骤
  • 网站改版seo建议网页设计师的能力
  • 网站上线前应该备案吗温州网站建设风格
  • 网站建设书籍免费聊城市东昌府区建设路小学网站
  • 网站标题优化怎么做找人一起做素材网站
  • 如何创建个人网站模板用织梦做模板网站
  • 平台建站建设做网站一定要有营业执照吗