# 功能描述
支持对mqtt协议的存取。
配置文件为application-mqtt.yml
提示
(1) 需在application.yml中增加spring.profiles.active中增加mqtt才能激活配置文件
(2) 需设置application-mqtt.yml中的sei.cloud.mqtt.enabled为true才能开启mqtt服务
# 配置一览表
sei:
cloud:
mqtt:
enabled: false #是否开启mqtt
server:
ip: "tcp://127.0.0.1:1883"
username: ""
password: ""
client:
client-id: "clientId002"
keep-session: true #是否保留会话
keep-alive-interval: 60 #设置长连接最大时长(秒),默认为60秒
automatic-reconnect: true #自动断线重连
clean-session: false #不清除会话而保持上次连接会话,设置为false后才能断线重连后接收离线消息
max-inflight: 10 #允许同时发送多少条消息(QOS1未收到PUBACK或QOS2未收到PUBCOMP的消息)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Service接口
# 第一步 引入包
<dependency>
<groupId>sei-cloud</groupId>
<artifactId>mqtt</artifactId>
</dependency>
1
2
3
4
2
3
4
# 第二步 引入接口
@Resource
MqttService mqttService;
1
2
2
# 第三步 使用接口
public interface MqttService {
/**
* 获得客户端ID
* @return String
*/
String getClientID();
/**
* 获得mqtt连接
* @return MqttClient
* @throws MqttException 异常
*/
MqttClient getMqttClient() throws MqttException;
/**
* 发布消息一条消息
* @param topic: 主题
* @param message: 消息
* @throws MqttException 异常
*/
void publish(String topic, String message) throws MqttException;
/**
* 发布消息一条消息
* @param topic: 主题
* @param message: 消息
* @throws MqttException 异常
*/
void publish(String topic, MqttMessage message) throws MqttException;
/**
* 发布消息一条消息
* @param topic: 主题
* @param qos: qos
* @param retained: false:只能拿连接后发布的消息(连接之前的信息会丢失),建议设置为true
* @param message: 消息
* @throws MqttException 异常
*/
void publish(String topic, int qos, boolean retained, String message) throws MqttException;
/**
* 发布消息一条消息
* @param topic: 主题
* @param payload: 消息
* @param qos: qos
* @param retained: false:只能拿连接后发布的消息(连接之前的信息会丢失),建议设置为true
* @throws MqttException 异常
*/
void publish(String topic, byte[] payload, int qos, boolean retained) throws MqttException;
/**
* 订阅一个主题并设置消息解读类
* @param topic: 主题
* @param messageListener: 消息解读类
* @throws MqttException 异常
* 例子:
* subscribe("FIRE/CQ-BS/13/#", new IMqttMessageListener() {
* public void messageArrived(String topic, MqttMessage message) throws Exception {
* System.out.println("接收消息主题 : " + topic);
* System.out.println("接收消息Qos : " + message.getQos());
* System.out.println("接收消息内容 : " + new String(message.getPayload(),"UTF-8"));
* }
* });
*/
void subscribe(String topic, IMqttMessageListener messageListener) throws MqttException;
/**
* 订阅多个主题, qos默认为1, 执行自己的消息解读类
* @param topics: 主题
* @param messageListener: 消息解读类
* @throws MqttException 异常
*/
void subscribe(String[] topics, IMqttMessageListener[] messageListener) throws MqttException;
/**
* 订阅某个主题, 自定义解析器
* @param topic: 主题
* @param qos: qos
* @param messageListener: IMqttMessageListener类型解析器
* @throws MqttException 异常
*/
void subscribe(String topic, int qos, IMqttMessageListener messageListener) throws MqttException;
/**
* 订阅多个主题并设置每个主题的qos和解析器
* @param topic: 主题
* @param qos: qos
* @param messageListener: IMqttMessageListener类型解析器
* @throws MqttException 异常
*/
void subscribe(String[] topic, int[] qos, IMqttMessageListener[] messageListener) throws MqttException;
/**
* 取消订阅某个主题
* @param topic: 主题
* @throws MqttException 异常
*/
void unsubscribe(String topic) throws MqttException;
/**
* 取消订阅多个个主题
* @param topic: 主题
* @throws MqttException 异常
*/
void unsubscribe(String[] topic) throws MqttException;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107