当前位置: 首页 > news >正文

互联网App网站建设方案建筑人才网评的助工

互联网App网站建设方案,建筑人才网评的助工,网站模板 asp pc wap,电子商务网站建设技能论文kafka-顺序消息实现 场景 在购物付款的时候#xff0c;订单会有不同的订单状态#xff0c;对应不同的状态事件#xff0c;比如#xff1a;待支付#xff0c;支付成功#xff0c;支付失败等等#xff0c;我们会将这些消息推送给消息队列 #xff0c;后续的服务会根据订…kafka-顺序消息实现 场景 在购物付款的时候订单会有不同的订单状态对应不同的状态事件比如待支付支付成功支付失败等等我们会将这些消息推送给消息队列 后续的服务会根据订单状态进行不同的业务处理这就要求订单状态推送就要有状态的保证 解决方案 生产者将相同的key的订单状态事件推送到kafka的同一分区kafka 消费者接收消息消费者将消息提交给线程池线程池根据接收到的消息将订单状态事件使用路由策略选择其中一个线程将具有相同路由key的事件发送到同一个线程的阻塞队列中单个线程不停的从阻塞队列获取订单状态消息消费 代码实现 引入依赖 parentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion3.2.2/versionrelativePath/ !-- lookup parent from repository -- /parent groupIdcom.example/groupId artifactIdboot-kafka/artifactId version0.0.1-SNAPSHOT/version nameboot-kafka/name descriptionboot-kafka/description propertiesjava.version17/java.version /properties dependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka-test/artifactIdscopetest/scope/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion2.0.39/version/dependency /dependencies 使用到的DTO Data public class InterOrderDto extends OrderDto implements OrderMessage{/*** 属于哪个分区*/private String partition;Overridepublic String getUniqueNo() {return getOrderNo();} }Data public class InterOrderDto extends OrderDto implements OrderMessage{/*** 属于哪个分区*/private String partition;Overridepublic String getUniqueNo() {return getOrderNo();} }public interface OrderMessage {/*** 线程池路由key* return*/String getUniqueNo();}定义topic 这里是 3个分区2个副本 Configuration public class KafkaConfiguration {Beanpublic NewTopic topic(){return new NewTopic(Constants.TOPIC_ORDER,3,(short) 2);} }public interface Constants {String TOPIC_ORDER order; }消费者 消费者OrderListener Component Slf4j public class OrderListener {Autowiredprivate OrderThreadPoolOrderWorker, InterOrderDto orderThreadPool;KafkaListener(topics Constants.TOPIC_ORDER, groupId orderGroup, concurrency 3)public void logListener(ConsumerRecordString, String record) {log.debug( receive log event: {}-{}, record.partition(), record.value());try {OrderDto orderDto JSON.parseObject(record.value(), OrderDto.class);InterOrderDto interOrderDto new InterOrderDto();BeanUtils.copyProperties(orderDto, interOrderDto);interOrderDto.setPartition(record.partition() );orderThreadPool.dispatch(interOrderDto);} catch (Exception e) {log.error(# kafka log listener error: {}, record.value(), e);}}}线程池 OrderThreadPool /*** Date: 2024/1/24 10:23* 线程池实现** param W: worker* param D: message*/ Slf4j public class OrderThreadPoolW extends SingleThreadWorkerD, D extends OrderMessage {private ListW workers;private int size;public OrderThreadPool(int size, SupplierW provider) {this.size size;workers new ArrayList(size);for (int i 0; i size; i) {workers.add(provider.get());}if (CollectionUtils.isEmpty(workers)) {throw new RuntimeException(worker size is 0);}start();}/*** route message to single thread** param data*/public void dispatch(D data) {W w getUniqueQueue(data.getUniqueNo());w.offer(data);}private W getUniqueQueue(String uniqueNo) {int queueNo uniqueNo.hashCode() % size;for (W worker : workers) {if (queueNo worker.getQueueNo()) {return worker;}}throw new RuntimeException(worker 路由失败);}/*** start worker, only start once*/private void start() {for (W worker : workers) {new Thread(worker, OWorder- worker.getQueueNo()).start();}}/*** 关闭所有 workder, 等待所有任务执行完*/public void shutdown() {for (W worker : workers) {worker.shutdown();}}} 工作线程SingleThreadWorker, 内部使用阻塞队列使其串行化 /*** Date: 2024/1/24 10:58* single thread with a blocking-queue*/ Slf4j public abstract class SingleThreadWorkerT implements Runnable {private static AtomicInteger cnt new AtomicInteger(0);private BlockingQueueT queue;private boolean started true;/*** worker 唯一id*/Getterprivate int queueNo;public SingleThreadWorker(int size) {this.queue new LinkedBlockingQueue(size);this.queueNo cnt.getAndIncrement();log.info(init worker {}, this.queueNo);}/*** 提交消息** param data*/public void offer(T data) {try {queue.put(data);} catch (InterruptedException e) {log.info({} offer error: {}, Thread.currentThread().getName(), JSON.toJSONString(data), e);}}Overridepublic void run() {log.info({} worker start take , Thread.currentThread().getName());while (started) {try {T data queue.take();doConsumer(data);} catch (InterruptedException e) {log.error(queue take error, e);}}}/*** do real consume message** param data*/protected abstract void doConsumer(T data);/*** consume rest of message in the queue when thread-pool shutdown*/public void shutdown() {this.started false;ArrayListT rest new ArrayList();int i queue.drainTo(rest);if (i 0) {log.info({} has rest in queue {}, Thread.currentThread().getName(), i);for (T t : rest) {doConsumer(t);}}}} 工作线程实现OrderWorker, 这里就单独处理订单事件 /*** Date: 2024/1/24 13:42* 具体消费者*/ Slf4j public class OrderWorker extends SingleThreadWorkerInterOrderDto{public OrderWorker(int size) {super(size);}Overrideprotected void doConsumer(InterOrderDto data) {log.info({} consume msg: {}, Thread.currentThread().getName(), JSON.toJSONString(data));} }生产者 生产者OrderController, 模拟发送不同的事件类型的订单 RestController public class OrderController {Autowiredprivate KafkaTemplateString, String kafkaTemplate;GetMapping(/send)public String send() throws InterruptedException {int size 1000;for (int i 0; i size; i) {OrderDto orderDto new InterOrderDto();orderDto.setOrderNo(i );orderDto.setPayStatus(getStatus(0));orderDto.setTimestamp(System.currentTimeMillis());//相同的key发送到相同的分区kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));TimeUnit.MILLISECONDS.sleep(10);orderDto.setPayStatus(getStatus(1));orderDto.setTimestamp(System.currentTimeMillis());kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));TimeUnit.MILLISECONDS.sleep(10);orderDto.setPayStatus(getStatus(2));orderDto.setTimestamp(System.currentTimeMillis());kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));}return success;}private String getStatus(int status){return status 0 ? 待支付 : status 1 ? 已支付 : 支付失败;} }application.properties 配置 # kafka地址 spring.kafka.bootstrap-servers192.168.x.x:9092 spring.kafka.producer.key-serializerorg.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializerorg.apache.kafka.common.serialization.StringSerializer启动类 Slf4j SpringBootApplication public class BootKafkaApplication {public static void main(String[] args) {SpringApplication.run(BootKafkaApplication.class, args);}/*** 配置线程池* return*/Beanpublic OrderThreadPoolOrderWorker, InterOrderDto orderThreadPool(){OrderThreadPoolOrderWorker, InterOrderDto threadPool new OrderThreadPool(3, () - new OrderWorker(100));Runtime.getRuntime().addShutdownHook(new Thread(() - {log.info(shutdown orderThreadPool);//容器关闭时让工作线程中的任务都被消费完threadPool.shutdown();}));return threadPool;}}测试 访问: http://localhost:8080/send 结果: OWorder-0 worker start take OWorder-0 consume msg: {orderNo:0,partition:2,payStatus:待支付,timestamp:1706084482134,uniqueNo:0} OWorder-0 consume msg: {orderNo:0,partition:2,payStatus:已支付,timestamp:1706084482271,uniqueNo:0} OWorder-0 consume msg: {orderNo:0,partition:2,payStatus:支付失败,timestamp:1706084482282,uniqueNo:0} OWorder-0 consume msg: {orderNo:3,partition:2,payStatus:待支付,timestamp:1706084482326,uniqueNo:3} OWorder-0 consume msg: {orderNo:3,partition:2,payStatus:已支付,timestamp:1706084482336,uniqueNo:3} OWorder-0 consume msg: {orderNo:3,partition:2,payStatus:支付失败,timestamp:1706084482347,uniqueNo:3} OWorder-0 consume msg: {orderNo:6,partition:1,payStatus:待支付,timestamp:1706084482391,uniqueNo:6} OWorder-0 consume msg: {orderNo:6,partition:1,payStatus:已支付,timestamp:1706084482401,uniqueNo:6} OWorder-0 consume msg: {orderNo:6,partition:1,payStatus:支付失败,timestamp:1706084482412,uniqueNo:6}可以发现在我们工作线程中事件消费是有序的 good luck!
http://www.pierceye.com/news/613387/

