1.1.ActiveMQ
ActiveMQ的特点
- ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线,并且完全支持JMS规范的消息中间件。
- 有丰富的API,多种集群架构模式使得ActiveMQ成为业界老牌消息中间件,在中小型企业中应用广泛。
- 面对超大规模并发的时候ActiveMQ会造成阻塞等问题。。??
1.2.Kafka
Kafka的特点
- Kafka是开源的分布式发布—订阅消息系统,目前属于Apache的顶级项目。
- Kafka追求高吞吐量,一开始的目的就是用于日志收集和传输。
- 0.8版本开始支持复制,不支持事务,対消息重复、丢失、错误没有严格的要求。
- Kafka适合产生大量数据的互联网服务的数据收集业务。
1.3.RocketMQ
RocketMQ特点
- RocketMQ是阿里开源的消息中间件,纯Java开发,有高吞吐量、高可用性、适合大规模分布式系统应用的特点。
- RocketMQ思路取源于Kafka,対消息的可靠传输及事务性做了优化,在阿里内部被广泛用于交易、充值、流计算、消息推送、日志流式处理等场景。
- 但是RocketMQ收费。。????
1.4.RabbitMQ
RabbitMQ特点
- RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。
- AMQP的主要特征是面向消息、队列、路由(包括点对点和发布—订阅)、可靠性、安全。
- AMQP协议更多用在企业系统内,対数据一致性、稳定性和可靠性要求很高的场景,対性能和吞吐量的要求排在其次。性能和Kafka没发比但是比ActiveMQ要好很多。
2.RabbitMQ核心概念以及AMQP协议
2.1.互联网大厂为什么选择RabbitMQ?
- RabbitMQ使用Erlang语言来编写的,并且是基于AMQP协议的。
- 开源、性能优秀,稳定性保障。
- 提供 可靠性消息投递模式(confirm)、返回模式(return)。
- 与SpringAMQP完美的整合、API丰富。
- 集群模式丰富,表达式配置,HA模式,镜像队列模型。
- 保障数据不丢失的前提下做到高可靠性、可用性。
2.2.RabbitMQ的高性能是如何做到的?
- Erlang语言最初在于交换机领域的架构模式,这样使得RabbitMQ在Broker之间进行数据交互的性能是非常优秀的。
- Erlang的优点:Erlang有着和原生Socket一样的延迟。
2.3.什么是AMQP高级协议?
AMQP协议简介
- AMQP全称:Advanced Message Queuing Protocol 高级消息队列协议。
- AMQP定义:是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
AMQP协议模型
2.4.AMQP核心概念是什么?
- Server:又称作Broker,接受客户端的连接,实现AMQP实体服务。
- Connection:连接,应用程序与Broker的网络连接。
- Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可以建立多个Channel,每个Channel代表一个会话任务。
- Message:消息。服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body就是消息体内容。
- Virtual Host:虚拟主机,用于进行逻辑隔离,最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange和Queue。
- Exchange:交换机,接收消息。根据Routing Key转发消息到绑定的队列。
- Binding:Exchange和Queue之间的虚拟连接,Binding中可以包含Routing Key。
- Routing Key:一个路由规则,虚拟机可以用它来确定如何路由一个特点消息。
- Queue:也成为了Message Queue,消息队列,保存消息并转发给消费者。
2.5.RabbitMQ整体架构模型
2.6.RabbitMQ消息是如何进行流转的?
2.7.RabbitMQ安装和使用
官网地址:https://www.rabbitmq.com/
Erlang和RabbitMQ版本对照表:https://www.rabbitmq.com/which-erlang.html
相关软件包和资料:链接:https://pan.baidu.com/s/1uFP2YU7xPK2KBkaP4gXvLw
提取码:s6z8
安装Rabbitmq
# 1、将需要的安装包下载到Linux系统
erlang-18.3-1.el7.centos.x86_64.rpm # Erlang语言安装包
rabbitmq-server-3.6.5-1.noarch.rpm # RabbitMQ安装包
socat-1.7.3.2-1.1.el7.x86_64.rpm # 秘钥
# 2、安装Erlang环境
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
# 3、安装socat
rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm
# 4、安装RabbitMQ
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
# 5、RabbitMQ配置文件的修改
路径: vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app # rabbit.app是json格式的配置文件
42 {loopback_users, [guest]}, # 修改42行 去掉guest两边的引号和尖括号,最后结果和左边代码保持一致
Rabbitmq服务的启动和停止
# 1、查看Rabbitmq可以使用的命令
[root@centos-7-test1 ~]# rabbitmq
rabbitmqctl rabbitmq-plugins rabbitmq-server
# 2、启动Rabbitmq服务
rabbitmq-server start & # & 代表后台启动服务
# 启动RabbitMQ服务
[root@centos-7-test1 ~]# rabbitmq-server start &
[1] 3293
[root@centos-7-test1 ~]#
RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.
## ## Licensed under the MPL. See http://www.rabbitmq.com/
## ##
########## Logs: /var/log/rabbitmq/rabbit@centos-7-test1.log
###### ## /var/log/rabbitmq/rabbit@centos-7-test1-sasl.log
##########
Starting broker...
completed with 0 plugins.
# 3、查看Rabbitmq是否启动成功? 看到下面的结果就证明RabbitMQ启动成功了!
[root@centos-7-test1 ~]# lsof -i:5672
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
beam.smp 2586 rabbitmq 52u IPv6 36396 0t0 TCP *:amqp (LISTEN)
# 4、Rabbitmq服务的停止
rabbitmqctl stop_app
# 5、安装Rabbitmq管理插件,可以通过浏览器控制台访问
rabbitmq-plugins list # 可以查看Rabbitmq提供的插件
rabbitmq-plugins enable rabbitmq_management # 安装Rabbitmq控制台插件 控制台默认端口是15672
# 6、浏览器访问Rabbitmq控制台 默认用户名和密码都是guest
http://192.168.110.133:15672
2.8.命令行和管控台
基础操作
# 1、启动应用
rabbitmqctl start_app
# 2、关闭应用
rabbitmqctl stop_app
# 3、节点状态
rabbitmqctl status
# 4、添加用户
rabbitmqctl add_user [username] [password]
# 4、列出所有的用户列表
rabbitmqctl list_users
# 5、删除用户
rabbitmqctl delete_user [username]
# 6、清除用户权限
rabbitmqctl clear_permissions -p [vhostpath] [username]
# 7、列出用户权限
rabbitmqctl list_user_permissions [username]
# 8、修改密码
rabbitmqctl change_password [username] [newpassword]
# 9、设置用户权限
rabbitmqctl set_permissions -p [vhostpath] [username] ".*" ".*" ".*"
# 10、创建虚拟主机
rabbitmqctl add_vhost [vhostpath]
# 11、列出所有的虚拟主机
rabbitmqctl list_vhosts
# 12、列出虚拟主机上所有权限
rabbitmqctl list_permissions -p [vhostpath]
# 13、删除虚拟主机
rabbitmqctl delete_vhost [vhostpath]
# 14、查看所有队列消息
rabbitmqctl list_queues
# 15、清除队列里的消息
rabbitmqctl -p [vhostpath] purge_queue blue
高级操作
# 1、移除所有数据,要在 rabbitmqctl stop_app 之后使用
rabbitmqctl reset
# 2、组成集群命令
rabbitmqctl join_cluster [--ram]
# 3、查看集群状态
rabbitmqctl cluster_status
# 4、修改集群节点的存储形式
rabbitmqctl change_cluster_node_type disc | ram
# 5、摘除节点
rabbitmqctl forget_cluster_node [--offline]
# 6、修改节点名称
rabbitmqctl rename_cluster_node [oldnode1] [newnode1]...
2.9.RabbitMQ消息生产与消费
Pom依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
消费者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
* Rabbitmq 消费者
* 先启动消费者创建队列,生产者再投递。
*/
public class Consumer {
public static final String HOST = "192.168.110.133";
public static final String QUEUE_NAME = "test001";
public static void main(String[] args) throws Exception {
// 1、创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST); // ConnectionFactory 默认端口是5672 virtualHost是"/"
// 2、通过创建ConnectionFactory创建Connection
Connection connection = connectionFactory.newConnection();
// 3、通过Connection创建Channel
Channel channel = connection.createChannel();
/**
* 4、声明队列
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
* queue:队列的名字。
* durable:是否持久化。
* exclusive:是否独占 独占的意思就是这个Queue只能有我这一个Channel监听。
* autoDelete:设置为true,没有消费者监听该队列,该队列就会自动删除。
* arguments:扩展参数。
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 5、创建消费者,在RabbitMQ中不需要定义QueueingProducer 只需要定义QueueingConsumer即可
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
/**
* 6、channel的设置
* basicConsume(String queue, boolean autoAck, Consumer callback)
* queue:队列的名字
* autoAck:是否自动签收
* callback:具体的消费者对象
*/
String consumerTag = channel.basicConsume(QUEUE_NAME, true, queueingConsumer);
System.out.println("******consumerTag*****" + consumerTag);
while (true) {
/**
* 7、获取消息
* nextDelivery():如果获取不到消息就一直阻塞
*/
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
// 8、获得消息体
String msg = new String(delivery.getBody());
System.out.println("*******消费端接收到的消息为=====>" + msg);
}
}
}
生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* Rabbitmq 生产者
*/
public class Producer {
public static final String HOST = "192.168.110.133";
public static void main(String[] args) throws Exception {
// 1、创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST); // ConnectionFactory 默认端口是5672 virtualHost是"/"
// 2、通过创建ConnectionFactory创建Connection
Connection connection = connectionFactory.newConnection();
// 3、通过Connection创建Channel
Channel channel = connection.createChannel();
// 4、通过Channel发送数据
// basicPublish(String exchange,String routingKey,BasicProperties props,byte[] body)
channel.basicPublish("", "test001", null, "hello rabbitmq".getBytes());
// 5、关闭连接
channel.close();
connection.close();
System.out.println("*****生产者消息发布成功!****");
}
}
问题:生产者发送消息,没有指定交换机但是routingKey为”test001”就可以路由到”test001”队列,为什么?
RabbitMQ生产者投递消息如果不指定Exchange,那么就会默认使用AMQP.default这个Exchange,它的路由规则就是根据生产者指定的RoutingKey和队列的名字去对比,如果名字一致就将生产者的消息发送给该队列。
2.10.RabbitMQ交换机详解
2.10.1.Exchange的概念
Exchange:接收消息,并根据RoutingKey转发消息所绑定的队列。
2.10.2.Exchange属性
- Name:Exchange名称。
- Type:Exchange的类型。direct、topic、fanout、headers。
- Durability:是否需要持久化,true为持久化。
- Auto Delete:当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange。
- Internal:当前Exchange是否用于RabbitMQ内部使用,默认为false。(很少使用)
- Arguments:扩展参数,用于扩展AMQP协议自制定化使用。
2.10.3.Direct Exchange
Direct Exchange基本概念
- 所有发送到Direct Exchange的消息被转发到Routing key中指定的Queue。
- 一句话:直连的方式,生产者发送消息的Routing Key和Direct Exchange的Routing Key必须完全匹配,才会路由到绑定的Queue。
消费者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
* 消费者 接收和Direct Exchange绑定的Queue的消息
* 消费者先启动:声明Exchange、Queue和绑定关系
*/
public class ConsumerDirectExchange {
public static final String HOST = "192.168.110.133";
public static final String EXCHANGE_NAME = "test_direct_exchange";
public static final String EXCHANGE_TYPE = "direct";
public static final String ROUTING_KEY = "test.direct";
public static final String QUEUE_NAME = "test_direct_queue";
public static void main(String[] args) throws Exception {
// 1、创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST); // ConnectionFactory 默认端口是5672 virtualHost是"/"
// 2、通过创建ConnectionFactory创建Connection
Connection connection = connectionFactory.newConnection();
// 3、通过Connection创建Channel
Channel channel = connection.createChannel();
/**
* 4、声明Exchange
* exchangeDeclare(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map<String, Object> arguments)
* exchange:交换机名字。
* type:交换机类型。
* durable:是否持久化。
* autoDelete:true表示当没有队列绑定到Exchange上时,自动删除该Exchange。
* internal:当前Exchange是否用于RabbitMQ内部使用,一般是false。
* arguments:其他参数。
*/
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);
/**
* 5、声明队列
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
/**
* 6、Exchange和Queue的绑定关系
* queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null);
// 7、创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
/**
* 8、channel的设置
* basicConsume(String queue, boolean autoAck, Consumer callback)
*/
channel.basicConsume(QUEUE_NAME, true, queueingConsumer);
while (true) {
//9、接收消息
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("*******消费端接收到的消息为=====>" + msg);
}
}
}
生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 生产者 向Direct Exchange发送消息
*/
public class ProducerDirectExchange {
public static final String HOST = "192.168.110.133";
public static final String EXCHANGE_NAME = "test_direct_exchange";
public static final String ROUTING_KEY = "test.direct";
public static void main(String[] args) throws Exception {
// 1、创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST); // ConnectionFactory 默认端口是5672 virtualHost是"/"
// 2、通过创建ConnectionFactory创建Connection
Connection connection = connectionFactory.newConnection();
// 3、通过Connection创建Channel
Channel channel = connection.createChannel();
// 4、发送消息
String msg = "Hello RabbitMQ Direct Exchange Message...";
// basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes());
// 5、关闭资源
channel.close();
connection.close();
System.out.println("***ProducerDirectExchange 消息发送成功***");
}
}
2.10.4.Topic Exchange
Topic Exchange基本概念
- 所有发送到Topic Exchange的消息被转发到所有关心Routing Key中指定Topic的Queue上。
- Exchange将Routing Key和某个Topic进行模糊匹配,此时队列需要绑定一个Topic。
- 一句话:Topic Exchange和Queue绑定Routing Key可以使用通配符,生产者发送消息的Routing Key只要和Topic Exchange的Routing Key匹配就能路由到Topic Exchange绑定的队列。
模糊匹配可以使用通配符
- 符号 “#” 匹配一个或多个词。
- 符号 “*” 匹配不多不少一个词。
- 例如:”log.#” 能够匹配到 “log.info.aa”。”log.*” 只能匹配到 “log.err”。
消费者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
* 消费者 接收Topic Exchange绑定的Queue的消息
* 消费者先启动:声明Exchange、Queue和绑定关系
*/
public class ConsumerTopicExchange {
public static final String HOST = "192.168.110.133";
public static final String EXCHANGE_NAME = "test_topic_exchange";
public static final String EXCHANGE_TYPE = "topic";
public static final String ROUTING_KEY = "user.#";
public static final String QUEUE_NAME = "test_topic_queue";
public static void main(String[] args) throws Exception {
// 1、创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST); // ConnectionFactory 默认端口是5672 virtualHost是"/"
// 2、通过创建ConnectionFactory创建Connection
Connection connection = connectionFactory.newConnection();
// 3、通过Connection创建Channel
Channel channel = connection.createChannel();
/**
* 4、声明Exchange
* exchangeDeclare(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map<String, Object> arguments)
*/
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);
/**
* 5、声明队列
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
/**
* 6、Exchange和Queue的绑定关系
* queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null);
// 7、创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
/**
* 8、channel的设置
* basicConsume(String queue, boolean autoAck, Consumer callback)
*/
channel.basicConsume(QUEUE_NAME, true, queueingConsumer);
while (true) {
// 9、接收消息
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("*******消费端接收到的消息为=====>" + msg);
}
}
}
生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 生产者向 Topic Exchange发送消息
*/
public class ProducerTopicExchange {
public static final String HOST = "192.168.110.133";
public static final String EXCHANGE_NAME = "test_topic_exchange";
public static final String ROUTING_KEY = "user.name.zs";
public static void main(String[] args) throws Exception {
// 1、创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST); // ConnectionFactory 默认端口是5672 virtualHost是"/"
// 2、通过创建ConnectionFactory创建Connection
Connection connection = connectionFactory.newConnection();
// 3、通过Connection创建Channel
Channel channel = connection.createChannel();
/**
* 4、发送消息
* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
*/
String msg = "Hello Topic Exchange Message...";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes());
// 5、关闭资源
channel.close();
connection.close();
System.out.println("***Producer Topic Exchange 消息发送成功***");
}
}
2.10.5.Fanout Exchange
Fanout Exchange基本概念
- Fanout Exchange不处理Routing Key,只需要简单的将Queue绑定到Exchange上。
- 发送到Exchange的消息都会被转发到与该Exchange绑定的所有Queue上。
- Fanout Exchange转发消息是最快的。
消费者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
* 消费者 接收Fanout Exchange 绑定的Queue的消息
* 消费者先启动:声明Exchange、Queue和绑定关系
* 注意:Fanout Exchange的Routing Key不能设置为null
*/
public class ConsumerFanoutExchange {
public static final String HOST = "192.168.110.133";
public static final String EXCHANGE_NAME = "test_fanout_exchange";
public static final String EXCHANGE_TYPE = "fanout";
public static final String ROUTING_KEY = "";
public static final String QUEUE_NAME = "test_fanout_queue";
public static void main(String[] args) throws Exception {
// 1、创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST); // ConnectionFactory 默认端口是5672 virtualHost是"/"
// 2、通过创建ConnectionFactory创建Connection
Connection connection = connectionFactory.newConnection();
// 3、通过Connection创建Channel
Channel channel = connection.createChannel();
/**
* 4、声明Exchange
* exchangeDeclare(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map<String, Object> arguments)
*/
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);
/**
* 5、声明队列
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
/**
* 6、Exchange和Queue的绑定关系
* queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null);
// 7、创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
/**
* 8、channel的设置
* basicConsume(String queue, boolean autoAck, Consumer callback)
*/
channel.basicConsume(QUEUE_NAME, true, queueingConsumer);
while (true) {
// 9、接收消息
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("*******消费端接收到的消息为=====>" + msg);
}
}
}
生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 生产者 向Fanout Exchange发送消息 根本就不走Routing Key的!
* Routing Key不能为null,随便写一个字符串即可。
*/
public class ProducerFanoutExchange {
public static final String HOST = "192.168.110.133";
public static final String EXCHANGE_NAME = "test_fanout_exchange";
public static final String ROUTING_KEY = "TangShi";
public static void main(String[] args) throws Exception {
// 1、创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST); // ConnectionFactory 默认端口是5672 virtualHost是"/"
// 2、通过创建ConnectionFactory创建Connection
Connection connection = connectionFactory.newConnection();
// 3、通过Connection创建Channel
Channel channel = connection.createChannel();
/**
* 4、发送消息
* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
*/
String msg = "Hello Fanout Exchange Message...";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes());
// 5、关闭资源
channel.close();
connection.close();
System.out.println("***Producer Fanout Exchange 消息发送成功***");
}
}
2.11.RabbitMQ队列、绑定、虚拟主机、消息
2.11.1.Binding(绑定)
Binding基本概念
- Exchange和Exchange、Queue之间的绑定关系。
- Binding中可以包含Routing Key或者参数。
2.11.2.Queue(队列)
Queue基本概念
- 消息队列,实际存储消息数据。
- Durability:是否持久化。Durable:是,Transient:否。
- Auto Delete:如果选true,代表当最后一个监听被移除之后,该Queue会自动删除。
2.11.3.Message(消息)
Message基本概念
- Message是服务器和应用程序之前传送的数据。
- Message本质上就是一段数据,由Properties和Payload(Body)组成。
Message的属性
private String contentType;
private String contentEncoding;
private Map<String,Object> headers; // 自定义属性
private Integer deliveryMode; // 消息的送达模式
private Integer priority; // 消息的优先级
private String correlationId; // 消息的唯一ID
private String replyTo; // 消息失败返回的队列
private String expiration; // 消息的过期时间
private String messageId; // 消息的ID
private Date timestamp; // 时间戳
private String type;
private String userId;
private String appId;
private String clusterId;
消费者接收消息Properties
import com.rabbitmq.client.*;
/**
* 消费者 测试获得消息的Properties
*/
public class MessageConsumer {
public static final String HOST = "192.168.110.133";
public static final String EXCHANGE_NAME = "test_message_exchange";
public static final String EXCHANGE_TYPE = "direct";
public static final String ROUTING_KEY = "test.message";
public static final String QUEUE_NAME = "test_message_queue";
public static void main(String[] args) throws Exception {
// 1、创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST); // ConnectionFactory 默认端口是5672 virtualHost是"/"
// 2、通过创建ConnectionFactory创建Connection
Connection connection = connectionFactory.newConnection();
// 3、通过Connection创建Channel
Channel channel = connection.createChannel();
/**
* 4、声明Exchange
* exchangeDeclare(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map<String, Object> arguments)
*/
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);
/**
* 5、声明队列
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
/**
* 6、Exchange和Queue的绑定关系
* queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null);
// 7、创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
/**
* 8、channel的设置
* basicConsume(String queue, boolean autoAck, Consumer callback)
*/
channel.basicConsume(QUEUE_NAME, true, queueingConsumer);
while (true) {
//9、接收消息
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
//接收消息体
String msg = new String(delivery.getBody());
//接收消息的Properties
AMQP.BasicProperties properties = delivery.getProperties();
System.out.println("*******消费端接收到的消息为=====>" + msg);
System.out.println("*****自定义属性1*******" + properties.getHeaders().get("my1"));
System.out.println("*****自定义属性2*******" + properties.getHeaders().get("my2"));
System.out.println("*****DeliveryMode*********" + properties.getDeliveryMode());
System.out.println("*****Expiration********" + properties.getExpiration());
System.out.println("*****ContentEncoding********" + properties.getContentEncoding());
}
}
}
生产者设置消息Properties
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;
/**
* 生产者 测试给消息添加Properties
*/
public class MessageProducer {
public static final String HOST = "192.168.110.133";
public static final String EXCHANGE_NAME = "test_message_exchange";
public static final String ROUTING_KEY = "test.message";
public static void main(String[] args) throws Exception {
// 1、创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST); // ConnectionFactory 默认端口是5672 virtualHost是"/"
// 2、通过创建ConnectionFactory创建Connection
Connection connection = connectionFactory.newConnection();
// 3、通过Connection创建Channel
Channel channel = connection.createChannel();
/**
* 4、发送消息
* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
*/
// 设置消息的Properties
Map<String, Object> headers = new HashMap<>();
headers.put("my1", "111");
headers.put("my2", "222");
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2) //消息的投递模式 2代表持久化消息 1代表非持久化消息
.contentEncoding("UTF-8") //字符集
.expiration("20000") //如果20S没有被消费者消费 就会被自动清除
.headers(headers) // 自定义属性
.build();
String msg = "Hello Message Properties Test...";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, msg.getBytes());
// 5、关闭资源
channel.close();
connection.close();
System.out.println("***Set Message Properties 消息发送成功***");
}
}
2.11.4.Virtual Host(虚拟主机)
Virtual Host基本概念
- 虚拟地址,用于进行逻辑隔离,最上层的消息路由。
- 一个Virtual Host里面可以有若干个Exchange和Queue。
- 同一个Virtual Host里面不能有相同名称的Exchange和Queue。
3.RabbitMQ的高级特性
3.1.如何保障消息100%的投递成功?
3.1.1.什么是消息的可靠性传递?
- 保障消息的成功发出。
- 保障MQ节点的成功接收。
- 发送端收到MQ节点(Broker)的应答。
- 完善的消息补偿机制。
3.1.2.生产端—可靠性投递解决方案
3.1.2.1.方案一:消息落库,対消息状态进行打标。
消息落库图示
思考:如果使用消息落库的可靠性投递,在高并发场景是否合适?
3.1.2.2.方案二:消息的延迟投递,做二次确认,回调检查。
消息回调检查图示
3.2.消息幂等性概念
幂等性是什么?
- 一句话:用户对于同一操作发起的一次或者多次请求,最后的结果都是相同的,这就是幂等性。在MQ中就是保障消息不要被重复消费。
3.3.在海量订单产生的业务高峰期,如何避免消息的重复消费?
3.3.1.消费端的幂等性
- 消费端实现幂等性,就意味着,我们的消息永远不会消费多次,即使我们收到了多条一样的消息。
3.3.2.业界主流的幂等性操作
3.3.2.1.方案一:唯一ID + 指纹码机制,利用数据库主键去重。
- select count(1) from t_order where id = 唯一ID+指纹码
- 好处:实现简单
- 坏处:高并发下数据库写入性能瓶颈。
- 解决方案:跟进ID进行分库分表进行算法路由。
3.3.2.2.方案二:利用Redis的原子性去实现。
使用Redis进行幂等,需要考虑的问题
- 我们是否要进行数据落库,如果落库的话,关键解决的问题是数据库和缓存如何做到原子性?
- 如果不进行落库,那么都存储到Redis缓存中,如何设置定时同步策略。
3.4.两种消息投递方式:Confirm确认消息、Return返回消息
3.4.1.Confirm确认消息(生产端)
Confirm消息确认机制
- 消息的确认,是指生产者投递消息后,如果Broker收到消息,会给生产者一个应答。
- 生产者进行接收应答,用来确认这条消息是否发送到Broker,这种方式也是消息的可靠性投递的核心保障。
3.4.2.实现Confirm确认消息
消费者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
* 消费者 用来测试Rabbitmq的Confirm机制
*/
public class Consumer {
public static final String HOST = "192.168.110.133";
public static final String EXCHANGE_NAME = "test_confirm_exchange";
public static final String EXCHANGE_TYPE = "direct";
public static final String ROUTING_KEY = "test.confirm";
public static final String QUEUE_NAME = "test_confirm_queue";
public static void main(String[] args) throws Exception {
// 1、创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST);
// 2、通过创建ConnectionFactory创建Connection
Connection connection = connectionFactory.newConnection();
// 3、通过Connection创建Channel
Channel channel = connection.createChannel();
/**
* 4、声明Exchange
* exchangeDeclare(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map<String, Object> arguments)
*/
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);
/**
* 5、声明队列
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
/**
* 6、Exchange和Queue的绑定关系
* queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null);
// 7、创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
/**
* 8、channel的设置
* basicConsume(String queue, boolean autoAck, Consumer callback)
*/
channel.basicConsume(QUEUE_NAME, true, queueingConsumer);
while (true) {
//9、接收消息
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("*******消费端接收到的消息为=====>" + msg);
}
}
}
生产者设置消息的确认模式和监听
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
/**
* 生产者 测试消息发送到Broker的Confirm机制
* 第一步:声明消息的确认模式 channel.confirmSelect();
* 第二步:添加监听 channel.addConfirmListener();
*/
public class Producer {
public static final String HOST = "192.168.110.133";
public static final String EXCHANGE_NAME = "test_confirm_exchange";
public static final String ROUTING_KEY = "test.confirm";
public static void main(String[] args) throws Exception {
// 1、创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST);
// 2、通过创建ConnectionFactory创建Connection
Connection connection = connectionFactory.newConnection();
// 3、通过Connection创建Channel
Channel channel = connection.createChannel();
// 4、指定消息的确认模式:Confirm
channel.confirmSelect();
/**
* 5、发送消息
* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
*/
String msg = "Hello Message Confirm...";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes());
/**
* 6、添加一个确认监听
*/
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("***********消息成功发送到Broker!***************");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("**************消息发送失败!********************");
}
});
}
}
3.4.3.Return返回消息(生产端)
Return消息返回机制
- Return Listener用于处理一些不可路由的消息。
- 在某些情况下,如果我们在发送消息的时候,当前的Exchange不存在或者指定的Routing Key不存在,这个时候我们需要监听这种不可达的消息,就要使用Return Listener!
Return机制基础API的关键配置项
- Mandatory:如果为true,则监听器会收到路由不可达的消息,然后进行后续处理;如果为false,那么Broker会自动删除该消息。
3.4.4.实现Return 消息返回
生产者测试Return机制
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* 生产者 测试消息的Return机制
* 当有不可路由的消息时候发送到Broker,来监听这些不可达的消息。
* 第一步:mandatory设置为true监听不可达的消息。
* 第二步:channel.addReturnListener()添加消息返回机制的监听者。
*/
public class Producer {
public static final String HOST = "192.168.110.133";
public static final String EXCHANGE_NAME = "test_confirm_exchange";
public static final String ROUTING_KEY = "TangShi"; //不存在的Routing Key
public static void main(String[] args) throws Exception {
// 1、创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST);
// 2、通过创建ConnectionFactory创建Connection
Connection connection = connectionFactory.newConnection();
// 3、通过Connection创建Channel
Channel channel = connection.createChannel();
/**
* 4、发送消息
* basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
* mandatory:如果为true,则监听器会收到路由不可达的消息,然后进行后续处理;如果为false,那么Broker会自动删除该消息。
*/
String msg = "Hello Message Return....";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true, null, msg.getBytes());
/**
* 5、监控不可路由的消息
* handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body)
* replyCode:响应码
* replyText:文本
* exchange:具体的Exchange
* routingKey:具体的Routing Key
* properties:消息的属性
* body:实际的消息体的内容。
*/
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("replyCode:" + replyCode);
System.out.println("replyText:" + replyText);
System.out.println("exchange:" + exchange);
System.out.println("routingKey:" + routingKey);
System.out.println("body:" + msg);
}
});
}
}
// 控制台打印结果为:
replyCode:312
replyText:NO_ROUTE
exchange:test_confirm_exchange
routingKey:TangShi
body:Hello Message Return....
3.5.自定义消费者监听
自定义消费者
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
/**
* 自定义的消费者
* 需要继承 com.rabbitmq.client.DefaultConsumer
* 重写handleDelivery()方法
*/
public class CustomConsumer extends DefaultConsumer {
public CustomConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("consumerTag:" + consumerTag);
System.out.println("envelope:" + envelope.toString());
System.out.println("body:" + msg);
}
}
消费端
package com.ymy.rabbitmq.custom;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 消费者 测试自定义的消费者
*/
public class Consumer {
public static final String HOST = "192.168.110.133";
public static final String EXCHANGE_NAME = "test_custom_consumer_exchange";
public static final String EXCHANGE_TYPE = "direct";
public static final String ROUTING_KEY = "test.custom";
public static final String QUEUE_NAME = "test_custom_consumer_queue";
public static void main(String[] args) throws Exception {
// 1、创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST); // ConnectionFactory 默认端口是5672 virtualHost是"/"
// 2、通过创建ConnectionFactory创建Connection
Connection connection = connectionFactory.newConnection();
// 3、通过Connection创建Channel
Channel channel = connection.createChannel();
/**
* 4、声明Exchange
* exchangeDeclare(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map<String, Object> arguments)
*/
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);
/**
* 5、声明队列
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
/**
* 6、Exchange和Queue的绑定关系
* queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null);
/**
* 7、channel的设置
* basicConsume(String queue, boolean autoAck, Consumer callback)
*/
channel.basicConsume(QUEUE_NAME, true, new CustomConsumer(channel)); //这里的消费端用我们自定义的消费者
}
}
// 控制台打印结果:
consumerTag:amq.ctag-hDLldSV5KZuRFkdGJLoYng
envelope:Envelope(deliveryTag=1, redeliver=false, exchange=test_custom_consumer_exchange, routingKey=test.custom)
body:Hello Custom Consumer...
3.6.消息的限流
什么是消费端的限流?
- 假设一个场景,首先我们的Rabbitmq服务器上有上万条未处理的消息,我们随便打开一个消费者客户端,会出现以下情况:
- 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据!
- RabbitMQ提供一种qos(服务质量保证)功能,即在非自动确认消息(一定不能设置AutoACK)的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息。
void basicQos(int prefetchSize, int prefetchCount, boolean global)
/**
* prefetchSize:0 不做限制。
* prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ACK,则该consumer将block, * 直到有消息ACK。
* global:true应用与channel,false应用于单个consumer。一个channel可以有多个consumer监听。
*/
void basicQos(int prefetchSize, int prefetchCount, boolean global)
- 注意:在AutoACK的情况下,Qos是不会生效的,一定要设置手动签收。
自定义消费者手动ACK
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
/**
* 自定义的消费者
* 需要继承 com.rabbitmq.client.DefaultConsumer
* 重写handleDelivery()方法
*/
public class CustomConsumer extends DefaultConsumer {
private Channel channel;
public CustomConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("-----------------------------------------------");
System.out.println("consumerTag:" + consumerTag);
System.out.println("envelope:" + envelope.toString());
System.out.println("body:" + msg);
System.out.println("-----------------------------------------------");
// 设置2S的延迟
try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }
/**
* 手动ACK
* basicAck(long deliveryTag, boolean multiple)
* multiple:false。因为我们是逐条消息ACK的所以设置为false,不批量签收。
*/
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
消费端
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.ymy.rabbitmq.custom.CustomConsumer;
/**
* 消费端 测试消费端消息的限流
* 注意:消息限流的前提是设置手动ACK!
*/
public class QosConsumer {
public static final String HOST = "192.168.110.133";
public static final String EXCHANGE_NAME = "test_qos_exchange";
public static final String EXCHANGE_TYPE = "direct";
public static final String ROUTING_KEY = "test.qos";
public static final String QUEUE_NAME = "test_qos_queue";
public static void main(String[] args) throws Exception {
// 1、创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST); // ConnectionFactory 默认端口是5672 virtualHost是"/"
// 2、通过创建ConnectionFactory创建Connection
Connection connection = connectionFactory.newConnection();
// 3、通过Connection创建Channel
Channel channel = connection.createChannel();
/**
* 4、声明Exchange
* exchangeDeclare(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map<String, Object> arguments)
*/
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);
/**
* 5、声明队列
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
/**
* 6、Exchange和Queue的绑定关系
* queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null);
/**
* 7、消息限流
* ACK一定要设置为false!
* prefetchCount设置为1,消费端处理完一条消息之后Broker再推送一条消息
*/
channel.basicQos(0, 1, false);
channel.basicConsume(QUEUE_NAME, false, new CustomConsumer(channel));
}
}
3.7.消费端的ACK与重回队列机制
3.7.1.消费端的手工ACK和NACK
- 消费端进行消费的时候,如果由于业务异常我们可以进行日志记录,然后进行补偿!
- 如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消费成功。
3.7.2.消费端的重回队列
- 消费端重回队列是为了对没有处理成功的消息,把消息重新会递给Broker!
- 一般在实际应用中,都会关闭重回队列,也就是设置为false。
自定义消费者ACK、NACK和重回队列
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
/**
* 自定义消费者 测试ACK和NACK
*/
public class CstConsumer extends DefaultConsumer {
private Channel channel;
public CstConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("---------------------------------------");
System.out.println("consumerTag:" + consumerTag);
System.out.println("envelope:" + envelope.toString());
System.out.println("body:" + msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if ((Integer) properties.getHeaders().get("flag") == 0) {
System.out.println("********NACK***************");
/**
* basicNack(long deliveryTag, boolean multiple, boolean requeue)
* multiple:是否批量NACK
* requeue:是否重回队列
*/
channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
System.out.println("********ACK***************");
// basicAck(long deliveryTag, boolean multiple)
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}
消费端
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class AckConsumer {
public static final String HOST = "192.168.110.133";
public static final String EXCHANGE_NAME = "test_ack_exchange";
public static final String EXCHANGE_TYPE = "direct";
public static final String ROUTING_KEY = "test.ack";
public static final String QUEUE_NAME = "test_ack_queue";
public static void main(String[] args) throws Exception {
// 1、创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST); // ConnectionFactory 默认端口是5672 virtualHost是"/"
// 2、通过创建ConnectionFactory创建Connection
Connection connection = connectionFactory.newConnection();
// 3、通过Connection创建Channel
Channel channel = connection.createChannel();
/**
* 4、声明Exchange
* exchangeDeclare(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map<String, Object> arguments)
*/
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);
/**
* 5、声明队列
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
/**
* 6、Exchange和Queue的绑定关系
* queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null);
/**
* 7、消息消费
* ACK一定要设置为false!
* basicConsume(String queue, boolean autoAck, Consumer callback)
*/
channel.basicConsume(QUEUE_NAME, false, new CstConsumer(channel));
}
}
生产端
import com.rabbitmq.client.*;
import java.util.HashMap;
import java.util.Map;
public class AckProducer {
public static final String HOST = "192.168.110.133";
public static final String EXCHANGE_NAME = "test_ack_exchange";
public static final String ROUTING_KEY = "test.ack";
public static void main(String[] args) throws Exception {
// 1、创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST); // ConnectionFactory 默认端口是5672 virtualHost是"/"
// 2、通过创建ConnectionFactory创建Connection
Connection connection = connectionFactory.newConnection();
// 3、通过Connection创建Channel
Channel channel = connection.createChannel();
for (int i = 0; i < 5; i++) {
/**
* 4、发送消息
* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
*/
String msg = "Hello Rabbitmq Ack Test..." + i;
Map<String, Object> headers = new HashMap<>();
headers.put("flag", i);
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.headers(headers)
.build();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, msg.getBytes());
}
}
}
3.8.TTL消息
TTL是什么?
- TTL:Time To Live,也就是生存时间。
- RabbitMQ支持消息的过期时间,在消息发送时可以指定。
- RabbitMQ支持队列的过期时间,从消息入队开始计算,只要超过了队列的超时时间配置,那么消息会自动清除。
3.9.死信队列
3.9.1.DLE(Dead-Letter-Exchange)
- 利用DLX,当消息在一个队列中变成Dead Message后,它会被重新publish到另一个Exchange,这个Exchange就是DLX。
- DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
- 当这个队列中有Dead Message时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
- 可以监听这个死信队列中消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能。
3.9.2.消息变成Dead Message的情况
- 消息被消费者拒绝(basicReject/basicNack)并且不能重回队列requeue=false。
- 消息TTL过期。
- 队列达到最大长度。
3.9.3.死信队列的代码实现
定义DLX
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 死信队列:DLX其实是正常的Exchange
* 注:DLX的Exchange Type必须是topic。Routing Key为"#"表示全部匹配。
*/
public class DLX {
public static final String HOST = "192.168.110.133";
public static final String EXCHANGE_NAME = "dlx_exchange";
public static final String EXCHANGE_TYPE = "topic";
public static final String ROUTING_KEY = "#";
public static final String QUEUE_NAME = "dlx_queue";
public static void main(String[] args) throws Exception {
// 1、创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST); // ConnectionFactory 默认端口是5672 virtualHost是"/"
// 2、通过创建ConnectionFactory创建Connection
Connection connection = connectionFactory.newConnection();
// 3、通过Connection创建Channel
Channel channel = connection.createChannel();
/**
* 4、声明Exchange
* exchangeDeclare(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map<String, Object> arguments)
*/
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);
/**
* 5、声明队列
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
/**
* 6、Exchange和Queue的绑定关系
* queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null);
}
}
消费端
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.ymy.rabbitmq.dlx.custom.NormalCustomConsume;
import java.util.HashMap;
import java.util.Map;
/**
* 消费者 接收正常Queue的消息
* 声明队列的时候加上arguments属性指定DLX
*/
public class NormalConsumer {
public static final String HOST = "192.168.110.133";
public static final String EXCHANGE_NAME = "normal_exchange";
public static final String DLX_EXCHANGE_NAME = "dlx_exchange";
public static final String EXCHANGE_TYPE = "direct";
public static final String ROUTING_KEY = "test.normal";
public static final String QUEUE_NAME = "normal_queue";
public static void main(String[] args) throws Exception {
// 1、创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST); // ConnectionFactory 默认端口是5672 virtualHost是"/"
// 2、通过创建ConnectionFactory创建Connection
Connection connection = connectionFactory.newConnection();
// 3、通过Connection创建Channel
Channel channel = connection.createChannel();
/**
* 4、声明Exchange
* exchangeDeclare(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map<String, Object> arguments)
*/
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);
/**
* 5、声明队列
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
*/
// 在属性中添加DLX
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
/**
* 6、Exchange和Queue的绑定关系
* queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null);
/**
* 7、消费者
* basicConsume(String queue, boolean autoAck, Consumer callback)
*/
channel.basicConsume(QUEUE_NAME, false, new NormalCustomConsume(channel));
}
}
自定义的消费者
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
/**
* 自定义的消费者
*/
public class NormalCustomConsume extends DefaultConsumer {
private Channel channel;
public NormalCustomConsume(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("--------------------------------------------------------------------");
System.out.println("********body*******:" + body);
if ((Integer) properties.getHeaders().get("flag") == 0) {
System.out.println("********NACK************");
// basicNack(long deliveryTag, boolean multiple, boolean requeue)
channel.basicNack(envelope.getDeliveryTag(), false, false);
} else {
System.out.println("********ACK************");
// basicAck(long deliveryTag, boolean multiple)
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}
生产端
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;
/**
* 生产者 向正常的Exchange投递消息
*/
public class NormalProducer {
public static final String HOST = "192.168.110.133";
public static final String EXCHANGE_NAME = "normal_exchange";
public static final String ROUTING_KEY = "test.normal";
public static void main(String[] args) throws Exception {
// 1、创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST); // ConnectionFactory 默认端口是5672 virtualHost是"/"
// 2、通过创建ConnectionFactory创建Connection
Connection connection = connectionFactory.newConnection();
// 3、通过Connection创建Channel
Channel channel = connection.createChannel();
for (int i = 0; i < 5; i++) {
String msg = "Hello Rabbitmq Message.." + i;
Map<String, Object> headers = new HashMap<>();
headers.put("flag", i);
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.headers(headers)
.expiration("10000") // 10S没有消费者处理该消息就过期
.build();
// 发送消息 basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, msg.getBytes());
}
}
}
4.RabbitMQ高级整合应用
4.1.RabbitMQ整合Spring AMQP
4.1.1.RabbitAdmin
- 注意:autoStartUp必须要设置为true,否则Spring容器不会加载RabbitAdmin类。
- RabbitAdmin底层实现就是从Spring容器中获取Exchange、Binding、RoutingKey以及Queue的@Bean声明。
- 然后使用RabbitTemplate的execute()方法执行对应的声明、修改、删除等一系列RabbitMQ基础功能操作。例如:添加一个Exchange、删除一个Binding、清空队列里的消息等等。
pom
<!--踩坑记:amqp-client低版本和spring-amqp整合会报错-->
<!--amqp-client version 5.4.3-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
<!--spring-amqp version 2.2.2-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置类
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConf {
private static final String ADDRESSES = "192.168.110.133:5672";
private static final int PORT = 5672;
private static final String USERNAME = "guest";
private static final String PASSWORD = "guest";
private static final String VIRTUAL_HOST = "/";
/**
* ConnectionFactory
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setAddresses(ADDRESSES);
cachingConnectionFactory.setPort(PORT);
cachingConnectionFactory.setUsername(USERNAME);
cachingConnectionFactory.setPassword(PASSWORD);
cachingConnectionFactory.setVirtualHost(VIRTUAL_HOST);
return cachingConnectionFactory;
}
/**
* RabbitAdmin
* 注意:该方法的参数要和上面ConnectionFactory的Bean名字一致
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true); // autoStartUp要设置为true
return rabbitAdmin;
}
}
RabbitAdmin基础API
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* RabbitMQ 测试类
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQTest {
@Autowired
private RabbitAdmin rabbitAdmin;
@Test
public void testAdmin() {
/**
* 1、RabbitAdmin声明Exchange
*/
// DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false, null));
// TopicExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false, null));
// FanoutExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false, null));
/**
* 2、RabbitAdmin声明Queue
*/
// Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)
rabbitAdmin.declareQueue(new Queue("test.direct.queue", false, false, false, null));
rabbitAdmin.declareQueue(new Queue("test.topic.queue", false, false, false, null));
rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false, false, false, null));
/**
* 3、RabbitAdmin绑定Exchange和Queue
*/
// Binding(String destination, DestinationType destinationType, String exchange, String routingKey, @Nullable Map<String, Object> arguments)
rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE, "test.direct", "admin.direct", null));
/**
* 4、RabbitAdmin声明Queue和Exchange并进行绑定
*/
rabbitAdmin.declareBinding(BindingBuilder
.bind(new Queue("test.topic.queue", false, false, false, null))
.to(new TopicExchange("test.topic", false, false, null))
.with("user.#")); // Topic Exchange这里有Routing KeyAPI
// Fanout Exchange 这里没有Routing Key API
rabbitAdmin.declareBinding(BindingBuilder
.bind(new Queue("test.fanout.queue", false, false, false, null))
.to(new FanoutExchange("test.fanout", false, false, null)));
/**
* 5、RabbitAdmin 清空指定队列的消息
*/
rabbitAdmin.purgeQueue("test.direct.queue", false);
}
}
Spring容器声明Exchange、Queue和Binding
/**
* 1、设置Exchange的类型
* DirectExchange
* TopicExchange
* FanoutExchange
* 2、将Queue绑定到Exchange
*
* 3、在配置类中声明Exchange、Queue、Binding就可以直接使用了!!!
*/
@Bean
public TopicExchange topicExchange01() {
// TopicExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
return new TopicExchange("topic01", true, false, null);
}
@Bean
public Queue queue01() {
// Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)
return new Queue("queue01", true, false, false);
}
@Bean
public Binding binding01() {
return BindingBuilder.bind(queue01()).to(topicExchange01()).with("spring.*");
}
4.1.2.RabbitTemplate
- RabbitTemplate是RabbitMQ在与SpringAMQP整合的时候进行发送消息的关键类。
- RabbitTemplate提供了可靠性消息投递方法、回调监听消息接口ConfirmCallback、返回值确认接口ReturnCallback等等。同样我们需要进行注入到spring容器中,然后直接使用。
- 在与Spring整合时需要实例化,但是在与SpringBoot整合时,在配置文件里添加配置即可。
配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConf {
private static final String ADDRESSES = "192.168.110.133:5672";
private static final int PORT = 5672;
private static final String USERNAME = "guest";
private static final String PASSWORD = "guest";
private static final String VIRTUAL_HOST = "/";
/**
* ConnectionFactory
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setAddresses(ADDRESSES);
cachingConnectionFactory.setPort(PORT);
cachingConnectionFactory.setUsername(USERNAME);
cachingConnectionFactory.setPassword(PASSWORD);
cachingConnectionFactory.setVirtualHost(VIRTUAL_HOST);
return cachingConnectionFactory;
}
/**
* RabbitAdmin
* 注意:该方法的参数要和上面ConnectionFactory的Bean名字一致
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true); // autoStartUp要设置为true
return rabbitAdmin;
}
/**
* RabbitTemplate
* 注意:该方法的参数要和上面ConnectionFactory的Bean名字一致
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
/**
* 1、设置Exchange的类型
* DirectExchange
* TopicExchange
* FanoutExchange
* 2、将Queue绑定到Exchange
*/
@Bean
public TopicExchange topicExchange01() {
// TopicExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
return new TopicExchange("topic01", true, false, null);
}
@Bean
public Queue queue01() {
// Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)
return new Queue("queue01", true, false, false);
}
@Bean
public Binding binding01() {
return BindingBuilder.bind(queue01()).to(topicExchange01()).with("spring.*");
}
}
RabbitTemplate发送消息
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitTemplateTest {
@Autowired
private RabbitTemplate rabbitTemplate;
private static final String EXCHANGE_NAME = "topic01";
private static final String ROUTING_KEY = "spring.template";
@Test
public void testSendMessage() {
// 1、spring封装的消息属性
MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put("describe", "信息描述");
messageProperties.getHeaders().put("type", "自定义信息类型");
// 2、创建消息
Message message = new Message("Hello RabbitMQ...".getBytes(), messageProperties);
/**
* 3、发送消息
* convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor)
* message:Object类型不是只能传Message,也可以传其他对象。
* messagePostProcessor:在消息发送到Broker之前対Message进行修改。
*/
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
System.out.println("************消息发送到Broker之前再对消息进行额外的设置**************");
message.getMessageProperties().getHeaders().put("describe", "额外的信息修改描述");
message.getMessageProperties().getHeaders().put("attribute", "额外新加的attribute");
return message;
}
});
}
}
4.1.3.SimpleMessageListenerContainer
- SimpleMessageListenerContainer这个类非常强大,可以监听队列(多个队列)、自动启动、自动声明功能。
- 设置事务特性、事务管理器、事务属性、事务容量、是否开启事务、回滚消息等。
- 设置消费者数量、最小最大数量、批量消费。
- 设置消息确认和消息签收模式、是否重回队列、异常捕获Handler函数。
- 设置消费者标签生成策略、是否独占模式、消费者属性等。
- 设置具体的监听器、消息转换器等等。
- 注意:SimpleMessageListenerContainer可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量的大小、接收消息的模式等。
- 很多基于RabbitMQ的自制定化后端管控台在进行动态设置的时候,也是根据这个动态设置特性去实现的。所以可以看出SpringAMQP非常的强大。
配置
/**
* SimpleMessageListenerContainer
* 配置好了之后启动SpringBoot主函数即可消费Message
*/
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
// setQueues(Queue... queues) 设置队列的监听!
container.setQueues(queue01());
// 设置消费者数量
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(5);
// 设置是否重回队列
container.setDefaultRequeueRejected(false);
// 设置消息的签收模式
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
// 生成消费端的标签策略
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString().substring(0, 6);
}
});
// 消息的监听
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
System.out.println("*******消费者*******:" + msg);
}
});
return container;
}
4.1.4.MessageListenerAdapter
- MessageListenerAdapter:消息监听适配器。
自定义的消息委托者
/**
* 这个类是我们自定义的,但是方法名都是固定的。
*/
public class MessageDelegate {
public void handleMessage(byte[] messageBody) {
System.out.println("默认方法,消息内容:" + new String(messageBody));
}
}
配置
/**
* SimpleMessageListenerContainer
* 配置好了之后启动SpringBoot主函数即可消费Message
*/
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
// setQueues(Queue... queues) 设置队列的监听!
container.setQueues(queue01());
// 设置消费者数量
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(5);
// 设置是否重回队列
container.setDefaultRequeueRejected(false);
// 设置消息的签收模式
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
// 生成消费端的标签策略
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString().substring(0, 6);
}
});
// 通过MessageListenerAdapter进行消息的监听
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
container.setMessageListener(adapter);
return container;
}
4.1.5.MessageConverter
- 在进行发送消息的时候,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到MessageConverter。
- 自定义常用转换器:MessageConverter,一般来讲都需要实现这个接口。
- 重写下面两个方法:
- toMessage():java对象转换为Message。
- fromMessage():Message对象转换为java对象。
- Json转换器:Jackson2JsonMessageConverter:可以将java对象转为json。
- DefaultJackson2JavaTypeMapper映射器:可以进行Java对象的映射。
- 自定义二进制转换器:比如图片类型、PDF、PPT、流媒体。
4.2.RabbitMQ整合Spring Boot
4.2.1.生产端
pom
<!--amqp-client version 5.4.3-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
<!--spring-amqp version 2.2.2-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml
server:
port: 8801
spring:
application:
name: spring-boot-rabbitmq-producer
rabbitmq:
host: 192.168.110.133
port: 5672
username: guest
password: guest
virtual-host: /
publisher-confirm-type: simple # 开启消息确认
publisher-returns: true # 开启消息返回
template:
mandatory: true # 设置true 监听器会收到路由不可达消息
配置类
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
/**
* 消息序列化
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
发送消息并设置Confirm和Return
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.UUID;
@Component
public class RabbitSender {
private static final String EXCHANGE_NAME = "exchange-1";
private static final String ROUTING_KEY = "springboot.hello";
@Autowired
private RabbitTemplate rabbitTemplate;
final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("*************消息发送成功**************");
System.out.println("correlationData:" + correlationData);
System.out.println("ack:" + ack);
System.out.println("cause:" + cause);
}
};
final ReturnCallback returnCallback = new ReturnCallback() {
@Override
public void returnedMessage(org.springframework.amqp.core.Message message,
int replyCode, String replyText,
String exchange, String routingKey) {
System.out.println("**************Return********************");
System.out.println("replyCode:" + replyCode);
System.out.println("replyText:" + replyText);
System.out.println("exchange:" + exchange);
System.out.println("routingKey:" + routingKey);
}
};
public void sendObject(Object message) {
// 设置消息确认
rabbitTemplate.setConfirmCallback(confirmCallback);
// 设置消息返回
rabbitTemplate.setReturnCallback(returnCallback);
// id + 时间戳 全局唯一
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString().substring(0, 6) + "_" + LocalDateTime.now());
// convertAndSend(String exchange, String routingKey, Object message, CorrelationData correlationData)
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message, correlationData);
}
测试
@RunWith(SpringRunner.class)
@SpringBootTest
public class SendAndCallbackTest {
@Resource
private RabbitSender rabbitSender;
@Test
public void sendObject() {
rabbitSender.sendObject(new Order("1", "订单"));
}
}
4.2.2.消费端
消费端配置详解
- 首先配置手工确认模式,用于ACK的手工处理,这样我们可以保证消息的可靠性送达,或者再消费端消费失败的时候可以做到重回队列、根据业务记录日志等处理。
- 可以设置消费端的监听个数和最大个数,用于控制消费端的并发情况。
- 消费端监听@RabbitMQListener注解,这个在实际工作中非常好用。
- @RabbitMQListener是一个组合注解,里面可以注解配置@QueueBinding、@Queue、@Exchange直接通过这个组合注解一次性搞定消费端Exchange、Queue、Binding、Routing Key,并且配置监听功能等。
pom
<!--web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--amqp-client version 5.4.3-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
<!--spring-amqp version 2.2.2-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml
server:
port: 8802
spring:
application:
name: spring-boot-rabbitmq-consumer
rabbitmq:
host: 39.97.3.60
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
acknowledge-mode: manual # 手工签收
concurrency: 5 # 最小的监听者数量
max-concurrency: 10 # 最大的监听者数量
配置类
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConf {
/**
* 消息序列化
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
消息的监听
import com.rabbitmq.client.Channel;
import com.ymy.spring.boot.rabbitmq.entity.Order;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* 消费端监听队列 如果Message是java对象一定要写无参构造器!!
*/
@Component
public class RabbitReceiverObject {
private static final String EXCHANGE_NAME = "exchange-object";
private static final String ROUTING_KEY = "springboot.*";
private static final String QUEUE_NAME = "queue-object";
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = QUEUE_NAME,
durable = "true"),
exchange = @Exchange(value = EXCHANGE_NAME,
type = ExchangeTypes.TOPIC,
durable = Exchange.FALSE),
key = ROUTING_KEY
))
@RabbitHandler
public void onOrderMessage(@Payload Order order,
@Headers Map<String, Object> headers, Channel channel) throws Exception {
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
System.out.println("order id---> " + order.getId());
System.out.println("order name---> " + order.getName());
System.out.println(" deliveryTag ----> " + deliveryTag);
channel.basicAck(deliveryTag, false);
}
}
4.3.RabbitMQ整合Spring Cloud Stream
4.3.1.Stream的基本介绍
- @Output:输出注解,用于定义消息生产者接口。
- @Input:输入注解,用于定义消息的消费者接口。
- @StreamListener:用于定义监听方法的注解。
- Spring Cloud Stream框架在实现高性能消息的生产和消费的场景非常合适,但是有一个非常大的问题就是不能实现可靠性的投递,也就是没办法保证消息的100%可靠性,会存在少量消息丢失的问题。
4.3.2.生产端
pom
<dependencies>
<!--web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--监控-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--stream rabbit -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--test-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
application.yml
server:
port: 8803
spring:
application:
name: spring-cloud-stream-rabbitmq-producer
cloud:
stream:
binders:
rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 39.97.3.60
port: 5672
username: guest
password: guest
virtual-host: /
bindings:
output:
destination: exchange_stream
group: queue_stream
binder: rabbit # 这里和binders.rabbit对应
发送消息
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@EnableBinding(Source.class) // 一定要启动绑定
@Component
public class RabbitSender {
@Autowired
private MessageChannel output;
/**
* 发送消息
*/
public void send(Object message) {
output.send(MessageBuilder.withPayload(message).build());
}
}
测试
import com.ymy.spring.cloud.stream.sender.RabbitSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SendTest {
@Resource
private RabbitSender rabbitSender;
@Test
public void send() {
rabbitSender.send("hello rabbitmq!");
}
}
4.3.3.消费端
pom
<dependencies>
<!--web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--监控-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--stream rabbit -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--test-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
application.yml
server:
port: 8804
spring:
application:
name: spring-cloud-stream-rabbitmq-consumer
cloud:
stream:
binders:
rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 39.97.3.60
port: 5672
username: guest
password: guest
virtual-host: /
bindings:
input:
destination: exchange_stream
group: queue_stream
binder: rabbit # 这里和binders.rabbit对应
consumer:
concurrency: 1
rabbit:
bindings:
input:
consumer:
acknowledge-mode: MANUAL
max-concurrency: 5
消费端监听
package com.ymy.spring.cloud.stream.receive;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@EnableBinding(Sink.class)
@Component
public class RabbitReceiver {
@StreamListener(Sink.INPUT)
public void receiver(Message message) throws Exception{
Channel channel = (Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
System.out.println(message.getPayload().getClass().getSimpleName());
channel.basicAck(deliveryTag, false);
}
}