Core of .NET suite for Apache Kafka. KNet is a comprehensive .NET suite for Apache Kafka providing all features: Producer, Consumer, Admin, Streams, Connect, backends (KRaft).
$ dotnet add package MASES.KNetTo use KNet classes the developer can write code in .NET using the same classes available in the official Apache Kafka™ package. If classes or methods are not available yet it is possible to use the approach synthetized in What to do if an API was not yet implemented
KNet accepts many command-line switches to customize its behavior. The full list is available at Command line switch page.
One of the most important command-line switch is JVMPath and it is available in JCOBridge switches: it can be used to set-up the location of the JVM™ library if JCOBridge is not able to identify a suitable JRE/JDK installation. If a developer is using KNet within its own product it is possible to override the JVMPath property with a snippet like the following one:
class MyKNetCore : KNetCore
{
public override string JVMPath
{
get
{
string pathToJVM = "Set here the path to JVM library or use your own search method";
return pathToJVM;
}
}
}
[!IMPORTANT]
pathToJVMshall be escaped
string pathToJVM = "C:\\Program Files\\Eclipse Adoptium\\jdk-11.0.18.10-hotspot\\bin\\server\\jvm.dll";string pathToJVM = @"C:\Program Files\Eclipse Adoptium\jdk-11.0.18.10-hotspot\bin\server\jvm.dll";
JCOBridge try to identify a suitable JRE/JDK installation within the system using some standard mechanism of JRE/JDK: JAVA_HOME environment variable or Windows registry if available.
However it is possible, on Windows operating systems, that the library raises an InvalidOperationException: Missing Java Key in registry: Couldn't find Java installed on the machine.
This means that neither nor Windows registry contains information about a default installed JRE/JDK: some vendors may not setup them.
If the developer/user encounter this condition can do the following steps:
JAVA_HOMEset | findstr JAVA_HOME and verify the result;JAVA_HOME environment variable is not set at system level, but at a different level like user level which is not visible from the KNet process that raised the exception;JAVA_HOME at system level e.g. JAVA_HOME=C:\Program Files\Eclipse Adoptium\jdk-11.0.18.10-hotspot\;JCOBRIDGE_JVMPath at system level e.g. JCOBRIDGE_JVMPath=C:\Program Files\Eclipse Adoptium\jdk-11.0.18.10-hotspot\.[!IMPORTANT]
- One of
JCOBRIDGE_JVMPathorJAVA_HOMEenvironment variables or Windows registry (on Windows OSes) shall be availableJCOBRIDGE_JVMPathenvironment variable takes precedence overJAVA_HOMEand Windows registry: you can setJCOBRIDGE_JVMPathtoC:\Program Files\Eclipse Adoptium\jdk-11.0.18.10-hotspot\bin\server\jvm.dlland avoid to overrideJVMPathin your code- After first initialization steps,
JVMPathtakes precedence overJCOBRIDGE_JVMPath/JAVA_HOMEenvironment variables or Windows registry
KNet uses an embedded JVM™ through JNet/JCOBridge, however JVM™ initialization is incompatible with CET because the code used to identify CPU try to modify the return address and this is considered from CET a violation: see this comment.
From .NET 9 preview 6, CET is enabled by default on supported hardware when the final stage produce an executable artifact, i.e. the csproj file contains <OutputType>Exe</OutputType>.
If the application, upon startup, fails with the error 0xc0000409 (subcode 0x30) it was compiled with CET enabled and it fails during JVM™ initialization.
To solve the issue there are four possible solutions:
<PropertyGroup Condition="'$(TargetFramework)' == 'net9.0'">
<!--see https://learn.microsoft.com/en-us/dotnet/core/compatibility/interop/9.0/cet-support-->
<CETCompat>false</CETCompat>
</PropertyGroup>
dotnet app host, as reported in https://github.com/masesgroup/JCOBridgePublic/issues/7#issuecomment-2550031946, with a syntax like: dotnet MyApplication.dll
instead of the classic:
MyApplication.exe
reg add "HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows NT\CurrentVersion\Image File Execution Options\MyApplication.exe" /v MitigationOptions /t REG_BINARY /d "0000000000000000000000000000002000" /f
then run:
MyApplication.exe
Use the following to enable again CET:
reg delete "HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows NT\CurrentVersion\Image File Execution Options\MyApplication.exe" /v MitigationOptions /f
Below the reader can found two different version of producer examples.
A basic producer can be like the following one:
using MASES.KNet;
using Org.Apache.Kafka™.Clients.Producer;
using Java.Util;
using System;
using System.Threading;
namespace MASES.KNetTemplate.KNetProducer
{
class Program
{
const string theServer = "localhost:9092";
const string theTopic = "myTopic";
static string serverToUse = theServer;
static string topicToUse = theTopic;
static readonly ManualResetEvent resetEvent = new ManualResetEvent(false);
static void Main(string[] args)
{
KNetCore.CreateGlobalInstance();
var appArgs = KNetCore.FilteredArgs;
if (appArgs.Length != 0)
{
serverToUse = args[0];
}
/**** Direct mode ******
Properties props = new Properties();
props.Put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverToUse);
props.Put(ProducerConfig.ACKS_CONFIG, "all");
props.Put(ProducerConfig.RETRIES_CONFIG, 0);
props.Put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.Put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.Put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
******/
Properties props = ProducerConfigBuilder.Create()
.WithBootstrapServers(serverToUse)
.WithAcks(ProducerConfig.Acks.All)
.WithRetries(0)
.WithLingerMs(1)
.WithKeySerializerClass("org.apache.kafka.common.serialization.StringSerializer")
.WithValueSerializerClass("org.apache.kafka.common.serialization.StringSerializer")
.ToProperties();
Console.CancelKeyPress += Console_CancelKeyPress;
Console.WriteLine("Press Ctrl-C to exit");
using (KafkaProducer producer = new KafkaProducer(props))
{
int i = 0;
while (!resetEvent.WaitOne(0))
{
var record = new ProducerRecord<string, string>(topicToUse, i.ToString(), i.ToString());
var result = producer.Send(record);
Console.WriteLine($"Producing: {record} with result: {result.Get()}");
producer.Flush();
i++;
}
}
}
private static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
{
if (e.Cancel) resetEvent.Set();
}
}
}
The example above can be found in the templates package. Its behavior is:
A producer with Callback can be like the following one. In this example the reader can highlight a slightly difference from the corresponding Java™ code. Surf JVM callbacks to go into detail in the callback management from JVM™.
using MASES.KNet;
using Org.Apache.Kafka™.Clients.Producer;
using Java.Util;
using System;
using System.Threading;
namespace MASES.KNetTemplate.KNetProducer
{
class Program
{
const string theServer = "localhost:9092";
const string theTopic = "myTopic";
static string serverToUse = theServer;
static string topicToUse = theTopic;
static readonly ManualResetEvent resetEvent = new ManualResetEvent(false);
static void Main(string[] args)
{
KNetCore.CreateGlobalInstance();
var appArgs = KNetCore.FilteredArgs;
if (appArgs.Length != 0)
{
serverToUse = args[0];
}
/**** Direct mode ******
Properties props = new Properties();
props.Put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverToUse);
props.Put(ProducerConfig.ACKS_CONFIG, "all");
props.Put(ProducerConfig.RETRIES_CONFIG, 0);
props.Put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.Put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.Put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
******/
Properties props = ProducerConfigBuilder.Create()
.WithBootstrapServers(serverToUse)
.WithAcks(ProducerConfig.Acks.All)
.WithRetries(0)
.WithLingerMs(1)
.WithKeySerializerClass("org.apache.kafka.common.serialization.StringSerializer")
.WithValueSerializerClass("org.apache.kafka.common.serialization.StringSerializer")
.ToProperties();
Console.CancelKeyPress += Console_CancelKeyPress;
Console.WriteLine("Press Ctrl-C to exit");
using (KafkaProducer producer = new KafkaProducer(props))
{
int i = 0;
using (var callback = new Callback((o1, o2) =>
{
if (o2 != null) Console.WriteLine(o2.ToString());
else Console.WriteLine($"Produced on topic {o1.Topic} at offset {o1.Offset}");
}))
{
while (!resetEvent.WaitOne(0))
{
var record = new ProducerRecord<string, string>(topicToUse, i.ToString(), i.ToString());
var result = producer.Send(record, callback);
Console.WriteLine($"Producing: {record} with result: {result.Get()}");
producer.Flush();
i++;
}
}
}
}
private static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
{
if (e.Cancel) resetEvent.Set();
}
}
}
The example above can be found in the templates package. Its behavior is:
A basic consumer can be like the following one:
using MASES.KNet;
using Org.Apache.Kafka™.Clients.Consumer;
using Java.Util;
using System;
namespace MASES.KNetTemplate.KNetConsumer
{
class Program
{
const string theServer = "localhost:9092";
const string theTopic = "myTopic";
static string serverToUse = theServer;
static string topicToUse = theTopic;
static readonly ManualResetEvent resetEvent = new ManualResetEvent(false);
static void Main(string[] args)
{
KNetCore.CreateGlobalInstance();
var appArgs = KNetCore.FilteredArgs;
if (appArgs.Length != 0)
{
serverToUse = args[0];
}
/**** Direct mode ******
Properties props = new Properties();
props.Put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverToUse);
props.Put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.Put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.Put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.Put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.Put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
*******/
Properties props = ConsumerConfigBuilder.Create()
.WithBootstrapServers(serverToUse)
.WithGroupId("test")
.WithEnableAutoCommit(true)
.WithAutoCommitIntervalMs(1000)
.WithKeyDeserializerClass("org.apache.kafka.common.serialization.StringDeserializer")
.WithValueDeserializerClass("org.apache.kafka.common.serialization.StringDeserializer")
.ToProperties();
Console.CancelKeyPress += Console_CancelKeyPress;
Console.WriteLine("Press Ctrl-C to exit");
using (var consumer = new KafkaConsumer<string, string>(props))
{
var topics = Collections.Singleton(topicToUse);
consumer.Subscribe(topics);
while (!resetEvent.WaitOne(0))
{
var records = consumer.Poll((long)TimeSpan.FromMilliseconds(200).TotalMilliseconds);
foreach (var item in records)
{
Console.WriteLine($"Offset = {item.Offset}, Key = {item.Key}, Value = {item.Value}");
}
}
topics?.Dispose(); // needed to avoid Java.Lang.NullPointerException in some conditions where .NET GC retires topics too early
}
}
private static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
{
if (e.Cancel) resetEvent.Set();
}
}
}
The example above can be found in the templates package. Its behavior is: