消息消费
1. @ReactivePulsarListener
当涉及到Pulsar消费者时,我们建议端用户应用程序使用ReactivePulsarListener注解。
要使用ReactivePulsarListener,需要使用@EnableReactivePulsar注解。
当使用Spring Boot支持时,它会自动启用此注解,并配置所有必要的组件,例如消息监听器基础设施(负责创建底层的Pulsar消费者)。
让我们重新审视快速浏览部分中看到的ReactivePulsarListener代码片段:
@ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
监听器方法返回一个Mono<Void>以指示消息是否成功处理。Mono.empty()表示成功(确认),Mono.error()表示失败(否定确认)。 |
您也可以进一步简化此方法:
@ReactivePulsarListener
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
在这种最简单的情况下,当topics没有直接提供时,会使用一个主题解析过程来确定目标主题。
同样,当在@ReactivePulsarListener注解上没有提供subscriptionName时,将使用自动生成的订阅名称。
在前面所示的ReactivePulsarListener方法中,我们接收数据为String,但我们并未指定任何模式类型。
内部上,框架依赖Pulsar的模式机制将数据转换为所需类型。
框架会检测到您期望的String类型,并根据该信息推断schema类型并提供给消费者。
该推断过程适用于所有原始类型。
对于所有非原始类型,默认schema类型假设为JSON。
如果复杂类型使用除JSON(如AVRO或KEY_VALUE)以外的格式,您必须在注解上使用schemaType属性提供schema类型。
这个示例展示了我们如何从主题消费复杂类型:
@ReactivePulsarListener(topics = "my-topic-2", schemaType = SchemaType.JSON)
Mono<Void> listen(Foo message) {
System.out.println(message);
return Mono.empty();
}
让我们再看一下一些其他方式,我们可以通过这些方式来消费。
这个示例直接消费Pulsar消息:
@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.apache.pulsar.client.api.Message<String> message) {
System.out.println(message.getValue());
return Mono.empty();
}
这个示例消费了一个被Spring消息封装的记录:
@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.springframework.messaging.Message<String> message) {
System.out.println(message.getPayload());
return Mono.empty();
}
1.1. 流式传输
以上都是按单条记录逐个消费的示例。 然而,使用响应式编程的一个重要原因是为了具备流式处理能力,并支持背压。
以下示例使用ReactivePulsarListener来消费POJO的流:
@ReactivePulsarListener(topics = "streaming-1", stream = true)
Flux<MessageResult<Void>> listen(Flux<org.apache.pulsar.client.api.Message<String>> messages) {
return messages
.doOnNext((msg) -> System.out.println("Received: " + msg.getValue()))
.map(MessageResult::acknowledge);
}
这里我们接收记录作为Pulsar消息的Flux。
此外,要启用在ReactivePulsarListener级别的流消费,需要将注解上的stream属性设置为true。
监听器方法返回一个Flux<MessageResult<Void>>,其中每个元素代表一个已处理的消息,并包含消息id、值以及是否被确认。
MessageResult提供了一组静态工厂方法,可用于创建适当的MessageResult实例。 |
根据中消息的实际类型,框架会尝试推断要使用的模式。
如果包含复杂类型,你仍然需要在ReactivePulsarListener上提供schemaType。
以下监听器使用带有复杂类型的 Spring 消息 Message 包裹:
@ReactivePulsarListener(topics = "streaming-2", stream = true, schemaType = SchemaType.JSON)
Flux<MessageResult<Void>> listen2(Flux<org.springframework.messaging.Message<Foo>> messages) {
return messages
.doOnNext((msg) -> System.out.println("Received: " + msg.getPayload()))
.map(MessageUtils::acknowledge);
}
监听器方法返回一个Flux<MessageResult<Void>>,其中每个元素代表一条已处理的消息,包含消息ID、值以及是否被确认。
Spring MessageUtils 提供了一组静态工厂方法,可用于从Spring消息创建适当的MessageResult实例。
MessageUtils 为Spring消息提供与MessagResult上为Pulsar消息提供的工厂方法相同的功能。 |
不支持在 1 中使用 org.apache.pulsar.client.api.Messages<T> |
1.2. 配置 - 应用属性
监听器依赖 ReactivePulsarConsumerFactory 来创建和管理其用于消费消息的底层 Pulsar consumer。
Spring Boot 提供了这种 consumer 工厂,你可以通过指定 spring.pulsar.consumer.* 应用属性进一步配置。
1.3. 通用记录与 AUTO_CONSUME
如果无法提前知道Pulsar主题的模式类型,可以使用AUTO_CONSUME模式类型来消费通用记录。
在这种情况下,主题会使用与该主题关联的模式信息,将消息反序列化为GenericRecord个对象。
要消费泛型记录,请在您的schemaType = SchemaType.AUTO_CONSUME上设置@ReactivePulsarListener,并使用类型为GenericRecord的Pulsar消息作为消息参数,如下所示。
@ReactivePulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
Mono<Void> listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
GenericRecord record = message.getValue();
record.getFields().forEach((f) ->
System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
return Mono.empty();
}
The GenericRecord API 允许访问字段及其关联的值 |
1.4. 消费者自定义
你可以指定一个 ReactivePulsarListenerMessageConsumerBuilderCustomizer 来配置底层 Pulsar consumer builder,该 builder 最终构建 listener 用于接收消息所使用的消费者。
谨慎使用,因为这会提供对消费者构建器的完全访问权限,调用其某些方法(如 create) 可能会产生未预期的副作用。 |
例如,以下代码展示了如何将订阅的初始位置设置为主题上的最早消息。
@ReactivePulsarListener(topics = "hello-pulsar-topic", consumerCustomizer = "myConsumerCustomizer")
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myConsumerCustomizer() {
return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}
如果您的应用程序只注册了一个@ReactivePulsarListener和一个ReactivePulsarListenerMessageConsumerBuilderCustomizer的bean,那么自定义器将自动应用。 |
您还可以使用自定义程序直接向消费者构建器提供 Pulsar 消费者属性。 这很方便,如果您不想使用前面提到的 Boot 配置属性或有多个配置不同的 ReactivePulsarListener 方法。
以下自定义程序示例使用了直接的 Pulsar 消费者属性:
@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> directConsumerPropsCustomizer() {
return b -> b.property("subscriptionName", "subscription-1").property("topicNames", "foo-1");
}
使用的属性是直接的 Pulsar 消费者属性,而不是 spring.pulsar.consumer Spring Boot 配置属性 |
2. 指定模式信息
如前所述,对于 Java 原始类型,Spring for Apache Pulsar 框架可以在 ReactivePulsarListener 上推断出适当的 Schema。
对于非原始类型,如果在注解上未显式指定 Schema,Spring for Apache Pulsar 框架将尝试从类型构建一个 Schema.JSON。
| 支持的复杂模式类型包括:JSON,AVRO,PROTOBUF,AUTO_CONSUME,KEY_VALUE(带 INLINE 编码)。 |
2.1. 自定义模式映射
作为复杂类型的模式指定的替代方案,可以配置模式解析器对类型的映射。这样就无需在监听器上设置模式,因为框架会根据传入消息的类型咨询解析器。
2.1.1. 配置属性
模式映射可以通过spring.pulsar.defaults.type-mappings属性进行配置。
以下示例使用application.yml来为User和Address两个复杂对象添加映射,分别使用AVRO和JSON两个模式:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
message-type 是消息类的完全限定名称。 |
2.1.2. 模式解析器自定义器
推荐的方法是通过上述属性来添加映射。 然而,如果需要更多控制,你可以提供一个模式解析器自定义器来添加映射(s)。
以下示例使用了模式解析器自定义器来为User和Address复杂数组使用AVRO和JSON模式进行映射,分别:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
2.1.3. 类型映射注解
另一种为特定消息类型指定默认模式信息的方法是给消息类加上@PulsarMessage注解。
模式信息可以通过在注解上使用schemaType属性进行指定。
以下示例配置系统在生产或消费类型 Foo 的消息时使用 JSON 作为默认模式:
@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}
在进行上述配置后,无需在监听器上设置模式,例如:
@ReactivePulsarListener(topics = "user-topic")
Mono<Void> listen(User user) {
System.out.println(user);
return Mono.empty();
}
3. Message Listener Container 基础设施
在大多数情况下,我们建议直接使用ReactivePulsarListener注解从Pulsar主题中消耗数据,因为该模型涵盖了一大类应用程序用例。但是,了解ReactivePulsarListener如何内部工作非常重要。
当您使用 Spring for Apache Pulsar 时,消息侦听器容器是消息消费的核心。
代码ReactivePulsarListener在后台使用消息侦听器容器基础设施来创建和管理底层 Pulsar 消费者。
3.1. ReactivePulsarMessageListenerContainer
此消息监听器容器的合同通过ReactivePulsarMessageListenerContainer提供,其默认实现创建一个响应式Pulsar消费者,并设置使用所创建的消费者的响应式消息管道。
3.2. 响应式消息管道
管道是底层 Apache Pulsar 响应式客户端的一个特性,它负责以响应方式接收数据,然后将其传递给提供的消息处理器。由于管道处理了大部分工作,因此响应式消息监听容器实现要简单得多。
3.3. ReactivePulsarMessageHandler
\"监听器\"方面由ReactivePulsarMessageHandler提供,其中提供了两种实现:
-
ReactivePulsarOneByOneMessageHandler- 按顺序逐个处理消息 -
ReactivePulsarStreamingHandler- 使用Flux处理多个消息
在直接使用监听器容器时,如果没有指定主题信息,则会采用与 |
4. 并发
在流式模式(stream = true)下消费记录时,由于客户端实现中底层的响应式支持,线程并发会自然产生。
但是,当逐个处理消息时,可以指定并发性以提高处理吞吐量。
只需在@ReactivePulsarListener上设置concurrency属性即可。
此外,当concurrency > 1时,通过在注解上设置useKeyOrderedProcessing = "true"来确保按键对消息进行排序,并因此发送到同一处理程序。
同样,ReactiveMessagePipeline承担了主要工作,我们只需对其设置属性即可。
5. Pulsar 头部
The Pulsar message metadata can be consumed as Spring message headers. The list of available headers can be found in PulsarHeaders.java
5.1. 逐个访问 In OneByOne 监听器
以下示例显示了在使用逐条消息监听器时如何访问Pulsar标头:
@ReactivePulsarListener(topics = "some-topic")
Mono<Void> listen(String data,
@Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
@Header("foo") String foo) {
System.out.println("Received " + data + " w/ id=" + messageId + " w/ foo=" + foo);
return Mono.empty();
}
在前面的示例中,我们访问了messageId消息元数据以及名为foo的自定义消息属性。Spring@Header注解用于每个报头字段。
您也可以使用 Pulsar 的 Message 作为承载负载的包络。
在这种情况下,用户可以直接在 Pulsar 消息上调用相应的方法来检索元数据。
但是,为了方便,您也可以通过使用 Header 注解来检索。
请注意,您也可以使用 Spring 消息 Message 包络来承载负载,然后通过使用 @Header 来检索 Pulsar 头部。
6. 消息确认
7. 消息重传和错误处理
Apache Pulsar 提供了各种原生的消息重传和错误处理策略。<br>我们将研究这些策略,并了解如何通过 Spring for Apache Pulsar 使用它们。
7.1. acknowledgment 超时
默认情况下,除非消费者崩溃,否则Pulsar消费者不会重新传递消息。但是可以通过在Pulsar消费者上设置确认超时来更改此行为。<br/>如果确认超时属性的值大于零,并且Pulsar消费者在此超时期间内未确认消息,则会重新传递该消息。
您可以直接通过消费者自定义程序将此属性指定为 Pulsar 消费者属性,例如:
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
return b -> b.property("ackTimeoutMillis", "60000");
}
7.2. 负确认重传延迟
当负面确认时,Pulsar 消费者允许您指定应用程序希望如何重新传递消息。默认情况下是一分钟内重新发送消息,但您可以使用消费者自定义程序来更改它,例如:
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
return b -> b.property("negativeAckRedeliveryDelay", "10ms");
}
7.3. 遗弃邮件主题
Apache Pulsar 允许应用程序在使用 Shared 订阅类型的消费者时,使用死信主题。Exclusive 和 Failover 订阅类型不支持此功能。基本思想是,如果消息重试次数达到一定次数(可能是由于确认超时或否定确认重新交付),一旦重试次数用尽,该消息可以发送到一个特殊的主题,称为死信队列 (DLQ)。通过检查一些代码片段,让我们了解此功能的一些细节:
@Configuration(proxyBeanMethods = false)
class DeadLetterPolicyConfig {
@ReactivePulsarListener(
topics = "topic-with-dlp",
subscriptionType = SubscriptionType.Shared,
deadLetterPolicy = "myDeadLetterPolicy",
consumerCustomizer = "ackTimeoutCustomizer" )
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@ReactivePulsarListener(topics = "my-dlq-topic")
void listenDlq(String msg) {
System.out.println("From DLQ: " + msg);
}
@Bean
DeadLetterPolicy myDeadLetterPolicy() {
return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
}
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> ackTimeoutCustomizer() {
return b -> b.property("ackTimeoutMillis", "1000");
}
}
首先,我们有一个特殊的 bean 对应DeadLetterPolicy,它的名字是deadLetterPolicy(你可以随意命名)。此 Bean 指定了许多事项,例如最大交付次数(本例中为 10)以及死信主题的名称 —— my-dlq-topic。如果您不指定死信队列(DLQ)主题名称,则在 Pulsar 中默认为 <topicname>-<subscriptionname>-DLQ。下一步,我们将此bean名称通过设置deadLetterPolicy属性提供给ReactivePulsarListener。请注意,ReactivePulsarListener 的订阅类型为 Shared,因为死信队列(DLQ)功能仅与共享订阅一起使用。
此代码主要用于演示目的,因此我们提供一个 ackTimeoutMillis 值为 1000。
DeadLetterPolicy中的最大重试次数),Pulsar消费者会将消息发布到死信队列主题。我们还有另一个 ReactivePulsarListener 监听死信队列主题,以便在数据发布到死信队列主题时接收数据。
8. Pulsar 阅读器支持
该框架通过ReactivePulsarReaderFactory提供了以响应式方式使用Pulsar Reader的支持。
Spring Boot 提供了此阅读器工厂,可以使用任何spring.pulsar.reader.*应用程序属性进行配置。