网站建设与维护方式是什么,做的比较漂亮的中国网站,怎样建设公司网站,企业网站建设 法规MQTT 核心概念
发布订阅
MQTT 基于发布订阅模式#xff0c;它解耦了消息的发送方#xff08;发布者#xff09;和接收方#xff08;订阅者#xff09;#xff0c;引入了一个中间代理的角色来完成消息的路由和分发。发布者和订阅者不需要知道彼此的存在#xff0c;他们…MQTT 核心概念
发布订阅
MQTT 基于发布订阅模式它解耦了消息的发送方发布者和接收方订阅者引入了一个中间代理的角色来完成消息的路由和分发。发布者和订阅者不需要知道彼此的存在他们之间唯一的联系就是对消息的一致约定例如消息将使用什么主题、消息将包含哪些字段等等。这让 MQTT 的通信更加灵活因为我们可以随时动态地增加或减少订阅者和发布者。通过发布订阅我们可以轻易地实现消息的广播、组播和单播。
服务端
在发布消息的客户端和订阅的客户端之间充当中介将所有接收到的消息转发到匹配的订阅客户端。所以有时我们也会直接将服务端称为 Broker。
客户端
使用 MQTT 协议连接到 MQTT 服务端的设备或应用程序。它既可以是发布者也可以是订阅者也可以具备这两种身份。
主题
主题被用来标识和区分不同的消息它是 MQTT 消息路由的基础。发布者可以在发布时指定消息的主题订阅者则可以选择订阅自己感兴趣的主题来接收相关的消息。
通配符
订阅者可以在订阅的主题中使用通配符来达到一次订阅多个主题的目的。MQTT 提供了单层通配符和多层通配符两种主题通配符以满足不同的订阅需要。
QoS
MQTT 定义了三种 QoS 等级来分别提供不同的消息可靠性保证。每条消息都可以在发布时独立设置自己的 QoS。QoS 0 最多交付一次消息可能丢失QoS 1 至少交付一次消息可以保证到达但是可能重复QoS 2 只交付一次消息保证到达并且不会重复。QoS 越大消息的传输复杂程度也越高我们需要根据实际场景来选择合适的 QoS。
会话
QoS 只是设计了消息可靠到达的理论机制而会话则确保了 QoS 1、2 的协议流程得以真正实现。会话是客户端与服务端之间的有状态交互它可以仅持续和网络连接一样长的时间也可以跨越多个网络连接存在我们通常将后者称为持久会话。我们可以选择让连接从已存在的会话中恢复也可以选择从一个全新的会话开始。
保留消息
与普通消息不同保留消息可以保留在 MQTT 服务器中。任何新的订阅者订阅与该保留消息中的主题匹配的主题时都会立即接收到该消息即使这个消息是在它们订阅主题之前发布的。这使订阅者在上线后可以立即获得数据更新而不必等待发布者再次发布消息。在某种程度上我们可以把保留消息当作是一个消息 “云盘” 来使用随时上传消息到 “云盘”然后在任意时刻从 “云盘” 获取消息。当然这个 “云盘” 还有一个主题下只能存储一条最新的保留消息的限制。
遗嘱消息
发布订阅模式的特性决定了除了服务器以外没有客户端能够感知到某个客户端从通信网络中离开。而遗嘱消息则为连接意外断开的客户端提供了向其他客户端发出通知的能力。客户端可以在连接时向服务器设置自己的遗嘱消息服务器将在客户端异常断开后立即或延迟一段时间后发布这个遗嘱消息。而订阅了对应遗嘱主题的客户端将收到这个遗嘱消息并且采取相应的措施例如更新该客户端的在线状态等等。
共享订阅
默认情况下消息会被转发给所有匹配的订阅者。但有时我们可能希望多个客户端协同处理接收到的消息以便以水平扩展的方式来提高负载能力。又或者我们希望为客户端增加一个备份客户端当主客户端离线时能够无缝切换到备份客户端继续接收消息以确保高可用性。而 MQTT 的共享订阅特性则提供了这一能力。我们可以将客户端划分为多个订阅组消息仍然会被转发给所有订阅组但每个订阅组内每次只会有一个客户端收到消息。 MQTT选型
MQTT BROKER 技术选型 EMQX安装
本地开发环境
可以选择安装Windows版本 Windows安装EMQ X 官方Windows部署 安装成功后直接访问http://localhost:18083/ 账号/密码admin/public
生产环境
EMQX 本身支持分布式集群架构能够在保证高可用性、容错性和可扩展性的同时处理大量的客户端和消息。通过使用 EMQX 集群您可以在一个或多个节点发生故障时仍然保持集群运行从而享受到容错和高可用性的好处。 相比与之前版本EMQX 5.0 集群采用了新的 Mria 集群架构单节点能支持 500 万 MQTT 设备连接集群可扩展至 1 亿并发 MQTT 连接。官方集群部署 安全指南
网络与 TLS 介绍了 EMQX 如何支持端对端加密通信包括如何启用 SSL/TLS 连接和获取 SSL/TLS 证书。 认证 身份认证是物联网应用的重要组成部分可以帮助有效阻止非法客户端的连接。为了提供更好的安全保障EMQX 支持多种认证机制如 X.509 证书认证、密码认证、JWT 认证、基于 MQTT 5.0 协议的增强认证以及 PSK 认证。本节介绍了这些认证机制的工作方式和配置方法。 授权 在 EMQX 中授权是指对 MQTT 客户端的发布和订阅操作进行权限控制。本节将介绍如何通过内置数据库、文件、或通过集成 MySQL、PostgreSQL、MongoDB 和 Redis 进行授权相关操作。 黑名单 EMQX 为用户提供了黑名单功能用户可以通过 Dashboard 和 HTTP API 将指定客户端加入黑名单以拒绝该客户端访问除了客户端标识符以外还支持直接封禁用户名甚至 IP 地址。 连接抖动检测 EMQX 支持自动封禁那些被检测到短时间内频繁登录的客户端并且在一段时间内拒绝这些客户端的登录以避免此类客户端过多占用服务器资源而影响其他客户端的正常使用。
认证
EMQX Dashboard 提供了开箱即用的认证与权限管理功能用户仅通过用户界面就可以快速实现客户端认证授权机制的配置无需编写代码或手动编辑配置文件即可对接各类数据源与认证服务实现各个级别与各类场景下的安全配置以更高的开发效率获得更安全的保障。
创建认证
在认证页面下的右上角点击 创建 按钮即可进入到创建认证的页面。创建一个认证需要选择一种认证方式选择完成后需要选择一个存储或获取认证信息的数据源JWT 认证方式除外认证数据可以从这些数据源包括数据库或 HTTP 服务中获取最后再配置连接到该数据源的连接信息即可。 认证方式Password-Based使用客户端 ID 或用户名加密码的认证方式 数据源选择redis 选择加密方式及加盐方式加密方式md5 加盐方式prefix 初始化数据到redis: HMSET “mqtt_user:username” “password_hash” “66ace8890090c2a50e729318d45fe53b” “salt” “abc”
验证 http签名配置
创建API秘钥 记录秘钥
appId: ************* appSecret: *************
MQTT通用组件开发
源码地址
目录
├─component-mqtt-client └─component-mqtt-client-starter
component-mqtt-client
mqtt上下文 建立连接
public MqttClientApp connect() {countDownLatch new CountDownLatch(1);Vertx.vertx().deployVerticle(this);return this;
}接收消息 Overridepublic void start() {if (Objects.isNull(this.mqttClient)) {this.mqttClient MqttClient.create(vertx, createMqttClientOptions());}//接收服务端消息处理handlermqttClient.publishHandler(pub - {Buffer buffer pub.payload();String topicName pub.topicName();String[] split topicName.split(/);String string buffer.toString(StandardCharsets.UTF_8);UpMessage upRawMessage new UpMessage();HashMapString, Object headers Maps.newHashMap();headers.put(topic,topicName);headers.put(qos,pub.qosLevel().value());upRawMessage.setHeaders(headers);upRawMessage.setMessageContent(string);upRawMessage.setProductKey(split[0]);upRawMessage.setDeviceId(split[1]);mqttListenerList.forEach(f - {String topic f.getTopic();String[] listenerTopic topic.split(/);boolean flag true;for (int i 0; i split.length; i) {if (allWildcard.equals(listenerTopic[i])) {break;}if (singleWildcard.equals(listenerTopic[i])) {continue;}if (!split[i].equals(listenerTopic[i])) {flag false;break;}}if (flag){f.onMessage(upRawMessage);}});});mqttClient.closeHandler(unused - getVertx().setTimer(RECONNECT_INTERVAL, h - start()));mqttClient.connect(mqttConfig.getListenerInfos().getPort(), mqttConfig.getListenerInfos().getHost(),s - {if (s.succeeded()) {log.info(MqttClient connect success.);subscribe();countDownLatch.countDown();} else {log.error(MqttClient connect fail: , s.cause());if (s.cause() ! null) {vertx.setTimer(RECONNECT_INTERVAL, handler - this.start());}}});}长连接推送消息
public MqttResp publish(MqttReq request) {
MqttResp response new MqttResp();
Buffer payload Buffer.buffer(request.getMessageContent());
mqttClient.publish(request.getTopic(), payload, MqttQoS.valueOf(request.getQos()), false, false, s - {if (s.succeeded()) {log.info(MqttClient publish success[{}], s.result());} else {log.error(MqttClient publish fail., s.cause());}
});
response.setCode(200);
return response;
}http推送消息
public MapString, ? callHttp(MqttReq params) {
String path ;
String url config().getAddress() path;
log.debug(http url[{}] requestBodyStr[{}], url, params.getMessageContent());Dict dict Dict.create();
dict.set(topic, params.getTopic()); //订阅主题
dict.set(payload, params.getMessageContent()); //内容
dict.set(qos, 0); //质量
dict.set(retain,false); //是否保存
String requestBodyStr JSON.toJSONString(dict);RequestBody requestBody RequestBody.create(HTTP_MEDIA_TYPE_JSON_UTF8, requestBodyStr);
Request request new Request.Builder()
.url(url)
.post(requestBody)
.header(Content-Type, application/json)
.header(Authorization, Credentials.basic(config().getAppId(), config().getAppSecret()))
.build();try (Response response getHttpClientInstance().newCall(request).execute()) {log.debug(Call http success. url[{}] response[{}], url, response);if (response.code() 404) {return ImmutableMap.of(code, 404, Message, 404 Not Found);} else if (!response.isSuccessful()) {return ImmutableMap.of(code, response.code(), Message, Server Error);}// 输出响应内容assert response.body() ! null;String string response.body().string();return JSON.parseObject(string);
} catch (IOException e) {log.warn(Call http failed, {}. url[{}] requestBodyStr[{}], e.getMessage(), url, requestBodyStr);
}return Collections.emptyMap();
}Mqtt配置信息 public class MqttConfig {private String appId;private String appSecret;private String address;private String username;private String password;private ListenerInfo listenerInfos;DataNoArgsConstructorAllArgsConstructorpublic static class ListenerInfo {private String host;private int port;private boolean ssl;//订阅的topicprivate ListString subscribeTopics;}}监听信息接口 public interface MqttListener {void setTopic(String topic);String getTopic();void onMessage(Message message);
}component-mqtt-client-starter
MqttClientAutoConfiguration META-INF com.gitee.xmhzzz.component.mqtt.client.MqttClientAutoConfigurationMqtt服务实战demo
通过component-mqtt-client-starter快速构建mqtt-service服务
发生消息
public class MqttController {Autowiredprivate IMqttApi IMqttApi;PostMapping(/pub/tcp)public void pubTcp(){MqttReq mqttReq new MqttReq();mqttReq.setTopic(topicA/001/in);MapString, Object map Maps.newHashMap();map.put(1,o);mqttReq.setData(map);IMqttApi.tcpPub(mqttReq);}
}监听消息
Slf4j
Component
public class AMqttListener implements MqttListener {private String topic;public AMqttListener() {this.topic topicA//msg;}Overridepublic void setTopic(String topic) {}Overridepublic String getTopic() {return this.topic;}Overridepublic void onMessage(Message message) {log.info(a message[{}], JSONObject.toJSONString(message));}
}