如何:在 .NET SDK 中编写和管理 Dapr 流式订阅
我们来创建一个使用流式功能的发布/订阅主题或队列的订阅。我们将使用此处提供的简单示例,进行演示,并逐步讲解如何在运行时配置消息处理程序,而无需预先配置端点。在本指南中,您将会学习如何:
- 部署一个 .NET Web API 应用程序 (StreamingSubscriptionExample)
- 使用 Dapr .NET Messaging SDK 动态订阅发布/订阅主题。
前提条件
- Dapr CLI
- 已初始化的 Dapr 环境
- 安装 .NET 6、.NET 8 或 .NET 9
- 项目中安装 Dapr.Messaging NuGet 包
注意
请注意,虽然 .NET 6 是 Dapr v1.15 中支持的最低版本,但只有 .NET 8 和 .NET 9 将在 v1.16 及更高版本中继续受到支持。设置环境
克隆 .NET SDK 仓库。
git clone https://github.com/dapr/dotnet-sdk.git
从 .NET SDK 根目录,导航到 Dapr 流式发布/订阅示例。
cd examples/Client/PublishSubscribe
本地运行应用程序
要运行 Dapr 应用程序,您需要启动 .NET 程序和一个 Dapr sidecar。导航到 StreamingSubscriptionExample
目录。
cd StreamingSubscriptionExample
我们将运行一个命令,同时启动 Dapr sidecar 和 .NET 程序。
dapr run --app-id pubsubapp --dapr-grpc-port 4001 --dapr-http-port 3500 -- dotnet run
Dapr 监听 HTTP 请求在
http://localhost:3500
,而 gRPC 请求在http://localhost:4001
。
使用依赖注入注册 Dapr PubSub 客户端
Dapr Messaging SDK 提供了一个扩展方法来简化 Dapr PubSub 客户端的注册。在 Program.cs
中完成依赖注入注册之前,添加以下行:
var builder = WebApplication.CreateBuilder(args);
//可以在这两行之间的任何位置添加
builder.Services.AddDaprPubSubClient(); //就是这样
var app = builder.Build();
您可能希望为 Dapr PubSub 客户端提供一些配置选项,这些选项应在每次调用 sidecar 时存在,例如 Dapr API 令牌,或者您希望使用非标准的 HTTP 或 gRPC 端点。这可以通过使用允许配置 DaprPublishSubscribeClientBuilder
实例的注册方法重载来实现:
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddDaprPubSubClient((_, daprPubSubClientBuilder) => {
daprPubSubClientBuilder.UseDaprApiToken("abc123");
daprPubSubClientBuilder.UseHttpEndpoint("http://localhost:8512"); //非标准 sidecar HTTP 端点
});
var app = builder.Build();
尽管如此,您可能希望注入的任何值需要从其他来源检索,该来源本身注册为依赖项。您可以使用另一个重载将 IServiceProvider
注入到配置操作方法中。在以下示例中,我们注册了一个虚构的单例,可以从某处检索 secret 并将其传递到 AddDaprJobClient
的配置方法中,以便我们可以从其他地方检索我们的 Dapr API 令牌以在此处注册:
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddSingleton<SecretRetriever>();
builder.Services.AddDaprPubSubClient((serviceProvider, daprPubSubClientBuilder) => {
var secretRetriever = serviceProvider.GetRequiredService<SecretRetriever>();
var daprApiToken = secretRetriever.GetSecret("DaprApiToken").Value;
daprPubSubClientBuilder.UseDaprApiToken(daprApiToken);
daprPubSubClientBuilder.UseHttpEndpoint("http://localhost:8512");
});
var app = builder.Build();
使用 IConfiguration 使用 Dapr PubSub 客户端
可以使用注册的 IConfiguration
中的值配置 Dapr PubSub 客户端,而无需显式指定每个值覆盖,如前一节中使用 DaprPublishSubscribeClientBuilder
所示。相反,通过填充通过依赖注入提供的 IConfiguration
,AddDaprPubSubClient()
注册将自动使用这些值覆盖其各自的默认值。
首先在您的配置中填充值。这可以通过多种不同的方式完成,如下所示。
通过 ConfigurationBuilder
配置
应用程序设置可以在不使用配置源的情况下配置,而是通过使用 ConfigurationBuilder
实例在内存中填充值:
var builder = WebApplication.CreateBuilder();
//创建配置
var configuration = new ConfigurationBuilder()
.AddInMemoryCollection(new Dictionary<string, string> {
{ "DAPR_HTTP_ENDPOINT", "http://localhost:54321" },
{ "DAPR_API_TOKEN", "abc123" }
})
.Build();
builder.Configuration.AddConfiguration(configuration);
builder.Services.AddDaprPubSubClient(); //这将自动从 IConfiguration 填充 HTTP 端点和 API 令牌值
通过环境变量配置
应用程序设置可以从可用于您的应用程序的环境变量中访问。
以下环境变量将用于填充用于注册 Dapr PubSub 客户端的 HTTP 端点和 API 令牌。
键 | 值 |
---|---|
DAPR_HTTP_ENDPOINT | http://localhost:54321 |
DAPR_API_TOKEN | abc123 |
var builder = WebApplication.CreateBuilder();
builder.Configuration.AddEnvironmentVariables();
builder.Services.AddDaprPubSubClient();
Dapr PubSub 客户端将被配置为使用 HTTP 端点 http://localhost:54321
并用 API 令牌头 abc123
填充所有出站请求。
通过前缀环境变量配置
然而,在共享主机场景中,多个应用程序都在同一台机器上运行而不使用容器或在开发环境中,前缀环境变量并不罕见。以下示例假设 HTTP 端点和 API 令牌都将从前缀为 “myapp_” 的环境变量中提取。在此场景中使用的两个环境变量如下:
键 | 值 |
---|---|
myapp_DAPR_HTTP_ENDPOINT | http://localhost:54321 |
myapp_DAPR_API_TOKEN | abc123 |
这些环境变量将在以下示例中加载到注册的配置中,并在没有附加前缀的情况下提供。
var builder = WebApplication.CreateBuilder();
builder.Configuration.AddEnvironmentVariables(prefix: "myapp_");
builder.Services.AddDaprPubSubClient();
Dapr PubSub 客户端将被配置为使用 HTTP 端点 http://localhost:54321
并用 API 令牌头 abc123
填充所有出站请求。
不依赖于依赖注入使用 Dapr PubSub 客户端
虽然使用依赖注入简化了 .NET 中复杂类型的使用,并使处理复杂配置变得更容易,但您不需要以这种方式注册 DaprPublishSubscribeClient
。相反,您还可以选择从 DaprPublishSubscribeClientBuilder
实例创建它的实例,如下所示:
public class MySampleClass
{
public void DoSomething()
{
var daprPubSubClientBuilder = new DaprPublishSubscribeClientBuilder();
var daprPubSubClient = daprPubSubClientBuilder.Build();
//使用 `daprPubSubClient` 做一些事情
}
}
设置消息处理程序
Dapr 中的流式订阅实现使您可以更好地控制事件的背压处理,通过在您的应用程序准备好接受它们之前将消息保留在 Dapr 运行时中。 .NET SDK 支持一个高性能队列,用于在处理挂起时在您的应用程序中维护这些消息的本地缓存。这些消息将保留在队列中,直到每个消息的处理超时或采取响应操作(通常在处理成功或失败后)。在 Dapr 运行时收到此响应操作之前,消息将由 Dapr 保留,并在服务故障时可用。
可用的各种响应操作如下:
响应操作 | 描述 |
---|---|
重试 | 事件应在将来再次传递。 |
丢弃 | 事件应被删除(或转发到死信队列,如果已配置)并且不再尝试。 |
成功 | 事件应被删除,因为它已成功处理。 |
处理程序将一次只接收一条消息,如果为订阅提供了取消令牌,则将在处理程序调用期间提供此令牌。
处理程序必须配置为返回一个 Task<TopicResponseAction>
,指示这些操作之一,即使是从 try/catch 块中返回。如果您的处理程序未捕获异常,订阅将在订阅注册期间配置的选项中使用响应操作。
以下演示了示例中提供的示例消息处理程序:
Task<TopicResponseAction> HandleMessageAsync(TopicMessage message, CancellationToken cancellationToken = default)
{
try
{
//对消息做一些事情
Console.WriteLine(Encoding.UTF8.GetString(message.Data.Span));
return Task.FromResult(TopicResponseAction.Success);
}
catch
{
return Task.FromResult(TopicResponseAction.Retry);
}
}
配置并订阅 PubSub 主题
流式订阅的配置需要在 Dapr 中注册的 PubSub 组件的名称、要订阅的主题或队列的名称、提供订阅配置的 DaprSubscriptionOptions
、消息处理程序和可选的取消令牌。 DaprSubscriptionOptions
的唯一必需参数是默认的 MessageHandlingPolicy
,它由每个事件的超时和超时时要采取的 TopicResponseAction
组成。
其他选项如下:
属性名称 | 描述 |
---|---|
Metadata | 额外的订阅元数据 |
DeadLetterTopic | 发送丢弃消息的死信主题的可选名称。 |
MaximumQueuedMessages | 默认情况下,内部队列没有强制的最大边界,但设置此属性将施加上限。 |
MaximumCleanupTimeout | 当订阅被处理或令牌标记取消请求时,这指定了处理内部队列中剩余消息的最大时间。 |
然后按以下示例配置订阅:
var messagingClient = app.Services.GetRequiredService<DaprPublishSubscribeClient>();
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(60)); //覆盖默认的30秒
var options = new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(10), TopicResponseAction.Retry));
var subscription = await messagingClient.SubscribeAsync("pubsub", "mytopic", options, HandleMessageAsync, cancellationTokenSource.Token);
终止并清理订阅
当您完成订阅并希望停止接收新事件时,只需等待对订阅实例的 DisposeAsync()
调用。这将导致客户端取消注册其他事件,并在处理所有仍在背压队列中的事件(如果有)后,处理任何内部资源。此清理将限于在注册订阅时提供的 DaprSubscriptionOptions
中的超时间隔,默认情况下设置为 30 秒。
Feedback
Was this page helpful?
Glad to hear it! Please tell us how we can improve.
Sorry to hear that. Please tell us how we can improve.