1. 小趣网首页
  2. 源码解析
  3. 中间件系列源码

RabbitMQ Binder

用法

对于使用RabbitMQ绑定器,您只需要使用以下Maven坐标将其添加到您的Spring Cloud Stream应用程序:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

或者,您也可以使用Spring Cloud Stream RabbitMQ入门。

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

RabbitMQ Binder概述

以下可以看到RabbitMQ活页夹的操作简化图。

兔粘合剂

图14. RabbitMQ Binder

RabbitMQ Binder实现将每个目的地映射到TopicExchange。对于每个消费者组,Queue将绑定到该TopicExchange。每个消费者实例对其组的Queue具有相应的RabbitMQ Consumer实例。对于分区生成器/消费者,队列后缀为分区索引,并使用分区索引作为路由密钥。

使用autoBindDlq选项,您可以选择配置绑定器来创建和配置死信队列(DLQ)(以及死信交换DLX)。死信队列具有目标名称,附有.dlq。如果重试启用(maxAttempts > 1),则会将失败的消息传递到DLQ。如果禁用重试(maxAttempts = 1),则应将requeueRejected设置为false(默认),以使失败的消息将路由到DLQ,而不是重新排队。此外,republishToDlq导致绑定器向DLQ发布失败的消息(而不是拒绝它); 这使得能够将标题中的附加信息添加到消息中,例如x-exception-stacktrace头中的堆栈跟踪。此选项不需要重试启用; 一次尝试后,您可以重新发布失败的消息。从版本1.2开始,您可以配置重新发布的消息传递模式; 见财产republishDeliveryMode

重要requeueRejected设置为true将导致消息被重新排序并重新发送,这可能不是您想要的,除非故障问题是短暂的。一般来说,最好通过将maxAttempts设置为大于1,或将republishToDlq设置为true来启用binder内的重试。

有关这些属性的更多信息,请参阅RabbitMQ Binder Properties

框架不提供消耗死信消息(或重新路由到主队列)的任何标准机制。Dead-Letter队列处理中描述了一些选项。

注意在Spring Cloud Stream应用程序中使用多个 RabbitMQ绑定器时,禁用“RabbitAutoConfiguration”以避免将RabbitAutoConfiguration应用于两个绑定器的相同配置很重要。

配置选项

本节包含特定于RabbitMQ Binder和绑定频道的设置。

有关通用绑定配置选项和属性,请参阅Spring Cloud Stream核心文档

RabbitMQ Binder Properties

默认情况下,RabbitMQ binder使用Spring Boot的ConnectionFactory,因此它支持RabbitMQ的所有Spring Boot配置选项。(有关参考,请参阅Spring Boot文档。)RabbitMQ配置选项使用spring.rabbitmq前缀。

除Spring Boot选项之外,RabbitMQ binder还支持以下属性:spring.cloud.stream.rabbit.binder.adminAddresses

RabbitMQ管理插件网址的逗号分隔列表。仅在nodes包含多个条目时使用。此列表中的每个条目必须在spring.rabbitmq.addresses中具有相应的条目。

默认值:空。spring.cloud.stream.rabbit.binder.nodes

RabbitMQ节点名称的逗号分隔列表。当多个条目用于查找队列所在的服务器地址时。此列表中的每个条目必须在spring.rabbitmq.addresses中具有相应的条目。

默认值:空。spring.cloud.stream.rabbit.binder.compressionLevel

压缩绑定的压缩级别。见java.util.zip.Deflater

默认值:1(BEST_LEVEL)。

RabbitMQ消费者Properties

以下属性仅适用于Rabbit消费者,并且必须以spring.cloud.stream.rabbit.bindings.<channelName>.consumer.为前缀。acknowledgeMode

确认模式。

默认值:AUTO。autoBindDlq

是否自动声明DLQ并将其绑定到绑定器DLX。

默认值:false。bindingRoutingKey

将队列绑定到交换机的路由密钥(如果bindQueuetrue)。将附加分区目的地-<instanceIndex>

