springboot2.6.2系列教程之消息传递-13

springboot2.6.2系列教程之消息传递-13,第1张

springboot2.6.2系列教程之消息传递-13

目录

消息传递

JMS

ActiveMQ 支持发送消息接收消息 AMQP

RabbitMQ 支持发送消息接收消息 Apache Kafka 支持

发送消息接收消息Kafka 流使用嵌入式 Kafka 进行测试 Spring 集成

消息传递

spring framework 为与消息传递系统的集成提供了广泛的支持,从使用 JMS API 的简化使用JmsTemplate到异步接收消息的完整基础架构。Spring AMQP 为高级消息队列协议提供了类似的功能集。RabbitTemplateSpring Boot 还为RabbitMQ提供了自动配置选项。Spring WebSocket 原生包括对 STOMP 消息传递的支持,Spring Boot 通过启动器和少量的自动配置来支持它。Spring Boot 还支持 Apache Kafka。

JMS

该javax.jms.ConnectionFactory接口提供了一种创建javax.jms.Connection用于与 JMS 代理交互的标准方法。尽管 Spring 需要ConnectionFactory与 JMS 一起使用,但您通常不需要自己直接使用它,而是可以依赖更高级别的消息传递抽象。Spring Boot 还自动配置必要的基础设施来发送和接收消息。

ActiveMQ 支持

当ActiveMQ在类路径上可用时,Spring Boot 还可以配置一个ConnectionFactory. 如果存在代理,则会自动启动和配置嵌入式代理(前提是未通过配置指定代理 URL,并且未在配置中禁用嵌入式代理)。

如果使用spring-boot-starter-activemq,则提供连接或嵌入 ActiveMQ 实例所需的依赖项,以及与 JMS 集成的 Spring 基础架构。

ActiveMQ 配置由spring.activemq.*.

默认情况下,ActiveMQ 自动配置为使用VM 传输,它启动嵌入在同一 JVM 实例中的代理。

您可以通过配置属性来禁用嵌入式代理,spring.activemq.in-memory如下例所示:

spring:
  activemq:
    in-memory: false

如果您配置代理 URL,嵌入式代理也将被禁用,如以下示例所示:

spring:
  activemq:
    broker-url: "tcp://192.168.1.210:9876"
    user: "admin"
    password: "secret"
发送消息

SpringJmsTemplate是自动配置的,您可以直接将其自动装配到您自己的 bean 中,如下例所示:

@Component
public class MyBean {

    private final JmsTemplate jmsTemplate;

    public MyBean(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    // ...

}
接收消息

当 JMS 基础设施存在时,可以对任何 bean 进行注释@JmsListener以创建侦听器端点。如果没有JmsListenerContainerFactory定义,则自动配置一个默认值。如果定义了 a DestinationResolver、 aMessageConverter或 a javax.jms.ExceptionListenerbean,它们将自动与默认工厂关联。

默认情况下,默认工厂是事务性的。如果您在存在 a 的基础架构中运行JtaTransactionManager,则默认情况下它与侦听器容器相关联。如果不是,sessionTransacted则启用该标志。在后一种情况下,您可以通过添加@Transactional侦听器方法(或其委托)将本地数据存储事务与传入消息的处理相关联。这可确保在本地事务完成后确认传入消息。这还包括发送已在同一 JMS 会话上执行的响应消息。

以下组件在someQueue目标上创建侦听器端点:

@Component
public class MyBean {

    @JmsListener(destination = "someQueue")
    public void processMessage(String content) {
        // ...
    }

}
AMQP

高级消息队列协议 (AMQP) 是面向消息中间件的平台中立、线路级协议。Spring AMQP 项目将核心 Spring 概念应用于基于 AMQP 的消息传递解决方案的开发。Spring Boot 为通过 RabbitMQ 使用 AMQP 提供了多种便利,包括spring-boot-starter-amqp“Starter”。

RabbitMQ 支持

RabbitMQ是基于 AMQP 协议的轻量级、可靠、可扩展、可移植的消息代理。Spring 使用RabbitMQ通过 AMQP 协议进行通信。

RabbitMQ 配置由spring.rabbitmq.*. 例如,您可以在 中声明以下部分application.properties:

spring:
  rabbitmq:
    host: "localhost"
    port: 5672
    username: "admin"
    password: "secret"

或者,您可以使用以下addresses属性配置相同的连接:

spring:
  rabbitmq:
    addresses: "amqp://admin:secret@localhost"
发送消息

Spring 的AmqpTemplate和AmqpAdmin是自动配置的,您可以将它们直接自动装配到您自己的 bean 中,如下例所示:

@Component
public class MyBean {

    private final AmqpAdmin amqpAdmin;

    private final AmqpTemplate amqpTemplate;

    public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
        this.amqpAdmin = amqpAdmin;
        this.amqpTemplate = amqpTemplate;
    }

    // ...

}

要重试 *** 作,您可以启用重试AmqpTemplate(例如,在代理连接丢失的情况下):

spring:
  rabbitmq:
    template:
      retry:
        enabled: true
        initial-interval: "2s"
接收消息

当 Rabbit 基础设施存在时,可以对任何 bean 进行注释@RabbitListener以创建侦听器端点。如果没有RabbitListenerContainerFactory定义,则自动配置默认值,您可以使用该属性SimpleRabbitListenerContainerFactory切换到直接容器。spring.rabbitmq.listener.type如果定义了 aMessageConverter或MessageRecovererbean,它会自动与默认工厂关联。

以下示例组件在someQueue队列上创建一个侦听器端点:

@Component
public class MyBean {

