当前位置: 首页 > news >正文

微信小程序怎拼做搬家网站免费咨询英文

微信小程序怎拼做搬家网站,免费咨询英文,制作百度移动网站模板,网络程序员MQTT#xff08;Message Queuing Telemetry Transport#xff09;是一种轻量级的发布/订阅消息传输协议。但是目前虽然mqtt的客户端很多#xff0c;但是服务端着实不多#xff0c;常见的服务端如mosquitto或emqx。但是golang语言的实现几乎找不到。golang的轻量级部署和高并… MQTTMessage Queuing Telemetry Transport是一种轻量级的发布/订阅消息传输协议。但是目前虽然mqtt的客户端很多但是服务端着实不多常见的服务端如mosquitto或emqx。但是golang语言的实现几乎找不到。golang的轻量级部署和高并发高性能很合适做mqtt Broker。本文将详细介绍如何使用 Go 语言实现一个简单轻量级且高性能的 MQTT Broker并涵盖MQTT3.1.1协议的核心特性和完整功能。 1. 需求分析 本文选择golang语言实现一个完整的 MQTT 3.1.1 Broker不涉及集群支持和协议版本检测。简单且轻量级不但可以替代mosquitto后续还可以灵活的做扩展如增加webUI的管理界面。且部署也很简单一个exe可执行文件。 完整项目开源地址https://github.com/yangyongzhen/goang-mqtt-broker gitee: https://gitee.com/yyz116/goang-mqtt-broker 可执行文件在release目录下。 1.1 实现效果截图 服务启动 客户端发布 客户端订阅 使用mosquitto客户端测试效果 优化增加基于redis的持久化存储 可在etc/config.yaml文件中配置是否启用redis的持久化。默认基于内存。 windows下的可执行文件仅有7M左右大小简单小巧。且代码开源方便定制。可以作为替代mosquitto的另外一种选择。 1.2 功能特性 1.2.1核心功能 ✅ 完整的 MQTT 3.1.1 协议支持✅ QoS 0, 1, 2 消息传递保证✅ 会话管理持久会话和清理会话✅ 保留消息Retained Messages✅ 遗嘱消息Last Will and Testament✅ 主题通配符 和 # 通配符支持✅ 客户端认证用户名/密码✅ 保活机制Keep Alive✅ 并发安全 1.2.2 架构特性 ️ 模块化设计易于扩展 可插拔存储接口 线程安全的并发处理 内置监控指标 Docker 支持 2. 项目架构设计 架构设计 数据流 客户端连接 → TCP Server 接受连接 协议解析 → Client 解析 MQTT 数据包 认证验证 → Auth 模块验证用户凭据 会话管理 → Storage 加载/保存会话信息 消息路由 → Broker 根据订阅关系路由消息 主题匹配 → Topic Manager 处理通配符匹配 2.1 目录结构 mqtt-broker/ ├── README.md ├── Makefile ├── Dockerfile ├── go.mod ├── go.sum ├── cmd/ │ ├── broker/ │ │ └── main.go │ └── test-client/ │ └── main.go ├── internal/ │ ├── auth/ │ │ └── auth.go │ ├── broker/ │ │ ├── broker.go │ │ ├── client.go │ │ └── topic.go │ ├── protocol/ │ │ ├── common/ │ │ │ └── types.go │ │ └── mqtt311/ │ │ └── packet.go │ └── storage/ │ ├── interface.go │ └── memory/ │ └── store.go └── pkg/└── mqtt/└── packet.go2.2 主要模块 cmd/broker/main.go程序入口。internal/broker/Broker 核心逻辑包括连接管理、消息路由等。internal/storage/存储接口和内存实现。pkg/mqtt/packet.goMQTT 数据包编码和解码。 3. 核心实现 3.1 存储接口 在 internal/storage/interface.go 文件中定义存储接口 package storageimport (github.com/yangyongzhen/mqtt-broker/internal/protocol/common )type Store interface {SaveSession(clientID string, session *Session) errorLoadSession(clientID string) (*Session, error)DeleteSession(clientID string) errorSaveMessage(clientID string, message *common.Message) errorLoadMessages(clientID string) ([]*common.Message, error)DeleteMessage(clientID string, packetID uint16) errorSaveRetainedMessage(topic string, message *common.Message) errorLoadRetainedMessage(topic string) (*common.Message, error)DeleteRetainedMessage(topic string) errorSaveSubscription(clientID string, subscription *common.Subscription) errorLoadSubscriptions(clientID string) ([]*common.Subscription, error)DeleteSubscription(clientID string, topic string) error }type Session struct {ClientID stringCleanSession boolSubscriptions map[string]*common.SubscriptionPendingAcks map[uint16]*common.MessageLastSeen time.Time }3.2 内存存储实现 在 internal/storage/memory/store.go 文件中实现内存存储 package memoryimport (syncgithub.com/yangyongzhen/mqtt-broker/internal/storagegithub.com/yangyongzhen/mqtt-broker/internal/protocol/common )type MemoryStore struct {sessions map[string]*storage.SessionretainedMsgs map[string]*common.MessageclientMessages map[string][]*common.Messagemu sync.RWMutex }func NewMemoryStore() *MemoryStore {return MemoryStore{sessions: make(map[string]*storage.Session),retainedMsgs: make(map[string]*common.Message),clientMessages: make(map[string][]*common.Message),} }func (m *MemoryStore) SaveSession(clientID string, session *storage.Session) error {m.mu.Lock()defer m.mu.Unlock()m.sessions[clientID] sessionreturn nil }func (m *MemoryStore) LoadSession(clientID string) (*storage.Session, error) {m.mu.RLock()defer m.mu.RUnlock()session, exists : m.sessions[clientID]if !exists {return nil, nil}return session, nil }// 其他方法省略...3.3 客户端连接管理 在 internal/broker/client.go 文件中实现客户端连接管理 package brokerimport (bufiofmtnetsynctimegithub.com/yangyongzhen/mqtt-broker/internal/protocol/commongithub.com/yangyongzhen/mqtt-broker/internal/protocol/mqtt311github.com/yangyongzhen/mqtt-broker/internal/storagegithub.com/yangyongzhen/mqtt-broker/pkg/mqtt )type Client struct {conn net.ConnclientID stringinfo *common.ClientInfosession *storage.Sessionbroker *BrokerpacketReader *mqtt.PacketReaderwriteChan chan []bytecloseChan chan struct{}keepAliveTimer *time.Timermu sync.RWMutexconnected boolnextPacketID uint16pendingAcks map[uint16]*PendingMessage }type PendingMessage struct {Message *common.MessageTimestamp time.TimeRetries int }func NewClient(conn net.Conn, broker *Broker) *Client {return Client{conn: conn,broker: broker,packetReader: mqtt.NewPacketReader(conn),writeChan: make(chan []byte, 1000),closeChan: make(chan struct{}),pendingAcks: make(map[uint16]*PendingMessage),nextPacketID: 1,} }func (c *Client) Start() {go c.readLoop()go c.writeLoop()go c.retryLoop() }func (c *Client) readLoop() {defer c.Close()for {select {case -c.closeChan:returndefault:packet, err : c.packetReader.ReadPacket()if err ! nil {fmt.Printf(Read packet error: %v\n, err)return}c.handlePacket(packet)}} }func (c *Client) writeLoop() {defer c.Close()for {select {case data : -c.writeChan:if _, err : c.conn.Write(data); err ! nil {fmt.Printf(Write error: %v\n, err)return}case -c.closeChan:return}} }func (c *Client) retryLoop() {ticker : time.NewTicker(5 * time.Second)defer ticker.Stop()for {select {case -ticker.C:c.retryPendingMessages()case -c.closeChan:return}} }func (c *Client) handlePacket(packet common.Packet) {switch p : packet.(type) {case *mqtt311.ConnectPacket:c.handleConnect(p)case *mqtt311.PublishPacket:c.handlePublish(p)case *mqtt311.SubscribePacket:c.handleSubscribe(p)case *mqtt311.UnsubscribePacket:c.handleUnsubscribe(p)case *mqtt311.PingreqPacket:c.handlePingReq()case *mqtt311.DisconnectPacket:c.handleDisconnect()} }// handleConnect, handlePublish 等其他方法省略...3.4 主 Broker 实现 在 internal/broker/broker.go 文件中实现主 Broker 的逻辑 package brokerimport (fmtnetsynctimegithub.com/yangyongzhen/mqtt-broker/internal/authgithub.com/yangyongzhen/mqtt-broker/internal/protocol/commongithub.com/yangyongzhen/mqtt-broker/internal/storage )type Broker struct {listener net.Listenerclients map[string]*ClienttopicManager *TopicManagerstore storage.Storeauth auth.Authenticatormu sync.RWMutexrunning boolconfig *Config }type Config struct {MaxConnections intMaxMessageSize intRetainedMsgLimit intSessionExpiry time.DurationMessageExpiry time.Duration }func NewBroker(store storage.Store, authenticator auth.Authenticator) *Broker {return Broker{clients: make(map[string]*Client),topicManager: NewTopicManager(),store: store,auth: authenticator,config: Config{MaxConnections: 10000,MaxMessageSize: 1024 * 1024,RetainedMsgLimit: 10000,SessionExpiry: 24 * time.Hour,MessageExpiry: 24 * time.Hour,},} }func (b *Broker) Start(address string) error {listener, err : net.Listen(tcp, address)if err ! nil {return err}b.listener listenerb.running truefmt.Printf(MQTT Broker started on %s\n, address)for b.running {conn, err : listener.Accept()if err ! nil {if b.running {fmt.Printf(Accept error: %v\n, err)}continue}client : NewClient(conn, b)go client.Start()}return nil }func (b *Broker) Stop() {b.running falseif b.listener ! nil {b.listener.Close()}b.mu.Lock()defer b.mu.Unlock()for _, client : range b.clients {client.Close()} }// AddClient, RemoveClient, PublishMessage 等其他方法省略...3.5 主程序入口 在 cmd/broker/main.go 文件中定义主程序入口 package mainimport (flagfmtlogosos/signalsyscallgithub.com/yangyongzhen/mqtt-broker/internal/authgithub.com/yangyongzhen/mqtt-broker/internal/brokergithub.com/yangyongzhen/mqtt-broker/internal/storage/memory )func main() {addr : flag.String(addr, :1883, MQTT broker address)flag.Parse()authenticator : auth.NewSimpleAuthenticator() // 示例认证器需要自行实现store : memory.NewMemoryStore()b : broker.NewBroker(store, authenticator)go func() {if err : b.Start(*addr); err ! nil {log.Fatalf(Failed to start MQTT broker: %v, err)}}()sigChan : make(chan os.Signal, 1)signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)-sigChanb.Stop()fmt.Println(MQTT broker stopped) }以上代码是实现一个简单的 MQTT Broker 的基础框架更多详细功能和性能优化可以根据实际需求进行扩展和改进。 安装和运行 克隆项目 git clone your-repo-url cd mqtt-brokergo mod tidy安装依赖 go mod tidy构建项目 make build或者 go build -o bin/mqtt-broker cmd/broker/main.go 运行 Brokermake run或者 ./bin/mqtt-broker -addr:1883 -debug使用 Docker 构建镜像 docker build -t mqtt-broker . #### 运行容器 docker run -p 1883:1883 mqtt-broker #### 使用示例**启动 Broker** #### 默认端口 1883 go run cmd/broker/main.go ##### 自定义端口和调试模式 go run cmd/broker/main.go -addr:1883 -debug测试客户端 项目包含一个简单的测试客户端可以用来测试 broker 功能 订阅消息 go run cmd/test-client/main.go -modesub -topictest/hello -clientsubscriber1发布消息 go run cmd/test-client/main.go -modepub -topictest/hello -msgHello MQTT! -clientpublisher1使用第三方客户端 你也可以使用任何标准的 MQTT 客户端连接到 broker 使用 mosquitto 客户端 订阅 mosquitto_sub -h localhost -p 1883 -t test/topic发布 mosquitto_pub -h localhost -p 1883 -t test/topic -m Hello World使用认证 默认用户 admin/password, test/test123 mosquitto_pub -h localhost -p 1883 -u admin -P password -t “test/topic” -m “Authenticated message” 配置说明 命令行参数 参数 默认值 说明 -addr :1883 Broker 监听地址 -debug false 启用调试日志 内置用户 Broker 默认创建了以下测试用户 用户名 密码 admin password test test123 项目开源地址 https://github.com/yangyongzhen/goang-mqtt-broker gitee: https://gitee.com/yyz116/goang-mqtt-broker 作者 作者csdn猫哥转载请注明出处: https://blog.csdn.net/yyz_1987
http://www.pierceye.com/news/838141/

