MQTT

MQTT pubsub组件的详细文档

组件格式

要配置MQTT pub/sub,您需要创建一个类型为pubsub.mqtt的组件。请参阅pub/sub broker组件文件以了解ConsumerID的自动生成方式。阅读操作指南:发布和订阅指南以了解如何创建和应用pub/sub配置。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt-pubsub
spec:
  type: pubsub.mqtt
  version: v1
  metadata:
  - name: url
    value: "tcp://[username][:password]@host.domain[:port]"
  - name: qos
    value: 1
  - name: retain
    value: "false"
  - name: cleanSession
    value: "false"
  - name: consumerID
    value: "channel1"

规格元数据字段

字段 必需 详情 示例
url Y MQTT broker的地址。可以使用secretKeyRef来引用密钥。
对于非TLS通信,使用**tcp://** URI方案。
对于TLS通信,使用**ssl://** URI方案。
"tcp://[username][:password]@host.domain[:port]"
consumerID N 用于连接到MQTT broker的消费者连接的客户端ID。默认为Dapr应用ID。
注意:如果未设置producerID,则在此值后附加-consumer用于消费者连接
可以设置为字符串值(如上例中的"channel1")或字符串格式值(如"{podName}"等)。查看可以在组件元数据中使用的所有模板标签。
producerID N 用于连接到MQTT broker的生产者连接的客户端ID。默认为{consumerID}-producer "myMqttProducerApp"
qos N 表示消息的服务质量级别(QoS)(更多信息)。默认为1 0, 1, 2
retain N 定义broker是否将消息保存为指定主题的最后已知良好值。默认为"false" "true", "false"
cleanSession N 如果为"true",则在连接消息中设置clean_session标志到MQTT broker(更多信息)。默认为"false" "true", "false"
caCert 使用TLS时必需 用于验证服务器TLS证书的证书颁发机构(CA)证书,格式为PEM。 "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientCert 使用TLS时必需 TLS客户端证书,格式为PEM。必须与clientKey一起使用。 "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientKey 使用TLS时必需 TLS客户端密钥,格式为PEM。必须与clientCert一起使用。可以使用secretKeyRef来引用密钥。 "-----BEGIN RSA PRIVATE KEY-----\n<base64-encoded PKCS8>\n-----END RSA PRIVATE KEY-----"

启用消息传递重试

MQTT pub/sub组件不支持内置的重试策略。这意味着sidecar只会向服务发送一次消息。如果服务标记消息为未处理,则消息不会被确认回broker。只有当broker重新发送消息时,才会重试。

要使Dapr使用更复杂的重试策略,可以将重试弹性策略应用于MQTT pub/sub组件。

两种重试方式之间有一个关键区别:

  1. 未确认消息的重新传递完全依赖于broker。Dapr不保证这一点。一些broker如emqxvernemq等支持它,但它不是MQTT3规范的一部分。

  2. 使用重试弹性策略使得同一个Dapr sidecar重试重新传递消息。因此是同一个Dapr sidecar和同一个应用接收相同的消息。

使用TLS进行通信

要配置使用TLS进行通信,请确保MQTT broker(例如,mosquitto)配置为支持证书,并在组件配置中提供caCertclientCertclientKey元数据。例如:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt-pubsub
spec:
  type: pubsub.mqtt
  version: v1
  metadata:
  - name: url
    value: "ssl://host.domain[:port]"
  - name: qos
    value: 1
  - name: retain
    value: "false"
  - name: cleanSession
    value: "false"
  - name: caCert
    value: ${{ myLoadedCACert }}
  - name: clientCert
    value: ${{ myLoadedClientCert }}
  - name: clientKey
    secretKeyRef:
      name: myMqttClientKey
      key: myMqttClientKey
auth:
  secretStore: <SECRET_STORE_NAME>

注意,虽然caCertclientCert值可能不是密钥,但为了方便起见,它们也可以从Dapr密钥存储中引用。

消费共享主题

在消费共享主题时,每个消费者必须有一个唯一标识符。默认情况下,应用ID用于唯一标识每个消费者和发布者。在selfhost模式下,调用每个dapr run时使用不同的应用ID即可使它们从同一个共享主题中消费。然而,在Kubernetes上,应用pod的多个实例将共享相同的应用ID,禁止所有实例消费同一个主题。为了解决这个问题,配置组件的consumerID元数据为{uuid}标签,这将在启动时为每个实例提供一个随机生成的consumerID值。例如:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt-pubsub
spec:
  type: pubsub.mqtt
  version: v1
  metadata:
    - name: consumerID
      value: "{uuid}"
    - name: url
      value: "tcp://admin:public@localhost:1883"
    - name: qos
      value: 1
    - name: retain
      value: "false"
    - name: cleanSession
      value: "true"

注意,在这种情况下,每次Dapr重启时,consumer ID的值都是随机的,因此我们也将cleanSession设置为true。

创建MQTT broker


您可以使用Docker本地运行MQTT broker:

docker run -d -p 1883:1883 -p 9001:9001 --name mqtt eclipse-mosquitto:1.6

然后您可以使用客户端端口与服务器交互:mqtt://localhost:1883


您可以在kubernetes中使用以下yaml运行MQTT broker:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: mqtt-broker
  labels:
    app-name: mqtt-broker
spec:
  replicas: 1
  selector:
    matchLabels:
      app-name: mqtt-broker
  template:
    metadata:
      labels:
        app-name: mqtt-broker
    spec:
      containers:
        - name: mqtt
          image: eclipse-mosquitto:1.6
          imagePullPolicy: IfNotPresent
          ports:
            - name: default
              containerPort: 1883
              protocol: TCP
            - name: websocket
              containerPort: 9001
              protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
  name: mqtt-broker
  labels:
    app-name: mqtt-broker
spec:
  type: ClusterIP
  selector:
    app-name: mqtt-broker
  ports:
    - port: 1883
      targetPort: default
      name: default
      protocol: TCP
    - port: 9001
      targetPort: websocket
      name: websocket
      protocol: TCP

然后您可以使用客户端端口与服务器交互:tcp://mqtt-broker.default.svc.cluster.local:1883

相关链接