1.jar包依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>1.5.10.RELEASE</version>
</dependency>
2.application.properties配置文件
## 配置rabbitMQ
spring.application.name=spirng-boot-rabbitmq-example
spring.rabbitmq.host=192.168.80.6
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
3.queue模式
<1>.一对一的字符串的简单传输
1).队列配置
/**
 * Queue configuration for the one-to-one string example.
 *
 * @author mengjh
 */
@Configuration
public class RabbitConfig {

    /**
     * Registers the queue named "hello" as a bean.
     *
     * @return a new {@code Queue} constructed with the name "hello"
     */
    @Bean
    public Queue Queue() {
        final Queue helloQueue = new Queue("hello");
        return helloQueue;
    }
}
2).发送者
/**
 * Sender for the "hello" example.
 *
 * @author Leruan
 */
@Component
public class HelloSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds a greeting that contains the current date, prints it to
     * standard output and hands it to the template with "hello" as the
     * first argument of {@code convertAndSend}.
     */
    public void send() {
        final Date now = new Date();
        final String payload = "hello " + now;
        System.out.println("Sender : " + payload);
        rabbitTemplate.convertAndSend("hello", payload);
    }
}
3).接受者
/**
 * Receiver for the "hello" example; listens on the queue named "hello".
 */
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {

    /**
     * Prints the received text to standard output.
     *
     * @param hello the message body delivered to this handler
     */
    @RabbitHandler
    public void process(String hello) {
        final String line = "Receiver : " + hello;
        System.out.println(line);
    }
}
4).测试
/**
 * Spring Boot test that triggers the "hello" sender once.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQHelloTest {

    @Autowired
    private HelloSender helloSender;

    /**
     * Sends a single "hello" message through the injected sender.
     *
     * @throws Exception declared by the original signature; nothing in
     *                   this method throws a checked exception itself
     */
    @Test
    public void hello() throws Exception {
        this.helloSender.send();
    }
}
<2>.一对多和多对多传输
1).在配置队列中添加配置
/**
 * Queue used by the one-to-many and many-to-many examples.
 *
 * @return a new {@code Queue} constructed with the name "neo"
 */
@Bean
public Queue neoQueue() {
    final Queue neo = new Queue("neo");
    return neo;
}
2).发送者
@Componentpublic class NeoSender1 { @Autowired private AmqpTemplate rabbitTemplate; public void send(int i) { String context = "spirng boot neo queue" + " ****** " + i; System.out.println("Sender1 : " + context); this.rabbitTemplate.convertAndSend("neo", context); }}@Componentpublic class NeoSender2 { @Autowired private AmqpTemplate rabbitTemplate; public void send(int i) { String context = "spirng boot neo queue" + " ****** " + i; System.out.println("Sender2 : " + context); this.rabbitTemplate.convertAndSend("neo", context); }}
3).接受者
/**
 * First listener on the queue named "neo".
 */
@Component
@RabbitListener(queues = "neo")
public class NeoReceiver1 {

    /**
     * Prints the received text, tagged as coming from receiver 1.
     *
     * @param neo the message body delivered to this handler
     */
    @RabbitHandler
    public void process(String neo) {
        final String line = "Receiver 1: " + neo;
        System.out.println(line);
    }
}

/**
 * Second listener on the queue named "neo".
 */
@Component
@RabbitListener(queues = "neo")
public class NeoReceiver2 {

