Spring 框架

Spring for Apache Pulsar 提供了对 Pulsar IO (连接器) 和 Pulsar Functions 的基本支持,允许用户定义由 sourcesprocessorssinks 组成的流处理管道。 sourcessinksPulsar IO (连接器) 模型,processorsPulsar Functions 表示。spring-doc.cadn.net.cn

因为 connectors 本质上只是特殊的函数,为简单起见,我们将 sources、sinks 和 functions collectively 称为 "Pulsar Functions"。
Pre-requisites

熟悉度 - 期望受众对 Pulsar IOPulsar Functions 有所熟悉。 如果情况并非如此,查看其入门指南可能会有帮助。spring-doc.cadn.net.cn

功能启用 - 要使用这些功能,必须启用并配置所支持的Apache Pulsar功能(默认情况下是禁用的)。 内置连接器可能还需要在Pulsar集群上安装。spring-doc.cadn.net.cn

查看 Pulsar IOPulsar Functions 的文档以获取更多详情。spring-doc.cadn.net.cn

1. Pulsar Function 管理

The framework provides the PulsarFunctionAdministration component to manage Pulsar functions. When you use the Pulsar Spring Boot starter, you get the PulsarFunctionAdministration auto-configured.spring-doc.cadn.net.cn

默认情况下,应用程序会尝试连接到本地的 Pulsar 实例位于 localhost:8080。 然而,因为它会利用已配置的 PulsarAdministration,请参阅 Pulsar Admin Client 了解可用的客户端选项(包括认证)。 可用的额外配置选项可以通过 spring.pulsar.function.* 应用程序属性进行设置。spring-doc.cadn.net.cn

2. 自动功能管理

在应用程序启动时,框架会在应用程序上下文中查找所有PulsarFunctionPulsarSinkPulsarSource类型的bean。 对于每个bean,会根据相应的Pulsar函数类型、函数配置以及该函数是否已经存在,创建或更新对应的Pulsar函数。 根据函数类型、函数配置和函数是否已经存在,调用相应的API。spring-doc.cadn.net.cn

PulsarFunctionPulsarSinkPulsarSource 这三个 beans 是围绕 Apache Pulsar 配置对象 FunctionConfigSinkConfigSourceConfig 的简单包装器。 由于支持的连接器数量较多(且配置各异),框架不会尝试创建一个与这些各异的 Apache Pulsar 连接器相匹配的配置属性层次结构。 相反,由用户负责提供完整的配置对象,框架会根据提供的配置进行管理(创建/更新)。

在应用程序关闭时,启动期间处理过的所有功能都会执行其停止策略,会被留在服务器上、停止或从 Pulsar 服务器中删除。spring-doc.cadn.net.cn

3. 局限性

3.1. No Magic Pulsar 功能

Pulsar 函数和自定义连接器由自定义应用程序代码(例如,一个 java.util.Function)表示。 没有自动注册自定义代码的魔法支持。 虽然这将非常棒,但存在一些技术挑战,尚未实现。 因此,由用户确保函数(或自定义连接器)在函数配置中指定的位置可用。 例如,如果函数配置的 jar 值为 ./some/path/MyFunction.jar,则函数 jar 文件必须存在于指定路径。spring-doc.cadn.net.cn

3.2. 名称标识符

The name 属性来自函数配置,用于作为标识来确定函数是否已经存在,以决定是执行更新还是创建操作。 因此,如果希望进行函数更新,该名称不应被修改。spring-doc.cadn.net.cn

4. 配置

4.1. Pulsar 函数归档

每个Pulsar功能都由一个实际的存档(例如jar文件)表示。 通过archive属性指定源和接收器的存档路径,通过jar属性指定函数的存档路径。spring-doc.cadn.net.cn

以下规则确定路径的"type":spring-doc.cadn.net.cn

在创建/更新操作中,根据路径 "type" 的不同,所执行的操作如下:spring-doc.cadn.net.cn

4.2. 内置源和接收器

Apache Pulsar 提供了许多源和接收器连接器(即内置连接器)。要使用内置连接器,只需将 archive 设置为 builtin://<connector-type>(例如 builtin://rabbit)。spring-doc.cadn.net.cn

5. 自定义函数

Pulsar 文档 中可以找到有关如何开发和打包自定义函数的详细信息。然而,从高层次上看,要求如下:spring-doc.cadn.net.cn

在函数构建并打包后,有几种方法可以使其可供函数注册使用。spring-doc.cadn.net.cn

5.1. file://

可以将jar文件上传到服务器,然后在函数配置的1属性中通过file://进行引用spring-doc.cadn.net.cn

5.2. 本地

该jar文件可以保留在本地,然后通过在函数配置的jar属性中引用本地路径进行引用。spring-doc.cadn.net.cn

5.3. http://

jar 文件可以通过 HTTP 服务器提供并随后在函数配置中的 http(s):// 属性使用 jar 进行引用spring-doc.cadn.net.cn

5.4. 功能://

The jar 文件可以上传到 Pulsar 包管理器,然后通过在函数配置的 1 属性中引用 0spring-doc.cadn.net.cn

6. 示例

这里有的一些示例展示了如何配置一个PulsarSource bean,从而导致PulsarFunctionAdministration自动创建后端Pulsar source连接器。spring-doc.cadn.net.cn

PulsarSource 使用内置的 Rabbit 连接器
@Bean
PulsarSource rabbitSource() {
    Map<String, Object> configs = new HashMap<>();
    configs.put("host", "my.rabbit.host");
    configs.put("port", 5672);
    configs.put("virtualHost", "/");
    configs.put("username", "guest");
    configs.put("password", "guest");
    configs.put("queueName", "test_rabbit");
    configs.put("connectionName", "test-connection");
    SourceConfig sourceConfig = SourceConfig.builder()
            .tenant("public")
            .namespace("default")
            .name("rabbit-test-source")
            .archive("builtin://rabbitmq")
            .topicName("incoming_rabbit")
            .configs(configs).build();
    return new PulsarSource(sourceConfig, null);
}

下一个示例与上一个示例相同,只是使用了通过 Spring Boot 自动配置的 RabbitProperties 以减轻配置负担。当然,这要求应用程序使用启用了 Rabbit 自动配置的 Spring Boot。spring-doc.cadn.net.cn

使用内置的Rabbit连接器和Spring Boot RabbitProperties的PulsarSource
@Bean
PulsarSource rabbitSourceWithBootProps(RabbitProperties props) {
    Map<String, Object> configs = new HashMap<>();
    configs.put("host", props.determineHost());
    configs.put("port", props.determinePort());
    configs.put("virtualHost", props.determineVirtualHost());
    configs.put("username", props.determineUsername());
    configs.put("password", props.determinePassword());
    configs.put("queueName", "test_rabbit");
    configs.put("connectionName", "test-connection");
    SourceConfig sourceConfig = SourceConfig.builder()
            .tenant("public")
            .namespace("default")
            .name("rabbit-test-source")
            .archive("builtin://rabbitmq")
            .topicName("incoming_rabbit")
            .configs(configs).build();
    return new PulsarSource(sourceConfig, null);
}
对于更详细的示例,请参见 使用 Pulsar Functions 的样本流管道 示例应用程序