消息 生产

1. ReactivePulsarTemplate

在Pulsar生产者一侧,Spring Boot自动配置提供一个ReactivePulsarTemplate用于发布记录。该模板实现了一个名为ReactivePulsarOperations的接口,并通过其契约提供了发布记录的方法。spring-doc.cadn.net.cn

该模板提供了接受单条消息并返回一个 Mono<MessageId> 的 send 方法。 它还提供了接受多条消息(以 ReactiveStreams 的 Publisher 类型形式提供)并返回一个 Flux<MessageId> 的 send 方法。spring-doc.cadn.net.cn

对于不包含主题参数的API变体,会使用主题解析过程来确定目标主题。

1.1. 灵活API

该模板提供一个流畅构建器来处理更复杂的发送请求。spring-doc.cadn.net.cn

1.2. 消息自定义

您可以指定一个 MessageSpecBuilderCustomizer 来配置传出消息。例如,以下代码展示了如何发送带密钥的消息:spring-doc.cadn.net.cn

template.newMessage(msg)
    .withMessageCustomizer((mc) -> mc.key("foo-msg-key"))
    .send();

1.3. 发送方自定义

你可以通过指定一个 ReactiveMessageSenderBuilderCustomizer 来配置底层的 Pulsar 发送器构建器,该构建器最终会构建用于发送传出消息的发送器。spring-doc.cadn.net.cn

谨慎使用,因为这会给予发送器构建器完整的访问权限,调用其某些方法(如 create) 可能会产生意外的副作用。

例如,以下代码展示了如何禁用批处理并启用分块:spring-doc.cadn.net.cn

template.newMessage(msg)
    .withSenderCustomizer((sc) -> sc.enableChunking(true).enableBatching(false))
    .send();

这个其他示例展示了在发布记录到分区主题时如何使用自定义路由。 在发送器构建器上指定您的自定义MessageRouter实现,例如:spring-doc.cadn.net.cn

template.newMessage(msg)
    .withSenderCustomizer((sc) -> sc.messageRouter(messageRouter))
    .send();
注意,当使用一个 MessageRouter 时,spring.pulsar.producer.message-routing-mode 的唯一有效设置是 custom

2. 指定模式信息

如果使用Java原始类型,框架会自动检测其模式,发布数据时无需指定任何模式类型。 对于非原始类型,如果在对ReactivePulsarTemplate调用send操作时未显式指定Schema,Spring for Apache Pulsar框架将尝试从类型构建Schema.JSONspring-doc.cadn.net.cn

支持的复杂模式类型包括:JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES 以及带内联编码的 KEY_VALUE。

2.1. 自定义模式映射

作为在针对复杂类型的send操作调用ReactivePulsarTemplate时指定模式的替代方案,可以通过为类型配置模式解析器映射。这消除了需要指定模式的必要,因为框架会使用 outgoing 消息类型 咨询解析器。spring-doc.cadn.net.cn

2.1.1. 配置属性

模式映射可以通过spring.pulsar.defaults.type-mappings属性进行配置。 以下示例使用application.yml来为UserAddress两个复杂对象添加映射,分别使用AVROJSON两个模式:spring-doc.cadn.net.cn

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
message-type 是消息类的完全限定名称。

2.1.2. 模式解析器自定义器

推荐的方法是通过上述属性来添加映射。 然而,如果需要更多控制,你可以提供一个模式解析器自定义器来添加映射(s)。spring-doc.cadn.net.cn

以下示例使用了模式解析器自定义器来为UserAddress复杂数组使用AVROJSON模式进行映射,分别:spring-doc.cadn.net.cn

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

2.1.3. 类型映射注解

另一种为特定消息类型指定默认模式信息的方法是给消息类加上@PulsarMessage注解。 模式信息可以通过在注解上使用schemaType属性进行指定。spring-doc.cadn.net.cn

以下示例配置系统在生产或消费类型 Foo 的消息时使用 JSON 作为默认模式:spring-doc.cadn.net.cn

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}

配置完成后,发送操作无需再指定架构。spring-doc.cadn.net.cn

2.2. 使用 AUTO_SCHEMA 生成

如果无法提前知道Pulsar主题的模式类型,可以使用AUTO_PRODUCE模式来安全地发布原始JSON或Avro负载作为byte[]spring-doc.cadn.net.cn

在这种情况下,生产者会验证传出字节是否与目标主题的模式兼容。spring-doc.cadn.net.cn

在模板发送操作中,只需按照以下示例所示,在模板上指定模式Schema.AUTO_PRODUCE_BYTES()即可:spring-doc.cadn.net.cn

void sendUserAsBytes(ReactivePulsarTemplate<byte[]> template, byte[] userAsBytes) {
	template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES()).subscribe();
}
仅支持 Avro 和 JSON 架构类型。

3. ReactivePulsarSenderFactory

ReactivePulsarTemplate 依赖于一个 ReactivePulsarSenderFactory 来实际创建底层发送器。spring-doc.cadn.net.cn

Spring Boot 提供了此发送者工厂,可以使用任意 spring.pulsar.producer.* 应用程序属性对其进行配置。spring-doc.cadn.net.cn

如果直接使用发送方工厂API时未指定主题信息,则会使用与ReactivePulsarTemplate相同的主题解析过程,但有一个例外:“消息类型默认”步骤被省略

3.1. 生产者缓存

每个底层Pulsar生产者都会消耗资源。为了提高性能并避免不断创建生产者,ReactiveMessageSenderCache在底层Apache Pulsar Reactive客户端中缓存了它所创建的生产者。它们以LRU方式缓存,并且当在一个配置的时间段内未被使用时会被驱逐。spring-doc.cadn.net.cn

您可以通过指定任何spring.pulsar.producer.cache.*应用程序属性来配置缓存设置。spring-doc.cadn.net.cn