相关文章:

  • 上海做网站搜索一下马来西亚的网站建设的竞争对手的分析
  • 建站优化易下拉系统163邮箱登录注册
  • c 做网站电子商务平台中搜索词拆解包括
  • 腾讯云10g数字盘做网站够么四川省建设人才网
  • 批量 网站标题中海园林建设有限公司网站
  • 鲜花网站数据库建设免费律师咨询
  • 团队网站建设哪家便宜制作公司网站流程
  • 青龙桥网站建设企业网页是什么
  • 上海网站建设备案号怎么恢复法律咨询网站开发
  • 烟台做网站价格动力网站建设
  • 北戴河网站建设墨刀制作网页教程
  • 成都网站设计开发做得好微信商城怎么开发
  • 江西省城乡建设培训网-官方网站上海建设集团有限公司
  • 凡科网站设计模板grimhelm wordpress
  • 自己做的网站不备案行吗建筑工程集团有限公司
  • 网站初期 权重怎么做彩票类网站开发
  • 南通网站定制公司服务器网站建设维护合同
  • 亳州做商标网站的公司免费的网站模板
  • 西南城乡建设部网站首页python3做网站教程
  • 网站首页设计欣赏个人电影网站建设
  • 导航网站建设怎么给网站图片加alt
  • 备案成功后怎么建设网站宠物喂养网页设计模板以及代码
  • 东莞哪家网站建设比较好wordpress更改语言设置
  • 如何找做网站的客户wordpress适合视频网站吗
  • 网站建设的业务流程图拔萝卜视频播放在线观看免费
  • 建个网站要多少钱高安网站制作
  • dw设计模板百度ocpc如何优化
  • 苏宁网站优化与推广html教程网站
  • 怎么做网站网页免费高清屏幕录像
  • 网络推广哪个网站好亚马逊网站开发使用的什么方式