Getting started with Confluent Kafka as a data professional – part 3
Overview
This is the third part of an article series explaining how to publish and consume your own events using the Confluent Platform for Kafka and the corresponding C# libraries. Because of its length, the article has been broken up into four pieces. The first part explains the context, the second part explains the code for the publisher, and this third part goes through the consumer code. Finally, in part 4 everything will be demonstrated in a video.
Sample code is provided at https://github.com/RoelantVos/confluent-data-object-mapping.
Creating this working example with the provided resources involved a bit of trial and error in some cases. The .NET support in particular is very new at the time of writing, hence the idea to publish my experiences here.
Most of the code has been adapted from the excellent set of examples provided by Confluent at https://github.com/confluentinc/confluent-kafka-dotnet and https://docs.confluent.io/current/clients/dotnet.html.
Consuming the events
The events we triggered when changing the Data Warehouse automation metadata are now available in the Kafka topic ‘dataObjectMappings’. The next step is to understand how we can subscribe to this topic and start receiving the available events as well as any new ones coming in.
The consumer code I have prepared uses the same global parameters as the publisher example (the GlobalParameters class). It also reuses the connection files containing the API keys, although the Schema Registry connection is not needed on the consumer side to support (de)serialization.
Other than this, the consumer example is much simpler code-wise compared to the publisher.
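For reference, the client configuration that is loaded from the file could also be constructed directly in code. Below is a minimal sketch, assuming a Confluent Cloud cluster secured with SASL/PLAIN API keys; the server address and credentials are placeholders and will differ from the actual configuration file.

// Sketch only: building the shared client configuration in code instead of
// loading it from the configuration file. All values shown are placeholders.
var clientConfig = new ClientConfig
{
    BootstrapServers = "<bootstrap server>:9092",
    SecurityProtocol = SecurityProtocol.SaslSsl,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = "<cluster API key>",
    SaslPassword = "<cluster API secret>"
};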
The approach is to create an infinite loop that can be exited using a cancellation token, triggered by CTRL-C in this case. While in this ‘consume loop’, the application is ready to receive new events as soon as they are posted. The latency between events becoming available in the topic and being consumed by this specific consumer (group) will be visible later in the cloud environment.
Events are captured as part of a consumer group in Kafka, which is assigned by name in the code for this purpose. This is where offsets are managed, amongst other things. If the end of the events is detected (i.e. the consumer is up to date), this is reported back to the user. If new events arrive after this, they will still be picked up as normal.
For demonstration purposes, as events are consumed they are added to a local object: a list of Data Object Mappings (List<DataObjectMapping>). This shows that the deserialization works correctly and that the result can be used in downstream code.
The consumer code
The consumer code is displayed below. In addition to the functional explanation in the previous section, it is worth pointing out the deserialization into the expected object. This is set when defining the consumer, using SetValueDeserializer, and it makes sure that each event is interpreted as a DataObjectMapping.
The result is reported back to the user using a few properties of the DataObjectMapping: the names of the source and target Data Objects. This demonstrates that the serialization and deserialization have worked as intended.
static async Task Main()
{
    // Setting up the configuration for the Kafka client and Schema Registry, saved in a local file
    GlobalParameters.clientConfig = await ConfluentHelper.LoadKafkaConfiguration(@"D:\Git_Repositories\confluent-configuration.txt", null);

    List<DataObjectMapping> localMappingList = new List<DataObjectMapping>();

    // Consumer group
    var consumerConfig = new ConsumerConfig(GlobalParameters.clientConfig)
    {
        GroupId = "data-object-mapping-consumer",
        AutoOffsetReset = AutoOffsetReset.Earliest,
        EnableAutoCommit = true,
        EnablePartitionEof = true
    };

    // Cancellation input key (token) for quitting the process in async mode
    CancellationTokenSource cts = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true; // prevent the process from terminating.
        cts.Cancel();
    };

    using (var consumer = new ConsumerBuilder<string, DataObjectMapping>(consumerConfig)
        // Deserialize the event value into the expected DataObjectMapping object
        .SetValueDeserializer(new JsonDeserializer<DataObjectMapping>().AsSyncOverAsync())
        .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
        .SetStatisticsHandler((_, json) => Console.WriteLine($"Statistics: {json}"))
        .SetPartitionsAssignedHandler((c, partitions) =>
        {
            Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");
        })
        .SetPartitionsRevokedHandler((c, partitions) =>
        {
            Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
        })
        .Build())
    {
        consumer.Subscribe(GlobalParameters.topics);

        try
        {
            while (true)
            {
                try
                {
                    var consumeResult = consumer.Consume(cts.Token);

                    if (consumeResult.IsPartitionEOF)
                    {
                        Console.WriteLine(
                            $"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
                        continue;
                    }

                    Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value.sourceDataObject.name}-{consumeResult.Message.Value.targetDataObject.name}");
                    localMappingList.Add(consumeResult.Message.Value);
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Consume error: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Closing consumer.");
            consumer.Close();
        }
    }
}
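In the example above, offsets are committed automatically (EnableAutoCommit = true). If more control is needed over when an event counts as processed, a variation is to disable auto-commit and commit explicitly after each message has been handled. A minimal sketch of this variation, which is not part of the example above:

// Variation (sketch only): disable automatic offset commits and commit each
// offset explicitly once the message has been processed.
var manualCommitConfig = new ConsumerConfig(GlobalParameters.clientConfig)
{
    GroupId = "data-object-mapping-consumer",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = false
};

// Inside the consume loop, after the message has been handled:
var consumeResult = consumer.Consume(cts.Token);
localMappingList.Add(consumeResult.Message.Value);
consumer.Commit(consumeResult); // store the offset for this consumer group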
Running the consumer example
As we run the consumer, we can see that the events are picked up from the topic.

With a breakpoint in the code, you can also see the objects available in memory in the List<DataObjectMapping>. The content can now be used in subsequent processes in the application.
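As a simple illustration, the list can be queried like any other in-memory collection, for example to find all mappings that load a particular target Data Object. This is a sketch only; the target name is a placeholder.

// Sketch: using the consumed mappings downstream, e.g. listing everything
// that loads a given target Data Object. 'CUSTOMER' is a placeholder name.
var mappingsForTarget = localMappingList
    .Where(mapping => mapping.targetDataObject.name == "CUSTOMER")
    .ToList();

foreach (var mapping in mappingsForTarget)
{
    Console.WriteLine($"{mapping.sourceDataObject.name} -> {mapping.targetDataObject.name}");
}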

Finally, on the cloud platform the consumer group has been created, and we can see the lag between events being produced to the topic and this specific consumer catching up. Because the offsets are recorded against the consumer group, we cannot simply restart the application and receive the same events again, although with some effort it is possible to reset the offsets.
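For example, one way to replay the topic is to assign the consumer explicitly to the start of a partition, which bypasses the offsets committed for the consumer group. The sketch below assumes a single partition; using a new consumer group name achieves a similar effect.

// Sketch: re-read the topic from the beginning by assigning an explicit offset,
// bypassing the committed consumer group offsets. Assumes partition 0.
consumer.Assign(new TopicPartitionOffset("dataObjectMappings", new Partition(0), Offset.Beginning));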

Hopefully this will help someone getting started with the .NET libraries for Confluent Kafka!
Tomorrow I will record a quick video demonstrating how all of this works.