默认值:#。bindQueue

是否将队列绑定到目的地交换机?如果您已经设置了自己的基础设施并且先前已经创建/绑定了队列,请设置为false

默认值:true。deadLetterQueueName

DLQ的名称

默认值:prefix+destination.dlqdeadLetterExchange

分配给队列的DLX; 如果autoBindDlq为true

默认值:’prefix + DLX’deadLetterRoutingKey

一个死信路由密钥分配给队列; 如果autoBindDlq为true

默认值:destinationdeclareExchange

是否为目的地申报交换。

默认值:true。delayedExchange

是否将交换声明为Delayed Message Exchange – 需要在代理上延迟的消息交换插件。x-delayed-type参数设置为exchangeType

默认值:false。dlqDeadLetterExchange

如果DLQ被声明,则将DLX分配给该队列

默认值:nonedlqDeadLetterRoutingKey

如果DLQ被声明,则会将一个死信路由密钥分配给该队列; 默认无

默认值:nonedlqExpires

未使用的死信队列被删除多久(ms)

默认值:no expirationdlqMaxLength

死信队列中的最大消息数

默认值:no limitdlqMaxLengthBytes

来自所有消息的死信队列中的最大字节数

默认值:no limitdlqMaxPriority

死信队列中消息的最大优先级(0-255)

默认值:nonedlqTtl

声明(ms)时默认适用于死信队列的时间

默认值:no limitdurableSubscription

订阅是否应该耐用。仅当group也被设置时才有效。

默认值:true。exchangeAutoDelete

如果declareExchange为真,则交换机是否应该自动删除(删除最后一个队列后删除)。

默认值:true。exchangeDurable

如果declareExchange为真,则交换应该是否持久(经纪人重新启动)。

默认值:true。exchangeType

交换类型; 非分区目的地的directfanouttopic directtopic分区目的地。

默认值:topic。到期

未使用的队列被删除多久(ms)

默认值:no expirationheaderPatterns

要从入站邮件映射的头文件。

默认值:['*'](所有标题)。maxConcurrency

最大消费者人数

默认值:1。最长长度

队列中最大消息数

默认值:no limitmaxLengthBytes

来自所有消息的队列中最大字节数

默认:no limitmaxPriority

队列中消息的最大优先级(0-255)默认

none预取

预取计数。

默认值:1。字首

要添加到destination和队列名称的前缀。

默认值:“”。recoveryInterval

连接恢复尝试之间的间隔,以毫秒为单位。

默认值:5000。requeueRejected

在重试禁用或重新发布ToDlq是否为false时,是否应重新发送传递失败。

默认值:false。republishDeliveryMode

republishToDlqtrue时,指定重新发布的邮件的传递模式。

默认值:DeliveryMode.PERSISTENTrepublishToDlq

默认情况下,尝试重试后失败的消息将被拒绝。如果配置了死信队列(DLQ),则RabbitMQ将将失败的消息(未更改)路由到DLQ。如果设置为true,则绑定器将重新发布具有附加头的DLQ的失败消息,包括最终失败的原因的异常消息和堆栈跟踪。

默认值:false交易

是否使用交易渠道。

默认值:false。TTL

声明(ms)时默认适用于队列的时间

默认值:no limittxSize

阿克斯之间的交付次数。

默认值:1

兔子生产者Properties

以下属性仅适用于Rabbit生产者,必须以spring.cloud.stream.rabbit.bindings.<channelName>.producer.为前缀。autoBindDlq

是否自动声明DLQ并将其绑定到绑定器DLX。

默认值:false。batchingEnabled

是否启用生产者的消息批处理。

默认值:false。BATCHSIZE

批量启动时要缓冲的消息数。

默认值:100。batchBufferLimit

默认值:10000。batchTimeout

默认值:5000。bindingRoutingKey

将队列绑定到交换机的路由密钥(如果bindQueuetrue)。仅适用于非分区目的地。仅适用于requiredGroups,然后仅提供给这些组。

