APP下载

Netty的MQTT协议消息系统的设计与实现

2023-10-04杨建

科技资讯 2023年17期
关键词:消息集群客户端

杨建

(江西工程学院 江西新余 338000)

现阶段,MQTT协议主要有两种类型,一种应用于TCP、IP 网络,另一种应用于传感器网络,能够提供有序、无损、双向连接。目前,MQTT协议主要针对M2M、物联网应用开发,在设计初期充分遵循简单、轻量原则,减少对网络带宽与设备资源的实际需求,确保可靠性、保证性地交付。MQTT协议使用发布、订阅消息模式,将有效信息分配给一个或多个接收者,在实现同应用程序解耦的过程中,屏蔽负载内容,现已广泛应用于IoT、Mobile Internet、智能家居、穿戴和办公等智能硬件、车载自组织网络和电力能源等行业。

1 MQTT协议

MQTT协议中存在发布者、代理者和订阅者3种角色,发布者负责向代理者发布消息,代理者负责订阅者转发消息,其中,消息发布者、订阅者均为客户端,消息代理者为服务端。MQTT协议报文通常由Fixed header、Variable header 与Payload 共同组成,主要消息类型有11 种。MQTT 协议能够提供质量截然不同的3 种消息传递服务。在QOS0 至多一次服务等级中,MQTT 协议消息按照基础TCP、IP网络交付,有可能发生丢失或者产生重复。这种消息传递服务质量等级可以应用在非重要情况下。在QOS1 至少一次服务等级中,MQTT 协议消息可以确保及时送达,有可能产生重复。在QOS2恰好一次服务等级中,MQTT 协议消息可以确保送达且仅被送达一次[1]。

2 MQTT协议消息系统的设计需求分析

2.1 功能性需求

在MQTT 协议消息系统的功能性需求上,主要针对一般用户和系统用户实现。通过对MQTT协议消息系统的设计,IoT物模型管理能够对设备功能实施数字化定义,云端建构实体数据模型后,描述模型功能,便捷化管理设备。系统可以提供IoT设备状态管理,通过上线、下线、离线监控,实时获取设备运行状态、健康状态的数据参数,据此可以搭建设备IoT数据平台,为诊断设备故障、预警夯实基础。端到端的消息路由可以实现对数据的灵活控制,提高边缘计算节点的安全性,便于数据在设备、函数应用与IoT Hub间流转[2]。

在数据、监控状态上报、汇聚到系统中的规则引擎后,根据预先设定规则转发消息至主体设备、业务系统、App、数据库、消息中间件等。系统安全模块通过在设备连接阶段进行设备身份认证,对非法连接请求进行拒绝,确保连接阶段设备合法。与此同时,消息系统通过控制订阅主题进行权限区分,防止因数据泄密、越级访问产生网络阻塞等。为保证MQTT协议消息系统的可用性始终处于高水平,系统需要在集群工作管理模式下,对外部提供一致性服务,确保集群工作过程中的消息发布和消息订阅操作,能够通过正确处理消息的方式,确保消息在各节点的共享。同时,系统可以通过IoT 设备调用服务、IoT 设备远程完整性监控等通用信息服务能力,支持IoT中心所接收的实时传感器数据进行可视化展示、IoT 场景数据采集存储服务、第三方服务等业务相关应用,在系统信息监控过程中,对主题建模最佳数量、序列与角度、CPU使用率、连接总数、活跃数、最大并发数、客户端间发布和订阅信息,监控服务器实时运行状态。

2.2 非功能性需求

为保证MQTT 协议消息系统的实用价值,在非功能性需求上,需要在消息迟延时间、程序并发性和系统可扩展性几方面进行对应约束[3]。MQTT 协议消息系统在传递IoT场景中设备消息方面发挥主要作用,传递消息的及时性,直接决定IoT设备是否能够通过相关指令迅速进行对应操作,因而,需要MQTT协议消息系统在获得消息后,通过端到端的消息路由实时分发消息,系统性能越好。在设计MQTT 协议消息系统过程中,需要侧重考虑系统在程序上的并发性,一旦系统所接入的设备数量大规模增长,并发数逐渐增高,需要考虑并发连接支持高并发的连接数,以此保证消息系统的稳定程度。伴随业务扩展、新需求到来,为便于MQTT协议消息系统在设计过程中有效实施扩展,需要通过集群方式对系统进行部署,开发新功能、提供统一接口。

3 MQTT协议消息系统的总体架构设计

3.1 Netty技术解析

