In this article you can find out how to create Kafka consumer using .Net 8

All examples you can find on my github

First thing you have to do, is install kafka with zookeeper on your computer. To do it please have a look to this article.

Go to Visual Studio and create new project in .net 8. Right click on you project and create add new package.

Install Confluent.Kafka lib + Microsoft.Extensions.Hosting

Add new class KafkaConsumerBackgroundWorker

using Microsoft.Extensions.Hosting;

namespace KafkaConsumer
{
    internal class KafkaConsumerBackgroundWorker : BackgroundService
    {
        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            Console.WriteLine("Execute backgound worker");
            return Task.CompletedTask;
        }
    }
}

Register newly added class in Program.cs

using KafkaConsumer;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

HostApplicationBuilder builder = Host.CreateApplicationBuilder(args);

builder.Services.AddHostedService<KafkaConsumerBackgroundWorker>();
using IHost host = builder.Build();

await host.RunAsync();

When you run application, you can see, that your background worker has been executed.

No we are ready to implement Kafka consumer.

Start your kafka. You can see how to do it in this article

Go to your KafkaConsumerBackgroundWorker.cs and write following code.

using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

namespace KafkaConsumer
{
    internal class KafkaConsumerBackgroundWorker : BackgroundService
    {
        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var conf = new ConsumerConfig
            {
                GroupId = "MyGroupId",
                BootstrapServers = "localhost:29092",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
            {
                c.Subscribe("Test");

                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) => {
                    e.Cancel = true;
                    cts.Cancel();
                };

                try
                {
                    while (true)
                    {
                        try
                        {
                            var result = c.Consume(cts.Token);
                            Console.WriteLine($"Consumed message '{result.Message.Value}' at: '{result.TopicPartitionOffset}'.");

                            //Do whatever you want with your message
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Error occured: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    c.Close();
                }
            }
            return Task.CompletedTask;
        }
    }
}

Some explanations:
GroupId is id of your consumer. If you have 1 consumer you can put any string. When you have more then one consumers, then consumers with the same id will be consume messages alternately – first message – first consumer, second message – second consumer etc. so you can use your Kafka for load balancing. Anyway if you want make your services to consume messages in the same time, you have to create more partition on topic.
If your consumers has different id, then each consumer will consume the same massage.

Now you can test your application. Run the application and go to offset explorer to send the message. You can find here how to install offset explorer.

You can see on your application, that the message has been consumed correctly.

Important note.
Your message must be processed in 300s. After this time your consumer will be marked as offline. This time can be changed by parameter “MaxPollIntervalMs” in consumer config. I recommend to process message in separated thread/process for example using Hangfire.

2 Comments

Leave a Reply