为啥使用RabbitMQ
之前项目使用MQ都是和物联网设备通讯,通讯协议都是基于业务,传输内容相对简单,现在需要对接一个系统,内容中有中文,再使用MQTT出现了中文乱码,晚上查询后发现需要修改源码,项目周期比较紧,没时间研究源码如何修改,因此决定使用RabbitMQ
整合
1、添加依赖,pom.xml文件中添加以下的内容
<!-- RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、添加配置application.properties中添加以下配置
spring.rabbitmq.host=192.168.0.20
spring.rabbitmq.port=5672
spring.rabbitmq.username=clien1
spring.rabbitmq.password=clien1
#spring.rabbitmq.host=RabbitMQ主机的IP或域名
#spring.rabbitmq.port=RabbitMQ的端口
#spring.rabbitmq.username=用户名
#spring.rabbitmq.password=密码
3.1.1、tocpic模式,新建TopicRabbitConfig类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Topic模式
* @Author: anshen
* @Date: 2023/8/11 18:04
*/
@Configuration
public class TopicRabbitConfig {
//绑定键
public static final String TOPIC_QUEUE = "topic.queue";
public static final String TOPIC_EXCHANGE = "topic.exchange";
@Bean
public Queue topicQueue() {
return new Queue(TOPIC_QUEUE, false);
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE, true, false);
}
@Bean
public Binding topicBinding() {
return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("rabbitmq.message");
}
}
3.1.2、新建消费者类 TopicReceiver
import com.alibaba.fastjson2.JSONObject;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
/**
* topic模式消费者
* @Author: anshen
* @Date: 2023/8/14 16:32
*/
@Component
@Slf4j
public class TopicReceiver {
// queues是指要监听的队列的名字
@RabbitListener(queues = TopicRabbitConfig.TOPIC_QUEUE)
public void receiveTopic(JSONObject obj) {
log.info("receiveTopic收到消息:" + obj.toString());
}
}
3.1.3、发送消息新建TopicSender类
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.config.RabbitMQTopicConfig;
import lombok.extern.slf4j.Slf4j;
/**
* topic模式生产者
* @Author: anshen
* @Date: 2023/8/14 17:10
*/
@Component
@Slf4j
public class TopicSender {
@Autowired
private AmqpTemplate amqpTemplate;
/**
* topic 模式
*/
public void sendTopic(JSONObject obj) {
log.info("sendTopic已发送消息");
// 第一个参数:TopicExchange名字
// 第二个参数:Route-Key
// 第三个参数:要发送的内容
this.amqpTemplate.convertAndSend(RabbitMQTopicConfig.TOPIC_EXCHANGE, "rabbitmq.message", obj);
}
}
3.2.1、Fanout模式,新建FanoutRabbitConfig类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Fanout模式
* @Author: anshen
* @Date: 2023/8/14 17:10
*/
@Configuration
public class FanoutRabbitConfig {
public static final String FANOUT_QUEUE = "fanout.queue";
public static final String FANOUT_EXCHANGE = "fanout.exchange";
@Bean
public Queue fanoutQueue() {
return new Queue(FANOUT_QUEUE);
}
/**
* Fanout模式 Fanout 就是我们熟悉的广播模式或者订阅模式,
* 给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
*
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE);
}
@Bean
public Binding fanoutBinding() {
return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
}
3.2.2、新建消费者类FanoutReceiver
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
/**
* @Author: anshen
* @Date: 2023/8/14 17:46
*/
@Component
@Slf4j
public class FanoutReceiver {
// queues是指要监听的队列的名字
@RabbitListener(queues = FanoutRabbitConfig.FANOUT_QUEUE)
public void receiveFanout(JSONObject obj) {
log.info("receiveFanout收到消息" + obj.toString());
}
}
3.2.3、发送消息新建
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.config.RabbitMQFanoutConfig;
import lombok.extern.slf4j.Slf4j;
/**
* fanout模式生产者
* @Author: anshen
* @Date: 2023/8/11 18:04
*/
@Component
@Slf4j
public class FanoutSender {
@Autowired
private AmqpTemplate amqpTemplate;
public void sendFanout(JSONObject obj) {
log.info("sendFanout已发送消息");
// 这里的第2个参数为空。
// 因为fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,
// 每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上
this.amqpTemplate.convertAndSend(RabbitMQFanoutConfig.FANOUT_EXCHANGE, "", obj);
}
}
注意:本文归作者所有,未经作者允许,不得转载