消息 生产

1. Pulsar 模板

在Pulsar生产者一侧,Spring Boot自动配置提供一个PulsarTemplate用于发布记录。该模板实现了一个名为PulsarOperations的接口,并通过其契约提供了发布记录的方法。spring-doc.cadn.net.cn

存在两类 send API 方法:sendsendAsyncsend 方法通过在 Pulsar 生产者上使用同步发送能力来阻塞调用。 一旦消息在代理上持久化,它们会返回该消息的 MessageIdsendAsync 是异步调用,是非阻塞的。 它们会返回一个 CompletableFuture,您可以用它在消息发布后异步接收消息 ID 一旦消息发布。spring-doc.cadn.net.cn

对于不包含主题参数的API变体,会使用主题解析过程来确定目标主题。

1.1. 简单的 API

该模板提供了一些方法(\"send\" 前缀开头),用于简单的发送请求。对于更复杂的发送请求,一个流畅的 API 可以让您配置更多选项。spring-doc.cadn.net.cn

1.2. 流式API

该模板提供一个流畅构建器来处理更复杂的发送请求。spring-doc.cadn.net.cn

1.3. 消息自定义

您可以指定一个 TypedMessageBuilderCustomizer 来配置传出消息。例如,以下代码展示了如何发送带密钥的消息:spring-doc.cadn.net.cn

template.newMessage(msg)
    .withMessageCustomizer((mb) -> mb.key("foo-msg-key"))
    .send();

1.4. 生产者自定义

您可以指定一个 ProducerBuilderCustomizer 来配置底层的 Pulsar 生产者构建器,该构建器最终会构造用于发送传出消息的生产者。spring-doc.cadn.net.cn

请谨慎使用,因为这会完全访问生产者构建器并调用其某些方法(例如create)可能会产生意外的副作用。

例如,以下代码展示了如何禁用批处理并启用分块:spring-doc.cadn.net.cn

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.enableChunking(true).enableBatching(false))
    .send();

此示例展示了如何在向分区主题发布记录时使用自定义路由。请在Producer构建器上指定您的自定义MessageRouter实现,例如:spring-doc.cadn.net.cn

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.messageRouter(messageRouter))
    .send();
注意,当使用一个 MessageRouter 时,spring.pulsar.producer.message-routing-mode 的唯一有效设置是 custom

此示例展示了如何添加一个ProducerInterceptor,该拦截器将在消息被发布到代理之前捕获并修改生产者接收到的消息:spring-doc.cadn.net.cn

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.intercept(interceptor))
    .send();

自定义程序仅适用于发送操作中使用的生产者。如果要将自定义程序应用于所有生产者,必须按照全局生产者自定义中的描述提供给生产者工厂。spring-doc.cadn.net.cn

在使用 Lambda 自定义器时,必须遵循“Lambda 自定义器注意事项”中所述的规则。

2. 指定模式信息

如果使用Java原始类型,框架会自动检测其模式,发布数据时无需指定任何模式类型。 对于非原始类型,如果在对PulsarTemplate调用send操作时未显式指定Schema,Spring for Apache Pulsar框架将尝试从类型构建Schema.JSONspring-doc.cadn.net.cn

支持的复杂模式类型包括:JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES 以及带内联编码的 KEY_VALUE。

2.1. 自定义模式映射

作为在针对复杂类型的send操作调用PulsarTemplate时指定模式的替代方案,可以通过为类型配置模式解析器映射。这消除了需要指定模式的必要,因为框架会使用 outgoing 消息类型 咨询解析器。spring-doc.cadn.net.cn

2.1.1. 配置属性

模式映射可以通过spring.pulsar.defaults.type-mappings属性进行配置。 以下示例使用application.yml来为UserAddress两个复杂对象添加映射,分别使用AVROJSON两个模式:spring-doc.cadn.net.cn

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
message-type 是消息类的完全限定名称。

2.1.2. 模式解析器自定义器

推荐的方法是通过上述属性来添加映射。 然而,如果需要更多控制,你可以提供一个模式解析器自定义器来添加映射(s)。spring-doc.cadn.net.cn

以下示例使用了模式解析器自定义器来为UserAddress复杂数组使用AVROJSON模式进行映射,分别:spring-doc.cadn.net.cn

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

2.1.3. 类型映射注解

另一种为特定消息类型指定默认模式信息的方法是给消息类加上@PulsarMessage注解。 模式信息可以通过在注解上使用schemaType属性进行指定。spring-doc.cadn.net.cn

以下示例配置系统在生产或消费类型 Foo 的消息时使用 JSON 作为默认模式:spring-doc.cadn.net.cn

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}

配置完成后,发送操作无需再指定架构。spring-doc.cadn.net.cn

2.2. 使用 AUTO_SCHEMA 生成

如果无法提前知道Pulsar主题的模式类型,可以使用AUTO_PRODUCE模式来安全地发布原始JSON或Avro负载作为byte[]spring-doc.cadn.net.cn