Netty 是经由JBOSS 应用服务器所提供的Java 网络开源应用框架,功能强大。Netty提供全新方式开发Web server、客户端过程,该方式易用性、扩展性显著增强,在复杂的内部环境基础上,允许更高的吞吐量。一以贯之,Netty 能够提供异步通信方式、事件驱动策略的Web 应用程序框架与工具,快速实现Web server、客户端管理程序的高性能、高可靠性开发。Netty使用传输控制协议,为多客户端提供消息查询服务,促使该过程相对简单。现阶段,Netty 的核心组件主要包括由java.nio.channels包定义的Channel通道、Callback回调、异步模型Future、可扩展的事件模型和Netty、代码的主要扩展与定制点Channel Handler[4]。其中,Channel 通道通常代表一个实体的开放性连接,异步模型Future提供在操作完成过程中通知应用程序的方式,其能够在未来某一时刻完成,提供对结果的访问,Channel Handler是一个父接口,本身并未提供过多方法。

3.2 MQTT协议消息系统的总体设计

现阶段,MQTT协议消息系统是IoT重要的传输协议,应用广泛,负责进行设备接入和采集数据,需要根据业务实际需求将获取的数据存储至Mysql 数据库、统一上报到相关业务系统等。在搭建MQTT协议消息系统过程中主要以集群方式实现,通过集群的方式可以解决基于MQTT协议的Rabbit MQ消息收发,实现远程收发、AMQP与MQTT间的收发。单个节点能够提供完整服务,可以支撑MQTT协议消息的传递,多节点协调对外提供一致性服务。

每单个节点的MQTT 代理服务端在搭建过程中,使用Netty 技术建构Boss Event Loop 线程模型,用以处理客户端连接请求,建构Work Event Loop线程模型,用以处理客户端的读写请求。消息入站后进行字节解码,消息出站前将MQTT 客户端对象编译成为字节[5]。Engine x 是高性能Web 服务器,其作为云上数据库的存储引擎,可以通过数据流代理Stream 模块实现协议的代理转发tcp报文,主要用于数据流代理和负载均衡等。分布式应用程序协调服务软件zookeeper 作为服务集群,MQTT 协议消息系统的总体设计主要由传感器、车载装置、智慧工厂等物模型与各个主题模块、设备运行状态管理模块、消息路由模块、安全模块(SE)、集群模块、监控模块组成。

4 MQTT协议消息系统的模块功能实现

4.1 服务器终端实现

服务器终端能够在消息系统中接受、发送、解析和计算IoT设备控制命令与数据指令,全监控IoT设备与数据存储设备信息,主要在下述几个模块进行。

模块一,传感器、车载装置、智慧工厂等物模型与各个主题模块。消息系统通过TSL 语言描述物模型,采取JSON格式对物模型数据进行类似格式上报,在主题实现过程中将IoT 设备主题划分为上行、下行两类数据。

模块二,设备运行状态管理模块。该模块是后续消息发布和订阅操作实现的基础,需要在连接设备时通过发送报文连接消息系统。消息系统通过定义Session信息实现持久和非持久会话的存储[6]。一旦设备成功与消息系统建立连接,通过维护TCP长连接,检测设备运行状态,及时将不够活跃的连接断开,充分将服务器内存资源释放出来。基于MQTT 协议规范,始终保持连接的设备需要主动向服务节点定时传送PING 包REQ心跳请求,系统接收请求后及时进行将心跳响应(PINGRESP)返回至客户端,进而保持设备连接状态,成功测试网络处于通顺状态。为实现设备心跳,需要使用Netty 技术实现连接读写事件、空闲事件检查,根据设备心跳时间触发Netty核心组件,找到设备对应连接并予以关闭,释放服务器资源。

模块三,消息路由模块。在实现即时消息推送过程中,通过成功连接设备端、系统,首先发送报文请求,进行协议处理,提取消息发布主题、消息传递服务质量级别、消息内容,进行封装后调用发布权限,验证设备是否具备。在消息推送前需要进行主题订阅匹配查询,找出各层级设备订阅列表和订阅该主题消息的设备,写入消息到设备对应消息队列后,将消息推送给指定客户端。在实现离线消息推送过程中,需要结合MQTT协议规范存储设备离线消息。在消息推送后进行主题订阅匹配查询,通过设备状态查询模块查找设备在线情况,便于系统通过离线数据写入模块,将设备离线消息写入数据库[7]。一旦系统初始化开始,为保证离线消息毫秒级查询,需要在处理逻辑中调用generate Row Key 生成消息的行键。最终,将消息存储在列表里批量提交,通过后台程序及时删除推送过的消息。在桥接数据到Kafka开源流处理平台时,需要根据预设规则将主题消息筛选,通过写入对应主题,促使其他业务订阅平台主题消费消息。

