当前位置: 首页 > news >正文

温州网站推广公司网站国内服务器租用

温州网站推广公司,网站国内服务器租用,福州市住房和城乡建设局网站,电商怎么做新手入门怎么开店前言 首先说一下延迟队列这个东西#xff0c;实际上实现他的方法有很多#xff0c;kafka实现并不是一个最好的选择#xff0c;例如redis的zset可以实现#xff0c;rocketmq天然的可以实现#xff0c;rabbitmq也可以实现。如果切换前几种方案成本高的情况下#xff0c;那…前言 首先说一下延迟队列这个东西实际上实现他的方法有很多kafka实现并不是一个最好的选择例如redis的zset可以实现rocketmq天然的可以实现rabbitmq也可以实现。如果切换前几种方案成本高的情况下那么就使用kafka实现实际上kafka实现延迟队列也是借用了rocketmq的延迟队列思想rocketmq的延迟时间是固定的几个并不是自定义的但是kafka可以实现自定义的延迟时间但是不能过多因为是依据topic实现的接下来我使用go实现简单的kafka的延迟队列。 实现方案 1、首先创建两个topic、一个delayTopic、一个realTopic 2、生产者把消息先发送到delayTopic 3、延迟服务再把delayTopic里面的消息超过我们所设置的时间写入到realTopic 4、消费者再消费realTopic里面的数据即可 具体实现 1、生产者发送消息到延迟队列 msg : sarama.ProducerMessage{Topic: kafka.DelayTopic,Timestamp: time.Now(),Key: sarama.StringEncoder(rta_key),Value: sarama.StringEncoder(riStr),}partition, offset, err : kafka.KafkaDelayQueue.SendMessage(msg)2、延迟服务的消费者消费延迟队列里面的数据到real队列 const (DelayTime time.Minute * 5DelayTopic delayTopicRealTopic realTopic )// KafkaDelayQueueProducer 延迟队列生产者包含了生产者和延迟服务 type KafkaDelayQueueProducer struct {producer sarama.SyncProducer // 生产者delayTopic string // 延迟服务主题 }// NewKafkaDelayQueueProducer 创建延迟队列生产者 // producer 生产者 // delayServiceConsumerGroup 延迟服务消费者组 // delayTime 延迟时间 // delayTopic 延迟服务主题 // realTopic 真实队列主题 func NewKafkaDelayQueueProducer(producer sarama.SyncProducer, delayServiceConsumerGroup sarama.ConsumerGroup,delayTime time.Duration, delayTopic, realTopic string, log *log) *KafkaDelayQueueProducer {var (signals make(chan os.Signal, 1))signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, os.Interrupt)// 启动延迟服务consumer : NewDelayServiceConsumer(producer, delayTime, realTopic, log)log.Info([NewKafkaDelayQueueProducer] delay queue consumer start)go func() {for {if err : delayServiceConsumerGroup.Consume(context.Background(),[]string{delayTopic}, consumer); err ! nil {log.Error([NewKafkaDelayQueueProducer] delay queue consumer failed,err: , zap.Error(err))break}time.Sleep(2 * time.Second)log.Info([NewKafkaDelayQueueProducer] 检测消费函数是否一直执行)// 检查是否接收到中断信号如果是则退出循环select {case sin : -signals:consumer.Logger.Info([NewKafkaDelayQueueProducer]get signal,, zap.Any(signal, sin))returndefault:}}log.Info([NewKafkaDelayQueueProducer] consumer func exit)}()log.Info([NewKafkaDelayQueueProducer] return KafkaDelayQueueProducer)return KafkaDelayQueueProducer{producer: producer,delayTopic: delayTopic,} }// SendMessage 发送消息 func (q *KafkaDelayQueueProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {msg.Topic q.delayTopicreturn q.producer.SendMessage(msg) }// DelayServiceConsumer 延迟服务消费者 type DelayServiceConsumer struct {producer sarama.SyncProducerdelay time.DurationrealTopic stringLogger *log.DomobLog }func NewDelayServiceConsumer(producer sarama.SyncProducer, delay time.Duration,realTopic string, log *log.DomobLog) *DelayServiceConsumer {return DelayServiceConsumer{producer: producer,delay: delay,realTopic: realTopic,Logger: log,} }func (c *DelayServiceConsumer) ConsumeClaim(session sarama.ConsumerGroupSession,claim sarama.ConsumerGroupClaim) error {c.Logger.Info([delaye ConsumerClaim] cc)for message : range claim.Messages() {// 如果消息已经超时把消息发送到真实队列now : time.Now()c.Logger.Info([delay ConsumeClaim] out,zap.Any(send real topic res, now.Sub(message.Timestamp) c.delay),zap.Any(message.Timestamp, message.Timestamp),zap.Any(c.delay, c.delay),zap.Any(claim.Messages len, len(claim.Messages())),zap.Any(sub:, now.Sub(message.Timestamp)),zap.Any(meskey:, message.Key),zap.Any(message:, string(message.Value)),)if now.Sub(message.Timestamp) c.delay {c.Logger.Info([delay ConsumeClaim] jinlai, zap.Any(mes, string(message.Value)))_, _, err : c.producer.SendMessage(sarama.ProducerMessage{Topic: c.realTopic,Timestamp: message.Timestamp,Key: sarama.ByteEncoder(message.Key),Value: sarama.ByteEncoder(message.Value),})if err ! nil {c.Logger.Info([delay ConsumeClaim] delay already send to real topic failed, zap.Error(err))return nil}if err nil {session.MarkMessage(message, )c.Logger.Info([delay ConsumeClaim] delay already send to real topic success)continue}}// 否则休眠一秒time.Sleep(time.Second)return nil}c.Logger.Info([delay ConsumeClaim] ph,zap.Any(partitiion, claim.Partition()),zap.Any(HighWaterMarkOffset, claim.HighWaterMarkOffset()))c.Logger.Info([delay ConsumeClaim] delay consumer end)return nil }func (c *DelayServiceConsumer) Setup(sarama.ConsumerGroupSession) error {return nil }func (c *DelayServiceConsumer) Cleanup(sarama.ConsumerGroupSession) error {return nil } 这个方法整体逻辑就是不断消费延迟队列里面的消息判断消息时间是否大于现在如果大于现在说明消息超时了就把该消息发送到真实的队列里面去了真实队列是一直在消费的。如果没超时的话就不会标记消息还会重新消费消费成功会标记该消息。 重点我在测试的时候是一秒拉一次消息但这个也不是太准时不过最终结果差距不大想知道具体怎么消费的可以自己debug 3、真实队列里面的消费逻辑 type ConsumerRta struct {Logger *log }func ConsumerToRequestRta(consumerGroup sarama.ConsumerGroup, lg *log) {var (signals make(chan os.Signal, 1)wg sync.WaitGroup{})signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, os.Interrupt)wg.Add(1)// 启动消费者协程go func() {defer wg.Done()consumer : NewConsumerRta(lg)consumer.Logger.Info([ConsumerToRequestRta] consumer group start)// 执行消费者组消费for {if err : consumerGroup.Consume(context.Background(), []string{kafka.RealTopic}, consumer); err ! nil {consumer.Logger.Error([ConsumerToRequestRta] Error from consumer group:, zap.Error(err))break}time.Sleep(2 * time.Second) // 等待一段时间后重试// 检查是否接收到中断信号如果是则退出循环select {case sin : -signals:consumer.Logger.Info(get signal,, zap.Any(signal, sin))returndefault:}}}()wg.Wait()lg.Info([ConsumerToRequestRta] consumer end exit) }func NewConsumerRta(lg *log) *ConsumerRta {return ConsumerRta{Logger: lg,} }func (c *ConsumerRta) ConsumeClaim(session sarama.ConsumerGroupSession,claim sarama.ConsumerGroupClaim) error {for message : range claim.Messages() {// 消费逻辑session.MarkMessage(message, )return nil}return nil }func (c *ConsumerRta) Setup(sarama.ConsumerGroupSession) error {return nil }func (c *ConsumerRta) Cleanup(sarama.ConsumerGroupSession) error {return nil } 4、kafka配置 type KafkaConfig struct {BrokerList []stringTopic []stringGroupId []stringCfg *sarama.ConfigPemPath stringKeyPath stringCaPemPath string }var (Producer sarama.SyncProducerConsumerGroupReal sarama.ConsumerGroupConsumerGroupDelay sarama.ConsumerGroupKafkaDelayQueue *KafkaDelayQueueProducer )func NewKafkaConfig(cfg KafkaConfig) (err error) {Producer, err sarama.NewSyncProducer(cfg.BrokerList, cfg.Cfg)if err ! nil {return err}ConsumerGroupReal, err sarama.NewConsumerGroup(cfg.BrokerList, cfg.GroupId[0], cfg.Cfg)if err ! nil {return err}ConsumerGroupDelay, err sarama.NewConsumerGroup(cfg.BrokerList, cfg.GroupId[1], cfg.Cfg)if err ! nil {return err}return nil }func GetKafkaDelayQueue(log *log) {KafkaDelayQueue NewKafkaDelayQueueProducer(Producer, ConsumerGroupDelay, DelayTime, DelayTopic, RealTopic, log) }这个里面我没有怎么封装可以自行封装使用的是IBM的sarama客户端 总结 基本上就是以上三步实现里面的一些log日志可以传递自己的log日志即可使用的是消费者组消费的添加上自己的topic和groupid即可 重点以上实现延迟时间可能不是太精准我使用的时候还是有点小小的误差不过误差不大强相关业务还是使用其他专业实现延迟队列mq或使用自行方案
http://www.pierceye.com/news/336761/

