Chd (Cleverly Handle Difficulty) packages are easy to use. This package contains Kafka integration utilities and helper wrappers for producers and consumers using Confluent.Kafka, including configuration and retry patterns.
License
—
Deps
3
Install Size
—
Vulns
✓ 0
Published
Feb 19, 2026
$ dotnet add package Chd.Library.KafkaChd (Cleverly Handle Difficulty) library helps you cleverly handle difficulty, write code quickly, and keep your application stable.
Chd.Library.Kafka is a simple yet powerful .NET Core helper library for Apache Kafka.
It abstracts configuration, connection management and (de)serialization, making it easy to produce/consume messages using familiar DI patterns.
Leverages Confluent.Kafka and Newtonsoft.Json.
AddKafkaProducer, AddKafkaConsumer<T>(topic)IMessageProducer)appsettings.jsondotnet add package Chd.Library.Kafka
appsettings.json"Kafka": {
"Host": "127.0.0.1",
"Port": "9092"
}
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddKafkaProducer();
builder.Services.AddKafkaConsumer<EmailMessageConsumer>("topic1");
builder.Services.AddKafkaConsumer<NotificationConsumer>("topic2");
Define your message contract:
public class EmailMessage
{
public string Subject { get; set; }
public string Content { get; set; }
public string To { get; set; }
}
Inject the MessageProducer and send messages:
using Library.Kafka;
using Microsoft.AspNetCore.Mvc;
[ApiController]
[Route("kafka")]
public class KafkaTestController : ControllerBase
{
private readonly MessageProducer _messageProducer;
public KafkaTestController(MessageProducer messageProducer)
{
_messageProducer = messageProducer;
}
[HttpGet("AddMessagesToKafkaQueue")]
public string AddMessagesToKafkaQueue()
{
_messageProducer.Produce("topic1", new EmailMessage {
Subject = "Test1 Subject", Content = "Test1 Content", To = "test1@gmail.com"
});
System.Threading.Thread.Sleep(3000);
_messageProducer.Produce("topic2", new EmailMessage {
Subject = "Test2 Subject", Content = "Test2 Content", To = "test2@gmail.com"
});
System.Threading.Thread.Sleep(3000);
return "Ok";
}
}
Create a consumer by inheriting ConsumerServiceBase (for raw messages and errors):
using Library.Kafka;
using Confluent.Kafka;
public class EmailMessageConsumer : ConsumerServiceBase
{
public override void OnMessageDelivered(string message)
{
var email = Newtonsoft.Json.JsonConvert.DeserializeObject<EmailMessage>(message);
// handle email delivery logic here
}
public override void OnErrorOccured(Error error)
{
// handle/log error as needed
}
}
Then register and activate each consumer in your DI container:
builder.Services.AddKafkaConsumer<EmailMessageConsumer>("topic1");
Consumers run as background hosted services.
IHostedService background tasks.OnMessageDelivered and OnErrorOccured.builder.Services.AddKafkaConsumer<Consumer1>("topic1");
builder.Services.AddKafkaConsumer<Consumer2>("topic2");
// as many as needed
You may also build type-safe consumers by using message base generics:
using Library.Kafka;
using Newtonsoft.Json;
public class StrongTypedConsumer : ConsumerServiceBase
{
public override void OnMessageDelivered(string message)
{
var msgObj = JsonConvert.DeserializeObject<MyCustomMessage>(message);
// Now process the strongly-typed object...
}
public override void OnErrorOccured(Error error)
{
// Handle error
}
}
Override OnErrorOccured for logging, retry, or alerts:
public override void OnErrorOccured(Error error)
{
Console.WriteLine($"Kafka Error: {error.Reason}");
}
Producer
void Produce(string topic, object message)
Consumer
ConsumerServiceBase
void OnMessageDelivered(string message)void OnErrorOccured(Error error)Registration
services.AddKafkaProducer()services.AddKafkaConsumer<T>(string topic) — registers T as consumer for topic.See all contributors on NuGet.
For issues, contributions or feedback, see mehmet-yoldas/library-core