当前位置: 首页 > >

RabbitMQ成员简介

发布时间:



文章目录
Exchange 交换机交换机属性Direct ExchangeProducerConsumer
Topic ExchangeProducerConsumer
Fanout ExchangeProducerConsumer
Headers Exchange
Binding-绑定Queue-消息队列Message-消息Message-其他属性代码演示ProducerConsumer

Virtual Host-虚拟主机相关链接


Exchange 交换机

Exchange : 接收消息, 并根据路由键转发消息所绑定的队列


交换机属性
Name : 交换机名称Type : 交换机类型, direct, topic, fanout, headersDurability : 是否需要持久化, true为持久化Auto Delete : 当最后一个绑定到Exchange上的队列删除后, 自动删除该ExchangeInternal : 当前Exchange是否用于RabbitMQ内部使用, 默认为False, 这个属性很少会用到Arguments : 扩展参数, 用于扩展AMQP协议制定化使用
Direct Exchange

Direct Exchange : 所有发送到Direct Exchange的消息被转发到RoutingKey中指定的Queue



注意 : Direct模式可以使用RabbitMQ自带的Exchange(default Exchange), 所以不需要将Exchange进行任何绑定(binding)操作, 消息传递时, RoutingKey必须完全匹配才会被队列接收, 否则该消息会被抛弃




Producer

package com.qiyexue.exchange.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* Direct模式的生产者
*
* @author 七夜雪
* @create 2018-12-13 22:00
*/
public class ProducerByDirect {

public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂, 设置属性
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.72.138");
factory.setPort(5672);
factory.setVirtualHost("/");

// 2. 获取连接
Connection connection = factory.newConnection();

// 3. 创建channel
Channel channel = connection.createChannel();

// 4. 声明
String exchangeName = "test_direct_exchange";
// Direct模式必须和消费者保持一致才能发送消息, 不然消息会被丢弃
String routingKey = "test.direct";

// 5. 发送消息
String msg = "Hello RabbitMQ By Direct";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());

// 6. 关闭连接
channel.close();
connection.close();
}

}


Consumer

package com.qiyexue.exchange.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* Direct模式消费者
*
* @author 七夜雪
* @create 2018-12-13 22:01
*/
public class ConsumerByDirect {

public static void main(String[] args) throws Exception {
// 1. 创建工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.72.138");
factory.setPort(5672);
factory.setVirtualHost("/");

// 2. 获取连接
Connection connection = factory.newConnection();

// 3. 创建channel
Channel channel = connection.createChannel();

// 4. 声明
// 交换机名称
String exchangeName = "test_direct_exchange";
// 交换机类型
String exchangeType = "direct";
String queueName = "test_direct_queue";
// Direct模式RoutingKey必须和生产者保持一致才能消费
String routingKey = "test.direct";
// 表示声明了一个交换机, 后面几个参数分别为durable, autoDelete, internal, arguments
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 声明一个队列, 后面四个参数分别为durable, exclusive, autoDelete, arguments
// durable : 是否持久化消息
channel.queueDeclare(queueName, false, false, false, null);
// 建立一个绑定关系
channel.queueBind(queueName, exchangeName, routingKey);

QueueingConsumer consumer = new QueueingConsumer(channel);
// 参数 : 队列名称, autoAck:是否自动确认, consumer
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息 : " + msg);
}
}

}

Topic Exchange

所有发送到Topic Exchange的消息将被转发到所有关心RoutingKey中指定Topic的Queue上

Exchange将RoutingKey和某个Topic进行模糊匹配, 此时队列需要绑定一个Topic



可以使用通配符进行模糊匹配“#” : 匹配一个或多个词“*” : 匹配一个词


Producer

