The documentation you are viewing is for Dapr v1.15 which is an older version of Dapr. For up-to-date documentation, see the latest version.
RocketMQ
Component format
To set up RocketMQ pub/sub, create a component of type pubsub.rocketmq. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: rocketmq-pubsub
spec:
  type: pubsub.rocketmq
  version: v1
  metadata:
    - name: instanceName
      value: dapr-rocketmq-test
    - name: consumerGroup
      value: dapr-rocketmq-test-g-c
    - name: producerGroup 
      value: dapr-rocketmq-test-g-p
    - name: consumerID
      value: channel1
    - name: nameSpace
      value: dapr-test
    - name: nameServer
      value: "127.0.0.1:9876,127.0.0.2:9876"
    - name: retries
      value: 3
    - name: consumerModel
      value: "clustering"
    - name: consumeOrderly
      value: false
Warning
The above example uses secrets as plain strings. It is recommended to use a secret store for the secrets as described here.Spec metadata fields
| Field | Required | Details | default | Example | 
|---|---|---|---|---|
| instanceName | N | Instance name | time.Now().String() | dapr-rocketmq-test | 
| consumerGroup | N | Consumer group name. Recommend. If producerGroupisnull,groupNameis used. | dapr-rocketmq-test-g-c  | |
| producerGroup (consumerID) | N | Producer group name. Recommended. If producerGroupisnull,consumerIDis used. IfconsumerIDalso is null,groupNameis used. | dapr-rocketmq-test-g-p | |
| consumerID | N | Consumer ID (consumer tag) organizes one or more consumers into a group. Consumers with the same consumer ID work as one virtual consumer; for example, a message is processed only once by one of the consumers in the group. If the consumerIDis not provided, the Dapr runtime set it to the Dapr application ID (appID) value. | Can be set to string value (such as "channel1"in the example above) or string format value (such as"{podName}", etc.). See all of template tags you can use in your component metadata. | |
| groupName | N | Consumer/Producer group name. Depreciated. | dapr-rocketmq-test-g | |
| nameSpace | N | RocketMQ namespace | dapr-rocketmq | |
| nameServerDomain | N | RocketMQ name server domain | https://my-app.net:8080/nsaddr | |
| nameServer | N | RocketMQ name server, separated by “,” or “;” | 127.0.0.1:9876;127.0.0.2:9877,127.0.0.3:9877 | |
| accessKey | N | Access Key (Username) | "admin" | |
| secretKey | N | Secret Key (Password) | "password" | |
| securityToken | N | Security Token | ||
| retries | N | Number of retries to send a message to broker | 3 | 3 | 
| producerQueueSelector (queueSelector) | N | Producer Queue selector. There are five implementations of queue selector: hash,random,manual,roundRobin,dapr. | dapr | hash | 
| consumerModel | N | Message model that defines how messages are delivered to each consumer client. RocketMQ supports two message models: clusteringandbroadcasting. | clustering | broadcasting,clustering | 
| fromWhere (consumeFromWhere) | N | Consuming point on consumer booting. There are three consuming points: CONSUME_FROM_LAST_OFFSET,CONSUME_FROM_FIRST_OFFSET,CONSUME_FROM_TIMESTAMP | CONSUME_FROM_LAST_OFFSET | CONSUME_FROM_LAST_OFFSET | 
| consumeTimestamp | N | Backtracks consumption time with second precision. Time format is yyyymmddhhmmss. For example,20131223171201implies the time of 17:12:01 and date of December 23, 2013 |  time.Now().Add(time.Minute * (-30)).Format("20060102150405") | 20131223171201 | 
| consumeOrderly | N | Determines if it’s an ordered message using FIFO order. | false | false | 
| consumeMessageBatchMaxSize | N | Batch consumption size out of range [1, 1024] | 512 | 10 | 
| consumeConcurrentlyMaxSpan | N | Concurrently max span offset. This has no effect on sequential consumption. Range: [1, 65535] | 1000 | 1000 | 
| maxReconsumeTimes | N | Max re-consume times. -1means 16 times. If messages are re-consumed more than {@link maxReconsumeTimes} before success, they’ll be directed to a deletion queue. | Orderly message is MaxInt32; Concurrently message is16 | 16 | 
| autoCommit | N | Enable auto commit | true | false | 
| consumeTimeout | N | Maximum amount of time a message may block the consuming thread. Time unit: Minute | 15 | 15 | 
| consumerPullTimeout | N | The socket timeout in milliseconds | ||
| pullInterval | N | Message pull interval | 100 | 100 | 
| pullBatchSize | N | The number of messages pulled from the broker at a time. If pullBatchSizeisnull, useConsumerBatchSize.pullBatchSizeout of range[1, 1024] | 32 | 10 | 
| pullThresholdForQueue | N | Flow control threshold on queue level. Each message queue will cache a maximum of 1000 messages by default. Consider the PullBatchSize- the instantaneous value may exceed the limit. Range:[1, 65535] | 1024 | 1000 | 
| pullThresholdForTopic | N | Flow control threshold on topic level. The value of pullThresholdForQueuewill be overwritten and calculated based onpullThresholdForTopicif it isn’t unlimited. For example, if the value ofpullThresholdForTopicis 1000 and 10 message queues are assigned to this consumer, thenpullThresholdForQueuewill be set to 100. Range:[1, 6553500] | -1(Unlimited) | 10 | 
| pullThresholdSizeForQueue | N | Limit the cached message size on queue level. Consider the pullBatchSize- the instantaneous value may exceed the limit. The size of a message is only measured by message body, so it’s not accurate. Range:[1, 1024] | 100 | 100 | 
| pullThresholdSizeForTopic | N | Limit the cached message size on topic level. The value of pullThresholdSizeForQueuewill be overwritten and calculated based onpullThresholdSizeForTopicif it isn’t unlimited. For example, if the value ofpullThresholdSizeForTopicis 1000 MiB and 10 message queues are assigned to this consumer, thenpullThresholdSizeForQueuewill be set to 100 MiB. Range:[1, 102400] | -1 | 100 | 
| content-type | N | Message content type. | "text/plain" | "application/cloudevents+json; charset=utf-8","application/octet-stream" | 
| logLevel | N | Log level | warn | info | 
| sendTimeOut | N | Send message timeout to connect RocketMQ’s broker, measured in nanoseconds. Deprecated. | 3 seconds | 10000000000 | 
| sendTimeOutSec | N | Timeout duration for publishing a message in seconds. If sendTimeOutSecisnull,sendTimeOutis used. | 3 seconds | 3 | 
| mspProperties | N | The RocketMQ message properties in this collection are passed to the APP in Data Separate multiple properties with “,” | key,mkey | 
For backwards-compatibility reasons, the following values in the metadata are supported, although their use is discouraged.
| Field (supported but deprecated) | Required | Details | Example | 
|---|---|---|---|
| groupName | N | Producer group name for RocketMQ publishers | "my_unique_group_name" | 
| sendTimeOut | N | Timeout duration for publishing a message in nanoseconds | 0 | 
| consumerBatchSize | N | The number of messages pulled from the broker at a time | 32 | 
Setup RocketMQ
See https://rocketmq.apache.org/docs/quick-start/ to setup a local RocketMQ instance.
Per-call metadata fields
Partition Key
When invoking the RocketMQ pub/sub, it’s possible to provide an optional partition key by using the metadata query param in the request url.
You need to specify rocketmq-tag , "rocketmq-key" , rocketmq-shardingkey , rocketmq-queue in metadata
Example:
curl -X POST http://localhost:3500/v1.0/publish/myRocketMQ/myTopic?metadata.rocketmq-tag=?&metadata.rocketmq-key=?&metadata.rocketmq-shardingkey=key&metadata.rocketmq-queue=1 \
  -H "Content-Type: application/json" \
  -d '{
        "data": {
          "message": "Hi"
        }
      }'
