引言
Paho MQTT Java 是由 Eclipse 基金会提供的常用 MQTT 客户端实现之一,广泛用于物联网设备与服务之间的通信。近期在项目中使用
paho.mqtt.java 时,我们遇到了一个问题:在模拟设备场景下使用同步或异步客户端接收消息时,解析指令后发送 ACK给服务端,会导致客户端只能接收一次消息,之后消息接收停止。问题重现
使用的 paho 版本如下
同步客户端的实现
以下是使用同步 MQTT 客户端时的代码示例:
问题现象:
在回调方法
messageArrived() 中,客户端收到消息后立即发布了 ACK 消息。然而,这种操作导致后续消息无法接收,并且客户端似乎进入了阻塞状态。

异步客户端的实现
同样,使用异步客户端时也发现了类似的问题。以下是异步 MQTT 客户端的代码示例:
问题现象:
类似地,在异步客户端中,消息发布后客户端就因为异常断联了。

问题分析
通过对 Paho MQTT Java 的源码分析,我们发现同步客户端和异步客户端的核心逻辑共用同一套底层实现。
关键问题出现在回调函数
messageArrived() 中: 在回调线程内进行耗时操作(如发布消息)阻塞了回调线程,从而导致客户端无法继续处理后续的 MQTT 消息。客户端的线程模型分析
在 Paho 的内部实现中,MQTT 客户端使用了多线程模型:
线程模型解析:
- CommsReceiver:负责接收来自 MQTT Broker 的消息,并触发回调。
- CommsSender:负责将消息发送给 Broker。
- Callback Thread:用于处理回调事件,如
messageArrived()、connectionLost()等。
- PingSender: 使用 new Timer 创建,用于连接成功后维持设备心跳。
如果在
messageArrived() 回调中阻塞回调线程(例如同步发布消息),那么会导致消息处理管道被堵塞,后续消息无法正常接收。进入到调试状态,查看线程情况的话,可以看到使用 paho mqtt 客户端时对应的4个线程

