济阳网站建设公司,做非法网站怎么规避,设计之家破解版,网站建设陆金手指科捷11前言 最新项目中要用到消息队列来做消息的传输#xff0c;之所以选着 Kafka 是因为要配合其他 java 项目中#xff0c;所以就对 Kafka 了解了一下#xff0c;也算是做个笔记吧。 本篇不谈论 Kafka 和其他的一些消息队列的区别#xff0c;包括性能及其使用方式。 简介 Kafka… 前言 最新项目中要用到消息队列来做消息的传输之所以选着 Kafka 是因为要配合其他 java 项目中所以就对 Kafka 了解了一下也算是做个笔记吧。 本篇不谈论 Kafka 和其他的一些消息队列的区别包括性能及其使用方式。 简介 Kafka 是一个实现了分布式的、具有分区、以及复制的日志的一个服务。它通过一套独特的设计提供了消息系统中间件的功能。它是一种发布订阅功能的消息系统。 一些名词 如果要使用 Kafka 那么在 Kafka 中有一些名词需要知道文本不讨论这些名词是否在其他消息队列中具有相同的含义。所有名词均是针对于 Kafka。 Message 消息就是要发送的内容一般包装成一个消息对象。 Topic 通俗来讲的话就是放置“消息”的地方也就是说消息投递的一个容器。假如把消息看作是信封的话那么 Topic 就是一个邮筒如下图所示 Partition Log Partition 分区可以理解为一个逻辑上的分区像是我们电脑的磁盘 C:, D:, E: 盘一样,Kafka 为每个分区维护着一份日志Log文件。 每个分区是一个有序的不可修改的消息组成的队列。 当消息过来的时候会被追加到日志文件中这个追加是根据 commit 命令来执行的。 分区中的每一条消息都有一个编号叫做 offset id这个 id 在当前分区中是唯一的并且是递增的。 日志就是用来记录分区中接收到的消息因为每一个 Topic 可以同时向一个或者多个分区投递消息所以实际在存储日志的时候每个分区会对应一个日志目录其命名规则一般为 topic_name-partition_id, 目录中就是一个分区的一份 commit log 日志文件。 Kafka 集群会保存一个时间段内所有被发布出来的信息无论这个消息是否已经被消费过这个时间段是可以配置的。比如日志保存时间段被设置为2天那么2天以内发布的消息都是可以消费的而之前的消息为了释放空间将会抛弃掉。Kafka的性能与数据量不相干所以保存大量的消息数据不会造成性能问题。 对日志进行分区主要是为了以下几个目的第一、这可以让log的伸缩能力超过单台服务器上线每个独立的partition的大小受限于单台服务器的容积但是一个topic可以有很多partition从而使得它有能力处理任意大小的数据。第二、在并行处理方面这可以作为一个独立的单元。 生产者 Producers 和其他消息队列一样生产者通常都是消息的产生方。在 Kafka 中它决定消息发送到指定Topic的哪个分区上。 消费者 Consumers 消费者就是消息的使用者在消费者端也有几个名词需要区分一下。 一般消息队列有两种模式的消费方式分别是 队列模式 和 订阅模式。 队列模式一对一就是一个消息只能被一个消费者消费不能重复消费。一般情况队列支持存在多个消费者但是对于一个消息只会有一个消费者可以消费它。 订阅模式一对多一个消息可能被多次消费消息生产者将消息发布到Topic中只要是订阅改Topic的消费者都可以消费。 Consumer Subscriber Group 组是一个消费者的集合每一组都有一个或者多个消费者Kafka 中在一个组内消息只能被消费一次。 在发布订阅模式中消费者是以组的方式进行订阅的就是Consumer Group他们的关系如下图 每个发布到Topic上的消息都会被投递到每个订阅了此Topic的消费者组中的某一个消费者也就是每个组都会被投递但是每个组都只会有一个消费者消费这个消息。 开头介绍了Kafka 是 发布-订阅 功能的消息队列所以在Kafka中队列模式是通过单个消费者组实现的也就是整个结构中只有一个消费者组消费者之间负载均衡。 Kafka 集群 Borker Kafka 集群有多个服务器组成每个服务器称做一个 Broker。同一个Topic的消息按照一定的key和算法被分区存储在不同的Broker上。 上图引用自http://blog.csdn.net/lizhitao 因为 Kafka 的集群它是通过将分区散布到各个Server的实现的也就是说集群中每个服务器他们都是彼此共享分区的数据和请求每个分区的日志文件被复制成指定分数分散在各个集群机器这样来实现的故障转移。 对于每一个分区都会有一个服务器作为它的 leader 并且有零个或者多个服务器作为followers 。leader 服务器负责处理关于这个 partition 所有的读写请求 followers 服务器则被动的复制 leader 服务器。如果有 leader 服务器失效那么 followers 服务器将有一台被自动选举成为新的 leader 。每个服务器作为某些 partition 的 leader 的同时也作为其它服务器的 follower 从而实现了集群的负载均衡。 .NET Core Kafka 客户端 在 .NET Core 中有相对应的开源 kafka sdk 项目就是 Rdkafka。它同时支持 .NET 4.5并且支持跨平台可以运行于LinuxmacOS 和 Windows。 RdKafka Github https://github.com/ah-/rdkafka-dotnet RdKafka Nuget Install-Package RdKafka 生产者 API // Producer 接受一个或多个 BrokerListusing (Producer producer new Producer(127.0.0.1:9092))//发送到一个名为 testtopic 的Topic如果没有就会创建一个using (Topic topic producer.Topic(testtopic)) { //将message转为一个 byte[]byte[] data Encoding.UTF8.GetBytes(Hello RdKafka);DeliveryReport deliveryReport await topic.Produce(data);Console.WriteLine($发送到分区{deliveryReport.Partition}, Offset 为: {deliveryReport.Offset});
} 消费者 API 由于 Kafka 是以消费者组的形式进行消费的所以需要指定一个GroupId。 在内部实现上消费者是通过一个轮询机制来实现的对 Topic 消息的监控这也是Kafka推荐的方式在 Rdkafka 中轮询的间隔为 1 秒钟。 //配置消费者组var config new Config() { GroupId example-csharp-consumer }; using (var consumer new EventConsumer(config, 127.0.0.1:9092)) { //注册一个事件consumer.OnMessage (obj, msg) { string text Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length);Console.WriteLine($Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {text});}; //订阅一个或者多个Topicconsumer.Subscribe(new[] { testtopic }); //启动consumer.Start();Console.WriteLine(Started consumer, press enter to stop consuming);Console.ReadLine();
} 原文地址http://www.cnblogs.com/savorboard/p/dotnetcore-kafka.html .NET社区新闻深度好文微信中搜索dotNET跨平台或扫描二维码关注