Springboot整合Rabbitmq

安装erlang环境和rabbitmq

注意版本对应

官方网址https://www.rabbitmq.com/which-erlang.html

在安装完rabbitMq后,http://ip:15672/ ,是可以看到一个简单后台管理界面。

可以手动创建虚拟host,创建用户,分配权限,创建交换机,创建队列等等,还有查看队列消息,消费效率,推送效率等等。

springboot项目代码

pom,依赖包

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <version>1.5.10.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <version>1.5.10.RELEASE</version>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>1.5.10.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-thymeleaf</artifactId>
        <version>1.5.10.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>1.5.10.RELEASE</version>
    </dependency>
</dependencies>

application.yml

server:
  port: 8021
spring:
  #给项目来个名字
  application:
    name: rabbitmq
  #配置rabbitMq 服务器
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: admin
    virtual-host: nskhost

两个传递信息用的po

package boot.spring.po;

import java.io.Serializable;

public class Mail implements Serializable {

    private static final long serialVersionUID = -8140693840257585779L;
    private String mailId;
    private String country;
    private Double weight;


    public Mail() {
    }

    public Mail(String mailId, String country, double weight) {
        this.mailId = mailId;
        this.country = country;
        this.weight = weight;
    }

    public String getMailId() {
        return mailId;
    }

    public void setMailId(String mailId) {
        this.mailId = mailId;
    }

    public String getCountry() {
        return country;
    }

    public void setCountry(String country) {
        this.country = country;
    }

    public double getWeight() {
        return weight;
    }

    public void setWeight(double weight) {
        this.weight = weight;
    }

