In this article I will show a fast way to read data from a database and send it to a Kafka topic without storing it all in memory.

The fastest way would be to use Kafka Streams, but at the moment of writing (January 2024) there is no Kafka Streams support for .NET.

You can find all the code examples here.

The first thing you have to do is create a new database and a source table in SQL Server. To do that, execute the following script in SQL Server Management Studio.

USE [master]
GO

CREATE DATABASE [Kafka]
 CONTAINMENT = NONE
 ON  PRIMARY 
( NAME = N'Kafka', FILENAME = N'C:\Program Files\Microsoft SQL Server\MSSQL15.SQLEXPRESS\MSSQL\DATA\Kafka.mdf' , SIZE = 8192KB , MAXSIZE = UNLIMITED, FILEGROWTH = 65536KB )
 LOG ON 
( NAME = N'Kafka_log', FILENAME = N'C:\Program Files\Microsoft SQL Server\MSSQL15.SQLEXPRESS\MSSQL\DATA\Kafka_log.ldf' , SIZE = 8192KB , MAXSIZE = 2048GB , FILEGROWTH = 65536KB )
 WITH CATALOG_COLLATION = DATABASE_DEFAULT
GO
ALTER DATABASE [Kafka] SET COMPATIBILITY_LEVEL = 150
GO
IF (1 = FULLTEXTSERVICEPROPERTY('IsFullTextInstalled'))
begin
EXEC [Kafka].[dbo].[sp_fulltext_database] @action = 'enable'
end
GO
ALTER DATABASE [Kafka] SET ANSI_NULL_DEFAULT OFF 
GO
ALTER DATABASE [Kafka] SET ANSI_NULLS OFF 
GO
ALTER DATABASE [Kafka] SET ANSI_PADDING OFF 
GO
ALTER DATABASE [Kafka] SET ANSI_WARNINGS OFF 
GO
ALTER DATABASE [Kafka] SET ARITHABORT OFF 
GO
ALTER DATABASE [Kafka] SET AUTO_CLOSE OFF 
GO
ALTER DATABASE [Kafka] SET AUTO_SHRINK OFF 
GO
ALTER DATABASE [Kafka] SET AUTO_UPDATE_STATISTICS ON 
GO
ALTER DATABASE [Kafka] SET CURSOR_CLOSE_ON_COMMIT OFF 
GO
ALTER DATABASE [Kafka] SET CURSOR_DEFAULT  GLOBAL 
GO
ALTER DATABASE [Kafka] SET CONCAT_NULL_YIELDS_NULL OFF 
GO
ALTER DATABASE [Kafka] SET NUMERIC_ROUNDABORT OFF 
GO
ALTER DATABASE [Kafka] SET QUOTED_IDENTIFIER OFF 
GO
ALTER DATABASE [Kafka] SET RECURSIVE_TRIGGERS OFF 
GO
ALTER DATABASE [Kafka] SET  DISABLE_BROKER 
GO
ALTER DATABASE [Kafka] SET AUTO_UPDATE_STATISTICS_ASYNC OFF 
GO
ALTER DATABASE [Kafka] SET DATE_CORRELATION_OPTIMIZATION OFF 
GO
ALTER DATABASE [Kafka] SET TRUSTWORTHY OFF 
GO
ALTER DATABASE [Kafka] SET ALLOW_SNAPSHOT_ISOLATION OFF 
GO
ALTER DATABASE [Kafka] SET PARAMETERIZATION SIMPLE 
GO
ALTER DATABASE [Kafka] SET READ_COMMITTED_SNAPSHOT OFF 
GO
ALTER DATABASE [Kafka] SET HONOR_BROKER_PRIORITY OFF 
GO
ALTER DATABASE [Kafka] SET RECOVERY SIMPLE 
GO
ALTER DATABASE [Kafka] SET  MULTI_USER 
GO
ALTER DATABASE [Kafka] SET PAGE_VERIFY CHECKSUM  
GO
ALTER DATABASE [Kafka] SET DB_CHAINING OFF 
GO
ALTER DATABASE [Kafka] SET FILESTREAM( NON_TRANSACTED_ACCESS = OFF ) 
GO
ALTER DATABASE [Kafka] SET TARGET_RECOVERY_TIME = 60 SECONDS 
GO
ALTER DATABASE [Kafka] SET DELAYED_DURABILITY = DISABLED 
GO
ALTER DATABASE [Kafka] SET ACCELERATED_DATABASE_RECOVERY = OFF  
GO
ALTER DATABASE [Kafka] SET QUERY_STORE = OFF
GO
USE [Kafka]
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
CREATE TABLE [dbo].[KafkaMessages](
	[Id] [int] IDENTITY(1,1) NOT NULL,
	[Message] [nvarchar](max) NOT NULL,
 CONSTRAINT [PK_KafkaMessages] PRIMARY KEY CLUSTERED 
(
	[Id] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY]
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
GO
SET IDENTITY_INSERT [dbo].[KafkaMessages] ON 
GO
INSERT [dbo].[KafkaMessages] ([Id], [Message]) VALUES (1, N'1')
GO
INSERT [dbo].[KafkaMessages] ([Id], [Message]) VALUES (2, N'2')
GO
INSERT [dbo].[KafkaMessages] ([Id], [Message]) VALUES (3, N'3')
GO
INSERT [dbo].[KafkaMessages] ([Id], [Message]) VALUES (4, N'4')
GO
INSERT [dbo].[KafkaMessages] ([Id], [Message]) VALUES (5, N'5')
GO
INSERT [dbo].[KafkaMessages] ([Id], [Message]) VALUES (6, N'6')
GO
INSERT [dbo].[KafkaMessages] ([Id], [Message]) VALUES (7, N'7')
GO
INSERT [dbo].[KafkaMessages] ([Id], [Message]) VALUES (8, N'8')
GO
INSERT [dbo].[KafkaMessages] ([Id], [Message]) VALUES (9, N'9')
GO
INSERT [dbo].[KafkaMessages] ([Id], [Message]) VALUES (10, N'10')
GO
SET IDENTITY_INSERT [dbo].[KafkaMessages] OFF
GO
USE [master]
GO
ALTER DATABASE [Kafka] SET  READ_WRITE 
GO

When you have your database, install the required NuGet packages:
– Confluent.Kafka
– Dapper
– System.Data.SqlClient
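
If you prefer the command line, the same packages can be added with the dotnet CLI:

dotnet add package Confluent.Kafka
dotnet add package Dapper
dotnet add package System.Data.SqlClient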

Now you can start Kafka together with ZooKeeper using docker-compose. You can find here how to start docker-compose and Offset Explorer.
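
For reference, a minimal single-broker setup based on the Confluent images might look like the sketch below (the image tags and listener names are my assumptions; the important part is that port 29092 matches the BootstrapServers address the producer uses later on):

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"  # host-facing port, used by the producer as localhost:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1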

The first thing to code is fast reading of the data from the database, without buffering, using Dapper. To do it, we will create a new database repository.

using Dapper;
using System.Data.SqlClient;

namespace KafkaDatabaseProducer
{
    internal class DatabaseRepository
    {
        public async IAsyncEnumerable<DatabaseMessage> GetMessages()
        {
            var query = "SELECT [Message] FROM [Kafka].[dbo].[KafkaMessages]";
            var connectionString = "Server=Win10\\SQLEXPRESS;Database=Kafka;User=sa;Password=Password!";

            using var connection = new SqlConnection(connectionString);
            // Dapper opens the closed connection for us and hands back a raw data reader
            using var reader = await connection.ExecuteReaderAsync(query);
            var rowParser = reader.GetRowParser<DatabaseMessage>();

            while (await reader.ReadAsync())
            {
                // yield each row as it is read, so the full result set is never held in memory
                yield return rowParser(reader);
            }
        }
    }

    internal class DatabaseMessage
    {
        public string Message { get; set; }
    }
}
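
As a side note, recent Dapper versions (2.0.138 and newer) ship a QueryUnbufferedAsync extension on DbConnection that returns IAsyncEnumerable<T> directly, so the manual reader loop could be replaced with it. A sketch, assuming your Dapper version has this method:

// Sketch: the same repository method, but relying on Dapper's QueryUnbufferedAsync
// (available in Dapper 2.0.138+; verify before using).
public async IAsyncEnumerable<DatabaseMessage> GetMessagesUnbuffered()
{
    var query = "SELECT [Message] FROM [Kafka].[dbo].[KafkaMessages]";
    var connectionString = "Server=Win10\\SQLEXPRESS;Database=Kafka;User=sa;Password=Password!";

    using var connection = new SqlConnection(connectionString);
    await foreach (var message in connection.QueryUnbufferedAsync<DatabaseMessage>(query))
    {
        yield return message;
    }
}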

The second thing to do is to create a Kafka producer for sending the messages from the database. So let's create your Kafka producer.
The important part is not to await each delivery result individually, but to collect the tasks and check them all once sending has finished!

using Confluent.Kafka;

namespace KafkaDatabaseProducer
{
    internal class KafkaProducer : IDisposable
    {
        private readonly IProducer<Null, string> _producer;
        private readonly ICollection<Task<DeliveryResult<Null, string>>> _deliveryResults = new List<Task<DeliveryResult<Null, string>>>();

        public KafkaProducer()
        {
            var config = new ProducerConfig { BootstrapServers = "localhost:29092" };
            // assign the field, not a new local variable, otherwise _producer stays null
            _producer = new ProducerBuilder<Null, string>(config).Build();
        }

        public void SendMessage(string message)
        {
            // do not await here; just remember the delivery task for later
            var task = _producer.ProduceAsync("Test", new Message<Null, string> { Value = message });
            _deliveryResults.Add(task);
        }

        public async Task CheckSending() 
        {
            // await all outstanding deliveries at once
            var results = await Task.WhenAll(_deliveryResults);

            if (results.Any(x => x.Status != PersistenceStatus.Persisted))
            {
                Console.WriteLine("Some messages have not been delivered!");
            }
        }

        public void Dispose()
        {
            _producer.Dispose();
        }
    }
}
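
One caveat: in Confluent.Kafka a failed delivery completes its task with a ProduceException, so Task.WhenAll itself can throw. In production code you would typically wrap the CheckSending call in a try/catch in addition to inspecting the persistence statuses.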

Now you can write the main logic of the application.

using KafkaDatabaseProducer;

internal class Program
{
    public static async Task Main(string[] args)
    {
        DatabaseRepository databaseRepository = new DatabaseRepository();
        // using declaration, so the producer is disposed when Main exits
        using KafkaProducer kafkaProducer = new KafkaProducer();

        await foreach (var data in databaseRepository.GetMessages())
        {
            kafkaProducer.SendMessage(data.Message);
        }

        await kafkaProducer.CheckSending();

        Console.ReadKey();
    }
}

After running the application, you should see something like this.

You can also check whether your messages have been correctly persisted in Kafka, for example in Offset Explorer.
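
Alternatively, assuming your broker container is called kafka (adjust to whatever your docker-compose names it), a quick command-line check could look like this:

docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic Test --from-beginning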

Happy coding!
