ppt做视频的模板下载网站有哪些,网页qq登录保护在哪里,vs怎么做网站的首页,wordpress 如何采集golang开源的可嵌入应用程序高性能的MQTT服务
什么是MQTT#xff1f;
MQTT#xff08;Message Queuing Telemetry Transport#xff09;是一种轻量级的、开放的消息传输协议#xff0c;设计用于在低带宽、高延迟或不可靠的网络环境中进行通信。MQTT最初由IBM开发#xf…golang开源的可嵌入应用程序高性能的MQTT服务
什么是MQTT
MQTTMessage Queuing Telemetry Transport是一种轻量级的、开放的消息传输协议设计用于在低带宽、高延迟或不可靠的网络环境中进行通信。MQTT最初由IBM开发现已成为OASIS标准。 MQTT的设计目标是提供一种简单、轻量、可扩展的协议适用于各种设备和网络条件。它通常用于物联网IoT和传感器网络其中设备需要以有效的方式进行通信并且资源如带宽和电池寿命可能受到限制。 MQTT的简单设计和适用性使其成为物联网中常用的通信协议之一。它被广泛用于传感器网络、嵌入式设备、移动应用程序和其他场景中提供了一种可靠、高效的消息传输机制。
什么是Mochi-MQTT
源代码地址https://github.com/mochi-mqtt/server
Mochi MQTT 是一个完全兼容 MQTT v5的可嵌入的中间件/服务器完全使用 Go 语言编写旨在用于遥测和物联网项目的开发。它可以作为独立的二进制文件使用也可以嵌入到你自己的应用程序中库来使用经过提出的设计以实现问题的轻量化和快速部署同时也非常重视代码的质量和可维护性。
用途
物联网项目开发时常常需要使用MQTT协议对设备接入在很多场景中私有化部署物联网系统时资源比较少性能要求高一些大型的MQTT服务不满足要求而且代码不可控。 还有在边缘场景下需要在边缘网关边缘控制器设备上部署物联网系统但是边缘网关的资源很少内存大约只有4G所以使用java开发的物联网系统就很难部署上去使用C/C开发效率又很低所以Go语言是最合适的 Mochi-MQTT刚好又完全是Go编写的开源的可以嵌入到自己的程序启动。
Mochi MQTT独立部署
Golang的环境配置这里不做说明请看我前面的博文说明
Mochi MQTT 可以作为独立的中间件使用。只需拉取此仓库代码然后在 cmd 文件夹中运行 cmd/main.go 默认将开启下面几个服务端口 tcp (:1883)、websocket (:1882) 和服务状态监控 (:8080) 。
cd cmd
go build -o mqtt ./mqttdocker部署
可以从 Docker Hub 仓库中拉取并运行Mochi MQTT官方镜像
docker pull mochimqtt/server
或者
docker run mochimqtt/server也提供了一个简单的 Dockerfile用于运行 cmd/main.go 中的 Websocket(:1882)、TCP(:1883) 和服务端状态信息(:8080)这三个服务监听
docker build -t mochi:latest .
docker run -p 1883:1883 -p 1882:1882 -p 8080:8080 mochi:latest嵌入自己项目运行和开发
下载Mochi MQTT包
go get github.com/mochi-mqtt/server/v2将Mochi MQTT作为包导入使用, 示例代码如下
import (mqttServer github.com/mochi-mqtt/server/v2github.com/mochi-mqtt/server/v2/listenersgithub.com/mochi-mqtt/server/v2/packets
)var Server *mqttServer.Serverfunc ServerMqttInit() {// 创建新的 MQTT 服务器。Server mqttServer.New(mqttServer.Options{InlineClient: true, // 启动内联客户端})// 初始化数据库实例edge : edgeHook{deviceDao: deviceDao.NewDeviceRepository(),productDao: productDao.NewProductRepository(),}// 添加自定义权限方法err : Server.AddHook(edge, nil)if err ! nil {log.Fatal(err)}// 在1883端口上创建一个 TCP 服务端。tcp : listeners.NewTCP(t1, :1883, nil)err Server.AddListener(tcp)if err ! nil {log.Fatal(err)}// 在1882端口上创建一个 Websocket 服务端。ws : listeners.NewWebsocket(ws1, :1882, nil)err server.AddListener(ws)if err ! nil {log.Fatal(err)}go func() {err : Server.Serve()if err ! nil {log.Fatal(err)}}()
}type edgeHook struct {mqttServer.HookBasedeviceDao deviceDao.DeviceRepositoryproductDao productDao.ProductRepository
}func (h *edgeHook) ID() string {return mqtt-auth
}func (h *edgeHook) Provides(b byte) bool {// 实现钩子函数return bytes.Contains([]byte{//MQTT连接时认证. 当用户尝试与服务器进行身份验证时调用。mqttServer.OnConnectAuthenticate,//MQTT topic权限控制. 当用户尝试发布或订阅主题时调用用来检测ACL规则。mqttServer.OnACLCheck,//在新客户端连接并进行身份验证后会立即调用此方法并在会话建立和发送CONNACK之前立即调用。mqttServer.OnSessionEstablish,//当客户端因任何原因断开连接时调用。mqttServer.OnDisconnect,//当客户端向订阅者发布消息后调用。mqttServer.OnPublished,}, []byte{b})
}// OnConnectAuthenticate MQTT连接时认证. 当用户尝试与服务器进行身份验证时调用。
func (h *edgeHook) OnConnectAuthenticate(cl *mqttServer.Client, pk packets.Packet) bool {username : string(pk.Connect.Username)password : string(pk.Connect.Password)if username || len(username) 0 {return false}if password || len(password) 0 {return false}return true
}// OnACLCheck MQTT topic权限控制. 当用户尝试发布或订阅主题时调用用来检测ACL规则。
func (h *edgeHook) OnACLCheck(cl *mqttServer.Client, topic string, write bool) bool {username : string(cl.Properties.Username)if username || len(username) 0 {return false}if topic || len(topic) 0 {return false}return true
}// OnSessionEstablish 在新客户端连接并进行身份验证后会立即调用此方法并在会话建立和发送CONNACK之前立即调用。
func (h *edgeHook) OnSessionEstablish(cl *mqttServer.Client, pk packets.Packet) {username : string(cl.Properties.Username)if username || len(username) 0 {return}//设备连接MQTT成功后保存设备在线状态
}// OnDisconnect 当客户端因任何原因断开连接时调用。
func (h *edgeHook) OnDisconnect(cl *mqttServer.Client, err error, expire bool) {username : string(cl.Properties.Username)if username || len(username) 0 {return}//设备断开MQTT成功后保存设备离线状态
}// OnPublished 当客户端向订阅者发布消息后调用。
func (h *edgeHook) OnPublished(cl *mqttServer.Client, pk packets.Packet) {Log.Infof(mqtt server OnPublished info topic%s, msg%s, pk.TopicName, string(pk.Payload))//收到客户端消息后做业务逻辑处理
}// 使用内联客户端方式向MQTT发送消息
func PublishMsg(topic string, msg []byte) bool {err : Server.Publish(topic, msg, false, 0)if err ! nil {Log.Errorf(mqtt EdgePublish error%v, topic%s, msg%s, err, topic, msg)return false}return true
}// 使用内联客户端方式订阅边缘MQTT消息topic
func SubscribeTopic(topic string, subscriptionId int, callback func(topic string, msg []byte)) {callbackFn : func(cl *mqttServer.Client, sub packets.Subscription, pk packets.Packet) {Log.Info(mqtt EdgeSubscribe received message, client, cl.ID, subscriptionId, sub.Identifier,topic, pk.TopicName, payload, string(pk.Payload))callback(pk.TopicName, pk.Payload)}_ Server.Subscribe(topic, subscriptionId, callbackFn)
}// 使用内联客户端方式取消订阅边缘MQTT消息topic
func UnsubscribeTopic(topic string, subscriptionId int) {_ Server.Unsubscribe(topic, subscriptionId)
}func main() {// 创建信号用于等待服务端关闭信号sigs : make(chan os.Signal, 1)done : make(chan bool, 1)signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)go func() {-sigsdone - true}()-doneLog.Error(caught signal, stopping...)Server.Close()Log.Error(main.go finished)
}监控MQTT指标信息
mqttRouters : r.Group(/mqtt, func(context *gin.Context) {}){mqttRouters.GET(stats, func(c *gin.Context) {util.R(c, nil, mqtt.Server.Info)})}详情使用指南请看https://github.com/mochi-mqtt/server