发布和消费分区主题

在以下示例中,我们向名为 hello-pulsar-partitioned 的主题发布数据。 这是一个分区主题,对于本示例,我们假设该主题已预先创建并包含三个分区。spring-doc.cadn.net.cn

@SpringBootApplication
public class PulsarBootPartitioned {

	public static void main(String[] args) {
		SpringApplication.run(PulsarBootPartitioned.class, "--spring.pulsar.producer.message-routing-mode=CustomPartition");
	}

	@Bean
	public ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {
		pulsarTemplate.setDefaultTopicName("hello-pulsar-partitioned");
		return args -> {
			for (int i = 0; i < 10; i++) {
				pulsarTemplate.sendAsync("hello john doe 0 ", new FooRouter());
				pulsarTemplate.sendAsync("hello alice doe 1", new BarRouter());
				pulsarTemplate.sendAsync("hello buzz doe 2", new BuzzRouter());
			}
		};
	}

	@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned")
	public void listen(String message) {
		System.out.println("Message Received: " + message);
	}

    static class FooRouter implements MessageRouter {

		@Override
		public int choosePartition(Message<?> msg, TopicMetadata metadata) {
			return 0;
		}
	}

	static class BarRouter implements MessageRouter {

		@Override
		public int choosePartition(Message<?> msg, TopicMetadata metadata) {
			return 1;
		}
	}

	static class BuzzRouter implements MessageRouter {

		@Override
		public int choosePartition(Message<?> msg, TopicMetadata metadata) {
			return 2;
		}
	}

}

在前述示例中,我们向分区主题发布消息,并希望将某些数据片段发布到特定的分区。 如果让 Pulsar 使用默认设置,分区分配会采用轮询模式,我们希望覆盖这一行为。 为此,我们提供一个消息路由器对象,并使用其 send 方法。 考虑下面这三种消息路由器的实现。 FooRouter 始终将消息发送到分区 0BarRouter 发送到分区 1BuzzRouter 发送到分区 2。 请注意,我们现在使用的是 PulsarTemplate 类的 sendAsync 方法,该方法返回一个 CompletableFuture。 在运行应用程序时,还需要将生产者的 messageRoutingMode 设置为 CustomPartitionspring.pulsar.producer.message-routing-mode)。spring-doc.cadn.net.cn

在消费者侧,我们使用一个PulsarListener并采用独占订阅类型。 这意味着所有分区的数据都会落在同一个消费者身上,且不提供顺序保证。spring-doc.cadn.net.cn

如果我们希望每个分区由一个单独的消费者消费,可以切换到 failover 订阅模式并添加三个单独的消费者:spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen1(String foo) {
    System.out.println("Message Received 1: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen2(String foo) {
    System.out.println("Message Received 2: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription",  topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen3(String foo) {
    System.out.println("Message Received 3: " + foo);
}

当你采用这种方法时,单个分区总是由一个专门的消费者消费。spring-doc.cadn.net.cn

在类似的情况下,如果你想使用 Pulsar 的共享消费者类型,可以使用 shared 订阅类型。 然而,当你使用 shared 模式时,会失去任何顺序保证,因为一个消费者可能会在另一个消费者有机会之前先收到来自所有分区的消息。spring-doc.cadn.net.cn

考虑以下示例:spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen1(String foo) {
    System.out.println("Message Received 1: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen2(String foo) {
    System.out.println("Message Received 2: " + foo);
}