    /**
     * Prints the received text, tagged as coming from receiver 2.
     *
     * @param neo the message body delivered to this handler
     */
    @RabbitHandler
    public void process(String neo) {
        final String line = "Receiver 2: " + neo;
        System.out.println(line);
    }
}
4).测试
/**
 * Spring Boot tests that drive the "neo" senders in one-to-many and
 * many-to-many fashion.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class ManyTest {

    /** Number of loop iterations performed by each test. */
    private static final int ROUNDS = 100;

    @Autowired
    private NeoSender1 neoSender1;

    @Autowired
    private NeoSender2 neoSender2;

    /**
     * Sends sequence numbers 0..99 through the first sender only.
     *
     * @throws Exception declared by the original signature
     */
    @Test
    public void oneToMany() throws Exception {
        int seq = 0;
        while (seq < ROUNDS) {
            neoSender1.send(seq);
            seq++;
        }
    }

    /**
     * Sends sequence numbers 0..99 through both senders, first sender
     * before second sender within each iteration.
     *
     * @throws Exception declared by the original signature
     */
    @Test
    public void manyToMany() throws Exception {
        int seq = 0;
        while (seq < ROUNDS) {
            neoSender1.send(seq);
            neoSender2.send(seq);
            seq++;
        }
    }
}
<3>.传输对象和集合类型数据
1).在队列配置中添加配置
/**
 * Queue used by the object-transfer example.
 *
 * @return a new {@code Queue} constructed with the name "object"
 */
@Bean
public Queue objQueue() {
    final Queue objectQueue = new Queue("object");
    return objectQueue;
}

/**
 * Queue used by the collection-transfer example.
 *
 * @return a new {@code Queue} constructed with the name "list"
 */
@Bean
public Queue listQueue() {
    final Queue collectionQueue = new Queue("list");
    return collectionQueue;
}
2).发送者
/** * rabbitmq传输对象-发送端 * @author mengjh * */@Componentpublic class ObjectSender { @Autowired private AmqpTemplate rabbitTemplate; public void send(UserEntity user) { System.out.println("Sender : " + user); this.rabbitTemplate.convertAndSend("object", user); }}/** * 测试rabbitMQ关于list--发送端 * @author Leruan * */@Componentpublic class ListSender { @Autowired private AmqpTemplate rabbitTemplate; public void send(List list) { System.out.println("Sender : " + list); this.rabbitTemplate.convertAndSend("list", list); }}
3).接受者
/** * rabbitmq传输对象-接收端 * @author mengjh * */@Component@RabbitListener(queues = "object")public class ObjectReceiver { @RabbitHandler public void process(UserEntity user) { System.out.println("Receiver : " + user); }}/** * rabbitmq测试接收list * @author mengjh * */@Componentpublic class ListReceiver { @RabbitListener(queues = "list") public void process(Listuser) { System.out.println("Receiver : " + user); }}
4).测试
/** * rabbitmq的object测试 * @author Leruan * */@RunWith(SpringRunner.class)@SpringBootTestpublic class RabbitMQObjectTest { @Autowired private ObjectSender objSender; @Autowired private User1Mapper userMapper; @Test public void test() { objSender.send(userMapper.getOne(1)); }}/** * rabbitmq的list测试 * @author mengjh * */@RunWith(SpringRunner.class)@SpringBootTestpublic class RabbitMQListTest { @Autowired private ListSender listSender; @Autowired private User1Mapper userMapper; @Test public void test() { Listusers = userMapper.getAll(); listSender.send(users); }}
4.topic模式
<1>.配置topic的队列TopicRabbitConfig.java
/**
 * Topic-exchange configuration: two queues, one topic exchange and one
 * binding per queue.
 */
@Configuration
public class TopicRabbitConfig {

    /** Name of the queue bound with the exact key "topic.message". */
    final static String message = "TopicName1";

    /** Name of the queue bound with the pattern "topic.#". */
    final static String messages = "TopicName2";

    /**
     * Declares the first queue.
     *
     * <p>The original passed the bare class name {@code TopicRabbitConfig}
     * to the constructor, which does not compile; the queue-name constant
     * is used instead.
     *
     * @return a new {@code Queue} named by {@link #message}
     */
    @Bean
    public Queue queueMessage() {
        return new Queue(TopicRabbitConfig.message);
    }

    /**
     * Declares the second queue.
     *
     * @return a new {@code Queue} named by {@link #messages}
     */
    @Bean
    public Queue queueMessages() {
        return new Queue(TopicRabbitConfig.messages);
    }

