This is documentation on a preview feature.

发布/订阅

创建发布/订阅组件只需几个基本步骤。

添加发布/订阅命名空间

添加与发布/订阅相关的命名空间的 using 语句。

using Dapr.PluggableComponents.Components;
using Dapr.PluggableComponents.Components.PubSub;

实现 IPubSub

创建一个实现 IPubSub 接口的类。

internal sealed class MyPubSub : IPubSub
{
    public Task InitAsync(MetadataRequest request, CancellationToken cancellationToken = default)
    {
        // 使用配置的元数据初始化组件...
    }

    public Task PublishAsync(PubSubPublishRequest request, CancellationToken cancellationToken = default)
    {
        // 将消息发送到指定的“主题”...
    }

    public Task PullMessagesAsync(PubSubPullMessagesTopic topic, MessageDeliveryHandler<string?, PubSubPullMessagesResponse> deliveryHandler, CancellationToken cancellationToken = default)
    {
        // 持续检查主题中的消息并将其传递给 Dapr 运行时,直到取消为止...
    }
}

PullMessagesAsync() 方法是一个“长时间运行”的调用,因为在取消之前不期望返回(例如,通过 cancellationToken)。需要从中提取消息的“主题”通过 topic 参数传递,而传递到 Dapr 运行时是通过 deliveryHandler 回调执行的。传递机制允许组件在应用程序(由 Dapr 运行时服务)确认消息处理时接收通知。

    public async Task PullMessagesAsync(PubSubPullMessagesTopic topic, MessageDeliveryHandler<string?, PubSubPullMessagesResponse> deliveryHandler, CancellationToken cancellationToken = default)
    {
        TimeSpan pollInterval = // 轮询间隔(可以从初始化元数据中获取)...

        // 持续轮询主题直到取消...
        while (!cancellationToken.IsCancellationRequested)
        {
            var messages = // 从主题中轮询获取消息...

            foreach (var message in messages)
            {
                // 将消息传递给 Dapr 运行时...
                await deliveryHandler(
                    new PubSubPullMessagesResponse(topicName)
                    {
                        // 设置消息内容...
                    },
                    // 当应用程序确认消息时调用的回调...
                    async errorMessage =>
                    {
                        // 空消息表示应用程序成功处理了消息...
                        if (String.IsNullOrEmpty(errorMessage))
                        {
                            // 从主题中删除消息...
                        }
                    });
            }

            // 等待下一个轮询(或取消)...
            await Task.Delay(pollInterval, cancellationToken);
        }
    }

注册发布/订阅组件

在主程序文件中(例如,Program.cs),使用应用程序服务注册发布/订阅组件。

using Dapr.PluggableComponents;

var app = DaprPluggableComponentsApplication.Create();

app.RegisterService(
    "<socket name>",
    serviceBuilder =>
    {
        serviceBuilder.RegisterPubSub<MyPubSub>();
    });

app.Run();