事务
本部分描述了 Spring for Apache Pulsar 如何支持事务。
概述
Spring for Apache Pulsar transaction support is built upon the transaction support provided by Spring Framework. 在高层次上,事务性资源会注册到事务管理器中,事务管理器进而处理所注册资源的事务状态(提交、回滚等)。
Spring for Apache Pulsar 提供以下内容:
-
PulsarTransactionManager- 用于正常的 Spring 事务支持 (@Transactional,TransactionTemplate, 等等) -
事务性
PulsarTemplate -
事务性
@PulsarListener -
与其他事务管理器的事务同步
| 尚未将事务支持添加到响应式组件中 |
事务支持默认是禁用的。
在使用 Spring Boot 时,要启用支持,只需将 spring.pulsar.transaction.enabled 属性设置为 1。
在下方各组件部分中列出了进一步的配置选项。
事务性发布 withPulsarTemplate
所有对事务性PulsarTemplate的发送操作都会查找一个活动的事务,并在找到事务时将每个发送操作加入到该事务中。
非事务性使用
默认情况下,PulsarTemplate 的事务性也可以用于非事务性操作。
当未找到现有事务时,将以非事务方式继续发送操作。
但是,如果模板配置为要求事务,则在事务作用域之外尝试使用模板将导致异常。
可以通过一个 TransactionTemplate、一个以 @Transactional 为前缀的方法、调用 executeInTransaction,或通过事务性监听器容器启动事务。 |
本地事务
我们使用术语 "local" 事务来表示一种不被或与 Spring 事务管理设施(即 PulsarTransactionManager)关联的 Pulsar 本机事务。
相反,一种 "synchronized" 事务是指由或与 PulsarTransactionManager 关联并由 Spring 事务管理的事务。
你可以使用 PulsarTemplate 在本地事务中执行一系列操作。
以下示例展示了如何操作:
var results = pulsarTemplate.executeInTransaction((template) -> {
var rv = new HashMap<String, MessageId>();
rv.put("msg1", template.send(topic, "msg1"));
rv.put("msg2", template.send(topic, "msg2"));
return rv;
});
回调参数是executeInTransaction方法被调用的模板实例。
所有对模板的操作都登记在当前事务中。
如果回调正常退出,事务将提交。
如果抛出异常,事务将回滚。
| 如果存在正在进行的同步事务,则会被忽略,并使用一个新的“嵌套”事务。 |
配置
以下事务设置可以直接在 PulsarTemplate 上获得(通过 transactions 字段):
-
enabled- 是否支持事务(默认false) -
required- 是否需要事务(默认false) -
timeout- 事务超时的持续时间(默认null)
当不使用 Spring Boot 时,您可以在此提供的模板上调整这些设置。
然而,当使用 Spring Boot 时,模板会自动配置,且没有机制可以影响属性。
在这种情况下,您可以注册一个 PulsarTemplateCustomizer bean 用于调整设置。
以下示例展示了如何在自动配置的模板上设置超时:
@Bean
PulsarTemplateCustomizer<?> templateCustomizer() {
return (template) -> template.transactions().setTimeout(Duration.ofSeconds(45));
}
事务性接收 with@PulsarListener
当启用监听器事务时,@PulsarListener 注解的监听器方法在同步事务的范围内被调用。
DefaultPulsarMessageListenerContainer 使用一个配置了 TransactionTemplate 的 Spring TransactionTemplate 在方法调用之前发起事务。
每条收到的消息的确认都列示在作用域事务中。
消费-处理-生产场景
一个常见的事务模式是:消费者从Pulsar主题读取消息,对消息进行转换,最后生产者将转换后的消息写入另一个Pulsar主题。
框架在事务已启用且监听器方法使用事务性PulsarTemplate来生产转换后的消息时支持此用例。
给定以下监听器方法:
@PulsarListener(topics = "my-input-topic") (1)
void listen(String msg) { (2)
var transformedMsg = msg.toUpperCase(Locale.ROOT); (3)
this.transactionalTemplate.send("my-output-topic", transformedMsg); (4)
} (5) (6)
以下交互发生在启用了监听器事务时:
| 1 | 监听器容器启动新的事务并以事务的范围调用监听器方法 |
| 2 | 监听器方法接收消息 |
| 3 | 监听器方法转换消息 |
| 4 | 监听器方法使用事务模板发送转换后的消息,将发送操作加入到活动事务中 |
| 5 | 监听器容器自动确认消息,并将确认操作纳入活动事务 |
| 6 | 监听器容器(通过 TransactionTemplate)提交事务 |
如果你没有使用 @PulsarListener 而是直接使用监听器容器,将如上所述一样提供相同的事务支持。
记住,@PulsarListener 仅仅是为了将 Java 方法注册为监听器容器的消息监听器的便利功能。
带有记录监听器的事务
以上示例使用了记录监听器。 当使用记录监听器时,每次监听器方法调用都会创建一个新的事务,这相当于每条消息一个事务。
| 因为事务边界是按消息划分的,而每个消息的确认都会被纳入每个事务,因此不能将批处理确认模式与事务性记录监听器一起使用。 |
配置
监听器容器工厂
The following transaction settings are available directly on the PulsarContainerProperties used by the ConcurrentPulsarListenerContainerFactory when creating listener containers.
These settings affect all listener containers, including the ones used by @PulsarListener.
-
enabled- 是否支持事务(默认false) -
required- 容器是否需要事务(默认false) -
timeout- 事务超时的持续时间(默认null) -
transactionDefinition- 一个具有属性的事务定义蓝图,这些属性将被复制到容器的事务模板中(默认null) -
transactionManager- 用于启动事务的事务管理器
When 不使用 Spring Boot 时,您可以调整您提供的容器工厂上的这些设置。
然而,当使用 Spring Boot 时,容器工厂会自动配置。
在这种情况下,您可以注册一个 org.springframework.boot.autoconfigure.pulsar.PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> bean 来访问并自定义容器属性。
以下示例展示了如何设置容器工厂上的超时:
@Bean
PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> containerCustomizer() {
return (containerFactory) -> containerFactory.getContainerProperties().transactions().setTimeout(Duration.ofSeconds(45));
}
使用PulsarTransactionManager
The PulsarTransactionManager 是 Spring Framework 的 PlatformTransactionManager 的实现。
你可以使用 PulsarTransactionManager 与正常的 Spring 事务支持 (@Transactional, TransactionTemplate 和其他)。
如果存在一个事务,那么在该事务作用域内执行的任何PulsarTemplate操作都会加入并参与正在进行的事务。
事务管理器根据成功或失败来提交或回滚事务。
您很可能不会直接使用 PulsarTransactionManager,因为大多数事务性使用场景都由 PulsarTemplate 和 @PulsarListener 覆盖。 |
Pulsar Transactions with Other Transaction Managers
生产者单向事务
如果你希望将记录发送到 Pulsar 并在单个事务中执行一些数据库更新,你可以使用正常的 Spring 事务管理并使用一个 DataSourceTransactionManager。
假设以下示例中有一个名为 "dataSourceTransactionManager" 的 DataSourceTransactionManager bean 注册。 |
@Transactional("dataSourceTransactionManager")
public void myServiceMethod() {
var msg = calculateMessage();
this.pulsarTemplate.send("my-topic", msg);
this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(msg));
}
拦截器用于@Transactional注解会启动数据库事务,而PulsarTemplate会将事务与DB事务管理器进行同步;每次发送都会参与该事务。
当方法退出时,数据库事务将先提交,随后是Pulsar事务。
如果您希望先提交Pulsar事务,仅在Pulsar事务成功时再提交数据库事务,请使用嵌套的@Transactional方法,外层方法配置为使用DataSourceTransactionManager,内层方法配置为使用PulsarTransactionManager。
@Transactional("dataSourceTransactionManager")
public void myServiceMethod() {
var msg = calculateMessage();
this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(msg));
this.sendToPulsar(msg);
}
@Transactional("pulsarTransactionManager")
public void sendToPulsar(String msg) {
this.pulsarTemplate.send("my-topic", msg);
}
消费者 + 生产者 事务
如果要消费Pulsar中的记录、向Pulsar发送记录,并在事务中执行一些数据库更新,可以将正常的Spring事务管理(使用一个DataSourceTransactionManager)与容器发起的事务相结合。
在以下示例中,监听容器启动Pulsar事务,而@Transactional注解启动DB事务。
DB事务首先提交;如果Pulsar事务无法提交,记录将被重新投递,因此DB更新应具有幂等性。
@PulsarListener(topics = "my-input-topic")
@Transactional("dataSourceTransactionManager")
void listen(String msg) {
var transformedMsg = msg.toUpperCase(Locale.ROOT);
this.pulsarTemplate.send("my-output-topic", transformedMsg);
this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(transformedMsg));
}