Atc.Kusto contains a .NET library designed to facilitate the execution of Kusto queries and commands within Azure Data Explorer environments
$ dotnet add package Atc.KustoAtc.Kusto is a .NET library designed to facilitate the execution of Kusto queries and commands within Azure Data Explorer environments/clusters.
The library provides a streamlined interface for handling Kusto operations, making it easier to retrieve and process data efficiently.
The library extends the official .NET SDK, and adds the following add-on functionality, which supports passing parameters and proper deserialization:
To seamlessly integrate Azure Data Explorer (Kusto) services into your application, you can utilize the provided ServiceCollection extension methods. These methods simplify the setup process and ensure that the Kusto services are correctly configured and ready to use within your application's service architecture.
The extension methods allow you to configure Kusto services using different approaches — explicit parameters, a pre-configured AtcKustoOptions instance, or an Action<AtcKustoOptions> delegate for dynamic configuration.
All methods ensure that the Kusto services are added to the application's service collection and configured according to the specified parameters, making them available throughout your application via dependency injection.
If you prefer to configure Kusto services with explicit values for the cluster's host address, database name, and credentials, you can use the following approach:
var builder = WebApplication.CreateBuilder(args);
builder.Services.ConfigureAzureDataExplorer(
new Uri(builder.Configuration["Kusto:HostAddress"]),
builder.Configuration["Kusto:DatabaseName"],
new DefaultAzureCredential());
When you already have a pre-configured AtcKustoOptions instance, you can directly pass it to the configuration method:
var builder = WebApplication.CreateBuilder(args);
var kustoOptions = new AtcKustoOptions
{
HostAddress = builder.Configuration["Kusto:HostAddress"],
DatabaseName = builder.Configuration["Kusto:DatabaseName"],
Credential = new DefaultAzureCredential(),
};
builder.Services.ConfigureAzureDataExplorer(kustoOptions);
For more flexibility, you can configure Kusto services using an Action delegate. This is particularly useful when you need to dynamically adjust settings during application startup:
var builder = WebApplication.CreateBuilder(args);
builder.Services.ConfigureAzureDataExplorer(options =>
{
options.HostAddress = builder.Configuration["Kusto:HostAddress"];
options.DatabaseName = builder.Configuration["Kusto:DatabaseName"];
options.Credential = new DefaultAzureCredential();
});
A Kusto query can be added by creating two files in your project:
.kusto script file containing the Kusto query itself (with "Build Action" set to "Embedded resource").kusto script.The .NET record should to derive from one of the following base types:
| Base type | Description |
|---|---|
KustoCommand | Used for Kusto commands that do not produce an output. |
KustoQuery<T> | Used for Kusto queries that returns a result. |
Note: The base types handles the loading of the embedded
.kustoscript file, passing of parameters and deserialization of the output._
To enable compile-time validation of .kusto files, you must configure your .csproj to include .kusto files as both embedded resources and additional files:
<ItemGroup>
<!-- Required for runtime: embeds .kusto files in the assembly -->
<EmbeddedResource Include="**/*.kusto" />
<!-- Required for compile-time validation: allows the analyzer to verify .kusto files exist -->
<AdditionalFiles Include="**/*.kusto" />
</ItemGroup>
What this enables:
Without the <AdditionalFiles> configuration, the compiler will report errors (ATCK301) even if the .kusto files exist, because the analyzer cannot see them during compilation.
Parameters are specified by adding them to record, and declare them at the top of the .kusto script, like this:
// file: GetTeamQuery.cs
public record GetTeamQuery(long TeamId)
: KustoScript, IKustoQuery<Team>
{
public Team? ReadResult(IDataReader reader)
=> reader.ReadObjects<Team>().FirstOrDefault();
}
// file: GetTeamQuery.kusto
declare query_parameters (
teamId:long)
;
Teams
| where entityId == teamId
| project
Id = tolong(payload.id),
Name = tostring(payload.name)
The query result is mapped to the specified output contract, by matching parameter names like this:
// file: Team.cs
public record Team(
string Id,
string Name);
Note: The above example in GetTeamQuery.cs is used to directly override the ReadResults method, if this is not needed, simply inherit directly from KustoQuery and accept the default implementation of the ReadResult method.
public record GetTeamQuery(long TeamId)
: KustoQuery<Team>;
The following examples demonstrate different types of queries, showcasing single result queries, list queries, and more complex queries with multiple result sets.
The following C# record is defined in the CustomerByIdQuery.cs file:
public record CustomerByIdQuery(long CustomerId)
: KustoQuery<Customer>;
The following KQL query is defined in the CustomerByIdQuery.kusto file:
declare query_parameters (
customerId:long
);
Customers
| where customerId == CustomerKey
| project
CustomerKey,
FirstName,
LastName,
CompanyName,
CityName,
StateProvinceName,
RegionCountryName,
ContinentName,
Gender,
MaritalStatus,
Education,
Occupation
The following C# record is defined in the CustomerSalesQuery.cs file:
public record CustomerSalesQuery
: KustoQuery<CustomerSales>;
The following KQL query is defined in the CustomerSalesQuery.kusto file:
Customers
| join kind=inner SalesFact on CustomerKey
| extend CustomerName = strcat(FirstName, ' ', LastName)
| summarize
SalesAmount = round(sum(SalesAmount), 2),
TotalCost = round(sum(TotalCost), 2)
by CustomerKey, CustomerName
The following C# record is defined in the CustomersSplitByGenderQuery.cs file:
public record CustomersSplitByGenderQuery
: KustoScript, IKustoQuery<CustomersByGender>
{
public CustomersByGender ReadResult(IDataReader reader)
=> new(
reader.ReadObjects<Customer>(),
reader.ReadObjectsFromNextResult<Customer>(),
reader.ReadObjectsFromNextResult<CustomerGenderCount>());
}
The following KQL query is defined in the CustomersSplitByGenderQuery.kusto file:
// Create materialized result with rows from customers
let customers = materialize(Customers
| project
CustomerKey,
FirstName,
LastName,
CompanyName,
CityName,
StateProvinceName,
RegionCountryName,
ContinentName,
Gender,
MaritalStatus,
Education,
Occupation)
;
// Female Customers
customers
| where Gender == "F"
;
// Male Customers
customers
| where Gender == "M"
;
// Customer count by gender
customers
| summarize Count = count() by Gender
Kusto scripts can be executed using the IKustoProcessor registered in the DI container, like this:
app.MapGet(
"/customers/{customerId}",
async static (
int customerId,
IKustoProcessor processor,
CancellationToken cancellationToken)
=> (IResult)(await processor.ExecuteQuery(
new CustomersQuery(customerId),
cancellationToken)
switch
{
[{ } customer] => TypedResults.Ok((object?)customer),
_ => TypedResults.NotFound(),
}))
.WithName("GetCustomerById")
.WithOpenApi();
The processor can also perform pagination by using the ExecutePagedQuery overload, taking in a session id, a continuation token and a max item count, like this:
app.MapGet(
"/customers",
async static (
[FromHeader(Name = "x-client-session-id")] string? sessionId,
[FromHeader(Name = "x-pageSize")] int? pageSize,
[FromHeader(Name = "x-continuation-token")] string? continuationToken,
IKustoProcessor processor,
CancellationToken cancellationToken)
=> await processor.ExecutePagedQuery(
new CustomersQuery(),
sessionId,
pageSize ?? 100,
continuationToken,
cancellationToken))
.WithName("GetCustomers")
.WithOpenApi();
The pageSize specifies how many items to return for each page. Each page is returned with a continuationToken that can be specified to fetch the next page.
The optional sessionId can be provided to optimize the use of storage on the ADX. If the same sessionId is specified for two calls they will share the underlying storage for pagination results.
Streaming queries allow you to process large result sets more efficiently by streaming results as they become available. Atc.Kusto provides two approaches for streaming:
Direct streaming yields rows immediately as they are processed from Kusto, providing the lowest latency and minimal memory usage. This approach is suitable when you need to process a large number of results as quickly as possible and don't require metadata about the query execution:
// Define your streaming query
public record CustomersStreamingQuery()
: KustoStreamingQuery<Customer>;
// Execute the streaming query and process results as they arrive
await foreach (var customer in kustoProcessor.ExecuteStreamingQuery(
new CustomersStreamingQuery(),
cancellationToken))
{
// Process each customer as it arrives
Console.WriteLine($"{customer.FirstName} {customer.LastName}");
}
Buffered streaming provides additional metadata like table schemas and completion information, while still allowing you to stream the results:
// Execute buffered streaming query
var streamingResult = await kustoProcessor.ExecuteBufferedStreamingQuery(
new CustomersStreamingQuery(),
cancellationToken);
// Access metadata if needed
Console.WriteLine($"Has errors: {streamingResult.Completion?.HasErrors}");
// Stream the results
await foreach (var customer in streamingResult.Rows.WithCancellation(cancellationToken))
{
// Process each customer
Console.WriteLine($"{customer.FirstName} {customer.LastName}");
}
In a web API scenario, you can return the stream directly to the client:
app.MapGet(
"/customers/stream",
(IKustoProcessorFactory processorFactory, CancellationToken cancellationToken) =>
Task.FromResult(processorFactory.Create("DatabaseName")
.ExecuteStreamingQuery(
new CustomersStreamingQuery(),
cancellationToken)))
.WithName("GetCustomersStream");
This returns a streamed response to the client, which can be processed as it arrives.
The library provides built-in health check support for Azure Data Explorer clusters, which can be easily integrated with ASP.NET Core's Health Checks API. This feature allows you to monitor the health of your Kusto clusters and integrate it with your application's monitoring infrastructure.
To add a Kusto cluster health check to your application, use the AddKustoHealthCheck extension method:
// Configure health checks
builder.Services
.AddHealthChecks()
.AddKustoHealthCheck(
name: "adx", // Optional: Name for the health check
connectionName: "DefaultConnection", // Optional: Connection name to use
databaseName: null, // Optional: Database name
tags: new[] { "adx", "database" }); // Optional: Tags
The health check will execute a .show diagnostics query against the cluster to retrieve health information.
The health check returns detailed health information about your Kusto cluster:
The health check maps cluster health to ASP.NET Core health statuses:
You can also use the IKustoHealthCheck interface directly in your code:
public class MyService
{
private readonly IKustoHealthCheck healthCheck;
public MyService(IKustoHealthCheck healthCheck)
{
this.healthCheck = healthCheck;
}
public async Task CheckClusterHealth()
{
var result = await healthCheck.CheckHealthAsync("MyConnection");
if (!result.IsHealthy)
{
// Handle unhealthy cluster scenario
Console.WriteLine($"Cluster unhealthy: {result.NotHealthyReason}");
}
}
}
For more details, check the health check sample.
See the sample api for an example on how to configure the Atc.Kusto library. Also see the sample console application for an example of utilizing the library directly without being wrapped in an API.
Both samples are querying the "ContosoSales" database of the Microsoft ADX sample cluster.
Atc.Kusto supports cooperative cancellation via CancellationToken for all query types. In addition to local cancellation, the library can also issue a server-side cancel control command so the running Kusto query is aborted in the cluster.
Server-side cancel is enabled by default and can be toggled per-call via options:
AtcQueryOptions.EnableServerSideCancellationAtcStreamingQueryOptions.EnableServerSideCancellationServer-side cancellation (default - recommended):
Local-only cancellation (opt-out):
When to use each mode:
Use server-side cancellation (default) for:
Consider local-only cancellation for:
Direct streaming with cancellation enabled (default):
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1));
await foreach (var row in processor.ExecuteStreamingQuery(new CustomersStreamingQuery(), cts.Token))
{
// consume rows
}
Opt out of server-side cancellation:
var options = new AtcStreamingQueryOptions { EnableServerSideCancellation = false };
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1));
await foreach (var row in processor.ExecuteStreamingQuery(new CustomersStreamingQuery(), options, cts.Token))
{
// consume rows
}
The API sample exposes endpoints to demonstrate cancellation behavior:
Query options can be configured per-call via AtcQueryOptions (for standard queries) or AtcStreamingQueryOptions (for streaming queries).
You can set a server-side query timeout using the QueryTimeout property. When set, this limits how long the Kusto cluster will execute your query before timing out:
var options = new AtcQueryOptions
{
QueryTimeout = TimeSpan.FromMinutes(10) // Extend timeout for long-running queries
};
var result = await processor.ExecuteQuery(
new LongRunningQuery(),
options,
cancellationToken);
For streaming queries:
var options = new AtcStreamingQueryOptions
{
QueryTimeout = TimeSpan.FromMinutes(10)
};
await foreach (var row in processor.ExecuteStreamingQuery(new LargeDataQuery(), options, cancellationToken))
{
// Process rows
}
Default behavior: When QueryTimeout is not set (null), the Kusto server default timeout of approximately 4 minutes applies.
Atc.Kusto supports opt-in OpenTelemetry tracing. When enabled, the library emits spans for query and command execution, which can be collected by any OpenTelemetry-compatible backend.
To enable tracing, add the Atc.Kusto activity source to your OpenTelemetry configuration:
using Atc.Kusto.Diagnostics;
builder.Services.AddOpenTelemetry()
.WithTracing(tracing => tracing
.AddSource(KustoDiagnostics.SourceName));
When enabled, you'll see spans for:
kusto.query - Standard query executionkusto.command - Command executionkusto.streaming - Streaming query executionEach span includes:
db.statement: The query or command textIf you don't add KustoDiagnostics.SourceName to your OpenTelemetry configuration, no activities are created and there is zero performance overhead.
A Roslyn analyzer is bundled with the Atc.Kusto NuGet package and provides compile-time validation of your Kusto queries. No separate package installation is required - the analyzer activates automatically when you reference Atc.Kusto.
The analyzer validates:
.kusto files exist and are properly configured| Rule | Severity | Description |
|---|---|---|
| ATCK301 | Error | Missing .kusto embedded resource file |
| ATCK302 | Error | Parameter count mismatch between C# and Kusto |
| ATCK303 | Error | Parameter type mismatch between C# and Kusto |
| ATCK304 | Error | Parameter order mismatch between C# and Kusto |
| ATCK305 | Warning | Empty .kusto script file |
| ATCK306 | Error | Projection field not found in result type |
| ATCK307 | Info | Result type property not projected |
| ATCK308 | Info | Missing final | project statement |
| ATCK309 | Warning | Projection field naming mismatch (snake_case vs PascalCase) |