A library used to read from an Azure Storage Queue. Use this package in combination with code samples for Azure Communication Services projects where Event Grid subscriptions from a storage queue are used.
$ dotnet add package AzureStorage.QueueService
This project abstracts away the complexities of using the queue feature of an Azure Storage account. It targets .NET 6 and higher.
You will need to create an Azure Storage account in the Azure portal using a unique name, then create a queue, and finally obtain your connection string.
Uses BinaryData for message bodies.
Messages are removed from the queue when ReceiveMessagesAsync<T> is called; however, this is configurable.
Uses System.Text.Json deserialization behavior by default. This can be overridden by specifying your own JsonSerializerOptions.
Provides PeekMessages<T>, which returns a collection of messages but doesn't remove them from the queue.
Add the NuGet package JasonShave.AzureStorage.QueueService to your .NET project.
Set your ConnectionString and QueueName properties in your .NET User Secrets store, appsettings.json, or anywhere your IConfiguration provider can look for the QueueClientSettings. For example:
{
"QueueClientSettings" : {
"ConnectionString": "[your_connection_string]",
"QueueName": "[your_queue_name]",
"CreateIfNotExists": true
}
}
Alternatively, set the endpoint URI, which includes the queue name, and use a token credential during startup:
{
"QueueClientSettings": {
"EndpointUri": "https://somestorageaccount.queue.core.windows.net/myqueuename"
}
}
services.AddAzureStorageQueueClient(x =>
x.AddDefaultClient(y =>
{
y.EndpointUri = new Uri(builder.Configuration["QueueClientSettings:EndpointUri"]!);
y.TokenCredential = new DefaultAzureCredential();
}));
You can create your queue in advance or allow the library to create it during runtime by setting the CreateIfNotExists property to true.
The library has been updated to handle both a default client and a named-client experience similar to how the .NET IHttpClientFactory works. This section outlines both ways you can configure dependency injection to either use the default client or a named client in case you have more than one queue to pull from.
Use the AddAzureStorageQueueClient() method and specify the settings for the ConnectionString and the QueueName or use the IConfiguration binder to bind with a JSON configuration as shown above.
services.AddAzureStorageQueueClient(x =>
x.AddDefaultClient(y =>
{
y.ConnectionString = "[your_connection_string]";
y.QueueName = "[your_queue_name]";
}));
// get configuration from IConfiguration binder
services.AddAzureStorageQueueClient(x =>
x.AddDefaultClient(y => Configuration.Bind(nameof(QueueClientSettings), y)));
Use the AddAzureStorageQueueClient() method with the AddClient() method to add and configure different queue clients which can be obtained using the IQueueClientFactory and the GetQueueClient() method.
services.AddAzureStorageQueueClient(x =>
x.AddClient("MyClient1", y =>
{
y.ConnectionString = "[your_connection_string]";
y.QueueName = "[your_queue_name]";
}));
// get configuration from IConfiguration binder
services.AddAzureStorageQueueClient(x =>
x.AddClient("MyClient1", y => Configuration.Bind(nameof(QueueClientSettings), y)));
// add multiple named clients and a default client
services.AddAzureStorageQueueClient(x =>
{
x.AddClient("MyClient1", y => Configuration.Bind(nameof(QueueClientSettings), y));
x.AddClient("MyClient2", y =>
{
y.ConnectionString = "[your_connection_string]";
y.QueueName = "[your_queue_name]";
});
x.AddDefaultClient(y => Configuration.Bind(nameof(DefaultQueueClientSettings), y));
});
// inject the IQueueClientFactory and get a default client
public class MyClass
{
private readonly AzureStorageQueueClient _queueClient;
public MyClass(IQueueClientFactory queueClientFactory)
{
_queueClient = queueClientFactory.GetQueueClient();
}
}
// inject the IQueueClientFactory and get a named client
public class MyClass
{
private readonly AzureStorageQueueClient _queueClient;
public MyClass(IQueueClientFactory queueClientFactory)
{
_queueClient = queueClientFactory.GetQueueClient("MyClient1");
}
}
The following example shows the .NET Worker Service template where the class uses the IHostedService interface to send a message every five seconds.
Inject the IQueueClientFactory interface and use as follows:
public class Sender : IHostedService
{
private readonly AzureStorageQueueClient _queueClient;
public Sender(IQueueClientFactory queueClientFactory) => _queueClient = queueClientFactory.GetQueueClient();
public async Task StartAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var myMessage = new MyMessage("Test");
await _queueClient.SendMessageAsync<MyMessage>(myMessage, cancellationToken);
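await Task.Delay(5000, cancellationToken); // stop waiting as soon as cancellation is requested
}
}
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask; // required by IHostedService
}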
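A note on the delay in loops like the one above: passing the cancellation token to Task.Delay lets the loop exit as soon as cancellation is requested instead of after the remaining wait. The sketch below shows that shape using only the base class library. PollingLoop, RunAsync, and the work delegate are hypothetical names for illustration and are not part of this library.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class PollingLoop
{
    // Runs `work` once per `interval` until the token is cancelled.
    // Returns the number of completed iterations.
    public static async Task<int> RunAsync(
        Func<CancellationToken, Task> work,
        TimeSpan interval,
        CancellationToken cancellationToken)
    {
        var iterations = 0;
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                await work(cancellationToken);
                iterations++;
                // Throws OperationCanceledException when the token is cancelled mid-wait.
                await Task.Delay(interval, cancellationToken);
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation is the expected way for this loop to end.
        }
        return iterations;
    }

    public static async Task Main()
    {
        // Cancel automatically after a short time to simulate shutdown.
        using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(250));
        var count = await RunAsync(
            _ => { Console.WriteLine("tick"); return Task.CompletedTask; },
            TimeSpan.FromMilliseconds(100),
            cts.Token);
        Console.WriteLine($"iterations: {count}");
    }
}
```

In a hosted service, the work delegate would be the place for a call such as SendMessageAsync<T>(), as in the Sender class above.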
The following example shows the .NET Worker Service template where the class uses the IHostedService interface to poll the queue in a loop, receiving messages once per second.
Inject the IQueueClientFactory interface and use as follows:
public class MySubscriber : IHostedService
{
private readonly AzureStorageQueueClient _queueClient;
private readonly IMyMessageHandler _myMessageHandler; // see optional handler below
public MySubscriber(IQueueClientFactory queueClientFactory, IMyMessageHandler myMessageHandler)
{
// get the default queue client
_queueClient = queueClientFactory.GetQueueClient();
_myMessageHandler = myMessageHandler;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
await _queueClient.ReceiveMessagesAsync<MyMessage>(
message => _myMessageHandler.HandleAsync(message),
exception => _myMessageHandler.HandleExceptionAsync(exception),
cancellationToken);
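await Task.Delay(1000, cancellationToken); // stop waiting as soon as cancellation is requested
}
}
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask; // required by IHostedService
}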
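The IMyMessageHandler used by the subscriber above is application code, not something this library provides. A minimal sketch of what such a handler could look like follows. The interface shape, the Task return types, the nullable message parameter, and the MyMessage record are all assumptions made for illustration. The delegate types that ReceiveMessagesAsync<T>() expects are not shown in this document, so adjust the signatures to match the library.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical message type matching the `new MyMessage("Test")` usage in the sender example.
public record MyMessage(string Title);

// Hypothetical handler contract; not part of the library.
public interface IMyMessageHandler
{
    Task HandleAsync(MyMessage? message);
    Task HandleExceptionAsync(Exception exception);
}

public class MyMessageHandler : IMyMessageHandler
{
    // Collected in memory here so the behavior is easy to inspect;
    // a real handler would more likely use ILogger or call business logic.
    public List<string> Log { get; } = new();

    public Task HandleAsync(MyMessage? message)
    {
        Log.Add($"handled: {message?.Title ?? "(empty)"}");
        return Task.CompletedTask;
    }

    public Task HandleExceptionAsync(Exception exception)
    {
        Log.Add($"failed: {exception.Message}");
        return Task.CompletedTask;
    }
}

public static class HandlerDemo
{
    public static async Task Main()
    {
        var handler = new MyMessageHandler();
        await handler.HandleAsync(new MyMessage("Test"));
        await handler.HandleExceptionAsync(new InvalidOperationException("boom"));
        foreach (var line in handler.Log)
        {
            Console.WriteLine(line);
        }
    }
}
```

With the standard dependency injection container, a handler like this would typically be registered with services.AddSingleton<IMyMessageHandler, MyMessageHandler>() so it can be injected into the subscriber's constructor.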
The library lets you pull multiple messages at once by passing a maximum message count as an integer to the ReceiveMessagesAsync<T>() method. The messages are pulled from the queue as a single batch, which the consuming application holds a lock on for the default duration (the visibility timeout) used by the Azure Storage Queue library. They are then passed to the handler as individual messages.
await _queueClient.ReceiveMessagesAsync<MyMessage>(HandleMessage, HandleException, cancellationToken, 10);
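Received messages are deserialized with System.Text.Json, and the design notes say the defaults can be overridden with your own JsonSerializerOptions. The sketch below uses only System.Text.Json to show why that can matter: with the defaults, property names are matched case-sensitively. SamplePayload and JsonOptionsDemo are hypothetical names, and how the options are handed to this library is not shown in this document, so that part is deliberately left out.

```csharp
using System;
using System.Text.Json;

// Hypothetical payload type, used only for this illustration.
public class SamplePayload
{
    public string? Title { get; set; }
}

public static class JsonOptionsDemo
{
    // Custom options a caller might supply in place of the defaults.
    public static readonly JsonSerializerOptions CaseInsensitive = new()
    {
        PropertyNameCaseInsensitive = true
    };

    // Passing null for options uses the System.Text.Json defaults.
    public static SamplePayload? Parse(string json, JsonSerializerOptions? options = null) =>
        JsonSerializer.Deserialize<SamplePayload>(json, options);

    public static void Main()
    {
        // A producer that writes camelCase property names.
        const string json = "{\"title\":\"Test\"}";

        // Defaults: "title" does not match the Title property, so it stays null.
        var withDefaults = Parse(json);
        Console.WriteLine(withDefaults?.Title is null ? "default: Title not bound" : "default: Title bound");

        // Case-insensitive matching binds "title" to Title.
        var withCustom = Parse(json, CaseInsensitive);
        Console.WriteLine($"custom: {withCustom?.Title}");
    }
}
```

If the producers of your queue messages use a different naming convention than your .NET types, supplying options like these is one way to keep deserialization from silently yielding empty properties.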
The library supports OpenTelemetry logs, traces, and metrics and can be configured using the standard OTEL library as follows:
builder.Services.AddOpenTelemetry()
.WithTracing(t => t.AddAzureStorageQueueTracing())
.WithMetrics(m => m.AddAzureStorageQueueMetrics());
The library includes two flags that control tracing: one for sending messages and one for creating an Activity (span) for each query to retrieve a message. These two activities are:
Queue.Send
Queue.Read
NOTE: To observe the above spans, you need to configure the library to create them. Be cautious about enabling the Queue.Read span, as it creates a lot of data in your telemetry system, particularly if you read at a high frequency (e.g. every second).
builder.Services.ConfigureQueueServiceTelemetry(t =>
{
t.CreateNewActivityOnMessageRetrieval = true; // Queue.Read trace
t.CreateNewActivityOnMessageSend = true; // Queue.Send trace
});
The following metrics are also emitted and can be read through tools like Azure Monitor or Grafana:
queue_messages_received_total
queue_messages_sent_total
queue_messages_processed_total
queue_message_processing_duration_seconds
This project is licensed under the MIT License - see the LICENSE.md file for details.