当前位置: 首页 > news >正文

做外贸在什么网站做wordpress html 单页模板

做外贸在什么网站做,wordpress html 单页模板,游戏小程序代理,塑胶东莞网站建设技术支持【Kafka】| 总结/Edison Zhou1可用的Kafka .NET客户端作为一个.NET Developer#xff0c;自然想要在.NET项目中集成Kafka实现发布订阅功能。那么#xff0c;目前可用的Kafka客户端有哪些呢#xff1f;目前.NET圈子主流使用的是 Confluent.Kafkaconfluent-kafka-dotnet : htt… 【Kafka】| 总结/Edison Zhou1可用的Kafka .NET客户端作为一个.NET Developer自然想要在.NET项目中集成Kafka实现发布订阅功能。那么目前可用的Kafka客户端有哪些呢目前.NET圈子主流使用的是 Confluent.Kafkaconfluent-kafka-dotnet : https://github.com/confluentinc/confluent-kafka-dotnet其他主流的客户端还有rdkafka-dotnet项目但是其已经被并入confluent-kakfa-dotnet项目进行维护了。因此推荐使用confluent-kafka-dotnet其配置友好功能也更全面。NCC千星项目CAP的Kafka扩展包DotNetCore.CAP.Kafka内部也是基于Confluent.Kafka来实现的接下来本文就来在.NET Core项目下通过Confluent.Kafka和CAP两个主流开源项目来操作一下Kafka实现一下发布订阅的功能。2基于Confluent.Kafka的Sample要完成本文示例首先得有一个启动好的Kafka Broker服务。关于如何搭建Kafka请参考上一篇通过Docker部署Kafka集群。安装相关组件在.NET Core项目中新建一个类库暂且命名为EDT.Kafka.Core安装Confluent.Kafka组件PMInstall-Package Confluent.Kafka编写KafkaService编写IKafkaService接口namespace EDT.Kafka.Core {public interface IKafkaService{Task PublishAsyncT(string topicName, T message) where T : class;Task SubscribeAsyncT(IEnumerablestring topics, ActionT messageFunc, CancellationToken cancellationToken default) where T : class;} }编写KafkaService实现类namespace EDT.Kafka.Core {public class KafkaService : IKafkaService{public static string KAFKA_SERVERS 127.0.0.1:9091;public async Task PublishAsyncT(string topicName, T message) where T : class{var config new ProducerConfig { BootstrapServers KAFKA_SERVERS,BatchSize 16384, // 修改批次大小为16KLingerMs 20 // 修改等待时间为20ms};using (var producer new ProducerBuilderstring, string(config).Build()){await producer.ProduceAsync(topicName, new Messagestring, string{Key Guid.NewGuid().ToString(),Value JsonConvert.SerializeObject(message)}); ;}}public async Task SubscribeAsyncT(IEnumerablestring topics, ActionT messageFunc, CancellationToken cancellationToken default) where T : class{var config new ConsumerConfig{BootstrapServers KAFKA_SERVERS,GroupId Consumer,EnableAutoCommit false, // 禁止AutoCommitAcks Acks.Leader, // 假设只需要Leader响应即可AutoOffsetReset AutoOffsetReset.Earliest // 从最早的开始消费起};using (var consumer  new ConsumerBuilderIgnore, string(config).Build()){consumer.Subscribe(topics);try{while (true){try{var consumeResult consumer.Consume(cancellationToken);Console.WriteLine($Consumed message {consumeResult.Message?.Value} at: {consumeResult?.TopicPartitionOffset}.);if (consumeResult.IsPartitionEOF){Console.WriteLine($ - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.);continue;}T messageResult null;try{messageResult JsonConvert.DeserializeObjectT(consumeResult.Message.Value);}catch (Exception ex){var errorMessage $ - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败Value{consumeResult.Message.Value}】 {ex.StackTrace?.ToString()};Console.WriteLine(errorMessage);messageResult null;}if (messageResult ! null/* consumeResult.Offset % commitPeriod 0*/){messageFunc(messageResult);try{consumer.Commit(consumeResult);}catch (KafkaException e){Console.WriteLine(e.Message);}}}catch (ConsumeException e){Console.WriteLine($Consume error: {e.Error.Reason});}}}catch (OperationCanceledException){Console.WriteLine(Closing consumer.);consumer.Close();}}await Task.CompletedTask;}} }为了方便后续的演示在此项目中再创建一个类 EventDatapublic class EventData {public string TopicName { get; set; }public string Message { get; set; }public DateTime EventTime { get; set; } }编写Producer新建一个Console项目暂且命名为EDT.Kafka.Demo.Producer其主体内容如下namespace EDT.Kafka.Demo.Producer {public class Program{static async Task Main(string[] args){KafkaService.KAFKA_SERVERS kafka1:9091,kafka2:9092,kafka3:9093;var kafkaService new KafkaService();for (int i 0; i 50; i){var eventData new EventData{TopicName testtopic,Message $This is a message from Producer, Index : {i 1},EventTime DateTime.Now};await kafkaService.PublishAsync(eventData.TopicName, eventData);}Console.WriteLine(Publish Done!);Console.ReadKey();}} }编写Consumer新建一个Console项目暂且命名为EDT.Kafka.Demo.Consumer其主体内容如下namespace EDT.Kafka.Demo.Consumer {public class Program{static async Task Main(string[] args){KafkaService.KAFKA_SERVERS kafka1:9091,kafka2:9092,kafka3:9093;var kafkaService new KafkaService();var topics new Liststring { testtopic };await kafkaService.SubscribeAsyncEventData(topics, (eventData) {Console.WriteLine($ - {eventData.EventTime: yyyy-MM-dd HH:mm:ss} 【{eventData.TopicName}】- 已处理);});}} }测试Pub/Sub效果将Producer和Consumer两个项目都启动起来可以看到当Consumer消费完50条消息并一一确认之后Producer这边就算发布结束。3基于CAP项目的Sample模拟场景说明假设我们有两个微服务一个是Catalog微服务一个是Basket微服务当Catalog微服务产生了Product价格更新的事件就会将其发布到KafkaBasket微服务作为消费者就会订阅这个消息然后更新购物车中对应商品的最新价格。Catalog API新建一个ASP.NET Core WebAPI项目然后分别安装以下组件PMInstall Package DotNetCore.CAP PMInstall Package DotNetCore.CAP.MongoDB PMInstall Package DotNetCore.CAP.Kafka在Startup中的ConfigureServices方法中注入CAPpublic void ConfigureServices(IServiceCollection services) {......services.AddCap(x {x.UseMongoDB(mongodb://account:passwordmongodb-server:27017/products?authSourceadmin);x.UseKafka(kafka1:9091,kafka2:9092,kafka3:9093);}); }新建一个ProductController实现一个Update产品价格的接口在其中通过CapPublisher完成发布消息到Kafkanamespace EDT.Demo.Catalog.API.Controllers {[ApiController][Route([controller])]public class ProductController : ControllerBase{private static readonly IListProduct Products new ListProduct{new Product { Id 0001, Name 电动牙刷A, Price 99.90M, Introduction 暂无介绍 },new Product { Id 0002, Name 电动牙刷B, Price 199.90M, Introduction 暂无介绍 },new Product { Id 0003, Name 洗衣机A, Price 2999.90M, Introduction 暂无介绍 },new Product { Id 0004, Name 洗衣机B, Price 3999.90M, Introduction 暂无介绍 },new Product { Id 0005, Name 电视机A, Price 1899.90M, Introduction 暂无介绍 },};private readonly ICapPublisher _publisher;private readonly IMapper _mapper;public ProductController(ICapPublisher publisher, IMapper mapper){_publisher publisher;_mapper mapper;}[HttpGet]public IListProductDTO Get(){return _mapper.MapIListProductDTO(Products); ;}[HttpPut]public async TaskIActionResult UpdatePrice(string id, decimal newPrice){// 业务代码var product Products.FirstOrDefault(p p.Id id);product.Price newPrice;// 发布消息await _publisher.PublishAsync(ProductPriceChanged, new ProductDTO { Id product.Id, Name product.Name, Price product.Price});return NoContent();}} }Basket API参照Catalog API项目创建ASP.NET Core WebAPI项目并安装对应组件在ConfigureServices方法中注入CAP。新建一个BasketController用于订阅Kafka对应TopicProductPriceChanged 的消息。namespace EDT.Demo.Basket.API.Controllers {[ApiController][Route([controller])]public class BasketController : ControllerBase{private static readonly IListMyBasketDTO Baskets new ListMyBasketDTO{new MyBasketDTO { UserId U001, Catalogs new ListCatalog{new Catalog { Product new ProductDTO { Id 0001, Name 电动牙刷A, Price 99.90M }, Count 2 },new Catalog { Product new ProductDTO { Id 0005, Name 电视机A, Price 1899.90M }, Count 1 },} },new MyBasketDTO { UserId U002, Catalogs new ListCatalog{new Catalog { Product new ProductDTO { Id 0002, Name 电动牙刷B, Price 199.90M }, Count 2 },new Catalog { Product new ProductDTO { Id 0004, Name 洗衣机B, Price 3999.90M }, Count 1 },}}};[HttpGet]public IListMyBasketDTO Get(){return Baskets;}[NonAction][CapSubscribe(ProductPriceChanged)]public async Task RefreshBasketProductPrice(ProductDTO productDTO){if (productDTO null)return;foreach (var basket in Baskets){foreach (var catalog in basket.Catalogs){if (catalog.Product.Id productDTO.Id){catalog.Product.Price productDTO.Price;break;}}}await Task.CompletedTask;}} }测试效果同时启动Catalog API 和 Basket API两个项目。首先通过Swagger在Basket API中查看所有用户购物车中的商品的价格可以看到0002的商品是199.9元。然后通过Swagger在Catalog API中更新Id为0002的商品的价格至499.9元。最后通过Swagger在Basket API中查看所有用户购物车中的商品的价格可以看到0002的商品已更新至499.9元。End总结本文总结了.NET Core如何通过对应客户端操作Kafka基于Confluent.Kafka项目和CAP项目可以方便的实现发布订阅的效果。参考资料阿星Plus《.NET Core下使用Kafka》https://blog.csdn.net/meowv/article/details/108675741麦比乌斯皇《.NET使用Kafka小结》https://www.cnblogs.com/hsxian/p/12907542.htmlTony《.NET Core事件总线解决方案CAP基于Kafka》https://www.cnblogs.com/Tony100/archive/2019/01/29/10333440.html极客时间胡夕《Kafka核心技术与实战》B站尚硅谷《Kafka 3.x入门到精通教程》年终总结Edison的2020年终总结数字化转型我在传统企业做数字化转型C#刷题C#刷剑指Offer算法题系列文章目录.NET面试.NET开发面试知识体系.NET大会2020年中国.NET开发者大会PDF资料
http://www.pierceye.com/news/839495/

