3.5 消息系统
3.5.1 消息系统的作用
在架构设计方法论中,有一种架构模式被称为事件驱动架构模式。这种架构模式是一种异步分发事件的模式,常用于设计高度可拓展的应用。事件驱动的架构模式有两种实现方式:一种是中介模式,另一种是代理模式。
中介模式适合于复杂的业务流程,需要一个居中协调的业务场景。代理模式适合业务需求简单,但对处理速度和扩展性要求很高的业务场景。无论采用哪一种模式,在源系统(即触发事件的系统)和事件处理器(即真正拥有业务逻辑的组件)之间都需要一个连接点,这个连接点就是消息系统。图3-62和图3-63分别展示了中介模式和代理模式的流程。
图3-62 中介模式的流程
图3-63 代理模式的流程
消息系统一般用于以下两种情况:
• 组件解耦。
• 流程异步化。
3.5.2 消息系统的两种模式
消息系统的主流实现有两种模式,分别是MQ(Message Queue,消息队列)模式和Publish/Subscribe(发布/订阅)模式,下面我们分别对这两种模式进行介绍:
1. MQ模式
如图3-64所示,在MQ模式下,消息的生产者和订阅者通过消息队列进行解耦,多个生产者可以向同一个Queue发生消息。只要有任何一个消费者读取(在读取时会加锁)并消费了该消息,那么这条消息将会从队列中被移除,其他的消费者将无法再次消费该消息。因此,对于任何一条消息,在MQ模式下有且仅有一个消费者可以消费该消息(消息只能被消费一次)。
图3-64 MQ模式
2. Publish/Subscribe模式
如图3-65所示,在Publish/Subscribe模式中,任意消息都可以被多个订阅者同时接收和处理,这种方式允许消息生产者同时通知多个订阅者,以达到消息群发的目的。
图3-65 Publish/Subscribe模式
大多数开源中间件都同时实现了这两种消息处理模式,目前主流的中间件是MQ、RabbitMQ和RocketMQ,本书将以RabbitMQ为例来编写课程代码。(注:Kafka是一种与MQ截然不同的消息系统)
RabbitMQ是一个通用的消息代理,目前RabbitMQ支持的协议有MQTT、AMQP、和STOMP。在RabbitMQ的顶层架构图中主要有四种组件,RabbitMQ架构如图3-66所示。
如图3-66所示,图中四个组件的具体功能如下:
(1)Producer:Producer是消息的生产者,它推送消息到交换机,同时也负责产生消息交换路由(Routing)的主键(Routing Key)。
图3-66 RabbitMQ架构图
(2)Exchange(交换机)和Queue(队列):
• Exchanges将接收的消息路由到其他交换机或队列中。
• RabbitMQ发送ACK(即Acknowledge回执)到Producer,告知其消息已被处理。
(3)Consumer(消息消费者):
Consumer维持一个与RabbitMQ服务器的长连接,并告知RabbitMQ它在消费哪些消息队列的消息。一旦有消息到达指定queue,该消息就会被推送(push)给Consumer。Consumer发送ACK给RabbitMQ,告诉RabbitMQ这条消息是消费成功还是消费失败。一旦消费成功,该消息将会从Queue中被移除。
RabbitMQ可以配置多种消息消费模式。
• Producer和Consumer是一对一的关系,即一个Producer只向一个Queue发送消息,并且只有一个Consumer从这个Queue中消费消息,Producer和Consumer的一对一消费关系如图3-67所示。
图3-67 Producer和Consumer的一对一消费关系
• 多个Producer对应多个Queue,即任意Producer发来的消息将被发布到多个Queue中,每个Queue对应一个Consumer,如图3-68所示。
图3-68 每个Consumer消费一条消息
• 多个Producer对应多个Consumer,但这些Consumer被配置到一个消费组中,同一个消费组内的每条消息仅被组内的一个Consumer消费,如图3-69所示。
图3-69 每条消息仅被一个Consumer消费
从架构角度来看,Kafka与MQ完全不同,Kafka最初被设计为分布式日志提交系统,在Kafka体系中也没有Queue这种队列结构的实现。Kafka的特点如下:
• 分布式(Distributed):Kafka系统都是以分布式集群部署的方式实现容错和扩展的。
• 多副本(Replicated):任何一条日志(或消息)在Kafka系统里面都会被复制多份并保存在不同的集群节点。
• 日志(Log):在Kafka系统中所有的消息都以日志的形式存在,这些日志被归类为不同的主题(Topic)。
在Kafka中最重要的几个概念如下:
• Topic:在Kafka中,所有的消息(log)都是按照Topic进行归类的,每个Topic的消息都会被分布到多个Partition上,消息在Partition上按照写入的先后顺序排列,Kafka消息写入的过程如图3-70所示。
图3-70 Kafka消息写入的过程
• Broker:Kafka集群中的一台或多台服务器统称为Broker,Topic的消息写入Partition中,而Partition存在于Broker上,每个Broker可以包含多个不同Topic的Partition。
• Partition:Partition是Topic物理上的分组,每个Topic内的消息都被分成多个Partition存储,每个Partition又有多个副本。在这些Partition中,有的是Leader角色(Leader可以被认为是主节点),即负责消息的写入;有的是Replica角色(即Leader的备份),Replica可以在Leader不可用的时候转化为Leader角色。 Broker和Leader的关系如图3-71所示。
图3-71 Broker和Leader的关系
• Offset:任何一条消息被写入Partition时,都会被分配一个ID,这个ID就是Offset,Offset是一条消息在Partition上被存放的位置。
• Producer:消息的发送者,它只和Leader角色的Partition进行通信,当消息通过Leader被写入集群之后,Leader会将消息复制给其他节点。Producer将日志写入Leader的过程如图3-72所示。
图3-72 Producer将日志写入Leader的过程
• Consumer group:即消费组,在Kafka中消费者是以消费组的形式存在的,当消费组订阅某个Topic后,消费组中的Consumer会和Topic下的某个Partition建立消费关系。Consumer按照Offset的先后顺序从Partition上获取消息进行消费,并自行维护其当前读取的Offset值。Consumer Group消费消息如图3-73所示。
图3-73 Consumer Group消费消息
总结来说,MQ和Kafka的区别如下:
• MQ既支持MQ模式又支持publish/subscribe模式,Kafka不支持MQ模式,只支持基于Topic的publish/subscribe模式。
• MQ的客户端所收到的消息是从MQ推送(Push)到客户端的,而Kafka的客户端要自己从服务器上拉取(Pull)消息。
• Kafka相比传统的MQ组件有更优秀的吞吐量。
Kafka通常和ZooKeeper搭配使用,Kafka节点的数据也保存在ZooKeeper中。由于篇幅原因,本章不对ZooKeeper做深入介绍,感兴趣的读者可以自行搜索ZooKeeper相关知识进行学习。
3.5.3 集成RabbitMQ
3.5.3.1 安装RabbitMQ
手动安装RabbitMQ的流程比较复杂,因为RabbitMQ需要ErLang环境支持,建议读者使用Docker镜像来安装RabbitMQ,本节以Docker镜像为例讲解RabbitMQ的安装过程。
在成功安装Docker之后(请读者至Docker官网下载安装Docker,或者至本书第17章Docker容器技术了解Docker的安装过程),在命令行执行以下命令获取RabbitMQ的Docker镜像:
(1)docker pull rabbitmq。
(2)docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management。
我们配置了端口5672和15672,其中端口5672是MQ组件监听的TCP端口,端口15672是AdminUI界面使用的HTTP端口,运行成功之后,尝试打开Admin UI的地址(http://localhost:15672)进入登录界面,RabbitMQ登录界面如图3-74所示。
图3-74 RabbitMQ登录界面
输入默认的用户名和密码(guest/guest),进入Admin Portal页面,Admin Portal首页如图3-75所示。
图3-75 Admin Portal首页
3.5.3.2 Spring Boot与MQ集成
本节我们将在coupon-user-service项目中添加一个Producer()方法,当用户使用了优惠券后发送一条消息到MQ组件,同时在coupon-template-service中添加一个Consumer()方法,用来消费这条消息。
第一步,我们需要在coupon-user-service项目的pom.xml文件中添加MQ的依赖项,具体代码如下:
第二步,在application.yml文件中添加RabbitMQ相关的连接串和用户名、密码的配置,具体代码如下:
第三步,创建RabbitMqProducer类,我们通过这个类发送消息到RabbitMQ,具体代码如下:
第四步,在UserServiceImpl中注入RabbitMqProducer对象,并在placeOrder()方法中添加消息发送的逻辑,具体代码如下:
完成以上几步修改之后,coupon-user-service的改造就完成了。
第五步,我们用相同的方式在coupon-template-service项目中添加spring-boot-starter-amqp依赖项,并在application.yml文件中配置RabbitMQ,application.xml文件的具体代码如下:
注意,coupon-template-service中的RabbitMQ配置项的名称和coupon-user-service中的配置项名称是不同的。
第六步,我们创建RabbitConsumer作为服务消费者,具体代码如下:
注意,通过@RabbitListener注解内的参数名称可以看出,RabbitConsumer连接的是RabbitMQ中的Queue队列,而不是Exchange交换机。因此,我们需要在RabbitMQ控制台中做一些配置,将Queue和Exchange二者关联起来。
首先,登录RabbitMQ控制台,单击Exchange面板下的Add a new exchange按钮,添加Exchange的入口,如图3-76所示。
图3-76 添加Exchange的入口
下一个页面是添加Exchange的页面,我们输入Name参数为broadview.direct,并选择Type参数为direct,单击Add exchange按钮完成添加操作。添加Exchange的页面如图3-77所示。
图3-77 添加Exchange的页面
添加成功之后,我们就可以看到如图3-78所示的Exchange详情页面。
图3-78 Exchange详情页面
Exchange添加好之后,我们需要添加对应的Queue。在Queues面板下单击Add a new queue按钮,添加Queue的入口页面如图3-79所示。
图3-79 添加Queue的入口页面
此时我们跳转到如图3-80所示的添加Queue页面,在页面中输入Name参数为broadview.queue,其他均保持默认值即可,单击Add queue按钮提交。
图3-80 添加Queue页面
在Queue添加完成之后,我们回到如图3-78所示的Exchange页面,在页面中的Add binding from this exchange部分设置To queue为broadview.queue,设置Routing key为broadview.routingKey,并单击Bind按钮。绑定Queue到Exchange的页面如图3-81所示。
图3-81 绑定Queue到Exchange
配置妥当之后,我们将看到如图3-82所示的绑定成功信息。
图3-82 绑定成功信息
为了验证本节配置的正确性,读者可以启动项目,并调用coupon-user-service服务发送消息,同时观察消息是否顺利发送到RabbitMQ并被消费者正确消费。
3.5.4 集成Kafka
3.5.4.1 安装Kafka
Kafka依赖于ZooKeeper,本章以macOS为例来演示安装ZooKeeper和Kafka的过程。在开始安装前,请确保JDK已经在本机安装成功。
下载安装ZooKeeper的过程如下(本节演示的是单机模式):
(1)从ZooKeeper官方网站下载ZooKeeper安装包,笔者下载的版本是3.6.2。
(2)下载成功后,在下载目录运行以下命令(根据下载版本不同会略有差异):
• tar -zxf apache-zookeeper-3.6.2-bin.tar.gz。
• cd apache-zookeeper-3.6.2-bin。
• mkdir data。
(3)修改ZooKeeper的配置文件。
• 进入ZooKeeper解压目录下的conf目录。
• 利用样例配置文件创建新的配置文件——cp zoo_sample.cfg zoo.cfg。
• 修改zoo.cfg文件,具体代码如下:
保存并退出编辑器。
(4)启动ZooKeeper。
• 进入ZooKeeper解压目录下的bin目录。
• 运行./zkServer.sh start,ZooKeeper正常启动的日志界面如图3-83所示。
图3-83 ZooKeeper正常启动的日志界面
下载安装Kafka的步骤如下(本节演示的是单机模式):
(1)从Kafka的官网下载Kafka安装包,笔者下载的版本是kafka_2.12-2.6.0.tgz。
(2)下载成功后,解压下载的tgz文件。
(3)进入Kafka解压缩后的目录,查看config目录下的server.properties,主要确认zookeeper.connect参数是否与将要连接的ZooKeeper一致,默认情况下是localhost:2181,与安装ZooKeeper时我们配置的ZooKeeper端口号相同。
(4)进入bin目录运行sh ./kafka-server-start.sh ../config/server.propertie。
如果以上配置全部正确,我们会看到如图3-84所示的Kafka启动界面。
图3-84 Kafka启动界面
至此,Kafka在本机安装成功。需要注意的是,本节中我们采用了单机运行模式,这种模式只适合本地演示,在实际工作中,我们通常需要以集群模式运行Kafka和ZooKeeper。
3.5.4.2 Spring Boot与Kafka集成
以coupon-template-service和coupon-user-service项目为例,本节我们需要实现的业务需求是:当coupon-user-service服务接收用户申请优惠券的请求时,通知coupon-template-service服务做统计。因此,coupon-user-service是消息的生产者,coupon-template-service是消息的消费者。
首先,我们在coupon-user-service项目中添加Kafka依赖,Kafka依赖项的版本必须与Spring Boot的版本兼容,Spring Boot和Kafka版本兼容关系如图3-85所示。
图3-85 Spring Boot和Kafka版本兼容关系
根据图3-85所示,本章所使用的Spring Boot版本为2.2.10,Spring Kafka的最高版本为2.4.9,引入的Kafka依赖项的具体代码如下:
然后,我们修改application.yml文件,添加与Kafka producer相关的配置,代码如下:
接着,创建KafkaSender类,用来发送Kafka消息,具体代码如下:
最后,修改UserServiceImpl类,将KafkaSender类的实例注入进来,具体实现代码如下:
经过以上步骤,我们就完成了消息生产者的改造。
接下来,我们在coupon-template-service中实现Kafka消息的消费。
首先,我们在coupon-template-service中添加Spring Kafka依赖,并将Kafka Consumer的配置代码加入application.yml文件中,具体代码如下:
然后,我们创建KafkaConsumer类,通过@KafkaListener注解添加一个监听器。在监听器中,我们通过topics属性指定了当前消费者所监听的Topic的名称,具体代码如下:
通常,Kafka会在运行期自动创建Topic,如果我们没有开启Topic的自动创建功能,那么可以通过命令行来创建一个Kafka Topic,具体命令如下:
最后,启动coupon-user-service和coupon-template-service,并调用相关服务,验证Kafka消息是否被正常消费。