Android推送之MQTT基本用法

在Android应用开发中,推送 功能经常是必不可少的一环,目前常用的推送可能有小米推送、极光推送、华为推送、友盟推送等等,在此就不一一说明了。在后台开发中,经常使用ActiveMQ或RabbitMQ等作为消息中间件,Android端可以使用MQTT协议连接ActiveMQ进行通信,实现推送的功能。

在连接 ActiveMQ 之前请先前往 ActiveMQ 官网 下载最新版的 ActiveMQ,解压到本地,并运行 ActiveMQ 。

GitHub 样例代码
点击下载 APK 样例

Demo 演示:

AndroidMQTT-v1.0.gif

依赖及配置

build.gradle 文件中引入 MQTT 协议所需的包。

1
2
3
4
5
dependencies {
// MQTT
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}

AndroidManifest.xml 中引入 MqttService 及相应的权限

1
2
3
4
5
6
7
8
9
10
11
<manifest>
<!-- Permissions the Application Requires -->
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />

<application>
<!-- ... -->
<service android:name="org.eclipse.paho.android.service.MqttService" />
</application>
</manifest>

连接 ActiveMQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class MainActivity extends AppCompatActivity {
// MQTT 客户端
private MqttAndroidClient mqttAndroidClient;
// 连接 ActiveMQ 的URI
private String serverUri = "tcp://iot.eclipse.org:1883";
// private String serverUri = "tcp://[your ip]:[mqtt port]";
// 客户端 ID,用以识别客户端
private String clientId = "android_sample_001";
/**
* 使用 MQTT 协议连接 ActiveMQ
*/
private void connect() {
mqttAndroidClient = new MqttAndroidClient(getApplicationContext(), serverUri, clientId);
mqttAndroidClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
// TODO 连接完成
}

@Override
public void connectionLost(Throwable cause) {
// TODO 连接中断
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// TODO 消息到达
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// 消息成功传输后调用
}
});
// 设置 MQTT 连接参数
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setAutomaticReconnect(true); // 设置自动重连
mqttConnectOptions.setCleanSession(false); // 设置是否清除 Session
try {
// 开始连接
mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
// TODO 连接成功
// 连接成功后可订阅主题
subscribeToTopic();
}

@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
// TODO 连接失败
}
});
} catch (MqttException ex) {
ex.printStackTrace();
}
}
}

订阅主题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class MainActivity extends AppCompatActivity {
// MQTT 客户端
private MqttAndroidClient mqttAndroidClient;
// 主题名称
private String topic = "AndroidTopic";
/**
* 订阅主题
*/
private void subscribeToTopic() {
try {
mqttAndroidClient.subscribe(topic, 0, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
// TODO 主题订阅成功
}

@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
// TODO 主题订阅失败
}
});
} catch (MqttException e) {
e.printStackTrace();
}
}
}

发布消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class MainActivity extends AppCompatActivity {
/**
* 发布消息
*/
public void publishMessage(String msg) {
if (isConnected(mqttAndroidClient)) {
try {
// 包装 MqttMessage 消息体
MqttMessage message = new MqttMessage();
message.setPayload(msg.getBytes());

mqttAndroidClient.publish(topic, message, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
// TODO 发布消息成功
}

@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
// TODO 发布消息失败
}
});
} catch (MqttException e) {
e.printStackTrace();
}
}
}
}
James wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!