QueueSelector
The RocketMQ component contains a total of five queue selectors. The RocketMQ client provides the following queue selectors:
- HashQueueSelector
- RandomQueueSelector
- RoundRobinQueueSelector
- ManualQueueSelector
To learn more about these RocketMQ client queue selectors, read the RocketMQ documentation.
The Dapr RocketMQ component implements the following queue selector:
- DaprQueueSelector
This article focuses on the design of DaprQueueSelector.
DaprQueueSelector
DaprQueueSelector integrates three queue selectors:
- HashQueueSelector
- RoundRobinQueueSelector
- ManualQueueSelector
DaprQueueSelector gets the queue id from the request parameter. You can set the queue id by running the following:
http://localhost:3500/v1.0/publish/myRocketMQ/myTopic?metadata.rocketmq-queue=1
The ManualQueueSelector is implemented using the method above.
Next, the DaprQueueSelector tries to:
- Get a ShardingKey
- Hash the ShardingKeyto determine the queue id.
You can set the ShardingKey by doing the following:
http://localhost:3500/v1.0/publish/myRocketMQ/myTopic?metadata.rocketmq-shardingkey=key
If the ShardingKey does not exist, the RoundRobin algorithm is used to determine the queue id.
Related links
- Basic schema for a Dapr component
- Pub/Sub building block
- Read this guide for instructions on configuring pub/sub components
Feedback
Was this page helpful?
Glad to hear it! Please tell us how we can improve.
Sorry to hear that. Please tell us how we can improve.