精通Spring Boot 3 : 10. 使用 Spring Boot 进行消息通信 (2)

基于 Spring Boot 的 JMS

本节介绍了如何将 Java 消息服务(JMS)与 Spring Boot 应用程序集成,使用我们的两个主要项目。JMS 自 1980 年代末期以来就已经存在,并且仍然被一些大型公司,尤其是金融机构广泛使用。JMS 最初由 Sun Microsystems 开发,随后与 Oracle 一起进入 Java 社区过程(JCP),以开发 2.0 版本。JMS 持续演进,目前已成为 Jakarta EE 生态系统的一部分,称为 Jakarta Messaging。JMS 3.0 仍在开发中。

重要的是要了解,JMS 同时支持点对点和发布-订阅两种消息模型。在点对点模型中,队列是通信的核心,而在发布-订阅模型中,主题则是核心。请参见图 10-1。

Spring 框架支持 JMS,提供了多个实用类来实现消息的发布和消费。JmsTemplate 类封装了连接、会话、多线程、重连等所有繁琐的代码。JdbcTemplate 类在第 4 章和第 5 章中几乎是相同的,但当然是用于消息传递。此外,Spring 还提供了多个注解,帮助您轻松构建消费者,例如@JmsListener 注解。如果您希望手动处理消息,可以使用
SimpleMessageListenerContainer 注解,它为您提供了更多对消息解决方案的控制。

我们在用哪个券商?

目前,IT 市场上有许多 JMS 代理实现可供选择,如 Amazon SQS、Apache ActiveMQ Classic 和 Artemis、IBM MQ 以及 JBoss Messaging Open Message Queue 等。每种实现都有一些额外功能,例如支持除 JMS 以外的多种协议。

为了演示,我们将使用 Apache ActiveMQ Artemis 代理。该代理不仅包含最新的 JMS 实现,还支持 AMQP、STOMP、MQTT 和 OpenWire 等新协议及其他功能。如果您想了解更多关于 ActiveMQ Artemis 的信息,请访问
https://activemq.apache.org/components/artemis/。我们将使用的代码对于您选择的任何其他代理都是相同的。所需的重要代理特定信息包括代理的 URL 和凭据。

带有 JMS 的用户应用程序

让我们从用户应用程序开始,看看将 JMS 添加到这个项目有多简单。您可以从 Apress 网站或 GitHub 仓库获取代码:
https://github.com/felipeg48/pro-spring-boot-3rd。代码位于 10-messaging-jms/users 文件夹中。

如果您从头开始使用 Spring Initializr (https://start.spring.io),请点击“添加依赖项”,在搜索框中输入 JMS,选择“Spring for Apache ActiveMQ Artemis”依赖项。同时添加 Web、验证、JPA、H2、PostgreSQL、Lombok 和 Docker Compose 支持依赖项。将组 ID 字段设置为 com.apress,将工件和名称字段都设置为 users。然后,点击“生成”,下载项目,解压缩并导入到您喜欢的 IDE 中。

打开 build.gradle 文件,参见列表 10-5。

plugins {
    id 'java'
    id 'org.springframework.boot' version '3.2.3'
    id 'io.spring.dependency-management' version '1.1.4'
}
group = 'com.apress'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '17'
repositories {
    mavenCentral()
}
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-validation'
    implementation 'org.springframework.boot:spring-boot-starter-artemis'
    implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
    developmentOnly 'org.springframework.boot:spring-boot-docker-compose'
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    runtimeOnly 'com.h2database:h2'
    runtimeOnly 'org.postgresql:postgresql'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    // Web
    implementation 'org.webjars:bootstrap:5.2.3'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}
tasks.named('test') {
    useJUnitPlatform()
}

10-5 build.gradle

由于我们在 build.gradle 文件中添加了
spring-boot-starter-artemis 依赖,Spring Boot 会自动配置 ConnectionFactory(使用默认的 broker-url、模式和其他属性)、JmsTemplate、MessageConverter(默认适用于实现 Serializable 的类、Map 或 String 类型)以及其他重要类,以便在使用时能够正常工作。jackson-datatype-jsr310 依赖将帮助处理包含 LocalDateTime 作为属性/字段的事件序列化。此外,我们还使用了
spring-boot-docker-compose 依赖,这样我们就可以运行应用程序,它会查找 docker-compose.yaml 文件并启动所需的服务。

我们将使用上一节中的事件,并需要添加一些功能来支持 JSON 序列化。接下来,打开或创建 UserRemovedEvent 类。请参见列表 10-6。

package com.apress.users.events;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
@AllArgsConstructor
@NoArgsConstructor
@Data
public class UserRemovedEvent {
    private String email;
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
           @JsonSerialize(using = LocalDateTimeSerializer.class)
    private LocalDateTime removed;
}

10-6 src/main/java/apress/com/users/events/UserRemovedEvent.java

我们在 UserRemovedEvent 类中添加了 @JsonFormat 和 @JsonSerialize 注解。这些注解有助于以 yyy-MM-dd HH:mm:ss 格式序列化 LocalDateTime。如果你去掉这些注解,表达简单的内容就会变得冗长(在我看来这是不必要的信息)。