package com.qiyexue.exchange.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* Topic模式生产者
*
* @author 七夜雪
* @create 2018-12-14 8:07
*/
public class ProducerByTopic {

public static void main(String[] args) throws IOException, TimeoutException {
// 创建工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.72.138");
factory.setPort(5672);
factory.setVirtualHost("/");

// 创建连接
Connection connection = factory.newConnection();

// 创建channel
Channel channel = connection.createChannel();

// 声明
String exchangeName = "test_topic_exchange";
String routingKey1 = "tingxuelou.biluo";
String routingKey2 = "tingxuelou.hongchen";
String routingKey3 = "tingxuelou.hufa.zimo";

String msg = "test topic By routingKey : ";
channel.basicPublish(exchangeName, routingKey1, null, (msg + routingKey1).getBytes());
channel.basicPublish(exchangeName, routingKey2, null, (msg + routingKey2).getBytes());
channel.basicPublish(exchangeName, routingKey3, null, (msg + routingKey3).getBytes());

// 关闭连接
channel.close();
connection.close();

}

}


Consumer

package com.qiyexue.exchange.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
* topic模式消费者
*
* @author 七夜雪
* @create 2018-12-14 20:10
*/
public class ConsumerByTopic {

public static void main(String[] args) throws Exception {
// 创建工厂
// 创建工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.72.138");
factory.setPort(5672);
factory.setVirtualHost("/");

// 创建连接
Connection connection = factory.newConnection();

// 创建channel
Channel channel = connection.createChannel();

// 声明Exchange
String exchangeName = "test_topic_exchange";
String exchangetype = "topic";
// tingxuelou.#
String routingKey = "tingxuelou.*";
channel.exchangeDeclare(exchangeName, exchangetype);

// 声明队列
String queueName = "test_topic_queue";
channel.queueDeclare(queueName, false, false, false, null);

// 绑定队列
channel.queueBind(queueName, exchangeName, routingKey);

// 创建消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);

while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
System.out.println(new String(delivery.getBody()));
}
}

}


Fanout Exchange

不处理路由键, 只需要简单的将队列绑定到交换机上

发送到交换机的消息都会被转发到与该交换机绑定的所有队列上

Fanout交换机转发消息是最快的



Producer

package com.qiyexue.exchange.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
* fanout模式生产者
*
* @author 七夜雪
* @create 2018-12-14 20:36
*/
public class ProducerByFanout {

public static void main(String[] args) throws Exception {
// 1. 创建连接工厂, 设置属性
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.72.138");
factory.setPort(5672);
factory.setVirtualHost("/");

// 2. 获取连接
Connection connection = factory.newConnection();

// 3. 创建channel
Channel channel = connection.createChannel();

String exchangeName = "test_fanout_exchange";
String routingKey = "无所谓";
for (int i = 0; i < 5; i++) {
String msg = "Fanout 模式消息..";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
}

// 关闭连接
channel.close();
connection.close();
}

}

Consumer

package com.qiyexue.exchange.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
* Fanout模式消费者
*
* @author 七夜雪
* @create 2018-12-14 20:40
*/
public class ConsumerByFanout {

public static void main(String[] args) throws Exception {
// 1. 创建连接工厂, 设置属性
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.72.138");
factory.setPort(5672);
factory.setVirtualHost("/");

// 2. 获取连接
Connection connection = factory.newConnection();

// 3. 创建channel
Channel channel = connection.createChannel();

// 4. 声明Exchange
String exchangeName = "test_fanout_exchange";
String exchangeType = "fanout";
channel.exchangeDeclare(exchangeName, exchangeType);

// 5. 声明消息队列
String routingKey = "";
String queueName = "test_fanout_queue";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

// 6. 创建消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);

while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息 : " + msg);
}

}

}

Headers Exchange
Headers Exchange不使用RoutingKey去绑定, 而是通过消息headers的键值对匹配这个Exchange很少会使用, 这里就不细说了
Binding-绑定
Exchange和Exchange, Queue之间的连接关系绑定中可以包含RoutingKey或者参数
Queue-消息队列

