Rico Suter's blog.
 


In one of my projects we are ingesting lots of device events with Azure Event Hubs. In this scenario it is important to process the events from one device in sequence. In our initial implementation we simply read a batch of events from the partition and processed the events within the batch in sequence. However, at some point we hit a limit: we could not use all the CPU resources of our machines, mainly because half of the work for one event is I/O bound.

If you process each partition strictly sequentially, then your maximum parallelization is the number of partitions.

So what can we do? Of course the simplest solution is to increase the number of partitions in the Event Hub, but what if you have already reached the maximum number of partitions (e.g. 32) or you cannot change the partition count because the system is already running?

A general solution works as follows: the partition consumer reads a batch of events and then processes this batch as quickly and efficiently as possible, not necessarily strictly sequentially.

How can we improve this processing in our scenario? Instead of processing all events in sequence we process the events in the batch in parallel but in sequential groups by device. This way we can parallelize within the batch and at the same time ensure that events from one device are processed in sequence.

The implementation of this is quite simple:

  • Group the events by partition key and put the groups into a queue
  • Spawn multiple workers (tasks) that process the groups in parallel, but the events within each group sequentially

This functionality is implemented in the open-source library Namotion.Messaging (in the library events are the same as messages):

public static async Task ProcessByPartitionKeyAsync<TMessage, TPartitionKey>(
    this IEnumerable<Message> messages,
    Func<Message, TMessage> transform,
    Func<TMessage, TPartitionKey> partitionKeySelector,
    Func<IEnumerable<TMessage>, CancellationToken, Task> processPartitionMessages,
    int partitionParallelization = 4,
    CancellationToken cancellationToken = default)
{
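    // Transform in parallel while preserving the original message order.
    // AsParallel() has to come before Select(), otherwise the transform runs sequentially.
    var deserializedMessages = transform != null ? messages
        .AsParallel()
        .AsOrdered()
        .Select(m => transform(m))
        .ToArray() : messages.Cast<TMessage>().ToArray();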

    var batchPartitionsQueue = new ConcurrentQueue<TMessage[]>(
        deserializedMessages
            .GroupBy(partitionKeySelector)
            .Select(g => g.ToArray())
            .ToArray());

    var tasks = Enumerable
        .Range(0, Math.Min(deserializedMessages.Length, partitionParallelization))
        .Select(i => Task.Run(async () =>
        {
            while (batchPartitionsQueue.TryDequeue(out var batchPartition))
            {
                await processPartitionMessages(batchPartition, cancellationToken);
            }
        }, cancellationToken));

    await Task.WhenAll(tasks);
}

MessageEnumerableExtensions.cs
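
To see the ordering guarantee in isolation, here is a minimal, self-contained sketch of the same idea that does not depend on Namotion.Messaging. The names `PartitionedBatch`, `ProcessByKeyAsync` and `PartitionedBatchDemo` are made up for this illustration and are not part of the library; the `Task.Delay` call only stands in for the I/O-bound part of the work.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class PartitionedBatch
{
    // Hypothetical helper (not the library API): key groups are processed in parallel,
    // the items inside one group are processed one after another.
    public static async Task ProcessByKeyAsync<TItem, TKey>(
        IEnumerable<TItem> items,
        Func<TItem, TKey> keySelector,
        Func<TItem, CancellationToken, Task> processItem,
        int parallelization = 4,
        CancellationToken cancellationToken = default)
    {
        // LINQ to Objects GroupBy keeps the source order inside each group.
        var groups = new ConcurrentQueue<TItem[]>(
            items.GroupBy(keySelector).Select(g => g.ToArray()));

        // Never start more workers than there are groups.
        var workers = Enumerable
            .Range(0, Math.Min(groups.Count, parallelization))
            .Select(_ => Task.Run(async () =>
            {
                // Each worker takes whole groups, so one key is never handled by two workers.
                while (groups.TryDequeue(out var group))
                {
                    foreach (var item in group)
                    {
                        cancellationToken.ThrowIfCancellationRequested();
                        await processItem(item, cancellationToken);
                    }
                }
            }))
            .ToArray();

        await Task.WhenAll(workers);
    }
}

public static class PartitionedBatchDemo
{
    // Processes a small interleaved batch and returns the observed sequence per device.
    public static async Task<Dictionary<string, int[]>> RunAsync()
    {
        (string DeviceId, int Sequence)[] events =
        {
            ("a", 1), ("b", 1), ("a", 2), ("c", 1), ("b", 2), ("a", 3)
        };

        var processed = new ConcurrentDictionary<string, ConcurrentQueue<int>>();

        await PartitionedBatch.ProcessByKeyAsync(
            events,
            e => e.DeviceId,
            async (e, ct) =>
            {
                await Task.Delay(10, ct); // stands in for I/O-bound work
                processed.GetOrAdd(e.DeviceId, _ => new ConcurrentQueue<int>()).Enqueue(e.Sequence);
            },
            parallelization: 2);

        return processed.ToDictionary(p => p.Key, p => p.Value.ToArray());
    }
}
```

Calling `await PartitionedBatchDemo.RunAsync()` yields the sequences 1, 2, 3 for device "a", 1, 2 for device "b" and 1 for device "c". The order between devices is not deterministic, but the order within one device always matches the order in the batch, because a group is only ever handled by a single worker.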

This extension method can now easily be used in an event receiver background service:

public class MyBackgroundService : BackgroundService
{
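    private const int PartitionParallelization = 4;
    private readonly IMessageReceiver receiver;
    private readonly ILogger logger; // passed to ListenWithRetryAsync below

    public MyBackgroundService(IMessageReceiver receiver, ILogger<MyBackgroundService> logger)
    {
        this.receiver = receiver;
        this.logger = logger;
    }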

    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        await receiver.ListenWithRetryAsync(ProcessMessagesAsync, logger, cancellationToken);
    }

    private async Task ProcessMessagesAsync(IEnumerable<Message> allMessages, CancellationToken cancellationToken)
    {
        await allMessages.ProcessByPartitionKeyAsync(
            m => DeserializeMessage(m), // DeserializeMessage takes the whole Message
            m => m.DeviceId,
            async (messages, ct) =>
            {
                foreach (var message in messages)
                {
                    ... // TODO: Process messages
                }
            }, PartitionParallelization, cancellationToken);

        await receiver.ConfirmAsync(allMessages, cancellationToken);
    }

    private DeviceMessage DeserializeMessage(Message message)
    {
        return ...; // TODO: Deserialize message
    }
}

