常设中国建设工程法律论坛网站,简单的网站建立怎么做,推广普通话奋进新征程手抄报,wordpress 自动标签使用 Redis Stream 实现消息队列IntroRedis 5.0 中增加了 Stream 的支持#xff0c;利用 Stream 我们可以实现可靠的消息队列#xff0c;并且支持一个消息被多个消费者所消费#xff0c;可以很好的实现消息队列Simple Usage首先我们来看一个简单版本的 Stream 使用#xff… 使用 Redis Stream 实现消息队列IntroRedis 5.0 中增加了 Stream 的支持利用 Stream 我们可以实现可靠的消息队列并且支持一个消息被多个消费者所消费可以很好的实现消息队列Simple Usage首先我们来看一个简单版本的 Stream 使用我们在代码里使用一个发布者一个消费者来模拟一个简单的消息队列的场景来看下面的测试代码private const string StreamKey test-simple-stream;public static async Task MainTest()
{await RedisHelper.GetDatabase().KeyDeleteAsync(StreamKey);// register background consumer_ Task.Factory.StartNew(Consume).ConfigureAwait(false);//await Publish();
}private static async Task Publish()
{Console.WriteLine(Press Enter to publish messages, Press Q to exit);var input Console.ReadLine();while (input is not q and not Q){var redis RedisHelper.GetDatabase();for (var i 0; i 10; i){await redis.StreamAddAsync(StreamKey, message, $test_message_{i});}input Console.ReadLine();}
}private static async Task Consume()
{var lastMsgId 0-0;while (true){await InvokeHelper.TryInvokeAsync(async () {var redis RedisHelper.GetDatabase();var entries await redis.StreamReadAsync(StreamKey, lastMsgId, 2);if (entries.Length 0){return;}foreach (var entry in entries){Console.WriteLine(entry.Id);entry.Values.Dump();// delete message if you want// redis.StreamDelete(StreamKey, new[] { entry.Id });}lastMsgId entries[^1].Id;});await Task.Delay(200);}
}
上面的代码会使用一个后台线程来运行一个 Consumer 来从 Stream 中读取消息有两种消费消息的模式一种是自己维护一个处理的消息 offset每次从这个 offset 之后读取新消息另外一种模式不需要维护本地的 offset可以在处理完消息之后直接删掉消息默认消息是不会删消息的所以如果不删消息的话需要维护Publisher 每次会发布 10 条消息Consumer 每次会读取两条消息处理之后会等待 200 ms之后再查询消息来看一下运行效果吧Consumer Group上面的示例会相对来说比较简单只有一个 Consumer但是在比较常用的场景下往往会有多个消费者处理比如说用户注册成功之后发布一条消息可能会有多个 Consumer 同时给用户发邮件或短信以及给用户加积分等操作这种场景下使用上面的模式就不合适了Redis Stream 中增加了 Consumer Group 的概念有的人甚至称 Redis 内置了一个 Kafka在创建了 Consumer Group 之后向 Stream 发布消息的时候会广播到各个 Consumer Group 中每个 Consumer Group 的消息消费是独立的不同的 Consumer Group 的消费速度可以不一致一个 Consumer Group 也可以有多个 Consumer 同时运行同一个 Group 内的多个 Consumer 是会共享一个 Consumer Group 的消息消费而且我们可以手动进行消息的 ACK来看下面的示例代码吧private const string StreamKey test-stream-group;
private static int _consumerCount;public static async Task MainTest()
{await RedisHelper.GetDatabase().KeyDeleteAsync(StreamKey);// register background consumer_ await Task.Factory.StartNew(Consume).ConfigureAwait(false);_ await Task.Factory.StartNew(Consume).ConfigureAwait(false);//await Publish();
}private static async Task Publish()
{Console.WriteLine(Press Enter to publish messages, Press Q to exit);var input Console.ReadLine();while (input is not q and not Q){var redis RedisHelper.GetDatabase();for (var i 0; i 10; i){await redis.StreamAddAsync(StreamKey, message, $test_message_{i});}input Console.ReadLine();}
}private static async Task Consume()
{Interlocked.Increment(ref _consumerCount);var groupName $group-{_consumerCount};var consumerName $consumer-{_consumerCount};var redis RedisHelper.GetDatabase();redis.StreamCreateConsumerGroup(StreamKey, groupName);while (true){await InvokeHelper.TryInvokeAsync(async () {var messages await redis.StreamReadGroupAsync(StreamKey, groupName, consumerName, count: SecurityHelper.Random.Next(1, 4));if (messages.Length 0){return;}foreach (var message in messages){Console.WriteLine(${groupName}-{message.Id}-{message.Values.ToJson()});await redis.StreamAcknowledgeAsync(StreamKey, groupName, message.Id);}});await Task.Delay(200);}
}
上面的示例代码会先注册两个 Consumer Group两个 Consumer Group 内各有一个 consumer你也可以使用多个 consumer为了体现各个 Consumer Group 是独立的每次获取消息的 Count 是会随机指定的在读取的消息之后会输出消息内容来代替处理消息的逻辑处理完成之后进行消息的 ACK消息的发布逻辑和上面的示例是类似的上述代码执行输出示例可以看到我们发布的消息每一个 consumer group 都会处理消息而且处理消息的速度是独立的互不影响通过 XINFO 命令我们可以对 Stream 做一些监控More利用 Redis 的 Stream 我们可以实现可靠的一个消息机制stream 的每一条消息都会有一个消息 Id默认是两个部分一个部分是时间戳另一个部分是一个序列号消息 Id 可以自定义但是通常情况下推荐用默认的 idRedis 中的 List、HashSet、Set、ZSet 这些数据类型中没有元素的时候会把对应的 Key 也会删掉但是 Stream 是不会的Stream 允许没有消息的时候依然存在Redis Stream 使用的时候需要注意我们是可以指定 Stream 的消息长度的如果我们指定了最大消息长度 10000超出 10000 的时候旧消息就会被挤出队列可能会出现消息的丢失需要对 Stream 做必要的监控和报警Referenceshttps://redis.io/topics/streams-introhttps://redis.io/commandshttps://github.com/WeihanLi/SamplesInPractice/tree/master/RedisSample