模块四,安全模块(SE)。使用“一型号一密钥”的方式验证设备注册情况,使用一机器一密钥的方式验证设备认证情况。设备连接时,通过提取clientid、username和password这3个参数解析连接报文,向设备身份验证服务发送请求。使用标识、IP、用户名、密码和主题为参数发起请求权限,处理ACL HTTP请求。

模块五,集群模块。集群通常由节点构成,节点不规则分布于JVM、物理机之上,一旦集群系统被触发,节点将自动加入集群,节点状态变化通过gossip 通信协议传输至集群每个节点,确保集群状态一致性[8]。因MQTT 协议消息系统通过去中心化集群方式搭建,存在多个服务节点,IoT 设备发布消息后,节点需要精准转发消息,确保消息路由推送消息的正确与完整程度。

模块六,监控模块。该模块通过变量方式存储各节点设备连接数、主题订阅数、客户端等信息,在数据库中,一旦需要查看相应信息,可以依靠脚本及时获取信息并定时更新。

4.2 消息代理服务器实现

在通信过程,需要设备和服务器间的通信协议、服务器与客户端间利用AMQPAMQP传输的通信协议、服务器与移动通信终端间利用MQTT协议传输的通信协议[9]。实现MQTT协议消息系统代理服务器工作,需要科学管理已经订阅的消息,通过多终端转发消息,查询存储信息,同时,通过设置用户权限实现“1&X”服务关系。在多终端服务端口,用户可以通过ID查询订阅成功的MQTT 主题,通过多终端对设备实施远程控制。

5 MQTT协议消息系统的功能与性能测试

Netty 的MQTT 协议消息系统的设计与实现,使用客户端工具进行消息系统的功能测试。在测试过程中,客户端工具通过创建多样化连接设置的客户端满足测试需求,选用MQTT Box 作为主测试工具,验证MQTT协议消息系统是否满足系统功能需求。在功能测试过程中,主要对IoT 设备身份认证ID²、发布消息、消息订阅主题(Subscribe)、接收推送消息、自定义心跳包结构体、消息重投功能(republish)进行[10]。

针对发布消息、消息订阅主题、接收推送消息、自定义心跳包结构体、消息重投功能的测试通常由用例编号、用例名称、用例标题、测试目的、测试步骤(预置条件、测试步骤)、预期结果、测试结果等共同组成。选用MQTT.fx,使用Java 语言编写的客户端工具,针对MQTT 客户端进行Topic 订阅和消息发布,同时启动客户端A 和客户端B,A 用于消息发布,B 用于Topic 订阅,测试MQTT 协议消息系统是否支持QOS2 恰好一次服务等级质量[11]。

在单节点性能测试上,可以通过两台机器执行程序模拟,多次执行命令设置连接数,通过Web监控页面连接成功率,5 000 连接数的CPU 占用率为10.2%,10 000连接数的CPU占用率为18.5%,20 000连接数的CPU 占用率为28.3%,30 000 连接数的CPU 占用率为41.6%,38 000 连接数的CPU 占用率为55.1%,结果均为成功。可见连接数在38 000内,设备连接较为稳定。在集群性能测试过程中,通过Jmeter 测试软件模拟设置并发连接数及相关参数、点击执行,生成测试报告。MQTT协议消息系统在集群环境下可以处理更多设备连接数,节点集群处理100 000 连接非常稳定,CPU 负载较低,消息时延在1 s内,满足对系统需求。

6 结语

综上所述,Netty 提供API 接口从网络处理代码中解耦核心业务逻辑与辅助功能,完全基于Java NIO(无阻塞的输入/输出)中的Buffer 实现内部可扩展性解决方案,在NIO通道进行交互的过程中,将数据移进移出通道,架构编程服务器与客户端框架结构,实现服务端、客户端要求的功能。以Netty 技术为支撑的MQTT协议消息系统属于IoT服务运营平台组成部分之一,与IoT设备管理子系统同处于IoT服务运营平台接入层底部,作为平台重要的基础,MQTT协议消息系统可以为平台上层应用提供强大的数据支撑,与接入层的规则引擎、IoT 应用微服务、数据存储管理系统通过HTTP API 网关、RPC 服务器等实现服务间的协同。测试结果表明,MQTT协议消息系统能够满足预期设计目标,可利用在多种物联网系统中。

猜你喜欢

消息集群客户端
一张图看5G消息
海上小型无人机集群的反制装备需求与应对之策研究
一种无人机集群发射回收装置的控制系统设计
县级台在突发事件报道中如何应用手机客户端
孵化垂直频道:新闻客户端新策略
基于Vanconnect的智能家居瘦客户端的设计与实现
Python与Spark集群在收费数据分析中的应用
勤快又呆萌的集群机器人
消息
消息