消息消费
1. 脉冲星监听器
对于 Pulsar 消费者,我们建议最终用户应用程序使用PulsarListener注解。 使用PulsarListener,您需要使用@EnablePulsar注解。 当您使用 Spring Boot 支持时,它会自动启用此注释并配置所有必要的组件PulsarListener,例如消息侦听器基础设施(负责创建 Pulsar 消费者)。PulsarMessageListenerContainer使用PulsarConsumerFactory创建和管理 Pulsar 消费者,即它用于消费消息的底层 Pulsar 消费者。
Spring Boot 提供了这个消费者工厂,您可以通过指定spring.pulsar.consumer.*应用程序属性。
让我们重新审视一下PulsarListener我们在快速浏览部分看到的代码片段:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
您可以进一步简化此方法:
@PulsarListener
public void listen(String message) {
System.out.println("Message Received: " + message);
}
在这种最基本的形式中,当subscriptionName上未提供@PulsarListener注释将使用自动生成的订阅名称。同样,当topics未直接提供,则使用主题解析过程来确定目标主题。
在PulsarListener方法,我们收到的数据为String,但我们没有指定任何模式类型。在内部,框架依赖于 Pulsar 的模式机制将数据转换为所需的类型。框架检测到您期望的String类型,然后根据该信息推断架构类型,并将该架构提供给使用者。框架对所有原始类型执行此推理。对于所有非原始类型,默认架构假定为 JSON。如果复杂类型使用 JSON 以外的任何内容(例如 AVRO 或 KEY_VALUE),则必须使用schemaType财产。
以下示例显示了另一个PulsarListener方法,该方法采用Integer:
@PulsarListener(subscriptionName = "my-subscription-1", topics = "my-topic-1")
public void listen(Integer message) {
System.out.println(message);
}
以下内容PulsarListener方法展示了我们如何从主题中使用复杂类型:
@PulsarListener(subscriptionName = "my-subscription-2", topics = "my-topic-2", schemaType = SchemaType.JSON)
public void listen(Foo message) {
System.out.println(message);
}
让我们再看看几种方法。
您可以直接使用 Pulsar 消息:
@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.apache.pulsar.client.api.Message<String> message) {
System.out.println(message.getValue());
}
以下示例使用 Spring 消息信封使用记录:
@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.springframework.messaging.Message<String> message) {
System.out.println(message.getPayload());
}
现在让我们看看如何批量使用记录。
以下示例使用PulsarListener要将批量记录作为 POJO 使用:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Foo> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message));
}
请注意,在此示例中,我们将记录作为集合 (List) 的对象。
此外,要在PulsarListener级别,您需要将batch属性设置为true.
根据实际类型,该Listholds,则框架会尝试推断要使用的模式。
如果List除了 JSON 之外,还包含复杂类型,您仍然需要提供schemaType上PulsarListener.
以下使用MessagePulsar Java 客户端提供的信封:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Message<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}
以下示例使用带有 Spring 消息传递信封的批处理记录Message类型:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<org.springframework.messaging.Message<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getPayload()));
}
最后,您还可以使用Messagesholder 对象:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(org.apache.pulsar.client.api.Messages<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}
当您使用PulsarListener,您可以直接在注释本身上提供 Pulsar 消费者属性。
如果您不想使用前面提到的启动配置属性或有多个PulsarListener方法。
以下示例直接在PulsarListener:
@PulsarListener(properties = { "subscriptionName=subscription-1", "topicNames=foo-1", "receiverQueueSize=5000" })
void listen(String message) {
}
使用的属性是直接的 Pulsar 使用者属性,而不是spring.pulsar.consumer应用程序配置属性 |
1.1. 带有AUTO_CONSUME的通用记录
如果没有机会提前知道 Pulsar 主题的 schema 类型,可以使用AUTO_CONSUMEschema 类型来使用通用记录。
在这种情况下,主题将消息反序列化为GenericRecord对象使用与主题关联的架构信息。
要使用通用记录,请将schemaType = SchemaType.AUTO_CONSUME在您的@PulsarListener并使用GenericRecord作为 message 参数,如下所示。
@PulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
void listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
GenericRecord record = message.getValue();
record.getFields().forEach((f) ->
System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
}
这GenericRecordAPI 允许访问字段及其关联值 |
1.2. 自定义 ConsumerBuilder
您可以通过以下方式自定义任何可用字段ConsumerBuilder使用PulsarListenerConsumerBuilderCustomizer通过提供@Bean类型PulsarListenerConsumerBuilderCustomizer然后将其提供给PulsarListener如下图所示。
@PulsarListener(topics = "hello-topic", consumerCustomizer = "myCustomizer")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
@Bean
PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
return (builder) -> builder.consumerName("myConsumer");
}
如果您的应用程序只有一个@PulsarListener和单个PulsarListenerConsumerBuilderCustomizerbean 注册,则定制器将自动应用。 |
2. 指定模式信息
如前所述,对于 Java 原语,Spring for Apache Pulsar 框架可以推断出要在PulsarListener.
对于非原始类型,如果未在 Comments 上显式指定 Schema,则 Spring for Apache Pulsar 框架将尝试构建一个Schema.JSON从类型。
| 当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF、AUTO_CONSUME KEY_VALUE 带内联编码。 |
2.1. 自定义模式映射
作为在PulsarListener对于复杂类型,可以使用类型的映射来配置模式解析器。
这样就无需在侦听器上设置模式,因为框架使用传入消息类型咨询解析器。
2.1.1. 配置属性
架构映射可以使用spring.pulsar.defaults.type-mappings财产。
以下示例使用application.yml为User和Address复杂对象使用AVRO和JSONschemas,分别:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
这message-type是消息类的完全限定名称。 |
2.1.2. 模式解析器定制器
添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,可以提供架构解析器定制器来添加映射。
以下示例使用架构解析器定制器为User和Address复杂对象使用AVRO和JSONschemas,分别:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
2.1.3. 类型映射注解
指定要用于特定消息类型的默认模式信息的另一个选项是使用@PulsarMessage注解。
架构信息可以通过schemaType属性。
以下示例将系统配置为在生成或使用Foo:
@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}
使用此配置后,无需在侦听器上设置架构,例如:
@PulsarListener(subscriptionName = "user-sub", topics = "user-topic")
public void listen(User user) {
System.out.println(user);
}
3. 访问 Pulsar 消费者对象
有时,您需要直接访问 Pulsar Consumer 对象。以下示例显示了如何获取它:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message, org.apache.pulsar.client.api.Consumer<String> consumer) {
System.out.println("Message Received: " + message);
ConsumerStats stats = consumer.getStats();
...
}
当访问Consumer对象,不要调用任何会通过调用任何接收方法来更改使用者光标位置的作。所有此类作都必须由容器完成。 |
4. Pulsar 消息监听器容器
现在我们看到了消费者端的基本交互PulsarListener.现在让我们深入了解如何PulsarListener与底层 Pulsar 使用者交互。
请记住,对于最终用户应用程序,在大多数情况下,我们建议使用PulsarListener注释,以便在使用 Spring for Apache Pulsar 时直接从 Pulsar 主题中使用,因为该模型涵盖了广泛的应用程序用例。
但是,重要的是要了解如何PulsarListener在内部工作。本节介绍这些细节。
如前所述,当您使用 Spring for Apache Pulsar 时,消息监听器容器是消息消费的核心。PulsarListener在幕后使用消息侦听器容器基础设施来创建和管理 Pulsar 消费者。
Spring for Apache Pulsar 通过PulsarMessageListenerContainer.
此消息侦听器容器的默认实现是通过DefaultPulsarMessageListenerContainer.
顾名思义,PulsarMessageListenerContainer包含消息侦听器。
容器创建 Pulsar 使用者,然后运行单独的线程来接收和处理数据。
数据由提供的消息侦听器实现处理。
消息侦听器容器使用消费者的batchReceive方法。 收到数据后,它将移交给选定的消息侦听器实现。
当您使用 Spring for Apache Pulsar 时,可以使用以下消息侦听器类型。
我们将在以下部分中看到有关这些不同消息侦听器的详细信息。
然而,在此之前,让我们仔细看看容器本身。
4.1. 默认 PulsarMessageListenerContainer
这是一个基于消费者的单个消息侦听器容器。以下列表显示了其构造函数:
public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
PulsarContainerProperties pulsarContainerProperties)
}
它收到一个PulsarConsumerFactory(它用来创建消费者)和一个PulsarContainerProperties对象(包含有关容器属性的信息)。PulsarContainerProperties具有以下构造函数:
public PulsarContainerProperties(String... topics)
public PulsarContainerProperties(Pattern topicPattern)
您可以通过以下方式提供主题信息PulsarContainerProperties或作为提供给使用者工厂的使用者属性。以下示例使用DefaultPulsarMessageListenerContainer:
Map<String, Object> config = new HashMap<>();
config.put("topics", "my-topic");
PulsarConsumerFactory<String> pulsarConsumerFactorY = DefaultPulsarConsumerFactory<>(pulsarClient, config);
PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties();
pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener<?>) (consumer, msg) -> {
});
DefaultPulsarMessageListenerContainer<String> pulsarListenerContainer = new DefaultPulsarMessageListenerContainer(pulsarConsumerFacotyr,
pulsarContainerProperties);
return pulsarListenerContainer;
DefaultPulsarMessageListenerContainer仅创建单个使用者。
如果要通过多个线程管理多个消费者,则需要使用ConcurrentPulsarMessageListenerContainer.
4.2. ConcurrentPulsarMessageListenerContainer
ConcurrentPulsarMessageListenerContainer具有以下构造函数:
public ConcurrentPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
PulsarContainerProperties pulsarContainerProperties)
ConcurrentPulsarMessageListenerContainer允许您指定concurrency属性。
并发性超过1仅允许非独占订阅 (failover,shared和key-shared).
您只能拥有默认的1当您具有独占订阅模式时,用于并发。
以下示例启用concurrency通过PulsarListener注释failover订阅。
@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
subscriptionType = SubscriptionType.Failover, concurrency = "3")
void listen(String message, Consumer<String> consumer) {
...
System.out.println("Current Thread: " + Thread.currentThread().getName());
System.out.println("Current Consumer: " + consumer.getConsumerName());
}
在前面的侦听器中,假定主题my-topic有三个分区。
如果是未分区的主题,则将并发设置为3什么都不做。除了主要的活动消费者之外,您还会得到两个空闲消费者。
如果主题具有三个以上的分区,则消息将在容器创建的使用者之间进行负载均衡。
如果运行此PulsarListener,则您会看到来自不同分区的消息通过不同的使用者使用,如前面示例中的线程名称和使用者名称打印输出所暗示的那样。
当您使用Failover订阅,Pulsar 保证消息排序。 |
以下列表显示了另一个示例PulsarListener,但与Sharedsubscription 和concurrency启用。
@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
subscriptionType = SubscriptionType.Shared, concurrency = "5")
void listen(String message) {
...
}
在前面的示例中,PulsarListener创建五个不同的使用者(这一次,我们假设主题有五个分区)。
在此版本中,没有消息排序,因为Shared订阅不保证 Pulsar 中的任何消息排序。 |
如果您需要消息排序,并且仍然需要共享订阅类型,则需要使用Key_Shared订阅类型。
4.3. 消费记录
让我们看看消息侦听器容器如何实现单记录和基于批处理的消息消费。
单条记录消耗
让我们重新审视我们的基本PulsarListener为了这个讨论:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
有了这个PulsarListener方法,我们必须要求 Spring for Apache Pulsar 每次调用带有一条记录的监听器方法。
我们提到消息侦听器容器使用batchReceive方法。
框架检测到PulsarListener在这种情况下,接收一条记录。这意味着,在每次调用该方法时,它都需要一个单记录。
尽管消息侦听器容器会批量使用记录,但它会循环访问接收到的批处理,并通过适配器调用侦听器方法PulsarRecordMessageListener.
正如您在上一节中看到的,PulsarRecordMessageListener从MessageListener由 Pulsar Java 客户端提供,它支持基本的received方法。
批量消耗
以下示例显示了PulsarListener批量消费记录:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen4(List<Foo> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message));
}
当您使用这种类型的PulsarListener,则框架检测到您处于批处理模式。
由于它已经使用消费者的batchReceive方法,它通过适配器将整个批次交给监听器方法,用于PulsarBatchMessageListener.
5. 脉冲星接头
Pulsar 消息元数据可以作为 Spring 消息头使用。可用头的列表可以在 PulsarHeaders.java 中找到。
5.1. 在基于单记录的消费者中访问
以下示例显示了如何在使用单记录消费模式的应用程序中访问各种 Pulsar 标头:
@PulsarListener(topics = "simpleListenerWithHeaders")
void simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
@Header(PulsarHeaders.RAW_DATA) byte[] rawData,
@Header("foo") String foo) {
}
在前面的示例中,我们访问messageId和rawData消息元数据以及名为foo. Spring@Header注释用于每个标题字段。
您还可以使用 Pulsar 的Message作为信封来承载有效负载。这样做时,用户可以直接调用 Pulsar 消息上的相应方法来检索元数据。但是,为了方便起见,您也可以使用Header注解。 请注意,您还可以使用 Spring 消息传递Message信封来承载有效负载,然后使用@Header.
5.2. 在基于批处理记录的消费者中访问
在本节中,我们将了解如何在使用批处理消费者的应用程序中访问各种 Pulsar 标头:
@PulsarListener(topics = "simpleBatchListenerWithHeaders", batch = true)
void simpleBatchListenerWithHeaders(List<String> data,
@Header(PulsarHeaders.MESSAGE_ID) List<MessageId> messageIds,
@Header(PulsarHeaders.TOPIC_NAME) List<String> topicNames, @Header("foo") List<String> fooValues) {
}
在前面的示例中,我们将数据用作List<String>. 在提取各种标头时,我们作为List<>也。 Spring for Apache Pulsar 确保标头列表与数据列表相对应。
当您使用批处理侦听器时,您还可以以相同的方式提取标头,并接收有效负载作为List<org.apache.pulsar.client.api.Message<?>,org.apache.pulsar.client.api.Messages<?>或org.springframework.messaging.Messsge<?>.
6. 消息确认
当您使用 Spring for Apache Pulsar 时,消息确认由框架处理,除非应用程序选择退出。在本节中,我们将详细介绍框架如何处理消息确认。
6.1. 消息确认模式
Spring for Apache Pulsar 提供了以下用于确认消息的模式:
-
BATCH -
RECORD -
MANUAL
BATCH确认模式是默认模式,但您可以在消息侦听器容器上更改它。
在以下部分中,我们将了解同时使用PulsarListener以及它们如何转换为支持消息侦听器容器(并最终转换为 Pulsar 消费者)。
6.2. 单记录模式下的自动消息确认
让我们重新审视我们基于基本的单一消息PulsarListener:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
很自然地想知道,当您使用PulsarListener,特别是如果您熟悉直接使用 Pulsar 消费者。
答案归结为消息侦听器容器,因为它是 Spring for Apache Pulsar 中的中心位置,用于协调所有与消费者相关的活动。
假设您没有覆盖默认行为,那么当您使用上述内容时,这就是在后台发生的情况PulsarListener:
-
首先,侦听器容器以批处理形式接收来自 Pulsar 消费者的消息。
-
收到的消息将传递给
PulsarListener一次一条消息。 -
当所有记录都传递给侦听器方法并成功处理时,容器将确认原始批处理中的所有消息。
这是正常流程。如果原始批次中的任何记录抛出异常,Spring for Apache Pulsar 会单独跟踪这些记录。
当处理批处理中的所有记录时,Spring for Apache Pulsar 会确认所有成功的消息,并否定确认(nack)所有失败的消息。
换句话说,当使用PulsarRecordMessageListener默认的 ack 模式为BATCH,则框架会等待从batchReceive调用 process 成功,然后调用acknowledge方法。
如果任何特定记录在调用处理程序方法时抛出异常,Spring for Apache Pulsar 会跟踪这些记录并单独调用negativeAcknowledge在处理整个批次后,在这些记录上。
如果应用程序希望每条记录发生确认或否定确认,则RECORD可以启用 ACK 模式。
在这种情况下,在处理每条记录后,如果没有错误,则确认消息,如果出现错误,则否定确认。
以下示例启用RECORDack 模式:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.RECORD)
public void listen(String message) {
System.out.println("Message Received: " + message);
}
6.3. 单记录模式下的手动消息确认
您可能并不总是希望框架发送确认,而是直接从应用程序本身执行此作。 Spring for Apache Pulsar 提供了几种启用手动消息确认的方法。以下示例显示了其中之一:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Acknowledgment acknowledgment) {
System.out.println("Message Received: " + message.getValue());
acknowledgment.acknowledge();
}
这里有几件事值得解释。首先,我们通过设置ackMode上PulsarListener.
启用手动确认模式时,Spring for Apache Pulsar 允许应用程序注入Acknowledgment对象。 该框架通过选择兼容的消息侦听器容器来实现这一点:PulsarAcknowledgingMessageListener对于基于单条记录的使用,这使您可以访问Acknowledgment对象。
这Acknowledgmentobject 提供了以下 API 方法:
void acknowledge();
void acknowledge(MessageId messageId);
void acknowledge(List<MessageId> messageIds);
void nack();
void nack(MessageId messageId);
您可以注入此Acknowledgmentobject 输入到您的PulsarListener使用MANUALack 模式,然后调用相应的方法之一。
在前面的PulsarListener例如,我们调用一个无参数acknowledge方法。 这是因为框架知道哪个Message它目前在下运行。调用时acknowledge(),则无需接收带有Messageenveloper',而是使用目标类型——String,在此示例中。您还可以调用acknowledge通过提供消息 ID:acknowledge.acknowledge(message.getMessageId());当您使用acknowledge(messageId),您必须使用Message<?>信封。
与可能的确认类似,确认的AcknowledgmentAPI 还提供了否定确认的选项。
请参阅前面显示的 nack 方法。
您也可以调用acknowledge直接在 Pulsar 消费者身上:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Consumer<String> consumer) {
System.out.println("Message Received: " + message.getValue());
try {
consumer.acknowledge(message);
}
catch (Exception e) {
....
}
}
调用时acknowledge直接在底层消费者上,需要自己做错误处理。
使用Acknowledgment不需要这样做,因为框架可以为您做到这一点。
因此,您应该使用Acknowledgment使用手动确认时的对象方法。
| 使用手动确认时,重要的是要了解该框架完全不进行任何确认。 因此,在设计应用程序时考虑正确的确认策略非常重要。 |
6.4. 批量消费中的自动消息确认
当您批量使用记录时(请参阅“消息确认模式”),并且使用默认的确认模式BATCH,当整个批处理成功时,确认整个批次。
如果任何记录引发异常,则整个批处理将被否定确认。
请注意,这可能与在生产者端批处理的批次不同。相反,这是从调用batchReceive对消费者
考虑以下批处理侦听器:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", batch = true)
public void batchListen(List<Foo> messages) {
for (Foo foo : messages) {
...
}
}
当传入集合中的所有消息 (messages在此示例中),框架会确认所有这些。
在批量模式下消费时,RECORD不允许的确认模式。
这可能会导致问题,因为应用程序可能不希望再次重新传递整个批次。
在这种情况下,您需要使用MANUAL确认模式。
6.5. 批量消费中的手动消息确认
如上一节所示,当MANUALack 模式,则框架不执行任何确认,无论是正面还是负面。
完全取决于应用程序来处理此类问题。
什么时候MANUALack 模式,则 Spring for Apache Pulsar 选择一个兼容的消息监听器容器:PulsarBatchAcknowledgingMessageListener用于批量使用,这使您可以访问Acknowledgment对象。 以下是Acknowledgment应用程序接口:
void acknowledge();
void acknowledge(MessageId messageId);
void acknowledge(List<MessageId> messageIds);
void nack();
void nack(MessageId messageId);
您可以注入此Acknowledgmentobject 输入到您的PulsarListener使用MANUALack 模式。以下列表显示了基于批处理的侦听器的基本示例:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(List<Message<String>> messgaes, Acknowlegement acknowledgment) {
for (Message<String> message : messages) {
try {
...
acknowledgment.acknowledge(message.getMessageId());
}
catch (Exception e) {
acknowledgment.nack(message.getMessageId());
}
}
}
使用批处理侦听器时,消息侦听器容器无法知道它当前正在哪个记录上运行。因此,要手动确认,您需要使用重载的acknowledge采用MessageId或List<MessageId>. 您还可以使用MessageId对于批处理侦听器。
7. 消息重新传递和错误处理
现在我们已经看到了两者PulsarListener以及消息侦听器容器基础设施及其各种功能,现在让我们尝试了解消息重新传递和错误处理。Apache Pulsar 为消息重新传递和错误处理提供了各种本机策略。我们来看看它们,看看我们如何通过 Spring for Apache Pulsar 使用它们。
7.1. 指定消息重新传递的确认超时
默认情况下,除非使用者崩溃,否则 Pulsar 使用者不会重新传递消息,但您可以通过在 Pulsar 使用者上设置 ack 超时来更改此行为。如果 ack timeout 属性的值大于零,并且 Pulsar 使用者在该超时期限内未确认消息,则会重新传递该消息。
当您使用 Spring for Apache Pulsar 时,您可以通过消费者定制器或使用本机 Pulsar 设置此属性ackTimeoutMillis属性中的properties属性@PulsarListener:
@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
properties = {"ackTimeoutMillis=60000"})
public void listen(String s) {
...
}
指定 ack 超时时,如果消费者在 60 秒内未发送确认,则 Pulsar 将消息重新传递给消费者。
如果要为具有不同延迟的确认超时指定一些高级回退选项,可以执行以下作:
@EnablePulsar
@Configuration
class AckTimeoutRedeliveryConfig {
@PulsarListener(subscriptionName = "withAckTimeoutRedeliveryBackoffSubscription",
topics = "withAckTimeoutRedeliveryBackoff-test-topic",
ackTimeoutRedeliveryBackoff = "ackTimeoutRedeliveryBackoff",
properties = { "ackTimeoutMillis=60000" })
void listen(String msg) {
// some long-running process that may cause an ack timeout
}
@Bean
RedeliveryBackoff ackTimeoutRedeliveryBackoff() {
return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
.build();
}
}
在前面的示例中,我们为 Pulsar 的RedeliveryBackoff最小延迟为 1 秒,最大延迟为 10 秒,回退乘数为 2。
在初始确认超时发生后,消息重新传递通过此退避 Bean 进行控制。
我们将退避 bean 提供给PulsarListener通过设置ackTimeoutRedeliveryBackoff属性添加到实际的 bean 名称 —ackTimeoutRedeliveryBackoff,在这种情况下。
7.2. 指定否定确认重新传递
当确认否定时,Pulsar 消费者允许您指定应用程序希望如何重新传递消息。
默认设置是在一分钟内重新传递消息,但您可以通过消费者定制器或使用本机 Pulsar 进行更改negativeAckRedeliveryDelay属性中的properties属性@PulsarListener:
@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
properties = {"negativeAckRedeliveryDelay=10ms"})
public void listen(String s) {
...
}
您还可以通过提供RedeliveryBackoffbean 并将 bean 名称作为negativeAckRedeliveryBackoffPulsarProducer 上的属性,如下所示:
@EnablePulsar
@Configuration
class NegativeAckRedeliveryConfig {
@PulsarListener(subscriptionName = "withNegRedeliveryBackoffSubscription",
topics = "withNegRedeliveryBackoff-test-topic", negativeAckRedeliveryBackoff = "redeliveryBackoff",
subscriptionType = SubscriptionType.Shared)
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@Bean
RedeliveryBackoff redeliveryBackoff() {
return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
.build();
}
}
7.3. 使用 Apache Pulsar 的死信主题进行消息重新传递和错误处理
Apache Pulsar 允许应用程序在消费者上使用死信主题,并使用Shared订阅类型。对于Exclusive和Failover订阅类型,此功能不可用。基本思想是,如果消息重试了一定次数(可能是由于 ack 超时或 nack 重新传递),一旦重试次数用完,就可以将消息发送到称为死信队列 (DLQ) 的特殊主题。让我们通过检查一些代码片段来了解此功能的一些细节:
@EnablePulsar
@Configuration
class DeadLetterPolicyConfig {
@PulsarListener(id = "deadLetterPolicyListener", subscriptionName = "deadLetterPolicySubscription",
topics = "topic-with-dlp", deadLetterPolicy = "deadLetterPolicy",
subscriptionType = SubscriptionType.Shared, properties = { "ackTimeoutMillis=1000" })
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@PulsarListener(id = "dlqListener", topics = "my-dlq-topic")
void listenDlq(String msg) {
System.out.println("From DLQ: " + msg);
}
@Bean
DeadLetterPolicy deadLetterPolicy() {
return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
}
}
首先,我们有一个特殊的豆子DeadLetterPolicy,并将其命名为deadLetterPolicy(它可以是任何名称)。这个 bean 指定了许多内容,例如最大交付量(在本例中为 10)和死信主题的名称 —my-dlq-topic,在本例中。如果未指定 DLQ 主题名称,则默认为<topicname>-<subscriptionname>-DLQ在 Pulsar 中。接下来,我们将此 bean 名称提供给PulsarListener通过将deadLetterPolicy财产。 请注意,PulsarListener订阅类型为Shared,因为 DLQ 功能仅适用于共享订阅。此代码主要用于演示目的,因此我们提供了ackTimeoutMillis值为 1000。这个想法是代码抛出异常,如果 Pulsar 在 1 秒内没有收到 ack,它就会进行重试。如果该循环持续十次(因为这是我们在DeadLetterPolicy),Pulsar 消费者将消息发布到 DLQ 主题。我们还有另一个PulsarListener侦听 DLQ 主题以接收发布到 DLQ 主题的数据。
7.4. Apache Pulsar 的 Spring 中的本机错误处理
正如我们之前提到的,Apache Pulsar 中的 DLQ 功能仅适用于共享订阅。
如果应用程序需要对非共享订阅使用一些类似的功能,该怎么办?
Pulsar 不支持独占和故障转移订阅的 DLQ 的主要原因是因为这些订阅类型是订单保证的。
允许重新传递、DLQ 等有效地接收无序消息。
但是,如果应用程序可以接受,但更重要的是,非共享订阅需要此 DLQ 功能怎么办?
为此,Spring for Apache Pulsar 提供了一个PulsarConsumerErrorHandler,您可以在 Pulsar 中的任何订阅类型中使用:Exclusive,Failover,Shared或Key_Shared.
当您使用PulsarConsumerErrorHandler从 Spring for Apache Pulsar 中,请确保不要在侦听器上设置 ack 超时属性。
让我们通过检查一些代码片段来了解一些细节:
@EnablePulsar
@Configuration
class PulsarConsumerErrorHandlerConfig {
@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
PulsarTemplate<String> pulsarTemplate) {
return new DefaultPulsarConsumerErrorHandler<>(
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}
@PulsarListener(id = "pulsarConsumerErrorHandler-id", subscriptionName = "pulsatConsumerErrorHandler-subscription",
topics = "pulsarConsumerErrorHandler-topic",
pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler")
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@PulsarListener(id = "pceh-dltListener", topics = "my-foo-dlt")
void listenDlt(String msg) {
System.out.println("From DLT: " + msg);
}
}
考虑一下pulsarConsumerErrorHandler豆。
这将创建一个类型为PulsarConsumerErrorHandler并使用 Spring 为 Apache Pulsar 提供的开箱即用的默认实现:DefaultPulsarConsumerErrorHandler.DefaultPulsarConsumerErrorHandler有一个构造函数,该构造函数采用PulsarMessageRecovererFactory和org.springframework.util.backoff.Backoff.PulsarMessageRecovererFactory是一个具有以下 API 的功能接口:
@FunctionalInterface
public interface PulsarMessageRecovererFactory<T> {
/**
* Provides a message recoverer {@link PulsarMessageRecoverer}.
* @param consumer Pulsar consumer
* @return {@link PulsarMessageRecoverer}.
*/
PulsarMessageRecoverer<T> recovererForConsumer(Consumer<T> consumer);
}
这recovererForConsumer方法采用 Pulsar 消费者并返回PulsarMessageRecoverer,这是另一个功能接口。
这是PulsarMessageRecoverer:
public interface PulsarMessageRecoverer<T> {
/**
* Recover a failed message, for e.g. send the message to a DLT.
* @param message Pulsar message
* @param exception exception from failed message
*/
void recoverMessage(Message<T> message, Exception exception);
}
Spring for Apache Pulsar 提供了一个实现PulsarMessageRecovererFactory叫PulsarDeadLetterPublishingRecoverer提供默认实现,可以通过将消息发送到死信主题 (DLT) 来恢复消息。
我们将此实现提供给前面的构造函数DefaultPulsarConsumerErrorHandler.
作为第二个参数,我们提供了一个FixedBackOff.
您还可以提供ExponentialBackoff来自 Spring 的高级退避功能。
然后我们为PulsarConsumerErrorHandler作为属性添加到PulsarListener.
该属性称为pulsarConsumerErrorHandler.
每次PulsarListener方法失败,则重试。
重试次数由Backoff提供的实现值。在我们的示例中,我们进行了 10 次重试(总共 11 次尝试 - 第一次尝试,然后是 10 次重试)。
一旦所有重试都用完,消息将发送到 DLT 主题。
这PulsarDeadLetterPublishingRecoverer我们提供的实现使用PulsarTemplate用于将消息发布到 DLT。
在大多数情况下,相同的自动配置PulsarTemplate来自 Spring Boot 就足够了,但对分区主题有警告。
使用分区主题并对主主题使用自定义消息路由时,必须使用不同的PulsarTemplate不采用自动配置的PulsarProducerFactory填充了值custompartition为message-routing-mode.
您可以使用PulsarConsumerErrorHandler使用以下蓝图:
@Bean
PulsarConsumerErrorHandler<Integer> pulsarConsumerErrorHandler(PulsarClient pulsarClient) {
PulsarProducerFactory<Integer> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Map.of());
PulsarTemplate<Integer> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);
BiFunction<Consumer<?>, Message<?>, String> destinationResolver =
(c, m) -> "my-foo-dlt";
PulsarDeadLetterPublishingRecoverer<Integer> pulsarDeadLetterPublishingRecoverer =
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, destinationResolver);
return new DefaultPulsarConsumerErrorHandler<>(pulsarDeadLetterPublishingRecoverer,
new FixedBackOff(100, 5));
}
请注意,我们为PulsarDeadLetterPublishingRecoverer作为第二个构造函数参数。
如果未提供,PulsarDeadLetterPublishingRecoverer使用<subscription-name>-<topic-name>-DLT>作为 DLT 主题名称。
使用此功能时,应通过设置目标解析器来使用正确的目标名称,而不是使用默认值。
当使用单个记录消息侦听器时,就像我们所做的那样PulsarConsumerErrorHnadler,如果使用手动确认,请确保在抛出异常时不要负向确认消息。
相反,将异常重新抛回容器。否则,容器会认为消息是单独处理的,并且不会触发错误处理。
最后,我们有了第二个PulsarListener接收来自 DLT 主题的消息。
在本节提供的示例中,到目前为止,我们只看到了如何使用PulsarConsumerErrorHandler使用单个记录消息侦听器。
接下来,我们看看如何在批量侦听器上使用它。
7.5. 使用 PulsarConsumerErrorHandler 的批量监听器
首先,让我们看看一批PulsarListener方法:
@PulsarListener(subscriptionName = "batch-demo-5-sub", topics = "batch-demo-4", batch = true, concurrency = "3",
subscriptionType = SubscriptionType.Failover,
pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler", ackMode = AckMode.MANUAL)
void listen(List<Message<Integer>> data, Consumer<Integer> consumer, Acknowledgment acknowledgment) {
for (Message<Integer> datum : data) {
if (datum.getValue() == 5) {
throw new PulsarBatchListenerFailedException("failed", datum);
}
acknowledgement.acknowledge(datum.getMessageId());
}
}
@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
PulsarTemplate<String> pulsarTemplate) {
return new DefaultPulsarConsumerErrorHandler<>(
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}
@PulsarListener(subscriptionName = "my-dlt-subscription", topics = "my-foo-dlt")
void dltReceiver(Message<Integer> message) {
System.out.println("DLT - RECEIVED: " + message.getValue());
}
再一次,我们提供了pulsarConsumerErrorHandler属性替换为PulsarConsumerErrorHandlerbean 名称。
当您使用批处理侦听器(如前面的示例所示)并希望使用PulsarConsumerErrorHandler从 Spring for Apache Pulsar 开始,您需要使用手动确认。
这样,您就可以确认所有成功的单个消息。
对于失败的,您必须抛出一个PulsarBatchListenerFailedException与失败的消息。
如果没有这个例外,框架就不知道该如何处理故障。
重试时,容器会向侦听器发送一批新的消息,从失败的消息开始。
如果再次失败,则重试,直到重试用尽,此时消息将发送到 DLT。
此时,容器确认消息,侦听器将与原始批处理中的后续消息一起移交。
8. PulsarListener 上的消费者定制
Spring for Apache Pulsar 提供了一种方便的方法来自定义由PulsarListener.
应用程序可以提供一个 beanPulsarListenerConsumerBuilderCustomizer.
这是一个例子。
@Bean
public PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
return cb -> {
cb.subscriptionName("modified-subscription-name");
};
}
然后,可以将此定制器 bean 名称作为属性提供PuslarListener注释,如下所示。
@PulsarListener(subscriptionName = "my-subscription",
topics = "my-topic", consumerCustomizer = "myCustomizer")
void listen(String message) {
}
框架通过PulsarListener并在创建 Pulsar 消费者之前在消费者构建器上应用此定制器。
如果您有多个PulsarListener方法,并且每个方法都有不同的自定义规则,您应该创建多个定制器 bean 并在每个 bean 上附加适当的定制器PulsarListener.
9. 消息监听器容器生命周期
9.1. 暂停和恢复
在某些情况下,应用程序可能希望暂时暂停消息使用,然后稍后恢复。 Spring for Apache Pulsar 提供了暂停和恢复底层消息侦听器容器的功能。 当 Pulsar 消息侦听器容器暂停时,容器为从 Pulsar 消费者接收数据而执行的任何轮询都将暂停。 同样,当容器恢复时,如果主题在暂停时添加了任何新记录,则下一次轮询将开始返回数据。
要暂停或恢复监听器容器,请首先通过PulsarListenerEndpointRegistrybean,然后在容器实例上调用暂停/恢复 API - 如下面的代码片段所示:
@Autowired
private PulsarListenerEndpointRegistry registry;
void someMethod() {
PulsarMessageListenerContainer container = registry.getListenerContainer("my-listener-id");
container.pause();
}
传递给getListenerContainer是容器 ID - 这将是@PulsarListenerid 属性,暂停/恢复@PulsarListener. |
10. Pulsar 读卡器支持
Spring Boot 提供了这个读取器工厂,您可以通过指定任何spring.pulsar.reader.*应用程序属性。
10.1. PulsarReader 注解
虽然可以使用PulsarReaderFactory直接,Spring for Apache Pulsar 提供了PulsarReader注释,您可以使用它来快速阅读主题,而无需自己设置任何阅读器工厂。这类似于背后的相同想法PulsarListener.这是一个简单的例子。
@PulsarReader(id = "reader-demo-id", topics = "reader-demo-topic", startMessageId = "earliest")
void read(String message) {
//...
}
这id属性是可选的,但最佳实践是提供对应用程序有意义的值。如果未指定,将使用自动生成的 ID。另一方面,topics和startMessageId属性是强制性的。 这topics属性可以是单个主题,也可以是逗号分隔的主题列表。 这startMessageId属性指示读者从主题中的特定消息开始。的有效值startMessageId是earliest或latest.假设您希望读者从最早或最新的可用消息以外的主题开始任意读取消息。在这种情况下,您需要使用ReaderBuilderCustomizer自定义ReaderBuilder所以它知道正确的MessageId从开始。
10.2. 自定义 ReaderBuilder
您可以通过以下方式自定义任何可用字段ReaderBuilder使用PulsarReaderReaderBuilderCustomizer在 Apache Pulsar 的 Spring 中。
您可以提供一个@Bean类型PulsarReaderReaderBuilderCustomizer,然后将其提供给PulsarReader如下。
@PulsarReader(id = "reader-customizer-demo-id", topics = "reader-customizer-demo-topic",
readerCustomizer = "myCustomizer")
void read(String message) {
//...
}
@Bean
public PulsarReaderReaderBuilderCustomizer<String> myCustomizer() {
return readerBuilder -> {
readerBuilder.startMessageId(messageId); // the first message read is after this message id.
// Any other customizations on the readerBuilder
};
}
如果您的应用程序只有一个@PulsarReader和单个PulsarReaderReaderBuilderCustomizerbean 注册,则定制器将自动应用。 |