重修Android:基于MQTT协议的内网消息推送实践

前言 在 Android 开发过程中消息推送的应用场景很广泛,很多应用都通过接入第三方的推送如极光、友盟等快速集成消息推送能力。但在一些公网受限的环境下无法使用第三方的推送, 一般我们...

前言

在 Android 开发过程中消息推送的应用场景很广泛,很多应用都通过接入第三方的推送如极光、友盟等快速集成消息推送能力。但在一些公网受限的环境下无法使用第三方的推送, 一般我们可以采用 轮训、长链接、MQTT 等方案实现内网的消息推送。其中长连接实现较为复杂, 而轮询这种次世代的方案面临着成本大、送达率不确定等; 相对而言 MQTT 具有服务质量(Qos)机制能搞保证送达率,相对使用 Netty 实现长连接实现起来较为简单。

MQTT

MQTT 是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT 协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。

1. 常见的 Mqtt Broker 服务器

本文中使用了 EMQX, MacOS 环境的安装教程请看: 安装教程.

2. 服务质量

  • Qos0 (At most once,至多一次)

    Sender 发送的一条消息,Receiver 最多能收到一次. 如果 Sender 发送消息失败了也不会重新尝试发送消息

  • Qos1 (At least once,至少一次)

    Sender 发送的一条消息,Receiver 至少能收到一次,也就是说 Sender 向 Receiver 发送消息,如果 Sender 发送消息失败了, 会重新尝试发送消息直到 Receiver 收到消息为止,因为 Sender 可能发送了多次消息,所以 Receiver 也有可能接收到多条消息

  • Qos.2 (Exactly once,确保只有一次)

    Sender 发送的一条消息,会确保 Receiver 能收到而且只收到一次,也就是说 Sender 尽力向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,同时保证 Receiver 不会因为消息重传而收到重复的消息。

注意: QoS 是 Sender 和 Receiver 之间的协议,而不是 Publisher 和 Subscriber 之间的协议。换句话说,Publisher 发布了一条 QoS1 的消息,只能保证 Broker 能至少收到一次这个消息;而对于 Subscriber 能否至少收到一次这个消息,还要取决于 Subscriber 在 Subscibe 的时候和 Broker 协商的 QoS 等级

Android 下的 MQTT 使用实践

1. 添加依赖

一般我们都是采用了 eclipse 的 paho 作为客户端连接 Broker, 但是通过 paho 的主页可以看到使用中存在的问题挺多的并且已经一年多没有更新, 主要就是没人处理 PR。本文使用了该项目社区 hannesa2/paho.mqtt.android 维护的项目。

项目的 build.gradle

allprojects {
    repositories {

      // 添加 jitpack
      maven { url '<https://www.jitpack.io>' }
    }
}

模块的 build.gradle

dependencies {
  implementation 'androidx.legacy:legacy-support-v4:1.0.0'
  implementation 'com.github.hannesa2:paho.mqtt.android:$latestVersion'
}

2. 初始化 MqttAndroidClient

val callback = object : MqttCallbackExtended {
   override fun connectionLost(cause: Throwable?) {
      // 与 Broker 连接断开
   }

   override fun messageArrived(topic: String?, message: MqttMessage?) {
      // 接收到 broker 更新的消息
   }

   override fun deliveryComplete(token: IMqttDeliveryToken?) {
      // 完成消息的投递
   }

   override fun connectComplete(reconnect: Boolean, serverURI: String?) {
     // 连接完成. 其中 reconnect 表示是否是重连
     /// 当连接是否是首次连接的时候, 应该订阅默认topic.
     if (!reconnect) {
         subscribeDefaultTopic()
     }
   }
}

val mqttClient = MqttAndroidClient(context, mqttConfig.serverUrl, mqttConfig.clientId).apply {

    // 设置 MqttAndroidClient 的相关回调.
    addCallback(callback)

    // 设置Mqtt系统通知栏.
		setForegroundService(notification, 0)
}

3. 开始连接

val connectOptions = MqttConnectOptions().apply {
  // 是否自动清除 session. 注意如果为 true 则会清除session. 会导致如果你掉线的期间,
  // 你所订阅的topic有新的消息,等你重新连接上后因为session被清除了,你将无法接收到在你
  // 离线期间的新消息
  isCleanSession = false
  // 是否自动重新连接。当客户端网络异常或进入后台后导致连接中断,在这期间会不断的尝试重连,
  // 重连等待最初会等待1 秒钟, 每次重连失败等待时间就会加倍,直到 2 分钟,此时延迟将保持在 2 分钟。
  isAutomaticReconnect = true
  // 连接超时
  connectionTimeout = 15
  // 设置'保持活动'间隔
  keepAliveInterval = 30
  // userName = "username"
  // password = "password".toCharArray()
}

val listenerMqttAction = object: IMqttActionListener{
    /**
     * Mqtt 执行连接/解除连接成功的回调
     */
    override fun onSuccess(asyncActionToken: IMqttToken?) {
        // 操作成功
    }

    /**
     * Mqtt 执行连接/解除连接失败的回调
     */
    override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
        // 操作失败
    }
}

// region 开始连接
mqttClient.connect(connectOptions, null, listenerMqttAction)
// endregion 开始连接

// region 断开连接
mqttClient.disconnect(null, listenerMqttAction)
// region 断开连接

4. 订阅/取消订阅 Topic

//region 订阅
mqttClient.subscribe("topic", 2, object: IMqttMessageListener {
    override fun messageArrived(topic: String?, message: MqttMessage?) {
      // 接收到了消息
      val msgStr = message?.payload.toString()
    }
})
//endregion 订阅

//region 取消订阅
mqttClient.unsubscribe(it.topic)
//endregion 取消订阅

5. 发布消息

// 发布消息时 topic 不允许使用通配符.
mqttClient.publish("testtopic", MqttMessage().apply {
   payload = "hello broker!".toByteArray(charset = Charsets.UTF_8)
   // 发送消息时的 Qos 为 Sender 与 Broker 通讯的服务质量.
   qos = 2
})

注意: MQTT 允许使用通配符订阅主题,但是并不允许使用通配符广播(发送消息)

使用 MQTT X 进行测试

  • 接收订阅的 Topic 发布的消息

    https://oss.j3dream.top//img/202109021614983.png@img.blog.webp

  • 客户端掉线重连后接收掉线期间发布的消息

    https://oss.j3dream.top//img/202109021624301.png@img.blog.webp

  • 后端可以通过监听 SYS 中的 connect 和 disconnected 感知客户端是否在线

    监听客户端上下线

    • 上线 Topic: $SYS/brokers/+/clients/+/connected
    • 下线 Topic$SYS/brokers/+/clients/+/disconnected

到这里我们就在受限网络中搭建了一套可用的消息推送服务, 后端如果是 Java 可以很方便的通过 Springboot 快速集成 MQTT 给移动端发送消息。

如果想要了解更多 MQTT 的原理可以查看大佬的文章: 为什么每份 Android 简历都说 “熟悉 MQTT 协议”?

实践案例: example-mqtt-client-android