just do it

emqtt使用心得

《一》、http鉴权认证

启动http认证插件以后emqtt默认的认证模块全部失效。
设置配置emqttd_auth_http.conf文件,关于http关于鉴权机制有两个类型的认证,分别为:super_req,auth_req。
请求建立连接时emqtt服务器会默认先发送sup_req,若sup_req被拒绝那么服务器会再次发送auth_req。

《二》、http访问规则acl

同样配置emqttd_auth_http.conf文件中的acl_req。acl_req只会发送一个请求,也就是第一次acl_req的请求被拒绝之后,emqtt服务器不会再进行其他acl的匹配。
如果emqttd_auth_http.conf中配置了super_req,那么每次进行acl认证的时候都会先向鉴权服务发送一个super_req的认证请求。

消费者:同一个client每次订阅一个topic都会发送一个acl请求。若消费者保持了session,并且订阅的Atopic,那么下次此消费者再次连接的时候默认订阅了Atopic而不用进行sub Atopic的请求。
生产者:在连接即(connection)未断开的情况下,生产者向同一个topic发送多次消息,只会进行一次acl的请求;生产者分别向多个topic发送多个消息时,有多少个topic就发送多少个acl请求。

如果鉴权时super的请求通过了,那么将不会进行acl的认证,emqtt服务器默认super用户可以任意pub和sub

《三》、emqtt的会话保持与保留消息设置

会话保持:设置setCleanSession(false);

组合一:消费者和生产者都未会话保持。
情景1:生产者先发送消息到emqtt,消费再上线订阅主题,消费者未接收到消息。
情景2:消费者先上线订阅主题,生产者再向主题发布消息,消费者接受到消息。

组合二:消费者未会话保持,生产者会话保持。
情景1:生产者先发送消息到emqtt,消费再上线订阅主题,消费者未接收到消息。
情景2:消费者先上线订阅主题,生产者再向主题发布消息,消费者接受到消息。

组合三:消费者会话保持,生产者未会话保持。
情景1:生产者先发送消息到emqtt,消费再上线订阅主题,消费者接收到消息。(消费者已经会话保持并且订阅了一次主题后。)
情景2:消费者先上线订阅主题,生产者再向主题发布消息,消费者接受到消息。

组合四:消费者会话保持,生产者会话保持。
情景1:生产者先发送消息到emqtt,消费再上线订阅主题,消费者接收到消息。
情景2:消费者先上线订阅主题,生产者再向主题发布消息,消费者接受到消息。
情景3:消费者未上线,生产者向主题发布消息,emqtt服务器宕机。消费者未接受到消息。且此时之前已经持久化的客户端、topic、session、sub全部丢失。

【结论】 设置setCleanSession(false),仅当emqtt服务器未宕机时,进行客户端、topic、session、sub的保持,且此会话保持并不进行消息的持久化。

设置发送保留消息,即再publish的时候设置retain参数为true。
组合一:消费者和生产者都未会话保持。
情景1:生产者先发送消息到emqtt,消费再上线订阅主题,消费者接收到消息。
情景2:消费者先上线订阅主题,生产者再向主题发布消息,消费者接受到消息。
情景3: 消费者还未上线订阅主题,生产者发送完消息后服务器宕机,消费者接受到消息。
情景4: 同一clientid的消费者反复下线再上线。每一次上线此消费者接受到消息。

组合二:消费者未持久化,生产者会话保持。
情景1:生产者先发送消息到emqtt,消费再上线订阅主题,消费者接收到消息。
情景2:消费者先上线订阅主题,生产者再向主题发布消息,消费者接受到消息。
情景3: 消费者还未上线订阅主题,生产者发送完消息后服务器宕机,消费者接受到消息。
情景4: 同一clientid的消费者反复下线再上线。每一次上线此消费者接受到消息。

组合三:消费者持久化,生产者未会话保持。
情景1:生产者先发送消息到emqtt,消费再上线订阅主题,消费者接收到消息。
情景2:消费者先上线订阅主题,生产者再向主题发布消息,消费者接受到消息。
情景3: 消费者还未上线订阅主题,生产者发送完消息后服务器宕机,消费者接受到消息。
情景4: 同一clientid的消费者反复下线再上线。此消费者只会接受到一次消息。

组合四:消费者会话保持,生产者会话保持。
情景1:生产者先发送消息到emqtt,消费再上线订阅主题,消费者接收到消息。
情景2:消费者先上线订阅主题,生产者再向主题发布消息,消费者接受到消息。
情景3: 消费者还未上线订阅主题,生产者发送完消息后服务器宕机,消费者接受到消息。
情景4: 同一clientid的消费者反复下线再上线。此消费者只会接受到一次消息。

