SpringBoot整合MQTT

      最后更新:2022-08-02 19:31:08 手机定位技术交流文章

      MQTT协议

      • MQTT(Message Queuing Telemetry Transport,消息队列遥感传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议建立在TCP/IP协议上,它于199年由IBM发行。
        MQTT最大的优点是,它提供可靠的实时消息服务以连接最小代码和有限的带宽的远程设备。
        作为一种低成本、低宽带的即时通讯协议,它在互联网、小设备、移动应用等领域有着广泛的应用。

        MQTT协议特点

      • MQTT是一个基于客户端服务器的消息发布/订阅传输协议。

      • MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。

      MQTT Client

      • publisher 和 subscriber 都属于 MQTT Client,出版商和订阅者的想法,事实上,这是一个相对的概念,这意味着当前客户端正在发布或接收消息,发布和订阅功能也可以由同一MQTT客户端实现。MQTT客户端是任何运行MQTT库的设备,通过网络连接到MQTT代理(从微控制器到成熟服务器)。例如,MQTT客户端可以是一个非常小的资源有限设备,它通过无线网络连接,并且有一个最低的图书馆。基本上,任何使用 TCP/IP 协议使用 MQTT 设备的都可以称之为 MQTT Client。MQTT协议的客户端实现非常简单和直接,易于实现是MQTT非常适合小型设备的原因之一。MQTT客户端库可以在许多编程语言中使用。例如,Android, Arduino, C, C++, C#, Go, iOS, Java, JavaScript和.NET。

      MQTT Broker

      • 与 MQTT Client 对应的就是 MQTT Broker,经纪人是任何出版/订阅协议的核心,根据实现的不同,代理人可以处理数百万MQTT客户端连接.经纪人负责接收所有信息,过滤消息,确定每个消息的客户端是哪种,并向相应的客户发送消息,经纪人也负责存储对话数据,这些数据包括订阅和错过消息。经纪人也负责客户认证和授权。

      MQTT Connection

      • MQTT 协议基于 TCP/IP。客户端和代理都需要有一个 TCP/IP 协议支持。 MQTT 连接始终位于

      准备工作(下载经纪人服务点,相关客户工具)

      • 服务端工具:
        • mosquitto https://mosquitto.org/download/
      • 客户端工具:
        • MQTTX https://mqttx.app/zh#download

      创建一个SpringBoot项目(仅在这里自己建立它)

      • 修改 thepom.xml,增加对相关 mqtt 的依赖性

        <?xml version="1.0" encoding="UTF-8"?>


        4.0.0

        org.springframework.boot
        spring-boot-starter-parent
        2.1.17.RELEASE


        com.huawen
        mqtt-demo
        0.0.1
        mqtt-demo
        Boot with MQTT Demo

        <java.version>1.8</java.version>



        org.springframework.boot
        spring-boot-starter

      • 自定义yml配置

        spring:
        application:
        name: MQTT-DEMO
        server:
        port: 8989
        #mqtt properties
        mqtt:
        #uris可以是多个,所以这是一个群
        uris:
        - tcp://127.0.0.1:1883
        clientId: mqtt_test1
        topics:
        - demo
        - test
        username: admin
        password: 123456
        timeout: 30
        keepalive: 60
        qos: 1

      • 添加配置配置读取yml文件(使用Lombok需要添加pom依赖)

        package com.huawen.mqtt.config;

        import lombok.Data;
        import org.springframework.boot.context.properties.ConfigurationProperties;
        import org.springframework.stereotype.Component;

        /**

        • @author:xjl

        • @date:2022/5/5 17:27

        • @Description: MQTT的配置类
          **/
          @Component
          @ConfigurationProperties(prefix = “mqtt”)
          @Data
          public class MqttConfiguration {

          /**

          • uris服务器地址配置
            */
            private String[] uris;

          /**

          • clientId
            */
            private String clientId;

          /**

          • 话题
            */
            private String[] topics;

          /**

          • 用户名
            */
            private String username;

          /**

          • 密码
            */
            private String password;

          /**

          • 连接超时时长
            */
            private Integer timeout;

          /**

          • 保持生命时间
            */
            private Integer keepalive;

          /**

          • 遗嘱消息 QoS
            */
            private Integer qos;
            }
      • 消费者配置

        package com.huawen.mqtt.config;

        import lombok.extern.slf4j.Slf4j;
        import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
        import org.springframework.context.annotation.Bean;
        import org.springframework.context.annotation.Configuration;
        import org.springframework.integration.annotation.ServiceActivator;
        import org.springframework.integration.channel.DirectChannel;
        import org.springframework.integration.core.MessageProducer;
        import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
        import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
        import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
        import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
        import org.springframework.integration.mqtt.support.MqttHeaders;
        import org.springframework.messaging.MessageChannel;
        import org.springframework.messaging.MessageHandler;

        import javax.annotation.Resource;

        /**

        • @author:xjl

        • @date:2022/5/6 9:06

        • @Description: MQTT 消费端的配置
          **/
          @Configuration
          @Slf4j
          public class MqttInBoundConfiguration {
          @Resource
          private MqttConfiguration mqttProperties;

          //==================================== 消费消息==========================================//

          /**

          • 入站通道
          • 返回消息通道对象 {@link MessageChannel}
            */
            @Bean(“input”)
            public MessageChannel mqttInputChannel() {
            //直连通道
            return new DirectChannel();
            }

          /**

          • 创建MqttPahoClientFactory设置MQTT经纪人的连接属性如果使用SSL认证,此设置也需要
          • @return MQTT客户端工厂 {@link MqttPahoClientFactory}
            */
            @Bean
            public MqttPahoClientFactory inClientFactory() {
            //设置连接属性
            DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
            MqttConnectOptions options = new MqttConnectOptions();
            options.setServerURIs(mqttProperties.getUris());
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setConnectionTimeout(mqttProperties.getTimeout());
            options.setKeepAliveInterval(mqttProperties.getKeepalive());
            //接收一个非线性消息,该消息告诉代理客户端是否创建一个持续的会议错误创建一个持续的会议
            options.setCleanSession(false);
            //切断后重新设置连接
            options.setAutomaticReconnect(true);
            factory.setConnectionOptions(options);
            return factory;
            }

          /**

          • 入站
          • 返回消息提供者{@link MessageProducer}
            */
            @Bean
            public MessageProducer producer() {
            // Paho客户端消息驱动通道适配器,主要用来订阅主题 对inboundTopics主题进行监听
            //clientId添加后缀或报告重试不能重复
            MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId()+“_customer”, inClientFactory(), mqttProperties.getTopics());
            adapter.setCompletionTimeout(5000);
            //Paho消息转换器
            DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
            //按字符接收消息
            // defaultPahoMessageConverter.setPayloadAsBytes(true);
            adapter.setConverter(defaultPahoMessageConverter);
            // 设置QoS
            adapter.setQos(mqttProperties.getQos());
            adapter.setOutputChannel(mqttInputChannel());
            return adapter;
            }

          /**

          • 通过通道获取数据
          • ServiceActivator注解表明:当前方法用于处理MQTT消息,inputChannel参数指定了用于消费消息的channel。
          • tips:
          • 异步处理
          • 返回消息处理 {@link MessageHandler}
            */
            @Bean
            @ServiceActivator(inputChannel = “input”)
            public MessageHandler handler() {
            return message -> {
            log.info(“收到的完整消息为—>{}”, message);
            log.info(“----------------------”);
            log.info(“message:” + message.getPayload());
            log.info(“Id:” + message.getHeaders().getId());
            log.info(“receivedQos:” + message.getHeaders().get(MqttHeaders.RECEIVED_QOS));
            String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
            log.info(“topic:” + topic);
            log.info(“----------------------”);
            };
            }
            }
      • 生产者配置

        package com.huawen.mqtt.config;

        import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
        import org.springframework.context.annotation.Bean;
        import org.springframework.context.annotation.Configuration;
        import org.springframework.integration.annotation.ServiceActivator;
        import org.springframework.integration.channel.DirectChannel;
        import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
        import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
        import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
        import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
        import org.springframework.messaging.MessageChannel;
        import org.springframework.messaging.MessageHandler;

        import javax.annotation.Resource;

        /**

        • @author:xjl

        • @date:2022/5/6 8:49

        • @Description: MQTT 生产端的配置
          **/
          @Configuration
          public class MqttOutBoundConfiguration {
          @Resource
          private MqttConfiguration mqttProperties;

          //==================================== 发送消息==========================================//

          /**

          • 出站通道
          • 返回消息通道对象 {@link MessageChannel}
            */
            @Bean(“out”)
            public MessageChannel mqttOutBoundChannel() {
            //直连通道
            return new DirectChannel();
            }

          /**

          • 创建MqttPahoClientFactory设置MQTT经纪人的连接属性如果使用SSL认证,此设置也需要
          • @return MQTT客户端工厂 {@link MqttPahoClientFactory}
            */
            @Bean
            public MqttPahoClientFactory outClientFactory() {
            DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
            String[] uris = mqttProperties.getUris();
            MqttConnectOptions options = new MqttConnectOptions();
            options.setServerURIs(uris);
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setConnectionTimeout(mqttProperties.getTimeout());
            options.setKeepAliveInterval(mqttProperties.getKeepalive());
            //接收一个非线性消息,该消息告诉代理客户端是否创建一个持续的会议错误创建一个持续的会议
            options.setCleanSession(false);
            //切断后重新设置连接
            options.setAutomaticReconnect(true);
            factory.setConnectionOptions(options);
            return factory;
            }

          /**

          • 出站
          • 返回消息处理 {@link MessageHandler}
            */
            @Bean
            @ServiceActivator(inputChannel = “out”)
            public MessageHandler mqttOutbound() {
            //发送消息和消费者消息Channel可以使用相同的MqttPahoClientFactory
            //clientId添加后缀或报告重试不能重复
            MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId() + “_producer”, outClientFactory());
            // 如果设置为 true, 即非同步, 发送时消息不会被阻塞.
            messageHandler.setAsync(true);
            //设置默认QoS
            messageHandler.setDefaultQos(mqttProperties.getQos());
            //Paho消息转换器
            DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
            //按默认字符类型发送消息
            // defaultPahoMessageConverter.setPayloadAsBytes(true);
            messageHandler.setConverter(defaultPahoMessageConverter);
            return messageHandler;
            }
            }
      • 创建发送数据的通用接口

        package com.huawen.mqtt.inter;

        import org.springframework.integration.annotation.MessagingGateway;
        import org.springframework.integration.mqtt.support.MqttHeaders;
        import org.springframework.messaging.handler.annotation.Header;

        /**

        • @author:xjl

        • @date:2022/5/6 9:20

        • @Description: 接口MqttGateway
          /
          @MessagingGateway(defaultRequestChannel = “out”)
          public interface MqttGateway {
          /

          • 定义发送消息的重载方法
          • @param加载
            */
            void sendToMqtt(String payload);

          /**

          • 向指定主题发送消息
          • @param topic topic话题
          • @param加载
            */
            void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

          /**

          • 指定发送消息的主题和qos
          • @param topic topic话题
          • @param qos qos
          • @param加载 (字符串类型)
            */
            void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);

          /**

          • 指定发送消息的主题和qos
          • @param topic topic话题
          • @param qos qos
          • @param加载 (字节数组类型)
            */
            void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
            }
      • 制造商测试控制器

        package com.huawen.mqtt.controller;

        import com.huawen.mqtt.bean.MyMessage;
        import com.huawen.mqtt.inter.MqttGateway;
        import org.springframework.web.bind.annotation.PostMapping;
        import org.springframework.web.bind.annotation.RequestBody;
        import org.springframework.web.bind.annotation.RestController;

        import javax.annotation.Resource;

        /**

        • @author:xjl

        • @date:2022/5/6 9:17

        • @Description: mqtt发布消息controller
          **/
          @RestController
          public class MqttPublishController {
          @Resource
          private MqttGateway mqttGateWay;

          @PostMapping(“/send”)
          public String send(@RequestBody MyMessage myMessage) {
          //向指定对象发送消息
          mqttGateWay.sendToMqtt(myMessage.getTopic(), 1, myMessage.getContent());
          return "send topic: " + myMessage.getTopic() + ", message : " + myMessage.getContent();
          }
          }

      来源地址: https://github.Kyrie XJL/MQTT_Demo

      先自我介绍一下,他高中毕业了13年,曾经在小公司待过,去过华为OPPO等大厂,18年进入阿里,直到现在。了解大多数年轻的Java工程师,想要升技能,经常需要找到自己的成长或向班上汇报。但对于培训机构来说,学费大约是人民币,着实压力不小。当你不在系统时,自我学习的效率很低,而且很持久。也很容易停止天花板技术。所以我为你收集了一个"java开发工具"初衷也很简单,这是一个想帮助自己学习的朋友,却不知道该从哪里学习。同时减少每个人的负担.添加下方名片,你可以得到完整的学习信息

      本文由 在线网速测试 整理编辑,转载请注明出处,原文链接:https://www.wangsu123.cn/news/31334.html

          热门文章

          文章分类