EntityFrameworkCore implementation for key abstractions defined in [Epam.Kafka.PubSub](https://www.nuget.org/packages/Epam.Kafka.PubSub)
$ dotnet add package Epam.Kafka.PubSub.EntityFrameworkCoreEpam.Kafka.PubSub.EntityFrameworkCore package provides entity framework core implementation for key abstractions defined in Epam.Kafka.PubSub.
DbContextSubscriptionHandler<TKey, TValue, TContext>, DbContextEntitySubscriptionHandler<TKey, TValue, TContext TEntity> to help with implementation of ISubscriptionHandler<TKey, TValue> that read data from KAFKA topics and save it to database using DbContext.IExternalOffsetsStorage that store offsets in database via DbContext. TryAddKafkaDbContextState extension method to register it in IServiceCollection.DbContextPublicationHandler<TKey, TValue, TEntity, TContext> and DbContextEntityPublicationHandler<TKey, TValue, TEntity, TContext> to help with implementation of IPublicationHandler<TKey, TValue> that read data from database using DbContext and publish it to kafka topics.Prepare context.
public class SampleDbContext : DbContext, IKafkaStateDbContext
{
public DbSet<KafkaTopicState> KafkaTopicStates => this.Set<KafkaTopicState>();
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
base.OnModelCreating(modelBuilder);
modelBuilder.AddKafkaState();
}
}
Register services and setup topic partition assignment and offsets storage for subscription.
services.TryAddKafkaDbContextState<SampleDbContext>();
KafkaBuilder kafkaBuilder = services.AddKafka();
kafkaBuilder.AddSubscription<string, KafkaEntity, SubscriptionHandlerSample>("Sample")
.WithSubscribeAndExternalOffsets();
Optionally derive entity from interface to use default state management.
IKafkaPublicationEntitypublic class SamplePublicationEntity : IKafkaPublicationEntity
{
public KafkaPublicationState KafkaPubState { get; set; }
public DateTime KafkaPubNbf { get; set; }
}
Create publication handler derived from DbContextEntityPublicationHandler<TKey, TValue, TEntity, TContext> for entity with default state management or from DbContextPublicationHandler<TKey, TValue, TEntity, TContext> for custom state management.
Setup publication
KafkaBuilder kafkaBuilder = services.AddKafka();
kafkaBuilder.AddPublication<string, KafkaEntity, SamplePublicationHandler>("Sample");