相关文章:

  • 做公司网站的费用flash交互网站页面切换制作
  • 网络推广渠道有哪些百度手机seo
  • 重庆专业网站建设公司哪家好seo的中文意思是什么
  • 做品牌折扣微信推广的网站网站换主机换域名
  • 营销型网站有哪些建设流程怎样制作免费的网站
  • 天津建设工程计价网站手工加工网
  • 温州做美食网站网站建设的方案模板下载
  • 如何快速网站备案以用户为中心 建设学校网站
  • 宣传型网站有哪些宁波建设信息港网站
  • php网站开发是做什么的phpcms v9企业网站模板(简洁利于优化)
  • 什么是网站和网页wordpress启用插件出错
  • asp网站制作工具怎么样做国际网站生意
  • 签订网站建设合同山东建设工程招标网官方网站
  • 迅速建设企业网站外贸网站服务器选择
  • 建设网站详细流程wordpress建站数据库
  • 贵阳建立网站聊城网站建设设计
  • 网站怎么设置关键词百度网址大全首页设为首页
  • 中企动力网站怎么样怎么做公司内网网站
  • 求职网站网页模板一个网站可以做多少个小程序
  • 深圳市住房和建设局网站登录怎样在百度建网站
  • 外国做视频在线观看网站asp简单网站开发
  • 介绍移动互联网的网站有哪些做网站时怎么选择数据库类型
  • 工厂的网站在哪里做的免费建站的软件
  • 中国电子系统建设三公司网站网站建设上如何提高市场竞争力
  • 青海住房和建设厅网站电子商务网站建设与管理教案
  • 免费在线自助建站搬瓦工可以长期做网站
  • 建设外贸网站报价外贸网站制作推广公司
  • 网站开发人员工作内容白沟做网站
  • 产品展示网站模板源码产品宣传
  • 国内wordpress有名的网站河南住房和城乡建设厅网站资质