相关文章:

  • 企业自助建站网手机怎么制作钓鱼网站
  • 家乡ppt模板免费下载网站x wordpress 视差 主题
  • 淄博张店外贸建站公司手机微信网页版
  • 网站建设全域云网站建设流程详解
  • 梅州市五华县建设银行网站写作网站招聘
  • 博物馆网站建设情况工业互联网龙头公司排名
  • 做网站用什么系统做网站开发电脑配置
  • 企业网站推广的主要方法上海中汇建设发展有限公司网站
  • 郑州做网站公司电话网站是否有管理员权限
  • 开发建设信息的网站广东省建设厅的注册中心网站首页
  • 用cms做的网站 的步骤有域名如何做网站
  • h5个人网站源码江苏启安建设集团有限公司网站
  • 网站开发net教程网站后台登陆路径
  • 织梦网站模板安装教程国外设计有名网站
  • 最专业企业营销型网站建设南充 网站开发
  • 国外有哪些网站做推广的比较好北京展览馆网站建设
  • 国外英语写作网站网站后台 刷新
  • 如何制作自己的网站详情页设计
  • 南京免费自助建站模板wordpress 增加侧边栏
  • 做信息分类网站难吗广告设计公司有哪些
  • 做seo网站优化多少钱网站开发客户哪里找
  • 做网站一定要云解析吗海南公司注册网站
  • 建站之家官网办公装修设计
  • 永康网站建设的公司wordpress 图片分类
  • 网站商务通弹出窗口图片更换设置wordpress4.9 多站点
  • 如何仿制一个网站注册商标设计
  • 网站建设属于什么岗位旅游网站设计模板
  • 自己做的网站怎么链接火车头采集软件开发模型是什么
  • 新网站怎么做才会被收录正品海外购网站有哪些
  • 广东手机网站建设品牌js制作网页计算器