接下来,UserActivatedEvent 类的代码与之前在清单 10-1 中展示的完全相同,因此无需再次展示。

接下来,打开或创建 UserLogs 类。请参阅列表 10-7。

package com.apress.users.events;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import static com.apress.users.jms.UserJmsConfig.DESTINATION_ACTIVATED;
import static com.apress.users.jms.UserJmsConfig.DESTINATION_REMOVED;
@AllArgsConstructor
@Slf4j
@Component
public class UserLogs {
    private JmsTemplate jmsTemplate;
    @Async
    @EventListener
    void userActiveStatusEventHandler(UserActivatedEvent event){
        this.jmsTemplate.convertAndSend(DESTINATION_ACTIVATED,event);
        log.info("User {} active status: {}",event.getEmail(),event.isActive());
    }
    @Async
    @EventListener
    void userDeletedEventHandler(UserRemovedEvent event){
        this.jmsTemplate.convertAndSend(DESTINATION_REMOVED,event);
        log.info("User {} DELETED at {}",event.getEmail(),event.getRemoved());
    }
}

10-7 src/main/java/apress/com/users/events/UserLogs.java

列表 10-7 显示,现在我们不仅在监听事件,还通过使用 JmsTemplate 和调用 convertAndSend 方法将事件作为消息发送。这个方法被重载以接受不同的参数,这里我们使用目标(这可以是队列的名称,因为这是一个点对点模型;在发布-订阅模型中,它将是主题的名称),这个目标将是接收消息的队列名称(DESTINATION_ACTIVATED=activated-users 和 DESTINATION_REMOVED=removed-users),以及实际的消息(事件,可以是用户激活事件或用户移除事件)作为第二个参数。 JmsTemplate 默认使用“火并忘”模式,除非明确要求立即响应(sendAndReceive)。它会通过消息转换器来转换消息,默认情况下会尝试执行 MessageConverter 接口的实现,并判断消息是否为 String、Map、Serializable 或 byte[]类型。如果无法识别类型,将抛出
MessageConversionException。

我们需要将这些事件以 JSON 格式发送。为此,我们可以让 JmsTemplate 使用 JSON 消息转换器。接下来,打开或创建 UserJmsConfig 类。请参见清单 10-8。

package com.apress.users.jms;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.support.converter.MappingJackson2Messa geConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
@Configuration
public class UserJmsConfig {
    public final static String DESTINATION_ACTIVATED = "activated-users";
           public final static String DESTINATION_REMOVED = "removed-users";
    @Bean
    public MessageConverter messageConverter(){
MappingJackson2MessageConverter converter = new
                                           MappingJackson2MessageConverter();
            converter.setTypeIdPropertyName("_type");
            converter.setTargetType(MessageType.TEXT);
            return converter;
    }
}

10-8 src/main/java/apress/com/users/jms/UserJmsConfig.java

UserJmsConfig 类使用 @Configuration 注解进行标记,这将在应用程序启动时被识别,并帮助 Spring 查找任何 @Bean 定义并进行相应配置。在这个类中,我们定义了两个常量,DESTINATION_ACTIVATED=activated-users 和 DESTINATION_REMOVED=removed-users,分别代表队列(或主题,具体取决于使用的模型)的名称。默认情况下,当 JmsTemplate 执行 convertAndSend 方法时,Spring Boot 会自动创建这些队列,因此无需手动或以编程方式创建它们。当然,如果需要,您可以覆盖默认设置,但在这种情况下,我们选择了更简单的选项。

我们正在定义 MessageConverter,并重写它,因为默认情况下,Spring Boot 会配置这个 bean。我们创建了一个
MappingJackson2MessageConverter,并将目标类型设置为 TEXT,这意味着我们的事件对象将以 JSON 格式的文本发送。同时,我们定义了一个 TypeId,以帮助转换器识别需要进行序列化和反序列化的类类型。TypeId 可以是您喜欢的任何文本,这只是转换器的一个提示(例如,它可以是 _class_、_id_ 或自定义),在这种情况下我们使用 _type。

接下来,打开或创建 docker-compose.yaml 文件。请参阅列表 10-9。

version: "3"
services:
  artemis:
    container_name: artemis
    hostname: artemis
    image: apache/activemq-artemis:latest-alpine
    platform: linux/amd64
    restart: always
    environment:
      EXTRA_ARGS: "--nio --relax-jolokia --http-host 0.0.0.0"
    ports:
      - "61616:61616"
      - "8161:8161"

10-9 docker-compose.yaml

正如您所见,docker-compose.yaml 文件非常简单。当应用程序启动并检测到
spring-boot-docker-compose 依赖时,它会查找该文件并启动相关服务。

默认情况下,Apache ActiveMQ Artemis 的凭据为 artemis/artemis,因此需要在 application.properties 文件中提供 spring.artermis.* 属性。请参见清单 10-10。

spring.h2.console.enabled=true
spring.datasource.generate-unique-name=false
spring.datasource.name=test-db
# JMS Remote
spring.artemis.user=artemis
spring.artemis.password=artemis
#spring.artemis.broker-url=tcp://localhost:61616
#spring.artemis.mode=native

10-10 src/main/resources/application.properties

现在我们准备将事件以 JSON 消息的形式发布到 JMS 代理。