默认值:#。bindQueue

是否将队列绑定到目的地交换机?如果您已经设置了自己的基础架构并且先前已经创建/绑定了队列,请设置为false。仅适用于requiredGroups,然后仅提供给这些组。

默认值:true。压缩

发送时是否应压缩数据。

默认值:false。deadLetterQueueName

DLQ的名称仅适用于requiredGroups,仅适用于这些组。

默认值:prefix+destination.dlqdeadLetterExchange

分配给队列的DLX; 如果autoBindDlq为true只适用于requiredGroups,然后只提供给这些组。

默认值:’prefix + DLX’deadLetterRoutingKey

一个死信路由密钥分配给队列; 如果autoBindDlq为true只适用于requiredGroups,然后只提供给这些组。

默认值:destinationdeclareExchange

是否为目的地申报交换。

默认值:true。延迟

评估应用于消息(x-delay头)的延迟的Spel表达式 – 如果交换不是延迟的消息交换,则不起作用。

默认值:No x-delay头设置。delayedExchange

是否将交换声明为Delayed Message Exchange – 需要经纪人上的延迟消息交换插件。x-delayed-type参数设置为exchangeType

默认值:false。deliveryMode

交货方式。

默认值:PERSISTENT。dlqDeadLetterExchange

如果DLQ被声明,则分配给该队列的DLX只适用于requiredGroups,然后仅提供给这些组。

默认值:nonedlqDeadLetterRoutingKey

如果DLQ被声明,则会将一个死信路由密钥分配给该队列; 默认值none仅在提供requiredGroups时才适用,然后仅适用于这些组。

默认值:nonedlqExpires

未使用的死信队列被删除之前多久(ms)仅适用于requiredGroups,然后仅提供给这些组。

默认值:no expirationdlqMaxLength

死信队列中的最大消息数仅适用于requiredGroups,仅适用于这些组。

默认值:no limitdlqMaxLengthBytes

来自所有消息的死信队列中的最大字节数仅适用于requiredGroups,然后仅提供给这些组。

默认值:no limitdlqMaxPriority

死信队列中消息的最大优先级(0-255)仅适用于requiredGroups,然后仅提供给这些组。

默认值:nonedlqTtl

声明(ms)的默认时间适用于死信队列仅适用于requiredGroups,然后仅提供给这些组。

默认值:no limitexchangeAutoDelete

如果declareExchange为真,则交换机是否应该自动删除(删除最后一个队列后删除)。

默认值:true。exchangeDurable

如果declareExchange为真,则交换应该是持久的(经纪人重新启动)。

默认值:true。exchangeType

交换类型; directfanouttopicdirecttopic

默认值:topic。到期

在未使用的队列被删除之前多久(ms)仅适用于requiredGroups,然后只提供给这些组。

默认值:no expirationheaderPatterns

要将标头映射到出站邮件的模式。

默认值:['*'](所有标题)。最长长度

队列中最大消息数仅适用于requiredGroups,仅适用于这些组。

默认值:no limitmaxLengthBytes

来自所有消息的队列中最大字节数仅适用于requiredGroups,仅适用于这些组。

默认值:no limitmaxPriority

队列中消息的最大优先级(0-255)仅适用于requiredGroups,仅适用于这些组。默认

none字首

要添加到destination交换机名称的前缀。

默认值:“”。routingKeyExpression

一个SpEL表达式来确定在发布消息时使用的路由密钥。

默认值:destinationdestination-<partition>分区目的地。交易

是否使用交易渠道。

默认值:false。TTL

声明时默认适用于队列的时间(ms)仅适用于requiredGroups,然后仅适用于这些组。

默认值:no limit

注意在RabbitMQ的情况下,内容类型头可以由外部应用程序设置。Spring Cloud Stream支持它们作为用于任何类型传输(包括通常不支持头文件的Kafka)的传输的扩展内部协议的一部分)。

重试RabbitMQ Binder

概观

