事务

本部分描述了 Spring for Apache Pulsar 如何支持事务。spring-doc.cadn.net.cn

概述

Spring for Apache Pulsar transaction support is built upon the transaction support provided by Spring Framework. 在高层次上,事务性资源会注册到事务管理器中,事务管理器进而处理所注册资源的事务状态(提交、回滚等)。spring-doc.cadn.net.cn

Spring for Apache Pulsar 提供以下内容:spring-doc.cadn.net.cn

尚未将事务支持添加到响应式组件中

事务支持默认是禁用的。 在使用 Spring Boot 时,要启用支持,只需将 spring.pulsar.transaction.enabled 属性设置为 1。 在下方各组件部分中列出了进一步的配置选项。spring-doc.cadn.net.cn

事务性发布 withPulsarTemplate

所有对事务性PulsarTemplate的发送操作都会查找一个活动的事务,并在找到事务时将每个发送操作加入到该事务中。spring-doc.cadn.net.cn

非事务性使用

默认情况下,PulsarTemplate 的事务性也可以用于非事务性操作。 当未找到现有事务时,将以非事务方式继续发送操作。 但是,如果模板配置为要求事务,则在事务作用域之外尝试使用模板将导致异常。spring-doc.cadn.net.cn

可以通过一个 TransactionTemplate、一个以 @Transactional 为前缀的方法、调用 executeInTransaction,或通过事务性监听器容器启动事务。

本地事务

我们使用术语 "local" 事务来表示一种不被或与 Spring 事务管理设施(即 PulsarTransactionManager)关联的 Pulsar 本机事务。 相反,一种 "synchronized" 事务是指由或与 PulsarTransactionManager 关联并由 Spring 事务管理的事务。spring-doc.cadn.net.cn

你可以使用 PulsarTemplate 在本地事务中执行一系列操作。 以下示例展示了如何操作:spring-doc.cadn.net.cn

var results = pulsarTemplate.executeInTransaction((template) -> {
    var rv = new HashMap<String, MessageId>();
    rv.put("msg1", template.send(topic, "msg1"));
    rv.put("msg2", template.send(topic, "msg2"));
    return rv;
});

回调参数是executeInTransaction方法被调用的模板实例。 所有对模板的操作都登记在当前事务中。 如果回调正常退出,事务将提交。 如果抛出异常,事务将回滚。spring-doc.cadn.net.cn

如果存在正在进行的同步事务,则会被忽略,并使用一个新的“嵌套”事务。

配置

以下事务设置可以直接在 PulsarTemplate 上获得(通过 transactions 字段):spring-doc.cadn.net.cn

当不使用 Spring Boot 时,您可以在此提供的模板上调整这些设置。 然而,当使用 Spring Boot 时,模板会自动配置,且没有机制可以影响属性。 在这种情况下,您可以注册一个 PulsarTemplateCustomizer bean 用于调整设置。 以下示例展示了如何在自动配置的模板上设置超时:spring-doc.cadn.net.cn

@Bean
PulsarTemplateCustomizer<?> templateCustomizer() {
    return (template) -> template.transactions().setTimeout(Duration.ofSeconds(45));
}

事务性接收 with@PulsarListener

当启用监听器事务时,@PulsarListener 注解的监听器方法在同步事务的范围内被调用。spring-doc.cadn.net.cn

DefaultPulsarMessageListenerContainer 使用一个配置了 TransactionTemplate 的 Spring TransactionTemplate 在方法调用之前发起事务。spring-doc.cadn.net.cn

每条收到的消息的确认都列示在作用域事务中。spring-doc.cadn.net.cn

消费-处理-生产场景

一个常见的事务模式是:消费者从Pulsar主题读取消息,对消息进行转换,最后生产者将转换后的消息写入另一个Pulsar主题。 框架在事务已启用且监听器方法使用事务性PulsarTemplate来生产转换后的消息时支持此用例。spring-doc.cadn.net.cn

给定以下监听器方法:spring-doc.cadn.net.cn

@PulsarListener(topics = "my-input-topic") (1)
void listen(String msg) { (2)
    var transformedMsg = msg.toUpperCase(Locale.ROOT); (3)
    this.transactionalTemplate.send("my-output-topic", transformedMsg); (4)
} (5) (6)

以下交互发生在启用了监听器事务时:spring-doc.cadn.net.cn

1 监听器容器启动新的事务并以事务的范围调用监听器方法
2 监听器方法接收消息
3 监听器方法转换消息
4 监听器方法使用事务模板发送转换后的消息,将发送操作加入到活动事务中
5 监听器容器自动确认消息,并将确认操作纳入活动事务
6 监听器容器(通过 TransactionTemplate)提交事务

如果你没有使用 @PulsarListener 而是直接使用监听器容器,将如上所述一样提供相同的事务支持。 记住,@PulsarListener 仅仅是为了将 Java 方法注册为监听器容器的消息监听器的便利功能。spring-doc.cadn.net.cn

带有记录监听器的事务