    @Override
    public String toString() {
        return "Mail \[mailId=" + mailId + ", country=" + country + ", weight="
                + weight + "\]";
    }

}
```java
package boot.spring.po;

public class TopicMail extends Mail {
    String routingkey;

    public String getRoutingkey() {
        return routingkey;
    }

    public void setRoutingkey(String routingkey) {
        this.routingkey = routingkey;
    }

    @Override
    public String toString() {
        return super.toString() + "TopicMail \[routingkey=" + routingkey + "\]";
    }

}

直连交换机模式。根据消息携带的路由键将消息投递给对应队列

直连交换机配置类

直连交换机。根据消息携带的路由键将消息投递给对应队列

package boot.spring.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

//direct直连模式的交换机配置,包括一个direct交换机,两个队列,三根网线binding

/**
 * 直连交换机。根据消息携带的路由键将消息投递给对应队列
 */
@Configuration
public class DirectExchangeConfig {
    /**
     * 创建直连交换机
     * @return
     */
    @Bean
    public DirectExchange directExchange() {
        DirectExchange directExchange = new DirectExchange("direct");
        return directExchange;
    }

    @Bean
    public Queue directQueue1() {
        Queue queue = new Queue("directqueue1");
        return queue;
    }

    @Bean
    public Queue directQueue2() {
        Queue queue = new Queue("directqueue2");
        return queue;
    }

    //3个binding将交换机和相应队列连起来
    @Bean
    public Binding bindingorange() {
        //绑定队列1到交换机,绑定键为orange
        Binding binding = BindingBuilder.bind(directQueue1()).to(directExchange()).with("orange");
        return binding;
    }

    @Bean
    public Binding bindingblack() {
        Binding binding = BindingBuilder.bind(directQueue2()).to(directExchange()).with("black");
        return binding;
    }

    @Bean
    public Binding bindinggreen() {
        Binding binding = BindingBuilder.bind(directQueue2()).to(directExchange()).with("green");
        return binding;
    }
}

直连交换机请求方法

@RequestMapping(value = "/direct", produces = {"application/json;charset=UTF-8"})
@ResponseBody
public void direct(@ModelAttribute("mail")TopicMail mail){
        System.out.println(mail);
        Mail m=new Mail(mail.getMailId(),mail.getCountry(),mail.getWeight());
        publisher.senddirectMail(m,mail.getRoutingkey());
}

直连交换机发布服务方法

@Autowired
RabbitTemplate rabbitTemplate;
public void senddirectMail(Mail mail,String routingkey){
        rabbitTemplate.convertAndSend("direct",routingkey,mail);
}

直连交换机消费方法

@RabbitListener(queues = "directqueue1")
public void displayMail1(Mail mail)throws Exception{
        System.out.println("directqueue1队列监听器1号收到消息"+mail.toString());
}
@RabbitListener(queues = "directqueue2")
public void displayMail2(Mail mail)throws Exception{
        System.out.println("directqueue2队列监听器2号收到消息"+mail.toString());
}

生产者消费者模式

只有消费者指定这个队列才能获取到,不需要key,订阅者按顺序循环获取消息

配置类

package boot.spring.config;

import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

//生产者消费者模式的配置,包括一个队列和两个对应的消费者

/**
 * 只有指定这个队列才能获取到,不需要key,订阅者按顺序循环获取消息
 */
@Configuration
public class ProducerConsumerConfig {

    @Bean
    public Queue myQueue() {
        Queue queue = new Queue("myqueue");
        return queue;
    }

}

请求方法

@Autowired
ProducerImpl producer;
@RequestMapping(value = "/produce", produces = {"application/json;charset=UTF-8"})
@ResponseBody
public void produce(@ModelAttribute("mail")Mail mail)throws Exception{
        System.out.println(mail);
        producer.sendMail("myqueue",mail);
}

发布服务方法

@Autowired
RabbitTemplate rabbitTemplate;
public void sendMail(String queue,Mail mail){
        rabbitTemplate.setQueue(queue);
        rabbitTemplate.convertAndSend(queue,mail);
}

消费方法

@RabbitListener(queues = "myqueue")
public void displayMail(Mail mail)throws Exception{
        System.out.println("队列监听器1号收到消息"+mail.toString());
}

@RabbitListener(queues = "myqueue")
public void displayMail2(Mail mail)throws Exception{
        System.out.println("队列监听器3号收到消息"+mail.toString());
}
@RabbitListener(queues = "myqueue")
public void displayMail(Mail mail)throws Exception{
        System.out.println("队列监听器2号收到消息"+mail.toString());
}

发布订阅模式

发布订阅模式的配置,包括两个队列和对应的订阅者,发布者的交换机类型使用fanout(子网广播),两根网线binding用来绑定队列到交换机

配置类

package boot.spring.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


//发布订阅模式的配置,包括两个队列和对应的订阅者,
// 发布者的交换机类型使用fanout(子网广播),两根网线binding用来绑定队列到交换机

/**
 * 广播交换机。所有队列都可获取消息
 */
@Configuration
public class PublishSubscribeConfig {

    @Bean
    public Queue myQueue1() {
        Queue queue = new Queue("queue1");
        return queue;
    }

    @Bean
    public Queue myQueue2() {
        Queue queue = new Queue("queue2");
        return queue;
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        FanoutExchange fanoutExchange = new FanoutExchange("fanout");
        return fanoutExchange;
    }

    @Bean
    public Binding binding1() {
        Binding binding = BindingBuilder.bind(myQueue1()).to(fanoutExchange());
        return binding;
    }

    @Bean
    public Binding binding2() {
        Binding binding = BindingBuilder.bind(myQueue2()).to(fanoutExchange());
        return binding;
    }

}

请求方法

@Autowired
PublishImpl publisher;
@RequestMapping(value = "/topic", produces = {"application/json;charset=UTF-8"})
@ResponseBody
public void topic(@ModelAttribute("mail")Mail mail)throws Exception{
        System.out.println(mail);
        publisher.publishMail(mail);
}

发布服务方法

@Autowired
RabbitTemplate rabbitTemplate;
public void publishMail(Mail mail){
        rabbitTemplate.convertAndSend("fanout","",mail);
}

消费方法

@RabbitListener(queues = "queue1")
public void subscribe(Mail mail)throws IOException{
        System.out.println("订阅者1收到消息"+mail.toString());
}
@RabbitListener(queues = "queue2")
public void subscribe(Mail mail)throws IOException{
        System.out.println("订阅者2收到消息"+mail.toString());
}

topic交换机模型

主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。  简单地介绍下规则:  * (星号) 用来表示一个单词 (必须出现的)  # (井号) 用来表示任意数量(零个或多个)单词  通配的绑定键是跟队列进行绑定的,举个小例子  队列Q1 绑定键为 .TT.          队列Q2绑定键为 TT.#  如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到;  如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到;  主题交换机是非常强大的,为啥这么膨胀?  当一个队列的绑定键为 “#”(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。  当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。  所以主题交换机也就实现了扇形交换机的功能,和直连交换机的功能。

配置类

package boot.spring.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//topic交换机模型,需要一个topic交换机,两个队列和三个binding

/**
 * 主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。
 * 简单地介绍下规则:
 *
 * *  (星号) 用来表示一个单词 (必须出现的)
 * #  (井号) 用来表示任意数量(零个或多个)单词
 * 通配的绑定键是跟队列进行绑定的,举个小例子
 * 队列Q1 绑定键为 *.TT.*          队列Q2绑定键为  TT.#
 * 如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到;
 * 如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到;
 *
 * 主题交换机是非常强大的,为啥这么膨胀?
 * 当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。
 * 当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
 * 所以主题交换机也就实现了扇形交换机的功能,和直连交换机的功能。
 */
@Configuration
public class TopicExchangeConfig {
    @Bean
    public TopicExchange topicExchange() {
        TopicExchange topicExchange = new TopicExchange("mytopic");
        return topicExchange;
    }

    @Bean
    public Queue topicQueue1() {
        Queue queue = new Queue("topicqueue1");
        return queue;
    }

    @Bean
    public Queue topicQueue2() {
        Queue queue = new Queue("topicqueue2");
        return queue;
    }

    //3个binding将交换机和相应队列连起来
    @Bean
    public Binding bindingtopic1() {
        Binding binding = BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("*.orange.*");//binding key
        return binding;
    }

    @Bean
    public Binding bindingtopic2() {
        Binding binding = BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("*.*.rabbit");
        return binding;
    }

    @Bean
    public Binding bindingtopic3() {
        Binding binding = BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("lazy.#");//#表示0个或若干个关键字,*表示一个关键字
        return binding;
    }
}

请求方法

@Autowired
PublishImpl publisher;
@RequestMapping(value = "/mytopic", produces = {"application/json;charset=UTF-8"})
@ResponseBody
public void topic(@ModelAttribute("mail")TopicMail mail){
        System.out.println(mail);
        Mail m=new Mail(mail.getMailId(),mail.getCountry(),mail.getWeight());
        publisher.sendtopicMail(m,mail.getRoutingkey());
}

服务方法

@Autowired
RabbitTemplate rabbitTemplate;
public void sendtopicMail(Mail mail,String routingkey){
        rabbitTemplate.convertAndSend("mytopic",routingkey,mail);
 }

消费方法

@RabbitListener(queues = "topicqueue1")
public void displayTopic(Mail mail)throws IOException{
        System.out.println("从topicqueue1取出消息"+mail.toString());
 }
@RabbitListener(queues = "topicqueue2")
public void displayTopic(Mail mail)throws IOException{
        System.out.println("从topicqueue2取出消息"+mail.toString());
}

另外还有 Header Exchange 头交换机 ,Default Exchange 默认交换机,Dead Letter Exchange 死信交换机