测试在向一个主题发送了一个retain消息后,再发送一条不同的retain消息,那么消费者将接收到最后发送的那条retain消息。
【结论】 保留消息可以进行消息持久化,与客户端、topic、session、sub是否保持无关,且服务器宕机之后此消息依然存在。
【重要】1、一个topic中只能同时存在一条retain消息,即后发送的retain消息会覆盖之前的retain消息。
2、若消费者没有进行会话保持操作,那么每次此消费者重新上线都会接受到该retain消息,可以认
为未进行会话保持的消费者每次上线都是一个新的消费者尽管此消费者还保持着同一个clientid。
若消费者已经进行了会话保持,那么此消费者只会在没有接受到retain消息时才进行消息的接受,也就是说,
会话保持的消费者只接受一次retain消息,无论此消费者反复上线多少次,除非生产者又发布了一条retain消息。
3、清除retain消息,向此topic中发送一条为空的retain消息就能将之前的retain消息清除掉,此处注意空消息不是null,而是””。

《四》、订阅消息主题

订阅消息时可以用+、#通配符进行订阅主题匹配如:
‘+’: 表示通配一个层级,例如a/+,匹配a/x, a/y

‘#’: 表示通配多个层级,例如a/#,匹配a/x, a/b/c/d
订阅者订阅的主题为a/+或者a/#订阅者的acl请求只会发一次。

《五》、系统主题的订阅

由于启动了http认证插件,因此我们可以通过web服务来控制具体哪个client可以订阅系统主题。

一、系统主题clients上线下线状态
上线主题:$SYS/brokers/${node}(注:emqtt服务的节点名称)/clients/${clientid}(注:需要监测状态的客户端唯一标识)/connected
消息为json格式:
{“clientid”:”\u7cfb\u7edf\u4e3b\u9898\u8ba2\u9605\u8005″,”username”:”\u7cfb\u7edf\u4e3b\u9898\u8ba2\u9605\u8005″,”ipaddress”:”192.168.2.117″,”session”:true,”protocol”:3,”connack”:0,”ts”:1476777074}
参数说明clientid 当前连接的客户端id,username 当前用户名,ipadddress 客户端地址,session 布尔值,true标识会话保持,false为会话不保持, protocol mqtt协议版本,connack ?,ts 建立连接的时间戳。
下线主题:$SYS/brokers/${node}(注:emqtt服务的节点名称)/clients/${clientid}(注:需要监测状态的客户端唯一标识)/disconnected
消息为json格式例如:{“clientid”:”\u7cfb\u7edf\u4e3b\u9898\u8ba2\u9605\u8005″,”reason”:”closed”,”ts”:1476775914}
【注意】:当客户端连接时正常断开时(发送disconnect命令)reason的结果值是normal;当客户端异常断开连接时reason的结果是colsed。

《六》、消费者接受消息的顺序

1、消费者只订阅一个绝对的topic而不使用通配符进行订阅,遵循消息先进先出原则。
2、消费者通过通配符进行消息的订阅,依然遵循先进先出原则。例如消费者订阅了a/#的topic,生产者发送消息的方式如下代码块:

for(int i=0;i<1000;i++){
//发布消息
callbackConnection.publish("alarm/ipc"+i,
("我是ipc"+i).getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() {
public void onSuccess(Void v) {
System.out.println("===========消息发布成功============");
}
public void onFailure(Throwable value) {
System.out.println("========消息发布失败=======");
callbackConnection.disconnect(null);
}
});
}
for(int i=0;i<10;i++){
//发布消息
callbackConnection.publish("alarm/ipc"+i,
("我是ipc"+i).getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() {
public void onSuccess(Void v) {
System.out.println("===========消息发布成功============");
}
public void onFailure(Throwable value) {
System.out.println("========消息发布失败=======");
callbackConnection.disconnect(null);
}
});
}

那么消费者接受消息的顺序是“我是ipc0”……“我是ipc999”、“我是ipc0”……“我是ipc9”。

《七》、emqttd 服务器命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 守护进程启动emqttd
./bin/emqttd start
 
# 调试启动emqttd
./bin/emqttd console
 
# 检查运行状态
./bin/emqttd_ctl status
 
# 停止emqttd
./bin/emqttd stop
 
#创建admin账户:
./bin/emqttd_ctl admins add admin public
 
#重置admin账户密码:
./bin/emqttd_ctl admins passwd admin public123
 
#删除admin账户:
./bin/emqttd_ctl admins del admin

rpm安装

1
2
3
4
5
6
7
8
9
10
# 安装
rpm -ivh emqttd-centos7-v2.1.2-1.el7.centos.x86_64.rpm
# 启动
service emqttd start
#或
emqttd start
# 停止
service emqttd stop
#或
emqttd stop

《八》、mqtt消息QoS

MQTT发布消息QoS保证不是端到端的,是客户端与服务器之间的。订阅者收到MQTT消息的QoS级别,最终取决于发布消息的QoS和主题订阅的QoS。

发布消息的QoS 主题订阅的QoS 接收消息的QoS
0 0 0
0 1 0
0 2 0
1 0 0
1 1 1
1 2 1
2 0 0
2 1 1
2 2 2
点赞