The documentation you are viewing is for Dapr v1.15 which is an older version of Dapr. For up-to-date documentation, see the latest version.
使用 CloudEvents 进行消息传递
为了实现消息路由并为每条消息提供额外的上下文,Dapr 采用 CloudEvents 1.0 规范 作为其消息格式。通过 Dapr 发送到主题的任何消息都会自动被包装在 CloudEvents 信封中,使用 Content-Type 头部值 作为 datacontenttype 属性。
Dapr 使用 CloudEvents 为事件负载提供额外的上下文,从而实现以下功能:
- 跟踪
- 事件数据的正确反序列化的内容类型
- 发送应用程序的验证
您可以选择以下三种方法之一通过发布订阅发布 CloudEvent:
- 发送一个发布订阅事件,然后由 Dapr 包装在 CloudEvent 信封中。
- 通过覆盖标准 CloudEvent 属性来替换 Dapr 提供的特定 CloudEvents 属性。
- 将您自己的 CloudEvent 信封作为发布订阅事件的一部分编写。
Dapr 生成的 CloudEvents 示例
向 Dapr 发送发布操作会自动将其包装在一个包含以下字段的 CloudEvent 信封中:
- id
- source
- specversion
- type
- traceparent
- traceid
- tracestate
- topic
- pubsubname
- time
- datacontenttype(可选)
以下示例演示了 Dapr 为发布到 orders 主题的操作生成的 CloudEvent,其中包括:
- 一个 W3C traceid,唯一标识消息
- data和 CloudEvent 的字段,其中数据内容被序列化为 JSON
{
  "topic": "orders",
  "pubsubname": "order_pub_sub",
  "traceid": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01",
  "tracestate": "",
  "data": {
    "orderId": 1
  },
  "id": "5929aaac-a5e2-4ca1-859c-edfe73f11565",
  "specversion": "1.0",
  "datacontenttype": "application/json; charset=utf-8",
  "source": "checkout",
  "type": "com.dapr.event.sent",
  "time": "2020-09-23T06:23:21Z",
  "traceparent": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01"
}
作为另一个 v1.0 CloudEvent 的示例,以下显示了在 CloudEvent 消息中以 JSON 序列化的 XML 内容的数据:
{
  "topic": "orders",
  "pubsubname": "order_pub_sub",
  "traceid": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01",
  "tracestate": "",
  "data" : "<note><to></to><from>user2</from><message>Order</message></note>",
  "id" : "id-1234-5678-9101",
  "specversion" : "1.0",
  "datacontenttype" : "text/xml",
  "subject" : "Test XML Message",
  "source" : "https://example.com/message",
  "type" : "xml.message",
   "time" : "2020-09-23T06:23:21Z"
}
替换 Dapr 生成的 CloudEvents 值
Dapr 自动生成多个 CloudEvent 属性。您可以通过提供以下可选元数据键/值来替换这些生成的 CloudEvent 属性:
- cloudevent.id: 覆盖- id
- cloudevent.source: 覆盖- source
- cloudevent.type: 覆盖- type
- cloudevent.traceid: 覆盖- traceid
- cloudevent.tracestate: 覆盖- tracestate
- cloudevent.traceparent: 覆盖- traceparent
使用这些元数据属性替换 CloudEvents 属性的能力适用于所有发布订阅组件。
示例
例如,要替换代码中上述 CloudEvent 示例中的 source 和 id 值:
with DaprClient() as client:
    order = {'orderId': i}
    # 使用 Dapr 发布订阅发布事件/消息
    result = client.publish_event(
        pubsub_name='order_pub_sub',
        topic_name='orders',
        publish_metadata={'cloudevent.id': 'd99b228f-6c73-4e78-8c4d-3f80a043d317', 'cloudevent.source': 'payment'}
    )
var order = new Order(i);
using var client = new DaprClientBuilder().Build();
// 覆盖 cloudevent 元数据
var metadata = new Dictionary<string,string>() {
    { "cloudevent.source", "payment" },
    { "cloudevent.id", "d99b228f-6c73-4e78-8c4d-3f80a043d317" }
}
// 使用 Dapr 发布订阅发布事件/消息
await client.PublishEventAsync("order_pub_sub", "orders", order, metadata);
Console.WriteLine("Published data: " + order);
await Task.Delay(TimeSpan.FromSeconds(1));
然后 JSON 负载反映新的 source 和 id 值:
{
  "topic": "orders",
  "pubsubname": "order_pub_sub",
  "traceid": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01",
  "tracestate": "",
  "data": {
    "orderId": 1
  },
  "id": "d99b228f-6c73-4e78-8c4d-3f80a043d317",
  "specversion": "1.0",
  "datacontenttype": "application/json; charset=utf-8",
  "source": "payment",
  "type": "com.dapr.event.sent",
  "time": "2020-09-23T06:23:21Z",
  "traceparent": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01"
}
重要
虽然您可以替换traceid/traceparent 和 tracestate,但这样做可能会干扰事件跟踪并在跟踪工具中报告不一致的结果。建议使用 Open Telemetry 进行分布式跟踪。了解更多关于分布式跟踪的信息。
发布您自己的 CloudEvent
如果您想使用自己的 CloudEvent,请确保将 datacontenttype 指定为 application/cloudevents+json。
如果应用程序编写的 CloudEvent 不包含 CloudEvent 规范中最低要求的字段,则消息将被拒绝。如果缺少,Dapr 会将以下字段添加到 CloudEvent 中:
- time
- traceid
- traceparent
- tracestate
- topic
- pubsubname
- source
- type
- specversion
您可以向自定义 CloudEvent 添加不属于官方 CloudEvent 规范的其他字段。Dapr 将按原样传递这些字段。
示例
发布一个 CloudEvent 到 orders 主题:
dapr publish --publish-app-id orderprocessing --pubsub order-pub-sub --topic orders --data '{\"orderId\": \"100\"}'
发布一个 CloudEvent 到 orders 主题:
curl -X POST http://localhost:3601/v1.0/publish/order-pub-sub/orders -H "Content-Type: application/cloudevents+json" -d '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}'
发布一个 CloudEvent 到 orders 主题:
Invoke-RestMethod -Method Post -ContentType 'application/cloudevents+json' -Body '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}' -Uri 'http://localhost:3601/v1.0/publish/order-pub-sub/orders'
事件去重
使用 Dapr 创建的 CloudEvents 时,信封中包含一个 id 字段,应用程序可以使用该字段执行消息去重。Dapr 不会自动进行去重处理。Dapr 支持使用本身具备消息去重功能的消息代理。
下一步
- 了解为什么您可能不想使用 CloudEvents
- 试用 发布订阅快速入门
- 发布订阅组件列表
- 阅读 API 参考
Feedback
Was this page helpful?
Glad to hear it! Please tell us how we can improve.
Sorry to hear that. Please tell us how we can improve.