空负载与“墓碑”记录的日志压缩

使用日志压缩时,您可以发送和接收负载为 null 的消息以标识键的删除。
您还可以由于其他原因接收到 null 值,例如反序列化器在无法反序列化值时可能返回 nullspring-doc.cadn.net.cn

生成空有效负载

您可以使用ReactivePulsarTemplatesend发送null值,例如,通过将null消息参数值传递给send方法之一。spring-doc.cadn.net.cn

reactiveTemplate
        .send(null, Schema.STRING)
        .subscribe();
发送空值时,必须指定架构类型,因为系统无法从 null 负载中确定消息的类型。

处理空负载

对于 @ReactivePularListenernull 负载根据其消息参数的类型传递到侦听器方法中:spring-doc.cadn.net.cn

参数类型 传入的值

基本类型spring-doc.cadn.net.cn

nullspring-doc.cadn.net.cn

user-definedspring-doc.cadn.net.cn

nullspring-doc.cadn.net.cn

org.apache.pulsar.client.api.Message<T>spring-doc.cadn.net.cn

非空的 Pulsar 消息,其 getValue() 返回 nullspring-doc.cadn.net.cn

org.springframework.messaging.Message<T>spring-doc.cadn.net.cn

非空 Spring 消息,其 getPayload() 返回 PulsarNullspring-doc.cadn.net.cn

Flux<org.apache.pulsar.client.api.Message<T>>spring-doc.cadn.net.cn

非空的流,其条目为非空的Pulsar消息,getValue()返回nullspring-doc.cadn.net.cn

Flux<org.springframework.messaging.Message<T>>spring-doc.cadn.net.cn

一个非空的 Flux,其条目是非空的 Spring 消息,getPayload() 返回 PulsarNullspring-doc.cadn.net.cn


当传入的值为 null(即使用基本类型或用户定义类型的单记录侦听器)时,您必须对@Payload参数注解使用required = false
在为监听器负载类型使用 Spring org.springframework.messaging.Message时,其泛型类型信息必须足够宽以接受 Message<PulsarNull>(例如MessageMessage<?>Message<Object>)。

spring-doc.cadn.net.cn

这是因为 Spring 消息不允许其有效载荷为空值,而是使用了 PulsarNull 占位符替代。spring-doc.cadn.net.cn

如果这是压缩日志的墓碑消息,则通常还需要键,以便您的应用程序可以确定哪个键被 "deleted"。以下示例显示了此类配置:spring-doc.cadn.net.cn

@ReactivePulsarListener(
        topics = "my-topic",
        subscriptionName = "my-topic-sub",
        schemaType = SchemaType.STRING)
Mono<Void> myListener(
        @Payload(required = false) String msg,
        @Header(PulsarHeaders.KEY) String key) {
    ...
}
使用流式消息监听器(Flux)时,头支持是有限的,因此在日志压缩场景下用处不大。