相关文章:

  • 尚品中国多年专注于高端网站建设免费加盟无需店面
  • 游标卡尺 东莞网站建设wordpress 域名解析
  • 站长工具视频怎么开免费网站
  • 网站地址怎么申请注册最近新闻大事
  • interidea 做网站网站域名备案 更改
  • 哈尔滨公司做网站动画设计的大学排名
  • 网站建设与网页制作试卷网站搜索引擎优化推广
  • 网站子目录设计网站开发数据库技术
  • 可以做音基题的音乐网站上海网站设计公司有哪些
  • 昆明做网站公司做家居的网站
  • 网站建设首选易网宣软文代发
  • 手机版网站开发用什么语言自己建设的网站怎么赚钱
  • 宁波建设监理协会网站软件开发文档的作用
  • 兰州电商平台网站建设网路神做网站怎么样
  • 诸城易讯网站建设服务中心网址的域名
  • 制作商务网站应用公园制作app软件下载
  • 情头定制网站被称为网站开发神器
  • 宝安网站设计案例淘宝页面制作
  • 天津品牌网站制作怎样建设网站流程
  • 怎样进行公司网站建设wordpress主题公司
  • 外宣做网站宣传网站功能描述
  • 部队网站建设多少钱营销自己的网站
  • 长春市城乡建设部网站南昌诚推网络技术有限公司
  • 网站 建设 欢迎你濮阳家电网站建设
  • 怎么快速建立一个网站如何用腾讯云服务器搭建wordpress
  • 五屏网站建设多少钱深圳网站公司有哪些
  • 莆田网站建站wordpress cd
  • 软件下载安装免费南京seo关键词优化服务
  • 广州网站设计软件建设将网站加入受信网站再试
  • 淘宝联盟网站备案常见的互联网应用