空负载与“墓碑”记录的日志压缩

使用日志压缩时,您可以发送和接收负载为 null 的消息以标识键的删除。
您还可以由于其他原因接收到 null 值,例如反序列化器在无法反序列化值时可能返回 nullspring-doc.cadn.net.cn

生成空有效负载

要通过使用 null 发送一个 PulsarTemplate 负载,可以使用流利 API,并将 null 传递给 newMessage() 方法的值参数,例如:
spring-doc.cadn.net.cn

pulsarTemplate
        .newMessage(null)
        .withTopic("my-topic")
        .withSchema(Schema.STRING)
        .withMessageCustomizer((mb) -> mb.key("key:1234"))
        .send();
发送空值时,必须指定架构类型,因为系统无法从 null 负载中确定消息的类型。

处理空负载

对于@PulsarListener@PulsarReader,根据其消息参数的类型,null有效载荷将传递到监听器方法中,如下所示:spring-doc.cadn.net.cn

参数类型 传入的值

基本类型spring-doc.cadn.net.cn

nullspring-doc.cadn.net.cn

user-definedspring-doc.cadn.net.cn

nullspring-doc.cadn.net.cn

org.apache.pulsar.client.api.Message<T>spring-doc.cadn.net.cn

非空的 Pulsar 消息,其 getValue() 返回 nullspring-doc.cadn.net.cn

org.springframework.messaging.Message<T>spring-doc.cadn.net.cn

非空 Spring 消息,其 getPayload() 返回 PulsarNullspring-doc.cadn.net.cn

List<X>spring-doc.cadn.net.cn

非空列表,其条目(X)是上述类型之一,并相应地起作用(即原始条目为 null 等。)spring-doc.cadn.net.cn

org.apache.pulsar.client.api.Messages<T>spring-doc.cadn.net.cn

非空容器,其中包含非空的 Pulsar 消息,并且getValue()返回nullspring-doc.cadn.net.cn


当传入的值为 null(即使用基本类型或用户定义类型的单记录侦听器)时,您必须对@Payload参数注解使用required = false
在为监听器负载类型使用 Spring org.springframework.messaging.Message时,其泛型类型信息必须足够宽以接受 Message<PulsarNull>(例如MessageMessage<?>Message<Object>)。

spring-doc.cadn.net.cn

这是因为 Spring 消息不允许其有效载荷为空值,而是使用了 PulsarNull 占位符替代。spring-doc.cadn.net.cn

如果这是压缩日志的墓碑消息,则通常还需要键,以便您的应用程序可以确定哪个键被 "deleted"。以下示例显示了此类配置:spring-doc.cadn.net.cn

@PulsarListener(
        topics = "my-topic",
        subscriptionName = "my-topic-sub",
        schemaType = SchemaType.STRING)
void myListener(
        @Payload(required = false) String msg,
        @Header(PulsarHeaders.KEY) String key) {
    ...
}

@PulsarReader 目前还不支持 @Header 参数,因此在日志压缩场景中用处较小。spring-doc.cadn.net.cn