Enterprise observability for Rivulet.Core with EventListener wrappers, metric aggregators, and health check integration.
$ dotnet add package Rivulet.DiagnosticsEnterprise observability for Rivulet.Core with EventListener wrappers, metric aggregators, and health check integration.
dotnet add package Rivulet.Diagnostics
using Rivulet.Diagnostics;
using var listener = new RivuletConsoleListener();
await Enumerable.Range(1, 100)
.ToAsyncEnumerable()
.SelectParallelAsync(async x =>
{
await ProcessAsync(x);
return x;
}, new ParallelOptionsRivulet
{
MaxDegreeOfParallelism = 10
});
using var listener = new RivuletFileListener(
"metrics.log",
maxFileSizeBytes: 10 * 1024 * 1024 // 10MB
);
// Your parallel operations...
using var listener = new RivuletStructuredLogListener("metrics.json");
// Or with custom action
using var listener = new RivuletStructuredLogListener(json =>
{
// Send to your logging system
logger.LogInformation(json);
});
using var aggregator = new MetricsAggregator(TimeSpan.FromSeconds(10));
aggregator.OnAggregation += metrics =>
{
foreach (var metric in metrics)
{
Console.WriteLine($"{metric.DisplayName}:");
Console.WriteLine($" Min: {metric.Min:F2}");
Console.WriteLine($" Max: {metric.Max:F2}");
Console.WriteLine($" Avg: {metric.Average:F2}");
Console.WriteLine($" Current: {metric.Current:F2}");
}
};
// Your parallel operations...
using var exporter = new PrometheusExporter();
// Your parallel operations...
// Export to Prometheus format
var prometheusText = exporter.Export();
await File.WriteAllTextAsync("metrics.prom", prometheusText);
// Or get as dictionary
var metrics = exporter.ExportDictionary();
// In Program.cs or Startup.cs
// Step 1: Register PrometheusExporter (required dependency for health check)
builder.Services.AddSingleton<PrometheusExporter>();
// Step 2: Register health check
builder.Services.AddHealthChecks()
.AddCheck<RivuletHealthCheck>("rivulet", tags: new[] { "ready" });
// Step 3: Configure thresholds (optional)
builder.Services.Configure<RivuletHealthCheckOptions>(options =>
{
options.ErrorRateThreshold = 0.1; // 10% error rate triggers degraded status
options.FailureCountThreshold = 100; // 100 failures triggers unhealthy status
});
// Step 4: Add health check endpoint
app.MapHealthChecks("/health");
// Health check will return:
// - Healthy: Error rate below threshold and failures below threshold
// - Degraded: Error rate exceeds threshold
// - Unhealthy: Failure count exceeds threshold or unable to collect metrics
using var diagnostics = new DiagnosticsBuilder()
.AddConsoleListener()
.AddFileListener("metrics.log")
.AddStructuredLogListener("metrics.json")
.AddMetricsAggregator(TimeSpan.FromSeconds(10), metrics =>
{
// Handle aggregated metrics
})
.AddPrometheusExporter(out var exporter)
.Build();
// Your parallel operations...
// Export Prometheus metrics
var prometheusText = exporter.Export();
Rivulet.Diagnostics exposes the following metrics from Rivulet.Core:
// When creating RivuletHealthCheck manually (not recommended, use DI instead)
var exporter = new PrometheusExporter();
var healthCheck = new RivuletHealthCheck(exporter, new RivuletHealthCheckOptions
{
ErrorRateThreshold = 0.05, // 5% error rate triggers degraded
FailureCountThreshold = 50 // 50 failures triggers unhealthy
});
// Recommended: Use dependency injection (shown above in Health Check Integration)
RivuletHealthCheckOptions properties:
ErrorRateThreshold (double, default: 0.1)
FailureCountThreshold (long, default: 1000)
using var console = new RivuletConsoleListener();
using var file = new RivuletFileListener("metrics.log");
using var structured = new RivuletStructuredLogListener("metrics.json");
using var aggregator = new MetricsAggregator(TimeSpan.FromSeconds(5));
// All listeners will receive metrics simultaneously
using var listener = new RivuletStructuredLogListener(json =>
{
telemetryClient.TrackEvent("RivuletMetrics", new Dictionary<string, string>
{
["metrics"] = json
});
});
using var listener = new RivuletStructuredLogListener("metrics.json");
// Configure Filebeat to ship metrics.json to Elasticsearch
using var exporter = new PrometheusExporter();
// Expose metrics endpoint
app.MapGet("/metrics", () => exporter.Export());
MIT License - see LICENSE file for details