消息消费
1. Pulsar Listener
When it comes to Pulsar consumers, we recommend that end-user applications use the PulsarListener 注解.
To use PulsarListener, you need to use the @EnablePulsar 注解.
When you use Spring Boot support, it automatically enables this annotation and configures all the components necessary for PulsarListener, such as the message listener infrastructure (which is responsible for creating the Pulsar consumer).
PulsarMessageListenerContainer uses a PulsarConsumerFactory to create and manage the Pulsar consumer the underlying Pulsar consumer that it uses to consume messages.
Spring Boot 提供了这个 consumer 工厂,您可以通过指定 spring.pulsar.consumer.* 应用程序属性进一步配置。
让我们重新审视快速浏览部分中看到的PulsarListener代码片段:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
您可以进一步简化此方法:
@PulsarListener
public void listen(String message) {
System.out.println("Message Received: " + message);
}
在最简单的情况下,当在 @PulsarListener 注解上不提供 subscriptionName 时,将使用自动生成的订阅名称。
类似地,当不直接提供 topics 时,将使用一个 主题解析过程 来确定目标主题。
在前面所示的PulsarListener方法中,我们接收数据作为12类型,并基于该信息推断出模式类型,然后将该模式提供给消费者。
框架会对所有原始类型执行这种推断。
对于所有非原始类型,默认模式为JSON。
如果复杂类型使用除JSON以外的模式(如AVRO或KEY_VALUE),你必须在注解的schemaType
以下示例展示了另一个 PulsarListener 方法,它接受一个 Integer:
@PulsarListener(subscriptionName = "my-subscription-1", topics = "my-topic-1")
public void listen(Integer message) {
System.out.println(message);
}
以下PulsarListener方法展示了如何从主题消费复杂类型:
@PulsarListener(subscriptionName = "my-subscription-2", topics = "my-topic-2", schemaType = SchemaType.JSON)
public void listen(Foo message) {
System.out.println(message);
}
让我们再看一些其他方法。
您可以直接消费Pulsar消息:
@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.apache.pulsar.client.api.Message<String> message) {
System.out.println(message.getValue());
}
以下示例通过使用 Spring 消息封装来消费记录:
@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.springframework.messaging.Message<String> message) {
System.out.println(message.getPayload());
}
现在让我们看看如何以批处理方式消费记录。
以下示例使用 PulsarListener 以 POJOs 的形式以批处理方式消费记录:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Foo> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message));
}
注意,在此示例中,我们接收记录作为集合(List)的对象。
此外,要启用在PulsarListener级别的批量消费,需要将注解上的batch属性设置为true。
基于List实际持有的类型,框架会尝试推断应使用的模式。
如果List包含除JSON以外的复杂类型,你仍需在PulsarListener上提供schemaType。
以下示例使用了由 Pulsar Java 客户端提供的 Message 邮包:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Message<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}
以下示例消费类型为 Spring 消息的包封 Message 类型的批量记录:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<org.springframework.messaging.Message<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getPayload()));
}
最后,你也可以使用来自 Pulsar 的 Messages 占位对象用于批处理监听器:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(org.apache.pulsar.client.api.Messages<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}
当您使用 PulsarListener 时,可以在注解本身上直接提供 Pulsar consumer 属性。
这在不使用前面提到的 Boot 配置属性或有多个 PulsarListener 方法时很方便。
以下示例直接在PulsarListener上使用Pulsar消费者属性:
@PulsarListener(properties = { "subscriptionName=subscription-1", "topicNames=foo-1", "receiverQueueSize=5000" })
void listen(String message) {
}
使用的属性为直接的Pulsar消费者属性,而不是spring.pulsar.consumer应用程序配置属性 |
1.1. 通用记录 with AUTO_CONSUME
如果无法提前知道Pulsar主题的模式类型,可以使用AUTO_CONSUME模式类型来消费通用记录。
在这种情况下,主题会使用与该主题关联的模式信息,将消息反序列化为GenericRecord个对象。
要消费泛型记录,请在您的schemaType = SchemaType.AUTO_CONSUME上设置@PulsarListener,并使用类型为GenericRecord的Pulsar消息作为消息参数,如下所示。
@PulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
void listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
GenericRecord record = message.getValue();
record.getFields().forEach((f) ->
System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
}
The GenericRecord API 允许访问字段及其关联的值 |
1.2. 自定义 ConsumerBuilder
您可以通过提供一个类型为PulsarListenerConsumerBuilderCustomizer的@Bean,使用PulsarListenerConsumerBuilderCustomizer自定义通过ConsumerBuilder可用的任何字段,并将其提供给PulsarListener,如下所示。
@PulsarListener(topics = "hello-topic", consumerCustomizer = "myCustomizer")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
@Bean
PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
return (builder) -> builder.consumerName("myConsumer");
}
如果您的应用程序只注册了一个@PulsarListener和一个PulsarListenerConsumerBuilderCustomizer的bean,那么自定义器将自动应用。 |
2. 指定模式信息
如前所述,对于 Java 原始类型,Spring for Apache Pulsar 框架可以在 PulsarListener 上推断出适当的 Schema。
对于非原始类型,如果在注解上未显式指定 Schema,Spring for Apache Pulsar 框架将尝试从类型构建一个 Schema.JSON。
| 支持的复杂模式类型包括:JSON,AVRO,PROTOBUF,AUTO_CONSUME,KEY_VALUE(带 INLINE 编码)。 |
2.1. 自定义模式映射
作为复杂类型的模式指定的替代方案,可以配置模式解析器对类型的映射。这样就无需在监听器上设置模式,因为框架会根据传入消息的类型咨询解析器。
2.1.1. 配置属性
模式映射可以通过spring.pulsar.defaults.type-mappings属性进行配置。
以下示例使用application.yml来为User和Address两个复杂对象添加映射,分别使用AVRO和JSON两个模式:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
message-type 是消息类的完全限定名称。 |
2.1.2. 模式解析器自定义器
推荐的方法是通过上述属性来添加映射。 然而,如果需要更多控制,你可以提供一个模式解析器自定义器来添加映射(s)。
以下示例使用了模式解析器自定义器来为User和Address复杂数组使用AVRO和JSON模式进行映射,分别:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
2.1.3. 类型映射注解
另一种为特定消息类型指定默认模式信息的方法是给消息类加上@PulsarMessage注解。
模式信息可以通过在注解上使用schemaType属性进行指定。
以下示例配置系统在生产或消费类型 Foo 的消息时使用 JSON 作为默认模式:
@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}
在进行上述配置后,无需在监听器上设置模式,例如:
@PulsarListener(subscriptionName = "user-sub", topics = "user-topic")
public void listen(User user) {
System.out.println(user);
}
3. 访问 Pulsar 消费者对象
有时,你可能需要直接访问 Pulsar Consumer 对象。 以下示例展示了如何获取它:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message, org.apache.pulsar.client.api.Consumer<String> consumer) {
System.out.println("Message Received: " + message);
ConsumerStats stats = consumer.getStats();
...
}
当通过这种方式访问 Consumer 对象时,请不要调用任何会更改 Consumer 光标位置的操作(不要调用任何 receive 方法)。所有此类操作必须由容器执行。 |
4. Pulsar Message Listener Container
现在我们已经通过 PulsarListener 看到了消费者侧的基本交互。现在让我们深入了解 PulsarListener 如何与底层的 Pulsar 消费者进行交互。
请记住,对于大多数应用场景,我们推荐在使用 Spring for Apache Pulsar 从 Pulsar 主题消费时直接使用 PulsarListener 注解,因为该模型涵盖了广泛的应用场景。
然而,了解 PulsarListener 的内部工作原理很重要。本节将详细介绍这些细节。
As briefly mentioned earlier, the message listener container is at the heart of message consumption when you use Spring for Apache Pulsar.
PulsarListener uses the message listener container infrastructure behind the scenes to create and manage the Pulsar consumer.
Spring for Apache Pulsar provides the contract for this message listener container through PulsarMessageListenerContainer.
The default implementation for this message listener container is provided through DefaultPulsarMessageListenerContainer.
As its name indicates, PulsarMessageListenerContainer contains the message listener.
The container creates the Pulsar consumer and then runs a separate thread to receive and handle the data.
The data is handled by the provided message listener implementation.
消息监听容器通过使用消费者’s 的 batchReceive 方法以批处理方式消费数据。
一旦接收到数据,它将传递给所选的消息监听实现。
以下可用的消息监听器类型适用于使用 Spring for Apache Pulsar。
我们将在以下各节中看到这些不同类型的消息监听器的详细信息。
在进行此操作之前,让我们更仔细地看一下容器本身。
4.1. DefaultPulsarMessageListenerContainer
这是一个基于单消费者的消息监听器容器。 以下代码示例显示了其构造函数:
public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
PulsarContainerProperties pulsarContainerProperties)
}
它接收一个PulsarConsumerFactory(它用其创建消费者)和一个PulsarContainerProperties对象(其中包含有关容器属性的信息)。
PulsarContainerProperties有以下构造函数:
public PulsarContainerProperties(String... topics)
public PulsarContainerProperties(Pattern topicPattern)
你可以通过 PulsarContainerProperties 或作为提供给消费者工厂的消费者属性来提供主题信息。
以下示例使用 DefaultPulsarMessageListenerContainer:
Map<String, Object> config = new HashMap<>();
config.put("topics", "my-topic");
PulsarConsumerFactory<String> pulsarConsumerFactorY = DefaultPulsarConsumerFactory<>(pulsarClient, config);
PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties();
pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener<?>) (consumer, msg) -> {
});
DefaultPulsarMessageListenerContainer<String> pulsarListenerContainer = new DefaultPulsarMessageListenerContainer(pulsarConsumerFacotyr,
pulsarContainerProperties);
return pulsarListenerContainer;
如果直接使用监听器容器时未指定主题信息,则会使用 PulsarListener 所使用的相同主题解析过程,唯一的例外是省略了“消息类型默认”步骤。 |
DefaultPulsarMessageListenerContainer 只创建一个消费者。
如果你想要通过多个线程管理多个消费者,需要使用 ConcurrentPulsarMessageListenerContainer。
4.2. ConcurrentPulsarMessageListenerContainer
ConcurrentPulsarMessageListenerContainer 有以下构造函数:
public ConcurrentPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
PulsarContainerProperties pulsarContainerProperties)
ConcurrentPulsarMessageListenerContainer 允许通过 setter 指定 concurrency 属性。
当使用非独占订阅(failover, shared, 和 key-shared)时,允许的并发数超过 1。
在独占订阅模式下,只能将默认的 1 作为并发设置。
以下示例为 concurrency 通过 PulsarListener 注解启用 failover 订阅功能。
@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
subscriptionType = SubscriptionType.Failover, concurrency = "3")
void listen(String message, Consumer<String> consumer) {
...
System.out.println("Current Thread: " + Thread.currentThread().getName());
System.out.println("Current Consumer: " + consumer.getConsumerName());
}
在前面的监听器中,假定主题 my-topic 有三个分区。
如果是非分区主题,将并发设置为 3 没有作用。你将看到除主活动消费者外还有两个空闲消费者。
如果主题的分区数超过三个,容器创建的消费者会在这多个分区之间进行负载均衡。
如果你运行这个 PulsarListener,你会看到来自不同分区的消息通过不同的消费者进行消费,正如前面示例中通过线程名和消费者名打印所暗示的那样。
当使用 Failover 订阅方式在分区主题上时,Pulsar 保证消息有序性。 |
以下代码示例展示了另一个 PulsarListener 的示例,但使用了 Shared 个订阅和 concurrency 个启用。
@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
subscriptionType = SubscriptionType.Shared, concurrency = "5")
void listen(String message) {
...
}
在前述示例中,PulsarListener 会创建五个不同的消费者(这次我们假设该主题有五个分区)。
在本版本中,由于订阅数为Shared,Pulsar 不保证任何消息顺序。 |
如果你需要消息有序性并且还想使用共享订阅类型,你需要使用 Key_Shared 订阅类型。
4.3. 消费记录
让我们来看一下消息监听容器如何启用单条记录和基于批次的消息消费。
单条记录消费
让我们重新审视我们本次讨论的基本 PulsarListener:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
使用这个 PulsarListener 方法,我们本质上向 Spring 请求 Apache Pulsar,在每次调用时通过监听器方法传递单个记录。
我们提到消息监听器容器使用 batchReceive 方法在消费者上以批处理方式消费数据。
框架检测到 PulsarListener,在这种情况下每次只接收一个记录。这意味着在每次调用方法时,它需要单个记录。
尽管消息监听器容器以批处理方式消费数据,它会遍历接收到的批次,并通过适配器调用 PulsarRecordMessageListener 中的监听器方法。
正如在上一节中所示,PulsarRecordMessageListener 继承自 Pulsar Java 客户端提供的 MessageListener,并支持基本的 received 方法。
批量消费
以下示例显示了PulsarListener批量消费记录:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen4(List<Foo> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message));
}
当您使用这种PulsarListener类型时,框架会检测到您处于批处理模式。
由于已经通过使用Consumer的batchReceive方法接收了批处理数据,它会通过适配器将整个批处理传递给监听器方法的PulsarBatchMessageListener方法。
5. Pulsar 头部
Pulsar 并没有提供一等的“头部”概念,而是提供了一个用于自定义用户属性的映射,以及访问通常存储在消息头中的消息元数据的方法(例如 id 和 event-time)。
因此,“Pulsar 消息头部”和“Pulsar 消息元数据”的术语可以互换使用。
可用的消息元数据(头部)列表可以在 PulsarHeaders.java 中找到。
5.1. Spring 标题
Spring 消息传递提供了通过其 MessageHeaders 抽象的优秀“标头”支持。
The Pulsar message metadata can be consumed as Spring message headers. The list of available headers can be found in PulsarHeaders.java
5.2. 单条记录为基础的消费者访问
以下示例显示了如何在使用消费单记录模式的应用程序中访问各种 Pulsar 头部:
@PulsarListener(topics = "simpleListenerWithHeaders")
void simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
@Header(PulsarHeaders.RAW_DATA) byte[] rawData,
@Header("foo") String foo) {
}
在前面的例子中,我们访问了messageId和rawData消息元数据的值,以及一个名为foo的自定义消息属性。
Spring 使用 @Header 注解为每个标头字段进行注解。
您也可以使用 Pulsar 的 Message 作为承载负载的包络。
在这种情况下,用户可以直接在 Pulsar 消息上调用相应的方法来检索元数据。
但是,为了方便,您也可以通过使用 Header 注解来检索。
请注意,您也可以使用 Spring 消息 Message 包络来承载负载,然后通过使用 @Header 来检索 Pulsar 头部。
5.3. 批量记录基于消费者的访问
在此部分,我们看到如何在使用批处理消费者的程序中访问各种 Pulsar 头部:
@PulsarListener(topics = "simpleBatchListenerWithHeaders", batch = true)
void simpleBatchListenerWithHeaders(List<String> data,
@Header(PulsarHeaders.MESSAGE_ID) List<MessageId> messageIds,
@Header(PulsarHeaders.TOPIC_NAME) List<String> topicNames, @Header("foo") List<String> fooValues) {
}
在 preceding example 中,我们消费数据时将其作为List<String>。
当提取各种头信息时,也作为List<>进行提取。
Spring for Apache Pulsar 确保头信息列表与数据列表相对应。
你也可以以相同的方式在使用批处理监听器并接收负载为 List<org.apache.pulsar.client.api.Message<?>、org.apache.pulsar.client.api.Messages<?> 或 org.springframework.messaging.Messsge<?> 时提取标头。
5.4. 消息标头映射
The PulsarHeaderMapper 策略用于将头信息映射到和从 Pulsar 用户属性与 Spring MessageHeaders 之间进行映射。
其接口定义如下:
public interface PulsarHeaderMapper {
Map<String, String> toPulsarHeaders(MessageHeaders springHeaders);
MessageHeaders toSpringHeaders(Message<?> pulsarMessage);
}
该框架提供了几种映射器实现。
-
The
JsonPulsarHeaderMapper映射头信息为 JSON 以支持丰富的头信息类型,当 Jackson JSON 库在类路径上时这是默认值。 -
The
ToStringPulsarHeaderMapper使用 header 值的toString()方法将 headers 映射为字符串,是备用的映射器。
5.4.1. JSON Header Mapper
The JsonPulsarHeaderMapper uses a “special” header (with a key of spring_json_header_types) that contains a JSON map of <key>:<type>.
This header is used on the inbound side (Pulsar → Spring) to provide appropriate conversion of each header value to the original type.
可信赖的软件包
默认情况下,JSON 映射器会反序列化所有包中的类。
但是,如果您接收到来自不可信来源的消息,可能希望通过您提供并自定义配置的 ObjectMapper 对象上的 packageNames 属性,仅添加您信任的包。
ToString 类
某些类型不适合JSON序列化,对于这些类型,可能更倾向于使用简单的toString()序列化。
JsonPulsarHeaderMapper有一个名为addToStringClasses()的属性,可让您为此类在出站映射中指定应如何处理的类名。
在入站映射中,它们将作为String进行映射。
默认情况下,仅org.springframework.util.MimeType和org.springframework.http.MediaType会这样处理。
自定义对象映射器
JSON 映射器使用合理配置的 Jackson 2 ObjectMapper 来处理标头值的序列化。
然而,要提供一个自定义对象映射器,只需提供一个具有名称 pulsarHeaderObjectMapper 的 ObjectMapper 杯具。
例如:
@Configuration(proxyBeanMethods = false)
static class PulsarHeadersCustomObjectMapperTestConfig {
@Bean(name = "pulsarHeaderObjectMapper")
ObjectMapper customObjectMapper() {
var objectMapper = new ObjectMapper();
// do things with your special header object mapper here
return objectMapper;
}
}
示例中的对象映射器应该是一个 com.fasterxml.jackson.databind.ObjectMapper 的实例,而不是 被 shade 的 org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectMapper。 |
| 相同的 限制 适用于 Jackson 2 与 Jackson 3 的情况也适用于此} |
5.5. 入站/出站模式
在入站侧, 默认情况下,所有 Pulsar 头(消息元数据加上用户属性)都会被映射到 MessageHeaders。
在出站侧, 默认情况下,所有 MessageHeaders 都会被映射,除了 id、timestamp 以及代表 Pulsar 消息元数据的头(即以 pulsar_message_ 前缀的头)。
你可以通过在你提供的 mapper bean 上配置 inboundPatterns 和 outboundPatterns 来指定哪些头被映射到入站和出站消息。
你可以通过将精确的头名称添加到 outboundPatterns 来在出站消息中包含 Pulsar 消息元数据头,模式不支持这些元数据头的模式匹配。
模式很简单,可以包含前导通配符(*)、后导通配符或两者兼具(例如,*.cat.*)。
你可以通过在模式前加上 ! 来否定模式。
第一个匹配到头名称的模式(无论是正向还是负向)将获胜。
当您提供自己的模式时,我们建议包含!id和!timestamp,因为这些标头在入站侧是只读的。 |
6. 消息确认
当你使用 Spring 与 Apache Pulsar 一起使用时,消息确认由框架处理,除非应用程序选择退出。 在本节中,我们将详细介绍框架如何处理消息确认。
6.1. 信息确认(ACK)模式
Spring for Apache Pulsar 提供了以下方式来确认消息:
-
BATCH -
RECORD -
MANUAL
BATCH acknowledgment 模式为默认,但你可以在消息监听器容器中更改它。
在以下部分中,我们将看到在使用单个和批量版本的 PulsarListener 时 acknowledgment 的工作方式,以及它们如何转换到底层的消息监听器容器(最终转换到 Pulsar 消费者)。
6.2. 单记录模式下的自动消息确认
让我们重新审视我们基于基本单消息的 PulsarListener:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
自然而然地会产生疑问,当你使用 PulsarListener 时消息确认机制是如何工作的,尤其是在你熟悉直接使用 Pulsar 消费者的情况下。
答案在于消息监听器容器,它是 Spring 中处理 Apache Pulsar 的核心位置,负责协调所有与消费者相关的活动。
假设你没有覆盖默认行为,当你使用前面的 PulsarListener 时,实际上会发生的情况如下:
-
首先,监听器容器从Pulsar消费者以批处理方式接收消息。
-
接收到的消息会逐条传递到
PulsarListener。 -
当所有记录都传递到监听器方法并成功处理后,容器会确认原始批次的所有消息。
这是正常流程。如果原始批次中的任何记录抛出异常,Spring for Apache Pulsar 会分别跟踪这些记录。
当批次中的所有记录都处理完成后,Spring for Apache Pulsar 会确认所有成功的消息,并对失败的消息进行否定确认(nack)。
换句话说,当使用单条记录消费(使用 PulsarRecordMessageListener)且采用默认的 BATCH 确认模式时,框架会等待通过 batchReceive 调用接收到的所有记录全部处理成功后,再在 Pulsar 消费者上调用 acknowledge 方法。
如果在调用处理方法时某条特定记录抛出异常,Spring for Apache Pulsar 会跟踪这些记录,并在整个批次处理完成后分别对这些记录调用 negativeAcknowledge。
如果应用程序希望按记录进行确认或否定确认,可以启用RECORD确认模式。
在这种情况下,在处理每个记录后,如果没有错误则确认消息,如果出现错误则进行否定确认。
以下示例在Pulsar监听器上启用RECORD确认模式:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.RECORD)
public void listen(String message) {
System.out.println("Message Received: " + message);
}
6.3. 单条记录模式下的手动消息确认
你可能并不总希望框架发送确认,而是由应用程序直接处理。Spring for Apache Pulsar 提供了启用手动消息确认的几种方式。以下示例展示了其中一种:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Acknowledgment acknowledgment) {
System.out.println("Message Received: " + message.getValue());
acknowledgment.acknowledge();
}
这里有一些需要解释的事项。首先,我们通过将 ackMode 设置为 PulsarListener 来启用手动确认模式。
在启用手动确认模式时,Spring for Apache Pulsar 会允许应用程序注入一个 Acknowledgment 对象。
该框架通过选择一个兼容的消息监听容器:PulsarAcknowledgingMessageListener 用于基于单条记录的消费,这将使你能够访问到一个 Acknowledgment 对象。
The Acknowledgment 对象提供以下 API 方法:
void acknowledge();
void acknowledge(MessageId messageId);
void acknowledge(List<MessageId> messageIds);
void nack();
void nack(MessageId messageId);
在使用 MANUAL 确认模式时,您可以将此 Acknowledgment 对象注入到您的 PulsarListener 中,然后调用相应的方法之一。
在前面的 PulsarListener 示例中,我们调用了一个无参数的 acknowledge 方法。这是因为在框架知道它当前正在运行的 Message 环境下。acknowledge() 调用时不需要通过 Message 包装器接收负载,而应使用目标类型——String,在这个例子中就是如此。你也可以通过提供消息ID:acknowledge.acknowledge(message.getMessageId()); 来调用不同的 acknowledge 变体。当你使用 acknowledge(messageId) 时,必须通过 Message<?> 包装器来接收负载。
与确认类似,Acknowledgment API 还提供了对负面确认的选择。参见前面显示的 nack 方法。
您也可以直接在 Pulsar 消费者上调用 acknowledge:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Consumer<String> consumer) {
System.out.println("Message Received: " + message.getValue());
try {
consumer.acknowledge(message);
}
catch (Exception e) {
....
}
}
当直接在底层消费者上调用 acknowledge 时,需要自己进行错误处理。
使用 Acknowledgment 则不需要这样做,因为框架可以为您完成。
因此,当使用手动确认时,您应该使用 Acknowledgment 对象方法。
| 在使用手动确认时,理解框架完全不会进行任何确认是非常重要的。 因此,在设计应用程序时,仔细考虑正确的确认策略至关重要。 |
6.4. 批量消费中的自动消息确认
当您以批处理方式消费记录时(参见“消息ACK模式”),如果使用默认的ack模式BATCH,则在成功处理整个批次后,会确认整个批次。
如果任何记录抛出异常,则整个批次将被负面确认。
请注意,这可能不是生产者端批处理的同一个批次。相反,这是从调用消费者上的batchReceive返回的批次。
考虑以下批处理侦听器:<br/>
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", batch = true)
public void batchListen(List<Foo> messages) {
for (Foo foo : messages) {
...
}
}
当传入集合中的所有消息(在此示例中为 messages)处理完毕后,框架会确认全部消息。
批量模式下消费时,RECORD不是允许的确认模式。这可能会导致一个问题,因为应用程序可能不希望整个批次再次被重新传递。在这种情况下,您需要使用MANUAL确认模式。
6.5. 批量消费中的手动消息确认
如前一节所示,当在消息监听器容器上设置MANUAL确认模式时,框架不会进行任何确认(无论是正面还是负面)。这完全取决于应用程序来处理此类问题。当设置MANUAL确认模式时,Spring for Apache Pulsar会选择一个兼容的消息监听器容器:PulsarBatchAcknowledgingMessageListener用于批量消费,从而为您提供访问Acknowledgment对象的机会。以下是AcknowledgmentAPI中可用的方法:
void acknowledge();
void acknowledge(MessageId messageId);
void acknowledge(List<MessageId> messageIds);
void nack();
void nack(MessageId messageId);
在使用 MANUAL 确认模式时,您可以将此 Acknowledgment 对象注入到您的 PulsarListener 中。下面的列表显示了基于批次的监听器的基本示例:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(List<Message<String>> messgaes, Acknowlegement acknowledgment) {
for (Message<String> message : messages) {
try {
...
acknowledgment.acknowledge(message.getMessageId());
}
catch (Exception e) {
acknowledgment.nack(message.getMessageId());
}
}
}
使用批处理监听器时,消息监听器容器无法知道它当前正在操作的记录。
因此,要手动确认,您需要使用其中一个重载的acknowledge方法,该方法接受一个MessageId或List<MessageId>。
对于批处理监听器,您也可以用MessageId进行负面确认。
7. 消息重传和错误处理
在了解了 PulsarListener 和消息监听容器基础设施及其各种功能之后,让我们现在尝试理解消息重新投递和错误处理。
Apache Pulsar 提供了多种原生的消息重新投递和错误处理策略。我们来看一下它们,并了解如何通过 Spring for Apache Pulsar 使用这些策略。
7.1. 为消息重传指定确认超时
默认情况下,除非消费者崩溃,否则Pulsar消费者不会重新传递消息。但是可以通过在Pulsar消费者上设置确认超时来更改此行为。<br/>如果确认超时属性的值大于零,并且Pulsar消费者在此超时期间内未确认消息,则会重新传递该消息。
使用 Apache Pulsar 的 Spring 时,可以通过 消费者自定义程序 设置此属性,也可以在 @PulsarListener 的 properties 属性中使用原生 Pulsar ackTimeoutMillis 属性:
@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
properties = {"ackTimeoutMillis=60000"})
public void listen(String s) {
...
}
指定确认超时时间后,如果消费者在60秒内未发送确认,则Pulsar会将消息重新投递给消费者。
如果您希望为确认超时设置一些高级重试选项,并指定不同的延迟,您可以执行以下操作:<br>
@EnablePulsar
@Configuration
class AckTimeoutRedeliveryConfig {
@PulsarListener(subscriptionName = "withAckTimeoutRedeliveryBackoffSubscription",
topics = "withAckTimeoutRedeliveryBackoff-test-topic",
ackTimeoutRedeliveryBackoff = "ackTimeoutRedeliveryBackoff",
properties = { "ackTimeoutMillis=60000" })
void listen(String msg) {
// some long-running process that may cause an ack timeout
}
@Bean
RedeliveryBackoff ackTimeoutRedeliveryBackoff() {
return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
.build();
}
}
在前面的示例中,我们为 Pulsar 的 RedeliveryBackoff 指定一个 bean,最小延迟为 1 秒,最大延迟为 10 秒,并且退避倍数为 2。
初始确认超时发生后,消息重传通过此退避 bean 进行控制。
我们将退避 bean 提供给 PulsarListener 注解,方法是将 ackTimeoutRedeliveryBackoff 属性设置为实际的 bean 名称 —— 在本例中即为 ackTimeoutRedeliveryBackoff。
7.2. 指定否定确认重传
当负面确认时,Pulsar 消费者允许您指定应用程序希望如何重新传递消息。
默认情况下,会在一分钟内重新传递消息,但您可以使用消费者自定义程序或原生 Pulsar negativeAckRedeliveryDelay 属性来更改它,在 @PulsarListener 的 properties 属性中设置 negativeAckRedeliveryDelay:
@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
properties = {"negativeAckRedeliveryDelay=10ms"})
public void listen(String s) {
...
}
您还可以通过提供一个 RedeliveryBackoff Bean 并在 PulsarProducer 上将 Bean 名称作为 negativeAckRedeliveryBackoff 属性来指定不同的延迟和退避机制以及倍数,如下所示:
@EnablePulsar
@Configuration
class NegativeAckRedeliveryConfig {
@PulsarListener(subscriptionName = "withNegRedeliveryBackoffSubscription",
topics = "withNegRedeliveryBackoff-test-topic", negativeAckRedeliveryBackoff = "redeliveryBackoff",
subscriptionType = SubscriptionType.Shared)
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@Bean
RedeliveryBackoff redeliveryBackoff() {
return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
.build();
}
}
7.3. 使用来自 Apache Pulsar 的死信主题进行消息重传和错误处理
Apache Pulsar 允许应用程序在使用 Shared 订阅类型的消费者时,使用死信主题。Exclusive 和 Failover 订阅类型不支持此功能。基本思想是,如果消息重试次数达到一定次数(可能是由于确认超时或否定确认重新交付),一旦重试次数用尽,该消息可以发送到一个特殊的主题,称为死信队列 (DLQ)。通过检查一些代码片段,让我们了解此功能的一些细节:
@EnablePulsar
@Configuration
class DeadLetterPolicyConfig {
@PulsarListener(id = "deadLetterPolicyListener", subscriptionName = "deadLetterPolicySubscription",
topics = "topic-with-dlp", deadLetterPolicy = "deadLetterPolicy",
subscriptionType = SubscriptionType.Shared, properties = { "ackTimeoutMillis=1000" })
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@PulsarListener(id = "dlqListener", topics = "my-dlq-topic")
void listenDlq(String msg) {
System.out.println("From DLQ: " + msg);
}
@Bean
DeadLetterPolicy deadLetterPolicy() {
return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
}
}
首先,我们有一个特殊的 bean 对应DeadLetterPolicy,它的名字是deadLetterPolicy(你可以随意命名)。此 Bean 指定了许多事项,例如最大交付次数(本例中为 10)以及死信主题的名称 —— my-dlq-topic。如果您不指定死信队列(DLQ)主题名称,则在 Pulsar 中默认为 <topicname>-<subscriptionname>-DLQ。下一步,我们将此bean名称通过设置deadLetterPolicy属性提供给PulsarListener。请注意,PulsarListener 的订阅类型为 Shared,因为死信队列(DLQ)功能仅与共享订阅一起使用。
此代码主要用于演示目的,因此我们提供一个 ackTimeoutMillis 值为 1000。
DeadLetterPolicy中的最大重试次数),Pulsar消费者会将消息发布到死信队列主题。我们还有另一个 PulsarListener 监听死信队列主题,以便在数据发布到死信队列主题时接收数据。
7.4. Spring for Apache Pulsar 中的原生错误处理
正如我们前面提到的,Apache Pulsar中的死信队列(DLQ)功能仅适用于共享订阅。
如果应用程序需要为非共享订阅使用类似的特性,该怎么办呢?
Pulsar不支持独占和故障转移订阅类型的DLQ的主要原因是这些订阅类型保证了消息顺序。
允许重新投递、DLQ等操作实际上会接收无序的消息。
但是,如果应用程序可以接受这一点,并且更重要的是,需要为非共享订阅提供DLQ功能怎么办呢?
为此,Spring for Apache Pulsar提供了PulsarConsumerErrorHandler,您可以将其用于Pulsar中的任何订阅类型:Exclusive、Failover或Shared。Key_Shared。
使用 Spring for Apache Pulsar 的 PulsarConsumerErrorHandler 时,请确保不要在监听器上设置确认超时属性。
让我们通过检查一些代码片段来了解一些细节:
@EnablePulsar
@Configuration
class PulsarConsumerErrorHandlerConfig {
@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
PulsarTemplate<String> pulsarTemplate) {
return new DefaultPulsarConsumerErrorHandler<>(
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}
@PulsarListener(id = "pulsarConsumerErrorHandler-id", subscriptionName = "pulsatConsumerErrorHandler-subscription",
topics = "pulsarConsumerErrorHandler-topic",
pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler")
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@PulsarListener(id = "pceh-dltListener", topics = "my-foo-dlt")
void listenDlt(String msg) {
System.out.println("From DLT: " + msg);
}
}
考虑 pulsarConsumerErrorHandler bean。
这会创建一个类型为 PulsarConsumerErrorHandler 的bean,并使用Spring提供的Apache Pulsar默认实现:DefaultPulsarConsumerErrorHandler。DefaultPulsarConsumerErrorHandler有一个构造函数,接受PulsarMessageRecovererFactory和org.springframework.util.backoff.Backoff。PulsarMessageRecovererFactory是一个具有以下API的功能接口:
@FunctionalInterface
public interface PulsarMessageRecovererFactory<T> {
/**
* Provides a message recoverer {@link PulsarMessageRecoverer}.
* @param consumer Pulsar consumer
* @return {@link PulsarMessageRecoverer}.
*/
PulsarMessageRecoverer<T> recovererForConsumer(Consumer<T> consumer);
}
该 recovererForConsumer 方法接受一个 Pulsar 消费者并返回一个 PulsarMessageRecoverer,这是另一个函数式接口。
以下是 PulsarMessageRecoverer 的 API:
public interface PulsarMessageRecoverer<T> {
/**
* Recover a failed message, for e.g. send the message to a DLT.
* @param message Pulsar message
* @param exception exception from failed message
*/
void recoverMessage(Message<T> message, Exception exception);
}
SprPulsarMessageRecovererFactoryfor Apache Pulsar 提供了一个被称为 PulsarDeadLetterPublishingRecoverer 的实现,该实现在消息发送到死信主题(DLT)时提供了默认的恢复机制。
我们将此实现提供给前面提到的 DefaultPulsarConsumerErrorHandler 的构造函数。
作为第二个参数,我们提供了一个 FixedBackOff。
您还可以从 Spring 中提供的 ExponentialBackoff 来获取高级退避功能。
然后我们将这个 Bean 名称作为属性提供给 PulsarConsumerErrorHandler,并将其称为 PulsarListener。
每个时间当 PulsarListener 方法失败处理一条消息时,它都会被重试。
重试次数由提供的 Backoff 实现值控制。在我们的示例中,我们进行了 10 次重试(总共 11 次尝试——第一次和随后的 10 次重试)。
一旦所有重试都用尽了,消息就会被发送到 DLT 主题。
我们提供的PulsarDeadLetterPublishingRecoverer实现使用了PulsarTemplate,该PulsarTemplate用于将消息发布到DLT。
在大多数情况下,Spring Boot自动配置的PulsarTemplate就足够了,但需要注意分区主题的情况。
当使用分区主题并对主主题使用自定义消息路由时,必须使用不同的PulsarTemplate,而不是采用自动配置的PulsarProducerFactory,因为PulsarProducerFactory被填充了一个值为custompartition的message-routing-mode。
您可以使用具有以下蓝图的PulsarConsumerErrorHandler:
@Bean
PulsarConsumerErrorHandler<Integer> pulsarConsumerErrorHandler(PulsarClient pulsarClient) {
PulsarProducerFactory<Integer> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Map.of());
PulsarTemplate<Integer> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);
BiFunction<Consumer<?>, Message<?>, String> destinationResolver =
(c, m) -> "my-foo-dlt";
PulsarDeadLetterPublishingRecoverer<Integer> pulsarDeadLetterPublishingRecoverer =
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, destinationResolver);
return new DefaultPulsarConsumerErrorHandler<>(pulsarDeadLetterPublishingRecoverer,
new FixedBackOff(100, 5));
}
请注意,我们将目标解析器提供给PulsarDeadLetterPublishingRecoverer作为第二个构造函数参数。如果没有提供,则使用PulsarDeadLetterPublishingRecoverer作为DLT主题名称。<subscription-name>-<topic-name>-DLT>当使用此功能时,应通过设置目标解析器来使用适当的目标名称,而不是使用默认值。
在使用单条记录消息监听器时,如我们之前对PulsarConsumerErrorHnadler所做的那样,并且如果您使用手动确认,则确保当抛出异常时不要对消息进行负面确认。相反,应将异常重新抛回容器。否则,容器会认为该消息已单独处理,错误处理不会被触发。
最后,我们有一个接收来自DLT主题消息的第二个PulsarListener。
到目前为止,本节提供的示例中,我们仅了解了如何使用PulsarConsumerErrorHandler与单条记录消息监听器。
接下来,我们将了解如何在批处理监听器上使用它。
7.5. 使用 PulsarConsumerErrorHandler 的批处理监听器
首先,让我们看一个批量PulsarListener方法:
@PulsarListener(subscriptionName = "batch-demo-5-sub", topics = "batch-demo-4", batch = true, concurrency = "3",
subscriptionType = SubscriptionType.Failover,
pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler", ackMode = AckMode.MANUAL)
void listen(List<Message<Integer>> data, Consumer<Integer> consumer, Acknowledgment acknowledgment) {
for (Message<Integer> datum : data) {
if (datum.getValue() == 5) {
throw new PulsarBatchListenerFailedException("failed", datum);
}
acknowledgement.acknowledge(datum.getMessageId());
}
}
@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
PulsarTemplate<String> pulsarTemplate) {
return new DefaultPulsarConsumerErrorHandler<>(
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}
@PulsarListener(subscriptionName = "my-dlt-subscription", topics = "my-foo-dlt")
void dltReceiver(Message<Integer> message) {
System.out.println("DLT - RECEIVED: " + message.getValue());
}
再次,我们使用pulsarConsumerErrorHandler属性并提供PulsarConsumerErrorHandlerbean名称。当您使用批处理监听器(如前面的例子所示)并且想要使用Spring Apache Pulsar中的PulsarConsumerErrorHandler时,需要使用手动确认机制。这样,您可以确认所有成功接收的单个消息。对于失败的消息,必须抛出一个带有该消息信息的PulsarBatchListenerFailedException异常。如果没有这个异常,框架将不知道如何处理失败情况。在重试时,容器会向监听器发送一个新的消息批次,并从失败的消息开始。如果它再次失败,则继续重试,直到耗尽重试次数为止,此时消息会被发送到死信队列(DLT)。在此阶段,容器会确认消息,并将后续消息传递给监听器。
8. 对 PulsarListener 的消费者自定义
Spring for Apache Pulsar 提供了一种方便的方法,用于自定义容器使用的消费者创建。
应用程序可以提供一个 PulsarListenerConsumerBuilderCustomizer 的 bean。
这里是一个例子。
@Bean
public PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
return cb -> {
cb.subscriptionName("modified-subscription-name");
};
}
然后,可以将此自定义器的 bean 名称作为属性提供给 PuslarListener 注解,如下所示。
@PulsarListener(subscriptionName = "my-subscription",
topics = "my-topic", consumerCustomizer = "myCustomizer")
void listen(String message) {
}
该框架通过PulsarListener检测提供的bean,并在创建Pulsar消费者之前,将此自定义程序应用于Consumer构建器。
如果您有多个PulsarListener方法,并且每个方法都有不同的自定义规则,那么您应该创建多个定制器bean,并将适当的定制器附加到每个PulsarListener上。
9. Message Listener Container 生命周期
9.1. 暂停与恢复
在某些情况下,应用程序可能需要暂时暂停消息消费,然后稍后恢复。<br/>Spring for Apache Pulsar 提供了暂停和恢复底层消息监听容器的功能。<br/>当 Pulsar 消息监听容器被暂停时,容器用于从 Pulsar 消费者接收数据的任何轮询都将被暂停。<br/>同样地,当容器恢复时,如果主题在此期间有新记录添加,则下次轮询将开始返回数据。
要暂停或恢复侦听器容器,请首先通过PulsarListenerEndpointRegistry Bean 获取容器实例,然后在容器实例上调用暂停/恢复API - 如下面代码片段所示:
@Autowired
private PulsarListenerEndpointRegistry registry;
void someMethod() {
PulsarMessageListenerContainer container = registry.getListenerContainer("my-listener-id");
container.pause();
}
传递给getListenerContainer的id参数是容器id - 当暂停/恢复@PulsarListener时,这将是@PulsarListener id属性的值。 |
10. Pulsar 阅读器支持
该框架提供通过 Pulsar Reader 使用 PulsarReaderFactory 的支持。
Spring Boot 提供了此读取器工厂,您可以进一步通过指定任何 spring.pulsar.reader.* 应用程序属性来配置它。
10.1. PulsarReader 注解
虽然可以直接使用PulsarReaderFactory,但Spring for Apache Pulsar提供了PulsarReader注解,您可以通过它快速从主题中读取数据而无需自己设置任何阅读器工厂。
这与PulsarListener.背后的想法相似
这里是一个简单的例子。
@PulsarReader(id = "reader-demo-id", topics = "reader-demo-topic", startMessageId = "earliest")
void read(String message) {
//...
}
属性id是可选的,但最好提供一个对您的应用程序有意义的值。如果不指定,则将使用自动生成的ID。
另一方面,topics和startMessageId属性是必需的。
属性topics可以是一个主题或逗号分隔的主题列表。
属性startMessageId指示读者从主题中的特定消息开始。
属性startMessageId的有效值为earliest或latest.。
假设您希望读者从除最早或最新可用消息之外的任何位置开始读取消息,则需要使用ReaderBuilderCustomizer来定制ReaderBuilder,以便它知道正确的起始MessageId。
10.2. 自定义 ReaderBuilder
您可以使用 Spring for Apache Pulsar 中的 PulsarReaderReaderBuilderCustomizer 来自定义通过 ReaderBuilder 提供的任何字段。 您可以提供类型为 PulsarReaderReaderBuilderCustomizer 的 @Bean,然后如下所示将其提供给 PulsarReader。
@PulsarReader(id = "reader-customizer-demo-id", topics = "reader-customizer-demo-topic",
readerCustomizer = "myCustomizer")
void read(String message) {
//...
}
@Bean
public PulsarReaderReaderBuilderCustomizer<String> myCustomizer() {
return readerBuilder -> {
readerBuilder.startMessageId(messageId); // the first message read is after this message id.
// Any other customizations on the readerBuilder
};
}
如果您的应用程序只注册了一个@PulsarReader和一个PulsarReaderReaderBuilderCustomizer的bean,那么自定义器将自动应用。 |
10.3. 处理启动失败
消息监听容器在应用上下文刷新时启动。默认情况下,启动期间遇到的任何失败都会重新抛出,应用程序将无法启动。您可以使用相应容器属性上的StartupFailurePolicy调整此行为。
可用选项为:
-
Stop(默认)- 记录异常并重新抛出,实际上会停止应用程序 -
Continue- 记录异常,使容器处于非运行状态,但不停止应用程序 -
Retry- 记录异常,异步重试启动容器,但不停止应用程序。
默认重试行为是进行三次重试,每次尝试之间间隔10秒。<br/>但是可以在相应的容器属性上指定自定义重试模板。<br/>如果在重试次数用尽后容器仍无法重启,则会将其置于非运行状态。