Getting started with Confluent Kafka as a data professional – part 2
Overview
This is the second part of an article series that explains how to publish and consume your own events using the Confluent Platform for Kafka and the corresponding C# libraries. Because of its length, the series has been split into four parts: the first explains the context, and this second part explains the code for the publisher. Part 3 will explain the consumer side, and part 4 will demonstrate everything in a video.
Sample code is provided in https://github.com/RoelantVos/confluent-data-object-mapping.
Creating this working example with the provided resources was a bit of trial-and-error in some cases, hence the idea to publish my experiences here. Most of the code has been modified from the excellent set of examples provided by Confluent on https://github.com/confluentinc/confluent-kafka-dotnet and https://docs.confluent.io/current/clients/dotnet.html.
The intent – programmatically trigger event publication
To create my own publication / consumer services using Kafka I wanted to be able to use a custom schema. This is because I want to control not only when and how events are published, but also how these can be consumed by downstream applications. Also, most examples online use (arguably easier) string messages and not complex classes.
This is slightly different from using a (3rd party) connector, of which there are many available. However – connectors to databases tend to have the feature/issue that they control the table in the database directly and in my setup I wanted to avoid this, to understand what options there are.
Of course, Kafka decouples consumers from publishers through the topic and you can use different connectors for both publication and subscription.
In essence, what I wanted to test is how one can control the exact point (in code) at which events are triggered, rather than retrieving the information from some database. The ability to do this is a huge benefit because, for us data people, it allows us to match the business (process) model to the data model. What I mean by this is that we can record the data points directly from the (execution of the) business process itself, and through this set up an accurate representation of what happens.
This means coding, and it is what I attempted to replicate in a relatively easy example.
Functionally, I am publishing changes in code generation metadata to a topic called ‘dataObjectMappings’. Basically, every time something changes in the design metadata I want to publish these changes so downstream consumers can adapt the Data Warehouse and simulate automated refactoring of the structures and ETL logic.
The definition for a ‘dataObjectMapping’ is part of the generic interface for automation metadata and essentially describes a source-to-target mapping.
In light of the recent graph model and templating engine updates, this means that changes made in the graph can be published to a Kafka topic, which can then be used directly by the consuming automation software (VDW) to generate the code for a Data Warehouse.
The setup – Confluent Cloud and C#
To create the example I have registered an account with the Confluent Cloud and created a cluster. That’s it. Pretty much everything else is done via code in this example.

To be able to connect to this environment, API keys must be set up as well, for both the cluster and the Schema Registry. This is very straightforward and well documented here, so I won’t go into further details.
The API keys are what is needed to connect from the code, so the next step is to create a Visual Studio solution and add the Confluent.Kafka libraries via NuGet. As mentioned at the start of the post, the example solution can be downloaded and modified so this is only required when creating a new solution.
The following libraries have been added to the Visual Studio solution:
- Confluent.Kafka, for interaction with the clusters, topics etc.
- Confluent.SchemaRegistry, for reading/writing against the Schema Registry.
- Confluent.SchemaRegistry.Serdes.Json, to support custom class serialisation & deserialisation.
In the sample solution I have created a publisher project, a consumer project and a 3rd helper library containing shared functionality.
Publishing events
The publishing service is a console application, and the Main() method is displayed below. I use a couple of global parameters for reusability. These contain the configuration settings needed to connect to the cluster as well as the name of the topic(s) to work with.
Note that many Confluent .Net methods require an array of topics as input, hence the definition of the topic as an array.
internal static class GlobalParameters
{
    // Kafka configuration settings
    internal static ClientConfig clientConfig { get; set; }
    internal static SchemaRegistryConfig schemaRegistryConfig { get; set; }
    // Array of topics to work with in this example
    internal static string[] topics { get; } = new string[] {"dataObjectMappings"};
}
static async Task Main()
{
    // Setting up the configuration for the Kafka client and Schema registry, saved in a local file
    GlobalParameters.clientConfig = await ConfluentHelper.LoadKafkaConfiguration(@"D:\Git_Repositories\confluent-configuration.txt", null);
    GlobalParameters.schemaRegistryConfig = await ConfluentHelper.LoadSchemaRegistryConfiguration(@"D:\Git_Repositories\schemaregistry-configuration.txt");
    // Clear topic, if existing (reset environment)
    await ConfluentHelper.DeleteTopic(GlobalParameters.topics[0], GlobalParameters.clientConfig);
    // Create topic, if not existing yet
    await ConfluentHelper.CreateTopicIfNotExists(GlobalParameters.topics[0], 1, 3, GlobalParameters.clientConfig);
    // Start a file watcher to monitor input directory for mapping Json files
    // Event handles on file detection trigger the publishing actions
    WatchForFiles();
    // Start waiting until Escape is pressed
    Console.WriteLine("Press ESC to quit.");
    do
    {
        while (!Console.KeyAvailable)
        {
            // Wait for anything to happen in the designated directory for the file watcher.
            // Sleep briefly so the loop does not spin a CPU core (requires using System.Threading).
            Thread.Sleep(100);
        }
    } while (Console.ReadKey(true).Key != ConsoleKey.Escape);
    // END OF APPLICATION
}
What happens here is that when the application is started, the Kafka and Schema Registry configurations are loaded from a file and stored in memory (in the custom ‘global parameters’ class).
These configuration files need to be present and correct. They can be created using a simple text editor and of course the location needs to be updated in the code.
Confluent configuration file structure:
# Kafka
bootstrap.servers=<your confluent server - see cluster>
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=<API access key>
sasl.password=<API access password>
Schema registry configuration file structure:
# Confluent Cloud Schema Registry
Url=<Schema Registry endpoint>
BasicAuthCredentialsSource=USER_INFO
BasicAuthUserInfo=<Schema Registry API key>:<Schema Registry API password>
MaxCachedSchemas=10
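Both files are plain key=value text, so loading them comes down to a small parser. The sketch below is not the ConfluentHelper code from the sample repository (which may well differ); the class and method names are made up for illustration. It only shows the general idea: skip comments and blank lines, and split each remaining line on the first '='.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, named for this illustration only.
public static class ConfigFileSketch
{
    // Parses "key=value" lines, skipping blank lines and '#' comments.
    // Only the first '=' splits a line, so values may themselves contain '='.
    // Duplicate keys are not handled here: ToDictionary throws on them.
    public static Dictionary<string, string> Parse(IEnumerable<string> lines)
    {
        return lines
            .Select(line => line.Trim())
            .Where(line => line.Length > 0
                           && !line.StartsWith("#", StringComparison.Ordinal)
                           && line.Contains("="))
            .ToDictionary(
                line => line.Substring(0, line.IndexOf('=')).Trim(),
                line => line.Substring(line.IndexOf('=') + 1).Trim());
    }

    public static void Main()
    {
        var sample = new[]
        {
            "# Kafka",
            "security.protocol=SASL_SSL",
            "sasl.mechanisms=PLAIN"
        };

        foreach (var pair in Parse(sample))
        {
            Console.WriteLine($"{pair.Key} -> {pair.Value}");
        }
    }
}
```

With the Confluent.Kafka package referenced, a dictionary like this can be passed to the ClientConfig constructor that accepts key/value pairs, for example new ClientConfig(ConfigFileSketch.Parse(File.ReadAllLines(path))).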
When the configuration is available, the existing topics are removed and re-created to facilitate easy retesting. This can be commented out if desired.
Now that the topics are in place, a file watcher process is started that will monitor a designated directory for any file changes.
This is how this example will work: any changes made to the files in this directory will be published as events conforming to the dataObjectMapping schema. This will continue to happen until the Escape key is pressed, after which the application will end.
The file watcher has been created with a few event handlers that actually perform the publish action (one of these is shown below).
private static void WatchForFiles()
{
    string publicationPath = AppDomain.CurrentDomain.BaseDirectory + @"\examples-publication";
    if (!Directory.Exists(publicationPath))
    {
        Directory.CreateDirectory(publicationPath);
    }
    // Object initialiser
    FileSystemWatcher fileSystemWatcher = new FileSystemWatcher
    {
        Path = publicationPath,
        Filter = "*.json",
        EnableRaisingEvents = true
    };
    // Event handles for the file watcher
    fileSystemWatcher.Created += FileSystemWatcher_Created;
    fileSystemWatcher.Changed += FileSystemWatcher_Changed;
    fileSystemWatcher.Deleted += FileSystemWatcher_Deleted;
    fileSystemWatcher.Renamed += FileSystemWatcher_Renamed;
    // WriteLine, so that later console messages start on their own line
    Console.WriteLine("Listening for new or updated files.");
}
private static async void FileSystemWatcher_Created(object sender, FileSystemEventArgs e)
{
    Console.WriteLine($"A new file {e.Name} has been detected.");
    await DeserializeMappingFile(e);
}
The ‘DeserializeMappingFile’ method is called every time a file is created (and modified, deleted or renamed), and the name of the file in question is passed as a variable when called.
This is important, because the event of a file modification itself is not enough to work with. The contents of the file need to be loaded from disk into memory – into a DataObjectMappingList object (part of the generic interface for automation again).
Now that the file contents have been loaded into a list of mappings, the program can iterate over this list and publish every DataObjectMapping from this list to Kafka.
private static async Task DeserializeMappingFile(FileSystemEventArgs e)
{
    var jsonInput = File.ReadAllText(e.FullPath);
    DataObjectMappingList deserialisedMapping = JsonConvert.DeserializeObject<DataObjectMappingList>(jsonInput);
    foreach (DataObjectMapping individualMapping in deserialisedMapping.dataObjectMappingList)
    {
        await PublishEvent(individualMapping, GlobalParameters.schemaRegistryConfig, GlobalParameters.clientConfig,
            GlobalParameters.topics);
    }
}
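One practical caveat when reading a file straight from a file watcher event: the Created event can be raised while the file is still being copied, in which case the read may fail with an IOException because the writer still holds the file. A minimal sketch of a retrying read is shown below. The helper name, the number of attempts and the delay are all assumptions made for this illustration and are not part of the sample solution.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

// Hypothetical helper, named for this illustration only.
public static class FileReadSketch
{
    // Reads a text file, retrying a few times while another process still holds it open.
    // After the last attempt the IOException is allowed to propagate to the caller.
    public static async Task<string> ReadAllTextWithRetryAsync(
        string path, int maxAttempts = 5, int delayMilliseconds = 200)
    {
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return File.ReadAllText(path);
            }
            catch (IOException) when (attempt < maxAttempts)
            {
                // The file is probably still being written; wait and try again.
                await Task.Delay(delayMilliseconds);
            }
        }
    }
}
```

In the method above this could replace the direct read, for example var jsonInput = await FileReadSketch.ReadAllTextWithRetryAsync(e.FullPath);. Note that a file that no longer exists (as in the Deleted event) also raises an IOException subtype, so the retry only delays that failure rather than preventing it.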
We finally arrive at the publish step itself (code below). The actual publish action is done in the context of the schema registry configuration (which is set to automatically update in this case). This will create or update the schema version linked to the events in the topic, supporting a mechanism called schema evolution.
Schema evolution manages how the schema for an event / message changes over time. Each schema version is registered in the Schema Registry and every event carries a reference to the schema it was written with, so events published before and after a change in format can both be interpreted correctly.
In the code shown below, new schemas or changes to existing schemas for the given topic are registered automatically, which is ideal for demonstration (though not necessarily production).
public static async Task PublishEvent(DataObjectMapping dataObjectMapping, SchemaRegistryConfig schemaRegistryConfig, ClientConfig kafkaConfig, string[] topics)
{
    using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
    {
        //using (var producer = new Producer<string, string>(config, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8)))
        // Produce events to the topic
        var producer = new ProducerBuilder<string, DataObjectMapping>(kafkaConfig)
            .SetValueSerializer(
                new JsonSerializer<DataObjectMapping>(schemaRegistry, new JsonSerializerConfig { BufferBytes = 100 })
            )
            .Build();
        var localMessage = new Message<string, DataObjectMapping>();
        localMessage.Key = "DataObjectMapping";
        localMessage.Value = dataObjectMapping;
        // Synchronous producer, does not work with Json serialisation
        //producer.Produce(topics[0], localMessage, SyncHandler);
        // Create asynchronous task (and wait for it)
        var delivery = producer.ProduceAsync(topics[0], localMessage);
        await delivery.ContinueWith(AsyncHandler);
        producer.Flush(TimeSpan.FromSeconds(10));
    }
    return;
}
The key line in this code segment is this one:
var producer = new ProducerBuilder<string, DataObjectMapping>(kafkaConfig)
    .SetValueSerializer(
        new JsonSerializer<DataObjectMapping>(schemaRegistry, new JsonSerializerConfig { BufferBytes = 100 })
    )
    .Build();
What happens here is that the incoming event value (i.e. the DataObjectMapping) is set to be serialized as a DataObjectMapping object. The JsonSerializer needs to specifically be set for this, and is the reason we need the Schema Registry and SerDes libraries. In other words, we are using the standard Json serializer which will send a byte string to the topic.
This is what we need to do to send custom objects/classes to a Kafka topic. Initially I ran into many issues correctly using the Json serializer (arguably AVRO formats would have been supported more broadly) and I even created my own serializer.
However, with v1.5 of the .Net library these issues seem to be resolved so I have reverted to the above – much simpler – example.
When we consume these events we need to convert them back from the byte string to a DataObjectMapping object, for which we will use the corresponding Json deserializer.
The event is then published with these last lines of code. The value of the event (message) is the payload – the actual dataObjectMapping.
var localMessage = new Message<string, DataObjectMapping>();
localMessage.Key = "DataObjectMapping";
localMessage.Value = dataObjectMapping;
// Create asynchronous task (and wait for it)
var delivery = producer.ProduceAsync(topics[0], localMessage);
await delivery.ContinueWith(AsyncHandler);
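The AsyncHandler continuation itself is not shown in this article. Because it is attached with ContinueWith, it runs whether the delivery succeeded or failed, and awaiting the continuation does not rethrow a produce failure, so the handler is the place to inspect the outcome. The sketch below is one way such a handler could look; it is an assumption for illustration rather than the code from the sample solution, and the empty DataObjectMapping class is only a stand-in so the snippet compiles on its own against the Confluent.Kafka package.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

// Stand-in for the real DataObjectMapping class (assumption, for compilation only).
public class DataObjectMapping { }

public static class DeliveryHandlerSketch
{
    // Hypothetical continuation for ProduceAsync: reports the outcome of one publish action.
    public static void AsyncHandler(Task<DeliveryResult<string, DataObjectMapping>> task)
    {
        if (task.IsFaulted)
        {
            // A failed delivery surfaces as an exception stored on the task.
            Console.WriteLine($"Delivery failed: {task.Exception?.GetBaseException().Message}");
        }
        else if (task.IsCanceled)
        {
            Console.WriteLine("Delivery was cancelled.");
        }
        else
        {
            DeliveryResult<string, DataObjectMapping> result = task.Result;
            Console.WriteLine(
                $"Delivered key '{result.Key}' to {result.TopicPartitionOffset} (status: {result.Status}).");
        }
    }
}
```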
Running the publish example
When we run the program it will recreate the topic and wait for any changes in files to occur.

Please note that in debug mode the application runs in a separate directory, so we have to navigate there to test things out. This is done by copying a file (drag and drop) from the provided examples directory into the directory that is monitored by the file watcher process.
The result looks like this:

The copied file ‘HUB_CUSTOMER.json’ contains all mappings to populate the Virtual Data Warehouse Core Business Concept object ‘Customer’. In technical terms, the Json file contains a DataObjectMappingList with four DataObjectMappings. As per our code these are published as individual events.
In the Kafka cluster a new topic is created containing these events:

Last but not least, we can now also view the corresponding schema in the Schema Registry, which has been correctly set to the generic interface for automation.

About synchronous and asynchronous processing
This example shows an asynchronous process, which in .Net uses the Task object to represent work that completes independently of the calling code. Publish actions are therefore executed separately and may have different durations.
It is also possible to produce events using a synchronous process, but according to the documentation this has a large negative impact on throughput and should only be used in high-concurrency scenarios. Some more information on this is available on https://www.confluent.io/blog/designing-the-net-api-for-apache-kafka/.
I couldn’t get the Json serializer to work with the synchronous processing (let’s face it, sending metadata changes is not going to be a performance bottleneck!), and I suspect this is a bug in the library. It did work fine for <string,string> type messages but not with <string,<custom class>> type scenarios as would be needed for a DataObjectMapping.
However, the asynchronous processing works fine for this use-case too.
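One possible explanation for the synchronous issue, offered as an assumption rather than a confirmed diagnosis: the Schema Registry Json serializer implements the asynchronous serializer interface, whereas the Produce method expects a synchronous serializer. The library provides an AsSyncOverAsync() adapter in the Confluent.Kafka.SyncOverAsync namespace for this situation. The sketch below shows how that could be wired up. It has not been run against the sample solution, the class and method names are made up, and the empty DataObjectMapping class is only a stand-in.

```csharp
using System;
using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

// Stand-in for the real DataObjectMapping class (assumption, for compilation only).
public class DataObjectMapping { }

public static class SyncProduceSketch
{
    // Hypothetical variant of the publish step that uses Produce with a delivery callback.
    public static void PublishSynchronously(
        DataObjectMapping mapping,
        SchemaRegistryConfig schemaRegistryConfig,
        ClientConfig kafkaConfig,
        string topic)
    {
        using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
        using (var producer = new ProducerBuilder<string, DataObjectMapping>(kafkaConfig)
            // Wrap the asynchronous Json serializer so it can be used by Produce.
            .SetValueSerializer(new JsonSerializer<DataObjectMapping>(schemaRegistry).AsSyncOverAsync())
            .Build())
        {
            var message = new Message<string, DataObjectMapping>
            {
                Key = "DataObjectMapping",
                Value = mapping
            };

            // The callback is invoked once the broker has acknowledged (or rejected) the event.
            producer.Produce(topic, message, report =>
                Console.WriteLine(report.Error.IsError
                    ? $"Delivery failed: {report.Error.Reason}"
                    : $"Delivered to {report.TopicPartitionOffset}"));

            // Wait for outstanding deliveries before the producer is disposed.
            producer.Flush(TimeSpan.FromSeconds(10));
        }
    }
}
```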