Spring 框架
Spring for Apache Pulsar 提供了对 Pulsar IO (连接器) 和 Pulsar Functions 的基本支持,允许用户定义由 sources、processors 和 sinks 组成的流处理管道。
sources 和 sinks 由 Pulsar IO (连接器) 模型,processors 由 Pulsar Functions 表示。
| 因为 connectors 本质上只是特殊的函数,为简单起见,我们将 sources、sinks 和 functions collectively 称为 "Pulsar Functions"。 |
1. Pulsar Function 管理
The framework provides the PulsarFunctionAdministration component to manage Pulsar functions.
When you use the Pulsar Spring Boot starter, you get the PulsarFunctionAdministration auto-configured.
默认情况下,应用程序会尝试连接到本地的 Pulsar 实例位于 localhost:8080。
然而,因为它会利用已配置的 PulsarAdministration,请参阅 Pulsar Admin Client 了解可用的客户端选项(包括认证)。
可用的额外配置选项可以通过 spring.pulsar.function.* 应用程序属性进行设置。
2. 自动功能管理
在应用程序启动时,框架会在应用程序上下文中查找所有PulsarFunction、PulsarSink和PulsarSource类型的bean。
对于每个bean,会根据相应的Pulsar函数类型、函数配置以及该函数是否已经存在,创建或更新对应的Pulsar函数。
根据函数类型、函数配置和函数是否已经存在,调用相应的API。
PulsarFunction、PulsarSink 和 PulsarSource 这三个 beans 是围绕 Apache Pulsar 配置对象 FunctionConfig、SinkConfig 和 SourceConfig 的简单包装器。
由于支持的连接器数量较多(且配置各异),框架不会尝试创建一个与这些各异的 Apache Pulsar 连接器相匹配的配置属性层次结构。
相反,由用户负责提供完整的配置对象,框架会根据提供的配置进行管理(创建/更新)。 |
在应用程序关闭时,启动期间处理过的所有功能都会执行其停止策略,会被留在服务器上、停止或从 Pulsar 服务器中删除。
3. 局限性
4. 配置
4.1. Pulsar 函数归档
每个Pulsar功能都由一个实际的存档(例如jar文件)表示。
通过archive属性指定源和接收器的存档路径,通过jar属性指定函数的存档路径。
以下规则确定路径的"type":
-
The path is a URL when it starts w/
(file|http|https|function|sink|source):// -
The path is built-in when it starts w/
builtin://(points to one of the provided out-of-the-box connectors) -
如果路径为本地,否则请提供其他路径。
在创建/更新操作中,根据路径 "type" 的不同,所执行的操作如下:
-
当路径是 URL 时,服务器会下载内容
-
当路径为内置
-
当路径为本地时,内容会被上传到服务器
5. 自定义函数
在 Pulsar 文档 中可以找到有关如何开发和打包自定义函数的详细信息。然而,从高层次上看,要求如下:
-
代码使用 Java8
-
代码实现的是要么
java.util.Function要么org.apache.pulsar.functions.api.Function -
打包为uber jar
在函数构建并打包后,有几种方法可以使其可供函数注册使用。
6. 示例
这里有的一些示例展示了如何配置一个PulsarSource bean,从而导致PulsarFunctionAdministration自动创建后端Pulsar source连接器。
@Bean
PulsarSource rabbitSource() {
Map<String, Object> configs = new HashMap<>();
configs.put("host", "my.rabbit.host");
configs.put("port", 5672);
configs.put("virtualHost", "/");
configs.put("username", "guest");
configs.put("password", "guest");
configs.put("queueName", "test_rabbit");
configs.put("connectionName", "test-connection");
SourceConfig sourceConfig = SourceConfig.builder()
.tenant("public")
.namespace("default")
.name("rabbit-test-source")
.archive("builtin://rabbitmq")
.topicName("incoming_rabbit")
.configs(configs).build();
return new PulsarSource(sourceConfig, null);
}
下一个示例与上一个示例相同,只是使用了通过 Spring Boot 自动配置的 RabbitProperties 以减轻配置负担。当然,这要求应用程序使用启用了 Rabbit 自动配置的 Spring Boot。
@Bean
PulsarSource rabbitSourceWithBootProps(RabbitProperties props) {
Map<String, Object> configs = new HashMap<>();
configs.put("host", props.determineHost());
configs.put("port", props.determinePort());
configs.put("virtualHost", props.determineVirtualHost());
configs.put("username", props.determineUsername());
configs.put("password", props.determinePassword());
configs.put("queueName", "test_rabbit");
configs.put("connectionName", "test-connection");
SourceConfig sourceConfig = SourceConfig.builder()
.tenant("public")
.namespace("default")
.name("rabbit-test-source")
.archive("builtin://rabbitmq")
.topicName("incoming_rabbit")
.configs(configs).build();
return new PulsarSource(sourceConfig, null);
}
| 对于更详细的示例,请参见 使用 Pulsar Functions 的样本流管道 示例应用程序 |