JetStream

NATS JetStream 组件的详细说明文档

组件格式

要配置 JetStream 的发布/订阅功能,需要创建一个类型为 pubsub.jetstream 的组件。请参考 pubsub broker 组件文件 以了解 ConsumerID 的自动生成方式。阅读 发布和订阅指南 以获取创建和应用 pubsub 配置的步骤。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: jetstream-pubsub
spec:
  type: pubsub.jetstream
  version: v1
  metadata:
  - name: natsURL
    value: "nats://localhost:4222"
  - name: jwt # 可选。用于分布式 JWT 认证。
    value: "eyJhbGciOiJ...6yJV_adQssw5c"
  - name: seedKey # 可选。用于分布式 JWT 认证。
    value: "SUACS34K232O...5Z3POU7BNIL4Y"
  - name: tls_client_cert # 可选。用于 TLS 客户端认证。
    value: "/path/to/tls.crt"
  - name: tls_client_key # 可选。用于 TLS 客户端认证。
    value: "/path/to/tls.key"
  - name: token # 可选。用于基于令牌的认证。
    value: "my-token"
  - name: name
    value: "my-conn-name"
  - name: streamName
    value: "my-stream"
  - name: durableName 
    value: "my-durable-subscription"
  - name: queueGroupName
    value: "my-queue-group"
  - name: startSequence
    value: 1
  - name: startTime # Unix 时间戳格式
    value: 1630349391
  - name: flowControl
    value: false
  - name: ackWait
    value: 10s
  - name: maxDeliver
    value: 5
  - name: backOff
    value: "50ms, 1s, 10s"
  - name: maxAckPending
    value: 5000
  - name: replicas
    value: 1
  - name: memoryStorage
    value: false
  - name: rateLimit
    value: 1024
  - name: heartbeat
    value: 15s
  - name: ackPolicy
    value: explicit
  - name: deliverPolicy
    value: all
  - name: domain
    value: hub
  - name: apiPrefix
    value: PREFIX

规格元数据字段

字段 必需 详情 示例
natsURL NATS 服务器地址 URL "nats://localhost:4222"
jwt NATS 分布式认证 JWT "eyJhbGciOiJ...6yJV_adQssw5c"
seedKey NATS 分布式认证种子密钥 "SUACS34K232O...5Z3POU7BNIL4Y"
tls_client_cert NATS TLS 客户端认证证书 "/path/to/tls.crt"
tls_client_key NATS TLS 客户端认证密钥 "/path/to/tls.key"
token NATS 基于令牌的认证 "my-token"
name NATS 连接名称 "my-conn-name"
streamName 要绑定的 JetStream 流名称 "my-stream"
durableName 持久名称 "my-durable"
queueGroupName 队列组名称 "my-queue"
startSequence 开始序列 1
startTime 开始时间,Unix 时间戳格式 1630349391
flowControl 流量控制 true
ackWait 确认等待 10s
maxDeliver 最大投递次数 15
backOff 退避 "50ms, 1s, 5s, 10s"
maxAckPending 最大确认待处理 5000
replicas 副本 3
memoryStorage 内存存储 false
rateLimit 速率限制 1024
heartbeat 心跳 10s
ackPolicy 确认策略 explicit
deliverPolicy 其中之一:all, last, new, sequence, time all
domain JetStream Leafonodes HUB
apiPrefix [JetStream Leafnodes] PREFIX

创建 NATS 服务器


您可以使用 Docker 在本地运行启用 JetStream 的 NATS 服务器:

docker run -d -p 4222:4222 nats:latest -js

然后,您可以通过客户端端口与服务器交互:localhost:4222


使用 helm 在 Kubernetes 上安装 NATS JetStream:

helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm install --set nats.jetstream.enabled=true my-nats nats/nats

这将在 default 命名空间中安装一个 NATS 服务器。要与 NATS 交互,请找到服务:

kubectl get svc my-nats

有关 helm chart 设置的更多信息,请参阅 Helm chart 文档

创建 JetStream

为特定主题创建 NATS JetStream 是至关重要的。例如,对于在本地运行的 NATS 服务器,使用:

nats -s localhost:4222 stream add myStream --subjects mySubject

示例:竞争消费者模式

假设您希望每条消息仅由具有相同 app-id 的一个应用程序或 pod 处理。通常,consumerID 元数据规范可以帮助您定义竞争消费者。

由于 NATS JetStream 不支持 consumerID,您需要指定 durableNamequeueGroupName 来实现竞争消费者模式。例如:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.jetstream
  version: v1
  metadata:
  - name: name
    value: "my-conn-name"
  - name: streamName
    value: "my-stream"
  - name: durableName 
    value: "my-durable-subscription"
  - name: queueGroupName
    value: "my-queue-group"

相关链接