以上示例使用了记录监听器。 当使用记录监听器时,每次监听器方法调用都会创建一个新的事务,这相当于每条消息一个事务。spring-doc.cadn.net.cn

因为事务边界是按消息划分的,而每个消息的确认都会被纳入每个事务,因此不能将批处理确认模式与事务性记录监听器一起使用。

带有批处理监听器的事务

当使用批处理监听器时,每次监听器方法调用都会创建一个新的事务,这相当于每批消息一个事务。spring-doc.cadn.net.cn

事务性批处理监听器目前不支持自定义错误处理程序。

配置

监听器容器工厂

The following transaction settings are available directly on the PulsarContainerProperties used by the ConcurrentPulsarListenerContainerFactory when creating listener containers. These settings affect all listener containers, including the ones used by @PulsarListener.spring-doc.cadn.net.cn

When 不使用 Spring Boot 时,您可以调整您提供的容器工厂上的这些设置。 然而,当使用 Spring Boot 时,容器工厂会自动配置。 在这种情况下,您可以注册一个 org.springframework.boot.autoconfigure.pulsar.PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> bean 来访问并自定义容器属性。 以下示例展示了如何设置容器工厂上的超时:spring-doc.cadn.net.cn

@Bean
PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> containerCustomizer() {
    return (containerFactory) -> containerFactory.getContainerProperties().transactions().setTimeout(Duration.ofSeconds(45));
}

@PulsarListener

默认情况下,每个监听器都会尊重其对应监听器容器工厂的事务设置。 然而,用户可以通过在每个@PulsarListener上设置transactional属性来覆盖容器工厂的设置,如下所示:spring-doc.cadn.net.cn

  • 如果容器工厂启用了事务,那么transactional = false将为该indiviuall listener禁用事务。spring-doc.cadn.net.cn

  • 如果容器工厂启用了事务且为必需,那么尝试设置 transactional = false 将会抛出一个异常,提示事务是必需的。spring-doc.cadn.net.cn

  • 如果容器工厂的事务功能被禁用,那么尝试设置transactional = true将会被忽略,并会记录一条警告。spring-doc.cadn.net.cn

使用PulsarTransactionManager

The PulsarTransactionManager 是 Spring Framework 的 PlatformTransactionManager 的实现。 你可以使用 PulsarTransactionManager 与正常的 Spring 事务支持 (@Transactional, TransactionTemplate 和其他)。spring-doc.cadn.net.cn

如果存在一个事务,那么在该事务作用域内执行的任何PulsarTemplate操作都会加入并参与正在进行的事务。 事务管理器根据成功或失败来提交或回滚事务。spring-doc.cadn.net.cn

您很可能不会直接使用 PulsarTransactionManager,因为大多数事务性使用场景都由 PulsarTemplate@PulsarListener 覆盖。

Pulsar Transactions with Other Transaction Managers

生产者单向事务

如果你希望将记录发送到 Pulsar 并在单个事务中执行一些数据库更新,你可以使用正常的 Spring 事务管理并使用一个 DataSourceTransactionManagerspring-doc.cadn.net.cn

假设以下示例中有一个名为 "dataSourceTransactionManager" 的 DataSourceTransactionManager bean 注册。
@Transactional("dataSourceTransactionManager")
public void myServiceMethod() {
    var msg = calculateMessage();
    this.pulsarTemplate.send("my-topic", msg);
    this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(msg));
}

拦截器用于@Transactional注解会启动数据库事务,而PulsarTemplate会将事务与DB事务管理器进行同步;每次发送都会参与该事务。 当方法退出时,数据库事务将先提交,随后是Pulsar事务。spring-doc.cadn.net.cn

如果您希望先提交Pulsar事务,仅在Pulsar事务成功时再提交数据库事务,请使用嵌套的@Transactional方法,外层方法配置为使用DataSourceTransactionManager,内层方法配置为使用PulsarTransactionManagerspring-doc.cadn.net.cn

@Transactional("dataSourceTransactionManager")
public void myServiceMethod() {
    var msg = calculateMessage();
    this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(msg));
    this.sendToPulsar(msg);
}

@Transactional("pulsarTransactionManager")
public void sendToPulsar(String msg) {
    this.pulsarTemplate.send("my-topic", msg);
}

消费者 + 生产者 事务

如果要消费Pulsar中的记录、向Pulsar发送记录,并在事务中执行一些数据库更新,可以将正常的Spring事务管理(使用一个DataSourceTransactionManager)与容器发起的事务相结合。spring-doc.cadn.net.cn

在以下示例中,监听容器启动Pulsar事务,而@Transactional注解启动DB事务。 DB事务首先提交;如果Pulsar事务无法提交,记录将被重新投递,因此DB更新应具有幂等性。spring-doc.cadn.net.cn

@PulsarListener(topics = "my-input-topic")
@Transactional("dataSourceTransactionManager")
void listen(String msg) {
    var transformedMsg = msg.toUpperCase(Locale.ROOT);
    this.pulsarTemplate.send("my-output-topic", transformedMsg);
    this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(transformedMsg));
}