以下各工程的 pom 文件内容都是相同的:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Spring Boot RabbitMQ (AMQP) sample project. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.bjsxt</groupId>
    <artifactId>spring-boot-direct-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-direct-consumer</name>
    <description>spring-boot-direct-consumer</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <!-- RabbitMQ / AMQP messaging support -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- Test-only dependency: the test scope keeps the testing libraries
             off the runtime classpath and out of the packaged application. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Direct 交换器(发布与订阅,完全匹配)
需求
# HTTP port of this application
server.port=8081
# RabbitMQ broker connection settings
spring.rabbitmq.host=192.168.181.133
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=111111
# Name of the exchange
mq.config.exchange=log.direct
# Name of the info queue
mq.config.queue.info=log.info
# Routing key of the info queue
mq.config.queue.info.routing.key=log.info.routing.key
# Name of the error queue
mq.config.queue.error=log.error
# Routing key of the error queue
mq.config.queue.error.routing.key=log.error.routing.key
# HTTP port of this application
server.port=8080
# RabbitMQ broker connection settings
spring.rabbitmq.host=192.168.181.133
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=111111
# Name of the exchange
mq.config.exchange=log.direct
# Routing key of the info queue
mq.config.queue.info.routing.key=log.info.routing.key
# Name of the error queue
mq.config.queue.error=log.error
# Routing key of the error queue
mq.config.queue.error.routing.key=log.error.routing.key
package com.bjsxt.receive;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
/**
 * Message consumer for the info queue.
 *
 * <p>Listens on the auto-delete queue named by {@code mq.config.queue.info},
 * bound to the direct exchange named by {@code mq.config.exchange} with the
 * routing key taken from {@code mq.config.queue.info.routing.key}.
 */
@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "${mq.config.queue.info}", autoDelete = "true"),
        exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.DIRECT),
        key = "${mq.config.queue.info.routing.key}"))
public class InfoReceiver {

    /**
     * Handles a message delivered by the queue listener by printing it to
     * standard output.
     *
     * @param msg the received message payload
     */
    @RabbitHandler
    public void process(String msg) {
        final String line = "info--receiver=:" + msg;
        System.out.println(line);
    }
}
package com.bjsxt.receive;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
/***
* 消息的接收者
*/
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.error}",autoDelete = "true"),
exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.DIRECT),
key="${mq.config.queue.error.routing.key}"
)
)
public class ErrorReceiver {
/**
* 接收消息的方法
* 采用消息队列监听机制
* @param msg
*/
@RabbitHandler
public void process(String msg){
System.out.println("Error--receiver=:"+msg);
}
}
Sender
package com.bjsxt.send;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Message publisher: sends text messages to the configured exchange using the
 * configured error routing key.
 */
@Component
public class Sender {

    /** Routing key, read from {@code mq.config.queue.error.routing.key}. */
    @Value("${mq.config.queue.error.routing.key}")
    private String routingKey;

    /** Target exchange name, read from {@code mq.config.exchange}. */
    @Value("${mq.config.exchange}")
    private String exchange;

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Publishes a single message.
     *
     * @param msg the message payload to send
     */
    public void sendMsg(String msg) {
        // Arguments: exchange name, routing key, message payload.
        this.amqpTemplate.convertAndSend(this.exchange, this.routingKey, msg);
    }
}
package com.bjsxt.test;
import com.bjsxt.SpringBootDirectProviderApplication;
import com.bjsxt.send.Sender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
 * Spring Boot test that drives {@link Sender} to publish messages continuously.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = SpringBootDirectProviderApplication.class)
public class RabbitMQTest {

    /** Pause before each send, in milliseconds. */
    private static final long SEND_INTERVAL_MILLIS = 1000L;

    @Autowired
    private Sender sender;

