移动端
重修Android:基于MQTT协议的内网消息推送实践
前言 在 Android 开发过程中消息推送的应用场景很广泛,很多应用都通过接入第三方的推送如极光、友盟等快速集成消息推送能力。但在一些公网受限的环境下无法使用第三方的推送, 一般我们...
前言
在 Android 开发过程中消息推送的应用场景很广泛,很多应用都通过接入第三方的推送如极光、友盟等快速集成消息推送能力。但在一些公网受限的环境下无法使用第三方的推送, 一般我们可以采用 轮训、长链接、MQTT 等方案实现内网的消息推送。其中长连接实现较为复杂, 而轮询这种次世代的方案面临着成本大、送达率不确定等; 相对而言 MQTT 具有服务质量(Qos)机制能搞保证送达率,相对使用 Netty 实现长连接实现起来较为简单。
MQTT
MQTT 是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT 协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。
1. 常见的 Mqtt Broker 服务器
- EMQ X (https://www.emqx.com/zh/downloads?product=broker)
- ActiveMQ/Artemis(https://activemq.apache.org/)
- ...
本文中使用了 EMQX, MacOS 环境的安装教程请看: 安装教程.
2. 服务质量
-
Qos0 (At most once,至多一次)
Sender 发送的一条消息,Receiver 最多能收到一次. 如果 Sender 发送消息失败了也不会重新尝试发送消息
-
Qos1 (At least once,至少一次)
Sender 发送的一条消息,Receiver 至少能收到一次,也就是说 Sender 向 Receiver 发送消息,如果 Sender 发送消息失败了, 会重新尝试发送消息直到 Receiver 收到消息为止,因为 Sender 可能发送了多次消息,所以 Receiver 也有可能接收到多条消息
-
Qos.2 (Exactly once,确保只有一次)
Sender 发送的一条消息,会确保 Receiver 能收到而且只收到一次,也就是说 Sender 尽力向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,同时保证 Receiver 不会因为消息重传而收到重复的消息。
注意: QoS 是 Sender 和 Receiver 之间的协议,而不是 Publisher 和 Subscriber 之间的协议。换句话说,Publisher 发布了一条 QoS1 的消息,只能保证 Broker 能至少收到一次这个消息;而对于 Subscriber 能否至少收到一次这个消息,还要取决于 Subscriber 在 Subscibe 的时候和 Broker 协商的 QoS 等级
Android 下的 MQTT 使用实践
1. 添加依赖
一般我们都是采用了 eclipse 的 paho 作为客户端连接 Broker, 但是通过 paho 的主页可以看到使用中存在的问题挺多的并且已经一年多没有更新, 主要就是没人处理 PR。本文使用了该项目社区 hannesa2/paho.mqtt.android 维护的项目。
项目的 build.gradle
allprojects {
repositories {
// 添加 jitpack
maven { url '<https://www.jitpack.io>' }
}
}
模块的 build.gradle
dependencies {
implementation 'androidx.legacy:legacy-support-v4:1.0.0'
implementation 'com.github.hannesa2:paho.mqtt.android:$latestVersion'
}
2. 初始化 MqttAndroidClient
val callback = object : MqttCallbackExtended {
override fun connectionLost(cause: Throwable?) {
// 与 Broker 连接断开
}
override fun messageArrived(topic: String?, message: MqttMessage?) {
// 接收到 broker 更新的消息
}
override fun deliveryComplete(token: IMqttDeliveryToken?) {
// 完成消息的投递
}
override fun connectComplete(reconnect: Boolean, serverURI: String?) {
// 连接完成. 其中 reconnect 表示是否是重连
/// 当连接是否是首次连接的时候, 应该订阅默认topic.
if (!reconnect) {
subscribeDefaultTopic()
}
}
}
val mqttClient = MqttAndroidClient(context, mqttConfig.serverUrl, mqttConfig.clientId).apply {
// 设置 MqttAndroidClient 的相关回调.
addCallback(callback)
// 设置Mqtt系统通知栏.
setForegroundService(notification, 0)
}
3. 开始连接
val connectOptions = MqttConnectOptions().apply {
// 是否自动清除 session. 注意如果为 true 则会清除session. 会导致如果你掉线的期间,
// 你所订阅的topic有新的消息,等你重新连接上后因为session被清除了,你将无法接收到在你
// 离线期间的新消息
isCleanSession = false
// 是否自动重新连接。当客户端网络异常或进入后台后导致连接中断,在这期间会不断的尝试重连,
// 重连等待最初会等待1 秒钟, 每次重连失败等待时间就会加倍,直到 2 分钟,此时延迟将保持在 2 分钟。
isAutomaticReconnect = true
// 连接超时
connectionTimeout = 15
// 设置'保持活动'间隔
keepAliveInterval = 30
// userName = "username"
// password = "password".toCharArray()
}
val listenerMqttAction = object: IMqttActionListener{
/**
* Mqtt 执行连接/解除连接成功的回调
*/
override fun onSuccess(asyncActionToken: IMqttToken?) {
// 操作成功
}
/**
* Mqtt 执行连接/解除连接失败的回调
*/
override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
// 操作失败
}
}
// region 开始连接
mqttClient.connect(connectOptions, null, listenerMqttAction)
// endregion 开始连接
// region 断开连接
mqttClient.disconnect(null, listenerMqttAction)
// region 断开连接
4. 订阅/取消订阅 Topic
//region 订阅
mqttClient.subscribe("topic", 2, object: IMqttMessageListener {
override fun messageArrived(topic: String?, message: MqttMessage?) {
// 接收到了消息
val msgStr = message?.payload.toString()
}
})
//endregion 订阅
//region 取消订阅
mqttClient.unsubscribe(it.topic)
//endregion 取消订阅
5. 发布消息
// 发布消息时 topic 不允许使用通配符.
mqttClient.publish("testtopic", MqttMessage().apply {
payload = "hello broker!".toByteArray(charset = Charsets.UTF_8)
// 发送消息时的 Qos 为 Sender 与 Broker 通讯的服务质量.
qos = 2
})
注意: MQTT 允许使用通配符订阅主题,但是并不允许使用通配符广播(发送消息)
使用 MQTT X 进行测试
-
接收订阅的 Topic 发布的消息

-
客户端掉线重连后接收掉线期间发布的消息

-
后端可以通过监听 SYS 中的 connect 和 disconnected 感知客户端是否在线
监听客户端上下线
- 上线 Topic:
$SYS/brokers/+/clients/+/connected - 下线 Topic:
$SYS/brokers/+/clients/+/disconnected
- 上线 Topic:
到这里我们就在受限网络中搭建了一套可用的消息推送服务, 后端如果是 Java 可以很方便的通过 Springboot 快速集成 MQTT 给移动端发送消息。
如果想要了解更多 MQTT 的原理可以查看大佬的文章: 为什么每份 Android 简历都说 “熟悉 MQTT 协议”?