    /**
     * Declares the topic exchange.
     *
     * <p>The exchange is named "topicExchange" because that is the name
     * the topic sender in this tutorial publishes to; the original name
     * "exchange" left the bindings below attached to an exchange nobody
     * sent to.
     *
     * @return a new {@code TopicExchange} named "topicExchange"
     */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }

    /**
     * Binds the first queue to the exchange with routing key
     * "topic.message".
     *
     * @param queueMessage the queue to bind; NOTE(review): the parameter
     *                     name matches the {@code queueMessage} bean
     *                     method and is presumably what selects between
     *                     the two queue beans, so do not rename it
     * @param exchange     the topic exchange to bind to
     * @return the resulting binding
     */
    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    /**
     * Binds the second queue to the exchange with routing pattern
     * "topic.#".
     *
     * @param queueMessages the queue to bind; NOTE(review): the parameter
     *                      name matches the {@code queueMessages} bean
     *                      method, so do not rename it
     * @param exchange      the topic exchange to bind to
     * @return the resulting binding
     */
    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }
}
<2>.topic发送者
/**
 * Sender for the topic example. Each public method publishes one fixed
 * text to the exchange named "topicExchange" under a different routing
 * key.
 */
@Component
public class TopicSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /** Publishes "hi, i am message all" with routing key "topic.1". */
    public void send() {
        publish("topic.1", "hi, i am message all");
    }

    /** Publishes "hi, i am message 1" with routing key "topic.message". */
    public void send1() {
        publish("topic.message", "hi, i am message 1");
    }

    /** Publishes "hi, i am messages 2" with routing key "topic.messages". */
    public void send2() {
        publish("topic.messages", "hi, i am messages 2");
    }

    /**
     * Prints the text and then sends it to "topicExchange" under the
     * given routing key.
     */
    private void publish(String routingKey, String text) {
        System.out.println("Sender : " + text);
        rabbitTemplate.convertAndSend("topicExchange", routingKey, text);
    }
}
<3>.topic接收者
/**
 * Publish/subscribe receiver 1; listens on the queue named "TopicName1".
 *
 * @author mengjh
 */
@Component
@RabbitListener(queues = "TopicName1")
public class TopicReceiver1 {

    /**
     * Prints the received text, tagged as coming from topic receiver 1.
     *
     * @param message the message body delivered to this handler
     */
    @RabbitHandler
    public void process(String message) {
        final String line = "Topic Receiver1 : " + message;
        System.out.println(line);
    }
}

/**
 * Publish/subscribe receiver 2; listens on the queue named "TopicName2".
 */
@Component
@RabbitListener(queues = "TopicName2")
public class TopicReceiver2 {

    /**
     * Prints the received text, tagged as coming from topic receiver 2.
     *
     * @param message the message body delivered to this handler
     */
    @RabbitHandler
    public void process(String message) {
        final String line = "Topic Receiver2 : " + message;
        System.out.println(line);
    }
}
<4>.测试
/**
 * Spring Boot tests that invoke each of the topic sender's three send
 * methods.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQTopicTest {

    @Autowired
    private TopicSender sender;

    /**
     * Invokes {@code send()} on the injected sender.
     *
     * @throws Exception declared by the original signature
     */
    @Test
    public void topic() throws Exception {
        this.sender.send();
    }

    /**
     * Invokes {@code send1()} on the injected sender.
     *
     * @throws Exception declared by the original signature
     */
    @Test
    public void topic1() throws Exception {
        this.sender.send1();
    }

    /**
     * Invokes {@code send2()} on the injected sender.
     *
     * @throws Exception declared by the original signature
     */
    @Test
    public void topic2() throws Exception {
        this.sender.send2();
    }
}
5.Fanout广播模式
<1>.配置类
/**
 * Configuration for the fanout (broadcast) example: three queues, one
 * fanout exchange, and one binding per queue.
 */
