How to: Author and manage Dapr streaming subscriptions in the .NET SDK
Let’s create a subscription to a pub/sub topic or queue at using the streaming capability. We’ll use the simple example provided here, for the following demonstration and walk through it as an explainer of how you can configure message handlers at runtime and which do not require an endpoint to be pre-configured. In this guide, you will:
- Deploy a .NET Web API application (StreamingSubscriptionExample)
- Utilize the Dapr .NET Messaging SDK to subscribe dynamically to a pub/sub topic.
Prerequisites
- Dapr CLI
- Initialized Dapr environment
- .NET 6, .NET 8 or .NET 9 installed
- Dapr.Messaging NuGet package installed to your project
Note
Note that while .NET 6 is the minimum support version of .NET in Dapr v1.15, only .NET 8 and .NET 9 will continue to be supported by Dapr in v1.16 and later.Set up the environment
Clone the .NET SDK repo.
git clone https://github.com/dapr/dotnet-sdk.git
From the .NET SDK root directory, navigate to the Dapr streaming PubSub example.
cd examples/Client/PublishSubscribe
Run the application locally
To run the Dapr application, you need to start the .NET program and a Dapr sidecar. Navigate to the StreamingSubscriptionExample
directory.
cd StreamingSubscriptionExample
We’ll run a command that starts both the Dapr sidecar and the .NET program at the same time.
dapr run --app-id pubsubapp --dapr-grpc-port 4001 --dapr-http-port 3500 -- dotnet run
Dapr listens for HTTP requests at
http://localhost:3500
and internal Jobs gRPC requests athttp://localhost:4001
.
Register the Dapr PubSub client with dependency injection
The Dapr Messaging SDK provides an extension method to simplify the registration of the Dapr PubSub client. Before
completing the dependency injection registration in Program.cs
, add the following line:
var builder = WebApplication.CreateBuilder(args);
//Add anywhere between these two
builder.Services.AddDaprPubSubClient(); //That's it
var app = builder.Build();
It’s possible that you may want to provide some configuration options to the Dapr PubSub client that
should be present with each call to the sidecar such as a Dapr API token, or you want to use a non-standard
HTTP or gRPC endpoint. This be possible through use of an overload of the registration method that allows configuration
of a DaprPublishSubscribeClientBuilder
instance:
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddDaprPubSubClient((_, daprPubSubClientBuilder) => {
daprPubSubClientBuilder.UseDaprApiToken("abc123");
daprPubSubClientBuilder.UseHttpEndpoint("http://localhost:8512"); //Non-standard sidecar HTTP endpoint
});
var app = builder.Build();
Still, it’s possible that whatever values you wish to inject need to be retrieved from some other source, itself registered as a dependency. There’s one more overload you can use to inject an IServiceProvider
into the configuration action method. In the following example, we register a fictional singleton that can retrieve secrets from somewhere and pass it into the configuration method for AddDaprJobClient
so
we can retrieve our Dapr API token from somewhere else for registration here:
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddSingleton<SecretRetriever>();
builder.Services.AddDaprPubSubClient((serviceProvider, daprPubSubClientBuilder) => {
var secretRetriever = serviceProvider.GetRequiredService<SecretRetriever>();
var daprApiToken = secretRetriever.GetSecret("DaprApiToken").Value;
daprPubSubClientBuilder.UseDaprApiToken(daprApiToken);
daprPubSubClientBuilder.UseHttpEndpoint("http://localhost:8512");
});
var app = builder.Build();
Use the Dapr PubSub client using IConfiguration
It’s possible to configure the Dapr PubSub client using the values in your registered IConfiguration
as well without
explicitly specifying each of the value overrides using the DaprPublishSubscribeClientBuilder
as demonstrated in the previous
section. Rather, by populating an IConfiguration
made available through dependency injection the AddDaprPubSubClient()
registration will automatically use these values over their respective defaults.
Start by populating the values in your configuration. This can be done in several different ways as demonstrated below.
Configuration via ConfigurationBuilder
Application settings can be configured without using a configuration source and by instead populating the value in-memory
using a ConfigurationBuilder
instance:
var builder = WebApplication.CreateBuilder();
//Create the configuration
var configuration = new ConfigurationBuilder()
.AddInMemoryCollection(new Dictionary<string, string> {
{ "DAPR_HTTP_ENDPOINT", "http://localhost:54321" },
{ "DAPR_API_TOKEN", "abc123" }
})
.Build();
builder.Configuration.AddConfiguration(configuration);
builder.Services.AddDaprPubSubClient(); //This will automatically populate the HTTP endpoint and API token values from the IConfiguration
Configuration via Environment Variables
Application settings can be accessed from environment variables available to your application.
The following environment variables will be used to populate both the HTTP endpoint and API token used to register the Dapr PubSub client.
Key | Value |
---|---|
DAPR_HTTP_ENDPOINT | http://localhost:54321 |
DAPR_API_TOKEN | abc123 |
var builder = WebApplication.CreateBuilder();
builder.Configuration.AddEnvironmentVariables();
builder.Services.AddDaprPubSubClient();
The Dapr PubSub client will be configured to use both the HTTP endpoint http://localhost:54321
and populate all outbound
requests with the API token header abc123
.
Configuration via prefixed Environment Variables
However, in shared-host scenarios where there are multiple applications all running on the same machine without using containers or in development environments, it’s not uncommon to prefix environment variables. The following example assumes that both the HTTP endpoint and the API token will be pulled from environment variables prefixed with the value “myapp_”. The two environment variables used in this scenario are as follows:
Key | Value |
---|---|
myapp_DAPR_HTTP_ENDPOINT | http://localhost:54321 |
myapp_DAPR_API_TOKEN | abc123 |
These environment variables will be loaded into the registered configuration in the following example and made available without the prefix attached.
var builder = WebApplication.CreateBuilder();
builder.Configuration.AddEnvironmentVariables(prefix: "myapp_");
builder.Services.AddDaprPubSubClient();
The Dapr PubSub client will be configured to use both the HTTP endpoint http://localhost:54321
and populate all outbound
requests with the API token header abc123
.
Use the Dapr PubSub client without relying on dependency injection
While the use of dependency injection simplifies the use of complex types in .NET and makes it easier to
deal with complicated configurations, you’re not required to register the DaprPublishSubscribeClient
in this way.
Rather, you can also elect to create an instance of it from a DaprPublishSubscribeClientBuilder
instance as
demonstrated below:
public class MySampleClass
{
public void DoSomething()
{
var daprPubSubClientBuilder = new DaprPublishSubscribeClientBuilder();
var daprPubSubClient = daprPubSubClientBuilder.Build();
//Do something with the `daprPubSubClient`
}
}
Set up message handler
The streaming subscription implementation in Dapr gives you greater control over handling backpressure from events by leaving the messages in the Dapr runtime until your application is ready to accept them. The .NET SDK supports a high-performance queue for maintaining a local cache of these messages in your application while processing is pending. These messages will persist in the queue until processing either times out for each one or a response action is taken for each (typically after processing succeeds or fails). Until this response action is received by the Dapr runtime, the messages will be persisted by Dapr and made available in case of a service failure.
The various response actions available are as follows:
Response Action | Description |
---|---|
Retry | The event should be delivered again in the future. |
Drop | The event should be deleted (or forwarded to a dead letter queue, if configured) and not attempted again. |
Success | The event should be deleted as it was successfully processed. |
The handler will receive only one message at a time and if a cancellation token is provided to the subscription, this token will be provided during the handler invocation.
The handler must be configured to return a Task<TopicResponseAction>
indicating one of these operations, even if from
a try/catch block. If an exception is not caught by your handler, the subscription will use the response action configured
in the options during subscription registration.
The following demonstrates the sample message handler provided in the example:
Task<TopicResponseAction> HandleMessageAsync(TopicMessage message, CancellationToken cancellationToken = default)
{
try
{
//Do something with the message
Console.WriteLine(Encoding.UTF8.GetString(message.Data.Span));
return Task.FromResult(TopicResponseAction.Success);
}
catch
{
return Task.FromResult(TopicResponseAction.Retry);
}
}
Configure and subscribe to the PubSub topic
Configuration of the streaming subscription requires the name of the PubSub component registered with Dapr, the name
of the topic or queue being subscribed to, the DaprSubscriptionOptions
providing the configuration for the subscription,
the message handler and an optional cancellation token. The only required argument to the DaprSubscriptionOptions
is
the default MessageHandlingPolicy
which consists of a per-event timeout and the TopicResponseAction
to take when
that timeout occurs.
Other options are as follows:
Property Name | Description |
---|---|
Metadata | Additional subscription metadata |
DeadLetterTopic | The optional name of the dead-letter topic to send dropped messages to. |
MaximumQueuedMessages | By default, there is no maximum boundary enforced for the internal queue, but setting this |
property would impose an upper limit. | |
MaximumCleanupTimeout | When the subscription is disposed of or the token flags a cancellation request, this specifies |
the maximum amount of time available to process the remaining messages in the internal queue. |
Subscription is then configured as in the following example:
var messagingClient = app.Services.GetRequiredService<DaprPublishSubscribeClient>();
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(60)); //Override the default of 30 seconds
var options = new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(10), TopicResponseAction.Retry));
var subscription = await messagingClient.SubscribeAsync("pubsub", "mytopic", options, HandleMessageAsync, cancellationTokenSource.Token);
Terminate and clean up subscription
When you’ve finished with your subscription and wish to stop receiving new events, simply await a call to
DisposeAsync()
on your subscription instance. This will cause the client to unregister from additional events and
proceed to finish processing all the events still leftover in the backpressure queue, if any, before disposing of any
internal resources. This cleanup will be limited to the timeout interval provided in the DaprSubscriptionOptions
when
the subscription was registered and by default, this is set to 30 seconds.
Feedback
Was this page helpful?
Glad to hear it! Please tell us how we can improve.
Sorry to hear that. Please tell us how we can improve.