在这种情况下,生产者会验证传出字节是否与目标主题的模式兼容。spring-doc.cadn.net.cn

在模板发送操作中,只需按照以下示例所示,在模板上指定模式Schema.AUTO_PRODUCE_BYTES()即可:spring-doc.cadn.net.cn

void sendUserAsBytes(PulsarTemplate<byte[]> template, byte[] userAsBytes) {
	template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES());
}
仅支持 Avro 和 JSON 架构类型。

3. Pulsar Producer Factory

PulsarTemplate依赖于一个PulsarProducerFactory来实际创建底层生产者。Spring Boot自动配置还提供了此生产者工厂,您可以进一步通过指定任何spring.pulsar.producer.*应用程序属性进行配置。spring.pulsar.producer.*spring-doc.cadn.net.cn

如果在直接使用生产者工厂API时未指定主题信息,则会使用与PulsarTemplate相同的主题解析过程,但有一个例外:省略了“消息类型默认”步骤。

3.1. 全局生产者自定义

该框架提供了ProducerBuilderCustomizer契约,允许您配置用于构造每个生产者的底层构建器。
要自定义所有生产者,可以将一组定制器传递给PulsarProducerFactory构造函数。
使用多个定制器时,它们会按照列表中出现的顺序依次应用。spring-doc.cadn.net.cn

如果您使用 Spring Boot 自动配置,可以将自定义程序指定为 bean,它们会自动传递给 PulsarProducerFactory,并按照其 @Order 注解进行排序。

如果你想仅对单个生产者应用定制器,你可以使用Fluent API并在发送时指定定制器。spring-doc.cadn.net.cn

4. Pulsar Producer Caching

每个底层的 Pulsar 生产者都会消耗资源。为了提高性能并避免不断创建生产者,生产者工厂会缓存它所创建的生产者。它们以最近最少使用(LRU)的方式进行缓存,并在配置的时间段内未被使用时被逐出。缓存键仅由足够的信息组成,以确保调用方在后续创建请求中返回相同的生产者。spring-doc.cadn.net.cn

此外,您可以通过指定任何spring.pulsar.producer.cache.*应用程序属性来配置缓存设置。spring-doc.cadn.net.cn

4.1. Lambda 自定义器注意事项

任何用户提供的生产者自定义器也会包含在缓存键中。
由于缓存键依赖于equals/hashCode的有效实现,因此使用Lambda自定义器时必须小心。spring-doc.cadn.net.cn

规则:如果两个使用相同Lambda实例且在其闭包外部不依赖任何变量的自定义程序作为Lambda实现,则它们将在equals/hashCode处匹配。

为了明确上述规则,我们将看几个例子。
在下面的例子中,自定义器被定义为内联Lambda表达式,这意味着每次调用sendUser时都使用相同的Lambda实例。此外,它不需要其闭包之外的任何变量。因此,它作为缓存键匹配。spring-doc.cadn.net.cn

void sendUser() {
    var user = randomUser();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> b.producerName("user"))
        .send();
}

在接下来的这个例子中,自定义器被定义为内联 Lambda 表达式,这意味着每次调用 sendUser 都会使用同一个 Lambda 实例。然而,它需要在其闭包外部的一个变量。因此,它 不会 匹配缓存键。spring-doc.cadn.net.cn

void sendUser() {
    var user = randomUser();
    var name = randomName();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> b.producerName(name))
        .send();
}

在此最后一个例子中,自定义器被定义为内联Lambda表达式,这意味着每次调用sendUser都会使用相同的Lambda实例。尽管它使用了一个变量名,但该变量并非在其闭包外部创建,因此匹配为缓存键。
这说明了变量可以在内部的Lambda闭包中使用,并且甚至可以调用静态方法。spring-doc.cadn.net.cn

void sendUser() {
    var user = randomUser();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> {
           var name = SomeHelper.someStaticMethod();
           b.producerName(name);
        })
        .send();
}
规则: 如果您的 Lambda 自定义器未被定义为单一实例(即在后续调用中使用相同的实例)它需要在其闭包外定义的变量,则您必须提供一个具有有效 equals/hashCode 实现的自定义器。
如果未遵循这些规则,则生产者缓存将始终无法命中,您的应用性能将会受到影响。

5. 在生产者处拦截消息

添加一个ProducerInterceptor允许你在消息被发布到代理之前拦截并修改生产者接收到的消息。为此,你可以将一组拦截器传递给PulsarTemplate构造函数。当使用多个拦截器时,它们的应用顺序就是列表中出现的顺序。spring-doc.cadn.net.cn

如果您使用 Spring Boot 自动配置,可以将拦截器指定为 Bean。它们会自动传递给 PulsarTemplate。通过使用 @Order 注解来实现拦截器的排序,如下所示:spring-doc.cadn.net.cn

@Bean
@Order(100)
ProducerInterceptor firstInterceptor() {
  ...
}

@Bean
@Order(200)
ProducerInterceptor secondInterceptor() {
  ...
}
如果您不使用Starters,则需要自行配置并注册上述组件。