在绑定器中启用重试时,侦听器容器线程将被挂起,以配置任何后退时段。在单个消费者需要严格排序时,这可能很重要,但是对于其他用例,它可以防止在该线程上处理其他消息。使用绑定器重试的另一种方法是设置死机字符随着时间生活在死信队列(DLQ)上,以及DLQ本身的死信配置。有关这里讨论的属性的更多信息,请参阅RabbitMQ Binder Properties。启用此功能的示例配置:

  • autoBindDlq设置为true – 绑定器将创建一个DLQ; 您可以选择在deadLetterQueueName中指定一个名称
  • dlqTtl设置为您要在重新投递之间等待的退出时间
  • dlqDeadLetterExchange设置为默认交换 – DLQ的过期消息将被路由到原始队列,因为默认deadLetterRoutingKey是队列名称(destination.group

要强制一个消息被填字,抛出一个AmqpRejectAndDontRequeueException,或设置requeueRejectedtrue并抛出任何异常。

循环将继续没有结束,这对于短暂的问题是很好的,但是您可能想在一些尝试后放弃。幸运的是,RabbitMQ提供了x-death标题,允许您确定发生了多少个周期。

在放弃之后确认一则消息,抛出一个ImmediateAcknowledgeAmqpException

把它放在一起

---
spring.cloud.stream.bindings.input.destination=myDestination
spring.cloud.stream.bindings.input.group=consumerGroup
#disable binder retries
spring.cloud.stream.bindings.input.consumer.max-attempts=1
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=
---

此配置创建一个与通配符路由密钥#交换主题的队列myDestination.consumerGroup的交换myDestination。它创建一个绑定到具有路由密钥myDestination.consumerGroup的直接交换DLX的DLQ。当消息被拒绝时,它们被路由到DLQ。5秒钟后,消息过期,并使用队列名称作为路由密钥路由到原始队列。Spring Boot申请

@SpringBootApplication
@EnableBinding(Sink.class)
public class XDeathApplication {

    public static void main(String[] args) {
        SpringApplication.run(XDeathApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in, @Header(name = "x-death", required = false) Map<?,?> death) {
        if (death != null && death.get("count").equals(3L)) {
            // giving up - don't send to DLX
            throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
        }
        throw new AmqpRejectAndDontRequeueException("failed");
    }

}

请注意,x-death标题中的count属性是Long

Dead-Letter队列处理

因为不可能预料到用户如何处理死信消息,所以框架不提供任何标准的机制来处理它们。如果死刑的原因是暂时的,您可能希望将邮件路由到原始队列。但是,如果问题是一个永久性的问题,那可能会导致无限循环。以下spring-boot应用程序是如何将这些消息路由到原始队列的示例,但是在三次尝试之后将其移动到第三个“停车场”队列。第二个例子使用RabbitMQ延迟消息交换来向被重新排序的消息引入延迟。在这个例子中,每次尝试的延迟都会增加。这些示例使用@RabbitListener从DLQ接收消息,您也可以在批处理过程中使用RabbitTemplate.receive()

这些示例假定原始目的地是so8400in,消费者组是so8400

非分区目的地

前两个示例是目的地分区。

@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
            this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}
@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    private static final String DELAY_EXCHANGE = "dlqReRouter";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            headers.put(X_RETRIES_HEADER, retriesHeader + 1);
            headers.put("x-delay", 5000 * retriesHeader);
            this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public DirectExchange delayExchange() {
        DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Binding bindOriginalToDelay() {
        return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}

分区目的地

对于分区目的地,所有分区都有一个DLQ,我们从头部确定原始队列。

republishToDlq = FALSE

republishToDlqfalse时,RabbitMQ将消息发布到DLX / DLQ,其中包含有关原始目的地信息的x-death标题。

@SpringBootApplication
public class ReRouteDlqApplication {

	private static final String ORIGINAL_QUEUE = "so8400in.so8400";

	private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

	private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

	private static final String X_DEATH_HEADER = "x-death";

	private static final String X_RETRIES_HEADER = "x-retries";

	public static void main(String[] args) throws Exception {
		ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
		System.out.println("Hit enter to terminate");
		System.in.read();
		context.close();
	}

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@SuppressWarnings("unchecked")
	@RabbitListener(queues = DLQ)
	public void rePublish(Message failedMessage) {
		Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
		Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
		if (retriesHeader == null) {
			retriesHeader = Integer.valueOf(0);
		}
		if (retriesHeader < 3) {
			headers.put(X_RETRIES_HEADER, retriesHeader + 1);
			List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers.get(X_DEATH_HEADER);
			String exchange = (String) xDeath.get(0).get("exchange");
			List<String> routingKeys = (List<String>) xDeath.get(0).get("routing-keys");
			this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage);
		}
		else {
			this.rabbitTemplate.send(PARKING_LOT, failedMessage);
		}
	}

	@Bean
	public Queue parkingLot() {
		return new Queue(PARKING_LOT);
	}

}
republishToDlq =真

republishToDlqtrue时,重新发布恢复器将原始交换和路由密钥添加到标题。

@SpringBootApplication
public class ReRouteDlqApplication {

	private static final String ORIGINAL_QUEUE = "so8400in.so8400";

	private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

	private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

	private static final String X_RETRIES_HEADER = "x-retries";

	private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;

	private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;

	public static void main(String[] args) throws Exception {
		ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
		System.out.println("Hit enter to terminate");
		System.in.read();
		context.close();
	}

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@RabbitListener(queues = DLQ)
	public void rePublish(Message failedMessage) {
		Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
		Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
		if (retriesHeader == null) {
			retriesHeader = Integer.valueOf(0);
		}
		if (retriesHeader < 3) {
			headers.put(X_RETRIES_HEADER, retriesHeader + 1);
			String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
			String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
			this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
		}
		else {
			this.rabbitTemplate.send(PARKING_LOT, failedMessage);
		}
	}

	@Bean
	public Queue parkingLot() {
		return new Queue(PARKING_LOT);
	}

}

Spring Cloud Bus

Spring Cloud Bus将分布式系统的节点与轻量级消息代理链接。这可以用于广播状态更改(例如配置更改)或其他管理指令。一个关键的想法是,总线就像一个分布式执行器,用于扩展的Spring Boot应用程序,但也可以用作应用程序之间的通信通道。目前唯一的实现是使用AMQP代理作为传输,但是相同的基本功能集(还有一些取决于传输)在其他传输的路线图上。

注意Spring Cloud根据非限制性Apache 2.0许可证发布。如果您想为文档的这一部分做出贡献,或者发现错误,请在github中找到项目中的源代码和问题跟踪器。

快速开始

90%高可用的千亿级微服务架构之道深入学习一线大厂必备微服务架构技术。VIP 教程限时免费领取。 ⇐ 立即查看

Spring Cloud Bus的工作原理是添加Spring Boot自动配置,如果它在类路径中检测到自身。所有您需要做的是启用总线是将spring-cloud-starter-bus-amqpspring-cloud-starter-bus-kafka添加到您的依赖关系管理中,并且Spring Cloud负责其余部分。确保代理(RabbitMQ或Kafka)可用和配置:在本地主机上运行,​​您不应该做任何事情,但如果您远程运行使用Spring Cloud连接器或Spring Boot定义经纪人凭据的约定,例如Rabbitapplication.yml

spring:
  rabbitmq:
    host: mybroker.com
    port: 5672
    username: user
    password: secret

总线当前支持向所有节点发送消息,用于特定服务的所有节点(由Eureka定义)。未来可能会添加更多的选择器标准(即,仅数据中心Y中的服务X节点等)。/bus/*执行器命名空间下还有一些http端点。目前有两个实施。第一个/bus/env发送密钥/值对来更新每个节点的Spring环境。第二个,/bus/refresh,将重新加载每个应用程序的配置,就好像他们在他们的/refresh端点上都被ping过。

注意总线起动器覆盖了Rabbit和Kafka,因为这是两种最常用的实现方式,但是Spring Cloud Stream非常灵活,绑定器将与spring-cloud-bus结合使用。

处理实例

90%高可用的千亿级微服务架构之道深入学习一线大厂必备微服务架构技术。VIP 教程限时免费领取。 ⇐ 立即查看

HTTP端点接受“目的地”参数,例如“/ bus / refresh?destination = customers:9000”,其中目的地是ApplicationContext ID。如果ID由总线上的一个实例拥有,那么它将处理消息,所有其他实例将忽略它。Spring Boot将ContextIdApplicationContextInitializer中的ID设置为spring.application.name,活动配置文件和server.port的组合。

寻址服务的所有实例

90%高可用的千亿级微服务架构之道深入学习一线大厂必备微服务架构技术。VIP 教程限时免费领取。 ⇐ 立即查看

“destination”参数用于Spring PathMatcher(路径分隔符为冒号:)以确定实例是否处理该消息。使用上述示例,“/ bus / refresh?destination = customers:**”将针对“客户”服务的所有实例,而不管配置文件和端口设置为ApplicationContext ID。

应用程序上下文ID必须是唯一的

90%高可用的千亿级微服务架构之道深入学习一线大厂必备微服务架构技术。VIP 教程限时免费领取。 ⇐ 立即查看

总线尝试从原始ApplicationEvent一次消除处理事件两次,一次从队列中消除。为此,它会检查发送应用程序上下文id,以重新显示当前的应用程序上下文ID。如果服务的多个实例具有相同的应用程序上下文id,则不会处理事件。在本地机器上运行,每个服务将在不同的端口上,这将是应用程序上下文ID的一部分。Cloud Foundry提供了区分的索引。要确保应用程序上下文ID是唯一的,请将spring.application.index设置为服务的每个实例唯一的值。例如,在lattice中,在application.properties中设置spring.application.index=${INSTANCE_INDEX}(如果使用configserver,请设置bootstrap.properties)。

自定义Message Broker

90%高可用的千亿级微服务架构之道深入学习一线大厂必备微服务架构技术。VIP 教程限时免费领取。 ⇐ 立即查看

Spring Cloud Bus使用 Spring Cloud Stream广播消息,以便获取消息流,只需要在类路径中包含您选择的binder实现。有AMQP(RabbitMQ)和Kafka(spring-cloud-starter-bus-[amqp,kafka])的公共汽车专用起动方便。一般来说,Spring Cloud Stream依赖于用于配置中间件的Spring Boot自动配置约定,因此例如AMQP代理地址可以使用spring.rabbitmq.*配置属性更改。Spring Cloud Bus在spring.cloud.bus.*中具有少量本地配置属性(例如spring.cloud.bus.destination是使用外部中间件的主题的名称)。通常,默认值就足够了。

要更多地了解如何自定义消息代理设置,请参阅Spring Cloud Stream文档。

跟踪Bus Events

90%高可用的千亿级微服务架构之道深入学习一线大厂必备微服务架构技术。VIP 教程限时免费领取。 ⇐ 立即查看

可以通过设置spring.cloud.bus.trace.enabled=true来跟踪总线事件(RemoteApplicationEvent的子类)。如果这样做,那么Spring Boot TraceRepository(如果存在)将显示每个发送的事件和来自每个服务实例的所有ack。示例(来自/trace端点):

{
  "timestamp": "2015-11-26T10:24:44.411+0000",
  "info": {
    "signal": "spring.cloud.bus.ack",
    "type": "RefreshRemoteApplicationEvent",
    "id": "c4d374b7-58ea-4928-a312-31984def293b",
    "origin": "stores:8081",
    "destination": "*:**"
  }
  },
  {
  "timestamp": "2015-11-26T10:24:41.864+0000",
  "info": {
    "signal": "spring.cloud.bus.sent",
    "type": "RefreshRemoteApplicationEvent",
    "id": "c4d374b7-58ea-4928-a312-31984def293b",
    "origin": "customers:9000",
    "destination": "*:**"
  }
  },
  {
  "timestamp": "2015-11-26T10:24:41.862+0000",
  "info": {
    "signal": "spring.cloud.bus.ack",
    "type": "RefreshRemoteApplicationEvent",
    "id": "c4d374b7-58ea-4928-a312-31984def293b",
    "origin": "customers:9000",
    "destination": "*:**"
  }
}

该跟踪显示RefreshRemoteApplicationEventcustomers:9000发送到所有服务,并且已被customers:9000stores:8081收到(acked)。

为了处理信号,您可以向您的应用添加AckRemoteApplicationEventSentApplicationEvent类型的@EventListener(并启用跟踪)。或者您可以利用TraceRepository并从中挖掘数据。

注意任何总线应用程序都可以跟踪ack,但有时在一个可以对数据进行更复杂查询的中央服务器中执行此操作是有用的。或者将其转发到专门的跟踪服务。

广播自己的Events

90%高可用的千亿级微服务架构之道深入学习一线大厂必备微服务架构技术。VIP 教程限时免费领取。 ⇐ 立即查看

总线可以携带任何类型为RemoteApplicationEvent的事件,但默认传输是JSON,并且解串器需要知道哪些类型将提前使用。要注册一个新类型,它需要在org.springframework.cloud.bus.event的子包中。

要自定义事件名称,您可以在自定义类上使用@JsonTypeName,或者依赖默认策略来使用类的简单名称。请注意,生产者和消费者都需要访问类定义。

在自定义包中注册事件

如果您不能或不想为自定义事件使用org.springframework.cloud.bus.event的子包,则必须使用@RemoteApplicationEventScan指定要扫描类型为RemoteApplicationEvent的事件的包。使用@RemoteApplicationEventScan指定的软件包包括子包。

例如,如果您有一个名为FooEvent的自定义事件:

package com.acme;

public class FooEvent extends RemoteApplicationEvent {
    ...
}

您可以通过以下方式与解串器注册此事件:

package com.acme;

@Configuration
@RemoteApplicationEventScan
public class BusConfiguration {
    ...
}

没有指定一个值,使用@RemoteApplicationEventScan的类的包将被注册。在这个例子中,com.acme将使用BusConfiguration的包进行注册。

您还可以使用@RemoteApplicationEventScan上的valuebasePackagesbasePackageClasses属性明确指定要扫描的软件包。例如:

package com.acme;

@Configuration
//@RemoteApplicationEventScan({"com.acme", "foo.bar"})
//@RemoteApplicationEventScan(basePackages = {"com.acme", "foo.bar", "fizz.buzz"})
@RemoteApplicationEventScan(basePackageClasses = BusConfiguration.class)
public class BusConfiguration {
    ...
}

以上@RemoteApplicationEventScan的所有示例都是等效的,因为com.acme程序包将通过在@RemoteApplicationEventScan上明确指定程序包来注册。请注意,您可以指定要扫描的多个基本软件包。

本文归属 thenx - 小趣网版权所有,若需转载请联系站长
网站标题:小趣网
网站地址:https://thenx.org.cn
文章标题:RabbitMQ Binder
文章链接:https://thenx.org.cn/1096.html
作者昵称:thenx-admin
作者链接:https://thenx.org.cn/%e4%b8%aa%e4%ba%ba%e4%b8%ad%e5%bf%83%e9%a1%b5%e9%9d%a2/thenx-admin

发表评论

登录后才能评论

评论列表(1条)

  • thenx-admin
    thenx-admin 2020年2月26日 下午2:35

    以上@RemoteApplicationEventScan的所有示例都是等效的,因为com.acme程序包将通过在@RemoteApplicationEventScan上明确指定程序包来注册

联系我们

站长邮箱: opensource@thenx.org

Github: https://github.com/thenx-projects

注意:但凡您有需要任何问题及需求,都可以通过以上联系方式联系本站站长 !

QR code