Vert.x MQTT Server

    要使用Vert.x MQTT服务器,增加以下依赖到构建描述符中

    • Maven (in your pom.xml):
    • Gradle (in your build.gradle file):
    1. compile io.vertx:vertx-mqtt-server:3.4.1

    开始

    这个例子展示了如何处理一个来自远程MQTT客户端的请求,首先,会创建一个实例和使用endpointHandler方法选定一个处理器用于处理远程客户端发送的CONNECT信息。
    实例,会被当做Handler的参数,它携带了所有主要的与CONNECT消息相关联的信息,例如客户端标识符,用户名/密码,”will”信息,清除session标志,协议版本和保活超时。
    在Handler内,endpoint实例提供accept方法以相应的CONNACK消息回应远程客户端;通过该方式,连接会被建立。最终,服务器通过方法以默认行为的行为(运行在localhost和默认MQTT端口1883)启动。存在一个相同的方法,允许选定一个Handler去检查是否服务器是否已经正常启动。

    1. MqttServer mqttServer = MqttServer.create(vertx);
    2. mqttServer.endpointHandler(endpoint -> {
    3. // 显示主要连接信息
    4. System.out.println("MQTT client [" + endpoint.clientIdentifier() + "] request to connect, clean session = " + endpoint.isCleanSession());
    5. if (endpoint.auth() != null) {
    6. System.out.println("[username = " + endpoint.auth().userName() + ", password = " + endpoint.auth().password() + "]");
    7. }
    8. if (endpoint.will() != null) {
    9. System.out.println("[will topic = " + endpoint.will().willTopic() + " msg = " + endpoint.will().willMessage() +
    10. " QoS = " + endpoint.will().willQos() + " isRetain = " + endpoint.will().isWillRetain() + "]");
    11. }
    12. System.out.println("[keep alive timeout = " + endpoint.keepAliveTimeSeconds() + "]");
    13. // 接受远程客户端连接
    14. endpoint.accept(false);
    15. .listen(ar -> {
    16. if (ar.succeeded()) {
    17. System.out.println("MQTT server is listening on port " + ar.result().actualPort());
    18. } else {
    19. System.out.println("Error on starting the server");
    20. ar.cause().printStackTrace();
    21. }
    22. });

    endpoint实例提供disconnectHandler用于选定一个handler当远程客户端发送DISCONNECT消息会被调用,该handler没有参数。

    1. endpoint.disconnectHandler(v -> {
    2. });

    使用SSL / TLS支持处理客户端连接/断开连接

    服务端支持通过SSL/TLS方式用来验证和加密客户端的连接请求。为了做到这一点,MqttServerOptions类提供了方法用来设置SSL/TLS的用法(传递true作为值)和一些其他提供了服务器验证和私钥(作为java键存储引用,PEM或PFX格式)。在以下例子,setKeyCertOptions方法被用来传递一个PEM格式的证书。该方法需要一个实现了接口的实例,在这种情况下PemKeyCertOptions类被用来提供提供服务器证书和对应与setKeyPath方法的私钥路径。MQTT服务器通常以传递一个Vert.x实例启动和以上的MQTT选项实例作为创建方法的参数。

    所有其他与处理端点连接和断开相关在没有SSL/TLS支持下使用相同方式管理。

    处理客户端订阅和退订请求

    在客户端和服务端的连接被建立后,客户端可以以指定的主题发送订阅消息。MqttEndpoint接口允许使用处理到来的订阅请求。
    这样的Handler接受一个MqttSubscribeMessage接口的实例,which brings the list of topics with related QoS levels as desired by the client.最终,端点实例提供方法以包含了授予QoS级别的相关的SUBACK消息回应客户端。

    1. endpoint.subscribeHandler(subscribe -> {
    2. List<MqttQoS> grantedQosLevels = new ArrayList<>();
    3. for (MqttTopicSubscription s: subscribe.topicSubscriptions()) {
    4. System.out.println("Subscription for " + s.topicName() + " with QoS " + s.qualityOfService());
    5. grantedQosLevels.add(s.qualityOfService());
    6. }
    7. // 确认订阅请求
    8. endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQosLevels);
    9. });
    1. endpoint.unsubscribeHandler(unsubscribe -> {
    2. for (String t: unsubscribe.topics()) {
    3. System.out.println("Unsubscription for " + t);
    4. }
    5. // 确认订阅请求
    6. endpoint.unsubscribeAcknowledge(unsubscribe.messageId());

    为了处理远程客户端发布的消息,MqttEndpoint接口提供了方法用于选定一个handler,当客户端发送一个PUBLISH消息时会调用该handler。
    这个handler接受一个MqttPublishMessage接口的实例作为参数,with the payload, the QoS level, the duplicate and retain flags.

    假如QoS级别是0(最多一次),就没有必要给客户端响应。

    假如QoS级别是1(至少一次),端点需要通过方法回应一个PUBACK消息

    假如QoS级别是2(正好一次),端点需要使用publishReceived方法回应一个PUBREC消息。在这种情况下端点应该处理来自于客户端的PUBREL消息并且(当收到来资源端点的PUBREC消息,远程客户端发送就会发送它),可以通过方法实现。为了关闭QoS级别2传递,端点可以使用publishComplete方法用来发送PUBCOMP消息到客户端。

    1. endpoint.publishHandler(message -> {
    2. System.out.println("Just received message [" + message.payload().toString(Charset.defaultCharset()) + "] with QoS [" + message.qosLevel() + "]");
    3. if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
    4. endpoint.publishAcknowledge(message.messageId());
    5. } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
    6. endpoint.publishRelease(message.messageId());
    7. }
    8. }).publishReleaseHandler(messageId -> {
    9. endpoint.publishComplete(messageId);
    10. });

    发布消息到客户端

    通过使用publish方法端点可以发布一个消息到远程客户端(发送一个PUBLISH消息),它使用以下入参,要发布的主题,负载,QoS级别,复制和保留标志。

    假如QoS级别是0(最多一次),端点不会收到任何客户端的响应。

    假如QoS级别是2(正好一次),端点需要去处理来自客户端的PUBREC消息。可以通过方法来完成该操作。
    在该Handler内,端点可以使用publishRelease方法响应PUBREL消息给客户端。最后一步是处理来自客户端的PUBCOMP消息;可以使用来指定一个handler当收到PUBCOMP消息时候调用。

    被客户端保活通知

    底层的MQTT保活机制是由服务器内部处理的。当接收到连接消息,服务器关注在该消息内的保活超时时间,用来检查客户端是否在该超市时间内没有发送任何消息。同时,对于接收到每个PINGREQ消息,服务器会以PINGRESP响应。

    即便对于高等级应用不需要处理它,接口依然提供了pingHandler方法用来选定一个handler,当收到来自客户端的PINGREQ消息会被调用。
    对于应用程序来说这只是一个通知,客户端并没有发送任何有意义的信息,只是一个用于检测保活的ping消息。在任何情况下,PINGRESP会被服务器自动发送。

    接口提供了close方法用于关闭服务器。它停止监听到来的连接和关闭所有远程客户端活跃的连接。该方法是一个异步方法并且有个重载方法提供了选定一个完成handler当服务器真正关闭时进行调用。

    1. mqttServer.closeFuture().onComplete{
    2. case Success(result) => println("Success")
    3. case Failure(cause) => println("Failure")
    4. }

    在verticles中自动清理

    假如你是在verticles中创建的MQTT服务器,当verticle取消部署这些服务器会被自动关闭。

    扩展:共享MQTT服务器

    与MQTT服务器相关的handler总是在event loop线程中执行。这意味着在一个多核系统中,仅有一个实例被部署,一个核被使用。为了使用更多的核,可以部署更多的MQTT服务器实例
    可以通过编程方式实现:

    1. for ( i <- 0 until 10) {
    2. var mqttServer = MqttServer.create(vertx)
    3. mqttServer.endpointHandler((endpoint: io.vertx.scala.mqtt.MqttEndpoint) => {
    4. // 处理端点
    5. }).listenFuture().onComplete{
    6. case Success(result) => println("Success")
    7. case Failure(cause) => println("Failure")
    8. }
    9. }
    1. var options = DeploymentOptions()
    2. .setInstances(10)
    3. vertx.deployVerticle("com.mycompany.MyVerticle", options)

    真正是这样的,仅有一个MQTT服务器被部署,但到来的连接会被Vert.x使用轮转算法分发到不同的连接handlers上,在不同的核上执行。