消息队列, 实际存储消息数据

Durability : 是否持久化



Durable : 是


Transient : 否


Auto delete : 如选yes,代表当最后一个监听被移除之后, 该Queue会自动被删除


Message-消息
服务和应用程序之间传送的数据本质上就是一段数据, 由Properties和Payload(Body)组成常用属性 : delivery mode, headers(自定义属性)
Message-其他属性
content_type, content_encoding, prioritycorrelation_id : 可以认为是消息的唯一idreplay_to : 重回队列设定expiration : 消息过期时间message_id : 消息idtimestamp, type, user_id, app_id, cluster_id
代码演示
Producer

package com.qiyexue.message;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

/**
* 生产者
*
* @author 七夜雪
* @create 2018-12-13 20:43
*/
public class Producer {

public static void main(String[] args) throws Exception {
// 1. 创建连接工厂, 设置属性
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.72.138");
factory.setPort(5672);
factory.setVirtualHost("/");

// 2. 创建连接
Connection connection = factory.newConnection();

// 3. 使用connection创建channel
Channel channel = connection.createChannel();

// 4. 通过channel发送消息
String msg = "hello rabbitmq!";
AMQP.BasicProperties properties = new AMQP.BasicProperties();
Map headers = new HashMap();
headers.put("name", "七夜雪");
properties = properties.builder()
// 设置编码为UTF8
.contentEncoding("UTF-8")
// 设置自定义Header
.headers(headers)
// 设置消息失效时间
.expiration("5000").build();

for (int i = 0; i < 5; i++) {
// 不指定exchange的情况下, 使用默认的exchange, routingKey与队列名相等
channel.basicPublish("", "test01", properties, msg.getBytes());
}

// 5. 关闭连接
channel.close();
connection.close();
}

}

Consumer

package com.qiyexue.message;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

/**
* 消费者
*
* @author 七夜雪
* @create 2018-12-13 20:57
*/
public class Consumer {

public static void main(String[] args) throws Exception {
// 1. 创建连接工厂, 设置属性
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.72.138");
factory.setPort(5672);
factory.setVirtualHost("/");

// 2. 创建连接
Connection connection = factory.newConnection();

// 3. 使用connection创建channel
Channel channel = connection.createChannel();

// 4. 声明(创建)一个队列
String queueName = "test01";
channel.queueDeclare(queueName,true, false, false, null);

// 5. 创建消费者
QueueingConsumer consumer = new QueueingConsumer(channel);

// 6. 设置channel
channel.basicConsume(queueName, true, consumer);
while (true) {
// 7. 获取消息
Delivery delivery = consumer.nextDelivery();
System.out.println(new String(delivery.getBody()));
// 获取head中内容
System.out.println(delivery.getProperties().getHeaders().get("name"));
}

}

}

Virtual Host-虚拟主机
虚拟地址, 用于进行逻辑隔离, 最上层的消息路由一个Virtual Host里面可以有若干个Exchange和Queue同一个Virtual Host里面不能有相同名称的Exchange或Queue
相关链接

RabbitMQ入门与AMQP协议简介
RabbitMQ成员简介
RabbitMQ高级特性-消息可靠性投递
RabbitMQ高级特性-幂等性保障
RabbitMQ高级特性-Confirm确认消息
RabbitMQ高级特性-Return消息机制
RabbitMQ高级特性-消费端自定义监听
RabbitMQ高级特性-消费端限流
RabbitMQ高级特性-消费端ACK与重回队列
RabbitMQ高级特性-TTL队列/消息
RabbitMQ高级特性-死信队列(DLX)
Spring AMQP整合RabbitMQ
SpringBoot整合RabbitMQ
RabbitMQ集群架构模式介绍
从零开始搭建高可用RabbitMQ镜像模式集群
RabbitMQ集群恢复与故障转移
RabbitMQ-基础组件封装
Git代码地址



慕课网学*笔记



友情链接: