Springboot整合Rabbitmq
安装erlang环境和rabbitmq
注意版本对应
在安装完rabbitMq后,http://ip:15672/ ,是可以看到一个简单后台管理界面。
可以手动创建虚拟host,创建用户,分配权限,创建交换机,创建队列等等,还有查看队列消息,消费效率,推送效率等等。
springboot项目代码
pom,依赖包
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>1.5.10.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>1.5.10.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>1.5.10.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
<version>1.5.10.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>1.5.10.RELEASE</version>
</dependency>
</dependencies>
application.yml
server:
port: 8021
spring:
#给项目来个名字
application:
name: rabbitmq
#配置rabbitMq 服务器
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin
virtual-host: nskhost
两个传递信息用的po
package boot.spring.po;
import java.io.Serializable;
public class Mail implements Serializable {
private static final long serialVersionUID = -8140693840257585779L;
private String mailId;
private String country;
private Double weight;
public Mail() {
}
public Mail(String mailId, String country, double weight) {
this.mailId = mailId;
this.country = country;
this.weight = weight;
}
public String getMailId() {
return mailId;
}
public void setMailId(String mailId) {
this.mailId = mailId;
}
public String getCountry() {
return country;
}
public void setCountry(String country) {
this.country = country;
}
public double getWeight() {
return weight;
}
public void setWeight(double weight) {
this.weight = weight;
}
@Override
public String toString() {
return "Mail \[mailId=" + mailId + ", country=" + country + ", weight="
+ weight + "\]";
}
}
```java
package boot.spring.po;
public class TopicMail extends Mail {
String routingkey;
public String getRoutingkey() {
return routingkey;
}
public void setRoutingkey(String routingkey) {
this.routingkey = routingkey;
}
@Override
public String toString() {
return super.toString() + "TopicMail \[routingkey=" + routingkey + "\]";
}
}
直连交换机模式。根据消息携带的路由键将消息投递给对应队列
直连交换机配置类
直连交换机。根据消息携带的路由键将消息投递给对应队列
package boot.spring.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//direct直连模式的交换机配置,包括一个direct交换机,两个队列,三根网线binding
/**
* 直连交换机。根据消息携带的路由键将消息投递给对应队列
*/
@Configuration
public class DirectExchangeConfig {
/**
* 创建直连交换机
* @return
*/
@Bean
public DirectExchange directExchange() {
DirectExchange directExchange = new DirectExchange("direct");
return directExchange;
}
@Bean
public Queue directQueue1() {
Queue queue = new Queue("directqueue1");
return queue;
}
@Bean
public Queue directQueue2() {
Queue queue = new Queue("directqueue2");
return queue;
}
//3个binding将交换机和相应队列连起来
@Bean
public Binding bindingorange() {
//绑定队列1到交换机,绑定键为orange
Binding binding = BindingBuilder.bind(directQueue1()).to(directExchange()).with("orange");
return binding;
}
@Bean
public Binding bindingblack() {
Binding binding = BindingBuilder.bind(directQueue2()).to(directExchange()).with("black");
return binding;
}
@Bean
public Binding bindinggreen() {
Binding binding = BindingBuilder.bind(directQueue2()).to(directExchange()).with("green");
return binding;
}
}
直连交换机请求方法
@RequestMapping(value = "/direct", produces = {"application/json;charset=UTF-8"})
@ResponseBody
public void direct(@ModelAttribute("mail")TopicMail mail){
System.out.println(mail);
Mail m=new Mail(mail.getMailId(),mail.getCountry(),mail.getWeight());
publisher.senddirectMail(m,mail.getRoutingkey());
}
直连交换机发布服务方法
@Autowired
RabbitTemplate rabbitTemplate;
public void senddirectMail(Mail mail,String routingkey){
rabbitTemplate.convertAndSend("direct",routingkey,mail);
}
直连交换机消费方法
@RabbitListener(queues = "directqueue1")
public void displayMail1(Mail mail)throws Exception{
System.out.println("directqueue1队列监听器1号收到消息"+mail.toString());
}
@RabbitListener(queues = "directqueue2")
public void displayMail2(Mail mail)throws Exception{
System.out.println("directqueue2队列监听器2号收到消息"+mail.toString());
}
生产者消费者模式
只有消费者指定这个队列才能获取到,不需要key,订阅者按顺序循环获取消息
配置类
package boot.spring.config;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//生产者消费者模式的配置,包括一个队列和两个对应的消费者
/**
* 只有指定这个队列才能获取到,不需要key,订阅者按顺序循环获取消息
*/
@Configuration
public class ProducerConsumerConfig {
@Bean
public Queue myQueue() {
Queue queue = new Queue("myqueue");
return queue;
}
}
请求方法
@Autowired
ProducerImpl producer;
@RequestMapping(value = "/produce", produces = {"application/json;charset=UTF-8"})
@ResponseBody
public void produce(@ModelAttribute("mail")Mail mail)throws Exception{
System.out.println(mail);
producer.sendMail("myqueue",mail);
}
发布服务方法
@Autowired
RabbitTemplate rabbitTemplate;
public void sendMail(String queue,Mail mail){
rabbitTemplate.setQueue(queue);
rabbitTemplate.convertAndSend(queue,mail);
}
消费方法
@RabbitListener(queues = "myqueue")
public void displayMail(Mail mail)throws Exception{
System.out.println("队列监听器1号收到消息"+mail.toString());
}
@RabbitListener(queues = "myqueue")
public void displayMail2(Mail mail)throws Exception{
System.out.println("队列监听器3号收到消息"+mail.toString());
}
@RabbitListener(queues = "myqueue")
public void displayMail(Mail mail)throws Exception{
System.out.println("队列监听器2号收到消息"+mail.toString());
}
发布订阅模式
发布订阅模式的配置,包括两个队列和对应的订阅者,发布者的交换机类型使用fanout(子网广播),两根网线binding用来绑定队列到交换机
配置类
package boot.spring.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//发布订阅模式的配置,包括两个队列和对应的订阅者,
// 发布者的交换机类型使用fanout(子网广播),两根网线binding用来绑定队列到交换机
/**
* 广播交换机。所有队列都可获取消息
*/
@Configuration
public class PublishSubscribeConfig {
@Bean
public Queue myQueue1() {
Queue queue = new Queue("queue1");
return queue;
}
@Bean
public Queue myQueue2() {
Queue queue = new Queue("queue2");
return queue;
}
@Bean
public FanoutExchange fanoutExchange() {
FanoutExchange fanoutExchange = new FanoutExchange("fanout");
return fanoutExchange;
}
@Bean
public Binding binding1() {
Binding binding = BindingBuilder.bind(myQueue1()).to(fanoutExchange());
return binding;
}
@Bean
public Binding binding2() {
Binding binding = BindingBuilder.bind(myQueue2()).to(fanoutExchange());
return binding;
}
}
请求方法
@Autowired
PublishImpl publisher;
@RequestMapping(value = "/topic", produces = {"application/json;charset=UTF-8"})
@ResponseBody
public void topic(@ModelAttribute("mail")Mail mail)throws Exception{
System.out.println(mail);
publisher.publishMail(mail);
}
发布服务方法
@Autowired
RabbitTemplate rabbitTemplate;
public void publishMail(Mail mail){
rabbitTemplate.convertAndSend("fanout","",mail);
}
消费方法
@RabbitListener(queues = "queue1")
public void subscribe(Mail mail)throws IOException{
System.out.println("订阅者1收到消息"+mail.toString());
}
@RabbitListener(queues = "queue2")
public void subscribe(Mail mail)throws IOException{
System.out.println("订阅者2收到消息"+mail.toString());
}
topic交换机模型
主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。 简单地介绍下规则: * (星号) 用来表示一个单词 (必须出现的) # (井号) 用来表示任意数量(零个或多个)单词 通配的绑定键是跟队列进行绑定的,举个小例子 队列Q1 绑定键为 .TT. 队列Q2绑定键为 TT.# 如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到; 如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到; 主题交换机是非常强大的,为啥这么膨胀? 当一个队列的绑定键为 “#”(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。 当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。 所以主题交换机也就实现了扇形交换机的功能,和直连交换机的功能。
配置类
package boot.spring.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//topic交换机模型,需要一个topic交换机,两个队列和三个binding
/**
* 主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。
* 简单地介绍下规则:
*
* * (星号) 用来表示一个单词 (必须出现的)
* # (井号) 用来表示任意数量(零个或多个)单词
* 通配的绑定键是跟队列进行绑定的,举个小例子
* 队列Q1 绑定键为 *.TT.* 队列Q2绑定键为 TT.#
* 如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到;
* 如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到;
*
* 主题交换机是非常强大的,为啥这么膨胀?
* 当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。
* 当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
* 所以主题交换机也就实现了扇形交换机的功能,和直连交换机的功能。
*/
@Configuration
public class TopicExchangeConfig {
@Bean
public TopicExchange topicExchange() {
TopicExchange topicExchange = new TopicExchange("mytopic");
return topicExchange;
}
@Bean
public Queue topicQueue1() {
Queue queue = new Queue("topicqueue1");
return queue;
}
@Bean
public Queue topicQueue2() {
Queue queue = new Queue("topicqueue2");
return queue;
}
//3个binding将交换机和相应队列连起来
@Bean
public Binding bindingtopic1() {
Binding binding = BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("*.orange.*");//binding key
return binding;
}
@Bean
public Binding bindingtopic2() {
Binding binding = BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("*.*.rabbit");
return binding;
}
@Bean
public Binding bindingtopic3() {
Binding binding = BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("lazy.#");//#表示0个或若干个关键字,*表示一个关键字
return binding;
}
}
请求方法
@Autowired
PublishImpl publisher;
@RequestMapping(value = "/mytopic", produces = {"application/json;charset=UTF-8"})
@ResponseBody
public void topic(@ModelAttribute("mail")TopicMail mail){
System.out.println(mail);
Mail m=new Mail(mail.getMailId(),mail.getCountry(),mail.getWeight());
publisher.sendtopicMail(m,mail.getRoutingkey());
}
服务方法
@Autowired
RabbitTemplate rabbitTemplate;
public void sendtopicMail(Mail mail,String routingkey){
rabbitTemplate.convertAndSend("mytopic",routingkey,mail);
}
消费方法
@RabbitListener(queues = "topicqueue1")
public void displayTopic(Mail mail)throws IOException{
System.out.println("从topicqueue1取出消息"+mail.toString());
}
@RabbitListener(queues = "topicqueue2")
public void displayTopic(Mail mail)throws IOException{
System.out.println("从topicqueue2取出消息"+mail.toString());
}