@Configuration
public class FanoutRabbitConfig {

    /** @return a new {@code Queue} named "fanout.A" */
    @Bean
    public Queue AMessage() {
        final Queue queueA = new Queue("fanout.A");
        return queueA;
    }

    /** @return a new {@code Queue} named "fanout.B" */
    @Bean
    public Queue BMessage() {
        final Queue queueB = new Queue("fanout.B");
        return queueB;
    }

    /** @return a new {@code Queue} named "fanout.C" */
    @Bean
    public Queue CMessage() {
        final Queue queueC = new Queue("fanout.C");
        return queueC;
    }

    /** @return a new {@code FanoutExchange} named "fanoutExchange" */
    @Bean
    FanoutExchange fanoutExchange() {
        final FanoutExchange broadcast = new FanoutExchange("fanoutExchange");
        return broadcast;
    }

    // NOTE(review): the Queue parameter names below match the bean method
    // names above; presumably that is what selects one of the three Queue
    // beans, so they are left exactly as written.

    /**
     * Binds queue A to the fanout exchange.
     *
     * @param AMessage       the queue to bind
     * @param fanoutExchange the exchange to bind to
     * @return the resulting binding
     */
    @Bean
    Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
        final Binding bindingA = BindingBuilder.bind(AMessage).to(fanoutExchange);
        return bindingA;
    }

    /**
     * Binds queue B to the fanout exchange.
     *
     * @param BMessage       the queue to bind
     * @param fanoutExchange the exchange to bind to
     * @return the resulting binding
     */
    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        final Binding bindingB = BindingBuilder.bind(BMessage).to(fanoutExchange);
        return bindingB;
    }

    /**
     * Binds queue C to the fanout exchange.
     *
     * @param CMessage       the queue to bind
     * @param fanoutExchange the exchange to bind to
     * @return the resulting binding
     */
    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        final Binding bindingC = BindingBuilder.bind(CMessage).to(fanoutExchange);
        return bindingC;
    }
}
<2>.发送者
/**
 * Sender for the fanout (broadcast) example.
 */
@Component
public class FanoutSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Prints a fixed text and sends it to the exchange named
     * "fanoutExchange" with an empty routing key.
     */
    public void send() {
        final String payload = "hi, fanout msg ";
        final String logLine = "Sender : " + payload;
        System.out.println(logLine);
        rabbitTemplate.convertAndSend("fanoutExchange", "", payload);
    }
}
<3>.接受者
/**
 * Fanout receiver A; listens on the queue named "fanout.A".
 *
 * @author java_
 */
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {

    /**
     * Prints the received text, tagged as coming from receiver A.
     *
     * @param message the message body delivered to this handler
     */
    @RabbitHandler
    public void process(String message) {
        final String line = "fanout Receiver A : " + message;
        System.out.println(line);
    }
}

/**
 * Fanout receiver B; listens on the queue named "fanout.B".
 *
 * @author java_
 */
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {

    /**
     * Prints the received text, tagged as coming from receiver B.
     *
     * @param message the message body delivered to this handler
     */
    @RabbitHandler
    public void process(String message) {
        final String line = "fanout Receiver B: " + message;
        System.out.println(line);
    }
}

/**
 * Fanout receiver C; listens on the queue named "fanout.C".
 */
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {

    /**
     * Prints the received text, tagged as coming from receiver C.
     *
     * @param message the message body delivered to this handler
     */
    @RabbitHandler
    public void process(String message) {
        final String line = "fanout Receiver C: " + message;
        System.out.println(line);
    }
}
<4>.测试类
/**
 * Spring Boot test that triggers the fanout sender once.
 *
 * @author java_
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class FanoutRabbitTest {

    @Autowired
    private FanoutSender sender;

    /**
     * Invokes {@code send()} on the injected fanout sender.
     *
     * @throws Exception declared by the original signature
     */
    @Test
    public void fanoutSender() throws Exception {
        this.sender.send();
    }
}