    /**
     * Sends one greeting message per interval, forever.
     *
     * <p>NOTE(review): the loop has no exit condition, so this test only ends
     * when the process is stopped or the thread is interrupted. It looks like a
     * manual demo driver rather than an automated test; confirm before running
     * it as part of a build.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @Test
    public void queueTest() throws InterruptedException {
        for (;;) {
            Thread.sleep(SEND_INTERVAL_MILLIS);
            sender.sendMsg("你好RabbitMQ");
        }
    }
}
# HTTP port of this application
server.port=8083
# RabbitMQ broker connection settings
spring.rabbitmq.host=192.168.181.133
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=111111
# Name of the exchange
mq.config.exchange=log.topic
# Name of the info queue
mq.config.queue.info=log.info
# Name of the error queue
mq.config.queue.error=log.error
# Name of the all-logs queue
mq.config.queue.logs=log.all
# HTTP port of this application
server.port=8084
# RabbitMQ broker connection settings
spring.rabbitmq.host=192.168.181.133
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=111111
# Name of the exchange
mq.config.exchange=log.topic
package com.bjsxt.send;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Message publisher for user-service log messages: sends one message per log
 * level to the configured exchange.
 */
@Component
public class UserSender {

    /** Log levels published by {@link #sendMsg(String)}, in send order. */
    private static final String[] LEVELS = {"debug", "info", "warn", "error"};

    @Autowired
    private AmqpTemplate amqpTemplate;

    /** Target exchange name, read from {@code mq.config.exchange}. */
    @Value("${mq.config.exchange}")
    private String exchange;

    /**
     * Publishes {@code msg} once per log level, using the routing key
     * {@code user.log.<level>} and a payload prefixed with that same key.
     *
     * @param msg the text appended to each published payload
     */
    public void sendMsg(String msg) {
        for (String level : LEVELS) {
            // Build the key once and reuse it for the payload prefix so the two
            // cannot drift apart (the debug message was previously sent with
            // the mistyped routing key "user.log.bug").
            String routingKey = "user.log." + level;
            // Arguments: exchange name, routing key, message payload.
            amqpTemplate.convertAndSend(exchange, routingKey, routingKey + "....." + msg);
        }
    }
}
package com.bjsxt.receive;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
/**
 * Message consumer for info-level log messages.
 *
 * <p>Listens on the auto-delete queue named by {@code mq.config.queue.info},
 * bound to the topic exchange named by {@code mq.config.exchange} with the
 * binding pattern {@code *.log.info}.
 */
@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "${mq.config.queue.info}", autoDelete = "true"),
        exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
        key = "*.log.info"))
public class InfoReceiver {

    /**
     * Handles a message delivered by the queue listener by printing it to
     * standard output.
     *
     * @param msg the received message payload
     */
    @RabbitHandler
    public void process(String msg) {
        final String line = "info--receiver=:" + msg;
        System.out.println(line);
    }
}
package com.bjsxt.receive;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
/**
 * Message consumer for log messages of every level.
 *
 * <p>Listens on the auto-delete queue named by {@code mq.config.queue.logs},
 * bound to the topic exchange named by {@code mq.config.exchange} with the
 * binding pattern {@code *.log.*}.
 */
@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "${mq.config.queue.logs}", autoDelete = "true"),
        exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
        key = "*.log.*"))
public class LogReceiver {

    /**
     * Handles a message delivered by the queue listener by printing it to
     * standard output.
     *
     * @param msg the received message payload
     */
    @RabbitHandler
    public void process(String msg) {
        final String line = "log--receiver=:" + msg;
        System.out.println(line);
    }
}
# Application name
spring.application.name=springcloud-mq
# RabbitMQ broker connection settings
spring.rabbitmq.host=192.168.181.133
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=111111
# Name of the exchange
mq.config.exchange=order.fanout
# Queue name for the SMS service
mq.config.queue.sms=order.sms
# Queue name for the push service
mq.config.queue.push=order.push
# Application name
spring.application.name=springcloud-mq
# RabbitMQ broker connection settings
spring.rabbitmq.host=192.168.181.133
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=111111
# Name of the exchange
mq.config.exchange=order.fanout
rabbitAmqpTemplate.convertAndSend(this.exchange,"", msg);
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- esig.cn 版权所有 湘ICP备2023023988号-3
违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务