    @RabbitListener(queues = "someQueue")
    public void processMessage(String content) {
        // ...
    }

}
Apache Kafka 支持

通过提供项目的自动配置来支持Apache Kafkaspring-kafka。

Kafka 配置由spring.kafka.*. 例如,您可以在 中声明以下部分application.properties:

spring:
  kafka:
    bootstrap-servers: "localhost:9092"
    consumer:
      group-id: "myGroup"
发送消息

SpringKafkaTemplate是自动配置的,您可以直接在自己的 bean 中自动装配它,如下例所示:

@Component
public class MyBean {

    private final KafkaTemplate kafkaTemplate;

    public MyBean(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // ...

}
接收消息

当存在 Apache Kafka 基础架构时,可以使用任何 bean 进行注释@KafkaListener以创建侦听器端点。如果没有KafkaListenerContainerFactory定义,则自动配置一个默认值,其中定义了spring.kafka.listener.*.

以下组件在someTopic主题上创建侦听器端点:

@Component
public class MyBean {

    @KafkaListener(topics = "someTopic")
    public void processMessage(String content) {
        // ...
    }

}

如果KafkaTransactionManager定义了 bean,它会自动关联到容器工厂。类似地,如果定义了RecordFilterStrategy、ErrorHandler、或bean CommonErrorHandler,它会自动关联到默认工厂。AfterRollbackProcessor``ConsumerAwareRebalanceListener

根据侦听器类型,一个RecordMessageConverter或BatchMessageConverterbean 与默认工厂相关联。如果RecordMessageConverter批处理侦听器仅存在一个 bean,则将其包装在BatchMessageConverter.

Kafka 流

Spring for Apache Kafka 提供了一个工厂 bean 来创建StreamsBuilder对象并管理其流的生命周期。KafkaStreamsConfiguration只要在类路径上, Spring Boot 就会自动配置所需的bean,并且注释kafka-streams启用了 Kafka Streams 。@EnableKafkaStreams

启用 Kafka Streams 意味着必须设置应用程序 ID 和引导服务器。前者可以使用 配置,如果未设置则spring.kafka.streams.application-id默认为。spring.application.name后者可以全局设置或仅针对流专门覆盖。

使用专用属性可以使用几个附加属性;可以使用spring.kafka.streams.properties命名空间设置其他任意 Kafka 属性。另请参阅features.html了解更多信息。

要使用工厂 bean,请按照以下示例所示连接StreamsBuilder到您的 bean:@Bean

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {

    @Bean
    public KStream kStream(StreamsBuilder streamsBuilder) {
        KStream stream = streamsBuilder.stream("ks1In");
        stream.map(this::uppercasevalue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
        return stream;
    }

    private KeyValue uppercasevalue(Integer key, String value) {
        return new KeyValue<>(key, value.toUpperCase());
    }

}

默认情况下,由StreamBuilder它创建的对象管理的流是自动启动的。spring.kafka.streams.auto-startup您可以使用该属性自定义此行为。

使用嵌入式 Kafka 进行测试

Spring for Apache Kafka 提供了一种使用嵌入式 Apache Kafka 代理测试项目的便捷方式。要使用此功能,请使用模块注释测试@EmbeddedKafka类spring-kafka-test。有关更多信息,请参阅 Spring for Apache Kafka参考手册。

要使 Spring Boot 自动配置与上述嵌入式 Apache Kafka 代理一起工作,您需要将嵌入式代理地址(由 填充EmbeddedKafkaBroker)的系统属性重新映射到 Apache Kafka 的 Spring Boot 配置属性中。有几种方法可以做到这一点:

提供一个系统属性以将嵌入的代理地址映射到spring.kafka.bootstrap-servers测试类中:

static {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}

@EmbeddedKafka在注解上配置一个属性名称:

@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {

    // ...

}

在配置属性中使用占位符:

spring:
  kafka:
    bootstrap-servers: "${spring.embedded.kafka.brokers}"
Spring 集成

Spring Boot 为使用Spring Integration提供了多种便利,包括spring-boot-starter-integration“Starter”。Spring Integration 提供了对消息传递以及 HTTP、TCP 等其他传输的抽象。如果 Spring Integration 在您的类路径上可用,它会通过@EnableIntegration注解进行初始化。

Spring Integration 轮询逻辑依赖于自动配置的TaskScheduler. 可以使用配置属性自定义默认值Pollermetadata(每秒轮询无限数量的消息) 。spring.integration.poller.*

Spring Boot 还配置了一些由额外的 Spring Integration 模块触发的特性。如果spring-integration-jmx也在类路径上,则通过 JMX 发布消息处理统计信息。如果spring-integration-jdbc可用,则可以在启动时创建默认数据库模式,如下行所示:

spring:
  integration:
    jdbc:
      initialize-schema: "always"

如果spring-integration-rsocket可用,开发人员可以使用"spring.rsocket.server.*"属性配置 RSocket 服务器,并让它使用IntegrationRSocketEndpoint或RSocketOutboundGateway组件来处理传入的 RSocket 消息。此基础设施可以处理 Spring Integration RSocket 通道适配器和@MessageMapping处理程序("spring.integration.rsocket.server.message-mapping-enabled"已配置)。

Spring Boot 还可以自动配置ClientRSocketConnectorusing 配置属性:

# Connecting to a RSocket server over TCP
spring:
  integration:
    rsocket:
      client:
        host: "example.org"
        port: 9898
# Connecting to a RSocket Server over WebSocket
spring:
  integration:
    rsocket:
      client:
        uri: "ws://example.org"

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5717400.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存