空负载与“墓碑”记录的日志压缩
使用日志压缩时,您可以发送和接收负载为 null 的消息以标识键的删除。
您还可以由于其他原因接收到 null 值,例如反序列化器在无法反序列化值时可能返回 null。
生成空有效负载
要通过使用 null 发送一个 PulsarTemplate 负载,可以使用流利 API,并将 null 传递给 newMessage() 方法的值参数,例如:
pulsarTemplate
.newMessage(null)
.withTopic("my-topic")
.withSchema(Schema.STRING)
.withMessageCustomizer((mb) -> mb.key("key:1234"))
.send();
发送空值时,必须指定架构类型,因为系统无法从 null 负载中确定消息的类型。 |
处理空负载
对于@PulsarListener和@PulsarReader,根据其消息参数的类型,null有效载荷将传递到监听器方法中,如下所示:
| 参数类型 | 传入的值 |
|---|---|
基本类型 |
|
user-defined |
|
|
非空的 Pulsar 消息,其 |
|
非空 Spring 消息,其 |
|
非空列表,其条目( |
|
非空容器,其中包含非空的 Pulsar 消息,并且 |
当传入的值为 null(即使用基本类型或用户定义类型的单记录侦听器)时,您必须对@Payload参数注解使用required = false。 |
在为监听器负载类型使用 Spring org.springframework.messaging.Message时,其泛型类型信息必须足够宽以接受 Message<PulsarNull>(例如Message、Message<?>或Message<Object>)。这是因为 Spring 消息不允许其有效载荷为空值,而是使用了 |
如果这是压缩日志的墓碑消息,则通常还需要键,以便您的应用程序可以确定哪个键被 "deleted"。以下示例显示了此类配置:
@PulsarListener(
topics = "my-topic",
subscriptionName = "my-topic-sub",
schemaType = SchemaType.STRING)
void myListener(
@Payload(required = false) String msg,
@Header(PulsarHeaders.KEY) String key) {
...
}
|