When using vertx-mqtt as an MQTT client, I found that after running for a while it started logging the error: ERROR io.vertx.mqtt.impl.MqttClientImpl - Attempt to exceed the limit of 50 inflight messages.

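Reading the source of io.vertx.mqtt.impl.MqttClientImpl shows that when the client publishes a message, it first checks whether countInflightQueue has reached the configured maximum (getMaxInflightQueue()). Here countInflightQueue is the number of messages published at QoS 1 or QoS 2 for which no ack has been received from the server yet. The fix is therefore to make the server reply with its acks, or to publish at QoS 0 instead (which one fits depends on the scenario; normally there is no problem as long as the server implements the protocol correctly). The key part of the source is below: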

public Future<Integer> publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain) {
    // ...
    // Reject the publish once the number of unacknowledged messages has reached the limit.
    if (this.countInflightQueue >= this.options.getMaxInflightQueue()) {
        msg = String.format("Attempt to exceed the limit of %d inflight messages", this.options.getMaxInflightQueue());
        log.error(msg);
        exception = new MqttException(2, msg);
        // The caller receives a failed Future instead of a packet id.
        return this.ctx.failedFuture(exception);
    }
    // ...
}
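To make the counting behaviour concrete, here is a minimal, self-contained sketch of a client-side inflight limit. It is not the vertx-mqtt implementation: InflightLimiter, publish, onAck and inflightCount are hypothetical names invented for this illustration, and the sketch assumes the limit is checked before anything else, as the excerpt suggests.

```java
import java.util.HashSet;
import java.util.Set;

/** Simplified stand-in for a client-side inflight limit; not the vertx-mqtt code. */
class InflightLimiter {
    private final int maxInflight;                         // plays the role of getMaxInflightQueue()
    private final Set<Integer> pending = new HashSet<>();  // packet ids still awaiting an ack
    private int nextPacketId = 1;

    InflightLimiter(int maxInflight) {
        this.maxInflight = maxInflight;
    }

    /** Returns the packet id (0 for QoS 0), or -1 if the publish is rejected. */
    int publish(int qos) {
        // Assumption: as in the excerpt, the limit is checked first, for every publish.
        if (pending.size() >= maxInflight) {
            return -1;
        }
        if (qos == 0) {
            return 0; // QoS 0 needs no ack, so it is never counted
        }
        int id = nextPacketId++;
        pending.add(id);
        return id;
    }

    /** Called when the broker completes a delivery (PUBACK for QoS 1, PUBCOMP for QoS 2). */
    void onAck(int packetId) {
        pending.remove(packetId);
    }

    int inflightCount() {
        return pending.size();
    }
}

class InflightLimiterDemo {
    public static void main(String[] args) {
        InflightLimiter limiter = new InflightLimiter(2);
        int first = limiter.publish(1);
        limiter.publish(1);
        System.out.println(limiter.publish(1)); // -1: two publishes are still unacknowledged
        limiter.onAck(first);
        System.out.println(limiter.publish(1)); // 3: the ack freed a slot
    }
}
```

If the server never acks, the pending set only grows and every later publish is rejected, which matches the symptom described at the top. If the server does ack but publishes arrive in bursts, raising the limit may also help; the getter in the excerpt suggests the options object has a matching setter, but check the MqttClientOptions API of your version.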

For additional context, here is how EMQX describes this mechanism:

To improve message throughput and reduce the impact of network fluctuations, the EMQX broker allows multiple unacknowledged QoS 1 and QoS 2 packets to exist on the network link at the same time. Packets that have been sent but not yet confirmed are stored in the Inflight Window until confirmation is completed.

When the number of packets simultaneously on the network link exceeds the limit, that is, when the Inflight Window reaches its length limit (see max_inflight), the EMQX broker no longer sends subsequent packets but stores them in the Message Queue. Once a packet in the Inflight Window is confirmed, the packets in the Message Queue are sent in first-in, first-out order and stored in the Inflight Window.

When the client is offline, the Message Queue is also used to store QoS 0 messages, which will be sent the next time the client is online. This feature is enabled by default, and you can disable it manually (see mqueue_store_qos0).

It should be noted that if the Message Queue also reaches the length limit, subsequent packets will still be buffered to the Message Queue, but the first buffered message in the corresponding Message Queue will be discarded. If there are QoS 0 messages in the queue, the QoS 0 messages are discarded first. Therefore, it is very important to configure a suitable message queue length limit (see max_mqueue_len) according to your actual situation.
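
The behaviour described in the quoted passage can be sketched as a toy model of a single session. This is only an illustration of the described rules, not EMQX code: Msg, SessionModel and all method names are hypothetical, the two constructor arguments merely stand in for max_inflight and max_mqueue_len, and the handling of QoS 0 messages for an online client (sent directly when nothing is queued ahead of them) is an assumption of the model.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** A message reduced to the two fields the model needs. */
final class Msg {
    final int id;
    final int qos;

    Msg(int id, int qos) {
        this.id = id;
        this.qos = qos;
    }
}

/** Toy model of one session's Inflight Window and Message Queue; not EMQX code. */
class SessionModel {
    private final int maxInflight;  // stands in for max_inflight
    private final int maxQueueLen;  // stands in for max_mqueue_len
    private final Map<Integer, Msg> inflight = new LinkedHashMap<>();
    private final Deque<Msg> queue = new ArrayDeque<>();
    final List<Integer> sent = new ArrayList<>();     // ids in the order they were sent
    final List<Integer> dropped = new ArrayList<>();  // ids discarded on queue overflow

    SessionModel(int maxInflight, int maxQueueLen) {
        if (maxInflight < 1 || maxQueueLen < 1) {
            throw new IllegalArgumentException("limits must be at least 1");
        }
        this.maxInflight = maxInflight;
        this.maxQueueLen = maxQueueLen;
    }

    /** Send the message now if possible, otherwise buffer it behind what is already queued. */
    void deliver(Msg m) {
        if (queue.isEmpty() && canSend(m)) {
            send(m);
        } else {
            enqueue(m);
        }
    }

    /** A confirmation frees a window slot, then queued messages are drained in FIFO order. */
    void ack(int id) {
        if (inflight.remove(id) == null) {
            return; // unknown or already confirmed id
        }
        while (!queue.isEmpty() && canSend(queue.peekFirst())) {
            send(queue.pollFirst());
        }
    }

    int inflightCount() { return inflight.size(); }

    int queueLength() { return queue.size(); }

    private boolean canSend(Msg m) {
        return m.qos == 0 || inflight.size() < maxInflight;
    }

    private void send(Msg m) {
        sent.add(m.id);
        if (m.qos > 0) {
            inflight.put(m.id, m); // tracked until the client confirms it
        }
    }

    private void enqueue(Msg m) {
        if (queue.size() >= maxQueueLen) {
            dropOne();
        }
        queue.addLast(m);
    }

    /** Discard the oldest QoS 0 message if there is one, otherwise the oldest message. */
    private void dropOne() {
        for (Iterator<Msg> it = queue.iterator(); it.hasNext();) {
            Msg candidate = it.next();
            if (candidate.qos == 0) {
                it.remove();
                dropped.add(candidate.id);
                return;
            }
        }
        dropped.add(queue.removeFirst().id);
    }
}

class SessionModelDemo {
    public static void main(String[] args) {
        SessionModel session = new SessionModel(1, 2);
        session.deliver(new Msg(1, 1)); // sent, now in the Inflight Window
        session.deliver(new Msg(2, 1)); // window full: queued
        session.deliver(new Msg(3, 0)); // queued behind message 2
        session.deliver(new Msg(4, 1)); // queue full: QoS 0 message 3 is discarded first
        session.deliver(new Msg(5, 1)); // queue full, no QoS 0 left: oldest (2) is discarded
        session.ack(1);                 // frees the window, message 4 is sent next
        System.out.println(session.dropped); // [3, 2]
        System.out.println(session.sent);    // [1, 4]
    }
}
```

In this model a slow or missing ack first fills the window, then the queue, and only then starts costing messages, which is why the queue length limit matters as much as the window size.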