Pulsar 客户端
当你使用 Pulsar Spring Boot Starter 时,会自动配置 PulsarClient。
默认情况下,应用程序会尝试连接到本地的 Pulsar 实例,地址为 pulsar://localhost:6650。
可以通过设置 spring.pulsar.client.service-url 属性为其他值来调整该配置。
| 该值必须是一个有效的 Pulsar 协议 URL |
您可以通过指定任何的 spring.pulsar.client.* 应用属性进一步配置客户端。
如果未使用Starters,您需要自行配置并注册 PulsarClient。
有一个 DefaultPulsarClientFactory 接受一个 builder 自定义器,可用于帮助完成此操作。 |
1. TLS 加密(SSL)
默认情况下,Pulsar 客户端使用明文与 Pulsar 服务通信。 以下部分描述了如何配置 Pulsar 客户端使用 TLS 加密(SSL)。 前提是 Broker 也已配置使用 TLS 加密。
The Spring Boot auto-configuration 目前不支持任何 TLS/SSL 配置属性。
您可以提供一个 PulsarClientBuilderCustomizer 以在 Pulsar 客户端构建器上设置必要的属性。
Pulsar 支持 Privacy Enhanced Mail (PEM) 和 Java KeyStore (JKS) 证书格式。
按照以下步骤配置 TLS:
-
调整 Pulsar 客户端服务 URL 以使用
pulsar+ssl://方案和 TLS 端口(通常为6651)。 -
调整管理员客户端服务URL以使用
https://方案和TLS网页端口(通常为8443)。 -
提供客户端构建器自定义器(设置相关属性到构建器上)。
你可以在此上方的文档中找到更多关于上述内容的信息:Pulsar TLS 加密 文档。
2. 认证
要连接需要身份验证的 Pulsar 集群,您需要指定要使用的身份验证插件以及该插件所需的任何参数。 当使用 Spring Boot 自动配置时,可以通过配置属性(在大多数情况下)设置插件和插件参数。
|
您需要确保在 例如,如果你想为 |
|
使用环境变量设置认证参数通常存在问题,因为在转换过程中大小写信息会丢失。
例如,考虑以下通过环境变量设置的
当 Spring Boot 加载此属性时,它会使用
|
当不使用 Spring Boot 自动配置时,您可以使用 org.apache.pulsar.client.api.AuthenticationFactory 来创建认证,然后将其直接设置在由客户端工厂提供的客户端自定义器的 Pulsar 客户端构建器上。
以下列表展示了如何配置每种受支持的认证机制。
点击 here 对于 Athenz
spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationAthenz
param:
tenantDomain: ...
tenantService: ...
providerDomain: ...
privateKey: ...
keyId: ...
| 这也要求使用 TLS 加密。 |
点击 这里 以 Token
spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken
param:
token: some-token-goes-here
点击 here 对于 Basic
spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationBasic
param:
userId: ...
password: ...
点击 here 对于 OAuth2
spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
param:
issuerUrl: ...
privateKey: ...
audience: ...
scope: ...
点击 here 为 Sasl
spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationSasl
param:
saslJaasClientSectionName: ...
serverType: ...
点击 here 以获取 mTLS (PEM)
Because this option requires TLS encryption, which already requires you to provide a client builder customizer, it is recommended to simply add the authentication directly on the client builder in your provided TLS customizer.
You can use the org.apache.pulsar.client.api.AuthenticationFactory to help create the authentication object as follows: |
Authentication auth = AuthenticationFactory.TLS("/path/to/my-role.cert.pem", "/path/to/my-role.key-pk8.pem");
查看 mTLS (PEM) 的官方 Pulsar 文档。
点击 here 为 mTLS (JKS)
Because this option requires TLS encryption, which already requires you to provide a client builder customizer, it is recommended to simply add the authentication directly on the client builder in your provided TLS customizer.
You can use the org.apache.pulsar.client.api.AuthenticationFactory to help create the authentication object as follows: |
Authentication auth = AuthenticationFactory.create(
"org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls",
Map.of("keyStoreType", "JKS", "keyStorePath", "/path/to/my/keystore.jks", "keyStorePassword", "clientpw"));
查看 mTLS (JKS) 的官方文档。
You can find more information on each of the support plugins and their required properties in the official Pulsar 安全 文档。
3. 自动集群级故障转移
The Pulsar Spring Boot Starter 也自动配置了 PulsarClient 用于 自动集群级故障转移。
您可以使用 spring.pulsar.client.failover.* 应用属性来配置集群级的故障转移。
以下示例配置了客户端,包含一个主集群和两个备份集群。
spring:
pulsar:
client:
service-url: "pulsar://my.primary.server:6650"
failover:
delay: 30s
switch-back-delay: 15s
check-interval: 1s
backup-clusters:
- service-url: "pulsar://my.second.server:6650"
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken
param:
token: "my-token"
- service-url: "pulsar://my.third.server:6650"
| 除客户端配置外,还需要满足一些在使用此功能时必须满足的代理服务器前提条件。 |
在不使用 Spring Boot 自动配置的情况下,您可以提供一个客户端自定义器来配置客户端以实现集群级故障转移。