源码分析与核心接口
MqttClient和MqttAsyncClient:这是 Paho 客户端的两个主要接口,分别提供同步和异步的 MQTT 通信。MqttClient:这是一个同步客户端,调用方法会阻塞直到操作完成。MqttAsyncClient:这是一个异步客户端,调用方法立即返回,可以通过回调和IMqttToken追踪操作结果。
MqttMessage:代表 MQTT 消息的内容,包括负载(payload)和消息选项,如 QoS 等级。
IMqttToken和IMqttDeliveryToken:用于追踪和控制异步操作(如发布消息)。IMqttDeliveryToken是IMqttToken的子接口,专门用于追踪发布的完成情况。
ClientComms:这是核心通信类,负责与 MQTT 服务器的实际通信。它管理客户端的连接、消息的发送和接收,并协调其他线程的执行。
CommsCallback:处理从 MQTT 服务器接收到的消息和事件,并调用用户的回调函数来处理消息的到达、交付确认和连接丢失等事件。
CommsReceiver和CommsSender:这两个类分别负责接收和发送 MQTT 消息的数据包。它们分别在各自的线程中运行,确保客户端能够同时发送和接收数据。
Token:内部实现的IMqttToken接口,用于管理和跟踪异步操作的状态。发布消息、订阅等操作都会创建一个对应的Token。
不同线程的工作
- 消息发送:当用户调用
publish()方法发布消息时,当前线程会构建MqttPublish消息会被放入ClientComms的发送队列(ClientState中Vector pendingMessages)中,CommsSender线程会从队列中取出消息并发送到服务器。发送完成后,服务器会返回PUBACK或PUBCOMP消息,CommsReceiver线程负责接收这些确认消息。
- 消息接收:
CommsReceiver线程阻塞式地从DataInputStream流中读取TCP包,并根据MQTT协议进行解析拼装message,当服务器发送一条消息时,接收到该消息后,通过ClientState.notifyReceiveMsg到CommsCallback.messageArrived,将接收到的消息放到了Vector<MqttWireMessage> messageQueue队列中。CommsCallback线程一直查询messageQueue是否有新数据,当有新消息到来时,会在CommsCallback线程调用用户定义的messageArrived()方法来处理消息。
ClientState 的作用
ClientState 是 Paho MQTT Java 客户端的状态管理器,它的主要职责包括:- 管理消息队列: 它维护了待发送消息队列(
pendingMessages)和已发送但未确认的消息队列(pendingFlows)。这些队列用来管理不同 QoS 等级下的消息流转。
- 消息重发管理: 对于 QoS 1 和 QoS 2 的消息,
ClientState负责在消息未及时确认时进行重发,以保证消息的到达和传输。
- 管理连接状态:
ClientState追踪客户端的连接状态,控制客户端是否处于连接中、断开中等状态,并在需要时触发相应的回调。
- 生成和管理
MqttToken: 对于每个 MQTT 操作(如发布、订阅、连接、断开等),ClientState会生成一个MqttToken来跟踪其状态,并在操作完成后更新MqttToken的状态。
MqttToken 的作用
MqttToken 是用于跟踪单个 MQTT 操作的对象,每一个 MqttToken 都代表了一个异步操作的状态。它的职责包括:- 跟踪操作的完成状态:
MqttToken记录操作是否已完成(如发布消息、订阅主题等),以及操作是否成功。
- 存储操作结果:
MqttToken可以存储操作的结果(如服务器返回的订阅确认信息SUBACK),并在操作完成时将结果返回给调用者。
- 提供回调支持:
MqttToken支持为操作设置回调方法(如IMqttActionListener),这些回调在操作完成后会被调用。
ClientState 和 MqttToken 的交互逻辑
ClientState 和 MqttToken 之间的关系和交互流程可以概括为以下几个步骤:- 创建
MqttToken: 当用户发起一个操作(如publish、subscribe、connect等)时,ClientState会生成一个对应的MqttToken,用于跟踪该操作的状态。例如,当用户发布一条消息时,会生成一个MqttToken来跟踪PUBLISH请求的状态。
- 记录和跟踪状态:
ClientState会将该MqttToken记录在相关的待处理队列中,并在消息被成功发送到服务器或收到服务器确认(如PUBACK)时,更新MqttToken的状态。此时,MqttToken可以用来查询该操作的结果,例如操作是否成功、是否收到服务器的确认。
- 处理服务器响应: 当
CommsReceiver接收到来自服务器的响应(如PUBACK、SUBACK等)时,它会将这些响应传递给ClientState,由ClientState更新对应的MqttToken。例如,当接收到SUBACK时,ClientState会更新对应订阅操作的MqttToken状态为完成,并调用该MqttToken关联的回调函数。
- 完成操作后的回调和状态清理: 在操作完成时(如消息发布确认完成或订阅确认到达),
ClientState会通知相应的MqttToken并触发其回调。同时,ClientState会从待处理的消息队列中移除该操作的记录。
消息处理中的阻塞问题
在 MQTT 的线程模型中,回调线程的核心职责是快速处理消息并返回。如果回调线程中执行了同步阻塞操作,如发布消息的
publish().waitForCompletion(),则会导致:- 消息处理线程被占用:无法处理新的消息。
- 连接不稳定:在异步客户端中,阻塞回调可能导致连接被异常断开。
以示例中的使用异步客户端
publish 一条消息为例,查看完整的代码调用在 callback 线程 中调用
waitForCompletion(3000) 会阻塞该线程。然而,成功处理 token 的逻辑和调用 responseLock.notifyAll() 也在同一 callback 线程中执行。由于线程在等待时阻塞自身,导致无法及时调用 notifyAll() 解除等待。这种情况会使得 waitForCompletion() 超时,进而无法执行后续的日志打印操作,从而造成未能记录消息接收日志的问题。PS: 在阅读源码的过程中,我们可以看到,Paho MQTT 客户端的网络 I/O 模型是 阻塞 I/O(Blocking I/O) 模型。
解决方案与最佳实践
使用独立线程处理消息发布
在回调函数中,不应直接发布消息,而是应将消息交由另一个线程池处理,确保回调函数能够及时返回。示例如下:
避免在回调中执行耗时操作
- 非阻塞操作:尽量避免在回调中调用阻塞方法,如
waitForCompletion()。
- 异步处理:使用线程池或异步框架处理复杂的业务逻辑。
结论
在使用 Paho MQTT Java 客户端时,回调函数中的阻塞操作会导致消息接收异常和客户端断连问题。为避免此类问题,建议在回调中使用 线程池 或 异步任务 处理复杂逻辑,确保回调线程快速返回。
通过这种优化,我们能够保证 MQTT 客户端的稳定性,并提高消息处理效率。
- 作者:Yibin
- 链接:https://yibin.dev/article/11f60b50-99a4-800c-9f0d-f261612f441b
- 声明:本文采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。
相关文章







