Core of .NET suite for Apache Kafka. KNet is a comprehensive .NET suite for Apache Kafka providing all features: Producer, Consumer, Admin, Streams, Connect, backends (KRaft).
$ dotnet add package MASES.KNetTo use KNet classes the developer can write code in .NET using the same classes available in the official Apache Kafka™ package. If classes or methods are not available yet it is possible to use the approach synthetized in What to do if an API was not yet implemented
KNet accepts many command-line switches to customize its behavior. The full list is available at Command line switch page.
One of the most important command-line switch is JVMPath and it is available in JCOBridge switches: it can be used to set-up the location of the JVM library if JCOBridge is not able to identify a suitable JRE/JDK installation. If a developer is using KNet within its own product it is possible to override the JVMPath property with a snippet like the following one:
class MyKNetCore : KNetCore
{
public override string JVMPath
{
get
{
string pathToJVM = "Set here the path to JVM library or use your own search method";
return pathToJVM;
}
}
}
IMPORTANT NOTE: pathToJVM shall be escaped
string pathToJVM = "C:\\Program Files\\Eclipse Adoptium\\jdk-11.0.18.10-hotspot\\bin\\server\\jvm.dll";string pathToJVM = @"C:\Program Files\Eclipse Adoptium\jdk-11.0.18.10-hotspot\bin\server\jvm.dll";JCOBridge try to identify a suitable JRE/JDK installation within the system using some standard mechanism of JRE/JDK: JAVA_HOME environment variable or Windows registry if available.
However it is possible, on Windows operating systems, that the library raises an InvalidOperationException: Missing Java Key in registry: Couldn't find Java installed on the machine.
This means that neither nor Windows registry contains information about a default installed JRE/JDK: some vendors may not setup them.
If the developer/user encounter this condition can do the following steps:
JAVA_HOMEset | findstr JAVA_HOME and verify the result;JAVA_HOME environment variable is not set at system level, but at a different level like user level which is not visible from the KNet process that raised the exception;JAVA_HOME at system level e.g. JAVA_HOME=C:\Program Files\Eclipse Adoptium\jdk-11.0.18.10-hotspot\;JCOBRIDGE_JVMPath at system level e.g. JCOBRIDGE_JVMPath=C:\Program Files\Eclipse Adoptium\jdk-11.0.18.10-hotspot\.IMPORTANT NOTES:
JCOBRIDGE_JVMPath or JAVA_HOME environment variables or Windows registry (on Windows OSes) shall be availableJCOBRIDGE_JVMPath environment variable takes precedence over JAVA_HOME and Windows registry: you can set JCOBRIDGE_JVMPath to C:\Program Files\Eclipse Adoptium\jdk-11.0.18.10-hotspot\bin\server\jvm.dll and avoid to override JVMPath in your codeJVMPath takes precedence over JCOBRIDGE_JVMPath/JAVA_HOME environment variables or Windows registryBelow the reader can found two different version of producer examples.
A basic producer can be like the following one:
using MASES.KNet;
using Org.Apache.Kafka™.Clients.Producer;
using Java.Util;
using System;
using System.Threading;
namespace MASES.KNetTemplate.KNetProducer
{
class Program
{
const string theServer = "localhost:9092";
const string theTopic = "myTopic";
static string serverToUse = theServer;
static string topicToUse = theTopic;
static readonly ManualResetEvent resetEvent = new ManualResetEvent(false);
static void Main(string[] args)
{
KNetCore.CreateGlobalInstance();
var appArgs = KNetCore.FilteredArgs;
if (appArgs.Length != 0)
{
serverToUse = args[0];
}
/**** Direct mode ******
Properties props = new Properties();
props.Put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverToUse);
props.Put(ProducerConfig.ACKS_CONFIG, "all");
props.Put(ProducerConfig.RETRIES_CONFIG, 0);
props.Put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.Put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.Put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
******/
Properties props = ProducerConfigBuilder.Create()
.WithBootstrapServers(serverToUse)
.WithAcks(ProducerConfig.Acks.All)
.WithRetries(0)
.WithLingerMs(1)
.WithKeySerializerClass("org.apache.kafka.common.serialization.StringSerializer")
.WithValueSerializerClass("org.apache.kafka.common.serialization.StringSerializer")
.ToProperties();
Console.CancelKeyPress += Console_CancelKeyPress;
Console.WriteLine("Press Ctrl-C to exit");
using (KafkaProducer producer = new KafkaProducer(props))
{
int i = 0;
while (!resetEvent.WaitOne(0))
{
var record = new ProducerRecord<string, string>(topicToUse, i.ToString(), i.ToString());
var result = producer.Send(record);
Console.WriteLine($"Producing: {record} with result: {result.Get()}");
producer.Flush();
i++;
}
}
}
private static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
{
if (e.Cancel) resetEvent.Set();
}
}
}The example above can be found in the templates package. Its behavior is:
A producer with Callback can be like the following one. In this example the reader can highlight a slightly difference from the corresponding Java code. Surf JVM callbacks to go into detail in the callback management from JVM.
using MASES.KNet;
using Org.Apache.Kafka™.Clients.Producer;
using Java.Util;
using System;
using System.Threading;
namespace MASES.KNetTemplate.KNetProducer
{
class Program
{
const string theServer = "localhost:9092";
const string theTopic = "myTopic";
static string serverToUse = theServer;
static string topicToUse = theTopic;
static readonly ManualResetEvent resetEvent = new ManualResetEvent(false);
static void Main(string[] args)
{
KNetCore.CreateGlobalInstance();
var appArgs = KNetCore.FilteredArgs;
if (appArgs.Length != 0)
{
serverToUse = args[0];
}
/**** Direct mode ******
Properties props = new Properties();
props.Put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverToUse);
props.Put(ProducerConfig.ACKS_CONFIG, "all");
props.Put(ProducerConfig.RETRIES_CONFIG, 0);
props.Put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.Put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.Put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
******/
Properties props = ProducerConfigBuilder.Create()
.WithBootstrapServers(serverToUse)
.WithAcks(ProducerConfig.Acks.All)
.WithRetries(0)
.WithLingerMs(1)
.WithKeySerializerClass("org.apache.kafka.common.serialization.StringSerializer")
.WithValueSerializerClass("org.apache.kafka.common.serialization.StringSerializer")
.ToProperties();
Console.CancelKeyPress += Console_CancelKeyPress;
Console.WriteLine("Press Ctrl-C to exit");
using (KafkaProducer producer = new KafkaProducer(props))
{
int i = 0;
using (var callback = new Callback((o1, o2) =>
{
if (o2 != null) Console.WriteLine(o2.ToString());
else Console.WriteLine($"Produced on topic {o1.Topic} at offset {o1.Offset}");
}))
{
while (!resetEvent.WaitOne(0))
{
var record = new ProducerRecord<string, string>(topicToUse, i.ToString(), i.ToString());
var result = producer.Send(record, callback);
Console.WriteLine($"Producing: {record} with result: {result.Get()}");
producer.Flush();
i++;
}
}
}
}
private static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
{
if (e.Cancel) resetEvent.Set();
}
}
}The example above can be found in the templates package. Its behavior is:
A basic consumer can be like the following one:
using MASES.KNet;
using Org.Apache.Kafka™.Clients.Consumer;
using Java.Util;
using System;
namespace MASES.KNetTemplate.KNetConsumer
{
class Program
{
const string theServer = "localhost:9092";
const string theTopic = "myTopic";
static string serverToUse = theServer;
static string topicToUse = theTopic;
static readonly ManualResetEvent resetEvent = new ManualResetEvent(false);
static void Main(string[] args)
{
KNetCore.CreateGlobalInstance();
var appArgs = KNetCore.FilteredArgs;
if (appArgs.Length != 0)
{
serverToUse = args[0];
}
/**** Direct mode ******
Properties props = new Properties();
props.Put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverToUse);
props.Put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.Put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.Put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.Put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.Put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
*******/
Properties props = ConsumerConfigBuilder.Create()
.WithBootstrapServers(serverToUse)
.WithGroupId("test")
.WithEnableAutoCommit(true)
.WithAutoCommitIntervalMs(1000)
.WithKeyDeserializerClass("org.apache.kafka.common.serialization.StringDeserializer")
.WithValueDeserializerClass("org.apache.kafka.common.serialization.StringDeserializer")
.ToProperties();
Console.CancelKeyPress += Console_CancelKeyPress;
Console.WriteLine("Press Ctrl-C to exit");
using (var consumer = new KafkaConsumer<string, string>(props))
{
var topics = Collections.Singleton(topicToUse);
consumer.Subscribe(topics);
while (!resetEvent.WaitOne(0))
{
var records = consumer.Poll((long)TimeSpan.FromMilliseconds(200).TotalMilliseconds);
foreach (var item in records)
{
Console.WriteLine($"Offset = {item.Offset}, Key = {item.Key}, Value = {item.Value}");
}
}
topics?.Dispose(); // needed to avoid Java.Lang.NullPointerException in some conditions where .NET GC retires topics too early
}
}
private static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
{
if (e.Cancel) resetEvent.Set();
}
}
}The example above can be found in the templates package. Its behavior is: