消息 生产
1. ReactivePulsarTemplate
在Pulsar生产者一侧,Spring Boot自动配置提供一个ReactivePulsarTemplate用于发布记录。该模板实现了一个名为ReactivePulsarOperations的接口,并通过其契约提供了发布记录的方法。
该模板提供了接受单条消息并返回一个 Mono<MessageId> 的 send 方法。
它还提供了接受多条消息(以 ReactiveStreams 的 Publisher 类型形式提供)并返回一个 Flux<MessageId> 的 send 方法。
| 对于不包含主题参数的API变体,会使用主题解析过程来确定目标主题。 |
1.1. 灵活API
该模板提供一个流畅构建器来处理更复杂的发送请求。
1.2. 消息自定义
您可以指定一个 MessageSpecBuilderCustomizer 来配置传出消息。例如,以下代码展示了如何发送带密钥的消息:
template.newMessage(msg)
.withMessageCustomizer((mc) -> mc.key("foo-msg-key"))
.send();
1.3. 发送方自定义
你可以通过指定一个 ReactiveMessageSenderBuilderCustomizer 来配置底层的 Pulsar 发送器构建器,该构建器最终会构建用于发送传出消息的发送器。
谨慎使用,因为这会给予发送器构建器完整的访问权限,调用其某些方法(如 create) 可能会产生意外的副作用。 |
例如,以下代码展示了如何禁用批处理并启用分块:
template.newMessage(msg)
.withSenderCustomizer((sc) -> sc.enableChunking(true).enableBatching(false))
.send();
这个其他示例展示了在发布记录到分区主题时如何使用自定义路由。
在发送器构建器上指定您的自定义MessageRouter实现,例如:
template.newMessage(msg)
.withSenderCustomizer((sc) -> sc.messageRouter(messageRouter))
.send();
注意,当使用一个 MessageRouter 时,spring.pulsar.producer.message-routing-mode 的唯一有效设置是 custom。 |
2. 指定模式信息
如果使用Java原始类型,框架会自动检测其模式,发布数据时无需指定任何模式类型。
对于非原始类型,如果在对ReactivePulsarTemplate调用send操作时未显式指定Schema,Spring for Apache Pulsar框架将尝试从类型构建Schema.JSON。
| 支持的复杂模式类型包括:JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES 以及带内联编码的 KEY_VALUE。 |
2.1. 自定义模式映射
作为在针对复杂类型的send操作调用ReactivePulsarTemplate时指定模式的替代方案,可以通过为类型配置模式解析器映射。这消除了需要指定模式的必要,因为框架会使用 outgoing 消息类型 咨询解析器。
2.1.1. 配置属性
模式映射可以通过spring.pulsar.defaults.type-mappings属性进行配置。
以下示例使用application.yml来为User和Address两个复杂对象添加映射,分别使用AVRO和JSON两个模式:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
message-type 是消息类的完全限定名称。 |
2.1.2. 模式解析器自定义器
推荐的方法是通过上述属性来添加映射。 然而,如果需要更多控制,你可以提供一个模式解析器自定义器来添加映射(s)。
以下示例使用了模式解析器自定义器来为User和Address复杂数组使用AVRO和JSON模式进行映射,分别:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
2.2. 使用 AUTO_SCHEMA 生成
如果无法提前知道Pulsar主题的模式类型,可以使用AUTO_PRODUCE模式来安全地发布原始JSON或Avro负载作为byte[]。
在这种情况下,生产者会验证传出字节是否与目标主题的模式兼容。
在模板发送操作中,只需按照以下示例所示,在模板上指定模式Schema.AUTO_PRODUCE_BYTES()即可:
void sendUserAsBytes(ReactivePulsarTemplate<byte[]> template, byte[] userAsBytes) {
template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES()).subscribe();
}
| 仅支持 Avro 和 JSON 架构类型。 |
3. ReactivePulsarSenderFactory
该 ReactivePulsarTemplate 依赖于一个 ReactivePulsarSenderFactory 来实际创建底层发送器。
Spring Boot 提供了此发送者工厂,可以使用任意 spring.pulsar.producer.* 应用程序属性对其进行配置。
如果直接使用发送方工厂API时未指定主题信息,则会使用与ReactivePulsarTemplate相同的主题解析过程,但有一个例外:“消息类型默认”步骤被省略。 |
3.1. 生产者缓存
每个底层Pulsar生产者都会消耗资源。为了提高性能并避免不断创建生产者,ReactiveMessageSenderCache在底层Apache Pulsar Reactive客户端中缓存了它所创建的生产者。它们以LRU方式缓存,并且当在一个配置的时间段内未被使用时会被驱逐。
您可以通过指定任何spring.pulsar.producer.cache.*应用程序属性来配置缓存设置。